/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.plan.optimizer.strategy;

import com.antgroup.geaflow.operator.OpArgs;
import com.antgroup.geaflow.operator.base.AbstractOperator;
import com.antgroup.geaflow.partitioner.IPartitioner;
import com.antgroup.geaflow.plan.graph.PipelineEdge;
import com.antgroup.geaflow.plan.graph.PipelineGraph;
import com.antgroup.geaflow.plan.graph.PipelineVertex;
import com.antgroup.geaflow.plan.graph.VertexType;
import com.antgroup.geaflow.plan.util.DAGValidator;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Merges chainable vertices of a {@link PipelineGraph} into operator chains.
 *
 * <p>The graph is walked along its output edges, starting from every vertex of type
 * {@code VertexType.source} and from every vertex that is the target of an edge whose source id
 * is not a vertex of the graph. A target vertex accepted by
 * {@link #isVertexCanMerge(PipelineVertex, PipelineVertex, PipelineEdge, Map)} is folded into its
 * upstream vertex by appending its operator to the upstream operator; any other target remains a
 * vertex of its own and becomes the head of a new chain.
 *
 * <p>Traversal state is kept in an unsynchronized instance field, so a single instance must not
 * be used by several threads at the same time.
 */
public class ChainCombiner implements Serializable {
    private static final Logger LOGGER = LoggerFactory.getLogger(ChainCombiner.class);

    /** Ids of the vertices already explored during the current {@link #combineVertex} run. */
    private final Set<Integer> visited = new HashSet<>();

    /**
     * Rewrites {@code pipelineGraph} in place so that chainable vertices are merged.
     *
     * <p>When at least one chain head exists, the edges and vertices of the graph are replaced by
     * the ones that survive chaining, every surviving vertex is validated with
     * {@link DAGValidator#checkVertexValidity}, and the edges whose source id is not a vertex of
     * the graph are added back afterwards. When no chain head exists the graph is left untouched.
     *
     * @param pipelineGraph the graph to optimize; it is modified by this call
     */
    public void combineVertex(PipelineGraph pipelineGraph) {
        this.visited.clear();
        Map<Integer, PipelineVertex> vertexMap = pipelineGraph.getVertexMap();

        // Snapshot the adjacency of every vertex before any edge is rewritten.
        Map<Integer, Set<PipelineEdge>> outputEdges = new HashMap<>();
        Map<Integer, Set<PipelineEdge>> inputEdges = new HashMap<>();
        for (Integer vertexId : vertexMap.keySet()) {
            outputEdges.put(vertexId, pipelineGraph.getVertexOutEdges(vertexId));
            inputEdges.put(vertexId, pipelineGraph.getVertexInputEdges(vertexId));
        }

        Collection<PipelineVertex> jobVertices = pipelineGraph.getPipelineVertices();
        // Collected into an ArrayList explicitly because more chain heads are appended below and
        // Collectors.toList() gives no mutability guarantee.
        List<PipelineVertex> chainHeads = jobVertices.stream()
            .filter(vertex -> vertex.getType() == VertexType.source)
            .collect(Collectors.toCollection(ArrayList::new));
        Set<Integer> vertexIds = jobVertices.stream()
            .map(PipelineVertex::getVertexId)
            .collect(Collectors.toSet());

        // An edge whose source is not a vertex of this graph makes its target a chain head; the
        // edge itself is remembered so it can be re-added once the graph has been rebuilt.
        List<PipelineEdge> externalEdges = new ArrayList<>();
        for (PipelineEdge edge : pipelineGraph.getPipelineEdgeList()) {
            if (vertexIds.contains(edge.getSrcId())) {
                continue;
            }
            chainHeads.add(vertexMap.get(edge.getTargetId()));
            externalEdges.add(edge);
        }

        if (chainHeads.isEmpty()) {
            return;
        }

        HashSet<PipelineVertex> newVertices = new HashSet<>();
        // Ordered by edge id; comparingInt avoids the overflow of subtraction-based comparison.
        TreeSet<PipelineEdge> newEdges = new TreeSet<>(Comparator.comparingInt(PipelineEdge::getEdgeId));
        for (PipelineVertex head : chainHeads) {
            newVertices.add(head);
            this.createOperatorChain(head.getVertexId(), head, vertexMap, inputEdges, outputEdges,
                newVertices, newEdges, null);
        }

        // The edges are installed before validation, the vertices only after it.
        pipelineGraph.setPipelineEdges(newEdges);
        for (PipelineVertex jobVertex : newVertices) {
            LOGGER.info(jobVertex.getVertexString());
            DAGValidator.checkVertexValidity(pipelineGraph, jobVertex, false);
        }
        pipelineGraph.setPipelineVertices(newVertices);
        for (PipelineEdge externalEdge : externalEdges) {
            pipelineGraph.addEdge(externalEdge);
        }
    }

    /**
     * Explores the output edges of {@code srcVertex} depth-first and either merges each target
     * into the current chain or records it as the head of a new chain.
     *
     * <p>A vertex is explored at most once per {@link #combineVertex} run; later visits return
     * immediately.
     *
     * @param id          vertex id of the head of the chain that {@code srcVertex} belongs to;
     *                    edges leaving the chain get this id as their new source id
     * @param srcVertex   the vertex whose output edges are explored
     * @param vertexMap   all vertices of the graph by id
     * @param inputEdges  input edges of every vertex by vertex id
     * @param outputEdges output edges of every vertex by vertex id
     * @param newVertices collects the vertices that remain after chaining
     * @param newEdges    collects the edges that remain after chaining
     * @param outputTag   edge name inherited from the edge that led into {@code srcVertex}, or
     *                    {@code null} when there is none
     */
    private void createOperatorChain(int id, PipelineVertex srcVertex, Map<Integer, PipelineVertex> vertexMap, Map<Integer, Set<PipelineEdge>> inputEdges, Map<Integer, Set<PipelineEdge>> outputEdges, Set<PipelineVertex> newVertices, Set<PipelineEdge> newEdges, String outputTag) {
        int srcId = srcVertex.getVertexId();
        if (!this.visited.add(srcId)) {
            return;
        }
        LOGGER.debug("Exploring vertex[{}]", srcId);

        Set<PipelineEdge> srcVertexOutputEdges = outputEdges.get(srcId);
        if (srcVertexOutputEdges == null) {
            // No output edges were recorded for this vertex.
            return;
        }
        LOGGER.debug("srcId:{}", srcId);

        // NOTE(review): the tag is not reset between iterations, so an edge without a name
        // inherits the name of the previously iterated sibling edge - confirm this is intended.
        String currentTag = outputTag;
        for (PipelineEdge executeEdge : srcVertexOutputEdges) {
            LOGGER.debug("edge:{}", executeEdge);
            PipelineVertex targetVertex = vertexMap.get(executeEdge.getTargetId());
            if (executeEdge.getEdgeName() != null) {
                currentTag = executeEdge.getEdgeName();
            }

            if (this.isVertexCanMerge(srcVertex, targetVertex, executeEdge, inputEdges)) {
                LOGGER.debug("Vertex[{}] can merge Vertex[{}]", srcVertex.getVertexId(), targetVertex.getVertexId());
                AbstractOperator headOperator = (AbstractOperator) srcVertex.getOperator();
                headOperator.addNextOperator(targetVertex.getOperator());
                // Keep extending the same chain (same head id) through the merged target.
                this.createOperatorChain(id, targetVertex, vertexMap, inputEdges, outputEdges,
                    newVertices, newEdges, currentTag);
                // Read only after the recursion, which may itself have updated the target's tail type.
                srcVertex.setChainTailType(targetVertex.getChainTailType());
                if (executeEdge.getEdgeName() != null) {
                    headOperator.getOutputTags().put(executeEdge.getEdgeId(), executeEdge.getEdgeName());
                }
                continue;
            }

            LOGGER.debug("Vertex[{}] can't merge Vertex[{}]", srcVertex.getVertexId(), targetVertex.getVertexId());
            // The edge leaves the chain: re-attach it to the chain head and keep its target.
            executeEdge.setSrcId(id);
            executeEdge.setEdgeName(currentTag);
            newEdges.add(executeEdge);
            newVertices.add(targetVertex);
            if (executeEdge.getSrcId() != executeEdge.getTargetId()) {
                // The target starts a chain of its own; an edge looping back to the head is not followed.
                this.createOperatorChain(targetVertex.getVertexId(), targetVertex, vertexMap, inputEdges,
                    outputEdges, newVertices, newEdges, null);
            }
        }
    }

    /**
     * Decides whether {@code targetVertex} may be chained onto {@code srcVertex}.
     *
     * <p>All of the following must hold: the target has exactly one input edge, the connecting
     * edge uses a {@code forward} partition type, both vertices have the same parallelism, the
     * source operator's chain strategy is not {@code NEVER}, and the target operator's chain
     * strategy is either unset or {@code ALWAYS}.
     *
     * @param srcVertex    upstream vertex
     * @param targetVertex downstream vertex
     * @param executeEdge  edge connecting the two vertices
     * @param inputEdges   input edges of every vertex by vertex id
     * @return {@code true} when the two vertices can be merged
     */
    private boolean isVertexCanMerge(PipelineVertex srcVertex, PipelineVertex targetVertex, PipelineEdge executeEdge, Map<Integer, Set<PipelineEdge>> inputEdges) {
        if (inputEdges.get(targetVertex.getVertexId()).size() != 1) {
            return false;
        }
        if (executeEdge.getPartition().getPartitionType() != IPartitioner.PartitionType.forward) {
            return false;
        }
        if (srcVertex.getParallelism() != targetVertex.getParallelism()) {
            return false;
        }
        AbstractOperator srcOperator = (AbstractOperator) srcVertex.getOperator();
        if (srcOperator.getOpArgs().getChainStrategy() == OpArgs.ChainStrategy.NEVER) {
            return false;
        }
        AbstractOperator targetOperator = (AbstractOperator) targetVertex.getOperator();
        OpArgs.ChainStrategy targetStrategy = targetOperator.getOpArgs().getChainStrategy();
        return targetStrategy == null || targetStrategy == OpArgs.ChainStrategy.ALWAYS;
    }
}

