/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.io;

import com.datatorrent.api.Context;
import com.datatorrent.lib.io.SimpleSinglePortInputOperator;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.apex.shaded.ning19.com.ning.http.client.AsyncHandler;
import org.apache.apex.shaded.ning19.com.ning.http.client.AsyncHttpClient;
import org.apache.apex.shaded.ning19.com.ning.http.client.AsyncHttpClientConfig;
import org.apache.apex.shaded.ning19.com.ning.http.client.AsyncHttpClientConfigBean;
import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocket;
import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocketListener;
import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocketTextListener;
import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocketUpgradeHandler;
import org.apache.commons.lang3.ClassUtils;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WebSocketInputOperator<T>
extends SimpleSinglePortInputOperator<T>
implements Runnable {
    private static final long serialVersionUID = 201506160829L;
    private static final Logger LOG = LoggerFactory.getLogger(WebSocketInputOperator.class);
    public int readTimeoutMillis = 0;
    private URI uri;
    private transient AsyncHttpClient client;
    private final transient JsonFactory jsonFactory = new JsonFactory();
    protected final transient ObjectMapper mapper = new ObjectMapper(this.jsonFactory);
    protected transient WebSocket connection;
    private transient boolean connectionClosed = false;
    private volatile transient boolean shutdown = false;
    private int ioThreadMultiplier = 1;
    protected boolean skipNull = false;
    private transient MonitorThread monThread;

    public URI getUri() {
        return this.uri;
    }

    public void setUri(URI uri) {
        this.uri = uri;
    }

    public int getIoThreadMultiplier() {
        return this.ioThreadMultiplier;
    }

    public void setIoThreadMultiplier(int ioThreadMultiplier) {
        this.ioThreadMultiplier = ioThreadMultiplier;
    }

    public void setup(Context.OperatorContext context) {
        try {
            this.uri = URI.create(this.uri.toString());
            LOG.info("URL: {}", (Object)this.uri);
            this.shutdown = false;
            this.monThread = new MonitorThread();
            this.monThread.start();
        }
        catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    public void teardown() {
        this.shutdown = true;
        try {
            if (this.monThread != null) {
                this.monThread.join();
            }
        }
        catch (Exception ex) {
            LOG.error("Error joining monitor", (Throwable)ex);
        }
        if (this.connection != null) {
            this.connection.close();
        }
        if (this.client != null) {
            this.client.close();
        }
        super.teardown();
    }

    protected T convertMessage(String message) throws IOException {
        return (T)this.mapper.readValue(message, Object.class);
    }

    @Override
    public void run() {
        try {
            this.connectionClosed = false;
            AsyncHttpClientConfigBean config = new AsyncHttpClientConfigBean();
            config.setIoThreadMultiplier(this.ioThreadMultiplier);
            config.setApplicationThreadPool(Executors.newCachedThreadPool(new ThreadFactory(){
                private long count = 0L;

                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setName(ClassUtils.getShortClassName(this.getClass()) + "-AsyncHttpClient-" + this.count++);
                    return t;
                }
            }));
            if (this.client != null) {
                this.client.closeAsynchronously();
            }
            this.client = new AsyncHttpClient((AsyncHttpClientConfig)config);
            this.connection = (WebSocket)this.client.prepareGet(this.uri.toString()).execute((AsyncHandler)new WebSocketUpgradeHandler.Builder().addWebSocketListener((WebSocketListener)new WebSocketTextListener(){

                public void onMessage(String string) {
                    LOG.debug("Got: " + string);
                    try {
                        Object o = WebSocketInputOperator.this.convertMessage(string);
                        if (!WebSocketInputOperator.this.skipNull || o != null) {
                            WebSocketInputOperator.this.outputPort.emit(o);
                        }
                    }
                    catch (IOException ex) {
                        LOG.error("Got exception: ", (Throwable)ex);
                    }
                }

                public void onOpen(WebSocket ws) {
                    LOG.debug("Connection opened");
                }

                public void onClose(WebSocket ws) {
                    LOG.debug("Connection connectionClosed.");
                    WebSocketInputOperator.this.connectionClosed = true;
                }

                public void onError(Throwable t) {
                    LOG.error("Caught exception", t);
                }
            }).build()).get(5L, TimeUnit.SECONDS);
        }
        catch (Exception ex) {
            LOG.error("Error reading from " + this.uri, (Throwable)ex);
            if (this.client != null) {
                this.client.close();
            }
            this.connectionClosed = true;
        }
    }

    private class MonitorThread
    extends Thread {
        private MonitorThread() {
        }

        @Override
        public void run() {
            while (!WebSocketInputOperator.this.shutdown) {
                try {
                    MonitorThread.sleep(1000L);
                    if (!WebSocketInputOperator.this.connectionClosed || WebSocketInputOperator.this.shutdown) continue;
                    WebSocketInputOperator.this.connection.close();
                    WebSocketInputOperator.this.activate((Context.OperatorContext)null);
                }
                catch (Exception exception) {}
            }
        }
    }
}

