/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.kernel.impl.scheduler;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;
import org.neo4j.kernel.impl.scheduler.CentralJobScheduler;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.resources.Profiler;
import org.neo4j.scheduler.CallableExecutorService;
import org.neo4j.scheduler.CancelListener;
import org.neo4j.scheduler.Group;
import org.neo4j.scheduler.JobHandle;
import org.neo4j.scheduler.JobMonitoringParams;
import org.neo4j.scheduler.SchedulerThreadFactory;
import org.neo4j.time.Clocks;
import org.neo4j.util.concurrent.BinaryLatch;

class CentralJobSchedulerTest {
    // Number of times countInvocationsJob has run; polled by awaitInvocationCount.
    private final AtomicInteger invocations = new AtomicInteger();
    // Lifecycle container the scheduler is added to; each test starts it and stopScheduler() shuts it down.
    private final LifeSupport life = new LifeSupport();
    // Scheduler under test, constructed with Clocks.nanoClock() and registered with the lifecycle container above.
    private final CentralJobScheduler scheduler = (CentralJobScheduler)this.life.add((Lifecycle)new CentralJobScheduler(Clocks.nanoClock()));
    // Job whose only effect is to increment the invocation counter.
    private final Runnable countInvocationsJob = this.invocations::incrementAndGet;

    // Explicit no-arg constructor; all state is set up by the field initializers above.
    CentralJobSchedulerTest() {
    }

    /**
     * Runs after every test and shuts down the lifecycle container that the
     * scheduler under test was added to.
     */
    @AfterEach
    void stopScheduler() {
        life.shutdown();
    }

    /**
     * Scheduling directly on {@code Group.TASK_SCHEDULER} must be rejected with a
     * {@link RejectedExecutionException}; the submitted task fails the test if it ever runs.
     */
    @Test
    void taskSchedulerGroupMustNotBeDirectlySchedulable() {
        life.start();
        org.junit.jupiter.api.Assertions.assertThrows(
                RejectedExecutionException.class,
                () -> scheduler.schedule(
                        Group.TASK_SCHEDULER,
                        JobMonitoringParams.NOT_MONITORED,
                        () -> org.junit.jupiter.api.Assertions.fail("This task should not have been executed.")));
    }

    /**
     * Schedules the counting job every 10 ms, waits for five invocations, shuts the
     * scheduler down and then checks that the counter stays unchanged over the next
     * five periods. The whole scenario must finish within ten seconds.
     */
    @Test
    void shouldRunRecurringJob() {
        final long periodMillis = 10L;
        final int expectedRuns = 5;
        org.junit.jupiter.api.Assertions.assertTimeoutPreemptively(Duration.ofSeconds(10L), () -> {
            life.start();
            scheduler.scheduleRecurring(
                    Group.INDEX_POPULATION,
                    JobMonitoringParams.NOT_MONITORED,
                    countInvocationsJob,
                    periodMillis,
                    TimeUnit.MILLISECONDS);
            awaitInvocationCount(expectedRuns);

            scheduler.shutdown();
            // Snapshot the counter after shutdown; it must stay unchanged from here on.
            int runsAtShutdown = invocations.get();
            Thread.sleep(periodMillis * 5L);
            Assertions.assertThat(invocations.get()).isEqualTo(runsAtShutdown);
        });
    }

    /**
     * Cancels a recurring job after its first invocation. Waiting on the handle must then
     * throw {@link CancellationException}, and over the following 200 ms the counter may
     * grow by at most one.
     */
    @Test
    void shouldCancelRecurringJob() throws Exception {
        final long periodMillis = 2L;
        life.start();
        JobHandle recurring = scheduler.scheduleRecurring(
                Group.INDEX_POPULATION,
                JobMonitoringParams.NOT_MONITORED,
                countInvocationsJob,
                periodMillis,
                TimeUnit.MILLISECONDS);
        awaitFirstInvocation();

        recurring.cancel();
        org.junit.jupiter.api.Assertions.assertThrows(CancellationException.class, () -> recurring.waitTermination());

        int seenAfterCancel = invocations.get();
        Thread.sleep(periodMillis * 100L);
        // The assertion tolerates exactly one additional invocation after the snapshot.
        Assertions.assertThat(invocations.get())
                .isGreaterThanOrEqualTo(seenAfterCancel)
                .isLessThanOrEqualTo(seenAfterCancel + 1);
    }

