/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.table.source;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.consumer.Consumer;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.snapshot.CompactedStartingScanner;
import org.apache.paimon.table.source.snapshot.ContinuousCompactorStartingScanner;
import org.apache.paimon.table.source.snapshot.ContinuousFromSnapshotFullStartingScanner;
import org.apache.paimon.table.source.snapshot.ContinuousFromSnapshotStartingScanner;
import org.apache.paimon.table.source.snapshot.ContinuousFromTimestampStartingScanner;
import org.apache.paimon.table.source.snapshot.ContinuousLatestStartingScanner;
import org.apache.paimon.table.source.snapshot.FileCreationTimeStartingScanner;
import org.apache.paimon.table.source.snapshot.FullCompactedStartingScanner;
import org.apache.paimon.table.source.snapshot.FullStartingScanner;
import org.apache.paimon.table.source.snapshot.IncrementalStartingScanner;
import org.apache.paimon.table.source.snapshot.IncrementalTagStartingScanner;
import org.apache.paimon.table.source.snapshot.IncrementalTimeStampStartingScanner;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.table.source.snapshot.StaticFromSnapshotStartingScanner;
import org.apache.paimon.table.source.snapshot.StaticFromTagStartingScanner;
import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
import org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanner;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;

/**
 * Base {@link DataTableScan} that forwards every scan setting to a {@link SnapshotReader} and
 * derives the {@link StartingScanner} to use from the table's {@link CoreOptions}.
 */
public abstract class AbstractDataTableScan implements DataTableScan {

    private final CoreOptions options;
    protected final SnapshotReader snapshotReader;

    /**
     * @param options table options consulted when a starting scanner is chosen
     * @param snapshotReader reader that receives all filter, metric and stats settings
     */
    protected AbstractDataTableScan(CoreOptions options, SnapshotReader snapshotReader) {
        this.options = options;
        this.snapshotReader = snapshotReader;
    }

    /**
     * Passes a single bucket on to the snapshot reader.
     *
     * @param bucket bucket number handed to {@link SnapshotReader#withBucket}
     * @return this scan, for chaining
     */
    @VisibleForTesting
    public AbstractDataTableScan withBucket(int bucket) {
        snapshotReader.withBucket(bucket);
        return this;
    }

    /**
     * Passes a bucket filter on to the snapshot reader.
     *
     * @param bucketFilter filter handed to the snapshot reader
     * @return this scan, for chaining
     */
    @Override
    public AbstractDataTableScan withBucketFilter(Filter<Integer> bucketFilter) {
        snapshotReader.withBucketFilter(bucketFilter);
        return this;
    }

    /**
     * Passes a partition spec on to the snapshot reader.
     *
     * @param partitionSpec partition spec handed to the snapshot reader
     * @return this scan, for chaining
     */
    @Override
    public AbstractDataTableScan withPartitionFilter(Map<String, String> partitionSpec) {
        snapshotReader.withPartitionFilter(partitionSpec);
        return this;
    }

    /**
     * Passes a list of partitions on to the snapshot reader.
     *
     * @param partitions partition rows handed to the snapshot reader
     * @return this scan, for chaining
     */
    @Override
    public AbstractDataTableScan withPartitionFilter(List<BinaryRow> partitions) {
        snapshotReader.withPartitionFilter(partitions);
        return this;
    }

    /**
     * Passes a level filter on to the snapshot reader.
     *
     * @param levelFilter filter handed to the snapshot reader
     * @return this scan, for chaining
     */
    @Override
    public AbstractDataTableScan withLevelFilter(Filter<Integer> levelFilter) {
        snapshotReader.withLevelFilter(levelFilter);
        return this;
    }

    /**
     * Passes a metric registry on to the snapshot reader.
     *
     * @param metricsRegistry registry handed to {@link SnapshotReader#withMetricRegistry}
     * @return this scan, for chaining
     */
    @Override
    public AbstractDataTableScan withMetricsRegistry(MetricRegistry metricsRegistry) {
        snapshotReader.withMetricRegistry(metricsRegistry);
        return this;
    }

    /**
     * Calls {@link SnapshotReader#dropStats()} on the underlying reader.
     *
     * @return this scan, for chaining
     */
    @Override
    public AbstractDataTableScan dropStats() {
        snapshotReader.dropStats();
        return this;
    }

    /** Returns the options this scan was constructed with. */
    public CoreOptions options() {
        return options;
    }

