/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.topology;

import io.atomix.cluster.MemberId;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.topology.ClusterTopologyManager;
import io.camunda.zeebe.topology.PersistedClusterTopology;
import io.camunda.zeebe.topology.TopologyInitializer;
import io.camunda.zeebe.topology.changes.TopologyChangeAppliers;
import io.camunda.zeebe.topology.metrics.TopologyManagerMetrics;
import io.camunda.zeebe.topology.state.ClusterTopology;
import io.camunda.zeebe.topology.state.MemberState;
import io.camunda.zeebe.topology.state.TopologyChangeOperation;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.ExponentialBackoffRetryDelay;
import java.io.IOException;
import java.time.Duration;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ClusterTopologyManager} implementation that keeps the locally persisted
 * {@link ClusterTopology} up to date, hands changed topologies to the configured gossiper and
 * applies the topology change operations that are pending for the local member.
 *
 * <p>The entry points ({@link #getClusterTopology()}, {@link #updateClusterTopology},
 * {@code start}, {@code onGossipReceived} and the register/remove methods) submit their work to
 * the {@link ConcurrencyControl} given to the constructor. {@link #setTopologyGossiper} is the
 * exception: it writes the field directly on the calling thread.
 */
public final class ClusterTopologyManagerImpl implements ClusterTopologyManager {
    private static final Logger LOG = LoggerFactory.getLogger(ClusterTopologyManagerImpl.class);
    /** Default minimum delay handed to the retry backoff. */
    private static final Duration MIN_RETRY_DELAY = Duration.ofSeconds(10L);
    /** Default maximum delay handed to the retry backoff. */
    private static final Duration MAX_RETRY_DELAY = Duration.ofMinutes(1L);

    private final ConcurrencyControl executor;
    private final PersistedClusterTopology persistedClusterTopology;
    // Receives every topology that should be shared with other members. It is dereferenced
    // without a null check, so it is presumably expected to be set before start/gossip handling.
    private Consumer<ClusterTopology> topologyGossiper;
    private final ActorFuture<Void> startFuture;
    // null while no appliers are registered; pending operations are not applied in that case.
    private TopologyChangeAppliers changeAppliers;
    // Optional listener, null when none is registered.
    private ClusterTopologyManager.InconsistentTopologyListener onInconsistentTopologyDetected;
    private final MemberId localMemberId;
    // true from the moment an operation is picked up until its apply future completes
    // (or its initialization fails).
    private boolean onGoingTopologyChangeOperation = false;
    // Set when a retry was scheduled; lets the retry pass the "operation on-going" guard.
    private boolean shouldRetry = false;
    private final ExponentialBackoffRetryDelay backoffRetry;
    // Becomes true once the start future is completed successfully.
    private boolean initialized = false;
    private final TopologyManagerMetrics topologyManagerMetrics;

    /**
     * Creates a manager that uses {@link #MIN_RETRY_DELAY} and {@link #MAX_RETRY_DELAY} for
     * retrying failed operations.
     */
    ClusterTopologyManagerImpl(
            final ConcurrencyControl executor,
            final MemberId localMemberId,
            final PersistedClusterTopology persistedClusterTopology,
            final TopologyManagerMetrics topologyManagerMetrics) {
        this(executor, localMemberId, persistedClusterTopology, topologyManagerMetrics, MIN_RETRY_DELAY, MAX_RETRY_DELAY);
    }

    /**
     * Creates a manager with explicit retry delays.
     *
     * @param executor used to run and schedule all work, and to create futures
     * @param localMemberId id of the member whose pending operations are applied
     * @param persistedClusterTopology the locally persisted topology
     * @param topologyMetrics used to observe applied operations
     * @param minRetryDelay minimum delay handed to the retry backoff
     * @param maxRetryDelay maximum delay handed to the retry backoff
     */
    ClusterTopologyManagerImpl(
            final ConcurrencyControl executor,
            final MemberId localMemberId,
            final PersistedClusterTopology persistedClusterTopology,
            final TopologyManagerMetrics topologyMetrics,
            final Duration minRetryDelay,
            final Duration maxRetryDelay) {
        this.topologyManagerMetrics = topologyMetrics;
        this.executor = executor;
        this.persistedClusterTopology = persistedClusterTopology;
        this.startFuture = executor.createFuture();
        this.localMemberId = localMemberId;
        this.backoffRetry = new ExponentialBackoffRetryDelay(maxRetryDelay, minRetryDelay);
    }

    /**
     * Returns a future that is completed with the currently persisted topology.
     */
    @Override
    public ActorFuture<ClusterTopology> getClusterTopology() {
        final ActorFuture<ClusterTopology> future = this.executor.createFuture();
        this.executor.run(() -> future.complete(this.persistedClusterTopology.getTopology()));
        return future;
    }

    /**
     * Applies {@code topologyUpdated} to the persisted topology and stores the result.
     *
     * <p>On success the future is completed with the updated topology and any operation pending
     * for the local member is started. If the update function throws or the topology cannot be
     * stored, the future is completed exceptionally.
     *
     * @param topologyUpdated function producing the new topology from the current one
     * @return future completed with the updated topology
     */
    @Override
    public ActorFuture<ClusterTopology> updateClusterTopology(final UnaryOperator<ClusterTopology> topologyUpdated) {
        final ActorFuture<ClusterTopology> future = this.executor.createFuture();
        this.executor.run(() -> {
            try {
                final ClusterTopology updatedTopology = topologyUpdated.apply(this.persistedClusterTopology.getTopology());
                this.updateLocalTopology(updatedTopology).ifRightOrLeft(updated -> {
                    future.complete(updated);
                    this.applyTopologyChangeOperation(updatedTopology);
                }, future::completeExceptionally);
            } catch (final Exception e) {
                LOG.error("Failed to update cluster topology", e);
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    /**
     * Starts the manager by initializing the topology with the given initializer. Does nothing if
     * the start future is already done.
     *
     * @param topologyInitializer provides the initial topology
     * @return the start future; completed once initialization succeeded or failed
     */
    ActorFuture<Void> start(final TopologyInitializer topologyInitializer) {
        this.executor.run(() -> {
            if (this.startFuture.isDone()) {
                return;
            }
            this.initialize(topologyInitializer);
        });
        return this.startFuture;
    }

    /**
     * Sets the consumer that receives every topology to be shared. Unlike the other mutators this
     * assignment is not submitted to the executor.
     */
    public void setTopologyGossiper(final Consumer<ClusterTopology> topologyGossiper) {
        this.topologyGossiper = topologyGossiper;
    }

    /**
     * Runs the initializer and, on success, merges its result with the persisted topology, stores
     * and gossips it, and marks the manager as started. Any failure completes the start future
     * exceptionally.
     */
    private void initialize(final TopologyInitializer topologyInitializer) {
        topologyInitializer.initialize().onComplete((topology, error) -> {
            if (error != null) {
                LOG.error("Failed to initialize topology", error);
                this.startFuture.completeExceptionally(error);
            } else if (topology.isUninitialized()) {
                final String errorMessage = "Expected to initialize topology, but got uninitialized topology";
                LOG.error(errorMessage);
                this.startFuture.completeExceptionally(new IllegalStateException(errorMessage));
            } else {
                try {
                    // Merge with whatever is persisted already instead of overwriting it.
                    this.persistedClusterTopology.update(topology.merge(this.persistedClusterTopology.getTopology()));
                    LOG.debug("Initialized topology '{}'", this.persistedClusterTopology.getTopology());
                    this.topologyGossiper.accept(this.persistedClusterTopology.getTopology());
                    this.setStarted();
                } catch (final IOException e) {
                    this.startFuture.completeExceptionally("Failed to start update cluster topology", e);
                }
            }
        });
    }

    /** Marks the manager as initialized and completes the start future, at most once. */
    private void setStarted() {
        if (!this.startFuture.isDone()) {
            this.initialized = true;
            this.startFuture.complete(null);
        }
    }

    /**
     * Handles a topology received via gossip.
     *
     * <p>Before the manager is initialized the received topology is only passed on to the
     * gossiper and the persisted topology is left untouched. Afterwards a non-null topology is
     * merged into the persisted one; if the merge result differs, it is persisted, the
     * inconsistent-topology listener is notified when the local member's entry conflicts, the
     * merged topology is gossiped and a pending operation for the local member is started.
     *
     * @param receivedTopology the topology received from another member
     */
    void onGossipReceived(final ClusterTopology receivedTopology) {
        this.executor.run(() -> {
            if (!this.initialized) {
                LOG.trace("Received topology {} before ClusterTopologyManager is initialized.", receivedTopology);
                this.topologyGossiper.accept(receivedTopology);
                return;
            }
            if (receivedTopology == null) {
                return;
            }
            try {
                final ClusterTopology oldTopology = this.persistedClusterTopology.getTopology();
                final ClusterTopology mergedTopology = oldTopology.merge(receivedTopology);
                if (mergedTopology.equals(oldTopology)) {
                    // Nothing new was learned; neither persist nor gossip again.
                    return;
                }
                LOG.debug("Received new topology {}. Updating local topology to {}", receivedTopology, mergedTopology);
                final boolean isConflictingTopology = this.isConflictingTopology(mergedTopology, oldTopology);
                this.persistedClusterTopology.update(mergedTopology);
                if (isConflictingTopology && this.onInconsistentTopologyDetected != null) {
                    this.onInconsistentTopologyDetected.onInconsistentLocalTopology(mergedTopology, oldTopology);
                }
                this.topologyGossiper.accept(mergedTopology);
                this.applyTopologyChangeOperation(mergedTopology);
            } catch (final IOException error) {
                LOG.warn("Failed to process cluster topology received via gossip. '{}'", receivedTopology, error);
            }
        });
    }

    /**
     * Tells whether the merged topology disagrees with the old topology about the local member.
     *
     * @return {@code false} if the local member is absent from the merged topology and was in
     *     state {@code LEFT} in the old one; otherwise {@code true} exactly when the local
     *     member's entries in both topologies are not equal
     */
    private boolean isConflictingTopology(final ClusterTopology mergedTopology, final ClusterTopology oldTopology) {
        final boolean removedAfterLeaving = !mergedTopology.hasMember(this.localMemberId)
            && oldTopology.hasMember(this.localMemberId)
            && oldTopology.getMember(this.localMemberId).state() == MemberState.State.LEFT;
        if (removedAfterLeaving) {
            return false;
        }
        return !Objects.equals(mergedTopology.getMember(this.localMemberId), oldTopology.getMember(this.localMemberId));
    }

    /**
     * An operation is started only if none is on-going (or a retry was requested), the topology
     * has a pending change for the local member, and appliers are registered.
     */
    private boolean shouldApplyTopologyChangeOperation(final ClusterTopology mergedTopology) {
        return (!this.onGoingTopologyChangeOperation || this.shouldRetry)
            && mergedTopology.pendingChangesFor(this.localMemberId).isPresent()
            && this.changeAppliers != null;
    }

    /**
     * Starts the operation pending for the local member in {@code mergedTopology}, if allowed by
     * {@link #shouldApplyTopologyChangeOperation}.
     *
     * <p>The operation is first initialized and the resulting topology stored; if that fails the
     * operation is marked failed and no retry is scheduled here. Otherwise the operation is
     * applied and {@link #onOperationApplied} handles the outcome.
     */
    private void applyTopologyChangeOperation(final ClusterTopology mergedTopology) {
        if (!this.shouldApplyTopologyChangeOperation(mergedTopology)) {
            return;
        }
        this.onGoingTopologyChangeOperation = true;
        this.shouldRetry = false;
        final TopologyChangeOperation operation = mergedTopology.pendingChangesFor(this.localMemberId).orElseThrow();
        final TopologyManagerMetrics.OperationObserver observer = this.topologyManagerMetrics.observeOperation(operation);
        LOG.info("Applying topology change operation {}", operation);
        final TopologyChangeAppliers.ClusterOperationApplier operationApplier = this.changeAppliers.getApplier(operation);
        final Either<Exception, ClusterTopology> operationInitialized = operationApplier.init(mergedTopology)
            .map(transformer -> transformer.apply(mergedTopology))
            .flatMap(this::updateLocalTopology);
        if (operationInitialized.isLeft()) {
            observer.failed();
            this.onGoingTopologyChangeOperation = false;
            LOG.error("Failed to initialize topology change operation {}", operation, operationInitialized.getLeft());
            return;
        }
        final ClusterTopology initializedTopology = operationInitialized.get();
        operationApplier.apply().onComplete((transformer, error) ->
            this.onOperationApplied(initializedTopology, operation, transformer, error, observer));
    }

    /**
     * Requests a retry, logs the failure and schedules re-applying the pending operation of the
     * persisted topology after the next backoff delay.
     */
    private void logAndScheduleRetry(final TopologyChangeOperation operation, final Throwable error) {
        this.shouldRetry = true;
        final Duration delay = this.backoffRetry.nextDelay();
        LOG.warn("Failed to apply topology change operation {}. Will be retried in {}.", operation, delay, error);
        this.executor.schedule(delay, () -> {
            LOG.debug("Retrying last applied operation");
            this.applyTopologyChangeOperation(this.persistedClusterTopology.getTopology());
        });
    }

    /**
     * Handles the completion of an applied operation.
     *
     * <p>On failure a retry is scheduled. On success the topology change is advanced and stored,
     * unless the persisted topology's version changed while the operation was running, in which
     * case the result is dropped. Afterwards the next pending operation, if any, is started.
     *
     * @param topologyOnWhichOperationIsApplied topology that was stored when the operation started
     * @param operation the operation that completed
     * @param transformer topology update produced by the operation; only used on success
     * @param error failure of the operation, or {@code null} on success
     * @param observer metrics observer of this operation
     */
    private void onOperationApplied(
            final ClusterTopology topologyOnWhichOperationIsApplied,
            final TopologyChangeOperation operation,
            final UnaryOperator<ClusterTopology> transformer,
            final Throwable error,
            final TopologyManagerMetrics.OperationObserver observer) {
        this.onGoingTopologyChangeOperation = false;
        if (error != null) {
            observer.failed();
            this.logAndScheduleRetry(operation, error);
            return;
        }

        observer.applied();
        this.backoffRetry.reset();
        if (this.persistedClusterTopology.getTopology().version() != topologyOnWhichOperationIsApplied.version()) {
            LOG.debug(
                "Topology changed while applying operation {}. Expected topology is {}. Current topology is {}. Most likely the change operation was cancelled.",
                operation,
                topologyOnWhichOperationIsApplied,
                this.persistedClusterTopology.getTopology());
            return;
        }

        final Either<Exception, ClusterTopology> advanced =
            this.updateLocalTopology(this.persistedClusterTopology.getTopology().advanceTopologyChange(transformer));
        if (advanced.isLeft()) {
            // Storing the advanced topology failed, so the operation is still pending in the
            // persisted topology. Do not report success and do not re-trigger immediately;
            // go through the backoff retry instead of silently dropping the error.
            this.logAndScheduleRetry(operation, advanced.getLeft());
            return;
        }
        LOG.info("Operation {} applied. Updated local topology to {}", operation, this.persistedClusterTopology.getTopology());
        this.executor.run(() -> this.applyTopologyChangeOperation(this.persistedClusterTopology.getTopology()));
    }

    /**
     * Stores and gossips {@code topology} unless it equals the persisted topology.
     *
     * @return the given topology on success (also when nothing had to be stored), or the
     *     exception thrown while storing or gossiping
     */
    private Either<Exception, ClusterTopology> updateLocalTopology(final ClusterTopology topology) {
        if (topology.equals(this.persistedClusterTopology.getTopology())) {
            return Either.right(topology);
        }
        try {
            this.persistedClusterTopology.update(topology);
            this.topologyGossiper.accept(topology);
            return Either.right(topology);
        } catch (final Exception e) {
            return Either.left(e);
        }
    }

    /**
     * Registers the appliers and immediately starts an operation pending for the local member in
     * the persisted topology, if there is one.
     */
    void registerTopologyChangeAppliers(final TopologyChangeAppliers topologyChangeAppliers) {
        this.executor.run(() -> {
            this.changeAppliers = topologyChangeAppliers;
            this.applyTopologyChangeOperation(this.persistedClusterTopology.getTopology());
        });
    }

    /** Unregisters the appliers; no new operation is started until appliers are registered again. */
    void removeTopologyChangeAppliers() {
        this.executor.run(() -> this.changeAppliers = null);
    }

    /** Registers the listener notified when gossip conflicts with the local member's entry. */
    void registerTopologyChangedListener(final ClusterTopologyManager.InconsistentTopologyListener listener) {
        this.executor.run(() -> this.onInconsistentTopologyDetected = listener);
    }

    /** Removes the inconsistent-topology listener. */
    void removeTopologyChangedListener() {
        this.executor.run(() -> this.onInconsistentTopologyDetected = null);
    }
}

