/*
 * Decompiled with CFR 0.152.
 */
package ai.grakn.engine.backgroundtasks.standalone;

import ai.grakn.engine.TaskStatus;
import ai.grakn.engine.backgroundtasks.BackgroundTask;
import ai.grakn.engine.backgroundtasks.TaskManager;
import ai.grakn.engine.backgroundtasks.TaskState;
import ai.grakn.engine.backgroundtasks.TaskStateStorage;
import ai.grakn.engine.backgroundtasks.taskstatestorage.TaskStateInMemoryStore;
import ai.grakn.engine.util.ConfigProperties;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import javafx.util.Pair;
import mjson.Json;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StandaloneTaskManager
implements TaskManager {
    private static final String EXCEPTION_CATCHER_NAME = "Task Exception Catcher.";
    private static final String SAVE_CHECKPOINT_NAME = "Save task checkpoint.";
    private final Logger LOG = LoggerFactory.getLogger(StandaloneTaskManager.class);
    private final Map<String, Pair<ScheduledFuture<?>, BackgroundTask>> instantiatedTasks = new ConcurrentHashMap();
    private final TaskStateStorage stateStorage = new TaskStateInMemoryStore();
    private final ReentrantLock stateUpdateLock = new ReentrantLock();
    private final ExecutorService executorService;
    private final ScheduledExecutorService schedulingService;

    public StandaloneTaskManager() {
        ConfigProperties properties = ConfigProperties.getInstance();
        this.schedulingService = Executors.newScheduledThreadPool(1);
        this.executorService = Executors.newFixedThreadPool(properties.getAvailableThreads());
    }

    public TaskManager open() {
        return this;
    }

    @Override
    public void close() {
        this.executorService.shutdown();
        this.schedulingService.shutdown();
    }

    @Override
    public String createTask(String taskClassName, String createdBy, Instant runAt, long period, Json configuration) {
        Boolean recurring = period != 0L;
        TaskState taskState = new TaskState(taskClassName).creator(createdBy).runAt(runAt).isRecurring(recurring).interval(period).configuration(configuration);
        this.stateStorage.newState(taskState);
        Instant now = Instant.now();
        long delay = Duration.between(now, runAt).toMillis();
        try {
            this.stateStorage.updateState(taskState.status(TaskStatus.SCHEDULED).statusChangedBy(this.getClass().getName()));
            Class<?> c = Class.forName(taskClassName);
            BackgroundTask task = (BackgroundTask)c.newInstance();
            ScheduledFuture<?> future = recurring != false ? this.schedulingService.scheduleAtFixedRate(this.runTask(taskState.getId(), task, true), delay, period, TimeUnit.MILLISECONDS) : this.schedulingService.schedule(this.runTask(taskState.getId(), task, false), delay, TimeUnit.MILLISECONDS);
            this.instantiatedTasks.put(taskState.getId(), new Pair(future, (Object)task));
        }
        catch (Throwable t) {
            this.LOG.error(ExceptionUtils.getFullStackTrace((Throwable)t));
            this.stateStorage.updateState(taskState.status(TaskStatus.FAILED).exception(ExceptionUtils.getFullStackTrace((Throwable)t)));
            this.instantiatedTasks.remove(taskState.getId());
            return null;
        }
        return taskState.getId();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public TaskManager stopTask(String id, String requesterName) {
        try {
            Pair<ScheduledFuture<?>, BackgroundTask> pair;
            this.stateUpdateLock.lock();
            TaskState state = this.stateStorage.getState(id);
            if (state == null) {
                StandaloneTaskManager standaloneTaskManager = this;
                return standaloneTaskManager;
            }
            Pair<ScheduledFuture<?>, BackgroundTask> pair2 = pair = this.instantiatedTasks.get(id);
            synchronized (pair2) {
                if (state.status() == TaskStatus.SCHEDULED || state.status() == TaskStatus.COMPLETED && state.isRecurring().booleanValue()) {
                    this.LOG.info("Stopping a currently scheduled task " + id);
                    ((ScheduledFuture)pair.getKey()).cancel(true);
                    this.stateStorage.updateState(state.status(TaskStatus.STOPPED));
                } else if (state.status() == TaskStatus.RUNNING) {
                    this.LOG.info("Stopping running task " + id);
                    BackgroundTask task = (BackgroundTask)pair.getValue();
                    if (task != null) {
                        task.stop();
                    }
                    this.stateStorage.updateState(state.status(TaskStatus.STOPPED));
                } else {
                    this.LOG.warn("Task not running - " + id);
                }
            }
        }
        finally {
            this.stateUpdateLock.unlock();
        }
        return this;
    }

    @Override
    public TaskStateStorage storage() {
        return this.stateStorage;
    }

    private Runnable exceptionCatcher(TaskState state, BackgroundTask task) {
        return () -> {
            try {
                task.start(this.saveCheckpoint(state), state.configuration());
                this.stateUpdateLock.lock();
                if (state.status() == TaskStatus.RUNNING) {
                    this.stateStorage.updateState(state.status(TaskStatus.COMPLETED).statusChangedBy(EXCEPTION_CATCHER_NAME));
                }
                this.stateUpdateLock.unlock();
            }
            catch (Throwable t) {
                this.LOG.error(ExceptionUtils.getFullStackTrace((Throwable)t));
                this.stateStorage.updateState(state.status(TaskStatus.FAILED).statusChangedBy(EXCEPTION_CATCHER_NAME).exception(ExceptionUtils.getFullStackTrace((Throwable)t)));
            }
        };
    }

    private Runnable runTask(String id, BackgroundTask task, Boolean recurring) {
        return () -> {
            this.stateUpdateLock.lock();
            TaskState state = this.stateStorage.getState(id);
            if (recurring.booleanValue() && (state.status() == TaskStatus.SCHEDULED || state.status() == TaskStatus.COMPLETED)) {
                this.stateStorage.updateState(state.status(TaskStatus.RUNNING).isRecurring(true));
                this.executorService.submit(this.exceptionCatcher(state, task));
            } else if (!recurring.booleanValue() && state.status() == TaskStatus.SCHEDULED) {
                this.stateStorage.updateState(state.status(TaskStatus.RUNNING).isRecurring(false));
                this.executorService.submit(this.exceptionCatcher(state, task));
            }
            this.stateUpdateLock.unlock();
        };
    }

    private Consumer<String> saveCheckpoint(TaskState state) {
        return s -> {
            this.stateUpdateLock.lock();
            this.stateStorage.updateState(state.checkpoint(SAVE_CHECKPOINT_NAME));
            this.stateUpdateLock.unlock();
        };
    }
}

