/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.append.cluster;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.append.cluster.IncrementalClusterManager;
import org.apache.paimon.append.cluster.IncrementalClusterStrategy;
import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.mergetree.LevelSortedRun;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HistoryPartitionCluster {
    private static final Logger LOG = LoggerFactory.getLogger(HistoryPartitionCluster.class);
    private final FileStoreTable table;
    private final IncrementalClusterStrategy incrementalClusterStrategy;
    private final InternalRowPartitionComputer partitionComputer;
    private final PartitionPredicate specifiedPartitions;
    private final Duration historyPartitionIdleTime;
    private final int historyPartitionLimit;
    private final int maxLevel;

    public HistoryPartitionCluster(FileStoreTable table, IncrementalClusterStrategy incrementalClusterStrategy, InternalRowPartitionComputer partitionComputer, PartitionPredicate specifiedPartitions, Duration historyPartitionIdleTime, int historyPartitionLimit) {
        this.table = table;
        this.incrementalClusterStrategy = incrementalClusterStrategy;
        this.partitionComputer = partitionComputer;
        this.specifiedPartitions = specifiedPartitions;
        this.historyPartitionIdleTime = historyPartitionIdleTime;
        this.historyPartitionLimit = historyPartitionLimit;
        this.maxLevel = table.coreOptions().numLevels() - 1;
    }

    @Nullable
    public static HistoryPartitionCluster create(FileStoreTable table, IncrementalClusterStrategy incrementalClusterStrategy, InternalRowPartitionComputer partitionComputer, @Nullable PartitionPredicate specifiedPartitions) {
        if (table.schema().partitionKeys().isEmpty()) {
            return null;
        }
        if (specifiedPartitions == null) {
            return null;
        }
        Duration idleTime = table.coreOptions().clusteringHistoryPartitionIdleTime();
        if (idleTime == null) {
            return null;
        }
        int limit = table.coreOptions().clusteringHistoryPartitionLimit();
        return new HistoryPartitionCluster(table, incrementalClusterStrategy, partitionComputer, specifiedPartitions, idleTime, limit);
    }

    public Map<BinaryRow, CompactUnit> pickForHistoryPartitions() {
        Map<BinaryRow, List<LevelSortedRun>> partitionLevels = this.constructLevelsForHistoryPartitions();
        IncrementalClusterManager.logForPartitionLevel(partitionLevels, this.partitionComputer);
        HashMap<BinaryRow, CompactUnit> units = new HashMap<BinaryRow, CompactUnit>();
        partitionLevels.forEach((k, v) -> {
            Optional<CompactUnit> pick = this.incrementalClusterStrategy.pick(this.maxLevel + 1, (List<LevelSortedRun>)v, true);
            pick.ifPresent(compactUnit -> units.put((BinaryRow)k, (CompactUnit)compactUnit));
        });
        return units;
    }

    @VisibleForTesting
    public Map<BinaryRow, List<LevelSortedRun>> constructLevelsForHistoryPartitions() {
        long historyMilli = LocalDateTime.now().minus(this.historyPartitionIdleTime).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
        List<BinaryRow> historyPartitions = this.table.newSnapshotReader().withLevelMinMaxFilter((min, max) -> min < this.maxLevel).withLevelFilter(level -> level < this.maxLevel).partitionEntries().stream().filter(entry -> entry.lastFileCreationTime() < historyMilli).sorted(Comparator.comparingLong(PartitionEntry::lastFileCreationTime)).map(PartitionEntry::partition).collect(Collectors.toList());
        List<DataSplit> historyDataSplits = this.table.newSnapshotReader().withPartitionFilter(historyPartitions).read().dataSplits();
        HashMap<BinaryRow, List<DataFileMeta>> historyPartitionFiles = new HashMap<BinaryRow, List<DataFileMeta>>();
        for (DataSplit dataSplit : historyDataSplits) {
            historyPartitionFiles.computeIfAbsent(dataSplit.partition(), k -> new ArrayList()).addAll(dataSplit.dataFiles());
        }
        return this.filterPartitions(historyPartitionFiles).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> IncrementalClusterManager.constructPartitionLevels((List)entry.getValue())));
    }

    private Map<BinaryRow, List<DataFileMeta>> filterPartitions(Map<BinaryRow, List<DataFileMeta>> partitionFiles) {
        HashMap<BinaryRow, List<DataFileMeta>> result = new HashMap<BinaryRow, List<DataFileMeta>>();
        partitionFiles.forEach((part, files) -> {
            if (this.specifiedPartitions.test((BinaryRow)part)) {
                return;
            }
            if (result.size() < this.historyPartitionLimit) {
                result.put((BinaryRow)part, (List<DataFileMeta>)files);
            }
        });
        LOG.info("Find {} history partitions for full clustering, the history partitions are {}", (Object)result.size(), result.keySet().stream().map(this.partitionComputer::generatePartValues).collect(Collectors.toSet()));
        return result;
    }
}

