/*
 * Decompiled with CFR 0.152.
 */
package org.killbill.notificationq;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.awaitility.Awaitility;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.killbill.TestSetup;
import org.killbill.billing.util.queue.QueueRetryException;
import org.killbill.clock.Clock;
import org.killbill.clock.DefaultClock;
import org.killbill.notificationq.DefaultNotificationQueueService;
import org.killbill.notificationq.api.NotificationEvent;
import org.killbill.notificationq.api.NotificationEventWithMetadata;
import org.killbill.notificationq.api.NotificationQueue;
import org.killbill.notificationq.api.NotificationQueueService;
import org.killbill.notificationq.dao.NotificationEventModelDao;
import org.killbill.notificationq.dao.NotificationSqlDao;
import org.killbill.queue.api.PersistentQueueEntryLifecycleState;
import org.killbill.queue.retry.RetryableHandler;
import org.killbill.queue.retry.RetryableService;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import org.skife.jdbi.v2.util.IntegerMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class TestNotificationQueue
extends TestSetup {
    // SLF4J logger used by the test methods and by the anonymous handlers they create.
    private final Logger log = LoggerFactory.getLogger(TestNotificationQueue.class);
    // User token attached to every notification recorded by these tests.
    private static final UUID TOKEN_ID = UUID.randomUUID();
    // Search key values; the test bodies pass the same values (65L / 34L) when recording and looking up notifications.
    private static final long SEARCH_KEY_1 = 65L;
    private static final long SEARCH_KEY_2 = 34L;
    // Queue service under test; re-created in beforeMethod().
    private NotificationQueueService queueService;
    // Retry service wrapping queueService; re-created in beforeMethod().
    private RetryableNotificationQueueService retryableQueueService;
    // Number of notifications the test handlers reported as handled; reset to 0 in beforeMethod().
    // NOTE(review): volatile only guarantees visibility; the handlers update this with ++, which is not atomic.
    private volatile int eventsReceived;

    /**
     * Runs the parent setup, then builds a fresh queue service (and the retry service
     * wrapping it) and resets the handled-event counter before every test method.
     */
    @Override
    @BeforeMethod(groups = {"slow"})
    public void beforeMethod() throws Exception {
        super.beforeMethod();

        final IDBI database = (IDBI) getDBI();
        final Clock testClock = (Clock) clock;
        final NotificationQueueService freshQueueService =
                new DefaultNotificationQueueService(database, testClock, getNotificationQueueConfig(), metricRegistry);

        queueService = freshQueueService;
        retryableQueueService = new RetryableNotificationQueueService(freshQueueService);
        eventsReceived = 0;
    }

    /**
     * Records four notifications that become ready at the same instant (with different search key
     * combinations) plus one that is ready a month later, checks the search-key lookups while they
     * are still pending, moves the test clock forward so the first four get dispatched, and finally
     * verifies lookup/removal of the notification that is still in the future.
     */
    @Test(groups={"slow"})
    public void testSimpleNotification() throws Exception {
        // Keyed by the NotificationEvent interface so the handler can store the event it is handed
        // without a cast; the TreeMap orders the keys through TestNotificationKey#compareTo.
        final Map<NotificationEvent, Boolean> expectedNotifications = new TreeMap<NotificationEvent, Boolean>();
        final NotificationQueue queue = this.queueService.createNotificationQueue("test-svc", "foo", new NotificationQueueService.NotificationQueueHandler(){
            @Override
            public void handleReadyNotification(final NotificationEvent eventJson, final DateTime eventDateTime, final UUID userToken, final Long searchKey1, final Long searchKey2) {
                synchronized (expectedNotifications) {
                    TestNotificationQueue.this.log.info("Handler received key: " + eventJson);
                    expectedNotifications.put(eventJson, Boolean.TRUE);
                    expectedNotifications.notify();
                }
            }
        });

        final UUID key1 = UUID.randomUUID();
        final TestNotificationKey eventJson1 = new TestNotificationKey(key1.toString());
        expectedNotifications.put(eventJson1, Boolean.FALSE);
        final UUID key2 = UUID.randomUUID();
        final TestNotificationKey eventJson2 = new TestNotificationKey(key2.toString());
        expectedNotifications.put(eventJson2, Boolean.FALSE);
        final UUID key3 = UUID.randomUUID();
        final TestNotificationKey eventJson3 = new TestNotificationKey(key3.toString());
        expectedNotifications.put(eventJson3, Boolean.FALSE);
        final UUID key4 = UUID.randomUUID();
        final TestNotificationKey eventJson4 = new TestNotificationKey(key4.toString());
        expectedNotifications.put(eventJson4, Boolean.FALSE);
        final UUID key5 = UUID.randomUUID();
        final TestNotificationKey eventJson5 = new TestNotificationKey(key5.toString());
        expectedNotifications.put(eventJson5, Boolean.FALSE);

        queue.startQueue();
        try {
            final DateTime now = DefaultClock.truncateMs(new DateTime());
            final DateTime readyTime = now.plusMillis(2000);
            final DBI dbi = this.getDBI();

            // Each event is posted in its own transaction, with a different search key combination.
            this.recordSimpleNotificationInTransaction(dbi, queue, readyTime, eventJson1, 1L, SEARCH_KEY_2);
            this.recordSimpleNotificationInTransaction(dbi, queue, readyTime, eventJson2, SEARCH_KEY_1, 1L);
            this.recordSimpleNotificationInTransaction(dbi, queue, readyTime, eventJson3, SEARCH_KEY_1, SEARCH_KEY_2);
            this.recordSimpleNotificationInTransaction(dbi, queue, readyTime, eventJson4, SEARCH_KEY_1, SEARCH_KEY_2);
            // The fifth event is a month out, so it must still be pending at the end of the test.
            this.recordSimpleNotificationInTransaction(dbi, queue, readyTime.plusMonths(1), eventJson5, 1L, 1L);

            Assert.assertEquals(Iterables.size(queue.getInProcessingNotifications()), 0);
            Assert.assertEquals((long) queue.getNbReadyEntries(readyTime), 4L);

            // Both search keys: only events 3 and 4 match.
            assertSimpleNotificationFutures(queue.getFutureNotificationForSearchKeys(SEARCH_KEY_1, SEARCH_KEY_2), key3.toString(), key4.toString());
            // Search key 2 only: events 1, 3 and 4 match.
            assertSimpleNotificationFutures(queue.getFutureNotificationForSearchKey2(null, SEARCH_KEY_2), key3.toString(), key4.toString(), key1.toString());

            // Move the clock past readyTime so the four due notifications get dispatched.
            this.clock.setDeltaFromReality(3000L);
            Awaitility.await().atMost(1L, TimeUnit.MINUTES).until(new Callable<Boolean>(){
                @Override
                public Boolean call() throws Exception {
                    synchronized (expectedNotifications) {
                        return expectedNotifications.get(eventJson1)
                               && expectedNotifications.get(eventJson2)
                               && expectedNotifications.get(eventJson3)
                               && expectedNotifications.get(eventJson4)
                               && queue.getNbReadyEntries(readyTime) == 0L;
                    }
                }
            });

            Assert.assertEquals(Iterables.size(queue.getFutureNotificationForSearchKeys(SEARCH_KEY_1, SEARCH_KEY_2)), 0);
            queue.removeFutureNotificationsForSearchKeys(SEARCH_KEY_1, SEARCH_KEY_2);
            Assert.assertEquals(Iterables.size(queue.getFutureNotificationForSearchKeys(SEARCH_KEY_1, SEARCH_KEY_2)), 0);

            Assert.assertEquals(Iterables.size(queue.getFutureNotificationForSearchKeys(1L, 1L)), 1);
            queue.removeFutureNotificationsForSearchKeys(1L, 1L);
            Assert.assertEquals(Iterables.size(queue.getFutureNotificationForSearchKeys(1L, 1L)), 0);
        }
        finally {
            // Stop the queue even when an assertion fails so it cannot leak into the next test.
            queue.stopQueue();
        }
    }

    /** Records {@code event} on {@code queue} inside its own DBI transaction and logs the posted key. */
    private void recordSimpleNotificationInTransaction(final DBI dbi, final NotificationQueue queue, final DateTime effectiveDate, final TestNotificationKey event, final Long searchKey1, final Long searchKey2) throws Exception {
        dbi.inTransaction(new TransactionCallback<Void>(){
            @Override
            public Void inTransaction(final Handle conn, final TransactionStatus status) throws Exception {
                queue.recordFutureNotificationFromTransaction(conn.getConnection(), effectiveDate, event, TOKEN_ID, searchKey1, searchKey2);
                TestNotificationQueue.this.log.info("Posted key: " + event);
                return null;
            }
        });
    }

    /**
     * Asserts that {@code futures} holds exactly {@code expectedValues.length} entries and that the
     * value of every entry's event is one of {@code expectedValues}.
     */
    @SuppressWarnings("rawtypes")
    private static void assertSimpleNotificationFutures(final Iterable<?> futures, final String... expectedValues) {
        final List<Object> entries = ImmutableList.<Object>copyOf(futures);
        Assert.assertEquals(entries.size(), expectedValues.length);

        int found = 0;
        for (final Object entry : entries) {
            final TestNotificationKey event = (TestNotificationKey) ((NotificationEventWithMetadata) entry).getEvent();
            for (final String expectedValue : expectedValues) {
                if (expectedValue.equals(event.getValue())) {
                    ++found;
                    break;
                }
            }
        }
        Assert.assertEquals(found, expectedValues.length);
    }

    /**
     * Records 100 notifications spaced one second apart, advancing the test clock by one second
     * after each insert, and waits until the handler has seen every one of them.
     *
     * The wait is bounded: the test gives up after {@code maxNotifications + 1} one-second waits
     * instead of blocking forever when a notification is never delivered.
     */
    @Test(groups={"slow"})
    public void testManyNotifications() throws Exception {
        final int maxNotifications = 100;
        final int nextReadyTimeIncrementMs = 1000;

        final Map<String, Boolean> expectedNotifications = new TreeMap<String, Boolean>();
        final NotificationQueue queue = this.queueService.createNotificationQueue("test-svc", "many", new NotificationQueueService.NotificationQueueHandler(){
            @Override
            public void handleReadyNotification(final NotificationEvent eventJson, final DateTime eventDateTime, final UUID userToken, final Long searchKey1, final Long searchKey2) {
                synchronized (expectedNotifications) {
                    TestNotificationQueue.this.log.info("Handler received key: " + eventJson.toString());
                    expectedNotifications.put(((TestNotificationKey) eventJson).getValue(), Boolean.TRUE);
                    // Wake up the test thread waiting on the map below.
                    expectedNotifications.notify();
                }
            }
        });

        boolean success = false;
        queue.startQueue();
        try {
            final DateTime now = this.clock.getUTCNow();

            // The handler may already be running, so guard the initial population too.
            synchronized (expectedNotifications) {
                for (int i = 0; i < maxNotifications; ++i) {
                    expectedNotifications.put(Integer.toString(i), Boolean.FALSE);
                }
            }

            final DBI dbi = this.getDBI();
            for (int i = 0; i < maxNotifications; ++i) {
                final int currentIteration = i;
                final TestNotificationKey eventJson = new TestNotificationKey(Integer.toString(i));
                dbi.inTransaction(new TransactionCallback<Void>(){
                    @Override
                    public Void inTransaction(final Handle conn, final TransactionStatus status) throws Exception {
                        queue.recordFutureNotificationFromTransaction(conn.getConnection(), now.plus((long) ((currentIteration + 1) * nextReadyTimeIncrementMs)), eventJson, TOKEN_ID, SEARCH_KEY_1, SEARCH_KEY_2);
                        return null;
                    }
                });

                // Move the clock so that the notification just recorded becomes ready.
                if (i == 0) {
                    this.clock.setDeltaFromReality(nextReadyTimeIncrementMs);
                } else {
                    this.clock.addDeltaFromReality(nextReadyTimeIncrementMs);
                }
            }

            // Wait a little longer than the number of notifications since many callbacks must happen.
            int nbTry = maxNotifications + 1;
            do {
                synchronized (expectedNotifications) {
                    final int completed = countManyNotificationsReceived(expectedNotifications);
                    if (completed == maxNotifications) {
                        success = true;
                        break;
                    }
                    this.log.info(String.format("BEFORE WAIT : Got %d notifications at time %s (real time %s), nbTry=%d", completed, this.clock.getUTCNow(), new DateTime(), nbTry));
                    expectedNotifications.wait(1000L);
                }
            } while (--nbTry > 0);
        }
        finally {
            // Stop the queue even on failure so it cannot leak into the next test.
            queue.stopQueue();
        }

        final int received;
        synchronized (expectedNotifications) {
            received = countManyNotificationsReceived(expectedNotifications);
        }
        this.log.info("GOT SIZE " + received);
        Assert.assertTrue(success, "Only received " + received + " of " + maxNotifications + " notifications");
    }

    /** Counts the entries of {@code notifications} already flagged as received; callers hold the map's monitor. */
    private static int countManyNotificationsReceived(final Map<String, Boolean> notifications) {
        int received = 0;
        for (final Boolean done : notifications.values()) {
            if (done) {
                ++received;
            }
        }
        return received;
    }

    /**
     * Creates two queues ("Fred" and "Barney") but only starts Fred, records one notification on
     * each within a single transaction, and verifies that only Fred's notification is dispatched.
     */
    @Test(groups={"slow"})
    public void testMultipleHandlerNotification() throws Exception {
        // Keyed by the NotificationEvent interface so the handlers can store the event they receive directly.
        final Map<NotificationEvent, Boolean> expectedNotificationsFred = new TreeMap<NotificationEvent, Boolean>();
        final Map<NotificationEvent, Boolean> expectedNotificationsBarney = new TreeMap<NotificationEvent, Boolean>();

        final NotificationQueue queueFred = this.queueService.createNotificationQueue("UtilTest", "Fred", new NotificationQueueService.NotificationQueueHandler(){
            @Override
            public void handleReadyNotification(final NotificationEvent eventJson, final DateTime eventDateTime, final UUID userToken, final Long searchKey1, final Long searchKey2) {
                TestNotificationQueue.this.log.info("Fred received key: " + eventJson);
                expectedNotificationsFred.put(eventJson, Boolean.TRUE);
                TestNotificationQueue.this.eventsReceived++;
            }
        });
        final NotificationQueue queueBarney = this.queueService.createNotificationQueue("UtilTest", "Barney", new NotificationQueueService.NotificationQueueHandler(){
            @Override
            public void handleReadyNotification(final NotificationEvent eventJson, final DateTime eventDateTime, final UUID userToken, final Long searchKey1, final Long searchKey2) {
                TestNotificationQueue.this.log.info("Barney received key: " + eventJson);
                expectedNotificationsBarney.put(eventJson, Boolean.TRUE);
                TestNotificationQueue.this.eventsReceived++;
            }
        });

        final TestNotificationKey eventJsonFred = new TestNotificationKey("Fred");
        final TestNotificationKey eventJsonBarney = new TestNotificationKey("Barney");

        // Only Fred's queue is started; Barney's notification must therefore stay undelivered.
        queueFred.startQueue();
        try {
            final DateTime now = new DateTime();
            final DateTime readyTime = now.plusMillis(2000);
            expectedNotificationsFred.put(eventJsonFred, Boolean.FALSE);
            expectedNotificationsFred.put(eventJsonBarney, Boolean.FALSE);

            final DBI dbi = this.getDBI();
            dbi.inTransaction(new TransactionCallback<Void>(){
                @Override
                public Void inTransaction(final Handle conn, final TransactionStatus status) throws Exception {
                    queueFred.recordFutureNotificationFromTransaction(conn.getConnection(), readyTime, eventJsonFred, TOKEN_ID, SEARCH_KEY_1, SEARCH_KEY_2);
                    TestNotificationQueue.this.log.info("posted key: " + eventJsonFred.toString());
                    queueBarney.recordFutureNotificationFromTransaction(conn.getConnection(), readyTime, eventJsonBarney, TOKEN_ID, SEARCH_KEY_1, SEARCH_KEY_2);
                    TestNotificationQueue.this.log.info("posted key: " + eventJsonBarney.toString());
                    return null;
                }
            });

            // Move the clock past readyTime so both notifications are due.
            this.clock.setDeltaFromReality(3000L);
            try {
                Awaitility.await().atMost(5L, TimeUnit.SECONDS).until(new Callable<Boolean>(){
                    @Override
                    public Boolean call() throws Exception {
                        return TestNotificationQueue.this.eventsReceived >= 2;
                    }
                });
                // Assert.fail throws an AssertionError, which the catch below does not intercept.
                Assert.fail("There should only have been one event for the queue to pick up - it got more than that");
            }
            catch (Exception expected) {
                // Expected path: the await gives up because a second event never arrives.
            }
        }
        finally {
            // Stop the queue even when the failure above fires so it cannot leak into the next test.
            queueFred.stopQueue();
        }

        Assert.assertTrue(expectedNotificationsFred.get(eventJsonFred));
        Assert.assertFalse(expectedNotificationsFred.get(eventJsonBarney));
    }

    /**
     * Uses a handler that throws on its first invocation and succeeds afterwards, then waits until
     * the history table reports an error_count of 1 and exactly one event has been handled.
     */
    @Test(groups={"slow"})
    public void testWithExceptionAndRetrySuccess() throws Exception {
        final NotificationQueueService.NotificationQueueHandler failOnceHandler = new NotificationQueueHandlerWithExceptions(1);
        final NotificationQueue retryingQueue = this.queueService.createNotificationQueue("ExceptionAndRetrySuccess", "svc", failOnceHandler);

        // Reads error_count from the first row of the history table (null when there is no row yet).
        final HandleCallback<Integer> readErrorCount = new HandleCallback<Integer>(){
            @Override
            public Integer withHandle(final Handle handle) throws Exception {
                final String sql = String.format("select error_count from %s", TestNotificationQueue.this.notificationQueueConfig.getHistoryTableName());
                return handle.createQuery(sql).map(IntegerMapper.FIRST).first();
            }
        };
        final Callable<Boolean> failedOnceThenHandled = new Callable<Boolean>(){
            @Override
            public Boolean call() throws Exception {
                final Integer errorCount = TestNotificationQueue.this.dbi.withHandle(readErrorCount);
                if (errorCount == null) {
                    return false;
                }
                return errorCount == 1 && TestNotificationQueue.this.eventsReceived == 1;
            }
        };

        try {
            retryingQueue.startQueue();

            final DateTime readyTime = new DateTime().plusMillis(2000);
            final NotificationEvent event = new TestNotificationKey("Foo");
            retryingQueue.recordFutureNotification(readyTime, event, TOKEN_ID, Long.valueOf(65L), Long.valueOf(34L));

            // Jump past readyTime so the notification is picked up.
            this.clock.setDeltaFromReality(3000L);
            Awaitility.await().atMost(5L, TimeUnit.SECONDS).until(failedOnceThenHandled);
        }
        finally {
            retryingQueue.stopQueue();
        }
    }

    /**
     * Uses a handler configured to throw three times, then waits until the history table reports
     * an error_count of 3 for the recorded notification.
     */
    @Test(groups={"slow"})
    public void testWithExceptionAndFailed() throws Exception {
        final NotificationQueueService.NotificationQueueHandler alwaysFailingHandler = new NotificationQueueHandlerWithExceptions(3);
        final NotificationQueue failingQueue = this.queueService.createNotificationQueue("ExceptionAndRetrySuccess", "svc", alwaysFailingHandler);

        // Polled by Awaitility: true once the first history row shows three recorded errors.
        final Callable<Boolean> failedThreeTimes = new Callable<Boolean>(){
            @Override
            public Boolean call() throws Exception {
                final Integer errorCount = TestNotificationQueue.this.dbi.withHandle(new HandleCallback<Integer>(){
                    @Override
                    public Integer withHandle(final Handle handle) throws Exception {
                        final String historyTable = TestNotificationQueue.this.notificationQueueConfig.getHistoryTableName();
                        return handle.createQuery(String.format("select error_count from %s", historyTable))
                                     .map(IntegerMapper.FIRST)
                                     .first();
                    }
                });
                if (errorCount == null) {
                    return false;
                }
                return errorCount == 3;
            }
        };

        try {
            failingQueue.startQueue();

            final DateTime readyTime = new DateTime().plusMillis(2000);
            final NotificationEvent event = new TestNotificationKey("Foo");
            failingQueue.recordFutureNotification(readyTime, event, TOKEN_ID, Long.valueOf(65L), Long.valueOf(34L));

            // Jump past readyTime so the notification is picked up.
            this.clock.setDeltaFromReality(3000L);
            Awaitility.await().atMost(5L, TimeUnit.SECONDS).until(failedThreeTimes);
        }
        finally {
            failingQueue.stopQueue();
        }
    }

    /**
     * Wraps a handler that always throws a {@link QueueRetryException} (retry schedule: 1ms, 1ms,
     * 1ms, 1 day) in a {@link RetryableHandler} and checks the history table contents:
     * first one FAILED entry for the original queue plus three FAILED retry entries, then, after the
     * handler is told to succeed and the clock moves one day ahead, a fourth retry entry that is
     * PROCESSED.
     */
    @Test(groups={"slow"})
    public void testRetryStateForNotifications() throws Exception {
        final String originalQueueName = "svc:queueName";
        final String retryQueueName = "notifications-retries:queueName";

        final NotificationQueueHandlerWithExceptions handlerDelegate = new NotificationQueueHandlerWithExceptions(
                ImmutableList.<Period>of(Period.millis(1), Period.millis(1), Period.millis(1), Period.days(1)));
        final RetryableHandler retryableHandler = new RetryableHandler(this.clock, this.retryableQueueService, handlerDelegate);
        final NotificationQueue queueWithExceptionAndFailed = this.queueService.createNotificationQueue("svc", "queueName", retryableHandler);
        try {
            this.retryableQueueService.initialize(queueWithExceptionAndFailed.getQueueName(), handlerDelegate);
            this.retryableQueueService.start();
            queueWithExceptionAndFailed.startQueue();

            final DateTime now = new DateTime();
            final DateTime readyTime = now.plusMillis(2000);
            final TestNotificationKey eventJson = new TestNotificationKey("Foo");
            queueWithExceptionAndFailed.recordFutureNotification(readyTime, eventJson, TOKEN_ID, SEARCH_KEY_1, SEARCH_KEY_2);

            // Move the clock past readyTime so the notification (and its millisecond retries) become due.
            this.clock.setDeltaFromReality(3000L);

            final NotificationSqlDao notificationSqlDao = this.dbi.onDemand(NotificationSqlDao.class);

            // Phase 1: the original attempt and the three short retries have all failed.
            Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(new Callable<Boolean>(){
                @Override
                public Boolean call() throws Exception {
                    return TestNotificationQueue.this.loadRetryStateHistory(notificationSqlDao, originalQueueName).size() == 1
                           && TestNotificationQueue.this.loadRetryStateHistory(notificationSqlDao, retryQueueName).size() == 3;
                }
            });

            List<NotificationEventModelDao> historicalEntriesForOriginalEvent = this.loadRetryStateHistory(notificationSqlDao, originalQueueName);
            Assert.assertEquals(historicalEntriesForOriginalEvent.size(), 1);
            Assert.assertEquals((long) historicalEntriesForOriginalEvent.get(0).getErrorCount(), 0L);
            Assert.assertEquals(historicalEntriesForOriginalEvent.get(0).getProcessingState(), PersistentQueueEntryLifecycleState.FAILED);

            List<NotificationEventModelDao> historicalEntriesForRetries = this.loadRetryStateHistory(notificationSqlDao, retryQueueName);
            Assert.assertEquals(historicalEntriesForRetries.size(), 3);
            for (final NotificationEventModelDao historicalEntriesForRetry : historicalEntriesForRetries) {
                Assert.assertEquals((long) historicalEntriesForRetry.getErrorCount(), 0L);
                Assert.assertEquals(historicalEntriesForRetry.getProcessingState(), PersistentQueueEntryLifecycleState.FAILED);
            }

            // Phase 2: let the handler succeed and jump one day ahead so the last scheduled retry runs.
            handlerDelegate.shouldFail(false);
            this.clock.addDays(1);

            Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(new Callable<Boolean>(){
                @Override
                public Boolean call() throws Exception {
                    return TestNotificationQueue.this.loadRetryStateHistory(notificationSqlDao, originalQueueName).size() == 1
                           && TestNotificationQueue.this.loadRetryStateHistory(notificationSqlDao, retryQueueName).size() == 4;
                }
            });

            historicalEntriesForOriginalEvent = this.loadRetryStateHistory(notificationSqlDao, originalQueueName);
            Assert.assertEquals(historicalEntriesForOriginalEvent.size(), 1);
            Assert.assertEquals((long) historicalEntriesForOriginalEvent.get(0).getErrorCount(), 0L);
            Assert.assertEquals(historicalEntriesForOriginalEvent.get(0).getProcessingState(), PersistentQueueEntryLifecycleState.FAILED);

            historicalEntriesForRetries = this.loadRetryStateHistory(notificationSqlDao, retryQueueName);
            Assert.assertEquals(historicalEntriesForRetries.size(), 4);
            final int lastIndex = historicalEntriesForRetries.size() - 1;
            for (int i = 0; i <= lastIndex; ++i) {
                final NotificationEventModelDao historicalEntriesForRetry = historicalEntriesForRetries.get(i);
                Assert.assertEquals((long) historicalEntriesForRetry.getErrorCount(), 0L);
                // Only the last retry entry is expected to have been processed successfully.
                final PersistentQueueEntryLifecycleState expectedState = i == lastIndex
                        ? PersistentQueueEntryLifecycleState.PROCESSED
                        : PersistentQueueEntryLifecycleState.FAILED;
                Assert.assertEquals(historicalEntriesForRetry.getProcessingState(), expectedState);
            }
        }
        finally {
            try {
                queueWithExceptionAndFailed.stopQueue();
            }
            finally {
                // Stop the retry service even if stopping the queue throws.
                this.retryableQueueService.stop();
            }
        }
    }

    /**
     * Returns a snapshot of the history-table entries recorded for {@code queueName} under the
     * search keys used by {@link #testRetryStateForNotifications()}.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private List<NotificationEventModelDao> loadRetryStateHistory(final NotificationSqlDao notificationSqlDao, final String queueName) {
        final Iterator entries = notificationSqlDao.getHistoricalQueueEntriesForSearchKeys(queueName, SEARCH_KEY_1, SEARCH_KEY_2, this.notificationQueueConfig.getHistoryTableName());
        return ImmutableList.<NotificationEventModelDao>copyOf((Iterator<NotificationEventModelDao>) entries);
    }

    private class NotificationQueueHandlerWithExceptions
    implements NotificationQueueService.NotificationQueueHandler {
        private final List<Period> retrySchedule;
        private final int nbTotalExceptionsToThrow;
        private int nbExceptionsThrown;
        public boolean shouldFail = true;

        public NotificationQueueHandlerWithExceptions(List<Period> retrySchedule) {
            this(retrySchedule, 0, 0);
        }

        public NotificationQueueHandlerWithExceptions(int nbTotalExceptionsToThrow) {
            this(null, nbTotalExceptionsToThrow, 0);
        }

        public NotificationQueueHandlerWithExceptions(List<Period> retrySchedule, int nbTotalExceptionsToThrow, int nbExceptionsThrown) {
            this.retrySchedule = retrySchedule;
            this.nbTotalExceptionsToThrow = nbTotalExceptionsToThrow;
            this.nbExceptionsThrown = nbExceptionsThrown;
        }

        public void handleReadyNotification(NotificationEvent eventJson, DateTime eventDateTime, UUID userToken, Long searchKey1, Long searchKey2) {
            if (!this.shouldFail) {
                TestNotificationQueue.this.eventsReceived++;
                return;
            }
            NullPointerException exceptionForTests = new NullPointerException("Expected exception for tests");
            if (this.retrySchedule != null) {
                throw new QueueRetryException((Exception)exceptionForTests, this.retrySchedule);
            }
            if (this.nbExceptionsThrown < this.nbTotalExceptionsToThrow) {
                ++this.nbExceptionsThrown;
                throw exceptionForTests;
            }
            TestNotificationQueue.this.eventsReceived++;
        }

        public void shouldFail(boolean shouldFail) {
            this.shouldFail = shouldFail;
        }
    }

    /**
     * Notification event used by these tests: a single string value, with equality, hashing and
     * ordering all based on that value.
     */
    private static final class TestNotificationKey
    implements NotificationEvent,
    Comparable<TestNotificationKey> {
        private final String value;

        /** Creates a key from the "value" JSON property. */
        @JsonCreator
        public TestNotificationKey(@JsonProperty(value="value") String value) {
            this.value = value;
        }

        /** Returns the wrapped string value. */
        public String getValue() {
            return this.value;
        }

        /** Orders keys by their string value. */
        @Override
        public int compareTo(TestNotificationKey arg0) {
            final String otherValue = arg0.value;
            return this.value.compareTo(otherValue);
        }

        /** Returns the wrapped value, or "null" when no value is set. */
        @Override
        public String toString() {
            return String.valueOf(this.value);
        }

        /** Two keys are equal when their values are equal (two absent values count as equal). */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o instanceof TestNotificationKey) {
                final TestNotificationKey other = (TestNotificationKey) o;
                if (this.value == null) {
                    return other.value == null;
                }
                return this.value.equals(other.value);
            }
            return false;
        }

        /** Hash of the wrapped value, or 0 when no value is set. */
        @Override
        public int hashCode() {
            if (this.value == null) {
                return 0;
            }
            return this.value.hashCode();
        }
    }

    /**
     * Concrete {@link RetryableService} for these tests; it adds nothing beyond handing the
     * notification queue service to the parent constructor.
     */
    private static final class RetryableNotificationQueueService
    extends RetryableService {

        /** Passes {@code notificationQueueService} straight to {@link RetryableService}. */
        public RetryableNotificationQueueService(final NotificationQueueService notificationQueueService) {
            super(notificationQueueService);
        }
    }
}

