/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.logstreams.impl.log;

import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.logstreams.impl.flowcontrol.AppendErrorHandler;
import io.camunda.zeebe.logstreams.impl.flowcontrol.AppenderFlowControl;
import io.camunda.zeebe.logstreams.impl.flowcontrol.InFlightAppend;
import io.camunda.zeebe.logstreams.impl.log.SequencedBatch;
import io.camunda.zeebe.logstreams.impl.log.Sequencer;
import io.camunda.zeebe.logstreams.storage.LogStorage;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.util.health.FailureListener;
import io.camunda.zeebe.util.health.HealthMonitorable;
import io.camunda.zeebe.util.health.HealthReport;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.slf4j.Logger;

final class LogStorageAppender
extends Actor
implements HealthMonitorable,
AppendErrorHandler {
    public static final Logger LOG = Loggers.LOGSTREAMS_LOGGER;
    private final String name;
    private final AppenderFlowControl flowControl;
    private final Sequencer sequencer;
    private final LogStorage logStorage;
    private final Set<FailureListener> failureListeners = new HashSet<FailureListener>();
    private final ActorFuture<Void> closeFuture;
    private final int partitionId;

    LogStorageAppender(String name, int partitionId, LogStorage logStorage, Sequencer sequencer) {
        this.name = name;
        this.partitionId = partitionId;
        this.logStorage = logStorage;
        this.sequencer = sequencer;
        this.flowControl = new AppenderFlowControl(this, partitionId);
        this.closeFuture = new CompletableActorFuture<Void>();
    }

    @Override
    protected Map<String, String> createContext() {
        Map<String, String> context = super.createContext();
        context.put("partitionId", Integer.toString(this.partitionId));
        return context;
    }

    @Override
    public String getName() {
        return this.name;
    }

    @Override
    protected void onActorStarting() {
        this.sequencer.registerConsumer(this.actor.onCondition("sequencer", this::tryWriteBatch));
        this.actor.submit(this::tryWriteBatch);
    }

    @Override
    protected void onActorClosed() {
        this.closeFuture.complete(null);
    }

    @Override
    public ActorFuture<Void> closeAsync() {
        if (this.actor.isClosed()) {
            return this.closeFuture;
        }
        super.closeAsync();
        return this.closeFuture;
    }

    @Override
    protected void handleFailure(Throwable failure) {
        this.onFailure(failure);
    }

    @Override
    public void onActorFailed() {
        this.closeFuture.complete(null);
    }

    @Override
    public HealthReport getHealthReport() {
        return this.actor.isClosed() ? HealthReport.unhealthy(this).withMessage("actor is closed") : HealthReport.healthy(this);
    }

    @Override
    public void addFailureListener(FailureListener failureListener) {
        this.actor.run(() -> this.failureListeners.add(failureListener));
    }

    @Override
    public void removeFailureListener(FailureListener failureListener) {
        this.actor.run(() -> this.failureListeners.remove(failureListener));
    }

    private void tryWriteBatch() {
        Optional<InFlightAppend> inflightAppend = this.flowControl.tryAcquire();
        if (inflightAppend.isEmpty()) {
            this.actor.submit(this::tryWriteBatch);
            return;
        }
        this.writeBatch(inflightAppend.get());
    }

    private void writeBatch(InFlightAppend append) {
        SequencedBatch sequencedBatch = this.sequencer.tryRead();
        if (sequencedBatch == null) {
            append.discard();
            return;
        }
        long lowestPosition = sequencedBatch.firstPosition();
        long highestPosition = sequencedBatch.firstPosition() + (long)sequencedBatch.entries().size() - 1L;
        append.start(highestPosition);
        this.logStorage.append(lowestPosition, highestPosition, sequencedBatch, (LogStorage.AppendListener)append);
        this.actor.submit(this::tryWriteBatch);
    }

    private void onFailure(Throwable error2) {
        LOG.error("Actor {} failed in phase {}.", new Object[]{this.name, this.actor.getLifecyclePhase(), error2});
        this.actor.fail(error2);
        HealthReport report = HealthReport.unhealthy(this).withIssue(error2);
        this.failureListeners.forEach(l -> l.onFailure(report));
    }

    @Override
    public void onWriteError(Throwable error2) {
        this.actor.run(() -> this.onFailure(error2));
    }

    @Override
    public void onCommitError(Throwable error2) {
        this.actor.run(() -> this.onFailure(error2));
    }
}

