/*
 * Decompiled with CFR 0.152.
 */
package ai.grakn.engine.backgroundtasks.distributed;

import ai.grakn.engine.backgroundtasks.TaskStateStorage;
import ai.grakn.engine.backgroundtasks.distributed.Scheduler;
import ai.grakn.engine.backgroundtasks.distributed.TaskFailover;
import ai.grakn.engine.backgroundtasks.distributed.ZookeeperConnection;
import ai.grakn.engine.util.ExceptionWrapper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.leader.CancelLeadershipException;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.framework.state.ConnectionState;

public class SchedulerElector
extends LeaderSelectorListenerAdapter {
    private static final String SCHEDULER_THREAD_NAME = "scheduler-";
    private final LeaderSelector leaderSelector;
    private final TaskStateStorage storage;
    private Scheduler scheduler;
    private TreeCache cache;
    private TaskFailover failover;

    public SchedulerElector(TaskStateStorage storage, ZookeeperConnection zookeeperConnection) {
        this.storage = storage;
        this.leaderSelector = new LeaderSelector(zookeeperConnection.connection(), "/scheduler", (LeaderSelectorListener)this);
        this.leaderSelector.autoRequeue();
        try {
            this.leaderSelector.start();
            while (!this.leaderSelector.getLeader().isLeader()) {
                Thread.sleep(1000L);
            }
        }
        catch (Exception e) {
            throw new RuntimeException("There were errors electing a leader- Engine should stop");
        }
    }

    public void stop() {
        this.leaderSelector.interruptLeadership();
        ExceptionWrapper.noThrow(() -> ((LeaderSelector)this.leaderSelector).close(), "Error closing leadership elector");
        if (this.scheduler != null) {
            ExceptionWrapper.noThrow(this.scheduler::close, "Error closing the Scheduler");
            ExceptionWrapper.noThrow(this.failover::close, "Error shutting down task failover hook");
            ExceptionWrapper.noThrow(() -> ((TreeCache)this.cache).close(), "Error closing zookeeper cache");
        }
    }

    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        if (newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST) {
            this.scheduler.close();
            throw new CancelLeadershipException();
        }
    }

    public void takeLeadership(CuratorFramework client) throws Exception {
        this.registerFailover(client);
        this.scheduler = new Scheduler(this.storage);
        Thread schedulerThread = new Thread((Runnable)this.scheduler, SCHEDULER_THREAD_NAME + this.scheduler.hashCode());
        schedulerThread.start();
        schedulerThread.join();
    }

    public Scheduler getScheduler() {
        return this.scheduler;
    }

    private void registerFailover(CuratorFramework client) throws Exception {
        this.cache = new TreeCache(client, "/task_runners/watch");
        this.failover = new TaskFailover(client, this.cache, this.storage);
        this.cache.getListenable().addListener((Object)this.failover);
        this.cache.start();
    }
}

