/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.gateway.impl.broker.cluster;

import io.atomix.cluster.ClusterMembershipEvent;
import io.atomix.cluster.ClusterMembershipEventListener;
import io.atomix.cluster.Member;
import io.atomix.cluster.MemberId;
import io.camunda.zeebe.gateway.Loggers;
import io.camunda.zeebe.gateway.impl.broker.cluster.BrokerClusterState;
import io.camunda.zeebe.gateway.impl.broker.cluster.BrokerClusterStateImpl;
import io.camunda.zeebe.gateway.impl.broker.cluster.BrokerTopologyListener;
import io.camunda.zeebe.gateway.impl.broker.cluster.BrokerTopologyManager;
import io.camunda.zeebe.gateway.impl.broker.cluster.GatewayTopologyMetrics;
import io.camunda.zeebe.protocol.impl.encoding.BrokerInfo;
import io.camunda.zeebe.protocol.record.PartitionHealthStatus;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.slf4j.Logger;

/**
 * Actor that maintains the gateway's view of the broker cluster topology.
 *
 * <p>The topology is held in an {@link AtomicReference} and is never mutated in place: every
 * update copies the current {@link BrokerClusterStateImpl}, applies the change to the copy and
 * then publishes the copy. Updates are triggered either by cluster membership events (see
 * {@link #event(ClusterMembershipEvent)}) or, once at actor start, by reading the current member
 * set from the supplier passed to the constructor.
 *
 * <p>Registered {@link BrokerTopologyListener}s are notified when a broker is added or removed.
 * The listener set is only touched from jobs submitted to this actor.
 */
