/*
 * Decompiled with CFR 0.152.
 */
package net.greghaines.jesque.worker;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import net.greghaines.jesque.Config;
import net.greghaines.jesque.Job;
import net.greghaines.jesque.JobFailure;
import net.greghaines.jesque.WorkerStatus;
import net.greghaines.jesque.json.ObjectMapperFactory;
import net.greghaines.jesque.utils.JedisUtils;
import net.greghaines.jesque.utils.JesqueUtils;
import net.greghaines.jesque.utils.VersionUtils;
import net.greghaines.jesque.worker.DefaultExceptionHandler;
import net.greghaines.jesque.worker.ExceptionHandler;
import net.greghaines.jesque.worker.JobExecutor;
import net.greghaines.jesque.worker.RecoveryStrategy;
import net.greghaines.jesque.worker.Worker;
import net.greghaines.jesque.worker.WorkerAware;
import net.greghaines.jesque.worker.WorkerEvent;
import net.greghaines.jesque.worker.WorkerListener;
import net.greghaines.jesque.worker.WorkerListenerDelegate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisException;

public class WorkerImpl
implements Worker {
    private static final Logger log = LoggerFactory.getLogger(WorkerImpl.class);
    private static final AtomicLong workerCounter = new AtomicLong(0L);
    protected static final long emptyQueueSleepTime = 500L;
    protected static final long reconnectSleepTime = 5000L;
    protected static final int reconnectAttempts = 120;
    private static volatile boolean threadNameChangingEnabled = false;
    protected final Config config;
    protected final Jedis jedis;
    protected final String namespace;
    protected final BlockingDeque<String> queueNames = new LinkedBlockingDeque<String>();
    private final ConcurrentMap<String, Class<?>> jobTypes = new ConcurrentHashMap();
    private final String name;
    protected final WorkerListenerDelegate listenerDelegate = new WorkerListenerDelegate();
    protected final AtomicReference<JobExecutor.State> state = new AtomicReference<JobExecutor.State>(JobExecutor.State.NEW);
    private final AtomicBoolean paused = new AtomicBoolean(false);
    private final AtomicBoolean processingJob = new AtomicBoolean(false);
    private final long workerId = workerCounter.getAndIncrement();
    private final String threadNameBase = "Worker-" + this.workerId + " Jesque-" + VersionUtils.getVersion() + ": ";
    private final AtomicReference<Thread> threadRef = new AtomicReference<Object>(null);
    private final AtomicReference<ExceptionHandler> exceptionHandlerRef = new AtomicReference<DefaultExceptionHandler>(new DefaultExceptionHandler());

    public static boolean isThreadNameChangingEnabled() {
        return threadNameChangingEnabled;
    }

    public static void setThreadNameChangingEnabled(boolean enabled) {
        threadNameChangingEnabled = enabled;
    }

    protected static void checkQueues(Iterable<String> queues) {
        if (queues == null) {
            throw new IllegalArgumentException("queues must not be null");
        }
        for (String queue : queues) {
            if (queue != null && !"".equals(queue)) continue;
            throw new IllegalArgumentException("queues' members must not be null: " + queues);
        }
    }

    public WorkerImpl(Config config, Collection<String> queues, Map<String, ? extends Class<?>> jobTypes) {
        if (config == null) {
            throw new IllegalArgumentException("config must not be null");
        }
        WorkerImpl.checkQueues(queues);
        this.checkJobTypes(jobTypes);
        this.config = config;
        this.namespace = config.getNamespace();
        this.jedis = new Jedis(config.getHost(), config.getPort(), config.getTimeout());
        this.authenticateAndSelectDB();
        this.name = this.createName();
        this.setQueues(queues);
        this.setJobTypes(jobTypes);
    }

    public long getWorkerId() {
        return this.workerId;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        block4: {
            block3: {
                if (!this.state.compareAndSet(JobExecutor.State.NEW, JobExecutor.State.RUNNING)) break block3;
                try {
                    this.renameThread("RUNNING");
                    this.threadRef.set(Thread.currentThread());
                    this.jedis.sadd(this.key("workers"), new String[]{this.name});
                    this.jedis.set(this.key("worker", this.name, "started"), new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ").format(new Date()));
                    this.listenerDelegate.fireEvent(WorkerEvent.WORKER_START, this, null, null, null, null, null);
                    this.poll();
                    this.renameThread("STOPPING");
                    this.listenerDelegate.fireEvent(WorkerEvent.WORKER_STOP, this, null, null, null, null, null);
                }
                catch (Throwable throwable) {
                    this.renameThread("STOPPING");
                    this.listenerDelegate.fireEvent(WorkerEvent.WORKER_STOP, this, null, null, null, null, null);
                    this.jedis.srem(this.key("workers"), new String[]{this.name});
                    this.jedis.del(new String[]{this.key("worker", this.name), this.key("worker", this.name, "started"), this.key("stat", "failed", this.name), this.key("stat", "processed", this.name)});
                    this.jedis.quit();
                    this.threadRef.set(null);
                    throw throwable;
                }
                break block4;
            }
            if (JobExecutor.State.RUNNING.equals((Object)this.state.get())) {
                throw new IllegalStateException("This WorkerImpl is already running");
            }
            throw new IllegalStateException("This WorkerImpl is shutdown");
        }
        this.jedis.srem(this.key("workers"), new String[]{this.name});
        this.jedis.del(new String[]{this.key("worker", this.name), this.key("worker", this.name, "started"), this.key("stat", "failed", this.name), this.key("stat", "processed", this.name)});
        this.jedis.quit();
        this.threadRef.set(null);
    }

    @Override
    public void end(boolean now) {
        Thread workerThread;
        this.state.set(JobExecutor.State.SHUTDOWN);
        if (now && (workerThread = this.threadRef.get()) != null) {
            workerThread.interrupt();
        }
        this.togglePause(false);
    }

    @Override
    public boolean isShutdown() {
        return JobExecutor.State.SHUTDOWN.equals((Object)this.state.get());
    }

    @Override
    public boolean isPaused() {
        return this.paused.get();
    }

    @Override
    public boolean isProcessingJob() {
        return this.processingJob.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void togglePause(boolean paused) {
        this.paused.set(paused);
        AtomicBoolean atomicBoolean = this.paused;
        synchronized (atomicBoolean) {
            this.paused.notifyAll();
        }
    }

    @Override
    public String getName() {
        return this.name;
    }

    @Override
    public void addListener(WorkerListener listener) {
        this.listenerDelegate.addListener(listener);
    }

    @Override
    public void addListener(WorkerListener listener, WorkerEvent ... events) {
        this.listenerDelegate.addListener(listener, events);
    }

    @Override
    public void removeListener(WorkerListener listener) {
        this.listenerDelegate.removeListener(listener);
    }

    @Override
    public void removeListener(WorkerListener listener, WorkerEvent ... events) {
        this.listenerDelegate.removeListener(listener, events);
    }

    @Override
    public void removeAllListeners() {
        this.listenerDelegate.removeAllListeners();
    }

    @Override
    public void removeAllListeners(WorkerEvent ... events) {
        this.listenerDelegate.removeAllListeners(events);
    }

    @Override
    public Collection<String> getQueues() {
        return Collections.unmodifiableCollection(this.queueNames);
    }

    @Override
    public void addQueue(String queueName) {
        if (queueName == null || "".equals(queueName)) {
            throw new IllegalArgumentException("queueName must not be null or empty: " + queueName);
        }
        this.queueNames.add(queueName);
    }

    @Override
    public void removeQueue(String queueName, boolean all) {
        if (queueName == null || "".equals(queueName)) {
            throw new IllegalArgumentException("queueName must not be null or empty: " + queueName);
        }
        if (all) {
            boolean tryAgain = true;
            while (tryAgain) {
                tryAgain = this.queueNames.remove(queueName);
            }
        } else {
            this.queueNames.remove(queueName);
        }
    }

    @Override
    public void removeAllQueues() {
        this.queueNames.clear();
    }

    @Override
    public void setQueues(Collection<String> queues) {
        WorkerImpl.checkQueues(queues);
        this.queueNames.clear();
        this.queueNames.addAll(queues == ALL_QUEUES ? this.jedis.smembers(this.key("queues")) : queues);
    }

    @Override
    public Map<String, Class<?>> getJobTypes() {
        return Collections.unmodifiableMap(this.jobTypes);
    }

    @Override
    public void addJobType(String jobName, Class<?> jobType) {
        this.checkJobType(jobName, jobType);
        this.jobTypes.put(jobName, jobType);
    }

    @Override
    public void removeJobType(Class<?> jobType) {
        if (jobType == null) {
            throw new IllegalArgumentException("jobType must not be null");
        }
        this.jobTypes.values().remove(jobType);
    }

    @Override
    public void removeJobName(String jobName) {
        if (jobName == null) {
            throw new IllegalArgumentException("jobName must not be null");
        }
        this.jobTypes.remove(jobName);
    }

    @Override
    public void setJobTypes(Map<String, ? extends Class<?>> jobTypes) {
        this.checkJobTypes(jobTypes);
        this.jobTypes.clear();
        for (Map.Entry<String, Class<?>> entry : jobTypes.entrySet()) {
            this.addJobType(entry.getKey(), entry.getValue());
        }
    }

    @Override
    public ExceptionHandler getExceptionHandler() {
        return this.exceptionHandlerRef.get();
    }

    @Override
    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
        if (exceptionHandler == null) {
            throw new IllegalArgumentException("exceptionHandler must not be null");
        }
        this.exceptionHandlerRef.set(exceptionHandler);
    }

    @Override
    public void join(long millis) throws InterruptedException {
        Thread workerThread = this.threadRef.get();
        if (workerThread != null && workerThread.isAlive()) {
            workerThread.join(millis);
        }
    }

    protected int getReconnectAttempts() {
        return 120;
    }

    protected void poll() {
        int missCount = 0;
        String curQueue = null;
        while (JobExecutor.State.RUNNING.equals((Object)this.state.get())) {
            try {
                if (threadNameChangingEnabled) {
                    this.renameThread("Waiting for " + JesqueUtils.join(",", this.queueNames));
                }
                if ((curQueue = this.queueNames.poll(500L, TimeUnit.MILLISECONDS)) == null) continue;
                this.queueNames.add(curQueue);
                this.checkPaused();
                if (!JobExecutor.State.RUNNING.equals((Object)this.state.get())) continue;
                this.listenerDelegate.fireEvent(WorkerEvent.WORKER_POLL, this, curQueue, null, null, null, null);
                String payload = this.jedis.lpop(this.key("queue", curQueue));
                if (payload != null) {
                    Job job = (Job)ObjectMapperFactory.get().readValue(payload, Job.class);
                    this.process(job, curQueue);
                    missCount = 0;
                    continue;
                }
                if (++missCount < this.queueNames.size() || !JobExecutor.State.RUNNING.equals((Object)this.state.get())) continue;
                missCount = 0;
                Thread.sleep(500L);
            }
            catch (InterruptedException ie) {
                if (this.isShutdown()) continue;
                this.recoverFromException(curQueue, ie);
            }
            catch (Exception e) {
                this.recoverFromException(curQueue, e);
            }
        }
    }

    protected void recoverFromException(String curQueue, Exception e) {
        RecoveryStrategy recoveryStrategy = this.exceptionHandlerRef.get().onException(this, e, curQueue);
        switch (recoveryStrategy) {
            case RECONNECT: {
                log.info("Reconnecting to Redis in response to exception", (Throwable)e);
                int reconAttempts = this.getReconnectAttempts();
                if (!JedisUtils.reconnect(this.jedis, reconAttempts, 5000L)) {
                    log.warn("Terminating in response to exception after " + reconAttempts + " to reconnect", (Throwable)e);
                    this.end(false);
                    break;
                }
                this.authenticateAndSelectDB();
                log.info("Reconnected to Redis");
                break;
            }
            case TERMINATE: {
                log.warn("Terminating in response to exception", (Throwable)e);
                this.end(false);
                break;
            }
            case PROCEED: {
                this.listenerDelegate.fireEvent(WorkerEvent.WORKER_ERROR, this, curQueue, null, null, null, e);
                break;
            }
            default: {
                log.error("Unknown RecoveryStrategy: " + (Object)((Object)recoveryStrategy) + " while attempting to recover from the following exception; worker proceeding...", (Throwable)e);
            }
        }
    }

    private void authenticateAndSelectDB() {
        if (this.config.getPassword() != null) {
            this.jedis.auth(this.config.getPassword());
        }
        this.jedis.select(this.config.getDatabase());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void checkPaused() throws IOException {
        if (this.paused.get()) {
            AtomicBoolean atomicBoolean = this.paused;
            synchronized (atomicBoolean) {
                if (this.paused.get()) {
                    this.jedis.set(this.key("worker", this.name), this.pauseMsg());
                }
                while (this.paused.get()) {
                    try {
                        this.paused.wait();
                    }
                    catch (InterruptedException interruptedException) {}
                }
                this.jedis.del(new String[]{this.key("worker", this.name)});
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void process(Job job, String curQueue) {
        try {
            this.processingJob.set(true);
            if (threadNameChangingEnabled) {
                this.renameThread("Processing " + curQueue + " since " + System.currentTimeMillis());
            }
            this.listenerDelegate.fireEvent(WorkerEvent.JOB_PROCESS, this, curQueue, job, null, null, null);
            this.jedis.set(this.key("worker", this.name), this.statusMsg(curQueue, job));
            Object instance = JesqueUtils.materializeJob(job, this.jobTypes);
            Object result = this.execute(job, curQueue, instance);
            this.success(job, instance, result, curQueue);
        }
        catch (Exception e) {
            try {
                this.failure(e, job, curQueue);
            }
            catch (Throwable throwable) {
                this.jedis.del(new String[]{this.key("worker", this.name)});
                this.processingJob.set(false);
                throw throwable;
            }
            this.jedis.del(new String[]{this.key("worker", this.name)});
            this.processingJob.set(false);
        }
        this.jedis.del(new String[]{this.key("worker", this.name)});
        this.processingJob.set(false);
    }

    protected Object execute(Job job, String curQueue, Object instance) throws Exception {
        Object result;
        if (instance instanceof WorkerAware) {
            ((WorkerAware)instance).setWorker(this);
        }
        this.listenerDelegate.fireEvent(WorkerEvent.JOB_EXECUTE, this, curQueue, job, instance, null, null);
        if (instance instanceof Callable) {
            result = ((Callable)instance).call();
        } else if (instance instanceof Runnable) {
            ((Runnable)instance).run();
            result = null;
        } else {
            throw new ClassCastException("Instance must be a Runnable or a Callable: " + instance.getClass().getName() + " - " + instance);
        }
        return result;
    }

    protected void success(Job job, Object runner, Object result, String curQueue) {
        JedisUtils.ensureJedisConnection(this.jedis);
        try {
            this.jedis.incr(this.key("stat", "processed"));
            this.jedis.incr(this.key("stat", "processed", this.name));
        }
        catch (JedisException je) {
            log.warn("Error updating success stats for job=" + job, (Throwable)je);
        }
        this.listenerDelegate.fireEvent(WorkerEvent.JOB_SUCCESS, this, curQueue, job, runner, result, null);
    }

    protected void failure(Exception ex, Job job, String curQueue) {
        JedisUtils.ensureJedisConnection(this.jedis);
        try {
            this.jedis.incr(this.key("stat", "failed"));
            this.jedis.incr(this.key("stat", "failed", this.name));
            this.jedis.rpush(this.key("failed"), new String[]{this.failMsg(ex, curQueue, job)});
        }
        catch (JedisException je) {
            log.warn("Error updating failure stats for exception=" + ex + " job=" + job, (Throwable)je);
        }
        catch (IOException ioe) {
            log.warn("Error serializing failure payload for exception=" + ex + " job=" + job, (Throwable)ioe);
        }
        this.listenerDelegate.fireEvent(WorkerEvent.JOB_FAILURE, this, curQueue, job, null, null, ex);
    }

    protected String failMsg(Exception ex, String queue, Job job) throws IOException {
        JobFailure f = new JobFailure();
        f.setFailedAt(new Date());
        f.setWorker(this.name);
        f.setQueue(queue);
        f.setPayload(job);
        f.setException(ex);
        return ObjectMapperFactory.get().writeValueAsString((Object)f);
    }

    protected String statusMsg(String queue, Job job) throws IOException {
        WorkerStatus s = new WorkerStatus();
        s.setRunAt(new Date());
        s.setQueue(queue);
        s.setPayload(job);
        return ObjectMapperFactory.get().writeValueAsString((Object)s);
    }

    protected String pauseMsg() throws IOException {
        WorkerStatus s = new WorkerStatus();
        s.setRunAt(new Date());
        s.setPaused(this.isPaused());
        return ObjectMapperFactory.get().writeValueAsString((Object)s);
    }

    protected String createName() {
        StringBuilder sb = new StringBuilder(128);
        try {
            sb.append(InetAddress.getLocalHost().getHostName()).append(":").append(ManagementFactory.getRuntimeMXBean().getName().split("@")[0]).append('-').append(this.workerId).append(":").append("JAVA_DYNAMIC_QUEUES");
            for (String queueName : this.queueNames) {
                sb.append(',').append(queueName);
            }
        }
        catch (UnknownHostException uhe) {
            throw new RuntimeException(uhe);
        }
        return sb.toString();
    }

    protected String key(String ... parts) {
        return JesqueUtils.createKey(this.namespace, parts);
    }

    protected void renameThread(String msg) {
        Thread.currentThread().setName(this.threadNameBase + msg);
    }

    protected void checkJobTypes(Map<String, ? extends Class<?>> jobTypes) {
        if (jobTypes == null) {
            throw new IllegalArgumentException("jobTypes must not be null");
        }
        for (Map.Entry<String, Class<?>> entry : jobTypes.entrySet()) {
            try {
                this.checkJobType(entry.getKey(), entry.getValue());
            }
            catch (IllegalArgumentException iae) {
                throw new IllegalArgumentException("jobTypes contained invalid value", iae);
            }
        }
    }

    protected void checkJobType(String jobName, Class<?> jobType) {
        if (jobName == null) {
            throw new IllegalArgumentException("jobName must not be null");
        }
        if (jobType == null) {
            throw new IllegalArgumentException("jobType must not be null");
        }
        if (!Runnable.class.isAssignableFrom(jobType) && !Callable.class.isAssignableFrom(jobType)) {
            throw new IllegalArgumentException("jobType must implement either Runnable or Callable: " + jobType);
        }
    }

    public String toString() {
        return this.namespace + ":" + "worker" + ":" + this.name;
    }
}

