/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.PredicateConverter;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.PartitionPredicateVisitor;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.utils.ParameterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class FlinkTableSource
implements ScanTableSource,
SupportsFilterPushDown,
SupportsProjectionPushDown,
SupportsLimitPushDown {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkTableSource.class);
    protected static final String FLINK_INFER_SCAN_PARALLELISM = String.format("%s%s", "paimon.", FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key());
    protected final Table table;
    protected final Options options;
    @Nullable
    protected Predicate predicate;
    @Nullable
    protected PartitionPredicate partitionPredicate;
    @Nullable
    protected int[][] projectFields;
    @Nullable
    protected Long limit;
    protected SplitStatistics splitStatistics;

    public FlinkTableSource(Table table) {
        this(table, null, null, null);
    }

    public FlinkTableSource(Table table, @Nullable Predicate predicate, @Nullable int[][] projectFields, @Nullable Long limit) {
        this.table = table;
        this.options = Options.fromMap(table.options());
        this.partitionPredicate = this.getPartitionPredicateWithOptions();
        this.predicate = predicate;
        this.projectFields = projectFields;
        this.limit = limit;
    }

    public SupportsFilterPushDown.Result applyFilters(List<ResolvedExpression> filters) {
        List<String> partitionKeys = this.table.partitionKeys();
        RowType rowType = LogicalTypeConversion.toLogicalType(this.table.rowType());
        ArrayList<ResolvedExpression> unConsumedFilters = new ArrayList<ResolvedExpression>();
        ArrayList<ResolvedExpression> consumedFilters = new ArrayList<ResolvedExpression>();
        ArrayList<Predicate> converted = new ArrayList<Predicate>();
        PartitionPredicateVisitor onlyPartFieldsVisitor = new PartitionPredicateVisitor(partitionKeys);
        for (ResolvedExpression filter : filters) {
            Optional<Predicate> predicateOptional = PredicateConverter.convert(rowType, filter);
            if (!predicateOptional.isPresent()) {
                unConsumedFilters.add(filter);
                continue;
            }
            Predicate p = predicateOptional.get();
            if (this.isUnbounded() || !p.visit(onlyPartFieldsVisitor).booleanValue()) {
                unConsumedFilters.add(filter);
            } else {
                consumedFilters.add(filter);
            }
            converted.add(p);
        }
        this.predicate = converted.isEmpty() ? null : PredicateBuilder.and(converted);
        LOG.info("Consumed filters: {} of {}", consumedFilters, filters);
        return SupportsFilterPushDown.Result.of(filters, unConsumedFilters);
    }

    private PartitionPredicate getPartitionPredicateWithOptions() {
        if (this.options.contains(FlinkConnectorOptions.SCAN_PARTITIONS)) {
            try {
                Predicate predicate = PartitionPredicate.createPartitionPredicate(ParameterUtils.getPartitions(this.options.get(FlinkConnectorOptions.SCAN_PARTITIONS).split(";")), this.table.rowType(), this.options.get(CoreOptions.PARTITION_DEFAULT_NAME));
                Predicate transformed = PredicateBuilder.transformFieldMapping(predicate, PredicateBuilder.fieldIdxToPartitionIdx(this.table.rowType(), this.table.partitionKeys())).orElseThrow(() -> new RuntimeException("Failed to transform the partition predicate " + predicate));
                return PartitionPredicate.fromPredicate(this.table.rowType().project(this.table.partitionKeys()), transformed);
            }
            catch (IllegalArgumentException e) {
                return null;
            }
        }
        return null;
    }

    public boolean supportsNestedProjection() {
        return true;
    }

    public void applyProjection(int[][] projectedFields, DataType producedDataType) {
        this.projectFields = projectedFields;
    }

    public void applyLimit(long limit) {
        this.limit = limit;
    }

    public abstract boolean isUnbounded();

    @Nullable
    protected Integer inferSourceParallelism(StreamExecutionEnvironment env) {
        Integer parallelism;
        Configuration envConfig = (Configuration)env.getConfiguration();
        if (envConfig.containsKey(FLINK_INFER_SCAN_PARALLELISM)) {
            this.options.set(FlinkConnectorOptions.INFER_SCAN_PARALLELISM, Boolean.parseBoolean((String)envConfig.toMap().get(FLINK_INFER_SCAN_PARALLELISM)));
        }
        if ((parallelism = this.options.get(FlinkConnectorOptions.SCAN_PARALLELISM)) == null && env.getParallelism() == -1 && this.options.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM).booleanValue()) {
            if (this.isUnbounded()) {
                if (this.options.get(CoreOptions.BUCKET) == -1) {
                    return null;
                }
                parallelism = Math.max(1, this.options.get(CoreOptions.BUCKET));
            } else {
                this.scanSplitsForInference();
                parallelism = this.splitStatistics.splitNumber();
                if (null != this.limit && this.limit > 0L) {
                    int limitCount = this.limit >= Integer.MAX_VALUE ? Integer.MAX_VALUE : this.limit.intValue();
                    parallelism = Math.min(parallelism, limitCount);
                }
                parallelism = Math.max(1, parallelism);
                parallelism = Math.min(parallelism, this.options.get(FlinkConnectorOptions.INFER_SCAN_MAX_PARALLELISM));
            }
        }
        return parallelism;
    }

    protected void scanSplitsForInference() {
        if (this.splitStatistics == null) {
            if (this.table instanceof DataTable) {
                List<PartitionEntry> partitionEntries = this.table.newReadBuilder().dropStats().withFilter(this.predicate).withPartitionFilter(this.partitionPredicate).newScan().listPartitionEntries();
                long totalSize = 0L;
                long rowCount = 0L;
                for (PartitionEntry entry : partitionEntries) {
                    totalSize += entry.fileSizeInBytes();
                    rowCount += entry.recordCount();
                }
                long splitTargetSize = ((DataTable)this.table).coreOptions().splitTargetSize();
                this.splitStatistics = new SplitStatistics((int)(totalSize / splitTargetSize + 1L), rowCount);
            } else {
                List<Split> splits = this.table.newReadBuilder().dropStats().withFilter(this.predicate).withPartitionFilter(this.partitionPredicate).withProjection(new int[0]).newScan().plan().splits();
                this.splitStatistics = new SplitStatistics(splits.size(), splits.stream().mapToLong(Split::rowCount).sum());
            }
        }
    }

    public Table getTable() {
        return this.table;
    }

    protected static class SplitStatistics {
        private final int splitNumber;
        private final long totalRowCount;

        protected SplitStatistics(int splitNumber, long totalRowCount) {
            this.splitNumber = splitNumber;
            this.totalRowCount = totalRowCount;
        }

        public int splitNumber() {
            return this.splitNumber;
        }

        public long totalRowCount() {
            return this.totalRowCount;
        }
    }
}

