/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source.assigner;

import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.flink.source.assigner.GetSplitResult;
import org.apache.iceberg.flink.source.assigner.SplitAssigner;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;

@Internal
public class SimpleSplitAssigner
implements SplitAssigner {
    private final Deque<IcebergSourceSplit> pendingSplits;
    private CompletableFuture<Void> availableFuture;

    public SimpleSplitAssigner() {
        this.pendingSplits = new ArrayDeque<IcebergSourceSplit>();
    }

    public SimpleSplitAssigner(Collection<IcebergSourceSplitState> assignerState) {
        this.pendingSplits = new ArrayDeque<IcebergSourceSplit>(assignerState.size());
        assignerState.forEach(splitState -> this.pendingSplits.add(splitState.split()));
    }

    @Override
    public GetSplitResult getNext(@Nullable String hostname) {
        if (this.pendingSplits.isEmpty()) {
            return GetSplitResult.unavailable();
        }
        IcebergSourceSplit split = this.pendingSplits.poll();
        return GetSplitResult.forSplit(split);
    }

    @Override
    public void onDiscoveredSplits(Collection<IcebergSourceSplit> splits) {
        this.addSplits(splits);
    }

    @Override
    public void onUnassignedSplits(Collection<IcebergSourceSplit> splits) {
        this.addSplits(splits);
    }

    private void addSplits(Collection<IcebergSourceSplit> splits) {
        if (!splits.isEmpty()) {
            this.pendingSplits.addAll(splits);
            this.completeAvailableFuturesIfNeeded();
        }
    }

    @Override
    public Collection<IcebergSourceSplitState> state() {
        return this.pendingSplits.stream().map(split -> new IcebergSourceSplitState((IcebergSourceSplit)split, IcebergSourceSplitStatus.UNASSIGNED)).collect(Collectors.toList());
    }

    @Override
    public synchronized CompletableFuture<Void> isAvailable() {
        if (this.availableFuture == null) {
            this.availableFuture = new CompletableFuture();
        }
        return this.availableFuture;
    }

    private synchronized void completeAvailableFuturesIfNeeded() {
        if (this.availableFuture != null && !this.pendingSplits.isEmpty()) {
            this.availableFuture.complete(null);
        }
        this.availableFuture = null;
    }
}

