/*
 * Decompiled with CFR 0.152.
 */
package ai.grakn.engine.tasks.manager;

import ai.grakn.engine.GraknEngineConfig;
import ai.grakn.engine.TaskId;
import ai.grakn.engine.TaskStatus;
import ai.grakn.engine.factory.EngineGraknTxFactory;
import ai.grakn.engine.lock.LockProvider;
import ai.grakn.engine.lock.NonReentrantLock;
import ai.grakn.engine.tasks.BackgroundTask;
import ai.grakn.engine.tasks.connection.RedisCountStorage;
import ai.grakn.engine.tasks.manager.TaskCheckpoint;
import ai.grakn.engine.tasks.manager.TaskConfiguration;
import ai.grakn.engine.tasks.manager.TaskManager;
import ai.grakn.engine.tasks.manager.TaskSchedule;
import ai.grakn.engine.tasks.manager.TaskState;
import ai.grakn.engine.tasks.manager.TaskStateInMemoryStore;
import ai.grakn.engine.tasks.manager.TaskStateStorage;
import ai.grakn.engine.util.EngineID;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import java.time.Duration;
import java.time.Instant;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StandaloneTaskManager
implements TaskManager {
    private final Logger LOG = LoggerFactory.getLogger(StandaloneTaskManager.class);
    private final Set<TaskId> stoppedTasks;
    private final Map<TaskId, ScheduledFuture> scheduledTasks;
    private final Map<TaskId, BackgroundTask> runningTasks;
    private final TaskStateStorage storage;
    private final Lock stateUpdateLock;
    private final ExecutorService executorService;
    private final ScheduledExecutorService schedulingService;
    private final EngineID engineID;
    private final GraknEngineConfig config;
    private final RedisCountStorage redis;
    private final Timer addTaskTimer;
    private final Timer executeTaskTimer;
    private final Meter failedMeter;
    private final Meter stoppedMeter;
    private final Meter completedMeter;
    private final EngineGraknTxFactory factory;
    private LockProvider lockProvider;
    private final MetricRegistry metricRegistry;

    public StandaloneTaskManager(EngineID engineId, GraknEngineConfig config, RedisCountStorage redis, EngineGraknTxFactory factory, LockProvider lockProvider, MetricRegistry metricRegistry) {
        this.engineID = engineId;
        this.config = config;
        this.redis = redis;
        this.factory = factory;
        this.lockProvider = lockProvider;
        this.metricRegistry = metricRegistry;
        this.stoppedTasks = new HashSet<TaskId>();
        this.runningTasks = new ConcurrentHashMap<TaskId, BackgroundTask>();
        this.scheduledTasks = new ConcurrentHashMap<TaskId, ScheduledFuture>();
        this.storage = new TaskStateInMemoryStore();
        this.stateUpdateLock = new NonReentrantLock();
        this.schedulingService = Executors.newScheduledThreadPool(1);
        this.executorService = Executors.newFixedThreadPool(config.getAvailableThreads());
        this.addTaskTimer = metricRegistry.timer(MetricRegistry.name(StandaloneTaskManager.class, (String[])new String[]{"add-task-timer"}));
        this.executeTaskTimer = metricRegistry.timer(MetricRegistry.name(StandaloneTaskManager.class, (String[])new String[]{"execute-task-timer"}));
        this.failedMeter = metricRegistry.meter(MetricRegistry.name(StandaloneTaskManager.class, (String[])new String[]{"failed"}));
        this.stoppedMeter = metricRegistry.meter(MetricRegistry.name(StandaloneTaskManager.class, (String[])new String[]{"stopped"}));
        this.completedMeter = metricRegistry.meter(MetricRegistry.name(StandaloneTaskManager.class, (String[])new String[]{"completed"}));
    }

    @Override
    public void close() {
        this.executorService.shutdown();
        this.schedulingService.shutdownNow();
        this.runningTasks.keySet().forEach(this::stopTask);
        this.runningTasks.clear();
        this.scheduledTasks.values().forEach(t -> t.cancel(true));
        this.scheduledTasks.clear();
    }

    @Override
    public void addTask(TaskState taskState, TaskConfiguration configuration) {
        try (Timer.Context context = this.addTaskTimer.time();){
            if (!taskState.priority().equals((Object)TaskState.Priority.LOW)) {
                this.LOG.info("Standalone mode only has a single priority.");
            }
            this.storage.newState(taskState);
            Instant now = Instant.now();
            TaskSchedule schedule = taskState.schedule();
            long delay = Duration.between(now, taskState.schedule().runAt()).toMillis();
            Runnable taskExecution = this.submitTaskForExecution(taskState, configuration);
            ScheduledFuture<?> future = schedule.isRecurring() && schedule.interval().isPresent() ? this.schedulingService.scheduleAtFixedRate(taskExecution, delay, schedule.interval().get().toMillis(), TimeUnit.MILLISECONDS) : this.schedulingService.schedule(taskExecution, delay, TimeUnit.MILLISECONDS);
            this.scheduledTasks.put(taskState.getId(), future);
            this.LOG.info("Added task " + taskState.getId());
        }
    }

    @Override
    public void runTask(TaskState taskState, TaskConfiguration configuration) {
        this.executeTask(taskState, configuration).run();
    }

    @Override
    public CompletableFuture<Void> start() {
        return CompletableFuture.runAsync(() -> {});
    }

    @Override
    public void stopTask(TaskId id) {
        if (!this.storage.containsTask(id)) {
            this.stoppedTasks.add(id);
            return;
        }
        TaskState state = this.storage.getState(id);
        try {
            if (this.taskShouldRun(state)) {
                this.LOG.info("Stopping a currently scheduled task {}", (Object)id);
                state.markStopped();
            } else if (state.status() == TaskStatus.RUNNING && this.runningTasks.containsKey(id)) {
                this.LOG.info("Stopping running task {}", (Object)id);
                this.runningTasks.get(id).stop();
                state.markStopped();
            } else {
                this.LOG.warn("Task not running {}, was not stopped", (Object)id);
            }
        }
        finally {
            this.saveState(state);
            this.cancelTask(state);
        }
    }

    @Override
    public TaskStateStorage storage() {
        return this.storage;
    }

    private Runnable executeTask(TaskState task, TaskConfiguration configuration) {
        return () -> {
            try (Timer.Context context = this.executeTaskTimer.time();){
                boolean completed;
                BackgroundTask runningTask = task.taskClass().newInstance();
                runningTask.initialize(this.saveCheckpoint(task), configuration, this, this.config, this.redis, this.factory, this.lockProvider, this.metricRegistry);
                this.runningTasks.put(task.getId(), runningTask);
                if (this.taskShouldResume(task)) {
                    completed = runningTask.resume(task.checkpoint());
                } else {
                    task.markRunning(this.engineID);
                    this.saveState(task);
                    Timer.Context runContext = this.metricRegistry.timer(MetricRegistry.name(StandaloneTaskManager.class, (String[])new String[]{"run-task-timer", task.taskClass().getName()})).time();
                    try {
                        completed = runningTask.start();
                    }
                    finally {
                        runContext.stop();
                    }
                }
                if (completed) {
                    this.completedMeter.mark();
                    task.markCompleted();
                } else {
                    this.stoppedMeter.mark();
                    task.markStopped();
                }
            }
            catch (Throwable throwable) {
                this.failedMeter.mark();
                this.LOG.error("{} failed with {}", (Object)task.getId(), (Object)throwable.getMessage());
                task.markFailed(throwable);
            }
            finally {
                this.saveState(task);
                this.runningTasks.remove(task.getId());
                this.cancelTask(task);
            }
        };
    }

    private Runnable submitTaskForExecution(TaskState taskState, TaskConfiguration configuration) {
        return () -> {
            TaskState stateFromStorage = this.storage.getState(taskState.getId());
            if (this.taskIsStopped(taskState)) {
                this.saveState(taskState.markStopped());
            } else if (this.taskShouldRun(stateFromStorage) || this.taskShouldResume(taskState)) {
                this.executorService.submit(this.executeTask(taskState, configuration));
            }
        };
    }

    private boolean taskShouldRun(TaskState task) {
        return task.status() == TaskStatus.CREATED || task.schedule().isRecurring() && task.status() == TaskStatus.COMPLETED;
    }

    private boolean taskIsStopped(TaskState task) {
        return this.stoppedTasks.contains(task.getId());
    }

    private boolean taskShouldResume(TaskState task) {
        return task.status() == TaskStatus.RUNNING;
    }

    private Consumer<TaskCheckpoint> saveCheckpoint(TaskState state) {
        return checkpoint -> this.saveState(state.checkpoint((TaskCheckpoint)checkpoint));
    }

    private synchronized void cancelTask(TaskState task) {
        if (!this.scheduledTasks.containsKey(task.getId())) {
            this.LOG.debug("Given task is not scheduled.");
            return;
        }
        if (task.status() == TaskStatus.STOPPED || task.status() == TaskStatus.FAILED) {
            this.scheduledTasks.remove(task.getId()).cancel(true);
        }
        if (task.status() == TaskStatus.COMPLETED && !task.schedule().isRecurring()) {
            this.scheduledTasks.remove(task.getId());
        }
    }

    private void saveState(TaskState taskState) {
        this.stateUpdateLock.lock();
        try {
            this.storage.updateState(taskState);
        }
        finally {
            this.stateUpdateLock.unlock();
        }
    }
}

