/*
 * Decompiled with CFR 0.152.
 */
package org.killbill.queue;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.killbill.CreatorName;
import org.killbill.TestSetup;
import org.killbill.bus.api.PersistentBusConfig;
import org.killbill.bus.dao.BusEventModelDao;
import org.killbill.bus.dao.PersistentBusSqlDao;
import org.killbill.clock.Clock;
import org.killbill.queue.DBBackedQueue;
import org.killbill.queue.api.PersistentQueueConfig;
import org.killbill.queue.api.PersistentQueueEntryLifecycleState;
import org.killbill.queue.dao.EventEntryModelDao;
import org.killbill.queue.dao.QueueSqlDao;
import org.skife.config.TimeSpan;
import org.skife.jdbi.v2.Transaction;
import org.skife.jdbi.v2.TransactionStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class TestDBBackedQueue
extends TestSetup {
    private static final Logger log = LoggerFactory.getLogger(TestDBBackedQueue.class);
    private DBBackedQueue<BusEventModelDao> queue;
    private PersistentBusSqlDao sqlDao;

    @BeforeClass(groups={"slow"})
    public void beforeClass() throws Exception {
        super.beforeClass();
        this.sqlDao = (PersistentBusSqlDao)this.getDBI().onDemand(PersistentBusSqlDao.class);
    }

    @BeforeMethod(groups={"slow"})
    public void beforeMethod() throws Exception {
        super.beforeMethod();
        List ready = this.sqlDao.getReadyEntries(this.clock.getUTCNow().toDate(), 100, null, "bus_events");
        Assert.assertEquals((int)ready.size(), (int)0);
    }

    @Test(groups={"slow"})
    public void testOnlyInflightQ() {
        PersistentBusConfig config = this.createConfig(1, 10, PersistentQueueConfig.PersistentQueueMode.STICKY_EVENTS);
        this.queue = new DBBackedQueue((Clock)this.clock, (QueueSqlDao)this.sqlDao, (PersistentQueueConfig)config, "onlyInflightQ-bus_event", this.metricRegistry, this.databaseTransactionNotificationApi);
        this.queue.initialize();
        long expectedRecordId = -1L;
        ArrayList<Long> recordIs = new ArrayList<Long>();
        for (int i = 0; i < 100; ++i) {
            Assert.assertTrue((boolean)this.queue.isQueueOpenForRead());
            Assert.assertTrue((boolean)this.queue.isQueueOpenForWrite());
            BusEventModelDao input = this.createEntry(new Long(i));
            this.queue.insertEntry((EventEntryModelDao)input);
            List claimed = this.queue.getReadyEntries();
            Assert.assertEquals((int)claimed.size(), (int)1);
            BusEventModelDao output = (BusEventModelDao)claimed.get(0);
            expectedRecordId = i == 0 ? output.getRecordId() : expectedRecordId + 1L;
            Assert.assertEquals((Object)output.getRecordId(), (Object)new Long(expectedRecordId));
            Assert.assertEquals((String)output.getClassName(), (String)String.class.getName());
            Assert.assertEquals((String)output.getEventJson(), (String)"json");
            Assert.assertEquals((Object)output.getProcessingState(), (Object)PersistentQueueEntryLifecycleState.AVAILABLE);
            Assert.assertEquals((Object)output.getSearchKey1(), (Object)new Long(i));
            Assert.assertEquals((Object)output.getSearchKey2(), (Object)new Long(1L));
            recordIs.add(output.getRecordId());
            BusEventModelDao historyInput = new BusEventModelDao(output, CreatorName.get(), this.clock.getUTCNow(), PersistentQueueEntryLifecycleState.PROCESSED);
            this.queue.moveEntryToHistory((EventEntryModelDao)historyInput);
        }
        List ready = this.sqlDao.getReadyEntries(this.clock.getUTCNow().toDate(), 1000, CreatorName.get(), "bus_events");
        Assert.assertEquals((int)ready.size(), (int)0);
        List readyHistory = this.sqlDao.getEntriesFromIds(recordIs, "bus_events_history");
        Assert.assertEquals((int)readyHistory.size(), (int)100);
        for (int i = 0; i < 100; ++i) {
            Assert.assertEquals((Object)((BusEventModelDao)readyHistory.get(i)).getProcessingState(), (Object)PersistentQueueEntryLifecycleState.PROCESSED);
            Assert.assertEquals((String)((BusEventModelDao)readyHistory.get(i)).getProcessingOwner(), (String)CreatorName.get());
        }
        Assert.assertEquals((long)this.queue.getTotalInflightFetched(), (long)100L);
        Assert.assertEquals((long)this.queue.getTotalFetched(), (long)100L);
        Assert.assertEquals((long)this.queue.getTotalInflightInsert(), (long)100L);
        Assert.assertEquals((long)this.queue.getTotalInsert(), (long)100L);
    }

    @Test(groups={"slow"})
    public void testWithExistingEntriesForDifferentOwners() {
        for (int i = 0; i < 5; ++i) {
            BusEventModelDao input = this.createEntry(new Long(i), "otherOwner");
            this.sqlDao.insertEntry((EventEntryModelDao)input, "bus_events");
        }
        PersistentBusConfig config = this.createConfig(1, 10, PersistentQueueConfig.PersistentQueueMode.STICKY_EVENTS);
        this.queue = new DBBackedQueue((Clock)this.clock, (QueueSqlDao)this.sqlDao, (PersistentQueueConfig)config, "existingEntriesForDifferentOwners-bus_event", this.metricRegistry, this.databaseTransactionNotificationApi);
        this.queue.initialize();
        long expectedRecordId = -1L;
        ArrayList<Long> recordIs = new ArrayList<Long>();
        for (int i = 0; i < 10; ++i) {
            Assert.assertTrue((boolean)this.queue.isQueueOpenForRead());
            Assert.assertTrue((boolean)this.queue.isQueueOpenForWrite());
            BusEventModelDao input = this.createEntry(new Long(i));
            this.queue.insertEntry((EventEntryModelDao)input);
            List claimed = this.queue.getReadyEntries();
            Assert.assertEquals((int)claimed.size(), (int)1);
            BusEventModelDao output = (BusEventModelDao)claimed.get(0);
            expectedRecordId = i == 0 ? output.getRecordId() : expectedRecordId + 1L;
            Assert.assertEquals((Object)output.getRecordId(), (Object)new Long(expectedRecordId));
            Assert.assertEquals((String)output.getClassName(), (String)String.class.getName());
            Assert.assertEquals((String)output.getEventJson(), (String)"json");
            Assert.assertEquals((Object)output.getProcessingState(), (Object)PersistentQueueEntryLifecycleState.AVAILABLE);
            Assert.assertEquals((Object)output.getSearchKey1(), (Object)new Long(i));
            Assert.assertEquals((Object)output.getSearchKey2(), (Object)new Long(1L));
            recordIs.add(output.getRecordId());
            BusEventModelDao historyInput = new BusEventModelDao(output, CreatorName.get(), this.clock.getUTCNow(), PersistentQueueEntryLifecycleState.PROCESSED);
            this.queue.moveEntryToHistory((EventEntryModelDao)historyInput);
        }
        Assert.assertEquals((long)this.queue.getTotalInflightFetched(), (long)10L);
        Assert.assertEquals((long)this.queue.getTotalFetched(), (long)10L);
        Assert.assertEquals((long)this.queue.getTotalInflightInsert(), (long)10L);
        Assert.assertEquals((long)this.queue.getTotalInsert(), (long)10L);
        List remaining = this.sqlDao.getReadyEntries(this.clock.getUTCNow().toDate(), 10, null, "bus_events");
        Assert.assertEquals((int)remaining.size(), (int)5);
        for (BusEventModelDao cur : remaining) {
            this.sqlDao.removeEntry(cur.getRecordId(), "bus_events");
        }
    }

    @Test(groups={"slow"})
    public void testInflightQWithMultipleEntriesPerTransaction() {
        PersistentBusConfig config = this.createConfig(3, 10, PersistentQueueConfig.PersistentQueueMode.STICKY_EVENTS);
        this.queue = new DBBackedQueue((Clock)this.clock, (QueueSqlDao)this.sqlDao, (PersistentQueueConfig)config, "MultipleEntriesPerTransaction-bus_event", this.metricRegistry, this.databaseTransactionNotificationApi);
        this.queue.initialize();
        final BusEventModelDao input1 = this.createEntry(new Long(1L));
        final BusEventModelDao input2 = this.createEntry(new Long(2L));
        final BusEventModelDao input3 = this.createEntry(new Long(3L));
        this.sqlDao.inTransaction((Transaction)new Transaction<Void, QueueSqlDao<BusEventModelDao>>(){

            public Void inTransaction(QueueSqlDao<BusEventModelDao> transactional, TransactionStatus status) throws Exception {
                TestDBBackedQueue.this.queue.insertEntryFromTransaction(transactional, (EventEntryModelDao)input1);
                TestDBBackedQueue.this.queue.insertEntryFromTransaction(transactional, (EventEntryModelDao)input2);
                TestDBBackedQueue.this.queue.insertEntryFromTransaction(transactional, (EventEntryModelDao)input3);
                return null;
            }
        });
        List claimed = this.queue.getReadyEntries();
        Assert.assertEquals((int)claimed.size(), (int)3);
        long expectedRecordId = -1L;
        for (int i = 0; i < claimed.size(); ++i) {
            BusEventModelDao output = (BusEventModelDao)claimed.get(i);
            expectedRecordId = i == 0 ? output.getRecordId() : expectedRecordId + 1L;
            Assert.assertEquals((Object)output.getRecordId(), (Object)new Long(expectedRecordId));
            Assert.assertEquals((String)output.getClassName(), (String)String.class.getName());
            Assert.assertEquals((String)output.getEventJson(), (String)"json");
        }
    }

    @Test(groups={"slow"})
    public void testInflightQWithSmallExistingEntriesOnStart() {
        for (int i = 0; i < 5; ++i) {
            BusEventModelDao input = this.createEntry(new Long(i));
            this.sqlDao.insertEntry((EventEntryModelDao)input, "bus_events");
        }
        PersistentBusConfig config = this.createConfig(7, 100, PersistentQueueConfig.PersistentQueueMode.STICKY_EVENTS);
        this.queue = new DBBackedQueue((Clock)this.clock, (QueueSqlDao)this.sqlDao, (PersistentQueueConfig)config, "inflightQWithSmallExistingEntriesOnStart-bus_event", this.metricRegistry, this.databaseTransactionNotificationApi);
        this.queue.initialize();
        Assert.assertFalse((boolean)this.queue.isQueueOpenForRead());
        Assert.assertTrue((boolean)this.queue.isQueueOpenForWrite());
        ArrayList<Long> recordIs = new ArrayList<Long>();
        for (int i = 5; i < 105; ++i) {
            BusEventModelDao input = this.createEntry(new Long(i));
            this.queue.insertEntry((EventEntryModelDao)input);
        }
        long expectedRecordId = -1L;
        for (int i = 0; i < 15; ++i) {
            if (i <= 1) {
                Assert.assertFalse((boolean)this.queue.isQueueOpenForRead());
            } else {
                Assert.assertTrue((boolean)this.queue.isQueueOpenForRead());
            }
            Assert.assertTrue((boolean)this.queue.isQueueOpenForWrite());
            List claimed = this.queue.getReadyEntries();
            for (int j = 0; j < claimed.size(); ++j) {
                BusEventModelDao output = (BusEventModelDao)claimed.get(j);
                expectedRecordId = i == 0 && j == 0 ? output.getRecordId() : expectedRecordId + 1L;
                Assert.assertEquals((Object)output.getRecordId(), (Object)new Long(expectedRecordId));
                Assert.assertEquals((String)output.getClassName(), (String)String.class.getName());
                Assert.assertEquals((String)output.getEventJson(), (String)"json");
                Assert.assertEquals((Object)output.getSearchKey1(), (Object)new Long(i * 7 + j));
                Assert.assertEquals((Object)output.getSearchKey2(), (Object)new Long(1L));
                recordIs.add(output.getRecordId());
                BusEventModelDao historyInput = new BusEventModelDao(output, CreatorName.get(), this.clock.getUTCNow(), PersistentQueueEntryLifecycleState.PROCESSED);
                this.queue.moveEntryToHistory((EventEntryModelDao)historyInput);
            }
        }
        List ready = this.sqlDao.getReadyEntries(this.clock.getUTCNow().toDate(), 1000, CreatorName.get(), "bus_events");
        Assert.assertEquals((int)ready.size(), (int)0);
        List readyHistory = this.sqlDao.getEntriesFromIds(recordIs, "bus_events_history");
        Assert.assertEquals((int)readyHistory.size(), (int)105);
        for (int i = 0; i < 105; ++i) {
            Assert.assertEquals((Object)((BusEventModelDao)readyHistory.get(i)).getProcessingState(), (Object)PersistentQueueEntryLifecycleState.PROCESSED);
            Assert.assertEquals((String)((BusEventModelDao)readyHistory.get(i)).getProcessingOwner(), (String)CreatorName.get());
        }
        Assert.assertEquals((long)this.queue.getTotalInflightFetched(), (long)91L);
        Assert.assertEquals((long)this.queue.getTotalFetched(), (long)105L);
        Assert.assertEquals((long)this.queue.getTotalInflightInsert(), (long)100L);
        Assert.assertEquals((long)this.queue.getTotalInsert(), (long)100L);
    }

    @Test(groups={"slow"})
    public void testInflightQWithLargeExistingEntriesOnStart() {
        BusEventModelDao historyInput;
        BusEventModelDao output;
        int j;
        for (int i = 0; i < 20; ++i) {
            BusEventModelDao input = this.createEntry(new Long(i));
            this.sqlDao.insertEntry((EventEntryModelDao)input, "bus_events");
        }
        PersistentBusConfig config = this.createConfig(20, 100, PersistentQueueConfig.PersistentQueueMode.STICKY_EVENTS);
        this.queue = new DBBackedQueue((Clock)this.clock, (QueueSqlDao)this.sqlDao, (PersistentQueueConfig)config, "inflightQWithLargeExistingEntriesOnStart-bus_event", this.metricRegistry, this.databaseTransactionNotificationApi);
        this.queue.initialize();
        Assert.assertFalse((boolean)this.queue.isQueueOpenForRead());
        Assert.assertFalse((boolean)this.queue.isQueueOpenForWrite());
        ArrayList<Long> recordIs = new ArrayList<Long>();
        for (int i = 20; i < 40; ++i) {
            BusEventModelDao input = this.createEntry(new Long(i));
            this.queue.insertEntry((EventEntryModelDao)input);
        }
        long nextCreatedEntry = 40L;
        long expectedRecordId = -1L;
        List claimed = this.queue.getReadyEntries();
        while (true) {
            for (j = 0; j < claimed.size(); ++j) {
                output = (BusEventModelDao)claimed.get(j);
                expectedRecordId = expectedRecordId == -1L ? output.getRecordId() : expectedRecordId + 1L;
                Assert.assertEquals((Object)output.getRecordId(), (Object)new Long(expectedRecordId));
                Assert.assertEquals((String)output.getClassName(), (String)String.class.getName());
                Assert.assertEquals((String)output.getEventJson(), (String)"json");
                Assert.assertEquals((Object)output.getSearchKey1(), (Object)new Long(expectedRecordId - 1L));
                Assert.assertEquals((Object)output.getSearchKey2(), (Object)new Long(1L));
                recordIs.add(output.getRecordId());
                historyInput = new BusEventModelDao(output, CreatorName.get(), this.clock.getUTCNow(), PersistentQueueEntryLifecycleState.PROCESSED);
                this.queue.moveEntryToHistory((EventEntryModelDao)historyInput);
            }
            Assert.assertFalse((boolean)this.queue.isQueueOpenForRead());
            if (claimed.size() < 20) break;
            BusEventModelDao input = this.createEntry(new Long(nextCreatedEntry++));
            this.queue.insertEntry((EventEntryModelDao)input);
            claimed = this.queue.getReadyEntries();
        }
        Assert.assertTrue((boolean)this.queue.isQueueOpenForWrite());
        Assert.assertFalse((boolean)this.queue.isQueueOpenForRead());
        for (int i = 42; i < 100; ++i) {
            BusEventModelDao input = this.createEntry(new Long(nextCreatedEntry++));
            this.queue.insertEntry((EventEntryModelDao)input);
        }
        claimed = this.queue.getReadyEntries();
        do {
            for (j = 0; j < claimed.size(); ++j) {
                output = (BusEventModelDao)claimed.get(j);
                Assert.assertEquals((Object)output.getRecordId(), (Object)new Long(++expectedRecordId));
                Assert.assertEquals((String)output.getClassName(), (String)String.class.getName());
                Assert.assertEquals((String)output.getEventJson(), (String)"json");
                Assert.assertEquals((Object)output.getSearchKey1(), (Object)new Long(expectedRecordId - 1L));
                Assert.assertEquals((Object)output.getSearchKey2(), (Object)new Long(1L));
                recordIs.add(output.getRecordId());
                historyInput = new BusEventModelDao(output, CreatorName.get(), this.clock.getUTCNow(), PersistentQueueEntryLifecycleState.PROCESSED);
                this.queue.moveEntryToHistory((EventEntryModelDao)historyInput);
            }
        } while ((claimed = this.queue.getReadyEntries()).size() != 0);
        Assert.assertEquals((long)this.queue.getTotalInflightFetched(), (long)38L);
        Assert.assertEquals((long)this.queue.getTotalFetched(), (long)100L);
        Assert.assertEquals((long)this.queue.getTotalInflightInsert(), (long)58L);
        Assert.assertEquals((long)this.queue.getTotalInsert(), (long)80L);
    }

    @Test(groups={"slow"})
    public void testInflightQWithSmallExistingEntriesOnStartAndOverflowWrite() {
        for (int i = 0; i < 5; ++i) {
            BusEventModelDao input = this.createEntry(new Long(i));
            this.sqlDao.insertEntry((EventEntryModelDao)input, "bus_events");
        }
        PersistentBusConfig config = this.createConfig(1, 100, PersistentQueueConfig.PersistentQueueMode.STICKY_EVENTS);
        this.queue = new DBBackedQueue((Clock)this.clock, (QueueSqlDao)this.sqlDao, (PersistentQueueConfig)config, "smallExistingEntriesOnStartAndOverflowWrite_bus-event", this.metricRegistry, this.databaseTransactionNotificationApi);
        this.queue.initialize();
        Assert.assertFalse((boolean)this.queue.isQueueOpenForRead());
        Assert.assertTrue((boolean)this.queue.isQueueOpenForWrite());
        ArrayList<Long> recordIs = new ArrayList<Long>();
        for (int i = 0; i < 200; ++i) {
            BusEventModelDao input = this.createEntry(new Long(i + 5));
            this.queue.insertEntry((EventEntryModelDao)input);
            if (i >= 100) {
                Assert.assertFalse((boolean)this.queue.isQueueOpenForWrite());
                continue;
            }
            Assert.assertTrue((boolean)this.queue.isQueueOpenForWrite());
        }
        long expectedRecordId = -1L;
        int i = 0;
        do {
            if (i <= 5) {
                Assert.assertFalse((boolean)this.queue.isQueueOpenForRead());
            } else if (i < 106) {
                Assert.assertTrue((boolean)this.queue.isQueueOpenForRead());
            } else {
                Assert.assertFalse((boolean)this.queue.isQueueOpenForRead());
            }
            List allReady = this.queue.getReadyEntries();
            for (BusEventModelDao output : allReady) {
                expectedRecordId = i == 0 ? output.getRecordId() : expectedRecordId + 1L;
                Assert.assertEquals((Object)output.getRecordId(), (Object)new Long(expectedRecordId));
                Assert.assertEquals((String)output.getClassName(), (String)String.class.getName());
                Assert.assertEquals((String)output.getEventJson(), (String)"json");
                Assert.assertEquals((Object)output.getSearchKey1(), (Object)new Long(i));
                Assert.assertEquals((Object)output.getSearchKey2(), (Object)new Long(1L));
                recordIs.add(output.getRecordId());
                BusEventModelDao historyInput = new BusEventModelDao(output, CreatorName.get(), this.clock.getUTCNow(), PersistentQueueEntryLifecycleState.PROCESSED);
                this.queue.moveEntryToHistory((EventEntryModelDao)historyInput);
                ++i;
            }
        } while (i != 205);
        List ready = this.sqlDao.getReadyEntries(this.clock.getUTCNow().toDate(), 1000, CreatorName.get(), "bus_events");
        Assert.assertEquals((int)ready.size(), (int)0);
        List readyHistory = this.sqlDao.getEntriesFromIds(recordIs, "bus_events_history");
        Assert.assertEquals((int)readyHistory.size(), (int)205);
        for (i = 0; i < 205; ++i) {
            Assert.assertEquals((Object)((BusEventModelDao)readyHistory.get(i)).getProcessingState(), (Object)PersistentQueueEntryLifecycleState.PROCESSED);
            Assert.assertEquals((String)((BusEventModelDao)readyHistory.get(i)).getProcessingOwner(), (String)CreatorName.get());
        }
        Assert.assertEquals((long)this.queue.getTotalInflightFetched(), (long)99L);
        Assert.assertEquals((long)this.queue.getTotalFetched(), (long)205L);
        Assert.assertEquals((long)this.queue.getTotalInflightInsert(), (long)100L);
        Assert.assertEquals((long)this.queue.getTotalInsert(), (long)200L);
    }

    @Test(groups={"slow"})
    public void testWithOneReaderOneWriter() throws InterruptedException {
        PersistentBusConfig config = this.createConfig(7, 100, PersistentQueueConfig.PersistentQueueMode.STICKY_EVENTS);
        this.queue = new DBBackedQueue((Clock)this.clock, (QueueSqlDao)this.sqlDao, (PersistentQueueConfig)config, "oneReaderOneWriter-bus_event", this.metricRegistry, this.databaseTransactionNotificationApi);
        this.queue.initialize();
        Thread writer = new Thread(new WriterRunnable(0, 1000, this.queue));
        AtomicLong consumed = new AtomicLong(0L);
        ReaderRunnable readerRunnable = new ReaderRunnable(0, consumed, 1000, this.queue);
        Thread reader = new Thread(readerRunnable);
        writer.start();
        while (this.queue.isQueueOpenForWrite()) {
            try {
                Thread.sleep(10L);
            }
            catch (InterruptedException e) {}
        }
        reader.start();
        try {
            writer.join();
            reader.join();
        }
        catch (InterruptedException e) {
            Assert.fail((String)"InterruptedException ", (Throwable)e);
        }
        List ready = this.sqlDao.getReadyEntries(this.clock.getUTCNow().toDate(), 1000, CreatorName.get(), "bus_events");
        Assert.assertEquals((int)ready.size(), (int)0);
        log.info("Got inflightProcessed = " + this.queue.getTotalInflightFetched() + "/1000, inflightWritten = " + this.queue.getTotalInflightInsert() + "/1000");
        Assert.assertEquals((long)this.queue.getTotalInsert(), (long)1000L);
        long expected = 999L;
        for (Long cur : readerRunnable.getSearch1()) {
            Assert.assertEquals((long)cur, (long)expected);
            --expected;
        }
    }

    @Test(groups={"slow"}, enabled=true)
    public void testMultipleWritersOneReader() throws InterruptedException {
        PersistentBusConfig config = this.createConfig(7, 100, PersistentQueueConfig.PersistentQueueMode.STICKY_EVENTS);
        this.queue = new DBBackedQueue((Clock)this.clock, (QueueSqlDao)this.sqlDao, (PersistentQueueConfig)config, "multipleReaderMultipleWriter-bus_event", this.metricRegistry, this.databaseTransactionNotificationApi);
        this.queue.initialize();
        Thread[] writers = new Thread[2];
        Thread[] readers = new Thread[1];
        writers[0] = new Thread(new WriterRunnable(0, 1000, this.queue));
        writers[1] = new Thread(new WriterRunnable(1, 1000, this.queue));
        AtomicLong consumed = new AtomicLong(0L);
        readers[0] = new Thread(new ReaderRunnable(0, consumed, 2000, this.queue));
        writers[0].start();
        writers[1].start();
        while (this.queue.isQueueOpenForWrite()) {
            try {
                Thread.sleep(10L);
            }
            catch (InterruptedException e) {}
        }
        readers[0].start();
        try {
            writers[0].join();
            writers[1].join();
            readers[0].join();
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
        List ready = this.sqlDao.getReadyEntries(this.clock.getUTCNow().toDate(), 1000, CreatorName.get(), "bus_events");
        Assert.assertEquals((int)ready.size(), (int)0);
        log.info("Got inflightProcessed = " + this.queue.getTotalInflightFetched() + "/1000, inflightWritten = " + this.queue.getTotalInflightInsert() + "/1000");
        Assert.assertEquals((long)this.queue.getTotalInsert(), (long)2000L);
    }

    private BusEventModelDao createEntry(Long searchKey1, String owner) {
        String json = "json";
        return new BusEventModelDao(owner, this.clock.getUTCNow(), String.class.getName(), "json", UUID.randomUUID(), searchKey1, Long.valueOf(1L));
    }

    private BusEventModelDao createEntry(Long searchKey1) {
        return this.createEntry(searchKey1, CreatorName.get());
    }

    private PersistentBusConfig createConfig(final int claimed, final int qCapacity, final PersistentQueueConfig.PersistentQueueMode mode) {
        return new PersistentBusConfig(){

            public boolean isInMemory() {
                return false;
            }

            public int getMaxFailureRetries() {
                return 0;
            }

            public int getMaxEntriesClaimed() {
                return claimed;
            }

            public PersistentQueueConfig.PersistentQueueMode getPersistentQueueMode() {
                return mode;
            }

            public TimeSpan getClaimedTime() {
                return new TimeSpan("5m");
            }

            public long getPollingSleepTimeMs() {
                return 100L;
            }

            public boolean isProcessingOff() {
                return false;
            }

            public int geMaxDispatchThreads() {
                return 0;
            }

            public int getEventQueueCapacity() {
                return qCapacity;
            }

            public String getTableName() {
                return "bus_events";
            }

            public String getHistoryTableName() {
                return "bus_events_history";
            }
        };
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    public class WriterRunnable
    implements Runnable {
        private final int writerId;
        private final int nbToWrite;
        private final DBBackedQueue<BusEventModelDao> queue;
        private final Random r;

        public WriterRunnable(int writerId, int nbToWrite, DBBackedQueue<BusEventModelDao> queue) {
            this.writerId = writerId;
            this.nbToWrite = nbToWrite;
            this.queue = queue;
            this.r = new Random(writerId);
        }

        @Override
        public void run() {
            int remaining = this.nbToWrite;
            do {
                long search1 = this.nbToWrite * this.writerId + (remaining - 1);
                BusEventModelDao entry = TestDBBackedQueue.this.createEntry(new Long(search1));
                this.queue.insertEntry((EventEntryModelDao)entry);
                this.maybeSleep();
            } while (--remaining > 0);
        }

        private void maybeSleep() {
            while (!this.queue.isQueueOpenForWrite()) {
                try {
                    Thread.sleep(10L);
                }
                catch (InterruptedException interruptedException) {}
            }
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    public class ReaderRunnable
    implements Runnable {
        private final DBBackedQueue<BusEventModelDao> queue;
        private final AtomicLong consumed;
        private final int maxEntries;
        private final List<Long> search1;

        public ReaderRunnable(int readerId, AtomicLong consumed, int maxEntries, DBBackedQueue<BusEventModelDao> queue) {
            this.queue = queue;
            this.consumed = consumed;
            this.maxEntries = maxEntries;
            this.search1 = new ArrayList<Long>();
        }

        @Override
        public void run() {
            do {
                List entries;
                if ((entries = this.queue.getReadyEntries()).size() == 0) {
                    try {
                        Thread.sleep(10L);
                    }
                    catch (InterruptedException e) {}
                    continue;
                }
                for (BusEventModelDao cur : entries) {
                    this.search1.add(cur.getSearchKey1());
                    BusEventModelDao history = new BusEventModelDao(cur, CreatorName.get(), TestDBBackedQueue.this.clock.getUTCNow(), PersistentQueueEntryLifecycleState.PROCESSED);
                    this.queue.moveEntryToHistory((EventEntryModelDao)history);
                }
                this.consumed.getAndAdd(entries.size());
            } while (this.consumed.get() < (long)this.maxEntries);
        }

        public List<Long> getSearch1() {
            return this.search1;
        }
    }
}

