/*
 * Decompiled with CFR 0.152.
 */
package net.anotheria.anoprise.processor;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import net.anotheria.anoprise.processor.MultiProcessor;
import net.anotheria.anoprise.processor.PackageWorker;
import net.anotheria.anoprise.processor.UnrecoverableQueueOverflowException;
import net.anotheria.anoprise.processor.WorkProcessingListener;
import net.anotheria.anoprise.queue.BoundedFifoQueueFactory;
import net.anotheria.anoprise.queue.EnterpriseQueue;
import net.anotheria.anoprise.queue.EnterpriseQueueFactory;
import net.anotheria.moskito.core.predefined.QueueStats;
import net.anotheria.moskito.core.predefined.QueuingSystemStats;
import net.anotheria.util.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A daemon thread that buffers submitted elements in an {@link EnterpriseQueue} and, in its
 * {@link #run()} loop, polls them in packages and hands each package to a {@link MultiProcessor}.
 * Producers submit elements through the {@code addToQueue*} methods; arrivals, waits, rejections,
 * servicing times and errors are recorded in a {@link QueuingSystemStats} instance.
 *
 * @param <T> type of the queued elements
 */
public class QueuedMultiProcessor<T>
extends Thread {
    /** Logger used by this processor; the constructor falls back to a class logger when none is supplied. */
    private Logger log;
    /** Factory used by {@code init()} to create the underlying queue. */
    private EnterpriseQueueFactory<T> queueFactory;
    /** Size handed to the queue factory when the queue is created. */
    private final int queueSize;
    /** Queue holding the elements that have been accepted but not yet polled by {@link #run()}. */
    private EnterpriseQueue<T> queue;
    /** Processor that receives the packages of elements polled from the queue. */
    private MultiProcessor<T> processor;
    /** Pause passed to {@code ThreadUtils.sleepIgnoreException} when a poll round yields no elements (presumably milliseconds - TODO confirm). */
    private final long sleepTime;
    /** Statistics about arrivals, waits, discarded elements, servicing and errors. */
    private QueuingSystemStats stats;
    /** When set, the {@code addToQueue*} methods reject new elements. */
    private AtomicBoolean stopQueueing;
    /** When set, the {@link #run()} loop terminates at its next iteration. */
    private AtomicBoolean stopImmediately;
    /** Maximum number of elements put into a single package, taken from the worker's {@code packageCapacity()}. */
    private int packageCapacity;

    /**
     * Creates the processor as a daemon thread. The thread is not started here.
     *
     * @param aName thread name, also used as the name of the statistics object
     * @param aWorker worker handed to the internal {@link MultiProcessor}; its
     *        {@code packageCapacity()} limits the size of each processed package
     * @param aQueueFactory factory for the underlying queue; when {@code null} a
     *        {@link BoundedFifoQueueFactory} is used
     * @param aQueueSize size passed to the queue factory and reported to the statistics
     * @param aProcessingChannels number of processing channels passed to the {@link MultiProcessor}
     * @param aSleepTime pause used by {@link #run()} when the queue yields no elements
     * @param aLog logger to use; when {@code null} a logger for this class is created
     */
    public QueuedMultiProcessor(String aName, PackageWorker<T> aWorker, EnterpriseQueueFactory<T> aQueueFactory, int aQueueSize, int aProcessingChannels, long aSleepTime, Logger aLog) {
        super(aName);
        setDaemon(true);

        if (aLog == null) {
            log = LoggerFactory.getLogger(QueuedMultiProcessor.class);
        } else {
            log = aLog;
        }

        stats = new QueuingSystemStats(aName);
        stats.setServersSize(aProcessingChannels);
        stats.setQueueSize(aQueueSize);

        queueSize = aQueueSize;
        sleepTime = aSleepTime;
        queueFactory = aQueueFactory != null ? aQueueFactory : new BoundedFifoQueueFactory();
        packageCapacity = aWorker.packageCapacity();

        // Note: the processor receives the logger exactly as passed in (possibly null).
        processor = new MultiProcessor<T>(aProcessingChannels, aWorker, aLog);

        // Feeds the outcome of every processed package into the statistics.
        WorkProcessingListener<T> statsListener = new WorkProcessingListener<T>(){

            @Override
            public void workStarted(List<T> workingPackage) {
                // nothing to record when work starts
            }

            @Override
            public void workFinished(List<T> workingPackage, long workDuration) {
                stats.addServicingTime(workDuration);
                stats.addServiced();
            }

            @Override
            public void workInterrupted(List<T> workingPackage) {
                stats.addError();
            }
        };
        processor.addListener(statsListener);

        init();
    }

    /**
     * Creates a fresh queue from the configured factory and size, and installs new,
     * cleared stop flags.
     */
    private void init() {
        queue = queueFactory.createQueue(queueSize);
        stopQueueing = new AtomicBoolean(false);
        stopImmediately = new AtomicBoolean(false);
    }

    /**
     * Re-initializes the processor: the queue is replaced by a newly created one and both
     * stop flags are cleared.
     * NOTE(review): this is not synchronized with {@link #run()} or the {@code addToQueue*}
     * methods - confirm callers only reset an idle processor.
     */
    public void reset() {
        init();
    }

    /**
     * Enqueues an element using the default non-blocking strategy of
     * {@link #addToQueueDontWait(Object)}.
     *
     * @param aElement element to enqueue
     * @throws UnrecoverableQueueOverflowException if the element could not be enqueued
     */
    public void addToQueue(T aElement) throws UnrecoverableQueueOverflowException {
        addToQueueDontWait(aElement);
    }

    /**
     * Enqueues an element, waiting without a time limit while the queue is full.
     * Delegates to {@link #addToQueueAndWait(Object, long)} with a timeout of {@code 0}.
     *
     * @param element element to enqueue
     * @throws UnrecoverableQueueOverflowException if queueing has been stopped
     */
    public void addToQueueAndWait(T element) throws UnrecoverableQueueOverflowException {
        final long noTimeout = 0L;
        addToQueueAndWait(element, noTimeout);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Enqueues an element, blocking the caller while the queue is full.
     * <p>
     * Between attempts the caller waits on the queue's monitor, which {@link #run()} notifies
     * after each poll round. When a positive timeout is given, every wait is bounded by the
     * remaining time, so the timeout is honoured even if no notification ever arrives
     * (for example because the processor thread is not running).
     *
     * @param element element to enqueue
     * @param timeout maximum time to keep trying; it is compared against
     *        {@link System#nanoTime()} differences and is therefore interpreted in nanoseconds
     *        (NOTE(review): confirm callers do not pass milliseconds). Zero or a negative value
     *        means no time limit.
     * @throws UnrecoverableQueueOverflowException if queueing has been stopped, or if the
     *         element could not be enqueued before the timeout elapsed
     */
    public void addToQueueAndWait(T element, long timeout) throws UnrecoverableQueueOverflowException {
        this.stats.addArrived();
        if (this.stopQueueing.get()) {
            this.stats.addThrowedAway();
            this.log.error(this.getName() + ": queueing is stopped! Throwing away " + element + ", " + this.getStatsString());
            throw new UnrecoverableQueueOverflowException(this.getName() + ": queueing is stopped! Throwing away " + element + ", " + this.getStatsString());
        }
        final long nanosPerMilli = 1000000L;
        final boolean unlimited = timeout <= 0L;
        final long startTime = System.nanoTime();
        boolean interrupted = false;
        try {
            while (true) {
                // Use one reference per iteration so offer, lock and wait all act on the same
                // queue object even if reset() replaces the field concurrently.
                EnterpriseQueue<T> target = this.queue;
                if (target.offer(element)) {
                    return;
                }
                long remaining = 0L;
                if (!unlimited) {
                    remaining = timeout - (System.nanoTime() - startTime);
                    if (remaining <= 0L) {
                        break;
                    }
                }
                synchronized (target) {
                    this.stats.addWaited();
                    long waitStart = System.nanoTime();
                    if (unlimited) {
                        ThreadUtils.waitIgnoreException(target);
                    } else {
                        try {
                            // Bounded wait: never sleep past the caller's deadline.
                            target.wait(remaining / nanosPerMilli, (int)(remaining % nanosPerMilli));
                        } catch (InterruptedException e) {
                            // Keep trying until the deadline, but remember the interrupt.
                            interrupted = true;
                        }
                    }
                    this.stats.addWaitingTime(System.nanoTime() - waitStart);
                }
            }
        } finally {
            if (interrupted) {
                // Restore the interrupt status swallowed while waiting.
                Thread.currentThread().interrupt();
            }
        }
        this.stats.addThrowedAway();
        this.log.error("Waiting for enqueue timeout. Throwing away " + element + ", " + this.getStatsString());
        throw new UnrecoverableQueueOverflowException("Waiting for enqueue timeout. Throwing away : " + element + ", " + this.getStatsString());
    }

    /**
     * Enqueues an element with the default retry policy: 2 enqueue attempts with a delay
     * value of 100 between them (see {@link #addToQueueDontWait(Object, int, int)}).
     *
     * @param element element to enqueue
     * @throws UnrecoverableQueueOverflowException if the element could not be enqueued
     */
    public void addToQueueDontWait(T element) throws UnrecoverableQueueOverflowException {
        final int defaultTries = 2;
        final int defaultDelay = 100;
        addToQueueDontWait(element, defaultTries, defaultDelay);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Tries to enqueue an element a limited number of times, pausing between attempts.
     * <p>
     * The pause is taken without holding any monitor, so concurrent producers do not
     * serialize on each other's delays, and no pause is taken after the final attempt
     * because no further attempt would follow it.
     *
     * @param element element to enqueue
     * @param enqueueTries maximum number of enqueue attempts
     * @param triesDelay pause between two attempts, passed to
     *        {@code ThreadUtils.sleepIgnoreException} (presumably milliseconds - TODO confirm)
     * @throws UnrecoverableQueueOverflowException if queueing has been stopped, or if every
     *         attempt found the queue unable to accept the element
     */
    public void addToQueueDontWait(T element, int enqueueTries, int triesDelay) throws UnrecoverableQueueOverflowException {
        this.stats.addArrived();
        if (this.stopQueueing.get()) {
            this.stats.addThrowedAway();
            this.log.error(this.getName() + ": queueing is stopped! Throwing away " + element + ", " + this.getStatsString());
            throw new UnrecoverableQueueOverflowException(this.getName() + ": queueing is stopped! Throwing away " + element + ", " + this.getStatsString());
        }
        for (int attempt = 1; attempt <= enqueueTries; ++attempt) {
            if (this.queue.offer(element)) {
                return;
            }
            if (attempt == enqueueTries) {
                // Last attempt failed: give up right away instead of sleeping for nothing.
                break;
            }
            long waitStart = System.nanoTime();
            ThreadUtils.sleepIgnoreException((long)triesDelay);
            long dur = System.nanoTime() - waitStart;
            this.stats.addWaited();
            this.stats.addWaitingTime(dur);
        }
        this.stats.addThrowedAway();
        this.log.error("couldn't recover from queue overflow, throwing away " + element + ", " + this.getStatsString());
        throw new UnrecoverableQueueOverflowException("Element: " + element + ", " + this.getStatsString());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Main loop of the processor thread.
     * <p>
     * Each iteration polls up to {@code packageCapacity} elements from the queue, wakes
     * producers waiting on the queue's monitor, and passes a non-empty package to the
     * {@link MultiProcessor}. When nothing was polled the thread sleeps for {@code sleepTime}.
     * The loop ends when {@link #stopImmediately()} has been called, when a {@link Throwable}
     * escapes the loop body, or - during JVM shutdown - once the queue is empty and the
     * processor reports that it is finished.
     * <p>
     * A JVM shutdown hook is registered that stops queueing and then blocks until this loop
     * has terminated. Termination is published through a flag guarded by the same monitor the
     * hook waits on, so the hook cannot miss the notification and does not block when the
     * loop has already ended for any other reason.
     */
    @Override
    public void run() {
        final AtomicBoolean shutdown = new AtomicBoolean(false);
        // Set (under the 'shutdown' monitor) once the processing loop below has terminated.
        final AtomicBoolean finished = new AtomicBoolean(false);
        Runtime.getRuntime().addShutdownHook(new Thread(){

            @Override
            public void run() {
                QueuedMultiProcessor.this.log.info("Shutting down processor!");
                QueuedMultiProcessor.this.stopQueueing();
                synchronized (shutdown) {
                    // Raise the flag while holding the monitor: the worker can only publish
                    // 'finished' and notify after this thread has started waiting.
                    shutdown.set(true);
                    QueuedMultiProcessor.this.log.info("Wait while queue processing complete...");
                    while (!finished.get()) {
                        try {
                            shutdown.wait();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
                QueuedMultiProcessor.this.log.info("Processor is shutted down!");
                QueuedMultiProcessor.this.log.info(QueuedMultiProcessor.this.getStatsString());
            }
        });
        try {
            while (!this.stopImmediately.get()) {
                T element;
                ArrayList<T> elementsPackage = new ArrayList<T>();
                while (elementsPackage.size() < this.packageCapacity && (element = this.queue.poll()) != null) {
                    elementsPackage.add(element);
                }
                // Lock and notify the same object even if reset() swaps the field meanwhile.
                EnterpriseQueue<T> polledQueue = this.queue;
                synchronized (polledQueue) {
                    // Several slots may have been freed, so wake every waiting producer.
                    polledQueue.notifyAll();
                }
                if (!elementsPackage.isEmpty()) {
                    this.processor.process(elementsPackage);
                    continue;
                }
                if (shutdown.get()) {
                    this.processor.shutdown();
                    if (this.processor.isFinished()) {
                        this.log.info("Queue is empty and all works are done. Processing completed!");
                        break;
                    }
                }
                ThreadUtils.sleepIgnoreException((long)this.sleepTime);
            }
        }
        catch (Throwable ttt) {
            try {
                this.log.error("run ", ttt);
            }
            catch (Exception e) {
                // Last resort when even the logger fails.
                System.out.println(QueuedMultiProcessor.class + " Can't log!!!");
                ttt.printStackTrace();
            }
        }
        finally {
            // Release a shutdown hook that is waiting now, and let a later one skip the wait.
            synchronized (shutdown) {
                finished.set(true);
                shutdown.notifyAll();
            }
        }
    }

    /**
     * Drains the underlying queue.
     *
     * @return the result of the queue's {@code drain()} call
     */
    public List<T> drainQueue() {
        return queue.drain();
    }

    /**
     * Stops accepting new elements: subsequent {@code addToQueue*} calls are rejected with an
     * {@link UnrecoverableQueueOverflowException}. Elements that are already queued are not touched.
     */
    public void stopQueueing() {
        stopQueueing.set(true);
    }

    /**
     * Requests the processing loop to end at its next iteration and drains the queue.
     *
     * @return the elements returned by {@link #drainQueue()}
     */
    public List<T> stopImmediately() {
        stopImmediately.set(true);
        List<T> remaining = drainQueue();
        return remaining;
    }

    /**
     * Tells whether either stop flag has been set.
     *
     * @return {@code true} if an immediate stop was requested or queueing was stopped
     */
    public boolean isStopped() {
        if (stopImmediately.get()) {
            return true;
        }
        return stopQueueing.get();
    }

    /**
     * @return the statistics object maintained by this processor
     */
    public QueuingSystemStats getProcessorStats() {
        return stats;
    }

    /**
     * @return the statistics reported by the underlying queue
     */
    public QueueStats getQueueStat() {
        return queue.getQueueStats();
    }

    /**
     * Builds a textual summary made of the processor statistics followed by the queue statistics.
     *
     * @return the processor stats string, then {@code ",\nQUEUE: "}, then the queue stats string
     */
    public String getStatsString() {
        StringBuilder summary = new StringBuilder();
        summary.append(getProcessorStats().toStatsString());
        summary.append(",\nQUEUE: ");
        summary.append(getQueueStat().toStatsString());
        return summary.toString();
    }
}

