/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.topology;

import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
import io.camunda.zeebe.scheduler.AsyncClosable;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.topology.ClusterTopologyManager;
import io.camunda.zeebe.topology.ClusterTopologyManagerImpl;
import io.camunda.zeebe.topology.PersistedClusterTopology;
import io.camunda.zeebe.topology.StaticConfiguration;
import io.camunda.zeebe.topology.TopologyInitializer;
import io.camunda.zeebe.topology.TopologyUpdateNotifier;
import io.camunda.zeebe.topology.api.TopologyManagementRequestsHandler;
import io.camunda.zeebe.topology.api.TopologyRequestServer;
import io.camunda.zeebe.topology.changes.NoopTopologyMembershipChangeExecutor;
import io.camunda.zeebe.topology.changes.PartitionChangeExecutor;
import io.camunda.zeebe.topology.changes.TopologyChangeAppliersImpl;
import io.camunda.zeebe.topology.changes.TopologyChangeCoordinator;
import io.camunda.zeebe.topology.changes.TopologyChangeCoordinatorImpl;
import io.camunda.zeebe.topology.gossip.ClusterTopologyGossiper;
import io.camunda.zeebe.topology.gossip.ClusterTopologyGossiperConfig;
import io.camunda.zeebe.topology.metrics.TopologyManagerMetrics;
import io.camunda.zeebe.topology.metrics.TopologyMetrics;
import io.camunda.zeebe.topology.serializer.ProtoBufSerializer;
import io.camunda.zeebe.topology.state.ClusterTopology;
import io.camunda.zeebe.util.FileUtil;
import io.micrometer.core.instrument.MeterRegistry;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;

/**
 * Wires together the components that manage the cluster topology on this member: the
 * {@link ClusterTopologyManagerImpl}, the {@link ClusterTopologyGossiper}, the
 * {@link TopologyChangeCoordinator} and the {@link TopologyRequestServer}.
 *
 * <p>The service owns two actors, one used as execution context for the gossiper and one for the
 * manager, coordinator and request handler. They are submitted to the scheduler in
 * {@link #start(ActorSchedulingService, StaticConfiguration)} and closed in {@link #closeAsync()}.
 */
public final class ClusterTopologyManagerService implements TopologyUpdateNotifier, AsyncClosable {
    /** The member whose id equals this value uses the coordinator initializer chain on start. */
    private static final String COORDINATOR_ID = "0";
    /** Name of the topology file, resolved against the data root directory. */
    private static final String TOPOLOGY_FILE_NAME = ".topology.meta";

    private final ClusterTopologyManagerImpl clusterTopologyManager;
    private final ClusterTopologyGossiper clusterTopologyGossiper;
    private final ClusterMembershipService memberShipService;
    private final boolean isCoordinator;
    private final PersistedClusterTopology persistedClusterTopology;
    private final Path topologyFile;
    private final TopologyChangeCoordinator topologyChangeCoordinator;
    private final TopologyRequestServer topologyRequestServer;
    private final Actor gossipActor;
    private final Actor managerActor;
    private final TopologyMetrics topologyMetrics;
    private final TopologyManagerMetrics topologyManagerMetrics;

