/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.broker.backup;

import io.atomix.cluster.MemberId;
import io.atomix.primitive.partition.PartitionId;
import io.atomix.primitive.partition.PartitionMetadata;
import io.atomix.raft.impl.LogCompactor;
import io.atomix.raft.metrics.RaftServiceMetrics;
import io.atomix.raft.partition.RaftPartition;
import io.atomix.raft.storage.log.RaftLog;
import io.atomix.utils.concurrent.ThreadContext;
import io.camunda.zeebe.backup.api.BackupStore;
import io.camunda.zeebe.backup.management.BackupService;
import io.camunda.zeebe.broker.backup.InMemoryMockBackupStore;
import io.camunda.zeebe.broker.utils.InlineThreadContext;
import io.camunda.zeebe.journal.JournalMetaStore;
import io.camunda.zeebe.journal.file.SegmentedJournal;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ActorScheduler;
import io.camunda.zeebe.scheduler.SchedulingHints;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.PersistedSnapshotStore;
import io.camunda.zeebe.snapshots.TransientSnapshot;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotStore;
import io.camunda.zeebe.test.DynamicAutoCloseable;
import io.camunda.zeebe.test.util.junit.AutoCloseResources;
import io.camunda.zeebe.util.FileUtil;
import io.camunda.zeebe.util.buffer.BufferWriter;
import io.camunda.zeebe.util.buffer.DirectBufferWriter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ExtendWith(value={MockitoExtension.class})
public class ConcurrentBackupCompactionTest
extends DynamicAutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(ConcurrentBackupCompactionTest.class);
    private static final String SNAPSHOT_FILE_NAME = "file1";
    @TempDir
    Path dataDirectory;
    @AutoCloseResources.AutoCloseResource
    MeterRegistry meterRegistry = new SimpleMeterRegistry();
    private ActorScheduler actorScheduler;
    private SegmentedJournal journal;
    private FileBasedSnapshotStore snapshotStore;
    private InMemoryMockBackupStore backupStore;
    private BackupService backupService;
    private final int nodeId = 1;
    private final int partitionId = 1;
    private LogCompactor logCompactor;
    private final ThreadContext threadContext = new InlineThreadContext();
    @Mock
    private RaftLog raftLog;
    private final RaftServiceMetrics raftMetrics = new RaftServiceMetrics("1", this.meterRegistry);

    @BeforeEach
    void setUp() {
        this.actorScheduler = (ActorScheduler)this.manage((AutoCloseable)ActorScheduler.newActorScheduler().build());
        this.actorScheduler.start();
        this.backupStore = (InMemoryMockBackupStore)this.manage(new InMemoryMockBackupStore());
        this.snapshotStore = (FileBasedSnapshotStore)this.manage((AutoCloseable)new FileBasedSnapshotStore(1, this.dataDirectory, snapshotPath -> Map.of(), this.meterRegistry));
        this.actorScheduler.submitActor((Actor)this.snapshotStore, SchedulingHints.IO_BOUND);
        PartitionMetadata partitionMetadata = new PartitionMetadata(PartitionId.from((String)"raft", (int)1), Set.of(), Map.of(), 1, new MemberId("1"));
        RaftPartition raftPartition = new RaftPartition(partitionMetadata, null, this.dataDirectory.toFile(), this.meterRegistry);
        this.journal = (SegmentedJournal)this.manage((AutoCloseable)SegmentedJournal.builder((MeterRegistry)this.meterRegistry).withDirectory(this.dataDirectory.toFile()).withName(raftPartition.name()).withMetaStore((JournalMetaStore)Mockito.mock(JournalMetaStore.class)).withMaxSegmentSize(128).build());
        this.logCompactor = new LogCompactor(this.threadContext, this.raftLog, 3, this.raftMetrics, LOG);
        Mockito.when((Object)this.raftLog.deleteUntil(ArgumentMatchers.anyLong())).thenAnswer(invocation -> {
            Long index = (Long)invocation.getArgument(0);
            return this.journal.deleteUntil(index.longValue());
        });
        this.backupService = (BackupService)this.manage((AutoCloseable)new BackupService(1, 1, 1, (BackupStore)this.backupStore, (PersistedSnapshotStore)this.snapshotStore, this.dataDirectory, index -> CompletableFuture.completedFuture(this.journal.getTailSegments(index).values()), this.meterRegistry));
        this.actorScheduler.submitActor((Actor)this.backupService);
    }

    @AfterEach
    public void tearDown() {
        this.close();
    }

    @Test
    public void shouldNotFailWhenCompactionTriggers() throws Exception {
        this.appendRecord(1L, "1");
        this.appendRecord(2L, "2");
        PersistedSnapshot snapshot = this.takeSnapshot(2L, 2L);
        long backupIdx = 3L;
        this.logCompactor.compactFromSnapshots((PersistedSnapshotStore)this.snapshotStore);
        Awaitility.await((String)"compaction is done").until(() -> this.logCompactor.getCompactableIndex() == snapshot.getIndex());
        this.appendRecord(3L, "3");
        ActorFuture backupResultFut = this.backupService.takeBackup(3L, 3L);
        Awaitility.await((String)"snapshot is reserved").until(() -> this.snapshotStore.getLatestSnapshot().map(PersistedSnapshot::isReserved).orElse(false));
        this.appendRecord(4L, "4");
        this.appendRecord(5L, "5");
        this.takeSnapshot(4L, 5L);
        this.logCompactor.compactFromSnapshots((PersistedSnapshotStore)this.snapshotStore);
        Awaitility.await((String)"no compaction is done").until(() -> this.logCompactor.getCompactableIndex() == snapshot.getIndex());
        try (Stream<Path> files = Files.list(this.dataDirectory.resolve("snapshots"));){
            Assertions.assertThat((int)files.toList().size()).isEqualTo(4L);
        }
        SortedMap activeSegmentPaths = this.journal.getTailSegments(3L);
        activeSegmentPaths.values().forEach(p -> Assertions.assertThat((Path)p).exists());
        Assertions.assertThat((boolean)backupResultFut.isDone()).isFalse();
        Awaitility.await((String)"BackStore.save is called").atMost(Duration.ofSeconds(5L)).until(() -> !this.backupStore.backupInProgress().isEmpty());
        this.backupStore.completeSaveFutures();
        Awaitility.await((String)"backup is completed successfully").atMost(Duration.ofSeconds(5L)).until(() -> backupResultFut.isDone());
    }

    private void appendRecord(long asqn, String data) {
        this.journal.append(asqn, (BufferWriter)new DirectBufferWriter().wrap((DirectBuffer)new UnsafeBuffer(data.getBytes())));
    }

    private PersistedSnapshot takeSnapshot(long index, long lastWrittenPosition) {
        TransientSnapshot transientSnapshot = (TransientSnapshot)this.snapshotStore.newTransientSnapshot(index, 1L, 1L, 1L).get();
        transientSnapshot.take(path -> {
            try {
                FileUtil.ensureDirectoryExists((Path)path);
                Files.write(path.resolve(SNAPSHOT_FILE_NAME), "This is the content".getBytes(), StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
            }
            catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        return (PersistedSnapshot)transientSnapshot.withLastFollowupEventPosition(lastWrittenPosition).persist().join();
    }
}

