/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventhandling.deadletter;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.axonframework.common.transaction.NoOpTransactionManager;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.EventHandler;
import org.axonframework.eventhandling.EventHandlerInvoker;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventTrackerStatus;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.StreamingEventProcessor;
import org.axonframework.eventhandling.deadletter.DeadLetteringEventHandlerInvoker;
import org.axonframework.eventhandling.pooled.PooledStreamingEventProcessor;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.eventhandling.tokenstore.inmemory.InMemoryTokenStore;
import org.axonframework.messaging.MetaData;
import org.axonframework.messaging.StreamableMessageSource;
import org.axonframework.messaging.annotation.MessageIdentifier;
import org.axonframework.messaging.deadletter.DeadLetter;
import org.axonframework.messaging.deadletter.Decisions;
import org.axonframework.messaging.deadletter.EnqueuePolicy;
import org.axonframework.messaging.deadletter.SequencedDeadLetterQueue;
import org.axonframework.messaging.unitofwork.RollbackConfiguration;
import org.axonframework.messaging.unitofwork.RollbackConfigurationType;
import org.axonframework.utils.AssertUtils;
import org.axonframework.utils.InMemoryStreamableEventSource;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

public abstract class DeadLetteringEventIntegrationTest {
    protected static final String PROCESSING_GROUP = "problematicProcessingGroup";
    private static final boolean SUCCEED = true;
    private static final boolean SUCCEED_RETRY = true;
    private static final boolean FAIL = false;
    private static final boolean FAIL_RETRY = false;
    private ProblematicEventHandlingComponent eventHandlingComponent;
    private SequencedDeadLetterQueue<EventMessage<?>> deadLetterQueue;
    private DeadLetteringEventHandlerInvoker deadLetteringInvoker;
    private InMemoryStreamableEventSource eventSource;
    private StreamingEventProcessor streamingProcessor;
    protected TransactionManager transactionManager;
    private ScheduledExecutorService executor;

    protected abstract SequencedDeadLetterQueue<EventMessage<?>> buildDeadLetterQueue();

    protected TransactionManager getTransactionManager() {
        return new NoOpTransactionManager();
    }

    @BeforeEach
    void setUp() {
        this.transactionManager = this.getTransactionManager();
        this.eventHandlingComponent = new ProblematicEventHandlingComponent();
        this.deadLetterQueue = this.buildDeadLetterQueue();
        EnqueuePolicy enqueuePolicy = (letter, cause) -> {
            int retries = (Integer)letter.diagnostics().getOrDefault((Object)"retries", (Object)0);
            if (retries < 1) {
                return Decisions.enqueue((Throwable)cause, l -> MetaData.with((String)"retries", (Object)((Integer)l.diagnostics().getOrDefault((Object)"retries", (Object)0) + 1)));
            }
            return Decisions.evict();
        };
        this.deadLetteringInvoker = ((DeadLetteringEventHandlerInvoker.Builder)((DeadLetteringEventHandlerInvoker.Builder)DeadLetteringEventHandlerInvoker.builder().eventHandlers(new Object[]{this.eventHandlingComponent})).sequencingPolicy(event -> ((DeadLetterableEvent)event.getPayload()).getAggregateIdentifier())).enqueuePolicy(enqueuePolicy).queue(this.deadLetterQueue).transactionManager(this.transactionManager).build();
        this.eventSource = new InMemoryStreamableEventSource();
        this.streamingProcessor = PooledStreamingEventProcessor.builder().name(PROCESSING_GROUP).eventHandlerInvoker((EventHandlerInvoker)this.deadLetteringInvoker).rollbackConfiguration((RollbackConfiguration)RollbackConfigurationType.ANY_THROWABLE).messageSource((StreamableMessageSource)this.eventSource).tokenStore((TokenStore)new InMemoryTokenStore()).transactionManager(this.transactionManager).coordinatorExecutor(Executors.newSingleThreadScheduledExecutor()).workerExecutor(Executors.newSingleThreadScheduledExecutor()).initialSegmentCount(1).claimExtensionThreshold(1000L).build();
        this.executor = Executors.newScheduledThreadPool(2);
    }

