/*
 * Decompiled with CFR 0.152.
 */
package org.gradoop.flink.model.impl.operators.aggregation;

import com.google.common.base.Preconditions;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.SingleInputUdfOperator;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.common.model.impl.properties.PropertyValue;
import org.gradoop.flink.model.api.epgm.BaseGraph;
import org.gradoop.flink.model.api.epgm.BaseGraphCollection;
import org.gradoop.flink.model.api.functions.AggregateFunction;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphToBaseGraphOperator;
import org.gradoop.flink.model.impl.operators.aggregation.functions.AggregateElements;
import org.gradoop.flink.model.impl.operators.aggregation.functions.CombinePartitionAggregates;
import org.gradoop.flink.model.impl.operators.aggregation.functions.SetAggregateProperty;

/**
 * Unary graph operator that evaluates a set of {@link AggregateFunction}s over the vertices
 * and edges of a graph and hands the combined result to {@link SetAggregateProperty}, which is
 * mapped over the graph head.
 *
 * <p>The vertex and edge data sets of the input graph are passed to the result graph
 * unchanged; only the graph head data set is replaced.
 *
 * @param <G>  graph head type
 * @param <V>  vertex type
 * @param <E>  edge type
 * @param <LG> graph type this operator consumes and produces
 * @param <GC> graph collection type associated with {@code LG}
 */
public class Aggregation<G extends GraphHead, V extends Vertex, E extends Edge, LG extends BaseGraph<G, V, E, LG, GC>, GC extends BaseGraphCollection<G, V, E, LG, GC>>
implements UnaryBaseGraphToBaseGraphOperator<LG> {
    /** Aggregate functions to evaluate; private copy of the constructor arguments. */
    private final Set<AggregateFunction> aggregateFunctions;

    /**
     * Creates the operator for the given aggregate functions. Duplicates (by
     * {@code equals}/{@code hashCode}) collapse because the functions are stored in a set.
     *
     * @param aggregateFunctions functions to evaluate; neither the array nor any element
     *                           may be {@code null}
     * @throws NullPointerException if the array or one of its elements is {@code null}
     */
    public Aggregation(AggregateFunction ... aggregateFunctions) {
        // Fail with a descriptive message instead of a bare NPE from the loop below.
        Preconditions.checkNotNull(aggregateFunctions, "aggregateFunctions must not be null");
        for (AggregateFunction func : aggregateFunctions) {
            Preconditions.checkNotNull(func, "aggregate function must not be null");
        }
        this.aggregateFunctions = new HashSet<>(Arrays.asList(aggregateFunctions));
    }

    /**
     * Aggregates the vertices and edges of {@code graph} and builds a new graph whose graph
     * head has been mapped through {@link SetAggregateProperty}.
     *
     * @param graph input graph
     * @return graph created by the input graph's factory from the mapped graph head and the
     *         original vertex and edge data sets
     */
    @Override
    public LG execute(LG graph) {
        // Typed locals keep the factory call below a checked invocation; with raw DataSet
        // arguments its return type would be erased and no longer match LG.
        DataSet<V> vertices = graph.getVertices();
        DataSet<E> edges = graph.getEdges();

        // Partial results of vertices and edges are unioned and reduced as a single group.
        DataSet<Map<String, PropertyValue>> aggregate = aggregateVertices(vertices)
            .union(aggregateEdges(edges))
            .reduceGroup(new CombinePartitionAggregates(aggregateFunctions));

        // The combined result is broadcast under the name "value".
        // NOTE(review): presumably SetAggregateProperty reads this broadcast set and writes
        // the values onto the graph head - confirm against that class.
        DataSet<G> graphHead = graph.getGraphHead()
            .map(new SetAggregateProperty<>(aggregateFunctions))
            .withBroadcastSet(aggregate, "value");

        return graph.getFactory().fromDataSets(graphHead, vertices, edges);
    }

    /**
     * Applies the aggregate functions flagged as vertex aggregations to the given vertices.
     *
     * @param vertices vertices to aggregate
     * @return data set of partial aggregate maps produced by {@link AggregateElements}
     */
    private DataSet<Map<String, PropertyValue>> aggregateVertices(DataSet<V> vertices) {
        Set<AggregateFunction> vertexFunctions = aggregateFunctions.stream()
            .filter(AggregateFunction::isVertexAggregation)
            .collect(Collectors.toSet());
        return vertices.combineGroup(new AggregateElements<>(vertexFunctions));
    }

    /**
     * Applies the aggregate functions flagged as edge aggregations to the given edges.
     *
     * @param edges edges to aggregate
     * @return data set of partial aggregate maps produced by {@link AggregateElements}
     */
    private DataSet<Map<String, PropertyValue>> aggregateEdges(DataSet<E> edges) {
        Set<AggregateFunction> edgeFunctions = aggregateFunctions.stream()
            .filter(AggregateFunction::isEdgeAggregation)
            .collect(Collectors.toSet());
        return edges.combineGroup(new AggregateElements<>(edgeFunctions));
    }
}

