/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.table.source;

import java.util.Collections;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.consumer.Consumer;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.AbstractInnerTableScan;
import org.apache.paimon.table.source.DataFilePlan;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.InnerStreamTableScan;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.table.source.snapshot.BoundedChecker;
import org.apache.paimon.table.source.snapshot.CompactionChangelogFollowUpScanner;
import org.apache.paimon.table.source.snapshot.ContinuousCompactorFollowUpScanner;
import org.apache.paimon.table.source.snapshot.DeltaFollowUpScanner;
import org.apache.paimon.table.source.snapshot.FollowUpScanner;
import org.apache.paimon.table.source.snapshot.InputChangelogFollowUpScanner;
import org.apache.paimon.table.source.snapshot.SnapshotSplitReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InnerStreamTableScanImpl
extends AbstractInnerTableScan
implements InnerStreamTableScan {
    private static final Logger LOG = LoggerFactory.getLogger(InnerStreamTableScanImpl.class);
    private final CoreOptions options;
    private final SnapshotManager snapshotManager;
    private final boolean supportStreamingReadOverwrite;
    private StartingScanner startingScanner;
    private FollowUpScanner followUpScanner;
    private BoundedChecker boundedChecker;
    private boolean isFullPhaseEnd = false;
    @Nullable
    private Long nextSnapshotId;

    public InnerStreamTableScanImpl(CoreOptions options, SnapshotSplitReader snapshotSplitReader, SnapshotManager snapshotManager, boolean supportStreamingReadOverwrite) {
        super(options, snapshotSplitReader);
        this.options = options;
        this.snapshotManager = snapshotManager;
        this.supportStreamingReadOverwrite = supportStreamingReadOverwrite;
    }

    @Override
    public InnerStreamTableScanImpl withFilter(Predicate predicate) {
        this.snapshotSplitReader.withFilter(predicate);
        return this;
    }

    @Override
    public TableScan.Plan plan() {
        if (this.startingScanner == null) {
            this.startingScanner = this.createStartingScanner(true);
        }
        if (this.followUpScanner == null) {
            this.followUpScanner = this.createFollowUpScanner();
        }
        if (this.boundedChecker == null) {
            this.boundedChecker = this.createBoundedChecker();
        }
        if (this.nextSnapshotId == null) {
            return this.tryFirstPlan();
        }
        return this.nextPlan();
    }

    private TableScan.Plan tryFirstPlan() {
        StartingScanner.Result result = this.startingScanner.scan(this.snapshotManager, this.snapshotSplitReader);
        if (result instanceof StartingScanner.ScannedResult) {
            long currentSnapshotId = ((StartingScanner.ScannedResult)result).currentSnapshotId();
            this.nextSnapshotId = currentSnapshotId + 1L;
            this.isFullPhaseEnd = this.boundedChecker.shouldEndInput(this.snapshotManager.snapshot(currentSnapshotId));
        } else if (result instanceof StartingScanner.NextSnapshot) {
            this.nextSnapshotId = ((StartingScanner.NextSnapshot)result).nextSnapshotId();
            this.isFullPhaseEnd = this.snapshotManager.snapshotExists(this.nextSnapshotId - 1L) && this.boundedChecker.shouldEndInput(this.snapshotManager.snapshot(this.nextSnapshotId - 1L));
        }
        return DataFilePlan.fromResult(result);
    }

    private TableScan.Plan nextPlan() {
        while (true) {
            Long l;
            if (this.isFullPhaseEnd) {
                throw new EndOfScanException();
            }
            if (!this.snapshotManager.snapshotExists(this.nextSnapshotId)) {
                LOG.debug("Next snapshot id {} does not exist, wait for the snapshot generation.", (Object)this.nextSnapshotId);
                return new DataFilePlan(Collections.emptyList());
            }
            Snapshot snapshot = this.snapshotManager.snapshot(this.nextSnapshotId);
            if (this.boundedChecker.shouldEndInput(snapshot)) {
                throw new EndOfScanException();
            }
            if (snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE && this.supportStreamingReadOverwrite) {
                LOG.debug("Find overwrite snapshot id {}.", (Object)this.nextSnapshotId);
                TableScan.Plan overwritePlan = this.followUpScanner.getOverwriteChangesPlan(this.nextSnapshotId, this.snapshotSplitReader);
                l = this.nextSnapshotId;
                Long l2 = this.nextSnapshotId = Long.valueOf(this.nextSnapshotId + 1L);
                return overwritePlan;
            }
            if (this.followUpScanner.shouldScanSnapshot(snapshot)) {
                LOG.debug("Find snapshot id {}.", (Object)this.nextSnapshotId);
                TableScan.Plan plan = this.followUpScanner.scan(this.nextSnapshotId, this.snapshotSplitReader);
                l = this.nextSnapshotId;
                Long l3 = this.nextSnapshotId = Long.valueOf(this.nextSnapshotId + 1L);
                return plan;
            }
            Long l4 = this.nextSnapshotId;
            l = this.nextSnapshotId = Long.valueOf(this.nextSnapshotId + 1L);
        }
    }

    private FollowUpScanner createFollowUpScanner() {
        FollowUpScanner followUpScanner;
        if (this.options.toConfiguration().get(CoreOptions.STREAMING_COMPACT).booleanValue()) {
            return new ContinuousCompactorFollowUpScanner();
        }
        CoreOptions.ChangelogProducer changelogProducer = this.options.changelogProducer();
        switch (changelogProducer) {
            case NONE: {
                followUpScanner = new DeltaFollowUpScanner();
                break;
            }
            case INPUT: {
                followUpScanner = new InputChangelogFollowUpScanner();
                break;
            }
            case FULL_COMPACTION: {
                this.snapshotSplitReader.withLevelFilter(level -> level == this.options.numLevels() - 1);
                followUpScanner = new CompactionChangelogFollowUpScanner();
                break;
            }
            case LOOKUP: {
                this.snapshotSplitReader.withLevelFilter(level -> level > 0);
                followUpScanner = new CompactionChangelogFollowUpScanner();
                break;
            }
            default: {
                throw new UnsupportedOperationException("Unknown changelog producer " + changelogProducer.name());
            }
        }
        return followUpScanner;
    }

    private BoundedChecker createBoundedChecker() {
        Long boundedWatermark = this.options.scanBoundedWatermark();
        return boundedWatermark != null ? BoundedChecker.watermark(boundedWatermark) : BoundedChecker.neverEnd();
    }

    @Override
    @Nullable
    public Long checkpoint() {
        return this.nextSnapshotId;
    }

    @Override
    public void restore(@Nullable Long nextSnapshotId) {
        this.nextSnapshotId = nextSnapshotId;
    }

    @Override
    public void notifyCheckpointComplete(@Nullable Long nextSnapshot) {
        if (nextSnapshot == null) {
            return;
        }
        String consumerId = this.options.consumerId();
        if (consumerId != null) {
            this.snapshotSplitReader.consumerManager().recordConsumer(consumerId, new Consumer(nextSnapshot));
        }
    }
}

