/*
 * Decompiled with CFR 0.152.
 */
package ai.grakn.engine.tasks.manager.redisqueue;

import ai.grakn.engine.GraknEngineConfig;
import ai.grakn.engine.factory.EngineGraknGraphFactory;
import ai.grakn.engine.lock.LockProvider;
import ai.grakn.engine.tasks.manager.redisqueue.RedisInflightTaskConsumer;
import ai.grakn.engine.tasks.manager.redisqueue.RedisTaskManager;
import ai.grakn.engine.tasks.manager.redisqueue.RedisTaskQueueConsumer;
import ai.grakn.engine.tasks.manager.redisqueue.Task;
import ai.grakn.engine.util.EngineID;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import net.greghaines.jesque.Config;
import net.greghaines.jesque.ConfigBuilder;
import net.greghaines.jesque.Job;
import net.greghaines.jesque.client.Client;
import net.greghaines.jesque.client.ClientPoolImpl;
import net.greghaines.jesque.utils.JesqueUtils;
import net.greghaines.jesque.worker.JobFactory;
import net.greghaines.jesque.worker.MapBasedJobFactory;
import net.greghaines.jesque.worker.RecoveryStrategy;
import net.greghaines.jesque.worker.Worker;
import net.greghaines.jesque.worker.WorkerEvent;
import net.greghaines.jesque.worker.WorkerPool;
import net.greghaines.jesque.worker.WorkerPoolImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.util.Pool;

/**
 * Thin wrapper around a Jesque client and worker pool bound to a single Redis-backed queue
 * ({@value #QUEUE_NAME}).
 *
 * <p>The class enqueues {@link Task}s via {@link #putJob(Task)}, starts a pool of workers that
 * consume them via {@link #subscribe}, and can schedule a periodic
 * {@link RedisInflightTaskConsumer} via {@link #runInFlightProcessor()}.
 *
 * <p>Access to the worker pool reference is guarded by this object's monitor.
 */
class RedisTaskQueue {
    private static final Logger LOG = LoggerFactory.getLogger(RedisTaskQueue.class);
    private static final String QUEUE_NAME = "grakn_engine_queue";
    private static final String SUBSCRIPTION_CLASS_NAME = Task.class.getName();
    /** Period, in milliseconds, between two runs of the in-flight task consumer. */
    private static final long INFLIGHT_PROCESSOR_PERIOD_MS = 2000L;

    /** Maps the job class name used when enqueuing to the runner class that executes it. */
    static final MapBasedJobFactory JOB_FACTORY = new MapBasedJobFactory(
            JesqueUtils.map(JesqueUtils.entry(SUBSCRIPTION_CLASS_NAME, RedisTaskQueueConsumer.class)));

    private final Client redisClient;
    private final Config config;
    private final Histogram queueSize;
    private final Meter failures;
    private final int processingDelay;
    private final Timer timer;
    private final Pool<Jedis> jedisPool;
    private final LockProvider lockProvider;
    private final MetricRegistry metricRegistry;
    private final Meter putJobMeter;
    // Guarded by "this"; stays null until subscribe(...) has been called.
    private WorkerPool workerPool;

    /**
     * Creates the queue wrapper and registers its metrics ("put-job", "queue-size", "failures").
     *
     * @param jedisPool pool used by the Jesque client, the workers and the in-flight consumer
     * @param lockProvider handed to every {@link RedisTaskQueueConsumer} before it runs
     * @param metricRegistry registry in which this queue's meters and histogram are created
     * @param processingDelay delay, in seconds, passed to the {@link RedisInflightTaskConsumer}
     */
    RedisTaskQueue(Pool<Jedis> jedisPool, LockProvider lockProvider, MetricRegistry metricRegistry, int processingDelay) {
        this.jedisPool = jedisPool;
        this.lockProvider = lockProvider;
        this.metricRegistry = metricRegistry;
        this.config = new ConfigBuilder().build();
        this.redisClient = new ClientPoolImpl(this.config, jedisPool);
        this.processingDelay = processingDelay;
        this.putJobMeter = metricRegistry.meter(MetricRegistry.name(RedisTaskQueue.class, "put-job"));
        this.queueSize = metricRegistry.histogram(MetricRegistry.name(RedisTaskQueue.class, "queue-size"));
        this.failures = metricRegistry.meter(MetricRegistry.name(RedisTaskQueue.class, "failures"));
        this.timer = new Timer();
    }

