/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.dynamic.config;

import io.atomix.cluster.MemberId;
import io.camunda.zeebe.dynamic.config.ClusterConfigurationInitializer;
import io.camunda.zeebe.dynamic.config.ClusterConfigurationManager;
import io.camunda.zeebe.dynamic.config.PersistedClusterConfiguration;
import io.camunda.zeebe.dynamic.config.changes.ConfigurationChangeAppliers;
import io.camunda.zeebe.dynamic.config.metrics.TopologyMetrics;
import io.camunda.zeebe.dynamic.config.state.ClusterConfiguration;
import io.camunda.zeebe.dynamic.config.state.ClusterConfigurationChangeOperation;
import io.camunda.zeebe.dynamic.config.state.MemberState;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.ExponentialBackoffRetryDelay;
import java.io.IOException;
import java.time.Duration;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default {@link ClusterConfigurationManager}: keeps the locally persisted cluster configuration,
 * merges configurations received via gossip into it, and applies the configuration change
 * operations that are pending for the local member.
 *
 * <p>Most entry points hand their work to the supplied {@link ConcurrencyControl}; the mutable
 * fields of this class are only touched from those callbacks. {@link
 * #setConfigurationGossiper(Consumer)} is the exception: it assigns the field directly, so it is
 * presumably meant to be called before {@link #start(ClusterConfigurationInitializer)} (TODO
 * confirm against callers).
 */
public final class ClusterConfigurationManagerImpl implements ClusterConfigurationManager {
    private static final Logger LOG = LoggerFactory.getLogger(ClusterConfigurationManagerImpl.class);
    /** Default lower bound handed to the retry backoff by the three-argument constructor. */
    private static final Duration MIN_RETRY_DELAY = Duration.ofSeconds(10L);
    /** Default upper bound handed to the retry backoff by the three-argument constructor. */
    private static final Duration MAX_RETRY_DELAY = Duration.ofMinutes(1L);
    private final ConcurrencyControl executor;
    private final PersistedClusterConfiguration persistedClusterConfiguration;
    private Consumer<ClusterConfiguration> configurationGossiper;
    private final ActorFuture<Void> startFuture;
    private ConfigurationChangeAppliers changeAppliers;
    private ClusterConfigurationManager.InconsistentConfigurationListener onInconsistentConfigurationDetected;
    private final MemberId localMemberId;
    /** True from the moment an operation is picked up until its apply step completes or its init fails. */
    private boolean onGoingConfigurationChangeOperation = false;
    /** Set when a failed operation has been scheduled for retry; lets the retry pass the on-going guard. */
    private boolean shouldRetry = false;
    private final ExponentialBackoffRetryDelay backoffRetry;
    /** Set once by {@link #setStarted()}; gossip received before that is forwarded but not merged. */
    private boolean initialized = false;

    /**
     * Creates a manager that uses the default retry delays ({@link #MIN_RETRY_DELAY} and {@link
     * #MAX_RETRY_DELAY}).
     *
     * @param executor concurrency control on which all state changes are run
     * @param localMemberId id of the member this manager runs on
     * @param persistedClusterConfiguration backing store of the local configuration
     */
    ClusterConfigurationManagerImpl(ConcurrencyControl executor, MemberId localMemberId, PersistedClusterConfiguration persistedClusterConfiguration) {
        this(executor, localMemberId, persistedClusterConfiguration, MIN_RETRY_DELAY, MAX_RETRY_DELAY);
    }

    /**
     * Creates a manager with explicit retry delay bounds.
     *
     * @param executor concurrency control on which all state changes are run
     * @param localMemberId id of the member this manager runs on
     * @param persistedClusterConfiguration backing store of the local configuration
     * @param minRetryDelay minimum delay passed to the retry backoff
     * @param maxRetryDelay maximum delay passed to the retry backoff
     */
    ClusterConfigurationManagerImpl(ConcurrencyControl executor, MemberId localMemberId, PersistedClusterConfiguration persistedClusterConfiguration, Duration minRetryDelay, Duration maxRetryDelay) {
        this.executor = executor;
        this.persistedClusterConfiguration = persistedClusterConfiguration;
        this.startFuture = executor.createFuture();
        this.localMemberId = localMemberId;
        this.backoffRetry = new ExponentialBackoffRetryDelay(maxRetryDelay, minRetryDelay);
    }

    /**
     * Reads the currently persisted configuration on the executor.
     *
     * @return a future completed with the persisted configuration
     */
    @Override
    public ActorFuture<ClusterConfiguration> getClusterConfiguration() {
        ActorFuture<ClusterConfiguration> future = executor.createFuture();
        executor.run(() -> future.complete(persistedClusterConfiguration.getConfiguration()));
        return future;
    }

    /**
     * Applies {@code configUpdater} to the persisted configuration, stores and gossips the result,
     * and then tries to apply any operation pending for the local member.
     *
     * @param configUpdater function producing the new configuration from the current one
     * @return a future completed with the updated configuration, or completed exceptionally when
     *     the updater throws or the update cannot be stored
     */
    @Override
    public ActorFuture<ClusterConfiguration> updateClusterConfiguration(UnaryOperator<ClusterConfiguration> configUpdater) {
        ActorFuture<ClusterConfiguration> future = executor.createFuture();
        executor.run(() -> {
            try {
                ClusterConfiguration updatedConfiguration = configUpdater.apply(persistedClusterConfiguration.getConfiguration());
                updateLocalConfiguration(updatedConfiguration).ifRightOrLeft(
                    updated -> {
                        // Complete the caller's future first; applying the pending operation is a follow-up.
                        future.complete(updated);
                        applyConfigurationChangeOperation(updatedConfiguration);
                    },
                    future::completeExceptionally);
            }
            catch (Exception e) {
                LOG.error("Failed to update cluster configuration", e);
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    /**
     * Starts the manager by running the given initializer, unless the start future is already done.
     *
     * @param clusterConfigurationInitializer source of the initial configuration
     * @return the start future; the same instance is returned on every call
     */
    ActorFuture<Void> start(ClusterConfigurationInitializer clusterConfigurationInitializer) {
        executor.run(() -> {
            if (!startFuture.isDone()) {
                initialize(clusterConfigurationInitializer);
            }
        });
        return startFuture;
    }

    /**
     * Sets the callback used to gossip configurations to other members. The field is assigned
     * directly, not via the executor.
     *
     * @param configurationGossiper consumer receiving every configuration that should be gossiped
     */
    public void setConfigurationGossiper(Consumer<ClusterConfiguration> configurationGossiper) {
        this.configurationGossiper = configurationGossiper;
    }

    /**
     * Runs the initializer and, on success, merges its result with the persisted configuration,
     * stores and gossips it, and marks the manager as started. Any failure completes the start
     * future exceptionally.
     */
    private void initialize(ClusterConfigurationInitializer clusterConfigurationInitializer) {
        clusterConfigurationInitializer.initialize().onComplete((configuration, error) -> {
            if (error != null) {
                LOG.error("Failed to initialize configuration", error);
                startFuture.completeExceptionally(error);
                return;
            }
            if (configuration.isUninitialized()) {
                String errorMessage = "Expected to initialize configuration, but got uninitialized configuration";
                LOG.error(errorMessage);
                startFuture.completeExceptionally(new IllegalStateException(errorMessage));
                return;
            }
            try {
                // Merge rather than overwrite, so whatever is already persisted is taken into account.
                persistedClusterConfiguration.update(configuration.merge(persistedClusterConfiguration.getConfiguration()));
                LOG.debug("Initialized cluster configuration '{}'", persistedClusterConfiguration.getConfiguration());
                configurationGossiper.accept(persistedClusterConfiguration.getConfiguration());
                setStarted();
            }
            catch (IOException e) {
                startFuture.completeExceptionally("Failed to start update cluster configuration", e);
            }
        });
    }

    /** Marks the manager as initialized and completes the start future, if it is not done yet. */
    private void setStarted() {
        if (!startFuture.isDone()) {
            initialized = true;
            startFuture.complete(null);
        }
    }

    /**
     * Handles a configuration received via gossip. Before initialization the configuration is only
     * forwarded to the gossiper. Afterwards it is merged into the persisted configuration; when the
     * merge changes anything, the result is stored, the inconsistency listener is notified if the
     * local member's state differs, the result is gossiped, and pending operations are applied.
     *
     * @param receivedConfiguration configuration received from another member; {@code null} is
     *     ignored once the manager is initialized
     */
    void onGossipReceived(ClusterConfiguration receivedConfiguration) {
        executor.run(() -> {
            if (!initialized) {
                LOG.trace("Received configuration {} before ClusterConfigurationManager is initialized.", receivedConfiguration);
                // Not merged into the local state yet; just pass it on.
                configurationGossiper.accept(receivedConfiguration);
                return;
            }
            try {
                if (receivedConfiguration == null) {
                    return;
                }
                ClusterConfiguration oldConfiguration = persistedClusterConfiguration.getConfiguration();
                ClusterConfiguration mergedConfiguration = oldConfiguration.merge(receivedConfiguration);
                if (mergedConfiguration.equals(oldConfiguration)) {
                    // Nothing new in the received configuration.
                    return;
                }
                LOG.debug("Received new configuration {}. Updating local configuration to {}", receivedConfiguration, mergedConfiguration);
                boolean isConflictingConfiguration = isConflictingConfiguration(mergedConfiguration, oldConfiguration);
                persistedClusterConfiguration.update(mergedConfiguration);
                if (isConflictingConfiguration && onInconsistentConfigurationDetected != null) {
                    onInconsistentConfigurationDetected.onInconsistentConfiguration(mergedConfiguration, oldConfiguration);
                }
                configurationGossiper.accept(mergedConfiguration);
                applyConfigurationChangeOperation(mergedConfiguration);
            }
            catch (IOException error) {
                LOG.warn("Failed to process cluster configuration received via gossip. '{}'", receivedConfiguration, error);
            }
        });
    }

    /**
     * Tells whether the merged configuration disagrees with the old one about the local member.
     *
     * @return {@code false} when the local member was in state {@code LEFT} and is absent from the
     *     merged configuration; otherwise {@code true} iff the local member's entry differs
     */
    private boolean isConflictingConfiguration(ClusterConfiguration mergedConfiguration, ClusterConfiguration oldConfiguration) {
        boolean removedAfterLeaving = !mergedConfiguration.hasMember(localMemberId)
            && oldConfiguration.hasMember(localMemberId)
            && oldConfiguration.getMember(localMemberId).state() == MemberState.State.LEFT;
        if (removedAfterLeaving) {
            return false;
        }
        return !Objects.equals(mergedConfiguration.getMember(localMemberId), oldConfiguration.getMember(localMemberId));
    }

    /**
     * An operation may be started when none is on-going (or a retry is due), the configuration has
     * a pending change for the local member, and appliers are registered.
     */
    private boolean shouldApplyConfigurationChangeOperation(ClusterConfiguration mergedConfiguration) {
        return (!onGoingConfigurationChangeOperation || shouldRetry)
            && mergedConfiguration.pendingChangesFor(localMemberId).isPresent()
            && changeAppliers != null;
    }

    /**
     * Picks up the operation pending for the local member, initializes it (storing the
     * configuration produced by the init step) and then applies it asynchronously. Does nothing
     * when {@link #shouldApplyConfigurationChangeOperation(ClusterConfiguration)} is false.
     */
    private void applyConfigurationChangeOperation(ClusterConfiguration mergedConfiguration) {
        if (!shouldApplyConfigurationChangeOperation(mergedConfiguration)) {
            return;
        }
        onGoingConfigurationChangeOperation = true;
        shouldRetry = false;
        ClusterConfigurationChangeOperation operation = mergedConfiguration.pendingChangesFor(localMemberId).orElseThrow();
        TopologyMetrics.OperationObserver observer = TopologyMetrics.observeOperation(operation);
        LOG.info("Applying configuration change operation {}", operation);
        ConfigurationChangeAppliers.ClusterOperationApplier operationApplier = changeAppliers.getApplier(operation);
        Either<Exception, ClusterConfiguration> operationInitialized = operationApplier
            .init(mergedConfiguration)
            .map(transformer -> transformer.apply(mergedConfiguration))
            .flatMap(this::updateLocalConfiguration);
        if (operationInitialized.isLeft()) {
            observer.failed();
            // No retry is scheduled here; clear the flag so a later trigger can start again.
            onGoingConfigurationChangeOperation = false;
            LOG.error("Failed to initialize configuration change operation {}", operation, operationInitialized.getLeft());
            return;
        }
        ClusterConfiguration initializedConfiguration = operationInitialized.get();
        operationApplier.apply().onComplete((transformer, error) -> onOperationApplied(initializedConfiguration, operation, transformer, error, observer));
    }

    /** Flags the operation for retry, logs the failure and schedules another attempt after the next backoff delay. */
    private void logAndScheduleRetry(ClusterConfigurationChangeOperation operation, Throwable error) {
        shouldRetry = true;
        Duration delay = backoffRetry.nextDelay();
        LOG.warn("Failed to apply configuration change operation {}. Will be retried in {}.", operation, delay, error);
        executor.schedule(delay, () -> {
            LOG.debug("Retrying last applied operation");
            applyConfigurationChangeOperation(persistedClusterConfiguration.getConfiguration());
        });
    }

    /**
     * Completion callback of an operation's apply step. On failure a retry is scheduled. On
     * success the configuration change is advanced with {@code transformer} and the next pending
     * operation is triggered, unless the persisted configuration's version no longer matches the
     * one the operation was started on.
     */
    private void onOperationApplied(ClusterConfiguration topologyOnWhichOperationIsApplied, ClusterConfigurationChangeOperation operation, UnaryOperator<ClusterConfiguration> transformer, Throwable error, TopologyMetrics.OperationObserver observer) {
        onGoingConfigurationChangeOperation = false;
        if (error != null) {
            observer.failed();
            logAndScheduleRetry(operation, error);
            return;
        }
        observer.applied();
        backoffRetry.reset();
        ClusterConfiguration currentConfiguration = persistedClusterConfiguration.getConfiguration();
        if (currentConfiguration.version() != topologyOnWhichOperationIsApplied.version()) {
            LOG.debug("Configuration changed while applying operation {}. Expected configuration is {}. Current configuration is {}. Most likely the change operation was cancelled.", operation, topologyOnWhichOperationIsApplied, currentConfiguration);
            return;
        }
        Either<Exception, ClusterConfiguration> advanced = updateLocalConfiguration(currentConfiguration.advanceConfigurationChange(transformer));
        if (advanced.isLeft()) {
            // Surface the failure instead of claiming success; the follow-up pass below then works
            // from whatever configuration is currently persisted.
            LOG.error("Failed to update local configuration after applying operation {}", operation, advanced.getLeft());
        } else {
            LOG.info("Operation {} applied. Updated local configuration to {}", operation, persistedClusterConfiguration.getConfiguration());
        }
        executor.run(() -> applyConfigurationChangeOperation(persistedClusterConfiguration.getConfiguration()));
    }

    /**
     * Stores and gossips {@code configuration} unless it equals the persisted one.
     *
     * @return right with {@code configuration} on success or when nothing changed; left with the
     *     exception thrown while storing or gossiping
     */
    private Either<Exception, ClusterConfiguration> updateLocalConfiguration(ClusterConfiguration configuration) {
        if (configuration.equals(persistedClusterConfiguration.getConfiguration())) {
            return Either.right(configuration);
        }
        try {
            persistedClusterConfiguration.update(configuration);
            configurationGossiper.accept(configuration);
            return Either.right(configuration);
        }
        catch (Exception e) {
            return Either.left(e);
        }
    }

    /**
     * Registers the appliers used to execute change operations and immediately tries to apply any
     * operation pending in the persisted configuration.
     */
    void registerTopologyChangeAppliers(ConfigurationChangeAppliers configurationChangeAppliers) {
        executor.run(() -> {
            changeAppliers = configurationChangeAppliers;
            applyConfigurationChangeOperation(persistedClusterConfiguration.getConfiguration());
        });
    }

    /** Unregisters the appliers; no new operation is started while none are registered. */
    void removeTopologyChangeAppliers() {
        executor.run(() -> changeAppliers = null);
    }

    /** Registers the listener notified when gossip changes the local member's state. */
    void registerTopologyChangedListener(ClusterConfigurationManager.InconsistentConfigurationListener listener) {
        executor.run(() -> onInconsistentConfigurationDetected = listener);
    }

    /** Unregisters the inconsistency listener. */
    void removeTopologyChangedListener() {
        executor.run(() -> onInconsistentConfigurationDetected = null);
    }
}

