/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.streamer;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.HoodieConversionUtils;
import org.apache.hudi.HoodieSchemaUtils;
import org.apache.hudi.HoodieSparkSqlWriter;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineServerHelper;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.com.codahale.metrics.Timer;
import org.apache.hudi.commit.HoodieStreamerDatasetBulkInsertCommitActionExecutor;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieErrorTableConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetaSyncException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncConfigHolder;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.hudi.sync.common.util.SyncUtilHelpers;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.util.JavaScalaConverters;
import org.apache.hudi.util.SparkKeyGenUtils;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback;
import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallbackConfig;
import org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallback;
import org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig;
import org.apache.hudi.utilities.config.HoodieStreamerConfig;
import org.apache.hudi.utilities.exception.HoodieSchemaFetchException;
import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException;
import org.apache.hudi.utilities.exception.HoodieStreamerException;
import org.apache.hudi.utilities.exception.HoodieStreamerWriteException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
import org.apache.hudi.utilities.schema.LazyCastingIterator;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SchemaSet;
import org.apache.hudi.utilities.schema.SimpleSchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.streamer.BaseErrorTableWriter;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.ErrorEvent;
import org.apache.hudi.utilities.streamer.ErrorTableUtils;
import org.apache.hudi.utilities.streamer.HoodieStreamer;
import org.apache.hudi.utilities.streamer.HoodieStreamerUtils;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.streamer.SparkSampleWritesUtils;
import org.apache.hudi.utilities.streamer.StreamContext;
import org.apache.hudi.utilities.streamer.StreamerCheckpointUtils;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.HoodieDataTypeUtils;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.collection.immutable.Seq;

