/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.broker.transport.partitionapi;

import io.atomix.cluster.MemberId;
import io.camunda.zeebe.broker.Loggers;
import io.camunda.zeebe.broker.protocol.InterPartitionMessageDecoder;
import io.camunda.zeebe.broker.protocol.MessageHeaderDecoder;
import io.camunda.zeebe.logstreams.log.LogAppendEntry;
import io.camunda.zeebe.logstreams.log.LogStreamWriter;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.impl.record.value.management.CheckpointRecord;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.management.CheckpointIntent;
import io.camunda.zeebe.stream.impl.TypedEventRegistry;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.ReflectUtil;
import java.util.Optional;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;

/**
 * Receives encoded inter-partition messages, decodes them and appends the contained command to
 * the log stream through the supplied {@link LogStreamWriter}.
 *
 * <p>Every message carries a checkpoint id. When that id is greater than the one currently known
 * to this receiver, a {@link CheckpointIntent#CREATE} command is written first; the actual command
 * is only written if that checkpoint write succeeded. While disk space is flagged as unavailable,
 * incoming commands are logged and dropped.
 *
 * <p>The known checkpoint id is only changed through {@link #setCheckpointId(long)}; this class
 * does not advance it on its own after writing a checkpoint command.
 */
final class InterPartitionCommandReceiverImpl {
    private static final Logger LOG = Loggers.TRANSPORT_LOGGER;
    private final Decoder decoder = new Decoder();
    private final LogStreamWriter logStreamWriter;
    private boolean diskSpaceAvailable = true;
    // NOTE(review): -1 presumably stands for "no checkpoint known yet" — confirm with the caller
    // that provides checkpoint ids.
    private long checkpointId = -1L;

    /**
     * @param logStreamWriter writer used to append checkpoint and command records
     */
    InterPartitionCommandReceiverImpl(LogStreamWriter logStreamWriter) {
        this.logStreamWriter = logStreamWriter;
    }

    /**
     * Decodes the given message and tries to write it to the log stream. Write failures are logged
     * as warnings and otherwise ignored.
     *
     * @param memberId the member the message was received from; only used for logging
     * @param message the encoded inter-partition message
     * @throws IllegalArgumentException if the value type of the message has no registered record
     *     value class
     */
    void handleMessage(MemberId memberId, byte[] message) {
        LOG.trace("Received message from {}", memberId);
        // Decoding happens before the disk space check because the warning below reports details
        // of the decoded command.
        DecodedMessage decoded = decoder.decodeMessage(message);
        if (!diskSpaceAvailable) {
            LOG.warn(
                "Ignoring command {} {} from {}, checkpoint {}, no disk space available",
                decoded.metadata().getValueType(),
                decoded.metadata().getIntent(),
                memberId,
                decoded.checkpointId());
            return;
        }

        Either<LogStreamWriter.WriteFailure, Long> checkpointWritten = writeCheckpoint(decoded);
        if (checkpointWritten.isLeft()) {
            // Without the checkpoint command in the log the command itself must not be written.
            logCheckpointFailure(memberId, decoded, checkpointWritten.getLeft());
            return;
        }

        writeCommand(decoded).ifLeft(failure -> logWriteFailure(memberId, decoded, failure));
    }

    /** Logs that the checkpoint command could not be written and the received command is dropped. */
    private void logCheckpointFailure(
            MemberId memberId, DecodedMessage decoded, LogStreamWriter.WriteFailure failure) {
        LOG.warn(
            "Failed to write new command for checkpoint {} (currently at {}), ignoring command {} {} from {} (error = {})",
            decoded.checkpointId(),
            checkpointId,
            decoded.metadata().getValueType(),
            decoded.metadata().getIntent(),
            memberId,
            failure);
    }

    /** Logs that the received command could not be written to the log stream. */
    private void logWriteFailure(
            MemberId memberId, DecodedMessage decoded, LogStreamWriter.WriteFailure failure) {
        LOG.warn(
            "Failed to write command {} {} from {} to logstream (error = {})",
            decoded.metadata().getValueType(),
            decoded.metadata().getIntent(),
            memberId,
            failure);
    }

