/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventsourcing.eventstore;

import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.axonframework.common.AxonThreadFactory;
import org.axonframework.common.transaction.NoOpTransactionManager;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.DefaultEventBusSpanFactory;
import org.axonframework.eventhandling.DomainEventMessage;
import org.axonframework.eventhandling.EventBusSpanFactory;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericTrackedEventMessage;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingEventStream;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore;
import org.axonframework.eventsourcing.eventstore.EventStorageEngine;
import org.axonframework.eventsourcing.utils.EventStoreTestUtils;
import org.axonframework.eventsourcing.utils.MockException;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.messaging.unitofwork.DefaultUnitOfWork;
import org.axonframework.messaging.unitofwork.UnitOfWork;
import org.axonframework.tracing.SpanFactory;
import org.axonframework.tracing.TestSpanFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.testcontainers.shaded.org.awaitility.Awaitility;

public abstract class EmbeddedEventStoreTest {
    // Default configuration of the event store under test. The values match the arguments that
    // setUp() hands to newTestSubject(cachedEvents, fetchDelay, cleanupDelay, optimizeEventConsumption).
    private static final int CACHED_EVENTS = 10;
    // NOTE(review): presumably milliseconds -- confirm against EmbeddedEventStore.Builder#fetchDelay.
    private static final long FETCH_DELAY = 1000L;
    // NOTE(review): presumably milliseconds -- confirm against EmbeddedEventStore.Builder#cleanupDelay.
    private static final long CLEANUP_DELAY = 10000L;
    private static final boolean OPTIMIZE_EVENT_CONSUMPTION = true;
    // The event store being tested; (re)created by newTestSubject().
    private EmbeddedEventStore testSubject;
    // Assigned from getTransactionManager(); protected so subclasses can reach it.
    protected TransactionManager transactionManager;
    // Mockito spy wrapping the engine returned by createStorageEngine().
    private EventStorageEngine storageEngine;
    // Mockito spy wrapping an AxonThreadFactory, so thread creation can be verified.
    private ThreadFactory threadFactory;
    // Recording span factory used by the span-related assertions.
    private TestSpanFactory spanFactory;

    /**
     * Prepares a fresh fixture for every test: a recording span factory, the transaction manager, spied
     * versions of the storage engine and thread factory, and a test subject built with the default settings.
     */
    @BeforeEach
    void setUp() {
        this.spanFactory = new TestSpanFactory();
        this.transactionManager = this.getTransactionManager();

        // Spies keep the real behaviour while allowing interaction verification and selective stubbing.
        EventStorageEngine realEngine = this.createStorageEngine();
        this.storageEngine = Mockito.spy(realEngine);
        ThreadFactory realThreadFactory = new AxonThreadFactory(EmbeddedEventStore.class.getSimpleName());
        this.threadFactory = Mockito.spy(realThreadFactory);

        this.newTestSubject(CACHED_EVENTS, FETCH_DELAY, CLEANUP_DELAY, OPTIMIZE_EVENT_CONSUMPTION);
    }

    /**
     * Supplies the storage engine the event store under test is built on. {@code setUp()} calls this once per
     * test and wraps the result in a Mockito spy.
     *
     * @return the storage engine implementation to test against
     */
    public abstract EventStorageEngine createStorageEngine();

    /**
     * Creates a new {@link NoOpTransactionManager}, stores it in {@link #transactionManager} and returns it.
     *
     * @return the newly created transaction manager
     */
    public TransactionManager getTransactionManager() {
        TransactionManager noOpManager = new NoOpTransactionManager();
        this.transactionManager = noOpManager;
        return noOpManager;
    }

    /**
     * Replaces the test subject with a newly built {@link EmbeddedEventStore}, shutting down the previous
     * instance first when there is one.
     *
     * @param cachedEvents             value passed to the builder's {@code cachedEvents}
     * @param fetchDelay               value passed to the builder's {@code fetchDelay}
     * @param cleanupDelay             value passed to the builder's {@code cleanupDelay}
     * @param optimizeEventConsumption value passed to the builder's {@code optimizeEventConsumption}
     */
    private void newTestSubject(int cachedEvents, long fetchDelay, long cleanupDelay, boolean optimizeEventConsumption) {
        if (this.testSubject != null) {
            this.testSubject.shutDown();
        }
        EventBusSpanFactory busSpanFactory = DefaultEventBusSpanFactory.builder()
                                                                       .spanFactory(this.spanFactory)
                                                                       .build();
        this.testSubject = EmbeddedEventStore.builder()
                                             .storageEngine(this.storageEngine)
                                             .cachedEvents(cachedEvents)
                                             .fetchDelay(fetchDelay)
                                             .cleanupDelay(cleanupDelay)
                                             .threadFactory(this.threadFactory)
                                             .optimizeEventConsumption(optimizeEventConsumption)
                                             .spanFactory(busSpanFactory)
                                             .build();
    }

