/*
 * Decompiled with CFR 0.152.
 */
package ai.grakn.engine.backgroundtasks.distributed;

import ai.grakn.engine.backgroundtasks.TaskManager;
import ai.grakn.engine.backgroundtasks.TaskState;
import ai.grakn.engine.backgroundtasks.TaskStateStorage;
import ai.grakn.engine.backgroundtasks.config.ConfigHelper;
import ai.grakn.engine.backgroundtasks.distributed.SchedulerElector;
import ai.grakn.engine.backgroundtasks.distributed.TaskRunner;
import ai.grakn.engine.backgroundtasks.distributed.ZookeeperConnection;
import ai.grakn.engine.backgroundtasks.taskstatestorage.TaskStateZookeeperStore;
import ai.grakn.engine.util.ExceptionWrapper;
import java.time.Instant;
import mjson.Json;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public final class DistributedTaskManager
implements TaskManager {
    private final KafkaProducer<String, String> producer;
    private final SchedulerElector elector;
    private final ZookeeperConnection connection = new ZookeeperConnection();
    private final TaskRunner taskRunner;
    private final TaskStateStorage stateStorage = new TaskStateZookeeperStore(this.connection);
    private static final String TASKRUNNER_THREAD_NAME = "taskrunner-";
    private Thread taskRunnerThread;

    public DistributedTaskManager() {
        this.taskRunner = new TaskRunner(this.stateStorage, this.connection);
        this.taskRunnerThread = new Thread((Runnable)this.taskRunner, TASKRUNNER_THREAD_NAME + this.taskRunner.hashCode());
        this.taskRunnerThread.start();
        this.elector = new SchedulerElector(this.stateStorage, this.connection);
        this.producer = ConfigHelper.kafkaProducer();
    }

    @Override
    public void close() {
        ExceptionWrapper.noThrow(() -> this.producer.close(), "Error shutting down producer in TaskManager");
        ExceptionWrapper.noThrow(this.elector::stop, "Error stopping Scheduler elector from TaskManager");
        ExceptionWrapper.noThrow(this.taskRunner::close, "Error shutting down TaskRunner");
        ExceptionWrapper.noThrow(this.taskRunnerThread::join, "Error waiting for TaskRunner to close");
        ExceptionWrapper.noThrow(this.connection::close, "Error waiting for zookeeper connection to close");
    }

    @Override
    public String createTask(String taskClassName, String createdBy, Instant runAt, long period, Json configuration) {
        Boolean recurring = period > 0L;
        TaskState taskState = new TaskState(taskClassName).creator(createdBy).runAt(runAt).isRecurring(recurring).interval(period).configuration(configuration);
        this.producer.send(new ProducerRecord("new-tasks", (Object)taskState.getId(), (Object)TaskState.serialize(taskState)));
        this.producer.flush();
        return taskState.getId();
    }

    @Override
    public TaskManager stopTask(String id, String requesterName) {
        throw new UnsupportedOperationException(this.getClass().getName() + " currently doesn't support stopping tasks");
    }

    @Override
    public TaskStateStorage storage() {
        return this.stateStorage;
    }
}

