/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis.netty.server;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.ratis.client.AsyncRpcApi;
import org.apache.ratis.client.DataStreamOutputRpc;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.io.WriteOption;
import org.apache.ratis.netty.metrics.NettyServerStreamRpcMetrics;
import org.apache.ratis.netty.server.DataStreamManagement;
import org.apache.ratis.netty.server.DataStreamRequestByteBuf;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.DataStreamPacket;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftClientMessage;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.exceptions.AlreadyExistsException;
import org.apache.ratis.protocol.exceptions.DataStreamException;
import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.ratis.util.ConcurrentUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ReferenceCountedObject;
import org.apache.ratis.util.function.CheckedBiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
 * Exception performing whole class analysis ignored.
 */
public class DataStreamManagement {
    public static final Logger LOG = LoggerFactory.getLogger(DataStreamManagement.class);
    private final RaftServer server;
    private final String name;
    private final StreamMap streams = new StreamMap();
    private final Executor requestExecutor;
    private final Executor writeExecutor;
    private final NettyServerStreamRpcMetrics nettyServerStreamRpcMetrics;

    /**
     * Creates the manager for the given server and builds the two executors it uses.
     *
     * @param server the server whose id, properties and divisions are used by this instance
     * @param metrics the metrics object later returned by {@link #getMetrics()}
     */
    DataStreamManagement(RaftServer server, NettyServerStreamRpcMetrics metrics) {
        this.server = server;
        this.name = server.getId() + "-" + JavaUtils.getClassSimpleName(getClass());

        final RaftProperties conf = server.getProperties();
        // One "cached" flag is read and shared by both pools; only the size key and the name suffix differ.
        final boolean cached = RaftServerConfigKeys.DataStream.asyncRequestThreadPoolCached(conf);

        final int requestPoolSize = RaftServerConfigKeys.DataStream.asyncRequestThreadPoolSize(conf);
        this.requestExecutor = ConcurrentUtils.newThreadPoolWithMax(cached, requestPoolSize, this.name + "-request-");

        final int writePoolSize = RaftServerConfigKeys.DataStream.asyncWriteThreadPoolSize(conf);
        this.writeExecutor = ConcurrentUtils.newThreadPoolWithMax(cached, writePoolSize, this.name + "-write-");

        this.nettyServerStreamRpcMetrics = metrics;
    }

    /**
     * Registers a state-machine stream for the request in the data stream map of the request's division.
     *
     * <p>The stream is created lazily through a memoized supplier, so that after the map lookup
     * it is possible to tell whether this call created the entry or found an existing one.
     *
     * @param request the client request that opens the stream
     * @return the future of the newly created state-machine stream
     * @throws AlreadyExistsException if the map already had an entry for the request's invocation id
     * @throws IOException if the division lookup fails
     */
    private CompletableFuture<StateMachine.DataStream> computeDataStreamIfAbsent(RaftClientRequest request) throws IOException {
        final RaftServer.Division division = server.getDivision(request.getRaftGroupId());
        final ClientInvocationId invocationId = ClientInvocationId.valueOf(request);

        final MemoizedSupplier<CompletableFuture<StateMachine.DataStream>> creator = JavaUtils.memoize(() -> {
            // Time the state machine's stream() call; success is reported when it completes without an exception.
            final NettyServerStreamRpcMetrics.RequestMetrics timer =
                getMetrics().newRequestMetrics(NettyServerStreamRpcMetrics.RequestType.STATE_MACHINE_STREAM);
            final NettyServerStreamRpcMetrics.RequestContext started = timer.start();
            return division.getStateMachine().data().stream(request)
                .whenComplete((result, error) -> timer.stop(started, error == null));
        });

        final CompletableFuture<StateMachine.DataStream> registered =
            division.getDataStreamMap().computeIfAbsent(invocationId, id -> creator.get());
        if (creator.isInitialized()) {
            return registered;
        }
        // The supplier was never invoked, so the map already contained an entry for this id.
        throw new AlreadyExistsException("A DataStream already exists for " + invocationId);
    }

