/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.tests.eventbus;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Deployable;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.ReplyFailure;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.cluster.RegistrationUpdateEvent;
import io.vertx.test.core.TestUtils;
import io.vertx.test.fakecluster.FakeClusterManager;
import io.vertx.tests.eventbus.EventBusTestBase;
import io.vertx.tests.eventbus.WrappedClusterManager;
import io.vertx.tests.shareddata.AsyncMapTest;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Test;

public class ClusteredEventBusTestBase
extends EventBusTestBase {
    protected static final String ADDRESS1 = "some-address1";

    @Override
    protected ClusterManager getClusterManager() {
        return new FakeClusterManager();
    }

    @Override
    protected Future<Vertx> clusteredVertx(VertxOptions options, ClusterManager clusterManager) {
        Future<Vertx> fut = super.clusteredVertx(options, clusterManager);
        return fut.onSuccess(vertx -> {
            EventBusTestBase.ImmutableObjectCodec immutableObjectCodec = new EventBusTestBase.ImmutableObjectCodec();
            vertx.eventBus().registerCodec((MessageCodec)immutableObjectCodec);
            vertx.eventBus().codecSelector(obj -> obj instanceof EventBusTestBase.ImmutableObject ? immutableObjectCodec.name() : null);
            vertx.eventBus().clusterSerializableChecker(className -> className.startsWith(AsyncMapTest.class.getName()));
            vertx.eventBus().serializableChecker(className -> (Boolean)EventBus.DEFAULT_SERIALIZABLE_CHECKER.apply(className) != false || className.startsWith(AsyncMapTest.class.getName()));
        });
    }

    @Override
    protected boolean shouldImmutableObjectBeCopied() {
        return true;
    }

    @Override
    protected Vertx[] vertices(int num) {
        if (this.vertices == null) {
            this.startNodes(num);
        }
        Vertx[] instances = new Vertx[num];
        for (int i = 0; i < num; ++i) {
            instances[i] = this.vertices[i];
        }
        return instances;
    }

    @Test
    public void testRegisterRemote1() {
        this.startNodes(2);
        String str = TestUtils.randomUnicodeString(100);
        this.vertices[0].eventBus().consumer(ADDRESS1).handler(msg -> {
            this.assertEquals(str, msg.body());
            this.testComplete();
        }).completion().onComplete(ar -> {
            this.assertTrue(ar.succeeded());
            this.vertices[1].eventBus().send(ADDRESS1, (Object)str);
        });
        this.await();
    }

    @Test
    public void testRegisterRemote2() {
        this.startNodes(2);
        String str = TestUtils.randomUnicodeString(100);
        this.vertices[0].eventBus().consumer(ADDRESS1, msg -> {
            this.assertEquals(str, msg.body());
            this.testComplete();
        }).completion().onComplete(ar -> {
            this.assertTrue(ar.succeeded());
            this.vertices[1].eventBus().send(ADDRESS1, (Object)str);
        });
        this.await();
    }

    @Test
    public void testMessageBodyInterceptor() throws Exception {
        String content = TestUtils.randomUnicodeString(13);
        this.startNodes(2);
        this.waitFor(2);
        CountDownLatch latch = new CountDownLatch(1);
        this.vertices[0].eventBus().registerCodec((MessageCodec)new EventBusTestBase.StringLengthCodec()).consumer("whatever", msg -> {
            this.assertEquals(content.length(), ((Integer)msg.body()).intValue());
            this.complete();
        }).completion().onComplete(ar -> latch.countDown());
        this.awaitLatch(latch);
        EventBusTestBase.StringLengthCodec codec = new EventBusTestBase.StringLengthCodec();
        this.vertices[1].eventBus().registerCodec((MessageCodec)codec).addOutboundInterceptor(sc -> {
            if ("whatever".equals(sc.message().address())) {
                this.assertEquals(content, sc.body());
                this.complete();
            }
            sc.next();
        }).send("whatever", (Object)content, new DeliveryOptions().setCodecName(codec.name()));
        this.await();
    }

    @Test
    public void testClusteredUnregistration() throws Exception {
        final CountDownLatch updateLatch = new CountDownLatch(3);
        this.startNodes(2, () -> new WrappedClusterManager(this.getClusterManager()){

            @Override
            public void registrationsUpdated(RegistrationUpdateEvent event) {
                super.registrationsUpdated(event);
                if (event.address().equals("foo") && event.registrations().isEmpty()) {
                    updateLatch.countDown();
                }
            }
        });
        MessageConsumer consumer = this.vertices[0].eventBus().consumer("foo", msg -> msg.reply(msg.body()));
        consumer.completion().onComplete(this.onSuccess(reg -> this.vertices[0].eventBus().request("foo", (Object)"echo").onComplete(this.onSuccess(reply1 -> {
            this.assertEquals("echo", reply1.body());
            this.vertices[1].eventBus().request("foo", (Object)"echo").onComplete(this.onSuccess(reply2 -> {
                this.assertEquals("echo", reply1.body());
                consumer.unregister().onComplete(this.onSuccess(unreg -> updateLatch.countDown()));
            }));
        }))));
        this.awaitLatch(updateLatch);
        this.vertices[1].eventBus().request("foo", (Object)"echo").onComplete(this.onFailure(fail1 -> {
            this.assertThat(fail1, CoreMatchers.is((Matcher)CoreMatchers.instanceOf(ReplyException.class)));
            this.assertEquals(ReplyFailure.NO_HANDLERS, ((ReplyException)fail1).failureType());
            this.vertices[0].eventBus().request("foo", (Object)"echo").onComplete(this.onFailure(fail2 -> {
                this.assertThat(fail2, CoreMatchers.is((Matcher)CoreMatchers.instanceOf(ReplyException.class)));
                this.assertEquals(ReplyFailure.NO_HANDLERS, ((ReplyException)fail2).failureType());
                this.testComplete();
            }));
        }));
        this.await();
    }

    @Test
    public void testMessagingInStopMethod() throws Exception {
        this.startNodes(2);
        final AtomicInteger count = new AtomicInteger();
        this.waitFor(2);
        class MyVerticle
        extends AbstractVerticle {
            final String pingServerAddress;
            final String pingClientAddress;

            MyVerticle(String pingServerAddress, String pingClientAddress) {
                this.pingServerAddress = pingServerAddress;
                this.pingClientAddress = pingClientAddress;
            }

            public void start(Promise<Void> startPromise) throws Exception {
                this.vertx.eventBus().consumer(this.pingServerAddress, msg -> msg.reply((Object)"pong")).completion().onComplete(startPromise);
            }

            public void stop(Promise<Void> stopPromise) throws Exception {
                this.vertx.eventBus().request(this.pingClientAddress, (Object)"ping").onComplete(ClusteredEventBusTestBase.this.onSuccess(msg -> {
                    ClusteredEventBusTestBase.this.assertEquals("pong", msg.body());
                    count.incrementAndGet();
                    this.vertx.setPeriodic(10L, l -> {
                        if (count.get() == 2) {
                            stopPromise.complete();
                        }
                    });
                }));
            }
        }
        this.vertices[0].deployVerticle((Deployable)new MyVerticle("foo", "bar")).onComplete(this.onSuccess(id1 -> this.vertices[1].deployVerticle((Deployable)new MyVerticle("bar", "foo")).onComplete(this.onSuccess(id2 -> {
            this.vertices[0].close().onComplete(this.onSuccess(v -> this.complete()));
            this.vertices[1].close().onComplete(this.onSuccess(v -> this.complete()));
        }))));
        this.await();
    }
}

