/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.metadata;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDMetadataWriteClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.utils.SparkMetadataWriterUtils;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.index.HoodieSparkIndexClient;
import org.apache.hudi.index.expression.HoodieSparkExpressionIndex;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieMetadataMetrics;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metadata.MetadataTableFileGroupIndexParser;
import org.apache.hudi.metadata.SparkHoodieMetadataBulkInsertPartitioner;
import org.apache.hudi.metrics.DistributedRegistry;
import org.apache.hudi.metrics.MetricsReporterType;
import org.apache.hudi.stats.HoodieColumnRangeMetadata;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.spark.api.java.JavaRDD;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkHoodieBackedTableMetadataWriter
extends HoodieBackedTableMetadataWriter<JavaRDD<HoodieRecord>, JavaRDD<WriteStatus>> {
    private static final Logger LOG = LoggerFactory.getLogger(SparkHoodieBackedTableMetadataWriter.class);

    public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf, HoodieWriteConfig writeConfig, HoodieEngineContext context, Option<String> inflightInstantTimestamp) {
        return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig, HoodieFailedWritesCleaningPolicy.EAGER, context, inflightInstantTimestamp);
    }

    public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf, HoodieWriteConfig writeConfig, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy, HoodieEngineContext context, Option<String> inflightInstantTimestamp) {
        return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig, failedWritesCleaningPolicy, context, inflightInstantTimestamp);
    }

    public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf, HoodieWriteConfig writeConfig, HoodieEngineContext context) {
        return SparkHoodieBackedTableMetadataWriter.create(conf, writeConfig, context, (Option<String>)Option.empty());
    }

    SparkHoodieBackedTableMetadataWriter(StorageConfiguration<?> hadoopConf, HoodieWriteConfig writeConfig, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy, HoodieEngineContext engineContext, Option<String> inflightInstantTimestamp) {
        this(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, inflightInstantTimestamp, false);
    }

    SparkHoodieBackedTableMetadataWriter(StorageConfiguration<?> hadoopConf, HoodieWriteConfig writeConfig, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy, HoodieEngineContext engineContext, Option<String> inflightInstantTimestamp, boolean streamingWrites) {
        super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, inflightInstantTimestamp, streamingWrites);
    }

    protected void initRegistry() {
        if (this.metadataWriteConfig.isMetricsOn()) {
            if (this.metadataWriteConfig.isExecutorMetricsEnabled() && this.metadataWriteConfig.getMetricsReporterType() != MetricsReporterType.INMEMORY) {
                Registry registry = Registry.getRegistry((String)"HoodieMetadata", (String)DistributedRegistry.class.getName());
                HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)this.engineContext;
                ((DistributedRegistry)registry).register(sparkEngineContext.getJavaSparkContext());
            } else {
                Registry registry = Registry.getRegistry((String)"HoodieMetadata");
            }
            this.metrics = Option.of((Object)new HoodieMetadataMetrics(this.metadataWriteConfig.getMetricsConfig(), this.dataMetaClient.getStorage()));
        } else {
            this.metrics = Option.empty();
        }
    }

    protected void commit(String instantTime, Map<String, HoodieData<HoodieRecord>> partitionRecordsMap) {
        this.commitInternal(instantTime, partitionRecordsMap, false, Option.empty());
    }

    protected JavaRDD<HoodieRecord> convertHoodieDataToEngineSpecificData(HoodieData<HoodieRecord> records) {
        return HoodieJavaRDD.getJavaRDD(records);
    }

    protected HoodieData<WriteStatus> convertEngineSpecificDataToHoodieData(JavaRDD<WriteStatus> records) {
        return HoodieJavaRDD.of(records);
    }

    public JavaRDD<WriteStatus> streamWriteToMetadataTable(Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> fileGroupIdToTaggedRecords, String instantTime) {
        JavaRDD mdtRecords = HoodieJavaRDD.getJavaRDD((HoodieData)fileGroupIdToTaggedRecords.getValue());
        this.engineContext.setJobStatus(((Object)((Object)this)).getClass().getSimpleName(), String.format("Upserting with instant %s into metadata table %s", instantTime, this.metadataWriteConfig.getTableName()));
        JavaRDD<WriteStatus> partialMetadataWriteStatuses = this.getSparkWriteClient(Option.empty()).firstUpsertPreppedRecords(mdtRecords, instantTime, (List)fileGroupIdToTaggedRecords.getKey());
        return partialMetadataWriteStatuses;
    }

    public JavaRDD<WriteStatus> secondaryWriteToMetadataTablePartitions(JavaRDD<HoodieRecord> preppedRecords, String instantTime) {
        this.engineContext.setJobStatus(((Object)((Object)this)).getClass().getSimpleName(), String.format("Upserting at %s into metadata table %s", instantTime, this.metadataWriteConfig.getTableName()));
        JavaRDD<WriteStatus> partialMetadataWriteStatuses = this.getSparkWriteClient(Option.empty()).secondaryUpsertPreppedRecords(preppedRecords, instantTime);
        return partialMetadataWriteStatuses;
    }

    protected void bulkInsertAndCommit(BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?, JavaRDD<WriteStatus>> writeClient, String instantTime, JavaRDD<HoodieRecord> preppedRecordInputs, Option<BulkInsertPartitioner> bulkInsertPartitioner) {
        JavaRDD writeStatusJavaRDD = (JavaRDD)writeClient.bulkInsertPreppedRecords(preppedRecordInputs, instantTime, bulkInsertPartitioner);
        writeClient.commit(instantTime, (Object)writeStatusJavaRDD, Option.empty(), "deltacommit", Collections.emptyMap());
    }

    protected void upsertAndCommit(BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?, JavaRDD<WriteStatus>> writeClient, String instantTime, JavaRDD<HoodieRecord> preppedRecordInputs) {
        int parallelism = this.dataWriteConfig.getMetadataConfig().getRecordPreparationParallelism();
        if (parallelism > 0 && preppedRecordInputs.getNumPartitions() > parallelism) {
            preppedRecordInputs = preppedRecordInputs.coalesce(parallelism);
        }
        JavaRDD writeStatusJavaRDD = (JavaRDD)writeClient.upsertPreppedRecords(preppedRecordInputs, instantTime);
        writeClient.commit(instantTime, (Object)writeStatusJavaRDD, Option.empty(), "deltacommit", Collections.emptyMap());
    }

    protected void upsertAndCommit(BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?, JavaRDD<WriteStatus>> writeClient, String instantTime, JavaRDD<HoodieRecord> preppedRecordInputs, List<HoodieFileGroupId> fileGroupsIdsToUpdate) {
        JavaRDD<WriteStatus> writeStatusJavaRDD = this.getSparkWriteClient(Option.of(writeClient)).firstUpsertPreppedRecords(preppedRecordInputs, instantTime, fileGroupsIdsToUpdate);
        writeClient.commit(instantTime, writeStatusJavaRDD, Option.empty(), "deltacommit", Collections.emptyMap());
    }

    protected void bulkCommit(String instantTime, String partitionPath, HoodieData<HoodieRecord> records, MetadataTableFileGroupIndexParser indexParser) {
        SparkHoodieMetadataBulkInsertPartitioner partitioner = new SparkHoodieMetadataBulkInsertPartitioner(indexParser);
        this.commitInternal(instantTime, Collections.singletonMap(partitionPath, records), true, Option.of((Object)partitioner));
    }

    public void deletePartitions(String instantTime, List<MetadataPartitionType> partitions) {
        List<String> partitionsToDrop = partitions.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toList());
        LOG.info("Deleting Metadata Table partitions: {}", partitionsToDrop);
        SparkRDDWriteClient writeClient = (SparkRDDWriteClient)this.getWriteClient();
        String actionType = CommitUtils.getCommitActionType((WriteOperationType)WriteOperationType.DELETE_PARTITION, (HoodieTableType)HoodieTableType.MERGE_ON_READ);
        writeClient.startCommitForMetadataTable(this.metadataMetaClient, instantTime, actionType);
        HoodieWriteResult result = writeClient.deletePartitions(partitionsToDrop, instantTime);
        writeClient.commit(instantTime, result.getWriteStatuses(), Option.empty(), "replacecommit", result.getPartitionToReplaceFileIds());
    }

    protected HoodieData<HoodieRecord> getExpressionIndexUpdates(HoodieCommitMetadata commitMetadata, String indexPartition, String instantTime) throws Exception {
        HoodieIndexDefinition indexDefinition = this.getIndexDefinition(indexPartition);
        boolean isExprIndexUsingColumnStats = indexDefinition.getIndexType().equals("column_stats");
        Option partitionRecordsFunctionOpt = Option.empty();
        if (isExprIndexUsingColumnStats) {
            HoodiePairData exprIndexPartitionStatUpdates = SparkMetadataWriterUtils.getExpressionIndexPartitionStatsForExistingFiles(commitMetadata, indexPartition, this.engineContext, (HoodieTableMetadata)this.getTableMetadata(), this.dataMetaClient, this.dataWriteConfig.getMetadataConfig(), (Option<HoodieRecord.HoodieRecordType>)Option.of((Object)this.dataWriteConfig.getRecordMerger().getRecordType()), instantTime, this.dataWriteConfig).flatMapValues(List::iterator);
            partitionRecordsFunctionOpt = Option.of(rangeMetadata -> HoodieTableMetadataUtil.collectAndProcessExprIndexPartitionStatRecords((HoodiePairData)exprIndexPartitionStatUpdates.union(rangeMetadata), (boolean)true, (Option)Option.of((Object)indexDefinition.getIndexName())));
        }
        ArrayList<Pair<String, Pair<String, Long>>> partitionFilePathPairs = new ArrayList<Pair<String, Pair<String, Long>>>();
        commitMetadata.getPartitionToWriteStats().forEach((dataPartition, writeStats) -> writeStats.forEach(writeStat -> partitionFilePathPairs.add(Pair.of((Object)writeStat.getPartitionPath(), (Object)Pair.of((Object)new StoragePath(this.dataMetaClient.getBasePath(), writeStat.getPath()).toString(), (Object)writeStat.getFileSizeInBytes())))));
        int parallelism = Math.min(partitionFilePathPairs.size(), this.dataWriteConfig.getMetadataConfig().getExpressionIndexParallelism());
        Schema tableSchema = new TableSchemaResolver(this.dataMetaClient).getTableAvroSchema();
        Schema readerSchema = HoodieTableMetadataUtil.getProjectedSchemaForExpressionIndex((HoodieIndexDefinition)indexDefinition, (HoodieTableMetaClient)this.dataMetaClient, (Schema)tableSchema);
        HoodieSparkExpressionIndex.ExpressionIndexComputationMetadata expressionIndexComputationMetadata = SparkMetadataWriterUtils.getExprIndexRecords(partitionFilePathPairs, indexDefinition, this.dataMetaClient, parallelism, tableSchema, readerSchema, instantTime, this.engineContext, this.dataWriteConfig, (Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>, HoodieData<HoodieRecord>>>)partitionRecordsFunctionOpt);
        return expressionIndexComputationMetadata.getPartitionStatRecordsOption().isPresent() ? expressionIndexComputationMetadata.getExpressionIndexRecords().union((HoodieData)expressionIndexComputationMetadata.getPartitionStatRecordsOption().get()) : expressionIndexComputationMetadata.getExpressionIndexRecords();
    }

    protected HoodieData<HoodieRecord> getExpressionIndexRecords(List<Pair<String, Pair<String, Long>>> partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition, HoodieTableMetaClient metaClient, int parallelism, Schema tableSchema, Schema readerSchema, StorageConfiguration<?> storageConf, String instantTime) {
        HoodieSparkExpressionIndex.ExpressionIndexComputationMetadata expressionIndexComputationMetadata = SparkMetadataWriterUtils.getExprIndexRecords(partitionFilePathAndSizeTriplet, indexDefinition, metaClient, parallelism, tableSchema, readerSchema, instantTime, this.engineContext, this.dataWriteConfig, (Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>, HoodieData<HoodieRecord>>>)Option.of(rangeMetadata -> HoodieTableMetadataUtil.collectAndProcessExprIndexPartitionStatRecords((HoodiePairData)rangeMetadata, (boolean)true, (Option)Option.of((Object)indexDefinition.getIndexName()))));
        HoodieData exprIndexRecords = expressionIndexComputationMetadata.getExpressionIndexRecords();
        if (indexDefinition.getIndexType().equals("column_stats")) {
            exprIndexRecords = exprIndexRecords.union((HoodieData)expressionIndexComputationMetadata.getPartitionStatRecordsOption().get());
        }
        return exprIndexRecords;
    }

    protected SparkRDDMetadataWriteClient getSparkWriteClient(Option<BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?, JavaRDD<WriteStatus>>> writeClientOpt) {
        return (SparkRDDMetadataWriteClient)((Object)writeClientOpt.orElse((Object)this.getWriteClient()));
    }

    public BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?, JavaRDD<WriteStatus>> initializeWriteClient() {
        return new SparkRDDMetadataWriteClient(this.engineContext, this.metadataWriteConfig, (Option<EmbeddedTimelineService>)Option.empty());
    }

    protected EngineType getEngineType() {
        return EngineType.SPARK;
    }

    protected void updateColumnsToIndexWithColStats(List<String> columnsToIndex) {
        new HoodieSparkIndexClient(this.dataWriteConfig, this.engineContext).createOrUpdateColumnStatsIndexDefinition(this.dataMetaClient, columnsToIndex);
    }
}