    /**
     * Builds the {@code StreamInfo} for a stream header whose payload is a serialized client request.
     *
     * @param buf the header payload; it is parsed as a {@code RaftClientRequestProto}
     * @param getStreams function handed to the {@code StreamInfo} constructor
     * @return the new stream info
     * @throws CompletionException wrapping any {@link Throwable} raised while parsing or building
     */
    private StreamInfo newStreamInfo(ByteBuf buf, CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputRpc>, IOException> getStreams) {
        try {
            final RaftProtos.RaftClientRequestProto proto = RaftProtos.RaftClientRequestProto.parseFrom(buf.nioBuffer());
            final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(proto);
            // This server is the primary when the request is addressed to its own id.
            final boolean primary = server.getId().equals(request.getServerId());
            final CompletableFuture<StateMachine.DataStream> localStream = computeDataStreamIfAbsent(request);
            return new StreamInfo(request, primary, localStream, server, getStreams,
                type -> getMetrics().newRequestMetrics(type));
        } catch (Throwable e) {
            throw new CompletionException(e);
        }
    }

    /**
     * Appends an asynchronous stage to the future held by the reference and stores the new tail.
     *
     * @param future reference holding the current tail of the chain; it is replaced by the returned future
     * @param executor executor on which {@code function} runs
     * @param function stage to run with the result of the current tail
     * @param <T> the value type of the chain
     * @return the new tail, i.e. the value now held by {@code future}
     */
    static <T> CompletableFuture<T> composeAsync(AtomicReference<CompletableFuture<T>> future, Executor executor, Function<T, CompletableFuture<T>> function) {
        return future.updateAndGet(tail -> {
            // Chain onto whatever is currently last, so stages run in the order they were appended.
            return tail.thenComposeAsync(function, executor);
        });
    }

    /**
     * Runs {@link #writeTo(ByteBuf, Iterable, StateMachine.DataStream)} asynchronously.
     *
     * @param buf the data to write
     * @param options the write options of the request
     * @param stream the destination stream
     * @param defaultExecutor executor used when {@code stream.getExecutor()} returns {@code null}
     * @return a future of the number of bytes written
     */
    static CompletableFuture<Long> writeToAsync(ByteBuf buf, Iterable<WriteOption> options, StateMachine.DataStream stream, Executor defaultExecutor) {
        // Prefer the executor supplied by the stream itself; fall back to the caller's default.
        final Executor streamExecutor = stream.getExecutor();
        final Executor chosen = streamExecutor != null ? streamExecutor : defaultExecutor;
        return CompletableFuture.supplyAsync(() -> writeTo(buf, options, stream), chosen);
    }

    /**
     * Writes every NIO buffer of {@code buf} to the stream's data channel, then applies the
     * {@code SYNC} and {@code CLOSE} write options if present.
     *
     * @param buf the data to write
     * @param options the write options of the request
     * @param stream the destination stream
     * @return the sum of the values returned by the channel's {@code write} calls
     * @throws CompletionException wrapping any failure of a write, of {@code force}, or of the close
     */
    static long writeTo(ByteBuf buf, Iterable<WriteOption> options, StateMachine.DataStream stream) {
        final StateMachine.DataChannel out = stream.getDataChannel();

        long total = 0L;
        for (ByteBuffer piece : buf.nioBuffers()) {
            // The wrapper's retain/release callbacks delegate to the reference count of the whole ByteBuf.
            final ReferenceCountedObject<ByteBuffer> counted =
                ReferenceCountedObject.wrap(piece, () -> buf.retain(), () -> buf.release());
            try {
                total += out.write(counted);
            } catch (Throwable t) {
                throw new CompletionException(t);
            }
        }

        if (WriteOption.containsOption(options, StandardWriteOption.SYNC)) {
            try {
                out.force(false);
            } catch (IOException e) {
                throw new CompletionException(e);
            }
        }

        if (WriteOption.containsOption(options, StandardWriteOption.CLOSE)) {
            close(stream);
        }
        return total;
    }

