/*
 * Decompiled with CFR 0.152.
 */
package ai.grakn.engine.backgroundtasks.distributed;

import ai.grakn.engine.TaskStatus;
import ai.grakn.engine.backgroundtasks.TaskState;
import ai.grakn.engine.backgroundtasks.TaskStateStorage;
import ai.grakn.engine.backgroundtasks.config.ConfigHelper;
import ai.grakn.engine.util.ExceptionWrapper;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.json.JSONArray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskFailover
implements TreeCacheListener,
AutoCloseable {
    private final Logger LOG = LoggerFactory.getLogger(TaskFailover.class);
    private final AtomicBoolean OPENED = new AtomicBoolean(false);
    private final TaskStateStorage stateStorage;
    private Map<String, ChildData> current;
    private TreeCache cache;
    private KafkaProducer<String, String> producer;

    public TaskFailover(CuratorFramework client, TreeCache cache, TaskStateStorage stateStorage) throws Exception {
        this.stateStorage = stateStorage;
        if (this.OPENED.compareAndSet(false, true)) {
            this.cache = cache;
            this.current = cache.getCurrentChildren("/task_runners/watch");
            this.producer = ConfigHelper.kafkaProducer();
            this.scanStaleStates(client);
        } else {
            this.LOG.error("TaskFailover already opened!");
        }
    }

    @Override
    public void close() {
        if (this.OPENED.compareAndSet(true, false)) {
            ExceptionWrapper.noThrow(() -> this.producer.flush(), "Could not flush Kafka Producer.");
            ExceptionWrapper.noThrow(() -> this.producer.close(), "Could not close Kafka Producer.");
        } else {
            this.LOG.error("TaskFailover close() called before open().");
        }
    }

    public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
        Map nodes = this.cache.getCurrentChildren("/task_runners/watch");
        switch (event.getType()) {
            case NODE_ADDED: {
                this.LOG.debug("New engine joined pool. Current engines: " + nodes.keySet());
                this.current = nodes;
                break;
            }
            case NODE_REMOVED: {
                this.LOG.debug("Engine failure detected. Current engines " + nodes.keySet());
                this.failover(client, nodes);
                this.current = nodes;
                break;
            }
        }
    }

    private void failover(CuratorFramework client, Map<String, ChildData> nodes) throws Exception {
        for (String engineId : this.current.keySet()) {
            if (nodes.containsKey(engineId)) continue;
            this.LOG.debug("Dead engine: " + engineId);
            this.reQueue(client, engineId);
        }
    }

    private void reQueue(CuratorFramework client, String engineID) throws Exception {
        byte[] b = (byte[])client.getData().forPath("/task_runners/last_state/" + engineID);
        JSONArray ids = new JSONArray(new String(b, StandardCharsets.UTF_8));
        for (Object o : ids) {
            String id = (String)o;
            TaskState taskState = this.stateStorage.getState(id);
            if (taskState.status() == TaskStatus.RUNNING) {
                this.LOG.debug(String.format("Engine [%s] stopped, task [%s] requeued", engineID, taskState.getId()));
                this.stateStorage.updateState(taskState.status(TaskStatus.SCHEDULED));
                this.producer.send(new ProducerRecord("work-queue", (Object)id, (Object)taskState.configuration().toString()));
                continue;
            }
            this.LOG.debug(String.format("Engine [%s] stopped, task [%s] not restarted because state [%s]", engineID, taskState.getId(), taskState.status()));
        }
    }

    private void scanStaleStates(CuratorFramework client) throws Exception {
        String id;
        TaskState state;
        HashSet<String> deadRunners = new HashSet<String>();
        Iterator iterator = ((List)client.getChildren().forPath("/tasks")).iterator();
        while (iterator.hasNext() && (state = this.stateStorage.getState(id = (String)iterator.next())).status() == TaskStatus.RUNNING) {
            String engineId = state.engineID();
            if (engineId == null || engineId.isEmpty()) {
                throw new IllegalStateException("ZK Task SynchronizedState - " + id + " - has no engineID (" + engineId + ") - status " + state.status().toString());
            }
            if (deadRunners.contains(engineId)) break;
            if (client.checkExists().forPath("/task_runners/watch/" + engineId) != null) continue;
            this.reQueue(client, engineId);
            deadRunners.add(engineId);
        }
    }
}

