/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.core.graph.builder;

import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.config.keys.FrameworkConfigKeys;
import com.antgroup.geaflow.common.errorcode.RuntimeErrors;
import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.core.graph.CollectExecutionVertex;
import com.antgroup.geaflow.core.graph.ExecutionEdge;
import com.antgroup.geaflow.core.graph.ExecutionGraph;
import com.antgroup.geaflow.core.graph.ExecutionVertex;
import com.antgroup.geaflow.core.graph.ExecutionVertexGroup;
import com.antgroup.geaflow.core.graph.ExecutionVertexGroupEdge;
import com.antgroup.geaflow.core.graph.IteratorExecutionVertex;
import com.antgroup.geaflow.core.graph.plan.visualization.ExecutionGraphVisualization;
import com.antgroup.geaflow.io.CollectType;
import com.antgroup.geaflow.operator.OpArgs;
import com.antgroup.geaflow.operator.Operator;
import com.antgroup.geaflow.operator.base.AbstractOperator;
import com.antgroup.geaflow.operator.impl.graph.algo.vc.IGraphVertexCentricAggOp;
import com.antgroup.geaflow.operator.impl.graph.compute.dynamic.AbstractDynamicGraphVertexCentricOp;
import com.antgroup.geaflow.plan.graph.AffinityLevel;
import com.antgroup.geaflow.plan.graph.PipelineEdge;
import com.antgroup.geaflow.plan.graph.PipelineGraph;
import com.antgroup.geaflow.plan.graph.PipelineVertex;
import com.antgroup.geaflow.plan.graph.VertexType;
import com.antgroup.geaflow.processor.Processor;
import com.antgroup.geaflow.processor.builder.ProcessorBuilder;
import com.antgroup.geaflow.processor.impl.AbstractProcessor;
import com.antgroup.geaflow.processor.impl.window.TwoInputProcessor;
import com.antgroup.geaflow.utils.math.MathUtil;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExecutionGraphBuilder
implements Serializable {
    /** Logger used for the build-time diagnostics emitted by this class. */
    private static final Logger LOGGER = LoggerFactory.getLogger(ExecutionGraphBuilder.class);
    /** Starting id for execution vertex groups; group numbering in buildExecutionVertexGroup begins at this value (1). */
    private static final int START_GROUP_ID = 1;
    /** Pipeline plan whose vertices and edges are translated into the execution graph. */
    private PipelineGraph plan;
    /**
     * Value of {@code FrameworkConfigKeys.STREAMING_FLYING_BATCH_NUM}, read from the job configuration in
     * buildExecutionGraph and applied as the flying count of pipelined vertex groups.
     */
    private int flyingCount;

    /**
     * Creates a builder for the given pipeline plan.
     *
     * @param plan the pipeline graph that buildExecutionGraph will translate; stored as-is (no copy is made)
     */
    public ExecutionGraphBuilder(PipelineGraph plan) {
        this.plan = plan;
    }

    /**
     * Translates the pipeline plan held by this builder into an {@link ExecutionGraph}.
     *
     * <p>The build proceeds in these steps:
     * <ol>
     *   <li>partition the plan's vertices into execution vertex groups, starting from the source vertices;</li>
     *   <li>for every vertex of every group, build its input/output execution edges and register them
     *       (and their ids) on the owning group and on the vertex;</li>
     *   <li>for every group, record the ids of the edges that cross the group boundary, the ids of its
     *       parent/children groups, and one group edge per out edge of each tail vertex;</li>
     *   <li>read the flying batch number from {@code jobConf} and fill in the cycle group meta.</li>
     * </ol>
     *
     * @param jobConf job configuration; {@code FrameworkConfigKeys.STREAMING_FLYING_BATCH_NUM} is read from it
     * @return the populated execution graph
     */
    public ExecutionGraph buildExecutionGraph(Configuration jobConf) {
        ExecutionGraph executionGraph = new ExecutionGraph();

        // Grouping starts from the source vertices; the queue is extended by the grouping routine itself.
        Queue<PipelineVertex> pipelineVertexQueue = new LinkedList<>(this.plan.getSourceVertices());
        Map<Integer, Integer> vertexId2GroupIdMap = new HashMap<>();
        Map<Integer, ExecutionVertexGroup> vertexGroupMap =
            this.buildExecutionVertexGroup(vertexId2GroupIdMap, pipelineVertexQueue);
        executionGraph.setVertexGroupMap(vertexGroupMap);

        Map<Integer, ExecutionVertexGroupEdge> groupEdgeMap = new HashMap<>();
        for (Map.Entry<Integer, ExecutionVertexGroup> groupEntry : vertexGroupMap.entrySet()) {
            int groupId = groupEntry.getKey();
            ExecutionVertexGroup vertexGroup = groupEntry.getValue();

            // Wire the execution edges of every vertex in the group.
            for (Map.Entry<Integer, ExecutionVertex> vertexEntry : vertexGroup.getVertexMap().entrySet()) {
                int vertexId = vertexEntry.getKey();
                ExecutionVertex executionVertex = vertexEntry.getValue();

                Set<PipelineEdge> pipelineOutEdges = this.plan.getVertexOutEdges(vertexId);
                Map<Integer, ExecutionEdge> outEdgeMap = new HashMap<>();
                for (PipelineEdge pipelineEdge : pipelineOutEdges) {
                    outEdgeMap.put(pipelineEdge.getEdgeId(), this.buildEdge(pipelineEdge));
                }
                vertexGroup.getEdgeMap().putAll(outEdgeMap);
                vertexGroup.putVertexId2OutEdgeIds(vertexId,
                    pipelineOutEdges.stream().map(PipelineEdge::getEdgeId).collect(Collectors.toList()));

                Set<PipelineEdge> pipelineInEdges = this.plan.getVertexInputEdges(vertexId);
                Map<Integer, ExecutionEdge> inEdgeMap = new HashMap<>();
                for (PipelineEdge pipelineEdge : pipelineInEdges) {
                    inEdgeMap.put(pipelineEdge.getEdgeId(), this.buildEdge(pipelineEdge));
                }
                vertexGroup.getEdgeMap().putAll(inEdgeMap);
                vertexGroup.putVertexId2InEdgeIds(vertexId,
                    pipelineInEdges.stream().map(PipelineEdge::getEdgeId).collect(Collectors.toList()));

                executionVertex.setInputEdges(new ArrayList<>(inEdgeMap.values()));
                executionVertex.setOutputEdges(new ArrayList<>(outEdgeMap.values()));
            }

            // Collect the edges and neighbouring groups that cross the group boundary.
            List<Integer> outGroupEdgeIds = new ArrayList<>();
            List<Integer> inGroupEdgeIds = new ArrayList<>();
            List<Integer> tailVertexIds = vertexGroup.getTailVertexIds();
            List<Integer> headVertexIds = vertexGroup.getHeadVertexIds();

            for (int headVertexId : headVertexIds) {
                // Incoming boundary edges: the source vertex is not a member of this group.
                inGroupEdgeIds.addAll(this.plan.getVertexInputEdges(headVertexId).stream()
                    .filter(edge -> !vertexGroup.getVertexMap().containsKey(edge.getSrcId()))
                    .map(PipelineEdge::getEdgeId)
                    .collect(Collectors.toList()));
                vertexGroup.getParentVertexGroupIds().addAll(this.plan.getVertexInputVertexIds(headVertexId).stream()
                    .filter(inputId -> !vertexGroup.getVertexMap().containsKey(inputId))
                    .map(vertexId2GroupIdMap::get)
                    .collect(Collectors.toList()));
            }
            for (int tailVertexId : tailVertexIds) {
                // Outgoing boundary edges: the target vertex is not a member of this group.
                // The predicate is negated to mirror the head-side filter above and the
                // children-group filter below; previously it kept only edges whose target
                // was INSIDE the group, so edges leaving the group were never recorded.
                outGroupEdgeIds.addAll(this.plan.getVertexOutEdges(tailVertexId).stream()
                    .filter(edge -> !vertexGroup.getVertexMap().containsKey(edge.getTargetId()))
                    .map(PipelineEdge::getEdgeId)
                    .collect(Collectors.toList()));
                vertexGroup.getChildrenVertexGroupIds().addAll(this.plan.getVertexOutputVertexIds(tailVertexId).stream()
                    .filter(outputId -> !vertexGroup.getVertexMap().containsKey(outputId))
                    .map(vertexId2GroupIdMap::get)
                    .collect(Collectors.toList()));
            }
            executionGraph.putVertexGroupInEdgeIds(groupId, inGroupEdgeIds);
            executionGraph.putVertexGroupOutEdgeIds(groupId, outGroupEdgeIds);

            // One group edge per out edge of every tail vertex, keyed by the pipeline edge id.
            for (int tailVertexId : tailVertexIds) {
                for (PipelineEdge pipelineEdge : this.plan.getVertexOutEdges(tailVertexId)) {
                    groupEdgeMap.put(pipelineEdge.getEdgeId(), new ExecutionVertexGroupEdge(
                        pipelineEdge.getPartition(),
                        pipelineEdge.getEdgeId(),
                        pipelineEdge.getEdgeName(),
                        vertexId2GroupIdMap.get(tailVertexId),
                        vertexId2GroupIdMap.get(pipelineEdge.getTargetId())));
                }
            }
        }
        executionGraph.setGroupEdgeMap(groupEdgeMap);

        // The flying count must be set before the cycle group meta is built, which reads it.
        this.flyingCount = jobConf.getInteger(FrameworkConfigKeys.STREAMING_FLYING_BATCH_NUM);
        this.buildCycleGroupMeta(executionGraph);

        ExecutionGraphVisualization graphVisualization = new ExecutionGraphVisualization(executionGraph);
        LOGGER.info("execution graph: {}, \nvertex group size: {}, group edge size: {}",
            new Object[]{graphVisualization.getExecutionGraphViz(), executionGraph.getVertexGroupMap().size(), executionGraph.getGroupEdgeMap().size()});
        return executionGraph;
    }

    /**
     * Drains the vertex queue and partitions the visited pipeline vertices into execution vertex groups.
     *
     * <p>Each vertex polled from the queue that is not yet grouped seeds a new group; the grouping routine
     * may append further vertices to the queue while it runs. Group ids are assigned sequentially starting
     * at {@code START_GROUP_ID}.
     *
     * @param vertexId2GroupIdMap output map, filled with vertex id to owning group id
     * @param pipelineVertexQueue work queue of vertices to start grouping from; consumed by this method
     * @return the created groups keyed by group id
     */
    private Map<Integer, ExecutionVertexGroup> buildExecutionVertexGroup(Map<Integer, Integer> vertexId2GroupIdMap, Queue<PipelineVertex> pipelineVertexQueue) {
        Map<Integer, ExecutionVertexGroup> groupsById = new HashMap<>();
        Set<Integer> alreadyGrouped = new HashSet<>();
        int nextGroupId = START_GROUP_ID;

        while (!pipelineVertexQueue.isEmpty()) {
            PipelineVertex seed = pipelineVertexQueue.poll();
            if (!alreadyGrouped.contains(seed.getVertexId())) {
                Map<Integer, ExecutionVertex> members = this.group(seed, pipelineVertexQueue, alreadyGrouped);

                ExecutionVertexGroup vertexGroup = new ExecutionVertexGroup(nextGroupId);
                groupsById.put(nextGroupId, vertexGroup);
                vertexGroup.getVertexMap().putAll(members);

                // Record ownership and mark the members so they never seed another group.
                for (int memberId : members.keySet()) {
                    vertexId2GroupIdMap.put(memberId, nextGroupId);
                    alreadyGrouped.add(memberId);
                }
                nextGroupId++;
            }
        }
        return groupsById;
    }

    /**
     * Builds the member map of one execution vertex group, seeded by {@code vertex}.
     *
     * <p>If the seed cannot be grouped, the result contains only the seed and all of its output vertices
     * are appended to {@code triggerVertices}. Otherwise, while the group has produced no pending output
     * vertices, further vertices are polled from {@code triggerVertices} and merged into the same group
     * until one of them fails to group; finally the pending output vertices are appended to
     * {@code triggerVertices}.
     *
     * @param vertex                the vertex that seeds the group
     * @param triggerVertices       shared work queue; read from and appended to by this method
     * @param globalGroupedVertices ids of vertices that already belong to a previously built group
     * @return execution vertices of the new group keyed by vertex id
     */
    private Map<Integer, ExecutionVertex> group(PipelineVertex vertex, Queue<PipelineVertex> triggerVertices, Set<Integer> globalGroupedVertices) {
        Map<Integer, ExecutionVertex> members = new HashMap<>();
        Set<Integer> visited = new HashSet<>();
        Queue<PipelineVertex> pendingOutputs = new LinkedList<>();

        boolean seedGrouped = this.group(vertex, members, visited, pendingOutputs, globalGroupedVertices);
        if (seedGrouped) {
            // NOTE(review): a trigger vertex that is polled here but fails to group is not put back on
            // the queue - confirm it is always re-discovered through another path.
            while (pendingOutputs.isEmpty() && !triggerVertices.isEmpty()) {
                PipelineVertex candidate = triggerVertices.poll();
                boolean merged = this.group(candidate, members, visited, pendingOutputs, globalGroupedVertices);
                if (!merged) {
                    break;
                }
            }
            triggerVertices.addAll(pendingOutputs);
            return members;
        }

        // The seed stands alone: it forms a single-vertex group and hands its outputs to the queue.
        int seedId = vertex.getVertexId();
        members.put(seedId, this.buildExecutionVertex(this.plan.getVertexMap().get(seedId)));
        for (int outputId : this.plan.getVertexOutputVertexIds(vertex.getVertexId())) {
            triggerVertices.add(this.plan.getVertexMap().get(outputId));
        }
        return members;
    }

    /**
     * Tries to add {@code vertex} to the group under construction.
     *
     * <p>The vertex is always marked as visited. It joins the group only when it is groupable on its own
     * and the upward pass over its inputs succeeds; in that case the downward pass over its outputs is
     * run afterwards.
     *
     * @param vertex                the vertex to add
     * @param currentVertexGroupMap members of the group under construction; extended on success
     * @param currentVisited        ids visited while building the current group; always extended
     * @param groupOutputVertices   collects output vertices that stay outside the group
     * @param globalGroupedVertices ids of vertices that belong to previously built groups
     * @return {@code true} if the vertex was added to the group
     */
    private boolean group(PipelineVertex vertex, Map<Integer, ExecutionVertex> currentVertexGroupMap, Set<Integer> currentVisited, Queue<PipelineVertex> groupOutputVertices, Set<Integer> globalGroupedVertices) {
        currentVisited.add(vertex.getVertexId());

        if (!this.canGroup(vertex)) {
            return false;
        }
        if (!this.pushUpGroup(vertex, currentVertexGroupMap, currentVisited, groupOutputVertices, globalGroupedVertices)) {
            return false;
        }

        // Inputs are in: register the vertex itself, then expand towards its outputs.
        PipelineVertex planVertex = this.plan.getVertexMap().get(vertex.getVertexId());
        currentVertexGroupMap.put(vertex.getVertexId(), this.buildExecutionVertex(planVertex));
        this.pushDownGroup(vertex, currentVertexGroupMap, currentVisited, groupOutputVertices, globalGroupedVertices);
        return true;
    }

    /**
     * Tries to pull the not-yet-handled input vertices of {@code vertex} into the current group.
     *
     * <p>A first pass selects the inputs that are neither members of the current group, nor globally
     * grouped, nor the vertex itself, nor already visited, and fails as soon as one of them cannot be
     * grouped with {@code vertex}. A second pass groups each selected input on a copy of the current
     * member map; {@code currentVertexGroupMap} is only updated after every input has been grouped
     * successfully.
     *
     * @param vertex                the vertex whose inputs are examined
     * @param currentVertexGroupMap members of the group under construction; extended only on success
     * @param currentVisited        ids visited while building the current group; may grow even on failure
     * @param groupOutputVertices   collects output vertices that stay outside the group
     * @param globalGroupedVertices ids of vertices that belong to previously built groups
     * @return {@code true} if all selected inputs were merged, {@code false} otherwise
     */
    private boolean pushUpGroup(PipelineVertex vertex, Map<Integer, ExecutionVertex> currentVertexGroupMap, Set<Integer> currentVisited, Queue<PipelineVertex> groupOutputVertices, Set<Integer> globalGroupedVertices) {
        // Pass 1: pick the inputs that still need to be grouped.
        List<Integer> candidateIds = new ArrayList<>();
        for (int inputId : this.plan.getVertexInputVertexIds(vertex.getVertexId())) {
            boolean alreadyHandled = currentVertexGroupMap.containsKey(inputId)
                || globalGroupedVertices.contains(inputId)
                || inputId == vertex.getVertexId()
                || currentVisited.contains(inputId);
            if (alreadyHandled) {
                continue;
            }
            if (!this.canGroup(this.plan.getVertexMap().get(inputId), vertex)) {
                return false;
            }
            candidateIds.add(inputId);
        }

        // Pass 2: group every candidate on a scratch copy so a failure leaves the caller's map untouched.
        Map<Integer, ExecutionVertex> absorbed = new HashMap<>();
        for (int candidateId : candidateIds) {
            if (currentVertexGroupMap.containsKey(candidateId)
                || globalGroupedVertices.contains(candidateId)
                || absorbed.containsKey(candidateId)) {
                continue;
            }
            // A candidate may have been visited by the recursive grouping of an earlier candidate.
            if (currentVisited.contains(candidateId)) {
                return false;
            }
            PipelineVertex inputVertex = this.plan.getVertexMap().get(candidateId);
            if (!this.canGroup(inputVertex, vertex)) {
                return false;
            }
            Map<Integer, ExecutionVertex> trialMembers = new HashMap<>(currentVertexGroupMap);
            if (!this.group(inputVertex, trialMembers, currentVisited, groupOutputVertices, globalGroupedVertices)) {
                return false;
            }
            absorbed.putAll(trialMembers);
        }

        currentVertexGroupMap.putAll(absorbed);
        return true;
    }

    /**
     * Tries to extend the current group towards the output vertices of {@code vertex}.
     *
     * <p>Outputs that are already members of the current group or globally grouped are skipped. Every
     * other output is merged into the group when possible; outputs that cannot be merged are appended to
     * {@code groupOutputVertices}.
     *
     * @param vertex                the vertex whose outputs are examined
     * @param currentVertexGroupMap members of the group under construction
     * @param currentVisited        ids visited while building the current group
     * @param groupOutputVertices   receives the outputs that stay outside the group
     * @param globalGroupedVertices ids of vertices that belong to previously built groups
     */
    private void pushDownGroup(PipelineVertex vertex, Map<Integer, ExecutionVertex> currentVertexGroupMap, Set<Integer> currentVisited, Queue<PipelineVertex> groupOutputVertices, Set<Integer> globalGroupedVertices) {
        for (int outputId : this.plan.getVertexOutputVertexIds(vertex.getVertexId())) {
            PipelineVertex outputVertex = this.plan.getVertexMap().get(outputId);
            if (currentVertexGroupMap.containsKey(outputId) || globalGroupedVertices.contains(outputId)) {
                continue;
            }
            // Grouping is only attempted when the pair is compatible (short-circuit preserved).
            boolean merged = this.canGroup(vertex, outputVertex)
                && this.group(outputVertex, currentVertexGroupMap, currentVisited, groupOutputVertices, globalGroupedVertices);
            if (!merged) {
                groupOutputVertices.add(outputVertex);
            }
        }
    }

    /**
     * Tells whether a vertex, looked at on its own, may be merged into a group with other vertices.
     *
     * <p>For the iteration-style vertex types handled by the switch below, the answer is whether the
     * operator is an {@code IGraphVertexCentricAggOp}. For every other type, the vertex is groupable only
     * if grouping is enabled in the op args of its operator and of each of its next operators.
     *
     * @param currentVertex the vertex to check
     * @return {@code true} if the vertex may be grouped
     */
    private boolean canGroup(PipelineVertex currentVertex) {
        switch (currentVertex.getType()) {
            case vertex_centric:
            case inc_vertex_centric:
            case iterator:
            case inc_iterator:
            case iteration_aggregation:
                return currentVertex.getOperator() instanceof IGraphVertexCentricAggOp;
            default:
                break;
        }

        AbstractOperator operator = (AbstractOperator) currentVertex.getOperator();
        if (!operator.getOpArgs().isEnGroup()) {
            return false;
        }
        for (Object nextOperator : operator.getNextOperators()) {
            if (!((AbstractOperator) nextOperator).getOpArgs().isEnGroup()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Tells whether two adjacent vertices may share a group; both directional checks must pass.
     *
     * @param currentVertex the upstream vertex of the pair
     * @param outputVertex  the downstream vertex of the pair
     * @return {@code true} if the pair may be grouped together
     */
    private boolean canGroup(PipelineVertex currentVertex, PipelineVertex outputVertex) {
        if (!this.canGroupWithInput(currentVertex, outputVertex)) {
            return false;
        }
        return this.canGroupWithOutput(outputVertex, currentVertex);
    }

    /**
     * Checks the grouping constraint that {@code currentVertex} imposes on {@code outputVertex}.
     *
     * <p>Only the iteration-style vertex types listed in the switch impose a constraint: both operators
     * must then be {@code IGraphVertexCentricAggOp} instances. All other types accept any partner.
     *
     * @param currentVertex the vertex whose type decides the constraint
     * @param outputVertex  the partner vertex
     * @return {@code true} if the constraint is satisfied or no constraint applies
     */
    private boolean canGroupWithOutput(PipelineVertex currentVertex, PipelineVertex outputVertex) {
        switch (currentVertex.getType()) {
            case vertex_centric:
            case inc_vertex_centric:
            case iterator:
            case inc_iterator:
            case iteration_aggregation:
                break;
            default:
                return true;
        }
        boolean currentIsAggOp = currentVertex.getOperator() instanceof IGraphVertexCentricAggOp;
        return currentIsAggOp && outputVertex.getOperator() instanceof IGraphVertexCentricAggOp;
    }

    /**
     * Checks the grouping constraint that {@code currentVertex} imposes on {@code inputVertex}.
     *
     * <p>Only the iteration-style vertex types listed in the switch impose a constraint: both operators
     * must then be {@code IGraphVertexCentricAggOp} instances. All other types accept any partner.
     *
     * @param currentVertex the vertex whose type decides the constraint
     * @param inputVertex   the partner vertex
     * @return {@code true} if the constraint is satisfied or no constraint applies
     */
    private boolean canGroupWithInput(PipelineVertex currentVertex, PipelineVertex inputVertex) {
        switch (currentVertex.getType()) {
            case vertex_centric:
            case inc_vertex_centric:
            case iterator:
            case inc_iterator:
            case iteration_aggregation:
                break;
            default:
                return true;
        }
        boolean currentIsAggOp = currentVertex.getOperator() instanceof IGraphVertexCentricAggOp;
        return currentIsAggOp && inputVertex.getOperator() instanceof IGraphVertexCentricAggOp;
    }

    /**
     * Creates the execution vertex that corresponds to a pipeline vertex.
     *
     * <p>The concrete class depends on the vertex type (iterator, collect or plain). The method then sets
     * the parent vertex ids, the number of partitions (largest max-parallelism among the children, at
     * least 1, or the vertex's own parallelism when it has no children), the processor built from the
     * operator (if any), the left/right stream names for join and combine vertices, and finally the
     * parallelism, max parallelism, vertex type, affinity level and chain tail type.
     *
     * @param pipelineVertex the plan vertex to translate
     * @return the configured execution vertex
     */
    private ExecutionVertex buildExecutionVertex(PipelineVertex pipelineVertex) {
        int vertexId = pipelineVertex.getVertexId();
        VertexType type = pipelineVertex.getType();
        String name = pipelineVertex.getName();
        LOGGER.info("vertexId:{} vertexName:{} type:{}", new Object[]{vertexId, name, type});

        ExecutionVertex executionVertex;
        switch (type) {
            case vertex_centric:
            case inc_vertex_centric:
            case iterator:
            case inc_iterator:
                executionVertex = new IteratorExecutionVertex(vertexId, name, pipelineVertex.getIterations());
                break;
            case collect:
                executionVertex = new CollectExecutionVertex(vertexId, name);
                break;
            default:
                executionVertex = new ExecutionVertex(vertexId, pipelineVertex.getName());
                break;
        }

        List<Integer> parentVertexIds = this.plan.getVertexInputVertexIds(executionVertex.getVertexId());
        if (parentVertexIds != null) {
            executionVertex.setParentVertexIds(parentVertexIds);
        }

        // Partition count: wide enough for the most parallel child, or own parallelism for a leaf.
        List<Integer> childVertexIds = this.plan.getVertexOutputVertexIds(executionVertex.getVertexId());
        if (childVertexIds == null || childVertexIds.isEmpty()) {
            executionVertex.setNumPartitions(pipelineVertex.getParallelism());
        } else {
            Map<Integer, PipelineVertex> planVertices = this.plan.getVertexMap();
            int bucketNum = 1;
            for (Integer childVertexId : childVertexIds) {
                bucketNum = Math.max(bucketNum, this.getMaxParallelism(planVertices.get(childVertexId)));
            }
            executionVertex.setNumPartitions(bucketNum);
        }

        Processor processor = null;
        if (pipelineVertex.getOperator() != null) {
            processor = new ProcessorBuilder().buildProcessor(pipelineVertex.getOperator());
            executionVertex.setProcessor(processor);
        }

        VertexType pipelineType = pipelineVertex.getType();
        if (pipelineType == VertexType.join || pipelineType == VertexType.combine) {
            // Input edges ordered by stream ordinal: first is the left stream, second the right stream.
            List<PipelineEdge> orderedInputs = this.plan.getVertexInputEdges(pipelineVertex.getVertexId()).stream()
                .sorted(Comparator.comparingInt(PipelineEdge::getStreamOrdinal))
                .collect(Collectors.toList());
            TwoInputProcessor twoInputProcessor = (TwoInputProcessor) processor;
            twoInputProcessor.setLeftStream(orderedInputs.get(0).getEdgeName());
            twoInputProcessor.setRightStream(orderedInputs.get(1).getEdgeName());
        }

        executionVertex.setParallelism(pipelineVertex.getParallelism());
        executionVertex.setMaxParallelism(this.getMaxParallelism(pipelineVertex));
        executionVertex.setVertexType(pipelineVertex.getType());
        executionVertex.setAffinityLevel(pipelineVertex.getAffinity());
        executionVertex.setChainTailType(pipelineVertex.getChainTailType());
        LOGGER.info("execution vertex {}, parallelism {}, max parallelism {}, num partitions {}", new Object[]{executionVertex, executionVertex.getParallelism(), executionVertex.getMaxParallelism(), executionVertex.getNumPartitions()});
        return executionVertex;
    }

    /**
     * Fills in the cycle group meta of the graph and of each vertex group.
     *
     * <p>Every vertex is classified via getCycleGroupType and its group is configured accordingly:
     * <ul>
     *   <li>pipelined: unbounded iteration count and the configured flying count;</li>
     *   <li>incremental / statical (except {@code iteration_aggregation} vertices): the group becomes
     *       iterative with the vertex's iterator count and worker affinity; the graph-level iteration
     *       count becomes unbounded for incremental vertices, and for statical vertices unless every
     *       source vertex is a single-window source;</li>
     *   <li>windowed: the group is recorded as a batch group.</li>
     * </ul>
     * If any batch or iterative group exists, pipelined groups are reset to one iteration and a flying
     * count of 1. Finally, pipelined and batch groups receive the affinity level derived from their vertices.
     *
     * @param graph the execution graph whose meta is populated
     * @throws GeaflowRuntimeException if a vertex maps to an unsupported cycle group type
     */
    private void buildCycleGroupMeta(ExecutionGraph graph) {
        Set<ExecutionVertexGroup> pipelinedGroups = new HashSet<>();
        Set<ExecutionVertexGroup> batchGroups = new HashSet<>();
        Set<ExecutionVertexGroup> iterativeGroups = new HashSet<>();

        for (ExecutionVertexGroup vertexGroup : graph.getVertexGroupMap().values()) {
            for (ExecutionVertex vertex : vertexGroup.getVertexMap().values()) {
                LOGGER.info("vertexInfo:{}", (Object) vertex);
                CycleGroupType type = this.getCycleGroupType(vertex, ((AbstractProcessor) vertex.getProcessor()).getOperator());
                switch (type) {
                    case pipelined:
                        pipelinedGroups.add(vertexGroup);
                        vertexGroup.getCycleGroupMeta().setIterationCount(Long.MAX_VALUE);
                        vertexGroup.getCycleGroupMeta().setFlyingCount(this.flyingCount);
                        break;
                    case incremental:
                        if (vertex.getVertexType() != VertexType.iteration_aggregation) {
                            graph.getCycleGroupMeta().setIterationCount(Long.MAX_VALUE);
                            iterativeGroups.add(vertexGroup);
                            vertexGroup.getCycleGroupMeta().setIterationCount(((IteratorExecutionVertex) vertex).getIteratorCount());
                            vertexGroup.getCycleGroupMeta().setIterative(true);
                            vertexGroup.getCycleGroupMeta().setAffinityLevel(AffinityLevel.worker);
                        }
                        break;
                    case statical:
                        if (vertex.getVertexType() != VertexType.iteration_aggregation) {
                            // The graph only stays bounded when every source is a single-window source.
                            boolean allSingleWindow = this.plan.getSourceVertices().stream()
                                .allMatch(source -> ((AbstractOperator) source.getOperator()).getOpArgs().getOpType() == OpArgs.OpType.SINGLE_WINDOW_SOURCE);
                            if (!allSingleWindow) {
                                graph.getCycleGroupMeta().setIterationCount(Long.MAX_VALUE);
                            }
                            iterativeGroups.add(vertexGroup);
                            vertexGroup.getCycleGroupMeta().setIterationCount(((IteratorExecutionVertex) vertex).getIteratorCount());
                            vertexGroup.getCycleGroupMeta().setIterative(true);
                            vertexGroup.getCycleGroupMeta().setAffinityLevel(AffinityLevel.worker);
                        }
                        break;
                    case windowed:
                        batchGroups.add(vertexGroup);
                        break;
                    default:
                        throw new GeaflowRuntimeException(RuntimeErrors.INST.operatorTypeNotSupportError(String.valueOf((Object) type)));
                }
            }
        }

        // Pipelined groups run a single iteration once batch or iterative groups are present.
        if (!batchGroups.isEmpty() || !iterativeGroups.isEmpty()) {
            for (ExecutionVertexGroup pipelinedGroup : pipelinedGroups) {
                pipelinedGroup.getCycleGroupMeta().setIterationCount(1L);
                pipelinedGroup.getCycleGroupMeta().setFlyingCount(1);
            }
        }

        for (ExecutionVertexGroup pipelinedGroup : pipelinedGroups) {
            pipelinedGroup.getCycleGroupMeta().setAffinityLevel(this.buildGroupAffinityLevel(pipelinedGroup));
        }
        for (ExecutionVertexGroup batchGroup : batchGroups) {
            batchGroup.getCycleGroupMeta().setAffinityLevel(this.buildGroupAffinityLevel(batchGroup));
        }
    }

    /**
     * Maps an operator's op type to a cycle group type.
     *
     * <p>For input and (multi-window / graph) source op types the result is pipelined, unless a vertex is
     * supplied: then the direct next operators of the vertex's processor operator are classified in
     * order, and the first non-pipelined classification (if any) is returned.
     *
     * @param vertex   the execution vertex owning the operator, or {@code null} to classify the operator alone
     * @param operator the operator to classify
     * @return the cycle group type
     * @throws GeaflowRuntimeException if the op type is not supported
     */
    private CycleGroupType getCycleGroupType(ExecutionVertex vertex, Operator operator) {
        OpArgs.OpType opType = ((AbstractOperator) operator).getOpArgs().getOpType();
        switch (opType) {
            case ONE_INPUT:
            case TWO_INPUT:
            case MULTI_WINDOW_SOURCE:
            case GRAPH_SOURCE: {
                CycleGroupType resolved = CycleGroupType.pipelined;
                if (vertex == null) {
                    return resolved;
                }
                AbstractOperator headOperator = (AbstractOperator) ((AbstractProcessor) vertex.getProcessor()).getOperator();
                for (Object nextOperator : headOperator.getNextOperators()) {
                    resolved = this.getCycleGroupType(null, (Operator) nextOperator);
                    if (resolved != CycleGroupType.pipelined) {
                        break;
                    }
                }
                return resolved;
            }
            case SINGLE_WINDOW_SOURCE:
                return CycleGroupType.windowed;
            case INC_VERTEX_CENTRIC_COMPUTE:
            case INC_VERTEX_CENTRIC_TRAVERSAL:
                return CycleGroupType.incremental;
            case VERTEX_CENTRIC_COMPUTE:
            case VERTEX_CENTRIC_TRAVERSAL:
                return CycleGroupType.statical;
            default:
                throw new GeaflowRuntimeException(RuntimeErrors.INST.operatorTypeNotSupportError(opType.name()));
        }
    }

    /**
     * Computes the max parallelism of a pipeline vertex.
     *
     * <p>{@code vertex_centric}, {@code iterator} and {@code inc_process} vertices use
     * {@code MathUtil.minPowerOf2} of their parallelism; {@code inc_vertex_centric} and
     * {@code inc_iterator} vertices use the shard number of their operator's graph view descriptor;
     * every other type uses the parallelism unchanged.
     *
     * @param vertex the pipeline vertex
     * @return the max parallelism for the vertex
     */
    private final int getMaxParallelism(PipelineVertex vertex) {
        int parallelism = vertex.getParallelism();
        int result;
        switch (vertex.getType()) {
            case vertex_centric:
            case iterator:
            case inc_process:
                result = MathUtil.minPowerOf2(parallelism);
                break;
            case inc_vertex_centric:
            case inc_iterator:
                result = ((AbstractDynamicGraphVertexCentricOp) vertex.getOperator()).getGraphViewDesc().getShardNum();
                break;
            default:
                result = parallelism;
                break;
        }
        return result;
    }

    /**
     * Derives one affinity level for a whole vertex group.
     *
     * <p>Returns the level shared by the group's vertices, or {@code AffinityLevel.worker} as soon as two
     * differing levels are seen. A {@code null} level is treated as "not set yet" and replaced by the
     * next vertex's level. The result is {@code null} for an empty group.
     *
     * @param vertexGroup the group to inspect
     * @return the group's affinity level, possibly {@code null}
     */
    private AffinityLevel buildGroupAffinityLevel(ExecutionVertexGroup vertexGroup) {
        AffinityLevel resolved = null;
        for (ExecutionVertex member : vertexGroup.getVertexMap().values()) {
            AffinityLevel memberLevel = member.getAffinityLevel();
            if (resolved == null) {
                resolved = memberLevel;
            } else if (resolved != memberLevel) {
                // Mixed levels inside one group resolve to worker affinity.
                return AffinityLevel.worker;
            }
        }
        return resolved;
    }

    /**
     * Converts a pipeline edge into an execution edge, copying partition, id, name, endpoints and encoder.
     *
     * @param pipelineEdge the plan edge to convert
     * @return the execution edge; its transfer type is {@code CollectType.FORWARD} when the plan edge has none
     */
    private ExecutionEdge buildEdge(PipelineEdge pipelineEdge) {
        CollectType declaredType = pipelineEdge.getType();
        CollectType transferType = declaredType != null ? declaredType : CollectType.FORWARD;
        return new ExecutionEdge(
            pipelineEdge.getPartition(),
            pipelineEdge.getEdgeId(),
            pipelineEdge.getEdgeName(),
            pipelineEdge.getSrcId(),
            pipelineEdge.getTargetId(),
            transferType,
            pipelineEdge.getEncoder());
    }

    private static enum CycleGroupType {
        pipelined,
        incremental,
        statical,
        windowed;

    }
}

