/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram.util;

import com.datatorrent.stram.util.PubSubWebSocketClient;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link PubSubWebSocketClient} whose single connection is shared by any number of
 * per-topic {@link Handler}s.
 *
 * <p>Handlers are registered with {@link #addHandler(String, boolean, Handler)}. Incoming
 * messages are dispatched to the handlers registered for the message's topic, every
 * registered topic is subscribed again whenever the connection opens, and all handlers are
 * notified when the connection closes.
 *
 * <p>The handler registry is guarded by this object's monitor.
 */
public class SharedPubSubWebSocketClient extends PubSubWebSocketClient {
    /** Topic prefix constant; it is not referenced inside this class. */
    public static final String LAST_INDEX_TOPIC_PREFIX = "_internal.lastIndex";
    private static final Logger LOG = LoggerFactory.getLogger(SharedPubSubWebSocketClient.class);
    /** Suffix appended to a topic to build the registry key for subscriber-count handlers. */
    private static final String NUM_SUBSCRIBERS_SUFFIX = ".numSubscribers";

    /** Registry key (topic, possibly suffixed) to the handlers registered for it. */
    private final Map<String, List<Handler>> topicHandlers = new HashMap<String, List<Handler>>();
    /** Wall-clock time (ms) of the most recent reconnect attempt; starts at construction time. */
    private long lastConnectTryTime;
    /** Minimum time (ms) that must elapse between two reconnect attempts made by {@link #publish}. */
    private final long minWaitConnectionRetry = 5000L;
    /** Timeout (ms) handed to {@code openConnection} by {@link #setup()}. */
    private final long timeoutMillis;

    /**
     * Creates a client for the given endpoint. No connection is opened here; call
     * {@link #setup()} for that.
     *
     * @param uri           the URI passed to {@code setUri}
     * @param timeoutMillis timeout in milliseconds later used by {@link #setup()}
     */
    public SharedPubSubWebSocketClient(URI uri, long timeoutMillis) {
        this.setUri(uri);
        this.lastConnectTryTime = System.currentTimeMillis();
        this.timeoutMillis = timeoutMillis;
    }

    /**
     * Convenience constructor that parses {@code uri} before delegating to
     * {@link #SharedPubSubWebSocketClient(URI, long)}.
     *
     * @param uri           the endpoint as a string
     * @param timeoutMillis timeout in milliseconds later used by {@link #setup()}
     * @throws URISyntaxException if {@code uri} cannot be parsed as a {@link URI}
     */
    public SharedPubSubWebSocketClient(String uri, long timeoutMillis) throws URISyntaxException {
        this(new URI(uri), timeoutMillis);
    }

    /**
     * Opens the connection using the timeout supplied at construction.
     *
     * @throws IOException          propagated from {@code openConnection}
     * @throws ExecutionException   propagated from {@code openConnection}
     * @throws InterruptedException propagated from {@code openConnection}
     * @throws TimeoutException     propagated from {@code openConnection}
     */
    public void setup() throws IOException, ExecutionException, InterruptedException, TimeoutException {
        this.openConnection(this.timeoutMillis);
    }

    /**
     * Registers {@code handler} for a topic and, if the connection is currently open,
     * subscribes to it right away. If the connection is not open the handler is only
     * recorded; {@link #onOpen(WebSocket)} subscribes to all recorded topics later.
     *
     * <p>A failure to subscribe is logged and otherwise ignored; the handler stays registered.
     *
     * @param topic          the topic of interest
     * @param numSubscribers if {@code true}, the handler is stored under
     *                       {@code topic + ".numSubscribers"} and the subscription is made via
     *                       {@code subscribeNumSubscribers(topic)} instead of {@code subscribe}
     * @param handler        the callback to register
     */
    public synchronized void addHandler(String topic, boolean numSubscribers, Handler handler) {
        String key = numSubscribers ? topic + NUM_SUBSCRIBERS_SUFFIX : topic;
        List<Handler> handlers = this.topicHandlers.get(key);
        if (handlers == null) {
            handlers = new ArrayList<Handler>();
            this.topicHandlers.put(key, handlers);
        }
        handlers.add(handler);
        try {
            if (this.isConnectionOpen()) {
                if (numSubscribers) {
                    // The subscriber-count subscription is requested with the unsuffixed topic.
                    this.subscribeNumSubscribers(topic);
                } else {
                    this.subscribe(key);
                }
            }
        } catch (IOException ex) {
            LOG.warn("Cannot subscribe to {}", key, ex);
        }
    }

    /**
     * Publishes {@code data} to {@code topic} through the superclass. Before doing so, if the
     * connection is not open and more than {@code minWaitConnectionRetry} milliseconds have
     * passed since the last attempt, an asynchronous reconnect is started. A failure to start
     * the reconnect is logged at debug level and does not prevent the call to
     * {@code super.publish}.
     *
     * @param topic the topic to publish to
     * @param data  the payload
     * @throws IOException propagated from {@code super.publish}
     */
    @Override
    public void publish(String topic, Object data) throws IOException {
        synchronized (this) {
            if (!this.isConnectionOpen()) {
                try {
                    long now = System.currentTimeMillis();
                    // Throttle reconnect attempts so a dead server is not hammered on every publish.
                    if (this.lastConnectTryTime + this.minWaitConnectionRetry < now) {
                        this.lastConnectTryTime = now;
                        this.openConnectionAsync();
                    }
                } catch (Exception ex) {
                    LOG.debug("Failed attempt to reconnect to websocket server", ex);
                }
            }
        }
        super.publish(topic, data);
    }

    /**
     * Subscribes to every registered topic key once the connection opens. A failure on one
     * topic is logged and the remaining topics are still attempted.
     *
     * @param ws the opened socket (unused here)
     */
    @Override
    public void onOpen(WebSocket ws) {
        // Copy the keys under the lock: addHandler may modify the map from another thread,
        // and iterating a HashMap during modification is unsafe. The I/O happens unlocked.
        List<String> topics;
        synchronized (this) {
            topics = new ArrayList<String>(this.topicHandlers.keySet());
        }
        for (String topic : topics) {
            try {
                // NOTE(review): keys registered with numSubscribers=true carry the
                // ".numSubscribers" suffix and are subscribed here with subscribe(), whereas
                // addHandler uses subscribeNumSubscribers(unsuffixedTopic) - confirm these
                // are equivalent on the server side.
                this.subscribe(topic);
            } catch (IOException ex) {
                LOG.warn("Cannot subscribe to {}", topic, ex);
            }
        }
    }

    /**
     * Forwards an incoming message to every handler registered under {@code topic}; does
     * nothing if there is none. Handlers are invoked while this object's monitor is held.
     *
     * @param type  the message type, passed through to the handlers
     * @param topic the topic used to look up handlers
     * @param data  the message payload, passed through to the handlers
     */
    @Override
    public synchronized void onMessage(String type, String topic, Object data) {
        List<Handler> handlers = this.topicHandlers.get(topic);
        if (handlers != null) {
            for (Handler handler : handlers) {
                handler.onMessage(type, topic, data);
            }
        }
    }

    /**
     * Notifies every registered handler, across all topics, that the connection closed.
     * Handlers remain registered afterwards.
     *
     * @param ws the closed socket (unused here)
     */
    @Override
    public void onClose(WebSocket ws) {
        // Snapshot under the lock for the same reason as in onOpen; the callbacks themselves
        // run without the lock held.
        List<Handler> snapshot = new ArrayList<Handler>();
        synchronized (this) {
            for (List<Handler> handlers : this.topicHandlers.values()) {
                snapshot.addAll(handlers);
            }
        }
        for (Handler handler : snapshot) {
            handler.onClose();
        }
    }

    /** Callback registered per topic through {@link #addHandler(String, boolean, Handler)}. */
    public interface Handler {
        /**
         * Invoked for each message received on a topic this handler is registered for.
         *
         * @param type  the message type
         * @param topic the topic the message arrived on
         * @param data  the message payload
         */
        void onMessage(String type, String topic, Object data);

        /** Invoked when the shared connection is closed. */
        void onClose();
    }
}

