/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.metadata;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieSparkExpressionIndex;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.utils.SparkMetadataWriterUtils;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieMetadataMetrics;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metadata.SparkHoodieMetadataBulkInsertPartitioner;
import org.apache.hudi.metrics.DistributedRegistry;
import org.apache.hudi.metrics.MetricsReporterType;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spark flavour of {@link HoodieBackedTableMetadataWriter}: the engine-specific record
 * collection is a {@code JavaRDD<HoodieRecord>} and write clients/tables are the Spark ones.
 *
 * <p>Instances are obtained through the static {@code create(...)} factories.
 */
public class SparkHoodieBackedTableMetadataWriter
        extends HoodieBackedTableMetadataWriter<JavaRDD<HoodieRecord>> {
    private static final Logger LOG = LoggerFactory.getLogger(SparkHoodieBackedTableMetadataWriter.class);

    /**
     * Creates a writer that uses the {@link HoodieFailedWritesCleaningPolicy#EAGER} policy.
     *
     * @param conf                     storage configuration handed to the base writer
     * @param writeConfig              write config handed to the base writer
     * @param context                  engine context handed to the base writer
     * @param inflightInstantTimestamp optional inflight instant handed to the base writer
     * @return a new Spark metadata writer
     */
    public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf, HoodieWriteConfig writeConfig, HoodieEngineContext context, Option<String> inflightInstantTimestamp) {
        return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig, HoodieFailedWritesCleaningPolicy.EAGER, context, inflightInstantTimestamp);
    }

    /**
     * Creates a writer with an explicit failed-writes cleaning policy.
     *
     * @param conf                       storage configuration handed to the base writer
     * @param writeConfig                write config handed to the base writer
     * @param failedWritesCleaningPolicy cleaning policy handed to the base writer
     * @param context                    engine context handed to the base writer
     * @param inflightInstantTimestamp   optional inflight instant handed to the base writer
     * @return a new Spark metadata writer
     */
    public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf, HoodieWriteConfig writeConfig, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy, HoodieEngineContext context, Option<String> inflightInstantTimestamp) {
        return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig, failedWritesCleaningPolicy, context, inflightInstantTimestamp);
    }

    /**
     * Creates a writer with the {@code EAGER} cleaning policy and no inflight instant.
     *
     * @param conf        storage configuration handed to the base writer
     * @param writeConfig write config handed to the base writer
     * @param context     engine context handed to the base writer
     * @return a new Spark metadata writer
     */
    public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf, HoodieWriteConfig writeConfig, HoodieEngineContext context) {
        return create(conf, writeConfig, context, Option.empty());
    }

    SparkHoodieBackedTableMetadataWriter(StorageConfiguration<?> hadoopConf, HoodieWriteConfig writeConfig, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy, HoodieEngineContext engineContext, Option<String> inflightInstantTimestamp) {
        super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, inflightInstantTimestamp);
    }

    /**
     * Sets {@code this.metrics}: a {@link HoodieMetadataMetrics} when metrics are on in the
     * metadata write config, otherwise an empty option. When executor metrics are enabled and
     * the reporter is not {@code INMEMORY}, a {@link DistributedRegistry} is obtained and
     * registered with the Spark context.
     */
    protected void initRegistry() {
        if (!this.metadataWriteConfig.isMetricsOn()) {
            this.metrics = Option.empty();
            return;
        }
        boolean useDistributedRegistry = this.metadataWriteConfig.isExecutorMetricsEnabled()
                && this.metadataWriteConfig.getMetricsReporterType() != MetricsReporterType.INMEMORY;
        if (useDistributedRegistry) {
            Registry registry = Registry.getRegistry("HoodieMetadata", DistributedRegistry.class.getName());
            HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) this.engineContext;
            ((DistributedRegistry) registry).register(sparkEngineContext.getJavaSparkContext());
        } else {
            // The returned registry is not used here; the lookup itself is kept because the
            // original code performs it (presumably it creates the named registry - confirm).
            Registry.getRegistry("HoodieMetadata");
        }
        this.metrics = Option.of(new HoodieMetadataMetrics(this.metadataWriteConfig.getMetricsConfig(), this.dataMetaClient.getStorage()));
    }

    /**
     * Commits the given per-partition records via {@code commitInternal} with the
     * initializing flag off and no bulk-insert partitioner.
     *
     * @param instantTime         instant passed to {@code commitInternal}
     * @param partitionRecordsMap records keyed by metadata partition name
     */
    protected void commit(String instantTime, Map<String, HoodieData<HoodieRecord>> partitionRecordsMap) {
        this.commitInternal(instantTime, partitionRecordsMap, false, Option.empty());
    }

    /**
     * Unwraps engine-agnostic {@link HoodieData} into the underlying {@link JavaRDD}.
     *
     * @param records records to unwrap
     * @return the RDD obtained from {@link HoodieJavaRDD#getJavaRDD}
     */
    protected JavaRDD<HoodieRecord> convertHoodieDataToEngineSpecificData(HoodieData<HoodieRecord> records) {
        return HoodieJavaRDD.getJavaRDD(records);
    }

    /**
     * Commits records for a single partition via {@code commitInternal} with the initializing
     * flag on, using a {@link SparkHoodieMetadataBulkInsertPartitioner} built from
     * {@code fileGroupCount}.
     *
     * @param instantTime    instant passed to {@code commitInternal}
     * @param partitionName  metadata partition receiving the records
     * @param records        records to write
     * @param fileGroupCount argument for the bulk-insert partitioner
     */
    protected void bulkCommit(String instantTime, String partitionName, HoodieData<HoodieRecord> records, int fileGroupCount) {
        SparkHoodieMetadataBulkInsertPartitioner partitioner = new SparkHoodieMetadataBulkInsertPartitioner(fileGroupCount);
        this.commitInternal(instantTime, Collections.singletonMap(partitionName, records), true, Option.of(partitioner));
    }

    /**
     * Starts a commit at {@code instantTime} (action type derived for
     * {@code DELETE_PARTITION} on a {@code MERGE_ON_READ} table) and asks the write client to
     * delete the partition paths of the given metadata partition types.
     *
     * @param instantTime instant used for both the commit start and the delete call
     * @param partitions  metadata partition types whose partition paths are dropped
     */
    public void deletePartitions(String instantTime, List<MetadataPartitionType> partitions) {
        List<String> partitionsToDrop = partitions.stream()
                .map(MetadataPartitionType::getPartitionPath)
                .collect(Collectors.toList());
        LOG.info("Deleting Metadata Table partitions: {}", partitionsToDrop);
        SparkRDDWriteClient<?> writeClient = (SparkRDDWriteClient<?>) this.getWriteClient();
        String actionType = CommitUtils.getCommitActionType(WriteOperationType.DELETE_PARTITION, HoodieTableType.MERGE_ON_READ);
        writeClient.startCommitWithTime(instantTime, actionType);
        writeClient.deletePartitions(partitionsToDrop, instantTime);
    }

    /**
     * Builds expression-index records for the given files.
     *
     * <p>Each file is read as rows, three bookkeeping columns (partition, relative file path,
     * file size) are appended, the index expression replaces the first source field's column,
     * and the result is handed to the column-stats or bloom-filter record builder depending on
     * the index type.
     *
     * @param partitionFilePathAndSizeTriplet (partition, (file path, file size)) entries to index
     * @param indexDefinition                 index definition; only its first source field is indexed
     * @param metaClient                      meta client used for the base path and row reading
     * @param parallelism                     parallelism for distributing the file list
     * @param readerSchema                    Avro schema used to read rows and derive the Spark schema
     * @param storageConf                     not read by this implementation
     * @param instantTime                     instant passed to the bloom-filter record builder
     * @return the index records, or empty data when the definition has no source fields
     * @throws UnsupportedOperationException if the index type is neither {@code column_stats}
     *                                       nor {@code bloom_filters} (case-insensitive)
     */
    protected HoodieData<HoodieRecord> getExpressionIndexRecords(List<Pair<String, Pair<String, Long>>> partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition, HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, StorageConfiguration<?> storageConf, String instantTime) {
        HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) this.engineContext;
        if (indexDefinition.getSourceFields().isEmpty()) {
            return sparkEngineContext.emptyHoodieData();
        }
        String columnToIndex = indexDefinition.getSourceFields().get(0);
        SQLContext sqlContext = sparkEngineContext.getSqlContext();
        // Copied to a local so the lambda below does not capture this writer instance.
        HoodieWriteConfig writeConfig = this.dataWriteConfig;
        HoodieData<Row> rowData = sparkEngineContext.parallelize(partitionFilePathAndSizeTriplet, parallelism).flatMap(entry -> {
            String partition = entry.getKey();
            Pair<String, Long> filePathSizePair = entry.getValue();
            String filePath = filePathSizePair.getKey();
            long fileSize = filePathSizePair.getValue();
            StoragePath storagePath = new StoragePath(filePath);
            String relativeFilePath = FSUtils.getRelativePartitionPath(metaClient.getBasePath(), storagePath);
            // Base-file detection is done on the file name only (text after the last '/').
            boolean isBaseFile = FSUtils.isBaseFile(new StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1)));
            List<Row> rowsForFilePath = SparkMetadataWriterUtils.readRecordsAsRows(new StoragePath[]{storagePath}, sqlContext, metaClient, readerSchema, writeConfig, isBaseFile);
            List<Row> rowsWithIndexMetadata = SparkMetadataWriterUtils.getRowsWithExpressionIndexMetadata(rowsForFilePath, partition, relativeFilePath, fileSize);
            return rowsWithIndexMetadata.iterator();
        });
        StructType structType = expressionIndexSchema(readerSchema);
        Dataset<Row> rowDataset = sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(), structType);
        HoodieSparkExpressionIndex expressionIndex = new HoodieSparkExpressionIndex(indexDefinition.getIndexName(), indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(), indexDefinition.getIndexOptions());
        Column indexedColumn = (Column) expressionIndex.apply(Collections.singletonList(rowDataset.col(columnToIndex)));
        // Overwrite the source column in place with the expression's output.
        rowDataset = rowDataset.withColumn(columnToIndex, indexedColumn);
        String indexType = indexDefinition.getIndexType();
        if (indexType.equalsIgnoreCase("column_stats")) {
            return SparkMetadataWriterUtils.getExpressionIndexRecordsUsingColumnStats(rowDataset, expressionIndex, columnToIndex);
        }
        if (indexType.equalsIgnoreCase("bloom_filters")) {
            return SparkMetadataWriterUtils.getExpressionIndexRecordsUsingBloomFilter(rowDataset, columnToIndex, this.metadataWriteConfig, instantTime, indexDefinition.getIndexName());
        }
        throw new UnsupportedOperationException(indexType + " is not yet supported");
    }

    /** Spark schema for {@code readerSchema} plus the three non-nullable expression-index bookkeeping columns. */
    private static StructType expressionIndexSchema(Schema readerSchema) {
        return AvroConversionUtils.convertAvroSchemaToStructType(readerSchema)
                .add(requiredField("_hoodie_expression_index_partition", DataTypes.StringType))
                .add(requiredField("_hoodie_expression_index_relative_file_path", DataTypes.StringType))
                .add(requiredField("_hoodie_expression_index_file_size", DataTypes.LongType));
    }

    /** Builds a non-nullable {@link StructField} with empty metadata. */
    private static StructField requiredField(String name, DataType type) {
        return StructField.apply(name, type, false, Metadata.empty());
    }

    /**
     * Creates the Spark table for the given config and meta client using this writer's
     * engine context.
     *
     * @param writeConfig write config for the table
     * @param metaClient  meta client for the table
     * @return the table created by {@link HoodieSparkTable#create}
     */
    protected HoodieTable getTable(HoodieWriteConfig writeConfig, HoodieTableMetaClient metaClient) {
        return HoodieSparkTable.create(writeConfig, this.engineContext, metaClient);
    }

    /**
     * Builds a {@link SparkRDDWriteClient} over the metadata write config with no embedded
     * timeline service.
     *
     * @return a new write client
     */
    public BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?, ?> initializeWriteClient() {
        return new SparkRDDWriteClient(this.engineContext, this.metadataWriteConfig, Option.<EmbeddedTimelineService>empty());
    }

    /**
     * @return {@link EngineType#SPARK}
     */
    protected EngineType getEngineType() {
        return EngineType.SPARK;
    }

    /**
     * Creates one secondary-index record per (record key, secondary key) entry, passing
     * {@code true} as the last argument of
     * {@code HoodieMetadataPayload.createSecondaryIndexRecord}, and wraps them in a
     * single-partition RDD.
     *
     * @param engineContext           engine context; must be a {@link HoodieSparkEngineContext}
     * @param recordKeySecondaryKeyMap record key to secondary key mapping
     * @param indexDefinition         definition supplying the index name
     * @return the created records, or empty data when the map is empty
     */
    public HoodieData<HoodieRecord> getDeletedSecondaryRecordMapping(HoodieEngineContext engineContext, Map<String, String> recordKeySecondaryKeyMap, HoodieIndexDefinition indexDefinition) {
        HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) engineContext;
        if (recordKeySecondaryKeyMap.isEmpty()) {
            return sparkEngineContext.emptyHoodieData();
        }
        String indexName = indexDefinition.getIndexName();
        List<HoodieRecord> deletedRecords = new ArrayList<>(recordKeySecondaryKeyMap.size());
        for (Map.Entry<String, String> keyPair : recordKeySecondaryKeyMap.entrySet()) {
            deletedRecords.add(HoodieMetadataPayload.createSecondaryIndexRecord(keyPair.getKey(), keyPair.getValue(), indexName, true));
        }
        return HoodieJavaRDD.of(deletedRecords, sparkEngineContext, 1);
    }
}

