/*
 * Decompiled with CFR 0.152.
 */
package org.killbill.queue;

import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.joda.time.DateTime;
import org.killbill.CreatorName;
import org.killbill.clock.Clock;
import org.killbill.queue.DBBackedQueue;
import org.killbill.queue.api.PersistentQueueConfig;
import org.killbill.queue.api.PersistentQueueEntryLifecycleState;
import org.killbill.queue.dao.EventEntryModelDao;
import org.killbill.queue.dao.QueueSqlDao;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.Transaction;
import org.skife.jdbi.v2.TransactionStatus;

public class DBBackedQueueWithPolling<T extends EventEntryModelDao>
extends DBBackedQueue<T> {
    public DBBackedQueueWithPolling(Clock clock, IDBI dbi, Class<? extends QueueSqlDao<T>> sqlDaoClass, PersistentQueueConfig config, String dbBackedQId, MetricRegistry metricRegistry) {
        super(clock, dbi, sqlDaoClass, config, dbBackedQId, metricRegistry);
    }

    @Override
    public void initialize() {
        log.info("{} Initialized  mode={}", (Object)this.DB_QUEUE_LOG_ID, (Object)this.config.getPersistentQueueMode());
    }

    @Override
    public void close() {
    }

    @Override
    public void insertEntryFromTransaction(QueueSqlDao<T> transactional, T entry) {
        this.safeInsertEntry(transactional, entry);
    }

    @Override
    public DBBackedQueue.ReadyEntriesWithMetrics<T> getReadyEntries() {
        long ini = System.nanoTime();
        List<T> entriesToClaim = this.fetchReadyEntries(this.config.getMaxEntriesClaimed());
        Object claimedEntries = ImmutableList.of();
        if (!entriesToClaim.isEmpty()) {
            log.debug("{} Entries to claim: {}", (Object)this.DB_QUEUE_LOG_ID, entriesToClaim);
            claimedEntries = this.claimEntries(entriesToClaim);
        }
        return new DBBackedQueue.ReadyEntriesWithMetrics(claimedEntries, System.nanoTime() - ini);
    }

    @Override
    public void updateOnError(T entry) {
        this.executeTransaction(new Transaction<Void, QueueSqlDao<T>>((EventEntryModelDao)entry){
            final /* synthetic */ EventEntryModelDao val$entry;
            {
                this.val$entry = eventEntryModelDao;
            }

            public Void inTransaction(QueueSqlDao<T> transactional, TransactionStatus status) throws Exception {
                transactional.updateOnError(this.val$entry.getRecordId(), DBBackedQueueWithPolling.this.clock.getUTCNow().toDate(), this.val$entry.getErrorCount(), DBBackedQueueWithPolling.this.config.getTableName());
                return null;
            }
        });
    }

    @Override
    protected void insertReapedEntriesFromTransaction(QueueSqlDao<T> transactional, List<T> entriesLeftBehind, DateTime now) {
        if (this.config.getPersistentQueueMode() == PersistentQueueConfig.PersistentQueueMode.STICKY_POLLING) {
            for (EventEntryModelDao entry : entriesLeftBehind) {
                entry.setCreatedDate(now);
                entry.setProcessingState(PersistentQueueEntryLifecycleState.AVAILABLE);
                entry.setCreatingOwner(CreatorName.get());
            }
            transactional.insertEntries(entriesLeftBehind, this.config.getTableName());
        }
    }

    private List<T> fetchReadyEntries(final int maxEntries) {
        final Date now = this.clock.getUTCNow().toDate();
        final String owner = this.config.getPersistentQueueMode() == PersistentQueueConfig.PersistentQueueMode.POLLING ? null : CreatorName.get();
        return (List)this.executeQuery(new DBBackedQueue.Query<List<T>, QueueSqlDao<T>>(){

            @Override
            public List<T> execute(QueueSqlDao<T> queueSqlDao) {
                long ini = System.nanoTime();
                List result = queueSqlDao.getReadyEntries(now, maxEntries, owner, DBBackedQueueWithPolling.this.config.getTableName());
                DBBackedQueueWithPolling.this.rawGetEntriesTime.update(System.nanoTime() - ini, TimeUnit.NANOSECONDS);
                return result;
            }
        });
    }

    private List<T> claimEntries(List<T> candidates) {
        switch (this.config.getPersistentQueueMode()) {
            case POLLING: {
                return this.sequentialClaimEntries(candidates);
            }
            case STICKY_POLLING: {
                return this.batchClaimEntries(candidates);
            }
        }
        throw new IllegalStateException("Unsupported PersistentQueueMode " + (Object)((Object)this.config.getPersistentQueueMode()));
    }

    private List<T> batchClaimEntries(List<T> candidates) {
        if (candidates.isEmpty()) {
            return ImmutableList.of();
        }
        final Date nextAvailable = this.clock.getUTCNow().plus(this.config.getClaimedTime().getMillis()).toDate();
        final Collection recordIds = Collections2.transform(candidates, (Function)new Function<T, Long>(){

            public Long apply(T input) {
                return input.getRecordId();
            }
        });
        int resultCount = (Integer)this.executeQuery(new DBBackedQueue.Query<Integer, QueueSqlDao<T>>(){

            @Override
            public Integer execute(QueueSqlDao<T> queueSqlDao) {
                long ini = System.nanoTime();
                Integer result = queueSqlDao.claimEntries(recordIds, DBBackedQueueWithPolling.this.clock.getUTCNow().toDate(), CreatorName.get(), nextAvailable, DBBackedQueueWithPolling.this.config.getTableName());
                DBBackedQueueWithPolling.this.rawClaimEntriesTime.update(System.nanoTime() - ini, TimeUnit.NANOSECONDS);
                return result;
            }
        });
        if (resultCount == candidates.size()) {
            log.debug("{} batchClaimEntries claimed: {}", (Object)this.DB_QUEUE_LOG_ID, candidates);
            return candidates;
        }
        if (resultCount == 0) {
            log.warn("{} batchClaimEntries see 0 entries", (Object)this.DB_QUEUE_LOG_ID);
            return ImmutableList.of();
        }
        List maybeClaimedEntries = (List)this.executeQuery(new DBBackedQueue.Query<List<T>, QueueSqlDao<T>>(){

            @Override
            public List<T> execute(QueueSqlDao<T> queueSqlDao) {
                return queueSqlDao.getEntriesFromIds((List<Long>)ImmutableList.copyOf((Collection)recordIds), DBBackedQueueWithPolling.this.config.getTableName());
            }
        });
        Iterable claimed = Iterables.filter((Iterable)maybeClaimedEntries, (Predicate)new Predicate<T>(){

            public boolean apply(T input) {
                return input.getProcessingState() == PersistentQueueEntryLifecycleState.IN_PROCESSING && input.getProcessingOwner().equals(CreatorName.get());
            }
        });
        ImmutableList result = ImmutableList.copyOf((Iterable)claimed);
        log.warn("{} batchClaimEntries only claimed partial entries {}/{}", new Object[]{this.DB_QUEUE_LOG_ID, result.size(), candidates.size()});
        return result;
    }

    private List<T> sequentialClaimEntries(List<T> candidates) {
        return ImmutableList.copyOf((Collection)Collections2.filter(candidates, (Predicate)new Predicate<T>(){

            public boolean apply(T input) {
                return DBBackedQueueWithPolling.this.claimEntry(input);
            }
        }));
    }

    private boolean claimEntry(T entry) {
        boolean claimed;
        Date nextAvailable = this.clock.getUTCNow().plus(this.config.getClaimedTime().getMillis()).toDate();
        int claimEntry = (Integer)this.executeQuery(new DBBackedQueue.Query<Integer, QueueSqlDao<T>>((EventEntryModelDao)entry, nextAvailable){
            final /* synthetic */ EventEntryModelDao val$entry;
            final /* synthetic */ Date val$nextAvailable;
            {
                this.val$entry = eventEntryModelDao;
                this.val$nextAvailable = date;
            }

            @Override
            public Integer execute(QueueSqlDao<T> queueSqlDao) {
                long ini = System.nanoTime();
                Integer result = queueSqlDao.claimEntry(this.val$entry.getRecordId(), DBBackedQueueWithPolling.this.clock.getUTCNow().toDate(), CreatorName.get(), this.val$nextAvailable, DBBackedQueueWithPolling.this.config.getTableName());
                DBBackedQueueWithPolling.this.rawClaimEntryTime.update(System.nanoTime() - ini, TimeUnit.NANOSECONDS);
                return result;
            }
        });
        boolean bl = claimed = claimEntry == 1;
        if (claimed) {
            log.debug("{} Claimed entry {}", (Object)this.DB_QUEUE_LOG_ID, entry);
        }
        return claimed;
    }
}

