/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis.datastream;

import java.io.File;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.OpenOption;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.ratis.BaseTest;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.client.impl.DataStreamClientImpl;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.io.WriteOption;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientMessage;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.RoutingTable;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.raftlog.LogEntryHeader;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBufUtil;
import org.apache.ratis.thirdparty.io.netty.util.ResourceLeakDetector;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.JavaUtils;
import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public interface DataStreamTestUtils {
    public static final Logger LOG = LoggerFactory.getLogger(DataStreamTestUtils.class);
    public static final ByteString MOCK = ByteString.copyFromUtf8((String)"mock");
    public static final int MODULUS = 23;
    public static final ResourceLeakDetector.LeakListener LEAK_LISTENER = (resourceType, records) -> {
        throw new IllegalStateException("Leak detected for resource type: " + resourceType + records);
    };

    public static byte pos2byte(int pos) {
        return (byte)(65 + pos % 23);
    }

    public static ByteBuffer initBuffer(int offset, int size) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(size);
        int length = buffer.capacity();
        buffer.position(0).limit(length);
        for (int j = 0; j < length; ++j) {
            buffer.put(DataStreamTestUtils.pos2byte(offset + j));
        }
        buffer.flip();
        Assertions.assertEquals((int)length, (int)buffer.remaining());
        return buffer;
    }

    public static void createFile(File f, final int size) throws Exception {
        ReadableByteChannel source = new ReadableByteChannel(){
            private int offset = 0;

            @Override
            public boolean isOpen() {
                return this.offset < size;
            }

            @Override
            public void close() {
                this.offset = size;
            }

            @Override
            public int read(ByteBuffer dst) {
                int start = this.offset;
                while (dst.remaining() > 0 && this.isOpen()) {
                    dst.put(DataStreamTestUtils.pos2byte(this.offset));
                    ++this.offset;
                }
                return this.offset - start;
            }
        };
        FileUtils.createDirectories((File)f.getParentFile());
        try (FileChannel out = FileUtils.newFileChannel((File)f, (OpenOption[])new OpenOption[]{StandardOpenOption.CREATE, StandardOpenOption.WRITE});){
            long transferred = out.transferFrom(source, 0L, size);
            Assertions.assertEquals((long)size, (long)transferred);
        }
    }

    public static ByteString bytesWritten2ByteString(long bytesWritten) {
        return ByteString.copyFromUtf8((String)("bytesWritten=" + bytesWritten));
    }

    public static RoutingTable getRoutingTableChainTopology(Iterable<RaftPeer> peers, RaftPeer primary) {
        return DataStreamTestUtils.getRoutingTableChainTopology(CollectionUtils.as(peers, RaftPeer::getId), primary.getId());
    }

    public static RoutingTable getRoutingTableChainTopology(Iterable<RaftPeerId> peers, RaftPeerId primary) {
        RoutingTable.Builder builder = RoutingTable.newBuilder();
        RaftPeerId previous = primary;
        for (RaftPeerId peer : peers) {
            if (peer.equals((Object)primary)) continue;
            builder.addSuccessor(previous, peer);
            previous = peer;
        }
        return builder.build();
    }

    public static int writeAndAssertReplies(DataStreamClientImpl.DataStreamOutputImpl out, int bufferSize, int bufferNum) {
        ArrayList<CompletableFuture> futures = new ArrayList<CompletableFuture>();
        ArrayList<Integer> sizes = new ArrayList<Integer>();
        int halfBufferSize = bufferSize / 2;
        int dataSize = 0;
        for (int i = 0; i < bufferNum; ++i) {
            int size = halfBufferSize + ThreadLocalRandom.current().nextInt(halfBufferSize);
            sizes.add(size);
            ByteBuffer bf = DataStreamTestUtils.initBuffer(dataSize, size);
            futures.add(i == bufferNum - 1 ? out.writeAsync(bf, new WriteOption[]{StandardWriteOption.FLUSH, StandardWriteOption.SYNC}) : out.writeAsync(bf, new WriteOption[0]));
            dataSize += size;
        }
        DataStreamReply reply = (DataStreamReply)out.getHeaderFuture().join();
        DataStreamTestUtils.assertSuccessReply(RaftProtos.DataStreamPacketHeaderProto.Type.STREAM_HEADER, 0L, reply);
        for (int i = 0; i < futures.size(); ++i) {
            DataStreamReply reply2 = (DataStreamReply)((CompletableFuture)futures.get(i)).join();
            RaftProtos.DataStreamPacketHeaderProto.Type expectedType = RaftProtos.DataStreamPacketHeaderProto.Type.STREAM_DATA;
            DataStreamTestUtils.assertSuccessReply(expectedType, ((Integer)sizes.get(i)).longValue(), reply2);
        }
        return dataSize;
    }

    public static void assertSuccessReply(RaftProtos.DataStreamPacketHeaderProto.Type expectedType, long expectedBytesWritten, DataStreamReply reply) {
        Assertions.assertTrue((boolean)reply.isSuccess());
        Assertions.assertEquals((long)expectedBytesWritten, (long)reply.getBytesWritten());
        Assertions.assertEquals((Object)expectedType, (Object)reply.getType());
    }

    public static CompletableFuture<RaftClientReply> writeAndCloseAndAssertReplies(Iterable<RaftServer> servers, RaftPeerId leader, DataStreamClientImpl.DataStreamOutputImpl out, int bufferSize, int bufferNum, ClientId clientId, boolean stepDownLeader) {
        LOG.info("start Stream{}", (Object)out.getHeader().getCallId());
        int bytesWritten = DataStreamTestUtils.writeAndAssertReplies(out, bufferSize, bufferNum);
        try {
            for (RaftServer s : servers) {
                DataStreamTestUtils.assertHeader(s, out.getHeader(), bytesWritten, stepDownLeader);
            }
        }
        catch (Throwable e) {
            throw new CompletionException(e);
        }
        LOG.info("Stream{}: bytesWritten={}", (Object)out.getHeader().getCallId(), (Object)bytesWritten);
        return out.closeAsync().thenCompose(reply -> DataStreamTestUtils.assertCloseReply(out, reply, bytesWritten, leader, clientId, stepDownLeader));
    }

    public static void assertHeader(RaftServer server, RaftClientRequest header, int dataSize, boolean stepDownLeader) throws Exception {
        LOG.info("XXX {}: dataSize={}, stepDownLeader={}, header={}", new Object[]{server.getId(), dataSize, stepDownLeader, header});
        Assertions.assertEquals((Object)RaftClientRequest.dataStreamRequestType(), (Object)header.getType());
        MultiDataStreamStateMachine stateMachine = (MultiDataStreamStateMachine)server.getDivision(header.getRaftGroupId()).getStateMachine();
        SingleDataStream stream = stateMachine.getSingleDataStream(header);
        MyDataChannel channel = stream.getDataChannel();
        Assertions.assertEquals((int)dataSize, (int)channel.getBytesWritten());
        Assertions.assertEquals((int)dataSize, (int)channel.getForcedPosition());
        RaftClientRequest writeRequest = stream.getWriteRequest();
        Assertions.assertEquals((Object)RaftClientRequest.dataStreamRequestType(), (Object)writeRequest.getType());
        DataStreamTestUtils.assertRaftClientMessage((RaftClientMessage)header, null, (RaftClientMessage)writeRequest, header.getClientId(), stepDownLeader);
    }

    public static CompletableFuture<RaftClientReply> assertCloseReply(DataStreamClientImpl.DataStreamOutputImpl out, DataStreamReply dataStreamReply, long bytesWritten, RaftPeerId leader, ClientId clientId, boolean stepDownLeader) {
        Assertions.assertSame((Object)dataStreamReply, out.closeAsync().join());
        Assertions.assertEquals((Object)dataStreamReply.getClientId(), (Object)clientId);
        BaseTest.testFailureCase((String)"writeAsync should fail", () -> {
            DataStreamReply cfr_ignored_0 = (DataStreamReply)out.writeAsync(DataStreamRequestByteBuffer.EMPTY_BYTE_BUFFER, new WriteOption[0]).join();
        }, CompletionException.class, (Logger)null, (Class[])new Class[]{AlreadyClosedException.class});
        DataStreamReplyByteBuffer buffer = (DataStreamReplyByteBuffer)dataStreamReply;
        try {
            ByteString bytes;
            RaftClientReply reply = ClientProtoUtils.toRaftClientReply((ByteBuffer)buffer.slice());
            DataStreamTestUtils.assertRaftClientMessage((RaftClientMessage)out.getHeader(), leader, (RaftClientMessage)reply, clientId, stepDownLeader);
            if (reply.isSuccess() && !(bytes = reply.getMessage().getContent()).equals((Object)MOCK)) {
                Assertions.assertEquals((Object)DataStreamTestUtils.bytesWritten2ByteString(bytesWritten), (Object)bytes);
            }
            return CompletableFuture.completedFuture(reply);
        }
        catch (Throwable t) {
            return JavaUtils.completeExceptionally((Throwable)t);
        }
    }

    public static void assertRaftClientMessage(RaftClientMessage expected, RaftPeerId expectedServerId, RaftClientMessage computed, ClientId expectedClientId, boolean stepDownLeader) {
        Assertions.assertNotNull((Object)computed);
        Assertions.assertEquals((Object)expectedClientId, (Object)computed.getClientId());
        if (!stepDownLeader) {
            Assertions.assertEquals((Object)Optional.ofNullable(expectedServerId).orElseGet(() -> ((RaftClientMessage)expected).getServerId()), (Object)computed.getServerId());
        }
        Assertions.assertEquals((Object)expected.getRaftGroupId(), (Object)computed.getRaftGroupId());
    }

    public static RaftProtos.LogEntryProto searchLogEntry(ClientInvocationId invocationId, RaftLog log) throws Exception {
        for (LogEntryHeader termIndex : log.getEntries(0L, Long.MAX_VALUE)) {
            RaftProtos.LogEntryProto entry = log.get(termIndex.getIndex());
            if (!entry.hasStateMachineLogEntry() || !invocationId.match(entry.getStateMachineLogEntry())) continue;
            return entry;
        }
        return null;
    }

    public static void assertLogEntry(RaftProtos.LogEntryProto logEntry, RaftClientRequest request) {
        Assertions.assertNotNull((Object)logEntry);
        Assertions.assertTrue((boolean)logEntry.hasStateMachineLogEntry());
        RaftProtos.StateMachineLogEntryProto s = logEntry.getStateMachineLogEntry();
        Assertions.assertEquals((Object)RaftProtos.StateMachineLogEntryProto.Type.DATASTREAM, (Object)s.getType());
        Assertions.assertEquals((long)request.getCallId(), (long)s.getCallId());
        Assertions.assertEquals((Object)request.getClientId().toByteString(), (Object)s.getClientId());
    }

    public static void assertLogEntry(RaftServer.Division division, SingleDataStream stream) throws Exception {
        RaftClientRequest request = stream.getWriteRequest();
        RaftProtos.LogEntryProto entryFromStream = stream.getLogEntry();
        DataStreamTestUtils.assertLogEntry(entryFromStream, request);
        RaftProtos.LogEntryProto entryFromLog = DataStreamTestUtils.searchLogEntry(ClientInvocationId.valueOf((RaftClientMessage)request), division.getRaftLog());
        Assertions.assertEquals((Object)entryFromStream, (Object)entryFromLog);
    }

    public static void enableResourceLeakDetector() {
        ResourceLeakDetector.setLevel((ResourceLeakDetector.Level)ResourceLeakDetector.Level.PARANOID);
        ByteBufUtil.setLeakListener((ResourceLeakDetector.LeakListener)LEAK_LISTENER);
    }

    public static class MyDataChannel
    implements StateMachine.DataChannel {
        private volatile boolean open = true;
        private int bytesWritten = 0;
        private int forcedPosition = 0;

        int getBytesWritten() {
            return this.bytesWritten;
        }

        int getForcedPosition() {
            return this.forcedPosition;
        }

        public void force(boolean metadata) {
            this.forcedPosition = this.bytesWritten;
        }

        public int write(ByteBuffer src) {
            if (!this.open) {
                throw new IllegalStateException("Already closed");
            }
            int remaining = src.remaining();
            while (src.remaining() > 0) {
                Assertions.assertEquals((byte)DataStreamTestUtils.pos2byte(this.bytesWritten), (byte)src.get());
                ++this.bytesWritten;
            }
            return remaining;
        }

        public boolean isOpen() {
            return this.open;
        }

        public void close() {
            this.open = false;
        }
    }

    public static class SingleDataStream
    implements StateMachine.DataStream {
        private final RaftClientRequest writeRequest;
        private final MyDataChannel channel = new MyDataChannel();
        private volatile RaftProtos.LogEntryProto logEntry;

        SingleDataStream(RaftClientRequest request) {
            this.writeRequest = request;
        }

        public MyDataChannel getDataChannel() {
            return this.channel;
        }

        public CompletableFuture<?> cleanUp() {
            try {
                this.channel.close();
            }
            catch (Throwable t) {
                return JavaUtils.completeExceptionally((Throwable)t);
            }
            return CompletableFuture.completedFuture(null);
        }

        void setLogEntry(RaftProtos.LogEntryProto logEntry) {
            this.logEntry = logEntry;
        }

        RaftProtos.LogEntryProto getLogEntry() {
            return this.logEntry;
        }

        RaftClientRequest getWriteRequest() {
            return this.writeRequest;
        }

        public String toString() {
            return JavaUtils.getClassSimpleName(this.getClass()) + ": writeRequest=" + this.writeRequest + ", logEntry=" + LogProtoUtils.toLogEntryString((RaftProtos.LogEntryProto)this.logEntry);
        }
    }

    public static class MultiDataStreamStateMachine
    extends BaseStateMachine {
        private final ConcurrentMap<ClientInvocationId, SingleDataStream> streams = new ConcurrentHashMap<ClientInvocationId, SingleDataStream>();

        public CompletableFuture<StateMachine.DataStream> stream(RaftClientRequest request) {
            SingleDataStream s = new SingleDataStream(request);
            LOG.info("XXX {} put {}, {}", new Object[]{this, ClientInvocationId.valueOf((RaftClientMessage)request), s});
            this.streams.put(ClientInvocationId.valueOf((RaftClientMessage)request), s);
            return CompletableFuture.completedFuture(s);
        }

        public CompletableFuture<?> link(StateMachine.DataStream stream, RaftProtos.LogEntryProto entry) {
            LOG.info("link {}", (Object)stream);
            if (stream == null) {
                return JavaUtils.completeExceptionally((Throwable)new IllegalStateException("Null stream: entry=" + entry));
            }
            ((SingleDataStream)stream).setLogEntry(entry);
            return CompletableFuture.completedFuture(null);
        }

        public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
            RaftProtos.LogEntryProto entry = Objects.requireNonNull(trx.getLogEntry());
            this.updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
            SingleDataStream s = this.getSingleDataStream(ClientInvocationId.valueOf((RaftProtos.StateMachineLogEntryProto)entry.getStateMachineLogEntry()));
            ByteString bytesWritten = DataStreamTestUtils.bytesWritten2ByteString(s.getDataChannel().getBytesWritten());
            return CompletableFuture.completedFuture(() -> bytesWritten);
        }

        SingleDataStream getSingleDataStream(RaftClientRequest request) {
            return this.getSingleDataStream(ClientInvocationId.valueOf((RaftClientMessage)request));
        }

        SingleDataStream getSingleDataStream(ClientInvocationId invocationId) {
            SingleDataStream s = (SingleDataStream)this.streams.get(invocationId);
            LOG.info("XXX {}: get {} return {}", new Object[]{this, invocationId, s});
            return s;
        }

        Collection<SingleDataStream> getStreams() {
            return this.streams.values();
        }
    }
}

