/*
 * Decompiled with CFR 0.152.
 */
package org.killbill.queue;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.killbill.commons.concurrent.Executors;
import org.killbill.queue.QueueObjectMapper;
import org.killbill.queue.api.PersistentQueueConfig;
import org.killbill.queue.api.QueueLifecycle;
import org.killbill.queue.dao.EventEntryModelDao;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class DefaultQueueLifecycle
implements QueueLifecycle {
    public static final String QUEUE_NAME = "Queue";
    private static final Logger log = LoggerFactory.getLogger(DefaultQueueLifecycle.class);
    private static final long ONE_MILLION = 1000000L;
    private static final long MAX_SLEEP_TIME_MS = 100L;
    protected final String svcQName;
    protected final ObjectMapper objectMapper;
    protected final PersistentQueueConfig config;
    private final LinkedBlockingQueue<EventEntryModelDao> completedOrFailedEvents;
    private final LinkedBlockingQueue<EventEntryModelDao> retriedEvents;
    private final Timer dispatchTime;
    private final Timer completeTime;
    private final Histogram dispatchedEntries;
    private final Histogram completeEntries;
    private final boolean isStickyEvent;
    private volatile boolean isProcessingEvents;
    private ExecutorService executor;

    public DefaultQueueLifecycle(String svcQName, PersistentQueueConfig config, MetricRegistry metricRegistry) {
        this(svcQName, config, metricRegistry, QueueObjectMapper.get());
    }

    private DefaultQueueLifecycle(String svcQName, PersistentQueueConfig config, MetricRegistry metricRegistry, ObjectMapper objectMapper) {
        this.svcQName = svcQName;
        this.config = config;
        this.isProcessingEvents = false;
        this.objectMapper = objectMapper;
        this.completedOrFailedEvents = new LinkedBlockingQueue();
        this.retriedEvents = new LinkedBlockingQueue();
        this.isStickyEvent = config.getPersistentQueueMode() == PersistentQueueConfig.PersistentQueueMode.STICKY_EVENTS;
        this.dispatchTime = metricRegistry.timer(MetricRegistry.name(DefaultQueueLifecycle.class, (String[])new String[]{"dispatchTime"}));
        this.completeTime = metricRegistry.timer(MetricRegistry.name(DefaultQueueLifecycle.class, (String[])new String[]{"completeTime"}));
        this.dispatchedEntries = metricRegistry.histogram(MetricRegistry.name(DefaultQueueLifecycle.class, (String[])new String[]{"dispatchedEntries"}));
        this.completeEntries = metricRegistry.histogram(MetricRegistry.name(DefaultQueueLifecycle.class, (String[])new String[]{"completeEntries"}));
    }

    @Override
    public boolean startQueue() {
        this.executor = Executors.newFixedThreadPool((int)1, (String)(this.config.getTableName() + "-lifecycle-th"));
        this.isProcessingEvents = true;
        log.info(String.format("%s: Starting...", this.svcQName));
        this.executor.execute(new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             * Loose catch block
             */
            @Override
            public void run() {
                log.info(String.format("%s: Thread %s [%d] starting", DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName(), Thread.currentThread().getId()));
                try {
                    while (true) {
                        this.completeOrRetryProcessedEvents();
                        if (!DefaultQueueLifecycle.this.isProcessingEvents) break;
                        long beforeLoop = System.nanoTime();
                        this.dispatchEvents();
                        long afterLoop = System.nanoTime();
                        this.completeOrRetryProcessedEvents();
                        this.sleepSporadically((afterLoop - beforeLoop) / 1000000L);
                    }
                    this.completeOrRetryProcessedEvents();
                }
                catch (InterruptedException e) {
                    log.info(String.format("%s: Thread %s got interrupted, exting... ", DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName()));
                    this.completeOrRetryProcessedEvents();
                    log.info(String.format("%s: Thread %s has exited", DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName()));
                }
                catch (Throwable e2) {
                    log.error(String.format("%s: Thread %s got an exception, exting... ", DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName()), e2);
                    this.completeOrRetryProcessedEvents();
                    {
                        catch (Throwable throwable) {
                            this.completeOrRetryProcessedEvents();
                            log.info(String.format("%s: Thread %s has exited", DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName()));
                            throw throwable;
                        }
                    }
                    log.info(String.format("%s: Thread %s has exited", DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName()));
                }
                log.info(String.format("%s: Thread %s has exited", DefaultQueueLifecycle.this.svcQName, Thread.currentThread().getName()));
            }

            private void dispatchEvents() {
                long ini = System.nanoTime();
                DispatchResultMetrics metricsResult = DefaultQueueLifecycle.this.doDispatchEvents();
                DefaultQueueLifecycle.this.dispatchedEntries.update(metricsResult.getNbEntries());
                if (DefaultQueueLifecycle.this.isStickyEvent) {
                    DefaultQueueLifecycle.this.dispatchTime.update(metricsResult.getTimeNanoSec(), TimeUnit.NANOSECONDS);
                } else {
                    DefaultQueueLifecycle.this.dispatchTime.update(System.nanoTime() - ini, TimeUnit.NANOSECONDS);
                }
            }

            private void completeOrRetryProcessedEvents() {
                int retried;
                long ini = System.nanoTime();
                int completed = this.drainCompletedEvents();
                int completeOrRetried = completed + (retried = this.drainRetriedEvents());
                if (completeOrRetried > 0) {
                    DefaultQueueLifecycle.this.completeEntries.update(completeOrRetried);
                    DefaultQueueLifecycle.this.completeTime.update(System.nanoTime() - ini, TimeUnit.NANOSECONDS);
                }
            }

            private int drainCompletedEvents() {
                int curSize = DefaultQueueLifecycle.this.completedOrFailedEvents.size();
                if (curSize > 0) {
                    ArrayList completed = new ArrayList(curSize);
                    DefaultQueueLifecycle.this.completedOrFailedEvents.drainTo(completed, curSize);
                    DefaultQueueLifecycle.this.doProcessCompletedEvents(completed);
                }
                return curSize;
            }

            private int drainRetriedEvents() {
                int curSize = DefaultQueueLifecycle.this.retriedEvents.size();
                if (curSize > 0) {
                    ArrayList retried = new ArrayList(curSize);
                    DefaultQueueLifecycle.this.retriedEvents.drainTo(retried, curSize);
                    DefaultQueueLifecycle.this.doProcessRetriedEvents(retried);
                }
                return curSize;
            }

            private void sleepSporadically(long loopTimeMsec) throws InterruptedException {
                long curSleepTime;
                if (DefaultQueueLifecycle.this.isStickyEvent) {
                    return;
                }
                for (long remainingSleepTime = DefaultQueueLifecycle.this.config.getPollingSleepTimeMs() - loopTimeMsec; remainingSleepTime > 0L; remainingSleepTime -= curSleepTime) {
                    curSleepTime = remainingSleepTime > 100L ? 100L : remainingSleepTime;
                    Thread.sleep(curSleepTime);
                    this.completeOrRetryProcessedEvents();
                }
            }
        });
        return true;
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    @Override
    public void stopQueue() {
        int remainingRetried;
        int remainingCompleted;
        this.isProcessingEvents = false;
        this.executor.shutdown();
        try {
            this.executor.awaitTermination(this.config.getPollingSleepTimeMs(), TimeUnit.SECONDS);
            remainingCompleted = this.completedOrFailedEvents.size();
            remainingRetried = this.retriedEvents.size();
            if (remainingCompleted <= 0 && remainingRetried <= 0) return;
        }
        catch (InterruptedException e) {
            int remainingRetried2;
            int remainingCompleted2;
            try {
                log.info(String.format("%s: Stop sequence has been interrupted", this.svcQName));
                remainingCompleted2 = this.completedOrFailedEvents.size();
                remainingRetried2 = this.retriedEvents.size();
                if (remainingCompleted2 <= 0 && remainingRetried2 <= 0) return;
            }
            catch (Throwable throwable) {
                int remainingCompleted3 = this.completedOrFailedEvents.size();
                int remainingRetried3 = this.retriedEvents.size();
                if (remainingCompleted3 <= 0 && remainingRetried3 <= 0) throw throwable;
                log.warn(String.format("%s: Stopped queue with %d event/notifications non completed ", this.svcQName, remainingCompleted3 + remainingRetried3));
                throw throwable;
            }
            log.warn(String.format("%s: Stopped queue with %d event/notifications non completed ", this.svcQName, remainingCompleted2 + remainingRetried2));
            return;
        }
        log.warn(String.format("%s: Stopped queue with %d event/notifications non completed ", this.svcQName, remainingCompleted + remainingRetried));
        return;
    }

    public <M extends EventEntryModelDao> void dispatchCompletedOrFailedEvents(M event) {
        this.completedOrFailedEvents.add(event);
    }

    public <M extends EventEntryModelDao> void dispatchRetriedEvents(M event) {
        this.retriedEvents.add(event);
    }

    public abstract DispatchResultMetrics doDispatchEvents();

    public abstract void doProcessCompletedEvents(Iterable<? extends EventEntryModelDao> var1);

    public abstract void doProcessRetriedEvents(Iterable<? extends EventEntryModelDao> var1);

    public ObjectMapper getObjectMapper() {
        return this.objectMapper;
    }

    public static class DispatchResultMetrics {
        private final int nbEntries;
        private final long timeNanoSec;

        public DispatchResultMetrics(int nbEntries, long timeNanoSec) {
            this.nbEntries = nbEntries;
            this.timeNanoSec = timeNanoSec;
        }

        public int getNbEntries() {
            return this.nbEntries;
        }

        public long getTimeNanoSec() {
            return this.timeNanoSec;
        }
    }
}

