/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis;

import com.codahale.metrics.Gauge;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.log4j.Level;
import org.apache.ratis.BaseTest;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.impl.RaftClientTestUtil;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.metrics.MetricRegistries;
import org.apache.ratis.metrics.MetricRegistryInfo;
import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.impl.RetryCacheTestUtil;
import org.apache.ratis.server.metrics.RaftServerMetricsImpl;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Log4jUtils;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.Timestamp;
import org.junit.Assert;
import org.junit.AssumptionViolatedException;
import org.junit.Test;
import org.slf4j.Logger;

/**
 * Basic Raft test scenarios (log replication, leader failure, load with
 * leader changes, request timeout and state machine metrics) that run against
 * a {@link MiniRaftCluster}.
 *
 * <p>The class is abstract: a subclass supplies the concrete cluster type
 * through {@link MiniRaftCluster.Factory.Get}. Several scenarios are exposed
 * as static helpers taking the cluster and a logger as parameters.
 *
 * @param <CLUSTER> the concrete cluster implementation under test
 */
public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
        extends BaseTest
        implements MiniRaftCluster.Factory.Get<CLUSTER> {
    /** Number of servers in every cluster created by the tests in this class. */
    public static final int NUM_SERVERS = 5;

    /**
     * Raises the server and state machine updater log levels to DEBUG and
     * sets the retry cache expiry time to 5 seconds in the test properties.
     */
    public RaftBasicTests() {
        Log4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG);
        RaftServerTestUtil.setStateMachineUpdaterLogLevel(Level.DEBUG);
        RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration.valueOf(5L, TimeUnit.SECONDS));
    }

    /** Sends 10 messages synchronously while one follower is killed and restarted. */
    @Test
    public void testBasicAppendEntries() throws Exception {
        runWithNewCluster(NUM_SERVERS, cluster -> runTestBasicAppendEntries(false, false, 10, cluster, LOG));
    }

    /** Same as {@link #testBasicAppendEntries()} but the leader is killed and restarted as well. */
    @Test
    public void testBasicAppendEntriesKillLeader() throws Exception {
        runWithNewCluster(NUM_SERVERS, cluster -> runTestBasicAppendEntries(false, true, 10, cluster, LOG));
    }

    /**
     * Kills and then restarts a server from a newly started background thread.
     *
     * @param id             the server to kill and restart
     * @param killSleepMs    delay in milliseconds before the server is killed
     * @param restartSleepMs delay in milliseconds between the kill and the restart
     * @param cluster        the cluster owning the server
     * @param LOG            logger used for progress and failure messages
     * @return a future completed with {@code null} once the server has been
     *         restarted, or completed exceptionally if any step fails
     */
    static CompletableFuture<Void> killAndRestartServer(RaftPeerId id, long killSleepMs, long restartSleepMs, MiniRaftCluster cluster, Logger LOG) {
        final CompletableFuture<Void> future = new CompletableFuture<>();
        new Thread(() -> {
            try {
                Thread.sleep(killSleepMs);
                cluster.killServer(id);
                Thread.sleep(restartSleepMs);
                LOG.info("restart server: " + id);
                cluster.restartServer(id, false);
                future.complete(null);
            } catch (Exception e) {
                // Complete the future first so that a caller blocked in join()
                // is released even though this thread is about to terminate.
                future.completeExceptionally(e);
                ExitUtils.terminate(-1, "Failed to kill/restart server: " + id, e, LOG);
            }
        }).start();
        return future;
    }

    /**
     * Restarts all servers, sends {@code numMessages} messages while a
     * follower (and optionally the leader) is killed and restarted, and then
     * verifies the log entries of every alive server.
     *
     * @param async       send through the async client API when {@code true},
     *                    otherwise through the blocking API
     * @param killLeader  also kill and restart the leader when {@code true}
     * @param numMessages number of messages to send
     * @param cluster     the cluster to run against
     * @param LOG         logger used for progress messages
     * @throws Exception if sending, waiting or the verification fails
     */
    static void runTestBasicAppendEntries(boolean async, boolean killLeader, int numMessages, MiniRaftCluster cluster, Logger LOG) throws Exception {
        LOG.info("runTestBasicAppendEntries: async? {}, killLeader={}, numMessages={}", async, killLeader, numMessages);
        for (RaftServer s : cluster.getServers()) {
            cluster.restartServer(s.getId(), false);
        }
        final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
        final long term = leader.getInfo().getCurrentTerm();

        final CompletableFuture<Void> killAndRestartFollower = killAndRestartServer(
                cluster.getFollowers().get(0).getId(), 0L, 1000L, cluster, LOG);
        final CompletableFuture<Void> killAndRestartLeader;
        if (killLeader) {
            LOG.info("killAndRestart leader " + leader.getId());
            killAndRestartLeader = killAndRestartServer(leader.getId(), 2000L, 4000L, cluster, LOG);
        } else {
            killAndRestartLeader = CompletableFuture.completedFuture(null);
        }
        LOG.info(cluster.printServers());

        final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numMessages);
        try (RaftClient client = cluster.createClient()) {
            final AtomicInteger asyncReplyCount = new AtomicInteger();
            // Completed once all async replies have arrived, or exceptionally on the first failure.
            final CompletableFuture<Void> f = new CompletableFuture<>();
            for (RaftTestUtil.SimpleMessage message : messages) {
                if (async) {
                    client.async().send(message).thenAcceptAsync(reply -> {
                        if (!reply.isSuccess()) {
                            f.completeExceptionally(new AssertionError("Failed with reply " + reply));
                        } else if (asyncReplyCount.incrementAndGet() == messages.length) {
                            f.complete(null);
                        }
                    }).exceptionally(e -> {
                        // A failed send must fail the test instead of leaving f.join() blocked forever.
                        f.completeExceptionally(e);
                        return null;
                    });
                } else {
                    final RaftClientReply reply = client.io().send(message);
                    Assert.assertTrue(reply.isSuccess());
                }
            }
            if (async) {
                f.join();
                Assert.assertEquals(messages.length, asyncReplyCount.get());
            }
        }

        Thread.sleep(cluster.getTimeoutMax().toIntExact(TimeUnit.MILLISECONDS) + 100);
        LOG.info(cluster.printAllLogs());
        killAndRestartFollower.join();
        killAndRestartLeader.join();

        final List<RaftServer.Division> divisions = cluster.getServerAliveStream().collect(Collectors.toList());
        for (RaftServer.Division impl : divisions) {
            // Retry the verification up to 5 times, one second apart.
            JavaUtils.attempt(() -> RaftTestUtil.assertLogEntries(impl, term, messages),
                    5, TimeDuration.valueOf(1L, TimeUnit.SECONDS), impl.getId() + " assertLogEntries", LOG);
        }
    }

    /** Runs {@link #runTestOldLeaderCommit} on a new cluster of {@link #NUM_SERVERS} servers. */
    @Test
    public void testOldLeaderCommit() throws Exception {
        runWithNewCluster(NUM_SERVERS, this::runTestOldLeaderCommit);
    }

    /**
     * Kills the second half of the followers, sends one message through the
     * leader, checks that the remaining followers contain it, then kills the
     * leader and restarts the killed followers. Finally asserts that the new
     * leader is one of the followers that received the message and that all
     * alive servers hold the expected log entries.
     *
     * @param cluster the cluster to run against
     * @throws Exception if a cluster operation or an assertion fails
     */
    void runTestOldLeaderCommit(CLUSTER cluster) throws Exception {
        final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
        final RaftPeerId leaderId = leader.getId();
        final long term = leader.getInfo().getCurrentTerm();

        final List<RaftServer.Division> followers = cluster.getFollowers();
        final List<RaftServer.Division> followersToSendLog = followers.subList(0, followers.size() / 2);
        for (int i = followers.size() / 2; i < NUM_SERVERS - 1; i++) {
            cluster.killServer(followers.get(i).getId());
        }

        final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(1);
        RaftTestUtil.sendMessageInNewThread(cluster, leaderId, messages);

        Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) + 100L);
        for (RaftServer.Division followerToSendLog : followersToSendLog) {
            final RaftLog followerLog = followerToSendLog.getRaftLog();
            Assert.assertTrue(RaftTestUtil.logEntriesContains(followerLog, messages));
        }

        LOG.info("killing old leader: {}", leaderId);
        cluster.killServer(leaderId);

        for (int i = followers.size() / 2; i < NUM_SERVERS - 1; i++) {
            final RaftPeerId followerId = followers.get(i).getId();
            LOG.info("restarting follower: {}", followerId);
            cluster.restartServer(followerId, false);
        }

        Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) * 5L);
        final RaftPeerId newLeaderId = RaftTestUtil.waitForLeader(cluster).getId();

        // The new leader must be one of the followers that received the message.
        final Set<RaftPeerId> followersToSendLogIds = followersToSendLog.stream()
                .map(f -> f.getId())
                .collect(Collectors.toSet());
        Assert.assertTrue(followersToSendLogIds.contains(newLeaderId));

        cluster.getServerAliveStream()
                .map(RaftServer.Division::getRaftLog)
                .forEach(log -> RaftTestUtil.assertLogEntries(log, term, messages));
    }

    /** Runs {@link #runTestOldLeaderNotCommit} on a new cluster of {@link #NUM_SERVERS} servers. */
    @Test
    public void testOldLeaderNotCommit() throws Exception {
        runWithNewCluster(NUM_SERVERS, this::runTestOldLeaderNotCommit);
    }

    /**
     * Kills all followers except the first, sends one message through the
     * leader, then kills the leader and that first follower and restarts the
     * other followers. After a new leader is elected, every alive server's log
     * is checked with {@code RaftTestUtil.checkLogEntries} using a predicate
     * that matches entries whose term is not 1.
     *
     * @param cluster the cluster to run against
     * @throws Exception if a cluster operation or an assertion fails
     */
    void runTestOldLeaderNotCommit(CLUSTER cluster) throws Exception {
        final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();

        final List<RaftServer.Division> followers = cluster.getFollowers();
        final RaftServer.Division followerToCommit = followers.get(0);
        try {
            for (int i = 1; i < NUM_SERVERS - 1; i++) {
                cluster.killServer(followers.get(i).getId());
            }
        } catch (IndexOutOfBoundsException e) {
            throw new AssumptionViolatedException("The assumption is follower.size() = NUM_SERVERS - 1, "
                    + "actual NUM_SERVERS is " + NUM_SERVERS
                    + ", and actual follower.size() is " + followers.size(), e);
        }

        final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(1);
        RaftTestUtil.sendMessageInNewThread(cluster, leaderId, messages);

        Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) + 100L);
        // The result of this check used to be discarded; assert it like runTestOldLeaderCommit does.
        Assert.assertTrue(RaftTestUtil.logEntriesContains(followerToCommit.getRaftLog(), messages));

        cluster.killServer(leaderId);
        cluster.killServer(followerToCommit.getId());

        for (int i = 1; i < NUM_SERVERS - 1; i++) {
            cluster.restartServer(followers.get(i).getId(), false);
        }
        RaftTestUtil.waitForLeader(cluster);
        Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) + 100L);

        final Predicate<RaftProtos.LogEntryProto> predicate = l -> l.getTerm() != 1L;
        cluster.getServerAliveStream()
                .map(RaftServer.Division::getRaftLog)
                .forEach(log -> RaftTestUtil.checkLogEntries(log, messages, predicate));
    }

    /** Runs the load scenario with 10 blocking clients sending 300 messages each. */
    @Test
    public void testWithLoad() throws Exception {
        runWithNewCluster(NUM_SERVERS, cluster -> testWithLoad(10, 300, false, cluster, LOG));
    }

    /**
     * Starts {@code numClients} client threads that each send
     * {@code numMessages} messages, and requests a leader change every time
     * the clients have advanced by at least {@code 50 * numClients} steps in
     * total. A timer periodically dumps diagnostic state and restarts the
     * leader when no such progress was recorded since its previous run.
     *
     * @param numClients  number of concurrent client threads
     * @param numMessages number of messages sent by each client
     * @param useAsync    use the async client API when {@code true}
     * @param cluster     the cluster to run against
     * @param LOG         logger used for progress and diagnostic output
     * @throws Exception if a client failed or the final log verification fails
     */
    static void testWithLoad(int numClients, int numMessages, boolean useAsync, final MiniRaftCluster cluster, final Logger LOG) throws Exception {
        LOG.info("Running testWithLoad: numClients=" + numClients + ", numMessages=" + numMessages + ", async=" + useAsync);

        RaftTestUtil.waitForLeader(cluster);

        final List<Client4TestWithLoad> clients = Stream.iterate(0, i -> i + 1)
                .limit(numClients)
                .map(i -> new Client4TestWithLoad(i, numMessages, useAsync, cluster, LOG))
                .collect(Collectors.toList());
        final AtomicInteger lastStep = new AtomicInteger();

        final Timer timer = new Timer();
        // First run after 5 seconds, then every 10 seconds.
        timer.schedule(new TimerTask() {
            private int previousLastStep = lastStep.get();

            @Override
            public void run() {
                LOG.info(cluster.printServers());
                LOG.info(BlockRequestHandlingInjection.getInstance().toString());
                LOG.info(cluster.toString());
                clients.forEach(c -> LOG.info("  " + c));
                JavaUtils.dumpAllThreads(s -> LOG.info(s));

                final int last = lastStep.get();
                if (last != previousLastStep) {
                    previousLastStep = last;
                    return;
                }

                final RaftServer.Division leader = cluster.getLeader();
                LOG.info("NO PROGRESS at " + last + ", try to restart leader=" + leader);
                if (leader != null) {
                    try {
                        cluster.restartServer(leader.getId(), false);
                        LOG.info("Restarted leader=" + leader);
                    } catch (IOException e) {
                        LOG.error("Failed to restart leader=" + leader, e);
                    }
                }
            }
        }, 5000L, 10000L);

        try {
            clients.forEach(Thread::start);

            int count = 0;
            while (clients.stream().anyMatch(Client4TestWithLoad::isRunning)) {
                final int n = clients.stream().mapToInt(c -> c.step.get()).sum();
                Assert.assertTrue(n >= lastStep.get());

                // Change the leader only after at least 50 steps per client on average.
                if (n - lastStep.get() < 50 * numClients) {
                    Thread.sleep(10L);
                    continue;
                }
                lastStep.set(n);
                count++;

                try {
                    final RaftServer.Division leader = cluster.getLeader();
                    if (leader != null) {
                        RaftTestUtil.changeLeader(cluster, leader.getId());
                    }
                } catch (IllegalStateException e) {
                    LOG.error("Failed to change leader ", e);
                }
            }
            LOG.info("Leader change count=" + count);
        } finally {
            // Always stop the timer thread, also when an assertion above fails.
            timer.cancel();
        }

        for (Client4TestWithLoad c : clients) {
            if (c.exceptionInClientThread.get() != null) {
                throw new AssertionError(c.exceptionInClientThread.get());
            }
            RaftTestUtil.assertLogEntries(cluster, c.messages);
        }
    }

    /**
     * Calls {@code RetryCacheTestUtil.getOrCreateEntry} for the client's
     * current invocation id on every alive server, sends one message, and
     * asserts that the elapsed time is at least the configured retry cache
     * expiry time.
     *
     * @param async   send through the async client API when {@code true}
     * @param cluster the cluster to run against
     * @param LOG     logger (currently unused)
     * @throws Exception if sending fails or the assertion does not hold
     */
    public static void testRequestTimeout(boolean async, MiniRaftCluster cluster, Logger LOG) throws Exception {
        RaftTestUtil.waitForLeader(cluster);
        final Timestamp startTime = Timestamp.currentTime();
        try (RaftClient client = cluster.createClient()) {
            final ClientInvocationId invocationId = RaftClientTestUtil.getClientInvocationId(client);
            cluster.getServerAliveStream()
                    .forEach(raftServer -> RetryCacheTestUtil.getOrCreateEntry(raftServer, invocationId));
            if (async) {
                final CompletableFuture<RaftClientReply> replyFuture =
                        client.async().send(new RaftTestUtil.SimpleMessage("abc"));
                replyFuture.get();
            } else {
                client.io().send(new RaftTestUtil.SimpleMessage("abc"));
            }
            final TimeDuration duration = startTime.elapsedTime();
            final TimeDuration retryCacheExpiryDuration =
                    RaftServerConfigKeys.RetryCache.expiryTime(cluster.getProperties());
            Assert.assertTrue(duration.compareTo(retryCacheExpiryDuration) >= 0);
        }
    }

    /**
     * Sends one message and asserts that the leader's "appliedIndex" and
     * "applyCompletedIndex" state machine gauges both increased. The commit
     * index gauges are checked before and after the send.
     *
     * @param async   send through the async client API when {@code true}
     * @param cluster the cluster to run against
     * @param LOG     logger (currently unused)
     * @throws Exception if sending fails or an assertion does not hold
     */
    public static void testStateMachineMetrics(boolean async, MiniRaftCluster cluster, Logger LOG) throws Exception {
        final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
        try (RaftClient client = cluster.createClient()) {
            final Gauge<?> appliedIndexGauge = getStatemachineGaugeWithName(leader, "appliedIndex");
            final Gauge<?> smAppliedIndexGauge = getStatemachineGaugeWithName(leader, "applyCompletedIndex");

            final long appliedIndexBefore = (Long) appliedIndexGauge.getValue();
            final long smAppliedIndexBefore = (Long) smAppliedIndexGauge.getValue();
            checkFollowerCommitLagsLeader(cluster);

            if (async) {
                final CompletableFuture<RaftClientReply> replyFuture =
                        client.async().send(new RaftTestUtil.SimpleMessage("abc"));
                replyFuture.get();
            } else {
                client.io().send(new RaftTestUtil.SimpleMessage("abc"));
            }

            final long appliedIndexAfter = (Long) appliedIndexGauge.getValue();
            final long smAppliedIndexAfter = (Long) smAppliedIndexGauge.getValue();
            checkFollowerCommitLagsLeader(cluster);

            Assert.assertTrue("StateMachine Applied Index not incremented",
                    appliedIndexAfter > appliedIndexBefore);
            Assert.assertTrue("StateMachine Apply completed Index not incremented",
                    smAppliedIndexAfter > smAppliedIndexBefore);
        }
    }

    /**
     * For every follower, asserts that the leader's own commit index gauge is
     * not smaller than the gauge the leader keeps for that follower, and that
     * the latter is not larger than the follower's own commit index gauge.
     */
    private static void checkFollowerCommitLagsLeader(MiniRaftCluster cluster) {
        final List<RaftServer.Division> followers = cluster.getFollowers();
        final RaftServer.Division leaderDivision = cluster.getLeader();
        Assert.assertNotNull("No leader found", leaderDivision);
        final RaftGroupMemberId leader = leaderDivision.getMemberId();

        final Gauge<?> leaderCommitGauge = RaftServerMetricsImpl.getPeerCommitIndexGauge(leader, leader.getPeerId());

        for (RaftServer.Division f : followers) {
            final RaftGroupMemberId follower = f.getMemberId();
            final Gauge<?> followerCommitGauge =
                    RaftServerMetricsImpl.getPeerCommitIndexGauge(leader, follower.getPeerId());
            final long leaderCommit = (Long) leaderCommitGauge.getValue();
            final long followerCommitAtLeader = (Long) followerCommitGauge.getValue();
            Assert.assertTrue("leader commit " + leaderCommit + " < commit " + followerCommitAtLeader
                    + " recorded by the leader for " + follower, leaderCommit >= followerCommitAtLeader);

            final Gauge<?> followerMetric =
                    RaftServerMetricsImpl.getPeerCommitIndexGauge(follower, follower.getPeerId());
            // Read the leader-side gauge again right before comparing it with the follower's own gauge.
            final long followerCommitSeenByLeader = (Long) followerCommitGauge.getValue();
            final long followerOwnCommit = (Long) followerMetric.getValue();
            Assert.assertTrue("commit " + followerCommitSeenByLeader + " recorded by the leader for " + follower
                    + " > follower's own commit " + followerOwnCommit,
                    followerCommitSeenByLeader <= followerOwnCommit);
        }
    }

    /**
     * Looks up the state machine metric registry of the given server and
     * returns the first gauge (in key order) whose name contains
     * {@code gaugeName}. Fails the test when the registry or the gauge is missing.
     */
    private static Gauge<?> getStatemachineGaugeWithName(RaftServer.Division server, String gaugeName) {
        final MetricRegistryInfo info = new MetricRegistryInfo(server.getMemberId().toString(),
                "ratis", "state_machine", "Metrics for State Machine Updater");

        final Optional<RatisMetricRegistry> metricRegistry = MetricRegistries.global().get(info);
        Assert.assertTrue(metricRegistry.isPresent());
        final RatisMetricRegistry ratisStateMachineMetricRegistry = metricRegistry.get();

        final SortedMap<String, Gauge> gaugeMap =
                ratisStateMachineMetricRegistry.getGauges((s, metric) -> s.contains(gaugeName));
        Assert.assertFalse("No state machine gauge found with name containing " + gaugeName, gaugeMap.isEmpty());
        return gaugeMap.get(gaugeMap.firstKey());
    }

    /**
     * A client thread used by the load test: it sends all of its messages
     * through a client created from the cluster, counts progress in
     * {@link #step}, and records the first failure in
     * {@link #exceptionInClientThread}.
     */
    static class Client4TestWithLoad extends Thread {
        final boolean useAsync;
        final int index;
        final RaftTestUtil.SimpleMessage[] messages;

        /** Set to {@code false} when {@link #run()} finishes, normally or not. */
        final AtomicBoolean isRunning = new AtomicBoolean(true);
        /** Number of messages sent (blocking mode) or replies received (async mode) so far. */
        final AtomicInteger step = new AtomicInteger();
        final AtomicReference<Throwable> exceptionInClientThread = new AtomicReference<>();

        final MiniRaftCluster cluster;
        final Logger LOG;

        Client4TestWithLoad(int index, int numMessages, boolean useAsync, MiniRaftCluster cluster, Logger LOG) {
            super("client-" + index);
            this.index = index;
            this.messages = RaftTestUtil.SimpleMessage.create(numMessages, index + "-");
            this.useAsync = useAsync;
            this.cluster = cluster;
            this.LOG = LOG;
        }

        boolean isRunning() {
            return isRunning.get();
        }

        @Override
        public void run() {
            try (RaftClient client = cluster.createClient()) {
                // Completed once all async replies have arrived, or exceptionally on the first failure.
                final CompletableFuture<Void> f = new CompletableFuture<>();
                for (int i = 0; i < messages.length; i++) {
                    if (!useAsync) {
                        final RaftClientReply reply = client.io().send(messages[step.getAndIncrement()]);
                        Assert.assertTrue(reply.isSuccess());
                    } else {
                        final CompletableFuture<RaftClientReply> replyFuture = client.async().send(messages[i]);
                        replyFuture.thenAcceptAsync(r -> {
                            if (!r.isSuccess()) {
                                f.completeExceptionally(new AssertionError("Failed with reply: " + r));
                            }
                            if (step.incrementAndGet() == messages.length) {
                                f.complete(null);
                            }
                            Assert.assertTrue(r.isSuccess());
                        }).exceptionally(e -> {
                            // A failed send must not leave f.join() below blocked forever.
                            f.completeExceptionally(e);
                            return null;
                        });
                    }
                }
                if (useAsync) {
                    f.join();
                    Assert.assertTrue(step.get() == messages.length);
                }
            } catch (Throwable t) {
                // Catch Throwable, not Exception: JUnit assertion failures are
                // AssertionError instances and must be recorded as well.
                if (exceptionInClientThread.compareAndSet(null, t)) {
                    LOG.error(this + " failed", t);
                } else {
                    exceptionInClientThread.get().addSuppressed(t);
                    LOG.error(this + " failed again!", t);
                }
            } finally {
                isRunning.set(false);
            }
        }

        @Override
        public String toString() {
            return JavaUtils.getClassSimpleName(getClass()) + index
                    + "(step=" + step + "/" + messages.length
                    + ", isRunning=" + isRunning
                    + ", isAlive=" + isAlive()
                    + ", exception=" + exceptionInClientThread
                    + ")";
        }
    }
}

