/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkInputSplit;
import org.apache.iceberg.flink.source.FlinkSplitPlanner;
import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamingMonitorFunction
extends RichSourceFunction<FlinkInputSplit>
implements CheckpointedFunction {
    private static final Logger LOG = LoggerFactory.getLogger(StreamingMonitorFunction.class);
    private static final long INIT_LAST_SNAPSHOT_ID = -1L;
    private final TableLoader tableLoader;
    private final ScanContext scanContext;
    private volatile boolean isRunning = true;
    private volatile long lastSnapshotId = -1L;
    private transient SourceFunction.SourceContext<FlinkInputSplit> sourceContext;
    private transient Table table;
    private transient ListState<Long> lastSnapshotIdState;
    private transient ExecutorService workerPool;

    public StreamingMonitorFunction(TableLoader tableLoader, ScanContext scanContext) {
        Preconditions.checkArgument(scanContext.snapshotId() == null, "Cannot set snapshot-id option for streaming reader");
        Preconditions.checkArgument(scanContext.asOfTimestamp() == null, "Cannot set as-of-timestamp option for streaming reader");
        Preconditions.checkArgument(scanContext.endSnapshotId() == null, "Cannot set end-snapshot-id option for streaming reader");
        Preconditions.checkArgument(scanContext.endTag() == null, "Cannot set end-tag option for streaming reader");
        Preconditions.checkArgument(scanContext.maxPlanningSnapshotCount() > 0, "The max-planning-snapshot-count must be greater than zero");
        this.tableLoader = tableLoader;
        this.scanContext = scanContext;
    }

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        RuntimeContext runtimeContext = this.getRuntimeContext();
        ValidationException.check(runtimeContext instanceof StreamingRuntimeContext, "context should be instance of StreamingRuntimeContext", new Object[0]);
        String operatorID = ((StreamingRuntimeContext)runtimeContext).getOperatorUniqueID();
        this.workerPool = ThreadPools.newFixedThreadPool("iceberg-worker-pool-" + operatorID, this.scanContext.planParallelism());
    }

    public void initializeState(FunctionInitializationContext context) throws Exception {
        this.tableLoader.open();
        this.table = this.tableLoader.loadTable();
        this.lastSnapshotIdState = context.getOperatorStateStore().getListState(new ListStateDescriptor("snapshot-id-state", (TypeSerializer)LongSerializer.INSTANCE));
        if (context.isRestored()) {
            LOG.info("Restoring state for the {}.", (Object)((Object)((Object)this)).getClass().getSimpleName());
            this.lastSnapshotId = (Long)((Iterable)this.lastSnapshotIdState.get()).iterator().next();
        } else if (this.scanContext.startTag() != null || this.scanContext.startSnapshotId() != null) {
            long startSnapshotId;
            Preconditions.checkArgument(this.scanContext.startTag() == null || this.scanContext.startSnapshotId() == null, "START_SNAPSHOT_ID and START_TAG cannot both be set.");
            Preconditions.checkNotNull(this.table.currentSnapshot(), "Don't have any available snapshot in table.");
            if (this.scanContext.startTag() != null) {
                Preconditions.checkArgument(this.table.snapshot(this.scanContext.startTag()) != null, "Cannot find snapshot with tag %s in table.", (Object)this.scanContext.startTag());
                startSnapshotId = this.table.snapshot(this.scanContext.startTag()).snapshotId();
            } else {
                startSnapshotId = this.scanContext.startSnapshotId();
            }
            long currentSnapshotId = this.table.currentSnapshot().snapshotId();
            Preconditions.checkState(SnapshotUtil.isAncestorOf(this.table, currentSnapshotId, startSnapshotId), "The option start-snapshot-id %s is not an ancestor of the current snapshot.", startSnapshotId);
            this.lastSnapshotId = startSnapshotId;
        }
    }

    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        this.lastSnapshotIdState.clear();
        this.lastSnapshotIdState.add((Object)this.lastSnapshotId);
    }

    public void run(SourceFunction.SourceContext<FlinkInputSplit> ctx) throws Exception {
        this.sourceContext = ctx;
        while (this.isRunning) {
            this.monitorAndForwardSplits();
            Thread.sleep(this.scanContext.monitorInterval().toMillis());
        }
    }

    private long toSnapshotIdInclusive(long lastConsumedSnapshotId, long currentSnapshotId, int maxPlanningSnapshotCount) {
        List<Long> snapshotIds = SnapshotUtil.snapshotIdsBetween(this.table, lastConsumedSnapshotId, currentSnapshotId);
        if (snapshotIds.size() <= maxPlanningSnapshotCount) {
            return currentSnapshotId;
        }
        return snapshotIds.get(snapshotIds.size() - maxPlanningSnapshotCount);
    }

    @VisibleForTesting
    void sourceContext(SourceFunction.SourceContext<FlinkInputSplit> ctx) {
        this.sourceContext = ctx;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    void monitorAndForwardSplits() {
        Snapshot snapshot;
        this.table.refresh();
        Snapshot snapshot2 = snapshot = this.scanContext.branch() != null ? this.table.snapshot(this.scanContext.branch()) : this.table.currentSnapshot();
        if (snapshot != null && snapshot.snapshotId() != this.lastSnapshotId) {
            ScanContext newScanContext;
            long snapshotId = snapshot.snapshotId();
            if (this.lastSnapshotId == -1L) {
                newScanContext = this.scanContext.copyWithSnapshotId(snapshotId);
            } else {
                snapshotId = this.toSnapshotIdInclusive(this.lastSnapshotId, snapshotId, this.scanContext.maxPlanningSnapshotCount());
                newScanContext = this.scanContext.copyWithAppendsBetween(this.lastSnapshotId, snapshotId);
            }
            LOG.debug("Start discovering splits from {} (exclusive) to {} (inclusive)", (Object)this.lastSnapshotId, (Object)snapshotId);
            long start = System.currentTimeMillis();
            FlinkInputSplit[] splits = FlinkSplitPlanner.planInputSplits(this.table, newScanContext, this.workerPool);
            LOG.debug("Discovered {} splits, time elapsed {}ms", (Object)splits.length, (Object)(System.currentTimeMillis() - start));
            start = System.currentTimeMillis();
            Object object = this.sourceContext.getCheckpointLock();
            synchronized (object) {
                for (FlinkInputSplit split : splits) {
                    this.sourceContext.collect((Object)split);
                }
                this.lastSnapshotId = snapshotId;
            }
            LOG.debug("Forwarded {} splits, time elapsed {}ms", (Object)splits.length, (Object)(System.currentTimeMillis() - start));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cancel() {
        if (this.sourceContext != null) {
            Object object = this.sourceContext.getCheckpointLock();
            synchronized (object) {
                this.isRunning = false;
            }
        } else {
            this.isRunning = false;
        }
        if (this.tableLoader != null) {
            try {
                this.tableLoader.close();
            }
            catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    public void close() {
        this.cancel();
        if (this.workerPool != null) {
            this.workerPool.shutdown();
        }
    }
}

