/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.broker;

import io.atomix.cluster.AtomixCluster;
import io.atomix.cluster.messaging.ManagedMessagingService;
import io.atomix.cluster.messaging.MessagingConfig;
import io.atomix.cluster.messaging.MessagingService;
import io.atomix.cluster.messaging.impl.NettyMessagingService;
import io.atomix.utils.net.Address;
import io.camunda.zeebe.broker.Loggers;
import io.camunda.zeebe.broker.PartitionListener;
import io.camunda.zeebe.broker.SpringBrokerBridge;
import io.camunda.zeebe.broker.bootstrap.BrokerContext;
import io.camunda.zeebe.broker.bootstrap.BrokerStartupContextImpl;
import io.camunda.zeebe.broker.bootstrap.BrokerStartupProcess;
import io.camunda.zeebe.broker.bootstrap.CloseProcess;
import io.camunda.zeebe.broker.bootstrap.StartProcess;
import io.camunda.zeebe.broker.clustering.ClusterServices;
import io.camunda.zeebe.broker.clustering.ClusterServicesImpl;
import io.camunda.zeebe.broker.engine.impl.SubscriptionApiCommandMessageHandlerService;
import io.camunda.zeebe.broker.exporter.repo.ExporterLoadException;
import io.camunda.zeebe.broker.exporter.repo.ExporterRepository;
import io.camunda.zeebe.broker.partitioning.PartitionManager;
import io.camunda.zeebe.broker.partitioning.PartitionManagerImpl;
import io.camunda.zeebe.broker.system.EmbeddedGatewayService;
import io.camunda.zeebe.broker.system.SystemContext;
import io.camunda.zeebe.broker.system.configuration.BrokerCfg;
import io.camunda.zeebe.broker.system.configuration.ClusterCfg;
import io.camunda.zeebe.broker.system.configuration.DataCfg;
import io.camunda.zeebe.broker.system.configuration.ExporterCfg;
import io.camunda.zeebe.broker.system.configuration.SocketBindingCfg;
import io.camunda.zeebe.broker.system.configuration.backpressure.BackpressureCfg;
import io.camunda.zeebe.broker.system.management.BrokerAdminService;
import io.camunda.zeebe.broker.system.management.BrokerAdminServiceImpl;
import io.camunda.zeebe.broker.system.management.LeaderManagementRequestHandler;
import io.camunda.zeebe.broker.system.monitoring.BrokerHealthCheckService;
import io.camunda.zeebe.broker.system.monitoring.DiskSpaceUsageListener;
import io.camunda.zeebe.broker.system.monitoring.DiskSpaceUsageMonitor;
import io.camunda.zeebe.broker.transport.backpressure.PartitionAwareRequestLimiter;
import io.camunda.zeebe.broker.transport.commandapi.CommandApiServiceImpl;
import io.camunda.zeebe.protocol.impl.encoding.BrokerInfo;
import io.camunda.zeebe.transport.ServerTransport;
import io.camunda.zeebe.transport.TransportFactory;
import io.camunda.zeebe.util.FileUtil;
import io.camunda.zeebe.util.LogUtil;
import io.camunda.zeebe.util.VersionUtil;
import io.camunda.zeebe.util.exception.UncheckedExecutionException;
import io.camunda.zeebe.util.jar.ExternalJarLoadException;
import io.camunda.zeebe.util.sched.Actor;
import io.camunda.zeebe.util.sched.ActorScheduler;
import io.camunda.zeebe.util.sched.ActorSchedulingService;
import io.camunda.zeebe.util.sched.ConcurrencyControl;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import io.netty.util.NetUtil;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import org.slf4j.Logger;

/**
 * Wires together and starts the services of a single broker node.
 *
 * <p>The constructor only prepares state and submits the startup actor to the scheduler. The
 * actual start happens in {@link #start()}, which runs a fixed sequence of named steps (see
 * {@link #initStart()}). The {@link CloseProcess} returned by the start process is kept so that
 * {@link #close()} can call {@code closeReverse()} on it.
 *
 * <p>Most service fields are assigned by the individual start steps, so the corresponding getters
 * return {@code null} until the relevant step has run.
 */
