/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.io;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.experimental.AppData;
import com.datatorrent.lib.appdata.StoreUtils;
import com.datatorrent.lib.appdata.query.WindowBoundedService;
import com.datatorrent.lib.io.PubSubWebSocketInputOperator;
import java.net.URI;
import java.net.URISyntaxException;
import javax.validation.constraints.Min;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link PubSubWebSocketInputOperator} specialization that emits {@code String} tuples
 * extracted from the {@code "data"} field of incoming pubsub JSON messages.
 *
 * <p>When embedded mode is enabled via {@link #enableEmbeddedMode()} before
 * {@link #setup(Context.OperatorContext)}, a {@link WindowBoundedService} is created around a
 * {@link StoreUtils.BufferingOutputPortFlusher} for the output port and is driven by this
 * operator's window callbacks.
 */
public class PubSubWebSocketAppDataQuery
extends PubSubWebSocketInputOperator<String>
implements AppData.ConnectionInfoProvider,
AppData.EmbeddableQueryInfoProvider<String> {
    private static final Logger logger = LoggerFactory.getLogger(PubSubWebSocketAppDataQuery.class);
    private static final long serialVersionUID = 201506121124L;

    /** Default value of {@link #getExecuteIntervalMillis()}, in milliseconds. */
    public static final long DEFAULT_EXECUTE_INTERVAL_MILLIS = 10L;

    /** Set by {@link #enableEmbeddedMode()}; when true, setup creates the window bounded service. */
    private boolean useEmitThread;

    /** Interval handed to the {@link WindowBoundedService} constructor; must be non-negative. */
    @Min(value=0L)
    private long executeIntervalMillis = DEFAULT_EXECUTE_INTERVAL_MILLIS;

    /** Created in setup only when embedded mode is enabled; otherwise stays {@code null}. */
    private transient WindowBoundedService windowBoundedService;

    /**
     * Creates the operator with the inherited {@code skipNull} flag turned on.
     */
    public PubSubWebSocketAppDataQuery() {
        // NOTE(review): presumably makes the parent drop the null results that
        // convertMessage returns for rejected messages - confirm against the superclass.
        this.skipNull = true;
    }

    /**
     * Resolves the connection URI (falling back to the gateway address when none is set),
     * runs the superclass setup, and, in embedded mode, creates and sets up the
     * {@link WindowBoundedService}.
     *
     * @param context the operator context supplied by the platform
     * @throws IllegalArgumentException if no URI is set and no gateway address is configured
     */
    @Override
    public void setup(Context.OperatorContext context) {
        this.setUri(PubSubWebSocketAppDataQuery.uriHelper(context, this.getUri()));
        logger.debug("Setting up:\nuri:{}\ntopic:{}", this.getUri(), this.getTopic());
        super.setup(context);
        if (this.useEmitThread) {
            this.windowBoundedService = new WindowBoundedService(this.executeIntervalMillis, new StoreUtils.BufferingOutputPortFlusher(this.outputPort));
            this.windowBoundedService.setup(context);
        }
    }

    /**
     * Forwards the window start to the superclass and then to the window bounded service, if any.
     *
     * @param windowId the id of the window that is starting
     */
    @Override
    public void beginWindow(long windowId) {
        super.beginWindow(windowId);
        if (this.windowBoundedService != null) {
            this.windowBoundedService.beginWindow(windowId);
        }
    }

    /**
     * Forwards the window end to the window bounded service, if any, and then to the superclass.
     */
    @Override
    public void endWindow() {
        if (this.windowBoundedService != null) {
            this.windowBoundedService.endWindow();
        }
        super.endWindow();
    }

    /**
     * Tears down the window bounded service, if any, and then the superclass.
     */
    @Override
    public void teardown() {
        try {
            if (this.windowBoundedService != null) {
                this.windowBoundedService.teardown();
            }
        } finally {
            // Always run the superclass teardown, even if the service teardown throws.
            super.teardown();
        }
    }

    /**
     * Returns the given URI, or, when it is {@code null}, a {@code ws://<address>/pubsub} URI
     * built from the {@code DAG.GATEWAY_CONNECT_ADDRESS} attribute of the context.
     *
     * @param context the operator context used to look up the gateway address
     * @param uri the explicitly configured URI, may be {@code null}
     * @return {@code uri} when non-null, otherwise the URI derived from the gateway address
     * @throws IllegalArgumentException if {@code uri} is {@code null} and the gateway address
     *         attribute is not defined
     * @throws RuntimeException wrapping a {@link URISyntaxException} if the derived URI is invalid
     */
    public static URI uriHelper(Context.OperatorContext context, URI uri) {
        if (uri != null) {
            return uri;
        }
        // Read the attribute once instead of once for the check and once for the concatenation.
        String gatewayAddress = context.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
        if (gatewayAddress == null) {
            throw new IllegalArgumentException("The uri property is not set and the dt.attr.GATEWAY_CONNECT_ADDRESS is not defined");
        }
        try {
            return new URI("ws://" + gatewayAddress + "/pubsub");
        }
        catch (URISyntaxException ex) {
            throw new RuntimeException(ex);
        }
    }

    /**
     * Returns the URI held by the superclass.
     *
     * @return the configured URI, possibly {@code null} before setup
     */
    @Override
    public URI getUri() {
        return super.getUri();
    }

    /**
     * Stores the URI in the superclass.
     *
     * @param uri the URI to connect to
     */
    @Override
    public void setUri(URI uri) {
        super.setUri(uri);
    }

    /**
     * Extracts the {@code "data"} field from a pubsub JSON message.
     *
     * <p>The message is rejected (result {@code null}) when it is not parseable JSON, when its
     * first level contains any key other than {@code "data"}, {@code "topic"} or {@code "type"},
     * or when the {@code "data"} field is missing.
     *
     * @param message the raw pubsub message text
     * @return the value of the {@code "data"} field, or {@code null} if the message is rejected
     */
    @Override
    protected String convertMessage(String message) {
        try {
            JSONObject jo = new JSONObject(message);
            // Jettison returns null (not an empty array) from names() for an empty object,
            // so guard the loop; the missing "data" key is then reported by getString below.
            JSONArray names = jo.names();
            if (names != null) {
                for (int keyIndex = 0; keyIndex < names.length(); ++keyIndex) {
                    String key = names.getString(keyIndex);
                    boolean allowed = "data".equals(key) || "topic".equals(key) || "type".equals(key);
                    if (!allowed) {
                        logger.error("{} is not a valid key in the first level of the following pubsub message:\n{}", key, message);
                        return null;
                    }
                }
            }
            return jo.getString("data");
        }
        catch (JSONException e) {
            // Best effort: a malformed message is dropped, but leave a trace for diagnosis.
            logger.warn("Ignoring pubsub message that could not be converted:\n" + message, e);
            return null;
        }
    }

    /**
     * Returns the constant App Data URL identifier used by this operator.
     *
     * @return the string {@code "pubsub"}
     */
    public String getAppDataURL() {
        return "pubsub";
    }

    /**
     * Returns the output port on which converted messages are emitted.
     *
     * @return this operator's output port
     */
    public DefaultOutputPort<String> getOutputPort() {
        return this.outputPort;
    }

    /**
     * Switches the operator to embedded mode; takes effect at the next
     * {@link #setup(Context.OperatorContext)} call, which then creates the window bounded service.
     */
    public void enableEmbeddedMode() {
        this.useEmitThread = true;
    }

    /**
     * Returns the interval passed to the window bounded service.
     *
     * @return the execute interval in milliseconds
     */
    public long getExecuteIntervalMillis() {
        return this.executeIntervalMillis;
    }

    /**
     * Sets the interval passed to the window bounded service; read during setup.
     *
     * @param executeIntervalMillis the execute interval in milliseconds, constrained to be
     *        non-negative by the {@code @Min(0)} annotation on the field
     */
    public void setExecuteIntervalMillis(long executeIntervalMillis) {
        this.executeIntervalMillis = executeIntervalMillis;
    }
}

