/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.graphalgo.impl;

import java.util.Iterator;
import java.util.Objects;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.neo4j.graphalgo.api.Graph;
import org.neo4j.graphalgo.core.utils.ParallelUtil;
import org.neo4j.graphalgo.core.utils.ProgressLogger;
import org.neo4j.graphalgo.core.utils.TerminationFlag;
import org.neo4j.graphalgo.impl.Algorithm;
import org.neo4j.graphdb.Direction;

public class TriangleStream
extends Algorithm<TriangleStream> {
    public static final Direction D = Direction.BOTH;
    private Graph graph;
    private ExecutorService executorService;
    private final int concurrency;
    private final int nodeCount;
    private AtomicInteger visitedNodes;
    private AtomicInteger runningThreads;
    private BlockingQueue<Result> resultQueue;

    public TriangleStream(Graph graph, ExecutorService executorService, int concurrency) {
        this.graph = graph;
        this.executorService = executorService;
        this.concurrency = concurrency;
        this.nodeCount = Math.toIntExact(graph.nodeCount());
        this.resultQueue = new LinkedBlockingQueue<Result>();
        this.runningThreads = new AtomicInteger();
        this.visitedNodes = new AtomicInteger();
    }

    @Override
    public TriangleStream me() {
        return this;
    }

    @Override
    public TriangleStream release() {
        this.visitedNodes = null;
        this.runningThreads = null;
        this.resultQueue = null;
        this.graph = null;
        this.executorService = null;
        return this;
    }

    public Stream<Result> resultStream() {
        this.submitTasks();
        final TerminationFlag flag = this.getTerminationFlag();
        Iterator<Result> it = new Iterator<Result>(){

            @Override
            public boolean hasNext() {
                return flag.running() && (TriangleStream.this.runningThreads.get() > 0 || !TriangleStream.this.resultQueue.isEmpty());
            }

            @Override
            public Result next() {
                Result result = null;
                try {
                    while (this.hasNext() && result == null) {
                        result = (Result)TriangleStream.this.resultQueue.poll(1L, TimeUnit.SECONDS);
                    }
                    return result;
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        };
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, 0), false).filter(Objects::nonNull);
    }

    private void submitTasks() {
        this.runningThreads.set(0);
        int batchSize = ParallelUtil.adjustBatchSize(this.nodeCount, this.concurrency, 1);
        for (int i = 0; i < this.nodeCount; i += batchSize) {
            int end = Math.min(i + batchSize, this.nodeCount);
            this.executorService.execute(new Task(i, end));
        }
    }

    public static class Result {
        public final long nodeA;
        public final long nodeB;
        public final long nodeC;

        public Result(long nodeA, long nodeB, long nodeC) {
            this.nodeA = nodeA;
            this.nodeB = nodeB;
            this.nodeC = nodeC;
        }

        public String toString() {
            return "Triangle{" + this.nodeA + ", " + this.nodeB + ", " + this.nodeC + '}';
        }
    }

    private class Task
    implements Runnable {
        private final int startIndex;
        private final int endIndex;

        private Task(int startIndex, int endIndex) {
            TriangleStream.this.runningThreads.incrementAndGet();
            this.startIndex = startIndex;
            this.endIndex = endIndex;
        }

        @Override
        public void run() {
            TerminationFlag flag = TriangleStream.this.getTerminationFlag();
            ProgressLogger progressLogger = TriangleStream.this.getProgressLogger();
            for (int i = this.startIndex; i < this.endIndex; ++i) {
                TriangleStream.this.graph.forEachRelationship(i, D, (u, v, relationId) -> {
                    if (u >= v) {
                        return true;
                    }
                    if (!flag.running()) {
                        return false;
                    }
                    TriangleStream.this.graph.forEachRelationship(v, D, (v2, w, relationId2) -> {
                        if (v2 >= w) {
                            return true;
                        }
                        if (!flag.running()) {
                            return false;
                        }
                        TriangleStream.this.graph.forEachRelationship(w, D, (sourceNodeId3, t, relationId3) -> {
                            if (t == u) {
                                try {
                                    TriangleStream.this.resultQueue.put(new Result(TriangleStream.this.graph.toOriginalNodeId(u), TriangleStream.this.graph.toOriginalNodeId(v), TriangleStream.this.graph.toOriginalNodeId(w)));
                                }
                                catch (InterruptedException e) {
                                    throw new RuntimeException(e);
                                }
                                return false;
                            }
                            return flag.running();
                        });
                        return true;
                    });
                    return true;
                });
                progressLogger.logProgress((double)TriangleStream.this.visitedNodes.incrementAndGet(), TriangleStream.this.nodeCount);
            }
            TriangleStream.this.runningThreads.decrementAndGet();
        }
    }
}

