/*
 * Decompiled with CFR 0.152.
 */
package ai.grakn.engine.tasks.manager.redisqueue;

import ai.grakn.GraknConfigKey;
import ai.grakn.engine.GraknConfig;
import ai.grakn.engine.TaskId;
import ai.grakn.engine.factory.EngineGraknTxFactory;
import ai.grakn.engine.postprocessing.PostProcessor;
import ai.grakn.engine.tasks.manager.TaskConfiguration;
import ai.grakn.engine.tasks.manager.TaskManager;
import ai.grakn.engine.tasks.manager.TaskState;
import ai.grakn.engine.tasks.manager.redisqueue.RedisTaskQueueConsumer;
import ai.grakn.engine.tasks.manager.redisqueue.RedisTaskStorage;
import ai.grakn.engine.tasks.manager.redisqueue.Task;
import ai.grakn.engine.util.EngineID;
import ai.grakn.redisq.Document;
import ai.grakn.redisq.Redisq;
import ai.grakn.redisq.RedisqBuilder;
import ai.grakn.redisq.State;
import ai.grakn.redisq.exceptions.StateFutureInitializationException;
import ai.grakn.redisq.exceptions.WaitException;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.util.Pool;

/**
 * {@link TaskManager} implementation backed by a {@link Redisq} queue of {@link Task} documents.
 *
 * <p>Tasks are pushed onto a queue named {@code "grakn"}; a {@link RedisTaskQueueConsumer} is
 * registered as the queue's consumer, and a {@link RedisTaskStorage} created from the same queue
 * is exposed through {@link #storage()}.
 */
public class RedisTaskManager implements TaskManager {
    private static final Logger LOG = LoggerFactory.getLogger(RedisTaskManager.class);
    private static final String QUEUE_NAME = "grakn";
    /** Upper bound, in minutes, that {@link #runTask} waits on {@code pushAndWait}. */
    private static final long RUN_TASK_TIMEOUT_MINUTES = 5L;

    private final Redisq<Task> redisq;
    private final RedisTaskStorage taskStorage;

    /**
     * Builds the queue, its consumer and the task storage.
     *
     * @param engineId       passed through to the queue consumer
     * @param config         passed through to the queue consumer; also supplies
     *                       {@link GraknConfigKey#TASK_DELAY}, which is used as the queue delay
     * @param jedisPool      Redis connection pool handed to the queue builder
     * @param threads        thread pool size configured on the queue
     * @param factory        passed through to the queue consumer
     * @param metricRegistry shared by the consumer, the queue and the task storage
     * @param postProcessor  passed through to the queue consumer
     */
    public RedisTaskManager(EngineID engineId, GraknConfig config, Pool<Jedis> jedisPool, int threads, EngineGraknTxFactory factory, MetricRegistry metricRegistry, PostProcessor postProcessor) {
        // NOTE(review): `this` is handed to the consumer before redisq/taskStorage are assigned;
        // presumably the consumer only stores the reference here - confirm it does not call back.
        RedisTaskQueueConsumer consumer = new RedisTaskQueueConsumer(this, engineId, config, metricRegistry, factory, postProcessor);
        LOG.info("Running queue consumer with {} execution threads", threads);
        long delay = (long)((Integer)config.getProperty(GraknConfigKey.TASK_DELAY)).intValue();
        this.redisq = new RedisqBuilder()
                .setJedisPool(jedisPool)
                .setName(QUEUE_NAME)
                .setConsumer((Consumer)consumer)
                .setMetricRegistry(metricRegistry)
                .setThreadPoolSize(threads)
                .setDelay(delay)
                .setDocumentClass(Task.class)
                .createRedisq();
        this.taskStorage = RedisTaskStorage.create(this.redisq, metricRegistry);
    }