public final class BrokerTopologyManagerImpl
extends Actor
implements BrokerTopologyManager,
ClusterMembershipEventListener {
    private static final Logger LOG = Loggers.GATEWAY_LOGGER;
    /** Latest published topology; {@code null} until the first update has been applied. */
    private final AtomicReference<BrokerClusterStateImpl> topology;
    /** Source of the currently known cluster members, consulted once when the actor starts. */
    private final Supplier<Set<Member>> membersSupplier;
    private final GatewayTopologyMetrics topologyMetrics = new GatewayTopologyMetrics();
    /** Listeners to notify about added/removed brokers; accessed from actor jobs only. */
    private final Set<BrokerTopologyListener> topologyListeners = new HashSet<>();
    /** Completed in {@link #onActorStarted()} after the initial member scan. */
    private final ActorFuture<Void> startFuture = new CompletableActorFuture<>();

    /**
     * Creates a manager with no topology yet.
     *
     * @param membersSupplier supplies the cluster members used for the initial topology scan
     */
    public BrokerTopologyManagerImpl(Supplier<Set<Member>> membersSupplier) {
        this.membersSupplier = membersSupplier;
        // Starts out empty (null); the first topology is published on actor start or on the
        // first membership event.
        this.topology = new AtomicReference<>();
    }

    /**
     * Returns the most recently published topology.
     *
     * @return the current topology, or {@code null} if none has been published yet
     */
    @Override
    public BrokerClusterState getTopology() {
        return this.topology.get();
    }

    /**
     * Replaces the published topology with the given one.
     *
     * @param topology the topology to publish
     */
    public void setTopology(BrokerClusterStateImpl topology) {
        this.topology.set(topology);
    }

    /**
     * Registers a listener on the actor and immediately replays {@code brokerAdded} for every
     * broker contained in the current topology, if there is one.
     *
     * @param listener the listener to register
     */
    @Override
    public void addTopologyListener(BrokerTopologyListener listener) {
        this.actor.run(() -> {
            this.topologyListeners.add(listener);
            BrokerClusterStateImpl currentTopology = this.topology.get();
            if (currentTopology != null) {
                // Broker ids are converted to member ids through their string representation.
                currentTopology.getBrokers().stream()
                    .map(brokerId -> MemberId.from(String.valueOf(brokerId)))
                    .forEach(listener::brokerAdded);
            }
        });
    }

    /**
     * Unregisters a listener on the actor.
     *
     * @param listener the listener to remove
     */
    @Override
    public void removeTopologyListener(BrokerTopologyListener listener) {
        this.actor.run(() -> this.topologyListeners.remove(listener));
    }

    /**
     * Submits this actor to the given scheduler unless the start future is already done.
     *
     * @param actorScheduler the scheduler to submit this actor to
     * @return a future that is completed once the actor has started and the initial member scan
     *     has run
     */
    public ActorFuture<Void> start(ActorSchedulingService actorScheduler) {
        if (!this.startFuture.isDone()) {
            actorScheduler.submitActor(this);
        }
        return this.startFuture;
    }

    /**
     * Builds a topology from the members currently reported by the supplier, to pick up brokers
     * for which no membership event was received. Does nothing when the supplier returns
     * {@code null} or an empty set. Members without broker info in their properties are skipped.
     */
    private void checkForMissingEvents() {
        Set<Member> members = this.membersSupplier.get();
        if (members == null || members.isEmpty()) {
            return;
        }
        BrokerClusterStateImpl newTopology = new BrokerClusterStateImpl(this.topology.get());
        for (Member member : members) {
            BrokerInfo brokerInfo = BrokerInfo.fromProperties(member.properties());
            if (brokerInfo != null) {
                this.addBroker(newTopology, member, brokerInfo);
            }
        }
        this.topology.set(newTopology);
        // Keep the metrics in step with the published topology, as the event path does.
        this.updateMetrics(newTopology);
    }

    /**
     * Adds the broker to the given topology if absent (notifying listeners in that case) and
     * then applies the broker's advertised properties to the topology.
     */
    private void addBroker(BrokerClusterStateImpl newTopology, Member member, BrokerInfo brokerInfo) {
        if (newTopology.addBrokerIfAbsent(brokerInfo.getNodeId())) {
            this.topologyListeners.forEach(listener -> listener.brokerAdded(member.id()));
        }
        this.processProperties(brokerInfo, newTopology);
    }

    /** Returns the fixed actor name {@code "GatewayTopologyManager"}. */
    @Override
    public String getName() {
        return "GatewayTopologyManager";
    }

    /** Runs the initial member scan and then completes the start future. */
    @Override
    protected void onActorStarted() {
        this.checkForMissingEvents();
        this.startFuture.complete(null);
    }

    /**
     * Handles a cluster membership event. Events whose subject carries no broker info are
     * ignored. Otherwise a job is submitted to the actor that copies the current topology,
     * applies the event to the copy, publishes it and refreshes the metrics.
     *
     * @param event the membership event to process
     */
    @Override
    public void event(ClusterMembershipEvent event) {
        Member subject = event.subject();
        ClusterMembershipEvent.Type eventType = event.type();
        BrokerInfo brokerInfo = BrokerInfo.fromProperties(subject.properties());
        if (brokerInfo == null) {
            return;
        }
        this.actor.call(() -> {
            BrokerClusterStateImpl newTopology = new BrokerClusterStateImpl(this.topology.get());
            switch (eventType) {
                case MEMBER_ADDED: {
                    LOG.debug("Received new broker {}.", brokerInfo);
                    this.addBroker(newTopology, subject, brokerInfo);
                    break;
                }
                case METADATA_CHANGED: {
                    LOG.debug(
                        "Received metadata change from Broker {}, partitions {}, terms {} and health {}.",
                        brokerInfo.getNodeId(),
                        brokerInfo.getPartitionRoles(),
                        brokerInfo.getPartitionLeaderTerms(),
                        brokerInfo.getPartitionHealthStatuses());
                    this.addBroker(newTopology, subject, brokerInfo);
                    break;
                }
                case MEMBER_REMOVED: {
                    LOG.debug("Received broker was removed {}.", brokerInfo);
                    newTopology.removeBroker(brokerInfo.getNodeId());
                    this.topologyListeners.forEach(listener -> listener.brokerRemoved(subject.id()));
                    break;
                }
                default: {
                    LOG.debug("Received {} for broker {}, do nothing.", eventType, brokerInfo.getNodeId());
                }
            }
            // The copy is published and metrics refreshed for every event type, including
            // those that left the copy unchanged.
            this.topology.set(newTopology);
            this.updateMetrics(newTopology);
        });
    }

    /**
     * Copies the cluster-wide settings and the per-partition role, health, address and version
     * data advertised by one broker into the given topology.
     */
    private void processProperties(BrokerInfo distributedBrokerInfo, BrokerClusterStateImpl newTopology) {
        newTopology.setClusterSize(distributedBrokerInfo.getClusterSize());
        newTopology.setPartitionsCount(distributedBrokerInfo.getPartitionsCount());
        newTopology.setReplicationFactor(distributedBrokerInfo.getReplicationFactor());
        int nodeId = distributedBrokerInfo.getNodeId();
        distributedBrokerInfo.consumePartitions(
            newTopology::addPartitionIfAbsent,
            (leaderPartitionId, term) -> newTopology.setPartitionLeader((int)leaderPartitionId, nodeId, term),
            followerPartitionId -> newTopology.addPartitionFollower(followerPartitionId, nodeId),
            inactivePartitionId -> newTopology.addPartitionInactive(inactivePartitionId, nodeId));
        distributedBrokerInfo.consumePartitionsHealth(
            (partition, health) -> newTopology.setPartitionHealthStatus(nodeId, (int)partition, (PartitionHealthStatus)health));
        String clientAddress = distributedBrokerInfo.getCommandApiAddress();
        if (clientAddress != null) {
            newTopology.setBrokerAddressIfPresent(nodeId, clientAddress);
        }
        newTopology.setBrokerVersionIfPresent(nodeId, distributedBrokerInfo.getVersion());
    }

    /**
     * Pushes the leader and follower assignment of every partition in the given topology to the
     * topology metrics.
     */
    private void updateMetrics(BrokerClusterState topology) {
        List<Integer> partitions = topology.getPartitions();
        for (int partition : partitions) {
            int leader = topology.getLeaderForPartition(partition);
            // NOTE(review): -2 looks like the "no leader known" sentinel of BrokerClusterState —
            // confirm and replace with the named constant if one exists.
            if (leader != -2) {
                this.topologyMetrics.setLeaderForPartition(partition, leader);
            }
            Set<Integer> followers = topology.getFollowersForPartition(partition);
            if (followers != null) {
                for (int follower : followers) {
                    this.topologyMetrics.setFollower(partition, follower);
                }
            }
        }
    }
}

