/*
 * Decompiled with CFR 0.152.
 */
package ai.grakn.engine.backgroundtasks.distributed;

import ai.grakn.engine.TaskStatus;
import ai.grakn.engine.backgroundtasks.TaskState;
import ai.grakn.engine.backgroundtasks.TaskStateStorage;
import ai.grakn.engine.backgroundtasks.config.ConfigHelper;
import ai.grakn.engine.util.ExceptionWrapper;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Scheduler
implements Runnable,
AutoCloseable {
    private static final String STATUS_MESSAGE = "Topic [%s], partition [%s] received [%s] records, next offset is [%s]";
    private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);
    private final AtomicBoolean OPENED = new AtomicBoolean(false);
    private final TaskStateStorage storage;
    private KafkaConsumer<String, String> consumer;
    private KafkaProducer<String, String> producer;
    private ScheduledExecutorService schedulingService;
    private CountDownLatch waitToClose;
    private volatile boolean running = false;

    public Scheduler(TaskStateStorage storage) {
        this.storage = storage;
        if (this.OPENED.compareAndSet(false, true)) {
            this.consumer = ConfigHelper.kafkaConsumer("schedulers");
            this.consumer.subscribe(Collections.singletonList("new-tasks"), (ConsumerRebalanceListener)new HandleRebalance());
            this.producer = ConfigHelper.kafkaProducer();
            this.waitToClose = new CountDownLatch(1);
            this.schedulingService = Executors.newScheduledThreadPool(1);
            LOG.debug("Scheduler started");
        } else {
            LOG.error("Scheduled already opened!");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    @Override
    public void run() {
        block9: {
            this.running = true;
            this.restartRecurringTasks();
            try {
                while (this.running) {
                    ConsumerRecords records = this.consumer.poll(1000L);
                    this.printConsumerStatus((ConsumerRecords<String, String>)records);
                    for (ConsumerRecord record : records) {
                        TaskState taskState = TaskState.deserialize((String)record.value());
                        this.storage.newState(taskState);
                        this.scheduleTask(taskState);
                        this.consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() + 1L);
                    }
                }
                ExceptionWrapper.noThrow(() -> this.consumer.commitSync(), "Exception syncing commits while closing in Scheduler");
            }
            catch (WakeupException e) {
                LOG.debug("Shutting down scheduler consumer");
                break block9;
            }
            catch (Throwable t) {
                LOG.error("Error in scheduler poll " + ExceptionUtils.getFullStackTrace((Throwable)t));
                {
                    catch (Throwable throwable) {
                        throw throwable;
                    }
                }
                ExceptionWrapper.noThrow(() -> this.consumer.commitSync(), "Exception syncing commits while closing in Scheduler");
                ExceptionWrapper.noThrow(() -> this.consumer.close(), "Exception while closing consumer in Scheduler");
                ExceptionWrapper.noThrow(this.waitToClose::countDown, "Exception while counting down close latch in Scheduler");
                break block9;
            }
            finally {
                ExceptionWrapper.noThrow(() -> this.consumer.commitSync(), "Exception syncing commits while closing in Scheduler");
                ExceptionWrapper.noThrow(() -> this.consumer.close(), "Exception while closing consumer in Scheduler");
                ExceptionWrapper.noThrow(this.waitToClose::countDown, "Exception while counting down close latch in Scheduler");
            }
            ExceptionWrapper.noThrow(() -> this.consumer.close(), "Exception while closing consumer in Scheduler");
            ExceptionWrapper.noThrow(this.waitToClose::countDown, "Exception while counting down close latch in Scheduler");
        }
    }

    @Override
    public void close() {
        if (this.OPENED.compareAndSet(true, false)) {
            this.running = false;
            ExceptionWrapper.noThrow(() -> this.consumer.wakeup(), "Could not wake up scheduler thread.");
            ExceptionWrapper.noThrow(this.waitToClose::await, "Error waiting for TaskRunner consumer to exit");
            ExceptionWrapper.noThrow(this.schedulingService::shutdownNow, "Could not shutdown scheduling service.");
            ExceptionWrapper.noThrow(() -> this.producer.flush(), "Could not flush Kafka producer in scheduler.");
            ExceptionWrapper.noThrow(() -> this.producer.close(), "Could not close Kafka producer in scheduler.");
            LOG.debug("Scheduler stopped.");
        } else {
            LOG.error("Scheduler open() must be called before close()!");
        }
    }

    private void scheduleTask(TaskState state) {
        long delay = Duration.between(Instant.now(), state.runAt()).toMillis();
        this.markAsScheduled(state);
        if (state.isRecurring().booleanValue()) {
            Runnable submit = () -> {
                this.markAsScheduled(state);
                this.sendToWorkQueue(state);
            };
            this.schedulingService.scheduleAtFixedRate(submit, delay, state.interval(), TimeUnit.MILLISECONDS);
        } else {
            Runnable submit = () -> this.sendToWorkQueue(state);
            this.schedulingService.schedule(submit, delay, TimeUnit.MILLISECONDS);
        }
    }

    private void markAsScheduled(TaskState state) {
        LOG.debug("Marking " + state.getId() + " as scheduled");
        this.storage.updateState(state.status(TaskStatus.SCHEDULED));
    }

    private void sendToWorkQueue(TaskState state) {
        LOG.debug("Sending to work queue " + state.getId());
        this.producer.send(new ProducerRecord("work-queue", (Object)state.getId(), (Object)TaskState.serialize(state)), (Callback)new KafkaLoggingCallback());
        this.producer.flush();
    }

    private void restartRecurringTasks() {
        Set<TaskState> tasks = this.storage.getTasks(null, null, null, 0, 0);
        tasks.stream().filter(TaskState::isRecurring).filter(p -> p.status() != TaskStatus.STOPPED).forEach(this::scheduleTask);
    }

    private void printConsumerStatus(ConsumerRecords<String, String> records) {
        this.consumer.assignment().stream().map(p -> String.format(STATUS_MESSAGE, p.partition(), p.topic(), records.count(), this.consumer.position(p))).forEach(arg_0 -> ((Logger)LOG).debug(arg_0));
    }

    private class HandleRebalance
    implements ConsumerRebalanceListener {
        private HandleRebalance() {
        }

        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            LOG.debug("Scheduler partitions assigned " + partitions);
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            Scheduler.this.consumer.commitSync();
            LOG.debug("Scheduler partitions revoked " + partitions);
        }
    }

    private static class KafkaLoggingCallback
    implements Callback {
        private KafkaLoggingCallback() {
        }

        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                LOG.debug(ExceptionUtils.getFullStackTrace((Throwable)exception));
            }
        }
    }
}

