/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.logstreams.impl.log;

import com.netflix.concurrency.limits.limit.AbstractLimit;
import com.netflix.concurrency.limits.limit.WindowedLimit;
import io.camunda.zeebe.dispatcher.BlockPeek;
import io.camunda.zeebe.dispatcher.Subscription;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.logstreams.impl.backpressure.AlgorithmCfg;
import io.camunda.zeebe.logstreams.impl.backpressure.AppendBackpressureMetrics;
import io.camunda.zeebe.logstreams.impl.backpressure.AppendEntryLimiter;
import io.camunda.zeebe.logstreams.impl.backpressure.AppendLimiter;
import io.camunda.zeebe.logstreams.impl.backpressure.AppenderGradient2Cfg;
import io.camunda.zeebe.logstreams.impl.backpressure.AppenderVegasCfg;
import io.camunda.zeebe.logstreams.impl.backpressure.NoopAppendLimiter;
import io.camunda.zeebe.logstreams.impl.log.AppenderMetrics;
import io.camunda.zeebe.logstreams.impl.log.Listener;
import io.camunda.zeebe.logstreams.impl.log.LoggedEventImpl;
import io.camunda.zeebe.logstreams.storage.LogStorage;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.util.Environment;
import io.camunda.zeebe.util.collection.Tuple;
import io.camunda.zeebe.util.health.FailureListener;
import io.camunda.zeebe.util.health.HealthMonitorable;
import io.camunda.zeebe.util.health.HealthReport;
import io.prometheus.client.Histogram;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;

/**
 * Actor that consumes blocks from the write-buffer {@link Subscription} and appends each block to
 * the {@link LogStorage}, gated by an {@link AppendLimiter}.
 *
 * <p>Back pressure is configured through environment variables read in the constructor:
 *
 * <ul>
 *   <li>{@code ZEEBE_BP_APPENDER} - enables the limiter (defaults to {@code true});
 *   <li>{@code ZEEBE_BP_APPENDER_ALGORITHM} - {@code vegas} (default) or {@code gradient2};
 *   <li>{@code ZEEBE_BP_APPENDER_WINDOWED} - wraps the limit in a {@link WindowedLimit} (defaults
 *       to {@code false}).
 * </ul>
 */
final class LogStorageAppender extends Actor implements HealthMonitorable {
    public static final Logger LOG = Loggers.LOGSTREAMS_LOGGER;
    /** Known back-pressure algorithms, keyed by their lower-case name. */
    private static final Map<String, AlgorithmCfg> ALGORITHM_CFG = Map.of("vegas", new AppenderVegasCfg(), "gradient2", new AppenderGradient2Cfg());
    private final String name;
    private final Subscription writeBufferSubscription;
    private final int maxAppendBlockSize;
    private final LogStorage logStorage;
    private final AppendLimiter appendEntryLimiter;
    private final AppendBackpressureMetrics appendBackpressureMetrics;
    private final Environment env;
    /** Reusable flyweight used to read the position and length of each entry in a block. */
    private final LoggedEventImpl positionReader = new LoggedEventImpl();
    private final AppenderMetrics appenderMetrics;
    private final Set<FailureListener> failureListeners = new HashSet<>();
    /** Completed (with {@code null}) once the actor has closed or failed. */
    private final ActorFuture<Void> closeFuture;
    private final int partitionId;

    /**
     * Creates the appender and selects the limiter implementation from the environment.
     *
     * @param name the actor name returned by {@link #getName()}
     * @param partitionId the partition this appender belongs to; used for metrics and logging
     * @param logStorage the storage that blocks are appended to
     * @param writeBufferSubscription the subscription that blocks are peeked from
     * @param maxBlockSize the maximum block size passed to the subscription when peeking
     */
    LogStorageAppender(String name, int partitionId, LogStorage logStorage, Subscription writeBufferSubscription, int maxBlockSize) {
        this.appenderMetrics = new AppenderMetrics(Integer.toString(partitionId));
        this.env = new Environment();
        this.name = name;
        this.partitionId = partitionId;
        this.logStorage = logStorage;
        this.writeBufferSubscription = writeBufferSubscription;
        this.maxAppendBlockSize = maxBlockSize;
        this.appendBackpressureMetrics = new AppendBackpressureMetrics(partitionId);
        boolean isBackpressureEnabled = this.env.getBool("ZEEBE_BP_APPENDER").orElse(true);
        this.appendEntryLimiter = isBackpressureEnabled ? this.initBackpressure(partitionId) : this.initNoBackpressure(partitionId);
        this.closeFuture = new CompletableActorFuture<>();
    }

