/*
 * Decompiled with CFR 0.152.
 */
package org.gradoop.flink.model.impl.operators.cypher.capf.query;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.FilterOperator;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.scala.DataSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;
import org.gradoop.common.model.impl.metadata.MetaData;
import org.gradoop.common.model.impl.metadata.PropertyMetaData;
import org.gradoop.common.model.impl.pojo.EPGMEdge;
import org.gradoop.common.model.impl.pojo.EPGMGraphHead;
import org.gradoop.common.model.impl.pojo.EPGMVertex;
import org.gradoop.flink.io.impl.csv.metadata.CSVMetaDataSource;
import org.gradoop.flink.model.api.epgm.BaseGraphCollectionFactory;
import org.gradoop.flink.model.api.operators.Operator;
import org.gradoop.flink.model.impl.epgm.GraphCollection;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.operators.count.Count;
import org.gradoop.flink.model.impl.operators.cypher.capf.query.functions.EdgeLabelFilter;
import org.gradoop.flink.model.impl.operators.cypher.capf.query.functions.EdgeToTuple;
import org.gradoop.flink.model.impl.operators.cypher.capf.query.functions.IdOfF1;
import org.gradoop.flink.model.impl.operators.cypher.capf.query.functions.PropertyEncoder;
import org.gradoop.flink.model.impl.operators.cypher.capf.query.functions.ReplaceSourceId;
import org.gradoop.flink.model.impl.operators.cypher.capf.query.functions.ReplaceTargetId;
import org.gradoop.flink.model.impl.operators.cypher.capf.query.functions.TupleToRow;
import org.gradoop.flink.model.impl.operators.cypher.capf.query.functions.UniqueIdWithOffset;
import org.gradoop.flink.model.impl.operators.cypher.capf.query.functions.VertexLabelFilter;
import org.gradoop.flink.model.impl.operators.cypher.capf.query.functions.VertexToRow;
import org.gradoop.flink.model.impl.operators.cypher.capf.result.CAPFQueryResult;
import org.opencypher.flink.api.CAPFSession;
import org.opencypher.flink.api.CAPFSession$;
import org.opencypher.flink.api.io.CAPFElementTable;
import org.opencypher.flink.impl.table.FlinkCypherTable;
import org.opencypher.okapi.api.io.conversion.ElementMapping;
import org.opencypher.okapi.api.io.conversion.NodeMappingBuilder;
import org.opencypher.okapi.api.io.conversion.RelationshipMappingBuilder;
import org.opencypher.okapi.relational.api.graph.RelationalCypherGraph;
import org.opencypher.okapi.relational.api.io.ElementTable;
import scala.collection.JavaConversions;
import scala.collection.Seq;
import scala.collection.mutable.Buffer;
import scala.reflect.ClassTag$;

