/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.test.core;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Context;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Handler;
import io.vertx.core.Verticle;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.datagram.DatagramSocket;
import io.vertx.core.datagram.DatagramSocketOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.http.WebSocket;
import io.vertx.core.metrics.MetricsOptions;
import io.vertx.core.metrics.impl.DummyVertxMetrics;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.spi.metrics.DatagramSocketMetrics;
import io.vertx.core.spi.metrics.EventBusMetrics;
import io.vertx.core.spi.metrics.HttpClientMetrics;
import io.vertx.core.spi.metrics.HttpServerMetrics;
import io.vertx.core.spi.metrics.TCPMetrics;
import io.vertx.test.core.AsyncTestBase;
import io.vertx.test.core.ConfigurableMetricsFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.junit.Ignore;
import org.junit.Test;

public class MetricsContextTest
extends AsyncTestBase {
    /** Context factory used by the "EventLoop" test variants: asks the Vert.x instance for its current-or-new context. */
    private Function<Vertx, Context> eventLoopContextFactory = vertx -> vertx.getOrCreateContext();
    /**
     * Checker used by the "EventLoop" test variants: the caller must be running on exactly the
     * given context and on exactly the given thread.
     */
    private BiConsumer<Thread, Context> eventLoopChecker = (wantedThread, wantedContext) -> {
        // Context first, then thread -- the order decides which assertion reports a double mismatch.
        assertSame(Vertx.currentContext(), wantedContext);
        assertSame(Thread.currentThread(), wantedThread);
    };
    /**
     * Context factory used by the "Worker" test variants. Deploys a throwaway verticle with
     * {@code setWorker(true)}, captures the context the verticle sees in {@code start()}, and
     * blocks (via {@code waitUntil}) until that context has been published.
     */
    private Function<Vertx, Context> workerContextFactory = vertx -> {
        // Typed reference: avoids the raw type and the unchecked cast on the way out.
        AtomicReference<Context> deployedContext = new AtomicReference<>();
        vertx.deployVerticle(new AbstractVerticle() {

            @Override
            public void start() throws Exception {
                // Publish the verticle's own context to the waiting test thread.
                deployedContext.set(this.context);
                super.start();
            }
        }, new DeploymentOptions().setWorker(true));
        waitUntil(() -> deployedContext.get() != null);
        return deployedContext.get();
    };
    /**
     * Checker used by the "Worker" test variants: the caller must be running on the given context
     * and on a worker thread. The expected thread argument is intentionally not compared.
     */
    private BiConsumer<Thread, Context> workerChecker = (ignoredThread, wantedContext) -> {
        assertSame(Vertx.currentContext(), wantedContext);
        assertTrue(Context.isOnWorkerThread());
    };

    /**
     * Verifies that the metrics factory delegate is invoked on the thread that calls
     * {@code Vertx.vertx(...)} and with no current Vert.x context.
     */
    @Test
    public void testFactory() throws Exception {
        AtomicReference<Thread> metricsThread = new AtomicReference<>();
        AtomicReference<Context> metricsContext = new AtomicReference<>();
        ConfigurableMetricsFactory.delegate = (vertx, options) -> {
            // Record where the factory is called from.
            metricsThread.set(Thread.currentThread());
            metricsContext.set(Vertx.currentContext());
            return new DummyVertxMetrics();
        };
        Vertx vertx2 = Vertx.vertx(new VertxOptions().setMetricsOptions(new MetricsOptions().setEnabled(true)));
        try {
            assertSame(Thread.currentThread(), metricsThread.get());
            assertNull(metricsContext.get());
        } finally {
            // The instance is only needed to trigger the factory; release it so it does not
            // outlive the test.
            vertx2.close();
        }
    }

    /**
     * Clustered counterpart of {@code testFactory}: verifies that the metrics factory delegate is
     * invoked on the thread that calls {@code Vertx.clusteredVertx(...)} and with no current
     * Vert.x context.
     */
    @Test
    public void testFactoryInCluster() throws Exception {
        AtomicReference<Thread> metricsThread = new AtomicReference<>();
        AtomicReference<Context> metricsContext = new AtomicReference<>();
        Thread testThread = Thread.currentThread();
        ConfigurableMetricsFactory.delegate = (vertx, options) -> {
            // Record where the factory is called from.
            metricsThread.set(Thread.currentThread());
            metricsContext.set(Vertx.currentContext());
            return new DummyVertxMetrics();
        };
        VertxOptions clusteredOptions = new VertxOptions().setClustered(true).setMetricsOptions(new MetricsOptions().setEnabled(true));
        Vertx.clusteredVertx(clusteredOptions, onSuccess(vertx -> {
            assertSame(testThread, metricsThread.get());
            assertNull(metricsContext.get());
            // Close the clustered instance before completing so it does not outlive the test.
            vertx.close(closed -> testComplete());
        }));
        await();
    }

    /** Runs the HTTP server request scenario with the event-loop context factory and checker. */
    @Test
    public void testHttpServerRequestEventLoop() throws Exception {
        testHttpServerRequest(eventLoopContextFactory, eventLoopChecker);
    }

    /** Runs the HTTP server request scenario with the worker context factory and checker. */
    @Test
    public void testHttpServerRequestWorker() throws Exception {
        testHttpServerRequest(workerContextFactory, workerChecker);
    }

    /**
     * Creates and binds an HTTP server from the context produced by {@code contextFactory}, sends
     * it one chunked PUT request, and verifies that every server-side metrics callback was
     * invoked. Each callback except {@code close()} also runs {@code checker} against the thread
     * and context recorded in the server's listen handler.
     *
     * @param contextFactory produces the context on which the server is created and bound
     * @param checker        validates the calling thread/context from inside the metrics callbacks
     * @throws Exception propagated from {@code awaitLatch}
     */
    private void testHttpServerRequest(Function<Vertx, Context> contextFactory, final BiConsumer<Thread, Context> checker) throws Exception {
        // Typed references so that checker.accept(Thread, Context) type-checks without casts.
        AtomicReference<Thread> expectedThread = new AtomicReference<>();
        AtomicReference<Context> expectedContext = new AtomicReference<>();
        AtomicBoolean requestBeginCalled = new AtomicBoolean();
        AtomicBoolean responseEndCalled = new AtomicBoolean();
        AtomicBoolean socketConnectedCalled = new AtomicBoolean();
        AtomicBoolean socketDisconnectedCalled = new AtomicBoolean();
        AtomicBoolean bytesReadCalled = new AtomicBoolean();
        AtomicBoolean bytesWrittenCalled = new AtomicBoolean();
        AtomicBoolean closeCalled = new AtomicBoolean();
        ConfigurableMetricsFactory.delegate = (vertx, options) -> new DummyVertxMetrics() {

            @Override
            public HttpServerMetrics createMetrics(HttpServer server, SocketAddress localAddress, HttpServerOptions options) {
                return new DummyVertxMetrics.DummyHttpServerMetrics() {

                    @Override
                    public Void requestBegin(Void socketMetric, HttpServerRequest request) {
                        requestBeginCalled.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                        return null;
                    }

                    @Override
                    public void responseEnd(Void requestMetric, HttpServerResponse response) {
                        responseEndCalled.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                    }

                    @Override
                    public Void connected(SocketAddress remoteAddress) {
                        socketConnectedCalled.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                        return null;
                    }

                    @Override
                    public void disconnected(Void socketMetric, SocketAddress remoteAddress) {
                        socketDisconnectedCalled.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                    }

                    @Override
                    public void bytesRead(Void socketMetric, SocketAddress remoteAddress, long numberOfBytes) {
                        bytesReadCalled.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                    }

                    @Override
                    public void bytesWritten(Void socketMetric, SocketAddress remoteAddress, long numberOfBytes) {
                        bytesWrittenCalled.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                    }

                    @Override
                    public boolean isEnabled() {
                        return true;
                    }

                    @Override
                    public void close() {
                        // Only recorded; no thread/context check is made for close().
                        closeCalled.set(true);
                    }
                };
            }
        };
        CountDownLatch latch = new CountDownLatch(1);
        Vertx vertx2 = Vertx.vertx(new VertxOptions().setMetricsOptions(new MetricsOptions().setEnabled(true)));
        Context ctx = contextFactory.apply(vertx2);
        ctx.runOnContext(v1 -> {
            HttpServer server = vertx2.createHttpServer().requestHandler(req -> {
                HttpServerResponse response = req.response();
                response.setStatusCode(200).setChunked(true).write("bye").end();
                response.close();
            });
            server.listen(8080, "localhost", onSuccess(s -> {
                // The thread/context of the listen callback is what the metrics callbacks are checked against.
                expectedThread.set(Thread.currentThread());
                expectedContext.set(Vertx.currentContext());
                latch.countDown();
            }));
        });
        awaitLatch(latch);
        HttpClient client = vertx2.createHttpClient();
        // Once the client socket closes, shut Vert.x down and verify every callback fired.
        client.put(8080, "localhost", "/", resp -> resp.netSocket().closeHandler(v -> vertx2.close(v4 -> {
            assertTrue(requestBeginCalled.get());
            assertTrue(responseEndCalled.get());
            assertTrue(bytesReadCalled.get());
            assertTrue(bytesWrittenCalled.get());
            assertTrue(socketConnectedCalled.get());
            assertTrue(socketDisconnectedCalled.get());
            assertTrue(closeCalled.get());
            testComplete();
        }))).exceptionHandler(err -> fail(err.getMessage())).setChunked(true).write(Buffer.buffer("hello")).end();
        await();
    }

    /** Runs the HTTP server websocket scenario with the event-loop context factory and checker. */
    @Test
    public void testHttpServerWebsocketEventLoop() throws Exception {
        testHttpServerWebsocket(eventLoopContextFactory, eventLoopChecker);
    }

    /** Runs the HTTP server websocket scenario with the worker context factory and checker. */
    @Test
    public void testHttpServerWebsocketWorker() throws Exception {
        testHttpServerWebsocket(workerContextFactory, workerChecker);
    }

    /**
     * Creates and binds a websocket-handling HTTP server from the context produced by
     * {@code contextFactory}, exchanges one message with it over a client websocket, and verifies
     * that every server-side metrics callback was invoked. Each callback except {@code close()}
     * also runs {@code checker} against the thread and context recorded in the server's listen
     * handler.
     *
     * @param contextFactory produces the context on which the server is created and bound
     * @param checker        validates the calling thread/context from inside the metrics callbacks
     * @throws Exception propagated from {@code awaitLatch}
     */
    private void testHttpServerWebsocket(Function<Vertx, Context> contextFactory, final BiConsumer<Thread, Context> checker) throws Exception {
        // Typed references so that checker.accept(Thread, Context) type-checks without casts.
        AtomicReference<Thread> expectedThread = new AtomicReference<>();
        AtomicReference<Context> expectedContext = new AtomicReference<>();
        AtomicBoolean websocketConnected = new AtomicBoolean();
        AtomicBoolean websocketDisconnected = new AtomicBoolean();
        AtomicBoolean socketConnectedCalled = new AtomicBoolean();
        AtomicBoolean socketDisconnectedCalled = new AtomicBoolean();
        AtomicBoolean bytesReadCalled = new AtomicBoolean();
        AtomicBoolean bytesWrittenCalled = new AtomicBoolean();
        AtomicBoolean closeCalled = new AtomicBoolean();
        ConfigurableMetricsFactory.delegate = (vertx, options) -> new DummyVertxMetrics() {

            @Override
            public HttpServerMetrics createMetrics(HttpServer server, SocketAddress localAddress, HttpServerOptions options) {
                return new DummyVertxMetrics.DummyHttpServerMetrics() {

                    // Websocket-level connect.
                    @Override
                    public Void connected(Void socketMetric, ServerWebSocket serverWebSocket) {
                        websocketConnected.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                        return null;
                    }

                    // Websocket-level disconnect.
                    @Override
                    public void disconnected(Void serverWebSocketMetric) {
                        websocketDisconnected.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                    }

                    // Socket-level connect.
                    @Override
                    public Void connected(SocketAddress remoteAddress) {
                        socketConnectedCalled.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                        return null;
                    }

                    // Socket-level disconnect.
                    @Override
                    public void disconnected(Void socketMetric, SocketAddress remoteAddress) {
                        socketDisconnectedCalled.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                    }

                    @Override
                    public void bytesRead(Void socketMetric, SocketAddress remoteAddress, long numberOfBytes) {
                        bytesReadCalled.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                    }

                    @Override
                    public void bytesWritten(Void socketMetric, SocketAddress remoteAddress, long numberOfBytes) {
                        bytesWrittenCalled.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                    }

                    @Override
                    public boolean isEnabled() {
                        return true;
                    }

                    @Override
                    public void close() {
                        // Only recorded; no thread/context check is made for close().
                        closeCalled.set(true);
                    }
                };
            }
        };
        CountDownLatch latch = new CountDownLatch(1);
        Vertx vertx2 = Vertx.vertx(new VertxOptions().setMetricsOptions(new MetricsOptions().setEnabled(true)));
        Context ctx = contextFactory.apply(vertx2);
        ctx.runOnContext(v1 -> {
            // The server answers every incoming frame with "bye".
            HttpServer server = vertx2.createHttpServer().websocketHandler(ws -> ws.handler(buf -> ws.write(Buffer.buffer("bye"))));
            server.listen(8080, "localhost", onSuccess(s -> {
                // The thread/context of the listen callback is what the metrics callbacks are checked against.
                expectedThread.set(Thread.currentThread());
                expectedContext.set(Vertx.currentContext());
                latch.countDown();
            }));
        });
        awaitLatch(latch);
        HttpClient client = vertx2.createHttpClient();
        client.websocket(8080, "localhost", "/", ws -> {
            ws.handler(buf -> {
                // On the server's reply: register the verification, then close the websocket to trigger it.
                ws.closeHandler(v -> vertx2.close(v4 -> {
                    assertTrue(websocketConnected.get());
                    assertTrue(websocketDisconnected.get());
                    assertTrue(bytesReadCalled.get());
                    assertTrue(bytesWrittenCalled.get());
                    assertTrue(socketConnectedCalled.get());
                    assertTrue(socketDisconnectedCalled.get());
                    assertTrue(closeCalled.get());
                    testComplete();
                }));
                ws.close();
            });
            ws.write(Buffer.buffer("hello"));
        });
        await();
    }

    /** Runs the HTTP client request scenario with the event-loop context factory and checker. */
    @Test
    public void testHttpClientRequestEventLoop() throws Exception {
        testHttpClientRequest(eventLoopContextFactory, eventLoopChecker);
    }

    /** Runs the HTTP client request scenario with the worker context factory and checker. */
    @Test
    public void testHttpClientRequestWorker() throws Exception {
        testHttpClientRequest(workerContextFactory, workerChecker);
    }

    /**
     * Starts a plain HTTP server, then, from the context produced by {@code contextFactory},
     * creates an HTTP client and sends one chunked PUT request. Verifies that every client-side
     * metrics callback was invoked. The socket-level callbacks ({@code connected},
     * {@code disconnected}, {@code bytesRead}, {@code bytesWritten}) also run {@code checker}
     * against the thread and context on which the client was created.
     *
     * @param contextFactory produces the context on which the client is created and used
     * @param checker        validates the calling thread/context from inside the metrics callbacks
     * @throws Exception propagated from {@code awaitLatch}
     */
    private void testHttpClientRequest(Function<Vertx, Context> contextFactory, final BiConsumer<Thread, Context> checker) throws Exception {
        // Typed references so that checker.accept(Thread, Context) type-checks without casts.
        AtomicReference<Thread> expectedThread = new AtomicReference<>();
        AtomicReference<Context> expectedContext = new AtomicReference<>();
        AtomicBoolean requestBeginCalled = new AtomicBoolean();
        AtomicBoolean responseEndCalled = new AtomicBoolean();
        AtomicBoolean socketConnectedCalled = new AtomicBoolean();
        AtomicBoolean socketDisconnectedCalled = new AtomicBoolean();
        AtomicBoolean bytesReadCalled = new AtomicBoolean();
        AtomicBoolean bytesWrittenCalled = new AtomicBoolean();
        AtomicBoolean closeCalled = new AtomicBoolean();
        ConfigurableMetricsFactory.delegate = (vertx, options) -> new DummyVertxMetrics() {

            @Override
            public HttpClientMetrics createMetrics(HttpClient client, HttpClientOptions options) {
                return new DummyVertxMetrics.DummyHttpClientMetrics() {

                    // Request-level callbacks are only recorded; no thread/context check is made here.
                    @Override
                    public Void requestBegin(Void socketMetric, SocketAddress localAddress, SocketAddress remoteAddress, HttpClientRequest request) {
                        requestBeginCalled.set(true);
                        return null;
                    }

                    @Override
                    public void responseEnd(Void requestMetric, HttpClientResponse response) {
                        responseEndCalled.set(true);
                    }

                    @Override
                    public Void connected(SocketAddress remoteAddress) {
                        socketConnectedCalled.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                        return null;
                    }

                    @Override
                    public void disconnected(Void socketMetric, SocketAddress remoteAddress) {
                        socketDisconnectedCalled.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                    }

                    @Override
                    public void bytesRead(Void socketMetric, SocketAddress remoteAddress, long numberOfBytes) {
                        bytesReadCalled.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                    }

                    @Override
                    public void bytesWritten(Void socketMetric, SocketAddress remoteAddress, long numberOfBytes) {
                        bytesWrittenCalled.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                    }

                    @Override
                    public void close() {
                        // Only recorded; no thread/context check is made for close().
                        closeCalled.set(true);
                    }

                    @Override
                    public boolean isEnabled() {
                        return true;
                    }
                };
            }
        };
        Vertx vertx2 = Vertx.vertx(new VertxOptions().setMetricsOptions(new MetricsOptions().setEnabled(true)));
        HttpServer server = vertx2.createHttpServer();
        // The server replies "bye" once the request body has been fully received.
        server.requestHandler(req -> req.endHandler(buf -> {
            HttpServerResponse resp = req.response();
            resp.setChunked(true).write(Buffer.buffer("bye")).end();
            resp.close();
        }));
        CountDownLatch latch = new CountDownLatch(1);
        server.listen(8080, "localhost", onSuccess(s -> latch.countDown()));
        awaitLatch(latch);
        Context ctx = contextFactory.apply(vertx2);
        ctx.runOnContext(v1 -> {
            // The thread/context creating the client is what the metrics callbacks are checked against.
            expectedThread.set(Thread.currentThread());
            expectedContext.set(Vertx.currentContext());
            HttpClient client = vertx2.createHttpClient();
            checker.accept(expectedThread.get(), expectedContext.get());
            HttpClientRequest req = client.put(8080, "localhost", "/");
            // On response: close the client and Vert.x via executeInVanillaThread, then verify.
            req.handler(resp -> executeInVanillaThread(() -> {
                client.close();
                vertx2.close(v2 -> {
                    assertTrue(requestBeginCalled.get());
                    assertTrue(responseEndCalled.get());
                    assertTrue(socketConnectedCalled.get());
                    assertTrue(socketDisconnectedCalled.get());
                    assertTrue(bytesReadCalled.get());
                    assertTrue(bytesWrittenCalled.get());
                    assertTrue(closeCalled.get());
                    testComplete();
                });
            }));
            req.setChunked(true).write("hello");
            req.end();
        });
        await();
    }

    /** Runs the HTTP client websocket scenario with the event-loop context factory and checker. */
    @Test
    public void testHttpClientWebsocketEventLoop() throws Exception {
        testHttpClientWebsocket(eventLoopContextFactory, eventLoopChecker);
    }

    /**
     * Runs the HTTP client websocket scenario with the worker context factory and checker.
     * Currently disabled via {@code @Ignore}; the reason is not recorded here.
     */
    @Test
    @Ignore
    public void testHttpClientWebsocketWorker() throws Exception {
        testHttpClientWebsocket(workerContextFactory, workerChecker);
    }

    /**
     * Starts a websocket-handling HTTP server, then, from the context produced by
     * {@code contextFactory}, creates an HTTP client and exchanges one message over a websocket.
     * Verifies that every client-side metrics callback was invoked. All callbacks except the
     * websocket-level {@code disconnected} and {@code close()} also run {@code checker} against
     * the thread and context on which the client was created.
     *
     * @param contextFactory produces the context on which the client is created and used
     * @param checker        validates the calling thread/context from inside the metrics callbacks
     * @throws Exception propagated from {@code awaitLatch}
     */
    private void testHttpClientWebsocket(Function<Vertx, Context> contextFactory, final BiConsumer<Thread, Context> checker) throws Exception {
        // Typed references so that checker.accept(Thread, Context) type-checks without casts.
        AtomicReference<Thread> expectedThread = new AtomicReference<>();
        AtomicReference<Context> expectedContext = new AtomicReference<>();
        AtomicBoolean websocketConnected = new AtomicBoolean();
        AtomicBoolean websocketDisconnected = new AtomicBoolean();
        AtomicBoolean socketConnectedCalled = new AtomicBoolean();
        AtomicBoolean socketDisconnectedCalled = new AtomicBoolean();
        AtomicBoolean bytesReadCalled = new AtomicBoolean();
        AtomicBoolean bytesWrittenCalled = new AtomicBoolean();
        AtomicBoolean closeCalled = new AtomicBoolean();
        ConfigurableMetricsFactory.delegate = (vertx, options) -> new DummyVertxMetrics() {

            @Override
            public HttpClientMetrics createMetrics(HttpClient client, HttpClientOptions options) {
                return new DummyVertxMetrics.DummyHttpClientMetrics() {

                    // Websocket-level connect.
                    @Override
                    public Void connected(Void socketMetric, WebSocket webSocket) {
                        websocketConnected.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                        return null;
                    }

                    // Websocket-level disconnect: only recorded, no thread/context check is made here.
                    @Override
                    public void disconnected(Void webSocketMetric) {
                        websocketDisconnected.set(true);
                    }

                    // Socket-level connect.
                    @Override
                    public Void connected(SocketAddress remoteAddress) {
                        socketConnectedCalled.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                        return null;
                    }

                    // Socket-level disconnect.
                    @Override
                    public void disconnected(Void socketMetric, SocketAddress remoteAddress) {
                        socketDisconnectedCalled.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                    }

                    @Override
                    public void bytesRead(Void socketMetric, SocketAddress remoteAddress, long numberOfBytes) {
                        bytesReadCalled.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                    }

                    @Override
                    public void bytesWritten(Void socketMetric, SocketAddress remoteAddress, long numberOfBytes) {
                        bytesWrittenCalled.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                    }

                    @Override
                    public void close() {
                        // Only recorded; no thread/context check is made for close().
                        closeCalled.set(true);
                    }

                    @Override
                    public boolean isEnabled() {
                        return true;
                    }
                };
            }
        };
        Vertx vertx2 = Vertx.vertx(new VertxOptions().setMetricsOptions(new MetricsOptions().setEnabled(true)));
        HttpServer server = vertx2.createHttpServer();
        // The server answers every incoming frame with "bye".
        server.websocketHandler(ws -> ws.handler(buf -> ws.write(Buffer.buffer("bye"))));
        CountDownLatch latch = new CountDownLatch(1);
        server.listen(8080, "localhost", onSuccess(s -> latch.countDown()));
        awaitLatch(latch);
        Context ctx = contextFactory.apply(vertx2);
        ctx.runOnContext(v1 -> {
            // The thread/context creating the client is what the metrics callbacks are checked against.
            expectedThread.set(Thread.currentThread());
            expectedContext.set(Vertx.currentContext());
            HttpClient client = vertx2.createHttpClient();
            checker.accept(expectedThread.get(), expectedContext.get());
            client.websocket(8080, "localhost", "/", ws -> {
                ws.handler(buf -> {
                    // On the server's reply: register the verification, then close the websocket to trigger it.
                    ws.closeHandler(v2 -> executeInVanillaThread(() -> {
                        client.close();
                        vertx2.close(v3 -> {
                            assertTrue(websocketConnected.get());
                            assertTrue(websocketDisconnected.get());
                            assertTrue(socketConnectedCalled.get());
                            assertTrue(socketDisconnectedCalled.get());
                            assertTrue(bytesReadCalled.get());
                            assertTrue(bytesWrittenCalled.get());
                            assertTrue(closeCalled.get());
                            testComplete();
                        });
                    }));
                    ws.close();
                });
                ws.write(Buffer.buffer("hello"));
            });
        });
        await();
    }

    /** Runs the net server scenario with the event-loop context factory and checker. */
    @Test
    public void testNetServerEventLoop() throws Exception {
        testNetServer(eventLoopContextFactory, eventLoopChecker);
    }

    /** Runs the net server scenario with the worker context factory and checker. */
    @Test
    public void testNetServerWorker() throws Exception {
        testNetServer(workerContextFactory, workerChecker);
    }

    /**
     * Creates and binds a TCP server from the context produced by {@code contextFactory},
     * exchanges one message with it through a net client, and verifies that every server-side
     * metrics callback was invoked. Each callback except {@code close()} also runs
     * {@code checker} against the thread and context recorded in the server's listen handler.
     *
     * @param contextFactory produces the context on which the server is created and bound
     * @param checker        validates the calling thread/context from inside the metrics callbacks
     * @throws Exception propagated from {@code awaitLatch}
     */
    private void testNetServer(Function<Vertx, Context> contextFactory, final BiConsumer<Thread, Context> checker) throws Exception {
        // Typed references so that checker.accept(Thread, Context) type-checks without casts.
        AtomicReference<Thread> expectedThread = new AtomicReference<>();
        AtomicReference<Context> expectedContext = new AtomicReference<>();
        AtomicBoolean socketConnectedCalled = new AtomicBoolean();
        AtomicBoolean socketDisconnectedCalled = new AtomicBoolean();
        AtomicBoolean bytesReadCalled = new AtomicBoolean();
        AtomicBoolean bytesWrittenCalled = new AtomicBoolean();
        AtomicBoolean closeCalled = new AtomicBoolean();
        ConfigurableMetricsFactory.delegate = (vertx, options) -> new DummyVertxMetrics() {

            @Override
            public TCPMetrics createMetrics(NetServer server, SocketAddress localAddress, NetServerOptions options) {
                return new DummyVertxMetrics.DummyTCPMetrics() {

                    @Override
                    public Void connected(SocketAddress remoteAddress) {
                        socketConnectedCalled.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                        return null;
                    }

                    @Override
                    public void disconnected(Void socketMetric, SocketAddress remoteAddress) {
                        socketDisconnectedCalled.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                    }

                    @Override
                    public void bytesRead(Void socketMetric, SocketAddress remoteAddress, long numberOfBytes) {
                        bytesReadCalled.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                    }

                    @Override
                    public void bytesWritten(Void socketMetric, SocketAddress remoteAddress, long numberOfBytes) {
                        bytesWrittenCalled.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                    }

                    @Override
                    public boolean isEnabled() {
                        return true;
                    }

                    @Override
                    public void close() {
                        // Only recorded; no thread/context check is made for close().
                        closeCalled.set(true);
                    }
                };
            }
        };
        CountDownLatch latch = new CountDownLatch(1);
        Vertx vertx2 = Vertx.vertx(new VertxOptions().setMetricsOptions(new MetricsOptions().setEnabled(true)));
        Context ctx = contextFactory.apply(vertx2);
        ctx.runOnContext(v1 -> {
            // The server answers every incoming buffer with "bye".
            NetServer server = vertx2.createNetServer().connectHandler(so -> so.handler(buf -> so.write("bye")));
            server.listen(1234, "localhost", onSuccess(s -> {
                // The thread/context of the listen callback is what the metrics callbacks are checked against.
                expectedThread.set(Thread.currentThread());
                expectedContext.set(Vertx.currentContext());
                checker.accept(expectedThread.get(), expectedContext.get());
                latch.countDown();
            }));
        });
        awaitLatch(latch);
        NetClient client = vertx2.createNetClient();
        client.connect(1234, "localhost", onSuccess(so -> {
            so.handler(buf -> {
                // On the server's reply: register the verification, then close the socket to trigger it.
                so.closeHandler(v -> executeInVanillaThread(() -> vertx2.close(v4 -> {
                    assertTrue(bytesReadCalled.get());
                    assertTrue(bytesWrittenCalled.get());
                    assertTrue(socketConnectedCalled.get());
                    assertTrue(socketDisconnectedCalled.get());
                    assertTrue(closeCalled.get());
                    testComplete();
                })));
                so.close();
            });
            so.write("hello");
        }));
        await();
    }

    /** Runs the net client scenario with the event-loop context factory and checker. */
    @Test
    public void testNetClientEventLoop() throws Exception {
        testNetClient(eventLoopContextFactory, eventLoopChecker);
    }

    /** Runs the net client scenario with the worker context factory and checker. */
    @Test
    public void testNetClientWorker() throws Exception {
        testNetClient(workerContextFactory, workerChecker);
    }

    /**
     * Starts a TCP server, then, from the context produced by {@code contextFactory}, creates a
     * net client and exchanges one message with the server. Verifies that every client-side
     * metrics callback was invoked. Each callback except {@code close()} also runs
     * {@code checker} against the thread and context on which the client was created.
     *
     * @param contextFactory produces the context on which the client is created and used
     * @param checker        validates the calling thread/context from inside the metrics callbacks
     * @throws Exception propagated from {@code awaitLatch}
     */
    private void testNetClient(Function<Vertx, Context> contextFactory, final BiConsumer<Thread, Context> checker) throws Exception {
        // Typed references so that checker.accept(Thread, Context) type-checks without casts.
        AtomicReference<Thread> expectedThread = new AtomicReference<>();
        AtomicReference<Context> expectedContext = new AtomicReference<>();
        AtomicBoolean socketConnectedCalled = new AtomicBoolean();
        AtomicBoolean socketDisconnectedCalled = new AtomicBoolean();
        AtomicBoolean bytesReadCalled = new AtomicBoolean();
        AtomicBoolean bytesWrittenCalled = new AtomicBoolean();
        AtomicBoolean closeCalled = new AtomicBoolean();
        ConfigurableMetricsFactory.delegate = (vertx, options) -> new DummyVertxMetrics() {

            @Override
            public TCPMetrics createMetrics(NetClient client, NetClientOptions options) {
                return new DummyVertxMetrics.DummyTCPMetrics() {

                    @Override
                    public Void connected(SocketAddress remoteAddress) {
                        socketConnectedCalled.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                        return null;
                    }

                    @Override
                    public void disconnected(Void socketMetric, SocketAddress remoteAddress) {
                        socketDisconnectedCalled.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                    }

                    @Override
                    public void bytesRead(Void socketMetric, SocketAddress remoteAddress, long numberOfBytes) {
                        bytesReadCalled.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                    }

                    @Override
                    public void bytesWritten(Void socketMetric, SocketAddress remoteAddress, long numberOfBytes) {
                        bytesWrittenCalled.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                    }

                    @Override
                    public boolean isEnabled() {
                        return true;
                    }

                    @Override
                    public void close() {
                        // Only recorded; no thread/context check is made for close().
                        closeCalled.set(true);
                    }
                };
            }
        };
        CountDownLatch latch = new CountDownLatch(1);
        Vertx vertx2 = Vertx.vertx(new VertxOptions().setMetricsOptions(new MetricsOptions().setEnabled(true)));
        Context ctx = contextFactory.apply(vertx2);
        // The server answers every incoming buffer with "bye".
        NetServer server = vertx2.createNetServer().connectHandler(so -> so.handler(buf -> so.write("bye")));
        server.listen(1234, "localhost", onSuccess(s -> latch.countDown()));
        awaitLatch(latch);
        ctx.runOnContext(v1 -> {
            NetClient client = vertx2.createNetClient();
            // The thread/context creating the client is what the metrics callbacks are checked against.
            expectedThread.set(Thread.currentThread());
            expectedContext.set(Vertx.currentContext());
            client.connect(1234, "localhost", onSuccess(so -> {
                so.handler(buf -> {
                    so.closeHandler(v -> {
                        // Socket-level callbacks are verified as soon as the socket has closed ...
                        assertTrue(bytesReadCalled.get());
                        assertTrue(bytesWrittenCalled.get());
                        assertTrue(socketConnectedCalled.get());
                        assertTrue(socketDisconnectedCalled.get());
                        // ... while close() is verified only after client and Vert.x have been shut down.
                        executeInVanillaThread(() -> {
                            client.close();
                            vertx2.close(v4 -> {
                                assertTrue(closeCalled.get());
                                testComplete();
                            });
                        });
                    });
                    so.close();
                });
                so.write("hello");
            }));
        });
        await();
    }

    /**
     * Runs the shared datagram metrics scenario with the event-loop context factory
     * and the event-loop checker.
     *
     * @throws Exception declared on the test signature; the body itself only delegates
     */
    @Test
    public void testDatagramEventLoop() throws Exception {
        final Function<Vertx, Context> contexts = eventLoopContextFactory;
        final BiConsumer<Thread, Context> verifier = eventLoopChecker;
        testDatagram(contexts, verifier);
    }

    /**
     * Runs the shared datagram metrics scenario with the worker context factory
     * and the worker checker.
     *
     * @throws Exception declared on the test signature; the body itself only delegates
     */
    @Test
    public void testDatagramWorker() throws Exception {
        final Function<Vertx, Context> contexts = workerContextFactory;
        final BiConsumer<Thread, Context> verifier = workerChecker;
        testDatagram(contexts, verifier);
    }

    /**
     * Shared scenario behind the datagram metrics tests.
     *
     * <p>Installs a metrics delegate whose datagram socket metrics flag each callback that fires and,
     * for {@code listening}, {@code bytesRead} and {@code bytesWritten}, pass the thread and context
     * captured by the scenario to {@code checker}. A datagram socket is then created on a context
     * obtained from {@code contextFactory}, bound to localhost:1234, and sent a packet addressed to
     * itself. When the packet arrives the recorded flags are asserted and the socket is closed from a
     * plain thread, after which the {@code close} metric flag is asserted as well.
     *
     * @param contextFactory produces the context on which the socket is created and used
     * @param checker receives the thread and context captured when the scenario started on that context
     */
    private void testDatagram(Function<Vertx, Context> contextFactory, final BiConsumer<Thread, Context> checker) {
        // Parameterised references so the values can be handed to the checker without casts.
        final AtomicReference<Thread> expectedThread = new AtomicReference<>();
        final AtomicReference<Context> expectedContext = new AtomicReference<>();
        final AtomicBoolean listening = new AtomicBoolean();
        final AtomicBoolean bytesReadCalled = new AtomicBoolean();
        final AtomicBoolean bytesWrittenCalled = new AtomicBoolean();
        final AtomicBoolean closeCalled = new AtomicBoolean();
        ConfigurableMetricsFactory.delegate = (vertx, options) -> new DummyVertxMetrics(){

            public DatagramSocketMetrics createMetrics(DatagramSocket socket, DatagramSocketOptions options) {
                return new DummyVertxMetrics.DummyDatagramMetrics(){

                    public void listening(SocketAddress localAddress) {
                        listening.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                    }

                    public void bytesRead(Void socketMetric, SocketAddress remoteAddress, long numberOfBytes) {
                        bytesReadCalled.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                    }

                    public void bytesWritten(Void socketMetric, SocketAddress remoteAddress, long numberOfBytes) {
                        bytesWrittenCalled.set(true);
                        checker.accept(expectedThread.get(), expectedContext.get());
                    }

                    public void close() {
                        // Only the flag is recorded here; the checker is not consulted for close.
                        closeCalled.set(true);
                    }

                    public boolean isEnabled() {
                        return true;
                    }
                };
            }
        };
        Vertx vertx2 = Vertx.vertx(new VertxOptions().setMetricsOptions(new MetricsOptions().setEnabled(true)));
        Context ctx = contextFactory.apply(vertx2);
        ctx.runOnContext(v1 -> {
            // Capture the thread/context the scenario runs on before any metric callback can fire.
            expectedThread.set(Thread.currentThread());
            expectedContext.set(Vertx.currentContext());
            DatagramSocket socket = vertx2.createDatagramSocket();
            socket.listen(1234, "localhost", ar1 -> {
                this.assertTrue(ar1.succeeded());
                checker.accept(expectedThread.get(), expectedContext.get());
                socket.handler(packet -> {
                    this.assertTrue(listening.get());
                    this.assertTrue(bytesReadCalled.get());
                    this.assertTrue(bytesWrittenCalled.get());
                    // Close from a plain thread rather than from the socket's own context.
                    this.executeInVanillaThread(() -> socket.close(ar2 -> {
                        this.assertTrue(closeCalled.get());
                        this.assertTrue(ar2.succeeded());
                        this.testComplete();
                    }));
                });
                // The socket sends to its own listening address so that both read and write metrics fire.
                socket.send(Buffer.buffer("msg"), 1234, "localhost", sent -> this.assertTrue(sent.succeeded()));
            });
        });
        this.await();
    }

    /**
     * Checks that the event bus metrics object receives {@code close()} when the owning
     * Vert.x instance is closed.
     *
     * <p>The instance is created with metrics enabled, its event bus is obtained once, and the
     * instance is then closed from a plain thread; the close callback asserts the flag set by the
     * metrics object.
     */
    @Test
    public void testEventBusLifecycle() {
        final AtomicBoolean metricsClosed = new AtomicBoolean();
        ConfigurableMetricsFactory.delegate = (vertx, options) -> new DummyVertxMetrics(){

            public EventBusMetrics createMetrics(EventBus eventBus) {
                return new DummyVertxMetrics.DummyEventBusMetrics(){

                    public void close() {
                        metricsClosed.set(true);
                    }

                    public boolean isEnabled() {
                        return true;
                    }
                };
            }
        };
        MetricsOptions metricsOptions = new MetricsOptions().setEnabled(true);
        VertxOptions vertxOptions = new VertxOptions().setMetricsOptions(metricsOptions);
        Vertx instance = Vertx.vertx(vertxOptions);
        instance.eventBus();
        executeInVanillaThread(() -> {
            instance.close(onSuccess(ignored -> {
                assertTrue(metricsClosed.get());
                testComplete();
            }));
        });
        await();
    }

    /**
     * Runs the shared message-handler metrics scenario, invoking the setup handler directly
     * on the calling thread instead of scheduling it on a context.
     */
    @Test
    public void testMessageHandler() {
        final BiConsumer<Vertx, Handler<Void>> runDirectly = (instance, action) -> action.handle(null);
        testMessageHandler(runDirectly, eventLoopChecker);
    }

    /**
     * Runs the shared message-handler metrics scenario, scheduling the setup handler on a
     * context obtained from the event-loop context factory.
     */
    @Test
    public void testMessageHandlerEventLoop() {
        final BiConsumer<Vertx, Handler<Void>> runOnEventLoop = (instance, action) -> {
            Context eventLoopContext = eventLoopContextFactory.apply(instance);
            eventLoopContext.runOnContext(action);
        };
        testMessageHandler(runOnEventLoop, eventLoopChecker);
    }

    /**
     * Shared scenario behind the message-handler metrics tests.
     *
     * <p>Installs a metrics delegate whose event bus metrics flag handler registration,
     * unregistration and the begin/end of message handling. {@code beginHandleMessage} captures the
     * current thread and context; {@code endHandleMessage} and the message handler itself pass those
     * captured values to {@code checker}. A consumer is registered on {@code "the_address"} via
     * {@code runOnContext}, a message is sent to it once registration completes, and the consumer is
     * then unregistered from a context reached through a plain thread, where the recorded flags are
     * asserted.
     *
     * @param runOnContext strategy used to execute the consumer setup for the given Vert.x instance
     * @param checker receives the thread and context captured in {@code beginHandleMessage}
     */
    private void testMessageHandler(BiConsumer<Vertx, Handler<Void>> runOnContext, final BiConsumer<Thread, Context> checker) {
        // Parameterised references so the values can be handed to the checker without casts.
        final AtomicReference<Thread> consumerThread = new AtomicReference<>();
        final AtomicReference<Context> consumerContext = new AtomicReference<>();
        final AtomicBoolean registeredCalled = new AtomicBoolean();
        final AtomicBoolean unregisteredCalled = new AtomicBoolean();
        final AtomicBoolean beginHandleCalled = new AtomicBoolean();
        final AtomicBoolean endHandleCalled = new AtomicBoolean();
        ConfigurableMetricsFactory.delegate = (vertx, options) -> new DummyVertxMetrics(){

            public EventBusMetrics createMetrics(EventBus eventBus) {
                return new DummyVertxMetrics.DummyEventBusMetrics(){

                    public boolean isEnabled() {
                        return true;
                    }

                    public Void handlerRegistered(String address, boolean replyHandler) {
                        registeredCalled.set(true);
                        return null;
                    }

                    public void handlerUnregistered(Void handler) {
                        unregisteredCalled.set(true);
                    }

                    public void beginHandleMessage(Void handler, boolean local) {
                        // Record where message handling starts; later callbacks are checked against this.
                        consumerThread.set(Thread.currentThread());
                        consumerContext.set(Vertx.currentContext());
                        beginHandleCalled.set(true);
                    }

                    public void endHandleMessage(Void handler, Throwable failure) {
                        endHandleCalled.set(true);
                        checker.accept(consumerThread.get(), consumerContext.get());
                    }
                };
            }
        };
        Vertx vertx2 = Vertx.vertx(new VertxOptions().setMetricsOptions(new MetricsOptions().setEnabled(true)));
        EventBus eb = vertx2.eventBus();
        runOnContext.accept(vertx2, v -> {
            MessageConsumer<Object> consumer = eb.consumer("the_address");
            consumer.handler(msg -> {
                checker.accept(consumerThread.get(), consumerContext.get());
                // Unregister from a context obtained on a plain thread, not from the handler's own call stack.
                this.executeInVanillaThread(() -> vertx2.getOrCreateContext().runOnContext(v2 -> consumer.unregister(this.onSuccess(v3 -> {
                    this.assertTrue(registeredCalled.get());
                    this.assertTrue(beginHandleCalled.get());
                    this.assertTrue(endHandleCalled.get());
                    // The unregistered flag is polled rather than asserted immediately.
                    this.waitUntil(() -> unregisteredCalled.get());
                    this.testComplete();
                }))));
            }).completionHandler(this.onSuccess(v2 -> eb.send("the_address", "the_msg")));
        });
        this.await();
    }

    /**
     * Runs the shared deployment metrics scenario for a non-worker verticle,
     * using the event-loop checker.
     */
    @Test
    public void testDeployEventLoop() {
        final boolean worker = false;
        final boolean multiThreaded = false;
        testDeploy(worker, multiThreaded, eventLoopChecker);
    }

    /**
     * Runs the shared deployment metrics scenario for a worker verticle that is not
     * multi-threaded, using the worker checker.
     */
    @Test
    public void testDeployWorker() {
        final boolean worker = true;
        final boolean multiThreaded = false;
        testDeploy(worker, multiThreaded, workerChecker);
    }

    /**
     * Runs the shared deployment metrics scenario for a multi-threaded worker verticle,
     * using the worker checker.
     */
    @Test
    public void testDeployMultiThreadedWorker() {
        final boolean worker = true;
        final boolean multiThreaded = true;
        testDeploy(worker, multiThreaded, workerChecker);
    }

    /**
     * Shared scenario behind the deployment metrics tests.
     *
     * <p>Installs a metrics delegate that flags {@code verticleDeployed} and
     * {@code verticleUndeployed} and, in both callbacks, passes the thread and context captured in
     * the verticle's {@code start()} to {@code checker}. A verticle is deployed with the requested
     * worker settings, then undeployed; once the undeploy completes the result and both flags are
     * asserted.
     *
     * @param worker value passed to {@code DeploymentOptions.setWorker}
     * @param multiThreaded value passed to {@code DeploymentOptions.setMultiThreaded}
     * @param checker receives the thread and context captured in the verticle's {@code start()}
     */
    private void testDeploy(boolean worker, boolean multiThreaded, final BiConsumer<Thread, Context> checker) {
        // Parameterised references so the values can be handed to the checker without casts.
        final AtomicReference<Thread> verticleThread = new AtomicReference<>();
        final AtomicReference<Context> verticleContext = new AtomicReference<>();
        final AtomicBoolean deployedCalled = new AtomicBoolean();
        final AtomicBoolean undeployedCalled = new AtomicBoolean();
        ConfigurableMetricsFactory.delegate = (vertx, options) -> new DummyVertxMetrics(){

            public void verticleDeployed(Verticle verticle) {
                deployedCalled.set(true);
                checker.accept(verticleThread.get(), verticleContext.get());
            }

            public void verticleUndeployed(Verticle verticle) {
                undeployedCalled.set(true);
                checker.accept(verticleThread.get(), verticleContext.get());
            }
        };
        Vertx vertx2 = Vertx.vertx(new VertxOptions().setMetricsOptions(new MetricsOptions().setEnabled(true)));
        vertx2.deployVerticle(new AbstractVerticle(){

            public void start() throws Exception {
                // Record where the verticle starts; the metric callbacks are checked against this.
                verticleThread.set(Thread.currentThread());
                verticleContext.set(Vertx.currentContext());
            }
        }, new DeploymentOptions().setWorker(worker).setMultiThreaded(multiThreaded), ar1 -> {
            this.assertTrue(ar1.succeeded());
            vertx2.undeploy(ar1.result(), ar2 -> {
                // Check the undeploy outcome itself (previously the deploy result was re-asserted here).
                this.assertTrue(ar2.succeeded());
                this.assertTrue(deployedCalled.get());
                this.assertTrue(undeployedCalled.get());
                this.testComplete();
            });
        });
        this.await();
    }

    /**
     * Starts a freshly constructed {@link Thread} that runs the given task.
     *
     * @param task the work to run on the new thread
     */
    private void executeInVanillaThread(Runnable task) {
        Thread plainThread = new Thread(task);
        plainThread.start();
    }
}