    /**
     * Builds the limiter used when back pressure is enabled.
     *
     * <p>An algorithm name that is not a key of {@link #ALGORITHM_CFG} falls back to a fresh
     * {@link AppenderVegasCfg}.
     *
     * @param partitionId the partition the limiter is created for
     * @return the configured limiter
     */
    private AppendLimiter initBackpressure(int partitionId) {
        // Locale.ROOT keeps the lookup key independent of the JVM default locale; with a
        // locale-sensitive lower-casing (e.g. Turkish) "GRADIENT2" would not map to "gradient2".
        String algorithmName = this.env.get("ZEEBE_BP_APPENDER_ALGORITHM").orElse("vegas").toLowerCase(Locale.ROOT);
        AlgorithmCfg algorithmCfg = ALGORITHM_CFG.getOrDefault(algorithmName, new AppenderVegasCfg());
        algorithmCfg.applyEnvironment(this.env);
        AbstractLimit abstractLimit = (AbstractLimit)algorithmCfg.get();
        boolean windowedLimiter = this.env.getBool("ZEEBE_BP_APPENDER_WINDOWED").orElse(false);
        LOG.debug("Configured log appender back pressure at partition {} as {}. Window limiting is {}", partitionId, algorithmCfg, windowedLimiter ? "enabled" : "disabled");
        return ((AppendEntryLimiter.AppendEntryLimiterBuilder)AppendEntryLimiter.builder().limit(windowedLimiter ? WindowedLimit.newBuilder().build(abstractLimit) : abstractLimit)).partitionId(partitionId).build();
    }

    /**
     * Builds the limiter used when back pressure is disabled and logs a warning about it.
     *
     * @param partition the partition id, used only for the log message
     * @return a {@link NoopAppendLimiter}
     */
    private AppendLimiter initNoBackpressure(int partition) {
        LOG.warn("No back pressure for the log appender (partition = {}) configured! This might cause problems.", partition);
        return new NoopAppendLimiter();
    }

    /**
     * Copies the peeked block and appends it to the log storage if the limiter grants a permit.
     *
     * @param blockPeek the peeked block; marked completed only when the append was issued
     * @return {@code true} if the block was handed to the log storage, {@code false} if the
     *     limiter rejected it
     * @throws IllegalStateException if the positions inside the block are not consecutive
     */
    private boolean appendBlock(BlockPeek blockPeek) {
        ByteBuffer rawBuffer = blockPeek.getRawBuffer();
        int bytes = rawBuffer.remaining();
        // The block is copied so the appended data does not alias the peeked buffer.
        ByteBuffer copiedBuffer = ByteBuffer.allocate(bytes).put(rawBuffer).flip();
        Tuple<Long, Long> positions = this.readLowestHighestPosition(copiedBuffer);
        this.appendBackpressureMetrics.newEntryToAppend();
        if (!this.appendEntryLimiter.tryAcquire(positions.getRight())) {
            this.appendBackpressureMetrics.deferred();
            LOG.trace("Backpressure happens: in flight {} limit {}", this.appendEntryLimiter.getInflight(), this.appendEntryLimiter.getLimit());
            return false;
        }
        Listener listener = new Listener(this, positions.getRight(), this.appenderMetrics.startAppendLatencyTimer(), this.appenderMetrics.startCommitLatencyTimer());
        this.logStorage.append(positions.getLeft(), positions.getRight(), copiedBuffer, listener);
        blockPeek.markCompleted();
        return true;
    }

    /** Extends the inherited actor context with the {@code partitionId} entry. */
    @Override
    protected Map<String, String> createContext() {
        Map<String, String> context = super.createContext();
        context.put("partitionId", Integer.toString(this.partitionId));
        return context;
    }

    @Override
    public String getName() {
        return this.name;
    }

    /** Registers {@link #onWriteBufferAvailable()} as the consumer of the write-buffer subscription. */
    @Override
    protected void onActorStarting() {
        this.actor.consume(this.writeBufferSubscription, this::onWriteBufferAvailable);
    }

    @Override
    protected void onActorClosed() {
        this.closeFuture.complete(null);
    }

    /**
     * Requests the actor to close unless it is already closed.
     *
     * @return the future that is completed from {@link #onActorClosed()} or {@link #onActorFailed()}
     */
    @Override
    public ActorFuture<Void> closeAsync() {
        if (!this.actor.isClosed()) {
            super.closeAsync();
        }
        return this.closeFuture;
    }

