/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis.grpc;

import java.io.IOException;
import java.nio.channels.OverlappingFileLockException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.ssl.KeyManager;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import org.apache.ratis.BaseTest;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.client.impl.RaftClientTestUtil;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.grpc.MiniRaftClusterWithGrpc;
import org.apache.ratis.grpc.client.GrpcClientProtocolClient;
import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.metrics.impl.DefaultTimekeeperImpl;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.security.SecurityTestUtils;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.server.RetryCache;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.metrics.RaftServerMetricsImpl;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.Slf4jUtils;
import org.apache.ratis.util.TimeDuration;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.event.Level;

public class TestRaftServerWithGrpc
extends BaseTest
implements MiniRaftClusterWithGrpc.FactoryGet {
    public TestRaftServerWithGrpc() {
        Slf4jUtils.setLogLevel((Logger)GrpcClientProtocolClient.LOG, (Level)Level.TRACE);
    }

    public static Collection<Boolean[]> data() {
        return Arrays.asList({Boolean.FALSE}, {Boolean.TRUE});
    }

    @BeforeEach
    public void setup() {
        RaftProperties p = this.getProperties();
        p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SimpleStateMachine4Testing.class, StateMachine.class);
        RaftClientConfigKeys.Rpc.setRequestTimeout((RaftProperties)p, (TimeDuration)TimeDuration.valueOf((long)1L, (TimeUnit)TimeUnit.SECONDS));
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void testServerRestartOnException(Boolean separateHeartbeat) throws Exception {
        GrpcConfigKeys.Server.setHeartbeatChannel((RaftProperties)this.getProperties(), (boolean)separateHeartbeat);
        this.runWithNewCluster(1, this::runTestServerRestartOnException);
    }

    static RaftServer newRaftServer(MiniRaftClusterWithGrpc cluster, RaftPeerId id, StateMachine stateMachine, RaftStorage.StartupOption option, RaftProperties p) throws IOException {
        RaftGroup group = cluster.getGroup();
        return RaftServer.newBuilder().setServerId(id).setGroup(cluster.getGroup()).setStateMachine(stateMachine).setOption(option).setProperties(p).setParameters(cluster.setPropertiesAndInitParameters(id, group, p)).build();
    }

    void runTestServerRestartOnException(MiniRaftClusterWithGrpc cluster) throws Exception {
        RaftServer.Division leader = RaftTestUtil.waitForLeader((MiniRaftCluster)cluster);
        RaftPeerId leaderId = leader.getId();
        RaftProperties p = this.getProperties();
        GrpcConfigKeys.Server.setPort((RaftProperties)p, (int)RaftServerTestUtil.getServerRpc((RaftServer.Division)leader).getInetSocketAddress().getPort());
        StateMachine stateMachine = cluster.getLeader().getStateMachine();
        RaftServerConfigKeys.setStorageDir((RaftProperties)p, Collections.singletonList(cluster.getStorageDir(leaderId)));
        TestRaftServerWithGrpc.newRaftServer(cluster, leaderId, stateMachine, RaftStorage.StartupOption.FORMAT, p);
        RaftServerTestUtil.getServerRpc((RaftServer.Division)cluster.getLeader()).close();
        RaftServerConfigKeys.setStorageDir((RaftProperties)p, Collections.singletonList(cluster.getStorageDir(leaderId)));
        TestRaftServerWithGrpc.testFailureCase((String)"Starting a new server with the same address should fail", () -> TestRaftServerWithGrpc.newRaftServer(cluster, leaderId, stateMachine, RaftStorage.StartupOption.RECOVER, p).start(), CompletionException.class, (Logger)this.LOG, (Class[])new Class[]{IOException.class, OverlappingFileLockException.class});
        cluster.getServerFactory(leaderId).newRaftServerRpc((RaftServer)cluster.getServer(leaderId));
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void testUnsupportedMethods(Boolean separateHeartbeat) throws Exception {
        GrpcConfigKeys.Server.setHeartbeatChannel((RaftProperties)this.getProperties(), (boolean)separateHeartbeat);
        this.runWithNewCluster(1, this::runTestUnsupportedMethods);
    }

    void runTestUnsupportedMethods(MiniRaftClusterWithGrpc cluster) throws Exception {
        RaftPeerId leaderId = RaftTestUtil.waitForLeader((MiniRaftCluster)cluster).getId();
        RaftServerRpc rpc = cluster.getServerFactory(leaderId).newRaftServerRpc((RaftServer)cluster.getServer(leaderId));
        this.testFailureCase("appendEntries", () -> rpc.appendEntries(null), UnsupportedOperationException.class, new Class[0]);
        this.testFailureCase("installSnapshot", () -> rpc.installSnapshot(null), UnsupportedOperationException.class, new Class[0]);
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void testLeaderRestart(Boolean separateHeartbeat) throws Exception {
        GrpcConfigKeys.Server.setHeartbeatChannel((RaftProperties)this.getProperties(), (boolean)separateHeartbeat);
        this.runWithNewCluster(3, this::runTestLeaderRestart);
    }

    void runTestLeaderRestart(MiniRaftClusterWithGrpc cluster) throws Exception {
        RaftServer.Division leader = RaftTestUtil.waitForLeader((MiniRaftCluster)cluster);
        try (RaftClient client = cluster.createClient();){
            CompletableFuture f = client.async().send((Message)new RaftTestUtil.SimpleMessage("testing"));
            Assertions.assertTrue((boolean)((RaftClientReply)f.get()).isSuccess());
        }
        client = cluster.createClient();
        var4_4 = null;
        try {
            RaftClientRpc rpc = client.getClientRpc();
            AtomicLong seqNum = new AtomicLong();
            RaftClientRequest request = TestRaftServerWithGrpc.newRaftClientRequest(client, seqNum.incrementAndGet());
            Assertions.assertEquals((Object)client.getId(), (Object)request.getClientId());
            CompletableFuture f = rpc.sendRequestAsync(request);
            RaftClientReply reply = (RaftClientReply)f.get();
            Assertions.assertTrue((boolean)reply.isSuccess());
            RaftClientTestUtil.handleReply((RaftClientRequest)request, (RaftClientReply)reply, (RaftClient)client);
            ClientInvocationId invocationId = ClientInvocationId.valueOf((ClientId)request.getClientId(), (long)request.getCallId());
            RetryCache.Entry entry = leader.getRetryCache().getIfPresent(invocationId);
            Assertions.assertNotNull((Object)entry);
            this.LOG.info("cache entry {}", (Object)entry);
            SimpleStateMachine4Testing stateMachine = SimpleStateMachine4Testing.get((RaftServer.Division)leader);
            stateMachine.blockStartTransaction();
            RaftClientRequest requestBlocked = TestRaftServerWithGrpc.newRaftClientRequest(client, seqNum.incrementAndGet());
            CompletableFuture futureBlocked = rpc.sendRequestAsync(requestBlocked);
            JavaUtils.attempt(() -> Assertions.assertNull((Object)leader.getRetryCache().getIfPresent(invocationId)), (int)10, (TimeDuration)HUNDRED_MILLIS, (String)"invalidate cache entry", (Logger)this.LOG);
            this.LOG.info("cache entry not found for {}", (Object)invocationId);
            RaftTestUtil.changeLeader((MiniRaftCluster)cluster, (RaftPeerId)leader.getId());
            Assertions.assertNotEquals((Object)RaftProtos.RaftPeerRole.LEADER, (Object)leader.getInfo().getCurrentRole());
            this.testFailureCase("request should fail", futureBlocked::get, ExecutionException.class, new Class[]{AlreadyClosedException.class});
            stateMachine.unblockStartTransaction();
            RaftClientRequest requestTimeout = TestRaftServerWithGrpc.newRaftClientRequest(client, seqNum.incrementAndGet());
            rpc.handleException(leader.getId(), (Throwable)new Exception(), true);
            CompletableFuture f2 = rpc.sendRequestAsync(requestTimeout);
            this.testFailureCase("request should timeout", f2::get, ExecutionException.class, new Class[]{TimeoutIOException.class});
        }
        catch (Throwable throwable) {
            var4_4 = throwable;
            throw throwable;
        }
        finally {
            if (client != null) {
                if (var4_4 != null) {
                    try {
                        client.close();
                    }
                    catch (Throwable throwable) {
                        var4_4.addSuppressed(throwable);
                    }
                } else {
                    client.close();
                }
            }
        }
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void testRaftClientMetrics(Boolean separateHeartbeat) throws Exception {
        GrpcConfigKeys.Server.setHeartbeatChannel((RaftProperties)this.getProperties(), (boolean)separateHeartbeat);
        this.runWithNewCluster(3, this::testRaftClientRequestMetrics);
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void testRaftServerMetrics(Boolean separateHeartbeat) throws Exception {
        GrpcConfigKeys.Server.setHeartbeatChannel((RaftProperties)this.getProperties(), (boolean)separateHeartbeat);
        RaftProperties p = this.getProperties();
        RaftServerConfigKeys.Write.setElementLimit((RaftProperties)p, (int)10);
        RaftServerConfigKeys.Write.setByteLimit((RaftProperties)p, (SizeInBytes)SizeInBytes.valueOf((String)"1MB"));
        try {
            this.runWithNewCluster(3, this::testRequestMetrics);
        }
        finally {
            RaftServerConfigKeys.Write.setElementLimit((RaftProperties)p, (int)4096);
            RaftServerConfigKeys.Write.setByteLimit((RaftProperties)p, (SizeInBytes)RaftServerConfigKeys.Write.BYTE_LIMIT_DEFAULT);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void testRequestMetrics(MiniRaftClusterWithGrpc cluster) throws Exception {
        try (RaftClient client = cluster.createClient();){
            CompletableFuture f = client.async().send((Message)new RaftTestUtil.SimpleMessage("testing"));
            Assertions.assertTrue((boolean)((RaftClientReply)f.get()).isSuccess());
        }
        SimpleStateMachine4Testing stateMachine = SimpleStateMachine4Testing.get((RaftServer.Division)cluster.getLeader());
        stateMachine.blockFlushStateMachineData();
        ArrayList<RaftClient> clients = new ArrayList<RaftClient>();
        try {
            RaftClient client = cluster.createClient(cluster.getLeader().getId(), RetryPolicies.noRetry());
            clients.add(client);
            client.async().send((Message)new RaftTestUtil.SimpleMessage("2nd Message"));
            for (int i = 0; i < 10; ++i) {
                client = cluster.createClient(cluster.getLeader().getId(), RetryPolicies.noRetry());
                clients.add(client);
                client.async().send((Message)new RaftTestUtil.SimpleMessage("message " + i));
            }
            RaftTestUtil.waitFor(() -> TestRaftServerWithGrpc.getRaftServerMetrics(cluster.getLeader()).getNumRequestQueueLimitHits().getCount() == 1L, (int)300, (int)5000);
            stateMachine.unblockFlushStateMachineData();
            client = cluster.createClient(cluster.getLeader().getId(), RetryPolicies.noRetry());
            SizeInBytes size = SizeInBytes.valueOf((String)"1025kb");
            ByteString bytes = TestRaftServerWithGrpc.randomByteString(size.getSizeInt());
            Assertions.assertEquals((int)size.getSizeInt(), (int)bytes.size());
            client.async().send((Message)new RaftTestUtil.SimpleMessage(size + "-message", bytes));
            clients.add(client);
            RaftTestUtil.waitFor(() -> TestRaftServerWithGrpc.getRaftServerMetrics(cluster.getLeader()).getNumRequestsByteSizeLimitHits().getCount() == 1L, (int)300, (int)5000);
            Assertions.assertEquals((long)2L, (long)TestRaftServerWithGrpc.getRaftServerMetrics(cluster.getLeader()).getNumResourceLimitHits().getCount());
        }
        finally {
            for (RaftClient client : clients) {
                client.close();
            }
        }
    }

    static ByteString randomByteString(int size) {
        ByteString.Output out = ByteString.newOutput((int)size);
        ThreadLocalRandom random = ThreadLocalRandom.current();
        byte[] buffer = new byte[4096];
        while (size > 0) {
            random.nextBytes(buffer);
            int n = Math.min(size, buffer.length);
            out.write(buffer, 0, n);
            size -= n;
        }
        return out.toByteString();
    }

    static RaftServerMetricsImpl getRaftServerMetrics(RaftServer.Division division) {
        return (RaftServerMetricsImpl)division.getRaftServerMetrics();
    }

    void testRaftClientRequestMetrics(MiniRaftClusterWithGrpc cluster) throws IOException, ExecutionException, InterruptedException {
        RaftServer.Division leader = RaftTestUtil.waitForLeader((MiniRaftCluster)cluster);
        RaftServerMetricsImpl raftServerMetrics = TestRaftServerWithGrpc.getRaftServerMetrics(leader);
        RatisMetricRegistry registry = raftServerMetrics.getRegistry();
        try (RaftClient client = cluster.createClient();){
            CompletableFuture f1 = client.async().send((Message)new RaftTestUtil.SimpleMessage("testing"));
            Assertions.assertTrue((boolean)((RaftClientReply)f1.get()).isSuccess());
            DefaultTimekeeperImpl write = (DefaultTimekeeperImpl)registry.timer("clientWriteRequest");
            JavaUtils.attempt(() -> Assertions.assertTrue((write.getTimer().getCount() > 0L ? 1 : 0) != 0), (int)3, (TimeDuration)TimeDuration.ONE_SECOND, (String)"writeTimer metrics", (Logger)this.LOG);
            CompletableFuture f2 = client.async().sendReadOnly((Message)new RaftTestUtil.SimpleMessage("testing"));
            Assertions.assertTrue((boolean)((RaftClientReply)f2.get()).isSuccess());
            DefaultTimekeeperImpl read = (DefaultTimekeeperImpl)registry.timer("clientReadRequest");
            JavaUtils.attempt(() -> Assertions.assertTrue((read.getTimer().getCount() > 0L ? 1 : 0) != 0), (int)3, (TimeDuration)TimeDuration.ONE_SECOND, (String)"readTimer metrics", (Logger)this.LOG);
            CompletableFuture f3 = client.async().sendStaleRead((Message)new RaftTestUtil.SimpleMessage("testing"), 0L, leader.getId());
            Assertions.assertTrue((boolean)((RaftClientReply)f3.get()).isSuccess());
            DefaultTimekeeperImpl staleRead = (DefaultTimekeeperImpl)registry.timer("clientStaleReadRequest");
            JavaUtils.attempt(() -> Assertions.assertTrue((staleRead.getTimer().getCount() > 0L ? 1 : 0) != 0), (int)3, (TimeDuration)TimeDuration.ONE_SECOND, (String)"staleReadTimer metrics", (Logger)this.LOG);
            CompletableFuture f4 = client.async().watch(0L, RaftProtos.ReplicationLevel.ALL);
            Assertions.assertTrue((boolean)((RaftClientReply)f4.get()).isSuccess());
            DefaultTimekeeperImpl watchAll = (DefaultTimekeeperImpl)registry.timer(String.format("clientWatch%sRequest", "-ALL"));
            JavaUtils.attempt(() -> Assertions.assertTrue((watchAll.getTimer().getCount() > 0L ? 1 : 0) != 0), (int)3, (TimeDuration)TimeDuration.ONE_SECOND, (String)"watchAllTimer metrics", (Logger)this.LOG);
            CompletableFuture f5 = client.async().watch(0L, RaftProtos.ReplicationLevel.MAJORITY);
            Assertions.assertTrue((boolean)((RaftClientReply)f5.get()).isSuccess());
            DefaultTimekeeperImpl watch = (DefaultTimekeeperImpl)registry.timer(String.format("clientWatch%sRequest", ""));
            JavaUtils.attempt(() -> Assertions.assertTrue((watch.getTimer().getCount() > 0L ? 1 : 0) != 0), (int)3, (TimeDuration)TimeDuration.ONE_SECOND, (String)"watchTimer metrics", (Logger)this.LOG);
        }
    }

    static RaftClientRequest newRaftClientRequest(RaftClient client, long seqNum) {
        RaftTestUtil.SimpleMessage m = new RaftTestUtil.SimpleMessage("m" + seqNum);
        return RaftClientTestUtil.newRaftClientRequest((RaftClient)client, null, (long)seqNum, (Message)m, (RaftClientRequest.Type)RaftClientRequest.writeRequestType(), (RaftProtos.SlidingWindowEntry)ProtoUtils.toSlidingWindowEntry((long)seqNum, (seqNum == 1L ? 1 : 0) != 0));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @ParameterizedTest
    @MethodSource(value={"data"})
    public void testTlsWithKeyAndTrustManager(Boolean separateHeartbeat) throws Exception {
        GrpcConfigKeys.Server.setHeartbeatChannel((RaftProperties)this.getProperties(), (boolean)separateHeartbeat);
        RaftProperties p = this.getProperties();
        RaftServerConfigKeys.Write.setElementLimit((RaftProperties)p, (int)10);
        RaftServerConfigKeys.Write.setByteLimit((RaftProperties)p, (SizeInBytes)SizeInBytes.valueOf((String)"1MB"));
        String[] ids = MiniRaftCluster.generateIds((int)3, (int)3);
        KeyManager serverKeyManager = SecurityTestUtils.getKeyManager(SecurityTestUtils::getServerKeyStore);
        X509TrustManager serverTrustManager = SecurityTestUtils.getTrustManager(SecurityTestUtils::getTrustStore);
        KeyManager clientKeyManager = SecurityTestUtils.getKeyManager(SecurityTestUtils::getClientKeyStore);
        X509TrustManager clientTrustManager = SecurityTestUtils.getTrustManager(SecurityTestUtils::getTrustStore);
        GrpcTlsConfig serverConfig = new GrpcTlsConfig(serverKeyManager, (TrustManager)serverTrustManager, true);
        GrpcTlsConfig clientConfig = new GrpcTlsConfig(clientKeyManager, (TrustManager)clientTrustManager, true);
        Parameters parameters = new Parameters();
        GrpcConfigKeys.Server.setTlsConf((Parameters)parameters, (GrpcTlsConfig)serverConfig);
        GrpcConfigKeys.Admin.setTlsConf((Parameters)parameters, (GrpcTlsConfig)clientConfig);
        GrpcConfigKeys.Client.setTlsConf((Parameters)parameters, (GrpcTlsConfig)clientConfig);
        MiniRaftClusterWithGrpc cluster = null;
        try {
            cluster = new MiniRaftClusterWithGrpc(ids, new String[0], p, parameters);
            cluster.start();
            this.testRequestMetrics(cluster);
        }
        finally {
            RaftServerConfigKeys.Write.setElementLimit((RaftProperties)p, (int)4096);
            RaftServerConfigKeys.Write.setByteLimit((RaftProperties)p, (SizeInBytes)RaftServerConfigKeys.Write.BYTE_LIMIT_DEFAULT);
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }
}

