/*
 * Decompiled with CFR 0.152.
 */
package org.gradoop.flink.model.impl.operators.matching.single.simulation.dual;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.ProjectOperator;
import org.apache.log4j.Logger;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.flink.model.api.epgm.BaseGraph;
import org.gradoop.flink.model.api.epgm.BaseGraphCollection;
import org.gradoop.flink.model.api.epgm.BaseGraphCollectionFactory;
import org.gradoop.flink.model.api.epgm.BaseGraphFactory;
import org.gradoop.flink.model.impl.functions.epgm.Id;
import org.gradoop.flink.model.impl.functions.epgm.VertexFromId;
import org.gradoop.flink.model.impl.functions.utils.RightSide;
import org.gradoop.flink.model.impl.operators.matching.common.PostProcessor;
import org.gradoop.flink.model.impl.operators.matching.common.PreProcessor;
import org.gradoop.flink.model.impl.operators.matching.common.debug.Printer;
import org.gradoop.flink.model.impl.operators.matching.common.tuples.TripleWithCandidates;
import org.gradoop.flink.model.impl.operators.matching.single.PatternMatching;
import org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.debug.PrintDeletion;
import org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.debug.PrintFatVertex;
import org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.debug.PrintMessage;
import org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.functions.BuildFatVertex;
import org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.functions.CloneAndReverse;
import org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.functions.CombinedMessages;
import org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.functions.GroupedFatVertices;
import org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.functions.GroupedMessages;
import org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.functions.UpdateVertexState;
import org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.functions.UpdatedFatVertices;
import org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.functions.ValidFatVertices;
import org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.functions.ValidateNeighborhood;
import org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.tuples.Deletion;
import org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.tuples.FatVertex;
import org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.tuples.Message;

