/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.scheduled;

import io.camunda.zeebe.scheduler.clock.ActorClock;
import io.camunda.zeebe.stream.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.stream.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.stream.api.scheduling.ProcessingScheduleService;
import io.camunda.zeebe.stream.api.scheduling.SimpleProcessingScheduleService;
import io.camunda.zeebe.stream.api.scheduling.Task;
import io.camunda.zeebe.stream.api.scheduling.TaskResult;
import io.camunda.zeebe.stream.api.scheduling.TaskResultBuilder;
import io.camunda.zeebe.util.AtomicUtil;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

/**
 * Periodically runs a "visitor" function through a delayed-task scheduler and re-schedules itself
 * based on the due date the visitor reports.
 *
 * <p>At most one upcoming execution is tracked at a time in {@link #nextExecution}. A call to
 * {@link #schedule(long)} only replaces the tracked execution when nothing is planned yet, or when
 * the requested time is earlier than the planned one by more than {@code timerResolution}
 * milliseconds.
 *
 * <p>Scheduling is enabled by {@link #onRecovered(ReadonlyStreamProcessorContext)} and
 * {@link #onResumed()}, and disabled by {@link #onClose()}, {@link #onFailed()} and
 * {@link #onPaused()}. While disabled, {@link #schedule(long)} is a no-op.
 */
public final class DueDateChecker implements StreamProcessorLifecycleAware {
    /** Selects {@code runDelayedAsync} over {@code runDelayed} on the schedule service. */
    private final boolean scheduleAsync;
    /**
     * Minimum delay, in milliseconds, between "now" and a scheduled execution; also used as the
     * tolerance when deciding whether an already planned execution is "close enough".
     */
    private final long timerResolution;
    /** Invoked on every execution; a returned value greater than zero is the next due date. */
    private final Function<TaskResultBuilder, Long> visitor;
    /** Set in {@link #onRecovered(ReadonlyStreamProcessorContext)}; {@code null} before that. */
    private ScheduleDelayed scheduleService;
    /** Guards {@link #schedule(long)}; {@code false} until recovered and while paused/closed/failed. */
    private boolean shouldRescheduleChecker;
    /** The currently planned execution, or {@link NextExecution.None} when nothing is planned. */
    private final AtomicReference<NextExecution> nextExecution = new AtomicReference<>(new NextExecution.None());

    /**
     * @param timerResolution minimum scheduling delay and re-scheduling tolerance, in milliseconds
     * @param scheduleAsync whether tasks are submitted via {@code runDelayedAsync} instead of
     *     {@code runDelayed}
     * @param visitor function run on each execution; returns the next due date in epoch
     *     milliseconds, or a value {@code <= 0} when no follow-up execution is needed
     */
    public DueDateChecker(long timerResolution, boolean scheduleAsync, Function<TaskResultBuilder, Long> visitor) {
        this.timerResolution = timerResolution;
        this.scheduleAsync = scheduleAsync;
        this.visitor = visitor;
    }

    /**
     * Task body handed to the scheduler: runs the visitor and re-schedules if it reports a due date.
     *
     * @param taskResultBuilder builder passed to the visitor and used to produce the result
     * @return the result built from {@code taskResultBuilder}
     */
    TaskResult execute(TaskResultBuilder taskResultBuilder) {
        // This call is the planned execution, so the slot is simply reset (no cancel) before the
        // visitor runs; schedule() below can then plan a fresh one.
        this.nextExecution.set(new NextExecution.None());
        long nextDueDate = this.visitor.apply(taskResultBuilder);
        if (nextDueDate > 0L) {
            this.schedule(nextDueDate);
        }
        return taskResultBuilder.build();
    }

