/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.utils;

import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.SparkRowSerDe;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.engine.ReaderContextFactory;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.expression.HoodieExpressionIndex;
import org.apache.hudi.index.expression.HoodieSparkExpressionIndex;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.metadata.HoodieIndexVersion;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieMetadataWriteUtils;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.stats.HoodieColumnRangeMetadata;
import org.apache.hudi.stats.SparkValueMetadataUtils;
import org.apache.hudi.stats.ValueMetadata;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.util.JavaScalaConverters;
import org.apache.spark.api.java.function.FlatMapGroupsFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.HoodieInternalRowUtils;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.immutable.List;

/**
 * Spark-specific helpers used while building expression index records (column stats and bloom
 * filters) and partition-stats inputs for the metadata table.
 */
public class SparkMetadataWriterUtils {
    private static final Logger LOG = LoggerFactory.getLogger(SparkMetadataWriterUtils.class);

    /**
     * Returns the expression index metadata columns as Spark {@link Column}s.
     *
     * <p>Derived from {@link #getExpressionIndexColumnNames()} so the two lists cannot drift apart.
     *
     * @return columns for the partition, relative file path and file size, in that order
     */
    public static Column[] getExpressionIndexColumns() {
        return Arrays.stream(SparkMetadataWriterUtils.getExpressionIndexColumnNames()).map(name -> functions.col(name)).toArray(Column[]::new);
    }

    /**
     * Returns the names of the metadata columns that are appended to every row read for an expression index.
     *
     * @return partition, relative file path and file size column names, in that order
     */
    public static String[] getExpressionIndexColumnNames() {
        return new String[]{"_hoodie_expression_index_partition", "_hoodie_expression_index_relative_file_path", "_hoodie_expression_index_file_size"};
    }

    /**
     * Wraps an iterator of catalyst rows read from one file. Each row is deserialized to a {@link Row}
     * and followed by the file's partition, path and size, which become the trailing columns.
     *
     * @param rowsForFilePath rows read from a single file
     * @param sparkRowSerDe   converter from {@link InternalRow} to {@link Row}
     * @param partition       partition path of the file
     * @param filePath        file path to record (callers in this class pass the relative path)
     * @param fileSize        size of the file
     * @return mapping iterator over the merged rows; presumably closing it closes {@code rowsForFilePath} — verify CloseableMappingIterator
     */
    public static ClosableIterator<Row> getRowsWithExpressionIndexMetadata(ClosableIterator<InternalRow> rowsForFilePath, SparkRowSerDe sparkRowSerDe, String partition, String filePath, long fileSize) {
        // The metadata suffix is identical for every row of this file, so build it once instead of per row.
        // Sharing it is safe: Row.merge copies the values of its inputs into a new row.
        List<Object> indexMetadata = JavaScalaConverters.convertJavaListToScalaList(Arrays.<Object>asList(partition, filePath, fileSize));
        Row expressionIndexRow = Row.fromSeq(indexMetadata);
        return new CloseableMappingIterator(rowsForFilePath, row -> {
            ArrayList<Row> rows = new ArrayList<Row>(2);
            rows.add(sparkRowSerDe.deserializeRow((InternalRow)row));
            rows.add(expressionIndexRow);
            List rowSeq = JavaScalaConverters.convertJavaListToScalaList(rows);
            return Row.merge(rowSeq);
        });
    }