    /**
     * Closes the underlying queue.
     *
     * <p>If the calling thread is interrupted while closing, the error is logged and the
     * thread's interrupt status is restored so that callers can still observe the interruption.
     */
    @Override
    public void close() {
        LOG.info("Closing task manager");
        try {
            this.redisq.close();
        }
        catch (InterruptedException e) {
            LOG.error("Interrupted while closing queue", e);
            // Catching InterruptedException clears the flag; set it again rather than swallow it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Starts the queue consumer asynchronously.
     *
     * @return a future that completes once {@code startConsumer()} returns; if starting fails the
     *         manager is closed and the future completes exceptionally with a
     *         {@link RuntimeException} whose cause is the original failure
     */
    @Override
    public CompletableFuture<Void> start() {
        return CompletableFuture.runAsync(() -> this.redisq.startConsumer()).exceptionally(e -> {
            this.close();
            // Keep the original failure as the cause so it is not lost to callers and logs.
            throw new RuntimeException("Failed to initialise subscription", e);
        });
    }

    /**
     * @return the task storage created from this manager's queue
     */
    @Override
    public RedisTaskStorage storage() {
        return this.taskStorage;
    }

    /**
     * Wraps the given state and configuration in a {@link Task} and pushes it onto the queue.
     *
     * @param taskState     state of the task to enqueue
     * @param configuration configuration of the task to enqueue
     */
    @Override
    public void addTask(TaskState taskState, TaskConfiguration configuration) {
        this.redisq.push((Document)buildTask(taskState, configuration));
    }

    /**
     * Wraps the given state and configuration in a {@link Task}, pushes it onto the queue and
     * waits on it for at most {@value #RUN_TASK_TIMEOUT_MINUTES} minutes.
     *
     * @param taskState     state of the task to run
     * @param configuration configuration of the task to run
     * @throws RuntimeException wrapping the {@link WaitException} raised by the queue
     */
    @Override
    public void runTask(TaskState taskState, TaskConfiguration configuration) {
        Task task = buildTask(taskState, configuration);
        try {
            this.redisq.pushAndWait((Document)task, RUN_TASK_TIMEOUT_MINUTES, TimeUnit.MINUTES);
        }
        catch (WaitException e) {
            throw new RuntimeException("Could not run task", e);
        }
    }

    /**
     * Obtains a future tied to the task reaching {@link State#DONE} or {@link State#FAILED}.
     *
     * @param taskId id of the task to subscribe to
     * @return the future supplied by the queue for those two states
     * @throws StateFutureInitializationException propagated from the queue
     * @throws ExecutionException                 propagated from the queue
     * @throws InterruptedException               propagated from the queue
     */
    public Future<Void> subscribeToTask(TaskId taskId) throws StateFutureInitializationException, ExecutionException, InterruptedException {
        return terminalStateFuture(taskId);
    }

    /**
     * Blocks until the future for the task's {@link State#DONE} / {@link State#FAILED} states
     * completes, or the timeout elapses.
     *
     * @param taskId   id of the task to wait for
     * @param timeout  maximum time to wait
     * @param timeUnit unit of {@code timeout}
     * @throws StateFutureInitializationException propagated from the queue
     * @throws ExecutionException                 propagated from the queue or the future
     * @throws InterruptedException               if interrupted while waiting
     * @throws TimeoutException                   if the future does not complete within the timeout
     */
    public void waitForTask(TaskId taskId, long timeout, TimeUnit timeUnit) throws StateFutureInitializationException, ExecutionException, InterruptedException, TimeoutException {
        terminalStateFuture(taskId).get(timeout, timeUnit);
    }

    /**
     * @return the underlying queue (raw type kept for compatibility with existing callers)
     */
    public Redisq getQueue() {
        return this.redisq;
    }

    /** Builds the queue document for a task from its state and configuration. */
    private static Task buildTask(TaskState taskState, TaskConfiguration configuration) {
        return Task.builder().setTaskConfiguration(configuration).setTaskState(taskState).build();
    }

    /** Asks the queue for the future associated with the task's DONE/FAILED states. */
    private Future<Void> terminalStateFuture(TaskId taskId) throws StateFutureInitializationException, ExecutionException, InterruptedException {
        return this.redisq.getFutureForDocumentStateWait((Set)ImmutableSet.of((Object)State.DONE, (Object)State.FAILED), taskId.value());
    }
}

