/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram.util;

import com.datatorrent.api.Component;
import com.datatorrent.api.Context;
import com.datatorrent.common.util.JacksonObjectMapperProvider;
import com.datatorrent.common.util.NameableThreadFactory;
import com.datatorrent.common.util.PubSubMessage;
import com.datatorrent.common.util.PubSubMessageCodec;
import com.google.common.base.Throwables;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.apex.shaded.ning19.com.ning.http.client.AsyncCompletionHandler;
import org.apache.apex.shaded.ning19.com.ning.http.client.AsyncHandler;
import org.apache.apex.shaded.ning19.com.ning.http.client.AsyncHttpClient;
import org.apache.apex.shaded.ning19.com.ning.http.client.AsyncHttpClientConfig;
import org.apache.apex.shaded.ning19.com.ning.http.client.AsyncHttpClientConfigBean;
import org.apache.apex.shaded.ning19.com.ning.http.client.Response;
import org.apache.apex.shaded.ning19.com.ning.http.client.cookie.Cookie;
import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocket;
import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocketListener;
import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocketTextListener;
import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocketUpgradeHandler;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class PubSubWebSocketClient
implements Component<Context> {
    private final AsyncHttpClient client;
    private WebSocket connection;
    private final ObjectMapper mapper;
    private final PubSubMessageCodec<Object> codec;
    private URI uri;
    private int ioThreadMultiplier = 1;
    private String loginUrl;
    private String userName;
    private String password;
    private final AtomicReference<Throwable> throwable = new AtomicReference();
    private static final Logger logger = LoggerFactory.getLogger(PubSubWebSocketClient.class);

    public PubSubWebSocketClient() {
        this.mapper = new JacksonObjectMapperProvider().getContext(null);
        this.codec = new PubSubMessageCodec(this.mapper);
        AsyncHttpClientConfigBean config = new AsyncHttpClientConfigBean();
        config.setIoThreadMultiplier(this.ioThreadMultiplier);
        config.setApplicationThreadPool(Executors.newCachedThreadPool((ThreadFactory)new NameableThreadFactory("AsyncHttpClient")));
        this.client = new AsyncHttpClient((AsyncHttpClientConfig)config);
    }

    public void setUri(URI uri) {
        this.uri = uri;
    }

    public void setIoThreadMultiplier(int ioThreadMultiplier) {
        this.ioThreadMultiplier = ioThreadMultiplier;
    }

    public void setLoginUrl(String url) {
        this.loginUrl = url;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public void openConnection(long timeoutMillis) throws IOException, ExecutionException, InterruptedException, TimeoutException {
        this.throwable.set(null);
        List cookies = null;
        if (this.loginUrl != null && this.userName != null && this.password != null) {
            JSONObject json = new JSONObject();
            try {
                json.put("userName", (Object)this.userName);
                json.put("password", (Object)this.password);
            }
            catch (JSONException ex) {
                throw new RuntimeException(ex);
            }
            Response response = (Response)this.client.preparePost(this.loginUrl).setHeader("Content-Type", "application/json").setBody(json.toString()).execute().get();
            cookies = response.getCookies();
        }
        AsyncHttpClient.BoundRequestBuilder brb = this.client.prepareGet(this.uri.toString());
        if (cookies != null) {
            for (Cookie cookie : cookies) {
                brb.addCookie(cookie);
            }
        }
        this.connection = (WebSocket)brb.execute((AsyncHandler)new WebSocketUpgradeHandler.Builder().addWebSocketListener((WebSocketListener)new PubSubWebSocket()).build()).get(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public void openConnectionAsync() throws IOException {
        this.throwable.set(null);
        if (this.loginUrl != null && this.userName != null && this.password != null) {
            JSONObject json = new JSONObject();
            try {
                json.put("userName", (Object)this.userName);
                json.put("password", (Object)this.password);
            }
            catch (JSONException ex) {
                throw new RuntimeException(ex);
            }
            this.client.preparePost(this.loginUrl).setHeader("Content-Type", "application/json").setBody(json.toString()).execute((AsyncHandler)new AsyncCompletionHandler<Response>(){

                public Response onCompleted(Response response) throws Exception {
                    List cookies = response.getCookies();
                    AsyncHttpClient.BoundRequestBuilder brb = PubSubWebSocketClient.this.client.prepareGet(PubSubWebSocketClient.this.uri.toString());
                    if (cookies != null) {
                        for (Cookie cookie : cookies) {
                            brb.addCookie(cookie);
                        }
                    }
                    PubSubWebSocketClient.this.connection = (WebSocket)brb.execute((AsyncHandler)new WebSocketUpgradeHandler.Builder().addWebSocketListener((WebSocketListener)new PubSubWebSocket()).build()).get();
                    return response;
                }
            });
        } else {
            PubSubWebSocket webSocket = new PubSubWebSocket(){

                @Override
                public void onOpen(WebSocket ws) {
                    PubSubWebSocketClient.this.connection = ws;
                    super.onOpen(ws);
                }
            };
            this.client.prepareGet(this.uri.toString()).execute((AsyncHandler)new WebSocketUpgradeHandler.Builder().addWebSocketListener((WebSocketListener)webSocket).build());
        }
    }

    public boolean isConnectionOpen() {
        if (this.connection == null) {
            return false;
        }
        return this.connection.isOpen();
    }

    public void assertUsable() throws IOException {
        if (this.throwable.get() == null) {
            if (this.connection == null) {
                throw new IOException("Connection is not open");
            }
            return;
        }
        Throwable t = this.throwable.get();
        if (t instanceof IOException) {
            throw (IOException)t;
        }
        throw Throwables.propagate((Throwable)t);
    }

    public void publish(String topic, Object data) throws IOException {
        this.assertUsable();
        this.connection.sendMessage(PubSubMessageCodec.constructPublishMessage((String)topic, (Object)data, this.codec));
    }

    public void subscribe(String topic) throws IOException {
        this.assertUsable();
        this.connection.sendMessage(PubSubMessageCodec.constructSubscribeMessage((String)topic, this.codec));
    }

    public void unsubscribe(String topic) throws IOException {
        this.assertUsable();
        this.connection.sendMessage(PubSubMessageCodec.constructUnsubscribeMessage((String)topic, this.codec));
    }

    public void subscribeNumSubscribers(String topic) throws IOException {
        this.assertUsable();
        this.connection.sendMessage(PubSubMessageCodec.constructSubscribeNumSubscribersMessage((String)topic, this.codec));
    }

    public void unsubscribeNumSubscribers(String topic) throws IOException {
        this.assertUsable();
        this.connection.sendMessage(PubSubMessageCodec.constructUnsubscribeNumSubscribersMessage((String)topic, this.codec));
    }

    public abstract void onOpen(WebSocket var1);

    public abstract void onMessage(String var1, String var2, Object var3);

    public abstract void onClose(WebSocket var1);

    public void setup(Context context) {
    }

    public void teardown() {
        if (this.connection != null) {
            this.connection.close();
        }
        if (this.client != null) {
            this.client.close();
        }
        this.throwable.set(null);
    }

    private void onError(Throwable t) {
        this.throwable.set(t);
    }

    private class PubSubWebSocket
    implements WebSocketTextListener {
        private PubSubWebSocket() {
        }

        public void onMessage(String message) {
            try {
                PubSubMessage pubSubMessage = PubSubWebSocketClient.this.codec.parseMessage(message);
                PubSubWebSocketClient.this.onMessage(pubSubMessage.getType().getIdentifier(), pubSubMessage.getTopic(), pubSubMessage.getData());
            }
            catch (JsonParseException jpe) {
                logger.warn("Ignoring unparseable JSON message: {}", (Object)message, (Object)jpe);
            }
            catch (JsonMappingException jme) {
                logger.warn("Ignoring JSON mapping in message: {}", (Object)message, (Object)jme);
            }
            catch (IOException ex) {
                this.onError(ex);
            }
        }

        public void onOpen(WebSocket ws) {
            PubSubWebSocketClient.this.onOpen(ws);
        }

        public void onClose(WebSocket ws) {
            PubSubWebSocketClient.this.onClose(ws);
        }

        public void onError(Throwable t) {
            PubSubWebSocketClient.this.onError(t);
        }
    }
}

