/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.logstreams.impl.log;

import io.camunda.zeebe.logstreams.impl.LogStreamMetrics;
import io.camunda.zeebe.logstreams.impl.flowcontrol.FlowControl;
import io.camunda.zeebe.logstreams.impl.flowcontrol.InFlightAppend;
import io.camunda.zeebe.logstreams.impl.log.SequencedBatch;
import io.camunda.zeebe.logstreams.impl.log.SequencerMetrics;
import io.camunda.zeebe.logstreams.log.LogAppendEntry;
import io.camunda.zeebe.logstreams.log.LogStreamWriter;
import io.camunda.zeebe.logstreams.storage.LogStorage;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.scheduler.clock.ActorClock;
import io.camunda.zeebe.util.Either;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Assigns positions to batches of {@link LogAppendEntry} and hands them to the {@link LogStorage}.
 *
 * <p>Position assignment and the call to {@link LogStorage#append} happen while holding an internal
 * {@link ReentrantLock}, so concurrent writers receive non-overlapping position ranges. The closed
 * flag is read before the lock is taken.
 */
final class Sequencer
implements LogStreamWriter,
Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(Sequencer.class);
    // NOTE(review): presumably the per-event framing overhead added by the storage format - confirm
    // against the frame descriptor this value was inlined from.
    private static final long PER_EVENT_FRAMING_OVERHEAD = 20L;
    // NOTE(review): presumably a fixed per-batch framing/alignment overhead - confirm.
    private static final long BATCH_FRAMING_OVERHEAD = 8L;
    private final int maxFragmentSize;
    private volatile long position;
    private volatile boolean isClosed = false;
    private final ReentrantLock lock = new ReentrantLock();
    private final LogStorage logStorage;
    private final SequencerMetrics sequencerMetrics;
    private final LogStreamMetrics logStreamMetrics;
    private final FlowControl flowControl;

    /**
     * @param logStorage storage that sequenced batches are appended to
     * @param initialPosition position assigned to the first entry of the first written batch
     * @param maxFragmentSize upper bound used by {@link #canWriteEvents(int, int)}
     * @param sequencerMetrics metrics for batch size and batch length; must not be null
     * @param logStreamMetrics metrics for appended entries, also passed to the flow control; must
     *     not be null
     * @throws NullPointerException if either metrics argument is null
     */
    Sequencer(LogStorage logStorage, long initialPosition, int maxFragmentSize, SequencerMetrics sequencerMetrics, LogStreamMetrics logStreamMetrics) {
        this.logStorage = logStorage;
        LOG.trace("Starting new sequencer at position {}", initialPosition);
        this.position = initialPosition;
        this.maxFragmentSize = maxFragmentSize;
        this.sequencerMetrics = Objects.requireNonNull(sequencerMetrics, "must specify sequencer metrics");
        this.logStreamMetrics = Objects.requireNonNull(logStreamMetrics, "must specify appender metrics");
        this.flowControl = new FlowControl(logStreamMetrics);
    }

    /**
     * Checks whether a batch with the given number of events and payload size fits into the
     * configured maximum fragment size once framing overhead is added.
     *
     * @param eventCount number of events in the batch
     * @param batchSize size of the batch without framing overhead
     * @return {@code true} if the framed length does not exceed the maximum fragment size
     */
    @Override
    public boolean canWriteEvents(int eventCount, int batchSize) {
        // Computed in long arithmetic: with int arithmetic a large eventCount or batchSize wraps
        // around to a negative value and would wrongly be reported as fitting.
        long framedMessageLength = (long)batchSize + (long)eventCount * PER_EVENT_FRAMING_OVERHEAD + BATCH_FRAMING_OVERHEAD;
        return framedMessageLength <= (long)this.maxFragmentSize;
    }

    /**
     * Tries to sequence the given entries as one batch and append it to the log storage.
     *
     * @param appendEntries entries to write; must be non-empty and every entry must carry a
     *     non-empty record value and non-empty record metadata
     * @param sourcePosition source position passed through to the sequenced batch
     * @return on success the position computed for the last entry of the batch; otherwise {@code
     *     CLOSED} if the sequencer was closed, {@code INVALID_ARGUMENT} if the batch is empty or
     *     contains an invalid entry, or {@code FULL} if flow control rejected the write
     */
    @Override
    public Either<LogStreamWriter.WriteFailure, Long> tryWrite(List<LogAppendEntry> appendEntries, long sourcePosition) {
        if (this.isClosed) {
            LOG.warn("Rejecting write of {}, sequencer is closed", appendEntries);
            return Either.left(LogStreamWriter.WriteFailure.CLOSED);
        }
        for (LogAppendEntry entry : appendEntries) {
            if (!this.isEntryValid(entry)) {
                LOG.warn("Reject write of invalid entry {}", entry);
                return Either.left(LogStreamWriter.WriteFailure.INVALID_ARGUMENT);
            }
        }
        int batchSize = appendEntries.size();
        if (batchSize == 0) {
            return Either.left(LogStreamWriter.WriteFailure.INVALID_ARGUMENT);
        }
        Either<FlowControl.Rejection, InFlightAppend> permit = this.flowControl.tryAcquire();
        if (permit.isLeft()) {
            return Either.left(LogStreamWriter.WriteFailure.FULL);
        }
        InFlightAppend inflightAppend = permit.get();
        long currentPosition;
        this.lock.lock();
        try {
            currentPosition = this.position;
            SequencedBatch sequencedBatch = new SequencedBatch(ActorClock.currentTimeMillis(), currentPosition, sourcePosition, appendEntries);
            long lowestPosition = sequencedBatch.firstPosition();
            long highestPosition = sequencedBatch.firstPosition() + (long)sequencedBatch.entries().size() - 1L;
            // Metadata is copied before the append so the listener does not read it from the
            // entries later on.
            List<LogAppendEntryMetadata> metricsMetadata = Sequencer.copyMetricsMetadata(sequencedBatch);
            inflightAppend.start(highestPosition);
            this.logStorage.append(lowestPosition, highestPosition, sequencedBatch, new InstrumentedAppendListener(inflightAppend, metricsMetadata, this.logStreamMetrics));
            // Only advance the position after append returned; if it throws, the position is kept.
            this.position = currentPosition + (long)batchSize;
            this.sequencerMetrics.observeBatchLengthBytes(sequencedBatch.length());
        }
        finally {
            this.lock.unlock();
        }
        this.sequencerMetrics.observeBatchSize(batchSize);
        return Either.right(currentPosition + (long)batchSize - 1L);
    }

    /** Marks the sequencer as closed; subsequent {@link #tryWrite} calls are rejected. */
    @Override
    public void close() {
        LOG.info("Closing sequencer for writing");
        this.isClosed = true;
    }

    /** An entry is valid if both its record value and its record metadata are present and non-empty. */
    private boolean isEntryValid(LogAppendEntry entry) {
        return entry.recordValue() != null && entry.recordValue().getLength() > 0 && entry.recordMetadata() != null && entry.recordMetadata().getLength() > 0;
    }

    /**
     * Extracts record type, value type and intent of every entry in the batch.
     *
     * @param sequencedBatch batch whose entries are inspected
     * @return a new list with one metadata element per entry, in entry order
     */
    static List<LogAppendEntryMetadata> copyMetricsMetadata(SequencedBatch sequencedBatch) {
        List<LogAppendEntry> entries = sequencedBatch.entries();
        List<LogAppendEntryMetadata> metricsMetadata = new ArrayList<>(entries.size());
        for (LogAppendEntry entry : entries) {
            metricsMetadata.add(new LogAppendEntryMetadata(entry));
        }
        return metricsMetadata;
    }

    /**
     * Append listener that forwards every callback to {@code delegate} and, on write, records one
     * appended entry in {@code metrics} for each element of {@code batchMetadata}.
     */
    record InstrumentedAppendListener(LogStorage.AppendListener delegate, List<LogAppendEntryMetadata> batchMetadata, LogStreamMetrics metrics) implements LogStorage.AppendListener
    {
        @Override
        public void onWrite(long address) {
            this.delegate.onWrite(address);
            this.batchMetadata.forEach(this::recordAppendedEntry);
        }

        @Override
        public void onCommit(long address) {
            this.delegate.onCommit(address);
        }

        private void recordAppendedEntry(LogAppendEntryMetadata metadata) {
            this.metrics.recordAppendedEntry(1, metadata.recordType(), metadata.valueType(), metadata.intent());
        }
    }

    /** Record type, value type and intent of an entry, as reported to the metrics. */
    record LogAppendEntryMetadata(RecordType recordType, ValueType valueType, Intent intent) {
        private LogAppendEntryMetadata(LogAppendEntry entry) {
            this(entry.recordMetadata().getRecordType(), entry.recordMetadata().getValueType(), entry.recordMetadata().getIntent());
        }
    }
}

