/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.append.cluster;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.append.cluster.HistoryPartitionCluster;
import org.apache.paimon.append.cluster.IncrementalClusterStrategy;
import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.mergetree.LevelSortedRun;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Plans incremental clustering for an append table in bucket-unaware mode.
 *
 * <p>The manager reads the data files of the table through a {@link SnapshotReader}, groups them
 * per partition into {@link LevelSortedRun}s and asks an {@link IncrementalClusterStrategy} which
 * files of each partition should be rewritten. When a {@link HistoryPartitionCluster} is
 * available, the units it picks are merged into the result as well.
 */
public class IncrementalClusterManager {
    private static final Logger LOG = LoggerFactory.getLogger(IncrementalClusterManager.class);

    private final InternalRowPartitionComputer partitionComputer;
    private final SnapshotReader snapshotReader;
    private final IncrementalClusterStrategy incrementalClusterStrategy;
    private final CoreOptions.OrderType clusterCurve;
    private final List<String> clusterKeys;
    private final int numLevels;
    @Nullable
    private final HistoryPartitionCluster historyPartitionCluster;

    /**
     * Creates a manager that considers every partition of the table.
     *
     * @param table the table to cluster
     * @throws IllegalArgumentException presumably, via {@link Preconditions#checkArgument}, when
     *     the table is not bucket-unaware or incremental clustering is not enabled
     */
    public IncrementalClusterManager(FileStoreTable table) {
        this(table, null);
    }

    /**
     * Creates a manager whose snapshot reads are filtered by the given partition predicate.
     *
     * @param table the table to cluster; its bucket mode must be {@link
     *     BucketMode#BUCKET_UNAWARE} and incremental clustering must be enabled in its options
     * @param specifiedPartitions partition filter handed to the snapshot reader and to {@link
     *     HistoryPartitionCluster#create}; may be {@code null}
     * @throws IllegalArgumentException presumably, via {@link Preconditions#checkArgument}, when
     *     one of the two preconditions above does not hold
     */
    public IncrementalClusterManager(
            FileStoreTable table, @Nullable PartitionPredicate specifiedPartitions) {
        Preconditions.checkArgument(
                table.bucketMode() == BucketMode.BUCKET_UNAWARE,
                "only append unaware-bucket table support incremental clustering.");
        CoreOptions options = table.coreOptions();
        Preconditions.checkArgument(
                options.clusteringIncrementalEnabled(),
                "Only support incremental clustering when '%s' is true.",
                CoreOptions.CLUSTERING_INCREMENTAL.key());

        this.numLevels = options.numLevels();
        this.partitionComputer =
                new InternalRowPartitionComputer(
                        options.partitionDefaultName(),
                        table.store().partitionType(),
                        table.partitionKeys().toArray(new String[0]),
                        options.legacyPartitionName());
        this.snapshotReader =
                table.newSnapshotReader().dropStats().withPartitionFilter(specifiedPartitions);
        this.incrementalClusterStrategy =
                new IncrementalClusterStrategy(
                        table.schemaManager(),
                        options.clusteringColumns(),
                        options.maxSizeAmplificationPercent(),
                        options.sortedRunSizeRatio(),
                        options.numSortedRunCompactionTrigger());
        this.clusterCurve = options.clusteringStrategy(options.clusteringColumns().size());
        this.clusterKeys = options.clusteringColumns();
        this.historyPartitionCluster =
                HistoryPartitionCluster.create(
                        table,
                        this.incrementalClusterStrategy,
                        this.partitionComputer,
                        specifiedPartitions);
    }

    /**
     * Selects, per partition, the files that should be clustered.
     *
     * <p>Partitions for which the strategy picks nothing are absent from the result. Units picked
     * by the history partition cluster (when one exists) are added afterwards with {@code
     * putAll}, so they replace an entry already present for the same partition.
     *
     * @param fullCompaction flag forwarded unchanged to {@link IncrementalClusterStrategy#pick}
     * @return a mutable map from partition to the compact unit chosen for it
     */
    public Map<BinaryRow, CompactUnit> prepareForCluster(boolean fullCompaction) {
        Map<BinaryRow, List<LevelSortedRun>> partitionLevels = constructLevels();
        logForPartitionLevel(partitionLevels, partitionComputer);

        Map<BinaryRow, CompactUnit> units = new HashMap<>();
        partitionLevels.forEach(
                (partition, levelSortedRuns) -> {
                    Optional<CompactUnit> pick =
                            incrementalClusterStrategy.pick(
                                    numLevels, levelSortedRuns, fullCompaction);
                    pick.ifPresent(unit -> units.put(partition, unit));
                });

        if (historyPartitionCluster != null) {
            units.putAll(historyPartitionCluster.pickForHistoryPartitions());
        }

        if (LOG.isDebugEnabled()) {
            units.forEach(
                    (partition, compactUnit) -> {
                        String filesInfo =
                                compactUnit.files().stream()
                                        .map(
                                                file ->
                                                        String.format(
                                                                "%s,%s,%s",
                                                                file.fileName(),
                                                                file.level(),
                                                                file.fileSize()))
                                        .collect(Collectors.joining(", "));
                        LOG.debug(
                                "Partition {}, outputLevel:{}, clustered with {} files: [{}]",
                                partitionComputer.generatePartValues(partition),
                                compactUnit.outputLevel(),
                                compactUnit.files().size(),
                                filesInfo);
                    });
        }
        return units;
    }

