/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.logstreams.impl.log;

import io.camunda.zeebe.dispatcher.Dispatcher;
import io.camunda.zeebe.dispatcher.Dispatchers;
import io.camunda.zeebe.dispatcher.Subscription;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.logstreams.impl.log.LogStorageAppender;
import io.camunda.zeebe.logstreams.impl.log.LogStreamBatchWriterImpl;
import io.camunda.zeebe.logstreams.impl.log.LogStreamReaderImpl;
import io.camunda.zeebe.logstreams.impl.log.LogStreamWriterImpl;
import io.camunda.zeebe.logstreams.log.LogRecordAwaiter;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.logstreams.log.LogStreamRecordWriter;
import io.camunda.zeebe.logstreams.log.LogStreamWriter;
import io.camunda.zeebe.logstreams.storage.LogStorage;
import io.camunda.zeebe.logstreams.storage.LogStorageReader;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.util.CloseableSilently;
import io.camunda.zeebe.util.exception.UnrecoverableException;
import io.camunda.zeebe.util.health.FailureListener;
import io.camunda.zeebe.util.health.HealthReport;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import org.slf4j.Logger;

/**
 * Actor-based {@link LogStream} implementation.
 *
 * <p>The stream hands out readers and writers on top of a {@link LogStorage}. The write path (a
 * {@link Dispatcher} used as write buffer plus a {@link LogStorageAppender} consuming it) is opened
 * lazily the first time a writer is requested and torn down again in {@link #closeAsync()}.
 *
 * <p>The stream registers itself as commit listener on the storage and forwards every commit to the
 * registered {@link LogRecordAwaiter}s. It is also a {@link FailureListener} of its appender: any
 * reported failure is re-published to the stream's own failure listeners and then closes the
 * stream.
 *
 * <p>Mutable state is changed from jobs submitted through {@code actor.run}/{@code actor.call};
 * NOTE(review): this presumably relies on the actor scheduler executing those jobs one at a time -
 * confirm against the scheduler documentation.
 */
