/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.topology;

import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.Member;
import io.atomix.cluster.MemberId;
import io.atomix.utils.Version;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.topology.PersistedClusterTopology;
import io.camunda.zeebe.topology.StaticConfiguration;
import io.camunda.zeebe.topology.TopologyUpdateNotifier;
import io.camunda.zeebe.topology.serializer.ClusterTopologySerializer;
import io.camunda.zeebe.topology.state.ClusterTopology;
import java.nio.file.Path;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public interface TopologyInitializer {
    /** Logger used by the chaining logic behind {@link #orThen} and {@link #recover}. */
    public static final Logger LOG = LoggerFactory.getLogger(TopologyInitializer.class);

    /**
     * Starts the initialization of the cluster topology.
     *
     * <p>The implementations in this file complete the returned future with
     * {@link ClusterTopology#uninitialized()} when they cannot determine a topology themselves;
     * {@link #orThen} checks {@code isUninitialized()} on the result to decide whether the next
     * initializer has to run.
     *
     * @return a future that is completed with the resulting topology, or completed exceptionally
     *     when initialization failed
     */
    public ActorFuture<ClusterTopology> initialize();

    /**
     * Chains {@code after} behind this initializer.
     *
     * <p>The returned initializer first runs this one. If it fails, the failure is logged and
     * propagated. If it succeeds with an uninitialized topology, {@code after} is run and its
     * outcome (value or failure) becomes the outcome of the chain. Otherwise the topology produced
     * by this initializer is used as is.
     *
     * @param after the initializer to fall through to when this one yields an uninitialized topology
     * @return an initializer combining both steps
     */
    default TopologyInitializer orThen(final TopologyInitializer after) {
        final TopologyInitializer actual = this;
        return () -> {
            final CompletableActorFuture<ClusterTopology> chainedInitialize = new CompletableActorFuture<>();
            actual.initialize().onComplete((topology, error) -> {
                if (error != null) {
                    LOG.error("Failed to initialize topology", error);
                    chainedInitialize.completeExceptionally(error);
                } else if (topology.isUninitialized()) {
                    // The chained future is itself the completion callback, so it simply mirrors
                    // whatever the follow-up initializer produces.
                    after.initialize().onComplete(chainedInitialize);
                } else {
                    chainedInitialize.complete(topology);
                }
            });
            return chainedInitialize;
        };
    }

    /**
     * Installs a fallback for a specific kind of initialization failure.
     *
     * <p>The returned initializer first runs this one. If it fails with an error that is an
     * instance of {@code exception}, a warning is logged and {@code recovery} is run instead; its
     * outcome becomes the outcome of the chain. Any other failure is propagated unchanged, and a
     * successful result is passed through untouched.
     *
     * @param exception the error type that triggers the fallback
     * @param recovery the initializer to run when such an error occurred
     * @return an initializer that recovers from the given error type
     */
    default TopologyInitializer recover(final Class<? extends InitializerError> exception, final TopologyInitializer recovery) {
        final TopologyInitializer actual = this;
        return () -> {
            final CompletableActorFuture<ClusterTopology> chainedInitialize = new CompletableActorFuture<>();
            actual.initialize().onComplete((topology, error) -> {
                if (error == null) {
                    chainedInitialize.complete(topology);
                } else if (exception.isInstance(error)) {
                    LOG.warn("Recovering from {} by falling back to {}", error, recovery);
                    recovery.initialize().onComplete(chainedInitialize);
                } else {
                    chainedInitialize.completeExceptionally(error);
                }
            });
            return chainedInitialize;
        };
    }

    /**
     * Marker for the failures an initializer can report; the set of implementations is closed
     * (sealed) and limited to the types nested here.
     */
    sealed interface InitializerError {

        /** Signals that the persisted topology file could not be read. */
        final class PersistedTopologyIsBroken extends RuntimeException implements InitializerError {

            /**
             * @param file the topology file that could not be loaded; rendered into the message
             * @param cause the underlying failure
             */
            public PersistedTopologyIsBroken(Path file, Throwable cause) {
                super(String.format("File %s is corrupted", file), cause);
            }
        }
    }

    /**
     * Initializer that falls back to the static configuration when it detects that the local
     * member runs version 8.4 while at least one other broker still runs version 8.3.
     *
     * <p>In every other situation it completes with {@link ClusterTopology#uninitialized()}, except
     * when no other broker is visible yet: then it re-checks after {@link #RETRY_DELAY}.
     */
    public static class RollingUpdateAwareInitializerV83ToV84
    implements TopologyInitializer {
        private static final Logger LOGGER = LoggerFactory.getLogger(RollingUpdateAwareInitializerV83ToV84.class);
        private static final Duration RETRY_DELAY = Duration.ofMillis(200L);
        private final ClusterMembershipService membershipService;
        private final StaticInitializer staticInitializer;
        private final int staticClusterSize;
        private final ConcurrencyControl executor;
        private final CompletableActorFuture<ClusterTopology> initializeFuture;

        /**
         * @param membershipService source of the local member and the currently known members
         * @param staticConfiguration configuration used for the static fallback; its member count
         *     decides whether this is a single-member cluster
         * @param executor used to schedule the retry while no other broker is known
         */
        public RollingUpdateAwareInitializerV83ToV84(ClusterMembershipService membershipService, StaticConfiguration staticConfiguration, ConcurrencyControl executor) {
            this.membershipService = membershipService;
            this.staticInitializer = new StaticInitializer(staticConfiguration);
            this.staticClusterSize = staticConfiguration.clusterMembers().size();
            this.executor = executor;
            this.initializeFuture = new CompletableActorFuture<>();
        }

        /**
         * Decides, based on the versions visible through the membership service, whether the
         * topology must be generated from the static configuration.
         *
         * @return the same future on every call; retries scheduled by this method complete it later
         */
        @Override
        public ActorFuture<ClusterTopology> initialize() {
            if (staticClusterSize == 1) {
                // A single statically configured member has no peer to roll over from.
                initializeFuture.complete(ClusterTopology.uninitialized());
                return initializeFuture;
            }

            final Version localVersion = membershipService.getLocalMember().version();
            if (!isVersion84(localVersion)) {
                LOGGER.trace("Cluster is not doing rolling update. Will not initialize cluster topology.");
                initializeFuture.complete(ClusterTopology.uninitialized());
                return initializeFuture;
            }

            if (!hasOtherReachableBrokers(membershipService)) {
                LOGGER.debug("No other members are reachable. Cannot initialize topology. Will retry in {}", RETRY_DELAY);
                executor.schedule(RETRY_DELAY, this::initialize);
            } else if (isRollingUpdate(membershipService)) {
                LOGGER.debug("Cluster is doing rolling update. Cannot initialize cluster topology via gossip. Initializing cluster topology from static configuration.");
                staticInitializer.initialize().onComplete(initializeFuture);
            } else {
                LOGGER.trace("Cluster is not doing rolling update. Will not initialize cluster topology.");
                initializeFuture.complete(ClusterTopology.uninitialized());
            }
            return initializeFuture;
        }

        /** Returns true when at least one broker other than the local member is known. */
        private boolean hasOtherReachableBrokers(ClusterMembershipService membershipService) {
            final MemberId localId = membershipService.getLocalMember().id();
            return membershipService.getMembers().stream()
                .filter(this::isBroker)
                .map(Member::id)
                .anyMatch(memberId -> !memberId.equals(localId));
        }

        /**
         * Returns true when at least one other broker reports version 8.3.
         *
         * <p>Members without a version are ignored. If no other broker reports a version at all,
         * the situation is treated as "not a rolling update".
         */
        private boolean isRollingUpdate(ClusterMembershipService membershipService) {
            final MemberId localId = membershipService.getLocalMember().id();
            // Drop null versions up front: a mix of known and unknown versions must not lead to a
            // NullPointerException when the remaining versions are inspected.
            final List<Version> knownRemoteVersions = membershipService.getMembers().stream()
                .filter(this::isBroker)
                .filter(member -> !member.id().equals(localId))
                .map(Member::version)
                .filter(Objects::nonNull)
                .toList();
            if (knownRemoteVersions.isEmpty()) {
                LOGGER.warn("Cannot determine version of remote members. Assuming this is not rolling update and skip initialization.");
                return false;
            }
            return knownRemoteVersions.stream().anyMatch(this::isVersion83);
        }

        /** A member counts as a broker when its id parses as a non-negative integer. */
        private boolean isBroker(Member member) {
            try {
                return Integer.parseInt(member.id().id()) >= 0;
            }
            catch (NumberFormatException e) {
                return false;
            }
        }

        /** Returns true for version 8.4; a missing version is logged and treated as "not 8.4". */
        private boolean isVersion84(Version version) {
            if (version == null) {
                LOGGER.warn("Cannot determine version of local member. Assuming this is not rolling update and skip initialization.");
                return false;
            }
            return version.major() == 8 && version.minor() == 4;
        }

        /** Returns true for version 8.3; a missing version is treated as "not 8.3". */
        private boolean isVersion83(Version version) {
            return version != null && version.major() == 8 && version.minor() == 3;
        }
    }

    /** Initializer that derives the topology from the provided {@link StaticConfiguration}. */
    public static class StaticInitializer
    implements TopologyInitializer {
        private static final Logger LOGGER = LoggerFactory.getLogger(StaticInitializer.class);
        private final StaticConfiguration staticConfiguration;

        /**
         * @param staticConfiguration the configuration the topology is generated from
         */
        public StaticInitializer(StaticConfiguration staticConfiguration) {
            this.staticConfiguration = staticConfiguration;
        }

        /**
         * Generates the topology synchronously.
         *
         * @return an already completed future: the generated topology, or the exception thrown
         *     while generating or logging it
         */
        @Override
        public ActorFuture<ClusterTopology> initialize() {
            try {
                final ClusterTopology generated = staticConfiguration.generateTopology();
                LOGGER.debug("Generated cluster topology from provided configuration. {}", generated);
                return CompletableActorFuture.completed(generated);
            }
            catch (Exception failure) {
                return CompletableActorFuture.completedExceptionally(failure);
            }
        }
    }

    /**
     * Initializer that asks a list of known members for their topology and additionally listens
     * for topology updates while it waits.
     *
     * <p>It completes with the first answer that carries a topology: an initialized one from a
     * query or an update notification, or an uninitialized one reported by a queried member.
     * Queries that fail or return {@code null} are repeated after {@link #SYNC_QUERY_RETRY_DELAY}
     * for as long as the result is still open.
     */
    public static class SyncInitializer
    implements TopologyInitializer,
    TopologyUpdateNotifier.TopologyUpdateListener {
        private static final Logger LOGGER = LoggerFactory.getLogger(SyncInitializer.class);
        private static final Duration SYNC_QUERY_RETRY_DELAY = Duration.ofSeconds(5L);
        private final TopologyUpdateNotifier topologyUpdateNotifier;
        private final ActorFuture<ClusterTopology> initialized;
        private final List<MemberId> knownMembersToSync;
        private final ConcurrencyControl executor;
        private final Function<MemberId, ActorFuture<ClusterTopology>> syncRequester;

        /**
         * @param topologyUpdateNotifier where this initializer registers itself as a listener
         * @param knownMembersToSync the members to query; when empty no query is sent
         * @param executor used to schedule retries and to run the update handling
         * @param syncRequester sends a sync query to a member and returns the pending answer
         */
        public SyncInitializer(TopologyUpdateNotifier topologyUpdateNotifier, List<MemberId> knownMembersToSync, ConcurrencyControl executor, Function<MemberId, ActorFuture<ClusterTopology>> syncRequester) {
            this.topologyUpdateNotifier = topologyUpdateNotifier;
            this.knownMembersToSync = knownMembersToSync;
            this.executor = executor;
            this.syncRequester = syncRequester;
            this.initialized = new CompletableActorFuture<>();
        }

        /**
         * Registers for topology updates and queries every known member.
         *
         * @return a future completed with the topology obtained from a member or an update; it is
         *     completed immediately with an uninitialized topology when there is nobody to query
         */
        @Override
        public ActorFuture<ClusterTopology> initialize() {
            if (knownMembersToSync.isEmpty()) {
                initialized.complete(ClusterTopology.uninitialized());
            } else {
                LOGGER.debug("Querying members {} before initializing ClusterTopology", knownMembersToSync);
                topologyUpdateNotifier.addUpdateListener(this);
                knownMembersToSync.forEach(this::tryInitializeFrom);
            }
            return initialized;
        }

        /** Queries one member and either completes the result or schedules another attempt. */
        private void tryInitializeFrom(MemberId memberId) {
            requestSync(memberId).onComplete((topology, error) -> {
                if (initialized.isDone()) {
                    return;
                }
                if (error != null) {
                    LOGGER.trace("Failed to get a response for cluster topology sync query to {}. Will retry.", memberId, error);
                } else if (topology == null) {
                    LOGGER.trace("Received null cluster topology from {}. Will retry.", memberId);
                } else if (topology.isUninitialized()) {
                    LOGGER.trace("Cluster topology is uninitialized in {}", memberId);
                    initialized.complete(topology);
                    // The result is final now; without deregistering, this instance would stay
                    // subscribed and be notified on every later update for nothing.
                    topologyUpdateNotifier.removeUpdateListener(this);
                    return;
                } else {
                    LOGGER.debug("Received cluster topology {} from {}", topology, memberId);
                    onTopologyUpdated(topology);
                    return;
                }
                // Only the two "no usable answer" cases reach this point.
                if (!initialized.isDone()) {
                    executor.schedule(SYNC_QUERY_RETRY_DELAY, () -> tryInitializeFrom(memberId));
                }
            });
        }

        private ActorFuture<ClusterTopology> requestSync(MemberId memberId) {
            return syncRequester.apply(memberId);
        }

        /**
         * Completes the result with the first initialized topology that arrives and then
         * deregisters this listener. The work is handed to the executor.
         */
        @Override
        public void onTopologyUpdated(ClusterTopology clusterTopology) {
            executor.run(() -> {
                if (initialized.isDone()) {
                    return;
                }
                if (!clusterTopology.isUninitialized()) {
                    initialized.complete(clusterTopology);
                    topologyUpdateNotifier.removeUpdateListener(this);
                }
            });
        }
    }

    /**
     * Initializer that waits until an initialized topology arrives through a topology update
     * notification.
     */
    public static class GossipInitializer
    implements TopologyInitializer,
    TopologyUpdateNotifier.TopologyUpdateListener {
        private static final Logger LOGGER = LoggerFactory.getLogger(GossipInitializer.class);
        private final TopologyUpdateNotifier topologyUpdateNotifier;
        private final PersistedClusterTopology persistedClusterTopology;
        private final Consumer<ClusterTopology> topologyGossiper;
        private final ActorFuture<ClusterTopology> initialized;
        private final ConcurrencyControl executor;

        /**
         * @param topologyUpdateNotifier where this initializer registers itself as a listener
         * @param persistedClusterTopology the locally persisted topology
         * @param topologyGossiper receives the local topology when it is still uninitialized
         * @param executor runs the handling of incoming updates
         */
        public GossipInitializer(TopologyUpdateNotifier topologyUpdateNotifier, PersistedClusterTopology persistedClusterTopology, Consumer<ClusterTopology> topologyGossiper, ConcurrencyControl executor) {
            this.topologyUpdateNotifier = topologyUpdateNotifier;
            this.persistedClusterTopology = persistedClusterTopology;
            this.topologyGossiper = topologyGossiper;
            this.executor = executor;
            this.initialized = new CompletableActorFuture<>();
        }

        /**
         * Subscribes to topology updates and, if the persisted topology is uninitialized, hands it
         * to the gossiper.
         *
         * @return a future completed once an initialized topology has been received
         */
        @Override
        public ActorFuture<ClusterTopology> initialize() {
            LOGGER.debug("Waiting for initial cluster topology via gossip.");
            topologyUpdateNotifier.addUpdateListener(this);

            final boolean nothingPersistedYet = persistedClusterTopology.isUninitialized();
            if (nothingPersistedYet) {
                topologyGossiper.accept(persistedClusterTopology.getTopology());
            }
            return initialized;
        }

        /**
         * Completes the result with the first initialized topology and unsubscribes afterwards;
         * updates that arrive later or are uninitialized are ignored.
         */
        @Override
        public void onTopologyUpdated(ClusterTopology clusterTopology) {
            executor.run(() -> {
                if (initialized.isDone() || clusterTopology.isUninitialized()) {
                    return;
                }
                LOGGER.debug("Received cluster topology {} via gossip.", clusterTopology);
                initialized.complete(clusterTopology);
                topologyUpdateNotifier.removeUpdateListener(this);
            });
        }
    }

    /** Initializer that loads the topology persisted in a file. */
    public static class FileInitializer
    implements TopologyInitializer {
        private static final Logger LOGGER = LoggerFactory.getLogger(FileInitializer.class);
        private final Path topologyFile;
        private final ClusterTopologySerializer serializer;

        /**
         * @param topologyFile the file holding the persisted topology
         * @param serializer passed on to {@link PersistedClusterTopology#ofFile} together with the file
         */
        public FileInitializer(Path topologyFile, ClusterTopologySerializer serializer) {
            this.topologyFile = topologyFile;
            this.serializer = serializer;
        }

        /**
         * Loads the persisted topology synchronously.
         *
         * @return an already completed future: the topology read from the file, or a
         *     {@link InitializerError.PersistedTopologyIsBroken} wrapping whatever went wrong
         */
        @Override
        public ActorFuture<ClusterTopology> initialize() {
            try {
                final PersistedClusterTopology persisted = PersistedClusterTopology.ofFile(topologyFile, serializer);
                final ClusterTopology loaded = persisted.getTopology();
                final boolean hasContent = !loaded.isUninitialized();
                if (hasContent) {
                    LOGGER.debug("Initialized cluster topology '{}' from file '{}'", loaded, topologyFile);
                }
                return CompletableActorFuture.completed(loaded);
            }
            catch (Exception failure) {
                final InitializerError.PersistedTopologyIsBroken broken = new InitializerError.PersistedTopologyIsBroken(topologyFile, failure);
                return CompletableActorFuture.completedExceptionally(broken);
            }
        }
    }
}