    /**
     * Creates all topology components. Nothing is scheduled or started here; call
     * {@link #start(ActorSchedulingService, StaticConfiguration)} afterwards.
     *
     * @param dataRootDirectory directory that holds the topology file; created if it is missing
     * @param communicationService passed to the gossiper and the request server
     * @param memberShipService used to look up the local member id; also passed to the gossiper
     *     and to the rolling-update aware initializer
     * @param config gossiper configuration
     * @param meterRegistry registry handed to the topology metrics objects
     * @throws UncheckedIOException if the data directory cannot be created
     */
    public ClusterTopologyManagerService(Path dataRootDirectory, ClusterCommunicationService communicationService, ClusterMembershipService memberShipService, ClusterTopologyGossiperConfig config, MeterRegistry meterRegistry) {
        this.memberShipService = memberShipService;
        try {
            FileUtil.ensureDirectoryExists(dataRootDirectory);
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to create data directory", e);
        }

        MemberId localMemberId = memberShipService.getLocalMember().id();
        this.topologyFile = dataRootDirectory.resolve(TOPOLOGY_FILE_NAME);
        this.persistedClusterTopology = PersistedClusterTopology.ofFile(this.topologyFile, new ProtoBufSerializer());

        // Bare anonymous actors: they add no behavior of their own and are only handed to the
        // components below as their ConcurrencyControl.
        this.gossipActor = new Actor() {};
        this.managerActor = new Actor() {};

        this.topologyMetrics = new TopologyMetrics(meterRegistry);
        this.topologyManagerMetrics = new TopologyManagerMetrics(meterRegistry);

        this.clusterTopologyManager = new ClusterTopologyManagerImpl(this.managerActor, localMemberId, this.persistedClusterTopology, this.topologyManagerMetrics);
        this.clusterTopologyGossiper = new ClusterTopologyGossiper(this.gossipActor, communicationService, memberShipService, new ProtoBufSerializer(), config, this.clusterTopologyManager::onGossipReceived, this.topologyMetrics);

        // Constant-first comparison: no cast needed and no NullPointerException on a null id.
        this.isCoordinator = COORDINATOR_ID.equals(localMemberId.id());

        this.topologyChangeCoordinator = new TopologyChangeCoordinatorImpl(this.clusterTopologyManager, localMemberId, this.managerActor);
        this.topologyRequestServer = new TopologyRequestServer(communicationService, new ProtoBufSerializer(), new TopologyManagementRequestsHandler(this.topologyChangeCoordinator, localMemberId, this.managerActor));

        // Manager and gossiper reference each other: the gossiper was given the manager's
        // onGossipReceived callback above, the manager gets the gossiper's update hook here.
        this.clusterTopologyManager.setTopologyGossiper(this.clusterTopologyGossiper::updateClusterTopology);
    }

    /** Returns the statically configured cluster members without the local member. */
    private static List<MemberId> otherKnownMembers(StaticConfiguration staticConfiguration) {
        return staticConfiguration.clusterMembers().stream()
            .filter(member -> !member.equals(staticConfiguration.localMemberId()))
            .toList();
    }

    /**
     * Builds the initializer chain used when this member is not the coordinator: the file
     * initializer (with a sync initializer registered as recovery for
     * {@code PersistedTopologyIsBroken}), then the rolling-update aware initializer, then the
     * gossip initializer.
     *
     * <p>NOTE(review): presumably each {@code orThen} step is only attempted when the previous one
     * did not produce a topology - confirm against {@link TopologyInitializer}.
     */
    private TopologyInitializer getNonCoordinatorInitializer(ClusterMembershipService membershipService, StaticConfiguration staticConfiguration) {
        List<MemberId> otherKnownMembers = otherKnownMembers(staticConfiguration);
        TopologyInitializer syncOnBrokenFile = new TopologyInitializer.SyncInitializer(this.clusterTopologyGossiper, otherKnownMembers, this.managerActor, this.clusterTopologyGossiper::queryClusterTopology);
        return new TopologyInitializer.FileInitializer(this.topologyFile, new ProtoBufSerializer())
            .recover(TopologyInitializer.InitializerError.PersistedTopologyIsBroken.class, syncOnBrokenFile)
            .orThen(new TopologyInitializer.RollingUpdateAwareInitializerV83ToV84(membershipService, staticConfiguration, this.managerActor))
            .orThen(new TopologyInitializer.GossipInitializer(this.clusterTopologyGossiper, this.persistedClusterTopology, this.clusterTopologyGossiper::updateClusterTopology, this.managerActor));
    }

    /**
     * Builds the initializer chain used when this member is the coordinator: the file
     * initializer, then the rolling-update aware initializer, then the sync initializer, and
     * finally the static initializer built from {@code staticConfiguration}.
     *
     * <p>NOTE(review): presumably each {@code orThen} step is only attempted when the previous one
     * did not produce a topology - confirm against {@link TopologyInitializer}.
     */
    private TopologyInitializer getCoordinatorInitializer(StaticConfiguration staticConfiguration) {
        List<MemberId> otherKnownMembers = otherKnownMembers(staticConfiguration);
        return new TopologyInitializer.FileInitializer(this.topologyFile, new ProtoBufSerializer())
            .orThen(new TopologyInitializer.RollingUpdateAwareInitializerV83ToV84(this.memberShipService, staticConfiguration, this.managerActor))
            .orThen(new TopologyInitializer.SyncInitializer(this.clusterTopologyGossiper, otherKnownMembers, this.managerActor, this.clusterTopologyGossiper::queryClusterTopology))
            .orThen(new TopologyInitializer.StaticInitializer(staticConfiguration));
    }

