/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.table.source;

import java.util.List;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.consumer.Consumer;
import org.apache.paimon.lookup.LookupStrategy;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.AbstractDataTableScan;
import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.SnapshotNotExistPlan;
import org.apache.paimon.table.source.StreamDataTableScan;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.table.source.snapshot.AllDeltaFollowUpScanner;
import org.apache.paimon.table.source.snapshot.BoundedChecker;
import org.apache.paimon.table.source.snapshot.CompactionChangelogFollowUpScanner;
import org.apache.paimon.table.source.snapshot.DeltaFollowUpScanner;
import org.apache.paimon.table.source.snapshot.FollowUpScanner;
import org.apache.paimon.table.source.snapshot.InputChangelogFollowUpScanner;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingContext;
import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.table.source.snapshot.StaticFromSnapshotStartingScanner;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.NextSnapshotFetcher;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataTableStreamScan
extends AbstractDataTableScan
implements StreamDataTableScan {
    private static final Logger LOG = LoggerFactory.getLogger(DataTableStreamScan.class);
    private final CoreOptions options;
    private final SnapshotManager snapshotManager;
    private final boolean supportStreamingReadOverwrite;
    private final DefaultValueAssigner defaultValueAssigner;
    private final NextSnapshotFetcher nextSnapshotProvider;
    private boolean initialized = false;
    private StartingScanner startingScanner;
    private FollowUpScanner followUpScanner;
    private BoundedChecker boundedChecker;
    private boolean isFullPhaseEnd = false;
    @Nullable
    private Long currentWatermark;
    @Nullable
    private Long nextSnapshotId;
    @Nullable
    private Long scanDelayMillis;

    public DataTableStreamScan(CoreOptions options, SnapshotReader snapshotReader, SnapshotManager snapshotManager, boolean supportStreamingReadOverwrite, DefaultValueAssigner defaultValueAssigner) {
        super(options, snapshotReader);
        this.options = options;
        this.snapshotManager = snapshotManager;
        this.supportStreamingReadOverwrite = supportStreamingReadOverwrite;
        this.defaultValueAssigner = defaultValueAssigner;
        this.nextSnapshotProvider = new NextSnapshotFetcher(snapshotManager, options.changelogLifecycleDecoupled());
    }

    @Override
    public DataTableStreamScan withFilter(Predicate predicate) {
        this.snapshotReader.withFilter(this.defaultValueAssigner.handlePredicate(predicate));
        return this;
    }

    @Override
    public StartingContext startingContext() {
        if (!this.initialized) {
            this.initScanner();
        }
        return this.startingScanner.startingContext();
    }

    @Override
    public TableScan.Plan plan() {
        if (!this.initialized) {
            this.initScanner();
        }
        if (this.nextSnapshotId == null) {
            return this.tryFirstPlan();
        }
        return this.nextPlan();
    }

    @Override
    public List<PartitionEntry> listPartitionEntries() {
        throw new UnsupportedOperationException("List Partition Entries is not supported in Stream Scan.");
    }

    private void initScanner() {
        if (this.startingScanner == null) {
            this.startingScanner = this.createStartingScanner(true);
        }
        if (this.followUpScanner == null) {
            this.followUpScanner = this.createFollowUpScanner();
        }
        if (this.boundedChecker == null) {
            this.boundedChecker = this.createBoundedChecker();
        }
        if (this.scanDelayMillis == null) {
            this.scanDelayMillis = this.getScanDelayMillis();
        }
        this.initialized = true;
    }

    private TableScan.Plan tryFirstPlan() {
        StartingScanner.Result result;
        if (this.options.needLookup()) {
            result = this.startingScanner.scan(this.snapshotReader.withLevelFilter((Integer level) -> level > 0));
            this.snapshotReader.withLevelFilter(Filter.alwaysTrue());
        } else if (this.options.changelogProducer().equals(CoreOptions.ChangelogProducer.FULL_COMPACTION)) {
            result = this.startingScanner.scan(this.snapshotReader.withLevelFilter((Integer level) -> level == this.options.numLevels() - 1));
            this.snapshotReader.withLevelFilter(Filter.alwaysTrue());
        } else {
            result = this.startingScanner.scan(this.snapshotReader);
        }
        if (result instanceof StartingScanner.ScannedResult) {
            StartingScanner.ScannedResult scannedResult = (StartingScanner.ScannedResult)result;
            this.currentWatermark = scannedResult.currentWatermark();
            long currentSnapshotId = scannedResult.currentSnapshotId();
            LookupStrategy lookupStrategy = this.options.lookupStrategy();
            this.nextSnapshotId = !lookupStrategy.produceChangelog && lookupStrategy.deletionVector ? Long.valueOf(currentSnapshotId) : Long.valueOf(currentSnapshotId + 1L);
            this.isFullPhaseEnd = this.boundedChecker.shouldEndInput(this.snapshotManager.snapshot(currentSnapshotId));
            return scannedResult.plan();
        }
        if (result instanceof StartingScanner.NextSnapshot) {
            this.nextSnapshotId = ((StartingScanner.NextSnapshot)result).nextSnapshotId();
            this.isFullPhaseEnd = this.snapshotManager.snapshotExists(this.nextSnapshotId - 1L) && this.boundedChecker.shouldEndInput(this.snapshotManager.snapshot(this.nextSnapshotId - 1L));
        }
        return SnapshotNotExistPlan.INSTANCE;
    }

    private TableScan.Plan nextPlan() {
        while (true) {
            Long l;
            SnapshotReader.Plan overwritePlan;
            if (this.isFullPhaseEnd) {
                throw new EndOfScanException();
            }
            Snapshot snapshot = this.nextSnapshotProvider.getNextSnapshot(this.nextSnapshotId);
            if (snapshot == null) {
                return SnapshotNotExistPlan.INSTANCE;
            }
            if (this.boundedChecker.shouldEndInput(snapshot)) {
                throw new EndOfScanException();
            }
            if (this.shouldDelaySnapshot(this.nextSnapshotId)) {
                return SnapshotNotExistPlan.INSTANCE;
            }
            if (snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE && (overwritePlan = this.handleOverwriteSnapshot(snapshot)) != null) {
                l = this.nextSnapshotId;
                Long l2 = this.nextSnapshotId = Long.valueOf(this.nextSnapshotId + 1L);
                return overwritePlan;
            }
            if (this.followUpScanner.shouldScanSnapshot(snapshot)) {
                LOG.debug("Find snapshot id {}.", (Object)this.nextSnapshotId);
                SnapshotReader.Plan plan = this.followUpScanner.scan(snapshot, this.snapshotReader);
                this.currentWatermark = plan.watermark();
                l = this.nextSnapshotId;
                Long l3 = this.nextSnapshotId = Long.valueOf(this.nextSnapshotId + 1L);
                return plan;
            }
            Long l4 = this.nextSnapshotId;
            l = this.nextSnapshotId = Long.valueOf(this.nextSnapshotId + 1L);
        }
    }

    private boolean shouldDelaySnapshot(long snapshotId) {
        if (this.scanDelayMillis == null) {
            return false;
        }
        long snapshotMills = System.currentTimeMillis() - this.scanDelayMillis;
        return this.snapshotManager.snapshotExists(snapshotId) && this.snapshotManager.snapshot(snapshotId).timeMillis() > snapshotMills;
    }

    @Nullable
    protected SnapshotReader.Plan handleOverwriteSnapshot(Snapshot snapshot) {
        if (this.supportStreamingReadOverwrite) {
            LOG.debug("Find overwrite snapshot id {}.", (Object)this.nextSnapshotId);
            SnapshotReader.Plan overwritePlan = this.followUpScanner.getOverwriteChangesPlan(snapshot, this.snapshotReader);
            this.currentWatermark = overwritePlan.watermark();
            return overwritePlan;
        }
        return null;
    }

    protected FollowUpScanner createFollowUpScanner() {
        FollowUpScanner followUpScanner;
        CoreOptions.StreamScanMode type = this.options.toConfiguration().get(CoreOptions.STREAM_SCAN_MODE);
        switch (type) {
            case COMPACT_BUCKET_TABLE: {
                return new DeltaFollowUpScanner();
            }
            case FILE_MONITOR: {
                return new AllDeltaFollowUpScanner();
            }
        }
        CoreOptions.ChangelogProducer changelogProducer = this.options.changelogProducer();
        switch (changelogProducer) {
            case NONE: {
                followUpScanner = new DeltaFollowUpScanner();
                break;
            }
            case INPUT: {
                followUpScanner = new InputChangelogFollowUpScanner();
                break;
            }
            case FULL_COMPACTION: 
            case LOOKUP: {
                followUpScanner = new CompactionChangelogFollowUpScanner();
                break;
            }
            default: {
                throw new UnsupportedOperationException("Unknown changelog producer " + changelogProducer.name());
            }
        }
        return followUpScanner;
    }

    protected BoundedChecker createBoundedChecker() {
        Long boundedWatermark = this.options.scanBoundedWatermark();
        return boundedWatermark != null ? BoundedChecker.watermark(boundedWatermark) : BoundedChecker.neverEnd();
    }

    private Long getScanDelayMillis() {
        return this.options.streamingReadDelay() == null ? null : Long.valueOf(this.options.streamingReadDelay().toMillis());
    }

    @Override
    @Nullable
    public Long checkpoint() {
        return this.nextSnapshotId;
    }

    @Override
    @Nullable
    public Long watermark() {
        return this.currentWatermark;
    }

    @Override
    public void restore(@Nullable Long nextSnapshotId) {
        this.nextSnapshotId = nextSnapshotId;
    }

    @Override
    public void restore(@Nullable Long nextSnapshotId, boolean scanAllSnapshot) {
        if (nextSnapshotId != null && scanAllSnapshot) {
            this.startingScanner = new StaticFromSnapshotStartingScanner(this.snapshotManager, nextSnapshotId);
            this.restore(null);
        } else {
            this.restore(nextSnapshotId);
        }
    }

    @Override
    public void notifyCheckpointComplete(@Nullable Long nextSnapshot) {
        if (nextSnapshot == null) {
            return;
        }
        String consumerId = this.options.consumerId();
        if (consumerId != null) {
            this.snapshotReader.consumerManager().resetConsumer(consumerId, new Consumer(nextSnapshot));
        }
    }

    @Override
    public DataTableScan withShard(int indexOfThisSubtask, int numberOfParallelSubtasks) {
        this.snapshotReader.withShard(indexOfThisSubtask, numberOfParallelSubtasks);
        return this;
    }
}

