/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.SideEffectWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.immutable.MessageState;
import io.camunda.zeebe.engine.state.immutable.MessageSubscriptionState;
import io.camunda.zeebe.engine.state.message.StoredMessage;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageSubscriptionRecord;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent;

/**
 * Processes a {@link MessageSubscriptionRecord} command that rejects a message correlation.
 *
 * <p>If no correlation is recorded for the record's message key and BPMN process id, the
 * command is rejected with {@link RejectionType#INVALID_STATE}. Otherwise a
 * {@link MessageSubscriptionIntent#REJECTED} follow-up event is appended and the processor
 * looks for another subscription of the same process that can take the message instead.
 */
public final class MessageSubscriptionRejectProcessor
        implements TypedRecordProcessor<MessageSubscriptionRecord> {

    private final MessageState messageState;
    private final MessageSubscriptionState subscriptionState;
    private final SubscriptionCommandSender commandSender;
    private final StateWriter stateWriter;
    private final TypedRejectionWriter rejectionWriter;
    private final SideEffectWriter sideEffectWriter;

    /**
     * Creates the processor.
     *
     * @param messageState read access to stored messages and their correlations
     * @param subscriptionState read access to the message subscriptions
     * @param commandSender used to send the correlate command for a matching subscription
     * @param writers supplies the state, rejection and side-effect writers used by this processor
     */
    public MessageSubscriptionRejectProcessor(
            MessageState messageState,
            MessageSubscriptionState subscriptionState,
            SubscriptionCommandSender commandSender,
            Writers writers) {
        this.messageState = messageState;
        this.subscriptionState = subscriptionState;
        this.commandSender = commandSender;
        this.stateWriter = writers.state();
        this.rejectionWriter = writers.rejection();
        this.sideEffectWriter = writers.sideEffect();
    }

    /**
     * Handles the command: rejects it when no matching correlation exists, otherwise appends
     * the {@code REJECTED} event and tries to hand the message to another subscription.
     *
     * @param record the command record being processed
     */
    @Override
    public void processRecord(final TypedRecord<MessageSubscriptionRecord> record) {
        final MessageSubscriptionRecord value = record.getValue();
        final boolean correlationExists =
                messageState.existMessageCorrelation(
                        value.getMessageKey(), value.getBpmnProcessIdBuffer());

        if (correlationExists) {
            stateWriter.appendFollowUpEvent(
                    record.getKey(), MessageSubscriptionIntent.REJECTED, value);
            findSubscriptionToCorrelate(value);
        } else {
            rejectCommand(record);
        }
    }

    /**
     * Visits the subscriptions with the record's message name and correlation key and starts
     * correlating the first one that belongs to the same BPMN process and is not already
     * correlating.
     *
     * @param subscriptionRecord the rejected subscription whose message should be re-offered
     */
    private void findSubscriptionToCorrelate(final MessageSubscriptionRecord subscriptionRecord) {
        final long messageKey = subscriptionRecord.getMessageKey();
        final StoredMessage message = messageState.getMessage(messageKey);
        if (message == null) {
            // No stored message under this key (presumably expired or already removed),
            // so there is nothing left to correlate.
            return;
        }

        subscriptionState.visitSubscriptions(
                subscriptionRecord.getMessageNameBuffer(),
                subscriptionRecord.getCorrelationKeyBuffer(),
                subscription -> {
                    final MessageSubscriptionRecord candidate = subscription.getRecord();
                    final boolean sameProcess =
                            candidate
                                    .getBpmnProcessIdBuffer()
                                    .equals(subscriptionRecord.getBpmnProcessIdBuffer());
                    if (!sameProcess || subscription.isCorrelating()) {
                        // Not eligible. Returning true presumably tells the visitor to keep
                        // going - confirm against MessageSubscriptionState.
                        return true;
                    }

                    candidate
                            .setMessageKey(messageKey)
                            .setVariables(message.getMessage().getVariablesBuffer());
                    stateWriter.appendFollowUpEvent(
                            subscription.getKey(),
                            MessageSubscriptionIntent.CORRELATING,
                            candidate);
                    // The command is sent as a side effect rather than inline.
                    sideEffectWriter.appendSideEffect(() -> sendCorrelateCommand(candidate));
                    return false;
                });
    }

    /**
     * Sends the correlate command for the given subscription via the command sender.
     *
     * @param subscription the subscription record carrying the message key and variables
     * @return the result reported by the command sender
     */
    private boolean sendCorrelateCommand(final MessageSubscriptionRecord subscription) {
        return commandSender.correlateProcessMessageSubscription(
                subscription.getProcessInstanceKey(),
                subscription.getElementInstanceKey(),
                subscription.getBpmnProcessIdBuffer(),
                subscription.getMessageNameBuffer(),
                subscription.getMessageKey(),
                subscription.getVariablesBuffer(),
                subscription.getCorrelationKeyBuffer());
    }

    /**
     * Writes an {@code INVALID_STATE} rejection naming the message key and BPMN process id
     * for which no correlation was found.
     *
     * @param record the command record to reject
     */
    private void rejectCommand(final TypedRecord<MessageSubscriptionRecord> record) {
        final MessageSubscriptionRecord rejected = record.getValue();
        final String reason =
                String.format(
                        "Expected message '%d' to be correlated for process with BPMN process id '%s' but no correlation was found",
                        rejected.getMessageKey(),
                        rejected.getBpmnProcessId());
        rejectionWriter.appendRejection(record, RejectionType.INVALID_STATE, reason);
    }
}

