/*
 * Decompiled with CFR 0.152.
 */
package org.killbill.queue;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.joda.time.DateTime;
import org.killbill.CreatorName;
import org.killbill.bus.dao.PersistentBusSqlDao;
import org.killbill.clock.Clock;
import org.killbill.commons.jdbi.notification.DatabaseTransactionEvent;
import org.killbill.commons.jdbi.notification.DatabaseTransactionEventType;
import org.killbill.commons.jdbi.notification.DatabaseTransactionNotificationApi;
import org.killbill.queue.DBBackedQueue;
import org.killbill.queue.api.PersistentQueueConfig;
import org.killbill.queue.api.PersistentQueueEntryLifecycleState;
import org.killbill.queue.dao.EventEntryModelDao;
import org.killbill.queue.dao.QueueSqlDao;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.Transaction;
import org.skife.jdbi.v2.TransactionStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link DBBackedQueue} that keeps the record ids of written entries in an in-memory
 * "inflight" queue and serves {@link #getReadyEntries()} by loading those ids from the database.
 *
 * <p>Record ids are first collected in a per-thread cache ({@link TransientInflightQRowIdCache})
 * by {@link #insertEntryFromTransaction} and {@link #updateOnError}. They are moved into the
 * inflight queue when {@link #handleDatabaseTransactionEvent} receives an event that is not a
 * {@code ROLLBACK}; on {@code ROLLBACK} the per-thread cache is discarded instead.
 *
 * @param <T> type of the persisted queue entries
 */
public class DBBackedQueueWithInflightQueue<T extends EventEntryModelDao> extends DBBackedQueue<T> {
    private static final Logger log = LoggerFactory.getLogger(DBBackedQueueWithInflightQueue.class);

    /** Page size used when loading ready entry ids during {@link #initialize()}. */
    private static final int MAX_FETCHED_RECORDS_ID = 1000;

    /** Upper bound, in milliseconds, on the time {@link #getReadyEntries()} spends blocked on the inflight queue. */
    private static final long INFLIGHT_POLLING_TIMEOUT_MSEC = 100L;

    /** Source of the per-instance {@link #queueId}. */
    private static final AtomicInteger QUEUE_ID_CNT = new AtomicInteger(0);

    private final LinkedBlockingQueue<Long> inflightEvents;
    private final DatabaseTransactionNotificationApi databaseTransactionNotificationApi;
    private final int queueId;
    private final TransientInflightQRowIdCache transientInflightQRowIdCache;

    /**
     * Creates the queue, registers a gauge reporting the inflight queue size, and registers this
     * instance with {@code databaseTransactionNotificationApi}.
     *
     * @throws IllegalArgumentException if {@code config.getMinInFlightEntries()} is greater than
     *                                  {@code config.getMaxInFlightEntries()}
     */
    public DBBackedQueueWithInflightQueue(final Clock clock, final IDBI dbi, final Class<? extends QueueSqlDao<T>> sqlDaoClass, final PersistentQueueConfig config, final String dbBackedQId, final MetricRegistry metricRegistry, final DatabaseTransactionNotificationApi databaseTransactionNotificationApi) {
        super(clock, dbi, sqlDaoClass, config, dbBackedQId, metricRegistry);
        Preconditions.checkArgument(config.getMinInFlightEntries() <= config.getMaxInFlightEntries());

        this.queueId = QUEUE_ID_CNT.incrementAndGet();
        this.inflightEvents = new LinkedBlockingQueue<Long>();
        this.transientInflightQRowIdCache = new TransientInflightQRowIdCache(this.queueId);
        this.databaseTransactionNotificationApi = databaseTransactionNotificationApi;

        metricRegistry.register(MetricRegistry.name(DBBackedQueueWithInflightQueue.class, dbBackedQId, "inflightQ", "size"), new Gauge<Integer>() {
            @Override
            public Integer getValue() {
                return inflightEvents.size();
            }
        });

        // Register last: once registered, events may be delivered to this instance, so every field
        // must already be assigned and nothing that can still throw should follow.
        databaseTransactionNotificationApi.registerForNotification(this);
    }

    /** Reloads the inflight queue from the database and logs the queue id and persistent queue mode. */
    @Override
    public void initialize() {
        initializeInflightQueue();
        log.info("{} Initialized with queueId={}, mode={}", DB_QUEUE_LOG_ID, queueId, config.getPersistentQueueMode());
    }

    /** Unregisters this instance from database transaction notifications. */
    @Override
    public void close() {
        databaseTransactionNotificationApi.unregisterForNotification(this);
    }

    /**
     * Inserts {@code entry} through {@code transactional} and remembers its record id in the
     * per-thread cache. If the insert yields no id (null or 0) a warning is logged and nothing is cached.
     */
    @Override
    public void insertEntryFromTransaction(final QueueSqlDao<T> transactional, final T entry) {
        final Long lastInsertId = safeInsertEntry(transactional, entry);
        // Guard against null before unboxing; 0 is treated as "no row inserted".
        if (lastInsertId == null || lastInsertId == 0L) {
            log.warn("{} Failed to insert entry, lastInsertedId={}", DB_QUEUE_LOG_ID, lastInsertId);
            return;
        }
        transientInflightQRowIdCache.addRowId(lastInsertId);
    }

    /**
     * Moves ids from the inflight queue into {@code result} (never growing it beyond
     * {@code config.getMaxInFlightEntries()}), and, if {@code result} is still below
     * {@code config.getMinInFlightEntries()}, blocks for at most {@code maxWaitNanos} waiting for one more id.
     *
     * <p>If the thread is interrupted while waiting, the interrupt flag is restored and 0 is returned.
     *
     * @param result       list receiving the record ids
     * @param maxWaitNanos maximum time to block, in nanoseconds
     * @return time spent blocked on the inflight queue, in nanoseconds
     */
    private long pollEntriesFromInflightQ(final List<Long> result, final long maxWaitNanos) {
        inflightEvents.drainTo(result, config.getMaxInFlightEntries() - result.size());
        if (result.size() >= config.getMinInFlightEntries()) {
            return 0L;
        }
        try {
            final long beforePollTime = System.nanoTime();
            final Long entryId = inflightEvents.poll(maxWaitNanos, TimeUnit.NANOSECONDS);
            final long pollSleepTime = System.nanoTime() - beforePollTime;
            if (entryId != null) {
                result.add(entryId);
            }
            return pollSleepTime;
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("{} Got interrupted", DB_QUEUE_LOG_ID);
            return 0L;
        }
    }

    /**
     * Collects record ids from the inflight queue and loads the matching entries from the database.
     *
     * <p>Collection stops as soon as {@code config.getMinInFlightEntries()} ids are available, the
     * cumulated blocking time reaches {@link #INFLIGHT_POLLING_TIMEOUT_MSEC}, or the calling thread
     * is interrupted. When no id was collected, no query is issued and an empty list is returned.
     *
     * @return the entries together with the elapsed time of this call minus the time spent blocked
     *         on the inflight queue (nanoseconds)
     */
    @Override
    public DBBackedQueue.ReadyEntriesWithMetrics<T> getReadyEntries() {
        final long ini = System.nanoTime();
        // Sleep time is measured with System.nanoTime(), so the budget must be in nanoseconds too.
        final long pollBudgetNanos = TimeUnit.MILLISECONDS.toNanos(INFLIGHT_POLLING_TIMEOUT_MSEC);
        long pollSleepTime = 0L;
        final List<Long> recordIds = new ArrayList<Long>(config.getMaxInFlightEntries());
        while (recordIds.size() < config.getMinInFlightEntries() && pollSleepTime < pollBudgetNanos) {
            pollSleepTime += pollEntriesFromInflightQ(recordIds, pollBudgetNanos - pollSleepTime);
            // An interrupted poll reports 0 slept time; without this check the loop would spin.
            if (Thread.currentThread().isInterrupted()) {
                break;
            }
        }

        List<T> entries = ImmutableList.<T>of();
        if (!recordIds.isEmpty()) {
            log.debug("{} fetchReadyEntriesFromIds: {}", DB_QUEUE_LOG_ID, recordIds);
            entries = executeQuery(new DBBackedQueue.Query<List<T>, QueueSqlDao<T>>() {
                @Override
                public List<T> execute(final QueueSqlDao<T> queueSqlDao) {
                    final long queryStart = System.nanoTime();
                    final List<T> result = queueSqlDao.getEntriesFromIds(recordIds, config.getTableName());
                    rawGetEntriesTime.update(System.nanoTime() - queryStart, TimeUnit.NANOSECONDS);
                    return result;
                }
            });
        }
        return new DBBackedQueue.ReadyEntriesWithMetrics<T>(entries, System.nanoTime() - ini - pollSleepTime);
    }

    /**
     * Within one transaction, records the error on {@code entry}'s row (record id, current UTC
     * time, error count) and puts the record id back into the per-thread cache.
     */
    @Override
    public void updateOnError(final T entry) {
        executeTransaction(new Transaction<Void, QueueSqlDao<T>>() {
            @Override
            public Void inTransaction(final QueueSqlDao<T> transactional, final TransactionStatus status) throws Exception {
                transactional.updateOnError(entry.getRecordId(), clock.getUTCNow().toDate(), entry.getErrorCount(), config.getTableName());
                transientInflightQRowIdCache.addRowId(entry.getRecordId());
                return null;
            }
        });
    }

    /**
     * Re-inserts each reaped entry as a fresh {@code AVAILABLE} entry created at {@code now} by the
     * current {@link CreatorName}, with no processing owner.
     */
    @Override
    protected void insertReapedEntriesFromTransaction(final QueueSqlDao<T> transactional, final List<T> entriesLeftBehind, final DateTime now) {
        for (final T entry : entriesLeftBehind) {
            entry.setCreatedDate(now);
            entry.setProcessingState(PersistentQueueEntryLifecycleState.AVAILABLE);
            entry.setCreatingOwner(CreatorName.get());
            entry.setProcessingOwner(null);
            insertEntryFromTransaction(transactional, entry);
        }
    }

    /**
     * Handles a database transaction event for the calling thread.
     *
     * <p>Does nothing when the calling thread has no cached record ids for this queue. On
     * {@code ROLLBACK} the cached ids are discarded; for any other event type they are offered to
     * the inflight queue. The per-thread cache is always cleared afterwards.
     */
    @AllowConcurrentEvents
    @Subscribe
    public void handleDatabaseTransactionEvent(final DatabaseTransactionEvent event) {
        // Defensive null check: this instance is registered for notifications from its constructor.
        if (transientInflightQRowIdCache == null || !transientInflightQRowIdCache.isValid()) {
            return;
        }
        if (event.getType() == DatabaseTransactionEventType.ROLLBACK) {
            transientInflightQRowIdCache.reset();
            return;
        }
        try {
            final Iterator<Long> entries = transientInflightQRowIdCache.iterator();
            while (entries.hasNext()) {
                final Long entry = entries.next();
                if (inflightEvents.offer(entry)) {
                    log.debug("{} Inserting entry {} into inflightQ", DB_QUEUE_LOG_ID, entry);
                } else {
                    log.warn("{} Inflight Q overflowed.... (entry {})", DB_QUEUE_LOG_ID, entry);
                }
            }
        } finally {
            transientInflightQRowIdCache.reset();
        }
    }

    /** @return current number of record ids in the inflight queue */
    @VisibleForTesting
    public int getInflightQSize() {
        return inflightEvents.size();
    }

    /**
     * Empties the inflight queue and refills it with the ready entry ids returned by the DAO,
     * reading pages of {@link #MAX_FETCHED_RECORDS_ID} ids until a page is empty or short.
     */
    private void initializeInflightQueue() {
        inflightEvents.clear();
        int totalEntries = 0;
        long fromRecordId = -1L;
        while (true) {
            // NOTE(review): this cast throws ClassCastException if sqlDao is not a PersistentBusSqlDao.
            final List<Long> existingIds = ((PersistentBusSqlDao) sqlDao).getReadyEntryIds(clock.getUTCNow().toDate(), fromRecordId, MAX_FETCHED_RECORDS_ID, CreatorName.get(), config.getTableName());
            if (existingIds.isEmpty()) {
                break;
            }
            inflightEvents.addAll(existingIds);
            totalEntries += existingIds.size();
            if (existingIds.size() < MAX_FETCHED_RECORDS_ID) {
                break;
            }
            // Next page starts right after the last id of this page.
            fromRecordId = existingIds.get(existingIds.size() - 1) + 1L;
        }
        log.info("{} Inserting {} entries into inflightQ during initialization", DB_QUEUE_LOG_ID, totalEntries);
    }

    /**
     * Per-thread holder of the record ids written by the current thread, tagged with the id of the
     * owning queue.
     */
    private static class TransientInflightQRowIdCache {
        private final ThreadLocal<RowRef> rowRefThreadLocal = new ThreadLocal<RowRef>();
        private final int queueId;

        private TransientInflightQRowIdCache(final int queueId) {
            this.queueId = queueId;
        }

        /** @return true if the calling thread holds cached ids tagged with this cache's queue id */
        public boolean isValid() {
            final RowRef entry = rowRefThreadLocal.get();
            return entry != null && entry.queueId == queueId;
        }

        /** Appends {@code rowId} to the calling thread's cached ids, creating the holder on first use. */
        public void addRowId(final Long rowId) {
            RowRef entry = rowRefThreadLocal.get();
            if (entry == null) {
                entry = new RowRef(queueId);
                rowRefThreadLocal.set(entry);
            }
            entry.addRowId(rowId);
        }

        /** Drops the calling thread's cached ids. */
        public void reset() {
            rowRefThreadLocal.remove();
        }

        /**
         * @return iterator over the calling thread's cached ids
         * @throws NullPointerException if the calling thread has no cached ids
         */
        public Iterator<Long> iterator() {
            final RowRef entry = rowRefThreadLocal.get();
            Preconditions.checkNotNull(entry);
            return entry.iterator();
        }

        /** List of record ids together with the id of the queue that produced them. */
        private static final class RowRef {
            private final int queueId;
            private final List<Long> rowIds;

            public RowRef(final int queueId) {
                this.queueId = queueId;
                this.rowIds = new ArrayList<Long>();
            }

            public void addRowId(final long rowId) {
                rowIds.add(rowId);
            }

            public Iterator<Long> iterator() {
                return rowIds.iterator();
            }
        }
    }
}