    /**
     * Cleans up after every test.
     * <p>
     * Several tests start a unit of work and only complete it at the very end. When such a test fails halfway,
     * the unit of work would remain registered as current and interfere with tests that run afterwards, so any
     * leftovers are rolled back here. The event store is shut down regardless of the outcome of that cleanup.
     */
    @AfterEach
    void tearDown() {
        try {
            while (CurrentUnitOfWork.isStarted()) {
                CurrentUnitOfWork.get().rollback();
            }
        } finally {
            this.testSubject.shutDown();
        }
    }

    /**
     * An event published before the stream is opened must be readable from a stream opened with a
     * {@code null} token, with the same identifier, payload and aggregate identifier.
     */
    @Test
    void existingEventIsPassedToReader() throws Exception {
        DomainEventMessage<String> published = EventStoreTestUtils.createEvent();
        this.testSubject.publish(published);

        TrackingEventStream eventStream = this.testSubject.openStream(null);
        Assertions.assertTrue(eventStream.hasNextAvailable());

        TrackedEventMessage<?> received = eventStream.nextAvailable();
        Assertions.assertEquals(published.getIdentifier(), received.getIdentifier());
        Assertions.assertEquals(published.getPayload(), received.getPayload());
        Assertions.assertTrue(received instanceof DomainEventMessage);
        Assertions.assertEquals(published.getAggregateIdentifier(),
                                ((DomainEventMessage<?>) received).getAggregateIdentifier());
    }

    /**
     * A reader blocked on an already opened, empty stream must receive an event that is published afterwards,
     * within the 100 ms test timeout (a tenth of the 1000 fetch delay configured in {@code setUp()}).
     * <p>
     * The reader thread only records what it received. The comparison is done on the test thread after
     * {@code join()}: an assertion error thrown inside the reader thread would merely terminate that thread and
     * could never fail the test.
     */
    @Test
    @Timeout(value=100L, unit=TimeUnit.MILLISECONDS)
    void eventPublishedAfterOpeningStreamIsPassedToReaderImmediately() throws Exception {
        TrackingEventStream stream = this.testSubject.openStream(null);
        Assertions.assertFalse(stream.hasNextAvailable());
        DomainEventMessage<String> expected = EventStoreTestUtils.createEvent();

        List<String> receivedIdentifiers = new CopyOnWriteArrayList<>();
        Thread reader = new Thread(() -> {
            try {
                receivedIdentifiers.add(stream.nextAvailable().getIdentifier());
            } catch (InterruptedException e) {
                // Restore the interrupt flag; the list stays empty, which fails the size assertion below.
                Thread.currentThread().interrupt();
            }
        });
        reader.start();
        this.testSubject.publish(expected);
        reader.join();

        Assertions.assertEquals(1, receivedIdentifiers.size(), "Reader thread did not receive an event");
        Assertions.assertEquals(expected.getIdentifier(), receivedIdentifiers.get(0));
    }

    /**
     * A reader on an empty store must not see an event during the first 100 ms, and must complete once an
     * event has been published.
     */
    @Test
    @Timeout(value=5L)
    void readingIsBlockedWhenStoreIsEmpty() throws Exception {
        TrackingEventStream eventStream = this.testSubject.openStream(null);
        CountDownLatch eventSeen = new CountDownLatch(1);
        Runnable readFirstEvent =
                () -> eventStream.asStream().findFirst().ifPresent(received -> eventSeen.countDown());
        Thread reader = new Thread(readFirstEvent);
        reader.start();

        // Nothing has been published yet, so the latch must still be closed after the wait.
        boolean seenBeforePublish = eventSeen.await(100L, TimeUnit.MILLISECONDS);
        Assertions.assertFalse(seenBeforePublish);

        this.testSubject.publish(EventStoreTestUtils.createEvent());
        reader.join();
        Assertions.assertEquals(0L, eventSeen.getCount());
    }