    /**
     * Reads the data splits from the snapshot reader and groups their files by partition.
     *
     * @return for every partition that has at least one data split, the sorted runs produced by
     *     {@link #constructPartitionLevels(List)} for all files of that partition
     */
    public Map<BinaryRow, List<LevelSortedRun>> constructLevels() {
        List<DataSplit> dataSplits = snapshotReader.read().dataSplits();

        // A partition may be spread over several splits, so accumulate its files first.
        Map<BinaryRow, List<DataFileMeta>> partitionFiles = new HashMap<>();
        for (DataSplit dataSplit : dataSplits) {
            partitionFiles
                    .computeIfAbsent(dataSplit.partition(), k -> new ArrayList<>())
                    .addAll(dataSplit.dataFiles());
        }

        return partitionFiles.entrySet().stream()
                .collect(
                        Collectors.toMap(
                                Map.Entry::getKey,
                                entry -> constructPartitionLevels(entry.getValue())));
    }

    /**
     * Turns the files of one partition into level sorted runs.
     *
     * <p>Every level-0 file becomes a run of its own, while all files of any other level are
     * combined into a single run for that level. The returned list is ordered by ascending level.
     *
     * @param partitionFiles the data files of a single partition
     * @return the runs of the partition, sorted by level
     */
    public static List<LevelSortedRun> constructPartitionLevels(List<DataFileMeta> partitionFiles) {
        List<LevelSortedRun> partitionLevels = new ArrayList<>();
        Map<Integer, List<DataFileMeta>> levelMap =
                partitionFiles.stream().collect(Collectors.groupingBy(DataFileMeta::level));

        for (Map.Entry<Integer, List<DataFileMeta>> entry : levelMap.entrySet()) {
            int level = entry.getKey();
            if (level == 0) {
                // Level 0: one run per file.
                for (DataFileMeta level0File : entry.getValue()) {
                    partitionLevels.add(
                            new LevelSortedRun(level, SortedRun.fromSingle(level0File)));
                }
            } else {
                // Higher levels: the whole level forms one run.
                partitionLevels.add(
                        new LevelSortedRun(level, SortedRun.fromSorted(entry.getValue())));
            }
        }

        // The grouping map gives no level ordering, so order the runs explicitly.
        partitionLevels.sort(Comparator.comparing(LevelSortedRun::level));
        return partitionLevels;
    }

    /**
     * Builds batch (non-streaming) data splits for the given files of one partition.
     *
     * <p>The files are grouped by the snapshot reader's {@link SplitGenerator}; each resulting
     * group becomes one split with bucket {@code 0} and a total bucket count of {@code 1}.
     *
     * @param partition the partition the files belong to
     * @param files the files to distribute over splits
     * @return one split per split group, in the order the generator returned the groups
     */
    public List<DataSplit> toSplits(BinaryRow partition, List<DataFileMeta> files) {
        List<DataSplit> splits = new ArrayList<>();

        DataSplit.Builder builder =
                DataSplit.builder()
                        .withPartition(partition)
                        .withBucket(0)
                        .withTotalBuckets(1)
                        .isStreaming(false);

        SplitGenerator splitGenerator = snapshotReader.splitGenerator();
        List<SplitGenerator.SplitGroup> splitGroups = splitGenerator.splitForBatch(files);

        // The bucket path depends only on the partition, so it is computed once for all groups.
        String bucketPath = snapshotReader.pathFactory().bucketPath(partition, 0).toString();

        for (SplitGenerator.SplitGroup splitGroup : splitGroups) {
            // NOTE(review): the builder is reused across iterations; this relies on build()
            // returning an independent split each time - confirm against DataSplit.Builder.
            builder.withDataFiles(splitGroup.files)
                    .rawConvertible(splitGroup.rawConvertible)
                    .withBucketPath(bucketPath);
            splits.add(builder.build());
        }
        return splits;
    }

    /**
     * Upgrades every given file to the target level.
     *
     * @param filesAfterCluster the files to upgrade
     * @param outputLevel the level passed to {@link DataFileMeta#upgrade} for each file
     * @return a new list holding the upgraded file metas, in input order
     */
    public static List<DataFileMeta> upgrade(List<DataFileMeta> filesAfterCluster, int outputLevel) {
        return filesAfterCluster.stream()
                .map(file -> file.upgrade(outputLevel))
                .collect(Collectors.toList());
    }

    /**
     * Logs, at debug level, the runs of every partition as {@code level-<level>:<fileCount>}.
     *
     * <p>Does nothing when debug logging is disabled.
     *
     * @param partitionLevels the runs per partition
     * @param partitionComputer used to render the partition values for the log line
     */
    public static void logForPartitionLevel(
            Map<BinaryRow, List<LevelSortedRun>> partitionLevels,
            InternalRowPartitionComputer partitionComputer) {
        if (!LOG.isDebugEnabled()) {
            return;
        }
        partitionLevels.forEach(
                (partition, levelSortedRuns) -> {
                    String runsInfo =
                            levelSortedRuns.stream()
                                    .map(
                                            lsr ->
                                                    String.format(
                                                            "level-%s:%s",
                                                            lsr.level(),
                                                            lsr.run().files().size()))
                                    .collect(Collectors.joining(","));
                    LOG.debug(
                            "Partition {} has {} runs: [{}]",
                            partitionComputer.generatePartValues(partition),
                            levelSortedRuns.size(),
                            runsInfo);
                });
    }

    /** Returns the order type derived from the table options and the number of clustering columns. */
    public CoreOptions.OrderType clusterCurve() {
        return clusterCurve;
    }

    /** Returns the clustering columns configured in the table options. */
    public List<String> clusterKeys() {
        return clusterKeys;
    }

    /** Returns the history partition cluster created for this manager; may be {@code null}. */
    @VisibleForTesting
    HistoryPartitionCluster historyPartitionCluster() {
        return historyPartitionCluster;
    }
}

