/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.broker.partitioning;

import io.atomix.cluster.MemberId;
import io.atomix.primitive.partition.PartitionManagementService;
import io.atomix.primitive.partition.PartitionMetadata;
import io.atomix.primitive.partition.impl.DefaultPartitionManagementService;
import io.atomix.raft.partition.RaftPartition;
import io.camunda.zeebe.broker.PartitionListener;
import io.camunda.zeebe.broker.clustering.ClusterServices;
import io.camunda.zeebe.broker.exporter.repo.ExporterRepository;
import io.camunda.zeebe.broker.partitioning.MultiPartitionAdminAccess;
import io.camunda.zeebe.broker.partitioning.PartitionAdminAccess;
import io.camunda.zeebe.broker.partitioning.PartitionManager;
import io.camunda.zeebe.broker.partitioning.PartitionStartup;
import io.camunda.zeebe.broker.partitioning.RaftPartitionFactory;
import io.camunda.zeebe.broker.partitioning.ZeebePartitionFactory;
import io.camunda.zeebe.broker.partitioning.topology.PartitionDistribution;
import io.camunda.zeebe.broker.partitioning.topology.TopologyManager;
import io.camunda.zeebe.broker.partitioning.topology.TopologyManagerImpl;
import io.camunda.zeebe.broker.partitioning.topology.TopologyPartitionListener;
import io.camunda.zeebe.broker.system.configuration.BrokerCfg;
import io.camunda.zeebe.broker.system.monitoring.BrokerHealthCheckService;
import io.camunda.zeebe.broker.system.monitoring.DiskSpaceUsageMonitor;
import io.camunda.zeebe.broker.system.partitions.PartitionHealthBroadcaster;
import io.camunda.zeebe.broker.system.partitions.ZeebePartition;
import io.camunda.zeebe.broker.transport.commandapi.CommandApiService;
import io.camunda.zeebe.engine.processing.streamprocessor.JobStreamer;
import io.camunda.zeebe.protocol.impl.encoding.BrokerInfo;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.snapshots.ReceivableSnapshotStoreFactory;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotStoreFactory;
import io.camunda.zeebe.transport.impl.AtomixServerTransport;
import io.camunda.zeebe.util.FeatureFlags;
import io.camunda.zeebe.util.health.HealthStatus;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bootstraps, tracks and stops the partitions the local broker is a member of.
 *
 * <p>Partitions are bootstrapped through {@link PartitionStartup}; every successfully started
 * partition is recorded in the raft/Zeebe partition collections and its admin access is kept per
 * partition id. Topology related calls are delegated to the internal {@link TopologyManagerImpl}.
 */
public final class PartitionManagerImpl implements PartitionManager, TopologyManager {
    public static final String GROUP_NAME = "raft-partition";
    private static final Logger LOGGER = LoggerFactory.getLogger(PartitionManagerImpl.class);
    private final BrokerHealthCheckService healthCheckService;
    private final ActorSchedulingService actorSchedulingService;
    private final TopologyManagerImpl topologyManager;
    // The collections below are filled from the bootstrap completion callbacks registered in
    // start() and emptied again at the end of stop().
    private final List<ZeebePartition> zeebePartitions = new CopyOnWriteArrayList<>();
    private final List<RaftPartition> raftPartitions = new CopyOnWriteArrayList<>();
    private final Map<Integer, PartitionAdminAccess> adminAccess = new ConcurrentHashMap<>();
    private final DiskSpaceUsageMonitor diskSpaceUsageMonitor;
    private final PartitionDistribution partitionDistribution;
    private final PartitionStartup partitionStartup;
    private final DefaultPartitionManagementService managementService;