    /**
     * With one event stored and a reader asking for two, the reader must consume the first event, then wait,
     * and finish only after a second event has been published.
     */
    @Test
    @Timeout(value=5L)
    void readingIsBlockedWhenEndOfStreamIsReached() throws Exception {
        CountDownLatch remainingEvents = new CountDownLatch(2);
        this.testSubject.publish(EventStoreTestUtils.createEvent());
        TrackingEventStream eventStream = this.testSubject.openStream(null);

        Runnable readTwoEvents =
                () -> eventStream.asStream().limit(2L).forEach(received -> remainingEvents.countDown());
        Thread reader = new Thread(readTwoEvents);
        reader.start();

        // Only the single stored event can have been read at this point.
        boolean bothSeenEarly = remainingEvents.await(100L, TimeUnit.MILLISECONDS);
        Assertions.assertFalse(bothSeenEarly);
        Assertions.assertEquals(1L, remainingEvents.getCount());

        this.testSubject.publish(EventStoreTestUtils.createEvent("unique-aggregate-id", 0L));
        reader.join();
        Assertions.assertFalse(reader.isAlive());
        Assertions.assertEquals(0L, remainingEvents.getCount());
    }

    /**
     * Opening a second stream with the tracking token of the first event must yield the second event.
     */
    @Test
    @Timeout(value=5L)
    void readingCanBeContinuedUsingLastToken() throws Exception {
        List<DomainEventMessage<?>> published = EventStoreTestUtils.createEvents(2);
        this.testSubject.publish(published);

        TrackingEventStream fromStart = this.testSubject.openStream(null);
        TrackedEventMessage<?> first = fromStart.nextAvailable();
        TrackingToken tokenOfFirst = first.trackingToken();

        TrackingEventStream fromFirstToken = this.testSubject.openStream(tokenOfFirst);
        TrackedEventMessage<?> second = fromFirstToken.nextAvailable();

        Assertions.assertEquals(published.get(0).getIdentifier(), first.getIdentifier());
        Assertions.assertEquals(published.get(1).getIdentifier(), second.getIdentifier());
    }

    /**
     * After a reader has consumed two events, a new stream opened at the first event's token must return the
     * very same message instance the first reader received as its second event.
     */
    @Test
    @Timeout(value=5L)
    void eventIsFetchedFromCacheWhenFetchedASecondTime() throws Exception {
        CountDownLatch remainingEvents = new CountDownLatch(2);
        List<TrackedEventMessage<?>> consumed = new CopyOnWriteArrayList<>();

        Thread reader = new Thread(() -> {
            TrackingEventStream eventStream = this.testSubject.openStream(null);
            eventStream.asStream().limit(2L).forEach(received -> {
                remainingEvents.countDown();
                consumed.add(received);
            });
        });
        reader.start();

        // Nothing was published yet, so the reader cannot have finished.
        boolean doneBeforePublish = remainingEvents.await(100L, TimeUnit.MILLISECONDS);
        Assertions.assertFalse(doneBeforePublish);

        this.testSubject.publish(EventStoreTestUtils.createEvents(2));
        reader.join();
        Assertions.assertFalse(reader.isAlive());

        TrackingToken tokenOfFirst = consumed.get(0).trackingToken();
        TrackedEventMessage<?> secondAgain = this.testSubject.openStream(tokenOfFirst).nextAvailable();
        Assertions.assertSame(consumed.get(1), secondAgain);
    }

    /**
     * With a fetch delay of 20, an event appended directly to the storage engine (bypassing the event store's
     * {@code publish}) must still reach a waiting reader.
     */
    @Test
    @Timeout(value=5L)
    void periodicPollingWhenEventStorageIsUpdatedIndependently() throws Exception {
        this.newTestSubject(CACHED_EVENTS, 20L, CLEANUP_DELAY, OPTIMIZE_EVENT_CONSUMPTION);
        TrackingEventStream eventStream = this.testSubject.openStream(null);
        CountDownLatch eventSeen = new CountDownLatch(1);

        Runnable readFirstEvent =
                () -> eventStream.asStream().findFirst().ifPresent(received -> eventSeen.countDown());
        Thread reader = new Thread(readFirstEvent);
        reader.start();

        boolean seenBeforeAppend = eventSeen.await(100L, TimeUnit.MILLISECONDS);
        Assertions.assertFalse(seenBeforeAppend);

        // Append straight to the engine, so the store is not told about the new event.
        this.storageEngine.appendEvents(EventStoreTestUtils.createEvent());
        reader.join();
        Assertions.assertFalse(reader.isAlive());

        boolean seenAfterAppend = eventSeen.await(100L, TimeUnit.MILLISECONDS);
        Assertions.assertTrue(seenAfterAppend);
    }

