/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram.debug;

import com.datatorrent.api.Component;
import com.datatorrent.api.Context;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Sink;
import com.datatorrent.api.Stats;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StringCodec;
import com.datatorrent.stram.api.ContainerContext;
import com.datatorrent.stram.api.ContainerEvent;
import com.datatorrent.stram.api.RequestFactory;
import com.datatorrent.stram.api.StramToNodeStartRecordingRequest;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
import com.datatorrent.stram.debug.OperatorIdPortNamePair;
import com.datatorrent.stram.debug.TupleRecorder;
import com.datatorrent.stram.engine.Node;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.Operators;
import com.datatorrent.stram.util.SharedPubSubWebSocketClient;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import net.engio.mbassy.listener.Handler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TupleRecorderCollection
extends HashMap<OperatorIdPortNamePair, TupleRecorder>
implements Component<Context> {
    // Value of LogicalPlan.TUPLE_RECORDING_PART_FILE_SIZE read in setup(); handed to each
    // recorder's storage through setBytesPerPartFile.
    private int tupleRecordingPartFileSize;
    // Gateway address used to build the pubsub websocket URL and the login URL.
    // When null, no websocket client is created.
    private String gatewayAddress;
    // Selects "wss://" / "https://" instead of "ws://" / "http://" for the gateway URLs.
    private boolean gatewayUseSsl = false;
    // Gateway credentials; applied to the websocket client only when both are non-null.
    private String gatewayUserName;
    private String gatewayPassword;
    // Value of LogicalPlan.TUPLE_RECORDING_PART_FILE_TIME_MILLIS read in setup(); handed to
    // each recorder's storage through setMillisPerPartFile.
    private long tupleRecordingPartFileTimeMillis;
    // Application path; recordings are stored under "<appPath>/recordings/<operatorId>/<recorderId>".
    private String appPath;
    // Application id passed to the constructor of every TupleRecorder created here.
    private String appId;
    // Websocket client created lazily when a recording is started, handed to every recorder,
    // and torn down in teardown(). Stays null when no gateway address is configured.
    private SharedPubSubWebSocketClient wsClient;
    // Value of the Context.DAGContext.STRING_CODECS attribute; passed to TupleRecorder.setup.
    private Map<Class<?>, Class<? extends StringCodec<?>>> codecs;
    private static final long serialVersionUID = 201309112123L;
    private static final Logger logger = LoggerFactory.getLogger(TupleRecorderCollection.class);

    /**
     * Looks up the recorder registered for an operator/port pair.
     *
     * @param operId operator id of the recording
     * @param portName port name of the recording; a null port name addresses the key
     *                 under which a whole-operator recording is stored
     * @return the recorder stored under that key, or null when the map has no such entry
     */
    public TupleRecorder getTupleRecorder(int operId, String portName) {
        OperatorIdPortNamePair lookupKey = new OperatorIdPortNamePair(operId, portName);
        return this.get(lookupKey);
    }

    /**
     * Caches the recording-related attributes of the given context and registers this
     * collection's request delegate for the start/stop/sync recording request types.
     *
     * <p>When the context carries no request factory a warning is logged and no delegate
     * is registered.
     *
     * @param ctx context the attribute values are read from
     */
    public void setup(Context ctx) {
        this.tupleRecordingPartFileSize = (Integer)ctx.getValue(LogicalPlan.TUPLE_RECORDING_PART_FILE_SIZE);
        Integer partFileTimeMillis = (Integer)ctx.getValue(LogicalPlan.TUPLE_RECORDING_PART_FILE_TIME_MILLIS);
        this.tupleRecordingPartFileTimeMillis = partFileTimeMillis.intValue();
        this.appId = (String)ctx.getValue(LogicalPlan.APPLICATION_ID);
        this.gatewayAddress = (String)ctx.getValue(LogicalPlan.GATEWAY_CONNECT_ADDRESS);
        this.gatewayUseSsl = (Boolean)ctx.getValue(LogicalPlan.GATEWAY_USE_SSL);
        this.gatewayUserName = (String)ctx.getValue(LogicalPlan.GATEWAY_USER_NAME);
        this.gatewayPassword = (String)ctx.getValue(LogicalPlan.GATEWAY_PASSWORD);
        this.appPath = (String)ctx.getValue(LogicalPlan.APPLICATION_PATH);
        this.codecs = (Map)ctx.getAttributes().get(Context.DAGContext.STRING_CODECS);

        RequestFactory requestFactory = (RequestFactory)ctx.getValue(ContainerContext.REQUEST_FACTORY);
        if (requestFactory == null) {
            logger.warn("No request factory defined, recording is disabled!");
            return;
        }

        // One delegate instance serves all three recording request types.
        RequestDelegateImpl delegate = new RequestDelegateImpl();
        StreamingContainerUmbilicalProtocol.StramToNodeRequest.RequestType[] recordingRequests = {
            StreamingContainerUmbilicalProtocol.StramToNodeRequest.RequestType.START_RECORDING,
            StreamingContainerUmbilicalProtocol.StramToNodeRequest.RequestType.STOP_RECORDING,
            StreamingContainerUmbilicalProtocol.StramToNodeRequest.RequestType.SYNC_RECORDING
        };
        for (StreamingContainerUmbilicalProtocol.StramToNodeRequest.RequestType requestType : recordingRequests) {
            requestFactory.registerDelegate(requestType, delegate);
        }
    }

    /**
     * Tears down every registered recorder, then the websocket client if one was created,
     * and finally removes all entries from this collection.
     */
    public void teardown() {
        Iterator<TupleRecorder> recorders = this.values().iterator();
        while (recorders.hasNext()) {
            recorders.next().teardown();
        }
        SharedPubSubWebSocketClient client = this.wsClient;
        if (client != null) {
            client.teardown();
        }
        this.clear();
    }

    /**
     * Builds the stream id for a port as {@code "<operatorId>.<portname>"}.
     *
     * @param operatorId operator id forming the part before the dot
     * @param portname port name forming the part after the dot; must not be null
     * @return the concatenated id, never null
     * @throws NullPointerException if {@code portname} is null
     */
    public final String getDeclaredStreamId(int operatorId, String portname) {
        String prefix = Integer.toString(operatorId) + ".";
        // concat (rather than +) keeps the NullPointerException for a null port name.
        return prefix.concat(portname);
    }

    /**
     * Starts a tuple recording on one port, or on all ports, of an operator.
     *
     * <p>With a non-null {@code portName} only the port with that name gets a recorder sink.
     * With a null {@code portName} every output port and every input port that has a
     * non-null port context gets one. The request is rejected with an error log, and nothing
     * is changed, when {@link #hasRecordingConflict} reports an existing recording. When no
     * port qualifies, a warning is logged and the recorder is neither set up nor registered.
     *
     * @param id recording id handed to the new {@link TupleRecorder}
     * @param node node whose ports receive the recorder sinks
     * @param operatorId operator id used for the map key, the stream ids and the storage path
     * @param portName name of the port to record, or null to record all ports
     * @param numWindows when positive, passed to {@code TupleRecorder.setNumWindows} together
     *                   with a callback that requests the recording to be stopped
     */
    private void startRecording(String id, final Node<?> node, int operatorId, final String portName, long numWindows) {
        Operators.PortMappingDescriptor descriptor = node.getPortMappingDescriptor();
        OperatorIdPortNamePair requested = new OperatorIdPortNamePair(operatorId, portName);
        if (this.hasRecordingConflict(descriptor, operatorId, portName)) {
            logger.error("Operator id {} is already being recorded.", operatorId);
            return;
        }
        logger.debug("Executing start recording request for {}", requested);
        this.ensureWebSocketClient();

        TupleRecorder tupleRecorder = new TupleRecorder(id, this.appId);
        // May be null: no gateway configured. The recorder receives whatever is there.
        tupleRecorder.setWebSocketClient(this.wsClient);

        HashMap<String, Sink<Object>> sinkMap = new HashMap<String, Sink<Object>>();
        for (Map.Entry<String, Operators.PortContextPair<Operator.InputPort<?>>> entry : descriptor.inputPorts.entrySet()) {
            String inputName = entry.getKey();
            // Input ports without a port context are never recorded.
            if (entry.getValue().context == null || (portName != null && !inputName.equals(portName))) {
                continue;
            }
            String streamId = this.getDeclaredStreamId(operatorId, inputName);
            logger.debug("Adding recorder sink to input port {}, stream {}", inputName, streamId);
            tupleRecorder.addInputPortInfo(inputName, streamId);
            sinkMap.put(inputName, tupleRecorder.newSink(inputName));
        }
        // Only the port names are needed for output ports, so iterate the key set.
        for (String outputName : descriptor.outputPorts.keySet()) {
            if (portName != null && !outputName.equals(portName)) {
                continue;
            }
            String streamId = this.getDeclaredStreamId(operatorId, outputName);
            logger.debug("Adding recorder sink to output port {}, stream {}", outputName, streamId);
            tupleRecorder.addOutputPortInfo(outputName, streamId);
            sinkMap.put(outputName, tupleRecorder.newSink(outputName));
        }

        if (sinkMap.isEmpty()) {
            logger.warn("Tuple recording request ignored because operator is not connected on the specified port.");
            return;
        }

        logger.debug("Started recording on {} through {}", requested, System.identityHashCode(this));
        String basePath = this.appPath + "/recordings/" + operatorId + "/" + tupleRecorder.getId();
        tupleRecorder.getStorage().setBasePath(basePath);
        tupleRecorder.getStorage().setBytesPerPartFile(this.tupleRecordingPartFileSize);
        tupleRecorder.getStorage().setMillisPerPartFile(this.tupleRecordingPartFileTimeMillis);
        node.addSinks(sinkMap);
        tupleRecorder.setup(node.getOperator(), this.codecs);
        this.put(requested, tupleRecorder);

        if (numWindows > 0L) {
            // Presumably the recorder runs this callback once numWindows windows were
            // recorded (TODO confirm in TupleRecorder). The stop itself is submitted as an
            // operator request through the node's context instead of being done inline.
            tupleRecorder.setNumWindows(numWindows, new Runnable(){

                @Override
                public void run() {
                    node.context.request(new StatsListener.OperatorRequest(){

                        // The operatorId parameter here shadows the enclosing method's
                        // parameter; the id supplied with the request is the one used.
                        public StatsListener.OperatorResponse execute(Operator operator, int operatorId, long windowId) throws IOException {
                            TupleRecorderCollection.this.stopRecording(node, operatorId, portName);
                            return null;
                        }
                    });
                }
            });
        }
    }

    /**
     * Tells whether a new recording on the given operator/port would overlap an existing one.
     *
     * <p>A key with a null port name for the operator always conflicts. A request with a
     * null port name conflicts with a recording on any port listed in the descriptor. A
     * request for a named port conflicts only with a recording on exactly that port.
     */
    private boolean hasRecordingConflict(Operators.PortMappingDescriptor descriptor, int operatorId, String portName) {
        if (this.containsKey(new OperatorIdPortNamePair(operatorId, null))) {
            return true;
        }
        if (portName != null) {
            return this.containsKey(new OperatorIdPortNamePair(operatorId, portName));
        }
        for (String inputName : descriptor.inputPorts.keySet()) {
            if (this.containsKey(new OperatorIdPortNamePair(operatorId, inputName))) {
                return true;
            }
        }
        for (String outputName : descriptor.outputPorts.keySet()) {
            if (this.containsKey(new OperatorIdPortNamePair(operatorId, outputName))) {
                return true;
            }
        }
        return false;
    }

    /**
     * Creates and sets up the gateway websocket client if a gateway address is configured
     * and no client exists yet. A failure is logged as a warning and not rethrown; the
     * client instance, if it was constructed, stays assigned.
     */
    private void ensureWebSocketClient() {
        if (this.gatewayAddress == null || this.wsClient != null) {
            return;
        }
        synchronized (this) {
            // Re-check under the lock: another caller may have created the client meanwhile.
            if (this.wsClient != null) {
                return;
            }
            try {
                this.wsClient = new SharedPubSubWebSocketClient((this.gatewayUseSsl ? "wss://" : "ws://") + this.gatewayAddress + "/pubsub", 500L);
                if (this.gatewayUserName != null && this.gatewayPassword != null) {
                    this.wsClient.setLoginUrl((this.gatewayUseSsl ? "https://" : "http://") + this.gatewayAddress + "/ws/v2/login");
                    this.wsClient.setUserName(this.gatewayUserName);
                    this.wsClient.setPassword(this.gatewayPassword);
                }
                this.wsClient.setup();
            }
            catch (Exception ex) {
                // Best effort: the recording is still started when the gateway is unreachable.
                logger.warn("Error initializing websocket", ex);
            }
        }
    }

    /**
     * Stops the recording registered for an operator/port pair.
     *
     * <p>If the exact key is present, its recorder (when non-null) has its sinks removed
     * from the node, is torn down, and the entry is removed. Otherwise, when
     * {@code portName} is null, the same is done for every entry of this collection whose
     * key carries the given operator id. Otherwise an error is logged.
     *
     * @param node node the recorder sinks are removed from
     * @param operatorId operator id of the recording(s) to stop
     * @param portName port of the recording, or null to address the whole operator
     */
    private void stopRecording(Node<?> node, int operatorId, String portName) {
        OperatorIdPortNamePair requested = new OperatorIdPortNamePair(operatorId, portName);
        if (this.containsKey(requested)) {
            logger.debug("Executing stop recording request for {}", requested);
            TupleRecorder tupleRecorder = this.get(requested);
            if (tupleRecorder != null) {
                node.removeSinks(tupleRecorder.getSinkMap());
                tupleRecorder.teardown();
                logger.debug("Stopped recording for {}", requested);
                this.remove(requested);
            }
            return;
        }
        if (portName != null) {
            logger.error("Operator/port {} is not being recorded.", requested);
            return;
        }
        // No whole-operator entry: stop each per-port recording of this operator.
        // Removal goes through the iterator so the map can be modified while iterating.
        Iterator<Map.Entry<OperatorIdPortNamePair, TupleRecorder>> entries = this.entrySet().iterator();
        while (entries.hasNext()) {
            Map.Entry<OperatorIdPortNamePair, TupleRecorder> entry = entries.next();
            TupleRecorder tupleRecorder = entry.getValue();
            if (entry.getKey().operatorId != operatorId || tupleRecorder == null) {
                continue;
            }
            node.removeSinks(tupleRecorder.getSinkMap());
            tupleRecorder.teardown();
            // Log the entry actually stopped, not the request (whose port name is null).
            logger.debug("Stopped recording for operator/port {}", entry.getKey());
            entries.remove();
        }
    }

    /**
     * Requests a storage sync for the recording registered for an operator/port pair.
     *
     * <p>If the exact key is present, {@code requestSync()} is called on its recorder's
     * storage (when the recorder is non-null). Otherwise, when {@code portName} is null,
     * the same is done for every entry whose key carries the given operator id. Otherwise
     * an error is logged.
     *
     * @param node not used by this method
     * @param operatorId operator id of the recording(s) to sync
     * @param portName port of the recording, or null to address the whole operator
     */
    private void syncRecording(Node<?> node, int operatorId, String portName) {
        OperatorIdPortNamePair requested = new OperatorIdPortNamePair(operatorId, portName);
        if (this.containsKey(requested)) {
            logger.debug("Executing sync recording request for {}", requested);
            TupleRecorder tupleRecorder = this.get(requested);
            if (tupleRecorder != null) {
                tupleRecorder.getStorage().requestSync();
                logger.debug("Requested sync recording for operator/port {}", requested);
            }
        } else if (portName == null) {
            for (Map.Entry<OperatorIdPortNamePair, TupleRecorder> entry : this.entrySet()) {
                TupleRecorder tupleRecorder = entry.getValue();
                if (entry.getKey().operatorId != operatorId || tupleRecorder == null) {
                    continue;
                }
                tupleRecorder.getStorage().requestSync();
                // Log the entry actually synced, not the request (whose port name is null).
                logger.debug("Requested sync recording for operator/port {}", entry.getKey());
            }
        } else {
            logger.error("(SYNC_RECORDING) Operator/port {} is not being recorded.", requested);
        }
    }

    /**
     * Handles a node activation event by starting the recordings requested through the
     * AUTO_RECORD attributes.
     *
     * <p>When the operator context has AUTO_RECORD set, one recording covering the whole
     * operator is started. Otherwise a separate recording is started for each input and
     * output port whose port context has AUTO_RECORD set. Recordings started here have a
     * null id and no window limit.
     *
     * @param nae activation event carrying the activated node
     */
    @Handler
    public void activated(ContainerEvent.NodeActivationEvent nae) {
        Node<?> node = nae.getNode();
        if ((Boolean)node.context.getValue(Context.OperatorContext.AUTO_RECORD)) {
            this.startRecording(null, node, node.getId(), null, 0L);
            return;
        }
        this.startAutoRecordedPorts(node, node.getPortMappingDescriptor().inputPorts);
        this.startAutoRecordedPorts(node, node.getPortMappingDescriptor().outputPorts);
    }

    /**
     * Starts a recording for each port in {@code ports} that has a non-null port context
     * with AUTO_RECORD set. The wildcard value type lets the same loop serve both the
     * input-port and the output-port map.
     */
    private void startAutoRecordedPorts(Node<?> node, Map<String, ? extends Operators.PortContextPair<?>> ports) {
        for (Map.Entry<String, ? extends Operators.PortContextPair<?>> entry : ports.entrySet()) {
            if (entry.getValue().context == null) {
                continue;
            }
            Boolean autoRecord = (Boolean)((Context.PortContext)entry.getValue().context).getValue(Context.PortContext.AUTO_RECORD);
            if (autoRecord.booleanValue()) {
                this.startRecording(null, node, node.getId(), entry.getKey(), 0L);
            }
        }
    }

    /**
     * Handles a node deactivation event by stopping the recordings of that node's
     * operator; the null port name addresses the operator as a whole.
     *
     * @param nde deactivation event carrying the deactivated node
     */
    @Handler
    public void deactivated(ContainerEvent.NodeDeactivationEvent nde) {
        final Node<?> deactivatedNode = nde.getNode();
        final int operatorId = deactivatedNode.getId();
        this.stopRecording(deactivatedNode, operatorId, null);
    }

    /**
     * Handles a container stats event by stamping the current recording ids onto the
     * collected statistics.
     *
     * <p>First every port-level recording id in the event is reset to null. Then, per
     * operator heartbeat: if a recorder is registered under the operator's null-port key,
     * its id is written to each window's operator stats. Otherwise the operator-level id is
     * set to null and, for each recorder registered for one of the operator's ports, its id
     * is written to the first input port and the first output port of every window whose
     * port id equals the recorded port name.
     *
     * @param cse stats event whose container stats are updated in place
     */
    @Handler
    public void collected(ContainerEvent.ContainerStatsEvent cse) {
        StreamingContainerUmbilicalProtocol.ContainerStats stats = cse.getContainerStats();

        // Pass 1: clear all port-level recording ids.
        for (StreamingContainerUmbilicalProtocol.OperatorHeartbeat heartbeat : stats.operators) {
            for (Stats.OperatorStats windowStats : heartbeat.windowStats) {
                if (windowStats.inputPorts != null) {
                    for (Stats.OperatorStats.PortStats inputStats : windowStats.inputPorts) {
                        inputStats.recordingId = null;
                    }
                }
                if (windowStats.outputPorts != null) {
                    for (Stats.OperatorStats.PortStats outputStats : windowStats.outputPorts) {
                        outputStats.recordingId = null;
                    }
                }
            }
        }

        // Pass 2: fill in the ids of the recordings currently registered.
        for (StreamingContainerUmbilicalProtocol.OperatorHeartbeat heartbeat : stats.operators) {
            String operatorRecordingId = null;
            TupleRecorder operatorRecorder = this.get(new OperatorIdPortNamePair(heartbeat.nodeId, null));
            if (operatorRecorder != null) {
                operatorRecordingId = operatorRecorder.getId();
            } else {
                for (Map.Entry<OperatorIdPortNamePair, TupleRecorder> entry : this.entrySet()) {
                    OperatorIdPortNamePair recordedPort = entry.getKey();
                    if (recordedPort.operatorId != heartbeat.nodeId) {
                        continue;
                    }
                    for (Stats.OperatorStats windowStats : heartbeat.windowStats) {
                        if (windowStats.inputPorts != null) {
                            for (Stats.OperatorStats.PortStats inputStats : windowStats.inputPorts) {
                                if (inputStats.id.equals(recordedPort.portName)) {
                                    inputStats.recordingId = entry.getValue().getId();
                                    break;
                                }
                            }
                        }
                        if (windowStats.outputPorts != null) {
                            for (Stats.OperatorStats.PortStats outputStats : windowStats.outputPorts) {
                                if (outputStats.id.equals(recordedPort.portName)) {
                                    outputStats.recordingId = entry.getValue().getId();
                                    break;
                                }
                            }
                        }
                    }
                }
            }
            for (Stats.OperatorStats windowStats : heartbeat.windowStats) {
                windowStats.recordingId = operatorRecordingId;
            }
        }
    }

    /**
     * Request delegate that turns start/stop/sync recording requests into operator
     * requests which call back into the enclosing {@link TupleRecorderCollection}.
     */
    private class RequestDelegateImpl
    implements RequestFactory.RequestDelegate {
        private RequestDelegateImpl() {
        }

        /**
         * Builds the operator request for a recording request.
         *
         * @param node node the recording operation is applied to
         * @param snr request; for START_RECORDING it is cast to
         *            {@link StramToNodeStartRecordingRequest} when the request executes
         * @return an operator request whose {@code execute} performs the operation and
         *         returns null
         * @throws UnsupportedOperationException if the request type is not one of
         *         START_RECORDING, STOP_RECORDING or SYNC_RECORDING
         */
        @Override
        public StatsListener.OperatorRequest getRequestExecutor(final Node<?> node, final StreamingContainerUmbilicalProtocol.StramToNodeRequest snr) {
            switch (snr.getRequestType()) {
                case START_RECORDING: {
                    return new StatsListener.OperatorRequest(){

                        @Override
                        public StatsListener.OperatorResponse execute(Operator operator, int operatorId, long windowId) throws IOException {
                            StramToNodeStartRecordingRequest r = (StramToNodeStartRecordingRequest)snr;
                            TupleRecorderCollection.this.startRecording(r.getId(), node, operatorId, r.getPortName(), r.getNumWindows());
                            return null;
                        }

                        @Override
                        public String toString() {
                            return "Start Recording";
                        }
                    };
                }
                case STOP_RECORDING: {
                    return new StatsListener.OperatorRequest(){

                        @Override
                        public StatsListener.OperatorResponse execute(Operator operator, int operatorId, long windowId) throws IOException {
                            TupleRecorderCollection.this.stopRecording(node, operatorId, snr.getPortName());
                            return null;
                        }

                        @Override
                        public String toString() {
                            return "Stop Recording";
                        }
                    };
                }
                case SYNC_RECORDING: {
                    return new StatsListener.OperatorRequest(){

                        @Override
                        public StatsListener.OperatorResponse execute(Operator operator, int operatorId, long windowId) throws IOException {
                            TupleRecorderCollection.this.syncRecording(node, operatorId, snr.getPortName());
                            return null;
                        }

                        // Named after the operation, like the start and stop executors.
                        @Override
                        public String toString() {
                            return "Sync Recording";
                        }
                    };
                }
                default: {
                    throw new UnsupportedOperationException("Unknown request type " + snr.requestType);
                }
            }
        }
    }
}