    /**
     * Wires up the collaborators needed to bootstrap partitions; no partition is started here.
     *
     * <p>The given {@code partitionListeners} are copied and the internal topology manager is
     * appended to the copy, so the caller's list is not modified.
     */
    public PartitionManagerImpl(ActorSchedulingService actorSchedulingService, BrokerCfg brokerCfg, BrokerInfo localBroker, ClusterServices clusterServices, BrokerHealthCheckService healthCheckService, DiskSpaceUsageMonitor diskSpaceUsageMonitor, List<PartitionListener> partitionListeners, CommandApiService commandApiService, ExporterRepository exporterRepository, AtomixServerTransport gatewayBrokerTransport, JobStreamer jobStreamer, PartitionDistribution partitionDistribution) {
        this.actorSchedulingService = actorSchedulingService;
        this.healthCheckService = healthCheckService;
        this.diskSpaceUsageMonitor = diskSpaceUsageMonitor;
        this.partitionDistribution = partitionDistribution;

        final FileBasedSnapshotStoreFactory snapshotStoreFactory = new FileBasedSnapshotStoreFactory(actorSchedulingService, localBroker.getNodeId());
        final FeatureFlags featureFlags = brokerCfg.getExperimental().getFeatures().toFeatureFlags();
        this.topologyManager = new TopologyManagerImpl(clusterServices.getMembershipService(), localBroker);

        // The topology manager is itself registered as a partition listener.
        final List<PartitionListener> listeners = new ArrayList<>(partitionListeners);
        listeners.add(this.topologyManager);

        final ZeebePartitionFactory zeebePartitionFactory = new ZeebePartitionFactory(actorSchedulingService, brokerCfg, localBroker, commandApiService, snapshotStoreFactory, clusterServices, exporterRepository, healthCheckService, diskSpaceUsageMonitor, gatewayBrokerTransport, jobStreamer, listeners, this.topologyManager, featureFlags);
        this.managementService = new DefaultPartitionManagementService(clusterServices.getMembershipService(), clusterServices.getCommunicationService());
        final RaftPartitionFactory raftPartitionFactory = new RaftPartitionFactory(brokerCfg, snapshotStoreFactory);
        this.partitionStartup = new PartitionStartup(actorSchedulingService, this.managementService, raftPartitionFactory, zeebePartitionFactory);
    }

    /**
     * Creates an admin access that spans the partitions registered in this manager.
     *
     * @param concurrencyControl passed through to the created {@link MultiPartitionAdminAccess}
     * @return admin access backed by this manager's per-partition admin access map
     */
    public PartitionAdminAccess createAdminAccess(ConcurrencyControl concurrencyControl) {
        return new MultiPartitionAdminAccess(concurrencyControl, this.adminAccess);
    }

    /**
     * Submits the topology manager actor and triggers the bootstrap of every partition whose
     * member list contains the local member.
     *
     * <p>A failed bootstrap is logged and the partition is reported as {@link HealthStatus#DEAD};
     * a successful one is registered with this manager.
     *
     * @return an already completed future; it does not wait for the partition bootstraps
     */
    public CompletableFuture<Void> start() {
        LOGGER.info("Starting partitions");
        this.actorSchedulingService.submitActor(this.topologyManager);

        final MemberId localMemberId = this.managementService.getMembershipService().getLocalMember().id();
        final List<PartitionMetadata> memberPartitions = this.partitionDistribution.partitions().stream()
            .filter(p -> p.members().contains(localMemberId))
            .toList();
        this.healthCheckService.registerBootstrapPartitions(memberPartitions);

        for (final PartitionMetadata partitionMetadata : memberPartitions) {
            final Integer id = partitionMetadata.id().id();
            this.partitionStartup.bootstrap(partitionMetadata).whenComplete((startedPartition, throwable) -> {
                if (throwable != null) {
                    LOGGER.error("Failed to start partition {}", id, throwable);
                    this.onHealthChanged(id, HealthStatus.DEAD);
                    return;
                }
                LOGGER.info("Started partition {}", id);
                this.registerStartedPartition(id, startedPartition.zeebePartition(), startedPartition.raftPartition());
            });
        }
        return CompletableFuture.completedFuture(null);
    }

    /** Hooks a freshly started partition into health broadcasting and disk monitoring and records it. */
    private void registerStartedPartition(Integer id, ZeebePartition zeebePartition, RaftPartition raftPartition) {
        zeebePartition.addFailureListener(new PartitionHealthBroadcaster(id, this::onHealthChanged));
        this.diskSpaceUsageMonitor.addDiskUsageListener(zeebePartition);
        this.adminAccess.put(id, zeebePartition.getAdminAccess());
        this.zeebePartitions.add(zeebePartition);
        this.raftPartitions.add(raftPartition);
    }

