/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventsourcing;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.axonframework.eventhandling.DomainEventMessage;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventsourcing.AbstractAggregateFactory;
import org.axonframework.eventsourcing.AggregateFactory;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.eventsourcing.EventSourcingRepository;
import org.axonframework.eventsourcing.eventstore.DomainEventStream;
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore;
import org.axonframework.eventsourcing.eventstore.EventStorageEngine;
import org.axonframework.eventsourcing.eventstore.EventStore;
import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine;
import org.axonframework.eventsourcing.utils.StubDomainEvent;
import org.axonframework.messaging.unitofwork.DefaultUnitOfWork;
import org.axonframework.modelling.command.Aggregate;
import org.axonframework.modelling.command.AggregateIdentifier;
import org.axonframework.modelling.command.AggregateLifecycle;
import org.axonframework.modelling.command.ConcurrencyException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;

/**
 * Integration test that has several threads modify the same event-sourced aggregate through an
 * {@link EventSourcingRepository} backed by an in-memory {@link EmbeddedEventStore}, and then
 * verifies that the stored events form a gap-free sequence.
 *
 * <p>The test instance doubles as the {@link Thread.UncaughtExceptionHandler} of the worker
 * threads so that any failure escaping a worker ends up in {@link #uncaughtExceptions}.
 */
