/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source.enumerator;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.source.FlinkSplitPlanner;
import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.StreamingStartingStrategy;
import org.apache.iceberg.flink.source.enumerator.ContinuousEnumerationResult;
import org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlanner;
import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorPosition;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ContinuousSplitPlanner} implementation that plans splits against an Iceberg {@link Table}.
 *
 * <p>The first call to {@link #planSplits(IcebergEnumeratorPosition)} (with a {@code null}
 * position) discovers the initial splits according to the configured
 * {@link StreamingStartingStrategy}. Later calls plan the appends between the last enumerated
 * snapshot and the table's current snapshot.
 */
@Internal
public class ContinuousSplitPlannerImpl implements ContinuousSplitPlanner {
    private static final Logger LOG = LoggerFactory.getLogger(ContinuousSplitPlannerImpl.class);

    private final Table table;
    private final ScanContext scanContext;
    /** True when {@link #workerPool} is the shared pool and therefore must not be shut down here. */
    private final boolean isSharedPool;
    private final ExecutorService workerPool;

    /**
     * @param table table to plan splits for
     * @param scanContext scan configuration, including the streaming starting strategy
     * @param threadName suffix for the name of a dedicated planning pool; when {@code null} the
     *     shared pool from {@link ThreadPools#getWorkerPool()} is used instead
     */
    public ContinuousSplitPlannerImpl(Table table, ScanContext scanContext, String threadName) {
        this.table = table;
        this.scanContext = scanContext;
        this.isSharedPool = threadName == null;
        this.workerPool = this.isSharedPool
                ? ThreadPools.getWorkerPool()
                : ThreadPools.newWorkerPool(
                        "iceberg-plan-worker-pool-" + threadName, scanContext.planParallelism());
    }

    /** Shuts down the planning pool, but only if this planner created it. */
    @Override
    public void close() throws IOException {
        if (!this.isSharedPool) {
            this.workerPool.shutdown();
        }
    }

    /**
     * Refreshes the table and plans the next batch of splits.
     *
     * @param lastPosition position reached by the previous enumeration, or {@code null} if nothing
     *     has been enumerated yet
     * @return the discovered splits together with the from/to enumerator positions
     */
    @Override
    public ContinuousEnumerationResult planSplits(IcebergEnumeratorPosition lastPosition) {
        this.table.refresh();
        if (lastPosition != null) {
            return this.discoverIncrementalSplits(lastPosition);
        }
        return this.discoverInitialSplits();
    }

    /**
     * Plans the appends between {@code lastPosition} and the table's current snapshot.
     *
     * <p>Returns an empty result whose from/to positions both equal {@code lastPosition} when the
     * table is empty or the current snapshot has already been enumerated.
     *
     * @throws IllegalArgumentException if the table has no current snapshot but
     *     {@code lastPosition} carries a snapshot id
     */
    private ContinuousEnumerationResult discoverIncrementalSplits(IcebergEnumeratorPosition lastPosition) {
        Snapshot currentSnapshot = this.table.currentSnapshot();
        if (currentSnapshot == null) {
            // An empty table cannot have produced a position that points at a snapshot.
            Preconditions.checkArgument(
                    lastPosition.snapshotId() == null,
                    "Invalid last enumerated position for an empty table: not null");
            LOG.info("Skip incremental scan because table is empty");
            return new ContinuousEnumerationResult(Collections.emptyList(), lastPosition, lastPosition);
        }

        if (lastPosition.snapshotId() != null
                && currentSnapshot.snapshotId() == lastPosition.snapshotId().longValue()) {
            LOG.info("Current table snapshot is already enumerated: {}", currentSnapshot.snapshotId());
            return new ContinuousEnumerationResult(Collections.emptyList(), lastPosition, lastPosition);
        }

        IcebergEnumeratorPosition newPosition =
                IcebergEnumeratorPosition.of(currentSnapshot.snapshotId(), currentSnapshot.timestampMillis());
        ScanContext incrementalScan =
                this.scanContext.copyWithAppendsBetween(lastPosition.snapshotId(), currentSnapshot.snapshotId());
        List<IcebergSourceSplit> splits =
                FlinkSplitPlanner.planIcebergSourceSplits(this.table, incrementalScan, this.workerPool);
        LOG.info(
                "Discovered {} splits from incremental scan: from snapshot (exclusive) is {}, to snapshot (inclusive) is {}",
                splits.size(), lastPosition, newPosition);
        return new ContinuousEnumerationResult(splits, lastPosition, newPosition);
    }

