/*
 * Decompiled with CFR 0.152.
 */
package ai.grakn.kb.internal.computer;

import ai.grakn.kb.internal.computer.GraknSparkMemory;
import com.google.common.base.Optional;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.commons.configuration.Configuration;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
import org.apache.tinkerpop.gremlin.process.computer.Messenger;
import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
import org.apache.tinkerpop.gremlin.spark.process.computer.CombineIterator;
import org.apache.tinkerpop.gremlin.spark.process.computer.MapIterator;
import org.apache.tinkerpop.gremlin.spark.process.computer.ReduceIterator;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkMessenger;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
import org.apache.tinkerpop.gremlin.structure.util.Attachable;
import org.apache.tinkerpop.gremlin.structure.util.Host;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import scala.Tuple2;

public class GraknSparkExecutor {
    private GraknSparkExecutor() {
    }

    public static JavaPairRDD<Object, VertexWritable> applyGraphFilter(JavaPairRDD<Object, VertexWritable> graphRDD, GraphFilter graphFilter) {
        return graphRDD.mapPartitionsToPair((PairFlatMapFunction & Serializable)partitionIterator -> {
            GraphFilter gFilter = graphFilter.clone();
            return () -> IteratorUtils.filter((Iterator)partitionIterator, tuple -> ((VertexWritable)tuple._2()).get().applyGraphFilter(gFilter).isPresent());
        }, true);
    }

