/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.com.beust.jcommander.JCommander;
import org.apache.hudi.com.beust.jcommander.Parameter;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class HoodieCompactor {
    private static final Logger LOG = LogManager.getLogger(HoodieCompactor.class);
    // Default-built consistency guard settings; nothing else in this class reads the field.
    private static ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build();
    /** Command-line options for this run. */
    private final Config cfg;
    /** File system of the table base path; (re)assigned at the start of every {@code compact(int)} call. */
    private transient FileSystem fs;
    /** Hoodie client properties, resolved exactly once by the constructor. */
    private final TypedProperties props;
    /** Spark context used for every client and meta-client created by this class. */
    private final JavaSparkContext jsc;

    /**
     * Creates a compactor bound to the given Spark context and options.
     *
     * <p>Client properties come from the file named by {@code cfg.propsFilePath} when one is set;
     * otherwise they are built from the {@code cfg.configs} entries alone.
     *
     * @param jsc Spark context used for all subsequent operations
     * @param cfg parsed command-line options
     */
    public HoodieCompactor(JavaSparkContext jsc, Config cfg) {
        this.cfg = cfg;
        this.jsc = jsc;
        if (cfg.propsFilePath != null) {
            this.props = readConfigFromFileSystem(jsc, cfg);
        } else {
            this.props = UtilHelpers.buildProperties(cfg.configs);
        }
    }

    /**
     * Loads the client properties from the file at {@code cfg.propsFilePath}, passing the
     * {@code cfg.configs} entries along to the reader.
     *
     * @param jsc Spark context supplying the Hadoop configuration used to read the file
     * @param cfg options holding the properties file path and the extra configuration entries
     * @return the properties produced by the reader
     */
    private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
        Path propsPath = new Path(cfg.propsFilePath);
        return UtilHelpers.readConfig(jsc.hadoopConfiguration(), propsPath, cfg.configs)
                .getProps(true);
    }

    /**
     * Command-line entry point: parses the options, builds a Spark context, runs one
     * {@link #compact(int)} call and always stops the Spark context afterwards.
     *
     * <p>Failures are surfaced to the caller instead of being hidden: an exception raised by the
     * run is logged and rethrown, and a non-zero return code from {@code compact} is turned into
     * an {@link IllegalStateException}, so the JVM does not exit successfully after a failed run.
     * Treating non-zero as failure follows {@code doSchedule}, which returns 0 on success;
     * NOTE(review): confirm {@code UtilHelpers.retry} and {@code UtilHelpers.handleErrors} use the
     * same convention.
     *
     * @param args command-line arguments; prints usage and exits with status 1 when empty or when
     *             help is requested
     * @throws IllegalStateException if the run completes with a non-zero return code
     */
    public static void main(String[] args) {
        Config cfg = new Config();
        JCommander cmd = new JCommander(cfg, null, args);
        if (cfg.help || args.length == 0) {
            cmd.usage();
            System.exit(1);
        }
        JavaSparkContext jsc = UtilHelpers.buildSparkContext("compactor-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory);
        int ret;
        try {
            ret = new HoodieCompactor(jsc, cfg).compact(cfg.retry);
        } catch (RuntimeException | Error failure) {
            // Log with the table name for context, then propagate so the process fails visibly.
            LOG.error("Fail to run compaction for " + cfg.tableName, failure);
            throw failure;
        } finally {
            jsc.stop();
        }
        if (ret != 0) {
            throw new IllegalStateException("Fail to run compaction for " + cfg.tableName + ", return code: " + ret);
        }
    }

    /**
     * Runs the configured operation through {@code UtilHelpers.retry}: scheduling a compaction
     * when {@code cfg.runSchedule} is set, executing one otherwise.
     *
     * <p>The file system for the table base path is resolved first and stored for later use.
     *
     * @param retry retry count handed to {@code UtilHelpers.retry}
     * @return the value returned by {@code UtilHelpers.retry}
     */
    public int compact(int retry) {
        this.fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
        return UtilHelpers.retry(retry, () -> {
            if (!cfg.runSchedule) {
                return doCompact(jsc);
            }
            // Scheduling needs a strategy class to hand to the write client.
            if (cfg.strategyClassName == null) {
                throw new IllegalArgumentException("Missing Strategy class name for running compaction");
            }
            return doSchedule(jsc);
        }, "Compact failed");
    }

    /**
     * Executes a compaction and reports the outcome via {@code UtilHelpers.handleErrors}.
     *
     * <p>When no instant time is configured, the first requested compaction instant on the active
     * timeline is chosen and stored back into {@code cfg.compactionInstantTime}.
     *
     * <p>The write client is opened in a try-with-resources block so it is closed on every path,
     * including the "nothing scheduled" failure. {@code handleErrors} is called inside that block
     * so the client is still open while the write statuses are processed.
     *
     * @param jsc Spark context used to create the client and meta client
     * @return the value returned by {@code UtilHelpers.handleErrors}
     * @throws HoodieCompactionException if no instant time is configured and the table has no
     *                                   requested compaction
     * @throws Exception                 if schema parsing, compaction or closing the client fails
     */
    private int doCompact(JavaSparkContext jsc) throws Exception {
        String schemaStr = UtilHelpers.parseSchema(this.fs, this.cfg.schemaFile);
        try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, this.cfg.basePath, schemaStr, this.cfg.parallelism, Option.empty(), this.props)) {
            if (StringUtils.isNullOrEmpty(this.cfg.compactionInstantTime)) {
                HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, this.cfg.basePath, true);
                Option<HoodieInstant> firstCompactionInstant = metaClient.getActiveTimeline().firstInstant("compaction", HoodieInstant.State.REQUESTED);
                if (!firstCompactionInstant.isPresent()) {
                    throw new HoodieCompactionException("There is no scheduled compaction in the table.");
                }
                this.cfg.compactionInstantTime = firstCompactionInstant.get().getTimestamp();
                LOG.info("Found the earliest scheduled compaction instant which will be executed: " + this.cfg.compactionInstantTime);
            }
            // The cast mirrors the original code; it is harmless if compact() is already typed.
            @SuppressWarnings("unchecked")
            JavaRDD<WriteStatus> writeResponse = (JavaRDD<WriteStatus>) client.compact(this.cfg.compactionInstantTime);
            return UtilHelpers.handleErrors(jsc, this.cfg.compactionInstantTime, writeResponse);
        }
    }

    /**
     * Schedules a compaction at the configured instant time using the configured strategy class.
     *
     * <p>The instant time is validated before the write client is created, so an invalid
     * invocation fails without allocating a client. The client itself is opened in a
     * try-with-resources block so it is closed whether or not scheduling succeeds.
     *
     * @param jsc Spark context used to create the write client
     * @return 0 once the scheduling call has returned
     * @throws IllegalArgumentException if no compaction instant time is configured
     * @throws Exception                if creating, using or closing the client fails
     */
    private int doSchedule(JavaSparkContext jsc) throws Exception {
        if (StringUtils.isNullOrEmpty(this.cfg.compactionInstantTime)) {
            throw new IllegalArgumentException("No instant time is provided for scheduling compaction. Please specify the compaction instant time by using --instant-time.");
        }
        try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, this.cfg.basePath, "", this.cfg.parallelism, Option.of(this.cfg.strategyClassName), this.props)) {
            client.scheduleCompactionAtInstant(this.cfg.compactionInstantTime, Option.empty());
        }
        return 0;
    }

    /**
     * Command-line options for {@link HoodieCompactor}, populated by JCommander from the
     * {@code @Parameter} annotations on each field.
     */
    public static class Config implements Serializable {
        /** Base path of the table to operate on. */
        @Parameter(names = {"--base-path", "-sp"}, description = "Base path for the table", required = true)
        public String basePath;

        /** Table name; also used in the Spark application name and in log messages. */
        @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
        public String tableName;

        /**
         * Compaction instant time. Mandatory when scheduling; when executing and left empty, the
         * first requested compaction instant on the timeline is used.
         */
        @Parameter(names = {"--instant-time", "-it"}, description = "Compaction Instant time")
        public String compactionInstantTime;

        /** Parallelism passed to the write client. */
        @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = true)
        public int parallelism = 1;

        /** Path of the Avro schema file parsed before executing a compaction. */
        @Parameter(names = {"--schema-file", "-sf"}, description = "path for Avro schema file", required = true)
        public String schemaFile;

        /** Spark master passed to the Spark context builder. */
        @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
        public String sparkMaster;

        /** Spark memory setting passed to the Spark context builder. */
        @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true)
        public String sparkMemory;

        /** Retry count handed to the retry helper. */
        @Parameter(names = {"--retry", "-rt"}, description = "number of retries")
        public int retry = 0;

        /** When true, a compaction is scheduled instead of executed. */
        @Parameter(names = {"--schedule", "-sc"}, description = "Schedule compaction")
        public Boolean runSchedule = false;

        /** Strategy class name; must be set when scheduling. */
        @Parameter(names = {"--strategy", "-st"}, description = "Strategy Class")
        public String strategyClassName;

        /** When true, usage is printed and the program exits. */
        @Parameter(names = {"--help", "-h"}, help = true)
        public Boolean help = false;

        /** Optional properties file; when unset, properties are built from {@link #configs} only. */
        @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for hoodie client for compacting")
        public String propsFilePath;

        /** Extra configuration entries given on the command line; the option may be repeated. */
        @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file (using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated", splitter = IdentitySplitter.class)
        public List<String> configs = new ArrayList<>();
    }
}

