/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.cluster;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.ConcatenatingIterator;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.log.HoodieFileSliceReader;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

/**
 * Executes the clustering plan stored in the requested {@code replacecommit} instant
 * {@code instantTime} on Spark.
 *
 * <p>Each {@link HoodieClusteringGroup} of the plan is read (base files, plus log files when
 * present) and handed to the {@link ClusteringExecutionStrategy} named in the write config.
 * The per-group write statuses are unioned into one RDD, which is then passed through
 * {@code updateIndex} and {@code commitOnAutoCommit}. Every file group referenced by the plan
 * is reported as replaced.
 *
 * @param <T> record payload type of the table
 */
public class SparkExecuteClusteringCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseSparkCommitActionExecutor<T> {
    private static final Logger LOG = LogManager.getLogger(SparkExecuteClusteringCommitActionExecutor.class);
    private final HoodieClusteringPlan clusteringPlan;

    /**
     * Loads the clustering plan for {@code instantTime} from the table's timeline.
     *
     * @throws HoodieClusteringException if no clustering plan can be read for the instant
     */
    public SparkExecuteClusteringCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table, String instantTime) {
        super(context, config, table, instantTime, WriteOperationType.CLUSTER);
        this.clusteringPlan = ClusteringUtils.getClusteringPlan(table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(instantTime))
                .map(Pair::getRight)
                .orElseThrow(() -> new HoodieClusteringException("Unable to read clustering plan for instant: " + instantTime));
    }

    /**
     * Runs the clustering plan and returns the resulting write metadata.
     *
     * <p>The requested instant is first transitioned to inflight. All clustering groups are then
     * submitted, and only afterwards joined. The combined write statuses go through
     * {@code updateIndex}, are validated to be non-empty, and are passed to
     * {@code commitOnAutoCommit}. If no commit metadata was attached by that point, it is built
     * here so callers always receive it.
     *
     * @throws HoodieClusteringException if the plan produced no write statuses
     */
    @Override
    public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
        HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(this.instantTime);
        this.table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
        this.table.getMetaClient().reloadActiveTimeline();

        JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(this.context);
        // Submit every group BEFORE joining any of them. Streams are lazy and process one element
        // at a time, so map(runAsync).map(join) in a single pipeline would block on each group
        // before the next one is even started, serializing all groups.
        List<CompletableFuture<JavaRDD<WriteStatus>>> groupFutures = this.clusteringPlan.getInputGroups().stream()
                .map(inputGroup -> this.runClusteringForGroupAsync(inputGroup, this.clusteringPlan.getStrategy().getStrategyParams()))
                .collect(Collectors.toList());
        JavaRDD<WriteStatus> writeStatusRDD = groupFutures.stream()
                .map(CompletableFuture::join)
                .reduce((rdd1, rdd2) -> rdd1.union(rdd2))
                .orElse(engineContext.emptyRDD());

        HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata = this.buildWriteMetadata(writeStatusRDD);
        JavaRDD<WriteStatus> statuses = this.updateIndex(writeStatusRDD, writeMetadata);
        writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collect());
        this.validateWriteResult(writeMetadata);
        this.commitOnAutoCommit(writeMetadata);
        if (!writeMetadata.getCommitMetadata().isPresent()) {
            HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeStatusRDD.map(WriteStatus::getStat).collect(), writeMetadata.getPartitionToReplaceFileIds(), this.extraMetadata, this.operationType, this.getSchemaToStoreInCommit(), this.getCommitActionType());
            writeMetadata.setCommitMetadata(Option.of(commitMetadata));
        }
        return writeMetadata;
    }

    /**
     * Fails when clustering produced no write statuses at all.
     *
     * @throws HoodieClusteringException if {@code writeMetadata.getWriteStatuses()} is empty
     */
    private void validateWriteResult(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
        if (writeMetadata.getWriteStatuses().isEmpty()) {
            throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + this.instantTime + " #groups: " + this.clusteringPlan.getInputGroups().size() + " expected at least " + this.clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum() + " write statuses");
        }
    }

    /**
     * Asynchronously reads the records of one clustering group and rewrites them with the
     * configured {@link ClusteringExecutionStrategy}.
     *
     * <p>The task runs on {@link CompletableFuture#supplyAsync(java.util.function.Supplier)}'s
     * default executor.
     */
    // The strategy class is loaded reflectively, so its type arguments are unknown here; the raw
    // cast mirrors what the strategy returns for Spark (a JavaRDD of WriteStatus).
    @SuppressWarnings({"unchecked", "rawtypes"})
    private CompletableFuture<JavaRDD<WriteStatus>> runClusteringForGroupAsync(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams) {
        return CompletableFuture.supplyAsync(() -> {
            JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(this.context);
            JavaRDD<HoodieRecord<? extends HoodieRecordPayload>> inputRecords = this.readRecordsForGroup(jsc, clusteringGroup);
            Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(this.config.getSchema()));
            ClusteringExecutionStrategy strategy = (ClusteringExecutionStrategy) ReflectionUtils.loadClass(this.config.getClusteringExecutionStrategyClass(), this.table, this.context, this.config);
            return (JavaRDD<WriteStatus>) strategy.performClustering(inputRecords, clusteringGroup.getNumOutputFileGroups(), this.instantTime, strategyParams, readerSchema);
        });
    }

    @Override
    protected String getCommitActionType() {
        return HoodieTimeline.REPLACE_COMMIT_ACTION;
    }

    /**
     * Maps each partition path to the ids of every file group referenced by the clustering plan.
     * The result comes from the plan alone; {@code writeStatuses} is not consulted.
     */
    @Override
    protected Map<String, List<String>> getPartitionToReplacedFileIds(JavaRDD<WriteStatus> writeStatuses) {
        return ClusteringUtils.getFileGroupsFromClusteringPlan(this.clusteringPlan)
                .collect(Collectors.groupingBy(fg -> fg.getPartitionPath(), Collectors.mapping(fg -> fg.getFileId(), Collectors.toList())));
    }

    /**
     * Reads all records of a clustering group, merging log files only if at least one slice has them.
     */
    private JavaRDD<HoodieRecord<? extends HoodieRecordPayload>> readRecordsForGroup(JavaSparkContext jsc, HoodieClusteringGroup clusteringGroup) {
        List<ClusteringOperation> clusteringOps = clusteringGroup.getSlices().stream().map(ClusteringOperation::create).collect(Collectors.toList());
        boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> op.getDeltaFilePaths().size() > 0);
        return hasLogFiles
                ? this.readRecordsForGroupWithLogs(jsc, clusteringOps)
                : this.readRecordsForGroupBaseFiles(jsc, clusteringOps);
    }

    /**
     * Reads file slices (base file merged with its log files) with one Spark partition per operation.
     *
     * @throws HoodieClusteringException (inside the Spark task) if a base file or log scanner cannot be opened
     */
    private JavaRDD<HoodieRecord<? extends HoodieRecordPayload>> readRecordsForGroupWithLogs(JavaSparkContext jsc, List<ClusteringOperation> clusteringOps) {
        return jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> {
            // Invariant for the whole partition: compute once rather than once per operation.
            long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new SparkTaskContextSupplier(), this.config.getProps());
            LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction);
            Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(this.config.getSchema()));

            List<Iterator<HoodieRecord<? extends HoodieRecordPayload>>> recordIterators = new ArrayList<>();
            clusteringOpsPartition.forEachRemaining(clusteringOp -> {
                try {
                    HoodieFileReader<IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(this.table.getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
                    HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(this.table.getMetaClient().getFs(), this.table.getMetaClient().getBasePath(), clusteringOp.getDeltaFilePaths(), readerSchema, this.instantTime, maxMemoryPerCompaction, this.config.getCompactionLazyBlockReadEnabled(), this.config.getCompactionReverseLogReadEnabled(), this.config.getMaxDFSStreamBufferSize(), this.config.getSpillableMapBasePath());
                    recordIterators.add(HoodieFileSliceReader.getFileSliceReader(baseFileReader, scanner, readerSchema, this.table.getMetaClient().getTableConfig().getPayloadClass()));
                }
                catch (IOException e) {
                    throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() + " and " + clusteringOp.getDeltaFilePaths(), e);
                }
            });
            return new ConcatenatingIterator<>(recordIterators);
        });
    }

    /**
     * Reads base files only, with one Spark partition per operation, and converts each Avro record
     * into a {@link HoodieRecord} via {@link #transform(IndexedRecord)}.
     *
     * @throws HoodieClusteringException (inside the Spark task) if a base file cannot be opened
     */
    private JavaRDD<HoodieRecord<? extends HoodieRecordPayload>> readRecordsForGroupBaseFiles(JavaSparkContext jsc, List<ClusteringOperation> clusteringOps) {
        return jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> {
            // Parse the reader schema once per partition instead of once per operation.
            Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(this.config.getSchema()));
            List<Iterator<IndexedRecord>> iteratorsForPartition = new ArrayList<>();
            clusteringOpsPartition.forEachRemaining(clusteringOp -> {
                try {
                    HoodieFileReader<IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(this.table.getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
                    iteratorsForPartition.add(baseFileReader.getRecordIterator(readerSchema));
                }
                catch (IOException e) {
                    throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() + " and " + clusteringOp.getDeltaFilePaths(), e);
                }
            });
            return new ConcatenatingIterator<>(iteratorsForPartition);
        }).map(this::transform);
    }

    /**
     * Builds a {@link HoodieRecord} from an Avro record that carries the Hudi meta columns
     * {@code _hoodie_record_key} and {@code _hoodie_partition_path}. The payload is instantiated
     * reflectively from the table's configured payload class.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private HoodieRecord<? extends HoodieRecordPayload> transform(IndexedRecord indexedRecord) {
        GenericRecord record = (GenericRecord) indexedRecord;
        String key = record.get("_hoodie_record_key").toString();
        String partition = record.get("_hoodie_partition_path").toString();
        HoodieKey hoodieKey = new HoodieKey(key, partition);
        HoodieRecordPayload avroPayload = ReflectionUtils.loadPayload(this.table.getMetaClient().getTableConfig().getPayloadClass(), new Object[]{Option.of(record)}, Option.class);
        return new HoodieRecord(hoodieKey, avroPayload);
    }

    /**
     * Creates uncommitted write metadata holding the write statuses and the replaced file ids;
     * commit metadata starts out empty.
     */
    private HoodieWriteMetadata<JavaRDD<WriteStatus>> buildWriteMetadata(JavaRDD<WriteStatus> writeStatusJavaRDD) {
        HoodieWriteMetadata<JavaRDD<WriteStatus>> result = new HoodieWriteMetadata<>();
        result.setPartitionToReplaceFileIds(this.getPartitionToReplacedFileIds(writeStatusJavaRDD));
        result.setWriteStatuses(writeStatusJavaRDD);
        result.setCommitMetadata(Option.empty());
        result.setCommitted(false);
        return result;
    }
}

