/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.dynamic.config;

import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.camunda.zeebe.dynamic.config.ClusterConfigurationInitializer;
import io.camunda.zeebe.dynamic.config.ClusterConfigurationManager;
import io.camunda.zeebe.dynamic.config.ClusterConfigurationManagerImpl;
import io.camunda.zeebe.dynamic.config.ClusterConfigurationUpdateNotifier;
import io.camunda.zeebe.dynamic.config.ExporterStateInitializer;
import io.camunda.zeebe.dynamic.config.PersistedClusterConfiguration;
import io.camunda.zeebe.dynamic.config.RoutingStateInitializer;
import io.camunda.zeebe.dynamic.config.StaticConfiguration;
import io.camunda.zeebe.dynamic.config.api.ClusterConfigurationManagementRequestsHandler;
import io.camunda.zeebe.dynamic.config.api.ClusterConfigurationRequestServer;
import io.camunda.zeebe.dynamic.config.changes.ConfigurationChangeAppliersImpl;
import io.camunda.zeebe.dynamic.config.changes.ConfigurationChangeCoordinator;
import io.camunda.zeebe.dynamic.config.changes.ConfigurationChangeCoordinatorImpl;
import io.camunda.zeebe.dynamic.config.changes.NoopClusterMembershipChangeExecutor;
import io.camunda.zeebe.dynamic.config.changes.PartitionChangeExecutor;
import io.camunda.zeebe.dynamic.config.gossip.ClusterConfigurationGossiper;
import io.camunda.zeebe.dynamic.config.gossip.ClusterConfigurationGossiperConfig;
import io.camunda.zeebe.dynamic.config.serializer.ProtoBufSerializer;
import io.camunda.zeebe.dynamic.config.state.ClusterConfiguration;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
import io.camunda.zeebe.scheduler.AsyncClosable;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.util.FileUtil;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;

