/*
 * Decompiled with CFR 0.152.
 */
package co.paralleluniverse.galaxy.core;

import co.paralleluniverse.common.collection.ConcurrentMultimap;
import co.paralleluniverse.common.concurrent.WithExecutor;
import co.paralleluniverse.common.io.Streamable;
import co.paralleluniverse.common.io.Streamables;
import co.paralleluniverse.common.spring.Component;
import co.paralleluniverse.galaxy.MessageListener;
import co.paralleluniverse.galaxy.Messenger;
import co.paralleluniverse.galaxy.TimeoutException;
import co.paralleluniverse.galaxy.core.Cache;
import co.paralleluniverse.galaxy.core.Message;
import co.paralleluniverse.galaxy.core.MessageReceiver;
import co.paralleluniverse.galaxy.core.NodeOrderedThreadPoolExecutor;
import co.paralleluniverse.galaxy.core.NodeTask;
import co.paralleluniverse.galaxy.core.Op;
import com.google.common.util.concurrent.ListenableFuture;
import java.beans.ConstructorProperties;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.cliffc.high_scale_lib.NonBlockingHashMapLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Messenger} implementation that moves topic-addressed payloads through the {@link Cache}.
 *
 * <p>Outgoing payloads are wrapped in a {@link Msg}, serialized with
 * {@code Streamables.toByteArray} and handed to the cache: {@code cache.send} for node-addressed
 * messages, {@code cache.doOp}/{@code cache.doOpAsync} with {@code Op.Type.SEND} for messages
 * addressed to the owner of a line.
 *
 * <p>Incoming messages arrive through the {@link MessageReceiver} this class registers on the
 * cache in its constructor. Every received message whose type is not {@code MSGACK} is decoded
 * and its listeners are invoked from a {@link NodeTask} submitted to the supplied executor.
 *
 * <p>Topics are either {@code long} values or strings; each kind has its own listener registry.
 */
