/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.cluster.strategy;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.ConsistentHashingNode;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Triple;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
import org.apache.hudi.index.bucket.ConsistentBucketIndexUtils;
import org.apache.hudi.index.bucket.HoodieConsistentBucketIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseConsistentHashingBucketClusteringPlanStrategy<T extends HoodieRecordPayload, I, K, O>
extends PartitionAwareClusteringPlanStrategy<T, I, K, O> {
    private static final Logger LOG = LoggerFactory.getLogger(BaseConsistentHashingBucketClusteringPlanStrategy.class);
    public static final String METADATA_PARTITION_KEY = "clustering.group.partition";
    public static final String METADATA_CHILD_NODE_KEY = "clustering.group.child.node";
    public static final String METADATA_SEQUENCE_NUMBER_KEY = "clustering.group.sequence.no";

    public BaseConsistentHashingBucketClusteringPlanStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
        super(table, engineContext, writeConfig);
        ValidationUtils.checkArgument(this.getHoodieTable().getIndex() instanceof HoodieConsistentBucketIndex, this.getClass().getName() + " is only applicable to table with consistent hash index.");
    }

    @Override
    public boolean checkPrecondition() {
        HoodieTimeline timeline = this.getHoodieTable().getActiveTimeline().getDeltaCommitTimeline().filterInflightsAndRequested();
        if (!timeline.empty()) {
            LOG.warn("When using consistent bucket, clustering cannot be scheduled async if there are concurrent writers. Writer instant: {}.", timeline.getInstants());
            return false;
        }
        return true;
    }

    @Override
    protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {
        Option<HoodieConsistentHashingMetadata> metadata = ConsistentBucketIndexUtils.loadMetadata(this.getHoodieTable(), partitionPath);
        ValidationUtils.checkArgument(metadata.isPresent(), "Metadata is empty for partition: " + partitionPath);
        ConsistentBucketIdentifier identifier = new ConsistentBucketIdentifier(metadata.get());
        int splitSlot = this.getWriteConfig().getBucketIndexMaxNumBuckets() - identifier.getNumBuckets();
        Triple<List<HoodieClusteringGroup>, Integer, List<FileSlice>> splitResult = this.buildSplitClusteringGroups(identifier, fileSlices, splitSlot);
        ArrayList ret = new ArrayList(splitResult.getLeft());
        List<FileSlice> remainedSlices = splitResult.getRight();
        if (this.isBucketClusteringMergeEnabled()) {
            int mergeSlot = identifier.getNumBuckets() - this.getWriteConfig().getBucketIndexMinNumBuckets() + splitResult.getMiddle();
            Triple<List<HoodieClusteringGroup>, Integer, List<FileSlice>> mergeResult = this.buildMergeClusteringGroup(identifier, remainedSlices, mergeSlot);
            ret.addAll(mergeResult.getLeft());
            remainedSlices = mergeResult.getRight();
        }
        if (this.isBucketClusteringSortEnabled()) {
            ret.addAll(remainedSlices.stream().map(fs -> {
                ConsistentHashingNode oldNode = identifier.getBucketByFileId(fs.getFileId());
                ConsistentHashingNode newNode = new ConsistentHashingNode(oldNode.getValue(), FSUtils.createNewFileIdPfx(), ConsistentHashingNode.NodeTag.REPLACE);
                return HoodieClusteringGroup.newBuilder().setSlices(BaseConsistentHashingBucketClusteringPlanStrategy.getFileSliceInfo(Collections.singletonList(fs))).setNumOutputFileGroups(1).setMetrics(this.buildMetrics(Collections.singletonList(fs))).setExtraMetadata(this.constructExtraMetadata(fs.getPartitionPath(), Collections.singletonList(newNode), identifier.getMetadata().getSeqNo())).build();
            }).collect(Collectors.toList()));
        }
        return ret.stream();
    }

    protected boolean isBucketClusteringMergeEnabled() {
        return true;
    }

    protected boolean isBucketClusteringSortEnabled() {
        return true;
    }

    @Override
    protected Stream<FileSlice> getFileSlicesEligibleForClustering(String partition) {
        TableFileSystemView fileSystemView = this.getHoodieTable().getFileSystemView();
        boolean isPartitionInClustering = fileSystemView.getFileGroupsInPendingClustering().anyMatch(p -> ((HoodieFileGroupId)p.getLeft()).getPartitionPath().equals(partition));
        if (isPartitionInClustering) {
            LOG.info("Partition {} is already in clustering, skip.", (Object)partition);
            return Stream.empty();
        }
        return super.getFileSlicesEligibleForClustering(partition);
    }

    @Override
    protected Map<String, String> getStrategyParams() {
        HashMap<String, String> params = new HashMap<String, String>();
        if (!StringUtils.isNullOrEmpty(this.getWriteConfig().getClusteringSortColumns())) {
            params.put(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key(), this.getWriteConfig().getClusteringSortColumns());
        }
        return params;
    }

    protected Triple<List<HoodieClusteringGroup>, Integer, List<FileSlice>> buildSplitClusteringGroups(ConsistentBucketIdentifier identifier, List<FileSlice> fileSlices, int splitSlot) {
        ArrayList<HoodieClusteringGroup> retGroup = new ArrayList<HoodieClusteringGroup>();
        ArrayList<FileSlice> fsUntouched = new ArrayList<FileSlice>();
        long splitSize = this.getSplitSize();
        int remainingSplitSlot = splitSlot;
        for (FileSlice fs : fileSlices) {
            boolean needSplit;
            boolean bl = needSplit = fs.getTotalFileSize() > splitSize;
            if (!needSplit || remainingSplitSlot == 0) {
                fsUntouched.add(fs);
                continue;
            }
            Option<List<ConsistentHashingNode>> nodes = identifier.splitBucket(fs.getFileId());
            if (!nodes.isPresent()) {
                fsUntouched.add(fs);
                continue;
            }
            --remainingSplitSlot;
            List<FileSlice> fsList = Collections.singletonList(fs);
            retGroup.add(HoodieClusteringGroup.newBuilder().setSlices(BaseConsistentHashingBucketClusteringPlanStrategy.getFileSliceInfo(fsList)).setNumOutputFileGroups(2).setMetrics(this.buildMetrics(fsList)).setExtraMetadata(this.constructExtraMetadata(fs.getPartitionPath(), nodes.get(), identifier.getMetadata().getSeqNo())).build());
        }
        return Triple.of(retGroup, splitSlot - remainingSplitSlot, fsUntouched);
    }

    protected Triple<List<HoodieClusteringGroup>, Integer, List<FileSlice>> buildMergeClusteringGroup(ConsistentBucketIdentifier identifier, List<FileSlice> fileSlices, int mergeSlot) {
        if (fileSlices.size() <= 1) {
            return Triple.of(Collections.emptyList(), 0, fileSlices);
        }
        long mergeSize = this.getMergeSize();
        int remainingMergeSlot = mergeSlot;
        ArrayList<HoodieClusteringGroup> groups2 = new ArrayList<HoodieClusteringGroup>();
        boolean[] added = new boolean[fileSlices.size()];
        fileSlices.sort(Comparator.comparingInt(a -> identifier.getBucketByFileId(a.getFileId()).getValue()));
        for (int i2 = 0; i2 < fileSlices.size(); ++i2) {
            if (added[i2] || fileSlices.get(i2).getTotalFileSize() > mergeSize) continue;
            int[] rangeIdx = new int[]{i2, i2};
            long totalSize = fileSlices.get(i2).getTotalFileSize();
            block1: for (int k = 0; k < 2; ++k) {
                boolean forward;
                boolean bl = forward = k == 1;
                do {
                    boolean isNeighbour;
                    int nextIdx = forward ? (rangeIdx[k] + 1 < fileSlices.size() ? rangeIdx[k] + 1 : 0) : (rangeIdx[k] >= 1 ? rangeIdx[k] - 1 : fileSlices.size() - 1);
                    boolean bl2 = isNeighbour = identifier.getBucketByFileId(fileSlices.get(nextIdx).getFileId()) == identifier.getFormerBucket(fileSlices.get(rangeIdx[k]).getFileId());
                    if (remainingMergeSlot == 0 || added[nextIdx] || !isNeighbour || totalSize > mergeSize || fileSlices.get(nextIdx).getTotalFileSize() > mergeSize) continue block1;
                    totalSize += fileSlices.get(nextIdx).getTotalFileSize();
                    rangeIdx[k] = nextIdx;
                    --remainingMergeSlot;
                } while (rangeIdx[k] != i2);
            }
            int startIdx = rangeIdx[0];
            int endIdx = rangeIdx[1];
            if (endIdx == i2 && startIdx == i2) continue;
            ArrayList<FileSlice> fs = new ArrayList<FileSlice>();
            while (true) {
                added[startIdx] = true;
                fs.add(fileSlices.get(startIdx));
                if (startIdx == endIdx) break;
                startIdx = startIdx + 1 < fileSlices.size() ? startIdx + 1 : 0;
            }
            groups2.add(HoodieClusteringGroup.newBuilder().setSlices(BaseConsistentHashingBucketClusteringPlanStrategy.getFileSliceInfo(fs)).setNumOutputFileGroups(1).setMetrics(this.buildMetrics(fs)).setExtraMetadata(this.constructExtraMetadata(((FileSlice)fs.get(0)).getPartitionPath(), identifier.mergeBucket(fs.stream().map(FileSlice::getFileId).collect(Collectors.toList())), identifier.getMetadata().getSeqNo())).build());
        }
        List fsUntouched = IntStream.range(0, fileSlices.size()).filter(i -> !added[i]).mapToObj(fileSlices::get).collect(Collectors.toList());
        return Triple.of(groups2, mergeSlot - remainingMergeSlot, fsUntouched);
    }

    private Map<String, String> constructExtraMetadata(String partition, List<ConsistentHashingNode> nodes, int seqNo) {
        HashMap<String, String> extraMetadata = new HashMap<String, String>();
        try {
            extraMetadata.put(METADATA_PARTITION_KEY, partition);
            extraMetadata.put(METADATA_CHILD_NODE_KEY, ConsistentHashingNode.toJsonString(nodes));
            extraMetadata.put(METADATA_SEQUENCE_NUMBER_KEY, Integer.toString(seqNo));
        }
        catch (IOException e) {
            LOG.error("Failed to construct extra metadata, partition: {}, nodes:{}", (Object)partition, nodes);
            throw new HoodieClusteringException("Failed to construct extra metadata, partition: " + partition + ", nodes:" + nodes);
        }
        return extraMetadata;
    }

    private long getSplitSize() {
        HoodieFileFormat format = this.getHoodieTable().getMetaClient().getTableConfig().getBaseFileFormat();
        return (long)((double)this.getWriteConfig().getMaxFileSize(format) * this.getWriteConfig().getBucketSplitThreshold());
    }

    private long getMergeSize() {
        HoodieFileFormat format = this.getHoodieTable().getMetaClient().getTableConfig().getBaseFileFormat();
        return (long)((double)this.getWriteConfig().getMaxFileSize(format) * this.getWriteConfig().getBucketMergeThreshold());
    }
}

