/*
 * Decompiled with CFR 0.152.
 */
package ai.grakn.engine.backgroundtasks.distributed;

import ai.grakn.engine.backgroundtasks.BackgroundTask;
import ai.grakn.engine.backgroundtasks.StateStorage;
import ai.grakn.engine.backgroundtasks.TaskState;
import ai.grakn.engine.backgroundtasks.TaskStatus;
import ai.grakn.engine.backgroundtasks.config.ConfigHelper;
import ai.grakn.engine.backgroundtasks.distributed.KafkaLogger;
import ai.grakn.engine.backgroundtasks.distributed.RebalanceListener;
import ai.grakn.engine.backgroundtasks.taskstorage.GraknStateStorage;
import ai.grakn.engine.backgroundtasks.taskstorage.SynchronizedState;
import ai.grakn.engine.backgroundtasks.taskstorage.SynchronizedStateStorage;
import ai.grakn.engine.util.ConfigProperties;
import ai.grakn.engine.util.EngineID;
import ai.grakn.engine.util.ExceptionWrapper;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.zookeeper.CreateMode;
import org.json.JSONArray;
import org.json.JSONObject;

public class TaskRunner
implements Runnable,
AutoCloseable {
    private final KafkaLogger LOG = KafkaLogger.getInstance();
    private static final ConfigProperties properties = ConfigProperties.getInstance();
    private ExecutorService executor;
    private final Integer allowableRunningTasks;
    private final Set<String> runningTasks = new HashSet<String>();
    private final String engineID = EngineID.getInstance().id();
    private final AtomicBoolean OPENED = new AtomicBoolean(false);
    private StateStorage graknStorage;
    private SynchronizedStateStorage zkStorage;
    private KafkaConsumer<String, String> consumer;
    private volatile boolean running = false;
    private CountDownLatch waitToClose;
    private boolean initialised = false;

    TaskRunner() {
        this.allowableRunningTasks = properties.getAvailableThreads();
    }

    @Override
    public void run() {
        this.running = true;
        try {
            while (this.running) {
                this.printInitialization();
                this.LOG.debug("TaskRunner polling, size of new tasks " + this.consumer.endOffsets((Collection)this.consumer.partitionsFor("work-queue").stream().map(i -> new TopicPartition("work-queue", i.partition())).collect(Collectors.toSet())));
                if (this.getRunningTasksCount() < this.allowableRunningTasks) {
                    ConsumerRecords records = this.consumer.poll((long)properties.getPropertyAsInt("tasks.runner.polling-frequency"));
                    this.processRecords((ConsumerRecords<String, String>)records);
                    continue;
                }
                Thread.sleep(500L);
            }
        }
        catch (InterruptedException | WakeupException e) {
            if (this.running) {
                this.LOG.error("TaskRunner interrupted unexpectedly (without clearing 'running' flag first", e);
            } else {
                this.LOG.debug("TaskRunner exiting gracefully.");
            }
        }
        finally {
            this.consumer.commitSync();
            this.consumer.close();
            this.waitToClose.countDown();
        }
    }

    public TaskRunner open() throws Exception {
        if (this.OPENED.compareAndSet(false, true)) {
            this.graknStorage = new GraknStateStorage();
            this.consumer = ConfigHelper.kafkaConsumer("task-runners");
            this.consumer.subscribe(Collections.singletonList("work-queue"), (ConsumerRebalanceListener)new RebalanceListener(this.consumer));
            this.zkStorage = SynchronizedStateStorage.getInstance();
            this.registerAsRunning();
            this.updateOwnState();
            this.executor = Executors.newFixedThreadPool(properties.getAvailableThreads());
            this.waitToClose = new CountDownLatch(1);
            this.LOG.info("TaskRunner opened.");
        } else {
            this.LOG.error("TaskRunner already opened!");
        }
        return this;
    }

    @Override
    public void close() {
        if (this.OPENED.compareAndSet(true, false)) {
            this.running = false;
            ExceptionWrapper.noThrow(() -> this.consumer.wakeup(), "Could not call wakeup on Kafka Consumer.");
            try {
                this.waitToClose.await(5L * properties.getPropertyAsLong("tasks.runner.polling-frequency"), TimeUnit.MILLISECONDS);
            }
            catch (Throwable t) {
                this.LOG.error("Exception whilst waiting for scheduler run() thread to finish - " + ExceptionUtils.getFullStackTrace((Throwable)t));
            }
            ExceptionWrapper.noThrow(this.executor::shutdownNow, "Could shutdown executor pool.");
            this.graknStorage = null;
            this.zkStorage = null;
            this.LOG.debug("TaskRunner stopped");
        } else {
            this.LOG.error("TaskRunner close() called before open()!");
        }
    }

    private void processRecords(ConsumerRecords<String, String> records) {
        for (ConsumerRecord record : records) {
            this.LOG.debug("Got a record\n\t\tkey: " + (String)record.key() + "\n\t\toffset " + record.offset() + "\n\t\tvalue " + (String)record.value());
            this.LOG.debug("Runner currently has tasks: " + this.getRunningTasksCount() + " allowed: " + this.allowableRunningTasks);
            if (this.getRunningTasksCount() >= this.allowableRunningTasks) {
                this.seekAndCommit(new TopicPartition(record.topic(), record.partition()), record.offset());
                break;
            }
            String id = (String)record.key();
            InterProcessMutex mutex = this.acquireMutex(id);
            if (mutex == null) {
                this.seekAndCommit(new TopicPartition(record.topic(), record.partition()), record.offset());
                break;
            }
            TaskStatus status = this.getStatus(id);
            if (status == null) {
                this.seekAndCommit(new TopicPartition(record.topic(), record.partition()), record.offset());
                this.releaseMutex(mutex, id);
                break;
            }
            if (status != TaskStatus.SCHEDULED) {
                this.LOG.debug("Cant schedule this task - " + id + " because\n\t\tstatus: " + (Object)((Object)status));
                this.releaseMutex(mutex, id);
                continue;
            }
            this.addRunningTask(id);
            this.updateTaskState(id, TaskStatus.RUNNING, this.getClass().getName(), this.engineID, null, null);
            this.releaseMutex(mutex, id);
            try {
                JSONObject configuration = new JSONObject((String)record.value());
                this.executor.submit(() -> this.executeTask(id, configuration));
            }
            catch (NullPointerException | RejectedExecutionException e) {
                this.removeRunningTask(id);
                this.LOG.error(ExceptionUtils.getFullStackTrace((Throwable)e));
            }
            this.LOG.debug("Runner next read from " + (String)record.key() + " OFFSET " + (record.offset() + 1L) + " topic " + record.topic());
            this.seekAndCommit(new TopicPartition(record.topic(), record.partition()), record.offset() + 1L);
        }
    }

    private TaskStatus getStatus(String id) {
        SynchronizedState state = this.zkStorage.getState(id);
        if (state == null) {
            this.LOG.error("Cant run task - " + id + " - because zkStorage returned null");
            return null;
        }
        return state.status();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void executeTask(String id, JSONObject configuration) {
        try {
            this.LOG.debug("Executing task " + id);
            TaskState state = this.graknStorage.getState(id);
            this.LOG.debug("Got state of " + id + " from storage");
            Class<?> c = Class.forName(state.taskClassName());
            BackgroundTask task = (BackgroundTask)c.newInstance();
            task.start(this.saveCheckpoint(id), configuration);
            this.LOG.debug("Task - " + id + " completed successfully, updating state in graph");
            this.updateTaskState(id, TaskStatus.COMPLETED, this.getClass().getName(), null, null, null);
        }
        catch (Throwable t) {
            this.LOG.debug("Failed task - " + id + ": " + ExceptionUtils.getFullStackTrace((Throwable)t));
            this.updateTaskState(id, TaskStatus.FAILED, this.getClass().getName(), null, t, null);
            this.LOG.debug("Updated state " + id);
        }
        finally {
            this.removeRunningTask(id);
            this.LOG.debug("Finished executing task - " + id);
        }
    }

    private InterProcessMutex acquireMutex(String id) {
        InterProcessMutex mutex = null;
        try {
            if (this.zkStorage.connection().checkExists().forPath("/tasks/" + id + "/lock") == null) {
                this.zkStorage.connection().create().creatingParentContainersIfNeeded().forPath("/tasks/" + id + "/lock");
            }
            if (!(mutex = new InterProcessMutex(this.zkStorage.connection(), "/tasks/" + id + "/lock")).acquire(5000L, TimeUnit.MILLISECONDS)) {
                this.LOG.debug("Could not acquire mutex");
                mutex = null;
            }
        }
        catch (Exception e) {
            this.LOG.debug("Exception whilst trying to get mutex for task - " + id + " - " + ExceptionUtils.getFullStackTrace((Throwable)e));
        }
        this.LOG.debug("<<<<<<<<<<<< Got mutex for - " + id);
        return mutex;
    }

    private void releaseMutex(InterProcessMutex mutex, String id) {
        try {
            mutex.release();
            this.LOG.debug(">>>>>>>>>>>> released mutex for - " + id);
        }
        catch (Exception e) {
            this.LOG.error("********************************\nCOULD NOT RELEASE MUTEX FOR TASK - " + id + "\n" + ExceptionUtils.getFullStackTrace((Throwable)e) + "\n********************************");
        }
    }

    private Consumer<String> saveCheckpoint(String id) {
        return checkpoint -> {
            this.LOG.debug("Writing checkpoint");
            this.updateTaskState(id, null, null, null, null, (String)checkpoint);
        };
    }

    private void updateTaskState(String id, TaskStatus status, String statusChangeBy, String engineID, Throwable failure, String checkpoint) {
        this.LOG.debug("Updating state of task " + id);
        this.zkStorage.updateState(id, status, engineID, checkpoint);
        try {
            this.graknStorage.updateState(id, status, statusChangeBy, engineID, failure, checkpoint, null);
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    private void updateOwnState() {
        JSONArray out = new JSONArray();
        out.put(this.runningTasks);
        try {
            this.zkStorage.connection().setData().forPath("/task_runners/last_state/" + this.engineID, out.toString().getBytes());
        }
        catch (Exception e) {
            this.LOG.error("Could not update TaskRunner taskstorage in ZooKeeper! " + e);
        }
    }

    private void registerAsRunning() throws Exception {
        if (this.zkStorage.connection().checkExists().forPath("/task_runners/watch/" + this.engineID) == null) {
            ((ACLBackgroundPathAndBytesable)this.zkStorage.connection().create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL)).forPath("/task_runners/watch/" + this.engineID);
        }
        if (this.zkStorage.connection().checkExists().forPath("/task_runners/last_state/" + this.engineID) == null) {
            this.zkStorage.connection().create().creatingParentContainersIfNeeded().forPath("/task_runners/last_state/" + this.engineID);
        }
        this.LOG.debug("Registered TaskRunner");
    }

    private synchronized int getRunningTasksCount() {
        return this.runningTasks.size();
    }

    private synchronized void addRunningTask(String id) {
        this.runningTasks.add(id);
        this.updateOwnState();
    }

    private synchronized void removeRunningTask(String id) {
        this.runningTasks.remove(id);
        this.updateOwnState();
    }

    private void seekAndCommit(TopicPartition partition, long offset) {
        this.consumer.seek(partition, offset);
        this.consumer.commitSync();
    }

    private void printInitialization() {
        if (!this.initialised) {
            this.initialised = true;
            this.LOG.info("TaskRunner initialised");
        }
    }
}