    /**
     * Closes the data channel of the given stream.
     *
     * @param stream the stream whose channel is closed
     * @throws CompletionException if closing the channel fails with an {@link IOException}
     */
    static void close(StateMachine.DataStream stream) {
        try {
            final StateMachine.DataChannel channel = stream.getDataChannel();
            channel.close();
        } catch (IOException e) {
            throw new CompletionException("Failed to close " + stream, e);
        }
    }

    /**
     * Builds a stream reply for {@code request} whose payload is the serialized {@code reply}.
     *
     * @param request the request being answered; used as the reply's packet header source
     * @param reply the client reply to serialize into the payload; its success flag is copied
     * @param bytesWritten the byte count to report
     * @param commitInfos the commit infos to attach
     * @return the assembled reply
     */
    static DataStreamReplyByteBuffer newDataStreamReplyByteBuffer(DataStreamRequestByteBuf request, RaftClientReply reply, long bytesWritten, Collection<RaftProtos.CommitInfoProto> commitInfos) {
        final ByteBuffer payload = ClientProtoUtils.toRaftClientReplyProto(reply)
            .toByteString()
            .asReadOnlyByteBuffer();
        return DataStreamReplyByteBuffer.newBuilder()
            .setDataStreamPacket(request)
            .setBuffer(payload)
            .setSuccess(reply.isSuccess())
            .setBytesWritten(bytesWritten)
            .setCommitInfos(commitInfos)
            .build();
    }

    /**
     * Sends the reply for a request after checking the results of the remote writes.
     *
     * @param remoteWrites the futures of the remote writes; they are joined by the success check
     * @param request the request being answered
     * @param bytesWritten the number of bytes written locally; set in the reply only on success
     * @param commitInfos the commit infos to attach
     * @param ctx the channel context used to write and flush the reply
     */
    static void sendReply(List<CompletableFuture<DataStreamReply>> remoteWrites, DataStreamRequestByteBuf request, long bytesWritten, Collection<RaftProtos.CommitInfoProto> commitInfos, ChannelHandlerContext ctx) {
        final boolean allRemotesOk = checkSuccessRemoteWrite(remoteWrites, bytesWritten, request);
        final DataStreamReplyByteBuffer.Builder reply = DataStreamReplyByteBuffer.newBuilder()
            .setDataStreamPacket(request)
            .setSuccess(allRemotesOk)
            .setCommitInfos(commitInfos);
        if (allRemotesOk) {
            // The byte count is set only for a successful reply.
            reply.setBytesWritten(bytesWritten);
        }
        ctx.writeAndFlush(reply.build());
    }

