/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.flink.bigquery.source.split.assigner;

import com.google.cloud.flink.bigquery.source.config.BigQueryReadOptions;
import com.google.cloud.flink.bigquery.source.enumerator.BigQuerySourceEnumState;
import com.google.cloud.flink.bigquery.source.split.BigQuerySourceSplit;
import com.google.cloud.flink.bigquery.source.split.SplitDiscoveryScheduler;
import com.google.cloud.flink.bigquery.source.split.assigner.BoundedSplitAssigner;
import com.google.cloud.flink.bigquery.source.split.assigner.UnboundedSplitAssigner;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.annotation.Internal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for the assigners that hand out {@link BigQuerySourceSplit}s to readers.
 *
 * <p>The assigner keeps track of table streams that still have to be turned into splits, splits
 * that are waiting to be handed out, and splits that have already been handed out. All of this
 * state is restored from, and can be captured back into, a {@link BigQuerySourceEnumState}.
 *
 * <p>State is held in concurrent collections. {@link #getNext()} never relies on a separate
 * emptiness check before polling, so it tolerates another thread draining a queue in between.
 */
@Internal
public abstract class BigQuerySourceSplitAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(BigQuerySourceSplitAssigner.class);

    /** Read options this assigner was created with; made available to subclasses. */
    protected final BigQueryReadOptions readOptions;

    /** Partitions restored from the enumerator state; maintained by subclasses. */
    protected final Set<String> lastSeenPartitions;

    /** Table streams that have not yet been converted into a split. */
    protected final Queue<String> remainingTableStreams;

    /** Table streams for which a split has already been created by {@link #getNext()}. */
    protected final Queue<String> alreadyProcessedTableStreams;

    /** Splits that are ready to be handed out (including splits that were added back). */
    protected final Queue<BigQuerySourceSplit> remainingSourceSplits;

    /** Splits that have been handed out, keyed by split id. */
    protected final Map<String, BigQuerySourceSplit> assignedSourceSplits;

    /** Whether the initial call to {@link #discoverSplits()} has already happened. */
    protected boolean initialized;

    /**
     * Creates a {@link BoundedSplitAssigner}.
     *
     * @param readOptions the read options passed on to the assigner.
     * @param sourceEnumState the enumerator state the assigner is restored from.
     * @return a new bounded split assigner.
     */
    public static BigQuerySourceSplitAssigner createBounded(
            BigQueryReadOptions readOptions, BigQuerySourceEnumState sourceEnumState) {
        return new BoundedSplitAssigner(readOptions, sourceEnumState);
    }

    /**
     * Creates an {@link UnboundedSplitAssigner}.
     *
     * @param observer the scheduler passed on to the unbounded assigner.
     * @param readOptions the read options passed on to the assigner.
     * @param sourceEnumState the enumerator state the assigner is restored from.
     * @return a new unbounded split assigner.
     */
    public static BigQuerySourceSplitAssigner createUnbounded(
            SplitDiscoveryScheduler observer,
            BigQueryReadOptions readOptions,
            BigQuerySourceEnumState sourceEnumState) {
        return new UnboundedSplitAssigner(observer, readOptions, sourceEnumState);
    }

    /**
     * Restores the assigner from the given enumerator state. Every collection of the state is
     * copied into a fresh concurrent collection owned by this instance.
     *
     * @param readOptions the read options kept for subclasses.
     * @param sourceEnumState the state to restore from.
     */
    BigQuerySourceSplitAssigner(
            BigQueryReadOptions readOptions, BigQuerySourceEnumState sourceEnumState) {
        this.readOptions = readOptions;
        this.lastSeenPartitions = ConcurrentHashMap.newKeySet();
        this.lastSeenPartitions.addAll(sourceEnumState.getLastSeenPartitions());
        this.remainingTableStreams =
                new ConcurrentLinkedQueue<>(sourceEnumState.getRemaniningTableStreams());
        this.alreadyProcessedTableStreams =
                new ConcurrentLinkedQueue<>(sourceEnumState.getCompletedTableStreams());
        this.remainingSourceSplits =
                new ConcurrentLinkedQueue<>(sourceEnumState.getRemainingSourceSplits());
        this.assignedSourceSplits = new ConcurrentHashMap<>();
        this.assignedSourceSplits.putAll(sourceEnumState.getAssignedSourceSplits());
        this.initialized = sourceEnumState.isInitialized();
    }

    /** Discovers the work to be read; the strategy is defined by the concrete assigner. */
    public abstract void discoverSplits();

    /**
     * Opens the assigner. {@link #discoverSplits()} is invoked only when the restored state was
     * not yet marked as initialized, after which the assigner is marked as initialized.
     */
    public void openAndDiscoverSplits() {
        LOG.info("BigQuery source split assigner is opening.");
        if (!initialized) {
            discoverSplits();
            initialized = true;
        }
    }

    /**
     * Returns previously handed out splits to the assigner so they can be handed out again.
     *
     * @param splits the splits to make available again.
     */
    public void addSplitsBack(List<BigQuerySourceSplit> splits) {
        for (BigQuerySourceSplit split : splits) {
            // Re-queue before dropping the assignment, so the split is always present in at
            // least one of the two collections while this method runs.
            remainingSourceSplits.add(split);
            assignedSourceSplits.remove(split.splitId());
        }
    }

    /**
     * Captures the current state of the assigner. Every collection is copied, so the returned
     * state is not affected by later changes to this assigner.
     *
     * @param checkpointId the checkpoint id; not used by this implementation.
     * @return a state object holding copies of the assigner's collections and its initialized
     *     flag.
     */
    public BigQuerySourceEnumState snapshotState(long checkpointId) {
        return new BigQuerySourceEnumState(
                new ArrayList<>(lastSeenPartitions),
                new ArrayList<>(remainingTableStreams),
                new ArrayList<>(alreadyProcessedTableStreams),
                new ArrayList<>(remainingSourceSplits),
                new HashMap<>(assignedSourceSplits),
                initialized);
    }

    /** Closes the assigner. This base implementation only logs. */
    public void close() {
        LOG.info("BigQuery source split assigner is closed.");
    }

    /**
     * Hands out the next split, if any.
     *
     * <p>Pending splits are served first. When none is pending, the next remaining table stream
     * is turned into a split, the stream is recorded in {@link #alreadyProcessedTableStreams},
     * and the split is then served through the pending queue.
     *
     * @return the split that was handed out (and recorded in {@link #assignedSourceSplits}), or
     *     an empty optional when there are neither pending splits nor remaining streams.
     */
    public Optional<BigQuerySourceSplit> getNext() {
        while (true) {
            // Poll directly instead of isEmpty() followed by poll(): the queue may be drained
            // by another thread between the two calls, which would yield a null split.
            BigQuerySourceSplit pending = remainingSourceSplits.poll();
            if (pending != null) {
                assignedSourceSplits.put(pending.splitId(), pending);
                return Optional.of(pending);
            }

            String nextStream = remainingTableStreams.poll();
            if (nextStream == null) {
                return Optional.empty();
            }

            // Queue the new split and retry, so it is handed out through the same path as any
            // other pending split.
            remainingSourceSplits.add(new BigQuerySourceSplit(nextStream));
            alreadyProcessedTableStreams.add(nextStream);
        }
    }

    /**
     * Tells whether this assigner will produce no further splits; the exact condition is defined
     * by the concrete assigner.
     *
     * @return the concrete assigner's answer.
     */
    public abstract boolean noMoreSplits();
}

