/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.integ.testsuite;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
import org.apache.hudi.integ.testsuite.dag.DagUtils;
import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
import org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator;
import org.apache.hudi.integ.testsuite.dag.WriterContext;
import org.apache.hudi.integ.testsuite.dag.nodes.RollbackNode;
import org.apache.hudi.integ.testsuite.dag.scheduler.DagScheduler;
import org.apache.hudi.integ.testsuite.dag.scheduler.SaferSchemaDagScheduler;
import org.apache.hudi.integ.testsuite.helpers.HiveServiceProvider;
import org.apache.hudi.integ.testsuite.helpers.ZookeeperServiceProvider;
import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spark job that builds a workflow DAG of test operations and schedules it against a Hudi table.
 *
 * <p>The DAG is loaded from a YAML file when {@code --workload-yaml-path} is given; otherwise it is
 * produced by the configured {@link WorkflowDagGenerator} class. Constructing the job reads the
 * properties file, creates the key generator, optionally cleans the input/output paths and
 * initializes the target table.
 */
public class HoodieTestSuiteJob {
    private static final Logger log = LoggerFactory.getLogger(HoodieTestSuiteJob.class);

    /** Rollback count assumed when the DAG does not start with a {@link RollbackNode}. */
    private static final int DEFAULT_NUM_ROLLBACKS = 2;

    private final HoodieTestSuiteConfig cfg;
    TypedProperties props;
    private transient FileSystem fs;
    private transient JavaSparkContext jsc;
    private transient SparkSession sparkSession;
    private transient HiveConf hiveConf;
    private BuiltinKeyGenerator keyGenerator;
    private transient HoodieTableMetaClient metaClient;

