/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source.assigners;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.table.connector.source.DynamicFilteringData;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.codegen.Projection;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.flink.source.assigners.SplitAssigner;
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.utils.BinPacking;

public class PreAssignSplitAssigner
implements SplitAssigner {
    private final int splitBatchSize;
    private final int parallelism;
    private final Map<Integer, LinkedList<FileStoreSourceSplit>> pendingSplitAssignment;
    private final AtomicInteger numberOfPendingSplits;
    private final Collection<FileStoreSourceSplit> splits;

    public PreAssignSplitAssigner(int splitBatchSize, SplitEnumeratorContext<FileStoreSourceSplit> context, Collection<FileStoreSourceSplit> splits) {
        this(splitBatchSize, context.currentParallelism(), splits);
    }

    public PreAssignSplitAssigner(int splitBatchSize, int parallelism, Collection<FileStoreSourceSplit> splits, Projection partitionRowProjection, DynamicFilteringData dynamicFilteringData) {
        this(splitBatchSize, parallelism, (Collection<FileStoreSourceSplit>)splits.stream().filter(s -> PreAssignSplitAssigner.filter(partitionRowProjection, dynamicFilteringData, s)).collect(Collectors.toList()));
    }

    public PreAssignSplitAssigner(int splitBatchSize, int parallelism, Collection<FileStoreSourceSplit> splits) {
        this.splitBatchSize = splitBatchSize;
        this.parallelism = parallelism;
        this.splits = splits;
        this.pendingSplitAssignment = PreAssignSplitAssigner.createBatchFairSplitAssignment(splits, parallelism);
        this.numberOfPendingSplits = new AtomicInteger(splits.size());
    }

    @Override
    public List<FileStoreSourceSplit> getNext(int subtask, @Nullable String hostname) {
        Queue taskSplits = this.pendingSplitAssignment.get(subtask);
        ArrayList<FileStoreSourceSplit> assignment = new ArrayList<FileStoreSourceSplit>();
        while (taskSplits != null && !taskSplits.isEmpty() && assignment.size() < this.splitBatchSize) {
            assignment.add((FileStoreSourceSplit)taskSplits.poll());
        }
        this.numberOfPendingSplits.getAndAdd(-assignment.size());
        return assignment;
    }

    @Override
    public void addSplit(int suggestedTask, FileStoreSourceSplit split) {
        this.pendingSplitAssignment.computeIfAbsent(suggestedTask, k -> new LinkedList()).add(split);
        this.numberOfPendingSplits.incrementAndGet();
    }

    @Override
    public void addSplitsBack(int subtask, List<FileStoreSourceSplit> splits) {
        LinkedList remainingSplits = this.pendingSplitAssignment.computeIfAbsent(subtask, k -> new LinkedList());
        ListIterator<FileStoreSourceSplit> iterator = splits.listIterator(splits.size());
        while (iterator.hasPrevious()) {
            remainingSplits.addFirst(iterator.previous());
        }
        this.numberOfPendingSplits.getAndAdd(splits.size());
    }

    @Override
    public Collection<FileStoreSourceSplit> remainingSplits() {
        ArrayList<FileStoreSourceSplit> splits = new ArrayList<FileStoreSourceSplit>();
        this.pendingSplitAssignment.values().forEach(splits::addAll);
        return splits;
    }

    private static Map<Integer, LinkedList<FileStoreSourceSplit>> createBatchFairSplitAssignment(Collection<FileStoreSourceSplit> splits, int numReaders) {
        List<List<FileStoreSourceSplit>> assignmentList = BinPacking.packForFixedBinNumber(splits, split -> split.split().rowCount(), numReaders);
        HashMap<Integer, LinkedList<FileStoreSourceSplit>> assignment = new HashMap<Integer, LinkedList<FileStoreSourceSplit>>();
        for (int i = 0; i < assignmentList.size(); ++i) {
            assignment.put(i, new LinkedList(assignmentList.get(i)));
        }
        return assignment;
    }

    @Override
    public Optional<Long> getNextSnapshotId(int subtask) {
        LinkedList<FileStoreSourceSplit> pendingSplits = this.pendingSplitAssignment.get(subtask);
        return pendingSplits == null || pendingSplits.isEmpty() ? Optional.empty() : TableScanUtils.getSnapshotId(pendingSplits.peekFirst());
    }

    @Override
    public int numberOfRemainingSplits() {
        return this.numberOfPendingSplits.get();
    }

    public SplitAssigner ofDynamicPartitionPruning(Projection partitionRowProjection, DynamicFilteringData dynamicFilteringData) {
        return new PreAssignSplitAssigner(this.splitBatchSize, this.parallelism, this.splits, partitionRowProjection, dynamicFilteringData);
    }

    private static boolean filter(Projection partitionRowProjection, DynamicFilteringData dynamicFilteringData, FileStoreSourceSplit sourceSplit) {
        DataSplit dataSplit = (DataSplit)sourceSplit.split();
        BinaryRow partition = dataSplit.partition();
        FlinkRowData projected = new FlinkRowData(partitionRowProjection.apply(partition));
        return dynamicFilteringData.contains((RowData)projected);
    }
}

