/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.unsafe.impl.batchimport.executor;

import java.io.IOException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.neo4j.helpers.Exceptions;
import org.neo4j.test.Barrier;
import org.neo4j.test.DoubleLatch;
import org.neo4j.test.OtherThreadExecutor;
import org.neo4j.test.Race;
import org.neo4j.test.rule.RepeatRule;
import org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor;
import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy;
import org.neo4j.unsafe.impl.batchimport.executor.Task;
import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutionPanicException;
import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor;

/**
 * Tests for {@link DynamicTaskExecutor}: running tasks in parallel, changing the number of
 * processors while tasks are running, and how a failing task (a "panic") is surfaced through
 * {@code submit}, {@code assertHealthy} and {@code close}.
 *
 * <p>The tests coordinate with the executor's worker threads through latches/barriers held by the
 * test task classes declared at the bottom of this file.
 */
public class DynamicTaskExecutorTest {
    /** Park strategy handed to every executor under test; constructed with 1 millisecond. */
    private static final ParkStrategy.Park PARK = new ParkStrategy.Park(1L, TimeUnit.MILLISECONDS);

    /** Enables {@link RepeatRule.Repeat} on individual tests. */
    @Rule
    public final RepeatRule repeater = new RepeatRule();

    /**
     * With two processors a second task is expected to complete while the first one is still
     * being held on its latch.
     */
    @Test
    public void shouldExecuteTasksInParallel() throws Exception {
        // GIVEN
        DynamicTaskExecutor<Void> executor = new DynamicTaskExecutor<>(2, 0, 5, PARK, getClass().getSimpleName());

        // WHEN
        ControlledTask task1 = new ControlledTask();
        TestTask task2 = new TestTask();
        executor.submit(task1);
        task1.latch.waitForAllToStart();
        executor.submit(task2);
        // task1 is still blocked on its latch here, so task2 completing means it ran alongside task1
        awaitExecuted(task2);
        task1.latch.finish();
        awaitExecuted(task1);
        executor.close();

        // THEN
        Assert.assertEquals(1, task1.executed);
        Assert.assertEquals(1, task2.executed);
    }

    /**
     * Starting with one processor, adding one more while the first task is held on its latch is
     * expected to let the second task complete.
     */
    @Test
    public void shouldIncrementNumberOfProcessorsWhenRunning() throws Exception {
        // GIVEN
        DynamicTaskExecutor<Void> executor = new DynamicTaskExecutor<>(1, 0, 5, PARK, getClass().getSimpleName());

        // WHEN
        ControlledTask task1 = new ControlledTask();
        TestTask task2 = new TestTask();
        executor.submit(task1);
        task1.latch.waitForAllToStart();
        executor.submit(task2);
        executor.processors(1); // request one additional processor
        // task1 is still blocked, so task2 can only complete on the additional processor
        awaitExecuted(task2);
        task1.latch.finish();
        awaitExecuted(task1);
        executor.close();

        // THEN
        Assert.assertEquals(1, task1.executed);
        Assert.assertEquals(1, task2.executed);
    }

    /**
     * Starting with two processors and removing one while tasks are running: once the two initial
     * tasks finish, only one of the two queued tasks is expected to run at a time.
     */
    @Test
    public void shouldDecrementNumberOfProcessorsWhenRunning() throws Exception {
        // GIVEN
        DynamicTaskExecutor<Void> executor = new DynamicTaskExecutor<>(2, 0, 5, PARK, getClass().getSimpleName());

        // WHEN two tasks occupy both processors and two more are queued behind them
        ControlledTask task1 = new ControlledTask();
        ControlledTask task2 = new ControlledTask();
        ControlledTask task3 = new ControlledTask();
        TestTask task4 = new TestTask();
        executor.submit(task1);
        executor.submit(task2);
        task1.latch.waitForAllToStart();
        task2.latch.waitForAllToStart();
        executor.submit(task3);
        executor.submit(task4);
        executor.processors(-1); // request one processor fewer
        task1.latch.finish();
        task2.latch.finish();
        task3.latch.waitForAllToStart();

        // THEN task4 must not run while task3 is still held on its latch.
        // The sleep gives a (wrongly) still-alive second processor a chance to pick up task4.
        Thread.sleep(200L);
        Assert.assertEquals(0, task4.executed);
        task3.latch.finish();
        executor.close();
        Assert.assertEquals(1, task1.executed);
        Assert.assertEquals(1, task2.executed);
        Assert.assertEquals(1, task3.executed);
        Assert.assertEquals(1, task4.executed);
    }

