/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.integ.testsuite.dag;

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.integ.testsuite.HoodieContinousTestSuiteWriter;
import org.apache.hudi.integ.testsuite.HoodieInlineTestSuiteWriter;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter;
import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig;
import org.apache.hudi.integ.testsuite.generator.DeltaGenerator;
import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class WriterContext {
    protected static Logger log = LogManager.getLogger(WriterContext.class);
    private final HoodieTestSuiteJob.HoodieTestSuiteConfig cfg;
    private TypedProperties props;
    private HoodieTestSuiteWriter hoodieTestSuiteWriter;
    private DeltaGenerator deltaGenerator;
    private transient SchemaProvider schemaProvider;
    private BuiltinKeyGenerator keyGenerator;
    private transient SparkSession sparkSession;
    private transient JavaSparkContext jsc;
    private ExecutorService executorService;

    public WriterContext(JavaSparkContext jsc, TypedProperties props, HoodieTestSuiteJob.HoodieTestSuiteConfig cfg, BuiltinKeyGenerator keyGenerator, SparkSession sparkSession) {
        this.cfg = cfg;
        this.props = props;
        this.keyGenerator = keyGenerator;
        this.sparkSession = sparkSession;
        this.jsc = jsc;
    }

    public void initContext(JavaSparkContext jsc) throws HoodieException {
        try {
            this.schemaProvider = UtilHelpers.createSchemaProvider((String)this.cfg.schemaProviderClassName, (TypedProperties)this.props, (JavaSparkContext)jsc);
            String schemaStr = this.schemaProvider.getSourceSchema().toString();
            this.hoodieTestSuiteWriter = this.cfg.testContinousMode != false && this.cfg.useDeltaStreamer != false ? new HoodieContinousTestSuiteWriter(jsc, (Properties)this.props, this.cfg, schemaStr) : new HoodieInlineTestSuiteWriter(jsc, (Properties)this.props, this.cfg, schemaStr);
            int inputParallelism = this.cfg.inputParallelism > 0 ? this.cfg.inputParallelism : jsc.defaultParallelism();
            this.deltaGenerator = new DeltaGenerator(new DFSDeltaConfig(DeltaOutputMode.valueOf(this.cfg.outputTypeName), DeltaInputType.valueOf(this.cfg.inputFormatName), new SerializableConfiguration(jsc.hadoopConfiguration()), this.cfg.inputBasePath, this.cfg.targetBasePath, schemaStr, this.cfg.limitFileSize, inputParallelism, this.cfg.deleteOldInput, this.cfg.useHudiToGenerateUpdates), jsc, this.sparkSession, schemaStr, this.keyGenerator);
            log.info((Object)String.format("Initialized writerContext with: %s", schemaStr));
            if (this.cfg.testContinousMode.booleanValue()) {
                this.executorService = Executors.newFixedThreadPool(1);
                this.executorService.execute(new TestSuiteWriterRunnable(this.hoodieTestSuiteWriter));
            }
        }
        catch (Exception e) {
            throw new HoodieException("Failed to reinitialize writerContext", (Throwable)e);
        }
    }

    public void reinitContext(Map<String, Object> newConfig) throws HoodieException {
        for (Map.Entry<String, Object> e : newConfig.entrySet()) {
            if (!this.props.containsKey((Object)e.getKey())) continue;
            this.props.setProperty(e.getKey(), e.getValue().toString());
        }
        this.initContext(this.jsc);
    }

    public HoodieTestSuiteWriter getHoodieTestSuiteWriter() {
        return this.hoodieTestSuiteWriter;
    }

    public DeltaGenerator getDeltaGenerator() {
        return this.deltaGenerator;
    }

    public HoodieTestSuiteJob.HoodieTestSuiteConfig getCfg() {
        return this.cfg;
    }

    public TypedProperties getProps() {
        return this.props;
    }

    public String toString() {
        return this.hoodieTestSuiteWriter.toString() + "\n" + this.deltaGenerator.toString() + "\n";
    }

    public SparkSession getSparkSession() {
        return this.sparkSession;
    }

    public void shutdownResources() {
        this.hoodieTestSuiteWriter.shutdownResources();
        if (this.executorService != null) {
            this.executorService.shutdownNow();
        }
    }

    class TestSuiteWriterRunnable
    implements Runnable {
        private HoodieTestSuiteWriter hoodieTestSuiteWriter;

        TestSuiteWriterRunnable(HoodieTestSuiteWriter hoodieTestSuiteWriter) {
            this.hoodieTestSuiteWriter = hoodieTestSuiteWriter;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(20000L);
                log.info((Object)"Starting continuous sync with deltastreamer ");
                this.hoodieTestSuiteWriter.getDeltaStreamerWrapper().sync();
                log.info((Object)"Completed continuous sync with deltastreamer ");
            }
            catch (Exception e) {
                log.error((Object)("Deltastreamer failed in continuous mode " + e.getMessage()));
                throw new HoodieException("Shutting down deltastreamer in continuous mode failed ", (Throwable)e);
            }
        }
    }
}

