/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.timeline.versioning.v1.InstantComparatorV1;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.hudi.table.action.IncrementalPartitionAwareStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseTableServicePlanActionExecutor<T, I, K, O, R>
extends BaseActionExecutor<T, I, K, O, R> {
    private static final Logger LOG = LoggerFactory.getLogger(BaseTableServicePlanActionExecutor.class);
    private static final Set<String> MOR_COMMITS = CollectionUtils.createSet((Object[])new String[]{"deltacommit", "replacecommit"});
    private static final Set<String> COW_COMMITS = CollectionUtils.createSet((Object[])new String[]{"commit", "replacecommit"});

    public BaseTableServicePlanActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime) {
        super(context, config, table, instantTime);
    }

    public List<String> getPartitions(Object strategy, TableServiceType type) {
        if (this.config.isIncrementalTableServiceEnabled() && strategy instanceof IncrementalPartitionAwareStrategy) {
            try {
                LOG.info("Start to fetch incremental partitions for " + type);
                Pair<Option<HoodieInstant>, Set<String>> lastInstantAndIncrPartitions = this.getIncrementalPartitions(type);
                Option lastCompleteTableServiceInstant = (Option)lastInstantAndIncrPartitions.getLeft();
                Set incrementalPartitions = (Set)lastInstantAndIncrPartitions.getRight();
                if (lastCompleteTableServiceInstant.isPresent()) {
                    if (!incrementalPartitions.isEmpty()) {
                        LOG.info("Fetched incremental partitions for " + type + ". " + incrementalPartitions + ". Instant " + this.instantTime);
                        return new ArrayList<String>(incrementalPartitions);
                    }
                    LOG.info("Incremental partitions are empty. Skip current schedule " + this.instantTime);
                    return Collections.emptyList();
                }
                LOG.info("No previous completed table service instant, fall back to get all partitions");
            }
            catch (Exception ex) {
                LOG.warn("Failed to get incremental partitions", (Throwable)ex);
            }
        }
        LOG.info("Start to fetch all partitions for " + type + ". Instant " + this.instantTime);
        return FSUtils.getAllPartitionPaths((HoodieEngineContext)this.context, (HoodieStorage)this.table.getMetaClient().getStorage(), (HoodieMetadataConfig)this.config.getMetadataConfig(), (StoragePath)this.table.getMetaClient().getBasePath());
    }

    public Pair<Option<HoodieInstant>, Set<String>> getIncrementalPartitions(TableServiceType type) {
        Pair<Option<HoodieInstant>, List<String>> missingPair = this.fetchMissingPartitions(type);
        Option lastCompleteTableServiceInstant = (Option)missingPair.getLeft();
        List missingPartitions = (List)missingPair.getRight();
        String leftBoundary = lastCompleteTableServiceInstant.isPresent() ? ((HoodieInstant)((Option)missingPair.getLeft()).get()).requestedTime() : "00000000000000";
        String rightBoundary = this.instantTime;
        HoodieActiveTimeline activeTimeline = this.table.getActiveTimeline();
        Set partitionsInCommitMeta = this.table.getActiveTimeline().filterCompletedInstants().getCommitsTimeline().getInstantsAsStream().filter(instant -> !lastCompleteTableServiceInstant.isPresent() || !instant.equals(lastCompleteTableServiceInstant.get())).filter(this::filterCommitByTableType).flatMap(instant -> {
            try {
                String completionTime = instant.getCompletionTime();
                if (completionTime.compareTo(leftBoundary) >= 0 && completionTime.compareTo(rightBoundary) < 0) {
                    HoodieCommitMetadata metadata = TimelineUtils.getCommitMetadata((HoodieInstant)instant, (HoodieTimeline)activeTimeline);
                    return metadata.getWriteStats().stream().map(HoodieWriteStat::getPartitionPath);
                }
                return Stream.empty();
            }
            catch (IOException e) {
                throw new HoodieIOException("Failed to get commit meta " + instant, e);
            }
        }).collect(Collectors.toSet());
        partitionsInCommitMeta.addAll(missingPartitions);
        return Pair.of((Object)lastCompleteTableServiceInstant, partitionsInCommitMeta);
    }

    private boolean filterCommitByTableType(HoodieInstant instant) {
        switch (this.table.getMetaClient().getTableType()) {
            case MERGE_ON_READ: {
                return MOR_COMMITS.contains(instant.getAction());
            }
            case COPY_ON_WRITE: {
                return COW_COMMITS.contains(instant.getAction());
            }
        }
        throw new HoodieException("Un-supported table type " + this.table.getMetaClient().getTableType());
    }

    public Pair<Option<HoodieInstant>, List<String>> fetchMissingPartitions(TableServiceType tableServiceType) {
        if (!this.config.isIncrementalTableServiceEnabled()) {
            return Pair.of((Object)Option.empty(), Collections.emptyList());
        }
        Option instant = Option.empty();
        List missingPartitions = new ArrayList();
        switch (tableServiceType) {
            case COMPACT: 
            case LOG_COMPACT: {
                Option lastCompactionCommitInstant = this.table.getActiveTimeline().filterCompletedInstants().getTimelineOfActions(CollectionUtils.createSet((Object[])new String[]{"commit"})).lastInstant();
                if (!lastCompactionCommitInstant.isPresent()) break;
                instant = lastCompactionCommitInstant;
                String action = tableServiceType.equals((Object)TableServiceType.COMPACT) ? "compaction" : "logcompaction";
                HoodieInstant compactionPlanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, action, ((HoodieInstant)instant.get()).requestedTime(), InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR);
                Option details = this.table.getMetaClient().getActiveTimeline().readCompactionPlanAsBytes(compactionPlanInstant);
                HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan((HoodieTableMetaClient)this.table.getMetaClient(), (Option)details);
                if (compactionPlan.getMissingSchedulePartitions() == null) break;
                missingPartitions = compactionPlan.getMissingSchedulePartitions();
                break;
            }
            case CLUSTER: {
                Option lastClusteringInstant = this.table.getActiveTimeline().filterCompletedInstants().getLastClusteringInstant();
                if (!lastClusteringInstant.isPresent()) break;
                instant = lastClusteringInstant;
                Option clusteringPlan = ClusteringUtils.getClusteringPlan((HoodieTableMetaClient)this.table.getMetaClient(), (HoodieInstant)((HoodieInstant)lastClusteringInstant.get()));
                if (!clusteringPlan.isPresent() || ((HoodieClusteringPlan)((Pair)clusteringPlan.get()).getRight()).getMissingSchedulePartitions() == null) break;
                missingPartitions = ((HoodieClusteringPlan)((Pair)clusteringPlan.get()).getRight()).getMissingSchedulePartitions();
                break;
            }
            default: {
                throw new HoodieException("Un-supported incremental table service " + tableServiceType);
            }
        }
        return Pair.of((Object)instant, missingPartitions);
    }
}

