/*
 * Decompiled with CFR 0.152.
 */
package ai.grakn.engine.tasks.manager.redisqueue;

import ai.grakn.engine.GraknEngineConfig;
import ai.grakn.engine.TaskId;
import ai.grakn.engine.factory.EngineGraknGraphFactory;
import ai.grakn.engine.lock.LockProvider;
import ai.grakn.engine.tasks.manager.TaskConfiguration;
import ai.grakn.engine.tasks.manager.TaskManager;
import ai.grakn.engine.tasks.manager.TaskState;
import ai.grakn.engine.tasks.manager.redisqueue.RedisTaskQueue;
import ai.grakn.engine.tasks.manager.redisqueue.RedisTaskStorage;
import ai.grakn.engine.tasks.manager.redisqueue.Task;
import ai.grakn.engine.util.EngineID;
import com.codahale.metrics.MetricRegistry;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.util.Pool;

/**
 * {@link TaskManager} implementation backed by a {@link RedisTaskQueue} for
 * dispatching tasks and a {@link RedisTaskStorage} for persisting task state.
 *
 * <p>Both collaborators are built from the same Jedis pool passed to the
 * constructor. Subscriptions to the queue are only registered once
 * {@link #start()} is invoked.
 */
public class RedisTaskManager implements TaskManager {
    private static final Logger LOG = LoggerFactory.getLogger(RedisTaskManager.class);

    /** Number of subscriptions used when the caller does not specify one. */
    private static final int DEFAULT_THREADS = 2;
    /** Configuration key holding the retry delay handed to the queue. */
    private static final String RETRY_DELAY_PROPERTY = "tasks.retry.delay";
    /** Fallback retry delay used when {@link #RETRY_DELAY_PROPERTY} is not configured. */
    private static final String DEFAULT_RETRY_DELAY = "180";

    private final EngineID engineId;
    private final GraknEngineConfig config;
    private final RedisTaskStorage redisTaskStorage;
    private final EngineGraknGraphFactory factory;
    private final RedisTaskQueue redisTaskQueue;
    private final int threads;

    /**
     * Creates a task manager using the default number of subscriptions
     * ({@value #DEFAULT_THREADS}).
     *
     * @param engineId              identifier of this engine, forwarded to each queue subscription
     * @param config                engine configuration; also consulted for the retry delay
     * @param jedisPool             pool of Redis connections shared by storage and queue
     * @param factory               graph factory forwarded to each queue subscription
     * @param distributedLockClient lock provider handed to the queue
     * @param metricRegistry        metrics registry handed to storage and queue
     */
    public RedisTaskManager(EngineID engineId, GraknEngineConfig config, Pool<Jedis> jedisPool, EngineGraknGraphFactory factory, LockProvider distributedLockClient, MetricRegistry metricRegistry) {
        this(engineId, config, jedisPool, DEFAULT_THREADS, factory, distributedLockClient, metricRegistry);
    }

    /**
     * Creates a task manager with an explicit number of subscriptions.
     *
     * @param engineId              identifier of this engine, forwarded to each queue subscription
     * @param config                engine configuration; also consulted for the retry delay
     * @param jedisPool             pool of Redis connections shared by storage and queue
     * @param threads               number of times {@code subscribe} is called on the queue at start-up
     * @param factory               graph factory forwarded to each queue subscription
     * @param distributedLockClient lock provider handed to the queue
     * @param metricRegistry        metrics registry handed to storage and queue
     * @throws NumberFormatException if the configured retry delay is not a valid integer
     */
    public RedisTaskManager(EngineID engineId, GraknEngineConfig config, Pool<Jedis> jedisPool, int threads, EngineGraknGraphFactory factory, LockProvider distributedLockClient, MetricRegistry metricRegistry) {
        this.engineId = engineId;
        this.config = config;
        this.factory = factory;
        this.threads = threads;
        this.redisTaskStorage = RedisTaskStorage.create(jedisPool, metricRegistry);
        // NOTE(review): the unit of the retry delay is defined by RedisTaskQueue - confirm there.
        int retryDelay = Integer.parseInt(config.tryProperty(RETRY_DELAY_PROPERTY).orElse(DEFAULT_RETRY_DELAY));
        this.redisTaskQueue = new RedisTaskQueue(jedisPool, distributedLockClient, metricRegistry, retryDelay);
    }

    /**
     * Closes the underlying task queue.
     */
    @Override
    public void close() {
        LOG.info("Closing task manager");
        this.redisTaskQueue.close();
    }

    /**
     * Starts the manager asynchronously.
     *
     * <p>If start-up fails, the manager is closed and the returned future
     * completes exceptionally with a {@link RuntimeException} whose cause is
     * the original failure.
     *
     * @return a future that completes once all subscriptions are registered
     */
    @Override
    public CompletableFuture<Void> start() {
        return CompletableFuture.runAsync(this::startBlocking).exceptionally(e -> {
            // Log and chain the cause so the real reason for the failure is not lost.
            LOG.error("Failed to start Redis task manager", e);
            this.close();
            throw new RuntimeException("Failed to intitialize subscription", e);
        });
    }

    /**
     * Starts the in-flight processor and registers {@code threads}
     * subscriptions on the queue, on the calling thread.
     */
    private void startBlocking() {
        this.redisTaskQueue.runInFlightProcessor();
        for (int i = 0; i < this.threads; ++i) {
            this.redisTaskQueue.subscribe(this, this.engineId, this.config, this.factory, this.threads);
        }
        LOG.info("Redis task manager started with {} subscriptions", this.threads);
    }

    /**
     * Marks the given task as stopped and persists that state.
     *
     * <p>If storage holds no state for {@code id}, a fresh state is created
     * for it before it is marked stopped.
     *
     * @param id identifier of the task to stop
     */
    @Override
    public void stopTask(TaskId id) {
        TaskState task = this.redisTaskStorage.getState(id);
        if (task == null) {
            task = TaskState.of(id);
        }
        try {
            task.markStopped();
            this.redisTaskStorage.updateState(task);
        }
        catch (Exception e) {
            // Pass the exception as the last argument so SLF4J records the stack trace.
            LOG.error("Unexpected error while stopping {}", id, e);
            throw e;
        }
    }

    /**
     * @return the Redis-backed task state storage used by this manager
     */
    @Override
    public RedisTaskStorage storage() {
        return this.redisTaskStorage;
    }

    /**
     * Bundles the state and configuration into a {@link Task} and puts it on
     * the queue.
     *
     * @param taskState     state of the task to enqueue
     * @param configuration configuration of the task to enqueue
     */
    @Override
    public void addTask(TaskState taskState, TaskConfiguration configuration) {
        Task task = Task.builder().setTaskConfiguration(configuration).setTaskState(taskState).build();
        this.redisTaskQueue.putJob(task);
    }

    /**
     * @return the Redis task queue used by this manager
     */
    public RedisTaskQueue getQueue() {
        return this.redisTaskQueue;
    }
}

