/*
 * Decompiled with CFR 0.152.
 */
package ai.grakn.engine.backgroundtasks.distributed;

import ai.grakn.engine.backgroundtasks.StateStorage;
import ai.grakn.engine.backgroundtasks.TaskStatus;
import ai.grakn.engine.backgroundtasks.config.ConfigHelper;
import ai.grakn.engine.backgroundtasks.distributed.KafkaLogger;
import ai.grakn.engine.backgroundtasks.taskstorage.GraknStateStorage;
import ai.grakn.engine.backgroundtasks.taskstorage.SynchronizedState;
import ai.grakn.engine.backgroundtasks.taskstorage.SynchronizedStateStorage;
import ai.grakn.engine.util.ExceptionWrapper;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.json.JSONArray;

public class TaskFailover
implements TreeCacheListener,
AutoCloseable {
    private static TaskFailover instance = null;
    private final KafkaLogger LOG = KafkaLogger.getInstance();
    private final AtomicBoolean OPENED = new AtomicBoolean(false);
    private Map<String, ChildData> current;
    private TreeCache cache;
    private KafkaProducer<String, String> producer;
    private StateStorage stateStorage;
    private SynchronizedStateStorage synchronizedStateStorage;

    public static synchronized TaskFailover getInstance() {
        if (instance == null) {
            instance = new TaskFailover();
        }
        return instance;
    }

    public TaskFailover open(CuratorFramework client, TreeCache cache) throws Exception {
        if (this.OPENED.compareAndSet(false, true)) {
            this.cache = cache;
            this.current = cache.getCurrentChildren("/task_runners/watch");
            this.producer = ConfigHelper.kafkaProducer();
            this.stateStorage = new GraknStateStorage();
            this.synchronizedStateStorage = SynchronizedStateStorage.getInstance();
            this.scanStaleStates(client);
        } else {
            this.LOG.error("TaskFailover already opened!");
        }
        return this;
    }

    @Override
    public void close() {
        if (this.OPENED.compareAndSet(true, false)) {
            ExceptionWrapper.noThrow(() -> this.producer.flush(), "Could not flush Kafka Producer.");
            ExceptionWrapper.noThrow(() -> this.producer.close(), "Could not close Kafka Producer.");
            this.current = null;
            this.stateStorage = null;
            this.synchronizedStateStorage = null;
        } else {
            this.LOG.error("TaskFailover close() called before open().");
        }
    }

    public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
        Map nodes = this.cache.getCurrentChildren("/task_runners/watch");
        switch (event.getType()) {
            case NODE_ADDED: {
                this.LOG.debug("New engine joined pool.");
                this.current = nodes;
                break;
            }
            case NODE_REMOVED: {
                this.LOG.debug("Engine failure detected.");
                this.failover(client, nodes);
                this.current = nodes;
                break;
            }
        }
    }

    private void failover(CuratorFramework client, Map<String, ChildData> nodes) throws Exception {
        for (String engineId : this.current.keySet()) {
            if (nodes.containsKey(engineId)) continue;
            this.LOG.debug("Dead engine: " + engineId);
            this.reQueue(client, engineId);
        }
    }

    private void reQueue(CuratorFramework client, String engineID) throws Exception {
        byte[] b = (byte[])client.getData().forPath("/task_runners/last_state/" + engineID);
        JSONArray ids = new JSONArray(new String(b));
        for (Object o : ids) {
            String id = (String)o;
            this.synchronizedStateStorage.updateState(id, TaskStatus.SCHEDULED, "", null);
            String configuration = this.stateStorage.getState(id).configuration().toString();
            this.producer.send(new ProducerRecord("work-queue", (Object)id, (Object)configuration));
        }
    }

    private void scanStaleStates(CuratorFramework client) throws Exception {
        String id;
        SynchronizedState state;
        HashSet<String> deadRunners = new HashSet<String>();
        Iterator iterator = ((List)client.getChildren().forPath("/tasks")).iterator();
        while (iterator.hasNext() && (state = this.synchronizedStateStorage.getState(id = (String)iterator.next())).status() == TaskStatus.RUNNING) {
            String engineId = state.engineID();
            if (engineId == null || engineId.isEmpty()) {
                throw new IllegalStateException("ZK Task SynchronizedState - " + id + " - has no engineID (" + engineId + ") - status " + state.status().toString());
            }
            if (deadRunners.contains(engineId)) break;
            if (client.checkExists().forPath("/task_runners/watch/" + engineId) != null) continue;
            this.reQueue(client, engineId);
            deadRunners.add(engineId);
        }
    }
}

