/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.core.eventbus;

import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.eventbus.ClusteredEventBusTestBase;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.EventBusTestBase;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.MessageProducer;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.net.KeyCertOptions;
import io.vertx.core.shareddata.AsyncMapTest;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.cluster.NodeSelector;
import io.vertx.core.spi.cluster.RegistrationUpdateEvent;
import io.vertx.core.spi.cluster.WrappedClusterManager;
import io.vertx.core.spi.cluster.WrappedNodeSelector;
import io.vertx.test.core.TestUtils;
import io.vertx.test.tls.Cert;
import java.io.InvalidClassException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.Test;

public class ClusteredEventBusTest
extends ClusteredEventBusTestBase {
    /**
     * Registers a local-only consumer on node 1, then sends and publishes to the same
     * address from node 0. The consumer fails the test if it is ever invoked; a
     * one-second timer on node 0 completes the test otherwise.
     */
    @Test
    public void testLocalHandlerNotVisibleRemotely() throws Exception {
        startNodes(2);
        MessageConsumer<Object> localOnly = vertices[1].eventBus().localConsumer("some-address1");
        localOnly.handler(msg -> fail("Should not receive message"));
        EventBus remoteBus = vertices[0].eventBus();
        remoteBus.send("some-address1", "foo");
        remoteBus.publish("some-address1", "foo");
        // Leave one second for a stray delivery to show up before declaring success.
        vertices[0].setTimer(1000L, timerId -> testComplete());
        await();
    }

    /**
     * Registers a clustered consumer on node 1 and a local consumer on node 0, then
     * sends two messages from node 0. The test finishes once the handlers have
     * called {@code complete()} twice in total.
     */
    @Test
    public void testLocalHandlerClusteredSend() throws Exception {
        startNodes(2);
        waitFor(2);
        MessageConsumer<Object> clustered = vertices[1].eventBus().consumer("some-address1", msg -> complete());
        clustered.completionHandler(clusteredReady -> {
            MessageConsumer<Object> local = vertices[0].eventBus().localConsumer("some-address1", msg -> complete());
            local.completionHandler(localReady -> {
                // Two sends, issued only after both registrations have completed.
                for (int sent = 0; sent < 2; sent++) {
                    vertices[0].eventBus().send("some-address1", "foo");
                }
            });
        });
        await();
    }

    /**
     * Registers a clustered consumer on node 1 and a local consumer on node 0, then
     * publishes once from node 0. The test waits for two {@code complete()} calls.
     */
    @Test
    public void testLocalHandlerClusteredPublish() throws Exception {
        startNodes(2);
        waitFor(2);
        MessageConsumer<Object> clustered = vertices[1].eventBus().consumer("some-address1", msg -> complete());
        clustered.completionHandler(clusteredReady -> {
            MessageConsumer<Object> local = vertices[0].eventBus().localConsumer("some-address1", msg -> complete());
            local.completionHandler(localReady -> vertices[0].eventBus().publish("some-address1", "foo"));
        });
        await();
    }

    /**
     * Registers {@code MyPOJOEncoder1} on both nodes and runs {@code testSend} with a
     * {@code MyPOJO} wrapping a random string, selecting the codec by name. The raw
     * string is passed as the second argument (presumably the value expected on the
     * receiving side).
     */
    @Test
    public void testDecoderSendAsymmetric() throws Exception {
        startNodes(2);
        EventBusTestBase.MyPOJOEncoder1 pojoCodec = new EventBusTestBase.MyPOJOEncoder1();
        for (int node = 0; node < 2; node++) {
            vertices[node].eventBus().registerCodec(pojoCodec);
        }
        String payload = TestUtils.randomAlphaString(100);
        DeliveryOptions byCodecName = new DeliveryOptions().setCodecName(pojoCodec.name());
        testSend(new EventBusTestBase.MyPOJO(payload), payload, null, byCodecName);
    }

    /**
     * Registers {@code MyPOJOEncoder1} on both nodes and runs {@code testReply} with a
     * {@code MyPOJO} wrapping a random string, selecting the codec by name. The raw
     * string is passed as the second argument (presumably the value expected on the
     * receiving side).
     */
    @Test
    public void testDecoderReplyAsymmetric() throws Exception {
        startNodes(2);
        EventBusTestBase.MyPOJOEncoder1 pojoCodec = new EventBusTestBase.MyPOJOEncoder1();
        for (int node = 0; node < 2; node++) {
            vertices[node].eventBus().registerCodec(pojoCodec);
        }
        String payload = TestUtils.randomAlphaString(100);
        DeliveryOptions byCodecName = new DeliveryOptions().setCodecName(pojoCodec.name());
        testReply(new EventBusTestBase.MyPOJO(payload), payload, null, byCodecName);
    }

    /**
     * Registers {@code MyPOJOEncoder2} on both nodes and runs {@code testSend} with the
     * same {@code MyPOJO} instance as both the first and second argument, selecting
     * the codec by name.
     */
    @Test
    public void testDecoderSendSymmetric() throws Exception {
        startNodes(2);
        EventBusTestBase.MyPOJOEncoder2 pojoCodec = new EventBusTestBase.MyPOJOEncoder2();
        for (int node = 0; node < 2; node++) {
            vertices[node].eventBus().registerCodec(pojoCodec);
        }
        EventBusTestBase.MyPOJO payload = new EventBusTestBase.MyPOJO(TestUtils.randomAlphaString(100));
        DeliveryOptions byCodecName = new DeliveryOptions().setCodecName(pojoCodec.name());
        testSend(payload, payload, null, byCodecName);
    }

    /**
     * Registers {@code MyPOJOEncoder2} on both nodes and runs {@code testReply} with
     * the same {@code MyPOJO} instance as both the first and second argument,
     * selecting the codec by name.
     */
    @Test
    public void testDecoderReplySymmetric() throws Exception {
        startNodes(2);
        EventBusTestBase.MyPOJOEncoder2 pojoCodec = new EventBusTestBase.MyPOJOEncoder2();
        for (int node = 0; node < 2; node++) {
            vertices[node].eventBus().registerCodec(pojoCodec);
        }
        EventBusTestBase.MyPOJO payload = new EventBusTestBase.MyPOJO(TestUtils.randomAlphaString(100));
        DeliveryOptions byCodecName = new DeliveryOptions().setCodecName(pojoCodec.name());
        testReply(payload, payload, null, byCodecName);
    }

    /**
     * Registers {@code MyPOJOEncoder1} as the default codec for {@code MyPOJO} on both
     * nodes and runs {@code testSend} without explicit delivery options.
     */
    @Test
    public void testDefaultDecoderSendAsymmetric() throws Exception {
        startNodes(2);
        EventBusTestBase.MyPOJOEncoder1 pojoCodec = new EventBusTestBase.MyPOJOEncoder1();
        for (int node = 0; node < 2; node++) {
            vertices[node].eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, pojoCodec);
        }
        String payload = TestUtils.randomAlphaString(100);
        testSend(new EventBusTestBase.MyPOJO(payload), payload, null, null);
    }

    /**
     * Registers {@code MyPOJOEncoder1} as the default codec for {@code MyPOJO} on both
     * nodes and runs {@code testReply} without explicit delivery options.
     */
    @Test
    public void testDefaultDecoderReplyAsymmetric() throws Exception {
        startNodes(2);
        EventBusTestBase.MyPOJOEncoder1 pojoCodec = new EventBusTestBase.MyPOJOEncoder1();
        for (int node = 0; node < 2; node++) {
            vertices[node].eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, pojoCodec);
        }
        String payload = TestUtils.randomAlphaString(100);
        testReply(new EventBusTestBase.MyPOJO(payload), payload, null, null);
    }

    /**
     * Registers {@code MyPOJOEncoder2} as the default codec for {@code MyPOJO} on both
     * nodes and runs {@code testSend} with the same instance as first and second
     * argument, without explicit delivery options.
     */
    @Test
    public void testDefaultDecoderSendSymetric() throws Exception {
        startNodes(2);
        EventBusTestBase.MyPOJOEncoder2 pojoCodec = new EventBusTestBase.MyPOJOEncoder2();
        for (int node = 0; node < 2; node++) {
            vertices[node].eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, pojoCodec);
        }
        EventBusTestBase.MyPOJO payload = new EventBusTestBase.MyPOJO(TestUtils.randomAlphaString(100));
        testSend(payload, payload, null, null);
    }

    /**
     * Registers {@code MyPOJOEncoder2} as the default codec for {@code MyPOJO} on both
     * nodes and runs {@code testReply} with the same instance as first and second
     * argument, without explicit delivery options.
     */
    @Test
    public void testDefaultDecoderReplySymetric() throws Exception {
        startNodes(2);
        EventBusTestBase.MyPOJOEncoder2 pojoCodec = new EventBusTestBase.MyPOJOEncoder2();
        for (int node = 0; node < 2; node++) {
            vertices[node].eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, pojoCodec);
        }
        EventBusTestBase.MyPOJO payload = new EventBusTestBase.MyPOJO(TestUtils.randomAlphaString(100));
        testReply(payload, payload, null, null);
    }

    /**
     * Registers a default codec for {@code MyReplyException} on both nodes, sends an
     * instance from node 1 to a consumer on node 0, and checks that the received body
     * is still a {@code MyReplyException}.
     */
    @Test
    public void testDefaultCodecReplyExceptionSubclass() throws Exception {
        startNodes(2);
        EventBusTestBase.MyReplyException payload = new EventBusTestBase.MyReplyException(23, "my exception");
        EventBusTestBase.MyReplyExceptionMessageCodec exceptionCodec = new EventBusTestBase.MyReplyExceptionMessageCodec();
        for (int node = 0; node < 2; node++) {
            vertices[node].eventBus().registerDefaultCodec(EventBusTestBase.MyReplyException.class, exceptionCodec);
        }
        MessageConsumer<Object> receiver = vertices[0].eventBus().consumer("some-address1", msg -> {
            assertTrue(msg.body() instanceof EventBusTestBase.MyReplyException);
            testComplete();
        });
        // Send only once the registration has completed.
        receiver.completionHandler(registered -> vertices[1].eventBus().send("some-address1", payload));
        await();
    }

    /**
     * Starts two nodes with 500 ms cluster ping and ping-reply intervals, sends one
     * message from node 1 to node 0, waits four seconds, and sends a second one. The
     * test completes when the second message is received.
     *
     * <p>The consumer is declared as {@code MessageConsumer<Object>} rather than as a
     * raw type: on a raw consumer the completion-handler lambda parameter degrades to
     * {@code Object}, and {@code ar.succeeded()} cannot be resolved.
     */
    @Test
    public void testClusteredPong() throws Exception {
        VertxOptions options = new VertxOptions();
        options.getEventBusOptions().setClusterPingInterval(500L).setClusterPingReplyInterval(500L);
        startNodes(2, options);
        AtomicBoolean sending = new AtomicBoolean();
        MessageConsumer<Object> consumer = vertices[0].eventBus().consumer("foobar").handler(msg -> {
            if (!sending.get()) {
                sending.set(true);
                // First message: schedule the second one 4 s later, i.e. several 500 ms ping intervals on.
                vertx.setTimer(4000L, id -> vertices[1].eventBus().send("foobar", "whatever2"));
            } else {
                // Second message arrived after the delay.
                testComplete();
            }
        });
        consumer.completionHandler(ar -> {
            assertTrue(ar.succeeded());
            vertices[1].eventBus().send("foobar", "whatever");
        });
        await();
    }

    /**
     * Sets the completion handler before the message handler and checks that the
     * completion callback runs on an event-loop context and does not see the
     * thread-local value set by the registering thread.
     */
    @Test
    public void testConsumerHandlesCompletionAsynchronously1() {
        startNodes(2);
        MessageConsumer<Object> registration = vertices[0].eventBus().consumer("some-address1");
        // Marker visible only on the thread that performs the registration.
        ThreadLocal<Boolean> callerMarker = new ThreadLocal<>();
        callerMarker.set(true);
        registration.completionHandler(done -> {
            assertTrue(Vertx.currentContext().isEventLoopContext());
            assertNull(callerMarker.get());
            testComplete();
        });
        registration.handler(msg -> {
        });
        await();
    }

    /**
     * Sets the message handler before the completion handler and checks that the
     * completion callback runs on an event-loop context and does not see the
     * thread-local value set by the registering thread.
     */
    @Test
    public void testConsumerHandlesCompletionAsynchronously2() {
        startNodes(2);
        MessageConsumer<Object> registration = vertices[0].eventBus().consumer("some-address1");
        registration.handler(msg -> {
        });
        // Marker visible only on the thread that performs the registration.
        ThreadLocal<Boolean> callerMarker = new ThreadLocal<>();
        callerMarker.set(true);
        registration.completionHandler(done -> {
            assertTrue(Vertx.currentContext().isEventLoopContext());
            assertNull(callerMarker.get());
            testComplete();
        });
        await();
    }

    /**
     * Runs {@code testSubsRemoved} with node 1 being closed; the supplied latch is
     * counted down once the close succeeds.
     */
    @Test
    public void testSubsRemovedForClosedNode() throws Exception {
        testSubsRemoved(closed -> {
            vertices[1].close(onSuccess(ignored -> closed.countDown()));
        });
    }

    /**
     * Runs {@code testSubsRemoved} with node 1's cluster manager leaving the cluster
     * directly; the supplied latch is counted down once the leave promise succeeds.
     */
    @Test
    public void testSubsRemovedForKilledNode() throws Exception {
        testSubsRemoved(left -> {
            VertxInternal node = (VertxInternal) vertices[1];
            PromiseInternal<Void> leavePromise = node.getOrCreateContext().promise();
            node.getClusterManager().leave(leavePromise);
            leavePromise.future().onComplete(onSuccess(ignored -> left.countDown()));
        });
    }

    /**
     * Shared scenario for the "subs removed" tests. Starts three nodes, registers a
     * consumer on node 0 and another on node 1, runs {@code action} to take node 1
     * away, then sends ten messages from node 2. All ten must reach node 0 in order
     * ("foo0" .. "foo9"), and the consumer on node 1 must never be invoked.
     *
     * @param action removes node 1 and counts the supplied latch down when done
     * @throws Exception if waiting on a latch or sleeping is interrupted
     */
    private void testSubsRemoved(Consumer<CountDownLatch> action) throws Exception {
        startNodes(3);
        CountDownLatch regLatch = new CountDownLatch(1);
        AtomicInteger cnt = new AtomicInteger();
        vertices[0].eventBus().consumer("some-address1", msg -> {
            int c = cnt.getAndIncrement();
            // Expected value first, so a failure reports expected/actual the right way round.
            assertEquals("foo" + c, msg.body());
            if (c == 9) {
                testComplete();
            }
            if (c > 9) {
                fail("too many messages");
            }
        }).completionHandler(onSuccess(v -> vertices[1].eventBus().consumer("some-address1", msg -> fail("shouldn't get message")).completionHandler(onSuccess(v2 -> regLatch.countDown()))));
        awaitLatch(regLatch);
        CountDownLatch closeLatch = new CountDownLatch(1);
        action.accept(closeLatch);
        awaitLatch(closeLatch);
        // Fixed pause after node 1 is gone, before sending (presumably to let the removal propagate).
        Thread.sleep(2000L);
        vertices[2].runOnContext(v -> {
            EventBus ebSender = vertices[2].eventBus();
            for (int i = 0; i < 10; ++i) {
                ebSender.send("some-address1", "foo" + i);
            }
        });
        await();
    }

    /**
     * Sends the integers 0..999 from node 0 to a consumer on node 1, calling
     * {@code send} directly from the test thread, and checks that they are received
     * in the order they were sent.
     *
     * <p>The registration latch is awaited through {@code awaitLatch}, as the other
     * tests in this class do, instead of an unbounded {@code latch.await()} that
     * would hang the test run if the registration never completed.
     */
    @Test
    public void sendNoContext() throws Exception {
        int size = 1000;
        List<Integer> expected = Stream.iterate(0, i -> i + 1).limit(size).collect(Collectors.toList());
        ConcurrentLinkedDeque<Integer> obtained = new ConcurrentLinkedDeque<>();
        startNodes(2);
        CountDownLatch latch = new CountDownLatch(1);
        vertices[1].eventBus().<Integer>consumer("some-address1", msg -> {
            obtained.add(msg.body());
            if (obtained.size() == expected.size()) {
                // Compare as lists so that ordering is part of the assertion.
                assertEquals(expected, new ArrayList<>(obtained));
                testComplete();
            }
        }).completionHandler(ar -> {
            assertTrue(ar.succeeded());
            latch.countDown();
        });
        awaitLatch(latch);
        EventBus bus = vertices[0].eventBus();
        expected.forEach(val -> bus.send("some-address1", val));
        await();
    }

    /** Runs the local-only delivery scenario using {@code send}. */
    @Test
    public void testSendLocalOnly() {
        final boolean useSend = true;
        testDeliveryOptionsLocalOnly(useSend);
    }

    /** Runs the local-only delivery scenario using {@code publish}. */
    @Test
    public void testPublishLocalOnly() {
        final boolean useSend = false;
        testDeliveryOptionsLocalOnly(useSend);
    }

    /**
     * Delivers 30 messages from node 0 with {@code localOnly} set and verifies that
     * the local consumer on node 0 received all of them while the clustered consumer
     * on node 1 received none.
     *
     * @param send {@code true} to use {@code send}, {@code false} to use {@code publish}
     */
    private void testDeliveryOptionsLocalOnly(boolean send) {
        final int messageCount = 30;
        waitFor(messageCount);
        startNodes(2);
        AtomicLong localHits = new AtomicLong();
        vertices[0].eventBus().localConsumer("some-address1").handler(msg -> {
            localHits.incrementAndGet();
            complete();
        });
        AtomicLong remoteHits = new AtomicLong();
        vertices[1].eventBus().consumer("some-address1").handler(msg -> remoteHits.incrementAndGet()).completionHandler(onSuccess(v -> {
            // Start delivering only after the remote registration has completed.
            for (int sent = 0; sent < messageCount; sent++) {
                if (send) {
                    vertices[0].eventBus().send("some-address1", "msg", new DeliveryOptions().setLocalOnly(true));
                } else {
                    vertices[0].eventBus().publish("some-address1", "msg", new DeliveryOptions().setLocalOnly(true));
                }
            }
        }));
        await();
        assertEquals(messageCount, localHits.get());
        assertEquals(0, remoteHits.get());
    }

    /**
     * A consumer on node 1 replies with {@code localOnly} delivery options to a
     * request issued from node 0 (500 ms send timeout); the test completes when the
     * reply is received successfully.
     */
    @Test
    public void testLocalOnlyDoesNotApplyToReplies() {
        startNodes(2);
        MessageConsumer<Object> responder = vertices[1].eventBus().consumer("some-address1")
            .handler(msg -> msg.reply("pong", new DeliveryOptions().setLocalOnly(true)));
        responder.completionHandler(onSuccess(registered -> {
            DeliveryOptions shortTimeout = new DeliveryOptions().setSendTimeout(500L);
            vertices[0].eventBus().request("some-address1", "ping", shortTimeout, onSuccess(reply -> testComplete()));
        }));
        await();
    }

    /**
     * Registers a consumer and unregisters it immediately, asserting through a shared
     * counter that the registration completion handler runs first and the unregister
     * callback second.
     */
    @Test
    public void testImmediateUnregistration() {
        startNodes(1);
        AtomicInteger callbackOrder = new AtomicInteger();
        MessageConsumer<Object> registration = vertices[0].eventBus().consumer("some-address1");
        registration.completionHandler(done -> {
            int position = callbackOrder.getAndIncrement();
            assertEquals(0, position);
        });
        registration.handler(msg -> {
        });
        registration.unregister(onSuccess(done -> {
            int position = callbackOrder.getAndIncrement();
            assertEquals(1, position);
            testComplete();
        }));
        await();
    }

    /**
     * Starts two nodes whose cluster managers wrap the node selector so that every
     * update reporting exactly one registration for "some-address1" counts a latch
     * down. Once the latch (three counts, including the consumer's own completion)
     * is released, a producer on node 0 writes one message; the test waits for both
     * the consumer on node 1 and the write handler to complete.
     */
    @Test
    public void testSendWriteHandler() throws Exception {
        final CountDownLatch registrationSeen = new CountDownLatch(3);
        Supplier<VertxOptions> optionsFactory = () -> {
            VertxOptions base = getOptions();
            ClusterManager observing = new WrappedClusterManager(getClusterManager()) {

                @Override
                public void init(Vertx vertx, NodeSelector nodeSelector) {
                    NodeSelector counting = new WrappedNodeSelector(nodeSelector) {

                        @Override
                        public void registrationsUpdated(RegistrationUpdateEvent event) {
                            super.registrationsUpdated(event);
                            boolean watchedAddress = event.address().equals("some-address1");
                            if (watchedAddress && event.registrations().size() == 1) {
                                registrationSeen.countDown();
                            }
                        }

                        @Override
                        public boolean wantsUpdatesFor(String address) {
                            // Ask for updates on every address so the registration is always reported.
                            return true;
                        }
                    };
                    super.init(vertx, counting);
                }
            };
            return base.setClusterManager(observing);
        };
        startNodes(optionsFactory.get(), optionsFactory.get());
        waitFor(2);
        vertices[1].eventBus().consumer("some-address1", msg -> complete()).completionHandler(onSuccess(registered -> registrationSeen.countDown()));
        awaitLatch(registrationSeen);
        MessageProducer<String> sender = vertices[0].eventBus().sender("some-address1");
        sender.write("body", onSuccess(written -> complete()));
        await();
    }

    /**
     * Writes through a sender to an address with no consumer and expects the write
     * handler to fail with a {@code ReplyException} whose failure code is -1.
     */
    @Test
    public void testSendWriteHandlerNoConsumer() {
        startNodes(2);
        MessageProducer<String> sender = vertices[0].eventBus().sender("some-address1");
        sender.write("body", onFailure(err -> {
            assertTrue(err instanceof ReplyException);
            int code = ((ReplyException) err).failureCode();
            assertEquals(-1, code);
            testComplete();
        }));
        await();
    }

    /**
     * Registers a consumer on node 1, then writes through a publisher on node 0; the
     * test waits for both the consumer and the write handler to complete.
     */
    @Test
    public void testPublishWriteHandler() {
        startNodes(2);
        waitFor(2);
        MessageConsumer<Object> receiver = vertices[1].eventBus().consumer("some-address1", msg -> complete());
        receiver.completionHandler(onSuccess(registered -> {
            MessageProducer<String> publisher = vertices[0].eventBus().publisher("some-address1");
            publisher.write("body", onSuccess(written -> complete()));
        }));
        await();
    }

    /**
     * Writes through a publisher to an address with no consumer and expects the write
     * handler to fail with a {@code ReplyException} whose failure code is -1.
     */
    @Test
    public void testPublishWriteHandlerNoConsumer() {
        startNodes(2);
        MessageProducer<String> publisher = vertices[0].eventBus().publisher("some-address1");
        publisher.write("body", onFailure(err -> {
            assertTrue(err instanceof ReplyException);
            int code = ((ReplyException) err).failureCode();
            assertEquals(-1, code);
            testComplete();
        }));
        await();
    }

    /**
     * Starts two nodes with SSL enabled, {@code trustAll} disabled and only a server
     * key store configured, then expects a write from node 0 to a consumer on node 1
     * to fail.
     */
    @Test
    public void testWriteHandlerConnectFailure() {
        VertxOptions options = getOptions();
        KeyCertOptions serverKeys = Cert.SERVER_JKS.get();
        options.getEventBusOptions()
            .setSsl(true)
            .setTrustAll(false)
            .setKeyCertOptions(serverKeys);
        startNodes(2, options);
        MessageConsumer<Object> receiver = vertices[1].eventBus().consumer("some-address1", msg -> {
        });
        receiver.completionHandler(onSuccess(registered -> {
            MessageProducer<String> sender = vertices[0].eventBus().sender("some-address1");
            sender.write("body", onFailure(err -> testComplete()));
        }));
        await();
    }

    /**
     * Captures the {@code NodeSelector} handed to the cluster manager, registers a
     * consumer on "some-address1", and checks from inside the message handler that
     * the selector reports wanting updates for that address.
     */
    @Test
    public void testSelectorWantsUpdates() throws Exception {
        final AtomicReference<NodeSelector> capturedSelector = new AtomicReference<>();
        VertxOptions base = getOptions();
        ClusterManager capturing = new WrappedClusterManager(getClusterManager()) {

            @Override
            public void init(Vertx vertx, NodeSelector nodeSelector) {
                capturedSelector.set(nodeSelector);
                super.init(vertx, nodeSelector);
            }
        };
        startNodes(base.setClusterManager(capturing));
        assertNotNull(capturedSelector.get());
        MessageConsumer<Object> receiver = vertices[0].eventBus().consumer("some-address1", msg -> {
            assertTrue(capturedSelector.get().wantsUpdatesFor("some-address1"));
            testComplete();
        });
        receiver.completionHandler(onSuccess(registered -> vertices[0].eventBus().send("some-address1", "foo")));
        await();
    }

    /**
     * Captures the {@code NodeSelector} handed to the cluster manager and checks that,
     * with no consumer registered, it does not want updates for "some-address1".
     */
    @Test
    public void testSelectorDoesNotWantUpdates() throws Exception {
        final AtomicReference<NodeSelector> capturedSelector = new AtomicReference<>();
        VertxOptions base = getOptions();
        ClusterManager capturing = new WrappedClusterManager(getClusterManager()) {

            @Override
            public void init(Vertx vertx, NodeSelector nodeSelector) {
                capturedSelector.set(nodeSelector);
                super.init(vertx, nodeSelector);
            }
        };
        startNodes(base.setClusterManager(capturing));
        NodeSelector selector = capturedSelector.get();
        assertNotNull(selector);
        assertFalse(selector.wantsUpdatesFor("some-address1"));
    }

    /**
     * Registers one local and one clustered consumer on each of two addresses on
     * node 0, then publishes to both addresses from node 1. The local consumers fail
     * the test if invoked; the test waits for the two clustered consumers plus a
     * 500 ms timer to complete.
     */
    @Test
    public void testLocalConsumerNeverGetsMessagePublishedFromRemote() throws Exception {
        startNodes(2);
        waitFor(3);
        CountDownLatch registered = new CountDownLatch(4);
        EventBus receiverBus = vertices[0].eventBus();
        String firstAddress = "foo";
        // On the first address the local consumer is registered before the clustered one...
        receiverBus.localConsumer(firstAddress, message -> fail()).completionHandler(onSuccess(v -> registered.countDown()));
        receiverBus.consumer(firstAddress, message -> complete()).completionHandler(onSuccess(v -> registered.countDown()));
        String secondAddress = "bar";
        // ...and on the second address the order is reversed.
        receiverBus.consumer(secondAddress, message -> complete()).completionHandler(onSuccess(v -> registered.countDown()));
        receiverBus.localConsumer(secondAddress, message -> fail()).completionHandler(onSuccess(v -> registered.countDown()));
        awaitLatch(registered);
        EventBus publisherBus = vertices[1].eventBus();
        for (String address : new String[]{firstAddress, secondAddress}) {
            publisherBus.publish(address, "content");
        }
        // Third completion: leaves 500 ms for a wrongly delivered message to fail the test.
        vertx.setTimer(500L, timerId -> complete());
        await();
    }

    /**
     * Registers two local and two clustered consumers on each of two addresses on
     * node 0, then sends {@code 2 * maxMessages} messages to each address from
     * node 1. The local consumers fail the test if invoked; each clustered consumer
     * asserts that it never receives more than {@code maxMessages} messages, and the
     * test waits for {@code 4 * maxMessages} deliveries in total.
     */
    @Test
    public void testLocalConsumerNeverGetsMessageSentFromRemote() throws Exception {
        startNodes(2);
        // Declared before CountingHandler: a local class can only capture locals
        // that are already in scope at the point of its declaration.
        final int maxMessages = 50;

        /** Counts deliveries for one clustered consumer and signals one completion per message. */
        class CountingHandler implements Handler<Message<Object>> {
            private final AtomicInteger counter = new AtomicInteger();

            @Override
            public void handle(Message<Object> msg) {
                assertTrue(counter.incrementAndGet() <= maxMessages);
                complete();
            }
        }

        waitFor(4 * maxMessages);
        // 2 addresses x 2 iterations x (1 local + 1 clustered) registrations.
        CountDownLatch completionLatch = new CountDownLatch(8);
        EventBus eb0 = vertices[0].eventBus();
        String firstAddress = "foo";
        for (int i = 0; i < 2; ++i) {
            eb0.localConsumer(firstAddress, message -> fail()).completionHandler(onSuccess(v -> completionLatch.countDown()));
            eb0.consumer(firstAddress, new CountingHandler()).completionHandler(onSuccess(v -> completionLatch.countDown()));
        }
        String secondAddress = "bar";
        for (int i = 0; i < 2; ++i) {
            eb0.consumer(secondAddress, new CountingHandler()).completionHandler(onSuccess(v -> completionLatch.countDown()));
            eb0.localConsumer(secondAddress, message -> fail()).completionHandler(onSuccess(v -> completionLatch.countDown()));
        }
        awaitLatch(completionLatch);
        EventBus eb1 = vertices[1].eventBus();
        String[] addresses = new String[]{firstAddress, secondAddress};
        for (int i = 0; i < 2 * maxMessages; ++i) {
            for (String address : addresses) {
                eb1.send(address, "content");
            }
        }
        await();
    }

    /** Runs the rejected-on-send scenario with a {@code SomeClusterSerializableObject}. */
    @Test
    public void testRejectedClusterSerializableNotSent() {
        AsyncMapTest.SomeClusterSerializableObject rejected = new AsyncMapTest.SomeClusterSerializableObject("bar");
        testRejectedNotSent(AsyncMapTest.SomeClusterSerializableObject.class, rejected);
    }

    /** Runs the rejected-on-send scenario with a {@code SomeClusterSerializableImplObject}. */
    @Test
    public void testRejectedClusterSerializableImplNotSent() {
        AsyncMapTest.SomeClusterSerializableImplObject rejected = new AsyncMapTest.SomeClusterSerializableImplObject("bar");
        testRejectedNotSent(AsyncMapTest.SomeClusterSerializableImplObject.class, rejected);
    }

    /** Runs the rejected-on-send scenario with a {@code SomeSerializableObject}. */
    @Test
    public void testRejectedSerializableNotSent() {
        AsyncMapTest.SomeSerializableObject rejected = new AsyncMapTest.SomeSerializableObject("bar");
        testRejectedNotSent(AsyncMapTest.SomeSerializableObject.class, rejected);
    }

    /**
     * Configures node 0 so that both serializable checkers reject every class, then
     * expects {@code send} from node 0 to throw an {@code IllegalArgumentException}
     * reporting that no codec exists for the message type. The consumer on node 1
     * fails the test if it is ever invoked.
     *
     * @param clazz   class of {@code message}, used to build the expected error text
     * @param message the object to attempt to send
     */
    private <T> void testRejectedNotSent(Class<T> clazz, T message) {
        startNodes(2);
        vertices[0].eventBus()
            .clusterSerializableChecker(className -> Boolean.FALSE)
            .serializableChecker(className -> Boolean.FALSE);
        MessageConsumer<Object> receiver = vertices[1].eventBus().consumer("foo", msg -> fail());
        receiver.completionHandler(onSuccess(registered -> {
            try {
                vertices[0].eventBus().send("foo", message);
                fail();
            } catch (IllegalArgumentException rejected) {
                String expectedText = "No message codec for type: class " + clazz.getName();
                assertEquals(expectedText, rejected.getMessage());
                testComplete();
            }
        }));
        await();
    }

    /** Runs the rejected-on-receive scenario with a {@code SomeClusterSerializableObject}. */
    @Test
    public void testRejectedClusterSerializableNotReceived() {
        AsyncMapTest.SomeClusterSerializableObject rejected = new AsyncMapTest.SomeClusterSerializableObject("bar");
        testRejectedNotReceived(AsyncMapTest.SomeClusterSerializableObject.class, rejected);
    }

    /** Runs the rejected-on-receive scenario with a {@code SomeClusterSerializableImplObject}. */
    @Test
    public void testRejectedClusterSerializableImplNotReceived() {
        AsyncMapTest.SomeClusterSerializableImplObject rejected = new AsyncMapTest.SomeClusterSerializableImplObject("bar");
        testRejectedNotReceived(AsyncMapTest.SomeClusterSerializableImplObject.class, rejected);
    }

    /** Runs the rejected-on-receive scenario with a {@code SomeSerializableObject}. */
    @Test
    public void testRejectedSerializableNotReceived() {
        AsyncMapTest.SomeSerializableObject rejected = new AsyncMapTest.SomeSerializableObject("bar");
        testRejectedNotReceived(AsyncMapTest.SomeSerializableObject.class, rejected);
    }

    /**
     * Configures node 1 so that both serializable checkers reject every class, sends
     * {@code message} from node 0, and expects reading the body on node 1 to throw a
     * {@code RuntimeException} carrying a "Class not allowed" message (taken from the
     * cause when the cause is an {@code InvalidClassException}).
     *
     * @param clazz   class of {@code message}, used to build the expected error text
     * @param message the object to send
     */
    private <T> void testRejectedNotReceived(Class<T> clazz, T message) {
        startNodes(2);
        vertices[1].eventBus()
            .clusterSerializableChecker(className -> Boolean.FALSE)
            .serializableChecker(className -> Boolean.FALSE);
        MessageConsumer<Object> receiver = vertices[1].eventBus().consumer("foo", msg -> {
            try {
                Object decoded = msg.body();
                fail(String.valueOf(decoded));
            } catch (RuntimeException rejected) {
                Throwable cause = rejected.getCause();
                String actualText;
                if (cause instanceof InvalidClassException) {
                    actualText = cause.getMessage();
                } else {
                    actualText = rejected.getMessage();
                }
                assertEquals("Class not allowed: " + clazz.getName(), actualText);
                testComplete();
            }
        });
        receiver.completionHandler(onSuccess(registered -> vertices[0].eventBus().send("foo", message)));
        await();
    }
}

