/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.graphalgo.core.utils.traverse;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;
import java.util.function.IntPredicate;
import org.neo4j.graphalgo.api.Graph;
import org.neo4j.graphalgo.core.utils.ParallelUtil;
import org.neo4j.graphalgo.core.utils.container.AtomicBitSet;
import org.neo4j.graphalgo.core.utils.queue.IntMaxPriorityQueue;
import org.neo4j.graphalgo.core.utils.traverse.BFS;
import org.neo4j.graphdb.Direction;

/**
 * Breadth-first style traversal that can hand sub-traversals over to an
 * {@link ExecutorService}.
 *
 * <p>Every traversal (the root one started by {@link #bfs} as well as every
 * spawned one) works on its own local {@link IntMaxPriorityQueue}. For each
 * unvisited, accepted neighbour the traversal first tries to spawn a new task
 * that continues from that neighbour; if no task is spawned the neighbour is
 * added to the local queue instead, using its degree as the queue weight.
 *
 * <p>The {@code predicate} and {@code visitor} passed to {@link #bfs} are
 * invoked from the calling thread and from executor threads, so they must
 * tolerate concurrent invocation.
 */
public class ParallelLocalQueueBFS implements BFS {
    private final Graph graph;
    /** Number of spawned tasks that currently hold a concurrency slot. */
    private final AtomicInteger threads;
    private final AtomicBitSet visited;
    private final ExecutorService executorService;
    /** Upper bound for the number of simultaneously held task slots. */
    private final int concurrency;
    /** Futures of all tasks submitted since construction or the last {@link #reset()}. */
    private final ConcurrentLinkedQueue<Future<?>> futures;
    /** Total number of tasks submitted since construction or the last {@link #reset()}. */
    private final AtomicInteger threadsCreated = new AtomicInteger(0);
    /** A spawn attempt is only considered when {@code Math.random()} is below this value. */
    private double concurrencyFactor = 0.5;

    /**
     * @param graph           the graph to traverse; its node count must fit into an {@code int}
     * @param executorService executor that runs the spawned sub-traversals
     * @param concurrency     maximum number of spawned sub-traversals holding a slot at the same time
     * @throws ArithmeticException if {@code graph.nodeCount()} does not fit into an {@code int}
     */
    public ParallelLocalQueueBFS(Graph graph, ExecutorService executorService, int concurrency) {
        this.graph = graph;
        this.visited = new AtomicBitSet(Math.toIntExact(graph.nodeCount()));
        this.executorService = executorService;
        this.concurrency = concurrency;
        this.threads = new AtomicInteger(0);
        this.futures = new ConcurrentLinkedQueue<>();
    }

    /**
     * Clears the visited set and the recorded futures and zeroes both counters
     * so that the instance can be used for another traversal.
     *
     * @return this instance
     */
    public ParallelLocalQueueBFS reset() {
        this.visited.clear();
        this.futures.clear();
        this.threadsCreated.set(0);
        this.threads.set(0);
        return this;
    }

    /**
     * Hands all recorded futures to {@link ParallelUtil#awaitTerminations}.
     *
     * @return this instance
     */
    public ParallelLocalQueueBFS awaitTermination() {
        ParallelUtil.awaitTerminations(this.futures);
        return this;
    }

    /**
     * Starts a traversal at {@code startNodeId} on the calling thread.
     *
     * <p>The method returns once the calling thread's local queue is drained;
     * sub-traversals that were submitted to the executor may still be running
     * at that point (see {@link #awaitTermination()}).
     *
     * @param startNodeId node to start from; nothing happens if {@code predicate} rejects it
     * @param direction   relationship direction to follow
     * @param predicate   nodes failing this test are neither visited nor expanded
     * @param visitor     called for every node whose visited bit was newly set by the traversal
     * @return this instance
     */
    @Override
    public ParallelLocalQueueBFS bfs(int startNodeId, Direction direction, IntPredicate predicate, IntConsumer visitor) {
        // The root traversal runs on the caller's thread and never acquired a
        // slot in `threads`, so it must not release one either.
        this.traverse(startNodeId, direction, predicate, visitor);
        return this;
    }

    /**
     * Sets the threshold compared against {@code Math.random()} before a spawn
     * attempt: {@code 0.0} never spawns, values {@code >= 1.0} always attempt to.
     *
     * @param concurrencyFactor the new threshold
     * @return this instance
     */
    public ParallelLocalQueueBFS withConcurrencyFactor(double concurrencyFactor) {
        this.concurrencyFactor = concurrencyFactor;
        return this;
    }

    /**
     * @return number of tasks submitted since construction or the last {@link #reset()}
     */
    public int getThreadsCreated() {
        return this.threadsCreated.get();
    }

    /** Runs one traversal with its own local queue, starting at {@code startNodeId}. */
    private void traverse(int startNodeId, Direction direction, IntPredicate predicate, IntConsumer visitor) {
        if (!predicate.test(startNodeId)) {
            return;
        }
        IntMaxPriorityQueue queue = new IntMaxPriorityQueue();
        queue.add(startNodeId, 0.0);
        while (!queue.isEmpty()) {
            int node = queue.pop();
            // Skip nodes another traversal (or an earlier iteration) already claimed.
            if (!this.visited.trySet(node)) {
                continue;
            }
            visitor.accept(node);
            this.graph.forEachRelationship(node, direction, (sourceNodeId, targetNodeId, relationId) -> {
                if (!predicate.test(targetNodeId)) {
                    return true;
                }
                if (this.visited.get(targetNodeId)) {
                    return true;
                }
                // Prefer handing the neighbour to a new task; fall back to the local queue.
                if (!this.addThread(() -> this.traverse(targetNodeId, direction, predicate, visitor))) {
                    queue.add(targetNodeId, this.graph.degree(targetNodeId, direction));
                }
                return true;
            });
        }
    }

    /**
     * Tries to reserve a concurrency slot and submit {@code runnable} to the executor.
     *
     * @return {@code true} if the task was submitted, {@code false} if the caller
     *         has to process the work itself
     */
    private boolean addThread(Runnable runnable) {
        if (Math.random() >= this.concurrencyFactor) {
            return false;
        }
        int current = this.threads.get();
        if (current >= this.concurrency) {
            return false;
        }
        // A lost CAS race is not retried; the caller falls back to its local queue.
        if (!this.threads.compareAndSet(current, current + 1)) {
            return false;
        }
        // The slot is released by the task itself, whether it finishes normally,
        // returns early or throws.
        Runnable task = () -> {
            try {
                runnable.run();
            } finally {
                this.threads.decrementAndGet();
            }
        };
        Future<?> future;
        try {
            future = this.executorService.submit(task);
        } catch (RuntimeException e) {
            // The task will never run, so give the reserved slot back before propagating.
            this.threads.decrementAndGet();
            throw e;
        }
        this.futures.add(future);
        this.threadsCreated.incrementAndGet();
        return true;
    }
}

