/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.utils;

import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.expression.HoodieExpressionIndex;
import org.apache.hudi.index.expression.HoodieSparkExpressionIndex;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.util.JavaScalaConverters;
import org.apache.spark.api.java.function.FlatMapGroupsFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.collection.immutable.List;

/**
 * Spark helpers used by the metadata writer to build expression-index records (column stats and
 * bloom filters) and partition-stat updates from data files.
 *
 * <p>Rows read from data files are tagged with three bookkeeping columns (partition, relative file
 * path, file size) so that the per-file aggregations below can group values by their source file.
 *
 * <p>Note: in this file the simple name {@code List} refers to {@code scala.collection.immutable.List};
 * Java lists are always written as {@code java.util.List}.
 */
public class SparkMetadataWriterUtils {
    private static final Logger LOG = LoggerFactory.getLogger(SparkMetadataWriterUtils.class);

    // Bookkeeping column names appended to every row read from a data file.
    private static final String EXPRESSION_INDEX_PARTITION_COLUMN = "_hoodie_expression_index_partition";
    private static final String EXPRESSION_INDEX_RELATIVE_FILE_PATH_COLUMN = "_hoodie_expression_index_relative_file_path";
    private static final String EXPRESSION_INDEX_FILE_SIZE_COLUMN = "_hoodie_expression_index_file_size";

    /**
     * Returns the bookkeeping columns (partition, relative file path, file size), in that order.
     *
     * @return a freshly allocated array on every call
     */
    public static Column[] getExpressionIndexColumns() {
        return new Column[]{
            functions.col(EXPRESSION_INDEX_PARTITION_COLUMN),
            functions.col(EXPRESSION_INDEX_RELATIVE_FILE_PATH_COLUMN),
            functions.col(EXPRESSION_INDEX_FILE_SIZE_COLUMN)};
    }

    /**
     * Returns the names of the bookkeeping columns (partition, relative file path, file size), in that order.
     *
     * @return a freshly allocated array on every call
     */
    public static String[] getExpressionIndexColumnNames() {
        return new String[]{
            EXPRESSION_INDEX_PARTITION_COLUMN,
            EXPRESSION_INDEX_RELATIVE_FILE_PATH_COLUMN,
            EXPRESSION_INDEX_FILE_SIZE_COLUMN};
    }

    /**
     * Appends the bookkeeping values {@code (partition, filePath, fileSize)} to every row of a file.
     *
     * @param rowsForFilePath rows read from a single data file
     * @param partition       partition path the file belongs to
     * @param filePath        file path as it should appear in the index (callers pass a relative path)
     * @param fileSize        file size in bytes
     * @return new rows, each being the original row's values followed by the three bookkeeping values
     */
    public static java.util.List<Row> getRowsWithExpressionIndexMetadata(java.util.List<Row> rowsForFilePath, String partition, String filePath, long fileSize) {
        // The metadata part is identical for every row of the file, so build it once rather than per row.
        List<Object> indexMetadata = JavaScalaConverters.convertJavaListToScalaList(Arrays.asList(partition, filePath, fileSize));
        Row expressionIndexRow = Row.fromSeq(indexMetadata);
        return rowsForFilePath.stream().map(row -> {
            List<Row> rowSeq = JavaScalaConverters.convertJavaListToScalaList(Arrays.asList(row, expressionIndexRow));
            return Row.merge(rowSeq);
        }).collect(Collectors.toList());
    }

