/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.tests.eventbus;

import io.vertx.core.Handler;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.impl.MessageConsumerImpl;
import io.vertx.core.spi.VertxMetricsFactory;
import io.vertx.core.spi.metrics.EventBusMetrics;
import io.vertx.core.spi.metrics.VertxMetrics;
import io.vertx.test.core.VertxTestBase;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;

public class EventBusRegistrationRaceTest
extends VertxTestBase {
    private static final int NUM_MSG = 300000;
    private static String TEST_ADDR = "the-addr";
    private static final Handler<Message<Object>> IGNORE_MSG = msg -> {};
    private final AtomicInteger count = new AtomicInteger();

    @Override
    protected VertxMetricsFactory getMetrics() {
        return o -> new VertxMetrics(){

            public EventBusMetrics<Void> createEventBusMetrics() {
                return new EventBusMetrics<Void>(){

                    public void scheduleMessage(Void handler, boolean local) {
                        EventBusRegistrationRaceTest.this.count.incrementAndGet();
                    }

                    public void messageDelivered(Void handler, boolean local) {
                        EventBusRegistrationRaceTest.this.count.decrementAndGet();
                    }

                    public void discardMessage(Void handler, boolean local, Message<?> msg) {
                        EventBusRegistrationRaceTest.this.count.decrementAndGet();
                    }
                };
            }
        };
    }

    @Test
    public void theTest() throws Exception {
        AtomicInteger seq = new AtomicInteger();
        Thread threadA = new Thread(() -> this.threadA(seq));
        threadA.setName("Thread-A");
        Thread threadB = new Thread(() -> this.threadB(seq));
        threadB.setName("Thread-B");
        threadA.start();
        threadB.start();
        threadA.join(20000L);
        threadB.join(20000L);
        EventBusRegistrationRaceTest.assertWaitUntil(() -> this.count.get() == 0);
    }

    private void threadA(AtomicInteger seq) {
        EventBus eventBus = this.vertx.eventBus();
        for (int count = 0; count < 300000; ++count) {
            while (count > seq.get()) {
                Thread.yield();
            }
            MessageConsumerImpl consumer = (MessageConsumerImpl)eventBus.consumer(TEST_ADDR, IGNORE_MSG);
            consumer.discardHandler(IGNORE_MSG);
            consumer.unregister();
        }
    }

    private void threadB(AtomicInteger seq) {
        EventBus eventBus = this.vertx.eventBus();
        MessageConsumer consumer = null;
        int count = 0;
        while (count < 300000) {
            while (count > seq.get()) {
                Thread.yield();
            }
            ++count;
            if (consumer != null) {
                consumer.unregister();
            }
            consumer = eventBus.consumer(TEST_ADDR);
            consumer.handler(event -> seq.incrementAndGet());
            eventBus.publish(TEST_ADDR, (Object)count);
        }
    }
}