public class DualSimulation<G extends GraphHead, V extends Vertex, E extends Edge, LG extends BaseGraph<G, V, E, LG, GC>, GC extends BaseGraphCollection<G, V, E, LG, GC>>
extends PatternMatching<G, V, E, LG, GC> {
    private static Logger LOG = Logger.getLogger(DualSimulation.class);
    private final boolean useBulkIteration;

    public DualSimulation(String query, boolean attachData, boolean useBulk) {
        super(query, attachData, LOG);
        this.useBulkIteration = useBulk;
    }

    @Override
    protected GC executeForVertex(LG graph) {
        ProjectOperator matchingVertexIds = PreProcessor.filterVertices(graph, this.getQuery()).project(new int[]{0});
        BaseGraphFactory graphFactory = graph.getFactory();
        BaseGraphCollectionFactory collectionFactory = graph.getCollectionFactory();
        if (this.doAttachData()) {
            return collectionFactory.fromGraph(graphFactory.fromDataSets(matchingVertexIds.join(graph.getVertices()).where(new int[]{0}).equalTo(new Id()).with(new RightSide())));
        }
        return collectionFactory.fromGraph(graphFactory.fromDataSets(matchingVertexIds.map(new VertexFromId(graphFactory.getVertexFactory()))));
    }

    @Override
    protected GC executeForPattern(LG graph) {
        DataSet<TripleWithCandidates<GradoopId>> triples = this.filterTriples(graph);
        DataSet<FatVertex> fatVertices = this.buildInitialWorkingSet(triples);
        DataSet<FatVertex> result = this.useBulkIteration ? this.simulateBulk(fatVertices) : this.simulateDelta(fatVertices);
        return this.postProcess(graph, result);
    }

    private DataSet<TripleWithCandidates<GradoopId>> filterTriples(LG g) {
        return PreProcessor.filterTriplets(g, this.getQuery());
    }

    private DataSet<FatVertex> buildInitialWorkingSet(DataSet<TripleWithCandidates<GradoopId>> triples) {
        return triples.flatMap((FlatMapFunction)new CloneAndReverse()).groupBy(new int[]{1}).combineGroup((GroupCombineFunction)new BuildFatVertex(this.getQuery())).groupBy(new int[]{0}).reduceGroup((GroupReduceFunction)new GroupedFatVertices());
    }

    private DataSet<FatVertex> simulateBulk(DataSet<FatVertex> vertices) {
        vertices = Printer.log(vertices, new PrintFatVertex(false, "iteration start"), this.getVertexMapping(), this.getEdgeMapping());
        IterativeDataSet workSet = vertices.iterate(Integer.MAX_VALUE);
        DataSet<Deletion> deletions = workSet.filter((FilterFunction)new UpdatedFatVertices()).flatMap((FlatMapFunction)new ValidateNeighborhood(this.getQuery()));
        deletions = Printer.log(deletions, new PrintDeletion(true, "deletion"), this.getVertexMapping(), this.getEdgeMapping());
        DataSet<Message> combinedMessages = deletions.groupBy(new int[]{0}).combineGroup((GroupCombineFunction)new CombinedMessages());
        combinedMessages = Printer.log(combinedMessages, new PrintMessage(true, "combined"), this.getVertexMapping(), this.getEdgeMapping());
        DataSet<Message> messages = combinedMessages.groupBy(new int[]{0}).reduceGroup((GroupReduceFunction)new GroupedMessages());
        messages = Printer.log(messages, new PrintMessage(true, "grouped"), this.getVertexMapping(), this.getEdgeMapping());
        DataSet<FatVertex> nextWorkingSet = workSet.leftOuterJoin(messages).where(new int[]{0}).equalTo(new int[]{0}).with((JoinFunction)new UpdateVertexState(this.getQuery())).filter((FilterFunction)new ValidFatVertices());
        nextWorkingSet = Printer.log(nextWorkingSet, new PrintFatVertex(true, "next workset"), this.getVertexMapping(), this.getEdgeMapping());
        return workSet.closeWith(nextWorkingSet, deletions);
    }

    private DataSet<FatVertex> simulateDelta(DataSet<FatVertex> vertices) {
        DataSet<Message> initialWorkingSet = vertices.flatMap((FlatMapFunction)new ValidateNeighborhood(this.getQuery())).groupBy(new int[]{0}).combineGroup((GroupCombineFunction)new CombinedMessages()).groupBy(new int[]{0}).reduceGroup((GroupReduceFunction)new GroupedMessages());
        vertices = Printer.log(vertices, new PrintFatVertex(false, "initial solution set"), this.getVertexMapping(), this.getEdgeMapping());
        initialWorkingSet = Printer.log(initialWorkingSet, new PrintMessage(false, "initial working set"), this.getVertexMapping(), this.getEdgeMapping());
        DeltaIteration iteration = vertices.iterateDelta(initialWorkingSet, Integer.MAX_VALUE, new int[]{0});
        DataSet<FatVertex> deltas = iteration.getSolutionSet().join((DataSet)iteration.getWorkset()).where(new int[]{0}).equalTo(new int[]{0}).with((JoinFunction)new UpdateVertexState(this.getQuery()));
        deltas = Printer.log(deltas, new PrintFatVertex(true, "solution set delta"), this.getVertexMapping(), this.getEdgeMapping());
        DataSet<Message> updates = deltas.filter((FilterFunction)new ValidFatVertices()).flatMap((FlatMapFunction)new ValidateNeighborhood(this.getQuery())).groupBy(new int[]{0}).combineGroup((GroupCombineFunction)new CombinedMessages()).groupBy(new int[]{0}).reduceGroup((GroupReduceFunction)new GroupedMessages());
        updates = Printer.log(updates, new PrintMessage(true, "next working set"), this.getVertexMapping(), this.getEdgeMapping());
        return iteration.closeWith(deltas, updates).filter((FilterFunction)new ValidFatVertices());
    }

    private GC postProcess(LG graph, DataSet<FatVertex> vertices) {
        BaseGraphFactory graphFactory = graph.getFactory();
        DataSet matchVertices = this.doAttachData() ? PostProcessor.extractVerticesWithData(vertices, graph.getVertices()) : PostProcessor.extractVertices(vertices, graphFactory.getVertexFactory());
        DataSet matchEdges = this.doAttachData() ? PostProcessor.extractEdgesWithData(vertices, graph.getEdges()) : PostProcessor.extractEdges(vertices, graphFactory.getEdgeFactory());
        return graph.getCollectionFactory().fromGraph(graphFactory.fromDataSets(matchVertices, matchEdges));
    }
}