    /**
     * Writes a checkpoint CREATE command if the message refers to a checkpoint newer than the one
     * currently known.
     *
     * @return the result of the write attempt, or a right holding the current checkpoint id when no
     *     checkpoint command had to be written
     */
    private Either<LogStreamWriter.WriteFailure, Long> writeCheckpoint(DecodedMessage decoded) {
        if (decoded.checkpointId() <= checkpointId) {
            return Either.right(checkpointId);
        }

        LOG.debug(
            "Received command with checkpoint {}, current checkpoint is {}",
            decoded.checkpointId(),
            checkpointId);
        RecordMetadata metadata =
            new RecordMetadata()
                .recordType(RecordType.COMMAND)
                .intent(CheckpointIntent.CREATE)
                .valueType(ValueType.CHECKPOINT);
        CheckpointRecord checkpointRecord =
            new CheckpointRecord().setCheckpointId(decoded.checkpointId());
        return logStreamWriter.tryWrite(LogAppendEntry.of(metadata, checkpointRecord));
    }

    /**
     * Writes the decoded command to the log stream, using the record key from the message when one
     * was provided.
     */
    private Either<LogStreamWriter.WriteFailure, Long> writeCommand(DecodedMessage decoded) {
        LogAppendEntry appendEntry =
            decoded
                .recordKey()
                .map(key -> LogAppendEntry.of((long) key, decoded.metadata(), decoded.command()))
                .orElseGet(() -> LogAppendEntry.of(decoded.metadata(), decoded.command()));
        return logStreamWriter.tryWrite(appendEntry);
    }

    /** Sets whether incoming commands may be written ({@code true}) or must be dropped. */
    void setDiskSpaceAvailable(boolean available) {
        this.diskSpaceAvailable = available;
    }

    /** Sets the checkpoint id that incoming messages are compared against. */
    void setCheckpointId(long checkpointId) {
        this.checkpointId = checkpointId;
    }

    /** Turns the raw bytes of an inter-partition message into a {@link DecodedMessage}. */
    private static final class Decoder {
        private final InterPartitionMessageDecoder messageDecoder = new InterPartitionMessageDecoder();
        private final MessageHeaderDecoder headerDecoder = new MessageHeaderDecoder();

        private Decoder() {
        }

        /**
         * Decodes checkpoint id, optional record key, record metadata and command value.
         *
         * <p>The returned command value wraps the given byte array rather than copying it.
         *
         * @throws IllegalArgumentException if no record value class is registered for the value type
         */
        DecodedMessage decodeMessage(byte[] message) {
            UnsafeBuffer messageBuffer = new UnsafeBuffer();
            RecordMetadata recordMetadata = new RecordMetadata();
            messageBuffer.wrap(message);
            messageDecoder.wrapAndApplyHeader(messageBuffer, 0, headerDecoder);

            long checkpointId = messageDecoder.checkpointId();

            // A record key equal to the decoder's null value means "no key provided".
            long rawRecordKey = messageDecoder.recordKey();
            Optional<Long> recordKey =
                rawRecordKey == InterPartitionMessageDecoder.recordKeyNullValue()
                    ? Optional.empty()
                    : Optional.of(rawRecordKey);

            ValueType valueType = ValueType.get((short) messageDecoder.valueType());
            Intent intent = Intent.fromProtocolValue(valueType, (short) messageDecoder.intent());
            recordMetadata.reset().recordType(RecordType.COMMAND).valueType(valueType).intent(intent);

            // The command bytes follow the fixed part of the message and the command header.
            int commandOffset = messageDecoder.limit() + InterPartitionMessageDecoder.commandHeaderLength();
            int commandLength = messageDecoder.commandLength();

            Class<?> valueClass = TypedEventRegistry.EVENT_REGISTRY.get(valueType);
            if (valueClass == null) {
                throw new IllegalArgumentException("No value type mapped to %s, can't decode message".formatted(valueType));
            }
            UnifiedRecordValue value = (UnifiedRecordValue) ReflectUtil.newInstance(valueClass);
            value.wrap(messageBuffer, commandOffset, commandLength);
            return new DecodedMessage(checkpointId, recordKey, recordMetadata, value);
        }
    }

    /** Decoded content of a single inter-partition message. */
    private record DecodedMessage(long checkpointId, Optional<Long> recordKey, RecordMetadata metadata, UnifiedRecordValue command) {
    }
}

