/*
 * Decompiled with CFR 0.152.
 */
package com.bigdata.bop.engine;

import com.bigdata.bop.BOp;
import com.bigdata.bop.BOpUtility;
import com.bigdata.bop.IBindingSet;
import com.bigdata.bop.IQueryAttributes;
import com.bigdata.bop.PipelineOp;
import com.bigdata.bop.bindingSet.ListBindingSet;
import com.bigdata.bop.engine.AbstractRunningQuery;
import com.bigdata.bop.engine.ChunkedRunningQuery;
import com.bigdata.bop.engine.IChunkMessage;
import com.bigdata.bop.engine.IHaltOpMessage;
import com.bigdata.bop.engine.IQueryClient;
import com.bigdata.bop.engine.IQueryDecl;
import com.bigdata.bop.engine.IQueryPeer;
import com.bigdata.bop.engine.IRunningQuery;
import com.bigdata.bop.engine.IStartOpMessage;
import com.bigdata.bop.engine.LocalChunkMessage;
import com.bigdata.bop.engine.QueryDeadline;
import com.bigdata.bop.engine.QueryEngineCounters;
import com.bigdata.cache.ConcurrentWeakValueCache;
import com.bigdata.concurrent.FutureTaskMon;
import com.bigdata.counters.CounterSet;
import com.bigdata.counters.ICounterSetAccess;
import com.bigdata.journal.ConcurrencyManager;
import com.bigdata.journal.IIndexManager;
import com.bigdata.journal.Journal;
import com.bigdata.rawstore.IRawStore;
import com.bigdata.rdf.sail.webapp.client.HttpClientConfigurator;
import com.bigdata.service.IBigdataFederation;
import com.bigdata.service.geospatial.GeoSpatialCounters;
import com.bigdata.util.DaemonThreadFactory;
import com.bigdata.util.InnerCause;
import com.bigdata.util.concurrent.IHaltable;
import java.lang.reflect.Constructor;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
import org.eclipse.jetty.client.HttpClient;

