/*
 * Decompiled with CFR 0.152.
 */
package org.killbill.queue;

import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.joda.time.DateTime;
import org.killbill.CreatorName;
import org.killbill.TestSetup;
import org.killbill.bus.api.PersistentBusConfig;
import org.killbill.bus.dao.BusEventModelDao;
import org.killbill.bus.dao.PersistentBusSqlDao;
import org.killbill.clock.Clock;
import org.killbill.commons.utils.collect.Iterators;
import org.killbill.queue.DBBackedQueue;
import org.killbill.queue.DBBackedQueueWithPolling;
import org.killbill.queue.api.PersistentQueueConfig;
import org.killbill.queue.api.PersistentQueueEntryLifecycleState;
import org.killbill.queue.dao.EventEntryModelDao;
import org.skife.config.TimeSpan;
import org.skife.jdbi.v2.IDBI;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Database-backed ("slow" group) test of {@code DBBackedQueue.reapEntries} on the bus event
 * table.
 *
 * <p>The test seeds six entries that all share the same two search keys, runs the reaper once
 * and then checks the ready entries, the ready-or-in-processing entries and the history table.
 */
public class TestReaper extends TestSetup {

    private static final Long SEARCH_KEY_1 = 1L;
    private static final Long SEARCH_KEY_2 = 2L;

    /** Owner name used for entries that are not created/processed by the current node. */
    private static final String OTHER_NODE = "thatOtherNode";

    private DBBackedQueue<BusEventModelDao> queue;
    private PersistentBusSqlDao sqlDao;

    /** Runs the parent class setup, then obtains an on-demand SQL DAO used to seed and inspect the tables. */
    @Override
    @BeforeClass(groups = "slow")
    public void beforeClass() throws Exception {
        super.beforeClass();
        sqlDao = getDBI().onDemand(PersistentBusSqlDao.class);
    }

    /** Runs the parent per-test setup and verifies that no ready entry is left in {@code bus_events}. */
    @Override
    @BeforeMethod(groups = "slow")
    public void beforeMethod() throws Exception {
        super.beforeMethod();
        final List<BusEventModelDao> ready = sqlDao.getReadyEntries(clock.getUTCNow().toDate(), 100, null, "bus_events");
        Assert.assertEquals(ready.size(), 0);
    }

