/*
 * Decompiled with CFR 0.152.
 */
package co.paralleluniverse.galaxy.netty;

import co.paralleluniverse.common.monitoring.ThreadPoolExecutorMonitor;
import co.paralleluniverse.galaxy.Cluster;
import co.paralleluniverse.galaxy.core.ClusterService;
import co.paralleluniverse.galaxy.core.CommThread;
import co.paralleluniverse.galaxy.core.Message;
import co.paralleluniverse.galaxy.netty.ChannelNodeNameReader;
import co.paralleluniverse.galaxy.netty.TcpMessagePipelineFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jmx.export.annotation.ManagedAttribute;

public abstract class AbstractTcpServer
extends ClusterService {
    private final Logger LOG = LoggerFactory.getLogger((String)(AbstractTcpServer.class.getName() + "." + this.getName()));
    private final int port;
    private final ChannelFactory channelFactory;
    private final ServerBootstrap bootstrap;
    private final DefaultChannelGroup channels;
    private final AtomicLong nextMessageId = new AtomicLong(1L);
    private final ChannelPipelineFactory origChannelFacotry;
    private ThreadPoolExecutor bossExecutor;
    private ThreadPoolExecutor workerExecutor;
    private OrderedMemoryAwareThreadPoolExecutor receiveExecutor;
    private final ChannelHandler channelHandler = new SimpleChannelHandler(){

        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
            Message message = (Message)e.getMessage();
            AbstractTcpServer.this.LOG.debug("Received {}", (Object)message);
            AbstractTcpServer.this.receive(ctx, message);
        }

        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
            AbstractTcpServer.this.LOG.info("Channel exception: {} {}", (Object)e.getCause().getClass().getName(), (Object)e.getCause().getMessage());
            ctx.getChannel().close();
        }
    };

    AbstractTcpServer(String name, final Cluster cluster, DefaultChannelGroup channels, int port, final ChannelHandler testHandler) {
        super(name, cluster);
        this.channels = channels;
        this.port = port;
        if (this.bossExecutor == null) {
            this.bossExecutor = (ThreadPoolExecutor)Executors.newCachedThreadPool();
        }
        if (this.workerExecutor == null) {
            this.workerExecutor = (ThreadPoolExecutor)Executors.newCachedThreadPool();
        }
        this.configureThreadPool(name + "-tcpServerBoss", this.bossExecutor);
        this.configureThreadPool(name + "-tcpServerWorker", this.workerExecutor);
        if (this.receiveExecutor != null) {
            this.configureThreadPool(name + "-tcpServerReceive", (ThreadPoolExecutor)this.receiveExecutor);
        }
        this.channelFactory = new NioServerSocketChannelFactory((Executor)this.bossExecutor, (Executor)this.workerExecutor);
        this.bootstrap = new ServerBootstrap(this.channelFactory);
        this.origChannelFacotry = new TcpMessagePipelineFactory(this.LOG, channels, (Executor)this.receiveExecutor){

            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = super.getPipeline();
                pipeline.addBefore("messageCodec", "nodeNameReader", (ChannelHandler)new ChannelNodeNameReader(cluster));
                pipeline.addLast("router", AbstractTcpServer.this.channelHandler);
                if (testHandler != null) {
                    pipeline.addLast("test", testHandler);
                }
                return pipeline;
            }
        };
        this.bootstrap.setPipelineFactory(new ChannelPipelineFactory(){

            public ChannelPipeline getPipeline() throws Exception {
                return AbstractTcpServer.this.getPipeline();
            }
        });
        this.bootstrap.setOption("reuseAddress", (Object)true);
        this.bootstrap.setOption("child.tcpNoDelay", (Object)true);
        this.bootstrap.setOption("child.keepAlive", (Object)true);
    }

    public AbstractTcpServer(String name, Cluster cluster, DefaultChannelGroup channels, int port) {
        this(name, cluster, channels, port, null);
    }

    public void setBossExecutor(ThreadPoolExecutor bossExecutor) {
        this.assertDuringInitialization();
        this.bossExecutor = bossExecutor;
    }

    public void setWorkerExecutor(ThreadPoolExecutor workerExecutor) {
        this.assertDuringInitialization();
        this.workerExecutor = workerExecutor;
    }

    public void setReceiveExecutor(OrderedMemoryAwareThreadPoolExecutor receiveExecutor) {
        this.assertDuringInitialization();
        this.receiveExecutor = receiveExecutor;
    }

    private void configureThreadPool(String name, ThreadPoolExecutor executor) {
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        executor.setThreadFactory(new ThreadFactoryBuilder().setNameFormat(name + "-%d").setThreadFactory(new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                return new CommThread(r);
            }
        }).build());
        ThreadPoolExecutorMonitor.register(name, executor);
    }

    protected ChannelPipeline getPipeline() throws Exception {
        return this.origChannelFacotry.getPipeline();
    }

    protected abstract void receive(ChannelHandlerContext var1, Message var2);

    protected void bind() {
        Channel channel = this.bootstrap.bind((SocketAddress)new InetSocketAddress(this.port));
        this.channels.add(channel);
        this.LOG.info("Channel {} listening on port {}", (Object)channel, (Object)this.port);
        this.setReady(true);
    }

    @Override
    public void shutdown() {
        this.LOG.info("Shutting down.");
        this.channels.close().awaitUninterruptibly();
        this.channelFactory.releaseExternalResources();
    }

    protected DefaultChannelGroup getChannels() {
        return this.channels;
    }

    protected long nextMessageId() {
        return this.nextMessageId.getAndIncrement();
    }

    @ManagedAttribute
    public int getPort() {
        return this.port;
    }
}

