/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.data.quality.execution;

import java.util.List;
import org.apache.dolphinscheduler.data.quality.config.Config;
import org.apache.dolphinscheduler.data.quality.exception.ConfigRuntimeException;
import org.apache.dolphinscheduler.data.quality.execution.Execution;
import org.apache.dolphinscheduler.data.quality.execution.SparkRuntimeEnvironment;
import org.apache.dolphinscheduler.data.quality.flow.batch.BatchReader;
import org.apache.dolphinscheduler.data.quality.flow.batch.BatchTransformer;
import org.apache.dolphinscheduler.data.quality.flow.batch.BatchWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class SparkBatchExecution
implements Execution<BatchReader, BatchTransformer, BatchWriter> {
    private final SparkRuntimeEnvironment environment;

    public SparkBatchExecution(SparkRuntimeEnvironment environment) throws ConfigRuntimeException {
        this.environment = environment;
    }

    @Override
    public void execute(List<BatchReader> readers, List<BatchTransformer> transformers, List<BatchWriter> writers) {
        readers.forEach(reader -> this.registerInputTempView((BatchReader)reader, this.environment));
        if (!readers.isEmpty()) {
            Dataset<Row> ds = readers.get(0).read(this.environment);
            for (BatchTransformer tf : transformers) {
                ds = this.executeTransformer(this.environment, tf, ds);
                this.registerTransformTempView(tf, ds);
            }
            for (BatchWriter sink : writers) {
                this.executeWriter(this.environment, sink, ds);
            }
        }
        this.environment.sparkSession().stop();
    }

    private void registerTempView(String tableName, Dataset<Row> ds) {
        if (ds == null) {
            throw new ConfigRuntimeException("dataset is null, can not createOrReplaceTempView");
        }
        ds.createOrReplaceTempView(tableName);
    }

    private void registerInputTempView(BatchReader reader, SparkRuntimeEnvironment environment) {
        Config conf = reader.getConfig();
        if (!Boolean.TRUE.equals(conf.has("output_table"))) {
            throw new ConfigRuntimeException("[" + reader.getClass().getName() + "] must be registered as dataset, please set \"output_table\" config");
        }
        String tableName = conf.getString("output_table");
        this.registerTempView(tableName, reader.read(environment));
    }

    private Dataset<Row> executeTransformer(SparkRuntimeEnvironment environment, BatchTransformer transformer, Dataset<Row> dataset) {
        Config config = transformer.getConfig();
        Dataset outputDataset = null;
        if (Boolean.TRUE.equals(config.has("input_table"))) {
            String[] tableNames;
            for (String sourceTableName : tableNames = config.getString("input_table").split(",")) {
                Dataset inputDataset = environment.sparkSession().read().table(sourceTableName);
                outputDataset = outputDataset == null ? inputDataset : outputDataset.union(inputDataset);
            }
        } else {
            outputDataset = dataset;
        }
        if (Boolean.TRUE.equals(config.has("tmp_table"))) {
            if (outputDataset == null) {
                outputDataset = dataset;
            }
            String tableName = config.getString("tmp_table");
            this.registerTempView(tableName, outputDataset);
        }
        return transformer.transform(outputDataset, environment);
    }

    private void registerTransformTempView(BatchTransformer transformer, Dataset<Row> ds) {
        Config config = transformer.getConfig();
        if (Boolean.TRUE.equals(config.has("output_table"))) {
            String tableName = config.getString("output_table");
            this.registerTempView(tableName, ds);
        }
    }

    private void executeWriter(SparkRuntimeEnvironment environment, BatchWriter writer, Dataset<Row> ds) {
        Config config = writer.getConfig();
        Dataset inputDataSet = ds;
        if (Boolean.TRUE.equals(config.has("input_table"))) {
            String sourceTableName = config.getString("input_table");
            inputDataSet = environment.sparkSession().read().table(sourceTableName);
        }
        writer.write(inputDataSet, environment);
    }
}

