/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.deltastreamer;

import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.HoodieWriteClient;
import org.apache.hudi.KeyGenerator;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.DoubleFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.collection.JavaConversions;
import scala.collection.Seq;

public class DeltaSync
implements Serializable {
    protected static volatile Logger log = LogManager.getLogger(DeltaSync.class);
    public static String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
    public static String CHECKPOINT_RESET_KEY = "deltastreamer.checkpoint.reset_key";
    private final HoodieDeltaStreamer.Config cfg;
    private transient SourceFormatAdapter formatAdapter;
    private transient SchemaProvider schemaProvider;
    private transient Transformer transformer;
    private KeyGenerator keyGenerator;
    private transient FileSystem fs;
    private transient JavaSparkContext jssc;
    private transient SparkSession sparkSession;
    private transient HiveConf hiveConf;
    private final TypedProperties props;
    private transient java.util.function.Function<HoodieWriteClient, Boolean> onInitializingHoodieWriteClient;
    private transient Option<HoodieTimeline> commitTimelineOpt;
    private transient HoodieWriteClient writeClient;
    private final HoodieTableType tableType;

    public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, HoodieTableType tableType, TypedProperties props, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf, java.util.function.Function<HoodieWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
        this.cfg = cfg;
        this.jssc = jssc;
        this.sparkSession = sparkSession;
        this.fs = fs;
        this.tableType = tableType;
        this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
        this.props = props;
        log.info((Object)("Creating delta streamer with configs : " + props.toString()));
        this.schemaProvider = schemaProvider;
        this.refreshTimeline();
        this.transformer = UtilHelpers.createTransformer(cfg.transformerClassName);
        this.keyGenerator = DataSourceUtils.createKeyGenerator((TypedProperties)props);
        this.formatAdapter = new SourceFormatAdapter(UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider));
        this.hiveConf = hiveConf;
        if (cfg.filterDupes.booleanValue()) {
            cfg.operation = cfg.operation == HoodieDeltaStreamer.Operation.UPSERT ? HoodieDeltaStreamer.Operation.INSERT : cfg.operation;
        }
        this.setupWriteClient();
    }

    private void refreshTimeline() throws IOException {
        if (this.fs.exists(new Path(this.cfg.targetBasePath))) {
            HoodieTableMetaClient meta = new HoodieTableMetaClient(new Configuration(this.fs.getConf()), this.cfg.targetBasePath);
            this.commitTimelineOpt = Option.of((Object)meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
        } else {
            this.commitTimelineOpt = Option.empty();
            HoodieTableMetaClient.initTableType((Configuration)new Configuration(this.jssc.hadoopConfiguration()), (String)this.cfg.targetBasePath, (String)this.cfg.storageType, (String)this.cfg.targetTableName, (String)"archived");
        }
    }

    public Option<String> syncOnce() throws Exception {
        Option<String> scheduledCompaction = Option.empty();
        HoodieDeltaStreamerMetrics metrics = new HoodieDeltaStreamerMetrics(this.getHoodieClientConfig(this.schemaProvider));
        Timer.Context overallTimerContext = metrics.getOverallTimerContext();
        this.refreshTimeline();
        Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> srcRecordsWithCkpt = this.readFromSource(this.commitTimelineOpt);
        if (null != srcRecordsWithCkpt) {
            if (null == this.schemaProvider) {
                this.schemaProvider = (SchemaProvider)srcRecordsWithCkpt.getKey();
                this.setupWriteClient();
            }
            scheduledCompaction = this.writeToSink((JavaRDD<HoodieRecord>)((JavaRDD)((Pair)srcRecordsWithCkpt.getRight()).getRight()), (String)((Pair)srcRecordsWithCkpt.getRight()).getLeft(), metrics, overallTimerContext);
        }
        this.jssc.getPersistentRDDs().values().forEach(JavaRDD::unpersist);
        return scheduledCompaction;
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource(Option<HoodieTimeline> commitTimelineOpt) throws Exception {
        SchemaProvider schemaProvider;
        Option avroRDDOptional;
        String checkpointStr;
        InputBatch<Dataset<Row>> dataAndCheckpoint;
        Option resumeCheckpointStr = Option.empty();
        if (commitTimelineOpt.isPresent()) {
            Option lastCommit = ((HoodieTimeline)commitTimelineOpt.get()).lastInstant();
            if (lastCommit.isPresent()) {
                HoodieCommitMetadata commitMetadata = (HoodieCommitMetadata)HoodieCommitMetadata.fromBytes((byte[])((byte[])((HoodieTimeline)commitTimelineOpt.get()).getInstantDetails((HoodieInstant)lastCommit.get()).get()), HoodieCommitMetadata.class);
                if (this.cfg.checkpoint != null && !this.cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
                    resumeCheckpointStr = Option.of((Object)this.cfg.checkpoint);
                } else {
                    if (commitMetadata.getMetadata(CHECKPOINT_KEY) == null) throw new HoodieDeltaStreamerException("Unable to find previous checkpoint. Please double check if this table was indeed built via delta streamer ");
                    resumeCheckpointStr = Option.of((Object)commitMetadata.getMetadata(CHECKPOINT_KEY));
                }
            }
        } else {
            HoodieTableMetaClient.initTableType((Configuration)new Configuration(this.jssc.hadoopConfiguration()), (String)this.cfg.targetBasePath, (String)this.cfg.storageType, (String)this.cfg.targetTableName, (String)"archived");
        }
        if (!resumeCheckpointStr.isPresent() && this.cfg.checkpoint != null) {
            resumeCheckpointStr = Option.of((Object)this.cfg.checkpoint);
        }
        log.info((Object)("Checkpoint to resume from : " + resumeCheckpointStr));
        if (this.transformer != null) {
            dataAndCheckpoint = this.formatAdapter.fetchNewDataInRowFormat((Option<String>)resumeCheckpointStr, this.cfg.sourceLimit);
            Option transformed = dataAndCheckpoint.getBatch().map(data -> this.transformer.apply(this.jssc, this.sparkSession, (Dataset<Row>)data, this.props));
            checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
            avroRDDOptional = transformed.map(t -> AvroConversionUtils.createRdd((Dataset)t, (String)"hoodie_source", (String)"hoodie.source").toJavaRDD());
            schemaProvider = this.schemaProvider == null || this.schemaProvider.getTargetSchema() == null ? (SchemaProvider)transformed.map(r -> new RowBasedSchemaProvider(r.schema())).orElse((Object)dataAndCheckpoint.getSchemaProvider()) : this.schemaProvider;
        } else {
            dataAndCheckpoint = this.formatAdapter.fetchNewDataInAvroFormat((Option<String>)resumeCheckpointStr, this.cfg.sourceLimit);
            avroRDDOptional = dataAndCheckpoint.getBatch();
            checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
            schemaProvider = dataAndCheckpoint.getSchemaProvider();
        }
        if (Objects.equals(checkpointStr, resumeCheckpointStr.orElse(null))) {
            log.info((Object)("No new data, source checkpoint has not changed. Nothing to commit.Old checkpoint=(" + resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")"));
            return null;
        }
        if (!avroRDDOptional.isPresent() || ((JavaRDD)avroRDDOptional.get()).isEmpty()) {
            log.info((Object)"No new data, perform empty commit.");
            return Pair.of((Object)schemaProvider, (Object)Pair.of((Object)checkpointStr, (Object)this.jssc.emptyRDD()));
        }
        JavaRDD avroRDD = (JavaRDD)avroRDDOptional.get();
        JavaRDD records = avroRDD.map((Function & Serializable)gr -> {
            HoodieRecordPayload payload = DataSourceUtils.createPayload((String)this.cfg.payloadClassName, (GenericRecord)gr, (Comparable)((Comparable)DataSourceUtils.getNestedFieldVal((GenericRecord)gr, (String)this.cfg.sourceOrderingField)));
            return new HoodieRecord(this.keyGenerator.getKey(gr), payload);
        });
        return Pair.of((Object)schemaProvider, (Object)Pair.of((Object)checkpointStr, (Object)records));
    }

    /*
     * Enabled aggressive block sorting
     */
    private Option<String> writeToSink(JavaRDD<HoodieRecord> records, String checkpointStr, HoodieDeltaStreamerMetrics metrics, Timer.Context overallTimerContext) throws Exception {
        boolean success;
        JavaRDD writeStatusRDD;
        Option scheduledCompactionInstant = Option.empty();
        if (this.cfg.filterDupes.booleanValue()) {
            this.cfg.operation = this.cfg.operation == HoodieDeltaStreamer.Operation.UPSERT ? HoodieDeltaStreamer.Operation.INSERT : this.cfg.operation;
            records = DataSourceUtils.dropDuplicates((JavaSparkContext)this.jssc, (JavaRDD)records, (HoodieWriteConfig)this.writeClient.getConfig(), (Option)this.writeClient.getTimelineServer());
        }
        boolean isEmpty = records.isEmpty();
        String commitTime = this.startCommit();
        log.info((Object)("Starting commit  : " + commitTime));
        if (this.cfg.operation == HoodieDeltaStreamer.Operation.INSERT) {
            writeStatusRDD = this.writeClient.insert(records, commitTime);
        } else if (this.cfg.operation == HoodieDeltaStreamer.Operation.UPSERT) {
            writeStatusRDD = this.writeClient.upsert(records, commitTime);
        } else {
            if (this.cfg.operation != HoodieDeltaStreamer.Operation.BULK_INSERT) {
                throw new HoodieDeltaStreamerException("Unknown operation :" + (Object)((Object)this.cfg.operation));
            }
            writeStatusRDD = this.writeClient.bulkInsert(records, commitTime);
        }
        long totalErrorRecords = writeStatusRDD.mapToDouble((DoubleFunction & Serializable)ws -> ws.getTotalErrorRecords()).sum().longValue();
        long totalRecords = writeStatusRDD.mapToDouble((DoubleFunction & Serializable)ws -> ws.getTotalRecords()).sum().longValue();
        boolean hasErrors = totalErrorRecords > 0L;
        long hiveSyncTimeMs = 0L;
        if (hasErrors && !this.cfg.commitOnErrors.booleanValue()) {
            log.error((Object)("Delta Sync found errors when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords));
            log.error((Object)"Printing out the top 100 errors");
            writeStatusRDD.filter((Function & Serializable)ws -> ws.hasErrors()).take(100).forEach(ws -> {
                log.error((Object)"Global error :", ws.getGlobalError());
                if (ws.getErrors().size() > 0) {
                    ws.getErrors().entrySet().forEach(r -> log.trace((Object)("Error for key:" + r.getKey() + " is " + r.getValue())));
                }
            });
            this.writeClient.rollback(commitTime);
            throw new HoodieException("Commit " + commitTime + " failed and rolled-back !");
        }
        HashMap<String, String> checkpointCommitMetadata = new HashMap<String, String>();
        checkpointCommitMetadata.put(CHECKPOINT_KEY, checkpointStr);
        if (this.cfg.checkpoint != null) {
            checkpointCommitMetadata.put(CHECKPOINT_RESET_KEY, this.cfg.checkpoint);
        }
        if (hasErrors) {
            log.warn((Object)("Some records failed to be merged but forcing commit since commitOnErrors set. Errors/Total=" + totalErrorRecords + "/" + totalRecords));
        }
        if (!(success = this.writeClient.commit(commitTime, writeStatusRDD, Option.of(checkpointCommitMetadata)))) {
            log.info((Object)("Commit " + commitTime + " failed!"));
            throw new HoodieException("Commit " + commitTime + " failed!");
        }
        log.info((Object)("Commit " + commitTime + " successful!"));
        if (this.cfg.isAsyncCompactionEnabled()) {
            scheduledCompactionInstant = this.writeClient.scheduleCompaction(Option.of(checkpointCommitMetadata));
        }
        if (!isEmpty) {
            Timer.Context hiveSyncContext = metrics.getHiveSyncTimerContext();
            this.syncHive();
            hiveSyncTimeMs = hiveSyncContext != null ? hiveSyncContext.stop() : 0L;
        }
        long overallTimeMs = overallTimerContext != null ? overallTimerContext.stop() : 0L;
        metrics.updateDeltaStreamerMetrics(overallTimeMs, hiveSyncTimeMs);
        return scheduledCompactionInstant;
    }

    private String startCommit() {
        int maxRetries = 2;
        int retryNum = 1;
        IllegalArgumentException lastException = null;
        while (retryNum <= 2) {
            try {
                return this.writeClient.startCommit();
            }
            catch (IllegalArgumentException ie) {
                lastException = ie;
                log.error((Object)"Got error trying to start a new commit. Retrying after sleeping for a sec", (Throwable)ie);
                ++retryNum;
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException interruptedException) {}
            }
        }
        throw lastException;
    }

    private void syncHive() throws ClassNotFoundException {
        if (this.cfg.enableHiveSync.booleanValue()) {
            HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig((TypedProperties)this.props, (String)this.cfg.targetBasePath);
            log.info((Object)("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName + "). Hive metastore URL :" + hiveSyncConfig.jdbcUrl + ", basePath :" + this.cfg.targetBasePath));
            new HiveSyncTool(hiveSyncConfig, this.hiveConf, this.fs).syncHoodieTable();
        }
    }

    public void setupWriteClient() {
        log.info((Object)"Setting up Hoodie Write Client");
        if (null != this.schemaProvider && null == this.writeClient) {
            this.registerAvroSchemas(this.schemaProvider);
            HoodieWriteConfig hoodieCfg = this.getHoodieClientConfig(this.schemaProvider);
            this.writeClient = new HoodieWriteClient(this.jssc, hoodieCfg, true);
            this.onInitializingHoodieWriteClient.apply(this.writeClient);
        }
    }

    private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) {
        HoodieWriteConfig config;
        HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder().withPath(this.cfg.targetBasePath).combineInput(this.cfg.filterDupes.booleanValue(), true).withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(this.cfg.payloadClassName).withInlineCompaction(Boolean.valueOf(this.cfg.isInlineCompactionEnabled())).build()).forTable(this.cfg.targetTableName).withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).withAutoCommit(false).withProps((Map)this.props);
        if (null != schemaProvider && null != schemaProvider.getTargetSchema()) {
            builder = builder.withSchema(schemaProvider.getTargetSchema().toString());
        }
        Preconditions.checkArgument(((config = builder.build()).isInlineCompaction() == this.cfg.isInlineCompactionEnabled() ? 1 : 0) != 0);
        Preconditions.checkArgument((config.shouldAutoCommit() == false ? 1 : 0) != 0);
        Preconditions.checkArgument((config.shouldCombineBeforeInsert() == this.cfg.filterDupes.booleanValue() ? 1 : 0) != 0);
        Preconditions.checkArgument((boolean)config.shouldCombineBeforeUpsert());
        return config;
    }

    private void registerAvroSchemas(SchemaProvider schemaProvider) {
        if (null != schemaProvider) {
            ArrayList<Schema> schemas = new ArrayList<Schema>();
            schemas.add(schemaProvider.getSourceSchema());
            if (schemaProvider.getTargetSchema() != null) {
                schemas.add(schemaProvider.getTargetSchema());
            }
            log.info((Object)("Registering Schema :" + schemas));
            this.jssc.sc().getConf().registerAvroSchemas((Seq)JavaConversions.asScalaBuffer(schemas).toList());
        }
    }

    public void close() {
        if (null != this.writeClient) {
            this.writeClient.close();
            this.writeClient = null;
        }
    }
}

