/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Stats;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StorageAgent;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.bufferserver.auth.AuthManager;
import com.datatorrent.bufferserver.util.Codec;
import com.datatorrent.common.experimental.AppData;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.FSStorageAgent;
import com.datatorrent.common.util.NumberAggregate;
import com.datatorrent.common.util.Pair;
import com.datatorrent.netlet.Listener;
import com.datatorrent.stram.BufferServerController;
import com.datatorrent.stram.FSEventRecorder;
import com.datatorrent.stram.FSStatsRecorder;
import com.datatorrent.stram.Journal;
import com.datatorrent.stram.PhysicalMetricsContextImpl;
import com.datatorrent.stram.StreamingContainerAgent;
import com.datatorrent.stram.StringCodecs;
import com.datatorrent.stram.api.AppDataSource;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.api.ContainerContext;
import com.datatorrent.stram.api.OperatorDeployInfo;
import com.datatorrent.stram.api.StramEvent;
import com.datatorrent.stram.api.StramToNodeChangeLoggersRequest;
import com.datatorrent.stram.api.StramToNodeGetPropertyRequest;
import com.datatorrent.stram.api.StramToNodeSetPropertyRequest;
import com.datatorrent.stram.api.StramToNodeStartRecordingRequest;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
import com.datatorrent.stram.engine.OperatorResponse;
import com.datatorrent.stram.engine.StreamingContainer;
import com.datatorrent.stram.engine.WindowGenerator;
import com.datatorrent.stram.plan.logical.LogicalOperatorStatus;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
import com.datatorrent.stram.plan.logical.Operators;
import com.datatorrent.stram.plan.logical.requests.LogicalPlanRequest;
import com.datatorrent.stram.plan.physical.OperatorStatus;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.plan.physical.PhysicalPlan;
import com.datatorrent.stram.plan.physical.PlanModifier;
import com.datatorrent.stram.util.ConfigUtils;
import com.datatorrent.stram.util.FSJsonLineFile;
import com.datatorrent.stram.util.MovingAverage;
import com.datatorrent.stram.util.SharedPubSubWebSocketClient;
import com.datatorrent.stram.util.WebServicesClient;
import com.datatorrent.stram.webapp.ContainerInfo;
import com.datatorrent.stram.webapp.LogicalOperatorInfo;
import com.datatorrent.stram.webapp.OperatorAggregationInfo;
import com.datatorrent.stram.webapp.OperatorInfo;
import com.datatorrent.stram.webapp.PortInfo;
import com.datatorrent.stram.webapp.StreamInfo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.config.BusConfiguration;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamingContainerManager
implements PhysicalPlan.PlanContext {
    private static final Logger LOG = LoggerFactory.getLogger(StreamingContainerManager.class);
    public static final String GATEWAY_LOGIN_URL_PATH = "/ws/v2/login";
    public static final String BUILTIN_APPDATA_URL = "builtin";
    public static final String CONTAINERS_INFO_FILENAME_FORMAT = "containers_%d.json";
    public static final String OPERATORS_INFO_FILENAME_FORMAT = "operators_%d.json";
    public static final String APP_META_FILENAME = "meta.json";
    public static final String APP_META_KEY_ATTRIBUTES = "attributes";
    public static final String APP_META_KEY_METRICS = "metrics";
    public static final String EMBEDDABLE_QUERY_NAME_SUFFIX = ".query";
    public static final Journal.Recoverable SET_OPERATOR_PROPERTY = new SetOperatorProperty();
    public static final Journal.Recoverable SET_PHYSICAL_OPERATOR_PROPERTY = new SetPhysicalOperatorProperty();
    public static final int METRIC_QUEUE_SIZE = 1000;
    private final FinalVars vars;
    private final PhysicalPlan plan;
    private final Clock clock;
    private SharedPubSubWebSocketClient wsClient;
    private FSStatsRecorder statsRecorder;
    private FSEventRecorder eventRecorder;
    protected final Map<String, String> containerStopRequests = new ConcurrentHashMap<String, String>();
    protected final ConcurrentLinkedQueue<StreamingContainerAgent.ContainerStartRequest> containerStartRequests = new ConcurrentLinkedQueue();
    protected boolean forcedShutdown = false;
    private final ConcurrentLinkedQueue<Runnable> eventQueue = new ConcurrentLinkedQueue();
    private final AtomicBoolean eventQueueProcessing = new AtomicBoolean();
    private final HashSet<PTContainer> pendingAllocation = Sets.newLinkedHashSet();
    protected String shutdownDiagnosticsMessage = "";
    private long lastResourceRequest = 0L;
    private final Map<String, StreamingContainerAgent> containers = new ConcurrentHashMap<String, StreamingContainerAgent>();
    private final List<Pair<PTOperator, Long>> purgeCheckpoints = new ArrayList<Pair<PTOperator, Long>>();
    private Map<LogicalPlan.OperatorMeta, Set<LogicalPlan.OperatorMeta>> checkpointGroups;
    private final Map<Long, Set<PTOperator>> shutdownOperators = new HashMap<Long, Set<PTOperator>>();
    private CriticalPathInfo criticalPathInfo;
    private final ConcurrentMap<PTOperator, PTOperator> reportStats = new ConcurrentHashMap<PTOperator, PTOperator>();
    private final AtomicBoolean deployChangeInProgress = new AtomicBoolean();
    private int deployChangeCnt;
    private MBassador<StramEvent> eventBus;
    private final Journal journal;
    private RecoveryHandler recoveryHandler;
    private final ConcurrentSkipListMap<Long, Map<Integer, EndWindowStats>> endWindowStatsOperatorMap = new ConcurrentSkipListMap();
    private final ConcurrentMap<PTOperator, PTOperator> slowestUpstreamOp = new ConcurrentHashMap<PTOperator, PTOperator>();
    private long committedWindowId;
    private final Map<Pair<Integer, String>, Long> operatorPortLastEndWindowTimestamps = Maps.newConcurrentMap();
    private final Map<Integer, Long> operatorLastEndWindowTimestamps = Maps.newConcurrentMap();
    private long lastStatsTimestamp = System.currentTimeMillis();
    private long currentEndWindowStatsWindowId;
    private long completeEndWindowStatsWindowId;
    private final ConcurrentHashMap<String, MovingAverage.MovingAverageLong> rpcLatencies = new ConcurrentHashMap();
    private final AtomicLong nodeToStramRequestIds = new AtomicLong(1L);
    private int allocatedMemoryMB = 0;
    private List<AppDataSource> appDataSources = null;
    private final Cache<Long, Object> commandResponse = CacheBuilder.newBuilder().expireAfterWrite(1L, TimeUnit.MINUTES).build();
    private transient ExecutorService poolExecutor;
    private FileContext fileContext;
    private final Map<String, Queue<Pair<Long, Map<String, Object>>>> logicalMetrics = Maps.newConcurrentMap();
    private final Map<String, Map<String, Object>> latestLogicalMetrics = Maps.newHashMap();
    private final Map<String, Object> latestLogicalCounters = Maps.newHashMap();
    private final LinkedHashMap<String, ContainerInfo> completedContainers = new LinkedHashMap<String, ContainerInfo>(){
        private static final long serialVersionUID = 201405281500L;

        @Override
        protected boolean removeEldestEntry(Map.Entry<String, ContainerInfo> eldest) {
            long expireTime = System.currentTimeMillis() - 108000L;
            Iterator iterator = this.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry entry = iterator.next();
                if (((ContainerInfo)entry.getValue()).finishedTime >= expireTime) continue;
                iterator.remove();
            }
            return false;
        }
    };
    private FSJsonLineFile containerFile;
    private FSJsonLineFile operatorFile;
    private final long startTime = System.currentTimeMillis();
    private final Object appDataSourcesLock = new Object();

    public StreamingContainerManager(LogicalPlan dag, Clock clock) {
        this(dag, false, clock);
    }

    public StreamingContainerManager(LogicalPlan dag) {
        this(dag, false, (Clock)new SystemClock());
    }

    public StreamingContainerManager(LogicalPlan dag, boolean enableEventRecording, Clock clock) {
        this.clock = clock;
        this.vars = new FinalVars(dag, clock.getTime());
        this.poolExecutor = Executors.newFixedThreadPool(4);
        if (enableEventRecording) {
            this.eventBus = new MBassador(BusConfiguration.Default((int)1, (int)1, (int)1));
        }
        this.plan = new PhysicalPlan(dag, this);
        this.journal = new Journal(this);
        this.init(enableEventRecording);
    }

    private StreamingContainerManager(CheckpointState checkpointedState, boolean enableEventRecording) {
        this.vars = checkpointedState.finals;
        this.clock = new SystemClock();
        this.poolExecutor = Executors.newFixedThreadPool(4);
        this.plan = checkpointedState.physicalPlan;
        this.eventBus = new MBassador(BusConfiguration.Default((int)1, (int)1, (int)1));
        this.journal = new Journal(this);
        this.init(enableEventRecording);
    }

    private void init(boolean enableEventRecording) {
        this.setupWsClient();
        this.setupRecording(enableEventRecording);
        this.setupStringCodecs();
        try {
            Path file = new Path(this.vars.appPath);
            URI uri = file.toUri();
            YarnConfiguration config = new YarnConfiguration();
            this.fileContext = uri.getScheme() == null ? FileContext.getFileContext((Configuration)config) : FileContext.getFileContext((URI)uri, (Configuration)config);
            this.saveMetaInfo();
            String fileName = String.format(CONTAINERS_INFO_FILENAME_FORMAT, this.plan.getLogicalPlan().getValue(LogicalPlan.APPLICATION_ATTEMPT_ID));
            this.containerFile = new FSJsonLineFile(this.fileContext, new Path(this.vars.appPath, fileName), FsPermission.getDefault());
            this.containerFile.append(this.getAppMasterContainerInfo());
            fileName = String.format(OPERATORS_INFO_FILENAME_FORMAT, this.plan.getLogicalPlan().getValue(LogicalPlan.APPLICATION_ATTEMPT_ID));
            this.operatorFile = new FSJsonLineFile(this.fileContext, new Path(this.vars.appPath, fileName), FsPermission.getDefault());
        }
        catch (IOException ex) {
            throw Throwables.propagate((Throwable)ex);
        }
    }

    public Journal getJournal() {
        return this.journal;
    }

    public final ContainerInfo getAppMasterContainerInfo() {
        ContainerInfo ci = new ContainerInfo();
        ci.id = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.toString());
        String nmHost = System.getenv(ApplicationConstants.Environment.NM_HOST.toString());
        String nmPort = System.getenv(ApplicationConstants.Environment.NM_PORT.toString());
        String nmHttpPort = System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.toString());
        ci.state = "ACTIVE";
        ci.jvmName = ManagementFactory.getRuntimeMXBean().getName();
        ci.numOperators = 0;
        YarnConfiguration conf = new YarnConfiguration();
        if (nmHost != null) {
            if (nmPort != null) {
                ci.host = nmHost + ":" + nmPort;
            }
            if (nmHttpPort != null) {
                String nodeHttpAddress = nmHost + ":" + nmHttpPort;
                if (this.allocatedMemoryMB == 0) {
                    String url = ConfigUtils.getSchemePrefix(conf) + nodeHttpAddress + "/ws/v1/node/containers/" + ci.id;
                    WebServicesClient webServicesClient = new WebServicesClient();
                    try {
                        String content = webServicesClient.process(url, String.class, new WebServicesClient.GetWebServicesHandler());
                        JSONObject json = new JSONObject(content);
                        int totalMemoryNeededMB = json.getJSONObject("container").getInt("totalMemoryNeededMB");
                        if (totalMemoryNeededMB > 0) {
                            this.allocatedMemoryMB = totalMemoryNeededMB;
                        } else {
                            LOG.warn("Could not determine the memory allocated for the streaming application master.  Node manager is reporting {} MB from {}", (Object)totalMemoryNeededMB, (Object)url);
                        }
                    }
                    catch (Exception ex) {
                        LOG.warn("Could not determine the memory allocated for the streaming application master", (Throwable)ex);
                    }
                }
                ci.containerLogsUrl = ConfigUtils.getSchemePrefix(conf) + nodeHttpAddress + "/node/containerlogs/" + ci.id + "/" + System.getenv(ApplicationConstants.Environment.USER.toString());
                ci.rawContainerLogsUrl = ConfigUtils.getRawContainerLogsUrl(conf, nodeHttpAddress, (String)this.plan.getLogicalPlan().getAttributes().get(LogicalPlan.APPLICATION_ID), ci.id);
            }
        }
        ci.memoryMBAllocated = this.allocatedMemoryMB;
        ci.memoryMBFree = (int)(Runtime.getRuntime().freeMemory() / 0x100000L);
        ci.lastHeartbeat = -1L;
        ci.startedTime = this.startTime;
        ci.finishedTime = -1L;
        return ci;
    }

    public void updateRPCLatency(String containerId, long latency) {
        if (this.vars.rpcLatencyCompensationSamples > 0) {
            MovingAverage.MovingAverageLong val;
            MovingAverage.MovingAverageLong latencyMA = this.rpcLatencies.get(containerId);
            if (latencyMA == null && (latencyMA = this.rpcLatencies.putIfAbsent(containerId, val = new MovingAverage.MovingAverageLong(this.vars.rpcLatencyCompensationSamples))) == null) {
                latencyMA = val;
            }
            latencyMA.add(latency);
        }
    }

    private void setupRecording(boolean enableEventRecording) {
        if (this.vars.enableStatsRecording) {
            this.statsRecorder = new FSStatsRecorder();
            this.statsRecorder.setBasePath(this.vars.appPath + "/" + LogicalPlan.SUBDIR_STATS);
            this.statsRecorder.setup();
        }
        if (enableEventRecording) {
            this.eventRecorder = new FSEventRecorder((String)this.plan.getLogicalPlan().getValue(LogicalPlan.APPLICATION_ID));
            this.eventRecorder.setBasePath(this.vars.appPath + "/" + LogicalPlan.SUBDIR_EVENTS);
            this.eventRecorder.setWebSocketClient(this.wsClient);
            this.eventRecorder.setup();
            this.eventBus.subscribe((Object)this.eventRecorder);
        }
    }

    private void setupStringCodecs() {
        Map codecs = (Map)this.plan.getLogicalPlan().getAttributes().get(Context.DAGContext.STRING_CODECS);
        StringCodecs.loadConverters(codecs);
    }

    private void setupWsClient() {
        String gatewayAddress = (String)this.plan.getLogicalPlan().getValue(LogicalPlan.GATEWAY_CONNECT_ADDRESS);
        boolean gatewayUseSsl = (Boolean)this.plan.getLogicalPlan().getValue(LogicalPlan.GATEWAY_USE_SSL);
        String gatewayUserName = (String)this.plan.getLogicalPlan().getValue(LogicalPlan.GATEWAY_USER_NAME);
        String gatewayPassword = (String)this.plan.getLogicalPlan().getValue(LogicalPlan.GATEWAY_PASSWORD);
        int timeout = (Integer)this.plan.getLogicalPlan().getValue(LogicalPlan.PUBSUB_CONNECT_TIMEOUT_MILLIS);
        if (gatewayAddress != null) {
            try {
                this.wsClient = new SharedPubSubWebSocketClient((gatewayUseSsl ? "wss://" : "ws://") + gatewayAddress + "/pubsub", (long)timeout);
                if (gatewayUserName != null && gatewayPassword != null) {
                    this.wsClient.setLoginUrl((gatewayUseSsl ? "https://" : "http://") + gatewayAddress + GATEWAY_LOGIN_URL_PATH);
                    this.wsClient.setUserName(gatewayUserName);
                    this.wsClient.setPassword(gatewayPassword);
                }
                this.wsClient.setup();
            }
            catch (Exception ex) {
                LOG.warn("Cannot establish websocket connection to {}", (Object)gatewayAddress, (Object)ex);
            }
        }
    }

    public void teardown() {
        if (this.eventBus != null) {
            this.eventBus.shutdown();
        }
        if (this.eventRecorder != null) {
            this.eventRecorder.teardown();
        }
        if (this.statsRecorder != null) {
            this.statsRecorder.teardown();
        }
        IOUtils.closeQuietly((Closeable)this.containerFile);
        IOUtils.closeQuietly((Closeable)this.operatorFile);
        if (this.poolExecutor != null) {
            this.poolExecutor.shutdown();
        }
    }

    public void subscribeToEvents(Object listener) {
        if (this.eventBus != null) {
            this.eventBus.subscribe(listener);
        }
    }

    public PhysicalPlan getPhysicalPlan() {
        return this.plan;
    }

    public long getCommittedWindowId() {
        return this.committedWindowId;
    }

    public boolean isGatewayConnected() {
        return this.wsClient != null && this.wsClient.isConnectionOpen();
    }

    public SharedPubSubWebSocketClient getWsClient() {
        return this.wsClient;
    }

    private String convertAppDataUrl(String url) {
        if (BUILTIN_APPDATA_URL.equals(url)) {
            return url;
        }
        LOG.warn("App Data URL {} cannot be converted for the client.", (Object)url);
        return url;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public List<AppDataSource> getAppDataSources() {
        Object object = this.appDataSourcesLock;
        synchronized (object) {
            if (this.appDataSources == null) {
                this.appDataSources = new ArrayList<AppDataSource>();
                block3: for (LogicalPlan.OperatorMeta operatorMeta : this.plan.getLogicalPlan().getAllOperators()) {
                    Serializable portMeta;
                    AppData.Store store;
                    AppData.EmbeddableQueryInfoProvider embeddableQueryInfoProvider;
                    Map<LogicalPlan.InputPortMeta, LogicalPlan.StreamMeta> inputStreams = operatorMeta.getInputStreams();
                    Map<LogicalPlan.OutputPortMeta, LogicalPlan.StreamMeta> outputStreams = operatorMeta.getOutputStreams();
                    String queryOperatorName = null;
                    String queryUrl = null;
                    String queryTopic = null;
                    boolean hasEmbeddedQuery = false;
                    if (operatorMeta.getOperator() instanceof AppData.Store && (embeddableQueryInfoProvider = (store = (AppData.Store)operatorMeta.getOperator()).getEmbeddableQueryInfoProvider()) != null) {
                        hasEmbeddedQuery = true;
                        queryOperatorName = operatorMeta.getName() + EMBEDDABLE_QUERY_NAME_SUFFIX;
                        queryUrl = embeddableQueryInfoProvider.getAppDataURL();
                        queryTopic = embeddableQueryInfoProvider.getTopic();
                    }
                    LOG.debug("Looking at operator {} {}", (Object)operatorMeta.getName(), (Object)Thread.currentThread().getId());
                    for (Map.Entry<LogicalPlan.InputPortMeta, LogicalPlan.StreamMeta> entry : inputStreams.entrySet()) {
                        portMeta = entry.getKey();
                        if (!((LogicalPlan.InputPortMeta)portMeta).isAppDataQueryPort()) continue;
                        if (queryUrl == null) {
                            LogicalPlan.OperatorMeta queryOperatorMeta = entry.getValue().getSource().getOperatorMeta();
                            if (!(queryOperatorMeta.getOperator() instanceof AppData.ConnectionInfoProvider)) continue;
                            if (!hasEmbeddedQuery) {
                                AppData.ConnectionInfoProvider queryOperator = (AppData.ConnectionInfoProvider)queryOperatorMeta.getOperator();
                                queryOperatorName = queryOperatorMeta.getName();
                                queryUrl = queryOperator.getAppDataURL();
                                queryTopic = queryOperator.getTopic();
                                continue;
                            }
                            LOG.warn("An embeddable query connector and the {} query operator were discovered. The query operator will be ignored and the embeddable query connector will be used instead.", (Object)operatorMeta.getName());
                            continue;
                        }
                        LOG.warn("Multiple query ports found in operator {}. Ignoring the App Data Source.", (Object)operatorMeta.getName());
                        continue block3;
                    }
                    for (Map.Entry<Serializable, LogicalPlan.StreamMeta> entry : outputStreams.entrySet()) {
                        portMeta = (LogicalPlan.OutputPortMeta)entry.getKey();
                        LOG.debug("Looking at port {} {}", (Object)((LogicalPlan.OutputPortMeta)portMeta).getPortName(), (Object)Thread.currentThread().getId());
                        if (!((LogicalPlan.OutputPortMeta)portMeta).isAppDataResultPort()) continue;
                        AppDataSource appDataSource = new AppDataSource();
                        appDataSource.setType(AppDataSource.Type.DAG);
                        appDataSource.setOperatorName(operatorMeta.getName());
                        appDataSource.setPortName(((LogicalPlan.OutputPortMeta)portMeta).getPortName());
                        if (queryOperatorName == null) {
                            LOG.warn("There is no query operator for the App Data Source {}.{}. Ignoring the App Data Source.", (Object)operatorMeta.getName(), (Object)((LogicalPlan.OutputPortMeta)portMeta).getPortName());
                            continue;
                        }
                        appDataSource.setQueryOperatorName(queryOperatorName);
                        appDataSource.setQueryTopic(queryTopic);
                        appDataSource.setQueryUrl(this.convertAppDataUrl(queryUrl));
                        List<LogicalPlan.InputPortMeta> sinks = entry.getValue().getSinks();
                        if (sinks.isEmpty()) {
                            LOG.warn("There is no result operator for the App Data Source {}.{}. Ignoring the App Data Source.", (Object)operatorMeta.getName(), (Object)((LogicalPlan.OutputPortMeta)portMeta).getPortName());
                            continue;
                        }
                        if (sinks.size() > 1) {
                            LOG.warn("There are multiple result operators for the App Data Source {}.{}. Ignoring the App Data Source.", (Object)operatorMeta.getName(), (Object)((LogicalPlan.OutputPortMeta)portMeta).getPortName());
                            continue;
                        }
                        LogicalPlan.OperatorMeta resultOperatorMeta = sinks.get(0).getOperatorWrapper();
                        if (resultOperatorMeta.getOperator() instanceof AppData.ConnectionInfoProvider) {
                            AppData.ConnectionInfoProvider resultOperator = (AppData.ConnectionInfoProvider)resultOperatorMeta.getOperator();
                            appDataSource.setResultOperatorName(resultOperatorMeta.getName());
                            appDataSource.setResultTopic(resultOperator.getTopic());
                            appDataSource.setResultUrl(this.convertAppDataUrl(resultOperator.getAppDataURL()));
                            AppData.AppendQueryIdToTopic queryIdAppended = resultOperator.getClass().getAnnotation(AppData.AppendQueryIdToTopic.class);
                            if (queryIdAppended != null && queryIdAppended.value()) {
                                appDataSource.setResultAppendQIDTopic(true);
                            }
                        } else {
                            LOG.warn("Result operator for the App Data Source {}.{} does not implement the right interface. Ignoring the App Data Source.", (Object)operatorMeta.getName(), (Object)((LogicalPlan.OutputPortMeta)portMeta).getPortName());
                            continue;
                        }
                        LOG.debug("Adding appDataSource {} {}", (Object)appDataSource.getName(), (Object)Thread.currentThread().getId());
                        this.appDataSources.add(appDataSource);
                    }
                }
            }
        }
        return this.appDataSources;
    }

    public Map<String, Map<String, Object>> getLatestLogicalMetrics() {
        return this.latestLogicalMetrics;
    }

    public void monitorHeartbeat() {
        long currentTms = this.clock.getTime();
        if (!this.pendingAllocation.isEmpty()) {
            if (this.lastResourceRequest + (long)((Integer)this.plan.getLogicalPlan().getValue(LogicalPlan.RESOURCE_ALLOCATION_TIMEOUT_MILLIS)).intValue() < currentTms) {
                String msg = String.format("Shutdown due to resource allocation timeout (%s ms) waiting for %s containers", currentTms - this.lastResourceRequest, this.pendingAllocation.size());
                LOG.warn(msg);
                for (PTContainer c : this.pendingAllocation) {
                    LOG.warn("Waiting for resource: {}m priority: {} {}", new Object[]{c.getRequiredMemoryMB(), c.getResourceRequestPriority(), c});
                }
                this.shutdownAllContainers(msg);
                this.forcedShutdown = true;
            } else {
                for (PTContainer c : this.pendingAllocation) {
                    LOG.debug("Waiting for resource: {}m {}", (Object)c.getRequiredMemoryMB(), (Object)c);
                }
            }
        }
        for (StreamingContainerAgent sca : this.containers.values()) {
            PTContainer c;
            c = sca.container;
            if (this.pendingAllocation.contains(c) || c.getExternalId() == null) continue;
            if (sca.lastHeartbeatMillis == 0L) {
                if (currentTms - sca.createdMillis <= (long)(2 * this.vars.heartbeatTimeoutMillis)) continue;
                LOG.info("Container {}@{} startup timeout ({} ms).", new Object[]{c.getExternalId(), c.host, currentTms - sca.createdMillis});
                this.containerStopRequests.put(c.getExternalId(), c.getExternalId());
                continue;
            }
            if (currentTms - sca.lastHeartbeatMillis <= (long)this.vars.heartbeatTimeoutMillis || this.isApplicationIdle()) continue;
            LOG.info("Container {}@{} heartbeat timeout ({} ms).", new Object[]{c.getExternalId(), c.host, currentTms - sca.lastHeartbeatMillis});
            this.containerStopRequests.put(c.getExternalId(), c.getExternalId());
        }
        this.processEvents();
        this.committedWindowId = this.updateCheckpoints(false);
        this.calculateEndWindowStats();
        if (this.vars.enableStatsRecording) {
            this.recordStats(currentTms);
        }
    }

    private void recordStats(long currentTms) {
        try {
            this.statsRecorder.recordContainers(this.containers, currentTms);
            this.statsRecorder.recordOperators(this.getOperatorInfoList(), currentTms);
        }
        catch (Exception ex) {
            LOG.warn("Exception caught when recording stats", (Throwable)ex);
        }
    }

    private void calculateEndWindowStats() {
        Map<Integer, PTOperator> allOperators = this.plan.getAllOperators();
        UpdateOperatorLatencyContext ctx = new UpdateOperatorLatencyContext(this.rpcLatencies, this.endWindowStatsOperatorMap);
        for (PTOperator operator : allOperators.values()) {
            this.updateOperatorLatency(operator, ctx);
        }
        if (!this.endWindowStatsOperatorMap.isEmpty()) {
            if (this.endWindowStatsOperatorMap.size() > this.vars.maxWindowsBehindForStats) {
                LOG.warn("Some operators are behind for more than {} windows! Trimming the end window stats map", (Object)this.vars.maxWindowsBehindForStats);
                while (this.endWindowStatsOperatorMap.size() > this.vars.maxWindowsBehindForStats) {
                    LOG.debug("Removing incomplete end window stats for window id {}. Collected operator set: {}. Complete set: {}", new Object[]{this.endWindowStatsOperatorMap.firstKey(), this.endWindowStatsOperatorMap.get(this.endWindowStatsOperatorMap.firstKey()).keySet(), allOperators.keySet()});
                    this.endWindowStatsOperatorMap.remove(this.endWindowStatsOperatorMap.firstKey());
                }
            }
            int numOperators = allOperators.size();
            Long windowId = this.endWindowStatsOperatorMap.firstKey();
            while (windowId != null) {
                Map<Integer, EndWindowStats> endWindowStatsMap = this.endWindowStatsOperatorMap.get(windowId);
                Set<Integer> endWindowStatsOperators = endWindowStatsMap.keySet();
                this.aggregateMetrics(windowId, endWindowStatsMap);
                this.criticalPathInfo = this.findCriticalPath();
                if (allOperators.keySet().containsAll(endWindowStatsOperators)) {
                    if (endWindowStatsMap.size() < numOperators) {
                        if (windowId >= this.completeEndWindowStatsWindowId) break;
                        LOG.debug("Disregarding stale end window stats for window {}", (Object)windowId);
                        this.endWindowStatsOperatorMap.remove(windowId);
                    } else {
                        this.endWindowStatsOperatorMap.remove(windowId);
                        this.currentEndWindowStatsWindowId = windowId;
                    }
                } else {
                    LOG.debug("Stats for non-existent operators detected. Disregarding end window stats for window {}", (Object)windowId);
                    this.endWindowStatsOperatorMap.remove(windowId);
                }
                windowId = this.endWindowStatsOperatorMap.higherKey(windowId);
            }
        }
    }

    private void aggregateMetrics(long windowId, Map<Integer, EndWindowStats> endWindowStatsMap) {
        EndWindowStats stats;
        Collection<PTOperator> physicalOperators;
        Context.CountersAggregator aggregator;
        Collection<LogicalPlan.OperatorMeta> logicalOperators = this.getLogicalPlan().getAllOperators();
        for (LogicalPlan.OperatorMeta operatorMeta : logicalOperators) {
            aggregator = (Context.CountersAggregator)operatorMeta.getValue(Context.OperatorContext.COUNTERS_AGGREGATOR);
            if (aggregator == null) continue;
            physicalOperators = this.plan.getAllOperators(operatorMeta);
            ArrayList counters = Lists.newArrayList();
            for (PTOperator operator : physicalOperators) {
                stats = endWindowStatsMap.get(operator.getId());
                if (stats == null || stats.counters == null) continue;
                counters.add(stats.counters);
            }
            if (counters.size() <= 0) continue;
            Object aggregate = aggregator.aggregate((Collection)counters);
            this.latestLogicalCounters.put(operatorMeta.getName(), aggregate);
        }
        for (LogicalPlan.OperatorMeta operatorMeta : logicalOperators) {
            Map lm;
            AutoMetric.Aggregator aggregator2 = aggregator = operatorMeta.getMetricAggregatorMeta() != null ? operatorMeta.getMetricAggregatorMeta().getAggregator() : null;
            if (aggregator == null) continue;
            physicalOperators = this.plan.getAllOperators(operatorMeta);
            ArrayList metricPool = Lists.newArrayList();
            for (PTOperator operator : physicalOperators) {
                stats = endWindowStatsMap.get(operator.getId());
                if (stats == null || stats.metrics == null) continue;
                PhysicalMetricsContextImpl physicalMetrics = new PhysicalMetricsContextImpl(operator.getId(), stats.metrics);
                metricPool.add(physicalMetrics);
            }
            if (metricPool.isEmpty() || (lm = aggregator.aggregate(windowId, (Collection)metricPool)) == null || lm.size() <= 0) continue;
            LinkedBlockingQueue<Pair<Long, Map<String, Object>>> windowMetrics = this.logicalMetrics.get(operatorMeta.getName());
            if (windowMetrics == null) {
                windowMetrics = new LinkedBlockingQueue<Pair<Long, Map<String, Object>>>(1000){
                    private static final long serialVersionUID = 1L;

                    @Override
                    public boolean add(Pair<Long, Map<String, Object>> longMapPair) {
                        if (this.remainingCapacity() <= 1) {
                            this.remove();
                        }
                        return super.add(longMapPair);
                    }
                };
                this.logicalMetrics.put(operatorMeta.getName(), (Queue<Pair<Long, Map<String, Object>>>)windowMetrics);
            }
            LOG.debug("Adding to logical metrics for {}", (Object)operatorMeta.getName());
            windowMetrics.add(new Pair((Object)windowId, (Object)lm));
            Map<String, Object> oldValue = this.latestLogicalMetrics.put(operatorMeta.getName(), lm);
            if (oldValue != null) continue;
            try {
                this.saveMetaInfo();
            }
            catch (IOException ex) {
                LOG.error("Cannot save application meta info to DFS. App data sources will not be available.", (Throwable)ex);
            }
        }
    }

    private void saveMetaInfo() throws IOException {
        Path file = new Path(this.vars.appPath, "meta.json." + System.nanoTime());
        try (FSDataOutputStream os = this.fileContext.create(file, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), new Options.CreateOpts[]{Options.CreateOpts.CreateParent.createParent()});){
            JSONObject top = new JSONObject();
            JSONObject attributes = new JSONObject();
            for (Map.Entry entry : this.plan.getLogicalPlan().getAttributes().entrySet()) {
                attributes.put(((Attribute)entry.getKey()).getSimpleName(), entry.getValue());
            }
            JSONObject autoMetrics = new JSONObject();
            for (Map.Entry<String, Map<String, Object>> entry : this.latestLogicalMetrics.entrySet()) {
                autoMetrics.put(entry.getKey(), (Object)new JSONArray(entry.getValue().keySet()));
            }
            top.put(APP_META_KEY_ATTRIBUTES, (Object)attributes);
            top.put(APP_META_KEY_METRICS, (Object)autoMetrics);
            os.write(top.toString().getBytes());
        }
        catch (JSONException ex) {
            throw new RuntimeException(ex);
        }
        Path origPath = new Path(this.vars.appPath, APP_META_FILENAME);
        this.fileContext.rename(file, origPath, new Options.Rename[]{Options.Rename.OVERWRITE});
    }

    public Queue<Pair<Long, Map<String, Object>>> getWindowMetrics(String operatorName) {
        return this.logicalMetrics.get(operatorName);
    }

    private CriticalPathInfo findCriticalPath() {
        CriticalPathInfo result = null;
        List<PTOperator> leafOperators = this.plan.getLeafOperators();
        HashMap<PTOperator, CriticalPathInfo> cache = new HashMap<PTOperator, CriticalPathInfo>();
        for (PTOperator leafOperator : leafOperators) {
            CriticalPathInfo cpi = this.findCriticalPathHelper(leafOperator, cache);
            if (result != null && result.latency >= cpi.latency) continue;
            result = cpi;
        }
        return result;
    }

    private CriticalPathInfo findCriticalPathHelper(PTOperator operator, Map<PTOperator, CriticalPathInfo> cache) {
        CriticalPathInfo cpi = cache.get(operator);
        if (cpi != null) {
            return cpi;
        }
        PTOperator slowestUpstreamOperator = (PTOperator)this.slowestUpstreamOp.get(operator);
        if (slowestUpstreamOperator != null) {
            cpi = this.findCriticalPathHelper(slowestUpstreamOperator, cache);
            try {
                cpi = (CriticalPathInfo)cpi.clone();
            }
            catch (CloneNotSupportedException ex) {
                throw new RuntimeException();
            }
        } else {
            cpi = new CriticalPathInfo();
        }
        cpi.latency += operator.stats.getLatencyMA();
        cpi.path.addLast(operator.getId());
        cache.put(operator, cpi);
        return cpi;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int processEvents() {
        Runnable command;
        for (PTOperator o : this.reportStats.keySet()) {
            List<Stats.OperatorStats> stats = o.stats.listenerStats.poll();
            if (stats != null) {
                List<Stats.OperatorStats> moreStats;
                while ((moreStats = o.stats.listenerStats.poll()) != null) {
                    stats.addAll(moreStats);
                }
            }
            o.stats.lastWindowedStats = stats;
            o.stats.operatorResponses = null;
            if (!o.stats.responses.isEmpty()) {
                StatsListener.OperatorResponse operatorResponse;
                o.stats.operatorResponses = new ArrayList<StatsListener.OperatorResponse>();
                while ((operatorResponse = o.stats.responses.poll()) != null) {
                    o.stats.operatorResponses.add(operatorResponse);
                }
            }
            if (o.stats.lastWindowedStats != null && o.statsListeners != null) {
                this.plan.onStatusUpdate(o);
            }
            this.reportStats.remove(o);
        }
        if (!this.shutdownOperators.isEmpty()) {
            Iterator<PTOperator> i$ = this.shutdownOperators;
            synchronized (i$) {
                Iterator<Map.Entry<Long, Set<PTOperator>>> it = this.shutdownOperators.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry<Long, Set<PTOperator>> windowAndOpers = it.next();
                    if (windowAndOpers.getKey() > this.committedWindowId && !this.checkDownStreamOperators(windowAndOpers)) continue;
                    LOG.info("Removing inactive operators at window {} {}", (Object)Codec.getStringWindowId((long)windowAndOpers.getKey()), windowAndOpers.getValue());
                    for (PTOperator oper : windowAndOpers.getValue()) {
                        this.plan.removeTerminatedPartition(oper);
                    }
                    it.remove();
                }
            }
        }
        if (!this.eventQueue.isEmpty()) {
            for (PTOperator oper : this.plan.getAllOperators().values()) {
                if (oper.getState() == PTOperator.State.ACTIVE) continue;
                LOG.debug("Skipping plan updates due to inactive operator {} {}", (Object)oper, (Object)oper.getState());
                return 0;
            }
        }
        int count = 0;
        while ((command = this.eventQueue.poll()) != null) {
            this.eventQueueProcessing.set(true);
            try {
                command.run();
                ++count;
            }
            catch (Exception e) {
                LOG.error("Failed to execute {}", (Object)command, (Object)e);
            }
            this.eventQueueProcessing.set(false);
        }
        if (count > 0) {
            try {
                this.checkpoint();
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to checkpoint state.", e);
            }
        }
        return count;
    }

    private boolean checkDownStreamOperators(Map.Entry<Long, Set<PTOperator>> windowAndOpers) {
        Set<PTOperator> downStreamOperators = this.getPhysicalPlan().getDependents((Collection<PTOperator>)windowAndOpers.getValue());
        for (PTOperator oper : downStreamOperators) {
            long windowId = oper.stats.currentWindowId.get();
            if (windowId >= windowAndOpers.getKey()) continue;
            return false;
        }
        return true;
    }

    public void scheduleContainerRestart(String containerId) {
        StreamingContainerAgent cs = this.getContainerAgent(containerId);
        if (cs == null || cs.shutdownRequested) {
            return;
        }
        LOG.info("Initiating recovery for {}@{}", (Object)containerId, (Object)cs.container.host);
        cs.container.setState(PTContainer.State.KILLED);
        cs.container.bufferServerAddress = null;
        cs.container.setResourceRequestPriority(-1);
        cs.container.setAllocatedMemoryMB(0);
        cs.container.setAllocatedVCores(0);
        UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(this.clock, false, this.getCheckpointGroups());
        for (PTOperator oper : cs.container.getOperators()) {
            this.updateRecoveryCheckpoints(oper, ctx);
        }
        this.includeLocalUpstreamOperators(ctx);
        LOG.info("Affected operators {}", ctx.visited);
        this.deploy(Collections.emptySet(), ctx.visited, Sets.newHashSet((Object[])new PTContainer[]{cs.container}), ctx.visited);
    }

    private void includeLocalUpstreamOperators(UpdateCheckpointsContext ctx) {
        HashSet newOperators = Sets.newHashSet();
        do {
            newOperators.clear();
            for (PTOperator oper : ctx.visited) {
                for (PTOperator.PTInput input : oper.getInputs()) {
                    if (input.source.source.getContainer() != oper.getContainer() || ctx.visited.contains(input.source.source)) continue;
                    newOperators.add(input.source.source);
                }
            }
            if (newOperators.isEmpty()) continue;
            for (PTOperator oper : newOperators) {
                this.updateRecoveryCheckpoints(oper, ctx);
            }
        } while (!newOperators.isEmpty());
    }

    public void removeContainerAgent(String containerId) {
        LOG.debug("Removing container agent {}", (Object)containerId);
        StreamingContainerAgent containerAgent = this.containers.remove(containerId);
        if (containerAgent != null) {
            for (PTOperator oper : containerAgent.container.getOperators()) {
                StramEvent.StopOperatorEvent ev = new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), containerId);
                this.recordEventAsync(ev);
            }
            containerAgent.container.setFinishedTime(System.currentTimeMillis());
            containerAgent.container.setState(PTContainer.State.KILLED);
            this.completedContainers.put(containerId, containerAgent.getContainerInfo());
        }
    }

    public Collection<ContainerInfo> getCompletedContainerInfo() {
        return Collections.unmodifiableCollection(this.completedContainers.values());
    }

    public StreamingContainerAgent assignContainer(ContainerResource resource, InetSocketAddress bufferServerAddr) {
        PTContainer container = null;
        for (PTContainer c : this.pendingAllocation) {
            if (c.getState() != PTContainer.State.NEW && c.getState() != PTContainer.State.KILLED || c.getResourceRequestPriority() != resource.priority) continue;
            container = c;
            break;
        }
        if (container == null) {
            LOG.debug("No container matching allocated resource {}", (Object)resource);
            LOG.debug("Containers waiting for allocation {}", this.pendingAllocation);
            return null;
        }
        this.pendingAllocation.remove(container);
        container.setState(PTContainer.State.ALLOCATED);
        if (container.getExternalId() != null) {
            LOG.info("Removing container agent {}", (Object)container.getExternalId());
            this.containers.remove(container.getExternalId());
        }
        container.setExternalId(resource.containerId);
        container.host = resource.host;
        container.bufferServerAddress = bufferServerAddr;
        if (UserGroupInformation.isSecurityEnabled()) {
            byte[] token = AuthManager.generateToken();
            container.setBufferServerToken(token);
        }
        container.nodeHttpAddress = resource.nodeHttpAddress;
        container.setAllocatedMemoryMB(resource.memoryMB);
        container.setAllocatedVCores(resource.vCores);
        container.setStartedTime(-1L);
        container.setFinishedTime(-1L);
        this.writeJournal(container.getSetContainerState());
        StreamingContainerAgent sca = new StreamingContainerAgent(container, this.newStreamingContainerContext(container), this);
        this.containers.put(resource.containerId, sca);
        LOG.debug("Assigned container {} priority {}", (Object)resource.containerId, (Object)resource.priority);
        return sca;
    }

    private StreamingContainerUmbilicalProtocol.StreamingContainerContext newStreamingContainerContext(PTContainer container) {
        try {
            int bufferServerMemory = 0;
            Iterator<PTOperator> operatorIterator = container.getOperators().iterator();
            while (operatorIterator.hasNext()) {
                bufferServerMemory += operatorIterator.next().getBufferServerMemory();
            }
            LOG.debug("Buffer Server Memory {}", (Object)bufferServerMemory);
            StreamingContainerUmbilicalProtocol.StreamingContainerContext scc = new StreamingContainerUmbilicalProtocol.StreamingContainerContext(this.plan.getLogicalPlan().getAttributes().clone(), null);
            scc.attributes.put(ContainerContext.IDENTIFIER, (Object)container.getExternalId());
            scc.attributes.put(ContainerContext.BUFFER_SERVER_MB, (Object)bufferServerMemory);
            scc.attributes.put(ContainerContext.BUFFER_SERVER_TOKEN, (Object)container.getBufferServerToken());
            scc.startWindowMillis = this.vars.windowStartMillis;
            return scc;
        }
        catch (CloneNotSupportedException ex) {
            throw new RuntimeException("Cannot clone DAG attributes", ex);
        }
    }

    public StreamingContainerAgent getContainerAgent(String containerId) {
        StreamingContainerAgent cs = this.containers.get(containerId);
        if (cs == null) {
            LOG.warn("Trying to get unknown container {}", (Object)containerId);
        }
        return cs;
    }

    public Collection<StreamingContainerAgent> getContainerAgents() {
        return this.containers.values();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processOperatorDeployStatus(PTOperator oper, StreamingContainerUmbilicalProtocol.OperatorHeartbeat ohb, StreamingContainerAgent sca) {
        StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState ds = null;
        if (ohb != null) {
            ds = ohb.getState();
        }
        LOG.debug("heartbeat {} {}/{} {}", new Object[]{oper, oper.getState(), ds, oper.getContainer().getExternalId()});
        block2 : switch (oper.getState()) {
            case ACTIVE: {
                if (ds == null) {
                    sca.deployOpers.add(oper);
                    break;
                }
                switch (ds) {
                    case SHUTDOWN: {
                        long windowId = oper.stats.currentWindowId.get();
                        if (ohb.windowStats != null && !ohb.windowStats.isEmpty()) {
                            windowId = ohb.windowStats.get((int)(ohb.windowStats.size() - 1)).windowId;
                        }
                        LOG.debug("Operator {} deactivated at window {}", (Object)oper, (Object)windowId);
                        Map<Long, Set<PTOperator>> map = this.shutdownOperators;
                        synchronized (map) {
                            Set<PTOperator> deactivatedOpers = this.shutdownOperators.get(windowId);
                            if (deactivatedOpers == null) {
                                deactivatedOpers = new HashSet<PTOperator>();
                                this.shutdownOperators.put(windowId, deactivatedOpers);
                            }
                            deactivatedOpers.add(oper);
                        }
                        sca.undeployOpers.add(oper.getId());
                        this.slowestUpstreamOp.remove(oper);
                        this.recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), oper.getContainer().getExternalId()));
                        break block2;
                    }
                    case FAILED: {
                        this.processOperatorFailure(oper);
                        sca.undeployOpers.add(oper.getId());
                        this.slowestUpstreamOp.remove(oper);
                        this.recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), oper.getContainer().getExternalId()));
                        break block2;
                    }
                }
                break;
            }
            case PENDING_UNDEPLOY: {
                if (ds == null) {
                    this.recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), oper.getContainer().getExternalId()));
                    oper.setState(PTOperator.State.PENDING_DEPLOY);
                    sca.deployOpers.add(oper);
                    break;
                }
                sca.undeployOpers.add(oper.getId());
                this.slowestUpstreamOp.remove(oper);
                break;
            }
            case PENDING_DEPLOY: {
                if (ds == null) {
                    sca.deployOpers.add(oper);
                    break;
                }
                PTContainer container = oper.getContainer();
                LOG.debug("{} marking deployed: {} remote status {}", new Object[]{container.getExternalId(), oper, ds});
                oper.setState(PTOperator.State.ACTIVE);
                oper.stats.lastHeartbeat = null;
                oper.stats.lastWindowIdChangeTms = this.clock.getTime();
                this.recordEventAsync(new StramEvent.StartOperatorEvent(oper.getName(), oper.getId(), container.getExternalId()));
                break;
            }
            default: {
                if (ds == null) break;
                sca.undeployOpers.add(oper.getId());
                this.slowestUpstreamOp.remove(oper);
                this.recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), oper.getContainer().getExternalId()));
            }
        }
    }

    private void processOperatorFailure(PTOperator oper) {
        if (oper.getState() == PTOperator.State.ACTIVE) {
            oper.setState(PTOperator.State.INACTIVE);
            ++oper.failureCount;
            ++oper.getOperatorMeta().getStatus().failureCount;
            LOG.warn("Operator failure: {} count: {}", (Object)oper, (Object)oper.failureCount);
            Integer maxAttempts = (Integer)oper.getOperatorMeta().getValue(Context.OperatorContext.RECOVERY_ATTEMPTS);
            if (maxAttempts == null || oper.failureCount <= maxAttempts) {
                LOG.error("Initiating container restart after operator failure {}", (Object)oper);
                this.containerStopRequests.put(oper.getContainer().getExternalId(), oper.getContainer().getExternalId());
            } else {
                String msg = String.format("Shutdown after reaching failure threshold for %s", oper);
                LOG.warn(msg);
                this.shutdownAllContainers(msg);
                this.forcedShutdown = true;
            }
        } else {
            LOG.warn("Failed operator {} {} {} to be undeployed by container", (Object)oper, (Object)oper.getState());
        }
    }

    public StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse processHeartbeat(StreamingContainerUmbilicalProtocol.ContainerHeartbeat heartbeat) {
        StreamingContainerUmbilicalProtocol.StramToNodeRequest r;
        long currentTimeMillis = this.clock.getTime();
        final StreamingContainerAgent sca = this.containers.get(heartbeat.getContainerId());
        if (sca == null || sca.container.getState() == PTContainer.State.KILLED) {
            LOG.error("Unknown container {}", (Object)heartbeat.getContainerId());
            StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse response = new StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse();
            response.shutdown = true;
            return response;
        }
        if (sca.container.getState() == PTContainer.State.ALLOCATED) {
            if (sca.container.bufferServerAddress == null && heartbeat.bufferServerHost != null) {
                sca.container.bufferServerAddress = InetSocketAddress.createUnresolved(heartbeat.bufferServerHost, heartbeat.bufferServerPort);
                LOG.info("Container {} buffer server: {}", (Object)sca.container.getExternalId(), (Object)sca.container.bufferServerAddress);
            }
            final long containerStartTime = System.currentTimeMillis();
            sca.container.setState(PTContainer.State.ACTIVE);
            sca.container.setStartedTime(containerStartTime);
            sca.container.setFinishedTime(-1L);
            sca.jvmName = heartbeat.jvmName;
            this.poolExecutor.submit(new Runnable(){

                @Override
                public void run() {
                    try {
                        StreamingContainerManager.this.containerFile.append(sca.getContainerInfo());
                    }
                    catch (IOException ex) {
                        LOG.warn("Cannot write to container file");
                    }
                    for (PTOperator ptOp : sca.container.getOperators()) {
                        try {
                            JSONObject operatorInfo = new JSONObject();
                            operatorInfo.put("name", (Object)ptOp.getName());
                            operatorInfo.put("id", ptOp.getId());
                            operatorInfo.put("container", (Object)sca.container.getExternalId());
                            operatorInfo.put("startTime", containerStartTime);
                            StreamingContainerManager.this.operatorFile.append(operatorInfo);
                        }
                        catch (IOException | JSONException ex) {
                            LOG.warn("Cannot write to operator file: ", ex);
                        }
                    }
                }
            });
        }
        sca.containerStackTrace = heartbeat.stackTrace;
        if (heartbeat.restartRequested) {
            LOG.error("Container {} restart request", (Object)sca.container.getExternalId());
            this.containerStopRequests.put(sca.container.getExternalId(), sca.container.getExternalId());
        }
        sca.memoryMBFree = heartbeat.memoryMBFree;
        sca.gcCollectionCount = heartbeat.gcCollectionCount;
        sca.gcCollectionTime = heartbeat.gcCollectionTime;
        sca.undeployOpers.clear();
        sca.deployOpers.clear();
        if (!this.deployChangeInProgress.get()) {
            sca.deployCnt = this.deployChangeCnt;
        }
        HashSet reportedOperators = Sets.newHashSetWithExpectedSize((int)sca.container.getOperators().size());
        for (StreamingContainerUmbilicalProtocol.OperatorHeartbeat shb : heartbeat.getContainerStats().operators) {
            long maxEndWindowTimestamp = 0L;
            reportedOperators.add(shb.nodeId);
            PTOperator oper = this.plan.getAllOperators().get(shb.getNodeId());
            if (oper == null) {
                LOG.info("Heartbeat for unknown operator {} (container {})", (Object)shb.getNodeId(), (Object)heartbeat.getContainerId());
                sca.undeployOpers.add(shb.nodeId);
                continue;
            }
            if (shb.requestResponse != null) {
                for (StatsListener.OperatorResponse obj : shb.requestResponse) {
                    if (obj instanceof OperatorResponse) {
                        this.commandResponse.put((Object)((Long)obj.getResponseId()), obj.getResponse());
                        LOG.debug(" Got back the response {} for the request {}", (Object)obj, obj.getResponseId());
                        continue;
                    }
                    oper.stats.responses.add(obj);
                }
            }
            if (oper.getState() != PTOperator.State.ACTIVE || shb.getState() != StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState.ACTIVE) {
                this.processOperatorDeployStatus(oper, shb, sca);
            }
            oper.stats.lastHeartbeat = shb;
            ArrayList<Stats.OperatorStats> statsList = shb.getOperatorStatsContainer();
            if (!statsList.isEmpty()) {
                long lastMaxEndWindowTimestamp;
                long tuplesProcessed = 0L;
                long tuplesEmitted = 0L;
                long totalCpuTimeUsed = 0L;
                int statCount = 0;
                long maxDequeueTimestamp = -1L;
                oper.stats.recordingId = null;
                OperatorStatus status = oper.stats;
                status.statsRevs.checkout();
                for (Map.Entry<String, OperatorStatus.PortStatus> entry : status.inputPortStatusList.entrySet()) {
                    entry.getValue().recordingId = null;
                }
                for (Map.Entry<String, OperatorStatus.PortStatus> entry : status.outputPortStatusList.entrySet()) {
                    entry.getValue().recordingId = null;
                }
                for (Stats.OperatorStats stats : statsList) {
                    Map.Entry<String, OperatorStatus.PortStatus> entry;
                    Iterator<Map.Entry<String, OperatorStatus.PortStatus>> it;
                    long portElapsedMillis;
                    Long lastEndWindowTimestamp;
                    Pair operatorPortName;
                    OperatorStatus.PortStatus ps;
                    if (stats == null) {
                        LOG.warn("Operator {} statistics list contains null element", (Object)shb.getNodeId());
                        continue;
                    }
                    if (stats.checkpoint instanceof Checkpoint && (oper.getRecentCheckpoint() == null || oper.getRecentCheckpoint().windowId < stats.checkpoint.getWindowId())) {
                        this.addCheckpoint(oper, (Checkpoint)stats.checkpoint);
                        if (stats.checkpointStats != null) {
                            status.checkpointStats = stats.checkpointStats;
                            status.checkpointTimeMA.add(stats.checkpointStats.checkpointTime);
                        }
                        oper.failureCount = 0;
                    }
                    oper.stats.recordingId = stats.recordingId;
                    EndWindowStats endWindowStats = new EndWindowStats();
                    ArrayList ports = stats.inputPorts;
                    if (ports != null) {
                        HashSet currentInputPortSet = Sets.newHashSetWithExpectedSize((int)ports.size());
                        for (Stats.OperatorStats.PortStats s : ports) {
                            currentInputPortSet.add(s.id);
                            ps = status.inputPortStatusList.get(s.id);
                            if (ps == null) {
                                ps = new OperatorStatus.PortStatus(status);
                                ps.portName = s.id;
                                status.inputPortStatusList.put(s.id, ps);
                            }
                            ps.totalTuples += (long)s.tupleCount;
                            ps.recordingId = s.recordingId;
                            tuplesProcessed += (long)s.tupleCount;
                            endWindowStats.dequeueTimestamps.put(s.id, s.endWindowTimestamp);
                            operatorPortName = new Pair((Object)oper.getId(), (Object)s.id);
                            lastEndWindowTimestamp = this.operatorPortLastEndWindowTimestamps.get(operatorPortName);
                            if (lastEndWindowTimestamp == null) {
                                lastEndWindowTimestamp = this.lastStatsTimestamp;
                            }
                            portElapsedMillis = Math.max(s.endWindowTimestamp - lastEndWindowTimestamp, 0L);
                            ps.tuplesPMSMA.add(s.tupleCount, portElapsedMillis);
                            ps.bufferServerBytesPMSMA.add(s.bufferServerBytes, portElapsedMillis);
                            ps.queueSizeMA.add(s.queueSize);
                            this.operatorPortLastEndWindowTimestamps.put((Pair<Integer, String>)operatorPortName, s.endWindowTimestamp);
                            if (maxEndWindowTimestamp < s.endWindowTimestamp) {
                                maxEndWindowTimestamp = s.endWindowTimestamp;
                            }
                            if (s.endWindowTimestamp <= maxDequeueTimestamp) continue;
                            maxDequeueTimestamp = s.endWindowTimestamp;
                        }
                        it = status.inputPortStatusList.entrySet().iterator();
                        while (it.hasNext()) {
                            entry = it.next();
                            if (currentInputPortSet.contains(entry.getKey())) continue;
                            it.remove();
                        }
                    }
                    if ((ports = stats.outputPorts) != null) {
                        HashSet currentOutputPortSet = Sets.newHashSetWithExpectedSize((int)ports.size());
                        for (Stats.OperatorStats.PortStats s : ports) {
                            currentOutputPortSet.add(s.id);
                            ps = status.outputPortStatusList.get(s.id);
                            if (ps == null) {
                                ps = new OperatorStatus.PortStatus(status);
                                ps.portName = s.id;
                                status.outputPortStatusList.put(s.id, ps);
                            }
                            ps.totalTuples += (long)s.tupleCount;
                            ps.recordingId = s.recordingId;
                            tuplesEmitted += (long)s.tupleCount;
                            operatorPortName = new Pair((Object)oper.getId(), (Object)s.id);
                            lastEndWindowTimestamp = this.operatorPortLastEndWindowTimestamps.get(operatorPortName);
                            if (lastEndWindowTimestamp == null) {
                                lastEndWindowTimestamp = this.lastStatsTimestamp;
                            }
                            portElapsedMillis = Math.max(s.endWindowTimestamp - lastEndWindowTimestamp, 0L);
                            ps.tuplesPMSMA.add(s.tupleCount, portElapsedMillis);
                            ps.bufferServerBytesPMSMA.add(s.bufferServerBytes, portElapsedMillis);
                            this.operatorPortLastEndWindowTimestamps.put((Pair<Integer, String>)operatorPortName, s.endWindowTimestamp);
                            if (maxEndWindowTimestamp >= s.endWindowTimestamp) continue;
                            maxEndWindowTimestamp = s.endWindowTimestamp;
                        }
                        if (ports.size() > 0) {
                            endWindowStats.emitTimestamp = ((Stats.OperatorStats.PortStats)ports.iterator().next()).endWindowTimestamp;
                        }
                        it = status.outputPortStatusList.entrySet().iterator();
                        while (it.hasNext()) {
                            entry = it.next();
                            if (currentOutputPortSet.contains(entry.getKey())) continue;
                            it.remove();
                        }
                    }
                    if (endWindowStats.emitTimestamp < 0L) {
                        endWindowStats.emitTimestamp = maxDequeueTimestamp;
                    }
                    if (status.currentWindowId.get() != stats.windowId) {
                        status.lastWindowIdChangeTms = currentTimeMillis;
                        status.currentWindowId.set(stats.windowId);
                    }
                    totalCpuTimeUsed += stats.cpuTimeUsed;
                    ++statCount;
                    if (oper.getOperatorMeta().getValue(Context.OperatorContext.COUNTERS_AGGREGATOR) != null) {
                        endWindowStats.counters = stats.counters;
                    }
                    if (oper.getOperatorMeta().getMetricAggregatorMeta() != null && oper.getOperatorMeta().getMetricAggregatorMeta().getAggregator() != null) {
                        endWindowStats.metrics = stats.metrics;
                    }
                    if (stats.windowId <= this.currentEndWindowStatsWindowId) continue;
                    Map<Integer, EndWindowStats> endWindowStatsMap = this.endWindowStatsOperatorMap.get(stats.windowId);
                    if (endWindowStatsMap == null) {
                        endWindowStatsMap = new ConcurrentSkipListMap<Integer, EndWindowStats>();
                        Map<Integer, EndWindowStats> endWindowStatsMapPrevious = this.endWindowStatsOperatorMap.putIfAbsent(stats.windowId, endWindowStatsMap);
                        if (endWindowStatsMapPrevious != null) {
                            endWindowStatsMap = endWindowStatsMapPrevious;
                        }
                    }
                    endWindowStatsMap.put(shb.getNodeId(), endWindowStats);
                    Set<Integer> allCurrentOperators = this.plan.getAllOperators().keySet();
                    int numOperators = this.plan.getAllOperators().size();
                    if (!allCurrentOperators.containsAll(endWindowStatsMap.keySet()) || endWindowStatsMap.size() != numOperators) continue;
                    this.completeEndWindowStatsWindowId = stats.windowId;
                }
                status.totalTuplesProcessed.add(tuplesProcessed);
                status.totalTuplesEmitted.add(tuplesEmitted);
                LogicalPlan.OperatorMeta logicalOperator = oper.getOperatorMeta();
                LogicalOperatorStatus logicalStatus = logicalOperator.getStatus();
                if (!oper.isUnifier()) {
                    logicalStatus.totalTuplesProcessed += tuplesProcessed;
                    logicalStatus.totalTuplesEmitted += tuplesEmitted;
                }
                long l = lastMaxEndWindowTimestamp = this.operatorLastEndWindowTimestamps.containsKey(oper.getId()) ? this.operatorLastEndWindowTimestamps.get(oper.getId()) : this.lastStatsTimestamp;
                if (maxEndWindowTimestamp >= lastMaxEndWindowTimestamp) {
                    double tuplesProcessedPMSMA = 0.0;
                    double tuplesEmittedPMSMA = 0.0;
                    if (statCount != 0) {
                        status.cpuNanosPMSMA.add(totalCpuTimeUsed, maxEndWindowTimestamp - lastMaxEndWindowTimestamp);
                    }
                    for (OperatorStatus.PortStatus ps : status.inputPortStatusList.values()) {
                        tuplesProcessedPMSMA += ps.tuplesPMSMA.getAvg();
                    }
                    for (OperatorStatus.PortStatus ps : status.outputPortStatusList.values()) {
                        tuplesEmittedPMSMA += ps.tuplesPMSMA.getAvg();
                    }
                    status.tuplesProcessedPSMA.set(Math.round(tuplesProcessedPMSMA * 1000.0));
                    status.tuplesEmittedPSMA.set(Math.round(tuplesEmittedPMSMA * 1000.0));
                }
                this.operatorLastEndWindowTimestamps.put(oper.getId(), maxEndWindowTimestamp);
                status.listenerStats.add(statsList);
                this.reportStats.put(oper, oper);
                status.statsRevs.commit();
            }
            if (this.lastStatsTimestamp >= maxEndWindowTimestamp) continue;
            this.lastStatsTimestamp = maxEndWindowTimestamp;
        }
        sca.lastHeartbeatMillis = currentTimeMillis;
        for (PTOperator oper : sca.container.getOperators()) {
            if (reportedOperators.contains(oper.getId())) continue;
            this.processOperatorDeployStatus(oper, null, sca);
        }
        StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse rsp = this.getHeartbeatResponse(sca);
        if (heartbeat.getContainerStats().operators.isEmpty() && this.isApplicationIdle()) {
            LOG.info("requesting idle shutdown for container {}", (Object)heartbeat.getContainerId());
            rsp.shutdown = true;
        } else if (sca.shutdownRequested) {
            LOG.info("requesting shutdown for container {}", (Object)heartbeat.getContainerId());
            rsp.shutdown = true;
        }
        ArrayList<StreamingContainerUmbilicalProtocol.StramToNodeRequest> requests = rsp.nodeRequests != null ? rsp.nodeRequests : new ArrayList<StreamingContainerUmbilicalProtocol.StramToNodeRequest>();
        ConcurrentLinkedQueue<StreamingContainerUmbilicalProtocol.StramToNodeRequest> operatorRequests = sca.getOperatorRequests();
        while ((r = operatorRequests.poll()) != null) {
            requests.add(r);
        }
        rsp.nodeRequests = requests;
        rsp.committedWindowId = this.committedWindowId;
        rsp.stackTraceRequired = sca.stackTraceRequested;
        sca.stackTraceRequested = false;
        return rsp;
    }

    public long updateOperatorLatency(PTOperator oper, UpdateOperatorLatencyContext ctx) {
        if (!oper.getInputs().isEmpty()) {
            OperatorStatus status = oper.stats;
            long latency = Long.MAX_VALUE;
            PTOperator slowestUpstream = null;
            int windowWidthMillis = (Integer)this.plan.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS);
            int heartbeatTimeoutMillis = (Integer)this.plan.getLogicalPlan().getValue(LogicalPlan.HEARTBEAT_TIMEOUT_MILLIS);
            long currentWindowId = status.currentWindowId.get();
            if (!ctx.endWindowStatsExists(currentWindowId)) {
                for (PTOperator.PTInput input : oper.getInputs()) {
                    long portLatency;
                    PTOperator upstreamOp = input.source.source;
                    if (upstreamOp.getOperatorMeta().getOperator() instanceof Operator.DelayOperator || upstreamOp.stats.currentWindowId.get() <= oper.stats.currentWindowId.get() || latency <= (portLatency = WindowGenerator.compareWindowId(upstreamOp.stats.currentWindowId.get(), oper.stats.currentWindowId.get(), windowWidthMillis) * (long)windowWidthMillis)) continue;
                    latency = portLatency;
                    slowestUpstream = upstreamOp;
                }
            } else {
                long endWindowEmitTime = ctx.getEndWindowEmitTimestamp(currentWindowId, oper);
                long adjustedEndWindowEmitTimestamp = endWindowEmitTime + ctx.getRPCLatency(oper);
                for (PTOperator.PTInput input : oper.getInputs()) {
                    long latencyFromWindowsBehind;
                    long upstreamEndWindowEmitTime;
                    PTOperator upstreamOp = input.source.source;
                    if (upstreamOp.getOperatorMeta().getOperator() instanceof Operator.DelayOperator || (upstreamEndWindowEmitTime = ctx.getEndWindowEmitTimestamp(currentWindowId, upstreamOp)) < 0L) continue;
                    long portLatency = adjustedEndWindowEmitTimestamp - (upstreamEndWindowEmitTime + ctx.getRPCLatency(upstreamOp));
                    if (portLatency < 0L) {
                        portLatency = 0L;
                    }
                    if ((latencyFromWindowsBehind = WindowGenerator.compareWindowId(upstreamOp.stats.currentWindowId.get(), oper.stats.currentWindowId.get(), windowWidthMillis) * (long)windowWidthMillis) > portLatency && latencyFromWindowsBehind > (long)heartbeatTimeoutMillis) {
                        portLatency = latencyFromWindowsBehind;
                    }
                    if (latency <= portLatency) continue;
                    latency = portLatency;
                    slowestUpstream = upstreamOp;
                }
            }
            if (slowestUpstream != null) {
                status.latencyMA.add(latency);
                this.slowestUpstreamOp.put(oper, slowestUpstream);
                return latency;
            }
        }
        return 0L;
    }

    private StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse getHeartbeatResponse(StreamingContainerAgent sca) {
        StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse rsp = new StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse();
        if (this.deployChangeInProgress.get() || sca.deployCnt != this.deployChangeCnt) {
            LOG.debug("{} deferred requests due to concurrent plan change.", (Object)sca.container.toIdStateString());
            rsp.hasPendingRequests = true;
            return rsp;
        }
        if (!sca.undeployOpers.isEmpty()) {
            rsp.undeployRequest = Lists.newArrayList(sca.undeployOpers);
            rsp.hasPendingRequests = !sca.deployOpers.isEmpty();
            return rsp;
        }
        Set<PTOperator> deployOperators = sca.deployOpers;
        if (!deployOperators.isEmpty()) {
            for (PTContainer c : this.getPhysicalPlan().getContainers()) {
                if (c.getState() != PTContainer.State.ACTIVE) {
                    LOG.debug("{} waiting for container activation {}", (Object)sca.container.toIdStateString(), (Object)c.toIdStateString());
                    rsp.hasPendingRequests = true;
                    return rsp;
                }
                for (PTOperator oper : c.getOperators()) {
                    if (oper.getState() != PTOperator.State.PENDING_UNDEPLOY) continue;
                    LOG.debug("{} waiting for undeploy {} {}", new Object[]{sca.container.toIdStateString(), c.toIdStateString(), oper});
                    rsp.hasPendingRequests = true;
                    return rsp;
                }
            }
            LOG.debug("{} deployable operators: {}", (Object)sca.container.toIdStateString(), deployOperators);
            List<OperatorDeployInfo> deployList = sca.getDeployInfoList(deployOperators);
            if (deployList != null && !deployList.isEmpty()) {
                rsp.deployRequest = deployList;
                rsp.nodeRequests = Lists.newArrayList();
                for (PTOperator o : deployOperators) {
                    rsp.nodeRequests.addAll(o.deployRequests);
                }
            }
            rsp.hasPendingRequests = false;
            return rsp;
        }
        return rsp;
    }

    private boolean isApplicationIdle() {
        if (this.eventQueueProcessing.get()) {
            return false;
        }
        for (StreamingContainerAgent sca : this.containers.values()) {
            if (sca.hasPendingWork()) {
                return false;
            }
            for (PTOperator oper : sca.container.getOperators()) {
                if (oper.stats.isIdle()) continue;
                return false;
            }
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void addCheckpoint(PTOperator node, Checkpoint checkpoint) {
        LinkedList<Checkpoint> linkedList = node.checkpoints;
        synchronized (linkedList) {
            if (!node.checkpoints.isEmpty()) {
                Checkpoint lastCheckpoint = node.checkpoints.getLast();
                if (lastCheckpoint.windowId != checkpoint.windowId) {
                    if (lastCheckpoint.windowId > checkpoint.windowId) {
                        LOG.warn("Out of sequence checkpoint {} last {} (operator {})", new Object[]{checkpoint, lastCheckpoint, node});
                        ListIterator<Checkpoint> li = node.checkpoints.listIterator();
                        while (li.hasNext() && ((Checkpoint)li.next()).windowId < checkpoint.windowId) {
                        }
                        if (((Checkpoint)li.previous()).windowId != checkpoint.windowId) {
                            li.add(checkpoint);
                        }
                    } else {
                        node.checkpoints.add(checkpoint);
                    }
                }
            } else {
                node.checkpoints.add(checkpoint);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void updateRecoveryCheckpoints(PTOperator operator, UpdateCheckpointsContext ctx) {
        boolean pendingDeploy;
        if (operator.getRecoveryCheckpoint().windowId < ctx.committedWindowId.longValue()) {
            ctx.committedWindowId.setValue(operator.getRecoveryCheckpoint().windowId);
        }
        if (operator.getState() == PTOperator.State.ACTIVE && ctx.currentTms - operator.stats.lastWindowIdChangeTms > (long)operator.stats.windowProcessingTimeoutMillis && ctx.committedWindowId.longValue() >= operator.getRecoveryCheckpoint().windowId) {
            LOG.warn("Marking operator {} blocked committed window {}, recovery window {}, current time {}, last window id change time {}, window processing timeout millis {}", new Object[]{operator, Codec.getStringWindowId((long)ctx.committedWindowId.longValue()), Codec.getStringWindowId((long)operator.getRecoveryCheckpoint().windowId), ctx.currentTms, operator.stats.lastWindowIdChangeTms, operator.stats.windowProcessingTimeoutMillis});
            ctx.blocked.add(operator);
        }
        Checkpoint maxCheckpoint = Checkpoint.INITIAL_CHECKPOINT;
        Set<LogicalPlan.OperatorMeta> checkpointGroup = ctx.checkpointGroups.get(operator.getOperatorMeta());
        if (checkpointGroup == null) {
            checkpointGroup = Collections.singleton(operator.getOperatorMeta());
        }
        TreeSet<Checkpoint> commonCheckpoints = new TreeSet<Checkpoint>(new Checkpoint.CheckpointComparator());
        LinkedList<Checkpoint> linkedList = operator.checkpoints;
        synchronized (linkedList) {
            commonCheckpoints.addAll(operator.checkpoints);
        }
        HashSet<PTOperator> groupOpers = new HashSet<PTOperator>(checkpointGroup.size());
        boolean bl = pendingDeploy = operator.getState() == PTOperator.State.PENDING_DEPLOY;
        if (checkpointGroup.size() > 1) {
            for (LogicalPlan.OperatorMeta om : checkpointGroup) {
                Collection<PTOperator> operators = this.plan.getAllOperators(om);
                for (PTOperator groupOper : operators) {
                    LinkedList<Checkpoint> linkedList2 = groupOper.checkpoints;
                    synchronized (linkedList2) {
                        commonCheckpoints.retainAll(groupOper.checkpoints);
                    }
                    ctx.visited.add(groupOper);
                    groupOpers.add(groupOper);
                    pendingDeploy |= groupOper.getState() == PTOperator.State.PENDING_DEPLOY;
                }
            }
            if (!commonCheckpoints.isEmpty()) {
                maxCheckpoint = commonCheckpoints.last();
            }
        } else {
            ctx.visited.add(operator);
            groupOpers.add(operator);
            maxCheckpoint = operator.getRecentCheckpoint();
            if (ctx.recovery && maxCheckpoint.windowId == -1L && operator.isOperatorStateLess()) {
                long currentWindowId = WindowGenerator.getWindowId(ctx.currentTms, this.vars.windowStartMillis, ((Integer)this.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS)).intValue());
                maxCheckpoint = new Checkpoint(currentWindowId, 0, 0);
            }
        }
        for (PTOperator groupOper : groupOpers) {
            for (PTOperator.PTOutput out : groupOper.getOutputs()) {
                for (PTOperator.PTInput sink : out.sinks) {
                    PTOperator sinkOperator = sink.target;
                    if (groupOpers.contains(sinkOperator)) continue;
                    if (!ctx.visited.contains(sinkOperator)) {
                        this.updateRecoveryCheckpoints(sinkOperator, ctx);
                    }
                    if (sinkOperator.getRecoveryCheckpoint().windowId >= operator.getRecoveryCheckpoint().windowId) {
                        maxCheckpoint = Checkpoint.min(maxCheckpoint, sinkOperator.getRecoveryCheckpoint());
                    }
                    if (!ctx.blocked.contains(sinkOperator) || sinkOperator.stats.getCurrentWindowId() != operator.stats.getCurrentWindowId()) continue;
                    ctx.blocked.remove(sinkOperator);
                }
            }
        }
        if (!commonCheckpoints.contains(maxCheckpoint) && !commonCheckpoints.isEmpty()) {
            maxCheckpoint = (Checkpoint)Objects.firstNonNull((Object)commonCheckpoints.floor(maxCheckpoint), (Object)maxCheckpoint);
        }
        for (PTOperator groupOper : groupOpers) {
            if (!pendingDeploy || ctx.recovery) {
                LinkedList<Checkpoint> checkpoints;
                Checkpoint c1 = Checkpoint.INITIAL_CHECKPOINT;
                LinkedList<Checkpoint> linkedList3 = checkpoints = groupOper.checkpoints;
                synchronized (linkedList3) {
                    if (!checkpoints.isEmpty() && checkpoints.getFirst().windowId <= maxCheckpoint.windowId) {
                        c1 = checkpoints.getFirst();
                        while (checkpoints.size() > 1) {
                            Checkpoint c2 = checkpoints.get(1);
                            if (c2.windowId <= maxCheckpoint.windowId) {
                                checkpoints.removeFirst();
                                this.purgeCheckpoints.add((Pair<PTOperator, Long>)new Pair((Object)groupOper, (Object)c1.windowId));
                                c1 = c2;
                                continue;
                            }
                            break;
                        }
                    } else if (ctx.recovery && checkpoints.isEmpty() && groupOper.isOperatorStateLess()) {
                        LOG.debug("Adding checkpoint for stateless operator {} {}", (Object)groupOper, (Object)Codec.getStringWindowId((long)maxCheckpoint.windowId));
                        c1 = groupOper.addCheckpoint(maxCheckpoint.windowId, this.vars.windowStartMillis);
                    }
                }
                groupOper.setRecoveryCheckpoint(c1);
                continue;
            }
            LOG.debug("Skipping checkpoint update {} during {}", (Object)groupOper, (Object)groupOper.getState());
        }
    }

    public long windowIdToMillis(long windowId) {
        int widthMillis = (Integer)this.plan.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS);
        return WindowGenerator.getWindowMillis(windowId, this.vars.windowStartMillis, widthMillis);
    }

    public long getWindowStartMillis() {
        return this.vars.windowStartMillis;
    }

    private Map<LogicalPlan.OperatorMeta, Set<LogicalPlan.OperatorMeta>> getCheckpointGroups() {
        if (this.checkpointGroups == null) {
            this.checkpointGroups = new HashMap<LogicalPlan.OperatorMeta, Set<LogicalPlan.OperatorMeta>>();
            LogicalPlan dag = this.plan.getLogicalPlan();
            dag.resetNIndex();
            LogicalPlan.ValidationContext vc = new LogicalPlan.ValidationContext();
            for (LogicalPlan.OperatorMeta operatorMeta : dag.getRootOperators()) {
                this.plan.getLogicalPlan().findStronglyConnected(operatorMeta, vc);
            }
            for (Set set : vc.stronglyConnected) {
                for (LogicalPlan.OperatorMeta om : set) {
                    this.checkpointGroups.put(om, set);
                }
            }
        }
        return this.checkpointGroups;
    }

    private long updateCheckpoints(boolean recovery) {
        UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(this.clock, recovery, this.getCheckpointGroups());
        for (LogicalPlan.OperatorMeta logicalOperator : this.plan.getLogicalPlan().getRootOperators()) {
            List<PTOperator> operators = this.plan.getOperators(logicalOperator);
            if (operators == null) continue;
            for (PTOperator operator : operators) {
                this.updateRecoveryCheckpoints(operator, ctx);
            }
        }
        this.purgeCheckpoints();
        for (PTOperator oper : ctx.blocked) {
            String containerId = oper.getContainer().getExternalId();
            if (containerId == null) continue;
            LOG.info("Blocked operator {} container {} time {}ms", new Object[]{oper, oper.getContainer().toIdStateString(), ctx.currentTms - oper.stats.lastWindowIdChangeTms});
            this.containerStopRequests.put(containerId, containerId);
        }
        return ctx.committedWindowId.longValue();
    }

    private BufferServerController getBufferServerClient(PTOperator operator) {
        BufferServerController bsc = new BufferServerController(operator.getLogicalId());
        bsc.setToken(operator.getContainer().getBufferServerToken());
        InetSocketAddress address = operator.getContainer().bufferServerAddress;
        StreamingContainer.eventloop.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, (Listener.ClientListener)bsc);
        return bsc;
    }

    private void purgeCheckpoints() {
        for (Pair<PTOperator, Long> p : this.purgeCheckpoints) {
            final PTOperator operator = (PTOperator)p.getFirst();
            if (operator.isOperatorStateLess()) continue;
            final long windowId = (Long)p.getSecond();
            Runnable r = new Runnable(){

                @Override
                public void run() {
                    try {
                        ((StorageAgent)operator.getOperatorMeta().getValue(Context.OperatorContext.STORAGE_AGENT)).delete(operator.getId(), windowId);
                    }
                    catch (IOException ex) {
                        LOG.error("Failed to purge checkpoint for operator {} for windowId {}", new Object[]{operator, windowId, ex});
                    }
                }
            };
            this.poolExecutor.submit(r);
        }
        this.purgeCheckpoints.clear();
    }

    public void shutdownAllContainers(String message) {
        this.shutdownDiagnosticsMessage = message;
        LOG.info("Initiating application shutdown: {}", (Object)message);
        for (StreamingContainerAgent cs : this.containers.values()) {
            cs.shutdownRequested = true;
        }
    }

    private Map<PTContainer, List<PTOperator>> groupByContainer(Collection<PTOperator> operators) {
        HashMap<PTContainer, List<PTOperator>> m = new HashMap<PTContainer, List<PTOperator>>();
        for (PTOperator node : operators) {
            ArrayList<PTOperator> nodes = (ArrayList<PTOperator>)m.get(node.getContainer());
            if (nodes == null) {
                nodes = new ArrayList<PTOperator>();
                m.put(node.getContainer(), nodes);
            }
            nodes.add(node);
        }
        return m;
    }

    private void requestContainer(PTContainer c) {
        StreamingContainerAgent.ContainerStartRequest dr = new StreamingContainerAgent.ContainerStartRequest(c);
        this.containerStartRequests.add(dr);
        this.pendingAllocation.add(dr.container);
        this.lastResourceRequest = System.currentTimeMillis();
        for (PTOperator operator : c.getOperators()) {
            operator.setState(PTOperator.State.INACTIVE);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void deploy(Set<PTContainer> releaseContainers, Collection<PTOperator> undeploy, Set<PTContainer> startContainers, Collection<PTOperator> deploy) {
        try {
            this.deployChangeInProgress.set(true);
            Map<PTContainer, List<PTOperator>> undeployGroups = this.groupByContainer(undeploy);
            for (Map.Entry<PTContainer, List<PTOperator>> e : undeployGroups.entrySet()) {
                PTContainer c = e.getKey();
                if (startContainers.contains(c) || releaseContainers.contains(c) || c.getState() == PTContainer.State.KILLED) continue;
                LOG.debug("scheduling undeploy {} {}", (Object)e.getKey().getExternalId(), e.getValue());
                for (PTOperator oper : e.getValue()) {
                    oper.setState(PTOperator.State.PENDING_UNDEPLOY);
                }
            }
            for (PTContainer c : startContainers) {
                this.requestContainer(c);
            }
            Map<PTContainer, List<PTOperator>> deployGroups = this.groupByContainer(deploy);
            for (Map.Entry<PTContainer, List<PTOperator>> e : deployGroups.entrySet()) {
                if (!startContainers.contains(e.getKey())) {
                    for (PTOperator operator : e.getValue()) {
                        for (PTOperator.PTOutput out : operator.getOutputs()) {
                            if (out.isDownStreamInline()) continue;
                            for (LogicalPlan.InputPortMeta ipm : out.logicalStream.getSinks()) {
                                StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm);
                                Integer codecId = this.plan.getStreamCodecIdentifier(streamCodecInfo);
                                String sourceIdentifier = Integer.toString(operator.getId()).concat(".").concat(out.portName).concat(".").concat(codecId.toString());
                                if (operator.getContainer().getState() != PTContainer.State.ACTIVE || operator.getContainer().bufferServerAddress.getPort() == 0) continue;
                                BufferServerController bsc = this.getBufferServerClient(operator);
                                try {
                                    bsc.reset(null, sourceIdentifier, 0L);
                                }
                                catch (Exception ex) {
                                    LOG.error("Failed to reset buffer server {} {}", (Object)sourceIdentifier, (Object)ex);
                                }
                            }
                        }
                    }
                }
                LOG.debug("scheduling deploy {} {}", (Object)e.getKey().getExternalId(), e.getValue());
                for (PTOperator oper : e.getValue()) {
                    if (oper.getState() == PTOperator.State.PENDING_UNDEPLOY) continue;
                    oper.setState(PTOperator.State.PENDING_DEPLOY);
                }
            }
            for (PTContainer c : releaseContainers) {
                StreamingContainerAgent sca;
                if (c.getExternalId() == null || (sca = this.containers.get(c.getExternalId())) == null) continue;
                LOG.debug("Container marked for shutdown: {}", (Object)c);
                sca.shutdownRequested = true;
            }
        }
        finally {
            ++this.deployChangeCnt;
            this.deployChangeInProgress.set(false);
        }
    }

    @Override
    public void recordEventAsync(StramEvent ev) {
        if (this.eventBus != null) {
            this.eventBus.publishAsync((Object)ev);
        }
    }

    @Override
    public void dispatch(Runnable r) {
        this.eventQueue.add(r);
    }

    public OperatorInfo getOperatorInfo(int operatorId) {
        PTOperator o = this.plan.getAllOperators().get(operatorId);
        return o == null ? null : this.fillPhysicalOperatorInfo(o);
    }

    public List<OperatorInfo> getOperatorInfoList() {
        ArrayList<OperatorInfo> infoList = new ArrayList<OperatorInfo>();
        for (PTContainer container : this.plan.getContainers()) {
            for (PTOperator operator : container.getOperators()) {
                infoList.add(this.fillPhysicalOperatorInfo(operator));
            }
        }
        return infoList;
    }

    public LogicalOperatorInfo getLogicalOperatorInfo(String operatorName) {
        LogicalPlan.OperatorMeta operatorMeta = this.getLogicalPlan().getOperatorMeta(operatorName);
        if (operatorMeta == null) {
            return null;
        }
        return this.fillLogicalOperatorInfo(operatorMeta);
    }

    public LogicalPlan.ModuleMeta getModuleMeta(String moduleName) {
        return this.getModuleMeta(moduleName, this.getLogicalPlan());
    }

    private LogicalPlan.ModuleMeta getModuleMeta(String moduleName, LogicalPlan dag) {
        for (LogicalPlan.ModuleMeta m : dag.getAllModules()) {
            if (m.getFullName().equals(moduleName)) {
                return m;
            }
            LogicalPlan.ModuleMeta res = this.getModuleMeta(moduleName, m.getDag());
            if (res == null) continue;
            return res;
        }
        return null;
    }

    public List<LogicalOperatorInfo> getLogicalOperatorInfoList() {
        ArrayList<LogicalOperatorInfo> infoList = new ArrayList<LogicalOperatorInfo>();
        Collection<LogicalPlan.OperatorMeta> allOperators = this.getLogicalPlan().getAllOperators();
        for (LogicalPlan.OperatorMeta operatorMeta : allOperators) {
            infoList.add(this.fillLogicalOperatorInfo(operatorMeta));
        }
        return infoList;
    }

    public OperatorAggregationInfo getOperatorAggregationInfo(String operatorName) {
        LogicalPlan.OperatorMeta operatorMeta = this.getLogicalPlan().getOperatorMeta(operatorName);
        if (operatorMeta == null) {
            return null;
        }
        return this.fillOperatorAggregationInfo(operatorMeta);
    }

    public static long toWsWindowId(long windowId) {
        return windowId < 0L ? 0L : windowId;
    }

    private OperatorInfo fillPhysicalOperatorInfo(PTOperator operator) {
        PortInfo pinfo;
        OperatorInfo oi = new OperatorInfo();
        oi.container = operator.getContainer().getExternalId();
        oi.host = operator.getContainer().host;
        oi.id = Integer.toString(operator.getId());
        oi.name = operator.getName();
        oi.className = operator.getOperatorMeta().getOperator().getClass().getName();
        oi.status = operator.getState().toString();
        if (operator.isUnifier()) {
            oi.unifierClass = operator.getUnifierClass().getName();
        }
        oi.logicalName = operator.getOperatorMeta().getName();
        OperatorStatus os = operator.stats;
        oi.recordingId = os.recordingId;
        oi.totalTuplesProcessed = os.totalTuplesProcessed.get();
        oi.totalTuplesEmitted = os.totalTuplesEmitted.get();
        oi.tuplesProcessedPSMA = os.tuplesProcessedPSMA.get();
        oi.tuplesEmittedPSMA = os.tuplesEmittedPSMA.get();
        oi.cpuPercentageMA = os.cpuNanosPMSMA.getAvg() / 10000.0;
        oi.latencyMA = os.latencyMA.getAvg();
        oi.failureCount = operator.failureCount;
        oi.recoveryWindowId = StreamingContainerManager.toWsWindowId(operator.getRecoveryCheckpoint().windowId);
        oi.currentWindowId = StreamingContainerManager.toWsWindowId(os.currentWindowId.get());
        if (os.lastHeartbeat != null) {
            oi.lastHeartbeat = os.lastHeartbeat.getGeneratedTms();
        }
        if (os.checkpointStats != null) {
            oi.checkpointTime = os.checkpointStats.checkpointTime;
            oi.checkpointStartTime = os.checkpointStats.checkpointStartTime;
        }
        oi.checkpointTimeMA = os.checkpointTimeMA.getAvg();
        for (OperatorStatus.PortStatus ps : os.inputPortStatusList.values()) {
            pinfo = new PortInfo();
            pinfo.name = ps.portName;
            pinfo.type = "input";
            pinfo.totalTuples = ps.totalTuples;
            pinfo.tuplesPSMA = Math.round(ps.tuplesPMSMA.getAvg() * 1000.0);
            pinfo.bufferServerBytesPSMA = Math.round(ps.bufferServerBytesPMSMA.getAvg() * 1000.0);
            pinfo.queueSizeMA = ps.queueSizeMA.getAvg();
            pinfo.recordingId = ps.recordingId;
            oi.addPort(pinfo);
        }
        for (OperatorStatus.PortStatus ps : os.outputPortStatusList.values()) {
            pinfo = new PortInfo();
            pinfo.name = ps.portName;
            pinfo.type = "output";
            pinfo.totalTuples = ps.totalTuples;
            pinfo.tuplesPSMA = Math.round(ps.tuplesPMSMA.getAvg() * 1000.0);
            pinfo.bufferServerBytesPSMA = Math.round(ps.bufferServerBytesPMSMA.getAvg() * 1000.0);
            pinfo.recordingId = ps.recordingId;
            oi.addPort(pinfo);
        }
        oi.counters = os.getLastWindowedStats().size() > 0 ? os.getLastWindowedStats().get((int)(os.getLastWindowedStats().size() - 1)).counters : null;
        oi.metrics = os.getLastWindowedStats().size() > 0 ? os.getLastWindowedStats().get((int)(os.getLastWindowedStats().size() - 1)).metrics : null;
        return oi;
    }

    private LogicalOperatorInfo fillLogicalOperatorInfo(LogicalPlan.OperatorMeta operator) {
        LogicalOperatorInfo loi = new LogicalOperatorInfo();
        loi.name = operator.getName();
        loi.className = operator.getOperator().getClass().getName();
        loi.totalTuplesEmitted = operator.getStatus().totalTuplesEmitted;
        loi.totalTuplesProcessed = operator.getStatus().totalTuplesProcessed;
        loi.failureCount = operator.getStatus().failureCount;
        loi.status = new HashMap<String, MutableInt>();
        loi.partitions = new TreeSet<Integer>();
        loi.unifiers = new TreeSet<Integer>();
        loi.containerIds = new TreeSet<String>();
        loi.hosts = new TreeSet<String>();
        Collection<PTOperator> physicalOperators = this.getPhysicalPlan().getAllOperators(operator);
        NumberAggregate.LongAggregate checkpointTimeAggregate = new NumberAggregate.LongAggregate();
        for (PTOperator physicalOperator : physicalOperators) {
            String externalId;
            PTContainer container;
            MutableInt count;
            OperatorStatus os = physicalOperator.stats;
            if (physicalOperator.isUnifier()) {
                loi.unifiers.add(physicalOperator.getId());
            } else {
                loi.partitions.add(physicalOperator.getId());
                loi.tuplesEmittedPSMA += os.tuplesEmittedPSMA.get();
                loi.tuplesProcessedPSMA += os.tuplesProcessedPSMA.get();
                long latency = this.calculateLatency(physicalOperator);
                if (latency > loi.latencyMA) {
                    loi.latencyMA = latency;
                }
                checkpointTimeAggregate.addNumber((Number)os.checkpointTimeMA.getAvg());
            }
            loi.cpuPercentageMA += os.cpuNanosPMSMA.getAvg() / 10000.0;
            if (os.lastHeartbeat != null && (loi.lastHeartbeat == 0L || loi.lastHeartbeat > os.lastHeartbeat.getGeneratedTms())) {
                loi.lastHeartbeat = os.lastHeartbeat.getGeneratedTms();
            }
            long currentWindowId = StreamingContainerManager.toWsWindowId(os.currentWindowId.get());
            if (loi.currentWindowId == 0L || loi.currentWindowId > currentWindowId) {
                loi.currentWindowId = currentWindowId;
            }
            if ((count = loi.status.get(physicalOperator.getState().toString())) == null) {
                count = new MutableInt();
                loi.status.put(physicalOperator.getState().toString(), count);
            }
            count.increment();
            if (physicalOperator.getRecoveryCheckpoint() != null) {
                long recoveryWindowId = StreamingContainerManager.toWsWindowId(physicalOperator.getRecoveryCheckpoint().windowId);
                if (loi.recoveryWindowId == 0L || loi.recoveryWindowId > recoveryWindowId) {
                    loi.recoveryWindowId = recoveryWindowId;
                }
            }
            if ((container = physicalOperator.getContainer()) == null || (externalId = container.getExternalId()) == null) continue;
            loi.containerIds.add(externalId);
            loi.hosts.add(container.host);
        }
        if (physicalOperators.size() > 0 && checkpointTimeAggregate.getAvg() != null) {
            loi.checkpointTimeMA = checkpointTimeAggregate.getAvg().longValue();
            loi.counters = this.latestLogicalCounters.get(operator.getName());
            loi.autoMetrics = this.latestLogicalMetrics.get(operator.getName());
        }
        return loi;
    }

    private OperatorAggregationInfo fillOperatorAggregationInfo(LogicalPlan.OperatorMeta operator) {
        OperatorAggregationInfo oai = new OperatorAggregationInfo();
        Collection<PTOperator> physicalOperators = this.getPhysicalPlan().getAllOperators(operator);
        if (physicalOperators.isEmpty()) {
            return null;
        }
        oai.name = operator.getName();
        for (PTOperator physicalOperator : physicalOperators) {
            if (physicalOperator.isUnifier()) continue;
            OperatorStatus os = physicalOperator.stats;
            oai.latencyMA.addNumber((Number)os.latencyMA.getAvg());
            oai.cpuPercentageMA.addNumber((Number)(os.cpuNanosPMSMA.getAvg() / 10000.0));
            oai.tuplesEmittedPSMA.addNumber((Number)os.tuplesEmittedPSMA.get());
            oai.tuplesProcessedPSMA.addNumber((Number)os.tuplesProcessedPSMA.get());
            oai.currentWindowId.addNumber((Number)os.currentWindowId.get());
            oai.recoveryWindowId.addNumber((Number)StreamingContainerManager.toWsWindowId(physicalOperator.getRecoveryCheckpoint().windowId));
            if (os.lastHeartbeat != null) {
                oai.lastHeartbeat.addNumber((Number)os.lastHeartbeat.getGeneratedTms());
            }
            oai.checkpointTime.addNumber((Number)os.checkpointTimeMA.getAvg());
        }
        return oai;
    }

    private long calculateLatency(PTOperator operator) {
        long latency = operator.stats.latencyMA.getAvg();
        long maxUnifierLatency = 0L;
        for (PTOperator.PTOutput output : operator.getOutputs()) {
            for (PTOperator.PTInput input : output.sinks) {
                long thisUnifierLatency;
                if (!input.target.isUnifier() || maxUnifierLatency >= (thisUnifierLatency = this.calculateLatency(input.target))) continue;
                maxUnifierLatency = thisUnifierLatency;
            }
        }
        return latency + maxUnifierLatency;
    }

    public List<StreamInfo> getStreamInfoList() {
        ArrayList<StreamInfo> infoList = new ArrayList<StreamInfo>();
        for (PTContainer container : this.plan.getContainers()) {
            for (PTOperator operator : container.getOperators()) {
                List<PTOperator.PTOutput> outputs = operator.getOutputs();
                for (PTOperator.PTOutput output : outputs) {
                    StreamInfo si = new StreamInfo();
                    si.logicalName = output.logicalStream.getName();
                    si.source.operatorId = String.valueOf(operator.getId());
                    si.source.portName = output.portName;
                    si.locality = output.logicalStream.getLocality();
                    for (PTOperator.PTInput input : output.sinks) {
                        StreamInfo.Port p = new StreamInfo.Port();
                        p.operatorId = String.valueOf(input.target.getId());
                        p.portName = input.target.isUnifier() ? StreamingContainer.getUnifierInputPortName(input.portName, operator.getId(), output.portName) : input.portName;
                        si.sinks.add(p);
                    }
                    infoList.add(si);
                }
            }
        }
        return infoList;
    }

    private void updateOnDeployRequests(PTOperator p, Predicate<StreamingContainerUmbilicalProtocol.StramToNodeRequest> superseded, StreamingContainerUmbilicalProtocol.StramToNodeRequest newRequest) {
        ArrayList<StreamingContainerUmbilicalProtocol.StramToNodeRequest> cloneRequests = new ArrayList<StreamingContainerUmbilicalProtocol.StramToNodeRequest>(p.deployRequests.size());
        for (StreamingContainerUmbilicalProtocol.StramToNodeRequest existingRequest : p.deployRequests) {
            if (superseded.apply((Object)existingRequest)) continue;
            cloneRequests.add(existingRequest);
        }
        if (newRequest != null) {
            cloneRequests.add(newRequest);
        }
        p.deployRequests = Collections.unmodifiableList(cloneRequests);
    }

    private StreamingContainerAgent getContainerAgentFromOperatorId(int operatorId) {
        StreamingContainerAgent sca;
        PTOperator oper = this.plan.getAllOperators().get(operatorId);
        if (oper != null && (sca = this.containers.get(oper.getContainer().getExternalId())) != null) {
            return sca;
        }
        throw new NotFoundException("Operator ID " + operatorId + " not found");
    }

    public void startRecording(String id, int operId, String portName, long numWindows) {
        StreamingContainerAgent sca = this.getContainerAgentFromOperatorId(operId);
        StramToNodeStartRecordingRequest request = new StramToNodeStartRecordingRequest();
        request.setOperatorId(operId);
        if (!StringUtils.isBlank((CharSequence)portName)) {
            request.setPortName(portName);
        }
        request.setNumWindows(numWindows);
        request.setId(id);
        sca.addOperatorRequest(request);
        PTOperator operator = this.plan.getAllOperators().get(operId);
        if (operator != null) {
            this.updateOnDeployRequests(operator, new RecordingRequestFilter(), request);
        }
    }

    public void stopRecording(int operId, String portName) {
        StreamingContainerAgent sca = this.getContainerAgentFromOperatorId(operId);
        StreamingContainerUmbilicalProtocol.StramToNodeRequest request = new StreamingContainerUmbilicalProtocol.StramToNodeRequest();
        request.setOperatorId(operId);
        if (!StringUtils.isBlank((CharSequence)portName)) {
            request.setPortName(portName);
        }
        request.setRequestType(StreamingContainerUmbilicalProtocol.StramToNodeRequest.RequestType.STOP_RECORDING);
        sca.addOperatorRequest(request);
        PTOperator operator = this.plan.getAllOperators().get(operId);
        if (operator != null) {
            this.updateOnDeployRequests(operator, new RecordingRequestFilter(), null);
        }
    }

    public void syncStats() {
        this.statsRecorder.requestSync();
    }

    public void syncEvents() {
        this.eventRecorder.requestSync();
    }

    public void stopContainer(String containerId) {
        this.containerStopRequests.put(containerId, containerId);
    }

    public Journal.Recoverable getSetOperatorProperty(String operatorName, String propertyName, String propertyValue) {
        return new SetOperatorProperty(operatorName, propertyName, propertyValue);
    }

    public Journal.Recoverable getSetPhysicalOperatorProperty(int operatorId, String propertyName, String propertyValue) {
        return new SetPhysicalOperatorProperty(operatorId, propertyName, propertyValue);
    }

    public void setOperatorProperty(String operatorName, String propertyName, String propertyValue) {
        LogicalPlan.OperatorMeta logicalOperator = this.plan.getLogicalPlan().getOperatorMeta(operatorName);
        if (logicalOperator == null) {
            throw new IllegalArgumentException("Unknown operator " + operatorName);
        }
        this.writeJournal(new SetOperatorProperty(operatorName, propertyName, propertyValue));
        this.setOperatorProperty(logicalOperator, propertyName, propertyValue);
    }

    private void setOperatorProperty(LogicalPlan.OperatorMeta logicalOperator, String propertyName, String propertyValue) {
        Map<String, String> properties = Collections.singletonMap(propertyName, propertyValue);
        LogicalPlanConfiguration.setOperatorProperties((DAG.GenericOperator)logicalOperator.getOperator(), properties);
        List<PTOperator> operators = this.plan.getOperators(logicalOperator);
        for (PTOperator o : operators) {
            StramToNodeSetPropertyRequest request = new StramToNodeSetPropertyRequest();
            request.setOperatorId(o.getId());
            request.setPropertyKey(propertyName);
            request.setPropertyValue(propertyValue);
            this.addOperatorRequest(o, request);
            this.updateOnDeployRequests(o, new SetOperatorPropertyRequestFilter(propertyName), request);
        }
        this.recordEventAsync(new StramEvent.SetOperatorPropertyEvent(logicalOperator.getName(), propertyName, propertyValue));
    }

    public void setPhysicalOperatorProperty(int operatorId, String propertyName, String propertyValue) {
        PTOperator o = this.plan.getAllOperators().get(operatorId);
        if (o == null) {
            return;
        }
        this.writeJournal(new SetPhysicalOperatorProperty(operatorId, propertyName, propertyValue));
        this.setPhysicalOperatorProperty(o, propertyName, propertyValue);
    }

    private void setPhysicalOperatorProperty(PTOperator o, String propertyName, String propertyValue) {
        String operatorName = o.getName();
        StramToNodeSetPropertyRequest request = new StramToNodeSetPropertyRequest();
        request.setOperatorId(o.getId());
        request.setPropertyKey(propertyName);
        request.setPropertyValue(propertyValue);
        this.addOperatorRequest(o, request);
        this.updateOnDeployRequests(o, new SetOperatorPropertyRequestFilter(propertyName), request);
        this.recordEventAsync(new StramEvent.SetPhysicalOperatorPropertyEvent(operatorName, o.getId(), propertyName, propertyValue));
    }

    @Override
    public void addOperatorRequest(PTOperator oper, StreamingContainerUmbilicalProtocol.StramToNodeRequest request) {
        StreamingContainerAgent sca = this.getContainerAgent(oper.getContainer().getExternalId());
        if (sca != null) {
            sca.addOperatorRequest(request);
        }
    }

    public void setLoggersLevel(Map<String, String> changedLoggers) {
        LOG.debug("change logger request");
        StramToNodeChangeLoggersRequest request = new StramToNodeChangeLoggersRequest();
        request.setTargetChanges(changedLoggers);
        for (StreamingContainerAgent stramChildAgent : this.containers.values()) {
            stramChildAgent.addOperatorRequest(request);
        }
    }

    public FutureTask<Object> getPhysicalOperatorProperty(int operatorId, String propertyName, long waitTime) {
        PTOperator o = this.plan.getAllOperators().get(operatorId);
        StramToNodeGetPropertyRequest request = new StramToNodeGetPropertyRequest();
        request.setOperatorId(operatorId);
        request.setPropertyName(propertyName);
        this.addOperatorRequest(o, request);
        RequestHandler task = new RequestHandler();
        task.requestId = this.nodeToStramRequestIds.incrementAndGet();
        task.waitTime = waitTime;
        request.requestId = task.requestId;
        FutureTask<Object> future = new FutureTask<Object>(task);
        this.dispatch(future);
        return future;
    }

    public Attribute.AttributeMap getApplicationAttributes() {
        LogicalPlan lp = this.getLogicalPlan();
        try {
            return lp.getAttributes().clone();
        }
        catch (CloneNotSupportedException ex) {
            throw new RuntimeException("Cannot clone DAG attributes", ex);
        }
    }

    public Attribute.AttributeMap getOperatorAttributes(String operatorId) {
        LogicalPlan.OperatorMeta logicalOperator = this.plan.getLogicalPlan().getOperatorMeta(operatorId);
        if (logicalOperator == null) {
            throw new IllegalArgumentException("Invalid operatorId " + operatorId);
        }
        try {
            return logicalOperator.getAttributes().clone();
        }
        catch (CloneNotSupportedException ex) {
            throw new RuntimeException("Cannot clone operator attributes", ex);
        }
    }

    public Attribute.AttributeMap getPortAttributes(String operatorId, String portName) {
        LogicalPlan.OperatorMeta logicalOperator = this.plan.getLogicalPlan().getOperatorMeta(operatorId);
        if (logicalOperator == null) {
            throw new IllegalArgumentException("Invalid operatorId " + operatorId);
        }
        Operators.PortMappingDescriptor portMap = new Operators.PortMappingDescriptor();
        Operators.describe((DAG.GenericOperator)logicalOperator.getOperator(), portMap);
        Operators.PortContextPair<Operator.InputPort<?>> inputPort = portMap.inputPorts.get(portName);
        if (inputPort != null) {
            DAG.InputPortMeta portMeta = logicalOperator.getMeta((Operator.InputPort)inputPort.component);
            try {
                return portMeta.getAttributes().clone();
            }
            catch (CloneNotSupportedException ex) {
                throw new RuntimeException("Cannot clone port attributes", ex);
            }
        }
        Operators.PortContextPair<Operator.OutputPort<?>> outputPort = portMap.outputPorts.get(portName);
        if (outputPort != null) {
            DAG.OutputPortMeta portMeta = logicalOperator.getMeta((Operator.OutputPort)outputPort.component);
            try {
                return portMeta.getAttributes().clone();
            }
            catch (CloneNotSupportedException ex) {
                throw new RuntimeException("Cannot clone port attributes", ex);
            }
        }
        throw new IllegalArgumentException("Invalid port name " + portName);
    }

    public LogicalPlan getLogicalPlan() {
        return this.plan.getLogicalPlan();
    }

    public FutureTask<Object> logicalPlanModification(List<LogicalPlanRequest> requests) throws Exception {
        FutureTask<Object> future = new FutureTask<Object>(new LogicalPlanChangeRunnable(requests));
        this.dispatch(future);
        return future;
    }

    public CriticalPathInfo getCriticalPathInfo() {
        return this.criticalPathInfo;
    }

    private void checkpoint() throws IOException {
        if (this.recoveryHandler != null) {
            LOG.debug("Checkpointing state");
            DataOutputStream out = this.recoveryHandler.rotateLog();
            this.journal.setOutputStream(out);
            CheckpointState cs = new CheckpointState();
            cs.finals = this.vars;
            cs.physicalPlan = this.plan;
            this.recoveryHandler.save(cs);
        }
    }

    @Override
    public void writeJournal(Journal.Recoverable operation) {
        try {
            if (this.journal != null) {
                this.journal.write(operation);
            }
        }
        catch (Exception e) {
            throw new IllegalStateException("Failed to write to journal " + operation, e);
        }
    }

    public static StreamingContainerManager getInstance(RecoveryHandler rh, LogicalPlan dag, boolean enableEventRecording) throws IOException {
        try {
            StreamingContainerManager scm;
            CheckpointState checkpointedState = (CheckpointState)rh.restore();
            if (checkpointedState == null) {
                scm = new StreamingContainerManager(dag, enableEventRecording, (Clock)new SystemClock());
            } else {
                PhysicalPlan plan = checkpointedState.physicalPlan;
                plan.getLogicalPlan().setAttribute(LogicalPlan.APPLICATION_ATTEMPT_ID, dag.getAttributes().get(LogicalPlan.APPLICATION_ATTEMPT_ID));
                scm = new StreamingContainerManager(checkpointedState, enableEventRecording);
                for (Field f : plan.getClass().getDeclaredFields()) {
                    if (f.getType() != PhysicalPlan.PlanContext.class) continue;
                    f.setAccessible(true);
                    try {
                        f.set(plan, scm);
                    }
                    catch (Exception e) {
                        throw new RuntimeException("Failed to set " + f, e);
                    }
                    f.setAccessible(false);
                }
                DataInputStream logStream = rh.getLog();
                scm.journal.replay(logStream);
                logStream.close();
                plan.syncCheckpoints(scm.vars.windowStartMillis, scm.clock.getTime());
                scm.committedWindowId = scm.updateCheckpoints(true);
                for (PTContainer c : plan.getContainers()) {
                    if (c.getExternalId() != null) {
                        LOG.debug("Restore container agent {} for {}", (Object)c.getExternalId(), (Object)c);
                        StreamingContainerAgent sca = new StreamingContainerAgent(c, scm.newStreamingContainerContext(c), scm);
                        scm.containers.put(c.getExternalId(), sca);
                        continue;
                    }
                    LOG.debug("Requesting new resource for {}", (Object)c.toIdStateString());
                    scm.requestContainer(c);
                }
            }
            scm.recoveryHandler = rh;
            scm.checkpoint();
            return scm;
        }
        catch (IOException e) {
            throw new IllegalStateException("Failed to read checkpointed state", e);
        }
    }

    @VisibleForTesting
    protected Collection<Pair<Long, Map<String, Object>>> getLogicalMetrics(String operatorName) {
        if (this.logicalMetrics.get(operatorName) != null) {
            return Collections.unmodifiableCollection((Collection)this.logicalMetrics.get(operatorName));
        }
        return null;
    }

    @VisibleForTesting
    protected Object getLogicalCounter(String operatorName) {
        return this.latestLogicalCounters.get(operatorName);
    }

    private class RequestHandler
    implements Callable<Object> {
        public long requestId;
        public long waitTime = 5000L;

        private RequestHandler() {
        }

        @Override
        public Object call() throws Exception {
            Object obj;
            long expiryTime = System.currentTimeMillis() + this.waitTime;
            while ((obj = StreamingContainerManager.this.commandResponse.getIfPresent((Object)this.requestId)) == null && expiryTime > System.currentTimeMillis()) {
                Thread.sleep(100L);
                LOG.debug("Polling for a response to request with Id {}", (Object)this.requestId);
            }
            if (obj != null) {
                StreamingContainerManager.this.commandResponse.invalidate((Object)this.requestId);
                return obj;
            }
            return null;
        }
    }

    public static interface RecoveryHandler {
        public void save(Object var1) throws IOException;

        public Object restore() throws IOException;

        public DataOutputStream rotateLog() throws IOException;

        public DataInputStream getLog() throws IOException;
    }

    static class CheckpointState
    implements Serializable {
        private static final long serialVersionUID = 3827310557521807024L;
        private FinalVars finals;
        private PhysicalPlan physicalPlan;

        CheckpointState() {
        }

        public void setApplicationId(LogicalPlan newApp, Configuration conf) {
            LogicalPlan lp = this.physicalPlan.getLogicalPlan();
            String appId = (String)newApp.getValue(LogicalPlan.APPLICATION_ID);
            String oldAppId = (String)lp.getValue(LogicalPlan.APPLICATION_ID);
            if (oldAppId == null) {
                throw new AssertionError((Object)"Missing original application id");
            }
            lp.setAttribute(LogicalPlan.APPLICATION_ID, appId);
            lp.setAttribute(LogicalPlan.APPLICATION_PATH, newApp.assertAppPath());
            lp.setAttribute(Context.DAGContext.LIBRARY_JARS, newApp.getValue(Context.DAGContext.LIBRARY_JARS));
            lp.setAttribute(LogicalPlan.ARCHIVES, newApp.getValue(LogicalPlan.ARCHIVES));
            this.finals = new FinalVars(this.finals, lp);
            StorageAgent sa = (StorageAgent)lp.getValue(Context.OperatorContext.STORAGE_AGENT);
            if (sa instanceof AsyncFSStorageAgent) {
                AsyncFSStorageAgent fssa = (AsyncFSStorageAgent)sa;
                if (fssa.path.contains(oldAppId)) {
                    fssa = new AsyncFSStorageAgent(fssa.path.replace(oldAppId, appId), conf);
                    lp.setAttribute(Context.OperatorContext.STORAGE_AGENT, fssa);
                }
            } else if (sa instanceof FSStorageAgent) {
                FSStorageAgent fssa = (FSStorageAgent)sa;
                if (fssa.path.contains(oldAppId)) {
                    fssa = new FSStorageAgent(fssa.path.replace(oldAppId, appId), conf);
                    lp.setAttribute(Context.OperatorContext.STORAGE_AGENT, fssa);
                }
            }
        }
    }

    private static class FinalVars
    implements Serializable {
        private static final long serialVersionUID = 3827310557521807024L;
        private final long windowStartMillis;
        private final int heartbeatTimeoutMillis;
        private final String appPath;
        private final int maxWindowsBehindForStats;
        private final boolean enableStatsRecording;
        private final int rpcLatencyCompensationSamples;

        private FinalVars(LogicalPlan dag, long tms) {
            Attribute.AttributeMap attributes = dag.getAttributes();
            this.windowStartMillis = tms - tms % 1000L;
            if (attributes.get(LogicalPlan.APPLICATION_PATH) == null) {
                throw new IllegalArgumentException("Not set: " + LogicalPlan.APPLICATION_PATH);
            }
            this.appPath = (String)attributes.get(LogicalPlan.APPLICATION_PATH);
            if (attributes.get(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS) == null) {
                attributes.put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, (Object)500);
            }
            if (attributes.get(LogicalPlan.CHECKPOINT_WINDOW_COUNT) == null) {
                attributes.put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, (Object)(30000 / (Integer)attributes.get(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS)));
            }
            this.heartbeatTimeoutMillis = (Integer)dag.getValue(LogicalPlan.HEARTBEAT_TIMEOUT_MILLIS);
            this.maxWindowsBehindForStats = (Integer)dag.getValue(LogicalPlan.STATS_MAX_ALLOWABLE_WINDOWS_LAG);
            this.enableStatsRecording = (Boolean)dag.getValue(LogicalPlan.ENABLE_STATS_RECORDING);
            this.rpcLatencyCompensationSamples = (Integer)dag.getValue(LogicalPlan.RPC_LATENCY_COMPENSATION_SAMPLES);
        }

        private FinalVars(FinalVars other, LogicalPlan dag) {
            this.windowStartMillis = other.windowStartMillis;
            this.heartbeatTimeoutMillis = other.heartbeatTimeoutMillis;
            this.maxWindowsBehindForStats = other.maxWindowsBehindForStats;
            this.enableStatsRecording = other.enableStatsRecording;
            this.appPath = (String)dag.getValue(LogicalPlan.APPLICATION_PATH);
            this.rpcLatencyCompensationSamples = other.rpcLatencyCompensationSamples;
        }
    }

    private class LogicalPlanChangeRunnable
    implements Callable<Object> {
        final List<LogicalPlanRequest> requests;

        private LogicalPlanChangeRunnable(List<LogicalPlanRequest> requests) {
            this.requests = requests;
        }

        @Override
        public Object call() throws Exception {
            LOG.info("Begin plan changes: {}", this.requests);
            LogicalPlan lp = StreamingContainerManager.this.plan.getLogicalPlan();
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            LogicalPlan.write(lp, bos);
            bos.flush();
            ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
            lp = LogicalPlan.read(bis);
            PlanModifier pm = new PlanModifier(lp);
            for (LogicalPlanRequest request : this.requests) {
                LOG.debug("Dry run plan change: {}", (Object)request);
                request.execute(pm);
            }
            lp.validate();
            pm = new PlanModifier(StreamingContainerManager.this.plan);
            for (LogicalPlanRequest request : this.requests) {
                request.execute(pm);
                StreamingContainerManager.this.recordEventAsync(new StramEvent.ChangeLogicalPlanEvent(request));
            }
            pm.applyChanges(StreamingContainerManager.this);
            LOG.info("Plan changes applied: {}", this.requests);
            return null;
        }
    }

    private class SetOperatorPropertyRequestFilter
    implements Predicate<StreamingContainerUmbilicalProtocol.StramToNodeRequest> {
        final String propertyKey;

        SetOperatorPropertyRequestFilter(String key) {
            this.propertyKey = key;
        }

        public boolean apply(@Nullable StreamingContainerUmbilicalProtocol.StramToNodeRequest input) {
            if (input == null) {
                return false;
            }
            if (input instanceof StramToNodeSetPropertyRequest) {
                return ((StramToNodeSetPropertyRequest)input).getPropertyKey().equals(this.propertyKey);
            }
            return false;
        }
    }

    private static class RecordingRequestFilter
    implements Predicate<StreamingContainerUmbilicalProtocol.StramToNodeRequest> {
        static final Set<StreamingContainerUmbilicalProtocol.StramToNodeRequest.RequestType> MATCH_TYPES = Sets.newHashSet((Object[])new StreamingContainerUmbilicalProtocol.StramToNodeRequest.RequestType[]{StreamingContainerUmbilicalProtocol.StramToNodeRequest.RequestType.START_RECORDING, StreamingContainerUmbilicalProtocol.StramToNodeRequest.RequestType.STOP_RECORDING, StreamingContainerUmbilicalProtocol.StramToNodeRequest.RequestType.SYNC_RECORDING});

        private RecordingRequestFilter() {
        }

        public boolean apply(@Nullable StreamingContainerUmbilicalProtocol.StramToNodeRequest input) {
            return input != null && MATCH_TYPES.contains((Object)input.getRequestType());
        }
    }

    public static class UpdateCheckpointsContext {
        public final MutableLong committedWindowId = new MutableLong(Long.MAX_VALUE);
        public final Set<PTOperator> visited = new LinkedHashSet<PTOperator>();
        public final Set<PTOperator> blocked = new LinkedHashSet<PTOperator>();
        public final long currentTms;
        public final boolean recovery;
        public final Map<LogicalPlan.OperatorMeta, Set<LogicalPlan.OperatorMeta>> checkpointGroups;

        public UpdateCheckpointsContext(Clock clock) {
            this(clock, false, Collections.emptyMap());
        }

        public UpdateCheckpointsContext(Clock clock, boolean recovery, Map<LogicalPlan.OperatorMeta, Set<LogicalPlan.OperatorMeta>> checkpointGroups) {
            this.currentTms = clock.getTime();
            this.recovery = recovery;
            this.checkpointGroups = checkpointGroups;
        }
    }

    static class UpdateOperatorLatencyContext {
        Map<String, MovingAverage.MovingAverageLong> rpcLatencies;
        Map<Long, Map<Integer, EndWindowStats>> endWindowStatsOperatorMap;

        UpdateOperatorLatencyContext() {
        }

        UpdateOperatorLatencyContext(Map<String, MovingAverage.MovingAverageLong> rpcLatencies, Map<Long, Map<Integer, EndWindowStats>> endWindowStatsOperatorMap) {
            this.rpcLatencies = rpcLatencies;
            this.endWindowStatsOperatorMap = endWindowStatsOperatorMap;
        }

        long getRPCLatency(PTOperator oper) {
            MovingAverage.MovingAverageLong rpcLatency = this.rpcLatencies.get(oper.getContainer().getExternalId());
            return rpcLatency == null ? 0L : rpcLatency.getAvg();
        }

        boolean endWindowStatsExists(long windowId) {
            return this.endWindowStatsOperatorMap.containsKey(windowId);
        }

        long getEndWindowEmitTimestamp(long windowId, PTOperator oper) {
            Map<Integer, EndWindowStats> endWindowStatsMap = this.endWindowStatsOperatorMap.get(windowId);
            if (endWindowStatsMap == null) {
                return -1L;
            }
            EndWindowStats ews = endWindowStatsMap.get(oper.getId());
            if (ews == null) {
                return -1L;
            }
            return ews.emitTimestamp;
        }
    }

    public static class ContainerResource {
        public final String containerId;
        public final String host;
        public final int memoryMB;
        public final int vCores;
        public final int priority;
        public final String nodeHttpAddress;

        public ContainerResource(int priority, String containerId, String host, int memoryMB, int vCores, String nodeHttpAddress) {
            this.containerId = containerId;
            this.host = host;
            this.memoryMB = memoryMB;
            this.vCores = vCores;
            this.priority = priority;
            this.nodeHttpAddress = nodeHttpAddress;
        }

        public String toString() {
            return new ToStringBuilder((Object)this, ToStringStyle.SHORT_PREFIX_STYLE).append("containerId", (Object)this.containerId).append("host", (Object)this.host).append("memoryMB", this.memoryMB).toString();
        }
    }

    private static class SetPhysicalOperatorProperty
    implements Journal.Recoverable {
        private final int operatorId;
        private final String propertyName;
        private final String propertyValue;

        private SetPhysicalOperatorProperty() {
            this(-1, null, null);
        }

        private SetPhysicalOperatorProperty(int operatorId, String propertyName, String propertyValue) {
            this.operatorId = operatorId;
            this.propertyName = propertyName;
            this.propertyValue = propertyValue;
        }

        @Override
        public void read(Object object, Input in) throws KryoException {
            StreamingContainerManager scm = (StreamingContainerManager)object;
            int operatorId = in.readInt();
            String propertyName = in.readString();
            String propertyValue = in.readString();
            PTOperator o = scm.plan.getAllOperators().get(operatorId);
            if (o == null) {
                throw new IllegalArgumentException("Unknown physical operator " + operatorId);
            }
            scm.setPhysicalOperatorProperty(o, propertyName, propertyValue);
        }

        @Override
        public void write(Output out) throws KryoException {
            out.writeInt(this.operatorId);
            out.writeString(this.propertyName);
            out.writeString(this.propertyValue);
        }
    }

    private static class SetOperatorProperty
    implements Journal.Recoverable {
        private final String operatorName;
        private final String propertyName;
        private final String propertyValue;

        private SetOperatorProperty() {
            this(null, null, null);
        }

        private SetOperatorProperty(String operatorName, String propertyName, String propertyValue) {
            this.operatorName = operatorName;
            this.propertyName = propertyName;
            this.propertyValue = propertyValue;
        }

        @Override
        public void read(Object object, Input in) throws KryoException {
            StreamingContainerManager scm = (StreamingContainerManager)object;
            String operatorName = in.readString();
            String propertyName = in.readString();
            String propertyValue = in.readString();
            LogicalPlan.OperatorMeta logicalOperator = scm.plan.getLogicalPlan().getOperatorMeta(operatorName);
            if (logicalOperator == null) {
                throw new IllegalArgumentException("Unknown operator " + operatorName);
            }
            scm.setOperatorProperty(logicalOperator, propertyName, propertyValue);
        }

        @Override
        public void write(Output out) throws KryoException {
            out.writeString(this.operatorName);
            out.writeString(this.propertyName);
            out.writeString(this.propertyValue);
        }
    }

    public static class CriticalPathInfo {
        long latency;
        final LinkedList<Integer> path;

        public CriticalPathInfo() {
            this.path = new LinkedList();
        }

        private CriticalPathInfo(long latency, LinkedList<Integer> path) {
            this.latency = latency;
            this.path = path;
        }

        protected Object clone() throws CloneNotSupportedException {
            return new CriticalPathInfo(this.latency, (LinkedList)this.path.clone());
        }
    }

    static class EndWindowStats {
        long emitTimestamp = -1L;
        HashMap<String, Long> dequeueTimestamps = new HashMap();
        Object counters;
        Map<String, Object> metrics;

        EndWindowStats() {
        }
    }
}