    /**
     * Plans the very first enumeration, when no position has been recorded yet.
     *
     * <p>For {@link StreamingStartingStrategy#TABLE_SCAN_THEN_INCREMENTAL} a table scan is planned
     * and the resulting position is the start snapshot. For every other strategy no splits are
     * returned and the resulting position is the start snapshot's parent, or the empty position
     * when it has no parent.
     */
    private ContinuousEnumerationResult discoverInitialSplits() {
        Optional<Snapshot> startSnapshotOptional =
                ContinuousSplitPlannerImpl.startSnapshot(this.table, this.scanContext);
        if (!startSnapshotOptional.isPresent()) {
            return new ContinuousEnumerationResult(
                    Collections.emptyList(), null, IcebergEnumeratorPosition.empty());
        }

        Snapshot startSnapshot = startSnapshotOptional.get();
        LOG.info(
                "Get starting snapshot id {} based on strategy {}",
                startSnapshot.snapshotId(), this.scanContext.streamingStartingStrategy());

        List<IcebergSourceSplit> splits;
        IcebergEnumeratorPosition toPosition;
        if (this.scanContext.streamingStartingStrategy() == StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) {
            splits = FlinkSplitPlanner.planIcebergSourceSplits(this.table, this.scanContext, this.workerPool);
            LOG.info(
                    "Discovered {} splits from initial batch table scan with snapshot Id {}",
                    splits.size(), startSnapshot.snapshotId());
            toPosition = IcebergEnumeratorPosition.of(startSnapshot.snapshotId(), startSnapshot.timestampMillis());
        } else {
            splits = Collections.emptyList();
            // Record the parent as the reached position so that the start snapshot itself is
            // covered by the next incremental scan, whose lower bound is treated as exclusive.
            Long parentSnapshotId = startSnapshot.parentId();
            if (parentSnapshotId != null) {
                // The parent lookup may yield null; the timestamp is then left unset.
                Snapshot parentSnapshot = this.table.snapshot(parentSnapshotId);
                Long parentSnapshotTimestampMs =
                        parentSnapshot != null ? Long.valueOf(parentSnapshot.timestampMillis()) : null;
                toPosition = IcebergEnumeratorPosition.of(parentSnapshotId, parentSnapshotTimestampMs);
            } else {
                toPosition = IcebergEnumeratorPosition.empty();
            }
            LOG.info(
                    "Start incremental scan with start snapshot (inclusive): id = {}, timestamp = {}",
                    startSnapshot.snapshotId(), startSnapshot.timestampMillis());
        }
        return new ContinuousEnumerationResult(splits, null, toPosition);
    }

    /**
     * Resolves the snapshot that streaming should start from, based on the configured
     * {@link StreamingStartingStrategy}.
     *
     * @return the start snapshot; empty when the strategy resolves to the current snapshot or the
     *     oldest ancestor and the table has none
     * @throws IllegalArgumentException if the strategy is unknown, or if the snapshot selected by
     *     id or by timestamp cannot be found in the table
     */
    @VisibleForTesting
    static Optional<Snapshot> startSnapshot(Table table, ScanContext scanContext) {
        switch (scanContext.streamingStartingStrategy()) {
            case TABLE_SCAN_THEN_INCREMENTAL:
            case INCREMENTAL_FROM_LATEST_SNAPSHOT:
                return Optional.ofNullable(table.currentSnapshot());
            case INCREMENTAL_FROM_EARLIEST_SNAPSHOT:
                return Optional.ofNullable(SnapshotUtil.oldestAncestor(table));
            case INCREMENTAL_FROM_SNAPSHOT_ID: {
                Snapshot matchedSnapshotById = table.snapshot(scanContext.startSnapshotId());
                Preconditions.checkArgument(
                        matchedSnapshotById != null,
                        "Start snapshot id not found in history: " + scanContext.startSnapshotId());
                return Optional.of(matchedSnapshotById);
            }
            case INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP: {
                long snapshotIdAsOfTime =
                        SnapshotUtil.snapshotIdAsOfTime(table, scanContext.startSnapshotTimestamp());
                Snapshot matchedSnapshotByTimestamp = table.snapshot(snapshotIdAsOfTime);
                // Fail with a clear message instead of an NPE if the resolved id has no snapshot.
                Preconditions.checkArgument(
                        matchedSnapshotByTimestamp != null,
                        "Start snapshot not found for id " + snapshotIdAsOfTime
                                + " resolved from timestamp: " + scanContext.startSnapshotTimestamp());
                if (matchedSnapshotByTimestamp.timestampMillis() == scanContext.startSnapshotTimestamp().longValue()) {
                    return Optional.of(matchedSnapshotByTimestamp);
                }
                // The resolved snapshot's timestamp differs from the requested one, so start
                // from the snapshot that follows it.
                return Optional.of(SnapshotUtil.snapshotAfter(table, snapshotIdAsOfTime));
            }
            default:
                throw new IllegalArgumentException(
                        "Unknown starting strategy: " + scanContext.streamingStartingStrategy());
        }
    }
}

