/*
 * Decompiled with CFR 0.152.
 */
package io.eventuate.messaging.partitionmanagement;

import io.eventuate.messaging.partitionmanagement.Assignment;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PartitionManager {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    private int partitionCount;
    private boolean initialized;
    private Map<String, Assignment> currentAssignments;

    public PartitionManager(int partitionCount) {
        this.partitionCount = partitionCount;
    }

    public Map<String, Assignment> initialize(Map<String, Assignment> assignments) {
        this.initialized = true;
        this.currentAssignments = this.rebalance(assignments);
        Map<String, Assignment> reassignments = this.filterUnchangedAssignments(assignments, this.currentAssignments);
        this.logger.info("Initializing rebalancer: assignments = {}, currentAssignments = {}, reassignments = {}", new Object[]{assignments, this.currentAssignments, reassignments});
        return reassignments;
    }

    public boolean isInitialized() {
        return this.initialized;
    }

    public Map<String, Assignment> getCurrentAssignments() {
        return this.currentAssignments;
    }

    public Map<String, Assignment> rebalance(Map<String, Set<String>> addedGroupMembersWithTheirSubscribedChannels, Set<String> removedGroupMembers) {
        Map<String, Assignment> assignmentsWithoutRemovedMembers = this.currentAssignments.entrySet().stream().filter(groupMemberAndAssignment -> !removedGroupMembers.contains(groupMemberAndAssignment.getKey())).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        Map<String, Assignment> newMemberAssignments = addedGroupMembersWithTheirSubscribedChannels.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, groupMemberAndChannels -> new Assignment((Set)groupMemberAndChannels.getValue(), new HashMap<String, Set<Integer>>())));
        HashMap<String, Assignment> assignmentsToRebalance = new HashMap<String, Assignment>();
        assignmentsToRebalance.putAll(assignmentsWithoutRemovedMembers);
        assignmentsToRebalance.putAll(newMemberAssignments);
        Map<String, Assignment> reassignments = this.rebalance(assignmentsToRebalance);
        Map<String, Assignment> changedAssignments = this.filterUnchangedAssignments(this.currentAssignments, reassignments);
        this.logger.info("Rebalancing: addedGroupMembersWithTheirSubscribedChannels = {}, removedGroupMembers = {}, currentAssignments = {}, reassignments = {}", new Object[]{addedGroupMembersWithTheirSubscribedChannels, removedGroupMembers, this.currentAssignments, reassignments});
        this.currentAssignments = reassignments;
        return changedAssignments;
    }

    Map<String, Assignment> rebalance(Map<String, Assignment> assignments) {
        this.logger.info("Rebalancing assignments: {}", assignments);
        Set assignmentDescriptions = assignments.entrySet().stream().flatMap(this::assignmentToAssignmentDescriptions).collect(Collectors.groupingBy(AssignmentDescription::getChannel)).values().stream().map(HashSet::new).map(this::rebalance).flatMap(Collection::stream).collect(Collectors.toSet());
        Map<String, Map> assignmentDescriptionsByGroupMemberAndChannel = assignmentDescriptions.stream().collect(Collectors.groupingBy(AssignmentDescription::getGroupMember)).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, this::groupAssignmentDescriptionsByChannel));
        Map<String, Assignment> rebalancedAssignments = assignmentDescriptionsByGroupMemberAndChannel.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, this::assignmentDescriptionsByChannelToAssignment));
        this.logger.info("Rebalanced assignments: {}", rebalancedAssignments);
        return rebalancedAssignments;
    }

    private Stream<AssignmentDescription> assignmentToAssignmentDescriptions(Map.Entry<String, Assignment> groupMemberAndAssignment) {
        String groupMember = groupMemberAndAssignment.getKey();
        Assignment assignment = groupMemberAndAssignment.getValue();
        return assignment.getChannels().stream().map(channel -> new AssignmentDescription(groupMember, (String)channel, assignment.getPartitionAssignmentsByChannel().getOrDefault(channel, Collections.emptySet())));
    }

    private Map<String, List<AssignmentDescription>> groupAssignmentDescriptionsByChannel(Map.Entry<String, List<AssignmentDescription>> groupMemberAndAssignmentDescriptions) {
        List<AssignmentDescription> assignmentDescriptions = groupMemberAndAssignmentDescriptions.getValue();
        return assignmentDescriptions.stream().collect(Collectors.groupingBy(AssignmentDescription::getChannel));
    }

    private Assignment assignmentDescriptionsByChannelToAssignment(Map.Entry<String, Map<String, List<AssignmentDescription>>> groupMemberAndAssignmentDescriptionsByChannel) {
        Map<String, List<AssignmentDescription>> assignmentDescriptionsByChannel = groupMemberAndAssignmentDescriptionsByChannel.getValue();
        Map<String, Set<Integer>> partitionsByChannel = assignmentDescriptionsByChannel.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, channelAndAssignmentDescriptions -> ((List)channelAndAssignmentDescriptions.getValue()).stream().flatMap(assignmentDescription -> assignmentDescription.calculateExpectedPartitions().stream()).collect(Collectors.toSet())));
        return new Assignment(assignmentDescriptionsByChannel.keySet(), partitionsByChannel);
    }

    private Map<String, Assignment> filterUnchangedAssignments(Map<String, Assignment> originalAssignments, Map<String, Assignment> reassignments) {
        return reassignments.entrySet().stream().filter(e -> {
            if (originalAssignments.containsKey(e.getKey())) {
                return !((Assignment)originalAssignments.get(e.getKey())).equals(e.getValue());
            }
            return true;
        }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    private Set<AssignmentDescription> rebalance(Set<AssignmentDescription> assignmentDescriptions) {
        this.logger.info("Rebalancing assignment descriptions {}", assignmentDescriptions);
        Set<AssignmentDescription> rebalancingDescriptions = assignmentDescriptions.stream().map(AssignmentDescription::new).collect(Collectors.toSet());
        Set<Integer> notActivePartitions = this.findNotActivePartitions(rebalancingDescriptions);
        notActivePartitions.forEach(partition -> this.findAssignmentDescriptionWithMinPartitions(rebalancingDescriptions).assignPartition((int)partition));
        AssignmentDescription minPartitionAssignment = this.findAssignmentDescriptionWithMinPartitions(rebalancingDescriptions);
        AssignmentDescription maxPartitionAssignment = this.findAssignmentDescriptionWithMaxPartitions(rebalancingDescriptions);
        while (maxPartitionAssignment.calculateRebalancedPartitions() - minPartitionAssignment.calculateRebalancedPartitions() > 1) {
            minPartitionAssignment.takePartitionFrom(maxPartitionAssignment);
            minPartitionAssignment = this.findAssignmentDescriptionWithMinPartitions(rebalancingDescriptions);
            maxPartitionAssignment = this.findAssignmentDescriptionWithMaxPartitions(rebalancingDescriptions);
        }
        this.logger.info("Rebalanced assignment descriptions {}", rebalancingDescriptions);
        return rebalancingDescriptions;
    }

    private Set<Integer> findNotActivePartitions(Set<AssignmentDescription> assignmentDescriptions) {
        Set activePartitions = assignmentDescriptions.stream().flatMap(assignmentDescription -> {
            HashSet<Integer> expectedPartitions = new HashSet<Integer>();
            expectedPartitions.addAll(assignmentDescription.getCurrentPartitions());
            expectedPartitions.addAll(assignmentDescription.getAssignedPartitions());
            expectedPartitions.removeAll(assignmentDescription.getResignedPartitions());
            return expectedPartitions.stream();
        }).collect(Collectors.toSet());
        return IntStream.range(0, this.partitionCount).boxed().filter(partition -> !activePartitions.contains(partition)).collect(Collectors.toSet());
    }

    private AssignmentDescription findAssignmentDescriptionWithMinPartitions(Set<AssignmentDescription> assignmentDescriptions) {
        return assignmentDescriptions.stream().min(Comparator.comparingInt(AssignmentDescription::calculateRebalancedPartitions)).get();
    }

    private AssignmentDescription findAssignmentDescriptionWithMaxPartitions(Set<AssignmentDescription> assignmentDescriptions) {
        return assignmentDescriptions.stream().max(Comparator.comparingInt(AssignmentDescription::calculateRebalancedPartitions)).get();
    }

    private static class AssignmentDescription {
        private String groupMember;
        private String channel;
        private Set<Integer> currentPartitions;
        private Set<Integer> assignedPartitions = new HashSet<Integer>();
        private Set<Integer> resignedPartitions = new HashSet<Integer>();

        public AssignmentDescription(AssignmentDescription copy) {
            this(copy.getGroupMember(), copy.getChannel(), new HashSet<Integer>(copy.currentPartitions));
            this.assignedPartitions.addAll(copy.assignedPartitions);
            this.resignedPartitions.addAll(copy.resignedPartitions);
        }

        public AssignmentDescription(String groupMember, String channel, Set<Integer> currentPartitions) {
            this.groupMember = groupMember;
            this.channel = channel;
            this.currentPartitions = new HashSet<Integer>(currentPartitions);
        }

        public void assignPartition(int partition) {
            this.assignedPartitions.add(partition);
        }

        public void takePartitionFrom(AssignmentDescription assignmentDescription) {
            Optional partitionToReassign = assignmentDescription.assignedPartitions.stream().findAny();
            if (partitionToReassign.isPresent()) {
                assignmentDescription.assignedPartitions.remove(partitionToReassign.get());
                this.assignedPartitions.add((Integer)partitionToReassign.get());
            } else {
                partitionToReassign = assignmentDescription.calculateExpectedPartitions().stream().findAny();
                assignmentDescription.resignedPartitions.add((Integer)partitionToReassign.get());
                this.assignedPartitions.add((Integer)partitionToReassign.get());
            }
        }

        public Set<Integer> calculateExpectedPartitions() {
            HashSet<Integer> expectedPartitions = new HashSet<Integer>(this.currentPartitions);
            expectedPartitions.removeAll(this.resignedPartitions);
            expectedPartitions.addAll(this.assignedPartitions);
            return expectedPartitions;
        }

        public int calculateRebalancedPartitions() {
            return this.currentPartitions.size() + this.assignedPartitions.size() - this.resignedPartitions.size();
        }

        public String getGroupMember() {
            return this.groupMember;
        }

        public String getChannel() {
            return this.channel;
        }

        public Set<Integer> getCurrentPartitions() {
            return Collections.unmodifiableSet(this.currentPartitions);
        }

        public Set<Integer> getAssignedPartitions() {
            return Collections.unmodifiableSet(this.assignedPartitions);
        }

        public Set<Integer> getResignedPartitions() {
            return Collections.unmodifiableSet(this.resignedPartitions);
        }

        public String toString() {
            return ToStringBuilder.reflectionToString((Object)this);
        }
    }
}

