/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventhandling.pooled;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.awaitility.Awaitility;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.transaction.NoTransactionManager;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.DefaultEventProcessorSpanFactory;
import org.axonframework.eventhandling.ErrorHandler;
import org.axonframework.eventhandling.EventHandlerInvoker;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventProcessorSpanFactory;
import org.axonframework.eventhandling.EventTrackerStatus;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.PropagatingErrorHandler;
import org.axonframework.eventhandling.ReplayToken;
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventhandling.pooled.PooledStreamingEventProcessor;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.eventhandling.tokenstore.inmemory.InMemoryTokenStore;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.StreamableMessageSource;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.messaging.unitofwork.RollbackConfiguration;
import org.axonframework.messaging.unitofwork.RollbackConfigurationType;
import org.axonframework.tracing.SpanFactory;
import org.axonframework.tracing.TestSpanFactory;
import org.axonframework.utils.AssertUtils;
import org.axonframework.utils.DelegateScheduledExecutorService;
import org.axonframework.utils.InMemoryStreamableEventSource;
import org.axonframework.utils.MockException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class PooledStreamingEventProcessorTest {
    private static final Logger logger = LoggerFactory.getLogger(PooledStreamingEventProcessorTest.class);
    private static final String PROCESSOR_NAME = "test";
    private PooledStreamingEventProcessor testSubject;
    private EventHandlerInvoker stubEventHandler;
    private InMemoryStreamableEventSource stubMessageSource;
    private InMemoryTokenStore tokenStore;
    private ScheduledExecutorService coordinatorExecutor;
    private ScheduledExecutorService workerExecutor;
    private TestSpanFactory spanFactory;

    /**
     * Package-private no-argument constructor; all fixture state is assigned in {@link #setUp()}.
     */
    PooledStreamingEventProcessorTest() {
    }

    /**
     * Creates fresh collaborators for every test: an in-memory event source, a mocked event handler invoker,
     * a spied in-memory token store, two scheduled executors (2 coordinator threads, 8 worker threads) and a
     * test span factory. The default test subject is built from these, after which the mocked handler is
     * stubbed to accept every payload type and every event.
     */
    @BeforeEach
    void setUp() {
        stubMessageSource = new InMemoryStreamableEventSource();
        stubEventHandler = Mockito.mock(EventHandlerInvoker.class);
        tokenStore = Mockito.spy(new InMemoryTokenStore());
        coordinatorExecutor = new DelegateScheduledExecutorService(Executors.newScheduledThreadPool(2));
        workerExecutor = new DelegateScheduledExecutorService(Executors.newScheduledThreadPool(8));
        spanFactory = new TestSpanFactory();

        // The subject is created before the default stubbing is registered, as in the original ordering.
        setTestSubject(createTestSubject());

        Mockito.when(stubEventHandler.canHandleType(Mockito.any())).thenReturn(true);
        Mockito.when(stubEventHandler.canHandle(Mockito.any(), Mockito.any())).thenReturn(true);
    }

    /**
     * Replaces the processor under test. Tests that need a customized processor call this with the result of
     * {@code createTestSubject(...)}; the instance stored here is the one shut down in {@code tearDown()}.
     *
     * @param testSubject the processor instance the test should exercise
     */
    private void setTestSubject(PooledStreamingEventProcessor testSubject) {
        this.testSubject = testSubject;
    }

    /**
     * Builds a processor with the default test configuration, i.e. without any builder customization.
     *
     * @return a new, not yet started, processor
     */
    private PooledStreamingEventProcessor createTestSubject() {
        return createTestSubject(UnaryOperator.identity());
    }

    /**
     * Builds a processor from the shared fixture (handler, message source, token store, executors and span
     * factory), using 8 initial segments and a claim extension threshold of 500, and then lets the caller
     * adjust the builder before the processor is built.
     *
     * @param customization adjusts the pre-configured builder; its result is the builder that gets built
     * @return a new, not yet started, processor
     */
    private PooledStreamingEventProcessor createTestSubject(UnaryOperator<PooledStreamingEventProcessor.Builder> customization) {
        PooledStreamingEventProcessor.Builder defaults =
                PooledStreamingEventProcessor.builder()
                                             .name(PROCESSOR_NAME)
                                             .eventHandlerInvoker(this.stubEventHandler)
                                             .rollbackConfiguration((RollbackConfiguration) RollbackConfigurationType.ANY_THROWABLE)
                                             .errorHandler((ErrorHandler) PropagatingErrorHandler.instance())
                                             .messageSource((StreamableMessageSource) this.stubMessageSource)
                                             .tokenStore((TokenStore) this.tokenStore)
                                             .transactionManager(NoTransactionManager.instance())
                                             .coordinatorExecutor(this.coordinatorExecutor)
                                             .workerExecutor(this.workerExecutor)
                                             .initialSegmentCount(8)
                                             .claimExtensionThreshold(500L)
                                             .spanFactory((EventProcessorSpanFactory) DefaultEventProcessorSpanFactory.builder()
                                                                                                                     .spanFactory((SpanFactory) this.spanFactory)
                                                                                                                     .build());
        PooledStreamingEventProcessor.Builder customized = customization.apply(defaults);
        return customized.build();
    }

    /**
     * Shuts down the processor under test and both executors created in {@code setUp()}.
     * <p>
     * The executor shutdowns run in {@code finally} blocks so that a failure while shutting down the
     * processor (or the coordinator executor) does not leave thread pools running for the remaining tests.
     * Any exception thrown by an earlier step still propagates to the test framework.
     */
    @AfterEach
    void tearDown() {
        try {
            this.testSubject.shutDown();
        } finally {
            try {
                this.coordinatorExecutor.shutdown();
            } finally {
                this.workerExecutor.shutdown();
            }
        }
    }

    /**
     * Makes the first {@code initializeTokenSegments} call on the token store throw and the next one call the
     * real method, then checks that the started processor keeps running and ends up with a status entry and
     * a stored token for all 8 segments.
     */
    @Test
    void retriesWhenTokenInitializationInitiallyFails() {
        InMemoryTokenStore failingOnceTokenStore = Mockito.spy(this.tokenStore);
        setTestSubject(createTestSubject(builder -> builder.tokenStore((TokenStore) failingOnceTokenStore)));
        // First invocation fails, every later invocation is delegated to the real implementation.
        Mockito.doThrow(new RuntimeException("Simulated failure"))
               .doCallRealMethod()
               .when(failingOnceTokenStore)
               .initializeTokenSegments((String) Mockito.any(), Mockito.anyInt(), (TrackingToken) Mockito.any());

        List<EventMessage> published = IntStream.range(0, 100)
                                                .mapToObj(GenericEventMessage::new)
                                                .collect(Collectors.toList());
        for (EventMessage event : published) {
            stubMessageSource.publishMessage(event);
        }
        mockEventHandlerInvoker();

        testSubject.start();

        Assertions.assertTrue(testSubject.isRunning());
        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS, () -> Assertions.assertEquals(8, testSubject.processingStatus().size())
        );
        AssertUtils.assertWithin(2, TimeUnit.SECONDS, () -> {
            long storedTokens = IntStream.range(0, 8)
                                         .mapToObj(segment -> tokenStore.fetchToken(PROCESSOR_NAME, segment))
                                         .filter(Objects::nonNull)
                                         .count();
            Assertions.assertEquals(8L, storedTokens);
        });
        Assertions.assertEquals(8, testSubject.processingStatus().size());
    }

    /**
     * Uses a coordinator executor whose {@code submit(Runnable)} throws and checks that {@code start()}
     * propagates that exception and that the processor does not report itself as running afterwards.
     */
    @Test
    void startShutsDownImmediatelyIfCoordinatorExecutorThrowsAnException() {
        ScheduledExecutorService throwingExecutor = Mockito.spy(this.coordinatorExecutor);
        Mockito.doThrow(new IllegalArgumentException("Some exception"))
               .when(throwingExecutor)
               .submit(Mockito.any(Runnable.class));
        setTestSubject(createTestSubject(builder -> builder.coordinatorExecutor(throwingExecutor)));

        Assertions.assertThrows(IllegalArgumentException.class, () -> testSubject.start());

        Assertions.assertFalse(testSubject.isRunning());
    }

    /**
     * Starts the processor twice and verifies that only a single task was submitted to the coordinator
     * executor.
     */
    @Test
    void secondStartInvocationIsIgnored() {
        ScheduledExecutorService observedExecutor = Mockito.spy(this.coordinatorExecutor);
        setTestSubject(createTestSubject(builder -> builder.coordinatorExecutor(observedExecutor)));

        testSubject.start();
        testSubject.start();

        Mockito.verify(observedExecutor, Mockito.times(1)).submit(Mockito.any(Runnable.class));
    }

    /**
     * Test entry point that delegates entirely to {@code startAndAssertProcessorClaimsAllTokens()}, which
     * publishes events, starts the processor and asserts on the resulting status and stored tokens.
     */
    @Test
    void startingProcessorClaimsAllAvailableTokens() {
        this.startAndAssertProcessorClaimsAllTokens();
    }

    /**
     * Publishes 100 events, starts the processor and asserts that it reports a processing status for 8
     * segments and that the token store returns a non-null token for each of the segments 0 through 7.
     */
    private void startAndAssertProcessorClaimsAllTokens() {
        List<EventMessage> published = IntStream.range(0, 100)
                                                .mapToObj(GenericEventMessage::new)
                                                .collect(Collectors.toList());
        for (EventMessage event : published) {
            stubMessageSource.publishMessage(event);
        }
        mockEventHandlerInvoker();

        testSubject.start();

        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS, () -> Assertions.assertEquals(8, testSubject.processingStatus().size())
        );
        AssertUtils.assertWithin(2, TimeUnit.SECONDS, () -> {
            long storedTokens = IntStream.range(0, 8)
                                         .mapToObj(segment -> tokenStore.fetchToken(PROCESSOR_NAME, segment))
                                         .filter(Objects::nonNull)
                                         .count();
            Assertions.assertEquals(8L, storedTokens);
        });
        Assertions.assertEquals(8, testSubject.processingStatus().size());
    }

    /**
     * Publishes 8 events and, from within the mocked handler, verifies that both the batch span and the
     * per-message process span are active. Once all 8 invocations were counted, verifies that the process
     * span of every handled message and the batch span were completed.
     */
    @Test
    void handlingEventsAreCorrectlyTraced() throws Exception {
        CountDownLatch handledLatch = new CountDownLatch(8);
        CopyOnWriteArrayList<EventMessage<?>> handledMessages = new CopyOnWriteArrayList<>();
        mockEventHandlerInvoker();
        Mockito.doAnswer(invocation -> {
                   EventMessage<?> handled = invocation.getArgument(0, EventMessage.class);
                   handledMessages.add(handled);
                   spanFactory.verifySpanActive("StreamingEventProcessor.batch");
                   spanFactory.verifySpanActive("StreamingEventProcessor.process", (Message<?>) handled);
                   handledLatch.countDown();
                   return null;
               })
               .when(stubEventHandler)
               .handle((EventMessage) Mockito.any(), (Segment) Mockito.any());

        List<EventMessage> published = IntStream.range(0, 8)
                                                .mapToObj(GenericEventMessage::new)
                                                .collect(Collectors.toList());
        for (EventMessage event : published) {
            stubMessageSource.publishMessage(event);
        }

        testSubject.start();

        Assertions.assertTrue(handledLatch.await(5L, TimeUnit.SECONDS));
        for (EventMessage<?> handled : handledMessages) {
            AssertUtils.assertWithin(
                    1, TimeUnit.SECONDS,
                    () -> spanFactory.verifySpanCompleted("StreamingEventProcessor.process", (Message<?>) handled)
            );
        }
        spanFactory.verifySpanCompleted("StreamingEventProcessor.batch");
    }

    /**
     * Publishes 8 events and counts a handler invocation only when the current unit of work holds both the
     * {@code "Processor[test]/SegmentId"} and the {@code "Processor[test]/Token"} resource. A missing
     * resource is logged and not counted, so the latch assertion fails if any invocation lacks one.
     */
    @Test
    void handlingEventsHaveSegmentAndTokenInUnitOfWork() throws Exception {
        CountDownLatch validInvocations = new CountDownLatch(8);
        mockEventHandlerInvoker();
        Mockito.doAnswer(invocation -> {
                   Map<String, Object> uowResources = CurrentUnitOfWork.get().resources();
                   if (!uowResources.containsKey("Processor[test]/SegmentId")) {
                       logger.error("UoW didn't contain the segment!");
                   } else if (!uowResources.containsKey("Processor[test]/Token")) {
                       logger.error("UoW didn't contain the token!");
                   } else {
                       validInvocations.countDown();
                   }
                   return null;
               })
               .when(stubEventHandler)
               .handle((EventMessage) Mockito.any(), (Segment) Mockito.any());

        List<EventMessage> published = IntStream.range(0, 8)
                                                .mapToObj(GenericEventMessage::new)
                                                .collect(Collectors.toList());
        for (EventMessage event : published) {
            stubMessageSource.publishMessage(event);
        }

        testSubject.start();

        Assertions.assertTrue(validInvocations.await(5L, TimeUnit.SECONDS));
    }

    /**
     * Stores tokens for segments 0 through 3 but stubs the token store to report only segment 2 as
     * available. Checks that the processor ends up with a status for segment 2 alone and that
     * {@code fetchToken} was never invoked for segments 0, 1 or 3.
     */
    @Test
    void processorOnlyTriesToClaimAvailableSegments() {
        tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), PROCESSOR_NAME, 0);
        tokenStore.storeToken(new GlobalSequenceTrackingToken(2L), PROCESSOR_NAME, 1);
        tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), PROCESSOR_NAME, 2);
        tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), PROCESSOR_NAME, 3);

        Segment onlyAvailableSegment = Segment.computeSegment(2, 0, 1, 2, 3);
        Mockito.when((Object) tokenStore.fetchAvailableSegments(testSubject.getName()))
               .thenReturn(Collections.singletonList(onlyAvailableSegment));

        testSubject.start();

        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS, () -> Assertions.assertEquals(1, testSubject.processingStatus().size())
        );
        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS, () -> Assertions.assertTrue(testSubject.processingStatus().containsKey(2))
        );
        Mockito.verify(tokenStore, Mockito.never())
               .fetchToken(
                       (String) Mockito.eq((Object) testSubject.getName()),
                       Mockito.intThat(segmentId -> Arrays.asList(0, 1, 3).contains(segmentId))
               );
    }

    /**
     * Starts and shuts down the processor, publishes 100 events and starts it again. Asserts that after the
     * second start a status exists for 8 segments and that a token is stored for each of the segments
     * 0 through 7.
     */
    @Test
    void startingAfterShutdownLetsProcessorProceed() {
        Mockito.when(stubEventHandler.supportsReset()).thenReturn(true);

        testSubject.start();
        testSubject.shutDown();

        List<EventMessage> published = IntStream.range(0, 100)
                                                .mapToObj(GenericEventMessage::new)
                                                .collect(Collectors.toList());
        for (EventMessage event : published) {
            stubMessageSource.publishMessage(event);
        }

        testSubject.start();

        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS, () -> Assertions.assertEquals(8, testSubject.processingStatus().size())
        );
        AssertUtils.assertWithin(2, TimeUnit.SECONDS, () -> {
            long storedTokens = IntStream.range(0, 8)
                                         .mapToObj(segment -> tokenStore.fetchToken(PROCESSOR_NAME, segment))
                                         .filter(Objects::nonNull)
                                         .count();
            Assertions.assertEquals(8L, storedTokens);
        });
        Assertions.assertEquals(8, testSubject.processingStatus().size());
    }

    /**
     * Publishes 100 events and asserts that, within 6 seconds, the lowest position among the stored tokens
     * of segments 0 through 7 equals 100.
     */
    @Test
    void allTokensUpdatedToLatestValue() {
        List<EventMessage> published = IntStream.range(0, 100)
                                                .mapToObj(GenericEventMessage::new)
                                                .collect(Collectors.toList());
        for (EventMessage event : published) {
            stubMessageSource.publishMessage(event);
        }
        mockEventHandlerInvoker();

        testSubject.start();

        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS, () -> Assertions.assertEquals(8, testSubject.processingStatus().size())
        );
        AssertUtils.assertWithin(6, TimeUnit.SECONDS, () -> {
            long slowestSegmentPosition =
                    IntStream.range(0, 8)
                             .mapToObj(segment -> tokenStore.fetchToken(testSubject.getName(), segment))
                             .mapToLong(token -> tokenPosition(token))
                             .min()
                             .orElse(-1L);
            Assertions.assertEquals(100L, slowestSegmentPosition);
        });
    }

    /**
     * Resolves the position of the given token, falling back to {@code 0} when the token is {@code null} or
     * does not expose a position.
     *
     * @param token the token to read the position from, may be {@code null}
     * @return the token's position, or {@code 0} if none is available
     */
    private long tokenPosition(TrackingToken token) {
        if (token == null) {
            return 0L;
        }
        return token.position().orElse(0L);
    }

    /**
     * Stubs the handler to throw on the first invocation for the third event (payload {@code 2}). After all
     * 8 segments are claimed and the events are published, verifies that the handler was invoked for that
     * event on segment 2 and that segment 2 subsequently disappears from the processing status, leaving 7.
     */
    @Test
    void exceptionWhileHandlingEventAbortsWorker() throws Exception {
        List<EventMessage> events = Stream.of(1, 2, 2, 4, 5)
                                          .map(GenericEventMessage::new)
                                          .collect(Collectors.toList());
        EventMessage failingEvent = events.get(2);
        mockEventHandlerInvoker();
        Mockito.doThrow(new RuntimeException("Simulating worker failure"))
               .doNothing()
               .when(stubEventHandler)
               .handle(
                       (EventMessage) Mockito.argThat(em -> em.getIdentifier().equals(failingEvent.getIdentifier())),
                       (Segment) Mockito.any()
               );

        testSubject.start();

        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS, () -> Assertions.assertEquals(8, testSubject.processingStatus().size())
        );
        Assertions.assertEquals(8, tokenStore.fetchSegments(PROCESSOR_NAME).length);
        // Nothing has been published yet, so no event may have been offered to the handler.
        Mockito.verify(stubEventHandler, Mockito.never())
               .canHandle((EventMessage) Mockito.any(), (Segment) Mockito.any());

        for (EventMessage event : events) {
            stubMessageSource.publishMessage((EventMessage<?>) event);
        }

        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            try {
                Mockito.verify(stubEventHandler)
                       .handle(
                               (EventMessage) Mockito.argThat(
                                       em -> em.getIdentifier().equals(failingEvent.getIdentifier())
                               ),
                               (Segment) Mockito.argThat(
                                       s -> s.getSegmentId() == ((Integer) failingEvent.getPayload()).intValue()
                               )
                       );
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            Assertions.assertEquals(7, testSubject.processingStatus().size());
            Assertions.assertFalse(testSubject.processingStatus().containsKey(2));
        });
    }

    /**
     * Configures a claim extension threshold of 10 and a token store whose {@code extendClaim} always
     * throws. Verifies that a claim extension is attempted for segment 0 and that the processing status
     * becomes empty afterwards.
     */
    @Test
    void workPackageIsAbortedWhenExtendingClaimFails() {
        InMemoryTokenStore failingTokenStore = Mockito.spy(this.tokenStore);
        setTestSubject(createTestSubject(
                builder -> builder.tokenStore((TokenStore) failingTokenStore)
                                  .messageSource((StreamableMessageSource) new InMemoryStreamableEventSource(true))
                                  .claimExtensionThreshold(10L)
        ));
        Mockito.doThrow(new MockException("Simulated failure"))
               .when(failingTokenStore)
               .extendClaim((String) Mockito.any(), Mockito.anyInt());

        testSubject.start();

        AssertUtils.assertWithin(
                250, TimeUnit.MILLISECONDS,
                () -> Mockito.verify(failingTokenStore, Mockito.atLeastOnce()).extendClaim(testSubject.getName(), 0)
        );
        AssertUtils.assertWithin(
                100, TimeUnit.MILLISECONDS, () -> Assertions.assertTrue(testSubject.processingStatus().isEmpty())
        );
    }

    /**
     * With a single segment and a handler that rejects {@code Integer} payloads, publishes one
     * {@code Integer} event and asserts that the segment's current position still advances to 1 and that
     * the message source recorded exactly one ignored event.
     */
    @Test
    void handlingUnknownMessageTypeWillAdvanceToken() {
        setTestSubject(createTestSubject(builder -> builder.initialSegmentCount(1)));
        Mockito.when(stubEventHandler.canHandle(Mockito.any(), Mockito.any())).thenReturn(false);
        Mockito.when(stubEventHandler.canHandleType(Integer.class)).thenReturn(false);

        EventMessage<?> unhandledEvent = GenericEventMessage.asEventMessage(1337);
        stubMessageSource.publishMessage(unhandledEvent);

        testSubject.start();

        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS, () -> Assertions.assertEquals(1, testSubject.processingStatus().size())
        );
        AssertUtils.assertWithin(100, TimeUnit.MILLISECONDS, () -> {
            EventTrackerStatus segmentZeroStatus = testSubject.processingStatus().get(0);
            Assertions.assertEquals(1L, segmentZeroStatus.getCurrentPosition().orElse(0L));
        });
        Assertions.assertEquals(1, stubMessageSource.getIgnoredEvents().size());
    }

    /**
     * Initializes two token segments and stores an explicit token for segment 1 only, then asserts that the
     * started processor reports a status for both segments.
     */
    @Test
    void tokenStoreReturningSingleNullToken() {
        Mockito.when(stubEventHandler.canHandle(Mockito.any(), Mockito.any())).thenReturn(false);
        Mockito.when(stubEventHandler.canHandleType(Integer.class)).thenReturn(false);

        tokenStore.initializeTokenSegments(testSubject.getName(), 2);
        tokenStore.storeToken(new GlobalSequenceTrackingToken(0L), testSubject.getName(), 1);

        testSubject.start();

        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS, () -> Assertions.assertEquals(2, testSubject.processingStatus().size())
        );
    }

    /**
     * With a single segment, publishes three {@code Integer} events the handler rejects and two
     * {@code String} events it accepts. Verifies that {@code canHandle} is consulted for all five events,
     * that {@code handle} is invoked only for the two accepted events, and that the message source recorded
     * the three rejected events as ignored.
     * <p>
     * The argument captors and the lists of captured values are typed, so the for-each loops over them
     * type-check instead of iterating a raw {@code List} as {@code EventMessage}.
     */
    @Test
    void eventsWhichMustBeIgnoredAreNotHandledOnlyValidated() throws Exception {
        this.setTestSubject(this.createTestSubject(builder -> builder.initialSegmentCount(1)));
        Mockito.when((Object)this.stubEventHandler.canHandle((EventMessage)Mockito.argThat(argument -> argument != null && Integer.class.equals((Object)argument.getPayloadType())), (Segment)Mockito.any())).thenReturn((Object)false);
        Mockito.when((Object)this.stubEventHandler.canHandle((EventMessage)Mockito.argThat(argument -> argument != null && String.class.equals((Object)argument.getPayloadType())), (Segment)Mockito.any())).thenReturn((Object)true);
        Mockito.when((Object)this.stubEventHandler.canHandleType(Integer.class)).thenReturn((Object)false);
        Mockito.when((Object)this.stubEventHandler.canHandleType(String.class)).thenReturn((Object)true);

        EventMessage<?> eventToIgnoreOne = GenericEventMessage.asEventMessage(1337);
        EventMessage<?> eventToIgnoreTwo = GenericEventMessage.asEventMessage(42);
        EventMessage<?> eventToIgnoreThree = GenericEventMessage.asEventMessage(9001);
        List<Object> eventsToIgnore = Arrays.asList(
                eventToIgnoreOne.getPayload(), eventToIgnoreTwo.getPayload(), eventToIgnoreThree.getPayload()
        );

        EventMessage<?> eventToHandleOne = GenericEventMessage.asEventMessage("some-text");
        EventMessage<?> eventToHandleTwo = GenericEventMessage.asEventMessage("some-other-text");
        List<Object> eventsToHandle = Arrays.asList(eventToHandleOne.getPayload(), eventToHandleTwo.getPayload());

        // Every published event is validated, whether it ends up ignored or handled.
        List<Object> eventsToValidate = new ArrayList<>(eventsToIgnore);
        eventsToValidate.addAll(eventsToHandle);

        this.stubMessageSource.publishMessage(eventToIgnoreOne);
        this.stubMessageSource.publishMessage(eventToIgnoreTwo);
        this.stubMessageSource.publishMessage(eventToIgnoreThree);
        this.stubMessageSource.publishMessage(eventToHandleOne);
        this.stubMessageSource.publishMessage(eventToHandleTwo);

        this.testSubject.start();

        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertEquals(1, this.testSubject.processingStatus().size()));

        ArgumentCaptor<EventMessage> validatedEventCaptor = ArgumentCaptor.forClass(EventMessage.class);
        Mockito.verify(this.stubEventHandler, Mockito.timeout(500L).times(5))
               .canHandle(validatedEventCaptor.capture(), (Segment)Mockito.any());
        List<EventMessage> validatedEvents = validatedEventCaptor.getAllValues();
        Assertions.assertEquals(5, validatedEvents.size());
        for (EventMessage validatedEvent : validatedEvents) {
            Assertions.assertTrue(eventsToValidate.contains(validatedEvent.getPayload()));
        }

        ArgumentCaptor<EventMessage> handledEventsCaptor = ArgumentCaptor.forClass(EventMessage.class);
        Mockito.verify(this.stubEventHandler, Mockito.timeout(500L).times(2))
               .handle(handledEventsCaptor.capture(), (Segment)Mockito.any());
        List<EventMessage> handledEvents = handledEventsCaptor.getAllValues();
        Assertions.assertEquals(2, handledEvents.size());
        for (EventMessage handledEvent : handledEvents) {
            Assertions.assertTrue(eventsToHandle.contains(handledEvent.getPayload()));
        }

        List<TrackedEventMessage<?>> ignoredEvents = this.stubMessageSource.getIgnoredEvents();
        Assertions.assertEquals(3, ignoredEvents.size());
        for (TrackedEventMessage<?> ignoredMessage : ignoredEvents) {
            Assertions.assertTrue(eventsToIgnore.contains(ignoredMessage.getPayload()));
        }
    }

    /**
     * Uses a message source constructed with callback support enabled. After the first four events are
     * processed (lowest current position 4), four more are published and the source's on-available callback
     * is run; the lowest current position is then expected to reach 8.
     */
    @Test
    void coordinationIsTriggeredThroughEventAvailabilityCallback() {
        InMemoryStreamableEventSource callbackSource = new InMemoryStreamableEventSource(true);
        setTestSubject(createTestSubject(
                builder -> builder.messageSource((StreamableMessageSource) callbackSource)
        ));
        mockEventHandlerInvoker();

        Stream.of(0, 1, 2, 3).map(GenericEventMessage::new).forEach(callbackSource::publishMessage);

        testSubject.start();

        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS, () -> Assertions.assertEquals(8, testSubject.processingStatus().size())
        );
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            long slowestPosition = testSubject.processingStatus()
                                              .values()
                                              .stream()
                                              .mapToLong(status -> status.getCurrentPosition().orElse(-1L))
                                              .min()
                                              .orElse(-1L);
            Assertions.assertEquals(4L, slowestPosition);
        });

        Stream.of(4, 5, 6, 7).map(GenericEventMessage::new).forEach(callbackSource::publishMessage);
        callbackSource.runOnAvailableCallback();

        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            long slowestPosition = testSubject.processingStatus()
                                              .values()
                                              .stream()
                                              .mapToLong(status -> status.getCurrentPosition().orElse(-1L))
                                              .min()
                                              .orElse(-1L);
            Assertions.assertEquals(8L, slowestPosition);
        });
    }

    /**
     * Starts the processor, publishes events and waits for at least one status entry. Then shuts down
     * asynchronously, waits for completion, and asserts that the processing status is empty while neither
     * the coordinator nor the worker executor has been shut down.
     */
    @Test
    void shutdownCompletesAfterAbortingWorkPackages() throws InterruptedException, ExecutionException, TimeoutException {
        testSubject.start();
        Stream.of(1, 2, 2, 4, 5).map(GenericEventMessage::new).forEach(stubMessageSource::publishMessage);
        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS, () -> Assertions.assertFalse(testSubject.processingStatus().isEmpty())
        );

        CompletableFuture<?> shutdown = testSubject.shutdownAsync();
        shutdown.get(1L, TimeUnit.SECONDS);

        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS, () -> Assertions.assertEquals(0, testSubject.processingStatus().size())
        );
        Assertions.assertFalse(coordinatorExecutor.isShutdown());
        Assertions.assertFalse(workerExecutor.isShutdown());
    }

    /**
     * Asserts that {@code shutdownAsync()} on a processor that was never started returns a future that is
     * already done.
     */
    @Test
    void shutdownProcessorWhichHasNotStartedYetReturnsCompletedFuture() {
        CompletableFuture<?> shutdown = testSubject.shutdownAsync();

        Assertions.assertTrue(shutdown.isDone());
    }

    /**
     * Asserts that two consecutive {@code shutdownAsync()} calls on a started processor return the very same
     * future instance.
     */
    @Test
    void shutdownProcessorAsyncTwiceReturnsSameFuture() {
        testSubject.start();

        CompletableFuture<?> firstShutdown = testSubject.shutdownAsync();
        CompletableFuture<?> secondShutdown = testSubject.shutdownAsync();

        Assertions.assertSame(firstShutdown, secondShutdown);
    }

    /**
     * Lets the handler wait on a latch (up to 10 ms per invocation), initiates an asynchronous shutdown and
     * asserts that {@code start()} throws an {@link IllegalStateException} right after. Once the shutdown
     * future has completed, starting again must not throw.
     */
    @Test
    void startFailsWhenShutdownIsInProgress() throws Exception {
        Mockito.when(stubEventHandler.canHandle(Mockito.any(), Mockito.any())).thenReturn(true);
        CountDownLatch handlerGate = new CountDownLatch(1);
        Mockito.doAnswer(invocation -> handlerGate.await(10L, TimeUnit.MILLISECONDS))
               .when(stubEventHandler)
               .handle((EventMessage) Mockito.any(), (Segment) Mockito.any());

        testSubject.start();
        Stream.of(1, 2, 2, 4, 5).map(GenericEventMessage::new).forEach(stubMessageSource::publishMessage);
        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS, () -> Assertions.assertFalse(testSubject.processingStatus().isEmpty())
        );

        CompletableFuture<?> shutdownInProgress = testSubject.shutdownAsync();
        Assertions.assertThrows(IllegalStateException.class, () -> testSubject.start());

        handlerGate.countDown();
        shutdownInProgress.get(1L, TimeUnit.SECONDS);

        Assertions.assertDoesNotThrow(() -> testSubject.start());
    }

    /**
     * Asserts that {@code isRunning()} is {@code false} before {@code start()} and {@code true} after it.
     */
    @Test
    void isRunningOnlyReturnsTrueForStartedProcessor() {
        boolean runningBeforeStart = testSubject.isRunning();
        Assertions.assertFalse(runningBeforeStart);

        testSubject.start();

        boolean runningAfterStart = testSubject.isRunning();
        Assertions.assertTrue(runningAfterStart);
    }

    /**
     * Asserts that the processor is not in error before and directly after starting, enters the error state
     * within 500 ms of {@code InMemoryStreamableEventSource.FAIL_EVENT} being published, and leaves it again
     * within 1500 ms once regular events are published.
     */
    @Test
    void isErrorForFailingMessageSourceOperation() {
        Assertions.assertFalse(testSubject.isError());

        testSubject.start();
        Assertions.assertFalse(testSubject.isError());

        stubMessageSource.publishMessage(InMemoryStreamableEventSource.FAIL_EVENT);
        AssertUtils.assertWithin(
                500, TimeUnit.MILLISECONDS, () -> Assertions.assertTrue(testSubject.isError())
        );

        Stream.of(1, 2, 2, 4, 5).map(GenericEventMessage::new).forEach(stubMessageSource::publishMessage);
        AssertUtils.assertWithin(
                1500, TimeUnit.MILLISECONDS, () -> Assertions.assertFalse(testSubject.isError())
        );
    }

    /**
     * Uses a spied message source whose first stubbed {@code openStream} answer throws and whose next one
     * calls the real method. Asserts that the processor reports an error within 500 ms of starting and is
     * no longer in error within 1500 ms.
     * <p>
     * NOTE(review): the events below are published to the shared {@code stubMessageSource}, not to the spied
     * source this processor reads from - confirm whether that is intended.
     */
    @Test
    void isErrorWhenOpeningTheStreamFails() {
        StreamableMessageSource failingOnceSource =
                (StreamableMessageSource) Mockito.spy((Object) new InMemoryStreamableEventSource());
        Mockito.when((Object) failingOnceSource.openStream((TrackingToken) Mockito.any()))
               .thenThrow(new IllegalStateException("Failed to open the stream"))
               .thenCallRealMethod();
        setTestSubject(createTestSubject(builder -> builder.messageSource(failingOnceSource)));

        Assertions.assertFalse(testSubject.isError());

        testSubject.start();
        AssertUtils.assertWithin(
                500, TimeUnit.MILLISECONDS, () -> Assertions.assertTrue(testSubject.isError())
        );

        Stream.of(1, 2, 2, 4, 5).map(GenericEventMessage::new).forEach(stubMessageSource::publishMessage);
        AssertUtils.assertWithin(
                1500, TimeUnit.MILLISECONDS, () -> Assertions.assertFalse(testSubject.isError())
        );
    }

    /**
     * Asserts that the processor returns the identifier its token store provides through
     * {@code retrieveStorageIdentifier()}.
     */
    @Test
    void getTokenStoreIdentifier() {
        String storageIdentifier = "some-identifier";
        TokenStore identifiedTokenStore = Mockito.mock(TokenStore.class);
        Mockito.when((Object) identifiedTokenStore.retrieveStorageIdentifier())
               .thenReturn(Optional.of(storageIdentifier));
        setTestSubject(createTestSubject(builder -> builder.tokenStore(identifiedTokenStore)));

        String actualIdentifier = testSubject.getTokenStoreIdentifier();

        Assertions.assertEquals(storageIdentifier, actualIdentifier);
    }

    /**
     * With one segment and a token claim interval of 500 ms, releases segment 0 once it is claimed. The
     * status for the segment must disappear within the interval plus 50 ms and reappear within twice the
     * interval plus 50 ms.
     */
    @Test
    void releaseSegmentMakesTheTokenUnclaimedForTwiceTheTokenClaimInterval() {
        int segmentId = 0;
        int claimIntervalMillis = 500;
        setTestSubject(createTestSubject(
                builder -> builder.initialSegmentCount(1).tokenClaimInterval((long) claimIntervalMillis)
        ));

        testSubject.start();
        AssertUtils.assertWithin(
                claimIntervalMillis, TimeUnit.MILLISECONDS,
                () -> Assertions.assertNotNull(testSubject.processingStatus().get(segmentId))
        );

        testSubject.releaseSegment(segmentId);

        AssertUtils.assertWithin(
                claimIntervalMillis + 50, TimeUnit.MILLISECONDS,
                () -> Assertions.assertNull(testSubject.processingStatus().get(segmentId))
        );
        AssertUtils.assertWithin(
                claimIntervalMillis * 2 + 50, TimeUnit.MILLISECONDS,
                () -> Assertions.assertNotNull(testSubject.processingStatus().get(segmentId))
        );
    }

    /**
     * With a token store that reports it does not require explicit segment initialization, asserts that
     * {@code splitSegment} returns a future that is already completed exceptionally with an
     * {@link UnsupportedOperationException}.
     * <p>
     * The exception type is checked on the calling thread through {@code get()}. Asserting inside an
     * {@code exceptionally} callback would only fail the discarded derived future, never the test.
     */
    @Test
    void splitSegmentIsNotSupported() {
        TokenStore tokenStoreWhichCannotSplitSegments = Mockito.mock(TokenStore.class);
        Mockito.when(tokenStoreWhichCannotSplitSegments.requiresExplicitSegmentInitialization()).thenReturn(false);
        this.setTestSubject(this.createTestSubject(builder -> builder.tokenStore(tokenStoreWhichCannotSplitSegments)));

        CompletableFuture<Boolean> result = this.testSubject.splitSegment(0);

        Assertions.assertTrue(result.isDone());
        Assertions.assertTrue(result.isCompletedExceptionally());
        // get() wraps the failure in an ExecutionException; the cause is what the processor reported.
        ExecutionException failure = Assertions.assertThrows(ExecutionException.class, () -> result.get());
        Assertions.assertTrue(
                failure.getCause() instanceof UnsupportedOperationException,
                () -> "Expected an UnsupportedOperationException but got: " + failure.getCause()
        );
    }

    /**
     * Splits the single initial segment of a running processor and expects the split to complete normally, after
     * which both segment 0 and segment 1 appear in the processing status.
     */
    @Test
    void splitSegment() {
        int segmentToSplit = 0;
        int segmentFromSplit = 1;
        int claimIntervalMillis = 500;
        this.setTestSubject(this.createTestSubject(config -> config.initialSegmentCount(1).tokenClaimInterval((long)claimIntervalMillis)));
        this.testSubject.start();

        // The segment must be in progress before the split is requested.
        AssertUtils.assertWithin(claimIntervalMillis, TimeUnit.MILLISECONDS, () -> Assertions.assertNotNull(this.testSubject.processingStatus().get(segmentToSplit)));

        CompletableFuture splitResult = this.testSubject.splitSegment(segmentToSplit);

        AssertUtils.assertWithin(claimIntervalMillis * 2, TimeUnit.MILLISECONDS, () -> Assertions.assertTrue(splitResult.isDone()));
        Assertions.assertFalse(splitResult.isCompletedExceptionally());

        // Both halves of the split are expected to be picked up.
        AssertUtils.assertWithin(claimIntervalMillis, TimeUnit.MILLISECONDS, () -> Assertions.assertNotNull(this.testSubject.processingStatus().get(segmentToSplit)));
        AssertUtils.assertWithin(claimIntervalMillis, TimeUnit.MILLISECONDS, () -> Assertions.assertNotNull(this.testSubject.processingStatus().get(segmentFromSplit)));
    }

    /**
     * Merging a segment must fail right away with an {@link UnsupportedOperationException} when the token store
     * reports that it does not require explicit segment initialization.
     */
    @Test
    void mergeSegmentIsNotSupported() {
        TokenStore tokenStoreWhichCannotMergeSegments = (TokenStore)Mockito.mock(TokenStore.class);
        Mockito.when((Object)tokenStoreWhichCannotMergeSegments.requiresExplicitSegmentInitialization()).thenReturn((Object)false);
        this.setTestSubject(this.createTestSubject(builder -> builder.tokenStore(tokenStoreWhichCannotMergeSegments)));

        CompletableFuture<?> result = this.testSubject.mergeSegment(0);

        Assertions.assertTrue(result.isDone());
        Assertions.assertTrue(result.isCompletedExceptionally());
        // Inspect the failure on the test thread. An assertion placed inside exceptionally(...) would only fail the
        // derived future, which nobody observes, so it could never fail this test.
        ExecutionException failure = Assertions.assertThrows(ExecutionException.class, () -> result.get());
        Assertions.assertTrue(failure.getCause() instanceof UnsupportedOperationException, "Unexpected failure cause: " + failure.getCause());
    }

    /**
     * Merges segment 0 of a two-segment processor and expects the merge to complete normally, leaving segment 0
     * in the processing status while segment 1 is removed from it.
     */
    @Test
    void mergeSegment() {
        int survivingSegment = 0;
        int absorbedSegment = 1;
        int claimIntervalMillis = 500;
        this.setTestSubject(this.createTestSubject(config -> config.initialSegmentCount(2).tokenClaimInterval((long)claimIntervalMillis)));
        this.testSubject.start();

        // Both segments must be in progress before the merge is requested.
        AssertUtils.assertWithin(claimIntervalMillis, TimeUnit.MILLISECONDS, () -> {
            Assertions.assertNotNull(this.testSubject.processingStatus().get(survivingSegment));
            Assertions.assertNotNull(this.testSubject.processingStatus().get(absorbedSegment));
        });

        CompletableFuture mergeResult = this.testSubject.mergeSegment(survivingSegment);

        AssertUtils.assertWithin(claimIntervalMillis * 2, TimeUnit.MILLISECONDS, () -> Assertions.assertTrue(mergeResult.isDone()));
        Assertions.assertFalse(mergeResult.isCompletedExceptionally());

        AssertUtils.assertWithin(claimIntervalMillis, TimeUnit.MILLISECONDS, () -> Assertions.assertNotNull(this.testSubject.processingStatus().get(survivingSegment)));
        AssertUtils.assertWithin(claimIntervalMillis, TimeUnit.MILLISECONDS, () -> Assertions.assertNull(this.testSubject.processingStatus().get(absorbedSegment)));
    }

    /**
     * Releases segment 0 for 180 seconds, checks that only one segment remains in the processing status, then
     * claims the segment explicitly and checks that both segments are processed again.
     */
    @Test
    void releaseAndClaimSegment() {
        int segment = 0;
        int claimIntervalMillis = 5000;
        this.setTestSubject(this.createTestSubject(config -> config.initialSegmentCount(2).tokenClaimInterval((long)claimIntervalMillis)));
        this.testSubject.start();
        AssertUtils.assertWithin(claimIntervalMillis, TimeUnit.MILLISECONDS, () -> Assertions.assertNotNull(this.testSubject.processingStatus().get(segment)));

        this.testSubject.releaseSegment(segment, 180L, TimeUnit.SECONDS);
        AssertUtils.assertWithin(claimIntervalMillis, TimeUnit.MILLISECONDS, () -> Assertions.assertEquals(1, this.testSubject.processingStatus().size()));

        this.testSubject.claimSegment(segment);
        AssertUtils.assertWithin(claimIntervalMillis, TimeUnit.MILLISECONDS, () -> Assertions.assertEquals(2, this.testSubject.processingStatus().size()));
    }

    /**
     * The processor's {@code supportsReset()} answer must follow whatever the stubbed event handler reports.
     */
    @Test
    void supportReset() {
        // Handler supports reset -> processor supports reset.
        Mockito.when(this.stubEventHandler.supportsReset()).thenReturn(true);
        Assertions.assertTrue(this.testSubject.supportsReset());

        // Handler no longer supports reset -> neither does the processor.
        Mockito.when(this.stubEventHandler.supportsReset()).thenReturn(false);
        Assertions.assertFalse(this.testSubject.supportsReset());
    }

    /**
     * Resetting tokens on a started processor is expected to throw an {@link IllegalStateException}.
     */
    @Test
    void resetTokensFailsIfTheProcessorIsStillRunning() {
        this.testSubject.start();

        Assertions.assertThrows(IllegalStateException.class, () -> {
            this.testSubject.resetTokens();
        });
    }

    /**
     * Starts a two-segment processor, shuts it down and resets its tokens without a reset context. The event
     * handler must be reset with a {@code null} context and both segments must hold the configured initial token.
     */
    @Test
    void resetTokens() {
        int expectedSegmentCount = 2;
        GlobalSequenceTrackingToken expectedToken = new GlobalSequenceTrackingToken(42L);
        Mockito.when((Object)this.stubEventHandler.supportsReset()).thenReturn((Object)true);
        this.setTestSubject(this.createTestSubject(arg_0 -> PooledStreamingEventProcessorTest.lambda$resetTokens$83(expectedSegmentCount, (TrackingToken)expectedToken, arg_0)));
        this.testSubject.start();
        // Expected value goes first so that a failure reads "expected: <2> but was: <n>".
        AssertUtils.assertWithin(2, TimeUnit.SECONDS, () -> Assertions.assertEquals(expectedSegmentCount, this.tokenStore.fetchSegments(PROCESSOR_NAME).length));
        // Tokens can only be reset on a processor that is not running.
        this.testSubject.shutDown();
        this.testSubject.resetTokens();
        ((EventHandlerInvoker)Mockito.verify((Object)this.stubEventHandler)).performReset(null);
        int[] segments = this.tokenStore.fetchSegments(PROCESSOR_NAME);
        Assertions.assertEquals((Object)expectedToken, (Object)this.tokenStore.fetchToken(PROCESSOR_NAME, segments[0]));
        Assertions.assertEquals((Object)expectedToken, (Object)this.tokenStore.fetchToken(PROCESSOR_NAME, segments[1]));
    }

    /**
     * Same flow as a plain token reset, but passes the reset context {@code "my-context"}: the context must reach
     * the event handler, both segments must hold the initial token, and {@code segmentReleased} must have been
     * invoked twice on the handler.
     */
    @Test
    void resetTokensWithContext() {
        int segmentCount = 2;
        String resetContext = "my-context";
        GlobalSequenceTrackingToken initialToken = new GlobalSequenceTrackingToken(42L);
        Mockito.when((Object)this.stubEventHandler.supportsReset()).thenReturn((Object)true);
        this.setTestSubject(this.createTestSubject(arg_0 -> PooledStreamingEventProcessorTest.lambda$resetTokensWithContext$86(segmentCount, (TrackingToken)initialToken, arg_0)));

        this.testSubject.start();
        Awaitility.await().atMost(Duration.ofSeconds(2L)).untilAsserted(() -> Assertions.assertEquals(segmentCount, this.tokenStore.fetchSegments(PROCESSOR_NAME).length));
        this.testSubject.shutDown();

        this.testSubject.resetTokens((Object)resetContext);

        ((EventHandlerInvoker)Mockito.verify((Object)this.stubEventHandler)).performReset((Object)resetContext);
        int[] storedSegments = this.tokenStore.fetchSegments(PROCESSOR_NAME);
        Assertions.assertEquals((Object)initialToken, (Object)this.tokenStore.fetchToken(PROCESSOR_NAME, storedSegments[0]));
        Assertions.assertEquals((Object)initialToken, (Object)this.tokenStore.fetchToken(PROCESSOR_NAME, storedSegments[1]));
        Awaitility.await().atMost(Duration.ofSeconds(2L)).untilAsserted(() -> ((EventHandlerInvoker)Mockito.verify((Object)this.stubEventHandler, (VerificationMode)Mockito.times((int)2))).segmentReleased((Segment)Mockito.any(Segment.class)));
    }

    /**
     * Resets the tokens of a stopped two-segment processor to the position produced by
     * {@code StreamableMessageSource::createTailToken}. Both segments are expected to hold a replay token built
     * from the initial token with a {@code null} start position, and the handler is reset with a {@code null}
     * context.
     */
    @Test
    void resetTokensFromDefinedPosition() {
        GlobalSequenceTrackingToken testToken = new GlobalSequenceTrackingToken(42L);
        int expectedSegmentCount = 2;
        TrackingToken expectedToken = ReplayToken.createReplayToken((TrackingToken)testToken, null);
        Mockito.when((Object)this.stubEventHandler.supportsReset()).thenReturn((Object)true);
        this.setTestSubject(this.createTestSubject(arg_0 -> PooledStreamingEventProcessorTest.lambda$resetTokensFromDefinedPosition$90(expectedSegmentCount, (TrackingToken)testToken, arg_0)));
        this.testSubject.start();
        // Expected value goes first so that a failure reads "expected: <2> but was: <n>".
        AssertUtils.assertWithin(2, TimeUnit.SECONDS, () -> Assertions.assertEquals(expectedSegmentCount, this.tokenStore.fetchSegments(PROCESSOR_NAME).length));
        // Tokens can only be reset on a processor that is not running.
        this.testSubject.shutDown();
        this.testSubject.resetTokens(StreamableMessageSource::createTailToken);
        ((EventHandlerInvoker)Mockito.verify((Object)this.stubEventHandler)).performReset(null);
        int[] segments = this.tokenStore.fetchSegments(PROCESSOR_NAME);
        Assertions.assertEquals((Object)expectedToken, (Object)this.tokenStore.fetchToken(PROCESSOR_NAME, segments[0]));
        Assertions.assertEquals((Object)expectedToken, (Object)this.tokenStore.fetchToken(PROCESSOR_NAME, segments[1]));
    }

    /**
     * Resets the tokens of a stopped two-segment processor to the position produced by
     * {@code StreamableMessageSource::createTailToken} while passing the reset context {@code "my-context"}.
     * Both segments are expected to hold a replay token carrying that context, and the handler is reset with it.
     */
    @Test
    void resetTokensFromDefinedPositionAndWithResetContext() {
        GlobalSequenceTrackingToken testToken = new GlobalSequenceTrackingToken(42L);
        int expectedSegmentCount = 2;
        String expectedContext = "my-context";
        TrackingToken expectedToken = ReplayToken.createReplayToken((TrackingToken)testToken, null, (Object)expectedContext);
        Mockito.when((Object)this.stubEventHandler.supportsReset()).thenReturn((Object)true);
        this.setTestSubject(this.createTestSubject(arg_0 -> PooledStreamingEventProcessorTest.lambda$resetTokensFromDefinedPositionAndWithResetContext$93(expectedSegmentCount, (TrackingToken)testToken, arg_0)));
        this.testSubject.start();
        // Expected value goes first so that a failure reads "expected: <2> but was: <n>".
        AssertUtils.assertWithin(2, TimeUnit.SECONDS, () -> Assertions.assertEquals(expectedSegmentCount, this.tokenStore.fetchSegments(PROCESSOR_NAME).length));
        // Tokens can only be reset on a processor that is not running.
        this.testSubject.shutDown();
        this.testSubject.resetTokens(StreamableMessageSource::createTailToken, (Object)expectedContext);
        ((EventHandlerInvoker)Mockito.verify((Object)this.stubEventHandler)).performReset((Object)expectedContext);
        int[] segments = this.tokenStore.fetchSegments(PROCESSOR_NAME);
        Assertions.assertEquals((Object)expectedToken, (Object)this.tokenStore.fetchToken(PROCESSOR_NAME, segments[0]));
        Assertions.assertEquals((Object)expectedToken, (Object)this.tokenStore.fetchToken(PROCESSOR_NAME, segments[1]));
    }

    /**
     * Without explicit configuration the processor reports {@link Short#MAX_VALUE} as its maximum capacity.
     */
    @Test
    void maxCapacityDefaultsToShortMax() {
        int defaultCapacity = Short.MAX_VALUE;
        int reportedCapacity = this.testSubject.maxCapacity();
        Assertions.assertEquals(defaultCapacity, reportedCapacity);
    }

    /**
     * A processor built with {@code maxClaimedSegments(500)} reports 500 as its maximum capacity.
     */
    @Test
    void maxCapacityReturnsConfiguredCapacity() {
        int configuredCapacity = 500;
        this.setTestSubject(this.createTestSubject(config -> config.maxClaimedSegments(configuredCapacity)));

        int reportedCapacity = this.testSubject.maxCapacity();
        Assertions.assertEquals(configuredCapacity, reportedCapacity);
    }

    /**
     * Publishes five events to a running processor and expects every segment's status to report a current
     * position of 5 within one second.
     */
    @Test
    void processingStatusIsUpdatedWithTrackingToken() {
        this.testSubject.start();
        this.mockEventHandlerInvoker();

        Stream.of(1, 2, 2, 4, 5)
              .map(GenericEventMessage::new)
              .forEach(this.stubMessageSource::publishMessage);

        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> this.testSubject.processingStatus().values().forEach(segmentStatus -> {
            long position = segmentStatus.getCurrentPosition().orElse(0L);
            Assertions.assertEquals(5L, position);
        }));
    }

    /**
     * Stubs the event handler so that it accepts every payload type and reports it can handle an event only when
     * the event's payload equals the id of the segment it is offered for.
     */
    private void mockEventHandlerInvoker() {
        Mockito.when((Object)this.stubEventHandler.canHandleType((Class)Mockito.any())).thenReturn((Object)true);
        Mockito.when((Object)this.stubEventHandler.canHandle((EventMessage)Mockito.any(), (Segment)Mockito.any())).thenAnswer(invocation -> {
            EventMessage offeredEvent = (EventMessage)invocation.getArgument(0, EventMessage.class);
            Segment offeredSegment = (Segment)invocation.getArgument(1, Segment.class);
            return offeredEvent.getPayload().equals(offeredSegment.getSegmentId());
        });
    }

    /**
     * The builder must reject a {@code null} span factory with an {@link AxonConfigurationException}.
     */
    @Test
    void buildWithNullSpanFactoryThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder unconfigured = PooledStreamingEventProcessor.builder();

        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            unconfigured.spanFactory((SpanFactory)null);
        });
    }

    /**
     * The builder must reject a {@code null} message source with an {@link AxonConfigurationException}.
     */
    @Test
    void buildWithNullMessageSourceThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder unconfigured = PooledStreamingEventProcessor.builder();

        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            unconfigured.messageSource(null);
        });
    }

    /**
     * Building with a token store and transaction manager but no message source must fail with an
     * {@link AxonConfigurationException}.
     */
    @Test
    void buildWithoutMessageSourceThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder incomplete = PooledStreamingEventProcessor.builder()
                .tokenStore((TokenStore)new InMemoryTokenStore())
                .transactionManager((TransactionManager)NoTransactionManager.INSTANCE);

        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            incomplete.build();
        });
    }

    /**
     * The builder must reject a {@code null} token store with an {@link AxonConfigurationException}.
     */
    @Test
    void buildWithNullTokenStoreThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder unconfigured = PooledStreamingEventProcessor.builder();

        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            unconfigured.tokenStore(null);
        });
    }

    /**
     * Building with a name, event handler invoker, message source and transaction manager but no token store
     * must fail with an {@link AxonConfigurationException}.
     */
    @Test
    void buildWithoutTokenStoreThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder incomplete = PooledStreamingEventProcessor.builder()
                .name(PROCESSOR_NAME)
                .eventHandlerInvoker(this.stubEventHandler)
                .messageSource((StreamableMessageSource)this.stubMessageSource)
                .transactionManager((TransactionManager)NoTransactionManager.INSTANCE);

        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            incomplete.build();
        });
    }

    /**
     * The builder must reject a {@code null} transaction manager with an {@link AxonConfigurationException}.
     */
    @Test
    void buildWithNullTransactionManagerThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder unconfigured = PooledStreamingEventProcessor.builder();

        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            unconfigured.transactionManager(null);
        });
    }

    /**
     * Building with a name, event handler invoker, message source and token store but no transaction manager
     * must fail with an {@link AxonConfigurationException}.
     */
    @Test
    void buildWithoutTransactionManagerThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder incomplete = PooledStreamingEventProcessor.builder()
                .name(PROCESSOR_NAME)
                .eventHandlerInvoker(this.stubEventHandler)
                .messageSource((StreamableMessageSource)this.stubMessageSource)
                .tokenStore((TokenStore)new InMemoryTokenStore());

        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            incomplete.build();
        });
    }

    /**
     * The builder must reject a {@code null} coordinator {@link ScheduledExecutorService} with an
     * {@link AxonConfigurationException}.
     */
    @Test
    void buildWithNullCoordinatorExecutorThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder unconfigured = PooledStreamingEventProcessor.builder();

        // The cast selects the executor overload rather than the executor-builder one.
        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            unconfigured.coordinatorExecutor((ScheduledExecutorService)null);
        });
    }

    /**
     * The builder must reject a {@code null} coordinator executor builder {@link Function} with an
     * {@link AxonConfigurationException}.
     */
    @Test
    void buildWithNullCoordinatorExecutorBuilderThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder unconfigured = PooledStreamingEventProcessor.builder();

        // The cast selects the executor-builder overload rather than the executor one.
        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            unconfigured.coordinatorExecutor((Function)null);
        });
    }

    /**
     * Building with a name, event handler invoker, message source, token store and transaction manager but no
     * coordinator executor must fail with an {@link AxonConfigurationException}.
     */
    @Test
    void buildWithoutCoordinatorExecutorThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder incomplete = PooledStreamingEventProcessor.builder()
                .name(PROCESSOR_NAME)
                .eventHandlerInvoker(this.stubEventHandler)
                .messageSource((StreamableMessageSource)this.stubMessageSource)
                .tokenStore((TokenStore)new InMemoryTokenStore())
                .transactionManager(NoTransactionManager.instance());

        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            incomplete.build();
        });
    }

    /**
     * The builder must reject a {@code null} worker {@link ScheduledExecutorService} with an
     * {@link AxonConfigurationException}.
     */
    @Test
    void buildWithNullWorkerExecutorThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder unconfigured = PooledStreamingEventProcessor.builder();

        // The cast selects the executor overload rather than the executor-builder one.
        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            unconfigured.workerExecutor((ScheduledExecutorService)null);
        });
    }

    /**
     * The builder must reject a {@code null} worker executor builder {@link Function} with an
     * {@link AxonConfigurationException}.
     */
    @Test
    void buildWithNullWorkerExecutorBuilderThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder unconfigured = PooledStreamingEventProcessor.builder();

        // The cast selects the executor-builder overload rather than the executor one.
        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            unconfigured.workerExecutor((Function)null);
        });
    }

    /**
     * Building with everything up to and including a coordinator executor, but without a worker executor, must
     * fail with an {@link AxonConfigurationException}.
     */
    @Test
    void buildWithoutWorkerExecutorThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder incomplete = PooledStreamingEventProcessor.builder()
                .name(PROCESSOR_NAME)
                .eventHandlerInvoker(this.stubEventHandler)
                .messageSource((StreamableMessageSource)this.stubMessageSource)
                .tokenStore((TokenStore)new InMemoryTokenStore())
                .transactionManager(NoTransactionManager.instance())
                .coordinatorExecutor(this.coordinatorExecutor);

        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            incomplete.build();
        });
    }

    /**
     * An initial segment count of 0 or -1 must be rejected with an {@link AxonConfigurationException}.
     */
    @Test
    void buildWithZeroOrNegativeInitialSegmentCountThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder unconfigured = PooledStreamingEventProcessor.builder();

        for (int invalidCount : new int[]{0, -1}) {
            Assertions.assertThrows(AxonConfigurationException.class, () -> unconfigured.initialSegmentCount(invalidCount));
        }
    }

    /**
     * The builder must reject a {@code null} initial token function with an {@link AxonConfigurationException}.
     */
    @Test
    void buildWithNullInitialTokenThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder unconfigured = PooledStreamingEventProcessor.builder();

        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            unconfigured.initialToken(null);
        });
    }

    /**
     * A token claim interval of 0 or -1 must be rejected with an {@link AxonConfigurationException}.
     */
    @Test
    void buildWithZeroOrNegativeTokenClaimIntervalThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder unconfigured = PooledStreamingEventProcessor.builder();

        for (long invalidInterval : new long[]{0L, -1L}) {
            Assertions.assertThrows(AxonConfigurationException.class, () -> unconfigured.tokenClaimInterval(invalidInterval));
        }
    }

    /**
     * A maximum number of claimed segments of 0 or -1 must be rejected with an
     * {@link AxonConfigurationException}.
     */
    @Test
    void buildWithZeroOrNegativeMaxCapacityThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder unconfigured = PooledStreamingEventProcessor.builder();

        for (int invalidCapacity : new int[]{0, -1}) {
            Assertions.assertThrows(AxonConfigurationException.class, () -> unconfigured.maxClaimedSegments(invalidCapacity));
        }
    }

    /**
     * A claim extension threshold of 0 or -1 must be rejected with an {@link AxonConfigurationException}.
     */
    @Test
    void buildWithZeroOrNegativeClaimExtensionThresholdThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder unconfigured = PooledStreamingEventProcessor.builder();

        for (long invalidThreshold : new long[]{0L, -1L}) {
            Assertions.assertThrows(AxonConfigurationException.class, () -> unconfigured.claimExtensionThreshold(invalidThreshold));
        }
    }

    /**
     * A batch size of 0 or -1 must be rejected with an {@link AxonConfigurationException}.
     */
    @Test
    void buildWithZeroOrNegativeBatchSizeThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder unconfigured = PooledStreamingEventProcessor.builder();

        for (int invalidBatchSize : new int[]{0, -1}) {
            Assertions.assertThrows(AxonConfigurationException.class, () -> unconfigured.batchSize(invalidBatchSize));
        }
    }

    /**
     * Checks the replay flag before and after a token reset. After the initial run neither the segment status nor
     * the processor reports a replay. After shutting down, resetting the tokens and restarting, the segment
     * status reports a replay, so the processor-level {@code isReplaying()} must report one as well.
     */
    @Test
    void isReplaying() {
        this.mockEventHandlerInvoker();
        Mockito.when((Object)this.stubEventHandler.supportsReset()).thenReturn((Object)true);
        this.setTestSubject(this.createTestSubject(builder -> builder.initialSegmentCount(1)));
        List<EventMessage> events = IntStream.range(0, 100).mapToObj(GenericEventMessage::new).collect(Collectors.toList());
        this.testSubject.start();
        events.forEach(this.stubMessageSource::publishMessage);
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            Assertions.assertEquals((int)1, (int)this.testSubject.processingStatus().size());
            Assertions.assertTrue((boolean)((EventTrackerStatus)this.testSubject.processingStatus().get(0)).isCaughtUp());
            Assertions.assertFalse((boolean)((EventTrackerStatus)this.testSubject.processingStatus().get(0)).isReplaying());
            Assertions.assertFalse((boolean)this.testSubject.isReplaying());
        });
        // Tokens can only be reset on a processor that is not running.
        this.testSubject.shutDown();
        this.testSubject.resetTokens(StreamableMessageSource::createTailToken);
        this.testSubject.start();
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            Assertions.assertEquals((int)1, (int)this.testSubject.processingStatus().size());
            Assertions.assertTrue((boolean)((EventTrackerStatus)this.testSubject.processingStatus().get(0)).isCaughtUp());
            Assertions.assertTrue((boolean)((EventTrackerStatus)this.testSubject.processingStatus().get(0)).isReplaying());
            // The only segment is replaying, so the processor as a whole must report a replay too.
            Assertions.assertTrue((boolean)this.testSubject.isReplaying());
        });
    }

    /**
     * Publishes three events to a processor whose handler is slowed down via {@code mockSlowEventHandler()} and
     * checks that the segment is only reported as caught up at least two seconds after it started processing.
     *
     * @throws Exception if stubbing the slow event handler fails
     */
    @Test
    void isCaughtUpWhenDoneProcessing() throws Exception {
        this.mockSlowEventHandler();
        this.setTestSubject(this.createTestSubject(builder -> builder.initialSegmentCount(1)));
        List<EventMessage> events = IntStream.range(0, 3).mapToObj(GenericEventMessage::new).collect(Collectors.toList());
        events.forEach(this.stubMessageSource::publishMessage);
        this.testSubject.start();
        // Typed as Instant so it can be handed to Duration.between(...), which requires a Temporal.
        AtomicReference<Instant> startedProcessing = new AtomicReference<>();
        AssertUtils.assertWithin(5, TimeUnit.SECONDS, () -> {
            Assertions.assertEquals(1, this.testSubject.processingStatus().size());
            // Only the first successful evaluation records the start time.
            startedProcessing.compareAndSet(null, Instant.now());
        });
        AssertUtils.assertWithin(5, TimeUnit.SECONDS, () -> Assertions.assertTrue(((EventTrackerStatus)this.testSubject.processingStatus().get(0)).isCaughtUp()));
        Instant now = Instant.now();
        Duration processingTime = Duration.between(startedProcessing.get(), now);
        Assertions.assertTrue(processingTime.getSeconds() >= 2L, "Reported as caught up too early, after " + processingTime);
    }

    /**
     * Publishes three events before the processor starts, waits until three units of work reached their clean-up
     * phase and then expects the token stored for segment 0 to be a replay token.
     *
     * @throws Exception if waiting on the latch is interrupted
     */
    @Test
    void existingEventsBeforeProcessorStartAreConsideredReplayed() throws Exception {
        this.setTestSubject(this.createTestSubject(config -> config.initialSegmentCount(1)));
        CountDownLatch cleanedUp = new CountDownLatch(3);
        // Count down once for every unit of work that reaches its clean-up phase.
        this.testSubject.registerHandlerInterceptor((unitOfWork, chain) -> {
            unitOfWork.onCleanup(ignored -> cleanedUp.countDown());
            return chain.proceed();
        });

        IntStream.range(0, 3).mapToObj(GenericEventMessage::new).forEach(this.stubMessageSource::publishMessage);
        this.testSubject.start();

        boolean allCleanedUp = cleanedUp.await(5L, TimeUnit.SECONDS);
        Assertions.assertTrue(allCleanedUp, "Expected Unit of Work to have reached clean up phase");
        TrackingToken storedToken = this.tokenStore.fetchToken(this.testSubject.getName(), 0);
        Assertions.assertTrue(ReplayToken.isReplay((TrackingToken)storedToken), "Not a replay token: " + storedToken);
    }

    /**
     * Publishes two events before and one event after the processor starts, waits until three units of work
     * reached their clean-up phase and then expects the token stored for segment 0 <em>not</em> to be a replay
     * token.
     *
     * @throws Exception if waiting on the latch is interrupted
     */
    @Test
    void eventsPublishedAfterProcessorStartAreNotConsideredReplayed() throws Exception {
        this.setTestSubject(this.createTestSubject(b -> b.initialSegmentCount(1)));
        CountDownLatch countDownLatch = new CountDownLatch(3);
        // Count down once for every unit of work that reaches its clean-up phase.
        this.testSubject.registerHandlerInterceptor((unitOfWork, interceptorChain) -> {
            unitOfWork.onCleanup(uow -> countDownLatch.countDown());
            return interceptorChain.proceed();
        });
        this.stubMessageSource.publishMessage(GenericEventMessage.asEventMessage((Object)0));
        this.stubMessageSource.publishMessage(GenericEventMessage.asEventMessage((Object)1));
        this.testSubject.start();
        this.stubMessageSource.publishMessage(GenericEventMessage.asEventMessage((Object)2));
        Assertions.assertTrue((boolean)countDownLatch.await(5L, TimeUnit.SECONDS), (String)"Expected Unit of Work to have reached clean up phase");
        TrackingToken trackingToken = this.tokenStore.fetchToken(this.testSubject.getName(), 0);
        // This assertion fails when the token IS a replay token, so the message has to say exactly that.
        Assertions.assertFalse((boolean)ReplayToken.isReplay((TrackingToken)trackingToken), (String)("Unexpected replay token: " + trackingToken));
    }

    /**
     * Stubs the event handler's {@code handle} method so that every invocation sleeps for one second before
     * returning {@code null}.
     *
     * @throws Exception declared because the stubbed {@code handle} call declares it
     */
    private void mockSlowEventHandler() throws Exception {
        ((EventHandlerInvoker)Mockito.doAnswer(handleCall -> {
            // Simulate a slow handler.
            Thread.sleep(1000L);
            return null;
        }).when((Object)this.stubEventHandler)).handle((EventMessage)Mockito.any(), (Segment)Mockito.any());
    }

    /**
     * With coordinator claim extension enabled and the handler blocked on a latch, the token store must see an
     * {@code extendClaim} call for segment 0 while no token has been stored yet. Once the handler is released and
     * the segment has caught up, a token must be stored at least once.
     *
     * @throws Exception declared because the stubbed {@code handle} call declares it
     */
    @Test
    void coordinatorExtendsClaimsEarlierForBusyWorkPackages() throws Exception {
        this.setTestSubject(this.createTestSubject(config -> config.initialSegmentCount(1).enableCoordinatorClaimExtension()));
        AtomicBoolean handlerBlocked = new AtomicBoolean(false);
        CountDownLatch releaseHandler = new CountDownLatch(1);
        this.mockEventHandlerInvoker();
        // Block every handle call until the latch is released (or five seconds pass).
        ((EventHandlerInvoker)Mockito.doAnswer(handleCall -> {
            handlerBlocked.set(true);
            return releaseHandler.await(5L, TimeUnit.SECONDS);
        }).when((Object)this.stubEventHandler)).handle((EventMessage)Mockito.any(), (Segment)Mockito.any());
        List<EventMessage> published = IntStream.range(0, 42).mapToObj(GenericEventMessage::new).collect(Collectors.toList());
        published.forEach(this.stubMessageSource::publishMessage);

        this.testSubject.start();

        // Wait until the handler is actually blocked before checking the token store interactions.
        Awaitility.await().pollDelay(Duration.ofMillis(50L)).atMost(Duration.ofSeconds(5L)).until(handlerBlocked::get);
        ((InMemoryTokenStore)Mockito.verify((Object)this.tokenStore, (VerificationMode)Mockito.timeout((long)5000L))).extendClaim(PROCESSOR_NAME, 0);
        ((InMemoryTokenStore)Mockito.verify((Object)this.tokenStore, (VerificationMode)Mockito.never())).storeToken((TrackingToken)Mockito.any(), (String)Mockito.eq((Object)PROCESSOR_NAME), Mockito.eq((int)0));

        // Let the handler continue; the token should be stored once processing proceeds.
        releaseHandler.countDown();
        Awaitility.await().pollDelay(Duration.ofMillis(50L)).atMost(Duration.ofSeconds(5L)).until(() -> ((EventTrackerStatus)this.testSubject.processingStatus().get(0)).isCaughtUp());
        ((InMemoryTokenStore)Mockito.verify((Object)this.tokenStore, (VerificationMode)Mockito.timeout((long)5000L).atLeastOnce())).storeToken((TrackingToken)Mockito.any(), (String)Mockito.eq((Object)PROCESSOR_NAME), Mockito.eq((int)0));
    }

    /**
     * With coordinator claim extension enabled, a token store whose {@code extendClaim} throws, and the handler
     * blocked on a latch, the status of segment 0 must eventually carry an error with the thrown message, while
     * no token has been stored.
     *
     * @throws Exception declared because the stubbed {@code handle} call declares it
     */
    @Test
    void coordinatorExtendingClaimFailsAndAbortsWorkPackage() throws Exception {
        this.setTestSubject(this.createTestSubject(config -> config.initialSegmentCount(1).enableCoordinatorClaimExtension()));
        String failureMessage = "bummer";
        // Every attempt to extend the claim of segment 0 fails.
        ((InMemoryTokenStore)Mockito.doThrow(new RuntimeException(failureMessage)).when((Object)this.tokenStore)).extendClaim(PROCESSOR_NAME, 0);
        AtomicBoolean handlerBlocked = new AtomicBoolean(false);
        CountDownLatch releaseHandler = new CountDownLatch(1);
        this.mockEventHandlerInvoker();
        // Block every handle call until the latch is released (or five seconds pass).
        ((EventHandlerInvoker)Mockito.doAnswer(handleCall -> {
            handlerBlocked.set(true);
            return releaseHandler.await(5L, TimeUnit.SECONDS);
        }).when((Object)this.stubEventHandler)).handle((EventMessage)Mockito.any(), (Segment)Mockito.any());
        List<EventMessage> published = IntStream.range(0, 42).mapToObj(GenericEventMessage::new).collect(Collectors.toList());
        published.forEach(this.stubMessageSource::publishMessage);

        this.testSubject.start();

        // Wait until the handler is actually blocked before checking the token store interactions.
        Awaitility.await().pollDelay(Duration.ofMillis(50L)).atMost(Duration.ofSeconds(5L)).until(handlerBlocked::get);
        ((InMemoryTokenStore)Mockito.verify((Object)this.tokenStore, (VerificationMode)Mockito.timeout((long)5000L))).extendClaim(PROCESSOR_NAME, 0);
        ((InMemoryTokenStore)Mockito.verify((Object)this.tokenStore, (VerificationMode)Mockito.never())).storeToken((TrackingToken)Mockito.any(), (String)Mockito.eq((Object)PROCESSOR_NAME), Mockito.eq((int)0));

        // The failed claim extension must surface as the error of the segment's status.
        Awaitility.await().pollDelay(Duration.ofMillis(50L)).atMost(Duration.ofSeconds(5L)).until(() -> ((EventTrackerStatus)this.testSubject.processingStatus().get(0)).getError().getMessage().equals(failureMessage));
        releaseHandler.countDown();
    }

    // Decompiler-emitted lambda body: configures the segment count and an initial token function that always
    // yields the given token.
    private static /* synthetic */ PooledStreamingEventProcessor.Builder lambda$resetTokensFromDefinedPositionAndWithResetContext$93(int expectedSegmentCount, TrackingToken testToken, PooledStreamingEventProcessor.Builder builder) {
        PooledStreamingEventProcessor.Builder withSegments = builder.initialSegmentCount(expectedSegmentCount);
        return withSegments.initialToken(ignoredSource -> testToken);
    }

    // Decompiler-emitted lambda body: configures the segment count and an initial token function that always
    // yields the given token.
    private static /* synthetic */ PooledStreamingEventProcessor.Builder lambda$resetTokensFromDefinedPosition$90(int expectedSegmentCount, TrackingToken testToken, PooledStreamingEventProcessor.Builder builder) {
        PooledStreamingEventProcessor.Builder withSegments = builder.initialSegmentCount(expectedSegmentCount);
        return withSegments.initialToken(ignoredSource -> testToken);
    }

    // Decompiler-emitted lambda body: configures the segment count and an initial token function that always
    // yields the given token.
    private static /* synthetic */ PooledStreamingEventProcessor.Builder lambda$resetTokensWithContext$86(int expectedSegmentCount, TrackingToken expectedToken, PooledStreamingEventProcessor.Builder builder) {
        PooledStreamingEventProcessor.Builder withSegments = builder.initialSegmentCount(expectedSegmentCount);
        return withSegments.initialToken(ignoredSource -> expectedToken);
    }

    // Decompiler-emitted lambda body: configures the segment count and an initial token function that always
    // yields the given token.
    private static /* synthetic */ PooledStreamingEventProcessor.Builder lambda$resetTokens$83(int expectedSegmentCount, TrackingToken expectedToken, PooledStreamingEventProcessor.Builder builder) {
        PooledStreamingEventProcessor.Builder withSegments = builder.initialSegmentCount(expectedSegmentCount);
        return withSegments.initialToken(ignoredSource -> expectedToken);
    }
}

