/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.spring.eventsourcing.benchmark;

import com.mchange.v2.c3p0.ComboPooledDataSource;
import java.beans.PropertyVetoException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.sql.DataSource;
import org.axonframework.common.legacyjpa.EntityManagerProvider;
import org.axonframework.common.legacyjpa.SimpleEntityManagerProvider;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingEventStream;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventsourcing.eventstore.BatchingEventStorageEngine;
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore;
import org.axonframework.eventsourcing.eventstore.EventStorageEngine;
import org.axonframework.eventsourcing.eventstore.legacyjpa.JpaEventStorageEngine;
import org.axonframework.eventsourcing.utils.EventStoreTestUtils;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.TestSerializer;
import org.axonframework.spring.messaging.unitofwork.SpringTransactionManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.EnableMBeanExport;
import org.springframework.jmx.support.RegistrationPolicy;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.JpaVendorAdapter;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.support.PersistenceAnnotationBeanPostProcessor;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;

@ExtendWith(value={SpringExtension.class})
@EnableMBeanExport(registration=RegistrationPolicy.IGNORE_EXISTING)
@ContextConfiguration(classes={TestContext.class})
class JpaStorageEngineInsertionReadOrderTest {
    private final Serializer serializer = TestSerializer.XSTREAM.getSerializer();
    @PersistenceContext
    private EntityManager entityManager;
    @Inject
    private PlatformTransactionManager tx;
    private TransactionTemplate txTemplate;
    private BatchingEventStorageEngine testSubject;

    JpaStorageEngineInsertionReadOrderTest() {
    }

    @BeforeEach
    void setUp() {
        this.txTemplate = new TransactionTemplate(this.tx);
        this.testSubject = JpaEventStorageEngine.builder().snapshotSerializer(this.serializer).eventSerializer(this.serializer).entityManagerProvider((EntityManagerProvider)new SimpleEntityManagerProvider(this.entityManager)).transactionManager((TransactionManager)new SpringTransactionManager(this.tx)).build();
    }

    @AfterEach
    void tearDown() {
        this.txTemplate.execute(ts -> {
            this.entityManager.createQuery("DELETE FROM DomainEventEntry").executeUpdate();
            return null;
        });
    }

    @Test
    @Timeout(value=30L)
    void insertConcurrentlyAndCheckReadOrder() throws Exception {
        int threadCount = 10;
        int eventsPerThread = 100;
        int inverseRollbackRate = 7;
        int rollbacksPerThread = (eventsPerThread + inverseRollbackRate - 1) / inverseRollbackRate;
        int expectedEventCount = threadCount * eventsPerThread - rollbacksPerThread * threadCount;
        Thread[] writerThreads = this.storeEvents(threadCount, eventsPerThread, inverseRollbackRate);
        List<TrackedEventMessage<?>> readEvents = this.readEvents(expectedEventCount);
        for (Thread thread : writerThreads) {
            thread.join();
        }
        Assertions.assertEquals((int)expectedEventCount, (int)readEvents.size(), (String)"The actually read list of events is shorted than the expected value");
    }

    @Test
    @Timeout(value=10L)
    void insertConcurrentlyAndReadUsingBlockingStreams() throws Exception {
        int threadCount = 10;
        int eventsPerThread = 100;
        int inverseRollbackRate = 2;
        int rollbacksPerThread = (eventsPerThread + inverseRollbackRate - 1) / inverseRollbackRate;
        int expectedEventCount = threadCount * eventsPerThread - rollbacksPerThread * threadCount;
        EmbeddedEventStore embeddedEventStore = EmbeddedEventStore.builder().storageEngine((EventStorageEngine)this.testSubject).build();
        Thread[] writerThreads = this.storeEvents(threadCount, eventsPerThread, inverseRollbackRate);
        TrackingEventStream readEvents = embeddedEventStore.openStream(null);
        int counter = 0;
        while (counter < expectedEventCount) {
            if (!readEvents.hasNextAvailable()) continue;
            ++counter;
        }
        for (Thread thread : writerThreads) {
            thread.join();
        }
        Assertions.assertEquals((int)expectedEventCount, (int)counter, (String)"The actually read list of events is shorted than the expected value");
    }

