/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis.netty.server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ratis.client.DataStreamClient;
import org.apache.ratis.client.DataStreamOutputRpc;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.io.CloseAsync;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.netty.NettyDataStreamUtils;
import org.apache.ratis.netty.server.DataStreamManagement;
import org.apache.ratis.netty.server.DataStreamRequestByteBuf;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.thirdparty.io.netty.bootstrap.ServerBootstrap;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandler;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelOption;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
import org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
import org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.ratis.thirdparty.io.netty.handler.logging.LogLevel;
import org.apache.ratis.thirdparty.io.netty.handler.logging.LoggingHandler;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.PeerProxyMap;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.function.CheckedBiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A Netty based {@link DataStreamServerRpc}.
 *
 * <p>The constructor binds a server channel to the configured data stream port and
 * installs a pipeline (decoder, encoder, request handler) on every accepted child channel.
 * Each decoded {@link DataStreamRequestByteBuf} is passed to {@link DataStreamManagement}
 * together with one of the pooled {@link Proxies}, which is used to open the streams
 * to the other peers.
 */
public class NettyServerStreamRpc implements DataStreamServerRpc {
    public static final Logger LOG = LoggerFactory.getLogger(NettyServerStreamRpc.class);

    /** The server id followed by the simple class name; also returned by {@link #toString()}. */
    private final String name;
    private final EventLoopGroup bossGroup = new NioEventLoopGroup();
    private final EventLoopGroup workerGroup = new NioEventLoopGroup();
    /** The future of the bind operation started in the constructor. */
    private final ChannelFuture channelFuture;
    private final DataStreamManagement requests;
    /** The pool of peer proxies; an entry is selected per request by client id and stream id. */
    private final List<Proxies> proxies = new ArrayList<>();

    /**
     * Creates the rpc for the given server and starts binding the server channel.
     * The bind is asynchronous; {@link #start()} waits for it to complete.
     *
     * @param server the server providing the id and the properties
     */
    public NettyServerStreamRpc(RaftServer server) {
        this.name = server.getId() + "-" + JavaUtils.getClassSimpleName(this.getClass());
        this.requests = new DataStreamManagement(server);

        final RaftProperties properties = server.getProperties();
        final int clientPoolSize = RaftServerConfigKeys.DataStream.clientPoolSize(properties);
        for (int i = 0; i < clientPoolSize; i++) {
            this.proxies.add(new Proxies(new PeerProxyMap<>(this.name, peer -> newClient(peer, properties))));
        }

        final int port = NettyConfigKeys.DataStream.port(properties);
        this.channelFuture = new ServerBootstrap()
            .group(this.bossGroup, this.workerGroup)
            .channel(NioServerSocketChannel.class)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(getInitializer())
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .bind(port);
    }

    /**
     * Builds a {@link DataStreamClient} with a random client id for the given peer.
     *
     * @param peer the data stream server to connect to
     * @param properties the properties passed to the client builder
     * @return the new client
     */
    static DataStreamClient newClient(RaftPeer peer, RaftProperties properties) {
        return DataStreamClient.newBuilder()
            .setClientId(ClientId.randomId())
            .setDataStreamServer(peer)
            .setProperties(properties)
            .build();
    }

    /**
     * Adds the given peers to every proxy map in the pool.
     *
     * @param newPeers the peers to add
     */
    public void addRaftPeers(Collection<RaftPeer> newPeers) {
        for (Proxies p : this.proxies) {
            p.addPeers(newPeers);
        }
    }

    /**
     * Maps a (client id hash, stream id) pair to an index in {@code [0, poolSize)}.
     *
     * <p>The hash is treated as an unsigned 32-bit value. {@link Math#floorMod(long, long)}
     * is used instead of {@code %} so that a negative stream id, or an overflowing sum,
     * cannot yield a negative index; for non-negative sums both operators agree.
     *
     * @param clientIdHash the hash code of the client id
     * @param streamId the stream id of the request
     * @param poolSize the number of pooled proxies; must be positive
     * @return the selected index
     */
    private static int selectProxyIndex(int clientIdHash, long streamId, int poolSize) {
        final long key = (0xFFFFFFFFL & clientIdHash) + streamId;
        return Math.toIntExact(Math.floorMod(key, (long) poolSize));
    }