    /**
     * Chooses the {@link StartingScanner} for this scan.
     *
     * <p>The decision is made in this order:
     *
     * <ol>
     *   <li>the {@link CoreOptions#STREAM_SCAN_MODE} value, when it is {@code
     *       COMPACT_BUCKET_TABLE} or {@code FILE_MONITOR};
     *   <li>the stored consumer, when streaming with a consumer id whose progress is not ignored
     *       and for which the consumer manager returns an entry;
     *   <li>the configured {@link CoreOptions.StartupMode}.
     * </ol>
     *
     * @param isStreaming whether the scan is used for a streaming read; several modes are checked
     *     against this flag through {@link Preconditions#checkArgument}
     * @return the scanner matching the options
     * @throws UnsupportedOperationException if the startup mode, the snapshot read mode or the
     *     incremental scan type matches none of the handled cases
     */
    protected StartingScanner createStartingScanner(boolean isStreaming) {
        SnapshotManager snapshotManager = snapshotReader.snapshotManager();

        CoreOptions.StreamScanMode streamScanMode =
                options.toConfiguration().get(CoreOptions.STREAM_SCAN_MODE);
        switch (streamScanMode) {
            case COMPACT_BUCKET_TABLE:
                Preconditions.checkArgument(
                        isStreaming, "Set 'streaming-compact' in batch mode. This is unexpected.");
                return new ContinuousCompactorStartingScanner(snapshotManager);
            case FILE_MONITOR:
                return new FullStartingScanner(snapshotManager);
            default:
                // Every other stream scan mode falls through to the consumer / startup-mode logic.
                break;
        }

        // A stored consumer takes precedence over the startup mode.
        String consumerId = options.consumerId();
        if (isStreaming && consumerId != null && !options.consumerIgnoreProgress()) {
            Optional<Consumer> storedConsumer =
                    snapshotReader.consumerManager().consumer(consumerId);
            if (storedConsumer.isPresent()) {
                return new ContinuousFromSnapshotStartingScanner(
                        snapshotManager,
                        storedConsumer.get().nextSnapshot(),
                        options.changelogLifecycleDecoupled());
            }
        }

        CoreOptions.StartupMode startupMode = options.startupMode();
        switch (startupMode) {
            case LATEST_FULL:
                return new FullStartingScanner(snapshotManager);
            case LATEST:
                if (isStreaming) {
                    return new ContinuousLatestStartingScanner(snapshotManager);
                }
                return new FullStartingScanner(snapshotManager);
            case COMPACTED_FULL:
                return newCompactedFullScanner(snapshotManager);
            case FROM_TIMESTAMP: {
                Long startupMillis = options.scanTimestampMills();
                if (isStreaming) {
                    return new ContinuousFromTimestampStartingScanner(
                            snapshotManager, startupMillis, options.changelogLifecycleDecoupled());
                }
                return new StaticFromTimestampStartingScanner(snapshotManager, startupMillis);
            }
            case FROM_FILE_CREATION_TIME:
                return new FileCreationTimeStartingScanner(
                        snapshotManager, options.scanFileCreationTimeMills());
            case FROM_SNAPSHOT:
                return newFromSnapshotScanner(snapshotManager, isStreaming);
            case FROM_SNAPSHOT_FULL: {
                Long scanSnapshotId = options.scanSnapshotId();
                Preconditions.checkNotNull(
                        scanSnapshotId,
                        "scan.snapshot-id must be set when startupMode is FROM_SNAPSHOT_FULL.");
                if (isStreaming) {
                    return new ContinuousFromSnapshotFullStartingScanner(
                            snapshotManager, scanSnapshotId);
                }
                return new StaticFromSnapshotStartingScanner(snapshotManager, scanSnapshotId);
            }
            case INCREMENTAL:
                Preconditions.checkArgument(
                        !isStreaming, "Cannot read incremental in streaming mode.");
                return newIncrementalScanner(snapshotManager);
            default:
                throw new UnsupportedOperationException(
                        "Unknown startup mode " + startupMode.name());
        }
    }