    /**
     * Schedules a job with a 100 ms delay and verifies that it did not start earlier
     * than that, measured with {@link System#nanoTime()}.
     *
     * @throws Throwable if waiting for the job is interrupted
     */
    @Test
    void shouldRunWithDelay() throws Throwable {
        life.start();
        final long delayMillis = 100L;
        AtomicLong runTime = new AtomicLong();
        CountDownLatch latch = new CountDownLatch(1);
        long time = System.nanoTime();
        scheduler.schedule(Group.INDEX_POPULATION, JobMonitoringParams.NOT_MONITORED, () -> {
            runTime.set(System.nanoTime());
            latch.countDown();
        }, delayMillis, TimeUnit.MILLISECONDS);
        latch.await();

        // nanoTime values are only meaningful as differences; comparing absolute values
        // (t0 + delay <= t1) is not safe against numerical overflow.
        long elapsedNanos = runTime.get() - time;
        long requiredNanos = TimeUnit.MILLISECONDS.toNanos(delayMillis);
        org.junit.jupiter.api.Assertions.assertTrue(
                elapsedNanos >= requiredNanos,
                "Job ran after " + elapsedNanos + " ns, expected a delay of at least " + requiredNanos + " ns.");
    }

    /**
     * Schedules 30 blocking tasks (10 delayed one-shot, 10 recurring, 10 recurring with an
     * initial delay) and requires all of them to have started within ten seconds, i.e. no
     * blocked task may hold up the start of another.
     */
    @Test
    void longRunningScheduledJobsMustNotDelayOtherLongRunningJobs() {
        life.start();
        final int jobsPerKind = 10;
        List<JobHandle> handles = new ArrayList<>(3 * jobsPerKind);
        AtomicLong startedCounter = new AtomicLong();
        BinaryLatch blockLatch = new BinaryLatch();
        // Each task records that it started and then parks on the latch until released.
        Runnable task = () -> {
            startedCounter.incrementAndGet();
            blockLatch.await();
        };
        try {
            for (int i = 0; i < jobsPerKind; i++) {
                handles.add(scheduler.schedule(
                        Group.INDEX_POPULATION, JobMonitoringParams.NOT_MONITORED, task, 0L, TimeUnit.MILLISECONDS));
            }
            for (int i = 0; i < jobsPerKind; i++) {
                handles.add(scheduler.scheduleRecurring(
                        Group.INDEX_POPULATION,
                        JobMonitoringParams.NOT_MONITORED,
                        task,
                        Integer.MAX_VALUE,
                        TimeUnit.MILLISECONDS));
            }
            for (int i = 0; i < jobsPerKind; i++) {
                handles.add(scheduler.scheduleRecurring(
                        Group.INDEX_POPULATION,
                        JobMonitoringParams.NOT_MONITORED,
                        task,
                        0L,
                        Integer.MAX_VALUE,
                        TimeUnit.MILLISECONDS));
            }

            long deadline = TimeUnit.SECONDS.toNanos(10L) + System.nanoTime();
            do {
                if (startedCounter.get() == handles.size()) {
                    // Every task got started; cleanup happens in the finally block.
                    return;
                }
                // Compare via the difference so the check is safe against nanoTime overflow.
            } while (deadline - System.nanoTime() > 0);
            org.junit.jupiter.api.Assertions.fail("Only managed to start " + startedCounter.get() + " tasks in 10 seconds, when " + handles.size() + " was expected.");
        } finally {
            // Always unblock and cancel the tasks, also when the test fails, so that no
            // task stays parked on the latch after the test method has returned.
            blockLatch.release();
            for (JobHandle handle : handles) {
                handle.cancel();
            }
        }
    }

