/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.broker.system.partitions;

import io.atomix.raft.storage.log.entry.RaftEntry;
import io.atomix.raft.storage.log.entry.SerializedApplicationEntry;
import io.camunda.zeebe.broker.system.partitions.StateController;
import io.camunda.zeebe.broker.system.partitions.TestIndexedRaftLogEntry;
import io.camunda.zeebe.broker.system.partitions.impl.AsyncSnapshotDirector;
import io.camunda.zeebe.broker.system.partitions.impl.StateControllerImpl;
import io.camunda.zeebe.engine.state.DefaultZeebeDbFactory;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.scheduler.testing.ActorSchedulerRule;
import io.camunda.zeebe.scheduler.testing.TestConcurrencyControl;
import io.camunda.zeebe.snapshots.ConstructableSnapshotStore;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotStore;
import io.camunda.zeebe.stream.impl.StreamProcessor;
import io.camunda.zeebe.test.util.AutoCloseableRule;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.assertj.core.api.InstanceOfAssertFactory;
import org.assertj.core.api.ObjectAssert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestRule;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public final class AsyncSnapshottingTest {
    private final TemporaryFolder tempFolderRule = new TemporaryFolder();
    private final AutoCloseableRule autoCloseableRule = new AutoCloseableRule();
    private final ActorSchedulerRule actorSchedulerRule = new ActorSchedulerRule();
    @Rule
    public final RuleChain chain = RuleChain.outerRule((TestRule)this.autoCloseableRule).around((TestRule)this.tempFolderRule).around((TestRule)this.actorSchedulerRule);
    private StateControllerImpl snapshotController;
    private AsyncSnapshotDirector asyncSnapshotDirector;
    private StreamProcessor mockStreamProcessor;
    private FileBasedSnapshotStore persistedSnapshotStore;

    @Before
    public void setup() throws IOException {
        Path rootDirectory = this.tempFolderRule.getRoot().toPath();
        boolean partitionId = true;
        this.persistedSnapshotStore = new FileBasedSnapshotStore(1, rootDirectory, snapshotPath -> Map.of(), (MeterRegistry)new SimpleMeterRegistry());
        this.actorSchedulerRule.submitActor((Actor)this.persistedSnapshotStore).join();
        this.snapshotController = new StateControllerImpl(DefaultZeebeDbFactory.defaultFactory(), (ConstructableSnapshotStore)this.persistedSnapshotStore, rootDirectory.resolve("runtime"), l -> Optional.of(new TestIndexedRaftLogEntry(l + 100L, 1L, (RaftEntry)new SerializedApplicationEntry(1L, 10L, (DirectBuffer)new UnsafeBuffer()))), db -> Long.MAX_VALUE, (ConcurrencyControl)new TestConcurrencyControl());
        this.snapshotController.recover().join();
        this.autoCloseableRule.manage((AutoCloseable)this.snapshotController);
        this.snapshotController = (StateControllerImpl)Mockito.spy((Object)this.snapshotController);
        this.createStreamProcessorControllerMock();
    }

    private void setCommitPosition(long commitPosition) {
        this.asyncSnapshotDirector.newPositionCommitted(commitPosition);
    }

    private void createStreamProcessorControllerMock() {
        this.mockStreamProcessor = (StreamProcessor)Mockito.mock(StreamProcessor.class);
        Mockito.when((Object)this.mockStreamProcessor.getLastProcessedPositionAsync()).thenReturn((Object)CompletableActorFuture.completed((Object)0L)).thenReturn((Object)CompletableActorFuture.completed((Object)25L)).thenReturn((Object)CompletableActorFuture.completed((Object)32L));
        Mockito.when((Object)this.mockStreamProcessor.getLastWrittenPositionAsync()).thenReturn((Object)CompletableActorFuture.completed((Object)99L)).thenReturn((Object)CompletableActorFuture.completed((Object)100L));
    }

    private void createAsyncSnapshotDirectorOfProcessingMode() {
        this.asyncSnapshotDirector = AsyncSnapshotDirector.ofProcessingMode((int)0, (int)1, (StreamProcessor)this.mockStreamProcessor, (StateController)this.snapshotController, (Duration)Duration.ofMinutes(1L), () -> CompletableFuture.completedFuture(null));
        this.actorSchedulerRule.submitActor((Actor)this.asyncSnapshotDirector).join();
    }

    private void createAsyncSnapshotDirectorOfReplayMode() {
        this.asyncSnapshotDirector = AsyncSnapshotDirector.ofReplayMode((int)0, (int)1, (StreamProcessor)this.mockStreamProcessor, (StateController)this.snapshotController, (Duration)Duration.ofMinutes(1L), () -> CompletableFuture.completedFuture(null));
        this.actorSchedulerRule.submitActor((Actor)this.asyncSnapshotDirector).join();
    }

    @Test
    public void shouldValidSnapshotWhenCommitPositionGreaterEquals() {
        this.createAsyncSnapshotDirectorOfProcessingMode();
        CompletableActorFuture snapshot = this.asyncSnapshotDirector.forceSnapshot();
        this.setCommitPosition(100L);
        Assertions.assertThat((Object)((PersistedSnapshot)snapshot.join())).isNotNull();
        Assertions.assertThat((Optional)this.persistedSnapshotStore.getLatestSnapshot()).hasValue((Object)((PersistedSnapshot)snapshot.join()));
    }

    @Test
    public void shouldTakeSnapshotsOneByOne() {
        this.createAsyncSnapshotDirectorOfProcessingMode();
        CompletableActorFuture firstSnapshot = this.asyncSnapshotDirector.forceSnapshot();
        this.setCommitPosition(99L);
        Assertions.assertThat((Object)((PersistedSnapshot)firstSnapshot.join())).isNotNull();
        long firstSnapshotIndex = ((PersistedSnapshot)firstSnapshot.join()).getIndex();
        CompletableActorFuture secondSnapshot = this.asyncSnapshotDirector.forceSnapshot();
        this.setCommitPosition(100L);
        ((AbstractLongAssert)((ObjectAssert)((ObjectAssert)((ObjectAssert)Assertions.assertThat((Object)((PersistedSnapshot)secondSnapshot.join())).describedAs("Second snapshot is taken", new Object[0])).isNotNull()).describedAs("Second snapshot has a higher index", new Object[0])).extracting(PersistedSnapshot::getIndex, Assertions.as((InstanceOfAssertFactory)InstanceOfAssertFactories.LONG))).isGreaterThan(firstSnapshotIndex);
        Assertions.assertThat((Optional)this.persistedSnapshotStore.getLatestSnapshot()).hasValue((Object)((PersistedSnapshot)secondSnapshot.join()));
    }

    @Test
    public void shouldSucceedToTakeSnapshotOnNextIntervalWhenLastWritePosRetrievingFailed() {
        this.createAsyncSnapshotDirectorOfProcessingMode();
        long lastProcessedPosition = 25L;
        long lastWrittenPosition = 26L;
        long commitPosition = 100L;
        Mockito.when((Object)this.mockStreamProcessor.getLastProcessedPositionAsync()).thenReturn((Object)CompletableActorFuture.completed((Object)25L));
        RuntimeException initialFailure = new RuntimeException("getLastWrittenPositionAsync fails");
        Mockito.when((Object)this.mockStreamProcessor.getLastWrittenPositionAsync()).thenReturn((Object)CompletableActorFuture.completedExceptionally((Throwable)initialFailure));
        this.setCommitPosition(100L);
        Assertions.assertThatThrownBy(() -> this.asyncSnapshotDirector.forceSnapshot().join()).hasCause((Throwable)initialFailure);
        ((StreamProcessor)Mockito.verify((Object)this.mockStreamProcessor, (VerificationMode)Mockito.timeout((long)10000L).times(1))).getLastWrittenPositionAsync();
        Mockito.when((Object)this.mockStreamProcessor.getLastWrittenPositionAsync()).thenReturn((Object)CompletableActorFuture.completed((Object)26L));
        Assertions.assertThat((Object)((PersistedSnapshot)this.asyncSnapshotDirector.forceSnapshot().join())).isNotNull();
        Assertions.assertThat((Optional)this.persistedSnapshotStore.getLatestSnapshot()).isPresent();
        ((StreamProcessor)Mockito.verify((Object)this.mockStreamProcessor, (VerificationMode)Mockito.timeout((long)10000L).times(2))).getLastWrittenPositionAsync();
    }

    @Test
    public void shouldSucceedToTakeSnapshotOnNextIntervalWhenLastProcessedPosRetrievingFailed() {
        this.createAsyncSnapshotDirectorOfProcessingMode();
        long lastProcessedPosition = 25L;
        long lastWrittenPosition = 26L;
        long commitPosition = 100L;
        RuntimeException initialFailure = new RuntimeException("getLastProcessedPositionAsync fails");
        Mockito.when((Object)this.mockStreamProcessor.getLastProcessedPositionAsync()).thenReturn((Object)CompletableActorFuture.completedExceptionally((Throwable)initialFailure));
        Mockito.when((Object)this.mockStreamProcessor.getLastWrittenPositionAsync()).thenReturn((Object)CompletableActorFuture.completed((Object)26L));
        Assertions.assertThatThrownBy(() -> this.asyncSnapshotDirector.forceSnapshot().join()).hasCause((Throwable)initialFailure);
        ((StreamProcessor)Mockito.verify((Object)this.mockStreamProcessor, (VerificationMode)Mockito.timeout((long)5000L).times(1))).getLastProcessedPositionAsync();
        Mockito.when((Object)this.mockStreamProcessor.getLastProcessedPositionAsync()).thenReturn((Object)CompletableActorFuture.completed((Object)25L));
        CompletableActorFuture secondSnapshot = this.asyncSnapshotDirector.forceSnapshot();
        this.setCommitPosition(100L);
        Assertions.assertThat((Object)((PersistedSnapshot)secondSnapshot.join())).isNotNull();
        Assertions.assertThat((Optional)this.persistedSnapshotStore.getLatestSnapshot()).hasValue((Object)((PersistedSnapshot)secondSnapshot.join()));
    }

    @Test
    public void shouldPersistSnapshotWithoutWaitingForCommitWhenInReplayMode() {
        this.createAsyncSnapshotDirectorOfReplayMode();
        CompletableActorFuture snapshot = this.asyncSnapshotDirector.forceSnapshot();
        Assertions.assertThat((Object)((PersistedSnapshot)snapshot.join())).isNotNull();
        Assertions.assertThat((Optional)this.persistedSnapshotStore.getLatestSnapshot()).hasValue((Object)((PersistedSnapshot)snapshot.join()));
    }

    @Test
    public void shouldNotCommitSnapshotIfFlushFailedInProcessingMode() {
        CompletableFuture flushFuture = new CompletableFuture();
        this.asyncSnapshotDirector = AsyncSnapshotDirector.ofProcessingMode((int)0, (int)1, (StreamProcessor)this.mockStreamProcessor, (StateController)this.snapshotController, (Duration)Duration.ofMinutes(1L), () -> flushFuture);
        this.actorSchedulerRule.submitActor((Actor)this.asyncSnapshotDirector).join();
        this.setCommitPosition(100L);
        CompletableActorFuture result = this.asyncSnapshotDirector.forceSnapshot();
        flushFuture.completeExceptionally(new RuntimeException("Flush failed"));
        Assertions.assertThat((Future)result).failsWithin(Duration.ofMillis(1000L)).withThrowableOfType(ExecutionException.class).withMessageContaining("Flush failed");
    }

    @Test
    public void shouldNotCommitSnapshotIfFlushFailedInReplayMode() {
        CompletableFuture flushFuture = new CompletableFuture();
        this.asyncSnapshotDirector = AsyncSnapshotDirector.ofReplayMode((int)0, (int)1, (StreamProcessor)this.mockStreamProcessor, (StateController)this.snapshotController, (Duration)Duration.ofMinutes(1L), () -> flushFuture);
        this.actorSchedulerRule.submitActor((Actor)this.asyncSnapshotDirector).join();
        CompletableActorFuture result = this.asyncSnapshotDirector.forceSnapshot();
        flushFuture.completeExceptionally(new RuntimeException("Flush failed"));
        Assertions.assertThat((Future)result).failsWithin(Duration.ofMillis(1000L)).withThrowableOfType(ExecutionException.class).withMessageContaining("Flush failed");
    }
}

