/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.integ.testsuite;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.integ.testsuite.HoodieDeltaStreamerWrapper;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob;
import org.apache.hudi.integ.testsuite.dag.nodes.CleanNode;
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
import org.apache.hudi.integ.testsuite.dag.nodes.RollbackNode;
import org.apache.hudi.integ.testsuite.dag.nodes.ScheduleCompactNode;
import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.RDD;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HoodieTestSuiteWriter
implements Serializable {
    private static Logger log = LoggerFactory.getLogger(HoodieTestSuiteWriter.class);
    private HoodieDeltaStreamerWrapper deltaStreamerWrapper;
    private HoodieWriteConfig writeConfig;
    private SparkRDDWriteClient writeClient;
    protected HoodieTestSuiteJob.HoodieTestSuiteConfig cfg;
    private Option<String> lastCheckpoint;
    private HoodieReadClient hoodieReadClient;
    private Properties props;
    private String schema;
    private transient Configuration configuration;
    private transient JavaSparkContext sparkContext;
    private static Set<String> VALID_DAG_NODES_TO_ALLOW_WRITE_CLIENT_IN_DELTASTREAMER_MODE = new HashSet<String>(Arrays.asList(RollbackNode.class.getName(), CleanNode.class.getName(), ScheduleCompactNode.class.getName()));
    private static final String GENERATED_DATA_PATH = "generated.data.path";

    public HoodieTestSuiteWriter(JavaSparkContext jsc, Properties props, HoodieTestSuiteJob.HoodieTestSuiteConfig cfg, String schema) throws Exception {
        HoodieSparkEngineContext context = new HoodieSparkEngineContext(jsc);
        this.deltaStreamerWrapper = new HoodieDeltaStreamerWrapper(cfg, jsc);
        this.hoodieReadClient = new HoodieReadClient(context, cfg.targetBasePath);
        this.writeConfig = this.getHoodieClientConfig(cfg, props, schema);
        if (!cfg.useDeltaStreamer.booleanValue()) {
            this.writeClient = new SparkRDDWriteClient((HoodieEngineContext)context, this.writeConfig);
        }
        this.cfg = cfg;
        this.configuration = jsc.hadoopConfiguration();
        this.sparkContext = jsc;
        this.props = props;
        this.schema = schema;
    }

    public HoodieWriteConfig getWriteConfig() {
        return this.writeConfig;
    }

    private HoodieWriteConfig getHoodieClientConfig(HoodieTestSuiteJob.HoodieTestSuiteConfig cfg, Properties props, String schema) {
        HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder().combineInput(true, true).withPath(cfg.targetBasePath).withAutoCommit(false).withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName).build()).withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(cfg.sourceOrderingField).build()).forTable(cfg.targetTableName).withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).withProps((Map)props);
        builder = builder.withSchema(schema);
        return builder.build();
    }

    private boolean allowWriteClientAccess(DagNode dagNode) {
        return VALID_DAG_NODES_TO_ALLOW_WRITE_CLIENT_IN_DELTASTREAMER_MODE.contains(dagNode.getClass().getName());
    }

    public RDD<GenericRecord> getNextBatch() throws Exception {
        Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = this.fetchSource();
        this.lastCheckpoint = Option.of((Object)((Pair)nextBatch.getValue()).getLeft());
        JavaRDD inputRDD = (JavaRDD)((Pair)nextBatch.getRight()).getRight();
        return inputRDD.map((Function & Serializable)r -> (GenericRecord)r.getData().getInsertValue(new Schema.Parser().parse(this.schema)).get()).rdd();
    }

    public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchSource() throws Exception {
        return this.deltaStreamerWrapper.fetchSource();
    }

    public Option<String> startCommit() {
        if (this.cfg.useDeltaStreamer.booleanValue()) {
            return Option.of((Object)HoodieActiveTimeline.createNewInstantTime());
        }
        return Option.of((Object)this.writeClient.startCommit());
    }

    public JavaRDD<WriteStatus> upsert(Option<String> instantTime) throws Exception {
        if (this.cfg.useDeltaStreamer.booleanValue()) {
            return this.deltaStreamerWrapper.upsert(WriteOperationType.UPSERT);
        }
        Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = this.fetchSource();
        this.lastCheckpoint = Option.of((Object)((Pair)nextBatch.getValue()).getLeft());
        return this.writeClient.upsert((JavaRDD)((Pair)nextBatch.getRight()).getRight(), (String)instantTime.get());
    }

    public JavaRDD<WriteStatus> insert(Option<String> instantTime) throws Exception {
        if (this.cfg.useDeltaStreamer.booleanValue()) {
            return this.deltaStreamerWrapper.insert();
        }
        Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = this.fetchSource();
        this.lastCheckpoint = Option.of((Object)((Pair)nextBatch.getValue()).getLeft());
        return this.writeClient.insert((JavaRDD)((Pair)nextBatch.getRight()).getRight(), (String)instantTime.get());
    }

    public JavaRDD<WriteStatus> bulkInsert(Option<String> instantTime) throws Exception {
        if (this.cfg.useDeltaStreamer.booleanValue()) {
            return this.deltaStreamerWrapper.bulkInsert();
        }
        Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = this.fetchSource();
        this.lastCheckpoint = Option.of((Object)((Pair)nextBatch.getValue()).getLeft());
        return this.writeClient.bulkInsert((JavaRDD)((Pair)nextBatch.getRight()).getRight(), (String)instantTime.get());
    }

    public JavaRDD<WriteStatus> compact(Option<String> instantTime) throws Exception {
        Option compactionPlanPair;
        if (this.cfg.useDeltaStreamer.booleanValue()) {
            return this.deltaStreamerWrapper.compact();
        }
        if (!instantTime.isPresent() && (compactionPlanPair = Option.fromJavaOptional(this.hoodieReadClient.getPendingCompactions().stream().findFirst())).isPresent()) {
            instantTime = Option.of((Object)((Pair)compactionPlanPair.get()).getLeft());
        }
        if (instantTime.isPresent()) {
            return (JavaRDD)this.writeClient.compact((String)instantTime.get());
        }
        return null;
    }

    public void inlineClustering() {
        if (this.cfg.useDeltaStreamer.booleanValue()) {
            throw new IllegalArgumentException("Clustering cannot be triggered with deltastreamer");
        }
        Option clusteringInstantOpt = this.writeClient.scheduleClustering(Option.empty());
        clusteringInstantOpt.ifPresent(clusteringInstant -> {
            log.warn("Clustering instant :: " + clusteringInstant);
            this.writeClient.cluster(clusteringInstant, true);
        });
    }

    public Option<String> scheduleCompaction(Option<Map<String, String>> previousCommitExtraMetadata) throws Exception {
        if (!this.cfg.useDeltaStreamer.booleanValue()) {
            this.deltaStreamerWrapper.scheduleCompact();
            return Option.empty();
        }
        return this.writeClient.scheduleCompaction(previousCommitExtraMetadata);
    }

    public void commit(JavaRDD<WriteStatus> records, JavaRDD<DeltaWriteStats> generatedDataStats, Option<String> instantTime) {
        if (!this.cfg.useDeltaStreamer.booleanValue()) {
            HashMap<String, Object> extraMetadata = new HashMap<String, Object>();
            extraMetadata.put("deltastreamer.checkpoint.key", this.lastCheckpoint.get());
            if (generatedDataStats != null) {
                extraMetadata.put(GENERATED_DATA_PATH, generatedDataStats.map((Function & Serializable)s -> s.getFilePath()).collect().get(0));
            }
            this.writeClient.commit((String)instantTime.get(), records, Option.of(extraMetadata));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public SparkRDDWriteClient getWriteClient(DagNode dagNode) throws IllegalAccessException {
        if (this.cfg.useDeltaStreamer & !this.allowWriteClientAccess(dagNode)) {
            throw new IllegalAccessException("cannot access write client when testing in deltastreamer mode");
        }
        HoodieTestSuiteWriter hoodieTestSuiteWriter = this;
        synchronized (hoodieTestSuiteWriter) {
            if (this.writeClient == null) {
                this.writeClient = new SparkRDDWriteClient((HoodieEngineContext)new HoodieSparkEngineContext(this.sparkContext), this.getHoodieClientConfig(this.cfg, this.props, this.schema));
            }
        }
        return this.writeClient;
    }

    public HoodieDeltaStreamerWrapper getDeltaStreamerWrapper() {
        return this.deltaStreamerWrapper;
    }

    public HoodieTestSuiteJob.HoodieTestSuiteConfig getCfg() {
        return this.cfg;
    }

    public Configuration getConfiguration() {
        return this.configuration;
    }

    public JavaSparkContext getSparkContext() {
        return this.sparkContext;
    }

    public Option<String> getLastCheckpoint() {
        return this.lastCheckpoint;
    }

    public Properties getProps() {
        return this.props;
    }

    public String getSchema() {
        return this.schema;
    }
}

