/*
 * Decompiled with CFR 0.152.
 */
package kieker.monitoring.writer.collector;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Queue;
import kieker.common.record.IMonitoringRecord;
import kieker.monitoring.writer.collector.ChunkingCollector;
import kieker.monitoring.writer.raw.IRawDataWriter;
import kieker.monitoring.writer.serializer.IMonitoringRecordSerializer;

/**
 * Task that drains records from the record queue of a {@link ChunkingCollector}, serializes them
 * chunk-wise into a reusable {@link ByteBuffer} and passes the serialized bytes to an
 * {@link IRawDataWriter}.
 *
 * <p>Each invocation of {@link #run()} writes full chunks of {@code outputChunkSize} records as
 * long as enough records are pending. If fewer records are pending, they are written as one
 * smaller chunk once the deferred-write delay has elapsed since the last write.
 */
class ChunkWriterTask
implements Runnable {
    /** Reusable output buffer the serializer writes into; reset before every chunk. */
    private final ByteBuffer buffer;
    private final IMonitoringRecordSerializer serializer;
    private final IRawDataWriter writer;
    /** Number of records that make up a full chunk. */
    private final int outputChunkSize;
    /** Deferred-write delay, converted from milliseconds to nanoseconds. */
    private final long deferredWriteDelayNs;
    /** {@link System#nanoTime()} value from which an incomplete chunk may be written. */
    private volatile long nextWriteTime;
    private final ChunkingCollector collector;

    /**
     * Creates a new writer task.
     *
     * @param collector            collector whose record queue is drained
     * @param outputChunkSize      number of records per full chunk
     * @param deferredWriteDelayMs delay in milliseconds after which an incomplete chunk is written
     * @param outputBufferSize     capacity in bytes of the internal output buffer
     * @param serializer           serializer used to encode each chunk into the output buffer
     * @param writer               writer receiving the serialized bytes
     */
    public ChunkWriterTask(ChunkingCollector collector, int outputChunkSize, int deferredWriteDelayMs, int outputBufferSize, IMonitoringRecordSerializer serializer, IRawDataWriter writer) {
        this.collector = collector;
        this.serializer = serializer;
        this.writer = writer;
        this.outputChunkSize = outputChunkSize;
        // Widen to long before multiplying so that large millisecond values cannot overflow int.
        this.deferredWriteDelayNs = (long)deferredWriteDelayMs * 1000000L;
        this.buffer = ByteBuffer.allocate(outputBufferSize);
        this.updateNextWriteTime();
    }

    /**
     * Writes all full chunks that are currently pending. If no full chunk is available, the
     * pending records are written as one incomplete chunk, provided the deferred-write deadline
     * has been reached.
     */
    @Override
    public void run() {
        final Queue<IMonitoringRecord> queue = this.collector.getRecordQueue();
        final int chunkSize = this.outputChunkSize;
        int numberOfPendingRecords = queue.size();

        if (numberOfPendingRecords >= chunkSize) {
            do {
                this.writeChunk(queue, chunkSize);
                numberOfPendingRecords = queue.size();
            } while (numberOfPendingRecords >= chunkSize);
            this.updateNextWriteTime();
            return;
        }

        final long currentTime = System.nanoTime();
        // nanoTime values may wrap around, so compare via the difference rather than directly.
        final boolean deadlineReached = currentTime - this.nextWriteTime >= 0L;
        if (numberOfPendingRecords > 0 && deadlineReached) {
            this.writeChunk(queue, numberOfPendingRecords);
            this.updateNextWriteTime(currentTime);
        }
    }

    /** Forwards the initialization callback to the writer and then to the serializer. */
    public void initialize() {
        this.writer.onInitialization();
        this.serializer.onInitialization();
    }

    /**
     * Flushes the pending records and then forwards the termination callback to the serializer
     * and to the writer, in that order.
     */
    public void terminate() {
        this.flush();
        this.serializer.onTermination();
        this.writer.onTermination();
    }

    /**
     * Writes the records that are pending at the time of the call, in chunks of at most
     * {@code outputChunkSize} records, regardless of the deferred-write deadline.
     */
    public void flush() {
        final Queue<IMonitoringRecord> queue = this.collector.getRecordQueue();
        final int chunkSize = this.outputChunkSize;
        int numberOfPendingRecords = queue.size();
        while (numberOfPendingRecords > 0) {
            final int currentChunkSize = Math.min(numberOfPendingRecords, chunkSize);
            final int drained = this.writeChunk(queue, currentChunkSize);
            if (drained < currentChunkSize) {
                // The queue ran empty before the size snapshot was used up; nothing is left.
                break;
            }
            numberOfPendingRecords -= currentChunkSize;
        }
    }

    /**
     * Removes up to {@code chunkSize} records from the queue, serializes them into the output
     * buffer and passes the serialized bytes to the writer. Nothing is serialized or written if
     * no record could be removed.
     *
     * @param queue     queue to take the records from
     * @param chunkSize maximum number of records to take
     * @return the number of records actually taken from the queue
     */
    private int writeChunk(Queue<IMonitoringRecord> queue, int chunkSize) {
        final ArrayList<IMonitoringRecord> chunk = new ArrayList<IMonitoringRecord>(chunkSize);
        for (int recordIndex = 0; recordIndex < chunkSize; ++recordIndex) {
            final IMonitoringRecord record = queue.poll();
            if (record == null) {
                // poll() yields null on an empty queue; never hand null records to the serializer.
                break;
            }
            chunk.add(record);
        }
        if (chunk.isEmpty()) {
            return 0;
        }

        final ByteBuffer outputBuffer = this.buffer;
        // clear() resets position AND limit; rewind() would keep a limit that was lowered earlier.
        outputBuffer.clear();
        final int bytesWritten = this.serializer.serializeRecords(chunk, outputBuffer);
        this.writer.writeData(outputBuffer, 0, bytesWritten);
        return chunk.size();
    }

    /** Schedules the next deferred write relative to the current {@link System#nanoTime()}. */
    private void updateNextWriteTime() {
        this.updateNextWriteTime(System.nanoTime());
    }

    /**
     * Schedules the next deferred write relative to the given time.
     *
     * @param currentTime reference time as obtained from {@link System#nanoTime()}
     */
    private void updateNextWriteTime(long currentTime) {
        this.nextWriteTime = currentTime + this.deferredWriteDelayNs;
    }
}

