/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.ipc.Call;
import org.apache.hadoop.hbase.ipc.CallEvent;
import org.apache.hadoop.hbase.ipc.CellBlockBuilder;
import org.apache.hadoop.hbase.ipc.IPCUtil;
import org.apache.hadoop.hbase.ipc.NettyRpcConnection;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBuf;
import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBufInputStream;
import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBufOutputStream;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelDuplexHandler;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelPromise;
import org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.PromiseCombiner;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.ipc.RemoteException;

@InterfaceAudience.Private
class NettyRpcDuplexHandler
extends ChannelDuplexHandler {
    private static final Log LOG = LogFactory.getLog(NettyRpcDuplexHandler.class);
    private final NettyRpcConnection conn;
    private final CellBlockBuilder cellBlockBuilder;
    private final Codec codec;
    private final CompressionCodec compressor;
    private final Map<Integer, Call> id2Call = new HashMap<Integer, Call>();

    public NettyRpcDuplexHandler(NettyRpcConnection conn, CellBlockBuilder cellBlockBuilder, Codec codec, CompressionCodec compressor) {
        this.conn = conn;
        this.cellBlockBuilder = cellBlockBuilder;
        this.codec = codec;
        this.compressor = compressor;
    }

    private void writeRequest(ChannelHandlerContext ctx, Call call, ChannelPromise promise) throws IOException {
        RPCProtos.CellBlockMeta cellBlockMeta;
        this.id2Call.put(call.id, call);
        ByteBuf cellBlock = this.cellBlockBuilder.buildCellBlock(this.codec, this.compressor, call.cells, ctx.alloc());
        if (cellBlock != null) {
            RPCProtos.CellBlockMeta.Builder cellBlockMetaBuilder = RPCProtos.CellBlockMeta.newBuilder();
            cellBlockMetaBuilder.setLength(cellBlock.writerIndex());
            cellBlockMeta = cellBlockMetaBuilder.build();
        } else {
            cellBlockMeta = null;
        }
        RPCProtos.RequestHeader requestHeader = IPCUtil.buildRequestHeader(call, cellBlockMeta);
        int sizeWithoutCellBlock = IPCUtil.getTotalSizeWhenWrittenDelimited(requestHeader, call.param);
        int totalSize = cellBlock != null ? sizeWithoutCellBlock + cellBlock.writerIndex() : sizeWithoutCellBlock;
        ByteBuf buf = ctx.alloc().buffer(sizeWithoutCellBlock + 4);
        buf.writeInt(totalSize);
        ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
        requestHeader.writeDelimitedTo(bbos);
        if (call.param != null) {
            call.param.writeDelimitedTo(bbos);
        }
        if (cellBlock != null) {
            ChannelPromise withoutCellBlockPromise = ctx.newPromise();
            ctx.write(buf, withoutCellBlockPromise);
            ChannelPromise cellBlockPromise = ctx.newPromise();
            ctx.write(cellBlock, cellBlockPromise);
            PromiseCombiner combiner = new PromiseCombiner();
            combiner.addAll(withoutCellBlockPromise, cellBlockPromise);
            combiner.finish(promise);
        } else {
            ctx.write(buf, promise);
        }
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (msg instanceof Call) {
            this.writeRequest(ctx, (Call)msg, promise);
        } else {
            ctx.write(msg, promise);
        }
    }

    private void readResponse(ChannelHandlerContext ctx, ByteBuf buf) throws IOException {
        CellScanner cellBlockScanner;
        Message value;
        Call call;
        RemoteException remoteExc;
        int totalSize = buf.readInt();
        ByteBufInputStream in = new ByteBufInputStream(buf);
        RPCProtos.ResponseHeader responseHeader = RPCProtos.ResponseHeader.parseDelimitedFrom(in);
        int id = responseHeader.getCallId();
        if (LOG.isTraceEnabled()) {
            LOG.trace((Object)("got response header " + TextFormat.shortDebugString(responseHeader) + ", totalSize: " + totalSize + " bytes"));
        }
        if (responseHeader.hasException()) {
            RPCProtos.ExceptionResponse exceptionResponse = responseHeader.getException();
            remoteExc = IPCUtil.createRemoteException(exceptionResponse);
            if (IPCUtil.isFatalConnectionException(exceptionResponse)) {
                this.exceptionCaught(ctx, remoteExc);
                return;
            }
        } else {
            remoteExc = null;
        }
        if ((call = this.id2Call.remove(id)) == null) {
            int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
            int whatIsLeftToRead = totalSize - readSoFar;
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Unknown callId: " + id + ", skipping over this response of " + whatIsLeftToRead + " bytes"));
            }
            return;
        }
        if (remoteExc != null) {
            call.setException(remoteExc);
            return;
        }
        if (call.responseDefaultType != null) {
            Message.Builder builder = call.responseDefaultType.newBuilderForType();
            builder.mergeDelimitedFrom(in);
            value = builder.build();
        } else {
            value = null;
        }
        if (responseHeader.hasCellBlockMeta()) {
            int size = responseHeader.getCellBlockMeta().getLength();
            byte[] cellBlock = new byte[size];
            buf.readBytes(cellBlock);
            cellBlockScanner = this.cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock);
        } else {
            cellBlockScanner = null;
        }
        call.setResponse(value, cellBlockScanner);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf)msg;
            try {
                this.readResponse(ctx, buf);
            }
            finally {
                buf.release();
            }
        } else {
            super.channelRead(ctx, msg);
        }
    }

    private void cleanupCalls(ChannelHandlerContext ctx, IOException error) {
        for (Call call : this.id2Call.values()) {
            call.setException(error);
        }
        this.id2Call.clear();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (!this.id2Call.isEmpty()) {
            this.cleanupCalls(ctx, new IOException("Connection closed"));
        }
        this.conn.shutdown();
        ctx.fireChannelInactive();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        if (!this.id2Call.isEmpty()) {
            this.cleanupCalls(ctx, IPCUtil.toIOE(cause));
        }
        this.conn.shutdown();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        block8: {
            block7: {
                if (!(evt instanceof IdleStateEvent)) break block7;
                IdleStateEvent idleEvt = (IdleStateEvent)evt;
                switch (idleEvt.state()) {
                    case WRITER_IDLE: {
                        if (this.id2Call.isEmpty()) {
                            if (LOG.isTraceEnabled()) {
                                LOG.trace((Object)("shutdown connection to " + this.conn.remoteId().address + " because idle for a long time"));
                            }
                            this.conn.shutdown();
                            break;
                        }
                        break block8;
                    }
                    default: {
                        LOG.warn((Object)("Unrecognized idle state " + (Object)((Object)idleEvt.state())));
                        break;
                    }
                }
                break block8;
            }
            if (evt instanceof CallEvent) {
                this.id2Call.remove(((CallEvent)evt).call.id);
            } else {
                ctx.fireUserEventTriggered(evt);
            }
        }
    }
}

