/*
 * Decompiled with CFR 0.152.
 */
package org.killbill.queue.dispatching;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.lang.reflect.InvocationTargetException;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.killbill.clock.Clock;
import org.killbill.commons.concurrent.DynamicThreadPoolExecutorWithLoggingOnExceptions;
import org.killbill.queue.DefaultQueueLifecycle;
import org.killbill.queue.api.PersistentQueueConfig;
import org.killbill.queue.api.PersistentQueueEntryLifecycleState;
import org.killbill.queue.api.QueueEvent;
import org.killbill.queue.dao.EventEntryModelDao;
import org.killbill.queue.dispatching.CallableCallback;
import org.killbill.queue.retry.RetryableInternalException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/**
 * Hands queue entries to a thread pool for processing and reports the outcome of each entry
 * to the parent queue lifecycle.
 *
 * <p>The executor is created by {@link #start()}; {@link #dispatch(EventEntryModelDao)} and
 * {@link #stopDispatcher()} dereference it directly and therefore require {@code start()} to
 * have been called first.
 *
 * @param <E> the deserialized event type
 * @param <M> the persisted entry type
 */
public class Dispatcher<E extends QueueEvent, M extends EventEntryModelDao> {
    private static final Logger log = LoggerFactory.getLogger(Dispatcher.class);
    private final int corePoolSize;
    private final int maximumPoolSize;
    private final long keepAliveTime;
    private final TimeUnit keepAliveTimeUnit;
    private final long shutdownTimeout;
    private final TimeUnit shutdownTimeUnit;
    private final BlockingQueue<Runnable> workQueue;
    private final ThreadFactory threadFactory;
    private final RejectedExecutionHandler rejectionHandler;
    private final int maxFailureRetries;
    private final CallableCallback<E, M> handlerCallback;
    private final DefaultQueueLifecycle parentLifeCycle;
    private final Clock clock;
    private ExecutorService handlerExecutor;

    /**
     * Stores the thread pool settings and collaborators; no threads are created until
     * {@link #start()} is called.
     *
     * @param corePoolSize      core size of the handler thread pool
     * @param config            queue configuration; supplies the maximum pool size
     *                          ({@code geMaxDispatchThreads()}) and the retry limit
     *                          ({@code getMaxFailureRetries()})
     * @param keepAliveTime     keep-alive time passed to the thread pool
     * @param keepAliveTimeUnit unit of {@code keepAliveTime}
     * @param shutdownTimeout   how long {@link #stopDispatcher()} waits for termination
     * @param shutdownTimeUnit  unit of {@code shutdownTimeout}
     * @param workQueue         work queue passed to the thread pool
     * @param threadFactory     thread factory passed to the thread pool
     * @param rejectionHandler  rejection handler passed to the thread pool
     * @param clock             clock used to timestamp the rebuilt entries
     * @param handlerCallback   callback used to deserialize, dispatch and rebuild entries
     * @param parentLifeCycle   lifecycle notified of each outcome; when {@code null} no
     *                          outcome is reported
     */
    public Dispatcher(int corePoolSize, PersistentQueueConfig config, long keepAliveTime, TimeUnit keepAliveTimeUnit, long shutdownTimeout, TimeUnit shutdownTimeUnit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler rejectionHandler, Clock clock, CallableCallback<E, M> handlerCallback, DefaultQueueLifecycle parentLifeCycle) {
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = config.geMaxDispatchThreads();
        this.keepAliveTime = keepAliveTime;
        this.keepAliveTimeUnit = keepAliveTimeUnit;
        this.shutdownTimeout = shutdownTimeout;
        this.shutdownTimeUnit = shutdownTimeUnit;
        this.workQueue = workQueue;
        this.threadFactory = threadFactory;
        this.rejectionHandler = rejectionHandler;
        this.clock = clock;
        this.maxFailureRetries = config.getMaxFailureRetries();
        this.handlerCallback = handlerCallback;
        this.parentLifeCycle = parentLifeCycle;
    }

    /**
     * Creates the handler thread pool from the settings given at construction time.
     */
    public void start() {
        this.handlerExecutor = new DynamicThreadPoolExecutorWithLoggingOnExceptions(this.corePoolSize, this.maximumPoolSize, this.keepAliveTime, this.keepAliveTimeUnit, this.workQueue, this.threadFactory, this.rejectionHandler);
    }

    /**
     * Shuts the handler thread pool down and waits up to the configured shutdown timeout for
     * it to terminate.
     *
     * @return {@code true} if the pool terminated within the timeout; {@code false} if the
     *         timeout elapsed or the waiting thread was interrupted
     */
    public boolean stopDispatcher() {
        this.handlerExecutor.shutdown();
        try {
            return this.handlerExecutor.awaitTermination(this.shutdownTimeout, this.shutdownTimeUnit);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            log.info("Stop sequence, handlerExecutor has been interrupted");
            return false;
        }
    }

    /**
     * Wraps the entry in a {@link CallableQueueHandler} and submits it to the handler thread
     * pool. The returned future is deliberately ignored.
     *
     * @param modelDao the entry to process
     */
    @SuppressFBWarnings(value={"RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"})
    public void dispatch(M modelDao) {
        log.debug("Dispatching entry {}", modelDao);
        CallableQueueHandler<E, M> entry = new CallableQueueHandler<E, M>(modelDao, this.handlerCallback, this.parentLifeCycle, this.clock, this.maxFailureRetries);
        this.handlerExecutor.submit(entry);
    }

