/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.com.beust.jcommander.JCommander;
import org.apache.hudi.com.beust.jcommander.Parameter;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;

public class HoodieIndexer {
    private static final Logger LOG = LogManager.getLogger(HoodieIndexer.class);
    static final String DROP_INDEX = "dropindex";
    private final Config cfg;
    private TypedProperties props;
    private final JavaSparkContext jsc;
    private final HoodieTableMetaClient metaClient;

    public HoodieIndexer(JavaSparkContext jsc, Config cfg) {
        this.cfg = cfg;
        this.jsc = jsc;
        this.props = StringUtils.isNullOrEmpty(cfg.propsFilePath) ? UtilHelpers.buildProperties(cfg.configs) : this.readConfigFromFileSystem(jsc, cfg);
        this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
    }

    private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
        return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs).getProps(true);
    }

    public static void main(String[] args2) {
        Config cfg = new Config();
        JCommander cmd = new JCommander((Object)cfg, null, args2);
        if (cfg.help.booleanValue() || args2.length == 0) {
            cmd.usage();
            System.exit(1);
        }
        JavaSparkContext jsc = UtilHelpers.buildSparkContext("indexing-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory);
        HoodieIndexer indexer = new HoodieIndexer(jsc, cfg);
        int result = indexer.start(cfg.retry);
        String resultMsg = String.format("Indexing with basePath: %s, tableName: %s, runningMode: %s", cfg.basePath, cfg.tableName, cfg.runningMode);
        if (result == -1) {
            LOG.error((Object)(resultMsg + " failed"));
        } else {
            LOG.info((Object)(resultMsg + " success"));
        }
        jsc.stop();
    }

    public int start(int retry) {
        if (!this.props.getBoolean(HoodieMetadataConfig.ENABLE.key())) {
            LOG.error((Object)String.format("Metadata is not enabled. Please set %s to true.", HoodieMetadataConfig.ENABLE.key()));
            return -1;
        }
        Set<String> initializedMetadataPartitions = HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions(this.metaClient.getTableConfig());
        LOG.info((Object)("Setting props for: " + initializedMetadataPartitions));
        initializedMetadataPartitions.forEach(p -> {
            if ("column_stats".equals(p)) {
                this.props.setProperty(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true");
            }
            if ("bloom_filters".equals(p)) {
                this.props.setProperty(HoodieMetadataConfig.ENABLE_METADATA_INDEX_BLOOM_FILTER.key(), "true");
            }
        });
        return UtilHelpers.retry(retry, () -> {
            switch (this.cfg.runningMode.toLowerCase()) {
                case "schedule": {
                    int result;
                    LOG.info((Object)"Running Mode: [schedule]; Do schedule");
                    Option<String> instantTime = this.scheduleIndexing(this.jsc);
                    int n = result = instantTime.isPresent() ? 0 : -1;
                    if (result == 0) {
                        LOG.info((Object)("The schedule instant time is " + instantTime.get()));
                    }
                    return result;
                }
                case "scheduleandexecute": {
                    LOG.info((Object)"Running Mode: [scheduleandexecute]");
                    return this.scheduleAndRunIndexing(this.jsc);
                }
                case "execute": {
                    LOG.info((Object)"Running Mode: [execute];");
                    return this.runIndexing(this.jsc);
                }
                case "dropindex": {
                    LOG.info((Object)"Running Mode: [dropindex];");
                    return this.dropIndex(this.jsc);
                }
            }
            LOG.info((Object)("Unsupported running mode [" + this.cfg.runningMode + "], quit the job directly"));
            return -1;
        }, "Indexer failed");
    }

    public Option<String> doSchedule() throws Exception {
        return this.scheduleIndexing(this.jsc);
    }

    private Option<String> scheduleIndexing(JavaSparkContext jsc) throws Exception {
        String schemaStr = UtilHelpers.getSchemaFromLatestInstant(this.metaClient);
        try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, this.cfg.basePath, schemaStr, this.cfg.parallelism, Option.empty(), this.props);){
            Option<String> option2 = this.doSchedule(client);
            return option2;
        }
    }

    private Option<String> doSchedule(SparkRDDWriteClient<HoodieRecordPayload> client) {
        HoodieMetadataConfig metadataConfig = this.getHoodieMetadataConfig();
        List<MetadataPartitionType> partitionTypes = this.getRequestedPartitionTypes(this.cfg.indexTypes, Option.of(metadataConfig));
        ValidationUtils.checkArgument(partitionTypes.size() == 1, "Currently, only one index type can be scheduled at a time.");
        if (!this.isMetadataInitialized() && !partitionTypes.contains((Object)MetadataPartitionType.FILES)) {
            throw new HoodieException("Metadata table is not yet initialized. Initialize FILES partition before any other partition " + Arrays.toString(partitionTypes.toArray()));
        }
        if (this.indexExists(partitionTypes)) {
            return Option.empty();
        }
        Option<String> indexingInstant = client.scheduleIndexing(partitionTypes);
        if (!indexingInstant.isPresent()) {
            LOG.error((Object)"Scheduling of index action did not return any instant.");
        }
        return indexingInstant;
    }

    private HoodieMetadataConfig getHoodieMetadataConfig() {
        this.props.setProperty(HoodieWriteConfig.BASE_PATH.key(), this.cfg.basePath);
        HoodieWriteConfig dataTableWriteConfig = HoodieWriteConfig.newBuilder().withProps(this.props).build();
        return dataTableWriteConfig.getMetadataConfig();
    }

    private boolean indexExists(List<MetadataPartitionType> partitionTypes) {
        Set<String> indexedMetadataPartitions = this.metaClient.getTableConfig().getMetadataPartitions();
        Set requestedIndexPartitionPaths = partitionTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
        requestedIndexPartitionPaths.retainAll(indexedMetadataPartitions);
        if (!requestedIndexPartitionPaths.isEmpty()) {
            LOG.error((Object)("Following indexes already built: " + requestedIndexPartitionPaths));
            return true;
        }
        return false;
    }

    private boolean isMetadataInitialized() {
        Set<String> indexedMetadataPartitions = this.metaClient.getTableConfig().getMetadataPartitions();
        return !indexedMetadataPartitions.isEmpty();
    }

    private int runIndexing(JavaSparkContext jsc) throws Exception {
        String schemaStr = UtilHelpers.getSchemaFromLatestInstant(this.metaClient);
        try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, this.cfg.basePath, schemaStr, this.cfg.parallelism, Option.empty(), this.props);){
            if (StringUtils.isNullOrEmpty(this.cfg.indexInstantTime)) {
                Option<HoodieInstant> earliestPendingIndexInstant = this.metaClient.getActiveTimeline().filterPendingIndexTimeline().firstInstant();
                if (earliestPendingIndexInstant.isPresent()) {
                    this.cfg.indexInstantTime = earliestPendingIndexInstant.get().getTimestamp();
                    LOG.info((Object)("Found the earliest scheduled indexing instant which will be executed: " + this.cfg.indexInstantTime));
                } else {
                    throw new HoodieIndexException("There is no scheduled indexing in the table.");
                }
            }
            int n = this.handleResponse(client.index(this.cfg.indexInstantTime)) ? 0 : 1;
            return n;
        }
    }

    private int scheduleAndRunIndexing(JavaSparkContext jsc) throws Exception {
        String schemaStr = UtilHelpers.getSchemaFromLatestInstant(this.metaClient);
        try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, this.cfg.basePath, schemaStr, this.cfg.parallelism, Option.empty(), this.props);){
            Option<String> indexingInstantTime = this.doSchedule(client);
            if (indexingInstantTime.isPresent()) {
                int n = this.handleResponse(client.index(indexingInstantTime.get())) ? 0 : 1;
                return n;
            }
            int n = -1;
            return n;
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private int dropIndex(JavaSparkContext jsc) throws Exception {
        List<MetadataPartitionType> partitionTypes = this.getRequestedPartitionTypes(this.cfg.indexTypes, Option.empty());
        String schemaStr = UtilHelpers.getSchemaFromLatestInstant(this.metaClient);
        try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, this.cfg.basePath, schemaStr, this.cfg.parallelism, Option.empty(), this.props);){
            client.dropIndex(partitionTypes);
            int n = 0;
            return n;
        }
        catch (Exception e) {
            LOG.error((Object)"Failed to drop index. ", (Throwable)e);
            return -1;
        }
    }

    private boolean handleResponse(Option<HoodieIndexCommitMetadata> commitMetadata) {
        if (!commitMetadata.isPresent()) {
            LOG.error((Object)"Indexing failed as no commit metadata present.");
            return false;
        }
        List<HoodieIndexPartitionInfo> indexPartitionInfos = commitMetadata.get().getIndexPartitionInfos();
        LOG.info((Object)String.format("Indexing complete for partitions: %s", indexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toList())));
        return this.isIndexBuiltForAllRequestedTypes(indexPartitionInfos);
    }

    boolean isIndexBuiltForAllRequestedTypes(List<HoodieIndexPartitionInfo> indexPartitionInfos) {
        Set indexedPartitions = indexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet());
        Set requestedPartitions = this.getRequestedPartitionTypes(this.cfg.indexTypes, Option.empty()).stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
        requestedPartitions.removeAll(indexedPartitions);
        return requestedPartitions.isEmpty();
    }

    List<MetadataPartitionType> getRequestedPartitionTypes(String indexTypes, Option<HoodieMetadataConfig> metadataConfig) {
        List<String> requestedIndexTypes = Arrays.asList(indexTypes.split(","));
        return requestedIndexTypes.stream().map(p -> {
            MetadataPartitionType metadataPartitionType = MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT));
            if (metadataConfig.isPresent() && !metadataPartitionType.getPartitionPath().equals(MetadataPartitionType.FILES.toString())) {
                if (metadataPartitionType.getPartitionPath().equals(MetadataPartitionType.COLUMN_STATS.getPartitionPath())) {
                    metadataPartitionType.setFileGroupCount(((HoodieMetadataConfig)metadataConfig.get()).getColumnStatsIndexFileGroupCount());
                } else if (metadataPartitionType.getPartitionPath().equals(MetadataPartitionType.BLOOM_FILTERS.getPartitionPath())) {
                    metadataPartitionType.setFileGroupCount(((HoodieMetadataConfig)metadataConfig.get()).getBloomFilterIndexFileGroupCount());
                }
            }
            return metadataPartitionType;
        }).collect(Collectors.toList());
    }

    public static class Config
    implements Serializable {
        @Parameter(names={"--base-path", "-sp"}, description="Base path for the table", required=true)
        public String basePath = null;
        @Parameter(names={"--table-name", "-tn"}, description="Table name", required=true)
        public String tableName = null;
        @Parameter(names={"--instant-time", "-it"}, description="Indexing Instant time")
        public String indexInstantTime = null;
        @Parameter(names={"--parallelism", "-pl"}, description="Parallelism for hoodie insert", required=true)
        public int parallelism = 1;
        @Parameter(names={"--spark-master", "-ms"}, description="Spark master")
        public String sparkMaster = null;
        @Parameter(names={"--spark-memory", "-sm"}, description="spark memory to use", required=true)
        public String sparkMemory = null;
        @Parameter(names={"--retry", "-rt"}, description="number of retries")
        public int retry = 0;
        @Parameter(names={"--index-types", "-ixt"}, description="Comma-separated index types to be built, e.g. BLOOM_FILTERS,COLUMN_STATS", required=true)
        public String indexTypes = null;
        @Parameter(names={"--mode", "-m"}, description="Set job mode: Set \"schedule\" to generate an indexing plan; Set \"execute\" to execute the indexing plan at the given instant, which means --instant-time is required here; Set \"scheduleandExecute\" to generate an indexing plan first and execute that plan immediately;Set \"dropindex\" to drop the index types specified in --index-types;")
        public String runningMode = null;
        @Parameter(names={"--help", "-h"}, help=true)
        public Boolean help = false;
        @Parameter(names={"--props"}, description="path to properties file on localfs or dfs, with configurations for hoodie client for indexing")
        public String propsFilePath = null;
        @Parameter(names={"--hoodie-conf"}, description="Any configuration that can be set in the properties file (using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated", splitter=IdentitySplitter.class)
        public List<String> configs = new ArrayList<String>();
    }
}

