/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.appdata.query;

import com.datatorrent.api.Component;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.NameableThreadFactory;
import com.datatorrent.lib.appdata.query.AbstractWindowEndQueueManager;
import com.datatorrent.lib.appdata.query.QueryBundle;
import com.datatorrent.lib.appdata.query.QueryExecutor;
import com.datatorrent.lib.appdata.query.QueueManager;
import com.datatorrent.lib.appdata.query.serde.MessageSerializerFactory;
import com.datatorrent.lib.appdata.schemas.Result;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueryManagerAsynchronous<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT, RESULT extends Result>
implements Component<Context.OperatorContext>,
Operator.IdleTimeHandler {
    private DefaultOutputPort<String> resultPort = null;
    private final transient Semaphore inWindowSemaphore = new Semaphore(0);
    private final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue();
    private QueueManager<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> queueManager;
    private QueryExecutor<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT, RESULT> queryExecutor;
    private MessageSerializerFactory messageSerializerFactory;
    @VisibleForTesting
    protected transient ExecutorService processingThread;
    private transient Thread mainThread;
    private static final Logger LOG = LoggerFactory.getLogger(QueryManagerAsynchronous.class);

    public QueryManagerAsynchronous(DefaultOutputPort<String> resultPort, QueueManager<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> queueManager, QueryExecutor<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT, RESULT> queryExecutor, MessageSerializerFactory messageSerializerFactory, Thread mainThread) {
        this.setResultPort(resultPort);
        this.setQueueManager(queueManager);
        this.setQueryExecutor(queryExecutor);
        this.setMessageSerializerFactory(messageSerializerFactory);
        this.setMainThread(mainThread);
    }

    private void setMainThread(@NotNull Thread mainThread) {
        this.mainThread = (Thread)Preconditions.checkNotNull((Object)mainThread);
    }

    private void setResultPort(DefaultOutputPort<String> resultPort) {
        this.resultPort = (DefaultOutputPort)Preconditions.checkNotNull(resultPort);
    }

    private void setQueueManager(QueueManager<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> queueManager) {
        this.queueManager = (QueueManager)Preconditions.checkNotNull(queueManager);
    }

    public QueueManager<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> getQueueManager() {
        return this.queueManager;
    }

    private void setQueryExecutor(QueryExecutor<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT, RESULT> queryExecutor) {
        this.queryExecutor = (QueryExecutor)Preconditions.checkNotNull(queryExecutor);
    }

    public QueryExecutor<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT, RESULT> getQueryExecutor() {
        return this.queryExecutor;
    }

    private void setMessageSerializerFactory(MessageSerializerFactory messageSerializerFactory) {
        this.messageSerializerFactory = (MessageSerializerFactory)Preconditions.checkNotNull((Object)messageSerializerFactory);
    }

    public void setup(Context.OperatorContext context) {
        this.processingThread = Executors.newSingleThreadScheduledExecutor((ThreadFactory)new NameableThreadFactory("Query Executor Thread"));
        this.processingThread.submit(new ProcessingThread(this.mainThread));
    }

    public void beginWindow(long windowID) {
        this.inWindowSemaphore.release();
        this.queueManager.resumeEnqueue();
    }

    public void endWindow() {
        this.queueManager.haltEnqueue();
        while (!this.isProcessingDone()) {
            if (this.queue.isEmpty()) {
                Thread.yield();
                continue;
            }
            this.emptyQueue();
        }
        this.emptyQueue();
        try {
            this.inWindowSemaphore.acquire();
        }
        catch (InterruptedException ex) {
            throw new RuntimeException(ex);
        }
    }

    private boolean isProcessingDone() {
        if (this.queueManager instanceof AbstractWindowEndQueueManager) {
            return ((AbstractWindowEndQueueManager)this.queueManager).isEmptyAndBlocked();
        }
        return this.queueManager.getNumLeft() == 0;
    }

    private void emptyQueue() {
        while (!this.queue.isEmpty()) {
            this.resultPort.emit((Object)this.queue.poll());
        }
    }

    public void teardown() {
        this.processingThread.shutdownNow();
    }

    public void handleIdleTime() {
        this.emptyQueue();
    }

    private class ProcessingThread
    implements Callable<Void> {
        private Thread mainThread;

        public ProcessingThread(Thread mainThread) {
            this.setMainThread(mainThread);
        }

        private void setMainThread(Thread mainThread) {
            this.mainThread = (Thread)Preconditions.checkNotNull((Object)mainThread);
        }

        @Override
        public Void call() throws Exception {
            try {
                this.loop();
            }
            catch (Exception ex) {
                LOG.error("Exception thrown while processing:", (Throwable)ex);
                this.mainThread.interrupt();
                throw ex;
            }
            return null;
        }

        private void loop() {
            while (true) {
                QueryBundle queryBundle = QueryManagerAsynchronous.this.queueManager.dequeueBlock();
                try {
                    QueryManagerAsynchronous.this.inWindowSemaphore.acquire();
                }
                catch (InterruptedException ex) {
                    throw new RuntimeException(ex);
                }
                Result result = (Result)QueryManagerAsynchronous.this.queryExecutor.executeQuery(queryBundle.getQuery(), queryBundle.getMetaQuery(), queryBundle.getQueueContext());
                if (result != null) {
                    String serializedMessage = QueryManagerAsynchronous.this.messageSerializerFactory.serialize(result);
                    QueryManagerAsynchronous.this.queue.add(serializedMessage);
                }
                QueryManagerAsynchronous.this.inWindowSemaphore.release();
            }
        }
    }
}