public class QueryEngine
implements IQueryPeer,
IQueryClient,
ICounterSetAccess {
    /** Class logger. */
    private static final transient Logger log = Logger.getLogger(QueryEngine.class);
    /** Message prefix for "query not running" errors (not referenced elsewhere in this class). */
    protected static final transient String ERR_QUERY_NOT_RUNNING = "Query is not running:";
    /** Engine counters; obtained from the overridable {@link #newCounters()} factory during field initialization. */
    protected final QueryEngineCounters counters = this.newCounters();
    /** GeoSpatial counters; obtained from the overridable {@link #newGeoSpatialCounters()} factory. */
    protected final GeoSpatialCounters geoSpatialCounters = this.newGeoSpatialCounters();
    /** The index manager supplied to the constructor (never null). */
    private final IIndexManager localIndexManager;
    /** Lazily created HTTP client; the reference object itself is also used as the monitor guarding creation. */
    private final AtomicReference<HttpClient> clientConnectionManagerRef = new AtomicReference();
    /** Guards {@link #doneQueries} and the running-to-done transition performed in {@link #halt(AbstractRunningQuery)}. */
    private final ReentrantLock lock = new ReentrantLock();
    /** Signalled by {@link #halt(AbstractRunningQuery)} when {@link #runningQueries} becomes empty. */
    private final Condition nothingRunning = this.lock.newCondition();
    /** Queries currently registered with this engine, keyed by query id. */
    private final ConcurrentHashMap<UUID, AbstractRunningQuery> runningQueries = new ConcurrentHashMap();
    /**
     * Futures of recently halted queries. The map is access-ordered and evicts
     * its eldest entry once it holds more than 100 entries. Because even
     * {@code get()} reorders an access-ordered map, it is only touched while
     * holding {@link #lock}.
     */
    private final LinkedHashMap<UUID, IHaltable<Void>> doneQueries = new LinkedHashMap<UUID, IHaltable<Void>>(16, 0.75f, true){
        private static final long serialVersionUID = 1L;

        @Override
        protected boolean removeEldestEntry(Map.Entry<UUID, IHaltable<Void>> eldest) {
            return this.size() > 100;
        }
    };
    /** Ids of queries whose cancellation was requested before they were started (constructed with 50, presumably the cache capacity - TODO confirm). */
    private final ConcurrentWeakValueCache<UUID, UUID> pendingCancelLRU = new ConcurrentWeakValueCache(50);
    /** Queries with a chunk to consume. NOTE: despite the name this is a FIFO {@link LinkedBlockingQueue}. */
    private final BlockingQueue<AbstractRunningQuery> priorityQueue = new LinkedBlockingQueue<AbstractRunningQuery>();
    /** Queries that have a deadline, examined periodically by {@link QueryEngineTask}. */
    private final PriorityBlockingQueue<QueryDeadline> deadlineQueue = new PriorityBlockingQueue();
    /** Deadline queue size above which the queue is scanned beyond its head. */
    private static final int DEADLINE_QUEUE_SCAN_SIZE = 200;
    /** Interval, in milliseconds, between deadline checks. */
    private static final long DEADLINE_CHECK_MILLIS = 100L;
    /** Executor running the {@link QueryEngineTask}; set by {@link #init()} and cleared by {@link #shutdownNow()}. */
    private final AtomicReference<ExecutorService> engineService = new AtomicReference();
    /** Future of the {@link QueryEngineTask}; non-null only between {@link #init()} and shutdown. */
    private final AtomicReference<FutureTask<Void>> engineFuture = new AtomicReference();
    /** Set once shutdown has been requested. */
    private volatile boolean shutdown = false;
    /** Listeners notified from {@link #halt(AbstractRunningQuery)}. */
    private final CopyOnWriteArraySet<IRunningQueryListener> listeners = new CopyOnWriteArraySet();

    /**
     * Builds a new {@link CounterSet} holding the engine counters at the root
     * and the GeoSpatial counters under the {@code GeoSpatial} path.
     *
     * @return a freshly allocated counter set
     */
    @Override
    public CounterSet getCounters() {
        // Refresh the queue-size gauge so the snapshot reflects the current state.
        counters.deadlineQueueSize.set(deadlineQueue.size());
        final CounterSet result = new CounterSet();
        result.attach(counters.getCounters());
        result.makePath("GeoSpatial").attach(geoSpatialCounters.getCounters());
        return result;
    }

    /**
     * Factory for the engine counters; subclasses may override it to supply
     * a specialised counters object.
     *
     * @return a new {@link QueryEngineCounters}
     */
    protected QueryEngineCounters newCounters() {
        final QueryEngineCounters fresh = new QueryEngineCounters();
        return fresh;
    }

    /**
     * Factory for the GeoSpatial counters; subclasses may override it.
     *
     * @return a new {@link GeoSpatialCounters}
     */
    protected GeoSpatialCounters newGeoSpatialCounters() {
        final GeoSpatialCounters fresh = new GeoSpatialCounters();
        return fresh;
    }

    /**
     * @return the engine counters object created when this engine was constructed
     */
    protected QueryEngineCounters getQueryEngineCounters() {
        return counters;
    }

    /**
     * @return the GeoSpatial counters object created when this engine was constructed
     */
    public GeoSpatialCounters getGeoSpatialCounters() {
        return geoSpatialCounters;
    }

    /**
     * Returns the UUID of the local index manager, which is cast to
     * {@link IRawStore} for this purpose.
     *
     * @throws ClassCastException if the index manager is not an {@link IRawStore}
     */
    @Override
    public UUID getServiceUUID() {
        final Object store = localIndexManager;
        return ((IRawStore) store).getUUID();
    }

    /**
     * Returns the federation this engine belongs to.
     *
     * @return always {@code null} in this implementation
     */
    public IBigdataFederation<?> getFederation() {
        return null;
    }

    /**
     * @return the index manager that was supplied to the constructor
     */
    public IIndexManager getIndexManager() {
        return localIndexManager;
    }

    /**
     * Returns the concurrency manager of the local index manager, which is
     * cast to {@link Journal} for this purpose.
     *
     * @throws ClassCastException if the index manager is not a {@link Journal}
     */
    public ConcurrencyManager getConcurrencyManager() {
        final Journal journal = (Journal) localIndexManager;
        return journal.getConcurrencyManager();
    }

    /**
     * Returns the {@link IQueryClient} through which this engine is addressed.
     *
     * @return this instance (this implementation is its own proxy)
     */
    public IQueryClient getProxy() {
        return this;
    }

    /**
     * Returns the shared {@link HttpClient}, creating it on first use.
     * Creation is serialised on the monitor of the reference object and the
     * reference is re-read inside the monitor, so at most one client is created.
     *
     * @return the HTTP client (never null)
     * @throws IllegalStateException if a client has to be created but the
     *         engine is not running
     */
    public HttpClient getClientConnectionManager() {
        final HttpClient existing = clientConnectionManagerRef.get();
        if (existing != null) {
            return existing;
        }
        synchronized (clientConnectionManagerRef) {
            // Re-check: another thread may have created the client meanwhile.
            HttpClient created = clientConnectionManagerRef.get();
            if (created != null) {
                return created;
            }
            if (!isRunning()) {
                throw new IllegalStateException();
            }
            created = HttpClientConfigurator.getInstance().newInstance();
            clientConnectionManagerRef.set(created);
            return created;
        }
    }

    /**
     * Reports whether this engine runs in scale-out mode.
     *
     * @return always {@code false} in this implementation
     */
    public boolean isScaleOut() {
        return false;
    }

    /**
     * Records that the given query should be cancelled. {@code startEval}
     * consults this record and cancels a query whose id is present instead
     * of starting it.
     *
     * @param queryId the query id
     * @throws IllegalArgumentException if {@code queryId} is null
     */
    public void addPendingCancel(UUID queryId) {
        if (null == queryId) {
            throw new IllegalArgumentException();
        }
        pendingCancelLRU.putIfAbsent(queryId, queryId);
    }

    /**
     * Tests for, and clears, a pending cancellation request for the query.
     *
     * @param queryId the query id
     * @return {@code true} if an entry for the id was removed
     * @throws IllegalArgumentException if {@code queryId} is null
     */
    public boolean pendingCancel(UUID queryId) {
        if (null == queryId) {
            throw new IllegalArgumentException();
        }
        final Object removed = pendingCancelLRU.remove(queryId);
        return removed != null;
    }

    /**
     * Registers a query with the deadline queue. The query's deadline (read
     * in milliseconds) is converted to nanoseconds for the queue entry.
     *
     * @param query the query; its deadline must have been set
     * @throws IllegalArgumentException if the deadline is {@link Long#MAX_VALUE}
     *         (i.e. no deadline)
     */
    void addQueryToDeadlineQueue(AbstractRunningQuery query) {
        final long deadlineMillis = query.getDeadline();
        if (deadlineMillis == Long.MAX_VALUE) {
            throw new IllegalArgumentException();
        }
        deadlineQueue.add(new QueryDeadline(TimeUnit.MILLISECONDS.toNanos(deadlineMillis), query));
    }

    /**
     * Checks the deadline queue while holding its monitor: always examines
     * the head of the queue, and additionally scans a batch of entries when
     * the queue holds more than 200 elements.
     *
     * @param nowNanos the time against which deadlines are compared
     * @param deadlineQueue the queue to examine
     */
    private static void checkDeadlines(long nowNanos, PriorityBlockingQueue<QueryDeadline> deadlineQueue) {
        synchronized (deadlineQueue) {
            QueryEngine.checkHeadOfDeadlineQueue(nowNanos, deadlineQueue);
            final boolean oversized = deadlineQueue.size() > 200;
            if (oversized) {
                QueryEngine.scanDeadlineQueue(nowNanos, deadlineQueue);
            }
        }
    }

    /**
     * Polls entries off the head of the queue until it is empty or an entry
     * is found that is still live and whose deadline lies in the future; that
     * entry is put back and the loop stops. Entries for which
     * {@code checkDeadline} returns null, or whose deadline is not after
     * {@code nowNanos}, are dropped.
     *
     * @param nowNanos the time against which deadlines are compared
     * @param deadlineQueue the queue to examine
     */
    private static void checkHeadOfDeadlineQueue(long nowNanos, PriorityBlockingQueue<QueryDeadline> deadlineQueue) {
        for (QueryDeadline head = deadlineQueue.poll(); head != null; head = deadlineQueue.poll()) {
            final boolean live = head.checkDeadline(nowNanos) != null;
            if (live && head.deadlineNanos > nowNanos) {
                // Not yet due: restore it and stop, the head is the earliest entry.
                deadlineQueue.add(head);
                return;
            }
        }
    }

    /**
     * Drains up to 200 entries from the deadline queue and re-inserts those
     * for which {@code checkDeadline} returns non-null; the others are
     * dropped. This keeps the queue from filling with entries that no longer
     * need tracking.
     *
     * @param nowNanos the time against which deadlines are compared
     * @param deadlineQueue the queue to scan
     */
    private static void scanDeadlineQueue(long nowNanos, PriorityBlockingQueue<QueryDeadline> deadlineQueue) {
        // Single source for the batch size (same value as DEADLINE_QUEUE_SCAN_SIZE).
        final int scanSize = 200;
        // Typed list: a raw ArrayList cannot be iterated as QueryDeadline.
        final ArrayList<QueryDeadline> batch = new ArrayList<QueryDeadline>(scanSize);
        deadlineQueue.drainTo(batch, scanSize);
        int ndropped = 0;
        int nrunning = 0;
        for (QueryDeadline x : batch) {
            if (x.checkDeadline(nowNanos) != null) {
                // Still of interest: put it back into the queue.
                deadlineQueue.add(x);
                ++nrunning;
            } else {
                ++ndropped;
            }
        }
        if (log.isInfoEnabled()) {
            log.info("Scan: threadhold=" + scanSize + ", ndropped=" + ndropped + ", nrunning=" + nrunning + ", deadlineQueueSize=" + deadlineQueue.size());
        }
    }

    /**
     * Creates an engine bound to the given index manager. The engine does not
     * run until {@link #init()} is invoked.
     *
     * @param localIndexManager the local index manager
     * @throws IllegalArgumentException if {@code localIndexManager} is null
     */
    public QueryEngine(IIndexManager localIndexManager) {
        if (null == localIndexManager) {
            throw new IllegalArgumentException();
        }
        this.localIndexManager = localIndexManager;
    }

    /**
     * Starts the engine: wraps a {@link QueryEngineTask} in a future, installs
     * it as the engine future, and runs it on a new single-threaded executor
     * built with a {@link DaemonThreadFactory}.
     *
     * @throws IllegalStateException if an engine future is already installed
     */
    public void init() {
        final Runnable task = new QueryEngineTask(priorityQueue, deadlineQueue);
        final FutureTaskMon<Void> ft = new FutureTaskMon<Void>(task, (Void) null);
        if (!engineFuture.compareAndSet(null, ft)) {
            throw new IllegalStateException("Already running");
        }
        final ThreadFactory threadFactory = new DaemonThreadFactory(QueryEngine.class + ".engineService");
        engineService.set(Executors.newSingleThreadExecutor(threadFactory));
        engineService.get().execute(ft);
    }

    /**
     * Safety net: performs an immediate shutdown if the engine is reclaimed
     * without having been shut down explicitly.
     */
    @Override
    protected void finalize() throws Throwable {
        shutdownNow();
        super.finalize();
    }

    /**
     * Verifies that the engine has been initialized and is not shutting down.
     *
     * @throws IllegalStateException if {@link #init()} has not installed an
     *         engine future, or if shutdown has been requested
     */
    protected void assertRunning() {
        final boolean initialized = engineFuture.get() != null;
        if (!initialized) {
            throw new IllegalStateException("Not initialized.");
        }
        if (shutdown) {
            throw new IllegalStateException("Shutting down.");
        }
    }

    /**
     * @return {@code true} iff an engine future is installed and shutdown has
     *         not been requested
     */
    protected boolean isRunning() {
        if (engineFuture.get() == null) {
            return false;
        }
        return !shutdown;
    }

    /**
     * Runs a task on the executor service of the local index manager.
     *
     * @param r the task to run
     */
    protected final void execute(Runnable r) {
        final ExecutorService service = localIndexManager.getExecutorService();
        service.execute(r);
    }

    /**
     * Hands a materialized chunk to its running query and, when accepted,
     * queues the query so the engine task will consume the chunk.
     *
     * @param msg the chunk message
     * @return {@code true} if the chunk was accepted and the query queued;
     *         {@code false} if the query rejected the chunk or the engine is
     *         not running (the message is released in both cases)
     * @throws IllegalArgumentException if {@code msg} is null
     * @throws IllegalStateException if the message is not materialized or no
     *         running query is found for its query id
     */
    protected boolean acceptChunk(IChunkMessage<IBindingSet> msg) {
        if (msg == null) {
            throw new IllegalArgumentException();
        }
        if (!msg.isMaterialized()) {
            throw new IllegalStateException();
        }
        final AbstractRunningQuery target = getRunningQuery(msg.getQueryId());
        if (target == null) {
            throw new IllegalStateException();
        }
        // The query is asked first; the engine state is only checked once it accepted.
        if (!target.acceptChunk(msg) || !isRunning()) {
            msg.release();
            return false;
        }
        priorityQueue.add(target);
        return true;
    }

    /**
     * Graceful shutdown: marks the engine as shutting down (so
     * {@link #assertRunning()} rejects new queries), waits until no query is
     * running, invokes the {@link #didShutdown()} hook, and then stops the
     * engine task, its executor and the HTTP client.
     * <p>
     * NOTE(review): the decompiler could not recover this method's body; it
     * previously threw {@code IllegalStateException("Decompilation failed")}
     * unconditionally. This implementation is reconstructed from the
     * {@code nothingRunning} condition signalled by
     * {@link #halt(AbstractRunningQuery)}, the otherwise unused
     * {@link #didShutdown()} hook, and the teardown steps of
     * {@link #shutdownNow()} - verify against the original class file.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if
     *         the caller is interrupted while waiting for queries to halt
     */
    public void shutdown() {
        lock.lock();
        try {
            // Refuse new queries from here on.
            shutdown = true;
            // halt() signals nothingRunning once the last query is removed.
            while (!runningQueries.isEmpty()) {
                try {
                    nothingRunning.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        } finally {
            lock.unlock();
        }

        // Hook for subclasses.
        didShutdown();

        final Future<?> f = engineFuture.get();
        if (f != null) {
            if (log.isInfoEnabled()) {
                log.info("Cancelling engineFuture: " + this);
            }
            f.cancel(true);
        }
        final ExecutorService s = engineService.get();
        if (s != null) {
            if (log.isInfoEnabled()) {
                log.info("Terminating engineService: " + this);
            }
            s.shutdownNow();
        }
        final HttpClient cm = clientConnectionManagerRef.get();
        if (cm != null) {
            if (log.isInfoEnabled()) {
                log.info("Terminating HttpClient: " + this);
            }
            try {
                cm.stop();
            } catch (Exception e) {
                // Best effort: a failure to stop the client must not abort shutdown.
                log.error("Problem stopping HttpClient", e);
            }
        }

        // Clear the references so isRunning()/assertRunning() report "not initialized".
        engineFuture.set(null);
        engineService.set(null);
        clientConnectionManagerRef.set(null);
    }

    /**
     * Extension hook for subclasses; this implementation does nothing.
     */
    protected void didShutdown() {
    }

    /**
     * Immediate shutdown: flags the engine as shut down, cancels the engine
     * task, terminates its executor, stops the HTTP client, cancels every
     * running query, empties both queues and clears the engine references.
     * A failure to stop the HTTP client is logged and otherwise ignored.
     */
    public void shutdownNow() {
        shutdown = true;

        final Future<?> engineTask = engineFuture.get();
        if (engineTask != null) {
            if (log.isInfoEnabled()) {
                log.info("Cancelling engineFuture: " + this);
            }
            engineTask.cancel(true);
        }

        final ExecutorService service = engineService.get();
        if (service != null) {
            if (log.isInfoEnabled()) {
                log.info("Terminating engineService: " + this);
            }
            service.shutdownNow();
        }

        final HttpClient client = clientConnectionManagerRef.get();
        if (client != null) {
            if (log.isInfoEnabled()) {
                log.info("Terminating HttpClient: " + this);
            }
            try {
                client.stop();
            } catch (Exception e) {
                // Best effort: keep tearing down the rest of the engine.
                log.error("Problem stopping HttpClient", e);
            }
        }

        // Interrupt whatever is still executing.
        for (AbstractRunningQuery running : runningQueries.values()) {
            running.cancel(true);
        }

        priorityQueue.clear();
        deadlineQueue.clear();

        engineFuture.set(null);
        engineService.set(null);
        clientConnectionManagerRef.set(null);
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    @Deprecated
    public void declareQuery(IQueryDecl queryDecl) throws RemoteException {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void bufferReady(IChunkMessage<IBindingSet> msg) {
        throw new UnsupportedOperationException();
    }

    /**
     * No-op in this implementation: the arguments are ignored.
     */
    @Override
    public void cancelQuery(UUID queryId, Throwable cause) {
    }

    /**
     * Returns the query plan of a running query.
     *
     * @param queryId the query id
     * @return the {@link PipelineOp} of the running query
     * @throws IllegalArgumentException if {@code queryId} is null or no
     *         running query has that id
     */
    @Override
    public PipelineOp getQuery(UUID queryId) {
        final AbstractRunningQuery running = getRunningQuery(queryId);
        if (running != null) {
            return running.getQuery();
        }
        throw new IllegalArgumentException();
    }

    /**
     * Forwards a start-of-operator message to its running query. The message
     * is silently ignored when no running query is found for its id.
     *
     * @param msg the message
     */
    @Override
    public void startOp(IStartOpMessage msg) throws RemoteException {
        final AbstractRunningQuery running = getRunningQuery(msg.getQueryId());
        if (running == null) {
            return;
        }
        running.startOp(msg);
    }

    /**
     * Forwards an end-of-operator message to its running query. The message
     * is silently ignored when no running query is found for its id.
     *
     * @param msg the message
     */
    @Override
    public void haltOp(IHaltOpMessage msg) throws RemoteException {
        final AbstractRunningQuery running = getRunningQuery(msg.getQueryId());
        if (running == null) {
            return;
        }
        running.haltOp(msg);
    }

    /**
     * Returns the query id annotated on the operator under
     * {@link Annotations#QUERY_ID}, with a random UUID as the default value.
     * Note that the random UUID is generated on every call, whether or not it
     * ends up being used.
     *
     * @param op the operator
     * @return the query id
     */
    private static UUID getQueryUUID(BOp op) {
        final Object fallback = UUID.randomUUID();
        return (UUID) op.getProperty(Annotations.QUERY_ID, fallback);
    }

    /**
     * @param op the query plan
     * @return the id of the operator that {@link BOpUtility#getPipelineStart}
     *         reports for {@code op}
     */
    private int getStartId(BOp op) {
        return BOpUtility.getPipelineStart(op).getId();
    }

    /**
     * Wraps a single binding set in a {@link LocalChunkMessage} addressed to
     * the start operator of {@code op}, with -1 as the fourth constructor
     * argument.
     */
    private LocalChunkMessage newLocalChunkMessage(UUID queryId, BOp op, IBindingSet src) {
        final int startId = getStartId(op);
        return new LocalChunkMessage((IQueryClient) this, queryId, startId, -1, src);
    }

    /**
     * Variant of {@link #newLocalChunkMessage(UUID, BOp, IBindingSet)} for an
     * array of binding sets.
     */
    private LocalChunkMessage newLocalChunkMessage(UUID queryId, BOp op, IBindingSet[] src) {
        final int startId = getStartId(op);
        return new LocalChunkMessage((IQueryClient) this, queryId, startId, -1, src);
    }

    /**
     * Variant of {@link #newLocalChunkMessage(UUID, BOp, IBindingSet)} for a
     * two-dimensional array of binding sets.
     */
    private LocalChunkMessage newLocalChunkMessage(UUID queryId, BOp op, IBindingSet[][] src) {
        final int startId = getStartId(op);
        return new LocalChunkMessage((IQueryClient) this, queryId, startId, -1, src);
    }

    /**
     * Evaluates a query against a single, empty {@link ListBindingSet}.
     *
     * @param op the query plan
     * @return the running query
     */
    public AbstractRunningQuery eval(BOp op) throws Exception {
        final IBindingSet empty = new ListBindingSet();
        return eval(op, empty);
    }

    /**
     * Evaluates a query against one binding set, using the query id
     * annotated on the plan (or a random one).
     */
    public AbstractRunningQuery eval(BOp op, IBindingSet bset) throws Exception {
        final UUID queryId = QueryEngine.getQueryUUID(op);
        final LocalChunkMessage msg = newLocalChunkMessage(queryId, op, bset);
        return eval(queryId, (PipelineOp) op, null, msg);
    }

    /**
     * Evaluates a query against one binding set under an explicit query id.
     */
    public AbstractRunningQuery eval(UUID queryId, BOp op, IBindingSet bset) throws Exception {
        final LocalChunkMessage msg = newLocalChunkMessage(queryId, op, bset);
        return eval(queryId, (PipelineOp) op, null, msg);
    }

    /**
     * Evaluates a query against an array of binding sets under an explicit
     * query id, with optional query attributes.
     */
    public AbstractRunningQuery eval(UUID queryId, BOp op, Map<Object, Object> queryAttributes, IBindingSet[] bset) throws Exception {
        final LocalChunkMessage msg = newLocalChunkMessage(queryId, op, bset);
        return eval(queryId, (PipelineOp) op, queryAttributes, msg);
    }

    /**
     * Evaluates a query against a two-dimensional array of binding sets under
     * an explicit query id, with optional query attributes.
     */
    public AbstractRunningQuery eval(UUID queryId, BOp op, Map<Object, Object> queryAttributes, IBindingSet[][] bset) throws Exception {
        final LocalChunkMessage msg = newLocalChunkMessage(queryId, op, bset);
        return eval(queryId, (PipelineOp) op, queryAttributes, msg);
    }

    /**
     * Evaluates a query against an array of binding sets without attributes.
     */
    public AbstractRunningQuery eval(BOp op, IBindingSet[] bsets) throws Exception {
        return eval(op, bsets, null);
    }

    /**
     * Evaluates a query against an array of binding sets, using the query id
     * annotated on the plan (or a random one), with optional attributes.
     */
    public AbstractRunningQuery eval(BOp op, IBindingSet[] bsets, Map<Object, Object> attribs) throws Exception {
        final UUID queryId = QueryEngine.getQueryUUID(op);
        final LocalChunkMessage msg = newLocalChunkMessage(queryId, op, bsets);
        return eval(queryId, (PipelineOp) op, attribs, msg);
    }

    /**
     * Core entry point that all other {@code eval} overloads funnel into;
     * delegates to {@code startEval}.
     *
     * @param queryId the query id (must equal the id carried by {@code msg})
     * @param query the query plan
     * @param queryAttributes optional attributes copied onto the running query
     * @param msg the initial chunk
     * @return the running query
     */
    public AbstractRunningQuery eval(UUID queryId, PipelineOp query, Map<Object, Object> queryAttributes, IChunkMessage<IBindingSet> msg) throws Exception {
        return startEval(queryId, query, queryAttributes, msg);
    }

    /**
     * Creates, registers and starts a running query.
     * <p>
     * Steps: validate arguments, instantiate the running query, copy the
     * optional attributes, apply the {@code TIMEOUT} annotation as a deadline,
     * register the query, honour a pending cancellation, and otherwise start
     * the query and hand it the initial chunk.
     *
     * @param queryId the query id; must equal {@code msg.getQueryId()}
     * @param query the query plan
     * @param queryAttributes optional attributes (may be null)
     * @param msg the initial chunk
     * @return the running query (already cancelled if a cancellation was pending)
     * @throws IllegalArgumentException on a null or mismatched argument, or a
     *         negative timeout
     * @throws IllegalStateException if the engine is not running
     * @throws RuntimeException if a different query is already registered
     *         under {@code queryId}
     */
    private AbstractRunningQuery startEval(UUID queryId, PipelineOp query, Map<Object, Object> queryAttributes, IChunkMessage<IBindingSet> msg) throws Exception {
        if (queryId == null) {
            throw new IllegalArgumentException();
        }
        if (query == null) {
            throw new IllegalArgumentException();
        }
        if (msg == null) {
            throw new IllegalArgumentException();
        }
        if (!queryId.equals(msg.getQueryId())) {
            throw new IllegalArgumentException();
        }

        final AbstractRunningQuery runningQuery = newRunningQuery(queryId, true, getProxy(), getServiceUUID(), query, msg);

        if (queryAttributes != null) {
            final IQueryAttributes target = runningQuery.getAttributes();
            for (Map.Entry<Object, Object> attribute : queryAttributes.entrySet()) {
                target.put(attribute.getKey(), attribute.getValue());
            }
        }

        final long timeout = query.getProperty(BOp.Annotations.TIMEOUT, Long.MAX_VALUE).longValue();
        if (timeout < 0L) {
            throw new IllegalArgumentException(BOp.Annotations.TIMEOUT);
        }
        if (timeout != Long.MAX_VALUE) {
            final long deadline = System.currentTimeMillis() + timeout;
            // A non-positive sum means the addition overflowed: leave the deadline unset.
            if (deadline > 0L) {
                runningQuery.setDeadline(deadline);
            }
        }

        assertRunning();

        if (putIfAbsent(queryId, runningQuery) != runningQuery) {
            throw new RuntimeException("Query exists with that UUID: uuid=" + runningQuery.getQueryId());
        }
        counters.queryStartCount.increment();

        if (pendingCancelLRU.containsKey(runningQuery.getQueryId())) {
            // Cancellation was requested before the query arrived: do not start it.
            runningQuery.cancel(true);
            pendingCancelLRU.remove(runningQuery.getQueryId());
            return runningQuery;
        }

        runningQuery.startQuery(msg);
        acceptChunk(msg);
        return runningQuery;
    }

    /**
     * Registers a running query unless one is already registered under the
     * same id.
     * <p>
     * A lock-free lookup is attempted first. Otherwise, under the lock, the
     * done-query map is consulted (an id found there causes
     * {@code handleDoneQuery} to throw), then the query is inserted. If the
     * engine turns out not to be running, the insertion is undone and the
     * exception propagates.
     *
     * @param queryId the query id
     * @param runningQuery the query to register
     * @return the query now registered under the id: either
     *         {@code runningQuery} or the pre-existing one
     * @throws IllegalArgumentException if either argument is null
     * @throws IllegalStateException if the engine is not running
     */
    protected AbstractRunningQuery putIfAbsent(UUID queryId, AbstractRunningQuery runningQuery) {
        if (queryId == null) {
            throw new IllegalArgumentException();
        }
        if (runningQuery == null) {
            throw new IllegalArgumentException();
        }
        final AbstractRunningQuery existing = runningQueries.get(queryId);
        if (existing != null) {
            return existing;
        }
        lock.lock();
        try {
            final Future<Void> done = doneQueries.get(queryId);
            if (done != null) {
                handleDoneQuery(queryId, done);
                // Not reached: handleDoneQuery always throws.
                throw new AssertionError();
            }
            final AbstractRunningQuery raced = runningQueries.putIfAbsent(queryId, runningQuery);
            if (raced != null) {
                return raced;
            }
            try {
                assertRunning();
            } catch (IllegalStateException ex) {
                // Roll back the registration made just above.
                runningQueries.remove(queryId, runningQuery);
                throw ex;
            }
            return runningQuery;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Looks up a running query by id.
     * <p>
     * A lock-free lookup is attempted first. On a miss the lookup is repeated
     * under the lock and, if still absent, the done-query map is consulted; an
     * id found there causes {@code handleDoneQuery} to throw.
     *
     * @param queryId the query id
     * @return the running query, or {@code null} if the id is unknown
     * @throws IllegalArgumentException if {@code queryId} is null
     * @throws RuntimeException if the query is among the recently halted
     *         queries (see {@code handleDoneQuery})
     */
    public AbstractRunningQuery getRunningQuery(UUID queryId) {
        if (queryId == null) {
            throw new IllegalArgumentException();
        }
        final AbstractRunningQuery fast = runningQueries.get(queryId);
        if (fast != null) {
            return fast;
        }
        lock.lock();
        try {
            final AbstractRunningQuery slow = runningQueries.get(queryId);
            if (slow != null) {
                return slow;
            }
            final Future<Void> done = doneQueries.get(queryId);
            if (done == null) {
                return null;
            }
            handleDoneQuery(queryId, done);
            // Not reached: handleDoneQuery always throws.
            throw new AssertionError();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Moves a query from the running set to the done set.
     * <p>
     * Under the lock: listeners are notified (any failure is swallowed; an
     * interrupt-caused failure is remembered), the query's future is recorded
     * in the done map, the query is removed from the running map, and waiters
     * on {@code nothingRunning} are signalled when the running map is empty.
     * A remembered interrupt is re-asserted on the current thread after the
     * lock is released.
     *
     * @param q the query that halted
     */
    protected void halt(AbstractRunningQuery q) {
        boolean interrupted = false;
        lock.lock();
        try {
            try {
                fireEvent(q);
            } catch (Throwable t) {
                // Listener problems must not prevent the bookkeeping below.
                interrupted = InnerCause.isInnerCause(t, InterruptedException.class);
            }
            doneQueries.put(q.getQueryId(), q.getFuture());
            runningQueries.remove(q.getQueryId(), q);
            if (runningQueries.isEmpty()) {
                nothingRunning.signalAll();
            }
        } finally {
            lock.unlock();
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Always throws. Waits on the future of an already-halted query and
     * converts the outcome into a {@link RuntimeException}: a failure is
     * wrapped as the {@link ExecutionException} it produced, while normal
     * completion (or an interrupt while waiting) is reported as a wrapped
     * {@link InterruptedException}.
     *
     * @param queryId the query id (unused)
     * @param doneQueryFuture the future of the halted query
     */
    private void handleDoneQuery(UUID queryId, Future<Void> doneQueryFuture) {
        try {
            doneQueryFuture.get();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
        // Normal completion is reported the same way as an interrupt.
        throw new RuntimeException(new InterruptedException());
    }

    /**
     * Registers a listener to be notified when a query halts.
     *
     * @param l the listener
     * @throws IllegalArgumentException if {@code l} is null
     */
    public void addListener(IRunningQueryListener l) {
        if (null == l) {
            throw new IllegalArgumentException();
        }
        listeners.add(l);
    }

    /**
     * Unregisters a listener.
     *
     * @param l the listener
     * @throws IllegalArgumentException if {@code l} is null
     */
    public void removeListener(IRunningQueryListener l) {
        if (null == l) {
            throw new IllegalArgumentException();
        }
        listeners.remove(l);
    }

    /**
     * Notifies every registered listener about the query. A listener failure
     * is logged and the remaining listeners are still notified, unless the
     * failure was caused by an interrupt, in which case it is rethrown
     * wrapped in a {@link RuntimeException}.
     *
     * @param q the query
     * @throws IllegalArgumentException if {@code q} is null
     */
    private void fireEvent(IRunningQuery q) {
        if (q == null) {
            throw new IllegalArgumentException();
        }
        if (listeners.isEmpty()) {
            return;
        }
        // Iterate over a snapshot of the listener set.
        final IRunningQueryListener[] snapshot = listeners.toArray(new IRunningQueryListener[0]);
        for (IRunningQueryListener listener : snapshot) {
            try {
                listener.notify(q);
            } catch (Throwable t) {
                if (InnerCause.isInnerCause(t, InterruptedException.class)) {
                    throw new RuntimeException(t);
                }
                log.error(t, t);
            }
        }
    }

    /**
     * Reflectively instantiates the running-query implementation named by the
     * {@link Annotations#RUNNING_QUERY_CLASS} annotation of the query
     * (default {@link Annotations#DEFAULT_RUNNING_QUERY_CLASS}). The class
     * must implement {@link IRunningQuery} and expose a public constructor
     * {@code (QueryEngine, UUID, boolean, IQueryClient, PipelineOp, IChunkMessage)}.
     *
     * @param queryId the query id
     * @param controller passed through to the constructor
     * @param clientProxy passed through to the constructor
     * @param queryControllerId not used by this implementation
     * @param query the query plan
     * @param realSource the initial chunk
     * @return the new running query
     * @throws RuntimeException if the class cannot be found, does not
     *         implement {@link IRunningQuery}, or cannot be instantiated
     */
    protected AbstractRunningQuery newRunningQuery(UUID queryId, boolean controller, IQueryClient clientProxy, UUID queryControllerId, PipelineOp query, IChunkMessage<IBindingSet> realSource) {
        final String className = query.getProperty(Annotations.RUNNING_QUERY_CLASS, Annotations.DEFAULT_RUNNING_QUERY_CLASS);

        final Class<?> cls;
        try {
            cls = Class.forName(className);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Bad option: " + Annotations.RUNNING_QUERY_CLASS, e);
        }
        if (!IRunningQuery.class.isAssignableFrom(cls)) {
            throw new RuntimeException(Annotations.RUNNING_QUERY_CLASS + ": Must extend: " + IRunningQuery.class.getName());
        }

        try {
            final Constructor<?> ctor = cls.getConstructor(QueryEngine.class, UUID.class, Boolean.TYPE, IQueryClient.class, PipelineOp.class, IChunkMessage.class);
            final IRunningQuery instance = (IRunningQuery) ctor.newInstance(this, queryId, controller, clientProxy, query, realSource);
            return (AbstractRunningQuery) instance;
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    /**
     * @return a snapshot array of the ids of the queries currently registered
     *         as running
     */
    @Override
    public UUID[] getRunningQueries() {
        final UUID[] empty = new UUID[0];
        return runningQueries.keySet().toArray(empty);
    }

    public static interface IRunningQueryListener {
        public void notify(IRunningQuery var1);
    }

    private static class QueryEngineTask
    implements Runnable {
        private final BlockingQueue<AbstractRunningQuery> priorityQueue;
        private final PriorityBlockingQueue<QueryDeadline> deadlineQueue;

        public QueryEngineTask(BlockingQueue<AbstractRunningQuery> priorityQueue, PriorityBlockingQueue<QueryDeadline> deadlineQueue) {
            if (priorityQueue == null) {
                throw new IllegalArgumentException();
            }
            if (deadlineQueue == null) {
                throw new IllegalArgumentException();
            }
            this.priorityQueue = priorityQueue;
            this.deadlineQueue = deadlineQueue;
        }

        @Override
        public void run() {
            if (log.isInfoEnabled()) {
                log.info((Object)("Running: " + this));
            }
            try {
                long deadline = TimeUnit.MILLISECONDS.toNanos(100L);
                long mark = System.nanoTime();
                long remaining = deadline;
                while (true) {
                    try {
                        while (true) {
                            AbstractRunningQuery q = this.priorityQueue.poll(remaining, TimeUnit.NANOSECONDS);
                            long now = System.nanoTime();
                            remaining = deadline - (now - mark);
                            if (remaining < 0L) {
                                QueryEngine.checkDeadlines(now, this.deadlineQueue);
                                mark = now;
                                remaining = deadline;
                            }
                            if (q == null || q.isDone()) continue;
                            q.consumeChunk();
                        }
                    }
                    catch (InterruptedException e) {
                        if (log.isInfoEnabled()) {
                            log.info((Object)"Interrupted.");
                        }
                        return;
                    }
                    catch (Throwable t) {
                        log.error((Object)t, t);
                        continue;
                    }
                    break;
                }
            }
            finally {
                if (log.isInfoEnabled()) {
                    log.info((Object)"QueryEngineTask is done.");
                }
            }
        }
    }

    /**
     * Annotations understood by the {@link QueryEngine}, in addition to those
     * inherited from {@link PipelineOp.Annotations}.
     */
    public interface Annotations
    extends PipelineOp.Annotations {
        /** Name of the annotation carrying the query's {@link UUID}. */
        String QUERY_ID = QueryEngine.class.getName() + ".queryId";
        /** Name of the annotation naming the {@link IRunningQuery} implementation class to instantiate. */
        String RUNNING_QUERY_CLASS = QueryEngine.class.getName() + ".runningQueryClass";
        /** Default value for {@link #RUNNING_QUERY_CLASS}. */
        String DEFAULT_RUNNING_QUERY_CLASS = ChunkedRunningQuery.class.getName();
    }
}

