/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.jobs.service.scheduler;

import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.temporal.TemporalAmount;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import javax.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.kie.kogito.jobs.api.Job;
import org.kie.kogito.jobs.service.executor.JobExecutor;
import org.kie.kogito.jobs.service.model.JobExecutionResponse;
import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.model.ScheduledJob;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.scheduler.ReactiveJobScheduler;
import org.kie.kogito.jobs.service.utils.DateUtil;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseTimerJobScheduler
implements ReactiveJobScheduler<ScheduledJob> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BaseTimerJobScheduler.class);
    @ConfigProperty(name="kogito.jobs-service.backoffRetryMillis", defaultValue="1000")
    long backoffRetryMillis;
    @ConfigProperty(name="kogito.jobs-service.maxIntervalLimitToRetryMillis", defaultValue="60000")
    long maxIntervalLimitToRetryMillis;
    @Inject
    JobExecutor jobExecutor;
    @Inject
    ReactiveJobRepository jobRepository;

    public BaseTimerJobScheduler() {
    }

    public BaseTimerJobScheduler(JobExecutor jobExecutor, ReactiveJobRepository jobRepository, long backoffRetryMillis, long maxIntervalLimitToRetryMillis) {
        this.jobExecutor = jobExecutor;
        this.jobRepository = jobRepository;
        this.backoffRetryMillis = backoffRetryMillis;
        this.maxIntervalLimitToRetryMillis = maxIntervalLimitToRetryMillis;
    }

    @Override
    public Publisher<ScheduledJob> schedule(Job job) {
        LOGGER.debug("Scheduling {}", (Object)job);
        return ReactiveStreams.fromCompletionStage(this.jobRepository.exists(job.getId())).flatMap(exists -> Boolean.TRUE.equals(exists) ? this.handleExistingJob(job) : ReactiveStreams.of((Object)Boolean.TRUE)).filter(Boolean.TRUE::equals).map(checked -> job.getExpirationTime()).map(this::calculateDelay).peek(delay -> Optional.of(delay.isNegative()).filter(Boolean.FALSE::equals).orElseThrow(() -> new RuntimeException("Delay should be positive"))).map(delay -> this.doSchedule((Duration)delay, job)).flatMap(p -> p).map(scheduleId -> ScheduledJob.builder().job(job).scheduledId((String)scheduleId).status(JobStatus.SCHEDULED).build()).map(scheduledJob -> this.jobRepository.save((ScheduledJob)((Object)scheduledJob))).flatMapCompletionStage(p -> p).buildRs();
    }

    private PublisherBuilder<Boolean> handleExistingJob(Job job) {
        return ReactiveStreams.fromCompletionStage(this.jobRepository.get(job.getId())).flatMap(j -> {
            switch (j.getStatus()) {
                case SCHEDULED: {
                    return this.wasPeriodicScheduled((ScheduledJob)((Object)j)) ? this.handleJobExecutionSuccess((ScheduledJob)((Object)j)).map(periodic -> CompletableFuture.completedFuture(null)) : this.handleExpirationTime((ScheduledJob)((Object)j)).map(scheduled -> ScheduledJob.builder().of((ScheduledJob)((Object)((Object)scheduled))).status(JobStatus.CANCELED).build()).map(CompletableFuture::completedFuture).map(this::cancel);
                }
                case RETRY: {
                    return this.handleRetry(CompletableFuture.completedFuture(j));
                }
            }
            return ReactiveStreams.empty();
        }).map(j -> Boolean.TRUE).onErrorResumeWith(t -> ReactiveStreams.empty());
    }

    private Duration calculateDelay(ZonedDateTime expirationTime) {
        return Duration.between(DateUtil.now(), expirationTime);
    }

    private boolean validLimit(ScheduledJob job) {
        return Optional.of(job).map(Job::getRepeatLimit).filter(limit -> job.getExecutionCounter() < limit).isPresent();
    }

    private boolean wasPeriodicScheduled(ScheduledJob job) {
        return Optional.ofNullable(job).filter(j -> j.getExecutionCounter() > 1).isPresent();
    }

    public PublisherBuilder<ScheduledJob> handleJobExecutionSuccess(ScheduledJob futureJob) {
        return ReactiveStreams.of((Object)((Object)futureJob)).map(job -> ScheduledJob.builder().of((ScheduledJob)((Object)job)).incrementExecutionCounter().build()).flatMap(job -> job.hasInterval().filter(interval -> !this.wasPeriodicScheduled((ScheduledJob)((Object)job))).map(Duration::ofMillis).map(interval -> this.doPeriodicSchedule((Duration)interval, (Job)job).map(scheduledId -> ScheduledJob.builder().of((ScheduledJob)((Object)job)).scheduledId((String)scheduledId).expirationTime(DateUtil.now().plus((TemporalAmount)interval)).status(JobStatus.SCHEDULED).build()).flatMapCompletionStage(this.jobRepository::save)).orElseGet(() -> ReactiveStreams.fromCompletionStage((CompletionStage)job.hasInterval().map(interval -> Optional.of(job).filter(this::wasPeriodicScheduled).filter(this::validLimit).map(s -> ScheduledJob.builder().of((ScheduledJob)((Object)job)).expirationTime(DateUtil.now().plus(Duration.ofMillis(interval))).build()).map(this.jobRepository::save).orElse(null)).orElseGet(() -> CompletableFuture.completedFuture(ScheduledJob.builder().of((ScheduledJob)((Object)job)).status(JobStatus.EXECUTED).build()))))).filter(job -> JobStatus.EXECUTED.equals((Object)job.getStatus())).flatMap(job -> ReactiveStreams.fromCompletionStage(this.cancel(CompletableFuture.completedFuture(job))));
    }

    @Override
    public PublisherBuilder<ScheduledJob> handleJobExecutionSuccess(JobExecutionResponse response) {
        return ReactiveStreams.of((Object)response).map(JobExecutionResponse::getJobId).flatMapCompletionStage(this.jobRepository::get).flatMap(this::handleJobExecutionSuccess);
    }

    private boolean isExpired(ZonedDateTime expirationTime) {
        Duration limit = Duration.ofMillis(this.maxIntervalLimitToRetryMillis);
        return this.calculateDelay(expirationTime).plus(limit).isNegative();
    }

    private PublisherBuilder<ScheduledJob> handleExpirationTime(ScheduledJob scheduledJob) {
        return ReactiveStreams.of((Object)((Object)scheduledJob)).map(Job::getExpirationTime).flatMapCompletionStage(time -> this.isExpired((ZonedDateTime)time) ? this.handleExpiredJob(scheduledJob) : CompletableFuture.completedFuture(scheduledJob));
    }

    @Override
    public PublisherBuilder<ScheduledJob> handleJobExecutionError(JobExecutionResponse errorResponse) {
        return this.handleRetry(this.jobRepository.get(errorResponse.getJobId()));
    }

    private PublisherBuilder<ScheduledJob> handleRetry(CompletionStage<ScheduledJob> futureJob) {
        return ReactiveStreams.fromCompletionStage(futureJob).flatMap(scheduledJob -> this.handleExpirationTime((ScheduledJob)((Object)scheduledJob)).map(ScheduledJob::getStatus).filter(s -> !JobStatus.ERROR.equals(s)).map(time -> this.doSchedule(Duration.ofMillis(this.backoffRetryMillis), (Job)scheduledJob)).flatMap(p -> p).map(scheduleId -> ScheduledJob.builder().of((ScheduledJob)((Object)scheduledJob)).scheduledId((String)scheduleId).status(JobStatus.RETRY).incrementRetries().build()).map(this.jobRepository::save).flatMapCompletionStage(p -> p)).peek(job -> LOGGER.debug("Retry executed {}", (Object)job));
    }

    private CompletionStage<ScheduledJob> handleExpiredJob(ScheduledJob scheduledJob) {
        return Optional.of(ScheduledJob.builder().of(scheduledJob).status(JobStatus.ERROR).build()).map(j -> this.jobRepository.delete((ScheduledJob)((Object)j)).thenApply(deleted -> {
            LOGGER.warn("Retry limit exceeded for job{}", (Object)j);
            return j;
        })).orElse(null);
    }

    public abstract PublisherBuilder<String> doSchedule(Duration var1, Job var2);

    public abstract PublisherBuilder<String> doPeriodicSchedule(Duration var1, Job var2);

    protected CompletionStage<ScheduledJob> execute(Job job) {
        LOGGER.debug("Executing job ! {}", (Object)job);
        return this.jobExecutor.execute(this.jobRepository.get(job.getId()));
    }

    public CompletionStage<ScheduledJob> cancel(CompletionStage<ScheduledJob> futureJob) {
        return ReactiveStreams.fromCompletionStageNullable(futureJob).peek(job -> LOGGER.debug("Cancel Job Scheduling {}", (Object)job)).flatMap(scheduledJob -> ReactiveStreams.of((Object)scheduledJob).flatMapRsPublisher(this::doCancel).map(c -> scheduledJob)).flatMapCompletionStage(this.jobRepository::delete).findFirst().run().thenApply(job -> job.orElse(null));
    }

    @Override
    public CompletionStage<ScheduledJob> cancel(String jobId) {
        return this.cancel(this.jobRepository.get(jobId).thenApply(scheduledJob -> ScheduledJob.builder().of((ScheduledJob)((Object)scheduledJob)).status(JobStatus.CANCELED).build()));
    }

    public abstract Publisher<Boolean> doCancel(ScheduledJob var1);

    public long getBackoffRetryMillis() {
        return this.backoffRetryMillis;
    }

    public void setBackoffRetryMillis(long backoffRetryMillis) {
        this.backoffRetryMillis = backoffRetryMillis;
    }

    public long getMaxIntervalLimitToRetryMillis() {
        return this.maxIntervalLimitToRetryMillis;
    }

    public void setMaxIntervalLimitToRetryMillis(long maxIntervalLimitToRetryMillis) {
        this.maxIntervalLimitToRetryMillis = maxIntervalLimitToRetryMillis;
    }
}