    /**
     * Stops all registered Zeebe partitions, then closes all registered raft partitions, and
     * finally clears the internal bookkeeping and closes the topology manager.
     *
     * <p>The raft partitions are closed even when stopping a Zeebe partition failed; otherwise
     * they would stay open while the bookkeeping that references them is cleared.
     *
     * @return a future that completes once both phases are done; it completes exceptionally if
     *     stopping the Zeebe partitions or closing the raft partitions failed
     */
    public CompletableFuture<Void> stop() {
        final CompletableFuture<Void> zeebeStopped = this.stopZeebePartitions();
        // Swallow the outcome only for sequencing purposes: the raft partitions must be closed
        // after the Zeebe partitions finished, regardless of whether that succeeded. A failure
        // of zeebeStopped is still reported through allOf below.
        final CompletableFuture<Void> raftStopped = zeebeStopped
            .handle((ok, error) -> (Void) null)
            .thenCompose(ignored -> this.stopRaftPartitions());

        return CompletableFuture.allOf(zeebeStopped, raftStopped).whenComplete((ok, error) -> {
            if (error != null) {
                LOGGER.error("Failed to stop partitions: {}", error.getMessage(), error);
            }
            this.raftPartitions.clear();
            this.zeebePartitions.clear();
            this.adminAccess.clear();
            this.topologyManager.close();
        });
    }

    /**
     * Stops every registered Zeebe partition in its own asynchronous task.
     *
     * @return a future completing when all stop tasks have completed
     */
    private CompletableFuture<Void> stopZeebePartitions() {
        // runAsync without an executor runs the tasks on the JDK's default async pool.
        final CompletableFuture<?>[] stopTasks = this.zeebePartitions.stream()
            .map(partition -> CompletableFuture.runAsync(() -> this.stopPartition(partition)))
            .toArray(CompletableFuture<?>[]::new);
        return CompletableFuture.allOf(stopTasks);
    }

    /**
     * Closes every registered raft partition.
     *
     * @return a future completing when all close futures have completed
     */
    private CompletableFuture<Void> stopRaftPartitions() {
        final CompletableFuture<?>[] closeTasks = this.raftPartitions.stream()
            .map(RaftPartition::close)
            .toArray(CompletableFuture<?>[]::new);
        return CompletableFuture.allOf(closeTasks);
    }

    /** Detaches the partition from disk usage and health monitoring, then closes it. */
    private void stopPartition(ZeebePartition partition) {
        this.diskSpaceUsageMonitor.removeDiskUsageListener(partition);
        this.healthCheckService.removeMonitoredPartition(partition.getPartitionId());
        partition.close();
    }

    @Override
    public String toString() {
        return "PartitionManagerImpl{partitions=" + this.zeebePartitions + "}";
    }

    /**
     * Forwards a partition health change to the topology manager.
     *
     * @param i id of the partition whose health changed
     * @param healthStatus the new health status
     */
    public void onHealthChanged(int i, HealthStatus healthStatus) {
        this.topologyManager.onHealthChanged(i, healthStatus);
    }

    /** Delegates to the internal topology manager. */
    @Override
    public void removeTopologyPartitionListener(TopologyPartitionListener listener) {
        this.topologyManager.removeTopologyPartitionListener(listener);
    }

    /** Delegates to the internal topology manager. */
    @Override
    public void addTopologyPartitionListener(TopologyPartitionListener listener) {
        this.topologyManager.addTopologyPartitionListener(listener);
    }

    /**
     * Looks up a registered raft partition by id.
     *
     * @param partitionId id of the partition to look up
     * @return the first registered raft partition with that id, or {@code null} if there is none
     */
    @Override
    public RaftPartition getRaftPartition(int partitionId) {
        return this.raftPartitions.stream()
            .filter(p -> p.id().id() == partitionId)
            .findFirst()
            .orElse(null);
    }

    /** Returns the internal collection of registered raft partitions (not a copy). */
    @Override
    public Collection<RaftPartition> getRaftPartitions() {
        return this.raftPartitions;
    }

    /** Returns the internal collection of registered Zeebe partitions (not a copy). */
    @Override
    public Collection<ZeebePartition> getZeebePartitions() {
        return this.zeebePartitions;
    }
}

