/*
 * Decompiled with CFR 0.152.
 */
package org.gradoop.flink.model.impl.operators.neighborhood;

import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.flink.model.api.epgm.BaseGraph;
import org.gradoop.flink.model.api.epgm.BaseGraphCollection;
import org.gradoop.flink.model.api.functions.VertexAggregateFunction;
import org.gradoop.flink.model.impl.functions.epgm.Id;
import org.gradoop.flink.model.impl.operators.neighborhood.Neighborhood;
import org.gradoop.flink.model.impl.operators.neighborhood.VertexNeighborhood;
import org.gradoop.flink.model.impl.operators.neighborhood.functions.NeighborVertexReduceFunction;
import org.gradoop.flink.model.impl.operators.neighborhood.functions.ShuffledVertexIdsFromEdge;
import org.gradoop.flink.model.impl.operators.neighborhood.functions.VertexIdsFromEdge;
import org.gradoop.flink.model.impl.operators.neighborhood.functions.VertexToFieldOne;
import org.gradoop.flink.model.impl.operators.neighborhood.functions.VertexToFieldZero;
import org.gradoop.flink.model.impl.operators.neighborhood.keyselector.IdInTuple;

public class ReduceVertexNeighborhood<G extends GraphHead, V extends Vertex, E extends Edge, LG extends BaseGraph<G, V, E, LG, GC>, GC extends BaseGraphCollection<G, V, E, LG, GC>>
extends VertexNeighborhood<LG> {
    public ReduceVertexNeighborhood(VertexAggregateFunction function, Neighborhood.EdgeDirection direction) {
        super(function, direction);
    }

    @Override
    public LG execute(LG graph) {
        GroupReduceOperator vertices;
        switch (this.getDirection()) {
            case IN: {
                vertices = graph.getEdges().map(new VertexIdsFromEdge()).join(graph.getVertices()).where(new int[]{1}).equalTo(new Id()).with(new VertexToFieldOne()).join(graph.getVertices()).where(new int[]{0}).equalTo(new Id()).with(new VertexToFieldZero()).groupBy(new IdInTuple(1)).reduceGroup(new NeighborVertexReduceFunction((VertexAggregateFunction)this.getFunction()));
                break;
            }
            case OUT: {
                vertices = graph.getEdges().map(new VertexIdsFromEdge(true)).join(graph.getVertices()).where(new int[]{1}).equalTo(new Id()).with(new VertexToFieldOne()).join(graph.getVertices()).where(new int[]{0}).equalTo(new Id()).with(new VertexToFieldZero()).groupBy(new IdInTuple(1)).reduceGroup(new NeighborVertexReduceFunction((VertexAggregateFunction)this.getFunction()));
                break;
            }
            case BOTH: {
                vertices = graph.getEdges().flatMap(new ShuffledVertexIdsFromEdge()).join(graph.getVertices()).where(new int[]{1}).equalTo(new Id()).with(new VertexToFieldOne()).join(graph.getVertices()).where(new int[]{0}).equalTo(new Id()).with(new VertexToFieldZero()).groupBy(new IdInTuple(1)).reduceGroup(new NeighborVertexReduceFunction((VertexAggregateFunction)this.getFunction()));
                break;
            }
            default: {
                vertices = null;
            }
        }
        return graph.getFactory().fromDataSets(graph.getGraphHead(), vertices, graph.getEdges());
    }
}

