/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventsourcing.eventstore.jdbc;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Clock;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import javax.sql.DataSource;
import org.axonframework.common.jdbc.PersistenceExceptionResolver;
import org.axonframework.common.transaction.NoTransactionManager;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.DomainEventMessage;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GapAwareTrackingToken;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.TrackedEventData;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingEventStream;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventsourcing.eventstore.BatchingEventStorageEngine;
import org.axonframework.eventsourcing.eventstore.BatchingEventStorageEngineTest;
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore;
import org.axonframework.eventsourcing.eventstore.EventStorageEngine;
import org.axonframework.eventsourcing.eventstore.jdbc.EventSchema;
import org.axonframework.eventsourcing.eventstore.jdbc.EventTableFactory;
import org.axonframework.eventsourcing.eventstore.jdbc.HsqlEventTableFactory;
import org.axonframework.eventsourcing.eventstore.jdbc.JdbcEventStorageEngine;
import org.axonframework.eventsourcing.eventstore.jdbc.statements.JdbcEventStorageEngineStatements;
import org.axonframework.eventsourcing.eventstore.jdbc.statements.ReadEventDataForAggregateStatementBuilder;
import org.axonframework.eventsourcing.eventstore.jpa.SQLErrorCodesResolver;
import org.axonframework.eventsourcing.utils.EventStoreTestUtils;
import org.axonframework.eventsourcing.utils.TestSerializer;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.UnknownSerializedType;
import org.hsqldb.jdbc.JDBCDataSource;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.springframework.test.annotation.DirtiesContext;

