/*
 * Decompiled with CFR 0.152.
 */
package org.gradoop.flink.model.impl.operators.neighborhood;

import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.flink.model.api.epgm.BaseGraph;
import org.gradoop.flink.model.api.epgm.BaseGraphCollection;
import org.gradoop.flink.model.api.functions.EdgeAggregateFunction;
import org.gradoop.flink.model.impl.functions.epgm.Id;
import org.gradoop.flink.model.impl.functions.epgm.SourceId;
import org.gradoop.flink.model.impl.functions.epgm.TargetId;
import org.gradoop.flink.model.impl.functions.tuple.SwitchPair;
import org.gradoop.flink.model.impl.operators.neighborhood.EdgeNeighborhood;
import org.gradoop.flink.model.impl.operators.neighborhood.Neighborhood;
import org.gradoop.flink.model.impl.operators.neighborhood.functions.NeighborEdgeReduceFunction;
import org.gradoop.flink.model.impl.operators.neighborhood.functions.VertexIdsWithEdge;
import org.gradoop.flink.model.impl.operators.neighborhood.functions.VertexToFieldOne;
import org.gradoop.flink.model.impl.operators.neighborhood.keyselector.IdInTuple;

public class ReduceEdgeNeighborhood<G extends GraphHead, V extends Vertex, E extends Edge, LG extends BaseGraph<G, V, E, LG, GC>, GC extends BaseGraphCollection<G, V, E, LG, GC>>
extends EdgeNeighborhood<LG> {
    public ReduceEdgeNeighborhood(EdgeAggregateFunction function, Neighborhood.EdgeDirection direction) {
        super(function, direction);
    }

    @Override
    public LG execute(LG graph) {
        GroupReduceOperator vertices;
        switch (this.getDirection()) {
            case IN: {
                vertices = graph.getEdges().join(graph.getVertices()).where(new TargetId()).equalTo(new Id()).groupBy(new IdInTuple(1)).reduceGroup(new NeighborEdgeReduceFunction((EdgeAggregateFunction)this.getFunction()));
                break;
            }
            case OUT: {
                vertices = graph.getEdges().join(graph.getVertices()).where(new SourceId()).equalTo(new Id()).groupBy(new IdInTuple(1)).reduceGroup(new NeighborEdgeReduceFunction((EdgeAggregateFunction)this.getFunction()));
                break;
            }
            case BOTH: {
                vertices = graph.getEdges().flatMap(new VertexIdsWithEdge()).map(new SwitchPair()).join(graph.getVertices()).where(new int[]{1}).equalTo(new Id()).with(new VertexToFieldOne()).groupBy(new IdInTuple(1)).reduceGroup(new NeighborEdgeReduceFunction((EdgeAggregateFunction)this.getFunction()));
                break;
            }
            default: {
                vertices = null;
            }
        }
        return graph.getFactory().fromDataSets(graph.getGraphHead(), vertices, graph.getEdges());
    }
}