    @Override
    protected void handleFailure(Throwable failure) {
        this.onFailure(failure);
    }

    @Override
    public void onActorFailed() {
        this.closeFuture.complete(null);
    }

    /**
     * Peeks the next block from the write buffer and tries to append it; yields the actor thread
     * when nothing was read or the append was rejected by the limiter.
     */
    private void onWriteBufferAvailable() {
        BlockPeek blockPeek = new BlockPeek();
        int readBytes = this.writeBufferSubscription.peekBlock(blockPeek, this.maxAppendBlockSize, true);
        if (readBytes <= 0 || !this.appendBlock(blockPeek)) {
            this.actor.yieldThread();
        }
    }

    /**
     * Walks all entries in {@code buffer} and returns the first and last entry position.
     *
     * @param buffer a buffer holding at least one entry, read from offset 0 up to its capacity
     * @return a tuple of (lowest position, highest position)
     * @throws IllegalStateException if an entry's position is not exactly one greater than the
     *     position of the entry before it
     */
    private Tuple<Long, Long> readLowestHighestPosition(ByteBuffer buffer) {
        UnsafeBuffer view = new UnsafeBuffer(buffer);
        Tuple<Long, Long> positions = new Tuple<>(-1L, -1L);
        int offset = 0;
        long lastPosition = -1L;
        do {
            this.positionReader.wrap(view, offset);
            long pos = this.positionReader.getPosition();
            if (lastPosition == -1L) {
                // First entry of the block: remember it as the lowest position.
                positions.setLeft(pos);
            } else if (pos != lastPosition + 1L) {
                throw new IllegalStateException(String.format("Expected all positions in a single log entry batch to increase by 1 starting at %d, but got %d followed by %d", positions.getLeft(), lastPosition, pos));
            }
            positions.setRight(pos);
            lastPosition = pos;
            offset += this.positionReader.getLength();
        } while (offset < view.capacity());
        return positions;
    }

    /** Reports unhealthy with the message "actor is closed" once the actor is closed, healthy otherwise. */
    @Override
    public HealthReport getHealthReport() {
        return this.actor.isClosed() ? HealthReport.unhealthy(this).withMessage("actor is closed") : HealthReport.healthy(this);
    }

    @Override
    public void addFailureListener(FailureListener failureListener) {
        this.actor.run(() -> this.failureListeners.add(failureListener));
    }

    @Override
    public void removeFailureListener(FailureListener failureListener) {
        this.actor.run(() -> this.failureListeners.remove(failureListener));
    }

    /**
     * Logs the failure, fails the actor and notifies every registered failure listener with an
     * unhealthy report carrying the error.
     *
     * @param error the cause of the failure
     */
    private void onFailure(Throwable error) {
        LOG.error("Actor {} failed in phase {}.", this.name, this.actor.getLifecyclePhase(), error);
        this.actor.fail(error);
        HealthReport report = HealthReport.unhealthy(this).withIssue(error);
        this.failureListeners.forEach(l -> l.onFailure(report));
    }

    /**
     * Submits the failure handling to the actor via {@code actor.run}.
     *
     * @param error the cause of the failure
     */
    void runOnFailure(Throwable error) {
        this.actor.run(() -> this.onFailure(error));
    }

    /**
     * Submits an {@code onCommit} notification for the given position to the limiter via
     * {@code actor.run}.
     *
     * @param highestPosition the highest position of the committed block
     */
    void releaseBackPressure(long highestPosition) {
        this.actor.run(() -> this.appendEntryLimiter.onCommit(highestPosition));
    }

    /**
     * Records the last appended position in the metrics and closes the append latency timer.
     *
     * @param highestPosition the highest position of the appended block
     * @param appendLatencyTimer the timer started when the append was issued
     */
    void notifyWritePosition(long highestPosition, Histogram.Timer appendLatencyTimer) {
        this.actor.run(() -> {
            this.appenderMetrics.setLastAppendedPosition(highestPosition);
            appendLatencyTimer.close();
        });
    }

    /**
     * Records the last committed position in the metrics and closes the commit latency timer.
     *
     * @param highestPosition the highest position of the committed block
     * @param commitLatencyTimer the timer started when the append was issued
     */
    void notifyCommitPosition(long highestPosition, Histogram.Timer commitLatencyTimer) {
        this.actor.run(() -> {
            this.appenderMetrics.setLastCommittedPosition(highestPosition);
            commitLatencyTimer.close();
        });
    }
}

