/*
 * Decompiled with CFR 0.152.
 */
package org.killbill.queue;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.killbill.CreatorName;
import org.killbill.TestSetup;
import org.killbill.bus.api.PersistentBusConfig;
import org.killbill.bus.dao.BusEventModelDao;
import org.killbill.bus.dao.PersistentBusSqlDao;
import org.killbill.clock.Clock;
import org.killbill.queue.DBBackedQueue;
import org.killbill.queue.api.PersistentQueueConfig;
import org.killbill.queue.api.PersistentQueueEntryLifecycleState;
import org.killbill.queue.dao.EventEntryModelDao;
import org.skife.config.TimeSpan;
import org.skife.jdbi.v2.IDBI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class TestLoadDBBackedQueue
extends TestSetup {
    private static final Logger log = LoggerFactory.getLogger(TestLoadDBBackedQueue.class);
    private static final String OWNER = CreatorName.get();
    private DBBackedQueue<BusEventModelDao> queue;
    private PersistentBusSqlDao sqlDao;

    /**
     * One-time class setup: runs the parent class setup, then obtains an on-demand
     * {@link PersistentBusSqlDao} from the DBI and stores it in {@code sqlDao} for the tests.
     *
     * @throws Exception propagated from the parent setup
     */
    @BeforeClass(groups = "slow")
    public void beforeClass() throws Exception {
        super.beforeClass();
        final PersistentBusSqlDao onDemandDao = (PersistentBusSqlDao) getDBI().onDemand(PersistentBusSqlDao.class);
        this.sqlDao = onDemandDao;
    }

    /**
     * Per-test setup: runs the parent setup, then asserts that a query for ready entries
     * in the {@code bus_events} table (with a {@code null} owner) returns nothing, so every
     * test starts without leftover entries.
     *
     * @throws Exception propagated from the parent setup
     */
    @BeforeMethod(groups = "slow")
    public void beforeMethod() throws Exception {
        super.beforeMethod();
        final int leftoverCount = sqlDao.getReadyEntries(clock.getUTCNow().toDate(), 100, null, "bus_events").size();
        Assert.assertEquals(leftoverCount, 0);
    }

    /**
     * Load test for a queue configured in {@code POLLING} mode.
     *
     * <p>Inserts {@code nbEvents} entries, then repeatedly fetches one batch of ready entries
     * (asserting that each batch holds exactly {@code claimedEvents} entries) and moves that
     * batch to history. The total elapsed time and the cumulative time spent in
     * {@code getReadyEntries} and {@code moveEntriesToHistory} are logged at the end.
     */
    @Test(groups = "load")
    public void testPollingLoad() {
        final int nbEvents = 1000;
        final int claimedEvents = 10;

        final PersistentBusConfig config = createConfig(claimedEvents, -1, PersistentQueueConfig.PersistentQueueMode.POLLING);
        queue = new DBBackedQueue<BusEventModelDao>(clock, dbi, PersistentBusSqlDao.class, config, "perf-bus_event", metricRegistry, null);
        queue.initialize();

        for (int i = 0; i < nbEvents; i++) {
            queue.insertEntry(createEntry(Long.valueOf(i)));
        }

        // NOTE(review): informational messages are logged at error level, presumably so the
        // timings are visible whatever the logger configuration is -- confirm before lowering.
        log.error("Starting load test");

        final long ini = System.nanoTime();
        long cumlGetReadyEntries = 0L;
        long cumlMoveEntriesToHistory = 0L;

        // Each iteration is expected to drain exactly one batch of claimedEvents entries.
        final int nbBatches = nbEvents / claimedEvents;
        for (int i = 0; i < nbBatches; i++) {
            final long t1 = System.nanoTime();
            final List<BusEventModelDao> ready = queue.getReadyEntries();
            Assert.assertEquals(ready.size(), claimedEvents);
            final long t2 = System.nanoTime();
            cumlGetReadyEntries += t2 - t1;

            // Lazy view: the PROCESSED copies are only built while moveEntriesToHistory iterates,
            // so that cost is part of the moveEntriesToHistory timing below.
            final Iterable<BusEventModelDao> processed = Iterables.transform(ready, new Function<BusEventModelDao, BusEventModelDao>() {
                @Override
                public BusEventModelDao apply(@Nullable final BusEventModelDao input) {
                    return new BusEventModelDao(input, CreatorName.get(), clock.getUTCNow(), PersistentQueueEntryLifecycleState.PROCESSED);
                }
            });

            final long t3 = System.nanoTime();
            queue.moveEntriesToHistory(processed);
            final long t4 = System.nanoTime();
            cumlMoveEntriesToHistory += t4 - t3;
        }
        final long fini = System.nanoTime();

        log.error("Load test took " + TimeUnit.NANOSECONDS.toMillis(fini - ini)
                  + " ms, getReadyEntry = " + TimeUnit.NANOSECONDS.toMillis(cumlGetReadyEntries)
                  + " ms, moveEntriesToHistory = " + TimeUnit.NANOSECONDS.toMillis(cumlMoveEntriesToHistory) + " ms");
    }

    /**
     * Load test for a queue configured in {@code STICKY_EVENTS} mode with several concurrent readers.
     *
     * <p>Inserts {@code nbEntries} entries, starts {@code maxThreads} {@link ReaderRunnable}
     * threads sharing one consumed counter, waits for all of them to finish and logs the
     * throughput. It then asserts that no ready entry is left in {@code bus_events} for
     * {@code OWNER} and that the queue's total-insert, inflight-fetched and inflight-insert
     * counters all equal {@code nbEntries}.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for the
     *                              readers; the assertions are then skipped because the
     *                              readers may not have finished
     */
    @Test(groups = "load")
    public void testInflightQLoad() throws InterruptedException {
        final int nbEntries = 10000;
        final int maxThreads = 3;

        // The event queue capacity is set to nbEntries; the inflight counter assertions
        // below expect every entry to be counted as inflight.
        final PersistentBusConfig config = createConfig(10, nbEntries, PersistentQueueConfig.PersistentQueueMode.STICKY_EVENTS);
        queue = new DBBackedQueue<BusEventModelDao>(clock, dbi, PersistentBusSqlDao.class, config, "multipleReaderMultipleWriter-bus_event", metricRegistry, databaseTransactionNotificationApi);
        queue.initialize();

        for (int i = 0; i < nbEntries; i++) {
            queue.insertEntry(createEntry(Long.valueOf(i + 5)));
        }

        final AtomicLong consumed = new AtomicLong(0L);
        final Thread[] readers = new Thread[maxThreads];
        for (int i = 0; i < maxThreads; i++) {
            readers[i] = new Thread(new ReaderRunnable(consumed, nbEntries, queue));
        }

        final long ini = System.currentTimeMillis();
        for (final Thread reader : readers) {
            reader.start();
        }
        // Let an interruption propagate instead of swallowing it: asserting on the counters
        // while readers are still running would produce misleading failures.
        for (final Thread reader : readers) {
            reader.join();
        }
        final long elapsed = System.currentTimeMillis() - ini;

        log.info(String.format("Processed %s events in %s msec => rate = %s", nbEntries, elapsed, (double) nbEntries / (double) elapsed * 1000.0));

        final int remaining = sqlDao.getReadyEntries(clock.getUTCNow().toDate(), 1000, OWNER, "bus_events").size();
        Assert.assertEquals(remaining, 0);

        log.info("Got inflightProcessed = " + queue.getTotalInflightFetched() + "/" + nbEntries + ", inflightWritten = " + queue.getTotalInflightInsert() + "/" + nbEntries);

        final long expectedTotal = nbEntries;
        Assert.assertEquals(queue.getTotalInsert(), expectedTotal);
        Assert.assertEquals(queue.getTotalInflightFetched(), expectedTotal);
        Assert.assertEquals(queue.getTotalInflightInsert(), expectedTotal);
    }

    /**
     * Builds a bus event entry for the given owner, stamped with the current clock time and
     * a random UUID.
     *
     * <p>The class name is {@code String.class.getName()} and the payload is the fixed string
     * {@code "json"}. NOTE(review): the last two constructor arguments are presumably
     * search key 1 and search key 2 (always {@code 1}) -- confirm against {@code BusEventModelDao}.
     *
     * @param searchKey1 value passed as the sixth constructor argument
     * @param owner      value passed as the first constructor argument
     * @return the newly created entry
     */
    private BusEventModelDao createEntry(Long searchKey1, String owner) {
        final String json = "json";
        return new BusEventModelDao(owner, clock.getUTCNow(), String.class.getName(), json, UUID.randomUUID(), searchKey1, Long.valueOf(1L));
    }

    /**
     * Builds a bus event entry owned by {@code OWNER}; shorthand for the two-argument overload.
     *
     * @param searchKey1 forwarded unchanged to the two-argument overload
     * @return the newly created entry
     */
    private BusEventModelDao createEntry(Long searchKey1) {
        final BusEventModelDao entry = createEntry(searchKey1, OWNER);
        return entry;
    }

    /**
     * Creates a {@link PersistentBusConfig} for the tests. Only the three arguments vary;
     * every other setting is a fixed value.
     *
     * @param claimed   value returned by {@code getMaxEntriesClaimed()}
     * @param qCapacity value returned by {@code getEventQueueCapacity()}
     * @param mode      value returned by {@code getPersistentQueueMode()}
     * @return a configuration targeting the {@code bus_events} / {@code bus_events_history} tables
     */
    private PersistentBusConfig createConfig(final int claimed, final int qCapacity, final PersistentQueueConfig.PersistentQueueMode mode) {
        final PersistentBusConfig testConfig = new PersistentBusConfig() {

            // --- Settings driven by the caller ---

            public int getMaxEntriesClaimed() {
                return claimed;
            }

            public int getEventQueueCapacity() {
                return qCapacity;
            }

            public PersistentQueueConfig.PersistentQueueMode getPersistentQueueMode() {
                return mode;
            }

            // --- Tables ---

            public String getTableName() {
                return "bus_events";
            }

            public String getHistoryTableName() {
                return "bus_events_history";
            }

            // --- Fixed flags ---

            public boolean isInMemory() {
                return false;
            }

            public boolean isProcessingOff() {
                return false;
            }

            // --- Fixed timings ---

            public TimeSpan getClaimedTime() {
                return new TimeSpan("5m");
            }

            public TimeSpan getReapThreshold() {
                return new TimeSpan(5L, TimeUnit.MINUTES);
            }

            public long getPollingSleepTimeMs() {
                return 100L;
            }

            // --- Fixed limits ---

            public int getMaxFailureRetries() {
                return 0;
            }

            public int getMaxReDispatchCount() {
                return 10;
            }

            // The name (including the missing 't') is kept exactly as in the original code.
            public int geMaxDispatchThreads() {
                return 0;
            }
        };
        return testConfig;
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    /**
     * Queue reader run on its own thread by {@code testInflightQLoad}.
     *
     * <p>Each reader repeatedly fetches ready entries from the queue, moves every fetched entry
     * to history as {@code PROCESSED}, and adds the batch size to a counter shared between all
     * readers. A reader stops once the shared counter reaches {@code maxEntries}, or when its
     * thread is interrupted while it waits for new entries.
     */
    public class ReaderRunnable implements Runnable {
        private final DBBackedQueue<BusEventModelDao> queue;
        /** Number of entries consumed so far, shared by all readers. */
        private final AtomicLong consumed;
        /** Shared-counter value at which the readers stop. */
        private final int maxEntries;
        /** Search keys of the entries handled by this reader; only written to in this class. */
        private final List<Long> search1;

        /**
         * @param consumed   counter shared by all readers, incremented by each processed batch size
         * @param maxEntries value of {@code consumed} at which this reader stops
         * @param queue      queue to read the entries from
         */
        public ReaderRunnable(AtomicLong consumed, int maxEntries, DBBackedQueue<BusEventModelDao> queue) {
            this.queue = queue;
            this.consumed = consumed;
            this.maxEntries = maxEntries;
            this.search1 = new ArrayList<Long>();
        }

        /**
         * Polls the queue until the shared counter reaches {@code maxEntries}, sleeping 10 ms
         * whenever no entry is ready.
         */
        @Override
        public void run() {
            do {
                final List<BusEventModelDao> entries = this.queue.getReadyEntries();
                if (entries.isEmpty()) {
                    try {
                        Thread.sleep(10L);
                    } catch (final InterruptedException e) {
                        // Restore the interrupt flag and stop polling rather than silently ignoring it.
                        Thread.currentThread().interrupt();
                        return;
                    }
                    // Jumps to the loop condition, so a reader that finds nothing still
                    // notices when the other readers have consumed everything.
                    continue;
                }
                for (final BusEventModelDao cur : entries) {
                    this.search1.add(cur.getSearchKey1());
                    final BusEventModelDao history = new BusEventModelDao(cur, OWNER, TestLoadDBBackedQueue.this.clock.getUTCNow(), PersistentQueueEntryLifecycleState.PROCESSED);
                    this.queue.moveEntryToHistory(history);
                }
                this.consumed.getAndAdd(entries.size());
            } while (this.consumed.get() < (long) this.maxEntries);
        }
    }
}

