/*
 * Decompiled with CFR 0.152.
 */
package org.gradoop.flink.model.impl.operators.keyedgrouping;

import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.ProjectOperator;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.flink.model.api.epgm.BaseGraph;
import org.gradoop.flink.model.api.epgm.BaseGraphCollection;
import org.gradoop.flink.model.api.functions.AggregateFunction;
import org.gradoop.flink.model.api.functions.KeyFunction;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphToBaseGraphOperator;
import org.gradoop.flink.model.impl.functions.filters.Not;
import org.gradoop.flink.model.impl.operators.keyedgrouping.GroupingKeys;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.BuildSuperEdgeFromTuple;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.BuildSuperVertexFromTuple;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.BuildTuplesFromEdges;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.BuildTuplesFromVertices;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.FilterSuperVertices;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.ReduceEdgeTuples;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.ReduceVertexTuples;
import org.gradoop.flink.model.impl.operators.keyedgrouping.functions.UpdateIdField;

public class KeyedGrouping<G extends GraphHead, V extends Vertex, E extends Edge, LG extends BaseGraph<G, V, E, LG, GC>, GC extends BaseGraphCollection<G, V, E, LG, GC>>
implements UnaryBaseGraphToBaseGraphOperator<LG> {
    private final List<KeyFunction<V, ?>> vertexGroupingKeys;
    private final List<AggregateFunction> vertexAggregateFunctions;
    private final List<KeyFunction<E, ?>> edgeGroupingKeys;
    private final List<AggregateFunction> edgeAggregateFunctions;
    private boolean useGroupCombine = true;

    public KeyedGrouping(List<KeyFunction<V, ?>> vertexGroupingKeys, List<AggregateFunction> vertexAggregateFunctions, List<KeyFunction<E, ?>> edgeGroupingKeys, List<AggregateFunction> edgeAggregateFunctions) {
        this.vertexGroupingKeys = vertexGroupingKeys == null || vertexGroupingKeys.isEmpty() ? Collections.singletonList(GroupingKeys.nothing()) : vertexGroupingKeys;
        this.vertexAggregateFunctions = vertexAggregateFunctions == null ? Collections.emptyList() : vertexAggregateFunctions;
        this.edgeGroupingKeys = edgeGroupingKeys == null ? Collections.emptyList() : edgeGroupingKeys;
        this.edgeAggregateFunctions = edgeAggregateFunctions == null ? Collections.emptyList() : edgeAggregateFunctions;
    }

    @Override
    public LG execute(LG graph) {
        GroupReduceOperator verticesWithSuperVertex = graph.getVertices().map(new BuildTuplesFromVertices(this.vertexGroupingKeys, this.vertexAggregateFunctions)).groupBy(this.getInternalVertexGroupingKeys()).reduceGroup(new ReduceVertexTuples(2 + this.vertexGroupingKeys.size(), this.vertexAggregateFunctions));
        ProjectOperator idToSuperId = verticesWithSuperVertex.filter(new Not(new FilterSuperVertices())).project(new int[]{0, 1});
        JoinOperator.EquiJoin edgesWithUpdatedIds = graph.getEdges().map(new BuildTuplesFromEdges<E>(this.edgeGroupingKeys, this.edgeAggregateFunctions)).join((DataSet)idToSuperId).where(new int[]{0}).equalTo(new int[]{0}).with(new UpdateIdField(0)).join((DataSet)idToSuperId).where(new int[]{1}).equalTo(new int[]{0}).with(new UpdateIdField(1));
        GroupReduceOperator superEdgeTuples = edgesWithUpdatedIds.groupBy(this.getInternalEdgeGroupingKeys()).reduceGroup(new ReduceEdgeTuples(2 + this.edgeGroupingKeys.size(), this.edgeAggregateFunctions)).setCombinable(this.useGroupCombine);
        MapOperator superVertices = verticesWithSuperVertex.filter(new FilterSuperVertices()).map(new BuildSuperVertexFromTuple(this.vertexGroupingKeys, this.vertexAggregateFunctions, graph.getFactory().getVertexFactory()));
        MapOperator superEdges = superEdgeTuples.map(new BuildSuperEdgeFromTuple(this.edgeGroupingKeys, this.edgeAggregateFunctions, graph.getFactory().getEdgeFactory()));
        return graph.getFactory().fromDataSets(superVertices, superEdges);
    }

    private int[] getInternalEdgeGroupingKeys() {
        return IntStream.range(0, 2 + this.edgeGroupingKeys.size()).toArray();
    }

    private int[] getInternalVertexGroupingKeys() {
        return IntStream.range(2, 2 + this.vertexGroupingKeys.size()).toArray();
    }

    public KeyedGrouping<G, V, E, LG, GC> setUseGroupCombine(boolean useGroupCombine) {
        this.useGroupCombine = useGroupCombine;
        return this;
    }
}

