/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.test.core;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.test.core.EventBusTestBase;
import io.vertx.test.core.TestUtils;
import io.vertx.test.fakecluster.FakeClusterManager;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.junit.Test;

public class ClusteredEventBusTest
extends EventBusTestBase {
    private static final String ADDRESS1 = "some-address1";

    /**
     * Returns the cluster manager for this test class.
     *
     * @return a newly created {@link FakeClusterManager}; a fresh instance is built on every call
     */
    @Override
    protected ClusterManager getClusterManager() {
        final ClusterManager fakeManager = new FakeClusterManager();
        return fakeManager;
    }

    /**
     * Sends {@code val} from node 0 to a consumer registered on node 1 and checks what arrives.
     *
     * <p>Two nodes are started if no nodes are running yet. The send is only issued once the
     * consumer registration has completed successfully.
     *
     * @param val      the body to send
     * @param received the body the consumer is expected to see; only checked when
     *                 {@code consumer} is {@code null}
     * @param consumer optional custom check applied to the received body; when {@code null} the
     *                 default body/header assertions are used instead
     * @param options  delivery options for the send, or {@code null} to send without options
     * @param <T>      static type of the sent body
     * @param <R>      static type of the expected received body
     */
    @Override
    protected <T, R> void testSend(T val, R received, Consumer<T> consumer, DeliveryOptions options) {
        if (vertices == null) {
            startNodes(2);
        }
        // The explicit <T> witness keeps msg typed as Message<T>, so consumer.accept(msg.body())
        // type-checks and the registration below is not a raw type.
        MessageConsumer<T> reg = vertices[1].eventBus().<T>consumer(ADDRESS1).handler(msg -> {
            if (consumer == null) {
                assertEquals(received, msg.body());
                if (options != null) {
                    assertNotNull(msg.headers());
                    int numHeaders = options.getHeaders() != null ? options.getHeaders().size() : 0;
                    assertEquals(numHeaders, msg.headers().size());
                    // getHeaders() may be null when numHeaders is 0, so only iterate when non-empty.
                    if (numHeaders != 0) {
                        for (Map.Entry<String, String> entry : options.getHeaders().entries()) {
                            // expected (what was sent) first, actual (what arrived) second
                            assertEquals(entry.getValue(), msg.headers().get(entry.getKey()));
                        }
                    }
                }
            } else {
                consumer.accept(msg.body());
            }
            testComplete();
        });
        reg.completionHandler(ar -> {
            assertTrue(ar.succeeded());
            if (options == null) {
                vertices[0].eventBus().send(ADDRESS1, val);
            } else {
                vertices[0].eventBus().send(ADDRESS1, val, options);
            }
        });
        await();
    }

    /**
     * Convenience overload: sends {@code val}, expects the same value to be received, and uses
     * no delivery options.
     *
     * @param val      the body to send (also used as the expected received body)
     * @param consumer optional custom check for the received body; may be {@code null}
     */
    @Override
    protected <T> void testSend(T val, Consumer<T> consumer) {
        final DeliveryOptions noOptions = null;
        testSend(val, val, consumer, noOptions);
    }

    /**
     * Convenience overload: replies with {@code val}, expects the same value back, and uses no
     * delivery options.
     *
     * @param val      the reply body (also used as the expected received reply)
     * @param consumer optional custom check for the reply body; may be {@code null}
     */
    @Override
    protected <T> void testReply(T val, Consumer<T> consumer) {
        final DeliveryOptions noOptions = null;
        testReply(val, val, consumer, noOptions);
    }

    /**
     * Sends a random string from node 0 to node 1, has node 1 reply with {@code val}, and checks
     * the reply that node 0 receives.
     *
     * <p>Two nodes are started if no nodes are running yet. The request is only sent once the
     * consumer registration on node 1 has completed successfully.
     *
     * @param val      the body node 1 replies with
     * @param received the reply body node 0 is expected to see; only checked when
     *                 {@code consumer} is {@code null}
     * @param consumer optional custom check applied to the reply body; when {@code null} the
     *                 default body/header assertions are used instead
     * @param options  delivery options for the reply, or {@code null} to reply without options
     * @param <T>      static type of the reply body as sent
     * @param <R>      static type of the reply body as received
     */
    @Override
    protected <T, R> void testReply(T val, R received, Consumer<R> consumer, DeliveryOptions options) {
        if (vertices == null) {
            startNodes(2);
        }
        String str = TestUtils.randomUnicodeString(1000);
        MessageConsumer<String> reg = vertices[1].eventBus().<String>consumer(ADDRESS1).handler(msg -> {
            assertEquals(str, msg.body());
            if (options == null) {
                msg.reply(val);
            } else {
                msg.reply(val, options);
            }
        });
        reg.completionHandler(ar -> {
            assertTrue(ar.succeeded());
            // The explicitly typed lambda parameter fixes the reply type to Message<R>, so
            // consumer.accept(reply.body()) type-checks.
            vertices[0].eventBus().send(ADDRESS1, str, onSuccess((Message<R> reply) -> {
                if (consumer == null) {
                    assertEquals(received, reply.body());
                    if (options != null && options.getHeaders() != null) {
                        assertNotNull(reply.headers());
                        assertEquals(options.getHeaders().size(), reply.headers().size());
                        for (Map.Entry<String, String> entry : options.getHeaders().entries()) {
                            // expected (what was sent) first, actual (what arrived) second
                            assertEquals(entry.getValue(), reply.headers().get(entry.getKey()));
                        }
                    }
                } else {
                    consumer.accept(reply.body());
                }
                testComplete();
            }));
        });
        await();
    }

    /**
     * Registers a consumer on node 0 via {@code consumer(address).handler(...)} and, once the
     * registration has completed, sends a random string to it from node 1. The test completes
     * when the consumer receives a body equal to that string.
     */
    @Test
    public void testRegisterRemote1() {
        startNodes(2);
        String payload = TestUtils.randomUnicodeString(100);
        Handler<Message<Object>> onMessage = received -> {
            assertEquals(payload, received.body());
            testComplete();
        };
        Handler<AsyncResult<Void>> onRegistered = result -> {
            assertTrue(result.succeeded());
            vertices[1].eventBus().send(ADDRESS1, payload);
        };
        vertices[0].eventBus().consumer(ADDRESS1).handler(onMessage).completionHandler(onRegistered);
        await();
    }

    /**
     * Same scenario as {@code testRegisterRemote1}, but the consumer on node 0 is registered
     * through the two-argument {@code consumer(address, handler)} form.
     */
    @Test
    public void testRegisterRemote2() {
        startNodes(2);
        String payload = TestUtils.randomUnicodeString(100);
        Handler<Message<Object>> onMessage = received -> {
            assertEquals(payload, received.body());
            testComplete();
        };
        Handler<AsyncResult<Void>> onRegistered = result -> {
            assertTrue(result.succeeded());
            vertices[1].eventBus().send(ADDRESS1, payload);
        };
        vertices[0].eventBus().consumer(ADDRESS1, onMessage).completionHandler(onRegistered);
        await();
    }

    /**
     * Publishes {@code val} from node 0 and expects the consumers on nodes 1 and 2 to each
     * receive it.
     *
     * <p>Three nodes are started. The message is published exactly once, and only after both
     * consumer registrations have completed. Publishing earlier would race with registration and
     * could let a single consumer be counted twice, so that the test passes without both nodes
     * having received the message.
     *
     * @param val      the body to publish
     * @param consumer optional custom check applied to each received body; when {@code null}
     *                 the body is asserted to equal {@code val}
     * @param <T>      type of the published body
     */
    @Override
    protected <T> void testPublish(final T val, final Consumer<T> consumer) {
        final int numNodes = 3;
        // Node 0 only publishes; every other node hosts one consumer.
        final int numConsumers = numNodes - 1;
        startNodes(numNodes);
        final AtomicInteger count = new AtomicInteger();
        final AtomicInteger registerCount = new AtomicInteger();

        // Checks each delivered message and completes the test once every consumer got one.
        class MyHandler implements Handler<Message<T>> {
            @Override
            public void handle(Message<T> msg) {
                if (consumer == null) {
                    assertEquals(val, msg.body());
                } else {
                    consumer.accept(msg.body());
                }
                if (count.incrementAndGet() == numConsumers) {
                    testComplete();
                }
            }
        }

        // Publishes from node 0 as soon as the last consumer registration has completed.
        class MyRegisterHandler implements Handler<AsyncResult<Void>> {
            @Override
            public void handle(AsyncResult<Void> ar) {
                assertTrue(ar.succeeded());
                if (registerCount.incrementAndGet() == numConsumers) {
                    vertices[0].eventBus().publish(ADDRESS1, val);
                }
            }
        }

        MessageConsumer<T> reg = vertices[2].eventBus().<T>consumer(ADDRESS1).handler(new MyHandler());
        reg.completionHandler(new MyRegisterHandler());
        reg = vertices[1].eventBus().<T>consumer(ADDRESS1).handler(new MyHandler());
        reg.completionHandler(new MyRegisterHandler());
        await();
    }

    /**
     * Registers a local consumer on node 1, sends a message to the same address from node 0, and
     * fails if the local consumer is invoked. The test completes when a one-second timer on
     * node 0 fires without that having happened.
     */
    @Test
    public void testLocalHandlerNotReceive() throws Exception {
        startNodes(2);
        EventBus receiverBus = vertices[1].eventBus();
        receiverBus.localConsumer(ADDRESS1).handler(unexpected -> fail("Should not receive message"));
        Vertx sender = vertices[0];
        sender.eventBus().send(ADDRESS1, "foo");
        sender.setTimer(1000L, timerId -> testComplete());
        await();
    }

    /**
     * Registers {@code MyPOJOEncoder1} by name on both nodes, then sends a {@code MyPOJO} built
     * from a random string with that codec selected in the delivery options. The received body
     * is expected to equal the random string itself.
     */
    @Test
    public void testDecoderSendAsymmetric() throws Exception {
        final int nodeCount = 2;
        startNodes(nodeCount);
        EventBusTestBase.MyPOJOEncoder1 pojoCodec = new EventBusTestBase.MyPOJOEncoder1();
        for (int node = 0; node < nodeCount; node++) {
            vertices[node].eventBus().registerCodec(pojoCodec);
        }
        String payload = TestUtils.randomAlphaString(100);
        EventBusTestBase.MyPOJO outgoing = new EventBusTestBase.MyPOJO(payload);
        DeliveryOptions withCodec = new DeliveryOptions().setCodecName(pojoCodec.name());
        testSend(outgoing, payload, null, withCodec);
    }

    /**
     * Registers {@code MyPOJOEncoder1} by name on both nodes, then has the remote node reply
     * with a {@code MyPOJO} built from a random string, with that codec selected in the delivery
     * options. The received reply body is expected to equal the random string itself.
     */
    @Test
    public void testDecoderReplyAsymmetric() throws Exception {
        final int nodeCount = 2;
        startNodes(nodeCount);
        EventBusTestBase.MyPOJOEncoder1 pojoCodec = new EventBusTestBase.MyPOJOEncoder1();
        for (int node = 0; node < nodeCount; node++) {
            vertices[node].eventBus().registerCodec(pojoCodec);
        }
        String payload = TestUtils.randomAlphaString(100);
        EventBusTestBase.MyPOJO replyBody = new EventBusTestBase.MyPOJO(payload);
        DeliveryOptions withCodec = new DeliveryOptions().setCodecName(pojoCodec.name());
        testReply(replyBody, payload, null, withCodec);
    }

    /**
     * Registers {@code MyPOJOEncoder2} by name on both nodes, then sends a {@code MyPOJO} with
     * that codec selected in the delivery options. The received body is expected to equal the
     * sent POJO.
     */
    @Test
    public void testDecoderSendSymmetric() throws Exception {
        final int nodeCount = 2;
        startNodes(nodeCount);
        EventBusTestBase.MyPOJOEncoder2 pojoCodec = new EventBusTestBase.MyPOJOEncoder2();
        for (int node = 0; node < nodeCount; node++) {
            vertices[node].eventBus().registerCodec(pojoCodec);
        }
        String payload = TestUtils.randomAlphaString(100);
        EventBusTestBase.MyPOJO outgoing = new EventBusTestBase.MyPOJO(payload);
        DeliveryOptions withCodec = new DeliveryOptions().setCodecName(pojoCodec.name());
        testSend(outgoing, outgoing, null, withCodec);
    }

    /**
     * Registers {@code MyPOJOEncoder2} by name on both nodes, then has the remote node reply
     * with a {@code MyPOJO}, with that codec selected in the delivery options. The received
     * reply body is expected to equal the POJO that was sent back.
     */
    @Test
    public void testDecoderReplySymmetric() throws Exception {
        final int nodeCount = 2;
        startNodes(nodeCount);
        EventBusTestBase.MyPOJOEncoder2 pojoCodec = new EventBusTestBase.MyPOJOEncoder2();
        for (int node = 0; node < nodeCount; node++) {
            vertices[node].eventBus().registerCodec(pojoCodec);
        }
        String payload = TestUtils.randomAlphaString(100);
        EventBusTestBase.MyPOJO replyBody = new EventBusTestBase.MyPOJO(payload);
        DeliveryOptions withCodec = new DeliveryOptions().setCodecName(pojoCodec.name());
        testReply(replyBody, replyBody, null, withCodec);
    }

    /**
     * Registers {@code MyPOJOEncoder1} as the default codec for {@code MyPOJO} on both nodes,
     * then sends a {@code MyPOJO} built from a random string without any delivery options. The
     * received body is expected to equal the random string itself.
     */
    @Test
    public void testDefaultDecoderSendAsymmetric() throws Exception {
        final int nodeCount = 2;
        startNodes(nodeCount);
        EventBusTestBase.MyPOJOEncoder1 pojoCodec = new EventBusTestBase.MyPOJOEncoder1();
        for (int node = 0; node < nodeCount; node++) {
            vertices[node].eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, pojoCodec);
        }
        String payload = TestUtils.randomAlphaString(100);
        EventBusTestBase.MyPOJO outgoing = new EventBusTestBase.MyPOJO(payload);
        testSend(outgoing, payload, null, null);
    }

    /**
     * Registers {@code MyPOJOEncoder1} as the default codec for {@code MyPOJO} on both nodes,
     * then has the remote node reply with a {@code MyPOJO} built from a random string, without
     * any delivery options. The received reply body is expected to equal the random string.
     */
    @Test
    public void testDefaultDecoderReplyAsymmetric() throws Exception {
        final int nodeCount = 2;
        startNodes(nodeCount);
        EventBusTestBase.MyPOJOEncoder1 pojoCodec = new EventBusTestBase.MyPOJOEncoder1();
        for (int node = 0; node < nodeCount; node++) {
            vertices[node].eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, pojoCodec);
        }
        String payload = TestUtils.randomAlphaString(100);
        EventBusTestBase.MyPOJO replyBody = new EventBusTestBase.MyPOJO(payload);
        testReply(replyBody, payload, null, null);
    }

    /**
     * Registers {@code MyPOJOEncoder2} as the default codec for {@code MyPOJO} on both nodes,
     * then sends a {@code MyPOJO} without any delivery options. The received body is expected to
     * equal the sent POJO.
     */
    @Test
    public void testDefaultDecoderSendSymetric() throws Exception {
        final int nodeCount = 2;
        startNodes(nodeCount);
        EventBusTestBase.MyPOJOEncoder2 pojoCodec = new EventBusTestBase.MyPOJOEncoder2();
        for (int node = 0; node < nodeCount; node++) {
            vertices[node].eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, pojoCodec);
        }
        String payload = TestUtils.randomAlphaString(100);
        EventBusTestBase.MyPOJO outgoing = new EventBusTestBase.MyPOJO(payload);
        testSend(outgoing, outgoing, null, null);
    }

    /**
     * Registers {@code MyPOJOEncoder2} as the default codec for {@code MyPOJO} on both nodes,
     * then has the remote node reply with a {@code MyPOJO} without any delivery options. The
     * received reply body is expected to equal the POJO that was sent back.
     */
    @Test
    public void testDefaultDecoderReplySymetric() throws Exception {
        final int nodeCount = 2;
        startNodes(nodeCount);
        EventBusTestBase.MyPOJOEncoder2 pojoCodec = new EventBusTestBase.MyPOJOEncoder2();
        for (int node = 0; node < nodeCount; node++) {
            vertices[node].eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, pojoCodec);
        }
        String payload = TestUtils.randomAlphaString(100);
        EventBusTestBase.MyPOJO replyBody = new EventBusTestBase.MyPOJO(payload);
        testReply(replyBody, replyBody, null, null);
    }

    /**
     * Starts two nodes with a 500 ms cluster ping interval and ping-reply interval, sends one
     * message from node 1 to node 0, waits four seconds, and sends a second one. The test
     * completes when node 0 receives the second message.
     *
     * <p>NOTE(review): the four-second gap spans several ping intervals, so this presumably
     * checks that the inter-node connection stays usable across ping/pong cycles; confirm
     * against the event bus implementation.
     */
    @Test
    public void testClusteredPong() throws Exception {
        startNodes(2, new VertxOptions().setClusterPingInterval(500L).setClusterPingReplyInterval(500L));
        AtomicBoolean sending = new AtomicBoolean();
        // Typed registration: keeps the completion-handler argument typed as an AsyncResult.
        MessageConsumer<String> consumer = vertices[0].eventBus().<String>consumer("foobar").handler(msg -> {
            if (sending.compareAndSet(false, true)) {
                // First message: schedule the second send after the delay.
                vertx.setTimer(4000L, id -> vertices[1].eventBus().send("foobar", "whatever2"));
            } else {
                // Second message arrived.
                testComplete();
            }
        });
        consumer.completionHandler(ar -> {
            assertTrue(ar.succeeded());
            vertices[1].eventBus().send("foobar", "whatever");
        });
        await();
    }

    /**
     * Sets the completion handler before the message handler and checks that the completion
     * handler runs on an event-loop context and does not see a thread-local value that was set
     * on the calling thread.
     */
    @Test
    public void testConsumerHandlesCompletionAsynchronously1() {
        startNodes(2);
        // Marker visible only to the thread that runs this test method.
        ThreadLocal<Boolean> callerMarker = new ThreadLocal<>();
        callerMarker.set(Boolean.TRUE);
        MessageConsumer<Object> registration = vertices[0].eventBus().consumer(ADDRESS1);
        registration.completionHandler(done -> {
            assertTrue(Vertx.currentContext().isEventLoopContext());
            assertNull(callerMarker.get());
            testComplete();
        });
        registration.handler(ignored -> {
        });
        await();
    }

    /**
     * Sets the message handler before the completion handler and checks that the completion
     * handler runs on an event-loop context and does not see a thread-local value that was set
     * on the calling thread.
     */
    @Test
    public void testConsumerHandlesCompletionAsynchronously2() {
        startNodes(2);
        MessageConsumer<Object> registration = vertices[0].eventBus().consumer(ADDRESS1);
        registration.handler(ignored -> {
        });
        // Marker visible only to the thread that runs this test method.
        ThreadLocal<Boolean> callerMarker = new ThreadLocal<>();
        callerMarker.set(Boolean.TRUE);
        registration.completionHandler(done -> {
            assertTrue(Vertx.currentContext().isEventLoopContext());
            assertNull(callerMarker.get());
            testComplete();
        });
        await();
    }

    /**
     * Runs the subscription-removal scenario with node 1 taken out by closing its Vertx
     * instance; the latch is released once the close has succeeded.
     */
    @Test
    public void testSubsRemovedForClosedNode() throws Exception {
        Consumer<CountDownLatch> closeNode = done -> {
            Handler<AsyncResult<Void>> onClosed = onSuccess(ignored -> done.countDown());
            vertices[1].close(onClosed);
        };
        testSubsRemoved(closeNode);
    }

    /**
     * Runs the subscription-removal scenario with node 1 taken out by making its cluster
     * manager leave the cluster; the latch is released once the leave has succeeded.
     */
    @Test
    public void testSubsRemovedForKilledNode() throws Exception {
        Consumer<CountDownLatch> killNode = done -> {
            ClusterManager manager = ((VertxInternal) vertices[1]).getClusterManager();
            Handler<AsyncResult<Void>> onLeft = onSuccess(ignored -> done.countDown());
            manager.leave(onLeft);
        };
        testSubsRemoved(killNode);
    }

    /**
     * Shared scenario for the "subscriptions removed" tests.
     *
     * <p>Starts three nodes and registers a consumer on node 0 and then one on node 1 for the
     * same address. After both registrations succeed, {@code action} is invoked to take node 1
     * out; it must count down the latch it is given when done. Node 2 then sends ten messages
     * {@code "foo0"} .. {@code "foo9"}. The consumer on node 0 must receive all ten in order,
     * and the consumer on node 1 must receive none.
     *
     * @param action removes node 1 and counts down the supplied latch when finished
     * @throws Exception if waiting on a latch or sleeping is interrupted
     */
    private void testSubsRemoved(Consumer<CountDownLatch> action) throws Exception {
        startNodes(3);
        CountDownLatch bothRegistered = new CountDownLatch(1);
        AtomicInteger deliveries = new AtomicInteger();

        // Consumer on the surviving node: expects "foo0".."foo9" in order and nothing more.
        Handler<Message<Object>> survivorHandler = msg -> {
            int index = deliveries.getAndIncrement();
            assertEquals(msg.body(), "foo" + index);
            if (index == 9) {
                testComplete();
            }
            if (index > 9) {
                fail("too many messages");
            }
        };
        // Consumer on the node that is about to be removed: must never be invoked.
        Handler<Message<Object>> removedHandler = msg -> fail("shouldn't get message");

        vertices[0].eventBus().consumer(ADDRESS1, survivorHandler).completionHandler(onSuccess(first -> {
            // Register the second consumer only after the first registration has completed.
            vertices[1].eventBus().consumer(ADDRESS1, removedHandler)
                    .completionHandler(onSuccess(second -> bothRegistered.countDown()));
        }));
        awaitLatch(bothRegistered);

        CountDownLatch nodeRemoved = new CountDownLatch(1);
        action.accept(nodeRemoved);
        awaitLatch(nodeRemoved);

        // NOTE(review): presumably gives the cluster time to drop node 1's subscription
        // before sending; confirm whether a deterministic signal could replace the sleep.
        Thread.sleep(2000L);

        vertices[2].runOnContext(ctx -> {
            EventBus sender = vertices[2].eventBus();
            int next = 0;
            while (next < 10) {
                sender.send(ADDRESS1, "foo" + next);
                next++;
            }
        });
        await();
    }

    /**
     * Sends 1000 integers from node 0 to a consumer on node 1, calling {@code send} directly
     * from the test thread, and checks that all of them arrive in the order they were sent.
     *
     * <p>The expected sequence is built completely before the first send, and completion is
     * keyed on the fixed message count. Comparing against a collection that is still growing
     * would let the size check pass after the first delivery, finishing the test early or
     * calling {@code testComplete()} more than once.
     */
    @Test
    public void sendNoContext() throws Exception {
        final int size = 1000;
        ConcurrentLinkedDeque<Integer> expected = new ConcurrentLinkedDeque<>();
        for (int i = 0; i < size; ++i) {
            expected.add(i);
        }
        ConcurrentLinkedDeque<Integer> obtained = new ConcurrentLinkedDeque<>();
        startNodes(2);
        CountDownLatch latch = new CountDownLatch(1);
        vertices[1].eventBus().<Integer>consumer(ADDRESS1, msg -> {
            obtained.add(msg.body());
            // Only compare once every message has been delivered.
            if (obtained.size() == size) {
                assertEquals(new ArrayList<>(expected), new ArrayList<>(obtained));
                testComplete();
            }
        }).completionHandler(ar -> {
            assertTrue(ar.succeeded());
            latch.countDown();
        });
        // Same latch helper testSubsRemoved uses, instead of an unbounded latch.await().
        awaitLatch(latch);
        EventBus bus = vertices[0].eventBus();
        for (Integer value : expected) {
            bus.send(ADDRESS1, value);
        }
        await();
    }
}

