/*
 * Decompiled with CFR 0.152.
 */
package co.paralleluniverse.galaxy.jgroups;

import co.paralleluniverse.common.collection.ConcurrentMultimap;
import co.paralleluniverse.common.collection.Util;
import co.paralleluniverse.galaxy.Cluster;
import co.paralleluniverse.galaxy.core.AbstractComm;
import co.paralleluniverse.galaxy.core.Comm;
import co.paralleluniverse.galaxy.core.Message;
import co.paralleluniverse.galaxy.core.MessageReceiver;
import co.paralleluniverse.galaxy.core.NodeNotFoundException;
import co.paralleluniverse.galaxy.jgroups.Channel;
import co.paralleluniverse.galaxy.jgroups.JGroupsCluster;
import co.paralleluniverse.galaxy.jgroups.JGroupsNodeAddressResolver;
import gnu.trove.set.hash.TShortHashSet;
import java.beans.ConstructorProperties;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.jgroups.Address;
import org.jgroups.Message;
import org.jgroups.Receiver;
import org.jgroups.ReceiverAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class JGroupsComm
extends AbstractComm<Address> {
    private static final Logger LOG = LoggerFactory.getLogger(JGroupsComm.class);
    private final Channel channel;
    private final Comm serverComm;
    private final ConcurrentMultimap<Short, co.paralleluniverse.galaxy.core.Message, Deque<co.paralleluniverse.galaxy.core.Message>> pendingReply = new ConcurrentMultimap<Short, co.paralleluniverse.galaxy.core.Message, Deque<co.paralleluniverse.galaxy.core.Message>>(new ArrayDeque(0)){

        @Override
        protected Deque<co.paralleluniverse.galaxy.core.Message> allocateElement() {
            return new ConcurrentLinkedDeque<co.paralleluniverse.galaxy.core.Message>();
        }
    };
    private ConcurrentMap<Long, BroadcastEntry> pendingBroadcasts;

    @ConstructorProperties(value={"name", "cluster", "serverComm"})
    public JGroupsComm(String name, Cluster cluster, Comm serverComm) {
        super(name, cluster, new JGroupsNodeAddressResolver(cluster));
        this.channel = this.getCluster().getDataChannel();
        this.serverComm = serverComm;
        this.channel.setReceiver((Receiver)new ReceiverAdapter(){

            public void receive(Message msg) {
                JGroupsComm.this.receive(msg);
            }
        });
        this.sendToServerInsteadOfMulticast = serverComm != null;
    }

    @Override
    public void setReceiver(MessageReceiver receiver) {
        super.setReceiver(receiver);
        if (this.serverComm != null) {
            this.serverComm.setReceiver(receiver);
        }
    }

    @Override
    public void init() throws Exception {
        super.init();
        if (this.sendToServerInsteadOfMulticast && this.serverComm == null) {
            throw new RuntimeException("sendToServerInsteadOfBroadcast is set to true but no serverComm set");
        }
    }

    @Override
    public void postInit() throws Exception {
        if (!this.sendToServerInsteadOfMulticast) {
            this.pendingBroadcasts = new ConcurrentHashMap<Long, BroadcastEntry>();
        }
        super.postInit();
    }

    @Override
    protected void start(boolean master) {
        final long timeoutNano = TimeUnit.NANOSECONDS.convert(this.getTimeout(), TimeUnit.MILLISECONDS);
        this.getScheduler().scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                long now = System.nanoTime();
                if (JGroupsComm.this.pendingBroadcasts != null) {
                    for (BroadcastEntry entry : JGroupsComm.this.pendingBroadcasts.values()) {
                        Message.LineMessage message = entry.message;
                        if (message.getType() == Message.Type.INVACK || now - message.getTimestamp() <= timeoutNano || JGroupsComm.this.pendingBroadcasts.remove(message.getMessageId()) == null) continue;
                        LOG.debug("Timeout on message {}", (Object)message);
                        JGroupsComm.this.receive(co.paralleluniverse.galaxy.core.Message.TIMEOUT(message).setIncoming());
                    }
                }
                for (Deque pending : JGroupsComm.this.pendingReply.values()) {
                    co.paralleluniverse.galaxy.core.Message message;
                    Iterator i$ = Util.reverse(pending).iterator();
                    while (i$.hasNext() && (message = (co.paralleluniverse.galaxy.core.Message)i$.next()).getType() != Message.Type.INVACK && now - message.getTimestamp() > timeoutNano) {
                        if (!pending.removeLastOccurrence(message)) continue;
                        LOG.debug("Timeout on message {}", (Object)message);
                        JGroupsComm.this.receive(co.paralleluniverse.galaxy.core.Message.TIMEOUT((Message.LineMessage)message).setIncoming());
                    }
                }
                if (JGroupsComm.this.hasPendingBroadcasts()) {
                    try {
                        JGroupsComm.this.channel.send(new Message(null, new byte[0]));
                    }
                    catch (Exception ex) {
                        LOG.error("Error while broadcasting flush.", (Throwable)ex);
                    }
                }
            }
        }, 0L, this.getTimeout() / 2L, TimeUnit.MILLISECONDS);
        this.setReady(true);
    }

    @Override
    public final JGroupsCluster getCluster() {
        return (JGroupsCluster)super.getCluster();
    }

    protected boolean hasPendingBroadcasts() {
        return !this.pendingBroadcasts.isEmpty();
    }

    protected boolean addToPending(co.paralleluniverse.galaxy.core.Message message, short node) {
        if (!message.getType().isOf(Message.Type.REQUIRES_RESPONSE)) {
            LOG.debug("Message {} does not require a response.", (Object)message);
            return true;
        }
        if (node >= 0) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Enqueing message in pending-replies {}", (Object)message);
            }
            ((Deque)this.pendingReply.getOrAllocate(node)).addFirst(message);
        } else {
            assert (message.isBroadcast());
            assert (message instanceof Message.LineMessage);
            Set<Short> nodes = this.getCluster().getNodes();
            if (message instanceof Message.LineMessage) {
                if (nodes.isEmpty() || nodes.size() == 1 && nodes.contains((short)0)) {
                    LOG.debug("No other nodes in cluster. Responding with NOT_FOUND to message {}", (Object)message);
                    this.receive(co.paralleluniverse.galaxy.core.Message.NOT_FOUND((Message.LineMessage)message).setIncoming());
                    return false;
                }
                this.pendingBroadcasts.put(message.getMessageId(), new BroadcastEntry((Message.LineMessage)message, nodes));
                return true;
            }
        }
        return true;
    }

    @Override
    protected void sendToNode(co.paralleluniverse.galaxy.core.Message message, short node, Address address) {
        this.assignMessageId(message);
        this.addToPending(message, node);
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sending to node {} ({}): {}", new Object[]{node, address, message});
            }
            this.channel.send(new Message(address, message.toByteArray()));
        }
        catch (Exception ex) {
            LOG.error("Error while sending message " + message + " to node " + node, (Throwable)ex);
        }
    }

    @Override
    protected void sendToServer(co.paralleluniverse.galaxy.core.Message message) {
        super.sendToServer(message);
        try {
            this.serverComm.send(message);
        }
        catch (NodeNotFoundException e) {
            throw new RuntimeException("Server not found!", e);
        }
    }

    @Override
    protected void broadcast(co.paralleluniverse.galaxy.core.Message message) {
        this.assignMessageId(message);
        if (this.addToPending(message, (short)-1)) {
            this.broadcast(message);
        }
        try {
            LOG.debug("Broadcasting (null): {}", (Object)message);
            this.channel.send(new Message(null, message.toByteArray()));
        }
        catch (Exception ex) {
            LOG.error("Error while broadcasting message " + message, (Throwable)ex);
        }
    }

    private void receive(Message msg) {
        try {
            short sourceNode;
            BroadcastEntry entry;
            boolean res;
            Deque pending;
            LOG.debug("Received {}", (Object)msg);
            if (this.getCluster().getMyAddress() != null && msg.getSrc() != null && this.getCluster().getMyAddress().equals(msg.getSrc())) {
                return;
            }
            byte[] buffer = msg.getRawBuffer();
            if (buffer.length == 0) {
                return;
            }
            co.paralleluniverse.galaxy.core.Message message = co.paralleluniverse.galaxy.core.Message.fromByteArray(buffer);
            Address source = msg.getSrc();
            if (message.isResponse() && (pending = (Deque)this.pendingReply.get(message.getNode())) != null && (res = pending.removeLastOccurrence(message))) {
                LOG.debug("Message {} is a reply! (removing from pending)", (Object)message);
            }
            if (message.isResponse() && (entry = (BroadcastEntry)this.pendingBroadcasts.get(message.getMessageId())) != null) {
                if (message.getType() != Message.Type.ACK) {
                    LOG.debug("Message {} is a reply to a broadcast! (discarding pending)", (Object)message);
                    this.pendingBroadcasts.remove(message.getMessageId());
                } else {
                    this.removeFromPendingBroadcasts(message.getMessageId(), message.getNode());
                }
            }
            if ((sourceNode = this.getNode(source)) < 0) {
                throw new RuntimeException("Node not found for source address " + source);
            }
            message.setNode(sourceNode);
            this.receive(message);
        }
        catch (Exception ex) {
            LOG.error("Error receiving message", (Throwable)ex);
        }
    }

    @Override
    public void nodeAdded(short id) {
        super.nodeAdded(id);
        try {
            for (co.paralleluniverse.galaxy.core.Message message : Util.reverse((Deque)this.pendingReply.get(id))) {
                this.sendToNode(message, id);
            }
        }
        catch (NodeNotFoundException e) {
            throw new AssertionError();
        }
    }

    @Override
    public void nodeSwitched(short id) {
        super.nodeSwitched(id);
        try {
            for (co.paralleluniverse.galaxy.core.Message message : Util.reverse((Deque)this.pendingReply.get(id))) {
                this.sendToNode(message, id);
            }
            for (BroadcastEntry entry : this.pendingBroadcasts.values()) {
                this.sendToNode(entry.message, id);
            }
        }
        catch (NodeNotFoundException e) {
            throw new AssertionError();
        }
    }

    @Override
    public void nodeRemoved(short id) {
        super.nodeRemoved(id);
        this.pendingReply.remove(id);
        for (Long messageId : this.pendingBroadcasts.keySet()) {
            this.removeFromPendingBroadcasts(messageId, id);
        }
    }

    private void removeFromPendingBroadcasts(long messageId, short node) {
        BroadcastEntry entry = (BroadcastEntry)this.pendingBroadcasts.get(messageId);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got ACK from {} to message {}", (Object)node, (Object)entry.message);
        }
        if (entry.removeNode(node)) {
            LOG.debug("Got all ACKs for message {}, but no response - sending NOT_FOUND to cache!", (Object)entry.message);
            this.receive(co.paralleluniverse.galaxy.core.Message.NOT_FOUND(entry.message).setIncoming());
            this.pendingBroadcasts.remove(messageId);
        }
    }

    private static class BroadcastEntry {
        final Message.LineMessage message;
        final TShortHashSet nodes;

        public BroadcastEntry(Message.LineMessage message, Set<Short> nodes) {
            this.message = message;
            this.nodes = new TShortHashSet(nodes);
            this.nodes.remove((short)0);
            LOG.debug("Awaiting ACKS for message {} from nodes {}", (Object)message, (Object)this.nodes);
        }

        public synchronized void addNode(short node) {
            this.nodes.add(node);
        }

        public synchronized boolean removeNode(short node) {
            this.nodes.remove(node);
            return this.nodes.isEmpty();
        }
    }
}

