/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.test.core;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Verticle;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.ReplyFailure;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.test.core.EventBusTestBase;
import io.vertx.test.core.TestUtils;
import io.vertx.test.fakecluster.FakeClusterManager;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.hamcrest.CoreMatchers;
import org.junit.Test;

/**
 * Event bus tests that exchange messages between several nodes started with
 * {@code startNodes(...)}, using the cluster manager returned by
 * {@link #getClusterManager()}.
 *
 * <p>The {@code testSend}, {@code testReply} and {@code testPublish} overrides
 * register consumers on one node and send or publish from another node.
 */
public class ClusteredEventBusTestBase extends EventBusTestBase {

    /** Event bus address used by the send, reply, publish and register tests. */
    protected static final String ADDRESS1 = "some-address1";

    /**
     * Supplies the cluster manager for the test nodes.
     *
     * @return a new {@link FakeClusterManager}
     */
    @Override
    protected ClusterManager getClusterManager() {
        return new FakeClusterManager();
    }

    /**
     * Registers a consumer on node 1 and, once the registration has completed,
     * sends {@code val} from node 0 to {@link #ADDRESS1}.
     *
     * @param val      the body to send
     * @param received the body the consumer is expected to see; only checked when
     *                 {@code consumer} is {@code null}
     * @param consumer optional custom check applied to the received body; when
     *                 {@code null} the default body and header assertions run
     * @param options  optional delivery options; when {@code null} the message is
     *                 sent without options and headers are not checked
     */
    @Override
    protected <T, R> void testSend(T val, R received, Consumer<T> consumer, DeliveryOptions options) {
        // Nodes may already have been started by the caller.
        if (vertices == null) {
            startNodes(2);
        }
        // The explicit <T> witness keeps msg.body() typed as T so it can be passed to consumer.
        MessageConsumer<T> reg = vertices[1].eventBus().<T>consumer(ADDRESS1).handler((Message<T> msg) -> {
            if (consumer == null) {
                assertTrue(msg.isSend());
                assertEquals(received, msg.body());
                if (options != null) {
                    assertNotNull(msg.headers());
                    // Options without headers must produce a message with zero headers.
                    int numHeaders = options.getHeaders() != null ? options.getHeaders().size() : 0;
                    assertEquals(numHeaders, msg.headers().size());
                    if (numHeaders != 0) {
                        for (Map.Entry<String, String> entry : options.getHeaders().entries()) {
                            assertEquals(entry.getValue(), msg.headers().get(entry.getKey()));
                        }
                    }
                }
            } else {
                consumer.accept(msg.body());
            }
            testComplete();
        });
        // Only send once the registration has completed.
        reg.completionHandler(ar -> {
            assertTrue(ar.succeeded());
            if (options == null) {
                vertices[0].eventBus().send(ADDRESS1, val);
            } else {
                vertices[0].eventBus().send(ADDRESS1, val, options);
            }
        });
        await();
    }

    /**
     * Sends {@code val} without delivery options, expecting the same value on the
     * receiving side.
     *
     * @param val      the body to send
     * @param consumer optional custom check applied to the received body
     */
    @Override
    protected <T> void testSend(T val, Consumer<T> consumer) {
        testSend(val, val, consumer, null);
    }

    /**
     * Replies with {@code val} without delivery options, expecting the same value
     * in the reply.
     *
     * @param val      the body to reply with
     * @param consumer optional custom check applied to the reply body
     */
    @Override
    protected <T> void testReply(T val, Consumer<T> consumer) {
        testReply(val, val, consumer, null);
    }

