/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.index;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.avro.model.HoodieIndexPlan;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.metadata.HoodieMetadataMetrics;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.hudi.table.action.index.IndexingCatchupTaskFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RunIndexActionExecutor<T, I, K, O>
extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexCommitMetadata>> {
    static final int TIMELINE_RELOAD_INTERVAL_MILLIS = 5000;
    private static final Logger LOG = LoggerFactory.getLogger(RunIndexActionExecutor.class);
    private static final Integer INDEX_COMMIT_METADATA_VERSION_1;
    private static final Integer LATEST_INDEX_COMMIT_METADATA_VERSION;
    private static final int MAX_CONCURRENT_INDEXING = 1;
    private final Option<HoodieMetadataMetrics> metrics;
    private volatile String currentCaughtupInstant;
    private final TransactionManager txnManager;

    public RunIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime) {
        super(context, config, table, instantTime);
        this.txnManager = new TransactionManager(config, table.getStorage());
        this.metrics = config.getMetadataConfig().isMetricsEnabled() ? Option.of(new HoodieMetadataMetrics(config.getMetricsConfig(), table.getStorage())) : Option.empty();
    }

    @Override
    public Option<HoodieIndexCommitMetadata> execute() {
        HoodieIndexPlan indexPlan;
        HoodieInstant indexInstant = this.validateAndGetIndexInstant();
        try {
            indexPlan = TimelineMetadataUtils.deserializeIndexPlan(this.table.getActiveTimeline().readIndexPlanAsBytes(indexInstant).get());
        }
        catch (IOException e) {
            throw new HoodieIndexException("Failed to read the index plan for instant: " + indexInstant);
        }
        List<HoodieIndexPartitionInfo> indexPartitionInfos = indexPlan.getIndexPartitionInfos();
        try {
            List<HoodieIndexPartitionInfo> finalIndexPartitionInfos;
            block22: {
                if (indexPartitionInfos == null || indexPartitionInfos.isEmpty()) {
                    throw new HoodieIndexException(String.format("No partitions to index for instant: %s", this.instantTime));
                }
                boolean firstTimeInitializingMetadataTable = false;
                HoodieIndexPartitionInfo fileIndexPartitionInfo = null;
                if (indexPartitionInfos.size() == 1 && indexPartitionInfos.get(0).getMetadataPartitionPath().equals(MetadataPartitionType.FILES.getPartitionPath())) {
                    firstTimeInitializingMetadataTable = true;
                    fileIndexPartitionInfo = indexPartitionInfos.get(0);
                }
                Set<String> indexesInflightOrCompleted = HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions(this.table.getMetaClient().getTableConfig());
                Set requestedPartitions = indexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet());
                requestedPartitions.retainAll(indexesInflightOrCompleted);
                if (!firstTimeInitializingMetadataTable && !requestedPartitions.isEmpty()) {
                    throw new HoodieIndexException(String.format("Following partitions already exist or inflight: %s", requestedPartitions));
                }
                this.table.getActiveTimeline().transitionIndexRequestedToInflight(indexInstant, Option.empty());
                if (!firstTimeInitializingMetadataTable) {
                    try (HoodieTableMetadataWriter metadataWriter = this.table.getIndexingMetadataWriter(this.instantTime).orElseThrow(() -> new HoodieIndexException(String.format("Could not get metadata writer to run index action for instant: %s", this.instantTime)));){
                        String indexUptoInstant = indexPartitionInfos.get(0).getIndexUptoInstant();
                        LOG.info("Starting Index Building with base instant: " + indexUptoInstant);
                        HoodieTimer timer = HoodieTimer.start();
                        metadataWriter.buildMetadataPartitions(this.context, indexPartitionInfos);
                        this.metrics.ifPresent(m -> m.updateMetrics("initialize", timer.endTimer()));
                        List<HoodieInstant> instantsToCatchup = this.getInstantsToCatchup(indexUptoInstant);
                        LOG.info("Total remaining instants to index: " + instantsToCatchup.size());
                        String metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(this.table.getMetaClient().getBasePathV2().toString());
                        HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(HadoopFSUtils.getStorageConfWithCopy(this.hadoopConf)).setBasePath(metadataBasePath).build();
                        Set<String> metadataCompletedTimestamps = RunIndexActionExecutor.getCompletedArchivedAndActiveInstantsAfter(indexUptoInstant, metadataMetaClient).stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
                        this.currentCaughtupInstant = indexUptoInstant;
                        this.catchupWithInflightWriters(metadataWriter, instantsToCatchup, metadataMetaClient, metadataCompletedTimestamps, indexPartitionInfos);
                        finalIndexPartitionInfos = indexPartitionInfos.stream().map(info -> new HoodieIndexPartitionInfo(info.getVersion(), info.getMetadataPartitionPath(), this.currentCaughtupInstant)).collect(Collectors.toList());
                        break block22;
                    }
                    catch (Exception e) {
                        throw new HoodieMetadataException("Failed to index partition " + Arrays.toString(indexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toList()).toArray()), e);
                    }
                }
                String indexUptoInstant = fileIndexPartitionInfo.getIndexUptoInstant();
                this.table.getIndexingMetadataWriter(this.instantTime).orElseThrow(() -> new HoodieIndexException(String.format("Could not get metadata writer to run index action for instant: %s", this.instantTime)));
                finalIndexPartitionInfos = Stream.of(fileIndexPartitionInfo).map(info -> new HoodieIndexPartitionInfo(info.getVersion(), info.getMetadataPartitionPath(), indexUptoInstant)).collect(Collectors.toList());
            }
            HoodieIndexCommitMetadata indexCommitMetadata = HoodieIndexCommitMetadata.newBuilder().setVersion(LATEST_INDEX_COMMIT_METADATA_VERSION).setIndexPartitionInfos(finalIndexPartitionInfos).build();
            this.updateTableConfigAndTimeline(indexInstant, finalIndexPartitionInfos, indexCommitMetadata);
            return Option.of(indexCommitMetadata);
        }
        catch (IOException e) {
            this.abort(indexInstant, indexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet()));
            throw new HoodieIndexException(String.format("Unable to index instant: %s", indexInstant));
        }
    }

    private void abort(HoodieInstant indexInstant, Set<String> requestedPartitions) {
        Set<String> inflightPartitions = HoodieTableMetadataUtil.getInflightMetadataPartitions(this.table.getMetaClient().getTableConfig());
        Set<String> completedPartitions = this.table.getMetaClient().getTableConfig().getMetadataPartitions();
        requestedPartitions.forEach(partition -> {
            inflightPartitions.remove(partition);
            completedPartitions.remove(partition);
        });
        this.table.getMetaClient().getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT.key(), String.join((CharSequence)",", inflightPartitions));
        this.table.getMetaClient().getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join((CharSequence)",", completedPartitions));
        HoodieTableConfig.update(this.table.getStorage(), this.table.getMetaClient().getMetaPath(), this.table.getMetaClient().getTableConfig().getProps());
        requestedPartitions.forEach(partition -> {
            MetadataPartitionType partitionType = MetadataPartitionType.valueOf(partition.toUpperCase(Locale.ROOT));
            if (HoodieTableMetadataUtil.metadataPartitionExists(this.table.getMetaClient().getBasePathV2().toString(), this.context, partitionType)) {
                HoodieTableMetadataUtil.deleteMetadataPartition(this.table.getMetaClient().getBasePathV2().toString(), this.context, partitionType);
            }
        });
        this.table.getMetaClient().reloadActiveTimeline().deleteInstantFileIfExists(HoodieTimeline.getIndexInflightInstant(indexInstant.getTimestamp()));
    }

    private List<HoodieInstant> getInstantsToCatchup(String indexUptoInstant) {
        Set<String> validActions = CollectionUtils.createSet("clean", "restore", "rollback");
        Option<HoodieInstant> catchupStartInstant = this.table.getMetaClient().reloadActiveTimeline().getTimelineOfActions(validActions).filterInflightsAndRequested().findInstantsBefore(indexUptoInstant).firstInstant();
        List<HoodieInstant> instantsToIndex = catchupStartInstant.isPresent() ? RunIndexActionExecutor.getRemainingArchivedAndActiveInstantsSince(catchupStartInstant.get().getTimestamp(), this.table.getMetaClient()) : RunIndexActionExecutor.getRemainingArchivedAndActiveInstantsSince(indexUptoInstant, this.table.getMetaClient());
        return instantsToIndex;
    }

    private HoodieInstant validateAndGetIndexInstant() {
        if (!this.config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl() || StringUtils.isNullOrEmpty(this.config.getLockProviderClass())) {
            throw new HoodieIndexException(String.format("Need to set %s as %s and configure lock provider class", HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()));
        }
        return this.table.getActiveTimeline().filterPendingIndexTimeline().filter(instant -> instant.getTimestamp().equals(this.instantTime) && HoodieInstant.State.REQUESTED.equals((Object)instant.getState())).lastInstant().orElseThrow(() -> new HoodieIndexException(String.format("No requested index instant found: %s", this.instantTime)));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateTableConfigAndTimeline(HoodieInstant indexInstant, List<HoodieIndexPartitionInfo> finalIndexPartitionInfos, HoodieIndexCommitMetadata indexCommitMetadata) throws IOException {
        try {
            this.txnManager.beginTransaction(Option.of(indexInstant), Option.empty());
            this.updateMetadataPartitionsTableConfig(this.table.getMetaClient(), finalIndexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet()));
            this.table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, "indexing", indexInstant.getTimestamp()), TimelineMetadataUtils.serializeIndexCommitMetadata(indexCommitMetadata));
        }
        finally {
            this.txnManager.endTransaction(Option.of(indexInstant));
        }
    }

    private void catchupWithInflightWriters(HoodieTableMetadataWriter metadataWriter, List<HoodieInstant> instantsToIndex, HoodieTableMetaClient metadataMetaClient, Set<String> metadataCompletedTimestamps, List<HoodieIndexPartitionInfo> indexPartitionInfos) {
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        Future<?> indexingCatchupTaskFuture = executorService.submit(IndexingCatchupTaskFactory.createCatchupTask(indexPartitionInfos, metadataWriter, instantsToIndex, metadataCompletedTimestamps, this.table.getMetaClient(), metadataMetaClient, this.currentCaughtupInstant, this.txnManager, this.context));
        try {
            LOG.info("Starting index catchup task");
            HoodieTimer timer = HoodieTimer.start();
            indexingCatchupTaskFuture.get(this.config.getIndexingCheckTimeoutSeconds(), TimeUnit.SECONDS);
            this.metrics.ifPresent(m -> m.updateMetrics("async_indexer_catchup_time", timer.endTimer()));
        }
        catch (Exception e) {
            indexingCatchupTaskFuture.cancel(true);
            throw new HoodieIndexException(String.format("Index catchup failed. Current indexed instant = %s. Aborting!", this.currentCaughtupInstant), e);
        }
        finally {
            executorService.shutdownNow();
        }
    }

    private static List<HoodieInstant> getRemainingArchivedAndActiveInstantsSince(String instant, HoodieTableMetaClient metaClient) {
        List<HoodieInstant> remainingInstantsToIndex = metaClient.getArchivedTimeline().getInstantsAsStream().filter(i -> HoodieTimeline.compareTimestamps(i.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, instant)).filter(i -> !"indexing".equals(i.getAction())).collect(Collectors.toList());
        remainingInstantsToIndex.addAll(metaClient.getActiveTimeline().findInstantsAfter(instant).getInstantsAsStream().filter(i -> HoodieTimeline.compareTimestamps(i.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, instant)).filter(i -> !"indexing".equals(i.getAction())).collect(Collectors.toList()));
        return remainingInstantsToIndex;
    }

    private static List<HoodieInstant> getCompletedArchivedAndActiveInstantsAfter(String instant, HoodieTableMetaClient metaClient) {
        List<HoodieInstant> completedInstants = metaClient.getArchivedTimeline().filterCompletedInstants().findInstantsAfter(instant).getInstantsAsStream().filter(i -> !"indexing".equals(i.getAction())).collect(Collectors.toList());
        completedInstants.addAll(metaClient.reloadActiveTimeline().filterCompletedInstants().findInstantsAfter(instant).getInstantsAsStream().filter(i -> !"indexing".equals(i.getAction())).collect(Collectors.toList()));
        return completedInstants;
    }

    private void updateMetadataPartitionsTableConfig(HoodieTableMetaClient metaClient, Set<String> metadataPartitions) {
        metadataPartitions.forEach(metadataPartition -> metaClient.getTableConfig().setMetadataPartitionState(metaClient, MetadataPartitionType.valueOf(metadataPartition.toUpperCase(Locale.ROOT)), true));
    }

    static {
        LATEST_INDEX_COMMIT_METADATA_VERSION = INDEX_COMMIT_METADATA_VERSION_1 = Integer.valueOf(1);
    }
}

