/*
 * Decompiled with CFR 0.152.
 */
package org.killbill.queue;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.killbill.commons.concurrent.Executors;
import org.killbill.commons.metrics.api.Gauge;
import org.killbill.commons.metrics.api.Histogram;
import org.killbill.commons.metrics.api.MetricRegistry;
import org.killbill.commons.metrics.api.Timer;
import org.killbill.queue.QueueObjectMapper;
import org.killbill.queue.api.PersistentQueueConfig;
import org.killbill.queue.api.QueueLifecycle;
import org.killbill.queue.dao.EventEntryModelDao;
import org.skife.jdbi.v2.exceptions.DBIException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class DefaultQueueLifecycle
implements QueueLifecycle {
    /** Constant label "Queue". NOTE(review): not referenced in this file - presumably used by subclasses or callers; confirm. */
    public static final String QUEUE_NAME = "Queue";
    private static final Logger log = LoggerFactory.getLogger(DefaultQueueLifecycle.class);
    /** Number of nanoseconds in one millisecond. */
    private static final long ONE_MILLION = 1000000L;
    /** 100 ms; the same value bounds each individual sleep of the dispatcher and each blocking poll of the completion worker. */
    private static final long MAX_SLEEP_TIME_MS = 100L;
    /** 15; the same value is the maximum number of completed/failed entries a completion worker drains in one batch. */
    private static final int MAX_COMPLETED_ENTRIES = 15;
    /** Name of this queue; used as a prefix in log messages and as a component of every metric name. */
    protected final String svcQName;
    /** Reader obtained from the ObjectMapper supplied at construction. */
    protected final ObjectReader objectReader;
    /** Writer obtained from the ObjectMapper supplied at construction. */
    protected final ObjectWriter objectWriter;
    /** Queue configuration: thread counts, table name, polling sleep time, shutdown timeout and queue mode are read from it. */
    protected final PersistentQueueConfig config;
    /** Entries handed over through dispatchCompletedOrFailedEvents(); drained by the completion workers. */
    private final LinkedBlockingQueue<EventEntryModelDao> completedOrFailedEvents;
    /** Entries handed over through dispatchRetriedEvents(); drained by the completion workers. */
    private final LinkedBlockingQueue<EventEntryModelDao> retriedEvents;
    /** Timer updated once per dispatch iteration. */
    private final Timer dispatchTime;
    /** Timer updated for each completion iteration that handled at least one entry; time spent in the blocking poll is excluded. */
    private final Timer completeTime;
    /** Histogram of the entry count reported by each doDispatchEvents() call. */
    private final Histogram dispatchedEntries;
    /** Histogram of the number of completed plus retried entries handled per completion iteration. */
    private final Histogram completeEntries;
    /** True when the configured mode is STICKY_EVENTS; the dispatcher then skips its sleep and times dispatch with the value reported by doDispatchEvents(). */
    private final boolean isStickyEvent;
    /** Loop flag of the dispatcher workers; set by startQueue(), cleared by stopLifecycleDispatcher(). */
    private volatile boolean isDispatchingEvents;
    /** Loop flag of the completion workers; set by startQueue(), cleared by stopLifecycleCompletion(). */
    private volatile boolean isCompletingEvents;
    /** Pool running the DispatcherRunnable workers; created in startQueue(). */
    private ExecutorService lifecycleDispatcherExecutor;
    /** Pool running the CompletionRunnable workers; created in startQueue(). */
    private ExecutorService lifecycleCompletionExecutor;
    /** Gauge reporting the current size of completedOrFailedEvents. */
    protected final Gauge<Integer> completedOrFailedEventsGauge;

    /**
     * Creates a lifecycle that uses the ObjectMapper returned by {@code QueueObjectMapper.get()}
     * for its reader and writer.
     *
     * @param svcQName       queue name, used in log messages and metric names
     * @param config         queue configuration
     * @param metricRegistry registry in which the timers, histograms and gauge are created
     */
    public DefaultQueueLifecycle(String svcQName, PersistentQueueConfig config, MetricRegistry metricRegistry) {
        this(svcQName, config, metricRegistry, QueueObjectMapper.get());
    }

    /**
     * Initializes state, the two hand-over queues and all metrics. No thread is started here;
     * both loop flags start as {@code false} until {@link #startQueue()} is called.
     *
     * <p>Metric names have the form {@code <this class name>.<svcQName>.<metric>}.
     *
     * @param svcQName       queue name, used in log messages and metric names
     * @param config         queue configuration; its queue mode is read once here
     * @param metricRegistry registry in which the timers, histograms and gauge are created
     * @param objectMapper   mapper from which the reader and writer are obtained
     */
    private DefaultQueueLifecycle(String svcQName, PersistentQueueConfig config, MetricRegistry metricRegistry, ObjectMapper objectMapper) {
        this.svcQName = svcQName;
        this.config = config;
        this.isDispatchingEvents = false;
        this.isCompletingEvents = false;
        this.objectReader = objectMapper.reader();
        this.objectWriter = objectMapper.writer();
        // Explicit type arguments instead of raw types, so the assignments are checked by the compiler.
        this.completedOrFailedEvents = new LinkedBlockingQueue<EventEntryModelDao>();
        this.retriedEvents = new LinkedBlockingQueue<EventEntryModelDao>();
        this.isStickyEvent = config.getPersistentQueueMode() == PersistentQueueConfig.PersistentQueueMode.STICKY_EVENTS;

        // Common "<class>.<queue>" prefix shared by every metric of this instance.
        final String metricPrefix = String.format("%s.%s", DefaultQueueLifecycle.class.getName(), svcQName);
        this.dispatchTime = metricRegistry.timer(metricPrefix + ".dispatchTime");
        this.completeTime = metricRegistry.timer(metricPrefix + ".completeTime");
        this.dispatchedEntries = metricRegistry.histogram(metricPrefix + ".dispatchedEntries");
        this.completeEntries = metricRegistry.histogram(metricPrefix + ".completeEntries");
        this.completedOrFailedEventsGauge = metricRegistry.gauge(metricPrefix + ".completedOrFailedEvents.size", new Gauge<Integer>() {
            @Override
            public Integer getValue() {
                return DefaultQueueLifecycle.this.completedOrFailedEvents.size();
            }
        });
    }

    /**
     * Creates the dispatcher and completion thread pools and submits one worker per configured
     * thread to each of them. Completion workers are submitted before dispatcher workers.
     *
     * @return always {@code true}
     */
    @Override
    public boolean startQueue() {
        lifecycleDispatcherExecutor = Executors.newFixedThreadPool(config.geNbLifecycleDispatchThreads(), config.getTableName() + "-lifecycle-dispatcher-th");
        lifecycleCompletionExecutor = Executors.newFixedThreadPool(config.geNbLifecycleCompleteThreads(), config.getTableName() + "-lifecycle-completion-th");

        log.info("{}: Starting...", svcQName);

        // The loop flag is raised before the workers are submitted so they do not exit immediately.
        isCompletingEvents = true;
        int completionWorkers = 0;
        while (completionWorkers < config.geNbLifecycleCompleteThreads()) {
            lifecycleCompletionExecutor.execute(new CompletionRunnable());
            completionWorkers++;
        }

        isDispatchingEvents = true;
        int dispatchWorkers = 0;
        while (dispatchWorkers < config.geNbLifecycleDispatchThreads()) {
            lifecycleDispatcherExecutor.execute(new DispatcherRunnable());
            dispatchWorkers++;
        }
        return true;
    }

    /**
     * Clears the dispatcher loop flag, shuts the dispatcher pool down and waits for its workers
     * to finish, for at most the configured shutdown timeout.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt status is restored
     * before returning so that the caller can still observe the interruption.
     *
     * @return {@code true} if the pool terminated within the timeout; {@code false} on timeout
     *         or if the wait was interrupted
     */
    protected boolean stopLifecycleDispatcher() {
        this.isDispatchingEvents = false;
        this.lifecycleDispatcherExecutor.shutdown();
        try {
            return this.lifecycleDispatcherExecutor.awaitTermination(this.config.getShutdownTimeout().getPeriod(), this.config.getShutdownTimeout().getUnit());
        } catch (InterruptedException e) {
            log.info("{}: Lifecycle dispatcher stop sequence has been interrupted", this.svcQName);
            // awaitTermination cleared the flag when it threw; put it back for the caller.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Clears the completion loop flag, shuts the completion pool down and waits for its workers
     * to finish, for at most the configured shutdown timeout. Whatever the outcome, a warning is
     * logged if entries are still sitting in the completed/failed or retried queues.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt status is restored
     * before returning so that the caller can still observe the interruption.
     *
     * @return {@code true} if the pool terminated within the timeout; {@code false} on timeout
     *         or if the wait was interrupted
     */
    protected boolean stopLifecycleCompletion() {
        this.isCompletingEvents = false;
        this.lifecycleCompletionExecutor.shutdown();
        try {
            return this.lifecycleCompletionExecutor.awaitTermination(this.config.getShutdownTimeout().getPeriod(), this.config.getShutdownTimeout().getUnit());
        } catch (InterruptedException e) {
            log.info("{}: Lifecycle completion stop sequence has been interrupted", this.svcQName);
            // awaitTermination cleared the flag when it threw; put it back for the caller.
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Report entries that were handed over but never picked up by a completion worker.
            final int remainingCompleted = this.completedOrFailedEvents.size();
            final int remainingRetried = this.retriedEvents.size();
            if (remainingCompleted > 0 || remainingRetried > 0) {
                log.warn("{}: Stopped queue with {} event/notifications non completed", this.svcQName, remainingCompleted + remainingRetried);
            }
        }
    }

    /**
     * Hands an entry over to the completion workers by adding it to the completed/failed queue.
     * The entry is later passed, in a batch, to {@link #doProcessCompletedEvents(Iterable)}.
     *
     * @param event the entry to enqueue
     */
    public <M extends EventEntryModelDao> void dispatchCompletedOrFailedEvents(M event) {
        this.completedOrFailedEvents.add(event);
    }

    /**
     * Hands an entry over to the completion workers by adding it to the retried queue.
     * The entry is later passed, in a batch, to {@link #doProcessRetriedEvents(Iterable)}.
     *
     * @param event the entry to enqueue
     */
    public <M extends EventEntryModelDao> void dispatchRetriedEvents(M event) {
        this.retriedEvents.add(event);
    }

    /**
     * Performs one dispatch pass. Called repeatedly by each dispatcher worker while the queue is
     * dispatching. A {@code RuntimeException} thrown from here is logged by the worker loop, which
     * then continues with the next pass.
     *
     * @return the number of entries handled, recorded in the dispatched-entries histogram, and a
     *         duration in nanoseconds, which feeds the dispatch timer only when the queue mode
     *         is STICKY_EVENTS
     */
    public abstract DispatchResultMetrics doDispatchEvents();

    /**
     * Processes a batch of entries taken from the completed/failed queue. Called by a completion
     * worker, and only with a non-empty batch.
     *
     * @param var1 the entries drained from the completed/failed queue for this iteration
     */
    public abstract void doProcessCompletedEvents(Iterable<? extends EventEntryModelDao> var1);

    /**
     * Processes a batch of entries taken from the retried queue. Called by a completion worker
     * after it has handled the completed/failed entries of the same iteration.
     *
     * @param var1 the entries drained from the retried queue for this iteration
     */
    public abstract void doProcessRetriedEvents(Iterable<? extends EventEntryModelDao> var1);

    /**
     * @return the reader obtained from the ObjectMapper at construction
     */
    public ObjectReader getObjectReader() {
        return this.objectReader;
    }

    /**
     * @return the writer obtained from the ObjectMapper at construction
     */
    public ObjectWriter getObjectWriter() {
        return this.objectWriter;
    }

    /**
     * Runs one iteration of a worker loop, logging (at WARN) and swallowing any
     * {@code DBIException} or other {@code RuntimeException} so that the loop keeps going.
     *
     * @param cb the iteration body to run
     * @throws InterruptedException propagated untouched from the callback
     */
    private void withHandlingRuntimeException(RunnableRawCallback cb) throws InterruptedException {
        try {
            cb.callback();
        } catch (DBIException dbFailure) {
            // The exception is passed as the trailing argument so the logger records it as the cause.
            log.warn("{}: Thread {} got DBIException exception: ", this.svcQName, Thread.currentThread().getName(), dbFailure);
        } catch (RuntimeException failure) {
            log.warn("{}: Thread {} got Runtime exception: ", this.svcQName, Thread.currentThread().getName(), failure);
        }
    }

    private static interface RunnableRawCallback {
        public void callback() throws InterruptedException;
    }

    private final class DispatcherRunnable
    implements Runnable {
        private DispatcherRunnable() {
        }

        /*
         * Loose catch block
         */
        @Override
        public void run() {
            try {
                log.info("{}: Dispatching thread {} [{}] starting ", new Object[]{DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName(), Thread.currentThread().getId()});
                while (DefaultQueueLifecycle.this.isDispatchingEvents) {
                    DefaultQueueLifecycle.this.withHandlingRuntimeException(new RunnableRawCallback(){

                        @Override
                        public void callback() throws InterruptedException {
                            long beforeLoop = System.nanoTime();
                            DispatcherRunnable.this.dispatchEvents();
                            long afterLoop = System.nanoTime();
                            DispatcherRunnable.this.sleepSporadically((afterLoop - beforeLoop) / 1000000L);
                        }
                    });
                }
            }
            catch (InterruptedException e) {
                log.info("{}: Dispatching thread {} [{}] got interrupted, exiting... ", new Object[]{DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName(), Thread.currentThread().getId()});
                log.info("{}: Dispatching thread {} [{}] has exited", new Object[]{DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName(), Thread.currentThread().getId()});
            }
            catch (Error e2) {
                log.error("{}: Dispatching thread {} [{}] got an exception, exiting... ", new Object[]{DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName(), Thread.currentThread().getId(), e2});
                {
                    catch (Throwable throwable) {
                        log.info("{}: Dispatching thread {} [{}] has exited", new Object[]{DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName(), Thread.currentThread().getId()});
                        throw throwable;
                    }
                }
                log.info("{}: Dispatching thread {} [{}] has exited", new Object[]{DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName(), Thread.currentThread().getId()});
            }
            log.info("{}: Dispatching thread {} [{}] has exited", new Object[]{DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName(), Thread.currentThread().getId()});
        }

        private void dispatchEvents() {
            long ini = System.nanoTime();
            DispatchResultMetrics metricsResult = DefaultQueueLifecycle.this.doDispatchEvents();
            DefaultQueueLifecycle.this.dispatchedEntries.update((long)metricsResult.getNbEntries());
            if (DefaultQueueLifecycle.this.isStickyEvent) {
                DefaultQueueLifecycle.this.dispatchTime.update(metricsResult.getTimeNanoSec(), TimeUnit.NANOSECONDS);
            } else {
                DefaultQueueLifecycle.this.dispatchTime.update(System.nanoTime() - ini, TimeUnit.NANOSECONDS);
            }
        }

        private void sleepSporadically(long loopTimeMsec) throws InterruptedException {
            long curSleepTime;
            if (DefaultQueueLifecycle.this.isStickyEvent) {
                return;
            }
            for (long remainingSleepTime = DefaultQueueLifecycle.this.config.getPollingSleepTimeMs() - loopTimeMsec; remainingSleepTime > 0L; remainingSleepTime -= curSleepTime) {
                curSleepTime = remainingSleepTime > 100L ? 100L : remainingSleepTime;
                Thread.sleep(curSleepTime);
            }
        }
    }

    private final class CompletionRunnable
    implements Runnable {
        private CompletionRunnable() {
        }

        /*
         * Loose catch block
         */
        @Override
        public void run() {
            try {
                log.info("{}: Completion thread {} [{}] starting ", new Object[]{DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName(), Thread.currentThread().getId()});
                while (DefaultQueueLifecycle.this.isCompletingEvents) {
                    DefaultQueueLifecycle.this.withHandlingRuntimeException(new RunnableRawCallback(){

                        @Override
                        public void callback() throws InterruptedException {
                            long ini = System.nanoTime();
                            long pollSleepTime = 0L;
                            ArrayList<EventEntryModelDao> completed = new ArrayList<EventEntryModelDao>(15);
                            DefaultQueueLifecycle.this.completedOrFailedEvents.drainTo(completed, 15);
                            if (completed.isEmpty()) {
                                long beforePollTime = System.nanoTime();
                                EventEntryModelDao entry = DefaultQueueLifecycle.this.completedOrFailedEvents.poll(100L, TimeUnit.MILLISECONDS);
                                pollSleepTime = System.nanoTime() - beforePollTime;
                                if (entry != null) {
                                    completed.add(entry);
                                }
                            }
                            if (!completed.isEmpty()) {
                                DefaultQueueLifecycle.this.doProcessCompletedEvents(completed);
                            }
                            int retried = CompletionRunnable.this.drainRetriedEvents();
                            int completeOrRetried = completed.size() + retried;
                            if (completeOrRetried > 0) {
                                DefaultQueueLifecycle.this.completeEntries.update((long)completeOrRetried);
                                DefaultQueueLifecycle.this.completeTime.update(System.nanoTime() - ini - pollSleepTime, TimeUnit.NANOSECONDS);
                            }
                        }
                    });
                }
            }
            catch (InterruptedException e) {
                log.info("{}: Completion thread {} [{}] got interrupted, exiting... ", new Object[]{DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName(), Thread.currentThread().getId()});
                log.info("{}: Completion thread {} [{}] has exited", new Object[]{DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName(), Thread.currentThread().getId()});
            }
            catch (Error e2) {
                log.error("{}: Completion thread {} [{}] got an exception, exiting...", new Object[]{DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName(), Thread.currentThread().getId(), e2});
                {
                    catch (Throwable throwable) {
                        log.info("{}: Completion thread {} [{}] has exited", new Object[]{DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName(), Thread.currentThread().getId()});
                        throw throwable;
                    }
                }
                log.info("{}: Completion thread {} [{}] has exited", new Object[]{DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName(), Thread.currentThread().getId()});
            }
            log.info("{}: Completion thread {} [{}] has exited", new Object[]{DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName(), Thread.currentThread().getId()});
        }

        private int drainRetriedEvents() {
            int curSize = DefaultQueueLifecycle.this.retriedEvents.size();
            if (curSize > 0) {
                ArrayList retried = new ArrayList(curSize);
                DefaultQueueLifecycle.this.retriedEvents.drainTo(retried, curSize);
                DefaultQueueLifecycle.this.doProcessRetriedEvents(retried);
            }
            return curSize;
        }
    }

    /**
     * Immutable result of one dispatch pass: how many entries it handled and a duration in
     * nanoseconds.
     */
    public static class DispatchResultMetrics {
        /** Number of entries reported for the pass. */
        private final int entryCount;
        /** Duration reported for the pass, in nanoseconds. */
        private final long elapsedNanos;

        /**
         * @param nbEntries   number of entries handled by the pass
         * @param timeNanoSec duration of the pass, in nanoseconds
         */
        public DispatchResultMetrics(final int nbEntries, final long timeNanoSec) {
            entryCount = nbEntries;
            elapsedNanos = timeNanoSec;
        }

        /**
         * @return the duration given at construction, in nanoseconds
         */
        public long getTimeNanoSec() {
            return elapsedNanos;
        }

        /**
         * @return the number of entries given at construction
         */
        public int getNbEntries() {
            return entryCount;
        }
    }
}

