/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.broker.partitioning.topology;

import io.atomix.cluster.ClusterMembershipEvent;
import io.atomix.cluster.ClusterMembershipEventListener;
import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.Member;
import io.atomix.utils.event.EventListener;
import io.camunda.zeebe.broker.Loggers;
import io.camunda.zeebe.broker.PartitionListener;
import io.camunda.zeebe.broker.partitioning.topology.TopologyManager;
import io.camunda.zeebe.broker.partitioning.topology.TopologyPartitionListener;
import io.camunda.zeebe.engine.state.QueryService;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.protocol.impl.encoding.BrokerInfo;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.util.LogUtil;
import io.camunda.zeebe.util.health.HealthStatus;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.agrona.collections.Int2ObjectHashMap;
import org.slf4j.Logger;

public final class TopologyManagerImpl
extends Actor
implements TopologyManager,
ClusterMembershipEventListener,
PartitionListener {
    private static final Logger LOG = Loggers.CLUSTERING_LOGGER;
    private final Int2ObjectHashMap<BrokerInfo> partitionLeaders = new Int2ObjectHashMap();
    private final ClusterMembershipService membershipService;
    private final BrokerInfo localBroker;
    private final List<TopologyPartitionListener> topologyPartitionListeners = new ArrayList<TopologyPartitionListener>();
    private final String actorName;

    public TopologyManagerImpl(ClusterMembershipService membershipService, BrokerInfo localBroker) {
        this.membershipService = membershipService;
        this.localBroker = localBroker;
        this.actorName = "TopologyManager";
    }

    @Override
    public ActorFuture<Void> onBecomingFollower(int partitionId, long term) {
        return this.setFollower(partitionId);
    }

    @Override
    public ActorFuture<Void> onBecomingLeader(int partitionId, long term, LogStream logStream, QueryService queryService) {
        return this.setLeader(term, partitionId);
    }

    @Override
    public ActorFuture<Void> onBecomingInactive(int partitionId, long term) {
        return this.setInactive(partitionId);
    }

    public String getName() {
        return this.actorName;
    }

    protected void onActorStarted() {
        this.publishTopologyChanges();
        this.membershipService.addListener((EventListener)this);
        this.membershipService.getMembers().forEach(m -> this.event(new ClusterMembershipEvent(ClusterMembershipEvent.Type.MEMBER_ADDED, m)));
    }

    public ActorFuture<Void> setLeader(long term, int partitionId) {
        return this.actor.call(() -> {
            this.partitionLeaders.put(partitionId, (Object)this.localBroker);
            this.localBroker.setLeaderForPartition(partitionId, term);
            this.publishTopologyChanges();
            this.notifyPartitionLeaderUpdated(partitionId, this.localBroker);
        });
    }

    public ActorFuture<Void> setFollower(int partitionId) {
        return this.actor.call(() -> {
            this.removeIfLeader(this.localBroker, partitionId);
            this.localBroker.setFollowerForPartition(partitionId);
            this.publishTopologyChanges();
        });
    }

    public ActorFuture<Void> setInactive(int partitionId) {
        return this.actor.call(() -> {
            this.removeIfLeader(this.localBroker, partitionId);
            this.localBroker.setInactiveForPartition(partitionId);
            this.publishTopologyChanges();
        });
    }

    public void event(ClusterMembershipEvent clusterMembershipEvent) {
        Member eventSource = (Member)clusterMembershipEvent.subject();
        BrokerInfo brokerInfo = this.readBrokerInfo(eventSource);
        if (brokerInfo != null && brokerInfo.getNodeId() != this.localBroker.getNodeId()) {
            this.actor.run(() -> {
                switch ((ClusterMembershipEvent.Type)clusterMembershipEvent.type()) {
                    case METADATA_CHANGED: 
                    case MEMBER_ADDED: {
                        this.onMetadataChanged(brokerInfo);
                        break;
                    }
                    case MEMBER_REMOVED: {
                        this.onMemberRemoved(brokerInfo);
                        break;
                    }
                    default: {
                        LOG.debug("Received {} from member {}, was not handled.", (Object)clusterMembershipEvent.type(), (Object)brokerInfo.getNodeId());
                    }
                }
            });
        }
    }

    private void onMemberRemoved(BrokerInfo brokerInfo) {
        LOG.debug("Received member removed {} ", (Object)brokerInfo);
        brokerInfo.consumePartitions(partition -> this.removeIfLeader(brokerInfo, partition), (leaderPartitionId, term) -> {}, followerPartitionId -> {}, inactivePartitionId -> {});
    }

    private void removeIfLeader(BrokerInfo brokerInfo, Integer partition) {
        BrokerInfo currentLeader = (BrokerInfo)this.partitionLeaders.get((Object)partition);
        if (currentLeader != null && currentLeader.getNodeId() == brokerInfo.getNodeId()) {
            this.partitionLeaders.remove((Object)partition);
        }
    }

    private void onMetadataChanged(BrokerInfo brokerInfo) {
        LOG.debug("Received metadata change for {}, partitions {} terms {}", new Object[]{brokerInfo.getNodeId(), brokerInfo.getPartitionRoles(), brokerInfo.getPartitionLeaderTerms()});
        brokerInfo.consumePartitions((leaderPartitionId, term) -> {
            if (this.updatePartitionLeader(brokerInfo, (int)leaderPartitionId, term)) {
                this.notifyPartitionLeaderUpdated((int)leaderPartitionId, brokerInfo);
            }
        }, followerPartitionId -> this.removeIfLeader(brokerInfo, followerPartitionId), inactivePartitionId -> this.removeIfLeader(brokerInfo, inactivePartitionId));
    }

    private boolean updatePartitionLeader(BrokerInfo brokerInfo, int leaderPartitionId, long term) {
        BrokerInfo currentLeader = (BrokerInfo)this.partitionLeaders.get(leaderPartitionId);
        if (currentLeader != null) {
            Long currentLeaderTerm = (Long)currentLeader.getPartitionLeaderTerms().get(leaderPartitionId);
            if (currentLeaderTerm == null) {
                LOG.error("Could not update new leader for partition {} at term {}. Expected to have a non-null value for current leader term, but found null", (Object)leaderPartitionId, (Object)term);
                return false;
            }
            if (currentLeaderTerm >= term) {
                return false;
            }
        }
        this.partitionLeaders.put(leaderPartitionId, (Object)brokerInfo);
        return true;
    }

    private BrokerInfo readBrokerInfo(Member eventSource) {
        BrokerInfo brokerInfo = BrokerInfo.fromProperties((Properties)eventSource.properties());
        if (brokerInfo != null && !this.isStaticConfigValid(brokerInfo)) {
            LOG.error("Static configuration of node {} differs from local node {}: NodeId: 0 <= {} < {}, ClusterSize: {} == {}, PartitionsCount: {} == {}, ReplicationFactor: {} == {}.", new Object[]{eventSource.id(), this.membershipService.getLocalMember().id(), brokerInfo.getNodeId(), this.localBroker.getClusterSize(), brokerInfo.getClusterSize(), this.localBroker.getClusterSize(), brokerInfo.getPartitionsCount(), this.localBroker.getPartitionsCount(), brokerInfo.getReplicationFactor(), this.localBroker.getReplicationFactor()});
            return null;
        }
        return brokerInfo;
    }

    private boolean isStaticConfigValid(BrokerInfo brokerInfo) {
        return brokerInfo.getNodeId() >= 0 && brokerInfo.getNodeId() < this.localBroker.getClusterSize() && this.localBroker.getClusterSize() == brokerInfo.getClusterSize() && this.localBroker.getPartitionsCount() == brokerInfo.getPartitionsCount() && this.localBroker.getReplicationFactor() == brokerInfo.getReplicationFactor();
    }

    private void publishTopologyChanges() {
        Properties memberProperties = this.membershipService.getLocalMember().properties();
        this.localBroker.writeIntoProperties(memberProperties);
    }

    @Override
    public void removeTopologyPartitionListener(TopologyPartitionListener listener) {
        this.actor.run(() -> this.topologyPartitionListeners.remove(listener));
    }

    @Override
    public void addTopologyPartitionListener(TopologyPartitionListener listener) {
        this.actor.run(() -> {
            this.topologyPartitionListeners.add(listener);
            this.partitionLeaders.forEach((partitionId, leader) -> LogUtil.catchAndLog((Logger)LOG, () -> listener.onPartitionLeaderUpdated((int)partitionId, (BrokerInfo)leader)));
        });
    }

    private void notifyPartitionLeaderUpdated(int partitionId, BrokerInfo member) {
        for (TopologyPartitionListener listener : this.topologyPartitionListeners) {
            LogUtil.catchAndLog((Logger)LOG, () -> listener.onPartitionLeaderUpdated(partitionId, member));
        }
    }

    public void onHealthChanged(int partitionId, HealthStatus status) {
        this.actor.run(() -> {
            if (status == HealthStatus.HEALTHY) {
                this.localBroker.setPartitionHealthy(Integer.valueOf(partitionId));
            } else if (status == HealthStatus.UNHEALTHY) {
                this.localBroker.setPartitionUnhealthy(Integer.valueOf(partitionId));
            } else if (status == HealthStatus.DEAD) {
                this.localBroker.setPartitionDead(Integer.valueOf(partitionId));
            }
            this.publishTopologyChanges();
        });
    }
}

