/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram.plan.physical;

import com.datatorrent.api.AffinityRule;
import com.datatorrent.api.AffinityRulesSet;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StorageAgent;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.stram.Journal;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.api.StramEvent;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.StreamCodecWrapperForPersistance;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.plan.physical.StreamMapping;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Stack;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PhysicalPlan
implements Serializable {
    private static final long serialVersionUID = 201312112033L;
    private static final Logger LOG = LoggerFactory.getLogger(PhysicalPlan.class);
    private final AtomicInteger idSequence = new AtomicInteger();
    final AtomicInteger containerSeq = new AtomicInteger();
    private LinkedHashMap<LogicalPlan.OperatorMeta, PMapping> logicalToPTOperator = new LinkedHashMap();
    private final List<PTContainer> containers = new CopyOnWriteArrayList<PTContainer>();
    private final LogicalPlan dag;
    private final transient PlanContext ctx;
    private int maxContainers = 1;
    private int availableMemoryMB = Integer.MAX_VALUE;
    private final LocalityPrefs localityPrefs = new LocalityPrefs();
    private final LocalityPrefs inlinePrefs = new LocalityPrefs();
    final Set<PTOperator> deployOpers = Sets.newHashSet();
    final Map<PTOperator, Operator> newOpers = Maps.newHashMap();
    final Set<PTOperator> undeployOpers = Sets.newHashSet();
    final ConcurrentMap<Integer, PTOperator> allOperators = Maps.newConcurrentMap();
    private final ConcurrentMap<LogicalPlan.OperatorMeta, LogicalPlan.OperatorMeta> pendingRepartition = Maps.newConcurrentMap();
    private final AtomicInteger strCodecIdSequence = new AtomicInteger();
    private final Map<StreamCodec<?>, Integer> streamCodecIdentifiers = Maps.newHashMap();

    private PTContainer getContainer(int index) {
        if (index >= this.containers.size()) {
            if (index >= this.maxContainers) {
                index = this.maxContainers - 1;
            }
            for (int i = this.containers.size(); i < index + 1; ++i) {
                this.containers.add(i, new PTContainer(this));
            }
        }
        return this.containers.get(index);
    }

    public PhysicalPlan(LogicalPlan dag, PlanContext ctx) {
        this.dag = dag;
        this.ctx = ctx;
        this.maxContainers = Math.max(dag.getMaxContainerCount(), 1);
        LOG.debug("Max containers: {}", (Object)this.maxContainers);
        Stack<LogicalPlan.OperatorMeta> pendingNodes = new Stack<LogicalPlan.OperatorMeta>();
        this.updatePersistOperatorStreamCodec(dag);
        for (LogicalPlan.OperatorMeta n : dag.getAllOperators()) {
            pendingNodes.push(n);
        }
        while (!pendingNodes.isEmpty()) {
            LogicalPlan.OperatorMeta n = (LogicalPlan.OperatorMeta)pendingNodes.pop();
            if (this.logicalToPTOperator.containsKey(n)) continue;
            boolean upstreamDeployed = true;
            for (Map.Entry<LogicalPlan.InputPortMeta, LogicalPlan.StreamMeta> entry : n.getInputStreams().entrySet()) {
                LogicalPlan.StreamMeta s = entry.getValue();
                boolean bl = entry.getKey().getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR);
                if (bl || s.getSource() == null || this.logicalToPTOperator.containsKey(s.getSource().getOperatorMeta())) continue;
                pendingNodes.push(n);
                pendingNodes.push(s.getSource().getOperatorMeta());
                upstreamDeployed = false;
                break;
            }
            if (!upstreamDeployed) continue;
            this.addLogicalOperator(n);
        }
        AffinityRulesSet affinityRuleSet = (AffinityRulesSet)dag.getAttributes().get(Context.DAGContext.AFFINITY_RULES_SET);
        if (affinityRuleSet != null && affinityRuleSet.getAffinityRules() != null) {
            for (AffinityRule rule : affinityRuleSet.getAffinityRules()) {
                if (rule.getOperatorsList() == null) continue;
                for (int i = 0; i < rule.getOperatorsList().size() - 1; ++i) {
                    for (int j = i + 1; j < rule.getOperatorsList().size(); ++j) {
                        LogicalPlan.OperatorPair operatorPair = new LogicalPlan.OperatorPair((String)rule.getOperatorsList().get(i), (String)rule.getOperatorsList().get(j));
                        PMapping firstPMapping = this.logicalToPTOperator.get(dag.getOperatorMeta((String)operatorPair.first));
                        LogicalPlan.OperatorMeta opMeta = dag.getOperatorMeta((String)operatorPair.second);
                        PMapping secondMapping = this.logicalToPTOperator.get(opMeta);
                        if (rule.getType() != AffinityRule.Type.AFFINITY) continue;
                        if (DAG.Locality.CONTAINER_LOCAL == rule.getLocality()) {
                            this.inlinePrefs.setLocal(firstPMapping, secondMapping);
                        } else if (DAG.Locality.NODE_LOCAL == rule.getLocality()) {
                            this.localityPrefs.setLocal(firstPMapping, secondMapping);
                        }
                        for (PTOperator ptOperator : firstPMapping.partitions) {
                            this.setLocalityGrouping(firstPMapping, ptOperator, this.inlinePrefs, DAG.Locality.CONTAINER_LOCAL, null);
                            this.setLocalityGrouping(firstPMapping, ptOperator, this.localityPrefs, DAG.Locality.NODE_LOCAL, null);
                        }
                    }
                }
            }
        }
        if (LOG.isDebugEnabled()) {
            for (LogicalPlan.OperatorMeta operator : dag.getAllOperators()) {
                PMapping mapping = this.logicalToPTOperator.get(operator);
                if (mapping == null) continue;
                for (PTOperator pTOperator : mapping.partitions) {
                    ArrayList<String> operators = new ArrayList<String>();
                    for (PTOperator op : pTOperator.getGrouping(DAG.Locality.CONTAINER_LOCAL).getOperatorSet()) {
                        operators.add(op.getLogicalId());
                    }
                    LOG.debug("Operator {} Partition {} CONTAINER LOCAL Operator set  = {}", new Object[]{operator.getName(), pTOperator.id, StringUtils.join(operators, (String)",")});
                    operators.clear();
                    for (PTOperator op : pTOperator.getGrouping(DAG.Locality.NODE_LOCAL).getOperatorSet()) {
                        operators.add(op.getLogicalId());
                    }
                    LOG.debug("Operator {} Partition {} NODE LOCAL Operator set  = {}", new Object[]{operator.getName(), pTOperator.id, StringUtils.join(operators, (String)",")});
                }
            }
        }
        this.updatePartitionsInfoForPersistOperator(dag);
        HashMap<PTOperator, PTContainer> operatorContainerMap = new HashMap<PTOperator, PTContainer>();
        int groupCount = 0;
        HashSet deployOperators = Sets.newHashSet();
        for (Map.Entry<LogicalPlan.OperatorMeta, PMapping> entry : this.logicalToPTOperator.entrySet()) {
            for (PTOperator oper : entry.getValue().getAllOperators()) {
                Set<PTOperator> inlineSet;
                if (oper.container != null) continue;
                PTContainer container = this.getContainer(groupCount++ % this.maxContainers);
                if (!container.operators.isEmpty()) {
                    LOG.warn("Operator {} shares container without locality contraint due to insufficient resources.", (Object)oper);
                }
                if (!(inlineSet = oper.getGrouping(DAG.Locality.CONTAINER_LOCAL).getOperatorSet()).isEmpty()) {
                    for (PTOperator inlineOper : inlineSet) {
                        this.setContainer(inlineOper, container);
                        operatorContainerMap.put(inlineOper, container);
                    }
                } else {
                    this.setContainer(oper, container);
                }
                operatorContainerMap.put(oper, container);
                deployOperators.addAll(container.operators);
            }
        }
        for (PTContainer pTContainer : this.containers) {
            this.updateContainerMemoryWithBufferServer(pTContainer);
            pTContainer.setRequiredVCores(this.getVCores(pTContainer.getOperators()));
        }
        if (affinityRuleSet != null && affinityRuleSet.getAffinityRules() != null) {
            this.setAntiAffinityForContainers(dag, affinityRuleSet.getAffinityRules(), operatorContainerMap);
        }
        if (LOG.isDebugEnabled()) {
            for (PTContainer pTContainer : this.containers) {
                ArrayList<String> antiOperators = new ArrayList<String>();
                for (PTContainer c : pTContainer.getStrictAntiPrefs()) {
                    for (PTOperator operator : c.getOperators()) {
                        antiOperators.add(operator.getName());
                    }
                }
                ArrayList<String> containerOperators = new ArrayList<String>();
                for (PTOperator operator : pTContainer.getOperators()) {
                    containerOperators.add(operator.getName());
                }
                LOG.debug("Container with operators [{}] has anti affinity with [{}]", (Object)StringUtils.join(containerOperators, (String)","), (Object)StringUtils.join(antiOperators, (String)","));
            }
        }
        for (Map.Entry entry : this.newOpers.entrySet()) {
            this.initCheckpoint((PTOperator)entry.getKey(), (Operator)entry.getValue(), Checkpoint.INITIAL_CHECKPOINT);
        }
        ctx.deploy(Collections.emptySet(), Collections.emptySet(), Sets.newHashSet(this.containers), deployOperators);
        this.newOpers.clear();
        this.deployOpers.clear();
        this.undeployOpers.clear();
    }

    public void setAntiAffinityForContainers(LogicalPlan dag, Collection<AffinityRule> affinityRules, Map<PTOperator, PTContainer> operatorContainerMap) {
        for (AffinityRule rule : affinityRules) {
            if (rule.getOperatorsList() == null || rule.getType() != AffinityRule.Type.ANTI_AFFINITY) continue;
            for (int i = 0; i < rule.getOperatorsList().size() - 1; ++i) {
                for (int j = i + 1; j < rule.getOperatorsList().size(); ++j) {
                    LogicalPlan.OperatorPair operators = new LogicalPlan.OperatorPair((String)rule.getOperatorsList().get(i), (String)rule.getOperatorsList().get(j));
                    PMapping firstPMapping = this.logicalToPTOperator.get(dag.getOperatorMeta((String)operators.first));
                    LogicalPlan.OperatorMeta opMeta = dag.getOperatorMeta((String)operators.second);
                    PMapping secondMapping = this.logicalToPTOperator.get(opMeta);
                    for (PTOperator firstPtOperator : firstPMapping.partitions) {
                        PTContainer firstContainer = operatorContainerMap.get(firstPtOperator);
                        for (PTOperator secondPtOperator : secondMapping.partitions) {
                            PTContainer secondContainer = operatorContainerMap.get(secondPtOperator);
                            if (firstContainer == secondContainer || firstContainer.getStrictAntiPrefs().contains(secondContainer)) continue;
                            if (rule.isRelaxLocality()) {
                                firstContainer.getPreferredAntiPrefs().add(secondContainer);
                                secondContainer.getPreferredAntiPrefs().add(firstContainer);
                                continue;
                            }
                            firstContainer.getStrictAntiPrefs().add(secondContainer);
                            secondContainer.getStrictAntiPrefs().add(firstContainer);
                        }
                    }
                }
            }
        }
    }

    private void updatePartitionsInfoForPersistOperator(LogicalPlan dag) {
        try {
            for (LogicalPlan.OperatorMeta n : dag.getAllOperators()) {
                for (LogicalPlan.StreamMeta s : n.getOutputStreams().values()) {
                    if (s.getPersistOperator() != null) {
                        LogicalPlan.InputPortMeta persistInputPort = s.getPersistOperatorInputPort();
                        StreamCodecWrapperForPersistance persistCodec = (StreamCodecWrapperForPersistance)persistInputPort.getAttributes().get(Context.PortContext.STREAM_CODEC);
                        if (persistCodec == null) continue;
                        for (LogicalPlan.InputPortMeta portMeta : s.getSinksToPersist()) {
                            this.updatePersistOperatorWithSinkPartitions(persistInputPort, s.getPersistOperator(), persistCodec, portMeta);
                        }
                    }
                    for (Map.Entry<LogicalPlan.InputPortMeta, LogicalPlan.InputPortMeta> entry : s.sinkSpecificPersistInputPortMap.entrySet()) {
                        LogicalPlan.InputPortMeta persistInputPort = entry.getValue();
                        StreamCodec codec = (StreamCodec)persistInputPort.getAttributes().get(Context.PortContext.STREAM_CODEC);
                        if (codec == null || !(codec instanceof StreamCodecWrapperForPersistance)) continue;
                        StreamCodecWrapperForPersistance persistCodec = (StreamCodecWrapperForPersistance)codec;
                        this.updatePersistOperatorWithSinkPartitions(persistInputPort, s.sinkSpecificPersistOperatorMap.get(entry.getKey()), persistCodec, entry.getKey());
                    }
                }
            }
        }
        catch (Exception e) {
            throw Throwables.propagate((Throwable)e);
        }
    }

    private void updatePersistOperatorWithSinkPartitions(LogicalPlan.InputPortMeta persistInputPort, LogicalPlan.OperatorMeta persistOperatorMeta, StreamCodecWrapperForPersistance<?> persistCodec, LogicalPlan.InputPortMeta sinkPortMeta) {
        List<PTOperator> ptOperators = this.getOperators(sinkPortMeta.getOperatorWrapper());
        ArrayList<Partitioner.PartitionKeys> partitionKeysList = new ArrayList<Partitioner.PartitionKeys>();
        for (PTOperator p : ptOperators) {
            Partitioner.PartitionKeys keys = p.partitionKeys.get(sinkPortMeta);
            partitionKeysList.add(keys);
        }
        persistCodec.inputPortToPartitionMap.put(sinkPortMeta, partitionKeysList);
    }

    private void updatePersistOperatorStreamCodec(LogicalPlan dag) {
        HashMap streamMetaToCodecMap = new HashMap();
        try {
            for (LogicalPlan.OperatorMeta operatorMeta : dag.getAllOperators()) {
                for (LogicalPlan.StreamMeta s : operatorMeta.getOutputStreams().values()) {
                    if (s.getPersistOperator() == null) continue;
                    HashMap<LogicalPlan.InputPortMeta, StreamCodec<Object>> inputStreamCodecs = new HashMap<LogicalPlan.InputPortMeta, StreamCodec<Object>>();
                    for (LogicalPlan.InputPortMeta portMeta : s.getSinksToPersist()) {
                        Operator.InputPort<?> port = portMeta.getPortObject();
                        StreamCodec inputStreamCodec = portMeta.getValue(Context.PortContext.STREAM_CODEC) != null ? (StreamCodec)portMeta.getValue(Context.PortContext.STREAM_CODEC) : port.getStreamCodec();
                        if (inputStreamCodec == null) continue;
                        boolean alreadyAdded = false;
                        for (StreamCodec codec : inputStreamCodecs.values()) {
                            if (!inputStreamCodec.equals(codec)) continue;
                            alreadyAdded = true;
                            break;
                        }
                        if (alreadyAdded) continue;
                        inputStreamCodecs.put(portMeta, (StreamCodec<Object>)inputStreamCodec);
                    }
                    if (inputStreamCodecs.isEmpty()) continue;
                    StreamCodec specifiedCodecForLogger = s.getPersistOperatorInputPort().getValue(Context.PortContext.STREAM_CODEC) != null ? (StreamCodec)s.getPersistOperatorInputPort().getValue(Context.PortContext.STREAM_CODEC) : s.getPersistOperatorInputPort().getPortObject().getStreamCodec();
                    StreamCodecWrapperForPersistance codec = new StreamCodecWrapperForPersistance(inputStreamCodecs, (StreamCodec<Object>)specifiedCodecForLogger);
                    streamMetaToCodecMap.put(s, codec);
                }
            }
            for (Map.Entry entry : streamMetaToCodecMap.entrySet()) {
                dag.setInputPortAttribute(((LogicalPlan.StreamMeta)entry.getKey()).getPersistOperatorInputPort().getPortObject(), Context.PortContext.STREAM_CODEC, entry.getValue());
            }
        }
        catch (Exception e) {
            throw Throwables.propagate((Throwable)e);
        }
    }

    private void setContainer(PTOperator pOperator, PTContainer container) {
        LOG.debug("Setting container {} for {}", (Object)container, (Object)pOperator);
        assert (pOperator.container == null) : "Container already assigned for " + pOperator;
        pOperator.container = container;
        container.operators.add(pOperator);
        int upStreamUnifierMemory = 0;
        if (!pOperator.upstreamMerge.isEmpty()) {
            for (Map.Entry<LogicalPlan.InputPortMeta, PTOperator> mEntry : pOperator.upstreamMerge.entrySet()) {
                assert (mEntry.getValue().container == null) : "Container already assigned for " + mEntry.getValue();
                mEntry.getValue().container = container;
                container.operators.add(mEntry.getValue());
                upStreamUnifierMemory += ((Integer)mEntry.getValue().getOperatorMeta().getValue(Context.OperatorContext.MEMORY_MB)).intValue();
            }
        }
        int memoryMB = (Integer)pOperator.getOperatorMeta().getValue(Context.OperatorContext.MEMORY_MB) + upStreamUnifierMemory;
        container.setRequiredMemoryMB(container.getRequiredMemoryMB() + memoryMB);
    }

    private void updateContainerMemoryWithBufferServer(PTContainer container) {
        int bufferServerMemory = 0;
        for (PTOperator operator : container.getOperators()) {
            bufferServerMemory += operator.getBufferServerMemory();
        }
        container.setRequiredMemoryMB(container.getRequiredMemoryMB() + bufferServerMemory);
    }

    private int getVCores(Collection<PTOperator> operators) {
        HashMap<PTOperator, Set> groupMap = new HashMap<PTOperator, Set>();
        for (PTOperator operator : operators) {
            HashSet<PTOperator> group = new HashSet<PTOperator>();
            group.add(operator);
            groupMap.put(operator, group);
        }
        int vCores = 0;
        for (PTOperator operator : operators) {
            Set<PTOperator> threadLocal = operator.getThreadLocalOperators();
            if (threadLocal == null) continue;
            Set group = (Set)groupMap.get(operator);
            for (PTOperator operator1 : threadLocal) {
                group.addAll((Collection)groupMap.get(operator1));
            }
            for (PTOperator operator1 : group) {
                groupMap.put(operator1, group);
            }
        }
        HashSet visitedOperators = new HashSet();
        for (Map.Entry group : groupMap.entrySet()) {
            if (visitedOperators.contains(group.getKey())) continue;
            visitedOperators.addAll((Collection)group.getValue());
            int tempCores = 0;
            for (PTOperator operator : (Set)group.getValue()) {
                tempCores = Math.max(tempCores, (Integer)operator.getOperatorMeta().getValue(Context.OperatorContext.VCORES));
            }
            vCores += tempCores;
        }
        return vCores;
    }

    private void initPartitioning(PMapping m, int partitionCnt) {
        Collection statsListeners;
        Collection partitions;
        Operator operator = m.logicalOperator.getOperator();
        Partitioner partitioner = m.logicalOperator.getAttributes().contains(Context.OperatorContext.PARTITIONER) ? (Partitioner)m.logicalOperator.getValue(Context.OperatorContext.PARTITIONER) : (operator instanceof Partitioner ? (Partitioner)operator : null);
        ArrayList<DefaultPartition> collection = new ArrayList<DefaultPartition>(1);
        DefaultPartition firstPartition = new DefaultPartition((Object)operator);
        collection.add(firstPartition);
        if (partitioner != null) {
            partitions = partitioner.definePartitions(collection, (Partitioner.PartitioningContext)new PartitioningContextImpl(m, partitionCnt));
            if (partitions == null || partitions.isEmpty()) {
                throw new IllegalStateException("Partitioner returns null or empty.");
            }
        } else {
            for (int partitionCounter = 0; partitionCounter < partitionCnt - 1; ++partitionCounter) {
                collection.add(firstPartition);
            }
            partitions = collection;
        }
        if ((statsListeners = (Collection)m.logicalOperator.getValue(Context.OperatorContext.STATS_LISTENERS)) != null && !statsListeners.isEmpty()) {
            if (m.statsHandlers == null) {
                m.statsHandlers = new ArrayList(statsListeners.size());
            }
            m.statsHandlers.addAll(statsListeners);
        }
        if (m.logicalOperator.getOperator() instanceof StatsListener) {
            if (m.statsHandlers == null) {
                m.statsHandlers = new ArrayList(1);
            }
            m.statsHandlers.add(new StatsListenerProxy(m.logicalOperator));
        }
        HashMap operatorIdToPartition = Maps.newHashMapWithExpectedSize((int)partitions.size());
        for (Partitioner.Partition partition : partitions) {
            PTOperator p = this.addPTOperator(m, (Partitioner.Partition<? extends Operator>)partition, null);
            operatorIdToPartition.put(p.getId(), partition);
        }
        if (partitioner != null) {
            partitioner.partitioned((Map)operatorIdToPartition);
        }
    }

    private Partitioner<Operator> getPartitioner(PMapping currentMapping) {
        Operator operator = currentMapping.logicalOperator.getOperator();
        Partitioner partitioner = null;
        if (currentMapping.logicalOperator.getAttributes().contains(Context.OperatorContext.PARTITIONER)) {
            Partitioner tmp;
            partitioner = tmp = (Partitioner)currentMapping.logicalOperator.getValue(Context.OperatorContext.PARTITIONER);
        } else if (operator instanceof Partitioner) {
            Partitioner tmp;
            partitioner = tmp = (Partitioner)operator;
        }
        return partitioner;
    }

    private void redoPartitions(PMapping currentMapping, String note) {
        Partitioner<Operator> partitioner = this.getPartitioner(currentMapping);
        if (partitioner == null) {
            LOG.warn("No partitioner for {}", (Object)currentMapping.logicalOperator);
            return;
        }
        RepartitionContext mainPC = new RepartitionContext(partitioner, currentMapping, 0);
        if (mainPC.newPartitions.isEmpty()) {
            LOG.warn("Empty partition list after repartition: {}", (Object)currentMapping.logicalOperator);
            return;
        }
        int memoryPerPartition = (Integer)currentMapping.logicalOperator.getValue(Context.OperatorContext.MEMORY_MB);
        for (Map.Entry<LogicalPlan.OutputPortMeta, LogicalPlan.StreamMeta> stream : currentMapping.logicalOperator.getOutputStreams().entrySet()) {
            if (stream.getValue().getLocality() == DAG.Locality.THREAD_LOCAL || stream.getValue().getLocality() == DAG.Locality.CONTAINER_LOCAL) continue;
            memoryPerPartition += ((Integer)stream.getKey().getValue(Context.PortContext.BUFFER_MEMORY_MB)).intValue();
        }
        for (LogicalPlan.OperatorMeta pp : currentMapping.parallelPartitions) {
            for (Map.Entry<LogicalPlan.OutputPortMeta, LogicalPlan.StreamMeta> entry : pp.getOutputStreams().entrySet()) {
                if (entry.getValue().getLocality() == DAG.Locality.THREAD_LOCAL || entry.getValue().getLocality() == DAG.Locality.CONTAINER_LOCAL) continue;
                memoryPerPartition += ((Integer)entry.getKey().getValue(Context.PortContext.BUFFER_MEMORY_MB)).intValue();
            }
            memoryPerPartition += ((Integer)pp.getValue(Context.OperatorContext.MEMORY_MB)).intValue();
        }
        int requiredMemoryMB = (mainPC.newPartitions.size() - mainPC.currentPartitions.size()) * memoryPerPartition;
        if (requiredMemoryMB > this.availableMemoryMB) {
            LOG.warn("Insufficient headroom for repartitioning: available {}m required {}m", (Object)this.availableMemoryMB, (Object)requiredMemoryMB);
            return;
        }
        ArrayList<Partitioner.Partition> addedPartitions = new ArrayList<Partitioner.Partition>();
        for (Partitioner.Partition partition : mainPC.newPartitions) {
            PTOperator op = mainPC.currentPartitionMap.remove(partition);
            if (op == null) {
                addedPartitions.add(partition);
                continue;
            }
            for (DefaultPartition<Operator> pi : mainPC.currentPartitions) {
                if (pi != partition || !pi.isModified()) continue;
                mainPC.currentPartitionMap.put(partition, op);
                addedPartitions.add(partition);
            }
        }
        this.undeployOpers.addAll(mainPC.currentPartitionMap.values());
        Set<PTOperator> deps = this.getDependents(mainPC.currentPartitionMap.values());
        this.undeployOpers.addAll(deps);
        this.deployOpers.addAll(deps);
        LinkedHashMap linkedHashMap = Maps.newLinkedHashMap();
        Stack<LogicalPlan.OperatorMeta> parallelPartitions = new Stack<LogicalPlan.OperatorMeta>();
        parallelPartitions.addAll(currentMapping.parallelPartitions);
        block5: while (!parallelPartitions.isEmpty()) {
            LogicalPlan.OperatorMeta ppMeta = (LogicalPlan.OperatorMeta)parallelPartitions.pop();
            for (LogicalPlan.StreamMeta streamMeta : ppMeta.getInputStreams().values()) {
                if (!currentMapping.parallelPartitions.contains(streamMeta.getSource().getOperatorMeta()) || !parallelPartitions.contains(streamMeta.getSource().getOperatorMeta())) continue;
                parallelPartitions.push(ppMeta);
                parallelPartitions.remove(streamMeta.getSource().getOperatorMeta());
                parallelPartitions.push(streamMeta.getSource().getOperatorMeta());
                continue block5;
            }
            LOG.debug("Processing parallel partition {}", (Object)ppMeta);
            PMapping ppm = this.logicalToPTOperator.get(ppMeta);
            Partitioner<Operator> partitioner2 = this.getPartitioner(ppm);
            if (partitioner2 == null) {
                linkedHashMap.put(ppm, null);
                continue;
            }
            RepartitionContext pc = new RepartitionContext(partitioner2, ppm, mainPC.newPartitions.size());
            if (pc.newPartitions == null) {
                throw new IllegalStateException("Partitioner returns null for parallel partition " + ppm.logicalOperator);
            }
            linkedHashMap.put(ppm, pc);
        }
        ArrayList copyPartitions = Lists.newArrayList((Iterable)currentMapping.partitions);
        for (PTOperator pTOperator : mainPC.currentPartitionMap.values()) {
            copyPartitions.remove(pTOperator);
            this.removePartition(pTOperator, currentMapping);
            mainPC.operatorIdToPartition.remove(pTOperator.getId());
        }
        currentMapping.partitions = copyPartitions;
        for (Partitioner.Partition partition : addedPartitions) {
            PTOperator p = this.addPTOperator(currentMapping, (Partitioner.Partition<? extends Operator>)partition, mainPC.minCheckpoint);
            mainPC.operatorIdToPartition.put(p.getId(), (Partitioner.Partition<Operator>)partition);
        }
        for (Map.Entry entry : linkedHashMap.entrySet()) {
            if (entry.getValue() == null) {
                for (int i = 0; i < addedPartitions.size(); ++i) {
                    LOG.debug("Automatically adding to parallel partition {}", entry.getKey());
                    this.addPTOperator((PMapping)entry.getKey(), null, mainPC.minCheckpoint);
                }
                continue;
            }
            RepartitionContext pc = (RepartitionContext)entry.getValue();
            HashMap prevMapping = Maps.newHashMap();
            for (int i = 0; i < mainPC.currentPartitions.size(); ++i) {
                prevMapping.put(pc.currentPartitions.get(i), mainPC.currentPartitions.get(i));
            }
            HashMap newMapping = Maps.newHashMap();
            Iterator<Partitioner.Partition<Operator>> itMain = mainPC.newPartitions.iterator();
            Iterator<Partitioner.Partition<Operator>> itParallel = pc.newPartitions.iterator();
            while (itMain.hasNext() && itParallel.hasNext()) {
                newMapping.put(itParallel.next(), itMain.next());
            }
            for (Partitioner.Partition<Operator> newPartition : pc.newPartitions) {
                PTOperator op = pc.currentPartitionMap.remove(newPartition);
                if (op == null) {
                    pc.addedPartitions.add(newPartition);
                    continue;
                }
                if (prevMapping.get(newPartition) != newMapping.get(newPartition)) {
                    pc.currentPartitionMap.put(newPartition, op);
                    pc.addedPartitions.add(newPartition);
                    continue;
                }
                for (DefaultPartition<Operator> pi : pc.currentPartitions) {
                    if (pi != newPartition || !pi.isModified()) continue;
                    mainPC.currentPartitionMap.put(newPartition, op);
                    pc.addedPartitions.add(newPartition);
                }
            }
            if (!pc.currentPartitionMap.isEmpty()) {
                ArrayList cowPartitions = Lists.newArrayList((Iterable)((PMapping)entry.getKey()).partitions);
                for (PTOperator p : pc.currentPartitionMap.values()) {
                    cowPartitions.remove(p);
                    this.removePartition(p, (PMapping)entry.getKey());
                    pc.operatorIdToPartition.remove(p.getId());
                }
                ((PMapping)entry.getKey()).partitions = cowPartitions;
            }
            for (Partitioner.Partition<Operator> newPartition : pc.addedPartitions) {
                PTOperator oper = this.addPTOperator((PMapping)entry.getKey(), newPartition, mainPC.minCheckpoint);
                pc.operatorIdToPartition.put(oper.getId(), newPartition);
            }
            this.getPartitioner((PMapping)entry.getKey()).partitioned(pc.operatorIdToPartition);
        }
        this.updateStreamMappings(currentMapping);
        for (PMapping pMapping : linkedHashMap.keySet()) {
            this.updateStreamMappings(pMapping);
        }
        this.deployChanges();
        if (mainPC.currentPartitions.size() != mainPC.newPartitions.size()) {
            StramEvent.PartitionEvent ev = new StramEvent.PartitionEvent(currentMapping.logicalOperator.getName(), mainPC.currentPartitions.size(), mainPC.newPartitions.size());
            ev.setReason(note);
            this.ctx.recordEventAsync(ev);
        }
        partitioner.partitioned(mainPC.operatorIdToPartition);
    }

    private void updateStreamMappings(PMapping m) {
        for (Map.Entry<LogicalPlan.OutputPortMeta, LogicalPlan.StreamMeta> entry : m.logicalOperator.getOutputStreams().entrySet()) {
            StreamMapping ug = (StreamMapping)m.outputStreams.get(entry.getKey());
            if (ug == null) {
                ug = new StreamMapping(entry.getValue(), this);
                m.outputStreams.put(entry.getKey(), ug);
            }
            LOG.debug("update stream mapping for {} {}", (Object)entry.getKey().getOperatorMeta(), (Object)entry.getKey().getPortName());
            ug.setSources(m.partitions);
        }
        for (Map.Entry<Serializable, LogicalPlan.StreamMeta> entry : m.logicalOperator.getInputStreams().entrySet()) {
            PMapping sourceMapping = this.logicalToPTOperator.get(entry.getValue().getSource().getOperatorMeta());
            if (entry.getValue().getSource().getOperatorMeta().getOperator() instanceof Operator.DelayOperator) continue;
            if (((Boolean)((LogicalPlan.InputPortMeta)entry.getKey()).getValue(Context.PortContext.PARTITION_PARALLEL)).booleanValue()) {
                if (sourceMapping.partitions.size() < m.partitions.size()) {
                    throw new AssertionError((Object)("Number of partitions don't match in parallel mapping " + sourceMapping.logicalOperator.getName() + " -> " + m.logicalOperator.getName() + ", " + sourceMapping.partitions.size() + " -> " + m.partitions.size()));
                }
                int slidingWindowCount = 0;
                LogicalPlan.OperatorMeta sourceOM = sourceMapping.logicalOperator;
                if (sourceOM.getAttributes().contains(Context.OperatorContext.SLIDE_BY_WINDOW_COUNT)) {
                    if ((Integer)sourceOM.getValue(Context.OperatorContext.SLIDE_BY_WINDOW_COUNT) < (Integer)sourceOM.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT)) {
                        slidingWindowCount = (Integer)sourceOM.getValue(Context.OperatorContext.SLIDE_BY_WINDOW_COUNT);
                    } else {
                        LOG.warn("Sliding Window Count {} should be less than APPLICATION WINDOW COUNT {}", sourceOM.getValue(Context.OperatorContext.SLIDE_BY_WINDOW_COUNT), sourceOM.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT));
                    }
                }
                for (int i = 0; i < m.partitions.size(); ++i) {
                    PTOperator oper = (PTOperator)m.partitions.get(i);
                    PTOperator sourceOper = (PTOperator)sourceMapping.partitions.get(i);
                    block3: for (PTOperator.PTOutput sourceOut : sourceOper.outputs) {
                        PTOperator.PTInput input;
                        if (sourceOut.logicalStream != entry.getValue()) continue;
                        for (PTOperator.PTInput sinkIn : sourceOut.sinks) {
                            if (sinkIn.target != oper || !sinkIn.portName.equals(((LogicalPlan.InputPortMeta)entry.getKey()).getPortName())) continue;
                            continue block3;
                        }
                        if (slidingWindowCount > 0) {
                            PTOperator slidingUnifier = StreamMapping.createSlidingUnifier(sourceOut.logicalStream, this, (Integer)sourceOM.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT), slidingWindowCount);
                            StreamMapping.addInput(slidingUnifier, sourceOut, null);
                            input = new PTOperator.PTInput(((LogicalPlan.InputPortMeta)entry.getKey()).getPortName(), entry.getValue(), oper, null, slidingUnifier.outputs.get(0), ((LogicalPlan.InputPortMeta)entry.getKey()).getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR));
                            ((StreamMapping)((PMapping)sourceMapping).outputStreams.get((Object)entry.getValue().getSource())).slidingUnifiers.add(slidingUnifier);
                        } else {
                            input = new PTOperator.PTInput(((LogicalPlan.InputPortMeta)entry.getKey()).getPortName(), entry.getValue(), oper, null, sourceOut, ((LogicalPlan.InputPortMeta)entry.getKey()).getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR));
                        }
                        oper.inputs.add(input);
                    }
                }
                continue;
            }
            StreamMapping ug = (StreamMapping)sourceMapping.outputStreams.get(entry.getValue().getSource());
            if (ug == null) {
                ug = new StreamMapping(entry.getValue(), this);
                m.outputStreams.put(entry.getValue().getSource(), ug);
            }
            LOG.debug("update upstream stream mapping for {} {}", (Object)sourceMapping.logicalOperator, (Object)entry.getValue().getSource().getPortName());
            ug.setSources(sourceMapping.partitions);
        }
    }

    public void deployChanges() {
        HashSet newContainers = Sets.newHashSet();
        HashSet releaseContainers = Sets.newHashSet();
        this.assignContainers(newContainers, releaseContainers);
        this.updatePartitionsInfoForPersistOperator(this.dag);
        Set<PTOperator> ndeps = this.getDependents(this.newOpers.keySet());
        this.undeployOpers.addAll(ndeps);
        this.undeployOpers.removeAll(this.newOpers.keySet());
        Set<PTOperator> deployOperators = this.getDependents(this.deployOpers);
        deployOperators.addAll(ndeps);
        this.deployOpers.addAll(this.newOpers.keySet());
        this.ctx.deploy(releaseContainers, this.undeployOpers, newContainers, deployOperators);
        this.newOpers.clear();
        this.deployOpers.clear();
        this.undeployOpers.clear();
    }

    private void assignContainers(Set<PTContainer> newContainers, Set<PTContainer> releaseContainers) {
        HashSet mxnUnifiers = Sets.newHashSet();
        for (PTOperator o : this.newOpers.keySet()) {
            mxnUnifiers.addAll(o.upstreamMerge.values());
        }
        HashSet updatedContainers = Sets.newHashSet();
        HashMap operatorContainerMap = Maps.newHashMap();
        for (Map.Entry<PTOperator, Operator> operEntry : this.newOpers.entrySet()) {
            PTOperator oper = operEntry.getKey();
            Checkpoint checkpoint = this.getActivationCheckpoint(operEntry.getKey());
            this.initCheckpoint(oper, operEntry.getValue(), checkpoint);
            if (mxnUnifiers.contains(operEntry.getKey())) continue;
            PTContainer newContainer = null;
            int memoryMB = 0;
            for (PTOperator inlineOper : oper.getGrouping(DAG.Locality.CONTAINER_LOCAL).getOperatorSet()) {
                if (inlineOper.container != null) {
                    newContainer = inlineOper.container;
                    break;
                }
                memoryMB += ((Integer)inlineOper.operatorMeta.getValue(Context.OperatorContext.MEMORY_MB)).intValue();
                memoryMB += inlineOper.getBufferServerMemory();
            }
            if (newContainer == null) {
                int vCores = this.getVCores(oper.getGrouping(DAG.Locality.CONTAINER_LOCAL).getOperatorSet());
                for (PTContainer c : this.containers) {
                    if (!c.operators.isEmpty() || c.getState() != PTContainer.State.ACTIVE || c.getAllocatedMemoryMB() != memoryMB || c.getAllocatedVCores() != vCores) continue;
                    LOG.debug("Reusing existing container {} for {}", (Object)c, (Object)oper);
                    c.setRequiredMemoryMB(0);
                    c.setRequiredVCores(0);
                    newContainer = c;
                    break;
                }
                if (newContainer == null) {
                    LOG.debug("New container for: " + oper);
                    newContainer = new PTContainer(this);
                    newContainers.add(newContainer);
                    this.containers.add(newContainer);
                }
                updatedContainers.add(newContainer);
            }
            this.setContainer(oper, newContainer);
        }
        for (PTContainer c : this.containers) {
            if (c.operators.isEmpty()) {
                LOG.debug("Container {} to be released", (Object)c);
                releaseContainers.add(c);
                this.containers.remove(c);
                continue;
            }
            for (PTOperator oper : c.operators) {
                operatorContainerMap.put(oper, c);
            }
            c.getStrictAntiPrefs().clear();
            c.getPreferredAntiPrefs().clear();
        }
        for (PTContainer c : updatedContainers) {
            this.updateContainerMemoryWithBufferServer(c);
            c.setRequiredVCores(this.getVCores(c.getOperators()));
        }
        AffinityRulesSet affinityRuleSet = (AffinityRulesSet)this.dag.getAttributes().get(Context.DAGContext.AFFINITY_RULES_SET);
        if (affinityRuleSet != null && affinityRuleSet.getAffinityRules() != null) {
            this.setAntiAffinityForContainers(this.dag, affinityRuleSet.getAffinityRules(), operatorContainerMap);
        }
    }

    private void initCheckpoint(PTOperator oper, Operator oo, Checkpoint checkpoint) {
        try {
            AsyncFSStorageAgent asyncFSStorageAgent;
            LOG.debug("Writing activation checkpoint {} {} {}", new Object[]{checkpoint, oper, oo});
            long windowId = oper.isOperatorStateLess() ? -1L : checkpoint.windowId;
            StorageAgent agent = (StorageAgent)oper.operatorMeta.getValue(Context.OperatorContext.STORAGE_AGENT);
            agent.save((Object)oo, oper.id, windowId);
            if (agent instanceof AsyncFSStorageAgent && !(asyncFSStorageAgent = (AsyncFSStorageAgent)agent).isSyncCheckpoint()) {
                asyncFSStorageAgent.copyToHDFS(oper.id, windowId);
            }
        }
        catch (IOException e) {
            throw new IllegalStateException("Failed to write operator state after partition change " + oper, e);
        }
        oper.setRecoveryCheckpoint(checkpoint);
        if (!Checkpoint.INITIAL_CHECKPOINT.equals(checkpoint)) {
            oper.checkpoints.add(checkpoint);
        }
    }

    public Operator loadOperator(PTOperator oper) {
        try {
            LOG.debug("Loading state for {}", (Object)oper);
            return (Operator)((StorageAgent)oper.operatorMeta.getValue(Context.OperatorContext.STORAGE_AGENT)).load(oper.id, oper.isOperatorStateLess() ? -1L : oper.recoveryCheckpoint.windowId);
        }
        catch (IOException e) {
            throw new RuntimeException("Failed to read partition state for " + oper, e);
        }
    }

    private Checkpoint getActivationCheckpoint(PTOperator oper) {
        if (oper.recoveryCheckpoint == null && oper.checkpoints.isEmpty()) {
            Checkpoint activationCheckpoint = Checkpoint.INITIAL_CHECKPOINT;
            for (PTOperator.PTInput input : oper.inputs) {
                PTOperator sourceOper = input.source.source;
                Checkpoint checkpoint = sourceOper.recoveryCheckpoint;
                if (sourceOper.checkpoints.isEmpty()) {
                    checkpoint = this.getActivationCheckpoint(sourceOper);
                }
                activationCheckpoint = Checkpoint.max(activationCheckpoint, checkpoint);
            }
            return activationCheckpoint;
        }
        return oper.recoveryCheckpoint;
    }

    public void removeTerminatedPartition(PTOperator p) {
        HashSet<PTOperator> downstreamOpers = new HashSet<PTOperator>(p.outputs.size());
        for (PTOperator.PTOutput out : p.outputs) {
            for (PTOperator.PTInput sinkIn : out.sinks) {
                downstreamOpers.add(sinkIn.target);
            }
        }
        PMapping currentMapping = this.logicalToPTOperator.get(p.operatorMeta);
        if (currentMapping != null) {
            ArrayList copyPartitions = Lists.newArrayList((Iterable)currentMapping.partitions);
            copyPartitions.remove(p);
            this.removePartition(p, currentMapping);
            currentMapping.partitions = copyPartitions;
        } else {
            this.removePTOperator(p);
        }
        for (PTOperator dop : downstreamOpers) {
            if (!dop.inputs.isEmpty()) continue;
            this.removeTerminatedPartition(dop);
        }
        this.deployChanges();
    }

    private void removePartition(PTOperator oper, PMapping operatorMapping) {
        for (PTOperator.PTOutput out : oper.outputs) {
            for (PTOperator.PTInput in : Lists.newArrayList(out.sinks)) {
                for (LogicalPlan.InputPortMeta im : in.logicalStream.getSinks()) {
                    PMapping m = this.logicalToPTOperator.get(im.getOperatorWrapper());
                    if (m.parallelPartitions != operatorMapping.parallelPartitions) continue;
                    this.removePartition(in.target, operatorMapping);
                    m.partitions.remove(in.target);
                }
            }
        }
        this.removePTOperator(oper);
    }

    private PTOperator addPTOperator(PMapping nodeDecl, Partitioner.Partition<? extends Operator> partition, Checkpoint checkpoint) {
        PTOperator oper = this.newOperator(nodeDecl.logicalOperator, nodeDecl.logicalOperator.getName());
        oper.recoveryCheckpoint = checkpoint;
        for (Map.Entry<LogicalPlan.OutputPortMeta, LogicalPlan.StreamMeta> outputEntry : nodeDecl.logicalOperator.getOutputStreams().entrySet()) {
            this.setupOutput(nodeDecl, oper, outputEntry);
        }
        String host = null;
        if (partition != null) {
            oper.setPartitionKeys(partition.getPartitionKeys());
            host = (String)partition.getAttributes().get(Context.OperatorContext.LOCALITY_HOST);
        }
        if (host == null) {
            host = (String)nodeDecl.logicalOperator.getValue(Context.OperatorContext.LOCALITY_HOST);
        }
        nodeDecl.addPartition(oper);
        this.newOpers.put(oper, partition != null ? (Operator)partition.getPartitionedInstance() : nodeDecl.logicalOperator.getOperator());
        this.setLocalityGrouping(nodeDecl, oper, this.inlinePrefs, DAG.Locality.CONTAINER_LOCAL, host);
        this.setLocalityGrouping(nodeDecl, oper, this.localityPrefs, DAG.Locality.NODE_LOCAL, host);
        return oper;
    }

    private void setupOutput(PMapping mapping, PTOperator oper, Map.Entry<LogicalPlan.OutputPortMeta, LogicalPlan.StreamMeta> outputEntry) {
        for (PTOperator.PTOutput out : oper.outputs) {
            if (out.logicalStream != outputEntry.getValue()) continue;
            return;
        }
        PTOperator.PTOutput out = new PTOperator.PTOutput(outputEntry.getKey().getPortName(), outputEntry.getValue(), oper);
        oper.outputs.add(out);
    }

    PTOperator newOperator(LogicalPlan.OperatorMeta om, String name) {
        PTOperator oper = new PTOperator(this, this.idSequence.incrementAndGet(), name, om);
        this.allOperators.put(oper.id, oper);
        oper.inputs = new ArrayList<PTOperator.PTInput>();
        oper.outputs = new ArrayList<PTOperator.PTOutput>();
        this.ctx.recordEventAsync(new StramEvent.CreateOperatorEvent(oper.getName(), oper.getId()));
        return oper;
    }

    private void setLocalityGrouping(PMapping pnodes, PTOperator newOperator, LocalityPrefs localityPrefs, DAG.Locality ltype, String host) {
        PTOperator.HostOperatorSet grpObj = newOperator.getGrouping(ltype);
        if (host != null) {
            grpObj.setHost(host);
        }
        Set<PTOperator> s = grpObj.getOperatorSet();
        s.add(newOperator);
        LocalityPref loc = (LocalityPref)localityPrefs.prefs.get(pnodes);
        if (loc != null) {
            for (PMapping localPM : loc.operators) {
                if (pnodes.parallelPartitions == localPM.parallelPartitions) {
                    if (localPM.partitions.size() < pnodes.partitions.size()) continue;
                    s.addAll(((PTOperator)localPM.partitions.get(pnodes.partitions.size() - 1)).getGrouping(ltype).getOperatorSet());
                    continue;
                }
                for (PTOperator otherNode : localPM.partitions) {
                    s.addAll(otherNode.getGrouping(ltype).getOperatorSet());
                }
            }
            for (PTOperator localOper : s) {
                if (grpObj.getHost() == null) {
                    grpObj.setHost(localOper.groupings.get(ltype).getHost());
                }
                localOper.groupings.put(ltype, grpObj);
            }
        }
    }

    private List<Operator.InputPort<?>> getInputPortList(LogicalPlan.OperatorMeta operatorMeta) {
        ArrayList inputPortList = Lists.newArrayList();
        for (LogicalPlan.InputPortMeta inputPortMeta : operatorMeta.getInputStreams().keySet()) {
            inputPortList.add(inputPortMeta.getPortObject());
        }
        return inputPortList;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void removePTOperator(PTOperator oper) {
        LOG.debug("Removing operator " + oper);
        if (!oper.upstreamMerge.isEmpty()) {
            for (PTOperator unifier : oper.upstreamMerge.values()) {
                this.removePTOperator(unifier);
            }
        }
        for (PTOperator.PTOutput out : oper.outputs) {
            for (PTOperator.PTInput sinkIn : out.sinks) {
                if (sinkIn.source.source != oper) continue;
                ArrayList cowInputs = Lists.newArrayList(sinkIn.target.inputs);
                cowInputs.remove(sinkIn);
                sinkIn.target.inputs = cowInputs;
            }
        }
        for (PTOperator.PTInput in : oper.inputs) {
            in.source.sinks.remove(in);
        }
        for (PTOperator.HostOperatorSet s : oper.groupings.values()) {
            s.getOperatorSet().remove(oper);
        }
        try {
            LinkedList<Checkpoint> i$ = oper.checkpoints;
            synchronized (i$) {
                for (Checkpoint checkpoint : oper.checkpoints) {
                    ((StorageAgent)oper.operatorMeta.getValue(Context.OperatorContext.STORAGE_AGENT)).delete(oper.id, checkpoint.windowId);
                }
            }
        }
        catch (IOException e) {
            LOG.warn("Failed to remove state for " + oper, (Throwable)e);
        }
        ArrayList cowList = Lists.newArrayList(oper.container.operators);
        cowList.remove(oper);
        oper.container.operators = cowList;
        this.deployOpers.remove(oper);
        this.undeployOpers.add(oper);
        this.allOperators.remove(oper.id);
        this.ctx.recordEventAsync(new StramEvent.RemoveOperatorEvent(oper.getName(), oper.getId()));
    }

    public PlanContext getContext() {
        return this.ctx;
    }

    public LogicalPlan getLogicalPlan() {
        return this.dag;
    }

    public List<PTContainer> getContainers() {
        return this.containers;
    }

    public Map<Integer, PTOperator> getAllOperators() {
        return this.allOperators;
    }

    public List<PTOperator> getOperators(LogicalPlan.OperatorMeta logicalOperator) {
        return this.logicalToPTOperator.get(logicalOperator).partitions;
    }

    public Collection<PTOperator> getAllOperators(LogicalPlan.OperatorMeta logicalOperator) {
        return this.logicalToPTOperator.get(logicalOperator).getAllOperators();
    }

    public List<PTOperator> getLeafOperators() {
        ArrayList<PTOperator> operators = new ArrayList<PTOperator>();
        for (LogicalPlan.OperatorMeta opMeta : this.dag.getLeafOperators()) {
            operators.addAll(this.getAllOperators(opMeta));
        }
        return operators;
    }

    public boolean hasMapping(LogicalPlan.OperatorMeta om) {
        return this.logicalToPTOperator.containsKey(om);
    }

    @VisibleForTesting
    public List<PTOperator> getMergeOperators(LogicalPlan.OperatorMeta logicalOperator) {
        ArrayList opers = Lists.newArrayList();
        for (StreamMapping ug : this.logicalToPTOperator.get(logicalOperator).outputStreams.values()) {
            ug.addTo(opers);
        }
        return opers;
    }

    protected List<LogicalPlan.OperatorMeta> getRootOperators() {
        return this.dag.getRootOperators();
    }

    private void getDeps(PTOperator operator, Set<PTOperator> visited) {
        visited.add(operator);
        for (PTOperator.PTInput in : operator.inputs) {
            PTOperator sourceOperator;
            if (!in.source.isDownStreamInline() || visited.contains(sourceOperator = in.source.source)) continue;
            this.getDeps(sourceOperator, visited);
        }
        for (PTOperator.PTOutput out : operator.outputs) {
            for (PTOperator.PTInput sink : out.sinks) {
                PTOperator sinkOperator = sink.target;
                if (visited.contains(sinkOperator)) continue;
                this.getDeps(sinkOperator, visited);
            }
        }
    }

    public Set<PTOperator> getDependents(Collection<PTOperator> operators) {
        LinkedHashSet<PTOperator> visited = new LinkedHashSet<PTOperator>();
        if (operators != null) {
            for (PTOperator operator : operators) {
                this.getDeps(operator, visited);
            }
        }
        visited.addAll(this.getDependentPersistOperators(operators));
        return visited;
    }

    private Set<PTOperator> getDependentPersistOperators(Collection<PTOperator> operators) {
        LinkedHashSet<PTOperator> persistOperators = new LinkedHashSet<PTOperator>();
        if (operators != null) {
            for (PTOperator operator : operators) {
                for (PTOperator.PTInput in : operator.inputs) {
                    if (in.logicalStream.getPersistOperator() != null) {
                        for (LogicalPlan.InputPortMeta inputPortMeta : in.logicalStream.getSinksToPersist()) {
                            if (!inputPortMeta.getOperatorWrapper().equals(operator.operatorMeta)) continue;
                            persistOperators.addAll(this.getOperators(in.logicalStream.getPersistOperator()));
                            break;
                        }
                    }
                    for (Map.Entry entry : in.logicalStream.sinkSpecificPersistOperatorMap.entrySet()) {
                        persistOperators.addAll(this.getOperators((LogicalPlan.OperatorMeta)entry.getValue()));
                    }
                }
            }
        }
        return persistOperators;
    }

    public final void addLogicalOperator(LogicalPlan.OperatorMeta om) {
        PMapping pnodes = new PMapping(om);
        String host = (String)pnodes.logicalOperator.getValue(Context.OperatorContext.LOCALITY_HOST);
        this.localityPrefs.add(pnodes, host);
        PMapping upstreamPartitioned = null;
        for (Map.Entry<LogicalPlan.InputPortMeta, LogicalPlan.StreamMeta> e : om.getInputStreams().entrySet()) {
            if (e.getValue().getSource().getOperatorMeta().getOperator() instanceof Operator.DelayOperator) continue;
            PMapping m = this.logicalToPTOperator.get(e.getValue().getSource().getOperatorMeta());
            if (((Boolean)e.getKey().getValue(Context.PortContext.PARTITION_PARALLEL)).equals(true)) {
                if (upstreamPartitioned != null && !upstreamPartitioned.parallelPartitions.contains(m.logicalOperator) && upstreamPartitioned != m) {
                    String msg = String.format("operator cannot extend multiple partitions (%s and %s)", upstreamPartitioned.logicalOperator, m.logicalOperator);
                    throw new AssertionError((Object)msg);
                }
                m.parallelPartitions.add(pnodes.logicalOperator);
                pnodes.parallelPartitions = m.parallelPartitions;
                upstreamPartitioned = m;
            }
            if (DAG.Locality.CONTAINER_LOCAL == e.getValue().getLocality() || DAG.Locality.THREAD_LOCAL == e.getValue().getLocality()) {
                this.inlinePrefs.setLocal(m, pnodes);
                continue;
            }
            if (DAG.Locality.NODE_LOCAL != e.getValue().getLocality()) continue;
            this.localityPrefs.setLocal(m, pnodes);
        }
        this.logicalToPTOperator.put(om, pnodes);
        if (upstreamPartitioned != null) {
            this.initPartitioning(pnodes, upstreamPartitioned.partitions.size());
        } else {
            this.initPartitioning(pnodes, 0);
        }
        this.updateStreamMappings(pnodes);
    }

    public void removeLogicalStream(LogicalPlan.StreamMeta sm) {
        for (LogicalPlan.InputPortMeta ipm : sm.getSinks()) {
            LogicalPlan.OperatorMeta om = ipm.getOperatorWrapper();
            PMapping m = this.logicalToPTOperator.get(om);
            if (m == null) {
                throw new AssertionError((Object)("Unknown operator " + om));
            }
            for (PTOperator oper : m.partitions) {
                ArrayList inputsCopy = Lists.newArrayList(oper.inputs);
                for (PTOperator.PTInput input : oper.inputs) {
                    if (input.logicalStream != sm) continue;
                    input.source.sinks.remove(input);
                    inputsCopy.remove(input);
                    this.undeployOpers.add(oper);
                    this.deployOpers.add(oper);
                }
                oper.inputs = inputsCopy;
            }
        }
        PMapping m = this.logicalToPTOperator.get(sm.getSource().getOperatorMeta());
        for (PTOperator oper : m.partitions) {
            ArrayList outputsCopy = Lists.newArrayList(oper.outputs);
            for (PTOperator.PTOutput out : oper.outputs) {
                if (out.logicalStream != sm) continue;
                for (PTOperator.PTInput input : out.sinks) {
                    PTOperator downstreamOper = input.source.source;
                    downstreamOper.inputs.remove(input);
                    Set<PTOperator> deps = this.getDependents(Collections.singletonList(downstreamOper));
                    this.undeployOpers.addAll(deps);
                    this.deployOpers.addAll(deps);
                }
                outputsCopy.remove(out);
                this.undeployOpers.add(oper);
                this.deployOpers.add(oper);
            }
            oper.outputs = outputsCopy;
        }
    }

    public void connectInput(LogicalPlan.InputPortMeta ipm) {
        for (Map.Entry<LogicalPlan.InputPortMeta, LogicalPlan.StreamMeta> inputEntry : ipm.getOperatorWrapper().getInputStreams().entrySet()) {
            if (inputEntry.getKey() != ipm) continue;
            for (Map.Entry<LogicalPlan.OutputPortMeta, LogicalPlan.StreamMeta> outputEntry : inputEntry.getValue().getSource().getOperatorMeta().getOutputStreams().entrySet()) {
                PMapping sourceOpers = this.logicalToPTOperator.get(outputEntry.getKey().getOperatorMeta());
                for (PTOperator oper : sourceOpers.partitions) {
                    this.setupOutput(sourceOpers, oper, outputEntry);
                    this.undeployOpers.add(oper);
                    this.deployOpers.add(oper);
                }
            }
            PMapping m = this.logicalToPTOperator.get(ipm.getOperatorWrapper());
            this.updateStreamMappings(m);
            for (PTOperator oper : m.partitions) {
                this.undeployOpers.add(oper);
                this.deployOpers.add(oper);
            }
        }
    }

    public void removeLogicalOperator(LogicalPlan.OperatorMeta om) {
        PMapping opers = this.logicalToPTOperator.get(om);
        if (opers == null) {
            throw new AssertionError((Object)("Operator not in physical plan: " + om.getName()));
        }
        for (PTOperator oper : opers.partitions) {
            this.removePartition(oper, opers);
        }
        for (StreamMapping ug : opers.outputStreams.values()) {
            for (PTOperator oper : ug.cascadingUnifiers) {
                this.removePTOperator(oper);
            }
            if (ug.finalUnifier == null) continue;
            this.removePTOperator(ug.finalUnifier);
        }
        LinkedHashMap copyMap = Maps.newLinkedHashMap(this.logicalToPTOperator);
        copyMap.remove(om);
        this.logicalToPTOperator = copyMap;
    }

    public void setAvailableResources(int memoryMB) {
        this.availableMemoryMB = memoryMB;
    }

    public void onStatusUpdate(PTOperator oper) {
        for (StatsListener statsListener : oper.statsListeners) {
            StreamingContainerUmbilicalProtocol.StramToNodeRequest request;
            final StatsListener.Response rsp = statsListener.processStats((StatsListener.BatchedOperatorStats)oper.stats);
            if (rsp == null) continue;
            oper.loadIndicator = rsp.loadIndicator;
            if (rsp.repartitionRequired) {
                final LogicalPlan.OperatorMeta om = oper.getOperatorMeta();
                if (this.pendingRepartition.putIfAbsent(om, om) != null) {
                    LOG.debug("Skipping repartitioning for {} load {}", (Object)oper, (Object)oper.loadIndicator);
                } else {
                    LOG.debug("Scheduling repartitioning for {} load {}", (Object)oper, (Object)oper.loadIndicator);
                    Runnable r = new Runnable(){

                        @Override
                        public void run() {
                            PhysicalPlan.this.redoPartitions((PMapping)PhysicalPlan.this.logicalToPTOperator.get(om), rsp.repartitionNote);
                            PhysicalPlan.this.pendingRepartition.remove(om);
                        }
                    };
                    this.ctx.dispatch(r);
                }
            }
            if (rsp.operatorRequests != null) {
                for (StatsListener.OperatorRequest cmd : rsp.operatorRequests) {
                    request = new StreamingContainerUmbilicalProtocol.StramToNodeRequest();
                    request.operatorId = oper.getId();
                    request.requestType = StreamingContainerUmbilicalProtocol.StramToNodeRequest.RequestType.CUSTOM;
                    request.cmd = cmd;
                    this.ctx.addOperatorRequest(oper, request);
                }
            }
            if (rsp.operatorCommands == null) continue;
            for (StatsListener.OperatorRequest cmd : rsp.operatorCommands) {
                request = new StreamingContainerUmbilicalProtocol.StramToNodeRequest();
                request.operatorId = oper.getId();
                request.requestType = StreamingContainerUmbilicalProtocol.StramToNodeRequest.RequestType.CUSTOM;
                OperatorCommandConverter converter = new OperatorCommandConverter();
                converter.cmd = cmd;
                request.cmd = converter;
                this.ctx.addOperatorRequest(oper, request);
            }
        }
    }

    public void syncCheckpoints(long startTime, long currentTime) throws IOException {
        for (PTOperator oper : this.getAllOperators().values()) {
            StorageAgent sa = (StorageAgent)oper.operatorMeta.getValue(Context.OperatorContext.STORAGE_AGENT);
            long[] windowIds = sa.getWindowIds(oper.getId());
            Arrays.sort(windowIds);
            oper.checkpoints.clear();
            for (long wid : windowIds) {
                if (wid == -1L) continue;
                oper.addCheckpoint(wid, startTime);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Integer getStreamCodecIdentifier(StreamCodec<?> streamCodecInfo) {
        Integer id;
        Map<StreamCodec<?>, Integer> map = this.streamCodecIdentifiers;
        synchronized (map) {
            id = this.streamCodecIdentifiers.get(streamCodecInfo);
            if (id == null) {
                id = this.strCodecIdSequence.incrementAndGet();
                this.streamCodecIdentifiers.put(streamCodecInfo, id);
            }
        }
        return id;
    }

    @VisibleForTesting
    public Map<StreamCodec<?>, Integer> getStreamCodecIdentifiers() {
        return Collections.unmodifiableMap(this.streamCodecIdentifiers);
    }

    public static class OperatorCommandConverter
    implements StatsListener.OperatorRequest,
    Serializable {
        private static final long serialVersionUID = 1L;
        public StatsListener.OperatorCommand cmd;

        public StatsListener.OperatorResponse execute(Operator operator, int operatorId, long windowId) throws IOException {
            this.cmd.execute(operator, operatorId, windowId);
            return null;
        }
    }

    private class RepartitionContext
    extends PartitioningContextImpl {
        final List<PTOperator> operators;
        final List<DefaultPartition<Operator>> currentPartitions;
        final Map<Partitioner.Partition<?>, PTOperator> currentPartitionMap;
        final Map<Integer, Partitioner.Partition<Operator>> operatorIdToPartition;
        final List<Partitioner.Partition<Operator>> addedPartitions;
        Checkpoint minCheckpoint;
        Collection<Partitioner.Partition<Operator>> newPartitions;

        RepartitionContext(Partitioner<Operator> partitioner, PMapping currentMapping, int partitionCount) {
            super(currentMapping, partitionCount);
            this.addedPartitions = new ArrayList<Partitioner.Partition<Operator>>();
            this.minCheckpoint = null;
            this.newPartitions = null;
            this.operators = currentMapping.partitions;
            this.currentPartitions = new ArrayList<DefaultPartition<Operator>>(this.operators.size());
            this.currentPartitionMap = Maps.newHashMapWithExpectedSize((int)this.operators.size());
            this.operatorIdToPartition = Maps.newHashMapWithExpectedSize((int)this.operators.size());
            for (PTOperator pOperator : this.operators) {
                Map<Operator.InputPort<?>, Partitioner.PartitionKeys> pks = pOperator.getPartitionKeys();
                if (pks == null) {
                    throw new AssertionError((Object)("Null partition: " + pOperator));
                }
                if (this.minCheckpoint == null) {
                    this.minCheckpoint = pOperator.recoveryCheckpoint;
                } else if (this.minCheckpoint.windowId > pOperator.recoveryCheckpoint.windowId) {
                    this.minCheckpoint = pOperator.recoveryCheckpoint;
                }
                Operator partitionedOperator = PhysicalPlan.this.loadOperator(pOperator);
                DefaultPartition partition = new DefaultPartition((Object)partitionedOperator, pks, pOperator.loadIndicator, (StatsListener.BatchedOperatorStats)pOperator.stats);
                this.currentPartitions.add((DefaultPartition<Operator>)partition);
                this.currentPartitionMap.put((Partitioner.Partition<?>)partition, pOperator);
                LOG.debug("partition load: {} {} {}", new Object[]{pOperator, partition.getPartitionKeys(), partition.getLoad()});
                this.operatorIdToPartition.put(pOperator.getId(), (Partitioner.Partition<Operator>)partition);
            }
            this.newPartitions = partitioner.definePartitions(new ArrayList<DefaultPartition<Operator>>(this.currentPartitions), (Partitioner.PartitioningContext)this);
        }
    }

    private class PartitioningContextImpl
    implements Partitioner.PartitioningContext {
        private List<Operator.InputPort<?>> inputPorts;
        private final int parallelPartitionCount;
        private final PMapping om;

        private PartitioningContextImpl(PMapping om, int parallelPartitionCount) {
            this.om = om;
            this.parallelPartitionCount = parallelPartitionCount;
        }

        public int getParallelPartitionCount() {
            return this.parallelPartitionCount;
        }

        public List<Operator.InputPort<?>> getInputPorts() {
            if (this.inputPorts == null) {
                this.inputPorts = PhysicalPlan.this.getInputPortList(this.om.logicalOperator);
            }
            return this.inputPorts;
        }
    }

    private class LocalityPrefs
    implements Serializable {
        private static final long serialVersionUID = 201312112033L;
        private final Map<PMapping, LocalityPref> prefs = Maps.newHashMap();
        private final AtomicInteger groupSeq = new AtomicInteger();

        private LocalityPrefs() {
        }

        void add(PMapping m, String group) {
            if (group != null) {
                LocalityPref pref = null;
                for (LocalityPref lp : this.prefs.values()) {
                    if (!group.equals(lp.host)) continue;
                    lp.operators.add(m);
                    pref = lp;
                    break;
                }
                if (pref == null) {
                    pref = new LocalityPref();
                    pref.host = group;
                    pref.operators.add(m);
                    this.prefs.put(m, pref);
                }
            }
        }

        void setLocal(PMapping m1, PMapping m2) {
            LocalityPref lp1 = this.prefs.get(m1);
            LocalityPref lp2 = this.prefs.get(m2);
            if (lp1 == null && lp2 == null) {
                lp1 = lp2 = new LocalityPref();
                lp1.host = "host" + this.groupSeq.incrementAndGet();
                lp1.operators.add(m1);
                lp1.operators.add(m2);
            } else if (lp1 != null && lp2 != null) {
                if (StringUtils.equals((String)lp1.host, (String)lp2.host)) {
                    lp1.operators.addAll(lp2.operators);
                    lp2.operators.addAll(lp1.operators);
                } else {
                    LOG.warn("Node locality conflict {} {}", (Object)m1, (Object)m2);
                }
            } else if (lp1 == null) {
                lp2.operators.add(m1);
                lp1 = lp2;
            } else {
                lp1.operators.add(m2);
                lp2 = lp1;
            }
            this.prefs.put(m1, lp1);
            this.prefs.put(m2, lp2);
        }
    }

    private class LocalityPref
    implements Serializable {
        private static final long serialVersionUID = 201312112033L;
        String host;
        Set<PMapping> operators = Sets.newHashSet();

        private LocalityPref() {
        }
    }

    public static class PMapping
    implements Serializable {
        private static final long serialVersionUID = 201312112033L;
        private final LogicalPlan.OperatorMeta logicalOperator;
        private List<PTOperator> partitions = new LinkedList<PTOperator>();
        private final Map<LogicalPlan.OutputPortMeta, StreamMapping> outputStreams = Maps.newHashMap();
        private List<StatsListener> statsHandlers;
        private Set<LogicalPlan.OperatorMeta> parallelPartitions = Sets.newHashSet();

        private PMapping(LogicalPlan.OperatorMeta om) {
            this.logicalOperator = om;
        }

        private void addPartition(PTOperator p) {
            this.partitions.add(p);
            p.statsListeners = this.statsHandlers;
        }

        private Collection<PTOperator> getAllOperators() {
            ArrayList<PTOperator> c = new ArrayList<PTOperator>(this.partitions.size() + 1);
            c.addAll(this.partitions);
            for (StreamMapping ug : this.outputStreams.values()) {
                ug.addTo(c);
            }
            return c;
        }

        public String toString() {
            return this.logicalOperator.toString();
        }
    }

    private static class StatsListenerProxy
    implements StatsListener,
    Serializable {
        private static final long serialVersionUID = 201312112033L;
        private final LogicalPlan.OperatorMeta om;

        private StatsListenerProxy(LogicalPlan.OperatorMeta om) {
            this.om = om;
        }

        public StatsListener.Response processStats(StatsListener.BatchedOperatorStats stats) {
            return ((StatsListener)this.om.getOperator()).processStats(stats);
        }
    }

    public static interface PlanContext {
        public void recordEventAsync(StramEvent var1);

        public void deploy(Set<PTContainer> var1, Collection<PTOperator> var2, Set<PTContainer> var3, Collection<PTOperator> var4);

        public void dispatch(Runnable var1);

        public void writeJournal(Journal.Recoverable var1);

        public void addOperatorRequest(PTOperator var1, StreamingContainerUmbilicalProtocol.StramToNodeRequest var2);
    }

    public static class LoadIndicator {
        public final int indicator;
        public final String note;

        LoadIndicator(int indicator, String note) {
            this.indicator = indicator;
            this.note = note;
        }
    }
}

