/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis.datastream;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.impl.DataStreamClientImpl;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.DataStreamClusterTests;
import org.apache.ratis.datastream.DataStreamTestUtils;
import org.apache.ratis.netty.client.NettyClientStreamRpc;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.Slf4jUtils;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.function.CheckedBiFunction;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.event.Level;

public abstract class DataStreamAsyncClusterTests<CLUSTER extends MiniRaftCluster>
extends DataStreamClusterTests<CLUSTER> {
    final Executor executor = Executors.newFixedThreadPool(16);

    public int getGlobalTimeoutSeconds() {
        return 300;
    }

    @Test
    public void testSingleStreamsMultipleServers() throws Exception {
        Slf4jUtils.setLogLevel((Logger)NettyClientStreamRpc.LOG, (Level)Level.TRACE);
        try {
            this.runWithNewCluster(3, cluster -> this.runTestDataStream(cluster, false, (c, stepDownLeader) -> this.runTestDataStream((CLUSTER)c, 1, 1, 1000, 3, (boolean)stepDownLeader)));
        }
        finally {
            Slf4jUtils.setLogLevel((Logger)NettyClientStreamRpc.LOG, (Level)Level.INFO);
        }
    }

    @Test
    public void testMultipleStreamsSingleServer() throws Exception {
        this.runWithNewCluster(1, this::runTestDataStream);
    }

    @Test
    public void testMultipleStreamsMultipleServers() throws Exception {
        TimeDuration min = RaftServerConfigKeys.Rpc.timeoutMin((RaftProperties)this.getProperties());
        RaftServerConfigKeys.Rpc.setTimeoutMin((RaftProperties)this.getProperties(), (TimeDuration)TimeDuration.valueOf((long)2L, (TimeUnit)TimeUnit.SECONDS));
        TimeDuration max = RaftServerConfigKeys.Rpc.timeoutMax((RaftProperties)this.getProperties());
        RaftServerConfigKeys.Rpc.setTimeoutMax((RaftProperties)this.getProperties(), (TimeDuration)TimeDuration.valueOf((long)3L, (TimeUnit)TimeUnit.SECONDS));
        this.runWithNewCluster(3, this::runTestDataStream);
        RaftServerConfigKeys.Rpc.setTimeoutMin((RaftProperties)this.getProperties(), (TimeDuration)min);
        RaftServerConfigKeys.Rpc.setTimeoutMax((RaftProperties)this.getProperties(), (TimeDuration)max);
    }

    @Test
    public void testMultipleStreamsMultipleServersStepDownLeader() throws Exception {
        this.runWithNewCluster(3, this::runTestDataStreamStepDownLeader);
    }

    void runTestDataStreamStepDownLeader(CLUSTER cluster) throws Exception {
        this.runMultipleStreams(cluster, true);
    }

    void runTestDataStream(CLUSTER cluster) throws Exception {
        this.runTestDataStream(cluster, false, this::runMultipleStreams);
    }

    long runMultipleStreams(CLUSTER cluster, boolean stepDownLeader) {
        ArrayList<CompletableFuture<Long>> futures = new ArrayList<CompletableFuture<Long>>();
        futures.add(CompletableFuture.supplyAsync(() -> this.runTestDataStream(cluster, 5, 10, 100000, 10, stepDownLeader), this.executor));
        futures.add(CompletableFuture.supplyAsync(() -> this.runTestDataStream(cluster, 2, 20, 1000, 5000, stepDownLeader), this.executor));
        return futures.stream().map(CompletableFuture::join).max(Long::compareTo).orElseThrow(IllegalStateException::new);
    }

    void runTestDataStream(CLUSTER cluster, boolean stepDownLeader, CheckedBiFunction<CLUSTER, Boolean, Long, Exception> runMethod) throws Exception {
        Object changed;
        RaftTestUtil.waitForLeader(cluster);
        long maxIndex = (Long)runMethod.apply(cluster, (Object)stepDownLeader);
        if (stepDownLeader) {
            RaftPeerId oldLeader = cluster.getLeader().getId();
            try {
                changed = RaftTestUtil.changeLeader(cluster, (RaftPeerId)oldLeader);
            }
            catch (Exception e) {
                throw new CompletionException("Failed to change leader from " + oldLeader, e);
            }
            this.LOG.info("Changed leader from {} to {}", (Object)oldLeader, changed);
        }
        RaftClient client = cluster.createClient();
        changed = null;
        try {
            RaftClientReply reply = (RaftClientReply)client.async().watch(maxIndex, RaftProtos.ReplicationLevel.ALL).join();
            Assertions.assertTrue((boolean)reply.isSuccess());
        }
        catch (Throwable reply) {
            changed = reply;
            throw reply;
        }
        finally {
            if (client != null) {
                if (changed != null) {
                    try {
                        client.close();
                    }
                    catch (Throwable reply) {
                        ((Throwable)changed).addSuppressed(reply);
                    }
                } else {
                    client.close();
                }
            }
        }
        for (RaftServer proxy : cluster.getServers()) {
            RaftServer.Division impl = proxy.getDivision(cluster.getGroupId());
            DataStreamTestUtils.MultiDataStreamStateMachine stateMachine = (DataStreamTestUtils.MultiDataStreamStateMachine)impl.getStateMachine();
            for (DataStreamTestUtils.SingleDataStream s : stateMachine.getStreams()) {
                Assertions.assertFalse((boolean)s.getDataChannel().isOpen());
                DataStreamTestUtils.assertLogEntry(impl, s);
            }
        }
    }

    Long runTestDataStream(CLUSTER cluster, int numClients, int numStreams, int bufferSize, int bufferNum, boolean stepDownLeader) {
        ArrayList<CompletableFuture<Long>> futures = new ArrayList<CompletableFuture<Long>>();
        for (int j = 0; j < numClients; ++j) {
            futures.add(CompletableFuture.supplyAsync(() -> this.runTestDataStream(cluster, numStreams, bufferSize, bufferNum, stepDownLeader), this.executor));
        }
        Assertions.assertEquals((int)numClients, (int)futures.size());
        return futures.stream().map(CompletableFuture::join).max(Long::compareTo).orElseThrow(IllegalStateException::new);
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    long runTestDataStream(CLUSTER cluster, int numStreams, int bufferSize, int bufferNum, boolean stepDownLeader) {
        RaftPeerId leader;
        Iterable servers = CollectionUtils.as((Iterable)cluster.getServers(), s -> s);
        try {
            leader = RaftTestUtil.waitForLeader(cluster).getId();
        }
        catch (InterruptedException e) {
            throw new CompletionException(e);
        }
        ArrayList<CompletableFuture<RaftClientReply>> futures = new ArrayList<CompletableFuture<RaftClientReply>>();
        RaftPeer primaryServer = (RaftPeer)CollectionUtils.random((Collection)cluster.getGroup().getPeers());
        try (RaftClient client = cluster.createClient(primaryServer);){
            for (int i = 0; i < numStreams; ++i) {
                DataStreamClientImpl.DataStreamOutputImpl out = (DataStreamClientImpl.DataStreamOutputImpl)client.getDataStreamApi().stream(null, this.getRoutingTable(cluster.getGroup().getPeers(), primaryServer));
                futures.add(CompletableFuture.supplyAsync(() -> DataStreamTestUtils.writeAndCloseAndAssertReplies(servers, leader, out, bufferSize, bufferNum, client.getId(), stepDownLeader).join(), this.executor));
            }
            Assertions.assertEquals((int)numStreams, (int)futures.size());
            long l = futures.stream().map(CompletableFuture::join).map(RaftClientReply::getLogIndex).max(Long::compareTo).orElseThrow(IllegalStateException::new);
            return l;
        }
        catch (IOException e) {
            throw new CompletionException(e);
        }
    }
}

