/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.io;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.common.util.BaseOperator;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.apex.shaded.ning19.com.ning.http.client.AsyncHandler;
import org.apache.apex.shaded.ning19.com.ning.http.client.AsyncHttpClient;
import org.apache.apex.shaded.ning19.com.ning.http.client.AsyncHttpClientConfig;
import org.apache.apex.shaded.ning19.com.ning.http.client.AsyncHttpClientConfigBean;
import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocket;
import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocketListener;
import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocketTextListener;
import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocketUpgradeHandler;
import org.apache.commons.lang3.ClassUtils;
import org.apache.hadoop.classification.InterfaceStability;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceStability.Evolving
public class WebSocketOutputOperator<T>
extends BaseOperator {
    private static final Logger LOG = LoggerFactory.getLogger(WebSocketOutputOperator.class);
    private URI uri;
    private transient AsyncHttpClient client;
    private final transient JsonFactory jsonFactory = new JsonFactory();
    protected final transient ObjectMapper mapper = new ObjectMapper(this.jsonFactory);
    protected transient WebSocket connection;
    private int ioThreadMultiplier = 1;
    private int numRetries = 3;
    private int waitMillisRetry = 5000;
    private long count = 0L;
    public final transient DefaultInputPort<T> input = new DefaultInputPort<T>(){

        public void process(T t) {
            int countTries = 0;
            while (true) {
                try {
                    if (WebSocketOutputOperator.this.connection == null || !WebSocketOutputOperator.this.connection.isOpen()) {
                        LOG.warn("Connection is closed. Reconnecting...");
                        WebSocketOutputOperator.this.client.close();
                        WebSocketOutputOperator.this.openConnection();
                    }
                    WebSocketOutputOperator.this.connection.sendMessage(WebSocketOutputOperator.this.convertMapToMessage(t));
                }
                catch (Exception ex) {
                    if (++countTries < WebSocketOutputOperator.this.numRetries) {
                        LOG.debug("Caught exception", (Throwable)ex);
                        LOG.warn("Send message failed ({}). Retrying ({}).", (Object)ex.getMessage(), (Object)countTries);
                        if (WebSocketOutputOperator.this.connection != null) {
                            WebSocketOutputOperator.this.connection.close();
                        }
                        if (WebSocketOutputOperator.this.waitMillisRetry <= 0) continue;
                        try {
                            Thread.sleep(WebSocketOutputOperator.this.waitMillisRetry);
                        }
                        catch (InterruptedException interruptedException) {}
                        continue;
                    }
                    throw new RuntimeException(ex);
                }
                break;
            }
        }
    };

    public URI getUri() {
        return this.uri;
    }

    public void setUri(URI uri) {
        this.uri = uri;
    }

    public int getWaitMillisRetry() {
        return this.waitMillisRetry;
    }

    public void setWaitMillisRetry(int waitMillisRetry) {
        this.waitMillisRetry = waitMillisRetry;
    }

    public int getIoThreadMultiplier() {
        return this.ioThreadMultiplier;
    }

    public void setIoThreadMultiplier(int ioThreadMultiplier) {
        this.ioThreadMultiplier = ioThreadMultiplier;
    }

    public int getNumRetries() {
        return this.numRetries;
    }

    public void setNumRetries(int numRetries) {
        this.numRetries = numRetries;
    }

    private void openConnection() throws IOException, ExecutionException, InterruptedException, TimeoutException {
        AsyncHttpClientConfigBean config = new AsyncHttpClientConfigBean();
        config.setIoThreadMultiplier(this.ioThreadMultiplier);
        config.setApplicationThreadPool(Executors.newCachedThreadPool(new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName(ClassUtils.getShortClassName(this.getClass()) + "-AsyncHttpClient-" + WebSocketOutputOperator.this.count++);
                return t;
            }
        }));
        this.client = new AsyncHttpClient((AsyncHttpClientConfig)config);
        this.uri = URI.create(this.uri.toString());
        LOG.info("Opening URL: {}", (Object)this.uri);
        this.connection = (WebSocket)this.client.prepareGet(this.uri.toString()).execute((AsyncHandler)new WebSocketUpgradeHandler.Builder().addWebSocketListener((WebSocketListener)new WebSocketTextListener(){

            public void onMessage(String string) {
            }

            public void onOpen(WebSocket ws) {
                LOG.debug("Connection opened");
            }

            public void onClose(WebSocket ws) {
                LOG.debug("Connection closed.");
            }

            public void onError(Throwable t) {
                LOG.error("Caught exception", t);
            }
        }).build()).get(5L, TimeUnit.SECONDS);
    }

    public void setup(Context.OperatorContext context) {
        try {
            this.openConnection();
        }
        catch (Exception ex) {
            LOG.warn("Cannot establish connection:", (Throwable)ex);
        }
    }

    public void teardown() {
        super.teardown();
        if (this.client != null) {
            this.client.close();
        }
    }

    public String convertMapToMessage(T t) throws IOException {
        return this.mapper.writeValueAsString(t);
    }
}

