/*
 * Decompiled with CFR 0.152.
 */
package net.greghaines.jesque.admin;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import net.greghaines.jesque.Config;
import net.greghaines.jesque.Job;
import net.greghaines.jesque.admin.Admin;
import net.greghaines.jesque.admin.commands.PauseCommand;
import net.greghaines.jesque.admin.commands.ShutdownCommand;
import net.greghaines.jesque.json.ObjectMapperFactory;
import net.greghaines.jesque.utils.ConcurrentHashSet;
import net.greghaines.jesque.utils.ConcurrentSet;
import net.greghaines.jesque.utils.JesqueUtils;
import net.greghaines.jesque.utils.PoolUtils;
import net.greghaines.jesque.worker.DefaultExceptionHandler;
import net.greghaines.jesque.worker.ExceptionHandler;
import net.greghaines.jesque.worker.JobExecutor;
import net.greghaines.jesque.worker.JobFactory;
import net.greghaines.jesque.worker.MapBasedJobFactory;
import net.greghaines.jesque.worker.RecoveryStrategy;
import net.greghaines.jesque.worker.Worker;
import net.greghaines.jesque.worker.WorkerAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.util.Pool;

/**
 * {@link Admin} implementation that listens on Redis pub/sub channels using a connection borrowed
 * from a {@link Pool} and executes the jobs it receives on those channels.
 *
 * <p>Lifecycle: an instance starts in {@code NEW}, moves to {@code RUNNING} the first time
 * {@link #run()} is invoked and to {@code SHUTDOWN} once {@link #end(boolean)} is called. An
 * instance cannot be run twice.
 */
public class AdminPoolImpl implements Admin {

    private static final Logger LOG = LoggerFactory.getLogger(AdminPoolImpl.class);

    /** Pool that supplies the Redis connection used for the subscription. */
    protected final Pool<Jedis> jedisPool;
    /** Namespace taken from the {@link Config}; used as the prefix of every channel key. */
    protected final String namespace;
    /** Factory used to turn a received {@link Job} into an executable object. */
    private final JobFactory jobFactory;
    /** Unqualified channel names this admin listens to. */
    private final ConcurrentSet<String> channels = new ConcurrentHashSet<String>();
    /** Listener handed to Jedis; receives the published messages. */
    protected final PubSubListener jedisPubSub = new PubSubListener();
    /** Worker passed to {@link WorkerAware} jobs; may hold {@code null}. */
    protected final AtomicReference<Worker> workerRef = new AtomicReference<Worker>(null);
    /** Current lifecycle state. */
    protected final AtomicReference<JobExecutor.State> state =
            new AtomicReference<JobExecutor.State>(JobExecutor.State.NEW);
    /** {@code true} while a received message is being handled. */
    private final AtomicBoolean processingJob = new AtomicBoolean(false);
    /** Thread currently executing {@link #run()}, or {@code null} when not running. */
    private final AtomicReference<Thread> threadRef = new AtomicReference<Thread>(null);
    /** Handler consulted when processing a message fails. */
    private final AtomicReference<ExceptionHandler> exceptionHandlerRef =
            new AtomicReference<ExceptionHandler>(new DefaultExceptionHandler());

    /**
     * Creates an admin that listens on the {@code "admin"} channel and understands the
     * {@code PauseCommand} and {@code ShutdownCommand} jobs.
     *
     * @param config    supplies the namespace; must not be {@code null}
     * @param jedisPool pool of Redis connections; must not be {@code null}
     * @throws IllegalArgumentException if an argument is {@code null}
     */
    public AdminPoolImpl(Config config, Pool<Jedis> jedisPool) {
        this(config, JesqueUtils.set("admin"), new MapBasedJobFactory(JesqueUtils.map(JesqueUtils.entry("PauseCommand", PauseCommand.class), JesqueUtils.entry("ShutdownCommand", ShutdownCommand.class))), jedisPool);
    }

    /**
     * Creates an admin that listens on the given channels.
     *
     * @param config     supplies the namespace; must not be {@code null}
     * @param channels   channel names to listen to; neither the set nor any member may be
     *                   {@code null}, and no member may be empty
     * @param jobFactory materializes received jobs; must not be {@code null}
     * @param jedisPool  pool of Redis connections; must not be {@code null}
     * @throws IllegalArgumentException if any of the above constraints is violated
     */
    public AdminPoolImpl(Config config, Set<String> channels, JobFactory jobFactory, Pool<Jedis> jedisPool) {
        if (config == null) {
            throw new IllegalArgumentException("config must not be null");
        }
        if (jobFactory == null) {
            throw new IllegalArgumentException("jobFactory must not be null");
        }
        if (jedisPool == null) {
            throw new IllegalArgumentException("jedisPool must not be null");
        }
        this.namespace = config.getNamespace();
        this.jedisPool = jedisPool;
        this.setChannels(channels);
        this.jobFactory = jobFactory;
    }

