/*
 * Decompiled with CFR 0.152.
 */
package co.paralleluniverse.galaxy.netty;

import co.paralleluniverse.common.collection.Util;
import co.paralleluniverse.common.monitoring.ThreadPoolExecutorMonitor;
import co.paralleluniverse.galaxy.Cluster;
import co.paralleluniverse.galaxy.cluster.NodeInfo;
import co.paralleluniverse.galaxy.core.ClusterService;
import co.paralleluniverse.galaxy.core.CommThread;
import co.paralleluniverse.galaxy.core.Message;
import co.paralleluniverse.galaxy.netty.ChannelNodeInfo;
import co.paralleluniverse.galaxy.netty.ChannelNodeNameWriter;
import co.paralleluniverse.galaxy.netty.TcpMessagePipelineFactory;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for cluster services that keep one outgoing Netty TCP connection to a single
 * cluster node identified by name.
 *
 * <p>Connection attempts run on a private single-thread executor (see {@code connectLater()} /
 * {@code connect()}). Messages whose type requires a response are remembered in
 * {@code pendingReply} and are written again whenever a new channel becomes connected.
 * Incoming messages are handed to the subclass through {@code receive(...)}.
 *
 * <p>The {@code channel} and {@code connecting} fields are written while holding
 * {@code channelLock}.
 */
