/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.broker.transport.commandapi;

import io.camunda.zeebe.broker.Loggers;
import io.camunda.zeebe.broker.transport.AsyncApiRequestHandler;
import io.camunda.zeebe.broker.transport.ErrorResponseWriter;
import io.camunda.zeebe.broker.transport.backpressure.BackpressureMetrics;
import io.camunda.zeebe.broker.transport.backpressure.RequestLimiter;
import io.camunda.zeebe.broker.transport.commandapi.CommandApiRequestReader;
import io.camunda.zeebe.broker.transport.commandapi.CommandApiResponseWriter;
import io.camunda.zeebe.logstreams.log.LogAppendEntry;
import io.camunda.zeebe.logstreams.log.LogStreamWriter;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.record.ErrorCode;
import io.camunda.zeebe.protocol.record.ExecuteCommandRequestDecoder;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.util.Either;
import org.agrona.collections.Int2ObjectHashMap;
import org.slf4j.Logger;

/**
 * Request handler of the command API: takes a decoded {@code ExecuteCommandRequest} and tries to
 * append it as a {@link RecordType#COMMAND} record to the log stream of the addressed partition.
 *
 * <p>A request is rejected with an error response when any of the following holds:
 *
 * <ul>
 *   <li>the handler has been told that disk space is not available,
 *   <li>no {@link LogStreamWriter} is registered for the partition,
 *   <li>the request reader could not provide a value for the requested value type,
 *   <li>the partition's {@link RequestLimiter} refuses the request,
 *   <li>the command cannot be written to the log stream.
 * </ul>
 *
 * <p>All state changes ({@link #addPartition}, {@link #removePartition}, {@link
 * #onDiskSpaceAvailable()}, {@link #onDiskSpaceNotAvailable()}) are submitted to the inherited
 * actor instead of being applied on the calling thread.
 */
