/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.core.eventbus;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Verticle;
import io.vertx.core.eventbus.ClusteredEventBusTest;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.ReplyFailure;
import io.vertx.core.json.JsonObject;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.spi.cluster.consul.ConsulCluster;
import io.vertx.spi.cluster.consul.ConsulClusterManager;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.hamcrest.CoreMatchers;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class ConsulApClusteredEventBusTest
extends ClusteredEventBusTest {
    private static int port = 8500;

    @BeforeClass
    public static void startConsulCluster() {
        port = ConsulCluster.init();
    }

    @AfterClass
    public static void shutDownConsulCluster() {
        ConsulCluster.shutDown();
    }

    protected ClusterManager getClusterManager() {
        JsonObject options = new JsonObject().put("port", Integer.valueOf(port)).put("host", "localhost").put("preferConsistency", Boolean.valueOf(false));
        return new ConsulClusterManager(options);
    }

    @Test
    public void testSendWhileUnsubscribing() throws Exception {
        this.startNodes(2);
        final AtomicBoolean unregistered = new AtomicBoolean();
        AbstractVerticle sender = new AbstractVerticle(){

            public void start() throws Exception {
                ConsulApClusteredEventBusTest.this.sleep();
                this.getVertx().runOnContext(v -> this.sendMsg());
            }

            private void sendMsg() {
                if (!unregistered.get()) {
                    this.getVertx().eventBus().send("whatever", (Object)"marseille");
                    this.vertx.setTimer(1L, id -> this.sendMsg());
                } else {
                    this.getVertx().eventBus().send("whatever", (Object)"marseille", ar -> {
                        Throwable cause = ar.cause();
                        ConsulApClusteredEventBusTest.this.assertThat(cause, CoreMatchers.instanceOf(ReplyException.class));
                        ReplyException replyException = (ReplyException)cause;
                        ConsulApClusteredEventBusTest.this.assertEquals(ReplyFailure.NO_HANDLERS, replyException.failureType());
                        ConsulApClusteredEventBusTest.this.testComplete();
                    });
                }
            }
        };
        AbstractVerticle receiver = new AbstractVerticle(){
            boolean unregisterCalled;

            public void start(Future<Void> startFuture) throws Exception {
                EventBus eventBus = this.getVertx().eventBus();
                MessageConsumer consumer = eventBus.consumer("whatever");
                ConsulApClusteredEventBusTest.this.sleep();
                consumer.handler(m -> {
                    if (!this.unregisterCalled) {
                        consumer.unregister(v -> unregistered.set(true));
                        this.unregisterCalled = true;
                    }
                    m.reply((Object)"ok");
                }).completionHandler(startFuture);
            }
        };
        CountDownLatch deployLatch = new CountDownLatch(1);
        this.vertices[0].exceptionHandler(arg_0 -> ((ConsulApClusteredEventBusTest)this).fail(arg_0)).deployVerticle((Verticle)receiver, this.onSuccess(arg_0 -> this.lambda$testSendWhileUnsubscribing$1((Verticle)sender, deployLatch, arg_0)));
        this.awaitLatch(deployLatch);
        this.await();
        CountDownLatch closeLatch = new CountDownLatch(2);
        this.vertices[0].close(v -> closeLatch.countDown());
        this.vertices[1].close(v -> closeLatch.countDown());
        this.awaitLatch(closeLatch);
    }

    protected <T> void testPublish(final T val, final Consumer<T> consumer) {
        final int numNodes = 3;
        this.startNodes(numNodes);
        final AtomicInteger count = new AtomicInteger();
        final AtomicInteger registerCount = new AtomicInteger(0);
        class MyHandler
        implements Handler<Message<T>> {
            MyHandler() {
            }

            public void handle(Message<T> msg) {
                if (consumer == null) {
                    ConsulApClusteredEventBusTest.this.assertFalse(msg.isSend());
                    ConsulApClusteredEventBusTest.this.assertEquals(val, msg.body());
                } else {
                    consumer.accept(msg.body());
                }
                if (count.incrementAndGet() == numNodes - 1) {
                    ConsulApClusteredEventBusTest.this.testComplete();
                }
            }
        }
        MessageConsumer reg = this.vertices[2].eventBus().consumer("some-address1").handler((Handler)new MyHandler());
        class MyRegisterHandler
        implements Handler<AsyncResult<Void>> {
            MyRegisterHandler() {
            }

            public void handle(AsyncResult<Void> ar) {
                ConsulApClusteredEventBusTest.this.assertTrue(ar.succeeded());
                if (registerCount.incrementAndGet() == 2) {
                    ConsulApClusteredEventBusTest.this.vertices[0].eventBus().publish("some-address1", val);
                }
            }
        }
        reg.completionHandler((Handler)new MyRegisterHandler());
        reg = this.vertices[1].eventBus().consumer("some-address1").handler((Handler)new MyHandler());
        reg.completionHandler((Handler)new MyRegisterHandler());
        this.sleep();
        this.vertices[0].eventBus().publish("some-address1", val);
        this.await();
    }

    private void sleep() {
        try {
            Thread.sleep(1500L);
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private /* synthetic */ void lambda$testSendWhileUnsubscribing$1(Verticle sender, CountDownLatch deployLatch, String receiverId) {
        this.vertices[1].exceptionHandler(arg_0 -> ((ConsulApClusteredEventBusTest)this).fail(arg_0)).deployVerticle(sender, this.onSuccess(senderId -> deployLatch.countDown()));
    }
}

