/*
 * Decompiled with CFR 0.152.
 */
package org.killbill.bus;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.eventbus.EventBusException;
import com.google.common.eventbus.EventBusThatThrowsException;
import java.sql.Connection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Named;
import javax.sql.DataSource;
import org.joda.time.DateTime;
import org.killbill.CreatorName;
import org.killbill.bus.BusReaper;
import org.killbill.bus.api.BusEvent;
import org.killbill.bus.api.BusEventWithMetadata;
import org.killbill.bus.api.PersistentBus;
import org.killbill.bus.api.PersistentBusConfig;
import org.killbill.bus.dao.BusEventModelDao;
import org.killbill.bus.dao.PersistentBusSqlDao;
import org.killbill.bus.dispatching.BusCallableCallback;
import org.killbill.clock.Clock;
import org.killbill.clock.DefaultClock;
import org.killbill.commons.jdbi.notification.DatabaseTransactionNotificationApi;
import org.killbill.commons.profiling.Profiling;
import org.killbill.commons.profiling.ProfilingFeature;
import org.killbill.queue.DBBackedQueue;
import org.killbill.queue.DefaultQueueLifecycle;
import org.killbill.queue.InTransaction;
import org.killbill.queue.api.PersistentQueueConfig;
import org.killbill.queue.api.QueueEvent;
import org.killbill.queue.dispatching.BlockingRejectionExecutionHandler;
import org.killbill.queue.dispatching.CallableCallbackBase;
import org.killbill.queue.dispatching.Dispatcher;
import org.skife.config.ConfigurationObjectFactory;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.IDBI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultPersistentBus
extends DefaultQueueLifecycle
implements PersistentBus {
    private static final Logger log = LoggerFactory.getLogger(DefaultPersistentBus.class);
    private final DBI dbi;
    private final EventBusThatThrowsException eventBusDelegate;
    private final DBBackedQueue<BusEventModelDao> dao;
    private final Clock clock;
    private final PersistentBusConfig config;
    private final Timer dispatchTimer;
    private final Profiling<Iterable<BusEventModelDao>, RuntimeException> prof;
    private final BusReaper reaper;
    private final Dispatcher<BusEventModelDao> dispatcher;
    private final AtomicBoolean isStarted;
    private final String dbBackedQId;

    /**
     * Creates the bus and wires its collaborators: the DB-backed queue, the profiler, the dispatcher,
     * the dispatch timer, the in-memory event bus delegate and the reaper. Nothing is started here;
     * see {@link #start()}.
     *
     * @param dbi                                database handle; it is cast to {@link DBI}, so any other
     *                                           {@link IDBI} implementation fails with a {@link ClassCastException}
     * @param clock                              clock handed to the queue and the reaper, and used to timestamp posted events
     * @param config                             bus configuration (table name, dispatch threads, queue capacity, ...)
     * @param metricRegistry                     registry in which the queue metrics and the "dispatch" timer are registered
     * @param databaseTransactionNotificationApi passed through to the DB-backed queue
     */
    @Inject
    public DefaultPersistentBus(@Named(value="Queue") IDBI dbi, Clock clock, final PersistentBusConfig config, MetricRegistry metricRegistry, DatabaseTransactionNotificationApi databaseTransactionNotificationApi) {
        super("Bus", config);
        this.dbi = (DBI)dbi;
        this.clock = clock;
        this.config = config;
        this.dbBackedQId = "bus-" + config.getTableName();
        this.dao = new DBBackedQueue(clock, dbi, PersistentBusSqlDao.class, config, this.dbBackedQId, metricRegistry, databaseTransactionNotificationApi);
        this.prof = new Profiling();
        // A single group shared by all dispatcher threads. Allocating a fresh ThreadGroup inside
        // newThread() would create one group per thread; on JDKs where a group stays registered
        // with its parent until it is explicitly destroyed, those groups accumulate as threads
        // are recycled.
        final ThreadGroup busThreadGroup = new ThreadGroup("bus-grp");
        ThreadFactory busThreadFactory = new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(busThreadGroup, r, config.getTableName() + "-th");
            }
        };
        this.dispatchTimer = metricRegistry.timer(MetricRegistry.name(DefaultPersistentBus.class, "dispatch"));
        this.dispatcher = new Dispatcher(1, config.geMaxDispatchThreads(), 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(config.getEventQueueCapacity()), busThreadFactory, new BlockingRejectionExecutionHandler());
        this.eventBusDelegate = new EventBusDelegate("Killbill EventBus");
        this.isStarted = new AtomicBoolean(false);
        this.reaper = new BusReaper(this.dao, config, clock);
    }

    /**
     * Convenience constructor for use outside of dependency injection: builds the DBI from the data source,
     * uses a {@link DefaultClock}, a fresh {@link MetricRegistry} and a fresh
     * {@link DatabaseTransactionNotificationApi}, and derives the configuration from the given properties
     * with the {@code instanceName} placeholder replaced by {@code main}.
     *
     * @param dataSource data source backing the bus tables
     * @param properties properties from which the {@link PersistentBusConfig} is built
     */
    public DefaultPersistentBus(DataSource dataSource, Properties properties) {
        this(InTransaction.buildDDBI(dataSource),
             new DefaultClock(),
             new ConfigurationObjectFactory(properties).buildWithReplacements(PersistentBusConfig.class, ImmutableMap.<String, String>of("instanceName", "main")),
             new MetricRegistry(),
             new DatabaseTransactionNotificationApi());
    }

    /**
     * Starts the bus unless processing is switched off in the configuration.
     * <p>
     * On the transition from stopped to started, the DB-backed queue is initialized, then the dispatcher
     * and the queue are started. The reaper is started on every call when the queue mode is
     * {@code STICKY_POLLING} or {@code STICKY_EVENTS}, whether or not this call performed the transition.
     */
    @Override
    public void start() {
        if (this.config.isProcessingOff()) {
            log.warn("PersistentBus processing is off, does not start");
            return;
        }

        final boolean firstStart = this.isStarted.compareAndSet(false, true);
        if (firstStart) {
            this.dao.initialize();
            this.dispatcher.start();
            this.startQueue();
        }

        final PersistentQueueConfig.PersistentQueueMode queueMode = this.config.getPersistentQueueMode();
        final boolean stickyMode = queueMode == PersistentQueueConfig.PersistentQueueMode.STICKY_POLLING
                                   || queueMode == PersistentQueueConfig.PersistentQueueMode.STICKY_EVENTS;
        if (stickyMode) {
            this.reaper.start();
        }
    }

    /**
     * Stops the bus. The queue and then the dispatcher are stopped only on the transition from
     * started to stopped; the reaper is asked to stop on every call.
     */
    @Override
    public void stop() {
        final boolean wasRunning = this.isStarted.compareAndSet(true, false);
        if (wasRunning) {
            this.stopQueue();
            this.dispatcher.stop();
        }
        this.reaper.stop();
    }

    /**
     * Fetches the entries that are ready from the DB-backed queue and hands each one to the dispatcher
     * with its own {@link BusCallableCallback}.
     *
     * @return the number of entries fetched (0 when there was nothing to process)
     */
    @Override
    public int doProcessEvents() {
        final List<BusEventModelDao> readyEntries = this.dao.getReadyEntries();
        if (!readyEntries.isEmpty()) {
            log.debug("Bus events from {} to process: {}", this.config.getTableName(), readyEntries);
            final Iterator<BusEventModelDao> pending = readyEntries.iterator();
            while (pending.hasNext()) {
                this.dispatcher.dispatch(pending.next(), new BusCallableCallback(this));
            }
        }
        return readyEntries.size();
    }

    /**
     * @return the current value of the started flag, which is set by {@link #start()} and cleared by {@link #stop()}
     */
    @Override
    public boolean isStarted() {
        return isStarted.get();
    }

    /**
     * Registers a handler with the in-memory event bus delegate.
     * When the bus is not started, the handler is NOT registered and only a warning is logged.
     *
     * @param handlerInstance object to register with the delegate
     */
    @Override
    public void register(Object handlerInstance) throws PersistentBus.EventBusException {
        if (!this.isStarted.get()) {
            log.warn("Attempting to register handler " + handlerInstance + " in a non initialized bus");
            return;
        }
        this.eventBusDelegate.register(handlerInstance);
    }

    /**
     * Unregisters a handler from the in-memory event bus delegate.
     * When the bus is not started, nothing is unregistered and only a warning is logged.
     *
     * @param handlerInstance object to unregister from the delegate
     */
    @Override
    public void unregister(Object handlerInstance) throws PersistentBus.EventBusException {
        if (!this.isStarted.get()) {
            log.warn("Attempting to unregister handler " + handlerInstance + " in a non initialized bus");
            return;
        }
        this.eventBusDelegate.unregister(handlerInstance);
    }

    /**
     * Serializes the event to JSON and inserts it into the DB-backed queue.
     * <p>
     * When the bus is not started the event is dropped with a warning. Any exception raised while
     * serializing or inserting is logged at error level and NOT propagated to the caller.
     *
     * @param event event to persist
     */
    @Override
    public void post(BusEvent event) throws PersistentBus.EventBusException {
        try {
            if (!this.isStarted.get()) {
                log.warn("Attempting to post event " + event + " in a non initialized bus");
                return;
            }
            final String serializedEvent = this.objectMapper.writeValueAsString(event);
            final BusEventModelDao queueEntry = new BusEventModelDao(CreatorName.get(),
                                                                     this.clock.getUTCNow(),
                                                                     event.getClass().getName(),
                                                                     serializedEvent,
                                                                     event.getUserToken(),
                                                                     event.getSearchKey1(),
                                                                     event.getSearchKey2());
            this.dao.insertEntry(queueEntry);
        }
        catch (Exception failure) {
            // Best effort: the failure is recorded but deliberately not rethrown.
            log.error("Failed to post BusEvent " + event, failure);
        }
    }

    /**
     * Serializes the event to JSON and inserts it into the DB-backed queue through the caller's
     * connection, via {@link InTransaction#execute}.
     * <p>
     * When the bus is not started, or when the event cannot be serialized, the event is dropped
     * with a warning and nothing is written.
     *
     * @param event      event to persist
     * @param connection connection on which the insert is executed
     */
    @Override
    public void postFromTransaction(BusEvent event, Connection connection) throws PersistentBus.EventBusException {
        if (!this.isStarted.get()) {
            log.warn("Attempting to post event " + event + " in a non initialized bus");
            return;
        }

        final String serializedEvent;
        try {
            serializedEvent = this.objectMapper.writeValueAsString(event);
        }
        catch (JsonProcessingException serializationFailure) {
            log.warn("Unable to serialize event " + event, serializationFailure);
            return;
        }

        final BusEventModelDao queueEntry = new BusEventModelDao(CreatorName.get(),
                                                                 this.clock.getUTCNow(),
                                                                 event.getClass().getName(),
                                                                 serializedEvent,
                                                                 event.getUserToken(),
                                                                 event.getSearchKey1(),
                                                                 event.getSearchKey2());
        final InTransaction.InTransactionHandler<PersistentBusSqlDao, Void> insertHandler = new InTransaction.InTransactionHandler<PersistentBusSqlDao, Void>(){

            @Override
            public Void withSqlDao(PersistentBusSqlDao sqlDao) throws Exception {
                DefaultPersistentBus.this.dao.insertEntryFromTransaction(sqlDao, queueEntry);
                return null;
            }
        };
        InTransaction.execute(this.dbi, connection, insertHandler, PersistentBusSqlDao.class);
    }

    /**
     * Looks up ready bus events matching both search keys, using the queue's own SQL DAO.
     *
     * @param searchKey1 first search key
     * @param searchKey2 second search key
     * @return the matching events with their metadata
     */
    @Override
    public <T extends BusEvent> Iterable<BusEventWithMetadata<T>> getAvailableBusEventsForSearchKeys(Long searchKey1, Long searchKey2) {
        final PersistentBusSqlDao sqlDao = (PersistentBusSqlDao) this.dao.getSqlDao();
        return this.getAvailableBusEventsForSearchKeysInternal(sqlDao, null, searchKey1, searchKey2);
    }

    /**
     * Same lookup as {@link #getAvailableBusEventsForSearchKeys(Long, Long)}, but executed through
     * the supplied connection via {@link InTransaction#execute}.
     *
     * @param searchKey1 first search key
     * @param searchKey2 second search key
     * @param connection connection on which the query is executed
     * @return the matching events with their metadata
     */
    @Override
    public <T extends BusEvent> Iterable<BusEventWithMetadata<T>> getAvailableBusEventsFromTransactionForSearchKeys(final Long searchKey1, final Long searchKey2, Connection connection) {
        final InTransaction.InTransactionHandler<PersistentBusSqlDao, Iterable<BusEventWithMetadata<T>>> queryHandler = new InTransaction.InTransactionHandler<PersistentBusSqlDao, Iterable<BusEventWithMetadata<T>>>(){

            @Override
            public Iterable<BusEventWithMetadata<T>> withSqlDao(PersistentBusSqlDao sqlDao) throws Exception {
                return DefaultPersistentBus.this.getAvailableBusEventsForSearchKeysInternal(sqlDao, null, searchKey1, searchKey2);
            }
        };
        return InTransaction.execute(this.dbi, connection, queryHandler, PersistentBusSqlDao.class);
    }

    /**
     * Looks up ready bus events by second search key and creation-date bound, using the queue's own SQL DAO.
     *
     * @param maxCreatedDate creation-date bound forwarded to the DAO query
     * @param searchKey2     second search key
     * @return the matching events with their metadata
     */
    @Override
    public <T extends BusEvent> Iterable<BusEventWithMetadata<T>> getAvailableBusEventsForSearchKey2(DateTime maxCreatedDate, Long searchKey2) {
        final PersistentBusSqlDao sqlDao = (PersistentBusSqlDao) this.dao.getSqlDao();
        return this.getAvailableBusEventsForSearchKeysInternal(sqlDao, maxCreatedDate, null, searchKey2);
    }

    /**
     * Same lookup as {@link #getAvailableBusEventsForSearchKey2(DateTime, Long)}, but executed through
     * the supplied connection via {@link InTransaction#execute}.
     *
     * @param maxCreatedDate creation-date bound forwarded to the DAO query
     * @param searchKey2     second search key
     * @param connection     connection on which the query is executed
     * @return the matching events with their metadata
     */
    @Override
    public <T extends BusEvent> Iterable<BusEventWithMetadata<T>> getAvailableBusEventsFromTransactionForSearchKey2(final DateTime maxCreatedDate, final Long searchKey2, Connection connection) {
        final InTransaction.InTransactionHandler<PersistentBusSqlDao, Iterable<BusEventWithMetadata<T>>> queryHandler = new InTransaction.InTransactionHandler<PersistentBusSqlDao, Iterable<BusEventWithMetadata<T>>>(){

            @Override
            public Iterable<BusEventWithMetadata<T>> withSqlDao(PersistentBusSqlDao sqlDao) throws Exception {
                return DefaultPersistentBus.this.getAvailableBusEventsForSearchKeysInternal(sqlDao, maxCreatedDate, null, searchKey2);
            }
        };
        return InTransaction.execute(this.dbi, connection, queryHandler, PersistentBusSqlDao.class);
    }

    /**
     * Returns the entries the SQL DAO reports as in-processing for the configured bus table,
     * converted to events with metadata.
     *
     * @return the in-processing events with their metadata
     */
    @Override
    public <T extends BusEvent> Iterable<BusEventWithMetadata<T>> getInProcessingBusEvents() {
        final Iterable<BusEventModelDao> inProcessing = this.dao.getSqlDao().getInProcessingEntries(this.config.getTableName());
        return this.toBusEventWithMetadata(inProcessing);
    }

    /**
     * Looks up ready or in-processing bus events matching both search keys, using the queue's own SQL DAO.
     *
     * @param searchKey1 first search key
     * @param searchKey2 second search key
     * @return the matching events with their metadata
     */
    @Override
    public <T extends BusEvent> Iterable<BusEventWithMetadata<T>> getAvailableOrInProcessingBusEventsForSearchKeys(Long searchKey1, Long searchKey2) {
        final PersistentBusSqlDao sqlDao = (PersistentBusSqlDao) this.dao.getSqlDao();
        return this.getAvailableOrInProcessingBusEventsForSearchKeysInternal(sqlDao, null, searchKey1, searchKey2);
    }

    /**
     * Same lookup as {@link #getAvailableOrInProcessingBusEventsForSearchKeys(Long, Long)}, but executed
     * through the supplied connection via {@link InTransaction#execute}.
     *
     * @param searchKey1 first search key
     * @param searchKey2 second search key
     * @param connection connection on which the query is executed
     * @return the matching events with their metadata
     */
    @Override
    public <T extends BusEvent> Iterable<BusEventWithMetadata<T>> getAvailableOrInProcessingBusEventsFromTransactionForSearchKeys(final Long searchKey1, final Long searchKey2, Connection connection) {
        final InTransaction.InTransactionHandler<PersistentBusSqlDao, Iterable<BusEventWithMetadata<T>>> queryHandler = new InTransaction.InTransactionHandler<PersistentBusSqlDao, Iterable<BusEventWithMetadata<T>>>(){

            @Override
            public Iterable<BusEventWithMetadata<T>> withSqlDao(PersistentBusSqlDao sqlDao) throws Exception {
                return DefaultPersistentBus.this.getAvailableOrInProcessingBusEventsForSearchKeysInternal(sqlDao, null, searchKey1, searchKey2);
            }
        };
        return InTransaction.execute(this.dbi, connection, queryHandler, PersistentBusSqlDao.class);
    }

    /**
     * Looks up ready or in-processing bus events by second search key and creation-date bound,
     * using the queue's own SQL DAO.
     *
     * @param maxCreatedDate creation-date bound forwarded to the DAO query
     * @param searchKey2     second search key
     * @return the matching events with their metadata
     */
    @Override
    public <T extends BusEvent> Iterable<BusEventWithMetadata<T>> getAvailableOrInProcessingBusEventsForSearchKey2(DateTime maxCreatedDate, Long searchKey2) {
        final PersistentBusSqlDao sqlDao = (PersistentBusSqlDao) this.dao.getSqlDao();
        return this.getAvailableOrInProcessingBusEventsForSearchKeysInternal(sqlDao, maxCreatedDate, null, searchKey2);
    }

    /**
     * Same lookup as {@link #getAvailableOrInProcessingBusEventsForSearchKey2(DateTime, Long)}, but
     * executed through the supplied connection via {@link InTransaction#execute}.
     *
     * @param maxCreatedDate creation-date bound forwarded to the DAO query
     * @param searchKey2     second search key
     * @param connection     connection on which the query is executed
     * @return the matching events with their metadata
     */
    @Override
    public <T extends BusEvent> Iterable<BusEventWithMetadata<T>> getAvailableOrInProcessingBusEventsFromTransactionForSearchKey2(final DateTime maxCreatedDate, final Long searchKey2, Connection connection) {
        final InTransaction.InTransactionHandler<PersistentBusSqlDao, Iterable<BusEventWithMetadata<T>>> queryHandler = new InTransaction.InTransactionHandler<PersistentBusSqlDao, Iterable<BusEventWithMetadata<T>>>(){

            @Override
            public Iterable<BusEventWithMetadata<T>> withSqlDao(PersistentBusSqlDao sqlDao) throws Exception {
                return DefaultPersistentBus.this.getAvailableOrInProcessingBusEventsForSearchKeysInternal(sqlDao, maxCreatedDate, null, searchKey2);
            }
        };
        return InTransaction.execute(this.dbi, connection, queryHandler, PersistentBusSqlDao.class);
    }

    /**
     * Looks up bus events in the history table matching both search keys, using the queue's own SQL DAO.
     *
     * @param searchKey1 first search key
     * @param searchKey2 second search key
     * @return the matching historical events with their metadata
     */
    @Override
    public <T extends BusEvent> Iterable<BusEventWithMetadata<T>> getHistoricalBusEventsForSearchKeys(Long searchKey1, Long searchKey2) {
        final PersistentBusSqlDao sqlDao = (PersistentBusSqlDao) this.dao.getSqlDao();
        return this.getHistoricalBusEventsForSearchKeysInternal(sqlDao, null, searchKey1, searchKey2);
    }

    /**
     * Looks up bus events in the history table by second search key and creation-date bound,
     * using the queue's own SQL DAO.
     *
     * @param minCreatedDate creation-date bound forwarded to the DAO query
     * @param searchKey2     second search key
     * @return the matching historical events with their metadata
     */
    @Override
    public <T extends BusEvent> Iterable<BusEventWithMetadata<T>> getHistoricalBusEventsForSearchKey2(DateTime minCreatedDate, Long searchKey2) {
        final PersistentBusSqlDao sqlDao = (PersistentBusSqlDao) this.dao.getSqlDao();
        return this.getHistoricalBusEventsForSearchKeysInternal(sqlDao, minCreatedDate, null, searchKey2);
    }

    /**
     * Asks the DB-backed queue how many entries are ready for the given creation-date bound.
     *
     * @param maxCreatedDate creation-date bound; converted with {@code toDate()}, so it must not be null
     * @return the count reported by the queue
     */
    @Override
    public long getNbReadyEntries(DateTime maxCreatedDate) {
        return dao.getNbReadyEntries(maxCreatedDate.toDate());
    }

    /**
     * @return a short description of this bus containing the DB-backed queue identifier
     */
    public String toString() {
        return "DefaultPersistentBus{dbBackedQId='" + this.dbBackedQId + "'}";
    }

    /**
     * Posts the event to the in-memory event bus delegate while timing the call with the dispatch timer.
     * The timer context is stopped whether or not the delegate throws.
     *
     * @param event event to hand to the delegate
     * @throws EventBusException propagated from the delegate's {@code postWithException}
     */
    public void dispatchBusEventWithMetrics(QueueEvent event) throws EventBusException {
        final Timer.Context timing = this.dispatchTimer.time();
        try {
            this.eventBusDelegate.postWithException(event);
        }
        finally {
            // Always record the elapsed time, even on failure.
            timing.stop();
        }
    }

    /**
     * Fetches ready entries through the given DAO (with profiling) and converts them to events with metadata.
     *
     * @param transactionalDao DAO used to run the query
     * @param maxCreatedDate   creation-date bound, used when {@code searchKey1} is null
     * @param searchKey1       first search key, or null to search by {@code searchKey2} only
     * @param searchKey2       second search key
     */
    private <T extends BusEvent> Iterable<BusEventWithMetadata<T>> getAvailableBusEventsForSearchKeysInternal(PersistentBusSqlDao transactionalDao, @Nullable DateTime maxCreatedDate, @Nullable Long searchKey1, Long searchKey2) {
        return this.toBusEventWithMetadata(this.getReadyQueueEntriesForSearchKeysWithProfiling(transactionalDao, maxCreatedDate, searchKey1, searchKey2));
    }

    /**
     * Fetches ready or in-processing entries through the given DAO (with profiling) and converts them
     * to events with metadata.
     *
     * @param transactionalDao DAO used to run the query
     * @param maxCreatedDate   creation-date bound, used when {@code searchKey1} is null
     * @param searchKey1       first search key, or null to search by {@code searchKey2} only
     * @param searchKey2       second search key
     */
    private <T extends BusEvent> Iterable<BusEventWithMetadata<T>> getAvailableOrInProcessingBusEventsForSearchKeysInternal(PersistentBusSqlDao transactionalDao, @Nullable DateTime maxCreatedDate, @Nullable Long searchKey1, Long searchKey2) {
        return this.toBusEventWithMetadata(this.getReadyOrInProcessingQueueEntriesForSearchKeysWithProfiling(transactionalDao, maxCreatedDate, searchKey1, searchKey2));
    }

    /**
     * Fetches historical entries through the given DAO (with profiling) and converts them to events
     * with metadata.
     *
     * @param transactionalDao DAO used to run the query
     * @param minCreatedDate   creation-date bound, used when {@code searchKey1} is null
     * @param searchKey1       first search key, or null to search by {@code searchKey2} only
     * @param searchKey2       second search key
     */
    private <T extends BusEvent> Iterable<BusEventWithMetadata<T>> getHistoricalBusEventsForSearchKeysInternal(PersistentBusSqlDao transactionalDao, @Nullable DateTime minCreatedDate, @Nullable Long searchKey1, Long searchKey2) {
        return this.toBusEventWithMetadata(this.getHistoricalQueueEntriesForSearchKeysWithProfiling(transactionalDao, minCreatedDate, searchKey1, searchKey2));
    }

    /**
     * Returns, under the DAO profiling feature, a lazy {@link Iterable} over the ready entries of the bus table.
     * The DAO is only queried when {@code iterator()} is called on the result, and again on each such call.
     * With a non-null {@code searchKey1} the query uses both search keys; otherwise it uses
     * {@code maxCreatedDate} and {@code searchKey2}.
     */
    private Iterable<BusEventModelDao> getReadyQueueEntriesForSearchKeysWithProfiling(final PersistentBusSqlDao transactionalDao, final @Nullable DateTime maxCreatedDate, final @Nullable Long searchKey1, final Long searchKey2) {
        final Profiling.WithProfilingCallback<Iterable<BusEventModelDao>, RuntimeException> lazyQuery = new Profiling.WithProfilingCallback<Iterable<BusEventModelDao>, RuntimeException>(){

            public Iterable<BusEventModelDao> execute() throws RuntimeException {
                return new Iterable<BusEventModelDao>(){

                    @Override
                    public Iterator<BusEventModelDao> iterator() {
                        if (searchKey1 == null) {
                            return transactionalDao.getReadyQueueEntriesForSearchKey2(maxCreatedDate, searchKey2, DefaultPersistentBus.this.config.getTableName());
                        }
                        return transactionalDao.getReadyQueueEntriesForSearchKeys(searchKey1, searchKey2, DefaultPersistentBus.this.config.getTableName());
                    }
                };
            }
        };
        return this.prof.executeWithProfiling(ProfilingFeature.ProfilingFeatureType.DAO, "DAO:PersistentBusSqlDao:getReadyQueueEntriesForSearchKeys", lazyQuery);
    }

    /**
     * Returns, under the DAO profiling feature, a lazy {@link Iterable} over the ready or in-processing
     * entries of the bus table. The DAO is only queried when {@code iterator()} is called on the result,
     * and again on each such call. With a non-null {@code searchKey1} the query uses both search keys;
     * otherwise it uses {@code maxCreatedDate} and {@code searchKey2}.
     */
    private Iterable<BusEventModelDao> getReadyOrInProcessingQueueEntriesForSearchKeysWithProfiling(final PersistentBusSqlDao transactionalDao, final @Nullable DateTime maxCreatedDate, final @Nullable Long searchKey1, final Long searchKey2) {
        final Profiling.WithProfilingCallback<Iterable<BusEventModelDao>, RuntimeException> lazyQuery = new Profiling.WithProfilingCallback<Iterable<BusEventModelDao>, RuntimeException>(){

            public Iterable<BusEventModelDao> execute() throws RuntimeException {
                return new Iterable<BusEventModelDao>(){

                    @Override
                    public Iterator<BusEventModelDao> iterator() {
                        if (searchKey1 == null) {
                            return transactionalDao.getReadyOrInProcessingQueueEntriesForSearchKey2(maxCreatedDate, searchKey2, DefaultPersistentBus.this.config.getTableName());
                        }
                        return transactionalDao.getReadyOrInProcessingQueueEntriesForSearchKeys(searchKey1, searchKey2, DefaultPersistentBus.this.config.getTableName());
                    }
                };
            }
        };
        return this.prof.executeWithProfiling(ProfilingFeature.ProfilingFeatureType.DAO, "DAO:PersistentBusSqlDao:getReadyOrInProcessingQueueEntriesForSearchKeys", lazyQuery);
    }

    /**
     * Returns, under the DAO profiling feature, a lazy {@link Iterable} over entries of the history table.
     * The DAO is only queried when {@code iterator()} is called on the result, and again on each such call.
     * With a non-null {@code searchKey1} the query uses both search keys; otherwise it uses
     * {@code minCreatedDate} and {@code searchKey2}.
     */
    private Iterable<BusEventModelDao> getHistoricalQueueEntriesForSearchKeysWithProfiling(final PersistentBusSqlDao transactionalDao, final @Nullable DateTime minCreatedDate, final @Nullable Long searchKey1, final Long searchKey2) {
        final Profiling.WithProfilingCallback<Iterable<BusEventModelDao>, RuntimeException> lazyQuery = new Profiling.WithProfilingCallback<Iterable<BusEventModelDao>, RuntimeException>(){

            public Iterable<BusEventModelDao> execute() throws RuntimeException {
                return new Iterable<BusEventModelDao>(){

                    @Override
                    public Iterator<BusEventModelDao> iterator() {
                        if (searchKey1 == null) {
                            return transactionalDao.getHistoricalQueueEntriesForSearchKey2(minCreatedDate, searchKey2, DefaultPersistentBus.this.config.getHistoryTableName());
                        }
                        return transactionalDao.getHistoricalQueueEntriesForSearchKeys(searchKey1, searchKey2, DefaultPersistentBus.this.config.getHistoryTableName());
                    }
                };
            }
        };
        return this.prof.executeWithProfiling(ProfilingFeature.ProfilingFeatureType.DAO, "DAO:PersistentBusSqlDao:getHistoricalQueueEntriesForSearchKeys", lazyQuery);
    }

    /**
     * Wraps the entries in a transformed view (via {@link Iterables#transform}) that converts each
     * entry to an event with metadata.
     *
     * @param entries queue entries to convert
     * @return a view over {@code entries} yielding events with metadata
     */
    private <T extends BusEvent> Iterable<BusEventWithMetadata<T>> toBusEventWithMetadata(Iterable<BusEventModelDao> entries) {
        final Function<BusEventModelDao, BusEventWithMetadata<T>> entryToEvent = new Function<BusEventModelDao, BusEventWithMetadata<T>>(){

            public BusEventWithMetadata<T> apply(BusEventModelDao input) {
                return DefaultPersistentBus.this.<T>toBusEventWithMetadata(input);
            }
        };
        return Iterables.transform(entries, entryToEvent);
    }

    /**
     * Deserializes the event stored in a queue entry and pairs it with the entry's record id,
     * user token, created date and search keys.
     * <p>
     * The deserialized event is cast to the caller-selected type {@code T}. The cast is unchecked:
     * a mismatch between {@code T} and the stored event class is not detected here and only shows
     * up when the caller uses the event as a {@code T}.
     *
     * @param entry queue entry holding the serialized event
     * @return the deserialized event together with the entry's metadata
     */
    private <T extends BusEvent> BusEventWithMetadata<T> toBusEventWithMetadata(BusEventModelDao entry) {
        // The declared return type is BusEventWithMetadata<T>; building a BusEventWithMetadata<BusEvent>
        // does not type-check because generic types are invariant.
        @SuppressWarnings("unchecked")
        final T event = (T) CallableCallbackBase.deserializeEvent(entry, this.objectMapper);
        return new BusEventWithMetadata<T>(entry.getRecordId(), entry.getUserToken(), entry.getCreatedDate(), entry.getSearchKey1(), entry.getSearchKey2(), event);
    }

    /**
     * @return the DB-backed queue this bus reads from and writes to
     */
    public DBBackedQueue<BusEventModelDao> getDao() {
        return dao;
    }

    /**
     * @return the clock supplied at construction time
     */
    public Clock getClock() {
        return clock;
    }

    /**
     * @return the bus configuration supplied at construction time
     */
    public PersistentBusConfig getConfig() {
        return config;
    }

    /**
     * Concrete, private subclass of {@link EventBusThatThrowsException} used as the in-memory
     * delegate of the bus; it adds no behavior beyond forwarding the bus name to the superclass.
     */
    private static final class EventBusDelegate
    extends EventBusThatThrowsException {
        /**
         * @param busName name passed to the superclass constructor
         */
        public EventBusDelegate(final String busName) {
            super(busName);
        }
    }
}