public class CAPFQuery
implements Operator {
    private String query;
    private MetaData metaData;
    private CAPFSession session;
    private org.apache.flink.api.java.DataSet<Long> vertexCount;
    private org.apache.flink.api.java.DataSet<Tuple2<Long, EPGMVertex>> verticesWithIds;
    private org.apache.flink.api.java.DataSet<Tuple2<Long, EPGMEdge>> edgesWithIds;

    public CAPFQuery(String query, ExecutionEnvironment env) {
        this.query = query;
        this.vertexCount = null;
        this.session = CAPFSession$.MODULE$.create(new org.apache.flink.api.scala.ExecutionEnvironment(env));
    }

    public CAPFQuery(String query, MetaData metaData, ExecutionEnvironment env) {
        this.query = query;
        this.metaData = metaData;
        this.vertexCount = null;
        this.session = CAPFSession$.MODULE$.create(new org.apache.flink.api.scala.ExecutionEnvironment(env));
    }

    public CAPFQueryResult execute(LogicalGraph graph) throws Exception {
        if (this.metaData == null) {
            graph = this.transformGraphProperties(graph);
            this.metaData = new CSVMetaDataSource().fromTuples(new CSVMetaDataSource().tuplesFromGraph(graph).collect());
        }
        List<CAPFElementTable> nodeTables = this.createNodeTables(graph);
        List<CAPFElementTable> relTables = this.createRelationshipTables(graph);
        if (nodeTables.size() > 0) {
            ArrayList<CAPFElementTable> tables = new ArrayList<CAPFElementTable>(nodeTables.subList(1, nodeTables.size()));
            tables.addAll(relTables);
            Buffer tableSeq = JavaConversions.asScalaBuffer(tables);
            RelationalCypherGraph g = this.session.readFrom((ElementTable)nodeTables.get(0), (Seq)tableSeq);
            return new CAPFQueryResult(g.cypher(this.query, g.cypher$default$2(), g.cypher$default$3(), g.cypher$default$4()), this.verticesWithIds, this.edgesWithIds, (BaseGraphCollectionFactory<EPGMGraphHead, EPGMVertex, EPGMEdge, LogicalGraph, GraphCollection>)graph.getCollectionFactory());
        }
        return null;
    }

    private LogicalGraph transformGraphProperties(LogicalGraph graph) {
        MapOperator transformedVertices = graph.getVertices().map(new PropertyEncoder());
        MapOperator transformedEdges = graph.getEdges().map(new PropertyEncoder());
        return (LogicalGraph)graph.getFactory().fromDataSets((org.apache.flink.api.java.DataSet)transformedVertices, (org.apache.flink.api.java.DataSet)transformedEdges);
    }

    private List<CAPFElementTable> createNodeTables(LogicalGraph graph) {
        ArrayList<CAPFElementTable> nodeTables = new ArrayList<CAPFElementTable>();
        this.verticesWithIds = graph.getVertices().map(new UniqueIdWithOffset());
        this.vertexCount = Count.count((org.apache.flink.api.java.DataSet)graph.getVertices());
        for (String label : this.metaData.getVertexLabels()) {
            List propertyTypes = this.metaData.getVertexPropertyMetaData(label);
            TypeInformation[] types = new TypeInformation[propertyTypes.size() + 1];
            ArrayList<String> propKeys = new ArrayList<String>(propertyTypes.size());
            types[0] = TypeInformation.of(Long.class);
            for (int i = 0; i < propertyTypes.size(); ++i) {
                PropertyMetaData pmd = (PropertyMetaData)propertyTypes.get(i);
                propKeys.add(pmd.getKey());
                types[i + 1] = TypeInformation.of((Class)MetaData.getClassFromTypeString((String)pmd.getTypeString()));
            }
            RowTypeInfo info = new RowTypeInfo(types);
            FilterOperator verticesByLabelWithIds = this.verticesWithIds.filter((FilterFunction)new VertexLabelFilter(label));
            DataSet scalaRowDataSet = new DataSet((org.apache.flink.api.java.DataSet)verticesByLabelWithIds.map((MapFunction)new VertexToRow(propKeys)).returns((TypeInformation)info), ClassTag$.MODULE$.apply(Row.class));
            StringBuilder schemaStringBuilder = new StringBuilder("node_id");
            NodeMappingBuilder nodeMappingBuilder = NodeMappingBuilder.withSourceIdKey((String)"node_id").withImpliedLabel(label);
            for (String propKey : propKeys) {
                schemaStringBuilder.append(", ").append("prop_").append(propKey);
                nodeMappingBuilder = nodeMappingBuilder.withPropertyKey(propKey, "prop_" + propKey);
            }
            String schemaString = schemaStringBuilder.toString();
            FlinkCypherTable.FlinkTable vertexTable = FlinkCypherTable.FlinkTable((Table)this.session.tableEnv().fromDataSet(scalaRowDataSet).as(schemaString), (CAPFSession)this.session);
            nodeTables.add(CAPFElementTable.create((ElementMapping)nodeMappingBuilder.build(), (FlinkCypherTable.FlinkTable)vertexTable, (CAPFSession)this.session));
        }
        return nodeTables;
    }

    private List<CAPFElementTable> createRelationshipTables(LogicalGraph graph) {
        ArrayList<CAPFElementTable> relTables = new ArrayList<CAPFElementTable>();
        this.edgesWithIds = graph.getEdges().map(new UniqueIdWithOffset()).withBroadcastSet(this.vertexCount, "offset");
        JoinOperator.EquiJoin edgeTuples = this.edgesWithIds.map((MapFunction)new EdgeToTuple()).join(this.verticesWithIds).where(new int[]{1}).equalTo(new IdOfF1()).with((JoinFunction)new ReplaceSourceId()).join(this.verticesWithIds).where(new int[]{2}).equalTo(new IdOfF1()).with((JoinFunction)new ReplaceTargetId());
        for (String label : this.metaData.getEdgeLabels()) {
            List propertyTypes = this.metaData.getEdgePropertyMetaData(label);
            TypeInformation[] types = new TypeInformation[propertyTypes.size() + 3];
            ArrayList<String> propKeys = new ArrayList<String>(propertyTypes.size());
            types[0] = TypeInformation.of(Long.class);
            types[1] = TypeInformation.of(Long.class);
            types[2] = TypeInformation.of(Long.class);
            for (int i = 0; i < propertyTypes.size(); ++i) {
                PropertyMetaData pmd = (PropertyMetaData)propertyTypes.get(i);
                propKeys.add(pmd.getKey());
                types[i + 3] = TypeInformation.of((Class)MetaData.getClassFromTypeString((String)pmd.getTypeString()));
            }
            RowTypeInfo info = new RowTypeInfo(types);
            FilterOperator edgesByLabel = edgeTuples.filter((FilterFunction)new EdgeLabelFilter(label));
            DataSet scalaRowDataSet = new DataSet((org.apache.flink.api.java.DataSet)edgesByLabel.map((MapFunction)new TupleToRow(propKeys)).returns((TypeInformation)info), ClassTag$.MODULE$.apply(Row.class));
            StringBuilder schemaStringBuilder = new StringBuilder();
            schemaStringBuilder.append("edge_id").append(", ").append("start_node").append(", ").append("end_node");
            RelationshipMappingBuilder relMappingBuilder = RelationshipMappingBuilder.withSourceIdKey((String)"edge_id").withSourceStartNodeKey("start_node").withSourceEndNodeKey("end_node").withRelType(label);
            for (String propKey : propKeys) {
                schemaStringBuilder.append(", ").append("prop_").append(propKey);
                relMappingBuilder = relMappingBuilder.withPropertyKey(propKey, "prop_" + propKey);
            }
            String schemaString = schemaStringBuilder.toString();
            FlinkCypherTable.FlinkTable edgeTable = FlinkCypherTable.FlinkTable((Table)this.session.tableEnv().fromDataSet(scalaRowDataSet).as(schemaString), (CAPFSession)this.session);
            relTables.add(CAPFElementTable.create((ElementMapping)relMappingBuilder.build(), (FlinkCypherTable.FlinkTable)edgeTable, (CAPFSession)this.session));
        }
        return relTables;
    }
}