public final class ClusterConfigurationManagerService
implements ClusterConfigurationUpdateNotifier,
AsyncClosable {
    private static final String COORDINATOR_ID = "0";
    private static final String TOPOLOGY_FILE_NAME = ".topology.meta";
    private final ClusterConfigurationManagerImpl clusterConfigurationManager;
    private final ClusterConfigurationGossiper clusterConfigurationGossiper;
    private final ClusterMembershipService memberShipService;
    private final boolean isCoordinator;
    private final PersistedClusterConfiguration persistedClusterConfiguration;
    private final Path configurationFile;
    private final ConfigurationChangeCoordinator configurationChangeCoordinator;
    private final ClusterConfigurationRequestServer configurationRequestServer;
    private final Actor gossipActor;
    private final Actor managerActor;

    public ClusterConfigurationManagerService(Path dataRootDirectory, ClusterCommunicationService communicationService, ClusterMembershipService memberShipService, ClusterConfigurationGossiperConfig config, boolean enablePartitionScaling) {
        this.memberShipService = memberShipService;
        try {
            FileUtil.ensureDirectoryExists((Path)dataRootDirectory);
        }
        catch (IOException e) {
            throw new UncheckedIOException("Failed to create data directory", e);
        }
        MemberId localMemberId = memberShipService.getLocalMember().id();
        this.configurationFile = dataRootDirectory.resolve(TOPOLOGY_FILE_NAME);
        this.persistedClusterConfiguration = PersistedClusterConfiguration.ofFile(this.configurationFile, new ProtoBufSerializer());
        this.gossipActor = new Actor(this){};
        this.managerActor = new Actor(this){};
        this.clusterConfigurationManager = new ClusterConfigurationManagerImpl((ConcurrencyControl)this.managerActor, localMemberId, this.persistedClusterConfiguration);
        this.clusterConfigurationGossiper = new ClusterConfigurationGossiper((ConcurrencyControl)this.gossipActor, communicationService, memberShipService, new ProtoBufSerializer(), config, this.clusterConfigurationManager::onGossipReceived);
        this.isCoordinator = ((String)((Object)localMemberId.id())).equals(COORDINATOR_ID);
        this.configurationChangeCoordinator = new ConfigurationChangeCoordinatorImpl(this.clusterConfigurationManager, localMemberId, (ConcurrencyControl)this.managerActor);
        this.configurationRequestServer = new ClusterConfigurationRequestServer(communicationService, new ProtoBufSerializer(), new ClusterConfigurationManagementRequestsHandler(this.configurationChangeCoordinator, localMemberId, (ConcurrencyControl)this.managerActor, enablePartitionScaling));
        this.clusterConfigurationManager.setConfigurationGossiper(this.clusterConfigurationGossiper::updateClusterConfiguration);
    }

    private ClusterConfigurationInitializer getNonCoordinatorInitializer(ClusterMembershipService membershipService, StaticConfiguration staticConfiguration) {
        List<MemberId> otherKnownMembers = staticConfiguration.clusterMembers().stream().filter(m -> !m.equals((Object)staticConfiguration.localMemberId())).toList();
        return new ClusterConfigurationInitializer.FileInitializer(this.configurationFile, new ProtoBufSerializer()).recover(ClusterConfigurationInitializer.InitializerError.PersistedConfigurationIsBroken.class, new ClusterConfigurationInitializer.SyncInitializer(this.clusterConfigurationGossiper, otherKnownMembers, (ConcurrencyControl)this.managerActor, this.clusterConfigurationGossiper::queryClusterConfiguration)).orThen(new ClusterConfigurationInitializer.GossipInitializer(this.clusterConfigurationGossiper, this.persistedClusterConfiguration, this.clusterConfigurationGossiper::updateClusterConfiguration, (ConcurrencyControl)this.managerActor)).andThen(new ExporterStateInitializer(staticConfiguration.partitionConfig().exporting().exporters().keySet(), staticConfiguration.localMemberId(), (ConcurrencyControl)this.managerActor)).andThen(new RoutingStateInitializer(staticConfiguration.enablePartitionScaling(), staticConfiguration.partitionCount()));
    }

    private ClusterConfigurationInitializer getCoordinatorInitializer(StaticConfiguration staticConfiguration) {
        List<MemberId> otherKnownMembers = staticConfiguration.clusterMembers().stream().filter(m -> !m.equals((Object)staticConfiguration.localMemberId())).toList();
        return new ClusterConfigurationInitializer.FileInitializer(this.configurationFile, new ProtoBufSerializer()).orThen(new ClusterConfigurationInitializer.SyncInitializer(this.clusterConfigurationGossiper, otherKnownMembers, (ConcurrencyControl)this.managerActor, this.clusterConfigurationGossiper::queryClusterConfiguration)).orThen(new ClusterConfigurationInitializer.StaticInitializer(staticConfiguration)).andThen(new ExporterStateInitializer(staticConfiguration.partitionConfig().exporting().exporters().keySet(), staticConfiguration.localMemberId(), (ConcurrencyControl)this.managerActor)).andThen(new RoutingStateInitializer(staticConfiguration.enablePartitionScaling(), staticConfiguration.partitionCount()));
    }

    public ActorFuture<Void> start(ActorSchedulingService actorSchedulingService, StaticConfiguration staticConfiguration) {
        return this.startGossiper(actorSchedulingService).andThen(() -> this.startClusterTopologyServices(actorSchedulingService, staticConfiguration), Runnable::run);
    }

    private ActorFuture<Void> startGossiper(ActorSchedulingService actorSchedulingService) {
        return actorSchedulingService.submitActor(this.gossipActor).andThen(this.clusterConfigurationGossiper::start, Runnable::run);
    }

    private CompletableActorFuture<Void> startClusterTopologyServices(ActorSchedulingService actorSchedulingService, StaticConfiguration staticConfiguration) {
        CompletableActorFuture result = new CompletableActorFuture();
        ClusterConfigurationInitializer clusterConfigurationInitializer = this.isCoordinator ? this.getCoordinatorInitializer(staticConfiguration) : this.getNonCoordinatorInitializer(this.memberShipService, staticConfiguration);
        this.configurationRequestServer.start();
        actorSchedulingService.submitActor(this.managerActor).onComplete((ok, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                this.clusterConfigurationManager.start(clusterConfigurationInitializer).onComplete((BiConsumer)result);
            }
        });
        return result;
    }

    public ActorFuture<ClusterConfiguration> getClusterTopology() {
        return this.clusterConfigurationManager.getClusterConfiguration();
    }

    public Optional<ConfigurationChangeCoordinator> getTopologyChangeCoordinator() {
        return Optional.ofNullable(this.configurationChangeCoordinator);
    }

    public ActorFuture<Void> closeAsync() {
        if (this.configurationRequestServer != null) {
            this.configurationRequestServer.close();
        }
        this.clusterConfigurationGossiper.close();
        return this.managerActor.closeAsync().andThen(() -> ((Actor)this.gossipActor).closeAsync(), Runnable::run);
    }

    public void registerPartitionChangeExecutor(PartitionChangeExecutor partitionChangeExecutor) {
        this.clusterConfigurationManager.registerTopologyChangeAppliers(new ConfigurationChangeAppliersImpl(partitionChangeExecutor, new NoopClusterMembershipChangeExecutor()));
    }

    public void removePartitionChangeExecutor() {
        this.clusterConfigurationManager.removeTopologyChangeAppliers();
    }

    public void registerTopologyChangedListener(ClusterConfigurationManager.InconsistentConfigurationListener listener) {
        this.clusterConfigurationManager.registerTopologyChangedListener(listener);
    }

    public void removeTopologyChangedListener() {
        this.clusterConfigurationManager.removeTopologyChangedListener();
    }

    @Override
    public void addUpdateListener(ClusterConfigurationUpdateNotifier.ClusterConfigurationUpdateListener listener) {
        this.clusterConfigurationGossiper.addUpdateListener(listener);
    }

    @Override
    public void removeUpdateListener(ClusterConfigurationUpdateNotifier.ClusterConfigurationUpdateListener listener) {
        this.clusterConfigurationGossiper.removeUpdateListener(listener);
    }
}

