/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.state.message.ProcessMessageSubscription;
import io.camunda.zeebe.engine.state.mutable.MutablePendingProcessMessageSubscriptionState;
import io.camunda.zeebe.scheduler.clock.ActorClock;
import io.camunda.zeebe.stream.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.stream.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.stream.api.scheduling.ProcessingScheduleService;
import java.time.Duration;

public final class PendingProcessMessageSubscriptionChecker
implements StreamProcessorLifecycleAware {
    private static final Duration SUBSCRIPTION_TIMEOUT = Duration.ofSeconds(10L);
    private static final Duration SUBSCRIPTION_CHECK_INTERVAL = Duration.ofSeconds(30L);
    private final SubscriptionCommandSender commandSender;
    private final MutablePendingProcessMessageSubscriptionState pendingState;
    private final long subscriptionTimeoutInMillis;
    private ProcessingScheduleService scheduleService;
    private boolean schouldRescheduleTimer = false;

    public PendingProcessMessageSubscriptionChecker(SubscriptionCommandSender commandSender, MutablePendingProcessMessageSubscriptionState pendingState) {
        this.commandSender = commandSender;
        this.pendingState = pendingState;
        this.subscriptionTimeoutInMillis = SUBSCRIPTION_TIMEOUT.toMillis();
    }

    public void onRecovered(ReadonlyStreamProcessorContext context) {
        this.scheduleService = context.getScheduleService();
        this.schouldRescheduleTimer = true;
        this.rescheduleTimer();
    }

    public void onClose() {
        this.cancelTimer();
    }

    public void onFailed() {
        this.cancelTimer();
    }

    public void onPaused() {
        this.cancelTimer();
    }

    public void onResumed() {
        this.schouldRescheduleTimer = true;
        this.rescheduleTimer();
    }

    private void rescheduleTimer() {
        if (this.schouldRescheduleTimer) {
            this.scheduleService.runDelayed(SUBSCRIPTION_CHECK_INTERVAL, this::checkPendingSubscriptions);
        }
    }

    private void cancelTimer() {
        this.schouldRescheduleTimer = false;
    }

    private void checkPendingSubscriptions() {
        this.pendingState.visitSubscriptionBefore(ActorClock.currentTimeMillis() - this.subscriptionTimeoutInMillis, this::sendPendingCommand);
        this.rescheduleTimer();
    }

    private boolean sendPendingCommand(ProcessMessageSubscription subscription) {
        boolean success = subscription.isOpening() ? this.sendOpenCommand(subscription) : this.sendCloseCommand(subscription);
        if (success) {
            long sentTime = ActorClock.currentTimeMillis();
            this.pendingState.updateSentTime(subscription.getRecord(), sentTime);
        }
        return success;
    }

    private boolean sendOpenCommand(ProcessMessageSubscription subscription) {
        return this.commandSender.openMessageSubscription(subscription.getRecord().getSubscriptionPartitionId(), subscription.getRecord().getProcessInstanceKey(), subscription.getRecord().getElementInstanceKey(), subscription.getRecord().getBpmnProcessIdBuffer(), subscription.getRecord().getMessageNameBuffer(), subscription.getRecord().getCorrelationKeyBuffer(), subscription.getRecord().isInterrupting());
    }

    private boolean sendCloseCommand(ProcessMessageSubscription subscription) {
        return this.commandSender.closeMessageSubscription(subscription.getRecord().getSubscriptionPartitionId(), subscription.getRecord().getProcessInstanceKey(), subscription.getRecord().getElementInstanceKey(), subscription.getRecord().getMessageNameBuffer());
    }
}

