/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventhandling.pooled;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.axonframework.common.stream.BlockingStream;
import org.axonframework.common.transaction.NoTransactionManager;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.GenericTrackedEventMessage;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.ReplayToken;
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventhandling.pooled.Coordinator;
import org.axonframework.eventhandling.pooled.WorkPackage;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.messaging.StreamableMessageSource;
import org.axonframework.utils.AssertUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

/**
 * Tests for {@link Coordinator} in which every collaborator (token store, executor service, message source
 * and work package) is a Mockito mock, so each test fully controls what the coordination task observes.
 * <p>
 * The executor mock does not run anything by itself: each test stubs {@code submit(Runnable)} with either
 * {@link #runTaskSync()} or {@link #runTaskAsync()} to decide how the submitted task is executed.
 */
class CoordinatorTest {

    private static final String PROCESSOR_NAME = "test";

    private Coordinator testSubject;

    // Segment fixtures. SEGMENT_ZERO is computed against an empty id set, SEGMENT_ONE against SEGMENT_IDS.
    private final Segment SEGMENT_ZERO = Segment.computeSegment(0, new int[0]);
    private final int SEGMENT_ID = 0;
    private final int[] SEGMENT_IDS = {SEGMENT_ID};
    private final Segment SEGMENT_ONE = Segment.computeSegment(SEGMENT_ID, SEGMENT_IDS);
    private final int[] EMPTY_SEGMENT_IDS = {};

    private final TokenStore tokenStore = Mockito.mock(TokenStore.class);
    private final ScheduledThreadPoolExecutor executorService = Mockito.mock(ScheduledThreadPoolExecutor.class);
    // Mockito can only mock the raw class; the assignment to the parameterized type is an unchecked conversion.
    @SuppressWarnings("unchecked")
    private final StreamableMessageSource<TrackedEventMessage<?>> messageSource =
            Mockito.mock(StreamableMessageSource.class);
    private final WorkPackage workPackage = Mockito.mock(WorkPackage.class);

    /**
     * Builds a fresh {@link Coordinator} around the mocked collaborators before every test.
     * The work package factory always hands out the single mocked {@link WorkPackage}, the event filter
     * accepts every event, and the number of claimable segments equals the length of {@code SEGMENT_IDS}.
     */
    @BeforeEach
    void setUp() {
        testSubject = Coordinator.builder()
                                 .name(PROCESSOR_NAME)
                                 .messageSource(messageSource)
                                 .tokenStore(tokenStore)
                                 .transactionManager(NoTransactionManager.instance())
                                 .executorService(executorService)
                                 .workPackageFactory((segment, trackingToken) -> workPackage)
                                 .initialToken(es -> ReplayToken.createReplayToken(es.createHeadToken()))
                                 .eventFilter(eventMessage -> true)
                                 .maxClaimedSegments(SEGMENT_IDS.length)
                                 .build();
    }

    /**
     * With both {@code openStream} and {@code releaseClaim} stubbed to throw, starting the coordinator must
     * still result in exactly one {@code schedule(...)} call on the executor, and must not initialize any
     * token segments.
     */
    @Test
    void ifCoordinationTaskRescheduledAfterTokenReleaseClaimFails() {
        RuntimeException streamOpenException = new RuntimeException("Some exception during event stream open");
        RuntimeException releaseClaimException = new RuntimeException("Some exception during release claim");
        GlobalSequenceTrackingToken token = new GlobalSequenceTrackingToken(0L);

        Mockito.doReturn(SEGMENT_IDS).when(tokenStore).fetchSegments(PROCESSOR_NAME);
        Mockito.doReturn(token).when(tokenStore).fetchToken(ArgumentMatchers.eq(PROCESSOR_NAME), Mockito.anyInt());
        Mockito.doThrow(releaseClaimException)
               .when(tokenStore)
               .releaseClaim(ArgumentMatchers.eq(PROCESSOR_NAME), Mockito.anyInt());
        Mockito.doThrow(streamOpenException).when(messageSource).openStream(ArgumentMatchers.any());
        Mockito.doReturn(CompletableFuture.completedFuture(streamOpenException))
               .when(workPackage)
               .abort(ArgumentMatchers.any());
        Mockito.doReturn(SEGMENT_ZERO).when(workPackage).segment();
        // Run the submitted coordination task on the calling thread so the verifications below are deterministic.
        Mockito.doAnswer(runTaskSync()).when(executorService).submit(ArgumentMatchers.any(Runnable.class));

        testSubject.start();

        Mockito.verify(executorService, Mockito.times(1))
               .schedule(ArgumentMatchers.any(Runnable.class), Mockito.anyLong(), ArgumentMatchers.any(TimeUnit.class));
        Mockito.verify(tokenStore, Mockito.times(0))
               .initializeTokenSegments(ArgumentMatchers.anyString(),
                                        Mockito.anyInt(),
                                        ArgumentMatchers.any(TrackingToken.class));
    }

