/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.test.core;

import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.MessageProducer;
import io.vertx.test.core.VertxTestBase;
import java.util.LinkedList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;

public class EventBusFlowControlTest
extends VertxTestBase {
    protected EventBus eb;

    @Test
    public void testFlowControl() {
        MessageProducer prod = this.eb.sender("some-address");
        int numBatches = 1000;
        int wqms = 2000;
        prod.setWriteQueueMaxSize(wqms);
        MessageConsumer consumer = this.eb.consumer("some-address");
        AtomicInteger cnt = new AtomicInteger();
        consumer.handler(msg -> {
            int c = cnt.incrementAndGet();
            if (c == numBatches * wqms) {
                this.testComplete();
            }
        });
        this.sendBatch((MessageProducer<String>)prod, wqms, numBatches, 0);
        this.await();
    }

    private void sendBatch(MessageProducer<String> prod, int batchSize, int numBatches, int batchNumber) {
        boolean drainHandlerSet = false;
        while (batchNumber < numBatches && !drainHandlerSet) {
            for (int i = 0; i < batchSize; ++i) {
                prod.send((Object)("message-" + i));
                if (!prod.writeQueueFull() || drainHandlerSet) continue;
                prod.drainHandler(v -> {
                    if (batchNumber < numBatches - 1) {
                        this.sendBatch(prod, batchSize, numBatches, batchNumber + 1);
                    }
                });
                drainHandlerSet = true;
            }
        }
    }

    @Test
    public void testFlowControlPauseConsumer() {
        MessageProducer prod = this.eb.sender("some-address");
        int numBatches = 10;
        int wqms = 100;
        prod.setWriteQueueMaxSize(wqms);
        MessageConsumer consumer = this.eb.consumer("some-address");
        AtomicInteger cnt = new AtomicInteger();
        AtomicBoolean paused = new AtomicBoolean();
        consumer.handler(msg -> {
            this.assertFalse(paused.get());
            int c = cnt.incrementAndGet();
            if (c == numBatches * wqms) {
                this.testComplete();
            }
            if (c % 100 == 0) {
                consumer.pause();
                paused.set(true);
                this.vertx.setTimer(100L, tid -> {
                    paused.set(false);
                    consumer.resume();
                });
            }
        });
        this.sendBatch((MessageProducer<String>)prod, wqms, numBatches, 0);
        this.await();
    }

    @Test
    public void testFlowControlNoConsumer() {
        MessageProducer prod = this.eb.sender("some-address");
        int wqms = 2000;
        prod.setWriteQueueMaxSize(wqms);
        boolean drainHandlerSet = false;
        for (int i = 0; i < wqms * 2; ++i) {
            prod.send((Object)("message-" + i));
            if (!prod.writeQueueFull() || drainHandlerSet) continue;
            prod.drainHandler(v -> this.fail("Should not be called"));
            drainHandlerSet = true;
        }
        this.assertTrue(drainHandlerSet);
        this.vertx.setTimer(500L, tid -> this.testComplete());
        this.await();
    }

    @Test
    public void testResumePausedProducer() {
        LinkedBlockingQueue sequence = new LinkedBlockingQueue();
        AtomicReference handlerContext = new AtomicReference();
        MessageConsumer consumer = this.eb.consumer("some-address", msg -> {
            if (sequence.isEmpty()) {
                handlerContext.set(Vertx.currentContext());
            } else {
                this.assertEquals(Vertx.currentContext(), handlerContext.get());
            }
            sequence.add(msg.body());
        });
        consumer.pause();
        MessageProducer prod = this.eb.sender("some-address");
        LinkedList<Integer> expected = new LinkedList<Integer>();
        int count = 0;
        while (!prod.writeQueueFull()) {
            int val = count++;
            expected.add(val);
            prod.send((Object)val);
        }
        consumer.resume();
        this.waitUntil(() -> !prod.writeQueueFull());
        int theCount = count;
        this.waitUntil(() -> sequence.size() == theCount);
        while (expected.size() > 0) {
            this.assertEquals(expected.removeFirst(), sequence.poll());
        }
        this.assertNotNull(handlerContext.get());
    }

    @Override
    public void setUp() throws Exception {
        super.setUp();
        this.eb = this.vertx.eventBus();
    }
}

