/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.operation;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.manifest.BucketEntry;
import org.apache.paimon.manifest.FileEntry;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestEntrySerializer;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.manifest.SimpleFileEntry;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.ManifestsReader;
import org.apache.paimon.operation.metrics.ScanMetrics;
import org.apache.paimon.operation.metrics.ScanStats;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.utils.BiFilter;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.ManifestReadThreadPool;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.ThreadPoolUtils;

public abstract class AbstractFileStoreScan
implements FileStoreScan {
    private final ManifestsReader manifestsReader;
    private final SnapshotManager snapshotManager;
    private final ManifestFile.Factory manifestFileFactory;
    private final Integer parallelism;
    private final ConcurrentMap<Long, TableSchema> tableSchemas;
    private final SchemaManager schemaManager;
    private final TableSchema schema;
    private Snapshot specifiedSnapshot = null;
    private Filter<Integer> bucketFilter = null;
    private BiFilter<Integer, Integer> totalAwareBucketFilter = null;
    protected ScanMode scanMode = ScanMode.ALL;
    private Filter<Integer> levelFilter = null;
    private Filter<ManifestEntry> manifestEntryFilter = null;
    private Filter<String> fileNameFilter = null;
    private ManifestCacheFilter manifestCacheFilter = null;
    private ScanMetrics scanMetrics = null;
    private boolean dropStats;

    public AbstractFileStoreScan(ManifestsReader manifestsReader, SnapshotManager snapshotManager, SchemaManager schemaManager, TableSchema schema, ManifestFile.Factory manifestFileFactory, @Nullable Integer parallelism) {
        this.manifestsReader = manifestsReader;
        this.snapshotManager = snapshotManager;
        this.schemaManager = schemaManager;
        this.schema = schema;
        this.manifestFileFactory = manifestFileFactory;
        this.tableSchemas = new ConcurrentHashMap<Long, TableSchema>();
        this.parallelism = parallelism;
        this.dropStats = false;
    }

    @Override
    public FileStoreScan withPartitionFilter(Predicate predicate) {
        this.manifestsReader.withPartitionFilter(predicate);
        return this;
    }

    @Override
    public FileStoreScan withPartitionFilter(List<BinaryRow> partitions) {
        this.manifestsReader.withPartitionFilter(partitions);
        return this;
    }

    @Override
    public FileStoreScan withPartitionFilter(PartitionPredicate predicate) {
        this.manifestsReader.withPartitionFilter(predicate);
        return this;
    }

    @Override
    public FileStoreScan withBucket(int bucket) {
        this.bucketFilter = i -> i == bucket;
        return this;
    }

    @Override
    public FileStoreScan withBucketFilter(Filter<Integer> bucketFilter) {
        this.bucketFilter = bucketFilter;
        return this;
    }

    @Override
    public FileStoreScan withTotalAwareBucketFilter(BiFilter<Integer, Integer> totalAwareBucketFilter) {
        this.totalAwareBucketFilter = totalAwareBucketFilter;
        return this;
    }

    @Override
    public FileStoreScan withPartitionBucket(BinaryRow partition, int bucket) {
        if (this.manifestCacheFilter != null && this.manifestFileFactory.isCacheEnabled()) {
            Preconditions.checkArgument(this.manifestCacheFilter.test(partition, bucket), String.format("This is a bug! The partition %s and bucket %s is filtered!", partition, bucket));
        }
        this.withPartitionFilter(Collections.singletonList(partition));
        this.withBucket(bucket);
        return this;
    }

    @Override
    public FileStoreScan withSnapshot(long snapshotId) {
        this.specifiedSnapshot = this.snapshotManager.snapshot(snapshotId);
        return this;
    }

    @Override
    public FileStoreScan withSnapshot(Snapshot snapshot) {
        this.specifiedSnapshot = snapshot;
        return this;
    }

    @Override
    public FileStoreScan withKind(ScanMode scanMode) {
        this.scanMode = scanMode;
        return this;
    }

    @Override
    public FileStoreScan withLevelFilter(Filter<Integer> levelFilter) {
        this.levelFilter = levelFilter;
        return this;
    }

    @Override
    public FileStoreScan enableValueFilter() {
        return this;
    }

    @Override
    public FileStoreScan withManifestEntryFilter(Filter<ManifestEntry> filter) {
        this.manifestEntryFilter = filter;
        return this;
    }

    @Override
    public FileStoreScan withManifestCacheFilter(ManifestCacheFilter manifestFilter) {
        this.manifestCacheFilter = manifestFilter;
        return this;
    }

    @Override
    public FileStoreScan withDataFileNameFilter(Filter<String> fileNameFilter) {
        this.fileNameFilter = fileNameFilter;
        return this;
    }

    @Override
    public FileStoreScan withMetrics(ScanMetrics metrics) {
        this.scanMetrics = metrics;
        return this;
    }

    @Override
    public FileStoreScan dropStats() {
        this.dropStats = true;
        return this;
    }

    @Override
    @Nullable
    public Integer parallelism() {
        return this.parallelism;
    }

    @Override
    public ManifestsReader manifestsReader() {
        return this.manifestsReader;
    }

    @Override
    public FileStoreScan.Plan plan() {
        long started = System.nanoTime();
        ManifestsReader.Result manifestsResult = this.readManifests();
        final Snapshot snapshot = manifestsResult.snapshot;
        List<ManifestFileMeta> manifests = manifestsResult.filteredManifests;
        Iterator<ManifestEntry> iterator = this.readManifestEntries(manifests, false);
        List<ManifestEntry> files = new ArrayList<ManifestEntry>();
        while (iterator.hasNext()) {
            files.add(iterator.next());
        }
        if (this.wholeBucketFilterEnabled()) {
            files = files.stream().collect(Collectors.groupingBy(file -> Pair.of(file.partition(), file.bucket()), LinkedHashMap::new, Collectors.toList())).values().stream().map(this::filterWholeBucketByStats).flatMap(Collection::stream).collect(Collectors.toList());
        }
        final ArrayList<ManifestEntry> result = files;
        long scanDuration = (System.nanoTime() - started) / 1000000L;
        if (this.scanMetrics != null) {
            long allDataFiles = manifestsResult.allManifests.stream().mapToLong(f -> f.numAddedFiles() - f.numDeletedFiles()).sum();
            this.scanMetrics.reportScan(new ScanStats(scanDuration, manifests.size(), allDataFiles - (long)result.size(), result.size()));
        }
        return new FileStoreScan.Plan(){

            @Override
            @Nullable
            public Long watermark() {
                return snapshot == null ? null : snapshot.watermark();
            }

            @Override
            @Nullable
            public Snapshot snapshot() {
                return snapshot;
            }

            @Override
            public List<ManifestEntry> files() {
                return result;
            }
        };
    }

    @Override
    public List<SimpleFileEntry> readSimpleEntries() {
        List<ManifestFileMeta> manifests = this.readManifests().filteredManifests;
        Iterator iterator = this.scanMode == ScanMode.ALL ? this.readAndMergeFileEntries(manifests, SimpleFileEntry::from, false) : this.readAndNoMergeFileEntries(manifests, SimpleFileEntry::from, false);
        ArrayList<SimpleFileEntry> result = new ArrayList<SimpleFileEntry>();
        while (iterator.hasNext()) {
            result.add((SimpleFileEntry)iterator.next());
        }
        return result;
    }

    @Override
    public List<PartitionEntry> readPartitionEntries() {
        List<ManifestFileMeta> manifests = this.readManifests().filteredManifests;
        ConcurrentHashMap partitions = new ConcurrentHashMap();
        Consumer<ManifestFileMeta> processor = m -> PartitionEntry.merge(PartitionEntry.merge(this.readManifest((ManifestFileMeta)m)), partitions);
        ThreadPoolUtils.randomlyOnlyExecute(ManifestReadThreadPool.getExecutorService(this.parallelism), processor, manifests);
        return partitions.values().stream().filter(p -> p.fileCount() > 0L).collect(Collectors.toList());
    }

    @Override
    public List<BucketEntry> readBucketEntries() {
        List<ManifestFileMeta> manifests = this.readManifests().filteredManifests;
        ConcurrentHashMap buckets = new ConcurrentHashMap();
        Consumer<ManifestFileMeta> processor = m -> BucketEntry.merge(BucketEntry.merge(this.readManifest((ManifestFileMeta)m)), buckets);
        ThreadPoolUtils.randomlyOnlyExecute(ManifestReadThreadPool.getExecutorService(this.parallelism), processor, manifests);
        return buckets.values().stream().filter(p -> p.fileCount() > 0L).collect(Collectors.toList());
    }

    @Override
    public Iterator<ManifestEntry> readFileIterator() {
        return this.readManifestEntries(this.readManifests().filteredManifests, true);
    }

    private Iterator<ManifestEntry> readManifestEntries(List<ManifestFileMeta> manifests, boolean useSequential) {
        return this.scanMode == ScanMode.ALL ? this.readAndMergeFileEntries(manifests, Function.identity(), useSequential) : this.readAndNoMergeFileEntries(manifests, Function.identity(), useSequential);
    }

    private <T extends FileEntry> Iterator<T> readAndMergeFileEntries(List<ManifestFileMeta> manifests, Function<List<ManifestEntry>, List<T>> converter, boolean useSequential) {
        Set<FileEntry.Identifier> deletedEntries = FileEntry.readDeletedEntries(manifest -> this.readManifest((ManifestFileMeta)manifest, FileEntry.deletedFilter(), null), manifests, this.parallelism);
        manifests = manifests.stream().filter(file -> file.numAddedFiles() > 0L).collect(Collectors.toList());
        Function<ManifestFileMeta, List> processor = manifest -> (List)converter.apply(this.readManifest((ManifestFileMeta)manifest, FileEntry.addFilter(), entry -> !deletedEntries.contains(entry.identifier())));
        if (useSequential) {
            return ManifestReadThreadPool.sequentialBatchedExecute(processor, manifests, this.parallelism).iterator();
        }
        return ManifestReadThreadPool.randomlyExecuteSequentialReturn(processor, manifests, this.parallelism);
    }

    private <T extends FileEntry> Iterator<T> readAndNoMergeFileEntries(List<ManifestFileMeta> manifests, Function<List<ManifestEntry>, List<T>> converter, boolean useSequential) {
        Function<ManifestFileMeta, List> reader = manifest -> (List)converter.apply(this.readManifest((ManifestFileMeta)manifest));
        if (useSequential) {
            return ManifestReadThreadPool.sequentialBatchedExecute(reader, manifests, this.parallelism).iterator();
        }
        return ManifestReadThreadPool.randomlyExecuteSequentialReturn(reader, manifests, this.parallelism);
    }

    private ManifestsReader.Result readManifests() {
        return this.manifestsReader.read(this.specifiedSnapshot, this.scanMode);
    }

    protected TableSchema scanTableSchema(long id) {
        return this.tableSchemas.computeIfAbsent(id, key -> key.longValue() == this.schema.id() ? this.schema : this.schemaManager.schema(id));
    }

    protected abstract boolean filterByStats(ManifestEntry var1);

    protected boolean wholeBucketFilterEnabled() {
        return false;
    }

    protected List<ManifestEntry> filterWholeBucketByStats(List<ManifestEntry> entries) {
        return entries;
    }

    @Override
    public List<ManifestEntry> readManifest(ManifestFileMeta manifest) {
        return this.readManifest(manifest, null, null);
    }

    private List<ManifestEntry> readManifest(ManifestFileMeta manifest, @Nullable Filter<InternalRow> additionalFilter, @Nullable Filter<ManifestEntry> additionalTFilter) {
        List<ManifestEntry> entries = this.manifestFileFactory.create().withCacheMetrics(this.scanMetrics != null ? this.scanMetrics.getCacheMetrics() : null).read(manifest.fileName(), manifest.fileSize(), this.createCacheRowFilter(), this.createEntryRowFilter().and(additionalFilter), entry -> !(additionalTFilter != null && !additionalTFilter.test((ManifestEntry)entry) || this.manifestEntryFilter != null && !this.manifestEntryFilter.test((ManifestEntry)entry) || !this.filterByStats((ManifestEntry)entry)));
        if (this.dropStats) {
            ArrayList<ManifestEntry> copied = new ArrayList<ManifestEntry>(entries.size());
            for (ManifestEntry entry2 : entries) {
                copied.add(this.dropStats(entry2));
            }
            entries = copied;
        }
        return entries;
    }

    protected ManifestEntry dropStats(ManifestEntry entry) {
        return entry.copyWithoutStats();
    }

    private Filter<InternalRow> createCacheRowFilter() {
        if (this.manifestCacheFilter == null) {
            return Filter.alwaysTrue();
        }
        Function<InternalRow, BinaryRow> partitionGetter = ManifestEntrySerializer.partitionGetter();
        Function<InternalRow, Integer> bucketGetter = ManifestEntrySerializer.bucketGetter();
        return row -> this.manifestCacheFilter.test((BinaryRow)partitionGetter.apply((InternalRow)row), (Integer)bucketGetter.apply((InternalRow)row));
    }

    private Filter<InternalRow> createEntryRowFilter() {
        Function<InternalRow, BinaryRow> partitionGetter = ManifestEntrySerializer.partitionGetter();
        Function<InternalRow, Integer> bucketGetter = ManifestEntrySerializer.bucketGetter();
        Function<InternalRow, Integer> totalBucketGetter = ManifestEntrySerializer.totalBucketGetter();
        Function<InternalRow, String> fileNameGetter = ManifestEntrySerializer.fileNameGetter();
        PartitionPredicate partitionFilter = this.manifestsReader.partitionFilter();
        Function<InternalRow, Integer> levelGetter = ManifestEntrySerializer.levelGetter();
        return row -> {
            if (partitionFilter != null && !partitionFilter.test((BinaryRow)partitionGetter.apply((InternalRow)row))) {
                return false;
            }
            int bucket = (Integer)bucketGetter.apply((InternalRow)row);
            if (this.bucketFilter != null && !this.bucketFilter.test(bucket)) {
                return false;
            }
            if (this.totalAwareBucketFilter != null && !this.totalAwareBucketFilter.test(bucket, (Integer)totalBucketGetter.apply((InternalRow)row))) {
                return false;
            }
            if (this.levelFilter != null && !this.levelFilter.test((Integer)levelGetter.apply((InternalRow)row))) {
                return false;
            }
            return this.fileNameFilter == null || this.fileNameFilter.test((String)fileNameGetter.apply((InternalRow)row));
        };
    }
}

