/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.core.eventbus;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBusTestBase;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.ReplyFailure;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.cluster.NodeSelector;
import io.vertx.core.spi.cluster.RegistrationUpdateEvent;
import io.vertx.core.spi.cluster.WrappedClusterManager;
import io.vertx.core.spi.cluster.WrappedNodeSelector;
import io.vertx.test.core.TestUtils;
import io.vertx.test.fakecluster.FakeClusterManager;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Test;

public class ClusteredEventBusTestBase
extends EventBusTestBase {
    protected static final String ADDRESS1 = "some-address1";

    /**
     * Supplies the cluster manager used by the nodes started in these tests.
     *
     * @return a newly created {@link FakeClusterManager}
     */
    @Override
    protected ClusterManager getClusterManager() {
        ClusterManager fakeManager = new FakeClusterManager();
        return fakeManager;
    }

    /**
     * Sends {@code val} from node 0 to a consumer registered on node 1 at {@link #ADDRESS1}
     * and verifies what arrives.
     *
     * <p>Two nodes are started first if no nodes are running yet. The message is only sent once
     * the consumer registration has completed successfully.
     *
     * @param val      the body to send
     * @param received the body the consumer is expected to see; only checked when
     *                 {@code consumer} is {@code null}
     * @param consumer optional custom check applied to the received body; when {@code null} the
     *                 built-in assertions (send flag, body, headers) run instead
     * @param options  optional delivery options; when non-null the received headers are compared
     *                 against {@code options.getHeaders()}
     * @param <T>      type of the body that is sent
     * @param <R>      type of the body expected on the receiving side
     */
    @Override
    protected <T, R> void testSend(T val, R received, Consumer<T> consumer, DeliveryOptions options) {
        if (vertices == null) {
            startNodes(2);
        }
        // The explicit <T> witness and typed lambda parameter keep msg.body() assignable to Consumer<T>.
        MessageConsumer<T> reg = vertices[1].eventBus().<T>consumer(ADDRESS1).handler((Message<T> msg) -> {
            if (consumer == null) {
                assertTrue(msg.isSend());
                assertEquals(received, msg.body());
                if (options != null) {
                    assertNotNull(msg.headers());
                    // getHeaders() may be null when no header was ever added to the options.
                    int numHeaders = options.getHeaders() != null ? options.getHeaders().size() : 0;
                    assertEquals(numHeaders, msg.headers().size());
                    if (numHeaders != 0) {
                        for (Map.Entry<String, String> entry : options.getHeaders().entries()) {
                            // Expected value first, so a failure message reads the right way round.
                            assertEquals(entry.getValue(), msg.headers().get(entry.getKey()));
                        }
                    }
                }
            } else {
                consumer.accept(msg.body());
            }
            testComplete();
        });
        reg.completionHandler(ar -> {
            assertTrue(ar.succeeded());
            if (options == null) {
                vertices[0].eventBus().send(ADDRESS1, val);
            } else {
                vertices[0].eventBus().send(ADDRESS1, val, options);
            }
        });
        await();
    }

    /**
     * Convenience overload: sends {@code val}, expects the very same value on the receiving side
     * and uses no delivery options.
     *
     * @param val      the body to send and to expect
     * @param consumer optional custom check of the received body, may be {@code null}
     * @param <T>      type of the body
     */
    @Override
    protected <T> void testSend(T val, Consumer<T> consumer) {
        testSend(val, val, consumer, null);
    }

    /**
     * Convenience overload: replies with {@code val}, expects the very same value in the reply
     * and uses no delivery options.
     *
     * @param val      the body to reply with and to expect
     * @param consumer optional custom check of the reply body, may be {@code null}
     * @param <T>      type of the body
     */
    @Override
    protected <T> void testReply(T val, Consumer<T> consumer) {
        testReply(val, val, consumer, null);
    }

    /**
     * Issues a request from node 0 to a consumer on node 1 at {@link #ADDRESS1}; the consumer
     * replies with {@code val} and the reply is verified on node 0.
     *
     * <p>Two nodes are started first if no nodes are running yet. The request body is a random
     * string which the consumer asserts before replying. The request is only issued once the
     * consumer registration has completed successfully.
     *
     * @param val      the body the consumer replies with
     * @param received the body expected in the reply; only checked when {@code consumer} is
     *                 {@code null}
     * @param consumer optional custom check applied to the reply body; when {@code null} the
     *                 built-in assertions (send flag, body, headers) run instead
     * @param options  optional delivery options used for the reply; when they carry headers, the
     *                 reply headers are compared against them
     * @param <T>      type of the body that is replied
     * @param <R>      type of the body expected by the requester
     */
    @Override
    protected <T, R> void testReply(T val, R received, Consumer<R> consumer, DeliveryOptions options) {
        if (vertices == null) {
            startNodes(2);
        }
        String str = TestUtils.randomUnicodeString(1000);
        MessageConsumer<Object> reg = vertices[1].eventBus().consumer(ADDRESS1).handler(msg -> {
            assertEquals(str, msg.body());
            if (options == null) {
                msg.reply(val);
            } else {
                msg.reply(val, options);
            }
        });
        reg.completionHandler(ar -> {
            assertTrue(ar.succeeded());
            // The explicit Message<R> parameter type keeps reply.body() assignable to Consumer<R>.
            vertices[0].eventBus().request(ADDRESS1, str, onSuccess((Message<R> reply) -> {
                if (consumer == null) {
                    assertTrue(reply.isSend());
                    assertEquals(received, reply.body());
                    if (options != null && options.getHeaders() != null) {
                        assertNotNull(reply.headers());
                        assertEquals(options.getHeaders().size(), reply.headers().size());
                        for (Map.Entry<String, String> entry : options.getHeaders().entries()) {
                            // Expected value first, so a failure message reads the right way round.
                            assertEquals(entry.getValue(), reply.headers().get(entry.getKey()));
                        }
                    }
                } else {
                    consumer.accept(reply.body());
                }
                testComplete();
            }));
        });
        await();
    }

    /**
     * Registers a consumer on node 0 through {@code consumer(address).handler(...)} and checks
     * that a message sent from node 1 after registration completes reaches it unchanged.
     */
    @Test
    public void testRegisterRemote1() {
        startNodes(2);
        String payload = TestUtils.randomUnicodeString(100);

        Handler<Message<Object>> onMessage = received -> {
            assertEquals(payload, received.body());
            testComplete();
        };
        Handler<AsyncResult<Void>> onRegistered = registration -> {
            assertTrue(registration.succeeded());
            vertices[1].eventBus().send(ADDRESS1, payload);
        };

        MessageConsumer<Object> remoteConsumer = vertices[0].eventBus().consumer(ADDRESS1).handler(onMessage);
        remoteConsumer.completionHandler(onRegistered);
        await();
    }

    /**
     * Registers a consumer on node 0 through the two-argument {@code consumer(address, handler)}
     * form and checks that a message sent from node 1 after registration completes reaches it
     * unchanged.
     */
    @Test
    public void testRegisterRemote2() {
        startNodes(2);
        String payload = TestUtils.randomUnicodeString(100);

        MessageConsumer<Object> remoteConsumer = vertices[0].eventBus().consumer(ADDRESS1, received -> {
            assertEquals(payload, received.body());
            testComplete();
        });
        remoteConsumer.completionHandler(registration -> {
            assertTrue(registration.succeeded());
            vertices[1].eventBus().send(ADDRESS1, payload);
        });
        await();
    }

    /**
     * Publishes {@code val} from node 0 and verifies that the consumer on every other node
     * receives it.
     *
     * <p>Three nodes are started. Each node except node 0 registers one consumer on
     * {@link #ADDRESS1}; the publish happens once all of those registrations have completed, and
     * the test completes once all of those consumers have received the message.
     *
     * @param val      the body to publish
     * @param consumer optional custom check applied to each received body; when {@code null} the
     *                 built-in assertions (not a send, body equals {@code val}) run instead
     * @param <T>      type of the body
     */
    @Override
    protected <T> void testPublish(T val, Consumer<T> consumer) {
        final int numNodes = 3;
        // Node 0 is the publisher; every other node hosts exactly one consumer. Both the
        // registration threshold and the delivery threshold derive from this single value.
        final int numConsumers = numNodes - 1;
        startNodes(numNodes);
        AtomicInteger count = new AtomicInteger();
        AtomicInteger registerCount = new AtomicInteger();

        // Shared by all consumers: it only touches the shared counter, so one instance suffices.
        Handler<Message<T>> messageHandler = msg -> {
            if (consumer == null) {
                assertFalse(msg.isSend());
                assertEquals(val, msg.body());
            } else {
                consumer.accept(msg.body());
            }
            if (count.incrementAndGet() == numConsumers) {
                testComplete();
            }
        };
        // Publish only after the last consumer registration has been confirmed.
        Handler<AsyncResult<Void>> registrationHandler = ar -> {
            assertTrue(ar.succeeded());
            if (registerCount.incrementAndGet() == numConsumers) {
                vertices[0].eventBus().publish(ADDRESS1, val);
            }
        };

        for (int node = 1; node < numNodes; node++) {
            MessageConsumer<T> reg = vertices[node].eventBus().<T>consumer(ADDRESS1).handler(messageHandler);
            reg.completionHandler(registrationHandler);
        }
        await();
    }

    /**
     * Checks that an outbound interceptor on the sending node sees the original string body,
     * while the consumer on the receiving node sees the integer produced by
     * {@code StringLengthCodec}.
     *
     * <p>The test waits for two completions: one from the interceptor and one from the consumer.
     */
    @Test
    public void testMessageBodyInterceptor() throws Exception {
        String content = TestUtils.randomUnicodeString(13);
        startNodes(2);
        waitFor(2);

        // Receiving side: register the codec and a consumer, then block until it is registered.
        CountDownLatch registered = new CountDownLatch(1);
        EventBus receiver = vertices[0].eventBus().registerCodec(new EventBusTestBase.StringLengthCodec());
        MessageConsumer<Integer> lengthConsumer = receiver.consumer("whatever", msg -> {
            assertEquals(content.length(), msg.body().intValue());
            complete();
        });
        lengthConsumer.completionHandler(ar -> registered.countDown());
        awaitLatch(registered);

        // Sending side: register the codec, install the interceptor, then send with that codec.
        EventBusTestBase.StringLengthCodec codec = new EventBusTestBase.StringLengthCodec();
        EventBus sender = vertices[1].eventBus().registerCodec(codec).addOutboundInterceptor(sc -> {
            if ("whatever".equals(sc.message().address())) {
                assertEquals(content, sc.body());
                complete();
            }
            sc.next();
        });
        DeliveryOptions withCodec = new DeliveryOptions().setCodecName(codec.name());
        sender.send("whatever", content, withCodec);
        await();
    }

    /**
     * Verifies that after a consumer is unregistered, requests to its address fail with
     * {@link ReplyFailure#NO_HANDLERS} from both nodes.
     *
     * <p>Flow: start two nodes whose cluster managers wrap the node selector to observe
     * registration updates; register an echo consumer on node 0 at {@code "foo"}; confirm that
     * requests from node 0 and from node 1 are echoed; unregister the consumer; wait on the latch;
     * then check that requests from node 1 and node 0 both fail with {@code NO_HANDLERS}.
     */
    @Test
    public void testClusteredUnregistration() throws Exception {
        // Counted down by the unregister callback and by every wrapped node selector that sees
        // "foo" with no registrations left. NOTE(review): 3 presumably means one unregister
        // callback plus one empty update per node - confirm against the cluster manager.
        CountDownLatch updateLatch = new CountDownLatch(3);
        Supplier<VertxOptions> options = () -> getOptions().setClusterManager(new WrappedClusterManager(getClusterManager()) {

            @Override
            public void init(Vertx vertx, NodeSelector nodeSelector) {
                super.init(vertx, new WrappedNodeSelector(nodeSelector) {

                    @Override
                    public void registrationsUpdated(RegistrationUpdateEvent event) {
                        super.registrationsUpdated(event);
                        if (event.address().equals("foo") && event.registrations().isEmpty()) {
                            updateLatch.countDown();
                        }
                    }
                });
            }
        });
        // Each node gets its own options instance, and therefore its own wrapped cluster manager.
        startNodes(options.get(), options.get());

        MessageConsumer<Object> consumer = vertices[0].eventBus().consumer("foo", msg -> msg.reply(msg.body()));
        consumer.completionHandler(onSuccess(reg -> {
            vertices[0].eventBus().request("foo", "echo", onSuccess(reply1 -> {
                assertEquals("echo", reply1.body());
                vertices[1].eventBus().request("foo", "echo", onSuccess(reply2 -> {
                    // Check the reply to the request sent from node 1, not the first reply again.
                    assertEquals("echo", reply2.body());
                    consumer.unregister(onSuccess(unreg -> updateLatch.countDown()));
                }));
            }));
        }));
        awaitLatch(updateLatch);

        vertices[1].eventBus().request("foo", "echo", onFailure(fail1 -> {
            assertThat(fail1, CoreMatchers.is(CoreMatchers.instanceOf(ReplyException.class)));
            assertEquals(ReplyFailure.NO_HANDLERS, ((ReplyException) fail1).failureType());
            vertices[0].eventBus().request("foo", "echo", onFailure(fail2 -> {
                assertThat(fail2, CoreMatchers.is(CoreMatchers.instanceOf(ReplyException.class)));
                assertEquals(ReplyFailure.NO_HANDLERS, ((ReplyException) fail2).failureType());
                testComplete();
            }));
        }));
        await();
    }
}

