/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.table.source;

import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.consumer.Consumer;
import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.AbstractInnerTableScan;
import org.apache.paimon.table.source.DataFilePlan;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.InnerStreamTableScan;
import org.apache.paimon.table.source.OutOfRangeException;
import org.apache.paimon.table.source.SnapshotNotExistPlan;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.table.source.snapshot.BoundedChecker;
import org.apache.paimon.table.source.snapshot.CompactionChangelogFollowUpScanner;
import org.apache.paimon.table.source.snapshot.ContinuousAppendAndCompactFollowUpScanner;
import org.apache.paimon.table.source.snapshot.ContinuousCompactorFollowUpScanner;
import org.apache.paimon.table.source.snapshot.DeltaFollowUpScanner;
import org.apache.paimon.table.source.snapshot.FollowUpScanner;
import org.apache.paimon.table.source.snapshot.InputChangelogFollowUpScanner;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Streaming implementation of {@link InnerStreamTableScan}.
 *
 * <p>The first successful call to {@link #plan()} delegates to a {@link StartingScanner} to
 * determine the starting position; every later call walks forward snapshot by snapshot using a
 * {@link FollowUpScanner}. The id of the next snapshot to read is the only state exposed through
 * {@link #checkpoint()} / {@link #restore(Long)}.
 *
 * <p>NOTE(review): nothing in this class synchronizes access to its mutable fields, so it appears
 * to expect single-threaded use - confirm against callers.
 */
public class InnerStreamTableScanImpl extends AbstractInnerTableScan
        implements InnerStreamTableScan {
    private static final Logger LOG = LoggerFactory.getLogger(InnerStreamTableScanImpl.class);

    private final CoreOptions options;
    private final SnapshotManager snapshotManager;
    private final boolean supportStreamingReadOverwrite;
    private final DefaultValueAssigner defaultValueAssigner;

    // The three collaborators below are created lazily on the first call to plan().
    private StartingScanner startingScanner;
    private FollowUpScanner followUpScanner;
    private BoundedChecker boundedChecker;

    /**
     * Set by {@link #tryFirstPlan()} when the bounded checker reports that input should end at the
     * starting snapshot; once true, {@link #nextPlan()} throws {@link EndOfScanException}.
     */
    private boolean isFullPhaseEnd = false;

    /** Watermark taken from the most recently produced plan; {@code null} until one is produced. */
    @Nullable
    private Long currentWatermark;

    /** Id of the next snapshot to scan; {@code null} until the starting position is known. */
    @Nullable
    private Long nextSnapshotId;

    /**
     * Creates a streaming scan.
     *
     * @param options table options, also passed to the superclass
     * @param snapshotReader reader used to produce plans, also passed to the superclass
     * @param snapshotManager used to look up snapshots and check whether they exist
     * @param supportStreamingReadOverwrite whether OVERWRITE snapshots are read through the
     *     follow-up scanner's overwrite-changes plan
     * @param defaultValueAssigner pre-processes predicates before they reach the snapshot reader
     */
    public InnerStreamTableScanImpl(
            CoreOptions options,
            SnapshotReader snapshotReader,
            SnapshotManager snapshotManager,
            boolean supportStreamingReadOverwrite,
            DefaultValueAssigner defaultValueAssigner) {
        super(options, snapshotReader);
        this.options = options;
        this.snapshotManager = snapshotManager;
        this.supportStreamingReadOverwrite = supportStreamingReadOverwrite;
        this.defaultValueAssigner = defaultValueAssigner;
    }

    /**
     * Installs a filter on the underlying snapshot reader after passing it through the default
     * value assigner.
     *
     * @param predicate the filter requested by the caller
     * @return this scan, for chaining
     */
    @Override
    public InnerStreamTableScanImpl withFilter(Predicate predicate) {
        this.snapshotReader.withFilter(this.defaultValueAssigner.handlePredicate(predicate));
        return this;
    }

    /**
     * Produces the next plan.
     *
     * <p>Lazily creates the starting scanner, follow-up scanner and bounded checker, then either
     * attempts the initial scan (while no next snapshot id is known) or continues from the known
     * next snapshot id.
     *
     * @return the plan for this step; {@link SnapshotNotExistPlan#INSTANCE} when there is nothing
     *     to read yet
     */
    @Override
    public TableScan.Plan plan() {
        if (this.startingScanner == null) {
            // createStartingScanner is not declared in this class; it comes from the superclass.
            this.startingScanner = this.createStartingScanner(true);
        }
        if (this.followUpScanner == null) {
            this.followUpScanner = this.createFollowUpScanner();
        }
        if (this.boundedChecker == null) {
            this.boundedChecker = this.createBoundedChecker();
        }

        if (this.nextSnapshotId == null) {
            return this.tryFirstPlan();
        }
        return this.nextPlan();
    }

    /**
     * Runs the starting scanner and records where follow-up scanning must begin.
     *
     * @return the data-file plan built from a scanned result, or
     *     {@link SnapshotNotExistPlan#INSTANCE} for any other kind of result
     */
    private TableScan.Plan tryFirstPlan() {
        StartingScanner.Result result =
                this.startingScanner.scan(this.snapshotManager, this.snapshotReader);

        if (result instanceof StartingScanner.ScannedResult) {
            StartingScanner.ScannedResult scannedResult = (StartingScanner.ScannedResult) result;
            this.currentWatermark = scannedResult.currentWatermark();
            long currentSnapshotId = scannedResult.currentSnapshotId();
            this.nextSnapshotId = currentSnapshotId + 1L;
            this.isFullPhaseEnd =
                    this.boundedChecker.shouldEndInput(
                            this.snapshotManager.snapshot(currentSnapshotId));
            return DataFilePlan.fromResult(result);
        }

        if (result instanceof StartingScanner.NextSnapshot) {
            this.nextSnapshotId = ((StartingScanner.NextSnapshot) result).nextSnapshotId();
            // Nothing was scanned, so the end-of-input check is made against the snapshot just
            // before the starting position, and only when that snapshot still exists.
            long previousSnapshotId = this.nextSnapshotId - 1L;
            this.isFullPhaseEnd =
                    this.snapshotManager.snapshotExists(previousSnapshotId)
                            && this.boundedChecker.shouldEndInput(
                                    this.snapshotManager.snapshot(previousSnapshotId));
        }
        return SnapshotNotExistPlan.INSTANCE;
    }

    /**
     * Advances from {@code nextSnapshotId} until a snapshot yields a plan, skipping snapshots the
     * follow-up scanner declines to scan.
     *
     * @return the plan for the first scannable snapshot, or {@link SnapshotNotExistPlan#INSTANCE}
     *     when the next snapshot has not been generated yet
     * @throws EndOfScanException if the initial phase already reached the end of input, or the
     *     bounded checker reports that the next snapshot ends the input
     * @throws OutOfRangeException if the next snapshot is missing and the earliest available
     *     snapshot id is greater than it
     */
    private TableScan.Plan nextPlan() {
        while (true) {
            if (this.isFullPhaseEnd) {
                throw new EndOfScanException();
            }

            if (!this.snapshotManager.snapshotExists(this.nextSnapshotId)) {
                Long earliestSnapshotId = this.snapshotManager.earliestSnapshotId();
                if (earliestSnapshotId != null && earliestSnapshotId > this.nextSnapshotId) {
                    throw new OutOfRangeException(String.format("The snapshot with id %d has expired. You can: 1. increase the snapshot expiration time. 2. use consumer-id to ensure that unconsumed snapshots will not be expired.", this.nextSnapshotId));
                }
                LOG.debug(
                        "Next snapshot id {} does not exist, wait for the snapshot generation.",
                        this.nextSnapshotId);
                return SnapshotNotExistPlan.INSTANCE;
            }

            Snapshot snapshot = this.snapshotManager.snapshot(this.nextSnapshotId);
            if (this.boundedChecker.shouldEndInput(snapshot)) {
                throw new EndOfScanException();
            }

            if (snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE
                    && this.supportStreamingReadOverwrite) {
                LOG.debug("Find overwrite snapshot id {}.", this.nextSnapshotId);
                SnapshotReader.Plan overwritePlan =
                        this.followUpScanner.getOverwriteChangesPlan(
                                this.nextSnapshotId, this.snapshotReader);
                this.currentWatermark = overwritePlan.watermark();
                this.nextSnapshotId = this.nextSnapshotId + 1L;
                return overwritePlan;
            }

            if (this.followUpScanner.shouldScanSnapshot(snapshot)) {
                LOG.debug("Find snapshot id {}.", this.nextSnapshotId);
                SnapshotReader.Plan plan =
                        this.followUpScanner.scan(this.nextSnapshotId, this.snapshotReader);
                this.currentWatermark = plan.watermark();
                this.nextSnapshotId = this.nextSnapshotId + 1L;
                return plan;
            }

            // Snapshot is not relevant for this scanner: skip it and examine the following one.
            this.nextSnapshotId = this.nextSnapshotId + 1L;
        }
    }

    /**
     * Chooses the follow-up scanner from the table options.
     *
     * <p>The streaming-compaction type is checked first; when it selects nothing, the changelog
     * producer decides. For the FULL_COMPACTION and LOOKUP producers a level filter is also
     * installed on the snapshot reader.
     *
     * @return the scanner used for every snapshot after the starting one
     * @throws UnsupportedOperationException if the changelog producer is not one handled here
     */
    private FollowUpScanner createFollowUpScanner() {
        CoreOptions.StreamingCompactionType type =
                this.options.toConfiguration().get(CoreOptions.STREAMING_COMPACT);
        switch (type) {
            case NORMAL:
                return new ContinuousCompactorFollowUpScanner();
            case BUCKET_UNAWARE:
                return new ContinuousAppendAndCompactFollowUpScanner();
            default:
                // Any other type: fall through to the changelog-producer selection below.
                break;
        }

        CoreOptions.ChangelogProducer changelogProducer = this.options.changelogProducer();
        switch (changelogProducer) {
            case NONE:
                return new DeltaFollowUpScanner();
            case INPUT:
                return new InputChangelogFollowUpScanner();
            case FULL_COMPACTION:
                // Restrict reads to the highest level (numLevels - 1).
                this.snapshotReader.withLevelFilter(level -> level == this.options.numLevels() - 1);
                return new CompactionChangelogFollowUpScanner();
            case LOOKUP:
                // Restrict reads to every level above level 0.
                this.snapshotReader.withLevelFilter(level -> level > 0);
                return new CompactionChangelogFollowUpScanner();
            default:
                throw new UnsupportedOperationException("Unknown changelog producer " + changelogProducer.name());
        }
    }

    /**
     * Builds the end-of-input checker: watermark-bounded when a bounded watermark is configured,
     * otherwise one obtained from {@link BoundedChecker#neverEnd()}.
     */
    private BoundedChecker createBoundedChecker() {
        Long boundedWatermark = this.options.scanBoundedWatermark();
        if (boundedWatermark == null) {
            return BoundedChecker.neverEnd();
        }
        return BoundedChecker.watermark(boundedWatermark);
    }

    /**
     * @return the id of the next snapshot to scan, or {@code null} if the starting position has
     *     not been determined yet
     */
    @Override
    @Nullable
    public Long checkpoint() {
        return this.nextSnapshotId;
    }

    /**
     * @return the watermark of the most recently produced plan, or {@code null} if none is known
     */
    @Override
    @Nullable
    public Long watermark() {
        return this.currentWatermark;
    }

    /**
     * Restores the scan position.
     *
     * @param nextSnapshotId the id of the next snapshot to scan; {@code null} makes the next
     *     {@link #plan()} call run the starting scanner
     */
    @Override
    public void restore(@Nullable Long nextSnapshotId) {
        this.nextSnapshotId = nextSnapshotId;
    }

    /**
     * Resets the configured consumer to the given snapshot id. Does nothing when the snapshot id
     * is {@code null} or no consumer id is configured.
     *
     * @param nextSnapshot the next snapshot id associated with the completed checkpoint
     */
    @Override
    public void notifyCheckpointComplete(@Nullable Long nextSnapshot) {
        if (nextSnapshot == null) {
            return;
        }
        String consumerId = this.options.consumerId();
        if (consumerId != null) {
            this.snapshotReader
                    .consumerManager()
                    .resetConsumer(consumerId, new Consumer(nextSnapshot));
        }
    }
}

