/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram.util;

import com.datatorrent.common.util.PubSubMessage;
import com.datatorrent.common.util.PubSubMessageCodec;
import com.datatorrent.stram.util.JSONSerializationProvider;
import com.datatorrent.stram.util.LRUCache;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import javax.servlet.http.HttpServletRequest;
import org.codehaus.jackson.map.ObjectMapper;
import org.eclipse.jetty.websocket.WebSocket;
import org.eclipse.jetty.websocket.WebSocketServlet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubSubWebSocketServlet<SECURITY_CONTEXT, PRINCIPAL>
extends WebSocketServlet {
    private static final Logger LOG = LoggerFactory.getLogger(PubSubWebSocketServlet.class);
    private static final long serialVersionUID = 1L;
    private HashMap<String, HashSet<PubSubWebSocket>> topicToSocketMap = new HashMap();
    private HashMap<PubSubWebSocket, HashSet<String>> socketToTopicMap = new HashMap();
    private ObjectMapper mapper = new JSONSerializationProvider().getContext(null);
    private PubSubMessageCodec<Object> codec = new PubSubMessageCodec(this.mapper);
    private InternalMessageHandler internalMessageHandler = null;
    private static final int latestTopicCount = 100;
    protected SECURITY_CONTEXT securityContext;
    private SubscribeFilter subscribeFilter;
    private SendFilter sendFilter;
    private String authAttribute;
    private final LRUCache<String, Long> latestTopics = new LRUCache<String, Long>(100, false){
        private static final long serialVersionUID = 20140131L;

        @Override
        public Long put(String key, Long value) {
            this.remove(key);
            return super.put(key, value);
        }
    };

    public void registerSubscribeFilter(SubscribeFilter filter) {
        this.subscribeFilter = filter;
    }

    public void registerSendFilter(SendFilter filter) {
        this.sendFilter = filter;
    }

    public PubSubWebSocketServlet(SECURITY_CONTEXT securityContext, String authAttribute) {
        this.securityContext = securityContext;
        this.authAttribute = authAttribute;
    }

    public void setInternalMessageHandler(InternalMessageHandler internalMessageHandler) {
        this.internalMessageHandler = internalMessageHandler;
    }

    public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) {
        Object principal = request.getAttribute(this.authAttribute);
        return new PubSubWebSocket(principal);
    }

    private synchronized void subscribe(PubSubWebSocket webSocket, String topic) {
        HashSet<Object> topicSet;
        HashSet<Object> wsSet;
        if (this.subscribeFilter != null && !this.subscribeFilter.filter(this.securityContext, webSocket.getPrincipal(), topic)) {
            LOG.warn("Subscribe filter returns false for topic {}, user {}. Ignoring subscribe request", (Object)topic, webSocket.getPrincipal());
            return;
        }
        LOG.debug("Subscribe is allowed for topic {}, user {}", (Object)topic, webSocket.getPrincipal());
        if (!this.topicToSocketMap.containsKey(topic)) {
            wsSet = new HashSet();
            this.topicToSocketMap.put(topic, wsSet);
        } else {
            wsSet = this.topicToSocketMap.get(topic);
        }
        wsSet.add(webSocket);
        if (!this.socketToTopicMap.containsKey(webSocket)) {
            topicSet = new HashSet(0);
            this.socketToTopicMap.put(webSocket, topicSet);
        } else {
            topicSet = this.socketToTopicMap.get(webSocket);
        }
        topicSet.add(topic);
        this.publish(topic + "." + "numSubscribers", this.getNumSubscribers(topic));
    }

    private synchronized void unsubscribe(PubSubWebSocket webSocket, String topic) {
        if (!this.topicToSocketMap.containsKey(topic)) {
            return;
        }
        HashSet<PubSubWebSocket> wsSet = this.topicToSocketMap.get(topic);
        wsSet.remove(webSocket);
        if (wsSet.isEmpty()) {
            this.topicToSocketMap.remove(topic);
        }
        if (!this.socketToTopicMap.containsKey(webSocket)) {
            return;
        }
        HashSet<String> topicSet = this.socketToTopicMap.get(webSocket);
        topicSet.remove(topic);
        if (topicSet.isEmpty()) {
            this.socketToTopicMap.remove(webSocket);
        }
        this.publish(topic + "." + "numSubscribers", this.getNumSubscribers(topic));
    }

    private synchronized void unsubscribeAll(PubSubWebSocket webSocket) {
        HashSet<String> topicSet = this.socketToTopicMap.get(webSocket);
        if (topicSet != null) {
            for (String topic : topicSet) {
                HashSet<PubSubWebSocket> wsSet = this.topicToSocketMap.get(topic);
                wsSet.remove(webSocket);
                if (wsSet.isEmpty()) {
                    this.topicToSocketMap.remove(topic);
                }
                this.publish(topic + "." + "numSubscribers", this.getNumSubscribers(topic));
            }
            this.socketToTopicMap.remove(webSocket);
        }
    }

    private synchronized void disconnect(PubSubWebSocket webSocket) {
        this.unsubscribeAll(webSocket);
    }

    public synchronized int getNumSubscribers(String topic) {
        HashSet<PubSubWebSocket> wsSet = this.topicToSocketMap.get(topic);
        return wsSet == null ? 0 : wsSet.size();
    }

    private synchronized void sendData(PubSubWebSocket webSocket, String topic, Object data) throws IOException {
        PubSubMessage pubSubMessage = new PubSubMessage();
        pubSubMessage.setType(PubSubMessage.PubSubMessageType.DATA);
        pubSubMessage.setTopic(topic);
        pubSubMessage.setData(data);
        LOG.debug("Sending data {} to subscriber...", (Object)topic);
        webSocket.sendMessage(this.codec.formatMessage(pubSubMessage));
    }

    public synchronized void publish(String topic, Object data) {
        HashSet<PubSubWebSocket> wsSet;
        if (!topic.endsWith(".numSubscribers") && !topic.startsWith("_internal.")) {
            this.latestTopics.put(topic, System.currentTimeMillis());
        }
        if ((wsSet = this.topicToSocketMap.get(topic)) != null) {
            Iterator<PubSubWebSocket> it = wsSet.iterator();
            while (it.hasNext()) {
                PubSubWebSocket socket = it.next();
                try {
                    if (this.sendFilter != null) {
                        Object filteredData = this.sendFilter.filter(this.securityContext, socket.getPrincipal(), topic, data);
                        this.sendData(socket, topic, filteredData);
                        continue;
                    }
                    this.sendData(socket, topic, data);
                }
                catch (Exception ex) {
                    LOG.error("Cannot send message", (Throwable)ex);
                    it.remove();
                    this.disconnect(socket);
                }
            }
        }
    }

    protected class PubSubWebSocket
    implements WebSocket.OnTextMessage {
        private WebSocket.Connection connection;
        private final BlockingQueue<String> messageQueue = new ArrayBlockingQueue<String>(1024);
        private final Thread messengerThread = new Thread(new Messenger());
        private final PRINCIPAL principal;

        public PubSubWebSocket(PRINCIPAL principal) {
            this.principal = principal;
        }

        public PRINCIPAL getPrincipal() {
            return this.principal;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onMessage(String message) {
            block17: {
                LOG.debug("Received message {}", (Object)message);
                try {
                    PubSubMessage pubSubMessage = PubSubWebSocketServlet.this.codec.parseMessage(message);
                    if (pubSubMessage == null) break block17;
                    PubSubMessage.PubSubMessageType type = pubSubMessage.getType();
                    String topic = pubSubMessage.getTopic();
                    if (type == null) break block17;
                    if (type.equals((Object)PubSubMessage.PubSubMessageType.SUBSCRIBE)) {
                        if (topic != null) {
                            PubSubWebSocketServlet.this.subscribe(this, topic);
                        }
                        break block17;
                    }
                    if (type.equals((Object)PubSubMessage.PubSubMessageType.UNSUBSCRIBE)) {
                        if (topic != null) {
                            PubSubWebSocketServlet.this.unsubscribe(this, topic);
                        }
                        break block17;
                    }
                    if (type.equals((Object)PubSubMessage.PubSubMessageType.PUBLISH)) {
                        if (topic != null) {
                            Object data = pubSubMessage.getData();
                            if (data != null) {
                                PubSubWebSocketServlet.this.publish(topic, data);
                            }
                            if (topic.startsWith("_internal.") && PubSubWebSocketServlet.this.internalMessageHandler != null) {
                                PubSubWebSocketServlet.this.internalMessageHandler.onMessage(topic, data);
                            }
                        }
                        break block17;
                    }
                    if (type.equals((Object)PubSubMessage.PubSubMessageType.SUBSCRIBE_NUM_SUBSCRIBERS)) {
                        if (topic != null) {
                            PubSubWebSocketServlet.this.subscribe(this, topic + "." + "numSubscribers");
                            PubSubWebSocketServlet.this.sendData(this, topic + "." + "numSubscribers", PubSubWebSocketServlet.this.getNumSubscribers(topic));
                        }
                        break block17;
                    }
                    if (type.equals((Object)PubSubMessage.PubSubMessageType.UNSUBSCRIBE_NUM_SUBSCRIBERS)) {
                        if (topic != null) {
                            PubSubWebSocketServlet.this.unsubscribe(this, topic + "." + "numSubscribers");
                        }
                        break block17;
                    }
                    if (!type.equals((Object)PubSubMessage.PubSubMessageType.GET_LATEST_TOPICS)) break block17;
                    PubSubWebSocket pubSubWebSocket = this;
                    synchronized (pubSubWebSocket) {
                        PubSubWebSocketServlet.this.sendData(this, "_latestTopics", PubSubWebSocketServlet.this.latestTopics.keySet());
                    }
                }
                catch (Exception ex) {
                    LOG.warn("Exception caught", (Throwable)ex);
                }
            }
        }

        public void onOpen(WebSocket.Connection connection) {
            LOG.debug("onOpen");
            this.connection = connection;
            this.connection.setMaxIdleTime(300000);
            this.connection.setMaxTextMessageSize(0x800000);
            this.messengerThread.start();
        }

        public void onClose(int i, String string) {
            LOG.debug("onClose");
            PubSubWebSocketServlet.this.disconnect(this);
            this.messengerThread.interrupt();
        }

        public void sendMessage(String message) throws IllegalStateException {
            this.messageQueue.add(message);
        }

        private class Messenger
        implements Runnable {
            private Messenger() {
            }

            @Override
            public void run() {
                while (!Thread.interrupted()) {
                    try {
                        String message = (String)PubSubWebSocket.this.messageQueue.take();
                        PubSubWebSocket.this.connection.sendMessage(message);
                    }
                    catch (InterruptedException ex) {
                        return;
                    }
                    catch (Exception ex) {
                        LOG.error("Caught exception in websocket messenger.", (Throwable)ex);
                        return;
                    }
                }
            }
        }
    }

    public class UserHolder {
        public String username;
    }

    public static interface InternalMessageHandler {
        public void onMessage(String var1, Object var2);
    }

    public static interface SendFilter<SECURITY_CONTEXT, PRINCIPAL> {
        public Object filter(SECURITY_CONTEXT var1, PRINCIPAL var2, String var3, Object var4);
    }

    public static interface SubscribeFilter<SECURITY_CONTEXT, PRINCIPAL> {
        public boolean filter(SECURITY_CONTEXT var1, PRINCIPAL var2, String var3);
    }
}

