/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.core.eventbus;

import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.eventbus.ClusteredEventBusTestBase;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.EventBusTestBase;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.impl.VertxInternal;
import io.vertx.test.core.TestUtils;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.junit.Test;

public class ClusteredEventBusTest
extends ClusteredEventBusTestBase {
    /**
     * Checks that a consumer registered with {@code localConsumer} on the second node is
     * not invoked when the first node sends to the same address. The handler fails the test
     * if it is ever called; otherwise a one-second timer on the first node completes it.
     */
    @Test
    public void testLocalHandlerNotReceive() throws Exception {
        startNodes(2);
        EventBus receiverBus = vertices[1].eventBus();
        receiverBus.localConsumer("some-address1").handler(msg -> {
            fail("Should not receive message");
        });
        EventBus senderBus = vertices[0].eventBus();
        senderBus.send("some-address1", "foo");
        // No delivery is expected, so the test finishes once the timer fires.
        vertices[0].setTimer(1000L, timerId -> {
            testComplete();
        });
        await();
    }

    /**
     * Registers {@code MyPOJOEncoder1} by name on both nodes and runs {@code testSend} with
     * a {@code MyPOJO} as the value and the wrapped random string as the second argument
     * (presumably the value expected on the receiving side).
     */
    @Test
    public void testDecoderSendAsymmetric() throws Exception {
        startNodes(2);
        EventBusTestBase.MyPOJOEncoder1 encoder = new EventBusTestBase.MyPOJOEncoder1();
        EventBus firstBus = vertices[0].eventBus();
        firstBus.registerCodec((MessageCodec) encoder);
        EventBus secondBus = vertices[1].eventBus();
        secondBus.registerCodec((MessageCodec) encoder);
        String payload = TestUtils.randomAlphaString(100);
        EventBusTestBase.MyPOJO pojo = new EventBusTestBase.MyPOJO(payload);
        // The codec is selected explicitly by name through the delivery options.
        DeliveryOptions byCodecName = new DeliveryOptions().setCodecName(encoder.name());
        testSend(pojo, payload, null, byCodecName);
    }

    /**
     * Registers {@code MyPOJOEncoder1} by name on both nodes and runs {@code testReply} with
     * a {@code MyPOJO} as the value and the wrapped random string as the second argument
     * (presumably the value expected by the party receiving the reply).
     */
    @Test
    public void testDecoderReplyAsymmetric() throws Exception {
        startNodes(2);
        EventBusTestBase.MyPOJOEncoder1 encoder = new EventBusTestBase.MyPOJOEncoder1();
        EventBus firstBus = vertices[0].eventBus();
        firstBus.registerCodec((MessageCodec) encoder);
        EventBus secondBus = vertices[1].eventBus();
        secondBus.registerCodec((MessageCodec) encoder);
        String payload = TestUtils.randomAlphaString(100);
        EventBusTestBase.MyPOJO pojo = new EventBusTestBase.MyPOJO(payload);
        // The codec is selected explicitly by name through the delivery options.
        DeliveryOptions byCodecName = new DeliveryOptions().setCodecName(encoder.name());
        testReply(pojo, payload, null, byCodecName);
    }

    /**
     * Registers {@code MyPOJOEncoder2} by name on both nodes and runs {@code testSend} with
     * the same {@code MyPOJO} instance passed as both the value and the second argument.
     */
    @Test
    public void testDecoderSendSymmetric() throws Exception {
        startNodes(2);
        EventBusTestBase.MyPOJOEncoder2 encoder = new EventBusTestBase.MyPOJOEncoder2();
        EventBus firstBus = vertices[0].eventBus();
        firstBus.registerCodec((MessageCodec) encoder);
        EventBus secondBus = vertices[1].eventBus();
        secondBus.registerCodec((MessageCodec) encoder);
        EventBusTestBase.MyPOJO pojo = new EventBusTestBase.MyPOJO(TestUtils.randomAlphaString(100));
        // The codec is selected explicitly by name through the delivery options.
        DeliveryOptions byCodecName = new DeliveryOptions().setCodecName(encoder.name());
        testSend(pojo, pojo, null, byCodecName);
    }

    /**
     * Registers {@code MyPOJOEncoder2} by name on both nodes and runs {@code testReply} with
     * the same {@code MyPOJO} instance passed as both the value and the second argument.
     */
    @Test
    public void testDecoderReplySymmetric() throws Exception {
        startNodes(2);
        EventBusTestBase.MyPOJOEncoder2 encoder = new EventBusTestBase.MyPOJOEncoder2();
        EventBus firstBus = vertices[0].eventBus();
        firstBus.registerCodec((MessageCodec) encoder);
        EventBus secondBus = vertices[1].eventBus();
        secondBus.registerCodec((MessageCodec) encoder);
        EventBusTestBase.MyPOJO pojo = new EventBusTestBase.MyPOJO(TestUtils.randomAlphaString(100));
        // The codec is selected explicitly by name through the delivery options.
        DeliveryOptions byCodecName = new DeliveryOptions().setCodecName(encoder.name());
        testReply(pojo, pojo, null, byCodecName);
    }

    /**
     * Registers {@code MyPOJOEncoder1} as the default codec for {@code MyPOJO} on both nodes
     * and runs {@code testSend} without delivery options, passing the POJO as the value and
     * the wrapped random string as the second argument.
     */
    @Test
    public void testDefaultDecoderSendAsymmetric() throws Exception {
        startNodes(2);
        EventBusTestBase.MyPOJOEncoder1 encoder = new EventBusTestBase.MyPOJOEncoder1();
        EventBus firstBus = vertices[0].eventBus();
        firstBus.registerDefaultCodec(EventBusTestBase.MyPOJO.class, (MessageCodec) encoder);
        EventBus secondBus = vertices[1].eventBus();
        secondBus.registerDefaultCodec(EventBusTestBase.MyPOJO.class, (MessageCodec) encoder);
        String payload = TestUtils.randomAlphaString(100);
        EventBusTestBase.MyPOJO pojo = new EventBusTestBase.MyPOJO(payload);
        // No delivery options: the codec is not named explicitly for this send.
        testSend(pojo, payload, null, null);
    }

    /**
     * Registers {@code MyPOJOEncoder1} as the default codec for {@code MyPOJO} on both nodes
     * and runs {@code testReply} without delivery options, passing the POJO as the value and
     * the wrapped random string as the second argument.
     */
    @Test
    public void testDefaultDecoderReplyAsymmetric() throws Exception {
        startNodes(2);
        EventBusTestBase.MyPOJOEncoder1 encoder = new EventBusTestBase.MyPOJOEncoder1();
        EventBus firstBus = vertices[0].eventBus();
        firstBus.registerDefaultCodec(EventBusTestBase.MyPOJO.class, (MessageCodec) encoder);
        EventBus secondBus = vertices[1].eventBus();
        secondBus.registerDefaultCodec(EventBusTestBase.MyPOJO.class, (MessageCodec) encoder);
        String payload = TestUtils.randomAlphaString(100);
        EventBusTestBase.MyPOJO pojo = new EventBusTestBase.MyPOJO(payload);
        // No delivery options: the codec is not named explicitly for this reply.
        testReply(pojo, payload, null, null);
    }

    /**
     * Registers {@code MyPOJOEncoder2} as the default codec for {@code MyPOJO} on both nodes
     * and runs {@code testSend} without delivery options, passing the same POJO instance as
     * both the value and the second argument.
     */
    @Test
    public void testDefaultDecoderSendSymetric() throws Exception {
        startNodes(2);
        EventBusTestBase.MyPOJOEncoder2 encoder = new EventBusTestBase.MyPOJOEncoder2();
        EventBus firstBus = vertices[0].eventBus();
        firstBus.registerDefaultCodec(EventBusTestBase.MyPOJO.class, (MessageCodec) encoder);
        EventBus secondBus = vertices[1].eventBus();
        secondBus.registerDefaultCodec(EventBusTestBase.MyPOJO.class, (MessageCodec) encoder);
        EventBusTestBase.MyPOJO pojo = new EventBusTestBase.MyPOJO(TestUtils.randomAlphaString(100));
        // No delivery options: the codec is not named explicitly for this send.
        testSend(pojo, pojo, null, null);
    }

    /**
     * Registers {@code MyPOJOEncoder2} as the default codec for {@code MyPOJO} on both nodes
     * and runs {@code testReply} without delivery options, passing the same POJO instance as
     * both the value and the second argument.
     */
    @Test
    public void testDefaultDecoderReplySymetric() throws Exception {
        startNodes(2);
        EventBusTestBase.MyPOJOEncoder2 encoder = new EventBusTestBase.MyPOJOEncoder2();
        EventBus firstBus = vertices[0].eventBus();
        firstBus.registerDefaultCodec(EventBusTestBase.MyPOJO.class, (MessageCodec) encoder);
        EventBus secondBus = vertices[1].eventBus();
        secondBus.registerDefaultCodec(EventBusTestBase.MyPOJO.class, (MessageCodec) encoder);
        EventBusTestBase.MyPOJO pojo = new EventBusTestBase.MyPOJO(TestUtils.randomAlphaString(100));
        // No delivery options: the codec is not named explicitly for this reply.
        testReply(pojo, pojo, null, null);
    }

    /**
     * Registers {@code MyReplyExceptionMessageCodec} as the default codec for
     * {@code MyReplyException} on both nodes, then sends an instance of that exception from
     * the second node to a consumer on the first node. The test completes when the consumer
     * receives a body that is an instance of {@code MyReplyException}.
     */
    @Test
    public void testDefaultCodecReplyExceptionSubclass() throws Exception {
        startNodes(2);
        EventBusTestBase.MyReplyException failure = new EventBusTestBase.MyReplyException(23, "my exception");
        EventBusTestBase.MyReplyExceptionMessageCodec exceptionCodec = new EventBusTestBase.MyReplyExceptionMessageCodec();
        EventBus firstBus = vertices[0].eventBus();
        firstBus.registerDefaultCodec(EventBusTestBase.MyReplyException.class, (MessageCodec) exceptionCodec);
        EventBus secondBus = vertices[1].eventBus();
        secondBus.registerDefaultCodec(EventBusTestBase.MyReplyException.class, (MessageCodec) exceptionCodec);
        vertices[0].eventBus()
            .consumer("some-address1", msg -> {
                assertTrue(msg.body() instanceof EventBusTestBase.MyReplyException);
                testComplete();
            })
            // Send only after the consumer registration has completed.
            .completionHandler(ar -> vertices[1].eventBus().send("some-address1", failure));
        await();
    }

    /**
     * Starts two nodes with 500 ms cluster ping and ping-reply intervals and exchanges two
     * messages four seconds apart. The first message received on "foobar" schedules a second
     * send after 4000 ms; the second message completes the test.
     */
    @Test
    public void testClusteredPong() throws Exception {
        VertxOptions pingOptions = new VertxOptions().setClusterPingInterval(500L).setClusterPingReplyInterval(500L);
        startNodes(2, pingOptions);
        AtomicBoolean sending = new AtomicBoolean();
        vertices[0].eventBus().consumer("foobar").handler(msg -> {
            if (sending.get()) {
                // Second message arrived after the delay.
                testComplete();
                return;
            }
            sending.set(true);
            // The delay is scheduled on the inherited vertx field, not on a cluster node.
            // NOTE(review): presumably 4000 ms is chosen to span several ping intervals - confirm.
            vertx.setTimer(4000L, timerId -> vertices[1].eventBus().send("foobar", "whatever2"));
        }).completionHandler(ar -> {
            assertTrue(ar.succeeded());
            vertices[1].eventBus().send("foobar", "whatever");
        });
        await();
    }

    /**
     * Sets the completion handler before the message handler and checks that the completion
     * callback runs on an event-loop context. A thread-local flag set on the calling thread
     * must read as {@code null} inside the callback, i.e. the callback is not executed on
     * the thread that registered it.
     */
    @Test
    public void testConsumerHandlesCompletionAsynchronously1() {
        startNodes(2);
        ThreadLocal<Boolean> callerMarker = new ThreadLocal<Boolean>();
        callerMarker.set(true);
        MessageConsumer<Object> consumer = vertices[0].eventBus().consumer("some-address1");
        consumer.completionHandler(result -> {
            assertTrue(Vertx.currentContext().isEventLoopContext());
            assertNull(callerMarker.get());
            testComplete();
        });
        // The message handler is attached after the completion handler in this variant.
        consumer.handler(msg -> { });
        await();
    }

    /**
     * Sets the message handler before the completion handler and checks that the completion
     * callback runs on an event-loop context. A thread-local flag set on the calling thread
     * must read as {@code null} inside the callback, i.e. the callback is not executed on
     * the thread that registered it.
     */
    @Test
    public void testConsumerHandlesCompletionAsynchronously2() {
        startNodes(2);
        ThreadLocal<Boolean> callerMarker = new ThreadLocal<Boolean>();
        MessageConsumer<Object> consumer = vertices[0].eventBus().consumer("some-address1");
        // The message handler is attached before the completion handler in this variant.
        consumer.handler(msg -> { });
        callerMarker.set(true);
        consumer.completionHandler(result -> {
            assertTrue(Vertx.currentContext().isEventLoopContext());
            assertNull(callerMarker.get());
            testComplete();
        });
        await();
    }

    /**
     * Runs {@code testSubsRemoved} with an action that closes the second node and counts
     * the latch down once the close has succeeded.
     */
    @Test
    public void testSubsRemovedForClosedNode() throws Exception {
        testSubsRemoved(done -> {
            Vertx secondNode = vertices[1];
            secondNode.close(onSuccess(v -> done.countDown()));
        });
    }

    /**
     * Runs {@code testSubsRemoved} with an action that makes the second node's cluster
     * manager leave the cluster (without closing the Vert.x instance itself) and counts the
     * latch down once leaving has succeeded.
     */
    @Test
    public void testSubsRemovedForKilledNode() throws Exception {
        testSubsRemoved(done ->
            ((VertxInternal) vertices[1]).getClusterManager().leave(onSuccess(v -> done.countDown())));
    }

    /**
     * Shared scenario for the "subscription removed" tests.
     *
     * <p>Starts three nodes, registers a consumer on node 0 and another on node 1 for the
     * same address, then runs {@code action} to take node 1 out of the cluster. After a
     * pause, node 2 sends ten messages; all of them must arrive, in order, at node 0 and
     * none at node 1.
     *
     * @param action callback that removes node 1 and counts the supplied latch down when done
     * @throws Exception if waiting on a latch or sleeping is interrupted
     */
    private void testSubsRemoved(Consumer<CountDownLatch> action) throws Exception {
        final int messageCount = 10;
        startNodes(3);
        CountDownLatch regLatch = new CountDownLatch(1);
        AtomicInteger received = new AtomicInteger();
        vertices[0].eventBus().consumer("some-address1", msg -> {
            int index = received.getAndIncrement();
            // Expected value goes first so a failure message reports the right sides.
            assertEquals("foo" + index, msg.body());
            if (index == messageCount - 1) {
                testComplete();
            }
            if (index >= messageCount) {
                fail("too many messages");
            }
        }).completionHandler(onSuccess(v -> {
            // Register the second consumer only after the first registration has completed.
            vertices[1].eventBus()
                .consumer("some-address1", msg -> fail("shouldn't get message"))
                .completionHandler(onSuccess(v2 -> regLatch.countDown()));
        }));
        awaitLatch(regLatch);
        CountDownLatch closeLatch = new CountDownLatch(1);
        action.accept(closeLatch);
        awaitLatch(closeLatch);
        // NOTE(review): presumably gives the cluster time to drop node 1's subscription - confirm.
        Thread.sleep(2000L);
        vertices[2].runOnContext(v -> {
            EventBus ebSender = vertices[2].eventBus();
            for (int i = 0; i < messageCount; ++i) {
                ebSender.send("some-address1", "foo" + i);
            }
        });
        await();
    }

    /**
     * Sends 1000 integers from the test thread itself (not from inside a Vert.x context
     * callback) on node 0 and checks that the consumer on node 1 receives all of them in
     * the order they were sent.
     *
     * @throws Exception if waiting for the consumer registration is interrupted
     */
    @Test
    public void sendNoContext() throws Exception {
        final int size = 1000;
        ConcurrentLinkedDeque<Integer> expected = new ConcurrentLinkedDeque<Integer>();
        ConcurrentLinkedDeque<Integer> obtained = new ConcurrentLinkedDeque<Integer>();
        startNodes(2);
        CountDownLatch latch = new CountDownLatch(1);
        vertices[1].eventBus().<Integer>consumer("some-address1", msg -> {
            obtained.add(msg.body());
            // Compare against the fixed total: expected is still being filled by the sender
            // thread, so comparing with expected.size() could end the test prematurely.
            if (obtained.size() == size) {
                assertEquals(new ArrayList<Integer>(expected), new ArrayList<Integer>(obtained));
                testComplete();
            }
        }).completionHandler(ar -> {
            assertTrue(ar.succeeded());
            latch.countDown();
        });
        // Bounded wait, consistent with the other latch waits in this class.
        awaitLatch(latch);
        EventBus bus = vertices[0].eventBus();
        for (int i = 0; i < size; ++i) {
            expected.add(i);
            bus.send("some-address1", i);
        }
        await();
    }

    /**
     * Runs the local-only delivery scenario using {@code send}.
     */
    @Test
    public void testSendLocalOnly() {
        final boolean useSend = true;
        testDeliveryOptionsLocalOnly(useSend);
    }

    /**
     * Runs the local-only delivery scenario using {@code publish}.
     */
    @Test
    public void testPublishLocalOnly() {
        final boolean useSend = false;
        testDeliveryOptionsLocalOnly(useSend);
    }

    /**
     * Shared scenario for the local-only delivery tests.
     *
     * <p>Node 0 registers a local consumer and node 1 a regular consumer on the same
     * address. Node 0 then emits 30 messages with {@code setLocalOnly(true)}; afterwards the
     * local consumer must have counted 30 messages and node 1's consumer none.
     *
     * @param send {@code true} to emit with {@code send}, {@code false} to use {@code publish}
     */
    private void testDeliveryOptionsLocalOnly(boolean send) {
        final int messageCount = 30;
        // Presumably each complete() call below counts towards this total - verify in base class.
        waitFor(messageCount);
        startNodes(2);
        AtomicLong localHits = new AtomicLong();
        vertices[0].eventBus().localConsumer("some-address1").handler(msg -> {
            localHits.incrementAndGet();
            complete();
        });
        AtomicLong remoteHits = new AtomicLong();
        vertices[1].eventBus().consumer("some-address1")
            .handler(msg -> remoteHits.incrementAndGet())
            .completionHandler(onSuccess(v -> {
                // Emit only once the remote consumer registration has succeeded.
                int emitted = 0;
                while (emitted < messageCount) {
                    EventBus localBus = vertices[0].eventBus();
                    DeliveryOptions localOnly = new DeliveryOptions().setLocalOnly(true);
                    if (send) {
                        localBus.send("some-address1", "msg", localOnly);
                    } else {
                        localBus.publish("some-address1", "msg", localOnly);
                    }
                    ++emitted;
                }
            }));
        await();
        assertEquals((long) messageCount, localHits.get());
        assertEquals(0L, remoteHits.get());
    }

    /**
     * Checks that a reply sent with {@code setLocalOnly(true)} still reaches a requester on
     * another node. Node 1 replies "pong" with local-only options; node 0 sends "ping" with
     * a 500 ms send timeout and the test completes when the reply handler succeeds.
     */
    @Test
    public void testLocalOnlyDoesNotApplyToReplies() {
        startNodes(2);
        vertices[1].eventBus().consumer("some-address1")
            .handler(msg -> {
                DeliveryOptions replyOptions = new DeliveryOptions().setLocalOnly(true);
                msg.reply("pong", replyOptions);
            })
            .completionHandler(onSuccess(v -> {
                // Send the request only after the replying consumer is registered.
                EventBus requester = vertices[0].eventBus();
                DeliveryOptions requestOptions = new DeliveryOptions().setSendTimeout(500L);
                requester.send("some-address1", "ping", requestOptions, onSuccess(reply -> testComplete()));
            }));
        await();
    }
}

