/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.jobs.service.scheduler;

import io.quarkus.runtime.StartupEvent;
import io.vertx.axle.core.Vertx;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.model.ScheduledJob;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.scheduler.impl.VertxJobScheduler;
import org.kie.kogito.jobs.service.utils.DateUtil;
import org.kie.kogito.jobs.service.utils.ErrorHandling;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Loads persisted jobs from the {@link ReactiveJobRepository} and hands them to the
 * {@link VertxJobScheduler}: once right after application startup and then periodically.
 *
 * <p>Each load looks at a time window around the current time (see
 * {@link #loadJobsInCurrentChunk()}) and only schedules jobs the scheduler does not
 * already report as scheduled.
 */
@Singleton
public class JobSchedulerManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(JobSchedulerManager.class);

    /**
     * Minutes ahead of the current time used as the upper bound of the load window;
     * also the maximum allowed value for {@link #loadJobIntervalInMinutes}.
     */
    @ConfigProperty(name="kogito.jobs-service.schedulerChunkInMinutes")
    long schedulerChunkInMinutes;

    /**
     * Period, in minutes, between two consecutive job loads. Capped to
     * {@link #schedulerChunkInMinutes} at startup.
     */
    @ConfigProperty(name="kogito.jobs-service.loadJobIntervalInMinutes")
    long loadJobIntervalInMinutes;

    /** Minutes before the current time used as the lower bound of the load window. */
    @ConfigProperty(name="kogito.jobs-service.loadJobFromCurrentTimeIntervalInMinutes")
    long loadJobFromCurrentTimeIntervalInMinutes;

    @Inject
    VertxJobScheduler scheduler;
    @Inject
    ReactiveJobRepository repository;
    @Inject
    Vertx vertx;

    /**
     * Triggers an initial job load and registers the periodic reload.
     *
     * <p>If the configured load interval is greater than the scheduler chunk, the interval
     * is lowered to the chunk size and a warning is logged.
     *
     * @param startupEvent the Quarkus startup event being observed (not otherwise used)
     */
    void onStartup(@Observes StartupEvent startupEvent) {
        if (this.loadJobIntervalInMinutes > this.schedulerChunkInMinutes) {
            LOGGER.warn("The loadJobIntervalInMinutes ({}) cannot be greater than schedulerChunkInMinutes ({}), setting value {} for both",
                    this.loadJobIntervalInMinutes, this.schedulerChunkInMinutes, this.schedulerChunkInMinutes);
            this.loadJobIntervalInMinutes = this.schedulerChunkInMinutes;
        }

        // First load as soon as the Vert.x context runs the task ...
        this.vertx.runOnContext(v -> this.loadScheduledJobs());

        // ... then reload every loadJobIntervalInMinutes.
        long periodInMillis = TimeUnit.MINUTES.toMillis(this.loadJobIntervalInMinutes);
        this.vertx.setPeriodic(periodInMillis, id -> this.loadScheduledJobs());
    }

    /**
     * Loads the jobs of the current time window and schedules those that the scheduler
     * does not already know about.
     *
     * <p>The pipeline is started here and completes asynchronously; the outcome is only
     * logged. Exactly one message is logged on completion: an error when the pipeline
     * failed, an info message otherwise.
     */
    void loadScheduledJobs() {
        this.loadJobsInCurrentChunk()
                // Skip jobs the scheduler already reports as scheduled.
                .filter(job -> !this.scheduler.scheduled(job.getId()).isPresent())
                // NOTE(review): presumably skipErrorPublisher drops jobs whose scheduling
                // fails instead of failing the whole stream - confirm against ErrorHandling.
                .flatMapRsPublisher(job -> ErrorHandling.skipErrorPublisher(this.scheduler::schedule, job))
                .forEach(scheduledJob -> LOGGER.debug("Loaded and scheduled job {}", scheduledJob))
                .run()
                .whenComplete((result, error) -> {
                    // Plain branching on purpose: Optional.map(...) with a mapper that
                    // returns null produces an empty Optional, which previously made the
                    // "completed" message be logged after every error as well.
                    if (error != null) {
                        LOGGER.error("Error Loading scheduled jobs!", error);
                    } else {
                        LOGGER.info("Loading scheduled jobs completed !");
                    }
                });
    }

    /**
     * Queries the repository for jobs in status {@link JobStatus#SCHEDULED} or
     * {@link JobStatus#RETRY} within the window
     * {@code [now - loadJobFromCurrentTimeIntervalInMinutes, now + schedulerChunkInMinutes]}.
     *
     * @return a publisher builder emitting the jobs returned by the repository query
     */
    private PublisherBuilder<ScheduledJob> loadJobsInCurrentChunk() {
        return this.repository.findByStatusBetweenDatesOrderByPriority(
                DateUtil.now().minusMinutes(this.loadJobFromCurrentTimeIntervalInMinutes),
                DateUtil.now().plusMinutes(this.schedulerChunkInMinutes),
                JobStatus.SCHEDULED,
                JobStatus.RETRY);
    }
}