    public static <M> JavaPairRDD<Object, ViewIncomingPayload<M>> executeVertexProgramIteration(JavaPairRDD<Object, VertexWritable> graphRDD, JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD, GraknSparkMemory memory, Configuration graphComputerConfiguration, Configuration vertexProgramConfiguration) {
        boolean partitionedGraphRDD = graphRDD.partitioner().isPresent();
        if (partitionedGraphRDD && null != viewIncomingRDD) assert (((Partitioner)graphRDD.partitioner().get()).equals(viewIncomingRDD.partitioner().get()));
        JavaPairRDD viewOutgoingRDD = (null == viewIncomingRDD ? graphRDD.mapValues((Function & Serializable)vertexWritable -> new Tuple2(vertexWritable, (Object)Optional.absent())) : graphRDD.leftOuterJoin(viewIncomingRDD)).mapPartitionsToPair((PairFlatMapFunction & Serializable)partitionIterator -> {
            KryoShimServiceLoader.applyConfiguration((Configuration)graphComputerConfiguration);
            if (!partitionIterator.hasNext()) {
                return Collections.emptyList();
            }
            VertexProgram workerVertexProgram = VertexProgram.createVertexProgram((Graph)HadoopGraph.open((Configuration)graphComputerConfiguration), (Configuration)vertexProgramConfiguration);
            String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray((Set)workerVertexProgram.getVertexComputeKeys());
            SparkMessenger messenger = new SparkMessenger();
            workerVertexProgram.workerIterationStart(memory.asImmutable());
            return () -> IteratorUtils.map((Iterator)partitionIterator, vertexViewIncoming -> {
                ArrayList previousView;
                StarGraph.StarVertex vertex = ((VertexWritable)((Tuple2)vertexViewIncoming._2())._1()).get();
                boolean hasViewAndMessages = ((Optional)((Tuple2)vertexViewIncoming._2())._2()).isPresent();
                List list = hasViewAndMessages ? ((ViewIncomingPayload)((Optional)((Tuple2)vertexViewIncoming._2())._2()).get()).getView() : (previousView = memory.isInitialIteration() ? new ArrayList() : Collections.emptyList());
                if (memory.isInitialIteration() && vertexComputeKeysArray.length > 0) {
                    vertex.properties(vertexComputeKeysArray).forEachRemaining(vertexProperty -> previousView.add(DetachedFactory.detach((VertexProperty)vertexProperty, (boolean)true)));
                }
                vertex.dropVertexProperties(vertexComputeKeysArray);
                List incomingMessages = hasViewAndMessages ? ((ViewIncomingPayload)((Optional)((Tuple2)vertexViewIncoming._2())._2()).get()).getIncomingMessages() : Collections.emptyList();
                IteratorUtils.removeOnNext(previousView.iterator()).forEachRemaining(property -> {
                    VertexProperty cfr_ignored_0 = (VertexProperty)property.attach(Attachable.Method.create((Host)vertex));
                });
                assert (previousView.isEmpty());
                messenger.setVertexAndIncomingMessages((Vertex)vertex, (Iterable)incomingMessages);
                workerVertexProgram.execute((Vertex)ComputerGraph.vertexProgram((Vertex)vertex, (VertexProgram)workerVertexProgram), (Messenger)messenger, (Memory)memory);
                incomingMessages.clear();
                List nextView = vertexComputeKeysArray.length == 0 ? Collections.emptyList() : IteratorUtils.list((Iterator)IteratorUtils.map((Iterator)vertex.properties(vertexComputeKeysArray), vertexProperty -> DetachedFactory.detach((VertexProperty)vertexProperty, (boolean)true)));
                vertex.dropVertexProperties(vertexComputeKeysArray);
                List outgoingMessages = messenger.getOutgoingMessages();
                if (!partitionIterator.hasNext()) {
                    workerVertexProgram.workerIterationEnd(memory.asImmutable());
                }
                return nextView.isEmpty() && outgoingMessages.isEmpty() ? null : new Tuple2(vertex.id(), (Object)new ViewOutgoingPayload(nextView, outgoingMessages));
            });
        }, true).filter((Function & Serializable)tuple -> null != tuple);
        if (partitionedGraphRDD) assert (((Partitioner)graphRDD.partitioner().get()).equals(viewOutgoingRDD.partitioner().get()));
        PairFlatMapFunction & Serializable messageFunction = (PairFlatMapFunction & Serializable)tuple -> () -> IteratorUtils.concat((Iterator[])new Iterator[]{IteratorUtils.of((Object)new Tuple2(tuple._1(), (Object)((ViewOutgoingPayload)tuple._2()).getView())), IteratorUtils.map(((ViewOutgoingPayload)tuple._2()).getOutgoingMessages().iterator(), message -> new Tuple2(message._1(), (Object)new MessagePayload(message._2())))});
        MessageCombiner messageCombiner = VertexProgram.createVertexProgram((Graph)HadoopGraph.open((Configuration)vertexProgramConfiguration), (Configuration)vertexProgramConfiguration).getMessageCombiner().orElse(null);
        Function2 & Serializable reducerFunction = (Function2 & Serializable)(a, b) -> {
            if (a instanceof ViewIncomingPayload) {
                ((ViewIncomingPayload)a).mergePayload(b, messageCombiner);
                return a;
            }
            if (b instanceof ViewIncomingPayload) {
                ((ViewIncomingPayload)b).mergePayload(a, messageCombiner);
                return b;
            }
            ViewIncomingPayload c = new ViewIncomingPayload(messageCombiner);
            c.mergePayload(a, messageCombiner);
            c.mergePayload(b, messageCombiner);
            return c;
        };
        JavaPairRDD newViewIncomingRDD = (partitionedGraphRDD ? viewOutgoingRDD.flatMapToPair((PairFlatMapFunction)messageFunction).reduceByKey((Partitioner)graphRDD.partitioner().get(), (Function2)reducerFunction) : viewOutgoingRDD.flatMapToPair((PairFlatMapFunction)messageFunction).reduceByKey((Function2)reducerFunction)).mapValues((Function & Serializable)payload -> {
            if (payload instanceof ViewIncomingPayload) {
                return (ViewIncomingPayload)payload;
            }
            if (payload instanceof ViewPayload) {
                return new ViewIncomingPayload((ViewPayload)payload);
            }
            return new ViewIncomingPayload((MessagePayload)payload);
        });
        if (partitionedGraphRDD) assert (((Partitioner)graphRDD.partitioner().get()).equals(newViewIncomingRDD.partitioner().get()));
        newViewIncomingRDD.foreachPartition((VoidFunction & Serializable)partitionIterator -> KryoShimServiceLoader.applyConfiguration((Configuration)graphComputerConfiguration));
        return newViewIncomingRDD;
    }