    /** Submits 1000 short-sleeping tasks and expects each to have run exactly once after close. */
    @Test
    public void shouldExecuteMultipleTasks() throws Exception {
        // GIVEN
        DynamicTaskExecutor<Void> executor = new DynamicTaskExecutor<>(30, 0, 5, PARK, getClass().getSimpleName());

        // WHEN
        ExpensiveTask[] tasks = new ExpensiveTask[1000];
        for (int i = 0; i < tasks.length; ++i) {
            tasks[i] = new ExpensiveTask(10);
            executor.submit(tasks[i]);
        }
        executor.close();

        // THEN
        for (ExpensiveTask task : tasks) {
            Assert.assertEquals(1, task.executed);
        }
    }

    /** After a task has thrown, a later submit is expected to fail with that exception as cause. */
    @Test
    public void shouldShutDownOnTaskFailure() throws Exception {
        // GIVEN
        DynamicTaskExecutor<Void> executor = new DynamicTaskExecutor<>(30, 0, 5, PARK, getClass().getSimpleName());

        // WHEN
        IOException exception = new IOException("Test message");
        FailingTask task = new FailingTask(exception);
        executor.submit(task);
        task.latch.await();
        task.latch.release();

        // THEN
        assertExceptionOnSubmit(executor, exception);
        // Close so the executor is not left behind after the test; the sibling test
        // shouldShutDownOnTaskFailureEvenIfOtherTasksArePending closes after a panic the same way.
        executor.close();
    }

    /**
     * Same as {@link #shouldShutDownOnTaskFailure()}, but with other tasks still blocked or queued
     * at the time of the failure; close is expected to return nevertheless.
     */
    @Test
    public void shouldShutDownOnTaskFailureEvenIfOtherTasksArePending() throws Exception {
        // GIVEN two blocking tasks occupying both processors
        DynamicTaskExecutor<Void> executor = new DynamicTaskExecutor<>(2, 0, 10, PARK, getClass().getSimpleName());
        IOException exception = new IOException("Test message");
        ControlledTask firstBlockingTask = new ControlledTask();
        ControlledTask secondBlockingTask = new ControlledTask();
        executor.submit(firstBlockingTask);
        executor.submit(secondBlockingTask);
        firstBlockingTask.latch.waitForAllToStart();
        secondBlockingTask.latch.waitForAllToStart();

        // WHEN a failing task and yet another blocking task are queued behind them
        FailingTask failingTask = new FailingTask(exception);
        executor.submit(failingTask);
        ControlledTask thirdBlockingTask = new ControlledTask();
        executor.submit(thirdBlockingTask);

        // and one processor is freed so that the failing task gets to run
        firstBlockingTask.latch.finish();
        failingTask.latch.await();
        failingTask.latch.release();

        // THEN
        assertExceptionOnSubmit(executor, exception);
        executor.close(); // called while secondBlockingTask is still held on its latch
        secondBlockingTask.latch.finish();
    }

