/*
 * Decompiled with CFR 0.152.
 */
package org.gradoop.flink.model.impl.operators.split;

import java.io.Serializable;
import java.util.List;
import java.util.Objects;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DistinctOperator;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.gradoop.common.model.api.entities.GraphHeadFactory;
import org.gradoop.common.model.impl.pojo.EPGMEdge;
import org.gradoop.common.model.impl.pojo.EPGMGraphHead;
import org.gradoop.common.model.impl.pojo.EPGMVertex;
import org.gradoop.common.model.impl.properties.PropertyValue;
import org.gradoop.flink.model.api.functions.Function;
import org.gradoop.flink.model.api.operators.UnaryGraphToCollectionOperator;
import org.gradoop.flink.model.impl.epgm.GraphCollection;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.epgm.Id;
import org.gradoop.flink.model.impl.functions.epgm.InitGraphHead;
import org.gradoop.flink.model.impl.functions.epgm.PairTupleWithNewId;
import org.gradoop.flink.model.impl.functions.epgm.SourceId;
import org.gradoop.flink.model.impl.functions.tuple.Project2To1;
import org.gradoop.flink.model.impl.operators.split.functions.AddNewGraphsToEdge;
import org.gradoop.flink.model.impl.operators.split.functions.AddNewGraphsToVertex;
import org.gradoop.flink.model.impl.operators.split.functions.JoinEdgeTupleWithSourceGraphs;
import org.gradoop.flink.model.impl.operators.split.functions.JoinEdgeTupleWithTargetGraphs;
import org.gradoop.flink.model.impl.operators.split.functions.JoinVertexIdWithGraphIds;
import org.gradoop.flink.model.impl.operators.split.functions.MultipleGraphIdsGroupReducer;
import org.gradoop.flink.model.impl.operators.split.functions.SplitValues;

public class Split
implements UnaryGraphToCollectionOperator,
Serializable {
    private final Function<EPGMVertex, List<PropertyValue>> function;

    public Split(Function<EPGMVertex, List<PropertyValue>> function) {
        this.function = function;
    }

    @Override
    public GraphCollection execute(LogicalGraph graph) {
        FlatMapOperator vertexIdWithSplitValues = graph.getVertices().flatMap(new SplitValues<EPGMVertex>(this.function));
        DistinctOperator distinctSplitValues = vertexIdWithSplitValues.map(new Project2To1()).distinct();
        MapOperator splitValuesWithGraphIds = distinctSplitValues.map(new PairTupleWithNewId());
        GroupReduceOperator vertexIdWithGraphIds = vertexIdWithSplitValues.join((DataSet)splitValuesWithGraphIds).where(new int[]{1}).equalTo(new int[]{0}).with((JoinFunction)new JoinVertexIdWithGraphIds()).groupBy(new int[]{0}).reduceGroup((GroupReduceFunction)new MultipleGraphIdsGroupReducer());
        JoinOperator.EquiJoin vertices = graph.getVertices().join((DataSet)vertexIdWithGraphIds).where(new Id()).equalTo(new int[]{0}).with(new AddNewGraphsToVertex());
        MapOperator newGraphIds = splitValuesWithGraphIds.map(new Project2To1());
        MapOperator newGraphs = newGraphIds.map((MapFunction)new InitGraphHead((GraphHeadFactory<EPGMGraphHead>)graph.getFactory().getGraphHeadFactory()));
        JoinOperator.EquiJoin edgeGraphIdsGraphIds = graph.getEdges().join((DataSet)vertexIdWithGraphIds).where(new SourceId()).equalTo(new int[]{0}).with(new JoinEdgeTupleWithSourceGraphs()).join((DataSet)vertexIdWithGraphIds).where(new String[]{"f0.targetId"}).equalTo(new int[]{0}).with(new JoinEdgeTupleWithTargetGraphs());
        FlatMapOperator edges = edgeGraphIdsGraphIds.flatMap(new AddNewGraphsToEdge());
        return graph.getCollectionFactory().fromDataSets((DataSet<EPGMGraphHead>)newGraphs, (DataSet<EPGMVertex>)vertices, (DataSet<EPGMEdge>)edges);
    }
}

