/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source.enumerator;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.iceberg.flink.source.assigner.GetSplitResult;
import org.apache.iceberg.flink.source.assigner.SplitAssigner;
import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorState;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.SplitRequestEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class AbstractIcebergEnumerator
implements SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractIcebergEnumerator.class);
    private final SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext;
    private final SplitAssigner assigner;
    private final Map<Integer, String> readersAwaitingSplit;
    private final AtomicReference<CompletableFuture<Void>> availableFuture;

    AbstractIcebergEnumerator(SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext, SplitAssigner assigner) {
        this.enumeratorContext = enumeratorContext;
        this.assigner = assigner;
        this.readersAwaitingSplit = new LinkedHashMap<Integer, String>();
        this.availableFuture = new AtomicReference();
    }

    public void start() {
        this.assigner.start();
    }

    public void close() throws IOException {
        this.assigner.close();
    }

    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
        throw new UnsupportedOperationException(String.format("Received invalid default split request event from subtask %d as Iceberg source uses custom split request event", subtaskId));
    }

    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        if (!(sourceEvent instanceof SplitRequestEvent)) {
            throw new IllegalArgumentException(String.format("Received unknown event from subtask %d: %s", subtaskId, sourceEvent.getClass().getCanonicalName()));
        }
        SplitRequestEvent splitRequestEvent = (SplitRequestEvent)sourceEvent;
        LOG.info("Received request split event from subtask {}", (Object)subtaskId);
        this.assigner.onCompletedSplits(splitRequestEvent.finishedSplitIds());
        this.readersAwaitingSplit.put(subtaskId, splitRequestEvent.requesterHostname());
        this.assignSplits();
    }

    public void addSplitsBack(List<IcebergSourceSplit> splits, int subtaskId) {
        LOG.info("Add {} splits back to the pool for failed subtask {}", (Object)splits.size(), (Object)subtaskId);
        this.assigner.onUnassignedSplits(splits);
        this.assignSplits();
    }

    public void addReader(int subtaskId) {
        LOG.info("Added reader: {}", (Object)subtaskId);
    }

    private void assignSplits() {
        LOG.info("Assigning splits for {} awaiting readers", (Object)this.readersAwaitingSplit.size());
        Iterator<Map.Entry<Integer, String>> awaitingReader = this.readersAwaitingSplit.entrySet().iterator();
        while (awaitingReader.hasNext()) {
            Map.Entry<Integer, String> nextAwaiting = awaitingReader.next();
            if (!this.enumeratorContext.registeredReaders().containsKey(nextAwaiting.getKey())) {
                awaitingReader.remove();
                continue;
            }
            int awaitingSubtask = nextAwaiting.getKey();
            String hostname = nextAwaiting.getValue();
            GetSplitResult getResult = this.assigner.getNext(hostname);
            if (getResult.status() == GetSplitResult.Status.AVAILABLE) {
                LOG.info("Assign split to subtask {}: {}", (Object)awaitingSubtask, (Object)getResult.split());
                this.enumeratorContext.assignSplit((SourceSplit)getResult.split(), awaitingSubtask);
                awaitingReader.remove();
                continue;
            }
            if (getResult.status() == GetSplitResult.Status.CONSTRAINED) {
                this.getAvailableFutureIfNeeded();
                break;
            }
            if (getResult.status() == GetSplitResult.Status.UNAVAILABLE) {
                if (this.shouldWaitForMoreSplits()) {
                    this.getAvailableFutureIfNeeded();
                    break;
                }
                LOG.info("No more splits available for subtask {}", (Object)awaitingSubtask);
                this.enumeratorContext.signalNoMoreSplits(awaitingSubtask);
                awaitingReader.remove();
                continue;
            }
            throw new IllegalArgumentException("Unsupported status: " + (Object)((Object)getResult.status()));
        }
    }

    protected abstract boolean shouldWaitForMoreSplits();

    private synchronized void getAvailableFutureIfNeeded() {
        if (this.availableFuture.get() != null) {
            return;
        }
        CompletionStage future = this.assigner.isAvailable().thenAccept(ignore -> this.enumeratorContext.runInCoordinatorThread(() -> {
            LOG.debug("Executing callback of assignSplits");
            this.availableFuture.set(null);
            this.assignSplits();
        }));
        this.availableFuture.set((CompletableFuture<Void>)future);
        LOG.debug("Registered callback for future available splits");
    }
}

