/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.integ.testsuite;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class HoodieMultiWriterTestSuiteJob {
    private static final Logger LOG = LogManager.getLogger(HoodieMultiWriterTestSuiteJob.class);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void main(String[] args) throws Exception {
        HoodieMultiWriterTestSuiteConfig cfg = new HoodieMultiWriterTestSuiteConfig();
        JCommander cmd = new JCommander((Object)cfg, args);
        if (cfg.help.booleanValue() || args.length == 0) {
            cmd.usage();
            System.exit(1);
        }
        JavaSparkContext jssc = UtilHelpers.buildSparkContext((String)("multi-writer-test-run-" + cfg.outputTypeName + "-" + cfg.inputFormatName), (String)cfg.sparkMaster);
        String[] inputPaths = cfg.inputBasePaths.split(",");
        String[] yamls = cfg.workloadYamlPaths.split(",");
        String[] propsFiles = cfg.propsFilePaths.split(",");
        if (inputPaths.length != yamls.length || yamls.length != propsFiles.length) {
            throw new HoodieException("Input paths, property file and yaml file counts does not match ");
        }
        ExecutorService executor = Executors.newFixedThreadPool(inputPaths.length);
        Random random = new Random();
        ArrayList<HoodieMultiWriterTestSuiteConfig> testSuiteConfigList = new ArrayList<HoodieMultiWriterTestSuiteConfig>();
        int jobIndex = 0;
        for (String inputPath : inputPaths) {
            HoodieMultiWriterTestSuiteConfig testSuiteConfig = new HoodieMultiWriterTestSuiteConfig();
            HoodieMultiWriterTestSuiteJob.deepCopyConfigs(cfg, testSuiteConfig);
            testSuiteConfig.inputBasePath = inputPath;
            testSuiteConfig.workloadYamlPath = yamls[jobIndex];
            testSuiteConfig.propsFilePath = propsFiles[jobIndex];
            testSuiteConfigList.add(testSuiteConfig);
            ++jobIndex;
        }
        AtomicBoolean jobFailed = new AtomicBoolean(false);
        AtomicInteger counter = new AtomicInteger(0);
        ArrayList<Long> waitTimes = new ArrayList<Long>();
        for (int i = 0; i < jobIndex; ++i) {
            if (i == 0) {
                waitTimes.add(0L);
                continue;
            }
            waitTimes.add(60000L + (long)random.nextInt(10000));
        }
        ArrayList<CompletableFuture<Boolean>> completableFutureList = new ArrayList<CompletableFuture<Boolean>>();
        testSuiteConfigList.forEach(hoodieTestSuiteConfig -> {
            try {
                Thread.sleep((Long)waitTimes.get(counter.get()));
                LOG.info((Object)("Starting job " + hoodieTestSuiteConfig.toString()));
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            completableFutureList.add(CompletableFuture.supplyAsync(() -> {
                boolean toReturn = true;
                try {
                    new HoodieTestSuiteJob((HoodieTestSuiteJob.HoodieTestSuiteConfig)((Object)hoodieTestSuiteConfig), jssc, false).runTestSuite();
                    LOG.info((Object)"Job completed successfully");
                }
                catch (Exception e) {
                    if (!jobFailed.getAndSet(true)) {
                        LOG.error((Object)("Exception thrown " + e.getMessage() + ", cause : " + e.getCause()));
                        throw new RuntimeException("HoodieTestSuiteJob Failed " + e.getCause() + ", and msg " + e.getMessage(), e);
                    }
                    LOG.info((Object)"Already a job failed. so, not throwing any exception ");
                }
                return toReturn;
            }, executor));
            counter.getAndIncrement();
        });
        LOG.info((Object)"Going to await until all jobs complete");
        try {
            CompletableFuture completableFuture = HoodieMultiWriterTestSuiteJob.allOfTerminateOnFailure(completableFutureList);
            completableFuture.get();
        }
        finally {
            executor.shutdownNow();
            if (jssc != null) {
                LOG.info((Object)"Completed and shutting down spark context ");
                LOG.info((Object)"Shutting down spark session and JavaSparkContext");
                SparkSession.builder().config(jssc.getConf()).enableHiveSupport().getOrCreate().stop();
                jssc.close();
            }
        }
    }

    public static CompletableFuture allOfTerminateOnFailure(List<CompletableFuture<Boolean>> futures) {
        CompletableFuture failure = new CompletableFuture();
        AtomicBoolean jobFailed = new AtomicBoolean(false);
        for (CompletableFuture<Boolean> f : futures) {
            f.exceptionally(ex -> {
                if (!jobFailed.getAndSet(true)) {
                    System.out.println("One of the job failed. Cancelling all other futures. " + ex.getCause() + ", " + ex.getMessage());
                    futures.forEach(future -> future.cancel(true));
                }
                return null;
            });
        }
        return CompletableFuture.anyOf(failure, CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])));
    }

    static void deepCopyConfigs(HoodieMultiWriterTestSuiteConfig globalConfig, HoodieMultiWriterTestSuiteConfig tableConfig) {
        tableConfig.enableHiveSync = globalConfig.enableHiveSync;
        tableConfig.enableMetaSync = globalConfig.enableMetaSync;
        tableConfig.schemaProviderClassName = globalConfig.schemaProviderClassName;
        tableConfig.sourceOrderingField = globalConfig.sourceOrderingField;
        tableConfig.sourceClassName = globalConfig.sourceClassName;
        tableConfig.tableType = globalConfig.tableType;
        tableConfig.targetTableName = globalConfig.targetTableName;
        tableConfig.operation = globalConfig.operation;
        tableConfig.sourceLimit = globalConfig.sourceLimit;
        tableConfig.checkpoint = globalConfig.checkpoint;
        tableConfig.continuousMode = globalConfig.continuousMode;
        tableConfig.filterDupes = globalConfig.filterDupes;
        tableConfig.payloadClassName = globalConfig.payloadClassName;
        tableConfig.forceDisableCompaction = globalConfig.forceDisableCompaction;
        tableConfig.maxPendingCompactions = globalConfig.maxPendingCompactions;
        tableConfig.maxPendingClustering = globalConfig.maxPendingClustering;
        tableConfig.minSyncIntervalSeconds = globalConfig.minSyncIntervalSeconds;
        tableConfig.transformerClassNames = globalConfig.transformerClassNames;
        tableConfig.commitOnErrors = globalConfig.commitOnErrors;
        tableConfig.compactSchedulingMinShare = globalConfig.compactSchedulingMinShare;
        tableConfig.compactSchedulingWeight = globalConfig.compactSchedulingWeight;
        tableConfig.deltaSyncSchedulingMinShare = globalConfig.deltaSyncSchedulingMinShare;
        tableConfig.deltaSyncSchedulingWeight = globalConfig.deltaSyncSchedulingWeight;
        tableConfig.sparkMaster = globalConfig.sparkMaster;
        tableConfig.workloadDagGenerator = globalConfig.workloadDagGenerator;
        tableConfig.outputTypeName = globalConfig.outputTypeName;
        tableConfig.inputFormatName = globalConfig.inputFormatName;
        tableConfig.inputParallelism = globalConfig.inputParallelism;
        tableConfig.useDeltaStreamer = globalConfig.useDeltaStreamer;
        tableConfig.cleanInput = globalConfig.cleanInput;
        tableConfig.cleanOutput = globalConfig.cleanOutput;
        tableConfig.targetBasePath = globalConfig.targetBasePath;
    }

    public static class HoodieMultiWriterTestSuiteConfig
    extends HoodieTestSuiteJob.HoodieTestSuiteConfig {
        @Parameter(names={"--input-base-paths"}, description="base paths for input data(Will be created if did not exist first time around. If exists, more data will be added to that path)", required=true)
        public String inputBasePaths;
        @Parameter(names={"--workload-yaml-paths"}, description="Workflow Dag yaml path to generate the workload")
        public String workloadYamlPaths;
        @Parameter(names={"--props-paths"}, description="Workflow Dag yaml path to generate the workload")
        public String propsFilePaths;
    }
}

