/*
 * Decompiled with CFR 0.152.
 */
package co.paralleluniverse.galaxy.netty;

import co.paralleluniverse.galaxy.Cluster;
import co.paralleluniverse.galaxy.cluster.NodeInfo;
import co.paralleluniverse.galaxy.cluster.ReaderWriters;
import co.paralleluniverse.galaxy.cluster.SlaveConfigurationListener;
import co.paralleluniverse.galaxy.core.Backup;
import co.paralleluniverse.galaxy.core.Message;
import co.paralleluniverse.galaxy.core.SlaveComm;
import co.paralleluniverse.galaxy.netty.AbstractTcpServer;
import co.paralleluniverse.galaxy.netty.ChannelNodeInfo;
import co.paralleluniverse.galaxy.netty.IpConstants;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Maps;
import java.beans.ConstructorProperties;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ServerChannel;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.group.ChannelGroupFuture;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class TcpSlaveServerComm
extends AbstractTcpServer
implements SlaveComm {
    /** Logger shared by this class and its nested/anonymous classes. */
    private static final Logger LOG = LoggerFactory.getLogger(TcpSlaveServerComm.class);
    /** Backup component assigned by {@link #setBackup(Backup)}; supplies replication iterators and receives ack callbacks. */
    private Backup backup;
    /** Set by {@code send()} once a BACKUP_PACKET has been written and cleared by {@code ack()}; both accesses hold this instance's monitor. */
    private boolean sentSlave;
    /**
     * One pending replication iterator per connected channel. Entries are added on connect,
     * removed on disconnect or when the iterator is exhausted. The map object is also used
     * as the monitor on which the replication thread waits for work.
     */
    private final ConcurrentMap<Channel, Iterator<Message.BACKUP>> replIters = new ConcurrentHashMap<Channel, Iterator<Message.BACKUP>>();
    /** Id of the most recent BACKUP_PACKET recorded by {@code send()}; compared against incoming ack ids. */
    private long lastId;
    /** Replication thread created by {@code startReplicationThread()}; stays {@code null} until that method has run. */
    private volatile Thread replThread;

    /**
     * Creates the slave server without a test handler.
     *
     * <p>Delegates to the four-argument constructor, passing {@code null} as the test handler.
     *
     * @param name    component name, forwarded to the superclass constructor
     * @param cluster cluster on which the node properties and slave listener are registered
     * @param port    port forwarded to the superclass and published as the {@code ip_slave_port} node property
     * @throws Exception if the delegated constructor fails
     */
    @ConstructorProperties(value={"name", "cluster", "port"})
    public TcpSlaveServerComm(String name, Cluster cluster, int port) throws Exception {
        this(name, cluster, port, null);
    }

    /**
     * Creates the slave server, optionally with an extra handler for tests.
     *
     * <p>After the superclass is constructed with a fresh {@link ChannelGroup}, this
     * publishes the local address ({@code ip_addr}) and the slave port
     * ({@code ip_slave_port}) as node properties and registers a listener that closes
     * the channel of any slave removed from the cluster.
     *
     * @param name        component name, forwarded to the superclass
     * @param cluster     cluster used for node properties and slave notifications
     * @param port        port forwarded to the superclass and published as {@code ip_slave_port}
     * @param testHandler handler forwarded to the superclass; the public constructor passes {@code null}
     * @throws Exception if the superclass constructor or the local-host lookup fails
     */
    TcpSlaveServerComm(String name, Cluster cluster, int port, ChannelHandler testHandler) throws Exception {
        super(name, cluster, new ChannelGroup(), port, testHandler);

        // Publish this node's address, then its slave port, as cluster node properties.
        cluster.addNodeProperty("ip_addr", true, true, IpConstants.INET_ADDRESS_READER_WRITER);
        cluster.setNodeProperty("ip_addr", InetAddress.getLocalHost());
        cluster.addNodeProperty("ip_slave_port", true, false, ReaderWriters.INTEGER);
        cluster.setNodeProperty("ip_slave_port", port);

        final SlaveConfigurationListener closeChannelOnRemoval = new SlaveConfigurationListener() {

            @Override
            public void newMaster(NodeInfo node) {
                // Nothing to do for this event.
            }

            @Override
            public void slaveAdded(NodeInfo node) {
                // Nothing to do for this event.
            }

            @Override
            public void slaveRemoved(NodeInfo node) {
                final Channel slaveChannel = TcpSlaveServerComm.this.getChannels().get(node);
                if (slaveChannel == null) {
                    // No channel is registered for the removed node.
                    return;
                }
                LOG.info("Closing channel for removed node {}", node);
                slaveChannel.close();
            }
        };
        cluster.addSlaveConfigurationListener(closeChannelOnRemoval);
    }

    /**
     * Installs the backup component used by this server.
     *
     * <p>{@code assertDuringInitialization()} is called first; that check is defined
     * outside this file, so its exact failure mode is not visible here.
     *
     * @param backup the backup instance later used for replication iterators and ack callbacks
     */
    @Override
    public void setBackup(Backup backup) {
        this.assertDuringInitialization();
        this.backup = backup;
    }

    /**
     * Post-initialization hook; delegates to the superclass without adding behavior.
     *
     * @throws Exception if the superclass implementation fails
     */
    @Override
    protected void postInit() throws Exception {
        super.postInit();
    }

    /**
     * Initialization hook; delegates to the superclass without adding behavior.
     *
     * @throws Exception if the superclass implementation fails
     */
    @Override
    protected void init() throws Exception {
        super.init();
    }

    /**
     * Availability hook; delegates to the superclass without adding behavior.
     *
     * @param value passed through unchanged to the superclass implementation
     */
    @Override
    protected void available(boolean value) {
        super.available(value);
    }

    /**
     * Starts this component. Only a master binds the server and launches the
     * replication thread; when {@code master} is {@code false} nothing happens here.
     *
     * @param master whether this node is starting as master
     */
    @Override
    protected void start(boolean master) {
        if (!master) {
            return;
        }
        bind();
        startReplicationThread();
    }

    /**
     * Handles promotion of this node to master: after the superclass handling it calls
     * {@code bind()} and starts the replication thread, the same two steps
     * {@link #start(boolean)} performs when started as master.
     */
    @Override
    public void switchToMaster() {
        super.switchToMaster();
        this.bind();
        this.startReplicationThread();
    }

    /**
     * Stops the replication thread, if one was ever started, and then shuts down the
     * superclass.
     *
     * <p>The replication thread exists only after this node has run as master, so the
     * field may still be {@code null} here. The guard prevents a
     * {@link NullPointerException} that would otherwise skip {@code super.shutdown()}.
     */
    @Override
    public void shutdown() {
        // Read the volatile field once so the null check and the call see the same value.
        final Thread replication = this.replThread;
        if (replication != null) {
            replication.interrupt();
        }
        super.shutdown();
    }

    /**
     * Builds the superclass pipeline and appends a {@code "connections"} handler that
     * tracks slave connections.
     *
     * <p>On connect the handler rejects excess connections and unknown addresses,
     * registers a replication iterator for the channel, and wakes the replication
     * thread. On disconnect it releases any outstanding backup acknowledgement and
     * drops the channel's iterator.
     *
     * @return the superclass pipeline with the connection-tracking handler added last
     * @throws Exception if the superclass fails to create the pipeline
     */
    @Override
    protected ChannelPipeline getPipeline() throws Exception {
        final ChannelPipeline result = super.getPipeline();

        final ChannelHandler connectionTracker = new SimpleChannelUpstreamHandler() {

            @Override
            public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
                // NOTE(review): the threshold of 2 presumably allows for the server channel,
                // which ChannelGroup.add also stores in the group - confirm.
                if (TcpSlaveServerComm.this.getChannels().size() > 2) {
                    throw new RuntimeException("Only one slave is currently supported! - " + new ArrayList<Channel>(TcpSlaveServerComm.this.getChannels()));
                }

                final Channel peer = ctx.getChannel();
                final InetAddress peerAddress = ((InetSocketAddress) peer.getRemoteAddress()).getAddress();
                final boolean registeredNode = !TcpSlaveServerComm.this.getCluster().getNodesByProperty("ip_addr", peerAddress).isEmpty();
                if (!registeredNode) {
                    LOG.warn("An attempt to connect from an unrecognized address {}. No registered cluster node has this address.", peerAddress);
                    peer.close();
                    // The event is deliberately not propagated for rejected peers.
                    return;
                }

                TcpSlaveServerComm.this.replIters.put(peer, TcpSlaveServerComm.this.backup.iterOwned());
                // Wake the replication thread, which waits on this map while it is empty.
                synchronized (TcpSlaveServerComm.this.replIters) {
                    TcpSlaveServerComm.this.replIters.notify();
                }
                super.channelConnected(ctx, e);
            }

            @Override
            public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
                // A null ack bypasses the id check, so the pending backup is released.
                TcpSlaveServerComm.this.ack(ctx, null);
                TcpSlaveServerComm.this.replIters.remove(ctx.getChannel());
                super.channelDisconnected(ctx, e);
            }
        };

        result.addLast("connections", connectionTracker);
        return result;
    }

    /**
     * Dispatches a message received from a slave.
     *
     * <p>{@code BACKUP_PACKETACK} and {@code INVACK} are routed to their handlers;
     * any other type is logged as unhandled.
     *
     * @param ctx     context of the channel the message arrived on
     * @param message the received message
     */
    @Override
    protected void receive(ChannelHandlerContext ctx, Message message) {
        switch (message.getType()) {
            case BACKUP_PACKETACK:
                ack(ctx, (Message.BACKUP_PACKETACK) message);
                return;
            case INVACK:
                invack(ctx, (Message.LineMessage) message);
                return;
            default:
                LOG.warn("Unhandled message: {}", message);
        }
    }

    /**
     * Handles a backup acknowledgement from the slave, or a slave disconnect.
     *
     * <p>An ack whose id differs from the last sent backup id is logged and ignored.
     * Otherwise the pending-backup flag is cleared and the backup component is told
     * that the last sent id has been acknowledged. A {@code null} ack, as passed on
     * channel disconnect, skips the id check and is always treated as an acknowledgement.
     *
     * <p>The acknowledged id is captured while this instance's monitor is held.
     * {@code send()} updates {@code lastId} under the same monitor, so reading the
     * field after release could report the id of a newer, not yet acknowledged backup.
     *
     * @param ctx context of the channel the ack (or disconnect) came from
     * @param ack the received ack, or {@code null} to acknowledge unconditionally
     */
    private void ack(ChannelHandlerContext ctx, Message.BACKUP_PACKETACK ack) {
        final long acknowledgedId;
        synchronized (this) {
            if (ack != null && ack.getId() != this.lastId) {
                LOG.warn("Received backup ack id {} which is different from last sent: {}", (Object)ack.getId(), (Object)this.lastId);
                return;
            }
            LOG.debug("Received backup ack from slave {}", (Object)ctx.getChannel());
            this.sentSlave = false;
            acknowledgedId = this.lastId;
        }
        // The callback runs outside the lock, as before, using the captured id.
        this.backup.slavesAck(acknowledgedId);
    }

    /**
     * Forwards an invalidation ack to the backup component.
     *
     * @param ctx    channel context of the sender; not used by this method
     * @param invack the received message, whose line is passed to {@code backup.slavesInvAck}
     */
    private void invack(ChannelHandlerContext ctx, Message.LineMessage invack) {
        this.backup.slavesInvAck(invack.getLine());
    }

    /**
     * Looks up the node associated with a channel in {@code ChannelNodeInfo.nodeInfo}.
     *
     * @param channel the channel to look up
     * @return the associated node; callers in this file treat {@code null} as "unknown node"
     */
    private static NodeInfo getNodeInfo(Channel channel) {
        return (NodeInfo)ChannelNodeInfo.nodeInfo.get(channel);
    }

    /**
     * Writes a message to the connected slave.
     *
     * <p>A {@code BACKUP_PACKET} is refused while a previous one is still
     * unacknowledged. Non-response messages get a fresh message id. The message is
     * written to every channel in the group before the slave count is checked.
     *
     * @param message the message to send
     * @return {@code true} if the message was an {@code INV} or {@code BACKUP_PACKET}
     *         written to exactly one slave; {@code false} if no slave is connected or
     *         the message type is not handled
     * @throws RuntimeException if a previous backup packet is still pending, or if the
     *                          message was written to more than one slave
     */
    @Override
    public synchronized boolean send(Message message) {
        final boolean backupPacket = message.getType() == Message.Type.BACKUP_PACKET;
        if (backupPacket && this.sentSlave) {
            throw new RuntimeException("Previous backup not handled yet!");
        }
        if (!message.isResponse()) {
            message.setMessageId(this.nextMessageId());
        }
        LOG.debug("Send {}", (Object)message);

        // Collect the channels the group actually wrote to.
        final HashSet<Channel> targets = new HashSet<Channel>();
        final ChannelGroupFuture writes = this.getChannels().write(message);
        final Iterator<ChannelFuture> pending = writes.iterator();
        while (pending.hasNext()) {
            targets.add(pending.next().getChannel());
        }

        if (targets.isEmpty()) {
            LOG.debug("No slaves... Returning false");
            return false;
        }
        LOG.debug("Sending to slaves: {}", targets);
        if (targets.size() > 1) {
            throw new RuntimeException("Only one slave is currently supported! - " + targets);
        }

        switch (message.getType()) {
            case INV:
                return true;
            case BACKUP_PACKET:
                // Remember the id so the matching ack can be recognised.
                this.lastId = ((Message.BACKUP_PACKET)message).getId();
                this.sentSlave = true;
                return true;
            default:
                LOG.warn("Unhandled message: {}", (Object)message);
                return false;
        }
    }

    /**
     * Returns the superclass's channel group narrowed to the nested {@link ChannelGroup} type.
     *
     * <p>The cast presumably succeeds because the constructor hands a {@code ChannelGroup}
     * instance to the superclass - verify that the superclass returns that same object.
     *
     * @return the channel group as a {@code ChannelGroup}
     */
    @Override
    protected ChannelGroup getChannels() {
        return (ChannelGroup)super.getChannels();
    }

    /**
     * Starts the background replication thread if it has not been started yet.
     *
     * <p>The thread waits on {@code replIters} until at least one channel has a
     * pending iterator. It then visits every channel and writes up to ten backup
     * messages per channel per pass. When a channel's iterator is exhausted, a
     * terminating {@code BACKUP(-1, -1, null)} message is written and the channel's
     * entry is removed. The loop ends when the thread is interrupted.
     *
     * <p>NOTE(review): the {@code replThread} null check is not synchronized, so two
     * concurrent callers could each start a thread - confirm callers are serialized.
     */
    private void startReplicationThread() {
        if (this.replThread != null) {
            return;
        }
        this.replThread = new Thread(new Runnable() {

            @Override
            public void run() {
                // Maximum number of backup messages written to one channel per pass.
                final int batchSize = 10;
                try {
                    while (!Thread.interrupted()) {
                        // Sleep until a connect handler registers work and notifies.
                        synchronized (TcpSlaveServerComm.this.replIters) {
                            while (TcpSlaveServerComm.this.replIters.isEmpty()) {
                                TcpSlaveServerComm.this.replIters.wait();
                            }
                        }
                        final Iterator<Map.Entry<Channel, Iterator<Message.BACKUP>>> entries = TcpSlaveServerComm.this.replIters.entrySet().iterator();
                        while (entries.hasNext()) {
                            final Map.Entry<Channel, Iterator<Message.BACKUP>> entry = entries.next();
                            final Channel channel = entry.getKey();
                            final Iterator<Message.BACKUP> pending = entry.getValue();
                            for (int sent = 0; sent < batchSize; ++sent) {
                                if (!pending.hasNext()) {
                                    // Exhausted: send the terminating marker and drop the entry.
                                    channel.write(Message.BACKUP(-1L, -1L, null));
                                    LOG.debug("Finished replicating to channel {}", channel);
                                    entries.remove();
                                    break;
                                }
                                final Message.BACKUP item = pending.next();
                                LOG.debug("Replicating {} to channel {}", item, channel);
                                channel.write(item);
                            }
                        }
                    }
                }
                catch (InterruptedException ignored) {
                    // Interruption is the shutdown signal; fall through and let the thread end.
                }
                LOG.info("Replication thread interrupted");
            }
        });
        this.replThread.setName("backup-replication");
        this.replThread.setDaemon(true);
        this.replThread.setPriority(4);
        this.replThread.start();
    }

    private static class ChannelGroup
    extends DefaultChannelGroup {
        private final BiMap<NodeInfo, Channel> channels = Maps.synchronizedBiMap((BiMap)HashBiMap.create());

        public ChannelGroup(String name) {
            super(name);
        }

        public ChannelGroup() {
        }

        public boolean add(Channel channel) {
            if (channel instanceof ServerChannel) {
                return super.add(channel);
            }
            NodeInfo node = TcpSlaveServerComm.getNodeInfo(channel);
            if (node == null) {
                LOG.warn("Received connection from an unknown address {}.", (Object)channel.getRemoteAddress());
                throw new RuntimeException("Unknown node for address " + channel.getRemoteAddress());
            }
            boolean added = super.add(channel);
            if (added) {
                this.channels.put((Object)node, (Object)channel);
            }
            return added;
        }

        public boolean remove(Object o) {
            Channel channel = (Channel)o;
            boolean removed = super.remove(o);
            if (removed) {
                this.channels.inverse().remove((Object)channel);
            }
            ChannelNodeInfo.nodeInfo.remove(channel);
            return removed;
        }

        public void clear() {
            super.clear();
            this.channels.clear();
        }

        public boolean contains(Object o) {
            if (o instanceof NodeInfo) {
                return this.channels.containsKey((Object)((NodeInfo)o));
            }
            return super.contains(o);
        }

        public Channel get(NodeInfo node) {
            return (Channel)this.channels.get((Object)node);
        }

        public NodeInfo get(Channel channel) {
            return (NodeInfo)this.channels.inverse().get((Object)channel);
        }
    }
}