    /**
     * When the token store reports no segments for the processor, starting the coordinator must call
     * {@code initializeTokenSegments} exactly once (verified with a {@code null} token argument) and
     * schedule exactly one follow-up task on the executor.
     */
    @Test
    void ifCoordinationTaskInitializesTokenStoreWhenNeeded() {
        GlobalSequenceTrackingToken token = new GlobalSequenceTrackingToken(0L);

        Mockito.doReturn(EMPTY_SEGMENT_IDS).when(tokenStore).fetchSegments(PROCESSOR_NAME);
        Mockito.doReturn(token).when(tokenStore).fetchToken(ArgumentMatchers.eq(PROCESSOR_NAME), Mockito.anyInt());
        Mockito.doReturn(SEGMENT_ZERO).when(workPackage).segment();
        // Run the submitted coordination task on the calling thread so the verifications below are deterministic.
        Mockito.doAnswer(runTaskSync()).when(executorService).submit(ArgumentMatchers.any(Runnable.class));

        testSubject.start();

        Mockito.verify(executorService, Mockito.times(1))
               .schedule(ArgumentMatchers.any(Runnable.class), Mockito.anyLong(), ArgumentMatchers.any(TimeUnit.class));
        Mockito.verify(tokenStore, Mockito.times(1))
               .initializeTokenSegments(ArgumentMatchers.anyString(), Mockito.anyInt(), ArgumentMatchers.isNull());
    }

    /**
     * Two events carrying the same tracking token must be handed to the work package in a single
     * {@code scheduleEvents(List)} call, and never through the single-event {@code scheduleEvent} method.
     *
     * @throws InterruptedException declared because stubbing {@code BlockingStream.nextAvailable()} requires it
     */
    @Test
    void ifCoordinationTaskSchedulesEventsWithTheSameTokenTogether() throws InterruptedException {
        TrackingToken testToken = new GlobalSequenceTrackingToken(0L);
        TrackedEventMessage<String> testEventOne =
                new GenericTrackedEventMessage<>(testToken, GenericEventMessage.asEventMessage("this-event"));
        TrackedEventMessage<String> testEventTwo =
                new GenericTrackedEventMessage<>(testToken, GenericEventMessage.asEventMessage("that-event"));
        List<TrackedEventMessage<?>> testEvents = new ArrayList<>();
        testEvents.add(testEventOne);
        testEvents.add(testEventTwo);

        Mockito.when(workPackage.hasRemainingCapacity()).thenReturn(true).thenReturn(false);
        Mockito.when(workPackage.isAbortTriggered()).thenReturn(false);
        Mockito.when(workPackage.scheduleEvents(testEvents)).thenReturn(true);

        // Mockito can only mock the raw class; the assignment to the parameterized type is an unchecked conversion.
        @SuppressWarnings("unchecked")
        BlockingStream<TrackedEventMessage<?>> testStream = Mockito.mock(BlockingStream.class);
        Mockito.when(testStream.setOnAvailableCallback(ArgumentMatchers.any())).thenReturn(false);
        Mockito.when(testStream.hasNextAvailable()).thenReturn(true).thenReturn(true).thenReturn(false);
        Mockito.when(testStream.nextAvailable()).thenReturn(testEventOne).thenReturn(testEventTwo);
        Mockito.when(testStream.peek())
               .thenReturn(Optional.of(testEventTwo))
               .thenReturn(Optional.of(testEventTwo))
               .thenReturn(Optional.empty());

        // The task runs on another thread here, hence the time-bounded assertions further down.
        Mockito.when(executorService.submit(ArgumentMatchers.any(Runnable.class))).thenAnswer(runTaskAsync());
        Mockito.when(tokenStore.fetchSegments(PROCESSOR_NAME)).thenReturn(SEGMENT_IDS);
        Mockito.when(tokenStore.fetchAvailableSegments(PROCESSOR_NAME))
               .thenReturn(Collections.singletonList(SEGMENT_ONE));
        Mockito.when(tokenStore.fetchToken(PROCESSOR_NAME, SEGMENT_ONE)).thenReturn(testToken);
        Mockito.when(messageSource.openStream(testToken)).thenReturn(testStream);

        testSubject.start();

        AssertUtils.assertWithin(
                500, TimeUnit.MILLISECONDS,
                () -> Mockito.verify(tokenStore).fetchToken(PROCESSOR_NAME, SEGMENT_ONE)
        );
        AssertUtils.assertWithin(
                500, TimeUnit.MILLISECONDS,
                () -> Mockito.verify(messageSource).openStream(testToken)
        );

        // forClass only accepts the raw List class; the captor's element type is an unchecked conversion.
        @SuppressWarnings("unchecked")
        ArgumentCaptor<List<TrackedEventMessage<?>>> eventsCaptor = ArgumentCaptor.forClass(List.class);
        AssertUtils.assertWithin(
                500, TimeUnit.MILLISECONDS,
                () -> Mockito.verify(workPackage).scheduleEvents(eventsCaptor.capture())
        );

        List<TrackedEventMessage<?>> resultEvents = eventsCaptor.getValue();
        Assertions.assertEquals(2, resultEvents.size());
        Assertions.assertTrue(resultEvents.contains(testEventOne));
        Assertions.assertTrue(resultEvents.contains(testEventTwo));

        Mockito.verify(workPackage, Mockito.times(0)).scheduleEvent(ArgumentMatchers.any());
    }

    /**
     * Answer for a mocked {@code submit(Runnable)}: runs the submitted task immediately on the calling
     * thread and returns an already-completed future.
     *
     * @return an answer that executes the first invocation argument synchronously
     */
    private Answer<Future<Void>> runTaskSync() {
        return invocationOnMock -> {
            Runnable task = invocationOnMock.getArgument(0);
            task.run();
            return CompletableFuture.completedFuture(null);
        };
    }

    /**
     * Answer for a mocked {@code submit(Runnable)}: hands the submitted task to
     * {@link CompletableFuture#runAsync(Runnable)} and returns the resulting future.
     *
     * @return an answer that executes the first invocation argument asynchronously
     */
    private Answer<Future<Void>> runTaskAsync() {
        return invocationOnMock -> {
            Runnable task = invocationOnMock.getArgument(0);
            return CompletableFuture.runAsync(task);
        };
    }
}