    /**
     * Subscribes to the configured channels and keeps re-subscribing, each time with a freshly
     * built channel list, until this admin is shut down.
     *
     * @throws IllegalStateException if this admin is already running or has been shut down
     */
    @Override
    public void run() {
        if (!this.state.compareAndSet(JobExecutor.State.NEW, JobExecutor.State.RUNNING)) {
            if (this.state.get() == JobExecutor.State.RUNNING) {
                throw new IllegalStateException("This AdminImpl is already running");
            }
            throw new IllegalStateException("This AdminImpl is shutdown");
        }
        try {
            LOG.debug("AdminImpl starting up");
            this.threadRef.set(Thread.currentThread());
            while (!this.isShutdown()) {
                // Each pass borrows a connection and subscribes; the loop re-enters whenever
                // the subscription ends (e.g. after setChannels() unsubscribes).
                PoolUtils.doWorkInPoolNicely(this.jedisPool, new PoolUtils.PoolWork<Jedis, Void>() {
                    @Override
                    public Void doWork(Jedis jedis) {
                        jedis.subscribe(AdminPoolImpl.this.jedisPubSub, AdminPoolImpl.this.createFullChannels());
                        return null;
                    }
                });
            }
        } finally {
            LOG.debug("AdminImpl shutting down");
            this.threadRef.set(null);
        }
    }

    /**
     * @return a read-only view of the channel names this admin listens to
     */
    @Override
    public Set<String> getChannels() {
        return Collections.unmodifiableSet(this.channels);
    }

    /**
     * Replaces the set of channels. If a subscription is active it is dropped so that
     * {@link #run()} subscribes again with the new channel list.
     *
     * @param channels the new channel names; neither the set nor any member may be {@code null},
     *                 and no member may be empty
     * @throws IllegalArgumentException if the constraints above are violated
     */
    @Override
    public void setChannels(Set<String> channels) {
        AdminPoolImpl.checkChannels(channels);
        this.channels.clear();
        this.channels.addAll(channels);
        if (this.jedisPubSub.isSubscribed()) {
            this.jedisPubSub.unsubscribe();
        }
    }

    /**
     * @return the worker handed to {@link WorkerAware} jobs, possibly {@code null}
     */
    @Override
    public Worker getWorker() {
        return this.workerRef.get();
    }

    /**
     * @param worker the worker to hand to {@link WorkerAware} jobs; {@code null} is accepted
     */
    @Override
    public void setWorker(Worker worker) {
        this.workerRef.set(worker);
    }

    /**
     * Marks this admin as shut down and drops the active subscription, if there is one.
     *
     * @param now if {@code true}, additionally interrupts the thread executing {@link #run()}
     */
    @Override
    public void end(boolean now) {
        this.state.set(JobExecutor.State.SHUTDOWN);
        // Same guard as setChannels(): the listener rejects unsubscribe() while it is not
        // attached to a connection, which would abort shutdown before the interrupt below.
        if (this.jedisPubSub.isSubscribed()) {
            this.jedisPubSub.unsubscribe();
        }
        if (now) {
            final Thread workerThread = this.threadRef.get();
            if (workerThread != null) {
                workerThread.interrupt();
            }
        }
    }

    /**
     * @return {@code true} once {@link #end(boolean)} has been called
     */
    @Override
    public boolean isShutdown() {
        return this.state.get() == JobExecutor.State.SHUTDOWN;
    }

    /**
     * @return {@code true} while a received message is being handled
     */
    @Override
    public boolean isProcessingJob() {
        return this.processingJob.get();
    }

    /**
     * Waits for the thread executing {@link #run()} to finish. Returns immediately when no such
     * thread is registered or it is no longer alive.
     *
     * @param millis maximum time to wait, passed unchanged to {@link Thread#join(long)}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public void join(long millis) throws InterruptedException {
        final Thread workerThread = this.threadRef.get();
        if (workerThread != null && workerThread.isAlive()) {
            workerThread.join(millis);
        }
    }

    /**
     * @return the factory used to materialize received jobs
     */
    @Override
    public JobFactory getJobFactory() {
        return this.jobFactory;
    }

    /**
     * @return the handler consulted when processing a message fails
     */
    @Override
    public ExceptionHandler getExceptionHandler() {
        return this.exceptionHandlerRef.get();
    }

