/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.tests.eventbus;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Context;
import io.vertx.core.Deployable;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.ThreadingModel;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.MessageConsumerOptions;
import io.vertx.core.eventbus.MessageProducer;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.ReplyFailure;
import io.vertx.core.eventbus.impl.EventBusInternal;
import io.vertx.core.eventbus.impl.MessageConsumerImpl;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.streams.ReadStream;
import io.vertx.test.core.TestUtils;
import io.vertx.tests.eventbus.EventBusTestBase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.junit.Ignore;
import org.junit.Test;

public class LocalEventBusTest
extends EventBusTestBase {
    private EventBusInternal eb;
    private boolean running;

    /**
     * Replaces the Vert.x instance prepared by the base class with a fresh one and
     * configures its event bus with the codec used for {@code ImmutableObject} bodies.
     *
     * @throws Exception if the base class set-up fails
     */
    @Override
    public void setUp() throws Exception {
        super.setUp();
        vertx.close();
        vertx = Vertx.vertx();
        eb = (EventBusInternal) vertx.eventBus();
        EventBusTestBase.ImmutableObjectCodec codec = new EventBusTestBase.ImmutableObjectCodec();
        eb.registerCodec((MessageCodec) codec);
        // Route ImmutableObject bodies to the dedicated codec; everything else uses the default selection.
        eb.codecSelector(body -> {
            if (body instanceof EventBusTestBase.ImmutableObject) {
                return codec.name();
            }
            return null;
        });
        running = true;
    }

    /**
     * Closes the Vert.x instance owned by this test before delegating to the base class tear-down.
     *
     * @throws Exception if closing Vert.x or the base class tear-down fails
     */
    @Override
    protected void tearDown() throws Exception {
        closeVertx();
        super.tearDown();
    }

    /**
     * Closes the current Vert.x instance and waits up to 30 seconds for the close to complete.
     * Does nothing when the instance is not marked as running.
     *
     * @throws Exception if the wait is interrupted
     */
    private void closeVertx() throws Exception {
        if (!running) {
            return;
        }
        CountDownLatch closed = new CountDownLatch(1);
        vertx.close().onComplete(onSuccess(ignored -> closed.countDown()));
        assertTrue(closed.await(30L, TimeUnit.SECONDS));
        running = false;
    }

    /**
     * Returns an array of {@code num} slots that all reference the single local Vert.x instance.
     *
     * @param num the number of entries requested
     * @return an array of length {@code num} filled with this test's Vert.x instance
     */
    @Override
    protected Vertx[] vertices(int num) {
        // The local must be typed Vertx[]: an Object[] reference cannot be returned as Vertx[].
        Vertx[] instances = new Vertx[num];
        Arrays.fill(instances, vertx);
        return instances;
    }

    /**
     * Checks that {@link DeliveryOptions} rejects the send timeouts 0 and -1 and
     * null header names or values.
     */
    @Test
    public void testDeliveryOptions() {
        DeliveryOptions deliveryOptions = new DeliveryOptions();

        // Invalid send timeouts.
        TestUtils.assertIllegalArgumentException(() -> deliveryOptions.setSendTimeout(0L));
        TestUtils.assertIllegalArgumentException(() -> deliveryOptions.setSendTimeout(-1L));

        // Null header name, then null header value.
        TestUtils.assertNullPointerException(() -> deliveryOptions.addHeader(null, ""));
        TestUtils.assertNullPointerException(() -> deliveryOptions.addHeader("", null));
    }

    /**
     * Checks that the event bus entry points throw {@link NullPointerException}
     * when a required argument is null.
     */
    @Test
    public void testArgumentValidation() throws Exception {
        // send
        TestUtils.assertNullPointerException(() -> eb.send(null, ""));
        TestUtils.assertNullPointerException(() -> eb.send(null, "", new DeliveryOptions()));
        TestUtils.assertNullPointerException(() -> eb.send("", "", null));

        // publish
        TestUtils.assertNullPointerException(() -> eb.publish(null, ""));
        TestUtils.assertNullPointerException(() -> eb.publish(null, "", new DeliveryOptions()));
        TestUtils.assertNullPointerException(() -> eb.publish("", "", null));

        // consumers
        TestUtils.assertNullPointerException(() -> eb.consumer((String) null));
        TestUtils.assertNullPointerException(() -> eb.consumer((MessageConsumerOptions) null));
        TestUtils.assertNullPointerException(() -> eb.localConsumer(null));

        // producers
        TestUtils.assertNullPointerException(() -> eb.sender(null));
        TestUtils.assertNullPointerException(() -> eb.sender(null, new DeliveryOptions()));
        TestUtils.assertNullPointerException(() -> eb.publisher("", null));
        TestUtils.assertNullPointerException(() -> eb.publisher(null, new DeliveryOptions()));

        // codecs
        TestUtils.assertNullPointerException(() -> eb.registerCodec(null));
        TestUtils.assertNullPointerException(() -> eb.unregisterCodec(null));
        TestUtils.assertNullPointerException(() -> eb.registerDefaultCodec(null, (MessageCodec) new EventBusTestBase.MyPOJOEncoder1()));
        TestUtils.assertNullPointerException(() -> eb.registerDefaultCodec(Object.class, null));
        TestUtils.assertNullPointerException(() -> eb.unregisterDefaultCodec(null));
    }

    /**
     * Registers a consumer, unregisters it, then sends a message to its address.
     * The test completes after one second provided the handler was never invoked.
     */
    @Test
    public void testRegisterUnregister() {
        String payload = TestUtils.randomUnicodeString(100);
        Handler unexpected = message -> fail("Should not receive message");
        MessageConsumer consumer = eb.consumer("some-address1").handler(unexpected);
        assertEquals("some-address1", consumer.address());
        consumer.unregister();
        eb.send("some-address1", payload);
        // Give a stray delivery time to show up before declaring success.
        vertx.setTimer(1000L, timerId -> testComplete());
        await();
    }

    /**
     * Checks that calling {@code unregister()} twice on the same consumer does not throw.
     */
    @Test
    public void testUnregisterTwice() {
        Handler noop = message -> {
        };
        MessageConsumer consumer = eb.consumer("some-address1").handler(noop);
        consumer.unregister();
        consumer.unregister();
        testComplete();
    }

    /**
     * Deploys a verticle that registers a consumer on a dedicated Vert.x instance, undeploys it,
     * and completes shortly afterwards unless the verticle context reports an exception.
     * The dedicated Vert.x instance is closed when the test finishes so it does not leak.
     */
    @Test
    public void testMessageConsumerCloseHookIsClosedCorrectly() {
        Vertx vertx = Vertx.vertx();
        try {
            final EventBus eb = vertx.eventBus();
            vertx.deployVerticle((Deployable)new AbstractVerticle(){
                MessageConsumer consumer;

                @Override
                public void start() {
                    // Any exception surfacing on the verticle context fails the test.
                    this.context.exceptionHandler(err -> LocalEventBusTest.this.fail("Unexpected exception"));
                    this.consumer = eb.consumer("some-address1").handler(msg -> {});
                }
            }).onComplete(this.onSuccess(deploymentID -> vertx.undeploy(deploymentID).onComplete(this.onSuccess(v -> vertx.setTimer(10L, id -> this.testComplete())))));
            this.await();
        } finally {
            // This instance is local to the test (it shadows the shared field), so tearDown never closes it.
            vertx.close();
        }
    }

    /**
     * Registers a local consumer via {@code localConsumer(address).handler(...)} and, once the
     * registration has completed, sends a message and expects the handler to receive it.
     */
    @Test
    public void testRegisterLocal1() {
        String payload = TestUtils.randomUnicodeString(100);
        MessageConsumer<Object> consumer = eb.localConsumer("some-address1").handler(message -> {
            assertEquals(payload, message.body());
            testComplete();
        });
        // Only send after the registration has been reported as complete.
        consumer.completion().onComplete(registration -> {
            assertTrue(registration.succeeded());
            eb.send("some-address1", payload);
        });
        await();
    }

    /**
     * Registers a local consumer via the {@code localConsumer(address, handler)} overload and,
     * once the registration has completed, sends a message and expects the handler to receive it.
     */
    @Test
    public void testRegisterLocal2() {
        String payload = TestUtils.randomUnicodeString(100);
        MessageConsumer<Object> consumer = eb.localConsumer("some-address1", message -> {
            assertEquals(payload, message.body());
            testComplete();
        });
        // Only send after the registration has been reported as complete.
        consumer.completion().onComplete(registration -> {
            assertTrue(registration.succeeded());
            eb.send("some-address1", payload);
        });
        await();
    }

    /**
     * Registers a consumer, waits for its registration to complete, then sends a message
     * and expects the handler to receive the same body.
     */
    @Test
    public void testRegisterWithCompletionHandler() {
        String str = TestUtils.randomUnicodeString(100);
        // Parameterized type is required: with a raw MessageConsumer, completion() is a raw
        // Future and the completion callback argument degrades to Object.
        MessageConsumer<Object> reg = eb.consumer("some-address1").handler(msg -> {
            assertEquals(str, msg.body());
            testComplete();
        });
        reg.completion().onComplete(ar -> {
            assertTrue(ar.succeeded());
            eb.send("some-address1", str);
        });
        await();
    }

    /**
     * Registers 10 handlers on one address, sends 100 messages and checks that every handler
     * received exactly {@code numMessages / numHandlers} of them.
     */
    @Test
    public void testSendRoundRobin() {
        String str = TestUtils.randomUnicodeString(100);
        int numHandlers = 10;
        int numMessages = 100;
        // Generic array creation is not allowed, hence the unchecked assignment.
        @SuppressWarnings("unchecked")
        Handler<Message<String>>[] handlers = new Handler[numHandlers];
        // Number of deliveries observed per handler.
        ConcurrentHashMap<Handler<Message<String>>, Integer> countMap = new ConcurrentHashMap<>();
        AtomicInteger totalCount = new AtomicInteger();
        for (int i = 0; i < numHandlers; ++i) {
            int index = i;
            handlers[i] = msg -> {
                assertEquals(str, msg.body());
                countMap.merge(handlers[index], 1, Integer::sum);
                if (totalCount.incrementAndGet() == numMessages) {
                    // Last message: every handler must have been hit, and equally often.
                    assertEquals(numHandlers, countMap.size());
                    for (Integer deliveries : countMap.values()) {
                        assertEquals(numMessages / numHandlers, deliveries.intValue());
                    }
                    testComplete();
                }
            };
            eb.<String>consumer("some-address1").handler(handlers[i]);
        }
        for (int i = 0; i < numMessages; ++i) {
            eb.send("some-address1", str);
        }
        await();
    }

    /**
     * Registers three consumers on one address, unregisters the first, then sends two messages.
     * The unregistered handler must never be called; the test completes once the remaining
     * handlers have received two messages in total.
     */
    @Test
    public void testSendRegisterSomeUnregisterOne() {
        String str = TestUtils.randomUnicodeString(100);
        AtomicInteger totalCount = new AtomicInteger();
        // Handlers are typed so that msg.body() is available inside the lambdas.
        Handler<Message<String>> handler1 = msg -> fail("Should not receive message");
        Handler<Message<String>> handler2 = msg -> {
            assertEquals(str, msg.body());
            if (totalCount.incrementAndGet() == 2) {
                testComplete();
            }
        };
        Handler<Message<String>> handler3 = msg -> {
            assertEquals(str, msg.body());
            if (totalCount.incrementAndGet() == 2) {
                testComplete();
            }
        };
        MessageConsumer<String> reg = eb.<String>consumer("some-address1").handler(handler1);
        eb.<String>consumer("some-address1").handler(handler2);
        eb.<String>consumer("some-address1").handler(handler3);
        reg.unregister();
        eb.send("some-address1", str);
        eb.send("some-address1", str);
        await();
    }

    /**
     * Registers the same handler instance through three separate consumers, sends three
     * messages and completes once the handler has been invoked three times.
     */
    @Test
    public void testSendRegisterSameHandlerMultipleTimes() {
        String str = TestUtils.randomUnicodeString(100);
        AtomicInteger totalCount = new AtomicInteger();
        // Typed handler so that msg.body() is available inside the lambda.
        Handler<Message<String>> handler = msg -> {
            assertEquals(str, msg.body());
            if (totalCount.incrementAndGet() == 3) {
                testComplete();
            }
        };
        eb.<String>consumer("some-address1").handler(handler);
        eb.<String>consumer("some-address1").handler(handler);
        eb.<String>consumer("some-address1").handler(handler);
        eb.send("some-address1", str);
        eb.send("some-address1", str);
        eb.send("some-address1", str);
        await();
    }

    /**
     * Sends a message to an address without any consumer and completes after one second.
     */
    @Test
    public void testSendWithNoHandler() {
        String payload = TestUtils.randomUnicodeString(100);
        eb.send("some-address1", payload);
        vertx.setTimer(1000L, timerId -> testComplete());
        await();
    }

    /**
     * Registers consumers on two addresses and sends two messages to the second one only.
     * The first consumer must stay silent; the second must receive both messages.
     */
    @Test
    public void testSendMultipleAddresses() {
        String payload = TestUtils.randomUnicodeString(100);
        AtomicInteger deliveries = new AtomicInteger();
        eb.consumer("some-address1").handler(message -> fail("Should not receive message"));
        eb.consumer("some-address2").handler(message -> {
            assertEquals(payload, message.body());
            int seen = deliveries.incrementAndGet();
            if (seen == 2) {
                testComplete();
            }
        });
        eb.send("some-address2", payload);
        eb.send("some-address2", payload);
        await();
    }

    /**
     * Issues a request with a one second send timeout to a consumer that never replies;
     * the test completes as soon as the consumer has received the message.
     */
    @Test
    public void testSendWithTimeoutNoTimeoutNoReply() {
        String payload = TestUtils.randomUnicodeString(1000);
        eb.consumer("some-address1").handler(message -> {
            assertEquals(payload, message.body());
            testComplete();
        });
        long timeoutMs = 1000L;
        DeliveryOptions options = new DeliveryOptions().setSendTimeout(timeoutMs);
        eb.request("some-address1", payload, options);
        await();
    }

    /**
     * Sends a request and checks that the consumer's reply body reaches the requester.
     */
    @Test
    public void testSendWithReply() {
        String payload = TestUtils.randomUnicodeString(1000);
        String answer = TestUtils.randomUnicodeString(1000);
        eb.consumer("some-address1").handler(received -> {
            assertEquals(payload, received.body());
            received.reply(answer);
        });
        eb.request("some-address1", payload).onComplete(onSuccess(response -> {
            assertEquals(answer, response.body());
            testComplete();
        }));
        await();
    }

    /**
     * Exercises a three-step exchange: request, reply-and-request from the consumer,
     * and a final reply from the original requester that completes the test.
     */
    @Test
    public void testReplyToReply() {
        String payload = TestUtils.randomUnicodeString(1000);
        String firstReply = TestUtils.randomUnicodeString(1000);
        String secondReply = TestUtils.randomUnicodeString(1000);
        eb.consumer("some-address1").handler(received -> {
            assertEquals(payload, received.body());
            // Reply and expect the requester to answer that reply in turn.
            received.replyAndRequest(firstReply).onComplete(onSuccess(finalAnswer -> {
                assertEquals(secondReply, finalAnswer.body());
                testComplete();
            }));
        });
        eb.request("some-address1", payload).onComplete(onSuccess(response -> {
            assertEquals(firstReply, response.body());
            response.reply(secondReply);
        }));
        await();
    }

    /**
     * The consumer replies with {@code replyAndRequest} using a one second timeout, and the
     * requester never answers that reply. The consumer side must fail with a
     * {@link ReplyFailure#TIMEOUT} no earlier than the configured timeout.
     */
    @Test
    public void testSendReplyWithTimeout() {
        String payload = TestUtils.randomUnicodeString(1000);
        String answer = TestUtils.randomUnicodeString(1000);
        eb.consumer("some-address1").handler(received -> {
            assertEquals(payload, received.body());
            long startedAt = System.currentTimeMillis();
            long timeoutMs = 1000L;
            DeliveryOptions replyOptions = new DeliveryOptions().setSendTimeout(timeoutMs);
            received.replyAndRequest(answer, replyOptions).onComplete(onFailure(failure -> {
                long finishedAt = System.currentTimeMillis();
                assertTrue(failure instanceof ReplyException);
                ReplyException replyException = (ReplyException) failure;
                assertEquals(-1L, replyException.failureCode());
                assertEquals(ReplyFailure.TIMEOUT, replyException.failureType());
                assertTrue(finishedAt - startedAt >= timeoutMs);
                testComplete();
            }));
        });
        // The requester checks the reply body but deliberately does not answer it.
        eb.request("some-address1", payload).onComplete(onSuccess(response -> assertEquals(answer, response.body())));
        await();
    }

    /**
     * The consumer replies with {@code replyAndRequest} using a one second timeout and the
     * requester answers immediately, so the consumer side must complete successfully.
     */
    @Test
    public void testSendReplyWithTimeoutNoTimeout() {
        String payload = TestUtils.randomUnicodeString(1000);
        String firstReply = TestUtils.randomUnicodeString(1000);
        String secondReply = TestUtils.randomUnicodeString(1000);
        eb.consumer("some-address1").handler(received -> {
            assertEquals(payload, received.body());
            long timeoutMs = 1000L;
            DeliveryOptions replyOptions = new DeliveryOptions().setSendTimeout(timeoutMs);
            received.replyAndRequest(firstReply, replyOptions).onComplete(onSuccess(finalAnswer -> testComplete()));
        });
        eb.request("some-address1", payload).onComplete(onSuccess(response -> {
            assertEquals(firstReply, response.body());
            response.reply(secondReply);
        }));
        await();
    }

    /**
     * Issues a request with a one second timeout to a consumer that replies with the
     * integer 23 and checks the request succeeds with that value.
     */
    @Test
    public void testSendWithTimeoutNoTimeoutReply() {
        String payload = TestUtils.randomUnicodeString(1000);
        eb.consumer("some-address1").handler(received -> {
            assertEquals(payload, received.body());
            received.reply(23);
        });
        long timeoutMs = 1000L;
        DeliveryOptions options = new DeliveryOptions().setSendTimeout(timeoutMs);
        eb.request("some-address1", payload, options).onComplete(outcome -> {
            assertTrue(outcome.succeeded());
            Message response = (Message) outcome.result();
            Integer value = (Integer) response.body();
            assertEquals(23L, value.intValue());
            testComplete();
        });
        await();
    }

    /**
     * Issues a request with a one second timeout to a consumer that never replies and checks
     * the request fails with {@link ReplyFailure#TIMEOUT} no earlier than the timeout.
     */
    @Test
    public void testSendWithTimeoutNoReply() {
        String payload = TestUtils.randomUnicodeString(1000);
        eb.consumer("some-address1").handler(received -> assertEquals(payload, received.body()));
        long timeoutMs = 1000L;
        long startedAt = System.currentTimeMillis();
        DeliveryOptions options = new DeliveryOptions().setSendTimeout(timeoutMs);
        eb.request("some-address1", payload, options).onComplete(outcome -> {
            long finishedAt = System.currentTimeMillis();
            assertFalse(outcome.succeeded());
            Throwable failure = outcome.cause();
            assertTrue(failure instanceof ReplyException);
            ReplyException replyException = (ReplyException) failure;
            assertEquals(-1L, replyException.failureCode());
            assertEquals(ReplyFailure.TIMEOUT, replyException.failureType());
            assertTrue(finishedAt - startedAt >= timeoutMs);
            testComplete();
        });
        await();
    }

    /**
     * Issues a request to an address without consumers and checks it fails with
     * {@link ReplyFailure#NO_HANDLERS} and the expected message.
     */
    @Test
    public void testSendWithTimeoutNoHandlers() {
        String payload = TestUtils.randomUnicodeString(1000);
        long timeoutMs = 1000L;
        DeliveryOptions options = new DeliveryOptions().setSendTimeout(timeoutMs);
        eb.request("some-address1", payload, options).onComplete(outcome -> {
            assertFalse(outcome.succeeded());
            Throwable failure = outcome.cause();
            assertTrue(failure instanceof ReplyException);
            ReplyException replyException = (ReplyException) failure;
            assertEquals(-1L, replyException.failureCode());
            assertEquals(ReplyFailure.NO_HANDLERS, replyException.failureType());
            assertEquals("No handlers for address some-address1", replyException.getMessage());
            testComplete();
        });
        await();
    }

    /**
     * The consumer fails the message with code 123 and a random message; the requester must
     * observe a {@link ReplyFailure#RECIPIENT_FAILURE} carrying the same code and message.
     */
    @Test
    public void testSendWithTimeoutRecipientFailure() {
        String payload = TestUtils.randomUnicodeString(1000);
        String expectedMessage = TestUtils.randomUnicodeString(1000);
        int expectedCode = 123;
        eb.consumer("some-address1").handler(received -> {
            assertEquals(payload, received.body());
            received.fail(expectedCode, expectedMessage);
        });
        long timeoutMs = 1000L;
        DeliveryOptions options = new DeliveryOptions().setSendTimeout(timeoutMs);
        eb.request("some-address1", payload, options).onComplete(outcome -> {
            assertFalse(outcome.succeeded());
            Throwable failure = outcome.cause();
            assertTrue(failure instanceof ReplyException);
            ReplyException replyException = (ReplyException) failure;
            assertEquals(expectedCode, replyException.failureCode());
            assertEquals(expectedMessage, replyException.getMessage());
            assertEquals(ReplyFailure.RECIPIENT_FAILURE, replyException.failureType());
            testComplete();
        });
        await();
    }

    /**
     * The consumer replies after 1.5 times the request timeout; the requester must already
     * have failed with {@link ReplyFailure#TIMEOUT}.
     */
    @Test
    public void testSendWithTimeoutReplyAfterTimeout() {
        String payload = TestUtils.randomUnicodeString(1000);
        long timeoutMs = 1000L;
        eb.consumer("some-address1").handler(received -> {
            assertEquals(payload, received.body());
            // Reply deliberately later than the requester is willing to wait.
            long replyDelay = (int) ((double) timeoutMs * 1.5);
            vertx.setTimer(replyDelay, timerId -> received.reply("too late!"));
        });
        DeliveryOptions options = new DeliveryOptions().setSendTimeout(timeoutMs);
        eb.request("some-address1", payload, options).onComplete(outcome -> {
            assertFalse(outcome.succeeded());
            Throwable failure = outcome.cause();
            assertTrue(failure instanceof ReplyException);
            ReplyException replyException = (ReplyException) failure;
            assertEquals(-1L, replyException.failureCode());
            assertEquals(ReplyFailure.TIMEOUT, replyException.failureType());
            testComplete();
        });
        await();
    }

    /**
     * After a successful reply, waits for twice the request timeout before completing.
     * The {@code received} flag makes a second invocation of the reply handler fail the test.
     */
    @Test
    public void testSendWithTimeoutNoTimeoutAfterReply() {
        String payload = TestUtils.randomUnicodeString(1000);
        long timeoutMs = 1000L;
        eb.consumer("some-address1").handler(message -> {
            assertEquals(payload, message.body());
            message.reply("a reply");
        });
        AtomicBoolean alreadyReceived = new AtomicBoolean();
        DeliveryOptions options = new DeliveryOptions().setSendTimeout(timeoutMs);
        eb.request("some-address1", payload, options).onComplete(outcome -> {
            assertFalse(alreadyReceived.get());
            assertTrue(outcome.succeeded());
            alreadyReceived.set(true);
            // Outlive the timeout so that a late timeout callback would be noticed.
            vertx.setTimer(timeoutMs * 2L, timerId -> testComplete());
        });
        await();
    }

    /**
     * Replies to a message delivered with {@code send} (no reply handler on the sender side)
     * and completes right after the reply call returns.
     */
    @Test
    public void testReplyToSendWithNoReplyHandler() {
        eb.consumer("some-address1").handler(received -> {
            received.reply("a reply");
            testComplete();
        });
        eb.send("some-address1", "whatever");
        await();
    }

    /**
     * Replies to a message delivered with {@code publish} and completes right after
     * the reply call returns.
     */
    @Test
    public void testReplyToPublish() {
        eb.consumer("some-address1").handler(received -> {
            received.reply("a reply");
            testComplete();
        });
        eb.publish("some-address1", "whatever");
        await();
    }

    /**
     * Calls {@code fail} on a message delivered with {@code send} (no reply handler on the
     * sender side) and completes right after the call returns.
     */
    @Test
    public void testFailAfterSend() {
        eb.consumer("some-address1").handler(msg -> {
            msg.fail(0, "a failure");
            testComplete();
        });
        // Must be send(): the publish() variant is covered by testFailAfterPublish.
        eb.send("some-address1", "whatever");
        await();
    }

    /**
     * Calls {@code fail} on a message delivered with {@code publish} and completes right
     * after the call returns.
     */
    @Test
    public void testFailAfterPublish() {
        eb.consumer("some-address1").handler(received -> {
            received.fail(0, "a failure");
            testComplete();
        });
        eb.publish("some-address1", "whatever");
        await();
    }

    /**
     * Publishes a message and checks that the single registered consumer receives it.
     */
    @Test
    public void testPublish() {
        String payload = TestUtils.randomUnicodeString(100);
        eb.consumer("some-address1").handler(received -> {
            assertEquals(payload, received.body());
            testComplete();
        });
        eb.publish("some-address1", payload);
        await();
    }

    /**
     * Publishes one message to an address with two consumers and completes once both
     * have received it.
     */
    @Test
    public void testPublishMultipleHandlers() {
        String payload = TestUtils.randomUnicodeString(100);
        AtomicInteger deliveries = new AtomicInteger();
        eb.consumer("some-address1").handler(received -> {
            assertEquals(payload, received.body());
            int seen = deliveries.incrementAndGet();
            if (seen == 2) {
                testComplete();
            }
        });
        eb.consumer("some-address1").handler(received -> {
            assertEquals(payload, received.body());
            int seen = deliveries.incrementAndGet();
            if (seen == 2) {
                testComplete();
            }
        });
        eb.publish("some-address1", payload);
        await();
    }

    /**
     * Registers the same handler instance through two consumers, publishes one message and
     * completes once the handler has been invoked twice.
     */
    @Test
    public void testPublishSameHandlerRegisteredTwice() {
        String str = TestUtils.randomUnicodeString(1000);
        AtomicInteger count = new AtomicInteger();
        // Typed handler so that msg.body() is available inside the lambda.
        Handler<Message<String>> handler = msg -> {
            assertEquals(str, msg.body());
            if (count.incrementAndGet() == 2) {
                testComplete();
            }
        };
        eb.<String>consumer("some-address1").handler(handler);
        eb.<String>consumer("some-address1").handler(handler);
        eb.publish("some-address1", str);
        await();
    }

    /**
     * Registers two consumers on one address, unregisters the second, then publishes.
     * Only the first handler may be called.
     */
    @Test
    public void testPublishMultipleHandlersUnregisterOne() {
        String str = TestUtils.randomUnicodeString(1000);
        // Typed handlers so that msg.body() is available inside the lambda.
        Handler<Message<String>> handler1 = msg -> {
            assertEquals(str, msg.body());
            testComplete();
        };
        Handler<Message<String>> handler2 = msg -> fail("Should not be called");
        eb.<String>consumer("some-address1").handler(handler1);
        MessageConsumer<String> reg = eb.<String>consumer("some-address1").handler(handler2);
        reg.unregister();
        eb.publish("some-address1", str);
        await();
    }

    /**
     * Registers consumers on two addresses and publishes to the first only; the consumer on
     * the second address must not be called.
     */
    @Test
    public void testPublishMultipleHandlersDifferentAddresses() {
        String payload = TestUtils.randomUnicodeString(1000);
        eb.consumer("some-address1").handler(received -> {
            assertEquals(payload, received.body());
            testComplete();
        });
        eb.consumer("some-address2").handler(received -> fail("Should not receive message"));
        eb.publish("some-address1", payload);
        await();
    }

    /**
     * Sends an instance of a local class that has no registered codec and expects
     * {@link IllegalArgumentException} from {@code send}; the consumer must never be called.
     */
    @Test
    public void testNonRegisteredCodecType() {
        // A type the event bus has no codec for.
        class Boom {
            Boom() {
            }
        }
        eb.consumer("foo").handler(received -> fail("Should not have gotten here"));
        try {
            eb.send("foo", new Boom());
        } catch (IllegalArgumentException expected) {
            testComplete();
        }
        await();
    }

    /**
     * Closes the event bus through a promise and checks the promise completes successfully.
     */
    @Test
    public void testCloseEventBus() {
        // Parameterized promise: with a raw Promise the completion callback argument degrades to Object.
        Promise<Void> promise = Promise.promise();
        eb.close(promise);
        promise.future().onComplete(ar -> {
            assertTrue(ar.succeeded());
            testComplete();
        });
        await();
    }

    /**
     * Runs the verticle request/reply scenario with the event-loop threading model.
     */
    @Test
    public void testInVerticle() throws Exception {
        final boolean worker = false;
        testInVerticle(worker);
    }

    /**
     * Runs the verticle request/reply scenario with the worker threading model.
     */
    @Test
    public void testInWorkerVerticle() throws Exception {
        final boolean worker = true;
        testInVerticle(worker);
    }

    /**
     * Deploys a verticle that registers a consumer and sends itself a request, asserting that
     * the consumer handler, the registration callback and the reply callback all observe the
     * verticle's own context (and, for event-loop verticles, the thread that ran {@code start()}).
     *
     * @param worker {@code true} to deploy with {@link ThreadingModel#WORKER},
     *               {@code false} for {@link ThreadingModel#EVENT_LOOP}
     * @throws Exception declared for callers; nothing in this body throws a checked exception
     */
    private void testInVerticle(final boolean worker) throws Exception {
        class MyVerticle
        extends AbstractVerticle {
            Context ctx;

            MyVerticle() {
            }

            @Override
            public void start() {
                this.ctx = this.context;
                if (worker) {
                    LocalEventBusTest.this.assertTrue(this.ctx.isWorkerContext());
                } else {
                    LocalEventBusTest.this.assertTrue(this.ctx.isEventLoopContext());
                }
                Thread thr = Thread.currentThread();
                // Parameterized type is required: with a raw MessageConsumer, completion() is a
                // raw Future and the completion callback argument degrades to Object.
                MessageConsumer<Object> reg = this.vertx.eventBus().consumer("some-address1").handler(msg -> {
                    LocalEventBusTest.this.assertSame(this.ctx, this.context);
                    // Thread identity is only asserted for event-loop verticles.
                    if (!worker) {
                        LocalEventBusTest.this.assertSame(thr, Thread.currentThread());
                    }
                    msg.reply((Object)"bar");
                });
                reg.completion().onComplete(ar -> {
                    LocalEventBusTest.this.assertTrue(ar.succeeded());
                    LocalEventBusTest.this.assertSame((Object)this.ctx, (Object)this.context);
                    if (!worker) {
                        LocalEventBusTest.this.assertSame(thr, Thread.currentThread());
                    }
                    this.vertx.eventBus().request("some-address1", (Object)"foo").onComplete(LocalEventBusTest.this.onSuccess(reply -> {
                        LocalEventBusTest.this.assertSame(this.ctx, this.context);
                        if (!worker) {
                            LocalEventBusTest.this.assertSame(thr, Thread.currentThread());
                        }
                        LocalEventBusTest.this.assertEquals("bar", reply.body());
                        LocalEventBusTest.this.testComplete();
                    }));
                });
            }
        }
        MyVerticle verticle = new MyVerticle();
        this.vertx.deployVerticle((Deployable)verticle, new DeploymentOptions().setThreadingModel(worker ? ThreadingModel.WORKER : ThreadingModel.EVENT_LOOP));
        this.await();
    }

    /**
     * Performs a request/reply from the test thread and records the context seen by the
     * consumer handler and by the reply handler; two distinct contexts are expected.
     */
    @Test
    public void testContextsSend() throws Exception {
        ConcurrentHashMap.KeySetView<Object, Boolean> seenContexts = ConcurrentHashMap.newKeySet();
        CountDownLatch bothSidesDone = new CountDownLatch(2);
        vertx.eventBus().consumer("some-address1").handler(received -> {
            received.reply("bar");
            seenContexts.add(((VertxInternal) vertx).getContext());
            bothSidesDone.countDown();
        });
        vertx.eventBus().request("some-address1", "foo").onComplete(onSuccess(response -> {
            assertEquals("bar", response.body());
            seenContexts.add(((VertxInternal) vertx).getContext());
            bothSidesDone.countDown();
        }));
        awaitLatch(bothSidesDone);
        assertEquals(2L, seenContexts.size());
    }

    /**
     * Registers ten consumers from the test thread, publishes one message and checks that
     * the ten handlers observed ten distinct contexts.
     */
    @Test
    public void testContextsPublish() throws Exception {
        ConcurrentHashMap.KeySetView<Object, Boolean> seenContexts = ConcurrentHashMap.newKeySet();
        AtomicInteger deliveries = new AtomicInteger();
        int numHandlers = 10;
        int registered = 0;
        while (registered < numHandlers) {
            vertx.eventBus().consumer("some-address1").handler(received -> {
                seenContexts.add(((VertxInternal) vertx).getContext());
                int seen = deliveries.incrementAndGet();
                if (seen == numHandlers) {
                    assertEquals(numHandlers, seenContexts.size());
                    testComplete();
                }
            });
            registered++;
        }
        vertx.eventBus().publish("some-address1", "foo");
        await();
    }

    /**
     * From within a context, registers a consumer and sends to it, then asserts that the
     * handler has not run by the time {@code send} returns. The test needs two completions:
     * one from that check and one from the eventual delivery.
     */
    @Test
    public void testSelfSendDoesNotTrampoline() throws Exception {
        waitFor(2);
        Context ctx = vertx.getOrCreateContext();
        ctx.runOnContext(ignored -> {
            EventBus bus = vertx.eventBus();
            AtomicInteger deliveries = new AtomicInteger();
            bus.consumer("some-address1", received -> {
                deliveries.incrementAndGet();
                complete();
            });
            bus.send("some-address1", "ping");
            // Delivery must not have happened inline during send().
            assertEquals(0L, deliveries.get());
            complete();
        });
        await();
    }

    /**
     * Sends a message with headers and then mutates the original header map; the consumer
     * must see a different map instance that still contains the removed entry.
     */
    @Test
    public void testHeadersCopiedAfterSend() throws Exception {
        MultiMap original = MultiMap.caseInsensitiveMultiMap();
        original.add("foo", "bar");
        vertx.eventBus().consumer("some-address1").handler(received -> {
            MultiMap delivered = received.headers();
            assertNotSame(original, delivered);
            assertEquals("bar", received.headers().get("foo"));
            testComplete();
        });
        DeliveryOptions options = new DeliveryOptions().setHeaders(original);
        vertx.eventBus().send("some-address1", "foo", options);
        // Mutating the caller's map after send() must not affect the delivered headers.
        original.remove("foo");
        await();
    }

    /**
     * Sends a {@code MyPOJO} with {@code MyPOJOEncoder1} selected by name and expects the
     * wrapped string on the receiving side.
     */
    @Test
    public void testDecoderSendAsymmetric() throws Exception {
        EventBusTestBase.MyPOJOEncoder1 encoder = new EventBusTestBase.MyPOJOEncoder1();
        vertx.eventBus().registerCodec((MessageCodec) encoder);
        String text = TestUtils.randomAlphaString(100);
        EventBusTestBase.MyPOJO sent = new EventBusTestBase.MyPOJO(text);
        DeliveryOptions options = new DeliveryOptions().setCodecName(encoder.name());
        testSend(sent, text, null, options);
    }

    /**
     * Replies with a {@code MyPOJO} using {@code MyPOJOEncoder1} selected by name and expects
     * the wrapped string on the receiving side.
     */
    @Test
    public void testDecoderReplyAsymmetric() throws Exception {
        EventBusTestBase.MyPOJOEncoder1 encoder = new EventBusTestBase.MyPOJOEncoder1();
        vertx.eventBus().registerCodec((MessageCodec) encoder);
        String text = TestUtils.randomAlphaString(100);
        EventBusTestBase.MyPOJO sent = new EventBusTestBase.MyPOJO(text);
        DeliveryOptions options = new DeliveryOptions().setCodecName(encoder.name());
        testReply(sent, text, null, options);
    }

    /**
     * Sends a {@code MyPOJO} with {@code MyPOJOEncoder2} selected by name and expects an
     * equal {@code MyPOJO} on the receiving side.
     */
    @Test
    public void testDecoderSendSymmetric() throws Exception {
        EventBusTestBase.MyPOJOEncoder2 encoder = new EventBusTestBase.MyPOJOEncoder2();
        vertx.eventBus().registerCodec((MessageCodec) encoder);
        String text = TestUtils.randomAlphaString(100);
        EventBusTestBase.MyPOJO sent = new EventBusTestBase.MyPOJO(text);
        DeliveryOptions options = new DeliveryOptions().setCodecName(encoder.name());
        testSend(sent, sent, null, options);
    }

    /**
     * Replies with a {@code MyPOJO} using {@code MyPOJOEncoder2} selected by name and expects
     * an equal {@code MyPOJO} on the receiving side.
     */
    @Test
    public void testDecoderReplySymmetric() throws Exception {
        EventBusTestBase.MyPOJOEncoder2 encoder = new EventBusTestBase.MyPOJOEncoder2();
        vertx.eventBus().registerCodec((MessageCodec) encoder);
        String text = TestUtils.randomAlphaString(100);
        EventBusTestBase.MyPOJO sent = new EventBusTestBase.MyPOJO(text);
        DeliveryOptions options = new DeliveryOptions().setCodecName(encoder.name());
        testReply(sent, sent, null, options);
    }

    /**
     * Sending with a codec name that was never registered must throw
     * {@link IllegalArgumentException}.
     */
    @Test
    public void testNoRegisteredDecoder() throws Exception {
        DeliveryOptions unknownCodec = new DeliveryOptions().setCodecName("iqwjdoqiwd");
        try {
            vertx.eventBus().send("some-address1", "foo", unknownCodec);
            fail("Should throw exception");
        } catch (IllegalArgumentException expected) {
            // Expected: the codec name is unknown to the event bus.
        }
    }

    /**
     * Registering a {@code MySystemDecoder} as a named codec is expected to raise an
     * {@link IllegalArgumentException}.
     *
     * <p>Uses the {@code TestUtils} assertion helper that the default-codec tests in this class
     * already rely on, instead of a hand-written try/fail/empty-catch sequence.
     */
    @Test
    public void testRegisterSystemDecoder() throws Exception {
        TestUtils.assertIllegalArgumentException(() ->
            vertx.eventBus().registerCodec(new EventBusTestBase.MySystemDecoder()));
    }

    /**
     * After a named codec has been registered and then unregistered, sending with that codec
     * name is expected to raise an {@link IllegalArgumentException}.
     *
     * <p>Uses the {@code TestUtils} assertion helper that the default-codec tests in this class
     * already rely on, instead of a hand-written try/fail/empty-catch sequence.
     */
    @Test
    public void testUnregisterDecoder() throws Exception {
        EventBusTestBase.MyPOJOEncoder1 codec = new EventBusTestBase.MyPOJOEncoder1();
        vertx.eventBus().registerCodec(codec);
        vertx.eventBus().unregisterCodec(codec.name());
        TestUtils.assertIllegalArgumentException(() ->
            vertx.eventBus().send(
                "some-address1",
                new EventBusTestBase.MyPOJO("foo"),
                new DeliveryOptions().setCodecName(codec.name())));
    }

    /**
     * Registering the same named codec a second time is expected to raise an
     * {@link IllegalStateException}.
     *
     * <p>Uses the {@code TestUtils} assertion helper that {@code testRegisterDefaultTwice}
     * already relies on, instead of a hand-written try/fail/empty-catch sequence.
     */
    @Test
    public void testRegisterTwice() throws Exception {
        EventBusTestBase.MyPOJOEncoder1 codec = new EventBusTestBase.MyPOJOEncoder1();
        vertx.eventBus().registerCodec(codec);
        TestUtils.assertIllegalStateException(() -> vertx.eventBus().registerCodec(codec));
    }

    /**
     * Registering a {@code NullNameCodec} as a named codec is expected to raise a
     * {@link NullPointerException}.
     *
     * <p>Uses the {@code TestUtils} assertion helper that {@code testDefaultCodecNullName}
     * already relies on, instead of a hand-written try/fail/empty-catch sequence.
     */
    @Test
    public void testCodecNullName() throws Exception {
        TestUtils.assertNullPointerException(() ->
            vertx.eventBus().registerCodec(new EventBusTestBase.NullNameCodec()));
    }

    /**
     * Registers {@code MyPOJOEncoder1} as the default codec for {@code MyPOJO} and runs
     * {@code testSend} without delivery options; the wrapped string is the second argument.
     */
    @Test
    public void testDefaultDecoderSendAsymmetric() throws Exception {
        EventBusTestBase.MyPOJOEncoder1 encoder = new EventBusTestBase.MyPOJOEncoder1();
        vertx.eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, (MessageCodec) encoder);
        String payload = TestUtils.randomAlphaString(100);
        EventBusTestBase.MyPOJO sent = new EventBusTestBase.MyPOJO(payload);
        testSend(sent, payload, null, null);
    }

    /**
     * Registers {@code MyPOJOEncoder1} as the default codec for {@code MyPOJO} and runs
     * {@code testReply} without delivery options; the wrapped string is the second argument.
     */
    @Test
    public void testDefaultDecoderReplyAsymmetric() throws Exception {
        EventBusTestBase.MyPOJOEncoder1 encoder = new EventBusTestBase.MyPOJOEncoder1();
        vertx.eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, (MessageCodec) encoder);
        String payload = TestUtils.randomAlphaString(100);
        EventBusTestBase.MyPOJO replied = new EventBusTestBase.MyPOJO(payload);
        testReply(replied, payload, null, null);
    }

    /**
     * Registers {@code MyPOJOEncoder2} as the default codec for {@code MyPOJO} and runs
     * {@code testSend} with the same instance as both the value and the second argument.
     */
    @Test
    public void testDefaultDecoderSendSymetric() throws Exception {
        EventBusTestBase.MyPOJOEncoder2 encoder = new EventBusTestBase.MyPOJOEncoder2();
        vertx.eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, (MessageCodec) encoder);
        EventBusTestBase.MyPOJO value = new EventBusTestBase.MyPOJO(TestUtils.randomAlphaString(100));
        testSend(value, value, null, null);
    }

    /**
     * Registers {@code MyPOJOEncoder2} as the default codec for {@code MyPOJO} and runs
     * {@code testReply} with the same instance as both the value and the second argument.
     */
    @Test
    public void testDefaultDecoderReplySymetric() throws Exception {
        EventBusTestBase.MyPOJOEncoder2 encoder = new EventBusTestBase.MyPOJOEncoder2();
        vertx.eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, (MessageCodec) encoder);
        EventBusTestBase.MyPOJO value = new EventBusTestBase.MyPOJO(TestUtils.randomAlphaString(100));
        testReply(value, value, null, null);
    }

    /**
     * Sending a {@code MyPOJO} when this test has registered no codec for it is expected to raise
     * an {@link IllegalArgumentException}.
     */
    @Test
    public void testNoRegisteredDefaultDecoder() throws Exception {
        TestUtils.assertIllegalArgumentException(() -> {
            EventBusTestBase.MyPOJO unsupported = new EventBusTestBase.MyPOJO("foo");
            vertx.eventBus().send("some-address1", unsupported);
        });
    }

    /**
     * Registering a {@code MySystemDecoder} as the default codec for {@code MyPOJO} is expected
     * to raise an {@link IllegalArgumentException}.
     */
    @Test
    public void testRegisterDefaultSystemDecoder() throws Exception {
        TestUtils.assertIllegalArgumentException(() -> {
            MessageCodec systemCodec = new EventBusTestBase.MySystemDecoder();
            vertx.eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, systemCodec);
        });
    }

    /**
     * After the default codec for {@code MyPOJO} has been registered and then unregistered,
     * sending a {@code MyPOJO} is expected to raise an {@link IllegalArgumentException}.
     */
    @Test
    public void testUnregisterDefaultDecoder() throws Exception {
        EventBusTestBase.MyPOJOEncoder1 encoder = new EventBusTestBase.MyPOJOEncoder1();
        vertx.eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, (MessageCodec) encoder);
        vertx.eventBus().unregisterDefaultCodec(EventBusTestBase.MyPOJO.class);
        TestUtils.assertIllegalArgumentException(() -> {
            EventBusTestBase.MyPOJO unsupported = new EventBusTestBase.MyPOJO("foo");
            vertx.eventBus().send("some-address1", unsupported);
        });
    }

    /**
     * Registering a default codec for {@code MyPOJO} a second time is expected to raise an
     * {@link IllegalStateException}.
     */
    @Test
    public void testRegisterDefaultTwice() throws Exception {
        EventBusTestBase.MyPOJOEncoder1 encoder = new EventBusTestBase.MyPOJOEncoder1();
        vertx.eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, (MessageCodec) encoder);
        TestUtils.assertIllegalStateException(() -> {
            vertx.eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, encoder);
        });
    }

    /**
     * Registering a {@code NullNameCodec} as the default codec for {@code String} is expected to
     * raise a {@link NullPointerException}.
     */
    @Test
    public void testDefaultCodecNullName() throws Exception {
        TestUtils.assertNullPointerException(() -> {
            MessageCodec codec = new EventBusTestBase.NullNameCodec();
            vertx.eventBus().registerDefaultCodec(String.class, codec);
        });
    }

    /**
     * Registers a default codec for {@code MyReplyException}, sends an instance of it, and
     * completes once the consumer has checked that the received body is a {@code MyReplyException}.
     */
    @Test
    public void testDefaultCodecReplyExceptionSubclass() throws Exception {
        EventBusTestBase.MyReplyException failure = new EventBusTestBase.MyReplyException(23, "my exception");
        MessageCodec replyCodec = new EventBusTestBase.MyReplyExceptionMessageCodec();
        vertx.eventBus().registerDefaultCodec(EventBusTestBase.MyReplyException.class, replyCodec);
        eb.consumer("some-address1", received -> {
            Object body = received.body();
            assertTrue(body instanceof EventBusTestBase.MyReplyException);
            testComplete();
        });
        vertx.eventBus().send("some-address1", failure);
        await();
    }

    /**
     * Overrides the inherited hook to always answer {@code false}.
     *
     * <p>NOTE(review): presumably consulted by the inherited {@code EventBusTestBase} tests to
     * decide whether an immutable object is expected to be copied on delivery — confirm against
     * the base class.
     */
    @Override
    protected boolean shouldImmutableObjectBeCopied() {
        return false;
    }

    /**
     * Runs the pause/resume scenario with the handler attached to the message stream itself:
     * each received message is unwrapped and its body forwarded to the supplied handler.
     *
     * <p>The body is passed straight through; {@code message} is a {@code Message<String>}, so no
     * cast is needed, and an {@code Object} upcast would not be accepted by {@code Handler<String>}.
     */
    @Test
    public void testPauseResumeMessageStream() {
        testPauseResume((consumer, handler) -> consumer.handler(message -> handler.handle(message.body())));
    }

    /**
     * Runs the pause/resume scenario with the handler attached to the consumer's body stream.
     */
    @Test
    public void testPauseResumeBodyStream() {
        testPauseResume((consumer, handler) -> {
            ReadStream<String> bodies = consumer.bodyStream();
            return bodies.handler(handler);
        });
    }

    /**
     * Publishes 11 messages to a paused consumer limited to 10 buffered messages, then resumes
     * it from the discard handler and completes once the first 10 bodies have been delivered.
     *
     * <p>Locals are fully parameterized: with raw types the discard lambda's parameter is erased
     * to {@code Object} and unchecked casts are needed to call {@code register}.
     *
     * @param register attaches the given body handler to the consumer and returns the stream
     *                 that is then paused and resumed
     */
    private void testPauseResume(BiFunction<MessageConsumer<String>, Handler<String>, ReadStream<?>> register) {
        String[] data = new String[11];
        for (int i = 0; i < data.length; ++i) {
            data[i] = TestUtils.randomAlphaString(10);
        }
        // Starts empty; it is only filled from the discard handler below.
        HashSet<String> expected = new HashSet<>();
        Handler<String> handler = body -> {
            assertTrue("Was expecting " + expected + " to contain " + body, expected.remove(body));
            if (expected.isEmpty()) {
                testComplete();
            }
        };
        MessageConsumer<String> reg = eb.consumer(
            new MessageConsumerOptions().setAddress("some-address1").setMaxBufferedMessages(10));
        ReadStream<?> controller = register.apply(reg, handler);
        ((MessageConsumerImpl<String>) reg).discardHandler(msg -> {
            // The 11th message is the one asserted to be discarded; the first 10 are then expected.
            assertEquals(data[10], msg.body());
            expected.addAll(Arrays.asList(data).subList(0, 10));
            controller.resume();
        });
        controller.pause();
        for (String body : data) {
            eb.publish("some-address1", body);
        }
        await();
    }

    /**
     * Runs the pause/fetch scenario with the handler attached to the message stream itself:
     * each received message is unwrapped and its body forwarded to the supplied handler.
     *
     * <p>The body is passed straight through; {@code message} is a {@code Message<String>}, so no
     * cast is needed, and an {@code Object} upcast would not be accepted by {@code Handler<String>}.
     */
    @Test
    public void testPauseFetchMessageStream() throws Exception {
        testPauseFetch((consumer, handler) -> consumer.handler(message -> handler.handle(message.body())));
    }

    /**
     * Runs the pause/fetch scenario with the handler attached to the consumer's body stream.
     */
    @Test
    public void testPauseFetchBodyStream() throws Exception {
        testPauseFetch((consumer, handler) -> {
            ReadStream<String> bodies = consumer.bodyStream();
            return bodies.handler(handler);
        });
    }

    /**
     * Pauses a consumer limited to 5 buffered messages, fetches 4, and publishes 11 messages in
     * two phases: the first 4 (awaited through {@code receiveLatch}) and then the remaining 7.
     * Finally asserts that the first 4 bodies were received and the last 2 were discarded.
     *
     * <p>Locals are fully parameterized: with raw types the discard lambda's parameter is erased
     * to {@code Object} and unchecked casts are needed to call {@code streamSupplier}.
     *
     * @param streamSupplier attaches the given body handler to the consumer and returns the
     *                       stream that is then paused and fetched from
     * @throws Exception if waiting on either latch fails
     */
    private void testPauseFetch(BiFunction<MessageConsumer<String>, Handler<String>, ReadStream<?>> streamSupplier) throws Exception {
        List<String> data = new ArrayList<>();
        for (int i = 0; i < 11; ++i) {
            data.add(TestUtils.randomAlphaString(10));
        }
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch receiveLatch = new CountDownLatch(4);
        MessageConsumer<String> registration = eb.consumer(
            new MessageConsumerOptions().setAddress("some-address1").setMaxBufferedMessages(5));
        MessageConsumerImpl<String> consumer = (MessageConsumerImpl<String>) registration;
        streamSupplier.apply(consumer, body -> {
            received.add(body);
            receiveLatch.countDown();
        }).pause().fetch(4L);
        List<String> discarded = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch discardLatch = new CountDownLatch(2);
        consumer.discardHandler(msg -> {
            discarded.add(msg.body());
            discardLatch.countDown();
        });
        ListIterator<String> iterator = data.listIterator();
        // Phase 1: exactly as many messages as were fetched.
        while (iterator.nextIndex() < 4) {
            eb.publish("some-address1", iterator.next());
        }
        awaitLatch(receiveLatch);
        // Phase 2: the remaining 7 messages, the last 2 of which are asserted below to be discarded.
        while (iterator.hasNext()) {
            eb.publish("some-address1", iterator.next());
        }
        awaitLatch(discardLatch);
        assertEquals(data.subList(0, 4), received);
        assertEquals(data.subList(data.size() - 2, data.size()), discarded);
    }

    /**
     * Runs the "handler throws while buffered messages are delivered" scenario with the handler
     * attached to the message stream itself: each message body is forwarded to the supplied handler.
     *
     * <p>The body is passed straight through; {@code message} is a {@code Message<String>}, so no
     * cast is needed, and an {@code Object} upcast would not be accepted by {@code Handler<String>}.
     */
    @Test
    public void testExceptionWhenDeliveringBufferedMessageWithMessageStream() {
        testExceptionWhenDeliveringBufferedMessage(
            (consumer, handler) -> consumer.handler(message -> handler.handle(message.body())));
    }

    /**
     * Runs the "handler throws while buffered messages are delivered" scenario with the handler
     * attached to the consumer's body stream.
     */
    @Test
    public void testExceptionWhenDeliveringBufferedMessageWithBodyStream() {
        testExceptionWhenDeliveringBufferedMessage((consumer, handler) -> {
            ReadStream<String> bodies = consumer.bodyStream();
            return bodies.handler(handler);
        });
    }

    /**
     * Publishes 11 messages to a paused consumer limited to 10 buffered messages, then resumes
     * it from the discard handler. The body handler throws a {@link RuntimeException} for every
     * delivery except the one that empties the expected set, where it completes the test.
     *
     * <p>Locals are fully parameterized: with raw types the discard lambda's parameter is erased
     * to {@code Object} and unchecked casts are needed to call {@code register}.
     *
     * @param register attaches the given body handler to the consumer and returns the stream
     *                 that is then paused and resumed
     */
    private void testExceptionWhenDeliveringBufferedMessage(BiFunction<MessageConsumer<String>, Handler<String>, ReadStream<?>> register) {
        String[] data = new String[11];
        for (int i = 0; i < data.length; ++i) {
            data[i] = TestUtils.randomAlphaString(10);
        }
        // Starts empty; it is only filled from the discard handler below.
        HashSet<String> expected = new HashSet<>();
        Handler<String> handler = body -> {
            assertTrue("Was expecting " + expected + " to contain " + body, expected.remove(body));
            if (!expected.isEmpty()) {
                // Deliberate failure while further expected bodies are still outstanding.
                throw new RuntimeException();
            }
            testComplete();
        };
        MessageConsumer<String> reg = eb.consumer(
            new MessageConsumerOptions().setAddress("some-address1").setMaxBufferedMessages(10));
        ReadStream<?> controller = register.apply(reg, handler);
        ((MessageConsumerImpl<String>) reg).discardHandler(msg -> {
            // The 11th message is the one asserted to be discarded; the first 10 are then expected.
            assertEquals(data[10], msg.body());
            expected.addAll(Arrays.asList(data).subList(0, 10));
            controller.resume();
        });
        controller.pause();
        for (String body : data) {
            eb.publish("some-address1", body);
        }
        await();
    }

    /**
     * Runs the unregister/end-handler scenario, passing the consumer itself as the read stream.
     */
    @Test
    public void testUnregisterationOfRegisteredConsumerCallsEndHandlerWithMessageStream() {
        MessageConsumer<String> registration = eb.consumer("some-address1");
        ReadStream<?> messageStream = registration;
        testUnregisterationOfRegisteredConsumerCallsEndHandler(registration, messageStream);
    }

    /**
     * Runs the unregister/end-handler scenario, passing the consumer's body stream as the read stream.
     */
    @Test
    public void testUnregisterationOfRegisteredConsumerCallsEndHandlerWithBodyStream() {
        MessageConsumer<String> registration = eb.consumer("some-address1");
        ReadStream<?> bodies = registration.bodyStream();
        testUnregisterationOfRegisteredConsumerCallsEndHandler(registration, bodies);
    }

    /**
     * Registers a no-op message handler on {@code consumer}, installs an end handler on
     * {@code readStream}, unregisters the consumer and waits for the end handler to complete the test.
     *
     * <p>The end handler is installed through {@code readStream} so that the stream handed in by
     * the caller is the one under test; with it installed on {@code consumer} the parameter is
     * unused and every caller exercises the same path.
     * NOTE(review): assumes the body stream forwards its end handler to the underlying consumer —
     * confirm against {@code MessageConsumer#bodyStream()}.
     *
     * @param consumer   the consumer that is registered and then unregistered
     * @param readStream the stream view of {@code consumer} on which the end handler is set
     */
    private void testUnregisterationOfRegisteredConsumerCallsEndHandler(MessageConsumer<String> consumer, ReadStream<?> readStream) {
        consumer.handler(msg -> {});
        readStream.endHandler(v -> testComplete());
        consumer.unregister();
        await();
    }

    /**
     * Sets an end handler, starts unregistering, then clears the end handler with {@code null};
     * the test completes when the unregistration future completes.
     */
    @Test
    public void testUnregisterThenUnsetEndHandler() {
        MessageConsumer<String> registration = eb.consumer("some-address1");
        registration.endHandler(ended -> {});
        registration.unregister().onComplete(outcome -> testComplete());
        // Cleared right after unregister() was called, without waiting for its future.
        registration.endHandler(null);
        await();
    }

    /**
     * Runs the null-handler scenario, passing the consumer itself as the read stream.
     */
    @Test
    public void testUnregistrationWhenSettingNullHandlerWithConsumer() {
        MessageConsumer<String> registration = eb.consumer("some-address1");
        ReadStream<?> messageStream = registration;
        testUnregistrationWhenSettingNullHandler(registration, messageStream);
    }

    /**
     * Runs the null-handler scenario, passing the consumer's body stream as the read stream.
     */
    @Test
    public void testUnregistrationWhenSettingNullHandlerWithBodyStream() {
        MessageConsumer<String> registration = eb.consumer("some-address1");
        ReadStream<?> bodies = registration.bodyStream();
        testUnregistrationWhenSettingNullHandler(registration, bodies);
    }

    /**
     * Asserts that setting a handler on {@code readStream} leaves {@code consumer} registered and
     * that replacing it with {@code null} leaves it unregistered.
     *
     * @param consumer   the consumer whose registration state is checked
     * @param readStream the stream on which the handler is set and then cleared
     */
    private void testUnregistrationWhenSettingNullHandler(MessageConsumer<String> consumer, ReadStream<?> readStream) {
        readStream.handler(ignored -> {});
        boolean registeredWithHandler = consumer.isRegistered();
        assertTrue(registeredWithHandler);

        readStream.handler(null);
        boolean registeredWithoutHandler = consumer.isRegistered();
        assertFalse(registeredWithoutHandler);
    }

    /**
     * Writes a random string through a sender and completes when a consumer on the same address
     * receives a body equal to it.
     */
    @Test
    public void testSender() {
        String payload = TestUtils.randomUnicodeString(100);
        MessageProducer<String> sender = eb.sender("some-address1");
        eb.consumer("some-address1").handler(message -> {
            if (!message.body().equals(payload)) {
                return;
            }
            testComplete();
        });
        sender.write(payload);
        await();
    }

    /**
     * Writes a random string through a sender created with a {@code foo} header and completes
     * when a consumer receives both the expected body and the expected header value.
     */
    @Test
    public void testSenderWithOptions() {
        String payload = TestUtils.randomUnicodeString(100);
        DeliveryOptions options = new DeliveryOptions().addHeader("foo", "foo_value");
        MessageProducer<String> sender = eb.sender("some-address1", options);
        eb.consumer("some-address1").handler(message -> {
            if (!message.body().equals(payload)) {
                return;
            }
            if (!"foo_value".equals(message.headers().get("foo"))) {
                return;
            }
            testComplete();
        });
        sender.write(payload);
        await();
    }

    /**
     * Writes a random string through a publisher and completes once both registered consumers
     * have received a body equal to it. Also checks the publisher's reported address.
     */
    @Test
    public void testPublisher() {
        String payload = TestUtils.randomUnicodeString(100);
        MessageProducer<String> publisher = eb.publisher("some-address1");
        assertEquals("some-address1", publisher.address());
        AtomicInteger matches = new AtomicInteger();
        int consumerCount = 2;
        for (int registered = 0; registered < consumerCount; ++registered) {
            eb.consumer("some-address1").handler(message -> {
                if (!message.body().equals(payload)) {
                    return;
                }
                // Only matching deliveries are counted.
                if (matches.incrementAndGet() == consumerCount) {
                    testComplete();
                }
            });
        }
        publisher.write(payload);
        await();
    }

    /**
     * Writes a random string through a publisher created with a {@code foo} header and completes
     * once both registered consumers have received the expected body and header value.
     */
    @Test
    public void testPublisherWithOptions() {
        String payload = TestUtils.randomUnicodeString(100);
        DeliveryOptions options = new DeliveryOptions().addHeader("foo", "foo_value");
        MessageProducer<String> publisher = eb.publisher("some-address1", options);
        assertEquals("some-address1", publisher.address());
        AtomicInteger matches = new AtomicInteger();
        int consumerCount = 2;
        for (int registered = 0; registered < consumerCount; ++registered) {
            eb.consumer("some-address1").handler(message -> {
                if (!message.body().equals(payload)) {
                    return;
                }
                if (!"foo_value".equals(message.headers().get("foo"))) {
                    return;
                }
                // Only deliveries matching on body and header are counted.
                if (matches.incrementAndGet() == consumerCount) {
                    testComplete();
                }
            });
        }
        publisher.write(payload);
        await();
    }

    /**
     * Creates a sender and closes it without observing the result; passes if nothing is thrown.
     */
    @Test
    public void testCloseSender1() {
        MessageProducer<Object> sender = eb.sender("some-address1");
        sender.close();
    }

    /**
     * Creates a publisher and closes it without observing the result; passes if nothing is thrown.
     */
    @Test
    public void testClosePublisher1() {
        MessageProducer<Object> publisher = eb.publisher("some-address1");
        publisher.close();
    }

    /**
     * Registers a completion callback before setting the message handler and checks that the
     * callback does not see the calling thread's thread-local value and runs on an event-loop context.
     */
    @Test
    public void testConsumerHandlesCompletionAsynchronously() {
        MessageConsumer<Object> registration = eb.consumer("some-address1");
        // Marker visible only on the thread executing this test method.
        ThreadLocal<Boolean> callerMarker = new ThreadLocal<>();
        callerMarker.set(true);
        registration.completion().onComplete(outcome -> {
            assertNull(callerMarker.get());
            Context current = Vertx.currentContext();
            assertTrue(current.isEventLoopContext());
            testComplete();
        });
        registration.handler(ignored -> {});
        await();
    }

    /**
     * Sets the message handler first, then registers a completion callback and checks that the
     * callback does not see the calling thread's thread-local value and runs on an event-loop context.
     */
    @Test
    public void testConsumerHandlesCompletionAsynchronously2() {
        MessageConsumer<Object> registration = eb.consumer("some-address1");
        registration.handler(ignored -> {});
        // Marker visible only on the thread executing this test method.
        ThreadLocal<Boolean> callerMarker = new ThreadLocal<>();
        callerMarker.set(true);
        registration.completion().onComplete(outcome -> {
            assertNull(callerMarker.get());
            Context current = Vertx.currentContext();
            assertTrue(current.isEventLoopContext());
            testComplete();
        });
        await();
    }

    /**
     * Checks that delivery options set on an existing producer apply to later writes: the first
     * message must carry no {@code header-name} header, the second (written after
     * {@code deliveryOptions(..)} was updated) must carry {@code header-value}.
     *
     * <p>Producer and consumer are parameterized with {@code String}; with raw types the lambda
     * parameters are erased to {@code Object}, so {@code succeeded()} and {@code headers()} are
     * unavailable.
     */
    @Test
    public void testUpdateDeliveryOptionsOnProducer() {
        MessageProducer<String> producer = eb.sender("some-address1");
        MessageConsumer<String> consumer = eb.consumer("some-address1");
        consumer.completion().onComplete(v -> {
            assertTrue(v.succeeded());
            producer.write("no-header");
        });
        consumer.handler(msg -> {
            String body = msg.body();
            assertNotNull(body);
            switch (body) {
                case "no-header": {
                    assertNull(msg.headers().get("header-name"));
                    // Update the options on the same producer, then write again.
                    producer.deliveryOptions(new DeliveryOptions().addHeader("header-name", "header-value"));
                    producer.write("with-header");
                    break;
                }
                case "with-header": {
                    assertEquals("header-value", msg.headers().get("header-name"));
                    testComplete();
                    break;
                }
                default: {
                    fail();
                }
            }
        });
        await();
    }

    /**
     * Registers a consumer from within a specific context, waits for the registration to
     * complete, closes Vert.x and checks that the end handler runs with that same context current.
     *
     * <p>The consumer is parameterized; with a raw type the completion lambda's parameter is
     * erased to {@code Object}, so {@code succeeded()} is unavailable.
     *
     * @throws Exception if waiting for the registration latch or closing Vert.x fails
     */
    @Test
    public void testCloseCallsEndHandlerWithRegistrationContext() throws Exception {
        Context ctx = vertx.getOrCreateContext();
        CountDownLatch registered = new CountDownLatch(1);
        ctx.runOnContext(v1 -> {
            MessageConsumer<Object> consumer = eb.consumer("some-address1");
            consumer.endHandler(v2 -> {
                assertSame(Vertx.currentContext(), ctx);
                testComplete();
            });
            consumer.handler(msg -> {});
            consumer.completion().onComplete(ar -> {
                assertTrue(ar.succeeded());
                registered.countDown();
            });
        });
        // Close only once the registration has been reported as complete.
        awaitLatch(registered);
        closeVertx();
        await();
    }

    /**
     * On a context, schedules a 50 ms timer and then creates and unregisters a consumer; the
     * test completes when the timer fires, asserting that its id is {@code 0}.
     */
    @Test
    public void testConsumerUnregisterDoesNotCancelTimer0() throws Exception {
        vertx.getOrCreateContext().runOnContext(ignored -> {
            Handler<Long> onTimer = timerId -> {
                assertEquals(0L, timerId.longValue());
                testComplete();
            };
            vertx.setTimer(50L, onTimer);
            MessageConsumer<Object> shortLived = eb.consumer("some-address1");
            shortLived.unregister();
        });
        await();
    }

    /**
     * While handling {@code val0} the consumer pauses itself and sends {@code val1}; on a later
     * context task it resumes, installs a discard handler and clears the message handler. The
     * test completes when the discard handler observes {@code val1}.
     *
     * <p>The consumer is parameterized with {@code String}; with raw types the lambda parameters
     * are erased to {@code Object}, so {@code body()} is unavailable.
     */
    @Test
    public void testUnregisterConsumerDiscardPendingMessages() {
        MessageConsumer<String> consumer = eb.consumer("some-address1");
        consumer.handler(msg -> {
            assertEquals("val0", msg.body());
            consumer.pause();
            eb.send("some-address1", "val1");
            Context ctx = Vertx.currentContext();
            ctx.runOnContext(v -> {
                consumer.resume();
                ((MessageConsumerImpl<String>) consumer).discardHandler(discarded -> {
                    assertEquals("val1", discarded.body());
                    testComplete();
                });
                // Clearing the handler comes last, after the discard handler is in place.
                consumer.handler(null);
            });
        });
        eb.send("some-address1", "val0");
        await();
    }

    /**
     * Sets a handler and unregisters immediately, asserting that the completion callback sees a
     * successful result and a counter value of {@code 0}. The test completes 10 ms after the callback.
     *
     * <p>The consumer is parameterized; with a raw type the completion lambda's parameter is
     * erased to {@code Object}, so {@code succeeded()} is unavailable.
     */
    @Test
    public void testImmediateUnregistration() {
        MessageConsumer<Object> consumer = vertx.eventBus().consumer("some-address1");
        AtomicInteger completionCount = new AtomicInteger();
        consumer.completion().onComplete(ar -> {
            int val = completionCount.getAndIncrement();
            assertEquals(0L, val);
            assertTrue(ar.succeeded());
            // Completion is deferred by 10 ms rather than signalled directly from the callback.
            vertx.setTimer(10L, id -> testComplete());
        });
        consumer.handler(msg -> {});
        consumer.unregister();
        await();
    }

    /**
     * Writes through a sender with a consumer registered and waits for two completions: one from
     * the consumer receiving the message and one from the write future succeeding.
     */
    @Test
    public void testSendWriteHandler() {
        waitFor(2);
        eb.consumer("some-address1", received -> complete());
        MessageProducer<String> sender = eb.sender("some-address1");
        sender.write("body").onComplete(onSuccess(written -> complete()));
        await();
    }

    /**
     * Writes through a sender with no consumer registered and expects the write future to fail
     * with a {@link ReplyException} whose failure code is {@code -1}.
     */
    @Test
    public void testSendWriteHandlerNoConsumer() {
        MessageProducer<String> sender = eb.sender("some-address1");
        sender.write("body").onComplete(onFailure(cause -> {
            assertTrue(cause instanceof ReplyException);
            int code = ((ReplyException) cause).failureCode();
            assertEquals(-1L, code);
            testComplete();
        }));
        await();
    }

    /**
     * Writes through a publisher with a consumer registered and waits for two completions: one
     * from the consumer receiving the message and one from the write future succeeding.
     */
    @Test
    public void testPublishWriteHandler() {
        waitFor(2);
        eb.consumer("some-address1", received -> complete());
        MessageProducer<String> publisher = eb.publisher("some-address1");
        publisher.write("body").onComplete(onSuccess(written -> complete()));
        await();
    }

    /**
     * Writes through a publisher with no consumer registered and expects the write future to
     * fail with a {@link ReplyException} whose failure code is {@code -1}.
     */
    @Test
    public void testPublishWriteHandlerNoConsumer() {
        MessageProducer<String> publisher = eb.publisher("some-address1");
        publisher.write("body").onComplete(onFailure(cause -> {
            assertTrue(cause instanceof ReplyException);
            int code = ((ReplyException) cause).failureCode();
            assertEquals(-1L, code);
            testComplete();
        }));
        await();
    }

    /**
     * Closes a publisher and completes when the close future succeeds.
     */
    @Test
    public void testClosePublisher() {
        MessageProducer<Object> publisher = eb.publisher("some-address1");
        publisher.close().onComplete(onSuccess(closed -> testComplete()));
        await();
    }

    /**
     * Closes a sender and completes when the close future succeeds.
     */
    @Test
    public void testCloseSender() {
        MessageProducer<Object> sender = eb.sender("some-address1");
        sender.close().onComplete(onSuccess(closed -> testComplete()));
        await();
    }

    /**
     * Ignored test: runs the buffered-message early-timeout scenario with "pause" as the action
     * before the request and "unregister" as the action after it.
     */
    @Ignore
    @Test
    public void testEarlyTimeoutOfBufferedMessagesOnHandlerUnregistration() {
        testEarlyTimeoutOfBufferedMessages(
            consumer -> consumer.pause(),
            consumer -> consumer.unregister());
    }

    /**
     * Shared scenario: a consumer limited to one buffered message receives a request (body
     * {@code 1}, send timeout {@code Long.MAX_VALUE}) followed by a plain send (body {@code 2}).
     * When the discard handler sees body {@code 2} it runs {@code afterRequest}; the request is
     * expected to fail with a {@link ReplyFailure#TIMEOUT} reply exception.
     *
     * <p>The consumer is parameterized; with raw types the lambda parameters are erased to
     * {@code Object}, so {@code reply(..)} and {@code body()} are unavailable.
     *
     * @param beforeRequest action applied to the consumer once registered, before the request is sent
     * @param afterRequest  action applied to the consumer when message {@code 2} is discarded
     */
    private void testEarlyTimeoutOfBufferedMessages(Consumer<MessageConsumer<?>> beforeRequest, Consumer<MessageConsumer<?>> afterRequest) {
        DeliveryOptions noTimeout = new DeliveryOptions().setSendTimeout(Long.MAX_VALUE);
        MessageConsumer<Object> consumer = vertx.eventBus().consumer(
            new MessageConsumerOptions().setAddress("some-address1").setMaxBufferedMessages(1));
        consumer.handler(message -> message.reply(null));
        ((MessageConsumerImpl<Object>) consumer).discardHandler(msg -> {
            if (msg.body().equals(2)) {
                afterRequest.accept(consumer);
            }
        });
        consumer.completion().onComplete(registered -> {
            beforeRequest.accept(consumer);
            vertx.eventBus().request("some-address1", 1, noTimeout).onComplete(onFailure(err -> {
                assertTrue(err instanceof ReplyException);
                assertEquals(ReplyFailure.TIMEOUT, ((ReplyException) err).failureType());
                testComplete();
            }));
            vertx.eventBus().send("some-address1", 2);
        });
        await();
    }

    /**
     * Ignored test: with a paused consumer allowing zero buffered messages, a request sent with
     * a {@code Long.MAX_VALUE} send timeout is expected to fail with {@link ReplyFailure#TIMEOUT}.
     * The message handler fails the test if it is ever invoked.
     */
    @Ignore
    @Test
    public void testEarlyTimeoutWhenMaxBufferedMessagesExceeded() {
        DeliveryOptions unbounded = new DeliveryOptions().setSendTimeout(Long.MAX_VALUE);
        MessageConsumerOptions consumerOptions =
            new MessageConsumerOptions().setAddress("some-address1").setMaxBufferedMessages(0);
        MessageConsumer<Object> registration = vertx.eventBus().consumer(consumerOptions);
        registration.handler(unexpected -> fail());
        registration.completion().onComplete(registered -> {
            registration.pause();
            vertx.eventBus()
                .request("some-address1", 1, unbounded)
                .onComplete(onFailure(cause -> {
                    assertTrue(cause instanceof ReplyException);
                    ReplyException reply = (ReplyException) cause;
                    assertEquals(ReplyFailure.TIMEOUT, reply.failureType());
                    testComplete();
                }));
        });
        await();
    }

    /**
     * Sends a request with a {@code Long.MAX_VALUE} send timeout and unregisters the consumer
     * right after; the request is expected to fail with {@link ReplyFailure#TIMEOUT}. The message
     * handler fails the test if it is ever invoked.
     */
    @Test
    public void testEarlyTimeoutOnHandlerUnregistration() {
        DeliveryOptions unbounded = new DeliveryOptions().setSendTimeout(Long.MAX_VALUE);
        MessageConsumer<Object> registration = vertx.eventBus().consumer("some-address1");
        registration.handler(unexpected -> fail());
        registration.completion().onComplete(registered -> {
            vertx.eventBus()
                .request("some-address1", 1, unbounded)
                .onComplete(onFailure(cause -> {
                    assertTrue(cause instanceof ReplyException);
                    ReplyException reply = (ReplyException) cause;
                    assertEquals(ReplyFailure.TIMEOUT, reply.failureType());
                    testComplete();
                }));
            registration.unregister();
        });
        await();
    }

    /**
     * On a worker context, registers a consumer, pauses it and writes one message. After the
     * write succeeds the consumer is resumed from the worker context, and the handler asserts
     * that it runs only after the resume flag was set and on the worker's thread.
     */
    @Test
    public void testMessageConsumptionStayOnWorkerThreadAfterResume() {
        ContextInternal worker = ((VertxInternal) vertx).createWorkerContext();
        worker.runOnContext(started -> {
            EventBus bus = vertx.eventBus();
            // Flipped just before resume(); the handler must not run while it is still false.
            AtomicBoolean resumed = new AtomicBoolean(false);
            MessageConsumer<String> registration = bus.consumer("some-address1", delivered -> {
                assertTrue(resumed.get());
                assertTrue(worker.inThread());
                testComplete();
            });
            registration.pause();
            MessageProducer<String> sender = bus.sender("some-address1");
            sender.write("msg").onComplete(onSuccess(written -> {
                worker.runOnContext(onWorker -> {
                    resumed.set(true);
                    registration.resume();
                });
            }));
        });
        await();
    }

    /**
     * Sends two messages to a paused consumer and resumes it from a context task. The handler
     * unregisters the consumer when invoked, and the discard handler asserts that {@code msg-2}
     * is the message it receives. The test waits for two completions: one scheduled by the
     * handler and one from the discard handler.
     *
     * <p>The consumer is parameterized with {@code String}; with a raw type the discard lambda's
     * parameter is erased to {@code Object}, so {@code body()} is unavailable.
     */
    @Test
    public void testUnregisterInHandler() {
        waitFor(2);
        MessageConsumerImpl<String> consumer =
            (MessageConsumerImpl<String>) vertx.eventBus().<String>consumer("some-address1");
        consumer.discardHandler(msg -> {
            assertEquals("msg-2", msg.body());
            complete();
        });
        consumer.handler(msg -> {
            consumer.unregister();
            vertx.runOnContext(v -> complete());
        });
        // Pause before sending so that both messages are pending when resume() runs.
        consumer.pause();
        vertx.eventBus().send("some-address1", "msg-1");
        vertx.eventBus().send("some-address1", "msg-2");
        vertx.runOnContext(v -> consumer.resume());
        await();
    }
}

