/*
 * Decompiled with CFR 0.152.
 */
package com.tinkerpop.gremlin.tinkergraph.process.computer;

import com.tinkerpop.gremlin.process.computer.ComputerResult;
import com.tinkerpop.gremlin.process.computer.GraphComputer;
import com.tinkerpop.gremlin.process.computer.MapReduce;
import com.tinkerpop.gremlin.process.computer.Memory;
import com.tinkerpop.gremlin.process.computer.VertexProgram;
import com.tinkerpop.gremlin.process.computer.util.ComputerDataStrategy;
import com.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
import com.tinkerpop.gremlin.structure.Vertex;
import com.tinkerpop.gremlin.structure.strategy.GraphStrategy;
import com.tinkerpop.gremlin.structure.util.StringFactory;
import com.tinkerpop.gremlin.tinkergraph.process.computer.TinkerMapEmitter;
import com.tinkerpop.gremlin.tinkergraph.process.computer.TinkerMemory;
import com.tinkerpop.gremlin.tinkergraph.process.computer.TinkerMessageBoard;
import com.tinkerpop.gremlin.tinkergraph.process.computer.TinkerMessenger;
import com.tinkerpop.gremlin.tinkergraph.process.computer.TinkerReduceEmitter;
import com.tinkerpop.gremlin.tinkergraph.process.computer.TinkerWorkerPool;
import com.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
import com.tinkerpop.gremlin.tinkergraph.structure.TinkerHelper;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class TinkerGraphComputer
implements GraphComputer {
    private GraphComputer.Isolation isolation = GraphComputer.Isolation.BSP;
    private VertexProgram<?> vertexProgram;
    private final TinkerGraph graph;
    private TinkerMemory memory;
    private final TinkerMessageBoard messageBoard = new TinkerMessageBoard();
    private boolean executed = false;
    private final Set<MapReduce> mapReducers = new HashSet<MapReduce>();

    public TinkerGraphComputer(TinkerGraph graph) {
        this.graph = graph;
    }

    public GraphComputer isolation(GraphComputer.Isolation isolation) {
        this.isolation = isolation;
        return this;
    }

    public GraphComputer program(VertexProgram vertexProgram) {
        this.vertexProgram = vertexProgram;
        return this;
    }

    public GraphComputer mapReduce(MapReduce mapReduce) {
        this.mapReducers.add(mapReduce);
        return this;
    }

    public Future<ComputerResult> submit() {
        if (this.executed) {
            throw GraphComputer.Exceptions.computerHasAlreadyBeenSubmittedAVertexProgram();
        }
        this.executed = true;
        if (null == this.vertexProgram && this.mapReducers.isEmpty()) {
            throw GraphComputer.Exceptions.computerHasNoVertexProgramNorMapReducers();
        }
        if (null != this.vertexProgram) {
            GraphComputerHelper.validateProgramOnComputer((GraphComputer)this, this.vertexProgram);
            this.mapReducers.addAll(this.vertexProgram.getMapReducers());
        }
        TinkerGraph sg = null == this.vertexProgram ? this.graph : this.graph.strategy(new GraphStrategy[]{new ComputerDataStrategy(this.vertexProgram.getElementComputeKeys())});
        this.memory = new TinkerMemory(this.vertexProgram, this.mapReducers);
        return CompletableFuture.supplyAsync(() -> {
            long time = System.currentTimeMillis();
            if (null != this.vertexProgram) {
                TinkerHelper.createGraphView(this.graph, this.isolation, this.vertexProgram.getElementComputeKeys());
                this.vertexProgram.setup((Memory)this.memory);
                this.memory.completeSubRound();
                TinkerWorkerPool workers = new TinkerWorkerPool(Runtime.getRuntime().availableProcessors(), this.vertexProgram);
                while (true) {
                    workers.executeVertexProgram(vertexProgram -> vertexProgram.workerIterationStart(this.memory.asImmutable()));
                    SynchronizedIterator vertices = new SynchronizedIterator(sg.iterators().vertexIterator(new Object[0]));
                    workers.executeVertexProgram(vertexProgram -> {
                        Vertex vertex;
                        while (null != (vertex = (Vertex)vertices.next())) {
                            vertexProgram.execute(vertex, new TinkerMessenger(vertex, this.messageBoard, vertexProgram.getMessageCombiner()), (Memory)this.memory);
                        }
                        return;
                    });
                    workers.executeVertexProgram(vertexProgram -> vertexProgram.workerIterationEnd(this.memory.asImmutable()));
                    this.messageBoard.completeIteration();
                    this.memory.completeSubRound();
                    if (this.vertexProgram.terminate((Memory)this.memory)) {
                        this.memory.incrIteration();
                        this.memory.completeSubRound();
                        break;
                    }
                    this.memory.incrIteration();
                    this.memory.completeSubRound();
                }
            }
            for (MapReduce mapReduce : this.mapReducers) {
                TinkerWorkerPool workers = new TinkerWorkerPool(Runtime.getRuntime().availableProcessors(), mapReduce);
                if (!mapReduce.doStage(MapReduce.Stage.MAP)) continue;
                TinkerMapEmitter mapEmitter = new TinkerMapEmitter(mapReduce.doStage(MapReduce.Stage.REDUCE));
                SynchronizedIterator vertices = new SynchronizedIterator(sg.iterators().vertexIterator(new Object[0]));
                workers.executeMapReduce(workerMapReduce -> {
                    Vertex vertex;
                    while (null != (vertex = (Vertex)vertices.next())) {
                        workerMapReduce.map(vertex, (MapReduce.MapEmitter)mapEmitter);
                    }
                    return;
                });
                mapEmitter.complete(mapReduce);
                if (mapReduce.doStage(MapReduce.Stage.REDUCE)) {
                    TinkerReduceEmitter reduceEmitter = new TinkerReduceEmitter();
                    SynchronizedIterator keyValues = new SynchronizedIterator(mapEmitter.reduceMap.entrySet().iterator());
                    workers.executeMapReduce(workerMapReduce -> {
                        Map.Entry entry;
                        while (null != (entry = (Map.Entry)keyValues.next())) {
                            workerMapReduce.reduce(entry.getKey(), ((Queue)entry.getValue()).iterator(), (MapReduce.ReduceEmitter)reduceEmitter);
                        }
                        return;
                    });
                    reduceEmitter.complete(mapReduce);
                    mapReduce.addResultToMemory((Memory.Admin)this.memory, reduceEmitter.reduceQueue.iterator());
                    continue;
                }
                mapReduce.addResultToMemory((Memory.Admin)this.memory, mapEmitter.mapQueue.iterator());
            }
            this.memory.setRuntime(System.currentTimeMillis() - time);
            this.memory.complete();
            return new ComputerResult(sg, this.memory.asImmutable());
        });
    }

    public String toString() {
        return StringFactory.graphComputerString((GraphComputer)this);
    }

    private static class SynchronizedIterator<V> {
        private final Iterator<V> iterator;

        public SynchronizedIterator(Iterator<V> iterator) {
            this.iterator = iterator;
        }

        public synchronized V next() {
            return this.iterator.hasNext() ? (V)this.iterator.next() : null;
        }
    }
}