    /**
     * Forwards the stream's client request through the division's Raft client and replies to the
     * stream request once the forward completes.
     *
     * <p>The call is timed with a {@code START_TRANSACTION} metrics context. The context is stopped
     * in the completion callback, and also when the division lookup fails synchronously, so that a
     * started context is never left open.
     *
     * @param info the stream whose request is forwarded
     * @param request the stream request being answered
     * @param bytesWritten the byte count reported in a successful reply
     * @param ctx the channel context used to send the reply
     * @return the future of the forwarded request, after the reply callback has been attached
     * @throws CompletionException wrapping an {@link IOException} from the division lookup
     */
    private CompletableFuture<RaftClientReply> startTransaction(StreamInfo info, DataStreamRequestByteBuf request, long bytesWritten, ChannelHandlerContext ctx) {
        NettyServerStreamRpcMetrics.RequestMetrics metrics = this.getMetrics().newRequestMetrics(NettyServerStreamRpcMetrics.RequestType.START_TRANSACTION);
        NettyServerStreamRpcMetrics.RequestContext context = metrics.start();
        try {
            AsyncRpcApi asyncRpcApi = (AsyncRpcApi)this.server.getDivision(info.getRequest().getRaftGroupId()).getRaftClient().async();
            return asyncRpcApi.sendForward(StreamInfo.access$000((StreamInfo)info)).whenCompleteAsync((reply, e) -> {
                metrics.stop(context, e == null);
                if (e != null) {
                    replyDataStreamException(this.server, e, info.getRequest(), request, ctx);
                } else {
                    ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply, bytesWritten, info.getCommitInfos()));
                }
            }, this.requestExecutor);
        } catch (IOException e2) {
            // The callback above was never registered on this path, so stop the context here as a failure.
            metrics.stop(context, false);
            throw new CompletionException(e2);
        }
    }

    /**
     * Sends a failure reply built from the given client request.
     *
     * @param server the server whose id is recorded in the {@link DataStreamException}
     * @param cause the failure being reported
     * @param raftClientRequest the client request used to populate the reply
     * @param request the stream request being answered
     * @param ctx the channel context used to send the reply
     */
    static void replyDataStreamException(RaftServer server, Throwable cause, RaftClientRequest raftClientRequest, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
        final RaftClientReply failure = RaftClientReply.newBuilder()
            .setRequest(raftClientRequest)
            .setException(new DataStreamException(server.getId(), cause))
            .build();
        sendDataStreamException(cause, request, failure, ctx);
    }

    /**
     * Sends a failure reply when no client request is available to build it from.
     *
     * <p>The reply carries an empty client id, this server's id and an empty group id.
     *
     * @param cause the failure being reported
     * @param request the stream request being answered
     * @param ctx the channel context used to send the reply
     */
    void replyDataStreamException(Throwable cause, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
        final RaftClientReply failure = RaftClientReply.newBuilder()
            .setClientId(ClientId.emptyClientId())
            .setServerId(server.getId())
            .setGroupId(RaftGroupId.emptyGroupId())
            .setException(new DataStreamException(server.getId(), cause))
            .build();
        sendDataStreamException(cause, request, failure, ctx);
    }

    /**
     * Logs a processing failure and writes the given failure reply to the channel.
     *
     * <p>A failure while building or sending the reply is logged and not rethrown.
     *
     * @param throwable the failure that occurred while processing {@code request}
     * @param request the stream request being answered
     * @param reply the failure reply to serialize and send
     * @param ctx the channel context used to send the reply
     */
    static void sendDataStreamException(Throwable throwable, DataStreamRequestByteBuf request, RaftClientReply reply, ChannelHandlerContext ctx) {
        LOG.warn("Failed to process {}", request, throwable);
        try {
            // The failure reply reports zero bytes written and passes null for the commit infos.
            final DataStreamReplyByteBuffer errorReply = newDataStreamReplyByteBuffer(request, reply, 0L, null);
            ctx.writeAndFlush(errorReply);
        } catch (Throwable t) {
            LOG.warn("Failed to sendDataStreamException {} for {}", throwable, request, t);
        }
    }

    /**
     * Entry point for an incoming stream request.
     *
     * <p>Takes a slice of the request and hands it to {@code readImpl}. If {@code readImpl} throws,
     * a failure reply is sent and the slice is released here. The release is in a {@code finally}
     * so that it still happens when building or sending the failure reply itself throws.
     *
     * @param request the incoming stream request
     * @param ctx the channel context used for replies
     * @param getStreams function passed through to {@code readImpl}
     */
    void read(DataStreamRequestByteBuf request, ChannelHandlerContext ctx, CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputRpc>, IOException> getStreams) {
        LOG.debug("{}: read {}", this, request);
        final ByteBuf buf = request.slice();
        try {
            readImpl(request, ctx, buf, getStreams);
        } catch (Throwable t) {
            try {
                replyDataStreamException(t, request, ctx);
            } finally {
                // Release even if the failure reply could not be produced; otherwise the slice leaks.
                buf.release();
            }
        }
    }

    /**
     * Processes one stream request: resolves its {@code StreamInfo}, starts the local and remote
     * writes, and chains the reply onto the stream's previous request.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Resolve the stream entry: create it for a {@code STREAM_HEADER}, otherwise look it up
     *       (removing it from the map when the request carries the {@code CLOSE} option).</li>
     *   <li>Start the writes: a header has nothing to write; a {@code STREAM_DATA} request writes
     *       locally and to the remotes.</li>
     *   <li>Append a stage to {@code info.getPrevious()} that waits for all writes and then replies.</li>
     *   <li>When that stage completes, report any exception and release {@code buf}.</li>
     * </ol>
     *
     * @param request the incoming stream request
     * @param ctx the channel context used for replies
     * @param buf the slice of the request payload; released in the completion callback below
     * @param getStreams function passed to {@code newStreamInfo} when a header creates a new stream
     * @throws IllegalStateException if the stream entry cannot be created or found, or if the
     *         request type is neither {@code STREAM_HEADER} nor {@code STREAM_DATA}
     */
    private void readImpl(DataStreamRequestByteBuf request, ChannelHandlerContext ctx, ByteBuf buf, CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputRpc>, IOException> getStreams) {
        List remoteWrites;
        CompletableFuture localWrite;
        StreamInfo info;
        boolean close = request.getWriteOptionList().contains(StandardWriteOption.CLOSE);
        // Streams are keyed by (client id, stream id).
        ClientInvocationId key = ClientInvocationId.valueOf((ClientId)request.getClientId(), (long)request.getStreamId());
        if (request.getType() == RaftProtos.DataStreamPacketHeaderProto.Type.STREAM_HEADER) {
            // The memoized supplier records whether computeIfAbsent actually invoked it.
            MemoizedSupplier supplier = JavaUtils.memoize(() -> this.newStreamInfo(buf, getStreams));
            info = this.streams.computeIfAbsent(key, id -> (StreamInfo)supplier.get());
            if (!supplier.isInitialized()) {
                // Supplier not invoked: an entry already existed for this key. The existing entry is removed before failing.
                this.streams.remove(key);
                throw new IllegalStateException("Failed to create a new stream for " + request + " since a stream already exists Key: " + key + " StreamInfo:" + info);
            }
            this.getMetrics().onRequestCreate(NettyServerStreamRpcMetrics.RequestType.HEADER);
        } else {
            // Non-header request: a CLOSE request takes the entry out of the map, any other request only reads it.
            // A missing entry is an IllegalStateException in both cases.
            info = close ? Optional.ofNullable(this.streams.remove(key)).orElseThrow(() -> new IllegalStateException("Failed to remove StreamInfo for " + request)) : Optional.ofNullable(this.streams.get(key)).orElseThrow(() -> {
                this.streams.remove(key);
                return new IllegalStateException("Failed to get StreamInfo for " + request);
            });
        }
        if (request.getType() == RaftProtos.DataStreamPacketHeaderProto.Type.STREAM_HEADER) {
            // A header carries no data to write: zero bytes locally and no remote writes.
            localWrite = CompletableFuture.completedFuture(0L);
            remoteWrites = Collections.emptyList();
        } else if (request.getType() == RaftProtos.DataStreamPacketHeaderProto.Type.STREAM_DATA) {
            localWrite = info.getLocal().write(buf, (Iterable)request.getWriteOptionList(), this.writeExecutor);
            remoteWrites = info.applyToRemotes(out -> out.write(request, this.requestExecutor));
        } else {
            throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
        }
        // Append to the stream's chain: the stage below runs after the previous request's stage,
        // waits for every remote write and the local write, and then sends the reply.
        DataStreamManagement.composeAsync((AtomicReference)info.getPrevious(), (Executor)this.requestExecutor, (T n) -> JavaUtils.allOf((Collection)remoteWrites).thenCombineAsync((CompletionStage)localWrite, (v, bytesWritten) -> {
            if (request.getType() == RaftProtos.DataStreamPacketHeaderProto.Type.STREAM_HEADER || request.getType() == RaftProtos.DataStreamPacketHeaderProto.Type.STREAM_DATA && !close) {
                DataStreamManagement.sendReply((List)remoteWrites, (DataStreamRequestByteBuf)request, (long)bytesWritten, (Collection)info.getCommitInfos(), (ChannelHandlerContext)ctx);
            } else if (close) {
                if (info.isPrimary()) {
                    // On the primary, a CLOSE is answered by startTransaction's callback.
                    // The future it returns is not used here.
                    this.startTransaction(info, request, bytesWritten.longValue(), ctx);
                } else {
                    DataStreamManagement.sendReply((List)remoteWrites, (DataStreamRequestByteBuf)request, (long)bytesWritten, (Collection)info.getCommitInfos(), (ChannelHandlerContext)ctx);
                }
            } else {
                // NOTE(review): appears unreachable, since other request types already threw above — confirm.
                throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
            }
            return null;
        }, this.requestExecutor)).whenComplete((v, exception) -> {
            try {
                if (exception != null) {
                    // On failure, drop the stream entry and report the failure.
                    this.streams.remove(key);
                    DataStreamManagement.replyDataStreamException((RaftServer)this.server, (Throwable)exception, (RaftClientRequest)info.getRequest(), (DataStreamRequestByteBuf)request, (ChannelHandlerContext)ctx);
                }
            }
            finally {
                // The slice is released here whether the stage succeeded or failed.
                buf.release();
            }
        });
    }

    /**
     * Asserts that a reply belongs to the given request.
     *
     * <p>The client id, packet type, stream id and stream offset must all match; they are
     * checked in that order and the first mismatch fails the assertion.
     *
     * @param request the request that was sent
     * @param reply the reply that was received
     */
    static void assertReplyCorrespondingToRequest(DataStreamRequestByteBuf request, DataStreamReply reply) {
        Preconditions.assertTrue(request.getClientId().equals(reply.getClientId()));
        Preconditions.assertTrue(request.getType() == reply.getType());
        Preconditions.assertTrue(request.getStreamId() == reply.getStreamId());
        Preconditions.assertTrue(request.getStreamOffset() == reply.getStreamOffset());
    }

    /**
     * Checks that every remote write succeeded and wrote the same number of bytes as the local write.
     *
     * <p>Each future is joined in turn; checking stops at the first reply that is unsuccessful or
     * reports a different byte count.
     *
     * @param replyFutures the futures of the remote writes
     * @param bytesWritten the number of bytes written locally
     * @param request the request the replies must correspond to
     * @return {@code true} if all replies are successful and report {@code bytesWritten} bytes
     */
    static boolean checkSuccessRemoteWrite(List<CompletableFuture<DataStreamReply>> replyFutures, long bytesWritten, DataStreamRequestByteBuf request) {
        for (CompletableFuture<DataStreamReply> pending : replyFutures) {
            final DataStreamReply reply = pending.join();
            assertReplyCorrespondingToRequest(request, reply);
            if (!reply.isSuccess()) {
                LOG.warn("reply is not success, request: {}", request);
                return false;
            }
            if (reply.getBytesWritten() != bytesWritten) {
                LOG.warn("reply written bytes not match, local size: {} remote size: {} request: {}", bytesWritten, reply.getBytesWritten(), request);
                return false;
            }
        }
        return true;
    }

    /**
     * @return the metrics object passed to the constructor
     */
    NettyServerStreamRpcMetrics getMetrics() {
        return nettyServerStreamRpcMetrics;
    }

    /**
     * @return the name of this instance, built in the constructor from the server id and the simple class name
     */
    @Override
    public String toString() {
        return this.name;
    }
}

