/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.integ.testsuite;

import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter;
import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.compact.CompactHelpers;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.RDD;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link HoodieTestSuiteWriter} that performs its table operations inline.
 *
 * <p>Every operation first checks {@code cfg.useDeltaStreamer}. When the flag is set the call is
 * forwarded to {@code deltaStreamerWrapper}; otherwise it is executed through {@code writeClient}
 * (and {@code hoodieReadClient} for pending compactions).
 */
public class HoodieInlineTestSuiteWriter
extends HoodieTestSuiteWriter {
    private static final Logger log = LoggerFactory.getLogger(HoodieInlineTestSuiteWriter.class);
    /** Extra-metadata key under which the path of the generated data is stored. */
    private static final String GENERATED_DATA_PATH = "generated.data.path";
    /** Extra-metadata key under which the last source checkpoint is stored. */
    private static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";

    /**
     * Creates the writer by delegating all arguments to the parent constructor.
     *
     * @param jsc    Spark context handed to the parent
     * @param props  properties handed to the parent
     * @param cfg    test-suite configuration handed to the parent
     * @param schema schema string handed to the parent
     * @throws Exception if the parent constructor fails
     */
    public HoodieInlineTestSuiteWriter(JavaSparkContext jsc, Properties props, HoodieTestSuiteJob.HoodieTestSuiteConfig cfg, String schema) throws Exception {
        super(jsc, props, cfg, schema);
    }

    /** Intentionally a no-op: this writer releases nothing here. */
    @Override
    public void shutdownResources() {
    }

    /**
     * Fetches the next source batch, records its checkpoint in {@code lastCheckpoint} and converts
     * each record into its Avro insert value.
     *
     * @return the batch as an RDD of {@link GenericRecord}
     * @throws Exception if fetching the source fails
     */
    @Override
    public RDD<GenericRecord> getNextBatch() throws Exception {
        JavaRDD<HoodieRecord> inputRDD = this.fetchRecordsAndTrackCheckpoint();
        // NOTE(review): the schema is parsed once per record and the lambda captures `this`;
        // kept as-is so that what gets serialized to executors does not change.
        return inputRDD.map(r -> (GenericRecord) ((HoodieAvroRecord) r).getData()
                .getInsertValue(new Schema.Parser().parse(this.schema)).get()).rdd();
    }

    /**
     * Fetches the next batch from the delta streamer wrapper.
     *
     * @return schema provider paired with (checkpoint, records)
     * @throws Exception if the wrapper fails to fetch
     */
    @Override
    public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchSource() throws Exception {
        return this.deltaStreamerWrapper.fetchSource();
    }

    /**
     * Produces the instant time for a new commit.
     *
     * @return a freshly generated instant time when the delta streamer is in use, otherwise the
     *         instant returned by {@code writeClient.startCommit()}
     */
    @Override
    public Option<String> startCommit() {
        if (this.cfg.useDeltaStreamer) {
            return Option.of(HoodieActiveTimeline.createNewInstantTime());
        }
        return Option.of(this.writeClient.startCommit());
    }

    /**
     * Upserts the next source batch.
     *
     * @param instantTime instant to write under; only read on the write-client path
     * @return write statuses of the operation
     * @throws Exception if fetching or writing fails
     */
    @Override
    public JavaRDD<WriteStatus> upsert(Option<String> instantTime) throws Exception {
        if (this.cfg.useDeltaStreamer) {
            return this.deltaStreamerWrapper.upsert(WriteOperationType.UPSERT);
        }
        JavaRDD<HoodieRecord> records = this.fetchRecordsAndTrackCheckpoint();
        return this.writeClient.upsert(records, instantTime.get());
    }

    /**
     * Inserts the next source batch.
     *
     * @param instantTime instant to write under; only read on the write-client path
     * @return write statuses of the operation
     * @throws Exception if fetching or writing fails
     */
    @Override
    public JavaRDD<WriteStatus> insert(Option<String> instantTime) throws Exception {
        if (this.cfg.useDeltaStreamer) {
            return this.deltaStreamerWrapper.insert();
        }
        JavaRDD<HoodieRecord> records = this.fetchRecordsAndTrackCheckpoint();
        return this.writeClient.insert(records, instantTime.get());
    }

    /**
     * Runs an insert-overwrite with the next source batch.
     *
     * @param instantTime instant to write under; only read on the write-client path
     * @return write statuses of the operation
     * @throws Exception if fetching or writing fails
     */
    @Override
    public JavaRDD<WriteStatus> insertOverwrite(Option<String> instantTime) throws Exception {
        if (this.cfg.useDeltaStreamer) {
            return this.deltaStreamerWrapper.insertOverwrite();
        }
        JavaRDD<HoodieRecord> records = this.fetchRecordsAndTrackCheckpoint();
        return this.writeClient.insertOverwrite(records, instantTime.get()).getWriteStatuses();
    }

    /**
     * Runs an insert-overwrite-table with the next source batch.
     *
     * @param instantTime instant to write under; only read on the write-client path
     * @return write statuses of the operation
     * @throws Exception if fetching or writing fails
     */
    @Override
    public JavaRDD<WriteStatus> insertOverwriteTable(Option<String> instantTime) throws Exception {
        if (this.cfg.useDeltaStreamer) {
            return this.deltaStreamerWrapper.insertOverwriteTable();
        }
        JavaRDD<HoodieRecord> records = this.fetchRecordsAndTrackCheckpoint();
        return this.writeClient.insertOverwriteTable(records, instantTime.get()).getWriteStatuses();
    }

    /**
     * Bulk-inserts the next source batch.
     *
     * @param instantTime instant to write under; only read on the write-client path
     * @return write statuses of the operation
     * @throws Exception if fetching or writing fails
     */
    @Override
    public JavaRDD<WriteStatus> bulkInsert(Option<String> instantTime) throws Exception {
        if (this.cfg.useDeltaStreamer) {
            return this.deltaStreamerWrapper.bulkInsert();
        }
        JavaRDD<HoodieRecord> records = this.fetchRecordsAndTrackCheckpoint();
        return this.writeClient.bulkInsert(records, instantTime.get());
    }

    /**
     * Runs a compaction.
     *
     * <p>On the write-client path, when {@code instantTime} is empty the first pending compaction
     * reported by {@code hoodieReadClient} (if any) is used instead.
     *
     * @param instantTime compaction instant to execute, possibly empty
     * @return write statuses of the compaction, or {@code null} when no instant was supplied and
     *         no pending compaction exists
     * @throws Exception if the compaction fails
     */
    @Override
    public JavaRDD<WriteStatus> compact(Option<String> instantTime) throws Exception {
        if (this.cfg.useDeltaStreamer) {
            return this.deltaStreamerWrapper.compact();
        }
        Option<String> compactionInstant = instantTime;
        if (!compactionInstant.isPresent()) {
            Option<? extends Pair<String, ?>> firstPending =
                    Option.fromJavaOptional(this.hoodieReadClient.getPendingCompactions().stream().findFirst());
            if (firstPending.isPresent()) {
                compactionInstant = Option.of(firstPending.get().getLeft());
            }
        }
        if (!compactionInstant.isPresent()) {
            // Nothing to compact; null is kept for compatibility with existing callers.
            return null;
        }
        HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = this.writeClient.compact(compactionInstant.get());
        return compactionMetadata.getWriteStatuses();
    }

    /**
     * Schedules clustering and, if an instant was scheduled, executes it immediately.
     *
     * @throws IllegalArgumentException when the delta streamer is in use
     */
    @Override
    public void inlineClustering() {
        if (this.cfg.useDeltaStreamer) {
            throw new IllegalArgumentException("Clustering cannot be triggered with deltastreamer");
        }
        Option<String> clusteringInstantOpt = this.writeClient.scheduleClustering(Option.empty());
        clusteringInstantOpt.ifPresent(clusteringInstant -> {
            log.warn("Clustering instant :: {}", clusteringInstant);
            this.writeClient.cluster(clusteringInstant, true);
        });
    }

    /**
     * Schedules a compaction.
     *
     * @param previousCommitExtraMetadata extra metadata forwarded to the write client; not used on
     *                                    the delta streamer path
     * @return the result of {@code writeClient.scheduleCompaction}, or an empty option when the
     *         delta streamer is in use
     * @throws Exception if scheduling fails
     */
    @Override
    public Option<String> scheduleCompaction(Option<Map<String, String>> previousCommitExtraMetadata) throws Exception {
        if (this.cfg.useDeltaStreamer) {
            this.deltaStreamerWrapper.scheduleCompact();
            return Option.empty();
        }
        return this.writeClient.scheduleCompaction(previousCommitExtraMetadata);
    }

    /**
     * Commits the given write statuses through the write client, attaching the last checkpoint
     * (and possibly the generated data path) as extra metadata. Does nothing when the delta
     * streamer is in use.
     *
     * @param records            write statuses to commit
     * @param generatedDataStats stats of the generated data, may be {@code null}
     * @param instantTime        instant being committed
     */
    @Override
    public void commit(JavaRDD<WriteStatus> records, JavaRDD<DeltaWriteStats> generatedDataStats, Option<String> instantTime) {
        if (this.cfg.useDeltaStreamer) {
            return;
        }
        Map<String, String> extraMetadata = this.buildExtraMetadata(generatedDataStats);
        this.writeClient.commit(instantTime.get(), records, Option.of(extraMetadata));
    }

    /**
     * Commits a compaction through the write client, attaching the last checkpoint (and possibly
     * the generated data path) as extra metadata. Does nothing when the delta streamer is in use.
     *
     * @param records            write statuses produced by the compaction
     * @param generatedDataStats stats of the generated data, may be {@code null}
     * @param instantTime        compaction instant being committed
     * @throws IOException declared by the overridden method
     */
    @Override
    public void commitCompaction(JavaRDD<WriteStatus> records, JavaRDD<DeltaWriteStats> generatedDataStats, Option<String> instantTime) throws IOException {
        if (this.cfg.useDeltaStreamer) {
            return;
        }
        Map<String, String> extraMetadata = this.buildExtraMetadata(generatedDataStats);
        HoodieSparkTable table = HoodieSparkTable.create(this.writeClient.getConfig(), this.writeClient.getEngineContext());
        HoodieCommitMetadata metadata = CompactHelpers.getInstance().createCompactionMetadata(
                table, instantTime.get(), HoodieJavaRDD.of(records), this.writeClient.getConfig().getSchema());
        this.writeClient.commitCompaction(instantTime.get(), metadata, Option.of(extraMetadata));
    }

    /** Fetches the next source batch, stores its checkpoint in {@code lastCheckpoint} and returns its records. */
    private JavaRDD<HoodieRecord> fetchRecordsAndTrackCheckpoint() throws Exception {
        Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = this.fetchSource();
        Pair<String, JavaRDD<HoodieRecord>> checkpointAndRecords = nextBatch.getValue();
        this.lastCheckpoint = Option.of(checkpointAndRecords.getLeft());
        return checkpointAndRecords.getRight();
    }

    /** Builds the commit extra metadata: the last checkpoint plus, when available, the first generated file path. */
    private Map<String, String> buildExtraMetadata(JavaRDD<DeltaWriteStats> generatedDataStats) {
        Map<String, String> extraMetadata = new HashMap<>();
        extraMetadata.put(CHECKPOINT_KEY, this.lastCheckpoint.get());
        // NOTE(review): the strict "> 1" means a batch with exactly one stats entry does not record
        // its path. Preserved as-is; confirm whether "> 0" was intended.
        if (generatedDataStats != null && generatedDataStats.count() > 1L) {
            extraMetadata.put(GENERATED_DATA_PATH, generatedDataStats.map(DeltaWriteStats::getFilePath).collect().get(0));
        }
        return extraMetadata;
    }
}

