/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.broker.transport.partitionapi;

import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.atomix.utils.serializer.Serializer;
import io.atomix.utils.serializer.serializers.DefaultSerializers;
import io.camunda.zeebe.broker.Loggers;
import io.camunda.zeebe.broker.protocol.InterPartitionMessageEncoder;
import io.camunda.zeebe.broker.protocol.MessageHeaderEncoder;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.stream.api.InterPartitionCommandSender;
import io.camunda.zeebe.util.buffer.BufferWriter;
import java.util.Objects;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Int2IntHashMap;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;

final class InterPartitionCommandSenderImpl
implements InterPartitionCommandSender {
    public static final String TOPIC_PREFIX = "inter-partition-";
    private static final Logger LOG = Loggers.TRANSPORT_LOGGER;
    private final ClusterCommunicationService communicationService;
    private final Int2IntHashMap partitionLeaders = new Int2IntHashMap(-1);
    private long checkpointId = -1L;

    public InterPartitionCommandSenderImpl(ClusterCommunicationService communicationService) {
        this.communicationService = communicationService;
    }

    public void sendCommand(int receiverPartitionId, ValueType valueType, Intent intent, UnifiedRecordValue command) {
        this.sendCommand(receiverPartitionId, valueType, intent, null, command);
    }

    public void sendCommand(int receiverPartitionId, ValueType valueType, Intent intent, Long recordKey, UnifiedRecordValue command) {
        if (!this.partitionLeaders.containsKey(receiverPartitionId)) {
            LOG.warn("Not sending command {} {} to {}, no known leader for this partition", new Object[]{valueType, intent, receiverPartitionId});
            return;
        }
        int partitionLeader = this.partitionLeaders.get(receiverPartitionId);
        LOG.trace("Sending command {} {} to partition {}, leader {}", new Object[]{valueType, intent, receiverPartitionId, partitionLeader});
        byte[] message = Encoder.encode(this.checkpointId, receiverPartitionId, valueType, intent, recordKey, (BufferWriter)command);
        this.communicationService.unicast(TOPIC_PREFIX + receiverPartitionId, (Object)message, arg_0 -> ((Serializer)DefaultSerializers.BASIC).encode(arg_0), MemberId.from((String)("" + partitionLeader)), true);
    }

    void setCheckpointId(long checkpointId) {
        this.checkpointId = checkpointId;
    }

    void setCurrentLeader(int partitionId, int currentLeader) {
        this.partitionLeaders.put(partitionId, currentLeader);
    }

    private static final class Encoder {
        private Encoder() {
        }

        private static byte[] encode(long checkpointId, int receiverPartitionId, ValueType valueType, Intent intent, Long recordKey, BufferWriter command) {
            int messageLength = 28 + InterPartitionMessageEncoder.commandHeaderLength() + command.getLength();
            MessageHeaderEncoder headerEncoder = new MessageHeaderEncoder();
            InterPartitionMessageEncoder bodyEncoder = new InterPartitionMessageEncoder();
            UnsafeBuffer commandBuffer = new UnsafeBuffer(new byte[command.getLength()]);
            UnsafeBuffer messageBuffer = new UnsafeBuffer(new byte[messageLength]);
            command.write((MutableDirectBuffer)commandBuffer, 0);
            bodyEncoder.wrapAndApplyHeader((MutableDirectBuffer)messageBuffer, 0, headerEncoder).checkpointId(checkpointId).receiverPartitionId(receiverPartitionId).valueType(valueType.value()).intent(intent.value()).putCommand((DirectBuffer)commandBuffer, 0, command.getLength());
            bodyEncoder.recordKey(Objects.requireNonNullElseGet(recordKey, InterPartitionMessageEncoder::recordKeyNullValue));
            return messageBuffer.byteArray();
        }
    }
}

