/*
 * Decompiled with CFR 0.152.
 */
package kieker.monitoring.writer.collector;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import kieker.common.configuration.Configuration;
import kieker.common.record.IMonitoringRecord;
import kieker.common.util.classpath.InstantiationFactory;
import kieker.common.util.thread.DaemonThreadFactory;
import kieker.monitoring.core.controller.ReceiveUnfilteredConfiguration;
import kieker.monitoring.writer.AbstractMonitoringWriter;
import kieker.monitoring.writer.collector.ChunkWriterTask;
import kieker.monitoring.writer.raw.IRawDataWriter;
import kieker.monitoring.writer.serializer.IMonitoringRecordSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ReceiveUnfilteredConfiguration
public class ChunkingCollector
extends AbstractMonitoringWriter {
    private static final int NUMBER_OF_WORKERS = 1;
    private static final int DEFAULT_QUEUE_SIZE = 16384;
    private static final int DEFAULT_DEFERRED_WRITE_DELAY = 500;
    private static final int DEFAULT_CHUNK_SIZE = 16;
    private static final int DEFAULT_OUTPUT_BUFFER_SIZE = 65536;
    private static final int DEFAULT_TASK_RUN_INTERVAL = 20;
    private static final String PREFIX = ChunkingCollector.class.getName() + ".";
    public static final String CONFIG_SERIALIZER_CLASSNAME = PREFIX + "serializer";
    public static final String CONFIG_WRITER_CLASSNAME = PREFIX + "writer";
    public static final String CONFIG_DEFERRED_WRITE_DELAY = PREFIX + "deferredWriteDelay";
    public static final String CONFIG_QUEUE_SIZE = PREFIX + "queueSize";
    public static final String CONFIG_CHUNK_SIZE = PREFIX + "chunkSize";
    public static final String CONFIG_OUTPUT_BUFFER_SIZE = PREFIX + "outputBufferSize";
    public static final String CONFIG_TASK_RUN_INTERVAL = PREFIX + "taskRunInterval";
    public static final String CONFIG_QUEUE_TYPE = PREFIX + "queueType";
    private static final TimeUnit TASK_RUN_INTERVAL_TIME_UNIT = TimeUnit.MILLISECONDS;
    private static final Logger LOGGER = LoggerFactory.getLogger(ChunkingCollector.class);
    private final Queue<IMonitoringRecord> recordQueue;
    private final ScheduledExecutorService scheduledExecutor;
    private final int taskRunInterval;
    private final ChunkWriterTask writerTask;

    public ChunkingCollector(Configuration configuration) {
        super(configuration);
        int queueSize = configuration.getIntProperty(CONFIG_QUEUE_SIZE, 16384);
        String queueType = configuration.getStringProperty(CONFIG_QUEUE_TYPE, "");
        this.taskRunInterval = configuration.getIntProperty(CONFIG_TASK_RUN_INTERVAL, 20);
        this.recordQueue = this.createQueue(queueType, queueSize);
        this.scheduledExecutor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory());
        InstantiationFactory controllerFactory = InstantiationFactory.getInstance(configuration);
        String serializerName = configuration.getStringProperty(CONFIG_SERIALIZER_CLASSNAME);
        IMonitoringRecordSerializer serializer = controllerFactory.createAndInitialize(IMonitoringRecordSerializer.class, serializerName, configuration);
        String writerName = configuration.getStringProperty(CONFIG_WRITER_CLASSNAME);
        IRawDataWriter writer = controllerFactory.createAndInitialize(IRawDataWriter.class, writerName, configuration);
        int deferredWriteDelayMs = configuration.getIntProperty(CONFIG_DEFERRED_WRITE_DELAY, 500);
        int chunkSize = configuration.getIntProperty(CONFIG_CHUNK_SIZE, 16);
        int outputBufferSize = configuration.getIntProperty(CONFIG_OUTPUT_BUFFER_SIZE, 65536);
        this.writerTask = new ChunkWriterTask(this, chunkSize, deferredWriteDelayMs, outputBufferSize, serializer, writer);
    }

    private Queue<IMonitoringRecord> createQueue(String queueTypeName, int queueSize) {
        if (queueTypeName == null) {
            return this.createDefaultQueue(queueSize);
        }
        try {
            Class<?> queueClass = Class.forName(queueTypeName);
            Constructor<?> constructor = queueClass.getConstructor(Integer.TYPE);
            return (Queue)constructor.newInstance(queueSize);
        }
        catch (ClassNotFoundException | IllegalAccessException | IllegalArgumentException | InstantiationException | NoSuchMethodException | SecurityException | InvocationTargetException e) {
            LOGGER.error("Error instantiating queue type {}. Using default queue type instead.", (Object)queueTypeName, (Object)e);
            return this.createDefaultQueue(queueSize);
        }
    }

    private Queue<IMonitoringRecord> createDefaultQueue(int queueSize) {
        return new ArrayBlockingQueue<IMonitoringRecord>(queueSize);
    }

    @Override
    public void onStarting() {
        this.scheduledExecutor.scheduleAtFixedRate(this.writerTask, 0L, this.taskRunInterval, TASK_RUN_INTERVAL_TIME_UNIT);
        this.writerTask.initialize();
    }

    @Override
    public void onTerminating() {
        this.scheduledExecutor.shutdown();
        try {
            this.scheduledExecutor.awaitTermination(Long.MAX_VALUE, TASK_RUN_INTERVAL_TIME_UNIT);
        }
        catch (InterruptedException e) {
            LOGGER.warn("Awaiting termination of the scheduled executor was interrupted.", e);
        }
        this.writerTask.terminate();
    }

    private boolean enqueueRecord(IMonitoringRecord record) {
        for (int tryNumber = 0; tryNumber < 10; ++tryNumber) {
            boolean recordEnqueued = this.recordQueue.offer(record);
            if (!recordEnqueued) continue;
            return true;
        }
        LOGGER.error("Failed to add new monitoring record to queue (maximum number of attempts reached).");
        return false;
    }

    @Override
    public void writeMonitoringRecord(IMonitoringRecord record) {
        this.enqueueRecord(record);
    }

    public Queue<IMonitoringRecord> getRecordQueue() {
        return this.recordQueue;
    }
}