    /**
     * Registers a cancel listener on a running job and verifies that cancelling the
     * handle invokes it: the listener flips the flag that the job is spinning on.
     */
    @Test
    void shouldNotifyCancelListeners() {
        life.start();
        AtomicBoolean stopRequested = new AtomicBoolean();
        Runnable spinUntilStopped = () -> {
            long pauseNanos = TimeUnit.MILLISECONDS.toNanos(10L);
            while (!stopRequested.get()) {
                LockSupport.parkNanos(pauseNanos);
            }
        };

        JobHandle handle = scheduler.schedule(
                Group.INDEX_POPULATION, JobMonitoringParams.NOT_MONITORED, spinUntilStopped);
        handle.registerCancelListener(() -> stopRequested.set(true));
        handle.cancel();

        org.junit.jupiter.api.Assertions.assertTrue(stopRequested.get());
    }

    /**
     * For a job scheduled with a 10 ms delay, {@code waitTermination()} must not return
     * before the job body has finished. The scenario is limited to ten seconds.
     */
    @Test
    void waitTerminationOnDelayedJobMustWaitUntilJobCompletion() {
        org.junit.jupiter.api.Assertions.assertTimeoutPreemptively(Duration.ofSeconds(10L), () -> {
            life.start();
            AtomicBoolean completed = new AtomicBoolean();
            // The job pauses briefly before setting the flag, so an early return is observable.
            Runnable slowJob = () -> {
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
                completed.set(true);
            };

            JobHandle delayed = scheduler.schedule(
                    Group.INDEX_POPULATION, JobMonitoringParams.NOT_MONITORED, slowJob, 10L, TimeUnit.MILLISECONDS);
            delayed.waitTermination();

            org.junit.jupiter.api.Assertions.assertTrue(completed.get());
        });
    }

    /**
     * Runs a recurring job that completes normally on its first run and throws on the next.
     * After cancelling, {@code waitTermination()} must throw: either an
     * {@link ExecutionException} caused by the job's exception, or a {@link CancellationException}.
     */
    @Test
    @Timeout(value=60L)
    void scheduledTasksThatThrowsPropagateLastException() throws InterruptedException {
        life.start();
        RuntimeException boom = new RuntimeException("boom");
        CountDownLatch failureReached = new CountDownLatch(1);
        AtomicBoolean armed = new AtomicBoolean();
        AtomicBoolean cancelNotified = new AtomicBoolean();
        // The first run only arms the flag; every armed run signals the latch and throws.
        Runnable failingJob = () -> {
            try {
                if (armed.get()) {
                    failureReached.countDown();
                    throw boom;
                }
            } finally {
                armed.set(true);
            }
        };

        JobHandle handle = scheduler.scheduleRecurring(
                Group.INDEX_POPULATION, JobMonitoringParams.NOT_MONITORED, failingJob, 1L, TimeUnit.MILLISECONDS);
        handle.registerCancelListener(() -> cancelNotified.set(true));
        failureReached.await();
        handle.cancel();
        org.junit.jupiter.api.Assertions.assertTrue(cancelNotified.get());

        Exception thrown =
                org.junit.jupiter.api.Assertions.assertThrows(Exception.class, () -> handle.waitTermination());
        if (!(thrown instanceof ExecutionException)) {
            Assertions.assertThat(thrown).isInstanceOf(CancellationException.class);
        } else {
            Assertions.assertThat(thrown.getCause()).isEqualTo(boom);
        }
    }

    /**
     * Runs a recurring job that throws only on its very first run. Once the job body has been
     * entered ten times and the handle is cancelled, {@code waitTermination()} must throw a
     * {@link CancellationException}.
     */
    @Test
    @Timeout(value=60L)
    void scheduledTasksThatThrowsPropagateDoNotPropagateExceptionAfterSubsequentExecution() throws InterruptedException {
        life.start();
        RuntimeException boom = new RuntimeException("boom");
        AtomicBoolean alreadyThrown = new AtomicBoolean();
        CountDownLatch tenRuns = new CountDownLatch(10);
        AtomicBoolean cancelNotified = new AtomicBoolean();
        // Only the run that wins the false -> true transition throws; every run counts down.
        Runnable failOnceJob = () -> {
            try {
                if (alreadyThrown.compareAndSet(false, true)) {
                    throw boom;
                }
            } finally {
                tenRuns.countDown();
            }
        };

        JobHandle handle = scheduler.scheduleRecurring(
                Group.INDEX_POPULATION, JobMonitoringParams.NOT_MONITORED, failOnceJob, 1L, TimeUnit.MILLISECONDS);
        handle.registerCancelListener(() -> cancelNotified.set(true));
        tenRuns.await();
        handle.cancel();

        org.junit.jupiter.api.Assertions.assertTrue(cancelNotified.get());
        org.junit.jupiter.api.Assertions.assertThrows(CancellationException.class, () -> handle.waitTermination());
    }

