/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.iceberg.BaseDistributedDataScan;
import org.apache.iceberg.BaseScan;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.BatchScan;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.DeleteFileIndex;
import org.apache.iceberg.ImmutableTableScanContext;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScanContext;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.ClosingIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.spark.JobGroupUtils;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.actions.ManifestFileBean;
import org.apache.iceberg.spark.source.SerializableTableWithSize;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;

/**
 * A {@link BaseDistributedDataScan} that reads data and delete manifests in Spark jobs.
 *
 * <p>Each manifest list is turned into an RDD of {@link ManifestFileBean}s with one partition per
 * manifest; executors read the manifests through a broadcast copy of the table and the matching
 * files are collected back to the driver.
 */
public class SparkDistributedDataScan extends BaseDistributedDataScan {
    private static final Joiner COMMA = Joiner.on(',');
    private static final String DELETE_PLANNING_JOB_GROUP_ID = "DELETE-PLANNING";
    private static final String DATA_PLANNING_JOB_GROUP_ID = "DATA-PLANNING";

    private final SparkSession spark;
    private final JavaSparkContext sparkContext;
    private final SparkReadConf readConf;

    // Created on first use by tableBroadcast() and then reused by this scan instance.
    private Broadcast<Table> tableBroadcast = null;

    /**
     * Creates a scan over the table's current schema.
     *
     * @param spark the session whose Spark context runs the planning jobs
     * @param table the table to scan
     * @param readConf supplies the parallelism and the data/delete planning modes
     */
    public SparkDistributedDataScan(SparkSession spark, Table table, SparkReadConf readConf) {
        this(spark, table, readConf, table.schema(), newTableScanContext(table));
    }

    private SparkDistributedDataScan(
            SparkSession spark,
            Table table,
            SparkReadConf readConf,
            Schema schema,
            TableScanContext context) {
        super(table, schema, context);
        this.spark = spark;
        this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
        this.readConf = readConf;
    }

    /**
     * Returns a new scan for the refined table/schema/context that keeps this scan's session and
     * read configuration. The new instance starts without a table broadcast of its own.
     */
    protected BatchScan newRefinedScan(Table newTable, Schema newSchema, TableScanContext newContext) {
        return new SparkDistributedDataScan(spark, newTable, readConf, newSchema, newContext);
    }

    /** Returns the parallelism reported by the read configuration. */
    protected int remoteParallelism() {
        return readConf.parallelism();
    }

    /** Returns the data planning mode reported by the read configuration. */
    protected PlanningMode dataPlanningMode() {
        return readConf.dataPlanningMode();
    }

    /** Always returns {@code false}. */
    protected boolean shouldCopyRemotelyPlannedDataFiles() {
        return false;
    }

    /**
     * Plans data files by reading the given manifests in a Spark job tagged with the
     * {@code DATA-PLANNING} job group.
     *
     * @param dataManifests data manifests to read
     * @param withColumnStats whether the stats columns are projected when reading the manifests
     * @return one iterable of matching data files per RDD partition
     */
    protected Iterable<CloseableIterable<DataFile>> planDataRemotely(
            List<ManifestFile> dataManifests, boolean withColumnStats) {
        JobGroupInfo info = new JobGroupInfo(DATA_PLANNING_JOB_GROUP_ID, jobDesc("data"));
        return withJobGroupInfo(info, () -> doPlanDataRemotely(dataManifests, withColumnStats));
    }

    private Iterable<CloseableIterable<DataFile>> doPlanDataRemotely(
            List<ManifestFile> dataManifests, boolean withColumnStats) {
        scanMetrics().scannedDataManifests().increment(dataManifests.size());

        // One partition per manifest.
        JavaRDD<DataFile> dataFileRDD =
                sparkContext
                        .parallelize(toBeans(dataManifests), dataManifests.size())
                        .flatMap(new ReadDataManifest(tableBroadcast(), context(), withColumnStats));
        List<List<DataFile>> dataFileGroups = collectPartitions(dataFileRDD);

        // Skipped = live files recorded in the manifests minus the files that matched the filter.
        int matchingFilesCount = dataFileGroups.stream().mapToInt(List::size).sum();
        int skippedFilesCount = liveFilesCount(dataManifests) - matchingFilesCount;
        scanMetrics().skippedDataFiles().increment(skippedFilesCount);

        // The groups are already materialized lists, so there is nothing to close.
        return Iterables.transform(dataFileGroups, CloseableIterable::withNoopClose);
    }

    /** Returns the delete planning mode reported by the read configuration. */
    protected PlanningMode deletePlanningMode() {
        return readConf.deletePlanningMode();
    }

    /**
     * Builds a delete file index by reading the given manifests in a Spark job tagged with the
     * {@code DELETE-PLANNING} job group.
     *
     * @param deleteManifests delete manifests to read
     * @return an index over the delete files that matched the scan filter
     */
    protected DeleteFileIndex planDeletesRemotely(List<ManifestFile> deleteManifests) {
        JobGroupInfo info = new JobGroupInfo(DELETE_PLANNING_JOB_GROUP_ID, jobDesc("deletes"));
        return withJobGroupInfo(info, () -> doPlanDeletesRemotely(deleteManifests));
    }

