/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source.assigners;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.flink.source.assigners.SplitAssigner;
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.utils.BinPacking;

public class PreAssignSplitAssigner
implements SplitAssigner {
    private final int splitBatchSize;
    private final Map<Integer, LinkedList<FileStoreSourceSplit>> pendingSplitAssignment;

    public PreAssignSplitAssigner(int splitBatchSize, SplitEnumeratorContext<FileStoreSourceSplit> context, Collection<FileStoreSourceSplit> splits) {
        this.splitBatchSize = splitBatchSize;
        this.pendingSplitAssignment = PreAssignSplitAssigner.createBatchFairSplitAssignment(splits, context.currentParallelism());
    }

    @Override
    public List<FileStoreSourceSplit> getNext(int subtask, @Nullable String hostname) {
        Queue taskSplits = this.pendingSplitAssignment.get(subtask);
        ArrayList<FileStoreSourceSplit> assignment = new ArrayList<FileStoreSourceSplit>();
        while (taskSplits != null && !taskSplits.isEmpty() && assignment.size() < this.splitBatchSize) {
            assignment.add((FileStoreSourceSplit)taskSplits.poll());
        }
        return assignment;
    }

    @Override
    public void addSplit(int suggestedTask, FileStoreSourceSplit split) {
        this.pendingSplitAssignment.computeIfAbsent(suggestedTask, k -> new LinkedList()).add(split);
    }

    @Override
    public void addSplitsBack(int subtask, List<FileStoreSourceSplit> splits) {
        LinkedList remainingSplits = this.pendingSplitAssignment.computeIfAbsent(subtask, k -> new LinkedList());
        ListIterator<FileStoreSourceSplit> iterator = splits.listIterator(splits.size());
        while (iterator.hasPrevious()) {
            remainingSplits.addFirst(iterator.previous());
        }
    }

    @Override
    public Collection<FileStoreSourceSplit> remainingSplits() {
        ArrayList<FileStoreSourceSplit> splits = new ArrayList<FileStoreSourceSplit>();
        this.pendingSplitAssignment.values().forEach(splits::addAll);
        return splits;
    }

    private static Map<Integer, LinkedList<FileStoreSourceSplit>> createBatchFairSplitAssignment(Collection<FileStoreSourceSplit> splits, int numReaders) {
        List<List<FileStoreSourceSplit>> assignmentList = BinPacking.packForFixedBinNumber(splits, split -> split.split().rowCount(), numReaders);
        HashMap<Integer, LinkedList<FileStoreSourceSplit>> assignment = new HashMap<Integer, LinkedList<FileStoreSourceSplit>>();
        for (int i = 0; i < assignmentList.size(); ++i) {
            assignment.put(i, new LinkedList(assignmentList.get(i)));
        }
        return assignment;
    }

    @Override
    public Optional<Long> getNextSnapshotId(int subtask) {
        LinkedList<FileStoreSourceSplit> pendingSplits = this.pendingSplitAssignment.get(subtask);
        return pendingSplits == null || pendingSplits.isEmpty() ? Optional.empty() : TableScanUtils.getSnapshotId(pendingSplits.peekFirst());
    }
}