public final class Broker implements AutoCloseable {
    public static final Logger LOG = Loggers.SYSTEM_LOGGER;

    private final SystemContext systemContext;
    /** Listeners handed to the partition manager; filled by several start steps. */
    private final List<PartitionListener> partitionListeners;
    /** Set to {@code true} once {@link #close()} has run the close process. */
    private boolean isClosed = false;
    private ClusterServicesImpl clusterServices;
    /** Created on the first call to {@link #start()}; {@code null} before that. */
    private CompletableFuture<Broker> startFuture;
    private LeaderManagementRequestHandler managementRequestHandler;
    private CommandApiServiceImpl commandApiService;
    private final ActorScheduler scheduler;
    private CloseProcess closeProcess;
    private EmbeddedGatewayService embeddedGatewayService;
    /** Cleared again by the closer of the migrated startup steps. */
    private BrokerHealthCheckService healthCheckService;
    /** Listeners registered on the disk space usage monitor; filled by several start steps. */
    private final List<DiskSpaceUsageListener> diskSpaceUsageListeners = new ArrayList<>();
    private final SpringBrokerBridge springBrokerBridge;
    private DiskSpaceUsageMonitor diskSpaceUsageMonitor;
    private BrokerAdminService brokerAdminService;
    private PartitionManagerImpl partitionManager;
    private final TestCompanionClass testCompanionObject = new TestCompanionClass();
    private final BrokerStartupActor brokerStartupActor;
    private final BrokerInfo localBroker;
    private BrokerContext brokerContext;

    /**
     * Creates the broker, derives its {@link BrokerInfo} from the configuration and submits the
     * startup actor to the scheduler. No start step is executed here.
     *
     * @param systemContext provides the scheduler, the broker configuration and the diagnostic context
     * @param springBrokerBridge bridge passed to the startup context and used to publish the admin service
     */
    public Broker(SystemContext systemContext, SpringBrokerBridge springBrokerBridge) {
        this.systemContext = systemContext;
        this.springBrokerBridge = springBrokerBridge;
        this.partitionListeners = new ArrayList<>();
        this.scheduler = systemContext.getScheduler();
        this.localBroker = createBrokerInfo(getConfig());
        this.healthCheckService = new BrokerHealthCheckService(localBroker);

        BrokerStartupContextImpl startupContext = new BrokerStartupContextImpl(
                localBroker,
                systemContext.getBrokerConfiguration(),
                springBrokerBridge,
                scheduler,
                healthCheckService);
        this.brokerStartupActor = new BrokerStartupActor(startupContext);
        scheduler.submitActor(brokerStartupActor);
    }

    /**
     * Adds a listener to the list that is later handed to the partition manager.
     *
     * @param listener the listener to add
     */
    public void addPartitionListener(PartitionListener listener) {
        partitionListeners.add(listener);
    }

    /**
     * Starts the broker on the first call; later calls return the same future without starting again.
     *
     * <p>The start steps run synchronously inside this call. If a step fails, the returned future
     * is completed exceptionally and the failure is additionally rethrown from this method.
     *
     * @return the future that is completed with this broker once all start steps have run
     */
    public synchronized CompletableFuture<Broker> start() {
        if (startFuture == null) {
            logBrokerStart();
            startFuture = new CompletableFuture<>();
            LogUtil.doWithMDC(systemContext.getDiagnosticContext(), this::internalStart);
        }
        return startFuture;
    }

    /** Logs the version and the JSON form of the configuration, if info logging is enabled. */
    private void logBrokerStart() {
        if (!LOG.isInfoEnabled()) {
            return;
        }
        BrokerCfg brokerCfg = getConfig();
        LOG.info("Version: {}", VersionUtil.getVersion());
        LOG.info("Starting broker {} with configuration {}", brokerCfg.getCluster().getNodeId(), brokerCfg.toJson());
    }