public final class LogStreamImpl
extends Actor
implements LogStream,
FailureListener,
LogStorage.CommitListener {
    private static final Logger LOG = Loggers.LOGSTREAMS_LOGGER;
    private static final String APPENDER_SUBSCRIPTION_NAME = "appender";
    private final Set<LogRecordAwaiter> recordAwaiters = new HashSet<>();
    private final String logName;
    private final int partitionId;
    private final int maxFrameLength;
    private final ActorSchedulingService actorSchedulingService;
    private final List<LogStreamReader> readers;
    private final LogStorage logStorage;
    private final CompletableActorFuture<Void> closeFuture;
    private final int nodeId;
    private final Set<FailureListener> failureListeners = new HashSet<>();
    /** Non-null while an open attempt is in flight or has finished; reset by {@link #closeAppender()}. */
    private ActorFuture<LogStorageAppender> appenderFuture;
    private Dispatcher writeBuffer;
    private LogStorageAppender appender;
    /** Error reported while closing the appender; used to fail {@link #closeFuture}. */
    private Throwable closeError;
    private final String actorName;
    private HealthReport healthReport = HealthReport.healthy(this);

    /**
     * Creates a log stream; nothing is opened until the actor is started and a writer is requested.
     *
     * @param actorSchedulingService scheduler used to submit the write buffer and the appender
     * @param logName name used in log messages and as prefix of the write buffer name
     * @param partitionId partition id, used for actor names and the actor context
     * @param nodeId id of the node owning this stream (only stored)
     * @param maxFrameLength maximum fragment length handed to the write buffer and the appender
     * @param logStorage storage the stream reads from and appends to
     */
    LogStreamImpl(ActorSchedulingService actorSchedulingService, String logName, int partitionId, int nodeId, int maxFrameLength, LogStorage logStorage) {
        this.actorSchedulingService = actorSchedulingService;
        this.logName = logName;
        this.partitionId = partitionId;
        this.nodeId = nodeId;
        this.actorName = LogStreamImpl.buildActorName("LogStream", partitionId);
        this.maxFrameLength = maxFrameLength;
        this.logStorage = logStorage;
        this.closeFuture = new CompletableActorFuture<>();
        this.readers = new ArrayList<>();
    }

    /** Extends the context created by the superclass with the {@code partitionId} entry. */
    @Override
    protected Map<String, String> createContext() {
        final Map<String, String> context = super.createContext();
        context.put("partitionId", Integer.toString(this.partitionId));
        return context;
    }

    /** Returns the actor name built in the constructor from {@code "LogStream"} and the partition id. */
    @Override
    public String getName() {
        return this.actorName;
    }

    /** Subscribes this stream to commit notifications of the storage. */
    @Override
    protected void onActorStarted() {
        this.logStorage.addCommitListener(this);
    }

    /** Closes every reader handed out by this stream and unsubscribes from the storage. */
    @Override
    protected void onActorClosing() {
        LOG.info("On closing logstream {} close {} readers", this.logName, this.readers.size());
        this.readers.forEach(CloseableSilently::close);
        this.logStorage.removeCommitListener(this);
    }

    /** Completes the close future, exceptionally if closing the appender reported an error. */
    @Override
    protected void onActorClosed() {
        if (this.closeError != null) {
            this.closeFuture.completeExceptionally(this.closeError);
        } else {
            this.closeFuture.complete(null);
        }
    }

    /** Blocking variant of {@link #closeAsync()}. */
    @Override
    public void close() {
        this.closeAsync().join();
    }

    /**
     * Closes the write path first and the actor afterwards.
     *
     * @return the future that is completed in {@link #onActorClosed()}; the same instance is
     *     returned on every call
     */
    @Override
    public ActorFuture<Void> closeAsync() {
        if (this.actor.isClosed()) {
            return this.closeFuture;
        }
        this.actor.run(() -> this.closeAppender().onComplete((nothing, appenderError) -> {
            this.closeError = appenderError;
            this.actor.close();
        }));
        return this.closeFuture;
    }

    /** Routes failures raised to the actor into the stream's own failure handling. */
    @Override
    protected void handleFailure(Throwable failure) {
        this.onFailure(failure);
    }

    /**
     * Logs the failure, fails a still pending open attempt and publishes a health report:
     * {@code dead} for an {@link UnrecoverableException}, {@code unhealthy} otherwise.
     */
    private void onFailure(Throwable failure) {
        LOG.error("Unexpected error in Log Stream {} in phase {}.", this.getName(), this.actor.getLifecyclePhase(), failure);
        if (this.appenderFuture != null && !this.appenderFuture.isDone()) {
            this.appenderFuture.completeExceptionally(failure);
        }
        if (failure instanceof UnrecoverableException) {
            this.onUnrecoverableFailure(HealthReport.dead(this).withIssue(failure));
        } else {
            this.onFailure(HealthReport.unhealthy(this).withIssue(failure));
        }
    }

    @Override
    public int getPartitionId() {
        return this.partitionId;
    }

    @Override
    public String getLogName() {
        return this.logName;
    }

    /** Creates a reader inside the actor; the reader is closed when the stream closes. */
    @Override
    public ActorFuture<LogStreamReader> newLogStreamReader() {
        return this.actor.call(this::createLogStreamReader);
    }

    /** Requests a record writer; opens the write path first if it is not open yet. */
    @Override
    public ActorFuture<LogStreamRecordWriter> newLogStreamRecordWriter() {
        return this.newLogStreamWriter(LogStreamWriterImpl::new);
    }

    /** Requests a batch writer; opens the write path first if it is not open yet. */
    @Override
    public ActorFuture<LogStreamBatchWriter> newLogStreamBatchWriter() {
        return this.newLogStreamWriter(LogStreamBatchWriterImpl::new);
    }

    /** Registers a listener that is called on every commit reported by the storage. */
    @Override
    public void registerRecordAvailableListener(LogRecordAwaiter recordAwaiter) {
        this.actor.call(() -> this.recordAwaiters.add(recordAwaiter));
    }

    /** Removes a listener previously added with {@link #registerRecordAvailableListener}. */
    @Override
    public void removeRecordAvailableListener(LogRecordAwaiter recordAwaiter) {
        this.actor.call(() -> this.recordAwaiters.remove(recordAwaiter));
    }

    /**
     * Shared implementation of the writer factories.
     *
     * @return a future completed with the new writer, or completed exceptionally when the actor is
     *     already closed or the write path could not be opened
     */
    private <T extends LogStreamWriter> ActorFuture<T> newLogStreamWriter(WriterCreator<T> logStreamCreator) {
        if (this.actor.isClosed()) {
            return CompletableActorFuture.completedExceptionally(new RuntimeException("Actor is closed"));
        }
        final CompletableActorFuture<T> writerFuture = new CompletableActorFuture<>();
        this.actor.run(() -> this.createWriter(writerFuture, logStreamCreator));
        return writerFuture;
    }

    private void notifyRecordAwaiters() {
        this.recordAwaiters.forEach(LogRecordAwaiter::onRecordAvailable);
    }

    /** Commit callback of the storage; notifies all record awaiters from within the actor. */
    @Override
    public void onCommit() {
        this.actor.call(this::notifyRecordAwaiters);
    }

    private LogStreamReader createLogStreamReader() {
        final LogStreamReaderImpl newReader = new LogStreamReaderImpl(this.logStorage.newReader());
        // remembered so that onActorClosing can close it
        this.readers.add(newReader);
        return newReader;
    }

    /** Creates the writer right away when an appender exists, otherwise once opening has finished. */
    private <T extends LogStreamWriter> void createWriter(CompletableActorFuture<T> writerFuture, WriterCreator<T> creator) {
        final BiConsumer<LogStorageAppender, Throwable> onOpenAppenderConsumer = this.onOpenAppender(writerFuture, creator);
        if (this.appender != null) {
            onOpenAppenderConsumer.accept(this.appender, null);
        } else {
            this.openAppender().onComplete(onOpenAppenderConsumer);
        }
    }

    /** Builds the callback that turns the outcome of opening the appender into the writer result. */
    private <T extends LogStreamWriter> BiConsumer<LogStorageAppender, Throwable> onOpenAppender(CompletableActorFuture<T> writerFuture, WriterCreator<T> creator) {
        return (openedAppender, errorOnOpeningAppender) -> {
            if (errorOnOpeningAppender == null) {
                writerFuture.complete(creator.create(this.partitionId, this.writeBuffer));
            } else {
                writerFuture.completeExceptionally(errorOnOpeningAppender);
            }
        };
    }

    /**
     * Detaches and closes the write path.
     *
     * <p>The fields are reset before anything is closed, so a later open attempt starts from
     * scratch and callbacks of the detached attempt can recognise that they are stale. A pending
     * open attempt is failed with {@link LogStorageAppenderClosedException}.
     *
     * <p>The write buffer is closed in every case in which one exists: also when no appender was
     * created yet and when closing the appender failed. Previously it was left open in both
     * situations.
     *
     * @return a future completed once appender and write buffer are closed; completed
     *     exceptionally with the appender's close error, or with the write buffer's close error
     *     when only that failed
     */
    private ActorFuture<Void> closeAppender() {
        final CompletableActorFuture<Void> closeAppenderFuture = new CompletableActorFuture<>();
        LOG.info("Close appender for log stream {}", this.logName);

        final LogStorageAppender toCloseAppender = this.appender;
        final Dispatcher toCloseWriteBuffer = this.writeBuffer;
        final ActorFuture<LogStorageAppender> pendingOpen = this.appenderFuture;
        this.appender = null;
        this.writeBuffer = null;
        this.appenderFuture = null;

        if (pendingOpen != null && !pendingOpen.isDone()) {
            pendingOpen.completeExceptionally(new LogStorageAppenderClosedException());
        }

        if (toCloseAppender == null) {
            if (toCloseWriteBuffer == null) {
                closeAppenderFuture.complete(null);
            } else {
                // opening was interrupted after the write buffer had been created
                toCloseWriteBuffer.closeAsync().onComplete(closeAppenderFuture);
            }
            return closeAppenderFuture;
        }

        toCloseAppender.closeAsync().onComplete((ignored, appenderError) -> {
            if (appenderError == null) {
                toCloseWriteBuffer.closeAsync().onComplete(closeAppenderFuture);
                return;
            }
            // still release the write buffer, but report the appender error as the primary one
            toCloseWriteBuffer.closeAsync().onComplete((alsoIgnored, writeBufferError) -> {
                if (writeBufferError != null && writeBufferError != appenderError) {
                    appenderError.addSuppressed(writeBufferError);
                }
                closeAppenderFuture.completeExceptionally(appenderError);
            });
        });
        return closeAppenderFuture;
    }

    /**
     * Starts opening the write path unless an attempt already exists.
     *
     * @return the future of the current open attempt
     */
    private ActorFuture<LogStorageAppender> openAppender() {
        if (this.appenderFuture != null) {
            return this.appenderFuture;
        }
        final CompletableActorFuture<LogStorageAppender> openingFuture = new CompletableActorFuture<>();
        this.appenderFuture = openingFuture;
        this.actor.run(() -> this.createWriteBufferAndSubscribe(openingFuture));
        return openingFuture;
    }

    /** First step of opening: creates the write buffer and opens the appender's subscription on it. */
    private void createWriteBufferAndSubscribe(ActorFuture<LogStorageAppender> openingFuture) {
        if (this.isSuperseded(openingFuture)) {
            LOG.debug("Skip opening write buffer for log stream {}, the appender was closed in the meantime", this.logName);
            return;
        }
        final long initialPosition = this.getWriteBuffersInitialPosition();
        this.writeBuffer = this.createAndScheduleWriteBuffer(initialPosition);
        this.writeBuffer.openSubscriptionAsync(APPENDER_SUBSCRIPTION_NAME).onComplete((subscription, subscriptionError) -> this.onSubscriptionOpened(openingFuture, subscription, subscriptionError));
    }

    /** Second step of opening: creates and schedules the appender on the opened subscription. */
    private void onSubscriptionOpened(ActorFuture<LogStorageAppender> openingFuture, Subscription subscription, Throwable subscriptionError) {
        if (this.isSuperseded(openingFuture)) {
            // closeAppender already detached (and closed) the write buffer of this attempt
            LOG.debug("Skip creating appender for log stream {}, the appender was closed in the meantime", this.logName, subscriptionError);
            return;
        }
        if (subscriptionError != null) {
            this.onFailure(subscriptionError);
            return;
        }
        this.createAndScheduleLogStorageAppender(subscription).onComplete((ignored, submitError) -> {
            if (this.isSuperseded(openingFuture)) {
                // closeAppender already detached (and closed) the appender of this attempt
                LOG.debug("Skip completing appender for log stream {}, the appender was closed in the meantime", this.logName, submitError);
                return;
            }
            if (submitError != null) {
                this.onFailure(submitError);
            } else {
                openingFuture.complete(this.appender);
                this.appender.addFailureListener(this);
            }
        });
    }

    /**
     * Tells whether the given open attempt is no longer the current one, i.e. {@link
     * #closeAppender()} reset the fields after the attempt was started.
     */
    private boolean isSuperseded(ActorFuture<LogStorageAppender> openingFuture) {
        return this.appenderFuture != openingFuture;
    }

    /** Returns the position after the last one found in the storage, or 1 if that is not positive. */
    private long getWriteBuffersInitialPosition() {
        final long lastPosition = this.getLastCommittedPosition();
        return lastPosition > 0L ? lastPosition + 1L : 1L;
    }

    private Dispatcher createAndScheduleWriteBuffer(long initialPosition) {
        return Dispatchers.create(LogStreamImpl.buildActorName("dispatcher", this.partitionId))
            .maxFragmentLength(this.maxFrameLength)
            .initialPosition(initialPosition)
            .name(this.logName + "-write-buffer")
            .actorSchedulingService(this.actorSchedulingService)
            .build();
    }

    /** Creates the appender, stores it in {@link #appender} and submits it to the scheduler. */
    private ActorFuture<Void> createAndScheduleLogStorageAppender(Subscription subscription) {
        this.appender = new LogStorageAppender(LogStreamImpl.buildActorName("LogAppender", this.partitionId), this.partitionId, this.logStorage, subscription, this.maxFrameLength);
        return this.actorSchedulingService.submitActor(this.appender);
    }

    /** Seeks a temporary reader to the end of the storage and returns the value it reports. */
    private long getLastCommittedPosition() {
        try (LogStorageReader storageReader = this.logStorage.newReader();
            LogStreamReaderImpl logReader = new LogStreamReaderImpl(storageReader)) {
            return logReader.seekToEnd();
        }
    }

    /** Returns the most recently published health report; initially healthy. */
    @Override
    public HealthReport getHealthReport() {
        return this.healthReport;
    }

    @Override
    public void addFailureListener(FailureListener failureListener) {
        this.actor.run(() -> this.failureListeners.add(failureListener));
    }

    @Override
    public void removeFailureListener(FailureListener failureListener) {
        this.actor.run(() -> this.failureListeners.remove(failureListener));
    }

    /**
     * Marks the stream unhealthy with the given report as issue, notifies all failure listeners
     * and closes the stream.
     */
    @Override
    public void onFailure(HealthReport report) {
        this.actor.run(() -> {
            this.healthReport = HealthReport.unhealthy(this).withIssue(report);
            this.failureListeners.forEach(listener -> listener.onFailure(this.healthReport));
            this.closeAsync();
        });
    }

    /** Marks the stream healthy again and notifies all failure listeners. */
    @Override
    public void onRecovered() {
        this.actor.run(() -> {
            this.healthReport = HealthReport.healthy(this);
            this.failureListeners.forEach(FailureListener::onRecovered);
        });
    }

    /**
     * Marks the stream dead with the given report as issue, notifies all failure listeners and
     * closes the stream.
     */
    @Override
    public void onUnrecoverableFailure(HealthReport report) {
        this.actor.run(() -> {
            this.healthReport = HealthReport.dead(this).withIssue(report);
            this.failureListeners.forEach(listener -> listener.onUnrecoverableFailure(this.healthReport));
            this.closeAsync();
        });
    }

    /** Factory for a concrete writer on top of the stream's write buffer. */
    @FunctionalInterface
    private static interface WriterCreator<T extends LogStreamWriter> {
        public T create(int partitionId, Dispatcher writeBuffer);
    }

    /** Used to fail an open attempt that was still pending when the appender got closed. */
    private static final class LogStorageAppenderClosedException
    extends RuntimeException {
        private LogStorageAppenderClosedException() {
            super("LogStorageAppender was closed before opening succeeded");
        }
    }
}

