/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.source;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Scan;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ExpressionUtil;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.expressions.True;
import org.apache.iceberg.metrics.ScanReport;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkV2Filters;
import org.apache.iceberg.spark.source.SparkPartitioningAwareScan;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.filter.Predicate;
import org.apache.spark.sql.connector.read.Statistics;
import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Batch scan over {@link PartitionScanTask}s that can be narrowed after planning through Spark's
 * {@link SupportsRuntimeV2Filtering} hook.
 *
 * <p>The snapshot-selection options ({@code snapshotId}, {@code startSnapshotId},
 * {@code endSnapshotId}, {@code asOfTimestamp}, {@code tag}) are captured from the
 * {@link SparkReadConf} at construction time. Together with the table name, branch, read schema,
 * planning filters and any runtime filters applied so far, they define the identity of the scan
 * for {@link #equals(Object)} and {@link #hashCode()}.
 */
class SparkBatchQueryScan extends SparkPartitioningAwareScan<PartitionScanTask>
        implements SupportsRuntimeV2Filtering {
    private static final Logger LOG = LoggerFactory.getLogger(SparkBatchQueryScan.class);

    private final Long snapshotId;
    private final Long startSnapshotId;
    private final Long endSnapshotId;
    private final Long asOfTimestamp;
    private final String tag;
    // Runtime filters that were actually applied; appended to by filter(Predicate[]).
    private final List<Expression> runtimeFilterExpressions;

    /**
     * Creates the scan, forwarding all arguments to the superclass and reading the snapshot
     * selection options from {@code readConf}.
     */
    SparkBatchQueryScan(SparkSession spark, Table table, Scan<?, ? extends ScanTask, ? extends ScanTaskGroup<?>> scan, SparkReadConf readConf, Schema expectedSchema, List<Expression> filters, Supplier<ScanReport> scanReportSupplier) {
        super(spark, table, scan, readConf, expectedSchema, filters, scanReportSupplier);
        this.snapshotId = readConf.snapshotId();
        this.startSnapshotId = readConf.startSnapshotId();
        this.endSnapshotId = readConf.endSnapshotId();
        this.asOfTimestamp = readConf.asOfTimestamp();
        this.tag = readConf.tag();
        this.runtimeFilterExpressions = Lists.newArrayList();
    }

    /** Returns the snapshot id taken from the read configuration; may be {@code null}. */
    Long snapshotId() {
        return snapshotId;
    }

    @Override
    protected Class<PartitionScanTask> taskJavaClass() {
        return PartitionScanTask.class;
    }

    /**
     * Reports the columns Spark may use for runtime filtering.
     *
     * @return one reference per distinct partition source column (across every spec returned by
     *     {@code specs()}) that is also present in the expected schema
     */
    @Override
    public NamedReference[] filterAttributes() {
        // Collect source column ids of all partition fields, de-duplicated across specs.
        Set<Integer> partitionFieldSourceIds = Sets.newHashSet();
        for (PartitionSpec spec : specs()) {
            for (PartitionField field : spec.fields()) {
                partitionFieldSourceIds.add(field.sourceId());
            }
        }

        Map<Integer, String> quotedNameById = SparkSchemaUtil.indexQuotedNameById(expectedSchema());

        // Only source columns that can be resolved in the expected schema are reported.
        return partitionFieldSourceIds.stream()
                .filter(fieldId -> expectedSchema().findField(fieldId) != null)
                .map(fieldId -> Spark3Util.toNamedReference(quotedNameById.get(fieldId)))
                .toArray(NamedReference[]::new);
    }

    /**
     * Applies runtime predicates to the already planned tasks.
     *
     * <p>The predicates are combined into a single expression. If that expression is anything
     * other than the always-true constant, it is projected inclusively onto each partition spec
     * and every task whose partition does not match the projection for its spec is dropped.
     *
     * @param predicates runtime predicates supplied by Spark
     */
    @Override
    public void filter(Predicate[] predicates) {
        Expression runtimeFilterExpr = convertRuntimeFilters(predicates);

        // Identity comparison against the always-true constant: nothing usable was converted.
        if (runtimeFilterExpr == Expressions.alwaysTrue()) {
            return;
        }

        Map<Integer, Evaluator> evaluatorsBySpecId = Maps.newHashMap();
        for (PartitionSpec spec : specs()) {
            Expression inclusiveExpr =
                    Projections.inclusive(spec, caseSensitive()).project(runtimeFilterExpr);
            evaluatorsBySpecId.put(spec.specId(), new Evaluator(spec.partitionType(), inclusiveExpr));
        }

        List<PartitionScanTask> filteredTasks = tasks().stream()
                .filter(task -> {
                    Evaluator evaluator = evaluatorsBySpecId.get(task.spec().specId());
                    return evaluator.eval(task.partition());
                })
                .collect(Collectors.toList());

        LOG.info(
                "{} of {} task(s) for table {} matched runtime filter {}",
                filteredTasks.size(),
                tasks().size(),
                table().name(),
                ExpressionUtil.toSanitizedString(runtimeFilterExpr));

        // Replace the task list only when the filter actually removed something.
        if (filteredTasks.size() < tasks().size()) {
            resetTasks(filteredTasks);
        }

        // Remember the applied filter; it takes part in equals/hashCode/toString.
        runtimeFilterExpressions.add(runtimeFilterExpr);
    }

    /**
     * Converts Spark predicates into a single conjunction of Iceberg expressions.
     *
     * <p>Predicates that cannot be converted, or whose converted form fails to bind to the
     * expected schema, are logged and skipped.
     *
     * @param predicates runtime predicates supplied by Spark
     * @return the conjunction of all usable predicates, or the always-true constant if there are
     *     none
     */
    private Expression convertRuntimeFilters(Predicate[] predicates) {
        // Must be typed as Expression: it is reassigned to the result of Expressions.and(...).
        Expression runtimeFilterExpr = Expressions.alwaysTrue();

        for (Predicate predicate : predicates) {
            Expression expr = SparkV2Filters.convert(predicate);
            if (expr != null) {
                try {
                    // Binding is used purely as validation; the bound result is discarded.
                    Binder.bind(expectedSchema().asStruct(), expr, caseSensitive());
                    runtimeFilterExpr = Expressions.and(runtimeFilterExpr, expr);
                } catch (ValidationException e) {
                    LOG.warn("Failed to bind {} to expected schema, skipping runtime filter", expr, e);
                }
            } else {
                LOG.warn("Unsupported runtime filter {}", predicate);
            }
        }

        return runtimeFilterExpr;
    }

    /**
     * Estimates statistics for the snapshot this scan reads.
     *
     * <p>The snapshot is chosen by the first matching rule: no scan (a {@code null} snapshot is
     * passed on), explicit snapshot id, as-of timestamp, branch, tag, and finally the table's
     * current snapshot.
     */
    @Override
    public Statistics estimateStatistics() {
        if (scan() == null) {
            return estimateStatistics(null);
        }

        Snapshot snapshot;
        if (snapshotId != null) {
            snapshot = table().snapshot(snapshotId.longValue());
        } else if (asOfTimestamp != null) {
            long snapshotIdAsOfTime = SnapshotUtil.snapshotIdAsOfTime(table(), asOfTimestamp.longValue());
            snapshot = table().snapshot(snapshotIdAsOfTime);
        } else if (branch() != null) {
            snapshot = table().snapshot(branch());
        } else if (tag != null) {
            snapshot = table().snapshot(tag);
        } else {
            snapshot = table().currentSnapshot();
        }

        return estimateStatistics(snapshot);
    }

    /**
     * Two scans are equal when they are of the same class and agree on table name, branch, read
     * schema, the string forms of the planning and runtime filters, and all snapshot-selection
     * options.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        SparkBatchQueryScan that = (SparkBatchQueryScan) o;
        return table().name().equals(that.table().name())
                && Objects.equals(branch(), that.branch())
                && readSchema().equals(that.readSchema())
                && filterExpressions().toString().equals(that.filterExpressions().toString())
                && runtimeFilterExpressions.toString().equals(that.runtimeFilterExpressions.toString())
                && Objects.equals(snapshotId, that.snapshotId)
                && Objects.equals(startSnapshotId, that.startSnapshotId)
                && Objects.equals(endSnapshotId, that.endSnapshotId)
                && Objects.equals(asOfTimestamp, that.asOfTimestamp)
                && Objects.equals(tag, that.tag);
    }

    /** Hashes the same components, in the same form, that {@link #equals(Object)} compares. */
    @Override
    public int hashCode() {
        return Objects.hash(
                table().name(),
                branch(),
                readSchema(),
                filterExpressions().toString(),
                runtimeFilterExpressions.toString(),
                snapshotId,
                startSnapshotId,
                endSnapshotId,
                asOfTimestamp,
                tag);
    }

    @Override
    public String toString() {
        return String.format(
                "IcebergScan(table=%s, branch=%s, type=%s, filters=%s, runtimeFilters=%s, caseSensitive=%s)",
                table(),
                branch(),
                expectedSchema().asStruct(),
                filterExpressions(),
                runtimeFilterExpressions,
                caseSensitive());
    }
}

