/*
 * Decompiled with CFR 0.152.
 */
package com.cloudhopper.mq.broker;

import com.cloudhopper.commons.util.URL;
import com.cloudhopper.mq.broker.AsyncHttpClientFactory;
import com.cloudhopper.mq.broker.DistributedQueueConfiguration;
import com.cloudhopper.mq.broker.DistributedQueueEvent;
import com.cloudhopper.mq.broker.DistributedQueueManagerMBean;
import com.cloudhopper.mq.broker.DistributedQueueState;
import com.cloudhopper.mq.broker.DistributedQueueStateListener;
import com.cloudhopper.mq.broker.FairRemoteQueueTransferScheduler;
import com.cloudhopper.mq.broker.LocalToRemoteQueueProcessor;
import com.cloudhopper.mq.broker.RemoteBrokerInfo;
import com.cloudhopper.mq.broker.RemoteBrokerMonitor;
import com.cloudhopper.mq.broker.RemoteQueueInfo;
import com.cloudhopper.mq.broker.RemoteQueueTransferScheduler;
import com.cloudhopper.mq.queue.Queue;
import com.cloudhopper.mq.queue.QueueInvalidStateException;
import com.cloudhopper.mq.queue.QueueManager;
import com.cloudhopper.mq.queue.QueueManagerListener;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.ObjectName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Coordinates the distributed side of a local {@link QueueManager}.
 *
 * <p>The manager listens for local queue events and remote queue state
 * changes, turns them into {@link DistributedQueueEvent}s on an internal
 * {@link DelayQueue}, and has a single background thread start or kill a
 * {@link LocalToRemoteQueueProcessor} per queue in response. It also schedules
 * one {@link RemoteBrokerMonitor} per known remote broker and exposes a few
 * counters and liveness flags through {@link DistributedQueueManagerMBean}.
 *
 * <p>{@link #start()}, {@link #stop()} and the processor start/kill helpers
 * are serialized on this instance's monitor. The MBean getters are not
 * synchronized and may be called at any time, including before the first
 * {@link #start()}.
 */
public class DistributedQueueManager
implements QueueManagerListener,
DistributedQueueStateListener,
DistributedQueueManagerMBean {
    private static final Logger logger = LoggerFactory.getLogger(DistributedQueueManager.class);

    /** {@code true} between a successful {@link #start()} and the next {@link #stop()}. */
    protected final AtomicBoolean started;
    protected final DistributedQueueConfiguration configuration;
    protected final QueueManager queueManager;
    protected final DistributedQueueState dqs;
    /** Pending (possibly delayed) events consumed by the event-processor thread. */
    protected final DelayQueue<DistributedQueueEvent> events;
    // The executors and the event processor are (re)created by start() and read by
    // unsynchronized MBean getters, hence volatile.
    protected volatile ExecutorService remotingExecutorService;
    protected RemoteQueueTransferScheduler remotingScheduler;
    protected volatile ScheduledThreadPoolExecutor monitorExecutorService;
    /** Monitors currently scheduled, keyed by remote broker URL. */
    protected final ConcurrentHashMap<String, RemoteBrokerMonitor> remoteBrokerMonitors;
    /** Processors currently registered, keyed by queue name. */
    protected final ConcurrentHashMap<String, LocalToRemoteQueueProcessor> queueProcessors;
    /**
     * Scheduling handles for the entries of {@link #remoteBrokerMonitors}, keyed by the
     * same URL. Needed because a scheduled task can only be cancelled reliably through
     * the future returned when it was scheduled.
     */
    private final ConcurrentHashMap<String, ScheduledFuture<?>> remoteBrokerMonitorFutures;
    /** Leaf lock keeping the two monitor maps consistent with each other. */
    private final Object monitorLock = new Object();
    private volatile EventProcessor eventProcessor;
    private final AsyncHttpClientFactory httpFactory;
    private final AtomicLong remotingTakeTimeouts;
    private final AtomicLong remotingTakeNulls;

    /**
     * Creates a stopped manager and, if JMX is enabled on the queue manager's
     * configuration, registers it as an MBean.
     *
     * @param configuration distributed queue settings
     * @param queueManager  the local queue manager whose queues are to be drained remotely
     */
    public DistributedQueueManager(DistributedQueueConfiguration configuration, QueueManager queueManager) {
        this.configuration = configuration;
        this.queueManager = queueManager;
        this.dqs = new DistributedQueueState(configuration);
        this.events = new DelayQueue<DistributedQueueEvent>();
        this.remoteBrokerMonitors = new ConcurrentHashMap<String, RemoteBrokerMonitor>();
        this.remoteBrokerMonitorFutures = new ConcurrentHashMap<String, ScheduledFuture<?>>();
        this.queueProcessors = new ConcurrentHashMap<String, LocalToRemoteQueueProcessor>();
        // NOTE(review): the connection timeout is passed for BOTH factory arguments; confirm
        // whether the second one was meant to be a different (e.g. request) timeout.
        this.httpFactory = new AsyncHttpClientFactory((int)configuration.getConnectionTimeout(), (int)configuration.getConnectionTimeout());
        this.remotingTakeTimeouts = new AtomicLong(0L);
        this.remotingTakeNulls = new AtomicLong(0L);
        this.started = new AtomicBoolean(false);
        this.registerMBean();
    }

    /**
     * Registers this instance with the configured MBean server under
     * {@code <jmxDomain>:name=DistributedQueueManager} when JMX is enabled.
     * Registration failures are logged and otherwise ignored.
     */
    protected void registerMBean() {
        if (!this.queueManager.getConfiguration().isJmxEnabled()) {
            return;
        }
        try {
            ObjectName name = new ObjectName(this.queueManager.getConfiguration().getJmxDomain() + ":name=DistributedQueueManager");
            this.queueManager.getConfiguration().getMBeanServer().registerMBean(this, name);
        }
        catch (Exception e) {
            logger.warn("Error while attempting to register DistributedQueueManager as an MBean: {}", e.toString());
        }
    }

    public DistributedQueueConfiguration getConfiguration() {
        return this.configuration;
    }

    public QueueManager getQueueManager() {
        return this.queueManager;
    }

    public DistributedQueueState getDistributedQueueState() {
        return this.dqs;
    }

    /** @return the live internal event queue (not a copy) */
    public DelayQueue<DistributedQueueEvent> getEventQueue() {
        return this.events;
    }

    /** @return the live map of scheduled monitors keyed by broker URL (not a copy) */
    public ConcurrentHashMap<String, RemoteBrokerMonitor> getRemoteBrokerMonitors() {
        return this.remoteBrokerMonitors;
    }

    /** @return the live map of registered processors keyed by queue name (not a copy) */
    public ConcurrentHashMap<String, LocalToRemoteQueueProcessor> getQueueProcessors() {
        return this.queueProcessors;
    }

    @Override
    public Integer getAreaId() {
        return this.configuration.getAreaId();
    }

    @Override
    public String getGroupName() {
        return this.configuration.getGroupName();
    }

    @Override
    public String[] getRemoteBrokerMonitorNames() {
        return this.remoteBrokerMonitors.keySet().toArray(new String[0]);
    }

    @Override
    public int getEventQueueSize() {
        return this.events.size();
    }

    @Override
    public int getLocalToRemoteQueueProcessorSize() {
        return this.queueProcessors.size();
    }

    @Override
    public String[] getLocalToRemoteQueueProcessorNames() {
        return this.queueProcessors.keySet().toArray(new String[0]);
    }

    @Override
    public boolean isStarted() {
        return this.started.get();
    }

    @Override
    public boolean isStopped() {
        return !this.started.get();
    }

    /** @return {@code false} if the manager was never started or the event thread has ended */
    @Override
    public boolean isEventProcessorAlive() {
        EventProcessor processor = this.eventProcessor;
        return processor != null && processor.isAlive();
    }

    /** @return {@code false} if the monitor pool was never created or has been shut down */
    @Override
    public boolean isMonitorThreadPoolAlive() {
        ScheduledThreadPoolExecutor executor = this.monitorExecutorService;
        return executor != null && !executor.isTerminated() && !executor.isShutdown();
    }

    /** @return {@code false} if the transfer pool was never created or has been shut down */
    @Override
    public boolean isTransferThreadPoolAlive() {
        ExecutorService executor = this.remotingExecutorService;
        return executor != null && !executor.isTerminated() && !executor.isShutdown();
    }

    /** @return current monitor pool size, or {@code 0} if the pool was never created */
    @Override
    public int getMonitorThreadsSize() {
        ScheduledThreadPoolExecutor executor = this.monitorExecutorService;
        return executor == null ? 0 : executor.getPoolSize();
    }

    /**
     * @return current transfer pool size, or {@code 0} if the pool was never created or
     *         is not a {@link ThreadPoolExecutor} (the field is protected and may be replaced)
     */
    @Override
    public int getTransferThreadsSize() {
        ExecutorService executor = this.remotingExecutorService;
        if (executor instanceof ThreadPoolExecutor) {
            return ((ThreadPoolExecutor)executor).getPoolSize();
        }
        return 0;
    }

    @Override
    public long getRemotingTakeTimeouts() {
        return this.remotingTakeTimeouts.get();
    }

    /** @return the counter value after incrementing */
    public long incrementRemotingTakeTimeouts() {
        return this.remotingTakeTimeouts.incrementAndGet();
    }

    @Override
    public long getRemotingTakeNulls() {
        return this.remotingTakeNulls.get();
    }

    /** @return the counter value after incrementing */
    public long incrementRemotingTakeNulls() {
        return this.remotingTakeNulls.incrementAndGet();
    }

    /**
     * Starts the manager: registers listeners, creates the monitor and transfer
     * thread pools, starts the event-processor thread and finally adds every
     * configured remote broker to the distributed queue state. A call while
     * already started is logged and ignored.
     */
    @Override
    public synchronized void start() {
        if (this.isStarted()) {
            logger.warn("Ignoring duplicate start() request");
            return;
        }
        this.queueManager.addListener((QueueManagerListener)this);
        this.dqs.addListener(this);

        logger.debug("Starting monitoring ExecutorService with {} threads.", this.configuration.getMonitorThreads());
        ScheduledThreadPoolExecutor monitorPool = new ScheduledThreadPoolExecutor(this.configuration.getMonitorThreads(), DistributedQueueManager.daemonThreadFactory("CHMQ-RemoteBrokerMonitor-"));
        monitorPool.prestartAllCoreThreads();
        this.monitorExecutorService = monitorPool;
        synchronized (this.monitorLock) {
            this.remoteBrokerMonitors.clear();
            this.remoteBrokerMonitorFutures.clear();
        }

        // the scheduler doubles as the work queue of the transfer pool
        this.remotingScheduler = new FairRemoteQueueTransferScheduler(this.configuration.getMaxConcurrentRequests(), this.queueManager);
        logger.debug("Starting remoting ExecutorService with {} request and {} response threads.", this.configuration.getRemotingRequestThreads(), this.configuration.getRemotingResponseThreads());
        ThreadPoolExecutor transferPool = new ThreadPoolExecutor(this.configuration.getRemotingRequestThreads(), this.configuration.getRemotingRequestThreads(), 5000L, TimeUnit.MILLISECONDS, this.remotingScheduler, DistributedQueueManager.daemonThreadFactory("CHMQ-RemoteQueueTransfer-"), new RejectedExecutionHandler(){

            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                logger.warn("Rejected execution of {} by executor with {} current threads in the pool", r.getClass().getName(), executor.getPoolSize());
            }
        });
        transferPool.prestartAllCoreThreads();
        this.remotingExecutorService = transferPool;

        EventProcessor processor = new EventProcessor();
        this.eventProcessor = processor;
        processor.start();

        this.started.set(true);
        // added last: each addition notifies this manager, which schedules a monitor
        for (URL url : this.configuration.getRemoteBrokers()) {
            this.dqs.addRemoteBroker(new RemoteBrokerInfo(url.toString()));
        }
    }

    /**
     * Stops the manager: kills all queue processors, unregisters listeners,
     * shuts down both thread pools, interrupts the event-processor thread and
     * clears the distributed queue state. A call while already stopped is
     * logged and ignored.
     */
    @Override
    public synchronized void stop() {
        if (this.isStopped()) {
            logger.warn("Ignoring duplicate stop() request");
            return;
        }
        this.killAllLocalToRemoteQueueProcessors();
        this.queueManager.removeListener((QueueManagerListener)this);
        this.dqs.removeListener(this);
        this.monitorExecutorService.shutdownNow();
        synchronized (this.monitorLock) {
            this.remoteBrokerMonitors.clear();
            this.remoteBrokerMonitorFutures.clear();
        }
        this.eventProcessor.interrupt();
        this.remotingExecutorService.shutdownNow();
        this.remotingScheduler = null;
        this.dqs.clear();
        this.started.set(false);
    }

    /**
     * Builds a factory producing daemon threads named {@code namePrefix + n},
     * where {@code n} counts up from 0 per factory.
     */
    private static ThreadFactory daemonThreadFactory(final String namePrefix) {
        return new ThreadFactory(){
            private final AtomicInteger sequence = new AtomicInteger();

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName(namePrefix + this.sequence.getAndIncrement());
                t.setDaemon(true);
                return t;
            }
        };
    }

    /**
     * Starts a processor draining the named local queue to its remote queue,
     * provided that: the manager is started, the local queue exists and has no
     * local consumers, the remote queue is known and available, and no live
     * processor is already registered for the queue. Each failed precondition
     * is logged and the request dropped.
     */
    private synchronized void startLocalToRemoteQueueProcessor(String queueName) {
        // An event may be dequeued just before stop() and only get the monitor afterwards;
        // starting a processor then would leak it (the scheduler is already gone).
        if (this.isStopped()) {
            logger.warn("Ignoring request to startLocalToRemoteQueueProcessor for queue [{}]: the DistributedQueueManager is stopped", queueName);
            return;
        }
        Queue localQueue;
        try {
            localQueue = this.queueManager.getQueue(queueName);
        }
        catch (QueueInvalidStateException e) {
            logger.error("Unable to complete startLocalToRemoteQueueProcessor since QueueManager is in an invalid state", e);
            return;
        }
        if (localQueue == null) {
            logger.warn("Unable to complete startLocalToRemoteQueueProcessor since local queue [{}] does not exist - perhaps a delayed event?", queueName);
            return;
        }
        if (localQueue.getConsumerCount() > 0) {
            logger.warn("Ignoring request to startLocalToRemoteQueueProcessor for queue [{}]: there are currently [{}] local consumers", queueName, localQueue.getConsumerCount());
            return;
        }
        RemoteQueueInfo remoteQueue = this.dqs.getRemoteQueue(queueName);
        if (remoteQueue == null) {
            logger.error("Ignoring request to startLocalToRemoteQueueProcessor for queue [{}]: there is no remote queue for it in DistributedQueueState", queueName);
            return;
        }
        if (remoteQueue.isNotAvailable()) {
            logger.error("Ignoring request to startLocalToRemoteQueueProcessor for queue [{}]: the remote queue is not available (perhaps it flapped?)", queueName);
            return;
        }
        LocalToRemoteQueueProcessor existing = this.queueProcessors.get(queueName);
        if (existing != null && !existing.isKilled() && existing.isAlive()) {
            logger.error("Ignoring request to startLocalToRemoteQueueProcessor for queue [{}]: the processor already exists and is still alive", queueName);
            return;
        }
        logger.debug("Creating and starting LocalToRemoteQueueProcessor for queue [{}]", localQueue.getName());
        try {
            LocalToRemoteQueueProcessor queueProcessor = new LocalToRemoteQueueProcessor(this, localQueue, remoteQueue, this.httpFactory);
            this.queueProcessors.put(queueName, queueProcessor);
            queueProcessor.start();
            this.remotingScheduler.addLocalToRemoteQueueProcessor(localQueue, queueProcessor);
        }
        catch (Exception e) {
            logger.error("Error while starting and adding new LocalToRemoteQueueProcessor.", e);
        }
    }

    /**
     * Unregisters and kills the processor for the named queue, then removes
     * its local queue from the transfer scheduler. A missing processor is
     * logged and ignored; errors during the kill are logged.
     */
    private synchronized void killLocalToRemoteQueueProcessor(String queueName) {
        LocalToRemoteQueueProcessor queueProcessor = this.queueProcessors.remove(queueName);
        if (queueProcessor == null) {
            logger.warn("Ignoring request to killLocalToRemoteQueueProcessor for queue [{}]: no processor currently exists", queueName);
            return;
        }
        logger.debug("Removing and destroying LocalToRemoteQueueProcessor for queue [{}]", queueName);
        try {
            queueProcessor.kill();
            this.remotingScheduler.removeLocalToRemoteQueueProcessor(queueProcessor.getLocalQueue());
        }
        catch (Exception e) {
            logger.error("Error while stopping and removing new LocalToRemoteQueueProcessor.", e);
        }
    }

    /** Kills every currently registered processor. */
    private synchronized void killAllLocalToRemoteQueueProcessors() {
        // ConcurrentHashMap key-set iteration tolerates the removals done by the callee
        for (String queueName : this.queueProcessors.keySet()) {
            this.killLocalToRemoteQueueProcessor(queueName);
        }
    }

    /** Logs the full distributed queue state when TRACE logging is enabled. */
    private void traceState() {
        if (logger.isTraceEnabled()) {
            logger.trace(this.dqs.toDebugString());
        }
    }

    /** Queues a {@link Event#LOCAL_QUEUE_CREATED} event, delayed by the configured local-queue-create delay. */
    public void notifyQueueCreated(Queue queue) {
        logger.debug("Received notification that queue [{}] created, going to wait {} ms before checking if a remote processor should start", queue.getName(), this.configuration.getLocalQueueCreateDelay());
        this.events.put(new DistributedQueueEvent(queue.getName(), Event.LOCAL_QUEUE_CREATED, this.configuration.getLocalQueueCreateDelay()));
    }

    /** Queues a {@link Event#LOCAL_QUEUE_DESTROYED} event. */
    public void notifyQueueDestroyed(Queue queue) {
        logger.debug("Received notification that queue [{}] destroyed", queue.getName());
        this.events.put(new DistributedQueueEvent(queue.getName(), Event.LOCAL_QUEUE_DESTROYED));
    }

    /** Queues a {@link Event#LOCAL_CONSUMER_STARTED} event. */
    public void notifyAtLeastOneQueueConsumerStarted(Queue queue) {
        logger.debug("Received notification that queue [{}] now has at least one local consumer", queue.getName());
        this.events.put(new DistributedQueueEvent(queue.getName(), Event.LOCAL_CONSUMER_STARTED));
    }

    /** Queues a {@link Event#LOCAL_CONSUMER_STOPPED} event, delayed by the configured flapping delay. */
    public void notifyAllQueueConsumersStopped(Queue queue) {
        logger.debug("Received notification that queue [{}] now has no local consumers", queue.getName());
        logger.debug("To prevent possible flapping, will delay processing event that all local consumers stopped on queue [{}] for {} ms", queue.getName(), this.configuration.getLocalConsumerFlappingDelay());
        this.events.put(new DistributedQueueEvent(queue.getName(), Event.LOCAL_CONSUMER_STOPPED, this.configuration.getLocalConsumerFlappingDelay()));
    }

    /**
     * Schedules a {@link RemoteBrokerMonitor} for the broker unless one is
     * already registered for its URL. The monitor first runs after 1000 ms and
     * then repeats with the configured monitor interval as fixed delay.
     */
    @Override
    public void notifyRemoteBrokerAdded(RemoteBrokerInfo bi) {
        String url = bi.getUrl();
        logger.debug("Received notification that remote broker [{}] was added", url);
        this.traceState();
        synchronized (this.monitorLock) {
            if (this.remoteBrokerMonitors.containsKey(url)) {
                return;
            }
            RemoteBrokerMonitor monitor = new RemoteBrokerMonitor(this.configuration, this.dqs, bi, this.httpFactory);
            logger.debug("Scheduling monitor for RemoteBroker [{}] to run every [{} ms]", url, this.configuration.getMonitorInterval());
            ScheduledFuture<?> future = this.monitorExecutorService.scheduleWithFixedDelay(monitor, 1000L, this.configuration.getMonitorInterval(), TimeUnit.MILLISECONDS);
            // register only once scheduling succeeded so a failure leaves no stale entry
            this.remoteBrokerMonitors.put(url, monitor);
            this.remoteBrokerMonitorFutures.put(url, future);
        }
    }

    /**
     * Cancels and unregisters the monitor scheduled for the broker's URL, if
     * any. A run already in progress is allowed to finish.
     */
    @Override
    public void notifyRemoteBrokerRemoved(RemoteBrokerInfo bi) {
        String url = bi.getUrl();
        logger.debug("Received notification that remote broker [{}] was removed", url);
        this.traceState();
        synchronized (this.monitorLock) {
            RemoteBrokerMonitor monitor = this.remoteBrokerMonitors.remove(url);
            if (monitor == null) {
                return;
            }
            logger.debug("Removing monitor for RemoteBroker [{}]", url);
            // ThreadPoolExecutor.remove(Runnable) cannot find a task submitted through the
            // scheduling methods (it is queued wrapped in a future), so cancel via the handle.
            ScheduledFuture<?> future = this.remoteBrokerMonitorFutures.remove(url);
            if (future != null) {
                future.cancel(false);
            }
            ScheduledThreadPoolExecutor executor = this.monitorExecutorService;
            if (executor != null) {
                // drop the cancelled task from the work queue right away
                executor.purge();
            }
        }
    }

    /** Logs the change; no further action is taken. */
    @Override
    public void notifyRemoteBrokerStateChanged(RemoteBrokerInfo bi, int state) {
        logger.debug("Received notification that remote broker [{}] had a state change", bi.getUrl());
        this.traceState();
    }

    /** Logs the addition; no further action is taken. */
    @Override
    public void notifyRemoteQueueAdded(RemoteQueueInfo qi) {
        logger.debug("Received notification that remote queue [{}] was added", qi.getName());
        this.traceState();
    }

    /** Logs the removal; no further action is taken. */
    @Override
    public void notifyRemoteQueueRemoved(RemoteQueueInfo qi) {
        logger.debug("Received notification that remote queue [{}] was removed", qi.getName());
        this.traceState();
    }

    /**
     * Queues {@link Event#REMOTE_QUEUE_AVAILABLE} or
     * {@link Event#REMOTE_QUEUE_NOT_AVAILABLE} according to the queue's
     * current availability; queues nothing if it reports neither.
     *
     * @param state index into {@link RemoteQueueInfo#STATES}, used for logging only
     */
    @Override
    public void notifyRemoteQueueStateChanged(RemoteQueueInfo qi, int state) {
        logger.debug("Received notification that remote queue [{}] has a new state [{}]", qi.getName(), RemoteQueueInfo.STATES[state]);
        this.traceState();
        if (qi.isAvailable()) {
            this.events.put(new DistributedQueueEvent(qi.getName(), Event.REMOTE_QUEUE_AVAILABLE));
        } else if (qi.isNotAvailable()) {
            this.events.put(new DistributedQueueEvent(qi.getName(), Event.REMOTE_QUEUE_NOT_AVAILABLE));
        }
    }

    /** Logs the change; no further action is taken. */
    @Override
    public void notifyRemoteQueueAttributesChanged(RemoteQueueInfo qi) {
        logger.debug("Received notification that remote queue [{}] had attributes change, but not its overall state", qi.getName());
        this.traceState();
    }

    /**
     * Daemon thread that takes events off the event queue as their delay
     * expires and starts or kills the matching queue processor. It ends when
     * interrupted.
     */
    private class EventProcessor
    extends Thread {
        public EventProcessor() {
            this.setDaemon(true);
            this.setName("CHMQ-DistributedQueueManager-EventProcessor");
        }

        @Override
        public void run() {
            logger.info("Event processing thread started");
            while (!Thread.interrupted()) {
                try {
                    DistributedQueueEvent event = DistributedQueueManager.this.events.take();
                    logger.info("Processing event {}", event);
                    this.dispatch(event);
                }
                catch (InterruptedException e) {
                    // interruption is the shutdown signal sent by stop()
                    break;
                }
                catch (RuntimeException e) {
                    // one bad event must not take down the only event-processing thread
                    logger.error("Unexpected error while processing event; continuing with next event", e);
                }
            }
            logger.info("Event processing thread ending");
        }

        /** Starts or kills the processor of the event's queue; unknown event types are ignored. */
        private void dispatch(DistributedQueueEvent event) {
            if (event.getEvent() == Event.REMOTE_QUEUE_AVAILABLE
                    || event.getEvent() == Event.LOCAL_CONSUMER_STOPPED
                    || event.getEvent() == Event.LOCAL_QUEUE_CREATED) {
                DistributedQueueManager.this.startLocalToRemoteQueueProcessor(event.getQueueName());
            } else if (event.getEvent() == Event.REMOTE_QUEUE_NOT_AVAILABLE
                    || event.getEvent() == Event.LOCAL_CONSUMER_STARTED
                    || event.getEvent() == Event.LOCAL_QUEUE_DESTROYED) {
                DistributedQueueManager.this.killLocalToRemoteQueueProcessor(event.getQueueName());
            }
        }
    }

    /** Kinds of events handled by the event-processor thread. */
    public static enum Event {
        REMOTE_QUEUE_AVAILABLE,
        REMOTE_QUEUE_NOT_AVAILABLE,
        LOCAL_CONSUMER_STARTED,
        LOCAL_CONSUMER_STOPPED,
        LOCAL_QUEUE_CREATED,
        LOCAL_QUEUE_DESTROYED;

    }
}

