/*
 * Decompiled with CFR 0.152.
 */
package com.marklogic.ps.xqsync;

import com.marklogic.ps.SimpleLogger;
import com.marklogic.ps.timing.TimedEvent;
import com.marklogic.ps.timing.Timer;
import com.marklogic.ps.xqsync.Configuration;
import com.marklogic.ps.xqsync.FatalException;
import com.marklogic.ps.xqsync.SyncException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Thread that drains a {@link CompletionService} of finished tasks, feeds the
 * resulting {@link TimedEvent}s into a {@link Timer}, periodically logs
 * progress, and shuts the worker pool down when monitoring ends.
 *
 * <p>Task counting ({@link #incrementTaskCount()}, {@link #getTaskCount()},
 * {@link #setFinalTaskCount(long)}) is guarded by {@link #taskCountMutex}.
 * The {@code running}, {@code timer}, {@code taskCount} and
 * {@code taskCountFinal} fields are volatile because they are written by one
 * thread and read by others.
 */
public class Monitor extends Thread {
    /** Minimum interval between progress log messages, in milliseconds. */
    protected static final int DISPLAY_MILLIS = 60000;
    /** Monitoring stops when no future has completed for this many milliseconds. */
    protected static final int FUTURE_MILLIS = 900000;
    /** Timeout for each poll of the completion service, in milliseconds. */
    protected static final int SLEEP_MILLIS = 500;

    protected SimpleLogger logger;
    /** Cleared by {@link #halt(Throwable)} and when {@link #run()} finishes. */
    protected volatile boolean running = true;
    protected ThreadPoolExecutor pool;
    protected final CompletionService<TimedEvent[]> completionService;
    /** When true, any task failure aborts monitoring. */
    protected boolean fatalErrors = true;
    /** Created by {@link #monitor()}; null until monitoring has started. */
    protected volatile Timer timer;
    protected volatile long taskCount = 0L;
    protected volatile boolean taskCountFinal = false;
    protected final Object taskCountMutex = new Object();
    protected final Configuration config;

    /**
     * @param config source of the logger and of the throttle/display settings
     * @param pool executor that is shut down when monitoring ends
     * @param cs completion service polled for finished tasks
     * @param fatalErrors whether every task failure should abort monitoring
     */
    public Monitor(Configuration config, ThreadPoolExecutor pool, CompletionService<TimedEvent[]> cs, boolean fatalErrors) {
        super("MonitorThread");
        this.config = config;
        this.completionService = cs;
        this.pool = pool;
        this.logger = config.getLogger();
        this.fatalErrors = fatalErrors;
    }

    /**
     * Runs {@link #monitor()} until it completes. Any exception is logged and
     * the JVM exits with status -1; otherwise the pool is shut down and a
     * final summary is logged.
     *
     * @throws NullPointerException if no logger is available
     */
    @Override
    public void run() {
        if (null == this.logger) {
            throw new NullPointerException("must call setLogger");
        }
        try {
            this.logger.info("starting");
            this.monitor();
            Thread.yield();
        }
        catch (Exception e) {
            if (e instanceof ExecutionException) {
                this.logger.logException("fatal execution error", e.getCause());
            } else {
                this.logger.logException("fatal error", e);
            }
            System.exit(-1);
        }
        finally {
            this.pool.shutdownNow();
            this.running = false;
            // The timer does not exist if monitor() failed before creating it;
            // dereferencing it here would mask the original failure.
            Timer finalTimer = this.timer;
            if (null != finalTimer) {
                this.logger.info("exiting after " + finalTimer.getEventCount() + "/" + this.taskCount + ", " + finalTimer.getProgressMessage());
            }
        }
    }

    /**
     * Logs the given cause, clears the running flag and shuts the pool down.
     *
     * @param t the reason for halting
     */
    public void halt(Throwable t) {
        this.logger.logException("halting", t);
        this.running = false;
        this.pool.shutdownNow();
    }

    /**
     * Polls the completion service until the monitor is halted, the pool has
     * terminated, no future has completed for {@link #FUTURE_MILLIS}, or the
     * monitoring thread is interrupted.
     *
     * @throws ExecutionException if a task failed and {@code fatalErrors} is set
     * @throws FatalException if a task returned null or failed with a
     *         {@code FatalException}
     */
    protected void monitor() throws ExecutionException {
        Future<TimedEvent[]> future = null;
        long currentMillis = System.currentTimeMillis();
        long lastDisplayMillis = 0L;
        long lastFutureMillis = currentMillis;
        TimedEvent[] lastEvent = null;
        this.logger.finest("looping every " + SLEEP_MILLIS + ", core=" + this.pool.getCorePoolSize() + ", active=" + this.pool.getActiveCount() + ", tasks=" + this.taskCount);
        this.timer = new Timer();
        do {
            Thread.yield();
            // Drain everything that is ready; a null future means the poll timed out.
            do {
                try {
                    future = this.completionService.poll(SLEEP_MILLIS, TimeUnit.MILLISECONDS);
                    if (null != future) {
                        lastFutureMillis = System.currentTimeMillis();
                        try {
                            TimedEvent[] events = future.get();
                            if (null == events) {
                                throw new FatalException("unexpected null event");
                            }
                            lastEvent = events;
                            for (TimedEvent timedEvent : events) {
                                if (null == timedEvent) continue;
                                this.timer.add(timedEvent, false);
                            }
                        }
                        catch (ExecutionException e) {
                            if (this.fatalErrors) {
                                throw e;
                            }
                            Throwable cause = e.getCause();
                            if (cause instanceof FatalException) {
                                throw (FatalException)cause;
                            }
                            this.logger.logException("non-fatal", e);
                            this.timer.incrementEventCount(false);
                        }
                    }
                }
                catch (InterruptedException e) {
                    // Looping again with the interrupt flag set would make every
                    // subsequent poll() throw immediately (a busy spin), so treat
                    // the interrupt as a request to stop monitoring.
                    this.logger.logException("interrupted in poll() or get()", e);
                    Thread.currentThread().interrupt();
                    return;
                }
                currentMillis = System.currentTimeMillis();
                if (currentMillis - lastDisplayMillis > (long)DISPLAY_MILLIS) {
                    lastDisplayMillis = currentMillis;
                    this.logProgress(lastEvent);
                }
            } while (null != future);
            this.logger.finer("running = " + this.running + ", terminated = " + this.pool.isTerminated() + ", last future = " + lastFutureMillis);
            if (currentMillis - lastFutureMillis > (long)FUTURE_MILLIS) {
                this.logger.warning("no futures received in over " + FUTURE_MILLIS + " ms");
                break;
            }
        } while (this.running && !this.pool.isTerminated());
    }

    /**
     * Logs pool statistics and, once at least one task has completed, the
     * overall progress plus the description of the most recent event.
     */
    private void logProgress(TimedEvent[] lastEvent) {
        this.logger.finer("thread count: core=" + this.pool.getCorePoolSize() + ", active=" + this.pool.getActiveCount() + ", tasks=" + this.taskCount);
        if (null == lastEvent) {
            return;
        }
        String progress = "" + this.timer.getEventCount() + "/" + this.taskCount + ", " + this.timer.getProgressMessage(false);
        // Event arrays may be empty or contain nulls, so pick the first real one.
        for (TimedEvent event : lastEvent) {
            if (null != event) {
                progress = progress + ", " + event.getDescription();
                break;
            }
        }
        this.logger.info(progress);
        if (this.config.doPrintCurrRate()) {
            String currMsg = this.timer.getCurrProgressMessage();
            if (null != currMsg) {
                this.logger.info(currMsg);
            }
        }
    }

    public void setLogger(SimpleLogger logger) {
        this.logger = logger;
    }

    public void setPool(ThreadPoolExecutor pool) {
        this.pool = pool;
    }

    /**
     * Adds one to the task count. Once the count has been declared final the
     * call is logged as a bug and otherwise ignored.
     */
    public void incrementTaskCount() {
        // Same lock as setFinalTaskCount, so an increment cannot race with finalization.
        synchronized (this.taskCountMutex) {
            if (this.taskCountFinal) {
                this.logger.logException("BUG!", new SyncException("increment to final task count"));
                return;
            }
            ++this.taskCount;
        }
    }

    /** @return the number of tasks counted so far */
    public long getTaskCount() {
        synchronized (this.taskCountMutex) {
            return this.taskCount;
        }
    }

    /**
     * Declares the task count final after verifying it.
     *
     * @param count the expected total, which must equal the current task count
     * @throws FatalException if the count is already final or does not match
     */
    public void setFinalTaskCount(long count) {
        synchronized (this.taskCountMutex) {
            if (this.taskCountFinal) {
                throw new FatalException("BUG!", new SyncException("setter on final task count " + count));
            }
            if (count != this.taskCount) {
                throw new FatalException("BUG!", new SyncException("setter on final task count " + count + " != " + this.taskCount));
            }
            this.logger.fine("setting " + count);
            this.taskCountFinal = true;
        }
    }

    /**
     * Blocks the calling thread while the measured rate exceeds the configured
     * throttle. An events-per-second limit takes precedence over a
     * bytes-per-second limit. Returns immediately when throttling is disabled
     * or monitoring has not started yet; returns early, with the interrupt
     * flag set, when the calling thread is interrupted while sleeping.
     */
    public void checkThrottle() {
        if (!this.config.isThrottled()) {
            return;
        }
        Timer activeTimer = this.timer;
        if (null == activeTimer) {
            // monitor() has not created the timer yet, so there is no rate to throttle.
            return;
        }
        double throttledEventsPerSecond = this.config.getThrottledEventsPerSecond();
        boolean isEvents = throttledEventsPerSecond > 0.0;
        int throttledBytesPerSecond = isEvents ? 0 : this.config.getThrottledBytesPerSecond();
        this.logger.fine("throttling " + (isEvents ? activeTimer.getEventsPerSecond() + " tps to " + throttledEventsPerSecond + " tps" : activeTimer.getBytesPerSecond() + " B/sec to " + throttledBytesPerSecond + " B/sec"));
        while (throttledEventsPerSecond > 0.0 && throttledEventsPerSecond < activeTimer.getEventsPerSecond() || throttledBytesPerSecond > 0 && (double)throttledBytesPerSecond < activeTimer.getBytesPerSecond()) {
            // Seconds the work done so far should have taken at the throttled rate.
            // Divide as doubles: integer division would truncate the target.
            double targetSeconds = isEvents
                    ? (double)activeTimer.getEventCount() / throttledEventsPerSecond
                    : (double)activeTimer.getBytes() / (double)throttledBytesPerSecond;
            long sleepMillis = (long)Math.ceil(1000.0 * (targetSeconds - activeTimer.getDurationSeconds()));
            sleepMillis = Math.max(sleepMillis, 1L);
            this.logger.finer("sleeping " + sleepMillis);
            try {
                Thread.sleep(sleepMillis);
            }
            catch (InterruptedException e) {
                // Propagate the interrupt instead of swallowing it so a shutdown
                // request is not held up by the throttle.
                this.logger.logException("interrupted", e);
                Thread.currentThread().interrupt();
                return;
            }
        }
        this.logger.fine("throttled to " + (isEvents ? activeTimer.getEventsPerSecond() + " tps" : activeTimer.getBytesPerSecond() + " B/sec"));
    }
}

