/*
 * Decompiled with CFR 0.152.
 */
package io.eventuate.messaging.partitionmanagement;

import io.eventuate.coordination.leadership.EventuateLeaderSelector;
import io.eventuate.coordination.leadership.LeaderSelectedCallback;
import io.eventuate.coordination.leadership.LeaderSelectorFactory;
import io.eventuate.coordination.leadership.LeadershipController;
import io.eventuate.messaging.partitionmanagement.Assignment;
import io.eventuate.messaging.partitionmanagement.AssignmentListener;
import io.eventuate.messaging.partitionmanagement.AssignmentListenerFactory;
import io.eventuate.messaging.partitionmanagement.AssignmentManager;
import io.eventuate.messaging.partitionmanagement.GroupMember;
import io.eventuate.messaging.partitionmanagement.GroupMemberFactory;
import io.eventuate.messaging.partitionmanagement.MemberGroupManager;
import io.eventuate.messaging.partitionmanagement.MemberGroupManagerFactory;
import io.eventuate.messaging.partitionmanagement.PartitionManager;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Coordinator {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    private final String subscriptionId;
    private final String subscriberId;
    private Set<String> channels;
    private int partitionCount;
    private GroupMember groupMember;
    private MemberGroupManagerFactory memberGroupManagerFactory;
    private AssignmentListener assignmentListener;
    private EventuateLeaderSelector leaderSelector;
    private AssignmentManager assignmentManager;
    private MemberGroupManager memberGroupManager;
    private PartitionManager partitionManager;
    private Set<String> previousGroupMembers;
    private LeaderSelectedCallback leaderSelected;
    private Runnable leaderRemoved;

    public Coordinator(String subscriptionId, String subscriberId, Set<String> channels, int partitionCount, GroupMemberFactory groupMemberFactory, MemberGroupManagerFactory memberGroupManagerFactory, AssignmentManager assignmentManager, AssignmentListenerFactory assignmentListenerFactory, LeaderSelectorFactory leaderSelectorFactory, Consumer<Assignment> assignmentUpdatedCallback, String lockId, LeaderSelectedCallback leaderSelected, Runnable leaderRemoved) {
        this.leaderSelected = leaderSelected;
        this.leaderRemoved = leaderRemoved;
        this.subscriptionId = subscriptionId;
        this.subscriberId = subscriberId;
        this.channels = channels;
        this.partitionCount = partitionCount;
        this.assignmentManager = assignmentManager;
        this.memberGroupManagerFactory = memberGroupManagerFactory;
        this.createInitialAssignments();
        this.groupMember = groupMemberFactory.create(subscriberId, subscriptionId);
        this.assignmentListener = assignmentListenerFactory.create(subscriberId, subscriptionId, assignmentUpdatedCallback);
        this.leaderSelector = leaderSelectorFactory.create(lockId, String.format("[subscriberId: %s, subscriptionId: %s]", subscriberId, subscriptionId), this::onLeaderSelected, this::onLeaderRemoved);
        this.leaderSelector.start();
    }

    private void createInitialAssignments() {
        try {
            this.logger.info("Creating initial assignments");
            HashMap<String, Set<Integer>> partitionAssignmentsByChannel = new HashMap<String, Set<Integer>>();
            this.channels.forEach(channel -> partitionAssignmentsByChannel.put((String)channel, new HashSet()));
            Assignment assignment = new Assignment(this.channels, partitionAssignmentsByChannel);
            this.assignmentManager.initializeAssignment(this.subscriberId, this.subscriptionId, assignment);
            this.logger.info("Created initial assignments");
        }
        catch (Exception e) {
            this.logger.error("Creation of initial assignments failed", (Throwable)e);
            throw new RuntimeException(e);
        }
    }

    private void onLeaderSelected(LeadershipController leadershipController) {
        this.logger.info("Calling onLeaderSelected");
        this.leaderSelected.run(leadershipController);
        this.partitionManager = new PartitionManager(this.partitionCount);
        this.previousGroupMembers = new HashSet<String>();
        this.memberGroupManager = this.memberGroupManagerFactory.create(this.subscriberId, this.subscriptionId, this::onGroupMembersUpdated);
        this.logger.info("Called onLeaderSelected");
    }

    private void onLeaderRemoved() {
        this.logger.info("Calling memberGroupManager.stop(), subscriberId : {}, subscriptionId : {}", (Object)this.subscriberId, (Object)this.subscriptionId);
        this.memberGroupManager.stop();
        this.logger.info("Called memberGroupManager.stop(), subscriberId : {}, subscriptionId : {}", (Object)this.subscriberId, (Object)this.subscriptionId);
        this.logger.info("Calling leaderRemoved, subscriberId : {}, subscriptionId : {}", (Object)this.subscriberId, (Object)this.subscriptionId);
        this.leaderRemoved.run();
        this.logger.info("Called leaderRemoved, subscriberId : {}, subscriptionId : {}", (Object)this.subscriberId, (Object)this.subscriptionId);
    }

    private void onGroupMembersUpdated(Set<String> expectedGroupMembers) {
        this.logger.info("Updating group members, expectedGroupMembers : {}, subscriberId : {}, subscriptionId : {}", new Object[]{expectedGroupMembers, this.subscriberId, this.subscriptionId});
        try {
            if (!this.partitionManager.isInitialized()) {
                this.initializePartitionManager(expectedGroupMembers);
            } else {
                this.rebalance(expectedGroupMembers);
            }
        }
        catch (Exception e) {
            this.logger.error(e.getMessage(), (Throwable)e);
            return;
        }
        this.previousGroupMembers = expectedGroupMembers;
        this.logger.info("Updated group members, subscriberId : {}, subscriptionId : {}", (Object)this.subscriberId, (Object)this.subscriptionId);
    }

    private void initializePartitionManager(Set<String> expectedGroupMembers) {
        this.logger.info("Initializing partition manager, expectedGroupMembers : {}, subscriberId : {}, subscriptionId : {}", new Object[]{expectedGroupMembers, this.subscriberId, this.subscriptionId});
        Map<String, Assignment> assignments = expectedGroupMembers.stream().collect(Collectors.toMap(Function.identity(), this::readAssignment));
        this.partitionManager.initialize(assignments).forEach(this::saveAssignment);
    }

    private void rebalance(Set<String> expectedGroupMembers) {
        this.logger.info("Preparing to rebalance, expectedGroupMembers : {}, subscriberId : {}, subscriptionId : {}", new Object[]{expectedGroupMembers, this.subscriberId, this.subscriptionId});
        Set<String> removedGroupMembers = this.previousGroupMembers.stream().filter(groupMember -> !expectedGroupMembers.contains(groupMember)).collect(Collectors.toSet());
        Map<String, Set<String>> addedGroupMembersWithTheirSubscribedChannels = expectedGroupMembers.stream().filter(groupMember -> !this.previousGroupMembers.contains(groupMember)).collect(Collectors.toMap(Function.identity(), groupMember -> this.readAssignment(this.subscriptionId).getChannels()));
        this.partitionManager.rebalance(addedGroupMembersWithTheirSubscribedChannels, removedGroupMembers).forEach(this::saveAssignment);
    }

    private Assignment readAssignment(String groupMemberId) {
        return this.assignmentManager.readAssignment(this.subscriberId, groupMemberId);
    }

    private void saveAssignment(String groupMemberId, Assignment assignment) {
        this.assignmentManager.saveAssignment(this.subscriberId, groupMemberId, assignment);
    }

    public void close() {
        this.assignmentListener.remove();
        this.groupMember.remove();
        this.leaderSelector.stop();
    }
}

