/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.tests.eventbus;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Completable;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.ThreadingModel;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.impl.clustered.NodeSelector;
import io.vertx.core.impl.VertxBootstrapImpl;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.cluster.ClusteredNode;
import io.vertx.test.core.VertxTestBase;
import io.vertx.test.fakecluster.FakeClusterManager;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.junit.Test;

/**
 * Sends a burst of event-bus messages from several verticle instances on a clustered
 * {@link Vertx} and checks that every message reaches the single consumer.
 *
 * <p>The clustered instance is built with a {@link FakeClusterManager} and a
 * {@link CustomNodeSelector} that pauses briefly before resolving each send.
 */
public class MessageQueueOnWorkerThreadTest extends VertxTestBase {
    /** Clustered Vert.x instance created in {@link #setUp()} and closed in {@link #tearDown()}. */
    private Vertx vertx;

    /**
     * Runs the base set-up, then creates the clustered Vert.x instance used by the tests,
     * blocking until its creation future completes.
     */
    @Override
    public void setUp() throws Exception {
        super.setUp();
        CustomNodeSelector selector = new CustomNodeSelector();
        VertxBootstrapImpl factory = new VertxBootstrapImpl()
            .init()
            .clusterManager(new FakeClusterManager())
            .clusterNodeSelector(selector);
        Future<Vertx> fut = factory.clusteredVertx();
        this.vertx = fut.await();
    }

    @Test
    public void testWorkerContext() throws Exception {
        this.test(true);
    }

    @Test
    public void testExecuteBlocking() throws Exception {
        this.test(false);
    }

    /**
     * Registers a consumer on address {@code "foo"}, then deploys 20 {@link SenderVerticle}
     * instances that each send 100 messages, and waits up to 5 seconds for all of them.
     *
     * <p>NOTE(review): the deployment always uses {@link ThreadingModel#WORKER}; the
     * {@code worker} flag only selects which send path {@link SenderVerticle} takes
     * ({@code true} selects the {@code executeBlocking} path). Confirm this matches the
     * intent suggested by the test method names.
     *
     * @param worker flag forwarded to every {@link SenderVerticle}
     */
    private void test(boolean worker) throws Exception {
        int senderInstances = 20;
        int messagesToSend = 100;
        int expected = senderInstances * messagesToSend;
        // One complete() call is issued per received message (see the consumer below).
        this.waitFor(expected);
        this.vertx.eventBus().consumer("foo", msg -> this.complete()).completion().onComplete(this.onSuccess(registered -> {
            // Deploy the senders only once the consumer registration has completed.
            DeploymentOptions options = new DeploymentOptions()
                .setThreadingModel(ThreadingModel.WORKER)
                .setInstances(senderInstances);
            this.vertx.deployVerticle(() -> new SenderVerticle(worker, messagesToSend), options);
        }));
        this.await(5L, TimeUnit.SECONDS);
    }

    /** Closes the clustered Vert.x instance (if it was created) and always runs the base tear-down. */
    @Override
    protected void tearDown() throws Exception {
        try {
            if (this.vertx != null) {
                this.close(Collections.singletonList(this.vertx));
            }
        } finally {
            super.tearDown();
        }
    }

    /**
     * Verticle that sends {@code count} messages to address {@code "foo"}, one per
     * context task, rescheduling itself with {@code runOnContext} after each send.
     */
    private class SenderVerticle extends AbstractVerticle {
        /** When {@code true}, each send is performed inside {@code executeBlocking}. */
        final boolean worker;
        /** Number of messages still to be sent. */
        int count;

        SenderVerticle(boolean worker, int count) {
            this.worker = worker;
            this.count = count;
        }

        @Override
        public void start() {
            this.sendMessage();
        }

        /**
         * Sends one message if any remain and schedules the next send on the verticle
         * context; stops rescheduling once {@code count} reaches zero.
         */
        void sendMessage() {
            // this.vertx is the field inherited from AbstractVerticle.
            if (this.worker) {
                this.vertx.executeBlocking(() -> {
                    if (this.count > 0) {
                        this.vertx.eventBus().send("foo", "bar");
                        --this.count;
                        return true;
                    }
                    return false;
                }).onComplete(MessageQueueOnWorkerThreadTest.this.onSuccess(cont -> {
                    // The blocking task reports whether a message was sent; continue only if so.
                    if (cont) {
                        this.vertx.runOnContext(v -> this.sendMessage());
                    }
                }));
            } else if (this.count > 0) {
                this.vertx.eventBus().send("foo", "bar");
                --this.count;
                this.vertx.runOnContext(v -> this.sendMessage());
            }
        }
    }

    /**
     * {@link NodeSelector} that always selects the local node for point-to-point sends,
     * after a 150 ns sleep. Publishing is not supported.
     */
    private static class CustomNodeSelector implements NodeSelector {
        /** Clustered node handed over in {@link #init(ClusteredNode)}. */
        ClusteredNode clusterManager;
        /** Id of the local node, captured in {@link #eventBusStarted()}. */
        String nodeId;

        private CustomNodeSelector() {
        }

        @Override
        public void init(ClusteredNode clusterManager) {
            this.clusterManager = clusterManager;
        }

        @Override
        public void eventBusStarted() {
            this.nodeId = this.clusterManager.getNodeId();
        }

        /**
         * Sleeps for 150 ns (presumably to simulate a slow selection - confirm), then
         * completes {@code promise} with the local node id.
         */
        @Override
        public void selectForSend(String address, Completable<String> promise) {
            try {
                TimeUnit.NANOSECONDS.sleep(150L);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and still complete the selection.
                Thread.currentThread().interrupt();
            }
            promise.succeed(this.nodeId);
        }

        /**
         * Not used by this test.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public void selectForPublish(String address, Completable<Iterable<String>> promise) {
            throw new UnsupportedOperationException();
        }
    }
}

