/*
 * Decompiled with CFR 0.152.
 */
package io.vlingo.wire.fdx.bidirectional.netty.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.EmptyByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.GenericFutureListener;
import io.vlingo.common.pool.ElasticResourcePool;
import io.vlingo.wire.channel.RequestChannelConsumer;
import io.vlingo.wire.channel.RequestChannelConsumerProvider;
import io.vlingo.wire.channel.RequestResponseContext;
import io.vlingo.wire.channel.ResponseSenderChannel;
import io.vlingo.wire.fdx.bidirectional.netty.server.NettyServerChannelContext;
import io.vlingo.wire.message.BasicConsumerByteBuffer;
import io.vlingo.wire.message.ConsumerByteBuffer;
import io.vlingo.wire.message.ConsumerByteBufferPool;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class NettyInboundHandler
extends ChannelInboundHandlerAdapter
implements ResponseSenderChannel {
    private static final Logger logger = LoggerFactory.getLogger(NettyInboundHandler.class);
    private static final String WIRE_CONTEXT_NAME = "$WIRE_CONTEXT";
    private static final AttributeKey<NettyServerChannelContext> WIRE_CONTEXT = AttributeKey.exists((String)"$WIRE_CONTEXT") ? AttributeKey.valueOf((String)"$WIRE_CONTEXT") : AttributeKey.newInstance((String)"$WIRE_CONTEXT");
    private final RequestChannelConsumer consumer;
    private String contextInstanceId;
    private final ConsumerByteBufferPool readBufferPool;
    private static final AtomicLong nextInstanceId = new AtomicLong(0L);
    private final long instanceId;

    NettyInboundHandler(RequestChannelConsumerProvider consumerProvider, int maxBufferPoolSize, int maxMessageSize) {
        this.consumer = consumerProvider.requestChannelConsumer();
        this.readBufferPool = new ConsumerByteBufferPool(ElasticResourcePool.Config.of((int)maxBufferPoolSize), maxMessageSize);
        this.instanceId = nextInstanceId.incrementAndGet();
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.debug(">>>>> NettyInboundHandler::channelActive(): " + this.instanceId + " NAME: " + this.contextInstanceId(ctx));
        if (ctx.channel().isActive()) {
            this.getWireContext(ctx);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void channelRead(ChannelHandlerContext context, Object msg) {
        if (msg == null || msg == Unpooled.EMPTY_BUFFER || msg instanceof EmptyByteBuf) {
            return;
        }
        logger.debug(">>>>> NettyInboundHandler::channelRead(): " + this.instanceId + " NAME: " + this.contextInstanceId(context));
        try {
            NettyServerChannelContext channelContext = this.getWireContext(context);
            ConsumerByteBuffer pooledBuffer = this.readBufferPool.acquire("NettyClientHandler#channelRead");
            try {
                ByteBuf byteBuf = (ByteBuf)msg;
                byte[] bytes = new byte[byteBuf.readableBytes()];
                byteBuf.readBytes(bytes);
                pooledBuffer.put(bytes);
                this.consumer.consume(channelContext, pooledBuffer.flip());
            }
            catch (Throwable t) {
                pooledBuffer.release();
                throw t;
            }
        }
        catch (Throwable throwable) {
            logger.error("Error reading the incoming data.", throwable);
            context.close();
        }
        finally {
            ReferenceCountUtil.release((Object)msg);
        }
    }

    public void channelUnregistered(ChannelHandlerContext context) throws Exception {
        logger.debug(">>>>> NettyInboundHandler::channelUnregistered(): " + this.instanceId + " NAME: " + this.contextInstanceId(context));
        super.channelUnregistered(context);
    }

    public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
        logger.error("Unexpected exception", cause);
        super.exceptionCaught(context, cause);
    }

    @Override
    public void abandon(RequestResponseContext<?> context) {
        ChannelHandlerContext nettyChannelContext = ((NettyServerChannelContext)context).getNettyChannelContext();
        logger.debug(">>>>> NettyInboundHandler::abandon(): " + this.instanceId + " NAME: " + this.contextInstanceId(nettyChannelContext));
        nettyChannelContext.close();
    }

    @Override
    public void respondWith(RequestResponseContext<?> context, ConsumerByteBuffer buffer) {
        this.respondWith(context, buffer, false);
    }

    @Override
    public void respondWith(RequestResponseContext<?> context, ConsumerByteBuffer buffer, boolean closeFollowing) {
        NettyServerChannelContext nettyServerChannelContext = (NettyServerChannelContext)context;
        ChannelHandlerContext channelHandlerContext = nettyServerChannelContext.getNettyChannelContext();
        String contextInstanceId = this.contextInstanceId(channelHandlerContext);
        logger.debug(">>>>> NettyInboundHandler::respondWith(): " + this.instanceId + " NAME: " + contextInstanceId + " : CLOSE? " + closeFollowing);
        ByteBuf replyBuffer = channelHandlerContext.alloc().buffer(buffer.limit());
        replyBuffer.writeBytes(buffer.asByteBuffer());
        channelHandlerContext.writeAndFlush((Object)replyBuffer).addListener((GenericFutureListener)((ChannelFutureListener)future -> {
            if (future.isSuccess()) {
                logger.debug("Reply sent");
            } else {
                logger.error("Failed to send reply", future.cause());
            }
            if (closeFollowing) {
                this.closeConnection(contextInstanceId, (ChannelFuture)future);
            }
        }));
    }

    @Override
    public void respondWith(RequestResponseContext<?> context, Object response, boolean closeFollowing) {
        String textResponse = response.toString();
        ConsumerByteBuffer buffer = new BasicConsumerByteBuffer(0, textResponse.length() + 1024).put(textResponse.getBytes()).flip();
        this.respondWith(context, buffer, closeFollowing);
    }

    private void closeConnection(String contextInstanceId, ChannelFuture channelFuture) {
        channelFuture.channel().close().addListener(closeFuture -> {
            if (closeFuture.isSuccess()) {
                logger.debug(">>>>> NettyInboundHandler::respondWith(): " + this.instanceId + " NAME: " + contextInstanceId + " : CLOSED");
            } else {
                logger.error(">>>>> NettyInboundHandler::respondWith(): " + this.instanceId + " NAME: " + contextInstanceId + " : FAILED TO CLOSE");
            }
        });
    }

    private NettyServerChannelContext getWireContext(ChannelHandlerContext ctx) {
        Channel nettyChannel = ctx.channel();
        if (!nettyChannel.hasAttr(WIRE_CONTEXT)) {
            nettyChannel.attr(WIRE_CONTEXT).set((Object)new NettyServerChannelContext(ctx, this));
        }
        return (NettyServerChannelContext)nettyChannel.attr(WIRE_CONTEXT).get();
    }

    private String contextInstanceId(ChannelHandlerContext context) {
        if (this.contextInstanceId == null) {
            this.contextInstanceId = context.name() + ":" + this.instanceId;
        }
        return this.contextInstanceId;
    }
}

