/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.source;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkFilters;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.source.SparkBatchQueryScan;
import org.apache.iceberg.spark.source.SparkCopyOnWriteScan;
import org.apache.iceberg.spark.source.SparkScan;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.read.Statistics;
import org.apache.spark.sql.connector.read.SupportsPushDownFilters;
import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
import org.apache.spark.sql.connector.read.SupportsReportStatistics;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

/**
 * Builds Spark {@link Scan}s over an Iceberg {@link Table}.
 *
 * <p>The builder accumulates three pieces of state before a scan is built: the filters that
 * could be converted and bound to the schema ({@link #pushFilters(Filter[])}), the pruned
 * projection ({@link #pruneColumns(StructType)}) and the names of any requested metadata
 * columns. The {@code build*} methods turn that state into a configured {@link TableScan}.
 *
 * <p>Instances hold mutable state and perform no synchronization.
 */
public class SparkScanBuilder
        implements ScanBuilder,
        SupportsPushDownFilters,
        SupportsPushDownRequiredColumns,
        SupportsReportStatistics {
    private static final Filter[] NO_FILTERS = new Filter[0];

    // Read option names used in the validation messages and when pinning a snapshot.
    private static final String SNAPSHOT_ID = "snapshot-id";
    private static final String AS_OF_TIMESTAMP = "as-of-timestamp";
    private static final String START_SNAPSHOT_ID = "start-snapshot-id";
    private static final String END_SNAPSHOT_ID = "end-snapshot-id";

    private final SparkSession spark;
    private final Table table;
    private final CaseInsensitiveStringMap options;
    private final SparkReadConf readConf;
    // Names of metadata columns requested through pruneColumns; may contain duplicates across calls.
    private final List<String> metaColumns = Lists.newArrayList();
    private Schema schema;
    private boolean caseSensitive;
    // Stays null until pushFilters is called; the scans receive it as-is.
    private List<Expression> filterExpressions = null;
    private Filter[] pushedFilters = NO_FILTERS;

    /**
     * Creates a builder that reads {@code table} using an explicit starting schema.
     *
     * @param spark the active Spark session
     * @param table the table to scan
     * @param schema the schema the projection starts from
     * @param options read options; also used to derive the initial case sensitivity
     */
    SparkScanBuilder(SparkSession spark, Table table, Schema schema, CaseInsensitiveStringMap options) {
        this.spark = spark;
        this.table = table;
        this.schema = schema;
        this.options = options;
        this.readConf = new SparkReadConf(spark, table, options);
        this.caseSensitive = this.readConf.caseSensitive();
    }

    /**
     * Creates a builder that starts from the table's own schema.
     *
     * @param spark the active Spark session
     * @param table the table to scan
     * @param options read options
     */
    SparkScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
        this(spark, table, table.schema(), options);
    }

    /**
     * Combines all pushed filter expressions into a single conjunction.
     *
     * @return the AND of every pushed expression, or always-true when nothing was pushed
     */
    private Expression filterExpression() {
        if (this.filterExpressions != null) {
            return this.filterExpressions.stream().reduce(Expressions.alwaysTrue(), Expressions::and);
        }
        return Expressions.alwaysTrue();
    }

    /**
     * Overrides the case sensitivity taken from the read configuration.
     *
     * @param isCaseSensitive whether column names are resolved case sensitively
     * @return this builder, for chaining
     */
    public SparkScanBuilder caseSensitive(boolean isCaseSensitive) {
        this.caseSensitive = isCaseSensitive;
        return this;
    }

    /**
     * Records the subset of {@code filters} that can be handed to the table scan.
     *
     * <p>A filter is recorded only when it converts to an Iceberg expression and that expression
     * binds to the current schema. Filters that fail either step are skipped silently. Each call
     * replaces the result of any previous call.
     *
     * @param filters the filters offered by Spark
     * @return the input array unchanged, so that Spark still evaluates every filter itself
     */
    public Filter[] pushFilters(Filter[] filters) {
        List<Expression> expressions = Lists.newArrayListWithExpectedSize(filters.length);
        List<Filter> pushed = Lists.newArrayListWithExpectedSize(filters.length);
        for (Filter filter : filters) {
            Expression expr = null;
            try {
                expr = SparkFilters.convert(filter);
            } catch (IllegalArgumentException ignored) {
                // Conversion failed: leave expr null so this filter is simply not pushed down.
            }
            if (expr == null) {
                continue;
            }
            try {
                // The bound result is discarded; binding is only used to validate the expression.
                Binder.bind(this.schema.asStruct(), expr, this.caseSensitive);
                expressions.add(expr);
                pushed.add(filter);
            } catch (ValidationException ignored) {
                // Binding to the schema failed: this filter cannot be pushed down, so skip it.
            }
        }
        this.filterExpressions = expressions;
        this.pushedFilters = pushed.toArray(new Filter[0]);
        return filters;
    }

    /**
     * Returns the filters recorded by the last {@link #pushFilters(Filter[])} call.
     *
     * @return the pushed filters, or an empty array when none were pushed
     */
    public Filter[] pushedFilters() {
        return this.pushedFilters;
    }

    /**
     * Narrows the projection to the requested columns and remembers requested metadata columns.
     *
     * <p>Metadata columns are excluded from the projection passed to the pruning helper; their
     * names are collected separately and re-attached when a scan is built.
     *
     * @param requestedSchema the columns Spark asks for
     */
    public void pruneColumns(StructType requestedSchema) {
        StructField[] dataFields = Stream.of(requestedSchema.fields())
                .filter(field -> MetadataColumns.nonMetadataColumn(field.name()))
                .toArray(StructField[]::new);
        StructType requestedProjection = new StructType(dataFields);
        // The pushed filter is passed along with the projection; presumably so that columns
        // referenced only by filters survive pruning (confirm against SparkSchemaUtil.prune).
        this.schema = SparkSchemaUtil.prune(
                this.schema, requestedProjection, filterExpression(), this.caseSensitive);
        Stream.of(requestedSchema.fields())
                .map(StructField::name)
                .filter(MetadataColumns::isMetadataColumn)
                .distinct()
                .forEach(this.metaColumns::add);
    }

    /**
     * Joins the current projection with the requested metadata columns.
     *
     * @return the projection followed by one field per distinct requested metadata column
     */
    private Schema schemaWithMetadataColumns() {
        List<Types.NestedField> fields = this.metaColumns.stream()
                .distinct()
                .map(name -> MetadataColumns.metadataColumn(this.table, name))
                .collect(Collectors.toList());
        Schema meta = new Schema(fields);
        return TypeUtil.join(this.schema, meta);
    }

    /**
     * Builds a batch query scan, honoring the snapshot selection and incremental read options.
     *
     * @return the configured scan
     * @throws IllegalArgumentException if the snapshot options conflict: both snapshot id and
     *     timestamp are set, incremental options are combined with either of them, or only the
     *     end snapshot id is set
     */
    public Scan build() {
        Long snapshotId = this.readConf.snapshotId();
        Long asOfTimestamp = this.readConf.asOfTimestamp();
        Preconditions.checkArgument(snapshotId == null || asOfTimestamp == null,
                "Cannot set both %s and %s to select which table snapshot to scan",
                SNAPSHOT_ID, AS_OF_TIMESTAMP);

        Long startSnapshotId = this.readConf.startSnapshotId();
        Long endSnapshotId = this.readConf.endSnapshotId();
        if (snapshotId != null || asOfTimestamp != null) {
            Preconditions.checkArgument(startSnapshotId == null && endSnapshotId == null,
                    "Cannot set %s and %s for incremental scans when either %s or %s is set",
                    START_SNAPSHOT_ID, END_SNAPSHOT_ID, SNAPSHOT_ID, AS_OF_TIMESTAMP);
        }
        Preconditions.checkArgument(startSnapshotId != null || endSnapshotId == null,
                "Cannot set only %s for incremental scans. Please, set %s too.",
                END_SNAPSHOT_ID, START_SNAPSHOT_ID);

        Schema expectedSchema = schemaWithMetadataColumns();
        TableScan scan = this.table.newScan()
                .caseSensitive(this.caseSensitive)
                .filter(filterExpression())
                .project(expectedSchema);
        if (snapshotId != null) {
            scan = scan.useSnapshot(snapshotId);
        }
        if (asOfTimestamp != null) {
            scan = scan.asOfTime(asOfTimestamp);
        }
        if (startSnapshotId != null) {
            scan = endSnapshotId != null
                    ? scan.appendsBetween(startSnapshotId, endSnapshotId)
                    : scan.appendsAfter(startSnapshotId);
        }
        scan = configureSplitPlanning(scan);
        return new SparkBatchQueryScan(
                this.spark, this.table, scan, this.readConf, expectedSchema, this.filterExpressions);
    }

    /**
     * Builds a scan for merge-on-read row-level commands, pinned to the table's current snapshot.
     *
     * <p>When the table has no current snapshot, the returned scan wraps a {@code null} table scan.
     * Otherwise the current snapshot id is written into a copy of the read options and the scan
     * is created against that snapshot.
     *
     * @return the configured scan
     * @throws IllegalArgumentException if time travel or incremental read options are set
     */
    public Scan buildMergeOnReadScan() {
        Preconditions.checkArgument(
                this.readConf.snapshotId() == null && this.readConf.asOfTimestamp() == null,
                "Cannot set time travel options %s and %s for row-level command scans",
                SNAPSHOT_ID, AS_OF_TIMESTAMP);
        Preconditions.checkArgument(
                this.readConf.startSnapshotId() == null && this.readConf.endSnapshotId() == null,
                "Cannot set incremental scan options %s and %s for row-level command scans",
                START_SNAPSHOT_ID, END_SNAPSHOT_ID);

        Snapshot snapshot = this.table.currentSnapshot();
        if (snapshot == null) {
            return new SparkBatchQueryScan(
                    this.spark, this.table, null, this.readConf,
                    schemaWithMetadataColumns(), this.filterExpressions);
        }

        long snapshotId = snapshot.snapshotId();
        CaseInsensitiveStringMap adjustedOptions =
                Spark3Util.setOption(SNAPSHOT_ID, Long.toString(snapshotId), this.options);
        SparkReadConf adjustedReadConf = new SparkReadConf(this.spark, this.table, adjustedOptions);
        Schema expectedSchema = schemaWithMetadataColumns();
        TableScan scan = this.table.newScan()
                .useSnapshot(snapshotId)
                .caseSensitive(this.caseSensitive)
                .filter(filterExpression())
                .project(expectedSchema);
        scan = configureSplitPlanning(scan);
        return new SparkBatchQueryScan(
                this.spark, this.table, scan, adjustedReadConf, expectedSchema, this.filterExpressions);
    }

    /**
     * Builds a scan for copy-on-write row-level commands, pinned to the table's current snapshot.
     *
     * <p>When the table has no current snapshot, the scan is created without a table scan or
     * snapshot. Otherwise the table scan is created against the current snapshot with
     * {@code ignoreResiduals()} applied.
     *
     * @return the configured scan
     */
    public Scan buildCopyOnWriteScan() {
        Snapshot snapshot = this.table.currentSnapshot();
        if (snapshot == null) {
            return new SparkCopyOnWriteScan(
                    this.spark, this.table, this.readConf,
                    schemaWithMetadataColumns(), this.filterExpressions);
        }

        Schema expectedSchema = schemaWithMetadataColumns();
        TableScan scan = this.table.newScan()
                .useSnapshot(snapshot.snapshotId())
                .ignoreResiduals()
                .caseSensitive(this.caseSensitive)
                .filter(filterExpression())
                .project(expectedSchema);
        scan = configureSplitPlanning(scan);
        return new SparkCopyOnWriteScan(
                this.spark, this.table, scan, snapshot, this.readConf,
                expectedSchema, this.filterExpressions);
    }

    /**
     * Applies the split planning options that are explicitly set in the read configuration.
     *
     * @param scan the scan to configure
     * @return the scan with each non-null split option applied; unset options are left untouched
     */
    private TableScan configureSplitPlanning(TableScan scan) {
        TableScan configuredScan = scan;

        Long splitSize = this.readConf.splitSizeOption();
        if (splitSize != null) {
            configuredScan = configuredScan.option("read.split.target-size", String.valueOf(splitSize));
        }

        Integer splitLookback = this.readConf.splitLookbackOption();
        if (splitLookback != null) {
            configuredScan = configuredScan.option("read.split.planning-lookback", String.valueOf(splitLookback));
        }

        Long splitOpenFileCost = this.readConf.splitOpenFileCostOption();
        if (splitOpenFileCost != null) {
            configuredScan = configuredScan.option("read.split.open-file-cost", String.valueOf(splitOpenFileCost));
        }

        return configuredScan;
    }

    /**
     * Estimates statistics by building a scan from the current state and delegating to it.
     *
     * @return the statistics reported by the built scan
     */
    public Statistics estimateStatistics() {
        // Every call builds a fresh scan; build() is expected to return a SparkScan here.
        return ((SparkScan) build()).estimateStatistics();
    }

    /**
     * Returns the read schema of a scan built from the current state.
     *
     * @return the schema reported by the built scan
     */
    public StructType readSchema() {
        return build().readSchema();
    }
}

