/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.topology.changes;

import io.atomix.cluster.MemberId;
import io.atomix.cluster.NodeId;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.topology.ClusterTopologyManager;
import io.camunda.zeebe.topology.api.TopologyRequestFailedException;
import io.camunda.zeebe.topology.changes.NoopPartitionChangeExecutor;
import io.camunda.zeebe.topology.changes.NoopTopologyMembershipChangeExecutor;
import io.camunda.zeebe.topology.changes.TopologyChangeAppliers;
import io.camunda.zeebe.topology.changes.TopologyChangeAppliersImpl;
import io.camunda.zeebe.topology.changes.TopologyChangeCoordinator;
import io.camunda.zeebe.topology.state.ClusterChangePlan;
import io.camunda.zeebe.topology.state.ClusterTopology;
import io.camunda.zeebe.topology.state.CompletedChange;
import io.camunda.zeebe.topology.state.TopologyChangeOperation;
import io.camunda.zeebe.util.Either;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.UnaryOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link TopologyChangeCoordinator} implementation that validates a requested topology change by
 * simulating it with no-op executors and, unless the request is a dry-run, writes the resulting
 * change plan through the {@link ClusterTopologyManager}.
 *
 * <p>Unless a request is forced, it is only accepted when the local member has the smallest id
 * among the members of the current topology (see {@link #isCoordinator(ClusterTopology)}).
 */
public class TopologyChangeCoordinatorImpl
implements TopologyChangeCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(TopologyChangeCoordinatorImpl.class);
    private final ClusterTopologyManager clusterTopologyManager;
    private final ConcurrencyControl executor;
    private final MemberId localMemberId;

    /**
     * @param clusterTopologyManager source of the current topology and sink for topology updates
     * @param localMemberId id of the member this coordinator runs on
     * @param executor used to create the returned futures and to schedule the work of each request
     */
    public TopologyChangeCoordinatorImpl(ClusterTopologyManager clusterTopologyManager, MemberId localMemberId, ConcurrencyControl executor) {
        this.clusterTopologyManager = clusterTopologyManager;
        this.executor = executor;
        this.localMemberId = localMemberId;
    }

    /** Returns the topology as currently reported by the {@link ClusterTopologyManager}. */
    @Override
    public ActorFuture<ClusterTopology> getTopology() {
        return this.clusterTopologyManager.getClusterTopology();
    }

    /** Validates the requested operations and, if valid, starts the topology change. */
    @Override
    public ActorFuture<TopologyChangeCoordinator.TopologyChangeResult> applyOperations(TopologyChangeCoordinator.TopologyChangeRequest request) {
        return this.applyOrDryRun(false, request);
    }

    /** Validates the requested operations without updating the topology. */
    @Override
    public ActorFuture<TopologyChangeCoordinator.TopologyChangeResult> simulateOperations(TopologyChangeCoordinator.TopologyChangeRequest request) {
        return this.applyOrDryRun(true, request);
    }

    /**
     * Cancels the pending change with the given id.
     *
     * <p>The returned future fails with an {@code InvalidRequest} when the topology is
     * uninitialized, no change is in progress, or the pending change has a different id. On
     * success it completes with the topology returned by
     * {@link ClusterTopology#cancelPendingChanges()}.
     *
     * @param changeId id of the change to cancel
     * @return future completed with the topology after cancellation
     */
    @Override
    public ActorFuture<ClusterTopology> cancelChange(long changeId) {
        ActorFuture<ClusterTopology> future = this.executor.createFuture();
        this.executor.run(() -> this.clusterTopologyManager.updateClusterTopology(clusterTopology -> {
            if (!this.validateCancel(changeId, clusterTopology, future)) {
                // validateCancel has already failed the future; keep the topology as it is.
                return clusterTopology;
            }
            List<?> completedOperation = clusterTopology.pendingChanges().map(ClusterChangePlan::completedOperations).orElse(List.of());
            List<?> cancelledOperations = clusterTopology.pendingChanges().map(ClusterChangePlan::pendingOperations).orElse(List.of());
            LOG.warn("Cancelling topology change '{}'. Following operations have been already applied: {}. Following pending operations won't be applied: {}", changeId, completedOperation, cancelledOperations);
            ClusterTopology cancelledTopology = clusterTopology.cancelPendingChanges();
            future.complete(cancelledTopology);
            return cancelledTopology;
        }).onComplete((ignoredTopology, updateError) -> {
            // The updater normally completes the future itself. Only step in when the update
            // failed before that happened, otherwise the caller would wait forever.
            if (updateError != null && !future.isDone()) {
                this.failFuture(future, updateError);
            }
        }));
        return future;
    }

    /**
     * Shared implementation of {@link #applyOperations} and {@link #simulateOperations}.
     *
     * <p>Fetches the current topology, rejects the request if it is not forced and this member is
     * not the coordinator, lets the request generate its operations, and hands over to
     * {@link #applyOrDryRunOnTopology}.
     *
     * @param dryRun when {@code true} the topology is not updated
     * @param request the change request
     * @return future completed with the result of the (simulated) change
     */
    private ActorFuture<TopologyChangeCoordinator.TopologyChangeResult> applyOrDryRun(boolean dryRun, TopologyChangeCoordinator.TopologyChangeRequest request) {
        ActorFuture<TopologyChangeCoordinator.TopologyChangeResult> future = this.executor.createFuture();
        this.executor.run(() -> this.clusterTopologyManager.getClusterTopology().onComplete((currentClusterTopology, errorOnGettingTopology) -> {
            if (errorOnGettingTopology != null) {
                this.failFuture(future, errorOnGettingTopology);
                return;
            }
            if (!request.isForced() && !this.isCoordinator(currentClusterTopology)) {
                this.failFuture(future, new TopologyRequestFailedException.InternalError(String.format("Cannot process request to change the topology. The broker '%s' is not the coordinator.", this.localMemberId)));
                return;
            }
            Either<Exception, List<TopologyChangeOperation>> generatedOperations = request.operations(currentClusterTopology);
            if (generatedOperations.isLeft()) {
                this.failFuture(future, generatedOperations.getLeft());
                return;
            }
            this.applyOrDryRunOnTopology(dryRun, currentClusterTopology, generatedOperations.get(), future);
        }, (Executor)this.executor));
        return future;
    }

    /**
     * Validates {@code operations} against {@code currentClusterTopology} and then either starts
     * the change or, for a dry-run, only computes the change plan locally.
     *
     * <p>An empty operation list completes the future immediately, with the current topology as
     * both current and final topology and the id of the last completed change (or 0 if none).
     */
    private void applyOrDryRunOnTopology(boolean dryRun, ClusterTopology currentClusterTopology, List<TopologyChangeOperation> operations, ActorFuture<TopologyChangeCoordinator.TopologyChangeResult> future) {
        if (operations.isEmpty()) {
            // Nothing to apply.
            long lastChangeId = currentClusterTopology.lastChange().map(CompletedChange::id).orElse(0L);
            future.complete(new TopologyChangeCoordinator.TopologyChangeResult(currentClusterTopology, currentClusterTopology, lastChangeId, operations));
            return;
        }
        ActorFuture<ClusterTopology> validation = this.validateTopologyChangeRequest(currentClusterTopology, operations);
        validation.onComplete((simulatedFinalTopology, validationError) -> {
            if (validationError != null) {
                this.failFuture(future, validationError);
                return;
            }
            ActorFuture<ClusterTopology> applyFuture = this.executor.createFuture();
            if (dryRun) {
                // Dry-run: build the change plan on the local copy only; the manager is not touched.
                applyFuture.complete(currentClusterTopology.startTopologyChange(operations));
            } else {
                this.applyTopologyChange(operations, currentClusterTopology, simulatedFinalTopology, applyFuture);
            }
            applyFuture.onComplete((clusterTopologyWithPendingChanges, error) -> {
                if (error != null) {
                    this.failFuture(future, error);
                    return;
                }
                long changeId = clusterTopologyWithPendingChanges.pendingChanges().map(ClusterChangePlan::id).orElse(0L);
                future.complete(new TopologyChangeCoordinator.TopologyChangeResult(currentClusterTopology, simulatedFinalTopology, changeId, operations));
            });
        });
    }

    /**
     * Checks that a change may be started on {@code currentClusterTopology} and simulates it.
     *
     * @return future that fails if the topology is uninitialized, already has pending changes, or
     *     the simulation rejects an operation; otherwise completes with the simulated final topology
     */
    private ActorFuture<ClusterTopology> validateTopologyChangeRequest(ClusterTopology currentClusterTopology, List<TopologyChangeOperation> operations) {
        ActorFuture<ClusterTopology> validationFuture = this.executor.createFuture();
        if (currentClusterTopology.isUninitialized()) {
            this.failFuture(validationFuture, new TopologyRequestFailedException.OperationNotAllowed("Cannot apply topology change. The topology is not initialized."));
        } else if (currentClusterTopology.hasPendingChanges()) {
            this.failFuture(validationFuture, new TopologyRequestFailedException.ConcurrentModificationException(String.format("Cannot apply topology change. Another topology change [%s] is in progress.", currentClusterTopology)));
        } else {
            // Appliers backed by no-op executors, so the plan is walked without real side effects.
            TopologyChangeAppliersImpl topologyChangeSimulator = new TopologyChangeAppliersImpl(new NoopPartitionChangeExecutor(), new NoopTopologyMembershipChangeExecutor());
            ClusterTopology topologyWithPendingOperations = currentClusterTopology.startTopologyChange(operations);
            this.simulateTopologyChange(topologyWithPendingOperations, topologyChangeSimulator, validationFuture);
        }
        return validationFuture;
    }

    /**
     * Starts the change on the managed topology.
     *
     * <p>The update is rejected with a {@code ConcurrentModificationException} if the managed
     * topology no longer equals {@code currentClusterTopology}, i.e. the one that was validated.
     *
     * @param future completed with the topology containing the pending operations, or failed with
     *     the update error
     */
    private void applyTopologyChange(List<TopologyChangeOperation> operations, ClusterTopology currentClusterTopology, ClusterTopology simulatedFinalTopology, ActorFuture<ClusterTopology> future) {
        this.executor.run(() -> this.clusterTopologyManager.updateClusterTopology(clusterTopology -> {
            if (!clusterTopology.equals(currentClusterTopology)) {
                throw new TopologyRequestFailedException.ConcurrentModificationException("Topology changed while applying the change. Please retry.");
            }
            return clusterTopology.startTopologyChange(operations);
        }).onComplete((topologyWithPendingOperations, errorOnUpdatingTopology) -> {
            if (errorOnUpdatingTopology != null) {
                this.failFuture(future, errorOnUpdatingTopology);
                return;
            }
            LOG.debug("Applying the topology change has started. The resulting topology will be {}", simulatedFinalTopology);
            future.complete(topologyWithPendingOperations);
        }));
    }

    /**
     * Applies the pending operations of {@code updatedTopology} one at a time using the given
     * simulator, calling itself again for each remaining operation once the previous one finished.
     *
     * @param simulationCompleted completed with the resulting topology once no pending changes
     *     remain, or failed with an {@code InvalidRequest} wrapping the first error
     */
    private void simulateTopologyChange(ClusterTopology updatedTopology, TopologyChangeAppliersImpl topologyChangeSimulator, ActorFuture<ClusterTopology> simulationCompleted) {
        if (!updatedTopology.hasPendingChanges()) {
            simulationCompleted.complete(updatedTopology);
            return;
        }
        TopologyChangeOperation operation = updatedTopology.nextPendingOperation();
        TopologyChangeAppliers.ClusterOperationApplier applier = topologyChangeSimulator.getApplier(operation);
        Either<Exception, UnaryOperator<ClusterTopology>> result = applier.init(updatedTopology);
        if (result.isLeft()) {
            this.failFuture(simulationCompleted, new TopologyRequestFailedException.InvalidRequest(result.getLeft()));
            return;
        }
        ClusterTopology initializedChanges = result.get().apply(updatedTopology);
        applier.apply().onComplete((topologyUpdater, error) -> {
            if (error != null) {
                this.failFuture(simulationCompleted, new TopologyRequestFailedException.InvalidRequest(error));
                return;
            }
            ClusterTopology newTopology = initializedChanges.advanceTopologyChange(topologyUpdater);
            this.simulateTopologyChange(newTopology, topologyChangeSimulator, simulationCompleted);
        });
    }

    /**
     * Logs {@code error} and fails {@code future} with it. Errors that are not already a
     * {@link TopologyRequestFailedException} are wrapped in an {@code InternalError}.
     */
    private void failFuture(ActorFuture<?> future, Throwable error) {
        LOG.warn("Failed to handle topology request", error);
        if (error instanceof TopologyRequestFailedException) {
            future.completeExceptionally(error);
        } else {
            future.completeExceptionally(new TopologyRequestFailedException.InternalError(error));
        }
    }

    /**
     * Checks whether the change {@code changeId} can be cancelled on the given topology.
     *
     * @return {@code true} if cancellation may proceed; {@code false} if it was rejected, in which
     *     case {@code future} has already been failed with an {@code InvalidRequest}
     */
    private boolean validateCancel(long changeId, ClusterTopology currentClusterTopology, ActorFuture<ClusterTopology> future) {
        if (currentClusterTopology.isUninitialized()) {
            this.failFuture(future, new TopologyRequestFailedException.InvalidRequest("Cannot cancel change " + changeId + " because the topology is not initialized"));
            return false;
        }
        if (!currentClusterTopology.hasPendingChanges()) {
            this.failFuture(future, new TopologyRequestFailedException.InvalidRequest("Cannot cancel change " + changeId + " because no change is in progress"));
            return false;
        }
        ClusterChangePlan clusterChangePlan = currentClusterTopology.pendingChanges().orElseThrow();
        if (clusterChangePlan.id() != changeId) {
            this.failFuture(future, new TopologyRequestFailedException.InvalidRequest("Cannot cancel change " + changeId + " because it is not the current change"));
            return false;
        }
        return true;
    }

    /**
     * Returns {@code true} if the local member has the smallest id among the members of the given
     * topology. A topology without members has no coordinator.
     */
    private boolean isCoordinator(ClusterTopology clusterTopology) {
        return clusterTopology.members().keySet().stream()
            .min(NodeId::compareTo)
            .map(this.localMemberId::equals)
            .orElse(false);
    }
}