    /**
     * Requests an execution at {@code dueDate}, never sooner than {@code timerResolution}
     * milliseconds from now. Does nothing while scheduling is disabled.
     *
     * <p>If an execution is already planned, it is only replaced when it lies more than
     * {@code timerResolution} milliseconds after the newly requested time; the replaced execution
     * is then cancelled.
     *
     * @param dueDate desired execution time in epoch milliseconds; values in the past (e.g.
     *     {@code -1}) result in an execution {@code timerResolution} milliseconds from now
     */
    public void schedule(long dueDate) {
        if (!this.shouldRescheduleChecker) {
            return;
        }
        // NOTE(review): the third argument presumably cancels a freshly created task whose
        // replacement could not be applied (e.g. a lost race) - confirm against AtomicUtil.replace.
        NextExecution replacedExecution = AtomicUtil.replace(this.nextExecution, currentlyPlanned -> {
            long now = ActorClock.currentTimeMillis();
            // Clamp the delay to at least timerResolution, then convert back to an absolute time.
            long scheduleFor = now + Math.max(dueDate - now, this.timerResolution);
            boolean shouldReplace = !(currentlyPlanned instanceof NextExecution.Scheduled currentlyScheduled)
                || currentlyScheduled.scheduledFor() - scheduleFor > this.timerResolution;
            if (!shouldReplace) {
                // The planned execution is early enough; keep it.
                return Optional.empty();
            }
            Duration delay = Duration.ofMillis(scheduleFor - now);
            SimpleProcessingScheduleService.ScheduledTask task = this.scheduleService.runDelayed(delay, this::execute);
            return Optional.of(new NextExecution.Scheduled(scheduleFor, task));
        }, NextExecution::cancel);
        // A non-null result is treated as the execution that was swapped out; it must not fire.
        if (replacedExecution != null) {
            replacedExecution.cancel();
        }
    }

    /**
     * Binds the scheduler from the processing context, enables scheduling and plans a first
     * execution {@code timerResolution} milliseconds from now.
     *
     * @param processingContext context providing the schedule service
     */
    public void onRecovered(ReadonlyStreamProcessorContext processingContext) {
        ProcessingScheduleService processingScheduleService = processingContext.getScheduleService();
        if (this.scheduleAsync) {
            this.scheduleService = processingScheduleService::runDelayedAsync;
        } else {
            this.scheduleService = processingScheduleService::runDelayed;
        }
        this.shouldRescheduleChecker = true;
        this.schedule(-1L);
    }

    /** Disables further scheduling; an already planned execution is not cancelled here. */
    public void onClose() {
        this.shouldRescheduleChecker = false;
    }

    /** Disables further scheduling; an already planned execution is not cancelled here. */
    public void onFailed() {
        this.shouldRescheduleChecker = false;
    }

    /** Disables further scheduling; an already planned execution is not cancelled here. */
    public void onPaused() {
        this.shouldRescheduleChecker = false;
    }

    /** Re-enables scheduling and plans an execution {@code timerResolution} milliseconds from now. */
    public void onResumed() {
        this.shouldRescheduleChecker = true;
        this.schedule(-1L);
    }

    /** State of the upcoming execution: either {@link Scheduled} or {@link None}. */
    static interface NextExecution {
        /** Cancels the underlying task, if there is one. */
        public void cancel();

        /**
         * An execution that has been handed to the scheduler.
         *
         * @param scheduledFor planned execution time in epoch milliseconds
         * @param task handle used to cancel the execution
         */
        public record Scheduled(long scheduledFor, SimpleProcessingScheduleService.ScheduledTask task) implements NextExecution
        {
            @Override
            public void cancel() {
                task.cancel();
            }
        }

        /** Placeholder for "nothing planned"; cancelling it does nothing. */
        public record None() implements NextExecution
        {
            @Override
            public void cancel() {
                // Nothing to cancel.
            }
        }
    }

    /** Abstraction over the sync/async delayed-run methods of the schedule service. */
    @FunctionalInterface
    static interface ScheduleDelayed {
        /**
         * @param var1 delay after which the task should run
         * @param var2 task to run
         * @return handle to the scheduled task
         */
        public SimpleProcessingScheduleService.ScheduledTask runDelayed(Duration var1, Task var2);
    }
}

