/*
 * Decompiled with CFR 0.152.
 */
package com.cloudhopper.mq.broker;

import com.cloudhopper.mq.broker.LocalToRemoteQueueProcessor;
import com.cloudhopper.mq.broker.NoDupBlockingQueue;
import com.cloudhopper.mq.broker.RemoteQueueTransferScheduler;
import com.cloudhopper.mq.broker.RemotingQueueDrain;
import com.cloudhopper.mq.queue.Queue;
import com.cloudhopper.mq.queue.QueueManager;
import java.util.ArrayList;
import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FairRemoteQueueTransferScheduler
extends RemoteQueueTransferScheduler {
    private static final Logger logger = LoggerFactory.getLogger(FairRemoteQueueTransferScheduler.class);
    private final NoDupBlockingQueue<Queue> workList = new NoDupBlockingQueue();

    public FairRemoteQueueTransferScheduler(int maxConcurrentRequests, QueueManager queueManager) {
        super(maxConcurrentRequests, queueManager);
    }

    @Override
    public void addLocalToRemoteQueueProcessor(Queue queue, LocalToRemoteQueueProcessor processor) {
        super.addLocalToRemoteQueueProcessor(queue, processor);
        this.add(queue);
    }

    @Override
    public void removeLocalToRemoteQueueProcessor(Queue queue) {
        this.remove(queue);
        super.removeLocalToRemoteQueueProcessor(queue);
    }

    protected synchronized Runnable getNextRemoteTransfer() throws InterruptedException {
        logger.trace("getNextRemoteTransfer()");
        RemotingQueueDrain r = null;
        do {
            Queue q;
            if ((q = this.workList.take()) != null) {
                if (this.processors.get(q) != null) {
                    logger.trace("[{}] Queue from getNext(), returning RemotingQueueDrain.", (Object)q.getName());
                    r = new RemotingQueueDrain(q, (LocalToRemoteQueueProcessor)this.processors.get(q), this.maxConcurrentRequests, this);
                    continue;
                }
                logger.trace("[{}] Queue from getNext(), but L2R was already killed/null.", (Object)q.getName());
                continue;
            }
            logger.warn("Queue from getNext() was null, something is now WRONG.");
        } while (r == null);
        return r;
    }

    public void complete(Queue queue) {
        long s = queue.getSize();
        logger.trace("[{}] Drain complete, Queue has {} elements.", (Object)queue.getName(), (Object)s);
        if (s > 0L) {
            this.add(queue);
        }
    }

    public void remove(Queue item) {
        this.workList.remove(item);
    }

    public void add(Queue item) {
        logger.trace("[{}] Attempting to add Queue to workList.", (Object)item.getName());
        if (this.workList.add(item)) {
            logger.trace("[{}] Added Queue to workList.", (Object)item.getName());
        } else {
            logger.trace("[{}] No need to add Queue, already present.", (Object)item.getName());
        }
    }

    @Override
    public void onNotEmpty(Queue queue) {
        logger.trace("[{}] Called onNotEmpty for Queue id={}", (Object)queue.getName(), (Object)queue.getId());
        this.add(queue);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Runnable take() throws InterruptedException {
        logger.trace("take()");
        try {
            Runnable runnable = this.getNextRemoteTransfer();
            return runnable;
        }
        finally {
            this.scheduledCount.incrementAndGet();
        }
    }

    @Override
    public int drainTo(Collection<? super Runnable> c) {
        logger.trace("drainTo(Collection)");
        int i = 0;
        ArrayList qs = new ArrayList();
        this.workList.drainTo(qs);
        for (Queue q : qs) {
            RemotingQueueDrain r = new RemotingQueueDrain(q, (LocalToRemoteQueueProcessor)this.processors.get(q), this.maxConcurrentRequests, this);
            c.add(r);
            ++i;
        }
        return i;
    }

    @Override
    public int size() {
        logger.trace("size()");
        return this.workList.size();
    }
}

