/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram.client;

import com.datatorrent.common.util.ObjectMapperString;
import com.datatorrent.stram.client.FSPartFileAgent;
import com.datatorrent.stram.client.StramAgent;
import com.datatorrent.stram.client.WebServicesVersionConversion;
import com.datatorrent.stram.debug.TupleRecorder;
import com.datatorrent.stram.util.WebServicesClient;
import com.sun.jersey.api.client.WebResource;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.xml.bind.annotation.XmlType;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.mutable.MutableLong;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import org.codehaus.jackson.map.ser.std.ToStringSerializer;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class RecordingsAgent
extends FSPartFileAgent {
    private static final Logger LOG = LoggerFactory.getLogger(RecordingsAgent.class);
    private static final long MAX_LIMIT_TUPLES = 1000L;

    /**
     * Creates a recordings agent and hands {@code stramAgent} to the {@link FSPartFileAgent}
     * constructor. The methods of this class use it (through {@code this.stramAgent}) to resolve
     * application paths, open files and issue web requests.
     *
     * @param stramAgent agent passed through to the superclass
     */
    public RecordingsAgent(StramAgent stramAgent) {
        super(stramAgent);
    }

    /**
     * Returns the directory that holds every recording of one operator.
     *
     * @param appId application id
     * @param opId operator id, used as the name of the sub-directory
     * @return {@code <recordings directory>/<opId>}, or {@code null} when the recordings directory
     *         of the application cannot be resolved
     */
    public String getRecordingsDirectory(String appId, String opId) {
        String recordingsDir = this.getRecordingsDirectory(appId);
        // Plain concatenation would turn a null directory into the bogus path "null/<opId>" and
        // defeat the null checks callers perform on the result.
        if (recordingsDir == null) {
            return null;
        }
        return recordingsDir + "/" + opId;
    }

    /**
     * Returns the {@code recordings} sub-directory of the application path.
     *
     * @param appId application id handed to {@code stramAgent.getAppPath}
     * @return {@code <app path>/recordings}, or {@code null} when no application path is available
     */
    public String getRecordingsDirectory(String appId) {
        final String applicationPath = this.stramAgent.getAppPath(appId);
        return applicationPath == null ? null : applicationPath + "/" + "recordings";
    }

    /**
     * Returns the directory of a single recording.
     *
     * @param appId application id
     * @param opId operator id
     * @param id recording id, used as the name of the sub-directory
     * @return {@code <operator recordings directory>/<id>}, or {@code null} when
     *         {@link #getRecordingsDirectory(String, String)} returns {@code null}
     */
    public String getRecordingDirectory(String appId, String opId, String id) {
        final String operatorDir = this.getRecordingsDirectory(appId, opId);
        if (operatorDir == null) {
            return null;
        }
        return operatorDir + "/" + id;
    }

    /**
     * Parses one line of a recording index file.
     *
     * <p>A line starting with {@code E} is reported as the end line. Any other line is read as
     * colon-separated fields, starting at character offset 2:
     * {@code <part file>:<from>-<to>:<tuple count>[:T:<low>-<high>[,<low>-<high>...]:<size>:<json>]}
     * where {@code <json>} is {@code <size>} characters long and maps a port index to the number
     * of tuples recorded for that port.
     *
     * @param line raw index line
     * @return the parsed line; window ranges and per-port counts stay empty when the optional
     *         {@code T:} section is absent
     * @throws JSONException when the per-port tuple count section is not valid JSON
     */
    @Override
    protected RecordingsIndexLine parseIndexLine(String line) throws JSONException {
        RecordingsIndexLine info = new RecordingsIndexLine();
        if (line.startsWith("E")) {
            info.isEndLine = true;
            return info;
        }
        line = line.trim();
        info.windowIdRanges = new ArrayList<TupleRecorder.Range>();
        info.portTupleCount = new HashMap<String, MutableLong>();

        // Part file name; the first two characters of the line are skipped.
        int cursor = 2;
        int cursor2 = line.indexOf(':', cursor);
        info.partFile = line.substring(cursor, cursor2);

        // Time range "<from>-<to>".
        cursor = cursor2 + 1;
        cursor2 = line.indexOf(':', cursor);
        String[] timeRange = line.substring(cursor, cursor2).split("-");
        info.fromTime = Long.parseLong(timeRange[0]);
        info.toTime = Long.parseLong(timeRange[1]);

        // Total tuple count; it may be the last field of the line.
        cursor = cursor2 + 1;
        cursor2 = line.indexOf(':', cursor);
        if (cursor2 < 0) {
            info.tupleCount = Long.parseLong(line.substring(cursor));
            return info;
        }
        info.tupleCount = Long.parseLong(line.substring(cursor, cursor2));

        // Optional "T:" section. startsWith(prefix, offset) simply answers false when fewer than
        // two characters are left, whereas substring(cursor, cursor + 2) would throw.
        cursor = cursor2 + 1;
        if (!line.startsWith("T:", cursor)) {
            return info;
        }
        cursor += 2;
        cursor2 = line.indexOf(':', cursor);
        for (String windowRange : line.substring(cursor, cursor2).split(",")) {
            String[] lowHigh = windowRange.split("-");
            long low = Long.parseLong(lowHigh[0]);
            long high = Long.parseLong(lowHigh[1]);
            info.windowIdRanges.add(new TupleRecorder.Range(low, high));
        }

        // Length-prefixed JSON object with the tuple count of every port.
        cursor = cursor2 + 1;
        cursor2 = line.indexOf(':', cursor);
        int size = Integer.parseInt(line.substring(cursor, cursor2));
        cursor = cursor2 + 1;
        JSONObject json = new JSONObject(line.substring(cursor, cursor + size));
        Iterator<?> keys = json.keys();
        while (keys.hasNext()) {
            String portIndex = (String)keys.next();
            long tupleCount = json.getLong(portIndex);
            MutableLong total = info.portTupleCount.get(portIndex);
            if (total == null) {
                info.portTupleCount.put(portIndex, new MutableLong(tupleCount));
            } else {
                total.add(tupleCount);
            }
        }
        return info;
    }

    /**
     * Asks the application's {@code physicalPlan/containers} web resource for its containers and
     * collects the ids of those whose {@code state} is {@code ACTIVE}.
     *
     * <p>Any failure is logged as a warning; the ids gathered up to that point (possibly none)
     * are returned.
     *
     * @param appId application id
     * @return ids of the active containers, never {@code null}
     */
    private Set<String> getRunningContainerIds(String appId) {
        final HashSet<String> activeIds = new HashSet<String>();
        try {
            final WebServicesClient client = new WebServicesClient();
            final JSONObject reply = this.stramAgent.issueStramWebGetRequest(client, appId, "physicalPlan/containers");
            final Object payload = reply.get("containers");
            final JSONArray containerList;
            if (payload instanceof JSONArray) {
                containerList = (JSONArray)payload;
            } else {
                // A value that is not an array is wrapped so the loop below handles both shapes.
                containerList = new JSONArray();
                containerList.put(payload);
            }
            final int count = containerList.length();
            for (int idx = 0; idx < count; ++idx) {
                final JSONObject entry = containerList.getJSONObject(idx);
                if (entry.getString("state").equals("ACTIVE")) {
                    activeIds.add(entry.getString("id"));
                }
            }
        }
        catch (Exception ex) {
            LOG.warn("Error {} getting running containers for {}. Assuming no containers are running.", ex.getMessage(), appId);
        }
        return activeIds;
    }

    /**
     * Lists the recordings of every operator of an application.
     *
     * <p>Each sub-directory of the application's recordings directory is taken as an operator id.
     * The set of running containers is requested once, when the first operator directory is
     * found, and shared by all operators instead of being requested again for each of them.
     *
     * @param appId application id
     * @return the recordings found; empty when the recordings directory is unknown, is not a
     *         directory, or cannot be listed (the latter is logged as a warning)
     */
    public List<RecordingInfo> getRecordingInfo(String appId) {
        List<RecordingInfo> result = new ArrayList<RecordingInfo>();
        String dir = this.getRecordingsDirectory(appId);
        if (dir == null) {
            return result;
        }
        Path path = new Path(dir);
        try {
            FileStatus fileStatus = this.stramAgent.getFileSystem().getFileStatus(path);
            if (!fileStatus.isDirectory()) {
                return result;
            }
            // Fetched lazily so that no web request is made when there is nothing to report.
            Set<String> containers = null;
            RemoteIterator<LocatedFileStatus> ri = this.stramAgent.getFileSystem().listLocatedStatus(path);
            while (ri.hasNext()) {
                LocatedFileStatus lfs = ri.next();
                if (!lfs.isDirectory()) {
                    continue;
                }
                if (containers == null) {
                    containers = this.getRunningContainerIds(appId);
                }
                try {
                    String opId = lfs.getPath().getName();
                    result.addAll(this.getRecordingInfoHelper(appId, opId, containers));
                }
                catch (NumberFormatException ignored) {
                    // Deliberately skipped: one unreadable operator directory must not hide the
                    // recordings of the other operators.
                }
            }
        }
        catch (IOException ex) {
            LOG.warn("Cannot get recording info for app id {}: {}", (Object)appId, (Object)ex);
            return result;
        }
        return result;
    }

    /**
     * Lists the recordings of one operator, using the currently running containers of the
     * application as reference.
     *
     * @param appId application id
     * @param opId operator id
     * @return the recordings found for the operator
     */
    public List<RecordingInfo> getRecordingInfo(String appId, String opId) {
        return this.getRecordingInfoHelper(appId, opId, this.getRunningContainerIds(appId));
    }

    /**
     * Collects the recordings stored below one operator's recordings directory. Every
     * sub-directory is treated as a recording id; recordings that cannot be read are left out.
     *
     * @param appId application id
     * @param opId operator id
     * @param containers ids of the running containers, forwarded to the per-recording helper
     * @return the recordings found; empty when the directory is unknown, is not a directory, or
     *         cannot be listed (the latter is logged as a warning)
     */
    private List<RecordingInfo> getRecordingInfoHelper(String appId, String opId, Set<String> containers) {
        final List<RecordingInfo> recordings = new ArrayList<RecordingInfo>();
        final String operatorDir = this.getRecordingsDirectory(appId, opId);
        if (operatorDir == null) {
            return recordings;
        }
        final Path operatorPath = new Path(operatorDir);
        try {
            if (!this.stramAgent.getFileSystem().getFileStatus(operatorPath).isDirectory()) {
                return recordings;
            }
            final RemoteIterator<LocatedFileStatus> entries = this.stramAgent.getFileSystem().listLocatedStatus(operatorPath);
            while (entries.hasNext()) {
                final LocatedFileStatus entry = entries.next();
                if (entry.isDirectory()) {
                    try {
                        final RecordingInfo recording = this.getRecordingInfoHelper(appId, opId, entry.getPath().getName(), containers);
                        if (recording != null) {
                            recordings.add(recording);
                        }
                    }
                    catch (NumberFormatException numberFormatException) {
                        // entry is skipped
                    }
                }
            }
        }
        catch (IOException ex) {
            LOG.warn("Cannot get recording info for app id {}: {}", (Object)appId, (Object)ex);
        }
        return recordings;
    }

    /**
     * Loads the description of a single recording, using the currently running containers of the
     * application as reference.
     *
     * @param appId application id
     * @param opId operator id
     * @param id recording id
     * @return the recording description as produced by the per-recording helper
     */
    public RecordingInfo getRecordingInfo(String appId, String opId, String id) {
        return this.getRecordingInfoHelper(appId, opId, id, this.getRunningContainerIds(appId));
    }

    /**
     * Builds the description of one recording from its {@code meta.txt} and {@code index.txt}.
     *
     * <p>{@code meta.txt} must start with the version line {@code 1.2}, followed by one JSON line
     * with the recording header ({@code startTime}, optional {@code containerId} and
     * {@code properties}) and one JSON line per port. {@code index.txt} supplies the tuple counts
     * and window id ranges; consecutive ranges are merged into one.
     *
     * <p>The recording is flagged as ended when the index contains an end line, or when its
     * container id is not blank and not part of {@code containers}.
     *
     * @param appId application id
     * @param opId operator id
     * @param id recording id
     * @param containers ids of the containers that are currently running
     * @return the recording description, or {@code null} when anything goes wrong (the cause is
     *         logged as a warning)
     */
    private RecordingInfo getRecordingInfoHelper(String appId, String opId, String id, Set<String> containers) {
        RecordingInfo info = new RecordingInfo();
        info.id = id;
        info.appId = appId;
        info.operatorId = opId;
        BufferedReader br = null;
        FSPartFileAgent.IndexFileBufferedReader ifbr = null;
        try {
            String dir = this.getRecordingDirectory(appId, opId, id);
            if (dir == null) {
                throw new Exception("recording directory is null");
            }
            Path path = new Path(dir);
            FileStatus fileStatus = this.stramAgent.getFileSystem().getFileStatus(path);
            if (!fileStatus.isDirectory()) {
                throw new Exception(path + " is not a directory");
            }

            // --- meta.txt: version line, header line, then one line per port ---
            br = new BufferedReader(new InputStreamReader((InputStream)this.stramAgent.getFileSystem().open(new Path(dir, "meta.txt"))));
            String line = br.readLine();
            // Constant-first comparison so an empty file is reported as an unexpected line rather
            // than as a NullPointerException.
            if (!"1.2".equals(line)) {
                throw new Exception("Unexpected line: " + line);
            }
            line = br.readLine();
            if (line == null) {
                throw new Exception("meta.txt ends after the version line");
            }
            JSONObject header = new JSONObject(line);
            info.startTime = header.getLong("startTime");
            info.containerId = header.optString("containerId");
            info.properties = new HashMap<String, Object>();
            if (!StringUtils.isBlank((String)info.containerId) && !containers.contains(info.containerId)) {
                info.ended = true;
            }
            JSONObject properties = header.optJSONObject("properties");
            if (properties != null) {
                Iterator<?> keys = properties.keys();
                while (keys.hasNext()) {
                    String key = (String)keys.next();
                    String strValue = properties.isNull(key) ? null : properties.optString(key);
                    info.properties.put(key, strValue != null ? strValue : new ObjectMapperString(properties.get(key).toString()));
                }
            }
            Map<String, PortInfo> portMap = new HashMap<String, PortInfo>();
            info.ports = new ArrayList<PortInfo>();
            while ((line = br.readLine()) != null) {
                JSONObject portJson = new JSONObject(line);
                PortInfo portInfo = new PortInfo();
                portInfo.id = portJson.getInt("id");
                portInfo.name = portJson.getString("name");
                portInfo.type = portJson.getString("type");
                portInfo.streamName = portJson.getString("streamName");
                info.ports.add(portInfo);
                portMap.put(String.valueOf(portInfo.id), portInfo);
            }

            // --- index.txt: tuple counts and window id ranges ---
            ifbr = new FSPartFileAgent.IndexFileBufferedReader(this, new InputStreamReader((InputStream)this.stramAgent.getFileSystem().open(new Path(dir, "index.txt"))), dir);
            info.windowIdRanges = new ArrayList<TupleRecorder.Range>();
            long prevHiWindowId = -1L;
            RecordingsIndexLine indexLine;
            while ((indexLine = (RecordingsIndexLine)ifbr.readIndexLine()) != null) {
                if (indexLine.isEndLine) {
                    info.ended = true;
                    continue;
                }
                info.totalTuples += indexLine.tupleCount;
                for (Map.Entry<String, MutableLong> entry : indexLine.portTupleCount.entrySet()) {
                    PortInfo portInfo = portMap.get(entry.getKey());
                    if (portInfo == null) {
                        throw new Exception("port info does not exist for " + entry.getKey());
                    }
                    portInfo.tupleCount += entry.getValue().longValue();
                }
                for (TupleRecorder.Range r : indexLine.windowIdRanges) {
                    if (info.windowIdRanges.isEmpty()) {
                        TupleRecorder.Range first = new TupleRecorder.Range();
                        first.low = r.low;
                        info.windowIdRanges.add(first);
                    } else if (prevHiWindowId + 1L != r.low) {
                        // Gap in the window ids: close the open range and start a new one.
                        info.windowIdRanges.get(info.windowIdRanges.size() - 1).high = prevHiWindowId;
                        TupleRecorder.Range next = new TupleRecorder.Range();
                        next.low = r.low;
                        info.windowIdRanges.add(next);
                    }
                    prevHiWindowId = r.high;
                }
            }
            // The last range is still open; close it with the highest window id seen.
            if (!info.windowIdRanges.isEmpty()) {
                info.windowIdRanges.get(info.windowIdRanges.size() - 1).high = prevHiWindowId;
            }
        }
        catch (Exception ex) {
            LOG.warn("Cannot get recording info for app id {}: {}", (Object)appId, (Object)ex);
            return null;
        }
        finally {
            IOUtils.closeQuietly(ifbr);
            IOUtils.closeQuietly(br);
        }
        return info;
    }

    /**
     * Reads recorded tuples selected by time; delegates to the internal query with
     * {@code QueryType.TIME}.
     *
     * @param appId application id
     * @param opId operator id
     * @param id recording id
     * @param fromTime lower bound handed to the query
     * @param toTime upper bound handed to the query
     * @param limit tuple limit handed to the query
     * @param ports port indexes to include; {@code null} or empty selects every port
     * @return the tuples found, or {@code null} when the query fails
     */
    public TuplesInfo getTuplesInfoByTime(String appId, String opId, String id, long fromTime, long toTime, long limit, String[] ports) {
        final QueryType mode = QueryType.TIME;
        return this.getTuplesInfo(appId, opId, id, fromTime, toTime, limit, ports, mode);
    }

    /**
     * Reads recorded tuples selected by tuple offset; delegates to the internal query with
     * {@code QueryType.OFFSET}.
     *
     * @param appId application id
     * @param opId operator id
     * @param id recording id
     * @param offset lower bound handed to the query
     * @param limit tuple limit handed to the query
     * @param ports port indexes to include; {@code null} or empty selects every port
     * @return the tuples found, or {@code null} when the query fails
     */
    public TuplesInfo getTuplesInfoByOffset(String appId, String opId, String id, long offset, long limit, String[] ports) {
        // The upper bound is passed as 0 for this query type.
        final long noUpperBound = 0L;
        return this.getTuplesInfo(appId, opId, id, offset, noUpperBound, limit, ports, QueryType.OFFSET);
    }

    /**
     * Reads recorded tuples selected by window id; delegates to the internal query with
     * {@code QueryType.WINDOW}.
     *
     * @param appId application id
     * @param opId operator id
     * @param id recording id
     * @param startWindow lower bound handed to the query
     * @param limit tuple limit handed to the query
     * @param ports port indexes to include; {@code null} or empty selects every port
     * @return the tuples found, or {@code null} when the query fails
     */
    public TuplesInfo getTuplesInfoByWindow(String appId, String opId, String id, long startWindow, long limit, String[] ports) {
        // The upper bound is passed as 0 for this query type.
        final long noUpperBound = 0L;
        return this.getTuplesInfo(appId, opId, id, startWindow, noUpperBound, limit, ports, QueryType.WINDOW);
    }

    /**
     * Reads tuples of one recording, selected by offset, window id or time.
     *
     * <p>{@code index.txt} is scanned to find the first part file that can contain {@code low};
     * from there on every part file is read until {@code limit} tuples were collected or, for
     * time queries, a timestamp above {@code high} was seen. When the index is exhausted first,
     * the part file following the last one read is tried as well, on a best-effort basis.
     *
     * @param appId application id
     * @param opId operator id
     * @param id recording id
     * @param low first offset, window id or timestamp to return, depending on {@code queryType}
     * @param high last timestamp to return; only consulted for {@code QueryType.TIME}
     * @param limit maximum number of tuples to return
     * @param ports port indexes to include; {@code null} or empty selects every port
     * @param queryType how {@code low} and {@code high} are interpreted
     * @return the tuples found, or {@code null} when the recording directory is unknown or reading
     *         fails (the failure is logged as a warning)
     */
    private TuplesInfo getTuplesInfo(String appId, String opId, String id, long low, long high, long limit, String[] ports, QueryType queryType) {
        TuplesInfo info = new TuplesInfo();
        info.startOffset = -1L;
        String dir = this.getRecordingDirectory(appId, opId, id);
        if (dir == null) {
            return null;
        }
        try (FSPartFileAgent.IndexFileBufferedReader ifbr = new FSPartFileAgent.IndexFileBufferedReader(this, new InputStreamReader((InputStream)this.stramAgent.getFileSystem().open(new Path(dir, "index.txt"))), dir)) {
            RecordingsIndexLine indexLine;
            long currentOffset = 0L;
            boolean readPartFile = false;
            MutableLong numRemainingTuples = new MutableLong(limit);
            MutableLong currentTimestamp = new MutableLong();
            String lastProcessPartFile = null;
            while ((indexLine = (RecordingsIndexLine)ifbr.readIndexLine()) != null) {
                if (indexLine.isEndLine) {
                    continue;
                }
                MutableLong currentWindowLow = new MutableLong();
                MutableLong currentWindowHigh = new MutableLong();

                // Number of tuples this part file contributes for the requested ports.
                long numTuples = 0L;
                if (ports == null || ports.length == 0) {
                    numTuples = indexLine.tupleCount;
                } else {
                    for (String port : ports) {
                        if (indexLine.portTupleCount.containsKey(port)) {
                            numTuples += indexLine.portTupleCount.get(port).longValue();
                        } else {
                            LOG.warn("Port index {} is not found, ignoring...", (Object)port);
                        }
                    }
                }
                currentWindowLow.setValue(indexLine.windowIdRanges.get(0).low);
                currentWindowHigh.setValue(indexLine.windowIdRanges.get(indexLine.windowIdRanges.size() - 1).high);

                // Until the first matching part file is found, decide whether this one is it.
                if (!readPartFile) {
                    if (queryType == QueryType.WINDOW) {
                        if (currentWindowLow.longValue() > low) {
                            break;
                        }
                        if (low <= currentWindowHigh.longValue()) {
                            readPartFile = true;
                        }
                    } else if (queryType == QueryType.OFFSET) {
                        if (currentOffset + numTuples > low) {
                            readPartFile = true;
                        }
                    } else {
                        if (indexLine.fromTime > low) {
                            break;
                        }
                        if (low <= indexLine.toTime) {
                            readPartFile = true;
                        }
                    }
                }
                if (readPartFile) {
                    lastProcessPartFile = indexLine.partFile;
                    try (BufferedReader partBr = new BufferedReader(new InputStreamReader((InputStream)this.stramAgent.getFileSystem().open(new Path(dir, indexLine.partFile))))) {
                        this.processPartFile(partBr, queryType, low, high, limit, ports, numRemainingTuples, currentTimestamp, currentWindowLow, currentOffset, info);
                    }
                }
                // The running offset has to advance for skipped part files too; otherwise an
                // offset beyond the first part file is never reached and the offsets handed to
                // processPartFile are not absolute.
                currentOffset += numTuples;
                if (numRemainingTuples.longValue() <= 0L || (queryType == QueryType.TIME && currentTimestamp.longValue() > high)) {
                    return info;
                }
            }

            // Index exhausted: try the part file after the last one read. Failures here are
            // ignored on purpose and the tuples collected so far are returned.
            BufferedReader partBr = null;
            try {
                String extraPartFile = RecordingsAgent.getNextPartFile(lastProcessPartFile);
                if (extraPartFile != null) {
                    partBr = new BufferedReader(new InputStreamReader((InputStream)this.stramAgent.getFileSystem().open(new Path(dir, extraPartFile))));
                    this.processPartFile(partBr, queryType, low, high, limit, ports, numRemainingTuples, currentTimestamp, new MutableLong(), currentOffset, info);
                }
            }
            catch (Exception ignored) {
                // best effort, see above
            }
            finally {
                IOUtils.closeQuietly(partBr);
            }
            return info;
        }
        catch (Exception ex) {
            LOG.warn("Cannot get recording tuple info for app id {}: {}", (Object)appId, (Object)ex);
            return null;
        }
    }

    /**
     * Scans one part file and appends the tuples that match the query to {@code info}.
     *
     * <p>Two kinds of lines are understood; everything else is skipped:
     * <ul>
     * <li>{@code B:<timestamp>:<window id>} updates {@code currentTimestamp} and
     * {@code currentWindowLow}; once at least one tuple was collected it also opens a new, empty
     * window entry in {@code info}.</li>
     * <li>{@code T:<timestamp>:<port>:<size>:<data>} is a tuple. Tuples of ports that were not
     * requested are ignored and do not advance the offset.</li>
     * </ul>
     * Scanning stops when a matching tuple is met while {@code numRemainingTuples} is exhausted.
     *
     * @param partBr reader positioned at the start of the part file
     * @param queryType selects whether {@code low} is compared with the window id, the tuple
     *        offset or the timestamp
     * @param low lower bound of the query
     * @param high not used by this method
     * @param limit original tuple limit; compared with {@code numRemainingTuples} to tell whether
     *        any tuple was collected yet
     * @param ports port indexes to include; {@code null} or empty selects every port
     * @param numRemainingTuples number of tuples that may still be collected; decremented here
     * @param currentTimestamp updated with the timestamp of every line read
     * @param currentWindowLow updated with the window id of every {@code B:} line read
     * @param currentOffset offset of the first tuple of this part file
     * @param info result being filled
     * @throws IOException when reading from {@code partBr} fails
     */
    private void processPartFile(BufferedReader partBr, QueryType queryType, long low, long high, long limit, String[] ports, MutableLong numRemainingTuples, MutableLong currentTimestamp, MutableLong currentWindowLow, long currentOffset, TuplesInfo info) throws IOException {
        final List<String> wantedPorts = ports == null || ports.length == 0 ? null : Arrays.asList(ports);
        long offset = currentOffset;
        for (String row = partBr.readLine(); row != null; row = partBr.readLine()) {
            if (row.startsWith("B:")) {
                final int timeEnd = row.indexOf(':', 2);
                currentTimestamp.setValue(Long.parseLong(row.substring(2, timeEnd)));
                currentWindowLow.setValue(Long.parseLong(row.substring(timeEnd + 1)));
                if (limit != numRemainingTuples.longValue()) {
                    final WindowTuplesInfo openedWindow = new WindowTuplesInfo();
                    openedWindow.windowId = currentWindowLow.longValue();
                    info.tuples.add(openedWindow);
                }
                continue;
            }
            if (!row.startsWith("T:")) {
                continue;
            }
            final int timeEnd = row.indexOf(':', 2);
            currentTimestamp.setValue(Long.parseLong(row.substring(2, timeEnd)));
            final int portEnd = row.indexOf(':', timeEnd + 1);
            final String port = row.substring(timeEnd + 1, portEnd);
            if (wantedPorts != null && !wantedPorts.contains(port)) {
                continue;
            }
            final boolean selected = (queryType == QueryType.WINDOW && currentWindowLow.longValue() >= low)
                || (queryType == QueryType.OFFSET && offset >= low)
                || (queryType == QueryType.TIME && currentTimestamp.longValue() >= low);
            if (selected) {
                if (numRemainingTuples.longValue() <= 0L) {
                    break;
                }
                if (info.startOffset == -1L) {
                    info.startOffset = offset;
                }
                final int lastIndex = info.tuples.size() - 1;
                final WindowTuplesInfo window;
                if (lastIndex >= 0 && info.tuples.get(lastIndex).windowId == currentWindowLow.longValue()) {
                    window = info.tuples.get(lastIndex);
                } else {
                    window = new WindowTuplesInfo();
                    window.windowId = currentWindowLow.longValue();
                    info.tuples.add(window);
                }
                final int sizeEnd = row.indexOf(':', portEnd + 1);
                // The size field is parsed although its value is not needed, so that a line with
                // a malformed size is still rejected.
                Integer.parseInt(row.substring(portEnd + 1, sizeEnd));
                window.tuples.add(new TupleInfo(port, row.substring(sizeEnd + 1)));
                numRemainingTuples.decrement();
            }
            ++offset;
        }
    }

    /**
     * Posts a start-recording request for an operator, or for one of its ports when
     * {@code portName} is not blank.
     *
     * @param appId application id
     * @param opId operator id
     * @param portName port name; blank or {@code null} addresses the operator itself
     * @param numWindows value sent as {@code numWindows} in the request body
     * @return the JSON response, or {@code null} when the request fails (the failure is logged)
     * @throws WebServicesVersionConversion.IncompatibleVersionException declared by the signature
     */
    public JSONObject startRecording(String appId, String opId, String portName, long numWindows) throws WebServicesVersionConversion.IncompatibleVersionException {
        LOG.debug("Start recording requested for {}.{} ({} windows)", new Object[]{opId, portName, numWindows});
        try {
            final JSONObject payload = new JSONObject();
            StramAgent.StramUriSpec target = new StramAgent.StramUriSpec().path("physicalPlan/operators").path(opId);
            if (!StringUtils.isBlank((String)portName)) {
                target = target.path("ports").path(portName);
            }
            target = target.path("recordings/start");
            payload.put("numWindows", numWindows);
            final WebServicesClient client = new WebServicesClient();
            final WebServicesClient.WebServicesHandler<JSONObject> postPayload = new WebServicesClient.WebServicesHandler<JSONObject>(){

                @Override
                public JSONObject process(WebResource.Builder webResource, Class<JSONObject> clazz) {
                    final WebResource.Builder jsonRequest = (WebResource.Builder)webResource.type("application/json");
                    return (JSONObject)jsonRequest.post(clazz, (Object)payload);
                }
            };
            return this.stramAgent.issueStramWebRequest(client, appId, target, postPayload);
        }
        catch (Exception ex) {
            LOG.error("Cannot start recording: ", (Throwable)ex);
            return null;
        }
    }

    /**
     * Posts a stop-recording request for an operator, or for one of its ports when
     * {@code portName} is not blank. The request body is an empty JSON object.
     *
     * @param appId application id
     * @param opId operator id
     * @param portName port name; blank or {@code null} addresses the operator itself
     * @return the JSON response, or {@code null} when the request fails (the failure is logged)
     * @throws WebServicesVersionConversion.IncompatibleVersionException declared by the signature
     */
    public JSONObject stopRecording(String appId, String opId, String portName) throws WebServicesVersionConversion.IncompatibleVersionException {
        try {
            final JSONObject payload = new JSONObject();
            StramAgent.StramUriSpec target = new StramAgent.StramUriSpec().path("physicalPlan/operators").path(opId);
            if (!StringUtils.isBlank((String)portName)) {
                target = target.path("ports").path(portName);
            }
            target = target.path("recordings/stop");
            final WebServicesClient client = new WebServicesClient();
            final WebServicesClient.WebServicesHandler<JSONObject> postPayload = new WebServicesClient.WebServicesHandler<JSONObject>(){

                @Override
                public JSONObject process(WebResource.Builder webResource, Class<JSONObject> clazz) {
                    final WebResource.Builder jsonRequest = (WebResource.Builder)webResource.type("application/json");
                    return (JSONObject)jsonRequest.post(clazz, (Object)payload);
                }
            };
            return this.stramAgent.issueStramWebRequest(client, appId, target, postPayload);
        }
        catch (Exception ex) {
            LOG.error("Cannot stop recording: ", (Throwable)ex);
            return null;
        }
    }

    private static enum QueryType {
        OFFSET,
        WINDOW,
        TIME;

    }

    /** One recorded tuple: the port it was recorded on and its payload. */
    public static class TupleInfo {
        /** Port index as it appears in the part file. */
        public String portId;
        /** Tuple payload, wrapped in an {@link ObjectMapperString}. */
        public ObjectMapperString data;

        /**
         * @param portId port index of the tuple
         * @param data raw payload text; wrapped in an {@link ObjectMapperString}
         */
        TupleInfo(String portId, String data) {
            this.portId = portId;
            this.data = new ObjectMapperString(data);
        }
    }

    /** Result of a tuple query: the tuples found, grouped by window. */
    public static class TuplesInfo {
        /** Offset of the first tuple returned; the query initialises it to -1. */
        @JsonSerialize(using=ToStringSerializer.class)
        public long startOffset;
        /** Windows in the order they were read, each with its tuples. */
        public List<WindowTuplesInfo> tuples = new ArrayList<>();
    }

    /** The tuples of a single window inside a query result. */
    public static class WindowTuplesInfo {
        /** Id of the window the tuples belong to. */
        @JsonSerialize(using=ToStringSerializer.class)
        public long windowId;
        /** Tuples of the window in the order they were read. */
        public List<TupleInfo> tuples = new ArrayList<>();
    }

    /** Port description of a recording, extended with the number of tuples recorded on it. */
    @XmlType(name="port_info")
    public static class PortInfo
    extends TupleRecorder.PortInfo {
        /** Number of tuples recorded on this port; starts at zero. */
        @JsonSerialize(using=ToStringSerializer.class)
        public long tupleCount;
    }

    /** One parsed line of a recording's index file, as filled in by {@code parseIndexLine}. */
    private static class RecordingsIndexLine
    extends FSPartFileAgent.IndexLine {
        /** Window id ranges listed on the line; empty when the line has no {@code T:} section. */
        public List<TupleRecorder.Range> windowIdRanges;
        /** First value of the line's time range. */
        @JsonSerialize(using=ToStringSerializer.class)
        public long fromTime;
        /** Second value of the line's time range. */
        @JsonSerialize(using=ToStringSerializer.class)
        public long toTime;
        /** Total tuple count stated on the line. */
        @JsonSerialize(using=ToStringSerializer.class)
        public long tupleCount;
        /** Tuple count per port index; empty when the line has no {@code T:} section. */
        public Map<String, MutableLong> portTupleCount;

        // Instances are only created inside the enclosing class.
        private RecordingsIndexLine() {
        }
    }

    /** Description of one recording, assembled from its meta and index files. */
    public static class RecordingInfo {
        /** Recording id (name of the recording directory). */
        public String id;
        /** {@code startTime} value of the recording's meta file. */
        @JsonSerialize(using=ToStringSerializer.class)
        public long startTime;
        /** {@code containerId} value of the recording's meta file. */
        public String containerId;
        /** Id of the application the recording belongs to. */
        public String appId;
        /** Id of the operator the recording belongs to. */
        public String operatorId;
        /** Sum of the tuple counts of all index lines; starts at zero. */
        @JsonSerialize(using=ToStringSerializer.class)
        public long totalTuples;
        /** Ports listed in the recording's meta file. */
        public List<PortInfo> ports;
        /** Whether the recording is considered finished; starts as {@code false}. */
        public boolean ended;
        /** Recorded window id ranges, with consecutive ranges merged. */
        public List<TupleRecorder.Range> windowIdRanges;
        /** {@code properties} entries of the recording's meta file. */
        public Map<String, Object> properties;
    }
}