final class CommandApiRequestHandler
        extends AsyncApiRequestHandler<CommandApiRequestReader, CommandApiResponseWriter> {
    private static final Logger LOG = Loggers.TRANSPORT_LOGGER;

    /** Log stream writers keyed by partition id; an entry exists only between add and remove. */
    private final Int2ObjectHashMap<LogStreamWriter> leadingStreams = new Int2ObjectHashMap<>();

    /** Request limiters keyed by partition id; registered and removed together with the writer. */
    private final Int2ObjectHashMap<RequestLimiter<Intent>> partitionLimiters = new Int2ObjectHashMap<>();

    private final BackpressureMetrics metrics = new BackpressureMetrics();
    private boolean isDiskSpaceAvailable = true;

    CommandApiRequestHandler() {
        super(CommandApiRequestReader::new, CommandApiResponseWriter::new);
    }

    /**
     * Handles the request synchronously and wraps the outcome in an already completed future.
     *
     * @return a completed future holding either the error writer (left) or the response writer
     *     (right)
     */
    @Override
    protected ActorFuture<Either<ErrorResponseWriter, CommandApiResponseWriter>> handleAsync(
            int partitionId,
            long requestId,
            CommandApiRequestReader requestReader,
            CommandApiResponseWriter responseWriter,
            ErrorResponseWriter errorWriter) {
        return CompletableActorFuture.completed(
                handle(partitionId, requestId, requestReader, responseWriter, errorWriter));
    }

    private Either<ErrorResponseWriter, CommandApiResponseWriter> handle(
            int partitionId,
            long requestId,
            CommandApiRequestReader requestReader,
            CommandApiResponseWriter responseWriter,
            ErrorResponseWriter errorWriter) {
        return handleExecuteCommandRequest(
                partitionId, requestId, requestReader, responseWriter, errorWriter);
    }

    /**
     * Validates the request and, if accepted, writes the command to the partition's log stream.
     *
     * @param partitionId partition the command is addressed to
     * @param requestId id of the request, stored in the record metadata and passed to the limiter
     * @param reader reader that provides the decoded request, its value and its metadata
     * @param responseWriter writer returned on success
     * @param errorWriter writer that is filled and returned on any failure
     * @return the response writer (right) if the command was written, otherwise the error writer
     *     (left)
     */
    private Either<ErrorResponseWriter, CommandApiResponseWriter> handleExecuteCommandRequest(
            int partitionId,
            long requestId,
            CommandApiRequestReader reader,
            CommandApiResponseWriter responseWriter,
            ErrorResponseWriter errorWriter) {
        if (!isDiskSpaceAvailable) {
            return Either.left(errorWriter.outOfDiskSpace(partitionId));
        }

        ExecuteCommandRequestDecoder command = reader.getMessageDecoder();
        LogStreamWriter logStreamWriter = leadingStreams.get(partitionId);
        RequestLimiter<Intent> limiter = partitionLimiters.get(partitionId);

        ValueType valueType = command.valueType();
        Intent intent = Intent.fromProtocolValue(valueType, command.intent());
        UnifiedRecordValue value = reader.value();

        RecordMetadata metadata = reader.metadata();
        metadata.requestId(requestId);
        metadata.requestStreamId(partitionId);
        metadata.recordType(RecordType.COMMAND);
        metadata.intent(intent);
        metadata.valueType(valueType);

        if (logStreamWriter == null) {
            // No writer registered for this partition (never added, or already removed).
            errorWriter.partitionLeaderMismatch(partitionId);
            return Either.left(errorWriter);
        }

        if (value == null) {
            errorWriter.unsupportedMessage(
                    valueType.name(), CommandApiRequestReader.RECORDS_BY_TYPE.keySet().toArray());
            return Either.left(errorWriter);
        }

        metrics.receivedRequest(partitionId);
        if (!limiter.tryAcquire(partitionId, requestId, intent)) {
            metrics.dropped(partitionId);
            LOG.trace(
                    "Partition-{} receiving too many requests. Current limit {} inflight {}, dropping request {} from gateway",
                    partitionId,
                    limiter.getLimit(),
                    limiter.getInflightCount(),
                    requestId);
            errorWriter.resourceExhausted();
            return Either.left(errorWriter);
        }

        try {
            return writeCommand(command.key(), metadata, value, logStreamWriter, errorWriter, partitionId)
                    .map(written -> responseWriter)
                    .mapLeft(failure -> {
                        // The command was not written; notify the limiter about the acquired request.
                        limiter.onIgnore(partitionId, requestId);
                        return errorWriter;
                    });
        } catch (Exception error) {
            limiter.onIgnore(partitionId, requestId);
            String errorMessage =
                    "Failed to write client request to partition '%d', %s".formatted(partitionId, error);
            // Pass the exception itself so the stack trace ends up in the log.
            LOG.error(errorMessage, error);
            return Either.left(errorWriter.internalError(errorMessage));
        }
    }

    /**
     * Builds the log entry for the command and tries to append it.
     *
     * <p>The entry carries {@code key} unless it equals {@link
     * ExecuteCommandRequestDecoder#keyNullValue()}.
     *
     * @return {@code true} (right) if the entry was written; otherwise the error writer (left),
     *     filled either from the write failure or with {@link ErrorCode#MALFORMED_REQUEST} when the
     *     writer reports that an entry of this length cannot be written
     */
    private Either<ErrorResponseWriter, Boolean> writeCommand(
            long key,
            RecordMetadata metadata,
            UnifiedRecordValue value,
            LogStreamWriter logStreamWriter,
            ErrorResponseWriter errorWriter,
            int partitionId) {
        LogAppendEntry appendEntry =
                key != ExecuteCommandRequestDecoder.keyNullValue()
                        ? LogAppendEntry.of(key, metadata, value)
                        : LogAppendEntry.of(metadata, value);

        if (!logStreamWriter.canWriteEvents(1, appendEntry.getLength())) {
            return Either.left(
                    errorWriter
                            .errorCode(ErrorCode.MALFORMED_REQUEST)
                            .errorMessage("Request size is above configured maxMessageSize."));
        }

        return logStreamWriter
                .tryWrite(appendEntry)
                .map(ignore -> true)
                .mapLeft(error -> errorWriter.mapWriteError(partitionId, error));
    }

    /**
     * Registers the writer and limiter used for requests addressed to {@code partitionId}. The
     * registration is submitted to the actor.
     */
    void addPartition(int partitionId, LogStreamWriter logStreamWriter, RequestLimiter<Intent> limiter) {
        actor.submit(() -> {
            leadingStreams.put(partitionId, logStreamWriter);
            partitionLimiters.put(partitionId, limiter);
        });
    }

    /**
     * Removes the writer and limiter of {@code partitionId}; later requests for it are answered
     * with a partition leader mismatch error. The removal is submitted to the actor.
     */
    void removePartition(int partitionId) {
        actor.submit(() -> {
            leadingStreams.remove(partitionId);
            partitionLimiters.remove(partitionId);
        });
    }

    /** Makes the handler reject every request with an out-of-disk-space error. */
    void onDiskSpaceNotAvailable() {
        actor.submit(() -> {
            isDiskSpaceAvailable = false;
            LOG.debug("Broker is out of disk space. All client requests will be rejected");
        });
    }

    /** Lifts the rejection installed by {@link #onDiskSpaceNotAvailable()}. */
    void onDiskSpaceAvailable() {
        actor.submit(() -> isDiskSpaceAvailable = true);
    }
}

