/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.deltastreamer;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineServerHelper;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.com.codahale.metrics.Timer;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.sync.common.AbstractSyncTool;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback;
import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallbackConfig;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SchemaSet;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.collection.JavaConversions;
import scala.collection.Seq;

public class DeltaSync
implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LogManager.getLogger(DeltaSync.class);
    private final HoodieDeltaStreamer.Config cfg;
    private transient SourceFormatAdapter formatAdapter;
    private transient SchemaProvider userProvidedSchemaProvider;
    private transient SchemaProvider schemaProvider;
    private transient Option<Transformer> transformer;
    private KeyGenerator keyGenerator;
    private transient FileSystem fs;
    private transient JavaSparkContext jssc;
    private transient SparkSession sparkSession;
    private transient Configuration conf;
    private final TypedProperties props;
    private transient java.util.function.Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient;
    private transient Option<HoodieTimeline> commitTimelineOpt;
    private final SchemaSet processedSchema;
    private transient Option<EmbeddedTimelineService> embeddedTimelineService = Option.empty();
    private transient SparkRDDWriteClient writeClient;
    private transient HoodieDeltaStreamerMetrics metrics;

    public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, TypedProperties props, JavaSparkContext jssc, FileSystem fs, Configuration conf, java.util.function.Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
        this.cfg = cfg;
        this.jssc = jssc;
        this.sparkSession = sparkSession;
        this.fs = fs;
        this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
        this.props = props;
        this.userProvidedSchemaProvider = schemaProvider;
        this.processedSchema = new SchemaSet();
        this.keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
        this.refreshTimeline();
        this.registerAvroSchemas(schemaProvider);
        this.transformer = UtilHelpers.createTransformer(cfg.transformerClassNames);
        this.metrics = new HoodieDeltaStreamerMetrics(this.getHoodieClientConfig(this.schemaProvider));
        this.formatAdapter = new SourceFormatAdapter(UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider, this.metrics));
        this.conf = conf;
    }

    public void refreshTimeline() throws IOException {
        if (this.fs.exists(new Path(this.cfg.targetBasePath))) {
            HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(new Configuration(this.fs.getConf())).setBasePath(this.cfg.targetBasePath).setPayloadClassName(this.cfg.payloadClassName).build();
            switch (meta.getTableType()) {
                case COPY_ON_WRITE: {
                    this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants());
                    break;
                }
                case MERGE_ON_READ: {
                    this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants());
                    break;
                }
                default: {
                    throw new HoodieException("Unsupported table type :" + (Object)((Object)meta.getTableType()));
                }
            }
        } else {
            this.commitTimelineOpt = Option.empty();
            String partitionColumns = HoodieSparkUtils.getPartitionColumns(this.keyGenerator, this.props);
            HoodieTableMetaClient.withPropertyBuilder().setTableType(this.cfg.tableType).setTableName(this.cfg.targetTableName).setArchiveLogFolder(HoodieTableConfig.ARCHIVELOG_FOLDER.defaultValue()).setPayloadClassName(this.cfg.payloadClassName).setBaseFileFormat(this.cfg.baseFileFormat).setPartitionFields(partitionColumns).setRecordKeyFields(this.props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key())).setPopulateMetaFields(this.props.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS.key(), Boolean.parseBoolean(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))).setKeyGeneratorClassProp(this.props.getProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), SimpleKeyGenerator.class.getName())).setPreCombineField(this.cfg.sourceOrderingField).initTable(new Configuration(this.jssc.hadoopConfiguration()), this.cfg.targetBasePath);
        }
    }

    public Pair<Option<String>, JavaRDD<WriteStatus>> syncOnce() throws IOException {
        Pair<Option<String>, JavaRDD<WriteStatus>> result = null;
        Timer.Context overallTimerContext = this.metrics.getOverallTimerContext();
        this.refreshTimeline();
        Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> srcRecordsWithCkpt = this.readFromSource(this.commitTimelineOpt);
        if (null != srcRecordsWithCkpt) {
            if (null == this.writeClient) {
                this.schemaProvider = srcRecordsWithCkpt.getKey();
                this.setupWriteClient();
            } else {
                Schema newSourceSchema = srcRecordsWithCkpt.getKey().getSourceSchema();
                Schema newTargetSchema = srcRecordsWithCkpt.getKey().getTargetSchema();
                if (!this.processedSchema.isSchemaPresent(newSourceSchema) || !this.processedSchema.isSchemaPresent(newTargetSchema)) {
                    LOG.info((Object)("Seeing new schema. Source :" + newSourceSchema.toString(true) + ", Target :" + newTargetSchema.toString(true)));
                    this.reInitWriteClient(newSourceSchema, newTargetSchema);
                    this.processedSchema.addSchema(newSourceSchema);
                    this.processedSchema.addSchema(newTargetSchema);
                }
            }
            result = this.writeToSink(srcRecordsWithCkpt.getRight().getRight(), srcRecordsWithCkpt.getRight().getLeft(), this.metrics, overallTimerContext);
        }
        this.metrics.updateDeltaStreamerSyncMetrics(System.currentTimeMillis());
        this.jssc.getPersistentRDDs().values().forEach(JavaRDD::unpersist);
        return result;
    }

    public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource(Option<HoodieTimeline> commitTimelineOpt) throws IOException {
        SchemaProvider schemaProvider;
        Option<Dataset<Row>> avroRDDOptional;
        String checkpointStr;
        InputBatch<Dataset<Row>> dataAndCheckpoint;
        Option<String> resumeCheckpointStr = Option.empty();
        if (commitTimelineOpt.isPresent()) {
            Option<HoodieInstant> lastCommit = commitTimelineOpt.get().lastInstant();
            if (lastCommit.isPresent()) {
                HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get(), HoodieCommitMetadata.class);
                if (this.cfg.checkpoint != null && (StringUtils.isNullOrEmpty(commitMetadata.getMetadata("deltastreamer.checkpoint.reset_key")) || !this.cfg.checkpoint.equals(commitMetadata.getMetadata("deltastreamer.checkpoint.reset_key")))) {
                    resumeCheckpointStr = Option.of(this.cfg.checkpoint);
                } else if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata("deltastreamer.checkpoint.key"))) {
                    resumeCheckpointStr = Option.of(commitMetadata.getMetadata("deltastreamer.checkpoint.key"));
                } else if (HoodieTimeline.compareTimestamps("00000000000002", HoodieTimeline.LESSER_THAN, lastCommit.get().getTimestamp())) {
                    Option<String> prevCheckpoint = this.getPreviousCheckpoint(commitTimelineOpt.get());
                    if (prevCheckpoint.isPresent()) {
                        resumeCheckpointStr = prevCheckpoint;
                    } else {
                        throw new HoodieDeltaStreamerException("Unable to find previous checkpoint. Please double check if this table was indeed built via delta streamer. Last Commit :" + lastCommit + ", Instants :" + commitTimelineOpt.get().getInstants().collect(Collectors.toList()) + ", CommitMetadata=" + commitMetadata.toJsonString());
                    }
                }
                if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata("deltastreamer.checkpoint.reset_key"))) {
                    this.props.remove(KafkaOffsetGen.Config.KAFKA_CHECKPOINT_TYPE.key());
                }
            }
        } else {
            String partitionColumns = HoodieSparkUtils.getPartitionColumns(this.keyGenerator, this.props);
            HoodieTableMetaClient.withPropertyBuilder().setTableType(this.cfg.tableType).setTableName(this.cfg.targetTableName).setArchiveLogFolder(HoodieTableConfig.ARCHIVELOG_FOLDER.defaultValue()).setPayloadClassName(this.cfg.payloadClassName).setBaseFileFormat(this.cfg.baseFileFormat).setPartitionFields(partitionColumns).setRecordKeyFields(this.props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key())).setPopulateMetaFields(this.props.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS.key(), Boolean.parseBoolean(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))).setKeyGeneratorClassProp(this.props.getProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), SimpleKeyGenerator.class.getName())).initTable(new Configuration(this.jssc.hadoopConfiguration()), this.cfg.targetBasePath);
        }
        if (!resumeCheckpointStr.isPresent() && this.cfg.checkpoint != null) {
            resumeCheckpointStr = Option.of(this.cfg.checkpoint);
        }
        LOG.info((Object)("Checkpoint to resume from : " + resumeCheckpointStr));
        if (this.transformer.isPresent()) {
            dataAndCheckpoint = this.formatAdapter.fetchNewDataInRowFormat(resumeCheckpointStr, this.cfg.sourceLimit);
            Option<Dataset> transformed = dataAndCheckpoint.getBatch().map(data -> this.transformer.get().apply(this.jssc, this.sparkSession, (Dataset<Row>)data, this.props));
            checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
            boolean reconcileSchema = this.props.getBoolean(DataSourceWriteOptions.RECONCILE_SCHEMA().key());
            if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetSchema() != null) {
                avroRDDOptional = transformed.map(t -> HoodieSparkUtils.createRdd((Dataset<Row>)t, "hoodie_source", "hoodie.source", reconcileSchema, Option.of(this.userProvidedSchemaProvider.getTargetSchema())).toJavaRDD());
                schemaProvider = this.userProvidedSchemaProvider;
            } else {
                schemaProvider = transformed.map(r -> {
                    SchemaProvider targetSchemaProvider = null;
                    targetSchemaProvider = reconcileSchema ? UtilHelpers.createLatestSchemaProvider(r.schema(), this.jssc, this.fs, this.cfg.targetBasePath) : UtilHelpers.createRowBasedSchemaProvider(r.schema(), this.props, this.jssc);
                    return new DelegatingSchemaProvider(this.props, this.jssc, dataAndCheckpoint.getSchemaProvider(), targetSchemaProvider);
                }).orElse(dataAndCheckpoint.getSchemaProvider());
                avroRDDOptional = transformed.map(t -> HoodieSparkUtils.createRdd((Dataset<Row>)t, "hoodie_source", "hoodie.source", reconcileSchema, Option.ofNullable(schemaProvider.getTargetSchema())).toJavaRDD());
            }
        } else {
            dataAndCheckpoint = this.formatAdapter.fetchNewDataInAvroFormat(resumeCheckpointStr, this.cfg.sourceLimit);
            avroRDDOptional = dataAndCheckpoint.getBatch();
            checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
            schemaProvider = dataAndCheckpoint.getSchemaProvider();
        }
        if (Objects.equals(checkpointStr, resumeCheckpointStr.orElse(null))) {
            LOG.info((Object)("No new data, source checkpoint has not changed. Nothing to commit. Old checkpoint=(" + resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")"));
            return null;
        }
        if (!avroRDDOptional.isPresent() || ((JavaRDD)avroRDDOptional.get()).isEmpty()) {
            LOG.info((Object)"No new data, perform empty commit.");
            return Pair.of(schemaProvider, Pair.of(checkpointStr, this.jssc.emptyRDD()));
        }
        boolean shouldCombine = this.cfg.filterDupes != false || this.cfg.operation.equals((Object)WriteOperationType.UPSERT);
        JavaRDD avroRDD = (JavaRDD)avroRDDOptional.get();
        JavaRDD records = avroRDD.map((Function & Serializable)gr -> {
            HoodieRecordPayload payload = shouldCombine ? DataSourceUtils.createPayload(this.cfg.payloadClassName, gr, (Comparable)HoodieAvroUtils.getNestedFieldVal(gr, this.cfg.sourceOrderingField, false)) : DataSourceUtils.createPayload(this.cfg.payloadClassName, gr);
            return new HoodieRecord<HoodieRecordPayload>(this.keyGenerator.getKey((GenericRecord)gr), payload);
        });
        return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
    }

    protected Option<String> getPreviousCheckpoint(HoodieTimeline timeline) throws IOException {
        return timeline.getReverseOrderedInstants().map(instant -> {
            try {
                HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails((HoodieInstant)instant).get(), HoodieCommitMetadata.class);
                return Option.ofNullable(commitMetadata.getMetadata("deltastreamer.checkpoint.key"));
            }
            catch (IOException e) {
                throw new HoodieIOException("Failed to parse HoodieCommitMetadata for " + instant.toString(), e);
            }
        }).filter(Option::isPresent).findFirst().orElse(Option.empty());
    }

    /*
     * WARNING - void declaration
     * Enabled aggressive block sorting
     */
    private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(JavaRDD<HoodieRecord> records, String checkpointStr, HoodieDeltaStreamerMetrics metrics, Timer.Context overallTimerContext) {
        void var5_7;
        JavaRDD<WriteStatus> writeStatusRDD;
        Option option2 = Option.empty();
        if (this.cfg.filterDupes.booleanValue()) {
            records = DataSourceUtils.dropDuplicates(this.jssc, records, this.writeClient.getConfig());
        }
        boolean isEmpty = records.isEmpty();
        String instantTime = this.startCommit();
        LOG.info((Object)("Starting commit  : " + instantTime));
        switch (this.cfg.operation) {
            case INSERT: {
                writeStatusRDD = this.writeClient.insert(records, instantTime);
                break;
            }
            case UPSERT: {
                writeStatusRDD = this.writeClient.upsert(records, instantTime);
                break;
            }
            case BULK_INSERT: {
                writeStatusRDD = this.writeClient.bulkInsert(records, instantTime);
                break;
            }
            case INSERT_OVERWRITE: {
                writeStatusRDD = this.writeClient.insertOverwrite(records, instantTime).getWriteStatuses();
                break;
            }
            case INSERT_OVERWRITE_TABLE: {
                writeStatusRDD = this.writeClient.insertOverwriteTable(records, instantTime).getWriteStatuses();
                break;
            }
            default: {
                throw new HoodieDeltaStreamerException("Unknown operation : " + (Object)((Object)this.cfg.operation));
            }
        }
        long totalErrorRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue();
        long totalRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalRecords).sum().longValue();
        boolean hasErrors = totalErrorRecords > 0L;
        long hiveSyncTimeMs = 0L;
        long metaSyncTimeMs = 0L;
        if (hasErrors && !this.cfg.commitOnErrors.booleanValue()) {
            LOG.error((Object)("Delta Sync found errors when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords));
            LOG.error((Object)"Printing out the top 100 errors");
            writeStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws -> {
                LOG.error((Object)"Global error :", ws.getGlobalError());
                if (ws.getErrors().size() > 0) {
                    ws.getErrors().forEach((key, value) -> LOG.trace((Object)("Error for key:" + key + " is " + value)));
                }
            });
            this.writeClient.rollback(instantTime);
            throw new HoodieException("Commit " + instantTime + " failed and rolled-back !");
        }
        HashMap<String, String> checkpointCommitMetadata = new HashMap<String, String>();
        checkpointCommitMetadata.put("deltastreamer.checkpoint.key", checkpointStr);
        if (this.cfg.checkpoint != null) {
            checkpointCommitMetadata.put("deltastreamer.checkpoint.reset_key", this.cfg.checkpoint);
        }
        if (hasErrors) {
            LOG.warn((Object)("Some records failed to be merged but forcing commit since commitOnErrors set. Errors/Total=" + totalErrorRecords + "/" + totalRecords));
        }
        String commitActionType = CommitUtils.getCommitActionType(this.cfg.operation, HoodieTableType.valueOf(this.cfg.tableType));
        boolean success = this.writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, Collections.emptyMap());
        if (!success) {
            LOG.info((Object)("Commit " + instantTime + " failed!"));
            throw new HoodieException("Commit " + instantTime + " failed!");
        }
        LOG.info((Object)("Commit " + instantTime + " successful!"));
        this.formatAdapter.getSource().onCommit(checkpointStr);
        if (this.cfg.isAsyncCompactionEnabled()) {
            Option<String> option3 = this.writeClient.scheduleCompaction(Option.empty());
        }
        if (!isEmpty) {
            this.syncMeta(metrics);
        }
        long overallTimeMs = overallTimerContext != null ? overallTimerContext.stop() : 0L;
        metrics.updateDeltaStreamerMetrics(overallTimeMs);
        return Pair.of(var5_7, writeStatusRDD);
    }

    private String startCommit() {
        int maxRetries = 2;
        int retryNum = 1;
        IllegalArgumentException lastException = null;
        while (retryNum <= 2) {
            try {
                String instantTime = HoodieActiveTimeline.createNewInstantTime();
                String commitActionType = CommitUtils.getCommitActionType(this.cfg.operation, HoodieTableType.valueOf(this.cfg.tableType));
                this.writeClient.startCommitWithTime(instantTime, commitActionType);
                return instantTime;
            }
            catch (IllegalArgumentException ie) {
                lastException = ie;
                LOG.error((Object)"Got error trying to start a new commit. Retrying after sleeping for a sec", (Throwable)ie);
                ++retryNum;
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException interruptedException) {}
            }
        }
        throw lastException;
    }

    private String getSyncClassShortName(String syncClassName) {
        return syncClassName.substring(syncClassName.lastIndexOf(".") + 1);
    }

    private void syncMeta(HoodieDeltaStreamerMetrics metrics) {
        HashSet<String> syncClientToolClasses = new HashSet<String>(Arrays.asList(this.cfg.syncClientToolClass.split(",")));
        if (this.cfg.enableHiveSync.booleanValue()) {
            this.cfg.enableMetaSync = true;
            syncClientToolClasses.add(HiveSyncTool.class.getName());
            LOG.info((Object)"When set --enable-hive-sync will use HiveSyncTool for backward compatibility");
        }
        if (this.cfg.enableMetaSync.booleanValue()) {
            for (String impl : syncClientToolClasses) {
                Timer.Context syncContext = metrics.getMetaSyncTimerContext();
                switch (impl = impl.trim()) {
                    case "org.apache.hudi.hive.HiveSyncTool": {
                        this.syncHive();
                        break;
                    }
                    default: {
                        FileSystem fs = FSUtils.getFs(this.cfg.targetBasePath, this.jssc.hadoopConfiguration());
                        Properties properties = new Properties();
                        properties.putAll((Map<?, ?>)this.props);
                        properties.put("basePath", this.cfg.targetBasePath);
                        properties.put("baseFileFormat", this.cfg.baseFileFormat);
                        AbstractSyncTool syncTool = (AbstractSyncTool)ReflectionUtils.loadClass(impl, new Class[]{Properties.class, FileSystem.class}, new Object[]{properties, fs});
                        syncTool.syncHoodieTable();
                    }
                }
                long metaSyncTimeMs = syncContext != null ? syncContext.stop() : 0L;
                metrics.updateDeltaStreamerMetaSyncMetrics(this.getSyncClassShortName(impl), metaSyncTimeMs);
            }
        }
    }

    public void syncHive() {
        HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(this.props, this.cfg.targetBasePath, this.cfg.baseFileFormat);
        LOG.info((Object)("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName + "). Hive metastore URL :" + hiveSyncConfig.jdbcUrl + ", basePath :" + this.cfg.targetBasePath));
        HiveConf hiveConf = new HiveConf(this.conf, HiveConf.class);
        LOG.info((Object)("Hive Conf => " + hiveConf.getAllProperties().toString()));
        LOG.info((Object)("Hive Sync Conf => " + hiveSyncConfig.toString()));
        new HiveSyncTool(hiveSyncConfig, hiveConf, this.fs).syncHoodieTable();
    }

    public void syncHive(HiveConf conf) {
        this.conf = conf;
        this.syncHive();
    }

    public void setupWriteClient() throws IOException {
        if (null != this.schemaProvider) {
            Schema sourceSchema = this.schemaProvider.getSourceSchema();
            Schema targetSchema = this.schemaProvider.getTargetSchema();
            this.reInitWriteClient(sourceSchema, targetSchema);
        }
    }

    private void reInitWriteClient(Schema sourceSchema, Schema targetSchema) throws IOException {
        LOG.info((Object)"Setting up new Hoodie Write Client");
        this.registerAvroSchemas(sourceSchema, targetSchema);
        HoodieWriteConfig hoodieCfg = this.getHoodieClientConfig(targetSchema);
        if (hoodieCfg.isEmbeddedTimelineServerEnabled()) {
            if (!this.embeddedTimelineService.isPresent()) {
                this.embeddedTimelineService = EmbeddedTimelineServerHelper.createEmbeddedTimelineService(new HoodieSparkEngineContext(this.jssc), hoodieCfg);
            } else {
                EmbeddedTimelineServerHelper.updateWriteConfigWithTimelineServer(this.embeddedTimelineService.get(), hoodieCfg);
            }
        }
        if (null != this.writeClient) {
            this.writeClient.close();
        }
        this.writeClient = new SparkRDDWriteClient((HoodieEngineContext)new HoodieSparkEngineContext(this.jssc), hoodieCfg, this.embeddedTimelineService);
        this.onInitializingHoodieWriteClient.apply(this.writeClient);
    }

    private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) {
        return this.getHoodieClientConfig(schemaProvider != null ? schemaProvider.getTargetSchema() : null);
    }

    private HoodieWriteConfig getHoodieClientConfig(Schema schema2) {
        HoodieWriteConfig config;
        boolean combineBeforeUpsert = true;
        boolean autoCommit = false;
        HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder().withPath(this.cfg.targetBasePath).combineInput(this.cfg.filterDupes, true).withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(this.cfg.payloadClassName).withInlineCompaction(this.cfg.isInlineCompactionEnabled()).build()).withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(this.cfg.sourceOrderingField).build()).forTable(this.cfg.targetTableName).withAutoCommit(false).withProps(this.props);
        if (schema2 != null) {
            builder.withSchema(schema2.toString());
        }
        if ((config = builder.build()).writeCommitCallbackOn() && HoodieWriteCommitKafkaCallback.class.getName().equals(config.getCallbackClass())) {
            HoodieWriteCommitKafkaCallbackConfig.setCallbackKafkaConfigIfNeeded(config);
        }
        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.from(this.props);
        ValidationUtils.checkArgument(config.inlineCompactionEnabled() == this.cfg.isInlineCompactionEnabled(), String.format("%s should be set to %s", HoodieCompactionConfig.INLINE_COMPACT.key(), this.cfg.isInlineCompactionEnabled()));
        ValidationUtils.checkArgument(config.inlineClusteringEnabled() == clusteringConfig.isInlineClusteringEnabled(), String.format("%s should be set to %s", HoodieClusteringConfig.INLINE_CLUSTERING.key(), clusteringConfig.isInlineClusteringEnabled()));
        ValidationUtils.checkArgument(config.isAsyncClusteringEnabled() == clusteringConfig.isAsyncClusteringEnabled(), String.format("%s should be set to %s", HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE.key(), clusteringConfig.isAsyncClusteringEnabled()));
        ValidationUtils.checkArgument(config.shouldAutoCommit() == false, String.format("%s should be set to %s", HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), false));
        ValidationUtils.checkArgument(config.shouldCombineBeforeInsert() == this.cfg.filterDupes.booleanValue(), String.format("%s should be set to %s", HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(), this.cfg.filterDupes));
        ValidationUtils.checkArgument(config.shouldCombineBeforeUpsert(), String.format("%s should be set to %s", HoodieWriteConfig.COMBINE_BEFORE_UPSERT.key(), true));
        return config;
    }

    private void registerAvroSchemas(SchemaProvider schemaProvider) {
        if (null != schemaProvider) {
            this.registerAvroSchemas(schemaProvider.getSourceSchema(), schemaProvider.getTargetSchema());
        }
    }

    private void registerAvroSchemas(Schema sourceSchema, Schema targetSchema) {
        if (null != sourceSchema) {
            ArrayList<Schema> schemas = new ArrayList<Schema>();
            schemas.add(sourceSchema);
            if (targetSchema != null) {
                schemas.add(targetSchema);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Registering Schema: " + schemas));
            }
            this.jssc.sc().getConf().registerAvroSchemas((Seq)JavaConversions.asScalaBuffer(schemas).toList());
        }
    }

    public void close() {
        if (null != this.writeClient) {
            this.writeClient.close();
            this.writeClient = null;
        }
        LOG.info((Object)"Shutting down embedded timeline server");
        if (this.embeddedTimelineService.isPresent()) {
            this.embeddedTimelineService.get().stop();
        }
    }

    public FileSystem getFs() {
        return this.fs;
    }

    public TypedProperties getProps() {
        return this.props;
    }

    public HoodieDeltaStreamer.Config getCfg() {
        return this.cfg;
    }

    public Option<HoodieTimeline> getCommitTimelineOpt() {
        return this.commitTimelineOpt;
    }

    public Option<String> getClusteringInstantOpt() {
        return this.writeClient.scheduleClustering(Option.empty());
    }
}

