/*
 * Decompiled with CFR 0.152.
 */
package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;

import com.mongodb.client.MongoDatabase;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.apache.jackrabbit.guava.common.base.Preconditions;
import org.apache.jackrabbit.guava.common.base.Stopwatch;
import org.apache.jackrabbit.guava.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.jackrabbit.oak.commons.Compression;
import org.apache.jackrabbit.oak.commons.IOUtils;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.NodeStateEntryWriter;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.ConfigHelper;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.NodeStateEntryBatch;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PathElementComparator;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedMergeSortTask;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedMongoDownloadTask;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedSortBatchTask;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedTransformTask;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.TransformStageStatistics;
import org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreSortStrategyBase;
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
import org.apache.jackrabbit.oak.plugins.index.MetricsUtils;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.filter.PathFilter;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PipelinedStrategy
extends IndexStoreSortStrategyBase {
    // ---- Configuration: system property names and the defaults the constructor falls back to ----

    /** System property: maximum size, in MB, of one batch of downloaded Mongo documents. Must be > 0. */
    public static final String OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_MAX_SIZE_MB = "oak.indexer.pipelined.mongoDocBatchMaxSizeMB";
    public static final int DEFAULT_OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_MAX_SIZE_MB = 4;
    /** System property: maximum number of documents in one batch of downloaded Mongo documents. Must be > 0. */
    public static final String OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_MAX_NUMBER_OF_DOCUMENTS = "oak.indexer.pipelined.mongoDocBatchMaxNumberOfDocuments";
    public static final int DEFAULT_OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_MAX_NUMBER_OF_DOCUMENTS = 10000;
    /**
     * System property: memory, in MB, reserved for the queue of downloaded document batches.
     * The queue capacity is derived from it as reservedMemory / batchMaxSize.
     */
    public static final String OAK_INDEXER_PIPELINED_MONGO_DOC_QUEUE_RESERVED_MEMORY_MB = "oak.indexer.pipelined.mongoDocQueueReservedMemoryMB";
    public static final int DEFAULT_OAK_INDEXER_PIPELINED_MONGO_DOC_QUEUE_RESERVED_MEMORY_MB = 128;
    /** System property: number of transform tasks submitted to the pipeline. Must be > 0. */
    public static final String OAK_INDEXER_PIPELINED_TRANSFORM_THREADS = "oak.indexer.pipelined.transformThreads";
    public static final int DEFAULT_OAK_INDEXER_PIPELINED_TRANSFORM_THREADS = 2;
    /**
     * System property: working memory, in MB, shared by the node state entry buffers and the
     * sort key array. The value 0 (the default) requests auto-detection from the maximum heap size.
     */
    public static final String OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB = "oak.indexer.pipelined.workingMemoryMB";
    public static final int DEFAULT_OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB = 0;
    /** System property: percentage (1-100) used when estimating the memory reserved for the sort key array. */
    public static final String OAK_INDEXER_PIPELINED_SORT_BUFFER_MEMORY_PERCENTAGE = "oak.indexer.pipelined.sortBufferMemoryPercentage";
    public static final int DEFAULT_OAK_INDEXER_PIPELINED_SORT_BUFFER_MEMORY_PERCENTAGE = 25;

    // ---- Sentinels: placed on a queue by createSortedStoreFile() once the producing stage has finished ----

    /** Put on the Mongo document queue, once per transform task, after the download task finishes. */
    static final NodeDocument[] SENTINEL_MONGO_DOCUMENT = new NodeDocument[0];
    /** Put on the non-empty batches queue after all transform tasks have finished. */
    static final NodeStateEntryBatch SENTINEL_NSE_BUFFER = new NodeStateEntryBatch(ByteBuffer.allocate(0), 0);
    /** Put on the sorted files queue after the sort-batch task has finished. */
    static final Path SENTINEL_SORTED_FILES_QUEUE = Paths.get("SENTINEL", new String[0]);

    // ---- Flat file store format constants (not referenced elsewhere in this class) ----

    static final Charset FLATFILESTORE_CHARSET = StandardCharsets.UTF_8;
    static final char FLATFILESTORE_LINE_SEPARATOR = '\n';
    // 124 is the ASCII code of '|'.
    static final byte FLATFILESTORE_DELIMITER = 124;

    private static final Logger LOG = LoggerFactory.getLogger(PipelinedStrategy.class);

    // ---- Validation limits applied by the constructor and by the working memory auto-detection ----

    private static final int MIN_MONGO_DOC_QUEUE_RESERVED_MEMORY_MB = 16;
    private static final int MIN_AUTODETECT_WORKING_MEMORY_MB = 128;
    private static final int MIN_ENTRY_BATCH_BUFFER_SIZE_MB = 32;
    private static final int MAX_AUTODETECT_WORKING_MEMORY_MB = 4000;

    // ---- Collaborators handed to the pipeline tasks ----

    private final MongoDocumentStore docStore;
    private final MongoDatabase mongoDatabase;
    private final DocumentNodeStore documentNodeStore;
    private final RevisionVector rootRevision;
    private final BlobStore blobStore;
    private final PathElementComparator pathComparator;
    private final List<PathFilter> pathFilters;
    private final StatisticsProvider statisticsProvider;

    // ---- Pipeline sizing, computed once in the constructor ----

    private final int numberOfTransformThreads;
    // Capacity (number of batches) of the queue between the download and the transform tasks.
    private final int mongoDocQueueSize;
    private final int mongoDocBatchMaxSizeMB;
    private final int mongoDocBatchMaxNumberOfDocuments;
    // Number of node state entry buffers: one more than the number of transform tasks.
    private final int nseBuffersCount;
    private final int nseBuffersSizeBytes;
    // Sum of the entry counts reported by the transform tasks; updated in createSortedStoreFile().
    private long nodeStateEntriesExtracted;

    /**
     * Logs, at INFO level, the current size of every pipeline queue and the formatted transform
     * stage statistics, then prints the transform stage histograms.
     *
     * @param mongoDocQueue            queue of downloaded document batches
     * @param emptyBuffersQueue        queue of entry batches available to be filled
     * @param nonEmptyBuffersQueue     queue of filled entry batches
     * @param sortedFilesQueue         queue of sorted intermediate files
     * @param transformStageStatistics statistics collected by the transform stage
     * @param printHistogramsAtInfo    {@code true} to log the histograms at INFO, {@code false} for DEBUG
     */
    private static <T> void printStatistics(ArrayBlockingQueue<T[]> mongoDocQueue,
                                            ArrayBlockingQueue<NodeStateEntryBatch> emptyBuffersQueue,
                                            ArrayBlockingQueue<NodeStateEntryBatch> nonEmptyBuffersQueue,
                                            ArrayBlockingQueue<Path> sortedFilesQueue,
                                            TransformStageStatistics transformStageStatistics,
                                            boolean printHistogramsAtInfo) {
        String queueSizes = MetricsFormatter.newBuilder()
                .add("mongoDocQueue", mongoDocQueue.size())
                .add("emptyBuffersQueue", emptyBuffersQueue.size())
                .add("nonEmptyBuffersQueue", nonEmptyBuffersQueue.size())
                .add("sortedFilesQueue", sortedFilesQueue.size())
                .build();
        LOG.info("Queue sizes: {}", queueSizes);

        // Held as Object so that the single-argument logger overload is always the one selected.
        Object transformStats = transformStageStatistics.formatStats();
        LOG.info("Transform stats: {}", transformStats);

        prettyPrintTransformStatisticsHistograms(transformStageStatistics, printHistogramsAtInfo);
    }

    /**
     * Logs the three histograms kept by the transform stage: hidden paths rejected, paths
     * filtered, and documents with an empty node state.
     *
     * @param transformStageStatistics statistics holding the histograms
     * @param printHistogramAtInfo     {@code true} to log at INFO level, {@code false} to log at DEBUG
     */
    private static void prettyPrintTransformStatisticsHistograms(TransformStageStatistics transformStageStatistics, boolean printHistogramAtInfo) {
        if (!printHistogramAtInfo) {
            Object hiddenPaths = transformStageStatistics.getHiddenPathsRejectedHistogram().prettyPrint();
            LOG.debug("Top hidden paths rejected: {}", hiddenPaths);
            Object filteredPaths = transformStageStatistics.getFilteredPathsRejectedHistogram().prettyPrint();
            LOG.debug("Top paths filtered: {}", filteredPaths);
            Object emptyNodeStates = transformStageStatistics.getEmptyNodeStateHistogram().prettyPrint();
            LOG.debug("Top empty node state documents: {}", emptyNodeStates);
            return;
        }
        Object hiddenPaths = transformStageStatistics.getHiddenPathsRejectedHistogram().prettyPrint();
        LOG.info("Top hidden paths rejected: {}", hiddenPaths);
        Object filteredPaths = transformStageStatistics.getFilteredPathsRejectedHistogram().prettyPrint();
        LOG.info("Top paths filtered: {}", filteredPaths);
        Object emptyNodeStates = transformStageStatistics.getEmptyNodeStateHistogram().prettyPrint();
        LOG.info("Top empty node state documents: {}", emptyNodeStates);
    }

    /**
     * Creates the strategy and computes the sizing of the pipeline from system properties.
     *
     * <p>The following is read and validated here:
     * <ul>
     *   <li>the memory reserved for the Mongo document queue (at least 16 MB and at least 8x the
     *       maximum batch size), from which the queue capacity is derived;</li>
     *   <li>the maximum size and number of documents of a download batch (both &gt; 0);</li>
     *   <li>the number of transform threads (&gt; 0);</li>
     *   <li>the sort buffer memory percentage (1 to 100);</li>
     *   <li>the working memory, which is split between the sort key array and
     *       {@code 1 + transformThreads} entry buffers. Each buffer must be at least 32 MB.</li>
     * </ul>
     *
     * @param documentStore         Mongo document store; must be read-only
     * @param mongoDatabase         database handed to the download task
     * @param documentNodeStore     node store handed to the transform tasks
     * @param rootRevision          root revision handed to the transform tasks
     * @param preferredPathElements path elements used to build the path comparator
     * @param blobStore             blob store used by the node state entry writers
     * @param storeDir              directory passed to the base class and used by the sort tasks
     * @param algorithm             compression algorithm passed to the base class
     * @param pathPredicate         path predicate passed to the base class
     * @param pathFilters           path filters handed to the download task
     * @param checkpoint            checkpoint passed to the base class
     * @param statisticsProvider    provider to which metrics are published
     * @throws IllegalStateException    if {@code documentStore} is not read-only
     * @throws IllegalArgumentException if any configuration value is out of range or the resulting
     *                                  entry buffers would be smaller than 32 MB
     */
    public PipelinedStrategy(MongoDocumentStore documentStore, MongoDatabase mongoDatabase, DocumentNodeStore documentNodeStore,
                             RevisionVector rootRevision, Set<String> preferredPathElements, BlobStore blobStore, File storeDir,
                             Compression algorithm, Predicate<String> pathPredicate, List<PathFilter> pathFilters, String checkpoint,
                             StatisticsProvider statisticsProvider) {
        super(storeDir, algorithm, pathPredicate, preferredPathElements, checkpoint);
        this.docStore = documentStore;
        this.mongoDatabase = mongoDatabase;
        this.documentNodeStore = documentNodeStore;
        this.rootRevision = rootRevision;
        this.blobStore = blobStore;
        this.pathComparator = new PathElementComparator(preferredPathElements);
        this.pathFilters = pathFilters;
        this.statisticsProvider = statisticsProvider;
        Preconditions.checkState(documentStore.isReadOnly(), "Traverser can only be used with readOnly store");

        int mongoDocQueueReservedMemoryMB = ConfigHelper.getSystemPropertyAsInt(
                OAK_INDEXER_PIPELINED_MONGO_DOC_QUEUE_RESERVED_MEMORY_MB, DEFAULT_OAK_INDEXER_PIPELINED_MONGO_DOC_QUEUE_RESERVED_MEMORY_MB);
        Preconditions.checkArgument(mongoDocQueueReservedMemoryMB >= MIN_MONGO_DOC_QUEUE_RESERVED_MEMORY_MB,
                "Invalid value for property " + OAK_INDEXER_PIPELINED_MONGO_DOC_QUEUE_RESERVED_MEMORY_MB + ": "
                        + mongoDocQueueReservedMemoryMB + ". Must be >= " + MIN_MONGO_DOC_QUEUE_RESERVED_MEMORY_MB);

        this.mongoDocBatchMaxSizeMB = ConfigHelper.getSystemPropertyAsInt(
                OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_MAX_SIZE_MB, DEFAULT_OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_MAX_SIZE_MB);
        Preconditions.checkArgument(this.mongoDocBatchMaxSizeMB > 0,
                "Invalid value for property " + OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_MAX_SIZE_MB + ": "
                        + this.mongoDocBatchMaxSizeMB + ". Must be > 0");

        this.mongoDocBatchMaxNumberOfDocuments = ConfigHelper.getSystemPropertyAsInt(
                OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_MAX_NUMBER_OF_DOCUMENTS, DEFAULT_OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_MAX_NUMBER_OF_DOCUMENTS);
        Preconditions.checkArgument(this.mongoDocBatchMaxNumberOfDocuments > 0,
                "Invalid value for property " + OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_MAX_NUMBER_OF_DOCUMENTS + ": "
                        + this.mongoDocBatchMaxNumberOfDocuments + ". Must be > 0");

        this.numberOfTransformThreads = ConfigHelper.getSystemPropertyAsInt(
                OAK_INDEXER_PIPELINED_TRANSFORM_THREADS, DEFAULT_OAK_INDEXER_PIPELINED_TRANSFORM_THREADS);
        Preconditions.checkArgument(this.numberOfTransformThreads > 0,
                "Invalid value for property " + OAK_INDEXER_PIPELINED_TRANSFORM_THREADS + ": "
                        + this.numberOfTransformThreads + ". Must be > 0");

        int sortBufferMemoryPercentage = ConfigHelper.getSystemPropertyAsInt(
                OAK_INDEXER_PIPELINED_SORT_BUFFER_MEMORY_PERCENTAGE, DEFAULT_OAK_INDEXER_PIPELINED_SORT_BUFFER_MEMORY_PERCENTAGE);
        // The message reports the rejected percentage itself (it previously printed the transform thread count).
        Preconditions.checkArgument(sortBufferMemoryPercentage > 0 && sortBufferMemoryPercentage <= 100,
                "Invalid value for property " + OAK_INDEXER_PIPELINED_SORT_BUFFER_MEMORY_PERCENTAGE + ": "
                        + sortBufferMemoryPercentage + ". Must be between 1 and 100");

        // The queue must be able to hold at least 8 full batches. Long arithmetic avoids an int
        // overflow letting an absurdly large batch size slip through the check.
        Preconditions.checkArgument(mongoDocQueueReservedMemoryMB >= 8L * this.mongoDocBatchMaxSizeMB,
                "Invalid values for properties " + OAK_INDEXER_PIPELINED_MONGO_DOC_QUEUE_RESERVED_MEMORY_MB
                        + " and " + OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_MAX_SIZE_MB + ": "
                        + OAK_INDEXER_PIPELINED_MONGO_DOC_QUEUE_RESERVED_MEMORY_MB + " must be at least 8x "
                        + OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_MAX_SIZE_MB + ", but are "
                        + mongoDocQueueReservedMemoryMB + " and " + this.mongoDocBatchMaxSizeMB + ", respectively");
        this.mongoDocQueueSize = mongoDocQueueReservedMemoryMB / this.mongoDocBatchMaxSizeMB;

        final long bytesPerMB = 1024L * 1024L;
        int nseWorkingMemoryMB = this.readNSEBuffersReservedMemory();
        this.nseBuffersCount = 1 + this.numberOfTransformThreads;
        long nseWorkingMemoryBytes = (long) nseWorkingMemoryMB * bytesPerMB;
        // Part of the working memory is set aside for the sort key array; the rest is split evenly among the buffers.
        long memoryReservedForSortKeysArray = estimateMaxSizeOfSortKeyArray(nseWorkingMemoryBytes, this.nseBuffersCount, sortBufferMemoryPercentage);
        long memoryReservedForBuffers = nseWorkingMemoryBytes - memoryReservedForSortKeysArray;
        this.nseBuffersSizeBytes = limitToIntegerRange(memoryReservedForBuffers / (long) this.nseBuffersCount);
        if ((long) this.nseBuffersSizeBytes < (long) MIN_ENTRY_BATCH_BUFFER_SIZE_MB * bytesPerMB) {
            throw new IllegalArgumentException("Entry batch buffer size too small: " + this.nseBuffersSizeBytes
                    + " bytes. Must be at least " + MIN_ENTRY_BATCH_BUFFER_SIZE_MB
                    + " MB. To increase the size of the buffers, either increase the size of the working memory region (system property "
                    + OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB + ") or decrease the number of transform threads ("
                    + OAK_INDEXER_PIPELINED_TRANSFORM_THREADS + ")");
        }

        LOG.info("MongoDocumentQueue: [ reservedMemory: {} MB, batchMaxSize: {} MB, queueSize: {} (reservedMemory/batchMaxSize) ]",
                mongoDocQueueReservedMemoryMB, this.mongoDocBatchMaxSizeMB, this.mongoDocQueueSize);
        LOG.info("NodeStateEntryBuffers: [ workingMemory: {} MB, numberOfBuffers: {}, bufferSize: {}, sortBufferReservedMemory: {} ]",
                nseWorkingMemoryMB, this.nseBuffersCount,
                IOUtils.humanReadableByteCountBin((long) this.nseBuffersSizeBytes),
                IOUtils.humanReadableByteCountBin(memoryReservedForSortKeysArray));
    }

    /**
     * Estimates the memory to reserve for the sort key array as a percentage of the approximate
     * size of a single entry buffer.
     *
     * @param nseWorkingMemoryBytes      total working memory, in bytes
     * @param nseBuffersCount            number of entry buffers the working memory is divided among
     * @param sortBufferMemoryPercentage percentage of one buffer's size to reserve
     * @return the estimated number of bytes to reserve
     */
    static long estimateMaxSizeOfSortKeyArray(long nseWorkingMemoryBytes, long nseBuffersCount, int sortBufferMemoryPercentage) {
        long perBufferBytes = nseWorkingMemoryBytes / nseBuffersCount;
        // The per-buffer size is capped to the int range before the percentage is applied.
        long cappedBufferBytes = limitToIntegerRange(perBufferBytes);
        return cappedBufferBytes * sortBufferMemoryPercentage / 100L;
    }

    /**
     * Reads the working memory size from the {@code oak.indexer.pipelined.workingMemoryMB} system
     * property. A value of 0 (the default) triggers auto-detection.
     *
     * @return the working memory in MB, either as configured or as auto-detected
     * @throws IllegalArgumentException if the configured value is negative
     */
    private int readNSEBuffersReservedMemory() {
        int configuredMB = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB, 0);
        Preconditions.checkArgument(configuredMB >= 0,
                "Invalid value for property oak.indexer.pipelined.workingMemoryMB: " + configuredMB + ". Must be >= 0");
        return configuredMB != 0 ? configuredMB : this.autodetectWorkingMemoryMB();
    }

    /**
     * Derives the working memory from the maximum heap size: the heap size minus 2048 MB,
     * clamped to the range allowed for auto-detection (128 MB to 4000 MB).
     *
     * <p>The computation is done in {@code long}. {@link Runtime#maxMemory()} returns
     * {@code Long.MAX_VALUE} when there is no inherent limit; narrowing that value to {@code int}
     * before clamping would wrap to a negative number and wrongly select the minimum.
     *
     * @return the auto-detected working memory in MB
     */
    private int autodetectWorkingMemoryMB() {
        long maxHeapSizeMB = Runtime.getRuntime().maxMemory() / (1024L * 1024L);
        long workingMemoryMB = maxHeapSizeMB - 2048L;
        LOG.info("Auto detecting working memory. Maximum heap size: {} MB, selected working memory: {} MB", maxHeapSizeMB, workingMemoryMB);
        if (workingMemoryMB > MAX_AUTODETECT_WORKING_MEMORY_MB) {
            LOG.warn("Auto-detected value for working memory too high, setting to the maximum allowed for auto-detection: {} MB", MAX_AUTODETECT_WORKING_MEMORY_MB);
            return MAX_AUTODETECT_WORKING_MEMORY_MB;
        }
        if (workingMemoryMB < MIN_AUTODETECT_WORKING_MEMORY_MB) {
            LOG.warn("Auto-detected value for working memory too low, setting to the minimum allowed for auto-detection: {} MB", MIN_AUTODETECT_WORKING_MEMORY_MB);
            return MIN_AUTODETECT_WORKING_MEMORY_MB;
        }
        // Safe narrowing: the value is within [128, 4000] at this point.
        return (int) workingMemoryMB;
    }

    /**
     * Narrows a buffer size to {@code int}. Values above {@code Integer.MAX_VALUE} are replaced
     * by {@code Integer.MAX_VALUE - 16} and a warning is logged.
     *
     * @param bufferSizeBytes the computed buffer size, in bytes
     * @return the size as an {@code int}, capped when it does not fit
     */
    private static int limitToIntegerRange(long bufferSizeBytes) {
        if (bufferSizeBytes <= Integer.MAX_VALUE) {
            return (int) bufferSizeBytes;
        }
        int cappedSize = Integer.MAX_VALUE - 16;
        LOG.warn("Computed buffer size too big: {}, exceeds Integer.MAX_VALUE. Truncating to: {}", bufferSizeBytes, cappedSize);
        return cappedSize;
    }

    /**
     * Runs the pipeline and returns the resulting flat file store file.
     *
     * <p>Four kinds of tasks are submitted to a fixed thread pool: one download task, the
     * configured number of transform tasks, one sort-batch task and one merge-sort task. They are
     * connected by bounded queues. This method waits for the tasks to complete and, as each stage
     * finishes, puts the matching sentinel on the queue feeding the next stage:
     * <ul>
     *   <li>download finished: one {@code SENTINEL_MONGO_DOCUMENT} per transform task, and the
     *       merge-sort task is told to stop eager merging;</li>
     *   <li>all transform tasks finished: {@code SENTINEL_NSE_BUFFER};</li>
     *   <li>sort-batch finished: {@code SENTINEL_SORTED_FILES_QUEUE}, and the entry buffers are
     *       dropped from the empty batches queue;</li>
     *   <li>merge-sort finished: its output file becomes the return value.</li>
     * </ul>
     * A monitor task periodically logs queue sizes and transform statistics while the pipeline runs.
     *
     * @return the flat file store file produced by the merge-sort task
     * @throws IOException      declared by the overridden method
     * @throws RuntimeException if any task fails (wrapping the task's failure), if an unknown result
     *                          type is received, or if the calling thread is interrupted while
     *                          waiting; in the latter case the thread's interrupt flag is restored
     */
    @Override
    public File createSortedStoreFile() throws IOException {
        // One download task + N transform tasks + one sort-batch task + one merge-sort task.
        int numberOfThreads = 1 + this.numberOfTransformThreads + 1 + 1;
        ExecutorService threadPool = Executors.newFixedThreadPool(numberOfThreads,
                new ThreadFactoryBuilder().setNameFormat("mongo-dump").setDaemon(true).build());
        // The tasks yield different result types; results are dispatched by runtime type below.
        ExecutorCompletionService<Object> ecs = new ExecutorCompletionService<Object>(threadPool);
        ScheduledExecutorService monitorThreadPool = Executors.newScheduledThreadPool(1,
                new ThreadFactoryBuilder().setNameFormat("monitor").setDaemon(true).build());
        try {
            ArrayBlockingQueue<NodeDocument[]> mongoDocQueue = new ArrayBlockingQueue<NodeDocument[]>(this.mongoDocQueueSize);
            ArrayBlockingQueue<NodeStateEntryBatch> emptyBatchesQueue = new ArrayBlockingQueue<NodeStateEntryBatch>(this.nseBuffersCount);
            // One extra slot so the sentinel can be added even when all buffers are queued.
            ArrayBlockingQueue<NodeStateEntryBatch> nonEmptyBatchesQueue = new ArrayBlockingQueue<NodeStateEntryBatch>(this.nseBuffersCount + 1);
            ArrayBlockingQueue<Path> sortedFilesQueue = new ArrayBlockingQueue<Path>(64);
            TransformStageStatistics transformStageStatistics = new TransformStageStatistics();
            ScheduledFuture<?> monitorFuture = monitorThreadPool.scheduleWithFixedDelay(
                    new MonitorTask<NodeDocument>(mongoDocQueue, emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics),
                    10L, 30L, TimeUnit.SECONDS);
            for (int i = 0; i < this.nseBuffersCount; ++i) {
                emptyBatchesQueue.add(NodeStateEntryBatch.createNodeStateEntryBatch(this.nseBuffersSizeBytes, Integer.MAX_VALUE));
            }
            LOG.info("[TASK:PIPELINED-DUMP:START] Starting to build FFS");
            Stopwatch start = Stopwatch.createStarted();
            ecs.submit(new PipelinedMongoDownloadTask(this.mongoDatabase, this.docStore,
                    (int) ((long) this.mongoDocBatchMaxSizeMB * 0x100000L), this.mongoDocBatchMaxNumberOfDocuments,
                    mongoDocQueue, this.pathFilters, this.statisticsProvider));
            for (int i = 0; i < this.numberOfTransformThreads; ++i) {
                NodeStateEntryWriter entryWriter = new NodeStateEntryWriter(this.blobStore);
                ecs.submit(new PipelinedTransformTask(this.docStore, this.documentNodeStore, this.rootRevision, this.getPathPredicate(),
                        entryWriter, mongoDocQueue, emptyBatchesQueue, nonEmptyBatchesQueue, transformStageStatistics));
            }
            ecs.submit(new PipelinedSortBatchTask(this.getStoreDir().toPath(), this.pathComparator, this.getAlgorithm(),
                    emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue));
            PipelinedMergeSortTask mergeSortTask = new PipelinedMergeSortTask(this.getStoreDir().toPath(), this.pathComparator,
                    this.getAlgorithm(), sortedFilesQueue, this.statisticsProvider);
            ecs.submit(mergeSortTask);

            Path flatFileStore = null;
            try {
                LOG.info("Waiting for tasks to complete");
                int transformTasksFinished = 0;
                for (int tasksFinished = 0; tasksFinished < numberOfThreads; ++tasksFinished) {
                    Future<Object> completedTask = ecs.take();
                    try {
                        Object result = completedTask.get();
                        if (result instanceof PipelinedMongoDownloadTask.Result) {
                            PipelinedMongoDownloadTask.Result downloadResult = (PipelinedMongoDownloadTask.Result) result;
                            LOG.info("Download task finished. Documents downloaded: {}", downloadResult.getDocumentsDownloaded());
                            // One sentinel per transform task, so that each of them sees the end of the input.
                            for (int i = 0; i < this.numberOfTransformThreads; ++i) {
                                mongoDocQueue.put(SENTINEL_MONGO_DOCUMENT);
                            }
                            mergeSortTask.stopEagerMerging();
                            MetricsUtils.setCounterOnce(this.statisticsProvider, "oak_indexer_pipelined_documentsDownloaded", downloadResult.getDocumentsDownloaded());
                        } else if (result instanceof PipelinedTransformTask.Result) {
                            PipelinedTransformTask.Result transformResult = (PipelinedTransformTask.Result) result;
                            this.nodeStateEntriesExtracted += transformResult.getEntryCount();
                            LOG.info("Transform task {} finished. Entries processed: {}", transformResult.getThreadId(), transformResult.getEntryCount());
                            transformTasksFinished++;
                            if (transformTasksFinished == this.numberOfTransformThreads) {
                                LOG.info("All transform tasks finished. Total entries processed: {}", this.nodeStateEntriesExtracted);
                                // Nothing else will change the transform statistics, so stop the periodic report.
                                monitorFuture.cancel(false);
                                nonEmptyBatchesQueue.put(SENTINEL_NSE_BUFFER);
                                transformStageStatistics.publishStatistics(this.statisticsProvider);
                            }
                        } else if (result instanceof PipelinedSortBatchTask.Result) {
                            PipelinedSortBatchTask.Result sortTaskResult = (PipelinedSortBatchTask.Result) result;
                            LOG.info("Sort batch task finished. Entries processed: {}", sortTaskResult.getTotalEntries());
                            sortedFilesQueue.put(SENTINEL_SORTED_FILES_QUEUE);
                            // Report the queue that is actually being checked (the message previously
                            // named and sized emptyBatchesQueue).
                            if (!nonEmptyBatchesQueue.isEmpty()) {
                                LOG.warn("nonEmptyBatchesQueue is not empty. Size: {}", nonEmptyBatchesQueue.size());
                            }
                            // The entry buffers are not used past this point; drop the queue's references to them.
                            emptyBatchesQueue.clear();
                            printStatistics(mongoDocQueue, emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics, true);
                        } else if (result instanceof PipelinedMergeSortTask.Result) {
                            PipelinedMergeSortTask.Result mergeSortedFilesTask = (PipelinedMergeSortTask.Result) result;
                            Path ffs = mergeSortedFilesTask.getFlatFileStoreFile();
                            LOG.info("Merge-sort sort task finished. FFS: {}, Size: {}", ffs, IOUtils.humanReadableByteCountBin(Files.size(ffs)));
                            flatFileStore = mergeSortedFilesTask.getFlatFileStoreFile();
                        } else {
                            throw new RuntimeException("Unknown result type: " + result);
                        }
                    } catch (ExecutionException ex) {
                        // A task failed: stop the remaining tasks and surface the task's own exception.
                        LOG.warn("Execution error dumping from MongoDB: " + ex + ". Shutting down all threads");
                        threadPool.shutdownNow();
                        boolean terminated = threadPool.awaitTermination(5L, TimeUnit.SECONDS);
                        if (!terminated) {
                            LOG.warn("Thread pool failed to terminate");
                        }
                        throw new RuntimeException(ex.getCause());
                    } catch (Throwable ex) {
                        LOG.warn("Error dumping from MongoDB: " + ex);
                        threadPool.shutdownNow();
                        boolean terminated = threadPool.awaitTermination(5L, TimeUnit.SECONDS);
                        if (!terminated) {
                            LOG.warn("Thread pool failed to terminate");
                        }
                        throw new RuntimeException(ex);
                    }
                }
                LOG.info("[TASK:PIPELINED-DUMP:END] Metrics: {}", MetricsFormatter.newBuilder()
                        .add("duration", FormattingUtils.formatToSeconds(start))
                        .add("durationSeconds", start.elapsed(TimeUnit.SECONDS))
                        .add("nodeStateEntriesExtracted", this.nodeStateEntriesExtracted)
                        .build());
                printStatistics(mongoDocQueue, emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics, true);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so that callers can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } finally {
                monitorFuture.cancel(true);
            }
            return flatFileStore.toFile();
        } finally {
            threadPool.shutdown();
            monitorThreadPool.shutdown();
        }
    }

    /**
     * Returns the number of node state entries extracted so far, that is, the sum of the entry
     * counts reported by the transform tasks that have finished.
     *
     * @return the accumulated entry count
     */
    @Override
    public long getEntryCount() {
        return nodeStateEntriesExtracted;
    }

    private static class MonitorTask<T>
    implements Runnable {
        private final ArrayBlockingQueue<T[]> mongoDocQueue;
        private final ArrayBlockingQueue<NodeStateEntryBatch> emptyBatchesQueue;
        private final ArrayBlockingQueue<NodeStateEntryBatch> nonEmptyBatchesQueue;
        private final ArrayBlockingQueue<Path> sortedFilesQueue;
        private final TransformStageStatistics transformStageStatistics;

        public MonitorTask(ArrayBlockingQueue<T[]> mongoDocQueue, ArrayBlockingQueue<NodeStateEntryBatch> emptyBatchesQueue, ArrayBlockingQueue<NodeStateEntryBatch> nonEmptyBatchesQueue, ArrayBlockingQueue<Path> sortedFilesQueue, TransformStageStatistics transformStageStatistics) {
            this.mongoDocQueue = mongoDocQueue;
            this.emptyBatchesQueue = emptyBatchesQueue;
            this.nonEmptyBatchesQueue = nonEmptyBatchesQueue;
            this.sortedFilesQueue = sortedFilesQueue;
            this.transformStageStatistics = transformStageStatistics;
        }

        @Override
        public void run() {
            try {
                PipelinedStrategy.printStatistics(this.mongoDocQueue, this.emptyBatchesQueue, this.nonEmptyBatchesQueue, this.sortedFilesQueue, this.transformStageStatistics, false);
            }
            catch (Exception e) {
                LOG.error("Error while logging queue sizes", (Throwable)e);
            }
        }
    }
}

