/*
 * Decompiled with CFR 0.152.
 */
package org.killbill.queue;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.joda.time.DateTime;
import org.killbill.CreatorName;
import org.killbill.clock.Clock;
import org.killbill.commons.jdbi.notification.DatabaseTransactionEvent;
import org.killbill.commons.jdbi.notification.DatabaseTransactionEventType;
import org.killbill.commons.jdbi.notification.DatabaseTransactionNotificationApi;
import org.killbill.commons.profiling.Profiling;
import org.killbill.commons.profiling.ProfilingFeature;
import org.killbill.queue.api.PersistentQueueConfig;
import org.killbill.queue.api.PersistentQueueEntryLifecycleState;
import org.killbill.queue.dao.EventEntryModelDao;
import org.killbill.queue.dao.QueueSqlDao;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.Transaction;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class DBBackedQueue<T extends EventEntryModelDao> {
    private static final Logger log = LoggerFactory.getLogger(DBBackedQueue.class);
    private static final int RATIO_INFLIGHT_SIZE_TO_REOPEN_Q_FOR_WRITE = 10;
    private static final int MAX_FETCHED_ENTRIES = 100;
    private static final long INFLIGHT_POLLING_TIMEOUT_MSEC = 100L;
    private static final long POLLING_ORPHANS_MSEC = 300000L;
    private final String DB_QUEUE_LOG_ID;
    private final IDBI dbi;
    private final Class<? extends QueueSqlDao<T>> sqlDaoClass;
    private final QueueSqlDao<T> sqlDao;
    private final Clock clock;
    private final PersistentQueueConfig config;
    private final boolean useInflightQueue;
    private final LinkedBlockingQueue<Long> inflightEvents;
    private final int thresholdToReopenQForWrite;
    private final Counter totalInflightInsert;
    private final Counter totalInflightFetched;
    private final Counter totalInsert;
    private final Counter totalFetched;
    private final Counter totalClaimed;
    private final Counter totalProcessedFirstFailures;
    private final Counter totalProcessedSuccess;
    private final Counter totalProcessedAborted;
    private final Profiling<Long, RuntimeException> prof;
    private volatile boolean isQueueOpenForWrite;
    private volatile boolean isQueueOpenForRead;
    private long lastPollingOrphanTime;
    private long lowestOrphanEntry;
    private static final AtomicInteger QUEUE_ID_CNT = new AtomicInteger(0);
    private final int queueId = QUEUE_ID_CNT.incrementAndGet();
    private final TransientInflightQRowIdCache transientInflightQRowIdCache;

    public DBBackedQueue(Clock clock, IDBI dbi, Class<? extends QueueSqlDao<T>> sqlDaoClass, PersistentQueueConfig config, String dbBackedQId, MetricRegistry metricRegistry, @Nullable DatabaseTransactionNotificationApi databaseTransactionNotificationApi) {
        this.useInflightQueue = config.getPersistentQueueMode() == PersistentQueueConfig.PersistentQueueMode.STICKY_EVENTS;
        this.dbi = dbi;
        this.sqlDaoClass = sqlDaoClass;
        this.sqlDao = (QueueSqlDao)dbi.onDemand(sqlDaoClass);
        this.config = config;
        this.inflightEvents = this.useInflightQueue ? new LinkedBlockingQueue(config.getEventQueueCapacity()) : null;
        this.isQueueOpenForWrite = false;
        this.isQueueOpenForRead = false;
        this.clock = clock;
        this.prof = new Profiling();
        if (this.useInflightQueue && databaseTransactionNotificationApi != null) {
            databaseTransactionNotificationApi.registerForNotification((Object)this);
        }
        this.totalInflightInsert = metricRegistry.counter(MetricRegistry.name(DBBackedQueue.class, (String[])new String[]{dbBackedQId, "totalInflightInsert"}));
        this.totalInflightFetched = metricRegistry.counter(MetricRegistry.name(DBBackedQueue.class, (String[])new String[]{dbBackedQId, "totalInflightFetched"}));
        this.totalInsert = metricRegistry.counter(MetricRegistry.name(DBBackedQueue.class, (String[])new String[]{dbBackedQId, "totalInsert"}));
        this.totalFetched = metricRegistry.counter(MetricRegistry.name(DBBackedQueue.class, (String[])new String[]{dbBackedQId, "totalFetched"}));
        this.totalClaimed = metricRegistry.counter(MetricRegistry.name(DBBackedQueue.class, (String[])new String[]{dbBackedQId, "totalClaimed"}));
        this.totalProcessedSuccess = metricRegistry.counter(MetricRegistry.name(DBBackedQueue.class, (String[])new String[]{dbBackedQId, "totalProcessedSuccess"}));
        this.totalProcessedFirstFailures = metricRegistry.counter(MetricRegistry.name(DBBackedQueue.class, (String[])new String[]{dbBackedQId, "totalProcessedFirstFailures"}));
        this.totalProcessedAborted = metricRegistry.counter(MetricRegistry.name(DBBackedQueue.class, (String[])new String[]{dbBackedQId, "totalProcessedAborted"}));
        metricRegistry.register(MetricRegistry.name(DBBackedQueue.class, (String[])new String[]{dbBackedQId, "inflightQ", "size"}), (Metric)new Gauge<Integer>(){

            public Integer getValue() {
                return DBBackedQueue.this.useInflightQueue ? DBBackedQueue.this.inflightEvents.size() : 0;
            }
        });
        metricRegistry.register(MetricRegistry.name(DBBackedQueue.class, (String[])new String[]{dbBackedQId, "inflightQ", "isOpenForRead"}), (Metric)new Gauge<Boolean>(){

            public Boolean getValue() {
                return DBBackedQueue.this.isQueueOpenForRead;
            }
        });
        metricRegistry.register(MetricRegistry.name(DBBackedQueue.class, (String[])new String[]{dbBackedQId, "inflightQ", "isOpenForWrite"}), (Metric)new Gauge<Boolean>(){

            public Boolean getValue() {
                return DBBackedQueue.this.isQueueOpenForWrite;
            }
        });
        metricRegistry.register(MetricRegistry.name(DBBackedQueue.class, (String[])new String[]{dbBackedQId, "inflightQ", "lowestOrphanEntry"}), (Metric)new Gauge<Long>(){

            public Long getValue() {
                return DBBackedQueue.this.lowestOrphanEntry;
            }
        });
        this.thresholdToReopenQForWrite = config.getEventQueueCapacity() / 10;
        this.lastPollingOrphanTime = clock.getUTCNow().getMillis();
        this.lowestOrphanEntry = -1L;
        this.transientInflightQRowIdCache = this.useInflightQueue ? new TransientInflightQRowIdCache(this.queueId) : null;
        this.DB_QUEUE_LOG_ID = "DBBackedQueue-" + dbBackedQId;
    }

    public void initialize() {
        if (this.useInflightQueue) {
            this.inflightEvents.clear();
            List<T> entries = this.fetchReadyEntries(this.thresholdToReopenQForWrite);
            if (entries.isEmpty()) {
                this.isQueueOpenForRead = true;
                this.isQueueOpenForWrite = true;
            } else {
                this.isQueueOpenForRead = false;
                this.isQueueOpenForWrite = entries.size() < this.thresholdToReopenQForWrite;
            }
        } else {
            this.isQueueOpenForRead = false;
            this.isQueueOpenForWrite = false;
        }
        this.totalInflightFetched.dec(this.totalInflightFetched.getCount());
        this.totalFetched.dec(this.totalFetched.getCount());
        this.totalInflightInsert.dec(this.totalInflightInsert.getCount());
        this.totalInsert.dec(this.totalInsert.getCount());
        this.totalClaimed.dec(this.totalClaimed.getCount());
        this.totalProcessedSuccess.dec(this.totalProcessedSuccess.getCount());
        this.totalProcessedFirstFailures.dec(this.totalProcessedFirstFailures.getCount());
        this.totalProcessedAborted.dec(this.totalProcessedAborted.getCount());
        log.info("{} Initialized with queueId={}, mode={}, isQueueOpenForWrite={}, isQueueOpenForRead={}", new Object[]{this.DB_QUEUE_LOG_ID, this.queueId, this.config.getPersistentQueueMode(), this.isQueueOpenForWrite, this.isQueueOpenForRead});
    }

    public void insertEntry(T entry) {
        this.executeTransaction(new Transaction<Void, QueueSqlDao<T>>((EventEntryModelDao)entry){
            final /* synthetic */ EventEntryModelDao val$entry;
            {
                this.val$entry = eventEntryModelDao;
            }

            public Void inTransaction(QueueSqlDao<T> transactional, TransactionStatus status) {
                DBBackedQueue.this.insertEntryFromTransaction(transactional, this.val$entry);
                return null;
            }
        });
    }

    public void insertEntryFromTransaction(QueueSqlDao<T> transactional, T entry) {
        Long lastInsertId = this.safeInsertEntry(transactional, entry);
        if (lastInsertId == 0L) {
            log.warn("{} Failed to insert entry, lastInsertedId={}", (Object)this.DB_QUEUE_LOG_ID, (Object)lastInsertId);
            return;
        }
        if (this.useInflightQueue && this.isQueueOpenForWrite) {
            this.transientInflightQRowIdCache.addRowId(lastInsertId);
        }
        this.totalInsert.inc();
    }

    public List<T> getReadyEntries() {
        if (this.useInflightQueue) {
            return this.getReadyEntriesUsingInflightQueue();
        }
        return this.getReadyEntriesUsingPollingMode();
    }

    private List<T> getReadyEntriesUsingPollingMode() {
        List<T> entriesToClaim = this.fetchReadyEntries(this.config.getMaxEntriesClaimed());
        this.totalFetched.inc((long)entriesToClaim.size());
        if (!entriesToClaim.isEmpty()) {
            log.debug("{} Entries to claim: {}", (Object)this.DB_QUEUE_LOG_ID, entriesToClaim);
            return this.claimEntries(entriesToClaim);
        }
        return ImmutableList.of();
    }

    private List<T> getReadyEntriesUsingInflightQueue() {
        List<T> candidates;
        if (this.isQueueOpenForRead) {
            this.checkForOrphanEntries();
            candidates = this.fetchReadyEntriesFromIds();
            if (!candidates.isEmpty()) {
                this.totalInflightFetched.inc((long)candidates.size());
                this.totalFetched.inc((long)candidates.size());
                return candidates;
            }
            if (!this.isQueueOpenForWrite) {
                this.isQueueOpenForRead = false;
                log.info("{} Closing Q for read", (Object)this.DB_QUEUE_LOG_ID);
            }
        }
        if (!this.isQueueOpenForRead) {
            candidates = this.fetchReadyEntries(this.config.getMaxEntriesClaimed());
            if (!(this.isQueueOpenForWrite || candidates.size() >= this.config.getMaxEntriesClaimed() && this.getNbReadyEntries() >= (long)this.thresholdToReopenQForWrite)) {
                this.isQueueOpenForWrite = true;
                log.info("{} Opening Q for write", (Object)this.DB_QUEUE_LOG_ID);
            }
            if (this.removeInflightEventsWhenSwitchingToQueueOpenForRead(candidates)) {
                this.isQueueOpenForRead = true;
                log.info("{} Opening Q for read", (Object)this.DB_QUEUE_LOG_ID);
            }
            this.totalFetched.inc((long)candidates.size());
            return this.claimEntries(candidates);
        }
        return ImmutableList.of();
    }

    private void checkForOrphanEntries() {
        if (this.clock.getUTCNow().getMillis() > this.lastPollingOrphanTime + 300000L) {
            List<T> entriesToClaim = this.fetchReadyEntries(1);
            Long previousLowestOrphanEntry = this.lowestOrphanEntry;
            long l = this.lowestOrphanEntry = entriesToClaim.isEmpty() ? -1L : ((EventEntryModelDao)entriesToClaim.get(0)).getRecordId();
            if (previousLowestOrphanEntry > 0L && previousLowestOrphanEntry == this.lowestOrphanEntry) {
                log.warn("{} Detected unprocessed bus event {}", (Object)this.DB_QUEUE_LOG_ID, (Object)previousLowestOrphanEntry);
            }
            this.lastPollingOrphanTime = this.clock.getUTCNow().getMillis();
        }
    }

    private boolean removeInflightEventsWhenSwitchingToQueueOpenForRead(List<T> candidates) {
        if (candidates.isEmpty()) {
            return true;
        }
        boolean foundAllEntriesInInflightEvents = true;
        ArrayList<Long> entries = new ArrayList<Long>(candidates.size());
        for (EventEntryModelDao entry : candidates) {
            entries.add(entry.getRecordId());
            boolean found = this.inflightEvents.remove(entry.getRecordId());
            if (found) continue;
            foundAllEntriesInInflightEvents = false;
        }
        return foundAllEntriesInInflightEvents;
    }

    public void updateOnError(T entry) {
        this.executeTransaction(new Transaction<Void, QueueSqlDao<T>>((EventEntryModelDao)entry){
            final /* synthetic */ EventEntryModelDao val$entry;
            {
                this.val$entry = eventEntryModelDao;
            }

            public Void inTransaction(QueueSqlDao<T> transactional, TransactionStatus status) throws Exception {
                transactional.updateOnError(this.val$entry.getRecordId(), DBBackedQueue.this.clock.getUTCNow().toDate(), this.val$entry.getErrorCount(), DBBackedQueue.this.config.getTableName());
                if (this.val$entry.getErrorCount() == 1L) {
                    DBBackedQueue.this.totalProcessedFirstFailures.inc();
                }
                if (DBBackedQueue.this.useInflightQueue) {
                    DBBackedQueue.this.transientInflightQRowIdCache.addRowId(this.val$entry.getRecordId());
                }
                return null;
            }
        });
    }

    public void moveEntryToHistory(T entry) {
        this.executeTransaction(new Transaction<Void, QueueSqlDao<T>>((EventEntryModelDao)entry){
            final /* synthetic */ EventEntryModelDao val$entry;
            {
                this.val$entry = eventEntryModelDao;
            }

            public Void inTransaction(QueueSqlDao<T> transactional, TransactionStatus status) throws Exception {
                DBBackedQueue.this.moveEntryToHistoryFromTransaction(transactional, this.val$entry);
                return null;
            }
        });
    }

    public void moveEntryToHistoryFromTransaction(QueueSqlDao<T> transactional, T entry) {
        try {
            switch (entry.getProcessingState()) {
                case FAILED: {
                    this.totalProcessedAborted.inc();
                    break;
                }
                case PROCESSED: {
                    this.totalProcessedSuccess.inc();
                    break;
                }
                case REMOVED: {
                    break;
                }
                default: {
                    log.warn("{} Unexpected terminal event state={} for record_id={}", new Object[]{this.DB_QUEUE_LOG_ID, entry.getProcessingState(), entry.getRecordId()});
                }
            }
            log.debug("{} Moving entry into history: recordId={}, className={}, json={}", new Object[]{this.DB_QUEUE_LOG_ID, entry.getRecordId(), entry.getClassName(), entry.getEventJson()});
            transactional.insertEntry(entry, this.config.getHistoryTableName());
            transactional.removeEntry(entry.getRecordId(), this.config.getTableName());
        }
        catch (Exception e) {
            log.warn("{} Failed to move entry into history: {}", new Object[]{this.DB_QUEUE_LOG_ID, entry, e});
        }
    }

    public void moveEntriesToHistory(final Iterable<T> entries) {
        try {
            this.executeTransaction(new Transaction<Void, QueueSqlDao<T>>(){

                public Void inTransaction(QueueSqlDao<T> transactional, TransactionStatus status) throws Exception {
                    DBBackedQueue.this.moveEntriesToHistoryFromTransaction(transactional, entries);
                    return null;
                }
            });
        }
        catch (Exception e) {
            log.warn("{} Failed to move entries into history: {}", new Object[]{this.DB_QUEUE_LOG_ID, entries, e});
        }
    }

    public void moveEntriesToHistoryFromTransaction(QueueSqlDao<T> transactional, Iterable<T> entries) {
        if (!entries.iterator().hasNext()) {
            return;
        }
        for (EventEntryModelDao cur : entries) {
            switch (cur.getProcessingState()) {
                case FAILED: {
                    this.totalProcessedAborted.inc();
                    break;
                }
                case PROCESSED: {
                    this.totalProcessedSuccess.inc();
                    break;
                }
                case REMOVED: {
                    break;
                }
                default: {
                    log.warn("{} Unexpected terminal event state={} for record_id={}", new Object[]{this.DB_QUEUE_LOG_ID, cur.getProcessingState(), cur.getRecordId()});
                }
            }
            log.debug("{} Moving entry into history: recordId={}, className={}, json={}", new Object[]{this.DB_QUEUE_LOG_ID, cur.getRecordId(), cur.getClassName(), cur.getEventJson()});
        }
        Iterable toBeRemovedRecordIds = Iterables.transform(entries, (Function)new Function<T, Long>(){

            public Long apply(T input) {
                return input.getRecordId();
            }
        });
        transactional.insertEntries(entries, this.config.getHistoryTableName());
        transactional.removeEntries((Collection<Long>)ImmutableList.copyOf((Iterable)toBeRemovedRecordIds), this.config.getTableName());
    }

    private List<T> fetchReadyEntriesFromIds() {
        final ArrayList<Long> recordIds = new ArrayList<Long>(100);
        this.inflightEvents.drainTo(recordIds, 100);
        if (recordIds.isEmpty()) {
            try {
                Long entryId = this.inflightEvents.poll(100L, TimeUnit.MILLISECONDS);
                if (entryId != null) {
                    recordIds.add(entryId);
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.warn("{} Got interrupted", (Object)this.DB_QUEUE_LOG_ID);
                return ImmutableList.of();
            }
        }
        if (!recordIds.isEmpty()) {
            log.debug("{} fetchReadyEntriesFromIds: {}", (Object)this.DB_QUEUE_LOG_ID, recordIds);
            return (List)this.executeQuery(new Query<List<T>, QueueSqlDao<T>>(){

                @Override
                public List<T> execute(QueueSqlDao<T> queueSqlDao) {
                    return queueSqlDao.getEntriesFromIds(recordIds, DBBackedQueue.this.config.getTableName());
                }
            });
        }
        return ImmutableList.of();
    }

    private List<T> fetchReadyEntries(final int size) {
        final Date now = this.clock.getUTCNow().toDate();
        final String owner = this.config.getPersistentQueueMode() == PersistentQueueConfig.PersistentQueueMode.POLLING ? null : CreatorName.get();
        return (List)this.executeQuery(new Query<List<T>, QueueSqlDao<T>>(){

            @Override
            public List<T> execute(QueueSqlDao<T> queueSqlDao) {
                return queueSqlDao.getReadyEntries(now, size, owner, DBBackedQueue.this.config.getTableName());
            }
        });
    }

    private long getNbReadyEntries() {
        Date now = this.clock.getUTCNow().toDate();
        return this.getNbReadyEntries(now);
    }

    public long getNbReadyEntries(final Date now) {
        final String owner = this.config.getPersistentQueueMode() == PersistentQueueConfig.PersistentQueueMode.POLLING ? null : CreatorName.get();
        return (Long)this.executeQuery(new Query<Long, QueueSqlDao<T>>(){

            @Override
            public Long execute(QueueSqlDao<T> queueSqlDao) {
                return queueSqlDao.getNbReadyEntries(now, owner, DBBackedQueue.this.config.getTableName());
            }
        });
    }

    private List<T> claimEntries(List<T> candidates) {
        switch (this.config.getPersistentQueueMode()) {
            case POLLING: {
                return this.sequentialClaimEntries(candidates);
            }
            case STICKY_POLLING: 
            case STICKY_EVENTS: {
                return this.batchClaimEntries(candidates);
            }
        }
        throw new IllegalStateException("Unsupported PersistentQueueMode " + (Object)((Object)this.config.getPersistentQueueMode()));
    }

    private List<T> batchClaimEntries(List<T> candidates) {
        if (candidates.isEmpty()) {
            return ImmutableList.of();
        }
        final Date nextAvailable = this.clock.getUTCNow().plus(this.config.getClaimedTime().getMillis()).toDate();
        final Collection recordIds = Collections2.transform(candidates, (Function)new Function<T, Long>(){

            public Long apply(T input) {
                return input.getRecordId();
            }
        });
        int resultCount = (Integer)this.executeQuery(new Query<Integer, QueueSqlDao<T>>(){

            @Override
            public Integer execute(QueueSqlDao<T> queueSqlDao) {
                return queueSqlDao.claimEntries(recordIds, DBBackedQueue.this.clock.getUTCNow().toDate(), CreatorName.get(), nextAvailable, DBBackedQueue.this.config.getTableName());
            }
        });
        if (resultCount == candidates.size()) {
            this.totalClaimed.inc((long)resultCount);
            log.debug("{} batchClaimEntries claimed: {}", (Object)this.DB_QUEUE_LOG_ID, candidates);
            return candidates;
        }
        if (resultCount == 0) {
            log.warn("{} batchClaimEntries see 0 entries", (Object)this.DB_QUEUE_LOG_ID);
            return ImmutableList.of();
        }
        List maybeClaimedEntries = (List)this.executeQuery(new Query<List<T>, QueueSqlDao<T>>(){

            @Override
            public List<T> execute(QueueSqlDao<T> queueSqlDao) {
                return queueSqlDao.getEntriesFromIds((List<Long>)ImmutableList.copyOf((Collection)recordIds), DBBackedQueue.this.config.getTableName());
            }
        });
        Iterable claimed = Iterables.filter((Iterable)maybeClaimedEntries, (Predicate)new Predicate<T>(){

            public boolean apply(T input) {
                return input.getProcessingState() == PersistentQueueEntryLifecycleState.IN_PROCESSING && input.getProcessingOwner().equals(CreatorName.get());
            }
        });
        ImmutableList result = ImmutableList.copyOf((Iterable)claimed);
        log.warn("{} batchClaimEntries only claimed partial entries {}/{}", new Object[]{this.DB_QUEUE_LOG_ID, result.size(), candidates.size()});
        this.totalClaimed.inc((long)result.size());
        return result;
    }

    private List<T> sequentialClaimEntries(List<T> candidates) {
        return ImmutableList.copyOf((Collection)Collections2.filter(candidates, (Predicate)new Predicate<T>(){

            public boolean apply(T input) {
                return DBBackedQueue.this.claimEntry(input);
            }
        }));
    }

    private boolean claimEntry(T entry) {
        boolean claimed;
        Date nextAvailable = this.clock.getUTCNow().plus(this.config.getClaimedTime().getMillis()).toDate();
        int claimEntry = (Integer)this.executeQuery(new Query<Integer, QueueSqlDao<T>>((EventEntryModelDao)entry, nextAvailable){
            final /* synthetic */ EventEntryModelDao val$entry;
            final /* synthetic */ Date val$nextAvailable;
            {
                this.val$entry = eventEntryModelDao;
                this.val$nextAvailable = date;
            }

            @Override
            public Integer execute(QueueSqlDao<T> queueSqlDao) {
                return queueSqlDao.claimEntry(this.val$entry.getRecordId(), DBBackedQueue.this.clock.getUTCNow().toDate(), CreatorName.get(), this.val$nextAvailable, DBBackedQueue.this.config.getTableName());
            }
        });
        boolean bl = claimed = claimEntry == 1;
        if (claimed) {
            this.totalClaimed.inc();
            log.debug("{} Claimed entry {}", (Object)this.DB_QUEUE_LOG_ID, entry);
        }
        return claimed;
    }

    public QueueSqlDao<T> getSqlDao() {
        return this.sqlDao;
    }

    public boolean isQueueOpenForWrite() {
        return this.isQueueOpenForWrite;
    }

    public boolean isQueueOpenForRead() {
        return this.isQueueOpenForRead;
    }

    public long getTotalInflightFetched() {
        return this.totalInflightFetched.getCount();
    }

    public long getTotalFetched() {
        return this.totalFetched.getCount();
    }

    public long getTotalInflightInsert() {
        return this.totalInflightInsert.getCount();
    }

    public long getTotalInsert() {
        return this.totalInsert.getCount();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @AllowConcurrentEvents
    @Subscribe
    public void handleDatabaseTransactionEvent(DatabaseTransactionEvent event) {
        if (this.transientInflightQRowIdCache == null || !this.transientInflightQRowIdCache.isValid()) {
            return;
        }
        if (event.getType() == DatabaseTransactionEventType.ROLLBACK) {
            this.transientInflightQRowIdCache.reset();
            return;
        }
        try {
            Iterator<Long> entries = this.transientInflightQRowIdCache.iterator();
            while (entries.hasNext()) {
                Long entry = entries.next();
                boolean result = this.inflightEvents.offer(entry);
                if (result) {
                    log.debug("{} Inserting entry {} into inflightQ", (Object)this.DB_QUEUE_LOG_ID, (Object)entry);
                    this.totalInflightInsert.inc();
                    continue;
                }
                if (!this.isQueueOpenForWrite) continue;
                this.isQueueOpenForWrite = false;
                log.warn("{} Closing Q for write: Overflowed with recordId={}", (Object)this.DB_QUEUE_LOG_ID, (Object)entry);
            }
        }
        finally {
            this.transientInflightQRowIdCache.reset();
        }
    }

    private Long safeInsertEntry(final QueueSqlDao<T> transactional, final T entry) {
        return (Long)this.prof.executeWithProfiling(ProfilingFeature.ProfilingFeatureType.DAO, "QueueSqlDao:insert", (Profiling.WithProfilingCallback)new Profiling.WithProfilingCallback<Long, RuntimeException>(){

            public Long execute() throws RuntimeException {
                transactional.resetLastInsertId();
                transactional.insertEntry(entry, DBBackedQueue.this.config.getTableName());
                Long lastInsertId = transactional.getLastInsertId();
                log.debug("{} Inserting entry: lastInsertId={}, entry={}", new Object[]{DBBackedQueue.this.DB_QUEUE_LOG_ID, lastInsertId, entry});
                return lastInsertId;
            }
        });
    }

    public void reapEntries(final Date reapingDate) {
        this.executeTransaction(new Transaction<Void, QueueSqlDao<T>>(){

            public Void inTransaction(QueueSqlDao<T> transactional, TransactionStatus status) throws Exception {
                DateTime now = DBBackedQueue.this.clock.getUTCNow();
                List entriesLeftBehind = transactional.getEntriesLeftBehind(DBBackedQueue.this.config.getMaxReDispatchCount(), now.toDate(), reapingDate, DBBackedQueue.this.config.getTableName());
                if (entriesLeftBehind.size() > 0) {
                    Iterable entriesToMove = Iterables.transform(entriesLeftBehind, (Function)new Function<T, T>(){

                        @Nullable
                        public T apply(@Nullable T entry) {
                            entry.setProcessingState(PersistentQueueEntryLifecycleState.REAPED);
                            return entry;
                        }
                    });
                    DBBackedQueue.this.moveEntriesToHistoryFromTransaction(transactional, entriesToMove);
                    DBBackedQueue.this.insertReapedEntriesFromTransaction(transactional, entriesLeftBehind, now);
                    log.warn("{} {} entries were reaped by {}", new Object[]{DBBackedQueue.this.DB_QUEUE_LOG_ID, entriesLeftBehind.size(), CreatorName.get()});
                }
                return null;
            }
        });
    }

    private void insertReapedEntriesFromTransaction(QueueSqlDao<T> transactional, List<T> entriesLeftBehind, DateTime now) {
        for (EventEntryModelDao entry : entriesLeftBehind) {
            entry.setCreatedDate(now);
            entry.setProcessingState(PersistentQueueEntryLifecycleState.AVAILABLE);
            entry.setCreatingOwner(CreatorName.get());
            if (this.config.getPersistentQueueMode() == PersistentQueueConfig.PersistentQueueMode.STICKY_EVENTS) {
                this.insertEntryFromTransaction(transactional, entry);
                continue;
            }
            this.totalInsert.inc();
        }
        if (this.config.getPersistentQueueMode() == PersistentQueueConfig.PersistentQueueMode.STICKY_POLLING) {
            transactional.insertEntries(entriesLeftBehind, this.config.getTableName());
        }
    }

    private <U> U executeQuery(final Query<U, QueueSqlDao<T>> query) {
        return (U)this.dbi.withHandle(new HandleCallback<U>(){

            public U withHandle(Handle handle) throws Exception {
                Object result = query.execute(handle.attach(DBBackedQueue.this.sqlDaoClass));
                DBBackedQueue.this.printSQLWarnings(handle);
                return result;
            }
        });
    }

    private <U> U executeTransaction(final Transaction<U, QueueSqlDao<T>> transaction) {
        return (U)this.dbi.inTransaction(new TransactionCallback<U>(){

            public U inTransaction(Handle handle, TransactionStatus status) throws Exception {
                Object result = transaction.inTransaction(handle.attach(DBBackedQueue.this.sqlDaoClass), status);
                DBBackedQueue.this.printSQLWarnings(handle);
                return result;
            }
        });
    }

    private void printSQLWarnings(Handle handle) {
        if (log.isDebugEnabled()) {
            try {
                for (SQLWarning warning = handle.getConnection().getWarnings(); warning != null; warning = warning.getNextWarning()) {
                    log.debug("[SQL WARNING] {}", (Throwable)warning);
                }
                handle.getConnection().clearWarnings();
            }
            catch (SQLException e) {
                log.debug("Error whilst retrieving SQL warnings", (Throwable)e);
            }
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    private static interface Query<U, QueueSqlDao> {
        public U execute(QueueSqlDao var1);
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    private static class TransientInflightQRowIdCache {
        private final ThreadLocal<RowRef> rowRefThreadLocal = new ThreadLocal();
        private final int queueId;

        private TransientInflightQRowIdCache(int queueId) {
            this.queueId = queueId;
        }

        public boolean isValid() {
            RowRef entry = this.rowRefThreadLocal.get();
            return entry != null && entry.queueId == this.queueId;
        }

        public void addRowId(Long rowId) {
            RowRef entry = this.rowRefThreadLocal.get();
            if (entry == null) {
                entry = new RowRef(this.queueId);
                this.rowRefThreadLocal.set(entry);
            }
            entry.addRowId(rowId);
        }

        public void reset() {
            this.rowRefThreadLocal.remove();
        }

        public Iterator<Long> iterator() {
            RowRef entry = this.rowRefThreadLocal.get();
            Preconditions.checkNotNull((Object)entry);
            return entry.iterator();
        }

        /*
         * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
         */
        private final class RowRef {
            private final int queueId;
            private final List<Long> rowIds;

            public RowRef(int queueId) {
                this.queueId = queueId;
                this.rowIds = new ArrayList<Long>();
            }

            public void addRowId(long rowId) {
                this.rowIds.add(rowId);
            }

            public Iterator<Long> iterator() {
                return this.rowIds.iterator();
            }
        }
    }
}

