/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis.grpc;

import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.ratis.BaseTest;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.RetryCacheTests;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.MiniRaftClusterWithGrpc;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RetryCache;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.server.impl.RetryCacheTestUtil;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Slf4jUtils;
import org.apache.ratis.util.TimeDuration;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.event.Level;

public class TestRetryCacheWithGrpc
extends RetryCacheTests<MiniRaftClusterWithGrpc>
implements MiniRaftClusterWithGrpc.FactoryGet {
    public TestRetryCacheWithGrpc() {
        Slf4jUtils.setLogLevel((Logger)RetryCache.LOG, (Level)Level.TRACE);
        this.getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SimpleStateMachine4Testing.class, StateMachine.class);
    }

    @Test
    public void testInvalidateRepliedCalls() throws Exception {
        this.runWithNewCluster(3, cluster -> new InvalidateRepliedCallsTest((MiniRaftCluster)cluster).run());
    }

    static long assertReply(RaftClientReply reply) {
        Assertions.assertTrue((boolean)reply.isSuccess());
        return reply.getCallId();
    }

    @Test
    @Timeout(value=10L)
    public void testRetryOnResourceUnavailableException() throws InterruptedException, IOException {
        RaftProperties properties = new RaftProperties();
        properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SimpleStateMachine4Testing.class, StateMachine.class);
        RaftServerConfigKeys.Write.setElementLimit((RaftProperties)properties, (int)1);
        MiniRaftClusterWithGrpc cluster = (MiniRaftClusterWithGrpc)this.getFactory().newCluster(3, properties);
        cluster.start();
        RaftServer.Division leader = RaftTestUtil.waitForLeader((MiniRaftCluster)cluster);
        RaftServer leaderProxy = leader.getRaftServer();
        for (RaftServer.Division follower : cluster.getFollowers()) {
            ((SimpleStateMachine4Testing)follower.getStateMachine()).blockWriteStateMachineData();
        }
        AtomicBoolean failure = new AtomicBoolean(false);
        long callId = 1L;
        ClientId clientId = ClientId.randomId();
        RaftClientRequest r = null;
        while (!failure.get()) {
            long cid = callId;
            r = cluster.newRaftClientRequest(clientId, leaderProxy.getId(), callId++, (Message)new RaftTestUtil.SimpleMessage("message"));
            CompletableFuture f = leaderProxy.submitClientRequestAsync(r);
            f.exceptionally(e -> {
                if (e.getCause() instanceof ResourceUnavailableException) {
                    RetryCacheTestUtil.isFailed((RetryCache.Entry)RetryCacheTestUtil.get((RaftServer.Division)leader, (ClientId)clientId, (long)cid));
                    failure.set(true);
                }
                return null;
            });
        }
        for (RaftServer.Division follower : cluster.getFollowers()) {
            ((SimpleStateMachine4Testing)follower.getStateMachine()).unblockWriteStateMachineData();
        }
        while (failure.get()) {
            try {
                RaftClientReply reply = (RaftClientReply)leaderProxy.submitClientRequestAsync(r).get();
                if (!reply.isSuccess()) continue;
                failure.set(false);
            }
            catch (Exception exception) {}
        }
        cluster.shutdown();
    }

    class InvalidateRepliedCallsTest {
        private final MiniRaftCluster cluster;
        private final RaftServer.Division leader;
        private final AtomicInteger count = new AtomicInteger();

        InvalidateRepliedCallsTest(MiniRaftCluster cluster) throws Exception {
            this.cluster = cluster;
            this.leader = RaftTestUtil.waitForLeader((MiniRaftCluster)cluster);
        }

        RaftTestUtil.SimpleMessage nextMessage() {
            return new RaftTestUtil.SimpleMessage("m" + this.count.incrementAndGet());
        }

        void assertRetryCacheEntry(RaftClient client, long callId, boolean exist) throws InterruptedException {
            this.assertRetryCacheEntry(client, callId, exist, false);
        }

        void assertRetryCacheEntry(RaftClient client, long callId, boolean exist, boolean eventually) throws InterruptedException {
            Consumer<RetryCache.Entry> assertion;
            Supplier<RetryCache.Entry> lookup = () -> RetryCacheTestUtil.get((RaftServer.Division)this.leader, (ClientId)client.getId(), (long)callId);
            Consumer<RetryCache.Entry> consumer = assertion = exist ? Assertions::assertNotNull : Assertions::assertNull;
            if (eventually) {
                JavaUtils.attempt(() -> assertion.accept((RetryCache.Entry)lookup.get()), (int)100, (TimeDuration)TimeDuration.ONE_MILLISECOND, (String)"retry cache entry", null);
            } else {
                assertion.accept(lookup.get());
            }
        }

        long send(RaftClient client, Long previousCallId) throws Exception {
            RaftClientReply reply = client.io().send((Message)this.nextMessage());
            long callId = TestRetryCacheWithGrpc.assertReply(reply);
            if (previousCallId != null) {
                this.assertRetryCacheEntry(client, previousCallId, false);
            }
            this.assertRetryCacheEntry(client, callId, true);
            return callId;
        }

        CompletableFuture<Long> sendAsync(RaftClient client) {
            return client.async().send((Message)this.nextMessage()).thenApply(TestRetryCacheWithGrpc::assertReply);
        }

        CompletableFuture<Long> watch(long logIndex, RaftClient client) {
            return client.async().watch(logIndex, RaftProtos.ReplicationLevel.MAJORITY).thenApply(TestRetryCacheWithGrpc::assertReply);
        }

        void run() throws Exception {
            try (RaftClient client = this.cluster.createClient();){
                Long lastBlockingCall = null;
                for (int i = 0; i < 5; ++i) {
                    lastBlockingCall = this.send(client, lastBlockingCall);
                }
                long lastBlockingCallId = lastBlockingCall;
                SimpleStateMachine4Testing stateMachine = SimpleStateMachine4Testing.get((RaftServer.Division)this.leader);
                stateMachine.blockApplyTransaction();
                ArrayList<CompletableFuture<Long>> asyncCalls = new ArrayList<CompletableFuture<Long>>();
                for (int i = 0; i < 5; ++i) {
                    asyncCalls.add(this.sendAsync(client));
                }
                this.assertRetryCacheEntry(client, lastBlockingCallId, false, true);
                BaseTest.ONE_SECOND.sleep();
                for (CompletableFuture completableFuture : asyncCalls) {
                    Assertions.assertFalse((boolean)completableFuture.isDone());
                }
                stateMachine.unblockApplyTransaction();
                for (CompletableFuture completableFuture : asyncCalls) {
                    this.assertRetryCacheEntry(client, (Long)completableFuture.join(), true);
                }
                long oneMoreBlockingCall = this.send(client, null);
                TestRetryCacheWithGrpc.this.LOG.info("oneMoreBlockingCall callId={}", (Object)oneMoreBlockingCall);
                this.assertRetryCacheEntry(client, oneMoreBlockingCall, true);
                for (CompletableFuture completableFuture : asyncCalls) {
                    this.assertRetryCacheEntry(client, (Long)completableFuture.join(), false);
                }
                long watchAsyncCall = this.watch(1L, client).get();
                TestRetryCacheWithGrpc.this.LOG.info("watchAsyncCall callId={}", (Object)watchAsyncCall);
                this.assertRetryCacheEntry(client, oneMoreBlockingCall, false);
                this.assertRetryCacheEntry(client, watchAsyncCall, false);
            }
        }
    }
}

