/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.compact;

import java.io.IOException;
import java.text.ParseException;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.action.compact.HoodieSparkMergeOnReadTableCompactor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;

/**
 * Spark implementation of the schedule-compaction action for a merge-on-read table.
 *
 * <p>{@link #scheduleCompaction()} first checks the configured inline compaction trigger strategy
 * (see {@link #needCompact(CompactionTriggerStrategy)}). When the trigger fires it asks
 * {@link HoodieSparkMergeOnReadTableCompactor} for a compaction plan, excluding file groups that are
 * already part of a pending compaction or a pending clustering; otherwise it returns an empty
 * {@link HoodieCompactionPlan}.
 *
 * @param <T> payload type of the records stored in the table
 */
public class SparkScheduleCompactionActionExecutor<T extends HoodieRecordPayload>
extends BaseScheduleCompactionActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
    private static final Logger LOG = LogManager.getLogger(SparkScheduleCompactionActionExecutor.class);

    /**
     * Creates the executor; all arguments are forwarded unchanged to the base class.
     *
     * @param context       engine context handed to the compactor when a plan is generated
     * @param config        write config supplying the base path and the inline compaction thresholds
     * @param table         table whose timeline and file-system view are inspected
     * @param instantTime   instant time of the compaction being scheduled
     * @param extraMetadata optional extra metadata, passed through to the base class
     */
    public SparkScheduleCompactionActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, String instantTime, Option<Map<String, String>> extraMetadata) {
        super(context, config, table, instantTime, extraMetadata);
    }

    /**
     * Builds a compaction plan if the configured inline compaction trigger strategy is satisfied.
     *
     * @return the plan produced by the compactor, or an empty {@link HoodieCompactionPlan} when the
     *         trigger condition is not met
     * @throws HoodieCompactionException if generating the plan fails with an {@link IOException}
     */
    @Override
    protected HoodieCompactionPlan scheduleCompaction() {
        LOG.info("Checking if compaction needs to be run on " + this.config.getBasePath());
        if (!this.needCompact(this.config.getInlineCompactTriggerStrategy())) {
            return new HoodieCompactionPlan();
        }

        LOG.info("Generating compaction plan for merge on read table " + this.config.getBasePath());
        HoodieSparkMergeOnReadTableCompactor compactor = new HoodieSparkMergeOnReadTableCompactor();
        try {
            SyncableFileSystemView fileSystemView = (SyncableFileSystemView)this.table.getSliceView();
            // File groups already covered by a pending compaction or a pending clustering are
            // collected here and handed to the compactor alongside the table and config.
            Set<HoodieFileGroupId> fgInPendingCompactionAndClustering = fileSystemView.getPendingCompactionOperations().map(instantTimeOpPair -> ((CompactionOperation)instantTimeOpPair.getValue()).getFileGroupId()).collect(Collectors.toSet());
            fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
            return compactor.generateCompactionPlan(this.context, this.table, this.config, this.instantTime, fgInPendingCompactionAndClustering);
        }
        catch (IOException e) {
            throw new HoodieCompactionException("Could not schedule compaction " + this.config.getBasePath(), e);
        }
    }

    /**
     * Computes how many delta commits are eligible for the next compaction and the reference
     * timestamp from which elapsed time is measured.
     *
     * <ul>
     *   <li>If the commit timeline has a completed instant, the reference is the last such instant
     *       and the count is the number of delta commits strictly after it.</li>
     *   <li>Otherwise the reference is the first delta commit and the count includes it.</li>
     *   <li>If there is no delta commit either, the result is {@code (0, instantTime)}: nothing to
     *       compact and zero elapsed time. Previously this case failed on an empty {@code Option}.</li>
     * </ul>
     *
     * @param compactionTriggerStrategy not used by this implementation; kept for signature
     *                                  compatibility
     * @return pair of (delta commit count, reference instant timestamp)
     */
    public Pair<Integer, String> getLatestDeltaCommitInfo(CompactionTriggerStrategy compactionTriggerStrategy) {
        Option<HoodieInstant> lastCompaction = this.table.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
        HoodieTimeline deltaCommits = this.table.getActiveTimeline().getDeltaCommitTimeline();

        if (lastCompaction.isPresent()) {
            String lastCompactionTs = lastCompaction.get().getTimestamp();
            int deltaCommitsAfter = deltaCommits.findInstantsAfter(lastCompactionTs, Integer.MAX_VALUE).countInstants();
            return Pair.of(deltaCommitsAfter, lastCompactionTs);
        }

        Option<HoodieInstant> firstDeltaCommit = deltaCommits.firstInstant();
        if (!firstDeltaCommit.isPresent()) {
            // Empty delta-commit timeline: report zero commits and use the instant being scheduled
            // as the reference so that the elapsed time evaluates to zero.
            return Pair.of(0, this.instantTime);
        }

        String firstDeltaCommitTs = firstDeltaCommit.get().getTimestamp();
        int deltaCommitsFromFirst = deltaCommits.findInstantsAfterOrEquals(firstDeltaCommitTs, Integer.MAX_VALUE).countInstants();
        return Pair.of(deltaCommitsFromFirst, firstDeltaCommitTs);
    }

    /**
     * Decides whether compaction should be scheduled under the given trigger strategy.
     *
     * <p>The delta commit count and the elapsed seconds (between the reference instant returned by
     * {@link #getLatestDeltaCommitInfo(CompactionTriggerStrategy)} and this executor's instant time)
     * are compared with the configured maxima. Timestamps are only parsed when the strategy needs
     * the elapsed time and boolean short-circuiting has not already decided the result.
     *
     * @param compactionTriggerStrategy strategy to evaluate
     * @return {@code true} if the strategy's condition is met
     * @throws HoodieCompactionException if the strategy is not one of the handled values, or if an
     *                                   instant timestamp cannot be parsed
     */
    public boolean needCompact(CompactionTriggerStrategy compactionTriggerStrategy) {
        Pair<Integer, String> latestDeltaCommitInfo = this.getLatestDeltaCommitInfo(compactionTriggerStrategy);
        int deltaCommitCount = latestDeltaCommitInfo.getLeft();
        String referenceInstantTs = latestDeltaCommitInfo.getRight();
        int inlineCompactDeltaCommitMax = this.config.getInlineCompactDeltaCommitMax();
        int inlineCompactDeltaSecondsMax = this.config.getInlineCompactDeltaSecondsMax();

        boolean compactable;
        switch (compactionTriggerStrategy) {
            case NUM_COMMITS: {
                compactable = inlineCompactDeltaCommitMax <= deltaCommitCount;
                if (compactable) {
                    LOG.info(String.format("The delta commits >= %s, trigger compaction scheduler.", inlineCompactDeltaCommitMax));
                }
                break;
            }
            case TIME_ELAPSED: {
                compactable = (long)inlineCompactDeltaSecondsMax <= this.elapsedSecondsSince(referenceInstantTs);
                if (compactable) {
                    LOG.info(String.format("The elapsed time >=%ss, trigger compaction scheduler.", inlineCompactDeltaSecondsMax));
                }
                break;
            }
            case NUM_OR_TIME: {
                compactable = inlineCompactDeltaCommitMax <= deltaCommitCount
                    || (long)inlineCompactDeltaSecondsMax <= this.elapsedSecondsSince(referenceInstantTs);
                if (compactable) {
                    LOG.info(String.format("The delta commits >= %s or elapsed_time >=%ss, trigger compaction scheduler.", inlineCompactDeltaCommitMax, inlineCompactDeltaSecondsMax));
                }
                break;
            }
            case NUM_AND_TIME: {
                compactable = inlineCompactDeltaCommitMax <= deltaCommitCount
                    && (long)inlineCompactDeltaSecondsMax <= this.elapsedSecondsSince(referenceInstantTs);
                if (compactable) {
                    LOG.info(String.format("The delta commits >= %s and elapsed_time >=%ss, trigger compaction scheduler.", inlineCompactDeltaCommitMax, inlineCompactDeltaSecondsMax));
                }
                break;
            }
            default: {
                // Report the strategy that was actually evaluated, not the one stored in the config.
                throw new HoodieCompactionException("Unsupported compaction trigger strategy: " + compactionTriggerStrategy);
            }
        }
        return compactable;
    }

    /**
     * Converts an instant timestamp string into seconds using
     * {@code HoodieActiveTimeline.COMMIT_FORMATTER}.
     *
     * <p>NOTE(review): the formatter is a shared static; if it is a
     * {@code java.text.SimpleDateFormat} it is not safe for concurrent use — confirm callers.
     *
     * @param time instant timestamp in the format expected by the commit formatter
     * @return the parsed time in whole seconds (parsed milliseconds divided by 1000)
     * @throws HoodieCompactionException wrapping the {@link ParseException} if parsing fails
     */
    public Long parsedToSeconds(String time) {
        try {
            return HoodieActiveTimeline.COMMIT_FORMATTER.parse(time).getTime() / 1000L;
        }
        catch (ParseException e) {
            throw new HoodieCompactionException(e.getMessage(), e);
        }
    }

    /** Returns the seconds between {@code referenceInstantTs} and this executor's instant time. */
    private long elapsedSecondsSince(String referenceInstantTs) {
        return this.parsedToSeconds(this.instantTime) - this.parsedToSeconds(referenceInstantTs);
    }
}

