/*
 * Decompiled with CFR 0.152.
 */
package org.killbill.queue.dispatching;

import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.killbill.commons.concurrent.DynamicThreadPoolExecutorWithLoggingOnExceptions;
import org.killbill.queue.api.QueueEvent;
import org.killbill.queue.dao.EventEntryModelDao;
import org.killbill.queue.dispatching.CallableCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class Dispatcher<M extends EventEntryModelDao> {
    private static final Logger log = LoggerFactory.getLogger(Dispatcher.class);
    private final int corePoolSize;
    private final int maximumPoolSize;
    private final long keepAliveTime;
    private final TimeUnit keepAliveTimeUnit;
    private final BlockingQueue<Runnable> workQueue;
    private final ThreadFactory threadFactory;
    private final RejectedExecutionHandler rejectionHandler;
    private ExecutorService executor;

    public Dispatcher(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit keepAliveTimeUnit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler rejectionHandler) {
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.keepAliveTime = keepAliveTime;
        this.keepAliveTimeUnit = keepAliveTimeUnit;
        this.workQueue = workQueue;
        this.threadFactory = threadFactory;
        this.rejectionHandler = rejectionHandler;
    }

    public void start() {
        this.executor = new DynamicThreadPoolExecutorWithLoggingOnExceptions(this.corePoolSize, this.maximumPoolSize, this.keepAliveTime, this.keepAliveTimeUnit, this.workQueue, this.threadFactory, this.rejectionHandler);
    }

    public void stop() {
        this.executor.shutdown();
        try {
            this.executor.awaitTermination(5L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            log.info(String.format("Stop sequence has been interrupted", new Object[0]));
        }
    }

    public <E extends QueueEvent> void dispatch(M modelDao, CallableCallback<E, M> callback) {
        log.debug("Dispatching entry: recordId={} className={} json={}", new Object[]{modelDao.getRecordId(), modelDao.getClassName(), modelDao.getEventJson()});
        CallableQueue<E, M> entry = new CallableQueue<E, M>(modelDao, callback);
        this.executor.submit(entry);
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    public static class CallableQueue<E extends QueueEvent, M extends EventEntryModelDao>
    implements Callable<E> {
        private final M entry;
        private final CallableCallback<E, M> callback;

        public CallableQueue(M entry, CallableCallback<E, M> callback) {
            this.entry = entry;
            this.callback = callback;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public E call() throws Exception {
            E event = this.callback.deserialize(this.entry);
            if (event != null) {
                Throwable lastException = null;
                long errorCount = this.entry.getErrorCount();
                try {
                    this.callback.dispatch(event, this.entry);
                }
                catch (Exception e) {
                    lastException = e.getCause() != null && e.getCause() instanceof InvocationTargetException ? e.getCause().getCause() : e;
                    lastException = e;
                    ++errorCount;
                }
                finally {
                    this.callback.updateErrorCountOrMoveToHistory(event, this.entry, errorCount, lastException);
                }
            }
            return event;
        }
    }
}