    /**
     * Starts the gossiper first and, chained after it, the remaining topology services.
     *
     * @param actorSchedulingService scheduler the two actors are submitted to
     * @param staticConfiguration configuration handed to the topology initializers
     * @return the future of the chained start sequence
     */
    public ActorFuture<Void> start(ActorSchedulingService actorSchedulingService, StaticConfiguration staticConfiguration) {
        return this.startGossiper(actorSchedulingService)
            .andThen(() -> this.startClusterTopologyServices(actorSchedulingService, staticConfiguration), Runnable::run);
    }

    /** Submits the gossip actor and, chained after the submission, starts the gossiper. */
    private ActorFuture<Void> startGossiper(ActorSchedulingService actorSchedulingService) {
        return actorSchedulingService.submitActor(this.gossipActor)
            .andThen(this.clusterTopologyGossiper::start, Runnable::run);
    }

    /**
     * Starts the request server, submits the manager actor and then starts the topology manager
     * with the initializer chain matching this member's role.
     *
     * @return a future that fails with the submission error if the manager actor could not be
     *     submitted, and otherwise receives the outcome of the topology manager start
     */
    private CompletableActorFuture<Void> startClusterTopologyServices(ActorSchedulingService actorSchedulingService, StaticConfiguration staticConfiguration) {
        CompletableActorFuture<Void> result = new CompletableActorFuture<>();
        TopologyInitializer topologyInitializer = this.isCoordinator
            ? this.getCoordinatorInitializer(staticConfiguration)
            : this.getNonCoordinatorInitializer(this.memberShipService, staticConfiguration);

        this.topologyRequestServer.start();
        actorSchedulingService.submitActor(this.managerActor).onComplete((ok, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
                return;
            }
            // The result future is itself passed as the completion callback, so it receives
            // the outcome of the manager start directly.
            this.clusterTopologyManager.start(topologyInitializer).onComplete(result);
        });
        return result;
    }

    /** Returns the cluster topology future provided by the topology manager. */
    public ActorFuture<ClusterTopology> getClusterTopology() {
        return this.clusterTopologyManager.getClusterTopology();
    }

    /**
     * Returns the topology change coordinator. The coordinator is always created in the
     * constructor, so the returned optional is never empty; the {@link Optional} return type is
     * kept for compatibility with existing callers.
     */
    public Optional<TopologyChangeCoordinator> getTopologyChangeCoordinator() {
        return Optional.of(this.topologyChangeCoordinator);
    }

    /**
     * Closes the request server and the gossiper, then closes the manager actor and, chained
     * after it, the gossip actor.
     *
     * @return the future of the chained actor shutdown
     */
    @Override
    public ActorFuture<Void> closeAsync() {
        this.topologyRequestServer.close();
        this.clusterTopologyGossiper.close();
        return this.managerActor.closeAsync().andThen(this.gossipActor::closeAsync, Runnable::run);
    }

    /**
     * Registers topology change appliers built from the given partition change executor and a
     * no-op membership change executor.
     */
    public void registerPartitionChangeExecutor(PartitionChangeExecutor partitionChangeExecutor) {
        this.clusterTopologyManager.registerTopologyChangeAppliers(
            new TopologyChangeAppliersImpl(partitionChangeExecutor, new NoopTopologyMembershipChangeExecutor()));
    }

    /** Removes the topology change appliers from the topology manager. */
    public void removePartitionChangeExecutor() {
        this.clusterTopologyManager.removeTopologyChangeAppliers();
    }

    /** Registers the given listener on the topology manager. */
    public void registerTopologyChangedListener(ClusterTopologyManager.InconsistentTopologyListener listener) {
        this.clusterTopologyManager.registerTopologyChangedListener(listener);
    }

    /** Removes the topology changed listener from the topology manager. */
    public void removeTopologyChangedListener() {
        this.clusterTopologyManager.removeTopologyChangedListener();
    }

    /** Delegates to the gossiper, which keeps the update listeners. */
    @Override
    public void addUpdateListener(TopologyUpdateNotifier.TopologyUpdateListener listener) {
        this.clusterTopologyGossiper.addUpdateListener(listener);
    }

    /** Delegates to the gossiper, which keeps the update listeners. */
    @Override
    public void removeUpdateListener(TopologyUpdateNotifier.TopologyUpdateListener listener) {
        this.clusterTopologyGossiper.removeUpdateListener(listener);
    }
}

