/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source.align;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.paimon.flink.source.ContinuousFileSplitEnumerator;
import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.flink.source.PendingSplitsCheckpoint;
import org.apache.paimon.flink.source.align.CheckpointEvent;
import org.apache.paimon.flink.source.align.PlaceholderSplit;
import org.apache.paimon.flink.source.assigners.AlignedSplitAssigner;
import org.apache.paimon.flink.source.assigners.SplitAssigner;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.SnapshotNotExistPlan;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AlignedContinuousFileSplitEnumerator
extends ContinuousFileSplitEnumerator {
    private static final Logger LOG = LoggerFactory.getLogger(AlignedContinuousFileSplitEnumerator.class);
    private static final String PLACEHOLDER_SPLIT = "placeholder";
    private static final int MAX_PENDING_PLAN = 10;
    private final ArrayBlockingQueue<ContinuousFileSplitEnumerator.PlanWithNextSnapshotId> pendingPlans = new ArrayBlockingQueue(10);
    private final AlignedSplitAssigner alignedAssigner = (AlignedSplitAssigner)this.splitAssigner;
    private final long alignTimeout;
    private final Object lock;
    private long currentCheckpointId;
    private Long lastConsumedSnapshotId;
    private boolean closed;

    public AlignedContinuousFileSplitEnumerator(SplitEnumeratorContext<FileStoreSourceSplit> context, Collection<FileStoreSourceSplit> remainSplits, @Nullable Long nextSnapshotId, long discoveryInterval, StreamTableScan scan, BucketMode bucketMode, long alignTimeout, int splitPerTaskMax, boolean shuffleBucketWithPartition) {
        super(context, remainSplits, nextSnapshotId, discoveryInterval, scan, bucketMode, splitPerTaskMax, shuffleBucketWithPartition);
        this.nextSnapshotId = nextSnapshotId;
        this.alignTimeout = alignTimeout;
        this.lock = new Object();
        this.currentCheckpointId = Long.MIN_VALUE;
        this.lastConsumedSnapshotId = null;
        this.closed = false;
    }

    @Override
    protected void addSplits(Collection<FileStoreSourceSplit> splits) {
        TreeMap<Long, List> splitsBySnapshot = new TreeMap<Long, List>();
        for (FileStoreSourceSplit split : splits) {
            long snapshotId = ((DataSplit)split.split()).snapshotId();
            splitsBySnapshot.computeIfAbsent(snapshotId, snapshot -> new ArrayList()).add(split);
        }
        for (List previousSplits : splitsBySnapshot.values()) {
            Map<Integer, List<FileStoreSourceSplit>> subtaskSplits = this.computeForBucket(previousSplits);
            subtaskSplits.forEach((subtask, taskSplits) -> taskSplits.forEach(split -> this.splitAssigner.addSplit((int)subtask, (FileStoreSourceSplit)split)));
        }
    }

    private Map<Integer, List<FileStoreSourceSplit>> computeForBucket(Collection<FileStoreSourceSplit> splits) {
        HashMap<Integer, List<FileStoreSourceSplit>> subtaskSplits = new HashMap<Integer, List<FileStoreSourceSplit>>();
        for (FileStoreSourceSplit split : splits) {
            subtaskSplits.computeIfAbsent(this.assignSuggestedTask(split), subtask -> new ArrayList()).add(split);
        }
        return subtaskSplits;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() throws IOException {
        this.closed = true;
        Object object = this.lock;
        synchronized (object) {
            this.lock.notifyAll();
        }
    }

    @Override
    public void addSplitsBack(List<FileStoreSourceSplit> splits, int subtaskId) {
        super.addSplitsBack(splits, subtaskId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public PendingSplitsCheckpoint snapshotState(long checkpointId) throws Exception {
        if (!this.alignedAssigner.isAligned() && !this.closed) {
            Object object = this.lock;
            synchronized (object) {
                if (this.pendingPlans.isEmpty()) {
                    this.lock.wait(this.alignTimeout);
                    Preconditions.checkArgument(!this.closed, "Enumerator has been closed.");
                    Preconditions.checkArgument(!this.pendingPlans.isEmpty(), "Timeout while waiting for snapshot from paimon source.");
                }
            }
            ContinuousFileSplitEnumerator.PlanWithNextSnapshotId pendingPlan = this.pendingPlans.poll();
            this.addSplits(this.splitGenerator.createSplits(Objects.requireNonNull(pendingPlan).plan()));
            this.nextSnapshotId = pendingPlan.nextSnapshotId();
            this.assignSplits();
        }
        Preconditions.checkArgument(this.alignedAssigner.isAligned());
        this.lastConsumedSnapshotId = this.alignedAssigner.getNextSnapshotId(0).orElse(null);
        this.alignedAssigner.removeFirst();
        this.currentCheckpointId = checkpointId;
        CheckpointEvent event = new CheckpointEvent(checkpointId);
        for (int i = 0; i < this.context.currentParallelism(); ++i) {
            this.context.sendEventToSourceReader(i, (SourceEvent)event);
        }
        return new PendingSplitsCheckpoint(this.alignedAssigner.remainingSplits(), this.nextSnapshotId);
    }

    public void notifyCheckpointAborted(long checkpointId) {
        if (this.currentCheckpointId == checkpointId) {
            throw new FlinkRuntimeException("Checkpoint failure is not allowed in aligned mode.");
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        this.currentCheckpointId = Long.MIN_VALUE;
        Long nextSnapshot = this.lastConsumedSnapshotId == null ? null : Long.valueOf(this.lastConsumedSnapshotId + 1L);
        this.scan.notifyCheckpointComplete(nextSnapshot);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected Optional<ContinuousFileSplitEnumerator.PlanWithNextSnapshotId> scanNextSnapshot() {
        ContinuousFileSplitEnumerator.PlanWithNextSnapshotId scannedPlan;
        Optional<ContinuousFileSplitEnumerator.PlanWithNextSnapshotId> scannedPlanOptional;
        if (this.pendingPlans.remainingCapacity() > 0 && (scannedPlanOptional = super.scanNextSnapshot()).isPresent() && !((scannedPlan = scannedPlanOptional.get()).plan() instanceof SnapshotNotExistPlan)) {
            Object object = this.lock;
            synchronized (object) {
                this.pendingPlans.add(scannedPlan);
                this.lock.notifyAll();
            }
        }
        return Optional.empty();
    }

    @Override
    protected void processDiscoveredSplits(Optional<ContinuousFileSplitEnumerator.PlanWithNextSnapshotId> ignore, Throwable error) {
        if (error != null) {
            if (error instanceof EndOfScanException) {
                LOG.debug("Catching EndOfStreamException, the stream is finished.");
                this.finished = true;
            } else {
                LOG.error("Failed to enumerate files", error);
                throw new RuntimeException(error);
            }
        }
        if (this.alignedAssigner.remainingSnapshots() >= 10) {
            this.assignSplits();
            return;
        }
        ContinuousFileSplitEnumerator.PlanWithNextSnapshotId nextPlan = this.pendingPlans.poll();
        if (nextPlan != null) {
            this.nextSnapshotId = nextPlan.nextSnapshotId();
            Objects.requireNonNull(this.nextSnapshotId);
            TableScan.Plan plan = nextPlan.plan();
            if (plan.splits().isEmpty()) {
                this.addSplits(Collections.singletonList(new FileStoreSourceSplit(PLACEHOLDER_SPLIT, new PlaceholderSplit(this.nextSnapshotId - 1L))));
            } else {
                this.addSplits(this.splitGenerator.createSplits(plan));
            }
        }
        this.assignSplits();
    }

    @Override
    protected boolean noMoreSplits() {
        return super.noMoreSplits() && this.alignedAssigner.remainingSnapshots() == 0 && this.pendingPlans.isEmpty();
    }

    @Override
    protected SplitAssigner createSplitAssigner(BucketMode bucketMode) {
        return new AlignedSplitAssigner();
    }
}

