/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.appdata.query;

import com.datatorrent.api.Context;
import com.datatorrent.lib.appdata.QueueUtils;
import com.datatorrent.lib.appdata.query.QueryBundle;
import com.datatorrent.lib.appdata.query.QueueList;
import com.datatorrent.lib.appdata.query.QueueManager;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractWindowEndQueueManager<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT>
implements QueueManager<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> {
    protected QueueList<QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT>> queryQueue = new QueueList();
    private QueueList.QueueListNode<QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT>> currentNode;
    private boolean readCurrent = false;
    private final Semaphore semaphore = new Semaphore(0);
    private final QueueUtils.ConditionBarrier conditionBarrier = new QueueUtils.ConditionBarrier();
    private final AtomicInteger numLeft = new AtomicInteger();
    private static final Logger LOG = LoggerFactory.getLogger(AbstractWindowEndQueueManager.class);

    @Override
    public boolean enqueue(QUERY_TYPE query, META_QUERY metaQuery, QUEUE_CONTEXT context) {
        Preconditions.checkNotNull(query);
        this.conditionBarrier.gate();
        return this.enqueueHelper(query, metaQuery, context);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean enqueueHelper(QUERY_TYPE query, META_QUERY metaQuery, QUEUE_CONTEXT context) {
        QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> queryQueueable = new QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT>(query, metaQuery, context);
        QueueList.QueueListNode<QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT>> node = new QueueList.QueueListNode<QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT>>(queryQueueable);
        AtomicInteger atomicInteger = this.numLeft;
        synchronized (atomicInteger) {
            if (this.addingFilter(queryQueueable)) {
                this.queryQueue.enqueue(node);
                this.numLeft.getAndIncrement();
                this.semaphore.release();
                this.addedNode(node);
            }
        }
        return true;
    }

    @Override
    public QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> dequeue() {
        return this.dequeueHelper(false);
    }

    @Override
    public QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> dequeueBlock() {
        return this.dequeueHelper(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> dequeueHelper(boolean block) {
        QueueList.QueueListNode<QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT>> tempNode;
        QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> qq = null;
        boolean first = true;
        if (block) {
            this.acquire();
        }
        if (this.currentNode == null) {
            this.currentNode = this.queryQueue.getHead();
            this.readCurrent = false;
            if (this.currentNode == null) {
                return null;
            }
        } else if (this.readCurrent) {
            tempNode = this.currentNode.getNext();
            if (tempNode != null) {
                this.currentNode = tempNode;
                this.readCurrent = false;
            } else {
                return null;
            }
        }
        while (true) {
            if (block && !first) {
                this.acquire();
                if (this.currentNode == null) {
                    this.currentNode = this.queryQueue.getHead();
                    this.readCurrent = false;
                    if (this.currentNode == null) {
                        return null;
                    }
                } else if (this.readCurrent) {
                    tempNode = this.currentNode.getNext();
                    if (tempNode != null) {
                        this.currentNode = tempNode;
                        this.readCurrent = false;
                    } else {
                        return null;
                    }
                }
            }
            tempNode = this.numLeft;
            synchronized (tempNode) {
                this.numLeft.getAndDecrement();
                QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> queryQueueable = this.currentNode.getPayload();
                first = false;
                QueueList.QueueListNode<QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT>> nextNode = this.currentNode.getNext();
                if (this.removeBundle(queryQueueable)) {
                    this.queryQueue.removeNode(this.currentNode);
                    this.removedNode(this.currentNode);
                    if (block) {
                        if (nextNode == null) {
                            this.readCurrent = true;
                        } else {
                            this.currentNode = nextNode;
                            this.readCurrent = false;
                        }
                    } else {
                        if (nextNode == null) {
                            this.readCurrent = true;
                            break;
                        }
                        this.currentNode = nextNode;
                    }
                } else {
                    qq = this.currentNode.getPayload();
                    if (nextNode == null) {
                        this.readCurrent = true;
                    } else {
                        this.currentNode = nextNode;
                        this.readCurrent = false;
                    }
                    break;
                }
            }
        }
        if (this.semaphore.availablePermits() > this.numLeft.get()) {
            try {
                this.semaphore.acquire(this.semaphore.availablePermits() - this.numLeft.get());
            }
            catch (InterruptedException ex) {
                throw new RuntimeException(ex);
            }
        }
        return qq;
    }

    public boolean isEmptyAndBlocked() {
        return this.numLeft.get() == 0 && this.semaphore.availablePermits() == 0 && this.semaphore.hasQueuedThreads();
    }

    private void acquire() {
        try {
            this.semaphore.acquire();
        }
        catch (InterruptedException ex) {
            throw new RuntimeException(ex);
        }
    }

    public abstract boolean addingFilter(QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> var1);

    public abstract void addedNode(QueueList.QueueListNode<QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT>> var1);

    public abstract void removedNode(QueueList.QueueListNode<QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT>> var1);

    public abstract boolean removeBundle(QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> var1);

    public void setup(Context.OperatorContext context) {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void beginWindow(long windowId) {
        AtomicInteger atomicInteger = this.numLeft;
        synchronized (atomicInteger) {
            this.currentNode = this.queryQueue.getHead();
            this.readCurrent = false;
            this.numLeft.set(this.queryQueue.getSize());
            this.semaphore.drainPermits();
            this.semaphore.release(this.queryQueue.getSize());
        }
    }

    @Override
    public void endWindow() {
    }

    public void teardown() {
    }

    @Override
    public int getNumLeft() {
        return this.numLeft.get();
    }

    @VisibleForTesting
    int getNumPermits() {
        return this.semaphore.availablePermits();
    }

    @Override
    public void haltEnqueue() {
        this.conditionBarrier.lock();
    }

    @Override
    public void resumeEnqueue() {
        this.conditionBarrier.unlock();
    }
}