    /**
     * Cancels the in-flight timer, ends the worker pool (if {@link #subscribe} was ever called)
     * and finally ends the Redis client.
     *
     * <p>The client is ended in a {@code finally} block so that a failure while ending the
     * worker pool does not leave the client open.
     */
    void close() {
        this.timer.cancel();
        try {
            synchronized (this) {
                if (this.workerPool != null) {
                    // end(false): non-forced end — presumably lets running jobs finish; confirm against Jesque docs.
                    this.workerPool.end(false);
                }
            }
        } finally {
            this.redisClient.end();
        }
    }

    /**
     * Marks the "put-job" meter and enqueues the given task on {@value #QUEUE_NAME}.
     *
     * @param job the task to enqueue; it becomes the single argument of the Jesque job
     */
    void putJob(Task job) {
        this.putJobMeter.mark();
        LOG.debug("Enqueuing job {}", job.getTaskState().getId());
        Job queueJob = new Job(SUBSCRIPTION_CLASS_NAME, new Object[]{job});
        this.redisClient.enqueue(QUEUE_NAME, queueJob);
    }

    /**
     * Schedules a {@link RedisInflightTaskConsumer} on this queue's timer, starting immediately
     * and repeating at a fixed rate of {@link #INFLIGHT_PROCESSOR_PERIOD_MS} milliseconds.
     */
    void runInFlightProcessor() {
        RedisInflightTaskConsumer inflightConsumer = new RedisInflightTaskConsumer(
                this.jedisPool, Duration.ofSeconds(this.processingDelay), this.config, QUEUE_NAME);
        this.timer.scheduleAtFixedRate(inflightConsumer, 0L, INFLIGHT_PROCESSOR_PERIOD_MS);
    }

    /**
     * Creates a pool of {@code poolSize} workers listening on {@value #QUEUE_NAME} and runs it.
     *
     * <p>The remaining arguments are not used here directly; they are forwarded to each
     * {@link RedisTaskQueueConsumer} right before it executes a job.
     *
     * <p>NOTE(review): calling this method more than once replaces the stored pool without
     * ending the previous one — callers are presumably expected to subscribe only once.
     *
     * @param redisTaskManager task manager forwarded to the job runners
     * @param engineId engine identifier forwarded to the job runners
     * @param engineConfig engine configuration forwarded to the job runners
     * @param factory graph factory forwarded to the job runners
     * @param poolSize number of workers in the pool
     */
    void subscribe(RedisTaskManager redisTaskManager, EngineID engineId, GraknEngineConfig engineConfig, EngineGraknGraphFactory factory, int poolSize) {
        LOG.info("Subscribing worker to jobs in queue {}", QUEUE_NAME);
        synchronized (this) {
            this.workerPool = new WorkerPool(() -> this.getWorker(redisTaskManager, engineId, engineConfig, factory), poolSize);
            this.workerPool.run();
        }
    }

    /**
     * Builds one worker for {@value #QUEUE_NAME}.
     *
     * <p>The worker gets a {@link WorkerEvent#JOB_EXECUTE} listener that injects the running
     * state into the job runner, and an exception handler that marks the "failures" meter and
     * answers {@link RecoveryStrategy#TERMINATE}.
     */
    private Worker getWorker(RedisTaskManager redisTaskManager, EngineID engineId, GraknEngineConfig engineConfig, EngineGraknGraphFactory factory) {
        WorkerPoolImpl worker = new WorkerPoolImpl(this.config, Arrays.asList(QUEUE_NAME), JOB_FACTORY, this.jedisPool);
        worker.getWorkerEventEmitter().addListener((event, worker1, queue, job, runner, result, t) -> {
            // NOTE(review): `queue` appears to be the queue's name, so this records the length of
            // that name rather than the number of pending jobs — confirm the intended metric.
            this.queueSize.update(queue.length());
            if (runner instanceof RedisTaskQueueConsumer) {
                ((RedisTaskQueueConsumer) runner).setRunningState(redisTaskManager, engineId, engineConfig, this.jedisPool, factory, this.lockProvider, this.metricRegistry);
            } else {
                // instanceof is also false for null, so guard before asking for the class name.
                String runnerType = runner == null ? "null" : runner.getClass().getName();
                LOG.error("Found unexpected job in queue of type {}", runnerType);
            }
        }, WorkerEvent.JOB_EXECUTE);
        worker.setExceptionHandler((jobExecutor, exception, curQueue) -> {
            this.failures.mark();
            LOG.error("Exception while trying to run task, terminating!", exception);
            return RecoveryStrategy.TERMINATE;
        });
        return worker;
    }
}

