/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.tests.net;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.concurrent.GenericFutureListener;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.internal.buffer.BufferInternal;
import io.vertx.core.internal.net.NetSocketInternal;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.impl.VertxConnection;
import io.vertx.core.net.impl.VertxHandler;
import io.vertx.core.transport.Transport;
import io.vertx.test.core.TestUtils;
import io.vertx.test.core.VertxTestBase;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import org.junit.Assume;
import org.junit.Ignore;
import org.junit.Test;

public class VertxConnectionTest
extends VertxTestBase {
    private NetClient client;
    private NetServer server;
    private volatile Handler<NetSocketInternal> connectHandler;

    /**
     * Creates the client and server used by the tests and starts the server on {@code localhost:1234}.
     * <p>
     * Every accepted socket is handed to the handler currently stored in {@code connectHandler};
     * when no handler is installed the socket is closed immediately.
     *
     * @throws Exception propagated from {@code super.before()} or from awaiting the listen future
     */
    @Override
    public void before() throws Exception {
        super.before();
        this.client = this.vertx.createNetClient();
        this.server = this.vertx.createNetServer().connectHandler(so -> {
            // Read the volatile field once so the null check and the call use the same handler.
            Handler<NetSocketInternal> handler = this.connectHandler;
            if (handler == null) {
                so.close();
                return;
            }
            handler.handle((NetSocketInternal) so);
        });
        this.awaitFuture(this.server.listen(1234, "localhost"));
    }

    /**
     * Runs the parent teardown, then drops the references to the client and server
     * that {@code before()} assigned.
     *
     * @throws Exception propagated from {@code super.tearDown()}
     */
    @Override
    protected void tearDown() throws Exception {
        super.tearDown();
        this.server = null;
        this.client = null;
    }

    /**
     * Verifies that a message written from a helper thread while the pipeline is still inside its
     * first flush is observed by the time of the second flush.
     * <p>
     * A handler inserted before {@code "handler"} records every {@code String} message. The helper
     * thread writes {@code "msg-1"}, waits until the first flush has seen it, writes {@code "msg-2"}
     * and only then lets the first flush return. The second flush must see both messages.
     */
    @Test
    public void testQueueMessagesMissMessage() throws Exception {
        this.disableThreadChecks();
        final CountDownLatch firstFlushSeen = new CountDownLatch(1);
        final CountDownLatch secondMessageWritten = new CountDownLatch(1);
        this.connectHandler = conn -> {
            ChannelPipeline pipeline = conn.channelHandlerContext().pipeline();
            final List<String> received = new ArrayList<>();
            pipeline.addBefore("handler", "myhandler", new ChannelDuplexHandler() {
                int flushCount;

                @Override
                public void write(ChannelHandlerContext chctx, Object msg, ChannelPromise promise) throws Exception {
                    if (msg instanceof String) {
                        // String messages are recorded here and not forwarded further.
                        received.add((String) msg);
                    } else {
                        super.write(chctx, msg, promise);
                    }
                }

                @Override
                public void flush(ChannelHandlerContext chctx) throws Exception {
                    super.flush(chctx);
                    switch (++this.flushCount) {
                        case 1:
                            VertxConnectionTest.this.assertEquals(List.of("msg-1"), received);
                            firstFlushSeen.countDown();
                            // Hold this flush until the helper thread has written the second message.
                            VertxConnectionTest.this.awaitLatch(secondMessageWritten);
                            break;
                        case 2:
                            VertxConnectionTest.this.assertEquals(List.of("msg-1", "msg-2"), received);
                            VertxConnectionTest.this.testComplete();
                            break;
                        default:
                            // Later flushes are not part of the scenario.
                            break;
                    }
                }
            });
            this.executeAsyncTask(() -> {
                conn.writeMessage("msg-1");
                try {
                    this.awaitLatch(firstFlushSeen);
                }
                catch (InterruptedException e) {
                    this.fail(e);
                }
                conn.writeMessage("msg-2");
                secondMessageWritten.countDown();
            });
        };
        this.awaitFuture(this.client.connect(1234, "localhost"));
        this.await();
    }

    /**
     * Verifies the ordering of a message that is written from inside the pipeline's write callback.
     * <p>
     * A helper thread writes {@code "msg1"} and {@code "msg2"}. While the recording handler processes
     * {@code "msg1"} it writes {@code "msg3"} on the same connection. The handler must see the
     * messages in the order {@code msg1, msg2, msg3}.
     */
    @Test
    public void testQueueMessageFromInnerWrite() throws Exception {
        this.connectHandler = conn -> {
            ChannelPipeline pipeline = conn.channelHandlerContext().pipeline();
            final List<String> order = new ArrayList<>();
            pipeline.addBefore("handler", "myhandler", new ChannelDuplexHandler() {
                @Override
                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                    if (!(msg instanceof String)) {
                        super.write(ctx, msg, promise);
                        return;
                    }
                    String s = (String) msg;
                    order.add(s);
                    if ("msg1".equals(s)) {
                        // Nested write issued while the first message is still being handled.
                        conn.writeMessage("msg3");
                    }
                    if (order.size() == 3) {
                        VertxConnectionTest.this.vertx.runOnContext(v -> {
                            VertxConnectionTest.this.assertEquals(Arrays.asList("msg1", "msg2", "msg3"), order);
                            VertxConnectionTest.this.testComplete();
                        });
                    }
                }
            });
            this.executeAsyncTaskAndAwait(() -> {
                conn.writeMessage("msg1");
                conn.writeMessage("msg2");
            });
        };
        this.awaitFuture(this.client.connect(1234, "localhost"));
        this.await();
    }

    /**
     * Verifies that a connection flush requested from inside the pipeline's write callback reaches
     * the pipeline only after the messages that were already queued.
     * <p>
     * A helper thread writes {@code "msg1"} and {@code "msg2"}. While the recording handler processes
     * {@code "msg1"} it calls {@code flush()} on the connection. The handler must observe
     * {@code msg1, msg2, flush} in that order.
     */
    @Test
    public void testQueueFlushFromEventLoop() throws Exception {
        this.connectHandler = conn -> {
            ChannelPipeline pipeline = conn.channelHandlerContext().pipeline();
            final List<String> order = new ArrayList<>();
            Runnable checkOrder = () -> this.vertx.runOnContext(v -> {
                this.assertEquals(Arrays.asList("msg1", "msg2", "flush"), order);
                this.testComplete();
            });
            pipeline.addBefore("handler", "myhandler", new ChannelDuplexHandler() {
                int flushes;

                @Override
                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                    if (msg instanceof String) {
                        String s = (String) msg;
                        order.add(s);
                        if ("msg1".equals(s)) {
                            ((VertxConnection) conn).flush();
                        }
                    } else {
                        super.write(ctx, msg, promise);
                    }
                }

                @Override
                public void flush(ChannelHandlerContext ctx) throws Exception {
                    // Only the first flush is recorded and checked; later ones just pass through.
                    if (this.flushes++ < 1) {
                        order.add("flush");
                        checkOrder.run();
                    }
                    super.flush(ctx);
                }
            });
            CountDownLatch latch = new CountDownLatch(1);
            this.executeAsyncTaskAndAwait(() -> {
                conn.writeMessage("msg1");
                conn.writeMessage("msg2");
                latch.countDown();
            });
            try {
                latch.await(20L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        };
        this.awaitFuture(this.client.connect(1234, "localhost"));
        this.await();
    }

    /**
     * Writes 16 KiB chunks to the server-side connection until {@code writeToChannel} returns
     * {@code false}, then writes an empty buffer with a promise and completes the test once that
     * write succeeds.
     * <p>
     * The client socket is paused on connect and resumed only after the server has issued all of
     * its writes.
     */
    @Test
    public void testOverflowDrain() throws Exception {
        BufferInternal chunk = BufferInternal.buffer(TestUtils.randomAlphaString(16384));
        CompletableFuture<Void> drain = new CompletableFuture<>();
        this.connectHandler = so -> {
            ContextInternal ctx = (ContextInternal) this.vertx.getOrCreateContext();
            VertxConnection conn = (VertxConnection) so;
            // Keep writing until the connection reports it will not accept more.
            while (conn.writeToChannel(chunk.getByteBuf())) {
            }
            Future.<Void>future(promise -> conn.writeToChannel(Unpooled.EMPTY_BUFFER, promise))
                .onComplete(this.onSuccess(v1 -> ctx.emit(v2 -> this.testComplete())));
            drain.complete(null);
        };
        this.client.connect(1234, "localhost").onComplete(this.onSuccess(so -> {
            so.pause();
            drain.whenComplete((v, e) -> so.resume());
        }));
        this.await();
    }

    /**
     * Repeatedly writes {@code buffer} to {@code so} until the socket has gone one second without
     * a drain event, then invokes {@code cont}.
     * <p>
     * Each round arms a one-second timer, writes until {@code writeToChannel} returns {@code false}
     * and installs a drain handler. If the drain handler fires while the timer can still be
     * cancelled, another round starts; if the timer could not be cancelled the test fails. When the
     * timer fires first, one more write loop runs and {@code cont} is called with {@code null}.
     *
     * @param so     the socket to write to; cast to {@code VertxConnection} for each write
     * @param buffer the payload written on every iteration
     * @param cont   continuation invoked once the timer fires
     */
    private void fill(NetSocketInternal so, BufferInternal buffer, Handler<Void> cont) {
        Runnable saturate = () -> {
            boolean accepted;
            do {
                accepted = ((VertxConnection) so).writeToChannel(buffer.getByteBuf());
            } while (accepted);
        };
        long timerId = this.vertx.setTimer(1000L, firedId -> {
            saturate.run();
            cont.handle(null);
        });
        saturate.run();
        so.drainHandler(v -> {
            boolean cancelled = this.vertx.cancelTimer(timerId);
            if (!cancelled) {
                this.fail();
                return;
            }
            this.fill(so, buffer, cont);
        });
    }

    /**
     * After filling the server-side socket via {@code fill}, writes one more buffer and expects
     * that write to fail.
     * <p>
     * The client socket is paused, and it is closed once the server has issued the extra write.
     * The test is skipped when {@code TRANSPORT} is {@code Transport.IO_URING}.
     */
    @Test
    public void testFailedQueueMessages() throws Exception {
        Assume.assumeFalse(TRANSPORT == Transport.IO_URING);
        BufferInternal buffer = BufferInternal.buffer(TestUtils.randomAlphaString(16384));
        CompletableFuture<Void> latch = new CompletableFuture<>();
        this.connectHandler = conn -> this.fill(conn, buffer, v1 -> {
            Future<Void> fut = conn.write(buffer);
            fut.onComplete(this.onFailure(v2 -> this.testComplete()));
            latch.complete(null);
        });
        NetSocket so = this.awaitFuture(this.client.connect(1234, "localhost"));
        so.pause();
        latch.whenComplete((v, err) -> so.close());
        this.await();
    }

    /**
     * Verifies that the pipeline's write callback is not re-entered while it is already running.
     * <p>
     * A helper thread writes {@code "msg1"} and {@code "msg2"}. While handling {@code "msg1"} the
     * handler writes enough bytes to make the channel unwritable and then flushes, after which the
     * channel is asserted to be writable again. A counter asserts that no write callback starts
     * while another is in progress; the test completes when {@code "msg2"} is handled.
     */
    @Test
    public void testDrainReentrancy() throws Exception {
        this.connectHandler = so -> {
            ChannelHandlerContext chctx = so.channelHandlerContext();
            chctx.pipeline().addBefore("handler", "myhandler", new ChannelDuplexHandler() {
                int reentrant;

                @Override
                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
                    VertxConnectionTest.this.assertEquals(0L, this.reentrant++);
                    try {
                        switch (msg.toString()) {
                            case "msg1":
                                VertxConnectionTest.this.assertTrue(ctx.channel().isWritable());
                                int size = (int) ctx.channel().bytesBeforeUnwritable();
                                ctx.write(BufferInternal.buffer(TestUtils.randomAlphaString(size)).getByteBuf());
                                VertxConnectionTest.this.assertFalse(ctx.channel().isWritable());
                                ctx.flush();
                                VertxConnectionTest.this.assertTrue(ctx.channel().isWritable());
                                break;
                            case "msg2":
                                VertxConnectionTest.this.testComplete();
                                break;
                            default:
                                break;
                        }
                    }
                    finally {
                        --this.reentrant;
                    }
                }
            });
            VertxConnection conn = (VertxConnection) so;
            CountDownLatch latch = new CountDownLatch(1);
            this.executeAsyncTask(() -> {
                conn.writeToChannel("msg1");
                conn.writeToChannel("msg2");
                latch.countDown();
            });
            try {
                this.awaitLatch(latch);
            }
            catch (InterruptedException e) {
                this.fail(e);
            }
        };
        this.awaitFuture(this.client.connect(1234, "localhost"));
        this.await();
    }

    /**
     * Verifies that two messages written from a helper thread result in a single pipeline flush.
     * <p>
     * The counting handler swallows every write and flush. When a flush arrives after exactly two
     * writes, the flush count must be one.
     */
    @Test
    public void testConsolidateFlushInDrain() throws Exception {
        this.connectHandler = conn -> {
            ChannelPipeline pipeline = conn.channelHandlerContext().pipeline();
            pipeline.addBefore("handler", "myhandler", new ChannelDuplexHandler() {
                int writes;
                int flushes;

                @Override
                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
                    this.writes++;
                }

                @Override
                public void flush(ChannelHandlerContext ctx) throws Exception {
                    this.flushes++;
                    if (this.writes != 2) {
                        return;
                    }
                    VertxConnectionTest.this.assertEquals(1L, this.flushes);
                    VertxConnectionTest.this.testComplete();
                }
            });
            CountDownLatch written = new CountDownLatch(1);
            this.executeAsyncTaskAndAwait(() -> {
                conn.writeMessage("msg1");
                conn.writeMessage("msg2");
                written.countDown();
            });
            try {
                written.await(20L, TimeUnit.SECONDS);
            }
            catch (InterruptedException interrupted) {
                throw new RuntimeException(interrupted);
            }
        };
        this.awaitFuture(this.client.connect(1234, "localhost"));
        this.await();
    }

    /**
     * Verifies that outbound messages produced while a paused connection is resumed from inside a
     * write are sent to the client with a single flush.
     * <p>
     * On {@code "inbound-1"} the server pauses the connection, injects {@code "inbound-2"} into the
     * pipeline and writes {@code "outbound-1"} from a new thread. The pipeline handler resumes the
     * connection while writing {@code "outbound-1"}; handling {@code "inbound-2"} then writes
     * {@code "outbound-2"}. Each flush appends a {@code "flush"} marker and closes the channel once
     * that marker has been written. The client must receive {@code "outbound-1outbound-2flush"}.
     */
    @Test
    public void testConsolidateFlushInDrainWhenResume() throws Exception {
        this.connectHandler = conn -> {
            ChannelPipeline pipeline = conn.channelHandlerContext().pipeline();
            pipeline.addBefore("handler", "myhandler", new ChannelDuplexHandler() {
                @Override
                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
                    if ("outbound-1".equals(((ByteBuf) msg).toString(StandardCharsets.UTF_8))) {
                        // Resume reading while the first outbound message is still being written.
                        conn.resume();
                    }
                    ctx.write(msg);
                }

                @Override
                public void flush(ChannelHandlerContext ctx) {
                    ctx.write(Unpooled.copiedBuffer("flush", StandardCharsets.UTF_8))
                        .addListener((ChannelFutureListener) future -> conn.channelHandlerContext().close());
                    ctx.flush();
                }
            });
            conn.messageHandler(msg -> {
                switch (((ByteBuf) msg).toString(StandardCharsets.UTF_8)) {
                    case "inbound-1":
                        conn.pause();
                        this.vertx.runOnContext(v -> {
                            pipeline.fireChannelRead(Unpooled.copiedBuffer("inbound-2", StandardCharsets.UTF_8));
                            pipeline.fireChannelReadComplete();
                            new Thread(() -> conn.writeMessage(Unpooled.copiedBuffer("outbound-1", StandardCharsets.UTF_8))).start();
                        });
                        break;
                    case "inbound-2":
                        conn.writeMessage(Unpooled.copiedBuffer("outbound-2", StandardCharsets.UTF_8));
                        break;
                    default:
                        break;
                }
            });
        };
        NetSocket so = this.awaitFuture(this.client.connect(1234, "localhost"));
        Buffer received = Buffer.buffer();
        so.handler(received::appendBuffer);
        so.closeHandler(v -> {
            this.assertEquals("outbound-1outbound-2flush", received.toString());
            this.testComplete();
        });
        so.write("inbound-1").await();
        this.await();
    }

    /**
     * Verifies that the message handler is not re-entered when a second inbound message is fired
     * into the pipeline from inside the handler.
     * <p>
     * Handling {@code "inbound-1"} fires {@code "inbound-2"} into the pipeline; handling
     * {@code "inbound-2"} ends the connection with {@code "outbound-1"}. A counter asserts that no
     * handler invocation starts while another is running, and the client must receive exactly
     * {@code "outbound-1"} before the socket closes.
     */
    @Test
    public void testReentrantRead() throws Exception {
        this.connectHandler = conn -> {
            ChannelPipeline pipeline = conn.channelHandlerContext().pipeline();
            AtomicInteger reentrant = new AtomicInteger();
            conn.messageHandler(msg -> {
                this.assertEquals(0L, reentrant.getAndIncrement());
                switch (((ByteBuf) msg).toString(StandardCharsets.UTF_8)) {
                    case "inbound-1":
                        pipeline.fireChannelRead(Unpooled.copiedBuffer("inbound-2", StandardCharsets.UTF_8));
                        break;
                    case "inbound-2":
                        conn.end(Buffer.buffer("outbound-1"));
                        break;
                    default:
                        break;
                }
                reentrant.decrementAndGet();
            });
        };
        NetSocket so = this.awaitFuture(this.client.connect(1234, "localhost"));
        Buffer received = Buffer.buffer();
        so.handler(received::appendBuffer);
        so.closeHandler(v -> {
            this.assertEquals("outbound-1", received.toString());
            this.testComplete();
        });
        so.write("inbound-1").await();
        this.await();
    }

    /**
     * Verifies that a message fired into the pipeline while the connection is paused inside the
     * message handler is still delivered after the connection is resumed.
     * <p>
     * Handling {@code "inbound-1"} pauses the connection, fires {@code "inbound-2"} into the
     * pipeline and resumes. Handling {@code "inbound-2"} ends the connection with
     * {@code "outbound-1"}, which is the only data the client must have received when its socket
     * closes.
     */
    @Test
    public void testResumeWhenRead() throws Exception {
        this.connectHandler = conn -> {
            ChannelPipeline pipeline = conn.channelHandlerContext().pipeline();
            conn.messageHandler(msg -> {
                switch (((ByteBuf) msg).toString(StandardCharsets.UTF_8)) {
                    case "inbound-1":
                        conn.pause();
                        pipeline.fireChannelRead(Unpooled.copiedBuffer("inbound-2", StandardCharsets.UTF_8));
                        conn.resume();
                        break;
                    case "inbound-2":
                        conn.end(Buffer.buffer("outbound-1"));
                        break;
                    default:
                        break;
                }
            });
        };
        NetSocket so = this.awaitFuture(this.client.connect(1234, "localhost"));
        Buffer received = Buffer.buffer();
        so.handler(received::appendBuffer);
        so.closeHandler(v -> {
            this.assertEquals("outbound-1", received.toString());
            this.testComplete();
        });
        so.write("inbound-1").await();
        this.await();
    }

    /**
     * Verifies that the server-side drain handler fires once a paused client resumes reading.
     * <p>
     * The client pauses and sends {@code "ping"}. On receiving it the server fills its socket via
     * {@code fill}; when the fill continuation runs, the server releases the latch and registers a
     * drain handler that completes the test. The client resumes only after the latch is released.
     */
    @Test
    public void testWriteQueueDrain() throws Exception {
        BufferInternal payload = BufferInternal.buffer(TestUtils.randomAlphaString(1024));
        CountDownLatch filled = new CountDownLatch(1);
        this.connectHandler = conn -> conn.handler(ping -> {
            Handler<Void> afterFill = v1 -> {
                filled.countDown();
                conn.drainHandler(v2 -> this.testComplete());
            };
            this.fill(conn, payload, afterFill);
        });
        NetSocket so = (NetSocket) this.awaitFuture(this.client.connect(1234, "localhost"));
        so.pause();
        so.write("ping");
        this.awaitLatch(filled);
        so.resume();
        this.await();
    }

    /**
     * Checks where listeners run when they are added, from a plain thread, to an already completed
     * write.
     * <p>
     * After the write future succeeds, a new thread adds one listener to the Vert.x future and one
     * to the Netty channel promise. The Vert.x listener must run on the thread that added it; the
     * Netty listener must run on the channel's event loop. Both completions are required.
     */
    @Test
    public void testChannelPromisePiggiesBackOnEventLoop() throws Exception {
        this.waitFor(2);
        this.disableThreadChecks();
        this.connectHandler = conn -> {
            Promise<Void> promise = Promise.promise();
            ChannelPromise channelPromise = ((VertxConnection) conn).write(Unpooled.copiedBuffer("outbound-1", StandardCharsets.UTF_8), false, promise);
            Future<Void> future = promise.future();
            future.onComplete(this.onSuccess(v -> new Thread(() -> {
                Thread curr = Thread.currentThread();
                future.onComplete(ar -> {
                    this.assertSame(curr, Thread.currentThread());
                    this.complete();
                });
                channelPromise.addListener((ChannelFutureListener) f -> {
                    this.assertTrue(channelPromise.channel().eventLoop().inEventLoop());
                    this.complete();
                });
            }).start()));
        };
        this.awaitFuture(this.client.connect(1234, "localhost"));
        this.await();
    }

    /**
     * Runs {@code runnable} on a newly started thread. Must be called from an event-loop thread,
     * which is asserted on entry.
     *
     * @param runnable the task to run off the event loop
     * @return a latch that is counted down when the task has finished, whether it returned
     *         normally or threw
     */
    private CountDownLatch executeAsyncTask(Runnable runnable) {
        this.assertTrue(Context.isOnEventLoopThread());
        CountDownLatch latch = new CountDownLatch(1);
        new Thread(() -> {
            try {
                runnable.run();
            }
            finally {
                // Release waiters even when the task throws, so they do not sit out their full timeout.
                latch.countDown();
            }
        }).start();
        return latch;
    }

    /**
     * Starts {@code runnable} through {@code executeAsyncTask} and blocks the calling thread for up
     * to 20 seconds until it has finished. A timeout is not reported; an interrupt restores the
     * thread's interrupt flag and returns.
     *
     * @param runnable the task to run off the event loop
     */
    private void executeAsyncTaskAndAwait(Runnable runnable) {
        CountDownLatch done = this.executeAsyncTask(runnable);
        try {
            done.await(20L, TimeUnit.SECONDS);
        }
        catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Checks that calling {@code close()} on a connection invokes {@code handleClose} and does not
     * invoke {@code handleShutdown}. The close is triggered by firing the user event
     * {@code "test"} on an embedded channel.
     */
    @Test
    public void testClose() {
        final AtomicBoolean closeSeen = new AtomicBoolean();
        final AtomicBoolean shutdownSeen = new AtomicBoolean();
        EmbeddedChannel channel = this.channel((ctx, chctx) -> new VertxConnection(ctx, chctx) {

            protected void handleEvent(Object event) {
                if (!"test".equals(event)) {
                    return;
                }
                this.close();
            }

            protected void handleShutdown(Object reason, long timeout, TimeUnit unit, ChannelPromise promise) {
                shutdownSeen.set(true);
            }

            protected void handleClose(Object reason, ChannelPromise promise) {
                closeSeen.set(true);
                promise.setSuccess();
            }
        });
        ChannelPipeline pipeline = channel.pipeline();
        pipeline.fireUserEventTriggered("test");
        this.assertFalse(shutdownSeen.get());
        this.assertTrue(closeSeen.get());
    }

    /**
     * Checks that {@code shutdown(0, SECONDS)} goes straight to {@code handleClose} without
     * invoking {@code handleShutdown}. The shutdown is triggered by firing the user event
     * {@code "test"} on an embedded channel.
     */
    @Test
    public void testShutdownZeroDoesClose() {
        final AtomicBoolean closeSeen = new AtomicBoolean();
        final AtomicBoolean shutdownSeen = new AtomicBoolean();
        EmbeddedChannel channel = this.channel((ctx, chctx) -> new VertxConnection(ctx, chctx) {

            protected void handleEvent(Object event) {
                if (!"test".equals(event)) {
                    return;
                }
                this.shutdown(0L, TimeUnit.SECONDS);
            }

            protected void handleShutdown(Object reason, long timeout, TimeUnit unit, ChannelPromise promise) {
                shutdownSeen.set(true);
            }

            protected void handleClose(Object reason, ChannelPromise promise) {
                closeSeen.set(true);
            }
        });
        ChannelPipeline pipeline = channel.pipeline();
        pipeline.fireUserEventTriggered("test");
        this.assertFalse(shutdownSeen.get());
        this.assertTrue(closeSeen.get());
    }

    /**
     * Currently ignored. Calls {@code close(reason)} from inside {@code handleShutdown} during a
     * 10 second shutdown and expects {@code handleClose} to have run by the time that call
     * returns. Both callbacks must have been invoked at the end.
     */
    @Ignore
    @Test
    public void testShutdownReentrantClose() {
        final AtomicBoolean closeSeen = new AtomicBoolean();
        final AtomicBoolean shutdownSeen = new AtomicBoolean();
        EmbeddedChannel channel = this.channel((ctx, chctx) -> new VertxConnection(ctx, chctx) {

            protected void handleEvent(Object event) {
                if (!"test".equals(event)) {
                    return;
                }
                this.shutdown(10L, TimeUnit.SECONDS);
            }

            protected void handleShutdown(Object reason, long timeout, TimeUnit unit, ChannelPromise promise) {
                shutdownSeen.set(true);
                this.close(reason);
                VertxConnectionTest.this.assertTrue(closeSeen.get());
            }

            protected void handleClose(Object reason, ChannelPromise promise) {
                closeSeen.set(true);
            }
        });
        ChannelPipeline pipeline = channel.pipeline();
        pipeline.fireUserEventTriggered("test");
        this.assertTrue(shutdownSeen.get());
        this.assertTrue(closeSeen.get());
    }

    /**
     * Checks that a 100 ms shutdown invokes {@code handleClose} once the timeout elapses.
     * <p>
     * Inside {@code handleShutdown} the embedded channel's clock is advanced by 100 ms and its
     * pending tasks are run; {@code handleClose} must not have run before that and must have run
     * exactly once afterwards. Each callback is expected exactly once overall.
     */
    @Test
    public void testShutdownTimeout() {
        final AtomicInteger closeCount = new AtomicInteger();
        final AtomicInteger shutdownCount = new AtomicInteger();
        EmbeddedChannel channel = this.channel((ctx, chctx) -> new VertxConnection(ctx, chctx) {

            protected void handleEvent(Object event) {
                if (!"test".equals(event)) {
                    return;
                }
                this.shutdown(100L, TimeUnit.MILLISECONDS);
            }

            protected void handleShutdown(Object reason, long timeout, TimeUnit unit, ChannelPromise promise) {
                shutdownCount.getAndIncrement();
                VertxConnectionTest.this.assertEquals(0L, closeCount.get());
                EmbeddedChannel embedded = (EmbeddedChannel) this.chctx.channel();
                embedded.advanceTimeBy(100L, TimeUnit.MILLISECONDS);
                embedded.runPendingTasks();
                VertxConnectionTest.this.assertEquals(1L, closeCount.get());
            }

            protected void handleClose(Object reason, ChannelPromise promise) {
                closeCount.getAndIncrement();
            }
        });
        ChannelPipeline pipeline = channel.pipeline();
        pipeline.fireUserEventTriggered("test");
        this.assertEquals(1L, shutdownCount.get());
        this.assertEquals(1L, closeCount.get());
    }

    /**
     * Builds an {@code EmbeddedChannel} whose only handler is a {@code VertxHandler} wrapping the
     * connection produced by {@code connectionFactory}.
     *
     * @param connectionFactory receives a context built on the channel's event loop and the
     *                          handler context, and returns the connection to install
     * @param <C>               the concrete connection type
     * @return the embedded channel
     */
    private <C extends VertxConnection> EmbeddedChannel channel(BiFunction<ContextInternal, ChannelHandlerContext, C> connectionFactory) {
        ChannelHandler handler = VertxHandler.create(chctx -> {
            EventLoop loop = chctx.channel().eventLoop();
            ContextInternal ctx = ((VertxInternal) this.vertx).contextBuilder().withEventLoop(loop).build();
            return (VertxConnection) connectionFactory.apply(ctx, (ChannelHandlerContext) chctx);
        });
        return new EmbeddedChannel(handler);
    }

    /**
     * Checks that inbound messages written to a paused connection are not delivered to its
     * handler and that the channel's auto-read setting is off afterwards.
     */
    @Test
    public void testDisableAutoReadWhenPaused() {
        List<Message> receivedMessages = new ArrayList<>();
        MessageFactory factory = new MessageFactory();
        EmbeddedChannel ch = new EmbeddedChannel();
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(VertxHandler.create(chctx -> new TestConnection(chctx)));
        TestConnection connection = (TestConnection) ((VertxHandler) pipeline.get(VertxHandler.class)).getConnection();
        connection.handler = receivedMessages::add;
        connection.pause();
        ch.writeInbound((Object[]) factory.next(8));
        this.assertEquals(Collections.emptyList(), receivedMessages);
        this.assertFalse(ch.config().isAutoRead());
    }

    /**
     * Pauses a connection, feeds it eight inbound messages and resumes it. The handlers echo each
     * message back to the channel and write {@code "read-complete"} on read completion. After the
     * pending tasks have run, nine outbound messages must be readable, the last one being
     * {@code "read-complete"}, and auto-read must be on again.
     */
    @Test
    public void testConsolidatesFlushesWhenResuming() {
        MessageFactory factory = new MessageFactory();
        EmbeddedChannel ch = new EmbeddedChannel(){};
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new ChannelHandler[]{VertxHandler.create(chctx -> new TestConnection((ChannelHandlerContext)chctx))});
        TestConnection connection = (TestConnection)((VertxHandler)pipeline.get(VertxHandler.class)).getConnection();
        connection.pause();
        ch.writeInbound((Object[])factory.next(8));
        this.assertFalse(ch.config().isAutoRead());
        connection.resume();
        this.assertTrue(ch.hasPendingTasks());
        // Handlers are installed only after resume() has been scheduled but before it runs.
        connection.handler = msg -> connection.writeToChannel(msg);
        connection.readCompleteHandler = v -> connection.writeToChannel("read-complete");
        ch.runPendingTasks();
        ArrayList<Object> flushed = new ArrayList<Object>();
        for (Object outbound = ch.readOutbound(); outbound != null; outbound = ch.readOutbound()) {
            flushed.add(outbound);
        }
        this.assertEquals(9L, flushed.size());
        this.assertTrue(ch.config().isAutoRead());
        this.assertEquals("read-complete", flushed.get(8));
    }

    /**
     * Pauses a connection, feeds it four inbound messages and resumes it. The handler pauses the
     * connection again on the second delivered message; after the pending tasks have run, exactly
     * two messages must have been delivered.
     */
    @Test
    public void testPauseWhenResuming() {
        MessageFactory factory = new MessageFactory();
        EmbeddedChannel ch = new EmbeddedChannel(){};
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new ChannelHandler[]{VertxHandler.create(chctx -> new TestConnection((ChannelHandlerContext)chctx))});
        TestConnection connection = (TestConnection)((VertxHandler)pipeline.get(VertxHandler.class)).getConnection();
        connection.pause();
        ch.writeInbound((Object[])factory.next(4));
        connection.resume();
        this.assertTrue(ch.hasPendingTasks());
        AtomicInteger delivered = new AtomicInteger();
        connection.handler = event -> {
            int seen = delivered.incrementAndGet();
            if (seen == 2) {
                connection.pause();
            }
        };
        ch.runPendingTasks();
        this.assertEquals(2L, delivered.get());
    }

    /**
     * Resumes a paused connection between a {@code channelRead} and its
     * {@code channelReadComplete}.
     * <p>
     * One message is fired while paused, an object is written without flushing, and the connection
     * is resumed. Running the pending tasks must deliver nothing and expose no outbound data. Only
     * after {@code fireChannelReadComplete()} must the handler have been called once and the
     * written object be readable from the channel.
     */
    @Test
    public void testResumeWhenReadInProgress() {
        MessageFactory factory = new MessageFactory();
        EmbeddedChannel ch = new EmbeddedChannel();
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new ChannelHandler[]{VertxHandler.create(chctx -> new TestConnection((ChannelHandlerContext)chctx))});
        TestConnection connection = (TestConnection)((VertxHandler)pipeline.get(VertxHandler.class)).getConnection();
        AtomicInteger delivered = new AtomicInteger();
        connection.handler = event -> delivered.incrementAndGet();
        connection.pause();
        pipeline.fireChannelRead((Object)factory.next());
        this.assertEquals(0L, delivered.get());
        Object expected = new Object();
        connection.write(expected, false);
        connection.resume();
        this.assertEquals(0L, delivered.get());
        this.assertTrue(ch.hasPendingTasks());
        ch.runPendingTasks();
        this.assertEquals(0L, delivered.get());
        Object beforeReadComplete = ch.readOutbound();
        this.assertNull(beforeReadComplete);
        pipeline.fireChannelReadComplete();
        this.assertEquals(1L, delivered.get());
        Object afterReadComplete = ch.readOutbound();
        this.assertSame(expected, afterReadComplete);
    }

    /**
     * Produces {@link Message} instances with sequential ids {@code "msg-0"}, {@code "msg-1"}, ...
     */
    static class MessageFactory {
        /** Sequence number used for the next message id. */
        int seq;

        MessageFactory() {
            this.seq = 0;
        }

        /**
         * @return a new message whose id is {@code "msg-"} followed by the current sequence number;
         *         the sequence number is then incremented
         */
        Message next() {
            String id = "msg-" + this.seq;
            this.seq++;
            return new Message(id);
        }

        /**
         * @param num how many messages to create
         * @return an array of {@code num} messages created by consecutive calls to {@link #next()}
         */
        Message[] next(int num) {
            Message[] batch = new Message[num];
            int filled = 0;
            while (filled < num) {
                batch[filled++] = this.next();
            }
            return batch;
        }
    }

    /**
     * Inbound test message identified only by its id.
     */
    static class Message {
        /** Identifier assigned at construction time. */
        final String id;

        /**
         * @param id the identifier to store
         */
        Message(String id) {
            this.id = id;
        }
    }

    /**
     * {@code VertxConnection} used by the embedded-channel tests. It forwards received messages
     * and read-complete notifications to optional handlers, and exposes pause/resume.
     * <p>
     * The context is built from the enclosing test's Vert.x instance on the executor of the given
     * handler context.
     */
    private class TestConnection extends VertxConnection {
        /** Receives each inbound message; messages are dropped while this is {@code null}. */
        Handler<Message> handler;
        /** Invoked with {@code null} on read completion; ignored while {@code null}. */
        Handler<Void> readCompleteHandler;

        /**
         * @param chctx the handler context; its executor is cast to {@code EventLoop}
         */
        public TestConnection(ChannelHandlerContext chctx) {
            super(((VertxInternal) VertxConnectionTest.this.vertx).contextBuilder().withEventLoop((EventLoop) chctx.executor()).build(), chctx);
        }

        protected void handleMessage(Object msg) {
            // Copy the field so the null check and the call use the same handler.
            Handler<Message> h = this.handler;
            if (h != null) {
                h.handle((Message) msg);
            }
        }

        protected void handleReadComplete() {
            Handler<Void> h = this.readCompleteHandler;
            if (h != null) {
                h.handle(null);
            }
        }

        /** Pauses the connection immediately by calling {@code doPause()}. */
        public void pause() {
            this.doPause();
        }

        /** Schedules {@code doResume()} on the handler context's executor instead of calling it inline. */
        public void resume() {
            this.chctx.executor().execute(() -> this.doResume());
        }
    }
}

