/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.index.bloom;

import com.google.common.annotations.VisibleForTesting;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.MetadataNotFoundException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.bloom.BloomIndexFileInfo;
import org.apache.hudi.index.bloom.BucketizedBloomCheckPartitioner;
import org.apache.hudi.index.bloom.HoodieBloomIndexCheckFunction;
import org.apache.hudi.index.bloom.IndexFileFilter;
import org.apache.hudi.index.bloom.IntervalTreeBasedIndexFileFilter;
import org.apache.hudi.index.bloom.ListBasedIndexFileFilter;
import org.apache.hudi.io.HoodieRangeInfoHandle;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;

/**
 * Partition-scoped index that locates existing record keys by checking the latest base files of each affected
 * partition with {@link HoodieBloomIndexCheckFunction}.
 *
 * <p>Only base files in the incoming records' own partition paths are consulted ({@link #isGlobal()} is
 * {@code false}), and no index state is written back on update ({@link #updateLocation} returns its input
 * unchanged).
 *
 * @param <T> payload type of the records being tagged
 */
public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {

    // Upper bound on shuffle bytes per Spark partition for the lookup join (1500 MiB).
    // NOTE(review): presumably sized to stay below Spark's ~2 GB per-partition limit — confirm.
    private static final int SPARK_MAXIMUM_BYTES_PER_PARTITION = 1500 * 1024 * 1024;

    // Estimated cost in bytes of one (partitionPath, fileId, recordKey) comparison entry.
    private static final int BYTES_PER_PARTITION_FILE_KEY_TRIPLET = 300;

    private static final Logger LOG = LogManager.getLogger(HoodieBloomIndex.class);

    // Number of file/key comparisons allowed per shuffle partition (evaluates to 5,242,880).
    // Derived from the two constants above so the sizing rule stays visible; final because it is never reassigned.
    private static final int MAX_ITEMS_PER_SHUFFLE_PARTITION =
        SPARK_MAXIMUM_BYTES_PER_PARTITION / BYTES_PER_PARTITION_FILE_KEY_TRIPLET;

    public HoodieBloomIndex(HoodieWriteConfig config) {
        super(config);
    }

    /**
     * Tags each incoming record with the location of the base file that already contains its key.
     *
     * <p>Records whose key is not found are passed through as the same instance. When bloom-index caching is
     * enabled, {@code recordRDD} is persisted while the lookup runs its Spark jobs (e.g. {@code countByKey}).
     *
     * @param recordRDD   records to tag
     * @param jsc         Spark context used to run the lookup jobs
     * @param hoodieTable table whose base files are searched
     * @return the records, with current location set on those whose key was found
     */
    @Override
    public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
            HoodieTable<T> hoodieTable) {
        if (config.getBloomIndexUseCaching()) {
            recordRDD.persist(config.getBloomIndexInputStorageLevel());
        }

        // Thin (partitionPath, recordKey) view of the input; the lookup never needs the payloads.
        JavaPairRDD<String, String> partitionRecordKeyPairRDD =
            recordRDD.mapToPair(record -> new Tuple2<>(record.getPartitionPath(), record.getRecordKey()));

        JavaPairRDD<HoodieKey, HoodieRecordLocation> keyFilenamePairRDD =
            lookupIndex(partitionRecordKeyPairRDD, jsc, hoodieTable);

        if (config.getBloomIndexUseCaching()) {
            keyFilenamePairRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
        }
        if (LOG.isDebugEnabled()) {
            // Extra Spark job, only paid when DEBUG logging is on.
            long totalTaggedRecords = keyFilenamePairRDD.count();
            LOG.debug("Number of update records (ones tagged with a fileID): " + totalTaggedRecords);
        }

        JavaRDD<HoodieRecord<T>> taggedRecordRDD = tagLocationBacktoRecords(keyFilenamePairRDD, recordRDD);

        if (config.getBloomIndexUseCaching()) {
            // NOTE(review): taggedRecordRDD is lazy and has not been evaluated yet, so unpersisting here means a
            // later action on it may recompute recordRDD/keyFilenamePairRDD — confirm this is intended.
            recordRDD.unpersist();
            keyFilenamePairRDD.unpersist();
        }
        return taggedRecordRDD;
    }

    /**
     * Looks up where each key currently lives.
     *
     * @param hoodieKeys  keys to look up
     * @param jsc         Spark context used to run the lookup jobs
     * @param hoodieTable table whose base files are searched
     * @return for every input key, {@code Option.of(Pair(partitionPath, fileId))} when found, otherwise
     *     {@code Option.empty()}
     */
    @Override
    public JavaPairRDD<HoodieKey, Option<Pair<String, String>>> fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys,
            JavaSparkContext jsc, HoodieTable<T> hoodieTable) {
        JavaPairRDD<String, String> partitionRecordKeyPairRDD =
            hoodieKeys.mapToPair(key -> new Tuple2<>(key.getPartitionPath(), key.getRecordKey()));
        JavaPairRDD<HoodieKey, HoodieRecordLocation> recordKeyLocationRDD =
            lookupIndex(partitionRecordKeyPairRDD, jsc, hoodieTable);

        // Left outer join so that keys with no match still appear in the output (with an empty Option).
        JavaPairRDD<HoodieKey, String> keyHoodieKeyPairRDD = hoodieKeys.mapToPair(key -> new Tuple2<>(key, null));
        return keyHoodieKeyPairRDD.leftOuterJoin(recordKeyLocationRDD).mapToPair(keyLoc -> {
            HoodieKey key = keyLoc._1();
            Optional<HoodieRecordLocation> location = keyLoc._2()._2();
            Option<Pair<String, String>> partitionPathFileidPair;
            if (location.isPresent()) {
                partitionPathFileidPair = Option.of(Pair.of(key.getPartitionPath(), location.get().getFileId()));
            } else {
                partitionPathFileidPair = Option.empty();
            }
            return new Tuple2<>(key, partitionPathFileidPair);
        });
    }

    /**
     * Resolves (partitionPath, recordKey) pairs to the locations of the base files that contain them.
     *
     * <p>Keys with no match are dropped from the result.
     */
    private JavaPairRDD<HoodieKey, HoodieRecordLocation> lookupIndex(JavaPairRDD<String, String> partitionRecordKeyPairRDD,
            JavaSparkContext jsc, HoodieTable hoodieTable) {
        // Spark action: number of incoming records per partition path.
        Map<String, Long> recordsPerPartition = partitionRecordKeyPairRDD.countByKey();
        List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());

        // Candidate base files, grouped by partition path.
        List<Tuple2<String, BloomIndexFileInfo>> fileInfoList =
            loadInvolvedFiles(affectedPartitionPathList, jsc, hoodieTable);
        Map<String, List<BloomIndexFileInfo>> partitionToFileInfo = fileInfoList.stream()
            .collect(Collectors.groupingBy(Tuple2::_1, Collectors.mapping(Tuple2::_2, Collectors.toList())));

        Map<String, Long> comparisonsPerFileGroup =
            computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, partitionRecordKeyPairRDD);
        int safeParallelism = computeSafeParallelism(recordsPerPartition, comparisonsPerFileGroup);
        int joinParallelism = determineParallelism(partitionRecordKeyPairRDD.partitions().size(), safeParallelism);
        return findMatchingFilesForRecordKeys(partitionToFileInfo, partitionRecordKeyPairRDD, joinParallelism,
            hoodieTable, comparisonsPerFileGroup);
    }

    /**
     * Estimates how many key comparisons each file group will receive.
     *
     * <p>With range pruning enabled, the input is exploded against the index file filter, and the resulting
     * pairs are counted per file. That counting is a Spark action. Without pruning, every file in a partition
     * is charged with all records arriving for that partition.
     */
    private Map<String, Long> computeComparisonsPerFileGroup(Map<String, Long> recordsPerPartition,
            Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
            JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
        Map<String, Long> fileToComparisons;
        if (config.getBloomIndexPruneByRanges()) {
            fileToComparisons = explodeRecordRDDWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairRDD)
                .mapToPair(t -> t)
                .countByKey();
        } else {
            fileToComparisons = new HashMap<>();
            partitionToFileInfo.forEach((partitionPath, fileInfos) -> {
                for (BloomIndexFileInfo fileInfo : fileInfos) {
                    fileToComparisons.put(fileInfo.getFileId(), recordsPerPartition.get(partitionPath));
                }
            });
        }
        return fileToComparisons;
    }

    /**
     * Computes the minimum join parallelism for the estimated comparison volume.
     *
     * <p>The result is one Spark partition per {@code MAX_ITEMS_PER_SHUFFLE_PARTITION} comparisons, plus one.
     *
     * @param recordsPerPartition     incoming record count per partition path (used for logging)
     * @param comparisonsPerFileGroup estimated comparisons per file group
     * @return the safe parallelism; always at least 1
     */
    int computeSafeParallelism(Map<String, Long> recordsPerPartition, Map<String, Long> comparisonsPerFileGroup) {
        long totalComparisons = comparisonsPerFileGroup.values().stream().mapToLong(Long::longValue).sum();
        long totalFiles = comparisonsPerFileGroup.size();
        long totalRecords = recordsPerPartition.values().stream().mapToLong(Long::longValue).sum();
        int parallelism = (int) (totalComparisons / MAX_ITEMS_PER_SHUFFLE_PARTITION + 1L);
        LOG.info(String.format(
            "TotalRecords %d, TotalFiles %d, TotalAffectedPartitions %d, TotalComparisons %d, SafeParallelism %d",
            totalRecords, totalFiles, recordsPerPartition.size(), totalComparisons, parallelism));
        return parallelism;
    }

    /**
     * Picks the join parallelism.
     *
     * <p>The result is the maximum of the input parallelism, the configured bloom-index parallelism, and the
     * safe parallelism.
     */
    private int determineParallelism(int inputParallelism, int totalSubPartitions) {
        int indexParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism());
        int joinParallelism = Math.max(totalSubPartitions, indexParallelism);
        LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism: ${"
            + config.getBloomIndexParallelism() + "}, TotalSubParts: ${" + totalSubPartitions
            + "}, Join Parallelism set to : " + joinParallelism);
        return joinParallelism;
    }

    /**
     * Lists the latest base files of the given partitions.
     *
     * <p>Only base files at or before the latest completed instant on the commits timeline are listed. A
     * partition contributes no files when no completed commit exists.
     *
     * <p>With range pruning enabled, a second Spark job reads each file's key range via
     * {@link HoodieRangeInfoHandle}. A file whose range metadata is missing falls back to a range-less
     * {@link BloomIndexFileInfo}.
     *
     * @return (partitionPath, file info) pairs
     */
    @VisibleForTesting
    List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, JavaSparkContext jsc,
            HoodieTable hoodieTable) {
        List<Pair<String, String>> partitionPathFileIDList = jsc.parallelize(partitions, Math.max(partitions.size(), 1))
            .flatMap(partitionPath -> {
                Option<HoodieInstant> latestCommitTime =
                    hoodieTable.getMetaClient().getCommitsTimeline().filterCompletedInstants().lastInstant();
                List<Pair<String, String>> filteredFiles = new ArrayList<>();
                if (latestCommitTime.isPresent()) {
                    filteredFiles = hoodieTable.getBaseFileOnlyView()
                        .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp())
                        .map(f -> Pair.of(partitionPath, f.getFileId()))
                        .collect(Collectors.toList());
                }
                return filteredFiles.iterator();
            })
            .collect();

        if (config.getBloomIndexPruneByRanges()) {
            return jsc.parallelize(partitionPathFileIDList, Math.max(partitionPathFileIDList.size(), 1))
                .mapToPair(pf -> {
                    try {
                        HoodieRangeInfoHandle<T> rangeInfoHandle = new HoodieRangeInfoHandle<T>(config, hoodieTable, pf);
                        String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
                        return new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
                    } catch (MetadataNotFoundException me) {
                        LOG.warn("Unable to find range metadata in file :" + pf);
                        return new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
                    }
                })
                .collect();
        }
        return partitionPathFileIDList.stream()
            .map(pf -> new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue())))
            .collect(Collectors.toList());
    }

    /** Always succeeds; this index keeps no per-commit state that would need rolling back. */
    @Override
    public boolean rollbackCommit(String commitTime) {
        return true;
    }

    @Override
    public boolean isGlobal() {
        return false;
    }

    @Override
    public boolean canIndexLogFiles() {
        return false;
    }

    @Override
    public boolean isImplicitWithStorage() {
        return true;
    }

    /**
     * Expands each (partitionPath, recordKey) pair into one entry per candidate file.
     *
     * <p>Candidates come from an {@link IndexFileFilter}, either tree-based or list-based depending on config.
     * Each entry is keyed by the right element of the filter's matching pair; presumably this is the fileId,
     * to be verified against {@code IndexFileFilter}.
     */
    @VisibleForTesting
    JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
            Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
            JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
        IndexFileFilter indexFileFilter = config.useBloomIndexTreebasedFilter()
            ? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo)
            : new ListBasedIndexFileFilter(partitionToFileIndexInfo);

        return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> {
            String recordKey = partitionRecordKeyPair._2();
            String partitionPath = partitionRecordKeyPair._1();
            return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream()
                .map(partitionFileIdPair -> new Tuple2<>(partitionFileIdPair.getRight(),
                    new HoodieKey(recordKey, partitionPath)))
                .collect(Collectors.toList());
        }).flatMap(List::iterator);
    }

    /**
     * Checks the candidate (file, key) pairs with {@link HoodieBloomIndexCheckFunction}.
     *
     * <p>Comparisons are grouped per file either by {@link BucketizedBloomCheckPartitioner} or by a plain
     * {@code sortBy}, depending on config.
     *
     * @return (HoodieKey, location) for every key the check function reports as present; unmatched keys are dropped
     */
    @VisibleForTesting
    JavaPairRDD<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
            Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
            JavaPairRDD<String, String> partitionRecordKeyPairRDD, int shuffleParallelism, HoodieTable hoodieTable,
            Map<String, Long> fileGroupToComparisons) {
        JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
            explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD);

        if (config.useBloomIndexBucketizedChecking()) {
            Partitioner partitioner = new BucketizedBloomCheckPartitioner(shuffleParallelism, fileGroupToComparisons,
                config.getBloomIndexKeysPerBucket());
            // Key by (file, recordKey) so each bucket is also sorted by record key within a file.
            fileComparisonsRDD = fileComparisonsRDD
                .mapToPair(t -> new Tuple2<>(Pair.of(t._1(), t._2().getRecordKey()), t))
                .repartitionAndSortWithinPartitions(partitioner)
                .map(Tuple2::_2);
        } else {
            fileComparisonsRDD = fileComparisonsRDD.sortBy(Tuple2::_1, true, shuffleParallelism);
        }

        return fileComparisonsRDD
            .mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(hoodieTable, config), true)
            .flatMap(List::iterator)
            .filter(lr -> lr.getMatchingRecordKeys().size() > 0)
            .flatMapToPair(lookupResult -> lookupResult.getMatchingRecordKeys().stream()
                .map(recordKey -> new Tuple2<>(new HoodieKey(recordKey, lookupResult.getPartitionPath()),
                    new HoodieRecordLocation(lookupResult.getBaseInstantTime(), lookupResult.getFileId())))
                .collect(Collectors.toList())
                .iterator());
    }

    /**
     * Returns a copy of {@code inputRecord} with its current location set, or the input itself when no location.
     */
    HoodieRecord<T> getTaggedRecord(HoodieRecord<T> inputRecord, Option<HoodieRecordLocation> location) {
        HoodieRecord<T> record = inputRecord;
        if (location.isPresent()) {
            // Work on a copy, which must be unsealed to set the location and resealed afterwards.
            record = new HoodieRecord<T>(inputRecord);
            record.unseal();
            record.setCurrentLocation(location.get());
            record.seal();
        }
        return record;
    }

    /**
     * Joins the looked-up locations back onto the full records by {@link HoodieKey}.
     *
     * <p>This is a left outer join, so every input record is kept.
     */
    protected JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(
            JavaPairRDD<HoodieKey, HoodieRecordLocation> keyFilenamePairRDD, JavaRDD<HoodieRecord<T>> recordRDD) {
        JavaPairRDD<HoodieKey, HoodieRecord<T>> keyRecordPairRDD =
            recordRDD.mapToPair(record -> new Tuple2<>(record.getKey(), record));
        return keyRecordPairRDD.leftOuterJoin(keyFilenamePairRDD)
            .values()
            .map(v1 -> getTaggedRecord(v1._1(), Option.ofNullable(v1._2().orNull())));
    }

    /** No-op: nothing is written back, since this index is implicit with storage. */
    @Override
    public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
            HoodieTable<T> hoodieTable) {
        return writeStatusRDD;
    }
}