class JdbcEventStorageEngineTest
extends BatchingEventStorageEngineTest<JdbcEventStorageEngine, JdbcEventStorageEngine.Builder> {
    // HSQLDB data source; setUp() points it at the in-memory URL "jdbc:hsqldb:mem:test".
    private JDBCDataSource dataSource;
    // Resolver built from the data source in setUp() and passed to engines built by createEngine(...).
    private PersistenceExceptionResolver defaultPersistenceExceptionResolver;
    // Engine under test; several tests replace it with a differently configured instance.
    private JdbcEventStorageEngine testSubject;
    // Mockito spy around the aggregate-read statement builder, so tests can verify how often build(...) is invoked.
    private ReadEventDataForAggregateStatementBuilder readForAggregateStatementBuilder;

    // Explicit package-private no-arg constructor with an empty body.
    // NOTE(review): the file header states this source was decompiled with CFR, so this is
    // presumably a decompiler artifact equivalent to the implicit default constructor — confirm.
    JdbcEventStorageEngineTest() {
    }

    /**
     * Builds the fixture used by every test: an HSQLDB data source on the in-memory URL
     * {@code jdbc:hsqldb:mem:test}, the default persistence exception resolver, a spied statement
     * builder for aggregate reads, and an engine wired to that spy.
     *
     * @throws SQLException if constructing the {@link SQLErrorCodesResolver} from the data source fails
     */
    @BeforeEach
    void setUp() throws SQLException {
        dataSource = new JDBCDataSource();
        dataSource.setUrl("jdbc:hsqldb:mem:test");
        defaultPersistenceExceptionResolver = new SQLErrorCodesResolver(dataSource);

        // Delegates straight to the stock statement factory; wrapped in a spy so tests can count invocations.
        // NOTE(review): an anonymous class is used here, presumably because Mockito cannot spy on a lambda — confirm.
        ReadEventDataForAggregateStatementBuilder delegatingBuilder = new ReadEventDataForAggregateStatementBuilder() {
            @Override
            public PreparedStatement build(Connection connection, EventSchema schema, String identifier,
                                           long firstSequenceNumber, int batchSize) throws SQLException {
                return JdbcEventStorageEngineStatements.readEventDataForAggregate(
                        connection, schema, identifier, firstSequenceNumber, batchSize);
            }
        };
        readForAggregateStatementBuilder = Mockito.spy(delegatingBuilder);

        JdbcEventStorageEngine engine =
                createEngine(builder -> builder.readEventDataForAggregate(readForAggregateStatementBuilder));
        testSubject = engine;
        setTestSubject(engine);
    }

    /**
     * Runs the inherited large-aggregate-stream scenario and then checks that the spied statement
     * builder was asked to build a read statement for aggregate {@code "aggregate"} exactly six times.
     */
    @Test
    void testLoadLargeAmountOfEventsFromAggregateStream() {
        super.loadLargeAmountOfEventsFromAggregateStream();

        ReadEventDataForAggregateStatementBuilder verifiedBuilder =
                Mockito.verify(readForAggregateStatementBuilder, Mockito.times(6));
        try {
            verifiedBuilder.build(ArgumentMatchers.any(),
                                  ArgumentMatchers.any(),
                                  ArgumentMatchers.eq("aggregate"),
                                  ArgumentMatchers.anyLong(),
                                  ArgumentMatchers.anyInt());
        } catch (SQLException e) {
            // build(...) declares SQLException; rethrow unchecked because this method declares none.
            throw new RuntimeException(e);
        }
    }

    /**
     * Repeats the inherited large-aggregate-stream scenario with an engine whose final-batch
     * predicate treats any batch smaller than 50 entries as the last one, and expects exactly four
     * statement builds for aggregate {@code "aggregate"}.
     *
     * @throws SQLException declared by the verified {@code build(...)} call
     */
    @Test
    void loadLargeAmountOfEventsFromAggregateStream_WithCustomFinalBatchPredicate() throws SQLException {
        JdbcEventStorageEngine engineWithCustomPredicate = createEngine(
                builder -> builder.finalAggregateBatchPredicate(batch -> batch.size() < 50)
                                  .readEventDataForAggregate(readForAggregateStatementBuilder));
        testSubject = engineWithCustomPredicate;
        setTestSubject(engineWithCustomPredicate);

        super.loadLargeAmountOfEventsFromAggregateStream();

        ReadEventDataForAggregateStatementBuilder verifiedBuilder =
                Mockito.verify(readForAggregateStatementBuilder, Mockito.times(4));
        verifiedBuilder.build(ArgumentMatchers.any(),
                              ArgumentMatchers.any(),
                              ArgumentMatchers.eq("aggregate"),
                              ArgumentMatchers.anyLong(),
                              ArgumentMatchers.anyInt());
    }

    /**
     * Stores two snapshots created with the same sequence number (1) back to back. The test has no
     * assertions; it passes as long as neither call throws.
     */
    @Test
    void storeTwoExactSameSnapshots() {
        for (int attempt = 0; attempt < 2; attempt++) {
            testSubject.storeSnapshot(EventStoreTestUtils.createEvent(1L));
        }
    }

    /**
     * Appends events with sequence numbers 0 and 1 for a random aggregate id and checks that the
     * engine reports 1 as the last sequence number, and reports nothing for an id that was never stored.
     */
    @Test
    void loadLastSequenceNumber() {
        String aggregateId = UUID.randomUUID().toString();
        testSubject.appendEvents(EventStoreTestUtils.createEvent(aggregateId, 0L),
                                 EventStoreTestUtils.createEvent(aggregateId, 1L));

        // -1 stands in for "no sequence number found" so a missing value fails the equality check.
        long lastStored = testSubject.lastSequenceNumberFor(aggregateId).orElse(-1L);
        Assertions.assertEquals(1L, lastStored);

        boolean unknownAggregateHasSequence = testSubject.lastSequenceNumberFor("inexistent").isPresent();
        Assertions.assertFalse(unknownAggregateHasSequence);
    }

    /**
     * Runs the inherited store-and-load scenario against an engine configured with a custom event
     * table name ({@code CustomDomainEvent}), a custom payload column ({@code eventData}), a
     * {@code String} data type, and a table factory whose payload column type is {@code LONGVARCHAR}.
     */
    @Test
    @DirtiesContext
    void customSchemaConfig() {
        EventSchema customSchema = EventSchema.builder()
                                              .eventTable("CustomDomainEvent")
                                              .payloadColumn("eventData")
                                              .build();
        EventTableFactory textPayloadTables = new HsqlEventTableFactory() {
            @Override
            protected String payloadType() {
                return "LONGVARCHAR";
            }
        };

        testSubject = createEngine(builder -> builder.schema(customSchema).dataType(String.class),
                                   textPayloadTables);
        setTestSubject(testSubject);
        storeAndLoadEvents();
    }

    /**
     * Runs the inherited store-and-load scenario against the timestamp-aware engine, with a table
     * factory that declares the timestamp column as SQL type {@code timestamp}.
     */
    @Test
    @DirtiesContext
    void customSchemaConfigTimestampColumn() {
        EventTableFactory timestampColumnTables = new HsqlEventTableFactory() {
            @Override
            protected String timestampType() {
                return "timestamp";
            }
        };

        testSubject = createTimestampEngine(timestampColumnTables);
        setTestSubject(testSubject);
        storeAndLoadEvents();
    }

    /**
     * Appends four pairs of events while the static event clock is fixed at one hour, two minutes,
     * fifty seconds and zero seconds in the past, deletes every row with a negative sequence number
     * directly through SQL, and then asserts that no tracking token returned by a fresh fetch
     * carries more than two gaps.
     * <p>
     * The static {@code GenericEventMessage.clock} is restored in a {@code finally} block so the
     * fixed clock cannot leak into tests that run afterwards.
     *
     * @throws SQLException if the direct SQL delete fails
     */
    @Test
    void gapsForVeryOldEventsAreNotIncluded() throws SQLException {
        Clock originalClock = GenericEventMessage.clock;
        try {
            Instant now = Clock.systemUTC().instant();

            GenericEventMessage.clock = Clock.fixed(now.minus(1L, ChronoUnit.HOURS), Clock.systemUTC().getZone());
            testSubject.appendEvents(EventStoreTestUtils.createEvent(-1L), EventStoreTestUtils.createEvent(0L));

            GenericEventMessage.clock = Clock.fixed(now.minus(2L, ChronoUnit.MINUTES), Clock.systemUTC().getZone());
            testSubject.appendEvents(EventStoreTestUtils.createEvent(-2L), EventStoreTestUtils.createEvent(1L));

            GenericEventMessage.clock = Clock.fixed(now.minus(50L, ChronoUnit.SECONDS), Clock.systemUTC().getZone());
            testSubject.appendEvents(EventStoreTestUtils.createEvent(-3L), EventStoreTestUtils.createEvent(2L));

            GenericEventMessage.clock = Clock.fixed(now, Clock.systemUTC().getZone());
            testSubject.appendEvents(EventStoreTestUtils.createEvent(-4L), EventStoreTestUtils.createEvent(3L));

            // Remove the negative-sequence events behind the engine's back.
            // NOTE(review): presumably this is what leaves gaps in the global index — confirm.
            try (Connection conn = dataSource.getConnection();
                 PreparedStatement deleteNegativeSequences =
                         conn.prepareStatement("DELETE FROM DomainEventEntry WHERE sequenceNumber < 0")) {
                deleteNegativeSequences.executeUpdate();
            }

            testSubject.fetchTrackedEvents((TrackingToken) null, 100)
                       .stream()
                       .map(entry -> (GapAwareTrackingToken) entry.trackingToken())
                       .forEach(token -> Assertions.assertTrue(token.getGaps().size() <= 2));
        } finally {
            GenericEventMessage.clock = originalClock;
        }
    }

    /**
     * Uses an engine with a gap timeout of 50001 and a gap cleaning threshold of 50, stores four
     * pairs of events at fixed clock offsets (one hour, two minutes, fifty seconds, zero seconds in
     * the past), deletes the negative-sequence rows via SQL, and then fetches with a token at index 6
     * that lists every index in [-50, 6) except 1, 3 and 5 as a gap. Exactly one event is expected,
     * and the first gap remaining on its token must be 4.
     * <p>
     * The static {@code GenericEventMessage.clock} is restored in a {@code finally} block so the
     * fixed clock cannot leak into tests that run afterwards.
     *
     * @throws SQLException if the direct SQL delete fails
     */
    @DirtiesContext
    @Test
    void oldGapsAreRemovedFromProvidedTrackingToken() throws SQLException {
        Clock originalClock = GenericEventMessage.clock;
        try {
            testSubject = createEngine(engineBuilder -> engineBuilder.gapTimeout(50001).gapCleaningThreshold(50));
            Instant now = Clock.systemUTC().instant();

            GenericEventMessage.clock = Clock.fixed(now.minus(1L, ChronoUnit.HOURS), Clock.systemUTC().getZone());
            testSubject.appendEvents(EventStoreTestUtils.createEvent(-1L), EventStoreTestUtils.createEvent(0L));

            GenericEventMessage.clock = Clock.fixed(now.minus(2L, ChronoUnit.MINUTES), Clock.systemUTC().getZone());
            testSubject.appendEvents(EventStoreTestUtils.createEvent(-2L), EventStoreTestUtils.createEvent(1L));

            GenericEventMessage.clock = Clock.fixed(now.minus(50L, ChronoUnit.SECONDS), Clock.systemUTC().getZone());
            testSubject.appendEvents(EventStoreTestUtils.createEvent(-3L), EventStoreTestUtils.createEvent(2L));

            GenericEventMessage.clock = Clock.fixed(now, Clock.systemUTC().getZone());
            testSubject.appendEvents(EventStoreTestUtils.createEvent(-4L), EventStoreTestUtils.createEvent(3L));

            try (Connection conn = dataSource.getConnection();
                 PreparedStatement deleteNegativeSequences =
                         conn.prepareStatement("DELETE FROM DomainEventEntry WHERE sequenceNumber < 0")) {
                deleteNegativeSequences.executeUpdate();
            }

            List<Long> gaps = LongStream.range(-50L, 6L)
                                        .filter(index -> index != 1L && index != 3L && index != 5L)
                                        .boxed()
                                        .collect(Collectors.toList());
            List<? extends TrackedEventData<?>> events =
                    testSubject.fetchTrackedEvents(GapAwareTrackingToken.newInstance(6L, gaps), 100);

            Assertions.assertEquals(1, events.size());
            GapAwareTrackingToken resultingToken = (GapAwareTrackingToken) events.get(0).trackingToken();
            Assertions.assertEquals(4L, (long) resultingToken.getGaps().first());
        } finally {
            GenericEventMessage.clock = originalClock;
        }
    }

    /**
     * Stores two events, rewrites their {@code payloadType} column to {@code 'unknown'} via SQL,
     * stores two more events, and then checks both read paths:
     * <ul>
     *   <li>reading from the storage engine and keeping only {@code String} payloads yields just the
     *       two later payloads;</li>
     *   <li>streaming from an {@link EmbeddedEventStore} on top of the engine yields four messages:
     *       two with payload type {@link UnknownSerializedType}, followed by the two later payloads.</li>
     * </ul>
     * The tracking stream is opened in try-with-resources so it is closed even when an assertion fails.
     *
     * @throws SQLException         if the direct SQL update fails
     * @throws InterruptedException declared by {@code nextAvailable()} on the tracking stream
     */
    @Test
    void eventsWithUnknownPayloadTypeDoNotResultInError() throws SQLException, InterruptedException {
        String expectedPayloadOne = "Payload3";
        String expectedPayloadTwo = "Payload4";
        int testBatchSize = 2;
        testSubject = createEngine(engineBuilder -> engineBuilder.batchSize(testBatchSize));
        EmbeddedEventStore testEventStore = EmbeddedEventStore.builder().storageEngine(testSubject).build();

        testSubject.appendEvents(EventStoreTestUtils.createEvent("aggregate", 1L, "Payload1"),
                                 EventStoreTestUtils.createEvent("aggregate", 2L, "Payload2"));
        // Make the payload type of everything stored so far unresolvable.
        try (Connection conn = dataSource.getConnection();
             PreparedStatement markUnknown =
                     conn.prepareStatement("UPDATE DomainEventEntry e SET e.payloadType = 'unknown'")) {
            markUnknown.executeUpdate();
        }
        testSubject.appendEvents(EventStoreTestUtils.createEvent("aggregate", 3L, expectedPayloadOne),
                                 EventStoreTestUtils.createEvent("aggregate", 4L, expectedPayloadTwo));

        List<String> eventStorageEngineResult = testSubject.readEvents(null, false)
                                                           .filter(m -> m.getPayload() instanceof String)
                                                           .map(m -> (String) m.getPayload())
                                                           .collect(Collectors.toList());
        Assertions.assertEquals(Arrays.asList(expectedPayloadOne, expectedPayloadTwo), eventStorageEngineResult);

        try (TrackingEventStream eventStoreResult = testEventStore.openStream(null)) {
            Assertions.assertTrue(eventStoreResult.hasNextAvailable());
            Assertions.assertEquals(UnknownSerializedType.class, eventStoreResult.nextAvailable().getPayloadType());
            Assertions.assertEquals(UnknownSerializedType.class, eventStoreResult.nextAvailable().getPayloadType());
            Assertions.assertEquals(expectedPayloadOne, eventStoreResult.nextAvailable().getPayload());
            Assertions.assertEquals(expectedPayloadTwo, eventStoreResult.nextAvailable().getPayload());
        }
    }

    /**
     * With a batch size of 10, stores 100 events, deletes the rows whose global index lies in
     * [20, 40) via SQL, and expects a full read to still return the remaining 80 events.
     *
     * @throws SQLException if the direct SQL delete fails
     */
    @Test
    void streamCrossesConsecutiveGapsOfMoreThanBatchSuccessfully() throws SQLException {
        final int batchSize = 10;
        testSubject = createEngine(builder -> builder.batchSize(batchSize));
        testSubject.appendEvents(EventStoreTestUtils.createEvents(100));

        String deleteMiddleRange = "DELETE FROM DomainEventEntry WHERE globalIndex >= 20 and globalIndex < 40";
        try (Connection connection = dataSource.getConnection()) {
            PreparedStatement deletion = connection.prepareStatement(deleteMiddleRange);
            deletion.executeUpdate();
        }

        int remainingEventCount = testSubject.readEvents(null, false).collect(Collectors.toList()).size();
        Assertions.assertEquals(80, remainingEventCount);
    }

    /**
     * With a batch size of 10 and the extended gap check disabled, recreates the schema, stores 100
     * events, deletes the rows whose global index lies in [20, 40) via SQL, and expects a full read
     * to return only 20 events.
     * <p>
     * The connection and statements used for the schema reset are opened in try-with-resources so
     * they are released again; the original code never closed that connection.
     *
     * @throws SQLException if the direct SQL delete fails
     */
    @Test
    void streamDoesNotCrossExtendedGapWhenDisabled() throws SQLException {
        int testBatchSize = 10;
        testSubject = createEngine(engineBuilder -> engineBuilder.batchSize(testBatchSize).extendedGapCheckEnabled(false));

        try (Connection connection = dataSource.getConnection()) {
            // Prepare and execute each DROP in turn, matching the original statement order.
            try (PreparedStatement dropEvents = connection.prepareStatement("DROP TABLE IF EXISTS DomainEventEntry")) {
                dropEvents.executeUpdate();
            }
            try (PreparedStatement dropSnapshots = connection.prepareStatement("DROP TABLE IF EXISTS SnapshotEventEntry")) {
                dropSnapshots.executeUpdate();
            }
            testSubject.createSchema(HsqlEventTableFactory.INSTANCE);
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }

        testSubject.appendEvents(EventStoreTestUtils.createEvents(100));
        try (Connection conn = dataSource.getConnection();
             PreparedStatement deleteMiddleRange = conn.prepareStatement(
                     "DELETE FROM DomainEventEntry WHERE globalIndex >= 20 and globalIndex < 40")) {
            deleteMiddleRange.executeUpdate();
        }

        int readEventCount = testSubject.readEvents(null, false).collect(Collectors.toList()).size();
        Assertions.assertEquals(20, readEventCount);
    }

    /**
     * With a batch size of 10, stores 100 events, deletes the rows whose global index is below 20
     * via SQL, and expects a full read to still return the remaining 80 events.
     *
     * @throws SQLException if the direct SQL delete fails
     */
    @Test
    void streamCrossesInitialConsecutiveGapsOfMoreThanBatchSuccessfully() throws SQLException {
        final int batchSize = 10;
        testSubject = createEngine(builder -> builder.batchSize(batchSize));
        testSubject.appendEvents(EventStoreTestUtils.createEvents(100));

        String deleteLeadingRange = "DELETE FROM DomainEventEntry WHERE globalIndex < 20";
        try (Connection connection = dataSource.getConnection()) {
            PreparedStatement deletion = connection.prepareStatement(deleteLeadingRange);
            deletion.executeUpdate();
        }

        int remainingEventCount = testSubject.readEvents(null, false).collect(Collectors.toList()).size();
        Assertions.assertEquals(80, remainingEventCount);
    }

    /**
     * With a batch size of 10, stores events with sequence numbers 0 through 4 and expects reading
     * aggregate {@code "aggregate"} from sequence 0 to return all five, in that order.
     */
    @Test
    void readEventsForAggregateReturnsTheCompleteStream() {
        final int batchSize = 10;
        testSubject = createEngine(builder -> builder.batchSize(batchSize));

        long[] storedSequenceNumbers = {0L, 1L, 2L, 3L, 4L};
        EventMessage<?>[] storedEvents = LongStream.of(storedSequenceNumbers)
                                                   .mapToObj(sequenceNumber -> EventStoreTestUtils.createEvent(sequenceNumber))
                                                   .toArray(EventMessage<?>[]::new);
        testSubject.appendEvents(storedEvents);

        List<? extends DomainEventMessage<?>> loaded =
                testSubject.readEvents("aggregate", 0L).asStream().collect(Collectors.toList());

        Assertions.assertEquals(5, loaded.size());
        for (int index = 0; index < storedSequenceNumbers.length; index++) {
            Assertions.assertEquals(storedSequenceNumbers[index], loaded.get(index).getSequenceNumber());
        }
    }

    /**
     * With a batch size of 10, stores events with sequence numbers 0, 1, 3 and 4 (2 is missing) and
     * expects reading aggregate {@code "aggregate"} from sequence 0 to return all four, in that order.
     */
    @Test
    void readEventsForAggregateWithGapsReturnsTheCompleteStream() {
        final int batchSize = 10;
        testSubject = createEngine(builder -> builder.batchSize(batchSize));

        long[] storedSequenceNumbers = {0L, 1L, 3L, 4L};
        EventMessage<?>[] storedEvents = LongStream.of(storedSequenceNumbers)
                                                   .mapToObj(sequenceNumber -> EventStoreTestUtils.createEvent(sequenceNumber))
                                                   .toArray(EventMessage<?>[]::new);
        testSubject.appendEvents(storedEvents);

        List<? extends DomainEventMessage<?>> loaded =
                testSubject.readEvents("aggregate", 0L).asStream().collect(Collectors.toList());

        Assertions.assertEquals(4, loaded.size());
        for (int index = 0; index < storedSequenceNumbers.length; index++) {
            Assertions.assertEquals(storedSequenceNumbers[index], loaded.get(index).getSequenceNumber());
        }
    }

    /**
     * With a batch size of 5, stores events with sequence numbers 0 through 7 (more than one batch)
     * and expects reading aggregate {@code "aggregate"} from sequence 0 to return all eight, in order.
     */
    @Test
    void readEventsForAggregateWithEventsExceedingOneBatchReturnsTheCompleteStream() {
        final int batchSize = 5;
        testSubject = createEngine(builder -> builder.batchSize(batchSize));

        long[] storedSequenceNumbers = {0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L};
        EventMessage<?>[] storedEvents = LongStream.of(storedSequenceNumbers)
                                                   .mapToObj(sequenceNumber -> EventStoreTestUtils.createEvent(sequenceNumber))
                                                   .toArray(EventMessage<?>[]::new);
        testSubject.appendEvents(storedEvents);

        List<? extends DomainEventMessage<?>> loaded =
                testSubject.readEvents("aggregate", 0L).asStream().collect(Collectors.toList());

        Assertions.assertEquals(8, loaded.size());
        for (int index = 0; index < storedSequenceNumbers.length; index++) {
            Assertions.assertEquals(storedSequenceNumbers[index], loaded.get(index).getSequenceNumber());
        }
    }

    /**
     * With a batch size of 5, stores events with sequence numbers 0, 1, 3, 4, 5, 6 and 7 (2 is
     * missing; more than one batch) and expects reading aggregate {@code "aggregate"} from
     * sequence 0 to return all seven, in that order.
     */
    @Test
    void readEventsForAggregateWithEventsExceedingOneBatchAndGapsReturnsTheCompleteStream() {
        final int batchSize = 5;
        testSubject = createEngine(builder -> builder.batchSize(batchSize));

        long[] storedSequenceNumbers = {0L, 1L, 3L, 4L, 5L, 6L, 7L};
        EventMessage<?>[] storedEvents = LongStream.of(storedSequenceNumbers)
                                                   .mapToObj(sequenceNumber -> EventStoreTestUtils.createEvent(sequenceNumber))
                                                   .toArray(EventMessage<?>[]::new);
        testSubject.appendEvents(storedEvents);

        List<? extends DomainEventMessage<?>> loaded =
                testSubject.readEvents("aggregate", 0L).asStream().collect(Collectors.toList());

        Assertions.assertEquals(7, loaded.size());
        for (int index = 0; index < storedSequenceNumbers.length; index++) {
            Assertions.assertEquals(storedSequenceNumbers[index], loaded.get(index).getSequenceNumber());
        }
    }

    /**
     * Creates an engine on freshly created tables using the shared {@link HsqlEventTableFactory} instance.
     *
     * @param customization operator applied to the pre-configured builder before the engine is constructed
     * @return the engine produced by the two-argument {@code createEngine} overload
     */
    @Override
    protected JdbcEventStorageEngine createEngine(UnaryOperator<JdbcEventStorageEngine.Builder> customization) {
        EventTableFactory defaultTableFactory = HsqlEventTableFactory.INSTANCE;
        return createEngine(customization, defaultTableFactory);
    }

    /**
     * Builds an engine from a baseline builder (XStream event and snapshot serializers, the default
     * persistence exception resolver, batch size 100, connections from the test data source, no
     * transaction management), lets the caller customize that builder, and then drops and recreates
     * the tables using the given factory.
     *
     * @param customization     operator applied to the baseline builder; its result is used to construct the engine
     * @param eventTableFactory factory passed on to {@code doCreateTables} for schema creation
     * @return the constructed engine, after its schema has been created
     */
    private JdbcEventStorageEngine createEngine(UnaryOperator<JdbcEventStorageEngine.Builder> customization, EventTableFactory eventTableFactory) {
        JdbcEventStorageEngine.Builder baselineBuilder = JdbcEventStorageEngine.builder()
                .eventSerializer(TestSerializer.xStreamSerializer())
                .persistenceExceptionResolver(defaultPersistenceExceptionResolver)
                .snapshotSerializer(TestSerializer.xStreamSerializer())
                .batchSize(100)
                .connectionProvider(() -> dataSource.getConnection())
                .transactionManager(NoTransactionManager.INSTANCE);

        JdbcEventStorageEngine.Builder customizedBuilder = customization.apply(baselineBuilder);
        JdbcEventStorageEngine engine = new JdbcEventStorageEngine(customizedBuilder);
        return doCreateTables(eventTableFactory, engine);
    }

    /**
     * Builds an engine subclass that reads and writes event timestamps as {@link Timestamp} values
     * rather than using the inherited {@code readTimeStamp}/{@code writeTimestamp} behavior, then
     * drops and recreates the tables using the given factory.
     *
     * @param eventTableFactory factory passed on to {@code doCreateTables} for schema creation
     * @return the timestamp-aware engine, after its schema has been created
     */
    private JdbcEventStorageEngine createTimestampEngine(EventTableFactory eventTableFactory) {
        JdbcEventStorageEngine.Builder engineBuilder = JdbcEventStorageEngine.builder()
                .eventSerializer(TestSerializer.xStreamSerializer())
                .snapshotSerializer(TestSerializer.xStreamSerializer())
                .connectionProvider(() -> dataSource.getConnection())
                .transactionManager(NoTransactionManager.INSTANCE);

        JdbcEventStorageEngine timestampEngine = new JdbcEventStorageEngine(engineBuilder) {
            @Override
            protected Object readTimeStamp(ResultSet resultSet, String columnName) throws SQLException {
                return resultSet.getTimestamp(columnName).toInstant();
            }

            @Override
            protected void writeTimestamp(PreparedStatement preparedStatement, int position, Instant timestamp) throws SQLException {
                // Built from epoch milliseconds, so any sub-millisecond part of the Instant is not carried over.
                Timestamp sqlTimestamp = new Timestamp(timestamp.toEpochMilli());
                preparedStatement.setTimestamp(position, sqlTimestamp);
            }
        };
        return doCreateTables(eventTableFactory, timestampEngine);
    }

    /**
     * Drops the {@code DomainEventEntry} and {@code SnapshotEventEntry} tables if they exist and
     * lets the given engine recreate its schema with the supplied factory.
     * <p>
     * The connection and statements are opened in try-with-resources; the original code never
     * closed the connection, leaking one per engine creation.
     *
     * @param eventTableFactory factory handed to {@code result.createSchema(...)}
     * @param result            engine whose schema is (re)created
     * @return {@code result}, for call chaining
     * @throws IllegalStateException wrapping any {@link SQLException} raised while dropping or creating tables
     */
    private JdbcEventStorageEngine doCreateTables(EventTableFactory eventTableFactory, JdbcEventStorageEngine result) {
        try (Connection connection = dataSource.getConnection()) {
            // Prepare and execute each DROP in turn, matching the original statement order.
            try (PreparedStatement dropEvents = connection.prepareStatement("DROP TABLE IF EXISTS DomainEventEntry")) {
                dropEvents.executeUpdate();
            }
            try (PreparedStatement dropSnapshots = connection.prepareStatement("DROP TABLE IF EXISTS SnapshotEventEntry")) {
                dropSnapshots.executeUpdate();
            }
            result.createSchema(eventTableFactory);
            return result;
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
    }
}