    /**
     * Runs the start process, completes {@link #startFuture} and marks the broker as started on
     * the health check service.
     *
     * @throws UncheckedExecutionException wrapping any exception raised while starting
     */
    private void internalStart() {
        StartProcess startProcess = initStart();
        try {
            closeProcess = startProcess.start();
            startFuture.complete(this);
            healthCheckService.setBrokerStarted();
        } catch (Exception bootStrapException) {
            LOG.error("Failed to start broker {}!", getConfig().getCluster().getNodeId(), bootStrapException);
            UncheckedExecutionException exception =
                    new UncheckedExecutionException("Failed to start broker", bootStrapException);
            // No-op if the future was already completed normally above.
            startFuture.completeExceptionally(exception);
            throw exception;
        }
    }

    /**
     * Assembles the ordered list of start steps. Later steps read fields assigned by earlier
     * ones (for example {@code clusterServices} and {@code managementRequestHandler}), so the
     * registration order matters.
     *
     * @return the start process, not yet started
     */
    private StartProcess initStart() {
        BrokerCfg brokerCfg = getConfig();
        StartProcess startContext = new StartProcess("Broker-" + localBroker.getNodeId());
        startContext.addStep("Migrated Startup Steps", this::migratedStartupSteps);
        startContext.addStep("command api transport and handler", () -> this.commandApiTransportAndHandlerStep(brokerCfg, this.localBroker));
        startContext.addStep("subscription api", () -> this.subscriptionAPIStep(this.localBroker));
        startContext.addStep("cluster services", () -> this.clusterServices.start().join());
        if (brokerCfg.getGateway().isEnable()) {
            startContext.addStep("embedded gateway", () -> {
                this.embeddedGatewayService = new EmbeddedGatewayService(
                        brokerCfg,
                        this.scheduler,
                        this.clusterServices.getMessagingService(),
                        this.clusterServices.getMembershipService(),
                        this.clusterServices.getEventService());
                return this.embeddedGatewayService;
            });
        }
        startContext.addStep("disk space monitor", () -> this.diskSpaceMonitorStep(brokerCfg.getData()));
        startContext.addStep("leader management request handler", () -> this.managementRequestStep(this.localBroker));
        startContext.addStep("zeebe partitions", () -> this.partitionsStep(brokerCfg, this.localBroker));
        startContext.addStep("register diskspace usage listeners", this::addDiskSpaceUsageListeners);
        startContext.addStep("upgrade manager", this::addBrokerAdminService);
        return startContext;
    }

    /**
     * Runs the startup actor to completion and copies what it produced (partition listeners,
     * cluster services, Atomix cluster) into this broker.
     *
     * @return a closer that stops the startup actor and clears the health check service reference
     */
    private AutoCloseable migratedStartupSteps() {
        brokerContext = brokerStartupActor.start().join();
        partitionListeners.addAll(brokerContext.getPartitionListeners());
        clusterServices = brokerContext.getClusterServices();
        testCompanionObject.atomix = clusterServices.getAtomixCluster();
        return () -> {
            this.brokerStartupActor.stop().join();
            this.healthCheckService = null;
        };
    }

    /**
     * Builds the {@link BrokerInfo} for this node from the cluster and network configuration.
     * The version is only set when {@link VersionUtil#getVersion()} yields a non-blank value.
     *
     * @param brokerCfg the broker configuration to read from
     * @return the populated broker info
     */
    private BrokerInfo createBrokerInfo(BrokerCfg brokerCfg) {
        ClusterCfg clusterCfg = brokerCfg.getCluster();
        String commandApiAddress =
                NetUtil.toSocketAddressString(brokerCfg.getNetwork().getCommandApi().getAdvertisedAddress());
        BrokerInfo result = new BrokerInfo(clusterCfg.getNodeId(), commandApiAddress);
        result.setClusterSize(clusterCfg.getClusterSize())
                .setPartitionsCount(clusterCfg.getPartitionsCount())
                .setReplicationFactor(clusterCfg.getReplicationFactor());

        String version = VersionUtil.getVersion();
        if (version != null && !version.isBlank()) {
            result.setVersion(version);
        }
        return result;
    }

