/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source.operator;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.OptionalLong;
import java.util.TreeMap;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.NestedProjectedRowData;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSource;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
import org.apache.paimon.flink.source.SimpleSourceSplit;
import org.apache.paimon.flink.source.SplitListState;
import org.apache.paimon.flink.source.operator.ReadOperator;
import org.apache.paimon.flink.utils.JavaTypeInfo;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MonitorSource
extends AbstractNonCoordinatedSource<Split> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(MonitorSource.class);
    private final ReadBuilder readBuilder;
    private final long monitorInterval;
    private final boolean emitSnapshotWatermark;

    public MonitorSource(ReadBuilder readBuilder, long monitorInterval, boolean emitSnapshotWatermark) {
        this.readBuilder = readBuilder;
        this.monitorInterval = monitorInterval;
        this.emitSnapshotWatermark = emitSnapshotWatermark;
    }

    public Boundedness getBoundedness() {
        return Boundedness.CONTINUOUS_UNBOUNDED;
    }

    public SourceReader<Split, SimpleSourceSplit> createReader(SourceReaderContext sourceReaderContext) throws Exception {
        return new Reader();
    }

    public static DataStream<RowData> buildSource(StreamExecutionEnvironment env, String name, TypeInformation<RowData> typeInfo, ReadBuilder readBuilder, long monitorInterval, boolean emitSnapshotWatermark, boolean shuffleBucketWithPartition, BucketMode bucketMode, NestedProjectedRowData nestedProjectedRowData) {
        SingleOutputStreamOperator singleOutputStreamOperator = env.fromSource((Source)new MonitorSource(readBuilder, monitorInterval, emitSnapshotWatermark), WatermarkStrategy.noWatermarks(), name + "-Monitor", new JavaTypeInfo<Split>(Split.class)).forceNonParallel();
        DataStream<Split> sourceDataStream = bucketMode == BucketMode.BUCKET_UNAWARE ? MonitorSource.shuffleUnwareBucket((SingleOutputStreamOperator<Split>)singleOutputStreamOperator) : MonitorSource.shuffleNonUnwareBucket((SingleOutputStreamOperator<Split>)singleOutputStreamOperator, shuffleBucketWithPartition);
        return sourceDataStream.transform(name + "-Reader", typeInfo, (OneInputStreamOperator)new ReadOperator(readBuilder, nestedProjectedRowData));
    }

    private static DataStream<Split> shuffleUnwareBucket(SingleOutputStreamOperator<Split> singleOutputStreamOperator) {
        return singleOutputStreamOperator.rebalance();
    }

    private static DataStream<Split> shuffleNonUnwareBucket(SingleOutputStreamOperator<Split> singleOutputStreamOperator, boolean shuffleBucketWithPartition) {
        return singleOutputStreamOperator.partitionCustom((Partitioner & Serializable)(key, numPartitions) -> {
            if (shuffleBucketWithPartition) {
                return ChannelComputer.select((BinaryRow)key.f0, (Integer)key.f1, numPartitions);
            }
            return ChannelComputer.select((Integer)key.f1, numPartitions);
        }, (KeySelector & Serializable)split -> {
            DataSplit dataSplit = (DataSplit)split;
            return Tuple2.of((Object)dataSplit.partition(), (Object)dataSplit.bucket());
        });
    }

    private class Reader
    extends AbstractNonCoordinatedSourceReader<Split> {
        private static final String CHECKPOINT_STATE = "CS";
        private static final String NEXT_SNAPSHOT_STATE = "NSS";
        private final StreamTableScan scan;
        private final SplitListState<Long> checkpointState;
        private final SplitListState<Tuple2<Long, Long>> nextSnapshotState;
        private final TreeMap<Long, Long> nextSnapshotPerCheckpoint;

        private Reader() {
            this.scan = MonitorSource.this.readBuilder.newStreamScan();
            this.checkpointState = new SplitListState<Long>(CHECKPOINT_STATE, x -> Long.toString(x), Long::parseLong);
            this.nextSnapshotState = new SplitListState<Tuple2>(NEXT_SNAPSHOT_STATE, x -> x.f0 + ":" + x.f1, x -> Tuple2.of((Object)Long.parseLong(x.split(":")[0]), (Object)Long.parseLong(x.split(":")[1])));
            this.nextSnapshotPerCheckpoint = new TreeMap();
        }

        public void notifyCheckpointComplete(long checkpointId) {
            NavigableMap<Long, Long> nextSnapshots = this.nextSnapshotPerCheckpoint.headMap(checkpointId, true);
            OptionalLong max = nextSnapshots.values().stream().mapToLong(Long::longValue).max();
            max.ifPresent(this.scan::notifyCheckpointComplete);
            nextSnapshots.clear();
        }

        @Override
        public List<SimpleSourceSplit> snapshotState(long checkpointId) {
            this.checkpointState.clear();
            Long nextSnapshot = this.scan.checkpoint();
            if (nextSnapshot != null) {
                this.checkpointState.add(nextSnapshot);
                this.nextSnapshotPerCheckpoint.put(checkpointId, nextSnapshot);
            }
            ArrayList nextSnapshots = new ArrayList();
            this.nextSnapshotPerCheckpoint.forEach((k, v) -> nextSnapshots.add(new Tuple2(k, v)));
            this.nextSnapshotState.update(nextSnapshots);
            if (LOG.isDebugEnabled()) {
                LOG.debug("{} checkpoint {}.", (Object)this.getClass().getSimpleName(), (Object)nextSnapshot);
            }
            ArrayList<SimpleSourceSplit> results = new ArrayList<SimpleSourceSplit>();
            results.addAll(this.checkpointState.snapshotState());
            results.addAll(this.nextSnapshotState.snapshotState());
            return results;
        }

        @Override
        public void addSplits(List<SimpleSourceSplit> list) {
            LOG.info("Restoring state for the {}.", (Object)this.getClass().getSimpleName());
            this.checkpointState.restoreState(list);
            this.nextSnapshotState.restoreState(list);
            Object retrievedStates = this.checkpointState.get();
            Preconditions.checkArgument((retrievedStates.size() <= 1 ? 1 : 0) != 0, (Object)(this.getClass().getSimpleName() + " retrieved invalid state."));
            if (retrievedStates.size() == 1) {
                this.scan.restore((Long)retrievedStates.get(0));
            }
            Iterator iterator2 = this.nextSnapshotState.get().iterator();
            while (iterator2.hasNext()) {
                Tuple2 tuple2 = (Tuple2)iterator2.next();
                this.nextSnapshotPerCheckpoint.put((Long)tuple2.f0, (Long)tuple2.f1);
            }
        }

        public InputStatus pollNext(ReaderOutput<Split> readerOutput) throws Exception {
            boolean isEmpty;
            try {
                Long watermark;
                List<Split> splits = this.scan.plan().splits();
                isEmpty = splits.isEmpty();
                splits.forEach(arg_0 -> readerOutput.collect(arg_0));
                if (MonitorSource.this.emitSnapshotWatermark && (watermark = this.scan.watermark()) != null) {
                    readerOutput.emitWatermark(new Watermark(watermark.longValue()));
                }
            }
            catch (EndOfScanException esf) {
                LOG.info("Catching EndOfStreamException, the stream is finished.");
                return InputStatus.END_OF_INPUT;
            }
            if (isEmpty) {
                Thread.sleep(MonitorSource.this.monitorInterval);
            }
            return InputStatus.MORE_AVAILABLE;
        }
    }
}