    /**
     * Runs a recurring job that throws on every run. Once it has run at least once, the
     * cancel listener must not yet have fired; it must fire when the handle is cancelled.
     */
    @Test
    @Timeout(value=60L)
    void scheduledTasksThatThrowsShouldStop() {
        life.start();
        BinaryLatch firstRun = new BinaryLatch();
        AtomicBoolean cancelNotified = new AtomicBoolean();
        RuntimeException boom = new RuntimeException("boom");
        AtomicInteger runs = new AtomicInteger();
        // The body always exits by throwing; the latch is released on the way out.
        Runnable alwaysFailing = () -> {
            try {
                runs.incrementAndGet();
                throw boom;
            } finally {
                firstRun.release();
            }
        };

        JobHandle jobHandle = scheduler.scheduleRecurring(
                Group.INDEX_POPULATION, JobMonitoringParams.NOT_MONITORED, alwaysFailing, 1L, TimeUnit.MILLISECONDS);
        jobHandle.registerCancelListener(() -> cancelNotified.set(true));
        firstRun.await();

        Assertions.assertThat(runs.get()).isGreaterThanOrEqualTo(1);
        org.junit.jupiter.api.Assertions.assertFalse(cancelNotified.get());
        jobHandle.cancel();
        org.junit.jupiter.api.Assertions.assertTrue(cancelNotified.get());
    }

    /**
     * Starts a job that sleeps for a long time and then shuts the scheduler down. The test
     * only completes if the sleeping job receives an {@link InterruptedException}, because
     * that is the only place where the second latch is released.
     */
    @Test
    @Timeout(value=60L)
    void shutDownMustKillCancelledJobs() {
        life.start();
        BinaryLatch jobStarted = new BinaryLatch();
        BinaryLatch jobInterrupted = new BinaryLatch();
        Runnable sleeper = () -> {
            try {
                jobStarted.release();
                Thread.sleep(100000L);
            } catch (InterruptedException interrupted) {
                jobInterrupted.release();
                throw new RuntimeException(interrupted);
            }
        };
        scheduler.schedule(Group.INDEX_POPULATION, JobMonitoringParams.NOT_MONITORED, sleeper);

        jobStarted.await();
        scheduler.shutdown();
        jobInterrupted.await();
    }

    /**
     * The executor handed out for {@code Group.CYPHER_WORKER} must delegate to a
     * {@link ForkJoinPool}.
     */
    @Test
    void schedulerExecutorMustBeOfTypeDefinedByGroup() {
        life.start();
        CallableExecutorService cypherExecutor = (CallableExecutorService) scheduler.executor(Group.CYPHER_WORKER);
        Object delegate = cypherExecutor.delegate();
        Assertions.assertThat(delegate).isInstanceOf(ForkJoinPool.class);
    }

