/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventhandling.async;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.async.AsynchronousEventProcessingStrategy;
import org.axonframework.messaging.Message;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

class AsynchronousEventProcessorConcurrencyTest {
    private ExecutorService executor;
    private AsynchronousEventProcessingStrategy testSubject;

    AsynchronousEventProcessorConcurrencyTest() {
    }

    @BeforeEach
    void setUp() {
        this.executor = Executors.newCachedThreadPool();
        this.testSubject = new AsynchronousEventProcessingStrategy((Executor)this.executor, Message::getPayload);
    }

    @Test
    @Timeout(value=30000L, unit=TimeUnit.MILLISECONDS)
    void handleEvents() throws InterruptedException {
        AtomicInteger counter = new AtomicInteger();
        Consumer<List<? extends EventMessage<?>>> processor = eventMessages -> counter.addAndGet(eventMessages.size());
        int threadCount = 50;
        for (int i = 0; i < threadCount; ++i) {
            this.executor.submit(new EventsPublisher(processor));
        }
        while (counter.get() < threadCount * 30000) {
            Thread.sleep(10L);
        }
        this.executor.shutdown();
        Assertions.assertTrue((boolean)this.executor.awaitTermination(10L, TimeUnit.SECONDS), (String)"Executor not closed within a reasonable timeframe");
        Assertions.assertEquals((int)(threadCount * 30000), (int)counter.get());
    }

    private class EventsPublisher
    implements Runnable {
        private static final int ITERATIONS = 10000;
        private static final int EVENTS_COUNT = 30000;
        private final Consumer<List<? extends EventMessage<?>>> processor;

        public EventsPublisher(Consumer<List<? extends EventMessage<?>>> processor) {
            this.processor = processor;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10000; ++i) {
                AsynchronousEventProcessorConcurrencyTest.this.testSubject.handle(Collections.singletonList(GenericEventMessage.asEventMessage((Object)"1")), this.processor);
                AsynchronousEventProcessorConcurrencyTest.this.testSubject.handle(Collections.singletonList(GenericEventMessage.asEventMessage((Object)"2")), this.processor);
                AsynchronousEventProcessorConcurrencyTest.this.testSubject.handle(Collections.singletonList(GenericEventMessage.asEventMessage((Object)"3")), this.processor);
            }
        }
    }
}

