/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.dynamic.config.changes;

import io.atomix.cluster.MemberId;
import io.atomix.cluster.NodeId;
import io.camunda.zeebe.dynamic.config.ClusterConfigurationManager;
import io.camunda.zeebe.dynamic.config.api.ClusterConfigurationRequestFailedException;
import io.camunda.zeebe.dynamic.config.changes.ConfigurationChangeAppliers;
import io.camunda.zeebe.dynamic.config.changes.ConfigurationChangeAppliersImpl;
import io.camunda.zeebe.dynamic.config.changes.ConfigurationChangeCoordinator;
import io.camunda.zeebe.dynamic.config.changes.NoopClusterMembershipChangeExecutor;
import io.camunda.zeebe.dynamic.config.changes.NoopPartitionChangeExecutor;
import io.camunda.zeebe.dynamic.config.state.ClusterChangePlan;
import io.camunda.zeebe.dynamic.config.state.ClusterConfiguration;
import io.camunda.zeebe.dynamic.config.state.ClusterConfigurationChangeOperation;
import io.camunda.zeebe.dynamic.config.state.CompletedChange;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.util.Either;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.UnaryOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ConfigurationChangeCoordinator} implementation that validates requested configuration
 * changes by simulating them locally and, unless it is a dry run, starts them through the
 * {@link ClusterConfigurationManager}.
 *
 * <p>All work is scheduled on the {@link ConcurrencyControl} passed to the constructor. Every
 * failure is reported through {@link #failFuture(ActorFuture, Throwable)}, so callers always
 * observe a {@link ClusterConfigurationRequestFailedException}.
 */
public class ConfigurationChangeCoordinatorImpl
implements ConfigurationChangeCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(ConfigurationChangeCoordinatorImpl.class);
    private final ClusterConfigurationManager clusterTopologyManager;
    private final ConcurrencyControl executor;
    private final MemberId localMemberId;

    /**
     * @param clusterTopologyManager manager used to read and update the cluster configuration
     * @param localMemberId id of the local member, used to decide whether it is the coordinator
     * @param executor concurrency control on which requests are processed and futures are created
     */
    public ConfigurationChangeCoordinatorImpl(ClusterConfigurationManager clusterTopologyManager, MemberId localMemberId, ConcurrencyControl executor) {
        this.clusterTopologyManager = clusterTopologyManager;
        this.executor = executor;
        this.localMemberId = localMemberId;
    }

    /** Returns the current cluster configuration as reported by the configuration manager. */
    @Override
    public ActorFuture<ClusterConfiguration> getClusterConfiguration() {
        return this.clusterTopologyManager.getClusterConfiguration();
    }

    /** Validates the requested operations and, if valid, starts the configuration change. */
    @Override
    public ActorFuture<ConfigurationChangeCoordinator.ConfigurationChangeResult> applyOperations(ConfigurationChangeCoordinator.ConfigurationChangeRequest request) {
        return this.applyOrDryRun(false, request);
    }

    /**
     * Validates the requested operations and reports the expected result without submitting the
     * change to the configuration manager.
     */
    @Override
    public ActorFuture<ConfigurationChangeCoordinator.ConfigurationChangeResult> simulateOperations(ConfigurationChangeCoordinator.ConfigurationChangeRequest request) {
        return this.applyOrDryRun(true, request);
    }

    /**
     * Cancels the pending change with the given id.
     *
     * @param changeId id of the change to cancel; must match the currently pending change
     * @return a future completed with the configuration after cancelling, or failed when the
     *     configuration is uninitialized, no change is pending, the id does not match, or the
     *     update itself fails
     */
    @Override
    public ActorFuture<ClusterConfiguration> cancelChange(long changeId) {
        ActorFuture<ClusterConfiguration> future = this.executor.createFuture();
        this.executor.run(() -> this.clusterTopologyManager.updateClusterConfiguration(clusterTopology -> {
            if (!this.validateCancel(changeId, clusterTopology, future)) {
                // validateCancel has already failed the future; leave the configuration untouched.
                return clusterTopology;
            }
            List<?> completedOperations = clusterTopology.pendingChanges().map(ClusterChangePlan::completedOperations).orElse(List.of());
            List<?> cancelledOperations = clusterTopology.pendingChanges().map(ClusterChangePlan::pendingOperations).orElse(List.of());
            LOG.warn("Cancelling configuration change '{}'. Following operations have been already applied: {}. Following pending operations won't be applied: {}", changeId, completedOperations, cancelledOperations);
            ClusterConfiguration cancelledTopology = clusterTopology.cancelPendingChanges();
            future.complete(cancelledTopology);
            return cancelledTopology;
        }).onComplete((ignored, updateError) -> {
            // Without this, a failure of the update before the future was completed (e.g. the
            // updater threw) would leave the caller waiting forever. A future that is already
            // completed must not be completed a second time, hence the isDone() guard.
            if (updateError != null && !future.isDone()) {
                this.failFuture(future, updateError);
            }
        }));
        return future;
    }

    /**
     * Fetches the current configuration, checks that this member may process the request,
     * generates the operations and hands them to
     * {@link #applyOrDryRunOnTopology(boolean, ClusterConfiguration, List, ActorFuture)}.
     *
     * @param dryRun when {@code true} the change is only simulated
     * @param request the change request
     * @return a future completed with the result of the (simulated) change
     */
    private ActorFuture<ConfigurationChangeCoordinator.ConfigurationChangeResult> applyOrDryRun(boolean dryRun, ConfigurationChangeCoordinator.ConfigurationChangeRequest request) {
        ActorFuture<ConfigurationChangeCoordinator.ConfigurationChangeResult> future = this.executor.createFuture();
        this.executor.run(() -> this.clusterTopologyManager.getClusterConfiguration().onComplete((currentClusterTopology, errorOnGettingTopology) -> {
            if (errorOnGettingTopology != null) {
                this.failFuture(future, errorOnGettingTopology);
                return;
            }
            // A forced request is processed even when this member is not the coordinator.
            if (!request.isForced() && !this.isCoordinator(currentClusterTopology)) {
                // NOTE(review): this appears to be java.lang.InternalError (no other InternalError
                // is imported); failFuture wraps it because it is not a
                // ClusterConfigurationRequestFailedException.
                this.failFuture(future, new InternalError(String.format("Cannot process request to change the configuration. The broker '%s' is not the coordinator.", this.localMemberId)));
                return;
            }
            Either<Exception, List<ClusterConfigurationChangeOperation>> generatedOperations = request.operations(currentClusterTopology);
            if (generatedOperations.isLeft()) {
                this.failFuture(future, generatedOperations.getLeft());
                return;
            }
            this.applyOrDryRunOnTopology(dryRun, currentClusterTopology, generatedOperations.get(), future);
        }, this.executor));
        return future;
    }

    /**
     * Validates the operations against the given configuration and then either starts the change
     * or, for a dry run, only computes the configuration with the change started.
     *
     * <p>An empty operation list completes the future immediately with the current configuration
     * as both the current and the expected configuration, and the id of the last completed change
     * (or {@code 0} if there is none).
     */
    private void applyOrDryRunOnTopology(boolean dryRun, ClusterConfiguration currentClusterConfiguration, List<ClusterConfigurationChangeOperation> operations, ActorFuture<ConfigurationChangeCoordinator.ConfigurationChangeResult> future) {
        if (operations.isEmpty()) {
            long lastChangeId = currentClusterConfiguration.lastChange().map(CompletedChange::id).orElse(0L);
            future.complete(new ConfigurationChangeCoordinator.ConfigurationChangeResult(currentClusterConfiguration, currentClusterConfiguration, lastChangeId, operations));
            return;
        }
        ActorFuture<ClusterConfiguration> validation = this.validateTopologyChangeRequest(currentClusterConfiguration, operations);
        validation.onComplete((simulatedFinalTopology, validationError) -> {
            if (validationError != null) {
                this.failFuture(future, validationError);
                return;
            }
            ActorFuture<ClusterConfiguration> applyFuture = this.executor.createFuture();
            if (dryRun) {
                // Dry run: compute the configuration with the change started, but do not submit it.
                applyFuture.complete(currentClusterConfiguration.startConfigurationChange(operations));
            } else {
                this.applyTopologyChange(operations, currentClusterConfiguration, simulatedFinalTopology, applyFuture);
            }
            applyFuture.onComplete((clusterTopologyWithPendingChanges, error) -> {
                if (error != null) {
                    this.failFuture(future, error);
                    return;
                }
                long changeId = clusterTopologyWithPendingChanges.pendingChanges().map(ClusterChangePlan::id).orElse(0L);
                future.complete(new ConfigurationChangeCoordinator.ConfigurationChangeResult(currentClusterConfiguration, simulatedFinalTopology, changeId, operations));
            });
        });
    }

    /**
     * Checks that a change may be started on the given configuration and simulates it using no-op
     * executors.
     *
     * @return a future completed with the simulated final configuration, or failed when the
     *     configuration is uninitialized, another change is pending, or the simulation fails
     */
    private ActorFuture<ClusterConfiguration> validateTopologyChangeRequest(ClusterConfiguration currentClusterConfiguration, List<ClusterConfigurationChangeOperation> operations) {
        ActorFuture<ClusterConfiguration> validationFuture = this.executor.createFuture();
        if (currentClusterConfiguration.isUninitialized()) {
            this.failFuture(validationFuture, new ClusterConfigurationRequestFailedException.OperationNotAllowed("Cannot apply configuration change. The configuration is not initialized."));
        } else if (currentClusterConfiguration.hasPendingChanges()) {
            this.failFuture(validationFuture, new ClusterConfigurationRequestFailedException.ConcurrentModificationException(String.format("Cannot apply configuration change. Another configuration change [%s] is in progress.", currentClusterConfiguration)));
        } else {
            // The appliers are wired with no-op executors, so the simulation only transforms the
            // configuration object.
            ConfigurationChangeAppliersImpl topologyChangeSimulator = new ConfigurationChangeAppliersImpl(new NoopPartitionChangeExecutor(), new NoopClusterMembershipChangeExecutor());
            ClusterConfiguration topologyWithPendingOperations = currentClusterConfiguration.startConfigurationChange(operations);
            this.simulateTopologyChange(topologyWithPendingOperations, topologyChangeSimulator, validationFuture);
        }
        return validationFuture;
    }

    /**
     * Starts the change through the configuration manager. The update is rejected with a
     * {@code ConcurrentModificationException} when the configuration seen by the manager differs
     * from the one the change was validated against.
     */
    private void applyTopologyChange(List<ClusterConfigurationChangeOperation> operations, ClusterConfiguration currentClusterConfiguration, ClusterConfiguration simulatedFinalTopology, ActorFuture<ClusterConfiguration> future) {
        this.executor.run(() -> this.clusterTopologyManager.updateClusterConfiguration(clusterTopology -> {
            if (!clusterTopology.equals(currentClusterConfiguration)) {
                throw new ClusterConfigurationRequestFailedException.ConcurrentModificationException("Topology changed while applying the change. Please retry.");
            }
            return clusterTopology.startConfigurationChange(operations);
        }).onComplete((topologyWithPendingOperations, errorOnUpdatingTopology) -> {
            if (errorOnUpdatingTopology != null) {
                this.failFuture(future, errorOnUpdatingTopology);
                return;
            }
            LOG.debug("Applying the topology change has started. The resulting topology will be {}", simulatedFinalTopology);
            future.complete(topologyWithPendingOperations);
        }));
    }

    /**
     * Applies the pending operations of {@code updatedTopology} one at a time with the given
     * appliers, recursing until no pending changes remain.
     *
     * @param updatedTopology configuration whose next pending operation is simulated
     * @param topologyChangeSimulator source of the applier for each operation
     * @param simulationCompleted completed with the final configuration, or failed with an
     *     {@code InvalidRequest} when an applier rejects or fails an operation
     */
    private void simulateTopologyChange(ClusterConfiguration updatedTopology, ConfigurationChangeAppliersImpl topologyChangeSimulator, ActorFuture<ClusterConfiguration> simulationCompleted) {
        if (!updatedTopology.hasPendingChanges()) {
            simulationCompleted.complete(updatedTopology);
            return;
        }
        ClusterConfigurationChangeOperation operation = updatedTopology.nextPendingOperation();
        ConfigurationChangeAppliers.ClusterOperationApplier applier = topologyChangeSimulator.getApplier(operation);
        Either<Exception, UnaryOperator<ClusterConfiguration>> result = applier.init(updatedTopology);
        if (result.isLeft()) {
            this.failFuture(simulationCompleted, new ClusterConfigurationRequestFailedException.InvalidRequest(result.getLeft()));
            return;
        }
        ClusterConfiguration initializedChanges = result.get().apply(updatedTopology);
        applier.apply().onComplete((topologyUpdater, error) -> {
            if (error != null) {
                this.failFuture(simulationCompleted, new ClusterConfigurationRequestFailedException.InvalidRequest(error));
                return;
            }
            ClusterConfiguration newTopology = initializedChanges.advanceConfigurationChange(topologyUpdater);
            this.simulateTopologyChange(newTopology, topologyChangeSimulator, simulationCompleted);
        });
    }

    /**
     * Logs the error and completes the future exceptionally. Errors that are not already a
     * {@link ClusterConfigurationRequestFailedException} are wrapped in its {@code InternalError}.
     */
    private void failFuture(ActorFuture<?> future, Throwable error) {
        LOG.warn("Failed to handle topology request", error);
        if (error instanceof ClusterConfigurationRequestFailedException) {
            future.completeExceptionally(error);
        } else {
            future.completeExceptionally(new ClusterConfigurationRequestFailedException.InternalError(error));
        }
    }

    /**
     * Checks whether the change with the given id can be cancelled on the given configuration.
     * On failure the future is failed with an {@code InvalidRequest} describing the reason.
     *
     * @return {@code true} if the cancel may proceed, {@code false} if the future was failed
     */
    private boolean validateCancel(long changeId, ClusterConfiguration currentClusterConfiguration, ActorFuture<ClusterConfiguration> future) {
        if (currentClusterConfiguration.isUninitialized()) {
            this.failFuture(future, new ClusterConfigurationRequestFailedException.InvalidRequest("Cannot cancel change " + changeId + " because the topology is not initialized"));
            return false;
        }
        if (!currentClusterConfiguration.hasPendingChanges()) {
            this.failFuture(future, new ClusterConfigurationRequestFailedException.InvalidRequest("Cannot cancel change " + changeId + " because no change is in progress"));
            return false;
        }
        ClusterChangePlan clusterChangePlan = currentClusterConfiguration.pendingChanges().orElseThrow();
        if (clusterChangePlan.id() != changeId) {
            this.failFuture(future, new ClusterConfigurationRequestFailedException.InvalidRequest("Cannot cancel change " + changeId + " because it is not the current change"));
            return false;
        }
        return true;
    }

    /**
     * Returns whether the local member is the coordinator, i.e. whether its id is the smallest
     * member id in the given configuration. Returns {@code false} when there are no members.
     */
    private boolean isCoordinator(ClusterConfiguration clusterConfiguration) {
        return this.localMemberId.equals(clusterConfiguration.members().keySet().stream().min(NodeId::compareTo).orElse(null));
    }
}

