/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.kernel.impl.transaction.log.checkpoint;

import java.io.Flushable;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.neo4j.exceptions.UnderlyingStorageException;
import org.neo4j.internal.helpers.collection.Iterators;
import org.neo4j.io.pagecache.IOLimiter;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointScheduler;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.impl.transaction.log.checkpoint.TriggerInfo;
import org.neo4j.monitoring.DatabaseHealth;
import org.neo4j.monitoring.Health;
import org.neo4j.scheduler.Group;
import org.neo4j.scheduler.JobMonitoringParams;
import org.neo4j.scheduler.JobScheduler;
import org.neo4j.test.DoubleLatch;
import org.neo4j.test.OnDemandJobScheduler;
import org.neo4j.test.OtherThreadExecutor;

class CheckPointSchedulerTest {
    private final IOLimiter ioLimiter = (IOLimiter)Mockito.mock(IOLimiter.class);
    private final CheckPointer checkPointer = (CheckPointer)Mockito.mock(CheckPointer.class);
    private final OnDemandJobScheduler jobScheduler = (OnDemandJobScheduler)Mockito.spy((Object)new OnDemandJobScheduler());
    private final Health health = (Health)Mockito.mock(DatabaseHealth.class);
    private static ExecutorService executor;

    CheckPointSchedulerTest() {
    }

    @BeforeAll
    static void setUpExecutor() {
        executor = Executors.newCachedThreadPool();
    }

    @AfterAll
    static void tearDownExecutor() throws InterruptedException {
        executor.shutdown();
        executor.awaitTermination(30L, TimeUnit.SECONDS);
    }

    @Test
    void shouldScheduleTheCheckPointerJobOnStart() {
        CheckPointScheduler scheduler = new CheckPointScheduler(this.checkPointer, this.ioLimiter, (JobScheduler)this.jobScheduler, 20L, this.health, "test db");
        Assertions.assertNull((Object)this.jobScheduler.getJob());
        scheduler.start();
        Assertions.assertNotNull((Object)this.jobScheduler.getJob());
        ((OnDemandJobScheduler)Mockito.verify((Object)this.jobScheduler)).schedule((Group)ArgumentMatchers.eq((Object)Group.CHECKPOINT), (JobMonitoringParams)ArgumentMatchers.any(JobMonitoringParams.class), (Runnable)ArgumentMatchers.any(Runnable.class), ArgumentMatchers.eq((long)20L), (TimeUnit)((Object)ArgumentMatchers.eq((Object)((Object)TimeUnit.MILLISECONDS))));
    }

    @Test
    void shouldRescheduleTheJobAfterARun() throws Throwable {
        CheckPointScheduler scheduler = new CheckPointScheduler(this.checkPointer, this.ioLimiter, (JobScheduler)this.jobScheduler, 20L, this.health, "test db");
        Assertions.assertNull((Object)this.jobScheduler.getJob());
        scheduler.start();
        Object scheduledJob = this.jobScheduler.getJob();
        Assertions.assertNotNull((Object)scheduledJob);
        this.jobScheduler.runJob();
        ((OnDemandJobScheduler)Mockito.verify((Object)this.jobScheduler, (VerificationMode)Mockito.times((int)2))).schedule((Group)ArgumentMatchers.eq((Object)Group.CHECKPOINT), (JobMonitoringParams)ArgumentMatchers.any(JobMonitoringParams.class), (Runnable)ArgumentMatchers.any(Runnable.class), ArgumentMatchers.eq((long)20L), (TimeUnit)((Object)ArgumentMatchers.eq((Object)((Object)TimeUnit.MILLISECONDS))));
        ((CheckPointer)Mockito.verify((Object)this.checkPointer)).checkPointIfNeeded((TriggerInfo)ArgumentMatchers.any(TriggerInfo.class));
        Assertions.assertEquals((Object)scheduledJob, (Object)this.jobScheduler.getJob());
    }

    @Test
    void shouldNotRescheduleAJobWhenStopped() {
        CheckPointScheduler scheduler = new CheckPointScheduler(this.checkPointer, this.ioLimiter, (JobScheduler)this.jobScheduler, 20L, this.health, "test db");
        Assertions.assertNull((Object)this.jobScheduler.getJob());
        scheduler.start();
        Assertions.assertNotNull((Object)this.jobScheduler.getJob());
        scheduler.stop();
        Assertions.assertNull((Object)this.jobScheduler.getJob());
    }

