/*
 * Decompiled with CFR 0.152.
 */
package org.killbill.queue;

import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.joda.time.DateTime;
import org.killbill.CreatorName;
import org.killbill.clock.Clock;
import org.killbill.queue.DBBackedQueue;
import org.killbill.queue.api.PersistentQueueConfig;
import org.killbill.queue.api.PersistentQueueEntryLifecycleState;
import org.killbill.queue.dao.EventEntryModelDao;
import org.killbill.queue.dao.QueueSqlDao;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.Transaction;
import org.skife.jdbi.v2.TransactionStatus;

public class DBBackedQueueWithPolling<T extends EventEntryModelDao>
extends DBBackedQueue<T> {
    public DBBackedQueueWithPolling(Clock clock, IDBI dbi, Class<? extends QueueSqlDao<T>> sqlDaoClass, PersistentQueueConfig config, String dbBackedQId, MetricRegistry metricRegistry) {
        super(clock, dbi, sqlDaoClass, config, dbBackedQId, metricRegistry);
    }

    @Override
    public void initialize() {
        log.info("{} Initialized  mode={}", (Object)this.DB_QUEUE_LOG_ID, (Object)this.config.getPersistentQueueMode());
    }

    @Override
    public void close() {
    }

    @Override
    public void insertEntryFromTransaction(QueueSqlDao<T> transactional, T entry) {
        this.safeInsertEntry(transactional, entry);
    }

    @Override
    public DBBackedQueue.ReadyEntriesWithMetrics<T> getReadyEntries() {
        long ini = System.nanoTime();
        List claimedEntries = (List)this.executeTransaction(new Transaction<List<T>, QueueSqlDao<T>>(){

            public List<T> inTransaction(QueueSqlDao<T> queueSqlDao, TransactionStatus status) throws Exception {
                DateTime now = DBBackedQueueWithPolling.this.clock.getUTCNow();
                List entriesToClaim = DBBackedQueueWithPolling.this.fetchReadyEntries(now, DBBackedQueueWithPolling.this.config.getMaxEntriesClaimed(), queueSqlDao);
                Object claimedEntries = ImmutableList.of();
                if (!entriesToClaim.isEmpty()) {
                    DBBackedQueue.log.debug("{} Entries to claim: {}", (Object)DBBackedQueueWithPolling.this.DB_QUEUE_LOG_ID, (Object)entriesToClaim);
                    claimedEntries = DBBackedQueueWithPolling.this.claimEntries(now, entriesToClaim, queueSqlDao);
                }
                return claimedEntries;
            }
        });
        return new DBBackedQueue.ReadyEntriesWithMetrics(claimedEntries, System.nanoTime() - ini);
    }

    @Override
    public void updateOnError(T entry) {
        this.executeTransaction(new Transaction<Void, QueueSqlDao<T>>((EventEntryModelDao)entry){
            final /* synthetic */ EventEntryModelDao val$entry;
            {
                this.val$entry = eventEntryModelDao;
            }

            public Void inTransaction(QueueSqlDao<T> transactional, TransactionStatus status) throws Exception {
                transactional.updateOnError(this.val$entry.getRecordId(), DBBackedQueueWithPolling.this.clock.getUTCNow().toDate(), this.val$entry.getErrorCount(), DBBackedQueueWithPolling.this.config.getTableName());
                return null;
            }
        });
    }

    @Override
    protected void insertReapedEntriesFromTransaction(QueueSqlDao<T> transactional, List<T> entriesLeftBehind, DateTime now) {
        if (this.config.getPersistentQueueMode() == PersistentQueueConfig.PersistentQueueMode.STICKY_POLLING) {
            for (EventEntryModelDao entry : entriesLeftBehind) {
                entry.setCreatedDate(now);
                entry.setProcessingState(PersistentQueueEntryLifecycleState.AVAILABLE);
                entry.setCreatingOwner(CreatorName.get());
                entry.setProcessingOwner(null);
            }
            transactional.insertEntries(entriesLeftBehind, this.config.getTableName());
        }
    }

    private List<T> fetchReadyEntries(DateTime now, int maxEntries, QueueSqlDao<T> queueSqlDao) {
        String owner = this.config.getPersistentQueueMode() == PersistentQueueConfig.PersistentQueueMode.POLLING ? null : CreatorName.get();
        long ini = System.nanoTime();
        List<T> result = queueSqlDao.getReadyEntries(now.toDate(), maxEntries, owner, this.config.getTableName());
        this.rawGetEntriesTime.update(System.nanoTime() - ini, TimeUnit.NANOSECONDS);
        return result;
    }

    private List<T> claimEntries(DateTime now, List<T> candidates, QueueSqlDao<T> queueSqlDao) {
        switch (this.config.getPersistentQueueMode()) {
            case POLLING: {
                return this.sequentialClaimEntries(now, candidates, queueSqlDao);
            }
            case STICKY_POLLING: {
                return this.batchClaimEntries(now, candidates, queueSqlDao);
            }
        }
        throw new IllegalStateException("Unsupported PersistentQueueMode " + (Object)((Object)this.config.getPersistentQueueMode()));
    }

    private List<T> batchClaimEntries(DateTime utcNow, List<T> candidates, QueueSqlDao<T> queueSqlDao) {
        if (candidates.isEmpty()) {
            return ImmutableList.of();
        }
        Date now = utcNow.toDate();
        Date nextAvailable = utcNow.plus(this.config.getClaimedTime().getMillis()).toDate();
        final String owner = CreatorName.get();
        Collection recordIds = Collections2.transform(candidates, (Function)new Function<T, Long>(){

            public Long apply(T input) {
                return input == null ? Long.valueOf(-1L) : input.getRecordId();
            }
        });
        long ini = System.nanoTime();
        int resultCount = queueSqlDao.claimEntries(recordIds, owner, nextAvailable, this.config.getTableName());
        this.rawClaimEntriesTime.update(System.nanoTime() - ini, TimeUnit.NANOSECONDS);
        if (resultCount == candidates.size()) {
            log.debug("{} batchClaimEntries claimed (recordIds={}, now={}, nextAvailable={}, owner={}): {}", new Object[]{this.DB_QUEUE_LOG_ID, recordIds, now, nextAvailable, owner, candidates});
            return candidates;
        }
        List<T> maybeClaimedEntries = queueSqlDao.getEntriesFromIds((List<Long>)ImmutableList.copyOf((Collection)recordIds), this.config.getTableName());
        StringBuilder stringBuilder = new StringBuilder();
        for (int i = 0; i < maybeClaimedEntries.size(); ++i) {
            EventEntryModelDao eventEntryModelDao = (EventEntryModelDao)maybeClaimedEntries.get(i);
            if (i > 0) {
                stringBuilder.append(",");
            }
            stringBuilder.append("[recordId=").append(eventEntryModelDao.getRecordId()).append(",processingState=").append((Object)eventEntryModelDao.getProcessingState()).append(",processingOwner=").append(eventEntryModelDao.getProcessingOwner()).append(",processingAvailableDate=").append(eventEntryModelDao.getNextAvailableDate()).append("]");
        }
        log.warn("{} batchClaimEntries only claimed partial entries {}/{} (now={}, nextAvailable={}, owner={}): {}", new Object[]{this.DB_QUEUE_LOG_ID, resultCount, candidates.size(), now, nextAvailable, owner, stringBuilder.toString()});
        Iterable claimed = Iterables.filter(maybeClaimedEntries, (Predicate)new Predicate<T>(){

            public boolean apply(T input) {
                return input != null && input.getProcessingState() == PersistentQueueEntryLifecycleState.IN_PROCESSING && owner.equals(input.getProcessingOwner());
            }
        });
        return ImmutableList.copyOf((Iterable)claimed);
    }

    private List<T> sequentialClaimEntries(final DateTime now, List<T> candidates, final QueueSqlDao<T> queueSqlDao) {
        return ImmutableList.copyOf((Collection)Collections2.filter(candidates, (Predicate)new Predicate<T>(){

            public boolean apply(T input) {
                return DBBackedQueueWithPolling.this.claimEntry(now, input, queueSqlDao);
            }
        }));
    }

    private boolean claimEntry(DateTime now, T entry, QueueSqlDao<T> queueSqlDao) {
        boolean claimed;
        Date nextAvailable = now.plus(this.config.getClaimedTime().getMillis()).toDate();
        long ini = System.nanoTime();
        int claimEntry = queueSqlDao.claimEntry(entry.getRecordId(), CreatorName.get(), nextAvailable, this.config.getTableName());
        this.rawClaimEntryTime.update(System.nanoTime() - ini, TimeUnit.NANOSECONDS);
        boolean bl = claimed = claimEntry == 1;
        if (claimed) {
            log.debug("{} Claimed entry {}", (Object)this.DB_QUEUE_LOG_ID, entry);
        }
        return claimed;
    }
}