    @AfterEach
    void tearDown() {
        boolean executorTerminated = false;
        CompletableFuture processorShutdown = this.streamingProcessor.shutdownAsync();
        try {
            processorShutdown.get(15L, TimeUnit.SECONDS);
            executorTerminated = this.executor.awaitTermination(50L, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        if (!executorTerminated) {
            this.executor.shutdownNow();
        }
    }

    protected void startProcessingEvent() {
        this.streamingProcessor.start();
    }

    protected void processAnyDeadLetter() {
        this.deadLetteringInvoker.processAny();
    }

    protected void processAnyDeadLettersPeriodically() {
        this.executor.scheduleWithFixedDelay(this::processAnyDeadLetter, 5L, 5L, TimeUnit.MILLISECONDS);
    }

    @Test
    void failedEventHandlingEnqueuesTheEvent() {
        EventMessage failedEvent = GenericEventMessage.asEventMessage((Object)new DeadLetterableEvent("failure", false));
        this.eventSource.publishMessage(GenericEventMessage.asEventMessage((Object)new DeadLetterableEvent("success", true)));
        this.eventSource.publishMessage(failedEvent);
        this.startProcessingEvent();
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertEquals((int)1, (int)this.streamingProcessor.processingStatus().size()));
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertTrue((((EventTrackerStatus)this.streamingProcessor.processingStatus().get(0)).getCurrentPosition().getAsLong() >= 2L ? 1 : 0) != 0));
        Assertions.assertTrue((boolean)this.eventHandlingComponent.initialHandlingWasSuccessful("success"));
        Assertions.assertTrue((boolean)this.eventHandlingComponent.initialHandlingWasUnsuccessful("failure"));
        Assertions.assertTrue((boolean)this.deadLetterQueue.contains((Object)"failure"));
        Assertions.assertFalse((boolean)this.deadLetterQueue.contains((Object)"success"));
        Iterator sequence = this.deadLetterQueue.deadLetterSequence((Object)"failure").iterator();
        Assertions.assertTrue((boolean)sequence.hasNext());
        Assertions.assertEquals((Object)failedEvent.getPayload(), (Object)((EventMessage)((DeadLetter)sequence.next()).message()).getPayload());
        Assertions.assertFalse((boolean)sequence.hasNext());
    }

