/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.data.partitioner;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public class ConditionalRangePartitioner<S extends Comparable<S>, V extends Comparable<V>>
extends Partitioner {
    private static final Logger LOG = LoggerFactory.getLogger(ConditionalRangePartitioner.class);
    private final Map<S, List<V>> splitPoints;
    private final Map<S, Integer> startIndex;
    private final int totalPartitions;

    public ConditionalRangePartitioner(Map<S, List<V>> splitPoints) {
        this.splitPoints = splitPoints;
        this.startIndex = new HashMap<S, Integer>();
        int idx = 0;
        List sortedKeys = splitPoints.keySet().stream().sorted().collect(Collectors.toList());
        for (Comparable sortedKey : sortedKeys) {
            this.startIndex.put(sortedKey, idx);
            idx += splitPoints.get(sortedKey).size() + 1;
        }
        this.totalPartitions = idx;
        LOG.info("Total num of partitions to be enforced is {}", (Object)this.totalPartitions);
    }

    public int numPartitions() {
        return this.totalPartitions;
    }

    public int getPartition(Object keyObject) {
        Tuple2 compositeKey = (Tuple2)keyObject;
        Comparable key = (Comparable)compositeKey._1();
        Comparable value = (Comparable)compositeKey._2();
        ValidationUtils.checkArgument((boolean)this.startIndex.containsKey(key), (String)("ConditionalRangePartitioner does not expect key " + key));
        List splits = this.splitPoints.getOrDefault(key, Collections.emptyList());
        int bucket = Collections.binarySearch(splits, value);
        if (bucket < 0) {
            bucket = -bucket - 1;
        }
        return this.startIndex.get(key) + bucket;
    }

    public static <S extends Comparable<S>, V extends Comparable<V>> Map<S, List<V>> computeSplitPointMapDistributed(JavaPairRDD<S, V> sampled, double sampleFraction, int maxKeyPerBucket) {
        return sampled.groupByKey().mapValues((Function & Serializable)values -> {
            ArrayList sortedValues = new ArrayList();
            values.forEach(sortedValues::add);
            Collections.sort(sortedValues);
            int estimatedTotal = (int)Math.ceil((double)sortedValues.size() / sampleFraction);
            int splitCount = (int)Math.ceil((double)estimatedTotal / (double)maxKeyPerBucket) - 1;
            if (splitCount > 0) {
                return ConditionalRangePartitioner.computeSplitPoints(sortedValues, Math.min(splitCount, sortedValues.size()));
            }
            return Collections.emptyList();
        }).collectAsMap();
    }

    public static <V extends Comparable<V>> List<V> computeSplitPoints(List<V> sortedValues, int numSplits) {
        if (sortedValues.size() < numSplits) {
            return sortedValues;
        }
        ArrayList<V> splits = new ArrayList<V>();
        int n = sortedValues.size();
        for (double i = 1.0; i <= (double)numSplits; i += 1.0) {
            int index = (int)Math.ceil(i * (double)n / (double)(numSplits + 1)) - 1;
            splits.add(sortedValues.get(index));
        }
        return splits;
    }

    public static <S extends Comparable<S>, V extends Comparable<V>> JavaPairRDD<Tuple2<S, V>, V> repartitionWithSplits(JavaPairRDD<S, V> baseRdd, ConditionalRangePartitioner<S, V> partitioner) {
        JavaPairRDD compositeKeyRdd = baseRdd.mapToPair((PairFunction & Serializable)t -> new Tuple2(t, t._2()));
        return compositeKeyRdd.repartitionAndSortWithinPartitions(partitioner, new CompositeKeyComparator());
    }

    public static class CompositeKeyComparator<S extends Comparable<S>, V extends Comparable<V>>
    implements Comparator<Tuple2<S, V>>,
    Serializable {
        @Override
        public int compare(Tuple2<S, V> o1, Tuple2<S, V> o2) {
            int cmp = ((Comparable)o1._1()).compareTo(o2._1());
            if (cmp != 0) {
                return cmp;
            }
            return ((Comparable)o1._2()).compareTo(o2._2());
        }
    }
}

