/*
 * Decompiled with CFR 0.152.
 */
package org.apache.avro.ipc;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.avro.Protocol;
import org.apache.avro.ipc.CallFuture;
import org.apache.avro.ipc.Callback;
import org.apache.avro.ipc.NettyTransportCodec;
import org.apache.avro.ipc.Transceiver;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link Transceiver} that exchanges Avro data packs with a remote peer over a
 * Netty client channel.
 *
 * <p>Every request is tagged with a serial number taken from {@link #serialGenerator};
 * the callback waiting for the matching response is kept in {@link #requests} until the
 * response (or an error) arrives.
 *
 * <p>The mutable connection state ({@link #channel} and {@link #remote}) is guarded by
 * {@link #stateLock}: readers take the read lock, while connecting and disconnecting
 * take the write lock.
 */
public class NettyTransceiver extends Transceiver {
    private static final Logger LOG = LoggerFactory.getLogger(NettyTransceiver.class.getName());

    /** Source of serial numbers used to pair responses with requests. */
    private final AtomicInteger serialGenerator = new AtomicInteger(0);

    /** Callbacks of requests that have been sent but not yet answered, keyed by serial. */
    private final Map<Integer, Callback<List<ByteBuffer>>> requests =
            new ConcurrentHashMap<Integer, Callback<List<ByteBuffer>>>();

    private final ChannelFactory channelFactory;
    private final ClientBootstrap bootstrap;
    private final InetSocketAddress remoteAddr;

    /** Guards {@link #channel} and {@link #remote}. */
    private final ReadWriteLock stateLock = new ReentrantReadWriteLock();

    private Channel channel;
    private Protocol remote;

    /**
     * Creates an unconnected instance with no channel factory, bootstrap or remote
     * address. Methods that need a connection cannot be used on such an instance.
     */
    NettyTransceiver() {
        this.channelFactory = null;
        this.bootstrap = null;
        this.remoteAddr = null;
    }

    /**
     * Connects to {@code addr} using a new {@link NioClientSocketChannelFactory} backed
     * by two cached thread pools.
     *
     * @param addr the address of the remote peer
     * @throws IOException if the connection cannot be established
     */
    public NettyTransceiver(InetSocketAddress addr) throws IOException {
        this(addr, new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
    }

    /**
     * Connects to {@code addr} using the supplied channel factory.
     *
     * @param addr           the address of the remote peer
     * @param channelFactory the factory used to create the client channel; it is
     *                       released by {@link #close()}
     * @throws NullPointerException if {@code channelFactory} is {@code null}
     * @throws IOException          if the connection cannot be established
     */
    public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory) throws IOException {
        if (channelFactory == null) {
            throw new NullPointerException("channelFactory is null");
        }
        this.channelFactory = channelFactory;
        this.bootstrap = new ClientBootstrap(channelFactory);
        this.remoteAddr = addr;
        this.bootstrap.setPipelineFactory(new ChannelPipelineFactory() {

            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline p = Channels.pipeline();
                p.addLast("frameDecoder", new NettyTransportCodec.NettyFrameDecoder());
                p.addLast("frameEncoder", new NettyTransportCodec.NettyFrameEncoder());
                p.addLast("handler", new NettyClientAvroHandler());
                return p;
            }
        });
        this.bootstrap.setOption("tcpNoDelay", true);

        // getChannel() expects its caller to hold the read lock.
        stateLock.readLock().lock();
        try {
            getChannel();
        } finally {
            stateLock.readLock().unlock();
        }
    }

    /**
     * Tells whether {@code channel} can currently be written to.
     *
     * @param channel the channel to test, may be {@code null}
     * @return {@code true} if the channel is non-null, open, bound and connected
     */
    private static boolean isChannelReady(Channel channel) {
        return channel != null && channel.isOpen() && channel.isBound() && channel.isConnected();
    }

    /**
     * Returns the current channel, connecting to {@link #remoteAddr} first when the
     * current channel is missing or no longer usable.
     *
     * <p><b>Locking contract:</b> the calling thread must hold exactly one hold of the
     * read lock. To connect, that hold is released, the write lock is taken, and the
     * write lock is downgraded back to a read lock before returning (also when an
     * exception is thrown), so the caller's own unlock stays balanced. A thread that
     * held the read lock more than once would block forever on the write lock.
     *
     * @return the connected channel
     * @throws IOException if connecting to the remote address fails
     */
    private Channel getChannel() throws IOException {
        if (!isChannelReady(channel)) {
            // Swap the read lock for the write lock.
            stateLock.readLock().unlock();
            stateLock.writeLock().lock();
            try {
                // Another thread may have connected while this one waited for the
                // write lock; connecting again would orphan that channel.
                if (!isChannelReady(channel)) {
                    LOG.info("Connecting to {}", remoteAddr);
                    ChannelFuture channelFuture = bootstrap.connect(remoteAddr);
                    channelFuture.awaitUninterruptibly();
                    if (!channelFuture.isSuccess()) {
                        // The cause travels with the exception; no printStackTrace().
                        throw new IOException("Error connecting to " + remoteAddr, channelFuture.getCause());
                    }
                    channel = channelFuture.getChannel();
                }
            } finally {
                // Downgrade: take the read lock before giving up the write lock.
                stateLock.readLock().lock();
                stateLock.writeLock().unlock();
            }
        }
        return channel;
    }

    /**
     * Closes the channel without waiting for completion and without cancelling the
     * pending requests.
     */
    private void disconnect() {
        disconnect(false, false);
    }

    /**
     * Closes the current channel, if any, and resets the connection state.
     *
     * @param awaitCompletion       whether to block until the channel close completes
     * @param cancelPendingRequests whether to fail every pending request with an
     *                              {@link IOException}; done even when no channel is
     *                              currently set, so that requests left over from an
     *                              earlier disconnect are not stranded
     */
    private void disconnect(boolean awaitCompletion, boolean cancelPendingRequests) {
        Map<Integer, Callback<List<ByteBuffer>>> requestsToCancel = null;
        stateLock.writeLock().lock();
        try {
            if (channel != null) {
                LOG.info("Disconnecting from {}", remoteAddr);
                ChannelFuture closeFuture = channel.close();
                if (awaitCompletion) {
                    closeFuture.awaitUninterruptibly();
                }
                channel = null;
                remote = null;
            }
            if (cancelPendingRequests) {
                // Claim each callback with an atomic remove so that a response being
                // handled concurrently cannot notify the same callback a second time.
                requestsToCancel = new ConcurrentHashMap<Integer, Callback<List<ByteBuffer>>>();
                for (Integer serial : requests.keySet()) {
                    Callback<List<ByteBuffer>> pending = requests.remove(serial);
                    if (pending != null) {
                        requestsToCancel.put(serial, pending);
                    }
                }
            }
        } finally {
            stateLock.writeLock().unlock();
        }

        // Callbacks are foreign code: run them only after the lock has been released.
        if (requestsToCancel != null && !requestsToCancel.isEmpty()) {
            LOG.warn("Removing {} pending request(s).", requestsToCancel.size());
            for (Callback<List<ByteBuffer>> request : requestsToCancel.values()) {
                request.handleError(new IOException(getClass().getSimpleName() + " closed"));
            }
        }
    }

    /** No-op in this implementation. */
    @Override
    public void lockChannel() {
    }

    /** No-op in this implementation. */
    @Override
    public void unlockChannel() {
    }

    /**
     * Closes the connection, waiting for the channel to close and failing all pending
     * requests, then releases the channel factory's external resources.
     */
    @Override
    public void close() {
        try {
            disconnect(true, true);
        } finally {
            // The package-private no-arg constructor leaves the factory unset.
            if (channelFactory != null) {
                channelFactory.releaseExternalResources();
            }
        }
    }

    /**
     * Returns the string form of the channel's remote address, connecting first if
     * necessary.
     *
     * @throws IOException if a connection is needed and cannot be established
     */
    @Override
    public String getRemoteName() throws IOException {
        stateLock.readLock().lock();
        try {
            return getChannel().getRemoteAddress().toString();
        } finally {
            stateLock.readLock().unlock();
        }
    }

    /**
     * Sends {@code request} and blocks until the response arrives.
     *
     * @param request the buffers to send
     * @return the response buffers
     * @throws IOException if the request cannot be sent, if the call completes with an
     *                     error, or if the calling thread is interrupted while waiting
     *                     (in which case the thread's interrupt flag is restored)
     */
    @Override
    public List<ByteBuffer> transceive(List<ByteBuffer> request) throws IOException {
        CallFuture<List<ByteBuffer>> transceiverFuture = new CallFuture<List<ByteBuffer>>();
        transceive(request, transceiverFuture);
        try {
            return transceiverFuture.get();
        } catch (InterruptedException e) {
            LOG.warn("failed to get the response", e);
            // Keep the interruption visible to the caller's thread.
            Thread.currentThread().interrupt();
            throw new IOException("failed to get the response", e);
        } catch (ExecutionException e) {
            LOG.warn("failed to get the response", e);
            // Report the failure instead of handing back a null response.
            throw new IOException("failed to get the response", e);
        }
    }

    /**
     * Sends {@code request} and registers {@code callback} to be notified with the
     * response or with an error.
     *
     * @param request  the buffers to send
     * @param callback the callback to notify
     * @throws IOException if a connection is needed and cannot be established; the
     *                     callback is deregistered in that case
     */
    @Override
    public void transceive(List<ByteBuffer> request, Callback<List<ByteBuffer>> callback) throws IOException {
        stateLock.readLock().lock();
        try {
            int serial = serialGenerator.incrementAndGet();
            NettyTransportCodec.NettyDataPack dataPack = new NettyTransportCodec.NettyDataPack(serial, request);
            requests.put(serial, callback);
            boolean handedOff = false;
            try {
                writeDataPack(dataPack);
                handedOff = true;
            } finally {
                if (!handedOff) {
                    // Nothing was sent, so no response will ever claim this entry.
                    requests.remove(serial);
                }
            }
        } finally {
            stateLock.readLock().unlock();
        }
    }

    /**
     * Sends {@code buffers} as a data pack with a fresh serial number, without
     * registering a callback.
     *
     * @param buffers the buffers to send
     * @throws IOException if a connection is needed and cannot be established
     */
    @Override
    public void writeBuffers(List<ByteBuffer> buffers) throws IOException {
        stateLock.readLock().lock();
        try {
            writeDataPack(new NettyTransportCodec.NettyDataPack(serialGenerator.incrementAndGet(), buffers));
        } finally {
            stateLock.readLock().unlock();
        }
    }

    /**
     * Writes {@code dataPack} to the channel, connecting first if necessary.
     *
     * <p>The caller must hold exactly one hold of the read lock. This method does not
     * lock again, because a nested read hold would make the lock swap performed by
     * {@link #getChannel()} block forever.
     *
     * @param dataPack the data pack to write
     * @throws IOException if a connection is needed and cannot be established
     */
    private void writeDataPack(NettyTransportCodec.NettyDataPack dataPack) throws IOException {
        getChannel().write(dataPack);
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public List<ByteBuffer> readBuffers() throws IOException {
        throw new UnsupportedOperationException();
    }

    /**
     * Returns the remote protocol last stored with {@link #setRemote(Protocol)}, or
     * {@code null} if none is set or the connection has been reset since.
     */
    @Override
    public Protocol getRemote() {
        stateLock.readLock().lock();
        try {
            return remote;
        } finally {
            stateLock.readLock().unlock();
        }
    }

    /**
     * Returns {@code true} when a remote protocol is currently set.
     */
    @Override
    public boolean isConnected() {
        stateLock.readLock().lock();
        try {
            return remote != null;
        } finally {
            stateLock.readLock().unlock();
        }
    }

    /**
     * Stores the protocol of the remote peer.
     *
     * @param protocol the remote protocol
     */
    @Override
    public void setRemote(Protocol protocol) {
        stateLock.writeLock().lock();
        try {
            remote = protocol;
        } finally {
            stateLock.writeLock().unlock();
        }
    }

    /**
     * Upstream handler installed at the end of the client pipeline. It routes incoming
     * data packs to the callback registered under the same serial and reacts to channel
     * closure and to exceptions.
     */
    class NettyClientAvroHandler extends SimpleChannelUpstreamHandler {
        NettyClientAvroHandler() {
        }

        /**
         * Logs channel state events and disconnects the transceiver when the channel
         * reports that it is no longer open, then delegates to the superclass.
         */
        @Override
        public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
            if (e instanceof ChannelStateEvent) {
                LOG.info(e.toString());
                ChannelStateEvent cse = (ChannelStateEvent) e;
                if (cse.getState() == ChannelState.OPEN && Boolean.FALSE.equals(cse.getValue())) {
                    LOG.info("Remote peer {} closed connection.", remoteAddr);
                    disconnect();
                }
            }
            super.handleUpstream(ctx, e);
        }

        @Override
        public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            super.channelOpen(ctx, e);
        }

        /**
         * Hands the received data pack to the callback registered under its serial.
         *
         * @throws RuntimeException if no callback is registered for the serial
         */
        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
            NettyTransportCodec.NettyDataPack dataPack = (NettyTransportCodec.NettyDataPack) e.getMessage();
            // A single atomic remove both looks up and claims the callback, so it can
            // be notified at most once.
            Callback<List<ByteBuffer>> callback = requests.remove(dataPack.getSerial());
            if (callback == null) {
                throw new RuntimeException("Missing previous call info");
            }
            callback.handleResult(dataPack.getDatas());
        }

        /**
         * Logs the exception, closes the channel it occurred on and fails every pending
         * request with the exception's cause.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
            LOG.warn("Unexpected exception from downstream.", e.getCause());
            e.getChannel().close();
            Iterator<Callback<List<ByteBuffer>>> it = requests.values().iterator();
            while (it.hasNext()) {
                it.next().handleError(e.getCause());
                it.remove();
            }
        }
    }
}