    /**
     * Seeds the queue table and checks the outcome of a single {@code reapEntries} call.
     *
     * <p>Seeded entries (record id: creating owner / processing owner, state):
     * <ul>
     *   <li>1: this node / none, AVAILABLE</li>
     *   <li>2: other node / none, AVAILABLE</li>
     *   <li>3: this node / this node, IN_PROCESSING, created now</li>
     *   <li>4: other node / other node, IN_PROCESSING, created now</li>
     *   <li>5: this node / this node, IN_PROCESSING, created one reap threshold ago</li>
     *   <li>6: other node / other node, IN_PROCESSING, created one reap threshold ago</li>
     * </ul>
     *
     * <p>After reaping, the assertions expect entries 1 to 5 to be unchanged, entry 6 to be
     * replaced by a new entry with a record id greater than 6 that is ready for this node, and
     * exactly one REAPED row in the history table carrying the same user token as that new entry.
     */
    @Test(groups = "slow")
    public void testReapEntries() {
        final PersistentBusConfig config = createConfig();
        final String tableName = config.getTableName();
        final String historyTableName = config.getHistoryTableName();
        final long reapThresholdMillis = config.getReapThreshold().getMillis();
        final String thisNode = CreatorName.get();

        queue = new DBBackedQueueWithPolling<BusEventModelDao>(clock, dbi, PersistentBusSqlDao.class, config, "testReapEntries", metricRegistry);

        final DateTime now = clock.getUTCNow();

        // Entries that are simply waiting to be picked up.
        sqlDao.insertEntry(createEntry(1L, thisNode, null, now, PersistentQueueEntryLifecycleState.AVAILABLE), tableName);
        sqlDao.insertEntry(createEntry(2L, OTHER_NODE, null, now, PersistentQueueEntryLifecycleState.AVAILABLE), tableName);
        // Entries being processed that were created just now.
        sqlDao.insertEntry(createEntry(3L, thisNode, thisNode, now, PersistentQueueEntryLifecycleState.IN_PROCESSING), tableName);
        sqlDao.insertEntry(createEntry(4L, OTHER_NODE, OTHER_NODE, now, PersistentQueueEntryLifecycleState.IN_PROCESSING), tableName);
        // Entries being processed whose creation date is one reap threshold in the past.
        sqlDao.insertEntry(createEntryForReaping(5L, thisNode, thisNode, now, reapThresholdMillis, PersistentQueueEntryLifecycleState.IN_PROCESSING), tableName);
        sqlDao.insertEntry(createEntryForReaping(6L, OTHER_NODE, OTHER_NODE, now, reapThresholdMillis, PersistentQueueEntryLifecycleState.IN_PROCESSING), tableName);

        // Before reaping: only entry 1 is ready for this node, all six rows are visible, history is empty.
        final List<BusEventModelDao> readyEntries = sqlDao.getReadyEntries(now.toDate(), 10, thisNode, tableName);
        Assert.assertEquals(readyEntries.size(), 1);
        assertRecordId(readyEntries, 0, 1L);

        final List<BusEventModelDao> readyOrInProcessingBeforeReaping =
                Iterators.toUnmodifiableList(sqlDao.getReadyOrInProcessingQueueEntriesForSearchKeys(SEARCH_KEY_1, SEARCH_KEY_2, tableName));
        Assert.assertEquals(readyOrInProcessingBeforeReaping.size(), 6);
        assertSequentialRecordIds(readyOrInProcessingBeforeReaping, 6);

        Assert.assertFalse(sqlDao.getHistoricalQueueEntriesForSearchKeys(SEARCH_KEY_1, SEARCH_KEY_2, historyTableName).hasNext());

        queue.reapEntries(now.minus(reapThresholdMillis).toDate());

        // After reaping: a second entry, with a record id past the seeded ones, is ready for this node.
        final List<BusEventModelDao> readyEntriesAfterReaping = sqlDao.getReadyEntries(now.toDate(), 10, thisNode, tableName);
        Assert.assertEquals(readyEntriesAfterReaping.size(), 2);
        assertRecordId(readyEntriesAfterReaping, 0, 1L);
        Assert.assertTrue(readyEntriesAfterReaping.get(1).getRecordId() > 6L);

        // Still six live rows: 1 to 5 are untouched, the sixth is the newly inserted one.
        final List<BusEventModelDao> readyOrInProcessingAfterReaping =
                Iterators.toUnmodifiableList(sqlDao.getReadyOrInProcessingQueueEntriesForSearchKeys(SEARCH_KEY_1, SEARCH_KEY_2, tableName));
        Assert.assertEquals(readyOrInProcessingAfterReaping.size(), 6);
        assertSequentialRecordIds(readyOrInProcessingAfterReaping, 5);
        final BusEventModelDao reinsertedEntry = readyOrInProcessingAfterReaping.get(5);
        Assert.assertTrue(reinsertedEntry.getRecordId() > 6L);

        // The history table holds exactly one REAPED row, tied to the new entry by its user token.
        final List<BusEventModelDao> historicalQueueEntries =
                Iterators.toUnmodifiableList(sqlDao.getHistoricalQueueEntriesForSearchKeys(SEARCH_KEY_1, SEARCH_KEY_2, historyTableName));
        Assert.assertEquals(historicalQueueEntries.size(), 1);
        final BusEventModelDao reapedEntry = historicalQueueEntries.get(0);
        Assert.assertEquals(reapedEntry.getProcessingState(), PersistentQueueEntryLifecycleState.REAPED);
        Assert.assertEquals(reapedEntry.getUserToken(), reinsertedEntry.getUserToken());
    }

