/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.StreamSupport;
import org.apache.ratis.BaseTest;
import org.apache.ratis.RaftBasicTests;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.client.impl.RaftClientTestUtil;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Slf4jUtils;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.function.CheckedRunnable;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.event.Level;

/*
 * Exception performing whole class analysis ignored.
 */
public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster>
extends BaseTest
implements MiniRaftCluster.Factory.Get<CLUSTER> {
    public static final int NUM_SERVERS = 3;
    private static final DelayLocalExecutionInjection LOG_SYNC_DELAY = RaftServerTestUtil.getLogSyncDelay();

    public RaftAsyncTests() {
        Slf4jUtils.setLogLevel((Logger)RaftServer.Division.LOG, (Level)Level.DEBUG);
        Slf4jUtils.setLogLevel((Logger)RaftClient.LOG, (Level)Level.DEBUG);
        this.getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SimpleStateMachine4Testing.class, StateMachine.class);
    }

    @Test
    public void testAsyncConfiguration() throws IOException {
        this.LOG.info("Running testAsyncConfiguration");
        RaftProperties properties = new RaftProperties();
        RaftClientConfigKeys.Async.Experimental.setSendDummyRequest((RaftProperties)properties, (boolean)false);
        RaftPeer server = RaftPeer.newBuilder().setId("s0").build();
        RaftClient.Builder clientBuilder = RaftClient.newBuilder().setRaftGroup(RaftGroup.valueOf((RaftGroupId)RaftGroupId.randomId(), (RaftPeer[])new RaftPeer[]{server})).setProperties(properties);
        int maxOutstandingRequests = 100;
        try (RaftClient client = clientBuilder.build();){
            RaftClientTestUtil.assertAsyncRequestSemaphore((RaftClient)client, (int)maxOutstandingRequests, (int)0);
        }
        maxOutstandingRequests = 5;
        RaftClientConfigKeys.Async.setOutstandingRequestsMax((RaftProperties)properties, (int)maxOutstandingRequests);
        client = clientBuilder.build();
        var6_6 = null;
        try {
            RaftClientTestUtil.assertAsyncRequestSemaphore((RaftClient)client, (int)maxOutstandingRequests, (int)0);
        }
        catch (Throwable throwable) {
            var6_6 = throwable;
            throw throwable;
        }
        finally {
            if (client != null) {
                if (var6_6 != null) {
                    try {
                        client.close();
                    }
                    catch (Throwable throwable) {
                        var6_6.addSuppressed(throwable);
                    }
                } else {
                    client.close();
                }
            }
        }
    }

    static void assertRaftRetryFailureException(RaftRetryFailureException rfe, RetryPolicy retryPolicy, String name) {
        Assert.assertNotNull((String)(name + " does not have RaftRetryFailureException"), (Object)rfe);
        Assert.assertTrue((String)(name + ": unexpected error message, rfe=" + rfe + ", retryPolicy=" + retryPolicy), (boolean)rfe.getMessage().contains(retryPolicy.toString()));
    }

    @Test
    public void testRequestAsyncWithRetryFailure() throws Exception {
        this.runTestRequestAsyncWithRetryFailure(false);
    }

    @Test
    public void testRequestAsyncWithRetryFailureAfterInitialMessages() throws Exception {
        this.runTestRequestAsyncWithRetryFailure(true);
    }

    void runTestRequestAsyncWithRetryFailure(boolean initialMessages) throws Exception {
        RaftClientConfigKeys.Async.Experimental.setSendDummyRequest((RaftProperties)this.getProperties(), (boolean)false);
        this.runWithNewCluster(1, initialMessages, cluster -> this.runTestRequestAsyncWithRetryFailure(initialMessages, cluster));
        RaftClientConfigKeys.Async.Experimental.setSendDummyRequest((RaftProperties)this.getProperties(), (boolean)true);
    }

    void runTestRequestAsyncWithRetryFailure(boolean initialMessages, CLUSTER cluster) throws Exception {
        TimeDuration sleepTime = HUNDRED_MILLIS;
        RetryPolicies.RetryLimited retryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep((int)10, (TimeDuration)sleepTime);
        try (RaftClient client = cluster.createClient(null, (RetryPolicy)retryPolicy);){
            int i;
            RaftPeerId leader = null;
            if (initialMessages) {
                leader = RaftTestUtil.waitForLeader(cluster).getId();
                RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create((int)10, (String)"initial-");
                ArrayList<CompletableFuture> replies = new ArrayList<CompletableFuture>();
                for (i = 0; i < messages.length; ++i) {
                    replies.add(client.async().send((Message)messages[i]));
                }
                for (i = 0; i < messages.length; ++i) {
                    RaftTestUtil.assertSuccessReply((CompletableFuture)((CompletableFuture)replies.get(i)));
                }
                cluster.killServer(leader);
            }
            ArrayList<CompletableFuture> replies = new ArrayList<CompletableFuture>();
            RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create((int)10);
            for (i = 0; i < messages.length / 2; ++i) {
                replies.add(client.async().send((Message)messages[i]));
            }
            sleepTime.apply(t -> t * (long)(retryPolicy.getMaxAttempts() - 1)).sleep();
            while (i < messages.length) {
                replies.add(client.async().send((Message)messages[i]));
                ++i;
            }
            Assert.assertEquals((long)messages.length, (long)replies.size());
            sleepTime.apply(t -> t * 2L).sleep();
            if (leader != null) {
                cluster.restartServer(leader, false);
            } else {
                cluster.start();
            }
            for (int i2 = 0; i2 < replies.size(); ++i2) {
                CheckedRunnable getReply = ((CompletableFuture)replies.get(i2))::get;
                String name = "retry-failure-" + i2;
                if (i2 == 0) {
                    Throwable t2 = this.testFailureCase(name, getReply, ExecutionException.class, new Class[]{RaftRetryFailureException.class});
                    RaftAsyncTests.assertRaftRetryFailureException((RaftRetryFailureException)((RaftRetryFailureException)t2.getCause()), (RetryPolicy)retryPolicy, (String)name);
                    continue;
                }
                this.testFailureCase(name, getReply, ExecutionException.class, new Class[]{AlreadyClosedException.class, RaftRetryFailureException.class});
            }
            this.testFailureCaseAsync("last-request", () -> client.async().send((Message)new RaftTestUtil.SimpleMessage("last")), AlreadyClosedException.class, new Class[]{RaftRetryFailureException.class});
        }
    }

    @Test
    public void testAsyncRequestSemaphore() throws Exception {
        this.runWithNewCluster(3, arg_0 -> this.runTestAsyncRequestSemaphore(arg_0));
    }

    void runTestAsyncRequestSemaphore(CLUSTER cluster) throws Exception {
        RaftTestUtil.waitForLeader(cluster);
        int numMessages = RaftClientConfigKeys.Async.outstandingRequestsMax((RaftProperties)this.getProperties());
        CompletableFuture[] futures = new CompletableFuture[numMessages + 1];
        RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create((int)numMessages);
        try (RaftClient client = cluster.createClient();){
            int i;
            StreamSupport.stream(cluster.getServers().spliterator(), false).map(arg_0 -> cluster.getDivision(arg_0)).map(SimpleStateMachine4Testing::get).forEach(SimpleStateMachine4Testing::blockStartTransaction);
            AtomicInteger blockedRequestsCount = new AtomicInteger();
            for (i = 0; i < numMessages; ++i) {
                blockedRequestsCount.getAndIncrement();
                futures[i] = client.async().send((Message)messages[i]);
                blockedRequestsCount.decrementAndGet();
            }
            Assert.assertEquals((long)0L, (long)blockedRequestsCount.get());
            futures[numMessages] = CompletableFuture.supplyAsync(() -> {
                blockedRequestsCount.incrementAndGet();
                client.async().send((Message)new RaftTestUtil.SimpleMessage("n1"));
                blockedRequestsCount.decrementAndGet();
                return null;
            });
            while (blockedRequestsCount.get() != 1) {
                Thread.sleep(1000L);
            }
            Assert.assertEquals((long)1L, (long)blockedRequestsCount.get());
            RaftClientTestUtil.assertAsyncRequestSemaphore((RaftClient)client, (int)0, (int)1);
            StreamSupport.stream(cluster.getServers().spliterator(), false).map(arg_0 -> cluster.getDivision(arg_0)).map(SimpleStateMachine4Testing::get).forEach(SimpleStateMachine4Testing::unblockStartTransaction);
            for (i = 0; i <= numMessages; ++i) {
                futures[i].join();
            }
            Assert.assertEquals((long)0L, (long)blockedRequestsCount.get());
        }
    }

    void runTestBasicAppendEntriesAsync(boolean killLeader) throws Exception {
        this.runWithNewCluster(killLeader ? 5 : 3, cluster -> RaftBasicTests.runTestBasicAppendEntries((boolean)true, (boolean)killLeader, (int)100, (MiniRaftCluster)cluster, (Logger)this.LOG));
    }

    @Test
    public void testBasicAppendEntriesAsync() throws Exception {
        this.runTestBasicAppendEntriesAsync(false);
    }

    @Test
    public void testBasicAppendEntriesAsyncKillLeader() throws Exception {
        this.runTestBasicAppendEntriesAsync(true);
    }

    @Test
    public void testWithLoadAsync() throws Exception {
        this.runWithNewCluster(3, cluster -> RaftBasicTests.testWithLoad((int)5, (int)500, (boolean)true, (MiniRaftCluster)cluster, (Logger)this.LOG));
    }

    @Test
    public void testStaleReadAsync() throws Exception {
        this.runWithNewCluster(3, arg_0 -> this.runTestStaleReadAsync(arg_0));
    }

    void runTestStaleReadAsync(CLUSTER cluster) throws Exception {
        int numMesssages = 10;
        try (RaftClient client = cluster.createClient();){
            RaftTestUtil.waitForLeader(cluster);
            ArrayList<CompletionStage> futures = new ArrayList<CompletionStage>();
            for (int i = 0; i < 10; ++i) {
                String s = "" + i;
                this.LOG.info("sendAsync " + (String)s);
                futures.add(client.async().send((Message)new RaftTestUtil.SimpleMessage(s)));
            }
            Assert.assertEquals((long)10L, (long)futures.size());
            ArrayList<RaftClientReply> replies = new ArrayList<RaftClientReply>();
            for (CompletableFuture completableFuture : futures) {
                RaftClientReply r = (RaftClientReply)completableFuture.join();
                Assert.assertTrue((boolean)r.isSuccess());
                replies.add(r);
            }
            futures.clear();
            RaftClientReply lastWriteReply = (RaftClientReply)replies.get(replies.size() - 1);
            RaftPeerId raftPeerId = lastWriteReply.getServerId();
            this.LOG.info("leader = " + raftPeerId);
            Collection commitInfos = lastWriteReply.getCommitInfos();
            this.LOG.info("commitInfos = " + commitInfos);
            RaftProtos.CommitInfoProto followerCommitInfo = commitInfos.stream().filter(info -> !RaftPeerId.valueOf((ByteString)info.getServer().getId()).equals((Object)leader)).max(Comparator.comparing(RaftProtos.CommitInfoProto::getCommitIndex)).get();
            RaftPeerId follower = RaftPeerId.valueOf((ByteString)followerCommitInfo.getServer().getId());
            long followerCommitIndex = followerCommitInfo.getCommitIndex();
            this.LOG.info("max follower = {}, commitIndex = {}", (Object)follower, (Object)followerCommitIndex);
            this.testFailureCaseAsync("sendStaleReadAsync(..) with a larger commit index", () -> client.async().sendStaleRead((Message)new RaftTestUtil.SimpleMessage("9223372036854775807"), followerCommitInfo.getCommitIndex(), follower), StateMachineException.class, new Class[]{IndexOutOfBoundsException.class});
            for (int i = 0; i < 10; ++i) {
                RaftClientReply reply = (RaftClientReply)replies.get(i);
                String query = "" + i;
                this.LOG.info("query=" + query + ", reply=" + reply);
                RaftTestUtil.SimpleMessage message = new RaftTestUtil.SimpleMessage(query);
                CompletableFuture readFuture = client.async().sendReadOnly((Message)message);
                futures.add(((CompletableFuture)readFuture.thenCompose(arg_0 -> this.lambda$runTestStaleReadAsync$9(reply, followerCommitIndex, query, client, (Message)message, follower, arg_0))).thenApply(staleReadReply -> {
                    if (staleReadReply == null) {
                        return null;
                    }
                    ByteString expected = ((RaftClientReply)readFuture.join()).getMessage().getContent();
                    ByteString computed = staleReadReply.getMessage().getContent();
                    try {
                        this.LOG.info("query " + query + " returns " + RaftProtos.LogEntryProto.parseFrom((ByteString)expected).getStateMachineLogEntry().getLogData().toStringUtf8());
                    }
                    catch (InvalidProtocolBufferException e) {
                        throw new CompletionException(e);
                    }
                    Assert.assertEquals((String)("log entry mismatch for query=" + query), (Object)expected, (Object)computed);
                    return null;
                }));
            }
            JavaUtils.allOf(futures).join();
        }
    }

    @Test
    public void testWriteAsyncCustomReplicationLevel() throws Exception {
        this.runWithNewCluster(3, arg_0 -> this.runTestWriteAsyncCustomReplicationLevel(arg_0));
    }

    void runTestWriteAsyncCustomReplicationLevel(CLUSTER cluster) throws Exception {
        int numMessages = 20;
        try (RaftClient client = cluster.createClient();){
            RaftTestUtil.waitForLeader(cluster);
            for (int i = 0; i < 20; ++i) {
                String s = "" + i;
                this.LOG.info("sendAsync with ALL_COMMITTED " + s);
                client.async().send((Message)new RaftTestUtil.SimpleMessage(s), RaftProtos.ReplicationLevel.ALL_COMMITTED).whenComplete((reply, exception) -> {
                    if (exception != null) {
                        this.LOG.error("Failed to send message " + s, exception);
                        Assert.assertNull((Object)reply);
                        return;
                    }
                    Assert.assertTrue((boolean)reply.isSuccess());
                    Assert.assertNull((Object)reply.getException());
                    reply.getCommitInfos().forEach(commitInfoProto -> Assert.assertTrue((commitInfoProto.getCommitIndex() >= reply.getLogIndex() ? 1 : 0) != 0));
                });
            }
        }
    }

    @Test
    public void testRequestTimeout() throws Exception {
        TimeDuration oldExpiryTime = RaftServerConfigKeys.RetryCache.expiryTime((RaftProperties)this.getProperties());
        RaftServerConfigKeys.RetryCache.setExpiryTime((RaftProperties)this.getProperties(), (TimeDuration)FIVE_SECONDS);
        RaftClientConfigKeys.Async.Experimental.setSendDummyRequest((RaftProperties)this.getProperties(), (boolean)false);
        this.runWithNewCluster(3, cluster -> RaftBasicTests.testRequestTimeout((boolean)true, (MiniRaftCluster)cluster, (Logger)this.LOG));
        RaftServerConfigKeys.RetryCache.setExpiryTime((RaftProperties)this.getProperties(), (TimeDuration)oldExpiryTime);
        RaftClientConfigKeys.Async.Experimental.setSendDummyRequest((RaftProperties)this.getProperties(), (boolean)true);
    }

    @Test
    public void testStateMachineMetrics() throws Exception {
        this.runWithNewCluster(3, cluster -> RaftBasicTests.runTestStateMachineMetrics((boolean)true, (MiniRaftCluster)cluster));
    }

    @Test
    public void testAppendEntriesTimeout() throws Exception {
        this.runWithNewCluster(3, arg_0 -> this.runTestAppendEntriesTimeout(arg_0));
    }

    void runTestAppendEntriesTimeout(CLUSTER cluster) throws Exception {
        this.LOG.info("Running testAppendEntriesTimeout");
        TimeDuration oldExpiryTime = RaftServerConfigKeys.RetryCache.expiryTime((RaftProperties)this.getProperties());
        RaftServerConfigKeys.RetryCache.setExpiryTime((RaftProperties)this.getProperties(), (TimeDuration)TimeDuration.valueOf((long)20L, (TimeUnit)TimeUnit.SECONDS));
        RaftTestUtil.waitForLeader(cluster);
        long time = System.currentTimeMillis();
        long waitTime = 5000L;
        try (RaftClient client = cluster.createClient();){
            cluster.getServerAliveStream().filter(impl -> !impl.getInfo().isLeader()).map(SimpleStateMachine4Testing::get).forEach(SimpleStateMachine4Testing::blockWriteStateMachineData);
            CompletableFuture replyFuture = client.async().send((Message)new RaftTestUtil.SimpleMessage("abc"));
            Thread.sleep(waitTime);
            Assert.assertFalse((boolean)replyFuture.isDone());
            cluster.getServerAliveStream().filter(impl -> !impl.getInfo().isLeader()).map(SimpleStateMachine4Testing::get).forEach(SimpleStateMachine4Testing::unblockWriteStateMachineData);
            Assert.assertTrue((boolean)((RaftClientReply)replyFuture.get()).isSuccess());
            Assert.assertTrue((System.currentTimeMillis() - time > waitTime ? 1 : 0) != 0);
        }
        RaftServerConfigKeys.RetryCache.setExpiryTime((RaftProperties)this.getProperties(), (TimeDuration)oldExpiryTime);
    }

    @Test
    public void testCheckLeadershipFailure() throws Exception {
        this.runWithNewCluster(3, arg_0 -> this.runTestCheckLeadershipFailure(arg_0));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void runTestCheckLeadershipFailure(CLUSTER cluster) throws Exception {
        this.LOG.info("Running testCheckLeadershipFailure");
        RaftTestUtil.waitForLeader(cluster);
        RaftServer.Division prevLeader = cluster.getLeader();
        long termOfPrevLeader = prevLeader.getInfo().getCurrentTerm();
        this.LOG.info("Previous Leader is elected on term {}", (Object)termOfPrevLeader);
        try (RaftClient client = cluster.createClient();){
            cluster.getServerAliveStream().filter(impl -> !impl.getInfo().isLeader()).map(SimpleStateMachine4Testing::get).forEach(peer -> LOG_SYNC_DELAY.setDelayMs(peer.getId().toString(), 1000));
            client.async().send((Message)new RaftTestUtil.SimpleMessage("abc"));
            Thread.sleep(1000L);
            cluster.getServerAliveStream().map(RaftServer.Division::getInfo).forEach(info -> Assert.assertTrue((!info.isLeader() || info.getCurrentTerm() > termOfPrevLeader ? 1 : 0) != 0));
        }
        finally {
            LOG_SYNC_DELAY.clear();
        }
        RaftTestUtil.waitForLeader(cluster);
        RaftServer.Division currLeader = cluster.getLeader();
        long termOfCurrLeader = currLeader.getInfo().getCurrentTerm();
        this.LOG.info("Current Leader is elected on term {}", (Object)termOfCurrLeader);
        Assert.assertTrue((termOfPrevLeader < termOfCurrLeader ? 1 : 0) != 0);
    }

    @Test
    public void testNoRetryWaitOnNotLeaderException() throws Exception {
        RaftClientConfigKeys.Async.Experimental.setSendDummyRequest((RaftProperties)this.getProperties(), (boolean)false);
        this.runWithNewCluster(3, arg_0 -> this.runTestNoRetryWaitOnNotLeaderException(arg_0));
        RaftClientConfigKeys.Async.Experimental.setSendDummyRequest((RaftProperties)this.getProperties(), (boolean)true);
    }

    private void runTestNoRetryWaitOnNotLeaderException(MiniRaftCluster cluster) throws Exception {
        RaftServer.Division leader = RaftTestUtil.waitForLeader((MiniRaftCluster)cluster);
        List followers = cluster.getFollowers();
        Assert.assertNotNull((Object)followers);
        Assert.assertEquals((long)2L, (long)followers.size());
        Assert.assertNotSame((Object)leader, followers.get(0));
        Assert.assertNotSame((Object)leader, followers.get(1));
        try (RaftClient client = cluster.createClient(leader.getId());){
            CompletableFuture f = client.async().send((Message)new RaftTestUtil.SimpleMessage("first"));
            FIVE_SECONDS.apply(f::get);
        }
        RetryPolicy r = event -> () -> TimeDuration.valueOf((long)60L, (TimeUnit)TimeUnit.SECONDS);
        try (RaftClient client = cluster.createClient(((RaftServer.Division)followers.get(0)).getId(), cluster.getGroup(), r);){
            CompletableFuture f = client.async().send((Message)new RaftTestUtil.SimpleMessage("abc"));
            FIVE_SECONDS.apply(f::get);
        }
        catch (TimeoutException e) {
            throw new AssertionError("Failed to get async result", e);
        }
    }

    private /* synthetic */ CompletionStage lambda$runTestStaleReadAsync$9(RaftClientReply reply, long followerCommitIndex, String query, RaftClient client, Message message, RaftPeerId follower, RaftClientReply r) {
        if (reply.getLogIndex() <= followerCommitIndex) {
            this.LOG.info("sendStaleReadAsync, query=" + query);
            return client.async().sendStaleRead(message, followerCommitIndex, follower);
        }
        return CompletableFuture.completedFuture(null);
    }
}