    @Test
    void eventsInTheSameSequenceAreAllEnqueuedIfOneOfThemFails() {
        int expectedSuccessfulHandlingCount = 3;
        String aggregateId = UUID.randomUUID().toString();
        this.eventSource.publishMessage(GenericEventMessage.asEventMessage((Object)new DeadLetterableEvent(aggregateId, true)));
        this.eventSource.publishMessage(GenericEventMessage.asEventMessage((Object)new DeadLetterableEvent(aggregateId, true)));
        this.eventSource.publishMessage(GenericEventMessage.asEventMessage((Object)new DeadLetterableEvent(aggregateId, true)));
        DeadLetterableEvent firstDeadLetter = new DeadLetterableEvent(aggregateId, false);
        this.eventSource.publishMessage(GenericEventMessage.asEventMessage((Object)firstDeadLetter));
        DeadLetterableEvent secondDeadLetter = new DeadLetterableEvent(aggregateId, true);
        this.eventSource.publishMessage(GenericEventMessage.asEventMessage((Object)secondDeadLetter));
        DeadLetterableEvent thirdDeadLetter = new DeadLetterableEvent(aggregateId, true);
        this.eventSource.publishMessage(GenericEventMessage.asEventMessage((Object)thirdDeadLetter));
        this.startProcessingEvent();
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertEquals((int)1, (int)this.streamingProcessor.processingStatus().size()));
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertTrue((((EventTrackerStatus)this.streamingProcessor.processingStatus().get(0)).getCurrentPosition().getAsLong() >= 6L ? 1 : 0) != 0));
        Assertions.assertTrue((boolean)this.eventHandlingComponent.initialHandlingWasSuccessful(aggregateId));
        Assertions.assertEquals((int)expectedSuccessfulHandlingCount, (int)this.eventHandlingComponent.successfulInitialHandlingCount(aggregateId));
        Assertions.assertTrue((boolean)this.eventHandlingComponent.initialHandlingWasUnsuccessful(aggregateId));
        Assertions.assertEquals((int)1, (int)this.eventHandlingComponent.unsuccessfulInitialHandlingCount(aggregateId));
        Assertions.assertTrue((boolean)this.deadLetterQueue.contains((Object)aggregateId));
        AssertUtils.assertWithin(2, TimeUnit.SECONDS, () -> {
            Iterator sequence = this.deadLetterQueue.deadLetterSequence((Object)aggregateId).iterator();
            Assertions.assertTrue((boolean)sequence.hasNext());
            Assertions.assertEquals((Object)firstDeadLetter, (Object)((EventMessage)((DeadLetter)sequence.next()).message()).getPayload());
            Assertions.assertTrue((boolean)sequence.hasNext());
            Assertions.assertEquals((Object)secondDeadLetter, (Object)((EventMessage)((DeadLetter)sequence.next()).message()).getPayload());
            Assertions.assertTrue((boolean)sequence.hasNext());
            Assertions.assertEquals((Object)thirdDeadLetter, (Object)((EventMessage)((DeadLetter)sequence.next()).message()).getPayload());
            Assertions.assertFalse((boolean)sequence.hasNext());
        });
    }

    @Test
    void successfulRetryingLettersEvictsTheLettersFromTheQueue() {
        int expectedSuccessfulInitialHandlingCount = 3;
        int expectedUnsuccessfulInitialHandlingCount = 1;
        int expectedSuccessfulEvaluationCount = 3;
        int expectedUnsuccessfulEvaluationCount = 0;
        String aggregateId = UUID.randomUUID().toString();
        this.eventSource.publishMessage(GenericEventMessage.asEventMessage((Object)new DeadLetterableEvent(aggregateId, true)));
        this.eventSource.publishMessage(GenericEventMessage.asEventMessage((Object)new DeadLetterableEvent(aggregateId, true)));
        this.eventSource.publishMessage(GenericEventMessage.asEventMessage((Object)new DeadLetterableEvent(aggregateId, true)));
        this.eventSource.publishMessage(GenericEventMessage.asEventMessage((Object)new DeadLetterableEvent(aggregateId, false, true)));
        this.eventSource.publishMessage(GenericEventMessage.asEventMessage((Object)new DeadLetterableEvent(aggregateId, true, true)));
        this.eventSource.publishMessage(GenericEventMessage.asEventMessage((Object)new DeadLetterableEvent(aggregateId, true, true)));
        this.startProcessingEvent();
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertEquals((int)1, (int)this.streamingProcessor.processingStatus().size()));
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertTrue((((EventTrackerStatus)this.streamingProcessor.processingStatus().get(0)).getCurrentPosition().getAsLong() >= 6L ? 1 : 0) != 0));
        Assertions.assertTrue((boolean)this.eventHandlingComponent.initialHandlingWasSuccessful(aggregateId));
        Assertions.assertEquals((int)expectedSuccessfulInitialHandlingCount, (int)this.eventHandlingComponent.successfulInitialHandlingCount(aggregateId));
        Assertions.assertTrue((boolean)this.eventHandlingComponent.initialHandlingWasUnsuccessful(aggregateId));
        Assertions.assertEquals((int)expectedUnsuccessfulInitialHandlingCount, (int)this.eventHandlingComponent.unsuccessfulInitialHandlingCount(aggregateId));
        Assertions.assertTrue((boolean)this.deadLetterQueue.contains((Object)aggregateId));
        this.deadLetteringInvoker.process(deadLetter -> true);
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertTrue((boolean)this.eventHandlingComponent.evaluationWasSuccessful(aggregateId)));
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertEquals((int)expectedSuccessfulEvaluationCount, (int)this.eventHandlingComponent.successfulEvaluationCount(aggregateId)));
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertFalse((boolean)this.eventHandlingComponent.evaluationWasUnsuccessful(aggregateId)));
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertEquals((int)expectedUnsuccessfulEvaluationCount, (int)this.eventHandlingComponent.unsuccessfulEvaluationCount(aggregateId)));
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertFalse((boolean)this.deadLetterQueue.contains((Object)aggregateId)));
    }

    @Test
    void unsuccessfulProcessingLettersRequeuesTheLettersInTheQueue() {
        int expectedSuccessfulInitialHandlingCount = 3;
        int expectedUnsuccessfulInitialHandlingCount = 1;
        int expectedSuccessfulEvaluationCount = 2;
        int expectedUnsuccessfulEvaluationCount = 1;
        String aggregateId = UUID.randomUUID().toString();
        this.eventSource.publishMessage(GenericEventMessage.asEventMessage((Object)new DeadLetterableEvent(aggregateId, true)));
        this.eventSource.publishMessage(GenericEventMessage.asEventMessage((Object)new DeadLetterableEvent(aggregateId, true)));
        this.eventSource.publishMessage(GenericEventMessage.asEventMessage((Object)new DeadLetterableEvent(aggregateId, true)));
        this.eventSource.publishMessage(GenericEventMessage.asEventMessage((Object)new DeadLetterableEvent(aggregateId, false, true)));
        this.eventSource.publishMessage(GenericEventMessage.asEventMessage((Object)new DeadLetterableEvent(aggregateId, true, true)));
        this.eventSource.publishMessage(GenericEventMessage.asEventMessage((Object)new DeadLetterableEvent(aggregateId, true, false)));
        this.startProcessingEvent();
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertEquals((int)1, (int)this.streamingProcessor.processingStatus().size()));
        AssertUtils.assertWithin(2, TimeUnit.SECONDS, () -> {
            OptionalLong optionalPosition = ((EventTrackerStatus)this.streamingProcessor.processingStatus().get(0)).getCurrentPosition();
            Assertions.assertTrue((boolean)optionalPosition.isPresent());
            Assertions.assertTrue((optionalPosition.getAsLong() >= 6L ? 1 : 0) != 0);
        });
        Assertions.assertTrue((boolean)this.eventHandlingComponent.initialHandlingWasSuccessful(aggregateId));
        Assertions.assertEquals((int)expectedSuccessfulInitialHandlingCount, (int)this.eventHandlingComponent.successfulInitialHandlingCount(aggregateId));
        Assertions.assertTrue((boolean)this.eventHandlingComponent.initialHandlingWasUnsuccessful(aggregateId));
        Assertions.assertEquals((int)expectedUnsuccessfulInitialHandlingCount, (int)this.eventHandlingComponent.unsuccessfulInitialHandlingCount(aggregateId));
        Assertions.assertTrue((boolean)this.deadLetterQueue.contains((Object)aggregateId));
        this.deadLetteringInvoker.process(deadLetter -> true);
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertTrue((boolean)this.eventHandlingComponent.evaluationWasSuccessful(aggregateId)));
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertEquals((int)expectedSuccessfulEvaluationCount, (int)this.eventHandlingComponent.successfulEvaluationCount(aggregateId)));
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertTrue((boolean)this.eventHandlingComponent.evaluationWasUnsuccessful(aggregateId)));
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertEquals((int)expectedUnsuccessfulEvaluationCount, (int)this.eventHandlingComponent.unsuccessfulEvaluationCount(aggregateId)));
        Assertions.assertTrue((boolean)this.deadLetterQueue.contains((Object)aggregateId));
    }

    @Test
    void publishEventsAndProcessDeadLettersConcurrentlyShouldWorkFine() {
        int expectedSuccessfulInitialHandlingCount = 3;
        int expectedUnsuccessfulInitialHandlingCount = 1;
        int expectedSuccessfulEvaluationCount = 2;
        int expectedUnsuccessfulEvaluationCount = 1;
        String aggregateId = UUID.randomUUID().toString();
        this.startProcessingEvent();
        this.processAnyDeadLettersPeriodically();
        this.eventSource.publishMessage(GenericEventMessage.asEventMessage((Object)new DeadLetterableEvent(aggregateId, true)));
        this.eventSource.publishMessage(GenericEventMessage.asEventMessage((Object)new DeadLetterableEvent(aggregateId, true)));
        this.eventSource.publishMessage(GenericEventMessage.asEventMessage((Object)new DeadLetterableEvent(aggregateId, true)));
        DeadLetterableEvent firstDeadLetter = new DeadLetterableEvent(aggregateId, false, true);
        this.eventSource.publishMessage(GenericEventMessage.asEventMessage((Object)firstDeadLetter));
        DeadLetterableEvent secondDeadLetter = new DeadLetterableEvent(aggregateId, true, true);
        this.eventSource.publishMessage(GenericEventMessage.asEventMessage((Object)secondDeadLetter));
        DeadLetterableEvent thirdDeadLetter = new DeadLetterableEvent(aggregateId, true, false);
        this.eventSource.publishMessage(GenericEventMessage.asEventMessage((Object)thirdDeadLetter));
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertEquals((int)1, (int)this.streamingProcessor.processingStatus().size()));
        AssertUtils.assertWithin(2, TimeUnit.SECONDS, () -> {
            OptionalLong optionalPosition = ((EventTrackerStatus)this.streamingProcessor.processingStatus().get(0)).getCurrentPosition();
            Assertions.assertTrue((boolean)optionalPosition.isPresent());
            Assertions.assertTrue((optionalPosition.getAsLong() >= 6L ? 1 : 0) != 0);
        });
        Assertions.assertTrue((boolean)this.eventHandlingComponent.initialHandlingWasSuccessful(aggregateId));
        Assertions.assertEquals((int)expectedSuccessfulInitialHandlingCount, (int)this.eventHandlingComponent.successfulInitialHandlingCount(aggregateId));
        Assertions.assertTrue((boolean)this.eventHandlingComponent.initialHandlingWasUnsuccessful(aggregateId));
        Assertions.assertEquals((int)expectedUnsuccessfulInitialHandlingCount, (int)this.eventHandlingComponent.unsuccessfulInitialHandlingCount(aggregateId));
        Assertions.assertTrue((boolean)this.deadLetterQueue.contains((Object)aggregateId));
        AssertUtils.assertWithin(2, TimeUnit.SECONDS, () -> Assertions.assertTrue((boolean)this.eventHandlingComponent.evaluationWasSuccessful(aggregateId)));
        AssertUtils.assertWithin(500, TimeUnit.MILLISECONDS, () -> Assertions.assertEquals((int)expectedSuccessfulEvaluationCount, (int)this.eventHandlingComponent.successfulEvaluationCount(aggregateId)));
        AssertUtils.assertWithin(500, TimeUnit.MILLISECONDS, () -> Assertions.assertTrue((boolean)this.eventHandlingComponent.evaluationWasUnsuccessful(aggregateId)));
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertEquals((int)expectedUnsuccessfulEvaluationCount, (int)this.eventHandlingComponent.unsuccessfulEvaluationCount(aggregateId)));
        Assertions.assertTrue((boolean)this.deadLetterQueue.contains((Object)aggregateId));
    }

    @Test
    @Timeout(value=20L)
    void publishEventsAndProcessDeadLettersConcurrentlyInBulkShouldWorkFine() throws InterruptedException {
        int immediateSuccessesPerAggregate = 5;
        int failFirstAndThenSucceedPerAggregate = 4;
        int persistentFailingPerAggregate = 1;
        int expectedSuccessfulEvaluationCount = failFirstAndThenSucceedPerAggregate - persistentFailingPerAggregate;
        int expectedOverallSuccessfulHandlingCount = immediateSuccessesPerAggregate + expectedSuccessfulEvaluationCount;
        int publishingRuns = 40;
        int totalNumberOfEvents = (immediateSuccessesPerAggregate + failFirstAndThenSucceedPerAggregate) * publishingRuns;
        HashSet aggregateIds = new HashSet();
        HashSet<String> validatedAggregateIds = new HashSet<String>();
        Thread publishingThread = new Thread(() -> {
            for (int i = 0; i < publishingRuns; ++i) {
                String aggregateId = Integer.toString(i);
                this.publishEventsFor(aggregateId, immediateSuccessesPerAggregate, failFirstAndThenSucceedPerAggregate, persistentFailingPerAggregate);
                aggregateIds.add(aggregateId);
            }
        });
        this.startProcessingEvent();
        this.processAnyDeadLettersPeriodically();
        publishingThread.start();
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertEquals((int)1, (int)this.streamingProcessor.processingStatus().size()));
        AssertUtils.assertWithin(15, TimeUnit.SECONDS, () -> {
            OptionalLong optionalPosition = ((EventTrackerStatus)this.streamingProcessor.processingStatus().get(0)).getCurrentPosition();
            Assertions.assertTrue((boolean)optionalPosition.isPresent());
            Assertions.assertEquals((long)totalNumberOfEvents, (long)optionalPosition.getAsLong());
        });
        for (String aggregateId : aggregateIds) {
            if (validatedAggregateIds.contains(aggregateId)) continue;
            AssertUtils.assertWithin(500, TimeUnit.MILLISECONDS, () -> Assertions.assertTrue((boolean)this.eventHandlingComponent.initialHandlingWasSuccessful(aggregateId)));
            AssertUtils.assertWithin(500, TimeUnit.MILLISECONDS, () -> Assertions.assertEquals((int)immediateSuccessesPerAggregate, (int)this.eventHandlingComponent.successfulInitialHandlingCount(aggregateId)));
            AssertUtils.assertWithin(500, TimeUnit.MILLISECONDS, () -> Assertions.assertTrue((boolean)this.eventHandlingComponent.initialHandlingWasUnsuccessful(aggregateId)));
            Assertions.assertEquals((int)1, (int)this.eventHandlingComponent.unsuccessfulInitialHandlingCount(aggregateId));
            AssertUtils.assertWithin(15, TimeUnit.SECONDS, () -> Assertions.assertTrue((boolean)this.eventHandlingComponent.evaluationWasSuccessful(aggregateId)));
            AssertUtils.assertWithin(500, TimeUnit.MILLISECONDS, () -> Assertions.assertEquals((int)expectedSuccessfulEvaluationCount, (int)this.eventHandlingComponent.successfulEvaluationCount(aggregateId)));
            AssertUtils.assertWithin(500, TimeUnit.MILLISECONDS, () -> Assertions.assertTrue((boolean)this.eventHandlingComponent.evaluationWasUnsuccessful(aggregateId)));
            Assertions.assertTrue((this.eventHandlingComponent.unsuccessfulEvaluationCount(aggregateId) >= persistentFailingPerAggregate ? 1 : 0) != 0);
            Assertions.assertEquals((int)expectedOverallSuccessfulHandlingCount, (int)this.eventHandlingComponent.overallSuccessfulHandlingCount(aggregateId));
            validatedAggregateIds.add(aggregateId);
        }
        publishingThread.join();
    }

    private void publishEventsFor(String aggregateId, int immediateSuccessesPerAggregate, int failFirstAndThenSucceedPerAggregate, int persistentFailingPerAggregate) {
        int i;
        for (i = 0; i < immediateSuccessesPerAggregate; ++i) {
            this.eventSource.publishMessage(GenericEventMessage.asEventMessage((Object)new DeadLetterableEvent(aggregateId, true)));
        }
        for (i = 0; i < failFirstAndThenSucceedPerAggregate; ++i) {
            if (i == 0) {
                this.eventSource.publishMessage(GenericEventMessage.asEventMessage((Object)new DeadLetterableEvent(aggregateId, false, true)));
                continue;
            }
            if (failFirstAndThenSucceedPerAggregate - persistentFailingPerAggregate == i) {
                this.eventSource.publishMessage(GenericEventMessage.asEventMessage((Object)new DeadLetterableEvent(aggregateId, true, false)));
                continue;
            }
            this.eventSource.publishMessage(GenericEventMessage.asEventMessage((Object)new DeadLetterableEvent(aggregateId, true, true)));
        }
    }

    private static class DeadLetterableEvent {
        private final String aggregateIdentifier;
        private final boolean shouldSucceed;
        private final boolean shouldSucceedOnEvaluation;

        private DeadLetterableEvent(String aggregateIdentifier, boolean shouldSucceed) {
            this(aggregateIdentifier, shouldSucceed, true);
        }

        @JsonCreator
        public DeadLetterableEvent(@JsonProperty(value="aggregateIdentifier") String aggregateIdentifier, @JsonProperty(value="shouldSucceed") boolean shouldSucceed, @JsonProperty(value="shouldSucceedOnEvaluation") boolean shouldSucceedOnEvaluation) {
            this.aggregateIdentifier = aggregateIdentifier;
            this.shouldSucceed = shouldSucceed;
            this.shouldSucceedOnEvaluation = shouldSucceedOnEvaluation;
        }

        public String getAggregateIdentifier() {
            return this.aggregateIdentifier;
        }

        public boolean isShouldSucceed() {
            return this.shouldSucceed;
        }

        public boolean isShouldSucceedOnEvaluation() {
            return this.shouldSucceedOnEvaluation;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            DeadLetterableEvent that = (DeadLetterableEvent)o;
            return this.shouldSucceed == that.shouldSucceed && this.shouldSucceedOnEvaluation == that.shouldSucceedOnEvaluation && Objects.equals(this.aggregateIdentifier, that.aggregateIdentifier);
        }

        public int hashCode() {
            return Objects.hash(this.aggregateIdentifier, this.shouldSucceed, this.shouldSucceedOnEvaluation);
        }

        public String toString() {
            return "DeadLetterableEvent{aggregateIdentifier='" + this.aggregateIdentifier + '\'' + ", shouldSucceed=" + this.shouldSucceed + ", shouldSucceedOnEvaluation=" + this.shouldSucceedOnEvaluation + '}';
        }
    }

    private static class ProblematicEventHandlingComponent {
        private final Set<String> handledEvent = new ConcurrentSkipListSet<String>();
        private final Map<String, Integer> firstTrySuccesses = new ConcurrentSkipListMap<String, Integer>();
        private final Map<String, Integer> evaluationSuccesses = new ConcurrentSkipListMap<String, Integer>();
        private final Map<String, Integer> firstTryFailures = new ConcurrentSkipListMap<String, Integer>();
        private final Map<String, Integer> evaluationFailures = new ConcurrentSkipListMap<String, Integer>();

        private ProblematicEventHandlingComponent() {
        }

        @EventHandler
        public void on(DeadLetterableEvent event, @MessageIdentifier String eventIdentifier) {
            String sequenceId = event.getAggregateIdentifier();
            if (!this.handledEvent.contains(eventIdentifier)) {
                this.handledEvent.add(eventIdentifier);
                if (!this.initialHandlingWasUnsuccessful(sequenceId)) {
                    this.processInitialHandlingOf(event, sequenceId);
                } else {
                    this.processEvaluationOf(event, sequenceId);
                }
            } else {
                this.processEvaluationOf(event, sequenceId);
            }
        }

        private void processInitialHandlingOf(DeadLetterableEvent event, String sequenceId) {
            if (!event.isShouldSucceed()) {
                this.firstTryFailures.compute(sequenceId, (id, count) -> {
                    int n;
                    if (count == null) {
                        n = 1;
                    } else {
                        count = count + 1;
                        n = count;
                    }
                    return n;
                });
                throw new RuntimeException("Initial handling failed. Let's dead letter event [" + sequenceId + "].");
            }
            this.firstTrySuccesses.compute(sequenceId, (id, count) -> {
                int n;
                if (count == null) {
                    n = 1;
                } else {
                    count = count + 1;
                    n = count;
                }
                return n;
            });
        }

        private void processEvaluationOf(DeadLetterableEvent event, String sequenceId) {
            if (!event.isShouldSucceedOnEvaluation()) {
                this.evaluationFailures.compute(sequenceId, (id, count) -> {
                    int n;
                    if (count == null) {
                        n = 1;
                    } else {
                        count = count + 1;
                        n = count;
                    }
                    return n;
                });
                throw new RuntimeException("Evaluation failed. Let's dead letter event [" + sequenceId + "].");
            }
            this.evaluationSuccesses.compute(sequenceId, (id, count) -> {
                int n;
                if (count == null) {
                    n = 1;
                } else {
                    count = count + 1;
                    n = count;
                }
                return n;
            });
        }

        public boolean initialHandlingWasSuccessful(String aggregateIdentifier) {
            return this.firstTrySuccesses.containsKey(aggregateIdentifier);
        }

        public int successfulInitialHandlingCount(String aggregateIdentifier) {
            return this.initialHandlingWasSuccessful(aggregateIdentifier) ? this.firstTrySuccesses.get(aggregateIdentifier) : 0;
        }

        public boolean evaluationWasSuccessful(String aggregateIdentifier) {
            return this.evaluationSuccesses.containsKey(aggregateIdentifier);
        }

        public int successfulEvaluationCount(String aggregateIdentifier) {
            return this.evaluationWasSuccessful(aggregateIdentifier) ? this.evaluationSuccesses.get(aggregateIdentifier) : 0;
        }

        public boolean initialHandlingWasUnsuccessful(String aggregateIdentifier) {
            return this.firstTryFailures.containsKey(aggregateIdentifier);
        }

        public int unsuccessfulInitialHandlingCount(String aggregateIdentifier) {
            return this.initialHandlingWasUnsuccessful(aggregateIdentifier) ? this.firstTryFailures.get(aggregateIdentifier) : 0;
        }

        public boolean evaluationWasUnsuccessful(String aggregateIdentifier) {
            return this.evaluationFailures.containsKey(aggregateIdentifier);
        }

        public int unsuccessfulEvaluationCount(String aggregateIdentifier) {
            return this.evaluationWasUnsuccessful(aggregateIdentifier) ? this.evaluationFailures.get(aggregateIdentifier) : 0;
        }

        public int overallSuccessfulHandlingCount(String aggregateIdentifier) {
            return this.firstTrySuccesses.get(aggregateIdentifier) + this.evaluationSuccesses.get(aggregateIdentifier);
        }
    }
}

