/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.index.bloom;

import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.hash.FileIndexID;
import org.apache.hudi.common.util.hash.PartitionIndexID;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaPairRDD;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.bloom.BaseHoodieBloomIndexHelper;
import org.apache.hudi.index.bloom.BloomIndexFileInfo;
import org.apache.hudi.index.bloom.BucketizedBloomCheckPartitioner;
import org.apache.hudi.index.bloom.HoodieBloomIndexCheckFunction;
import org.apache.hudi.index.bloom.HoodieFileProbingFunction;
import org.apache.hudi.index.bloom.HoodieMetadataBloomFilterProbingFunction;
import org.apache.hudi.io.HoodieKeyLookupResult;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.Partitioner;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * Spark implementation of {@link BaseHoodieBloomIndexHelper}.
 *
 * <p>Takes the candidate {@code (HoodieFileGroupId, recordKey)} pairs produced by the bloom index
 * and runs key-lookup functions over them to find the record locations that actually match. The
 * candidates are shuffled using one of three strategies:
 * <ul>
 *   <li>metadata-table bloom filters, when {@code config.getBloomIndexUseMetadata()} is set and the
 *       {@code BLOOM_FILTERS} metadata partition exists for the table;</li>
 *   <li>bucketized checking, when {@code config.useBloomIndexBucketizedChecking()} is set;</li>
 *   <li>otherwise a global sort of the candidate pairs.</li>
 * </ul>
 *
 * <p>The class has no instance state and is exposed as a singleton via {@link #getInstance()}.
 */
public class SparkHoodieBloomIndexHelper extends BaseHoodieBloomIndexHelper {
    private static final Logger LOG = LoggerFactory.getLogger(SparkHoodieBloomIndexHelper.class);
    private static final SparkHoodieBloomIndexHelper SINGLETON_INSTANCE = new SparkHoodieBloomIndexHelper();

    private SparkHoodieBloomIndexHelper() {
    }

    /**
     * Returns the shared helper instance.
     */
    public static SparkHoodieBloomIndexHelper getInstance() {
        return SINGLETON_INSTANCE;
    }

    /**
     * Resolves which candidate file groups really contain each record key.
     *
     * @param config                  write config selecting the probing strategy and parallelism
     * @param context                 engine context; must be a {@link HoodieSparkEngineContext} when the
     *                                metadata bloom-filter strategy is used
     * @param hoodieTable             table being indexed
     * @param partitionRecordKeyPairs incoming (partition path, record key) pairs; only used here to
     *                                deduce the input parallelism
     * @param fileComparisonPairs     candidate (file group, record key) pairs to check
     * @param partitionToFileInfo     bloom-index file info per partition path
     * @param recordsPerPartition     number of incoming records per partition path (used by the
     *                                bucketized strategy when range pruning is disabled)
     * @return matching (record key, location) pairs
     */
    @Override
    public HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
            HoodieWriteConfig config,
            HoodieEngineContext context,
            HoodieTable hoodieTable,
            HoodiePairData<String, String> partitionRecordKeyPairs,
            HoodiePairData<HoodieFileGroupId, String> fileComparisonPairs,
            Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
            Map<String, Long> recordsPerPartition) {
        int inputParallelism = partitionRecordKeyPairs.deduceNumPartitions();
        int configuredBloomIndexParallelism = config.getBloomIndexParallelism();
        // Use the configured bloom-index parallelism only when it is positive; otherwise follow the input.
        int targetParallelism = configuredBloomIndexParallelism > 0 ? configuredBloomIndexParallelism : inputParallelism;
        LOG.info("Input parallelism: {}, Index parallelism: {}", inputParallelism, targetParallelism);

        JavaPairRDD<HoodieFileGroupId, String> fileComparisonsRDD = HoodieJavaRDD.getJavaRDD(fileComparisonPairs);
        JavaRDD<List<HoodieKeyLookupResult>> keyLookupResultRDD;
        if (isMetadataBloomFilterIndexAvailable(config, hoodieTable)) {
            keyLookupResultRDD = probeWithMetadataBloomFilters(
                    config, context, hoodieTable, fileComparisonsRDD, partitionToFileInfo, targetParallelism);
        } else if (config.useBloomIndexBucketizedChecking()) {
            keyLookupResultRDD = probeWithBucketizedChecking(
                    config, context, hoodieTable, fileComparisonsRDD, partitionToFileInfo, recordsPerPartition, targetParallelism);
        } else {
            keyLookupResultRDD = probeWithSortedComparisons(config, hoodieTable, fileComparisonsRDD, targetParallelism);
        }
        return HoodieJavaPairRDD.of(toMatchingRecordLocations(keyLookupResultRDD));
    }

    /** True when the config asks for metadata bloom filters and the table has that metadata partition. */
    private static boolean isMetadataBloomFilterIndexAvailable(HoodieWriteConfig config, HoodieTable<?, ?, ?, ?> hoodieTable) {
        return config.getBloomIndexUseMetadata()
                && hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions()
                        .contains(MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
    }

    /** Probes candidates using bloom filters stored in the metadata table, then checks the files. */
    private JavaRDD<List<HoodieKeyLookupResult>> probeWithMetadataBloomFilters(
            HoodieWriteConfig config,
            HoodieEngineContext context,
            HoodieTable<?, ?, ?, ?> hoodieTable,
            JavaPairRDD<HoodieFileGroupId, String> fileComparisonsRDD,
            Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
            int targetParallelism) {
        StorageConfiguration<?> storageConf = hoodieTable.getStorageConf();
        HoodieTableFileSystemView baseFileOnlyView = getBaseFileOnlyView(hoodieTable, partitionToFileInfo.keySet());
        Broadcast<HoodieTableFileSystemView> baseFileOnlyViewBroadcast =
                ((HoodieSparkEngineContext) context).getJavaSparkContext().broadcast(baseFileOnlyView);
        int bloomFilterPartitionFileGroupCount = config.getMetadataConfig().getBloomFilterIndexFileGroupCount();
        // Round up to a multiple of the bloom-filter file-group count; presumably so that Spark
        // partitions line up with metadata file groups — TODO confirm.
        int adjustedTargetParallelism = roundUpToMultiple(targetParallelism, bloomFilterPartitionFileGroupCount);
        AffineBloomIndexFileGroupPartitioner partitioner =
                new AffineBloomIndexFileGroupPartitioner(baseFileOnlyViewBroadcast, adjustedTargetParallelism);
        return fileComparisonsRDD
                .repartitionAndSortWithinPartitions(partitioner)
                .mapPartitionsToPair(new HoodieMetadataBloomFilterProbingFunction(baseFileOnlyViewBroadcast, hoodieTable))
                .mapPartitions(new HoodieFileProbingFunction(baseFileOnlyViewBroadcast, storageConf), true);
    }

    /** Spreads candidates across buckets sized by the expected number of comparisons per file group. */
    private JavaRDD<List<HoodieKeyLookupResult>> probeWithBucketizedChecking(
            HoodieWriteConfig config,
            HoodieEngineContext context,
            HoodieTable<?, ?, ?, ?> hoodieTable,
            JavaPairRDD<HoodieFileGroupId, String> fileComparisonsRDD,
            Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
            Map<String, Long> recordsPerPartition,
            int targetParallelism) {
        Map<HoodieFileGroupId, Long> comparisonsPerFileGroup =
                computeComparisonsPerFileGroup(config, recordsPerPartition, partitionToFileInfo, fileComparisonsRDD, context);
        BucketizedBloomCheckPartitioner partitioner = new BucketizedBloomCheckPartitioner(
                targetParallelism, comparisonsPerFileGroup, config.getBloomIndexKeysPerBucket());
        return fileComparisonsRDD
                // Key by the whole (file group, record key) pair; the Boolean value is only a placeholder.
                .mapToPair(fileGroupAndRecordKey -> new Tuple2<>(fileGroupAndRecordKey, false))
                .repartitionAndSortWithinPartitions(partitioner, new FileGroupIdComparator())
                .map(Tuple2::_1)
                .mapPartitions(new HoodieSparkBloomIndexCheckFunction(hoodieTable, config), true);
    }

    /** Globally sorts the candidates (by file group, optionally also by record key) before checking. */
    private JavaRDD<List<HoodieKeyLookupResult>> probeWithSortedComparisons(
            HoodieWriteConfig config,
            HoodieTable<?, ?, ?, ?> hoodieTable,
            JavaPairRDD<HoodieFileGroupId, String> fileComparisonsRDD,
            int targetParallelism) {
        if (config.isBloomIndexFileGroupIdKeySortingEnabled()) {
            // Sort by file group id, then by record key within each file group.
            return fileComparisonsRDD
                    .mapToPair(fileGroupAndRecordKey -> new Tuple2<>(fileGroupAndRecordKey, false))
                    .sortByKey(new FileGroupIdAndRecordKeyComparator(), true, targetParallelism)
                    .map(Tuple2::_1)
                    .mapPartitions(new HoodieSparkBloomIndexCheckFunction(hoodieTable, config), true);
        }
        // Sort by file group id only (natural ordering of the key).
        return fileComparisonsRDD
                .sortByKey(true, targetParallelism)
                .mapPartitions(new HoodieSparkBloomIndexCheckFunction(hoodieTable, config), true);
    }

    /** Flattens lookup results into one (HoodieKey, HoodieRecordLocation) pair per matched record key. */
    private static JavaPairRDD<HoodieKey, HoodieRecordLocation> toMatchingRecordLocations(
            JavaRDD<List<HoodieKeyLookupResult>> keyLookupResultRDD) {
        return keyLookupResultRDD
                .flatMap(List::iterator)
                .filter(lookupResult -> !lookupResult.getMatchingRecordKeysAndPositions().isEmpty())
                .flatMapToPair(lookupResult -> lookupResult.getMatchingRecordKeysAndPositions().stream()
                        .map(recordKeyAndPosition -> new Tuple2<>(
                                new HoodieKey(recordKeyAndPosition.getLeft(), lookupResult.getPartitionPath()),
                                new HoodieRecordLocation(
                                        lookupResult.getBaseInstantTime(),
                                        lookupResult.getFileId(),
                                        recordKeyAndPosition.getRight())))
                        .collect(Collectors.toList())
                        .iterator());
    }

    /**
     * Rounds {@code value} up to the next multiple of {@code multiple}; exact multiples (including 0)
     * are returned unchanged. {@code multiple} must be non-zero.
     */
    private static int roundUpToMultiple(int value, int multiple) {
        return value % multiple == 0 ? value : (value / multiple + 1) * multiple;
    }

    /**
     * Computes how many record-key comparisons each file group will receive.
     *
     * <p>With range pruning enabled the candidate pairs are counted per file group (a Spark job).
     * Otherwise every file in a partition is assumed to be compared against all incoming records of
     * that partition; a partition missing from {@code recordsPerPartition} maps to {@code null}.
     */
    private Map<HoodieFileGroupId, Long> computeComparisonsPerFileGroup(
            HoodieWriteConfig config,
            Map<String, Long> recordsPerPartition,
            Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
            JavaPairRDD<HoodieFileGroupId, String> fileComparisonsRDD,
            HoodieEngineContext context) {
        if (config.getBloomIndexPruneByRanges()) {
            context.setJobStatus(getClass().getSimpleName(),
                    "Compute all comparisons needed between records and files: " + config.getTableName());
            return fileComparisonsRDD.countByKey();
        }
        Map<HoodieFileGroupId, Long> fileToComparisons = new HashMap<>();
        partitionToFileInfo.forEach((partitionPath, fileInfos) -> {
            Long recordsInPartition = recordsPerPartition.get(partitionPath);
            for (BloomIndexFileInfo fileInfo : fileInfos) {
                fileToComparisons.put(new HoodieFileGroupId(partitionPath, fileInfo.getFileId()), recordsInPartition);
            }
        });
        return fileToComparisons;
    }

    /**
     * Builds a file-system view over all files the metadata table lists for the given partitions.
     *
     * @throws HoodieIOException if the metadata table listing fails
     */
    private static HoodieTableFileSystemView getBaseFileOnlyView(HoodieTable<?, ?, ?, ?> hoodieTable, Collection<String> partitionPaths) {
        try {
            List<String> fullPartitionPaths = partitionPaths.stream()
                    .map(partitionPath -> String.format("%s/%s", hoodieTable.getMetaClient().getBasePath(), partitionPath))
                    .collect(Collectors.toList());
            List<StoragePathInfo> allFiles = hoodieTable.getMetadataTable().getAllFilesInPartitions(fullPartitionPaths)
                    .values().stream()
                    .flatMap(Collection::stream)
                    .collect(Collectors.toList());
            return new HoodieTableFileSystemView(hoodieTable.getMetaClient(), hoodieTable.getActiveTimeline(), allFiles);
        } catch (IOException e) {
            LOG.error("Failed to fetch all files for partitions ({})", partitionPaths);
            throw new HoodieIOException("Failed to fetch all files for partitions", e);
        }
    }

    /**
     * Spark adapter for {@link HoodieBloomIndexCheckFunction} over (file group, record key) tuples.
     */
    public static class HoodieSparkBloomIndexCheckFunction
            extends HoodieBloomIndexCheckFunction<Tuple2<HoodieFileGroupId, String>>
            implements FlatMapFunction<Iterator<Tuple2<HoodieFileGroupId, String>>, List<HoodieKeyLookupResult>> {

        public HoodieSparkBloomIndexCheckFunction(HoodieTable hoodieTable, HoodieWriteConfig config) {
            super(hoodieTable, config, t -> t._1(), t -> t._2());
        }

        @Override
        public Iterator<List<HoodieKeyLookupResult>> call(Iterator<Tuple2<HoodieFileGroupId, String>> fileGroupIdRecordKeyPairIterator) {
            TaskContext taskContext = TaskContext.get();
            // TaskContext.get() returns null when this is not running inside a Spark task.
            if (taskContext != null) {
                LOG.info("HoodieSparkBloomIndexCheckFunction with stageId : {}, stage attempt no: {}, taskId : {}, task attempt no : {}, task attempt id : {} ",
                        taskContext.stageId(), taskContext.stageAttemptNumber(), taskContext.partitionId(),
                        taskContext.attemptNumber(), taskContext.taskAttemptId());
            }
            return new LazyKeyCheckIterator(fileGroupIdRecordKeyPairIterator);
        }
    }

    /**
     * Partitions {@link HoodieFileGroupId} keys by hashing the metadata bloom-filter index key of the
     * file group's latest base file into {@code targetPartitions} buckets.
     */
    static class AffineBloomIndexFileGroupPartitioner extends Partitioner {
        private final Broadcast<HoodieTableFileSystemView> latestBaseFilesBroadcast;
        // Lazily populated cache: partition path -> (file id -> latest base file name).
        private final Map<String, Map<String, String>> cachedLatestBaseFileNames = new HashMap<>(16);
        private final int targetPartitions;

        AffineBloomIndexFileGroupPartitioner(Broadcast<HoodieTableFileSystemView> baseFileOnlyViewBroadcast, int targetPartitions) {
            this.targetPartitions = targetPartitions;
            this.latestBaseFilesBroadcast = baseFileOnlyViewBroadcast;
        }

        @Override
        public int numPartitions() {
            return targetPartitions;
        }

        /**
         * @throws HoodieException if the file group has no latest base file in its partition
         */
        @Override
        public int getPartition(Object key) {
            HoodieFileGroupId partitionFileGroupId = (HoodieFileGroupId) key;
            String partitionPath = partitionFileGroupId.getPartitionPath();
            String fileGroupId = partitionFileGroupId.getFileId();
            String baseFileName = cachedLatestBaseFileNames
                    .computeIfAbsent(partitionPath, this::loadLatestBaseFileNames)
                    .get(fileGroupId);
            if (baseFileName == null) {
                throw new HoodieException(String.format("File from file-group (%s) not found in partition path (%s)", fileGroupId, partitionPath));
            }
            String bloomIndexEncodedKey = HoodieMetadataPayload.getBloomFilterIndexKey(
                    new PartitionIndexID(HoodieTableMetadataUtil.getBloomFilterIndexPartitionIdentifier(partitionPath)),
                    new FileIndexID(baseFileName));
            return HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(bloomIndexEncodedKey, targetPartitions);
        }

        /** Reads the latest base file name of every file id in {@code partitionPath} from the broadcast view. */
        private Map<String, String> loadLatestBaseFileNames(String partitionPath) {
            return latestBaseFilesBroadcast.getValue()
                    .getLatestBaseFiles(partitionPath)
                    .collect(Collectors.toMap(HoodieBaseFile::getFileId, BaseFile::getFileName));
        }
    }

    /** Orders (file group, record key) tuples by file group id, then by record key. */
    private static class FileGroupIdAndRecordKeyComparator
            implements Comparator<Tuple2<HoodieFileGroupId, String>>, Serializable {
        private FileGroupIdAndRecordKeyComparator() {
        }

        @Override
        public int compare(Tuple2<HoodieFileGroupId, String> o1, Tuple2<HoodieFileGroupId, String> o2) {
            int fileGroupIdComparison = o1._1().compareTo(o2._1());
            return fileGroupIdComparison != 0 ? fileGroupIdComparison : o1._2().compareTo(o2._2());
        }
    }

    /** Orders (file group, record key) tuples by file group id only. */
    private static class FileGroupIdComparator
            implements Comparator<Tuple2<HoodieFileGroupId, String>>, Serializable {
        private FileGroupIdComparator() {
        }

        @Override
        public int compare(Tuple2<HoodieFileGroupId, String> o1, Tuple2<HoodieFileGroupId, String> o2) {
            return o1._1().compareTo(o2._1());
        }
    }
}

