/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.streamprocessor;

import io.camunda.zeebe.db.TransactionContext;
import io.camunda.zeebe.db.ZeebeDbTransaction;
import io.camunda.zeebe.engine.metrics.ReplayMetrics;
import io.camunda.zeebe.engine.processing.streamprocessor.EventFilter;
import io.camunda.zeebe.engine.processing.streamprocessor.MetadataEventFilter;
import io.camunda.zeebe.engine.processing.streamprocessor.MetadataFilter;
import io.camunda.zeebe.engine.processing.streamprocessor.ProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.ProcessingException;
import io.camunda.zeebe.engine.processing.streamprocessor.RecordProtocolVersionFilter;
import io.camunda.zeebe.engine.processing.streamprocessor.RecordValues;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorMode;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedEventImpl;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecord;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.KeyGeneratorControls;
import io.camunda.zeebe.engine.state.mutable.MutableLastProcessedPositionState;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.logstreams.impl.log.LogStreamBatchReaderImpl;
import io.camunda.zeebe.logstreams.log.LogRecordAwaiter;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamBatchReader;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.protocol.Protocol;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.util.buffer.BufferReader;
import io.camunda.zeebe.util.retry.RecoverableRetryStrategy;
import io.camunda.zeebe.util.retry.RetryStrategy;
import io.camunda.zeebe.util.sched.ActorControl;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import io.camunda.zeebe.util.sched.future.CompletableActorFuture;
import io.prometheus.client.Histogram;
import java.util.function.BooleanSupplier;
import org.slf4j.Logger;

/**
 * Replays the events of a log stream, batch by batch, onto the engine state.
 *
 * <p>Starting from a snapshot position, each batch read from the log is applied inside a database
 * transaction (with retry). Once the reader has no further batch, the machine either completes
 * the recovery future (mode {@link StreamProcessorMode#PROCESSING}) or switches to
 * {@link State#AWAIT_RECORD} and resumes when {@link #onRecordAvailable()} is invoked.
 */