    /**
     * Uses a cleanup delay of 20. The first event must be served without any storage engine interaction; after
     * two further events and a 100 ms pause, the next availability check must read from the storage engine
     * starting at the first event's token.
     */
    @Test
    @Timeout(value=5L)
    void consumerStopsTailingWhenItFallsBehindTheCache() throws Exception {
        this.newTestSubject(CACHED_EVENTS, FETCH_DELAY, 20L, OPTIMIZE_EVENT_CONSUMPTION);
        TrackingEventStream eventStream = this.testSubject.openStream(null);
        Assertions.assertFalse(eventStream.hasNextAvailable());

        this.testSubject.publish(EventStoreTestUtils.createEvents(10));
        Awaitility.await()
                  .pollDelay(Duration.ofMillis(50L))
                  .atMost(Duration.ofMillis(500L))
                  .until(eventStream::hasNextAvailable);

        // Forget interactions so far; reading the first event must not touch the engine.
        Mockito.reset(this.storageEngine);
        TrackedEventMessage<?> head = eventStream.nextAvailable();
        Mockito.verifyNoInteractions(this.storageEngine);

        this.testSubject.publish(EventStoreTestUtils.createEvent(10L), EventStoreTestUtils.createEvent(11L));
        Thread.sleep(100L);

        // From here on the stream is expected to go back to the engine, resuming after the first event.
        Mockito.reset(this.storageEngine);
        Assertions.assertTrue(eventStream.hasNextAvailable());
        Mockito.verify(this.storageEngine).readEvents(head.trackingToken(), false);
    }

    /**
     * Reading an aggregate with 110 published events and no snapshot must return all 110, ending at sequence
     * number 109.
     */
    @Test
    void loadWithoutSnapshot() {
        String aggregateId = UUID.randomUUID().toString();
        this.testSubject.publish(EventStoreTestUtils.createEvents(() -> aggregateId, 110));

        List<? extends DomainEventMessage<?>> loaded =
                this.testSubject.readEvents(aggregateId).asStream().collect(Collectors.toList());

        Assertions.assertEquals(110, loaded.size());
        DomainEventMessage<?> last = loaded.get(loaded.size() - 1);
        Assertions.assertEquals(109L, last.getSequenceNumber());
    }

    /**
     * With 110 events and a snapshot stored at sequence number 30, reading the aggregate must return 80
     * messages running from sequence number 30 to 109.
     */
    @Test
    void loadWithSnapshot() {
        String aggregateId = UUID.randomUUID().toString();
        this.testSubject.publish(EventStoreTestUtils.createEvents(() -> aggregateId, 110));
        this.transactionManager.executeInTransaction(
                () -> this.storageEngine.storeSnapshot(EventStoreTestUtils.createEvent(aggregateId, 30L))
        );

        List<? extends DomainEventMessage<?>> loaded =
                this.testSubject.readEvents(aggregateId).asStream().collect(Collectors.toList());

        Assertions.assertEquals(80, loaded.size());
        DomainEventMessage<?> first = loaded.get(0);
        Assertions.assertEquals(30L, first.getSequenceNumber());
        DomainEventMessage<?> last = loaded.get(loaded.size() - 1);
        Assertions.assertEquals(109L, last.getSequenceNumber());
    }

