/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.flink.bigquery.source.enumerator;

import com.google.cloud.flink.bigquery.source.config.BigQueryReadOptions;
import com.google.cloud.flink.bigquery.source.enumerator.BigQuerySourceEnumState;
import com.google.cloud.flink.bigquery.source.split.BigQuerySourceSplit;
import com.google.cloud.flink.bigquery.source.split.SplitDiscoveryScheduler;
import com.google.cloud.flink.bigquery.source.split.assigner.BigQuerySourceSplitAssigner;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.function.BiConsumer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Split enumerator for the BigQuery source.
 *
 * <p>Keeps track of the reader subtasks that have asked for work and hands them splits obtained
 * from a {@link BigQuerySourceSplitAssigner}. The assigner variant (bounded or unbounded) is
 * selected from the {@link Boundedness} supplied at construction time. The enumerator also acts
 * as the {@link SplitDiscoveryScheduler} handed to the unbounded assigner.
 */
@Internal
public class BigQuerySourceEnumerator
        implements SplitEnumerator<BigQuerySourceSplit, BigQuerySourceEnumState>,
                SplitDiscoveryScheduler {
    private static final Logger LOG = LoggerFactory.getLogger(BigQuerySourceEnumerator.class);

    private final Boundedness boundedness;
    private final SplitEnumeratorContext<BigQuerySourceSplit> context;
    private final BigQuerySourceSplitAssigner splitAssigner;
    /** Subtask ids waiting for a split; a sorted set, so the lowest id is served first. */
    private final TreeSet<Integer> readersAwaitingSplit;

    /**
     * Creates the enumerator and the split assigner matching the requested boundedness.
     *
     * @param boundedness whether the source is read in bounded or continuous-unbounded mode
     * @param context the enumerator context used to query readers and assign splits
     * @param readOptions the read options forwarded to the split assigner
     * @param sourceEnumState the enumerator state forwarded to the split assigner
     * @throws IllegalArgumentException if {@code boundedness} is not a supported value
     */
    public BigQuerySourceEnumerator(
            Boundedness boundedness,
            SplitEnumeratorContext<BigQuerySourceSplit> context,
            BigQueryReadOptions readOptions,
            BigQuerySourceEnumState sourceEnumState) {
        this.boundedness = boundedness;
        this.context = context;
        // The unbounded assigner receives a reference to this enumerator, so every other field
        // is initialised before the assigner is created.
        this.readersAwaitingSplit = new TreeSet<>();
        this.splitAssigner = createBigQuerySourceSplitAssigner(readOptions, sourceEnumState);
    }

    /**
     * Builds the split assigner for the configured boundedness.
     *
     * @param readOptions the read options forwarded to the assigner factory
     * @param sourceEnumState the enumerator state forwarded to the assigner factory
     * @return a bounded or unbounded split assigner
     * @throws IllegalArgumentException if the configured boundedness is not supported
     */
    final BigQuerySourceSplitAssigner createBigQuerySourceSplitAssigner(
            BigQueryReadOptions readOptions, BigQuerySourceEnumState sourceEnumState) {
        switch (boundedness) {
            case BOUNDED:
                return BigQuerySourceSplitAssigner.createBounded(readOptions, sourceEnumState);
            case CONTINUOUS_UNBOUNDED:
                // The enumerator itself is passed in as the discovery scheduler.
                return BigQuerySourceSplitAssigner.createUnbounded(
                        this, readOptions, sourceEnumState);
            default:
                throw new IllegalArgumentException("Non supported boundedness: " + boundedness);
        }
    }

    /** Starts the enumerator by asking the split assigner to open and discover splits. */
    public void start() {
        splitAssigner.openAndDiscoverSplits();
    }

    /**
     * Registers the requesting subtask as waiting for a split and attempts an assignment.
     *
     * <p>Requests from subtasks that are not currently registered with the context are ignored.
     *
     * @param subtaskId the id of the requesting reader subtask
     * @param requesterHostname the hostname of the requester (not used)
     */
    public void handleSplitRequest(int subtaskId, String requesterHostname) {
        if (!context.registeredReaders().containsKey(subtaskId)) {
            // The reader is not registered (any more); nothing to assign.
            return;
        }
        readersAwaitingSplit.add(subtaskId);
        assignSplits();
    }

    /**
     * Returns previously assigned splits to the split assigner.
     *
     * @param splits the splits to hand back
     * @param subtaskId the subtask the splits had been assigned to (only the splits are used)
     */
    public void addSplitsBack(List<BigQuerySourceSplit> splits, int subtaskId) {
        LOG.debug("BigQuery Source Enumerator adds splits back: {}", splits);
        splitAssigner.addSplitsBack(splits);
    }

    /**
     * Logs the registration of a new reader; no other action is taken here.
     *
     * @param subtaskId the id of the added reader subtask
     */
    public void addReader(int subtaskId) {
        LOG.debug("Adding reader {} to BigQuerySourceEnumerator.", subtaskId);
    }

    /**
     * Snapshots the enumerator state by delegating to the split assigner.
     *
     * @param checkpointId the id of the checkpoint being taken
     * @return the state produced by the split assigner
     * @throws Exception if the split assigner fails to produce the snapshot
     */
    public BigQuerySourceEnumState snapshotState(long checkpointId) throws Exception {
        BigQuerySourceEnumState state = splitAssigner.snapshotState(checkpointId);
        LOG.debug("Checkpointing state {}", state);
        return state;
    }

    /**
     * Closes the split assigner.
     *
     * @throws IOException if closing the split assigner fails
     */
    public void close() throws IOException {
        splitAssigner.close();
    }

    /**
     * Hands out splits to the waiting readers, lowest subtask id first.
     *
     * <p>Every waiting reader that is still registered receives one split for as long as the
     * assigner can provide one, so a single call can serve all waiting readers instead of only
     * the first one. The loop stops as soon as the assigner has nothing to offer; in bounded
     * mode, when the assigner also reports that no more splits will come, all registered readers
     * are signalled that there are no more splits.
     */
    private void assignSplits() {
        final Iterator<Integer> awaitingReader = readersAwaitingSplit.iterator();

        while (awaitingReader.hasNext()) {
            final int nextAwaiting = awaitingReader.next();
            // Drop readers that are no longer registered with the context.
            if (!context.registeredReaders().containsKey(nextAwaiting)) {
                awaitingReader.remove();
                continue;
            }

            final Optional<BigQuerySourceSplit> split = splitAssigner.getNext();
            if (split.isPresent()) {
                final BigQuerySourceSplit bqSplit = split.get();
                context.assignSplit(bqSplit, nextAwaiting);
                awaitingReader.remove();
                LOG.info("Assign split {} to subtask {}", bqSplit, nextAwaiting);
                // Keep going: the remaining waiting readers may be served as well.
                continue;
            }

            if (splitAssigner.noMoreSplits() && boundedness == Boundedness.BOUNDED) {
                LOG.info("All splits have been assigned");
                context.registeredReaders().keySet().forEach(context::signalNoMoreSplits);
            } else {
                // Nothing available right now; the readers stay in the waiting set.
                LOG.info("All splits have been assigned, will check later on.");
            }
            // The assigner is empty, so there is no point in checking further readers.
            break;
        }
    }

    /**
     * Schedules a periodic asynchronous call through the enumerator context.
     *
     * @param callable the work to run
     * @param handler the callback receiving the result or the failure
     * @param initialDelayMillis the delay before the first invocation, in milliseconds
     * @param periodMillis the period between invocations, in milliseconds
     * @param <T> the result type of the callable
     */
    public <T> void schedule(
            Callable<T> callable,
            BiConsumer<T, Throwable> handler,
            long initialDelayMillis,
            long periodMillis) {
        context.callAsync(callable, handler, initialDelayMillis, periodMillis);
    }

    /** Triggers a new assignment round for the readers currently waiting for a split. */
    public void notifySplits() {
        assignSplits();
    }
}

