/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink.shuffle;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortKey;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.SortOrderComparators;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class MapRangePartitioner
implements Partitioner<RowData> {
    private static final Logger LOG = LoggerFactory.getLogger(MapRangePartitioner.class);
    private final RowDataWrapper rowDataWrapper;
    private final SortKey sortKey;
    private final Comparator<StructLike> comparator;
    private final Map<SortKey, Long> mapStatistics;
    private final double closeFileCostInWeightPercentage;
    private long newSortKeyCounter;
    private long lastNewSortKeyLogTimeMilli;
    private Map<SortKey, KeyAssignment> assignment;
    private NavigableMap<SortKey, Long> sortedStatsWithCloseFileCost;

    MapRangePartitioner(Schema schema, SortOrder sortOrder, Map<SortKey, Long> mapStatistics, double closeFileCostInWeightPercentage) {
        mapStatistics.forEach((key, value) -> Preconditions.checkArgument((value > 0L ? 1 : 0) != 0, (String)"Invalid statistics: weight is 0 for key %s", (Object)key));
        this.rowDataWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(schema), schema.asStruct());
        this.sortKey = new SortKey(schema, sortOrder);
        this.comparator = SortOrderComparators.forSchema((Schema)schema, (SortOrder)sortOrder);
        this.mapStatistics = mapStatistics;
        this.closeFileCostInWeightPercentage = closeFileCostInWeightPercentage;
        this.newSortKeyCounter = 0L;
        this.lastNewSortKeyLogTimeMilli = System.currentTimeMillis();
    }

    public int partition(RowData row, int numPartitions) {
        Map<SortKey, KeyAssignment> assignmentMap = this.assignment(numPartitions);
        this.sortKey.wrap((StructLike)this.rowDataWrapper.wrap(row));
        KeyAssignment keyAssignment = assignmentMap.get(this.sortKey);
        if (keyAssignment == null) {
            LOG.trace("Encountered new sort key: {}. Fall back to round robin as statistics not learned yet.", (Object)this.sortKey);
            ++this.newSortKeyCounter;
            long now = System.currentTimeMillis();
            if (now - this.lastNewSortKeyLogTimeMilli > TimeUnit.MINUTES.toMillis(1L)) {
                LOG.info("Encounter new sort keys {} times. Fall back to round robin as statistics not learned yet", (Object)this.newSortKeyCounter);
                this.lastNewSortKeyLogTimeMilli = now;
                this.newSortKeyCounter = 0L;
            }
            return (int)(this.newSortKeyCounter % (long)numPartitions);
        }
        return keyAssignment.select();
    }

    @VisibleForTesting
    Map<SortKey, KeyAssignment> assignment(int numPartitions) {
        if (this.assignment == null) {
            long totalWeight = this.mapStatistics.values().stream().mapToLong(l -> l).sum();
            double targetWeightPerSubtask = (double)totalWeight / (double)numPartitions;
            long closeFileCostInWeight = (long)Math.ceil(targetWeightPerSubtask * this.closeFileCostInWeightPercentage / 100.0);
            this.sortedStatsWithCloseFileCost = Maps.newTreeMap(this.comparator);
            this.mapStatistics.forEach((k, v) -> {
                int estimatedSplits = (int)Math.ceil((double)v.longValue() / targetWeightPerSubtask);
                long estimatedCloseFileCost = closeFileCostInWeight * (long)estimatedSplits;
                this.sortedStatsWithCloseFileCost.put((SortKey)k, v + estimatedCloseFileCost);
            });
            long totalWeightWithCloseFileCost = this.sortedStatsWithCloseFileCost.values().stream().mapToLong(l -> l).sum();
            long targetWeightPerSubtaskWithCloseFileCost = (long)Math.ceil((double)totalWeightWithCloseFileCost / (double)numPartitions);
            this.assignment = this.buildAssignment(numPartitions, this.sortedStatsWithCloseFileCost, targetWeightPerSubtaskWithCloseFileCost, closeFileCostInWeight);
        }
        return this.assignment;
    }

    @VisibleForTesting
    Map<SortKey, Long> mapStatistics() {
        return this.mapStatistics;
    }

    Map<Integer, Pair<Long, Integer>> assignmentInfo() {
        TreeMap assignmentInfo = Maps.newTreeMap();
        this.assignment.forEach((key, keyAssignment) -> {
            for (int i = 0; i < ((KeyAssignment)keyAssignment).assignedSubtasks.length; ++i) {
                int subtaskId = ((KeyAssignment)keyAssignment).assignedSubtasks[i];
                long subtaskWeight = ((KeyAssignment)keyAssignment).subtaskWeightsExcludingCloseCost[i];
                Pair oldValue = assignmentInfo.getOrDefault(subtaskId, Pair.of((Object)0L, (Object)0));
                assignmentInfo.put(subtaskId, Pair.of((Object)((Long)oldValue.first() + subtaskWeight), (Object)((Integer)oldValue.second() + 1)));
            }
        });
        return assignmentInfo;
    }

    private Map<SortKey, KeyAssignment> buildAssignment(int numPartitions, NavigableMap<SortKey, Long> sortedStatistics, long targetWeightPerSubtask, long closeFileCostInWeight) {
        HashMap assignmentMap = Maps.newHashMapWithExpectedSize((int)sortedStatistics.size());
        Iterator mapKeyIterator = sortedStatistics.keySet().iterator();
        int subtaskId = 0;
        SortKey currentKey = null;
        long keyRemainingWeight = 0L;
        long subtaskRemainingWeight = targetWeightPerSubtask;
        ArrayList assignedSubtasks = Lists.newArrayList();
        ArrayList subtaskWeights = Lists.newArrayList();
        while (mapKeyIterator.hasNext() || currentKey != null) {
            if (subtaskId >= numPartitions) {
                LOG.error("Internal algorithm error: exhausted subtasks with unassigned keys left. number of partitions: {}, target weight per subtask: {}, close file cost in weight: {}, data statistics: {}", new Object[]{numPartitions, targetWeightPerSubtask, closeFileCostInWeight, sortedStatistics});
                throw new IllegalStateException("Internal algorithm error: exhausted subtasks with unassigned keys left");
            }
            if (currentKey == null) {
                currentKey = (SortKey)mapKeyIterator.next();
                keyRemainingWeight = (Long)sortedStatistics.get(currentKey);
            }
            assignedSubtasks.add(subtaskId);
            if (keyRemainingWeight < subtaskRemainingWeight) {
                subtaskWeights.add(keyRemainingWeight);
                subtaskRemainingWeight -= keyRemainingWeight;
                keyRemainingWeight = 0L;
            } else {
                long assignedWeight = subtaskRemainingWeight;
                keyRemainingWeight -= subtaskRemainingWeight;
                if (assignedWeight <= closeFileCostInWeight) {
                    long paddingWeight = Math.min(keyRemainingWeight, closeFileCostInWeight);
                    keyRemainingWeight -= paddingWeight;
                    assignedWeight += paddingWeight;
                }
                subtaskWeights.add(assignedWeight);
                ++subtaskId;
                subtaskRemainingWeight = targetWeightPerSubtask;
            }
            Preconditions.checkState((assignedSubtasks.size() == subtaskWeights.size() ? 1 : 0) != 0, (String)"List size mismatch: assigned subtasks = %s, subtask weights = %s", (Object)assignedSubtasks, (Object)subtaskWeights);
            if (keyRemainingWeight > 0L && keyRemainingWeight <= closeFileCostInWeight) {
                keyRemainingWeight = 0L;
            }
            if (keyRemainingWeight != 0L) continue;
            KeyAssignment keyAssignment = new KeyAssignment(assignedSubtasks, subtaskWeights, closeFileCostInWeight);
            assignmentMap.put(currentKey, keyAssignment);
            assignedSubtasks.clear();
            subtaskWeights.clear();
            currentKey = null;
        }
        return assignmentMap;
    }

    @VisibleForTesting
    static class KeyAssignment {
        private final int[] assignedSubtasks;
        private final long[] subtaskWeightsExcludingCloseCost;
        private final long keyWeight;
        private final long[] cumulativeWeights;

        KeyAssignment(List<Integer> assignedSubtasks, List<Long> subtaskWeightsWithCloseFileCost, long closeFileCostInWeight) {
            Preconditions.checkArgument((assignedSubtasks != null && !assignedSubtasks.isEmpty() ? 1 : 0) != 0, (Object)"Invalid assigned subtasks: null or empty");
            Preconditions.checkArgument((subtaskWeightsWithCloseFileCost != null && !subtaskWeightsWithCloseFileCost.isEmpty() ? 1 : 0) != 0, (Object)"Invalid assigned subtasks weights: null or empty");
            Preconditions.checkArgument((assignedSubtasks.size() == subtaskWeightsWithCloseFileCost.size() ? 1 : 0) != 0, (String)"Invalid assignment: size mismatch (tasks length = %s, weights length = %s)", (int)assignedSubtasks.size(), (int)subtaskWeightsWithCloseFileCost.size());
            subtaskWeightsWithCloseFileCost.forEach(weight -> Preconditions.checkArgument((weight > closeFileCostInWeight ? 1 : 0) != 0, (String)"Invalid weight: should be larger than close file cost: weight = %s, close file cost = %s", (Object)weight, (long)closeFileCostInWeight));
            this.assignedSubtasks = assignedSubtasks.stream().mapToInt(i -> i).toArray();
            this.subtaskWeightsExcludingCloseCost = subtaskWeightsWithCloseFileCost.stream().mapToLong(weightWithCloseFileCost -> weightWithCloseFileCost - closeFileCostInWeight).toArray();
            this.keyWeight = Arrays.stream(this.subtaskWeightsExcludingCloseCost).sum();
            this.cumulativeWeights = new long[this.subtaskWeightsExcludingCloseCost.length];
            long cumulativeWeight = 0L;
            for (int i2 = 0; i2 < this.subtaskWeightsExcludingCloseCost.length; ++i2) {
                this.cumulativeWeights[i2] = cumulativeWeight += this.subtaskWeightsExcludingCloseCost[i2];
            }
        }

        int select() {
            if (this.assignedSubtasks.length == 1) {
                return this.assignedSubtasks[0];
            }
            long randomNumber = ThreadLocalRandom.current().nextLong(this.keyWeight);
            int index = Arrays.binarySearch(this.cumulativeWeights, randomNumber);
            int position = Math.abs(index + 1);
            Preconditions.checkState((position < this.assignedSubtasks.length ? 1 : 0) != 0, (String)"Invalid selected position: out of range. key weight = %s, random number = %s, cumulative weights array = %s", (Object)this.keyWeight, (Object)randomNumber, (Object)this.cumulativeWeights);
            return this.assignedSubtasks[position];
        }

        public int hashCode() {
            return 31 * Arrays.hashCode(this.assignedSubtasks) + Arrays.hashCode(this.subtaskWeightsExcludingCloseCost);
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            KeyAssignment that = (KeyAssignment)o;
            return Arrays.equals(this.assignedSubtasks, that.assignedSubtasks) && Arrays.equals(this.subtaskWeightsExcludingCloseCost, that.subtaskWeightsExcludingCloseCost);
        }

        public String toString() {
            return MoreObjects.toStringHelper((Object)this).add("assignedSubtasks", (Object)this.assignedSubtasks).add("subtaskWeightsExcludingCloseCost", (Object)this.subtaskWeightsExcludingCloseCost).toString();
        }
    }
}