    /**
     * Creates the job and prepares the file-system state it runs against.
     *
     * @param cfg parsed command-line configuration; its {@code propsFilePath} is rewritten in place
     *     via {@code FSUtils.addSchemeIfLocalPath}
     * @param jsc Spark context whose Hadoop configuration is used for all file-system access
     * @throws IOException if reading the properties, cleaning paths or initializing the table fails
     */
    public HoodieTestSuiteJob(HoodieTestSuiteConfig cfg, JavaSparkContext jsc) throws IOException {
        log.warn("Running spark job w/ app id {}", jsc.sc().applicationId());
        this.cfg = cfg;
        this.jsc = jsc;
        cfg.propsFilePath = FSUtils.addSchemeIfLocalPath(cfg.propsFilePath).toString();
        this.sparkSession = SparkSession.builder().config(jsc.getConf()).enableHiveSupport().getOrCreate();
        this.fs = FSUtils.getFs(cfg.inputBasePath, jsc.hadoopConfiguration());
        this.props = UtilHelpers.readConfig(this.fs.getConf(), new Path(cfg.propsFilePath), cfg.configs).getProps();
        log.info("Creating workload generator with configs : {}", this.props.toString());
        this.hiveConf = getDefaultHiveConf(jsc.hadoopConfiguration());
        this.keyGenerator = (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(this.props);

        // Clean up BEFORE initializing the table: deleting the target path afterwards would wipe the
        // freshly created table metadata and leave metaClient referring to a removed table.
        if (cfg.cleanInput) {
            deleteIfExists(this.fs, new Path(cfg.inputBasePath));
        }
        if (cfg.cleanOutput) {
            // The target path may live on a different file system than the input path.
            deleteIfExists(FSUtils.getFs(cfg.targetBasePath, jsc.hadoopConfiguration()), new Path(cfg.targetBasePath));
        }

        this.metaClient = HoodieTableMetaClient.withPropertyBuilder()
            .setTableType(cfg.tableType)
            .setTableName(cfg.targetTableName)
            .setArchiveLogFolder((String) HoodieTableConfig.ARCHIVELOG_FOLDER.defaultValue())
            .initTable(jsc.hadoopConfiguration(), cfg.targetBasePath);
    }

    /** Recursively deletes {@code path} on {@code fileSystem} if it exists. */
    private static void deleteIfExists(FileSystem fileSystem, Path path) throws IOException {
        if (fileSystem.exists(path)) {
            fileSystem.delete(path, true);
        }
    }

    /**
     * Looks up the schema version recorded in a past commit.
     *
     * <p>Reads the {@code "schema"} entry of the commit's extra metadata, parses it as an Avro schema
     * and returns its {@code "schemaVersion"} property as an int. This is best effort: any failure
     * (no such commit, missing metadata entry, unparsable schema, missing property) is logged and
     * yields {@code 0}.
     *
     * @param nthCommit commit position passed to {@code HoodieTimeline#nthFromLastInstant} on the
     *     commits timeline
     * @return the recorded schema version, or {@code 0} when it cannot be determined
     * @throws Exception declared for source compatibility only; every exception raised by the lookup
     *     is caught here
     */
    int getSchemaVersionFromCommit(int nthCommit) throws Exception {
        try {
            HoodieTimeline timeline = new HoodieActiveTimeline(this.metaClient).getCommitsTimeline();
            HoodieInstant prevInstant = (HoodieInstant) timeline.nthFromLastInstant(nthCommit).get();
            HoodieCommitMetadata commit = (HoodieCommitMetadata) HoodieCommitMetadata.fromBytes(
                (byte[]) timeline.getInstantDetails(prevInstant).get(), HoodieCommitMetadata.class);
            Map<?, ?> extraMetadata = commit.getExtraMetadata();
            String avroSchemaStr = (String) extraMetadata.get("schema");
            Schema avroSchema = new Schema.Parser().parse(avroSchemaStr);
            int version = Integer.parseInt(avroSchema.getObjectProp("schemaVersion").toString());
            log.info(String.format("Last used schemaVersion from latest commit file was %d. Optimizing the DAG.", version));
            return version;
        } catch (Exception e) {
            // Deliberately best effort: fall back to version 0, but keep the cause for diagnosis.
            log.info("Last used schemaVersion could not be validated from commit file.  Skipping SaferSchema Optimization.", e);
            return 0;
        }
    }

    /** Creates a fresh {@link HiveConf} with {@code cfg} added as a resource. */
    private static HiveConf getDefaultHiveConf(Configuration cfg) {
        HiveConf hiveConf = new HiveConf();
        hiveConf.addResource(cfg);
        return hiveConf;
    }

    /**
     * Command-line entry point: parses {@link HoodieTestSuiteConfig}, builds a Spark context and runs
     * the test suite. Prints usage and exits with status 1 when {@code --help} is given or no
     * arguments are passed.
     */
    public static void main(String[] args) throws Exception {
        HoodieTestSuiteConfig cfg = new HoodieTestSuiteConfig();
        JCommander cmd = new JCommander(cfg, args);
        if (cfg.help || args.length == 0) {
            cmd.usage();
            System.exit(1);
        }
        JavaSparkContext jssc = UtilHelpers.buildSparkContext(
            "workload-generator-" + cfg.outputTypeName + "-" + cfg.inputFormatName, cfg.sparkMaster);
        new HoodieTestSuiteJob(cfg, jssc).runTestSuite();
    }

    /**
     * Builds the workflow DAG to execute.
     *
     * @return the DAG parsed from {@code workloadYamlPath} if set, otherwise the DAG built by the
     *     class named by {@code workloadDagGenerator}
     * @throws IOException if the YAML file cannot be read
     */
    public WorkflowDag createWorkflowDag() throws IOException {
        if (this.cfg.workloadYamlPath == null) {
            return ((WorkflowDagGenerator) ReflectionUtils.loadClass(this.cfg.workloadDagGenerator)).build();
        }
        return DagUtils.convertYamlPathToDag(
            FSUtils.getFs(this.cfg.workloadYamlPath, this.jsc.hadoopConfiguration(), true),
            this.cfg.workloadYamlPath);
    }

    /**
     * Builds the DAG, prepares the writer context, optionally starts local services and schedules the
     * DAG. When {@code saferSchemaEvolution} is set, a {@link SaferSchemaDagScheduler} is used and
     * seeded with the schema version read from a past commit. The Spark session and context are
     * stopped afterwards in all cases.
     *
     * @throws HoodieException wrapping any failure raised while building or running the DAG
     */
    public void runTestSuite() {
        try {
            WorkflowDag workflowDag = this.createWorkflowDag();
            log.info("Workflow Dag => {}", DagUtils.convertDagToYaml(workflowDag));
            long startTime = System.currentTimeMillis();
            WriterContext writerContext = new WriterContext(this.jsc, this.props, this.cfg, this.keyGenerator, this.sparkSession);
            writerContext.initContext(this.jsc);
            this.startOtherServicesIfNeeded(writerContext);
            if (this.cfg.saferSchemaEvolution) {
                int version = this.getSchemaVersionFromCommit(numRollbacks(workflowDag) - 1);
                new SaferSchemaDagScheduler(workflowDag, writerContext, this.jsc, version).schedule();
            } else {
                new DagScheduler(workflowDag, writerContext, this.jsc).schedule();
            }
            log.info("Finished scheduling all tasks, Time taken {}", System.currentTimeMillis() - startTime);
        } catch (Exception e) {
            log.error("Failed to run Test Suite ", e);
            throw new HoodieException("Failed to run Test Suite ", e);
        } finally {
            this.stopQuietly();
        }
    }

    /**
     * Returns the rollback count configured on the DAG's first node when that node is a
     * {@link RollbackNode}, otherwise {@link #DEFAULT_NUM_ROLLBACKS}.
     */
    private static int numRollbacks(WorkflowDag workflowDag) {
        List<?> nodes = workflowDag.getNodeList();
        if (!nodes.isEmpty() && nodes.get(0) instanceof RollbackNode) {
            return ((RollbackNode) nodes.get(0)).getConfig().getNumRollbacks();
        }
        return DEFAULT_NUM_ROLLBACKS;
    }

    /**
     * Stops the Spark session and context, logging (not propagating) failures. Each is stopped in its
     * own try block so a failure stopping the session does not prevent {@code jsc.stop()} from running.
     */
    private void stopQuietly() {
        try {
            this.sparkSession.stop();
        } catch (Exception e) {
            log.error("Unable to stop spark session", e);
        }
        try {
            this.jsc.stop();
        } catch (Exception e) {
            log.error("Unable to stop spark context", e);
        }
    }

    /**
     * Starts a local Hive metastore (and syncs to it) and/or a local ZooKeeper when the corresponding
     * flags are set.
     *
     * <p>NOTE(review): the services started here are not stopped by this class; confirm whether the
     * providers clean up on their own.
     */
    private void startOtherServicesIfNeeded(WriterContext writerContext) throws Exception {
        if (this.cfg.startHiveMetastore) {
            HiveServiceProvider hiveServiceProvider = new HiveServiceProvider(DeltaConfig.Config.newBuilder().withHiveLocal(true).build());
            hiveServiceProvider.startLocalHiveServiceIfNeeded(writerContext.getHoodieTestSuiteWriter().getConfiguration());
            hiveServiceProvider.syncToLocalHiveIfNeeded(writerContext.getHoodieTestSuiteWriter());
        }
        if (this.cfg.startZookeeper) {
            // NOTE(review): ZooKeeper provider is built with the same hive-local config as above; verify intent.
            ZookeeperServiceProvider zookeeperServiceProvider = new ZookeeperServiceProvider(
                DeltaConfig.Config.newBuilder().withHiveLocal(true).build(),
                writerContext.getHoodieTestSuiteWriter().getConfiguration());
            zookeeperServiceProvider.startLocalZookeeperIfNeeded();
        }
    }

    /** Command-line options for {@link HoodieTestSuiteJob}, in addition to those of {@link HoodieDeltaStreamer.Config}. */
    public static class HoodieTestSuiteConfig
    extends HoodieDeltaStreamer.Config {
        @Parameter(names={"--input-base-path"}, description="base path for input data(Will be created if did not exist first time around. If exists, more data will be added to that path)", required=true)
        public String inputBasePath;
        @Parameter(names={"--workload-generator-classname"}, description="WorkflowDag of operations to generate the workload")
        public String workloadDagGenerator = WorkflowDagGenerator.class.getName();
        @Parameter(names={"--workload-yaml-path"}, description="Workflow Dag yaml path to generate the workload")
        public String workloadYamlPath;
        @Parameter(names={"--delta-output-type"}, description="One of org.apache.hudi.integ.testsuite.writer.DeltaOutputMode to write avro data.")
        public String outputTypeName = DeltaOutputMode.DFS.name();
        @Parameter(names={"--delta-input-format"}, description="One of org.apache.hudi.integ.testsuite.reader.DeltaInputType to read avro data.")
        public String inputFormatName = DeltaInputType.AVRO.name();
        // 120 MB. NOTE: the option is required=true, so JCommander demands it and this default is never used.
        @Parameter(names={"--input-file-size"}, description="The min/max size of the input files to generate", required=true)
        public Long limitFileSize = 120L * 1024 * 1024;
        @Parameter(names={"--input-parallelism"}, description="Parallelism to use when generating input files", required=false)
        public Integer inputParallelism = 0;
        @Parameter(names={"--delete-old-input"}, description="Delete older input files once they have been ingested", required=false)
        public Boolean deleteOldInput = false;
        @Parameter(names={"--use-deltastreamer"}, description="Choose whether to use HoodieDeltaStreamer to perform ingestion. If set to false, HoodieWriteClient will be used")
        public Boolean useDeltaStreamer = false;
        @Parameter(names={"--clean-input"}, description="Clean the input folders and delete all files within it before starting the Job")
        public Boolean cleanInput = false;
        @Parameter(names={"--clean-output"}, description="Clean the output folders and delete all files within it before starting the Job")
        public Boolean cleanOutput = false;
        @Parameter(names={"--saferSchemaEvolution"}, description="Optimize the DAG for safer schema evolution.(If not provided, assumed to be false.)", required=false)
        public Boolean saferSchemaEvolution = false;
        @Parameter(names={"--start-zookeeper"}, description="Start Zookeeper instance to use for optimistic lock ")
        public Boolean startZookeeper = false;
        @Parameter(names={"--start-hive-metastore"}, description="Start Hive Metastore to use for optimistic lock ")
        public Boolean startHiveMetastore = false;
    }
}

