/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.paimon.flink.sink.cdc.TestCdcEvent;

public class TestCdcSourceFunction
extends RichParallelSourceFunction<TestCdcEvent>
implements CheckpointedFunction {
    private static final long serialVersionUID = 1L;
    private final LinkedList<TestCdcEvent> events;
    private volatile boolean isRunning = true;
    private transient int numRecordsPerCheckpoint;
    private transient AtomicInteger recordsThisCheckpoint;
    private transient ListState<Integer> remainingEventsCount;

    public TestCdcSourceFunction(Collection<TestCdcEvent> events) {
        this.events = new LinkedList<TestCdcEvent>(events);
    }

    public void initializeState(FunctionInitializationContext context) throws Exception {
        this.numRecordsPerCheckpoint = this.events.size() / ThreadLocalRandom.current().nextInt(10, 20) + 1;
        this.recordsThisCheckpoint = new AtomicInteger(0);
        this.remainingEventsCount = context.getOperatorStateStore().getListState(new ListStateDescriptor("count", Integer.class));
        if (context.isRestored()) {
            int count = 0;
            Iterator iterator = ((Iterable)this.remainingEventsCount.get()).iterator();
            while (iterator.hasNext()) {
                int c = (Integer)iterator.next();
                count += c;
            }
            while (this.events.size() > count) {
                this.events.poll();
            }
        }
    }

    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        this.recordsThisCheckpoint.set(0);
        this.remainingEventsCount.clear();
        this.remainingEventsCount.add((Object)this.events.size());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run(SourceFunction.SourceContext<TestCdcEvent> ctx) throws Exception {
        while (this.isRunning && !this.events.isEmpty()) {
            if (this.recordsThisCheckpoint.get() >= this.numRecordsPerCheckpoint) {
                Thread.sleep(10L);
                continue;
            }
            Object object = ctx.getCheckpointLock();
            synchronized (object) {
                TestCdcEvent event = this.events.poll();
                if (event.records() != null) {
                    int subtaskId = this.getRuntimeContext().getIndexOfThisSubtask();
                    int totalSubtasks = this.getRuntimeContext().getNumberOfParallelSubtasks();
                    if (Math.abs(event.hashCode()) % totalSubtasks != subtaskId) {
                        continue;
                    }
                }
                ctx.collect((Object)event);
                this.recordsThisCheckpoint.incrementAndGet();
            }
        }
    }

    public void cancel() {
        this.isRunning = false;
    }
}