    /**
     * @param exceptionHandler the handler to consult when processing a message fails
     * @throws IllegalArgumentException if {@code exceptionHandler} is {@code null}
     */
    @Override
    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
        if (exceptionHandler == null) {
            throw new IllegalArgumentException("exceptionHandler must not be null");
        }
        this.exceptionHandlerRef.set(exceptionHandler);
    }

    /**
     * Runs a materialized job on the calling thread. {@link WorkerAware} instances first receive
     * the current worker.
     *
     * @param job      the job description (not used by this implementation)
     * @param curQueue the channel the job arrived on (not used by this implementation)
     * @param instance the materialized job; must be a {@link Callable} or a {@link Runnable}
     * @return the value returned by the {@link Callable}, or {@code null} for a {@link Runnable}
     * @throws ClassCastException if {@code instance} is neither a {@link Callable} nor a
     *                            {@link Runnable}
     * @throws Exception          whatever the job itself throws
     */
    protected Object execute(Job job, String curQueue, Object instance) throws Exception {
        if (instance instanceof WorkerAware) {
            ((WorkerAware) instance).setWorker(this.workerRef.get());
        }
        if (instance instanceof Callable) {
            return ((Callable<?>) instance).call();
        }
        if (instance instanceof Runnable) {
            ((Runnable) instance).run();
            return null;
        }
        throw new ClassCastException("instance must be a Runnable or a Callable: " + instance.getClass().getName() + " - " + instance);
    }

    /**
     * Asks the configured {@link ExceptionHandler} how to react to a failure and acts on the
     * answer. A {@code null} or unrecognised strategy is logged and processing continues.
     *
     * @param channel the channel on which the failing message arrived
     * @param e       the failure
     */
    protected void recoverFromException(String channel, Exception e) {
        final RecoveryStrategy recoveryStrategy = this.exceptionHandlerRef.get().onException(this, e, channel);
        if (recoveryStrategy == null) {
            // Switching on a null enum would throw; treat it like any other unknown strategy.
            logUnknownStrategy(null, e);
            return;
        }
        switch (recoveryStrategy) {
            case RECONNECT:
                LOG.info("Ignoring RECONNECT strategy in response to exception because this is a pool", e);
                break;
            case TERMINATE:
                LOG.warn("Terminating in response to exception", e);
                this.end(false);
                break;
            case PROCEED:
                break;
            default:
                logUnknownStrategy(recoveryStrategy, e);
                break;
        }
    }

    /** Logs that the exception handler returned a strategy this class does not understand. */
    private static void logUnknownStrategy(RecoveryStrategy recoveryStrategy, Exception e) {
        LOG.error("Unknown RecoveryStrategy: " + recoveryStrategy + " while attempting to recover from the following exception; Admin proceeding...", e);
    }

    /**
     * Validates a collection of channel names.
     *
     * @param channels the names to check
     * @throws IllegalArgumentException if {@code channels} is {@code null} or contains a
     *                                  {@code null} or empty name
     */
    protected static void checkChannels(Iterable<String> channels) {
        if (channels == null) {
            throw new IllegalArgumentException("channels must not be null");
        }
        for (String channel : channels) {
            if (channel == null || "".equals(channel)) {
                throw new IllegalArgumentException("channels' members must not be null: " + channels);
            }
        }
    }

    /**
     * Builds the namespaced keys for the current channels.
     *
     * @return one key per configured channel
     */
    private String[] createFullChannels() {
        // A zero-length seed lets toArray size the result itself, so a concurrent change to the
        // set cannot leave trailing null slots in the array.
        final String[] fullChannels = this.channels.toArray(new String[0]);
        for (int i = 0; i < fullChannels.length; i++) {
            fullChannels[i] = JesqueUtils.createKey(this.namespace, "channel", fullChannels[i]);
        }
        return fullChannels;
    }

    /**
     * Pub/sub listener that deserializes each received message into a {@link Job}, materializes
     * it through the job factory and executes it.
     */
    protected class PubSubListener extends JedisPubSub {

        protected PubSubListener() {
        }

        /**
         * Handles one published message. {@code null} messages are ignored; any exception raised
         * while parsing or executing the job is routed to
         * {@link AdminPoolImpl#recoverFromException(String, Exception)}.
         *
         * @param channel the channel the message was published on
         * @param message the JSON payload describing the job
         */
        @Override
        public void onMessage(String channel, String message) {
            if (message == null) {
                return;
            }
            try {
                AdminPoolImpl.this.processingJob.set(true);
                final Job job = ObjectMapperFactory.get().readValue(message, Job.class);
                AdminPoolImpl.this.execute(job, channel, AdminPoolImpl.this.jobFactory.materializeJob(job));
            } catch (Exception e) {
                AdminPoolImpl.this.recoverFromException(channel, e);
            } finally {
                AdminPoolImpl.this.processingJob.set(false);
            }
        }

        /** No-op. */
        @Override
        public void onPMessage(String pattern, String channel, String message) {
        }

        /** No-op. */
        @Override
        public void onSubscribe(String channel, int subscribedChannels) {
        }

        /** No-op. */
        @Override
        public void onUnsubscribe(String channel, int subscribedChannels) {
        }

        /** No-op. */
        @Override
        public void onPUnsubscribe(String pattern, int subscribedChannels) {
        }

        /** No-op. */
        @Override
        public void onPSubscribe(String pattern, int subscribedChannels) {
        }
    }
}