    /**
     * Processes a single entry: deserializes it, dispatches the resulting event through the
     * callback and reports the outcome to the parent lifecycle.
     *
     * @param <E> the deserialized event type
     * @param <M> the persisted entry type
     */
    public static class CallableQueueHandler<E extends QueueEvent, M extends EventEntryModelDao>
    implements Callable<E> {
        private static final String MDC_KB_USER_TOKEN = "kb.userToken";
        private static final Logger log = LoggerFactory.getLogger(CallableQueueHandler.class);
        private final M entry;
        private final CallableCallback<E, M> callback;
        private final DefaultQueueLifecycle parentLifeCycle;
        private final int maxFailureRetries;
        private final Clock clock;

        /**
         * @param entry             the entry to process
         * @param callback          callback used to deserialize, dispatch and rebuild the entry
         * @param parentLifeCycle   lifecycle notified of the outcome; may be {@code null}, in
         *                          which case no outcome is reported
         * @param clock             clock used to timestamp the rebuilt entry
         * @param maxFailureRetries highest error count for which a failed entry is still retried
         */
        public CallableQueueHandler(M entry, CallableCallback<E, M> callback, DefaultQueueLifecycle parentLifeCycle, Clock clock, int maxFailureRetries) {
            this.entry = entry;
            this.callback = callback;
            this.parentLifeCycle = parentLifeCycle;
            this.clock = clock;
            this.maxFailureRetries = maxFailureRetries;
        }

        /**
         * Deserializes the entry and, if that yields an event, dispatches it and reports the
         * outcome. The entry's user token is placed in the MDC under {@code kb.userToken} for
         * the duration of the call and removed afterwards.
         *
         * @return the deserialized event, or {@code null} if deserialization returned
         *         {@code null} (in which case nothing is dispatched or reported)
         * @throws Exception if deserialization or outcome reporting throws; exceptions thrown
         *                   by the dispatch itself are caught and turned into an outcome
         */
        @Override
        public E call() throws Exception {
            try {
                final UUID userToken = this.entry.getUserToken();
                MDC.put(MDC_KB_USER_TOKEN, userToken != null ? userToken.toString() : null);
                log.debug("Starting processing entry {}", this.entry);
                final E event = this.callback.deserialize(this.entry);
                if (event != null) {
                    Throwable lastException = null;
                    long errorCount = this.entry.getErrorCount();
                    try {
                        this.callback.dispatch(event, this.entry);
                    }
                    catch (Exception e) {
                        lastException = unwrapDispatchFailure(e);
                        ++errorCount;
                    }
                    finally {
                        // NOTE(review): only Exception is caught above, so an Error thrown by the
                        // handler reaches this point with lastException == null and the entry is
                        // reported as PROCESSED before the Error propagates - confirm intended.
                        if (this.parentLifeCycle != null) {
                            reportOutcome(lastException, errorCount);
                        }
                    }
                }
                return event;
            }
            finally {
                MDC.remove(MDC_KB_USER_TOKEN);
            }
        }

        /**
         * Picks the throwable that best describes a dispatch failure: the target of a wrapped
         * {@link InvocationTargetException}, a wrapped {@link RetryableInternalException}, or
         * otherwise the caught exception itself. Never returns {@code null}, so a failure can
         * never be mistaken for a success.
         */
        private static Throwable unwrapDispatchFailure(final Exception e) {
            final Throwable cause = e.getCause();
            if (cause instanceof InvocationTargetException) {
                final Throwable target = cause.getCause();
                // An InvocationTargetException may carry no target; fall back to the caught
                // exception rather than returning null (which would read as "no failure").
                return target != null ? target : e;
            }
            if (cause instanceof RetryableInternalException) {
                return cause;
            }
            return e;
        }

        /**
         * Rebuilds the entry in the state matching the dispatch outcome and hands it to the
         * parent lifecycle. Must only be called when {@code parentLifeCycle} is non-null.
         *
         * @param failure    the dispatch failure, or {@code null} if the dispatch succeeded
         * @param errorCount the entry's error count including the current failure, if any
         */
        private void reportOutcome(final Throwable failure, final long errorCount) {
            if (failure == null) {
                final M processed = this.callback.buildEntry(this.entry, this.clock.getUTCNow(), PersistentQueueEntryLifecycleState.PROCESSED, this.entry.getErrorCount());
                this.parentLifeCycle.dispatchCompletedOrFailedEvents(processed);
                log.debug("Done handling notification {}, key = {}", this.entry.getRecordId(), this.entry.getEventJson());
            } else if (failure instanceof RetryableInternalException) {
                // Marked FAILED here with the entry's original error count; no retry is
                // scheduled by this class for this exception type.
                final M failed = this.callback.buildEntry(this.entry, this.clock.getUTCNow(), PersistentQueueEntryLifecycleState.FAILED, this.entry.getErrorCount());
                this.parentLifeCycle.dispatchCompletedOrFailedEvents(failed);
            } else if (errorCount <= this.maxFailureRetries) {
                log.info("Dispatch error, will attempt a retry ", failure);
                final M retried = this.callback.buildEntry(this.entry, this.clock.getUTCNow(), PersistentQueueEntryLifecycleState.AVAILABLE, errorCount);
                this.parentLifeCycle.dispatchRetriedEvents(retried);
            } else {
                log.error("Fatal NotificationQ dispatch error, data corruption...", failure);
                final M failed = this.callback.buildEntry(this.entry, this.clock.getUTCNow(), PersistentQueueEntryLifecycleState.FAILED, this.entry.getErrorCount());
                this.parentLifeCycle.dispatchCompletedOrFailedEvents(failed);
            }
        }
    }
}

