/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.ratis.BaseTest;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
import org.apache.ratis.util.Slf4jUtils;
import org.apache.ratis.util.TimeDuration;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.event.Level;

public abstract class RequestLimitAsyncBaseTest<CLUSTER extends MiniRaftCluster>
extends BaseTest
implements MiniRaftCluster.Factory.Get<CLUSTER> {
    private final int writeElementLimit = 5;
    private final int watchElementLimit = 2;

    public RequestLimitAsyncBaseTest() {
        RaftProperties p = this.setStateMachine(SimpleStateMachine4Testing.class);
        RaftServerConfigKeys.Write.setElementLimit((RaftProperties)p, (int)5);
        RaftServerConfigKeys.Watch.setElementLimit((RaftProperties)p, (int)2);
        RaftServerConfigKeys.Rpc.setRequestTimeout((RaftProperties)p, (TimeDuration)FIVE_SECONDS);
        RaftClientConfigKeys.Rpc.setRequestTimeout((RaftProperties)p, (TimeDuration)FIVE_SECONDS);
    }

    @Test
    public void testWriteElementLimit() throws Exception {
        this.runWithSameCluster(1, this::runTestWriteElementLimit);
    }

    void runTestWriteElementLimit(CLUSTER cluster) throws Exception {
        RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
        try (RaftClient c1 = ((MiniRaftCluster)cluster).createClient(leader.getId());){
            RaftTestUtil.SimpleMessage message = new RaftTestUtil.SimpleMessage("first");
            CompletableFuture future = c1.async().send((Message)message);
            RaftClientReply reply = (RaftClientReply)RequestLimitAsyncBaseTest.getWithDefaultTimeout((Future)future);
            Assertions.assertTrue((boolean)reply.isSuccess());
            BlockingQueue<Runnable> toBeCompleted = SimpleStateMachine4Testing.get(leader).collecting().enable(SimpleStateMachine4Testing.Collecting.Type.APPLY_TRANSACTION);
            ArrayList<CompletableFuture> writeFutures = new ArrayList<CompletableFuture>();
            for (int i = 0; i < 5; ++i) {
                RaftTestUtil.SimpleMessage message2 = new RaftTestUtil.SimpleMessage("m" + i);
                writeFutures.add(c1.async().send((Message)message2));
            }
            long watchBase = 1000L;
            for (int i = 0; i < 2; ++i) {
                c1.async().watch(1000L + (long)i, RaftProtos.ReplicationLevel.ALL);
            }
            HUNDRED_MILLIS.sleep();
            try (RaftClient c2 = ((MiniRaftCluster)cluster).createClient(leader.getId(), RetryPolicies.noRetry());){
                RaftTestUtil.SimpleMessage message3 = new RaftTestUtil.SimpleMessage("err");
                this.testFailureCase("send should fail", () -> c2.io().send((Message)message3), ResourceUnavailableException.class, new Class[0]);
                this.testFailureCase("sendAsync should fail", () -> {
                    RaftClientReply cfr_ignored_0 = (RaftClientReply)c2.async().send((Message)message3).get();
                }, ExecutionException.class, new Class[]{ResourceUnavailableException.class});
                long watchIndex = 1002L;
                this.testFailureCase("sendWatch should fail", () -> c2.io().watch(1002L, RaftProtos.ReplicationLevel.ALL), ResourceUnavailableException.class, new Class[0]);
                this.testFailureCase("sendWatchAsync should fail", () -> {
                    RaftClientReply cfr_ignored_0 = (RaftClientReply)c2.async().watch(1002L, RaftProtos.ReplicationLevel.ALL).get();
                }, ExecutionException.class, new Class[]{ResourceUnavailableException.class});
            }
            toBeCompleted.forEach(Runnable::run);
            for (CompletableFuture f : writeFutures) {
                RaftClientReply reply2 = (RaftClientReply)RequestLimitAsyncBaseTest.getWithDefaultTimeout((Future)f);
                Assertions.assertTrue((boolean)reply2.isSuccess());
            }
        }
    }

    static {
        Slf4jUtils.setLogLevel((Logger)RaftServer.Division.LOG, (Level)Level.DEBUG);
        Slf4jUtils.setLogLevel((Logger)RaftClient.LOG, (Level)Level.DEBUG);
        RaftServerTestUtil.setPendingRequestsLogLevel(Level.DEBUG);
    }
}