    /**
     * Computes column-stats expression index records by aggregating {@code columnToIndex} per file.
     *
     * <p>The dataset is grouped by the expression index metadata columns (partition, relative path,
     * file size). For each group, the null count, min, max and non-null value count are computed.
     * The aggregated row layout is: 0 partition, 1 relative path, 2 file size, 3 nullCount,
     * 4 minValue, 5 maxValue, 6 valueCount.
     *
     * @param dataset                     rows containing {@code columnToIndex} plus the metadata columns
     * @param expressionIndex             expression index whose name is written into the records
     * @param columnToIndex               column (already transformed by the expression) to aggregate
     * @param partitionRecordsFunctionOpt optional function producing partition stats records from the per-file range metadata
     * @param indexVersion                index version used to derive the value metadata
     * @return column stats records, plus partition stats records when the function is present
     */
    public static HoodieSparkExpressionIndex.ExpressionIndexComputationMetadata getExpressionIndexRecordsUsingColumnStats(Dataset<Row> dataset, HoodieExpressionIndex<Column, Column> expressionIndex, String columnToIndex, Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>, HoodieData<HoodieRecord>>> partitionRecordsFunctionOpt, HoodieIndexVersion indexVersion) {
        Dataset columnRangeMetadataDataset = dataset.select(columnToIndex, SparkMetadataWriterUtils.getExpressionIndexColumnNames()).groupBy(SparkMetadataWriterUtils.getExpressionIndexColumns()).agg(functions.count((Column)functions.when((Column)functions.col((String)columnToIndex).isNull(), (Object)1)).alias("nullCount"), new Column[]{functions.min((String)columnToIndex).alias("minValue"), functions.max((String)columnToIndex).alias("maxValue"), functions.count((String)columnToIndex).alias("valueCount")});
        ValueMetadata valueMetadata = SparkMetadataWriterUtils.getValueMetadataFromColumnRangeDatasetSchema(columnRangeMetadataDataset.schema(), indexVersion);
        HoodiePairData rangeMetadataHoodieJavaRDD = HoodieJavaRDD.of(columnRangeMetadataDataset.javaRDD()).flatMapToPair((SerializableFunction & Serializable)row -> {
            // Aggregates start right after the three metadata (group-by) columns.
            int baseAggregatePosition = SparkMetadataWriterUtils.getExpressionIndexColumnNames().length;
            long nullCount = row.getLong(baseAggregatePosition);
            Comparable minValue = SparkValueMetadataUtils.convertSparkToJava(valueMetadata, row.get(baseAggregatePosition + 1));
            Comparable maxValue = SparkValueMetadataUtils.convertSparkToJava(valueMetadata, row.get(baseAggregatePosition + 2));
            long valueCount = row.getLong(baseAggregatePosition + 3);
            String partitionName = row.getString(0);
            String relativeFilePath = row.getString(1);
            long totalFileSize = row.getLong(2);
            // NOTE(review): the uncompressed size is not measured here; it is approximated as twice the file size.
            long totalUncompressedSize = totalFileSize * 2L;
            HoodieColumnRangeMetadata rangeMetadata = HoodieColumnRangeMetadata.create((String)relativeFilePath, (String)columnToIndex, (Comparable)minValue, (Comparable)maxValue, (long)nullCount, (long)valueCount, (long)totalFileSize, (long)totalUncompressedSize, (ValueMetadata)valueMetadata);
            return Collections.singletonList(Pair.of((Object)partitionName, (Object)rangeMetadata)).iterator();
        });
        if (partitionRecordsFunctionOpt.isPresent()) {
            // The range metadata feeds both the column stats records and the partition stats function, so cache it.
            rangeMetadataHoodieJavaRDD.persist("MEMORY_AND_DISK_SER");
        }
        HoodieData colStatRecords = rangeMetadataHoodieJavaRDD.map((SerializableFunction & Serializable)pair -> HoodieMetadataPayload.createColumnStatsRecords((String)((String)pair.getKey()), Collections.singletonList(pair.getValue()), (boolean)false, (String)expressionIndex.getIndexName(), (int)MetadataPartitionType.COLUMN_STATS.getRecordType()).collect(Collectors.toList())).flatMap((SerializableFunction & Serializable)records -> records.iterator());
        Option partitionStatRecordsOpt = Option.empty();
        if (partitionRecordsFunctionOpt.isPresent()) {
            partitionStatRecordsOpt = Option.of(((Function)partitionRecordsFunctionOpt.get()).apply(rangeMetadataHoodieJavaRDD));
            // NOTE(review): this unpersist runs before the returned (possibly lazy) records are consumed;
            // if no action has run yet, the cache is released before it is ever used — confirm intended lifecycle.
            rangeMetadataHoodieJavaRDD.unpersist();
        }
        return partitionRecordsFunctionOpt.isPresent() ? new HoodieSparkExpressionIndex.ExpressionIndexComputationMetadata((HoodieData<HoodieRecord>)colStatRecords, (Option<HoodieData<HoodieRecord>>)partitionStatRecordsOpt) : new HoodieSparkExpressionIndex.ExpressionIndexComputationMetadata((HoodieData<HoodieRecord>)colStatRecords);
    }

