/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.tests.concurrent;

import io.vertx.core.VertxOptions;
import io.vertx.core.streams.impl.MessagePassingQueue;
import io.vertx.test.core.Repeat;
import io.vertx.test.core.VertxTestBase;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import org.junit.Test;

/**
 * Stress tests that drive a {@code MessagePassingQueue.MpSc} from several
 * producer threads at once and then check the aggregate bookkeeping
 * (number of consumed elements, number of writability signals).
 *
 * <p>Thread checks of the base test class are disabled in {@link #setUp()}
 * because assertions and failures are reported from the producer threads.
 */
public class MessagePassingQueueStressTest extends VertxTestBase {

    // NOTE(review): the three values below are the bit masks this test applies to the
    // flags returned by add()/drain(). They presumably mirror mask constants declared by
    // MessagePassingQueue - confirm, and prefer the library constants if accessible.

    /** Bit tested on the result of {@code add}; counted as an "unwritable" signal. */
    private static final int UNWRITABLE_FLAG = 1;

    /** Bit tested on the result of {@code drain}; when set, unwritable signals are tallied. */
    private static final int WRITABLE_FLAG = 2;

    /** Bit tested on the result of {@code add}; when set, the adding thread drains the queue. */
    private static final int DRAIN_REQUIRED_FLAG = 4;

    @Override
    public void setUp() throws Exception {
        super.setUp();
        this.disableThreadChecks();
    }

    /**
     * Starts 10 threads that each perform 1000 rounds of 10 {@code add} calls,
     * sleeping 1 ms between rounds. Whenever {@code add} reports the
     * drain-required bit the same thread drains the queue and asserts the bit is
     * cleared in the drain result. At the end, the consumer must have been invoked
     * exactly once per emitted element.
     *
     * @throws Exception if the start barrier or a thread join fails
     */
    @Test
    public void testSimple() throws Exception {
        LongAdder counter = new LongAdder();
        MessagePassingQueue.MpSc queue = new MessagePassingQueue.MpSc(foo -> {
            counter.increment();
            return true;
        });
        int numThreads = 10;
        int numEmissions = 10;
        int numReps = 1000;
        Object elt = new Object();
        Thread[] threads = new Thread[numThreads];
        // One extra party for the test thread so all workers are released together.
        CyclicBarrier barrier = new CyclicBarrier(1 + numThreads);
        for (int i = 0; i < numThreads; ++i) {
            Thread thread = new Thread(() -> {
                try {
                    barrier.await();
                } catch (Exception e) {
                    this.fail(e);
                    // Do not emit if the coordinated start failed (same as testWriteQueueFull).
                    return;
                }
                for (int rep = 0; rep < numReps; ++rep) {
                    for (int k = 0; k < numEmissions; ++k) {
                        int flags = queue.add(elt);
                        if ((flags & DRAIN_REQUIRED_FLAG) != 0) {
                            flags = queue.drain();
                            this.assertEquals(0L, flags & DRAIN_REQUIRED_FLAG);
                        }
                    }
                    try {
                        Thread.sleep(1L);
                    } catch (InterruptedException e) {
                        // Keep the interrupt status and report through the test harness
                        // instead of throwing from a worker thread where nobody catches it.
                        Thread.currentThread().interrupt();
                        this.fail(e);
                        return;
                    }
                }
            });
            thread.start();
            threads[i] = thread;
        }
        barrier.await();
        for (Thread thread : threads) {
            thread.join();
        }
        this.assertEquals(numThreads * numEmissions * numReps, counter.intValue());
    }

    /**
     * Starts {@code VertxOptions.DEFAULT_EVENT_LOOP_POOL_SIZE / 2} producers that each
     * add 10000 elements. Every {@code add} whose result carries the unwritable bit is
     * counted; every time the drain-required bit is reported the producer drains the
     * queue and accumulates both the number of consumed elements and, when the drain
     * result carries the writable bit, the value of
     * {@code MessagePassingQueue.numberOfUnwritableSignals}. The test then asserts that
     * all elements were consumed and that both signal counters agree.
     *
     * @throws Exception if the start barrier or a thread join fails
     */
    @Repeat(times=50)
    @Test
    public void testWriteQueueFull() throws Exception {
        int numProducers = VertxOptions.DEFAULT_EVENT_LOOP_POOL_SIZE / 2;
        int numReps = 10000;
        // Plain (non-atomic) scratch counter bumped by the consumer; guarded by the
        // class lock taken around each drain below.
        int[] consumedLocal = new int[1];
        MessagePassingQueue.MpSc queue = new MessagePassingQueue.MpSc(elt -> {
            consumedLocal[0] = consumedLocal[0] + 1;
            return true;
        });
        AtomicInteger numOfConsumedElements = new AtomicInteger();
        AtomicInteger numOfUnwritableSignalsFromDrain = new AtomicInteger();
        AtomicInteger numOfUnwritableSignalsFromSubmit = new AtomicInteger();
        Thread[] producers = new Thread[numProducers];
        // One extra party for the test thread so all producers are released together.
        CyclicBarrier start = new CyclicBarrier(1 + producers.length);
        for (int i = 0; i < producers.length; ++i) {
            int val = i;
            String name = "producer-" + val;
            Thread producer = new Thread(() -> {
                try {
                    start.await();
                } catch (Exception e) {
                    this.fail(e);
                    return;
                }
                for (int iter = 0; iter < numReps; ++iter) {
                    int flags = queue.add((Object) val);
                    if ((flags & UNWRITABLE_FLAG) != 0) {
                        numOfUnwritableSignalsFromSubmit.incrementAndGet();
                    }
                    if ((flags & DRAIN_REQUIRED_FLAG) != 0) {
                        int drainFlags;
                        // Reset, drain and read-back of the shared scratch counter must
                        // not interleave with another producer doing the same sequence.
                        synchronized (MessagePassingQueueStressTest.class) {
                            consumedLocal[0] = 0;
                            drainFlags = queue.drain();
                            numOfConsumedElements.addAndGet(consumedLocal[0]);
                        }
                        if ((drainFlags & WRITABLE_FLAG) != 0) {
                            int unwritable = MessagePassingQueue.numberOfUnwritableSignals(drainFlags);
                            numOfUnwritableSignalsFromDrain.addAndGet(unwritable);
                        }
                    }
                }
            }, name);
            producer.start();
            producers[i] = producer;
        }
        start.await();
        for (Thread producer : producers) {
            // Bounded wait so a stuck producer cannot hang the test run forever.
            producer.join(10000L);
        }
        this.assertEquals((long) numProducers * (long) numReps, numOfConsumedElements.get());
        this.assertEquals(numOfUnwritableSignalsFromSubmit.get(), numOfUnwritableSignalsFromDrain.get());
    }
}

