/*
 * Decompiled with CFR 0.152.
 */
package ai.grakn.engine.backgroundtasks.distributed;

import ai.grakn.engine.TaskStatus;
import ai.grakn.engine.backgroundtasks.BackgroundTask;
import ai.grakn.engine.backgroundtasks.TaskState;
import ai.grakn.engine.backgroundtasks.TaskStateStorage;
import ai.grakn.engine.backgroundtasks.config.ConfigHelper;
import ai.grakn.engine.backgroundtasks.distributed.ZookeeperConnection;
import ai.grakn.engine.util.ConfigProperties;
import ai.grakn.engine.util.EngineID;
import ai.grakn.engine.util.ExceptionWrapper;
import ai.grakn.exception.EngineStorageException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.zookeeper.CreateMode;
import org.json.JSONArray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskRunner
implements Runnable,
AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(TaskRunner.class);
    private static final ConfigProperties properties = ConfigProperties.getInstance();
    private static final int POLLING_FREQUENCY = properties.getPropertyAsInt("tasks.runner.polling-frequency");
    private static final String ENGINE_ID = EngineID.getInstance().id();
    private final Set<String> runningTasks = new HashSet<String>();
    private final TaskStateStorage storage;
    private final ZookeeperConnection connection;
    private final CountDownLatch shutdownLatch;
    private final ExecutorService executor;
    private final int executorSize;
    private final AtomicInteger acceptedTasks = new AtomicInteger(0);
    private final KafkaConsumer<String, String> consumer;

    public TaskRunner(TaskStateStorage storage, ZookeeperConnection connection) {
        this.storage = storage;
        this.connection = connection;
        this.consumer = ConfigHelper.kafkaConsumer("task-runners");
        this.consumer.subscribe(Collections.singletonList("work-queue"), (ConsumerRebalanceListener)new HandleRebalance());
        this.registerAsRunning();
        this.updateOwnState();
        int numberAvailableThreads = properties.getAvailableThreads();
        this.executor = Executors.newFixedThreadPool(numberAvailableThreads);
        this.executorSize = numberAvailableThreads * 4;
        this.shutdownLatch = new CountDownLatch(1);
        LOG.info("TaskRunner started");
    }

    @Override
    public void run() {
        try {
            while (true) {
                if (this.getAcceptedTasksCount() >= this.executorSize) {
                    continue;
                }
                this.processRecords((ConsumerRecords<String, String>)this.consumer.poll((long)POLLING_FREQUENCY));
            }
        }
        catch (WakeupException e) {
            LOG.debug("TaskRunner exiting, woken up.");
            ExceptionWrapper.noThrow(() -> this.consumer.commitSync(), "Exception syncing commits while closing in TaskRunner");
            ExceptionWrapper.noThrow(() -> this.consumer.close(), "Exception while closing consumer in TaskRunner");
            ExceptionWrapper.noThrow(this.shutdownLatch::countDown, "Exception while counting down close latch in TaskRunner");
        }
        catch (Throwable t) {
            try {
                LOG.error("Error in TaskRunner poll " + ExceptionUtils.getFullStackTrace((Throwable)t));
            }
            catch (Throwable throwable) {
                throw throwable;
            }
            finally {
                ExceptionWrapper.noThrow(() -> this.consumer.commitSync(), "Exception syncing commits while closing in TaskRunner");
                ExceptionWrapper.noThrow(() -> this.consumer.close(), "Exception while closing consumer in TaskRunner");
                ExceptionWrapper.noThrow(this.shutdownLatch::countDown, "Exception while counting down close latch in TaskRunner");
            }
        }
    }

    @Override
    public void close() {
        ExceptionWrapper.noThrow(() -> this.consumer.wakeup(), "Could not wake up task runner thread.");
        ExceptionWrapper.noThrow(this.shutdownLatch::await, "Error waiting for TaskRunner consumer to exit");
        ExceptionWrapper.noThrow(this.executor::shutdownNow, "Could not shutdown scheduling service.");
        LOG.debug("TaskRunner stopped");
    }

    private void processRecords(ConsumerRecords<String, String> records) {
        for (ConsumerRecord record : records) {
            LOG.debug(String.format("Received [%s], currently running: %s has: %s allowed: %s", record.key(), this.getRunningTasksCount(), this.getAcceptedTasksCount(), this.executorSize));
            if (this.getAcceptedTasksCount() >= this.executorSize) {
                this.seekAndCommit(new TopicPartition(record.topic(), record.partition()), record.offset());
                break;
            }
            String id = (String)record.key();
            try {
                TaskState state = this.storage.getState(id);
                if (state.status() != TaskStatus.SCHEDULED) {
                    LOG.debug("Cant run this task - " + id + " because\n\t\tstatus: " + state.status());
                    continue;
                }
                this.storage.updateState(state.status(TaskStatus.RUNNING).statusChangedBy(this.getClass().getName()).engineID(ENGINE_ID));
                this.acceptedTasks.incrementAndGet();
                this.executor.submit(() -> this.executeTask(state));
                this.seekAndCommit(new TopicPartition(record.topic(), record.partition()), record.offset() + 1L);
            }
            catch (EngineStorageException e) {
                LOG.error("Cant run this task - " + id + " because state was not found in storage");
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void executeTask(TaskState state) {
        LOG.debug("Executing task " + state.getId());
        try {
            this.addRunningTask(state.getId());
            Class<?> c = Class.forName(state.taskClassName());
            BackgroundTask task = (BackgroundTask)c.newInstance();
            if (state.checkpoint() != null) {
                task.resume(this.saveCheckpoint(state), state.checkpoint());
            } else {
                task.start(this.saveCheckpoint(state), state.configuration());
            }
            this.storage.updateState(state.status(TaskStatus.COMPLETED));
        }
        catch (Throwable t) {
            this.storage.updateState(state.status(TaskStatus.FAILED));
            LOG.error("Failed task - " + state.getId() + ": " + ExceptionUtils.getFullStackTrace((Throwable)t));
        }
        finally {
            this.removeRunningTask(state.getId());
            this.acceptedTasks.decrementAndGet();
            LOG.debug("Finished executing task - " + state.getId());
        }
    }

    private Consumer<String> saveCheckpoint(TaskState taskState) {
        return checkpoint -> this.storage.updateState(taskState.checkpoint((String)checkpoint));
    }

    private void updateOwnState() {
        JSONArray out = new JSONArray();
        out.put(this.runningTasks);
        try {
            this.connection.connection().setData().forPath("/task_runners/last_state/" + ENGINE_ID, out.toString().getBytes(StandardCharsets.UTF_8));
        }
        catch (Exception e) {
            LOG.error("Could not update TaskRunner taskstorage in ZooKeeper! " + e);
        }
    }

    private void registerAsRunning() {
        try {
            if (this.connection.connection().checkExists().forPath("/task_runners/watch/" + ENGINE_ID) == null) {
                ((ACLBackgroundPathAndBytesable)this.connection.connection().create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL)).forPath("/task_runners/watch/" + ENGINE_ID);
            }
            if (this.connection.connection().checkExists().forPath("/task_runners/last_state/" + ENGINE_ID) == null) {
                this.connection.connection().create().creatingParentContainersIfNeeded().forPath("/task_runners/last_state/" + ENGINE_ID);
            }
        }
        catch (Exception exception) {
            throw new RuntimeException("Could not create Zookeeper paths in TaskRunner");
        }
        LOG.debug("Registered TaskRunner");
    }

    private synchronized int getAcceptedTasksCount() {
        return this.acceptedTasks.get();
    }

    private synchronized int getRunningTasksCount() {
        return this.runningTasks.size();
    }

    private synchronized void addRunningTask(String id) {
        this.runningTasks.add(id);
        this.updateOwnState();
    }

    private synchronized void removeRunningTask(String id) {
        this.runningTasks.remove(id);
        this.updateOwnState();
    }

    private void seekAndCommit(TopicPartition partition, long offset) {
        this.consumer.seek(partition, offset);
        this.consumer.commitSync();
    }

    private class HandleRebalance
    implements ConsumerRebalanceListener {
        private HandleRebalance() {
        }

        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            LOG.debug("TaskRunner consumer partitions assigned " + partitions);
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            TaskRunner.this.consumer.commitSync();
            LOG.debug("TaskRunner consumer partitions revoked " + partitions);
        }
    }
}

