/*
 * Decompiled with CFR 0.152.
 */
package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.PriorityQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.jackrabbit.guava.common.base.Preconditions;
import org.apache.jackrabbit.guava.common.base.Stopwatch;
import org.apache.jackrabbit.oak.commons.Compression;
import org.apache.jackrabbit.oak.commons.IOUtils;
import org.apache.jackrabbit.oak.commons.sort.ExternalSortByteArray;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.ConfigHelper;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.NodeStateHolder;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.NodeStateHolderFactory;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PathElementComparator;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy;
import org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreUtils;
import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
import org.apache.jackrabbit.oak.plugins.index.MetricsUtils;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PipelinedMergeSortTask
implements Callable<Result> {
    public static final String OAK_INDEXER_PIPELINED_EAGER_MERGE_TRIGGER_THRESHOLD = "oak.indexer.pipelined.eagerMergeTriggerThreshold";
    public static final int DEFAULT_OAK_INDEXER_PIPELINED_EAGER_MERGE_TRIGGER_THRESHOLD = 64;
    public static final String OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_FILES_TO_MERGE = "oak.indexer.pipelined.eagerMergeMaxFilesToMerge";
    public static final int DEFAULT_OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_FILES_TO_MERGE = 32;
    public static final String OAK_INDEXER_PIPELINED_EAGER_MERGE_MIN_FILES_TO_MERGE = "oak.indexer.pipelined.eagerMergeMinFilesToMerge";
    public static final int DEFAULT_OAK_INDEXER_PIPELINED_EAGER_MERGE_MIN_FILES_TO_MERGE = 4;
    public static final String OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_SIZE_TO_MERGE_MB = "oak.indexer.pipelined.eagerMergeMaxSizeToMergeMB";
    public static final int DEFAULT_OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_SIZE_TO_MERGE_MB = 2048;
    public static final String OAK_INDEXER_PIPELINED_EXTERNAL_MERGE_READ_BUFFER_SIZE = "oak.indexer.pipelined.externalMerge.readBufferSize";
    public static final int DEFAULT_OAK_INDEXER_PIPELINED_EXTERNAL_MERGE_READ_BUFFER_SIZE = 16384;
    private static final Logger LOG = LoggerFactory.getLogger(PipelinedMergeSortTask.class);
    private static final String THREAD_NAME = "mongo-merge-sort-files";
    private final Path storeDir;
    private final Comparator<NodeStateHolder> comparator;
    private final Compression algorithm;
    private final BlockingQueue<Path> sortedFilesQueue;
    private final StatisticsProvider statisticsProvider;
    private final PriorityQueue<PathAndSize> sortedFiles = new PriorityQueue();
    private final AtomicBoolean stopEagerMerging = new AtomicBoolean(false);
    private final int mergeTriggerThreshold;
    private final int minFilesToMerge;
    private final int maxFilesToMerge;
    private final int maxSizeToMergeMB;
    private final int externalMergeReadBufferSize;
    private int eagerMergeRuns;
    private int mergedFilesCounter = 0;

    public PipelinedMergeSortTask(Path storeDir, PathElementComparator pathComparator, Compression algorithm, BlockingQueue<Path> sortedFilesQueue, StatisticsProvider statisticsProvider) {
        this.storeDir = storeDir;
        this.comparator = (e1, e2) -> pathComparator.compare(e1.getPathElements(), e2.getPathElements());
        this.algorithm = algorithm;
        this.sortedFilesQueue = sortedFilesQueue;
        this.statisticsProvider = statisticsProvider;
        this.mergeTriggerThreshold = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PIPELINED_EAGER_MERGE_TRIGGER_THRESHOLD, 64);
        Preconditions.checkArgument((this.mergeTriggerThreshold >= 16 ? 1 : 0) != 0, (Object)("Invalid value for property oak.indexer.pipelined.eagerMergeTriggerThreshold: " + this.mergeTriggerThreshold + ". Must be >= 16"));
        this.minFilesToMerge = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PIPELINED_EAGER_MERGE_MIN_FILES_TO_MERGE, 4);
        Preconditions.checkArgument((this.minFilesToMerge >= 2 ? 1 : 0) != 0, (Object)("Invalid value for property oak.indexer.pipelined.eagerMergeMinFilesToMerge: " + this.minFilesToMerge + ". Must be >= 2"));
        this.maxFilesToMerge = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_FILES_TO_MERGE, 32);
        Preconditions.checkArgument((this.maxFilesToMerge >= this.minFilesToMerge ? 1 : 0) != 0, (Object)("Invalid value for property oak.indexer.pipelined.eagerMergeMaxFilesToMerge: " + this.maxFilesToMerge + ". Must be >= oak.indexer.pipelined.eagerMergeMinFilesToMerge (" + this.minFilesToMerge + ")"));
        this.maxSizeToMergeMB = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_SIZE_TO_MERGE_MB, 2048);
        Preconditions.checkArgument((this.maxSizeToMergeMB >= 1 ? 1 : 0) != 0, (Object)("Invalid value for property oak.indexer.pipelined.eagerMergeMaxSizeToMergeMB: " + this.maxSizeToMergeMB + ". Must be >= 1"));
        this.externalMergeReadBufferSize = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PIPELINED_EXTERNAL_MERGE_READ_BUFFER_SIZE, 16384);
        Preconditions.checkArgument(((long)this.externalMergeReadBufferSize >= 1024L ? 1 : 0) != 0, (Object)("Invalid value for property oak.indexer.pipelined.externalMerge.readBufferSize: " + this.externalMergeReadBufferSize + ". Must be >= 1 KB"));
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public Result call() throws Exception {
        this.eagerMergeRuns = 0;
        String originalName = Thread.currentThread().getName();
        Thread.currentThread().setName(THREAD_NAME);
        int intermediateFilesCount = 0;
        try {
            LOG.info("[TASK:{}:START] Starting merge sort task", (Object)THREAD_NAME.toUpperCase(Locale.ROOT));
            while (true) {
                LOG.debug("Waiting for next intermediate sorted file");
                Path sortedIntermediateFile = this.sortedFilesQueue.take();
                if (sortedIntermediateFile == PipelinedStrategy.SENTINEL_SORTED_FILES_QUEUE) {
                    long sortedFilesSizeBytes = PipelinedMergeSortTask.sizeOf(this.sortedFiles);
                    LOG.info("Going to sort {} files, total size {}", (Object)this.sortedFiles.size(), (Object)IOUtils.humanReadableByteCountBin((long)sortedFilesSizeBytes));
                    Stopwatch w = Stopwatch.createStarted();
                    List<Path> simpleFileList = this.sortedFiles.stream().map(f -> f.file).collect(Collectors.toList());
                    Path flatFileStore = this.sortStoreFile(simpleFileList);
                    LOG.info("Final merge completed in {}. Created file: {}", (Object)FormattingUtils.formatToSeconds((Stopwatch)w), (Object)flatFileStore.toAbsolutePath());
                    long ffsSizeBytes = Files.size(flatFileStore);
                    long durationSeconds = w.elapsed(TimeUnit.SECONDS);
                    String metrics = MetricsFormatter.newBuilder().add("duration", FormattingUtils.formatToSeconds((Stopwatch)w)).add("durationSeconds", durationSeconds).add("filesMerged", this.sortedFiles.size()).add("ffsSizeBytes", ffsSizeBytes).add("ffsSize", IOUtils.humanReadableByteCountBin((long)ffsSizeBytes)).build();
                    LOG.info("[TASK:{}:END] Metrics: {}", (Object)THREAD_NAME.toUpperCase(Locale.ROOT), (Object)metrics);
                    MetricsUtils.setCounterOnce((StatisticsProvider)this.statisticsProvider, (String)"oak_indexer_pipelined_mergeSortIntermediateFilesCount", (long)intermediateFilesCount);
                    MetricsUtils.setCounterOnce((StatisticsProvider)this.statisticsProvider, (String)"oak_indexer_pipelined_mergeSortEagerMergesRuns", (long)this.eagerMergeRuns);
                    MetricsUtils.setCounterOnce((StatisticsProvider)this.statisticsProvider, (String)"oak_indexer_pipelined_mergeSortFinalMergeFilesCount", (long)this.sortedFiles.size());
                    MetricsUtils.setCounterOnce((StatisticsProvider)this.statisticsProvider, (String)"oak_indexer_pipelined_mergeSortFinalMergeTime", (long)durationSeconds);
                    Result result = new Result(flatFileStore, intermediateFilesCount, this.sortedFiles.size(), this.eagerMergeRuns);
                    return result;
                }
                this.sortedFiles.add(new PathAndSize(sortedIntermediateFile, Files.size(sortedIntermediateFile)));
                ++intermediateFilesCount;
                LOG.info("Received new intermediate sorted file {}. Size: {}. Total files: {} of size {}", new Object[]{sortedIntermediateFile, IOUtils.humanReadableByteCountBin((long)Files.size(sortedIntermediateFile)), this.sortedFiles.size(), IOUtils.humanReadableByteCountBin((long)PipelinedMergeSortTask.sizeOf(this.sortedFiles))});
                if (this.stopEagerMerging.get()) {
                    LOG.debug("Skipping eager merging because download from Mongo has finished");
                    continue;
                }
                this.tryMergeIntermediateFilesEagerly();
                continue;
                break;
            }
        }
        catch (InterruptedException t) {
            LOG.warn("Thread interrupted", (Throwable)t);
            throw t;
        }
        catch (Throwable t) {
            LOG.warn("Thread terminating with exception", t);
            throw t;
        }
        finally {
            Thread.currentThread().setName(originalName);
        }
    }

    public void stopEagerMerging() {
        this.stopEagerMerging.set(true);
    }

    private static long sizeOf(PriorityQueue<PathAndSize> sortedFiles) {
        return sortedFiles.stream().mapToLong(f -> f.size).sum();
    }

    private void tryMergeIntermediateFilesEagerly() throws IOException {
        if (this.sortedFiles.size() < this.mergeTriggerThreshold) {
            return;
        }
        ArrayList<PathAndSize> filesAndSizeToSort = new ArrayList<PathAndSize>();
        long sumOfSizesBytes = 0L;
        while (!this.sortedFiles.isEmpty() && filesAndSizeToSort.size() < this.maxFilesToMerge && sumOfSizesBytes / 0x100000L < (long)this.maxSizeToMergeMB) {
            PathAndSize pathAndSize = this.sortedFiles.peek();
            if (pathAndSize.size / 0x100000L > (long)this.maxSizeToMergeMB) {
                LOG.debug("File {} is too large to be merged. Size: {}, max allowed: {} MB. Stopping searching for intermediate files to merge because all other files are larger.", new Object[]{pathAndSize.file.toAbsolutePath(), IOUtils.humanReadableByteCountBin((long)pathAndSize.size), this.maxSizeToMergeMB});
                break;
            }
            this.sortedFiles.poll();
            filesAndSizeToSort.add(pathAndSize);
            sumOfSizesBytes += pathAndSize.size;
        }
        if (filesAndSizeToSort.size() < this.minFilesToMerge) {
            this.sortedFiles.addAll(filesAndSizeToSort);
            LOG.debug("Not enough candidate files to merge. Found {} candidates of size {}, minimum for merging is {}", new Object[]{filesAndSizeToSort.size(), IOUtils.humanReadableByteCountBin((long)sumOfSizesBytes), this.minFilesToMerge});
            return;
        }
        LOG.info("Merge threshold reached: {} > {}. Going to merge the following {} files {} of total size {}.", new Object[]{this.sortedFiles.size() + filesAndSizeToSort.size(), this.mergeTriggerThreshold, filesAndSizeToSort.size(), filesAndSizeToSort.stream().map(fs -> fs.file.getFileName() + ": " + IOUtils.humanReadableByteCountBin((long)fs.size)).collect(Collectors.joining(", ", "[", "]")), IOUtils.humanReadableByteCountBin((long)sumOfSizesBytes)});
        Stopwatch start = Stopwatch.createStarted();
        Path mergedFiled = this.sortStoreFile(filesAndSizeToSort.stream().map(f -> f.file).collect(Collectors.toList()));
        ++this.eagerMergeRuns;
        Path destFile = mergedFiled.getParent().resolve("merged-" + this.mergedFilesCounter++);
        Files.move(mergedFiled, destFile, new CopyOption[0]);
        PathAndSize mergedPathAndSize = new PathAndSize(destFile, Files.size(destFile));
        this.sortedFiles.add(mergedPathAndSize);
        LOG.info("{} files merged in {} seconds. New file {}, size: {}", new Object[]{filesAndSizeToSort.size(), start.elapsed(TimeUnit.SECONDS), mergedPathAndSize.file.getFileName(), IOUtils.humanReadableByteCountBin((long)mergedPathAndSize.size)});
    }

    private Path sortStoreFile(List<Path> sortedFilesBatch) throws IOException {
        Path sortedFile = this.storeDir.resolve(IndexStoreUtils.getSortedStoreFileName(this.algorithm));
        try (OutputStream writer = IndexStoreUtils.createOutputStream(sortedFile, this.algorithm);){
            NodeStateHolderFactory byteArrayToType = new NodeStateHolderFactory();
            Function<NodeStateHolder, byte[]> typeToByteArray = holder -> holder == null ? null : holder.getLine();
            ExternalSortByteArray.mergeSortedFilesBinary(sortedFilesBatch, (OutputStream)writer, this.comparator, (boolean)true, (Compression)this.algorithm, typeToByteArray, (Function)byteArrayToType, (int)this.externalMergeReadBufferSize);
        }
        return sortedFile;
    }

    private static class PathAndSize
    implements Comparable<PathAndSize> {
        final Path file;
        final long size;

        public PathAndSize(Path file, long size) {
            this.file = file;
            this.size = size;
        }

        public String toString() {
            return "FileAndSize{file=" + this.file.toString() + ", size=" + IOUtils.humanReadableByteCountBin((long)this.size) + "}";
        }

        @Override
        public int compareTo(@NotNull PathAndSize other) {
            return Long.compare(this.size, other.size);
        }
    }

    public static class Result {
        private final Path flatFileStoreFile;
        private final int intermediateFilesCount;
        private final int finalMergeFilesCount;
        private final int eagerMergeRuns;

        public Result(Path flatFileStoreFile, int intermediateFilesCount, int finalMergeFilesCount, int eagerMergeRuns) {
            this.flatFileStoreFile = flatFileStoreFile;
            this.intermediateFilesCount = intermediateFilesCount;
            this.finalMergeFilesCount = finalMergeFilesCount;
            this.eagerMergeRuns = eagerMergeRuns;
        }

        public Path getFlatFileStoreFile() {
            return this.flatFileStoreFile;
        }

        public int getFinalMergeFilesCount() {
            return this.finalMergeFilesCount;
        }

        public int getEagerMergeRuns() {
            return this.eagerMergeRuns;
        }

        public int getIntermediateFilesCount() {
            return this.intermediateFilesCount;
        }

        public String toString() {
            return "Result{flatFileStoreFile=" + this.flatFileStoreFile + ", intermediateFilesCount=" + this.intermediateFilesCount + ", finalMergeFilesCount=" + this.finalMergeFilesCount + ", eagerMergeRuns=" + this.eagerMergeRuns + "}";
        }
    }
}

