/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis.netty.client;

import java.net.SocketAddress;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;
import org.apache.ratis.client.DataStreamClientRpc;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
import org.apache.ratis.netty.NettyDataStreamUtils;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.Channel;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandler;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelOption;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
import org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
import org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyClientStreamRpc
implements DataStreamClientRpc {
    public static final Logger LOG = LoggerFactory.getLogger(NettyClientStreamRpc.class);
    private final String name;
    private final EventLoopGroup workerGroup = new NioEventLoopGroup();
    private final Supplier<Channel> channel;
    private final ConcurrentMap<ClientInvocationId, Queue<CompletableFuture<DataStreamReply>>> replies = new ConcurrentHashMap<ClientInvocationId, Queue<CompletableFuture<DataStreamReply>>>();

    public NettyClientStreamRpc(RaftPeer server, RaftProperties properties) {
        this.name = JavaUtils.getClassSimpleName(this.getClass()) + "->" + server;
        ChannelFuture f = ((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)new Bootstrap().group(this.workerGroup)).channel(NioSocketChannel.class)).handler(this.getInitializer())).option(ChannelOption.SO_KEEPALIVE, (Object)true)).connect((SocketAddress)NetUtils.createSocketAddr((String)server.getDataStreamAddress()));
        this.channel = JavaUtils.memoize(() -> f.syncUninterruptibly().channel());
    }

    private Channel getChannel() {
        return this.channel.get();
    }

    private ChannelInboundHandler getClientHandler() {
        return new ChannelInboundHandlerAdapter(){

            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                if (!(msg instanceof DataStreamReply)) {
                    LOG.error("{}: unexpected message {}", (Object)this, msg.getClass());
                    return;
                }
                DataStreamReply reply = (DataStreamReply)msg;
                LOG.debug("{}: read {}", (Object)this, (Object)reply);
                ClientInvocationId clientInvocationId = ClientInvocationId.valueOf((ClientId)reply.getClientId(), (long)reply.getStreamId());
                Optional.ofNullable(NettyClientStreamRpc.this.replies.get(clientInvocationId)).map(Queue::poll).ifPresent(f -> f.complete(reply));
            }
        };
    }

    private ChannelInitializer<SocketChannel> getInitializer() {
        return new ChannelInitializer<SocketChannel>(){

            public void initChannel(SocketChannel ch) {
                ChannelPipeline p = ch.pipeline();
                p.addLast(new ChannelHandler[]{NettyClientStreamRpc.this.newEncoder()});
                p.addLast(new ChannelHandler[]{NettyClientStreamRpc.this.newEncoderDataStreamRequestFilePositionCount()});
                p.addLast(new ChannelHandler[]{NettyClientStreamRpc.this.newDecoder()});
                p.addLast(new ChannelHandler[]{NettyClientStreamRpc.this.getClientHandler()});
            }
        };
    }

    MessageToMessageEncoder<DataStreamRequestByteBuffer> newEncoder() {
        return new MessageToMessageEncoder<DataStreamRequestByteBuffer>(){

            protected void encode(ChannelHandlerContext context, DataStreamRequestByteBuffer request, List<Object> out) {
                NettyDataStreamUtils.encodeDataStreamRequestByteBuffer(request, out::add, context.alloc());
            }
        };
    }

    MessageToMessageEncoder<DataStreamRequestFilePositionCount> newEncoderDataStreamRequestFilePositionCount() {
        return new MessageToMessageEncoder<DataStreamRequestFilePositionCount>(){

            protected void encode(ChannelHandlerContext ctx, DataStreamRequestFilePositionCount request, List<Object> out) {
                NettyDataStreamUtils.encodeDataStreamRequestFilePositionCount(request, out::add, ctx.alloc());
            }
        };
    }

    ByteToMessageDecoder newDecoder() {
        return new ByteToMessageDecoder(){
            {
                this.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
            }

            protected void decode(ChannelHandlerContext context, ByteBuf buf, List<Object> out) {
                Optional.ofNullable(NettyDataStreamUtils.decodeDataStreamReplyByteBuffer(buf)).ifPresent(out::add);
            }
        };
    }

    public CompletableFuture<DataStreamReply> streamAsync(DataStreamRequest request) {
        CompletableFuture<DataStreamReply> f = new CompletableFuture<DataStreamReply>();
        ClientInvocationId clientInvocationId = ClientInvocationId.valueOf((ClientId)request.getClientId(), (long)request.getStreamId());
        Queue q = this.replies.computeIfAbsent(clientInvocationId, key -> new ConcurrentLinkedQueue());
        if (!q.offer(f)) {
            f.completeExceptionally(new IllegalStateException(this + ": Failed to offer a future for " + request));
            return f;
        }
        LOG.debug("{}: write {}", (Object)this, (Object)request);
        this.getChannel().writeAndFlush((Object)request);
        return f;
    }

    public void close() {
        this.getChannel().close().syncUninterruptibly();
        this.workerGroup.shutdownGracefully();
    }

    public String toString() {
        return this.name;
    }
}

