/*
 * Decompiled with CFR 0.152.
 */
package apoc.cypher;

import apoc.Pools;
import apoc.result.MapResult;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Spliterators;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.TransactionTerminatedException;
import org.neo4j.logging.Log;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;

public class Timeboxed {
    @Context
    public GraphDatabaseService db;
    @Context
    public Log log;
    private static final Map<String, Object> POISON = Collections.singletonMap("__magic", "POISON");

    @Procedure
    @Description(value="apoc.cypher.runTimeboxed('cypherStatement',{params}, timeout) - abort kernelTransaction after timeout ms if not finished")
    public Stream<MapResult> runTimeboxed(@Name(value="cypher") String cypher, @Name(value="params") Map<String, Object> params, final @Name(value="timeout") long timeout) {
        final ArrayBlockingQueue queue = new ArrayBlockingQueue(100);
        AtomicReference txAtomic = new AtomicReference();
        Pools.DEFAULT.submit(() -> {
            try (Transaction tx = this.db.beginTx();){
                txAtomic.set(tx);
                Result result = this.db.execute(cypher, params == null ? Collections.EMPTY_MAP : params);
                while (result.hasNext()) {
                    Map map = result.next();
                    this.offerToQueue(queue, map, timeout);
                }
                this.offerToQueue(queue, POISON, timeout);
                tx.success();
            }
            catch (TransactionTerminatedException e) {
                this.log.warn("query " + cypher + " has been terminated");
            }
            finally {
                txAtomic.set(null);
            }
        });
        Pools.SCHEDULED.schedule(() -> {
            Transaction tx = (Transaction)txAtomic.get();
            if (tx == null) {
                this.log.info("tx is null, either the other transaction finished gracefully or has not yet been start.");
            } else {
                tx.terminate();
                this.offerToQueue(queue, POISON, timeout);
                this.log.warn("terminating transaction, putting POISON onto queue");
            }
        }, timeout, TimeUnit.MILLISECONDS);
        Iterator<Map<String, Object>> queueConsumer = new Iterator<Map<String, Object>>(){
            Map<String, Object> nextElement = null;
            boolean hasFinished = false;

            @Override
            public boolean hasNext() {
                if (this.hasFinished) {
                    return false;
                }
                try {
                    this.nextElement = (Map)queue.poll(timeout, TimeUnit.MILLISECONDS);
                    if (this.nextElement == null) {
                        Timeboxed.this.log.warn("couldn't grab queue element, aborting - this should never happen");
                        this.hasFinished = true;
                    } else {
                        this.hasFinished = POISON.equals(this.nextElement);
                    }
                    return !this.hasFinished;
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

            @Override
            public Map<String, Object> next() {
                return this.nextElement;
            }
        };
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(queueConsumer, 16), false).map(MapResult::new);
    }

    private void offerToQueue(BlockingQueue<Map<String, Object>> queue, Map<String, Object> map, long timeout) {
        try {
            boolean hasBeenAdded = queue.offer(map, timeout, TimeUnit.MILLISECONDS);
            if (!hasBeenAdded) {
                throw new IllegalStateException("couldn't add a value to a queue of size " + queue.size() + ". Either increase capacity or fix consumption of the queue");
            }
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