    /**
     * Builds column-stats index records for {@code columnToIndex}, one range per (partition, file) group,
     * and optionally partition-stats records derived from the same ranges.
     *
     * @param dataset                     rows carrying {@code columnToIndex} plus the bookkeeping columns
     * @param expressionIndex             index whose name is stamped on the generated records
     * @param columnToIndex               column (already transformed by the expression) to compute stats for
     * @param partitionRecordsFunctionOpt when present, applied to the per-file ranges to build partition-stat records
     * @return the column-stat records, plus the partition-stat records when the function is present
     */
    public static HoodieSparkExpressionIndex.ExpressionIndexComputationMetadata getExpressionIndexRecordsUsingColumnStats(Dataset<Row> dataset, HoodieExpressionIndex<Column, Column> expressionIndex, String columnToIndex, Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>, HoodieData<HoodieRecord>>> partitionRecordsFunctionOpt) {
        // Output layout: the grouping columns (partition, relative path, size) first, then the four
        // aggregates in the order they are read back below.
        Dataset<Row> columnRangeMetadataDataset = dataset
            .select(columnToIndex, getExpressionIndexColumnNames())
            .groupBy(getExpressionIndexColumns())
            .agg(
                functions.count(functions.when(functions.col(columnToIndex).isNull(), 1)).alias("nullCount"),
                functions.min(columnToIndex).alias("minValue"),
                functions.max(columnToIndex).alias("maxValue"),
                functions.count(columnToIndex).alias("valueCount"));
        int baseAggregatePosition = getExpressionIndexColumnNames().length;
        HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>> rangeMetadataHoodieJavaRDD =
            HoodieJavaRDD.of(columnRangeMetadataDataset.javaRDD()).flatMapToPair(row -> {
                String partitionName = row.getString(0);
                String relativeFilePath = row.getString(1);
                long totalFileSize = row.getLong(2);
                long nullCount = row.getLong(baseAggregatePosition);
                Comparable minValue = (Comparable) row.get(baseAggregatePosition + 1);
                Comparable maxValue = (Comparable) row.get(baseAggregatePosition + 2);
                long valueCount = row.getLong(baseAggregatePosition + 3);
                // The uncompressed size is approximated as twice the on-disk file size.
                long totalUncompressedSize = totalFileSize * 2L;
                HoodieColumnRangeMetadata<Comparable> rangeMetadata = HoodieColumnRangeMetadata.create(
                    relativeFilePath, columnToIndex, minValue, maxValue, nullCount, valueCount, totalFileSize, totalUncompressedSize);
                return Collections.singletonList(Pair.of(partitionName, rangeMetadata)).iterator();
            });
        boolean computePartitionStats = partitionRecordsFunctionOpt.isPresent();
        if (computePartitionStats) {
            // The ranges feed both the column-stat and the partition-stat records.
            rangeMetadataHoodieJavaRDD.persist("MEMORY_AND_DISK_SER");
        }
        HoodieData<HoodieRecord> colStatRecords = rangeMetadataHoodieJavaRDD
            .map(pair -> HoodieMetadataPayload.createColumnStatsRecords(pair.getKey(), Collections.singletonList(pair.getValue()), false,
                expressionIndex.getIndexName(), MetadataPartitionType.COLUMN_STATS.getRecordType()).collect(Collectors.toList()))
            .flatMap(records -> records.iterator());
        if (!computePartitionStats) {
            return new HoodieSparkExpressionIndex.ExpressionIndexComputationMetadata(colStatRecords);
        }
        HoodieData<HoodieRecord> partitionStatRecords = partitionRecordsFunctionOpt.get().apply(rangeMetadataHoodieJavaRDD);
        // NOTE(review): if both colStatRecords and partitionStatRecords are still lazy at this point,
        // unpersisting here drops the cache before either is evaluated. Confirm the intended lifecycle.
        rangeMetadataHoodieJavaRDD.unpersist();
        return new HoodieSparkExpressionIndex.ExpressionIndexComputationMetadata(colStatRecords, Option.of(partitionStatRecords));
    }

