/*
 * Decompiled with CFR 0.152.
 */
package org.gradoop.flink.algorithms.fsm.transactional;

import java.util.Map;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DistinctOperator;
import org.apache.flink.api.java.operators.FilterOperator;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.UnionOperator;
import org.gradoop.flink.algorithms.fsm.transactional.common.FSMConfig;
import org.gradoop.flink.algorithms.fsm.transactional.tle.ThinkLikeAnEmbeddingFSMBase;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.CCSSingleEdgeEmbeddings;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.CCSSubgraphDecoder;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.CCSSubgraphOnly;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.CCSWrapInSubgraphEmbeddings;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.CategoryEdgeLabels;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.CategoryFrequent;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.CategoryFrequentAndInteresting;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.CategoryGraphCounts;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.CategoryMinFrequencies;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.CategoryVertexLabels;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.CategoryWithCount;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.IsCharacteristic;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.IsResult;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.LabelOnly;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.MinEdgeCount;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.ToCCSGraph;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.WithoutInfrequentEdgeLabels;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.WithoutInfrequentVertexLabels;
import org.gradoop.flink.algorithms.fsm.transactional.tle.pojos.CCSGraph;
import org.gradoop.flink.algorithms.fsm.transactional.tle.tuples.CCSSubgraph;
import org.gradoop.flink.algorithms.fsm.transactional.tle.tuples.CCSSubgraphEmbeddings;
import org.gradoop.flink.model.impl.functions.utils.First;
import org.gradoop.flink.model.impl.layouts.transactional.tuples.GraphTransaction;

public class CategoryCharacteristicSubgraphs
extends ThinkLikeAnEmbeddingFSMBase<CCSGraph, CCSSubgraph, CCSSubgraphEmbeddings> {
    public static final String CATEGORY_KEY = "_category";
    private DataSet<Map<String, Long>> categoryCounts;
    private DataSet<Map<String, Long>> categoryMinFrequencies;
    private final float minInterestingness;

    public CategoryCharacteristicSubgraphs(FSMConfig fsmConfig, float minInterestingness) {
        super(fsmConfig);
        this.minInterestingness = minInterestingness;
    }

    @Override
    public DataSet<GraphTransaction> execute(DataSet<GraphTransaction> transactions) {
        DataSet<CCSGraph> graphs = transactions.map((MapFunction)new ToCCSGraph());
        this.setCategoryCounts(graphs);
        this.setMinFrequencies();
        if (this.fsmConfig.isPreprocessingEnabled()) {
            graphs = this.preProcessCategories(graphs);
        }
        FlatMapOperator embeddings = graphs.flatMap((FlatMapFunction)new CCSSingleEdgeEmbeddings(this.fsmConfig));
        IterativeDataSet iterative = embeddings.iterate(this.fsmConfig.getMaxEdgeCount());
        DataSet parentEmbeddings = iterative.filter(new IsResult(false));
        DataSet<CCSSubgraph> categoryFrequentSubgraphs = this.getCategoryFrequentSubgraphs((DataSet<CCSSubgraphEmbeddings>)parentEmbeddings);
        GroupReduceOperator frequentSubgraphs = categoryFrequentSubgraphs.groupBy(new int[]{0}).reduceGroup(new First());
        parentEmbeddings = this.filterByFrequentSubgraphs(parentEmbeddings, frequentSubgraphs);
        DataSet childEmbeddings = this.growEmbeddingsOfFrequentSubgraphs(parentEmbeddings, frequentSubgraphs);
        MapOperator resultIncrement = this.getCharacteristicSubgraphs(categoryFrequentSubgraphs).map((MapFunction)new CCSWrapInSubgraphEmbeddings());
        UnionOperator resultAndEmbeddings = iterative.filter(new IsResult(true)).union((DataSet)resultIncrement).union(childEmbeddings);
        MapOperator characteristicSubgraphs = iterative.closeWith((DataSet)resultAndEmbeddings, childEmbeddings).filter(new IsResult(true)).map((MapFunction)new CCSSubgraphOnly());
        if (this.fsmConfig.getMinEdgeCount() > 1) {
            characteristicSubgraphs = characteristicSubgraphs.filter(new MinEdgeCount(this.fsmConfig));
        }
        return characteristicSubgraphs.map((MapFunction)new CCSSubgraphDecoder(this.config));
    }

    private void setCategoryCounts(DataSet<CCSGraph> graphs) {
        this.categoryCounts = graphs.map((MapFunction)new CategoryWithCount()).groupBy(new int[]{0}).sum(1).reduceGroup((GroupReduceFunction)new CategoryGraphCounts());
    }

    private void setMinFrequencies() {
        this.categoryMinFrequencies = this.categoryCounts.map((MapFunction)new CategoryMinFrequencies(this.fsmConfig));
    }

    private DataSet<CCSGraph> preProcessCategories(DataSet<CCSGraph> graphs) {
        DistinctOperator frequentVertexLabels = ((FilterOperator)graphs.flatMap((FlatMapFunction)new CategoryVertexLabels()).groupBy(new int[]{0, 1}).sum(2).filter((FilterFunction)new CategoryFrequent()).withBroadcastSet(this.categoryMinFrequencies, "fmin")).map((MapFunction)new LabelOnly()).distinct();
        graphs = graphs.map(new WithoutInfrequentVertexLabels()).withBroadcastSet((DataSet)frequentVertexLabels, "fvl");
        DistinctOperator frequentEdgeLabels = ((FilterOperator)graphs.flatMap((FlatMapFunction)new CategoryEdgeLabels()).groupBy(new int[]{0, 1}).sum(2).filter((FilterFunction)new CategoryFrequent()).withBroadcastSet(this.categoryMinFrequencies, "fmin")).map((MapFunction)new LabelOnly()).distinct();
        graphs = graphs.map(new WithoutInfrequentEdgeLabels()).withBroadcastSet((DataSet)frequentEdgeLabels, "fel");
        return graphs;
    }

    private FilterOperator<CCSSubgraph> getCharacteristicSubgraphs(DataSet<CCSSubgraph> frequentSubgraphs) {
        return frequentSubgraphs.filter((FilterFunction)new IsCharacteristic());
    }

    private DataSet<CCSSubgraph> getCategoryFrequentSubgraphs(DataSet<CCSSubgraphEmbeddings> embeddings) {
        return ((GroupReduceOperator)embeddings.map((MapFunction)new CCSSubgraphOnly()).groupBy(new int[]{0, 3}).sum(1).groupBy(new int[]{0}).reduceGroup((GroupReduceFunction)new CategoryFrequentAndInteresting(this.minInterestingness)).withBroadcastSet(this.categoryCounts, "graphCount")).withBroadcastSet(this.categoryMinFrequencies, "fmin");
    }
}

