/*
 * Decompiled with CFR 0.152.
 */
package co.paralleluniverse.galaxy.netty;

import co.paralleluniverse.common.monitoring.ThreadPoolExecutorMonitor;
import co.paralleluniverse.galaxy.Cluster;
import co.paralleluniverse.galaxy.cluster.ReaderWriters;
import co.paralleluniverse.galaxy.core.AbstractComm;
import co.paralleluniverse.galaxy.core.Comm;
import co.paralleluniverse.galaxy.core.CommThread;
import co.paralleluniverse.galaxy.core.Message;
import co.paralleluniverse.galaxy.core.MessageReceiver;
import co.paralleluniverse.galaxy.core.NodeNotFoundException;
import co.paralleluniverse.galaxy.core.ServerComm;
import co.paralleluniverse.galaxy.netty.ChannelNodeAddressResolver;
import co.paralleluniverse.galaxy.netty.IpConstants;
import co.paralleluniverse.galaxy.netty.MessagePacket;
import co.paralleluniverse.galaxy.netty.SocketNodeAddressResolver;
import co.paralleluniverse.galaxy.netty.UDPCommMonitor;
import co.paralleluniverse.galaxy.netty.UdpMessagePipelineFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import gnu.trove.iterator.TShortIterator;
import gnu.trove.set.hash.TLongHashSet;
import gnu.trove.set.hash.TShortHashSet;
import java.beans.ConstructorProperties;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.DatagramChannel;
import org.jboss.netty.channel.socket.DatagramChannelFactory;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jmx.export.annotation.ManagedAttribute;

