/*
 * Decompiled with CFR 0.152.
 */
package org.killbill.queue;

import java.sql.SQLException;
import java.sql.SQLWarning;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.joda.time.DateTime;
import org.killbill.CreatorName;
import org.killbill.clock.Clock;
import org.killbill.commons.metrics.api.MetricRegistry;
import org.killbill.commons.metrics.api.Timer;
import org.killbill.commons.profiling.Profiling;
import org.killbill.commons.profiling.ProfilingFeature;
import org.killbill.commons.utils.collect.Iterables;
import org.killbill.queue.api.PersistentQueueConfig;
import org.killbill.queue.api.PersistentQueueEntryLifecycleState;
import org.killbill.queue.dao.EventEntryModelDao;
import org.killbill.queue.dao.QueueSqlDao;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.Transaction;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class DBBackedQueue<T extends EventEntryModelDao> {
    protected static final Logger log = LoggerFactory.getLogger(DBBackedQueue.class);
    protected final String DB_QUEUE_LOG_ID;
    protected final IDBI dbi;
    protected final Class<? extends QueueSqlDao<T>> sqlDaoClass;
    protected final QueueSqlDao<T> sqlDao;
    protected final Clock clock;
    protected final PersistentQueueConfig config;
    protected final Timer rawGetEntriesTime;
    protected final Timer rawInsertEntryTime;
    protected final Timer rawClaimEntriesTime;
    protected final Timer rawClaimEntryTime;
    protected final Timer rawDeleteEntriesTime;
    protected final Timer rawDeleteEntryTime;
    protected final Profiling<Long, RuntimeException> prof;

    public DBBackedQueue(Clock clock, IDBI dbi, Class<? extends QueueSqlDao<T>> sqlDaoClass, PersistentQueueConfig config, String dbBackedQId, MetricRegistry metricRegistry) {
        this.dbi = dbi;
        this.sqlDaoClass = sqlDaoClass;
        this.sqlDao = (QueueSqlDao)dbi.onDemand(sqlDaoClass);
        this.config = config;
        this.clock = clock;
        this.prof = new Profiling();
        this.rawGetEntriesTime = metricRegistry.timer(String.format("%s.%s.%s", DBBackedQueue.class.getName(), dbBackedQId, "rawGetEntriesTime"));
        this.rawInsertEntryTime = metricRegistry.timer(String.format("%s.%s.%s", DBBackedQueue.class.getName(), dbBackedQId, "rawInsertEntryTime"));
        this.rawClaimEntriesTime = metricRegistry.timer(String.format("%s.%s.%s", DBBackedQueue.class.getName(), dbBackedQId, "rawClaimEntriesTime"));
        this.rawClaimEntryTime = metricRegistry.timer(String.format("%s.%s.%s", DBBackedQueue.class.getName(), dbBackedQId, "rawClaimEntryTime"));
        this.rawDeleteEntriesTime = metricRegistry.timer(String.format("%s.%s.%s", DBBackedQueue.class.getName(), dbBackedQId, "rawDeleteEntriesTime"));
        this.rawDeleteEntryTime = metricRegistry.timer(String.format("%s.%s.%s", DBBackedQueue.class.getName(), dbBackedQId, "rawDeleteEntryTime"));
        this.DB_QUEUE_LOG_ID = "DBBackedQueue-" + dbBackedQId;
    }

    public abstract void initialize();

    public abstract void close();

    public abstract ReadyEntriesWithMetrics<T> getReadyEntries();

    public abstract void insertEntryFromTransaction(QueueSqlDao<T> var1, T var2);

    public abstract void updateOnError(T var1);

    protected abstract void insertReapedEntriesFromTransaction(QueueSqlDao<T> var1, List<T> var2, DateTime var3);

    public void insertEntry(T entry) {
        this.executeTransaction(new Transaction<Void, QueueSqlDao<T>>((EventEntryModelDao)entry){
            final /* synthetic */ EventEntryModelDao val$entry;
            {
                this.val$entry = eventEntryModelDao;
            }

            public Void inTransaction(QueueSqlDao<T> transactional, TransactionStatus status) {
                DBBackedQueue.this.insertEntryFromTransaction(transactional, this.val$entry);
                return null;
            }
        });
    }

    public void moveEntryToHistory(T entry) {
        this.executeTransaction(new Transaction<Void, QueueSqlDao<T>>((EventEntryModelDao)entry){
            final /* synthetic */ EventEntryModelDao val$entry;
            {
                this.val$entry = eventEntryModelDao;
            }

            public Void inTransaction(QueueSqlDao<T> transactional, TransactionStatus status) throws Exception {
                DBBackedQueue.this.moveEntryToHistoryFromTransaction(transactional, this.val$entry);
                return null;
            }
        });
    }

    public void moveEntryToHistoryFromTransaction(QueueSqlDao<T> transactional, T entry) {
        try {
            switch (entry.getProcessingState()) {
                case FAILED: 
                case PROCESSED: 
                case REMOVED: 
                case REAPED: {
                    break;
                }
                default: {
                    log.warn("{} Unexpected terminal event state={} for record_id={}", new Object[]{this.DB_QUEUE_LOG_ID, entry.getProcessingState(), entry.getRecordId()});
                }
            }
            log.debug("{} Moving entry into history: recordId={}, className={}, json={}", new Object[]{this.DB_QUEUE_LOG_ID, entry.getRecordId(), entry.getClassName(), entry.getEventJson()});
            long ini = System.nanoTime();
            transactional.insertEntry(entry, this.config.getHistoryTableName());
            transactional.removeEntry(entry.getRecordId(), this.config.getTableName());
            this.rawDeleteEntryTime.update(System.nanoTime() - ini, TimeUnit.NANOSECONDS);
        }
        catch (Exception e) {
            log.warn("{} Failed to move entry into history: {}", new Object[]{this.DB_QUEUE_LOG_ID, entry, e});
        }
    }

    public void moveEntriesToHistory(final Iterable<T> entries) {
        try {
            this.executeTransaction(new Transaction<Void, QueueSqlDao<T>>(){

                public Void inTransaction(QueueSqlDao<T> transactional, TransactionStatus status) throws Exception {
                    DBBackedQueue.this.moveEntriesToHistoryFromTransaction(transactional, entries);
                    return null;
                }
            });
        }
        catch (Exception e) {
            log.warn("{} Failed to move entries into history: {}", new Object[]{this.DB_QUEUE_LOG_ID, entries, e});
        }
    }

    public void moveEntriesToHistoryFromTransaction(QueueSqlDao<T> transactional, Iterable<T> entries) {
        if (!entries.iterator().hasNext()) {
            return;
        }
        for (EventEntryModelDao cur : entries) {
            switch (cur.getProcessingState()) {
                case FAILED: 
                case PROCESSED: 
                case REMOVED: 
                case REAPED: {
                    break;
                }
                default: {
                    log.warn("{} Unexpected terminal event state={} for record_id={}", new Object[]{this.DB_QUEUE_LOG_ID, cur.getProcessingState(), cur.getRecordId()});
                }
            }
            log.debug("{} Moving entry into history: recordId={}, className={}, json={}", new Object[]{this.DB_QUEUE_LOG_ID, cur.getRecordId(), cur.getClassName(), cur.getEventJson()});
        }
        Collection toBeRemovedRecordIds = Iterables.toStream(entries).map(input -> input == null ? Long.valueOf(-1L) : input.getRecordId()).collect(Collectors.toUnmodifiableList());
        long ini = System.nanoTime();
        transactional.insertEntries(entries, this.config.getHistoryTableName());
        transactional.removeEntries(toBeRemovedRecordIds, this.config.getTableName());
        this.rawDeleteEntriesTime.update(System.nanoTime() - ini, TimeUnit.NANOSECONDS);
    }

    protected long getNbReadyEntries() {
        Date now = this.clock.getUTCNow().toDate();
        return this.getNbReadyEntries(now);
    }

    public long getNbReadyEntries(final Date now) {
        final String owner = this.config.getPersistentQueueMode() == PersistentQueueConfig.PersistentQueueMode.POLLING ? null : CreatorName.get();
        return (Long)this.executeQuery(new Query<Long, QueueSqlDao<T>>(){

            @Override
            public Long execute(QueueSqlDao<T> queueSqlDao) {
                return queueSqlDao.getNbReadyEntries(now, owner, DBBackedQueue.this.config.getTableName());
            }
        });
    }

    protected Long safeInsertEntry(final QueueSqlDao<T> transactional, final T entry) {
        return (Long)this.prof.executeWithProfiling(ProfilingFeature.ProfilingFeatureType.DAO, "QueueSqlDao:insert", (Profiling.WithProfilingCallback)new Profiling.WithProfilingCallback<Long, RuntimeException>(){

            public Long execute() throws RuntimeException {
                long init = System.nanoTime();
                Long lastInsertId = transactional.insertEntry(entry, DBBackedQueue.this.config.getTableName());
                if (lastInsertId > 0L) {
                    log.debug("{} Inserting entry: lastInsertId={}, entry={}", new Object[]{DBBackedQueue.this.DB_QUEUE_LOG_ID, lastInsertId, entry});
                } else {
                    log.warn("{} Error inserting entry: lastInsertId={}, entry={}", new Object[]{DBBackedQueue.this.DB_QUEUE_LOG_ID, lastInsertId, entry});
                }
                DBBackedQueue.this.rawInsertEntryTime.update(System.nanoTime() - init, TimeUnit.NANOSECONDS);
                return lastInsertId;
            }
        });
    }

    public void reapEntries(final Date reapingDate) {
        this.executeTransaction(new Transaction<Void, QueueSqlDao<T>>(){

            public Void inTransaction(QueueSqlDao<T> transactional, TransactionStatus status) throws Exception {
                DateTime now = DBBackedQueue.this.clock.getUTCNow();
                String owner = CreatorName.get();
                List entriesLeftBehind = transactional.getEntriesLeftBehind(DBBackedQueue.this.config.getMaxReDispatchCount(), now.toDate(), reapingDate, DBBackedQueue.this.config.getTableName());
                if (entriesLeftBehind.isEmpty()) {
                    return null;
                }
                ArrayList<EventEntryModelDao> entriesToReInsert = new ArrayList<EventEntryModelDao>(entriesLeftBehind.size());
                LinkedList<EventEntryModelDao> lateEntries = new LinkedList<EventEntryModelDao>();
                for (EventEntryModelDao entryLeftBehind : entriesLeftBehind) {
                    boolean entryCreatedByThisNodeAndNeverProcessed;
                    boolean bl = entryCreatedByThisNodeAndNeverProcessed = owner.equals(entryLeftBehind.getCreatingOwner()) && entryLeftBehind.getProcessingOwner() == null;
                    if (entryCreatedByThisNodeAndNeverProcessed) {
                        lateEntries.add(entryLeftBehind);
                        continue;
                    }
                    entryLeftBehind.setProcessingState(PersistentQueueEntryLifecycleState.REAPED);
                    entriesToReInsert.add(entryLeftBehind);
                }
                if (!lateEntries.isEmpty()) {
                    log.warn("{} reapEntries: late queue entries {}", (Object)DBBackedQueue.this.DB_QUEUE_LOG_ID, lateEntries);
                }
                if (!entriesToReInsert.isEmpty()) {
                    DBBackedQueue.this.moveEntriesToHistoryFromTransaction(transactional, entriesToReInsert);
                    DBBackedQueue.this.insertReapedEntriesFromTransaction(transactional, entriesToReInsert, now);
                    log.warn("{} reapEntries: {} entries were reaped by {} {}", new Object[]{DBBackedQueue.this.DB_QUEUE_LOG_ID, entriesToReInsert.size(), owner, entriesToReInsert.stream().map(input -> input == null ? null : input.getUserToken())});
                }
                return null;
            }
        });
    }

    protected <U> U executeQuery(final Query<U, QueueSqlDao<T>> query) {
        return (U)this.dbi.withHandle(new HandleCallback<U>(){

            public U withHandle(Handle handle) throws Exception {
                Object result = query.execute((QueueSqlDao)handle.attach(DBBackedQueue.this.sqlDaoClass));
                DBBackedQueue.this.printSQLWarnings(handle);
                return result;
            }
        });
    }

    protected <U> U executeTransaction(final Transaction<U, QueueSqlDao<T>> transaction) {
        return (U)this.dbi.inTransaction(new TransactionCallback<U>(){

            public U inTransaction(Handle handle, TransactionStatus status) throws Exception {
                Object result = transaction.inTransaction((Object)((QueueSqlDao)handle.attach(DBBackedQueue.this.sqlDaoClass)), status);
                DBBackedQueue.this.printSQLWarnings(handle);
                return result;
            }
        });
    }

    protected void printSQLWarnings(Handle handle) {
        try {
            for (SQLWarning warning = handle.getConnection().getWarnings(); warning != null; warning = warning.getNextWarning()) {
                log.debug("[SQL WARNING] {}", (Throwable)warning);
            }
            handle.getConnection().clearWarnings();
        }
        catch (SQLException e) {
            log.debug("Error whilst retrieving SQL warnings", (Throwable)e);
        }
    }

    public QueueSqlDao<T> getSqlDao() {
        return this.sqlDao;
    }

    protected static interface Query<U, QueueSqlDao> {
        public U execute(QueueSqlDao var1);
    }

    public static class ReadyEntriesWithMetrics<T extends EventEntryModelDao> {
        private final List<T> entries;
        private final long time;

        public ReadyEntriesWithMetrics(List<T> entries, long time) {
            this.entries = new ArrayList<T>(entries);
            this.time = time;
        }

        public List<T> getEntries() {
            return this.entries;
        }

        public long getTime() {
            return this.time;
        }
    }
}