    /**
     * After a task has thrown, {@code assertHealthy} is expected to throw something containing the
     * task's exception (checked by message and class) within five attempts, 100 ms apart.
     */
    @Test
    public void shouldSurfaceTaskErrorInAssertHealthy() throws Exception {
        // GIVEN
        DynamicTaskExecutor<Void> executor = new DynamicTaskExecutor<>(2, 0, 10, PARK, getClass().getSimpleName());
        IOException exception = new IOException("Failure");

        // WHEN
        FailingTask failingTask = new FailingTask(exception);
        executor.submit(failingTask);
        failingTask.latch.await();
        failingTask.latch.release();

        // THEN
        try {
            for (int i = 0; i < 5; ++i) {
                try {
                    executor.assertHealthy();
                } catch (Exception e) {
                    Assert.assertTrue(Exceptions.contains(e, exception.getMessage(), new Class[]{exception.getClass()}));
                    return;
                }
                // Not yet noticed; the sleep is outside the try so that an interrupt is reported
                // as such instead of being mistaken for the expected health failure.
                Thread.sleep(100L);
            }
            Assert.fail("Should not be considered healthy after failing task");
        } finally {
            // Do not leave the executor behind, whichever way the test ends.
            executor.close();
        }
    }

    /**
     * A close that is already in progress on another thread is expected to complete once the
     * failing task is released and its exception reaches the executor.
     */
    @Test
    public void shouldLetShutdownCompleteInEventOfPanic() throws Exception {
        // GIVEN a failing task that is parked at its barrier, about to propagate its exception
        DynamicTaskExecutor<Void> executor = new DynamicTaskExecutor<>(2, 0, 10, PARK, getClass().getSimpleName());
        IOException exception = new IOException("Failure");
        FailingTask failingTask = new FailingTask(exception);
        executor.submit(failingTask);
        failingTask.latch.await();

        // WHEN close is invoked from another thread
        try (OtherThreadExecutor<Void> closer = new OtherThreadExecutor<>("closer", null)) {
            Future<Void> shutdown = closer.executeDontWait(state -> {
                executor.close();
                return null;
            });
            // wait until the closer thread is observed waiting inside DynamicTaskExecutor.close
            while (!closer.waitUntilWaiting().isAt(DynamicTaskExecutor.class, "close")) {
                Thread.sleep(10L);
            }

            // THEN releasing the failing task must let the pending close return
            failingTask.latch.release();
            shutdown.get();
        }
    }

    /**
     * {@code processors(delta)} is expected to return the resulting processor count, kept within
     * 1 and the maximum given at construction.
     */
    @Test
    public void shouldRespectMaxProcessors() throws Exception {
        // GIVEN
        int maxProcessors = 4;
        DynamicTaskExecutor<Void> executor = new DynamicTaskExecutor<>(1, maxProcessors, 10, PARK, getClass().getSimpleName());

        // WHEN/THEN
        Assert.assertEquals(1, executor.processors(0));  // initial count
        Assert.assertEquals(2, executor.processors(1));  // 1 + 1
        Assert.assertEquals(4, executor.processors(3));  // 2 + 3 would be 5, expected 4
        Assert.assertEquals(4, executor.processors(0));
        Assert.assertEquals(4, executor.processors(1));  // already at the maximum
        Assert.assertEquals(3, executor.processors(-1)); // 4 - 1
        Assert.assertEquals(1, executor.processors(-2)); // 3 - 2
        Assert.assertEquals(1, executor.processors(-2)); // expected not to go below 1
        Assert.assertEquals(1, executor.processors(0));
        executor.close();
    }

    /**
     * Races {@code close()} against {@code processors(1)}; repeated 10 times. The test passes if
     * the race completes within 10 seconds without either contestant failing.
     */
    @RepeatRule.Repeat(times=10)
    @Test
    public void shouldCopeWithConcurrentIncrementOfProcessorsAndShutdown() throws Throwable {
        // GIVEN
        DynamicTaskExecutor<Void> executor = new DynamicTaskExecutor<>(1, 2, 2, PARK, "test");
        Race race = new Race().withRandomStartDelays();
        race.addContestant(() -> executor.close());
        race.addContestant(() -> executor.processors(1));

        // WHEN/THEN
        race.go(10L, TimeUnit.SECONDS);
    }

