/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source;

import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkInputSplit;
import org.apache.iceberg.flink.source.FlinkSplitGenerator;
import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.SnapshotUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamingMonitorFunction
extends RichSourceFunction<FlinkInputSplit>
implements CheckpointedFunction {
    private static final Logger LOG = LoggerFactory.getLogger(StreamingMonitorFunction.class);
    private static final long INIT_LAST_SNAPSHOT_ID = -1L;
    private final TableLoader tableLoader;
    private final ScanContext scanContext;
    private volatile boolean isRunning = true;
    private volatile long lastSnapshotId = -1L;
    private transient SourceFunction.SourceContext<FlinkInputSplit> sourceContext;
    private transient Table table;
    private transient ListState<Long> lastSnapshotIdState;

    public StreamingMonitorFunction(TableLoader tableLoader, ScanContext scanContext) {
        Preconditions.checkArgument(scanContext.snapshotId() == null, "Cannot set snapshot-id option for streaming reader");
        Preconditions.checkArgument(scanContext.asOfTimestamp() == null, "Cannot set as-of-timestamp option for streaming reader");
        Preconditions.checkArgument(scanContext.endSnapshotId() == null, "Cannot set end-snapshot-id option for streaming reader");
        this.tableLoader = tableLoader;
        this.scanContext = scanContext;
    }

    public void initializeState(FunctionInitializationContext context) throws Exception {
        this.tableLoader.open();
        this.table = this.tableLoader.loadTable();
        this.lastSnapshotIdState = context.getOperatorStateStore().getListState(new ListStateDescriptor("snapshot-id-state", (TypeSerializer)LongSerializer.INSTANCE));
        if (context.isRestored()) {
            LOG.info("Restoring state for the {}.", (Object)((Object)((Object)this)).getClass().getSimpleName());
            this.lastSnapshotId = (Long)((Iterable)this.lastSnapshotIdState.get()).iterator().next();
        } else if (this.scanContext.startSnapshotId() != null) {
            Preconditions.checkNotNull(this.table.currentSnapshot(), "Don't have any available snapshot in table.");
            long currentSnapshotId = this.table.currentSnapshot().snapshotId();
            Preconditions.checkState(SnapshotUtil.isAncestorOf(this.table, currentSnapshotId, this.scanContext.startSnapshotId()), "The option start-snapshot-id %s is not an ancestor of the current snapshot.", (Object)this.scanContext.startSnapshotId());
            this.lastSnapshotId = this.scanContext.startSnapshotId();
        }
    }

    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        this.lastSnapshotIdState.clear();
        this.lastSnapshotIdState.add((Object)this.lastSnapshotId);
    }

    public void run(SourceFunction.SourceContext<FlinkInputSplit> ctx) throws Exception {
        this.sourceContext = ctx;
        while (this.isRunning) {
            this.monitorAndForwardSplits();
            Thread.sleep(this.scanContext.monitorInterval().toMillis());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void monitorAndForwardSplits() {
        this.table.refresh();
        Snapshot snapshot = this.table.currentSnapshot();
        if (snapshot != null && snapshot.snapshotId() != this.lastSnapshotId) {
            long snapshotId = snapshot.snapshotId();
            ScanContext newScanContext = this.lastSnapshotId == -1L ? this.scanContext.copyWithSnapshotId(snapshotId) : this.scanContext.copyWithAppendsBetween(this.lastSnapshotId, snapshotId);
            FlinkInputSplit[] splits = FlinkSplitGenerator.createInputSplits(this.table, newScanContext);
            Object object = this.sourceContext.getCheckpointLock();
            synchronized (object) {
                for (FlinkInputSplit split : splits) {
                    this.sourceContext.collect((Object)split);
                }
                this.lastSnapshotId = snapshotId;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cancel() {
        if (this.sourceContext != null) {
            Object object = this.sourceContext.getCheckpointLock();
            synchronized (object) {
                this.isRunning = false;
            }
        } else {
            this.isRunning = false;
        }
        if (this.tableLoader != null) {
            try {
                this.tableLoader.close();
            }
            catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    public void close() {
        this.cancel();
    }
}

