/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.processing.message.MessageCorrelator;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.SideEffectWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.KeyGenerator;
import io.camunda.zeebe.engine.state.immutable.MessageState;
import io.camunda.zeebe.engine.state.immutable.MessageSubscriptionState;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageSubscriptionRecord;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.camunda.zeebe.util.buffer.BufferUtil;
import org.agrona.DirectBuffer;

public final class MessageSubscriptionCreateProcessor
implements TypedRecordProcessor<MessageSubscriptionRecord> {
    private static final String SUBSCRIPTION_ALREADY_OPENED_MESSAGE = "Expected to open a new message subscription for element with key '%d' and message name '%s', but there is already a message subscription for that element key and message name opened";
    private final MessageCorrelator messageCorrelator;
    private final MessageSubscriptionState subscriptionState;
    private final SubscriptionCommandSender commandSender;
    private final StateWriter stateWriter;
    private final KeyGenerator keyGenerator;
    private MessageSubscriptionRecord subscriptionRecord;
    private final TypedRejectionWriter rejectionWriter;

    public MessageSubscriptionCreateProcessor(int partitionId, MessageState messageState, MessageSubscriptionState subscriptionState, SubscriptionCommandSender commandSender, Writers writers, KeyGenerator keyGenerator) {
        this.subscriptionState = subscriptionState;
        this.commandSender = commandSender;
        this.stateWriter = writers.state();
        this.rejectionWriter = writers.rejection();
        SideEffectWriter sideEffectWriter = writers.sideEffect();
        this.keyGenerator = keyGenerator;
        this.messageCorrelator = new MessageCorrelator(partitionId, messageState, commandSender, this.stateWriter, sideEffectWriter);
    }

    @Override
    public void processRecord(TypedRecord<MessageSubscriptionRecord> record) {
        this.subscriptionRecord = record.getValue();
        if (this.subscriptionState.existSubscriptionForElementInstance(this.subscriptionRecord.getElementInstanceKey(), this.subscriptionRecord.getMessageNameBuffer())) {
            this.sendAcknowledgeCommand();
            this.rejectionWriter.appendRejection(record, RejectionType.INVALID_STATE, String.format(SUBSCRIPTION_ALREADY_OPENED_MESSAGE, this.subscriptionRecord.getElementInstanceKey(), BufferUtil.bufferAsString((DirectBuffer)this.subscriptionRecord.getMessageNameBuffer())));
            return;
        }
        this.handleNewSubscription();
    }

    private void handleNewSubscription() {
        long subscriptionKey = this.keyGenerator.nextKey();
        this.stateWriter.appendFollowUpEvent(subscriptionKey, (Intent)MessageSubscriptionIntent.CREATED, (RecordValue)this.subscriptionRecord);
        boolean isMessageCorrelated = this.messageCorrelator.correlateNextMessage(subscriptionKey, this.subscriptionRecord);
        if (!isMessageCorrelated) {
            this.sendAcknowledgeCommand();
        }
    }

    private boolean sendAcknowledgeCommand() {
        return this.commandSender.openProcessMessageSubscription(this.subscriptionRecord.getProcessInstanceKey(), this.subscriptionRecord.getElementInstanceKey(), this.subscriptionRecord.getMessageNameBuffer(), this.subscriptionRecord.isInterrupting());
    }
}