public class StreamSync
implements Serializable,
Closeable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(StreamSync.class);
    private static final String NULL_PLACEHOLDER = "[null]";
    public static final String CHECKPOINT_IGNORE_KEY = "deltastreamer.checkpoint.ignore_key";
    private final HoodieStreamer.Config cfg;
    private transient SourceFormatAdapter formatAdapter;
    private transient SchemaProvider userProvidedSchemaProvider;
    private transient SchemaProvider schemaProvider;
    private transient Option<Transformer> transformer;
    private String keyGenClassName;
    private transient HoodieStorage storage;
    private final transient HoodieSparkEngineContext hoodieSparkContext;
    private transient SparkSession sparkSession;
    private transient Configuration conf;
    private final TypedProperties props;
    private transient java.util.function.Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient;
    private transient Option<HoodieTimeline> commitsTimelineOpt;
    private transient Option<HoodieTimeline> allCommitsTimelineOpt;
    private final SchemaSet processedSchema;
    private transient Option<EmbeddedTimelineService> embeddedTimelineService = Option.empty();
    private transient SparkRDDWriteClient writeClient;
    private Option<BaseErrorTableWriter> errorTableWriter = Option.empty();
    private HoodieErrorTableConfig.ErrorWriteFailureStrategy errorWriteFailureStrategy;
    private transient HoodieIngestionMetrics metrics;
    private transient HoodieMetrics hoodieMetrics;
    private final boolean autoGenerateRecordKeys;

    @VisibleForTesting
    StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession, TypedProperties props, HoodieSparkEngineContext hoodieSparkContext, HoodieStorage storage2, Configuration conf, java.util.function.Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient, SchemaProvider userProvidedSchemaProvider, Option<BaseErrorTableWriter> errorTableWriter, SourceFormatAdapter formatAdapter, Option<Transformer> transformer, boolean autoGenerateRecordKeys) {
        this.cfg = cfg;
        this.hoodieSparkContext = hoodieSparkContext;
        this.sparkSession = sparkSession;
        this.storage = storage2;
        this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
        this.props = props;
        this.userProvidedSchemaProvider = userProvidedSchemaProvider;
        this.processedSchema = new SchemaSet();
        this.autoGenerateRecordKeys = autoGenerateRecordKeys;
        this.keyGenClassName = HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName(new TypedProperties(props));
        this.conf = conf;
        this.errorTableWriter = errorTableWriter;
        this.formatAdapter = formatAdapter;
        this.transformer = transformer;
    }

    @Deprecated
    public StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, TypedProperties props, JavaSparkContext jssc, FileSystem fs, Configuration conf, java.util.function.Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
        this(cfg, sparkSession, props, new HoodieSparkEngineContext(jssc), fs, conf, onInitializingHoodieWriteClient, new DefaultStreamContext(schemaProvider, Option.empty()));
    }

    public StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession, TypedProperties props, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf, java.util.function.Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient, StreamContext streamContext) throws IOException {
        this.cfg = cfg;
        this.hoodieSparkContext = hoodieSparkContext;
        this.sparkSession = sparkSession;
        this.storage = new HoodieHadoopStorage(fs);
        this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
        this.props = props;
        this.userProvidedSchemaProvider = streamContext.getSchemaProvider();
        this.processedSchema = new SchemaSet();
        this.autoGenerateRecordKeys = KeyGenUtils.isAutoGeneratedRecordKeysEnabled(props);
        this.keyGenClassName = HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName(new TypedProperties(props));
        this.conf = conf;
        HoodieWriteConfig hoodieWriteConfig = this.getHoodieClientConfig();
        this.metrics = (HoodieIngestionMetrics)ReflectionUtils.loadClass(cfg.ingestionMetricsClass, new Class[]{HoodieMetricsConfig.class, HoodieStorage.class}, new Object[]{hoodieWriteConfig.getMetricsConfig(), this.storage});
        this.hoodieMetrics = new HoodieMetrics(hoodieWriteConfig, this.storage);
        if (props.getBoolean(HoodieErrorTableConfig.ERROR_TABLE_ENABLED.key(), HoodieErrorTableConfig.ERROR_TABLE_ENABLED.defaultValue())) {
            this.errorTableWriter = ErrorTableUtils.getErrorTableWriter(cfg, sparkSession, props, hoodieSparkContext, fs, Option.of(this.metrics));
            this.errorWriteFailureStrategy = ErrorTableUtils.getErrorWriteFailureStrategy(props);
        }
        this.initializeMetaClient();
        Source source = UtilHelpers.createSource(cfg.sourceClassName, props, hoodieSparkContext.jsc(), sparkSession, this.metrics, streamContext);
        this.formatAdapter = new SourceFormatAdapter(source, this.errorTableWriter, Option.of(props));
        Supplier<Option<Schema>> schemaSupplier = this.schemaProvider == null ? Option::empty : () -> Option.ofNullable(this.schemaProvider.getSourceSchema());
        this.transformer = UtilHelpers.createTransformer(Option.ofNullable(cfg.transformerClassNames), schemaSupplier, this.errorTableWriter.isPresent());
    }

    public HoodieTableMetaClient initializeMetaClientAndRefreshTimeline() throws IOException {
        return this.initializeMetaClient(true);
    }

    private HoodieTableMetaClient initializeMetaClient() throws IOException {
        return this.initializeMetaClient(false);
    }

    private HoodieTableMetaClient initializeMetaClient(boolean refreshTimeline) throws IOException {
        if (this.storage.exists(new StoragePath(this.cfg.targetBasePath))) {
            try {
                HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(HadoopFSUtils.getStorageConfWithCopy(this.conf)).setBasePath(this.cfg.targetBasePath).setPayloadClassName(this.cfg.payloadClassName).setRecordMergerStrategy(null).setTimeGeneratorConfig(HoodieTimeGeneratorConfig.newBuilder().fromProperties(this.props).withPath(this.cfg.targetBasePath).build()).build();
                if (refreshTimeline) {
                    switch (metaClient.getTableType()) {
                        case COPY_ON_WRITE: 
                        case MERGE_ON_READ: {
                            this.commitsTimelineOpt = Option.of(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
                            this.allCommitsTimelineOpt = Option.of(metaClient.getActiveTimeline().getAllCommitsTimeline());
                            break;
                        }
                        default: {
                            throw new HoodieException("Unsupported table type :" + (Object)((Object)metaClient.getTableType()));
                        }
                    }
                }
                return metaClient;
            }
            catch (HoodieIOException e) {
                LOG.warn("Full exception msg " + e.getMessage());
                if (e.getMessage().contains("Could not load Hoodie properties") && e.getMessage().contains("hoodie.properties")) {
                    boolean hoodiePropertiesExists;
                    String basePathWithForwardSlash = this.cfg.targetBasePath.endsWith("/") ? this.cfg.targetBasePath : String.format("%s/", this.cfg.targetBasePath);
                    String pathToHoodieProps = String.format("%s%s/%s", basePathWithForwardSlash, ".hoodie", "hoodie.properties");
                    String pathToHoodiePropsBackup = String.format("%s%s/%s", basePathWithForwardSlash, ".hoodie", "hoodie.properties.backup");
                    boolean bl = hoodiePropertiesExists = this.storage.exists(new StoragePath(basePathWithForwardSlash)) && this.storage.exists(new StoragePath(pathToHoodieProps)) && this.storage.exists(new StoragePath(pathToHoodiePropsBackup));
                    if (!hoodiePropertiesExists) {
                        LOG.warn("Base path exists, but table is not fully initialized. Re-initializing again");
                        HoodieTableMetaClient metaClientToValidate = this.initializeEmptyTable();
                        if (metaClientToValidate.reloadActiveTimeline().countInstants() > 0) {
                            this.storage.deleteDirectory(new StoragePath(String.format("%s%s/%s", basePathWithForwardSlash, ".hoodie", "hoodie.properties")));
                            throw new HoodieIOException("hoodie.properties is missing. Likely due to some external entity. Please populate the hoodie.properties and restart the pipeline. ", e.getIOException());
                        }
                        return metaClientToValidate;
                    }
                }
                throw e;
            }
        }
        return this.initializeEmptyTable();
    }

    private HoodieTableMetaClient initializeEmptyTable() throws IOException {
        return this.initializeEmptyTable(HoodieTableMetaClient.newTableBuilder(), SparkKeyGenUtils.getPartitionColumnsForKeyGenerator(this.props), HadoopFSUtils.getStorageConfWithCopy(this.hoodieSparkContext.hadoopConfiguration()));
    }

    HoodieTableMetaClient initializeEmptyTable(HoodieTableMetaClient.TableBuilder tableBuilder, String partitionColumns, StorageConfiguration<?> storageConf) throws IOException {
        this.commitsTimelineOpt = Option.empty();
        this.allCommitsTimelineOpt = Option.empty();
        return tableBuilder.setTableType(this.cfg.tableType).setTableName(this.cfg.targetTableName).setArchiveLogFolder(HoodieTableConfig.TIMELINE_HISTORY_PATH.defaultValue()).setPayloadClassName(this.cfg.payloadClassName).setRecordMergeStrategyId(this.cfg.recordMergeStrategyId).setRecordMergeMode(this.cfg.recordMergeMode).setBaseFileFormat(this.cfg.baseFileFormat).setPartitionFields(partitionColumns).setTableVersion(ConfigUtils.getIntWithAltKeys(this.props, HoodieWriteConfig.WRITE_TABLE_VERSION)).setRecordKeyFields(this.props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key())).setPopulateMetaFields(this.props.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS.key(), HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())).setKeyGeneratorClassProp(this.keyGenClassName).setPreCombineField(this.cfg.sourceOrderingField).setPartitionMetafileUseBaseFormat(this.props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(), HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue())).setCDCEnabled(this.props.getBoolean(HoodieTableConfig.CDC_ENABLED.key(), HoodieTableConfig.CDC_ENABLED.defaultValue())).setCDCSupplementalLoggingMode(this.props.getString(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key(), HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.defaultValue())).setShouldDropPartitionColumns(HoodieStreamerUtils.isDropPartitionColumns(this.props)).setHiveStylePartitioningEnable(this.props.getBoolean(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key(), Boolean.parseBoolean(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.defaultValue()))).setUrlEncodePartitioning(this.props.getBoolean(HoodieTableConfig.URL_ENCODE_PARTITIONING.key(), Boolean.parseBoolean(HoodieTableConfig.URL_ENCODE_PARTITIONING.defaultValue()))).initTable(storageConf, this.cfg.targetBasePath);
    }

    public Pair<Option<String>, JavaRDD<WriteStatus>> syncOnce() throws IOException {
        Pair<Option<String>, JavaRDD<WriteStatus>> result2 = null;
        Timer.Context overallTimerContext = this.metrics.getOverallTimerContext();
        HoodieTableMetaClient metaClient = this.initializeMetaClientAndRefreshTimeline();
        String instantTime = metaClient.createNewInstantTime();
        Pair<InputBatch, Boolean> inputBatchAndUseRowWriter = this.readFromSource(instantTime, metaClient);
        if (inputBatchAndUseRowWriter != null) {
            Option<String> pendingClusteringInstant;
            InputBatch inputBatch = inputBatchAndUseRowWriter.getLeft();
            boolean useRowWriter = inputBatchAndUseRowWriter.getRight();
            if (this.writeClient == null) {
                this.schemaProvider = inputBatch.getSchemaProvider();
                this.setupWriteClient(inputBatch.getBatch(), metaClient);
            } else {
                Schema newSourceSchema = inputBatch.getSchemaProvider().getSourceSchema();
                Schema newTargetSchema = inputBatch.getSchemaProvider().getTargetSchema();
                if (newSourceSchema != null && !this.processedSchema.isSchemaPresent(newSourceSchema) || newTargetSchema != null && !this.processedSchema.isSchemaPresent(newTargetSchema)) {
                    String sourceStr = newSourceSchema == null ? NULL_PLACEHOLDER : newSourceSchema.toString(true);
                    String targetStr = newTargetSchema == null ? NULL_PLACEHOLDER : newTargetSchema.toString(true);
                    LOG.info("Seeing new schema. Source: {}, Target: {}", (Object)sourceStr, (Object)targetStr);
                    this.reInitWriteClient(newSourceSchema, newTargetSchema, inputBatch.getBatch(), metaClient);
                    if (newSourceSchema != null) {
                        this.processedSchema.addSchema(newSourceSchema);
                    }
                    if (newTargetSchema != null) {
                        this.processedSchema.addSchema(newTargetSchema);
                    }
                }
            }
            if (this.cfg.retryLastPendingInlineCompactionJob.booleanValue() && this.writeClient.getConfig().inlineCompactionEnabled()) {
                Option<String> pendingCompactionInstant = this.getLastPendingCompactionInstant(this.allCommitsTimelineOpt);
                if (pendingCompactionInstant.isPresent()) {
                    HoodieWriteMetadata writeMetadata = this.writeClient.compact(pendingCompactionInstant.get());
                    this.writeClient.commitCompaction(pendingCompactionInstant.get(), writeMetadata.getCommitMetadata().get(), Option.empty());
                    this.initializeMetaClientAndRefreshTimeline();
                    this.reInitWriteClient(this.schemaProvider.getSourceSchema(), this.schemaProvider.getTargetSchema(), null, metaClient);
                }
            } else if (this.cfg.retryLastPendingInlineClusteringJob.booleanValue() && this.writeClient.getConfig().inlineClusteringEnabled() && (pendingClusteringInstant = this.getLastPendingClusteringInstant(this.allCommitsTimelineOpt)).isPresent()) {
                this.writeClient.cluster(pendingClusteringInstant.get());
            }
            result2 = this.writeToSinkAndDoMetaSync(instantTime, inputBatch, useRowWriter, this.metrics, overallTimerContext);
        }
        if (this.schemaProvider != null) {
            this.schemaProvider.refresh();
        }
        this.metrics.updateStreamerSyncMetrics(System.currentTimeMillis());
        return result2;
    }

    private Option<String> getLastPendingClusteringInstant(Option<HoodieTimeline> commitTimelineOpt) {
        if (commitTimelineOpt.isPresent()) {
            Option<HoodieInstant> pendingClusteringInstant = commitTimelineOpt.get().getLastPendingClusterInstant();
            return pendingClusteringInstant.isPresent() ? Option.of(pendingClusteringInstant.get().requestedTime()) : Option.empty();
        }
        return Option.empty();
    }

    private Option<String> getLastPendingCompactionInstant(Option<HoodieTimeline> commitTimelineOpt) {
        if (commitTimelineOpt.isPresent()) {
            Option<HoodieInstant> pendingCompactionInstant = commitTimelineOpt.get().filterPendingCompactionTimeline().lastInstant();
            return pendingCompactionInstant.isPresent() ? Option.of(pendingCompactionInstant.get().requestedTime()) : Option.empty();
        }
        return Option.empty();
    }

    public Pair<InputBatch, Boolean> readFromSource(String instantTime, HoodieTableMetaClient metaClient) throws IOException {
        Option<Checkpoint> checkpointToResume = StreamerCheckpointUtils.getCheckpointToResumeFrom(this.commitsTimelineOpt, this.cfg, this.props);
        LOG.info("Checkpoint to resume from : " + checkpointToResume);
        int maxRetryCount = this.cfg.retryOnSourceFailures != false ? this.cfg.maxRetryCount : 1;
        int curRetryCount = 0;
        Pair<InputBatch, Boolean> sourceDataToSync = null;
        while (curRetryCount++ < maxRetryCount && sourceDataToSync == null) {
            try {
                sourceDataToSync = this.fetchFromSourceAndPrepareRecords(checkpointToResume, instantTime, metaClient);
            }
            catch (HoodieSourceTimeoutException e) {
                if (curRetryCount >= maxRetryCount) {
                    throw e;
                }
                try {
                    LOG.error("Exception thrown while fetching data from source. Msg : " + e.getMessage() + ", class : " + e.getClass() + ", cause : " + e.getCause());
                    LOG.error("Sleeping for " + this.cfg.retryIntervalSecs + " before retrying again. Current retry count " + curRetryCount + ", max retry count " + this.cfg.maxRetryCount);
                    Thread.sleep(this.cfg.retryIntervalSecs * 1000);
                }
                catch (InterruptedException ex) {
                    LOG.error("Ignoring InterruptedException while waiting to retry on source failure " + e.getMessage());
                }
            }
        }
        return sourceDataToSync;
    }

    private Pair<InputBatch, Boolean> fetchFromSourceAndPrepareRecords(Option<Checkpoint> resumeCheckpoint, String instantTime, HoodieTableMetaClient metaClient) {
        this.hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Fetching next batch: " + this.cfg.targetTableName);
        HoodieRecord.HoodieRecordType recordType = UtilHelpers.createRecordMerger(this.props).getRecordType();
        if (recordType == HoodieRecord.HoodieRecordType.SPARK && HoodieTableType.valueOf(this.cfg.tableType) == HoodieTableType.MERGE_ON_READ && !this.cfg.operation.equals((Object)WriteOperationType.BULK_INSERT) && HoodieLogBlock.HoodieLogBlockType.fromId(this.props.getProperty(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "avro")) != HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK) {
            throw new UnsupportedOperationException("Spark record only support parquet log.");
        }
        Pair<InputBatch, Boolean> inputBatchAndRowWriterEnabled = this.fetchNextBatchFromSource(resumeCheckpoint, metaClient);
        InputBatch inputBatch = inputBatchAndRowWriterEnabled.getLeft();
        boolean useRowWriter = inputBatchAndRowWriterEnabled.getRight();
        Checkpoint checkpoint = inputBatch.getCheckpointForNextBatch();
        SchemaProvider schemaProvider = inputBatch.getSchemaProvider();
        if (!this.cfg.allowCommitOnNoCheckpointChange.booleanValue() && checkpoint.equals(resumeCheckpoint.orElse(null))) {
            LOG.info("No new data, source checkpoint has not changed. Nothing to commit. Old checkpoint=(" + resumeCheckpoint + "). New Checkpoint=(" + checkpoint + ")");
            String commitActionType = CommitUtils.getCommitActionType(this.cfg.operation, HoodieTableType.valueOf(this.cfg.tableType));
            this.hoodieMetrics.updateMetricsForEmptyData(commitActionType);
            return null;
        }
        this.hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Checking if input is empty: " + this.cfg.targetTableName);
        if (useRowWriter) {
            return Pair.of(inputBatch, true);
        }
        Option<JavaRDD<HoodieRecord>> recordsOpt = HoodieStreamerUtils.createHoodieRecords(this.cfg, this.props, inputBatch.getBatch(), schemaProvider, recordType, this.autoGenerateRecordKeys, instantTime, this.errorTableWriter);
        return Pair.of(new InputBatch<JavaRDD<HoodieRecord>>(recordsOpt, checkpoint, schemaProvider), false);
    }

    @VisibleForTesting
    boolean canUseRowWriter(Schema targetSchema) {
        boolean rowWriterEnabled = this.isRowWriterEnabled();
        return rowWriterEnabled && targetSchema != null && HoodieDataTypeUtils.canUseRowWriter(targetSchema, this.conf);
    }

    @VisibleForTesting
    boolean isRowWriterEnabled() {
        return this.cfg.operation == WriteOperationType.BULK_INSERT && this.formatAdapter.getSource().getSourceType() == Source.SourceType.ROW && this.props.getBoolean("hoodie.streamer.write.row.writer.enable", false);
    }

    @VisibleForTesting
    Pair<InputBatch, Boolean> fetchNextBatchFromSource(Option<Checkpoint> resumeCheckpoint, HoodieTableMetaClient metaClient) {
        Option<JavaRDD> avroRDDOptional = null;
        Checkpoint checkpoint = null;
        SchemaProvider schemaProvider = null;
        InputBatch<Dataset<Row>> inputBatchForWriter = null;
        boolean reconcileSchema = this.props.getBoolean(DataSourceWriteOptions.RECONCILE_SCHEMA().key());
        if (this.transformer.isPresent()) {
            InputBatch<Dataset<Row>> dataAndCheckpoint = this.formatAdapter.fetchNewDataInRowFormat(resumeCheckpoint, this.cfg.sourceLimit);
            Option<Dataset<Row>> transformed = dataAndCheckpoint.getBatch().map(data2 -> this.transformer.get().apply(this.hoodieSparkContext.jsc(), this.sparkSession, (Dataset<Row>)data2, this.props));
            transformed = this.formatAdapter.processErrorEvents(transformed, ErrorEvent.ErrorReason.CUSTOM_TRANSFORMER_FAILURE);
            checkpoint = dataAndCheckpoint.getCheckpointForNextBatch();
            if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetSchema() != null && this.userProvidedSchemaProvider.getTargetSchema() != InputBatch.NULL_SCHEMA) {
                schemaProvider = this.getDeducedSchemaProvider(this.userProvidedSchemaProvider.getTargetSchema(), this.userProvidedSchemaProvider, metaClient);
                boolean useRowWriter = this.canUseRowWriter(schemaProvider.getTargetSchema());
                if (useRowWriter) {
                    inputBatchForWriter = new InputBatch<Dataset<Row>>(transformed, checkpoint, schemaProvider);
                } else {
                    SchemaProvider finalSchemaProvider = schemaProvider;
                    avroRDDOptional = this.errorTableWriter.isPresent() && this.props.getBoolean(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(), HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.defaultValue()) ? transformed.map(rowDataset -> {
                        Tuple2<RDD<GenericRecord>, RDD<String>> safeCreateRDDs = HoodieSparkUtils.safeCreateRDD((Dataset<Row>)rowDataset, "hoodie_source", "hoodie.source", reconcileSchema, Option.of(finalSchemaProvider.getTargetSchema()));
                        this.errorTableWriter.get().addErrorEvents(((RDD)safeCreateRDDs._2()).toJavaRDD().map((Function & Serializable)evStr -> new ErrorEvent<String>((String)evStr, ErrorEvent.ErrorReason.AVRO_DESERIALIZATION_FAILURE)));
                        return ((RDD)safeCreateRDDs._1).toJavaRDD();
                    }) : transformed.map(rowDataset -> this.getTransformedRDD((Dataset<Row>)rowDataset, reconcileSchema, finalSchemaProvider.getTargetSchema()));
                }
            } else {
                Schema incomingSchema = transformed.map(df -> AvroConversionUtils.convertStructTypeToAvroSchema((DataType)df.schema(), AvroSchemaUtils.getAvroRecordQualifiedName(this.cfg.targetTableName))).orElseGet(dataAndCheckpoint.getSchemaProvider()::getTargetSchema);
                schemaProvider = this.getDeducedSchemaProvider(incomingSchema, dataAndCheckpoint.getSchemaProvider(), metaClient);
                if (this.canUseRowWriter(schemaProvider.getTargetSchema())) {
                    inputBatchForWriter = new InputBatch<Dataset<Row>>(transformed, checkpoint, schemaProvider);
                } else {
                    SchemaProvider finalSchemaProvider = schemaProvider;
                    avroRDDOptional = transformed.map(t -> this.getTransformedRDD((Dataset<Row>)t, reconcileSchema, finalSchemaProvider.getTargetSchema()));
                }
            }
        } else {
            if (this.isRowWriterEnabled()) {
                InputBatch<Dataset<Row>> inputBatchNeedsDeduceSchema = this.formatAdapter.fetchNewDataInRowFormat(resumeCheckpoint, this.cfg.sourceLimit);
                if (this.canUseRowWriter(inputBatchNeedsDeduceSchema.getSchemaProvider().getTargetSchema())) {
                    inputBatchForWriter = new InputBatch<Dataset<Row>>(inputBatchNeedsDeduceSchema.getBatch(), inputBatchNeedsDeduceSchema.getCheckpointForNextBatch(), this.getDeducedSchemaProvider(inputBatchNeedsDeduceSchema.getSchemaProvider().getTargetSchema(), inputBatchNeedsDeduceSchema.getSchemaProvider(), metaClient));
                } else {
                    LOG.warn("Row-writer is enabled but cannot be used due to the target schema");
                }
            }
            if (inputBatchForWriter == null) {
                InputBatch<JavaRDD<GenericRecord>> dataAndCheckpoint = this.formatAdapter.fetchNewDataInAvroFormat(resumeCheckpoint, this.cfg.sourceLimit);
                checkpoint = dataAndCheckpoint.getCheckpointForNextBatch();
                schemaProvider = this.getDeducedSchemaProvider(dataAndCheckpoint.getSchemaProvider().getTargetSchema(), dataAndCheckpoint.getSchemaProvider(), metaClient);
                String serializedTargetSchema = schemaProvider.getTargetSchema().toString();
                avroRDDOptional = this.errorTableWriter.isPresent() && this.props.getBoolean(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(), HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.defaultValue()) ? dataAndCheckpoint.getBatch().map(records -> {
                    Tuple2<RDD<GenericRecord>, RDD<String>> safeCreateRDDs = HoodieSparkUtils.safeRewriteRDD((RDD<GenericRecord>)records.rdd(), serializedTargetSchema);
                    this.errorTableWriter.get().addErrorEvents(((RDD)safeCreateRDDs._2()).toJavaRDD().map((Function & Serializable)evStr -> new ErrorEvent<String>((String)evStr, ErrorEvent.ErrorReason.INVALID_RECORD_SCHEMA)));
                    return ((RDD)safeCreateRDDs._1).toJavaRDD();
                }) : dataAndCheckpoint.getBatch().map(t -> t.mapPartitions((FlatMapFunction & Serializable)iterator2 -> new LazyCastingIterator((Iterator<GenericRecord>)iterator2, serializedTargetSchema)));
            }
        }
        if (inputBatchForWriter != null) {
            return Pair.of(inputBatchForWriter, true);
        }
        return Pair.of(new InputBatch(avroRDDOptional, checkpoint, schemaProvider), false);
    }

    @VisibleForTesting
    SchemaProvider getDeducedSchemaProvider(Schema incomingSchema, SchemaProvider sourceSchemaProvider, HoodieTableMetaClient metaClient) {
        Option<Schema> latestTableSchemaOpt = UtilHelpers.getLatestTableSchema(this.hoodieSparkContext.jsc(), this.storage, this.cfg.targetBasePath, metaClient);
        Option<InternalSchema> internalSchemaOpt = HoodieConversionUtils.toJavaOption(HoodieSchemaUtils.getLatestTableInternalSchema(new HoodieConfig(HoodieStreamer.Config.getProps(this.conf, this.cfg)), metaClient));
        Schema targetSchema = HoodieSchemaUtils.deduceWriterSchema(HoodieAvroUtils.removeMetadataFields(incomingSchema), latestTableSchemaOpt, internalSchemaOpt, this.props);
        return new DelegatingSchemaProvider(this.props, this.hoodieSparkContext.jsc(), sourceSchemaProvider, new SimpleSchemaProvider(this.hoodieSparkContext.jsc(), targetSchema, this.props));
    }

    private JavaRDD<GenericRecord> getTransformedRDD(Dataset<Row> rowDataset, boolean reconcileSchema, Schema readerSchema) {
        return HoodieSparkUtils.createRdd(rowDataset, "hoodie_source", "hoodie.source", reconcileSchema, Option.ofNullable(readerSchema)).toJavaRDD();
    }

    private HoodieWriteConfig prepareHoodieConfigForRowWriter(Schema writerSchema) {
        HoodieConfig hoodieConfig = new HoodieConfig(HoodieStreamer.Config.getProps(this.conf, this.cfg));
        hoodieConfig.setValue(DataSourceWriteOptions.TABLE_TYPE(), this.cfg.tableType);
        hoodieConfig.setValue(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key(), this.cfg.payloadClassName);
        hoodieConfig.setValue(DataSourceWriteOptions.RECORD_MERGE_MODE().key(), this.cfg.recordMergeMode.name());
        hoodieConfig.setValue(DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID().key(), this.cfg.recordMergeStrategyId);
        hoodieConfig.setValue(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName(this.props));
        hoodieConfig.setValue("path", this.cfg.targetBasePath);
        return HoodieSparkSqlWriter.getBulkInsertRowConfig(writerSchema != InputBatch.NULL_SCHEMA ? Option.of(writerSchema) : Option.empty(), hoodieConfig, this.cfg.targetBasePath, this.cfg.targetTableName);
    }

    /*
     * WARNING - void declaration
     * Enabled aggressive block sorting
     */
    private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(String instantTime, InputBatch inputBatch, boolean useRowWriter, HoodieIngestionMetrics metrics, Timer.Context overallTimerContext) {
        void var6_8;
        boolean success;
        Map checkpointCommitMetadata;
        boolean hasErrors;
        Option option = Option.empty();
        WriteClientWriteResult writeClientWriteResult = this.writeToSink(inputBatch, instantTime, useRowWriter);
        JavaRDD<WriteStatus> writeStatusRDD = writeClientWriteResult.getWriteStatusRDD();
        Map<String, List<String>> partitionToReplacedFileIds = writeClientWriteResult.getPartitionToReplacedFileIds();
        long totalErrorRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue();
        long totalRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalRecords).sum().longValue();
        long totalSuccessfulRecords = totalRecords - totalErrorRecords;
        LOG.info(String.format("instantTime=%s, totalRecords=%d, totalErrorRecords=%d, totalSuccessfulRecords=%d", instantTime, totalRecords, totalErrorRecords, totalSuccessfulRecords));
        if (totalRecords == 0L) {
            LOG.info("No new data, perform empty commit.");
        }
        boolean bl = hasErrors = totalErrorRecords > 0L;
        if (hasErrors && !this.cfg.commitOnErrors.booleanValue()) {
            LOG.error("Delta Sync found errors when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
            LOG.error("Printing out the top 100 errors");
            writeStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws -> {
                LOG.error("Global error :", ws.getGlobalError());
                if (ws.getErrors().size() > 0) {
                    ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " is " + value));
                }
            });
            this.writeClient.rollback(instantTime);
            throw new HoodieStreamerWriteException("Commit " + instantTime + " failed and rolled-back !");
        }
        Map<Object, Object> map2 = !ConfigUtils.getBooleanWithAltKeys(this.props, HoodieStreamerConfig.CHECKPOINT_FORCE_SKIP) ? (inputBatch.getCheckpointForNextBatch() != null ? inputBatch.getCheckpointForNextBatch().getCheckpointCommitMetadata(this.cfg.checkpoint, this.cfg.ignoreCheckpoint) : new StreamerCheckpointV2((String)null).getCheckpointCommitMetadata(this.cfg.checkpoint, this.cfg.ignoreCheckpoint)) : (checkpointCommitMetadata = Collections.emptyMap());
        if (hasErrors) {
            LOG.warn("Some records failed to be merged but forcing commit since commitOnErrors set. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
        }
        String commitActionType = CommitUtils.getCommitActionType(this.cfg.operation, HoodieTableType.valueOf(this.cfg.tableType));
        if (this.errorTableWriter.isPresent()) {
            Option<String> commitedInstantTime = StreamerCheckpointUtils.getLatestInstantWithValidCheckpointInfo(this.commitsTimelineOpt);
            boolean errorTableSuccess = this.errorTableWriter.get().upsertAndCommit(instantTime, commitedInstantTime);
            if (!errorTableSuccess) {
                switch (this.errorWriteFailureStrategy) {
                    case ROLLBACK_COMMIT: {
                        LOG.info("Commit " + instantTime + " failed!");
                        this.writeClient.rollback(instantTime);
                        throw new HoodieStreamerWriteException("Error table commit failed");
                    }
                    case LOG_ERROR: {
                        LOG.error("Error Table write failed for instant " + instantTime);
                        break;
                    }
                    default: {
                        throw new HoodieStreamerWriteException("Write failure strategy not implemented for " + (Object)((Object)this.errorWriteFailureStrategy));
                    }
                }
            }
        }
        if (!(success = this.writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, partitionToReplacedFileIds, Option.empty()))) {
            LOG.info("Commit " + instantTime + " failed!");
            throw new HoodieStreamerWriteException("Commit " + instantTime + " failed!");
        }
        LOG.info("Commit " + instantTime + " successful!");
        this.formatAdapter.getSource().onCommit(inputBatch.getCheckpointForNextBatch() != null ? inputBatch.getCheckpointForNextBatch().getCheckpointKey() : null);
        if (this.cfg.isAsyncCompactionEnabled()) {
            Option<String> option2 = this.writeClient.scheduleCompaction(Option.empty());
        }
        if (totalSuccessfulRecords > 0L || this.cfg.forceEmptyMetaSync.booleanValue()) {
            this.runMetaSync();
        } else {
            LOG.info(String.format("Not running metaSync totalSuccessfulRecords=%d", totalSuccessfulRecords));
        }
        long overallTimeNanos = overallTimerContext != null ? overallTimerContext.stop() : 0L;
        metrics.updateStreamerMetrics(overallTimeNanos);
        return Pair.of(var6_8, writeStatusRDD);
    }

    private String startCommit(String instantTime, boolean retryEnabled) {
        int maxRetries = 2;
        int retryNum = 1;
        IllegalArgumentException lastException = null;
        while (retryNum <= 2) {
            try {
                String commitActionType = CommitUtils.getCommitActionType(this.cfg.operation, HoodieTableType.valueOf(this.cfg.tableType));
                this.writeClient.startCommitWithTime(instantTime, commitActionType);
                return instantTime;
            }
            catch (IllegalArgumentException ie) {
                lastException = ie;
                if (!retryEnabled) {
                    throw ie;
                }
                LOG.error("Got error trying to start a new commit. Retrying after sleeping for a sec", (Throwable)ie);
                ++retryNum;
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                instantTime = this.writeClient.createNewInstantTime();
            }
        }
        throw lastException;
    }

    private WriteClientWriteResult writeToSink(InputBatch inputBatch, String instantTime, boolean useRowWriter) {
        WriteClientWriteResult writeClientWriteResult = null;
        instantTime = this.startCommit(instantTime, !this.autoGenerateRecordKeys);
        if (useRowWriter) {
            Dataset df = (Dataset)inputBatch.getBatch().orElseGet(() -> this.hoodieSparkContext.getSqlContext().emptyDataFrame());
            HoodieWriteConfig hoodieWriteConfig = this.prepareHoodieConfigForRowWriter(inputBatch.getSchemaProvider().getTargetSchema());
            HoodieStreamerDatasetBulkInsertCommitActionExecutor executor = new HoodieStreamerDatasetBulkInsertCommitActionExecutor(hoodieWriteConfig, this.writeClient, instantTime);
            writeClientWriteResult = new WriteClientWriteResult(executor.execute((Dataset<Row>)df, !HoodieStreamerUtils.getPartitionColumns(this.props).isEmpty()).getWriteStatuses());
        } else {
            JavaRDD<HoodieRecord> records = (JavaRDD<HoodieRecord>)inputBatch.getBatch().orElseGet(() -> this.hoodieSparkContext.emptyRDD());
            if (this.cfg.filterDupes.booleanValue()) {
                records = DataSourceUtils.dropDuplicates(this.hoodieSparkContext, records, this.writeClient.getConfig());
            }
            HoodieWriteResult writeResult = null;
            switch (this.cfg.operation) {
                case INSERT: {
                    writeClientWriteResult = new WriteClientWriteResult(this.writeClient.insert(records, instantTime));
                    break;
                }
                case UPSERT: {
                    writeClientWriteResult = new WriteClientWriteResult(this.writeClient.upsert(records, instantTime));
                    break;
                }
                case BULK_INSERT: {
                    writeClientWriteResult = new WriteClientWriteResult(this.writeClient.bulkInsert(records, instantTime, DataSourceUtils.createUserDefinedBulkInsertPartitioner(this.writeClient.getConfig())));
                    break;
                }
                case INSERT_OVERWRITE: {
                    writeResult = this.writeClient.insertOverwrite(records, instantTime);
                    writeClientWriteResult = new WriteClientWriteResult(writeResult.getWriteStatuses());
                    writeClientWriteResult.setPartitionToReplacedFileIds(writeResult.getPartitionToReplaceFileIds());
                    break;
                }
                case INSERT_OVERWRITE_TABLE: {
                    writeResult = this.writeClient.insertOverwriteTable(records, instantTime);
                    writeClientWriteResult = new WriteClientWriteResult(writeResult.getWriteStatuses());
                    writeClientWriteResult.setPartitionToReplacedFileIds(writeResult.getPartitionToReplaceFileIds());
                    break;
                }
                case DELETE_PARTITION: {
                    List partitions = records.map((Function & Serializable)record -> record.getPartitionPath()).distinct().collect();
                    writeResult = this.writeClient.deletePartitions(partitions, instantTime);
                    writeClientWriteResult = new WriteClientWriteResult(writeResult.getWriteStatuses());
                    writeClientWriteResult.setPartitionToReplacedFileIds(writeResult.getPartitionToReplaceFileIds());
                    break;
                }
                default: {
                    throw new HoodieStreamerException("Unknown operation : " + (Object)((Object)this.cfg.operation));
                }
            }
        }
        return writeClientWriteResult;
    }

    private String getSyncClassShortName(String syncClassName) {
        return syncClassName.substring(syncClassName.lastIndexOf(".") + 1);
    }

    public void runMetaSync() {
        List syncClientToolClasses = Arrays.stream(this.cfg.syncClientToolClassNames.split(",")).distinct().collect(Collectors.toList());
        if (this.cfg.enableHiveSync.booleanValue()) {
            this.cfg.enableMetaSync = true;
            syncClientToolClasses.add(HiveSyncTool.class.getName());
            LOG.info("When set --enable-hive-sync will use HiveSyncTool for backward compatibility");
        }
        if (this.cfg.enableMetaSync.booleanValue() && !syncClientToolClasses.isEmpty()) {
            HoodieTableMetaClient metaClient;
            LOG.debug("[MetaSync] Starting sync");
            try {
                metaClient = this.initializeMetaClient();
            }
            catch (IOException ex) {
                throw new HoodieIOException("Failed to load meta client", ex);
            }
            FileSystem fs = HadoopFSUtils.getFs(this.cfg.targetBasePath, this.hoodieSparkContext.hadoopConfiguration());
            TypedProperties metaProps = new TypedProperties();
            metaProps.putAll((Map<?, ?>)this.props);
            metaProps.putAll((Map<?, ?>)this.writeClient.getConfig().getProps());
            if (this.props.getBoolean(HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC.key(), HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC.defaultValue())) {
                metaProps.put(HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC.key(), HiveSyncConfig.getBucketSpec(this.props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key()), this.props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key())));
            }
            if (this.writeClient.getConfig().getViewStorageConfig() != null) {
                metaProps.putAll((Map<?, ?>)this.writeClient.getConfig().getViewStorageConfig().getProps());
            }
            HashMap<String, HoodieException> failedMetaSyncs = new HashMap<String, HoodieException>();
            for (String impl : syncClientToolClasses) {
                if (impl.trim().isEmpty()) {
                    LOG.warn("Cannot run MetaSync with empty class name");
                    continue;
                }
                Timer.Context syncContext = this.metrics.getMetaSyncTimerContext();
                Option<HoodieMetaSyncException> metaSyncException = Option.empty();
                try {
                    SyncUtilHelpers.runHoodieMetaSync(impl.trim(), metaProps, this.conf, fs, this.cfg.targetBasePath, this.cfg.baseFileFormat, Option.of(metaClient));
                }
                catch (HoodieMetaSyncException e) {
                    metaSyncException = Option.of(e);
                }
                this.logMetaSync(impl, syncContext, failedMetaSyncs, metaSyncException);
            }
            if (!failedMetaSyncs.isEmpty()) {
                throw SyncUtilHelpers.getHoodieMetaSyncException(failedMetaSyncs);
            }
        }
    }

    private void logMetaSync(String impl, Timer.Context syncContext, Map<String, HoodieException> failedMetaSyncs, Option<HoodieMetaSyncException> metaSyncException) {
        long metaSyncTimeNanos = syncContext != null ? syncContext.stop() : 0L;
        this.metrics.updateStreamerMetaSyncMetrics(this.getSyncClassShortName(impl), metaSyncTimeNanos);
        long timeMs = metaSyncTimeNanos / 1000000L;
        String timeString = String.format("and took %d s %d ms ", timeMs / 1000L, timeMs % 1000L);
        if (metaSyncException.isPresent()) {
            LOG.error("[MetaSync] SyncTool class {} failed with exception {} {}", new Object[]{impl.trim(), metaSyncException.get(), timeString});
            failedMetaSyncs.put(impl, metaSyncException.get());
        } else {
            LOG.info("[MetaSync] SyncTool class {} completed successfully {}", (Object)impl.trim(), (Object)timeString);
        }
    }

    private void setupWriteClient(Option<JavaRDD<HoodieRecord>> recordsOpt, HoodieTableMetaClient metaClient) throws IOException {
        if (null != this.schemaProvider) {
            Schema sourceSchema = this.schemaProvider.getSourceSchema();
            Schema targetSchema = this.schemaProvider.getTargetSchema();
            this.reInitWriteClient(sourceSchema, targetSchema, recordsOpt, metaClient);
        }
    }

    private void reInitWriteClient(Schema sourceSchema, Schema targetSchema, Option<JavaRDD<HoodieRecord>> recordsOpt, HoodieTableMetaClient metaClient) throws IOException {
        LOG.info("Setting up new Hoodie Write Client");
        if (HoodieStreamerUtils.isDropPartitionColumns(this.props).booleanValue()) {
            targetSchema = HoodieAvroUtils.removeFields(targetSchema, HoodieStreamerUtils.getPartitionColumns(this.props));
        }
        Pair<HoodieWriteConfig, Schema> initialWriteConfigAndSchema = this.getHoodieClientConfigAndWriterSchema(targetSchema, true, metaClient);
        HoodieWriteConfig initialWriteConfig = initialWriteConfigAndSchema.getLeft();
        this.registerAvroSchemas(sourceSchema, initialWriteConfigAndSchema.getRight());
        HoodieWriteConfig writeConfig = SparkSampleWritesUtils.getWriteConfigWithRecordSizeEstimate(this.hoodieSparkContext.jsc(), recordsOpt, initialWriteConfig).orElse(initialWriteConfig);
        if (writeConfig.isEmbeddedTimelineServerEnabled()) {
            if (!this.embeddedTimelineService.isPresent()) {
                this.embeddedTimelineService = Option.of(EmbeddedTimelineServerHelper.createEmbeddedTimelineService(this.hoodieSparkContext, writeConfig));
            } else {
                EmbeddedTimelineServerHelper.updateWriteConfigWithTimelineServer(this.embeddedTimelineService.get(), writeConfig);
            }
        }
        if (this.writeClient != null) {
            this.writeClient.close();
        }
        this.writeClient = new SparkRDDWriteClient((HoodieEngineContext)this.hoodieSparkContext, writeConfig, this.embeddedTimelineService);
        this.onInitializingHoodieWriteClient.apply(this.writeClient);
    }

    private HoodieWriteConfig getHoodieClientConfig() {
        return this.getHoodieClientConfigAndWriterSchema(null, false, null).getLeft();
    }

    private Pair<HoodieWriteConfig, Schema> getHoodieClientConfigAndWriterSchema(Schema schema, boolean requireSchemaInConfig, HoodieTableMetaClient metaClient) {
        Schema returnSchema;
        boolean combineBeforeUpsert = true;
        boolean autoCommit = false;
        HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder().withPath(this.cfg.targetBasePath).combineInput(this.cfg.filterDupes, true).withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(this.cfg.isInlineCompactionEnabled()).build()).withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadClass(this.cfg.payloadClassName).withPayloadOrderingField(this.cfg.sourceOrderingField).build()).withRecordMergeMode(this.cfg.recordMergeMode).withRecordMergeStrategyId(this.cfg.recordMergeStrategyId).withRecordMergeImplClasses(this.cfg.recordMergeImplClasses).forTable(this.cfg.targetTableName).withAutoCommit(false).withProps(this.props);
        if (requireSchemaInConfig) {
            returnSchema = this.getSchemaForWriteConfig(schema, metaClient);
            builder.withSchema(returnSchema.toString());
        } else {
            returnSchema = schema;
        }
        HoodieWriteConfig config = builder.build();
        if (config.writeCommitCallbackOn()) {
            if (HoodieWriteCommitKafkaCallback.class.getName().equals(config.getCallbackClass())) {
                HoodieWriteCommitKafkaCallbackConfig.setCallbackKafkaConfigIfNeeded(config);
            }
            if (HoodieWriteCommitPulsarCallback.class.getName().equals(config.getCallbackClass())) {
                HoodieWriteCommitPulsarCallbackConfig.setCallbackPulsarConfigIfNeeded(config);
            }
        }
        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.from(this.props);
        ValidationUtils.checkArgument(config.inlineCompactionEnabled() == this.cfg.isInlineCompactionEnabled(), String.format("%s should be set to %s", HoodieCompactionConfig.INLINE_COMPACT.key(), this.cfg.isInlineCompactionEnabled()));
        ValidationUtils.checkArgument(config.inlineClusteringEnabled() == clusteringConfig.isInlineClusteringEnabled(), String.format("%s should be set to %s", HoodieClusteringConfig.INLINE_CLUSTERING.key(), clusteringConfig.isInlineClusteringEnabled()));
        ValidationUtils.checkArgument(config.isAsyncClusteringEnabled() == clusteringConfig.isAsyncClusteringEnabled(), String.format("%s should be set to %s", HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE.key(), clusteringConfig.isAsyncClusteringEnabled()));
        ValidationUtils.checkArgument(config.shouldAutoCommit() == false, String.format("%s should be set to %s", HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), false));
        ValidationUtils.checkArgument(config.shouldCombineBeforeInsert() == this.cfg.filterDupes.booleanValue(), String.format("%s should be set to %s", HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(), this.cfg.filterDupes));
        ValidationUtils.checkArgument(config.shouldCombineBeforeUpsert(), String.format("%s should be set to %s", HoodieWriteConfig.COMBINE_BEFORE_UPSERT.key(), true));
        return Pair.of(config, returnSchema);
    }

    private Schema getSchemaForWriteConfig(Schema targetSchema, HoodieTableMetaClient metaClient) {
        Schema newWriteSchema = targetSchema;
        try {
            int totalCompleted;
            if ((targetSchema == null || SchemaCompatibility.checkReaderWriterCompatibility((Schema)targetSchema, (Schema)InputBatch.NULL_SCHEMA).getType() == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE && SchemaCompatibility.checkReaderWriterCompatibility((Schema)InputBatch.NULL_SCHEMA, (Schema)targetSchema).getType() == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE) && (totalCompleted = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants()) > 0) {
                TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
                Option<Schema> tableSchema = schemaResolver.getTableAvroSchemaIfPresent(false);
                if (tableSchema.isPresent()) {
                    newWriteSchema = tableSchema.get();
                } else {
                    LOG.warn("Could not fetch schema from table. Falling back to using target schema from schema provider");
                }
            }
            return newWriteSchema;
        }
        catch (Exception e) {
            throw new HoodieSchemaFetchException("Failed to fetch schema from table", e);
        }
    }

    private void registerAvroSchemas(SchemaProvider schemaProvider) {
        if (null != schemaProvider) {
            this.registerAvroSchemas(schemaProvider.getSourceSchema(), schemaProvider.getTargetSchema());
        }
    }

    private void registerAvroSchemas(Schema sourceSchema, Schema targetSchema) {
        ArrayList<Schema> schemas = new ArrayList<Schema>();
        if (sourceSchema != null) {
            schemas.add(sourceSchema);
        }
        if (targetSchema != null) {
            schemas.add(targetSchema);
        }
        if (!schemas.isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Registering Schema: " + schemas);
            }
            this.hoodieSparkContext.getJavaSparkContext().sc().getConf().registerAvroSchemas((Seq)JavaScalaConverters.convertJavaListToScalaList(schemas).toList());
        }
    }

    @Override
    public void close() {
        if (this.writeClient != null) {
            this.writeClient.close();
            this.writeClient = null;
        }
        if (this.formatAdapter != null) {
            this.formatAdapter.close();
        }
        LOG.info("Shutting down embedded timeline server");
        if (this.embeddedTimelineService.isPresent()) {
            this.embeddedTimelineService.get().stopForBasePath(this.cfg.targetBasePath);
        }
        if (this.metrics != null) {
            this.metrics.shutdown();
        }
    }

    public HoodieStorage getStorage() {
        return this.storage;
    }

    public TypedProperties getProps() {
        return this.props;
    }

    public HoodieStreamer.Config getCfg() {
        return this.cfg;
    }

    public Option<HoodieTimeline> getCommitsTimelineOpt() {
        return this.commitsTimelineOpt;
    }

    public HoodieIngestionMetrics getMetrics() {
        return this.metrics;
    }

    public Option<String> getClusteringInstantOpt() {
        if (this.writeClient != null) {
            return this.writeClient.scheduleClustering(Option.empty());
        }
        return Option.empty();
    }

    class WriteClientWriteResult {
        private Map<String, List<String>> partitionToReplacedFileIds = Collections.emptyMap();
        private JavaRDD<WriteStatus> writeStatusRDD;

        public WriteClientWriteResult(JavaRDD<WriteStatus> writeStatusRDD) {
            this.writeStatusRDD = writeStatusRDD;
        }

        public Map<String, List<String>> getPartitionToReplacedFileIds() {
            return this.partitionToReplacedFileIds;
        }

        public void setPartitionToReplacedFileIds(Map<String, List<String>> partitionToReplacedFileIds) {
            this.partitionToReplacedFileIds = partitionToReplacedFileIds;
        }

        public JavaRDD<WriteStatus> getWriteStatusRDD() {
            return this.writeStatusRDD;
        }
    }
}