    /**
     * Builds one bloom-filter index record per (partition, relative file path) group of {@code dataset}.
     *
     * <p>Before building the filters, the index options that have a mapping in
     * {@code HoodieExpressionIndex.BLOOM_FILTER_CONFIG_MAPPING} are copied into {@code metadataWriteConfig}.
     * Null values of {@code columnToIndex} are not added to the filter.
     *
     * @param dataset             rows carrying {@code columnToIndex} plus the bookkeeping columns
     * @param columnToIndex       column whose string form is added to the bloom filter
     * @param metadataWriteConfig config used to create the filter; its properties are mutated
     * @param instantTime         instant stamped on the generated records
     * @param indexDefinition     definition supplying the index name and options
     * @return the bloom-filter records
     */
    public static HoodieSparkExpressionIndex.ExpressionIndexComputationMetadata getExpressionIndexRecordsUsingBloomFilter(Dataset<Row> dataset, String columnToIndex, HoodieWriteConfig metadataWriteConfig, String instantTime, HoodieIndexDefinition indexDefinition) {
        String indexName = indexDefinition.getIndexName();
        setBloomFilterProps(metadataWriteConfig, indexDefinition.getIndexOptions());
        // After the select, position 1 is the partition and position 2 the relative file path.
        // The explicit functional-interface casts pick the Java overloads of groupByKey/flatMapGroups.
        Dataset<HoodieRecord> bloomFilterRecords = dataset
            .select(columnToIndex, getExpressionIndexColumnNames())
            .groupByKey((MapFunction<Row, Pair>) row -> Pair.of(row.getString(1), row.getString(2)), Encoders.kryo(Pair.class))
            .flatMapGroups((FlatMapGroupsFunction<Pair, Row, HoodieRecord>) (pair, rowsInFile) -> {
                String partition = pair.getLeft().toString();
                String relativeFilePath = pair.getRight().toString();
                String fileName = FSUtils.getFileName(relativeFilePath, partition);
                BloomFilter bloomFilter = HoodieFileWriterFactory.createBloomFilter(metadataWriteConfig);
                rowsInFile.forEachRemaining(row -> {
                    Object value = row.getAs(columnToIndex);
                    // A null expression result has no key to add; dereferencing it would fail the whole task.
                    if (value != null) {
                        // UTF-8, matching the encoding used for the serialized filter below.
                        bloomFilter.add(StringUtils.getUTF8Bytes(value.toString()));
                    }
                });
                ByteBuffer bloomByteBuffer = ByteBuffer.wrap(StringUtils.getUTF8Bytes(bloomFilter.serializeToString()));
                HoodieRecord bloomFilterRecord = HoodieMetadataPayload.createBloomFilterMetadataRecord(
                    partition, fileName, instantTime, metadataWriteConfig.getBloomFilterType(), bloomByteBuffer, false, indexName);
                return Collections.singletonList(bloomFilterRecord).iterator();
            }, Encoders.kryo(HoodieRecord.class));
        return new HoodieSparkExpressionIndex.ExpressionIndexComputationMetadata(HoodieJavaRDD.of(bloomFilterRecords.javaRDD()));
    }

    /**
     * Copies every index option that has an entry in {@code HoodieExpressionIndex.BLOOM_FILTER_CONFIG_MAPPING}
     * into {@code metadataWriteConfig}'s properties under the mapped key.
     */
    private static void setBloomFilterProps(HoodieWriteConfig metadataWriteConfig, Map<String, String> indexOptions) {
        HoodieExpressionIndex.BLOOM_FILTER_CONFIG_MAPPING.forEach((sourceKey, targetKey) -> {
            if (indexOptions.containsKey(sourceKey)) {
                metadataWriteConfig.getProps().setProperty((String) targetKey, (String) indexOptions.get(sourceKey));
            }
        });
    }

    /**
     * Reads the records of a base file, or the unmerged records of a set of log files, as Spark rows.
     *
     * @param paths      file paths; for a base file only {@code paths[0]} is read
     * @param isBaseFile whether {@code paths[0]} is a base file (otherwise all paths are read as log files)
     * @return the records converted to rows using {@code schema}
     * @throws IllegalArgumentException if {@code paths} is null or empty
     */
    public static java.util.List<Row> readRecordsAsRows(StoragePath[] paths, SQLContext sqlContext, HoodieTableMetaClient metaClient, Schema schema, HoodieWriteConfig dataWriteConfig, boolean isBaseFile) {
        ValidationUtils.checkArgument(paths != null && paths.length > 0, "At least one file path is required to read records");
        java.util.List<HoodieRecord> records = isBaseFile
            ? getBaseFileRecords(new HoodieBaseFile(paths[0].toString()), metaClient, schema)
            : getUnmergedLogFileRecords(Arrays.stream(paths).map(StoragePath::toString).collect(Collectors.toList()), metaClient, schema);
        return toRows(records, schema, dataWriteConfig, sqlContext, paths[0].toString());
    }

