/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.timeline.versioning.v2;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hudi.client.timeline.HoodieTimelineArchiver;
import org.apache.hudi.client.timeline.versioning.v2.LSMTimelineWriter;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.client.utils.ArchivalUtils;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.ActiveAction;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.timeline.versioning.v2.ActiveTimelineV2;
import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieLockException;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Archives instants of the active timeline through an {@link LSMTimelineWriter} and then deletes
 * the archived instant files from the active timeline.
 *
 * <p>Archival only happens once the number of completed commits exceeds {@code maxInstantsToKeep};
 * it then trims the completed commits down towards {@code minInstantsToKeep}, subject to several
 * "earliest instant to retain" constraints (pending writes, compaction, clustering, metadata table
 * compaction and savepoints).
 *
 * @param <T> payload type of the table
 * @param <I> input type of the table
 * @param <K> key type of the table
 * @param <O> output type of the table
 */
public class TimelineArchiverV2<T extends HoodieAvroPayload, I, K, O>
implements HoodieTimelineArchiver<T, I, K, O> {
    private static final Logger LOG = LoggerFactory.getLogger(TimelineArchiverV2.class);
    private final HoodieWriteConfig config;
    private final int maxInstantsToKeep;
    private final int minInstantsToKeep;
    private final HoodieTable<T, I, K, O> table;
    private final HoodieTableMetaClient metaClient;
    private final TransactionManager txnManager;
    private final LSMTimelineWriter timelineWriter;

    /**
     * Creates an archiver for the given table.
     *
     * @param config write config supplying the archival settings
     * @param table  table whose active timeline is archived
     */
    public TimelineArchiverV2(HoodieWriteConfig config, HoodieTable<T, I, K, O> table) {
        this.config = config;
        this.table = table;
        this.metaClient = table.getMetaClient();
        this.txnManager = new TransactionManager(config, table.getStorage());
        this.timelineWriter = LSMTimelineWriter.getInstance(config, table);
        Pair<Integer, Integer> minAndMaxInstants = ArchivalUtils.getMinAndMaxInstantsToKeep(table, this.metaClient);
        this.minInstantsToKeep = minAndMaxInstants.getLeft();
        this.maxInstantsToKeep = minAndMaxInstants.getRight();
    }

    /**
     * Archives eligible instants, deletes their files from the active timeline and finally asks
     * the timeline writer to compact and clean.
     *
     * @param context     engine context used for marker cleanup and instant file deletion
     * @param acquireLock whether to wrap the archival in a transaction of the transaction manager
     * @return the number of archived actions; {@code 0} when nothing qualified or when the
     *         transaction could not be started
     * @throws IOException if collecting the instants to archive fails
     */
    @Override
    public int archiveIfRequired(HoodieEngineContext context, boolean acquireLock) throws IOException {
        try {
            if (acquireLock) {
                this.txnManager.beginTransaction(Option.empty(), Option.empty());
            }
        }
        catch (HoodieLockException e) {
            // Failing to get the lock is not fatal: skip this archival round.
            LOG.error("Fail to begin transaction", e);
            return 0;
        }
        try {
            // Sort the actions again: clean and rollback instants are merged in after the commits.
            List<ActiveAction> instantsToArchive = this.getInstantsToArchive().sorted().collect(Collectors.toList());
            if (instantsToArchive.isEmpty()) {
                LOG.info("No Instants to archive");
            } else {
                LOG.info("Archiving and deleting instants {}", instantsToArchive);
                Consumer<ActiveAction> markerCleanup = action -> this.deleteAnyLeftOverMarkers(context, action);
                // Write failures are only propagated when fail-on-archiving is enabled.
                Consumer<Exception> exceptionHandler = e -> {
                    if (this.config.isFailOnTimelineArchivingEnabled()) {
                        throw new HoodieException(e);
                    }
                };
                this.timelineWriter.write(instantsToArchive, Option.of(markerCleanup), Option.of(exceptionHandler));
                LOG.debug("Deleting archived instants");
                this.deleteArchivedActions(instantsToArchive, context);
                this.timelineWriter.compactAndClean(context);
            }
            return instantsToArchive.size();
        }
        finally {
            if (acquireLock) {
                this.txnManager.endTransaction(Option.empty());
            }
        }
    }

    /**
     * Returns the completed clean and rollback instants whose requested time is strictly before
     * the requested time of the given commit instant.
     *
     * @param latestCommitInstantToArchive the latest commit instant selected for archival
     * @return completed clean/rollback instants older than that commit
     */
    private List<HoodieInstant> getCleanAndRollbackInstantsToArchive(HoodieInstant latestCommitInstantToArchive) {
        HoodieTimeline cleanAndRollbackTimeline = this.table.getActiveTimeline()
            .getTimelineOfActions(CollectionUtils.createSet("clean", "rollback"))
            .filterCompletedInstants();
        return cleanAndRollbackTimeline.getInstantsAsStream()
            .filter(s -> InstantComparison.compareTimestamps(s.requestedTime(), InstantComparison.LESSER_THAN, latestCommitInstantToArchive.requestedTime()))
            .collect(Collectors.toList());
    }

    /**
     * Selects the completed commit instants that may be archived.
     *
     * <p>Nothing is selected unless the completed commits exceed {@code maxInstantsToKeep}.
     * Otherwise the earliest of a set of "must retain" candidates is computed and only instants
     * strictly before it (and allowed by the savepoint rules) are returned, limited to
     * {@code countInstants - minInstantsToKeep} entries.
     *
     * @return the commit instants to archive, possibly empty
     * @throws IOException declared for callers; failures while consulting the metadata table are
     *                     rethrown as {@link HoodieException}
     */
    private List<HoodieInstant> getCommitInstantsToArchive() throws IOException {
        HoodieTimeline completedCommitsTimeline = this.table.getCompletedCommitsTimeline();
        if (completedCommitsTimeline.countInstants() <= this.maxInstantsToKeep) {
            return Collections.emptyList();
        }
        List<Option<HoodieInstant>> earliestInstantToRetainCandidates = new ArrayList<>();

        // Candidate 1: the latest completed commit before the earliest pending write instant,
        // or the pending instant itself when no completed commit precedes it.
        Option<HoodieInstant> earliestPendingInstant = this.table.getActiveTimeline()
            .getWriteTimeline()
            .filter(instant -> !instant.isCompleted())
            .firstInstant();
        Option<HoodieInstant> earliestCommitToRetain;
        if (earliestPendingInstant.isPresent()) {
            Option<HoodieInstant> completedCommitBeforeEarliestPendingInstant = Option.fromJavaOptional(completedCommitsTimeline
                .filter(instant -> InstantComparison.compareTimestamps(instant.requestedTime(), InstantComparison.LESSER_THAN, earliestPendingInstant.get().requestedTime()))
                .getReverseOrderedInstants()
                .findFirst());
            earliestCommitToRetain = completedCommitBeforeEarliestPendingInstant.isPresent()
                ? completedCommitBeforeEarliestPendingInstant
                : earliestPendingInstant;
        } else {
            earliestCommitToRetain = Option.empty();
        }
        earliestInstantToRetainCandidates.add(earliestCommitToRetain);

        // Candidate 2: only for MERGE_ON_READ tables whose inline compaction trigger is
        // NUM_COMMITS or NUM_AND_TIME. The strategy is read only for MERGE_ON_READ tables.
        Option<HoodieInstant> earliestInstantToRetainForCompaction = Option.empty();
        if (this.metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
            CompactionTriggerStrategy triggerStrategy = this.config.getInlineCompactTriggerStrategy();
            if (triggerStrategy == CompactionTriggerStrategy.NUM_COMMITS || triggerStrategy == CompactionTriggerStrategy.NUM_AND_TIME) {
                earliestInstantToRetainForCompaction = CompactionUtils.getEarliestInstantToRetainForCompaction(this.table.getActiveTimeline(), this.config.getInlineCompactDeltaCommitMax());
            }
        }
        earliestInstantToRetainCandidates.add(earliestInstantToRetainForCompaction);

        // Candidate 3: the instant clustering still needs.
        Option<HoodieInstant> earliestInstantToRetainForClustering = ClusteringUtils.getEarliestInstantToRetainForClustering(this.table.getActiveTimeline(), this.table.getMetaClient(), this.config.getCleanerPolicy());
        earliestInstantToRetainCandidates.add(earliestInstantToRetainForClustering);

        // Candidate 4: when a metadata table is available, do not archive past its latest
        // compaction; with no compaction yet, archive nothing at all.
        if (this.table.getMetaClient().getTableConfig().isMetadataTableAvailable()) {
            try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(this.table.getContext(), this.table.getStorage(), this.config.getMetadataConfig(), this.config.getBasePath())) {
                Option<String> latestCompactionTime = tableMetadata.getLatestCompactionTime();
                if (!latestCompactionTime.isPresent()) {
                    LOG.info("Not archiving as there is no compaction yet on the metadata table");
                    return Collections.emptyList();
                }
                LOG.info("Limiting archiving of instants to latest compaction on metadata table at {}", latestCompactionTime.get());
                earliestInstantToRetainCandidates.add(completedCommitsTimeline.findInstantsModifiedAfterByCompletionTime(latestCompactionTime.get()).firstInstant());
            }
            catch (Exception e) {
                throw new HoodieException("Error limiting instant archival based on metadata table", e);
            }
        }

        // Candidate 5: when this table is itself a metadata table, consult the timeline of the
        // data table it belongs to.
        if (this.table.isMetadataTable()) {
            HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
                .setBasePath(HoodieTableMetadata.getDatasetBasePath(this.config.getBasePath()))
                .setConf(this.metaClient.getStorageConf().newInstance())
                .build();
            Option<HoodieInstant> qualifiedEarliestInstant = TimelineUtils.getEarliestInstantForMetadataArchival(dataMetaClient.getActiveTimeline(), this.config.shouldArchiveBeyondSavepoint());
            earliestInstantToRetainCandidates.add(qualifiedEarliestInstant);
        }

        // The earliest present candidate (by requested time) bounds what may be archived.
        Optional<HoodieInstant> earliestInstantToRetain = earliestInstantToRetainCandidates.stream()
            .filter(Option::isPresent)
            .map(Option::get)
            .min(InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR);

        Option<HoodieInstant> firstSavepoint = this.table.getCompletedSavepointTimeline().firstInstant();
        Set<String> savepointTimestamps = this.table.getSavepointTimestamps();
        Stream<HoodieInstant> instantToArchiveStream = completedCommitsTimeline.getInstantsAsStream()
            .filter(s -> {
                if (this.config.shouldArchiveBeyondSavepoint()) {
                    // Archive around savepoints: skip only the savepointed instants themselves.
                    return !savepointTimestamps.contains(s.requestedTime());
                }
                // Otherwise stop at the first savepoint: only strictly older instants qualify.
                return !firstSavepoint.isPresent() || InstantComparison.compareTimestamps(s.requestedTime(), InstantComparison.LESSER_THAN, firstSavepoint.get().requestedTime());
            })
            .filter(s -> earliestInstantToRetain
                .map(instant -> InstantComparison.compareTimestamps(s.requestedTime(), InstantComparison.LESSER_THAN, instant.requestedTime()))
                .orElse(true));
        return instantToArchiveStream
            .limit(completedCommitsTimeline.countInstants() - this.minInstantsToKeep)
            .collect(Collectors.toList());
    }

    /**
     * Builds the stream of actions to archive: the selected commit instants plus older completed
     * clean/rollback instants, each expanded to all instant files found for the same
     * (requested time, comparable action) pair in a freshly loaded active timeline.
     *
     * @return the actions to archive; empty when the metaserver is enabled or nothing qualifies
     * @throws IOException if selecting the commit instants fails
     */
    private Stream<ActiveAction> getInstantsToArchive() throws IOException {
        if (this.config.isMetaserverEnabled()) {
            return Stream.empty();
        }
        List<HoodieInstant> commitInstantsToArchive = this.getCommitInstantsToArchive();
        if (commitInstantsToArchive.isEmpty()) {
            // Nothing to archive: no need to reload and group the active timeline.
            return Stream.empty();
        }
        // Copy before appending: the selected list is not guaranteed to be mutable.
        List<HoodieInstant> instantsToArchive = new ArrayList<>(commitInstantsToArchive);
        HoodieInstant latestCommitInstantToArchive = instantsToArchive.get(instantsToArchive.size() - 1);
        instantsToArchive.addAll(this.getCleanAndRollbackInstantsToArchive(latestCommitInstantToArchive));
        instantsToArchive.sort(InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR);

        // Group every instant file of the reloaded timeline by (requested time, comparable action).
        ActiveTimelineV2 rawActiveTimeline = new ActiveTimelineV2(this.metaClient, false);
        Map<Pair<String, String>, List<HoodieInstant>> groupByTsAction = rawActiveTimeline.getInstantsAsStream()
            .collect(Collectors.groupingBy(TimelineArchiverV2::timestampAndAction));

        // Instants missing from the reloaded timeline are skipped
        // (NOTE(review): presumably already archived by a concurrent writer - confirm).
        return instantsToArchive.stream()
            .map(hoodieInstant -> groupByTsAction.get(TimelineArchiverV2.timestampAndAction(hoodieInstant)))
            .filter(instantStates -> instantStates != null)
            .map(instantStates -> ActiveAction.fromInstants(instantStates));
    }

    /** Returns the (requested time, comparable action) grouping key of an instant. */
    private static Pair<String, String> timestampAndAction(HoodieInstant instant) {
        return Pair.of(instant.requestedTime(), InstantComparatorV2.getComparableAction(instant.getAction()));
    }

    /**
     * Deletes the instant files of the archived actions from the active timeline.
     *
     * <p>Pending instant files are deleted first through the engine context with bounded
     * parallelism; completed instant files are deleted afterwards, sequentially and in list
     * order. NOTE(review): the ordering looks deliberate (timeline readers should not see a
     * pending file without its completed counterpart) - confirm before changing it.
     *
     * @param activeActions the archived actions whose files are removed
     * @param context       engine context used for the parallel deletion
     * @return always {@code true}
     */
    private boolean deleteArchivedActions(List<ActiveAction> activeActions, HoodieEngineContext context) {
        List<HoodieInstant> pendingInstants = new ArrayList<>();
        List<HoodieInstant> completedInstants = new ArrayList<>();
        for (ActiveAction activeAction : activeActions) {
            completedInstants.addAll(activeAction.getCompletedInstants());
            pendingInstants.addAll(activeAction.getPendingInstants());
        }
        context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants: " + this.config.getTableName());
        HoodieActiveTimeline activeTimeline = this.metaClient.getActiveTimeline();
        if (!pendingInstants.isEmpty()) {
            context.foreach(pendingInstants, instant -> activeTimeline.deleteInstantFileIfExists(instant), Math.min(pendingInstants.size(), this.config.getArchiveDeleteParallelism()));
        }
        for (HoodieInstant completedInstant : completedInstants) {
            activeTimeline.deleteInstantFileIfExists(completedInstant);
        }
        return true;
    }

    /**
     * Deletes the marker directory of the given action's instant time, if one is left over.
     *
     * @param context      engine context used for the deletion
     * @param activeAction the action about to be archived
     */
    private void deleteAnyLeftOverMarkers(HoodieEngineContext context, ActiveAction activeAction) {
        WriteMarkers writeMarkers = WriteMarkersFactory.get(this.config.getMarkersType(), this.table, activeAction.getInstantTime());
        if (writeMarkers.deleteMarkerDir(context, this.config.getMarkersDeleteParallelism())) {
            LOG.info("Cleaned up left over marker directory for instant: {}", activeAction);
        }
    }
}

