/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.ratis.BaseTest;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.WatchRequestTests;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.protocol.exceptions.NotReplicatedException;
import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.Slf4jUtils;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.function.CheckedConsumer;
import org.apache.ratis.util.function.CheckedSupplier;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.event.Level;

/*
 * Exception performing whole class analysis ignored.
 */
public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
extends BaseTest
implements MiniRaftCluster.Factory.Get<CLUSTER> {
    static final int NUM_SERVERS = 3;
    static final int GET_TIMEOUT_SECOND = 10;

    public WatchRequestTests() {
        RaftServerTestUtil.setWatchRequestsLogLevel((Level)Level.DEBUG);
        Slf4jUtils.setLogLevel((Logger)RaftServer.Division.LOG, (Level)Level.DEBUG);
    }

    @Before
    public void setup() {
        RaftProperties p = this.getProperties();
        p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SimpleStateMachine4Testing.class, StateMachine.class);
    }

    @Test
    public void testWatchRequestAsync() throws Exception {
        this.runWithNewCluster(3, cluster -> WatchRequestTests.runTest(WatchRequestTests::runTestWatchRequestAsync, (MiniRaftCluster)cluster, (Logger)this.LOG));
    }

    static void runTest(CheckedConsumer<TestParameters, Exception> testCase, MiniRaftCluster cluster, Logger LOG) throws Exception {
        try (RaftClient client = cluster.createClient(RaftTestUtil.waitForLeader((MiniRaftCluster)cluster).getId());){
            int[] numMessages;
            for (int n : numMessages = new int[]{1, 10, 20}) {
                TestParameters p = new TestParameters(n, client, cluster, LOG);
                LOG.info("{}) {}, {}", new Object[]{n, p, cluster.printServers()});
                testCase.accept((Object)p);
            }
        }
    }

    static void runSingleTest(CheckedConsumer<TestParameters, Exception> testCase, MiniRaftCluster cluster, Logger LOG) throws Exception {
        try (RaftClient client = cluster.createClient(RaftTestUtil.waitForLeader((MiniRaftCluster)cluster).getId());){
            int[] numMessages;
            for (int n : numMessages = new int[]{1}) {
                TestParameters p = new TestParameters(n, client, cluster, LOG);
                LOG.info("{}) {}, {}", new Object[]{n, p, cluster.printServers()});
                testCase.accept((Object)p);
            }
        }
    }

    static void runTestWatchRequestAsync(TestParameters p) throws Exception {
        Logger LOG = p.log;
        MiniRaftCluster cluster = p.cluster;
        int numMessages = p.numMessages;
        RaftServer.Division leader = cluster.getLeader();
        LOG.info("block leader {}", (Object)leader.getId());
        SimpleStateMachine4Testing.get((RaftServer.Division)leader).blockStartTransaction();
        List followers = cluster.getFollowers();
        RaftServer.Division blockedFollower = (RaftServer.Division)followers.get(ThreadLocalRandom.current().nextInt(followers.size()));
        LOG.info("block follower {}", (Object)blockedFollower.getId());
        SimpleStateMachine4Testing.get((RaftServer.Division)blockedFollower).blockFlushStateMachineData();
        ArrayList replies = new ArrayList();
        ArrayList watches = new ArrayList();
        p.sendRequests(replies, watches);
        Assert.assertEquals((long)numMessages, (long)replies.size());
        Assert.assertEquals((long)numMessages, (long)watches.size());
        TimeUnit.SECONDS.sleep(1L);
        WatchRequestTests.assertNotDone(replies);
        WatchRequestTests.assertNotDone(watches);
        SimpleStateMachine4Testing.get((RaftServer.Division)leader).unblockStartTransaction();
        LOG.info("unblock leader {}", (Object)leader.getId());
        WatchRequestTests.checkMajority(replies, watches, (Logger)LOG);
        Assert.assertEquals((long)numMessages, (long)watches.size());
        TimeUnit.SECONDS.sleep(1L);
        WatchRequestTests.assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> WatchReplies.access$200((WatchReplies)w)));
        WatchRequestTests.assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> WatchReplies.access$100((WatchReplies)w)));
        LOG.info("unblock follower {}", (Object)blockedFollower.getId());
        SimpleStateMachine4Testing.get((RaftServer.Division)blockedFollower).unblockFlushStateMachineData();
        WatchRequestTests.checkAll(watches, (Logger)LOG);
    }

    static void checkMajority(List<CompletableFuture<RaftClientReply>> replies, List<CompletableFuture<WatchReplies>> watches, Logger LOG) throws Exception {
        for (int i = 0; i < replies.size(); ++i) {
            RaftClientReply reply = replies.get(i).get(10L, TimeUnit.SECONDS);
            LOG.info("checkMajority {}: receive {}", (Object)i, (Object)reply);
            long logIndex = reply.getLogIndex();
            Assert.assertTrue((boolean)reply.isSuccess());
            WatchReplies watchReplies = watches.get(i).get(10L, TimeUnit.SECONDS);
            Assert.assertEquals((long)logIndex, (long)WatchReplies.access$000((WatchReplies)watchReplies));
            RaftClientReply watchMajorityReply = watchReplies.getMajority();
            RaftClientReply watchMajorityCommittedReply = watchReplies.getMajorityCommitted();
            Collection commitInfos = watchMajorityCommittedReply.getCommitInfos();
            String message = "logIndex=" + logIndex + ", " + ProtoUtils.toString((Collection)commitInfos);
            Assert.assertEquals((long)3L, (long)commitInfos.size());
            long min = commitInfos.stream().map(RaftProtos.CommitInfoProto::getCommitIndex).min(Long::compare).get();
            Assert.assertTrue((String)message, (logIndex > min ? 1 : 0) != 0);
            commitInfos.stream().map(RaftProtos.CommitInfoProto::getCommitIndex).sorted(Long::compare).skip(1L).forEach(ci -> Assert.assertTrue((String)message, (logIndex <= ci ? 1 : 0) != 0));
        }
    }

    static void checkAll(List<CompletableFuture<WatchReplies>> watches, Logger LOG) throws Exception {
        for (int i = 0; i < watches.size(); ++i) {
            WatchReplies watchReplies = watches.get(i).get(10L, TimeUnit.SECONDS);
            long logIndex = WatchReplies.access$000((WatchReplies)watchReplies);
            LOG.info("checkAll {}: logIndex={}", (Object)i, (Object)logIndex);
            RaftClientReply watchAllReply = watchReplies.getAll();
            RaftClientReply watchAllCommittedReply = watchReplies.getAllCommitted();
            Collection commitInfos = watchAllCommittedReply.getCommitInfos();
            String message = "logIndex=" + logIndex + ", " + ProtoUtils.toString((Collection)commitInfos);
            Assert.assertEquals((long)3L, (long)commitInfos.size());
            commitInfos.forEach(info -> Assert.assertTrue((String)message, (logIndex <= info.getCommitIndex() ? 1 : 0) != 0));
        }
    }

    static <T> void assertNotDone(List<CompletableFuture<T>> futures) {
        WatchRequestTests.assertNotDone(futures.stream());
    }

    static <T> void assertNotDone(Stream<CompletableFuture<T>> futures) {
        futures.forEach(f -> {
            if (f.isDone()) {
                try {
                    Assert.fail((String)("Done unexpectedly: " + f.get()));
                }
                catch (Exception e) {
                    Assert.fail((String)("Done unexpectedly and failed to get: " + e));
                }
            }
        });
    }

    @Test
    public void testWatchRequestAsyncChangeLeader() throws Exception {
        this.runWithNewCluster(3, cluster -> WatchRequestTests.runTest(WatchRequestTests::runTestWatchRequestAsyncChangeLeader, (MiniRaftCluster)cluster, (Logger)this.LOG));
    }

    static void runTestWatchRequestAsyncChangeLeader(TestParameters p) throws Exception {
        Logger LOG = p.log;
        MiniRaftCluster cluster = p.cluster;
        int numMessages = p.numMessages;
        List followers = cluster.getFollowers();
        RaftServer.Division blockedFollower = (RaftServer.Division)followers.get(ThreadLocalRandom.current().nextInt(followers.size()));
        LOG.info("block follower {}", (Object)blockedFollower.getId());
        SimpleStateMachine4Testing.get((RaftServer.Division)blockedFollower).blockFlushStateMachineData();
        ArrayList replies = new ArrayList();
        ArrayList watches = new ArrayList();
        p.sendRequests(replies, watches);
        Assert.assertEquals((long)numMessages, (long)replies.size());
        Assert.assertEquals((long)numMessages, (long)watches.size());
        WatchRequestTests.checkMajority(replies, watches, (Logger)LOG);
        TimeUnit.SECONDS.sleep(1L);
        WatchRequestTests.assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> WatchReplies.access$100((WatchReplies)w)));
        RaftTestUtil.changeLeader((MiniRaftCluster)cluster, (RaftPeerId)cluster.getLeader().getId());
        SimpleStateMachine4Testing.get((RaftServer.Division)blockedFollower).unblockFlushStateMachineData();
        LOG.info("unblock follower {}", (Object)blockedFollower.getId());
        WatchRequestTests.checkAll(watches, (Logger)LOG);
    }

    @Test
    public void testWatchRequestTimeout() throws Exception {
        RaftProperties p = this.getProperties();
        RaftServerConfigKeys.Watch.setTimeout((RaftProperties)p, (TimeDuration)TimeDuration.valueOf((long)500L, (TimeUnit)TimeUnit.MILLISECONDS));
        RaftServerConfigKeys.Watch.setTimeoutDenomination((RaftProperties)p, (TimeDuration)TimeDuration.valueOf((long)100L, (TimeUnit)TimeUnit.MILLISECONDS));
        try {
            this.runWithNewCluster(3, cluster -> WatchRequestTests.runTest(WatchRequestTests::runTestWatchRequestTimeout, (MiniRaftCluster)cluster, (Logger)this.LOG));
        }
        finally {
            RaftServerConfigKeys.Watch.setTimeout((RaftProperties)p, (TimeDuration)RaftServerConfigKeys.Watch.TIMEOUT_DEFAULT);
            RaftServerConfigKeys.Watch.setTimeoutDenomination((RaftProperties)p, (TimeDuration)RaftServerConfigKeys.Watch.TIMEOUT_DENOMINATION_DEFAULT);
        }
    }

    static void runTestWatchRequestTimeout(TestParameters p) throws Exception {
        Logger LOG = p.log;
        MiniRaftCluster cluster = p.cluster;
        int numMessages = p.numMessages;
        RaftProperties properties = cluster.getProperties();
        TimeDuration watchTimeout = RaftServerConfigKeys.Watch.timeout((RaftProperties)properties);
        TimeDuration watchTimeoutDenomination = RaftServerConfigKeys.Watch.timeoutDenomination((RaftProperties)properties);
        RaftServer.Division leader = cluster.getLeader();
        LOG.info("block leader {}", (Object)leader.getId());
        SimpleStateMachine4Testing.get((RaftServer.Division)leader).blockStartTransaction();
        List followers = cluster.getFollowers();
        RaftServer.Division blockedFollower = (RaftServer.Division)followers.get(ThreadLocalRandom.current().nextInt(followers.size()));
        LOG.info("block follower {}", (Object)blockedFollower.getId());
        SimpleStateMachine4Testing.get((RaftServer.Division)blockedFollower).blockFlushStateMachineData();
        ArrayList replies = new ArrayList();
        ArrayList watches = new ArrayList();
        p.sendRequests(replies, watches);
        Assert.assertEquals((long)numMessages, (long)replies.size());
        Assert.assertEquals((long)numMessages, (long)watches.size());
        watchTimeout.sleep();
        watchTimeoutDenomination.sleep();
        WatchRequestTests.assertNotDone(replies);
        WatchRequestTests.assertNotDone(watches);
        SimpleStateMachine4Testing.get((RaftServer.Division)leader).unblockStartTransaction();
        LOG.info("unblock leader {}", (Object)leader.getId());
        WatchRequestTests.checkMajority(replies, watches, (Logger)LOG);
        WatchRequestTests.checkTimeout(replies, watches, (Logger)LOG);
        SimpleStateMachine4Testing.get((RaftServer.Division)blockedFollower).unblockFlushStateMachineData();
        LOG.info("unblock follower {}", (Object)blockedFollower.getId());
    }

    @Test
    public void testWatchRequestClientTimeout() throws Exception {
        RaftProperties p = this.getProperties();
        RaftServerConfigKeys.Watch.setTimeout((RaftProperties)p, (TimeDuration)TimeDuration.valueOf((long)100L, (TimeUnit)TimeUnit.SECONDS));
        RaftClientConfigKeys.Rpc.setWatchRequestTimeout((RaftProperties)p, (TimeDuration)TimeDuration.valueOf((long)15L, (TimeUnit)TimeUnit.SECONDS));
        try {
            this.runWithNewCluster(3, cluster -> WatchRequestTests.runSingleTest(WatchRequestTests::runTestWatchRequestClientTimeout, (MiniRaftCluster)cluster, (Logger)this.LOG));
        }
        finally {
            RaftServerConfigKeys.Watch.setTimeout((RaftProperties)p, (TimeDuration)RaftServerConfigKeys.Watch.TIMEOUT_DEFAULT);
            RaftClientConfigKeys.Rpc.setWatchRequestTimeout((RaftProperties)p, (TimeDuration)RaftClientConfigKeys.Rpc.WATCH_REQUEST_TIMEOUT_DEFAULT);
        }
    }

    static void runTestWatchRequestClientTimeout(TestParameters p) throws Exception {
        block2: {
            Logger LOG = p.log;
            CompletableFuture watchReply = p.sendWatchRequest(1000L, RetryPolicies.noRetry());
            try {
                watchReply.get();
                Assert.fail((String)"runTestWatchRequestClientTimeout failed");
            }
            catch (Exception ex) {
                LOG.error("error occurred", (Throwable)ex);
                Assert.assertTrue((ex.getCause().getClass() == AlreadyClosedException.class || ex.getCause().getClass() == RaftRetryFailureException.class ? 1 : 0) != 0);
                if (ex.getCause() == null || ex.getCause().getCause() == null) break block2;
                Assert.assertEquals(TimeoutIOException.class, ex.getCause().getCause().getClass());
            }
        }
    }

    static void checkTimeout(List<CompletableFuture<RaftClientReply>> replies, List<CompletableFuture<WatchReplies>> watches, Logger LOG) throws Exception {
        for (int i = 0; i < replies.size(); ++i) {
            RaftClientReply reply = replies.get(i).get(10L, TimeUnit.SECONDS);
            LOG.info("checkTimeout {}: receive {}", (Object)i, (Object)reply);
            long logIndex = reply.getLogIndex();
            Assert.assertTrue((boolean)reply.isSuccess());
            WatchReplies watchReplies = watches.get(i).get(10L, TimeUnit.SECONDS);
            Assert.assertEquals((long)logIndex, (long)WatchReplies.access$000((WatchReplies)watchReplies));
            WatchRequestTests.assertNotReplicatedException((long)logIndex, (RaftProtos.ReplicationLevel)RaftProtos.ReplicationLevel.ALL, () -> ((WatchReplies)watchReplies).getAll());
            WatchRequestTests.assertNotReplicatedException((long)logIndex, (RaftProtos.ReplicationLevel)RaftProtos.ReplicationLevel.ALL_COMMITTED, () -> ((WatchReplies)watchReplies).getAllCommitted());
        }
    }

    static void assertNotReplicatedException(long logIndex, RaftProtos.ReplicationLevel replication, CheckedSupplier<RaftClientReply, Exception> replySupplier) throws Exception {
        try {
            replySupplier.get();
            Assert.fail();
        }
        catch (ExecutionException e) {
            Throwable cause = e.getCause();
            WatchRequestTests.assertNotReplicatedException((long)logIndex, (RaftProtos.ReplicationLevel)replication, (Throwable)cause);
        }
    }

    static void assertNotReplicatedException(long logIndex, RaftProtos.ReplicationLevel replication, Throwable t) {
        Assert.assertSame(NotReplicatedException.class, t.getClass());
        NotReplicatedException nre = (NotReplicatedException)t;
        Assert.assertNotNull((Object)nre);
        Assert.assertEquals((long)logIndex, (long)nre.getLogIndex());
        Assert.assertEquals((Object)replication, (Object)nre.getRequiredReplication());
    }
}

