/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.table.source;

import java.util.List;
import java.util.Optional;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.consumer.Consumer;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.snapshot.CompactedStartingScanner;
import org.apache.paimon.table.source.snapshot.ContinuousCompactorStartingScanner;
import org.apache.paimon.table.source.snapshot.ContinuousFromSnapshotFullStartingScanner;
import org.apache.paimon.table.source.snapshot.ContinuousFromSnapshotStartingScanner;
import org.apache.paimon.table.source.snapshot.ContinuousFromTimestampStartingScanner;
import org.apache.paimon.table.source.snapshot.ContinuousLatestStartingScanner;
import org.apache.paimon.table.source.snapshot.FullCompactedStartingScanner;
import org.apache.paimon.table.source.snapshot.FullStartingScanner;
import org.apache.paimon.table.source.snapshot.IncrementalStartingScanner;
import org.apache.paimon.table.source.snapshot.IncrementalTagStartingScanner;
import org.apache.paimon.table.source.snapshot.IncrementalTimeStampStartingScanner;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.table.source.snapshot.StaticFromSnapshotStartingScanner;
import org.apache.paimon.table.source.snapshot.StaticFromTagStartingScanner;
import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;

public abstract class AbstractInnerTableScan
implements InnerTableScan {
    private final CoreOptions options;
    protected final SnapshotReader snapshotReader;

    protected AbstractInnerTableScan(CoreOptions options, SnapshotReader snapshotReader) {
        this.options = options;
        this.snapshotReader = snapshotReader;
    }

    @VisibleForTesting
    public AbstractInnerTableScan withBucket(int bucket) {
        this.snapshotReader.withBucket(bucket);
        return this;
    }

    public AbstractInnerTableScan withBucketFilter(Filter<Integer> bucketFilter) {
        this.snapshotReader.withBucketFilter(bucketFilter);
        return this;
    }

    public CoreOptions options() {
        return this.options;
    }

    protected StartingScanner createStartingScanner(boolean isStreaming) {
        ConsumerManager consumerManager;
        Optional<Consumer> consumer;
        CoreOptions.StreamingCompactionType type = this.options.toConfiguration().get(CoreOptions.STREAMING_COMPACT);
        switch (type) {
            case NORMAL: {
                Preconditions.checkArgument(isStreaming, "Set 'streaming-compact' in batch mode. This is unexpected.");
                return new ContinuousCompactorStartingScanner();
            }
            case BUCKET_UNAWARE: {
                return new FullStartingScanner();
            }
        }
        String consumerId = this.options.consumerId();
        if (consumerId != null && (consumer = (consumerManager = this.snapshotReader.consumerManager()).consumer(consumerId)).isPresent()) {
            return new ContinuousFromSnapshotStartingScanner(consumer.get().nextSnapshot());
        }
        CoreOptions.StartupMode startupMode = this.options.startupMode();
        switch (startupMode) {
            case LATEST_FULL: {
                return new FullStartingScanner();
            }
            case LATEST: {
                return isStreaming ? new ContinuousLatestStartingScanner() : new FullStartingScanner();
            }
            case COMPACTED_FULL: {
                if (this.options.changelogProducer() == CoreOptions.ChangelogProducer.FULL_COMPACTION || this.options.toConfiguration().contains(CoreOptions.FULL_COMPACTION_DELTA_COMMITS)) {
                    int deltaCommits = this.options.toConfiguration().getOptional(CoreOptions.FULL_COMPACTION_DELTA_COMMITS).orElse(1);
                    return new FullCompactedStartingScanner(deltaCommits);
                }
                return new CompactedStartingScanner();
            }
            case FROM_TIMESTAMP: {
                Long startupMillis = this.options.scanTimestampMills();
                return isStreaming ? new ContinuousFromTimestampStartingScanner(startupMillis) : new StaticFromTimestampStartingScanner(startupMillis);
            }
            case FROM_SNAPSHOT: {
                if (this.options.scanSnapshotId() != null) {
                    return isStreaming ? new ContinuousFromSnapshotStartingScanner(this.options.scanSnapshotId()) : new StaticFromSnapshotStartingScanner(this.options.scanSnapshotId());
                }
                Preconditions.checkArgument(!isStreaming, "Cannot scan from tag in streaming mode.");
                return new StaticFromTagStartingScanner(this.options().scanTagName());
            }
            case FROM_SNAPSHOT_FULL: {
                return isStreaming ? new ContinuousFromSnapshotFullStartingScanner(this.options.scanSnapshotId()) : new StaticFromSnapshotStartingScanner(this.options.scanSnapshotId());
            }
            case INCREMENTAL: {
                Preconditions.checkArgument(!isStreaming, "Cannot read incremental in streaming mode.");
                Pair<String, String> incrementalBetween = this.options.incrementalBetween();
                if (this.options.toMap().get(CoreOptions.INCREMENTAL_BETWEEN.key()) != null) {
                    try {
                        return new IncrementalStartingScanner(Long.parseLong(incrementalBetween.getLeft()), Long.parseLong(incrementalBetween.getRight()));
                    }
                    catch (NumberFormatException e) {
                        return new IncrementalTagStartingScanner(incrementalBetween.getLeft(), incrementalBetween.getRight());
                    }
                }
                return new IncrementalTimeStampStartingScanner(Long.parseLong(incrementalBetween.getLeft()), Long.parseLong(incrementalBetween.getRight()));
            }
        }
        throw new UnsupportedOperationException("Unknown startup mode " + startupMode.name());
    }

    @Override
    public List<BinaryRow> listPartitions() {
        return this.snapshotReader.partitions();
    }
}