    private DeleteFileIndex doPlanDeletesRemotely(List<ManifestFile> deleteManifests) {
        scanMetrics().scannedDeleteManifests().increment(deleteManifests.size());

        // One partition per manifest; all matching delete files are collected to the driver.
        List<DeleteFile> deleteFiles =
                sparkContext
                        .parallelize(toBeans(deleteManifests), deleteManifests.size())
                        .flatMap(new ReadDeleteManifest(tableBroadcast(), context()))
                        .collect();

        int skippedFilesCount = liveFilesCount(deleteManifests) - deleteFiles.size();
        scanMetrics().skippedDeleteFiles().increment(skippedFilesCount);

        return DeleteFileIndex.builderFor(deleteFiles)
                .specsById(table().specs())
                .caseSensitive(isCaseSensitive())
                .scanMetrics(scanMetrics())
                .build();
    }

    /** Runs the supplier through {@link JobGroupUtils#withJobGroupInfo} on this scan's context. */
    private <T> T withJobGroupInfo(JobGroupInfo info, Supplier<T> supplier) {
        return JobGroupUtils.withJobGroupInfo(sparkContext, info, supplier);
    }

    /** Builds the job description, e.g. {@code Planning data (snapshot_id=1) for db.tbl}. */
    private String jobDesc(String type) {
        List<String> options = Lists.newArrayList();
        options.add("snapshot_id=" + snapshot().snapshotId());
        String optionsAsString = COMMA.join(options);
        return String.format("Planning %s (%s) for %s", type, optionsAsString, table().name());
    }

    /** Converts manifests into {@link ManifestFileBean}s so they can be parallelized. */
    private List<ManifestFileBean> toBeans(List<ManifestFile> manifests) {
        return manifests.stream().map(ManifestFileBean::fromManifest).collect(Collectors.toList());
    }

    /** Returns the table broadcast, creating it from a serializable copy on first call. */
    private Broadcast<Table> tableBroadcast() {
        if (tableBroadcast == null) {
            Table serializableTable = SerializableTableWithSize.copyOf(table());
            tableBroadcast = sparkContext.broadcast(serializableTable);
        }
        return tableBroadcast;
    }

    /** Collects every partition of the RDD and returns one list per partition. */
    private <T> List<List<T>> collectPartitions(JavaRDD<T> rdd) {
        int[] partitionIds = IntStream.range(0, rdd.getNumPartitions()).toArray();
        return Arrays.asList(rdd.collectPartitions(partitionIds));
    }

    /** Sums the live file counts of all given manifests. */
    private int liveFilesCount(List<ManifestFile> manifests) {
        return manifests.stream().mapToInt(this::liveFilesCount).sum();
    }

    /** Returns the manifest's existing file count plus its added file count. */
    private int liveFilesCount(ManifestFile manifest) {
        return manifest.existingFilesCount() + manifest.addedFilesCount();
    }

    /**
     * Builds the initial scan context: carries the table's metrics reporter when the table is a
     * {@link BaseTable}, otherwise an empty context.
     */
    private static TableScanContext newTableScanContext(Table table) {
        if (table instanceof BaseTable) {
            MetricsReporter reporter = ((BaseTable) table).reporter();
            return ImmutableTableScanContext.builder().metricsReporter(reporter).build();
        }
        return TableScanContext.empty();
    }

    /**
     * Spark function that reads one delete manifest and emits the delete files matching the scan's
     * row filter. Only the filter and case-sensitivity flag are captured from the scan context.
     */
    private static class ReadDeleteManifest
            implements FlatMapFunction<ManifestFileBean, DeleteFile> {
        private final Broadcast<Table> table;
        private final Expression filter;
        private final boolean isCaseSensitive;

        ReadDeleteManifest(Broadcast<Table> table, TableScanContext context) {
            this.table = table;
            this.filter = context.rowFilter();
            this.isCaseSensitive = context.caseSensitive();
        }

        public Iterator<DeleteFile> call(ManifestFileBean manifest) throws Exception {
            Table broadcastTable = table.value();
            // NOTE(review): ClosingIterator presumably closes the manifest reader's iterator once
            // it is exhausted - confirm against org.apache.iceberg.io.ClosingIterator.
            return new ClosingIterator<>(
                    ManifestFiles.readDeleteManifest(
                                    manifest, broadcastTable.io(), broadcastTable.specs())
                            .select(BaseScan.DELETE_SCAN_WITH_STATS_COLUMNS)
                            .filterRows(filter)
                            .caseSensitive(isCaseSensitive)
                            .iterator());
        }
    }

    /**
     * Spark function that reads one data manifest and emits the data files matching the scan's row
     * filter, projecting the stats columns only when requested.
     */
    private static class ReadDataManifest
            implements FlatMapFunction<ManifestFileBean, DataFile> {
        private final Broadcast<Table> table;
        private final Expression filter;
        private final boolean withStats;
        private final boolean isCaseSensitive;

        ReadDataManifest(Broadcast<Table> table, TableScanContext context, boolean withStats) {
            this.table = table;
            this.filter = context.rowFilter();
            this.withStats = withStats;
            this.isCaseSensitive = context.caseSensitive();
        }

        public Iterator<DataFile> call(ManifestFileBean manifest) throws Exception {
            Table broadcastTable = table.value();
            // NOTE(review): ClosingIterator presumably closes the manifest reader's iterator once
            // it is exhausted - confirm against org.apache.iceberg.io.ClosingIterator.
            return new ClosingIterator<>(
                    ManifestFiles.read(manifest, broadcastTable.io(), broadcastTable.specs())
                            .select(withStats ? BaseScan.SCAN_WITH_STATS_COLUMNS : BaseScan.SCAN_COLUMNS)
                            .filterRows(filter)
                            .caseSensitive(isCaseSensitive)
                            .iterator());
        }
    }
}

