/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.clustering.run.strategy;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.hudi.HoodieDatasetBulkInsertHelper;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ConsistentHashingNode;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.execution.bulkinsert.ConsistentBucketIndexBulkInsertPartitionerWithRows;
import org.apache.hudi.execution.bulkinsert.RDDConsistentBucketBulkInsertPartitioner;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.SparkBulkInsertHelper;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Clustering execution strategy for tables that use the consistent-hashing bucket index.
 *
 * <p>Each clustering group carries two entries in its extra metadata:
 * <ul>
 *   <li>the partition path of the group, under {@value #METADATA_PARTITION_KEY};</li>
 *   <li>the JSON-serialized child hashing nodes, under {@value #METADATA_CHILD_NODE_KEY}.</li>
 * </ul>
 * The parsed nodes are handed, keyed by partition path, to a consistent-bucket bulk-insert partitioner
 * (row-based or RDD-based). The partitioned records are then bulk-inserted under the clustering instant.
 */
public class SparkConsistentBucketClusteringExecutionStrategy<T extends HoodieRecordPayload<T>>
extends MultipleSparkJobExecutionStrategy<T> {
    private static final Logger LOG = LoggerFactory.getLogger(SparkConsistentBucketClusteringExecutionStrategy.class);

    /** Extra-metadata key holding the partition path of a clustering group. */
    private static final String METADATA_PARTITION_KEY = "clustering.group.partition";

    /** Extra-metadata key holding the JSON-serialized child hashing nodes of a clustering group. */
    private static final String METADATA_CHILD_NODE_KEY = "clustering.group.child.node";

    public SparkConsistentBucketClusteringExecutionStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
        super(table, engineContext, writeConfig);
    }

    /**
     * Clusters one group through the row-based ({@code Dataset<Row>}) write path.
     *
     * <p>The input rows are repartitioned with a {@link ConsistentBucketIndexBulkInsertPartitionerWithRows}.
     * That partitioner is built from the group's child hashing nodes. The result is then written with
     * {@link HoodieDatasetBulkInsertHelper#bulkInsert} using {@link WriteOperationType#CLUSTER}.
     *
     * @param inputRecords rows of the clustering group
     * @param numOutputGroups number of partitions passed to the repartitioning step
     * @param instantTime clustering instant the records are written under
     * @param strategyParams strategy parameters forwarded to the partitioner
     * @param schema unused by this implementation
     * @param fileGroupIdList unused by this implementation
     * @param shouldPreserveHoodieMetadata whether Hudi meta columns are preserved; forwarded to the
     *                                     partitioner and the writer
     * @param extraMetadata group metadata carrying the partition path and the child hashing nodes
     * @return write statuses produced by the bulk insert
     * @throws HoodieClusteringException if the child hashing nodes cannot be extracted from {@code extraMetadata}
     */
    @Override
    public HoodieData<WriteStatus> performClusteringWithRecordsAsRow(Dataset<Row> inputRecords, int numOutputGroups, String instantTime,
            Map<String, String> strategyParams, Schema schema, List<HoodieFileGroupId> fileGroupIdList,
            boolean shouldPreserveHoodieMetadata, Map<String, String> extraMetadata) {
        LOG.info("Starting clustering for a group, parallelism:{} commit:{}", numOutputGroups, instantTime);
        TypedProperties props = getWriteConfig().getProps();
        HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
        Pair<String, List<ConsistentHashingNode>> childNodesPair = extractChildNodes(extraMetadata);
        ConsistentBucketIndexBulkInsertPartitionerWithRows partitioner = new ConsistentBucketIndexBulkInsertPartitionerWithRows(
                getHoodieTable(), strategyParams, shouldPreserveHoodieMetadata,
                Collections.singletonMap(childNodesPair.getKey(), childNodesPair.getValue()));
        Dataset<Row> repartitionedRecords = partitioner.repartitionRecords(inputRecords, numOutputGroups);
        return HoodieDatasetBulkInsertHelper.bulkInsert(repartitionedRecords, instantTime, getHoodieTable(), newConfig,
                partitioner.arePartitionRecordsSorted(), shouldPreserveHoodieMetadata, WriteOperationType.CLUSTER);
    }

    /**
     * Clusters one group through the RDD-based ({@code HoodieData<HoodieRecord>}) write path.
     *
     * <p>The input records are bulk-inserted through {@link SparkBulkInsertHelper}. The bulk insert uses an
     * {@link RDDConsistentBucketBulkInsertPartitioner} built from the group's child hashing nodes. It also uses
     * a write config derived from the strategy's config with auto-commit disabled.
     *
     * @param inputRecords records of the clustering group
     * @param numOutputGroups parallelism passed to the bulk insert
     * @param instantTime clustering instant the records are written under
     * @param strategyParams strategy parameters forwarded to the partitioner
     * @param schema unused by this implementation
     * @param fileGroupIdList unused by this implementation
     * @param preserveHoodieMetadata whether Hudi meta fields are preserved; forwarded to the partitioner
     * @param extraMetadata group metadata carrying the partition path and the child hashing nodes
     * @return write statuses produced by the bulk insert
     * @throws HoodieClusteringException if the child hashing nodes cannot be extracted from {@code extraMetadata}
     */
    @Override
    public HoodieData<WriteStatus> performClusteringWithRecordsRDD(HoodieData<HoodieRecord<T>> inputRecords, int numOutputGroups, String instantTime,
            Map<String, String> strategyParams, Schema schema, List<HoodieFileGroupId> fileGroupIdList,
            boolean preserveHoodieMetadata, Map<String, String> extraMetadata) {
        LOG.info("Starting clustering for a group, parallelism:{} commit:{}", numOutputGroups, instantTime);
        TypedProperties props = getWriteConfig().getProps();
        // NOTE(review): if getProps() returns the write config's live properties (not visible here), this
        // put also disables auto-commit on the config shared with the caller, not just on newConfig.
        // Kept as-is because callers may rely on it; confirm before switching to a copied Properties.
        props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), Boolean.FALSE.toString());
        HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
        Pair<String, List<ConsistentHashingNode>> childNodesPair = extractChildNodes(extraMetadata);
        RDDConsistentBucketBulkInsertPartitioner partitioner = new RDDConsistentBucketBulkInsertPartitioner(
                getHoodieTable(), strategyParams, preserveHoodieMetadata,
                Collections.singletonMap(childNodesPair.getKey(), childNodesPair.getValue()));
        return SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
                false, (BulkInsertPartitioner) partitioner, true, numOutputGroups);
    }

    /**
     * Reads the partition path and the child hashing nodes of a clustering group from its extra metadata.
     *
     * <p>Both keys are validated up front. A missing partition path would otherwise be turned into a
     * {@code null} map key by the callers, and clustering would proceed silently instead of failing.
     *
     * @param extraMetadata per-group metadata; must contain {@value #METADATA_PARTITION_KEY} and
     *                      {@value #METADATA_CHILD_NODE_KEY}
     * @return pair of (partition path, child hashing nodes parsed from the JSON value)
     * @throws HoodieClusteringException if the metadata is null, lacks either key, or the node JSON cannot be parsed
     */
    private Pair<String, List<ConsistentHashingNode>> extractChildNodes(Map<String, String> extraMetadata) {
        if (extraMetadata == null) {
            throw new HoodieClusteringException("Failed to extract hashing children nodes: clustering group has no extra metadata");
        }
        String partition = extraMetadata.get(METADATA_PARTITION_KEY);
        if (partition == null) {
            throw new HoodieClusteringException("Failed to extract hashing children nodes: missing metadata key " + METADATA_PARTITION_KEY);
        }
        String childNodesJson = extraMetadata.get(METADATA_CHILD_NODE_KEY);
        if (childNodesJson == null) {
            throw new HoodieClusteringException("Failed to extract hashing children nodes: missing metadata key " + METADATA_CHILD_NODE_KEY);
        }
        try {
            List<ConsistentHashingNode> nodes = ConsistentHashingNode.fromJsonString(childNodesJson);
            return Pair.of(partition, nodes);
        } catch (Exception e) {
            // Broad catch kept: the parser's declared exception types are not visible here.
            LOG.error("Failed to extract hashing children nodes", e);
            throw new HoodieClusteringException("Failed to extract hashing children nodes", e);
        }
    }
}

