/*
 * Decompiled with CFR 0.152.
 */
package org.killbill.bus;

import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.awaitility.Awaitility;
import org.joda.time.Period;
import org.killbill.TestSetup;
import org.killbill.billing.util.queue.QueueRetryException;
import org.killbill.bus.DefaultPersistentBus;
import org.killbill.bus.TestEventBusBase;
import org.killbill.bus.api.BusEvent;
import org.killbill.bus.api.PersistentBus;
import org.killbill.bus.dao.BusEventModelDao;
import org.killbill.bus.dao.PersistentBusSqlDao;
import org.killbill.clock.Clock;
import org.killbill.commons.eventbus.AllowConcurrentEvents;
import org.killbill.commons.eventbus.Subscribe;
import org.killbill.commons.utils.collect.Iterators;
import org.killbill.notificationq.DefaultNotificationQueueService;
import org.killbill.notificationq.api.NotificationQueueService;
import org.killbill.notificationq.dao.NotificationEventModelDao;
import org.killbill.notificationq.dao.NotificationSqlDao;
import org.killbill.queue.api.PersistentQueueEntryLifecycleState;
import org.killbill.queue.retry.RetryableService;
import org.killbill.queue.retry.RetryableSubscriber;
import org.skife.jdbi.v2.IDBI;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class TestRetries
extends TestSetup {
    private static final UUID TOKEN_ID = UUID.randomUUID();
    private static final long SEARCH_KEY_1 = 65L;
    private static final long SEARCH_KEY_2 = 34L;
    private NotificationQueueService queueService;
    private PersistentBus busService;

    @Override
    @BeforeMethod(groups={"slow"})
    public void beforeMethod() throws Exception {
        super.beforeMethod();
        this.queueService = new DefaultNotificationQueueService((IDBI)this.getDBI(), (Clock)this.clock, this.getNotificationQueueConfig(), this.metricRegistry);
        this.busService = new DefaultPersistentBus((IDBI)this.getDBI(), (Clock)this.clock, this.getPersistentBusConfig(), this.metricRegistry, this.databaseTransactionNotificationApi);
        this.busService.startQueue();
    }

    @AfterMethod(groups={"slow"})
    public void afterMethod() throws Exception {
        this.busService.stopQueue();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(groups={"slow"})
    public void testRetryStateForBus() throws Exception {
        RetryableBusService retryableBusService = new RetryableBusService(this.queueService);
        retryableBusService.initialize();
        this.busService.register((Object)retryableBusService);
        try {
            retryableBusService.start();
            TestEventBusBase.MyEvent myEvent = new TestEventBusBase.MyEvent("Foo", 1L, "Baz", 65L, 34L, TOKEN_ID);
            this.busService.post((BusEvent)myEvent);
            final PersistentBusSqlDao busSqlDao = (PersistentBusSqlDao)this.dbi.onDemand(PersistentBusSqlDao.class);
            final NotificationSqlDao notificationSqlDao = (NotificationSqlDao)this.dbi.onDemand(NotificationSqlDao.class);
            Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> Iterators.size((Iterator)busSqlDao.getHistoricalQueueEntriesForSearchKeys(Long.valueOf(65L), Long.valueOf(34L), this.persistentBusConfig.getHistoryTableName())) == 1 && Iterators.size((Iterator)notificationSqlDao.getHistoricalQueueEntriesForSearchKeys("notifications-retries:myEvent-listener", Long.valueOf(65L), Long.valueOf(34L), this.notificationQueueConfig.getHistoryTableName())) == 3);
            List historicalEntriesForOriginalEvent = Iterators.toUnmodifiableList((Iterator)busSqlDao.getHistoricalQueueEntriesForSearchKeys(Long.valueOf(65L), Long.valueOf(34L), this.persistentBusConfig.getHistoryTableName()));
            Assert.assertEquals((int)historicalEntriesForOriginalEvent.size(), (int)1);
            Assert.assertEquals((long)((BusEventModelDao)historicalEntriesForOriginalEvent.get(0)).getErrorCount(), (long)0L);
            Assert.assertEquals((Object)((BusEventModelDao)historicalEntriesForOriginalEvent.get(0)).getProcessingState(), (Object)PersistentQueueEntryLifecycleState.FAILED);
            List historicalEntriesForRetries = Iterators.toUnmodifiableList((Iterator)notificationSqlDao.getHistoricalQueueEntriesForSearchKeys("notifications-retries:myEvent-listener", Long.valueOf(65L), Long.valueOf(34L), this.notificationQueueConfig.getHistoryTableName()));
            Assert.assertEquals((int)historicalEntriesForRetries.size(), (int)3);
            for (NotificationEventModelDao historicalEntriesForRetry : historicalEntriesForRetries) {
                Assert.assertEquals((long)historicalEntriesForRetry.getErrorCount(), (long)0L);
                Assert.assertEquals((Object)historicalEntriesForRetry.getProcessingState(), (Object)PersistentQueueEntryLifecycleState.FAILED);
            }
            retryableBusService.shouldFail(false);
            this.clock.addDays(1);
            Awaitility.await().atMost(10L, TimeUnit.SECONDS).until((Callable)new Callable<Boolean>(){

                @Override
                public Boolean call() throws Exception {
                    return Iterators.size((Iterator)busSqlDao.getHistoricalQueueEntriesForSearchKeys(Long.valueOf(65L), Long.valueOf(34L), TestRetries.this.persistentBusConfig.getHistoryTableName())) == 1 && Iterators.size((Iterator)notificationSqlDao.getHistoricalQueueEntriesForSearchKeys("notifications-retries:myEvent-listener", Long.valueOf(65L), Long.valueOf(34L), TestRetries.this.notificationQueueConfig.getHistoryTableName())) == 4;
                }
            });
            historicalEntriesForOriginalEvent = Iterators.toUnmodifiableList((Iterator)busSqlDao.getHistoricalQueueEntriesForSearchKeys(Long.valueOf(65L), Long.valueOf(34L), this.persistentBusConfig.getHistoryTableName()));
            Assert.assertEquals((int)historicalEntriesForOriginalEvent.size(), (int)1);
            Assert.assertEquals((long)((BusEventModelDao)historicalEntriesForOriginalEvent.get(0)).getErrorCount(), (long)0L);
            Assert.assertEquals((Object)((BusEventModelDao)historicalEntriesForOriginalEvent.get(0)).getProcessingState(), (Object)PersistentQueueEntryLifecycleState.FAILED);
            historicalEntriesForRetries = Iterators.toUnmodifiableList((Iterator)notificationSqlDao.getHistoricalQueueEntriesForSearchKeys("notifications-retries:myEvent-listener", Long.valueOf(65L), Long.valueOf(34L), this.notificationQueueConfig.getHistoryTableName()));
            Assert.assertEquals((int)historicalEntriesForRetries.size(), (int)4);
            for (int i = 0; i < historicalEntriesForRetries.size(); ++i) {
                NotificationEventModelDao historicalEntriesForRetry;
                historicalEntriesForRetry = (NotificationEventModelDao)historicalEntriesForRetries.get(i);
                Assert.assertEquals((long)historicalEntriesForRetry.getErrorCount(), (long)0L);
                Assert.assertEquals((Object)historicalEntriesForRetry.getProcessingState(), (Object)(i == historicalEntriesForRetries.size() - 1 ? PersistentQueueEntryLifecycleState.PROCESSED : PersistentQueueEntryLifecycleState.FAILED));
            }
        }
        finally {
            retryableBusService.stop();
        }
    }

    private final class RetryableBusService
    extends RetryableService {
        private final RetryableSubscriber.SubscriberQueueHandler subscriberQueueHandler;
        private final RetryableSubscriber retryableSubscriber;
        private boolean shouldFail;

        public RetryableBusService(NotificationQueueService notificationQueueService) {
            super(notificationQueueService);
            this.subscriberQueueHandler = new RetryableSubscriber.SubscriberQueueHandler();
            this.shouldFail = true;
            this.subscriberQueueHandler.subscribe(TestEventBusBase.MyEvent.class, (RetryableSubscriber.SubscriberAction)new RetryableSubscriber.SubscriberAction<TestEventBusBase.MyEvent>(){

                public void run(TestEventBusBase.MyEvent event) {
                    if (!RetryableBusService.this.shouldFail) {
                        return;
                    }
                    NullPointerException exceptionForTests = new NullPointerException("Expected exception for tests");
                    throw new QueueRetryException((Exception)exceptionForTests, List.of(Period.millis((int)1), Period.millis((int)1), Period.millis((int)1), Period.days((int)1)));
                }
            });
            this.retryableSubscriber = new RetryableSubscriber((Clock)TestRetries.this.clock, (RetryableService)this, (NotificationQueueService.NotificationQueueHandler)this.subscriberQueueHandler);
        }

        public void initialize() {
            super.initialize("myEvent-listener", (NotificationQueueService.NotificationQueueHandler)this.subscriberQueueHandler);
        }

        @AllowConcurrentEvents
        @Subscribe
        public void handleMyEvent(TestEventBusBase.MyEvent event) {
            this.retryableSubscriber.handleEvent((BusEvent)event);
        }

        public void shouldFail(boolean shouldFail) {
            this.shouldFail = shouldFail;
        }
    }
}

