/*
 * Decompiled with CFR 0.152.
 */
package ai.grakn.engine.backgroundtasks.distributed;

import ai.grakn.engine.backgroundtasks.distributed.KafkaLogger;
import ai.grakn.engine.backgroundtasks.distributed.Scheduler;
import ai.grakn.engine.backgroundtasks.distributed.TaskFailover;
import ai.grakn.engine.backgroundtasks.distributed.TaskRunner;
import ai.grakn.engine.backgroundtasks.taskstorage.SynchronizedStateStorage;
import ai.grakn.engine.util.EngineID;
import ai.grakn.engine.util.ExceptionWrapper;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;

public class ClusterManager
extends LeaderSelectorListenerAdapter {
    private static ClusterManager instance = null;
    private final KafkaLogger LOG = KafkaLogger.getInstance();
    private final String engineID;
    private LeaderSelector leaderSelector;
    private Scheduler scheduler;
    private TreeCache cache;
    private TaskRunner taskRunner;
    private Thread taskRunnerThread;
    private SynchronizedStateStorage zookeeperStorage;
    private final CountDownLatch leaderInitLatch = new CountDownLatch(1);

    public static synchronized ClusterManager getInstance() {
        if (instance == null) {
            instance = new ClusterManager();
        }
        return instance;
    }

    private ClusterManager() {
        this.engineID = EngineID.getInstance().id();
    }

    public void start() {
        try {
            this.LOG.debug("Starting Cluster manager, called by " + Thread.currentThread().getStackTrace()[1]);
            this.zookeeperStorage = SynchronizedStateStorage.getInstance();
            this.taskRunner = new TaskRunner();
            this.taskRunner.open();
            this.taskRunnerThread = new Thread(this.taskRunner);
            this.taskRunnerThread.start();
            this.leaderSelector = new LeaderSelector(this.zookeeperStorage.connection(), "/scheduler", (LeaderSelectorListener)this);
            this.leaderSelector.autoRequeue();
            this.leaderSelector.start();
            while (!this.leaderSelector.getLeader().isLeader()) {
                Thread.sleep(1000L);
            }
            if (this.leaderSelector.hasLeadership()) {
                this.leaderInitLatch.await();
            }
        }
        catch (Exception e) {
            this.LOG.error(ExceptionUtils.getFullStackTrace((Throwable)e));
            throw new RuntimeException(e);
        }
        this.LOG.debug("ClusterManager started, a leader has been elected.");
    }

    public void stop() {
        ExceptionWrapper.noThrow(() -> ((LeaderSelector)this.leaderSelector).interruptLeadership(), "Could not interrupt leadership.");
        ExceptionWrapper.noThrow(() -> ((LeaderSelector)this.leaderSelector).close(), "Could not close leaderSelector.");
        if (this.scheduler != null) {
            ExceptionWrapper.noThrow(this.scheduler::close, "Could not stop scheduler.");
        }
        if (this.cache != null) {
            ExceptionWrapper.noThrow(() -> ((TreeCache)this.cache).close(), "Could not close ZK Tree Cache.");
        }
        ExceptionWrapper.noThrow(this.taskRunner::close, "Could not stop TaskRunner.");
        try {
            this.taskRunnerThread.join();
        }
        catch (Throwable t) {
            this.LOG.error("Exception whilst waiting for TaskRunner thread to join - " + ExceptionUtils.getFullStackTrace((Throwable)t));
        }
        ExceptionWrapper.noThrow(this.zookeeperStorage::close, "Could not close ZK storage.");
        this.zookeeperStorage = null;
    }

    public void takeLeadership(CuratorFramework client) throws Exception {
        this.registerFailover(client);
        this.scheduler = new Scheduler();
        this.scheduler.open();
        this.LOG.info(this.engineID + " has taken over the scheduler.");
        Thread schedulerThread = new Thread(this.scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();
        this.leaderInitLatch.countDown();
        schedulerThread.join();
    }

    public Scheduler getScheduler() {
        return this.scheduler;
    }

    private void registerFailover(CuratorFramework client) throws Exception {
        this.cache = new TreeCache(client, "/task_runners/watch");
        try (TaskFailover failover = TaskFailover.getInstance().open(client, this.cache);){
            this.cache.getListenable().addListener((Object)failover);
        }
        this.cache.start();
    }
}

