/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.operation;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinTask;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestEntrySerializer;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.ScanKind;
import org.apache.paimon.predicate.BucketSelector;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.FieldStatsArraySerializer;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.FileUtils;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.apache.paimon.utils.SnapshotManager;

public abstract class AbstractFileStoreScan
implements FileStoreScan {
    private final FieldStatsArraySerializer partitionStatsConverter;
    private final RowDataToObjectArrayConverter partitionConverter;
    protected final RowType bucketKeyType;
    private final SnapshotManager snapshotManager;
    private final ManifestFile.Factory manifestFileFactory;
    private final ManifestList manifestList;
    private final int numOfBuckets;
    private final boolean checkNumOfBuckets;
    private final ConcurrentMap<Long, TableSchema> tableSchemas;
    private final SchemaManager schemaManager;
    private Predicate partitionFilter;
    private BucketSelector bucketSelector;
    private Long specifiedSnapshotId = null;
    private Integer specifiedBucket = null;
    private List<ManifestFileMeta> specifiedManifests = null;
    private ScanKind scanKind = ScanKind.ALL;
    private Filter<Integer> levelFilter = null;
    private ManifestCacheFilter manifestCacheFilter = null;

    public AbstractFileStoreScan(RowType partitionType, RowType bucketKeyType, SnapshotManager snapshotManager, SchemaManager schemaManager, ManifestFile.Factory manifestFileFactory, ManifestList.Factory manifestListFactory, int numOfBuckets, boolean checkNumOfBuckets) {
        this.partitionStatsConverter = new FieldStatsArraySerializer(partitionType);
        this.partitionConverter = new RowDataToObjectArrayConverter(partitionType);
        Preconditions.checkArgument(bucketKeyType.getFieldCount() > 0, "The bucket keys should not be empty.");
        this.bucketKeyType = bucketKeyType;
        this.snapshotManager = snapshotManager;
        this.schemaManager = schemaManager;
        this.manifestFileFactory = manifestFileFactory;
        this.manifestList = manifestListFactory.create();
        this.numOfBuckets = numOfBuckets;
        this.checkNumOfBuckets = checkNumOfBuckets;
        this.tableSchemas = new ConcurrentHashMap<Long, TableSchema>();
    }

    @Override
    public FileStoreScan withPartitionFilter(Predicate predicate) {
        this.partitionFilter = predicate;
        return this;
    }

    protected FileStoreScan withBucketKeyFilter(Predicate predicate) {
        this.bucketSelector = BucketSelector.create(predicate, this.bucketKeyType).orElse(null);
        return this;
    }

    @Override
    public FileStoreScan withPartitionFilter(List<BinaryRow> partitions) {
        PredicateBuilder builder = new PredicateBuilder(this.partitionConverter.rowType());
        Function<BinaryRow, Predicate> partitionToPredicate = p -> {
            ArrayList<Predicate> fieldPredicates = new ArrayList<Predicate>();
            Object[] partitionObjects = this.partitionConverter.convert((InternalRow)p);
            for (int i = 0; i < this.partitionConverter.getArity(); ++i) {
                Object partition = partitionObjects[i];
                fieldPredicates.add(builder.equal(i, partition));
            }
            return PredicateBuilder.and(fieldPredicates);
        };
        List<Predicate> predicates = partitions.stream().filter(p -> p.getFieldCount() > 0).map(partitionToPredicate).collect(Collectors.toList());
        if (predicates.isEmpty()) {
            return this;
        }
        return this.withPartitionFilter(PredicateBuilder.or(predicates));
    }

    @Override
    public FileStoreScan withBucket(int bucket) {
        this.specifiedBucket = bucket;
        return this;
    }

    @Override
    public FileStoreScan withPartitionBucket(BinaryRow partition, int bucket) {
        if (this.manifestCacheFilter != null) {
            Preconditions.checkArgument(this.manifestCacheFilter.test(partition, bucket), String.format("This is a bug! The partition %s and bucket %s is filtered!", partition, bucket));
        }
        this.withPartitionFilter(Collections.singletonList(partition));
        this.withBucket(bucket);
        return this;
    }

    @Override
    public FileStoreScan withSnapshot(long snapshotId) {
        this.specifiedSnapshotId = snapshotId;
        if (this.specifiedManifests != null) {
            throw new IllegalStateException("Cannot set both snapshot id and manifests.");
        }
        return this;
    }

    @Override
    public FileStoreScan withManifestList(List<ManifestFileMeta> manifests) {
        this.specifiedManifests = manifests;
        if (this.specifiedSnapshotId != null) {
            throw new IllegalStateException("Cannot set both snapshot id and manifests.");
        }
        return this;
    }

    @Override
    public FileStoreScan withKind(ScanKind scanKind) {
        this.scanKind = scanKind;
        return this;
    }

    @Override
    public FileStoreScan withLevelFilter(Filter<Integer> levelFilter) {
        this.levelFilter = levelFilter;
        return this;
    }

    @Override
    public FileStoreScan withManifestCacheFilter(ManifestCacheFilter manifestFilter) {
        this.manifestCacheFilter = manifestFilter;
        return this;
    }

    @Override
    public FileStoreScan.Plan plan() {
        List entries;
        List<ManifestFileMeta> manifests = this.specifiedManifests;
        Long snapshotId = this.specifiedSnapshotId;
        if (manifests == null) {
            if (snapshotId == null) {
                snapshotId = this.snapshotManager.latestSnapshotId();
            }
            if (snapshotId == null) {
                manifests = Collections.emptyList();
            } else {
                Snapshot snapshot = this.snapshotManager.snapshot(snapshotId);
                manifests = this.readManifests(snapshot);
            }
        }
        final Long readSnapshot = snapshotId;
        List<ManifestFileMeta> readManifests = manifests;
        try {
            entries = (List)((ForkJoinTask)FileUtils.COMMON_IO_FORK_JOIN_POOL.submit(() -> readManifests.parallelStream().filter(this::filterManifestFileMeta).flatMap(m -> this.readManifestFileMeta((ManifestFileMeta)m).stream()).filter(this::filterByStats).collect(Collectors.toList()))).get();
        }
        catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException("Failed to read ManifestEntry list concurrently", e);
        }
        final ArrayList<ManifestEntry> files = new ArrayList<ManifestEntry>();
        for (ManifestEntry file : ManifestEntry.mergeEntries(entries)) {
            if (this.checkNumOfBuckets && file.totalBuckets() != this.numOfBuckets) {
                String partInfo = this.partitionConverter.getArity() > 0 ? "partition " + FileStorePathFactory.getPartitionComputer(this.partitionConverter.rowType(), FileStorePathFactory.PARTITION_DEFAULT_NAME.defaultValue()).generatePartValues(file.partition()) : "table";
                throw new RuntimeException(String.format("Try to write %s with a new bucket num %d, but the previous bucket num is %d. Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.", partInfo, this.numOfBuckets, file.totalBuckets()));
            }
            if (!this.filterByBucket(file) || !this.filterByBucketSelector(file) || !this.filterByLevel(file)) continue;
            files.add(file);
        }
        return new FileStoreScan.Plan(){

            @Override
            @Nullable
            public Long snapshotId() {
                return readSnapshot;
            }

            @Override
            public List<ManifestEntry> files() {
                return files;
            }
        };
    }

    private List<ManifestFileMeta> readManifests(Snapshot snapshot) {
        switch (this.scanKind) {
            case ALL: {
                return snapshot.dataManifests(this.manifestList);
            }
            case DELTA: {
                return snapshot.deltaManifests(this.manifestList);
            }
            case CHANGELOG: {
                if (snapshot.version() > 1) {
                    return snapshot.changelogManifests(this.manifestList);
                }
                if (snapshot.commitKind() == Snapshot.CommitKind.APPEND) {
                    return snapshot.deltaManifests(this.manifestList);
                }
                throw new IllegalStateException(String.format("Incremental scan does not accept %s snapshot", new Object[]{snapshot.commitKind()}));
            }
        }
        throw new UnsupportedOperationException("Unknown scan kind " + this.scanKind.name());
    }

    protected TableSchema scanTableSchema(long id) {
        return this.tableSchemas.computeIfAbsent(id, key -> this.schemaManager.schema(id));
    }

    private boolean filterManifestFileMeta(ManifestFileMeta manifest) {
        return this.partitionFilter == null || this.partitionFilter.test(manifest.numAddedFiles() + manifest.numDeletedFiles(), manifest.partitionStats().fields(this.partitionStatsConverter));
    }

    private boolean filterByBucket(ManifestEntry entry) {
        return this.specifiedBucket == null || entry.bucket() == this.specifiedBucket.intValue();
    }

    private boolean filterByBucketSelector(ManifestEntry entry) {
        return this.bucketSelector == null || this.bucketSelector.select(entry.bucket(), entry.totalBuckets());
    }

    private boolean filterByLevel(ManifestEntry entry) {
        return this.levelFilter == null || this.levelFilter.test(entry.file().level());
    }

    protected abstract boolean filterByStats(ManifestEntry var1);

    private List<ManifestEntry> readManifestFileMeta(ManifestFileMeta manifest) {
        return this.manifestFileFactory.create().read(manifest.fileName(), this.manifestCacheRowFilter(), this.manifestEntryRowFilter());
    }

    private Filter<InternalRow> manifestEntryRowFilter() {
        Function<InternalRow, BinaryRow> partitionGetter = ManifestEntrySerializer.partitionGetter();
        Function<InternalRow, Integer> bucketGetter = ManifestEntrySerializer.bucketGetter();
        Function<InternalRow, Integer> totalBucketGetter = ManifestEntrySerializer.totalBucketGetter();
        return row -> {
            if (this.partitionFilter != null && !this.partitionFilter.test(this.partitionConverter.convert((InternalRow)partitionGetter.apply((InternalRow)row)))) {
                return false;
            }
            if (this.specifiedBucket != null && this.numOfBuckets == (Integer)totalBucketGetter.apply((InternalRow)row)) {
                return this.specifiedBucket.intValue() == ((Integer)bucketGetter.apply((InternalRow)row)).intValue();
            }
            return true;
        };
    }

    private Filter<InternalRow> manifestCacheRowFilter() {
        if (this.manifestCacheFilter == null) {
            return Filter.alwaysTrue();
        }
        Function<InternalRow, BinaryRow> partitionGetter = ManifestEntrySerializer.partitionGetter();
        Function<InternalRow, Integer> bucketGetter = ManifestEntrySerializer.bucketGetter();
        Function<InternalRow, Integer> totalBucketGetter = ManifestEntrySerializer.totalBucketGetter();
        return row -> {
            if (this.numOfBuckets != (Integer)totalBucketGetter.apply((InternalRow)row)) {
                return true;
            }
            return this.manifestCacheFilter.test((BinaryRow)partitionGetter.apply((InternalRow)row), (Integer)bucketGetter.apply((InternalRow)row));
        };
    }
}