    /**
     * Stubs the storage engine so that every {@code readEvents(token, false)} call yields a mocked stream whose
     * iterator first reports "no element" and afterwards "has element", always handing out a message with
     * {@code GlobalSequenceTrackingToken(1)}. The stream opened on the event store must report nothing
     * available both before and 200 ms after an event is published.
     */
    @Test
    void streamEventsShouldNotReturnDuplicateTokens() throws InterruptedException {
        // cachedEvents = 0, fetchDelay = 1000, cleanupDelay = 1000, optimizeEventConsumption = true
        this.newTestSubject(0, 1000L, 1000L, true);
        Stream mockStream = (Stream)Mockito.mock(Stream.class);
        Iterator mockIterator = (Iterator)Mockito.mock(Iterator.class);
        Mockito.when(mockStream.iterator()).thenReturn((Object)mockIterator);
        // NOTE(review): storageEngine is a spy, so this when(...) form invokes the real readEvents once while stubbing.
        Mockito.when((Object)this.storageEngine.readEvents((TrackingToken)Mockito.any(TrackingToken.class), Mockito.eq((boolean)false))).thenReturn((Object)mockStream);
        // First hasNext() call answers false, every later call answers true.
        Mockito.when((Object)mockIterator.hasNext()).thenAnswer((Answer)new SynchronizedBooleanAnswer(false)).thenAnswer((Answer)new SynchronizedBooleanAnswer(true));
        Mockito.when(mockIterator.next()).thenReturn((Object)new GenericTrackedEventMessage((TrackingToken)new GlobalSequenceTrackingToken(1L), EventStoreTestUtils.createEvent()));
        TrackingEventStream stream = this.testSubject.openStream(null);
        Assertions.assertFalse((boolean)stream.hasNextAvailable());
        this.testSubject.publish(new EventMessage[]{EventStoreTestUtils.createEvent()});
        // Leave time for any background fetching before checking again.
        Thread.sleep(200L);
        Assertions.assertFalse((boolean)stream.hasNextAvailable());
    }

    /**
     * When reading the snapshot throws, the event store must fall back to the full event stream: all 110
     * events, from sequence number 0 to 109.
     * <p>
     * The storage engine is a Mockito spy, so the failure is stubbed with {@code doThrow(..).when(spy)}. The
     * {@code when(spy.readSnapshot(..))} form would execute the real {@code readSnapshot} while stubbing.
     */
    @Test
    void loadWithFailingSnapshot() {
        String aggregateId = UUID.randomUUID().toString();
        this.testSubject.publish(EventStoreTestUtils.createEvents(() -> aggregateId, 110));
        this.transactionManager.executeInTransaction(
                () -> this.storageEngine.storeSnapshot(EventStoreTestUtils.createEvent(aggregateId, 30L))
        );
        Mockito.doThrow(new MockException()).when(this.storageEngine).readSnapshot(aggregateId);

        List<? extends DomainEventMessage<?>> loaded =
                this.testSubject.readEvents(aggregateId).asStream().collect(Collectors.toList());

        Assertions.assertEquals(110, loaded.size());
        Assertions.assertEquals(0L, loaded.get(0).getSequenceNumber());
        Assertions.assertEquals(109L, loaded.get(loaded.size() - 1).getSequenceNumber());
    }

    /**
     * Events published inside a unit of work must be visible to reads performed in that same unit of work:
     * 2 events before publishing the remainder, 10 afterwards.
     */
    @Test
    void loadEventsAfterPublishingInSameUnitOfWork() {
        String aggregateId = UUID.randomUUID().toString();
        List<DomainEventMessage<?>> allEvents = EventStoreTestUtils.createEvents(() -> aggregateId, 10);
        this.testSubject.publish(allEvents.subList(0, 2));

        Runnable publishRestAndCount = () -> {
            long visibleBefore = this.testSubject.readEvents(aggregateId).asStream().count();
            Assertions.assertEquals(2L, visibleBefore);
            this.testSubject.publish(allEvents.subList(2, allEvents.size()));
            long visibleAfter = this.testSubject.readEvents(aggregateId).asStream().count();
            Assertions.assertEquals(10L, visibleAfter);
        };
        DefaultUnitOfWork.startAndGet(null).execute(publishRestAndCount);
    }

    /**
     * Same scenario as reading inside the publishing unit of work, but the second read starts at sequence
     * number 2 and must therefore return the 8 events published within the unit of work.
     */
    @Test
    void loadEventsWithOffsetAfterPublishingInSameUnitOfWork() {
        String aggregateId = UUID.randomUUID().toString();
        List<DomainEventMessage<?>> allEvents = EventStoreTestUtils.createEvents(() -> aggregateId, 10);
        this.testSubject.publish(allEvents.subList(0, 2));

        Runnable publishRestAndCount = () -> {
            long visibleBefore = this.testSubject.readEvents(aggregateId).asStream().count();
            Assertions.assertEquals(2L, visibleBefore);
            this.testSubject.publish(allEvents.subList(2, allEvents.size()));
            long visibleFromOffset = this.testSubject.readEvents(aggregateId, 2L).asStream().count();
            Assertions.assertEquals(8L, visibleFromOffset);
        };
        DefaultUnitOfWork.startAndGet(null).execute(publishRestAndCount);
    }

