/*
 * Decompiled with CFR 0.152.
 */
package net.greghaines.jesque.worker;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import net.greghaines.jesque.Config;
import net.greghaines.jesque.Job;
import net.greghaines.jesque.json.ObjectMapperFactory;
import net.greghaines.jesque.utils.JesqueUtils;
import net.greghaines.jesque.worker.JobExecutor;
import net.greghaines.jesque.worker.WorkerEvent;
import net.greghaines.jesque.worker.WorkerImpl;

/**
 * A {@link WorkerImpl} whose polling loop ends the worker on its own once the
 * configured queues have repeatedly yielded no jobs.
 *
 * <p>An "empty pass" is counted each time the number of consecutive empty pops
 * reaches the current number of queue names. After
 * {@code maxLoopsOnEmptyQueues} empty passes without an intervening job,
 * {@code end(false)} is invoked.
 */
public class WorkerExitOnEmpty extends WorkerImpl {

    /** Number of empty passes tolerated when the caller does not specify one. */
    public static final int DEFAULT_MAX_LOOPS_ON_EMPTY_QUEUES = 3;

    /**
     * Wait in milliseconds, used both as the timeout when taking the next
     * queue name and as the back-off sleep after an empty pass.
     */
    private static final long EMPTY_WAIT_MILLIS = 500L;

    /** Empty-pass threshold at which {@link #poll()} calls {@code end(false)}. */
    private final int maxLoopsOnEmptyQueues;

    /**
     * Creates a worker that uses {@link #DEFAULT_MAX_LOOPS_ON_EMPTY_QUEUES}
     * as its empty-pass threshold.
     *
     * @param config   passed through to the {@link WorkerImpl} constructor
     * @param queues   passed through to the {@link WorkerImpl} constructor
     * @param jobTypes passed through to the {@link WorkerImpl} constructor
     */
    public WorkerExitOnEmpty(Config config, Collection<String> queues, Map<String, ? extends Class<?>> jobTypes) {
        this(config, queues, jobTypes, DEFAULT_MAX_LOOPS_ON_EMPTY_QUEUES);
    }

    /**
     * Creates a worker with an explicit empty-pass threshold.
     *
     * @param config                passed through to the {@link WorkerImpl} constructor
     * @param queues                passed through to the {@link WorkerImpl} constructor
     * @param jobTypes              passed through to the {@link WorkerImpl} constructor
     * @param maxLoopsOnEmptyQueues number of empty passes after which the worker ends itself;
     *                              the value is not validated, and a value of zero or less makes
     *                              {@link #poll()} call {@code end(false)} at the end of the first
     *                              iteration that reaches the pop step
     */
    public WorkerExitOnEmpty(Config config, Collection<String> queues, Map<String, ? extends Class<?>> jobTypes, int maxLoopsOnEmptyQueues) {
        super(config, queues, jobTypes);
        this.maxLoopsOnEmptyQueues = maxLoopsOnEmptyQueues;
    }

    /**
     * Runs the polling loop for as long as the worker state is {@code RUNNING}.
     *
     * <p>Each iteration takes the next queue name (waiting up to 500 ms),
     * re-appends it so the names rotate, honours {@code checkPaused()}, fires a
     * {@code WORKER_POLL} event and pops one payload from that queue. A payload
     * is deserialized into a {@link Job} and handed to {@code process}; both
     * counters are then reset. Any exception thrown inside an iteration is
     * forwarded to {@code recoverFromException} together with the queue name
     * that was current at the time (possibly {@code null}).
     */
    @Override
    protected void poll() {
        int consecutiveMisses = 0;
        int emptyPasses = 0;
        // Declared outside the try so the catch block can report which queue was in play.
        String curQueue = null;
        while (JobExecutor.State.RUNNING.equals(this.state.get())) {
            try {
                if (isThreadNameChangingEnabled()) {
                    this.renameThread("Waiting for " + JesqueUtils.join(",", this.queueNames));
                }
                curQueue = (String) this.queueNames.poll(EMPTY_WAIT_MILLIS, TimeUnit.MILLISECONDS);
                if (curQueue != null) {
                    // Put the name straight back at the end so queues are visited in rotation.
                    this.queueNames.add(curQueue);
                    this.checkPaused();
                    // The state may have changed while waiting in poll()/checkPaused().
                    if (JobExecutor.State.RUNNING.equals(this.state.get())) {
                        this.listenerDelegate.fireEvent(WorkerEvent.WORKER_POLL, this, curQueue, null, null, null, null);
                        final String payload = this.jedis.lpop(this.key("queue", curQueue));
                        if (payload == null) {
                            consecutiveMisses++;
                            if (consecutiveMisses >= this.queueNames.size()
                                    && JobExecutor.State.RUNNING.equals(this.state.get())) {
                                // Every queue name came up empty in a row: back off, then record the empty pass.
                                consecutiveMisses = 0;
                                Thread.sleep(EMPTY_WAIT_MILLIS);
                                emptyPasses++;
                            }
                        } else {
                            final Job job = (Job) ObjectMapperFactory.get().readValue(payload, Job.class);
                            this.process(job, curQueue);
                            // Real work was found, so start counting from scratch.
                            consecutiveMisses = 0;
                            emptyPasses = 0;
                        }
                        if (emptyPasses >= this.maxLoopsOnEmptyQueues) {
                            // Presumably moves the worker out of RUNNING so the loop condition fails;
                            // confirm against WorkerImpl.end.
                            this.end(false);
                        }
                    }
                }
            } catch (Exception e) {
                // NOTE(review): InterruptedException from poll()/sleep() also lands here and is
                // treated like any other failure; verify that recoverFromException copes with it.
                this.recoverFromException(curQueue, e);
            }
        }
    }
}

