/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ranger.audit.provider;

import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ranger.audit.model.AuditEventBase;
import org.apache.ranger.audit.provider.AuditHandler;
import org.apache.ranger.audit.provider.MultiDestAuditProvider;

/**
 * Audit provider that decouples callers from the underlying destinations by
 * buffering events in a bounded in-memory queue and forwarding them from a
 * dedicated daemon worker thread.
 *
 * <p>{@link #log(AuditEventBase)} only enqueues the event; when the queue is
 * full the event is dropped and counted. The worker thread (see {@link #run()})
 * dequeues events and hands them to the parent class's {@code log}, calls
 * {@code flush()} whenever no event arrives before the flush interval elapses,
 * and periodically writes in/out/dropped statistics to the log.
 */
public class AsyncAuditProvider
extends MultiDestAuditProvider
implements Runnable {
    private static final Log LOG = LogFactory.getLog(AsyncAuditProvider.class);

    /** Queue capacity used when the caller supplies a capacity below 1. */
    private static final int DEFAULT_MAX_QUEUE_SIZE = 10240;
    /** Seconds to wait in each {@code join} attempt while stopping the worker thread. */
    private static final int mStopLoopIntervalSecs = 1;
    /** Seconds to sleep between queue-empty checks in {@link #waitToComplete(long)}. */
    private static final int mWaitToCompleteLoopIntervalSecs = 1;

    /** Number of worker threads created so far; used only to build unique thread names. */
    private static final AtomicLong sThreadCount = new AtomicLong(0L);

    private final BlockingQueue<AuditEventBase> mQueue;
    private final String mName;
    private final int mMaxQueueSize;
    /** Maximum time in milliseconds between flushes; values <= 0 disable interval-based flushing. */
    private final int mMaxFlushInterval;

    // Written by start(), read by stop(); these may run on different threads.
    private volatile Thread mThread = null;

    private final AtomicLong lifeTimeInLogCount = new AtomicLong(0L);
    private final AtomicLong lifeTimeOutLogCount = new AtomicLong(0L);
    private final AtomicLong lifeTimeDropCount = new AtomicLong(0L);
    private final AtomicLong intervalInLogCount = new AtomicLong(0L);
    private final AtomicLong intervalOutLogCount = new AtomicLong(0L);
    private final AtomicLong intervalDropCount = new AtomicLong(0L);

    // The two timestamps below are only touched by the worker thread after construction.
    private long lastIntervalLogTime = System.currentTimeMillis();
    private long lastFlushTime = System.currentTimeMillis();

    // Set through the public setter, read by the worker thread.
    private volatile int intervalLogDurationMS = 60000;

    /**
     * Creates a provider with an empty destination list.
     *
     * @param name             name used in log messages
     * @param maxQueueSize     capacity of the event queue; values below 1 are
     *                         replaced by the default capacity (a warning is logged)
     * @param maxFlushInterval maximum time in milliseconds between flushes;
     *                         values <= 0 disable interval-based flushing
     */
    public AsyncAuditProvider(String name, int maxQueueSize, int maxFlushInterval) {
        LOG.info("AsyncAuditProvider(" + name + "): creating..");
        int queueSize = maxQueueSize;
        if (queueSize < 1) {
            LOG.warn("AsyncAuditProvider(" + name + "): invalid maxQueueSize=" + maxQueueSize + ". will use default " + DEFAULT_MAX_QUEUE_SIZE);
            queueSize = DEFAULT_MAX_QUEUE_SIZE;
        }
        this.mName = name;
        this.mMaxQueueSize = queueSize;
        this.mMaxFlushInterval = maxFlushInterval;
        this.mQueue = new ArrayBlockingQueue<AuditEventBase>(this.mMaxQueueSize);
    }

    /**
     * Creates a provider and registers {@code provider} as a destination.
     *
     * @param name             name used in log messages
     * @param maxQueueSize     capacity of the event queue; values below 1 fall back to the default
     * @param maxFlushInterval maximum time in milliseconds between flushes
     * @param provider         destination handler passed to {@code addAuditProvider}
     */
    public AsyncAuditProvider(String name, int maxQueueSize, int maxFlushInterval, AuditHandler provider) {
        this(name, maxQueueSize, maxFlushInterval);
        this.addAuditProvider(provider);
    }

    /**
     * Logs the call and delegates initialization to the parent class.
     *
     * @param props configuration properties forwarded to {@code super.init}
     */
    @Override
    public void init(Properties props) {
        LOG.info("AsyncAuditProvider(" + this.mName + ").init()");
        super.init(props);
    }

    /**
     * @return the minimum time in milliseconds between two statistics log entries
     */
    public int getIntervalLogDurationMS() {
        return this.intervalLogDurationMS;
    }

    /**
     * @param intervalLogDurationMS the minimum time in milliseconds between two statistics log entries
     */
    public void setIntervalLogDurationMS(int intervalLogDurationMS) {
        this.intervalLogDurationMS = intervalLogDurationMS;
    }

    /**
     * Enqueues the event for asynchronous delivery.
     *
     * @param event the event to enqueue
     * @return always {@code true}, including when the queue is full and the
     *         event is dropped (drops are only reflected in the statistics)
     */
    @Override
    public boolean log(AuditEventBase event) {
        LOG.debug("AsyncAuditProvider.logEvent(AuditEventBase)");
        this.queueEvent(event);
        return true;
    }

    /**
     * Starts the daemon worker thread and then starts the parent class.
     */
    @Override
    public void start() {
        // Atomic increment so concurrently started providers never share a thread name.
        Thread worker = new Thread(this, "AsyncAuditProvider" + sThreadCount.incrementAndGet());
        worker.setDaemon(true);
        this.mThread = worker;
        worker.start();
        super.start();
    }

    /**
     * Interrupts the worker thread, waits for it to exit and then stops the
     * parent class.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is
     * abandoned, {@code super.stop()} is still invoked, and the caller's
     * interrupt status is restored before returning. Calling this method
     * before {@link #start()} skips the worker shutdown and only stops the
     * parent class.
     */
    @Override
    public void stop() {
        LOG.info("==> AsyncAuditProvider.stop()");
        boolean interrupted = false;
        try {
            Thread worker = this.mThread;
            // start() may never have been called; there is no thread to shut down in that case.
            if (worker != null) {
                LOG.info("Interrupting child thread of " + this.mName + "...");
                worker.interrupt();
                while (worker.isAlive()) {
                    try {
                        LOG.info(String.format("Waiting for child thread of %s to exit.  Sleeping for %d secs", this.mName, mStopLoopIntervalSecs));
                        worker.join(mStopLoopIntervalSecs * 1000L);
                    }
                    catch (InterruptedException e) {
                        LOG.warn("Interrupted while waiting for child thread to join!  Proceeding with stop", e);
                        interrupted = true;
                        break;
                    }
                }
            }
            super.stop();
        }
        finally {
            // Restore the interrupt status only after super.stop() has run, so the
            // parent's shutdown is not cut short by a pending interrupt.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
            LOG.info("<== AsyncAuditProvider.stop()");
        }
    }

    /**
     * Waits, without a time limit, for the local queue to drain and then
     * delegates to the parent class's {@code waitToComplete()}.
     */
    @Override
    public void waitToComplete() {
        this.waitToComplete(0L);
        super.waitToComplete();
    }

    /**
     * Worker loop: forwards dequeued events to the parent class's {@code log}
     * and flushes whenever {@link #dequeueEvent()} returns {@code null}.
     *
     * <p>The loop ends when the thread is interrupted while dequeuing; a final
     * flush is attempted before the method returns. Any other exception is
     * reported through {@code logFailedEvent} and the loop continues.
     */
    @Override
    public void run() {
        LOG.info("==> AsyncAuditProvider.run()");
        while (true) {
            AuditEventBase event = null;
            try {
                event = this.dequeueEvent();
                if (event != null) {
                    super.log(event);
                } else {
                    // No event arrived before the flush deadline.
                    this.lastFlushTime = System.currentTimeMillis();
                    this.flush();
                }
            }
            catch (InterruptedException excp) {
                LOG.info("AsyncAuditProvider.run - Interrupted!  Breaking out of while loop.");
                break;
            }
            catch (Exception excp) {
                this.logFailedEvent(event, excp);
            }
        }
        try {
            this.lastFlushTime = System.currentTimeMillis();
            this.flush();
        }
        catch (Exception excp) {
            LOG.error("AsyncAuditProvider.run()", excp);
        }
        LOG.info("<== AsyncAuditProvider.run()");
    }

    /**
     * Offers the event to the queue without blocking, updating the in-counters
     * and, when the queue is full, the drop-counters.
     */
    private void queueEvent(AuditEventBase event) {
        this.lifeTimeInLogCount.incrementAndGet();
        this.intervalInLogCount.incrementAndGet();
        if (!this.mQueue.offer(event)) {
            this.lifeTimeDropCount.incrementAndGet();
            this.intervalDropCount.incrementAndGet();
        }
    }

    /**
     * Takes the next event from the queue, blocking while it is empty.
     *
     * @return the next event, or {@code null} when interval-based flushing is
     *         enabled and the flush deadline was reached before an event arrived
     * @throws InterruptedException if the thread is interrupted while waiting on the queue
     */
    private AuditEventBase dequeueEvent() throws InterruptedException {
        AuditEventBase ret = this.mQueue.poll();
        while (ret == null) {
            this.logSummaryIfRequired();
            if (this.mMaxFlushInterval > 0) {
                long timeTillNextFlush = this.getTimeTillNextFlush();
                if (timeTillNextFlush <= 0L) {
                    // Flush is due; return null so the caller flushes.
                    break;
                }
                ret = this.mQueue.poll(timeTillNextFlush, TimeUnit.MILLISECONDS);
            } else {
                // No flush deadline: wake up in time for the next statistics log entry.
                long logDurationMs = this.intervalLogDurationMS;
                long waitTime = logDurationMs - (System.currentTimeMillis() - this.lastIntervalLogTime);
                if (waitTime <= 0L) {
                    waitTime = logDurationMs;
                }
                ret = this.mQueue.poll(waitTime, TimeUnit.MILLISECONDS);
            }
        }
        if (ret != null) {
            this.lifeTimeOutLogCount.incrementAndGet();
            this.intervalOutLogCount.incrementAndGet();
        }
        this.logSummaryIfRequired();
        return ret;
    }

    /**
     * Writes interval and lifetime statistics to the log when more than
     * {@code intervalLogDurationMS} has elapsed since the last entry, then
     * resets the interval counters. Nothing is logged for an interval in which
     * no event was enqueued or dequeued, but the interval is still reset.
     */
    private void logSummaryIfRequired() {
        long intervalSinceLastLog = System.currentTimeMillis() - this.lastIntervalLogTime;
        if (intervalSinceLastLog > (long)this.intervalLogDurationMS) {
            if (this.intervalInLogCount.get() > 0L || this.intervalOutLogCount.get() > 0L) {
                long queueSize = this.mQueue.size();
                LOG.info("AsyncAuditProvider-stats:" + this.mName + ": past " + this.formatIntervalForLog(intervalSinceLastLog) + ": inLogs=" + this.intervalInLogCount.get() + ", outLogs=" + this.intervalOutLogCount.get() + ", dropped=" + this.intervalDropCount.get() + ", currentQueueSize=" + queueSize);
                LOG.info("AsyncAuditProvider-stats:" + this.mName + ": process lifetime: inLogs=" + this.lifeTimeInLogCount.get() + ", outLogs=" + this.lifeTimeOutLogCount.get() + ", dropped=" + this.lifeTimeDropCount.get());
            }
            this.lastIntervalLogTime = System.currentTimeMillis();
            this.intervalInLogCount.set(0L);
            this.intervalOutLogCount.set(0L);
            this.intervalDropCount.set(0L);
        }
    }

    /**
     * @return {@code true} when no events are waiting in the queue
     */
    private boolean isEmpty() {
        return this.mQueue.isEmpty();
    }

    /**
     * Blocks until the local queue is empty, the wait limit is reached, or the
     * calling thread is interrupted.
     *
     * <p>On interruption the wait is abandoned and the caller's interrupt
     * status is restored.
     *
     * @param maxWaitSeconds maximum time to wait in seconds; values <= 0 mean no limit
     */
    @Override
    public void waitToComplete(long maxWaitSeconds) {
        LOG.debug("==> AsyncAuditProvider.waitToComplete()");
        try {
            long waitedSecs = 0L;
            while (!this.isEmpty() && (maxWaitSeconds <= 0L || waitedSecs < maxWaitSeconds)) {
                try {
                    LOG.info(String.format("%d messages yet to be flushed by %s.  Sleeping for %d sec", this.mQueue.size(), this.mName, mWaitToCompleteLoopIntervalSecs));
                    Thread.sleep(mWaitToCompleteLoopIntervalSecs * 1000L);
                    waitedSecs += mWaitToCompleteLoopIntervalSecs;
                }
                catch (InterruptedException excp) {
                    LOG.warn("Caught interrupted exception! " + this.mQueue.size() + " messages still unflushed!  Won't wait for queue to flush, exiting...", excp);
                    // Preserve the interrupt for the caller instead of swallowing it.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        finally {
            LOG.debug("<== AsyncAuditProvider.waitToComplete()");
        }
    }

    /**
     * @return milliseconds remaining until the next flush is due; {@code 0}
     *         when it is already due, or the configured interval itself when
     *         interval-based flushing is disabled or no flush time is recorded
     */
    private long getTimeTillNextFlush() {
        long timeTillNextFlush = this.mMaxFlushInterval;
        if (this.mMaxFlushInterval > 0 && this.lastFlushTime != 0L) {
            long timeSinceLastFlush = System.currentTimeMillis() - this.lastFlushTime;
            timeTillNextFlush = timeSinceLastFlush >= (long)this.mMaxFlushInterval ? 0L : (long)this.mMaxFlushInterval - timeSinceLastFlush;
        }
        return timeTillNextFlush;
    }
}

