/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSource;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
import org.apache.paimon.flink.source.SimpleSourceSplit;
import org.apache.paimon.flink.source.SplitListState;
import org.apache.paimon.utils.Preconditions;

/**
 * Bounded test source that replays a fixed list of elements and keeps its reader alive until
 * enough checkpoint-complete notifications have been observed.
 *
 * <p>Each reader emits {@code elements} in a first pass, then runs a second pass (which only
 * produces records when {@code emitOnce} is {@code false}), and finally reports
 * {@link InputStatus#END_OF_INPUT} once the number of completed checkpoints has grown by two
 * since the reader's first {@code pollNext} call. The number of finished passes is the only
 * value written to checkpoints.
 *
 * @param <T> type of the emitted elements
 */
public class FiniteTestSource<T> extends AbstractNonCoordinatedSource<T> {
    private static final long serialVersionUID = 1L;

    /** Elements emitted by every pass; stored by reference, not copied. */
    private final List<T> elements;

    /** When {@code true}, the second pass emits no records. */
    private final boolean emitOnce;

    /**
     * Creates the source.
     *
     * @param elements elements to emit, in iteration order
     * @param emitOnce {@code true} to emit the elements only during the first pass
     */
    public FiniteTestSource(List<T> elements, boolean emitOnce) {
        this.elements = elements;
        this.emitOnce = emitOnce;
    }

    /** Returns {@link Boundedness#BOUNDED}; the reader eventually signals end of input. */
    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }

    /**
     * Creates a reader over this source's elements.
     *
     * @param sourceReaderContext reader context; not used by this source
     * @return a new reader sharing this source's element list and {@code emitOnce} flag
     */
    public SourceReader<T, SimpleSourceSplit> createReader(SourceReaderContext sourceReaderContext) throws Exception {
        return new Reader<>(this.elements, this.emitOnce);
    }

    /**
     * Reader that performs the emission passes and tracks completed checkpoints.
     *
     * <p>NOTE(review): {@code pollNext} is {@code synchronized} while
     * {@code notifyCheckpointComplete} is not; presumably both are invoked from the same task
     * thread, confirm before relying on either for cross-thread visibility.
     */
    private static class Reader<T> extends AbstractNonCoordinatedSourceReader<T> {

        /** Number of emission passes after which the reader only waits for checkpoints. */
        private static final int MAX_EMIT_PASSES = 2;

        /** Checkpoint completions to wait for, counted from the first {@code pollNext} call. */
        private static final int CHECKPOINTS_TO_AWAIT = 2;

        private final List<T> elements;
        private final boolean emitOnce;

        /** Holds {@code numTimesEmitted} as a single entry when snapshotting and restoring. */
        private final SplitListState<Integer> checkpointedState =
                new SplitListState<>("emit-times", x -> Integer.toString(x), Integer::parseInt);

        /** Number of passes finished so far, including passes that emitted nothing. */
        private int numTimesEmitted = 0;

        /** Number of {@code notifyCheckpointComplete} calls received by this reader instance. */
        private int numCheckpointsComplete;

        /** Value {@code numCheckpointsComplete} must reach; set lazily on the first poll. */
        private Integer checkpointToAwait;

        private Reader(List<T> elements, boolean emitOnce) {
            this.elements = elements;
            this.emitOnce = emitOnce;
            this.numCheckpointsComplete = 0;
        }

        /**
         * Advances the reader by one step, depending on how many passes have finished.
         *
         * @param readerOutput sink for the emitted elements
         * @return {@link InputStatus#MORE_AVAILABLE} while passes or awaited checkpoints are
         *     outstanding, otherwise {@link InputStatus#END_OF_INPUT}
         */
        public synchronized InputStatus pollNext(ReaderOutput<T> readerOutput) {
            if (checkpointToAwait == null) {
                checkpointToAwait = numCheckpointsComplete + CHECKPOINTS_TO_AWAIT;
            }
            switch (numTimesEmitted) {
                case 0:
                    emitElements(readerOutput, false);
                    if (numCheckpointsComplete < checkpointToAwait) {
                        return InputStatus.MORE_AVAILABLE;
                    }
                    // Only reached when the awaited checkpoints are already complete right after
                    // the first pass: run the second pass now and require two more completions.
                    emitElements(readerOutput, true);
                    if (numCheckpointsComplete < checkpointToAwait + 2) {
                        return InputStatus.MORE_AVAILABLE;
                    }
                    break;
                case 1:
                    emitElements(readerOutput, true);
                    if (numCheckpointsComplete < checkpointToAwait) {
                        return InputStatus.MORE_AVAILABLE;
                    }
                    break;
                case 2:
                    // Both passes are done (possibly restored from a snapshot taken at that
                    // point); nothing left to emit, just wait for the awaited checkpoints.
                    if (numCheckpointsComplete < checkpointToAwait) {
                        return InputStatus.MORE_AVAILABLE;
                    }
                    break;
                default:
                    break;
            }
            return InputStatus.END_OF_INPUT;
        }

        /**
         * Restores {@code numTimesEmitted} from the given splits.
         *
         * @param splits splits carrying the snapshotted state
         * @throws IllegalArgumentException presumably (via {@code Preconditions.checkArgument})
         *     if the restored state does not hold exactly one entry, or that entry is outside
         *     {@code [0, 2]}
         */
        public void addSplits(List<SimpleSourceSplit> splits) {
            checkpointedState.restoreState(splits);
            List<Integer> retrievedStates = new ArrayList<>();
            for (Integer entry : checkpointedState.get()) {
                retrievedStates.add(entry);
            }
            Preconditions.checkArgument(
                    retrievedStates.size() == 1,
                    getClass().getSimpleName() + " retrieved invalid state.");

            // Validate before assigning so a rejected value never reaches the field; a negative
            // count would otherwise make pollNext end the input without emitting anything.
            int restored = retrievedStates.get(0);
            Preconditions.checkArgument(
                    restored >= 0 && restored <= MAX_EMIT_PASSES,
                    getClass().getSimpleName() + " retrieved invalid numTimesEmitted: " + restored);
            numTimesEmitted = restored;
        }

        /**
         * Snapshots the current pass count as the sole state entry.
         *
         * @param checkpointId id of the checkpoint being taken; not used
         * @return the splits produced by the backing {@code SplitListState}
         */
        public List<SimpleSourceSplit> snapshotState(long checkpointId) {
            checkpointedState.clear();
            checkpointedState.add(numTimesEmitted);
            return checkpointedState.snapshotState();
        }

        /**
         * Counts one more completed checkpoint.
         *
         * @param checkpointId id of the completed checkpoint; not used
         */
        public void notifyCheckpointComplete(long checkpointId) {
            ++numCheckpointsComplete;
        }

        /**
         * Runs one emission pass and records it in {@code numTimesEmitted}.
         *
         * <p>The pass is counted even when it emits nothing, which is the case for the second
         * pass when {@code emitOnce} is set.
         *
         * @param readerOutput sink for the emitted elements
         * @param isSecond whether this is the second pass
         */
        private void emitElements(ReaderOutput<T> readerOutput, boolean isSecond) {
            if (!isSecond || !emitOnce) {
                for (T element : elements) {
                    readerOutput.collect(element);
                }
            }
            ++numTimesEmitted;
        }
    }
}

