/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis.server.raftlog.segmented;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.ratis.BaseTest;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.metrics.impl.DefaultTimekeeperImpl;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RetryCache;
import org.apache.ratis.server.impl.RetryCacheTestUtil;
import org.apache.ratis.server.metrics.RaftLogMetricsBase;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogEntryHeader;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.segmented.LogSegmentStartEnd;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogOutputStream;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogWorker;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.server.storage.RaftStorageTestUtils;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.DataBlockingQueue;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.Slf4jUtils;
import org.apache.ratis.util.TimeDuration;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.event.Level;

public class TestSegmentedRaftLog
extends BaseTest {
    private static final RaftPeerId PEER_ID;
    private static final RaftGroupId GROUP_ID;
    private static final RaftGroupMemberId MEMBER_ID;
    private File storageDir;
    private RaftProperties properties;
    private RaftStorage storage;
    private long segmentMaxSize;
    private long preallocatedSize;
    private int bufferSize;

    public static Stream<Arguments> data() {
        return Stream.of(Arguments.arguments((Object[])new Object[]{Boolean.FALSE, Boolean.FALSE}), Arguments.arguments((Object[])new Object[]{Boolean.FALSE, Boolean.TRUE}), Arguments.arguments((Object[])new Object[]{Boolean.TRUE, Boolean.FALSE}), Arguments.arguments((Object[])new Object[]{Boolean.TRUE, Boolean.TRUE}));
    }

    public static long getOpenSegmentSize(RaftLog raftLog) {
        return ((SegmentedRaftLog)raftLog).getRaftLogCache().getOpenSegment().getTotalFileSize();
    }

    SegmentedRaftLog newSegmentedRaftLog() {
        return TestSegmentedRaftLog.newSegmentedRaftLog(this.storage, this.properties);
    }

    SegmentedRaftLog newSegmentedRaftLog(LongSupplier getSnapshotIndexFromStateMachine) {
        return this.newSegmentedRaftLogWithSnapshotIndex(this.storage, this.properties, getSnapshotIndexFromStateMachine);
    }

    static SegmentedRaftLog newSegmentedRaftLog(RaftStorage storage, RaftProperties properties) {
        return SegmentedRaftLog.newBuilder().setMemberId(MEMBER_ID).setStorage(storage).setProperties(properties).build();
    }

    private SegmentedRaftLog newSegmentedRaftLogWithSnapshotIndex(RaftStorage storage, RaftProperties properties, LongSupplier getSnapshotIndexFromStateMachine) {
        return SegmentedRaftLog.newBuilder().setMemberId(MEMBER_ID).setStorage(storage).setSnapshotIndexSupplier(getSnapshotIndexFromStateMachine).setProperties(properties).build();
    }

    @BeforeEach
    public void setup() throws Exception {
        this.storageDir = this.getTestDir();
        this.properties = new RaftProperties();
        RaftServerConfigKeys.setStorageDir((RaftProperties)this.properties, Collections.singletonList(this.storageDir));
        this.storage = RaftStorageTestUtils.newRaftStorage((File)this.storageDir);
        this.segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax((RaftProperties)this.properties).getSize();
        this.preallocatedSize = RaftServerConfigKeys.Log.preallocatedSize((RaftProperties)this.properties).getSize();
        this.bufferSize = RaftServerConfigKeys.Log.writeBufferSize((RaftProperties)this.properties).getSizeInt();
    }

    @AfterEach
    public void tearDown() throws Exception {
        if (this.storageDir != null) {
            FileUtils.deleteFully((File)this.storageDir.getParentFile());
        }
    }

    private RaftProtos.LogEntryProto[] prepareLog(List<SegmentRange> list) throws IOException {
        ArrayList entryList = new ArrayList();
        for (SegmentRange range : list) {
            File file = range.getFile(this.storage);
            int size = (int)(range.end - range.start + 1L);
            RaftProtos.LogEntryProto[] entries = new RaftProtos.LogEntryProto[size];
            try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(file, false, this.segmentMaxSize, this.preallocatedSize, ByteBuffer.allocateDirect(this.bufferSize));){
                for (int i = 0; i < size; ++i) {
                    RaftTestUtil.SimpleOperation m = new RaftTestUtil.SimpleOperation("m" + ((long)i + range.start));
                    entries[i] = LogProtoUtils.toLogEntryProto((RaftProtos.StateMachineLogEntryProto)m.getLogEntryContent(), (long)range.term, (long)((long)i + range.start));
                    out.write(entries[i]);
                }
            }
            Collections.addAll(entryList, entries);
        }
        return entryList.toArray(new RaftProtos.LogEntryProto[entryList.size()]);
    }

    static List<SegmentRange> prepareRanges(int startTerm, int endTerm, int segmentSize, long startIndex) {
        ArrayList<SegmentRange> list = new ArrayList<SegmentRange>(endTerm - startTerm);
        for (int i = startTerm; i < endTerm; ++i) {
            list.add(new SegmentRange(startIndex, startIndex + (long)segmentSize - 1L, i, i == endTerm - 1));
            startIndex += (long)segmentSize;
        }
        return list;
    }

    private RaftProtos.LogEntryProto getLastEntry(SegmentedRaftLog raftLog) throws IOException {
        return raftLog.get(raftLog.getLastEntryTermIndex().getIndex());
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void testLoadLogSegments(Boolean useAsyncFlush, Boolean smSyncFlush) throws Exception {
        RaftServerConfigKeys.Log.setAsyncFlushEnabled((RaftProperties)this.properties, (boolean)useAsyncFlush);
        RaftServerConfigKeys.Log.StateMachineData.setSync((RaftProperties)this.properties, (boolean)smSyncFlush);
        List<SegmentRange> ranges = TestSegmentedRaftLog.prepareRanges(0, 5, 100, 0L);
        Object[] entries = this.prepareLog(ranges);
        try (SegmentedRaftLog raftLog = this.newSegmentedRaftLog();){
            raftLog.open(-1L, null);
            for (RaftProtos.LogEntryProto logEntryProto : entries) {
                RaftProtos.LogEntryProto entry = raftLog.get(logEntryProto.getIndex());
                Assertions.assertEquals((Object)logEntryProto, (Object)entry);
            }
            LogEntryHeader[] termIndices = raftLog.getEntries(0L, 500L);
            Object[] entriesFromLog = (RaftProtos.LogEntryProto[])Arrays.stream(termIndices).map(ti -> {
                try {
                    return raftLog.get(ti.getIndex());
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }).toArray(RaftProtos.LogEntryProto[]::new);
            Assertions.assertArrayEquals((Object[])entries, (Object[])entriesFromLog);
            Assertions.assertEquals((Object)entries[entries.length - 1], (Object)this.getLastEntry(raftLog));
            RatisMetricRegistry metricRegistryForLogWorker = RaftLogMetricsBase.createRegistry((RaftGroupMemberId)MEMBER_ID);
            DefaultTimekeeperImpl defaultTimekeeperImpl = (DefaultTimekeeperImpl)metricRegistryForLogWorker.timer("segmentLoadLatency");
            Assertions.assertTrue((defaultTimekeeperImpl.getTimer().getMeanRate() > 0.0 ? 1 : 0) != 0);
            DefaultTimekeeperImpl read = (DefaultTimekeeperImpl)metricRegistryForLogWorker.timer("readEntryLatency");
            Assertions.assertTrue((read.getTimer().getMeanRate() > 0.0 ? 1 : 0) != 0);
        }
    }

    static List<RaftProtos.LogEntryProto> prepareLogEntries(List<SegmentRange> slist, Supplier<String> stringSupplier) {
        ArrayList<RaftProtos.LogEntryProto> eList = new ArrayList<RaftProtos.LogEntryProto>();
        for (SegmentRange range : slist) {
            TestSegmentedRaftLog.prepareLogEntries(range, stringSupplier, false, eList);
        }
        return eList;
    }

    static List<RaftProtos.LogEntryProto> prepareLogEntries(SegmentRange range, Supplier<String> stringSupplier, boolean hasStataMachineData, List<RaftProtos.LogEntryProto> eList) {
        for (long index = range.start; index <= range.end; ++index) {
            eList.add(TestSegmentedRaftLog.prepareLogEntry(range.term, index, stringSupplier, hasStataMachineData));
        }
        return eList;
    }

    static RaftProtos.LogEntryProto prepareLogEntry(long term, long index, Supplier<String> stringSupplier, boolean hasStataMachineData) {
        RaftTestUtil.SimpleOperation m = stringSupplier == null ? new RaftTestUtil.SimpleOperation("m" + index, hasStataMachineData) : new RaftTestUtil.SimpleOperation(stringSupplier.get(), hasStataMachineData);
        return LogProtoUtils.toLogEntryProto((RaftProtos.StateMachineLogEntryProto)m.getLogEntryContent(), (long)term, (long)index);
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void testAppendEntry(Boolean useAsyncFlush, Boolean smSyncFlush) throws Exception {
        RaftServerConfigKeys.Log.setAsyncFlushEnabled((RaftProperties)this.properties, (boolean)useAsyncFlush);
        RaftServerConfigKeys.Log.StateMachineData.setSync((RaftProperties)this.properties, (boolean)smSyncFlush);
        List<SegmentRange> ranges = TestSegmentedRaftLog.prepareRanges(0, 5, 200, 0L);
        List<RaftProtos.LogEntryProto> entries = TestSegmentedRaftLog.prepareLogEntries(ranges, null);
        try (SegmentedRaftLog raftLog = this.newSegmentedRaftLog();){
            raftLog.open(-1L, null);
            entries.stream().map(arg_0 -> ((SegmentedRaftLog)raftLog).appendEntry(arg_0)).forEach(CompletableFuture::join);
        }
        raftLog = this.newSegmentedRaftLog();
        var6_6 = null;
        try {
            raftLog.open(-1L, null);
            this.checkEntries((RaftLog)raftLog, entries, 0, entries.size());
        }
        catch (Throwable throwable) {
            var6_6 = throwable;
            throw throwable;
        }
        finally {
            if (raftLog != null) {
                if (var6_6 != null) {
                    try {
                        raftLog.close();
                    }
                    catch (Throwable throwable) {
                        var6_6.addSuppressed(throwable);
                    }
                } else {
                    raftLog.close();
                }
            }
        }
        raftLog = this.newSegmentedRaftLog();
        var6_6 = null;
        try {
            raftLog.open(-1L, null);
            TermIndex lastTermIndex = raftLog.getLastEntryTermIndex();
            IllegalStateException ex = null;
            try {
                raftLog.appendEntry(RaftProtos.LogEntryProto.newBuilder((RaftProtos.LogEntryProto)entries.get(0)).setTerm(lastTermIndex.getTerm() - 1L).setIndex(lastTermIndex.getIndex() + 1L).build());
            }
            catch (IllegalStateException e) {
                ex = e;
            }
            Assertions.assertTrue((boolean)ex.getMessage().contains("term less than RaftLog's last term"));
            try {
                raftLog.appendEntry(RaftProtos.LogEntryProto.newBuilder((RaftProtos.LogEntryProto)entries.get(0)).setTerm(lastTermIndex.getTerm()).setIndex(lastTermIndex.getIndex() + 2L).build());
            }
            catch (IllegalStateException e) {
                ex = e;
            }
            Assertions.assertTrue((boolean)ex.getMessage().contains("and RaftLog's last index " + lastTermIndex.getIndex() + " (or snapshot index " + raftLog.getSnapshotIndex() + ") is greater than 1"));
            raftLog.onSnapshotInstalled(raftLog.getLastEntryTermIndex().getIndex());
            try {
                raftLog.appendEntry(RaftProtos.LogEntryProto.newBuilder((RaftProtos.LogEntryProto)entries.get(0)).setTerm(lastTermIndex.getTerm()).setIndex(lastTermIndex.getIndex() + 2L).build());
            }
            catch (IllegalStateException e) {
                ex = e;
            }
            Assertions.assertTrue((boolean)ex.getMessage().contains("Difference between entry index and RaftLog's latest snapshot index 999 is greater than 1"));
        }
        catch (Throwable throwable) {
            var6_6 = throwable;
            throw throwable;
        }
        finally {
            if (raftLog != null) {
                if (var6_6 != null) {
                    try {
                        raftLog.close();
                    }
                    catch (Throwable throwable) {
                        var6_6.addSuppressed(throwable);
                    }
                } else {
                    raftLog.close();
                }
            }
        }
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void testAppendEntryAfterPurge(Boolean useAsyncFlush, Boolean smSyncFlush) throws Exception {
        RaftServerConfigKeys.Log.setAsyncFlushEnabled((RaftProperties)this.properties, (boolean)useAsyncFlush);
        RaftServerConfigKeys.Log.StateMachineData.setSync((RaftProperties)this.properties, (boolean)smSyncFlush);
        List<SegmentRange> ranges = TestSegmentedRaftLog.prepareRanges(0, 5, 200, 0L);
        List<RaftProtos.LogEntryProto> entries = TestSegmentedRaftLog.prepareLogEntries(ranges, null);
        final long desiredSnapshotIndex = entries.size() - 2;
        LongSupplier getSnapshotIndexFromStateMachine = new LongSupplier(){
            private boolean firstCall = true;

            @Override
            public long getAsLong() {
                long index = this.firstCall ? -1L : desiredSnapshotIndex;
                this.firstCall = !this.firstCall;
                return index;
            }
        };
        try (SegmentedRaftLog raftLog = this.newSegmentedRaftLog(getSnapshotIndexFromStateMachine);){
            raftLog.open(-1L, null);
            entries.subList(0, entries.size() - 1).stream().map(arg_0 -> ((SegmentedRaftLog)raftLog).appendEntry(arg_0)).forEach(CompletableFuture::join);
            raftLog.onSnapshotInstalled(desiredSnapshotIndex);
            CompletableFuture appendEntryFuture = raftLog.appendEntry(entries.get(entries.size() - 1));
            Assertions.assertTrue((desiredSnapshotIndex + 1L == (Long)appendEntryFuture.get() ? 1 : 0) != 0);
        }
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void testAppendAndRoll(Boolean useAsyncFlush, Boolean smSyncFlush) throws Exception {
        RaftServerConfigKeys.Log.setAsyncFlushEnabled((RaftProperties)this.properties, (boolean)useAsyncFlush);
        RaftServerConfigKeys.Log.StateMachineData.setSync((RaftProperties)this.properties, (boolean)smSyncFlush);
        RaftServerConfigKeys.Log.setPreallocatedSize((RaftProperties)this.properties, (SizeInBytes)SizeInBytes.valueOf((String)"16KB"));
        RaftServerConfigKeys.Log.setSegmentSizeMax((RaftProperties)this.properties, (SizeInBytes)SizeInBytes.valueOf((String)"128KB"));
        List<SegmentRange> ranges = TestSegmentedRaftLog.prepareRanges(0, 1, 1024, 0L);
        byte[] content = new byte[1024];
        List<RaftProtos.LogEntryProto> entries = TestSegmentedRaftLog.prepareLogEntries(ranges, () -> new String(content));
        try (SegmentedRaftLog raftLog = this.newSegmentedRaftLog();){
            raftLog.open(-1L, null);
            entries.stream().map(arg_0 -> ((SegmentedRaftLog)raftLog).appendEntry(arg_0)).forEach(CompletableFuture::join);
        }
        raftLog = this.newSegmentedRaftLog();
        var7_7 = null;
        try {
            raftLog.open(-1L, null);
            this.checkEntries((RaftLog)raftLog, entries, 0, entries.size());
            Assertions.assertEquals((int)9, (int)raftLog.getRaftLogCache().getNumOfSegments());
        }
        catch (Throwable throwable) {
            var7_7 = throwable;
            throw throwable;
        }
        finally {
            if (raftLog != null) {
                if (var7_7 != null) {
                    try {
                        raftLog.close();
                    }
                    catch (Throwable throwable) {
                        var7_7.addSuppressed(throwable);
                    }
                } else {
                    raftLog.close();
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @ParameterizedTest
    @MethodSource(value={"data"})
    public void testPurgeAfterAppendEntry(Boolean useAsyncFlush, Boolean smSyncFlush) throws Exception {
        RaftServerConfigKeys.Log.setAsyncFlushEnabled((RaftProperties)this.properties, (boolean)useAsyncFlush);
        RaftServerConfigKeys.Log.StateMachineData.setSync((RaftProperties)this.properties, (boolean)smSyncFlush);
        RaftServerConfigKeys.Log.setPurgeGap((RaftProperties)this.properties, (int)1);
        RaftServerConfigKeys.Log.setForceSyncNum((RaftProperties)this.properties, (int)128);
        int startTerm = 0;
        int endTerm = 2;
        int segmentSize = 10;
        long endIndexOfClosedSegment = segmentSize * (endTerm - startTerm - 1);
        long nextStartIndex = segmentSize * (endTerm - startTerm);
        List<SegmentRange> ranges0 = TestSegmentedRaftLog.prepareRanges(startTerm, endTerm, segmentSize, 0L);
        List<RaftProtos.LogEntryProto> entries0 = TestSegmentedRaftLog.prepareLogEntries(ranges0, null);
        try (SegmentedRaftLog raftLog = this.newSegmentedRaftLog();){
            raftLog.open(-1L, null);
            entries0.stream().map(arg_0 -> ((SegmentedRaftLog)raftLog).appendEntry(arg_0)).forEach(CompletableFuture::join);
        }
        List<SegmentRange> ranges = TestSegmentedRaftLog.prepareRanges(endTerm - 1, endTerm, 1, nextStartIndex);
        List<RaftProtos.LogEntryProto> entries = TestSegmentedRaftLog.prepareLogEntries(ranges, null);
        try (SegmentedRaftLog raftLog = this.newSegmentedRaftLog();){
            CountDownLatch raftLogOpened = new CountDownLatch(1);
            CountDownLatch tasksAdded = new CountDownLatch(1);
            ConcurrentLinkedQueue appendFutures = new ConcurrentLinkedQueue();
            AtomicReference purgeFuture = new AtomicReference();
            AtomicInteger tasksCount = new AtomicInteger(0);
            CodeInjectionForTesting.put((String)SegmentedRaftLogWorker.RUN_WORKER, (localId, remoteId, args) -> {
                try {
                    if (!raftLogOpened.await(FIVE_SECONDS.getDuration(), FIVE_SECONDS.getUnit())) {
                        throw new TimeoutException();
                    }
                }
                catch (InterruptedException | TimeoutException e) {
                    this.LOG.error("an exception occurred", (Throwable)e);
                    throw new RuntimeException(e);
                }
                entries.stream().map(arg_0 -> ((SegmentedRaftLog)raftLog).appendEntry(arg_0)).forEach(appendFutures::add);
                purgeFuture.set(raftLog.purge(endIndexOfClosedSegment));
                tasksCount.set(((DataBlockingQueue)args[0]).getNumElements());
                tasksAdded.countDown();
                return true;
            });
            raftLog.open(-1L, null);
            raftLogOpened.countDown();
            if (!tasksAdded.await(FIVE_SECONDS.getDuration(), FIVE_SECONDS.getUnit())) {
                throw new TimeoutException();
            }
            Assertions.assertEquals((int)(entries.size() + 1), (int)tasksCount.get());
            Long purged = (Long)((CompletableFuture)purgeFuture.get()).get();
            this.LOG.info("purgeIndex = {}, purged = {}", (Object)endIndexOfClosedSegment, (Object)purged);
            Assertions.assertEquals((long)endIndexOfClosedSegment, (long)raftLog.getRaftLogCache().getStartIndex());
            JavaUtils.allOf(appendFutures).get(FIVE_SECONDS.getDuration(), FIVE_SECONDS.getUnit());
        }
        finally {
            CodeInjectionForTesting.put((String)SegmentedRaftLogWorker.RUN_WORKER, (localId, remoteId, args) -> false);
        }
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void testTruncate(Boolean useAsyncFlush, Boolean smSyncFlush) throws Exception {
        RaftServerConfigKeys.Log.setAsyncFlushEnabled((RaftProperties)this.properties, (boolean)useAsyncFlush);
        RaftServerConfigKeys.Log.StateMachineData.setSync((RaftProperties)this.properties, (boolean)smSyncFlush);
        List<SegmentRange> ranges = TestSegmentedRaftLog.prepareRanges(0, 5, 200, 0L);
        List<RaftProtos.LogEntryProto> entries = TestSegmentedRaftLog.prepareLogEntries(ranges, null);
        try (SegmentedRaftLog raftLog = this.newSegmentedRaftLog();){
            raftLog.open(-1L, null);
            entries.stream().map(arg_0 -> ((SegmentedRaftLog)raftLog).appendEntry(arg_0)).forEach(CompletableFuture::join);
        }
        for (long fromIndex = 900L; fromIndex >= 0L; fromIndex -= 150L) {
            this.testTruncate(entries, fromIndex);
        }
    }

    private void testTruncate(List<RaftProtos.LogEntryProto> entries, long fromIndex) throws Exception {
        try (SegmentedRaftLog raftLog = this.newSegmentedRaftLog();){
            raftLog.open(-1L, null);
            raftLog.truncate(fromIndex).join();
            this.checkEntries((RaftLog)raftLog, entries, 0, (int)fromIndex);
        }
        raftLog = this.newSegmentedRaftLog();
        var5_4 = null;
        try {
            raftLog.open(-1L, null);
            if (fromIndex > 0L) {
                Assertions.assertEquals((Object)entries.get((int)(fromIndex - 1L)), (Object)this.getLastEntry(raftLog));
            } else {
                Assertions.assertNull((Object)raftLog.getLastEntryTermIndex());
            }
            this.checkEntries((RaftLog)raftLog, entries, 0, (int)fromIndex);
        }
        catch (Throwable throwable) {
            var5_4 = throwable;
            throw throwable;
        }
        finally {
            if (raftLog != null) {
                if (var5_4 != null) {
                    try {
                        raftLog.close();
                    }
                    catch (Throwable throwable) {
                        var5_4.addSuppressed(throwable);
                    }
                } else {
                    raftLog.close();
                }
            }
        }
    }

    private void checkEntries(RaftLog raftLog, List<RaftProtos.LogEntryProto> expected, int offset, int size) throws IOException {
        if (size > 0) {
            for (int i = offset; i < size + offset; ++i) {
                RaftProtos.LogEntryProto entry = raftLog.get(expected.get(i).getIndex());
                Assertions.assertEquals((Object)expected.get(i), (Object)entry);
            }
            LogEntryHeader[] termIndices = raftLog.getEntries(expected.get(offset).getIndex(), expected.get(offset + size - 1).getIndex() + 1L);
            Object[] entriesFromLog = (RaftProtos.LogEntryProto[])Arrays.stream(termIndices).map(ti -> {
                try {
                    return raftLog.get(ti.getIndex());
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }).toArray(RaftProtos.LogEntryProto[]::new);
            Object[] expectedArray = (RaftProtos.LogEntryProto[])expected.subList(offset, offset + size).stream().toArray(RaftProtos.LogEntryProto[]::new);
            Assertions.assertArrayEquals((Object[])expectedArray, (Object[])entriesFromLog);
        }
    }

    private void checkFailedEntries(List<RaftProtos.LogEntryProto> entries, long fromIndex, RetryCache retryCache) {
        for (int i = 0; i < entries.size(); ++i) {
            if ((long)i < fromIndex) {
                RetryCacheTestUtil.assertFailure((RetryCache)retryCache, (RaftProtos.LogEntryProto)entries.get(i), (boolean)false);
                continue;
            }
            RetryCacheTestUtil.assertFailure((RetryCache)retryCache, (RaftProtos.LogEntryProto)entries.get(i), (boolean)true);
        }
    }

    @Test
    public void testPurgeOnOpenSegment() throws Exception {
        int startTerm = 0;
        int endTerm = 5;
        int segmentSize = 200;
        long beginIndexOfOpenSegment = segmentSize * (endTerm - startTerm - 1);
        long expectedIndex = segmentSize * (endTerm - startTerm - 1);
        long purgePreservation = 0L;
        this.purgeAndVerify(startTerm, endTerm, segmentSize, 1, beginIndexOfOpenSegment, expectedIndex);
    }

    @Test
    public void testPurgeOnClosedSegments() throws Exception {
        int startTerm = 0;
        int endTerm = 5;
        int segmentSize = 200;
        long endIndexOfClosedSegment = segmentSize * (endTerm - startTerm - 1) - 1;
        long expectedIndex = segmentSize * (endTerm - startTerm - 1);
        this.purgeAndVerify(startTerm, endTerm, segmentSize, 1, endIndexOfClosedSegment, expectedIndex);
    }

    @Test
    public void testPurgeLogMetric() throws Exception {
        int startTerm = 0;
        int endTerm = 5;
        int segmentSize = 200;
        long endIndexOfClosedSegment = segmentSize * (endTerm - startTerm - 1) - 1;
        long expectedIndex = segmentSize * (endTerm - startTerm - 1);
        RatisMetricRegistry metricRegistryForLogWorker = RaftLogMetricsBase.createRegistry((RaftGroupMemberId)MEMBER_ID);
        this.purgeAndVerify(startTerm, endTerm, segmentSize, 1, endIndexOfClosedSegment, expectedIndex);
        DefaultTimekeeperImpl purge = (DefaultTimekeeperImpl)metricRegistryForLogWorker.timer("purgeLog");
        Assertions.assertTrue((purge.getTimer().getCount() > 0L ? 1 : 0) != 0);
    }

    @Test
    public void testPurgeOnClosedSegmentsWithPurgeGap() throws Exception {
        int startTerm = 0;
        int endTerm = 5;
        int segmentSize = 200;
        long endIndexOfClosedSegment = segmentSize * (endTerm - startTerm - 1) - 1;
        long expectedIndex = 0L;
        this.purgeAndVerify(startTerm, endTerm, segmentSize, 1000, endIndexOfClosedSegment, expectedIndex);
    }

    @Test
    public void testPurgeWithLargePurgePreservationAndSmallPurgeGap() throws Exception {
        int startTerm = 0;
        int endTerm = 5;
        int segmentSize = 200;
        long endIndex = segmentSize * (endTerm - startTerm) - 1;
        long startIndex = 200L;
        long purgePreservation = segmentSize * (endTerm - startTerm) + 100;
        this.purgeAndVerify(startTerm, endTerm, segmentSize, 1, endIndex, startIndex, startIndex, purgePreservation);
    }

    private void purgeAndVerify(int startTerm, int endTerm, int segmentSize, int purgeGap, long purgeIndex, long expectedIndex) throws Exception {
        this.purgeAndVerify(startTerm, endTerm, segmentSize, purgeGap, purgeIndex, expectedIndex, 0L, 0L);
    }

    private void purgeAndVerify(int startTerm, int endTerm, int segmentSize, int purgeGap, long purgeIndex, long expectedIndex, long startIndex, long purgePreservation) throws Exception {
        List<SegmentRange> ranges = TestSegmentedRaftLog.prepareRanges(startTerm, endTerm, segmentSize, startIndex);
        List<RaftProtos.LogEntryProto> entries = TestSegmentedRaftLog.prepareLogEntries(ranges, null);
        RaftProperties p = new RaftProperties();
        RaftServerConfigKeys.Log.setPurgeGap((RaftProperties)p, (int)purgeGap);
        RaftServerConfigKeys.Log.setPurgePreservationLogNum((RaftProperties)p, (long)purgePreservation);
        try (SegmentedRaftLog raftLog = this.newSegmentedRaftLogWithSnapshotIndex(this.storage, p, () -> startIndex - 1L);){
            raftLog.open(startIndex - 1L, null);
            entries.stream().map(arg_0 -> ((SegmentedRaftLog)raftLog).appendEntry(arg_0)).forEach(CompletableFuture::join);
            CompletableFuture f = raftLog.purge(purgeIndex);
            Long purged = (Long)f.get();
            this.LOG.info("purgeIndex = {}, purged = {}", (Object)purgeIndex, (Object)purged);
            Assertions.assertEquals((long)expectedIndex, (long)raftLog.getRaftLogCache().getStartIndex());
        }
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void testAppendEntriesWithInconsistency(Boolean useAsyncFlush, Boolean smSyncFlush) throws Exception {
        RaftServerConfigKeys.Log.setAsyncFlushEnabled((RaftProperties)this.properties, (boolean)useAsyncFlush);
        RaftServerConfigKeys.Log.StateMachineData.setSync((RaftProperties)this.properties, (boolean)smSyncFlush);
        List<SegmentRange> ranges = TestSegmentedRaftLog.prepareRanges(0, 5, 200, 0L);
        List<RaftProtos.LogEntryProto> entries = TestSegmentedRaftLog.prepareLogEntries(ranges, null);
        RetryCache retryCache = RetryCacheTestUtil.createRetryCache();
        try (SegmentedRaftLog raftLog = RetryCacheTestUtil.newSegmentedRaftLog((RaftGroupMemberId)MEMBER_ID, (RetryCache)retryCache, (RaftStorage)this.storage, (RaftProperties)this.properties);){
            raftLog.open(-1L, null);
            entries.forEach(entry -> RetryCacheTestUtil.createEntry((RetryCache)retryCache, (RaftProtos.LogEntryProto)entry));
            entries.stream().map(arg_0 -> ((SegmentedRaftLog)raftLog).appendEntry(arg_0)).forEach(CompletableFuture::join);
        }
        SegmentRange r1 = new SegmentRange(550L, 599L, 2L, false);
        SegmentRange r2 = new SegmentRange(600L, 649L, 3L, false);
        SegmentRange r3 = new SegmentRange(650L, 749L, 10L, false);
        List<RaftProtos.LogEntryProto> newEntries = TestSegmentedRaftLog.prepareLogEntries(Arrays.asList(r1, r2, r3), null);
        try (SegmentedRaftLog raftLog = RetryCacheTestUtil.newSegmentedRaftLog((RaftGroupMemberId)MEMBER_ID, (RetryCache)retryCache, (RaftStorage)this.storage, (RaftProperties)this.properties);){
            raftLog.open(-1L, null);
            this.LOG.info("newEntries[0] = {}", (Object)newEntries.get(0));
            int last = newEntries.size() - 1;
            this.LOG.info("newEntries[{}] = {}", (Object)last, (Object)newEntries.get(last));
            raftLog.append(newEntries).forEach(CompletableFuture::join);
            this.checkFailedEntries(entries, 650L, retryCache);
            this.checkEntries((RaftLog)raftLog, entries, 0, 650);
            this.checkEntries((RaftLog)raftLog, newEntries, 100, 100);
            Assertions.assertEquals((Object)newEntries.get(newEntries.size() - 1), (Object)this.getLastEntry(raftLog));
            Assertions.assertEquals((long)newEntries.get(newEntries.size() - 1).getIndex(), (long)raftLog.getFlushIndex());
        }
        raftLog = RetryCacheTestUtil.newSegmentedRaftLog((RaftGroupMemberId)MEMBER_ID, (RetryCache)retryCache, (RaftStorage)this.storage, (RaftProperties)this.properties);
        var11_15 = null;
        try {
            raftLog.open(-1L, null);
            this.checkEntries((RaftLog)raftLog, entries, 0, 650);
            this.checkEntries((RaftLog)raftLog, newEntries, 100, 100);
            Assertions.assertEquals((Object)newEntries.get(newEntries.size() - 1), (Object)this.getLastEntry(raftLog));
            Assertions.assertEquals((long)newEntries.get(newEntries.size() - 1).getIndex(), (long)raftLog.getFlushIndex());
            SegmentedRaftLogCache cache = raftLog.getRaftLogCache();
            Assertions.assertEquals((int)5, (int)cache.getNumOfSegments());
        }
        catch (Throwable throwable) {
            var11_15 = throwable;
            throw throwable;
        }
        finally {
            if (raftLog != null) {
                if (var11_15 != null) {
                    try {
                        raftLog.close();
                    }
                    catch (Throwable throwable) {
                        var11_15.addSuppressed(throwable);
                    }
                } else {
                    raftLog.close();
                }
            }
        }
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void testSegmentedRaftLogStateMachineData(Boolean useAsyncFlush, Boolean smSyncFlush) throws Exception {
        RaftServerConfigKeys.Log.setAsyncFlushEnabled((RaftProperties)this.properties, (boolean)useAsyncFlush);
        RaftServerConfigKeys.Log.StateMachineData.setSync((RaftProperties)this.properties, (boolean)smSyncFlush);
        SegmentRange range = new SegmentRange(0L, 10L, 1L, true);
        List<RaftProtos.LogEntryProto> entries = TestSegmentedRaftLog.prepareLogEntries(range, null, true, new ArrayList<RaftProtos.LogEntryProto>());
        SimpleStateMachine4Testing sm = new SimpleStateMachine4Testing();
        try (SegmentedRaftLog raftLog = SegmentedRaftLog.newBuilder().setMemberId(MEMBER_ID).setStateMachine((StateMachine)sm).setStorage(this.storage).setProperties(this.properties).build();){
            raftLog.open(-1L, null);
            int next = 0;
            long flush = -1L;
            this.assertIndices((RaftLog)raftLog, flush, next);
            raftLog.appendEntry(entries.get(next++));
            this.assertIndices((RaftLog)raftLog, flush, next);
            raftLog.appendEntry(entries.get(next++));
            this.assertIndices((RaftLog)raftLog, flush, next);
            raftLog.appendEntry(entries.get(next++));
            this.assertIndicesMultipleAttempts((RaftLog)raftLog, flush += 3L, next);
            sm.blockFlushStateMachineData();
            raftLog.appendEntry(entries.get(next++));
            sm.blockWriteStateMachineData();
            Thread t = TestSegmentedRaftLog.startAppendEntryThread((RaftLog)raftLog, entries.get(next++));
            TimeUnit.SECONDS.sleep(1L);
            Assertions.assertTrue((boolean)t.isAlive());
            sm.unblockWriteStateMachineData();
            this.assertIndices((RaftLog)raftLog, flush, next);
            TimeUnit.SECONDS.sleep(1L);
            this.assertIndices((RaftLog)raftLog, flush, next);
            sm.unblockFlushStateMachineData();
            this.assertIndicesMultipleAttempts((RaftLog)raftLog, flush + 2L, next);
            t.join();
        }
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void testServerShutdownOnTimeoutIOException(Boolean useAsyncFlush, Boolean smSyncFlush) throws Throwable {
        ExecutionException ex;
        RaftServerConfigKeys.Log.setAsyncFlushEnabled((RaftProperties)this.properties, (boolean)useAsyncFlush);
        RaftServerConfigKeys.Log.StateMachineData.setSync((RaftProperties)this.properties, (boolean)smSyncFlush);
        RaftServerConfigKeys.Log.StateMachineData.setSync((RaftProperties)this.properties, (boolean)true);
        TimeDuration syncTimeout = TimeDuration.valueOf((long)100L, (TimeUnit)TimeUnit.MILLISECONDS);
        RaftServerConfigKeys.Log.StateMachineData.setSyncTimeout((RaftProperties)this.properties, (TimeDuration)syncTimeout);
        int numRetries = 2;
        RaftServerConfigKeys.Log.StateMachineData.setSyncTimeoutRetry((RaftProperties)this.properties, (int)2);
        RaftProtos.LogEntryProto entry = TestSegmentedRaftLog.prepareLogEntry(0L, 0L, null, true);
        BaseStateMachine sm = new BaseStateMachine(){

            public CompletableFuture<Void> write(RaftProtos.LogEntryProto entry) {
                this.getLifeCycle().transition(LifeCycle.State.STARTING);
                this.getLifeCycle().transition(LifeCycle.State.RUNNING);
                return new CompletableFuture<Void>();
            }

            public void notifyLogFailed(Throwable cause, RaftProtos.LogEntryProto entry) {
                LOG.info("Test StateMachine: Ratis log failed notification received as expected.", cause);
                LOG.info("Test StateMachine: Transition to PAUSED state.");
                Assertions.assertNotNull((Object)entry);
                this.getLifeCycle().transition(LifeCycle.State.PAUSING);
                this.getLifeCycle().transition(LifeCycle.State.PAUSED);
            }
        };
        try (SegmentedRaftLog raftLog = SegmentedRaftLog.newBuilder().setMemberId(MEMBER_ID).setStateMachine((StateMachine)sm).setStorage(this.storage).setProperties(this.properties).build();){
            raftLog.open(-1L, null);
            CompletableFuture f = raftLog.appendEntry(entry);
            ex = (ExecutionException)Assertions.assertThrows(ExecutionException.class, f::get);
        }
        Assertions.assertSame((Object)LifeCycle.State.PAUSED, (Object)sm.getLifeCycleState());
        Assertions.assertInstanceOf(TimeoutIOException.class, (Object)ex.getCause());
    }

    static Thread startAppendEntryThread(RaftLog raftLog, RaftProtos.LogEntryProto entry) {
        Thread t = new Thread(() -> {
            try {
                raftLog.appendEntry(entry).get();
            }
            catch (Throwable throwable) {
                // empty catch block
            }
        });
        t.start();
        return t;
    }

    void assertIndices(RaftLog raftLog, long expectedFlushIndex, long expectedNextIndex) {
        this.LOG.info("assert expectedFlushIndex={}", (Object)expectedFlushIndex);
        Assertions.assertEquals((long)expectedFlushIndex, (long)raftLog.getFlushIndex());
        this.LOG.info("assert expectedNextIndex={}", (Object)expectedNextIndex);
        Assertions.assertEquals((long)expectedNextIndex, (long)raftLog.getNextIndex());
    }

    void assertIndicesMultipleAttempts(RaftLog raftLog, long expectedFlushIndex, long expectedNextIndex) throws Exception {
        JavaUtils.attempt(() -> this.assertIndices(raftLog, expectedFlushIndex, expectedNextIndex), (int)10, (TimeDuration)HUNDRED_MILLIS, (String)"assertIndices", (Logger)this.LOG);
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void testAsyncFlushPerf1(Boolean useAsyncFlush, Boolean smSyncFlush) throws Exception {
        RaftServerConfigKeys.Log.setAsyncFlushEnabled((RaftProperties)this.properties, (boolean)useAsyncFlush);
        RaftServerConfigKeys.Log.StateMachineData.setSync((RaftProperties)this.properties, (boolean)smSyncFlush);
        List<SegmentRange> ranges = TestSegmentedRaftLog.prepareRanges(0, 50, 20000, 0L);
        List<RaftProtos.LogEntryProto> entries = TestSegmentedRaftLog.prepareLogEntries(ranges, null);
        try (SegmentedRaftLog raftLog = this.newSegmentedRaftLog();){
            raftLog.open(-1L, null);
            ArrayList<List> futures = new ArrayList<List>();
            long start = System.nanoTime();
            for (int i = 0; i < entries.size(); i += 5) {
                futures.add(raftLog.append(Arrays.asList(entries.get(i), entries.get(i + 1), entries.get(i + 2), entries.get(i + 3), entries.get(i + 4))));
            }
            for (List futureList : futures) {
                futureList.forEach(CompletableFuture::join);
            }
            System.out.println(entries.size() + " appendEntry finished in " + (System.nanoTime() - start) + " ns with asyncFlush " + useAsyncFlush);
        }
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void testAsyncFlushPerf2(Boolean useAsyncFlush, Boolean smSyncFlush) throws Exception {
        RaftServerConfigKeys.Log.setAsyncFlushEnabled((RaftProperties)this.properties, (boolean)useAsyncFlush);
        RaftServerConfigKeys.Log.StateMachineData.setSync((RaftProperties)this.properties, (boolean)smSyncFlush);
        List<SegmentRange> ranges = TestSegmentedRaftLog.prepareRanges(0, 50, 20000, 0L);
        List<RaftProtos.LogEntryProto> entries = TestSegmentedRaftLog.prepareLogEntries(ranges, null);
        try (SegmentedRaftLog raftLog = this.newSegmentedRaftLog();){
            raftLog.open(-1L, null);
            ArrayList<CompletableFuture> futures = new ArrayList<CompletableFuture>();
            long start = System.nanoTime();
            for (int i = 0; i < entries.size(); ++i) {
                futures.add(raftLog.appendEntry(entries.get(i)));
            }
            for (CompletableFuture futureList : futures) {
                futureList.join();
            }
            System.out.println(entries.size() + " appendEntry finished in " + (System.nanoTime() - start) + " ns with asyncFlush " + useAsyncFlush);
        }
    }

    static {
        Slf4jUtils.setLogLevel((Logger)SegmentedRaftLogWorker.LOG, (Level)Level.INFO);
        Slf4jUtils.setLogLevel((Logger)SegmentedRaftLogCache.LOG, (Level)Level.INFO);
        Slf4jUtils.setLogLevel((Logger)SegmentedRaftLog.LOG, (Level)Level.INFO);
        PEER_ID = RaftPeerId.valueOf((String)"s0");
        GROUP_ID = RaftGroupId.randomId();
        MEMBER_ID = RaftGroupMemberId.valueOf((RaftPeerId)PEER_ID, (RaftGroupId)GROUP_ID);
    }

    static class SegmentRange {
        final long start;
        final long end;
        final long term;
        final boolean isOpen;

        SegmentRange(long s, long e, long term, boolean isOpen) {
            this.start = s;
            this.end = e;
            this.term = term;
            this.isOpen = isOpen;
        }

        File getFile(RaftStorage storage) {
            return LogSegmentStartEnd.valueOf((long)this.start, (long)this.end, (boolean)this.isOpen).getFile(storage);
        }
    }
}

