/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnBehaviors;
import io.camunda.zeebe.engine.processing.common.EventHandle;
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableFlowElement;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.SideEffectWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.immutable.ElementInstanceState;
import io.camunda.zeebe.engine.state.immutable.ProcessMessageSubscriptionState;
import io.camunda.zeebe.engine.state.immutable.ProcessState;
import io.camunda.zeebe.engine.state.instance.ElementInstance;
import io.camunda.zeebe.engine.state.message.ProcessMessageSubscription;
import io.camunda.zeebe.engine.state.message.TransientPendingSubscriptionState;
import io.camunda.zeebe.engine.state.mutable.MutableProcessingState;
import io.camunda.zeebe.protocol.impl.record.value.message.ProcessMessageSubscriptionRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.ProcessMessageSubscriptionIntent;
import io.camunda.zeebe.stream.api.records.TypedRecord;
import io.camunda.zeebe.util.buffer.BufferUtil;
import org.agrona.DirectBuffer;

public final class ProcessMessageSubscriptionCorrelateProcessor
implements TypedRecordProcessor<ProcessMessageSubscriptionRecord> {
    private static final String NO_EVENT_OCCURRED_MESSAGE = "Expected to correlate a process message subscription with element key '%d' and message name '%s', but the subscription is not active anymore";
    private static final String NO_SUBSCRIPTION_FOUND_MESSAGE = "Expected to correlate process message subscription with element key '%d' and message name '%s', but no such subscription was found";
    private static final String ALREADY_CLOSING_MESSAGE = "Expected to correlate process message subscription with element key '%d' and message name '%s', but it is already closing";
    private final ProcessMessageSubscriptionState subscriptionState;
    private final TransientPendingSubscriptionState transientProcessMessageSubscriptionState;
    private final SubscriptionCommandSender subscriptionCommandSender;
    private final ProcessState processState;
    private final ElementInstanceState elementInstanceState;
    private final StateWriter stateWriter;
    private final TypedRejectionWriter rejectionWriter;
    private final SideEffectWriter sideEffectWriter;
    private final EventHandle eventHandle;

    public ProcessMessageSubscriptionCorrelateProcessor(ProcessMessageSubscriptionState subscriptionState, SubscriptionCommandSender subscriptionCommandSender, MutableProcessingState processingState, BpmnBehaviors bpmnBehaviors, Writers writers, TransientPendingSubscriptionState transientProcessMessageSubscriptionState) {
        this.subscriptionState = subscriptionState;
        this.transientProcessMessageSubscriptionState = transientProcessMessageSubscriptionState;
        this.subscriptionCommandSender = subscriptionCommandSender;
        this.processState = processingState.getProcessState();
        this.elementInstanceState = processingState.getElementInstanceState();
        this.stateWriter = writers.state();
        this.rejectionWriter = writers.rejection();
        this.sideEffectWriter = writers.sideEffect();
        this.eventHandle = new EventHandle(processingState.getKeyGenerator(), processingState.getEventScopeInstanceState(), writers, this.processState, bpmnBehaviors.eventTriggerBehavior(), bpmnBehaviors.stateBehavior());
    }

    @Override
    public void processRecord(TypedRecord<ProcessMessageSubscriptionRecord> command) {
        ProcessMessageSubscriptionRecord record = (ProcessMessageSubscriptionRecord)command.getValue();
        long elementInstanceKey = record.getElementInstanceKey();
        String messageName = record.getMessageName();
        String tenantId = record.getTenantId();
        ProcessMessageSubscription subscription = this.subscriptionState.getSubscription(elementInstanceKey, record.getMessageNameBuffer(), tenantId);
        if (subscription == null) {
            this.rejectCommand(command, RejectionType.NOT_FOUND, NO_SUBSCRIPTION_FOUND_MESSAGE);
            return;
        }
        if (subscription.isClosing()) {
            this.rejectCommand(command, RejectionType.INVALID_STATE, ALREADY_CLOSING_MESSAGE);
            return;
        }
        if (this.hasAlreadyBeenCorrelated(record, subscription)) {
            this.rejectionWriter.appendRejection(command, RejectionType.INVALID_STATE, "Already correlated this message");
            this.sendAcknowledgeCommand(record);
            return;
        }
        ElementInstance elementInstance = this.elementInstanceState.getInstance(elementInstanceKey);
        boolean canTriggerElement = this.eventHandle.canTriggerElement(elementInstance, subscription.getRecord().getElementIdBuffer());
        if (!canTriggerElement) {
            this.rejectCommand(command, RejectionType.INVALID_STATE, NO_EVENT_OCCURRED_MESSAGE);
            return;
        }
        ProcessMessageSubscriptionRecord subscriptionRecord = subscription.getRecord();
        record.setElementId(subscriptionRecord.getElementIdBuffer()).setInterrupting(subscriptionRecord.isInterrupting());
        this.stateWriter.appendFollowUpEvent(subscription.getKey(), (Intent)ProcessMessageSubscriptionIntent.CORRELATED, (RecordValue)record);
        this.sideEffectWriter.appendSideEffect(() -> {
            this.transientProcessMessageSubscriptionState.remove(new TransientPendingSubscriptionState.PendingSubscription(elementInstanceKey, messageName, tenantId));
            return true;
        });
        ExecutableFlowElement catchEvent = this.getCatchEvent(elementInstance.getValue(), record.getElementIdBuffer());
        this.eventHandle.activateElement(catchEvent, elementInstanceKey, elementInstance.getValue(), record.getVariablesBuffer());
        this.sendAcknowledgeCommand(record);
    }

    private boolean hasAlreadyBeenCorrelated(ProcessMessageSubscriptionRecord record, ProcessMessageSubscription subscription) {
        long lastCorrelatedMessageKey;
        long messageKey = record.getMessageKey();
        return messageKey <= (lastCorrelatedMessageKey = subscription.getRecord().getMessageKey());
    }

    private ExecutableFlowElement getCatchEvent(ProcessInstanceRecord elementRecord, DirectBuffer elementId) {
        return this.processState.getFlowElement(elementRecord.getProcessDefinitionKey(), elementRecord.getTenantId(), elementId, ExecutableFlowElement.class);
    }

    private void rejectCommand(TypedRecord<ProcessMessageSubscriptionRecord> command, RejectionType rejectionType, String reasonTemplate) {
        ProcessMessageSubscriptionRecord subscription = (ProcessMessageSubscriptionRecord)command.getValue();
        String reason = String.format(reasonTemplate, subscription.getElementInstanceKey(), BufferUtil.bufferAsString((DirectBuffer)subscription.getMessageNameBuffer()));
        this.rejectionWriter.appendRejection(command, rejectionType, reason);
        this.sendRejectionCommand(subscription);
    }

    private void sendAcknowledgeCommand(ProcessMessageSubscriptionRecord subscription) {
        this.subscriptionCommandSender.correlateMessageSubscription(subscription.getMessageKey(), subscription.getSubscriptionPartitionId(), subscription.getProcessInstanceKey(), subscription.getElementInstanceKey(), subscription.getBpmnProcessIdBuffer(), subscription.getMessageNameBuffer(), subscription.getTenantId());
    }

    private void sendRejectionCommand(ProcessMessageSubscriptionRecord subscription) {
        this.subscriptionCommandSender.rejectCorrelateMessageSubscription(subscription.getSubscriptionPartitionId(), subscription.getProcessInstanceKey(), subscription.getElementInstanceKey(), subscription.getBpmnProcessIdBuffer(), subscription.getMessageKey(), subscription.getMessageNameBuffer(), subscription.getCorrelationKeyBuffer(), subscription.getTenantId());
    }
}