    /**
     * Registers a consumer on node 1 that replies with {@code val}; once the
     * registration has completed, node 0 sends a random string to
     * {@link #ADDRESS1} and checks the reply.
     *
     * @param val      the body the consumer replies with
     * @param received the reply body expected by the sender; only checked when
     *                 {@code consumer} is {@code null}
     * @param consumer optional custom check applied to the reply body; when
     *                 {@code null} the default body and header assertions run
     * @param options  optional delivery options for the reply; when {@code null}
     *                 the reply is sent without options
     */
    @Override
    protected <T, R> void testReply(T val, R received, Consumer<R> consumer, DeliveryOptions options) {
        // Nodes may already have been started by the caller.
        if (vertices == null) {
            startNodes(2);
        }
        String str = TestUtils.randomUnicodeString(1000);
        MessageConsumer<String> reg = vertices[1].eventBus().<String>consumer(ADDRESS1).handler(msg -> {
            assertEquals(str, msg.body());
            if (options == null) {
                msg.reply(val);
            } else {
                msg.reply(val, options);
            }
        });
        reg.completionHandler(ar -> {
            assertTrue(ar.succeeded());
            // The explicitly typed lambda parameter fixes the reply type to R.
            vertices[0].eventBus().send(ADDRESS1, str, onSuccess((Message<R> reply) -> {
                if (consumer == null) {
                    assertTrue(reply.isSend());
                    assertEquals(received, reply.body());
                    if (options != null && options.getHeaders() != null) {
                        assertNotNull(reply.headers());
                        assertEquals(options.getHeaders().size(), reply.headers().size());
                        for (Map.Entry<String, String> entry : options.getHeaders().entries()) {
                            assertEquals(entry.getValue(), reply.headers().get(entry.getKey()));
                        }
                    }
                } else {
                    consumer.accept(reply.body());
                }
                testComplete();
            }));
        });
        await();
    }

    /**
     * Registers a consumer on node 0 through {@code consumer(address).handler(...)}
     * and, once registered, sends a random string to it from node 1.
     */
    @Test
    public void testRegisterRemote1() {
        startNodes(2);
        String str = TestUtils.randomUnicodeString(100);
        vertices[0].eventBus().<String>consumer(ADDRESS1).handler(msg -> {
            assertEquals(str, msg.body());
            testComplete();
        }).completionHandler(ar -> {
            assertTrue(ar.succeeded());
            vertices[1].eventBus().send(ADDRESS1, str);
        });
        await();
    }

    /**
     * Same scenario as {@link #testRegisterRemote1()}, but the consumer is
     * registered through the {@code consumer(address, handler)} overload.
     */
    @Test
    public void testRegisterRemote2() {
        startNodes(2);
        String str = TestUtils.randomUnicodeString(100);
        vertices[0].eventBus().<String>consumer(ADDRESS1, msg -> {
            assertEquals(str, msg.body());
            testComplete();
        }).completionHandler(ar -> {
            assertTrue(ar.succeeded());
            vertices[1].eventBus().send(ADDRESS1, str);
        });
        await();
    }

    /**
     * Starts three nodes, registers a consumer on every node except node 0 and,
     * once all of those registrations have completed, publishes {@code val} from
     * node 0. The test completes when each registered consumer has handled one
     * message.
     *
     * @param val      the body to publish
     * @param consumer optional custom check applied to each received body; when
     *                 {@code null} the body is compared to {@code val} and the
     *                 message must not be a point-to-point send
     */
    @Override
    protected <T> void testPublish(T val, Consumer<T> consumer) {
        final int numNodes = 3;
        final int numReceivers = numNodes - 1;
        startNodes(numNodes);
        AtomicInteger count = new AtomicInteger();
        AtomicInteger registerCount = new AtomicInteger();

        Handler<Message<T>> messageHandler = msg -> {
            if (consumer == null) {
                assertFalse(msg.isSend());
                assertEquals(val, msg.body());
            } else {
                consumer.accept(msg.body());
            }
            if (count.incrementAndGet() == numReceivers) {
                testComplete();
            }
        };

        // Publish exactly once, and only after every receiver has finished registering.
        // Publishing earlier would race with the registrations and could let the delivery
        // count be reached without every node having received the message.
        Handler<AsyncResult<Void>> registrationHandler = ar -> {
            assertTrue(ar.succeeded());
            if (registerCount.incrementAndGet() == numReceivers) {
                vertices[0].eventBus().publish(ADDRESS1, val);
            }
        };

        // Node 0 is the publisher; all other nodes receive.
        for (int node = numNodes - 1; node >= 1; node--) {
            vertices[node].eventBus().<T>consumer(ADDRESS1)
                .handler(messageHandler)
                .completionHandler(registrationHandler);
        }
        await();
    }

