/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.tests.concurrent;

import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.VertxOptions;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.internal.concurrent.InboundMessageQueue;
import io.vertx.test.core.VertxTestBase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;
import org.junit.Test;

public abstract class InboundMessageQueueTest
extends VertxTestBase {
    // Thread that ran the probe task submitted to the context's Netty event loop in setUp();
    // assertProducer() compares the current thread against it.
    private volatile Thread producerThread;
    // Thread that ran the probe task submitted through Context.runOnContext in setUp();
    // assertConsumer() compares the current thread against it.
    private volatile Thread consumerThread;
    // Context the queue under test is bound to; obtained from createContext() in setUp().
    Context context;
    // Queue under test; each test method assigns its own instance.
    TestChannel queue;
    // Source of the integer payloads: every emitted element takes the next value. Reset to 0 in setUp().
    final AtomicInteger sequence = new AtomicInteger();

    // Package-private no-arg constructor: the class is abstract, so it is only invoked
    // by subclasses declared in this package.
    InboundMessageQueueTest() {
    }

    /**
     * Supplies the context the queue under test is bound to. Called once per test from
     * {@link #setUp()}.
     *
     * @param var1 the Vert.x instance of the running test ({@code setUp()} passes {@code this.vertx})
     * @return the context whose {@code runOnContext} serves consumer tasks and whose Netty
     *         event loop serves producer tasks
     */
    protected abstract Context createContext(VertxInternal var1);

    /**
     * Creates the context under test, resets the payload sequence and records which threads
     * serve the two roles used by the tests: the thread behind {@code runOnContext}
     * (consumer) and the thread behind the context's Netty event loop (producer).
     * Returns only once both threads have been captured.
     */
    @Override
    public void setUp() throws Exception {
        super.setUp();
        context = createContext((VertxInternal) vertx);
        sequence.set(0);
        ContextInternal internal = (ContextInternal) context;
        // Probe tasks: each one just remembers the thread it was executed on.
        context.runOnContext(ignored -> consumerThread = Thread.currentThread());
        internal.nettyEventLoop().execute(() -> producerThread = Thread.currentThread());
        waitUntil(() -> producerThread != null && consumerThread != null);
    }

    /**
     * Returns the inherited test options with the worker pool limited to one thread.
     * NOTE(review): presumably this keeps a worker context on a single, stable thread so
     * that {@link #assertConsumer()} can compare thread identities - confirm.
     */
    @Override
    protected VertxOptions getOptions() {
        VertxOptions options = super.getOptions();
        return options.setWorkerPoolSize(1);
    }

    /** Delegates to the base class tear-down; this test adds no cleanup of its own. */
    @Override
    public void tearDown() throws Exception {
        super.tearDown();
    }

    /** Asserts that the caller runs on the consumer thread captured in {@link #setUp()}. */
    protected final void assertConsumer() {
        Thread current = Thread.currentThread();
        assertSame(consumerThread, current);
    }

    /** Asserts that the caller runs on the producer thread captured in {@link #setUp()}. */
    protected final void assertProducer() {
        Thread current = Thread.currentThread();
        assertSame(producerThread, current);
    }

    /**
     * Schedules {@code task} on the Netty event loop of the context under test,
     * i.e. on the producer side of the queue.
     *
     * @param task the work to run on the producer thread
     */
    protected final void producerTask(Runnable task) {
        ContextInternal internal = (ContextInternal) context;
        internal.nettyEventLoop().execute(task);
    }

    /**
     * Schedules {@code task} through {@code runOnContext} of the context under test,
     * i.e. on the consumer side of the queue.
     *
     * @param task the work to run on the consumer thread
     */
    protected final void consumerTask(Runnable task) {
        context.runOnContext(ignored -> task.run());
    }

    /**
     * Creates a queue bound to the context under test, using the queue's default watermarks.
     *
     * @param consumer receives every element the queue delivers
     * @return the new queue
     */
    protected final TestChannel buffer(IntConsumer consumer) {
        TestChannel channel = new TestChannel(consumer);
        return channel;
    }

    /**
     * Creates a queue bound to the context under test with explicit watermarks.
     *
     * @param consumer receives every element the queue delivers
     * @param lwm low water mark handed to the queue
     * @param hwm high water mark handed to the queue
     * @return the new queue
     */
    protected final TestChannel buffer(IntConsumer consumer, int lwm, int hwm) {
        TestChannel channel = new TestChannel(consumer, lwm, hwm);
        return channel;
    }

    /**
     * A queue that was never paused hands a single emitted element (value 0) to the
     * consumer, on the consumer thread, exactly once.
     */
    @Test
    public void testFlowing() {
        AtomicInteger deliveries = new AtomicInteger();
        queue = buffer(value -> {
            assertConsumer();
            assertEquals(0L, value);
            assertEquals(0L, deliveries.getAndIncrement());
            testComplete();
        });
        producerTask(() -> {
            boolean stillWritable = queue.emit();
            assertTrue(stillWritable);
        });
        await();
    }

    /**
     * After {@code pause()} followed by {@code fetch(1)} on the consumer side, one element
     * emitted by the producer is delivered to the consumer exactly once.
     */
    @Test
    public void testTake() {
        AtomicInteger deliveries = new AtomicInteger();
        queue = buffer(value -> {
            assertConsumer();
            assertEquals(0L, value);
            assertEquals(0L, deliveries.getAndIncrement());
            testComplete();
        });
        consumerTask(() -> {
            queue.pause();
            queue.fetch(1L);
            // Emit only after the demand of one element has been registered.
            producerTask(() -> queue.emit());
        });
        await();
    }

    /**
     * In flowing mode two consecutive emits each leave the queue writable, and each
     * element reaches the consumer before the next one is emitted.
     */
    @Test
    public void testFlowingAdd() {
        AtomicInteger delivered = new AtomicInteger();
        queue = buffer(value -> {
            assertConsumer();
            delivered.getAndIncrement();
        });
        producerTask(() -> {
            for (int round = 1; round <= 2; round++) {
                int expectedDeliveries = round;
                assertTrue(queue.emit());
                // Wait for this round's element before emitting the next one.
                assertWaitUntil(() -> delivered.get() == expectedDeliveries);
            }
            testComplete();
        });
        await();
    }

    /**
     * Overfills a paused queue (low/high water mark 5) with eight elements and then resumes it.
     * The first four emits must report the queue as writable, the remaining ones must not.
     * After {@code resume()} the drain handler must run on the producer thread once all
     * eight elements have reached the consumer.
     */
    @Test
    public void testFlowingRefill() {
        AtomicInteger events = new AtomicInteger();
        queue = buffer(elt -> {
            assertConsumer();
            events.getAndIncrement();
        }, 5, 5).drainHandler(v -> {
            assertProducer();
            assertEquals(8L, events.get());
            testComplete();
        });
        queue.pause();
        producerTask(() -> {
            for (int i = 0; i < 8; ++i) {
                boolean expectedWritable = i < 4;
                assertEquals("Unexpected writability after emit #" + i, expectedWritable, queue.emit());
            }
            queue.resume();
        });
        await();
    }

    /**
     * Fills a paused queue (low/high water mark 5) with five elements - the first four emits
     * report writable, the fifth does not - and then requests a single element.
     * The first delivery must be element 0, must happen on the consumer thread and must
     * precede any drain notification. The drain handler, if invoked, must run on the
     * producer thread and at most once.
     */
    @Test
    public void testPauseWhenFull() {
        AtomicInteger events = new AtomicInteger();
        AtomicInteger reads = new AtomicInteger();
        queue = buffer(elt -> {
            assertConsumer();
            assertEquals(0L, reads.get());
            assertEquals(0L, events.getAndIncrement());
            testComplete();
        }, 5, 5).drainHandler(v -> {
            assertProducer();
            assertEquals(0L, reads.getAndIncrement());
        });
        producerTask(() -> {
            queue.pause();
            for (int i = 0; i < 5; ++i) {
                assertEquals(i < 4, queue.emit());
            }
            queue.fetch(1L);
        });
        await();
    }

    /**
     * Pauses the queue (low/high water mark 5), fills it until it stops being writable and
     * resumes it. The drain handler must then run on the producer thread, exactly once,
     * after five elements have been delivered to the consumer.
     */
    @Test
    public void testPausedResume() {
        AtomicInteger reads = new AtomicInteger();
        AtomicInteger events = new AtomicInteger();
        queue = buffer(elt -> {
            assertConsumer();
            events.getAndIncrement();
        }, 5, 5).drainHandler(v -> {
            assertProducer();
            assertEquals(0L, reads.getAndIncrement());
            assertEquals(5L, events.get());
            testComplete();
        });
        producerTask(() -> {
            queue.pause();
            queue.fill();
            queue.resume();
        });
        await();
    }

    /**
     * Checks the ordering of deliveries and the drain notification on a queue
     * (low/high water mark 5) that is filled while paused and then resumed:
     * nothing is delivered or drained before {@code resume()}, every delivery precedes the
     * drain notification, and the drain handler runs once on the producer thread after
     * five deliveries. The test needs two completions: one from the producer task and one
     * from the drain handler.
     */
    @Test
    public void testPausedDrain() {
        waitFor(2);
        AtomicInteger drained = new AtomicInteger();
        AtomicInteger emitted = new AtomicInteger();
        queue = buffer(elt -> {
            assertConsumer();
            assertEquals(0L, drained.get());
            emitted.getAndIncrement();
        }, 5, 5);
        queue.drainHandler(v -> {
            assertProducer();
            assertEquals(0L, drained.getAndIncrement());
            assertEquals(5L, emitted.get());
            complete();
        });
        producerTask(() -> {
            queue.pause();
            queue.fill();
            // While paused, filling must neither deliver nor drain anything.
            assertEquals(0L, drained.get());
            assertEquals(0L, emitted.get());
            queue.resume();
            complete();
        });
        await();
    }

    /**
     * On a paused queue (low/high water mark 3) with a demand of exactly one element,
     * only the first emitted element is delivered. The following emits stay buffered:
     * the second and third still report writable, the fourth reports not writable.
     * The drain handler must not have run at any of the checkpoints.
     */
    @Test
    public void testPausedRequestLimited() {
        AtomicInteger events = new AtomicInteger();
        AtomicInteger reads = new AtomicInteger();
        queue = buffer(elt -> {
            assertConsumer();
            events.getAndIncrement();
        }, 3, 3).drainHandler(v -> {
            assertProducer();
            assertEquals(0L, reads.getAndIncrement());
        });
        producerTask(() -> {
            queue.pause();
            queue.fetch(1L);
            assertEquals(0L, reads.get());
            assertEquals(0L, events.get());
            // First emit: consumed by the single outstanding demand.
            assertTrue(queue.emit());
            assertEquals(0L, reads.get());
            waitUntilEquals(1, events::get);
            // Second and third emit: buffered, queue still writable.
            assertTrue(queue.emit());
            assertEquals(0L, reads.get());
            assertEquals(1L, events.get());
            assertTrue(queue.emit());
            assertEquals(0L, reads.get());
            assertEquals(1L, events.get());
            // Fourth emit: buffered, queue no longer writable.
            assertFalse(queue.emit());
            assertEquals(0L, reads.get());
            assertEquals(1L, events.get());
            testComplete();
        });
        await();
    }

    /**
     * With low/high water mark 2 and a demand of one element on a paused queue:
     * the first emit is delivered and reports writable, the next emit is buffered and
     * still reports writable, and the one after that reports not writable.
     */
    @Test
    public void testPushReturnsTrueUntilHighWatermark() {
        AtomicInteger delivered = new AtomicInteger();
        queue = buffer(value -> delivered.incrementAndGet(), 2, 2);
        producerTask(() -> {
            queue.pause();
            queue.fetch(1L);
            boolean afterFirst = queue.emit();
            assertTrue(afterFirst);
            assertWaitUntil(() -> delivered.get() == 1);
            boolean afterSecond = queue.emit();
            assertTrue(afterSecond);
            boolean afterThird = queue.emit();
            assertFalse(afterThird);
            testComplete();
        });
        await();
    }

    /**
     * Filling a paused queue with low/high water mark 5 stops after exactly five
     * elements, as observed through the shared payload sequence.
     */
    @Test
    public void testHighWaterMark() {
        queue = buffer(ignored -> {}, 5, 5);
        producerTask(() -> {
            queue.pause();
            queue.fill();
            int produced = sequence.get();
            assertEquals(5L, produced);
            testComplete();
        });
        await();
    }

    /**
     * Adds five elements in one batch to a paused queue (low/high water mark 4); the batch
     * emit must report the queue as not writable. A demand of one element then starts
     * delivery, and every delivery requests one more element. No delivery may happen after
     * the drain notification, and the drain handler must see all five deliveries.
     * The test needs two completions: one from the producer task and one from the drain handler.
     */
    @Test
    public void testAddAllWhenPaused() {
        waitFor(2);
        AtomicInteger emitted = new AtomicInteger();
        AtomicInteger drained = new AtomicInteger();
        queue = buffer(elt -> {
            emitted.incrementAndGet();
            assertEquals(0L, drained.get());
            // Keep the delivery going one element at a time.
            queue.fetch(1L);
        }, 4, 4).drainHandler(v -> {
            assertEquals(5L, emitted.get());
            drained.incrementAndGet();
            complete();
        });
        producerTask(() -> {
            queue.pause();
            assertFalse(queue.emit(5));
            queue.fetch(1L);
            complete();
        });
        await();
    }

    /**
     * Adds four elements in one batch to a flowing queue (low/high water mark 4) and waits,
     * on the test thread, until the drain handler has run once and all four elements have
     * been delivered.
     */
    @Test
    public void testAddAllWhenFlowing() {
        AtomicInteger emitted = new AtomicInteger();
        AtomicInteger drained = new AtomicInteger();
        queue = buffer(elt -> emitted.incrementAndGet(), 4, 4)
            .drainHandler(v -> drained.incrementAndGet());
        producerTask(() -> queue.emit(4));
        waitUntilEquals(1, drained::get);
        waitUntilEquals(4, emitted::get);
    }

    /**
     * Fills a paused queue (low/high water mark 4), then calls {@code resume()} immediately
     * followed by {@code pause()} from a consumer task. Twenty milliseconds later no element
     * may have been delivered.
     */
    @Test
    public void testCheckThatPauseAfterResumeWontDoAnyEmission() {
        AtomicInteger delivered = new AtomicInteger();
        queue = buffer(value -> delivered.incrementAndGet(), 4, 4);
        producerTask(() -> {
            queue.pause();
            queue.fill();
            consumerTask(() -> {
                queue.resume();
                queue.pause();
                // Give a (wrongly) scheduled delivery time to show up before checking.
                vertx.setTimer(20L, timerId -> {
                    assertEquals(0L, delivered.get());
                    testComplete();
                });
            });
        });
        await();
    }

    /**
     * Exercises a queue with low/high water mark 1 through two drain notifications.
     * The producer emits element 0, waits for its delivery and pauses the queue.
     * On the first drain notification a follow-up producer task emits element 1, which must
     * report the queue as not writable, and resumes the queue. On the second drain
     * notification both elements must have been delivered in order.
     */
    @Test
    public void testBufferSignalingFullImmediately() {
        List<Integer> emitted = Collections.synchronizedList(new ArrayList<>());
        AtomicInteger drained = new AtomicInteger();
        queue = buffer(emitted::add, 1, 1);
        producerTask(() -> {
            queue.drainHandler(v -> {
                switch (drained.getAndIncrement()) {
                    case 0: {
                        producerTask(() -> {
                            assertFalse(queue.emit());
                            queue.resume();
                        });
                        break;
                    }
                    case 1: {
                        assertEquals(Arrays.asList(0, 1), emitted);
                        testComplete();
                    }
                }
            });
            queue.emit();
            assertWaitUntil(() -> emitted.size() == 1);
            assertEquals(Collections.singletonList(0), emitted);
            queue.pause();
        });
        await();
    }

    /**
     * Buffers five elements in a paused queue, closes the producer side and then the
     * consumer side. After the consumer close none of the elements may have been delivered
     * and all five must have been passed to {@code handleDispose} in emission order.
     * A write issued after both sides are closed must not prevent the test from completing.
     */
    @Test
    public void testClose() {
        List<Integer> emitted = Collections.synchronizedList(new ArrayList<>());
        final List<Integer> disposed = Collections.synchronizedList(new ArrayList<>());
        queue = new TestChannel(emitted::add, 1, 1) {

            // Records every element the queue discards instead of delivering.
            protected void handleDispose(Integer msg) {
                disposed.add(msg);
            }
        };
        producerTask(() -> {
            queue.pause();
            queue.emit(5);
            queue.closeProducer();
            consumerTask(() -> {
                queue.closeConsumer();
                assertEquals(Collections.emptyList(), emitted);
                assertEquals(Arrays.asList(0, 1, 2, 3, 4), disposed);
                producerTask(() -> {
                    queue.write(5);
                    testComplete();
                });
            });
        });
        await();
    }

    /**
     * {@link InboundMessageQueue} of integers used by the tests. It is bound to the context
     * of the enclosing test, forwards delivered elements to an {@link IntConsumer}, tracks
     * writability through the pause/resume callbacks and draws its payloads from the
     * enclosing test's {@code sequence}.
     *
     * <p>The {@code size} bookkeeping is updated by the emitting side and by
     * {@link #handleMessage(Integer)}; the tests run these on the producer and consumer
     * threads respectively, so the counter is an {@link AtomicInteger}.
     */
    class TestChannel
    extends InboundMessageQueue<Integer> {
        final IntConsumer consumer;
        private Handler<Void> drainHandler;
        // Cleared by handlePause(), set by handleResume(); read by the emitting side.
        private volatile boolean writable;
        // Number of elements emitted but not yet delivered to the consumer.
        private final AtomicInteger size = new AtomicInteger();

        /**
         * Creates a queue with the base class's default watermarks.
         *
         * @param consumer receives every delivered element
         */
        public TestChannel(IntConsumer consumer) {
            super(((ContextInternal)InboundMessageQueueTest.this.context).eventLoop(), ((ContextInternal)InboundMessageQueueTest.this.context).executor());
            this.consumer = consumer;
            this.writable = true;
        }

        /**
         * Creates a queue with explicit watermarks.
         *
         * @param consumer receives every delivered element
         * @param lwm low water mark passed to the base class
         * @param hwm high water mark passed to the base class
         */
        public TestChannel(IntConsumer consumer, int lwm, int hwm) {
            super(((ContextInternal)InboundMessageQueueTest.this.context).eventLoop(), ((ContextInternal)InboundMessageQueueTest.this.context).executor(), lwm, hwm);
            this.consumer = consumer;
            this.writable = true;
        }

        /** @return the number of emitted elements that have not been delivered yet */
        int size() {
            return size.get();
        }

        /** Accounts for the delivery and forwards the element to the consumer. */
        protected void handleMessage(Integer msg) {
            size.decrementAndGet();
            consumer.accept(msg);
        }

        /** Marks the queue writable again and notifies the drain handler, if one is set. */
        protected void handleResume() {
            writable = true;
            Handler<Void> handler = drainHandler;
            if (handler != null) {
                handler.handle(null);
            }
        }

        /** @return whether the queue currently accepts more elements without exceeding its limit */
        public boolean isWritable() {
            return writable;
        }

        /** Marks the queue as not writable. */
        protected void handlePause() {
            writable = false;
        }

        /** Switches to unbounded demand by fetching {@code Long.MAX_VALUE} elements. */
        final void resume() {
            fetch(Long.MAX_VALUE);
        }

        /**
         * Adds elements from the shared sequence until the queue reports not writable,
         * then drains once if any add requested it.
         *
         * @return the number of elements added
         */
        final int fill() {
            int count = 0;
            boolean needsDrain = false;
            while (writable) {
                // Count the element before adding it, as emit() does.
                size.incrementAndGet();
                needsDrain |= add(InboundMessageQueueTest.this.sequence.getAndIncrement());
                ++count;
            }
            if (needsDrain) {
                drain();
            }
            return count;
        }

        /**
         * Writes the next value of the shared sequence to the queue.
         *
         * @return whether the queue is still writable afterwards
         */
        final boolean emit() {
            size.incrementAndGet();
            write(InboundMessageQueueTest.this.sequence.getAndIncrement());
            return writable;
        }

        /**
         * Adds {@code count} values of the shared sequence as one batch, draining once at
         * the end if any add requested it.
         *
         * @param count number of elements to add
         * @return whether the queue is still writable afterwards
         */
        final boolean emit(int count) {
            size.addAndGet(count);
            boolean needsDrain = false;
            for (int i = 0; i < count; ++i) {
                needsDrain |= add(InboundMessageQueueTest.this.sequence.getAndIncrement());
            }
            if (needsDrain) {
                drain();
            }
            return writable;
        }

        /**
         * Sets the handler notified from {@link #handleResume()}.
         *
         * @param handler the drain handler, may be {@code null} to clear it
         * @return this channel, for chaining
         */
        TestChannel drainHandler(Handler<Void> handler) {
            this.drainHandler = handler;
            return this;
        }
    }
}

