/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis.datastream;

import java.io.File;
import java.nio.channels.FileChannel;
import java.nio.file.OpenOption;
import java.nio.file.StandardOpenOption;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.ratis.BaseTest;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.impl.DataStreamClientImpl;
import org.apache.ratis.datastream.DataStreamTestUtils;
import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.io.WriteOption;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RoutingTable;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.Timestamp;
import org.apache.ratis.util.function.CheckedConsumer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public abstract class DataStreamClusterTests<CLUSTER extends MiniRaftCluster>
extends BaseTest
implements MiniRaftCluster.Factory.Get<CLUSTER> {
    public static final int NUM_SERVERS = 3;

    public DataStreamClusterTests() {
        this.setStateMachine(DataStreamTestUtils.MultiDataStreamStateMachine.class);
        DataStreamTestUtils.enableResourceLeakDetector();
    }

    RoutingTable getRoutingTable(Collection<RaftPeer> peers, RaftPeer primary) {
        return DataStreamTestUtils.getRoutingTableChainTopology(peers, primary);
    }

    @Test
    public void testStreamWrites() throws Exception {
        this.runWithNewCluster(3, this::testStreamWrites);
    }

    @Test
    public void testStreamWithInvalidRoutingTable() throws Exception {
        this.runWithNewCluster(3, this::runTestInvalidPrimaryInRoutingTable);
    }

    void testStreamWrites(CLUSTER cluster) throws Exception {
        RaftTestUtil.waitForLeader(cluster);
        this.runTestDataStreamOutput(cluster);
        int size = 10000000 + ThreadLocalRandom.current().nextInt(1000000);
        File f = new File(this.getTestDir(), "a.txt");
        DataStreamTestUtils.createFile(f, size);
        for (int i = 0; i < 3; ++i) {
            this.runTestWriteFile(cluster, i, DataStreamClusterTests.writeAsyncDefaultFileRegion(f, size));
            this.runTestWriteFile(cluster, i, DataStreamClusterTests.transferToWritableByteChannel(f, size));
        }
    }

    void runTestDataStreamOutput(CLUSTER cluster) throws Exception {
        CompletableFuture reply;
        RaftClientRequest request;
        RaftPeer primaryServer = (RaftPeer)CollectionUtils.random((Collection)cluster.getGroup().getPeers());
        try (RaftClient client = cluster.createClient(primaryServer);
             DataStreamClientImpl.DataStreamOutputImpl out = (DataStreamClientImpl.DataStreamOutputImpl)client.getDataStreamApi().stream(null, this.getRoutingTable(cluster.getGroup().getPeers(), primaryServer));){
            request = out.getHeader();
            reply = out.getRaftClientReplyFuture();
            DataStreamTestUtils.writeAndAssertReplies(out, 1000, 10);
        }
        this.watchOrSleep(cluster, ((RaftClientReply)reply.join()).getLogIndex());
        this.assertLogEntry(cluster, request);
    }

    void runTestInvalidPrimaryInRoutingTable(CLUSTER cluster) throws Exception {
        RaftPeer primaryServer = (RaftPeer)CollectionUtils.random((Collection)cluster.getGroup().getPeers());
        RaftPeer notPrimary = null;
        for (RaftPeer peer : cluster.getGroup().getPeers()) {
            if (peer.equals((Object)primaryServer)) continue;
            notPrimary = peer;
            break;
        }
        Assertions.assertNotNull(notPrimary, (String)"Cannot find peer other than the primary");
        Assertions.assertNotEquals((Object)primaryServer, notPrimary);
        try (RaftClient client = cluster.createClient(primaryServer);){
            RoutingTable routingTableWithWrongPrimary = this.getRoutingTable(cluster.getGroup().getPeers(), notPrimary);
            this.testFailureCase("", () -> client.getDataStreamApi().stream(null, routingTableWithWrongPrimary), IllegalStateException.class, new Class[0]);
        }
    }

    void runTestWriteFile(CLUSTER cluster, int i, CheckedConsumer<DataStreamClientImpl.DataStreamOutputImpl, Exception> testCase) throws Exception {
        CompletableFuture reply;
        RaftClientRequest request;
        RaftPeer primaryServer = (RaftPeer)CollectionUtils.random((Collection)cluster.getGroup().getPeers());
        try (RaftClient client = cluster.createClient(primaryServer);
             DataStreamClientImpl.DataStreamOutputImpl out = (DataStreamClientImpl.DataStreamOutputImpl)client.getDataStreamApi().stream(null, this.getRoutingTable(cluster.getGroup().getPeers(), primaryServer));){
            request = out.getHeader();
            reply = out.getRaftClientReplyFuture();
            Timestamp start = Timestamp.currentTime();
            testCase.accept((Object)out);
            this.LOG.info("{}: {} elapsed {}ms", new Object[]{i, testCase, start.elapsedTimeMs()});
        }
        this.watchOrSleep(cluster, ((RaftClientReply)reply.join()).getLogIndex());
        this.assertLogEntry(cluster, request);
    }

    static CheckedConsumer<DataStreamClientImpl.DataStreamOutputImpl, Exception> transferToWritableByteChannel(final File f, final int size) {
        return new CheckedConsumer<DataStreamClientImpl.DataStreamOutputImpl, Exception>(){

            public void accept(DataStreamClientImpl.DataStreamOutputImpl out) throws Exception {
                try (FileChannel in = FileUtils.newFileChannel((File)f, (OpenOption[])new OpenOption[]{StandardOpenOption.READ});){
                    long transferred = in.transferTo(0L, size, out.getWritableByteChannel());
                    Assertions.assertEquals((long)size, (long)transferred);
                }
            }

            public String toString() {
                return "transferToWritableByteChannel";
            }
        };
    }

    static CheckedConsumer<DataStreamClientImpl.DataStreamOutputImpl, Exception> writeAsyncDefaultFileRegion(final File f, final int size) {
        return new CheckedConsumer<DataStreamClientImpl.DataStreamOutputImpl, Exception>(){

            public void accept(DataStreamClientImpl.DataStreamOutputImpl out) {
                DataStreamReply dataStreamReply = (DataStreamReply)out.writeAsync(f, new WriteOption[]{StandardWriteOption.FLUSH}).join();
                DataStreamTestUtils.assertSuccessReply(RaftProtos.DataStreamPacketHeaderProto.Type.STREAM_DATA, size, dataStreamReply);
            }

            public String toString() {
                return "writeAsyncDefaultFileRegion";
            }
        };
    }

    void watchOrSleep(CLUSTER cluster, long index) throws Exception {
        try (RaftClient client = cluster.createClient();){
            client.async().watch(index, RaftProtos.ReplicationLevel.ALL).join();
        }
        catch (UnsupportedOperationException e) {
            ONE_SECOND.sleep();
        }
    }

    void assertLogEntry(CLUSTER cluster, RaftClientRequest request) throws Exception {
        for (RaftServer proxy : cluster.getServers()) {
            RaftServer.Division impl = proxy.getDivision(cluster.getGroupId());
            DataStreamTestUtils.MultiDataStreamStateMachine stateMachine = (DataStreamTestUtils.MultiDataStreamStateMachine)impl.getStateMachine();
            DataStreamTestUtils.SingleDataStream s = stateMachine.getSingleDataStream(request);
            Assertions.assertFalse((boolean)s.getDataChannel().isOpen());
            DataStreamTestUtils.assertLogEntry(impl, s);
        }
    }
}