    @Test
    void stoppedJobCantBeInvoked() throws Throwable {
        CheckPointScheduler scheduler = new CheckPointScheduler(this.checkPointer, this.ioLimiter, (JobScheduler)this.jobScheduler, 10L, this.health, "test db");
        scheduler.start();
        this.jobScheduler.runJob();
        ((CheckPointer)Mockito.verify((Object)this.checkPointer)).checkPointIfNeeded((TriggerInfo)ArgumentMatchers.any(TriggerInfo.class));
        scheduler.stop();
        scheduler.start();
        this.jobScheduler.runJob();
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{this.checkPointer});
    }

    @Test
    void shouldWaitOnStopUntilTheRunningCheckpointIsDone() {
        Assertions.assertTimeoutPreemptively((Duration)Duration.ofSeconds(60L), this::testWaitOnStopUntilTheRunningCheckpointIsDone);
    }

    void testWaitOnStopUntilTheRunningCheckpointIsDone() throws Throwable {
        AtomicReference ex = new AtomicReference();
        AtomicBoolean stoppedCompleted = new AtomicBoolean();
        final DoubleLatch checkPointerLatch = new DoubleLatch(1);
        OtherThreadExecutor otherThreadExecutor = new OtherThreadExecutor("scheduler stopper");
        CheckPointer checkPointer = new CheckPointer(){

            public long checkPointIfNeeded(TriggerInfo triggerInfo) {
                checkPointerLatch.startAndWaitForAllToStart();
                checkPointerLatch.waitForAllToFinish();
                return 42L;
            }

            public long tryCheckPoint(TriggerInfo triggerInfo) {
                throw new RuntimeException("this should have not been called");
            }

            public long tryCheckPoint(TriggerInfo triggerInfo, BooleanSupplier timeout) {
                throw new RuntimeException("this should have not been called");
            }

            public long tryCheckPointNoWait(TriggerInfo triggerInfo) {
                throw new RuntimeException("this should have not been called");
            }

            public long forceCheckPoint(TriggerInfo triggerInfo) {
                throw new RuntimeException("this should have not been called");
            }

            public long lastCheckPointedTransactionId() {
                return 42L;
            }
        };
        CheckPointScheduler scheduler = new CheckPointScheduler(checkPointer, this.ioLimiter, (JobScheduler)this.jobScheduler, 20L, this.health, "test db");
        scheduler.start();
        Thread runCheckPointer = new Thread(() -> ((OnDemandJobScheduler)this.jobScheduler).runJob());
        runCheckPointer.start();
        checkPointerLatch.waitForAllToStart();
        otherThreadExecutor.executeDontWait(() -> {
            try {
                scheduler.stop();
                stoppedCompleted.set(true);
            }
            catch (Throwable throwable) {
                ex.set(throwable);
            }
            return null;
        });
        otherThreadExecutor.waitUntilWaiting(details -> details.isAt(CheckPointScheduler.class, "waitOngoingCheckpointCompletion"));
        Assertions.assertFalse((boolean)stoppedCompleted.get());
        checkPointerLatch.finish();
        runCheckPointer.join();
        while (!stoppedCompleted.get()) {
            Thread.sleep(1L);
        }
        otherThreadExecutor.close();
        Assertions.assertNull(ex.get());
    }

    @Test
    void shouldContinueThroughSporadicFailures() {
        ControlledCheckPointer checkPointer = new ControlledCheckPointer();
        CheckPointScheduler scheduler = new CheckPointScheduler((CheckPointer)checkPointer, this.ioLimiter, (JobScheduler)this.jobScheduler, 1L, this.health, "test db");
        scheduler.start();
        for (int i = 0; i < CheckPointScheduler.MAX_CONSECUTIVE_FAILURES_TOLERANCE * 2; ++i) {
            checkPointer.fail = true;
            this.jobScheduler.runJob();
            Mockito.verifyNoInteractions((Object[])new Object[]{this.health});
            checkPointer.fail = false;
            this.jobScheduler.runJob();
            Mockito.verifyNoInteractions((Object[])new Object[]{this.health});
        }
    }

    @Test
    void checkpointOnStopShouldFlushAsFastAsPossible() {
        Assertions.assertTimeoutPreemptively((Duration)Duration.ofSeconds(10L), this::testCheckpointOnStopShouldFlushAsFastAsPossible);
    }

    void testCheckpointOnStopShouldFlushAsFastAsPossible() throws Throwable {
        CheckableIOLimiter ioLimiter = new CheckableIOLimiter();
        CountDownLatch checkPointerLatch = new CountDownLatch(1);
        WaitUnlimitedCheckPointer checkPointer = new WaitUnlimitedCheckPointer(ioLimiter, checkPointerLatch);
        CheckPointScheduler scheduler = new CheckPointScheduler((CheckPointer)checkPointer, (IOLimiter)ioLimiter, (JobScheduler)this.jobScheduler, 0L, this.health, "test db");
        scheduler.start();
        Future<?> checkpointerStarter = executor.submit(() -> ((OnDemandJobScheduler)this.jobScheduler).runJob());
        checkPointerLatch.await();
        scheduler.stop();
        checkpointerStarter.get();
        Assertions.assertTrue((boolean)checkPointer.isCheckpointCreated(), (String)"Checkpointer should be created.");
        Assertions.assertTrue((boolean)ioLimiter.isLimited(), (String)"Limiter should be enabled in the end.");
    }

    @Test
    void shouldCausePanicAfterSomeFailures() throws Throwable {
        Object[] failures = new RuntimeException[]{new RuntimeException("First"), new RuntimeException("Second"), new RuntimeException("Third")};
        Mockito.when((Object)this.checkPointer.checkPointIfNeeded((TriggerInfo)ArgumentMatchers.any(TriggerInfo.class))).thenThrow((Throwable[])failures);
        CheckPointScheduler scheduler = new CheckPointScheduler(this.checkPointer, this.ioLimiter, (JobScheduler)this.jobScheduler, 1L, this.health, "test db");
        scheduler.start();
        for (int i = 0; i < CheckPointScheduler.MAX_CONSECUTIVE_FAILURES_TOLERANCE - 1; ++i) {
            this.jobScheduler.runJob();
            Mockito.verifyNoInteractions((Object[])new Object[]{this.health});
        }
        UnderlyingStorageException error = (UnderlyingStorageException)Assertions.assertThrows(UnderlyingStorageException.class, () -> ((OnDemandJobScheduler)this.jobScheduler).runJob());
        Assertions.assertEquals((Object)Iterators.asSet((Object[])failures), (Object)Iterators.asSet((Object[])error.getSuppressed()));
        ((Health)Mockito.verify((Object)this.health)).panic((Throwable)error);
    }

    private static class WaitUnlimitedCheckPointer
    implements CheckPointer {
        private final CheckableIOLimiter ioLimiter;
        private final CountDownLatch latch;
        private volatile boolean checkpointCreated;

        WaitUnlimitedCheckPointer(CheckableIOLimiter ioLimiter, CountDownLatch latch) {
            this.ioLimiter = ioLimiter;
            this.latch = latch;
            this.checkpointCreated = false;
        }

        public long checkPointIfNeeded(TriggerInfo triggerInfo) {
            this.latch.countDown();
            while (this.ioLimiter.isLimited()) {
            }
            this.checkpointCreated = true;
            return 42L;
        }

        public long tryCheckPoint(TriggerInfo triggerInfo) {
            throw new UnsupportedOperationException("This should have not been called");
        }

        public long tryCheckPoint(TriggerInfo triggerInfo, BooleanSupplier timeout) {
            throw new UnsupportedOperationException("This should have not been called");
        }

        public long tryCheckPointNoWait(TriggerInfo triggerInfo) {
            throw new UnsupportedOperationException("This should have not been called");
        }

        public long forceCheckPoint(TriggerInfo triggerInfo) {
            throw new UnsupportedOperationException("This should have not been called");
        }

        public long lastCheckPointedTransactionId() {
            return 0L;
        }

        boolean isCheckpointCreated() {
            return this.checkpointCreated;
        }
    }

    private static class CheckableIOLimiter
    implements IOLimiter {
        private volatile boolean limitEnabled;

        private CheckableIOLimiter() {
        }

        public long maybeLimitIO(long previousStamp, int recentlyCompletedIOs, Flushable flushable) {
            return 0L;
        }

        public void disableLimit() {
            this.limitEnabled = false;
        }

        public void enableLimit() {
            this.limitEnabled = true;
        }

        public boolean isLimited() {
            return this.limitEnabled;
        }
    }

    private static class ControlledCheckPointer
    implements CheckPointer {
        volatile boolean fail;

        private ControlledCheckPointer() {
        }

        public long checkPointIfNeeded(TriggerInfo triggerInfo) throws IOException {
            if (this.fail) {
                throw new IOException("Just failing");
            }
            return 1L;
        }

        public long tryCheckPoint(TriggerInfo triggerInfo) {
            throw new UnsupportedOperationException();
        }

        public long tryCheckPoint(TriggerInfo triggerInfo, BooleanSupplier timeout) {
            throw new UnsupportedOperationException();
        }

        public long tryCheckPointNoWait(TriggerInfo triggerInfo) {
            throw new UnsupportedOperationException();
        }

        public long forceCheckPoint(TriggerInfo triggerInfo) {
            throw new UnsupportedOperationException();
        }

        public long lastCheckPointedTransactionId() {
            throw new UnsupportedOperationException();
        }
    }
}