    /**
     * Schedules the admin service actor, connects it to the partition manager and publishes it
     * through the Spring bridge.
     *
     * @return the admin service, used as the closer of this step
     */
    private AutoCloseable addBrokerAdminService() {
        BrokerAdminServiceImpl adminService = new BrokerAdminServiceImpl();
        scheduleActor(adminService);
        adminService.injectAdminAccess(partitionManager.createAdminAccess(adminService));
        adminService.injectPartitionInfoSource(partitionManager.getPartitions());
        brokerAdminService = adminService;
        springBrokerBridge.registerBrokerAdminServiceSupplier(() -> this.brokerAdminService);
        return adminService;
    }

    /**
     * Starts the command API messaging service, creates the server transport on top of it and
     * schedules the command API service, which is also registered as partition listener and disk
     * space usage listener.
     *
     * <p>If anything fails after the messaging service has been started, the messaging service is
     * stopped again before the failure is rethrown; otherwise it would stay running because this
     * step never returned its closer.
     *
     * @param brokerCfg the broker configuration
     * @param localBroker info about this broker node
     * @return a closer that closes the command API service, the transport and the messaging service
     */
    private AutoCloseable commandApiTransportAndHandlerStep(BrokerCfg brokerCfg, BrokerInfo localBroker) {
        SocketBindingCfg.CommandApiCfg commandApiCfg = brokerCfg.getNetwork().getCommandApi();
        ManagedMessagingService messagingService = createMessagingService(brokerCfg.getCluster(), commandApiCfg);
        messagingService.start().join();
        try {
            LOG.debug("Bound command API to {}, using advertised address {} ", messagingService.bindingAddresses(), messagingService.address());
            TransportFactory transportFactory = new TransportFactory(scheduler);
            ServerTransport serverTransport =
                    transportFactory.createServerTransport(localBroker.getNodeId(), messagingService);

            BackpressureCfg backpressureCfg = brokerCfg.getBackpressure();
            PartitionAwareRequestLimiter limiter = PartitionAwareRequestLimiter.newNoopLimiter();
            if (backpressureCfg.isEnabled()) {
                limiter = PartitionAwareRequestLimiter.newLimiter(backpressureCfg);
            }

            commandApiService = new CommandApiServiceImpl(
                    serverTransport, localBroker, limiter, scheduler, brokerCfg.getExperimental().getQueryApi());
            partitionListeners.add(commandApiService);
            scheduleActor(commandApiService);
            diskSpaceUsageListeners.add(commandApiService);

            return () -> {
                // Nested finally blocks: a failure while closing one resource must not
                // prevent the remaining ones from being released.
                try {
                    this.commandApiService.close();
                } finally {
                    try {
                        serverTransport.close();
                    } finally {
                        messagingService.stop().join();
                    }
                }
            };
        } catch (RuntimeException setupFailure) {
            try {
                messagingService.stop().join();
            } catch (RuntimeException stopFailure) {
                // Keep the original failure as the primary one.
                setupFailure.addSuppressed(stopFailure);
            }
            throw setupFailure;
        }
    }

    /**
     * Creates (but does not start) the messaging service for the command API.
     *
     * @param clusterCfg supplies the cluster name
     * @param socketCfg supplies the bind host/port and the advertised host/port
     * @return the new messaging service
     */
    private ManagedMessagingService createMessagingService(ClusterCfg clusterCfg, SocketBindingCfg socketCfg) {
        MessagingConfig messagingConfig = new MessagingConfig();
        messagingConfig.setInterfaces(List.of(socketCfg.getHost()));
        messagingConfig.setPort(socketCfg.getPort());
        Address advertisedAddress = Address.from(socketCfg.getAdvertisedHost(), socketCfg.getAdvertisedPort());
        return new NettyMessagingService(clusterCfg.getClusterName(), advertisedAddress, messagingConfig);
    }