    @Test
    @Timeout(value=30L)
    void insertConcurrentlyAndReadUsingBlockingStreams_SlowConsumer() throws Exception {
        int threadCount = 4;
        int eventsPerThread = 100;
        int inverseRollbackRate = 2;
        int rollbacksPerThread = (eventsPerThread + inverseRollbackRate - 1) / inverseRollbackRate;
        int expectedEventCount = threadCount * eventsPerThread - rollbacksPerThread * threadCount;
        EmbeddedEventStore embeddedEventStore = EmbeddedEventStore.builder().storageEngine((EventStorageEngine)this.testSubject).cachedEvents(20).fetchDelay(100L).cleanupDelay(1000L).build();
        Thread[] writerThreads = this.storeEvents(threadCount, eventsPerThread, inverseRollbackRate);
        TrackingEventStream readEvents = embeddedEventStore.openStream(null);
        int counter = 0;
        while (counter < expectedEventCount) {
            readEvents.nextAvailable();
            if (++counter % 50 != 0) continue;
            Thread.sleep(200L);
        }
        for (Thread thread : writerThreads) {
            thread.join();
        }
        Assertions.assertEquals((int)expectedEventCount, (int)counter, (String)"The actually read list of events is shorted than the expected value");
    }

    private Thread[] storeEvents(int threadCount, int eventsPerThread, int inverseRollbackRate) {
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; ++i) {
            int threadIndex = i;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < eventsPerThread; ++j) {
                    int s = j;
                    try {
                        this.txTemplate.execute(ts -> {
                            this.testSubject.appendEvents(new EventMessage[]{EventStoreTestUtils.createEvent((String)"aggregate", (long)((long)threadIndex * (long)eventsPerThread + (long)s), (String)("Thread" + threadIndex))});
                            if (s % inverseRollbackRate == 0) {
                                throw new RuntimeException("Rolling back on purpose");
                            }
                            try {
                                Thread.sleep(ThreadLocalRandom.current().nextInt(10));
                            }
                            catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            return null;
                        });
                        continue;
                    }
                    catch (Exception exception) {
                        // empty catch block
                    }
                }
            });
            threads[i].start();
        }
        return threads;
    }

    private List<TrackedEventMessage<?>> readEvents(int eventCount) {
        ArrayList result = new ArrayList();
        TrackingToken lastToken = null;
        while (result.size() < eventCount) {
            List batch = this.testSubject.readEvents(lastToken, false).collect(Collectors.toList());
            for (TrackedEventMessage message : batch) {
                result.add(message);
                lastToken = message.trackingToken();
            }
        }
        return result;
    }

    @Configuration
    public static class TestContext {
        @Bean
        public ComboPooledDataSource dataSource() throws PropertyVetoException {
            ComboPooledDataSource dataSource = new ComboPooledDataSource();
            dataSource.setDriverClass("org.hsqldb.jdbcDriver");
            dataSource.setJdbcUrl("jdbc:hsqldb:mem:axontest");
            dataSource.setUser("sa");
            dataSource.setMaxPoolSize(50);
            dataSource.setMinPoolSize(1);
            Properties dataSourceProperties = new Properties();
            dataSourceProperties.setProperty("hsqldb.log_size", "0");
            dataSource.setProperties(dataSourceProperties);
            return dataSource;
        }

        @Bean
        public LocalContainerEntityManagerFactoryBean entityManagerFactory(DataSource dataSource) {
            LocalContainerEntityManagerFactoryBean entityManagerFactory = new LocalContainerEntityManagerFactoryBean();
            entityManagerFactory.setPersistenceUnitName("AxonSpringTest");
            HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
            vendorAdapter.setDatabasePlatform("org.hibernate.dialect.HSQLDialect");
            vendorAdapter.setShowSql(false);
            entityManagerFactory.setJpaVendorAdapter((JpaVendorAdapter)vendorAdapter);
            HashMap<String, Object> jpaProperties = new HashMap<String, Object>();
            jpaProperties.put("javax.persistence.schema-generation.database.action", "drop-and-create");
            jpaProperties.put("hibernate.id.new_generator_mappings", true);
            entityManagerFactory.setJpaPropertyMap(jpaProperties);
            entityManagerFactory.setDataSource(dataSource);
            return entityManagerFactory;
        }

        @Bean
        public JpaTransactionManager transactionManager() {
            return new JpaTransactionManager();
        }

        @Bean
        public PersistenceAnnotationBeanPostProcessor persistenceAnnotationBeanPostProcessor() {
            return new PersistenceAnnotationBeanPostProcessor();
        }
    }
}

