/*
 * Decompiled with CFR 0.152.
 */
package io.smartcat.cassandra.diagnostics.connector;

import io.smartcat.cassandra.diagnostics.connector.ConnectorConfiguration;
import io.smartcat.cassandra.diagnostics.connector.QueryReporter;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for event processors that hand report actions to a pool of
 * low-priority daemon worker threads.
 *
 * <p>Each instance owns its own executor and its own overflow state. Once the
 * number of queued actions exceeds
 * {@code configuration.queuedEventsOverflowThreshold}, further actions are
 * dropped until the queue drains to
 * {@code configuration.queuedEventsRelaxThreshold} or below.
 */
public abstract class AbstractEventProcessor {
    private static final Logger logger = LoggerFactory.getLogger(AbstractEventProcessor.class);

    /** Source of unique suffixes for worker thread names, shared by all processors. */
    private static final AtomicLong THREAD_COUNT = new AtomicLong(0L);

    /** Executor running the submitted report actions; its queue is unbounded. */
    private final ThreadPoolExecutor executor;

    /** Reporter made available to subclasses. */
    protected QueryReporter queryReporter;

    /** Connector configuration holding the worker count and queue thresholds. */
    protected ConnectorConfiguration configuration;

    /**
     * Whether this processor is currently dropping events because its queue
     * overflowed.
     *
     * <p>Kept per instance because every instance has its own executor queue;
     * a shared flag would let one processor's queue state start or stop
     * dropping in another. Declared {@code volatile} so that a change made by
     * one calling thread is visible to the others. The check-then-act in
     * {@link #report(Runnable)} is not atomic, so concurrent callers may log a
     * state transition more than once.
     */
    private volatile boolean queueOverflow = false;

    /**
     * Creates the processor and its fixed-size worker pool.
     *
     * @param queryReporter reporter stored for use by subclasses
     * @param configuration connector configuration; {@code numWorkerThreads}
     *                      is used as both core and maximum pool size
     */
    public AbstractEventProcessor(QueryReporter queryReporter, ConnectorConfiguration configuration) {
        this.queryReporter = queryReporter;
        this.configuration = configuration;
        this.executor = new ThreadPoolExecutor(
                configuration.numWorkerThreads,
                configuration.numWorkerThreads,
                100L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(),
                new WorkerThreadFactory());
    }

    /**
     * Submits a report action for asynchronous execution, or drops it while
     * this processor's queue is in the overflow state.
     *
     * @param reportAction the action to run on a worker thread
     */
    protected void report(Runnable reportAction) {
        // Sampled before submitting, so the thresholds are compared against
        // the queue length as it was prior to this event.
        int numQueuedEvents = this.executor.getQueue().size();
        if (!this.queueOverflow) {
            this.executor.submit(reportAction);
            if (numQueuedEvents > this.configuration.queuedEventsOverflowThreshold) {
                this.queueOverflow = true;
                logger.warn("Event queue overflown. Until relaxed, further events will be dropped.");
            }
        } else if (numQueuedEvents <= this.configuration.queuedEventsRelaxThreshold) {
            // NOTE(review): the event that observes the relaxed queue is itself
            // not submitted; only subsequent events are accepted again.
            this.queueOverflow = false;
            logger.info("Event queue relaxed. Further events will be accepted and processed.");
        } else {
            logger.trace("Event queue overflown. Event is dropped.");
        }
    }

    /**
     * Creates named, minimum-priority daemon threads for the worker pool.
     *
     * <p>A static nested class is used so the factory does not hold a
     * reference to the enclosing processor.
     */
    private static final class WorkerThreadFactory implements ThreadFactory {
        @Override
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable);
            thread.setName("cassandra-diagnostics-connector-" + THREAD_COUNT.getAndIncrement());
            thread.setDaemon(true);
            thread.setPriority(Thread.MIN_PRIORITY);
            return thread;
        }
    }
}