    /**
     * Schedules the subscription API message handler and registers it as partition listener and
     * disk space usage listener.
     *
     * @param localBroker info about this broker node
     * @return the handler service, used as the closer of this step
     */
    private AutoCloseable subscriptionAPIStep(BrokerInfo localBroker) {
        SubscriptionApiCommandMessageHandlerService messageHandlerService =
                new SubscriptionApiCommandMessageHandlerService(localBroker, clusterServices.getCommunicationService());
        partitionListeners.add(messageHandlerService);
        scheduleActor(messageHandlerService);
        diskSpaceUsageListeners.add(messageHandlerService);
        return messageHandlerService;
    }

    /** Registers every collected disk space usage listener on the monitor. */
    private void addDiskSpaceUsageListeners() {
        diskSpaceUsageListeners.forEach(diskSpaceUsageMonitor::addDiskUsageListener);
    }

    /** Submits the actor to the scheduler and blocks until the submission future completes. */
    private void scheduleActor(Actor actor) {
        systemContext.getScheduler().submitActor(actor).join();
    }

    /**
     * Ensures the data directory exists and creates the disk space usage monitor. The monitor is
     * only scheduled, and the listeners collected so far only registered, when disk usage
     * monitoring is enabled in the configuration.
     *
     * @param data the data configuration
     * @return a closer for the monitor, or a no-op closer when monitoring is disabled
     * @throws UncheckedIOException if the data directory cannot be created
     */
    private AutoCloseable diskSpaceMonitorStep(DataCfg data) {
        try {
            FileUtil.ensureDirectoryExists(Path.of(data.getDirectory()));
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to create data directory", e);
        }

        diskSpaceUsageMonitor = new DiskSpaceUsageMonitor(data);
        if (!data.isDiskUsageMonitoringEnabled()) {
            LOG.info("Skipping start of disk space usage monitor, as it is disabled by configuration");
            return () -> {};
        }

        scheduleActor(diskSpaceUsageMonitor);
        addDiskSpaceUsageListeners();
        return () -> this.diskSpaceUsageMonitor.close();
    }

    /**
     * Schedules the leader management request handler and registers it as partition listener and
     * disk space usage listener.
     *
     * @param localBroker info about this broker node
     * @return the handler, used as the closer of this step
     */
    private AutoCloseable managementRequestStep(BrokerInfo localBroker) {
        managementRequestHandler = new LeaderManagementRequestHandler(
                localBroker, clusterServices.getCommunicationService(), clusterServices.getEventService());
        scheduleActor(managementRequestHandler);
        partitionListeners.add(managementRequestHandler);
        diskSpaceUsageListeners.add(managementRequestHandler);
        return managementRequestHandler;
    }

    /**
     * Creates the partition manager from the services assembled by the earlier steps and blocks
     * until its start future completes.
     *
     * @param brokerCfg the broker configuration
     * @param localBroker info about this broker node
     * @return a closer that stops the partition manager and clears the field
     */
    private AutoCloseable partitionsStep(BrokerCfg brokerCfg, BrokerInfo localBroker) {
        partitionManager = new PartitionManagerImpl(
                scheduler,
                brokerCfg,
                localBroker,
                clusterServices,
                healthCheckService,
                managementRequestHandler.getPushDeploymentRequestHandler(),
                diskSpaceUsageListeners::add,
                partitionListeners,
                commandApiService,
                buildExporterRepository(brokerCfg));
        partitionManager.start().join();
        return () -> {
            this.partitionManager.stop().join();
            this.partitionManager = null;
        };
    }

    /**
     * Loads every exporter listed in the configuration into a fresh repository.
     *
     * @param cfg the broker configuration
     * @return the repository containing all configured exporters
     * @throws IllegalStateException if an exporter cannot be loaded
     */
    private ExporterRepository buildExporterRepository(BrokerCfg cfg) {
        ExporterRepository exporterRepository = new ExporterRepository();
        for (Map.Entry<String, ExporterCfg> exporterEntry : cfg.getExporters().entrySet()) {
            ExporterCfg exporterCfg = exporterEntry.getValue();
            try {
                exporterRepository.load(exporterEntry.getKey(), exporterCfg);
            } catch (ExporterLoadException | ExternalJarLoadException e) {
                throw new IllegalStateException("Failed to load exporter with configuration: " + exporterCfg, e);
            }
        }
        return exporterRepository;
    }

