/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.io;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.common.util.PubSubMessageCodec;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.io.WebSocketOutputOperator;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.lang.reflect.Array;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceStability;

@InterfaceStability.Evolving
public class WidgetOutputOperator
extends BaseOperator {
    /**
     * Publisher used on the WebSocket path. Each tuple is a pair whose left element is the topic
     * string and whose right element is the payload; the pair is turned into a publish message by
     * {@link PubSubMessageCodec#constructPublishMessage}.
     */
    protected transient WebSocketOutputOperator<Pair<String, Object>> wsoo = new WebSocketOutputOperator<Pair<String, Object>>(){
        // Built on the mapper this anonymous subclass inherits.
        private transient PubSubMessageCodec<Object> publishCodec = new PubSubMessageCodec<Object>(this.mapper);

        @Override
        public String convertMapToMessage(Pair<String, Object> topicAndPayload) throws IOException {
            String topic = topicAndPayload.getLeft();
            Object payload = topicAndPayload.getRight();
            return PubSubMessageCodec.constructPublishMessage(topic, payload, this.publishCodec);
        }
    };
    /** Console sink that the ports forward to whenever {@code isWebSocketConnected} is false. */
    protected transient ConsoleOutputOperator coo = new ConsoleOutputOperator();
    // Default topic names, one per widget type; each port's setTopic(...) replaces its own.
    private String timeSeriesTopic = "widget.timeseries";
    private String simpleTopic = "widget.simple";
    private String percentageTopic = "widget.percentage";
    protected String topNTopic = "widget.topn";
    // Previously "widget,piechart": the comma was a typo against the "widget.<type>" pattern above.
    private String pieChartTopic = "widget.piechart";
    // Sent in the time-series schema as "maxValue" / "minValue".
    private Number timeSeriesMax = 100;
    private Number timeSeriesMin = 0;
    // Sent as "n" in the topN and pie-chart schemas respectively.
    protected int nInTopN = 10;
    private int nInPie = 5;
    // Captured from the operator context in setup(); embedded in every full topic string.
    private transient String appId = null;
    private transient int operId = 0;
    // Every port is optional and is handed this operator, whose fields it reads and writes directly.
    /** Accepts any object; on the WebSocket path the tuple is published as its {@code toString()}. */
    @InputPortFieldAnnotation(optional=true)
    public final transient SimpleInputPort simpleInput = new SimpleInputPort(this);
    /** Accepts arrays of {@link TimeSeriesData} points. */
    @InputPortFieldAnnotation(optional=true)
    public final transient TimeseriesInputPort timeSeriesInput = new TimeseriesInputPort(this);
    /** Accepts a single {@code Integer} per tuple. */
    @InputPortFieldAnnotation(optional=true)
    public final transient PercentageInputPort percentageInput = new PercentageInputPort(this);
    /** Accepts a name-to-number map per tuple. */
    @InputPortFieldAnnotation(optional=true)
    public final transient TopNInputPort topNInput = new TopNInputPort(this);
    /** Accepts a label-to-number map per tuple. */
    @InputPortFieldAnnotation(optional=true)
    public final transient PiechartInputPort pieChartInput = new PiechartInputPort(this);
    /**
     * Starts out true and is cleared by {@code setup} when no gateway address is configured;
     * ports and {@code teardown} test it to choose between {@code wsoo} and {@code coo}.
     */
    protected transient boolean isWebSocketConnected = true;

    /**
     * Chooses and initializes the output path, then records the identifiers used in topic strings.
     *
     * <p>When the context carries a non-empty {@code DAG.GATEWAY_CONNECT_ADDRESS}, the WebSocket
     * publisher is pointed at {@code ws://<address>/pubsub} and set up. Otherwise
     * {@code isWebSocketConnected} is cleared and the console operator is set up instead.
     *
     * @param context operator context supplying the gateway address, application id and operator id
     */
    public void setup(Context.OperatorContext context) {
        String gateway = (String)context.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
        if (StringUtils.isEmpty(gateway)) {
            // Nothing to publish to: every port falls back to the console operator.
            this.isWebSocketConnected = false;
            this.coo.setup(context);
        } else {
            URI pubSubEndpoint = URI.create("ws://" + gateway + "/pubsub");
            this.wsoo.setUri(pubSubEndpoint);
            this.wsoo.setup(context);
        }
        this.appId = (String)context.getValue(DAG.APPLICATION_ID);
        this.operId = context.getId();
    }

    /**
     * Builds the topic string under which a widget's data is published: the literal prefix
     * {@code AppData} followed by the serialized form of a map holding {@code appId},
     * {@code opId}, {@code topicName} and {@code schema}.
     *
     * @param topic  widget topic name, stored under {@code topicName}
     * @param schema widget schema description, stored under {@code schema}
     * @return the prefixed, serialized topic descriptor
     * @throws RuntimeException wrapping any exception raised while serializing the descriptor
     */
    protected String getFullTopic(String topic, Map<String, Object> schema) {
        Map<String, Object> descriptor = new HashMap<String, Object>();
        descriptor.put("appId", this.appId);
        descriptor.put("opId", this.operId);
        descriptor.put("topicName", topic);
        descriptor.put("schema", schema);
        String serialized;
        try {
            serialized = this.wsoo.mapper.writeValueAsString(descriptor);
        }
        catch (Exception failure) {
            throw new RuntimeException(failure);
        }
        return "AppData" + serialized;
    }

    /**
     * Tears down whichever delegate is in use: the console operator when
     * {@code isWebSocketConnected} is false, the WebSocket publisher otherwise.
     */
    public void teardown() {
        if (!this.isWebSocketConnected) {
            this.coo.teardown();
            return;
        }
        this.wsoo.teardown();
    }

    /**
     * Input port for pie-chart data: each tuple maps a slice label to its numeric value.
     *
     * <p>On the WebSocket path the map is published as an array of {@code {label, value}} maps
     * together with a schema of type {@code piechart}. On the console path the tuple is forwarded
     * unchanged, and the array is no longer built since nothing would consume it.
     */
    public static class PiechartInputPort
    extends DefaultInputPort<HashMap<String, Number>> {
        private final WidgetOutputOperator operator;

        /**
         * @param oper the owning operator whose configuration and delegates this port uses
         */
        public PiechartInputPort(WidgetOutputOperator oper) {
            this.operator = oper;
        }

        /**
         * Publishes one pie chart, or forwards the raw map to the console operator.
         *
         * @param pieNumbers slice label to slice value
         */
        public void process(HashMap<String, Number> pieNumbers) {
            if (!this.operator.isWebSocketConnected) {
                this.operator.coo.input.process(pieNumbers);
                return;
            }
            HashMap<String, Object> schemaObj = new HashMap<String, Object>();
            schemaObj.put("type", "piechart");
            // "n" is only advertised in the schema; every entry of the tuple is still sent.
            schemaObj.put("n", this.operator.nInPie);
            String fullTopic = this.operator.getFullTopic(this.operator.pieChartTopic, schemaObj);
            this.operator.wsoo.input.process(new MutablePair<String, Object>(fullTopic, PiechartInputPort.toSlices(pieNumbers)));
        }

        /** Converts the label-to-value map into one {@code {label, value}} map per entry. */
        private static HashMap<String, Object>[] toSlices(Map<String, Number> pieNumbers) {
            // Generic arrays cannot be created directly; only HashMap<String, Object> is ever stored.
            @SuppressWarnings({"unchecked", "rawtypes"})
            HashMap<String, Object>[] slices = new HashMap[pieNumbers.size()];
            int next = 0;
            for (Map.Entry<String, Number> entry : pieNumbers.entrySet()) {
                HashMap<String, Object> slice = new HashMap<String, Object>();
                slice.put("label", entry.getKey());
                slice.put("value", entry.getValue());
                slices[next++] = slice;
            }
            return slices;
        }

        /**
         * Sets the {@code n} value advertised in the pie-chart schema.
         *
         * @param n value to advertise
         * @return this port, for chaining
         */
        public PiechartInputPort setN(int n) {
            this.operator.nInPie = n;
            return this;
        }

        /**
         * Replaces the topic name used for pie-chart data.
         *
         * @param topic new topic name
         * @return this port, for chaining
         */
        public PiechartInputPort setTopic(String topic) {
            this.operator.pieChartTopic = topic;
            return this;
        }
    }

    /**
     * Input port for percentage widgets: each tuple is a single {@code Integer}, published with a
     * schema of type {@code percentage} or forwarded to the console operator.
     */
    public static class PercentageInputPort extends DefaultInputPort<Integer> {
        private final WidgetOutputOperator operator;

        /**
         * @param oper the owning operator whose configuration and delegates this port uses
         */
        public PercentageInputPort(WidgetOutputOperator oper) {
            this.operator = oper;
        }

        /**
         * Publishes the value under the percentage topic, or hands it to the console operator when
         * no WebSocket connection is in use.
         *
         * @param tuple the value to emit
         */
        public void process(Integer tuple) {
            if (!this.operator.isWebSocketConnected) {
                this.operator.coo.input.process(tuple);
                return;
            }
            HashMap<String, Object> widgetSchema = new HashMap<String, Object>();
            widgetSchema.put("type", "percentage");
            String fullTopic = this.operator.getFullTopic(this.operator.percentageTopic, widgetSchema);
            this.operator.wsoo.input.process(new MutablePair<String, Object>(fullTopic, tuple));
        }

        /**
         * Replaces the topic name used for percentage data.
         *
         * @param topic new topic name
         * @return this port, for chaining
         */
        public PercentageInputPort setTopic(String topic) {
            this.operator.percentageTopic = topic;
            return this;
        }
    }

    /**
     * Input port for simple widgets: accepts any object and publishes its {@code toString()} with
     * a schema of type {@code simple}, or forwards the object itself to the console operator.
     */
    public static class SimpleInputPort extends DefaultInputPort<Object> {
        private final WidgetOutputOperator operator;

        /**
         * @param oper the owning operator whose configuration and delegates this port uses
         */
        public SimpleInputPort(WidgetOutputOperator oper) {
            this.operator = oper;
        }

        /**
         * Publishes the tuple's string form under the simple topic, or hands the tuple to the
         * console operator when no WebSocket connection is in use.
         *
         * @param tuple the object to emit
         */
        public void process(Object tuple) {
            if (!this.operator.isWebSocketConnected) {
                this.operator.coo.input.process(tuple);
                return;
            }
            HashMap<String, Object> widgetSchema = new HashMap<String, Object>();
            widgetSchema.put("type", "simple");
            // Topic first, then the string form: same evaluation order as a single nested call.
            String fullTopic = this.operator.getFullTopic(this.operator.simpleTopic, widgetSchema);
            String text = tuple.toString();
            this.operator.wsoo.input.process(new MutablePair<String, Object>(fullTopic, text));
        }

        /**
         * Replaces the topic name used for simple data.
         *
         * @param topic new topic name
         * @return this port, for chaining
         */
        public SimpleInputPort setTopic(String topic) {
            this.operator.simpleTopic = topic;
            return this;
        }
    }

    /**
     * Input port for top-N widgets: each tuple maps an entry name to its numeric value.
     *
     * <p>On the WebSocket path the map is published as an array of {@code {name, value}} maps
     * together with a schema of type {@code topN}. On the console path the tuple is forwarded
     * unchanged, and the array is no longer built since nothing would consume it.
     */
    public static class TopNInputPort
    extends DefaultInputPort<HashMap<String, Number>> {
        private final WidgetOutputOperator operator;

        /**
         * @param oper the owning operator whose configuration and delegates this port uses
         */
        public TopNInputPort(WidgetOutputOperator oper) {
            this.operator = oper;
        }

        /**
         * Publishes one top-N list, or forwards the raw map to the console operator.
         *
         * @param topNMap entry name to entry value
         */
        public void process(HashMap<String, Number> topNMap) {
            if (!this.operator.isWebSocketConnected) {
                this.operator.coo.input.process(topNMap);
                return;
            }
            HashMap<String, Object> schemaObj = new HashMap<String, Object>();
            schemaObj.put("type", "topN");
            // "n" is only advertised in the schema; every entry of the tuple is still sent.
            schemaObj.put("n", this.operator.nInTopN);
            String fullTopic = this.operator.getFullTopic(this.operator.topNTopic, schemaObj);
            this.operator.wsoo.input.process(new MutablePair<String, Object>(fullTopic, TopNInputPort.toRows(topNMap)));
        }

        /** Converts the name-to-value map into one {@code {name, value}} map per entry. */
        private static HashMap<String, Object>[] toRows(Map<String, Number> topNMap) {
            // Generic arrays cannot be created directly; only HashMap<String, Object> is ever stored.
            @SuppressWarnings({"unchecked", "rawtypes"})
            HashMap<String, Object>[] rows = new HashMap[topNMap.size()];
            int next = 0;
            for (Map.Entry<String, Number> entry : topNMap.entrySet()) {
                HashMap<String, Object> row = new HashMap<String, Object>();
                row.put("name", entry.getKey());
                row.put("value", entry.getValue());
                rows[next++] = row;
            }
            return rows;
        }

        /**
         * Sets the {@code n} value advertised in the top-N schema.
         *
         * @param n value to advertise
         * @return this port, for chaining
         */
        public TopNInputPort setN(int n) {
            this.operator.nInTopN = n;
            return this;
        }

        /**
         * Replaces the topic name used for top-N data.
         *
         * @param topic new topic name
         * @return this port, for chaining
         */
        public TopNInputPort setTopic(String topic) {
            this.operator.topNTopic = topic;
            return this;
        }
    }

    /**
     * Input port for time-series widgets: each tuple is an array of {@link TimeSeriesData} points.
     *
     * <p>On the WebSocket path the points are published as an array of {@code {timestamp, value}}
     * maps together with a schema of type {@code timeseries} carrying {@code minValue} and
     * {@code maxValue}. On the console path the original array is forwarded unchanged, and the
     * per-point maps are no longer built since nothing would consume them.
     */
    public static class TimeseriesInputPort
    extends DefaultInputPort<TimeSeriesData[]> {
        private final WidgetOutputOperator operator;

        /**
         * @param woo the owning operator whose configuration and delegates this port uses
         */
        public TimeseriesInputPort(WidgetOutputOperator woo) {
            this.operator = woo;
        }

        /**
         * Publishes one batch of points, or forwards the raw array to the console operator.
         *
         * @param tuple the points to emit
         */
        public void process(TimeSeriesData[] tuple) {
            if (!this.operator.isWebSocketConnected) {
                this.operator.coo.input.process(tuple);
                return;
            }
            HashMap<String, Object> schemaObj = new HashMap<String, Object>();
            schemaObj.put("type", "timeseries");
            schemaObj.put("minValue", this.operator.timeSeriesMin);
            schemaObj.put("maxValue", this.operator.timeSeriesMax);
            String fullTopic = this.operator.getFullTopic(this.operator.timeSeriesTopic, schemaObj);
            this.operator.wsoo.input.process(new MutablePair<String, Object>(fullTopic, TimeseriesInputPort.toPoints(tuple)));
        }

        /** Converts each point into a {@code {timestamp, value}} map, preserving array order. */
        private static HashMap<String, Number>[] toPoints(TimeSeriesData[] tuple) {
            // Generic arrays cannot be created directly; only HashMap<String, Number> is ever stored.
            @SuppressWarnings({"unchecked", "rawtypes"})
            HashMap<String, Number>[] points = new HashMap[tuple.length];
            int next = 0;
            for (TimeSeriesData data : tuple) {
                // Same sizing as before, so the two keys keep their iteration order when serialized.
                HashMap<String, Number> point = Maps.newHashMapWithExpectedSize(2);
                point.put("timestamp", data.time);
                point.put("value", data.data);
                points[next++] = point;
            }
            return points;
        }

        /**
         * Sets the {@code maxValue} advertised in the time-series schema.
         *
         * @param max value to advertise
         * @return this port, for chaining
         */
        public TimeseriesInputPort setMax(Number max) {
            this.operator.timeSeriesMax = max;
            return this;
        }

        /**
         * Sets the {@code minValue} advertised in the time-series schema.
         *
         * @param min value to advertise
         * @return this port, for chaining
         */
        public TimeseriesInputPort setMin(Number min) {
            this.operator.timeSeriesMin = min;
            return this;
        }

        /**
         * Replaces the topic name used for time-series data.
         *
         * @param topic new topic name
         * @return this port, for chaining
         */
        public TimeseriesInputPort setTopic(String topic) {
            this.operator.timeSeriesTopic = topic;
            return this;
        }
    }

    /** One point of a time series, as consumed by {@link TimeseriesInputPort}. */
    public static class TimeSeriesData {
        /** Published under the {@code timestamp} key; the unit is not defined here. */
        public Long time;
        /** Published under the {@code value} key. */
        public Number data;
    }
}

