/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public final class BoundedTestSource<T>
implements SourceFunction<T>,
CheckpointListener {
    private final List<List<T>> elementsPerCheckpoint;
    private final boolean checkpointEnabled;
    private volatile boolean running = true;
    private final AtomicInteger numCheckpointsComplete = new AtomicInteger(0);

    public BoundedTestSource(List<List<T>> elementsPerCheckpoint, boolean checkpointEnabled) {
        this.elementsPerCheckpoint = elementsPerCheckpoint;
        this.checkpointEnabled = checkpointEnabled;
    }

    public BoundedTestSource(List<List<T>> elementsPerCheckpoint) {
        this(elementsPerCheckpoint, true);
    }

    public BoundedTestSource(T ... elements) {
        this(Collections.singletonList(Arrays.asList(elements)));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run(SourceFunction.SourceContext<T> ctx) throws Exception {
        if (!this.checkpointEnabled) {
            Preconditions.checkArgument((this.elementsPerCheckpoint.size() <= 1 ? 1 : 0) != 0, (Object)"There should be at most one list in the elementsPerCheckpoint when checkpoint is disabled.");
            this.elementsPerCheckpoint.stream().flatMap(Collection::stream).forEach(arg_0 -> ctx.collect(arg_0));
            return;
        }
        for (List<T> elements : this.elementsPerCheckpoint) {
            int checkpointToAwait;
            Object object = ctx.getCheckpointLock();
            synchronized (object) {
                checkpointToAwait = this.numCheckpointsComplete.get() + 2;
                for (T element : elements) {
                    ctx.collect(element);
                }
            }
            object = ctx.getCheckpointLock();
            synchronized (object) {
                while (this.running && this.numCheckpointsComplete.get() < checkpointToAwait) {
                    ctx.getCheckpointLock().wait(1L);
                }
            }
        }
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        this.numCheckpointsComplete.incrementAndGet();
    }

    public void cancel() {
        this.running = false;
    }
}

