/*
 * Decompiled with CFR 0.152.
 */
package ai.grakn.engine.backgroundtasks.distributed;

import ai.grakn.engine.backgroundtasks.BackgroundTask;
import ai.grakn.engine.backgroundtasks.StateStorage;
import ai.grakn.engine.backgroundtasks.TaskManager;
import ai.grakn.engine.backgroundtasks.TaskStatus;
import ai.grakn.engine.backgroundtasks.config.ConfigHelper;
import ai.grakn.engine.backgroundtasks.taskstorage.GraknStateStorage;
import ai.grakn.engine.backgroundtasks.taskstorage.SynchronizedStateStorage;
import ai.grakn.engine.util.EngineID;
import ai.grakn.engine.util.ExceptionWrapper;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DistributedTaskManager
implements TaskManager {
    private final Logger LOG = LoggerFactory.getLogger(DistributedTaskManager.class);
    private final AtomicBoolean OPENED = new AtomicBoolean(false);
    private static DistributedTaskManager instance = null;
    private KafkaProducer<String, String> producer;
    private StateStorage stateStorage;
    private SynchronizedStateStorage zkStorage;

    public static synchronized DistributedTaskManager getInstance() {
        if (instance == null) {
            instance = new DistributedTaskManager();
        }
        return instance;
    }

    @Override
    public DistributedTaskManager open() {
        if (this.OPENED.compareAndSet(false, true)) {
            try {
                ExceptionWrapper.noThrow(() -> {
                    this.producer = ConfigHelper.kafkaProducer();
                }, "Could not instantiate Kafka Producer");
                ExceptionWrapper.noThrow(() -> {
                    this.stateStorage = new GraknStateStorage();
                }, "Could not instantiate grakn state storage");
                this.zkStorage = SynchronizedStateStorage.getInstance();
            }
            catch (Exception e) {
                e.printStackTrace(System.err);
                this.LOG.error("While trying to start the DistributedTaskManager", (Throwable)e);
                throw new RuntimeException("Could not start task manager : " + e);
            }
        } else {
            this.LOG.error("DistributedTaskManager open() called multiple times!");
        }
        return this;
    }

    @Override
    public void close() {
        if (this.OPENED.compareAndSet(true, false)) {
            ExceptionWrapper.noThrow(() -> this.producer.close(), "Could not close Kafka Producer.");
            this.stateStorage = null;
            this.zkStorage = null;
        } else {
            this.LOG.error("DistributedTaskManager close() called before open()!");
        }
    }

    @Override
    public String scheduleTask(BackgroundTask task, String createdBy, Date runAt, long period, JSONObject configuration) {
        Boolean recurring = period > 0L;
        String id = this.stateStorage.newState(task.getClass().getName(), createdBy, runAt, recurring, period, configuration);
        try {
            this.zkStorage.newState(id, TaskStatus.CREATED, null, null);
            this.producer.send(new ProducerRecord("new-tasks", (Object)id, (Object)configuration.toString()));
            this.producer.flush();
        }
        catch (Exception e) {
            this.LOG.error("Could not write to ZooKeeper! - " + ExceptionUtils.getFullStackTrace((Throwable)e));
            this.stateStorage.updateState(id, TaskStatus.FAILED, this.getClass().getName(), EngineID.getInstance().id(), e, null, null);
            id = null;
        }
        return id;
    }

    @Override
    public TaskManager stopTask(String id, String requesterName) {
        throw new UnsupportedOperationException(this.getClass().getName() + " currently doesnt support stopping tasks");
    }

    @Override
    public StateStorage storage() {
        return this.stateStorage;
    }

    @Override
    public CompletableFuture completableFuture(String taskId) {
        return CompletableFuture.runAsync(() -> {
            TaskStatus status;
            while ((status = this.zkStorage.getState(taskId).status()) != TaskStatus.COMPLETED && status != TaskStatus.FAILED && status != TaskStatus.STOPPED) {
                try {
                    Thread.sleep(5000L);
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }

    public TaskStatus getState(String taskID) {
        return this.zkStorage.getState(taskID).status();
    }
}