    /**
     * @return a new handler that dispatches each decoded request to {@link DataStreamManagement}
     *         and replies with an exception if a failure is caught while a request is in flight.
     */
    private ChannelInboundHandler newChannelInboundHandlerAdapter() {
        return new ChannelInboundHandlerAdapter() {
            /** The request currently inside {@code channelRead}, if any. */
            private final RequestRef requestRef = new RequestRef();

            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                if (!(msg instanceof DataStreamRequestByteBuf)) {
                    LOG.error("Unexpected message class {}, ignoring ...", msg.getClass().getName());
                    return;
                }

                final DataStreamRequestByteBuf request = this.requestRef.set((DataStreamRequestByteBuf) msg);
                final int index = selectProxyIndex(
                    request.getClientId().hashCode(), request.getStreamId(), NettyServerStreamRpc.this.proxies.size());
                final Proxies selected = NettyServerStreamRpc.this.proxies.get(index);
                NettyServerStreamRpc.this.requests.read(request, ctx, selected::getDataStreamOutput);
                // Only reached when read(..) returns normally; otherwise exceptionCaught clears the reference.
                this.requestRef.reset(request);
            }

            public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) {
                Optional.ofNullable(this.requestRef.getAndSetNull())
                    .ifPresent(request -> NettyServerStreamRpc.this.requests.replyDataStreamException(throwable, request, ctx));
            }
        };
    }

    /** @return the initializer installing decoder, encoder and request handler, in this order. */
    private ChannelInitializer<SocketChannel> getInitializer() {
        return new ChannelInitializer<SocketChannel>() {
            public void initChannel(SocketChannel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(newDecoder());
                pipeline.addLast(newEncoder());
                pipeline.addLast(newChannelInboundHandlerAdapter());
            }
        };
    }

    /**
     * @return a decoder using the composite cumulator which emits a request whenever
     *         {@link NettyDataStreamUtils#decodeDataStreamRequestByteBuf} returns non-null.
     */
    static ByteToMessageDecoder newDecoder() {
        return new ByteToMessageDecoder() {
            {
                this.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
            }

            protected void decode(ChannelHandlerContext context, ByteBuf buf, List<Object> out) {
                Optional.ofNullable(NettyDataStreamUtils.decodeDataStreamRequestByteBuf(buf)).ifPresent(out::add);
            }
        };
    }

    /** @return an encoder delegating to {@link NettyDataStreamUtils#encodeDataStreamReplyByteBuffer}. */
    static MessageToMessageEncoder<DataStreamReplyByteBuffer> newEncoder() {
        return new MessageToMessageEncoder<DataStreamReplyByteBuffer>() {
            protected void encode(ChannelHandlerContext context, DataStreamReplyByteBuffer reply, List<Object> out) {
                NettyDataStreamUtils.encodeDataStreamReplyByteBuffer(reply, out::add, context.alloc());
            }
        };
    }

    /** Waits, uninterruptibly, for the bind started in the constructor to complete. */
    public void start() {
        this.channelFuture.syncUninterruptibly();
    }

    /**
     * Waits, uninterruptibly, for the bind to finish and then returns the local address of the channel.
     *
     * @return the local address of the server channel
     */
    public InetSocketAddress getInetSocketAddress() {
        this.channelFuture.awaitUninterruptibly();
        return (InetSocketAddress) this.channelFuture.channel().localAddress();
    }

    /**
     * Closes the server channel, shuts down both event loop groups and closes all proxies.
     *
     * <p>An interrupt does not abort the cleanup: the event loop groups are always asked to
     * shut down and the proxies are always closed. The interrupt status of the calling thread
     * is restored before returning.
     */
    public void close() {
        boolean interrupted = false;
        try {
            this.channelFuture.channel().close().sync();
        } catch (InterruptedException e) {
            interrupted = true;
            LOG.error(this + ": Interrupted close()", e);
        }

        // Request the shutdown unconditionally so that the event loop threads
        // are not left running when closing the channel was interrupted.
        this.bossGroup.shutdownGracefully(0L, 100L, TimeUnit.MILLISECONDS);
        this.workerGroup.shutdownGracefully(0L, 100L, TimeUnit.MILLISECONDS);
        if (!interrupted) {
            try {
                this.bossGroup.awaitTermination(1000L, TimeUnit.MILLISECONDS);
                this.workerGroup.awaitTermination(1000L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                interrupted = true;
                LOG.error(this + ": Interrupted close()", e);
            }
        }

        for (Proxies p : this.proxies) {
            p.close();
        }

        // Restore the interrupt status last so that it does not disturb closing the proxies.
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public String toString() {
        return this.name;
    }

    /** Holds at most one in-flight request and checks that it is set and cleared consistently. */
    static class RequestRef {
        private final AtomicReference<DataStreamRequestByteBuf> ref = new AtomicReference<>();

        RequestRef() {
        }

        /**
         * Stores the given request.
         *
         * @param current the request to store
         * @return the given request
         * @throws IllegalStateException if another request was still stored;
         *         the reference holds {@code current} in that case.
         */
        DataStreamRequestByteBuf set(DataStreamRequestByteBuf current) {
            final DataStreamRequestByteBuf previous = this.ref.getAndSet(current);
            if (previous != null) {
                throw new IllegalStateException("previous = " + previous + " != null, current=" + current);
            }
            return current;
        }

        /**
         * Clears the reference and asserts that the stored request was the expected one.
         *
         * @param expected the request that must have been stored (compared by identity)
         */
        void reset(DataStreamRequestByteBuf expected) {
            final DataStreamRequestByteBuf stored = this.ref.getAndSet(null);
            Preconditions.assertTrue(stored == expected, () -> "Expected=" + expected + " but stored=" + stored);
        }

        /** @return the stored request, possibly null, after clearing the reference. */
        DataStreamRequestByteBuf getAndSetNull() {
            return this.ref.getAndSet(null);
        }
    }

    /** A {@link PeerProxyMap} of {@link DataStreamClient}s used to open streams to peers. */
    static class Proxies {
        private final PeerProxyMap<DataStreamClient> map;

        Proxies(PeerProxyMap<DataStreamClient> map) {
            this.map = map;
        }

        /** Adds the given peers to the underlying map. */
        void addPeers(Collection<RaftPeer> newPeers) {
            this.map.addRaftPeers(newPeers);
        }

        /**
         * Opens one stream per peer for the given request.
         *
         * @param request the request to stream
         * @param peers the peers to open streams to
         * @return the opened streams
         * @throws IOException if a stream cannot be opened;
         *         the streams opened so far are closed asynchronously before rethrowing.
         */
        Set<DataStreamOutputRpc> getDataStreamOutput(RaftClientRequest request, Set<RaftPeer> peers) throws IOException {
            final Set<DataStreamOutputRpc> outs = new HashSet<>();
            try {
                this.getDataStreamOutput(request, peers, outs);
            } catch (IOException e) {
                outs.forEach(CloseAsync::closeAsync);
                throw e;
            }
            return outs;
        }

        /**
         * Opens the streams and adds them to {@code outs}, so that the caller can
         * still see the already opened streams when a later peer fails.
         */
        private void getDataStreamOutput(RaftClientRequest request, Set<RaftPeer> peers, Set<DataStreamOutputRpc> outs)
                throws IOException {
            for (RaftPeer peer : peers) {
                try {
                    outs.add((DataStreamOutputRpc) this.map.getProxy(peer.getId()).stream(request));
                } catch (IOException e) {
                    throw new IOException(this.map.getName() + ": Failed to getDataStreamOutput for " + peer, e);
                }
            }
        }

        /** Closes the underlying map. */
        void close() {
            this.map.close();
        }
    }
}