    /**
     * Sets the parallelism of {@code Group.CYPHER_WORKER} to 3, submits ten jobs that track
     * how many of them are running at once, and verifies the observed peak never exceeds 3.
     */
    @Test
    void mustRespectDesiredParallelismSetPriorToPoolCreation() throws Exception {
        life.start();
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        scheduler.setParallelism(Group.CYPHER_WORKER, 3);

        Runnable worker = () -> {
            running.getAndIncrement();
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50L));
            // Raise the recorded peak to the current number of running jobs; retry on CAS failure.
            int seenRunning;
            int seenPeak;
            do {
                seenRunning = running.get();
                seenPeak = peak.get();
            } while (!peak.compareAndSet(seenPeak, Math.max(seenPeak, seenRunning)));
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50L));
            running.getAndDecrement();
        };

        List<JobHandle> handles = new ArrayList<>();
        for (int submitted = 0; submitted < 10; submitted++) {
            handles.add(scheduler.schedule(Group.CYPHER_WORKER, JobMonitoringParams.NOT_MONITORED, worker));
        }
        for (JobHandle handle : handles) {
            handle.waitTermination();
        }

        Assertions.assertThat(peak.get()).isLessThanOrEqualTo(3);
    }

    /**
     * After installing a custom thread-factory provider for {@code Group.BOLT_WORKER}, the
     * scheduler must hand back exactly the factory instance that the provider returns.
     */
    @Test
    void shouldUseProvidedThreadFactory() {
        life.start();
        SchedulerThreadFactory mockFactory = Mockito.mock(SchedulerThreadFactory.class);
        scheduler.setThreadFactory(Group.BOLT_WORKER, (group, parentThreadGroup) -> mockFactory);

        Object actualFactory = scheduler.threadFactory(Group.BOLT_WORKER);
        Assertions.assertThat(actualFactory).isSameAs((Object) mockFactory);
    }

    /**
     * Once the thread factory for {@code Group.BOLT_WORKER} has been requested, changing the
     * group's parallelism or thread factory must fail with {@link IllegalStateException}.
     */
    @Test
    void shouldThrowIfModifyingParametersAfterStart() {
        life.start();
        // Request the group's thread factory first; the setters below are then expected to fail.
        scheduler.threadFactory(Group.BOLT_WORKER);

        org.junit.jupiter.api.Assertions.assertThrows(
                IllegalStateException.class,
                () -> scheduler.setParallelism(Group.BOLT_WORKER, 2));
        org.junit.jupiter.api.Assertions.assertThrows(
                IllegalStateException.class,
                () -> scheduler.setThreadFactory(
                        Group.BOLT_WORKER, (a, b) -> Mockito.mock(SchedulerThreadFactory.class)));
    }

    /**
     * The list of active groups must be empty right after start and must contain exactly
     * {@code Group.CHECKPOINT} once a job has been run in that group.
     */
    @Test
    void shouldListActiveGroups() {
        life.start();
        List<Group> groupsBeforeScheduling =
                scheduler.activeGroups().map(ag -> ag.group).collect(Collectors.toList());
        org.junit.jupiter.api.Assertions.assertEquals(List.of(), groupsBeforeScheduling);

        BinaryLatch jobRan = new BinaryLatch();
        Runnable signalRun = jobRan::release;
        scheduler.schedule(Group.CHECKPOINT, JobMonitoringParams.NOT_MONITORED, signalRun);
        jobRan.await();

        List<Group> groupsAfterScheduling =
                scheduler.activeGroups().map(ag -> ag.group).collect(Collectors.toList());
        org.junit.jupiter.api.Assertions.assertEquals(List.of(Group.CHECKPOINT), groupsAfterScheduling);
    }

    /**
     * Parks a job of {@code Group.CHECKPOINT} on a latch, profiles that group and polls the
     * printed profile until it mentions {@code BinaryLatch.await}. The test is limited to
     * 20 seconds by {@link Timeout}.
     *
     * @throws InterruptedException if printing the profile is interrupted
     */
    @Timeout(value=20L, unit=TimeUnit.SECONDS)
    @Test
    void shouldProfileGroup() throws InterruptedException {
        life.start();
        BinaryLatch checkpointLatch = new BinaryLatch();
        scheduler.schedule(Group.CHECKPOINT, JobMonitoringParams.NOT_MONITORED, () -> checkpointLatch.await());
        Profiler profiler = Profiler.profiler();
        try {
            scheduler.profileGroup(Group.CHECKPOINT, profiler);
            String printedProfile;
            do {
                // Render the profile into a fresh buffer on every attempt.
                ByteArrayOutputStream bufferOut = new ByteArrayOutputStream();
                PrintStream out = new PrintStream(bufferOut);
                profiler.printProfile(out, "Test Title");
                out.flush();
                printedProfile = bufferOut.toString();
            } while (!printedProfile.contains("BinaryLatch.await"));
        } finally {
            // Release the parked job and finish the profiler even if polling was aborted
            // (for example by an interrupt), so nothing is left behind for later tests.
            checkpointLatch.release();
            profiler.finish();
        }
    }

    /**
     * A scheduled {@link Callable} must expose its return value through the job handle.
     */
    @Test
    void shouldPropagateResultFromCallable() throws ExecutionException, InterruptedException {
        life.start();
        Callable<Boolean> alwaysTrue = () -> true;
        JobHandle jobHandle =
                scheduler.schedule(Group.INDEX_POPULATION, JobMonitoringParams.NOT_MONITORED, alwaysTrue);
        Boolean result = (Boolean) jobHandle.get();
        org.junit.jupiter.api.Assertions.assertTrue((boolean) result);
    }

    @Test
    void scheduledTasksCanBeTheirOwnCancellationListeners() {
        this.life.start();
        final AtomicInteger cancelled = new AtomicInteger();
        class RunnableAndCancellable
        extends 1CancelCallback
        implements Runnable {
            RunnableAndCancellable() {
                class CancelCallback
                implements CancelListener {
                    final /* synthetic */ AtomicInteger val$cancelled;

                    CancelCallback() {
                        this.val$cancelled = atomicInteger;
                    }

                    public void cancelled() {
                        this.val$cancelled.incrementAndGet();
                    }
                }
                super(CentralJobSchedulerTest.this, atomicInteger);
            }

            @Override
            public void run() {
            }
        }
        this.scheduler.schedule(Group.CHECKPOINT, JobMonitoringParams.NOT_MONITORED, (Runnable)new RunnableAndCancellable()).cancel();
        Assertions.assertThat((AtomicInteger)cancelled).hasValue(1);
        cancelled.set(0);
        this.scheduler.schedule(Group.CHECKPOINT, JobMonitoringParams.NOT_MONITORED, (Runnable)new RunnableAndCancellable(), 1L, TimeUnit.SECONDS).cancel();
        Assertions.assertThat((AtomicInteger)cancelled).hasValue(1);
        cancelled.set(0);
        this.scheduler.scheduleRecurring(Group.CHECKPOINT, JobMonitoringParams.NOT_MONITORED, (Runnable)new RunnableAndCancellable(), 1L, TimeUnit.SECONDS).cancel();
        Assertions.assertThat((AtomicInteger)cancelled).hasValue(1);
        cancelled.set(0);
        this.scheduler.scheduleRecurring(Group.CHECKPOINT, JobMonitoringParams.NOT_MONITORED, (Runnable)new RunnableAndCancellable(), 1L, 1L, TimeUnit.SECONDS).cancel();
        Assertions.assertThat((AtomicInteger)cancelled).hasValue(1);
        cancelled.set(0);
        class CallableAndCancellable
        extends 1CancelCallback
        implements Callable<Void> {
            CallableAndCancellable() {
                super(CentralJobSchedulerTest.this, atomicInteger);
            }

            @Override
            public Void call() throws Exception {
                return null;
            }
        }
        this.scheduler.schedule(Group.CHECKPOINT, JobMonitoringParams.NOT_MONITORED, (Callable)new CallableAndCancellable()).cancel();
        Assertions.assertThat((AtomicInteger)cancelled).hasValue(1);
    }

    /**
     * Blocks until the counting job has been invoked at least once.
     *
     * @throws InterruptedException if the waiting thread is interrupted while sleeping
     */
    private void awaitFirstInvocation() throws InterruptedException {
        final int firstInvocation = 1;
        awaitInvocationCount(firstInvocation);
    }

    /**
     * Polls the invocation counter every 10 ms until it has reached {@code count}.
     *
     * @param count the number of invocations to wait for
     * @throws InterruptedException if the waiting thread is interrupted while sleeping
     */
    private void awaitInvocationCount(int count) throws InterruptedException {
        for (;;) {
            if (invocations.get() >= count) {
                return;
            }
            Thread.sleep(10L);
        }
    }
}