    public static <M> JavaPairRDD<Object, VertexWritable> prepareFinalGraphRDD(JavaPairRDD<Object, VertexWritable> graphRDD, JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD, Set<VertexComputeKey> vertexComputeKeys) {
        if (graphRDD.partitioner().isPresent()) assert (((Partitioner)graphRDD.partitioner().get()).equals(viewIncomingRDD.partitioner().get()));
        String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(vertexComputeKeys);
        return graphRDD.leftOuterJoin(viewIncomingRDD).mapValues((Function & Serializable)tuple -> {
            StarGraph.StarVertex vertex = ((VertexWritable)tuple._1()).get();
            vertex.dropVertexProperties(vertexComputeKeysArray);
            List view = ((Optional)tuple._2()).isPresent() ? ((ViewIncomingPayload)((Optional)tuple._2()).get()).getView() : Collections.emptyList();
            for (DetachedVertexProperty property : view) {
                if (VertexProgramHelper.isTransientVertexComputeKey((String)property.key(), (Set)vertexComputeKeys)) continue;
                property.attach(Attachable.Method.create((Host)vertex));
            }
            return (VertexWritable)tuple._1();
        });
    }

    public static <K, V> JavaPairRDD<K, V> executeMap(JavaPairRDD<Object, VertexWritable> graphRDD, MapReduce<K, V, ?, ?, ?> mapReduce, Configuration graphComputerConfiguration) {
        JavaPairRDD mapRDD = graphRDD.mapPartitionsToPair((PairFlatMapFunction & Serializable)partitionIterator -> {
            KryoShimServiceLoader.applyConfiguration((Configuration)graphComputerConfiguration);
            return () -> new MapIterator(MapReduce.createMapReduce((Graph)HadoopGraph.open((Configuration)graphComputerConfiguration), (Configuration)graphComputerConfiguration), partitionIterator);
        });
        if (mapReduce.getMapKeySort().isPresent()) {
            mapRDD = mapRDD.sortByKey((Comparator)mapReduce.getMapKeySort().get(), true, 1);
        }
        return mapRDD;
    }

    public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeCombine(JavaPairRDD<K, V> mapRDD, Configuration graphComputerConfiguration) {
        return mapRDD.mapPartitionsToPair((PairFlatMapFunction & Serializable)partitionIterator -> {
            KryoShimServiceLoader.applyConfiguration((Configuration)graphComputerConfiguration);
            return () -> new CombineIterator(MapReduce.createMapReduce((Graph)HadoopGraph.open((Configuration)graphComputerConfiguration), (Configuration)graphComputerConfiguration), partitionIterator);
        });
    }

    public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(JavaPairRDD<K, V> mapOrCombineRDD, MapReduce<K, V, OK, OV, ?> mapReduce, Configuration graphComputerConfiguration) {
        JavaPairRDD reduceRDD = mapOrCombineRDD.groupByKey().mapPartitionsToPair((PairFlatMapFunction & Serializable)partitionIterator -> {
            KryoShimServiceLoader.applyConfiguration((Configuration)graphComputerConfiguration);
            return () -> new ReduceIterator(MapReduce.createMapReduce((Graph)HadoopGraph.open((Configuration)graphComputerConfiguration), (Configuration)graphComputerConfiguration), partitionIterator);
        });
        if (mapReduce.getReduceKeySort().isPresent()) {
            reduceRDD = reduceRDD.sortByKey((Comparator)mapReduce.getReduceKeySort().get(), true, 1);
        }
        return reduceRDD;
    }
}

