/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.execution.bulkinsert;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.ConsistentHashingNode;
import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.FlatLists;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.execution.bulkinsert.RDDBucketIndexPartitioner;
import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
import org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex;
import org.apache.hudi.io.AppendHandleFactory;
import org.apache.hudi.io.SingleFileHandleCreateFactory;
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

/**
 * Bulk-insert partitioner for tables using the consistent hashing bucket index.
 *
 * <p>Every consistent-hashing node of every data partition present in the input is assigned
 * its own Spark partition index; records are routed to the index of the node their key
 * hashes to. Optionally the records are sorted within each Spark partition, either by
 * user-supplied sort columns or by record key.
 *
 * <p>{@link #getFileIdPfx(int)} and {@link #getWriteHandleFactory(int)} read state that is
 * only populated by {@link #repartitionRecords(JavaRDD, int)}, so they must be called after it.
 *
 * @param <T> payload type of the records being partitioned
 */
public class RDDConsistentBucketPartitioner<T>
extends RDDBucketIndexPartitioner<T> {
    private static final Logger LOG = LogManager.getLogger(RDDConsistentBucketPartitioner.class);
    private final HoodieTable table;
    private final List<String> indexKeyFields;
    private final Map<String, List<ConsistentHashingNode>> hashingChildrenNodes;
    /** Columns to sort by within each Spark partition; {@code null} when none are configured. */
    private final String[] sortColumnNames;
    private final boolean preserveHoodieMetadata;
    private final boolean consistentLogicalTimestampEnabled;
    /** Per Spark partition index: {@code true} when the bucket must be written through an append handle. */
    private List<Boolean> doAppend;
    /** Per Spark partition index: the file id prefix of the bucket written by that partition. */
    private List<String> fileIdPfxList;

    /**
     * Creates a partitioner configured from the given strategy parameters.
     *
     * @param table                  table being written to; supplies the write config and the index
     * @param strategyParams         strategy parameters; the value stored under
     *                               {@code HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS}, if any, is read
     *                               as a comma-separated list of sort columns
     * @param preserveHoodieMetadata flag handed to the create-handle factory built in
     *                               {@link #getWriteHandleFactory(int)}
     */
    public RDDConsistentBucketPartitioner(HoodieTable table, Map<String, String> strategyParams, boolean preserveHoodieMetadata) {
        this.table = table;
        this.indexKeyFields = Arrays.asList(table.getConfig().getBucketIndexHashField().split(","));
        this.hashingChildrenNodes = new HashMap<>();
        this.consistentLogicalTimestampEnabled = table.getConfig().isConsistentLogicalTimestampEnabled();
        this.preserveHoodieMetadata = preserveHoodieMetadata;
        this.sortColumnNames = parseSortColumns(strategyParams);
    }

    /**
     * Creates a partitioner without sort columns and without metadata preservation.
     *
     * @param table table being written to
     * @throws IllegalArgumentException if the table's index is not a
     *                                  {@link HoodieSparkConsistentBucketIndex} or the table is not MERGE_ON_READ
     */
    public RDDConsistentBucketPartitioner(HoodieTable table) {
        this(table, Collections.emptyMap(), false);
        ValidationUtils.checkArgument(table.getIndex() instanceof HoodieSparkConsistentBucketIndex, "RDDConsistentBucketPartitioner can only be used together with consistent hashing bucket index");
        ValidationUtils.checkArgument(table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ), "CoW table with bucket index doesn't support bulk_insert");
    }

    /**
     * Extracts the sort columns from the strategy parameters.
     *
     * <p>A missing, {@code null} or blank value means "no sort columns". Splitting an empty
     * string would otherwise produce a single empty column name and wrongly enable the
     * custom-column sort.
     *
     * @return the configured column names, or {@code null} when none are configured
     */
    private static String[] parseSortColumns(Map<String, String> strategyParams) {
        String sortColumns = strategyParams.get(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key());
        if (sortColumns == null || sortColumns.trim().isEmpty()) {
            return null;
        }
        return sortColumns.split(",");
    }

    /**
     * Repartitions the records so that each Spark partition holds the records of exactly one
     * consistent-hashing bucket, sorting within partitions when sorting is configured.
     *
     * @param records               records to repartition
     * @param outputSparkPartitions not used; the number of output partitions equals the number of buckets
     * @return the repartitioned records
     */
    @Override
    public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
        final Map<String, ConsistentBucketIdentifier> partitionToIdentifier = this.initializeBucketIdentifier(records);
        final Map<String, Map<String, Integer>> partitionToFileIdPfxIdxMap = this.generateFileIdPfx(partitionToIdentifier);
        Partitioner bucketPartitioner = new Partitioner() {

            @Override
            public int numPartitions() {
                return RDDConsistentBucketPartitioner.this.fileIdPfxList.size();
            }

            @Override
            public int getPartition(Object key) {
                HoodieKey hoodieKey = (HoodieKey) key;
                String partition = hoodieKey.getPartitionPath();
                ConsistentHashingNode node = partitionToIdentifier.get(partition).getBucket(hoodieKey, RDDConsistentBucketPartitioner.this.indexKeyFields);
                return partitionToFileIdPfxIdxMap.get(partition).get(node.getFileIdPrefix());
            }
        };
        return this.doPartition(records, bucketPartitioner);
    }

    /**
     * @return {@code true} when {@link #repartitionRecords(JavaRDD, int)} sorts the records
     *         within each Spark partition
     */
    @Override
    public boolean arePartitionRecordsSorted() {
        return this.hasCustomSortColumns() || this.requiresRecordKeySort();
    }

    /**
     * Returns the write handle factory for the given Spark partition: an append handle factory
     * when the bucket is flagged for append, otherwise a single-file create handle factory
     * for a new file id derived from the bucket's file id prefix.
     *
     * @param idx Spark partition index
     */
    @Override
    public Option<WriteHandleFactory> getWriteHandleFactory(int idx) {
        if (this.doAppend.get(idx)) {
            return Option.of(new AppendHandleFactory());
        }
        return Option.of(new SingleFileHandleCreateFactory(FSUtils.createNewFileId(this.getFileIdPfx(idx), 0), this.preserveHoodieMetadata));
    }

    /**
     * @param partitionId Spark partition index
     * @return the file id prefix of the bucket assigned to that Spark partition
     */
    @Override
    public String getFileIdPfx(int partitionId) {
        return this.fileIdPfxList.get(partitionId);
    }

    /**
     * Registers children nodes for a data partition. They are set on the partition's hashing
     * metadata when its bucket identifier is built.
     *
     * @param partition data partition path
     * @param nodes     children nodes; none may be tagged {@code NORMAL}
     * @throws IllegalStateException if any node is tagged {@code NORMAL}
     */
    public void addHashingChildrenNodes(String partition, List<ConsistentHashingNode> nodes) {
        ValidationUtils.checkState(nodes.stream().noneMatch(n -> n.getTag() == ConsistentHashingNode.NodeTag.NORMAL), "children nodes should not be tagged as NORMAL");
        this.hashingChildrenNodes.put(partition, nodes);
    }

    /** @return {@code true} when user-supplied sort columns are configured. */
    private boolean hasCustomSortColumns() {
        return this.sortColumnNames != null && this.sortColumnNames.length > 0;
    }

    /** @return {@code true} when the table or the bulk-insert sort mode asks for sorted records. */
    private boolean requiresRecordKeySort() {
        return this.table.requireSortedRecords() || this.table.getConfig().getBulkInsertSortMode() != BulkInsertSortMode.NONE;
    }

    /**
     * Builds the bucket identifier of a data partition from its loaded (or newly created)
     * hashing metadata, applying any children nodes registered for that partition.
     */
    private ConsistentBucketIdentifier getBucketIdentifier(String partition) {
        HoodieSparkConsistentBucketIndex index = (HoodieSparkConsistentBucketIndex) this.table.getIndex();
        HoodieConsistentHashingMetadata metadata = index.loadOrCreateMetadata(this.table, partition);
        List<ConsistentHashingNode> childrenNodes = this.hashingChildrenNodes.get(partition);
        if (childrenNodes != null || this.hashingChildrenNodes.containsKey(partition)) {
            metadata.setChildrenNodes(childrenNodes);
        }
        return new ConsistentBucketIdentifier(metadata);
    }

    /**
     * Collects the distinct data partitions present in the records and builds a bucket
     * identifier for each of them.
     */
    private Map<String, ConsistentBucketIdentifier> initializeBucketIdentifier(JavaRDD<HoodieRecord<T>> records) {
        List<String> partitionPaths = records.map(HoodieRecord::getPartitionPath).distinct().collect();
        return partitionPaths.stream().collect(Collectors.toMap(partition -> partition, this::getBucketIdentifier));
    }

    /**
     * Assigns a Spark partition index to every bucket of every data partition and (re)builds
     * {@link #fileIdPfxList} and {@link #doAppend}, both indexed by that Spark partition index.
     *
     * @return data partition path -> (file id prefix -> Spark partition index)
     * @throws IllegalStateException if the number of assigned indexes differs from the total
     *                               number of buckets reported by the identifiers
     */
    private Map<String, Map<String, Integer>> generateFileIdPfx(Map<String, ConsistentBucketIdentifier> partitionToIdentifier) {
        Map<String, Map<String, Integer>> partitionToFileIdPfxIdxMap = new HashMap<>(partitionToIdentifier.size() * 2);
        this.doAppend = new ArrayList<>();
        this.fileIdPfxList = new ArrayList<>();
        int nextIdx = 0;
        for (ConsistentBucketIdentifier identifier : partitionToIdentifier.values()) {
            // Buckets of freshly created metadata are never appended to; otherwise only
            // nodes tagged NORMAL are.
            boolean firstCreated = identifier.getMetadata().isFirstCreated();
            Map<String, Integer> fileIdPfxToIdx = new HashMap<>();
            // Single pass over the nodes so the index map, the prefix list and the append
            // flags are aligned by construction.
            for (ConsistentHashingNode node : identifier.getNodes()) {
                String fileIdPfx = node.getFileIdPrefix();
                fileIdPfxToIdx.put(fileIdPfx, nextIdx++);
                this.fileIdPfxList.add(fileIdPfx);
                this.doAppend.add(!firstCreated && node.getTag() == ConsistentHashingNode.NodeTag.NORMAL);
            }
            partitionToFileIdPfxIdxMap.put(identifier.getMetadata().getPartitionPath(), fileIdPfxToIdx);
        }
        int expectedBuckets = partitionToIdentifier.values().stream().mapToInt(ConsistentBucketIdentifier::getNumBuckets).sum();
        ValidationUtils.checkState(this.fileIdPfxList.size() == expectedBuckets, "Error state after constructing fileId & idx mapping");
        return partitionToFileIdPfxIdxMap;
    }

    /**
     * Applies the partitioner, choosing between custom-column sort, record-key sort and plain
     * repartitioning (in that order of precedence).
     */
    private JavaRDD<HoodieRecord<T>> doPartition(JavaRDD<HoodieRecord<T>> records, Partitioner partitioner) {
        if (this.hasCustomSortColumns()) {
            return this.doPartitionAndCustomColumnSort(records, partitioner);
        }
        if (this.requiresRecordKeySort()) {
            return this.doPartitionAndSortByRecordKey(records, partitioner);
        }
        return records.mapToPair(record -> new Tuple2<>(record.getKey(), record))
            .partitionBy(partitioner)
            .map(Tuple2::_2);
    }

    /**
     * Repartitions by bucket and sorts the records of each Spark partition by the values of
     * the configured sort columns.
     */
    private JavaRDD<HoodieRecord<T>> doPartitionAndCustomColumnSort(JavaRDD<HoodieRecord<T>> records, final Partitioner partitioner) {
        // Copied to locals so the comparator lambda captures these values rather than `this`.
        final String[] sortColumns = this.sortColumnNames;
        final boolean consistentLogicalTimestamp = this.consistentLogicalTimestampEnabled;
        final SerializableSchema schema = new SerializableSchema(HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(this.table.getConfig().getSchema())));
        Comparator<HoodieRecord<T>> comparator = (Comparator<HoodieRecord<T>> & Serializable) (t1, t2) -> {
            FlatLists.ComparableList obj1 = FlatLists.ofComparableArray(t1.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestamp));
            FlatLists.ComparableList obj2 = FlatLists.ofComparableArray(t2.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestamp));
            return obj1.compareTo(obj2);
        };
        // The pair key is the whole record (the comparator needs its column values), so the
        // bucket partitioner is wrapped to extract the HoodieKey it expects.
        Partitioner recordKeyedPartitioner = new Partitioner() {

            @Override
            public int numPartitions() {
                return partitioner.numPartitions();
            }

            @Override
            public int getPartition(Object key) {
                return partitioner.getPartition(((HoodieRecord) key).getKey());
            }
        };
        return records.mapToPair(record -> new Tuple2<>(record, record))
            .repartitionAndSortWithinPartitions(recordKeyedPartitioner, comparator)
            .map(Tuple2::_2);
    }

    /**
     * Repartitions by bucket and sorts the records of each Spark partition by record key.
     * Logs a warning when GLOBAL_SORT is configured, since the sort is only per partition.
     */
    private JavaRDD<HoodieRecord<T>> doPartitionAndSortByRecordKey(JavaRDD<HoodieRecord<T>> records, Partitioner partitioner) {
        if (this.table.getConfig().getBulkInsertSortMode() == BulkInsertSortMode.GLOBAL_SORT) {
            LOG.warn("Consistent bucket does not support global sort mode, the sort will only be done within each data partition");
        }
        Comparator<HoodieKey> comparator = (Comparator<HoodieKey> & Serializable) (k1, k2) -> k1.getRecordKey().compareTo(k2.getRecordKey());
        return records.mapToPair(record -> new Tuple2<>(record.getKey(), record))
            .repartitionAndSortWithinPartitions(partitioner, comparator)
            .map(Tuple2::_2);
    }
}

