/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.core.eventbus;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import io.vertx.core.impl.VertxBuilder;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.cluster.NodeSelector;
import io.vertx.core.spi.cluster.RegistrationUpdateEvent;
import io.vertx.test.core.VertxTestBase;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.junit.Test;

public class MessageQueueOnWorkerThreadTest
extends VertxTestBase {
    private Vertx vertx;

    @Override
    public void setUp() throws Exception {
        super.setUp();
        CustomNodeSelector selector = new CustomNodeSelector();
        VertxBuilder factory = new VertxBuilder().init().clusterNodeSelector((NodeSelector)selector);
        Promise promise = Promise.promise();
        factory.clusteredVertx((Handler)promise);
        this.vertx = (Vertx)promise.future().toCompletionStage().toCompletableFuture().get();
    }

    @Test
    public void testWorkerContext() throws Exception {
        this.test(true);
    }

    @Test
    public void testExecuteBlocking() throws Exception {
        this.test(false);
    }

    private void test(boolean worker) throws Exception {
        int senderInstances = 20;
        int messagesToSend = 100;
        int expected = senderInstances * messagesToSend;
        this.waitFor(expected);
        this.vertx.eventBus().consumer("foo", msg -> this.complete()).completionHandler(this.onSuccess(registered -> {
            DeploymentOptions options = new DeploymentOptions().setWorker(worker).setInstances(senderInstances);
            this.vertx.deployVerticle(() -> new SenderVerticle(worker, messagesToSend), options);
        }));
        this.await(5L, TimeUnit.SECONDS);
    }

    @Override
    protected void tearDown() throws Exception {
        try {
            if (this.vertx != null) {
                this.closeClustered(Collections.singletonList(this.vertx));
            }
        }
        finally {
            super.tearDown();
        }
    }

    private static class SenderVerticle
    extends AbstractVerticle {
        final boolean worker;
        int count;

        SenderVerticle(boolean worker, int count) {
            this.worker = worker;
            this.count = count;
        }

        public void start() throws Exception {
            this.sendMessage();
        }

        void sendMessage() {
            if (this.worker) {
                this.vertx.executeBlocking(prom -> {
                    if (this.count > 0) {
                        this.vertx.eventBus().send("foo", (Object)"bar");
                        --this.count;
                        prom.complete();
                    }
                }, ar -> this.vertx.runOnContext(v -> this.sendMessage()));
            } else if (this.count > 0) {
                this.vertx.eventBus().send("foo", (Object)"bar");
                --this.count;
                this.vertx.runOnContext(v -> this.sendMessage());
            }
        }
    }

    private static class CustomNodeSelector
    implements NodeSelector {
        ClusterManager clusterManager;
        String nodeId;

        private CustomNodeSelector() {
        }

        public void init(Vertx vertx, ClusterManager clusterManager) {
            this.clusterManager = clusterManager;
        }

        public void eventBusStarted() {
            this.nodeId = this.clusterManager.getNodeId();
        }

        public void selectForSend(Message<?> message, Promise<String> promise) {
            try {
                TimeUnit.NANOSECONDS.sleep(150L);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            promise.tryComplete((Object)this.nodeId);
        }

        public void selectForPublish(Message<?> message, Promise<Iterable<String>> promise) {
            throw new UnsupportedOperationException();
        }

        public void registrationsUpdated(RegistrationUpdateEvent event) {
        }

        public void registrationsLost() {
        }
    }
}

