/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.streamer;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.HoodieConversionUtils;
import org.apache.hudi.HoodieWriterUtils;
import org.apache.hudi.async.AsyncClusteringService;
import org.apache.hudi.async.AsyncCompactService;
import org.apache.hudi.async.SparkAsyncClusteringService;
import org.apache.hudi.async.SparkAsyncCompactService;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.OperationConverter;
import org.apache.hudi.com.beust.jcommander.JCommander;
import org.apache.hudi.com.beust.jcommander.Parameter;
import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieClusteringUpdateException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;
import org.apache.hudi.utilities.ingestion.HoodieIngestionException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.ingestion.HoodieIngestionService;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.JsonDFSSource;
import org.apache.hudi.utilities.streamer.BootstrapExecutor;
import org.apache.hudi.utilities.streamer.ConfigurationHotUpdateStrategy;
import org.apache.hudi.utilities.streamer.ConfigurationHotUpdateStrategyUtils;
import org.apache.hudi.utilities.streamer.HoodieStreamerMetrics;
import org.apache.hudi.utilities.streamer.PostWriteTerminationStrategy;
import org.apache.hudi.utilities.streamer.SchedulerConfGenerator;
import org.apache.hudi.utilities.streamer.StreamSync;
import org.apache.hudi.utilities.streamer.TerminationStrategyUtils;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HoodieStreamer
implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(HoodieStreamer.class);
    // Fallback list of sensitive key fragments: the default value of SENSITIVE_CONFIG_KEYS_FILTER split on ",".
    // toSortedTruncatedString uses it when the properties do not set that filter themselves.
    private static final List<String> DEFAULT_SENSITIVE_CONFIG_KEYS = Arrays.asList(HoodieWriteConfig.SENSITIVE_CONFIG_KEYS_FILTER.defaultValue().split(","));
    // Text logged in place of the value of any config key that matches a sensitive fragment.
    private static final String SENSITIVE_VALUES_MASKED = "SENSITIVE_INFO_MASKED";
    // NOTE(review): checkpoint-related keys. They are not referenced in this part of the file; presumably they
    // name metadata entries used by StreamSync for checkpoint tracking/reset. Confirm against StreamSync.
    public static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
    public static final String CHECKPOINT_RESET_KEY = "deltastreamer.checkpoint.reset_key";
    // Parsed CLI configuration. Marked transient, so it is not written by Java serialization.
    protected final transient Config cfg;
    // Effective properties produced by combineProperties in the primary constructor.
    private final TypedProperties properties;
    // Present only when cfg.runBootstrap is false (see the primary constructor).
    protected transient Option<HoodieIngestionService> ingestionService;
    // Present only when cfg.runBootstrap is true (see the primary constructor).
    private final Option<BootstrapExecutor> bootstrapExecutor;
    // Spark scheduler pool name set on the engine context by StreamSyncService when async compaction is enabled.
    public static final String STREAMSYNC_POOL_NAME = "hoodiedeltasync";

    /**
     * Creates a streamer with no property override.
     *
     * <p>Properties are resolved from {@code cfg}. The target file system and the Hadoop configuration
     * are both derived from {@code jssc}.
     *
     * @param cfg  parsed streamer configuration
     * @param jssc active Spark context
     * @throws IOException propagated from the delegate constructor
     */
    public HoodieStreamer(Config cfg, JavaSparkContext jssc) throws IOException {
        this(cfg, jssc, Option.empty());
    }

    /**
     * Creates a streamer, optionally overriding the properties that would otherwise be read via {@code cfg}.
     *
     * <p>The file system for {@code cfg.targetBasePath} and the Hadoop configuration both come from
     * {@code jssc}.
     *
     * @param cfg   parsed streamer configuration
     * @param jssc  active Spark context
     * @param props properties to use instead of loading them from {@code cfg}, if present
     * @throws IOException propagated from file system resolution or the delegate constructor
     */
    public HoodieStreamer(Config cfg, JavaSparkContext jssc, Option<TypedProperties> props) throws IOException {
        this(
            cfg,
            jssc,
            FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()),
            jssc.hadoopConfiguration(),
            props);
    }

    /**
     * Creates a streamer on an explicitly supplied file system and Hadoop configuration.
     *
     * <p>No property override is used, so properties are resolved from {@code cfg}.
     *
     * @param cfg  parsed streamer configuration
     * @param jssc active Spark context
     * @param fs   file system hosting the target table
     * @param conf Hadoop configuration handed to the bootstrap executor or ingestion service
     * @throws IOException propagated from the delegate constructor
     */
    public HoodieStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf) throws IOException {
        this(cfg, jssc, fs, conf, Option.<TypedProperties>empty());
    }

    /**
     * Primary constructor.
     *
     * <p>Resolves the effective properties. If an initial checkpoint provider is configured and no
     * checkpoint was given, it fills {@code cfg.checkpoint} from that provider. It then wires up exactly
     * one of the bootstrap executor ({@code cfg.runBootstrap}) or the stream-sync ingestion service.
     *
     * @param cfg           parsed streamer configuration; {@code cfg.checkpoint} may be assigned here
     * @param jssc          active Spark context
     * @param fs            file system hosting the target table
     * @param conf          Hadoop configuration
     * @param propsOverride properties to use instead of loading them from {@code cfg}, if present
     * @throws IOException propagated from bootstrap or ingestion-service construction
     */
    public HoodieStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf, Option<TypedProperties> propsOverride) throws IOException {
        this.properties = HoodieStreamer.combineProperties(cfg, propsOverride, jssc.hadoopConfiguration());
        boolean needsInitialCheckpoint = cfg.initialCheckpointProvider != null && cfg.checkpoint == null;
        if (needsInitialCheckpoint) {
            InitialCheckPointProvider provider = UtilHelpers.createInitialCheckpointProvider(cfg.initialCheckpointProvider, this.properties);
            provider.init(conf);
            cfg.checkpoint = provider.getCheckpoint();
        }
        this.cfg = cfg;
        if (cfg.runBootstrap) {
            this.bootstrapExecutor = Option.of(new BootstrapExecutor(cfg, jssc, fs, conf, this.properties));
        } else {
            this.bootstrapExecutor = Option.empty();
        }
        // The engine context is always created, between the two branches, preserving construction order.
        HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jssc);
        if (cfg.runBootstrap) {
            this.ingestionService = Option.empty();
        } else {
            this.ingestionService = Option.of(new StreamSyncService(cfg, engineContext, fs, conf, Option.ofNullable(this.properties)));
        }
    }

    /**
     * Builds the effective streamer properties.
     *
     * <p>The source is chosen in this order: the override when present; otherwise the {@code --hoodie-conf}
     * values alone when {@code --props} still points at the default path; otherwise the properties file
     * merged with {@code --hoodie-conf}. On top of that, the reconcile-schema default is applied,
     * lock-config auto adjustment is forced on, and the table type is pinned for MERGE_ON_READ.
     *
     * @param cfg           parsed streamer configuration
     * @param propsOverride properties that replace any file-based loading, if present
     * @param hadoopConf    Hadoop configuration used to read the properties file
     * @return the combined properties
     */
    private static TypedProperties combineProperties(Config cfg, Option<TypedProperties> propsOverride, Configuration hadoopConf) {
        HoodieConfig merged = new HoodieConfig();
        TypedProperties baseProps;
        if (propsOverride.isPresent()) {
            baseProps = propsOverride.get();
        } else if (cfg.propsFilePath.equals(Config.DEFAULT_DFS_SOURCE_PROPERTIES)) {
            baseProps = UtilHelpers.getConfig(cfg.configs).getProps();
        } else {
            baseProps = UtilHelpers.readConfig(hadoopConf, new Path(cfg.propsFilePath), cfg.configs).getProps();
        }
        merged.setAll(baseProps);
        merged.setDefaultValue(DataSourceWriteOptions.RECONCILE_SCHEMA());
        merged.setValue(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
        boolean mergeOnRead = cfg.tableType.equals(HoodieTableType.MERGE_ON_READ.name());
        if (mergeOnRead) {
            merged.setValue(HoodieTableConfig.TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        }
        return merged.getProps(true);
    }

    /**
     * Shuts down the ingestion service, if one exists, and then closes it.
     *
     * <p>Passes {@code false} to {@code shutdown}. This is a no-op in bootstrap mode.
     */
    public void shutdownGracefully() {
        if (!this.ingestionService.isPresent()) {
            return;
        }
        HoodieIngestionService service = this.ingestionService.get();
        LOG.info("Shutting down DeltaStreamer");
        service.shutdown(false);
        LOG.info("Async service shutdown complete. Closing DeltaSync ");
        service.close();
    }

    /**
     * Runs the streamer.
     *
     * <p>Executes the one-off bootstrap when configured; otherwise starts the ingestion service.
     *
     * @throws Exception propagated from bootstrap execution or ingestion
     */
    public void sync() throws Exception {
        if (!this.bootstrapExecutor.isPresent()) {
            this.ingestionService.ifPresent(HoodieIngestionService::startIngestion);
            return;
        }
        BootstrapExecutor bootstrap = this.bootstrapExecutor.get();
        LOG.info("Performing bootstrap. Source=" + bootstrap.getBootstrapConfig().getBootstrapSourceBasePath());
        bootstrap.execute();
    }

    /**
     * Returns the configuration this streamer was built with.
     *
     * <p>This is the same instance the caller supplied, including any checkpoint filled in by the constructor.
     *
     * @return the streamer configuration
     */
    public Config getConfig() {
        return cfg;
    }

    /**
     * Renders properties as a log-friendly, multi-line description.
     *
     * <p>The output is a header line followed by one {@code key: value} line per entry, sorted by key.
     * Values of keys containing any sensitive fragment are masked. Other values longer than 255
     * characters are cut to their first 128 characters plus {@code [...]}, unless debug logging is enabled.
     *
     * @param props properties to render
     * @return the rendered description
     */
    public static String toSortedTruncatedString(TypedProperties props) {
        final List<String> sensitiveFragments = props.getStringList(
            HoodieWriteConfig.SENSITIVE_CONFIG_KEYS_FILTER.key(), ",", DEFAULT_SENSITIVE_CONFIG_KEYS);
        final List<String> sortedKeys = new ArrayList<>();
        props.keySet().forEach(k -> sortedKeys.add(k.toString()));
        sortedKeys.sort(null);
        final StringBuilder description = new StringBuilder("Creating Hudi Streamer with configs:\n");
        for (String key : sortedKeys) {
            final boolean sensitive = sensitiveFragments.stream().anyMatch(key::contains);
            String shown;
            if (sensitive) {
                shown = SENSITIVE_VALUES_MASKED;
            } else {
                shown = Option.ofNullable(props.get(key)).orElse("").toString();
                if (shown.length() > 255 && !LOG.isDebugEnabled()) {
                    shown = shown.substring(0, 128) + "[...]";
                }
            }
            description.append(key).append(": ").append(shown).append('\n');
        }
        return description.toString();
    }

    /**
     * Parses command-line arguments into a {@link Config}.
     *
     * <p>When the help flag ({@code cfg.help}) is set or no arguments are given, it prints usage and
     * terminates the JVM with exit code 1.
     *
     * @param args2 raw command-line arguments
     * @return the parsed configuration
     */
    public static final Config getConfig(String[] args2) {
        final Config parsed = new Config();
        final JCommander commander = new JCommander(parsed, null, args2);
        final boolean wantsUsage = parsed.help || args2.length == 0;
        if (wantsUsage) {
            commander.usage();
            System.exit(1);
        }
        return parsed;
    }

    /**
     * Command-line entry point.
     *
     * <p>Steps:
     * <ol>
     *   <li>Parse the arguments.</li>
     *   <li>Build a Spark context with the scheduler configs; the master comes from {@code cfg.sparkMaster}
     *       when it is set.</li>
     *   <li>Run one {@link HoodieStreamer#sync()}.</li>
     *   <li>Always stop the Spark context afterwards.</li>
     * </ol>
     *
     * @param args2 raw command-line arguments
     * @throws Exception propagated from the streamer
     */
    public static void main(String[] args2) throws Exception {
        Config cfg = HoodieStreamer.getConfig(args2);
        Map<String, String> additionalSparkConfigs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
        String appName = "streamer-" + cfg.targetTableName;
        JavaSparkContext jssc;
        if (StringUtils.isNullOrEmpty(cfg.sparkMaster)) {
            jssc = UtilHelpers.buildSparkContext(appName, additionalSparkConfigs);
        } else {
            jssc = UtilHelpers.buildSparkContext(appName, cfg.sparkMaster, additionalSparkConfigs);
        }
        if (cfg.enableHiveSync) {
            LOG.warn("--enable-hive-sync will be deprecated in a future release; please use --enable-sync instead for Hive syncing");
        }
        try {
            new HoodieStreamer(cfg, jssc).sync();
        } finally {
            jssc.stop();
        }
    }

    /**
     * Returns the ingestion service (test hook).
     *
     * <p>The service only exists when the streamer was not built in bootstrap mode. In bootstrap mode
     * this calls {@code get()} on an empty {@code Option}.
     *
     * @return the ingestion service
     */
    @VisibleForTesting
    public HoodieIngestionService getIngestionService() {
        final Option<HoodieIngestionService> service = this.ingestionService;
        return service.get();
    }

    public static class StreamSyncService
    extends HoodieIngestionService {
        private static final long serialVersionUID = 1L;
        private final Config cfg;
        private transient SchemaProvider schemaProvider;
        private transient SparkSession sparkSession;
        private final transient HoodieSparkEngineContext hoodieSparkContext;
        private transient FileSystem fs;
        private transient Configuration hiveConf;
        TypedProperties props;
        private Option<AsyncCompactService> asyncCompactService;
        private Option<AsyncClusteringService> asyncClusteringService;
        private HoodieTableType tableType;
        private transient StreamSync streamSync;
        private final Option<PostWriteTerminationStrategy> postWriteTerminationStrategy;
        private final Option<ConfigurationHotUpdateStrategy> configurationHotUpdateStrategyOpt;

        public StreamSyncService(Config cfg, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf, Option<TypedProperties> properties) throws IOException {
            super(HoodieIngestionService.HoodieIngestionConfig.newBuilder().isContinuous(cfg.continuousMode).withMinSyncInternalSeconds(cfg.minSyncIntervalSeconds).build());
            this.cfg = cfg;
            this.hoodieSparkContext = hoodieSparkContext;
            this.fs = fs;
            this.hiveConf = conf;
            this.sparkSession = SparkSession.builder().config(hoodieSparkContext.getConf()).getOrCreate();
            this.asyncCompactService = Option.empty();
            this.asyncClusteringService = Option.empty();
            this.postWriteTerminationStrategy = StringUtils.isNullOrEmpty(cfg.postWriteTerminationStrategyClass) ? Option.empty() : TerminationStrategyUtils.createPostWriteTerminationStrategy(properties.get(), cfg.postWriteTerminationStrategyClass);
            Option<Object> option = this.configurationHotUpdateStrategyOpt = StringUtils.isNullOrEmpty(cfg.configHotUpdateStrategyClass) ? Option.empty() : ConfigurationHotUpdateStrategyUtils.createConfigurationHotUpdateStrategy(cfg.configHotUpdateStrategyClass, cfg, properties.get());
            if (fs.exists(new Path(cfg.targetBasePath))) {
                try {
                    HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf())).setBasePath(cfg.targetBasePath).setLoadActiveTimelineOnLoad(false).build();
                    this.tableType = meta.getTableType();
                    ValidationUtils.checkArgument(this.tableType.equals((Object)HoodieTableType.valueOf(cfg.tableType)), "Hoodie table is of type " + (Object)((Object)this.tableType) + " but passed in CLI argument is " + cfg.tableType);
                    String baseFileFormat = meta.getTableConfig().getBaseFileFormat().toString();
                    ValidationUtils.checkArgument(baseFileFormat.equals(cfg.baseFileFormat) || cfg.baseFileFormat == null, String.format("Hoodie table's base file format is of type %s but passed in CLI argument is %s", baseFileFormat, cfg.baseFileFormat));
                    cfg.baseFileFormat = baseFileFormat;
                    this.cfg.baseFileFormat = baseFileFormat;
                    HashMap propsToValidate = new HashMap();
                    properties.get().forEach((BiConsumer<? super Object, ? super Object>)((BiConsumer<Object, Object>)(k, v) -> propsToValidate.put(k.toString(), v.toString())));
                    HoodieWriterUtils.validateTableConfig(this.sparkSession, HoodieConversionUtils.mapAsScalaImmutableMap(propsToValidate), meta.getTableConfig());
                }
                catch (HoodieIOException e) {
                    LOG.warn("Full exception msg " + e.getLocalizedMessage() + ",  msg " + e.getMessage());
                    if (e.getMessage().contains("Could not load Hoodie properties") && e.getMessage().contains("hoodie.properties")) {
                        this.initializeTableTypeAndBaseFileFormat();
                    }
                    throw e;
                }
            } else {
                this.initializeTableTypeAndBaseFileFormat();
            }
            ValidationUtils.checkArgument(cfg.filterDupes == false || cfg.operation != WriteOperationType.UPSERT, "'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed.");
            this.props = properties.get();
            LOG.info(HoodieStreamer.toSortedTruncatedString(this.props));
            this.schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, this.props, hoodieSparkContext.jsc()), this.props, hoodieSparkContext.jsc(), cfg.transformerClassNames);
            this.streamSync = new StreamSync(cfg, this.sparkSession, this.schemaProvider, this.props, hoodieSparkContext, fs, conf, this::onInitializingWriteClient);
        }

        public StreamSyncService(Config cfg, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf) throws IOException {
            this(cfg, hoodieSparkContext, fs, conf, Option.empty());
        }

        private void initializeTableTypeAndBaseFileFormat() {
            this.tableType = HoodieTableType.valueOf(this.cfg.tableType);
            if (this.cfg.baseFileFormat == null) {
                this.cfg.baseFileFormat = "PARQUET";
            }
        }

        private void reInitDeltaSync() throws IOException {
            if (this.streamSync != null) {
                this.streamSync.close();
            }
            this.streamSync = new StreamSync(this.cfg, this.sparkSession, this.schemaProvider, this.props, this.hoodieSparkContext, this.fs, this.hiveConf, this::onInitializingWriteClient);
        }

        @Override
        protected Pair<CompletableFuture, ExecutorService> startService() {
            ExecutorService executor = Executors.newFixedThreadPool(1);
            return Pair.of(CompletableFuture.supplyAsync(() -> {
                boolean error = false;
                if (this.cfg.isAsyncCompactionEnabled()) {
                    LOG.info("Setting Spark Pool name for delta-sync to hoodiedeltasync");
                    this.hoodieSparkContext.setProperty(EngineProperty.DELTASYNC_POOL_NAME, HoodieStreamer.STREAMSYNC_POOL_NAME);
                }
                HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.from(this.props);
                try {
                    while (!this.isShutdownRequested()) {
                        try {
                            Option<Object> lastWriteStatuses;
                            Option<String> clusteringInstant;
                            Option<Pair<Option<String>, JavaRDD<WriteStatus>>> scheduledCompactionInstantAndRDD;
                            Option<TypedProperties> newProps;
                            long start2 = System.currentTimeMillis();
                            this.streamSync.getMetrics().updateStreamerHeartbeatTimestamp(start2);
                            if (this.configurationHotUpdateStrategyOpt.isPresent() && (newProps = this.configurationHotUpdateStrategyOpt.get().updateProperties(this.props)).isPresent()) {
                                this.props = newProps.get();
                                LOG.info("Re-init delta sync with new config properties:");
                                LOG.info(HoodieStreamer.toSortedTruncatedString(this.props));
                                this.reInitDeltaSync();
                            }
                            if ((scheduledCompactionInstantAndRDD = Option.ofNullable(this.streamSync.syncOnce())).isPresent() && scheduledCompactionInstantAndRDD.get().getLeft().isPresent()) {
                                LOG.info("Enqueuing new pending compaction instant (" + scheduledCompactionInstantAndRDD.get().getLeft() + ")");
                                this.asyncCompactService.get().enqueuePendingAsyncServiceInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, "compaction", scheduledCompactionInstantAndRDD.get().getLeft().get()));
                                this.asyncCompactService.get().waitTillPendingAsyncServiceInstantsReducesTo(this.cfg.maxPendingCompactions);
                                if (this.asyncCompactService.get().hasError()) {
                                    error = true;
                                    throw new HoodieException("Async compaction failed.  Shutting down Delta Sync...");
                                }
                            }
                            if (clusteringConfig.isAsyncClusteringEnabled() && (clusteringInstant = this.streamSync.getClusteringInstantOpt()).isPresent()) {
                                LOG.info("Scheduled async clustering for instant: " + clusteringInstant.get());
                                this.asyncClusteringService.get().enqueuePendingAsyncServiceInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, "replacecommit", clusteringInstant.get()));
                                this.asyncClusteringService.get().waitTillPendingAsyncServiceInstantsReducesTo(this.cfg.maxPendingClustering);
                                if (this.asyncClusteringService.get().hasError()) {
                                    error = true;
                                    throw new HoodieException("Async clustering failed.  Shutting down Delta Sync...");
                                }
                            }
                            if (this.requestShutdownIfNeeded(lastWriteStatuses = Option.ofNullable(scheduledCompactionInstantAndRDD.isPresent() ? HoodieJavaRDD.of(scheduledCompactionInstantAndRDD.get().getRight()) : null))) {
                                LOG.warn("Closing and shutting down ingestion service");
                                error = true;
                                this.onIngestionCompletes(false);
                                this.shutdown(true);
                                continue;
                            }
                            this.sleepBeforeNextIngestion(start2);
                        }
                        catch (HoodieUpsertException ue) {
                            this.handleUpsertException(ue);
                        }
                    }
                    return true;
                    {
                        catch (Exception e) {
                            LOG.error("Shutting down delta-sync due to exception", (Throwable)e);
                            error = true;
                            throw new HoodieException(e.getMessage(), e);
                        }
                    }
                }
                finally {
                    this.shutdownAsyncServices(error);
                    executor.shutdownNow();
                }
            }, executor), executor);
        }

        private void handleUpsertException(HoodieUpsertException ue) {
            if (ue.getCause() instanceof HoodieClusteringUpdateException) {
                LOG.warn("Write rejected due to conflicts with pending clustering operation. Going to retry after 1 min with the hope that clustering will complete by then.", (Throwable)ue);
                try {
                    Thread.sleep(60000L);
                }
                catch (InterruptedException e) {
                    throw new HoodieException("Deltastreamer interrupted while waiting for next round ", e);
                }
            } else {
                throw ue;
            }
        }

        private void shutdownAsyncServices(boolean error) {
            LOG.info("Delta Sync shutdown. Error ?" + error);
            if (this.asyncCompactService.isPresent()) {
                LOG.warn("Gracefully shutting down compactor");
                this.asyncCompactService.get().shutdown(false);
            }
            if (this.asyncClusteringService.isPresent()) {
                LOG.warn("Gracefully shutting down clustering service");
                this.asyncClusteringService.get().shutdown(false);
            }
        }

        @Override
        public void ingestOnce() {
            try {
                this.streamSync.syncOnce();
            }
            catch (IOException e) {
                throw new HoodieIngestionException(String.format("Ingestion via %s failed with exception.", this.getClass()), e);
            }
            finally {
                this.close();
            }
        }

        @Override
        protected boolean requestShutdownIfNeeded(Option<HoodieData<WriteStatus>> lastWriteStatuses) {
            Option<Object> lastWriteStatusRDD = Option.ofNullable(lastWriteStatuses.isPresent() ? HoodieJavaRDD.getJavaRDD(lastWriteStatuses.get()) : null);
            return this.postWriteTerminationStrategy.isPresent() && this.postWriteTerminationStrategy.get().shouldShutdown(lastWriteStatusRDD);
        }

        protected Boolean onInitializingWriteClient(SparkRDDWriteClient writeClient) {
            List<HoodieInstant> pending;
            HoodieTableMetaClient meta;
            if (this.cfg.isAsyncCompactionEnabled()) {
                if (this.asyncCompactService.isPresent()) {
                    this.asyncCompactService.get().updateWriteClient(writeClient);
                } else {
                    this.asyncCompactService = Option.ofNullable(new SparkAsyncCompactService(this.hoodieSparkContext, writeClient));
                    meta = HoodieTableMetaClient.builder().setConf(new Configuration(this.hoodieSparkContext.hadoopConfiguration())).setBasePath(this.cfg.targetBasePath).setLoadActiveTimelineOnLoad(true).build();
                    pending = CompactionUtils.getPendingCompactionInstantTimes(meta);
                    pending.forEach(hoodieInstant -> this.asyncCompactService.get().enqueuePendingAsyncServiceInstant((HoodieInstant)hoodieInstant));
                    this.asyncCompactService.get().start(error -> true);
                    try {
                        this.asyncCompactService.get().waitTillPendingAsyncServiceInstantsReducesTo(this.cfg.maxPendingCompactions);
                        if (this.asyncCompactService.get().hasError()) {
                            throw new HoodieException("Async compaction failed during write client initialization.");
                        }
                    }
                    catch (InterruptedException ie) {
                        throw new HoodieException(ie);
                    }
                }
            }
            if (HoodieClusteringConfig.from(this.props).isAsyncClusteringEnabled()) {
                if (this.asyncClusteringService.isPresent()) {
                    this.asyncClusteringService.get().updateWriteClient(writeClient);
                } else {
                    this.asyncClusteringService = Option.ofNullable(new SparkAsyncClusteringService(this.hoodieSparkContext, writeClient));
                    meta = HoodieTableMetaClient.builder().setConf(new Configuration(this.hoodieSparkContext.hadoopConfiguration())).setBasePath(this.cfg.targetBasePath).setLoadActiveTimelineOnLoad(true).build();
                    pending = ClusteringUtils.getPendingClusteringInstantTimes(meta);
                    LOG.info(String.format("Found %d pending clustering instants ", pending.size()));
                    pending.forEach(hoodieInstant -> this.asyncClusteringService.get().enqueuePendingAsyncServiceInstant((HoodieInstant)hoodieInstant));
                    this.asyncClusteringService.get().start(error -> true);
                    try {
                        this.asyncClusteringService.get().waitTillPendingAsyncServiceInstantsReducesTo(this.cfg.maxPendingClustering);
                        if (this.asyncClusteringService.get().hasError()) {
                            throw new HoodieException("Async clustering failed during write client initialization.");
                        }
                    }
                    catch (InterruptedException e) {
                        throw new HoodieException(e);
                    }
                }
            }
            return true;
        }

        @Override
        protected boolean onIngestionCompletes(boolean hasError) {
            LOG.info("Ingestion completed. Has error: " + hasError);
            this.close();
            return true;
        }

        @Override
        public Option<HoodieIngestionMetrics> getMetrics() {
            return Option.ofNullable(this.streamSync.getMetrics());
        }

        @Override
        public void close() {
            if (this.streamSync != null) {
                this.streamSync.close();
            }
        }

        public SchemaProvider getSchemaProvider() {
            return this.schemaProvider;
        }

        public SparkSession getSparkSession() {
            return this.sparkSession;
        }

        public TypedProperties getProps() {
            return this.props;
        }

        @VisibleForTesting
        public HoodieSparkEngineContext getHoodieSparkContext() {
            return this.hoodieSparkContext;
        }

        @VisibleForTesting
        public StreamSync getStreamSync() {
            return this.streamSync;
        }
    }

    public static class Config
    implements Serializable {
        public static final String DEFAULT_DFS_SOURCE_PROPERTIES = "file://" + System.getProperty("user.dir") + "/src/test/resources/streamer-config/dfs-source.properties";
        @Parameter(names={"--target-base-path"}, description="base path for the target hoodie table. (Will be created if did not exist first time around. If exists, expected to be a hoodie table)", required=true)
        public String targetBasePath;
        @Parameter(names={"--target-table"}, description="name of the target table", required=true)
        public String targetTableName;
        @Parameter(names={"--table-type"}, description="Type of table. COPY_ON_WRITE (or) MERGE_ON_READ", required=true)
        public String tableType;
        @Parameter(names={"--base-file-format"}, description="File format for the base files. PARQUET (or) HFILE", required=false)
        public String baseFileFormat = "PARQUET";
        @Parameter(names={"--props"}, description="path to properties file on localfs or dfs, with configurations for hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, referto individual classes, for supported properties. Properties in this file can be overridden by \"--hoodie-conf\"")
        public String propsFilePath = DEFAULT_DFS_SOURCE_PROPERTIES;
        @Parameter(names={"--hoodie-conf"}, description="Any configuration that can be set in the properties file (using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated", splitter=IdentitySplitter.class)
        public List<String> configs = new ArrayList<String>();
        @Parameter(names={"--source-class"}, description="Subclass of org.apache.hudi.utilities.sources to read data. Built-in options: org.apache.hudi.utilities.sources.{JsonDFSSource (default), AvroDFSSource, JsonKafkaSource, AvroKafkaSource, HiveIncrPullSource}")
        public String sourceClassName = JsonDFSSource.class.getName();
        @Parameter(names={"--source-ordering-field"}, description="Field within source record to decide how to break ties between records with same key in input data. Default: 'ts' holding unix timestamp of record")
        public String sourceOrderingField = "ts";
        @Parameter(names={"--payload-class"}, description="subclass of HoodieRecordPayload, that works off a GenericRecord. Implement your own, if you want to do something other than overwriting existing value")
        public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName();
        @Parameter(names={"--schemaprovider-class"}, description="subclass of org.apache.hudi.utilities.schema.SchemaProvider to attach schemas to input & target table data, built in options: org.apache.hudi.utilities.schema.FilebasedSchemaProvider.Source (See org.apache.hudi.utilities.sources.Source) implementation can implement their own SchemaProvider. For Sources that return Dataset<Row>, the schema is obtained implicitly. However, this CLI option allows overriding the schemaprovider returned by Source.")
        public String schemaProviderClassName = null;
        @Parameter(names={"--transformer-class"}, description="A subclass or a list of subclasses of org.apache.hudi.utilities.transform.Transformer. Allows transforming raw source Dataset to a target Dataset (conforming to target schema) before writing. Default : Not set. E:g - org.apache.hudi.utilities.transform.SqlQueryBasedTransformer (which allows a SQL query templated to be passed as a transformation function). Pass a comma-separated list of subclass names to chain the transformations. If there are two or more transformers using the same config keys and expect different values for those keys, then transformer can include an identifier. E:g - tr1:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer. Here the identifier tr1 can be used along with property key like `hoodie.streamer.transformer.sql.tr1` to identify properties related to the transformer. So effective value for `hoodie.streamer.transformer.sql` is determined by key `hoodie.streamer.transformer.sql.tr1` for this transformer. If identifier is used, it should be specified for all the transformers. Further the order in which transformer is applied is determined by the occurrence of transformer irrespective of the identifier used for the transformer. For example: In the configured value below tr2:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer,tr1:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer , tr2 is applied before tr1 based on order of occurrence.")
        public List<String> transformerClassNames = null;
        @Parameter(names={"--source-limit"}, description="Maximum amount of data to read from source. Default: No limit, e.g: DFS-Source => max bytes to read, Kafka-Source => max events to read")
        public long sourceLimit = Long.MAX_VALUE;
        @Parameter(names={"--op"}, description="Takes one of these values : UPSERT (default), INSERT, BULK_INSERT, INSERT_OVERWRITE, INSERT_OVERWRITE_TABLE, DELETE_PARTITION", converter=OperationConverter.class)
        public WriteOperationType operation = WriteOperationType.UPSERT;
        @Parameter(names={"--filter-dupes"}, description="Should duplicate records from source be dropped/filtered out before insert/bulk-insert")
        public Boolean filterDupes = false;
        @Parameter(names={"--enable-hive-sync"}, description="Enable syncing to hive")
        public Boolean enableHiveSync = false;
        @Parameter(names={"--enable-sync"}, description="Enable syncing meta")
        public Boolean enableMetaSync = false;
        @Parameter(names={"--force-empty-sync"}, description="Force syncing meta even on empty commit")
        public Boolean forceEmptyMetaSync = false;
        @Parameter(names={"--sync-tool-classes"}, description="Meta sync client tool, using comma to separate multi tools")
        public String syncClientToolClassNames = HiveSyncTool.class.getName();
        // Upper bounds on pending table services; per the option descriptions, delta sync is held back while they are reached.
        @Parameter(names={"--max-pending-compactions"}, description="Maximum number of outstanding inflight/requested compactions. Delta Sync will not happen unless the number of outstanding compactions is less than this number")
        public Integer maxPendingCompactions = 5;
        @Parameter(names={"--max-pending-clustering"}, description="Maximum number of outstanding inflight/requested clustering. Delta Sync will not happen unless the number of outstanding clustering operations is less than this number")
        public Integer maxPendingClustering = 5;
        @Parameter(names={"--continuous"}, description="Hudi Streamer runs in continuous mode running source-fetch -> Transform -> Hudi Write in loop")
        public Boolean continuousMode = false;
        @Parameter(names={"--min-sync-interval-seconds"}, description="the min sync interval of each sync in continuous mode")
        public Integer minSyncIntervalSeconds = 0;
        @Parameter(names={"--spark-master"}, description="spark master to use, if not defined inherits from your environment taking into account Spark Configuration priority rules (e.g. not using spark-submit command).")
        public String sparkMaster = "";
        @Parameter(names={"--commit-on-errors"}, description="Commit even when some records failed to be written")
        public Boolean commitOnErrors = false;
        @Parameter(names={"--delta-sync-scheduling-weight"}, description="Scheduling weight for delta sync as defined in https://spark.apache.org/docs/latest/job-scheduling.html")
        public Integer deltaSyncSchedulingWeight = 1;
        @Parameter(names={"--compact-scheduling-weight"}, description="Scheduling weight for compaction as defined in https://spark.apache.org/docs/latest/job-scheduling.html")
        public Integer compactSchedulingWeight = 1;
        @Parameter(names={"--delta-sync-scheduling-minshare"}, description="Minshare for delta sync as defined in https://spark.apache.org/docs/latest/job-scheduling.html")
        public Integer deltaSyncSchedulingMinShare = 0;
        @Parameter(names={"--compact-scheduling-minshare"}, description="Minshare for compaction as defined in https://spark.apache.org/docs/latest/job-scheduling.html")
        public Integer compactSchedulingMinShare = 0;
        @Parameter(names={"--disable-compaction"}, description="Compaction is enabled for MoR table by default. This flag disables it ")
        public Boolean forceDisableCompaction = false;
        @Parameter(names={"--checkpoint"}, description="Resume Hudi Streamer from this checkpoint.")
        public String checkpoint = null;
        @Parameter(names={"--initial-checkpoint-provider"}, description="subclass of org.apache.hudi.utilities.checkpointing.InitialCheckpointProvider. Generate check point for Hudi Streamer for the first run. This field will override the checkpoint of last commit using the checkpoint field. Use this field only when switching source, for example, from DFS source to Kafka Source.")
        public String initialCheckpointProvider = null;
        @Parameter(names={"--run-bootstrap"}, description="Run bootstrap if bootstrap index is not found")
        public Boolean runBootstrap = false;
        @Parameter(names={"--bootstrap-overwrite"}, description="Overwrite existing target table, default false")
        public Boolean bootstrapOverwrite = false;
        @Parameter(names={"--bootstrap-index-class"}, description="subclass of BootstrapIndex")
        public String bootstrapIndexClass = HFileBootstrapIndex.class.getName();
        @Parameter(names={"--retry-on-source-failures"}, description="Retry on any source failures")
        public Boolean retryOnSourceFailures = false;
        @Parameter(names={"--retry-interval-seconds"}, description="the retry interval for source failures if --retry-on-source-failures is enabled")
        public Integer retryIntervalSecs = 30;
        @Parameter(names={"--max-retry-count"}, description="the max retry count if --retry-on-source-failures is enabled")
        public Integer maxRetryCount = 3;
        // Opt-in escape hatch for checkpoint-less sources; the description is shown verbatim in --help output.
        @Parameter(names={"--allow-commit-on-no-checkpoint-change"}, description="Allow commits even if the checkpoint has not changed before and after fetching data from the source. This might be useful for sources like SqlSource where there is no checkpoint. Not recommended to enable in continuous mode.")
        public Boolean allowCommitOnNoCheckpointChange = false;
        @Parameter(names={"--help", "-h"}, help=true)
        public Boolean help = false;
        @Parameter(names={"--retry-last-pending-inline-clustering", "-rc"}, description="Retry last pending inline clustering plan before writing to sink.")
        public Boolean retryLastPendingInlineClusteringJob = false;
        @Parameter(names={"--retry-last-pending-inline-compaction"}, description="Retry last pending inline compaction plan before writing to sink.")
        public Boolean retryLastPendingInlineCompactionJob = false;
        @Parameter(names={"--cluster-scheduling-weight"}, description="Scheduling weight for clustering as defined in https://spark.apache.org/docs/latest/job-scheduling.html")
        public Integer clusterSchedulingWeight = 1;
        @Parameter(names={"--cluster-scheduling-minshare"}, description="Minshare for clustering as defined in https://spark.apache.org/docs/latest/job-scheduling.html")
        public Integer clusterSchedulingMinShare = 0;
        @Parameter(names={"--post-write-termination-strategy-class"}, description="Post writer termination strategy class to gracefully shutdown deltastreamer in continuous mode")
        public String postWriteTerminationStrategyClass = "";
        @Parameter(names={"--ingestion-metrics-class"}, description="Ingestion metrics class for reporting metrics during ingestion lifecycles.")
        public String ingestionMetricsClass = HoodieStreamerMetrics.class.getCanonicalName();
        @Parameter(names={"--config-hot-update-strategy-class"}, description="Configuration hot update in continuous mode")
        public String configHotUpdateStrategyClass = "";

        /**
         * Tells whether compaction should be run asynchronously alongside ingestion.
         *
         * <p>That is the case only in continuous mode, when compaction has not been
         * force-disabled, and when the target table is MERGE_ON_READ.
         *
         * @return {@code true} if async compaction applies to this configuration
         * @throws IllegalArgumentException if {@code tableType} is not a valid
         *         {@link HoodieTableType} name (raised by {@code HoodieTableType.valueOf})
         */
        public boolean isAsyncCompactionEnabled() {
            // Checks are ordered so the table type is only parsed when the flags allow compaction.
            if (!this.continuousMode || this.forceDisableCompaction) {
                return false;
            }
            return HoodieTableType.valueOf(this.tableType) == HoodieTableType.MERGE_ON_READ;
        }

        /**
         * Tells whether compaction should be run inline with each (non-continuous) write.
         *
         * <p>That is the case only outside continuous mode, when compaction has not been
         * force-disabled, and when the target table is MERGE_ON_READ.
         *
         * @return {@code true} if inline compaction applies to this configuration
         * @throws IllegalArgumentException if {@code tableType} is not a valid
         *         {@link HoodieTableType} name (raised by {@code HoodieTableType.valueOf})
         */
        public boolean isInlineCompactionEnabled() {
            // Continuous mode hands compaction to the async path instead.
            if (this.continuousMode || this.forceDisableCompaction) {
                return false;
            }
            return HoodieTableType.valueOf(this.tableType) == HoodieTableType.MERGE_ON_READ;
        }

        /**
         * Builds the effective properties for a streamer run.
         *
         * <p>When no properties file path is configured (empty string), properties are built
         * from {@code cfg.configs} alone. Otherwise the file at {@code cfg.propsFilePath} is
         * read through {@code UtilHelpers.readConfig} together with {@code cfg.configs}
         * (presumably applied as overrides on top of the file; verify in UtilHelpers).
         *
         * @param fs  file system whose Hadoop configuration is used to read the properties file
         * @param cfg streamer configuration supplying the file path and extra configs
         * @return the resolved properties
         */
        public static TypedProperties getProps(FileSystem fs, Config cfg) {
            if (cfg.propsFilePath.isEmpty()) {
                return UtilHelpers.buildProperties(cfg.configs);
            }
            return UtilHelpers.readConfig(fs.getConf(), new Path(cfg.propsFilePath), cfg.configs)
                .getProps();
        }

        /**
         * Field-wise equality over the command-line options of this config.
         *
         * <p>Besides the options compared historically, this also compares the bootstrap, retry,
         * inline-retry, post-write termination strategy and config hot-update strategy options.
         * Previously those were ignored, so two configs that would run differently compared
         * equal. {@code hashCode()} hashes a subset of these fields, which still satisfies the
         * equals/hashCode contract: configs equal here agree on every hashed field.
         *
         * @param o the object to compare with
         * @return {@code true} if {@code o} is a {@code Config} of the same class with equal options
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            Config config = (Config) o;
            return this.sourceLimit == config.sourceLimit
                && Objects.equals(this.targetBasePath, config.targetBasePath)
                && Objects.equals(this.targetTableName, config.targetTableName)
                && Objects.equals(this.tableType, config.tableType)
                && Objects.equals(this.baseFileFormat, config.baseFileFormat)
                && Objects.equals(this.propsFilePath, config.propsFilePath)
                && Objects.equals(this.configs, config.configs)
                && Objects.equals(this.sourceClassName, config.sourceClassName)
                && Objects.equals(this.sourceOrderingField, config.sourceOrderingField)
                && Objects.equals(this.payloadClassName, config.payloadClassName)
                && Objects.equals(this.schemaProviderClassName, config.schemaProviderClassName)
                && Objects.equals(this.transformerClassNames, config.transformerClassNames)
                && this.operation == config.operation
                && Objects.equals(this.filterDupes, config.filterDupes)
                && Objects.equals(this.enableHiveSync, config.enableHiveSync)
                && Objects.equals(this.enableMetaSync, config.enableMetaSync)
                && Objects.equals(this.forceEmptyMetaSync, config.forceEmptyMetaSync)
                && Objects.equals(this.syncClientToolClassNames, config.syncClientToolClassNames)
                && Objects.equals(this.maxPendingCompactions, config.maxPendingCompactions)
                && Objects.equals(this.maxPendingClustering, config.maxPendingClustering)
                && Objects.equals(this.continuousMode, config.continuousMode)
                && Objects.equals(this.minSyncIntervalSeconds, config.minSyncIntervalSeconds)
                && Objects.equals(this.sparkMaster, config.sparkMaster)
                && Objects.equals(this.commitOnErrors, config.commitOnErrors)
                && Objects.equals(this.deltaSyncSchedulingWeight, config.deltaSyncSchedulingWeight)
                && Objects.equals(this.compactSchedulingWeight, config.compactSchedulingWeight)
                && Objects.equals(this.clusterSchedulingWeight, config.clusterSchedulingWeight)
                && Objects.equals(this.deltaSyncSchedulingMinShare, config.deltaSyncSchedulingMinShare)
                && Objects.equals(this.compactSchedulingMinShare, config.compactSchedulingMinShare)
                && Objects.equals(this.clusterSchedulingMinShare, config.clusterSchedulingMinShare)
                && Objects.equals(this.forceDisableCompaction, config.forceDisableCompaction)
                && Objects.equals(this.checkpoint, config.checkpoint)
                && Objects.equals(this.initialCheckpointProvider, config.initialCheckpointProvider)
                && Objects.equals(this.ingestionMetricsClass, config.ingestionMetricsClass)
                && Objects.equals(this.help, config.help)
                // Behaviour-affecting options that were previously left out of the comparison.
                && Objects.equals(this.runBootstrap, config.runBootstrap)
                && Objects.equals(this.bootstrapOverwrite, config.bootstrapOverwrite)
                && Objects.equals(this.bootstrapIndexClass, config.bootstrapIndexClass)
                && Objects.equals(this.retryOnSourceFailures, config.retryOnSourceFailures)
                && Objects.equals(this.retryIntervalSecs, config.retryIntervalSecs)
                && Objects.equals(this.maxRetryCount, config.maxRetryCount)
                && Objects.equals(this.allowCommitOnNoCheckpointChange, config.allowCommitOnNoCheckpointChange)
                && Objects.equals(this.retryLastPendingInlineClusteringJob, config.retryLastPendingInlineClusteringJob)
                && Objects.equals(this.retryLastPendingInlineCompactionJob, config.retryLastPendingInlineCompactionJob)
                && Objects.equals(this.postWriteTerminationStrategyClass, config.postWriteTerminationStrategyClass)
                && Objects.equals(this.configHotUpdateStrategyClass, config.configHotUpdateStrategyClass);
        }

        /**
         * Hash over the core options of this config.
         *
         * <p>The field set and order are fixed: they must stay a subset of the fields compared in
         * {@code equals}, and reordering would change the produced hash values.
         *
         * @return hash code combining the listed option values
         */
        @Override
        public int hashCode() {
            return Objects.hash(
                // table / source identity
                this.targetBasePath, this.targetTableName, this.tableType, this.baseFileFormat,
                this.propsFilePath, this.configs, this.sourceClassName, this.sourceOrderingField,
                this.payloadClassName, this.schemaProviderClassName, this.transformerClassNames,
                // write behaviour and meta sync
                this.sourceLimit, this.operation, this.filterDupes, this.enableHiveSync,
                this.enableMetaSync, this.forceEmptyMetaSync, this.syncClientToolClassNames,
                // table services and continuous mode
                this.maxPendingCompactions, this.maxPendingClustering, this.continuousMode,
                this.minSyncIntervalSeconds, this.sparkMaster, this.commitOnErrors,
                // scheduling
                this.deltaSyncSchedulingWeight, this.compactSchedulingWeight, this.clusterSchedulingWeight,
                this.deltaSyncSchedulingMinShare, this.compactSchedulingMinShare, this.clusterSchedulingMinShare,
                // misc
                this.forceDisableCompaction, this.checkpoint, this.initialCheckpointProvider,
                this.ingestionMetricsClass, this.help);
        }

        /**
         * Renders every command-line option of this config for logging and debugging.
         *
         * <p>Compared to the previous rendering, this also includes the bootstrap, retry,
         * inline-retry, post-write termination strategy and config hot-update strategy options,
         * so logged configurations are complete. String-valued options are single-quoted.
         *
         * @return a human-readable description of this config
         */
        @Override
        public String toString() {
            return "Config{targetBasePath='" + this.targetBasePath + '\''
                + ", targetTableName='" + this.targetTableName + '\''
                + ", tableType='" + this.tableType + '\''
                + ", baseFileFormat='" + this.baseFileFormat + '\''
                + ", propsFilePath='" + this.propsFilePath + '\''
                + ", configs=" + this.configs
                + ", sourceClassName='" + this.sourceClassName + '\''
                + ", sourceOrderingField='" + this.sourceOrderingField + '\''
                + ", payloadClassName='" + this.payloadClassName + '\''
                + ", schemaProviderClassName='" + this.schemaProviderClassName + '\''
                + ", transformerClassNames=" + this.transformerClassNames
                + ", sourceLimit=" + this.sourceLimit
                + ", operation=" + this.operation
                + ", filterDupes=" + this.filterDupes
                + ", enableHiveSync=" + this.enableHiveSync
                + ", enableMetaSync=" + this.enableMetaSync
                + ", forceEmptyMetaSync=" + this.forceEmptyMetaSync
                + ", syncClientToolClassNames=" + this.syncClientToolClassNames
                + ", maxPendingCompactions=" + this.maxPendingCompactions
                + ", maxPendingClustering=" + this.maxPendingClustering
                + ", continuousMode=" + this.continuousMode
                + ", minSyncIntervalSeconds=" + this.minSyncIntervalSeconds
                + ", sparkMaster='" + this.sparkMaster + '\''
                + ", commitOnErrors=" + this.commitOnErrors
                + ", deltaSyncSchedulingWeight=" + this.deltaSyncSchedulingWeight
                + ", compactSchedulingWeight=" + this.compactSchedulingWeight
                + ", clusterSchedulingWeight=" + this.clusterSchedulingWeight
                + ", deltaSyncSchedulingMinShare=" + this.deltaSyncSchedulingMinShare
                + ", compactSchedulingMinShare=" + this.compactSchedulingMinShare
                + ", clusterSchedulingMinShare=" + this.clusterSchedulingMinShare
                + ", forceDisableCompaction=" + this.forceDisableCompaction
                + ", checkpoint='" + this.checkpoint + '\''
                + ", initialCheckpointProvider='" + this.initialCheckpointProvider + '\''
                + ", ingestionMetricsClass='" + this.ingestionMetricsClass + '\''
                // Options previously missing from the rendering.
                + ", runBootstrap=" + this.runBootstrap
                + ", bootstrapOverwrite=" + this.bootstrapOverwrite
                + ", bootstrapIndexClass='" + this.bootstrapIndexClass + '\''
                + ", retryOnSourceFailures=" + this.retryOnSourceFailures
                + ", retryIntervalSecs=" + this.retryIntervalSecs
                + ", maxRetryCount=" + this.maxRetryCount
                + ", allowCommitOnNoCheckpointChange=" + this.allowCommitOnNoCheckpointChange
                + ", retryLastPendingInlineClusteringJob=" + this.retryLastPendingInlineClusteringJob
                + ", retryLastPendingInlineCompactionJob=" + this.retryLastPendingInlineCompactionJob
                + ", postWriteTerminationStrategyClass='" + this.postWriteTerminationStrategyClass + '\''
                + ", configHotUpdateStrategyClass='" + this.configHotUpdateStrategyClass + '\''
                + ", help=" + this.help
                + '}';
        }
    }
}

