/*
 * Decompiled with CFR 0.152.
 */
package com.xceptance.xlt.report;

import com.xceptance.common.util.SynchronizingCounter;
import com.xceptance.common.util.concurrent.DaemonThreadFactory;
import com.xceptance.xlt.engine.util.TimerUtils;
import java.io.BufferedReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Base class for readers that parse input line by line and push the parsed items through a
 * two-stage background pipeline.
 *
 * <p>Data flow, as implemented below:
 * <ol>
 * <li>{@link #read(BufferedReader)} turns each line into an item via {@link #processLine(String)}
 * and collects non-null items into chunks of up to {@code MAX_CHUNK_SIZE} items.</li>
 * <li>Full chunks are put on a bounded queue and taken by the pre-processor threads, which call
 * {@link #preprocessLineResult(Object)} for every item.</li>
 * <li>Pre-processed chunks are put on a second bounded queue and taken by the single processor
 * thread, which calls {@link #processLineResult(Object)} for every item.</li>
 * </ol>
 *
 * <p>Both queues are bounded, so a producer blocks while the next stage is lagging behind.
 * NOTE(review): with more than one pre-processor thread, chunks may reach the processor in a
 * different order than they were read.
 *
 * @param <T> the type of the item produced from a single input line
 */
public abstract class AbstractReader<T> {
    /** Number of items collected into one chunk before it is handed to the pipeline. */
    private static final int MAX_CHUNK_SIZE = 1000;

    /** Maximum number of chunks each hand-over queue can hold before producers block. */
    private static final int QUEUE_CAPACITY = 100;

    /** Thread running the final processing stage. */
    private final Thread processorThread;

    /** Chunk currently being filled by the reading side. */
    private List<T> workList = new ArrayList<T>(MAX_CHUNK_SIZE);

    /** Chunks that have been parsed and are waiting for pre-processing. */
    private final BlockingQueue<List<T>> parsedDataRecordChunkQueue = new ArrayBlockingQueue<List<T>>(QUEUE_CAPACITY);

    /** Chunks that have been pre-processed and are waiting for processing. */
    private final BlockingQueue<List<T>> preprocessedDataRecordChunkQueue = new ArrayBlockingQueue<List<T>>(QUEUE_CAPACITY);

    /** Executor running the pre-processing stage. */
    private final ExecutorService preprocessorExecutor;

    /** Counts chunks that were queued but not yet completely processed. */
    private final SynchronizingCounter chunksToBeProcessed = new SynchronizingCounter();

    /** Total number of lines read so far, accumulated over all {@link #read} calls. */
    private int lineCount = 0;

    /** Start time used as the reference point by the overall statistics output. */
    private long overallStartTime;

    /** Duration of the most recent {@link #read} call, as measured with {@code TimerUtils.getTime()}. */
    private long readTime = 0L;

    /**
     * Creates a reader with a single pre-processor thread.
     *
     * @param processorThreadName name for the processor thread; if {@code null}, the thread keeps
     *                            its default name
     */
    public AbstractReader(String processorThreadName) {
        this(processorThreadName, 1);
    }

    /**
     * Creates a reader and starts its background workers (all daemon threads).
     *
     * @param processorThreadName     name for the processor thread; if {@code null}, the thread
     *                                keeps its default name
     * @param preprocessorThreadCount number of pre-processor threads to run
     * @throws IllegalArgumentException if {@code preprocessorThreadCount} is not positive (raised
     *                                  by {@link Executors#newFixedThreadPool})
     */
    public AbstractReader(String processorThreadName, int preprocessorThreadCount) {
        this.processorThread = new Thread(new Processor());
        if (processorThreadName != null) {
            this.processorThread.setName(processorThreadName);
        }
        this.processorThread.setDaemon(true);

        // Create the pool before starting any thread: an invalid thread count makes this call
        // throw, and no already-running processor thread must be left behind in that case.
        this.preprocessorExecutor = Executors.newFixedThreadPool(preprocessorThreadCount, new DaemonThreadFactory("DataRecordPreprocessor-"));

        this.processorThread.start();
        for (int i = 0; i < preprocessorThreadCount; ++i) {
            this.preprocessorExecutor.execute(new Preprocessor());
        }
    }

    /**
     * Reads all lines from the given reader, parses each one via {@link #processLine(String)} and
     * feeds the non-null results into the processing pipeline. Any partially filled chunk is
     * flushed at the end. The reader is not closed by this method.
     *
     * @param bufferedReader the source to read lines from
     * @throws Exception if reading, parsing or queuing fails
     */
    public void read(BufferedReader bufferedReader) throws Exception {
        long startTime = TimerUtils.getTime();
        String line;
        while ((line = bufferedReader.readLine()) != null) {
            ++this.lineCount;
            T item = this.processLine(line);
            if (item != null) {
                this.addToChunk(item);
            }
        }
        this.finishChunk();
        this.readTime = TimerUtils.getTime() - startTime;
    }

    /**
     * Stops the background workers by interrupting the processor thread and shutting down the
     * pre-processor executor. Chunks still queued at that point are not processed.
     */
    public void cleanUp() {
        this.processorThread.interrupt();
        this.preprocessorExecutor.shutdownNow();
    }

    /** Prints the number of lines read and the time elapsed since the overall start time. */
    private void printOverallStatistics() {
        System.out.printf("Data records read: %,d (%,d ms)\n", this.getLineCount(), TimerUtils.getTime() - this.getOverallStartTime());
    }

    /**
     * Returns the number of lines read so far.
     *
     * @return the accumulated line count
     */
    protected int getLineCount() {
        return this.lineCount;
    }

    /**
     * Returns the duration of the most recent {@link #read} call.
     *
     * @return the read time, in the unit of {@code TimerUtils.getTime()} (presumably milliseconds)
     */
    protected long getReadTime() {
        return this.readTime;
    }

    /**
     * Parses a single input line. Called on the thread that invoked {@link #read}.
     *
     * @param line the line to parse
     * @return the parsed item, or {@code null} if the line yields nothing to process
     */
    protected abstract T processLine(String line);

    /**
     * Hook invoked on a pre-processor thread for every parsed item. The default implementation
     * does nothing.
     *
     * @param t the item to pre-process
     */
    protected void preprocessLineResult(T t) {
    }

    /**
     * Invoked on the processor thread for every pre-processed item.
     *
     * @param t the item to process
     */
    protected abstract void processLineResult(T t);

    /**
     * Sets the reference start time used by the overall statistics output.
     *
     * @param overallStartTime the start time
     */
    protected void setOverallStartTime(long overallStartTime) {
        this.overallStartTime = overallStartTime;
    }

    /**
     * Returns the reference start time used by the overall statistics output.
     *
     * @return the start time
     */
    protected long getOverallStartTime() {
        return this.overallStartTime;
    }

    /**
     * Waits until the counter of outstanding chunks reaches zero, then prints the overall
     * statistics.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    protected void waitForDataRecordProcessingToComplete() throws InterruptedException {
        this.chunksToBeProcessed.awaitZero();
        this.printOverallStatistics();
    }

    /**
     * Adds an item to the current chunk and hands the chunk to the pipeline once it is full.
     *
     * @param t the item to add
     * @throws Exception if queuing the chunk fails, e.g. because the thread was interrupted
     */
    protected void addToChunk(T t) throws Exception {
        this.workList.add(t);
        if (this.workList.size() >= MAX_CHUNK_SIZE) {
            this.addChunkToQueue();
        }
    }

    /**
     * Hands the current chunk to the pipeline if it contains any items.
     *
     * @throws Exception if queuing the chunk fails, e.g. because the thread was interrupted
     */
    protected void finishChunk() throws Exception {
        if (!this.workList.isEmpty()) {
            this.addChunkToQueue();
        }
    }

    /**
     * Queues the current chunk for pre-processing and starts a fresh one.
     *
     * @throws InterruptedException if interrupted while waiting for queue space; the current
     *                              chunk is kept and the outstanding-chunk counter is left unchanged
     */
    private void addChunkToQueue() throws InterruptedException {
        // Count the chunk before it becomes visible to the workers; otherwise the processor
        // could decrement the counter before this thread has incremented it.
        this.chunksToBeProcessed.increment();
        try {
            this.parsedDataRecordChunkQueue.put(this.workList);
        } catch (InterruptedException e) {
            // The chunk never entered the pipeline, so undo the increment to keep
            // waitForDataRecordProcessingToComplete() from waiting for it forever.
            this.chunksToBeProcessed.decrement();
            throw e;
        }
        this.workList = new ArrayList<T>(MAX_CHUNK_SIZE);
    }

    /**
     * Final pipeline stage: takes pre-processed chunks, passes every item to
     * {@link AbstractReader#processLineResult(Object)} and marks the chunk as done.
     */
    private class Processor implements Runnable {
        /**
         * Processes chunks until the thread is interrupted.
         *
         * NOTE(review): an unchecked exception thrown by processLineResult() ends this thread
         * without decrementing the counter, which would leave waiters blocked — confirm whether
         * that is intended.
         */
        @Override
        public void run() {
            try {
                while (true) {
                    List<T> dataRecords = AbstractReader.this.preprocessedDataRecordChunkQueue.take();
                    for (T dataRecord : dataRecords) {
                        AbstractReader.this.processLineResult(dataRecord);
                    }
                    AbstractReader.this.chunksToBeProcessed.decrement();
                }
            } catch (InterruptedException e) {
                // Interruption is the shutdown signal (see cleanUp()); restore the flag and exit.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Intermediate pipeline stage: takes parsed chunks, passes every item to
     * {@link AbstractReader#preprocessLineResult(Object)} and forwards the chunk to the processor.
     */
    private class Preprocessor implements Runnable {
        /** Pre-processes chunks until the thread is interrupted. */
        @Override
        public void run() {
            try {
                while (true) {
                    List<T> dataRecords = AbstractReader.this.parsedDataRecordChunkQueue.take();
                    for (T dataRecord : dataRecords) {
                        AbstractReader.this.preprocessLineResult(dataRecord);
                    }
                    AbstractReader.this.preprocessedDataRecordChunkQueue.put(dataRecords);
                }
            } catch (InterruptedException e) {
                // Interruption is the shutdown signal (see cleanUp()); restore the flag and exit.
                Thread.currentThread().interrupt();
            }
        }
    }
}

