/*
 * Decompiled with CFR 0.152.
 */
package co.paralleluniverse.galaxy.netty;

import co.paralleluniverse.galaxy.Cluster;
import co.paralleluniverse.galaxy.cluster.NodeChangeListener;
import co.paralleluniverse.galaxy.cluster.NodeInfo;
import co.paralleluniverse.galaxy.cluster.ReaderWriters;
import co.paralleluniverse.galaxy.core.Comm;
import co.paralleluniverse.galaxy.core.Message;
import co.paralleluniverse.galaxy.core.MessageReceiver;
import co.paralleluniverse.galaxy.netty.AbstractTcpServer;
import co.paralleluniverse.galaxy.netty.ChannelNodeInfo;
import co.paralleluniverse.galaxy.netty.IpConstants;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Maps;
import java.beans.ConstructorProperties;
import java.net.InetAddress;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ServerChannel;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class TcpServerServerComm
extends AbstractTcpServer
implements Comm {
    private static final Logger LOG = LoggerFactory.getLogger(TcpServerServerComm.class);
    private MessageReceiver receiver;

    @ConstructorProperties(value={"name", "cluster", "port"})
    public TcpServerServerComm(String name, Cluster cluster, int port) throws Exception {
        this(name, cluster, port, null);
    }

    TcpServerServerComm(String name, Cluster cluster, int port, ChannelHandler testHandler) throws Exception {
        super(name, cluster, new ChannelGroup(), port, testHandler);
        cluster.addNodeProperty("ip_addr", true, true, IpConstants.INET_ADDRESS_READER_WRITER);
        cluster.setNodeProperty("ip_addr", InetAddress.getLocalHost());
        cluster.addNodeProperty("ip_server_port", false, true, ReaderWriters.INTEGER);
        cluster.setNodeProperty("ip_server_port", port);
        cluster.addNodeChangeListener(new NodeChangeListener(){

            @Override
            public void nodeAdded(short id) {
            }

            @Override
            public void nodeSwitched(short id) {
                Channel channel = TcpServerServerComm.this.getChannels().get(id);
                if (channel != null) {
                    LOG.info("Closing channel for switched node {}", (Object)id);
                    channel.close();
                }
            }

            @Override
            public void nodeRemoved(short id) {
                Channel channel = TcpServerServerComm.this.getChannels().get(id);
                if (channel != null) {
                    LOG.info("Closing channel for removed node {}", (Object)id);
                    channel.close();
                }
            }
        });
    }

    @Override
    public void start(boolean master) {
        this.bind();
    }

    @Override
    protected ChannelGroup getChannels() {
        return (ChannelGroup)super.getChannels();
    }

    @Override
    public void setReceiver(MessageReceiver receiver) {
        this.assertDuringInitialization();
        this.receiver = receiver;
    }

    @Override
    public void send(Message message) {
        if (!message.isResponse()) {
            message.setMessageId(this.nextMessageId());
        }
        LOG.debug("Send {}", (Object)message);
        Channel ch = this.getChannels().get(message.getNode());
        if (ch == null) {
            LOG.warn("No open channel found for node {}", (Object)message.getNode());
            return;
        }
        ch.write((Object)message);
    }

    @Override
    protected void receive(ChannelHandlerContext ctx, Message message) {
        this.receiver.receive(message);
    }

    private static class ChannelGroup
    extends DefaultChannelGroup {
        private final BiMap<Short, Channel> channels = Maps.synchronizedBiMap((BiMap)HashBiMap.create());

        public ChannelGroup(String name) {
            super(name);
        }

        public ChannelGroup() {
        }

        public boolean add(Channel channel) {
            if (channel instanceof ServerChannel) {
                return super.add(channel);
            }
            NodeInfo node = (NodeInfo)ChannelNodeInfo.nodeInfo.get(channel);
            if (node == null) {
                LOG.warn("Received connection from an unknown address {}.", (Object)channel.getRemoteAddress());
                throw new RuntimeException("Unknown node for address " + channel.getRemoteAddress());
            }
            short nodeId = node.getNodeId();
            if (this.channels.containsKey((Object)nodeId)) {
                LOG.warn("Received connection from address {} of node {}, but this node is already connected.", (Object)channel.getRemoteAddress(), (Object)nodeId);
                throw new RuntimeException("Node " + nodeId + " already connected.");
            }
            boolean added = super.add(channel);
            if (added) {
                this.channels.put((Object)nodeId, (Object)channel);
            }
            return added;
        }

        public boolean remove(Object o) {
            Channel channel = (Channel)o;
            boolean removed = super.remove(o);
            if (removed) {
                this.channels.inverse().remove((Object)channel);
            }
            ChannelNodeInfo.nodeInfo.remove(channel);
            return removed;
        }

        public void clear() {
            super.clear();
            this.channels.clear();
        }

        public boolean contains(Object o) {
            if (o instanceof Short) {
                return this.channels.containsKey((Object)((Short)o));
            }
            return super.contains(o);
        }

        public Channel get(short node) {
            return (Channel)this.channels.get((Object)node);
        }
    }
}