public class MessengerImpl
extends Component
implements Messenger {
    private static final Logger LOG = LoggerFactory.getLogger(MessengerImpl.class);

    /** Source of the values returned by {@link #createTopic()}. */
    private final AtomicLong topicGenerator = new AtomicLong();
    private final Cache cache;

    /**
     * Listeners registered for {@code long} topics. Per-topic lists are
     * {@link CopyOnWriteArrayList}s; the empty list passed to the constructor is presumably what
     * the multimap hands back for topics without listeners (confirm against ConcurrentMultimap).
     */
    private final ConcurrentMultimap<Long, MessageListener, List<MessageListener>> longTopicListeners = new ConcurrentMultimap<Long, MessageListener, List<MessageListener>>(new NonBlockingHashMapLong(), Collections.EMPTY_LIST){

        @Override
        protected List<MessageListener> allocateElement() {
            return new CopyOnWriteArrayList<MessageListener>();
        }
    };

    /** Listeners registered for string topics; same layout as {@link #longTopicListeners}. */
    private final ConcurrentMultimap<String, MessageListener, List<MessageListener>> stringTopicListeners = new ConcurrentMultimap<String, MessageListener, List<MessageListener>>(new NonBlockingHashMap(), Collections.EMPTY_LIST){

        @Override
        protected List<MessageListener> allocateElement() {
            return new CopyOnWriteArrayList<MessageListener>();
        }
    };

    /** Executor on which listener notification tasks are run. */
    private final NodeOrderedThreadPoolExecutor executor;

    /**
     * Creates the messenger and registers it as the cache's message receiver.
     *
     * @param name       component name, passed to {@link Component}
     * @param cache      cache used to send messages and from which messages are received
     * @param threadPool executor for listener notification; must not be {@code null}
     * @throws RuntimeException if {@code threadPool} is {@code null}
     */
    @ConstructorProperties(value={"name", "cache", "threadPool"})
    MessengerImpl(String name, Cache cache, NodeOrderedThreadPoolExecutor threadPool) {
        super(name);
        this.executor = threadPool;
        if (this.executor == null) {
            throw new RuntimeException("The executor must be set!");
        }
        this.cache = cache;
        cache.setReceiver(new MessageReceiver(){

            @Override
            public void receive(Message message) {
                // Everything except acknowledgements is treated as a payload message.
                if (message.getType() != Message.Type.MSGACK) {
                    MessengerImpl.this.receive((Message.MSG)message);
                }
            }
        });
    }

    /** Returns a new topic id taken from an incrementing counter (first value is 1). */
    @Override
    public long createTopic() {
        return this.topicGenerator.incrementAndGet();
    }

    @Override
    public void addMessageListener(long topic, MessageListener listener) {
        this.longTopicListeners.put(Long.valueOf(topic), listener);
    }

    @Override
    public void removeMessageListener(long topic, MessageListener listener) {
        this.longTopicListeners.remove(topic, listener);
    }

    @Override
    public void addMessageListener(String topic, MessageListener listener) {
        this.stringTopicListeners.put(topic, listener);
    }

    @Override
    public void removeMessageListener(String topic, MessageListener listener) {
        this.stringTopicListeners.remove(topic, listener);
    }

    /**
     * Sends {@code data} to {@code node} under a {@code long} topic.
     *
     * @throws IllegalArgumentException if {@code data} is longer than 65535 bytes
     */
    @Override
    public void send(short node, long topic, byte[] data) {
        this.sendToNode(node, new Msg(topic, null, data));
    }

    /**
     * Sends {@code data} to {@code node} under a string topic.
     *
     * @throws IllegalArgumentException if {@code topic} is {@code null} or {@code data} is
     *                                  longer than 65535 bytes
     */
    @Override
    public void send(short node, String topic, byte[] data) {
        this.sendToNode(node, new Msg(-1L, requireTopic(topic), data));
    }

    /**
     * Serializes {@code data} and sends it to {@code node} under a {@code long} topic.
     *
     * @throws IllegalArgumentException if the serialized data is longer than 65535 bytes
     */
    @Override
    public void send(short node, long topic, Streamable data) {
        this.sendToNode(node, new Msg(topic, null, data));
    }

    /**
     * Serializes {@code data} and sends it to {@code node} under a string topic.
     *
     * @throws IllegalArgumentException if {@code topic} is {@code null} or the serialized data
     *                                  is longer than 65535 bytes
     */
    @Override
    public void send(short node, String topic, Streamable data) {
        this.sendToNode(node, new Msg(-1L, requireTopic(topic), data));
    }

    /**
     * Sends {@code data} under a {@code long} topic to the owner of line {@code ref}.
     *
     * @throws IllegalArgumentException if {@code data} is longer than 65535 bytes
     */
    @Override
    public void sendToOwnerOf(long ref, long topic, byte[] data) throws TimeoutException {
        this.sendToOwnerOf(ref, new Msg(topic, null, data));
    }

    /**
     * Sends {@code data} under a string topic to the owner of line {@code ref}.
     *
     * @throws IllegalArgumentException if {@code topic} is {@code null} or {@code data} is
     *                                  longer than 65535 bytes
     */
    @Override
    public void sendToOwnerOf(long ref, String topic, byte[] data) throws TimeoutException {
        this.sendToOwnerOf(ref, new Msg(-1L, requireTopic(topic), data));
    }

    /**
     * Serializes {@code data} and sends it under a {@code long} topic to the owner of line
     * {@code ref}.
     *
     * @throws IllegalArgumentException if the serialized data is longer than 65535 bytes
     */
    @Override
    public void sendToOwnerOf(long ref, long topic, Streamable data) throws TimeoutException {
        this.sendToOwnerOf(ref, new Msg(topic, null, data));
    }

    /**
     * Serializes {@code data} and sends it under a string topic to the owner of line
     * {@code ref}.
     *
     * @throws IllegalArgumentException if {@code topic} is {@code null} or the serialized data
     *                                  is longer than 65535 bytes
     */
    @Override
    public void sendToOwnerOf(long ref, String topic, Streamable data) throws TimeoutException {
        this.sendToOwnerOf(ref, new Msg(-1L, requireTopic(topic), data));
    }

    /**
     * Asynchronous variant of {@link #sendToOwnerOf(long, long, byte[])}.
     *
     * @return the future returned by {@code cache.doOpAsync}
     * @throws IllegalArgumentException if {@code data} is longer than 65535 bytes
     */
    @Override
    public ListenableFuture<Void> sendToOwnerOfAsync(long ref, long topic, byte[] data) {
        return this.sendToOwnerOfAsync(ref, new Msg(topic, null, data));
    }

    /**
     * Asynchronous variant of {@link #sendToOwnerOf(long, String, byte[])}.
     *
     * @return the future returned by {@code cache.doOpAsync}
     * @throws IllegalArgumentException if {@code topic} is {@code null} or {@code data} is
     *                                  longer than 65535 bytes
     */
    @Override
    public ListenableFuture<Void> sendToOwnerOfAsync(long ref, String topic, byte[] data) {
        return this.sendToOwnerOfAsync(ref, new Msg(-1L, requireTopic(topic), data));
    }

    /**
     * Asynchronous variant of {@link #sendToOwnerOf(long, long, Streamable)}.
     *
     * @return the future returned by {@code cache.doOpAsync}
     * @throws IllegalArgumentException if the serialized data is longer than 65535 bytes
     */
    @Override
    public ListenableFuture<Void> sendToOwnerOfAsync(long ref, long topic, Streamable data) {
        return this.sendToOwnerOfAsync(ref, new Msg(topic, null, data));
    }

    /**
     * Asynchronous variant of {@link #sendToOwnerOf(long, String, Streamable)}.
     *
     * @return the future returned by {@code cache.doOpAsync}
     * @throws IllegalArgumentException if {@code topic} is {@code null} or the serialized data
     *                                  is longer than 65535 bytes
     */
    @Override
    public ListenableFuture<Void> sendToOwnerOfAsync(long ref, String topic, Streamable data) {
        return this.sendToOwnerOfAsync(ref, new Msg(-1L, requireTopic(topic), data));
    }

    /** Returns {@code topic}, or throws {@link IllegalArgumentException} when it is {@code null}. */
    private static String requireTopic(String topic) {
        if (topic == null) {
            throw new IllegalArgumentException("Topic must not be null");
        }
        return topic;
    }

    /** Serializes {@code msg} and sends it through the cache to {@code node}. */
    private void sendToNode(short node, Msg msg) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending to node {}: {}", node, msg);
        }
        this.cache.send(Message.MSG(node, -1L, true, Streamables.toByteArray(msg)));
    }

    /** Sends {@code msg} to the owner of {@code line} via a {@code SEND} cache operation. */
    private void sendToOwnerOf(long line, Msg msg) throws TimeoutException {
        this.cache.doOp(Op.Type.SEND, line, null, this.ownerMessage(line, msg), null);
    }

    /** Asynchronous counterpart of {@link #sendToOwnerOf(long, Msg)}. */
    private ListenableFuture<Void> sendToOwnerOfAsync(long line, Msg msg) {
        return this.cache.doOpAsync(Op.Type.SEND, line, null, this.ownerMessage(line, msg), null);
    }

    /**
     * Logs the send and builds the cache message carrying {@code msg} for the owner of
     * {@code line}. The node is given as -1, as the target is identified by the line.
     */
    private Message.MSG ownerMessage(long line, Msg msg) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending to owner of {}: {}", Long.toHexString(line), msg);
        }
        return Message.MSG((short)-1, line, true, Streamables.toByteArray(msg));
    }

    /**
     * Decodes the {@link Msg} carried by {@code message}, looks up the listeners registered for
     * its topic and schedules their notification.
     */
    private void receive(Message.MSG message) {
        Msg msg = new Msg();
        Streamables.fromByteArray(msg, message.getData());
        LOG.debug("Received: {}", msg);
        List<MessageListener> ls = msg.hasSTopic()
                ? (List<MessageListener>)this.stringTopicListeners.get(msg.getsTopic())
                : (List<MessageListener>)this.longTopicListeners.get(msg.getlTopic());
        if (ls != null) {
            this.notifyListeners(ls, message.getNode(), msg);
        }
    }

    /**
     * Submits a {@link NodeTask} for {@code node} that hands {@code msg}'s payload to every
     * listener in {@code listeners}.
     *
     * <p>A listener that implements {@link WithExecutor} is invoked on the executor it supplies;
     * any other listener is invoked directly on the thread running the task.
     */
    private void notifyListeners(final Collection<MessageListener> listeners, final short node, final Msg msg) {
        this.executor.execute(new NodeTask(){

            @Override
            public short getNode() {
                return node;
            }

            @Override
            public void run() {
                // The collection's monitor is held for the whole pass. Listeners with their own
                // executor are only handed off here; their callback runs outside the monitor.
                synchronized (listeners) {
                    for (final MessageListener listener : listeners) {
                        if (listener instanceof WithExecutor) {
                            ((WithExecutor)((Object)listener)).getExecutor().execute(new Runnable(){

                                @Override
                                public void run() {
                                    MessengerImpl.deliver(listener, node, msg);
                                }
                            });
                        } else {
                            MessengerImpl.deliver(listener, node, msg);
                        }
                    }
                }
            }
        });
    }

    /**
     * Invokes a single listener with the payload of {@code msg}; an exception thrown by the
     * listener is logged and not propagated, so it does not affect other listeners.
     */
    private static void deliver(MessageListener listener, short node, Msg msg) {
        try {
            listener.messageReceived(node, msg.getData());
        }
        catch (Exception e) {
            LOG.error("Listener threw an exception.", e);
        }
    }

    /**
     * Wire envelope pairing a topic with an opaque payload.
     *
     * <p>Serialized layout, as produced by {@link #write(DataOutput)}:
     * <ol>
     *   <li>1 byte: boolean, {@code true} when the topic is a string</li>
     *   <li>the topic: modified-UTF-8 string ({@code writeUTF}) or an 8-byte long</li>
     *   <li>2 bytes: payload length as an unsigned 16-bit value</li>
     *   <li>the payload bytes</li>
     * </ol>
     */
    private static class Msg
    implements Streamable {
        /** Largest payload whose length fits the unsigned 16-bit length field. */
        private static final int MAX_DATA_LENGTH = 0xFFFF;

        private long lTopic = -1L;
        private String sTopic = null;
        private byte[] data;

        /** Creates an empty instance to be populated by {@link #read(DataInput)}. */
        public Msg() {
        }

        public Msg(long topic, byte[] data) {
            this(topic, null, data);
        }

        public Msg(String topic, byte[] data) {
            this(-1L, topic, data);
            assert (topic != null);
        }

        public Msg(long topic, Streamable data) {
            this(topic, null, data);
        }

        public Msg(String topic, Streamable data) {
            this(-1L, topic, data);
            assert (topic != null);
        }

        /**
         * @throws IllegalArgumentException if {@code data} is longer than
         *                                  {@link #MAX_DATA_LENGTH} bytes
         */
        private Msg(long lTopic, String sTopic, byte[] data) {
            // The length is written as 16 bits; a longer payload would be silently
            // truncated on the receiving side, so reject it before it is sent.
            if (data != null && data.length > MAX_DATA_LENGTH) {
                throw new IllegalArgumentException("Message data is too long: " + data.length
                        + " bytes (maximum is " + MAX_DATA_LENGTH + ")");
            }
            this.lTopic = lTopic;
            this.sTopic = sTopic;
            this.data = data;
        }

        private Msg(long lTopic, String sTopic, Streamable data) {
            this(lTopic, sTopic, Streamables.toByteArray(data));
        }

        /** Returns {@code true} when this message is addressed by a string topic. */
        public boolean hasSTopic() {
            return this.sTopic != null;
        }

        public long getlTopic() {
            return this.lTopic;
        }

        public String getsTopic() {
            return this.sTopic;
        }

        public byte[] getData() {
            return this.data;
        }

        /**
         * Returns the number of bytes {@link #write(DataOutput)} produces: the flag byte, the
         * topic, the 2-byte length field and the payload.
         *
         * <p>NOTE(review): the string-topic size relies on {@code Streamables.calcUtfLength}
         * accounting for the 2-byte length prefix that {@code writeUTF} emits; confirm.
         */
        @Override
        public int size() {
            // Branch on the same condition as write() so both agree on the topic encoding.
            int topicSize = this.hasSTopic() ? Streamables.calcUtfLength(this.sTopic) : 8;
            return 1 + topicSize + 2 + this.data.length;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            boolean hasSTopic = this.hasSTopic();
            out.writeBoolean(hasSTopic);
            if (hasSTopic) {
                out.writeUTF(this.sTopic);
            } else {
                out.writeLong(this.lTopic);
            }
            // Only the low 16 bits are written; read() interprets them as unsigned.
            out.writeShort(this.data.length);
            out.write(this.data);
        }

        @Override
        public void read(DataInput in) throws IOException {
            boolean hasSTopic = in.readBoolean();
            if (hasSTopic) {
                this.lTopic = -1L;
                this.sTopic = in.readUTF();
            } else {
                this.lTopic = in.readLong();
                this.sTopic = null;
            }
            int dataLength = in.readUnsignedShort();
            this.data = new byte[dataLength];
            in.readFully(this.data);
        }

        /** Describes the topic and the payload size; the payload bytes are not printed. */
        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("Msg[");
            sb.append("Topic: ");
            if (this.sTopic != null) {
                sb.append('\"').append(this.sTopic).append('\"');
            } else {
                sb.append(this.lTopic);
            }
            sb.append(" data: ");
            if (this.data == null) {
                sb.append("null");
            } else {
                sb.append("(").append(this.data.length).append(" bytes)");
            }
            sb.append("]");
            return sb.toString();
        }
    }
}