    /**
     * Scans the given log files without merging and collects every record the scanner emits.
     * Uses the last instant of the commits timeline as the latest instant time; this fails if that
     * timeline has no instants.
     */
    private static java.util.List<HoodieRecord> getUnmergedLogFileRecords(java.util.List<String> logFilePaths, HoodieTableMetaClient metaClient, Schema readerSchema) {
        java.util.List<HoodieRecord> records = new ArrayList<>();
        HoodieUnMergedLogRecordScanner scanner = ((HoodieUnMergedLogRecordScanner.Builder) HoodieUnMergedLogRecordScanner.newBuilder()
                .withStorage(metaClient.getStorage())
                .withBasePath(metaClient.getBasePath())
                .withLogFilePaths(logFilePaths))
            .withBufferSize(HoodieCommonConfig.MAX_DFS_STREAM_BUFFER_SIZE.defaultValue())
            .withLatestInstantTime(metaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime())
            .withReaderSchema(readerSchema)
            .withTableMetaClient(metaClient)
            .withLogRecordScannerCallback(records::add)
            .build();
        scanner.scan(false);
        return records;
    }

    /**
     * Reads every record of a base file with {@code readerSchema}.
     *
     * @throws HoodieIOException if the file cannot be opened or read
     */
    private static java.util.List<HoodieRecord> getBaseFileRecords(HoodieBaseFile baseFile, HoodieTableMetaClient metaClient, Schema readerSchema) {
        java.util.List<HoodieRecord> records = new ArrayList<>();
        // The merger is only used to pick the record type for the reader factory.
        HoodieRecordMerger recordMerger = HoodieRecordUtils.createRecordMerger(metaClient.getBasePath().toString(), EngineType.SPARK,
            Collections.emptyList(), metaClient.getTableConfig().getRecordMergeStrategyId());
        try (HoodieFileReader baseFileReader = HoodieIOFactory.getIOFactory(metaClient.getStorage())
                .getReaderFactory(recordMerger.getRecordType())
                .getFileReader(ConfigUtils.getReaderConfigs(metaClient.getStorageConf()), baseFile.getStoragePath())) {
            baseFileReader.getRecordIterator(readerSchema).forEachRemaining(record -> records.add((HoodieRecord) record));
            return records;
        } catch (IOException e) {
            throw new HoodieIOException("Error reading base file " + baseFile.getFileName(), e);
        }
    }

    /**
     * Converts records to Spark rows whose struct type is derived from {@code schema}.
     *
     * @param path file the records came from; used only in error messages
     * @throws HoodieIOException if a payload cannot be materialized (the original cause is preserved)
     */
    private static java.util.List<Row> toRows(java.util.List<HoodieRecord> records, Schema schema, HoodieWriteConfig dataWriteConfig, SQLContext sqlContext, String path) {
        StructType structType = AvroConversionUtils.convertAvroSchemaToStructType(schema);
        Function1<GenericRecord, Row> converterToRow = AvroConversionUtils.createConverterToRow(schema, structType);
        return records.stream()
            .map(record -> toGenericRecord(record, schema, dataWriteConfig, path))
            .map(converterToRow::apply)
            .collect(Collectors.toList());
    }

    // Returns the record's data as an Avro record, materializing payload-backed records via getInsertValue.
    private static GenericRecord toGenericRecord(HoodieRecord record, Schema schema, HoodieWriteConfig dataWriteConfig, String path) {
        Object data = record.getData();
        if (data instanceof GenericRecord) {
            return (GenericRecord) data;
        }
        try {
            return (GenericRecord) ((HoodieRecordPayload) data).getInsertValue(schema, dataWriteConfig.getProps()).get();
        } catch (IOException e) {
            throw new HoodieIOException("Could not fetch record payload from file " + path, e);
        }
    }

