/*
 * Decompiled with CFR 0.152.
 */
package net.greghaines.jesque.admin;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import net.greghaines.jesque.Config;
import net.greghaines.jesque.Job;
import net.greghaines.jesque.admin.Admin;
import net.greghaines.jesque.admin.commands.PauseCommand;
import net.greghaines.jesque.admin.commands.ShutdownCommand;
import net.greghaines.jesque.json.ObjectMapperFactory;
import net.greghaines.jesque.utils.ConcurrentHashSet;
import net.greghaines.jesque.utils.ConcurrentSet;
import net.greghaines.jesque.utils.JedisUtils;
import net.greghaines.jesque.utils.JesqueUtils;
import net.greghaines.jesque.worker.DefaultExceptionHandler;
import net.greghaines.jesque.worker.ExceptionHandler;
import net.greghaines.jesque.worker.JobExecutor;
import net.greghaines.jesque.worker.RecoveryStrategy;
import net.greghaines.jesque.worker.Worker;
import net.greghaines.jesque.worker.WorkerAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

public class AdminImpl
implements Admin {
    private static final Logger log = LoggerFactory.getLogger(AdminImpl.class);
    private static final long reconnectSleepTime = 5000L;
    private static final int reconnectAttempts = 120;
    protected final Jedis jedis;
    protected final String namespace;
    private final ConcurrentMap<String, Class<?>> jobTypes = new ConcurrentHashMap();
    private final ConcurrentSet<String> channels = new ConcurrentHashSet<String>();
    protected final PubSubListener jedisPubSub = new PubSubListener();
    protected final AtomicReference<Worker> workerRef = new AtomicReference<Object>(null);
    protected final AtomicReference<JobExecutor.State> state = new AtomicReference<JobExecutor.State>(JobExecutor.State.NEW);
    private final AtomicBoolean processingJob = new AtomicBoolean(false);
    private final AtomicReference<Thread> threadRef = new AtomicReference<Object>(null);
    private final AtomicReference<ExceptionHandler> exceptionHandlerRef = new AtomicReference<DefaultExceptionHandler>(new DefaultExceptionHandler());

    public AdminImpl(Config config) {
        if (config == null) {
            throw new IllegalArgumentException("config must not be null");
        }
        this.namespace = config.getNamespace();
        this.jedis = new Jedis(config.getHost(), config.getPort(), config.getTimeout());
        if (config.getPassword() != null) {
            this.jedis.auth(config.getPassword());
        }
        this.jedis.select(config.getDatabase());
        this.setChannels(JesqueUtils.set("admin"));
        this.setJobTypes(JesqueUtils.map(JesqueUtils.entry("PauseCommand", PauseCommand.class), JesqueUtils.entry("ShutdownCommand", ShutdownCommand.class)));
    }

    public AdminImpl(Config config, Set<String> channels, Map<String, ? extends Class<?>> jobTypes) {
        if (config == null) {
            throw new IllegalArgumentException("config must not be null");
        }
        this.namespace = config.getNamespace();
        this.jedis = new Jedis(config.getHost(), config.getPort(), config.getTimeout());
        if (config.getPassword() != null) {
            this.jedis.auth(config.getPassword());
        }
        this.jedis.select(config.getDatabase());
        this.setChannels(channels);
        this.setJobTypes(jobTypes);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        if (this.state.compareAndSet(JobExecutor.State.NEW, JobExecutor.State.RUNNING)) {
            try {
                log.debug("AdminImpl starting up");
                this.threadRef.set(Thread.currentThread());
                while (!this.isShutdown()) {
                    this.jedis.subscribe((JedisPubSub)this.jedisPubSub, this.createFullChannels());
                }
            }
            finally {
                log.debug("AdminImpl shutting down");
                this.jedis.quit();
                this.threadRef.set(null);
            }
        } else {
            if (JobExecutor.State.RUNNING.equals((Object)this.state.get())) {
                throw new IllegalStateException("This AdminImpl is already running");
            }
            throw new IllegalStateException("This AdminImpl is shutdown");
        }
    }

    @Override
    public Set<String> getChannels() {
        return Collections.unmodifiableSet(this.channels);
    }

    @Override
    public void setChannels(Set<String> channels) {
        AdminImpl.checkChannels(channels);
        this.channels.clear();
        this.channels.addAll(channels);
        if (this.jedisPubSub.isSubscribed()) {
            this.jedisPubSub.unsubscribe();
        }
    }

    @Override
    public Worker getWorker() {
        return this.workerRef.get();
    }

    @Override
    public void setWorker(Worker worker) {
        this.workerRef.set(worker);
    }

    @Override
    public void end(boolean now) {
        Thread workerThread;
        this.state.set(JobExecutor.State.SHUTDOWN);
        this.jedisPubSub.unsubscribe();
        if (now && (workerThread = this.threadRef.get()) != null) {
            workerThread.interrupt();
        }
    }

    @Override
    public boolean isShutdown() {
        return JobExecutor.State.SHUTDOWN.equals((Object)this.state.get());
    }

    @Override
    public boolean isProcessingJob() {
        return this.processingJob.get();
    }

    @Override
    public void join(long millis) throws InterruptedException {
        Thread workerThread = this.threadRef.get();
        if (workerThread != null && workerThread.isAlive()) {
            workerThread.join(millis);
        }
    }

    @Override
    public Map<String, Class<?>> getJobTypes() {
        return Collections.unmodifiableMap(this.jobTypes);
    }

    @Override
    public void addJobType(String jobName, Class<?> jobType) {
        if (jobName == null) {
            throw new IllegalArgumentException("jobName must not be null");
        }
        if (jobType == null) {
            throw new IllegalArgumentException("jobType must not be null");
        }
        if (!Runnable.class.isAssignableFrom(jobType) && !Callable.class.isAssignableFrom(jobType)) {
            throw new IllegalArgumentException("jobType must implement either Runnable or Callable: " + jobType);
        }
        this.jobTypes.put(jobName, jobType);
    }

    @Override
    public void removeJobType(Class<?> jobType) {
        if (jobType == null) {
            throw new IllegalArgumentException("jobType must not be null");
        }
        this.jobTypes.values().remove(jobType);
    }

    @Override
    public void removeJobName(String jobName) {
        if (jobName == null) {
            throw new IllegalArgumentException("jobName must not be null");
        }
        this.jobTypes.remove(jobName);
    }

    @Override
    public void setJobTypes(Map<String, ? extends Class<?>> jobTypes) {
        this.checkJobTypes(jobTypes);
        this.jobTypes.clear();
        for (Map.Entry<String, Class<?>> entry : jobTypes.entrySet()) {
            this.addJobType(entry.getKey(), entry.getValue());
        }
    }

    @Override
    public ExceptionHandler getExceptionHandler() {
        return this.exceptionHandlerRef.get();
    }

    @Override
    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
        if (exceptionHandler == null) {
            throw new IllegalArgumentException("exceptionHandler must not be null");
        }
        this.exceptionHandlerRef.set(exceptionHandler);
    }

    protected Object execute(Job job, String curQueue, Object instance) throws Exception {
        Object result;
        if (instance instanceof WorkerAware) {
            ((WorkerAware)instance).setWorker(this.workerRef.get());
        }
        if (instance instanceof Callable) {
            result = ((Callable)instance).call();
        } else if (instance instanceof Runnable) {
            ((Runnable)instance).run();
            result = null;
        } else {
            throw new ClassCastException("instance must be a Runnable or a Callable: " + instance.getClass().getName() + " - " + instance);
        }
        return result;
    }

    protected int getReconnectAttempts() {
        return 120;
    }

    protected void recoverFromException(String channel, Exception e) {
        RecoveryStrategy recoveryStrategy = this.exceptionHandlerRef.get().onException(this, e, channel);
        switch (recoveryStrategy) {
            case RECONNECT: {
                log.info("Reconnecting to Redis in response to exception", (Throwable)e);
                int reconAttempts = this.getReconnectAttempts();
                if (!JedisUtils.reconnect(this.jedis, reconAttempts, 5000L)) {
                    log.warn("Terminating in response to exception after " + reconAttempts + " to reconnect", (Throwable)e);
                    this.end(false);
                    break;
                }
                log.info("Reconnected to Redis");
                break;
            }
            case TERMINATE: {
                log.warn("Terminating in response to exception", (Throwable)e);
                this.end(false);
                break;
            }
            case PROCEED: {
                break;
            }
            default: {
                log.error("Unknown RecoveryStrategy: " + (Object)((Object)recoveryStrategy) + " while attempting to recover from the following exception; Admin proceeding...", (Throwable)e);
            }
        }
    }

    protected static void checkChannels(Iterable<String> channels) {
        if (channels == null) {
            throw new IllegalArgumentException("channels must not be null");
        }
        for (String channel : channels) {
            if (channel != null && !"".equals(channel)) continue;
            throw new IllegalArgumentException("channels' members must not be null: " + channels);
        }
    }

    protected void checkJobTypes(Map<String, ? extends Class<?>> jobTypes) {
        if (jobTypes == null) {
            throw new IllegalArgumentException("jobTypes must not be null");
        }
        for (Map.Entry<String, Class<?>> entry : jobTypes.entrySet()) {
            if (entry.getKey() == null) {
                throw new IllegalArgumentException("jobType's keys must not be null: " + jobTypes);
            }
            Class<?> jobType = entry.getValue();
            if (jobType == null) {
                throw new IllegalArgumentException("jobType's values must not be null: " + jobTypes);
            }
            if (Runnable.class.isAssignableFrom(jobType) || Callable.class.isAssignableFrom(jobType)) continue;
            throw new IllegalArgumentException("jobType's values must implement either Runnable or Callable: " + jobTypes);
        }
    }

    private String[] createFullChannels() {
        String[] fullChannels = this.channels.toArray(new String[this.channels.size()]);
        int i = 0;
        for (String channel : fullChannels) {
            fullChannels[i++] = JesqueUtils.createKey(this.namespace, "channel", channel);
        }
        return fullChannels;
    }

    protected class PubSubListener
    extends JedisPubSub {
        protected PubSubListener() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onMessage(String channel, String message) {
            if (message != null) {
                try {
                    AdminImpl.this.processingJob.set(true);
                    Job job = (Job)ObjectMapperFactory.get().readValue(message, Job.class);
                    AdminImpl.this.execute(job, channel, JesqueUtils.materializeJob(job, AdminImpl.this.jobTypes));
                }
                catch (Exception e) {
                    AdminImpl.this.recoverFromException(channel, e);
                }
                finally {
                    AdminImpl.this.processingJob.set(false);
                }
            }
        }

        public void onPMessage(String pattern, String channel, String message) {
        }

        public void onSubscribe(String channel, int subscribedChannels) {
        }

        public void onUnsubscribe(String channel, int subscribedChannels) {
        }

        public void onPUnsubscribe(String pattern, int subscribedChannels) {
        }

        public void onPSubscribe(String pattern, int subscribedChannels) {
        }
    }
}

