/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ratis.BaseTest;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.ReadException;
import org.apache.ratis.protocol.exceptions.ReadIndexException;
import org.apache.ratis.retry.ExceptionDependentRetry;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.util.Slf4jUtils;
import org.apache.ratis.util.TimeDuration;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.event.Level;

public abstract class ReadOnlyRequestTests<CLUSTER extends MiniRaftCluster>
extends BaseTest
implements MiniRaftCluster.Factory.Get<CLUSTER> {
    static final int NUM_SERVERS = 3;
    static final String INCREMENT = "INCREMENT";
    static final String WAIT_AND_INCREMENT = "WAIT_AND_INCREMENT";
    static final String QUERY = "QUERY";
    final Message incrementMessage;
    final Message waitAndIncrementMessage;
    final Message queryMessage;

    public ReadOnlyRequestTests() {
        Slf4jUtils.setLogLevel((Logger)RaftServer.Division.LOG, (Level)Level.DEBUG);
        this.incrementMessage = new RaftTestUtil.SimpleMessage(INCREMENT);
        this.waitAndIncrementMessage = new RaftTestUtil.SimpleMessage(WAIT_AND_INCREMENT);
        this.queryMessage = new RaftTestUtil.SimpleMessage(QUERY);
    }

    @BeforeEach
    public void setup() {
        RaftProperties p = this.getProperties();
        p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, CounterStateMachine.class, StateMachine.class);
    }

    @Test
    public void testLinearizableRead() throws Exception {
        this.getProperties().setEnum(RaftServerConfigKeys.Read.OPTION_KEY, (Enum)RaftServerConfigKeys.Read.Option.LINEARIZABLE);
        this.runWithNewCluster(3, this::testReadOnlyImpl);
    }

    @Test
    public void testLeaseRead() throws Exception {
        this.getProperties().setBoolean(RaftServerConfigKeys.Read.LEADER_LEASE_ENABLED_KEY, true);
        this.runWithNewCluster(3, this::testReadOnlyImpl);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testReadOnlyImpl(CLUSTER cluster) throws Exception {
        try {
            RaftTestUtil.waitForLeader(cluster);
            RaftPeerId leaderId = ((MiniRaftCluster)cluster).getLeader().getId();
            try (RaftClient client = ((MiniRaftCluster)cluster).createClient(leaderId);){
                for (int i = 1; i <= 10; ++i) {
                    RaftClientReply reply = client.io().send(this.incrementMessage);
                    Assertions.assertTrue((boolean)reply.isSuccess());
                    reply = client.io().sendReadOnly(this.queryMessage);
                    Assertions.assertEquals((int)i, (int)ReadOnlyRequestTests.retrieve(reply));
                }
            }
        }
        finally {
            ((MiniRaftCluster)cluster).shutdown();
        }
    }

    @Test
    public void testLinearizableReadTimeout() throws Exception {
        this.getProperties().setEnum(RaftServerConfigKeys.Read.OPTION_KEY, (Enum)RaftServerConfigKeys.Read.Option.LINEARIZABLE);
        this.runWithNewCluster(3, this::testReadOnlyTimeoutImpl);
    }

    @Test
    public void testLeaseReadTimeout() throws Exception {
        this.getProperties().setBoolean(RaftServerConfigKeys.Read.LEADER_LEASE_ENABLED_KEY, true);
        this.runWithNewCluster(3, this::testReadOnlyTimeoutImpl);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testReadOnlyTimeoutImpl(CLUSTER cluster) throws Exception {
        try {
            RaftTestUtil.waitForLeader(cluster);
            RaftPeerId leaderId = ((MiniRaftCluster)cluster).getLeader().getId();
            try (RaftClient client = ((MiniRaftCluster)cluster).createClient(leaderId);
                 RaftClient noRetry = ((MiniRaftCluster)cluster).createClient(leaderId, RetryPolicies.noRetry());){
                CompletableFuture result = client.async().send(this.incrementMessage);
                client.admin().transferLeadership(null, 200L);
                Assertions.assertThrows(ReadIndexException.class, () -> {
                    RaftClientReply timeoutReply = noRetry.io().sendReadOnly(this.queryMessage);
                    Assertions.assertNotNull((Object)timeoutReply.getException());
                    Assertions.assertTrue((boolean)(timeoutReply.getException() instanceof ReadException));
                });
            }
        }
        finally {
            ((MiniRaftCluster)cluster).shutdown();
        }
    }

    @Test
    public void testFollowerLinearizableRead() throws Exception {
        this.getProperties().setEnum(RaftServerConfigKeys.Read.OPTION_KEY, (Enum)RaftServerConfigKeys.Read.Option.LINEARIZABLE);
        this.runWithNewCluster(3, this::testFollowerReadOnlyImpl);
    }

    @Test
    public void testFollowerLeaseRead() throws Exception {
        this.getProperties().setBoolean(RaftServerConfigKeys.Read.LEADER_LEASE_ENABLED_KEY, true);
        this.runWithNewCluster(3, this::testFollowerReadOnlyImpl);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testFollowerReadOnlyImpl(CLUSTER cluster) throws Exception {
        try {
            RaftTestUtil.waitForLeader(cluster);
            List<RaftServer.Division> followers = ((MiniRaftCluster)cluster).getFollowers();
            Assertions.assertEquals((int)2, (int)followers.size());
            RaftPeerId f0 = followers.get(0).getId();
            RaftPeerId f1 = followers.get(1).getId();
            try (RaftClient client = ((MiniRaftCluster)cluster).createClient(((MiniRaftCluster)cluster).getLeader().getId());){
                for (int i = 1; i <= 10; ++i) {
                    RaftClientReply reply = client.io().send(this.incrementMessage);
                    Assertions.assertTrue((boolean)reply.isSuccess());
                    RaftClientReply read1 = client.io().sendReadOnly(this.queryMessage, f0);
                    Assertions.assertEquals((int)i, (int)ReadOnlyRequestTests.retrieve(read1));
                    CompletableFuture read2 = client.async().sendReadOnly(this.queryMessage, f1);
                    Assertions.assertEquals((int)i, (int)ReadOnlyRequestTests.retrieve((RaftClientReply)read2.get(1L, TimeUnit.SECONDS)));
                }
            }
        }
        finally {
            ((MiniRaftCluster)cluster).shutdown();
        }
    }

    @Test
    public void testFollowerLinearizableReadParallel() throws Exception {
        this.getProperties().setEnum(RaftServerConfigKeys.Read.OPTION_KEY, (Enum)RaftServerConfigKeys.Read.Option.LINEARIZABLE);
        this.runWithNewCluster(3, this::testFollowerReadOnlyParallelImpl);
    }

    @Test
    public void testFollowerLeaseReadParallel() throws Exception {
        this.getProperties().setBoolean(RaftServerConfigKeys.Read.LEADER_LEASE_ENABLED_KEY, true);
        this.runWithNewCluster(3, this::testFollowerReadOnlyParallelImpl);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testFollowerReadOnlyParallelImpl(CLUSTER cluster) throws Exception {
        try {
            RaftTestUtil.waitForLeader(cluster);
            List<RaftServer.Division> followers = ((MiniRaftCluster)cluster).getFollowers();
            Assertions.assertEquals((int)2, (int)followers.size());
            try (RaftClient leaderClient = ((MiniRaftCluster)cluster).createClient(((MiniRaftCluster)cluster).getLeader().getId());
                 RaftClient followerClient1 = ((MiniRaftCluster)cluster).createClient(followers.get(0).getId());){
                leaderClient.io().send(this.incrementMessage);
                leaderClient.async().send(this.waitAndIncrementMessage);
                Thread.sleep(100L);
                RaftClientReply clientReply = followerClient1.io().sendReadOnly(this.queryMessage, followers.get(0).getId());
                Assertions.assertEquals((int)2, (int)ReadOnlyRequestTests.retrieve(clientReply));
            }
        }
        finally {
            ((MiniRaftCluster)cluster).shutdown();
        }
    }

    @Test
    public void testFollowerLinearizableReadFailWhenLeaderDown() throws Exception {
        this.getProperties().setEnum(RaftServerConfigKeys.Read.OPTION_KEY, (Enum)RaftServerConfigKeys.Read.Option.LINEARIZABLE);
        this.runWithNewCluster(3, this::testFollowerReadOnlyFailWhenLeaderDownImpl);
    }

    @Test
    public void testFollowerLeaseReadWhenLeaderDown() throws Exception {
        this.getProperties().setBoolean(RaftServerConfigKeys.Read.LEADER_LEASE_ENABLED_KEY, true);
        this.runWithNewCluster(3, this::testFollowerReadOnlyFailWhenLeaderDownImpl);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testFollowerReadOnlyFailWhenLeaderDownImpl(CLUSTER cluster) throws Exception {
        try {
            RaftTestUtil.waitForLeader(cluster);
            List<RaftServer.Division> followers = ((MiniRaftCluster)cluster).getFollowers();
            Assertions.assertEquals((int)2, (int)followers.size());
            try (RaftClient leaderClient = ((MiniRaftCluster)cluster).createClient(((MiniRaftCluster)cluster).getLeader().getId());
                 RaftClient followerClient1 = ((MiniRaftCluster)cluster).createClient(followers.get(0).getId(), RetryPolicies.noRetry());){
                leaderClient.io().send(this.incrementMessage);
                RaftClientReply clientReply = followerClient1.io().sendReadOnly(this.queryMessage);
                Assertions.assertEquals((int)1, (int)ReadOnlyRequestTests.retrieve(clientReply));
                leaderClient.admin().transferLeadership(null, 200L);
                Assertions.assertThrows(ReadIndexException.class, () -> followerClient1.io().sendReadOnly(this.queryMessage, ((RaftServer.Division)followers.get(0)).getId()));
            }
        }
        finally {
            ((MiniRaftCluster)cluster).shutdown();
        }
    }

    @Test
    public void testFollowerReadOnlyRetryWhenLeaderDown() throws Exception {
        this.getProperties().setEnum(RaftServerConfigKeys.Read.OPTION_KEY, (Enum)RaftServerConfigKeys.Read.Option.LINEARIZABLE);
        this.runWithNewCluster(3, this::testFollowerReadOnlyRetryWhenLeaderDown);
    }

    @Test
    public void testFollowerLeaseReadRetryWhenLeaderDown() throws Exception {
        this.getProperties().setBoolean(RaftServerConfigKeys.Read.LEADER_LEASE_ENABLED_KEY, true);
        this.runWithNewCluster(3, this::testFollowerReadOnlyRetryWhenLeaderDown);
    }

    private void testFollowerReadOnlyRetryWhenLeaderDown(CLUSTER cluster) throws Exception {
        ExceptionDependentRetry retryPolicy = ExceptionDependentRetry.newBuilder().setDefaultPolicy(RetryPolicies.noRetry()).setExceptionToPolicy(ReadIndexException.class, (RetryPolicy)RetryPolicies.retryForeverWithSleep((TimeDuration)TimeDuration.valueOf((long)500L, (TimeUnit)TimeUnit.MILLISECONDS))).build();
        RaftTestUtil.waitForLeader(cluster);
        try (RaftClient client = ((MiniRaftCluster)cluster).createClient(((MiniRaftCluster)cluster).getLeader().getId(), (RetryPolicy)retryPolicy);){
            client.io().send(this.incrementMessage);
            RaftClientReply clientReply = client.io().sendReadOnly(this.queryMessage);
            Assertions.assertEquals((int)1, (int)ReadOnlyRequestTests.retrieve(clientReply));
            client.admin().transferLeadership(null, 200L);
            RaftClientReply replySuccess = client.io().sendReadOnly(this.queryMessage);
            Assertions.assertEquals((int)1, (int)ReadOnlyRequestTests.retrieve(clientReply));
        }
    }

    @Test
    public void testReadAfterWrite() throws Exception {
        this.runWithNewCluster(3, this::testReadAfterWriteImpl);
    }

    private void testReadAfterWriteImpl(CLUSTER cluster) throws Exception {
        RaftTestUtil.waitForLeader(cluster);
        try (RaftClient client = ((MiniRaftCluster)cluster).createClient();){
            client.io().send(this.incrementMessage);
            RaftClientReply blockReply = client.io().sendReadAfterWrite(this.queryMessage);
            Assertions.assertEquals((int)1, (int)ReadOnlyRequestTests.retrieve(blockReply));
            client.async().send(this.incrementMessage);
            client.async().sendReadAfterWrite(this.queryMessage).thenAccept(reply -> Assertions.assertEquals((int)2, (int)ReadOnlyRequestTests.retrieve(reply)));
            for (int i = 0; i < 20; ++i) {
                client.async().send(this.incrementMessage);
            }
            CompletableFuture linearizable = client.async().sendReadOnly(this.queryMessage);
            CompletableFuture readAfterWrite = client.async().sendReadAfterWrite(this.queryMessage);
            CompletableFuture.allOf(linearizable, readAfterWrite).get();
            Assertions.assertTrue((ReadOnlyRequestTests.retrieve((RaftClientReply)readAfterWrite.get()) >= ReadOnlyRequestTests.retrieve((RaftClientReply)linearizable.get()) ? 1 : 0) != 0);
        }
    }

    static int retrieve(RaftClientReply reply) {
        return Integer.parseInt(reply.getMessage().getContent().toString(StandardCharsets.UTF_8));
    }

    static class CounterStateMachine
    extends BaseStateMachine {
        private final AtomicLong counter = new AtomicLong(0L);

        CounterStateMachine() {
        }

        public CompletableFuture<Message> query(Message request) {
            return CompletableFuture.completedFuture(Message.valueOf((String)String.valueOf(this.counter.get())));
        }

        public CompletableFuture<Message> queryStale(Message request, long minIndex) {
            return this.query(request);
        }

        private void sleepQuietly(int millis) {
            try {
                Thread.sleep(millis);
            }
            catch (InterruptedException e) {
                LOG.debug("{} be interrupted", (Object)Thread.currentThread());
                Thread.currentThread().interrupt();
            }
        }

        private void increment() {
            this.counter.incrementAndGet();
        }

        private void waitAndIncrement() {
            this.sleepQuietly(500);
            this.increment();
        }

        private void timeoutIncrement() {
            this.sleepQuietly(5000);
            this.increment();
        }

        public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
            LOG.debug("apply trx with index=" + trx.getLogEntry().getIndex());
            this.updateLastAppliedTermIndex(trx.getLogEntry().getTerm(), trx.getLogEntry().getIndex());
            String command = trx.getLogEntry().getStateMachineLogEntry().getLogData().toString(StandardCharsets.UTF_8);
            LOG.info("receive command: {}", (Object)command);
            if (command.equals(ReadOnlyRequestTests.INCREMENT)) {
                this.increment();
            } else if (command.equals(ReadOnlyRequestTests.WAIT_AND_INCREMENT)) {
                this.waitAndIncrement();
            } else {
                this.timeoutIncrement();
            }
            return CompletableFuture.completedFuture(Message.valueOf((String)"OK"));
        }
    }
}