    /**
     * Derives the value metadata from the data type of the aggregated min/max columns.
     *
     * @param datasetSchema schema of the aggregated column-range dataset
     * @param indexVersion  index version used to derive the value metadata
     * @return value metadata for the min/max data type
     * @throws HoodieException if the min and max columns have different data types
     */
    private static ValueMetadata getValueMetadataFromColumnRangeDatasetSchema(StructType datasetSchema, HoodieIndexVersion indexVersion) {
        int baseAggregatePosition = SparkMetadataWriterUtils.getExpressionIndexColumnNames().length;
        DataType minDataType = datasetSchema.apply(baseAggregatePosition + 1).dataType();
        DataType maxDataType = datasetSchema.apply(baseAggregatePosition + 2).dataType();
        // Compare structurally: parameterized types such as DecimalType(p, s) are not singletons,
        // so a reference comparison could report equal types as mismatched.
        if (!minDataType.equals(maxDataType)) {
            throw new HoodieException(String.format("Column stats data types do not match for min (%s) and max (%s)", minDataType, maxDataType));
        }
        return SparkValueMetadataUtils.getValueMetadata(minDataType, indexVersion);
    }

    /**
     * Builds one bloom filter metadata record per file from the values of {@code columnToIndex}.
     *
     * <p>Side effect: bloom filter options from the index definition are copied into
     * {@code storageConfig}'s properties before the filters are created. Null values are skipped.
     *
     * @param dataset         rows containing {@code columnToIndex} plus the metadata columns
     * @param columnToIndex   column (already transformed by the expression) whose values are added
     * @param storageConfig   storage config used to create the bloom filters (mutated, see above)
     * @param instantTime     instant time written into the records
     * @param indexDefinition expression index definition
     * @return computation metadata wrapping the bloom filter records
     */
    public static HoodieSparkExpressionIndex.ExpressionIndexComputationMetadata getExpressionIndexRecordsUsingBloomFilter(Dataset<Row> dataset, String columnToIndex, HoodieStorageConfig storageConfig, String instantTime, HoodieIndexDefinition indexDefinition) {
        String indexName = indexDefinition.getIndexName();
        SparkMetadataWriterUtils.setBloomFilterProps(storageConfig, indexDefinition.getIndexOptions());
        // After the select, position 0 is the indexed column, 1 the partition and 2 the relative file path.
        Dataset bloomFilterRecords = dataset.select(columnToIndex, SparkMetadataWriterUtils.getExpressionIndexColumnNames()).groupByKey((MapFunction & Serializable)row -> Pair.of((Object)row.getString(1), (Object)row.getString(2)), Encoders.kryo(Pair.class)).flatMapGroups((FlatMapGroupsFunction & Serializable)(pair, iterator) -> {
            String partition = pair.getLeft().toString();
            String relativeFilePath = pair.getRight().toString();
            String fileName = FSUtils.getFileName((String)relativeFilePath, (String)partition);
            BloomFilter bloomFilter = HoodieFileWriterFactory.createBloomFilter((HoodieConfig)storageConfig);
            iterator.forEachRemaining(row -> {
                Object value = row.getAs(columnToIndex);
                // Null values have no key to add; skipping them avoids an NPE.
                if (value != null) {
                    // Encode explicitly as UTF-8 so keys do not depend on the JVM's default charset.
                    bloomFilter.add(StringUtils.getUTF8Bytes((String)value.toString()));
                }
            });
            ByteBuffer bloomByteBuffer = ByteBuffer.wrap(StringUtils.getUTF8Bytes((String)bloomFilter.serializeToString()));
            HoodieRecord bloomFilterRecord = HoodieMetadataPayload.createBloomFilterMetadataRecord((String)partition, (String)fileName, (String)instantTime, (String)storageConfig.getBloomFilterType(), (ByteBuffer)bloomByteBuffer, (boolean)false, (String)indexName);
            return Collections.singletonList(bloomFilterRecord).iterator();
        }, Encoders.kryo(HoodieRecord.class));
        return new HoodieSparkExpressionIndex.ExpressionIndexComputationMetadata(HoodieJavaRDD.of(bloomFilterRecords.javaRDD()));
    }

