/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.dynamic.config.gossip;

import io.atomix.cluster.ClusterMembershipEvent;
import io.atomix.cluster.ClusterMembershipEventListener;
import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.Member;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.atomix.utils.event.EventListener;
import io.camunda.zeebe.dynamic.config.ClusterConfigurationUpdateNotifier;
import io.camunda.zeebe.dynamic.config.gossip.ClusterConfigurationGossipState;
import io.camunda.zeebe.dynamic.config.gossip.ClusterConfigurationGossiperConfig;
import io.camunda.zeebe.dynamic.config.metrics.TopologyMetrics;
import io.camunda.zeebe.dynamic.config.serializer.ClusterConfigurationSerializer;
import io.camunda.zeebe.dynamic.config.state.ClusterConfiguration;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ClusterConfigurationGossiper
implements ClusterConfigurationUpdateNotifier,
ClusterMembershipEventListener,
AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(ClusterConfigurationGossiper.class);
    private static final String SYNC_REQUEST_TOPIC = "cluster-topology-sync";
    private static final String GOSSIP_REQUEST_TOPIC = "cluster-topology-gossip";
    private final ClusterConfigurationGossipState gossipState = new ClusterConfigurationGossipState();
    private final ConcurrencyControl executor;
    private final ClusterCommunicationService communicationService;
    private final ClusterMembershipService membershipService;
    private final ClusterConfigurationGossiperConfig config;
    private final ClusterConfigurationSerializer serializer;
    private final Set<ClusterConfigurationUpdateNotifier.ClusterConfigurationUpdateListener> configurationUpdateListeners = new HashSet<ClusterConfigurationUpdateNotifier.ClusterConfigurationUpdateListener>();
    private List<MemberId> membersToSync = new LinkedList<MemberId>();
    private final Consumer<ClusterConfiguration> clusterConfigurationUpdateHandler;

    public ClusterConfigurationGossiper(ConcurrencyControl executor, ClusterCommunicationService communicationService, ClusterMembershipService membershipService, ClusterConfigurationSerializer serializer, ClusterConfigurationGossiperConfig config, Consumer<ClusterConfiguration> clusterConfigurationUpdateHandler) {
        this.executor = executor;
        this.communicationService = communicationService;
        this.membershipService = membershipService;
        this.config = config;
        this.serializer = serializer;
        this.clusterConfigurationUpdateHandler = clusterConfigurationUpdateHandler;
    }

    public CompletableActorFuture<Void> start() {
        CompletableActorFuture startedFuture = new CompletableActorFuture();
        this.executor.run(() -> {
            this.internalStart();
            startedFuture.complete(null);
        });
        return startedFuture;
    }

    private void internalStart() {
        this.scheduleSync();
        this.registerSyncHandler();
        this.registerGossipHandler();
        this.registerMemberAddedListener();
    }

    private void registerMemberAddedListener() {
        this.membershipService.addListener((EventListener)this);
    }

    private void unregisterMemberListener() {
        this.membershipService.removeListener((EventListener)this);
    }

    private void registerSyncHandler() {
        this.communicationService.replyTo(SYNC_REQUEST_TOPIC, this.serializer::decode, this::handleSyncRequest, this.serializer::encode, arg_0 -> ((ConcurrencyControl)this.executor).run(arg_0));
    }

    private void unregisterSyncHandler() {
        this.communicationService.unsubscribe(SYNC_REQUEST_TOPIC);
    }

    private void registerGossipHandler() {
        this.communicationService.consume(GOSSIP_REQUEST_TOPIC, this.serializer::decode, this::handleGossip, arg_0 -> ((ConcurrencyControl)this.executor).run(arg_0));
    }

    private void unregisterGossipHandler() {
        this.communicationService.unsubscribe(GOSSIP_REQUEST_TOPIC);
    }

    private void scheduleSync() {
        if (this.config.enableSync()) {
            this.executor.schedule(this.config.syncDelay(), this::sync);
        }
    }

    private void sync() {
        this.refreshMembersToSync();
        if (this.membersToSync.isEmpty()) {
            return;
        }
        MemberId randomMemberToSync = this.membersToSync.remove(0);
        this.sync(randomMemberToSync);
    }

    private void sync(MemberId toMember) {
        LOGGER.trace("Sending sync request to {}", (Object)toMember);
        this.sendSyncRequest(toMember).whenCompleteAsync((response, error) -> this.handleSyncResponse((ClusterConfigurationGossipState)response, (Throwable)error, toMember), arg_0 -> ((ConcurrencyControl)this.executor).run(arg_0));
    }

    private void refreshMembersToSync() {
        if (this.membersToSync.isEmpty()) {
            this.membersToSync = this.membershipService.getMembers().stream().map(Member::id).filter(id -> !id.equals((Object)this.membershipService.getLocalMember().id())).collect(Collectors.toCollection(LinkedList::new));
            Collections.shuffle(this.membersToSync);
        }
    }

    private void handleSyncResponse(ClusterConfigurationGossipState response, Throwable error, MemberId member) {
        if (error == null) {
            this.update(response);
        } else {
            LOGGER.warn("Failed to sync with {}", (Object)member, (Object)error);
        }
        this.scheduleSync();
    }

    private void update(ClusterConfigurationGossipState receivedGossipState) {
        ClusterConfiguration topology;
        if (!receivedGossipState.equals(this.gossipState) && (topology = receivedGossipState.getClusterConfiguration()) != null) {
            this.clusterConfigurationUpdateHandler.accept(topology);
        }
    }

    private void onConfigurationUpdated(ClusterConfiguration updatedConfiguration) {
        this.gossipState.setClusterConfiguration(updatedConfiguration);
        LOGGER.trace("Updated local gossipState to {}", (Object)updatedConfiguration);
        this.gossip();
        this.notifyListeners(updatedConfiguration);
        TopologyMetrics.updateFromTopology(updatedConfiguration);
    }

    private void notifyListeners(ClusterConfiguration updatedTopology) {
        this.configurationUpdateListeners.forEach(listener -> listener.onClusterConfigurationUpdated(updatedTopology));
    }

    private ClusterConfigurationGossipState handleSyncRequest(MemberId memberId, ClusterConfigurationGossipState clusterSharedGossipState) {
        LOGGER.trace("Received configuration sync request from {} with state {}", (Object)memberId, (Object)clusterSharedGossipState);
        this.update(clusterSharedGossipState);
        return this.gossipState;
    }

    public void updateClusterConfiguration(ClusterConfiguration clusterConfiguration) {
        if (clusterConfiguration == null) {
            return;
        }
        this.executor.run(() -> {
            if (!clusterConfiguration.equals(this.gossipState.getClusterConfiguration())) {
                this.onConfigurationUpdated(clusterConfiguration);
            }
        });
    }

    public ActorFuture<ClusterConfiguration> queryClusterConfiguration(MemberId memberId) {
        ActorFuture responseFuture = this.executor.createFuture();
        this.sendSyncRequest(memberId).whenCompleteAsync((response, error) -> {
            if (error == null) {
                responseFuture.complete((Object)response.getClusterConfiguration());
            } else {
                responseFuture.completeExceptionally(error);
            }
        }, arg_0 -> ((ConcurrencyControl)this.executor).run(arg_0));
        return responseFuture;
    }

    private CompletableFuture<ClusterConfigurationGossipState> sendSyncRequest(MemberId memberId) {
        return this.communicationService.send(SYNC_REQUEST_TOPIC, (Object)this.gossipState, this.serializer::encode, this.serializer::decode, memberId, this.config.syncRequestTimeout());
    }

    private void gossip() {
        this.refreshMembersToSync();
        if (this.membersToSync.isEmpty()) {
            return;
        }
        List<MemberId> gossipMembersList = this.membersToSync.subList(0, Math.min(this.config.gossipFanout(), this.membersToSync.size()));
        LOGGER.trace("Gossiping {} to {}", (Object)this.gossipState, gossipMembersList);
        gossipMembersList.forEach(member -> this.communicationService.unicast(GOSSIP_REQUEST_TOPIC, (Object)this.gossipState, this.serializer::encode, member, true));
        gossipMembersList.clear();
    }

    private void handleGossip(MemberId memberId, ClusterConfigurationGossipState receivedState) {
        LOGGER.trace("Received {} from {}", (Object)this.gossipState, (Object)memberId);
        this.update(receivedState);
    }

    @Override
    public void addUpdateListener(ClusterConfigurationUpdateNotifier.ClusterConfigurationUpdateListener listener) {
        this.executor.run(() -> {
            this.configurationUpdateListeners.add(listener);
            if (this.gossipState.getClusterConfiguration() != null) {
                listener.onClusterConfigurationUpdated(this.gossipState.getClusterConfiguration());
            }
        });
    }

    @Override
    public void removeUpdateListener(ClusterConfigurationUpdateNotifier.ClusterConfigurationUpdateListener listener) {
        this.executor.run(() -> this.configurationUpdateListeners.remove(listener));
    }

    public boolean isRelevant(ClusterMembershipEvent event) {
        return event.type() == ClusterMembershipEvent.Type.MEMBER_ADDED || event.type() == ClusterMembershipEvent.Type.MEMBER_REMOVED;
    }

    public void event(ClusterMembershipEvent event) {
        switch ((ClusterMembershipEvent.Type)event.type()) {
            case MEMBER_ADDED: {
                this.executor.run(() -> {
                    if (this.config.enableSync()) {
                        this.sync(((Member)event.subject()).id());
                    }
                });
                break;
            }
            case MEMBER_REMOVED: {
                this.executor.run(() -> this.membersToSync.remove(((Member)event.subject()).id()));
                break;
            }
        }
    }

    @Override
    public void close() {
        this.unregisterMemberListener();
        this.unregisterSyncHandler();
        this.unregisterGossipHandler();
    }
}