public final class ReplayStateMachine
implements LogRecordAwaiter {
    private static final Logger LOG = Loggers.PROCESSOR_LOGGER;
    private static final String LOG_STMT_REPLAY_FINISHED = "Processor finished replay at event position {}";
    private static final String ERROR_INCONSISTENT_LOG = "Expected that position '%d' of current event is higher then position '%d' of last event, but was not. Inconsistent log detected!";
    private static final String ERROR_MSG_EXPECTED_TO_READ_METADATA = "Expected to read the metadata for the record '%s', but an exception was thrown.";
    /** Only records of type {@link RecordType#EVENT} are applied during replay. */
    private static final MetadataFilter REPLAY_FILTER = recordMetadata -> recordMetadata.getRecordType() == RecordType.EVENT;
    /** Reused metadata buffer; reset before every read in {@link #readMetadata(LoggedEvent)}. */
    private final RecordMetadata metadata = new RecordMetadata();
    private final MutableZeebeState zeebeState;
    private final KeyGeneratorControls keyGeneratorControls;
    private final MutableLastProcessedPositionState lastProcessedPositionState;
    private final ActorControl actor;
    /** Reused typed-record wrapper; re-wrapped for every event in {@link #readRecordValue(LoggedEvent)}. */
    private final TypedEventImpl typedEvent;
    private final RecordValues recordValues;
    private final EventFilter eventFilter = new MetadataEventFilter(new RecordProtocolVersionFilter().and(REPLAY_FILTER));
    private final LogStreamBatchReader logStreamBatchReader;
    private final EventApplier eventApplier;
    private final TransactionContext transactionContext;
    private final RetryStrategy replayStrategy;
    private final BooleanSupplier abortCondition;
    /** Highest source event position seen over all completed batches (or the snapshot position). */
    private long lastSourceEventPosition = -1L;
    /** Source event position of the record most recently read in the current batch. */
    private long batchSourceEventPosition = -1L;
    private long snapshotPosition;
    private long lastReadRecordPosition = -1L;
    private long lastReplayedEventPosition = -1L;
    private ActorFuture<Long> recoveryFuture;
    /** Non-null only while a batch replay attempt is in flight; used to detect a retry. */
    private ZeebeDbTransaction zeebeDbTransaction;
    private final StreamProcessorMode streamProcessorMode;
    private final LogStream logStream;
    private final StreamProcessorListener streamProcessorListener;
    private State currentState = State.AWAIT_RECORD;
    /** Evaluates to {@code true} when the supplier passed to the constructor returns {@code false}. */
    private final BooleanSupplier shouldPause;
    private final ReplayMetrics replayMetrics;

    /**
     * Creates a replay state machine from the collaborators exposed by the processing context.
     *
     * @param context source of the actor, state, log stream, reader and listener used for replay
     * @param shouldReplayNext queried before every replay step; while it returns {@code false}
     *     {@link #replayNextEvent()} returns without doing anything
     */
    public ReplayStateMachine(ProcessingContext context, BooleanSupplier shouldReplayNext) {
        this.actor = context.getActor();
        this.recordValues = context.getRecordValues();
        this.transactionContext = context.getTransactionContext();
        this.zeebeState = context.getZeebeState();
        this.abortCondition = context.getAbortCondition();
        this.eventApplier = context.getEventApplier();
        this.keyGeneratorControls = context.getKeyGeneratorControls();
        this.lastProcessedPositionState = context.getLastProcessedPositionState();
        this.streamProcessorListener = context.getStreamProcessorListener();
        this.typedEvent = new TypedEventImpl(context.getLogStream().getPartitionId());
        this.replayStrategy = new RecoverableRetryStrategy(this.actor);
        this.streamProcessorMode = context.getProcessorMode();
        this.logStream = context.getLogStream();
        this.logStreamBatchReader = new LogStreamBatchReaderImpl(context.getLogStreamReader());
        this.replayMetrics = new ReplayMetrics(this.logStream.getPartitionId());
        // Must be assigned here: the constructor parameter is not in scope of a field initializer.
        this.shouldPause = () -> !shouldReplayNext.getAsBoolean();
    }

    /**
     * Starts replaying from the batch following the given snapshot position.
     *
     * <p>In mode {@link StreamProcessorMode#REPLAY} this instance is additionally registered as a
     * record-available listener on the log stream.
     *
     * @param snapshotPosition position of the snapshot to recover from; values {@code <= 0} reset
     *     the last source event position to {@code -1}
     * @return future completed with the last source event position when replay finishes in mode
     *     {@link StreamProcessorMode#PROCESSING}, or completed exceptionally when a replay step
     *     throws a {@link RuntimeException}
     */
    ActorFuture<Long> startRecover(long snapshotPosition) {
        this.recoveryFuture = new CompletableActorFuture<>();
        this.snapshotPosition = snapshotPosition;
        this.lastSourceEventPosition = snapshotPosition > 0L ? snapshotPosition : -1L;
        this.logStreamBatchReader.seekToNextBatch(snapshotPosition);
        LOG.info("Processor starts replay of events. [snapshot-position: {}, replay-mode: {}]", snapshotPosition, this.streamProcessorMode);
        this.replayNextEvent();
        if (this.streamProcessorMode == StreamProcessorMode.REPLAY) {
            this.logStream.registerRecordAvailableListener(this);
        }
        return this.recoveryFuture;
    }

    /**
     * Schedules a replay step on the actor, but only if the machine is currently waiting for
     * records; a replay that is already in progress continues on its own.
     */
    @Override
    public void onRecordAvailable() {
        this.actor.call(() -> {
            if (this.currentState == State.AWAIT_RECORD) {
                this.replayNextEvent();
            }
        });
    }

    /**
     * Replays the next batch if one is available and replay is not paused.
     *
     * <p>Without a further batch, the recovery future is completed in mode
     * {@link StreamProcessorMode#PROCESSING}; otherwise the state becomes
     * {@link State#AWAIT_RECORD}. A {@link RuntimeException} thrown directly by this method's body
     * completes the recovery future exceptionally.
     */
    void replayNextEvent() {
        if (this.shouldPause.getAsBoolean()) {
            return;
        }
        try {
            if (this.logStreamBatchReader.hasNext()) {
                this.currentState = State.REPLAY_EVENT;
                Histogram.Timer replayDurationTimer = this.replayMetrics.startReplayDurationTimer();
                LogStreamBatchReader.Batch batch = (LogStreamBatchReader.Batch)this.logStreamBatchReader.next();
                this.replayStrategy.runWithRetry(() -> this.tryToReplayBatch(batch), this.abortCondition).onComplete((success, failure) -> {
                    if (failure != null) {
                        // NOTE(review): thrown from inside the completion callback; whether the
                        // surrounding try/catch ever sees it depends on when the callback runs - confirm.
                        throw new RuntimeException((Throwable)failure);
                    }
                    replayDurationTimer.close();
                    this.lastSourceEventPosition = Math.max(this.lastSourceEventPosition, this.batchSourceEventPosition);
                    this.replayMetrics.setLastSourcePosition(this.lastSourceEventPosition);
                    this.actor.submit(this::replayNextEvent);
                    this.notifyReplayListener();
                });
            } else if (this.streamProcessorMode == StreamProcessorMode.PROCESSING) {
                this.onRecordsReplayed();
            } else {
                this.currentState = State.AWAIT_RECORD;
            }
        }
        catch (RuntimeException e) {
            String message = String.format("Failed to replay records. [snapshot-position: %d, last-read-record-position: %d, last-replayed-event-position: %d]", this.snapshotPosition, this.lastReadRecordPosition, this.lastReplayedEventPosition);
            this.recoveryFuture.completeExceptionally(new RuntimeException(message, e));
        }
    }

    /**
     * Applies every record of the batch inside one database transaction and commits it.
     *
     * <p>A transaction left over from a previous attempt marks this call as a retry: that
     * transaction is rolled back and the batch is rewound to its head before replaying again.
     *
     * @param batch the batch of logged events to replay
     * @return always {@code true} when the transaction was committed
     * @throws Exception whatever the transaction run, commit or rollback throws
     */
    private boolean tryToReplayBatch(LogStreamBatchReader.Batch batch) throws Exception {
        boolean onRetry = this.zeebeDbTransaction != null;
        if (onRetry) {
            this.zeebeDbTransaction.rollback();
            batch.head();
        }
        this.zeebeDbTransaction = this.transactionContext.getCurrentTransaction();
        this.zeebeDbTransaction.run(() -> {
            batch.forEachRemaining(this::replayEvent);
            // Only positions beyond the snapshot are recorded as processed.
            if (this.batchSourceEventPosition > this.snapshotPosition) {
                this.lastProcessedPositionState.markAsProcessed(this.batchSourceEventPosition);
            }
        });
        this.zeebeDbTransaction.commit();
        // Cleared only after a successful commit so that a failed attempt is seen as a retry.
        this.zeebeDbTransaction = null;
        return true;
    }

    /**
     * Applies the event to the state if it passes the event filter and its source event position
     * lies beyond the snapshot; bookkeeping in {@link #onRecordReplayed(LoggedEvent)} runs for
     * every record regardless.
     */
    private void replayEvent(LoggedEvent currentEvent) {
        if (this.eventFilter.applies(currentEvent) && currentEvent.getSourceEventPosition() > this.snapshotPosition) {
            this.readMetadata(currentEvent);
            TypedRecord<?> currentTypedEvent = this.readRecordValue(currentEvent);
            this.applyCurrentEvent(currentTypedEvent);
        }
        this.onRecordReplayed(currentEvent);
    }

    /** Logs the end of replay and completes the recovery future with the last source event position. */
    private void onRecordsReplayed() {
        LOG.info(LOG_STMT_REPLAY_FINISHED, this.lastReadRecordPosition);
        this.recoveryFuture.complete(this.lastSourceEventPosition);
    }

    /**
     * Updates metrics and position bookkeeping for a record that was read, and raises the key
     * generator if the record's key belongs to this partition.
     *
     * @throws IllegalStateException if the record's position is not strictly greater than the
     *     position of the previously read record
     */
    private void onRecordReplayed(LoggedEvent currentEvent) {
        this.replayMetrics.event();
        long sourceEventPosition = currentEvent.getSourceEventPosition();
        long currentPosition = currentEvent.getPosition();
        long currentRecordKey = currentEvent.getKey();
        if (this.lastReadRecordPosition >= currentPosition) {
            throw new IllegalStateException(String.format(ERROR_INCONSISTENT_LOG, currentPosition, this.lastReadRecordPosition));
        }
        this.lastReadRecordPosition = currentPosition;
        this.batchSourceEventPosition = sourceEventPosition;
        if (Protocol.decodePartitionId(currentRecordKey) == this.zeebeState.getPartitionId()) {
            this.keyGeneratorControls.setKeyIfHigher(currentRecordKey);
        }
    }

    /**
     * Reads the event's metadata into the shared {@link #metadata} instance.
     *
     * @throws ProcessingException wrapping any exception thrown while reading
     */
    private void readMetadata(LoggedEvent currentEvent) throws ProcessingException {
        try {
            this.metadata.reset();
            currentEvent.readMetadata((BufferReader)this.metadata);
        }
        catch (Exception e) {
            String errorMsg = String.format(ERROR_MSG_EXPECTED_TO_READ_METADATA, currentEvent);
            throw new ProcessingException(errorMsg, currentEvent, null, e);
        }
    }

    /**
     * Reads the record value for the value type found in {@link #metadata} and wraps it.
     *
     * @return the shared {@link #typedEvent} instance, re-wrapped around the given event; it is
     *     overwritten by the next call
     */
    private TypedRecord<?> readRecordValue(LoggedEvent currentEvent) {
        UnifiedRecordValue value = this.recordValues.readRecordValue(currentEvent, this.metadata.getValueType());
        this.typedEvent.wrap(currentEvent, this.metadata, value);
        return this.typedEvent;
    }

    /** Applies the event through the event applier and remembers its position as last replayed. */
    private void applyCurrentEvent(TypedRecord<?> currentEvent) {
        this.eventApplier.applyState(currentEvent.getKey(), currentEvent.getIntent(), (RecordValue)currentEvent.getValue());
        this.lastReplayedEventPosition = currentEvent.getPosition();
    }

    /**
     * Informs the stream processor listener about the replay progress. Listener failures are
     * logged (including the cause) and otherwise ignored so that they do not interrupt replay.
     */
    private void notifyReplayListener() {
        try {
            this.streamProcessorListener.onReplayed(this.lastReplayedEventPosition, this.lastReadRecordPosition);
        }
        catch (Exception e) {
            // The trailing throwable is logged as the exception, not consumed by a placeholder.
            LOG.error("Expected to invoke replay listener successfully, but an exception was thrown. [last-read-record-position: {}, last-replayed-event-position: {}]", this.lastReadRecordPosition, this.lastReplayedEventPosition, e);
        }
    }

    /** @return the highest source event position recorded after the last completed batch */
    public long getLastSourceEventPosition() {
        return this.lastSourceEventPosition;
    }

    /** @return the position of the last event that was applied to the state, or {@code -1} if none */
    public long getLastReplayedEventPosition() {
        return this.lastReplayedEventPosition;
    }

    /** Removes this instance from the log stream's record-available listeners. */
    public void close() {
        this.logStream.removeRecordAvailableListener(this);
    }

    /** Replay progress as observed by {@link #onRecordAvailable()}. */
    private static enum State {
        /** No batch was available on the last check; waiting for a record-available notification. */
        AWAIT_RECORD,
        /** A batch was found and its replay has been started. */
        REPLAY_EVENT;

    }
}

