/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.broker.bootstrap;

import io.atomix.cluster.MemberId;
import io.camunda.zeebe.broker.Loggers;
import io.camunda.zeebe.broker.SpringBrokerBridge;
import io.camunda.zeebe.broker.bootstrap.AbstractBrokerStartupStep;
import io.camunda.zeebe.broker.bootstrap.BrokerStartupContext;
import io.camunda.zeebe.broker.partitioning.PartitionManagerImpl;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.topology.state.ClusterTopology;
import org.slf4j.Logger;

/**
 * Broker startup step that creates and starts the {@link PartitionManagerImpl} and wires it to
 * the cluster topology. On shutdown it unwires the partition manager from the cluster topology
 * and stops it again.
 */
final class PartitionManagerStep
extends AbstractBrokerStartupStep {
    private static final Logger LOGGER = Loggers.SYSTEM_LOGGER;

    /**
     * Error code handed to {@link SpringBrokerBridge#initiateShutdown} when a newer topology
     * reports a different state for this broker.
     */
    private static final int ERROR_CODE_ON_INCONSISTENT_TOPOLOGY = 3;

    PartitionManagerStep() {
    }

    /** Returns the human-readable name of this step. */
    public String getName() {
        return "Partition Manager";
    }

    /**
     * Builds the partition manager from the services found in the startup context and then,
     * through {@code concurrencyControl}, registers the topology change listener, starts the
     * manager, stores it in the context and registers it as partition change executor.
     *
     * @param brokerStartupContext context providing all collaborators; receives the started manager
     * @param concurrencyControl used to schedule the start sequence
     * @param startupFuture completed with the context on success, or exceptionally if any part of
     *     the scheduled start sequence throws
     */
    @Override
    void startupInternal(BrokerStartupContext brokerStartupContext, ConcurrencyControl concurrencyControl, ActorFuture<BrokerStartupContext> startupFuture) {
        PartitionManagerImpl partitionManager = new PartitionManagerImpl(
                brokerStartupContext.getConcurrencyControl(),
                brokerStartupContext.getActorSchedulingService(),
                brokerStartupContext.getBrokerConfiguration(),
                brokerStartupContext.getBrokerInfo(),
                brokerStartupContext.getClusterServices(),
                brokerStartupContext.getHealthCheckService(),
                brokerStartupContext.getDiskSpaceUsageMonitor(),
                brokerStartupContext.getPartitionListeners(),
                brokerStartupContext.getPartitionRaftListeners(),
                brokerStartupContext.getCommandApiService(),
                brokerStartupContext.getExporterRepository(),
                brokerStartupContext.getGatewayBrokerTransport(),
                brokerStartupContext.getJobStreamService().jobStreamer(),
                brokerStartupContext.getClusterTopology().getPartitionDistribution(),
                brokerStartupContext.getMeterRegistry());
        concurrencyControl.run(() -> {
            try {
                // The listener is registered before the manager is started, so an inconsistent
                // topology is acted upon even while partitions are still starting.
                brokerStartupContext.getClusterTopology().registerTopologyChangeListener((newTopology, oldTopology) -> this.shutdownOnInconsistentTopology(brokerStartupContext.getBrokerInfo().getNodeId(), brokerStartupContext.getSpringBrokerBridge(), newTopology, oldTopology));
                partitionManager.start();
                // Only a successfully started manager is published to the context; shutdown
                // relies on this to decide whether there is anything to stop.
                brokerStartupContext.setPartitionManager(partitionManager);
                brokerStartupContext.getClusterTopology().registerPartitionChangeExecutor(partitionManager);
                startupFuture.complete(brokerStartupContext);
            }
            catch (Exception e) {
                startupFuture.completeExceptionally(e);
            }
        });
    }

    /**
     * Detaches the partition manager from the cluster topology and stops it. If no partition
     * manager is stored in the context there is nothing to stop and the future is completed
     * immediately.
     *
     * @param brokerShutdownContext context holding the partition manager; the manager reference
     *     is cleared once the stop future completes
     * @param concurrencyControl used to react to the completion of the stop future
     * @param shutdownFuture completed with the context, or exceptionally with the error reported
     *     by stopping the partition manager
     */
    @Override
    void shutdownInternal(BrokerStartupContext brokerShutdownContext, ConcurrencyControl concurrencyControl, ActorFuture<BrokerStartupContext> shutdownFuture) {
        PartitionManagerImpl partitionManager = brokerShutdownContext.getPartitionManager();
        if (partitionManager == null) {
            // Complete with the context (not null) so this path yields the same result as the
            // regular path below and consumers of the future never observe a null context.
            // NOTE(review): the topology change listener registered during startup is not
            // removed on this path - confirm whether that is intended.
            shutdownFuture.complete(brokerShutdownContext);
            return;
        }
        brokerShutdownContext.getClusterTopology().removePartitionChangeExecutor();
        concurrencyControl.runOnCompletion(partitionManager.stop(), (ok, error) -> {
            // The reference is cleared regardless of whether stopping succeeded.
            brokerShutdownContext.setPartitionManager(null);
            if (error != null) {
                shutdownFuture.completeExceptionally(error);
            } else {
                shutdownFuture.complete(brokerShutdownContext);
            }
        });
        brokerShutdownContext.getClusterTopology().removeTopologyChangeListener();
    }

    /**
     * Logs the state of the local broker in both topologies and asks the Spring bridge to shut
     * the broker down with {@link #ERROR_CODE_ON_INCONSISTENT_TOPOLOGY}.
     *
     * @param localBrokerId node id of this broker, used to look up its member entry
     * @param springBrokerBridge bridge used to initiate the shutdown
     * @param newTopology the newly received topology
     * @param oldTopology the topology known before
     */
    private void shutdownOnInconsistentTopology(int localBrokerId, SpringBrokerBridge springBrokerBridge, ClusterTopology newTopology, ClusterTopology oldTopology) {
        MemberId localMemberId = MemberId.from(String.valueOf(localBrokerId));
        LOGGER.warn(
                "  Received a newer topology which has a different state for this broker.\n"
                        + "  State of this broker in new topology :'{}'\n"
                        + "  State of this broker in old topology: '{}'\n"
                        + "  This usually happens when the topology was changed forcefully when this broker was unreachable or this broker encountered a data loss. Shutting down the broker. Please restart the broker to use the new topology.\n",
                newTopology.getMember(localMemberId),
                oldTopology.getMember(localMemberId));
        springBrokerBridge.initiateShutdown(ERROR_CODE_ON_INCONSISTENT_TOPOLOGY);
    }
}