abstract class AbstractTcpClient
extends ClusterService {
    /** Per-instance logger named after this class followed by "." and the service name. */
    private final Logger LOG = LoggerFactory.getLogger((String)(AbstractTcpClient.class.getName() + "." + this.getName()));
    /** Name of the cluster node to connect to; {@code null} until set. */
    private String nodeName;
    /** Key of the node-info property that holds the remote port. */
    private final String portProperty;
    /** Address resolved by the most recent connection attempt. */
    private InetSocketAddress address;
    /** Base pipeline factory created in the constructor (identifier spelling is historical). */
    private final ChannelPipelineFactory origChannelFacotry;
    private final ChannelFactory channelFactory;
    private final ClientBootstrap bootstrap;
    /** Set when a connection attempt starts; cleared on successful connect or on disconnect(). */
    private boolean connecting;
    /** The currently connected channel, or {@code null} when there is none. */
    private Channel channel;
    /** While {@code false}, connect() gives up instead of (re)connecting. */
    private volatile boolean reconnect;
    /** Guards {@code channel} and {@code connecting}. */
    private final Lock channelLock = new ReentrantLock();
    /** Signalled (under {@code channelLock}) when a connection has been established. */
    private final Condition channelConnected = this.channelLock.newCondition();
    /** Sent messages that require a response; newest at the head, removed when a matching message arrives. */
    private final Deque<Message> pendingReply = new ConcurrentLinkedDeque<Message>();
    /** Single thread on which connect() runs. */
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    /** Executor handed to the Netty channel factory as its boss pool. */
    private ThreadPoolExecutor bossExecutor;
    /** Executor handed to the Netty channel factory as its worker pool. */
    private ThreadPoolExecutor workerExecutor;
    /** Optional executor passed to the message pipeline factory; may be {@code null}. */
    private OrderedMemoryAwareThreadPoolExecutor receiveExecutor;
    /**
     * Last handler in the pipeline ("router"): dispatches received messages to the subclass and
     * keeps {@code channel} / readiness in sync with the connection state.
     */
    private final ChannelHandler channelHandler = new SimpleChannelHandler(){

        /**
         * Removes the last occurrence of an equal message from {@code pendingReply} and passes
         * the received message to {@code receive(...)}.
         */
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
            Message message = (Message)e.getMessage();
            AbstractTcpClient.this.LOG.debug("Received {}", (Object)message);
            AbstractTcpClient.this.pendingReply.removeLastOccurrence(message);
            AbstractTcpClient.this.receive(ctx, message);
        }

        /**
         * Records the newly connected channel, re-sends all pending messages over it, marks the
         * service ready and wakes threads waiting on the {@code channelConnected} condition.
         * If no connection attempt is in progress any more, the new channel is closed instead
         * and the event is not forwarded upstream.
         */
        public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            AbstractTcpClient.this.channelLock.lock();
            try {
                // NOTE(review): the field is assigned before the 'connecting' check, so on the
                // early-return path below it keeps referencing the channel that is being closed
                // until channelDisconnected() clears it.
                AbstractTcpClient.this.channel = e.getChannel();
                if (!AbstractTcpClient.this.connecting) {
                    AbstractTcpClient.this.LOG.info("Asked to disconnect from newly connected channel {}. Closing.", (Object)AbstractTcpClient.this.channel);
                    AbstractTcpClient.this.channel.close();
                    return;
                }
                AbstractTcpClient.this.LOG.debug("Set channel to {}", (Object)AbstractTcpClient.this.channel);
                // send() adds at the head, so iterating in reverse visits the oldest message first.
                for (Message pending : Util.reverse(AbstractTcpClient.this.pendingReply)) {
                    AbstractTcpClient.this.LOG.debug("Sending pending message {} (channel connected)", (Object)pending);
                    AbstractTcpClient.this.channel.write((Object)pending);
                    AbstractTcpClient.this.LOG.debug("Message {} written", (Object)pending);
                }
                AbstractTcpClient.this.setReady(true);
                AbstractTcpClient.this.connecting = false;
                AbstractTcpClient.this.channelConnected.signalAll();
            }
            finally {
                AbstractTcpClient.this.channelLock.unlock();
            }
            super.channelConnected(ctx, e);
        }

        /**
         * When the disconnected channel is the current one: marks the service not ready, closes
         * and forgets the channel, and schedules a new connection attempt. Disconnects of any
         * other channel only forward the event.
         */
        public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            AbstractTcpClient.this.channelLock.lock();
            try {
                if (ctx.getChannel() == AbstractTcpClient.this.channel) {
                    AbstractTcpClient.this.setReady(false);
                    if (AbstractTcpClient.this.channel != null) {
                        AbstractTcpClient.this.channel.close();
                    }
                    AbstractTcpClient.this.channel = null;
                    AbstractTcpClient.this.connectLater();
                }
            }
            finally {
                AbstractTcpClient.this.channelLock.unlock();
            }
            super.channelDisconnected(ctx, e);
        }

        /**
         * Logs the failure, marks the service not ready, closes and forgets the current channel,
         * then schedules a new connection attempt after releasing the lock.
         */
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
            AbstractTcpClient.this.channelLock.lock();
            try {
                AbstractTcpClient.this.LOG.info("Channel exception: {} {}", (Object)e.getCause().getClass().getName(), (Object)e.getCause().getMessage());
                AbstractTcpClient.this.LOG.debug("Channel exception", e.getCause());
                AbstractTcpClient.this.setReady(false);
                // NOTE(review): this closes whatever channel is currently stored; it is not
                // compared with ctx.getChannel() as channelDisconnected() does.
                if (AbstractTcpClient.this.channel != null) {
                    AbstractTcpClient.this.channel.close();
                }
                AbstractTcpClient.this.channel = null;
            }
            finally {
                AbstractTcpClient.this.channelLock.unlock();
                // Runs even if the cleanup above threw.
                AbstractTcpClient.this.connectLater();
            }
        }
    };

    /**
     * Creates the client: sets up the Netty thread pools, channel factory, bootstrap and
     * pipeline factory. No connection is attempted here.
     *
     * @param name         service name, also used as prefix for thread-pool/thread names
     * @param cluster      cluster used to look up node information
     * @param portProperty key of the node-info property that holds the remote port
     */
    public AbstractTcpClient(String name, final Cluster cluster, String portProperty) throws Exception {
        super(name, cluster);
        this.portProperty = portProperty;
        // NOTE(review): the executor fields have no initializer and the setters cannot run
        // before this constructor, so these fields appear to always be null here (and the
        // receive executor is therefore never configured) — confirm whether this logic was
        // meant to run in a later initialization step.
        if (this.bossExecutor == null) {
            this.bossExecutor = (ThreadPoolExecutor)Executors.newCachedThreadPool();
        }
        if (this.workerExecutor == null) {
            this.workerExecutor = (ThreadPoolExecutor)Executors.newCachedThreadPool();
        }
        this.configureThreadPool(name + "-tcpClientBoss", this.bossExecutor);
        this.configureThreadPool(name + "-tcpClientWorker", this.workerExecutor);
        if (this.receiveExecutor != null) {
            this.configureThreadPool(name + "-tcpClientReceive", (ThreadPoolExecutor)this.receiveExecutor);
        }
        this.channelFactory = new NioClientSocketChannelFactory((Executor)this.bossExecutor, (Executor)this.workerExecutor);
        this.bootstrap = new ClientBootstrap(this.channelFactory);
        // Base pipeline, extended with the node-name writer, a one-shot node-info setter and
        // this client's router handler.
        this.origChannelFacotry = new TcpMessagePipelineFactory(this.LOG, null, (Executor)this.receiveExecutor){

            @Override
            public ChannelPipeline getPipeline() throws Exception {
                final ChannelPipeline pipeline = super.getPipeline();
                pipeline.addBefore("messageCodec", "nodeNameWriter", (ChannelHandler)new ChannelNodeNameWriter(cluster));
                pipeline.addBefore("nodeNameWriter", "nodeInfoSetter", (ChannelHandler)new SimpleChannelUpstreamHandler(){

                    // On connect: attaches the target node's NodeInfo to the channel, forwards
                    // the event, then removes itself from the pipeline.
                    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
                        if (AbstractTcpClient.this.nodeName == null) {
                            throw new RuntimeException("nodeName not set!");
                        }
                        NodeInfo ni = cluster.getNodeInfoByName(AbstractTcpClient.this.nodeName);
                        ChannelNodeInfo.nodeInfo.set(ctx.getChannel(), (Object)ni);
                        super.channelConnected(ctx, e);
                        pipeline.remove((ChannelHandler)this);
                    }
                });
                pipeline.addLast("router", AbstractTcpClient.this.channelHandler);
                return pipeline;
            }
        };
        // Delegate through the overridable getPipeline() so subclasses can customize the pipeline.
        this.bootstrap.setPipelineFactory(new ChannelPipelineFactory(){

            public ChannelPipeline getPipeline() throws Exception {
                return AbstractTcpClient.this.getPipeline();
            }
        });
        this.bootstrap.setOption("tcpNoDelay", (Object)true);
        this.bootstrap.setOption("keepAlive", (Object)true);
        // Allow connect() to run once a connection is requested.
        this.reconnect = true;
    }

    /**
     * Stops the client: disconnects from the remote node, releases the Netty channel factory's
     * external resources and finally stops the connection executor.
     */
    @Override
    public void shutdown() {
        LOG.info("Shutting down.");
        // Disconnecting first also turns off reconnection.
        disconnect();
        channelFactory.releaseExternalResources();
        executor.shutdownNow();
    }

    /**
     * Replaces the executor used as Netty boss pool.
     * Calls {@code assertDuringInitialization()} first (inherited; presumably rejects the call
     * once initialization is over — verify in the superclass).
     *
     * @param bossExecutor the executor to store
     */
    public void setBossExecutor(ThreadPoolExecutor bossExecutor) {
        assertDuringInitialization();
        this.bossExecutor = bossExecutor;
    }

    /**
     * Replaces the executor used as Netty worker pool.
     * Calls {@code assertDuringInitialization()} first (inherited; presumably rejects the call
     * once initialization is over — verify in the superclass).
     *
     * @param workerExecutor the executor to store
     */
    public void setWorkerExecutor(ThreadPoolExecutor workerExecutor) {
        assertDuringInitialization();
        this.workerExecutor = workerExecutor;
    }

    /**
     * Replaces the executor that is passed to the message pipeline factory.
     * Calls {@code assertDuringInitialization()} first (inherited; presumably rejects the call
     * once initialization is over — verify in the superclass).
     *
     * @param receiveExecutor the executor to store
     */
    public void setReceiveExecutor(OrderedMemoryAwareThreadPoolExecutor receiveExecutor) {
        assertDuringInitialization();
        this.receiveExecutor = receiveExecutor;
    }

    /**
     * Applies this client's standard settings to a thread pool: rejected tasks are discarded,
     * threads are {@code CommThread}s named {@code <name>-<n>}, and the pool is registered
     * with the thread-pool monitor under {@code name}.
     *
     * @param name     monitor name and thread-name prefix
     * @param executor the pool to configure
     */
    private void configureThreadPool(String name, ThreadPoolExecutor executor) {
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

        // Creates the actual threads; the builder below only adds the naming scheme on top.
        final ThreadFactory commThreads = new ThreadFactory(){

            @Override
            public Thread newThread(Runnable task) {
                return new CommThread(task);
            }
        };
        final ThreadFactory namedCommThreads = new ThreadFactoryBuilder()
                .setNameFormat(name + "-%d")
                .setThreadFactory(commThreads)
                .build();
        executor.setThreadFactory(namedCommThreads);

        ThreadPoolExecutorMonitor.register(name, executor);
    }

    /**
     * Creates the pipeline for a new channel by delegating to the pipeline factory built in the
     * constructor. The bootstrap calls this method, so subclasses may override it.
     *
     * @return a new channel pipeline
     */
    protected ChannelPipeline getPipeline() throws Exception {
        final ChannelPipeline pipeline = origChannelFacotry.getPipeline();
        return pipeline;
    }

    /**
     * Stores the name of the node this client should connect to. Only the field is updated;
     * no connection is opened or closed here.
     *
     * @param nodeName name of the target node
     */
    protected void setNodeName(String nodeName) {
        this.nodeName = nodeName;
    }

    /**
     * @return the name of the node this client targets, or {@code null} if none has been set
     */
    protected String getNodeName() {
        return nodeName;
    }

    /**
     * Builds the socket address of a node from its {@code "ip_addr"} property and the given
     * port property.
     *
     * @param node         node information to read from; may be {@code null} (e.g. when the
     *                     lookup by name found nothing)
     * @param portProperty key of the property holding the port
     * @return the socket address, or {@code null} (after logging a warning) when the node
     *         information, the address or the port is missing
     */
    private InetSocketAddress getAddress(NodeInfo node, String portProperty) {
        if (node == null) {
            // Treat missing node info like a missing property instead of failing with an NPE.
            LOG.warn("No node info available; cannot resolve socket address (port property {})", portProperty);
            return null;
        }
        InetAddress _address = (InetAddress)node.get("ip_addr");
        Integer port = (Integer)node.get(portProperty);
        if (_address == null) {
            LOG.warn("Socket address (property {}) not set for node {}", "ip_addr", node);
        }
        if (port == null) {
            LOG.warn("Socket port (property {}) not set for node {}", portProperty, node);
        }
        if (_address == null || port == null) {
            return null;
        }
        return new InetSocketAddress(_address, port.intValue());
    }

    /**
     * Requests a connection to the given node. If the name differs from the current target,
     * the existing connection is dropped and the target is switched; in every case
     * reconnection is enabled and a connection attempt is scheduled.
     *
     * @param nodeName name of the node to connect to; must not be {@code null}
     * @throws IllegalArgumentException if {@code nodeName} is {@code null}
     */
    protected void reconnect(String nodeName) {
        if (nodeName == null) {
            throw new IllegalArgumentException("nodeName cannot be null!");
        }
        channelLock.lock();
        try {
            final boolean sameTarget = nodeName.equals(this.nodeName);
            if (!sameTarget) {
                disconnect();
                setNodeName(nodeName);
            }
        }
        finally {
            channelLock.unlock();
        }
        // disconnect() switches reconnection off, so re-enable it before scheduling.
        this.reconnect = true;
        connectLater();
    }

    /**
     * @return {@code true} if a channel is currently stored; the field is read while holding
     *         {@code channelLock}
     */
    private boolean isConnected() {
        channelLock.lock();
        try {
            return channel != null;
        }
        finally {
            channelLock.unlock();
        }
    }

    /**
     * Schedules a connection attempt on the client's single-thread executor and returns
     * without waiting for it.
     */
    protected void connectLater() {
        final Runnable connectTask = new Runnable(){

            @Override
            public void run() {
                connect();
            }
        };
        executor.submit(connectTask);
    }

    /**
     * Tries to connect to the target node, retrying every 500 ms until the connection
     * succeeds.
     *
     * <p>The loop ends without connecting when reconnection is disabled, the thread is
     * interrupted, a channel already exists, or no address can be resolved for the node.
     * Each attempt (including the wait for its outcome) is made while holding
     * {@code channelLock}; the lock is released during the pause between attempts.
     *
     * <p>The thread's interrupt status is preserved: it is neither cleared by the check at the
     * top of the loop nor lost when the retry pause is interrupted.
     */
    private void connect() {
        try {
            while (true) {
                channelLock.lock();
                try {
                    // isInterrupted() leaves the flag set, unlike Thread.interrupted().
                    if (!reconnect || Thread.currentThread().isInterrupted()) {
                        return;
                    }
                    if (channel != null) {
                        return;
                    }
                    address = getAddress(getCluster().getNodeInfoByName(nodeName), portProperty);
                    if (address == null) {
                        LOG.warn("No address found for node {}", nodeName);
                        return;
                    }
                    LOG.info("Connecting to node {} at {}...", nodeName, address);
                    connecting = true;
                    ChannelFuture future = bootstrap.connect((SocketAddress)address);
                    future.awaitUninterruptibly();
                    if (future.isSuccess()) {
                        LOG.info("Connecting to {} - successful", address);
                        channelConnected.signalAll();
                        return;
                    }
                    // Unsuccessful attempt: fall through to the retry below.
                }
                catch (ChannelException e) {
                    // Treated like a failed attempt: log and retry.
                    LOG.warn("ChannelException", (Throwable)e);
                }
                catch (Exception e) {
                    LOG.error("Exception", (Throwable)e);
                    throw Throwables.propagate((Throwable)e);
                }
                finally {
                    channelLock.unlock();
                }
                LOG.info("Connection to {} failed. Retrying.", address);
                Thread.sleep(500L);
            }
        }
        catch (InterruptedException e) {
            // Stop retrying, but keep the interrupt visible to the executor/caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Drops the connection and turns off reconnection: clears the {@code connecting} and
     * {@code reconnect} flags, closes the current channel (waiting for the close to finish)
     * and forgets it.
     */
    protected void disconnect() {
        LOG.info("Disconnecting from node {} - {}", nodeName, address);
        channelLock.lock();
        try {
            connecting = false;
            reconnect = false;
            final Channel current = channel;
            if (current != null) {
                LOG.debug("Closing channel {}", current);
                current.close().awaitUninterruptibly();
            }
            channel = null;
        }
        finally {
            channelLock.unlock();
        }
    }

    /**
     * Returns the connected channel, blocking until one is available.
     *
     * <p>The field is always read under {@code channelLock}, like every other access to it.
     *
     * @return the current, non-null channel
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread
     *         is interrupted while waiting; the thread's interrupt status is restored first
     */
    private Channel getChannel() {
        channelLock.lock();
        try {
            while (channel == null) {
                channelConnected.await();
            }
            return channel;
        }
        catch (InterruptedException e) {
            // Keep the interrupt visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        finally {
            channelLock.unlock();
        }
    }

    /**
     * Sends a message to the connected node.
     *
     * <p>Messages whose type requires a response are recorded in {@code pendingReply} so they
     * can be re-sent when a new channel is connected. If no channel is connected, the message
     * is not written now; only messages recorded as pending will be sent later.
     *
     * <p>Recording and writing happen in one {@code channelLock} critical section. The
     * connect handler flushes {@code pendingReply} under the same lock, so a message cannot
     * be both flushed by that handler and written again by this call.
     *
     * @param message the message to send
     */
    public void send(Message message) {
        LOG.debug("Send {}", (Object)message);
        final boolean requiresResponse = message.getType().isOf(Message.Type.REQUIRES_RESPONSE);
        if (!requiresResponse) {
            LOG.debug("Message {} does not require a response.", (Object)message);
        }
        channelLock.lock();
        try {
            if (requiresResponse) {
                // Newest at the head; the connect handler replays from the tail (oldest first).
                pendingReply.addFirst(message);
            }
            if (channel != null) {
                channel.write((Object)message);
                LOG.debug("Message {} written", (Object)message);
            } else {
                LOG.debug("Message {} not written b/c channel is not yet connected. Keeping as pending.", (Object)message);
            }
        }
        finally {
            channelLock.unlock();
        }
    }

    /**
     * Called by the router handler for every message received on the channel, after an equal
     * message (if any) has been removed from the pending-reply queue.
     *
     * @param var1 the Netty handler context of the receiving channel
     * @param var2 the received message
     */
    protected abstract void receive(ChannelHandlerContext var1, Message var2);
}

