/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.deltastreamer;

import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hudi.HoodieWriteClient;
import org.apache.hudi.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.deltastreamer.AbstractDeltaStreamerService;
import org.apache.hudi.utilities.deltastreamer.Compactor;
import org.apache.hudi.utilities.deltastreamer.DeltaSync;
import org.apache.hudi.utilities.deltastreamer.SchedulerConfGenerator;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.JsonDFSSource;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class HoodieDeltaStreamer
implements Serializable {
    /** Logger shared by this class and by the nested service classes in this file. */
    private static volatile Logger log = LogManager.getLogger(HoodieDeltaStreamer.class);
    /**
     * String constant {@code "deltastreamer.checkpoint.key"}. It is not read anywhere in this file;
     * presumably it is the commit-metadata key under which the checkpoint is stored (confirm against
     * DeltaSync). NOTE(review): declared non-final, so external code can reassign it.
     */
    public static String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
    /** Parsed command-line options; {@link #sync()} reads {@code continuousMode} from it. */
    private final transient Config cfg;
    /** Service created by the constructor that wraps the DeltaSync instance used by {@link #sync()}. */
    private transient DeltaSyncService deltaSyncService;

    /**
     * Creates a delta streamer whose file system and Hive configuration are both derived from the
     * Hadoop configuration of the supplied Spark context.
     *
     * @param cfg  parsed delta streamer options; {@code cfg.targetBasePath} selects the file system
     * @param jssc Spark context whose Hadoop configuration is used
     * @throws IOException propagated from the delegated four-argument constructor
     */
    public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc) throws IOException {
        this(
            cfg,
            jssc,
            FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()),
            getDefaultHiveConf(jssc.hadoopConfiguration()));
    }

    public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf) throws IOException {
        this.cfg = cfg;
        this.deltaSyncService = new DeltaSyncService(cfg, jssc, fs, hiveConf);
    }

    /**
     * Asks the delta sync service to shut down by calling {@code shutdown(false)} on it.
     */
    public void shutdownGracefully() {
        final DeltaSyncService service = this.deltaSyncService;
        service.shutdown(false);
    }

    /**
     * Builds a fresh {@link HiveConf} and adds the given Hadoop configuration to it as a resource.
     *
     * @param cfg Hadoop configuration to layer onto the new Hive configuration
     * @return the newly created Hive configuration
     */
    private static HiveConf getDefaultHiveConf(Configuration cfg) {
        final HiveConf defaults = new HiveConf();
        defaults.addResource(cfg);
        return defaults;
    }

    /**
     * Runs the delta streamer.
     *
     * <p>In continuous mode the delta sync service is started with {@link #onDeltaSyncShutdown(boolean)}
     * as its shutdown callback and this method blocks until the service has shut down. Otherwise a
     * single sync round is executed and the service is closed afterwards.
     *
     * @throws Exception any failure raised by the delta sync service or the single sync round
     */
    public void sync() throws Exception {
        if (this.cfg.continuousMode) {
            this.deltaSyncService.start(this::onDeltaSyncShutdown);
            this.deltaSyncService.waitForShutdown();
            log.info("Delta Sync shutting down");
        } else {
            log.info("Delta Streamer running only single round");
            try {
                this.deltaSyncService.getDeltaSync().syncOnce();
            } catch (Exception e) {
                // Log here so the root cause is recorded even if close() below fails as well.
                log.error("Got error running delta sync once. Shutting down", e);
                throw e;
            } finally {
                // Always release the delta sync resources, including when the round failed.
                this.deltaSyncService.close();
                log.info("Shut down deltastreamer");
            }
        }
    }

    /**
     * Shutdown callback passed to the delta sync service in continuous mode: logs the error flag and
     * closes the delta sync service.
     *
     * @param error flag supplied by the service; only logged here
     * @return always {@code true}
     */
    private boolean onDeltaSyncShutdown(boolean error) {
        final String message = "DeltaSync shutdown. Closing write client. Error?" + error;
        log.info(message);
        this.deltaSyncService.close();
        return true;
    }

    /**
     * Command-line entry point: parses the options into a {@link Config}, builds a Spark context and
     * runs {@link #sync()}; the Spark context is always stopped afterwards.
     *
     * <p>Usage is printed (and the JVM exits with status 1) when no arguments are given or when the
     * help flag is set. The empty-argument check is done before parsing, because parsing an empty
     * argument list fails on the missing required options before usage could be shown.
     *
     * <p>NOTE(review): the decompiler reported "Removed try catching itself - possible behaviour
     * change" for this method; compare against the original sources if exact parity matters.
     *
     * @param args command-line arguments
     * @throws Exception any failure from option parsing, Spark context creation or the sync itself
     */
    public static void main(String[] args) throws Exception {
        Config cfg = new Config();
        JCommander cmd = new JCommander(cfg);
        if (args.length == 0) {
            cmd.usage();
            System.exit(1);
        }
        cmd.parse(args);
        if (cfg.help) {
            cmd.usage();
            System.exit(1);
        }
        Map<String, String> additionalSparkConfigs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
        JavaSparkContext jssc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, cfg.sparkMaster, additionalSparkConfigs);
        try {
            new HoodieDeltaStreamer(cfg, jssc).sync();
        } finally {
            jssc.stop();
        }
    }

    /**
     * @return the delta sync service created by the constructor
     */
    public DeltaSyncService getDeltaSyncService() {
        return deltaSyncService;
    }

    public static class AsyncCompactService
    extends AbstractDeltaStreamerService {
        private final int maxConcurrentCompaction;
        private transient Compactor compactor;
        private transient JavaSparkContext jssc;
        private transient BlockingQueue<HoodieInstant> pendingCompactions = new LinkedBlockingQueue<HoodieInstant>();
        private transient ReentrantLock queueLock = new ReentrantLock();
        private transient Condition consumed = this.queueLock.newCondition();

        public AsyncCompactService(JavaSparkContext jssc, HoodieWriteClient client) {
            this.jssc = jssc;
            this.compactor = new Compactor(client, jssc);
            this.maxConcurrentCompaction = 1;
        }

        public void enqueuePendingCompaction(HoodieInstant instant) {
            this.pendingCompactions.add(instant);
        }

        public void waitTillPendingCompactionsReducesTo(int numPendingCompactions) throws InterruptedException {
            try {
                this.queueLock.lock();
                while (!this.isShutdown() && this.pendingCompactions.size() > numPendingCompactions) {
                    this.consumed.await();
                }
            }
            finally {
                this.queueLock.unlock();
            }
        }

        private HoodieInstant fetchNextCompactionInstant() throws InterruptedException {
            log.info((Object)"Compactor waiting for next instant for compaction upto 60 seconds");
            HoodieInstant instant = this.pendingCompactions.poll(60L, TimeUnit.SECONDS);
            if (instant != null) {
                try {
                    this.queueLock.lock();
                    this.consumed.signal();
                }
                finally {
                    this.queueLock.unlock();
                }
            }
            return instant;
        }

        @Override
        protected Pair<CompletableFuture, ExecutorService> startService() {
            ExecutorService executor = Executors.newFixedThreadPool(this.maxConcurrentCompaction);
            List compactionFutures = IntStream.range(0, this.maxConcurrentCompaction).mapToObj(i -> CompletableFuture.supplyAsync(() -> {
                try {
                    log.info((Object)"Setting Spark Pool name for compaction to hoodiecompact");
                    this.jssc.setLocalProperty("spark.scheduler.pool", "hoodiecompact");
                    while (!this.isShutdownRequested()) {
                        HoodieInstant instant = this.fetchNextCompactionInstant();
                        if (null == instant) continue;
                        this.compactor.compact(instant);
                    }
                    log.info((Object)"Compactor shutting down properly!!");
                }
                catch (InterruptedException ie) {
                    log.warn((Object)"Compactor executor thread got interrupted exception. Stopping", (Throwable)ie);
                }
                catch (IOException e) {
                    log.error((Object)"Compactor executor failed", (Throwable)e);
                    throw new HoodieIOException(e.getMessage(), e);
                }
                return true;
            }, executor)).collect(Collectors.toList());
            return Pair.of(CompletableFuture.allOf((CompletableFuture[])compactionFutures.stream().toArray(CompletableFuture[]::new)), (Object)executor);
        }
    }

    /**
     * Service that owns the {@link DeltaSync} instance together with the Spark session, schema
     * provider and properties it is built from, and that creates and starts the
     * {@link AsyncCompactService} when async compaction is enabled.
     */
    public static class DeltaSyncService
    extends AbstractDeltaStreamerService {
        private final Config cfg;
        private transient SchemaProvider schemaProvider;
        private transient SparkSession sparkSession;
        private transient JavaSparkContext jssc;
        /** Properties read from {@code cfg.propsFilePath} together with {@code cfg.configs}. */
        TypedProperties props;
        /** Created in {@link #onInitializingWriteClient(HoodieWriteClient)}; {@code null} until then. */
        private AsyncCompactService asyncCompactService;
        private final HoodieTableType tableType;
        private transient DeltaSync deltaSync;

        /**
         * Resolves the table type, loads the properties, creates the schema provider and builds the
         * {@link DeltaSync} instance.
         *
         * <p>If the target base path already exists, the table type is read from the table metadata
         * and must equal {@code cfg.storageType}; otherwise {@code cfg.storageType} is used. When
         * {@code cfg.filterDupes} is set, an UPSERT operation in {@code cfg} is replaced by INSERT
         * (this mutates the passed-in config).
         *
         * @throws IOException if checking the target path or reading the configuration fails
         * @throws IllegalArgumentException if the existing table type differs from {@code cfg.storageType}
         */
        public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf) throws IOException {
            this.cfg = cfg;
            this.jssc = jssc;
            this.sparkSession = SparkSession.builder().config(jssc.getConf()).getOrCreate();
            if (fs.exists(new Path(cfg.targetBasePath))) {
                HoodieTableMetaClient meta = new HoodieTableMetaClient(new Configuration(fs.getConf()), cfg.targetBasePath, false);
                this.tableType = meta.getTableType();
                Preconditions.checkArgument(this.tableType.equals(HoodieTableType.valueOf(cfg.storageType)), "Hoodie table is of type " + this.tableType + " but passed in CLI argument is " + cfg.storageType);
            } else {
                this.tableType = HoodieTableType.valueOf(cfg.storageType);
            }
            this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
            log.info("Creating delta streamer with configs : " + this.props.toString());
            this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, this.props, jssc);
            if (cfg.filterDupes) {
                cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
            }
            this.deltaSync = new DeltaSync(cfg, this.sparkSession, this.schemaProvider, this.tableType, this.props, jssc, fs, hiveConf, this::onInitializingWriteClient);
        }

        /**
         * @return the delta sync instance built by the constructor
         */
        public DeltaSync getDeltaSync() {
            return this.deltaSync;
        }

        /**
         * Starts the sync task on a single-thread executor.
         *
         * <p>WARNING: the task body could not be recovered by the decompiler (CFR 0.152 failed with
         * {@code ConfusedCFRException: Started 2 blocks at once}). As it stands the task only throws
         * {@link IllegalStateException}; the real body must be restored from the original sources.
         */
        @Override
        protected Pair<CompletableFuture, ExecutorService> startService() {
            ExecutorService executor = Executors.newFixedThreadPool(1);
            return Pair.of(CompletableFuture.supplyAsync(() -> {
                // Placeholder left by the decompiler; see the method comment.
                throw new IllegalStateException("Decompilation failed");
            }, executor), executor);
        }

        /**
         * Logs the error flag and, if an async compaction service exists, calls
         * {@code shutdown(false)} on it.
         *
         * @param error flag that is only logged here
         */
        private void shutdownCompactor(boolean error) {
            log.info("Delta Sync shutdown. Error ?" + error);
            if (this.asyncCompactService != null) {
                log.warn("Gracefully shutting down compactor");
                this.asyncCompactService.shutdown(false);
            }
        }

        /**
         * Callback handed to {@link DeltaSync} in the constructor. When async compaction is enabled it
         * creates the {@link AsyncCompactService}, queues the pending compaction instants reported by
         * {@link CompactionUtils}, starts the service with a callback that calls {@code shutdown(false)}
         * on this service, and then waits until at most {@code cfg.maxPendingCompactions} instants
         * remain queued.
         *
         * @param writeClient write client passed to the compaction service
         * @return always {@code true}
         * @throws HoodieException if the wait is interrupted; the interrupt status is restored first
         */
        protected Boolean onInitializingWriteClient(HoodieWriteClient writeClient) {
            if (this.cfg.isAsyncCompactionEnabled()) {
                this.asyncCompactService = new AsyncCompactService(this.jssc, writeClient);
                HoodieTableMetaClient meta = new HoodieTableMetaClient(new Configuration(this.jssc.hadoopConfiguration()), this.cfg.targetBasePath, true);
                List<HoodieInstant> pending = CompactionUtils.getPendingCompactionInstantTimes(meta);
                pending.forEach(hoodieInstant -> this.asyncCompactService.enqueuePendingCompaction(hoodieInstant));
                this.asyncCompactService.start(error -> {
                    this.shutdown(false);
                    return true;
                });
                try {
                    this.asyncCompactService.waitTillPendingCompactionsReducesTo(this.cfg.maxPendingCompactions);
                } catch (InterruptedException ie) {
                    // Keep the interrupt visible to callers further up the stack.
                    Thread.currentThread().interrupt();
                    throw new HoodieException(ie);
                }
            }
            return true;
        }

        /**
         * Closes the delta sync instance if one exists.
         */
        public void close() {
            if (null != this.deltaSync) {
                this.deltaSync.close();
            }
        }

        public SchemaProvider getSchemaProvider() {
            return this.schemaProvider;
        }

        public SparkSession getSparkSession() {
            return this.sparkSession;
        }

        public JavaSparkContext getJavaSparkContext() {
            return this.jssc;
        }

        /**
         * @return the async compaction service, or {@code null} if it has not been created
         */
        public AsyncCompactService getAsyncCompactService() {
            return this.asyncCompactService;
        }

        public TypedProperties getProps() {
            return this.props;
        }
    }

    /**
     * Command-line options of the delta streamer, populated by JCommander from the
     * {@code @Parameter}-annotated fields. The help texts are shown verbatim in the usage output.
     */
    public static class Config
    implements Serializable {
        @Parameter(names={"--target-base-path"}, description="base path for the target hoodie dataset. (Will be created if did not exist first time around. If exists, expected to be a hoodie dataset)", required=true)
        public String targetBasePath;
        @Parameter(names={"--target-table"}, description="name of the target table in Hive", required=true)
        public String targetTableName;
        @Parameter(names={"--storage-type"}, description="Type of Storage. COPY_ON_WRITE (or) MERGE_ON_READ", required=true)
        public String storageType;
        @Parameter(names={"--props"}, description="path to properties file on localfs or dfs, with configurations for hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer to individual classes, for supported properties.")
        public String propsFilePath = "file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties";
        @Parameter(names={"--hoodie-conf"}, description="Any configuration that can be set in the properties file (using the CLI parameter \"--props\") can also be passed command line using this parameter")
        public List<String> configs = new ArrayList<>();
        @Parameter(names={"--source-class"}, description="Subclass of org.apache.hudi.utilities.sources to read data. Built-in options: org.apache.hudi.utilities.sources.{JsonDFSSource (default), AvroDFSSource, JsonKafkaSource, AvroKafkaSource, HiveIncrPullSource}")
        public String sourceClassName = JsonDFSSource.class.getName();
        @Parameter(names={"--source-ordering-field"}, description="Field within source record to decide how to break ties between records with same key in input data. Default: 'ts' holding unix timestamp of record")
        public String sourceOrderingField = "ts";
        @Parameter(names={"--payload-class"}, description="subclass of HoodieRecordPayload, that works off a GenericRecord. Implement your own, if you want to do something other than overwriting existing value")
        public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName();
        @Parameter(names={"--schemaprovider-class"}, description="subclass of org.apache.hudi.utilities.schema.SchemaProvider to attach schemas to input & target table data, built in options: org.apache.hudi.utilities.schema.FilebasedSchemaProvider. Source (See org.apache.hudi.utilities.sources.Source) implementation can implement their own SchemaProvider. For Sources that return Dataset<Row>, the schema is obtained implicitly. However, this CLI option allows overriding the schemaprovider returned by Source.")
        public String schemaProviderClassName = null;
        @Parameter(names={"--transformer-class"}, description="subclass of org.apache.hudi.utilities.transform.Transformer. Allows transforming raw source dataset to a target dataset (conforming to target schema) before writing. Default : Not set. E:g - org.apache.hudi.utilities.transform.SqlQueryBasedTransformer (which allows a SQL query templated to be passed as a transformation function)")
        public String transformerClassName = null;
        @Parameter(names={"--source-limit"}, description="Maximum amount of data to read from source. Default: No limit. For e.g: DFS-Source => max bytes to read, Kafka-Source => max events to read")
        public long sourceLimit = Long.MAX_VALUE;
        @Parameter(names={"--op"}, description="Takes one of these values : UPSERT (default), INSERT (use when input is purely new data/inserts to gain speed)", converter=OperationConvertor.class)
        public Operation operation = Operation.UPSERT;
        @Parameter(names={"--filter-dupes"}, description="Should duplicate records from source be dropped/filtered out before insert/bulk-insert")
        public Boolean filterDupes = false;
        @Parameter(names={"--enable-hive-sync"}, description="Enable syncing to hive")
        public Boolean enableHiveSync = false;
        @Parameter(names={"--max-pending-compactions"}, description="Maximum number of outstanding inflight/requested compactions. Delta Sync will not happen unless outstanding compactions is less than this number")
        public Integer maxPendingCompactions = 5;
        @Parameter(names={"--continuous"}, description="Delta Streamer runs in continuous mode running source-fetch -> Transform -> Hudi Write in loop")
        public Boolean continuousMode = false;
        @Parameter(names={"--min-sync-interval-seconds"}, description="the min sync interval of each sync in continuous mode")
        public Integer minSyncIntervalSeconds = 0;
        @Parameter(names={"--spark-master"}, description="spark master to use.")
        public String sparkMaster = "local[2]";
        @Parameter(names={"--commit-on-errors"}, description="Commit even when some records failed to be written")
        public Boolean commitOnErrors = false;
        @Parameter(names={"--delta-sync-scheduling-weight"}, description="Scheduling weight for delta sync as defined in https://spark.apache.org/docs/latest/job-scheduling.html")
        public Integer deltaSyncSchedulingWeight = 1;
        @Parameter(names={"--compact-scheduling-weight"}, description="Scheduling weight for compaction as defined in https://spark.apache.org/docs/latest/job-scheduling.html")
        public Integer compactSchedulingWeight = 1;
        @Parameter(names={"--delta-sync-scheduling-minshare"}, description="Minshare for delta sync as defined in https://spark.apache.org/docs/latest/job-scheduling.html")
        public Integer deltaSyncSchedulingMinShare = 0;
        @Parameter(names={"--compact-scheduling-minshare"}, description="Minshare for compaction as defined in https://spark.apache.org/docs/latest/job-scheduling.html")
        public Integer compactSchedulingMinShare = 0;
        @Parameter(names={"--disable-compaction"}, description="Compaction is enabled for MoR table by default. This flag disables it")
        public Boolean forceDisableCompaction = false;
        @Parameter(names={"--checkpoint"}, description="Resume Delta Streamer from this checkpoint.")
        public String checkpoint = null;
        @Parameter(names={"--help", "-h"}, help=true)
        public Boolean help = false;

        /**
         * @return {@code true} when running in continuous mode, compaction has not been disabled and
         *         {@code storageType} names the MERGE_ON_READ table type
         * @throws IllegalArgumentException if {@code storageType} is not a valid table type name
         *         (only reached when the two flag checks pass)
         */
        public boolean isAsyncCompactionEnabled() {
            return this.continuousMode && !this.forceDisableCompaction && HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(this.storageType));
        }

        /**
         * @return {@code true} when NOT running in continuous mode, compaction has not been disabled
         *         and {@code storageType} names the MERGE_ON_READ table type
         * @throws IllegalArgumentException if {@code storageType} is not a valid table type name
         *         (only reached when the two flag checks pass)
         */
        public boolean isInlineCompactionEnabled() {
            return !this.continuousMode && !this.forceDisableCompaction && HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(this.storageType));
        }
    }

    private static class OperationConvertor
    implements IStringConverter<Operation> {
        private OperationConvertor() {
        }

        public Operation convert(String value) throws ParameterException {
            return Operation.valueOf(value);
        }
    }

    /**
     * Write operations selectable through the {@code --op} option.
     */
    public enum Operation {
        UPSERT,
        INSERT,
        BULK_INSERT
    }
}

