/*
 * Decompiled with CFR 0.152.
 */
package ai.grakn.engine.tasks.manager.redisqueue;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Set;
import java.util.TimerTask;
import mjson.Json;
import net.greghaines.jesque.Config;
import net.greghaines.jesque.Job;
import net.greghaines.jesque.utils.JesqueUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;
import redis.clients.util.Pool;

/**
 * Periodic task that scans the Redis "inflight" lists and moves jobs that have been
 * sitting there for longer than {@code processInterval} back onto the work queue.
 *
 * <p>On each run, every key matching {@code resque:inflight:*} is inspected up to
 * {@link #ITERATIONS} times. Each inspection WATCHes the key, reads the element at the
 * tail of the list and, if the job's {@code runAt} timestamp is older than the configured
 * interval, moves it to the queue key inside a MULTI/EXEC transaction.
 *
 * <p>Failures are logged rather than propagated so that one bad run does not stop the
 * scheduler that owns this task.
 */
public class RedisInflightTaskConsumer extends TimerTask {
    private static final Logger LOG = LoggerFactory.getLogger(RedisInflightTaskConsumer.class);
    private static final ObjectMapper objectMapper = new ObjectMapper();

    /** Maximum number of tail inspections performed per inflight key on each run. */
    static final int ITERATIONS = 10;

    // NOTE(review): the scan pattern hard-codes the "resque" namespace while the destination
    // key is built from config.getNamespace(); confirm the two are meant to differ.
    private static final String INFLIGHT_KEY_PATTERN = String.format("resque:%s:*", "inflight");

    private final Pool<Jedis> jedisPool;
    private final Duration processInterval;
    private final Config config;
    private final String queueName;

    /**
     * @param jedisPool       pool from which a Redis connection is borrowed on every run
     * @param processInterval age (compared in whole seconds) beyond which an inflight job is
     *                        moved back to the queue
     * @param config          Jesque configuration; its namespace is used to build the queue key
     * @param queueName       name of the queue that receives the moved jobs
     */
    public RedisInflightTaskConsumer(Pool<Jedis> jedisPool, Duration processInterval, Config config, String queueName) {
        this.jedisPool = jedisPool;
        this.processInterval = processInterval;
        this.config = config;
        this.queueName = queueName;
    }

    /**
     * Scans all inflight keys once. Runtime failures (connection errors, malformed job
     * payloads) are caught and logged: an exception escaping {@code TimerTask.run()} would
     * terminate the timer thread and silently stop all future runs.
     */
    @Override
    public void run() {
        try (Jedis resource = this.jedisPool.getResource()) {
            Set<String> keys = resource.keys(INFLIGHT_KEY_PATTERN);
            for (String key : keys) {
                for (int i = 0; i < ITERATIONS; ++i) {
                    this.inspectTail(resource, key, i);
                }
            }
        } catch (RuntimeException e) {
            LOG.error("Inflight task processing failed, it will be retried on the next scheduled run", e);
        }
    }

    /**
     * Inspects the tail element of one inflight list and moves it if required.
     *
     * @param resource  Redis connection to use
     * @param key       inflight list key
     * @param iteration zero-based iteration counter, used for logging only
     */
    private void inspectTail(Jedis resource, String key, int iteration) {
        LOG.debug("Processing inflight for {}, iteration {}", key, iteration);
        // WATCH before reading so that the later MULTI/EXEC aborts if the list changed meanwhile.
        resource.watch(key);
        List<String> elements = resource.lrange(key, -1L, -1L);
        if (elements.isEmpty()) {
            return;
        }
        String head = elements.get(0);
        try {
            Job job = objectMapper.readValue(head, Job.class);
            Object[] args = job.getArgs();
            if (args == null || args.length == 0) {
                // Nothing to read the schedule from; leave the element where it is.
                return;
            }
            this.processElement(key, resource, head);
        } catch (IOException e) {
            LOG.error("Could not deserialize task, moving to head: {}", head, e);
            // RPOPLPUSH with identical source and destination rotates the tail element to the head.
            this.attemptMove(resource, key, key);
        }
    }

    /**
     * Moves the given job to the queue key when its {@code runAt} timestamp is older than
     * {@code processInterval} (whole-second comparison).
     *
     * @param key      inflight list key the job was read from
     * @param resource Redis connection to use
     * @param head     raw JSON of the job; {@code args[0].taskState.schedule.runAt} is read as
     *                 epoch milliseconds
     */
    private void processElement(String key, Jedis resource, String head) {
        long runAt = Json.read(head).at("args").at(0).at("taskState").at("schedule").at("runAt").asLong();
        Instant runAtDate = Instant.ofEpochMilli(runAt);
        Duration gap = Duration.between(runAtDate, Instant.now());
        if (gap.getSeconds() > this.processInterval.getSeconds()) {
            LOG.info("Found dead task in inflight: {}", head);
            String keyDest = JesqueUtils.createKey(this.config.getNamespace(), "queue", this.queueName);
            this.attemptMove(resource, key, keyDest);
        }
    }

    /**
     * Atomically pops the tail of {@code key} and pushes it onto the head of {@code keyDest}.
     * A {@code null} result from EXEC means the transaction was aborted; this is only logged.
     *
     * @param resource Redis connection to use
     * @param key      source list key
     * @param keyDest  destination list key
     */
    private void attemptMove(Jedis resource, String key, String keyDest) {
        Transaction transaction = resource.multi();
        transaction.rpoplpush(key, keyDest);
        List<Object> result = transaction.exec();
        if (result == null) {
            LOG.warn("Could not move job from {} to {}, something modified the queue. The move will be retried when the task is next scheduled", key, keyDest);
        }
    }
}

