/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source.enumerator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSplitPlanner;
import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.StreamingStartingStrategy;
import org.apache.iceberg.flink.source.enumerator.ContinuousEnumerationResult;
import org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlanner;
import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorPosition;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ContinuousSplitPlanner} implementation that plans splits for a single Iceberg table.
 *
 * <p>The first call to {@link #planSplits(IcebergEnumeratorPosition)} (with a {@code null}
 * position) resolves the starting snapshot from the configured {@link StreamingStartingStrategy};
 * every later call plans the appends between the last enumerated position and the current
 * snapshot, limited by {@code ScanContext#maxPlanningSnapshotCount()}.
 */
@Internal
public class ContinuousSplitPlannerImpl implements ContinuousSplitPlanner {
    private static final Logger LOG = LoggerFactory.getLogger(ContinuousSplitPlannerImpl.class);

    private final Table table;
    private final ScanContext scanContext;
    private final boolean isSharedPool;
    private final ExecutorService workerPool;
    private final TableLoader tableLoader;

    /**
     * Clones and opens the given loader, loads the table and selects the planning worker pool.
     *
     * @param tableLoader loader for the table to plan; it is cloned, the caller's instance is not
     *     opened or closed by this class
     * @param scanContext scan configuration used for every planning call
     * @param threadName suffix for the name of a dedicated worker pool; when {@code null} the pool
     *     returned by {@code ThreadPools.getWorkerPool()} is used instead
     */
    public ContinuousSplitPlannerImpl(TableLoader tableLoader, ScanContext scanContext, String threadName) {
        this.scanContext = scanContext;
        this.isSharedPool = threadName == null;
        TableLoader clonedLoader = tableLoader.clone();
        try {
            clonedLoader.open();
            this.table = clonedLoader.loadTable();
            this.workerPool = this.isSharedPool
                ? ThreadPools.getWorkerPool()
                : ThreadPools.newWorkerPool("iceberg-plan-worker-pool-" + threadName, scanContext.planParallelism());
        } catch (RuntimeException | Error failure) {
            // Nobody else holds a reference to the clone yet, so release it here instead of
            // leaking it when initialization fails half-way.
            try {
                clonedLoader.close();
            } catch (IOException | RuntimeException closeFailure) {
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
        this.tableLoader = clonedLoader;
    }

    /**
     * Shuts down the dedicated worker pool (if one was created by the constructor) and closes the
     * cloned table loader. The pool obtained from {@code ThreadPools.getWorkerPool()} is never shut
     * down here.
     *
     * @throws IOException if closing the table loader fails
     */
    @Override
    public void close() throws IOException {
        try {
            if (!isSharedPool) {
                workerPool.shutdown();
            }
        } finally {
            // Close the loader even if the pool shutdown throws.
            tableLoader.close();
        }
    }

    /**
     * Refreshes the table and plans the next batch of splits.
     *
     * @param lastPosition position returned by the previous call, or {@code null} for the very
     *     first enumeration
     * @return the discovered splits together with the from/to positions of this enumeration
     */
    @Override
    public ContinuousEnumerationResult planSplits(IcebergEnumeratorPosition lastPosition) {
        table.refresh();
        if (lastPosition == null) {
            return discoverInitialSplits();
        }
        return discoverIncrementalSplits(lastPosition);
    }

    /**
     * Picks the snapshot at which the current incremental scan should stop.
     *
     * @param lastConsumedSnapshotId id of the last enumerated snapshot (exclusive lower bound), may
     *     be {@code null}
     * @param currentSnapshot newest snapshot that may be planned
     * @param maxPlanningSnapshotCount maximum number of snapshots to cover in one scan
     * @return {@code currentSnapshot} when the pending snapshots fit within the limit, otherwise the
     *     snapshot that is {@code maxPlanningSnapshotCount} entries from the end of the ancestor list
     */
    private Snapshot toSnapshotInclusive(Long lastConsumedSnapshotId, Snapshot currentSnapshot, int maxPlanningSnapshotCount) {
        // SnapshotUtil.ancestorsBetween walks back from the newest snapshot, so the list is in
        // reverse commit order (latest first).
        List<Snapshot> snapshots = Lists.newArrayList(
            SnapshotUtil.ancestorsBetween(table, currentSnapshot.snapshotId(), lastConsumedSnapshotId));
        if (snapshots.size() <= maxPlanningSnapshotCount) {
            return currentSnapshot;
        }
        // Counting from the tail (oldest side) yields the newest snapshot that is still within the
        // allowed number of snapshots after lastConsumedSnapshotId.
        return snapshots.get(snapshots.size() - maxPlanningSnapshotCount);
    }

    /**
     * Plans the appends committed after {@code lastPosition}.
     *
     * @param lastPosition non-null position of the previous enumeration; its snapshot id may be
     *     {@code null} when nothing has been enumerated yet
     * @return an empty result (positions unchanged) when the table is empty or nothing new was
     *     committed, otherwise the splits between {@code lastPosition} (exclusive) and the chosen
     *     end snapshot (inclusive)
     * @throws IllegalArgumentException if the table has no snapshot but {@code lastPosition}
     *     carries a snapshot id
     */
    private ContinuousEnumerationResult discoverIncrementalSplits(IcebergEnumeratorPosition lastPosition) {
        Snapshot currentSnapshot = scanContext.branch() != null
            ? table.snapshot(scanContext.branch())
            : table.currentSnapshot();
        Long lastConsumedSnapshotId = lastPosition.snapshotId();

        if (currentSnapshot == null) {
            // Empty table: a previously enumerated snapshot id would be inconsistent.
            Preconditions.checkArgument(
                lastConsumedSnapshotId == null,
                "Invalid last enumerated position for an empty table: not null");
            LOG.info("Skip incremental scan because table is empty");
            return new ContinuousEnumerationResult(Collections.emptyList(), lastPosition, lastPosition);
        }

        if (lastConsumedSnapshotId != null && currentSnapshot.snapshotId() == lastConsumedSnapshotId.longValue()) {
            LOG.info("Current table snapshot is already enumerated: {}", currentSnapshot.snapshotId());
            return new ContinuousEnumerationResult(Collections.emptyList(), lastPosition, lastPosition);
        }

        Snapshot toSnapshotInclusive =
            toSnapshotInclusive(lastConsumedSnapshotId, currentSnapshot, scanContext.maxPlanningSnapshotCount());
        IcebergEnumeratorPosition newPosition =
            IcebergEnumeratorPosition.of(toSnapshotInclusive.snapshotId(), toSnapshotInclusive.timestampMillis());
        ScanContext incrementalScan =
            scanContext.copyWithAppendsBetween(lastConsumedSnapshotId, toSnapshotInclusive.snapshotId());
        List<IcebergSourceSplit> splits =
            FlinkSplitPlanner.planIcebergSourceSplits(table, incrementalScan, workerPool);
        LOG.info(
            "Discovered {} splits from incremental scan: from snapshot (exclusive) is {}, to snapshot (inclusive) is {}",
            splits.size(), lastPosition, newPosition);
        return new ContinuousEnumerationResult(splits, lastPosition, newPosition);
    }

    /**
     * Performs the very first enumeration, driven by the configured starting strategy.
     *
     * <p>For {@code TABLE_SCAN_THEN_INCREMENTAL} the start snapshot is planned as a regular table
     * scan and becomes the new position. For every other strategy no splits are produced; the
     * position is set to the parent of the start snapshot (or the empty position if it has no
     * parent) so that the start snapshot itself is covered by the next incremental scan.
     *
     * @return the initial enumeration result; its from-position is always {@code null}
     */
    private ContinuousEnumerationResult discoverInitialSplits() {
        Optional<Snapshot> startSnapshotOptional = startSnapshot(table, scanContext);
        if (!startSnapshotOptional.isPresent()) {
            return new ContinuousEnumerationResult(Collections.emptyList(), null, IcebergEnumeratorPosition.empty());
        }

        Snapshot startSnapshot = startSnapshotOptional.get();
        LOG.info(
            "Get starting snapshot id {} based on strategy {}",
            startSnapshot.snapshotId(), scanContext.streamingStartingStrategy());

        List<IcebergSourceSplit> splits;
        IcebergEnumeratorPosition toPosition;
        if (scanContext.streamingStartingStrategy() == StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) {
            splits = FlinkSplitPlanner.planIcebergSourceSplits(
                table, scanContext.copyWithSnapshotId(startSnapshot.snapshotId()), workerPool);
            LOG.info(
                "Discovered {} splits from initial batch table scan with snapshot Id {}",
                splits.size(), startSnapshot.snapshotId());
            toPosition = IcebergEnumeratorPosition.of(startSnapshot.snapshotId(), startSnapshot.timestampMillis());
        } else {
            splits = Collections.emptyList();
            Long parentSnapshotId = startSnapshot.parentId();
            if (parentSnapshotId != null) {
                // The parent may no longer be resolvable from the table; keep its id and leave
                // the timestamp null in that case.
                Snapshot parentSnapshot = table.snapshot(parentSnapshotId);
                Long parentSnapshotTimestampMs = parentSnapshot != null ? parentSnapshot.timestampMillis() : null;
                toPosition = IcebergEnumeratorPosition.of(parentSnapshotId, parentSnapshotTimestampMs);
            } else {
                toPosition = IcebergEnumeratorPosition.empty();
            }
            LOG.info(
                "Start incremental scan with start snapshot (inclusive): id = {}, timestamp = {}",
                startSnapshot.snapshotId(), startSnapshot.timestampMillis());
        }
        return new ContinuousEnumerationResult(splits, null, toPosition);
    }

    /**
     * Resolves the snapshot from which streaming should start.
     *
     * @param table table to inspect
     * @param scanContext scan configuration providing the starting strategy and its parameters
     * @return the start snapshot; empty only for the latest/earliest strategies when the table
     *     yields no snapshot
     * @throws IllegalArgumentException if the configured snapshot id or timestamp matches no
     *     snapshot, or if the strategy is not one of the handled values
     */
    @VisibleForTesting
    static Optional<Snapshot> startSnapshot(Table table, ScanContext scanContext) {
        switch (scanContext.streamingStartingStrategy()) {
            case TABLE_SCAN_THEN_INCREMENTAL:
            case INCREMENTAL_FROM_LATEST_SNAPSHOT:
                return Optional.ofNullable(table.currentSnapshot());
            case INCREMENTAL_FROM_EARLIEST_SNAPSHOT:
                return Optional.ofNullable(SnapshotUtil.oldestAncestor(table));
            case INCREMENTAL_FROM_SNAPSHOT_ID: {
                Snapshot matchedSnapshotById = table.snapshot(scanContext.startSnapshotId());
                Preconditions.checkArgument(
                    matchedSnapshotById != null,
                    "Start snapshot id not found in history: " + scanContext.startSnapshotId());
                return Optional.of(matchedSnapshotById);
            }
            case INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP: {
                Snapshot matchedSnapshotByTimestamp =
                    SnapshotUtil.oldestAncestorAfter(table, scanContext.startSnapshotTimestamp());
                Preconditions.checkArgument(
                    matchedSnapshotByTimestamp != null,
                    "Cannot find a snapshot after: " + scanContext.startSnapshotTimestamp());
                return Optional.of(matchedSnapshotByTimestamp);
            }
            default:
                throw new IllegalArgumentException(
                    "Unknown starting strategy: " + scanContext.streamingStartingStrategy());
        }
    }
}