    /**
     * Computes expression-index records for the given files.
     *
     * <p>Each file is read into rows tagged with its partition, relative path and size. The index
     * expression is then applied to the single source field, and column-stats or bloom-filter
     * records are built depending on the index type.
     *
     * @param partitionFilePathAndSizeTriplet (partition, (file path, file size)) entries to index
     * @return empty records if the definition has no source field
     * @throws IllegalArgumentException      if the definition has more than one source field
     * @throws UnsupportedOperationException if the index type is neither {@code column_stats} nor
     *                                       {@code bloom_filters} (compared case-insensitively)
     */
    public static HoodieSparkExpressionIndex.ExpressionIndexComputationMetadata getExprIndexRecords(java.util.List<Pair<String, Pair<String, Long>>> partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition, HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, String instantTime, HoodieEngineContext engineContext, HoodieWriteConfig dataWriteConfig, HoodieWriteConfig metadataWriteConfig, Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>, HoodieData<HoodieRecord>>> partitionRecordsFunctionOpt) {
        HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) engineContext;
        if (indexDefinition.getSourceFields().isEmpty()) {
            return new HoodieSparkExpressionIndex.ExpressionIndexComputationMetadata(sparkEngineContext.emptyHoodieData());
        }
        ValidationUtils.checkArgument(indexDefinition.getSourceFields().size() == 1, "Only one source field is supported for expression index");
        String columnToIndex = indexDefinition.getSourceFields().get(0);
        SQLContext sqlContext = sparkEngineContext.getSqlContext();
        HoodieData<Row> rowData = sparkEngineContext.parallelize(partitionFilePathAndSizeTriplet, parallelism)
            .flatMap(entry -> getExpressionIndexRecordsIterator(metaClient, readerSchema, dataWriteConfig, entry, sqlContext));
        // Reader schema followed by the bookkeeping columns, matching getRowsWithExpressionIndexMetadata.
        StructType structType = AvroConversionUtils.convertAvroSchemaToStructType(readerSchema)
            .add(StructField.apply(EXPRESSION_INDEX_PARTITION_COLUMN, DataTypes.StringType, false, Metadata.empty()))
            .add(StructField.apply(EXPRESSION_INDEX_RELATIVE_FILE_PATH_COLUMN, DataTypes.StringType, false, Metadata.empty()))
            .add(StructField.apply(EXPRESSION_INDEX_FILE_SIZE_COLUMN, DataTypes.LongType, false, Metadata.empty()));
        Dataset<Row> rowDataset = sqlContext.createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(), structType);
        HoodieSparkExpressionIndex expressionIndex = new HoodieSparkExpressionIndex(indexDefinition);
        // Replace the raw source column with the expression's result so the aggregations see indexed values.
        Column indexedColumn = (Column) expressionIndex.apply(Collections.singletonList(rowDataset.col(columnToIndex)));
        rowDataset = rowDataset.withColumn(columnToIndex, indexedColumn);
        String indexType = indexDefinition.getIndexType();
        if (indexType.equalsIgnoreCase("column_stats")) {
            return getExpressionIndexRecordsUsingColumnStats(rowDataset, expressionIndex, columnToIndex, partitionRecordsFunctionOpt);
        }
        if (indexType.equalsIgnoreCase("bloom_filters")) {
            return getExpressionIndexRecordsUsingBloomFilter(rowDataset, columnToIndex, metadataWriteConfig, instantTime, indexDefinition);
        }
        throw new UnsupportedOperationException(indexType + " is not yet supported");
    }

    // Reads one (partition, (file path, size)) entry and returns its rows tagged with the bookkeeping values.
    private static Iterator<Row> getExpressionIndexRecordsIterator(HoodieTableMetaClient metaClient, Schema readerSchema, HoodieWriteConfig dataWriteConfig, Pair<String, Pair<String, Long>> entry, SQLContext sqlContext) {
        String partition = entry.getKey();
        String filePath = entry.getValue().getKey();
        long fileSize = entry.getValue().getValue();
        StoragePath storagePath = new StoragePath(filePath);
        String relativeFilePath = FSUtils.getRelativePartitionPath(metaClient.getBasePath(), storagePath);
        // Base-file detection is done on the bare file name (text after the last '/').
        String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);
        boolean isBaseFile = FSUtils.isBaseFile(new StoragePath(fileName));
        java.util.List<Row> rowsForFilePath = readRecordsAsRows(new StoragePath[]{storagePath}, sqlContext, metaClient, readerSchema, dataWriteConfig, isBaseFile);
        return getRowsWithExpressionIndexMetadata(rowsForFilePath, partition, relativeFilePath, fileSize).iterator();
    }

    /**
     * For every partition written by {@code commitMetadata}, looks up the column range metadata stored in
     * {@code indexPartition} for the index's first source field. Only entries belonging to files of the
     * partition's latest file slices (including inflight) are kept.
     *
     * @return (partition, ranges) pairs; a single ("", empty list) pair per input element of an empty
     *         dataset if the source field's type is not supported
     * @throws HoodieException wrapping any failure in the computation, including a missing writer schema
     *                         in {@code commitMetadata}
     */
    public static HoodiePairData<String, java.util.List<HoodieColumnRangeMetadata<Comparable>>> getExpressionIndexPartitionStatUpdates(HoodieCommitMetadata commitMetadata, String indexPartition, HoodieEngineContext engineContext, HoodieTableMetadata tableMetadata, HoodieTableMetaClient dataMetaClient, HoodieMetadataConfig metadataConfig, Option<HoodieRecord.HoodieRecordType> recordTypeOpt) {
        HoodieIndexDefinition indexDefinition = HoodieTableMetadataUtil.getHoodieIndexDefinition(indexPartition, dataMetaClient);
        java.util.List<String> columnsToIndex = Collections.singletonList(indexDefinition.getSourceFields().get(0));
        try {
            Option<Schema> writerSchema = Option.ofNullable(commitMetadata.getMetadata("schema"))
                .flatMap(writerSchemaStr -> StringUtils.isNullOrEmpty(writerSchemaStr) ? Option.empty() : Option.of(new Schema.Parser().parse(writerSchemaStr)));
            HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
            Schema tableSchema = writerSchema
                .map(schema -> tableConfig.populateMetaFields() ? HoodieAvroUtils.addMetadataFields(schema) : schema)
                .orElseThrow(() -> new IllegalStateException(String.format("Expected writer schema in commit metadata %s", commitMetadata)));
            java.util.List<String> validColumnsToIndex = columnsToIndex.stream()
                .map(columnToIndex -> Pair.of(columnToIndex, HoodieAvroUtils.getSchemaForField(tableSchema, columnToIndex).getValue().schema()))
                .filter(colSchemaPair -> HoodieTableMetadataUtil.SUPPORTED_META_FIELDS_PARTITION_STATS.contains(colSchemaPair.getKey())
                    || HoodieTableMetadataUtil.isColumnTypeSupported(colSchemaPair.getValue(), recordTypeOpt))
                .map(Pair::getKey)
                .collect(Collectors.toList());
            if (validColumnsToIndex.isEmpty()) {
                return engineContext.emptyHoodieData().mapToPair(o -> Pair.of("", new ArrayList()));
            }
            LOG.debug("Indexing following columns for partition stats index: {}", validColumnsToIndex);
            java.util.List<String> partitionPaths = new ArrayList<>(commitMetadata.getWritePartitionPaths());
            HoodieTableFileSystemView fileSystemView = HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(dataMetaClient);
            int parallelism = Math.max(Math.min(partitionPaths.size(), metadataConfig.getPartitionStatsIndexParallelism()), 1);
            return engineContext.parallelize(partitionPaths, parallelism).mapToPair(partitionName -> {
                ValidationUtils.checkState(tableMetadata != null, "tableMetadata should not be null when scanning metadata table");
                // Names of the base file and log files of every latest file slice in the partition.
                Set<String> fileNames = HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight(dataMetaClient, Option.of(fileSystemView), partitionName)
                    .stream()
                    .flatMap(fileSlice -> Stream.concat(
                        Stream.of(fileSlice.getBaseFile().map(BaseFile::getFileName).orElse(null)),
                        fileSlice.getLogFiles().map(HoodieLogFile::getFileName)))
                    .filter(Objects::nonNull)
                    .collect(Collectors.toSet());
                java.util.List<HoodieColumnRangeMetadata<Comparable>> partitionColumnMetadata = tableMetadata
                    .getRecordsByKeyPrefixes(HoodieTableMetadataUtil.generateKeyPrefixes(validColumnsToIndex, partitionName), indexPartition, false)
                    // NOTE(review): null schema/properties assume HoodieMetadataPayload ignores them; verify.
                    .map(record -> ((HoodieMetadataPayload) record.getData()).getInsertValue(null, null))
                    .filter(Option::isPresent)
                    .map(data -> ((HoodieMetadataRecord) data.get()).getColumnStatsMetadata())
                    .filter(stats -> fileNames.contains(stats.getFileName()))
                    .map(HoodieColumnRangeMetadata::fromColumnStats)
                    .collectAsList();
                return Pair.of(partitionName, partitionColumnMetadata);
            });
        } catch (Exception e) {
            throw new HoodieException("Failed to generate column stats records for metadata table", e);
        }
    }
}

