/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.broker.partitioning;

import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.atomix.cluster.messaging.ClusterEventService;
import io.atomix.raft.partition.RaftPartition;
import io.atomix.raft.partition.RaftPartitionGroup;
import io.camunda.zeebe.broker.PartitionListener;
import io.camunda.zeebe.broker.clustering.ClusterServices;
import io.camunda.zeebe.broker.engine.impl.DeploymentDistributorImpl;
import io.camunda.zeebe.broker.engine.impl.LongPollingJobNotification;
import io.camunda.zeebe.broker.engine.impl.PartitionCommandSenderImpl;
import io.camunda.zeebe.broker.exporter.repo.ExporterRepository;
import io.camunda.zeebe.broker.logstreams.state.StatePositionSupplier;
import io.camunda.zeebe.broker.partitioning.topology.TopologyManager;
import io.camunda.zeebe.broker.partitioning.topology.TopologyPartitionListenerImpl;
import io.camunda.zeebe.broker.system.configuration.BrokerCfg;
import io.camunda.zeebe.broker.system.configuration.RocksdbCfg;
import io.camunda.zeebe.broker.system.management.deployment.PushDeploymentRequestHandler;
import io.camunda.zeebe.broker.system.monitoring.BrokerHealthCheckService;
import io.camunda.zeebe.broker.system.partitions.PartitionStartupAndTransitionContextImpl;
import io.camunda.zeebe.broker.system.partitions.PartitionStartupContext;
import io.camunda.zeebe.broker.system.partitions.PartitionStep;
import io.camunda.zeebe.broker.system.partitions.PartitionStepMigrationHelper;
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionStep;
import io.camunda.zeebe.broker.system.partitions.StateController;
import io.camunda.zeebe.broker.system.partitions.TypedRecordProcessorsFactory;
import io.camunda.zeebe.broker.system.partitions.ZeebePartition;
import io.camunda.zeebe.broker.system.partitions.impl.AtomixPartitionMessagingService;
import io.camunda.zeebe.broker.system.partitions.impl.AtomixRecordEntrySupplierImpl;
import io.camunda.zeebe.broker.system.partitions.impl.NewPartitionTransitionImpl;
import io.camunda.zeebe.broker.system.partitions.impl.PartitionProcessingState;
import io.camunda.zeebe.broker.system.partitions.impl.PartitionTransitionImpl;
import io.camunda.zeebe.broker.system.partitions.impl.StateControllerImpl;
import io.camunda.zeebe.broker.system.partitions.impl.steps.ExporterDirectorPartitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.ExporterDirectorPartitionTransitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.LogDeletionPartitionStartupStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.LogStoragePartitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.LogStoragePartitionTransitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.LogStreamPartitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.LogStreamPartitionTransitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.QueryServicePartitionTransitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.QueryServiceStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.RockDbMetricExporterPartitionStartupStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.SnapshotDirectorPartitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.SnapshotDirectorPartitionTransitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.StreamProcessorPartitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.StreamProcessorTransitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.ZeebeDbPartitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.ZeebeDbPartitionTransitionStep;
import io.camunda.zeebe.broker.transport.commandapi.CommandApiService;
import io.camunda.zeebe.db.impl.rocksdb.RocksDbConfiguration;
import io.camunda.zeebe.engine.processing.EngineProcessors;
import io.camunda.zeebe.engine.processing.deployment.DeploymentResponder;
import io.camunda.zeebe.engine.processing.deployment.distribute.DeploymentDistributor;
import io.camunda.zeebe.engine.processing.message.command.PartitionCommandSender;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.streamprocessor.ProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
import io.camunda.zeebe.engine.state.DefaultZeebeDbFactory;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.protocol.impl.encoding.BrokerInfo;
import io.camunda.zeebe.snapshots.ConstructableSnapshotStore;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotStoreFactory;
import io.camunda.zeebe.util.sched.ActorControl;
import io.camunda.zeebe.util.sched.ActorSchedulingService;
import io.camunda.zeebe.util.sched.ConcurrencyControl;
import io.camunda.zeebe.util.startup.StartupStep;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public final class PartitionFactory {
    public static final boolean FEATURE_TOGGLE_USE_NEW_CODE = true;
    private static final List<StartupStep<PartitionStartupContext>> STARTUP_STEPS = List.of(new LogDeletionPartitionStartupStep(), new RockDbMetricExporterPartitionStartupStep());
    private static final List<PartitionTransitionStep> TRANSITION_STEPS = List.of(new LogStoragePartitionTransitionStep(), new LogStreamPartitionTransitionStep(), new ZeebeDbPartitionTransitionStep(), new QueryServicePartitionTransitionStep(), new StreamProcessorTransitionStep(), new SnapshotDirectorPartitionTransitionStep(), new ExporterDirectorPartitionTransitionStep());
    private static final List<PartitionStep> LEADER_STEPS = List.of(PartitionStepMigrationHelper.fromStartupStep(new LogDeletionPartitionStartupStep()), new LogStoragePartitionStep(), new LogStreamPartitionStep(), new ZeebeDbPartitionStep(), new QueryServiceStep(), new StreamProcessorPartitionStep(), new SnapshotDirectorPartitionStep(), PartitionStepMigrationHelper.fromStartupStep(new RockDbMetricExporterPartitionStartupStep()), new ExporterDirectorPartitionStep());
    private static final List<PartitionStep> FOLLOWER_STEPS = List.of(PartitionStepMigrationHelper.fromStartupStep(new LogDeletionPartitionStartupStep()), new LogStoragePartitionStep(), new LogStreamPartitionStep(), new ZeebeDbPartitionStep(), new QueryServiceStep(), new StreamProcessorPartitionStep(), new SnapshotDirectorPartitionStep(), PartitionStepMigrationHelper.fromStartupStep(new RockDbMetricExporterPartitionStartupStep()), new ExporterDirectorPartitionStep());
    private final ActorSchedulingService actorSchedulingService;
    private final BrokerCfg brokerCfg;
    private final BrokerInfo localBroker;
    private final PushDeploymentRequestHandler deploymentRequestHandler;
    private final CommandApiService commandApiService;
    private final FileBasedSnapshotStoreFactory snapshotStoreFactory;
    private final ClusterServices clusterServices;
    private final ExporterRepository exporterRepository;
    private final BrokerHealthCheckService healthCheckService;

    PartitionFactory(ActorSchedulingService actorSchedulingService, BrokerCfg brokerCfg, BrokerInfo localBroker, PushDeploymentRequestHandler deploymentRequestHandler, CommandApiService commandApiService, FileBasedSnapshotStoreFactory snapshotStoreFactory, ClusterServices clusterServices, ExporterRepository exporterRepository, BrokerHealthCheckService healthCheckService) {
        this.actorSchedulingService = actorSchedulingService;
        this.brokerCfg = brokerCfg;
        this.localBroker = localBroker;
        this.deploymentRequestHandler = deploymentRequestHandler;
        this.commandApiService = commandApiService;
        this.snapshotStoreFactory = snapshotStoreFactory;
        this.clusterServices = clusterServices;
        this.exporterRepository = exporterRepository;
        this.healthCheckService = healthCheckService;
    }

    List<ZeebePartition> constructPartitions(RaftPartitionGroup partitionGroup, List<PartitionListener> partitionListeners, TopologyManager topologyManager) {
        ArrayList<ZeebePartition> partitions = new ArrayList<ZeebePartition>();
        ClusterCommunicationService communicationService = this.clusterServices.getCommunicationService();
        ClusterEventService eventService = this.clusterServices.getEventService();
        ClusterMembershipService membershipService = this.clusterServices.getMembershipService();
        MemberId nodeId = membershipService.getLocalMember().id();
        List owningPartitions = partitionGroup.getPartitionsWithMember(nodeId).stream().map(RaftPartition.class::cast).collect(Collectors.toList());
        TypedRecordProcessorsFactory typedRecordProcessorsFactory = this.createFactory(topologyManager, this.localBroker, communicationService, eventService, this.deploymentRequestHandler);
        for (RaftPartition owningPartition : owningPartitions) {
            Integer partitionId = (Integer)owningPartition.id().id();
            ConstructableSnapshotStore constructableSnapshotStore = this.snapshotStoreFactory.getConstructableSnapshotStore(partitionId.intValue());
            StateController stateController = this.createStateController(owningPartition, constructableSnapshotStore, this.snapshotStoreFactory.getSnapshotStoreConcurrencyControl(partitionId.intValue()));
            PartitionStartupAndTransitionContextImpl partitionStartupAndTransitionContext = new PartitionStartupAndTransitionContextImpl(this.localBroker.getNodeId(), owningPartition, partitionListeners, new AtomixPartitionMessagingService(communicationService, membershipService, owningPartition.members()), this.actorSchedulingService, this.brokerCfg, this.commandApiService::newCommandResponseWriter, () -> this.commandApiService.getOnProcessedListener(partitionId), constructableSnapshotStore, this.snapshotStoreFactory.getReceivableSnapshotStore(partitionId.intValue()), stateController, typedRecordProcessorsFactory, this.exporterRepository, new PartitionProcessingState(owningPartition));
            PartitionTransitionImpl transitionBehavior = new PartitionTransitionImpl(partitionStartupAndTransitionContext, LEADER_STEPS, FOLLOWER_STEPS);
            NewPartitionTransitionImpl newTransitionBehavior = new NewPartitionTransitionImpl(TRANSITION_STEPS, partitionStartupAndTransitionContext.createTransitionContext());
            ZeebePartition zeebePartition = new ZeebePartition(partitionStartupAndTransitionContext, newTransitionBehavior, STARTUP_STEPS);
            this.healthCheckService.registerMonitoredPartition(zeebePartition.getPartitionId(), zeebePartition);
            partitions.add(zeebePartition);
        }
        return partitions;
    }

    private StateController createStateController(RaftPartition raftPartition, ConstructableSnapshotStore snapshotStore, ConcurrencyControl concurrencyControl) {
        Path runtimeDirectory = raftPartition.dataDirectory().toPath().resolve("runtime");
        RocksdbCfg databaseCfg = this.brokerCfg.getExperimental().getRocksdb();
        return new StateControllerImpl(DefaultZeebeDbFactory.defaultFactory((RocksDbConfiguration)databaseCfg.createRocksDbConfiguration()), snapshotStore, runtimeDirectory, new AtomixRecordEntrySupplierImpl(raftPartition.getServer()), StatePositionSupplier::getHighestExportedPosition, concurrencyControl);
    }

    private TypedRecordProcessorsFactory createFactory(final TopologyManager topologyManager, BrokerInfo localBroker, ClusterCommunicationService communicationService, ClusterEventService eventService, PushDeploymentRequestHandler deploymentRequestHandler) {
        return processingContext -> {
            ActorControl actor = processingContext.getActor();
            LogStream stream = processingContext.getLogStream();
            final TopologyPartitionListenerImpl partitionListener = new TopologyPartitionListenerImpl(actor);
            topologyManager.addTopologyPartitionListener(partitionListener);
            DeploymentDistributorImpl deploymentDistributor = new DeploymentDistributorImpl(communicationService, eventService, partitionListener, actor);
            PartitionCommandSenderImpl partitionCommandSender = new PartitionCommandSenderImpl(communicationService, partitionListener);
            SubscriptionCommandSender subscriptionCommandSender = new SubscriptionCommandSender(stream.getPartitionId(), (PartitionCommandSender)partitionCommandSender);
            LongPollingJobNotification jobsAvailableNotification = new LongPollingJobNotification(eventService);
            TypedRecordProcessors processor = EngineProcessors.createEngineProcessors((ProcessingContext)processingContext, (int)localBroker.getPartitionsCount(), (SubscriptionCommandSender)subscriptionCommandSender, (DeploymentDistributor)deploymentDistributor, (DeploymentResponder)deploymentRequestHandler, jobsAvailableNotification::onJobsAvailable);
            return processor.withListener(new StreamProcessorLifecycleAware(){

                public void onClose() {
                    topologyManager.removeTopologyPartitionListener(partitionListener);
                }
            });
        };
    }
}