    /**
     * Builds the scanner for {@code COMPACTED_FULL}: a {@link FullCompactedStartingScanner} when
     * the changelog producer is {@code FULL_COMPACTION} or the delta-commits option is present
     * (using 1 when the option has no value), otherwise a {@link CompactedStartingScanner}.
     */
    private StartingScanner newCompactedFullScanner(SnapshotManager snapshotManager) {
        boolean fullCompaction =
                options.changelogProducer() == CoreOptions.ChangelogProducer.FULL_COMPACTION
                        || options.toConfiguration()
                                .contains(CoreOptions.FULL_COMPACTION_DELTA_COMMITS);
        if (!fullCompaction) {
            return new CompactedStartingScanner(snapshotManager);
        }
        int deltaCommits =
                options.toConfiguration()
                        .getOptional(CoreOptions.FULL_COMPACTION_DELTA_COMMITS)
                        .orElse(1);
        return new FullCompactedStartingScanner(snapshotManager, deltaCommits);
    }

    /**
     * Builds the scanner for {@code FROM_SNAPSHOT}, checking snapshot id, then watermark, then
     * tag name. Watermark and tag reads are rejected in streaming mode.
     *
     * @throws UnsupportedOperationException if none of the three options is set
     */
    private StartingScanner newFromSnapshotScanner(
            SnapshotManager snapshotManager, boolean isStreaming) {
        if (options.scanSnapshotId() != null) {
            if (isStreaming) {
                return new ContinuousFromSnapshotStartingScanner(
                        snapshotManager,
                        options.scanSnapshotId(),
                        options.changelogLifecycleDecoupled());
            }
            return new StaticFromSnapshotStartingScanner(
                    snapshotManager, options.scanSnapshotId());
        }

        // The values below are read through options() rather than the field, so an overriding
        // accessor in a subclass is honoured for them.
        if (options.scanWatermark() != null) {
            Preconditions.checkArgument(
                    !isStreaming, "Cannot scan from watermark in streaming mode.");
            return new StaticFromWatermarkStartingScanner(
                    snapshotManager, options().scanWatermark());
        }
        if (options.scanTagName() != null) {
            Preconditions.checkArgument(!isStreaming, "Cannot scan from tag in streaming mode.");
            return new StaticFromTagStartingScanner(snapshotManager, options().scanTagName());
        }
        throw new UnsupportedOperationException("Unknown snapshot read mode");
    }

    /**
     * Builds the scanner for {@code INCREMENTAL}. When the {@code INCREMENTAL_BETWEEN} key is
     * present in the option map, bounds that both parse as longs go to {@link
     * IncrementalStartingScanner} and anything else goes, as raw strings, to {@link
     * IncrementalTagStartingScanner}. When the key is absent, the bounds are parsed as longs for
     * {@link IncrementalTimeStampStartingScanner}.
     */
    private StartingScanner newIncrementalScanner(SnapshotManager snapshotManager) {
        Pair<String, String> bounds = options.incrementalBetween();
        ScanMode scanMode = resolveIncrementalScanMode(options.incrementalBetweenScanMode());

        boolean hasIncrementalBetween =
                options.toMap().get(CoreOptions.INCREMENTAL_BETWEEN.key()) != null;
        if (!hasIncrementalBetween) {
            return new IncrementalTimeStampStartingScanner(
                    snapshotManager,
                    Long.parseLong(bounds.getLeft()),
                    Long.parseLong(bounds.getRight()),
                    scanMode);
        }

        try {
            return new IncrementalStartingScanner(
                    snapshotManager,
                    Long.parseLong(bounds.getLeft()),
                    Long.parseLong(bounds.getRight()),
                    scanMode);
        } catch (NumberFormatException ignored) {
            // Non-numeric bounds are not an error here: they are passed on unparsed.
            return new IncrementalTagStartingScanner(
                    snapshotManager, bounds.getLeft(), bounds.getRight());
        }
    }

    /**
     * Maps the configured incremental scan type to a {@link ScanMode}. {@code AUTO} yields {@code
     * DELTA} when the changelog producer is {@code NONE} and {@code CHANGELOG} otherwise.
     *
     * @throws UnsupportedOperationException for any other scan type
     */
    private ScanMode resolveIncrementalScanMode(CoreOptions.IncrementalBetweenScanMode scanType) {
        switch (scanType) {
            case AUTO:
                if (options.changelogProducer() == CoreOptions.ChangelogProducer.NONE) {
                    return ScanMode.DELTA;
                }
                return ScanMode.CHANGELOG;
            case DELTA:
                return ScanMode.DELTA;
            case CHANGELOG:
                return ScanMode.CHANGELOG;
            default:
                throw new UnsupportedOperationException(
                        "Unknown incremental scan type " + scanType.name());
        }
    }
}

