/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.event.impl.jobs.queues;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.commons.threads.ThreadPool;
import org.apache.sling.commons.threads.ThreadPoolManager;
import org.apache.sling.event.impl.EventingThreadPool;
import org.apache.sling.event.impl.jobs.JobConsumerManager;
import org.apache.sling.event.impl.jobs.JobHandler;
import org.apache.sling.event.impl.jobs.JobImpl;
import org.apache.sling.event.impl.jobs.config.ConfigurationChangeListener;
import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
import org.apache.sling.event.impl.jobs.jmx.QueueStatusEvent;
import org.apache.sling.event.impl.jobs.jmx.QueuesMBeanImpl;
import org.apache.sling.event.impl.jobs.queues.JobQueueImpl;
import org.apache.sling.event.impl.jobs.queues.QueueServices;
import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.impl.support.ResourceHelper;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.Queue;
import org.apache.sling.event.jobs.jmx.QueuesMBean;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferencePolicyOption;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.event.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(immediate=true, service={Runnable.class, QueueManager.class, EventHandler.class}, property={"scheduler.period:Long=60", "scheduler.concurrent:Boolean=false", "event.topics=org/apache/sling/event/notification/job/ADDED", "service.vendor=The Apache Software Foundation"})
public class QueueManager
implements Runnable,
EventHandler,
ConfigurationChangeListener {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Reference(policyOption=ReferencePolicyOption.GREEDY)
    private EventAdmin eventAdmin;
    @Reference
    private JobConsumerManager jobConsumerManager;
    @Reference
    private QueuesMBean queuesMBean;
    @Reference(policyOption=ReferencePolicyOption.GREEDY)
    private ThreadPoolManager threadPoolManager;
    @Reference(service=EventingThreadPool.class)
    private ThreadPool threadPool;
    @Reference
    private JobManagerConfiguration configuration;
    @Reference
    private StatisticsManager statisticsManager;
    private final Object queuesLock = new Object();
    private final Map<String, JobQueueImpl> queues = new ConcurrentHashMap<String, JobQueueImpl>();
    private volatile long schedulerRuns;
    private final AtomicBoolean isActive = new AtomicBoolean(false);
    private volatile QueueServices queueServices;
    private final Set<String> haltedTopics = new ConcurrentSkipListSet<String>();

    static QueueManager newForTest(EventAdmin eventAdmin, JobConsumerManager jobConsumerManager, QueuesMBean queuesMBean, ThreadPoolManager threadPoolManager, ThreadPool threadPool, JobManagerConfiguration configuration, StatisticsManager statisticsManager) {
        QueueManager qm = new QueueManager();
        qm.eventAdmin = eventAdmin;
        qm.jobConsumerManager = jobConsumerManager;
        qm.queuesMBean = queuesMBean;
        qm.threadPoolManager = threadPoolManager;
        qm.threadPool = threadPool;
        qm.configuration = configuration;
        qm.statisticsManager = statisticsManager;
        return qm;
    }

    @Activate
    protected void activate(Map<String, Object> props) {
        this.logger.info("Apache Sling Queue Manager starting on instance {}", (Object)Environment.APPLICATION_ID);
        this.queueServices = new QueueServices();
        this.queueServices.configuration = this.configuration;
        this.queueServices.eventAdmin = this.eventAdmin;
        this.queueServices.jobConsumerManager = this.jobConsumerManager;
        this.queueServices.threadPoolManager = this.threadPoolManager;
        this.queueServices.statisticsManager = this.statisticsManager;
        this.queueServices.eventingThreadPool = this.threadPool;
        this.configuration.addListener(this);
        this.logger.info("Apache Sling Queue Manager started on instance {}", (Object)Environment.APPLICATION_ID);
    }

    @Deactivate
    protected void deactivate() {
        this.logger.debug("Apache Sling Queue Manager stopping on instance {}", (Object)Environment.APPLICATION_ID);
        this.configuration.removeListener(this);
        for (JobQueueImpl jbq : this.queues.values()) {
            jbq.close();
            ((QueuesMBeanImpl)this.queuesMBean).sendEvent(new QueueStatusEvent(null, jbq));
        }
        this.queues.clear();
        this.queueServices = null;
        this.logger.info("Apache Sling Queue Manager stopped on instance {}", (Object)Environment.APPLICATION_ID);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void maintain() {
        boolean doFullCleanUp;
        ++this.schedulerRuns;
        this.logger.debug("Queue manager maintenance: Starting #{}", (Object)this.schedulerRuns);
        if (this.isActive.get()) {
            for (JobQueueImpl jbq : this.queues.values()) {
                jbq.maintain();
            }
        }
        if (this.schedulerRuns % 3L == 0L && this.isActive.get()) {
            this.fullTopicScan();
        }
        boolean bl = doFullCleanUp = this.schedulerRuns % 5L == 0L;
        if (doFullCleanUp) {
            this.logger.debug("Checking for idle queues...");
            Object object = this.queuesLock;
            synchronized (object) {
                Iterator<Map.Entry<String, JobQueueImpl>> i = this.queues.entrySet().iterator();
                while (i.hasNext()) {
                    Map.Entry<String, JobQueueImpl> current = i.next();
                    JobQueueImpl jbq = current.getValue();
                    if (!jbq.tryToClose()) continue;
                    this.logger.debug("Removing idle job queue {}", (Object)jbq);
                    i.remove();
                    ((QueuesMBeanImpl)this.queuesMBean).sendEvent(new QueueStatusEvent(null, jbq));
                }
            }
        }
        this.logger.debug("Queue manager maintenance: Finished #{}", (Object)this.schedulerRuns);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void start(QueueConfigurationManager.QueueInfo queueInfo, Set<String> topics) {
        InternalQueueConfiguration config = queueInfo.queueConfiguration;
        HashSet<String> filteredTopics = new HashSet<String>(topics);
        filteredTopics.removeAll(this.haltedTopics);
        boolean isNewQueue = false;
        JobQueueImpl queue = null;
        Object object = this.queuesLock;
        synchronized (object) {
            queue = this.queues.get(queueInfo.queueName);
            if (queue != null && queue.getConfiguration() != config) {
                this.outdateQueue(queue);
                queue = null;
            }
            if (queue == null && (queue = JobQueueImpl.createQueue(queueInfo.queueName, config, this.queueServices, filteredTopics, this.haltedTopics)) != null) {
                isNewQueue = true;
                this.queues.put(queueInfo.queueName, queue);
                ((QueuesMBeanImpl)this.queuesMBean).sendEvent(new QueueStatusEvent(queue, null));
            }
        }
        if (queue != null) {
            this.logger.debug("Starting queue {}", (Object)queueInfo.queueName);
            if (!isNewQueue) {
                queue.wakeUpQueue(filteredTopics);
            }
            queue.startJobs();
        }
    }

    @Override
    public void run() {
        this.maintain();
    }

    private void outdateQueue(JobQueueImpl queue) {
        String oldName = ResourceHelper.filterQueueName(queue.getName());
        this.queues.remove(oldName);
        if (queue.tryToClose()) {
            ((QueuesMBeanImpl)this.queuesMBean).sendEvent(new QueueStatusEvent(null, queue));
        } else {
            queue.outdate();
            String newName = ResourceHelper.filterName(queue.getName());
            int index = 0;
            while (this.queues.containsKey(newName)) {
                newName = ResourceHelper.filterName(queue.getName()) + '$' + String.valueOf(index++);
            }
            this.queues.put(newName, queue);
            ((QueuesMBeanImpl)this.queuesMBean).sendEvent(new QueueStatusEvent(queue, queue));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void restart() {
        Object object = this.queuesLock;
        synchronized (object) {
            ArrayList<JobQueueImpl> queues = new ArrayList<JobQueueImpl>(this.queues.values());
            for (JobQueueImpl queue : queues) {
                this.outdateQueue(queue);
            }
        }
        JobManagerConfiguration config = this.configuration;
        if (config != null) {
            List<Job> rescheduleList = this.configuration.clearJobRetryList();
            for (Job j : rescheduleList) {
                JobHandler jh = new JobHandler((JobImpl)j, null, this.configuration);
                jh.reschedule();
            }
        }
    }

    public Queue getQueue(String name) {
        return this.queues.get(name);
    }

    public Iterable<Queue> getQueues() {
        final Iterator<JobQueueImpl> jqI = this.queues.values().iterator();
        return new Iterable<Queue>(){

            @Override
            public Iterator<Queue> iterator() {
                return new Iterator<Queue>(){

                    @Override
                    public boolean hasNext() {
                        return jqI.hasNext();
                    }

                    @Override
                    public Queue next() {
                        return (Queue)jqI.next();
                    }

                    @Override
                    public void remove() {
                        throw new UnsupportedOperationException();
                    }
                };
            }
        };
    }

    @Override
    public void configurationChanged(boolean active) {
        if (this.configuration != null) {
            this.logger.debug("Topology changed {}", (Object)active);
            this.isActive.set(active);
            this.clearHaltedTopics("configurationChanged : unhalted topics due to configuration change");
            if (active) {
                this.fullTopicScan();
            } else {
                this.restart();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void clearHaltedTopics(String logPrefix) {
        String haltedTopicsToString;
        Set<String> set = this.haltedTopics;
        synchronized (set) {
            if (this.haltedTopics.isEmpty()) {
                return;
            }
            haltedTopicsToString = this.haltedTopics.toString();
            this.haltedTopics.clear();
        }
        this.logger.info(logPrefix + " : " + haltedTopicsToString);
    }

    void fullTopicScan() {
        this.logger.debug("Scanning repository for existing topics...");
        Set<String> topics = this.scanTopics();
        Map<QueueConfigurationManager.QueueInfo, Set<String>> mapping = this.updateTopicMapping(topics);
        for (Map.Entry<QueueConfigurationManager.QueueInfo, Set<String>> entry : mapping.entrySet()) {
            this.start(entry.getKey(), entry.getValue());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Set<String> scanTopics() {
        HashSet<String> topics = new HashSet<String>();
        try (ResourceResolver resolver = this.configuration.createResourceResolver();){
            Resource baseResource = resolver.getResource(this.configuration.getLocalJobsPath());
            if (baseResource != null) {
                Iterator topicIter = baseResource.listChildren();
                while (topicIter.hasNext()) {
                    Resource topicResource = (Resource)topicIter.next();
                    String topic = topicResource.getName().replace('.', '/');
                    this.logger.debug("Found topic {}", (Object)topic);
                    topics.add(topic);
                }
            }
        }
        return topics;
    }

    public void handleEvent(Event event) {
        if ("org/osgi/framework/BundleEvent/STARTED".equals(event.getTopic()) || "org/osgi/framework/BundleEvent/UPDATED".equals(event.getTopic())) {
            this.clearHaltedTopics("handleEvent: unhalted topics due to bundle started/updated event");
        }
        String topic = (String)event.getProperty("event.job.topic");
        if (this.isActive.get() && topic != null) {
            this.logger.debug("Received event {}", (Object)topic);
            QueueConfigurationManager.QueueInfo info = this.configuration.getQueueConfigurationManager().getQueueInfo(topic);
            this.start(info, Collections.singleton(topic));
        }
    }

    private Map<QueueConfigurationManager.QueueInfo, Set<String>> updateTopicMapping(Set<String> topics) {
        HashMap<QueueConfigurationManager.QueueInfo, Set<String>> mapping = new HashMap<QueueConfigurationManager.QueueInfo, Set<String>>();
        for (String topic : topics) {
            QueueConfigurationManager.QueueInfo queueInfo = this.configuration.getQueueConfigurationManager().getQueueInfo(topic);
            HashSet<String> queueTopics = (HashSet<String>)mapping.get(queueInfo);
            if (queueTopics == null) {
                queueTopics = new HashSet<String>();
                mapping.put(queueInfo, queueTopics);
            }
            queueTopics.add(topic);
        }
        this.logger.debug("Established new topic mapping: {}", mapping);
        return mapping;
    }

    protected void bindThreadPool(EventingThreadPool etp) {
        this.threadPool = etp;
    }

    protected void unbindThreadPool(EventingThreadPool etp) {
        if (this.threadPool == etp) {
            this.threadPool = null;
        }
    }
}

