/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.metadata;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieFileSliceReader;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SecondaryIndexRecordGenerationUtils {
    private static final Logger LOG = LoggerFactory.getLogger(SecondaryIndexRecordGenerationUtils.class);

    /**
     * Builds the secondary index records implied by a set of write stats.
     *
     * <p>Write stats are grouped by file id. For each file id, the record-key to secondary-key
     * mapping of the latest file slice known to {@code fsView} is compared with the mapping of
     * the latest file slice including inflight commits:
     * <ul>
     *   <li>a record key only present in the current slice yields one index record with the delete flag unset;</li>
     *   <li>a record key whose secondary key changed yields a delete-flagged record for the old
     *       secondary key followed by a record for the new one;</li>
     *   <li>a record key only present in the previous slice yields one delete-flagged record.</li>
     * </ul>
     *
     * @param allWriteStats   write stats to process
     * @param instantTime     instant time handed to the log record scanner as latest instant
     * @param indexDefinition definition of the secondary index (name and source fields)
     * @param metadataConfig  supplies the upper bound for the parallelism
     * @param fsView          file system view used to look up the previous file slice
     * @param dataMetaClient  meta client of the data table
     * @param engineContext   engine context used to parallelize the work
     * @param engineType      engine type used when creating the record merger
     * @return the secondary index records to write
     * @throws HoodieIOException if any write stat refers to a log file that contains inserts
     * @throws HoodieException   if the table schema cannot be resolved
     */
    @VisibleForTesting
    public static HoodieData<HoodieRecord> convertWriteStatsToSecondaryIndexRecords(List<HoodieWriteStat> allWriteStats, String instantTime, HoodieIndexDefinition indexDefinition, HoodieMetadataConfig metadataConfig, HoodieTableFileSystemView fsView, HoodieTableMetaClient dataMetaClient, HoodieEngineContext engineContext, EngineType engineType) {
        // Reject log files that carry inserts up front.
        for (HoodieWriteStat stat : allWriteStats) {
            String fileName = FSUtils.getFileName(stat.getPath(), stat.getPartitionPath());
            if (FSUtils.isLogFile(fileName) && stat.getNumInserts() > 0L) {
                throw new HoodieIOException("Secondary index cannot support logs having inserts with current offering. Please disable secondary index.");
            }
        }

        final Schema tableSchema;
        try {
            tableSchema = (Schema)HoodieTableMetadataUtil.tryResolveSchemaForTable(dataMetaClient).get();
        }
        catch (Exception e) {
            throw new HoodieException("Failed to get latest schema for " + dataMetaClient.getBasePath(), (Throwable)e);
        }

        Map<String, List<HoodieWriteStat>> writeStatsByFileId = allWriteStats.stream().collect(Collectors.groupingBy(HoodieWriteStat::getFileId));
        int boundedByConfig = Math.min(writeStatsByFileId.size(), metadataConfig.getSecondaryIndexParallelism());
        int parallelism = Math.max(boundedByConfig, 1);
        List<Map.Entry<String, List<HoodieWriteStat>>> fileGroups = new ArrayList<Map.Entry<String, List<HoodieWriteStat>>>(writeStatsByFileId.entrySet());

        return engineContext.parallelize(fileGroups, parallelism).flatMap(fileGroup -> {
            String fileId = fileGroup.getKey();
            // All write stats of one file id are taken to share the partition of the first one.
            String partition = fileGroup.getValue().get(0).getPartitionPath();

            // Mapping as of the latest file slice visible through the supplied view.
            Map<String, String> previousKeys;
            FileSlice previousSlice = fsView.getLatestFileSlice(partition, fileId).orElse(null);
            if (previousSlice != null) {
                StoragePath previousBaseFile = previousSlice.getBaseFile().map(BaseFile::getStoragePath).orElse(null);
                List<String> previousLogFiles = previousSlice.getLogFiles()
                    .sorted(HoodieLogFile.getLogFileComparator())
                    .map(logFile -> logFile.getPath().toString())
                    .collect(Collectors.toList());
                previousKeys = SecondaryIndexRecordGenerationUtils.getRecordKeyToSecondaryKey(dataMetaClient, engineType, previousLogFiles, tableSchema, partition, Option.ofNullable(previousBaseFile), indexDefinition, instantTime);
            } else {
                previousKeys = Collections.emptyMap();
            }

            // Mapping as of the latest file slice including inflight commits.
            List<FileSlice> slicesIncludingInflight = HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight(dataMetaClient, Option.empty(), partition);
            FileSlice currentSlice = slicesIncludingInflight.stream()
                .filter(slice -> slice.getFileId().equals(fileId))
                .findFirst()
                .orElseThrow(() -> new HoodieException("Could not find any file slice for fileId " + fileId));
            StoragePath currentBaseFile = currentSlice.getBaseFile().map(BaseFile::getStoragePath).orElse(null);
            List<String> currentLogFiles = currentSlice.getLogFiles()
                .sorted(HoodieLogFile.getLogFileComparator())
                .map(logFile -> logFile.getPath().toString())
                .collect(Collectors.toList());
            Map<String, String> currentKeys = SecondaryIndexRecordGenerationUtils.getRecordKeyToSecondaryKey(dataMetaClient, engineType, currentLogFiles, tableSchema, partition, Option.ofNullable(currentBaseFile), indexDefinition, instantTime);

            String indexName = indexDefinition.getIndexName();
            List<HoodieRecord> records = new ArrayList<HoodieRecord>();

            // Pass 1: new record keys and record keys whose secondary key changed.
            for (Map.Entry<String, String> current : currentKeys.entrySet()) {
                String recordKey = current.getKey();
                String secondaryKey = current.getValue();
                boolean existedBefore = previousKeys.containsKey(recordKey);
                if (existedBefore && previousKeys.get(recordKey).equals(secondaryKey)) {
                    // Unchanged mapping: nothing to emit.
                    continue;
                }
                if (existedBefore) {
                    // Retire the stale secondary key before registering the new one.
                    records.add(HoodieMetadataPayload.createSecondaryIndexRecord(recordKey, previousKeys.get(recordKey), indexName, true));
                }
                records.add(HoodieMetadataPayload.createSecondaryIndexRecord(recordKey, secondaryKey, indexName, false));
            }

            // Pass 2: record keys that no longer exist in the current file slice.
            for (Map.Entry<String, String> previous : previousKeys.entrySet()) {
                if (currentKeys.containsKey(previous.getKey())) {
                    continue;
                }
                records.add(HoodieMetadataPayload.createSecondaryIndexRecord(previous.getKey(), previous.getValue(), indexName, true));
            }
            return records.iterator();
        });
    }

    /**
     * Reads one file slice (optional base file merged with its log files) and returns the
     * record-key to secondary-key mapping of every record for which a secondary key could be
     * extracted. Records whose secondary key resolves to {@code null} are skipped.
     *
     * <p>The base-file reader and the merged log record scanner are closed before returning;
     * the returned map only holds strings, so nothing references them afterwards.
     *
     * @param metaClient      meta client of the data table
     * @param engineType      engine type used when creating the record merger
     * @param logFilePaths    log file paths of the file slice, in the order they should be scanned
     * @param tableSchema     schema used as reader schema and for key extraction
     * @param partition       partition path of the file slice
     * @param dataFilePath    path of the base file, or empty for a log-only file slice
     * @param indexDefinition definition of the secondary index
     * @param instantTime     latest instant time handed to the log record scanner
     * @return mutable map from record key to secondary key
     * @throws Exception if building the readers, reading the records or closing the readers fails
     */
    private static Map<String, String> getRecordKeyToSecondaryKey(HoodieTableMetaClient metaClient, EngineType engineType, List<String> logFilePaths, Schema tableSchema, String partition, Option<StoragePath> dataFilePath, HoodieIndexDefinition indexDefinition, String instantTime) throws Exception {
        String basePath = metaClient.getBasePath().toString();
        StorageConfiguration<?> storageConf = metaClient.getStorageConf();
        HoodieRecordMerger recordMerger = HoodieRecordUtils.createRecordMerger(basePath, engineType, Collections.emptyList(), metaClient.getTableConfig().getRecordMergeStrategyId());
        HoodieMergedLogRecordScanner mergedLogRecordScanner = ((HoodieMergedLogRecordScanner.Builder)HoodieMergedLogRecordScanner.newBuilder()
            .withStorage(metaClient.getStorage())
            .withBasePath(metaClient.getBasePath())
            .withLogFilePaths((List)logFilePaths))
            .withReaderSchema(tableSchema)
            .withLatestInstantTime(instantTime)
            .withReverseReader(false)
            // Falls back to 1 GiB (0x40000000 bytes) when the compaction memory key is not set.
            .withMaxMemorySizeInBytes(storageConf.getLong(HoodieCommonConfig.MAX_MEMORY_FOR_COMPACTION.key(), 0x40000000L))
            .withBufferSize(HoodieMetadataConfig.MAX_READER_BUFFER_SIZE_PROP.defaultValue())
            .withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath())
            .withPartition(partition)
            .withOptimizedLogBlocksScan(storageConf.getBoolean("hoodie.optimized.log.blocks.scan.enable", false))
            .withDiskMapType((ExternalSpillableMap.DiskMapType)storageConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), (Enum)HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue()))
            .withBitCaskDiskMapCompressionEnabled(storageConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue().booleanValue()))
            .withRecordMerger(recordMerger)
            .withTableMetaClient(metaClient)
            .build();
        try {
            Option<HoodieFileReader> baseFileReader = Option.empty();
            if (dataFilePath.isPresent()) {
                baseFileReader = Option.of(HoodieIOFactory.getIOFactory(metaClient.getStorage()).getReaderFactory(recordMerger.getRecordType()).getFileReader(ConfigUtils.getReaderConfigs(storageConf), (StoragePath)dataFilePath.get()));
            }
            try {
                HoodieFileSliceReader fileSliceReader = new HoodieFileSliceReader(baseFileReader, mergedLogRecordScanner, tableSchema, metaClient.getTableConfig().getPreCombineField(), recordMerger, metaClient.getTableConfig().getProps(), Option.empty(), Option.empty());
                Map<String, String> recordKeyToSecondaryKey = new HashMap<String, String>();
                while (fileSliceReader.hasNext()) {
                    HoodieRecord record = (HoodieRecord)fileSliceReader.next();
                    String secondaryKey = SecondaryIndexRecordGenerationUtils.getSecondaryKey(record, tableSchema, indexDefinition);
                    if (secondaryKey == null) {
                        // No secondary key could be extracted for this record; leave it out of the map.
                        continue;
                    }
                    recordKeyToSecondaryKey.put(record.getRecordKey(tableSchema, HoodieRecord.RECORD_KEY_METADATA_FIELD), secondaryKey);
                }
                return recordKeyToSecondaryKey;
            }
            finally {
                // Release the base-file handle even when reading fails half-way.
                if (baseFileReader.isPresent()) {
                    baseFileReader.get().close();
                }
            }
        }
        finally {
            // The scanner may have spilled records to disk; always release it.
            mergedLogRecordScanner.close();
        }
    }

    /**
     * Extracts the secondary key of a record as a string.
     *
     * <p>The record is converted to an indexed record once; the source fields of the index
     * definition are joined with {@code "."} and resolved as a nested field on the resulting
     * Avro record.
     *
     * @param record          record to read the secondary key from
     * @param tableSchema     schema used to convert the record
     * @param indexDefinition definition of the secondary index, providing the source fields
     * @return the secondary key, or {@code null} when the record has no indexed representation,
     *         the field lookup yields {@code null}, or the conversion fails with an
     *         {@link IOException} (logged at debug level, deliberately best-effort)
     */
    private static String getSecondaryKey(HoodieRecord record, Schema tableSchema, HoodieIndexDefinition indexDefinition) {
        try {
            // Convert only once: the conversion result is needed for both the presence check and the data.
            Option<?> indexedRecord = record.toIndexedRecord(tableSchema, CollectionUtils.emptyProps());
            if (!indexedRecord.isPresent()) {
                return null;
            }
            GenericRecord genericRecord = (GenericRecord)((HoodieAvroIndexedRecord)indexedRecord.get()).getData();
            String secondaryKeyFields = String.join((CharSequence)".", indexDefinition.getSourceFields());
            return HoodieAvroUtils.getNestedFieldValAsString(genericRecord, secondaryKeyFields, true, false);
        }
        catch (IOException e) {
            // Best-effort: keep going without a secondary key, but keep the cause visible in the log.
            LOG.debug("Failed to fetch secondary key for record key {}", record.getKey(), e);
            return null;
        }
    }

    /**
     * Reads the given file slices in parallel and produces one secondary index record for every
     * data record that has a non-null secondary key.
     *
     * <p>For a file slice with a base file, the reader schema is read from that base file;
     * for a log-only file slice the resolved table schema is used instead.
     *
     * @param engineContext               engine context used to parallelize the work
     * @param partitionFileSlicePairs     pairs of partition path and file slice to read
     * @param secondaryIndexMaxParallelism upper bound for the parallelism; values below 1 are treated as 1
     * @param activeModule                module name reported in the job status
     * @param metaClient                  meta client of the data table
     * @param engineType                  engine type used when creating the record merger
     * @param indexDefinition             definition of the secondary index
     * @return secondary index records, or empty data when no file slices are given
     * @throws HoodieException if the table schema cannot be resolved
     */
    public static HoodieData<HoodieRecord> readSecondaryKeysFromFileSlices(HoodieEngineContext engineContext, List<Pair<String, FileSlice>> partitionFileSlicePairs, int secondaryIndexMaxParallelism, String activeModule, HoodieTableMetaClient metaClient, EngineType engineType, HoodieIndexDefinition indexDefinition) {
        if (partitionFileSlicePairs.isEmpty()) {
            return engineContext.emptyHoodieData();
        }
        // Never ask the engine for fewer than one partition, even with a zero/negative configured maximum.
        int parallelism = Math.max(Math.min(partitionFileSlicePairs.size(), secondaryIndexMaxParallelism), 1);
        StoragePath basePath = metaClient.getBasePath();
        final Schema tableSchema;
        try {
            tableSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
        }
        catch (Exception e) {
            throw new HoodieException("Failed to get latest schema for " + metaClient.getBasePath(), (Throwable)e);
        }
        engineContext.setJobStatus(activeModule, "Secondary Index: reading secondary keys from " + partitionFileSlicePairs.size() + " file slices");
        return engineContext.parallelize(partitionFileSlicePairs, parallelism).flatMap(partitionAndBaseFile -> {
            String partition = (String)partitionAndBaseFile.getKey();
            FileSlice fileSlice = (FileSlice)partitionAndBaseFile.getValue();
            List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).map(l -> l.getPath().toString()).collect(Collectors.toList());
            // A log-only file slice has no base file: map() then yields an empty Option
            // (previously orElseGet was invoked with a null supplier, which cannot produce a value).
            Option<StoragePath> dataFilePath = fileSlice.getBaseFile().map(baseFile -> HoodieTableMetadataUtil.filePath(basePath, partition, baseFile.getFileName()));
            Schema readerSchema = dataFilePath.isPresent()
                ? HoodieIOFactory.getIOFactory(metaClient.getStorage()).getFileFormatUtils(metaClient.getTableConfig().getBaseFileFormat()).readAvroSchema(metaClient.getStorage(), (StoragePath)dataFilePath.get())
                : tableSchema;
            // Requested time of the last completed instant, or "" when the timeline has none.
            String latestCompletedInstantTime = (String)metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::requestedTime).orElse((Object)"");
            return SecondaryIndexRecordGenerationUtils.createSecondaryIndexGenerator(metaClient, engineType, logFilePaths, readerSchema, partition, dataFilePath, indexDefinition, latestCompletedInstantTime);
        });
    }

    /**
     * Creates a lazy iterator over the secondary index records of one file slice.
     *
     * <p>The file slice (optional base file merged with its log files) is read record by record;
     * records without a secondary key are skipped, every other record is turned into a secondary
     * index record with the delete flag unset.
     *
     * @param metaClient      meta client of the data table
     * @param engineType      engine type used when creating the record merger
     * @param logFilePaths    log file paths of the file slice, in the order they should be scanned
     * @param tableSchema     schema used as reader schema and for key extraction
     * @param partition       partition path of the file slice
     * @param dataFilePath    path of the base file, or empty for a log-only file slice
     * @param indexDefinition definition of the secondary index
     * @param instantTime     latest instant time handed to the log record scanner
     * @return iterator producing secondary index records; {@code next()} throws
     *         {@link NoSuchElementException} once exhausted
     * @throws Exception if building the readers fails
     */
    private static ClosableIterator<HoodieRecord> createSecondaryIndexGenerator(HoodieTableMetaClient metaClient, EngineType engineType, List<String> logFilePaths, final Schema tableSchema, String partition, Option<StoragePath> dataFilePath, final HoodieIndexDefinition indexDefinition, String instantTime) throws Exception {
        String basePath = metaClient.getBasePath().toString();
        StorageConfiguration<?> storageConf = metaClient.getStorageConf();
        HoodieRecordMerger recordMerger = HoodieRecordUtils.createRecordMerger(basePath, engineType, Collections.emptyList(), metaClient.getTableConfig().getRecordMergeStrategyId());
        HoodieMergedLogRecordScanner mergedLogRecordScanner = ((HoodieMergedLogRecordScanner.Builder)HoodieMergedLogRecordScanner.newBuilder()
            .withStorage(metaClient.getStorage())
            .withBasePath(metaClient.getBasePath())
            .withLogFilePaths((List)logFilePaths))
            .withReaderSchema(tableSchema)
            .withLatestInstantTime(instantTime)
            .withReverseReader(false)
            // Falls back to 1 GiB (0x40000000 bytes) when the compaction memory key is not set.
            .withMaxMemorySizeInBytes(storageConf.getLong(HoodieCommonConfig.MAX_MEMORY_FOR_COMPACTION.key(), 0x40000000L))
            .withBufferSize(HoodieMetadataConfig.MAX_READER_BUFFER_SIZE_PROP.defaultValue())
            .withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath())
            .withPartition(partition)
            .withOptimizedLogBlocksScan(storageConf.getBoolean("hoodie.optimized.log.blocks.scan.enable", false))
            .withDiskMapType((ExternalSpillableMap.DiskMapType)storageConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), (Enum)HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue()))
            .withBitCaskDiskMapCompressionEnabled(storageConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue().booleanValue()))
            .withRecordMerger(recordMerger)
            .withTableMetaClient(metaClient)
            .build();
        Option baseFileReader = Option.empty();
        if (dataFilePath.isPresent()) {
            baseFileReader = Option.of((Object)HoodieIOFactory.getIOFactory(metaClient.getStorage()).getReaderFactory(recordMerger.getRecordType()).getFileReader(ConfigUtils.getReaderConfigs(storageConf), (StoragePath)dataFilePath.get()));
        }
        HoodieFileSliceReader fileSliceReader = new HoodieFileSliceReader((Option<HoodieFileReader>)baseFileReader, mergedLogRecordScanner, tableSchema, metaClient.getTableConfig().getPreCombineField(), recordMerger, metaClient.getTableConfig().getProps(), (Option<Pair<String, String>>)Option.empty(), (Option<BaseKeyGenerator>)Option.empty());
        // NOTE(review): confirm that closing the wrapped iterator also releases the scanner and the base-file reader.
        final ClosableIterator fileSliceIterator = ClosableIterator.wrap(fileSliceReader);
        return new ClosableIterator<HoodieRecord>(){
            // Look-ahead slot: filled by hasNext(), handed out and cleared by next().
            private HoodieRecord nextValidRecord;

            @Override
            public void close() {
                fileSliceIterator.close();
            }

            @Override
            public boolean hasNext() {
                if (this.nextValidRecord != null) {
                    return true;
                }
                // Advance until a record with a secondary key is found or the slice is exhausted.
                while (fileSliceIterator.hasNext()) {
                    HoodieRecord record = (HoodieRecord)fileSliceIterator.next();
                    String secondaryKey = this.getSecondaryKey(record);
                    if (secondaryKey == null) {
                        continue;
                    }
                    this.nextValidRecord = HoodieMetadataPayload.createSecondaryIndexRecord(record.getRecordKey(tableSchema, HoodieRecord.RECORD_KEY_METADATA_FIELD), secondaryKey, indexDefinition.getIndexName(), false);
                    return true;
                }
                return false;
            }

            @Override
            public HoodieRecord next() {
                if (!this.hasNext()) {
                    throw new NoSuchElementException("No more valid records available.");
                }
                HoodieRecord result = this.nextValidRecord;
                this.nextValidRecord = null;
                return result;
            }

            /**
             * Extracts the secondary key of a record, or {@code null} when the record has no
             * indexed representation. Unlike a best-effort lookup, a conversion failure aborts
             * the iteration with a {@link RuntimeException} carrying the original cause.
             */
            private String getSecondaryKey(HoodieRecord record) {
                try {
                    // Convert only once: the result serves both the presence check and the data access.
                    Option<?> indexedRecord = record.toIndexedRecord(tableSchema, CollectionUtils.emptyProps());
                    if (!indexedRecord.isPresent()) {
                        return null;
                    }
                    GenericRecord genericRecord = (GenericRecord)((HoodieAvroIndexedRecord)indexedRecord.get()).getData();
                    String secondaryKeyFields = String.join((CharSequence)".", indexDefinition.getSourceFields());
                    return HoodieAvroUtils.getNestedFieldValAsString(genericRecord, secondaryKeyFields, true, false);
                }
                catch (IOException e) {
                    // Keep the cause attached so the original stack trace is not lost.
                    throw new RuntimeException("Failed to fetch records: " + e, e);
                }
            }
        };
    }
}

