/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.index.bloom;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.index.bloom.HoodieBloomFilterProbingResult;
import org.apache.hudi.io.HoodieKeyLookupResult;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
import scala.Tuple2;

public class HoodieFileProbingFunction
implements FlatMapFunction<Iterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>>, List<HoodieKeyLookupResult>> {
    private static final Logger LOG = LogManager.getLogger(HoodieFileProbingFunction.class);
    private static final long BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH = 256L;
    private final Broadcast<HoodieTableFileSystemView> baseFileOnlyViewBroadcast;
    private final SerializableConfiguration hadoopConf;

    public HoodieFileProbingFunction(Broadcast<HoodieTableFileSystemView> baseFileOnlyViewBroadcast, SerializableConfiguration hadoopConf) {
        this.baseFileOnlyViewBroadcast = baseFileOnlyViewBroadcast;
        this.hadoopConf = hadoopConf;
    }

    public Iterator<List<HoodieKeyLookupResult>> call(Iterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>> tuple2Iterator) throws Exception {
        return new BloomIndexLazyKeyCheckIterator(tuple2Iterator);
    }

    private class BloomIndexLazyKeyCheckIterator
    extends LazyIterableIterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>, List<HoodieKeyLookupResult>> {
        public BloomIndexLazyKeyCheckIterator(Iterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>> tuple2Iterator) {
            super(tuple2Iterator);
        }

        @Override
        protected List<HoodieKeyLookupResult> computeNext() {
            HashMap<Pair<String, String>, Object> fileToLookupResults = new HashMap<Pair<String, String>, Object>();
            HashMap<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<String, HoodieBaseFile>();
            while (this.inputItr.hasNext()) {
                Tuple2 entry2 = (Tuple2)this.inputItr.next();
                String partitionPath = ((HoodieFileGroupId)entry2._1).getPartitionPath();
                String fileId = ((HoodieFileGroupId)entry2._1).getFileId();
                if (!fileIDBaseFileMap.containsKey(fileId)) {
                    Option<HoodieBaseFile> baseFile = ((HoodieTableFileSystemView)HoodieFileProbingFunction.this.baseFileOnlyViewBroadcast.getValue()).getLatestBaseFile(partitionPath, fileId);
                    if (!baseFile.isPresent()) {
                        throw new HoodieIndexException("Failed to find the base file for partition: " + partitionPath + ", fileId: " + fileId);
                    }
                    fileIDBaseFileMap.put(fileId, baseFile.get());
                }
                fileToLookupResults.putIfAbsent(Pair.of(partitionPath, ((HoodieBaseFile)fileIDBaseFileMap.get(fileId)).getFileName()), entry2._2);
                if ((long)fileToLookupResults.size() <= 256L) continue;
                break;
            }
            if (fileToLookupResults.isEmpty()) {
                return Collections.emptyList();
            }
            return fileToLookupResults.entrySet().stream().map(entry -> {
                Pair partitionPathFileNamePair = (Pair)entry.getKey();
                HoodieBloomFilterProbingResult bloomFilterKeyLookupResult = (HoodieBloomFilterProbingResult)entry.getValue();
                String partitionPath = (String)partitionPathFileNamePair.getLeft();
                String fileName = (String)partitionPathFileNamePair.getRight();
                String fileId = FSUtils.getFileId(fileName);
                ValidationUtils.checkState(!fileId.isEmpty());
                List<String> candidateRecordKeys = bloomFilterKeyLookupResult.getCandidateKeys();
                HoodieBaseFile dataFile = (HoodieBaseFile)fileIDBaseFileMap.get(fileId);
                List<String> matchingKeys = HoodieIndexUtils.filterKeysFromFile(new Path(dataFile.getPath()), candidateRecordKeys, HoodieFileProbingFunction.this.hadoopConf.get());
                LOG.debug((Object)String.format("Bloom filter candidates (%d) / false positives (%d), actual matches (%d)", candidateRecordKeys.size(), candidateRecordKeys.size() - matchingKeys.size(), matchingKeys.size()));
                return new HoodieKeyLookupResult(fileId, partitionPath, dataFile.getCommitTime(), matchingKeys);
            }).collect(Collectors.toList());
        }
    }
}