    /**
     * Asserts that the entry at {@code index} has the given record id.
     *
     * <p>Both sides are passed as {@code Object} so the comparison always goes through
     * {@code Assert.assertEquals(Object, Object)}, whatever other overloads TestNG offers.
     */
    private static void assertRecordId(final List<BusEventModelDao> entries, final int index, final long expectedRecordId) {
        final Object actual = entries.get(index).getRecordId();
        final Object expected = Long.valueOf(expectedRecordId);
        Assert.assertEquals(actual, expected);
    }

    /** Asserts that the first {@code count} entries carry the record ids 1, 2, ..., {@code count}, in that order. */
    private static void assertSequentialRecordIds(final List<BusEventModelDao> entries, final int count) {
        for (int index = 0; index < count; index++) {
            assertRecordId(entries, index, index + 1L);
        }
    }

    /**
     * Builds a bus entry whose two date arguments are both {@code createdDate}, with a random
     * user token, an empty JSON payload and the shared search keys.
     */
    private BusEventModelDao createEntry(final long recordId, final String creatingOwner, final String processingOwner, final DateTime createdDate, final PersistentQueueEntryLifecycleState state) {
        return new BusEventModelDao(Long.valueOf(recordId), creatingOwner, processingOwner, createdDate, createdDate, state, String.class.getName(), "{}", Long.valueOf(0L), UUID.randomUUID(), SEARCH_KEY_1, SEARCH_KEY_2);
    }

    /**
     * Builds a bus entry like {@link #createEntry}, except that its first date is
     * {@code now - reapThresholdMillis} while its second date stays at {@code now}.
     */
    private BusEventModelDao createEntryForReaping(final long recordId, final String creatingOwner, final String processingOwner, final DateTime now, final long reapThresholdMillis, final PersistentQueueEntryLifecycleState state) {
        final DateTime createdDate = now.minus(reapThresholdMillis);
        final DateTime processingAvailableDate = now;
        return new BusEventModelDao(Long.valueOf(recordId), creatingOwner, processingOwner, createdDate, processingAvailableDate, state, String.class.getName(), "{}", Long.valueOf(0L), UUID.randomUUID(), SEARCH_KEY_1, SEARCH_KEY_2);
    }

    /**
     * Returns a fixed bus configuration: sticky polling on {@code bus_events} /
     * {@code bus_events_history}, a 5 minute reap threshold and a 3 minute reap schedule.
     */
    private PersistentBusConfig createConfig() {
        return new PersistentBusConfig() {

            @Override
            public boolean isInMemory() {
                return false;
            }

            @Override
            public int getMaxFailureRetries() {
                return 0;
            }

            @Override
            public int getMinInFlightEntries() {
                return 1;
            }

            @Override
            public int getMaxInFlightEntries() {
                return 100;
            }

            @Override
            public int getMaxEntriesClaimed() {
                return 100;
            }

            @Override
            public PersistentQueueConfig.PersistentQueueMode getPersistentQueueMode() {
                return PersistentQueueConfig.PersistentQueueMode.STICKY_POLLING;
            }

            @Override
            public TimeSpan getClaimedTime() {
                return new TimeSpan("1m");
            }

            @Override
            public long getPollingSleepTimeMs() {
                return -1L;
            }

            @Override
            public boolean isProcessingOff() {
                return false;
            }

            @Override
            public int geMaxDispatchThreads() {
                return 1;
            }

            @Override
            public int geNbLifecycleDispatchThreads() {
                return 1;
            }

            @Override
            public int geNbLifecycleCompleteThreads() {
                return 1;
            }

            @Override
            public int getEventQueueCapacity() {
                return -1;
            }

            @Override
            public String getTableName() {
                return "bus_events";
            }

            @Override
            public String getHistoryTableName() {
                return "bus_events_history";
            }

            @Override
            public TimeSpan getReapThreshold() {
                return new TimeSpan(5L, TimeUnit.MINUTES);
            }

            @Override
            public int getMaxReDispatchCount() {
                return 10;
            }

            @Override
            public TimeSpan getReapSchedule() {
                return new TimeSpan(3L, TimeUnit.MINUTES);
            }

            @Override
            public TimeSpan getShutdownTimeout() {
                return new TimeSpan(5L, TimeUnit.SECONDS);
            }
        };
    }
}