public class EventSourcingRepositoryIntegrationTest
implements Thread.UncaughtExceptionHandler {
    /** Number of worker threads that modify the aggregate at the same time. */
    private static final int CONCURRENT_MODIFIERS = 10;
    /** Number of events each worker applies to the aggregate. */
    private static final int OPERATIONS_PER_MODIFIER = 2;
    /** How long the test waits for all workers before dumping diagnostics and failing. */
    private static final long WORKER_TIMEOUT_SECONDS = 30L;

    private EventSourcingRepository<SimpleAggregateRoot> repository;
    private String aggregateIdentifier;
    private EventStore eventStore;
    /** Failures raised by worker threads; written concurrently, hence copy-on-write. */
    private final List<Throwable> uncaughtExceptions = new CopyOnWriteArrayList<>();
    /** Every worker started by this test; only touched by the test thread. */
    private final List<Thread> startedThreads = new ArrayList<>();

    /**
     * Expects every concurrent modification to succeed and the aggregate's last event to carry
     * sequence number {@code 2 * CONCURRENT_MODIFIERS} (the creation event has sequence 0 and
     * each worker appends two more).
     *
     * <p>The JUnit timeout is deliberately longer than {@link #WORKER_TIMEOUT_SECONDS}: if it
     * were shorter, JUnit would abort the test before the latch wait expires and the diagnostic
     * thread dump in {@link #printDiagnosticInformation()} could never be produced.
     */
    @Test
    @Timeout(value = 60, unit = TimeUnit.SECONDS)
    void pessimisticLocking() throws Throwable {
        initializeRepository();
        long lastSequenceNumber = executeConcurrentModifications(CONCURRENT_MODIFIERS);
        Assertions.assertEquals((long) OPERATIONS_PER_MODIFIER * CONCURRENT_MODIFIERS, lastSequenceNumber);
        Assertions.assertEquals(CONCURRENT_MODIFIERS, getSuccessfulModifications());
    }

    /**
     * @return the number of workers that finished without recording a failure
     */
    private int getSuccessfulModifications() {
        return CONCURRENT_MODIFIERS - uncaughtExceptions.size();
    }

    /**
     * Builds the event store and repository under test, creates a single aggregate inside its
     * own unit of work and remembers that aggregate's identifier for the worker threads.
     *
     * @throws Exception if creating the aggregate instance fails
     */
    private void initializeRepository() throws Exception {
        eventStore = EmbeddedEventStore.builder()
                .storageEngine(new InMemoryEventStorageEngine())
                .build();
        repository = EventSourcingRepository.builder(SimpleAggregateRoot.class)
                .aggregateFactory(new SimpleAggregateFactory())
                .eventStore(eventStore)
                .build();

        DefaultUnitOfWork<?> uow = DefaultUnitOfWork.startAndGet(null);
        Aggregate<SimpleAggregateRoot> aggregate = repository.newInstance(SimpleAggregateRoot::new);
        uow.commit();

        aggregateIdentifier = aggregate.invoke(SimpleAggregateRoot::getIdentifier);
    }

    /**
     * Starts {@code concurrentModifiers} workers, releases them simultaneously, waits for them
     * to finish and then reads back the aggregate's events.
     *
     * @param concurrentModifiers number of worker threads to start
     * @return the sequence number of the last stored event, or {@code -1} if none were stored
     * @throws Throwable the first recorded worker failure that is not a
     *                   {@link ConcurrencyException}
     */
    private long executeConcurrentModifications(int concurrentModifiers) throws Throwable {
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch threadsDone = new CountDownLatch(concurrentModifiers);
        for (int i = 0; i < concurrentModifiers; i++) {
            prepareAggregateModifier(startSignal, threadsDone, repository, aggregateIdentifier);
        }
        // Release all workers at once to maximise contention on the aggregate.
        startSignal.countDown();
        if (!threadsDone.await(WORKER_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            printDiagnosticInformation();
            Assertions.fail("Thread found to be alive after timeout. It might be hanging");
        }

        // ConcurrencyExceptions are tolerated here; they are accounted for by
        // getSuccessfulModifications(). Anything else is a genuine failure.
        for (Throwable failure : uncaughtExceptions) {
            if (!(failure instanceof ConcurrencyException)) {
                throw failure;
            }
        }

        DomainEventStream committedEvents = eventStore.readEvents(aggregateIdentifier);
        long lastSequenceNumber = -1L;
        while (committedEvents.hasNext()) {
            DomainEventMessage<?> nextEvent = committedEvents.next();
            lastSequenceNumber++;
            Assertions.assertEquals(lastSequenceNumber, nextEvent.getSequenceNumber(),
                    "Events are not stored sequentially. Most likely due to unlocked concurrent access.");
        }
        return lastSequenceNumber;
    }

    /**
     * Prints the name and current stack trace of every worker that has not terminated yet.
     * Workers that already finished are skipped, since reporting them as "not shut down" would
     * be misleading.
     */
    private void printDiagnosticInformation() {
        for (Thread worker : startedThreads) {
            if (worker.getState() == Thread.State.TERMINATED) {
                continue;
            }
            System.out.println("## Thread [" + worker.getName()
                    + "] did not properly shut down during Locking test. ##");
            for (StackTraceElement frame : worker.getStackTrace()) {
                System.out.println(" - " + frame);
            }
            System.out.println();
        }
    }

    /**
     * Creates and starts a worker that waits for {@code awaitFor}, loads the aggregate, applies
     * two operations inside one unit of work and commits.
     *
     * <p>Any failure is added to {@link #uncaughtExceptions} <em>before</em> {@code reportDone}
     * is counted down. Relying on the uncaught-exception handler alone is racy: the
     * {@code finally} block runs before the handler is invoked, so the test thread could wake
     * up and inspect the failure list before the failure has been recorded.
     *
     * @param awaitFor            latch the worker blocks on before doing any work
     * @param reportDone          latch counted down once the worker has finished, successfully or not
     * @param repository          repository used to load the aggregate
     * @param aggregateIdentifier identifier of the aggregate to modify
     * @return the started thread
     */
    private Thread prepareAggregateModifier(CountDownLatch awaitFor, CountDownLatch reportDone, EventSourcingRepository<SimpleAggregateRoot> repository, String aggregateIdentifier) {
        Thread worker = new Thread(() -> {
            try {
                awaitFor.await();
                DefaultUnitOfWork<?> uow = DefaultUnitOfWork.startAndGet(null);
                Aggregate<SimpleAggregateRoot> aggregate = repository.load(aggregateIdentifier, null);
                aggregate.execute(SimpleAggregateRoot::doOperation);
                aggregate.execute(SimpleAggregateRoot::doOperation);
                uow.commit();
            } catch (InterruptedException e) {
                // Preserve the interrupt status and report the interruption as a failure.
                Thread.currentThread().interrupt();
                uncaughtExceptions.add(new RuntimeException(e));
            } catch (Throwable e) {
                uncaughtExceptions.add(e);
            } finally {
                reportDone.countDown();
            }
        });
        // Safety net for anything that still manages to escape the worker body.
        worker.setUncaughtExceptionHandler(this);
        startedThreads.add(worker);
        worker.start();
        return worker;
    }

    /**
     * Records a failure that escaped a worker thread.
     *
     * @param t the thread that terminated
     * @param e the throwable that caused the termination
     */
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        uncaughtExceptions.add(e);
    }

    /** Aggregate factory that instantiates {@link SimpleAggregateRoot} with the given identifier. */
    private static class SimpleAggregateFactory
    extends AbstractAggregateFactory<SimpleAggregateRoot> {
        SimpleAggregateFactory() {
            super(SimpleAggregateRoot.class);
        }

        // The parameter is intentionally left as a raw DomainEventMessage so that the signature
        // keeps matching the declaration this method is compiled against.
        public SimpleAggregateRoot doCreateAggregate(String aggregateIdentifier, DomainEventMessage firstEvent) {
            return new SimpleAggregateRoot(aggregateIdentifier);
        }
    }

    /** Minimal aggregate whose only state is its identifier. */
    private static class SimpleAggregateRoot {
        @AggregateIdentifier
        private String identifier;

        /** Creates a new aggregate with a random identifier and applies its first event. */
        private SimpleAggregateRoot() {
            identifier = UUID.randomUUID().toString();
            AggregateLifecycle.apply(new StubDomainEvent());
        }

        /** Creates an aggregate shell for the given identifier without applying any event. */
        private SimpleAggregateRoot(String identifier) {
            this.identifier = identifier;
        }

        /** Applies one {@link StubDomainEvent} to this aggregate. */
        private void doOperation() {
            AggregateLifecycle.apply(new StubDomainEvent());
        }

        /** Copies the aggregate identifier carried by the handled event into this instance. */
        @EventSourcingHandler
        protected void handle(EventMessage<?> event) {
            identifier = ((DomainEventMessage<?>) event).getAggregateIdentifier();
        }

        public String getIdentifier() {
            return identifier;
        }
    }
}

