/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source.assigner;

import java.util.ArrayDeque;
import java.util.Collection;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.flink.source.assigner.GetSplitResult;
import org.apache.iceberg.flink.source.assigner.SplitAssigner;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
import org.apache.iceberg.flink.source.split.SerializableComparator;

@Internal
public class DefaultSplitAssigner
implements SplitAssigner {
    private final Queue<IcebergSourceSplit> pendingSplits;
    private CompletableFuture<Void> availableFuture;

    public DefaultSplitAssigner(SerializableComparator<IcebergSourceSplit> comparator) {
        this.pendingSplits = comparator == null ? new ArrayDeque() : new PriorityQueue<IcebergSourceSplit>(comparator);
    }

    public DefaultSplitAssigner(SerializableComparator<IcebergSourceSplit> comparator, Collection<IcebergSourceSplitState> assignerState) {
        this(comparator);
        assignerState.forEach(splitState -> this.pendingSplits.add(splitState.split()));
    }

    @Override
    public synchronized GetSplitResult getNext(@Nullable String hostname) {
        if (this.pendingSplits.isEmpty()) {
            return GetSplitResult.unavailable();
        }
        IcebergSourceSplit split = this.pendingSplits.poll();
        return GetSplitResult.forSplit(split);
    }

    @Override
    public void onDiscoveredSplits(Collection<IcebergSourceSplit> splits) {
        this.addSplits(splits);
    }

    @Override
    public void onUnassignedSplits(Collection<IcebergSourceSplit> splits) {
        this.addSplits(splits);
    }

    private synchronized void addSplits(Collection<IcebergSourceSplit> splits) {
        if (!splits.isEmpty()) {
            this.pendingSplits.addAll(splits);
            this.completeAvailableFuturesIfNeeded();
        }
    }

    @Override
    public synchronized Collection<IcebergSourceSplitState> state() {
        return this.pendingSplits.stream().map(split -> new IcebergSourceSplitState((IcebergSourceSplit)split, IcebergSourceSplitStatus.UNASSIGNED)).collect(Collectors.toList());
    }

    @Override
    public synchronized CompletableFuture<Void> isAvailable() {
        if (this.availableFuture == null) {
            this.availableFuture = new CompletableFuture();
        }
        return this.availableFuture;
    }

    @Override
    public synchronized int pendingSplitCount() {
        return this.pendingSplits.size();
    }

    private synchronized void completeAvailableFuturesIfNeeded() {
        if (this.availableFuture != null && !this.pendingSplits.isEmpty()) {
            this.availableFuture.complete(null);
        }
        this.availableFuture = null;
    }
}