    /**
     * Copies bloom filter options present in {@code indexOptions} into {@code storageConfig}'s
     * properties. Keys are renamed according to {@code HoodieExpressionIndex.BLOOM_FILTER_CONFIG_MAPPING}.
     *
     * @param storageConfig storage config whose properties are updated in place
     * @param indexOptions  options from the index definition
     */
    private static void setBloomFilterProps(HoodieStorageConfig storageConfig, Map<String, String> indexOptions) {
        HoodieExpressionIndex.BLOOM_FILTER_CONFIG_MAPPING.forEach((sourceKey, targetKey) -> {
            if (indexOptions.containsKey(sourceKey)) {
                storageConfig.getProps().setProperty(targetKey, (String)indexOptions.get(sourceKey));
            }
        });
    }

    /**
     * Reads the given files, applies the index expression to the single source field, and builds
     * expression index records of the definition's index type.
     *
     * <p>Returns empty data when the definition has no source fields. Only {@code column_stats} and
     * {@code bloom_filters} index types are handled.
     *
     * @param partitionFilePathAndSizeTriplet (partition, (file path, file size)) entries to read
     * @param indexDefinition                 expression index definition
     * @param metaClient                      data table meta client
     * @param parallelism                     parallelism used to distribute the file entries
     * @param tableSchema                     data schema for the file group reader
     * @param readerSchema                    requested schema; also the base of the DataFrame schema
     * @param instantTime                     instant time written into bloom filter records
     * @param engineContext                   engine context; must be a {@link HoodieSparkEngineContext}
     * @param dataWriteConfig                 data table write config
     * @param partitionRecordsFunctionOpt     optional partition stats function (column stats only)
     * @return computation metadata with the generated records
     * @throws IllegalArgumentException      if more than one source field is defined
     * @throws UnsupportedOperationException for any other index type
     */
    public static HoodieSparkExpressionIndex.ExpressionIndexComputationMetadata getExprIndexRecords(java.util.List<Pair<String, Pair<String, Long>>> partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition, HoodieTableMetaClient metaClient, int parallelism, Schema tableSchema, Schema readerSchema, String instantTime, HoodieEngineContext engineContext, HoodieWriteConfig dataWriteConfig, Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>, HoodieData<HoodieRecord>>> partitionRecordsFunctionOpt) {
        HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)engineContext;
        if (indexDefinition.getSourceFields().isEmpty()) {
            return new HoodieSparkExpressionIndex.ExpressionIndexComputationMetadata(sparkEngineContext.emptyHoodieData());
        }
        ValidationUtils.checkArgument(indexDefinition.getSourceFields().size() == 1, (String)"Only one source field is supported for expression index");
        String columnToIndex = (String)indexDefinition.getSourceFields().get(0);
        ReaderContextFactory readerContextFactory = engineContext.getReaderContextFactory(metaClient);
        HoodieData rowData = sparkEngineContext.parallelize(partitionFilePathAndSizeTriplet, parallelism).flatMap((SerializableFunction & Serializable)entry -> SparkMetadataWriterUtils.getExpressionIndexRecordsIterator((HoodieReaderContext<InternalRow>)readerContextFactory.getContext(), metaClient, tableSchema, readerSchema, dataWriteConfig, (Pair<String, Pair<String, Long>>)entry));
        // Reader schema plus the three non-nullable metadata columns appended by getRowsWithExpressionIndexMetadata.
        StructType structType = AvroConversionUtils.convertAvroSchemaToStructType(readerSchema).add(StructField.apply((String)"_hoodie_expression_index_partition", (DataType)DataTypes.StringType, (boolean)false, (Metadata)Metadata.empty())).add(StructField.apply((String)"_hoodie_expression_index_relative_file_path", (DataType)DataTypes.StringType, (boolean)false, (Metadata)Metadata.empty())).add(StructField.apply((String)"_hoodie_expression_index_file_size", (DataType)DataTypes.LongType, (boolean)false, (Metadata)Metadata.empty()));
        Dataset rowDataset = sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(), structType);
        HoodieSparkExpressionIndex expressionIndex = new HoodieSparkExpressionIndex(indexDefinition);
        Column indexedColumn = (Column)expressionIndex.apply(Collections.singletonList(rowDataset.col(columnToIndex)));
        // Replace the source column with the expression result so downstream aggregation sees transformed values.
        rowDataset = rowDataset.withColumn(columnToIndex, indexedColumn);
        if (indexDefinition.getIndexType().equalsIgnoreCase("column_stats")) {
            return SparkMetadataWriterUtils.getExpressionIndexRecordsUsingColumnStats((Dataset<Row>)rowDataset, expressionIndex, columnToIndex, partitionRecordsFunctionOpt, indexDefinition.getVersion());
        }
        if (indexDefinition.getIndexType().equalsIgnoreCase("bloom_filters")) {
            return SparkMetadataWriterUtils.getExpressionIndexRecordsUsingBloomFilter((Dataset<Row>)rowDataset, columnToIndex, dataWriteConfig.getStorageConfig(), instantTime, indexDefinition);
        }
        throw new UnsupportedOperationException(indexDefinition.getIndexType() + " is not yet supported");
    }

    /**
     * Reads one base or log file with a file group reader. Returns its rows with the expression
     * index metadata (partition, relative path, size) appended.
     *
     * @param readerContext   reader context for catalyst rows
     * @param metaClient      data table meta client
     * @param tableSchema     data schema for the reader
     * @param readerSchema    requested schema for the reader
     * @param dataWriteConfig data table write config (props and log-scan option)
     * @param entry           (partition, (file path, file size))
     * @return iterator over rows of the file with metadata columns appended
     * @throws HoodieIOException if the reader's iterator cannot be opened
     */
    private static Iterator<Row> getExpressionIndexRecordsIterator(HoodieReaderContext<InternalRow> readerContext, HoodieTableMetaClient metaClient, Schema tableSchema, Schema readerSchema, HoodieWriteConfig dataWriteConfig, Pair<String, Pair<String, Long>> entry) {
        Stream<Object> logFileStream;
        Option baseFileOption;
        String partition = (String)entry.getKey();
        Pair filePathSizePair = (Pair)entry.getValue();
        String filePath = (String)filePathSizePair.getKey();
        String relativeFilePath = FSUtils.getRelativePartitionPath((StoragePath)metaClient.getBasePath(), (StoragePath)new StoragePath(filePath));
        long fileSize = (Long)filePathSizePair.getValue();
        // Classify by file name only: a base file is read as the base, anything else as a single log file.
        boolean isBaseFile = FSUtils.isBaseFile((StoragePath)new StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1)));
        if (isBaseFile) {
            baseFileOption = Option.of((Object)new HoodieBaseFile(filePath));
            logFileStream = Stream.empty();
        } else {
            baseFileOption = Option.empty();
            logFileStream = Stream.of(new HoodieLogFile(filePath));
        }
        // Latest commit time is the requested time of the last active-timeline instant, or "" if there is none.
        HoodieFileGroupReader fileGroupReader = HoodieFileGroupReader.newBuilder().withReaderContext(readerContext).withHoodieTableMetaClient(metaClient).withDataSchema(tableSchema).withRequestedSchema(readerSchema).withProps(dataWriteConfig.getProps()).withLatestCommitTime((String)metaClient.getActiveTimeline().lastInstant().map(HoodieInstant::requestedTime).orElse((Object)"")).withAllowInflightInstants(true).withBaseFileOption(baseFileOption).withLogFiles(logFileStream).withPartitionPath(partition).withEnableOptimizedLogBlockScan(dataWriteConfig.enableOptimizedLogBlocksScan()).build();
        try {
            ClosableIterator rowsForFilePath = fileGroupReader.getClosableIterator();
            SparkRowSerDe sparkRowSerDe = HoodieSparkUtils.getCatalystRowSerDe(HoodieInternalRowUtils.getCachedSchema(readerSchema));
            return SparkMetadataWriterUtils.getRowsWithExpressionIndexMetadata((ClosableIterator<InternalRow>)rowsForFilePath, sparkRowSerDe, partition, relativeFilePath, fileSize);
        }
        catch (IOException ex) {
            throw new HoodieIOException("Error reading file " + filePath, ex);
        }
    }

    /**
     * Loads the existing column stats of an expression index for the files relevant to a commit, grouped by partition.
     *
     * <p>Only the first source field of the index definition is considered. Columns are kept when
     * they are supported meta fields for partition stats or have a supported type. The writer schema
     * is taken from the commit metadata's {@code "schema"} entry; meta fields are added when the
     * table populates them. For each partition touched by the commit, stats records whose file name
     * is in the set from {@code HoodieMetadataWriteUtils.getFilesToFetchColumnStats} are read from
     * {@code indexPartition}.
     *
     * @param commitMetadata  metadata of the commit being applied
     * @param indexPartition  metadata table partition of the expression index
     * @param engineContext   engine context
     * @param tableMetadata   metadata table reader; must not be null
     * @param dataMetaClient  data table meta client
     * @param metadataConfig  metadata config (partition stats parallelism)
     * @param recordTypeOpt   optional record type used for column type support checks
     * @param instantTime     instant time of the commit
     * @param dataWriteConfig data table write config
     * @return (partition, column range metadata list) pairs; a single ("", empty) pair per empty
     *         input element when no column qualifies
     * @throws HoodieException wrapping any failure while computing the stats
     */
    public static HoodiePairData<String, java.util.List<HoodieColumnRangeMetadata<Comparable>>> getExpressionIndexPartitionStatsForExistingFiles(HoodieCommitMetadata commitMetadata, String indexPartition, HoodieEngineContext engineContext, HoodieTableMetadata tableMetadata, HoodieTableMetaClient dataMetaClient, HoodieMetadataConfig metadataConfig, Option<HoodieRecord.HoodieRecordType> recordTypeOpt, String instantTime, HoodieWriteConfig dataWriteConfig) {
        HoodieIndexVersion indexVersion = HoodieTableMetadataUtil.existingIndexVersionOrDefault((String)indexPartition, (HoodieTableMetaClient)dataMetaClient);
        HoodieIndexDefinition indexDefinition = HoodieTableMetadataUtil.getHoodieIndexDefinition((String)indexPartition, (HoodieTableMetaClient)dataMetaClient);
        java.util.List columnsToIndex = Collections.singletonList(indexDefinition.getSourceFields().get(0));
        try {
            Option writerSchema = Option.ofNullable((Object)commitMetadata.getMetadata("schema")).flatMap(writerSchemaStr -> StringUtils.isNullOrEmpty((String)writerSchemaStr) ? Option.empty() : Option.of((Object)new Schema.Parser().parse(writerSchemaStr)));
            HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
            Schema tableSchema = (Schema)writerSchema.map(schema -> tableConfig.populateMetaFields() ? HoodieAvroUtils.addMetadataFields((Schema)schema) : schema).orElseThrow(() -> new IllegalStateException(String.format("Expected writer schema in commit metadata %s", commitMetadata)));
            java.util.List columnsToIndexSchemaMap = columnsToIndex.stream().map(columnToIndex -> Pair.of((Object)columnToIndex, (Object)((Schema.Field)HoodieAvroUtils.getSchemaForField((Schema)tableSchema, (String)columnToIndex).getValue()).schema())).collect(Collectors.toList());
            java.util.List validColumnsToIndex = columnsToIndexSchemaMap.stream().filter(colSchemaPair -> HoodieTableMetadataUtil.SUPPORTED_META_FIELDS_PARTITION_STATS.contains(colSchemaPair.getKey()) || HoodieTableMetadataUtil.isColumnTypeSupported((Schema)((Schema)colSchemaPair.getValue()), (Option)recordTypeOpt, (HoodieIndexVersion)indexVersion)).map(entry -> (String)entry.getKey()).collect(Collectors.toList());
            if (validColumnsToIndex.isEmpty()) {
                return engineContext.emptyHoodieData().mapToPair((SerializablePairFunction & Serializable)o -> Pair.of((Object)"", new ArrayList()));
            }
            LOG.debug("Indexing following columns for partition stats index: {}", validColumnsToIndex);
            // One list of write stats per partition path; each list becomes one parallel task.
            ArrayList<java.util.List<HoodieWriteStat>> partitionedWriteStats = new ArrayList<java.util.List<HoodieWriteStat>>(commitMetadata.getWriteStats().stream().collect(Collectors.groupingBy(HoodieWriteStat::getPartitionPath)).values());
            Map fileGroupIdsToReplaceMap = commitMetadata instanceof HoodieReplaceCommitMetadata ? ((HoodieReplaceCommitMetadata)commitMetadata).getPartitionToReplaceFileIds().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new HashSet((Collection)e.getValue()))) : Collections.emptyMap();
            String maxInstantTime = HoodieMetadataWriteUtils.getMaxInstantTime((HoodieTableMetaClient)dataMetaClient, (String)instantTime);
            int parallelism = Math.max(Math.min(partitionedWriteStats.size(), metadataConfig.getPartitionStatsIndexParallelism()), 1);
            return engineContext.parallelize(partitionedWriteStats, parallelism).mapToPair((SerializablePairFunction & Serializable)partitionedWriteStat -> {
                String partitionName = ((HoodieWriteStat)partitionedWriteStat.get(0)).getPartitionPath();
                ValidationUtils.checkState(tableMetadata != null, (String)"tableMetadata should not be null when scanning metadata table");
                Set fileNames = HoodieMetadataWriteUtils.getFilesToFetchColumnStats((java.util.List)partitionedWriteStat, (HoodieTableMetaClient)dataMetaClient, (HoodieTableMetadata)tableMetadata, (HoodieWriteConfig)dataWriteConfig, (String)partitionName, (String)maxInstantTime, (String)instantTime, (Map)fileGroupIdsToReplaceMap, (java.util.List)validColumnsToIndex, (HoodieIndexVersion)indexVersion);
                java.util.List partitionColumnMetadata = tableMetadata.getRecordsByKeyPrefixes((HoodieData)HoodieListData.lazy((java.util.List)HoodieTableMetadataUtil.generateColumnStatsKeys((java.util.List)validColumnsToIndex, (String)partitionName)), indexPartition, false).map((SerializableFunction & Serializable)record -> ((HoodieMetadataPayload)record.getData()).getInsertValue(null, null)).filter(Option::isPresent).map((SerializableFunction & Serializable)data -> ((HoodieMetadataRecord)data.get()).getColumnStatsMetadata()).filter((SerializableFunction & Serializable)stats -> fileNames.contains(stats.getFileName())).map(HoodieColumnRangeMetadata::fromColumnStats).collectAsList();
                return Pair.of((Object)partitionName, (Object)partitionColumnMetadata);
            });
        }
        catch (Exception e2) {
            throw new HoodieException("Failed to generate column stats records for metadata table", (Throwable)e2);
        }
    }
}