    /**
     * Events published within a unit of work must only be visible while that unit of work is the current one:
     * 2 events with it cleared, 10 with it set again, and 2 once it has been rolled back.
     */
    @Test
    void eventsAppendedInvisibleUntilUnitOfWorkIsCommitted() {
        String aggregateId = UUID.randomUUID().toString();
        List<DomainEventMessage<?>> allEvents = EventStoreTestUtils.createEvents(() -> aggregateId, 10);
        this.testSubject.publish(allEvents.subList(0, 2));

        UnitOfWork<?> uow = DefaultUnitOfWork.startAndGet(null);
        this.testSubject.publish(allEvents.subList(2, allEvents.size()));

        // Detach the unit of work: only the two events published up front may be seen.
        CurrentUnitOfWork.clear(uow);
        long visibleOutside = this.testSubject.readEvents(aggregateId).asStream().count();
        Assertions.assertEquals(2L, visibleOutside);

        // Re-attach it: the staged events are visible again.
        CurrentUnitOfWork.set(uow);
        long visibleInside = this.testSubject.readEvents(aggregateId).asStream().count();
        Assertions.assertEquals(10L, visibleInside);

        uow.rollback();
        long visibleAfterRollback = this.testSubject.readEvents(aggregateId).asStream().count();
        Assertions.assertEquals(2L, visibleAfterRollback);
    }

    /**
     * Publishing inside a unit of work must complete and propagate a DISPATCH-typed
     * {@code EventBus.publishEvent} span per event, while the INTERNAL-typed {@code EventBus.commitEvents} span
     * must not start before the unit of work is committed.
     */
    @Test
    void appendEventsCreatesCorrectSpans() {
        List<DomainEventMessage<?>> published = EventStoreTestUtils.createEvents(10);
        DefaultUnitOfWork.startAndGet(null);
        this.testSubject.publish(published);

        for (DomainEventMessage<?> event : published) {
            this.spanFactory.verifySpanCompleted("EventBus.publishEvent", event);
            this.spanFactory.verifySpanPropagated("EventBus.publishEvent", event);
            this.spanFactory.verifySpanHasType("EventBus.publishEvent", TestSpanFactory.TestSpanType.DISPATCH);
        }

        // The commit span only appears once the unit of work is actually committed.
        this.spanFactory.verifyNotStarted("EventBus.commitEvents");
        CurrentUnitOfWork.commit();
        this.spanFactory.verifySpanCompleted("EventBus.commitEvents");
        this.spanFactory.verifySpanHasType("EventBus.commitEvents", TestSpanFactory.TestSpanType.INTERNAL);
    }

    /**
     * Publishes 2 events up front, 2 in an outer and 6 in a nested unit of work, then checks from handlers
     * registered on both units of work (prepare-commit, after-commit, commit) that exactly 10 events are read
     * for the aggregate.
     */
    @Test
    void stagedEventsNotDuplicatedAfterCommit() {
        String aggregateId = UUID.randomUUID().toString();
        List<DomainEventMessage<?>> allEvents = EventStoreTestUtils.createEvents(() -> aggregateId, 10);
        this.testSubject.publish(allEvents.subList(0, 2));

        DefaultUnitOfWork outer = DefaultUnitOfWork.startAndGet(null);
        this.testSubject.publish(allEvents.subList(2, 4));

        DefaultUnitOfWork inner = DefaultUnitOfWork.startAndGet(null);
        this.testSubject.publish(allEvents.subList(4, allEvents.size()));

        // Raw UnitOfWork on purpose: the same handler is registered on both raw-typed units of work.
        Consumer<UnitOfWork> verifyTenEventsVisible = unitOfWork -> {
            long visible = this.testSubject.readEvents(aggregateId).asStream().count();
            Assertions.assertEquals(10L, visible);
        };

        inner.onPrepareCommit(verifyTenEventsVisible);
        inner.afterCommit(verifyTenEventsVisible);
        inner.onCommit(verifyTenEventsVisible);

        outer.onPrepareCommit(verifyTenEventsVisible);
        outer.afterCommit(verifyTenEventsVisible);
        outer.onCommit(verifyTenEventsVisible);

        inner.commit();
        outer.commit();
    }

