/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.event.impl.jobs.queues;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.sling.commons.threads.ThreadPool;
import org.apache.sling.event.impl.jobs.JobConsumerManager;
import org.apache.sling.event.impl.jobs.JobHandler;
import org.apache.sling.event.impl.jobs.JobImpl;
import org.apache.sling.event.impl.jobs.Utility;
import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
import org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifier;
import org.apache.sling.event.impl.jobs.stats.StatisticsImpl;
import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.jobs.Queue;
import org.apache.sling.event.jobs.Statistics;
import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractJobQueue
extends StatisticsImpl
implements JobStatusNotifier,
Queue {
    /** Wait time for an acknowledge in milliseconds (60 seconds), per the constant's name. */
    private static final long DEFAULT_WAIT_FOR_ACK_IN_MS = 60000L;
    /** Maximum time in milliseconds (one hour) a suspended queue waits before it resumes itself. */
    private static final long MAX_SUSPEND_TIME = 3600000L;
    /** Logger named after the concrete class plus the queue name (see constructor). */
    protected final Logger logger;
    /** Configuration handed to the constructor; exposed via {@link #getConfiguration()}. */
    protected final InternalQueueConfiguration configuration;
    /** Used to post bridged job events and to send job notifications. */
    private final EventAdmin eventAdmin;
    /** Used to look up the consumer for a job topic. */
    private final JobConsumerManager jobConsumerManager;
    /** Current queue name; replaced by {@link #outdate()}. */
    protected volatile String queueName;
    /** Set to {@code true} by the constructor and to {@code false} by {@link #close()}. */
    protected volatile boolean running;
    /** Never written in this class; presumably maintained by subclasses - TODO confirm. */
    protected volatile boolean isWaiting = false;
    /** Jobs posted as events that still await an acknowledge, keyed by job id. Accessed while synchronized on the map. */
    private final Map<String, JobHandler> startedJobsLists = new HashMap<String, JobHandler>();
    /** Jobs currently being processed, keyed by job id. Accessed while synchronized on the map. */
    private final Map<String, JobHandler> processsingJobsLists = new HashMap<String, JobHandler>();
    /** Timestamp (ms) at which the queue was suspended, or {@code -1} when it is not suspended. */
    private volatile long suspendedSince = -1L;
    /** Monitor used for suspend/resume signalling. */
    private final Object suspendLock = new Object();
    /** Incremented when a consumer returns ASYNC, decremented when its async handler reports a result. */
    private final AtomicInteger asyncCounter = new AtomicInteger();
    /** Set once by {@link #outdate()}. */
    private final AtomicBoolean isOutdated = new AtomicBoolean(false);
    /** Never written in this class; presumably maintained by subclasses - TODO confirm. */
    protected boolean isWaitingForNext = false;
    /** Set by a {@link #tryToClose()} call that finds the queue closable; cleared by {@link #process(JobHandler)}. */
    private final AtomicBoolean closeMarker = new AtomicBoolean(false);

    public AbstractJobQueue(String name, InternalQueueConfiguration config, JobConsumerManager jobConsumerManager, EventAdmin eventAdmin) {
        this.queueName = name;
        this.configuration = config;
        this.logger = LoggerFactory.getLogger((String)(this.getClass().getName() + '.' + name));
        this.running = true;
        this.eventAdmin = eventAdmin;
        this.jobConsumerManager = jobConsumerManager;
    }

    /**
     * Builds a one-line, human-readable snapshot of the internal queue state.
     * The snapshot is taken while holding the suspend lock.
     *
     * @return the state description
     */
    @Override
    public String getStateInfo() {
        synchronized (this.suspendLock) {
            final StringBuilder state = new StringBuilder("isWaiting=");
            state.append(this.isWaiting);
            state.append(", suspendedSince=").append(this.suspendedSince);
            state.append(", isWaitingForNext=").append(this.isWaitingForNext);
            state.append(", asyncJobs=").append(this.asyncCounter.get());
            return state.toString();
        }
    }

    /**
     * Starts the daemon thread that drives this queue.
     * The thread keeps re-entering the queue loop for as long as the queue is
     * marked as running; anything thrown by the loop is logged and the loop is
     * entered again.
     */
    public void start() {
        final Runnable queueLoop = new Runnable() {

            @Override
            public void run() {
                while (running) {
                    logger.info("Starting job queue {}", queueName);
                    logger.debug("Configuration for job queue={}", configuration);
                    try {
                        runJobQueue();
                    } catch (Throwable t) {
                        // Log and loop: the while condition decides whether to restart.
                        logger.error("Job queue " + queueName + " stopped with exception: " + t.getMessage() + ". Restarting.", t);
                    }
                }
            }
        };
        final Thread worker = new Thread(queueLoop, "Apache Sling Job Queue " + this.queueName);
        worker.setDaemon(true);
        worker.start();
    }

    /**
     * Returns the configuration this queue was created with.
     *
     * @return the queue configuration
     */
    @Override
    public InternalQueueConfiguration getConfiguration() {
        return configuration;
    }

    /**
     * Stops the queue: marks it as not running, resumes it if it is suspended,
     * notifies a waiting queue, enqueues an empty handler and finally forgets
     * all started and processing jobs.
     */
    public void close() {
        this.running = false;
        logger.debug("Shutting down job queue {}", this.queueName);
        resume();
        if (this.isWaiting) {
            logger.debug("Waking up waiting queue {}", this.queueName);
            notifyFinished(null);
        }
        // Empty handler; presumably unblocks a queue thread waiting in take() - TODO confirm.
        put(new JobHandler(null, null));
        synchronized (this.processsingJobsLists) {
            this.processsingJobsLists.clear();
        }
        synchronized (this.startedJobsLists) {
            this.startedJobsLists.clear();
        }
        logger.info("Stopped job queue {}", this.queueName);
    }

    /**
     * Closes the queue if it is closable and was already found closable by a
     * previous call (two-step close). The first call that finds the queue
     * closable only sets the close marker; {@link #process(JobHandler)} clears it.
     * The queue is resumed first in any case.
     *
     * @return {@code true} if the queue has been closed by this call
     */
    public boolean tryToClose() {
        resume();
        if (!canBeClosed()) {
            return false;
        }
        if (!this.closeMarker.get()) {
            // First sighting of a closable queue: remember it, close on the next call.
            this.closeMarker.set(true);
            return false;
        }
        close();
        return true;
    }

    /**
     * Checks whether the queue is idle: it is empty, neither waiting nor
     * suspended, has no asynchronous jobs in flight and is waiting for the next job.
     *
     * @return {@code true} if the queue may be closed
     */
    protected boolean canBeClosed() {
        if (!isEmpty() || this.isWaiting || isSuspended()) {
            return false;
        }
        return this.asyncCounter.get() == 0 && this.isWaitingForNext;
    }

    /**
     * Looks for started jobs that have not been acknowledged within
     * {@code DEFAULT_WAIT_FOR_ACK_IN_MS} and tries to reschedule them.
     * Does nothing if the queue is not running.
     */
    public void checkForUnprocessedJobs() {
        if (!this.running) {
            return;
        }
        final long threshold = System.currentTimeMillis() - DEFAULT_WAIT_FOR_ACK_IN_MS;
        final ArrayList<JobHandler> overdue = new ArrayList<JobHandler>();
        synchronized (this.startedJobsLists) {
            for (final JobHandler candidate : this.startedJobsLists.values()) {
                if (candidate.started <= threshold) {
                    overdue.add(candidate);
                }
            }
        }
        if (!overdue.isEmpty()) {
            // Short pause before re-checking; presumably gives late acknowledges a chance to arrive.
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                ignoreException(e);
            }
        }
        for (final JobHandler handler : overdue) {
            // Only handle jobs that are still unacknowledged after the pause.
            final boolean stillPending;
            synchronized (this.startedJobsLists) {
                stillPending = this.startedJobsLists.remove(handler.getJob().getId()) != null;
            }
            if (!stillPending) {
                continue;
            }
            if (handler.reschedule()) {
                logger.info("No acknowledge received for job {} stored at {}. Requeueing job.", Utility.toString(handler.getJob()), handler.getJob().getId());
                checkForNotify(handler);
            } else {
                decQueued();
                checkForNotify(null);
            }
        }
    }

    /**
     * Handles the acknowledge for a job that was posted as an event.
     * A known job is moved from the started list to the processing list and a
     * START notification is sent; for an unknown job only the queued counter is
     * decremented.
     *
     * @param job the job event; its id is read from property {@code slingevent:eventId}
     * @return {@code true} if the job was waiting for an acknowledge in this queue
     */
    @Override
    public boolean sendAcknowledge(Event job) {
        final String jobId = (String) job.getProperty("slingevent:eventId");
        final JobHandler acknowledged;
        synchronized (this.startedJobsLists) {
            acknowledged = this.startedJobsLists.remove(jobId);
        }
        if (acknowledged == null) {
            decQueued();
            return false;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Received ack for job {}", Utility.toString(acknowledged.getJob()));
        }
        final long waited = acknowledged.started - acknowledged.queued;
        addActive(waited);
        Utility.sendNotification(this.eventAdmin, "org/apache/sling/event/notification/job/START", acknowledged.getJob(), waited);
        synchronized (this.processsingJobsLists) {
            this.processsingJobsLists.put(jobId, acknowledged);
        }
        return true;
    }

    /**
     * Updates statistics and sends the notification matching the job result,
     * and decides whether the job has to be rescheduled.
     * <ul>
     * <li>OK: counted as finished, FINISHED notification.</li>
     * <li>FAILED: rescheduled (FAILED notification) unless the retry limit is
     * exceeded, in which case it is cancelled. A limit of {@code -1} never
     * triggers the cancel branch.</li>
     * <li>CANCEL: counted as cancelled, CANCELLED notification.</li>
     * </ul>
     * Any other result does nothing.
     *
     * @param jobEvent handler of the job
     * @param result the processing result
     * @return {@code true} if the job should be rescheduled
     */
    private boolean handleReschedule(JobHandler jobEvent, JobConsumer.JobResult result) {
        switch (result) {
            case OK: {
                if (logger.isDebugEnabled()) {
                    logger.debug("Finished job {}", Utility.toString(jobEvent.getJob()));
                }
                final long elapsed = System.currentTimeMillis() - jobEvent.started;
                finishedJob(elapsed);
                Utility.sendNotification(this.eventAdmin, "org/apache/sling/event/notification/job/FINISHED", jobEvent.getJob(), elapsed);
                return false;
            }
            case FAILED: {
                final int maxRetries = (Integer) jobEvent.getJob().getProperty("event.job.retries");
                final int attempts = (Integer) jobEvent.getJob().getProperty("event.job.retrycount");
                final boolean exhausted = maxRetries != -1 && attempts + 1 > maxRetries;
                if (!exhausted) {
                    jobEvent.getJob().retry();
                    if (logger.isDebugEnabled()) {
                        logger.debug("Failed job {}", Utility.toString(jobEvent.getJob()));
                    }
                    failedJob();
                    // The job re-enters the queue now, so its queue time starts over.
                    jobEvent.queued = System.currentTimeMillis();
                    Utility.sendNotification(this.eventAdmin, "org/apache/sling/event/notification/job/FAILED", jobEvent.getJob(), null);
                    return true;
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("Cancelled job {}", Utility.toString(jobEvent.getJob()));
                }
                cancelledJob();
                Utility.sendNotification(this.eventAdmin, "org/apache/sling/event/notification/job/CANCELLED", jobEvent.getJob(), null);
                return false;
            }
            case CANCEL: {
                if (logger.isDebugEnabled()) {
                    logger.debug("Cancelled job {}", Utility.toString(jobEvent.getJob()));
                }
                cancelledJob();
                Utility.sendNotification(this.eventAdmin, "org/apache/sling/event/notification/job/CANCELLED", jobEvent.getJob(), null);
                return false;
            }
        }
        return false;
    }

    /**
     * Reports the end of a job that was processed through the event bridge.
     *
     * @param job the job event; its id is read from property {@code slingevent:eventId}
     * @param shouldReschedule {@code true} maps to result FAILED, {@code false} to OK
     * @return {@code true} if the job has been rescheduled
     */
    @Override
    public boolean finishedJob(Event job, boolean shouldReschedule) {
        final String jobId = (String) job.getProperty("slingevent:eventId");
        final JobConsumer.JobResult outcome;
        if (shouldReschedule) {
            outcome = JobConsumer.JobResult.FAILED;
        } else {
            outcome = JobConsumer.JobResult.OK;
        }
        return finishedJob(jobId, outcome, false);
    }

    /**
     * Common end-of-job handling for synchronous and asynchronous processing.
     * The job is removed from the started and processing lists first. If the
     * queue is no longer running, or the job was not in the processing list,
     * the finish is discarded.
     *
     * @param jobId id of the finished job
     * @param result the processing result
     * @param isAsync {@code true} if reported through an async handler; a job to
     *        retry is then put back directly instead of going through the notify path
     * @return for synchronous jobs {@code true} if the job has been rescheduled;
     *         for asynchronous jobs {@code true} whenever the finish was handled
     */
    private boolean finishedJob(String jobId, JobConsumer.JobResult result, boolean isAsync) {
        if (logger.isDebugEnabled()) {
            logger.debug("Received finish for job {}, result={}", jobId, result);
        }
        synchronized (this.startedJobsLists) {
            this.startedJobsLists.remove(jobId);
        }
        final JobHandler handler;
        synchronized (this.processsingJobsLists) {
            handler = this.processsingJobsLists.remove(jobId);
        }
        if (!this.running) {
            logger.warn("Queue is not running anymore. Discarding finish for {}", jobId);
            return false;
        }
        if (handler == null) {
            if (logger.isDebugEnabled()) {
                logger.debug("This job has never been started by this queue: {}", jobId);
            }
            return false;
        }
        // requeue == the job wants a retry AND the handler accepted the reschedule.
        final boolean requeue;
        if (handleReschedule(handler, result)) {
            requeue = handler.reschedule();
        } else {
            handler.finished();
            requeue = false;
        }
        if (isAsync) {
            if (requeue) {
                final JobHandler again = reschedule(handler);
                if (again != null) {
                    put(again);
                }
            }
            return true;
        }
        checkForNotify(requeue ? handler : null);
        return requeue;
    }

    /**
     * Notifies the queue implementation that a job is done.
     *
     * @param info handler to reschedule before notifying, or {@code null} if
     *        there is nothing to reschedule
     */
    private void checkForNotify(JobHandler info) {
        final JobHandler next = (info == null) ? null : reschedule(info);
        notifyFinished(next);
    }

    /**
     * Returns the current queue name; {@link #outdate()} replaces it.
     *
     * @return the queue name
     */
    @Override
    public String getName() {
        return queueName;
    }

    /**
     * Adds a job to this queue. Clears the close marker, stamps the handler
     * with the current time as its queue time and increments the queued counter
     * before the handler is put into the queue.
     *
     * @param handler the handler of the job to process
     */
    public void process(JobHandler handler) {
        // A new job means the queue is in use again: cancel a pending two-step close.
        closeMarker.set(false);
        handler.queued = System.currentTimeMillis();
        incQueued();
        put(handler);
    }

    /**
     * Blocks the calling thread while the queue is suspended.
     * After each wake-up the suspension is ended automatically once it has
     * lasted longer than {@code MAX_SUSPEND_TIME}.
     */
    private void checkSuspended() {
        synchronized (this.suspendLock) {
            while (this.suspendedSince != -1L) {
                try {
                    this.suspendLock.wait(MAX_SUSPEND_TIME);
                } catch (InterruptedException interrupted) {
                    ignoreException(interrupted);
                }
                final boolean expired = System.currentTimeMillis() > this.suspendedSince + MAX_SUSPEND_TIME;
                if (expired) {
                    resume();
                }
            }
        }
    }

    /**
     * Main loop of the queue thread: takes the next handler (unless the
     * previous start already returned one), honours a suspension and starts the
     * handler. Runs until the queue is no longer marked as running.
     */
    private void runJobQueue() {
        JobHandler pending = null;
        while (this.running) {
            if (pending == null) {
                pending = take();
            }
            if (this.running) {
                checkSuspended();
            }
            if (pending != null && this.running) {
                // start() may hand back a handler to be processed in the next round.
                pending = start(pending);
            }
        }
    }

    /**
     * Starts the execution of a single job.
     * <p>
     * If a consumer is registered for the job topic, the job is run in a
     * background task (thread pool if available, otherwise a new thread). If
     * there is no consumer but the job is a bridged event and bridged events are
     * supported, the job is posted as an event and waits for an acknowledge. If
     * neither applies, the handler is reassigned.
     *
     * @param handler the handler of the job to execute
     * @return {@code true} if the job was handed to a consumer task or posted as
     *         an event; {@code false} if it was reassigned, could not be started
     *         or an exception occurred (the queued counter is decremented then)
     */
    protected boolean executeJob(JobHandler handler) {
        final JobImpl job = handler.getJob();
        final JobConsumer consumer = this.jobConsumerManager.getConsumer(job.getTopic());
        if (consumer != null || job.isBridgedEvent() && this.jobConsumerManager.supportsBridgedEvents()) {
            if (handler.start()) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Starting job {}", (Object)Utility.toString(job));
                }
                try {
                    handler.started = System.currentTimeMillis();
                    if (consumer != null) {
                        // Direct consumer: account for queue time, announce the start and
                        // register the job as processing before the task is scheduled.
                        long queueTime = handler.started - handler.queued;
                        this.addActive(queueTime);
                        Utility.sendNotification(this.eventAdmin, "org/apache/sling/event/notification/job/START", job, queueTime);
                        Map<String, JobHandler> map = this.processsingJobsLists;
                        synchronized (map) {
                            this.processsingJobsLists.put(job.getId(), handler);
                        }
                        Runnable task = new Runnable(){

                            /*
                             * WARNING - Removed try catching itself - possible behaviour change.
                             */
                            @Override
                            public void run() {
                                // Temporarily rename the thread and apply the job priority;
                                // both are restored in the finally block below.
                                Thread currentThread = Thread.currentThread();
                                String oldName = currentThread.getName();
                                int oldPriority = currentThread.getPriority();
                                currentThread.setName(oldName + "-" + job.getQueueName() + "(" + job.getTopic() + ")");
                                if (job.getJobPriority() != null) {
                                    switch (job.getJobPriority()) {
                                        case NORM: {
                                            currentThread.setPriority(5);
                                            break;
                                        }
                                        case MIN: {
                                            currentThread.setPriority(1);
                                            break;
                                        }
                                        case MAX: {
                                            currentThread.setPriority(10);
                                        }
                                    }
                                }
                                // CANCEL is the result used when the consumer throws.
                                JobConsumer.JobResult result = JobConsumer.JobResult.CANCEL;
                                // Callback handed to the consumer via a job property; it lets
                                // the consumer report the final result later. Reporting twice
                                // raises an IllegalStateException.
                                JobConsumer.AsyncHandler asyncHandler = new JobConsumer.AsyncHandler(){
                                    final Object asyncLock = new Object();
                                    final AtomicBoolean asyncDone = new AtomicBoolean(false);

                                    /*
                                     * WARNING - Removed try catching itself - possible behaviour change.
                                     */
                                    private void check(JobConsumer.JobResult result) {
                                        Object object = this.asyncLock;
                                        synchronized (object) {
                                            if (this.asyncDone.get()) {
                                                throw new IllegalStateException("Job is already marked as processed");
                                            }
                                            this.asyncDone.set(true);
                                            AbstractJobQueue.this.finishedJob(job.getId(), result, true);
                                            AbstractJobQueue.this.asyncCounter.decrementAndGet();
                                        }
                                    }

                                    @Override
                                    public void ok() {
                                        this.check(JobConsumer.JobResult.OK);
                                    }

                                    @Override
                                    public void failed() {
                                        this.check(JobConsumer.JobResult.FAILED);
                                    }

                                    @Override
                                    public void cancel() {
                                        this.check(JobConsumer.JobResult.CANCEL);
                                    }
                                };
                                job.setProperty(":sling:jobs:asynchandler", asyncHandler);
                                try {
                                    result = consumer.process(job);
                                }
                                catch (Throwable t) {
                                    AbstractJobQueue.this.logger.error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + Utility.toString(job), t);
                                    result = JobConsumer.JobResult.CANCEL;
                                }
                                finally {
                                    currentThread.setPriority(oldPriority);
                                    currentThread.setName(oldName);
                                    // Synchronous results are finished here; ASYNC results are
                                    // finished by the async handler above.
                                    if (result != JobConsumer.JobResult.ASYNC) {
                                        AbstractJobQueue.this.finishedJob(job.getId(), result, false);
                                    }
                                }
                                // NOTE(review): the counter is incremented only after process()
                                // has returned, so an async handler that reports earlier
                                // decrements first and the counter is briefly negative.
                                if (result == JobConsumer.JobResult.ASYNC) {
                                    AbstractJobQueue.this.asyncCounter.incrementAndGet();
                                    AbstractJobQueue.this.notifyFinished(null);
                                }
                            }
                        };
                        // Prefer the shared thread pool; fall back to a dedicated thread.
                        ThreadPool pool = Environment.THREAD_POOL;
                        if (pool != null) {
                            pool.execute(task);
                        } else {
                            new Thread(task).start();
                        }
                    } else {
                        // Bridged event: remember the job as started (awaiting an
                        // acknowledge) and post it as an event.
                        Map<String, JobHandler> queueTime = this.startedJobsLists;
                        synchronized (queueTime) {
                            this.startedJobsLists.put(job.getId(), handler);
                        }
                        Event jobEvent = this.getJobEvent(handler);
                        this.eventAdmin.postEvent(jobEvent);
                    }
                    return true;
                }
                catch (Exception re) {
                    // Only logged; execution falls through to decQueued() below.
                    // NOTE(review): an entry already added to one of the job lists is
                    // not removed here.
                    this.logger.error("Exception during job processing.", (Throwable)re);
                }
            } else if (this.logger.isDebugEnabled()) {
                this.logger.debug("Discarding removed job {}", (Object)Utility.toString(job));
            }
        } else {
            // No consumer and no bridged-event support: hand the job back for reassignment.
            handler.reassign();
        }
        this.decQueued();
        return false;
    }

    /**
     * Creates the event used to deliver a bridged job. The event carries a copy
     * of all job properties plus a notifier context pointing back to this
     * queue; the properties {@code event.distribute} and
     * {@code event.application} are removed.
     *
     * @param info the handler of the job
     * @return the event to post, using the job topic as event topic
     */
    private Event getJobEvent(JobHandler info) {
        final JobImpl job = info.getJob();
        final String topic = job.getTopic();
        final Dictionary<String, Object> props = new Hashtable<String, Object>();
        for (final String key : job.getPropertyNames()) {
            props.put(key, job.getProperty(key));
        }
        props.put(JobStatusNotifier.CONTEXT_PROPERTY_NAME, new JobStatusNotifier.NotifierContext(this));
        props.remove("event.distribute");
        props.remove("event.application");
        return new Event(topic, props);
    }

    /**
     * Records an exception that is deliberately not handled. It is only logged,
     * and only when debug logging is enabled.
     *
     * @param e the ignored exception
     */
    protected void ignoreException(Exception e) {
        if (!logger.isDebugEnabled()) {
            return;
        }
        logger.debug("Ignored exception " + e.getMessage(), e);
    }

    /**
     * Tells whether {@link #outdate()} has been called on this queue.
     *
     * @return {@code true} if the queue is outdated
     */
    protected boolean isOutdated() {
        return isOutdated.get();
    }

    /**
     * Marks this queue as outdated and renames it to
     * {@code <name><outdated>(<hashCode>)}. Only the first call has an effect.
     */
    public void outdate() {
        // compareAndSet makes "check and mark" one atomic step. With a separate
        // get() and set(true), two concurrent callers could both pass the check
        // and the "<outdated>" suffix would be appended twice.
        if (this.isOutdated.compareAndSet(false, true)) {
            final String name = this.getName() + "<outdated>(" + this.hashCode() + ")";
            this.logger.info("Outdating queue {}, renaming to {}.", this.queueName, name);
            this.queueName = name;
        }
    }

    /**
     * Returns the statistics of this queue. The queue is its own statistics
     * object, so the returned instance is this queue.
     *
     * @return this queue
     */
    @Override
    public Statistics getStatistics() {
        final Statistics self = this;
        return self;
    }

    /**
     * Ends a suspension: resets the suspend timestamp and notifies a thread
     * waiting on the suspend lock. Does nothing if the queue is not suspended.
     */
    @Override
    public void resume() {
        synchronized (this.suspendLock) {
            if (this.suspendedSince == -1L) {
                return;
            }
            logger.debug("Waking up suspended queue {}", this.queueName);
            this.suspendedSince = -1L;
            this.suspendLock.notify();
        }
    }

    /**
     * Suspends the queue by recording the current time as the start of the
     * suspension. Does nothing if the queue is already suspended.
     */
    @Override
    public void suspend() {
        synchronized (this.suspendLock) {
            if (this.suspendedSince != -1L) {
                return;
            }
            logger.debug("Suspending queue {}", this.queueName);
            this.suspendedSince = System.currentTimeMillis();
        }
    }

    /**
     * Tells whether the queue is currently suspended. The state is read while
     * holding the suspend lock.
     *
     * @return {@code true} if a suspend timestamp is set
     */
    @Override
    public boolean isSuspended() {
        synchronized (this.suspendLock) {
            final boolean suspended = this.suspendedSince != -1L;
            return suspended;
        }
    }

    /**
     * Removes all jobs from this queue. The queue is suspended while the jobs
     * are taken out and the queued counter is cleared; the removal of each job
     * runs in a separate daemon thread. If the queue was not suspended before
     * the call, it is resumed afterwards.
     */
    @Override
    public synchronized void removeAll() {
        final boolean resumeAfterwards = !isSuspended();
        suspend();
        final Collection<JobHandler> drained = removeAllJobs();
        clearQueued();
        final Runnable remover = new Runnable() {

            @Override
            public void run() {
                for (final JobHandler handler : drained) {
                    handler.remove();
                }
            }
        };
        final Thread worker = new Thread(remover, "Queue RemoveAll Thread for " + this.queueName);
        worker.setDaemon(true);
        worker.start();
        if (resumeAfterwards) {
            resume();
        }
    }

    /**
     * Delegates to the inherited {@code clearQueued()}; no jobs are removed by
     * this method itself.
     */
    @Override
    public void clear() {
        clearQueued();
    }

    /**
     * This implementation keeps no keyed state.
     *
     * @param key the state key (ignored)
     * @return always {@code null}
     */
    @Override
    public Object getState(String key) {
        final Object noState = null;
        return noState;
    }

    /**
     * Reschedules a job. This class passes the returned handler to
     * {@link #put(JobHandler)} (skipping {@code null}) or to
     * {@link #notifyFinished(JobHandler)}.
     *
     * @param var1 the handler of the job to reschedule
     * @return the handler to process again; may be {@code null}
     */
    protected abstract JobHandler reschedule(JobHandler var1);

    /**
     * Puts a handler into the queue. {@link #close()} also calls this with a
     * handler that wraps no job.
     *
     * @param var1 the handler to enqueue
     */
    protected abstract void put(JobHandler var1);

    /**
     * Supplies the next handler to the queue loop.
     *
     * @return the next handler; the queue loop tolerates {@code null}
     */
    protected abstract JobHandler take();

    /**
     * Tells whether the queue is empty; used to decide if the queue can be closed.
     *
     * @return {@code true} if the queue is empty
     */
    protected abstract boolean isEmpty();

    /**
     * Takes all jobs out of the queue; {@link #removeAll()} then calls
     * {@code remove()} on each returned handler.
     *
     * @return the removed handlers
     */
    protected abstract Collection<JobHandler> removeAllJobs();

    /**
     * Starts processing of a handler taken from the queue.
     *
     * @param var1 the handler to start
     * @return a handler the queue loop should process next instead of taking a
     *         new one, or {@code null}
     */
    protected abstract JobHandler start(JobHandler var1);

    /**
     * Signals that a job has finished.
     *
     * @param var1 a rescheduled handler, or {@code null} if nothing was rescheduled
     */
    protected abstract void notifyFinished(JobHandler var1);
}

