/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.cluster;

import io.atomix.AtomixReplica;
import io.atomix.catalyst.util.Assert;
import io.atomix.cluster.ClusterManager;
import io.atomix.cluster.Quorum;
import io.atomix.copycat.error.ConfigurationException;
import io.atomix.copycat.server.cluster.Cluster;
import io.atomix.copycat.server.cluster.Member;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BalancingClusterManager
implements ClusterManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(BalancingClusterManager.class);
    private final int quorumHint;
    private final int backupCount;
    private boolean closed;

    public static Builder builder() {
        return new Builder();
    }

    public BalancingClusterManager(int quorumHint, int backupCount) {
        this.quorumHint = quorumHint;
        this.backupCount = backupCount;
    }

    @Override
    public CompletableFuture<Void> start(Cluster cluster, AtomixReplica replica) {
        cluster.members().forEach(m -> {
            m.onTypeChange(t -> this.balance(cluster));
            m.onStatusChange(s -> this.balance(cluster));
        });
        cluster.onLeaderElection(l -> this.balance(cluster));
        cluster.onJoin(m -> {
            m.onTypeChange(t -> this.balance(cluster));
            m.onStatusChange(s -> this.balance(cluster));
            this.balance(cluster);
        });
        cluster.onLeave(m -> this.balance(cluster));
        return null;
    }

    public CompletableFuture<Void> balance(Cluster cluster) {
        if (cluster.member().equals(cluster.leader())) {
            LOGGER.info("Balancing cluster...");
            return this.balance(cluster, new CompletableFuture<Void>());
        }
        return CompletableFuture.completedFuture(null);
    }

    private CompletableFuture<Void> balance(Cluster cluster, CompletableFuture<Void> future) {
        if (this.closed) {
            future.completeExceptionally(new IllegalStateException("balancer closed"));
            return future;
        }
        Collection<Member> members = cluster.members();
        Member member = cluster.member();
        Collection active = members.stream().filter(m -> m.type() == Member.Type.ACTIVE).collect(Collectors.toList());
        Collection passive = members.stream().filter(m -> m.type() == Member.Type.PASSIVE).collect(Collectors.toList());
        Collection reserve = members.stream().filter(m -> m.type() == Member.Type.RESERVE).collect(Collectors.toList());
        int totalActiveCount = active.size();
        int totalPassiveCount = passive.size();
        long availableActiveCount = active.stream().filter(m -> m.status() == Member.Status.AVAILABLE).count();
        long availablePassiveCount = passive.stream().filter(m -> m.status() == Member.Status.AVAILABLE).count();
        long availableReserveCount = reserve.stream().filter(m -> m.status() == Member.Status.AVAILABLE).count();
        BiConsumer<Void, Throwable> completeFunction = (result, error) -> {
            if (error == null || error.getCause() instanceof ConfigurationException) {
                this.balance(cluster, future);
            } else {
                future.completeExceptionally((Throwable)error);
            }
        };
        if (this.quorumHint == Quorum.ALL.size() || availableActiveCount < (long)this.quorumHint) {
            if (availablePassiveCount > 0L) {
                Member promote = passive.stream().filter(m -> m.status() == Member.Status.AVAILABLE).findFirst().get();
                LOGGER.info("Promoting {} to ACTIVE: not enough active members", (Object)promote.address());
                promote.promote(Member.Type.ACTIVE).whenComplete((BiConsumer)completeFunction);
                return future;
            }
            if (availableReserveCount > 0L) {
                Member promote = reserve.stream().filter(m -> m.status() == Member.Status.AVAILABLE).findFirst().get();
                LOGGER.info("Promoting {} to ACTIVE: not enough active members", (Object)promote.address());
                promote.promote(Member.Type.ACTIVE).whenComplete((BiConsumer)completeFunction);
                return future;
            }
        }
        if (this.quorumHint != Quorum.ALL.size() && totalActiveCount > this.quorumHint) {
            if (availablePassiveCount < (long)((this.quorumHint - 1) * this.backupCount)) {
                Member demote = active.stream().filter(m -> m.status() == Member.Status.UNAVAILABLE).findFirst().orElseGet(() -> active.stream().filter(m -> !m.equals(member)).findAny().get());
                LOGGER.info("Demoting {} to PASSIVE: too many active members", (Object)demote.address());
                demote.demote(Member.Type.PASSIVE).whenComplete((BiConsumer)completeFunction);
                return future;
            }
            Member demote = active.stream().filter(m -> m.status() == Member.Status.UNAVAILABLE).findAny().orElseGet(() -> active.stream().filter(m -> !m.equals(member)).findAny().get());
            LOGGER.info("Demoting {} to RESERVE: too many active members", (Object)demote.address());
            demote.demote(Member.Type.RESERVE).whenComplete((BiConsumer)completeFunction);
            return future;
        }
        if (this.quorumHint != Quorum.ALL.size() && availablePassiveCount < (long)((this.quorumHint - 1) * this.backupCount) && availableReserveCount > 0L) {
            Member promote = reserve.stream().filter(m -> m.status() == Member.Status.AVAILABLE).findFirst().get();
            LOGGER.info("Promoting {} to PASSIVE: not enough passive members", (Object)promote.address());
            promote.promote(Member.Type.PASSIVE).whenComplete((BiConsumer)completeFunction);
            return future;
        }
        if (this.quorumHint != Quorum.ALL.size() && totalPassiveCount > (this.quorumHint - 1) * this.backupCount) {
            Member demote = passive.stream().filter(m -> m.status() == Member.Status.UNAVAILABLE).findAny().orElseGet(() -> (Member)passive.stream().findAny().get());
            LOGGER.info("Demoting {} to RESERVE: too many passive members", (Object)demote.address());
            demote.demote(Member.Type.RESERVE).whenComplete((BiConsumer)completeFunction);
            return future;
        }
        future.complete(null);
        return future;
    }

    @Override
    public CompletableFuture<Void> stop(Cluster cluster, AtomixReplica replica) {
        LOGGER.debug("Balancing cluster...");
        return this.replace(cluster, new CompletableFuture<Void>()).whenComplete((result, error) -> {
            this.closed = true;
        });
    }

    private CompletableFuture<Void> replace(Cluster cluster, CompletableFuture<Void> future) {
        if (this.closed) {
            future.completeExceptionally(new IllegalStateException("cluster balancer closed"));
            return future;
        }
        BiConsumer<Void, Throwable> completeFunction = (result, error) -> {
            if (error == null) {
                future.complete(null);
            } else if (error.getCause() instanceof ConfigurationException) {
                this.replace(cluster, future);
            } else {
                future.completeExceptionally((Throwable)error);
            }
        };
        Function<Void, CompletableFuture> demoteFunction = v -> {
            long passiveCount = cluster.members().stream().filter(m -> m.type() == Member.Type.PASSIVE).count();
            if (passiveCount < (long)((this.quorumHint - 1) * this.backupCount)) {
                LOGGER.info("Demoting {} to PASSIVE", (Object)cluster.member().address());
                return cluster.member().demote(Member.Type.PASSIVE);
            }
            LOGGER.info("Demoting {} to RESERVE", (Object)cluster.member().address());
            return cluster.member().demote(Member.Type.RESERVE);
        };
        if (this.quorumHint == Quorum.ALL.size()) {
            return CompletableFuture.completedFuture(null);
        }
        if (cluster.member().type() == Member.Type.ACTIVE) {
            Member member;
            Optional<Member> optionalMember;
            Collection passive = cluster.members().stream().filter(m -> m.type() == Member.Type.PASSIVE).collect(Collectors.toList());
            Collection reserve = cluster.members().stream().filter(m -> m.type() == Member.Type.RESERVE).collect(Collectors.toList());
            if (!passive.isEmpty() && (optionalMember = passive.stream().filter(m -> m.status() == Member.Status.AVAILABLE).findFirst()).isPresent()) {
                LOGGER.info("Promoting {} to ACTIVE: replacing {}", (Object)optionalMember.get().address(), (Object)cluster.member().address());
                ((CompletableFuture)optionalMember.get().promote(Member.Type.ACTIVE).thenCompose(demoteFunction)).whenComplete(completeFunction);
                return future;
            }
            if (!reserve.isEmpty() && (optionalMember = reserve.stream().filter(m -> m.status() == Member.Status.AVAILABLE).findFirst()).isPresent()) {
                LOGGER.info("Promoting {} to ACTIVE: replacing {}", (Object)optionalMember.get().address(), (Object)cluster.member().address());
                ((CompletableFuture)optionalMember.get().promote(Member.Type.ACTIVE).thenCompose(demoteFunction)).whenComplete(completeFunction);
                return future;
            }
            if (!passive.isEmpty()) {
                member = (Member)passive.iterator().next();
                LOGGER.info("Promoting {} to ACTIVE: replacing {}", (Object)member.address(), (Object)cluster.member().address());
                ((CompletableFuture)member.promote(Member.Type.ACTIVE).thenCompose(demoteFunction)).whenComplete(completeFunction);
            } else if (!reserve.isEmpty()) {
                member = (Member)reserve.iterator().next();
                LOGGER.info("Promoting {} to ACTIVE: replacing {}", (Object)member.address(), (Object)cluster.member().address());
                ((CompletableFuture)member.promote(Member.Type.ACTIVE).thenCompose(demoteFunction)).whenComplete(completeFunction);
            } else {
                future.complete(null);
            }
        } else if (cluster.member().type() == Member.Type.PASSIVE) {
            Collection reserve = cluster.members().stream().filter(m -> m.type() == Member.Type.RESERVE).collect(Collectors.toList());
            if (!reserve.isEmpty()) {
                Optional<Member> optionalMember = reserve.stream().filter(m -> m.status() == Member.Status.AVAILABLE).findFirst();
                if (optionalMember.isPresent()) {
                    LOGGER.info("Promoting {} to PASSIVE: replacing {}", (Object)optionalMember.get().address(), (Object)cluster.member().address());
                    ((CompletableFuture)optionalMember.get().promote(Member.Type.PASSIVE).thenCompose(demoteFunction)).whenComplete(completeFunction);
                } else {
                    Member member = (Member)reserve.iterator().next();
                    LOGGER.info("Promoting {} to PASSIVE: replacing {}", (Object)member.address(), (Object)cluster.member().address());
                    ((CompletableFuture)member.promote(Member.Type.PASSIVE).thenCompose(demoteFunction)).whenComplete(completeFunction);
                }
            } else {
                future.complete(null);
            }
        } else {
            future.complete(null);
        }
        return future;
    }

    public static class Builder
    implements ClusterManager.Builder {
        private int quorumHint = Quorum.ALL.size();
        private int backupCount = 0;

        public Builder withQuorumHint(int quorumHint) {
            this.quorumHint = Assert.argNot(quorumHint, quorumHint < -1, "quorumHint must be positive or -1", new Object[0]);
            return this;
        }

        public Builder withQuorumHint(Quorum quorum) {
            this.quorumHint = Assert.notNull(quorum, "quorum").size();
            return this;
        }

        public Builder withBackupCount(int backupCount) {
            this.backupCount = Assert.argNot(backupCount, backupCount < 0, "backupCount must be positive", new Object[0]);
            return this;
        }

        @Override
        public ClusterManager build() {
            return new BalancingClusterManager(this.quorumHint, this.backupCount);
        }
    }
}

