/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.scan;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.IncrementalAppendScan;
import org.apache.iceberg.Scan;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.IcebergEnumerationResult;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.IcebergEnumeratorPosition;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.scan.IcebergScanContext;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.scan.IcebergStreamScanStrategy;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.split.IcebergFileScanTaskSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IcebergScanSplitPlanner {
    private static final Logger log = LoggerFactory.getLogger(IcebergScanSplitPlanner.class);

    public static IcebergEnumerationResult planStreamSplits(Table table, IcebergScanContext icebergScanContext, IcebergEnumeratorPosition lastPosition) {
        table.refresh();
        if (lastPosition == null) {
            return IcebergScanSplitPlanner.initialStreamSplits(table, icebergScanContext);
        }
        return IcebergScanSplitPlanner.incrementalStreamSplits(table, icebergScanContext, lastPosition);
    }

    private static IcebergEnumerationResult incrementalStreamSplits(Table table, IcebergScanContext icebergScanContext, IcebergEnumeratorPosition lastPosition) {
        Snapshot currentSnapshot = table.currentSnapshot();
        if (currentSnapshot == null) {
            Preconditions.checkArgument((lastPosition.getSnapshotId() == null ? 1 : 0) != 0, (Object)"Invalid last enumerated position for an empty table: not null");
            log.info("Skip incremental scan because table is empty");
            return new IcebergEnumerationResult(Collections.emptyList(), lastPosition, lastPosition);
        }
        if (lastPosition.getSnapshotId() != null && currentSnapshot.snapshotId() == lastPosition.getSnapshotId().longValue()) {
            log.debug("Current table snapshot is already enumerated: {}", (Object)currentSnapshot.snapshotId());
            return new IcebergEnumerationResult(Collections.emptyList(), lastPosition, lastPosition);
        }
        IcebergEnumeratorPosition newPosition = new IcebergEnumeratorPosition(currentSnapshot.snapshotId(), currentSnapshot.timestampMillis());
        IcebergScanContext incrementalScan = icebergScanContext.copyWithAppendsBetween(lastPosition.getSnapshotId(), currentSnapshot.snapshotId());
        List<IcebergFileScanTaskSplit> splits = IcebergScanSplitPlanner.planSplits(table, incrementalScan);
        log.info("Discovered {} splits from incremental scan: from snapshot (exclusive) is {}, to snapshot (inclusive) is {}", new Object[]{splits.size(), lastPosition, newPosition});
        return new IcebergEnumerationResult(splits, lastPosition, newPosition);
    }

    private static IcebergEnumerationResult initialStreamSplits(Table table, IcebergScanContext icebergScanContext) {
        Optional<Snapshot> startSnapshotOptional = IcebergScanSplitPlanner.getStreamStartSnapshot(table, icebergScanContext);
        if (!startSnapshotOptional.isPresent()) {
            return new IcebergEnumerationResult(Collections.emptyList(), null, IcebergEnumeratorPosition.EMPTY);
        }
        Snapshot startSnapshot = startSnapshotOptional.get();
        List<Object> splits = Collections.emptyList();
        IcebergEnumeratorPosition toPosition = IcebergEnumeratorPosition.EMPTY;
        if (IcebergStreamScanStrategy.TABLE_SCAN_THEN_INCREMENTAL.equals((Object)icebergScanContext.getStreamScanStrategy())) {
            splits = IcebergScanSplitPlanner.planSplits(table, icebergScanContext);
            log.info("Discovered {} splits from initial batch table scan with snapshot Id {}", (Object)splits.size(), (Object)startSnapshot.snapshotId());
            toPosition = new IcebergEnumeratorPosition(startSnapshot.snapshotId(), startSnapshot.timestampMillis());
        } else {
            Long parentSnapshotId = startSnapshot.parentId();
            if (parentSnapshotId != null) {
                Snapshot parentSnapshot = table.snapshot(parentSnapshotId);
                Long parentSnapshotTimestampMs = parentSnapshot != null ? Long.valueOf(parentSnapshot.timestampMillis()) : null;
                toPosition = new IcebergEnumeratorPosition(parentSnapshotId, parentSnapshotTimestampMs);
            }
            log.info("Start incremental scan with start snapshot (inclusive): id = {}, timestamp = {}", (Object)startSnapshot.snapshotId(), (Object)startSnapshot.timestampMillis());
        }
        return new IcebergEnumerationResult(splits, null, toPosition);
    }

    private static Optional<Snapshot> getStreamStartSnapshot(Table table, IcebergScanContext icebergScanContext) {
        switch (icebergScanContext.getStreamScanStrategy()) {
            case TABLE_SCAN_THEN_INCREMENTAL: 
            case FROM_LATEST_SNAPSHOT: {
                return Optional.ofNullable(table.currentSnapshot());
            }
            case FROM_EARLIEST_SNAPSHOT: {
                return Optional.ofNullable(SnapshotUtil.oldestAncestor(table));
            }
            case FROM_SNAPSHOT_ID: {
                return Optional.of(table.snapshot(icebergScanContext.getStartSnapshotId()));
            }
            case FROM_SNAPSHOT_TIMESTAMP: {
                long snapshotIdAsOfTime = SnapshotUtil.snapshotIdAsOfTime(table, icebergScanContext.getStartSnapshotTimestamp());
                Snapshot matchedSnapshot = table.snapshot(snapshotIdAsOfTime);
                if (matchedSnapshot.timestampMillis() == icebergScanContext.getStartSnapshotTimestamp().longValue()) {
                    return Optional.of(matchedSnapshot);
                }
                return Optional.of(SnapshotUtil.snapshotAfter(table, snapshotIdAsOfTime));
            }
        }
        throw new UnsupportedOperationException("Unsupported stream scan strategy: " + (Object)((Object)icebergScanContext.getStreamScanStrategy()));
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public static List<IcebergFileScanTaskSplit> planSplits(Table table, IcebergScanContext context) {
        try (CloseableIterable<CombinedScanTask> tasksIterable = IcebergScanSplitPlanner.planTasks(table, context);){
            ArrayList<IcebergFileScanTaskSplit> splits = new ArrayList<IcebergFileScanTaskSplit>();
            for (CombinedScanTask combinedScanTask : tasksIterable) {
                for (FileScanTask fileScanTask : combinedScanTask.files()) {
                    splits.add(new IcebergFileScanTaskSplit(fileScanTask));
                }
            }
            ArrayList<IcebergFileScanTaskSplit> arrayList = splits;
            return arrayList;
        }
        catch (IOException e) {
            throw new UncheckedIOException("Failed to scan iceberg splits from: " + table.name(), e);
        }
    }

    private static CloseableIterable<CombinedScanTask> planTasks(Table table, IcebergScanContext context) {
        if (context.isStreaming() || context.getStartSnapshotId() != null || context.getEndSnapshotId() != null) {
            IncrementalAppendScan scan = table.newIncrementalAppendScan();
            scan = IcebergScanSplitPlanner.rebuildScanWithBaseConfig(scan, context);
            if (context.getStartSnapshotId() != null) {
                scan = (IncrementalAppendScan)scan.fromSnapshotExclusive(context.getStartSnapshotId());
            }
            if (context.getEndSnapshotId() != null) {
                scan = (IncrementalAppendScan)scan.toSnapshot(context.getEndSnapshotId());
            }
            return scan.planTasks();
        }
        TableScan scan = table.newScan();
        scan = IcebergScanSplitPlanner.rebuildScanWithBaseConfig(scan, context);
        if (context.getUseSnapshotId() != null) {
            scan = scan.useSnapshot(context.getUseSnapshotId());
        }
        if (context.getUseSnapshotTimestamp() != null) {
            scan = scan.asOfTime(context.getUseSnapshotTimestamp());
        }
        return scan.planTasks();
    }

    private static <T extends Scan<T, FileScanTask, CombinedScanTask>> T rebuildScanWithBaseConfig(T scan, IcebergScanContext context) {
        Scan newScan = (Scan)((Scan)scan.caseSensitive(context.isCaseSensitive())).project(context.getSchema());
        if (context.getFilter() != null) {
            newScan = (Scan)newScan.filter(context.getFilter());
        }
        if (context.getSplitSize() != null) {
            newScan = (Scan)newScan.option("read.split.target-size", context.getSplitSize().toString());
        }
        if (context.getSplitLookback() != null) {
            newScan = (Scan)newScan.option("read.split.planning-lookback", context.getSplitLookback().toString());
        }
        if (context.getSplitOpenFileCost() != null) {
            newScan = (Scan)newScan.option("read.split.open-file-cost", context.getSplitOpenFileCost().toString());
        }
        return (T)newScan;
    }
}

