/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.com.beust.jcommander.JCommander;
import org.apache.hudi.com.beust.jcommander.Parameter;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spark job that schedules and/or executes clustering on a Hudi table.
 *
 * <p>The action performed is selected by {@link Config#runningMode} (matched
 * case-insensitively): {@code schedule}, {@code execute}, {@code scheduleAndExecute}
 * or {@code purge_pending_instant}.
 */
public class HoodieClusteringJob {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieClusteringJob.class);

    // Running modes, in the lower-cased form they are compared against.
    private static final String MODE_SCHEDULE = "schedule";
    private static final String MODE_EXECUTE = "execute";
    private static final String MODE_SCHEDULE_AND_EXECUTE = "scheduleandexecute";
    private static final String MODE_PURGE_PENDING_INSTANT = "purge_pending_instant";

    private final Config cfg;
    private final TypedProperties props;
    private final JavaSparkContext jsc;
    private HoodieTableMetaClient metaClient;

    /**
     * Creates a job whose properties are built from {@code cfg.propsFilePath} and
     * {@code cfg.configs}, and whose meta client is created for {@code cfg.basePath}.
     *
     * @param jsc Spark context used to run the job
     * @param cfg job configuration
     */
    public HoodieClusteringJob(JavaSparkContext jsc, Config cfg) {
        this(jsc, cfg, UtilHelpers.buildProperties(jsc.hadoopConfiguration(), cfg.propsFilePath, cfg.configs), UtilHelpers.createMetaClient(jsc, cfg.basePath, true));
    }

    /**
     * Creates a job from explicitly supplied properties and meta client.
     *
     * <p>Note that {@code props} is modified in place: the async-clean key is set to
     * {@code false}, and lock options are added when the table config reports the
     * metadata table as available.
     *
     * @param jsc        Spark context used to run the job
     * @param cfg        job configuration
     * @param props      writer properties (mutated, see above)
     * @param metaClient meta client of the table to cluster
     */
    public HoodieClusteringJob(JavaSparkContext jsc, Config cfg, TypedProperties props, HoodieTableMetaClient metaClient) {
        this.cfg = cfg;
        this.jsc = jsc;
        this.props = props;
        this.metaClient = metaClient;
        this.props.put(HoodieCleanConfig.ASYNC_CLEAN.key(), (Object)false);
        if (this.metaClient.getTableConfig().isMetadataTableAvailable()) {
            UtilHelpers.addLockOptions(cfg.basePath, this.metaClient.getBasePath().toUri().getScheme(), this.props);
        }
    }

    /**
     * Command-line entry point.
     *
     * @param args2 command-line arguments parsed into a {@link Config}
     * @throws HoodieException if help was requested, no arguments were given, or the
     *                         clustering run returned a non-zero result
     */
    public static void main(String[] args2) {
        Config cfg = new Config();
        JCommander cmd = new JCommander((Object)cfg, null, args2);
        if (cfg.help || args2.length == 0) {
            cmd.usage();
            throw new HoodieException("Clustering failed for basePath: " + cfg.basePath);
        }
        JavaSparkContext jsc = UtilHelpers.buildSparkContext("clustering-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory);
        try {
            int result = new HoodieClusteringJob(jsc, cfg).cluster(cfg.retry);
            String resultMsg = String.format("Clustering with basePath: %s, tableName: %s, runningMode: %s", cfg.basePath, cfg.tableName, cfg.runningMode);
            if (result != 0) {
                throw new HoodieException(resultMsg + " failed");
            }
            LOG.info(resultMsg + " success");
        } finally {
            // Always release the Spark context, including when the job fails or throws.
            jsc.stop();
        }
    }

    /**
     * Fills in a default running mode when none was given: {@code schedule} if the
     * legacy {@code --schedule} flag is set, {@code execute} otherwise. A mode that is
     * already set is left untouched (unsupported values are rejected in {@link #cluster}).
     */
    private static void validateRunningMode(Config cfg) {
        if (StringUtils.isNullOrEmpty(cfg.runningMode)) {
            cfg.runningMode = cfg.runSchedule ? MODE_SCHEDULE : MODE_EXECUTE;
        }
    }

    /**
     * Runs the action selected by the configured running mode through
     * {@code UtilHelpers.retry}.
     *
     * @param retry retry count handed to {@code UtilHelpers.retry}
     * @return the value returned by {@code UtilHelpers.retry}; each attempt yields
     *         {@code 0} on success and {@code -1} when scheduling produced no instant
     *         or the running mode is unsupported
     */
    public int cluster(int retry) {
        HoodieClusteringJob.validateRunningMode(this.cfg);
        return UtilHelpers.retry(retry, () -> {
            // Locale.ROOT keeps the comparison independent of the JVM default locale
            // (e.g. Turkish case rules would otherwise break "PURGE_PENDING_INSTANT").
            switch (this.cfg.runningMode.toLowerCase(Locale.ROOT)) {
                case MODE_SCHEDULE: {
                    LOG.info("Running Mode: [schedule]; Do schedule");
                    Option<String> instantTime = this.doSchedule(this.jsc);
                    if (!instantTime.isPresent()) {
                        return -1;
                    }
                    LOG.info("The schedule instant time is {}", instantTime.get());
                    return 0;
                }
                case MODE_SCHEDULE_AND_EXECUTE: {
                    LOG.info("Running Mode: [scheduleandexecute]");
                    return this.doScheduleAndCluster(this.jsc);
                }
                case MODE_EXECUTE: {
                    LOG.info("Running Mode: [execute]; Do cluster");
                    return this.doCluster(this.jsc);
                }
                case MODE_PURGE_PENDING_INSTANT: {
                    LOG.info("Running Mode: [purge_pending_instant];");
                    return this.doPurgePendingInstant(this.jsc);
                }
                default: {
                    LOG.error("Unsupported running mode [{}], quit the job directly", this.cfg.runningMode);
                    return -1;
                }
            }
        }, "Cluster failed");
    }

    /**
     * Reloads the meta client, reads the schema from the latest instant and creates a
     * write client for the configured base path. The caller owns (and must close) the
     * returned client.
     */
    private SparkRDDWriteClient<HoodieRecordPayload> createWriteClient(JavaSparkContext jsc) throws Exception {
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        String schemaStr = UtilHelpers.getSchemaFromLatestInstant(this.metaClient);
        return UtilHelpers.createHoodieClient(jsc, this.cfg.basePath, schemaStr, this.cfg.parallelism, Option.empty(), this.props);
    }

    /**
     * Executes the clustering plan at {@code instantTime}, optionally cleans, and
     * converts the resulting commit metadata into an exit code via
     * {@code UtilHelpers.handleErrors}.
     */
    private int runClustering(SparkRDDWriteClient<HoodieRecordPayload> client, String instantTime) throws Exception {
        Option<HoodieCommitMetadata> commitMetadata = client.cluster(instantTime).getCommitMetadata();
        this.clean(client);
        return UtilHelpers.handleErrors(commitMetadata.get(), instantTime);
    }

    /**
     * Executes a scheduled clustering plan. When no instant time is configured, the
     * first pending clustering instant of the active timeline is picked (and stored
     * in the config); if there is none, nothing is done and {@code 0} is returned.
     */
    private int doCluster(JavaSparkContext jsc) throws Exception {
        try (SparkRDDWriteClient<HoodieRecordPayload> client = this.createWriteClient(jsc)) {
            if (StringUtils.isNullOrEmpty(this.cfg.clusteringInstantTime)) {
                Option<HoodieInstant> firstClusteringInstant = this.metaClient.getActiveTimeline().getFirstPendingClusterInstant();
                if (!firstClusteringInstant.isPresent()) {
                    LOG.info("There is no scheduled clustering in the table.");
                    return 0;
                }
                this.cfg.clusteringInstantTime = firstClusteringInstant.get().requestedTime();
                LOG.info("Found the earliest scheduled clustering instant which will be executed: {}", this.cfg.clusteringInstantTime);
            }
            return this.runClustering(client, this.cfg.clusteringInstantTime);
        }
    }

    /**
     * Schedules clustering using this job's Spark context.
     *
     * @return the scheduled instant time, or an empty option if none was scheduled
     * @throws Exception if creating the write client or scheduling fails
     */
    public Option<String> doSchedule() throws Exception {
        return this.doSchedule(this.jsc);
    }

    /** Creates a write client for {@code jsc} and schedules clustering with it. */
    private Option<String> doSchedule(JavaSparkContext jsc) throws Exception {
        try (SparkRDDWriteClient<HoodieRecordPayload> client = this.createWriteClient(jsc)) {
            return this.doSchedule(client);
        }
    }

    /**
     * Schedules clustering at the configured instant time when one is set, otherwise
     * lets the client pick the instant.
     */
    private Option<String> doSchedule(SparkRDDWriteClient<HoodieRecordPayload> client) {
        // Treat an empty instant time like a missing one, consistent with doCluster.
        if (!StringUtils.isNullOrEmpty(this.cfg.clusteringInstantTime)) {
            // NOTE(review): the result of scheduleClusteringAtInstant is not inspected;
            // confirm whether it can signal that no plan was generated.
            client.scheduleClusteringAtInstant(this.cfg.clusteringInstantTime, Option.empty());
            return Option.of(this.cfg.clusteringInstantTime);
        }
        return client.scheduleClustering(Option.empty());
    }

    /**
     * Schedules a clustering plan (or, when retrying is enabled, reuses a stale
     * pending one) and executes it immediately.
     *
     * @return {@code -1} if no plan could be obtained, otherwise the result of
     *         executing the plan
     */
    private int doScheduleAndCluster(JavaSparkContext jsc) throws Exception {
        LOG.info("Step 1: Do schedule");
        try (SparkRDDWriteClient<HoodieRecordPayload> client = this.createWriteClient(jsc)) {
            Option<String> instantTime = Option.empty();
            if (this.cfg.retryLastFailedClusteringJob) {
                instantTime = this.findStalePendingClusteringInstant(client);
            }
            if (!instantTime.isPresent()) {
                instantTime = this.doSchedule(client);
            }
            if (!instantTime.isPresent()) {
                LOG.info("Couldn't generate cluster plan");
                return -1;
            }
            LOG.info("The schedule instant time is {}", instantTime.get());
            LOG.info("Step 2: Do cluster");
            return this.runClustering(client, instantTime.get());
        }
    }

    /**
     * Looks up the last pending clustering instant and returns its requested time if
     * it started more than {@code cfg.maxProcessingTimeMs} ago (i.e. it is considered
     * failed); returns an empty option when there is no pending instant or it may
     * still be in progress.
     */
    private Option<String> findStalePendingClusteringInstant(SparkRDDWriteClient<HoodieRecordPayload> client) throws Exception {
        HoodieSparkTable table = HoodieSparkTable.create((HoodieWriteConfig)client.getConfig(), (HoodieEngineContext)client.getEngineContext());
        client.validateAgainstTableProperties(table.getMetaClient().getTableConfig(), client.getConfig());
        Option<HoodieInstant> lastClusterOpt = table.getActiveTimeline().getLastPendingClusterInstant();
        if (!lastClusterOpt.isPresent()) {
            return Option.empty();
        }
        HoodieInstant inflightClusteringInstant = lastClusterOpt.get();
        Date clusteringStartTime = TimelineUtils.parseDateFromInstantTime(inflightClusteringInstant.requestedTime());
        if (clusteringStartTime.getTime() + this.cfg.maxProcessingTimeMs < System.currentTimeMillis()) {
            LOG.info("Found failed clustering instant at : {}; Will rollback the failed clustering and re-trigger again.", inflightClusteringInstant);
            return Option.of(inflightClusteringInstant.requestedTime());
        }
        LOG.info("{} might still be in progress, will trigger a new clustering job.", inflightClusteringInstant);
        return Option.empty();
    }

    /** Purges the pending clustering at the configured instant time; always returns {@code 0}. */
    private int doPurgePendingInstant(JavaSparkContext jsc) throws Exception {
        try (SparkRDDWriteClient<HoodieRecordPayload> client = this.createWriteClient(jsc)) {
            client.purgePendingClustering(this.cfg.clusteringInstantTime);
        }
        return 0;
    }

    /** Triggers a clean unless {@code --skip-clean} is set or auto clean is disabled in the write config. */
    private void clean(SparkRDDWriteClient<?> client) {
        if (this.cfg.skipClean || !client.getConfig().isAutoClean()) {
            return;
        }
        client.clean();
    }

    /** Command-line options of {@link HoodieClusteringJob}, populated by JCommander. */
    public static class Config
    implements Serializable {
        @Parameter(names={"--base-path", "-sp"}, description="Base path for the table", required=true)
        public String basePath = null;
        @Parameter(names={"--table-name", "-tn"}, description="Table name", required=true)
        public String tableName = null;
        @Parameter(names={"--instant-time", "-it"}, description="Clustering Instant time, only used when set --mode execute. If the instant time is not provided with --mode execute, the earliest scheduled clustering instant time is used by default. When set \"--mode scheduleAndExecute\" this instant-time will be ignored.")
        public String clusteringInstantTime = null;
        @Parameter(names={"--parallelism", "-pl"}, description="Parallelism for hoodie insert")
        public int parallelism = 1;
        @Parameter(names={"--spark-master", "-ms"}, description="Spark master")
        public String sparkMaster = null;
        @Parameter(names={"--spark-memory", "-sm"}, description="spark memory to use", required=false)
        public String sparkMemory = null;
        @Parameter(names={"--retry", "-rt"}, description="number of retries")
        public int retry = 0;
        // NOTE(review): "-sc" is also declared as the short name of --schedule below;
        // confirm how JCommander resolves the clash before relying on the short form.
        @Parameter(names={"--skip-clean", "-sc"}, description="do not trigger clean after clustering", required=false)
        public Boolean skipClean = true;
        @Parameter(names={"--schedule", "-sc"}, description="Schedule clustering @desperate soon please use \"--mode schedule\" instead")
        public Boolean runSchedule = false;
        @Parameter(names={"--retry-last-failed-clustering-job", "-rc"}, description="Take effect when using --mode/-m scheduleAndExecute. Set true means check, rollback and execute last failed clustering plan instead of planing a new clustering job directly.")
        public Boolean retryLastFailedClusteringJob = false;
        @Parameter(names={"--mode", "-m"}, description="Set job mode: Set \"schedule\" means make a cluster plan; Set \"execute\" means execute a cluster plan at given instant which means --instant-time is needed here; Set \"scheduleAndExecute\" means make a cluster plan first and execute that plan immediately")
        public String runningMode = null;
        @Parameter(names={"--help", "-h"}, help=true)
        public Boolean help = false;
        @Parameter(names={"--job-max-processing-time-ms", "-jt"}, description="Take effect when using --mode/-m scheduleAndExecute and --retry-last-failed-clustering-job/-rc true. If maxProcessingTimeMs passed but clustering job is still unfinished, hoodie would consider this job as failed and relaunch.")
        public long maxProcessingTimeMs = 0L;
        @Parameter(names={"--props"}, description="path to properties file on localfs or dfs, with configurations for hoodie client for clustering")
        public String propsFilePath = null;
        @Parameter(names={"--hoodie-conf"}, description="Any configuration that can be set in the properties file (using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated", splitter=IdentitySplitter.class)
        public List<String> configs = new ArrayList<String>();

        /** Renders every option except {@code --help} in a multi-line, CLI-like form. */
        @Override
        public String toString() {
            return "HoodieClusteringJobConfig{\n"
                + "   --base-path " + this.basePath + ", \n"
                + "   --table-name " + this.tableName + ", \n"
                + "   --instant-time " + this.clusteringInstantTime + ", \n"
                + "   --parallelism " + this.parallelism + ", \n"
                + "   --spark-master " + this.sparkMaster + ", \n"
                + "   --spark-memory " + this.sparkMemory + ", \n"
                + "   --retry " + this.retry + ", \n"
                + "   --skipClean " + this.skipClean + ", \n"
                + "   --schedule " + this.runSchedule + ", \n"
                + "   --retry-last-failed-clustering-job " + this.retryLastFailedClusteringJob + ", \n"
                + "   --mode " + this.runningMode + ", \n"
                + "   --job-max-processing-time-ms " + this.maxProcessingTimeMs + ", \n"
                + "   --props " + this.propsFilePath + ", \n"
                + "   --hoodie-conf " + this.configs + ", \n"
                + "\n}";
        }
    }
}

