/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.tests.worker;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Context;
import io.vertx.core.Deployable;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.ThreadingModel;
import io.vertx.core.VerticleBase;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.WorkerExecutor;
import io.vertx.core.impl.VertxThread;
import io.vertx.test.core.TestUtils;
import io.vertx.test.core.VertxTestBase;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;

public class NamedWorkerPoolTest
extends VertxTestBase {
    /**
     * Runs one task on a shared worker executor created with a random pool name and checks the
     * thread it ran on: a Vert.x thread, a worker thread, not an event-loop thread, and named
     * with the {@code poolName + "-"} prefix.
     */
    @Test
    public void testThread() {
        String poolName = TestUtils.randomAlphaString(10);
        WorkerExecutor worker = vertx.createSharedWorkerExecutor(poolName);
        AtomicBoolean onVertxThread = new AtomicBoolean();
        AtomicBoolean onWorkerThread = new AtomicBoolean();
        AtomicBoolean onEventLoopThread = new AtomicBoolean();
        AtomicReference<String> threadName = new AtomicReference<>();
        worker.executeBlocking(() -> {
            onVertxThread.set(Context.isOnVertxThread());
            onWorkerThread.set(Context.isOnWorkerThread());
            onEventLoopThread.set(Context.isOnEventLoopThread());
            // Set last: the main thread polls this reference, so the flags above are
            // already recorded by the time it becomes non-null.
            threadName.set(Thread.currentThread().getName());
            return null;
        }).onComplete(ar -> testComplete());
        // The checks run on the test thread once the task has published its thread name.
        assertWaitUntil(() -> threadName.get() != null);
        assertTrue(onVertxThread.get());
        assertTrue(onWorkerThread.get());
        assertFalse(onEventLoopThread.get());
        assertTrue(threadName.get().startsWith(poolName + "-"));
    }

    /**
     * Submits 1000 tasks to a shared worker executor from a single context (default ordering)
     * and asserts that every task runs on the same thread as the first one, and that this
     * thread's name starts with {@code poolName + "-"}.
     */
    @Test
    public void testOrdered() {
        String poolName = "vert.x-" + TestUtils.randomAlphaString(10);
        WorkerExecutor worker = vertx.createSharedWorkerExecutor(poolName);
        int num = 1000;
        AtomicReference<Thread> firstThread = new AtomicReference<>();
        CountDownLatch submitted = new CountDownLatch(1);
        Context ctx = vertx.getOrCreateContext();
        ctx.runOnContext(v -> {
            for (int i = 0; i < num; ++i) {
                boolean first = i == 0;
                boolean last = i == num - 1;
                worker.executeBlocking(() -> {
                    if (first) {
                        try {
                            // Hold the first task until the submitting loop has finished.
                            awaitLatch(submitted);
                        } catch (InterruptedException e) {
                            fail(e);
                            return null;
                        }
                        // No other task may have recorded a thread before the first one.
                        assertNull(firstThread.get());
                        firstThread.set(Thread.currentThread());
                    } else {
                        assertEquals(firstThread.get(), Thread.currentThread());
                    }
                    assertTrue(Thread.currentThread().getName().startsWith(poolName + "-"));
                    return null;
                }).onComplete(ar -> {
                    if (last) {
                        testComplete();
                    }
                });
            }
            submitted.countDown();
        });
        await();
    }

    /**
     * Submits five unordered tasks ({@code ordered = false}) from one context and requires all
     * of them to have started before any is allowed to finish. Each task also checks that its
     * thread name starts with {@code poolName + "-"}.
     */
    @Test
    public void testUnordered() throws Exception {
        String poolName = "vert.x-" + TestUtils.randomAlphaString(10);
        int taskCount = 5;
        waitFor(taskCount);
        WorkerExecutor executor = vertx.createSharedWorkerExecutor(poolName);
        // Counted down once per task as soon as it starts running.
        CountDownLatch allStarted = new CountDownLatch(taskCount);
        // Released by the test thread once every task has started.
        CountDownLatch release = new CountDownLatch(1);
        Context context = vertx.getOrCreateContext();
        context.runOnContext(ignored -> {
            int submittedSoFar = 0;
            while (submittedSoFar < taskCount) {
                executor.executeBlocking(() -> {
                    allStarted.countDown();
                    try {
                        awaitLatch(release);
                    } catch (InterruptedException interrupted) {
                        fail(interrupted);
                        return null;
                    }
                    String runningOn = Thread.currentThread().getName();
                    assertTrue(runningOn.startsWith(poolName + "-"));
                    return null;
                }, false).onComplete(result -> complete());
                submittedSoFar++;
            }
        });
        awaitLatch(allStarted);
        release.countDown();
        await();
    }

    /**
     * Obtains ten executors for the same pool name, submits one ordered task to each from a
     * single context, and asserts that every task runs on the thread that ran the first one.
     * All executors are closed (and the close awaited) when the test ends, pass or fail.
     */
    @Test
    public void testUseDifferentExecutorWithSameTaskQueue() {
        int count = 10;
        waitFor(count);
        WorkerExecutor[] executors = new WorkerExecutor[count];
        for (int i = 0; i < count; ++i) {
            executors[i] = vertx.createSharedWorkerExecutor("vert.x-the-executor");
        }
        vertx.runOnContext(v1 -> {
            AtomicReference<Thread> currentThread = new AtomicReference<>();
            CountDownLatch latch = new CountDownLatch(1);
            for (int i = 0; i < count; ++i) {
                int val = i;
                executors[i].executeBlocking(() -> {
                    Thread current = Thread.currentThread();
                    if (val == 0) {
                        // The first task records its thread and blocks until released.
                        assertNull(currentThread.getAndSet(current));
                        awaitLatch(latch);
                    } else {
                        assertSame("i = " + val, current, currentThread.get());
                    }
                    return null;
                }, true).onComplete(onSuccess(v2 -> complete()));
                // Only the first countDown has an effect (latch count is 1).
                // NOTE(review): presumably this handler runs after the current one returns,
                // i.e. after all tasks are submitted — confirm against runOnContext semantics.
                vertx.runOnContext(v -> latch.countDown());
            }
        });
        try {
            await();
        } finally {
            for (WorkerExecutor exec : executors) {
                exec.close().await();
            }
        }
    }

    /**
     * Submits {@code poolSize * 100} unordered tasks to a shared worker executor created with
     * an explicit pool size and asserts that the number of distinct thread names observed
     * equals that pool size.
     */
    @Test
    public void testPoolSize() throws Exception {
        String poolName = "vert.x-" + TestUtils.randomAlphaString(10);
        int poolSize = 5;
        int taskCount = poolSize * 100;
        // NOTE(review): no await()/complete() follows in this test, so this waitFor looks
        // unused — confirm before removing.
        waitFor(poolSize);
        WorkerExecutor worker = vertx.createSharedWorkerExecutor(poolName, poolSize);
        CountDownLatch latch1 = new CountDownLatch(taskCount);
        // Written from worker threads, read from the test thread.
        Set<String> names = Collections.synchronizedSet(new HashSet<>());
        for (int i = 0; i < taskCount; ++i) {
            worker.executeBlocking(() -> {
                names.add(Thread.currentThread().getName());
                latch1.countDown();
                return null;
            }, false);
        }
        awaitLatch(latch1);
        // Compare against poolSize rather than a repeated literal so the two cannot drift.
        assertEquals(poolSize, names.size());
    }

    /**
     * Creates a dedicated Vertx instance whose options set the max worker execute time to
     * 60 seconds, creates a shared worker executor from it (name and pool size only) and
     * verifies the values reported by the worker thread via
     * {@link #testMaxExecuteTime(WorkerExecutor, long, TimeUnit)}.
     */
    @Test
    public void testMaxExecuteTime1() {
        String poolName = "vert.x-" + TestUtils.randomAlphaString(10);
        int poolSize = 5;
        long maxExecuteTime = 60L;
        TimeUnit maxExecuteTimeUnit = TimeUnit.SECONDS;
        Vertx vertx = Vertx.vertx(new VertxOptions()
            .setMaxWorkerExecuteTime(maxExecuteTime)
            .setMaxWorkerExecuteTimeUnit(maxExecuteTimeUnit));
        // Vertx.close() returns a future, so Vertx cannot be an AutoCloseable resource;
        // use try/finally instead of try-with-resources. The close is not awaited here.
        try {
            testMaxExecuteTime(vertx.createSharedWorkerExecutor(poolName, poolSize), maxExecuteTime, maxExecuteTimeUnit);
        } finally {
            vertx.close();
        }
    }

    /**
     * Same as {@code testMaxExecuteTime1}, except that the executor is created with an
     * explicit max execute time (60) and no explicit unit. The helper still expects the
     * worker thread to report 60 / {@link TimeUnit#SECONDS}, the values set in the Vertx
     * options.
     */
    @Test
    public void testMaxExecuteTime2() {
        String poolName = "vert.x-" + TestUtils.randomAlphaString(10);
        int poolSize = 5;
        long maxExecuteTime = 60L;
        TimeUnit maxExecuteTimeUnit = TimeUnit.SECONDS;
        Vertx vertx = Vertx.vertx(new VertxOptions()
            .setMaxWorkerExecuteTime(maxExecuteTime)
            .setMaxWorkerExecuteTimeUnit(maxExecuteTimeUnit));
        // Vertx.close() returns a future, so Vertx cannot be an AutoCloseable resource;
        // use try/finally instead of try-with-resources. The close is not awaited here.
        try {
            testMaxExecuteTime(vertx.createSharedWorkerExecutor(poolName, poolSize, maxExecuteTime), maxExecuteTime, maxExecuteTimeUnit);
        } finally {
            vertx.close();
        }
    }

    /**
     * Creates a shared worker executor on the test's own Vertx instance with an explicit max
     * execute time and unit (60 seconds), then checks what the worker thread reports via
     * {@link #testMaxExecuteTime(WorkerExecutor, long, TimeUnit)}.
     */
    @Test
    public void testMaxExecuteTime3() {
        String name = "vert.x-" + TestUtils.randomAlphaString(10);
        int threads = 5;
        long limit = 60L;
        TimeUnit limitUnit = TimeUnit.SECONDS;
        WorkerExecutor executor = vertx.createSharedWorkerExecutor(name, threads, limit, limitUnit);
        testMaxExecuteTime(executor, limit, limitUnit);
    }

    /**
     * Shared check for the {@code testMaxExecuteTimeN} tests: runs one task on {@code worker}
     * and asserts that it executes on a {@link VertxThread} reporting the given max execute
     * time and unit. Blocks until the task's future has completed.
     *
     * @param worker             executor to run the probe task on
     * @param maxExecuteTime     expected value of {@code VertxThread.maxExecTime()}
     * @param maxExecuteTimeUnit expected value of {@code VertxThread.maxExecTimeUnit()}
     */
    public void testMaxExecuteTime(WorkerExecutor worker, long maxExecuteTime, TimeUnit maxExecuteTimeUnit) {
        Future<Object> probe = worker.executeBlocking(() -> {
            Thread running = Thread.currentThread();
            assertTrue(running instanceof VertxThread);
            VertxThread vertxThread = (VertxThread) running;
            assertEquals(maxExecuteTime, vertxThread.maxExecTime());
            assertEquals((Object) maxExecuteTimeUnit, (Object) vertxThread.maxExecTimeUnit());
            return null;
        });
        // Completion (success or failure) ends the test.
        probe.onComplete(outcome -> testComplete());
        await();
    }

    /**
     * Creates two executors for the same pool name, runs a task through the first to capture
     * a pool thread, and asserts that the thread is not terminated after closing only the
     * first executor but does terminate once the second is closed as well.
     */
    @Test
    public void testCloseWorkerPool() throws Exception {
        String poolName = "vert.x-" + TestUtils.randomAlphaString(10);
        AtomicReference<Thread> thread = new AtomicReference<>();
        WorkerExecutor worker1 = vertx.createSharedWorkerExecutor(poolName);
        WorkerExecutor worker2 = vertx.createSharedWorkerExecutor(poolName);
        worker1.executeBlocking(() -> {
            thread.set(Thread.currentThread());
            return null;
        });
        assertWaitUntil(() -> thread.get() != null);
        worker1.close();
        // worker2 is still open at this point.
        assertNotSame(thread.get().getState(), Thread.State.TERMINATED);
        worker2.close();
        assertWaitUntil(() -> thread.get().getState() == Thread.State.TERMINATED);
    }

    /**
     * Deploys a verticle that creates a shared worker executor in {@code start()}, undeploys
     * it, and asserts that submitting a task to that executor afterwards throws
     * {@link RejectedExecutionException}.
     */
    @Test
    public void testDestroyWorkerPoolWhenVerticleUndeploys() throws Exception {
        String poolName = "vert.x-" + TestUtils.randomAlphaString(10);
        CompletableFuture<String> deploymentIdRef = new CompletableFuture<>();
        AtomicReference<WorkerExecutor> pool = new AtomicReference<>();
        vertx.deployVerticle(new AbstractVerticle() {
            @Override
            public void start() throws Exception {
                pool.set(vertx.createSharedWorkerExecutor(poolName));
            }
        }).onComplete(onSuccess(deploymentIdRef::complete));
        String deploymentId = deploymentIdRef.get(20L, TimeUnit.SECONDS);
        vertx.undeploy(deploymentId).onComplete(onSuccess(v -> {
            try {
                pool.get().executeBlocking(() -> {
                    // The task must never run.
                    fail();
                    return null;
                });
                // Reaching this line means the submission was accepted.
                fail();
            } catch (RejectedExecutionException expected) {
                testComplete();
            }
        }));
        await();
    }

    /**
     * Deploys a verticle with a worker pool name set in its deployment options. From its
     * context the verticle calls {@code executeBlocking} and asserts the task runs on a
     * Vert.x worker thread (not an event loop) whose name starts with {@code poolName + "-"}.
     * The verticle then undeploys itself, and the test waits up to 20 seconds for that thread
     * to reach {@link Thread.State#TERMINATED}.
     */
    @Test
    public void testDeployUsingNamedPool() throws Exception {
        String poolName = "vert.x-" + TestUtils.randomAlphaString(10);
        // Completed with the worker thread once the verticle has undeployed itself.
        Promise<Thread> promise = Promise.promise();
        vertx.deployVerticle(new AbstractVerticle() {
            @Override
            public void start() {
                vertx.runOnContext(v1 -> vertx.executeBlocking(() -> {
                    Thread current = Thread.currentThread();
                    assertTrue(Context.isOnVertxThread());
                    assertTrue(Context.isOnWorkerThread());
                    assertFalse(Context.isOnEventLoopThread());
                    assertTrue(current.getName().startsWith(poolName + "-"));
                    return current;
                }).onComplete(onSuccess(workerThread ->
                    vertx.undeploy(context.deploymentID())
                        .onComplete(onSuccess(v2 -> promise.complete(workerThread))))));
            }
        }, new DeploymentOptions().setWorkerPoolName(poolName));
        Thread thread = promise.future().await(20L, TimeUnit.SECONDS);
        // The message is built eagerly, so it reports the state at this point, not at timeout.
        assertWaitUntil(() -> thread.getState() == Thread.State.TERMINATED, 20000L, "Unexpected thread state " + thread.getState());
    }

    /**
     * Deploys a worker verticle on the named pool {@code "test-worker"}, undeploys it, and
     * asserts that {@code stop()} ran on a thread whose name starts with {@code "test-worker"}.
     */
    @Test
    public void testNamedWorkerPoolShouldBeClosedAfterVerticleIsUndeployed() {
        AtomicReference<String> threadName = new AtomicReference<>();
        DeploymentOptions options = new DeploymentOptions()
            .setThreadingModel(ThreadingModel.WORKER)
            .setWorkerPoolName("test-worker");
        vertx.deployVerticle(new AbstractVerticle() {
            @Override
            public void start() {
            }

            @Override
            public void stop() {
                // Record which thread runs the stop callback.
                threadName.set(Thread.currentThread().getName());
            }
        }, options).onComplete(onSuccess(id -> vertx.undeploy(id).onComplete(onSuccess(v -> {
            assertNotNull(threadName.get());
            assertTrue(threadName.get().startsWith("test-worker"));
            testComplete();
        }))));
        await();
    }

    /**
     * Deploys as many verticle instances as the configured event-loop pool size, each with the
     * worker pool name {@code "the-pool"}, records the thread every {@code start()} runs on,
     * and after undeploying asserts that the number of distinct threads equals the instance
     * count.
     */
    @Test
    public void testDeployUsingNamedWorkerDoesNotCreateExtraEventLoop() {
        int instances = getOptions().getEventLoopPoolSize();
        // start() is called once per instance, possibly from different threads.
        Set<Thread> threads = Collections.synchronizedSet(new HashSet<>());
        DeploymentOptions options = new DeploymentOptions()
            .setInstances(instances)
            .setWorkerPoolName("the-pool");
        vertx.deployVerticle(() -> new AbstractVerticle() {
            @Override
            public void start() {
                threads.add(Thread.currentThread());
            }
        }, options).onComplete(onSuccess(id -> vertx.undeploy(id).onComplete(onSuccess(v -> {
            assertEquals(instances, threads.size());
            testComplete();
        }))));
        await();
    }

    /**
     * Deploys a worker verticle on a randomly named pool and asserts, inside {@code start()},
     * that it runs on a Vert.x worker thread (not an event loop) whose name starts with
     * {@code poolName + "-"}. After undeploying, waits for that thread to reach
     * {@link Thread.State#TERMINATED}.
     */
    @Test
    public void testDeployWorkerUsingNamedPool() throws Exception {
        AtomicReference<Thread> thread = new AtomicReference<>();
        String poolName = "vert.x-" + TestUtils.randomAlphaString(10);
        DeploymentOptions options = new DeploymentOptions()
            .setThreadingModel(ThreadingModel.WORKER)
            .setWorkerPoolName(poolName);
        String deploymentID = vertx.deployVerticle(new VerticleBase() {
            @Override
            public Future<?> start() throws Exception {
                thread.set(Thread.currentThread());
                assertTrue(Context.isOnVertxThread());
                assertTrue(Context.isOnWorkerThread());
                assertFalse(Context.isOnEventLoopThread());
                assertTrue(Thread.currentThread().getName().startsWith(poolName + "-"));
                return super.start();
            }
        }, options).await();
        vertx.undeploy(deploymentID).await();
        assertWaitUntil(() -> thread.get() != null && thread.get().getState() == Thread.State.TERMINATED);
    }

    /**
     * Closes a separately created Vertx instance and asserts that, afterwards, submitting a
     * blocking task both through that instance and through a shared worker executor created
     * from it is rejected with {@link RejectedExecutionException}. Thread checks are disabled
     * for this test.
     */
    @Test
    public void testCloseWorkerPoolsWhenVertxCloses() {
        disableThreadChecks();
        Vertx localVertx = Vertx.vertx();
        WorkerExecutor executor = localVertx.createSharedWorkerExecutor("vert.x-123");
        localVertx.close().onComplete(closed -> {
            // 1) The instance's own executeBlocking must reject the task.
            try {
                localVertx.executeBlocking(() -> {
                    fail();
                    return null;
                }).onComplete(result -> fail());
                fail();
            } catch (RejectedExecutionException expected) {
                // Expected: the submission was rejected, nothing else to do.
            }
            // 2) The named executor must reject the task as well.
            try {
                executor.executeBlocking(() -> {
                    fail();
                    return null;
                }).onComplete(result -> fail());
                fail();
            } catch (RejectedExecutionException expected) {
                // Expected: the submission was rejected, nothing else to do.
            }
            // Closing the executor after its Vertx instance is closed is exercised too.
            executor.close();
            testComplete();
        });
        await();
    }

    /**
     * Deploys a verticle on the named worker pool {@code "foo"}, undeploys it, then deploys a
     * second verticle with the same pool name. Each verticle completes its start promise from
     * an {@code executeBlocking} call, so each deployment only succeeds if that call completes.
     */
    @Test
    public void testReuseWorkerPoolNameAfterVerticleIsUndeployed() throws Exception {
        CountDownLatch deployLatch1 = new CountDownLatch(1);
        AtomicReference<String> ref = new AtomicReference<>();
        vertx.deployVerticle(new AbstractVerticle() {
            @Override
            public void start(Promise<Void> startPromise) {
                // Explicit <Void> witness: without it the result is inferred as
                // Future<Object>, which cannot be completed into a Promise<Void>.
                vertx.<Void>executeBlocking(() -> null).onComplete(startPromise);
            }
        }, new DeploymentOptions().setWorkerPoolName("foo")).onComplete(onSuccess(id -> {
            ref.set(id);
            deployLatch1.countDown();
        }));
        awaitLatch(deployLatch1);

        CountDownLatch unDeployLatch = new CountDownLatch(1);
        vertx.undeploy(ref.get()).onComplete(onSuccess(v -> unDeployLatch.countDown()));
        awaitLatch(unDeployLatch);

        CountDownLatch deployLatch2 = new CountDownLatch(1);
        vertx.deployVerticle(new AbstractVerticle() {
            @Override
            public void start(Promise<Void> startPromise) {
                vertx.<Void>executeBlocking(() -> null).onComplete(startPromise);
            }
        }, new DeploymentOptions().setWorkerPoolName("foo")).onComplete(onSuccess(id -> deployLatch2.countDown()));
        awaitLatch(deployLatch2);
    }
}

