/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.tests.context;

import io.vertx.core.impl.TaskQueue;
import io.vertx.core.impl.WorkerExecutor;
import io.vertx.test.core.AsyncTestBase;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.Assertions;
import org.junit.Test;

public class TaskQueueTest
extends AsyncTestBase {
    private TaskQueue taskQueue;
    private Executor executor;
    private List<Thread> threads = Collections.synchronizedList(new ArrayList());

    @Override
    protected void setUp() throws Exception {
        super.setUp();
        this.taskQueue = new TaskQueue();
        AtomicInteger idx = new AtomicInteger();
        this.executor = cmd -> new Thread(cmd, "vert.x-" + idx.getAndIncrement()).start();
    }

    @Override
    protected void tearDown() throws Exception {
        try {
            for (int i = 0; i < this.threads.size(); ++i) {
                this.threads.get(i).join();
            }
        }
        finally {
            this.threads.clear();
        }
        super.tearDown();
    }

    private void suspendAndAwaitResume(CountDownLatch suspend) {
        try {
            suspend.await();
        }
        catch (InterruptedException e) {
            this.fail(e);
        }
    }

    @Test
    public void testCreateThread() throws Exception {
        AtomicReference thread = new AtomicReference();
        this.taskQueue.execute(() -> thread.set(Thread.currentThread()), this.executor);
        TaskQueueTest.waitUntil(() -> thread.get() != null);
        Thread.sleep(10L);
        this.taskQueue.execute(() -> {
            this.assertNotSame(thread.get(), Thread.currentThread());
            this.testComplete();
        }, this.executor);
        this.await();
    }

    @Test
    public void testAwaitSchedulesOnNewThread() {
        this.taskQueue.execute(() -> {
            Thread current = Thread.currentThread();
            this.taskQueue.execute(() -> {
                this.assertNotSame(current, Thread.currentThread());
                this.testComplete();
            }, this.executor);
            CountDownLatch suspend = this.taskQueue.current().trySuspend();
            this.suspendAndAwaitResume(suspend);
        }, this.executor);
        this.await();
    }

    @Test
    public void testResumeFromAnotherThread() {
        this.taskQueue.execute(() -> {
            WorkerExecutor.Execution execution = this.taskQueue.current();
            new Thread(() -> {
                try {
                    Thread.sleep(100L);
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                execution.resume();
            }).start();
            CountDownLatch suspend = execution.trySuspend();
            this.suspendAndAwaitResume(suspend);
            this.testComplete();
        }, this.executor);
        this.await();
    }

    @Test
    public void testResumeFromContextThread() {
        this.taskQueue.execute(() -> {
            WorkerExecutor.Execution execution = this.taskQueue.current();
            CountDownLatch suspend = execution.trySuspend();
            this.taskQueue.execute(() -> {
                try {
                    Thread.sleep(100L);
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                execution.resume();
            }, this.executor);
            this.suspendAndAwaitResume(suspend);
            this.testComplete();
        }, this.executor);
        this.await();
    }

    @Test
    public void testResumeWhenIdle() {
        this.taskQueue.execute(() -> {
            AtomicReference ref = new AtomicReference();
            WorkerExecutor.Execution execution = this.taskQueue.current();
            new Thread(() -> {
                Thread th;
                while ((th = (Thread)ref.get()) == null) {
                    try {
                        Thread.sleep(1L);
                    }
                    catch (InterruptedException interruptedException) {}
                }
                try {
                    th.join(2000L);
                }
                catch (InterruptedException ignore) {
                    ignore.printStackTrace(System.out);
                }
                execution.resume();
            }).start();
            CountDownLatch cond = execution.trySuspend();
            this.taskQueue.execute(() -> ref.set(Thread.currentThread()), this.executor);
            this.suspendAndAwaitResume(cond);
            this.testComplete();
        }, this.executor);
        this.await();
    }

    @Test
    public void testUnscheduleRace2() {
        AtomicInteger seq = new AtomicInteger();
        this.taskQueue.execute(() -> {
            this.assertEquals("vert.x-0", Thread.currentThread().getName());
            CompletableFuture cf = new CompletableFuture();
            this.taskQueue.execute(() -> {
                this.assertEquals("vert.x-0", Thread.currentThread().getName());
                this.assertEquals(0L, seq.getAndIncrement());
                WorkerExecutor.Execution execution = this.taskQueue.current();
                cf.whenComplete((v, e) -> execution.resume(() -> {
                    this.assertEquals("vert.x-1", Thread.currentThread().getName());
                    this.assertEquals(2L, seq.getAndIncrement());
                }));
                this.suspendAndAwaitResume(execution.trySuspend());
                this.assertEquals(3L, seq.getAndIncrement());
            }, this.executor);
            AtomicBoolean enqueued = new AtomicBoolean();
            this.taskQueue.execute(() -> {
                this.assertEquals("vert.x-1", Thread.currentThread().getName());
                this.assertEquals(1L, seq.getAndIncrement());
                while (!enqueued.get()) {
                }
                cf.complete(null);
            }, this.executor);
            this.taskQueue.execute(() -> {
                this.assertEquals("vert.x-0", Thread.currentThread().getName());
                this.assertEquals(4L, seq.getAndIncrement());
                this.testComplete();
            }, this.executor);
            enqueued.set(true);
        }, this.executor);
        this.await();
    }

    @Test
    public void shouldNotHaveTaskInQueueWhenTaskHasBeenRejected() {
        Executor executorThatAlwaysThrowsRejectedExceptions = command -> {
            throw new RejectedExecutionException();
        };
        TaskQueue taskQueue = new TaskQueue();
        Assertions.assertThatThrownBy(() -> taskQueue.execute(this::fail, executorThatAlwaysThrowsRejectedExceptions)).isInstanceOf(RejectedExecutionException.class);
        Assertions.assertThat((boolean)taskQueue.isEmpty()).isTrue();
    }

    @Test
    public void testCloseSuspendedTasks() {
        TaskQueue taskQueue = new TaskQueue();
        ConcurrentLinkedDeque pending = new ConcurrentLinkedDeque();
        Executor executor = pending::add;
        Runnable task = () -> {
            CountDownLatch latch = taskQueue.current().trySuspend();
        };
        taskQueue.execute(task, executor);
        this.assertEquals(1L, pending.size());
        ((Runnable)pending.pop()).run();
        TaskQueue.CloseResult result = taskQueue.close();
        this.assertEquals(1L, result.suspendedTasks().size());
        this.assertEquals(1L, result.suspendedThreads().size());
        this.assertSame(task, result.suspendedTasks().get(0));
    }

    @Test
    public void testCloseResumingTasks() {
        TaskQueue taskQueue = new TaskQueue();
        ConcurrentLinkedDeque pending = new ConcurrentLinkedDeque();
        Executor executor = pending::add;
        AtomicReference ref = new AtomicReference();
        Runnable task = () -> {
            WorkerExecutor.Execution t = taskQueue.current();
            ref.set(t);
            t.trySuspend();
        };
        taskQueue.execute(task, executor);
        this.assertEquals(1L, pending.size());
        taskQueue.execute(() -> {}, command -> {});
        ((Runnable)pending.pop()).run();
        ((WorkerExecutor.Execution)ref.get()).resume();
        TaskQueue.CloseResult result = taskQueue.close();
        this.assertEquals(1L, result.suspendedTasks().size());
        this.assertEquals(1L, result.suspendedThreads().size());
        this.assertSame(task, result.suspendedTasks().get(0));
    }

    @Test
    public void testCloseBeforeSuspend() {
        TaskQueue taskQueue = new TaskQueue();
        ConcurrentLinkedDeque pending = new ConcurrentLinkedDeque();
        Executor exec = pending::add;
        AtomicReference result = new AtomicReference();
        taskQueue.execute(() -> {
            Thread th = new Thread(() -> result.set(taskQueue.close()));
            th.start();
            try {
                th.join();
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            WorkerExecutor.Execution execution = taskQueue.current();
            CountDownLatch cont = execution.trySuspend();
            this.assertNull(cont);
        }, exec);
        Runnable t = (Runnable)pending.pop();
        t.run();
        this.assertTrue(((TaskQueue.CloseResult)result.get()).suspendedThreads().isEmpty());
        this.assertNotNull(((TaskQueue.CloseResult)result.get()).activeThread());
    }

    @Test
    public void testCloseBeforeResumeExecution() {
        TaskQueue taskQueue = new TaskQueue();
        ConcurrentLinkedDeque pending = new ConcurrentLinkedDeque();
        Executor exec = pending::add;
        taskQueue.execute(() -> {
            WorkerExecutor.Execution execution = taskQueue.current();
            execution.resume();
            this.assertNull(execution.trySuspend());
            taskQueue.close();
        }, exec);
        Runnable t = (Runnable)pending.pop();
        t.run();
        this.assertEquals(0L, pending.size());
    }

    @Test
    public void testCloseBetweenSuspendAndAwait() {
        TaskQueue taskQueue = new TaskQueue();
        ConcurrentLinkedDeque pending = new ConcurrentLinkedDeque();
        Executor exec = pending::add;
        AtomicBoolean interrupted = new AtomicBoolean();
        taskQueue.execute(() -> {
            WorkerExecutor.Execution execution = taskQueue.current();
            CountDownLatch latch = execution.trySuspend();
            AtomicBoolean closed = new AtomicBoolean();
            Thread th = new Thread(() -> {
                TaskQueue.CloseResult res = taskQueue.close();
                ((Thread)res.suspendedThreads().get(0)).interrupt();
                closed.set(true);
            });
            th.start();
            while (!closed.get()) {
                Thread.yield();
            }
            try {
                latch.await();
            }
            catch (InterruptedException e) {
                interrupted.set(true);
            }
        }, exec);
        Runnable t = (Runnable)pending.pop();
        t.run();
        this.assertTrue(interrupted.get());
    }

    @Test
    public void testSubmitAfterClose() {
        TaskQueue taskQueue = new TaskQueue();
        taskQueue.close();
        ConcurrentLinkedDeque pending = new ConcurrentLinkedDeque();
        Executor exec = pending::add;
        taskQueue.execute(() -> {}, exec);
        this.assertEquals(1L, pending.size());
    }

    @Test
    public void testSuspendAfterResume() {
        AtomicInteger seq = new AtomicInteger();
        TaskQueue taskQueue = new TaskQueue();
        ConcurrentLinkedDeque pending = new ConcurrentLinkedDeque();
        Executor exec = pending::add;
        taskQueue.execute(() -> {
            this.assertEquals(0L, seq.getAndIncrement());
            taskQueue.execute(() -> this.assertEquals(2L, seq.getAndIncrement()), this.executor);
            WorkerExecutor.Execution execution = taskQueue.current();
            this.assertEquals(1L, seq.getAndIncrement());
            execution.resume();
            CountDownLatch latch = execution.trySuspend();
            this.assertNull(latch);
            try {
                Thread.sleep(5L);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.fail();
            }
            this.assertEquals(2L, seq.get());
        }, exec);
        ((Runnable)pending.poll()).run();
        this.assertTrue(seq.get() >= 2);
    }
}