public class UDPComm
extends AbstractComm<InetSocketAddress> {
    /** Logger shared by this class and its nested peer classes. */
    private static final Logger LOG = LoggerFactory.getLogger(UDPComm.class);
    /** Local UDP port; the constructor also publishes it as the "ip_port" node property. */
    private final int port;
    /** Multicast group joined in start() and written to by BroadcastPeer; may be null when sending to the server instead of multicasting. */
    private InetSocketAddress multicastGroup;
    /** Optional interface used to join the multicast group; start() falls back to an address-only join when null. */
    private NetworkInterface multicastNetworkInterface;
    /** Maximum queue size (default 50). NOTE(review): presumably bounds each peer's message queue; its use is outside this view. */
    private int maxQueueSize = 50;
    /** Maximum packet size (default 4096); handleQueue rejects any single message larger than this. */
    private int maxPacketSize = 4096;
    /** Initialised to half of the default maxPacketSize; NOT recomputed when setMaxPacketSize is called. */
    private int maxRequestOnlyPacketSize = this.maxPacketSize / 2;
    /** Time (ns, default 1 ms) handleQueue waits on the queue for a further message. */
    private long minDelayNanos = TimeUnit.NANOSECONDS.convert(1L, TimeUnit.MILLISECONDS);
    /** Delay bound (ns, default 10 ms) compared against elapsed time in handleQueue. */
    private long maxDelayNanos = TimeUnit.NANOSECONDS.convert(10L, TimeUnit.MILLISECONDS);
    /** Resend period (ns, default 20 ms); receivedResponse derives the peers' resend delay from it. */
    private long resendPeriodNanos = TimeUnit.NANOSECONDS.convert(20L, TimeUnit.MILLISECONDS);
    /** Jitter flag (default off). NOTE(review): consumer is outside this view. */
    private boolean jitter = false;
    /** Exponential back-off flag (default on). NOTE(review): consumer is outside this view. */
    private boolean exponentialBackoff = true;
    /** broadcast() unicasts instead of multicasting while getNumPeerNodes() is below this value. */
    private int minimumNodesToMulticast = 3;
    /** Executor handed to the Netty datagram channel factory in init(). */
    private ThreadPoolExecutor workerExecutor;
    /** Optional executor handed to the pipeline factory in init(). */
    private OrderedMemoryAwareThreadPoolExecutor receiveExecutor;
    /** Comm used by sendToServer(); setReceiver() tolerates it being null. */
    private final Comm serverComm;
    /** Channel factory created in init() and released in shutdown(). */
    private DatagramChannelFactory channelFactory;
    /** Netty bootstrap created in init(); it is null before init() runs. */
    private ConnectionlessBootstrap bootstrap;
    /** Channel bound in start(). */
    private DatagramChannel channel;
    /** Channel bound to the multicast port in start(); null when sending to the server instead of multicasting. */
    private DatagramChannel multicastChannel;
    /** Peer tracking outstanding broadcasts; re-created in postInit() when multicasting. */
    private BroadcastPeer broadcastPeer = new BroadcastPeer();
    /** Local host address plus port; used in init()'s router to drop our own multicast packets. */
    private SocketAddress myAddress;
    /** Node id to peer map; node 0 is never added (see nodeAdded). */
    private final ConcurrentMap<Short, NodePeer> peers = new ConcurrentHashMap<Short, NodePeer>();
    /** Single daemon-thread scheduler on which peers are submitted and scheduled. */
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).build());
    /** Monitor registered as an MBean in init() and unregistered in shutdown(). */
    private final UDPCommMonitor monitor;
    /** NOTE(review): raw ThreadLocal; its use is outside this view. */
    private static final ThreadLocal<Boolean> recursive = new ThreadLocal();

    @ConstructorProperties(value={"name", "cluster", "serverComm", "port"})
    UDPComm(String name, Cluster cluster, ServerComm serverComm, int port) throws Exception {
        super(name, cluster, new SocketNodeAddressResolver(cluster, "ip_port"));
        this.serverComm = serverComm;
        this.port = port;
        cluster.addNodeProperty("ip_addr", true, true, IpConstants.INET_ADDRESS_READER_WRITER);
        cluster.setNodeProperty("ip_addr", InetAddress.getLocalHost());
        cluster.addNodeProperty("ip_port", true, false, ReaderWriters.INTEGER);
        cluster.setNodeProperty("ip_port", port);
        this.monitor = new UDPCommMonitor(name, this);
    }

    /**
     * @return the local UDP port this comm was constructed with
     */
    @ManagedAttribute
    public int getPort() {
        return port;
    }

    /**
     * Sets the "receiveBufferSize" option on the Netty bootstrap.
     * <p>
     * NOTE(review): the bootstrap is only assigned in {@code init()}, so calling
     * this setter earlier looks like it would throw a NullPointerException -
     * confirm the intended call order.
     *
     * @param size receive buffer size handed to the bootstrap
     */
    public void setReceiveBufferSize(int size) {
        assertDuringInitialization();
        bootstrap.setOption("receiveBufferSize", size);
    }

    /**
     * Sets the multicast group address. Only allowed during initialization.
     *
     * @param group multicast group address and port
     */
    public void setMulticastGroup(InetSocketAddress group) {
        assertDuringInitialization();
        multicastGroup = group;
    }

    /**
     * Returns the textual form of the configured multicast group.
     * <p>
     * The group is legitimately unset when messages go to the server instead
     * of being multicast, so this managed attribute must not throw in that case.
     *
     * @return the multicast group as a string, or {@code null} if none is set
     */
    @ManagedAttribute
    public String getMulticastGroupName() {
        final InetSocketAddress group = this.multicastGroup;
        return group != null ? group.toString() : null;
    }

    /**
     * Sets the network interface used when joining the multicast group.
     * Only allowed during initialization.
     *
     * @param multicastNetworkInterface interface to join on; may be left unset
     */
    public void setMulticastNetworkInterface(NetworkInterface multicastNetworkInterface) {
        assertDuringInitialization();
        this.multicastNetworkInterface = multicastNetworkInterface;
    }

    /**
     * Returns the textual form of the configured multicast network interface.
     * <p>
     * The interface is optional ({@code start()} joins the group without one
     * when it is null), so this managed attribute must tolerate it being unset.
     *
     * @return the interface as a string, or {@code null} if none is set
     */
    @ManagedAttribute
    public String getMulticastNetworkInterfaceName() {
        final NetworkInterface nic = this.multicastNetworkInterface;
        return nic != null ? nic.toString() : null;
    }

    /**
     * Sets the maximum queue size. Only allowed during initialization.
     *
     * @param maxQueueSize new maximum queue size
     */
    public void setMaxQueueSize(int maxQueueSize) {
        assertDuringInitialization();
        this.maxQueueSize = maxQueueSize;
    }

    /**
     * @return the configured maximum queue size
     */
    @ManagedAttribute
    public int getMaxQueueSize() {
        return maxQueueSize;
    }

    /**
     * Sets the maximum packet size. Only allowed during initialization.
     * Does not touch {@code maxRequestOnlyPacketSize}.
     *
     * @param maxPacketSize new maximum packet size
     */
    public void setMaxPacketSize(int maxPacketSize) {
        assertDuringInitialization();
        this.maxPacketSize = maxPacketSize;
    }

    /**
     * @return the configured maximum packet size
     */
    @ManagedAttribute
    public int getMaxPacketSize() {
        return maxPacketSize;
    }

    /**
     * Sets the maximum size of a request-only packet. Only allowed during
     * initialization.
     *
     * @param maxRequestOnlyPacketSize new maximum request-only packet size
     */
    public void setMaxRequestOnlyPacketSize(int maxRequestOnlyPacketSize) {
        assertDuringInitialization();
        this.maxRequestOnlyPacketSize = maxRequestOnlyPacketSize;
    }

    /**
     * @return the configured maximum request-only packet size
     */
    @ManagedAttribute
    public int getMaxRequestOnlyPacketSize() {
        return maxRequestOnlyPacketSize;
    }

    /**
     * Sets the maximum delay. Only allowed during initialization.
     *
     * @param maxDelayMicrosecs maximum delay in microseconds (stored as nanoseconds)
     */
    public void setMaxDelayMicrosecs(int maxDelayMicrosecs) {
        assertDuringInitialization();
        maxDelayNanos = TimeUnit.MICROSECONDS.toNanos(maxDelayMicrosecs);
    }

    /**
     * @return the maximum delay, converted from nanoseconds to microseconds
     */
    @ManagedAttribute
    public int getMaxDelayMicrosecs() {
        final long micros = TimeUnit.NANOSECONDS.toMicros(maxDelayNanos);
        return (int) micros;
    }

    /**
     * Sets the minimum delay. Only allowed during initialization.
     *
     * @param minDelayMicrosecs minimum delay in microseconds (stored as nanoseconds)
     */
    public void setMinDelayMicrosecs(int minDelayMicrosecs) {
        assertDuringInitialization();
        minDelayNanos = TimeUnit.MICROSECONDS.toNanos(minDelayMicrosecs);
    }

    /**
     * @return the minimum delay, converted from nanoseconds to microseconds
     */
    @ManagedAttribute
    public int getMinDelayMicrosecs() {
        final long micros = TimeUnit.NANOSECONDS.toMicros(minDelayNanos);
        return (int) micros;
    }

    /**
     * Sets the resend period. Only allowed during initialization.
     *
     * @param resnedPeriodMillisecs resend period in milliseconds (stored as nanoseconds)
     */
    public void setResendPeriodMillisecs(int resnedPeriodMillisecs) {
        assertDuringInitialization();
        resendPeriodNanos = TimeUnit.MILLISECONDS.toNanos(resnedPeriodMillisecs);
    }

    /**
     * @return the resend period, converted from nanoseconds to milliseconds
     */
    @ManagedAttribute
    public int getResendPeriodMillisecs() {
        final long millis = TimeUnit.NANOSECONDS.toMillis(resendPeriodNanos);
        return (int) millis;
    }

    /**
     * Sets the peer-count threshold below which broadcasts are unicast to each
     * peer rather than multicast. Only allowed during initialization.
     *
     * @param minimumNodesToMulticast new threshold
     */
    public void setMinimumNodesToMulticast(int minimumNodesToMulticast) {
        assertDuringInitialization();
        this.minimumNodesToMulticast = minimumNodesToMulticast;
    }

    /**
     * @return the peer-count threshold below which broadcasts are unicast
     */
    @ManagedAttribute
    public int getMinimumNodesToMulticast() {
        return minimumNodesToMulticast;
    }

    /**
     * Sets the executor given to the datagram channel factory. Only allowed
     * during initialization.
     *
     * @param executor worker thread pool
     */
    public void setWorkerExecutor(ThreadPoolExecutor executor) {
        assertDuringInitialization();
        workerExecutor = executor;
    }

    /**
     * @return the fixed name under which the worker executor is configured
     */
    @ManagedAttribute
    public String getWorkerExecutorName() {
        final String workerExecutorName = "udpCommWorkerExecutor";
        return workerExecutorName;
    }

    /**
     * Sets the executor given to the message pipeline factory. Only allowed
     * during initialization.
     *
     * @param executor receive thread pool
     */
    public void setReceiveExecutor(OrderedMemoryAwareThreadPoolExecutor executor) {
        assertDuringInitialization();
        receiveExecutor = executor;
    }

    /**
     * @return the fixed name under which the receive executor is configured
     */
    @ManagedAttribute
    public String getReceiveExecutorName() {
        final String receiveExecutorName = "udpCommReceiveExecutor";
        return receiveExecutorName;
    }

    /**
     * Enables or disables jitter. Only allowed during initialization.
     *
     * @param value new jitter flag
     */
    public void setJitter(boolean value) {
        assertDuringInitialization();
        jitter = value;
    }

    /**
     * @return whether jitter is enabled
     */
    @ManagedAttribute
    public boolean isJitter() {
        return jitter;
    }

    /**
     * Enables or disables exponential back-off. Only allowed during
     * initialization.
     *
     * @param value new exponential back-off flag
     */
    public void setExponentialBackoff(boolean value) {
        assertDuringInitialization();
        exponentialBackoff = value;
    }

    /**
     * @return whether exponential back-off is enabled
     */
    @ManagedAttribute
    public boolean isExponentialBackoff() {
        return exponentialBackoff;
    }

    /**
     * Installs the receiver on this comm and, when a server comm is present,
     * on the server comm as well.
     *
     * @param receiver receiver for incoming messages
     */
    @Override
    public void setReceiver(MessageReceiver receiver) {
        super.setReceiver(receiver);
        if (serverComm == null) {
            return;
        }
        serverComm.setReceiver(receiver);
    }

    /**
     * Validates configuration and builds the Netty bootstrap and pipeline.
     * <p>
     * Steps, in order: require a multicast group unless messages go to the
     * server instead of being multicast; record the local address; configure
     * the thread pools; create the channel factory (NIO when sending to the
     * server, OIO when multicasting); install a pipeline whose last handler
     * routes packets to {@link #messageReceived(MessagePacket)}; register the
     * monitoring MBean.
     *
     * @throws RuntimeException if multicasting is required but no multicast
     *                          group has been set
     * @throws Exception        if the superclass initialization or local host
     *                          resolution fails
     */
    @Override
    public void init() throws Exception {
        super.init();
        if (!this.isSendToServerInsteadOfMulticast() && this.multicastGroup == null) {
            // The previous message named a non-existent property and stated the condition backwards.
            LOG.error("multicastGroup must be set unless sendToServerInsteadOfMulticast is enabled!");
            throw new RuntimeException("multicastGroup not set.");
        }
        this.myAddress = new InetSocketAddress(InetAddress.getLocalHost(), this.port);
        this.configureThreadPool(this.getWorkerExecutorName(), this.workerExecutor);
        // The receive executor is optional; the worker executor is not.
        if (this.receiveExecutor != null) {
            this.configureThreadPool(this.getReceiveExecutorName(), (ThreadPoolExecutor)this.receiveExecutor);
        }
        this.channelFactory = this.isSendToServerInsteadOfMulticast() ? new NioDatagramChannelFactory((Executor)this.workerExecutor) : new OioDatagramChannelFactory((Executor)this.workerExecutor);
        this.bootstrap = new ConnectionlessBootstrap((ChannelFactory)this.channelFactory);
        // NOTE(review): the receive buffer is fixed at 4096 regardless of maxPacketSize - confirm this is intended.
        this.bootstrap.setOption("receiveBufferSizePredictorFactory", (Object)new FixedReceiveBufferSizePredictorFactory(4096));
        this.bootstrap.setPipelineFactory((ChannelPipelineFactory)new UdpMessagePipelineFactory(LOG, new ChannelNodeAddressResolver(this.addressResolver), (Executor)this.receiveExecutor){

            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = super.getPipeline();
                pipeline.addLast("router", (ChannelHandler)new SimpleChannelHandler(){

                    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
                        if (ctx.getChannel() == UDPComm.this.multicastChannel) {
                            // Drop multicast packets that we sent ourselves.
                            if (e.getRemoteAddress().equals(UDPComm.this.myAddress)) {
                                return;
                            }
                            ((MessagePacket)e.getMessage()).setMulticast();
                        }
                        UDPComm.this.messageReceived((MessagePacket)e.getMessage());
                    }

                    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
                        // Summary at INFO, full stack trace only at DEBUG.
                        LOG.info("Channel exception: {} {}", (Object)e.getCause().getClass().getName(), (Object)e.getCause().getMessage());
                        LOG.debug("Channel exception", e.getCause());
                    }
                });
                return pipeline;
            }
        });
        this.bootstrap.setOption("localAddress", (Object)new InetSocketAddress(this.port));
        // NOTE(review): "tcpNoDelay" is set on a datagram bootstrap - presumably a no-op; confirm.
        this.bootstrap.setOption("tcpNoDelay", (Object)true);
        this.monitor.registerMBean();
    }

    /**
     * Applies this comm's standard settings to a thread pool: rejected tasks
     * are discarded, threads are daemon {@link CommThread}s named
     * {@code name-N}, and the pool is registered with the executor monitor.
     *
     * @param name     base name for the pool's threads and its monitor entry
     * @param executor pool to configure
     */
    private void configureThreadPool(String name, ThreadPoolExecutor executor) {
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

        final ThreadFactory commThreads = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable task) {
                return new CommThread(task);
            }
        };
        final ThreadFactory namedDaemonThreads = new ThreadFactoryBuilder()
                .setNameFormat(name + "-%d")
                .setDaemon(true)
                .setThreadFactory(commThreads)
                .build();
        executor.setThreadFactory(namedDaemonThreads);

        ThreadPoolExecutorMonitor.register(name, executor);
    }

    /**
     * Creates a fresh broadcast peer when multicasting is in use, then runs
     * the superclass post-initialization.
     *
     * @throws Exception if the superclass post-initialization fails
     */
    @Override
    public void postInit() throws Exception {
        final boolean multicasting = !sendToServerInsteadOfMulticast;
        if (multicasting) {
            broadcastPeer = new BroadcastPeer();
        }
        super.postInit();
    }

    /**
     * Binds the channel and, when multicasting, binds a second channel on the
     * multicast port and joins the multicast group; finally marks this comm
     * ready.
     *
     * @param master not used by this implementation
     */
    @Override
    public void start(boolean master) {
        channel = (DatagramChannel) bootstrap.bind();
        LOG.info("Channel {} listening on port {}", (Object) channel, (Object) port);

        if (isSendToServerInsteadOfMulticast()) {
            multicastChannel = null;
        } else {
            final InetSocketAddress multicastBindAddress = new InetSocketAddress(multicastGroup.getPort());
            multicastChannel = (DatagramChannel) bootstrap.bind((SocketAddress) multicastBindAddress);
            if (multicastNetworkInterface == null) {
                // No interface configured: join by group address only.
                LOG.info("Channel {} joining multicast group {} ", (Object) multicastChannel, (Object) multicastGroup);
                multicastChannel.joinGroup(multicastGroup.getAddress());
            } else {
                LOG.info("Channel {} joining multicast group {} on network interface {}", new Object[]{multicastChannel, multicastGroup, multicastNetworkInterface});
                multicastChannel.joinGroup(multicastGroup, multicastNetworkInterface);
            }
        }

        setReady(true);
    }

    /**
     * Unregisters the monitoring MBean, closes any open channels and releases
     * the channel factory's external resources.
     * <p>
     * Every resource is null-checked so that shutting down after a failed or
     * skipped {@code init()}/{@code start()} does not itself throw.
     */
    @Override
    public void shutdown() {
        LOG.info("Shutting down.");
        this.monitor.unregisterMBean();
        if (this.channel != null) {
            this.channel.close();
        }
        if (this.multicastChannel != null) {
            this.multicastChannel.close();
        }
        // The factory is only created in init(); it is null if init() failed early.
        if (this.channelFactory != null) {
            this.channelFactory.releaseExternalResources();
        }
    }

    /**
     * Replaces the channel used for sending.
     *
     * @param channel channel to use from now on
     */
    void setChannel(DatagramChannel channel) {
        this.channel = channel;
    }

    /**
     * @return the scheduler on which peers are run
     */
    ExecutorService getExecutor() {
        return executor;
    }

    /**
     * Sends a message to the server through the server comm, after the
     * superclass has processed it.
     *
     * @param message message to send
     * @throws RuntimeException wrapping a {@link NodeNotFoundException} when
     *                          the server cannot be found
     */
    @Override
    protected void sendToServer(Message message) {
        super.sendToServer(message);
        try {
            serverComm.send(message);
        } catch (NodeNotFoundException notFound) {
            throw new RuntimeException("Server not found!", notFound);
        }
    }

    /**
     * Queues a message on the peer for {@code node} and schedules that peer.
     * <p>
     * Failures other than interruption, including an unknown node, are logged
     * and swallowed; this method is best-effort.
     *
     * @param message message to send; its data buffers are cloned first
     * @param node    destination node id
     * @param address destination address (used for logging only here)
     * @throws RuntimeException wrapping an {@link InterruptedException} if the
     *                          thread is interrupted while queueing; the
     *                          interrupt flag is restored before throwing
     */
    @Override
    protected void sendToNode(Message message, short node, InetSocketAddress address) {
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sending to node {} ({}): {}", new Object[]{node, address, message});
            }
            message.cloneDataBuffers();
            NodePeer peer = (NodePeer)this.peers.get(node);
            if (peer == null) {
                throw new NodeNotFoundException(node);
            }
            peer.sendMessage(message);
            this.executor.submit(peer);
        }
        catch (InterruptedException ex) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            LOG.error("InterruptedException", (Throwable)ex);
            throw new RuntimeException(ex);
        }
        catch (Exception ex) {
            LOG.error("Error while sending message " + message + " to node " + node, (Throwable)ex);
        }
    }

    /**
     * Broadcasts a message to every known peer.
     * <p>
     * Each peer is handed the message and scheduled. When there are no peers,
     * a line message is answered locally with NOT_FOUND. Otherwise the
     * broadcast is registered with the broadcast peer, which is scheduled only
     * when actually multicasting.
     *
     * @param message a broadcast, non-response message
     * @throws RuntimeException wrapping an {@link InterruptedException} if the
     *                          thread is interrupted while queueing; the
     *                          interrupt flag is restored before throwing
     */
    @Override
    protected synchronized void broadcast(Message message) {
        try {
            assert (message.isBroadcast() && !message.isResponse());
            this.assignMessageId(message);
            // Below the threshold each peer unicasts the message itself.
            boolean unicast = this.getNumPeerNodes() < this.minimumNodesToMulticast;
            TShortHashSet nodes = new TShortHashSet();
            for (NodePeer peer : this.peers.values()) {
                nodes.add(peer.node);
                peer.sendMessage(message, unicast);
                this.executor.submit(peer);
            }
            if (nodes.isEmpty()) {
                if (message instanceof Message.LineMessage) {
                    LOG.debug("No other nodes in cluster. Responding with NOT_FOUND to message {}", (Object)message);
                    this.receive(Message.NOT_FOUND((Message.LineMessage)message).setIncoming());
                }
                return;
            }
            this.broadcastPeer.sendMessage(message, nodes, unicast);
            if (!unicast) {
                this.executor.submit(this.broadcastPeer);
            }
        }
        catch (InterruptedException ex) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            LOG.error("InterruptedException", (Throwable)ex);
            throw new RuntimeException(ex);
        }
    }

    /**
     * Timestamps an incoming packet and hands it to the peer of its sender.
     * Packets are ignored while the cluster does not report this node as
     * master.
     *
     * @param packet packet received from the network
     * @throws RuntimeException if the sending node has no peer, or wrapping an
     *                          {@link InterruptedException} (the interrupt flag
     *                          is restored before throwing)
     */
    void messageReceived(MessagePacket packet) {
        if (!this.getCluster().isMaster()) {
            return;
        }
        LOG.debug("Received packet {}", (Object)packet);
        long now = System.nanoTime();
        packet.setTimestamp(now);
        short node = packet.getNode();
        NodePeer peer = (NodePeer)this.peers.get(node);
        if (peer == null) {
            throw new RuntimeException("Message received from unhandled node " + node);
        }
        try {
            peer.receivePacket(packet);
        }
        catch (InterruptedException ex) {
            // Restore the flag so the executor thread can still observe the interruption.
            Thread.currentThread().interrupt();
            LOG.error("InterruptedException", (Throwable)ex);
            throw new RuntimeException(ex);
        }
    }

    /**
     * Creates and registers a peer for a newly added node. Node 0 and nodes
     * that already have a peer are ignored.
     *
     * @param id id of the added node
     */
    @Override
    public synchronized void nodeAdded(short id) {
        super.nodeAdded(id);
        final boolean isNodeZero = id == 0;
        if (isNodeZero || peers.containsKey(id)) {
            return;
        }
        final NodePeer newPeer = new NodePeer(id);
        LOG.info("Adding peer {} for node {}", (Object) newPeer, (Object) id);
        final InetSocketAddress address = (InetSocketAddress) getNodeAddress(id);
        newPeer.setAddress(address);
        peers.put(id, newPeer);
    }

    /**
     * Points the peer of a switched node at the node's new address and
     * reschedules both that peer and the broadcast peer.
     * <p>
     * A node without a peer (node 0 never gets one, see {@code nodeAdded}) is
     * logged and skipped instead of failing with a NullPointerException.
     *
     * @param id id of the node that switched
     */
    @Override
    public synchronized void nodeSwitched(short id) {
        super.nodeSwitched(id);
        NodePeer peer = (NodePeer)this.peers.get(id);
        if (peer == null) {
            LOG.info("Node {} switched, but it has no peer; nothing to fix.", (Object)id);
            return;
        }
        LOG.info("Node switched. Fixing peer {}", (Object)peer);
        peer.setAddress((InetSocketAddress)this.getNodeAddress(id));
        this.executor.submit(peer);
        this.executor.submit(this.broadcastPeer);
    }

    /**
     * Marks the peer of a removed node as removed, drops it from the peer map
     * and removes the node from all outstanding broadcasts.
     *
     * @param id id of the removed node
     */
    @Override
    public synchronized void nodeRemoved(short id) {
        super.nodeRemoved(id);
        final NodePeer departed = (NodePeer) peers.get(id);
        if (departed != null) {
            // Flag first, so the peer sees the removal before it leaves the map.
            departed.removed();
        }
        peers.remove(id);
        broadcastPeer.removeNode(id);
    }

    /**
     * Computes the node count used to decide between unicast and multicast:
     * the cluster's node-set size, minus one if the set contains node 0, plus
     * one.
     *
     * @return the adjusted node count
     */
    private int getNumPeerNodes() {
        int count = getCluster().getNodes().size();
        if (getCluster().getNodes().contains((short) 0)) {
            count -= 1;
        }
        return count + 1;
    }

    /**
     * Draws a random interval from an exponential distribution whose rate is
     * {@code 1 / expected}, truncated to a long.
     *
     * @param expected value whose reciprocal is used as the rate
     * @return the sampled interval
     */
    private static long randInterval(long expected) {
        final double rate = 1.0 / (double) expected;
        final double sample = UDPComm.randExp(rate);
        return (long) sample;
    }

    /**
     * Samples {@code -ln(1 - U) / lambda} for a uniform {@code U} in [0, 1)
     * taken from the current thread's random generator.
     *
     * @param lambda divisor (rate) of the sample
     * @return the sampled value
     */
    private static double randExp(double lambda) {
        final double uniform = ThreadLocalRandom.current().nextDouble();
        return -Math.log(1.0 - uniform) / lambda;
    }

    /**
     * @return the peer currently tracking outstanding broadcasts
     */
    BroadcastPeer getBroadcastPeer() {
        return broadcastPeer;
    }

    /**
     * @return the live map from node id to peer (not a copy)
     */
    ConcurrentMap<Short, NodePeer> getPeers() {
        return peers;
    }

    /**
     * A broadcast message together with the set of nodes that have not yet
     * answered it.
     */
    private static class BroadcastEntry {
        /** The broadcast message. */
        final Message message;
        /** Nodes still expected to answer; node 0 is never included. */
        final TShortHashSet nodes;

        /**
         * @param message the broadcast message
         * @param nodes   nodes the message was sent to; the set is kept by
         *                reference and node 0 is removed from it
         */
        public BroadcastEntry(Message message, TShortHashSet nodes) {
            this.message = message;
            this.nodes = nodes;
            nodes.remove((short) 0);
            LOG.debug("Awaiting ACKS for message {} from nodes {}", (Object) message, (Object) nodes);
        }

        /**
         * Adds a node to the set of pending nodes.
         *
         * @param node node id to add
         */
        public void addNode(short node) {
            synchronized (this) {
                nodes.add(node);
            }
        }

        /**
         * Removes a node from the set of pending nodes.
         *
         * @param node node id to remove
         * @return {@code true} if no pending nodes remain afterwards
         */
        public boolean removeNode(short node) {
            synchronized (this) {
                nodes.remove(node);
                return nodes.isEmpty();
            }
        }
    }

    /**
     * Peer that tracks outstanding broadcasts and multicasts them to the
     * multicast group.
     * <p>
     * Every broadcast is recorded in {@link #broadcasts} together with the
     * nodes that still have to answer it. Entries are removed when all nodes
     * have acknowledged, when a non-ACK reply arrives, when the message times
     * out, or when the last pending node leaves the cluster.
     */
    class BroadcastPeer
    extends Peer {
        /** Outstanding broadcasts, keyed by message id. */
        private ConcurrentMap<Long, BroadcastEntry> broadcasts;

        BroadcastPeer() {
            this.broadcasts = new ConcurrentHashMap<Long, BroadcastEntry>();
        }

        public String toString() {
            return "BroadcastPeer{multicastAddress=" + UDPComm.this.multicastGroup + ", lastSent=" + this.getLastSent() + ", sentPacket=" + this.sentPacket + ", next=" + this.overflow + ", queue=" + this.queue + '}';
        }

        /**
         * Registers a broadcast and, unless it is being unicast by the node
         * peers, queues it for multicasting.
         *
         * @param message the broadcast message
         * @param nodes   nodes expected to answer
         * @param unicast {@code true} if the node peers unicast the message themselves
         * @throws InterruptedException if interrupted while queueing
         */
        public void sendMessage(Message message, TShortHashSet nodes, boolean unicast) throws InterruptedException {
            this.broadcasts.put(message.getMessageId(), new BroadcastEntry(message, nodes));
            if (!unicast) {
                this.sendMessage(message);
            }
        }

        /**
         * Runs one processing round: expires timed-out broadcasts, moves queued
         * messages into the outgoing packet, and multicasts or reschedules as
         * needed. Synthesised replies are delivered after the lock is released.
         *
         * @return always {@code null}
         * @throws InterruptedException if interrupted while polling the queue
         */
        @Override
        public Void call() throws InterruptedException {
            ArrayList<Message> received = new ArrayList<Message>();
            BroadcastPeer broadcastPeer = this;
            synchronized (broadcastPeer) {
                LOG.trace("BroadcastPeer CALL");
                long now = System.nanoTime();
                this.handleTimeout(now, received);
                this.handleQueue(now);
                if (this.sentPacket != null && this.sentPacket.isEmpty()) {
                    this.sentPacket = null;
                }
                if (this.isTimeToResned(now)) {
                    if (this.sentPacket != null) {
                        assert (!UDPComm.this.sendToServerInsteadOfMulticast);
                        LOG.debug("BroadcastPeer {} multicasting packet {}", (Object)this, (Object)this.sentPacket);
                        UDPComm.this.channel.write((Object)this.sentPacket, (SocketAddress)UDPComm.this.multicastGroup);
                        this.resend(now);
                    } else if (!this.broadcasts.isEmpty()) {
                        // Nothing to send but broadcasts are pending: come back to check for timeouts.
                        UDPComm.this.executor.schedule(this, UDPComm.this.getTimeout(), TimeUnit.MILLISECONDS);
                    }
                }
            }
            for (Message message : received) {
                UDPComm.this.receive(message);
            }
            LOG.trace("BroadcastPeer CALL DONE");
            return null;
        }

        /**
         * Moves messages from the overflow slot and the queue into the outgoing
         * packet.
         * <p>
         * A message is only added once every pending node peer is in broadcast
         * mode and already holds the message; otherwise it stays in the overflow
         * slot for the next round. After adding a message, the method waits up
         * to {@code minDelayNanos} for another one, as long as doing so keeps
         * the total time since {@code start} within {@code maxDelayNanos}.
         *
         * @param start time (nanoTime) at which the current round started
         * @throws InterruptedException if interrupted while polling the queue
         * @throws RuntimeException     if a message exceeds the maximum packet size
         */
        private void handleQueue(long start) throws InterruptedException {
            Message next = this.overflow;
            this.overflow = null;
            if (next == null) {
                next = (Message)this.queue.poll();
            }
            block3: while (next != null) {
                // Park the message in the overflow slot until it is actually consumed.
                this.overflow = next;
                if (next.size() > UDPComm.this.maxPacketSize) {
                    // NOTE(review): the oversized message stays in the overflow slot, so later rounds hit it again - confirm intended.
                    LOG.error("Message {} is larger than the maximum packet size {}", (Object)next, (Object)UDPComm.this.maxPacketSize);
                    throw new RuntimeException("Message is larger than maxPacketSize");
                }
                if (this.sentPacket != null && next.size() + this.sentPacket.sizeInBytes() > UDPComm.this.maxPacketSize) break;
                LOG.debug("Waiting for peers to enter broadcast mode for message {}", (Object)next);
                BroadcastEntry entry = (BroadcastEntry)this.broadcasts.get(next.getMessageId());
                if (entry != null && entry.nodes.isEmpty()) {
                    this.broadcasts.remove(next.getMessageId());
                    if (next instanceof Message.LineMessage) {
                        LOG.debug("No other nodes in cluster. Responding with NOT_FOUND to message {}", (Object)next);
                        UDPComm.this.receive(Message.NOT_FOUND((Message.LineMessage)next).setIncoming());
                    }
                    entry = null;
                }
                if (entry != null) {
                    // Same TShortIterator idiom as receivedResponse and releasePeers.
                    TShortIterator it = entry.nodes.iterator();
                    while (it.hasNext()) {
                        NodePeer peer = (NodePeer)UDPComm.this.peers.get(it.next());
                        synchronized (peer) {
                            if (!peer.isBroadcast() || !peer.sentPacket.contains(next.getMessageId())) {
                                LOG.trace("Waiting for peer {}.", (Object)peer);
                                break block3;
                            }
                            LOG.trace("Peer {} ok (broadcast {})", (Object)peer, (Object)next);
                        }
                    }
                    LOG.debug("Adding message {} to sent-packet", (Object)next);
                    if (this.sentPacket == null) {
                        this.sentPacket = new MessagePacket();
                    }
                    this.sentPacket.addMessage(next);
                    this.forceResend();
                }
                this.overflow = null;
                long now = System.nanoTime();
                // Stop batching once waiting another minDelay would exceed the maxDelay budget.
                // (The comparison used to be inverted: it stopped while budget remained.)
                if (now - start + UDPComm.this.minDelayNanos > UDPComm.this.maxDelayNanos) break;
                next = (Message)this.queue.poll(UDPComm.this.minDelayNanos, TimeUnit.NANOSECONDS);
            }
        }

        /**
         * Expires broadcasts older than the comm's timeout. INV messages never
         * expire here. For an expired line message a TIMEOUT reply is appended
         * to {@code received}.
         *
         * @param now      current time (nanoTime)
         * @param received list to which synthesised replies are appended
         */
        private void handleTimeout(long now, List<Message> received) {
            if (this.broadcasts.isEmpty()) {
                return;
            }
            long timeoutNanos = TimeUnit.NANOSECONDS.convert(UDPComm.this.getTimeout(), TimeUnit.MILLISECONDS);
            Iterator it = this.broadcasts.values().iterator();
            while (it.hasNext()) {
                BroadcastEntry entry = (BroadcastEntry)it.next();
                Message message = entry.message;
                if (message.getType() == Message.Type.INV || now - message.getTimestamp() <= timeoutNanos) continue;
                if (message instanceof Message.LineMessage) {
                    LOG.debug("Timeout on message {}", (Object)message);
                    received.add(Message.TIMEOUT((Message.LineMessage)message).setIncoming());
                }
                it.remove();
                // -1 matches no node, so every pending peer is marked as timed out.
                this.releasePeers(entry, (short)-1);
                this.addTimeout(message);
                if (this.sentPacket == null) continue;
                this.sentPacket.removeMessage(message.getMessageId());
            }
            if (this.sentPacket != null && this.sentPacket.isEmpty()) {
                this.sentPacket = null;
            }
            this.cleanupTimeouts(now);
        }

        /**
         * Processes a reply to one of the outstanding broadcasts.
         * <p>
         * A non-ACK reply completes the broadcast and releases the remaining
         * peers. An ACK removes the sender from the pending set; when it was the
         * last one, a NOT_FOUND is synthesised for line messages. When the
         * pending set has just dropped below {@code minimumNodesToMulticast},
         * the remaining peers are switched to unicasting the message.
         *
         * @param message  the reply
         * @param received list to which synthesised replies are appended
         */
        public void receivedResponse(Message message, List<Message> received) {
            BroadcastEntry entry = (BroadcastEntry)this.broadcasts.get(message.getMessageId());
            if (entry == null) {
                return;
            }
            BroadcastPeer broadcastPeer = this;
            synchronized (broadcastPeer) {
                boolean done = entry.removeNode(message.getNode());
                if (message.getType() != Message.Type.ACK) {
                    LOG.debug("Message {} is a reply to a broadcast! (discarding pending)", (Object)message);
                    if (!done) {
                        this.releasePeers(entry, message.getNode());
                    }
                    done = true;
                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Got ACK from {} to message {}", (Object)message.getNode(), (Object)entry.message);
                    }
                    int numNodes = entry.nodes.size();
                    if (done) {
                        if (entry.message instanceof Message.LineMessage) {
                            LOG.debug("Got all ACKs for message {}, but no response - sending NOT_FOUND to cache!", (Object)entry.message);
                            received.add(Message.NOT_FOUND((Message.LineMessage)entry.message).setIncoming());
                        }
                    } else if (numNodes < UDPComm.this.minimumNodesToMulticast && numNodes + 1 >= UDPComm.this.minimumNodesToMulticast) {
                        // The pending set just crossed below the multicast threshold: hand over to unicast.
                        if (this.sentPacket != null) {
                            this.sentPacket.removeMessage(message.getMessageId());
                        }
                        long now = System.nanoTime();
                        long sinceLastSent = now - this.getLastSent();
                        long delay = UDPComm.this.resendPeriodNanos - sinceLastSent;
                        delay = delay >= 0L ? delay : 0L;
                        TShortIterator it = entry.nodes.iterator();
                        while (it.hasNext()) {
                            NodePeer peer = (NodePeer)UDPComm.this.peers.get(it.next());
                            if (!peer.isBroadcast()) continue;
                            peer.unicastBroadcast();
                            peer.forceResend();
                            peer.resendIn(now, delay);
                            UDPComm.this.executor.submit(peer);
                        }
                    }
                }
                if (done) {
                    if (this.sentPacket != null) {
                        this.sentPacket.removeMessage(message.getMessageId());
                    }
                    this.broadcasts.remove(message.getMessageId());
                }
                if (this.sentPacket != null && this.sentPacket.isEmpty()) {
                    this.sentPacket = null;
                }
            }
        }

        /**
         * Takes every pending peer of a broadcast out of broadcast mode and
         * reschedules it. All peers except {@code node} also have the message
         * marked as timed out.
         *
         * @param entry the broadcast being released
         * @param node  node that must not be marked as timed out
         */
        private void releasePeers(BroadcastEntry entry, short node) {
            Message message = entry.message;
            TShortIterator it = entry.nodes.iterator();
            while (it.hasNext()) {
                NodePeer peer = (NodePeer)UDPComm.this.peers.get(it.next());
                if (!peer.isBroadcast()) continue;
                LOG.debug("Broadcast releasing peer {} for message {}", (Object)peer, (Object)message);
                if (peer.node != node) {
                    LOG.debug("Broadcast marking message {} as timeout for peer {}", (Object)message, (Object)peer);
                    peer.markAsTimeout(message);
                }
                peer.unicastBroadcast();
                UDPComm.this.executor.submit(peer);
            }
        }

        /**
         * Removes a node from every outstanding broadcast. A line-message
         * broadcast left with no pending nodes is answered with NOT_FOUND and
         * dropped.
         *
         * @param node id of the node that left
         */
        public void removeNode(short node) {
            BroadcastPeer broadcastPeer = this;
            synchronized (broadcastPeer) {
                // Iterate the values view (as handleTimeout does); removing through it drops the map entry.
                Iterator it = this.broadcasts.values().iterator();
                while (it.hasNext()) {
                    BroadcastEntry entry = (BroadcastEntry)it.next();
                    if (!entry.removeNode(node) || !(entry.message instanceof Message.LineMessage)) continue;
                    LOG.debug("Got all ACKs for message {}, but no response - sending NOT_FOUND to cache!", (Object)entry.message);
                    UDPComm.this.receive(Message.NOT_FOUND((Message.LineMessage)entry.message).setIncoming());
                    it.remove();
                }
            }
        }
    }

    class NodePeer
    extends Peer {
        /** Id of the remote node this peer talks to. */
        public final short node;
        /** Set by removed(). */
        private volatile boolean removed;
        /** Address of the remote node; assigned in setAddress(). */
        private InetSocketAddress nodeAddress;
        /** Initialised to false by the constructor. NOTE(review): consumer is outside this view. */
        private boolean hasRequests;
        /** Initialised to true by the constructor. NOTE(review): consumer is outside this view. */
        private boolean requestsOnly;
        /** Whether this peer is in broadcast mode; read by isBroadcast(), cleared by unicastBroadcast(). */
        private volatile boolean broadcast;
        /** NOTE(review): presumably ids of requests awaiting a reply; its use is outside this view. */
        private final TLongHashSet pendingRequests;
        /** Concurrent set of messages. NOTE(review): its use is outside this view. */
        private final Set<Message> unicastBroadcasts;
        /** Reset to 0 whenever the address changes (see setAddress). */
        private long lastReceivedBroadcastId;

        /**
         * Creates a peer for the given node with empty request and broadcast
         * sets.
         *
         * @param node id of the remote node
         */
        public NodePeer(short node) {
            this.node = node;
            removed = false;
            hasRequests = false;
            requestsOnly = true;
            pendingRequests = new TLongHashSet();
            unicastBroadcasts = Collections.newSetFromMap(new ConcurrentHashMap<Message, Boolean>());
        }

        public synchronized void setAddress(InetSocketAddress nodeAddress) {
            LOG.info("Node peer {} set address to {}", (Object)this, (Object)nodeAddress);
            this.nodeAddress = nodeAddress;
            this.lastReceivedBroadcastId = 0L;
            if (this.sentPacket != null) {
                Iterator<Message> it = this.sentPacket.iterator();
                while (it.hasNext()) {
                    Message message = it.next();
                    if (!message.isResponse()) continue;
                    LOG.debug("Peer {} removing response {} because of node switch.", (Object)this, (Object)message);
                    it.remove();
                }
            }
            this.forceResend();
        }

        public synchronized String toString() {
            return "NodePeer{node=" + this.node + ", nodeAddress=" + this.nodeAddress + ", lastSent=" + this.getLastSent() + ", sentPacket=" + this.sentPacket + ", pendingRequests=" + this.pendingRequests + ", next=" + this.overflow + ", queue=" + this.queue + ", broadcast=" + this.broadcast + '}';
        }

        public boolean isBroadcast() {
            return this.broadcast;
        }

        public void unicastBroadcast() {
            assert (this.broadcast);
            LOG.debug("Node peer {} is asked to unicast broadcast.", (Object)this);
            this.broadcast = false;
        }

        public void removed() {
            this.removed = true;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void sendMessage(Message message) throws InterruptedException {
            ArrayBlockingQueue arrayBlockingQueue = this.queue;
            synchronized (arrayBlockingQueue) {
                UDPComm.this.assignMessageId(message);
                super.sendMessage(message);
            }
        }

        public void sendMessage(Message message, boolean unicastBroadcast) throws InterruptedException {
            if (unicastBroadcast && message.isBroadcast()) {
                this.unicastBroadcasts.add(message);
            }
            this.sendMessage(message);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void receivePacket(MessagePacket packet) throws InterruptedException {
            ArrayList<Message> received = new ArrayList<Message>(packet.numMessages());
            ArrayList<Message> broadcastResponses = new ArrayList<Message>(packet.numMessages());
            NodePeer nodePeer = this;
            synchronized (nodePeer) {
                this.handleReceived(packet, received, broadcastResponses);
            }
            for (Message message : broadcastResponses) {
                UDPComm.this.broadcastPeer.receivedResponse(message, received);
            }
            recursive.set(Boolean.TRUE);
            try {
                for (Message message : received) {
                    LOG.debug("Passing received message {} to cache", (Object)message);
                    UDPComm.this.receive(message);
                }
            }
            finally {
                recursive.remove();
            }
            this.call();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public Void call() throws InterruptedException {
            if (recursive.get() == Boolean.TRUE) {
                return null;
            }
            recursive.set(Boolean.TRUE);
            try {
                if (this.removed || UDPComm.this.getCluster().getMaster(this.node) == null) {
                    LOG.debug("Node removed from the cluster so returning from peer {}", (Object)this);
                    Void void_ = null;
                    return void_;
                }
                ArrayList<Message> received = new ArrayList<Message>();
                NodePeer nodePeer = this;
                synchronized (nodePeer) {
                    LOG.trace("Peer {} CALL", (Object)this);
                    long now = System.nanoTime();
                    this.handleTimeout(now, received);
                    this.handleQueue(now);
                    if (this.sentPacket != null && this.sentPacket.isEmpty()) {
                        this.sentPacket = null;
                    }
                    if (this.sentPacket != null && !this.broadcast && this.isTimeToResned(now)) {
                        LOG.debug("Peer {} sending packet {}", (Object)this, (Object)this.sentPacket);
                        UDPComm.this.channel.write((Object)this.sentPacket, (SocketAddress)this.nodeAddress);
                        if (this.hasRequests) {
                            this.resend(now);
                        }
                    }
                }
                for (Message message : received) {
                    UDPComm.this.receive(message);
                }
                LOG.trace("Peer {} CALL DONE", (Object)this);
                nodePeer = null;
                return nodePeer;
            }
            finally {
                recursive.remove();
            }
        }

        private void handleReceived(MessagePacket receivedPacket, List<Message> received, List<Message> broadcastResponses) {
            if (receivedPacket == null) {
                return;
            }
            LOG.debug("Peer {} has received packet {}", (Object)this, (Object)receivedPacket);
            boolean oobMulticast = false;
            if (receivedPacket.isMulticast()) {
                long maxIdInPacket = -1L;
                for (Message message : receivedPacket) {
                    maxIdInPacket = Math.max(maxIdInPacket, message.getMessageId());
                }
                if (maxIdInPacket < this.lastReceivedBroadcastId) {
                    LOG.debug("Peer {} received an out-of-band multicast packet {} which has already been seen.", (Object)this, (Object)receivedPacket);
                    oobMulticast = true;
                }
            }
            if (receivedPacket.isEmpty()) {
                return;
            }
            if (!oobMulticast && this.sentPacket != null) {
                Iterator<Message> it = this.sentPacket.iterator();
                while (it.hasNext()) {
                    Message message = it.next();
                    if (!message.isResponse() || receivedPacket.contains(message)) continue;
                    LOG.debug("Peer {} removing response {} from sent packet because it was no longer asked for.", (Object)this, (Object)message);
                    it.remove();
                }
            }
            for (Message message : receivedPacket) {
                message.setTimestamp(receivedPacket.getTimestamp());
                if (message.isBroadcast() && message.getMessageId() > this.lastReceivedBroadcastId) {
                    this.lastReceivedBroadcastId = message.getMessageId();
                }
                if (message.isResponse()) {
                    Message request;
                    Message message2 = request = this.sentPacket != null ? this.sentPacket.getMessage(message) : null;
                    if (!(request != null || this.isTimeout(message) || this.broadcast && UDPComm.this.broadcastPeer.isTimeout(message))) {
                        LOG.debug("Peer {} ignoring repeat response {}", (Object)this, (Object)message);
                        continue;
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Peer {} received response {} for request ({})", new Object[]{this, message, request != null ? request : "TIMEOUT"});
                    }
                    if (request != null) {
                        if (request.isBroadcast()) {
                            broadcastResponses.add(message);
                        }
                        this.sentPacket.removeMessage(message);
                    }
                } else {
                    if (this.sentPacket != null && this.sentPacket.contains(message)) {
                        LOG.debug("Peer {} already has a response for message {}", (Object)this, (Object)message);
                        continue;
                    }
                    if (this.pendingRequests.contains(message.getMessageId())) {
                        LOG.debug("Peer {} already has a request pending for message {}", (Object)this, (Object)message);
                        continue;
                    }
                    this.pendingRequests.add(message.getMessageId());
                }
                if (message.getType() == Message.Type.ACK) continue;
                received.add(message);
                if (message.isResponse() || message.isReplyRequired() || this.queue.offer(Message.ACK(message))) continue;
                LOG.error("Queue capacity for perr {} exceeded", (Object)this);
                throw new RuntimeException("Peer queue full!");
            }
            if (this.sentPacket != null) {
                this.forceResend();
                if (this.sentPacket.isEmpty()) {
                    this.sentPacket = null;
                    this.broadcast = false;
                    this.hasRequests = false;
                    this.requestsOnly = true;
                } else {
                    boolean _hasRequests = false;
                    boolean _requestsOnly = true;
                    boolean _broadcast = true;
                    for (Message message : this.sentPacket) {
                        if (message.isResponse()) {
                            _requestsOnly = false;
                        } else {
                            _hasRequests = true;
                        }
                        if (message.isBroadcast()) continue;
                        _broadcast = false;
                    }
                    this.hasRequests = _hasRequests;
                    this.requestsOnly = _requestsOnly;
                    if (!this.broadcast && _broadcast) {
                        LOG.trace("Peer {} notifying broadcast.", (Object)this);
                        UDPComm.this.executor.submit(UDPComm.this.broadcastPeer);
                    }
                    this.broadcast = _broadcast;
                }
            }
        }

        private void handleTimeout(long now, List<Message> received) {
            Message message;
            if (this.broadcast || this.sentPacket == null || this.sentPacket.isEmpty()) {
                return;
            }
            long timeoutNanos = TimeUnit.NANOSECONDS.convert(UDPComm.this.getTimeout(), TimeUnit.MILLISECONDS);
            Iterator<Message> it = this.sentPacket.reverseIterator();
            while (it.hasNext() && (message = it.next()).getType() != Message.Type.INV && now - message.getTimestamp() > timeoutNanos) {
                if (message.isResponse() || message.isBroadcast()) continue;
                if (message instanceof Message.LineMessage) {
                    LOG.debug("Timeout on message {}", (Object)message);
                    received.add(Message.TIMEOUT((Message.LineMessage)message).setIncoming());
                }
                it.remove();
                this.addTimeout(message);
            }
            if (this.sentPacket.isEmpty()) {
                this.sentPacket = null;
                this.broadcast = false;
                this.hasRequests = false;
                this.requestsOnly = true;
            }
            this.cleanupTimeouts(now);
        }

        public synchronized void markAsTimeout(Message message) {
            if (this.sentPacket.removeMessage(message.getMessageId())) {
                this.addTimeout(message);
            }
        }

        private synchronized void handleQueue(long start) throws InterruptedException {
            Message next = this.overflow;
            this.overflow = null;
            if (next == null) {
                next = (Message)this.queue.poll();
            }
            while (true) {
                long now;
                boolean unicastBroadcast;
                LOG.trace("handleQueue loop");
                if (next == null) {
                    LOG.trace("handleQueue loop: next == null");
                    break;
                }
                this.overflow = next;
                boolean bl = unicastBroadcast = next.isBroadcast() && this.unicastBroadcasts.remove(next);
                if (this.broadcast && (!next.isBroadcast() || unicastBroadcast)) {
                    LOG.trace("Node peer {} not taking non-broadcast message {} during broadcast", (Object)this, (Object)next);
                    break;
                }
                if (!this.broadcast && next.isBroadcast() && !unicastBroadcast && (this.sentPacket == null || this.sentPacket.isEmpty())) {
                    LOG.debug("Node peer {} going into broadcast mode for message {}.", (Object)this, (Object)next);
                    this.broadcast = true;
                }
                if (next.size() > UDPComm.this.maxPacketSize) {
                    LOG.error("Message {} is larger than the maximum packet size {}", (Object)next, (Object)UDPComm.this.maxPacketSize);
                    throw new RuntimeException("Message is larger than maxPacketSize");
                }
                if (next.size() + this.sentPacketSizeInBytes() > UDPComm.this.maxPacketSize) {
                    if (next.isResponse() && this.requestsOnly) {
                        LOG.warn("IMPORTANT: Response message {} does not fit in packet {} which contains only requests. THIS MAY CAUSE A DEADLOCK!", (Object)next, (Object)this.sentPacket);
                    }
                    LOG.debug("Message {} cannot be added to packet now; packet full.");
                    break;
                }
                if (!next.isResponse()) {
                    if (this.requestsOnly && next.size() + this.sentPacketSizeInBytes() > UDPComm.this.maxRequestOnlyPacketSize && this.sentPacketSizeInBytes() > 0) {
                        LOG.debug("NOT Sending requests only {}. can't add to packet {} bytes long.", (Object)next, (Object)this.sentPacketSizeInBytes());
                        break;
                    }
                    this.hasRequests = true;
                } else {
                    this.requestsOnly = false;
                }
                if (next.isResponse()) {
                    this.pendingRequests.remove(next.getMessageId());
                }
                LOG.debug("Adding message {} to sent-packet", (Object)next);
                if (this.sentPacket == null) {
                    this.sentPacket = new MessagePacket();
                }
                this.sentPacket.addMessage(next);
                this.forceResend();
                this.overflow = null;
                if (this.broadcast) {
                    LOG.trace("Peer {} notifying broadcast.", (Object)this);
                    UDPComm.this.executor.submit(UDPComm.this.broadcastPeer);
                }
                if ((now = System.nanoTime()) - start + UDPComm.this.minDelayNanos > UDPComm.this.maxDelayNanos) break;
                next = (Message)this.queue.poll(UDPComm.this.minDelayNanos, TimeUnit.NANOSECONDS);
            }
        }

        private int sentPacketSizeInBytes() {
            return this.sentPacket != null ? this.sentPacket.sizeInBytes() : 0;
        }
    }

    /**
     * Base class for peers: owns the outgoing message queue, the packet currently being
     * sent, the resend timing state and the set of messages recorded as timed out.
     *
     * <p>A peer is a {@link Callable} so that it can be submitted to, and rescheduled on,
     * {@code UDPComm.this.executor}.
     */
    abstract class Peer
    implements Callable<Void> {
        /** Outgoing messages waiting to be placed into {@code sentPacket}; bounded by {@code maxQueueSize}. */
        protected final ArrayBlockingQueue<Message> queue;
        /** A message taken from the queue that could not yet be added to {@code sentPacket}. */
        protected Message overflow;
        /** The packet currently being (re)sent, or {@code null} when there is nothing to send. */
        protected MessagePacket sentPacket;
        /** Shift applied to {@code resendPeriodNanos} in {@link #resend(long)}. */
        private int delayMultiplier;
        /** {@code nanoTime} recorded by the last successful {@link #isTimeToResned(long)}; 0 after {@link #forceResend()}. */
        private long lastSent;
        /** {@code nanoTime} after which the next send is due. */
        private long nextSend;
        /** Messages recorded as timed out; consulted (and consumed) by {@link #isTimeout(Message)}. */
        private final Set<Message> timeouts;
        /** {@code nanoTime} of the last {@link #cleanupTimeouts(long)} sweep. */
        private long lastTimeoutsCleanup;

        Peer() {
            this.queue = new ArrayBlockingQueue<Message>(UDPComm.this.maxQueueSize);
            // Starts at 1, whereas forceResend() resets it to 0.
            this.delayMultiplier = 1;
            this.timeouts = Collections.newSetFromMap(new ConcurrentHashMap<Message, Boolean>());
        }

        /**
         * Enqueues a message for sending. If the queue is full, logs the condition and
         * blocks until space becomes available.
         *
         * @param message the message to enqueue
         * @throws InterruptedException if interrupted while waiting for queue space
         */
        public void sendMessage(Message message) throws InterruptedException {
            if (!this.queue.offer(message)) {
                LOG.info("Adding message {} to full queue. Waiting for available space.", (Object)message);
                LOG.debug("no space in Peer {}", (Object)this);
                this.queue.put(message);
            }
        }

        /** @return the number of messages currently waiting in the queue */
        public int getQueueLength() {
            return this.queue.size();
        }

        /** Resets the resend timing state so that the next {@link #isTimeToResned(long)} check succeeds. */
        protected void forceResend() {
            this.lastSent = 0L;
            this.nextSend = 0L;
            this.delayMultiplier = 0;
        }

        /**
         * Checks whether a send is due and, if so, claims it.
         *
         * <p>On success {@code nextSend} is set to {@code Long.MAX_VALUE}, so further
         * checks fail until {@link #resendIn(long, long)} or {@link #forceResend()}
         * assigns a new value.
         *
         * @param now current {@link System#nanoTime()} value
         * @return {@code true} if the caller should send now
         */
        protected boolean isTimeToResned(long now) {
            if (now > this.nextSend) {
                this.nextSend = Long.MAX_VALUE;
                this.lastSent = now;
                return true;
            }
            return false;
        }

        /**
         * Schedules this peer to run again after {@code delay} nanoseconds and records the
         * corresponding due time.
         *
         * @param now   current {@link System#nanoTime()} value
         * @param delay delay in nanoseconds
         */
        protected void resendIn(long now, long delay) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Peer {} rescheduling in {}", (Object)this, (Object)delay);
            }
            this.nextSend = now + delay;
            UDPComm.this.executor.schedule(this, delay, TimeUnit.NANOSECONDS);
        }

        /**
         * Schedules the next resend. The delay is {@code resendPeriodNanos} shifted left
         * by the current multiplier; the multiplier grows by one per call when
         * exponential backoff is enabled, and the delay is randomized when jitter is
         * enabled.
         *
         * @param now current {@link System#nanoTime()} value
         */
        protected void resend(long now) {
            // NOTE(review): the shift is not capped, so a long run of resends without an
            // intervening forceResend() could overflow the delay - confirm this cannot
            // happen before the message timeout kicks in.
            long delay = UDPComm.this.resendPeriodNanos << this.delayMultiplier;
            if (UDPComm.this.exponentialBackoff) {
                ++this.delayMultiplier;
            }
            if (UDPComm.this.jitter) {
                delay = UDPComm.randInterval(delay);
            }
            this.resendIn(now, delay);
        }

        /** @return the value recorded by the last successful send check, or 0 after a forced resend */
        protected long getLastSent() {
            return this.lastSent;
        }

        /** Records {@code message} as timed out. */
        protected void addTimeout(Message message) {
            this.timeouts.add(message);
        }

        /**
         * Tests whether {@code response} matches a message recorded as timed out.
         * A matching record is removed, so the check succeeds at most once per record.
         *
         * @param response the response to look up
         * @return {@code true} if a matching record existed
         */
        protected boolean isTimeout(Message response) {
            return this.timeouts.remove(response);
        }

        /**
         * Discards timed-out records whose timestamp is at least 10 seconds old. The
         * sweep itself runs at most once every 10 seconds.
         *
         * @param now current {@link System#nanoTime()} value
         */
        protected synchronized void cleanupTimeouts(long now) {
            final long maxAgeNanos = TimeUnit.NANOSECONDS.convert(10L, TimeUnit.SECONDS);
            if (now - this.lastTimeoutsCleanup >= maxAgeNanos) {
                Iterator<Message> it = this.timeouts.iterator();
                while (it.hasNext()) {
                    if (now - it.next().getTimestamp() < maxAgeNanos) {
                        continue;
                    }
                    it.remove();
                }
                this.lastTimeoutsCleanup = now;
            }
        }
    }
}

