/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.IncrementalAppendScan;
import org.apache.iceberg.Scan;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.source.FlinkInputSplit;
import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.Tasks;

/**
 * Static helpers that turn an Iceberg {@link Table} plus a {@link ScanContext} into the splits
 * consumed by the Flink sources.
 *
 * <p>The class is not instantiable; all entry points are static.
 */
@Internal
public class FlinkSplitPlanner {
    // Scan option keys forwarded from the ScanContext.
    // NOTE(review): these literals look like compiler-inlined constants (likely from
    // org.apache.iceberg.TableProperties) — confirm before replacing them with references.
    private static final String SPLIT_SIZE_OPTION = "read.split.target-size";
    private static final String SPLIT_LOOKBACK_OPTION = "read.split.planning-lookback";
    private static final String SPLIT_OPEN_FILE_COST_OPTION = "read.split.open-file-cost";

    private FlinkSplitPlanner() {
    }

    /**
     * Plans the scan described by {@code context} and wraps every resulting
     * {@link CombinedScanTask} in a {@link FlinkInputSplit}.
     *
     * <p>The split at array position {@code i} is built from the {@code i}-th planned task and is
     * given {@code i} as its split number. When {@link ScanContext#exposeLocality()} is true the
     * host names for each task are looked up through {@link Util#blockLocations} and the per-task
     * work is submitted with {@code workerPool}; otherwise host names are {@code null} and no
     * executor is handed to {@link Tasks}.
     *
     * @param table the table to scan
     * @param context scan settings
     * @param workerPool executor used for scan planning and, if locality is exposed, for the
     *     block-location lookups
     * @return one split per planned task, in planning order
     * @throws UncheckedIOException if handling (including closing) the planned task iterable
     *     raises an {@link IOException}
     */
    static FlinkInputSplit[] planInputSplits(Table table, ScanContext context, ExecutorService workerPool) {
        try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context, workerPool)) {
            List<CombinedScanTask> tasks = Lists.newArrayList(tasksIterable);
            FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
            boolean exposeLocality = context.exposeLocality();

            // Each index writes only its own array slot, so the tasks are independent of each other.
            Tasks.range(tasks.size())
                .stopOnFailure()
                .executeWith(exposeLocality ? workerPool : null)
                .run(index -> {
                    int splitNumber = index;
                    CombinedScanTask task = tasks.get(splitNumber);
                    String[] hostnames = exposeLocality ? Util.blockLocations(table.io(), task) : null;
                    splits[splitNumber] = new FlinkInputSplit(splitNumber, task, hostnames);
                });
            return splits;
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to process tasks iterable", e);
        }
    }

    /**
     * Plans the scan described by {@code context} and converts every resulting
     * {@link CombinedScanTask} into an {@link IcebergSourceSplit}.
     *
     * @param table the table to scan
     * @param context scan settings
     * @param workerPool executor handed to the scan for planning
     * @return the splits, fully materialized before the underlying iterable is closed
     * @throws UncheckedIOException if handling (including closing) the planned task iterable
     *     raises an {@link IOException}
     */
    public static List<IcebergSourceSplit> planIcebergSourceSplits(Table table, ScanContext context, ExecutorService workerPool) {
        try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context, workerPool)) {
            // Copy eagerly: the transformed view is backed by tasksIterable, which is closed on exit.
            return Lists.newArrayList(CloseableIterable.transform(tasksIterable, IcebergSourceSplit::fromCombinedScanTask));
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to process task iterable: ", e);
        }
    }

    /**
     * Builds and plans the scan selected by {@link #checkScanMode(ScanContext)}.
     *
     * <p>In incremental-append mode the scan is bounded by the context's start snapshot
     * (exclusive) and end snapshot when they are set. In batch mode the context's snapshot id
     * and as-of timestamp are applied when they are set.
     *
     * @param table the table to scan
     * @param context scan settings
     * @param workerPool executor handed to the scan for planning
     * @return the planned tasks; the caller is responsible for closing the iterable
     */
    static CloseableIterable<CombinedScanTask> planTasks(Table table, ScanContext context, ExecutorService workerPool) {
        if (checkScanMode(context) == ScanMode.INCREMENTAL_APPEND_SCAN) {
            IncrementalAppendScan scan = refineScanWithBaseConfigs(table.newIncrementalAppendScan(), context, workerPool);
            if (context.startSnapshotId() != null) {
                scan = scan.fromSnapshotExclusive(context.startSnapshotId());
            }
            if (context.endSnapshotId() != null) {
                scan = scan.toSnapshot(context.endSnapshotId());
            }
            return scan.planTasks();
        }

        TableScan scan = refineScanWithBaseConfigs(table.newScan(), context, workerPool);
        if (context.snapshotId() != null) {
            scan = scan.useSnapshot(context.snapshotId());
        }
        if (context.asOfTimestamp() != null) {
            scan = scan.asOfTime(context.asOfTimestamp());
        }
        return scan.planTasks();
    }

    /**
     * Chooses the scan mode: incremental append when the context is streaming or specifies a
     * start or end snapshot id, batch otherwise.
     */
    private static ScanMode checkScanMode(ScanContext context) {
        boolean incremental = context.isStreaming()
            || context.startSnapshotId() != null
            || context.endSnapshotId() != null;
        return incremental ? ScanMode.INCREMENTAL_APPEND_SCAN : ScanMode.BATCH;
    }

    /**
     * Applies the settings shared by both scan modes: case sensitivity, projection, planning
     * executor, and — when present in the context — column stats, split sizing options and
     * filters.
     *
     * @param scan the scan to configure
     * @param context scan settings
     * @param workerPool executor handed to the scan for planning
     * @param <T> the concrete scan type, preserved so callers can keep chaining type-specific calls
     * @return the configured scan
     */
    private static <T extends Scan<T, FileScanTask, CombinedScanTask>> T refineScanWithBaseConfigs(T scan, ScanContext context, ExecutorService workerPool) {
        T refinedScan = scan
            .caseSensitive(context.caseSensitive())
            .project(context.project())
            .planWith(workerPool);

        if (context.includeColumnStats()) {
            refinedScan = refinedScan.includeColumnStats();
        }
        if (context.splitSize() != null) {
            refinedScan = refinedScan.option(SPLIT_SIZE_OPTION, context.splitSize().toString());
        }
        if (context.splitLookback() != null) {
            refinedScan = refinedScan.option(SPLIT_LOOKBACK_OPTION, context.splitLookback().toString());
        }
        if (context.splitOpenFileCost() != null) {
            refinedScan = refinedScan.option(SPLIT_OPEN_FILE_COST_OPTION, context.splitOpenFileCost().toString());
        }
        if (context.filters() != null) {
            for (Expression filter : context.filters()) {
                refinedScan = refinedScan.filter(filter);
            }
        }
        return refinedScan;
    }

    /** The kind of scan {@link #planTasks} builds. */
    private enum ScanMode {
        BATCH,
        INCREMENTAL_APPEND_SCAN
    }
}

