/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.event.impl.jobs.queues;

import java.util.ArrayList;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.commons.threads.ThreadPool;
import org.apache.sling.event.impl.EventingThreadPool;
import org.apache.sling.event.impl.jobs.InternalJobState;
import org.apache.sling.event.impl.jobs.JobExecutionResultImpl;
import org.apache.sling.event.impl.jobs.JobHandler;
import org.apache.sling.event.impl.jobs.JobImpl;
import org.apache.sling.event.impl.jobs.JobTopicTraverser;
import org.apache.sling.event.impl.jobs.Utility;
import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
import org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifier;
import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
import org.apache.sling.event.impl.jobs.queues.QueueJobCache;
import org.apache.sling.event.impl.jobs.queues.QueueServices;
import org.apache.sling.event.impl.support.BatchResourceRemover;
import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.Queue;
import org.apache.sling.event.jobs.Statistics;
import org.apache.sling.event.jobs.consumer.JobExecutionContext;
import org.apache.sling.event.jobs.consumer.JobExecutionResult;
import org.apache.sling.event.jobs.consumer.JobExecutor;
import org.osgi.service.event.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractJobQueue
implements Queue,
JobStatusNotifier {
    private static final long MAX_SUSPEND_TIME = 3600000L;
    private static final long DEFAULT_WAIT_FOR_ACK_IN_MS = 60000L;
    protected final Logger logger;
    protected final InternalQueueConfiguration configuration;
    protected volatile String queueName;
    protected volatile boolean running;
    private volatile long suspendedSince = -1L;
    private final Object suspendLock = new Object();
    protected final QueueServices services;
    private final Map<String, JobHandler> processingJobsLists = new HashMap<String, JobHandler>();
    private final ThreadPool threadPool;
    private final Map<String, JobHandler> startedJobsLists = new HashMap<String, JobHandler>();
    private final AtomicInteger asyncCounter = new AtomicInteger();
    protected volatile boolean isWaiting = false;
    private final AtomicBoolean isOutdated = new AtomicBoolean(false);
    private final AtomicBoolean closeMarker = new AtomicBoolean(false);
    private final QueueJobCache cache;
    private volatile boolean isWaitingForNextJob = false;
    private final Object nextJobLock = new Object();

    public AbstractJobQueue(String name, InternalQueueConfiguration config, QueueServices services, Set<String> topics) {
        this.threadPool = config.getOwnThreadPoolSize() > 0 ? new EventingThreadPool(services.threadPoolManager, config.getOwnThreadPoolSize()) : Environment.THREAD_POOL;
        this.queueName = name;
        this.configuration = config;
        this.services = services;
        this.logger = LoggerFactory.getLogger((String)(this.getClass().getName() + '.' + name));
        this.running = true;
        this.cache = new QueueJobCache(services.configuration, config.getType(), topics);
    }

    @Override
    public InternalQueueConfiguration getConfiguration() {
        return this.configuration;
    }

    @Override
    public String getName() {
        return this.queueName;
    }

    @Override
    public Statistics getStatistics() {
        return this.services.statisticsManager.getQueueStatistics(this.queueName);
    }

    public void start() {
        Thread queueThread = new Thread(new Runnable(){

            @Override
            public void run() {
                while (AbstractJobQueue.this.running && !AbstractJobQueue.this.isOutdated()) {
                    AbstractJobQueue.this.logger.info("Starting job queue {}", (Object)AbstractJobQueue.this.queueName);
                    AbstractJobQueue.this.logger.debug("Configuration for job queue={}", (Object)AbstractJobQueue.this.configuration);
                    try {
                        AbstractJobQueue.this.runJobQueue();
                    }
                    catch (Throwable t) {
                        AbstractJobQueue.this.logger.error("Job queue " + AbstractJobQueue.this.queueName + " stopped with exception: " + t.getMessage() + ". Restarting.", t);
                    }
                }
            }
        }, "Apache Sling Job Queue " + this.queueName);
        queueThread.setDaemon(true);
        queueThread.start();
    }

    protected boolean isOutdated() {
        return this.isOutdated.get();
    }

    public void outdate() {
        if (this.isOutdated.compareAndSet(false, true)) {
            String name = this.getName() + "<outdated>(" + this.hashCode() + ")";
            this.logger.info("Outdating queue {}, renaming to {}.", (Object)this.queueName, (Object)name);
            this.queueName = name;
        }
    }

    public boolean tryToClose() {
        this.resume();
        if (this.canBeClosed()) {
            if (this.closeMarker.get()) {
                this.close();
                return true;
            }
            this.closeMarker.set(true);
        }
        return false;
    }

    protected boolean canBeClosed() {
        return !this.isWaiting && !this.isSuspended() && this.cache.isEmpty() && this.asyncCounter.get() == 0;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() {
        this.running = false;
        this.logger.debug("Shutting down job queue {}", (Object)this.queueName);
        this.resume();
        if (this.isWaiting) {
            this.logger.debug("Waking up waiting queue {}", (Object)this.queueName);
            this.notifyFinished(false);
        }
        this.stopWaitingForNextJob();
        Map<String, JobHandler> map = this.processingJobsLists;
        synchronized (map) {
            this.processingJobsLists.clear();
        }
        map = this.startedJobsLists;
        synchronized (map) {
            this.startedJobsLists.clear();
        }
        if (this.configuration.getOwnThreadPoolSize() > 0) {
            ((EventingThreadPool)this.threadPool).release();
        }
        this.logger.info("Stopped job queue {}", (Object)this.queueName);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void checkForUnprocessedJobs() {
        if (this.running) {
            long tooOld = System.currentTimeMillis() - 60000L;
            ArrayList<JobHandler> restartJobs = new ArrayList<JobHandler>();
            Map<String, JobHandler> map = this.startedJobsLists;
            synchronized (map) {
                for (Map.Entry<String, JobHandler> entry : this.startedJobsLists.entrySet()) {
                    if (entry.getValue().started > tooOld) continue;
                    restartJobs.add(entry.getValue());
                }
            }
            if (restartJobs.size() > 0) {
                try {
                    Thread.sleep(500L);
                }
                catch (InterruptedException e) {
                    this.ignoreException(e);
                    Thread.currentThread().interrupt();
                }
            }
            for (JobHandler handler : restartJobs) {
                boolean process = false;
                Map<String, JobHandler> map2 = this.startedJobsLists;
                synchronized (map2) {
                    process = this.startedJobsLists.remove(handler.getJob().getId()) != null;
                }
                if (!process || !handler.reschedule()) continue;
                this.logger.info("No acknowledge received for job {} stored at {}. Requeueing job.", (Object)Utility.toString(handler.getJob()), (Object)handler.getJob().getId());
                handler.getJob().retry();
                this.requeue(handler);
                this.notifyFinished(true);
            }
        }
    }

    private void runJobQueue() {
        while (this.running && !this.isOutdated()) {
            JobHandler info = null;
            if (info == null) {
                info = this.take();
            }
            if (!this.running || info == null || this.isOutdated() || this.checkSuspended(info)) continue;
            this.start(info);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private JobHandler take() {
        this.logger.debug("Taking new job for {}", (Object)this.queueName);
        JobImpl result = null;
        while (result == null && !this.isOutdated() && this.running) {
            this.isWaitingForNextJob = true;
            result = this.cache.getNextJob();
            if (result == null && !this.isOutdated() && this.running) {
                Object object = this.nextJobLock;
                synchronized (object) {
                    while (this.isWaitingForNextJob) {
                        try {
                            this.nextJobLock.wait(20000L);
                            this.isWaitingForNextJob = false;
                        }
                        catch (InterruptedException ignore) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            }
            this.isWaitingForNextJob = false;
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Returning job for {} : {}", (Object)this.queueName, (Object)Utility.toString(result));
        }
        return result != null ? new JobHandler(result, this.services.configuration) : null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void stopWaitingForNextJob() {
        Object object = this.nextJobLock;
        synchronized (object) {
            this.isWaitingForNextJob = false;
            this.nextJobLock.notify();
        }
    }

    public void wakeUpQueue(Set<String> topics) {
        this.cache.handleNewTopics(topics);
        this.stopWaitingForNextJob();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void requeue(JobHandler handler) {
        this.cache.reschedule(handler);
        Object object = this.nextJobLock;
        synchronized (object) {
            this.nextJobLock.notify();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean checkSuspended(JobHandler handler) {
        boolean wasSuspended = false;
        Object object = this.suspendLock;
        synchronized (object) {
            while (this.suspendedSince != -1L) {
                this.requeue(handler);
                this.logger.debug("Sleeping as queue {} is suspended.", (Object)this.getName());
                wasSuspended = true;
                long diff = System.currentTimeMillis() - this.suspendedSince;
                try {
                    this.suspendLock.wait(3600000L - diff);
                }
                catch (InterruptedException ignore) {
                    this.ignoreException(ignore);
                    Thread.currentThread().interrupt();
                }
                this.logger.debug("Waking up queue {}.", (Object)this.getName());
                if (System.currentTimeMillis() <= this.suspendedSince + 3600000L) continue;
                this.resume();
            }
        }
        return wasSuspended;
    }

    protected boolean executeJob(JobHandler handler) {
        JobImpl job = handler.getJob();
        JobExecutor consumer = this.services.jobConsumerManager.getExecutor(job.getTopic());
        if (consumer != null || job.isBridgedEvent() && this.services.jobConsumerManager.supportsBridgedEvents()) {
            boolean success = this.startJobExecution(handler, consumer);
            return success;
        }
        handler.reassign();
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean startJobExecution(final JobHandler handler, final JobExecutor consumer) {
        this.closeMarker.set(false);
        final JobImpl job = handler.getJob();
        if (handler.startProcessing(this)) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Starting job {}", (Object)Utility.toString(job));
            }
            try {
                handler.started = System.currentTimeMillis();
                if (consumer != null) {
                    long queueTime = handler.started - handler.queued;
                    NotificationUtility.sendNotification(this.services.eventAdmin, "org/apache/sling/event/notification/job/START", job, queueTime);
                    Map<String, JobHandler> map = this.processingJobsLists;
                    synchronized (map) {
                        this.processingJobsLists.put(job.getId(), handler);
                    }
                    Runnable task = new Runnable(){

                        /*
                         * WARNING - Removed try catching itself - possible behaviour change.
                         */
                        @Override
                        public void run() {
                            final Object lock = new Object();
                            Thread currentThread = Thread.currentThread();
                            String oldName = currentThread.getName();
                            int oldPriority = currentThread.getPriority();
                            currentThread.setName(oldName + "-" + job.getQueueName() + "(" + job.getTopic() + ")");
                            if (AbstractJobQueue.this.configuration.getThreadPriority() != null) {
                                switch (AbstractJobQueue.this.configuration.getThreadPriority()) {
                                    case NORM: {
                                        currentThread.setPriority(5);
                                        break;
                                    }
                                    case MIN: {
                                        currentThread.setPriority(1);
                                        break;
                                    }
                                    case MAX: {
                                        currentThread.setPriority(10);
                                    }
                                }
                            }
                            JobExecutionResultImpl result = JobExecutionResultImpl.CANCELLED;
                            Job.JobState resultState = Job.JobState.ERROR;
                            final AtomicBoolean isAsync = new AtomicBoolean(false);
                            try {
                                Object object = lock;
                                synchronized (object) {
                                    JobExecutionContext ctx = new JobExecutionContext(){
                                        private boolean hasInit = false;

                                        @Override
                                        public void initProgress(int steps, long eta) {
                                            if (!this.hasInit) {
                                                handler.persistJobProperties(job.startProgress(steps, eta));
                                                this.hasInit = true;
                                            }
                                        }

                                        @Override
                                        public void incrementProgressCount(int steps) {
                                            if (this.hasInit) {
                                                handler.persistJobProperties(job.setProgress(steps));
                                            }
                                        }

                                        @Override
                                        public void updateProgress(long eta) {
                                            if (this.hasInit) {
                                                handler.persistJobProperties(job.update(eta));
                                            }
                                        }

                                        @Override
                                        public void log(String message, Object ... args) {
                                            handler.persistJobProperties(job.log(message, args));
                                        }

                                        @Override
                                        public boolean isStopped() {
                                            return handler.isStopped();
                                        }

                                        /*
                                         * WARNING - Removed try catching itself - possible behaviour change.
                                         */
                                        @Override
                                        public void asyncProcessingFinished(JobExecutionResult result) {
                                            Object object = lock;
                                            synchronized (object) {
                                                Job.JobState state;
                                                if (isAsync.compareAndSet(true, false)) {
                                                    AbstractJobQueue.this.services.jobConsumerManager.unregisterListener(job.getId());
                                                    state = null;
                                                    if (result.succeeded()) {
                                                        state = Job.JobState.SUCCEEDED;
                                                    } else if (result.failed()) {
                                                        state = Job.JobState.QUEUED;
                                                    } else if (result.cancelled()) {
                                                        state = handler.isStopped() ? Job.JobState.STOPPED : Job.JobState.ERROR;
                                                    }
                                                } else {
                                                    throw new IllegalStateException("Job is not processed async " + job.getId());
                                                }
                                                AbstractJobQueue.this.finishedJob(job.getId(), state, true);
                                                AbstractJobQueue.this.asyncCounter.decrementAndGet();
                                            }
                                        }

                                        @Override
                                        public JobExecutionContext.ResultBuilder result() {
                                            return new JobExecutionContext.ResultBuilder(){
                                                private String message;
                                                private Long retryDelayInMs;

                                                @Override
                                                public JobExecutionResult failed(long retryDelayInMs) {
                                                    this.retryDelayInMs = retryDelayInMs;
                                                    return new JobExecutionResultImpl(InternalJobState.FAILED, this.message, retryDelayInMs);
                                                }

                                                @Override
                                                public JobExecutionContext.ResultBuilder message(String message) {
                                                    this.message = message;
                                                    return this;
                                                }

                                                @Override
                                                public JobExecutionResult succeeded() {
                                                    return new JobExecutionResultImpl(InternalJobState.SUCCEEDED, this.message, this.retryDelayInMs);
                                                }

                                                @Override
                                                public JobExecutionResult failed() {
                                                    return new JobExecutionResultImpl(InternalJobState.FAILED, this.message, this.retryDelayInMs);
                                                }

                                                @Override
                                                public JobExecutionResult cancelled() {
                                                    return new JobExecutionResultImpl(InternalJobState.CANCELLED, this.message, this.retryDelayInMs);
                                                }
                                            };
                                        }
                                    };
                                    result = (JobExecutionResultImpl)consumer.process(job, ctx);
                                    if (result == null) {
                                        AbstractJobQueue.this.services.jobConsumerManager.registerListener(job.getId(), consumer, ctx);
                                        AbstractJobQueue.this.asyncCounter.incrementAndGet();
                                        isAsync.set(true);
                                    } else if (result.succeeded()) {
                                        resultState = Job.JobState.SUCCEEDED;
                                    } else if (result.failed()) {
                                        resultState = Job.JobState.QUEUED;
                                    } else if (result.cancelled()) {
                                        resultState = handler.isStopped() ? Job.JobState.STOPPED : Job.JobState.ERROR;
                                    }
                                }
                            }
                            catch (Throwable t) {
                                AbstractJobQueue.this.logger.error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + Utility.toString(job), t);
                                result = JobExecutionResultImpl.CANCELLED;
                                resultState = Job.JobState.ERROR;
                            }
                            finally {
                                currentThread.setPriority(oldPriority);
                                currentThread.setName(oldName);
                                if (result != null) {
                                    if (result.getRetryDelayInMs() != null) {
                                        job.setProperty(":slingevent:delayOverride", result.getRetryDelayInMs());
                                    }
                                    if (result.getMessage() != null) {
                                        job.setProperty("slingevent:resultMessage", result.getMessage());
                                    }
                                    AbstractJobQueue.this.finishedJob(job.getId(), resultState, false);
                                }
                            }
                        }
                    };
                    ThreadPool pool = this.threadPool;
                    if (pool != null) {
                        pool.execute(task);
                    } else {
                        new Thread(task).start();
                    }
                } else {
                    Map<String, JobHandler> queueTime = this.startedJobsLists;
                    synchronized (queueTime) {
                        this.startedJobsLists.put(job.getId(), handler);
                    }
                    Event jobEvent = this.getJobEvent(handler);
                    this.services.eventAdmin.postEvent(jobEvent);
                }
                return true;
            }
            catch (Exception re) {
                this.logger.error("Exception during job processing.", (Throwable)re);
            }
        } else if (this.logger.isDebugEnabled()) {
            this.logger.debug("Discarding removed job {}", (Object)Utility.toString(job));
        }
        return false;
    }

    private RescheduleInfo handleReschedule(JobHandler handler, Job.JobState resultState) {
        RescheduleInfo info = new RescheduleInfo();
        switch (resultState) {
            case SUCCEEDED: {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Finished job {}", (Object)Utility.toString(handler.getJob()));
                }
                info.processingTime = System.currentTimeMillis() - handler.started;
                NotificationUtility.sendNotification(this.services.eventAdmin, "org/apache/sling/event/notification/job/FINISHED", handler.getJob(), info.processingTime);
                break;
            }
            case QUEUED: {
                int retries = (Integer)handler.getJob().getProperty("event.job.retries");
                int retryCount = (Integer)handler.getJob().getProperty("event.job.retrycount");
                if (retries != -1 && ++retryCount > retries) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Cancelled job {}", (Object)Utility.toString(handler.getJob()));
                    }
                    NotificationUtility.sendNotification(this.services.eventAdmin, "org/apache/sling/event/notification/job/CANCELLED", handler.getJob(), null);
                    break;
                }
                info.reschedule = true;
                handler.getJob().retry();
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Failed job {}", (Object)Utility.toString(handler.getJob()));
                }
                handler.queued = System.currentTimeMillis();
                NotificationUtility.sendNotification(this.services.eventAdmin, "org/apache/sling/event/notification/job/FAILED", handler.getJob(), null);
                break;
            }
            default: {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Cancelled job {}", (Object)Utility.toString(handler.getJob()));
                }
                NotificationUtility.sendNotification(this.services.eventAdmin, "org/apache/sling/event/notification/job/CANCELLED", handler.getJob(), null);
            }
        }
        return info;
    }

    @Override
    public boolean finishedJob(Event job, boolean shouldReschedule) {
        String location = (String)job.getProperty("slingevent:eventId");
        return this.finishedJob(location, shouldReschedule ? Job.JobState.QUEUED : Job.JobState.SUCCEEDED, false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean finishedJob(String jobId, Job.JobState resultState, boolean isAsync) {
        JobHandler handler;
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Received finish for job {}, resultState={}", (Object)jobId, (Object)resultState);
        }
        Map<String, JobHandler> map = this.startedJobsLists;
        synchronized (map) {
            this.startedJobsLists.remove(jobId);
        }
        Map<String, JobHandler> map2 = this.processingJobsLists;
        synchronized (map2) {
            handler = this.processingJobsLists.remove(jobId);
        }
        if (!this.running) {
            this.logger.warn("Queue is not running anymore. Discarding finish for {}", (Object)jobId);
            return false;
        }
        if (handler == null) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("This job has never been started by this queue: {}", (Object)jobId);
            }
            return false;
        }
        RescheduleInfo rescheduleInfo = this.handleReschedule(handler, resultState);
        if (resultState == Job.JobState.QUEUED && !rescheduleInfo.reschedule) {
            resultState = Job.JobState.GIVEN_UP;
        }
        if (!rescheduleInfo.reschedule) {
            boolean keepJobs = resultState != Job.JobState.SUCCEEDED || this.configuration.isKeepJobs();
            handler.finished(resultState, keepJobs, rescheduleInfo.processingTime);
        } else {
            this.reschedule(handler);
        }
        this.notifyFinished(rescheduleInfo.reschedule);
        return rescheduleInfo.reschedule;
    }

    private Event getJobEvent(JobHandler info) {
        String eventTopic = info.getJob().getTopic();
        Hashtable<String, Object> properties = new Hashtable<String, Object>();
        for (String name : info.getJob().getPropertyNames()) {
            ((Dictionary)properties).put(name, info.getJob().getProperty(name));
        }
        ((Dictionary)properties).put(JobStatusNotifier.CONTEXT_PROPERTY_NAME, new JobStatusNotifier.NotifierContext(this));
        ((Dictionary)properties).remove("event.distribute");
        ((Dictionary)properties).remove("event.application");
        return new Event(eventTopic, properties);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean sendAcknowledge(Event job) {
        JobHandler ack;
        String jobId = (String)job.getProperty("slingevent:eventId");
        Map<String, JobHandler> map = this.startedJobsLists;
        synchronized (map) {
            ack = this.startedJobsLists.remove(jobId);
        }
        if (ack != null) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Received ack for job {}", (Object)Utility.toString(ack.getJob()));
            }
            long queueTime = ack.started - ack.queued;
            NotificationUtility.sendNotification(this.services.eventAdmin, "org/apache/sling/event/notification/job/START", ack.getJob(), queueTime);
            Map<String, JobHandler> map2 = this.processingJobsLists;
            synchronized (map2) {
                this.processingJobsLists.put(jobId, ack);
            }
        }
        return ack != null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void resume() {
        Object object = this.suspendLock;
        synchronized (object) {
            if (this.suspendedSince != -1L) {
                this.logger.debug("Waking up suspended queue {}", (Object)this.queueName);
                this.suspendedSince = -1L;
                this.suspendLock.notify();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void suspend() {
        Object object = this.suspendLock;
        synchronized (object) {
            if (this.suspendedSince == -1L) {
                this.logger.debug("Suspending queue {}", (Object)this.queueName);
                this.suspendedSince = System.currentTimeMillis();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean isSuspended() {
        Object object = this.suspendLock;
        synchronized (object) {
            return this.suspendedSince != -1L;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public synchronized void removeAll() {
        Set<String> topics = this.cache.getTopics();
        this.logger.debug("Removing all jobs for queue {} : {}", (Object)this.queueName, topics);
        if (!topics.isEmpty()) {
            ResourceResolver resolver = this.services.configuration.createResourceResolver();
            try {
                Resource baseResource = resolver.getResource(this.services.configuration.getLocalJobsPath());
                if (baseResource != null) {
                    final BatchResourceRemover brr = new BatchResourceRemover();
                    for (String t : topics) {
                        final Resource topicResource = baseResource.getChild(t.replace('/', '.'));
                        if (topicResource == null) continue;
                        JobTopicTraverser.traverse(this.logger, topicResource, new JobTopicTraverser.JobCallback(){

                            @Override
                            public boolean handle(JobImpl job) {
                                Resource jobResource = topicResource.getResourceResolver().getResource(job.getResourcePath());
                                if (jobResource != null) {
                                    try {
                                        brr.delete(jobResource);
                                    }
                                    catch (PersistenceException ignore) {
                                        AbstractJobQueue.this.logger.error("Unable to remove job " + job, (Throwable)ignore);
                                        topicResource.getResourceResolver().revert();
                                        topicResource.getResourceResolver().refresh();
                                    }
                                }
                                return true;
                            }
                        });
                    }
                    try {
                        resolver.commit();
                    }
                    catch (PersistenceException ignore) {
                        this.logger.error("Unable to remove jobs", (Throwable)ignore);
                    }
                }
            }
            finally {
                resolver.close();
            }
        }
    }

    @Override
    public void clear() {
    }

    @Override
    public Object getState(String key) {
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public String getStateInfo() {
        Object object = this.suspendLock;
        synchronized (object) {
            return "isWaiting=" + this.isWaiting + ", suspendedSince=" + this.suspendedSince + ", asyncJobs=" + this.asyncCounter.get();
        }
    }

    protected long getRetryDelay(JobHandler handler) {
        long delay = this.configuration.getRetryDelayInMs();
        if (handler.getJob().getProperty(":slingevent:delayOverride") != null) {
            delay = (Long)((Object)handler.getJob().getProperty(":slingevent:delayOverride", Long.class));
        } else if (handler.getJob().getProperty("event.job.retrydelay") != null) {
            delay = (Long)((Object)handler.getJob().getProperty("event.job.retrydelay", Long.class));
        }
        return delay;
    }

    protected void reschedule(JobHandler handler) {
        this.requeue(handler);
    }

    protected void ignoreException(Exception e) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Ignored exception " + e.getMessage(), (Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean stopJob(JobImpl job) {
        JobHandler handler;
        Map<String, JobHandler> map = this.processingJobsLists;
        synchronized (map) {
            handler = this.processingJobsLists.get(job.getId());
        }
        if (handler != null) {
            handler.stop();
        }
        return handler != null;
    }

    protected abstract void start(JobHandler var1);

    protected abstract void notifyFinished(boolean var1);

    private static final class RescheduleInfo {
        public boolean reschedule = false;
        public long processingTime;

        private RescheduleInfo() {
        }
    }
}

