/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.core.eventbus;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Context;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Verticle;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.EventBusTestBase;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.MessageProducer;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.ReplyFailure;
import io.vertx.core.eventbus.impl.HandlerRegistration;
import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.core.impl.EventLoopContext;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.streams.Pump;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;
import io.vertx.test.core.TestUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.junit.Test;

public class LocalEventBusTest
extends EventBusTestBase {
    private EventBus eb;
    private boolean running;

    @Override
    public void setUp() throws Exception {
        super.setUp();
        this.vertx.close();
        this.vertx = Vertx.vertx();
        this.eb = this.vertx.eventBus();
        this.running = true;
    }

    @Override
    protected void tearDown() throws Exception {
        this.closeVertx();
        super.tearDown();
    }

    private void closeVertx() throws Exception {
        if (this.running) {
            CountDownLatch latch = new CountDownLatch(1);
            this.vertx.close(ar -> {
                this.assertTrue(ar.succeeded());
                latch.countDown();
            });
            this.assertTrue(latch.await(30L, TimeUnit.SECONDS));
            this.running = false;
        }
    }

    @Test
    public void testDeliveryOptions() {
        DeliveryOptions options = new DeliveryOptions();
        TestUtils.assertIllegalArgumentException(() -> options.setSendTimeout(0L));
        TestUtils.assertIllegalArgumentException(() -> options.setSendTimeout(-1L));
        TestUtils.assertNullPointerException(() -> options.addHeader(null, ""));
        TestUtils.assertNullPointerException(() -> options.addHeader("", null));
    }

    @Test
    public void testArgumentValidation() throws Exception {
        TestUtils.assertNullPointerException(() -> this.eb.send(null, (Object)""));
        TestUtils.assertNullPointerException(() -> this.eb.request(null, (Object)"", handler -> {}));
        TestUtils.assertNullPointerException(() -> this.eb.send(null, (Object)"", new DeliveryOptions()));
        TestUtils.assertNullPointerException(() -> this.eb.send("", (Object)"", (DeliveryOptions)null));
        TestUtils.assertNullPointerException(() -> this.eb.request(null, (Object)"", new DeliveryOptions(), handler -> {}));
        TestUtils.assertNullPointerException(() -> this.eb.request("", (Object)"", null, handler -> {}));
        TestUtils.assertNullPointerException(() -> this.eb.publish(null, (Object)""));
        TestUtils.assertNullPointerException(() -> this.eb.publish(null, (Object)"", new DeliveryOptions()));
        TestUtils.assertNullPointerException(() -> this.eb.publish("", (Object)"", null));
        TestUtils.assertNullPointerException(() -> this.eb.consumer(null));
        TestUtils.assertNullPointerException(() -> this.eb.consumer(null, msg -> {}));
        TestUtils.assertNullPointerException(() -> this.eb.consumer("some-address1", null));
        TestUtils.assertNullPointerException(() -> this.eb.localConsumer(null));
        TestUtils.assertNullPointerException(() -> this.eb.localConsumer(null, msg -> {}));
        TestUtils.assertNullPointerException(() -> this.eb.localConsumer("some-address1", null));
        TestUtils.assertNullPointerException(() -> this.eb.sender(null));
        TestUtils.assertNullPointerException(() -> this.eb.sender(null, new DeliveryOptions()));
        TestUtils.assertNullPointerException(() -> this.eb.publisher("", null));
        TestUtils.assertNullPointerException(() -> this.eb.publisher(null, new DeliveryOptions()));
        TestUtils.assertNullPointerException(() -> this.eb.registerCodec(null));
        TestUtils.assertNullPointerException(() -> this.eb.unregisterCodec(null));
        TestUtils.assertNullPointerException(() -> this.eb.registerDefaultCodec(null, (MessageCodec)new EventBusTestBase.MyPOJOEncoder1()));
        TestUtils.assertNullPointerException(() -> this.eb.registerDefaultCodec(Object.class, null));
        TestUtils.assertNullPointerException(() -> this.eb.unregisterDefaultCodec(null));
    }

    @Test
    public void testRegisterUnregister() {
        String str = TestUtils.randomUnicodeString(100);
        Handler handler = msg -> this.fail("Should not receive message");
        MessageConsumer reg = this.eb.consumer("some-address1").handler(handler);
        this.assertEquals("some-address1", reg.address());
        reg.unregister();
        this.eb.send("some-address1", (Object)str);
        this.vertx.setTimer(1000L, id -> this.testComplete());
        this.await();
    }

    @Test
    public void testUnregisterTwice() {
        Handler handler = msg -> {};
        MessageConsumer reg = this.eb.consumer("some-address1").handler(handler);
        reg.unregister();
        reg.unregister();
        this.testComplete();
    }

    @Test
    public void testMessageConsumerCloseHookIsClosedCorrectly() {
        Vertx vertx = Vertx.vertx();
        final EventBus eb = vertx.eventBus();
        vertx.deployVerticle((Verticle)new AbstractVerticle(){
            MessageConsumer consumer;

            public void start() throws Exception {
                this.context.exceptionHandler(err -> LocalEventBusTest.this.fail("Unexpected exception"));
                this.consumer = eb.consumer("some-address1").handler(msg -> {});
            }
        }, this.onSuccess(deploymentID -> vertx.undeploy(deploymentID, this.onSuccess(v -> vertx.setTimer(10L, id -> this.testComplete())))));
        this.await();
    }

    @Test
    public void testRegisterLocal1() {
        String str = TestUtils.randomUnicodeString(100);
        this.eb.localConsumer("some-address1").handler(msg -> {
            this.assertEquals(str, msg.body());
            this.testComplete();
        }).completionHandler(ar -> {
            this.assertTrue(ar.succeeded());
            this.eb.send("some-address1", (Object)str);
        });
        this.await();
    }

    @Test
    public void testRegisterLocal2() {
        String str = TestUtils.randomUnicodeString(100);
        this.eb.localConsumer("some-address1", msg -> {
            this.assertEquals(str, msg.body());
            this.testComplete();
        }).completionHandler(ar -> {
            this.assertTrue(ar.succeeded());
            this.eb.send("some-address1", (Object)str);
        });
        this.await();
    }

    @Test
    public void testRegisterWithCompletionHandler() {
        String str = TestUtils.randomUnicodeString(100);
        MessageConsumer reg = this.eb.consumer("some-address1").handler(msg -> {
            this.assertEquals(str, msg.body());
            this.testComplete();
        });
        reg.completionHandler(ar -> {
            this.assertTrue(ar.succeeded());
            this.eb.send("some-address1", (Object)str);
        });
        this.await();
    }

    @Test
    public void testSendRoundRobin() {
        int i;
        String str = TestUtils.randomUnicodeString(100);
        int numHandlers = 10;
        int numMessages = 100;
        Handler[] handlers = new Handler[numHandlers];
        ConcurrentHashMap countMap = new ConcurrentHashMap();
        AtomicInteger totalCount = new AtomicInteger();
        for (i = 0; i < numHandlers; ++i) {
            int index = i;
            handlers[i] = msg -> {
                this.assertEquals(str, msg.body());
                Integer cnt = (Integer)countMap.get(handlers[index]);
                int icnt = cnt == null ? 0 : cnt;
                countMap.put(handlers[index], ++icnt);
                if (totalCount.incrementAndGet() == numMessages) {
                    this.assertEquals(numHandlers, countMap.size());
                    for (Integer ind : countMap.values()) {
                        this.assertEquals(numMessages / numHandlers, ind.intValue());
                    }
                    this.testComplete();
                }
            };
            this.eb.consumer("some-address1").handler(handlers[i]);
        }
        for (i = 0; i < numMessages; ++i) {
            this.eb.send("some-address1", (Object)str);
        }
        this.await();
    }

    @Test
    public void testSendRegisterSomeUnregisterOne() {
        String str = TestUtils.randomUnicodeString(100);
        AtomicInteger totalCount = new AtomicInteger();
        Handler handler1 = msg -> this.fail("Should not receive message");
        Handler handler2 = msg -> {
            this.assertEquals(str, msg.body());
            if (totalCount.incrementAndGet() == 2) {
                this.testComplete();
            }
        };
        Handler handler3 = msg -> {
            this.assertEquals(str, msg.body());
            if (totalCount.incrementAndGet() == 2) {
                this.testComplete();
            }
        };
        MessageConsumer reg = this.eb.consumer("some-address1").handler(handler1);
        this.eb.consumer("some-address1").handler(handler2);
        this.eb.consumer("some-address1").handler(handler3);
        reg.unregister();
        this.eb.send("some-address1", (Object)str);
        this.eb.send("some-address1", (Object)str);
        this.await();
    }

    @Test
    public void testSendRegisterSameHandlerMultipleTimes() {
        String str = TestUtils.randomUnicodeString(100);
        AtomicInteger totalCount = new AtomicInteger();
        Handler handler = msg -> {
            this.assertEquals(str, msg.body());
            if (totalCount.incrementAndGet() == 3) {
                this.testComplete();
            }
        };
        this.eb.consumer("some-address1").handler(handler);
        this.eb.consumer("some-address1").handler(handler);
        this.eb.consumer("some-address1").handler(handler);
        this.eb.send("some-address1", (Object)str);
        this.eb.send("some-address1", (Object)str);
        this.eb.send("some-address1", (Object)str);
        this.await();
    }

    @Test
    public void testSendWithNoHandler() {
        this.eb.send("some-address1", (Object)TestUtils.randomUnicodeString(100));
        this.vertx.setTimer(1000L, id -> this.testComplete());
        this.await();
    }

    @Test
    public void testSendMultipleAddresses() {
        String str = TestUtils.randomUnicodeString(100);
        AtomicInteger cnt = new AtomicInteger();
        this.eb.consumer("some-address1").handler(msg -> this.fail("Should not receive message"));
        this.eb.consumer("some-address2").handler(msg -> {
            this.assertEquals(str, msg.body());
            if (cnt.incrementAndGet() == 2) {
                this.testComplete();
            }
        });
        this.eb.send("some-address2", (Object)str);
        this.eb.send("some-address2", (Object)str);
        this.await();
    }

    @Test
    public void testSendWithTimeoutNoTimeoutNoReply() {
        String str = TestUtils.randomUnicodeString(1000);
        this.eb.consumer("some-address1").handler(msg -> {
            this.assertEquals(str, msg.body());
            this.testComplete();
        });
        long timeout = 1000L;
        this.eb.request("some-address1", (Object)str, new DeliveryOptions().setSendTimeout(timeout), ar -> {});
        this.await();
    }

    @Test
    public void testSendWithReply() {
        String str = TestUtils.randomUnicodeString(1000);
        String reply = TestUtils.randomUnicodeString(1000);
        this.eb.consumer("some-address1").handler(msg -> {
            this.assertEquals(str, msg.body());
            msg.reply((Object)reply);
        });
        this.eb.request("some-address1", (Object)str, this.onSuccess(msg -> {
            this.assertEquals(reply, msg.body());
            this.testComplete();
        }));
        this.await();
    }

    @Test
    public void testReplyToReply() {
        String str = TestUtils.randomUnicodeString(1000);
        String reply = TestUtils.randomUnicodeString(1000);
        String replyReply = TestUtils.randomUnicodeString(1000);
        this.eb.consumer("some-address1").handler(msg -> {
            this.assertEquals(str, msg.body());
            msg.replyAndRequest((Object)reply, this.onSuccess(rep -> {
                this.assertEquals(replyReply, rep.body());
                this.testComplete();
            }));
        });
        this.eb.request("some-address1", (Object)str, this.onSuccess(msg -> {
            this.assertEquals(reply, msg.body());
            msg.reply((Object)replyReply);
        }));
        this.await();
    }

    @Test
    public void testSendReplyWithTimeout() {
        String str = TestUtils.randomUnicodeString(1000);
        String reply = TestUtils.randomUnicodeString(1000);
        this.eb.consumer("some-address1").handler(msg -> {
            this.assertEquals(str, msg.body());
            long start = System.currentTimeMillis();
            long timeout = 1000L;
            msg.replyAndRequest((Object)reply, new DeliveryOptions().setSendTimeout(timeout), ar -> {
                long now = System.currentTimeMillis();
                this.assertFalse(ar.succeeded());
                Throwable cause = ar.cause();
                this.assertTrue(cause instanceof ReplyException);
                ReplyException re = (ReplyException)cause;
                this.assertEquals(-1L, re.failureCode());
                this.assertEquals(ReplyFailure.TIMEOUT, re.failureType());
                this.assertTrue(now - start >= timeout);
                this.testComplete();
            });
        });
        this.eb.request("some-address1", (Object)str, this.onSuccess(msg -> this.assertEquals(reply, msg.body())));
        this.await();
    }

    @Test
    public void testSendReplyWithTimeoutNoTimeout() {
        String str = TestUtils.randomUnicodeString(1000);
        String reply = TestUtils.randomUnicodeString(1000);
        String replyReply = TestUtils.randomUnicodeString(1000);
        this.eb.consumer("some-address1").handler(msg -> {
            this.assertEquals(str, msg.body());
            long timeout = 1000L;
            msg.replyAndRequest((Object)reply, new DeliveryOptions().setSendTimeout(timeout), ar -> {
                this.assertTrue(ar.succeeded());
                this.assertEquals(replyReply, ((Message)ar.result()).body());
                this.testComplete();
            });
        });
        this.eb.request("some-address1", (Object)str, this.onSuccess(msg -> {
            this.assertEquals(reply, msg.body());
            msg.reply((Object)replyReply);
        }));
        this.await();
    }

    @Test
    public void testSendWithTimeoutNoTimeoutReply() {
        String str = TestUtils.randomUnicodeString(1000);
        this.eb.consumer("some-address1").handler(msg -> {
            this.assertEquals(str, msg.body());
            msg.reply((Object)23);
        });
        long timeout = 1000L;
        this.eb.request("some-address1", (Object)str, new DeliveryOptions().setSendTimeout(timeout), ar -> {
            this.assertTrue(ar.succeeded());
            this.assertEquals(23L, ((Integer)((Message)ar.result()).body()).intValue());
            this.testComplete();
        });
        this.await();
    }

    @Test
    public void testSendWithTimeoutNoReply() {
        String str = TestUtils.randomUnicodeString(1000);
        this.eb.consumer("some-address1").handler(msg -> this.assertEquals(str, msg.body()));
        long timeout = 1000L;
        long start = System.currentTimeMillis();
        this.eb.request("some-address1", (Object)str, new DeliveryOptions().setSendTimeout(timeout), ar -> {
            long now = System.currentTimeMillis();
            this.assertFalse(ar.succeeded());
            Throwable cause = ar.cause();
            this.assertTrue(cause instanceof ReplyException);
            ReplyException re = (ReplyException)cause;
            this.assertEquals(-1L, re.failureCode());
            this.assertEquals(ReplyFailure.TIMEOUT, re.failureType());
            this.assertTrue(now - start >= timeout);
            this.testComplete();
        });
        this.await();
    }

    @Test
    public void testSendWithTimeoutNoHandlers() {
        String str = TestUtils.randomUnicodeString(1000);
        long timeout = 1000L;
        this.eb.request("some-address1", (Object)str, new DeliveryOptions().setSendTimeout(timeout), ar -> {
            this.assertFalse(ar.succeeded());
            Throwable cause = ar.cause();
            this.assertTrue(cause instanceof ReplyException);
            ReplyException re = (ReplyException)cause;
            this.assertEquals(-1L, re.failureCode());
            this.assertEquals(ReplyFailure.NO_HANDLERS, re.failureType());
            this.assertEquals("No handlers for address some-address1", re.getMessage());
            this.testComplete();
        });
        this.await();
    }

    @Test
    public void testSendWithTimeoutRecipientFailure() {
        String str = TestUtils.randomUnicodeString(1000);
        String failureMsg = TestUtils.randomUnicodeString(1000);
        int failureCode = 123;
        this.eb.consumer("some-address1").handler(msg -> {
            this.assertEquals(str, msg.body());
            msg.fail(failureCode, failureMsg);
        });
        long timeout = 1000L;
        this.eb.request("some-address1", (Object)str, new DeliveryOptions().setSendTimeout(timeout), ar -> {
            this.assertFalse(ar.succeeded());
            Throwable cause = ar.cause();
            this.assertTrue(cause instanceof ReplyException);
            ReplyException re = (ReplyException)cause;
            this.assertEquals(failureCode, re.failureCode());
            this.assertEquals(failureMsg, re.getMessage());
            this.assertEquals(ReplyFailure.RECIPIENT_FAILURE, re.failureType());
            this.testComplete();
        });
        this.await();
    }

    @Test
    public void testSendWithTimeoutReplyAfterTimeout() {
        String str = TestUtils.randomUnicodeString(1000);
        long timeout = 1000L;
        this.eb.consumer("some-address1").handler(msg -> {
            this.assertEquals(str, msg.body());
            this.vertx.setTimer((long)((int)((double)timeout * 1.5)), id -> msg.reply((Object)"too late!"));
        });
        this.eb.request("some-address1", (Object)str, new DeliveryOptions().setSendTimeout(timeout), ar -> {
            this.assertFalse(ar.succeeded());
            Throwable cause = ar.cause();
            this.assertTrue(cause instanceof ReplyException);
            ReplyException re = (ReplyException)cause;
            this.assertEquals(-1L, re.failureCode());
            this.assertEquals(ReplyFailure.TIMEOUT, re.failureType());
            this.testComplete();
        });
        this.await();
    }

    @Test
    public void testSendWithTimeoutNoTimeoutAfterReply() {
        String str = TestUtils.randomUnicodeString(1000);
        long timeout = 1000L;
        this.eb.consumer("some-address1").handler(msg -> {
            this.assertEquals(str, msg.body());
            msg.reply((Object)"a reply");
        });
        AtomicBoolean received = new AtomicBoolean();
        this.eb.request("some-address1", (Object)str, new DeliveryOptions().setSendTimeout(timeout), ar -> {
            this.assertFalse(received.get());
            this.assertTrue(ar.succeeded());
            received.set(true);
            this.vertx.setTimer(timeout * 2L, tid -> this.testComplete());
        });
        this.await();
    }

    @Test
    public void testReplyToSendWithNoReplyHandler() {
        this.eb.consumer("some-address1").handler(msg -> {
            msg.reply((Object)"a reply");
            this.testComplete();
        });
        this.eb.send("some-address1", (Object)"whatever");
        this.await();
    }

    @Test
    public void testReplyToPublish() {
        this.eb.consumer("some-address1").handler(msg -> {
            msg.reply((Object)"a reply");
            this.testComplete();
        });
        this.eb.publish("some-address1", (Object)"whatever");
        this.await();
    }

    @Test
    public void testFailAfterSend() {
        this.eb.consumer("some-address1").handler(msg -> {
            msg.fail(0, "a failure");
            this.testComplete();
        });
        this.eb.publish("some-address1", (Object)"whatever");
        this.await();
    }

    @Test
    public void testFailAfterPublish() {
        this.eb.consumer("some-address1").handler(msg -> {
            msg.fail(0, "a failure");
            this.testComplete();
        });
        this.eb.publish("some-address1", (Object)"whatever");
        this.await();
    }

    @Test
    public void testPublish() {
        String str = TestUtils.randomUnicodeString(100);
        this.eb.consumer("some-address1").handler(msg -> {
            this.assertEquals(str, msg.body());
            this.testComplete();
        });
        this.eb.publish("some-address1", (Object)str);
        this.await();
    }

    @Test
    public void testPublishMultipleHandlers() {
        String str = TestUtils.randomUnicodeString(100);
        AtomicInteger count = new AtomicInteger();
        this.eb.consumer("some-address1").handler(msg -> {
            this.assertEquals(str, msg.body());
            if (count.incrementAndGet() == 2) {
                this.testComplete();
            }
        });
        this.eb.consumer("some-address1").handler(msg -> {
            this.assertEquals(str, msg.body());
            if (count.incrementAndGet() == 2) {
                this.testComplete();
            }
        });
        this.eb.publish("some-address1", (Object)str);
        this.await();
    }

    @Test
    public void testPublishSameHandlerRegisteredTwice() {
        String str = TestUtils.randomUnicodeString(1000);
        AtomicInteger count = new AtomicInteger();
        Handler handler = msg -> {
            this.assertEquals(str, msg.body());
            if (count.incrementAndGet() == 2) {
                this.testComplete();
            }
        };
        this.eb.consumer("some-address1").handler(handler);
        this.eb.consumer("some-address1").handler(handler);
        this.eb.publish("some-address1", (Object)str);
        this.await();
    }

    @Test
    public void testPublishMultipleHandlersUnregisterOne() {
        String str = TestUtils.randomUnicodeString(1000);
        Handler handler1 = msg -> {
            this.assertEquals(str, msg.body());
            this.testComplete();
        };
        Handler handler2 = msg -> this.fail("Should not be called");
        this.eb.consumer("some-address1").handler(handler1);
        MessageConsumer reg = this.eb.consumer("some-address1").handler(handler2);
        reg.unregister();
        this.eb.publish("some-address1", (Object)str);
        this.await();
    }

    @Test
    public void testPublishMultipleHandlersDifferentAddresses() {
        String str = TestUtils.randomUnicodeString(1000);
        this.eb.consumer("some-address1").handler(msg -> {
            this.assertEquals(str, msg.body());
            this.testComplete();
        });
        this.eb.consumer("some-address2").handler(msg -> this.fail("Should not receive message"));
        this.eb.publish("some-address1", (Object)str);
        this.await();
    }

    @Test
    public void testNonRegisteredCodecType() {
        this.eb.consumer("foo").handler(msg -> this.fail("Should not have gotten here"));
        try {
            class Boom {
                Boom() {
                }
            }
            this.eb.send("foo", (Object)new Boom());
        }
        catch (IllegalArgumentException e) {
            this.testComplete();
        }
        this.await();
    }

    @Test
    public void testCloseEventBus() {
        this.eb.close(ar -> {
            this.assertTrue(ar.succeeded());
            this.testComplete();
        });
        this.await();
    }

    @Test
    public void testInVerticle() throws Exception {
        this.testInVerticle(false, false);
    }

    @Test
    public void testInWorkerVerticle() throws Exception {
        this.testInVerticle(true, false);
    }

    @Test
    public void testInMultithreadedWorkerVerticle() throws Exception {
        this.testInVerticle(true, true);
    }

    private void testInVerticle(final boolean worker, final boolean multiThreaded) throws Exception {
        class MyVerticle
        extends AbstractVerticle {
            Context ctx;

            MyVerticle() {
            }

            public void start() {
                this.ctx = this.context;
                if (worker) {
                    if (multiThreaded) {
                        LocalEventBusTest.this.assertTrue(this.ctx.isMultiThreadedWorkerContext());
                    } else {
                        LocalEventBusTest.this.assertTrue(this.ctx.isWorkerContext() && !this.ctx.isMultiThreadedWorkerContext());
                    }
                } else {
                    LocalEventBusTest.this.assertTrue(this.ctx instanceof EventLoopContext);
                }
                Thread thr = Thread.currentThread();
                MessageConsumer reg = this.vertx.eventBus().consumer("some-address1").handler(msg -> {
                    LocalEventBusTest.this.assertSame(this.ctx, this.context);
                    if (!worker) {
                        LocalEventBusTest.this.assertSame(thr, Thread.currentThread());
                    }
                    msg.reply((Object)"bar");
                });
                reg.completionHandler(ar -> {
                    LocalEventBusTest.this.assertTrue(ar.succeeded());
                    LocalEventBusTest.this.assertSame(this.ctx, this.context);
                    if (!worker) {
                        LocalEventBusTest.this.assertSame((Object)thr, (Object)Thread.currentThread());
                    }
                    this.vertx.eventBus().request("some-address1", (Object)"foo", LocalEventBusTest.this.onSuccess(reply -> {
                        LocalEventBusTest.this.assertSame(this.ctx, this.context);
                        if (!worker) {
                            LocalEventBusTest.this.assertSame(thr, Thread.currentThread());
                        }
                        LocalEventBusTest.this.assertEquals("bar", reply.body());
                        LocalEventBusTest.this.testComplete();
                    }));
                });
            }
        }
        MyVerticle verticle = new MyVerticle();
        this.vertx.deployVerticle((Verticle)verticle, new DeploymentOptions().setWorker(worker).setMultiThreaded(multiThreaded));
        this.await();
    }

    @Test
    public void testContextsSend() throws Exception {
        ConcurrentHashSet contexts = new ConcurrentHashSet();
        CountDownLatch latch = new CountDownLatch(2);
        this.vertx.eventBus().consumer("some-address1").handler(arg_0 -> this.lambda$testContextsSend$94((Set)contexts, latch, arg_0));
        this.vertx.eventBus().request("some-address1", (Object)"foo", this.onSuccess(arg_0 -> this.lambda$testContextsSend$95((Set)contexts, latch, arg_0)));
        this.awaitLatch(latch);
        this.assertEquals(2L, contexts.size());
    }

    @Test
    public void testContextsPublish() throws Exception {
        ConcurrentHashSet contexts = new ConcurrentHashSet();
        AtomicInteger cnt = new AtomicInteger();
        int numHandlers = 10;
        for (int i = 0; i < numHandlers; ++i) {
            this.vertx.eventBus().consumer("some-address1").handler(arg_0 -> this.lambda$testContextsPublish$96((Set)contexts, cnt, numHandlers, arg_0));
        }
        this.vertx.eventBus().publish("some-address1", (Object)"foo");
        this.await();
    }

    @Test
    public void testHeadersCopiedAfterSend() throws Exception {
        MultiMap headers = MultiMap.caseInsensitiveMultiMap();
        headers.add("foo", "bar");
        this.vertx.eventBus().consumer("some-address1").handler(msg -> {
            this.assertNotSame(headers, msg.headers());
            this.assertEquals("bar", msg.headers().get("foo"));
            this.testComplete();
        });
        this.vertx.eventBus().send("some-address1", (Object)"foo", new DeliveryOptions().setHeaders(headers));
        headers.remove("foo");
        this.await();
    }

    @Test
    public void testDecoderSendAsymmetric() throws Exception {
        EventBusTestBase.MyPOJOEncoder1 codec = new EventBusTestBase.MyPOJOEncoder1();
        this.vertx.eventBus().registerCodec((MessageCodec)codec);
        String str = TestUtils.randomAlphaString(100);
        this.testSend(new EventBusTestBase.MyPOJO(str), str, null, new DeliveryOptions().setCodecName(codec.name()));
    }

    @Test
    public void testDecoderReplyAsymmetric() throws Exception {
        EventBusTestBase.MyPOJOEncoder1 codec = new EventBusTestBase.MyPOJOEncoder1();
        this.vertx.eventBus().registerCodec((MessageCodec)codec);
        String str = TestUtils.randomAlphaString(100);
        this.testReply(new EventBusTestBase.MyPOJO(str), str, null, new DeliveryOptions().setCodecName(codec.name()));
    }

    @Test
    public void testDecoderSendSymmetric() throws Exception {
        EventBusTestBase.MyPOJOEncoder2 codec = new EventBusTestBase.MyPOJOEncoder2();
        this.vertx.eventBus().registerCodec((MessageCodec)codec);
        String str = TestUtils.randomAlphaString(100);
        EventBusTestBase.MyPOJO pojo = new EventBusTestBase.MyPOJO(str);
        this.testSend(pojo, pojo, null, new DeliveryOptions().setCodecName(codec.name()));
    }

    @Test
    public void testDecoderReplySymmetric() throws Exception {
        EventBusTestBase.MyPOJOEncoder2 codec = new EventBusTestBase.MyPOJOEncoder2();
        this.vertx.eventBus().registerCodec((MessageCodec)codec);
        String str = TestUtils.randomAlphaString(100);
        EventBusTestBase.MyPOJO pojo = new EventBusTestBase.MyPOJO(str);
        this.testReply(pojo, pojo, null, new DeliveryOptions().setCodecName(codec.name()));
    }

    @Test
    public void testNoRegisteredDecoder() throws Exception {
        try {
            this.vertx.eventBus().send("some-address1", (Object)"foo", new DeliveryOptions().setCodecName("iqwjdoqiwd"));
            this.fail("Should throw exception");
        }
        catch (IllegalArgumentException illegalArgumentException) {
            // empty catch block
        }
    }

    @Test
    public void testRegisterSystemDecoder() throws Exception {
        try {
            this.vertx.eventBus().registerCodec((MessageCodec)new EventBusTestBase.MySystemDecoder());
            this.fail("Should throw exception");
        }
        catch (IllegalArgumentException illegalArgumentException) {
            // empty catch block
        }
    }

    @Test
    public void testUnregisterDecoder() throws Exception {
        EventBusTestBase.MyPOJOEncoder1 codec = new EventBusTestBase.MyPOJOEncoder1();
        this.vertx.eventBus().registerCodec((MessageCodec)codec);
        this.vertx.eventBus().unregisterCodec(codec.name());
        try {
            this.vertx.eventBus().send("some-address1", (Object)new EventBusTestBase.MyPOJO("foo"), new DeliveryOptions().setCodecName(codec.name()));
            this.fail("Should throw exception");
        }
        catch (IllegalArgumentException illegalArgumentException) {
            // empty catch block
        }
    }

    @Test
    public void testRegisterTwice() throws Exception {
        EventBusTestBase.MyPOJOEncoder1 codec = new EventBusTestBase.MyPOJOEncoder1();
        this.vertx.eventBus().registerCodec((MessageCodec)codec);
        try {
            this.vertx.eventBus().registerCodec((MessageCodec)codec);
            this.fail("Should throw exception");
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
    }

    @Test
    public void testCodecNullName() throws Exception {
        try {
            this.vertx.eventBus().registerCodec((MessageCodec)new EventBusTestBase.NullNameCodec());
            this.fail("Should throw exception");
        }
        catch (NullPointerException nullPointerException) {
            // empty catch block
        }
    }

    @Test
    public void testDefaultDecoderSendAsymmetric() throws Exception {
        EventBusTestBase.MyPOJOEncoder1 codec = new EventBusTestBase.MyPOJOEncoder1();
        this.vertx.eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, (MessageCodec)codec);
        String str = TestUtils.randomAlphaString(100);
        this.testSend(new EventBusTestBase.MyPOJO(str), str, null, null);
    }

    @Test
    public void testDefaultDecoderReplyAsymmetric() throws Exception {
        EventBusTestBase.MyPOJOEncoder1 codec = new EventBusTestBase.MyPOJOEncoder1();
        this.vertx.eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, (MessageCodec)codec);
        String str = TestUtils.randomAlphaString(100);
        this.testReply(new EventBusTestBase.MyPOJO(str), str, null, null);
    }

    @Test
    public void testDefaultDecoderSendSymetric() throws Exception {
        EventBusTestBase.MyPOJOEncoder2 codec = new EventBusTestBase.MyPOJOEncoder2();
        this.vertx.eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, (MessageCodec)codec);
        String str = TestUtils.randomAlphaString(100);
        EventBusTestBase.MyPOJO pojo = new EventBusTestBase.MyPOJO(str);
        this.testSend(pojo, pojo, null, null);
    }

    @Test
    public void testDefaultDecoderReplySymetric() throws Exception {
        EventBusTestBase.MyPOJOEncoder2 codec = new EventBusTestBase.MyPOJOEncoder2();
        this.vertx.eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, (MessageCodec)codec);
        String str = TestUtils.randomAlphaString(100);
        EventBusTestBase.MyPOJO pojo = new EventBusTestBase.MyPOJO(str);
        this.testReply(pojo, pojo, null, null);
    }

    @Test
    public void testNoRegisteredDefaultDecoder() throws Exception {
        TestUtils.assertIllegalArgumentException(() -> this.vertx.eventBus().send("some-address1", (Object)new EventBusTestBase.MyPOJO("foo")));
    }

    @Test
    public void testRegisterDefaultSystemDecoder() throws Exception {
        TestUtils.assertIllegalArgumentException(() -> this.vertx.eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, (MessageCodec)new EventBusTestBase.MySystemDecoder()));
    }

    @Test
    public void testUnregisterDefaultDecoder() throws Exception {
        EventBusTestBase.MyPOJOEncoder1 codec = new EventBusTestBase.MyPOJOEncoder1();
        this.vertx.eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, (MessageCodec)codec);
        this.vertx.eventBus().unregisterDefaultCodec(EventBusTestBase.MyPOJO.class);
        TestUtils.assertIllegalArgumentException(() -> this.vertx.eventBus().send("some-address1", (Object)new EventBusTestBase.MyPOJO("foo")));
    }

    @Test
    public void testRegisterDefaultTwice() throws Exception {
        EventBusTestBase.MyPOJOEncoder1 codec = new EventBusTestBase.MyPOJOEncoder1();
        this.vertx.eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, (MessageCodec)codec);
        TestUtils.assertIllegalStateException(() -> this.vertx.eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, codec));
    }

    @Test
    public void testDefaultCodecNullName() throws Exception {
        TestUtils.assertNullPointerException(() -> this.vertx.eventBus().registerDefaultCodec(String.class, (MessageCodec)new EventBusTestBase.NullNameCodec()));
    }

    @Test
    public void testDefaultCodecReplyExceptionSubclass() throws Exception {
        EventBusTestBase.MyReplyException myReplyException = new EventBusTestBase.MyReplyException(23, "my exception");
        this.vertx.eventBus().registerDefaultCodec(EventBusTestBase.MyReplyException.class, (MessageCodec)new EventBusTestBase.MyReplyExceptionMessageCodec());
        this.eb.consumer("some-address1", msg -> {
            this.assertTrue(msg.body() instanceof EventBusTestBase.MyReplyException);
            this.testComplete();
        });
        this.vertx.eventBus().send("some-address1", (Object)myReplyException);
        this.await();
    }

    @Override
    protected <T, R> void testSend(T val, R received, Consumer<T> consumer, DeliveryOptions options) {
        this.eb.consumer("some-address1").handler(msg -> {
            if (consumer == null) {
                this.assertTrue(msg.isSend());
                this.assertEquals(received, msg.body());
                if (options != null && options.getHeaders() != null) {
                    this.assertNotNull(msg.headers());
                    this.assertEquals(options.getHeaders().size(), msg.headers().size());
                    for (Map.Entry entry : options.getHeaders().entries()) {
                        this.assertEquals(msg.headers().get((String)entry.getKey()), entry.getValue());
                    }
                }
            } else {
                consumer.accept(msg.body());
            }
            this.testComplete();
        });
        if (options != null) {
            this.eb.send("some-address1", val, options);
        } else {
            this.eb.send("some-address1", val);
        }
        this.await();
    }

    @Override
    protected <T> void testSend(T val, Consumer<T> consumer) {
        this.testSend(val, val, consumer, null);
    }

    @Override
    protected <T> void testReply(T val, Consumer<T> consumer) {
        this.testReply(val, val, consumer, null);
    }

    @Override
    protected <T, R> void testReply(T val, R received, Consumer<R> consumer, DeliveryOptions options) {
        String str = TestUtils.randomUnicodeString(1000);
        this.eb.consumer("some-address1").handler(msg -> {
            this.assertEquals(str, msg.body());
            if (options != null) {
                msg.reply(val, options);
            } else {
                msg.reply(val);
            }
        });
        this.eb.request("some-address1", (Object)str, this.onSuccess(reply -> {
            if (consumer == null) {
                this.assertTrue(reply.isSend());
                this.assertEquals(received, reply.body());
                if (options != null && options.getHeaders() != null) {
                    this.assertNotNull(reply.headers());
                    this.assertEquals(options.getHeaders().size(), reply.headers().size());
                    for (Map.Entry entry : options.getHeaders().entries()) {
                        this.assertEquals(reply.headers().get((String)entry.getKey()), entry.getValue());
                    }
                }
            } else {
                consumer.accept(reply.body());
            }
            this.testComplete();
        }));
        this.await();
    }

    @Override
    protected <T> void testPublish(final T val, final Consumer<T> consumer) {
        final AtomicInteger count = new AtomicInteger();
        class MyHandler
        implements Handler<Message<T>> {
            MyHandler() {
            }

            public void handle(Message<T> msg) {
                if (consumer == null) {
                    LocalEventBusTest.this.assertFalse(msg.isSend());
                    LocalEventBusTest.this.assertEquals(val, msg.body());
                } else {
                    consumer.accept(msg.body());
                }
                if (count.incrementAndGet() == 2) {
                    LocalEventBusTest.this.testComplete();
                }
            }
        }
        this.eb.consumer("some-address1").handler((Handler)new MyHandler());
        this.eb.consumer("some-address1").handler((Handler)new MyHandler());
        this.eb.publish("some-address1", val);
        this.await();
    }

    @Test
    public void testPauseResumeMessageStream() {
        this.testPauseResume((consumer, handler) -> consumer.handler(message -> handler.handle(message.body())));
    }

    @Test
    public void testPauseResumeBodyStream() {
        this.testPauseResume((consumer, handler) -> consumer.bodyStream().handler(handler));
    }

    private void testPauseResume(BiFunction<MessageConsumer<String>, Handler<String>, ReadStream<?>> register) {
        String[] data = new String[11];
        for (int i = 0; i < data.length; ++i) {
            data[i] = TestUtils.randomAlphaString(10);
        }
        HashSet expected = new HashSet();
        Handler handler = body -> {
            this.assertTrue("Was expecting " + expected + " to contain " + body, expected.remove(body));
            if (expected.isEmpty()) {
                this.testComplete();
            }
        };
        MessageConsumer reg = this.eb.consumer("some-address1").setMaxBufferedMessages(10);
        ReadStream<?> controller = register.apply((MessageConsumer<String>)reg, (Handler<String>)handler);
        ((HandlerRegistration)reg).discardHandler(msg -> {
            this.assertEquals(data[10], msg.body());
            expected.addAll(Arrays.asList(data).subList(0, 10));
            controller.resume();
        });
        controller.pause();
        for (String msg2 : data) {
            this.eb.publish("some-address1", (Object)msg2);
        }
        this.await();
    }

    @Test
    public void testPauseFetchMessageStream() throws Exception {
        this.testPauseFetch((consumer, handler) -> consumer.handler(message -> handler.handle(message.body())));
    }

    @Test
    public void testPauseFetchBodyStream() throws Exception {
        this.testPauseFetch((consumer, handler) -> consumer.bodyStream().handler(handler));
    }

    private void testPauseFetch(BiFunction<MessageConsumer<String>, Handler<String>, ReadStream<?>> streamSupplier) throws Exception {
        ArrayList<String> data = new ArrayList<String>();
        for (int i = 0; i < 11; ++i) {
            data.add(TestUtils.randomAlphaString(10));
        }
        List received = Collections.synchronizedList(new ArrayList());
        CountDownLatch receiveLatch = new CountDownLatch(4);
        HandlerRegistration consumer = (HandlerRegistration)this.eb.consumer("some-address1").setMaxBufferedMessages(5);
        streamSupplier.apply((MessageConsumer<String>)consumer, (Handler<String>)((Handler)e -> {
            received.add(e);
            receiveLatch.countDown();
        })).pause().fetch(4L);
        List discarded = Collections.synchronizedList(new ArrayList());
        CountDownLatch discardLatch = new CountDownLatch(2);
        consumer.discardHandler(msg -> {
            discarded.add(msg.body());
            discardLatch.countDown();
        });
        ListIterator iterator = data.listIterator();
        while (iterator.nextIndex() < 4) {
            this.eb.publish("some-address1", iterator.next());
        }
        this.awaitLatch(receiveLatch);
        while (iterator.hasNext()) {
            this.eb.publish("some-address1", iterator.next());
        }
        this.awaitLatch(discardLatch);
        this.assertEquals(data.subList(0, 4), received);
        this.assertEquals(data.subList(data.size() - 2, data.size()), discarded);
    }

    @Test
    public void testSetMaxBufferedMessageDropsMessages() {
        MessageConsumer consumer = this.eb.consumer("some-address1");
        consumer.handler(msg -> {
            consumer.pause();
            Context ctx = this.vertx.getOrCreateContext();
            this.vertx.setTimer(20L, v -> {
                AtomicInteger count = new AtomicInteger(1);
                ((HandlerRegistration)consumer).discardHandler(discarded -> {
                    int val = (Integer)discarded.body();
                    this.assertEquals(count.getAndIncrement(), val);
                    if (val == 9) {
                        this.testComplete();
                    }
                });
                consumer.setMaxBufferedMessages(10);
            });
        });
        this.vertx.runOnContext(v -> {
            for (int i = 0; i < 20; ++i) {
                this.eb.send("some-address1", (Object)i);
            }
        });
        this.await();
    }

    @Test
    public void testExceptionWhenDeliveringBufferedMessageWithMessageStream() {
        this.testExceptionWhenDeliveringBufferedMessage((consumer, handler) -> consumer.handler(message -> handler.handle(message.body())));
    }

    @Test
    public void testExceptionWhenDeliveringBufferedMessageWithBodyStream() {
        this.testExceptionWhenDeliveringBufferedMessage((consumer, handler) -> consumer.bodyStream().handler(handler));
    }

    private void testExceptionWhenDeliveringBufferedMessage(BiFunction<MessageConsumer<String>, Handler<String>, ReadStream<?>> register) {
        String[] data = new String[11];
        for (int i = 0; i < data.length; ++i) {
            data[i] = TestUtils.randomAlphaString(10);
        }
        HashSet expected = new HashSet();
        Handler handler = body -> {
            this.assertTrue("Was expecting " + expected + " to contain " + body, expected.remove(body));
            if (!expected.isEmpty()) {
                throw new RuntimeException();
            }
            this.testComplete();
        };
        MessageConsumer reg = this.eb.consumer("some-address1").setMaxBufferedMessages(10);
        ReadStream<?> controller = register.apply((MessageConsumer<String>)reg, (Handler<String>)handler);
        ((HandlerRegistration)reg).discardHandler(msg -> {
            this.assertEquals(data[10], msg.body());
            expected.addAll(Arrays.asList(data).subList(0, 10));
            controller.resume();
        });
        controller.pause();
        for (String msg2 : data) {
            this.eb.publish("some-address1", (Object)msg2);
        }
        this.await();
    }

    @Test
    public void testUnregisterationOfRegisteredConsumerCallsEndHandlerWithMessageStream() {
        MessageConsumer consumer = this.eb.consumer("some-address1");
        this.testUnregisterationOfRegisteredConsumerCallsEndHandler((MessageConsumer<String>)consumer, (ReadStream<?>)consumer);
    }

    @Test
    public void testUnregisterationOfRegisteredConsumerCallsEndHandlerWithBodyStream() {
        MessageConsumer consumer = this.eb.consumer("some-address1");
        this.testUnregisterationOfRegisteredConsumerCallsEndHandler((MessageConsumer<String>)consumer, consumer.bodyStream());
    }

    private void testUnregisterationOfRegisteredConsumerCallsEndHandler(MessageConsumer<String> consumer, ReadStream<?> readStream) {
        consumer.handler(msg -> {});
        consumer.endHandler(v -> this.testComplete());
        consumer.unregister();
        this.await();
    }

    @Test
    public void testUnregisterThenUnsetEndHandler() {
        MessageConsumer consumer = this.eb.consumer("some-address1");
        consumer.endHandler(v -> {});
        consumer.unregister(res -> this.testComplete());
        consumer.endHandler(null);
        this.await();
    }

    @Test
    public void testUnregistrationWhenSettingNullHandlerWithConsumer() {
        MessageConsumer consumer = this.eb.consumer("some-address1");
        this.testUnregistrationWhenSettingNullHandler((MessageConsumer<String>)consumer, (ReadStream<?>)consumer);
    }

    @Test
    public void testUnregistrationWhenSettingNullHandlerWithBodyStream() {
        MessageConsumer consumer = this.eb.consumer("some-address1");
        this.testUnregistrationWhenSettingNullHandler((MessageConsumer<String>)consumer, consumer.bodyStream());
    }

    private void testUnregistrationWhenSettingNullHandler(MessageConsumer<String> consumer, ReadStream<?> readStream) {
        readStream.handler(msg -> {});
        this.assertTrue(consumer.isRegistered());
        readStream.handler(null);
        this.assertFalse(consumer.isRegistered());
    }

    @Test
    public void testSender() {
        String str = TestUtils.randomUnicodeString(100);
        MessageProducer sender = this.eb.sender("some-address1");
        this.eb.consumer("some-address1").handler(message -> {
            if (message.body().equals(str)) {
                this.testComplete();
            }
        });
        sender.write((Object)str);
        this.await();
    }

    @Test
    public void testSenderWithOptions() {
        String str = TestUtils.randomUnicodeString(100);
        MessageProducer sender = this.eb.sender("some-address1", new DeliveryOptions().addHeader("foo", "foo_value"));
        this.eb.consumer("some-address1").handler(message -> {
            if (message.body().equals(str) && "foo_value".equals(message.headers().get("foo"))) {
                this.testComplete();
            }
        });
        sender.write((Object)str);
        this.await();
    }

    @Test
    public void testPublisher() {
        String str = TestUtils.randomUnicodeString(100);
        MessageProducer publisher = this.eb.publisher("some-address1");
        this.assertEquals("some-address1", publisher.address());
        AtomicInteger count = new AtomicInteger();
        int n = 2;
        for (int i = 0; i < n; ++i) {
            this.eb.consumer("some-address1").handler(message -> {
                if (message.body().equals(str) && count.incrementAndGet() == n) {
                    this.testComplete();
                }
            });
        }
        publisher.write((Object)str);
        this.await();
    }

    @Test
    public void testPublisherWithOptions() {
        String str = TestUtils.randomUnicodeString(100);
        MessageProducer publisher = this.eb.publisher("some-address1", new DeliveryOptions().addHeader("foo", "foo_value"));
        this.assertEquals("some-address1", publisher.address());
        AtomicInteger count = new AtomicInteger();
        int n = 2;
        for (int i = 0; i < n; ++i) {
            this.eb.consumer("some-address1").handler(message -> {
                if (message.body().equals(str) && "foo_value".equals(message.headers().get("foo")) && count.incrementAndGet() == n) {
                    this.testComplete();
                }
            });
        }
        publisher.write((Object)str);
        this.await();
    }

    @Test
    public void testCloseSender1() {
        this.eb.sender("some-address1").close();
    }

    @Test
    public void testCloseSender2() {
        this.eb.sender("some-address1").close(null);
    }

    @Test
    public void testClosePublisher1() {
        this.eb.publisher("some-address1").close();
    }

    @Test
    public void testClosePublisher2() {
        this.eb.publisher("some-address1").close(null);
    }

    @Test
    public void testPump() {
        String str = TestUtils.randomUnicodeString(100);
        ReadStream consumer = this.eb.consumer("some-address1").bodyStream();
        consumer.handler(message -> {
            if (message.equals(str)) {
                this.testComplete();
            }
        });
        MessageProducer producer = this.eb.sender("some-address2");
        Pump.pump((ReadStream)consumer, (WriteStream)producer);
        producer.write((Object)str);
    }

    @Test
    public void testConsumerHandlesCompletionAsynchronously() {
        MessageConsumer consumer = this.eb.consumer("some-address1");
        ThreadLocal<Boolean> stack = new ThreadLocal<Boolean>();
        stack.set(true);
        consumer.completionHandler(v -> {
            this.assertNull(stack.get());
            this.assertTrue(Vertx.currentContext().isEventLoopContext());
            this.testComplete();
        });
        consumer.handler(msg -> {});
        this.await();
    }

    @Test
    public void testConsumerHandlesCompletionAsynchronously2() {
        MessageConsumer consumer = this.eb.consumer("some-address1");
        consumer.handler(msg -> {});
        ThreadLocal<Boolean> stack = new ThreadLocal<Boolean>();
        stack.set(true);
        consumer.completionHandler(v -> {
            this.assertNull(stack.get());
            this.assertTrue(Vertx.currentContext().isEventLoopContext());
            this.testComplete();
        });
        this.await();
    }

    @Test
    public void testUpdateDeliveryOptionsOnProducer() {
        MessageProducer producer = this.eb.sender("some-address1");
        MessageConsumer consumer = this.eb.consumer("some-address1");
        consumer.completionHandler(v -> {
            this.assertTrue(v.succeeded());
            producer.write((Object)"no-header");
        });
        consumer.handler(msg -> {
            String body = (String)msg.body();
            this.assertNotNull(body);
            switch (body) {
                case "no-header": {
                    this.assertNull(msg.headers().get("header-name"));
                    producer.deliveryOptions(new DeliveryOptions().addHeader("header-name", "header-value"));
                    producer.write((Object)"with-header");
                    break;
                }
                case "with-header": {
                    this.assertEquals("header-value", msg.headers().get("header-name"));
                    this.testComplete();
                    break;
                }
                default: {
                    this.fail();
                }
            }
        });
        this.await();
    }

    @Test
    public void testCloseCallsEndHandlerWithRegistrationContext() throws Exception {
        Context ctx = this.vertx.getOrCreateContext();
        CountDownLatch registered = new CountDownLatch(1);
        ctx.runOnContext(v1 -> {
            MessageConsumer consumer = this.eb.consumer("some-address1");
            consumer.endHandler(v2 -> {
                this.assertSame(Vertx.currentContext(), ctx);
                this.testComplete();
            });
            consumer.handler(msg -> {});
            consumer.completionHandler(ar -> {
                this.assertTrue(ar.succeeded());
                registered.countDown();
            });
        });
        this.awaitLatch(registered);
        this.closeVertx();
        this.await();
    }

    @Test
    public void testConsumerUnregisterDoesNotCancelTimer0() throws Exception {
        Context ctx = this.vertx.getOrCreateContext();
        ctx.runOnContext(v -> {
            this.vertx.setTimer(50L, id -> {
                this.assertEquals(0L, (long)id);
                this.testComplete();
            });
            this.eb.consumer("some-address1").unregister();
        });
        this.await();
    }

    @Test
    public void testMTWorkerConsumer() {
        int num = 3;
        this.waitFor(num);
        this.vertx.deployVerticle((Verticle)new AbstractVerticle(){

            public void start() {
                CyclicBarrier barrier = new CyclicBarrier(3);
                this.vertx.eventBus().consumer("some-address1", msg -> {
                    try {
                        barrier.await();
                        LocalEventBusTest.this.complete();
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        LocalEventBusTest.this.fail(e);
                    }
                    catch (BrokenBarrierException e) {
                        LocalEventBusTest.this.fail(e);
                    }
                });
            }
        }, new DeploymentOptions().setInstances(1).setWorker(true).setMultiThreaded(true), this.onSuccess(id -> {
            for (int i = 0; i < num; ++i) {
                this.vertx.eventBus().send("some-address1", (Object)("msg-" + i));
            }
        }));
        this.await();
    }

    @Test
    public void testMTExecBlockingConsumer() {
        int num = 3;
        this.waitFor(num);
        this.vertx.deployVerticle((Verticle)new AbstractVerticle(){

            public void start() {
                CyclicBarrier barrier = new CyclicBarrier(3);
                this.vertx.eventBus().consumer("some-address1", msg -> this.vertx.executeBlocking(block -> {
                    try {
                        barrier.await();
                        LocalEventBusTest.this.complete();
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        LocalEventBusTest.this.fail(e);
                    }
                    catch (BrokenBarrierException e) {
                        LocalEventBusTest.this.fail(e);
                    }
                }, false, null));
            }
        }, this.onSuccess(id -> {
            for (int i = 0; i < num; ++i) {
                this.vertx.eventBus().send("some-address1", (Object)("msg-" + i));
            }
        }));
        this.await();
    }

    @Test
    public void testUnregisterConsumerDiscardPendingMessages() {
        MessageConsumer consumer = this.eb.consumer("some-address1");
        consumer.handler(msg -> {
            this.assertEquals("val0", msg.body());
            consumer.pause();
            this.eb.send("some-address1", (Object)"val1");
            Context ctx = Vertx.currentContext();
            ctx.runOnContext(v -> {
                consumer.resume();
                ((HandlerRegistration)consumer).discardHandler(discarded -> {
                    this.assertEquals("val1", discarded.body());
                    this.testComplete();
                });
                consumer.handler(null);
            });
        });
        this.eb.send("some-address1", (Object)"val0");
        this.await();
    }

    @Test
    public void testImmediateUnregistration() {
        MessageConsumer consumer = this.vertx.eventBus().consumer("some-address1");
        AtomicInteger completionCount = new AtomicInteger();
        consumer.completionHandler(ar -> {
            int val = completionCount.getAndIncrement();
            this.assertEquals(0L, val);
            this.assertTrue(ar.succeeded());
            this.vertx.setTimer(10L, id -> this.testComplete());
        });
        consumer.handler(msg -> {});
        consumer.unregister();
        this.await();
    }

    @Test
    public void testSendWriteHandler() {
        this.waitFor(2);
        this.eb.consumer("some-address1", msg -> this.complete());
        MessageProducer producer = this.eb.sender("some-address1");
        producer.write((Object)"body", this.onSuccess(v -> this.complete()));
        this.await();
    }

    @Test
    public void testSendWriteHandlerNoConsumer() {
        MessageProducer producer = this.eb.sender("some-address1");
        producer.write((Object)"body", this.onFailure(err -> {
            this.assertTrue(err instanceof ReplyException);
            ReplyException replyException = (ReplyException)err;
            this.assertEquals(-1L, replyException.failureCode());
            this.testComplete();
        }));
        this.await();
    }

    @Test
    public void testPublishWriteHandler() {
        this.waitFor(2);
        this.eb.consumer("some-address1", msg -> this.complete());
        MessageProducer producer = this.eb.publisher("some-address1");
        producer.write((Object)"body", this.onSuccess(v -> this.complete()));
        this.await();
    }

    @Test
    public void testPublishWriteHandlerNoConsumer() {
        MessageProducer producer = this.eb.publisher("some-address1");
        producer.write((Object)"body", this.onFailure(err -> {
            this.assertTrue(err instanceof ReplyException);
            ReplyException replyException = (ReplyException)err;
            this.assertEquals(-1L, replyException.failureCode());
            this.testComplete();
        }));
        this.await();
    }

    @Test
    public void testClosePublisher() {
        MessageProducer producer = this.eb.publisher("some-address1");
        producer.close(this.onSuccess(v -> this.testComplete()));
        this.await();
    }

    @Test
    public void testCloseSender() {
        MessageProducer producer = this.eb.sender("some-address1");
        producer.close(this.onSuccess(v -> this.testComplete()));
        this.await();
    }

    private /* synthetic */ void lambda$testContextsPublish$96(Set contexts, AtomicInteger cnt, int numHandlers, Message msg) {
        contexts.add(((VertxInternal)this.vertx).getContext());
        if (cnt.incrementAndGet() == numHandlers) {
            this.assertEquals(numHandlers, contexts.size());
            this.testComplete();
        }
    }

    private /* synthetic */ void lambda$testContextsSend$95(Set contexts, CountDownLatch latch, Message reply) {
        this.assertEquals("bar", reply.body());
        contexts.add(((VertxInternal)this.vertx).getContext());
        latch.countDown();
    }

    private /* synthetic */ void lambda$testContextsSend$94(Set contexts, CountDownLatch latch, Message msg) {
        msg.reply((Object)"bar");
        contexts.add(((VertxInternal)this.vertx).getContext());
        latch.countDown();
    }
}

