/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.stream.impl;

import io.camunda.zeebe.logstreams.log.LogStreamWriter;
import io.camunda.zeebe.scheduler.ActorControl;
import io.camunda.zeebe.scheduler.ScheduledTimer;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.scheduler.retry.AbortableRetryStrategy;
import io.camunda.zeebe.stream.api.records.ImmutableRecordBatch;
import io.camunda.zeebe.stream.api.scheduling.ScheduledCommandCache;
import io.camunda.zeebe.stream.api.scheduling.SimpleProcessingScheduleService;
import io.camunda.zeebe.stream.api.scheduling.Task;
import io.camunda.zeebe.stream.api.scheduling.TaskResult;
import io.camunda.zeebe.stream.impl.BufferedTaskResultBuilder;
import io.camunda.zeebe.stream.impl.Loggers;
import io.camunda.zeebe.stream.impl.StreamProcessor;
import java.time.Duration;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import org.slf4j.Logger;

public class ProcessingScheduleServiceImpl
implements SimpleProcessingScheduleService,
AutoCloseable {
    private static final SimpleProcessingScheduleService.ScheduledTask NOOP_SCHEDULED_TASK = () -> {};
    private static final Logger LOG = Loggers.STREAM_PROCESSING;
    private final Supplier<StreamProcessor.Phase> streamProcessorPhaseSupplier;
    private final BooleanSupplier abortCondition;
    private final Supplier<ActorFuture<LogStreamWriter>> writerAsyncSupplier;
    private final ScheduledCommandCache.StageableScheduledCommandCache commandCache;
    private LogStreamWriter logStreamWriter;
    private ActorControl actorControl;
    private AbortableRetryStrategy writeRetryStrategy;
    private CompletableActorFuture<Void> openFuture;

    public ProcessingScheduleServiceImpl(Supplier<StreamProcessor.Phase> streamProcessorPhaseSupplier, BooleanSupplier abortCondition, Supplier<ActorFuture<LogStreamWriter>> writerAsyncSupplier, ScheduledCommandCache.StageableScheduledCommandCache commandCache) {
        this.streamProcessorPhaseSupplier = streamProcessorPhaseSupplier;
        this.abortCondition = abortCondition;
        this.writerAsyncSupplier = writerAsyncSupplier;
        this.commandCache = commandCache;
    }

    @Override
    public SimpleProcessingScheduleService.ScheduledTask runDelayed(Duration delay, Runnable followUpTask) {
        if (this.actorControl == null) {
            LOG.warn("ProcessingScheduleService hasn't been opened yet, ignore scheduled task.");
            return NOOP_SCHEDULED_TASK;
        }
        ScheduledTimer scheduledTimer = this.actorControl.schedule(delay, followUpTask);
        return () -> ((ScheduledTimer)scheduledTimer).cancel();
    }

    @Override
    public SimpleProcessingScheduleService.ScheduledTask runDelayed(Duration delay, Task task) {
        return this.runDelayed(delay, this.toRunnable(task));
    }

    @Override
    public SimpleProcessingScheduleService.ScheduledTask runAt(long timestamp, Task task) {
        return this.runAt(timestamp, this.toRunnable(task));
    }

    @Override
    public SimpleProcessingScheduleService.ScheduledTask runAt(long timestamp, Runnable task) {
        if (this.actorControl == null) {
            LOG.warn("ProcessingScheduleService hasn't been opened yet, ignore scheduled task.");
            return NOOP_SCHEDULED_TASK;
        }
        ScheduledTimer scheduledTimer = this.actorControl.runAt(timestamp, task);
        return () -> ((ScheduledTimer)scheduledTimer).cancel();
    }

    @Override
    public void runAtFixedRate(Duration delay, Task task) {
        this.runDelayed(delay, this.toRunnable(builder -> {
            try {
                TaskResult taskResult = task.execute(builder);
                return taskResult;
            }
            finally {
                this.runAtFixedRate(delay, task);
            }
        }));
    }

    public ActorFuture<Void> open(ActorControl control) {
        if (this.openFuture != null) {
            return this.openFuture;
        }
        this.openFuture = new CompletableActorFuture();
        this.writeRetryStrategy = new AbortableRetryStrategy(control);
        ActorFuture<LogStreamWriter> writerFuture = this.writerAsyncSupplier.get();
        control.runOnCompletion(writerFuture, (writer, failure) -> {
            if (failure == null) {
                this.logStreamWriter = writer;
                this.actorControl = control;
                this.openFuture.complete(null);
            } else {
                this.openFuture.completeExceptionally(failure);
            }
        });
        return this.openFuture;
    }

    @Override
    public void close() {
        this.actorControl = null;
        this.logStreamWriter = null;
        this.writeRetryStrategy = null;
        this.openFuture = null;
    }

    Runnable toRunnable(Task task) {
        return () -> {
            if (this.abortCondition.getAsBoolean()) {
                return;
            }
            StreamProcessor.Phase currentStreamProcessorPhase = this.streamProcessorPhaseSupplier.get();
            if (currentStreamProcessorPhase != StreamProcessor.Phase.PROCESSING) {
                LOG.trace("Not able to execute scheduled task right now. [streamProcessorPhase: {}]", (Object)currentStreamProcessorPhase);
                this.actorControl.submit(this.toRunnable(task));
                return;
            }
            ScheduledCommandCache.StagedScheduledCommandCache stagedCache = this.commandCache.stage();
            BufferedTaskResultBuilder builder = new BufferedTaskResultBuilder((arg_0, arg_1) -> ((LogStreamWriter)this.logStreamWriter).canWriteEvents(arg_0, arg_1), stagedCache);
            TaskResult result = task.execute(builder);
            ImmutableRecordBatch recordBatch = result.getRecordBatch();
            stagedCache.persist();
            ActorFuture writeFuture = this.writeRetryStrategy.runWithRetry(() -> {
                LOG.trace("Write scheduled TaskResult to dispatcher!");
                if (recordBatch.isEmpty()) {
                    return true;
                }
                return this.logStreamWriter.tryWrite(recordBatch.entries()).isRight();
            }, this.abortCondition);
            writeFuture.onComplete((v, t) -> {
                if (t != null) {
                    stagedCache.rollback();
                    LOG.warn("Writing of scheduled TaskResult failed!", t);
                }
            });
        };
    }
}

