/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram.debug;

import com.datatorrent.api.Operator;
import com.datatorrent.api.Sink;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.api.StringCodec;
import com.datatorrent.bufferserver.packet.MessageType;
import com.datatorrent.common.codec.JsonStreamCodec;
import com.datatorrent.common.util.ObjectMapperString;
import com.datatorrent.netlet.util.Slice;
import com.datatorrent.stram.tuple.Tuple;
import com.datatorrent.stram.util.FSPartFileCollection;
import com.datatorrent.stram.util.SharedPubSubWebSocketClient;
import java.beans.BeanInfo;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TupleRecorder {
    public static final String VERSION = "1.2";
    private int totalTupleCount = 0;
    private final HashMap<String, PortInfo> portMap = new HashMap();
    private final HashMap<String, PortCount> portCountMap = new HashMap();
    private transient long currentWindowId = -1L;
    private transient ArrayList<Range> windowIdRanges = new ArrayList();
    private long startTime = -1L;
    private String id;
    private final String appId;
    private int nextPortIndex = 0;
    private final HashMap<String, Sink<Object>> sinks = new HashMap();
    private transient long endWindowTuplesProcessed = 0L;
    private transient StreamCodec<Object> streamCodec;
    private int numSubscribers = 0;
    private SharedPubSubWebSocketClient wsClient;
    private String recordingNameTopic;
    private long numWindows = Long.MAX_VALUE;
    private Runnable stopProcedure;
    private final FSPartFileCollection storage = new FSPartFileCollection(){

        @Override
        protected String getIndexExtraInfo() {
            if (TupleRecorder.this.windowIdRanges.isEmpty()) {
                return null;
            }
            ((Range)((TupleRecorder)TupleRecorder.this).windowIdRanges.get((int)(((TupleRecorder)TupleRecorder.this).windowIdRanges.size() - 1))).high = TupleRecorder.this.currentWindowId;
            String str = TupleRecorder.convertToString(TupleRecorder.this.windowIdRanges);
            int i = 0;
            str = str + ":";
            StringBuilder countStr = new StringBuilder("{");
            for (PortCount pc : TupleRecorder.this.portCountMap.values()) {
                if (i != 0) {
                    countStr.append(",");
                }
                countStr.append("\"").append(pc.id).append("\":\"").append(pc.count).append("\"");
                ++i;
            }
            countStr.append("}");
            str = str + countStr.length();
            str = str + ":" + countStr.toString();
            return str;
        }

        @Override
        protected void resetIndexExtraInfo() {
            for (PortCount pc : TupleRecorder.this.portCountMap.values()) {
                pc.count = 0L;
            }
            TupleRecorder.this.windowIdRanges.clear();
        }
    };
    private static final Logger logger = LoggerFactory.getLogger(TupleRecorder.class);

    public TupleRecorder(String id, String appId) {
        this.id = id;
        this.appId = appId;
    }

    public FSPartFileCollection getStorage() {
        return this.storage;
    }

    public RecorderSink newSink(String key) {
        RecorderSink recorderSink = new RecorderSink(key);
        this.sinks.put(key, recorderSink);
        return recorderSink;
    }

    public void setStreamCodec(StreamCodec<Object> streamCodec) {
        this.streamCodec = streamCodec;
    }

    public void setWebSocketClient(SharedPubSubWebSocketClient wsClient) {
        this.wsClient = wsClient;
    }

    public Map<String, PortInfo> getPortInfoMap() {
        return Collections.unmodifiableMap(this.portMap);
    }

    public int getTotalTupleCount() {
        return this.totalTupleCount;
    }

    public Map<String, Sink<Object>> getSinkMap() {
        return Collections.unmodifiableMap(this.sinks);
    }

    public void setStartTime(long startTime) {
        if (this.startTime != -1L) {
            throw new IllegalStateException("Tuple recorder has already started at " + this.startTime);
        }
        this.startTime = startTime;
    }

    public long getStartTime() {
        return this.startTime;
    }

    public String getId() {
        return this.id;
    }

    public void addInputPortInfo(String portName, String streamName) {
        PortInfo portInfo = new PortInfo();
        portInfo.name = portName;
        portInfo.streamName = streamName;
        portInfo.type = "input";
        portInfo.id = this.nextPortIndex++;
        this.portMap.put(portName, portInfo);
        PortCount pc = new PortCount();
        pc.id = portInfo.id;
        pc.count = 0L;
        this.portCountMap.put(portName, pc);
    }

    public void addOutputPortInfo(String portName, String streamName) {
        PortInfo portInfo = new PortInfo();
        portInfo.name = portName;
        portInfo.streamName = streamName;
        portInfo.type = "output";
        portInfo.id = this.nextPortIndex++;
        this.portMap.put(portName, portInfo);
        PortCount pc = new PortCount();
        pc.id = portInfo.id;
        pc.count = 0L;
        this.portCountMap.put(portName, pc);
    }

    public void teardown() {
        logger.info("Closing down tuple recorder.");
        this.storage.teardown();
    }

    public void setup(Operator operator, Map<Class<?>, Class<? extends StringCodec<?>>> codecs) {
        try {
            this.storage.setup();
            this.setStartTime(System.currentTimeMillis());
            if (this.id == null) {
                this.id = String.valueOf(this.startTime);
            }
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            bos.write("1.2\n".getBytes());
            RecordInfo recordInfo = new RecordInfo();
            recordInfo.startTime = this.startTime;
            recordInfo.appId = this.appId;
            this.streamCodec = new JsonStreamCodec(codecs);
            if (operator != null) {
                PropertyDescriptor[] propertyDescriptors;
                BeanInfo beanInfo = Introspector.getBeanInfo(operator.getClass());
                for (PropertyDescriptor pd : propertyDescriptors = beanInfo.getPropertyDescriptors()) {
                    String name = pd.getName();
                    Method readMethod = pd.getReadMethod();
                    if (readMethod == null) continue;
                    readMethod.setAccessible(true);
                    try {
                        Slice f = this.streamCodec.toByteArray(readMethod.invoke((Object)operator, new Object[0]));
                        recordInfo.properties.put(name, new ObjectMapperString(f.stringValue()));
                    }
                    catch (Throwable t) {
                        logger.warn("Cannot serialize property {} for operator {}", (Object)name, operator.getClass());
                        recordInfo.properties.put(name, null);
                    }
                }
            }
            Slice f = this.streamCodec.toByteArray((Object)recordInfo);
            bos.write(f.buffer, f.offset, f.length);
            bos.write("\n".getBytes());
            for (PortInfo pi : this.portMap.values()) {
                f = this.streamCodec.toByteArray((Object)pi);
                bos.write(f.buffer, f.offset, f.length);
                bos.write("\n".getBytes());
            }
            this.storage.writeMetaData(bos.toByteArray());
            if (this.wsClient != null) {
                this.recordingNameTopic = "applications." + this.appId + ".tupleRecorder." + this.getStartTime();
                this.setupWsClient();
            }
        }
        catch (Exception ex) {
            logger.error("Trouble setting up tuple recorder", (Throwable)ex);
        }
    }

    private void setupWsClient() throws ExecutionException, IOException, InterruptedException, TimeoutException {
        if (this.wsClient != null) {
            this.wsClient.addHandler(this.recordingNameTopic, true, new SharedPubSubWebSocketClient.Handler(){

                @Override
                public void onMessage(String type, String topic, Object data) {
                    TupleRecorder.this.numSubscribers = Integer.valueOf((String)data);
                    logger.info("Number of subscribers for recording started at {} is now {}", (Object)TupleRecorder.this.getStartTime(), (Object)TupleRecorder.this.numSubscribers);
                }

                @Override
                public void onClose() {
                    TupleRecorder.this.numSubscribers = 0;
                }
            });
        }
    }

    public void beginWindow(long windowId) {
        if (this.currentWindowId != windowId) {
            Range range;
            if (windowId != this.currentWindowId + 1L) {
                if (!this.windowIdRanges.isEmpty()) {
                    this.windowIdRanges.get((int)(this.windowIdRanges.size() - 1)).high = this.currentWindowId;
                }
                range = new Range();
                range.low = windowId;
                this.windowIdRanges.add(range);
            }
            if (this.windowIdRanges.isEmpty()) {
                range = new Range();
                range.low = windowId;
                this.windowIdRanges.add(range);
            }
            this.currentWindowId = windowId;
            this.endWindowTuplesProcessed = 0L;
            try {
                this.storage.writeDataItem(("B:" + System.currentTimeMillis() + ":" + windowId + "\n").getBytes(), false);
            }
            catch (IOException ex) {
                logger.error(ex.toString());
            }
        }
    }

    public void endWindow() {
        if (++this.endWindowTuplesProcessed == (long)this.portMap.size()) {
            try {
                this.storage.writeDataItem(("E:" + System.currentTimeMillis() + ":" + this.currentWindowId + "\n").getBytes(), false);
                logger.debug("Got last end window tuple.  Flushing...");
                if (!this.storage.flushData() && this.wsClient != null) {
                    this.wsClient.publish("_internal.lastIndex.tuple." + this.storage.getBasePath(), this.storage.getLatestIndexLine());
                }
            }
            catch (IOException ex) {
                logger.error("Exception caught in endWindow", (Throwable)ex);
            }
        }
        if (this.stopProcedure != null && --this.numWindows <= 0L) {
            this.stopProcedure.run();
        }
    }

    public void writeTuple(Object obj, String port) {
        if (this.windowIdRanges.isEmpty()) {
            throw new RuntimeException("Data tuples received from tuple recorder before any BEGIN_WINDOW");
        }
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            Slice f = this.streamCodec.toByteArray(obj);
            PortInfo pi = this.portMap.get(port);
            String str = "T:" + System.currentTimeMillis() + ":" + pi.id + ":" + f.length + ":";
            bos.write(str.getBytes());
            bos.write(f.buffer, f.offset, f.length);
            bos.write("\n".getBytes());
            PortCount pc = this.portCountMap.get(port);
            ++pc.count;
            this.portCountMap.put(port, pc);
            this.storage.writeDataItem(bos.toByteArray(), true);
            ++this.totalTupleCount;
            if (this.numSubscribers > 0) {
                this.publishTupleData(pi.id, obj);
            }
        }
        catch (IOException ex) {
            logger.error(ex.toString());
        }
    }

    public void writeControlTuple(Tuple tuple, String port) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            PortInfo pi = this.portMap.get(port);
            Slice f = this.streamCodec.toByteArray((Object)tuple);
            String str = "C:" + System.currentTimeMillis() + ":" + pi.id + ":" + f.length + ":";
            bos.write(str.getBytes());
            bos.write(f.buffer, f.offset, f.length);
            bos.write("\n".getBytes());
            this.storage.writeDataItem(bos.toByteArray(), false);
        }
        catch (IOException ex) {
            logger.error(ex.toString());
        }
    }

    private static String convertToString(List<Range> ranges) {
        String result = "";
        int i = 0;
        for (Range range : ranges) {
            if (i++ > 0) {
                result = result + ",";
            }
            result = result + String.valueOf(range.low);
            result = result + "-";
            result = result + String.valueOf(range.high);
        }
        return result;
    }

    private void publishTupleData(int portId, Object obj) {
        try {
            if (this.wsClient != null && this.wsClient.isConnectionOpen()) {
                HashMap<String, Object> map = new HashMap<String, Object>();
                map.put("portId", String.valueOf(portId));
                map.put("windowId", this.currentWindowId);
                map.put("tupleCount", this.totalTupleCount);
                map.put("data", obj);
                this.wsClient.publish(this.recordingNameTopic, map);
            }
        }
        catch (Exception ex) {
            logger.warn("Error publishing tuple data", (Throwable)ex);
        }
    }

    public void setNumWindows(long numWindows, Runnable stopProcedure) {
        this.numWindows = numWindows;
        this.stopProcedure = stopProcedure;
    }

    public class RecorderSink
    implements Sink<Object> {
        private final String portName;
        private int count;

        public RecorderSink(String portName) {
            this.portName = portName;
        }

        public void put(Object payload) {
            ++this.count;
            if (payload instanceof Tuple) {
                Tuple tuple = (Tuple)payload;
                MessageType messageType = tuple.getType();
                if (messageType == MessageType.BEGIN_WINDOW) {
                    TupleRecorder.this.beginWindow(tuple.getWindowId());
                }
                TupleRecorder.this.writeControlTuple(tuple, this.portName);
                if (messageType == MessageType.END_WINDOW) {
                    TupleRecorder.this.endWindow();
                }
            } else {
                TupleRecorder.this.writeTuple(payload, this.portName);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public int getCount(boolean reset) {
            try {
                int n = this.count;
                return n;
            }
            finally {
                if (reset) {
                    this.count = 0;
                }
            }
        }
    }

    public static class Range {
        public long low = -1L;
        public long high = -1L;

        public Range() {
        }

        public Range(long low, long high) {
            this.low = low;
            this.high = high;
        }

        public String toString() {
            return "[" + String.valueOf(this.low) + "," + String.valueOf(this.high) + "]";
        }
    }

    public static class RecordInfo {
        public long startTime;
        public String appId;
        public Map<String, ObjectMapperString> properties = new HashMap<String, ObjectMapperString>();
    }

    public static class PortCount {
        public int id;
        public long count;
    }

    public static class PortInfo {
        public String name;
        public String streamName;
        public String type;
        public int id;
    }
}

