/*
 * Decompiled with CFR 0.152.
 */
package ai.grakn.kb.internal.computer;

import ai.grakn.kb.internal.computer.GraknSparkExecutor;
import ai.grakn.kb.internal.computer.GraknSparkMemory;
import ai.grakn.kb.internal.computer.GraknSparkVertexProgramInterceptor;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.configuration.ConfigurationUtils;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.ComputerSubmissionHelper;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.GraphFilterAware;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization.SparkInterceptorStrategy;
import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization.SparkSingleIterationStrategy;
import org.apache.tinkerpop.gremlin.spark.structure.Spark;
import org.apache.tinkerpop.gremlin.spark.structure.io.InputFormatRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.InputOutputHelper;
import org.apache.tinkerpop.gremlin.spark.structure.io.InputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.OutputFormatRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.OutputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorage;
import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.io.Storage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class GraknSparkComputer
extends AbstractHadoopGraphComputer {
    // Class-level SLF4J logger; used by updateLocalConfiguration to report thread-local Spark properties.
    private static final Logger LOGGER = LoggerFactory.getLogger(GraknSparkComputer.class);
    // Mutable working copy of the graph's configuration (populated in the constructor);
    // workers(), configure() and submitWithExecutor() all write into it.
    private final org.apache.commons.configuration.Configuration sparkConfiguration;
    // Set to true once workers(int) has been called; the job then partitions by the explicit worker count.
    private boolean workersSet = false;
    // Names the job thread "<simple class name>-boss".
    private final ThreadFactory threadFactoryBoss = new BasicThreadFactory.Builder().namingPattern(GraknSparkComputer.class.getSimpleName() + "-boss").build();
    // Single-thread executor that runs the Spark job; submitWithExecutor() shuts it down right after submitting.
    private final ExecutorService computerService = Executors.newSingleThreadExecutor(this.threadFactoryBoss);
    // Spark job group id of the current job; null until submitWithExecutor() assigns a random one. Read by cancelJobs().
    private String jobGroupId = null;

    public GraknSparkComputer(HadoopGraph hadoopGraph) {
        super(hadoopGraph);
        this.sparkConfiguration = new HadoopConfiguration();
        ConfigurationUtils.copy((org.apache.commons.configuration.Configuration)this.hadoopGraph.configuration(), (org.apache.commons.configuration.Configuration)this.sparkConfiguration);
    }

    /**
     * Sets the number of workers and records that it was set explicitly.
     * When the configured Spark master is a {@code local} master, it is rewritten
     * to {@code local[<workers>]}.
     *
     * @param workers the requested number of workers
     * @return this computer, for chaining
     */
    @Override
    public GraphComputer workers(int workers) {
        super.workers(workers);
        final String masterKey = "spark.master";
        final boolean runsOnLocalMaster = this.sparkConfiguration.containsKey(masterKey)
                && this.sparkConfiguration.getString(masterKey).startsWith("local");
        if (runsOnLocalMaster) {
            this.sparkConfiguration.setProperty(masterKey, "local[" + this.workers + "]");
        }
        this.workersSet = true;
        return this;
    }

    /**
     * Sets a single property on this computer's Spark configuration.
     *
     * @param key   the property name
     * @param value the property value
     * @return this computer, for chaining
     */
    public GraphComputer configure(String key, Object value) {
        this.sparkConfiguration.setProperty(key, value);
        return this;
    }

    /**
     * Validates the computer's state and then hands the job to
     * {@link ComputerSubmissionHelper#runWithBackgroundThread} under the thread
     * name {@code "SparkSubmitter"}.
     *
     * @return a future for the result of the computation
     */
    @Override
    public Future<ComputerResult> submit() {
        this.validateStatePriorToExecution();
        final String submitterThreadName = "SparkSubmitter";
        // The executor handed in by the helper is not used; the job runs on computerService.
        return ComputerSubmissionHelper.runWithBackgroundThread(unusedExecutor -> this.submitWithExecutor(), submitterThreadName);
    }

    /**
     * Cancels the Spark job group started by this computer.
     * Does nothing when no job group id has been assigned yet.
     */
    public void cancelJobs() {
        if (this.jobGroupId == null) {
            return;
        }
        Spark.getContext().cancelJobGroup(this.jobGroupId);
    }

    /**
     * Submits the configured vertex program and/or map-reduce jobs to Spark on
     * {@code computerService} and returns a future for the result.
     *
     * <p>The submitted task:
     * <ol>
     *   <li>builds the graph-computer and Hadoop configurations from {@code sparkConfiguration};</li>
     *   <li>instantiates the input/output RDD implementations and decides whether the graph
     *       filter must be applied after loading;</li>
     *   <li>creates the Spark context, tags it with this job's group id and removes any
     *       pre-existing output at the output location;</li>
     *   <li>loads, optionally filters, partitions and persists the graph RDD;</li>
     *   <li>runs the vertex program (directly or through a configured interceptor) and writes
     *       the computed graph unless persistence is {@code NOTHING};</li>
     *   <li>runs every map-reduce job and adds its result to the final memory;</li>
     *   <li>unpersists intermediate RDDs, removes the output when persistence is {@code NOTHING},
     *       and builds the {@link DefaultComputerResult}.</li>
     * </ol>
     *
     * <p>The executor is shut down right after the task is submitted, so no further task can be
     * submitted to it afterwards.
     *
     * @return a future completing with the computation result
     * @throws IllegalStateException (from the task) if the input location cannot be resolved or a
     *         reader, writer or interceptor class cannot be instantiated
     * @throws RuntimeException (from the task) wrapping any exception raised while executing the job
     */
    private Future<ComputerResult> submitWithExecutor() {
        this.jobGroupId = Integer.toString(ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE));
        String jobDescription = this.vertexProgram == null ? this.mapReducers.toString() : this.vertexProgram + "+" + this.mapReducers;
        // Suffix the configured output location with the job group id.
        // NOTE(review): when no output location is configured, getString returns null and this
        // yields the literal "null/<id>" — confirm callers always configure an output location.
        this.sparkConfiguration.setProperty("gremlin.hadoop.outputLocation", this.sparkConfiguration.getString("gremlin.hadoop.outputLocation") + "/" + this.jobGroupId);
        GraknSparkComputer.updateConfigKeys(this.sparkConfiguration);
        Future<ComputerResult> result = this.computerService.submit(() -> {
            long startTime = System.currentTimeMillis();

            // ---- configuration ----
            HadoopConfiguration graphComputerConfiguration = new HadoopConfiguration(this.sparkConfiguration);
            if (!graphComputerConfiguration.containsKey("spark.serializer")) {
                graphComputerConfiguration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
            }
            graphComputerConfiguration.setProperty("gremlin.hadoop.graphWriter.hasEdges", this.persist.equals(GraphComputer.Persist.EDGES));
            Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(graphComputerConfiguration);
            FileSystemStorage fileSystemStorage = FileSystemStorage.open(hadoopConfiguration);
            boolean inputFromHDFS = FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass("gremlin.hadoop.graphReader", Object.class));
            boolean inputFromSpark = PersistedInputRDD.class.isAssignableFrom(hadoopConfiguration.getClass("gremlin.hadoop.graphReader", Object.class));
            boolean outputToHDFS = FileOutputFormat.class.isAssignableFrom(hadoopConfiguration.getClass("gremlin.hadoop.graphWriter", Object.class));
            boolean outputToSpark = PersistedOutputRDD.class.isAssignableFrom(hadoopConfiguration.getClass("gremlin.hadoop.graphWriter", Object.class));
            boolean skipPartitioner = graphComputerConfiguration.getBoolean("gremlin.spark.skipPartitioner", false);
            boolean skipPersist = graphComputerConfiguration.getBoolean("gremlin.spark.skipGraphCache", false);
            if (inputFromHDFS) {
                String inputLocation = (String)Constants.getSearchGraphLocation(hadoopConfiguration.get("gremlin.hadoop.inputLocation"), fileSystemStorage).orElse(null);
                if (null != inputLocation) {
                    try {
                        // Resolve the input directory once and publish it to both configurations.
                        String resolvedInputDir = FileSystem.get(hadoopConfiguration).getFileStatus(new Path(inputLocation)).getPath().toString();
                        graphComputerConfiguration.setProperty("mapreduce.input.fileinputformat.inputdir", resolvedInputDir);
                        hadoopConfiguration.set("mapreduce.input.fileinputformat.inputdir", resolvedInputDir);
                    }
                    catch (IOException e) {
                        throw new IllegalStateException(e.getMessage(), e);
                    }
                }
            }

            // ---- input/output RDD implementations and graph filter handling ----
            InputRDD inputRDD;
            OutputRDD outputRDD;
            boolean filtered;
            try {
                inputRDD = InputRDD.class.isAssignableFrom(hadoopConfiguration.getClass("gremlin.hadoop.graphReader", Object.class)) ? (InputRDD)hadoopConfiguration.getClass("gremlin.hadoop.graphReader", InputRDD.class, InputRDD.class).newInstance() : (InputRDD)InputFormatRDD.class.newInstance();
                outputRDD = OutputRDD.class.isAssignableFrom(hadoopConfiguration.getClass("gremlin.hadoop.graphWriter", Object.class)) ? (OutputRDD)hadoopConfiguration.getClass("gremlin.hadoop.graphWriter", OutputRDD.class, OutputRDD.class).newInstance() : (OutputRDD)OutputFormatRDD.class.newInstance();
                if (inputRDD instanceof InputFormatRDD && GraphFilterAware.class.isAssignableFrom(hadoopConfiguration.getClass("gremlin.hadoop.graphReader", InputFormat.class, InputFormat.class))) {
                    // The input format is handed the filter, so no post-load filtering is needed.
                    GraphFilterAware.storeGraphFilter(graphComputerConfiguration, hadoopConfiguration, this.graphFilter);
                    filtered = false;
                } else if (inputRDD instanceof GraphFilterAware) {
                    ((GraphFilterAware)inputRDD).setGraphFilter(this.graphFilter);
                    filtered = false;
                } else {
                    // The reader is not filter-aware: filter after loading if a filter is defined.
                    filtered = this.graphFilter.hasFilter();
                }
            }
            catch (IllegalAccessException | InstantiationException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }

            // ---- Spark context and output clean-up ----
            JavaSparkContext sparkContext = new JavaSparkContext(Spark.create(hadoopConfiguration));
            SparkContextStorage sparkContextStorage = SparkContextStorage.open();
            sparkContext.setJobGroup(this.jobGroupId, jobDescription);
            GraknSparkMemory memory = null;
            String outputLocation = hadoopConfiguration.get("gremlin.hadoop.outputLocation", null);
            if (null != outputLocation) {
                if (outputToHDFS && fileSystemStorage.exists(outputLocation)) {
                    fileSystemStorage.rm(outputLocation);
                }
                if (outputToSpark && sparkContextStorage.exists(outputLocation)) {
                    sparkContextStorage.rm(outputLocation);
                }
            }
            this.logger.debug("HadoopGremlin(Spark): " + (null == this.vertexProgram ? "No VertexProgram" : this.vertexProgram) + "[" + this.mapReducers + "]");
            this.loadJars(hadoopConfiguration, new Object[]{sparkContext});
            GraknSparkComputer.updateLocalConfiguration(sparkContext, hadoopConfiguration);

            // ---- load, filter, partition and persist the graph ----
            boolean partitioned = false;
            JavaPairRDD<Object, VertexWritable> loadedGraphRDD = inputRDD.readGraphRDD(graphComputerConfiguration, sparkContext);
            if (filtered) {
                this.logger.debug("Filtering the loaded graphRDD: " + this.graphFilter);
                loadedGraphRDD = GraknSparkExecutor.applyGraphFilter(loadedGraphRDD, this.graphFilter);
            }
            if (loadedGraphRDD.partitioner().isPresent()) {
                this.logger.debug("Using the existing partitioner associated with the loaded graphRDD: " + loadedGraphRDD.partitioner().get());
            } else if (!skipPartitioner) {
                HashPartitioner partitioner = new HashPartitioner(this.workersSet ? this.workers : loadedGraphRDD.partitions().size());
                this.logger.debug("Partitioning the loaded graphRDD: " + partitioner);
                loadedGraphRDD = loadedGraphRDD.partitionBy(partitioner);
                partitioned = true;
                assert (loadedGraphRDD.partitioner().isPresent());
            } else {
                assert (skipPartitioner == !loadedGraphRDD.partitioner().isPresent());
                this.logger.debug("Partitioning has been skipped for the loaded graphRDD via gremlin.spark.skipPartitioner");
            }
            // When an explicit worker count was requested, bring the partition count in line with it.
            if (this.workersSet) {
                if (loadedGraphRDD.partitions().size() > this.workers) {
                    loadedGraphRDD = loadedGraphRDD.coalesce(this.workers);
                } else if (loadedGraphRDD.partitions().size() < this.workers) {
                    loadedGraphRDD = loadedGraphRDD.repartition(this.workers);
                }
            }
            if (!skipPersist && (!inputFromSpark || partitioned || filtered)) {
                loadedGraphRDD = loadedGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get("gremlin.spark.graphStorageLevel", "MEMORY_ONLY")));
            }

            JavaPairRDD<Object, VertexWritable> computedGraphRDD = null;
            try {
                // ---- vertex program ----
                if (null != this.vertexProgram) {
                    memory = new GraknSparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
                    if (graphComputerConfiguration.containsKey("gremlin.hadoop.vertexProgramInterceptor")) {
                        try {
                            GraknSparkVertexProgramInterceptor interceptor = (GraknSparkVertexProgramInterceptor)Class.forName(graphComputerConfiguration.getString("gremlin.hadoop.vertexProgramInterceptor")).newInstance();
                            computedGraphRDD = (JavaPairRDD)interceptor.apply(this.vertexProgram, loadedGraphRDD, memory);
                        }
                        catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
                            // Keep the cause so the original stack trace is not lost.
                            throw new IllegalStateException(e.getMessage(), e);
                        }
                    } else {
                        HadoopConfiguration vertexProgramConfiguration = new HadoopConfiguration();
                        this.vertexProgram.storeState(vertexProgramConfiguration);
                        this.vertexProgram.setup(memory);
                        JavaPairRDD viewIncomingRDD = null;
                        memory.broadcastMemory(sparkContext);
                        // Iterate until the vertex program reports termination or the thread is interrupted.
                        while (true) {
                            if (Thread.interrupted()) {
                                sparkContext.cancelAllJobs();
                                throw new TraversalInterruptedException();
                            }
                            memory.setInExecute(true);
                            viewIncomingRDD = GraknSparkExecutor.executeVertexProgramIteration(loadedGraphRDD, viewIncomingRDD, memory, graphComputerConfiguration, vertexProgramConfiguration);
                            memory.setInExecute(false);
                            if (this.vertexProgram.terminate(memory)) {
                                break;
                            }
                            memory.incrIteration();
                            memory.broadcastMemory(sparkContext);
                        }
                        // The final graph is only materialised when it will be written or fed to map-reduce.
                        if (null != outputRDD && !this.persist.equals(GraphComputer.Persist.NOTHING) || !this.mapReducers.isEmpty()) {
                            computedGraphRDD = GraknSparkExecutor.prepareFinalGraphRDD(loadedGraphRDD, viewIncomingRDD, this.vertexProgram.getVertexComputeKeys());
                            assert (null != computedGraphRDD && computedGraphRDD != loadedGraphRDD);
                        } else {
                            assert (null == computedGraphRDD);
                        }
                    }
                    memory.complete();
                    if (null != outputRDD && !this.persist.equals(GraphComputer.Persist.NOTHING)) {
                        assert (null != computedGraphRDD);
                        outputRDD.writeGraphRDD(graphComputerConfiguration, computedGraphRDD);
                    }
                }
                boolean computedGraphCreated = computedGraphRDD != null && computedGraphRDD != loadedGraphRDD;
                if (!computedGraphCreated) {
                    computedGraphRDD = loadedGraphRDD;
                }
                MapMemory finalMemory = null == memory ? new MapMemory() : new MapMemory(memory);

                // ---- map-reduce jobs ----
                if (!this.mapReducers.isEmpty()) {
                    JavaPairRDD<Object, VertexWritable> mapReduceRDD = computedGraphRDD;
                    if (computedGraphCreated && !outputToSpark) {
                        // Edges are dropped from the graph handed to the map-reduce jobs.
                        mapReduceRDD = computedGraphRDD.mapValues(vertexWritable -> {
                            vertexWritable.get().dropEdges(Direction.BOTH);
                            return vertexWritable;
                        });
                        if (this.mapReducers.size() > 1) {
                            mapReduceRDD = mapReduceRDD.persist(StorageLevel.fromString(hadoopConfiguration.get("gremlin.spark.graphStorageLevel", "MEMORY_ONLY")));
                        }
                    }
                    for (MapReduce mapReduce : this.mapReducers) {
                        HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(graphComputerConfiguration);
                        mapReduce.storeState(newApacheConfiguration);
                        JavaPairRDD mapRDD = GraknSparkExecutor.executeMap(mapReduceRDD, mapReduce, newApacheConfiguration);
                        JavaPairRDD combineRDD = mapReduce.doStage(MapReduce.Stage.COMBINE) ? GraknSparkExecutor.executeCombine(mapRDD, newApacheConfiguration) : mapRDD;
                        JavaPairRDD reduceRDD = mapReduce.doStage(MapReduce.Stage.REDUCE) ? GraknSparkExecutor.executeReduce(combineRDD, mapReduce, newApacheConfiguration) : combineRDD;
                        if (null != outputRDD) {
                            mapReduce.addResultToMemory(finalMemory, outputRDD.writeMemoryRDD(graphComputerConfiguration, mapReduce.getMemoryKey(), reduceRDD));
                        }
                    }
                    if (computedGraphCreated && !outputToSpark) {
                        assert (loadedGraphRDD != computedGraphRDD);
                        assert (mapReduceRDD != computedGraphRDD);
                        mapReduceRDD.unpersist();
                    } else {
                        assert (mapReduceRDD == computedGraphRDD);
                    }
                }

                // ---- clean-up and result ----
                if (!inputFromSpark || partitioned || filtered) {
                    loadedGraphRDD.unpersist();
                }
                if ((!outputToSpark || this.persist.equals(GraphComputer.Persist.NOTHING)) && computedGraphCreated) {
                    computedGraphRDD.unpersist();
                }
                if (null != outputLocation && this.persist.equals(GraphComputer.Persist.NOTHING)) {
                    if (outputToHDFS) {
                        fileSystemStorage.rm(outputLocation);
                    }
                    if (outputToSpark) {
                        sparkContextStorage.rm(outputLocation);
                    }
                }
                finalMemory.setRuntime(System.currentTimeMillis() - startTime);
                // Remove job-specific settings before the configuration is used to open the output graph.
                graphComputerConfiguration.clearProperty("gremlin.hadoop.graphFilter");
                graphComputerConfiguration.clearProperty("gremlin.hadoop.vertexProgramInterceptor");
                graphComputerConfiguration.clearProperty("gremlin.spark.skipGraphCache");
                graphComputerConfiguration.clearProperty("gremlin.spark.skipPartitioner");
                return new DefaultComputerResult(InputOutputHelper.getOutputGraph(graphComputerConfiguration, this.resultGraph, this.persist), finalMemory.asImmutable());
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        // No further tasks are accepted by the executor after this point.
        this.computerService.shutdown();
        return result;
    }

    /**
     * Mirrors every property whose key starts with {@code "janusmr"} under the
     * corresponding {@code "janusgraphmr"} key. The original keys are left in place.
     *
     * @param sparkConfiguration the configuration to update in place
     */
    private static void updateConfigKeys(org.apache.commons.configuration.Configuration sparkConfiguration) {
        final String oldPrefix = "janusmr";
        final String newPrefix = "janusgraphmr";
        // Snapshot the keys first so that properties are not added while the
        // configuration's own key iterator is still being consumed.
        HashSet<String> existingKeys = new HashSet<>();
        sparkConfiguration.getKeys().forEachRemaining(existingKeys::add);
        for (String key : existingKeys) {
            if (key.startsWith(oldPrefix)) {
                String renamedKey = newPrefix + key.substring(oldPrefix.length());
                sparkConfiguration.setProperty(renamedKey, sparkConfiguration.getString(key));
            }
        }
    }

    /**
     * Registers a jar file with the Spark context.
     *
     * @param hadoopConfiguration the Hadoop configuration (not used here)
     * @param file                the jar to add
     * @param params              extra arguments; the first element must be the {@link JavaSparkContext}
     */
    protected void loadJar(Configuration hadoopConfiguration, File file, Object ... params) {
        final JavaSparkContext context = (JavaSparkContext) params[0];
        final String jarPath = file.getAbsolutePath();
        context.addJar(jarPath);
    }

    /**
     * Copies a fixed set of Spark properties from the Hadoop configuration onto
     * the Spark context as local properties. Properties that are absent from the
     * configuration are skipped.
     *
     * @param sparkContext  the context receiving the local properties
     * @param configuration the configuration the values are read from
     */
    private static void updateLocalConfiguration(JavaSparkContext sparkContext, Configuration configuration) {
        final String[] threadLocalPropertyNames = {
            "spark.job.description",
            "spark.jobGroup.id",
            "spark.job.interruptOnCancel",
            "spark.scheduler.pool"
        };
        for (final String name : threadLocalPropertyNames) {
            final String value = configuration.get(name);
            if (value != null) {
                LOGGER.info("Setting Thread Local SparkContext Property - " + name + " : " + value);
                sparkContext.setLocalProperty(name, value);
            }
        }
    }

    /**
     * Command-line entry point: loads a properties file, builds the vertex program
     * it describes, runs it on a {@code GraknSparkComputer} and waits for completion.
     *
     * @param args {@code args[0]} is the path of the properties file
     * @throws Exception if the configuration cannot be loaded or the job fails
     */
    public static void main(String[] args) throws Exception {
        final PropertiesConfiguration configuration = new PropertiesConfiguration(args[0]);
        final GraknSparkComputer computer = new GraknSparkComputer(HadoopGraph.open(configuration));
        // A second graph instance is opened for building the vertex program.
        final HadoopGraph programGraph = HadoopGraph.open(configuration);
        computer.program(VertexProgram.createVertexProgram(programGraph, configuration)).submit().get();
    }

    static {
        TraversalStrategies.GlobalCache.registerStrategies(GraknSparkComputer.class, (TraversalStrategies)TraversalStrategies.GlobalCache.getStrategies(GraphComputer.class).clone().addStrategies(new TraversalStrategy[]{SparkSingleIterationStrategy.instance(), SparkInterceptorStrategy.instance()}));
    }
}

