/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis;

import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ratis.BaseTest;
import org.apache.ratis.RaftBasicTests;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.impl.RaftClientTestUtil;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.metrics.MetricRegistries;
import org.apache.ratis.metrics.MetricRegistryInfo;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.impl.RetryCacheTestUtil;
import org.apache.ratis.server.metrics.ServerMetricsTestUtils;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.thirdparty.com.codahale.metrics.Gauge;
import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Slf4jUtils;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.Timestamp;
import org.junit.Assert;
import org.junit.AssumptionViolatedException;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.event.Level;

/*
 * Exception performing whole class analysis ignored.
 */
public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
extends BaseTest
implements MiniRaftCluster.Factory.Get<CLUSTER> {
    public static final int NUM_SERVERS = 5;

    public RaftBasicTests() {
        Slf4jUtils.setLogLevel((Logger)RaftServer.Division.LOG, (Level)Level.DEBUG);
        RaftServerTestUtil.setStateMachineUpdaterLogLevel((Level)Level.DEBUG);
        RaftServerConfigKeys.RetryCache.setExpiryTime((RaftProperties)this.getProperties(), (TimeDuration)TimeDuration.valueOf((long)5L, (TimeUnit)TimeUnit.SECONDS));
    }

    /**
     * Runs the basic append-entries scenario on a new {@link #NUM_SERVERS}-server cluster using
     * the blocking client API, without killing the leader, sending 10 messages.
     */
    @Test
    public void testBasicAppendEntries() throws Exception {
        final boolean async = false;
        final boolean killLeader = false;
        final int numMessages = 10;
        runWithNewCluster(NUM_SERVERS, cluster -> {
            runTestBasicAppendEntries(async, killLeader, numMessages, cluster, LOG);
        });
    }

    /**
     * Runs the basic append-entries scenario on a new {@link #NUM_SERVERS}-server cluster using
     * the blocking client API while the leader is killed and restarted, sending 10 messages.
     */
    @Test
    public void testBasicAppendEntriesKillLeader() throws Exception {
        final boolean async = false;
        final boolean killLeader = true;
        final int numMessages = 10;
        runWithNewCluster(NUM_SERVERS, cluster -> {
            runTestBasicAppendEntries(async, killLeader, numMessages, cluster, LOG);
        });
    }

    /**
     * Kills a server and restarts it again on a freshly started background thread.
     *
     * <p>The background thread sleeps {@code killSleepMs}, kills the server, sleeps
     * {@code restartSleepMs}, and restarts the server (passing {@code false} as the second
     * argument of {@code restartServer}).
     *
     * @param id             the peer to kill and restart
     * @param killSleepMs    milliseconds to wait before killing the server
     * @param restartSleepMs milliseconds to wait between the kill and the restart
     * @param cluster        the cluster owning the server
     * @param LOG            logger used for progress and failure reporting
     * @return a future completed with {@code null} once the server has been restarted, or
     *         completed exceptionally with the failure if any step throws
     */
    static CompletableFuture<Void> killAndRestartServer(RaftPeerId id, long killSleepMs, long restartSleepMs, MiniRaftCluster cluster, Logger LOG) {
        final CompletableFuture<Void> future = new CompletableFuture<>();
        final Runnable killThenRestart = () -> {
            try {
                Thread.sleep(killSleepMs);
                cluster.killServer(id);
                Thread.sleep(restartSleepMs);
                LOG.info("restart server: " + id);
                cluster.restartServer(id, false);
                future.complete(null);
            } catch (Exception e) {
                // Fail the future before terminating: otherwise a caller blocked in
                // join()/get() on the returned future would never be released.
                future.completeExceptionally(e);
                ExitUtils.terminate(-1, "Failed to kill/restart server: " + id, e, LOG);
            }
        };
        new Thread(killThenRestart).start();
        return future;
    }

    /**
     * Sends {@code numMessages} messages to the cluster while servers are being killed and
     * restarted, then verifies the log entries of every alive server.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>restart every server and wait for a leader; remember the leader's current term;</li>
     *   <li>schedule a kill/restart of the first follower (kill immediately, restart after 1s);</li>
     *   <li>if {@code killLeader}, also schedule a kill/restart of the leader
     *       (kill after 2s, restart after 4s);</li>
     *   <li>send all messages, either with the blocking API (asserting each reply succeeds) or
     *       with the async API (waiting until every reply has been counted);</li>
     *   <li>sleep for the cluster's maximum timeout plus 100ms, wait for the kill/restart
     *       tasks, and check the log entries of each alive server, retrying up to 50 times
     *       with a 1 second interval.</li>
     * </ol>
     *
     * @param async       {@code true} to use the async client API, {@code false} for blocking
     * @param killLeader  whether the leader is killed and restarted during the test
     * @param numMessages number of messages to send
     * @param cluster     the cluster under test
     * @param LOG         logger used for progress output
     * @throws Exception if any cluster operation, send, or verification fails
     */
    static void runTestBasicAppendEntries(boolean async, boolean killLeader, int numMessages, MiniRaftCluster cluster, Logger LOG) throws Exception {
        LOG.info("runTestBasicAppendEntries: async? {}, killLeader={}, numMessages={}", async, killLeader, numMessages);
        for (RaftServer s : cluster.getServers()) {
            cluster.restartServer(s.getId(), false);
        }
        final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
        final long term = leader.getInfo().getCurrentTerm();

        final RaftPeerId firstFollowerId = cluster.getFollowers().get(0).getId();
        final CompletableFuture<Void> killAndRestartFollower = killAndRestartServer(firstFollowerId, 0L, 1000L, cluster, LOG);
        final CompletableFuture<Void> killAndRestartLeader;
        if (killLeader) {
            LOG.info("killAndRestart leader " + leader.getId());
            killAndRestartLeader = killAndRestartServer(leader.getId(), 2000L, 4000L, cluster, LOG);
        } else {
            killAndRestartLeader = CompletableFuture.completedFuture(null);
        }
        LOG.info(cluster.printServers());

        final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numMessages);
        // try-with-resources closes the client and records a close() failure as a suppressed
        // exception, exactly like the hand-written try/catch/finally it replaces.
        try (RaftClient client = cluster.createClient()) {
            final AtomicInteger asyncReplyCount = new AtomicInteger();
            final CompletableFuture<Void> f = new CompletableFuture<>();
            for (RaftTestUtil.SimpleMessage message : messages) {
                if (async) {
                    client.async().send(message).thenAcceptAsync(reply -> {
                        if (!reply.isSuccess()) {
                            f.completeExceptionally(new AssertionError("Failed with reply " + reply));
                        } else if (asyncReplyCount.incrementAndGet() == messages.length) {
                            // The last successful reply releases the waiting test thread.
                            f.complete(null);
                        }
                    });
                } else {
                    final RaftClientReply ioReply = client.io().send(message);
                    Assert.assertTrue(ioReply.isSuccess());
                }
            }
            if (async) {
                f.join();
                Assert.assertEquals(messages.length, asyncReplyCount.get());
            }
        }

        Thread.sleep(cluster.getTimeoutMax().toIntExact(TimeUnit.MILLISECONDS) + 100);
        LOG.info(cluster.printAllLogs());
        killAndRestartFollower.join();
        killAndRestartLeader.join();

        final List<RaftServer.Division> divisions = cluster.getServerAliveStream().collect(Collectors.toList());
        for (RaftServer.Division impl : divisions) {
            JavaUtils.attempt(() -> RaftTestUtil.assertLogEntries(impl, term, messages), 50, TimeDuration.valueOf(1L, TimeUnit.SECONDS), impl.getId() + " assertLogEntries", LOG);
        }
    }

    /**
     * Runs {@code runTestOldLeaderCommit} against a new cluster of {@link #NUM_SERVERS} servers.
     */
    @Test
    public void testOldLeaderCommit() throws Exception {
        runWithNewCluster(NUM_SERVERS, this::runTestOldLeaderCommit);
    }

    /**
     * Scenario: the leader replicates one message to the first half of its followers while the
     * remaining followers are down; the leader is then killed and the downed followers are
     * restarted.
     *
     * <p>The test asserts that each follower in the first half has the message in its log
     * before the old leader is killed, that the newly elected leader is one of those followers,
     * and finally calls {@code RaftTestUtil.assertLogEntries} for the log of every alive server
     * with the original leader's term and the message.
     *
     * @param cluster the cluster under test; expected to have {@link #NUM_SERVERS} servers
     * @throws Exception if any cluster operation or wait fails
     */
    void runTestOldLeaderCommit(CLUSTER cluster) throws Exception {
        final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
        final RaftPeerId leaderId = leader.getId();
        final long term = leader.getInfo().getCurrentTerm();

        final List<RaftServer.Division> followers = cluster.getFollowers();
        final int half = followers.size() / 2;
        final List<RaftServer.Division> followersToSendLog = followers.subList(0, half);
        // Take the second half of the followers down so that only the first half can receive
        // the message from the leader.
        for (int i = half; i < NUM_SERVERS - 1; i++) {
            cluster.killServer(followers.get(i).getId());
        }

        final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(1);
        RaftTestUtil.sendMessageInNewThread(cluster, leaderId, messages);

        Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) + 100L);
        for (RaftServer.Division followerToSendLog : followersToSendLog) {
            final RaftLog followerLog = followerToSendLog.getRaftLog();
            Assert.assertTrue(RaftTestUtil.logEntriesContains(followerLog, messages));
        }

        LOG.info("killing old leader: {}", leaderId);
        cluster.killServer(leaderId);

        for (int i = half; i < NUM_SERVERS - 1; i++) {
            final RaftPeerId followerId = followers.get(i).getId();
            LOG.info("restarting follower: {}", followerId);
            cluster.restartServer(followerId, false);
        }

        Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) * 5L);
        final RaftPeerId newLeaderId = RaftTestUtil.waitForLeader(cluster).getId();

        // The new leader must be one of the followers that received the message.
        final Set<RaftPeerId> followersToSendLogIds = followersToSendLog.stream()
            .map(RaftServer.Division::getId)
            .collect(Collectors.toSet());
        Assert.assertTrue(followersToSendLogIds.contains(newLeaderId));

        cluster.getServerAliveStream()
            .map(RaftServer.Division::getRaftLog)
            .forEach(log -> RaftTestUtil.assertLogEntries(log, term, messages));
    }

    /**
     * Runs {@code runTestOldLeaderNotCommit} against a new cluster of {@link #NUM_SERVERS} servers.
     */
    @Test
    public void testOldLeaderNotCommit() throws Exception {
        runWithNewCluster(NUM_SERVERS, this::runTestOldLeaderNotCommit);
    }

    /**
     * Scenario: the leader can reach only a single follower when it receives one message; the
     * leader and that follower are then killed and the other followers are restarted.
     *
     * <p>After a new leader has been elected, the log of every alive server is passed to
     * {@code RaftTestUtil.checkLogEntries} together with the message and a predicate that
     * accepts entries whose term is not 1.
     *
     * @param cluster the cluster under test
     * @throws AssumptionViolatedException if the cluster has fewer than
     *         {@code NUM_SERVERS - 1} followers
     * @throws Exception if any cluster operation or wait fails
     */
    void runTestOldLeaderNotCommit(CLUSTER cluster) throws Exception {
        final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();

        final List<RaftServer.Division> followers = cluster.getFollowers();
        // Check the precondition up front instead of catching IndexOutOfBoundsException after
        // some servers may already have been killed.
        if (followers.size() < NUM_SERVERS - 1) {
            throw new AssumptionViolatedException("The assumption is follower.size() = NUM_SERVERS - 1, actual NUM_SERVERS is " + NUM_SERVERS + ", and actual follower.size() is " + followers.size());
        }
        final RaftServer.Division followerToCommit = followers.get(0);
        for (int i = 1; i < NUM_SERVERS - 1; i++) {
            cluster.killServer(followers.get(i).getId());
        }

        final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(1);
        RaftTestUtil.sendMessageInNewThread(cluster, leaderId, messages);

        Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) + 100L);
        // The result of this check used to be discarded; assert it, as runTestOldLeaderCommit
        // does for the same check.
        Assert.assertTrue("The remaining follower " + followerToCommit.getId() + " should have received the message",
            RaftTestUtil.logEntriesContains(followerToCommit.getRaftLog(), messages));

        cluster.killServer(leaderId);
        cluster.killServer(followerToCommit.getId());

        for (int i = 1; i < NUM_SERVERS - 1; i++) {
            cluster.restartServer(followers.get(i).getId(), false);
        }
        RaftTestUtil.waitForLeader(cluster);
        Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) + 100L);

        final Predicate<RaftProtos.LogEntryProto> predicate = l -> l.getTerm() != 1L;
        cluster.getServerAliveStream()
            .map(RaftServer.Division::getRaftLog)
            .forEach(log -> RaftTestUtil.checkLogEntries(log, messages, predicate));
    }

    /**
     * Runs the load test on a new {@link #NUM_SERVERS}-server cluster with 10 clients, each
     * sending 300 messages through the blocking API.
     */
    @Test
    public void testWithLoad() throws Exception {
        final int numClients = 10;
        final int numMessages = 300;
        final boolean useAsync = false;
        runWithNewCluster(NUM_SERVERS, cluster -> {
            testWithLoad(numClients, numMessages, useAsync, cluster, LOG);
        });
    }

    /**
     * Runs {@code numClients} concurrent client threads against the cluster while repeatedly
     * asking the cluster to change its leader, then verifies each client's messages.
     *
     * <p>While any client is still running, the method sums the clients' step counters. Each
     * time that sum has advanced by at least {@code 50 * numClients} since the last recorded
     * value, it records the new value and, if the cluster currently reports a leader, calls
     * {@code RaftTestUtil.changeLeader} for it. Once all clients have stopped, any exception
     * captured in a client thread is rethrown as an {@link AssertionError}; otherwise
     * {@code RaftTestUtil.assertLogEntries} is called with each client's messages.
     *
     * @param numClients  number of client threads to start
     * @param numMessages passed to each {@code Client4TestWithLoad}
     *                    (presumably the number of messages per client; that class is not
     *                    visible here, so verify)
     * @param useAsync    passed to each {@code Client4TestWithLoad}
     * @param cluster     the cluster under test
     * @param LOG         logger used for progress and error output
     * @throws Exception if waiting for the leader, sleeping, or verification fails
     */
    static void testWithLoad(int numClients, int numMessages, boolean useAsync, MiniRaftCluster cluster, Logger LOG) throws Exception {
        LOG.info("Running testWithLoad: numClients=" + numClients + ", numMessages=" + numMessages + ", async=" + useAsync);
        RaftTestUtil.waitForLeader((MiniRaftCluster)cluster);
        // One client per index 0..numClients-1.
        List<Client4TestWithLoad> clients = Stream.iterate(0, i -> i + 1).limit(numClients).map(i -> new Client4TestWithLoad(i.intValue(), numMessages, useAsync, cluster, LOG)).collect(Collectors.toList());
        // Sum of client steps observed at the most recent leader-change attempt.
        AtomicInteger lastStep = new AtomicInteger();
        Timer timer = new Timer();
        // A task is scheduled with a 5000 ms initial delay and a 10000 ms period.
        // NOTE(review): the decompiler could not recover the anonymous TimerTask body, so what
        // the task does is unknown from this file and this statement is not valid Java as
        // written; restore it from the original source.
        timer.schedule((TimerTask)new /* Unavailable Anonymous Inner Class!! */, 5000L, 10000L);
        clients.forEach(Thread::start);
        // Number of times the progress threshold was reached (logged as the leader change count).
        int count = 0;
        // Loop for as long as at least one client reports that it is running.
        while (!clients.stream().noneMatch(Client4TestWithLoad::isRunning)) {
            int n = clients.stream().mapToInt(c -> c.step.get()).sum();
            // The total number of steps must never fall below the last recorded value.
            Assert.assertTrue((n >= lastStep.get() ? 1 : 0) != 0);
            if (n - lastStep.get() < 50 * numClients) {
                // Not enough progress since the last leader change; poll again shortly.
                Thread.sleep(10L);
                continue;
            }
            lastStep.set(n);
            ++count;
            try {
                RaftServer.Division leader = cluster.getLeader();
                // No leader at the moment: skip this round (count has already been incremented).
                if (leader == null) continue;
                RaftTestUtil.changeLeader((MiniRaftCluster)cluster, (RaftPeerId)leader.getId());
            }
            catch (IllegalStateException e) {
                // A failed leader change is logged and does not fail the test.
                LOG.error("Failed to change leader ", (Throwable)e);
            }
        }
        LOG.info("Leader change count=" + count);
        // NOTE(review): cancel() is not in a finally block, so the timer is not cancelled if
        // the loop above throws (e.g. on an assertion failure).
        timer.cancel();
        for (Client4TestWithLoad c2 : clients) {
            // Surface any failure captured inside a client thread.
            if (c2.exceptionInClientThread.get() != null) {
                throw new AssertionError(c2.exceptionInClientThread.get());
            }
            RaftTestUtil.assertLogEntries((MiniRaftCluster)cluster, (RaftTestUtil.SimpleMessage[])c2.messages);
        }
    }

    /**
     * Checks that a request takes at least the configured retry-cache expiry time when a
     * retry-cache entry for its invocation id already exists on every alive server.
     *
     * <p>After a leader is available, the method obtains the client's invocation id via
     * {@code RaftClientTestUtil}, calls {@code RetryCacheTestUtil.getOrCreateEntry} for it on
     * each alive server, sends the message {@code "abc"}, and asserts that the elapsed time
     * since the start is not shorter than the retry-cache expiry time from the cluster
     * properties.
     *
     * @param async   {@code true} to send through the async API and wait on the returned
     *                future, {@code false} to send through the blocking API
     * @param cluster the cluster under test
     * @param LOG     unused by this method
     * @throws Exception if waiting for the leader or sending the request fails
     */
    public static void testRequestTimeout(boolean async, MiniRaftCluster cluster, Logger LOG) throws Exception {
        RaftTestUtil.waitForLeader(cluster);
        final Timestamp startTime = Timestamp.currentTime();
        try (RaftClient client = cluster.createClient()) {
            final ClientInvocationId invocationId = RaftClientTestUtil.getClientInvocationId(client);
            cluster.getServerAliveStream()
                .forEach(division -> RetryCacheTestUtil.getOrCreateEntry(division, invocationId));

            final RaftTestUtil.SimpleMessage message = new RaftTestUtil.SimpleMessage("abc");
            if (!async) {
                client.io().send(message);
            } else {
                client.async().send(message).get();
            }

            final TimeDuration elapsed = startTime.elapsedTime();
            final TimeDuration retryCacheExpiry = RaftServerConfigKeys.RetryCache.expiryTime(cluster.getProperties());
            Assert.assertTrue(elapsed.compareTo(retryCacheExpiry) >= 0);
        }
    }

    /**
     * Checks that the leader's {@code appliedIndex} and {@code applyCompletedIndex} state
     * machine gauges both increase after one message has been sent.
     *
     * <p>The commit-index relation between the leader and its followers is also checked via
     * {@code checkFollowerCommitLagsLeader}, once before and once after the message is sent.
     *
     * @param async   {@code true} to send through the async API and wait on the returned
     *                future, {@code false} to send through the blocking API
     * @param cluster the cluster under test
     * @param LOG     unused by this method
     * @throws Exception if waiting for the leader or sending the request fails
     */
    public static void testStateMachineMetrics(boolean async, MiniRaftCluster cluster, Logger LOG) throws Exception {
        final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
        try (RaftClient client = cluster.createClient()) {
            final Gauge appliedGauge = getStatemachineGaugeWithName(leader, "appliedIndex");
            final Gauge applyCompletedGauge = getStatemachineGaugeWithName(leader, "applyCompletedIndex");

            // Snapshot both gauges before any message is sent.
            final long appliedBefore = (Long) appliedGauge.getValue();
            final long applyCompletedBefore = (Long) applyCompletedGauge.getValue();
            checkFollowerCommitLagsLeader(cluster);

            final RaftTestUtil.SimpleMessage message = new RaftTestUtil.SimpleMessage("abc");
            if (!async) {
                client.io().send(message);
            } else {
                client.async().send(message).get();
            }

            final long appliedAfter = (Long) appliedGauge.getValue();
            final long applyCompletedAfter = (Long) applyCompletedGauge.getValue();
            checkFollowerCommitLagsLeader(cluster);

            Assert.assertTrue("StateMachine Applied Index not incremented", appliedAfter > appliedBefore);
            Assert.assertTrue("StateMachine Apply completed Index not incremented", applyCompletedAfter > applyCompletedBefore);
        }
    }

    /**
     * Asserts the expected ordering of commit-index gauges between the leader and each follower.
     *
     * <p>For every follower, three gauges obtained from
     * {@code ServerMetricsTestUtils.getPeerCommitIndexGauge} are compared:
     * <ul>
     *   <li>the (leader, leader) gauge must be greater than or equal to the
     *       (leader, follower) gauge;</li>
     *   <li>the (leader, follower) gauge must be less than or equal to the
     *       (follower, follower) gauge.</li>
     * </ul>
     * The two follower-related values are also printed to standard output.
     *
     * @param cluster the cluster to inspect; it must currently have a leader
     */
    private static void checkFollowerCommitLagsLeader(MiniRaftCluster cluster) {
        final List<RaftServer.Division> followers = cluster.getFollowers();
        final RaftServer.Division leaderDivision = cluster.getLeader();
        // getLeader() can return null; fail with a clear message instead of an NPE.
        Assert.assertNotNull("The cluster has no leader", leaderDivision);
        final RaftGroupMemberId leader = leaderDivision.getMemberId();

        final Gauge leaderCommitGauge = ServerMetricsTestUtils.getPeerCommitIndexGauge(leader, leader.getPeerId());

        for (RaftServer.Division f : followers) {
            final RaftGroupMemberId follower = f.getMemberId();
            final Gauge followerCommitGauge = ServerMetricsTestUtils.getPeerCommitIndexGauge(leader, follower.getPeerId());
            Assert.assertTrue("Leader commit index is behind the commit index it records for " + follower,
                (Long) leaderCommitGauge.getValue() >= (Long) followerCommitGauge.getValue());

            final Gauge followerMetric = ServerMetricsTestUtils.getPeerCommitIndexGauge(follower, follower.getPeerId());
            System.out.println(followerCommitGauge.getValue());
            System.out.println(followerMetric.getValue());
            Assert.assertTrue("Commit index recorded by the leader for " + follower + " is ahead of the follower's own commit index",
                (Long) followerCommitGauge.getValue() <= (Long) followerMetric.getValue());
        }
    }

    /**
     * Looks up a gauge by name in the state machine metric registry of the given server.
     *
     * <p>The registry is found in the global metric registries through a
     * {@code MetricRegistryInfo} built from the server's member id and the literals
     * {@code "ratis"}, {@code "state_machine"} and {@code "Metrics for State Machine Updater"}.
     * The method asserts that the registry is present.
     *
     * @param server    the server whose registry is queried
     * @param gaugeName the name of the gauge to return
     * @return the gauge returned by {@code ServerMetricsTestUtils.getGaugeWithName}
     */
    private static Gauge getStatemachineGaugeWithName(RaftServer.Division server, String gaugeName) {
        final String memberId = server.getMemberId().toString();
        final MetricRegistryInfo registryInfo = new MetricRegistryInfo(memberId, "ratis", "state_machine", "Metrics for State Machine Updater");

        final Optional registry = MetricRegistries.global().get(registryInfo);
        Assert.assertTrue(registry.isPresent());

        return ServerMetricsTestUtils.getGaugeWithName(gaugeName, registry::get);
    }
}