    /**
     * A panic handed to the executor directly is expected to make {@code assertHealthy} throw a
     * {@link TaskExecutionPanicException} caused by that panic, both before and after close.
     */
    @Test
    public void shouldNoticeBadHealthBeforeBeingClosed() throws Exception {
        // GIVEN
        DynamicTaskExecutor<Void> executor = new DynamicTaskExecutor<>(1, 2, 2, PARK, "test");
        RuntimeException panic = new RuntimeException("My failure");

        // WHEN
        executor.receivePanic(panic);

        // THEN
        try {
            executor.assertHealthy();
            Assert.fail("Should have failed");
        } catch (TaskExecutionPanicException e) {
            Assert.assertSame(panic, e.getCause());
        }

        // WHEN
        executor.close();

        // THEN
        try {
            executor.assertHealthy();
            Assert.fail("Should have failed");
        } catch (TaskExecutionPanicException e) {
            Assert.assertSame(panic, e.getCause());
        }
    }

    /**
     * Submits empty tasks, up to five times with 100 ms in between, until a submit throws, then
     * asserts that the cause of what was thrown equals {@code exception}.
     *
     * @param executor the executor expected to have noticed a task failure
     * @param exception the exception the failed task threw
     */
    private void assertExceptionOnSubmit(TaskExecutor<Void> executor, IOException exception) {
        Throwable submitException = null;
        for (int i = 0; i < 5 && submitException == null; ++i) {
            try {
                executor.submit(new EmptyTask());
                // submit went through, so the failure has not been noticed yet; retry shortly
                Thread.sleep(100L);
            } catch (Exception e) {
                submitException = e;
            }
        }
        Assert.assertNotNull(submitException);
        Assert.assertEquals(exception, submitException.getCause());
    }

    /**
     * Spins until {@code task} has run at least once. There is no timeout, so this never returns
     * if the task is never executed.
     *
     * @param task the task to wait for
     */
    private static void awaitExecuted(TestTask task) {
        while (task.executed == 0) {
            // busy-wait; 'executed' is volatile, so the worker thread's update is seen here
        }
    }

    /** A {@link TestTask} that goes through its {@link DoubleLatch} before counting as executed. */
    private static class ControlledTask
    extends TestTask {
        private final DoubleLatch latch = new DoubleLatch();

        @Override
        public void run(Void nothing) {
            // The tests use latch.waitForAllToStart() to learn that this task is running and
            // latch.finish() to let it proceed to the counter increment below.
            this.latch.startAndWaitForAllToStartAndFinish();
            super.run(nothing);
        }
    }

    /** A {@link TestTask} that sleeps for a given number of milliseconds before counting. */
    private static class ExpensiveTask
    extends TestTask {
        private final int millis;

        ExpensiveTask(int millis) {
            this.millis = millis;
        }

        @Override
        public void run(Void nothing) {
            try {
                Thread.sleep(this.millis);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            super.run(nothing);
        }
    }

    /**
     * A task that always throws the exception it was created with. Before the exception leaves
     * {@code run}, the task reports to its {@link Barrier.Control}; the tests call
     * {@code latch.await()} and {@code latch.release()} around that point.
     */
    private static class FailingTask
    implements Task<Void> {
        private final Exception exception;
        private final Barrier.Control latch = new Barrier.Control();

        FailingTask(Exception exception) {
            this.exception = exception;
        }

        @Override
        public void run(Void nothing) throws Exception {
            try {
                throw this.exception;
            } finally {
                // runs before the exception propagates to the caller of run
                this.latch.reached();
            }
        }
    }

    /** A task that does nothing; used only to probe whether submit still accepts tasks. */
    private static class EmptyTask
    implements Task<Void> {
        @Override
        public void run(Void nothing) throws Exception {
        }
    }

    /** A task that counts how many times it has been run. */
    private static class TestTask
    implements Task<Void> {
        /** Number of completed runs; volatile so the test thread sees updates from worker threads. */
        protected volatile int executed;

        @Override
        public void run(Void nothing) {
            // Not an atomic increment; sufficient here because each task instance is submitted once.
            ++this.executed;
        }
    }
}

