/*
 * Decompiled with CFR 0.152.
 */
package ai.grakn.engine.backgroundtasks.distributed;

import ai.grakn.engine.backgroundtasks.TaskState;
import ai.grakn.engine.backgroundtasks.TaskStatus;
import ai.grakn.engine.backgroundtasks.config.ConfigHelper;
import ai.grakn.engine.backgroundtasks.distributed.KafkaLogger;
import ai.grakn.engine.backgroundtasks.distributed.RebalanceListener;
import ai.grakn.engine.backgroundtasks.taskstorage.GraknStateStorage;
import ai.grakn.engine.backgroundtasks.taskstorage.SynchronizedStateStorage;
import ai.grakn.engine.util.ConfigProperties;
import ai.grakn.engine.util.ExceptionWrapper;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javafx.util.Pair;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

public class Scheduler
implements Runnable,
AutoCloseable {
    private static final ConfigProperties properties = ConfigProperties.getInstance();
    private final KafkaLogger LOG = KafkaLogger.getInstance();
    private final AtomicBoolean OPENED = new AtomicBoolean(false);
    private GraknStateStorage stateStorage;
    private SynchronizedStateStorage zkStorage;
    private KafkaConsumer<String, String> consumer;
    private KafkaProducer<String, String> producer;
    private ScheduledExecutorService schedulingService;
    private CountDownLatch waitToClose;
    private boolean initialised = false;
    private volatile boolean running = false;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        this.running = true;
        this.restartRecurringTasks();
        try {
            while (this.running) {
                this.printInitialization();
                this.LOG.debug("Scheduler polling, size of new tasks " + this.consumer.endOffsets((Collection)this.consumer.partitionsFor("new-tasks").stream().map(i -> new TopicPartition("new-tasks", i.partition())).collect(Collectors.toSet())));
                ConsumerRecords records = this.consumer.poll((long)properties.getPropertyAsInt("tasks.scheduler.polling-frequency"));
                for (ConsumerRecord record : records) {
                    this.LOG.debug(String.format("Scheduler received topic = %s, partition = %s, offset = %s, taskid = %s, value = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()));
                    this.scheduleTask((String)record.key(), (String)record.value());
                    this.LOG.debug("Scheduler acknowledging " + (String)record.key() + " OFFSET " + (record.offset() + 1L) + " topic " + record.topic());
                    this.consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() + 1L);
                }
            }
        }
        catch (WakeupException e) {
            this.LOG.debug("Shutting down scheduler consumer");
        }
        finally {
            this.consumer.commitSync();
            this.consumer.close();
            this.waitToClose.countDown();
        }
    }

    public Scheduler open() throws Exception {
        if (this.OPENED.compareAndSet(false, true)) {
            this.stateStorage = new GraknStateStorage();
            this.consumer = ConfigHelper.kafkaConsumer("schedulers");
            this.consumer.subscribe(Collections.singletonList("new-tasks"), (ConsumerRebalanceListener)new RebalanceListener(this.consumer));
            this.producer = ConfigHelper.kafkaProducer();
            this.zkStorage = SynchronizedStateStorage.getInstance();
            this.waitToClose = new CountDownLatch(1);
            this.schedulingService = Executors.newScheduledThreadPool(1);
            this.LOG.debug("Scheduler started");
        } else {
            this.LOG.error("Scheduled already opened!");
        }
        return this;
    }

    @Override
    public void close() {
        if (this.OPENED.compareAndSet(true, false)) {
            this.running = false;
            ExceptionWrapper.noThrow(() -> this.consumer.wakeup(), "Could not wake up scheduler thread.");
            try {
                this.waitToClose.await(5L * properties.getPropertyAsLong("tasks.scheduler.polling-frequency"), TimeUnit.MILLISECONDS);
            }
            catch (Throwable t) {
                this.LOG.error("Exception whilst waiting for scheduler run() thread to finish - " + ExceptionUtils.getFullStackTrace((Throwable)t));
            }
            ExceptionWrapper.noThrow(this.schedulingService::shutdown, "Could not shutdown scheduling service.");
            ExceptionWrapper.noThrow(() -> this.producer.flush(), "Could not flush Kafka producer in scheduler.");
            ExceptionWrapper.noThrow(() -> this.producer.close(), "Could not close Kafka producer in scheduler.");
            this.stateStorage = null;
            this.zkStorage = null;
            ExceptionWrapper.noThrow(() -> this.LOG.debug("Scheduler stopped."), "Kafka logging error.");
        } else {
            this.LOG.error("Scheduler open() must be called before close()!");
        }
    }

    private void scheduleTask(String id, String configuration) {
        TaskState state = this.stateStorage.getState(id);
        this.scheduleTask(id, configuration, state);
    }

    private void scheduleTask(String id, String configuration, TaskState state) {
        long delay = state.runAt().getTime() - new Date().getTime();
        this.markAsScheduled(id);
        if (state.isRecurring().booleanValue()) {
            this.LOG.debug("Scheduling recurring " + id);
            Runnable submit = () -> {
                this.markAsScheduled(id);
                this.sendToWorkQueue(id, configuration);
            };
            this.schedulingService.scheduleAtFixedRate(submit, delay, state.interval(), TimeUnit.MILLISECONDS);
        } else {
            this.LOG.debug("Scheduling once " + id + " @ " + delay);
            Runnable submit = () -> this.sendToWorkQueue(id, configuration);
            this.schedulingService.schedule(submit, delay, TimeUnit.MILLISECONDS);
        }
    }

    private void markAsScheduled(String id) {
        this.LOG.debug("Marking " + id + " as scheduled");
        this.zkStorage.updateState(id, TaskStatus.SCHEDULED, null, null);
        this.stateStorage.updateState(id, TaskStatus.SCHEDULED, this.getClass().getName(), null, null, null, null);
    }

    private void sendToWorkQueue(String taskId, String configuration) {
        this.LOG.debug("Sending to work queue " + taskId);
        this.producer.send(new ProducerRecord("work-queue", (Object)taskId, (Object)configuration), (Callback)new KafkaLoggingCallback());
        this.producer.flush();
    }

    private void restartRecurringTasks() {
        Set<Pair<String, TaskState>> tasks = this.stateStorage.getTasks(null, null, null, 0, 0, true);
        tasks.stream().filter(p -> ((TaskState)p.getValue()).status() != TaskStatus.STOPPED).forEach(p -> {
            String config = ((TaskState)p.getValue()).configuration() == null ? "{}" : ((TaskState)p.getValue()).configuration().toString();
            this.scheduleTask((String)p.getKey(), config, (TaskState)p.getValue());
        });
        this.LOG.debug("Scheduler restarted " + tasks.size() + " recurring tasks");
    }

    private void printInitialization() {
        if (!this.initialised) {
            this.initialised = true;
            this.LOG.info("Scheduler initialised");
        }
    }

    private class KafkaLoggingCallback
    implements Callback {
        private KafkaLoggingCallback() {
        }

        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                Scheduler.this.LOG.debug(ExceptionUtils.getFullStackTrace((Throwable)exception));
            }
        }
    }
}

