/*
 * Decompiled with CFR 0.152.
 */
package org.zalando.nakadi.service.subscription;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.zalando.nakadi.service.subscription.model.Partition;
import org.zalando.nakadi.service.subscription.model.Session;

/**
 * Computes which partitions have to change their session so that every active session
 * ends up with the number of partitions calculated by {@link #splitByWeight(int, int[])}
 * from the session weights.
 *
 * <p>The function is pure with respect to its inputs: it only returns the changed
 * partitions (as produced by {@code Partition.moveToSessionId}) and leaves applying
 * them to the caller.
 */
class ExactWeightRebalancer
implements BiFunction<Session[], Partition[], Partition[]> {
    ExactWeightRebalancer() {
    }

    /**
     * Calculates the partitions that must be moved to another session.
     *
     * @param sessions          currently active sessions; ids are expected to be unique
     *                          ({@code Collectors.toMap} rejects duplicates)
     * @param currentPartitions all partitions with their current assignment
     * @return the partitions whose assignment changes; empty when nothing has to move or
     *         when there is no session to move partitions to
     * @throws IllegalArgumentException if there are fewer partitions than sessions or a
     *                                  session weight is not positive
     */
    @Override
    public Partition[] apply(Session[] sessions, Partition[] currentPartitions) {
        if (sessions.length == 0) {
            // Without sessions there is no target for any partition. This path used to
            // fail with "ArithmeticException: / by zero" inside splitByWeight.
            return new Partition[0];
        }
        Map<String, Integer> activeSessionWeights = Stream.of(sessions)
                .collect(Collectors.toMap(Session::getId, Session::getWeight));
        // Sorted ids give a stable session order, so target counts are index-aligned with ids.
        List<String> activeSessionIds = activeSessionWeights.keySet().stream()
                .sorted()
                .collect(Collectors.toList());
        int[] partitionsPerSession = ExactWeightRebalancer.splitByWeight(
                currentPartitions.length,
                activeSessionIds.stream().mapToInt(activeSessionWeights::get).toArray());

        // Both collections are mutated below, so they are collected into explicitly mutable
        // implementations (Collectors.toSet()/groupingBy() give no mutability guarantee).
        Set<Partition> toRebalance = Stream.of(currentPartitions)
                .filter(p -> p.mustBeRebalanced(activeSessionIds))
                .collect(Collectors.toCollection(HashSet::new));
        Map<String, List<Partition>> partitions = Stream.of(currentPartitions)
                .filter(p -> !toRebalance.contains(p))
                .collect(Collectors.groupingBy(
                        Partition::getSessionOrNextSession,
                        Collectors.<Partition, List<Partition>>toCollection(ArrayList::new)));

        // Step 1: take surplus partitions away from sessions holding more than their share.
        for (int idx = 0; idx < activeSessionIds.size(); ++idx) {
            List<Partition> candidates = partitions.get(activeSessionIds.get(idx));
            if (candidates == null) {
                continue;
            }
            for (int toTake = candidates.size() - partitionsPerSession[idx]; toTake > 0; --toTake) {
                // Prefer a partition that is already in REASSIGNING state; otherwise take the last one.
                Partition lastCandidate = candidates.get(candidates.size() - 1);
                Partition toTakeItem = candidates.stream()
                        .filter(p -> p.getState() == Partition.State.REASSIGNING)
                        .findAny()
                        .orElse(lastCandidate);
                candidates.remove(toTakeItem);
                toRebalance.add(toTakeItem);
            }
        }

        if (toRebalance.isEmpty()) {
            return new Partition[0];
        }

        // Step 2: hand the collected partitions to sessions holding less than their share.
        List<Partition> result = new ArrayList<>();
        for (int idx = 0; idx < activeSessionIds.size(); ++idx) {
            String sessionId = activeSessionIds.get(idx);
            List<Partition> owned = partitions.get(sessionId);
            int currentCount = owned == null ? 0 : owned.size();
            int missing = partitionsPerSession[idx] - currentCount;
            for (int i = 0; i < missing; ++i) {
                Partition toMove = toRebalance.iterator().next();
                toRebalance.remove(toMove);
                result.add(toMove.moveToSessionId(sessionId, activeSessionIds));
            }
        }
        return result.toArray(new Partition[result.size()]);
    }

    /**
     * Splits {@code itemCount} items between receivers according to their weights.
     *
     * <p>Every receiver first gets {@code (itemCount / totalWeight) * weight} items, or one
     * item each when that quotient is zero. The remainder is then handed out in index order,
     * at most {@code weight} items per receiver per round, until nothing is left.
     *
     * @param itemCount number of items to distribute
     * @param weigths   positive weight of every receiver
     * @return array of the same length as {@code weigths} whose elements sum up to
     *         {@code itemCount}; an empty array when there are no items and no receivers
     * @throws IllegalArgumentException if there are fewer items than receivers (including
     *                                  items without any receiver) or a weight is not positive
     */
    static int[] splitByWeight(int itemCount, int[] weigths) {
        if (itemCount < weigths.length) {
            throw new IllegalArgumentException("Can not rebalance " + itemCount + " onto " + weigths.length);
        }
        if (weigths.length == 0) {
            // The total weight would be zero and the division below would fail with "/ by zero".
            if (itemCount == 0) {
                return new int[0];
            }
            throw new IllegalArgumentException("Can not rebalance " + itemCount + " onto " + weigths.length);
        }
        if (IntStream.of(weigths).anyMatch(w -> w <= 0)) {
            throw new IllegalArgumentException("Weight can not be below zero: " + Arrays.toString(weigths));
        }
        int totalWeight = IntStream.of(weigths).sum();
        int fixed = itemCount / totalWeight;
        int[] result = IntStream.of(weigths).map(w -> fixed * w).toArray();
        if (fixed == 0) {
            // Fewer items than total weight: guarantee at least one item per receiver.
            Arrays.fill(result, 1);
        }
        int left = itemCount - IntStream.of(result).sum();
        // Weights are strictly positive, so every pass hands out at least one item.
        while (left > 0) {
            for (int i = 0; i < result.length && left > 0; ++i) {
                int v = Math.min(left, weigths[i]);
                result[i] += v;
                left -= v;
            }
        }
        return result;
    }
}

