/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source.operator;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.OptionalLong;
import java.util.TreeMap;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
import org.apache.paimon.flink.source.operator.ReadOperator;
import org.apache.paimon.flink.utils.JavaTypeInfo;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MonitorFunction
extends RichSourceFunction<Split>
implements CheckpointedFunction,
CheckpointListener {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(MonitorFunction.class);
    private final ReadBuilder readBuilder;
    private final long monitorInterval;
    private final boolean emitSnapshotWatermark;
    private volatile boolean isRunning = true;
    private transient StreamTableScan scan;
    private transient SourceFunction.SourceContext<Split> ctx;
    private transient ListState<Long> checkpointState;
    private transient ListState<Tuple2<Long, Long>> nextSnapshotState;
    private transient TreeMap<Long, Long> nextSnapshotPerCheckpoint;

    public MonitorFunction(ReadBuilder readBuilder, long monitorInterval, boolean emitSnapshotWatermark) {
        this.readBuilder = readBuilder;
        this.monitorInterval = monitorInterval;
        this.emitSnapshotWatermark = emitSnapshotWatermark;
    }

    public void initializeState(FunctionInitializationContext context) throws Exception {
        this.scan = this.readBuilder.newStreamScan();
        this.checkpointState = context.getOperatorStateStore().getListState(new ListStateDescriptor("next-snapshot", (TypeSerializer)LongSerializer.INSTANCE));
        Class<Tuple2> typedTuple = Tuple2.class;
        this.nextSnapshotState = context.getOperatorStateStore().getListState(new ListStateDescriptor("next-snapshot-per-checkpoint", (TypeSerializer)new TupleSerializer(typedTuple, new TypeSerializer[]{LongSerializer.INSTANCE, LongSerializer.INSTANCE})));
        this.nextSnapshotPerCheckpoint = new TreeMap();
        if (context.isRestored()) {
            LOG.info("Restoring state for the {}.", (Object)((Object)((Object)this)).getClass().getSimpleName());
            ArrayList<Long> retrievedStates = new ArrayList<Long>();
            for (Long entry : (Iterable)this.checkpointState.get()) {
                retrievedStates.add(entry);
            }
            Preconditions.checkArgument((retrievedStates.size() <= 1 ? 1 : 0) != 0, (Object)(((Object)((Object)this)).getClass().getSimpleName() + " retrieved invalid state."));
            if (retrievedStates.size() == 1) {
                this.scan.restore((Long)retrievedStates.get(0));
            }
            for (Tuple2 tuple2 : (Iterable)this.nextSnapshotState.get()) {
                this.nextSnapshotPerCheckpoint.put((Long)tuple2.f0, (Long)tuple2.f1);
            }
        } else {
            LOG.info("No state to restore for the {}.", (Object)((Object)((Object)this)).getClass().getSimpleName());
        }
    }

    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        this.checkpointState.clear();
        Long nextSnapshot = this.scan.checkpoint();
        if (nextSnapshot != null) {
            this.checkpointState.add((Object)nextSnapshot);
            this.nextSnapshotPerCheckpoint.put(ctx.getCheckpointId(), nextSnapshot);
        }
        ArrayList nextSnapshots = new ArrayList();
        this.nextSnapshotPerCheckpoint.forEach((k, v) -> nextSnapshots.add(new Tuple2(k, v)));
        this.nextSnapshotState.update(nextSnapshots);
        if (LOG.isDebugEnabled()) {
            LOG.debug("{} checkpoint {}.", (Object)((Object)((Object)this)).getClass().getSimpleName(), (Object)nextSnapshot);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run(SourceFunction.SourceContext<Split> ctx) throws Exception {
        this.ctx = ctx;
        while (this.isRunning) {
            boolean isEmpty;
            Object object = ctx.getCheckpointLock();
            synchronized (object) {
                if (!this.isRunning) {
                    return;
                }
                try {
                    Long watermark;
                    List<Split> splits = this.scan.plan().splits();
                    isEmpty = splits.isEmpty();
                    splits.forEach(arg_0 -> ctx.collect(arg_0));
                    if (this.emitSnapshotWatermark && (watermark = this.scan.watermark()) != null) {
                        ctx.emitWatermark(new Watermark(watermark.longValue()));
                    }
                }
                catch (EndOfScanException esf) {
                    LOG.info("Catching EndOfStreamException, the stream is finished.");
                    return;
                }
            }
            if (!isEmpty) continue;
            Thread.sleep(this.monitorInterval);
        }
    }

    public void notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, Long> nextSnapshots = this.nextSnapshotPerCheckpoint.headMap(checkpointId, true);
        OptionalLong max = nextSnapshots.values().stream().mapToLong(Long::longValue).max();
        max.ifPresent(this.scan::notifyCheckpointComplete);
        nextSnapshots.clear();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cancel() {
        if (this.ctx != null) {
            Object object = this.ctx.getCheckpointLock();
            synchronized (object) {
                this.isRunning = false;
            }
        } else {
            this.isRunning = false;
        }
    }

    public static DataStream<RowData> buildSource(StreamExecutionEnvironment env, String name, TypeInformation<RowData> typeInfo, ReadBuilder readBuilder, long monitorInterval, boolean emitSnapshotWatermark) {
        return env.addSource((SourceFunction)new MonitorFunction(readBuilder, monitorInterval, emitSnapshotWatermark), name + "-Monitor", new JavaTypeInfo<Split>(Split.class)).forceNonParallel().partitionCustom((Partitioner & Serializable)(key, numPartitions) -> key % numPartitions, (KeySelector & Serializable)split -> ((DataSplit)split).bucket()).transform(name + "-Reader", typeInfo, (OneInputStreamOperator)new ReadOperator(readBuilder));
    }
}