    /**
     * Deploys a receiver verticle on node 0 and a sender verticle on node 1.
     *
     * <p>The receiver unregisters its consumer while handling its first message and
     * still replies {@code "ok"}. The sender keeps sending on a 1 ms timer until the
     * unregistration callback has fired, then sends one final message with a reply
     * handler and expects it to fail with a {@link ReplyException} of type
     * {@link ReplyFailure#NO_HANDLERS}. Both nodes are closed at the end.
     *
     * @throws Exception if waiting on a latch fails
     */
    @Test
    public void testSendWhileUnsubscribing() throws Exception {
        startNodes(2);

        // Set by the receiver once its unregister callback has been invoked.
        AtomicBoolean unregistered = new AtomicBoolean();

        Verticle sender = new AbstractVerticle() {

            @Override
            public void start() throws Exception {
                getVertx().runOnContext(v -> sendMsg());
            }

            private void sendMsg() {
                if (!unregistered.get()) {
                    // Fire-and-forget, then try again on the next timer tick.
                    getVertx().eventBus().send("whatever", "marseille");
                    vertx.setTimer(1L, id -> sendMsg());
                } else {
                    // The consumer is gone: the send must fail with NO_HANDLERS.
                    getVertx().eventBus().send("whatever", "marseille", ar -> {
                        Throwable cause = ar.cause();
                        assertThat(cause, CoreMatchers.instanceOf(ReplyException.class));
                        ReplyException replyException = (ReplyException) cause;
                        assertEquals(ReplyFailure.NO_HANDLERS, replyException.failureType());
                        testComplete();
                    });
                }
            }
        };

        Verticle receiver = new AbstractVerticle() {
            // Ensures unregister is requested only once even if more messages arrive.
            boolean unregisterCalled;

            @Override
            public void start(Future<Void> startFuture) throws Exception {
                EventBus eventBus = getVertx().eventBus();
                MessageConsumer<String> consumer = eventBus.consumer("whatever");
                consumer.handler(m -> {
                    if (!unregisterCalled) {
                        consumer.unregister(v -> unregistered.set(true));
                        unregisterCalled = true;
                    }
                    m.reply("ok");
                }).completionHandler(startFuture);
            }
        };

        // Deploy the receiver first so the sender only starts once the consumer is registered.
        CountDownLatch deployLatch = new CountDownLatch(1);
        vertices[0].exceptionHandler(this::fail).deployVerticle(receiver, onSuccess(receiverId -> {
            vertices[1].exceptionHandler(this::fail).deployVerticle(sender, onSuccess(senderId -> {
                deployLatch.countDown();
            }));
        }));
        awaitLatch(deployLatch);
        await();

        CountDownLatch closeLatch = new CountDownLatch(2);
        vertices[0].close(v -> closeLatch.countDown());
        vertices[1].close(v -> closeLatch.countDown());
        awaitLatch(closeLatch);
    }

    /**
     * Sends a random string from node 1 to node 0 using a
     * {@code StringLengthCodec}, with a send interceptor installed on node 1.
     *
     * <p>The interceptor asserts that the sent body is the original string, and the
     * consumer on node 0 asserts that the body it receives equals the string's
     * length. Each of them calls {@code complete()}, matching {@code waitFor(2)}.
     *
     * @throws Exception if waiting on the registration latch fails
     */
    @Test
    public void testMessageBodyInterceptor() throws Exception {
        String content = TestUtils.randomUnicodeString(13);
        startNodes(2);
        waitFor(2);

        CountDownLatch latch = new CountDownLatch(1);
        vertices[0].eventBus()
            .registerCodec(new EventBusTestBase.StringLengthCodec())
            .<Integer>consumer("whatever", msg -> {
                assertEquals(content.length(), msg.body().intValue());
                complete();
            })
            .completionHandler(ar -> latch.countDown());
        // Do not send before the consumer registration has completed.
        awaitLatch(latch);

        EventBusTestBase.StringLengthCodec codec = new EventBusTestBase.StringLengthCodec();
        vertices[1].eventBus()
            .registerCodec(codec)
            .addInterceptor(sc -> {
                // Only inspect the message under test; always continue the chain.
                if ("whatever".equals(sc.message().address())) {
                    assertEquals(content, sc.sentBody());
                    complete();
                }
                sc.next();
            })
            .send("whatever", content, new DeliveryOptions().setCodecName(codec.name()));
        await();
    }
}

