/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.integ.testsuite.reader;

import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetReaderIterator;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.integ.testsuite.reader.DFSDeltaInputReader;
import org.apache.hudi.integ.testsuite.reader.SparkBasedReader;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * Delta input reader that samples records already stored in a Hudi table on DFS, so that the
 * integration test suite can turn them into updates.
 *
 * <p>For each selected partition the latest file slices (built from completed commits and
 * compactions) are listed. Records are read from a slice's base parquet file when one exists,
 * otherwise by merging the slice's log files. Every returned record is rewritten to the schema
 * supplied at construction time.
 */
public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
    private static final Logger log = LoggerFactory.getLogger(DFSHoodieDatasetInputReader.class);

    /** Value passed to the log scanner's {@code withMaxMemorySizeInBytes} (1 GiB). */
    private static final long MAX_LOG_SCAN_MEMORY_BYTES = 1024L * 1024L * 1024L;
    /** Value passed to the log scanner's {@code withBufferSize} (16 MiB). */
    private static final int LOG_SCAN_BUFFER_SIZE_BYTES = 16 * 1024 * 1024;
    /** Value passed to the log scanner's {@code withSpillableMapBasePath}. */
    private static final String LOG_SCAN_SPILLABLE_MAP_PATH = "/tmp/";

    private transient JavaSparkContext jsc;
    private final String schemaStr;
    private final HoodieTableMetaClient metaClient;

    /**
     * @param jsc       Spark context used for listing and reading; not serialized with the reader
     * @param basePath  base path of the Hudi table to sample from
     * @param schemaStr Avro schema (JSON) that every returned record is projected onto
     */
    public DFSHoodieDatasetInputReader(JavaSparkContext jsc, String basePath, String schemaStr) {
        this.jsc = jsc;
        this.schemaStr = schemaStr;
        this.metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build();
    }

    /**
     * Lists the table's partition paths in sorted order, optionally truncated to the first
     * {@code partitionsLimit} entries.
     *
     * @param partitionsLimit number of partitions to keep; when absent, all partitions are returned
     * @return sorted partition paths (possibly empty)
     * @throws IllegalArgumentException if more partitions are requested than exist
     */
    protected List<String> getPartitions(Option<Integer> partitionsLimit) throws IOException {
        HoodieEngineContext engineContext = new HoodieSparkEngineContext(this.jsc);
        List<String> partitionPaths = FSUtils.getAllPartitionPaths(engineContext, this.metaClient.getBasePath(), false, false, false);
        // Sort so that a limit deterministically selects the same leading partitions.
        Collections.sort(partitionPaths);
        // Without a limit (e.g. read(long)) every partition is eligible; the old code called get() on
        // an empty Option here and failed.
        if (partitionPaths.isEmpty() || !partitionsLimit.isPresent()) {
            return partitionPaths;
        }
        int limit = partitionsLimit.get();
        ValidationUtils.checkArgument(partitionPaths.size() >= limit,
            "Cannot generate updates for more partitions than present in the dataset, partitions requested "
                + limit + ", partitions present " + partitionPaths.size());
        return partitionPaths.subList(0, limit);
    }

    /**
     * Builds an RDD mapping each partition path to an iterator over its latest file slices.
     * The iterator is recreated each time the RDD is recomputed, so every action sees a fresh one.
     */
    private JavaPairRDD<String, Iterator<FileSlice>> getPartitionToFileSlice(HoodieTableMetaClient metaClient, List<String> partitionPaths) {
        HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants());
        // Only the view is captured by the closure, not this reader.
        return this.jsc.parallelize(partitionPaths)
            .mapToPair(partitionPath -> new Tuple2<String, Iterator<FileSlice>>(partitionPath, fileSystemView.getLatestFileSlices(partitionPath).iterator()));
    }

    /** Counts the records in a single parquet file. */
    @Override
    protected long analyzeSingleFile(String filePath) {
        return SparkBasedReader.readParquet(new SparkSession(this.jsc.sc()), Arrays.asList(filePath), Option.empty(), Option.empty()).count();
    }

    /** Samples roughly {@code numRecordsToUpdate} records from any partitions. */
    private JavaRDD<GenericRecord> fetchAnyRecordsFromDataset(Option<Long> numRecordsToUpdate) throws IOException {
        return fetchRecordsFromDataset(Option.empty(), Option.empty(), numRecordsToUpdate, Option.empty());
    }

    /** Samples roughly {@code numRecordsToUpdate} records from the first {@code numPartitions} partitions. */
    private JavaRDD<GenericRecord> fetchAnyRecordsFromDataset(Option<Long> numRecordsToUpdate, Option<Integer> numPartitions) throws IOException {
        return fetchRecordsFromDataset(numPartitions, Option.empty(), numRecordsToUpdate, Option.empty());
    }

    /** Samples a fraction of a single file's record count from each of {@code numFiles} files. */
    private JavaRDD<GenericRecord> fetchPercentageRecordsFromDataset(Option<Integer> numPartitions, Option<Integer> numFiles, Option<Double> percentageRecordsPerFile) throws IOException {
        return fetchRecordsFromDataset(numPartitions, numFiles, Option.empty(), percentageRecordsPerFile);
    }

    /** Samples {@code numRecordsToUpdate} records spread across {@code numFiles} files. */
    private JavaRDD<GenericRecord> fetchRecordsFromDataset(Option<Integer> numPartitions, Option<Integer> numFiles, Option<Long> numRecordsToUpdate) throws IOException {
        return fetchRecordsFromDataset(numPartitions, numFiles, numRecordsToUpdate, Option.empty());
    }

    /**
     * Core sampling routine.
     *
     * <p>If {@code numFiles} is absent or non-positive, the number of files is derived from
     * {@code numRecordsToUpdate} divided by the record count of one sampled file (capped by the
     * number of existing files). Otherwise {@code numFiles} files are read. Each file contributes
     * either {@code percentageRecordsPerFile} of a single file's record count, or an even share of
     * {@code numRecordsToUpdate}. Any remainder of an uneven split is topped up separately.
     *
     * @return sampled records projected onto the reader's schema
     */
    private JavaRDD<GenericRecord> fetchRecordsFromDataset(Option<Integer> numPartitions, Option<Integer> numFiles, Option<Long> numRecordsToUpdate, Option<Double> percentageRecordsPerFile) throws IOException {
        log.info("NumPartitions : {}, NumFiles : {}, numRecordsToUpdate : {}, percentageRecordsPerFile : {}", numPartitions, numFiles, numRecordsToUpdate, percentageRecordsPerFile);
        List<String> partitionPaths = getPartitions(numPartitions);
        JavaPairRDD<String, Iterator<FileSlice>> partitionToFileSlice = getPartitionToFileSlice(this.metaClient, partitionPaths);
        Map<String, Integer> partitionToFileIdCountMap = partitionToFileSlice
            .mapToPair(p -> new Tuple2<String, Integer>(p._1, iteratorSize(p._2)))
            .collectAsMap();
        // Reads one whole file on the driver to estimate records per file.
        long recordsInSingleFile = iteratorSize(readParquetOrLogFiles(getSingleSliceFromRDD(partitionToFileSlice)));

        boolean numFilesGiven = numFiles.isPresent() && numFiles.get() > 0;
        int numFilesToUpdate;
        long numRecordsToUpdatePerFile;
        if (!numFilesGiven) {
            numFilesToUpdate = (int) Math.floor(numRecordsToUpdate.get().doubleValue() / (double) recordsInSingleFile);
            if (numFilesToUpdate > 0) {
                numFilesToUpdate = Math.min(numFilesToUpdate, totalFileCount(partitionToFileIdCountMap));
                log.info("Files to update {}, records to update per file {}", numFilesToUpdate, recordsInSingleFile);
                numRecordsToUpdatePerFile = recordsInSingleFile;
            } else {
                numFilesToUpdate = 1;
                numRecordsToUpdatePerFile = numRecordsToUpdate.get();
                log.info("Total records passed in < records in single file. Hence setting numFilesToUpdate to 1 and numRecordsToUpdate to {} ", numRecordsToUpdatePerFile);
            }
        } else {
            numFilesToUpdate = numFiles.get();
            numRecordsToUpdatePerFile = percentageRecordsPerFile.isPresent()
                ? (long) (recordsInSingleFile * percentageRecordsPerFile.get())
                : numRecordsToUpdate.get() / numFilesToUpdate;
        }

        Map<String, Integer> adjustedPartitionToFileIdCountMap = getFilesToReadPerPartition(partitionToFileSlice, partitionPaths.size(), numFilesToUpdate, partitionToFileIdCountMap);
        JavaRDD<GenericRecord> updates = projectSchema(generateUpdates(adjustedPartitionToFileIdCountMap, partitionToFileSlice, numFilesToUpdate, (int) numRecordsToUpdatePerFile));

        // An even per-file split may fall short of the requested total; top up with the remainder.
        // Guarded by the same "numFiles > 0" condition as the sizing branch above.
        if (numRecordsToUpdate.isPresent() && numFilesGiven) {
            long remainingRecordsToAdd = numRecordsToUpdate.get() - numRecordsToUpdatePerFile * numFiles.get();
            if (remainingRecordsToAdd > 0) {
                List<GenericRecord> extraRecords = generateUpdates(adjustedPartitionToFileIdCountMap, partitionToFileSlice, numFilesToUpdate, (int) remainingRecordsToAdd)
                    .take((int) remainingRecordsToAdd);
                updates = updates.union(projectSchema(this.jsc.parallelize(extraRecords)));
            }
        }
        log.info("Finished generating updates");
        return updates;
    }

    /**
     * Rewrites each record onto the reader's schema. The schema is parsed once per Spark partition,
     * not once per record. Only the schema string is captured, not this reader.
     */
    private JavaRDD<GenericRecord> projectSchema(JavaRDD<GenericRecord> updates) {
        final String targetSchemaStr = this.schemaStr;
        return updates.mapPartitions(records -> {
            Schema targetSchema = new Schema.Parser().parse(targetSchemaStr);
            return StreamSupport.stream(iterableOf(records).spliterator(), false)
                .map(record -> HoodieAvroUtils.rewriteRecord(record, targetSchema))
                .iterator();
        });
    }

    /**
     * Reads records from up to {@code adjustedPartitionToFileIdCountMap.get(partition)} file slices
     * per partition. The slices are spread over {@code numFiles} Spark partitions. When
     * {@code numRecordsToReadPerFile > 0}, at most that many records are taken from each slice;
     * otherwise whole slices are read.
     */
    private JavaRDD<GenericRecord> generateUpdates(Map<String, Integer> adjustedPartitionToFileIdCountMap, JavaPairRDD<String, Iterator<FileSlice>> partitionToFileSlice, int numFiles, int numRecordsToReadPerFile) {
        return partitionToFileSlice
            .map(p -> iteratorLimit(p._2, adjustedPartitionToFileIdCountMap.get(p._1)))
            .flatMap(slices -> slices)
            .repartition(numFiles)
            .map(fileSlice -> numRecordsToReadPerFile > 0
                ? iteratorLimit(readParquetOrLogFiles(fileSlice), numRecordsToReadPerFile)
                : readParquetOrLogFiles(fileSlice))
            .flatMap(records -> records)
            .map(record -> (GenericRecord) record);
    }

    /**
     * Caps each partition's file count at {@code ceil(numFiles / numPartitions)}. Partitions with
     * fewer files keep their own count, so the capped total can be below {@code numFiles}.
     *
     * @throws IllegalArgumentException if {@code numFiles} exceeds the total number of files
     */
    private Map<String, Integer> getFilesToReadPerPartition(JavaPairRDD<String, Iterator<FileSlice>> partitionToFileSlice, Integer numPartitions, Integer numFiles, Map<String, Integer> partitionToFileIdCountMap) {
        long totalExistingFilesCount = totalFileCount(partitionToFileIdCountMap);
        ValidationUtils.checkArgument(totalExistingFilesCount >= numFiles,
            "Cannot generate updates for more files than present in the dataset, file requested "
                + numFiles + ", files present " + totalExistingFilesCount);
        int numFilesPerPartition = (int) Math.ceil((double) numFiles / (double) numPartitions);
        Map<String, Integer> adjustedPartitionToFileIdCountMap = new HashMap<>();
        for (Map.Entry<String, Integer> entry : partitionToFileIdCountMap.entrySet()) {
            adjustedPartitionToFileIdCountMap.put(entry.getKey(), Math.min(entry.getValue(), numFilesPerPartition));
        }
        return adjustedPartitionToFileIdCountMap;
    }

    /**
     * Returns a copy of the first file slice from the first partition that has any slices. The copy
     * carries the base file when present, otherwise the log files.
     *
     * @throws IllegalStateException if no partition has any file slice
     */
    private FileSlice getSingleSliceFromRDD(JavaPairRDD<String, Iterator<FileSlice>> partitionToFileSlice) {
        List<FileSlice> sampled = partitionToFileSlice
            // Skip partitions without slices; calling next() on them would throw.
            .filter(entry -> entry._2.hasNext())
            .map(entry -> {
                FileSlice slice = entry._2.next();
                FileSlice newSlice = new FileSlice(slice.getFileGroupId(), slice.getBaseInstantTime());
                if (slice.getBaseFile().isPresent()) {
                    newSlice.setBaseFile(slice.getBaseFile().get());
                } else {
                    slice.getLogFiles().forEach(newSlice::addLogFile);
                }
                return newSlice;
            })
            .take(1);
        if (sampled.isEmpty()) {
            throw new IllegalStateException("No file slices found in the dataset to sample records from");
        }
        return sampled.get(0);
    }

    /**
     * Opens a lazy iterator over a file slice's records. The base parquet file is read with the
     * reader's schema plus Hudi metadata fields. Without a base file, the slice's log files are
     * merged up to the latest completed commit.
     */
    private Iterator<IndexedRecord> readParquetOrLogFiles(FileSlice fileSlice) throws IOException {
        Schema schema = new Schema.Parser().parse(this.schemaStr);
        if (fileSlice.getBaseFile().isPresent()) {
            AvroReadSupport.setAvroReadSchema(this.metaClient.getHadoopConf(), HoodieAvroUtils.addMetadataFields(schema));
            Path baseFilePath = new Path(fileSlice.getBaseFile().get().getPath());
            return new ParquetReaderIterator<>(AvroParquetReader.<IndexedRecord>builder(baseFilePath).withConf(this.metaClient.getHadoopConf()).build());
        }
        String latestCompletedInstant = this.metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get().getTimestamp();
        // NOTE(review): the scanner is never closed here; confirm whether it holds resources that need releasing.
        HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
            .withFileSystem(this.metaClient.getFs())
            .withBasePath(this.metaClient.getBasePath())
            // Full paths are required; a bare file name would resolve against the working directory.
            .withLogFilePaths(fileSlice.getLogFiles().map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()))
            .withReaderSchema(schema)
            .withLatestInstantTime(latestCompletedInstant)
            .withMaxMemorySizeInBytes(MAX_LOG_SCAN_MEMORY_BYTES)
            .withReadBlocksLazily(true)
            .withReverseReader(false)
            .withBufferSize(LOG_SCAN_BUFFER_SIZE_BYTES)
            .withSpillableMapBasePath(LOG_SCAN_SPILLABLE_MAP_PATH)
            .build();
        return StreamSupport.stream(iterableOf(scanner.iterator()).spliterator(), false)
            .map(record -> {
                try {
                    return record.getData().getInsertValue(schema);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            })
            // An empty insert value (e.g. a delete) has no record to emit; get() on it would throw.
            .filter(insertValue -> insertValue.isPresent())
            .map(insertValue -> (IndexedRecord) insertValue.get())
            .iterator();
    }

    /** Adapts a single-use iterator to an {@link Iterable} so it can be streamed. */
    private static <T> Iterable<T> iterableOf(Iterator<T> iterator) {
        return () -> iterator;
    }

    /** Sums the per-partition file counts; 0 for an empty map. */
    private static int totalFileCount(Map<String, Integer> partitionToFileCount) {
        return partitionToFileCount.values().stream().mapToInt(Integer::intValue).sum();
    }

    /** Exhausts {@code iterator} and returns how many elements it produced. */
    private static int iteratorSize(Iterator<?> iterator) {
        int count = 0;
        while (iterator.hasNext()) {
            iterator.next();
            ++count;
        }
        return count;
    }

    /**
     * Returns a view of {@code iterator} that yields at most {@code limitSize} elements.
     *
     * @throws IllegalArgumentException if {@code iterator} is null or {@code limitSize} is negative
     */
    private static <T> Iterator<T> iteratorLimit(final Iterator<T> iterator, final int limitSize) {
        ValidationUtils.checkArgument(iterator != null, "iterator is null");
        ValidationUtils.checkArgument(limitSize >= 0, "limit is negative");
        return new Iterator<T>() {
            private int count;

            @Override
            public boolean hasNext() {
                return count < limitSize && iterator.hasNext();
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                ++count;
                return iterator.next();
            }

            @Override
            public void remove() {
                iterator.remove();
            }
        };
    }

    @Override
    public JavaRDD<GenericRecord> read(long numRecords) throws IOException {
        return fetchAnyRecordsFromDataset(Option.of(numRecords));
    }

    @Override
    public JavaRDD<GenericRecord> read(int numPartitions, long approxNumRecords) throws IOException {
        return fetchAnyRecordsFromDataset(Option.of(approxNumRecords), Option.of(numPartitions));
    }

    @Override
    public JavaRDD<GenericRecord> read(int numPartitions, int numFiles, long numRecords) throws IOException {
        return fetchRecordsFromDataset(Option.of(numPartitions), Option.of(numFiles), Option.of(numRecords));
    }

    @Override
    public JavaRDD<GenericRecord> read(int numPartitions, int numFiles, double percentageRecordsPerFile) throws IOException {
        return fetchPercentageRecordsFromDataset(Option.of(numPartitions), Option.of(numFiles), Option.of(percentageRecordsPerFile));
    }
}