    /**
     * After a blocked reader has been served a newly published event, the configured thread factory must have
     * been asked for at least one thread.
     */
    @Test
    @Timeout(value=5L)
    void customThreadFactoryIsUsed() throws Exception {
        TrackingEventStream eventStream = this.testSubject.openStream(null);
        CountDownLatch eventSeen = new CountDownLatch(1);

        Runnable readFirstEvent =
                () -> eventStream.asStream().findFirst().ifPresent(received -> eventSeen.countDown());
        Thread reader = new Thread(readFirstEvent);
        reader.start();

        boolean seenBeforePublish = eventSeen.await(100L, TimeUnit.MILLISECONDS);
        Assertions.assertFalse(seenBeforePublish);

        this.testSubject.publish(EventStoreTestUtils.createEvent());
        reader.join();
        Assertions.assertFalse(reader.isAlive());
        Assertions.assertEquals(0L, eventSeen.getCount());

        Mockito.verify(this.threadFactory, Mockito.atLeastOnce()).newThread(Mockito.any(Runnable.class));
    }

    /**
     * With the default (optimized) configuration, five published events must become available on an open
     * stream, the stream must be drainable, and the thread factory must have been used at least once.
     */
    @Test
    void openStreamReadsEventsFromAnEventProducedByVerifyThreadFactoryOperation() throws InterruptedException {
        TrackingEventStream openedStream = this.testSubject.openStream(null);
        Assertions.assertFalse(openedStream.hasNextAvailable());

        this.testSubject.publish(EventStoreTestUtils.createEvents(5));
        Thread.sleep(100L);
        Assertions.assertTrue(openedStream.hasNextAvailable());

        // Drain everything that is currently available.
        while (openedStream.hasNextAvailable()) {
            openedStream.nextAvailable();
        }
        Assertions.assertFalse(openedStream.hasNextAvailable());

        Mockito.verify(this.threadFactory, Mockito.atLeastOnce()).newThread(Mockito.any(Runnable.class));
    }

    /**
     * With event consumption optimization disabled, draining a stream must not cause any interaction with the
     * thread factory.
     */
    @Test
    void tailingConsumptionThreadIsNeverCreatedIfEventConsumptionOptimizationIsSwitchedOff() throws InterruptedException {
        // Same settings as the default subject, except that the optimization flag is off.
        this.newTestSubject(CACHED_EVENTS, FETCH_DELAY, CLEANUP_DELAY, false);
        TrackingEventStream openedStream = this.testSubject.openStream(null);
        this.testSubject.publish(EventStoreTestUtils.createEvents(5));

        while (openedStream.hasNextAvailable()) {
            openedStream.nextAvailable();
        }

        Mockito.verifyNoInteractions(this.threadFactory);
    }

    /**
     * With event consumption optimization disabled, an open stream must still report published events as
     * available straight after publishing, and be empty again once drained.
     */
    @Test
    void eventStreamKeepsReturningEventsIfEventConsumptionOptimizationIsSwitchedOff() throws InterruptedException {
        // Same settings as the default subject, except that the optimization flag is off.
        this.newTestSubject(CACHED_EVENTS, FETCH_DELAY, CLEANUP_DELAY, false);
        TrackingEventStream openedStream = this.testSubject.openStream(null);
        Assertions.assertFalse(openedStream.hasNextAvailable());

        this.testSubject.publish(EventStoreTestUtils.createEvents(5));
        Assertions.assertTrue(openedStream.hasNextAvailable());

        while (openedStream.hasNextAvailable()) {
            openedStream.nextAvailable();
        }
        Assertions.assertFalse(openedStream.hasNextAvailable());
    }

    private static class SynchronizedBooleanAnswer
    implements Answer<Boolean> {
        private final boolean answer;

        private SynchronizedBooleanAnswer(boolean answer) {
            this.answer = answer;
        }

        public synchronized Boolean answer(InvocationOnMock invocation) {
            return this.answer;
        }
    }
}