    /**
     * @return the broker configuration held by the system context
     */
    public BrokerCfg getConfig() {
        return systemContext.getBrokerConfiguration();
    }

    /**
     * Runs the close process in reverse once the start future has completed, and marks the broker
     * as closed. Does nothing if the broker was never started or is already closed.
     *
     * <p>If the start future completed exceptionally, the {@code join()} on the dependent stage
     * throws a {@link java.util.concurrent.CompletionException} and the close process is not run.
     */
    @Override
    public void close() {
        LogUtil.doWithMDC(systemContext.getDiagnosticContext(), () -> {
            if (isClosed || startFuture == null) {
                return;
            }
            startFuture.thenAccept(startedBroker -> {
                closeProcess.closeReverse();
                isClosed = true;
                LOG.info("Broker shut down.");
            }).join();
        });
    }

    /**
     * @return the embedded gateway service, or {@code null} if the gateway step has not run
     */
    public EmbeddedGatewayService getEmbeddedGatewayService() {
        return embeddedGatewayService;
    }

    /**
     * @return the Atomix cluster captured during the migrated startup steps, or {@code null} before that
     * @deprecated marked deprecated together with the companion object that stores the value.
     *     NOTE(review): presumably kept for tests only; {@link #getClusterServices()} looks like
     *     the intended replacement, confirm before migrating callers.
     */
    @Deprecated
    public AtomixCluster getAtomixCluster() {
        return testCompanionObject.atomix;
    }

    /**
     * @return the cluster services, or {@code null} before the migrated startup steps have run
     */
    public ClusterServices getClusterServices() {
        return clusterServices;
    }

    /**
     * @return the disk space usage monitor, or {@code null} before the disk space monitor step has run
     */
    public DiskSpaceUsageMonitor getDiskSpaceUsageMonitor() {
        return diskSpaceUsageMonitor;
    }

    /**
     * @return the admin service, or {@code null} before the last start step has run
     */
    public BrokerAdminService getBrokerAdminService() {
        return brokerAdminService;
    }

    /**
     * @return the system context passed to the constructor
     */
    public SystemContext getSystemContext() {
        return systemContext;
    }

    /**
     * @return the partition manager, or {@code null} before the partitions step has run and after
     *     its closer has run
     */
    public PartitionManager getPartitionManager() {
        return partitionManager;
    }

    /**
     * Actor that drives the {@link BrokerStartupProcess}. Start and stop are dispatched through
     * the actor's control object, and their outcome is reported via the returned futures.
     */
    private static final class BrokerStartupActor extends Actor {
        private final BrokerStartupProcess brokerStartupProcess;
        private final int nodeId;

        private BrokerStartupActor(BrokerStartupContextImpl startupContext) {
            this.nodeId = startupContext.getBrokerInfo().getNodeId();
            // The context needs this actor's control object before the process is created from it.
            startupContext.setConcurrencyControl(this.actor);
            this.brokerStartupProcess = new BrokerStartupProcess(startupContext);
        }

        public String getName() {
            return buildActorName(nodeId, "Startup");
        }

        /**
         * Triggers the startup process on the actor.
         *
         * @return a future that receives the outcome of the startup process
         */
        private ActorFuture<BrokerContext> start() {
            ActorFuture<BrokerContext> result = createFuture();
            // The result future doubles as the completion callback.
            actor.run(() -> actor.runOnCompletion(brokerStartupProcess.start(), result));
            return result;
        }

        /**
         * Triggers the stop of the startup process on the actor.
         *
         * @return a future that receives the outcome of the stop
         */
        private ActorFuture<Void> stop() {
            ActorFuture<Void> result = createFuture();
            actor.run(() -> actor.runOnCompletion(brokerStartupProcess.stop(), result));
            return result;
        }
    }

    /** Holder for the Atomix cluster reference exposed by {@link Broker#getAtomixCluster()}. */
    @Deprecated
    private static final class TestCompanionClass {
        private AtomixCluster atomix;

        private TestCompanionClass() {
        }
    }
}

