/*
 * Decompiled with CFR 0.152.
 */
package org.apache.jackrabbit.oak.index.indexer.document.flatfile;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.guava.common.base.Stopwatch;
import org.apache.jackrabbit.guava.common.collect.Lists;
import org.apache.jackrabbit.oak.commons.Compression;
import org.apache.jackrabbit.oak.index.indexer.document.CompositeException;
import org.apache.jackrabbit.oak.index.indexer.document.LastModifiedRange;
import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntryTraverserFactory;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.MemoryManager;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.MergeRunner;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.NodeStateHolder;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.PathElementComparator;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.TraverseAndSortTask;
import org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreSortStrategyBase;
import org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreUtils;
import org.apache.jackrabbit.oak.plugins.document.mongo.TraversingRange;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Deprecated
public class MultithreadedTraverseWithSortStrategy
extends IndexStoreSortStrategyBase {
    private static final Logger log = LoggerFactory.getLogger(MultithreadedTraverseWithSortStrategy.class);
    private final File mergeDir;
    private final String mergeDirName = "merge";
    private final Comparator<NodeStateHolder> comparator;
    private final BlockingQueue<File> sortedFiles;
    private final ConcurrentLinkedQueue<Throwable> throwables;
    private final BlockingQueue<Callable<List<File>>> taskQueue;
    private final Phaser phaser;
    private final Phaser mergePhaser;
    private static final Callable<List<File>> POISON_PILL = () -> null;
    private final MemoryManager memoryManager;
    private final long dumpThreshold;

    @Deprecated
    MultithreadedTraverseWithSortStrategy(NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory, List<Long> lastModifiedBreakPoints, PathElementComparator pathComparator, BlobStore blobStore, File storeDir, List<File> existingDataDumpDirs, Compression algorithm, MemoryManager memoryManager, long dumpThreshold, Predicate<String> pathPredicate) throws IOException {
        super(storeDir, algorithm, pathPredicate, null, null);
        this.mergeDir = new File(storeDir, "merge");
        this.sortedFiles = new LinkedBlockingQueue<File>();
        this.throwables = new ConcurrentLinkedQueue();
        this.comparator = (e1, e2) -> pathComparator.compare((Iterable<String>)e1.getPathElements(), (Iterable<String>)e2.getPathElements());
        this.taskQueue = new LinkedBlockingQueue<Callable<List<File>>>();
        this.phaser = new Phaser(){

            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                return phase == Phases.WAITING_FOR_RESULTS.value && registeredParties == 0;
            }
        };
        this.mergePhaser = new Phaser(1);
        this.memoryManager = memoryManager;
        this.dumpThreshold = dumpThreshold;
        this.createInitialTasks(nodeStateEntryTraverserFactory, lastModifiedBreakPoints, blobStore, existingDataDumpDirs);
    }

    public MultithreadedTraverseWithSortStrategy(NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory, List<Long> lastModifiedBreakPoints, Set<String> preferredPaths, BlobStore blobStore, File storeDir, List<File> existingDataDumpDirs, Compression algorithm, MemoryManager memoryManager, long dumpThreshold, Predicate<String> pathPredicate, String checkpoint) throws IOException {
        super(storeDir, algorithm, pathPredicate, preferredPaths, checkpoint);
        this.mergeDir = new File(storeDir, "merge");
        this.sortedFiles = new LinkedBlockingQueue<File>();
        this.throwables = new ConcurrentLinkedQueue();
        this.comparator = (e1, e2) -> new PathElementComparator(preferredPaths).compare((Iterable<String>)e1.getPathElements(), (Iterable<String>)e2.getPathElements());
        this.taskQueue = new LinkedBlockingQueue<Callable<List<File>>>();
        this.phaser = new Phaser(){

            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                return phase == Phases.WAITING_FOR_RESULTS.value && registeredParties == 0;
            }
        };
        this.mergePhaser = new Phaser(1);
        this.memoryManager = memoryManager;
        this.dumpThreshold = dumpThreshold;
        this.createInitialTasks(nodeStateEntryTraverserFactory, lastModifiedBreakPoints, blobStore, existingDataDumpDirs);
    }

    void createInitialTasks(NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory, List<Long> lastModifiedBreakPoints, BlobStore blobStore, List<File> existingDataDumpDirs) throws IOException {
        ConcurrentLinkedQueue<String> completedTasks = new ConcurrentLinkedQueue<String>();
        if (existingDataDumpDirs != null && existingDataDumpDirs.size() > 0) {
            for (int i = 0; i < existingDataDumpDirs.size(); ++i) {
                File existingDataDumpDir = existingDataDumpDirs.get(i);
                File[] existingWorkDirs = existingDataDumpDir.listFiles();
                if (existingWorkDirs == null) {
                    throw new IllegalArgumentException("Could not obtain file from " + existingDataDumpDir.getPath());
                }
                for (File existingSortWorkDir : existingWorkDirs) {
                    if (!existingSortWorkDir.isDirectory()) {
                        log.info("Not a directory {}. Skipping it.", (Object)existingSortWorkDir.getAbsolutePath());
                        continue;
                    }
                    if (existingSortWorkDir.getName().equals("merge")) {
                        log.info("Intermediate Merge Directory {}. Skipping it.", (Object)existingSortWorkDir.getAbsolutePath());
                        DirectoryHelper.getDataFiles(existingSortWorkDir).forEach(file -> {
                            log.debug("Including existing intermediate merged file {}", (Object)file.getPath());
                            this.sortedFiles.add((File)file);
                        });
                        continue;
                    }
                    boolean downloadCompleted = DirectoryHelper.hasCompleted(existingSortWorkDir);
                    if (!downloadCompleted && i == existingDataDumpDirs.size() - 1) {
                        long start = DirectoryHelper.getLastModifiedLowerLimit(existingSortWorkDir);
                        long end = DirectoryHelper.getLastModifiedUpperLimit(existingSortWorkDir);
                        DirectoryHelper.SavedState savedState = DirectoryHelper.getIdOfLastDownloadedDocument(existingSortWorkDir);
                        if (savedState == null) {
                            this.addTask(new TraversingRange(new LastModifiedRange(start, end), null), nodeStateEntryTraverserFactory, blobStore, completedTasks);
                        } else {
                            start = savedState.lastModified;
                            this.addTask(new TraversingRange(new LastModifiedRange(start, start + 1L), savedState.id), nodeStateEntryTraverserFactory, blobStore, completedTasks);
                            if (end > start + 1L) {
                                this.addTask(new TraversingRange(new LastModifiedRange(start + 1L, end), null), nodeStateEntryTraverserFactory, blobStore, completedTasks);
                            }
                        }
                    }
                    log.info("Including existing sorted files from directory {} (hasCompleted={})", (Object)existingSortWorkDir.getAbsolutePath(), (Object)downloadCompleted);
                    DirectoryHelper.getDataFiles(existingSortWorkDir).forEach(file -> {
                        log.debug("Including existing sorted file {}", (Object)file.getName());
                        this.sortedFiles.add((File)file);
                    });
                }
            }
        } else {
            for (int i = 0; i < lastModifiedBreakPoints.size(); ++i) {
                long start = lastModifiedBreakPoints.get(i);
                long end = i < lastModifiedBreakPoints.size() - 1 ? lastModifiedBreakPoints.get(i + 1) : lastModifiedBreakPoints.get(i) + 1L;
                this.addTask(new TraversingRange(new LastModifiedRange(start, end), null), nodeStateEntryTraverserFactory, blobStore, completedTasks);
            }
        }
    }

    void addTask(TraversingRange range, NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory, BlobStore blobStore, ConcurrentLinkedQueue<String> completedTasks) throws IOException {
        this.taskQueue.add(new TraverseAndSortTask(range, this.comparator, blobStore, this.getStoreDir(), this.getAlgorithm(), completedTasks, this.taskQueue, this.phaser, nodeStateEntryTraverserFactory, this.memoryManager, this.dumpThreshold, this.sortedFiles, this.getPathPredicate()));
    }

    @Override
    public File createSortedStoreFile() throws CompositeException {
        String watcherThreadName = "watcher";
        String mergerThreadName = "merger";
        Thread watcher = new Thread((Runnable)new TaskRunner(), watcherThreadName);
        watcher.setDaemon(true);
        watcher.start();
        File sortedFile = new File(this.getStoreDir(), IndexStoreUtils.getSortedStoreFileName(this.getAlgorithm()));
        int threadPoolSize = Integer.getInteger("oak.indexer.mergeTaskThreadPoolSize", 1);
        int batchMergeSize = Integer.getInteger("oak.indexer.mergeTaskBatchSize", 64);
        MergeRunner mergeRunner = new MergeRunner(sortedFile, this.sortedFiles, this.mergeDir, this.comparator, this.mergePhaser, batchMergeSize, threadPoolSize, this.getAlgorithm());
        Thread merger = new Thread((Runnable)mergeRunner, mergerThreadName);
        merger.setDaemon(true);
        merger.start();
        this.phaser.awaitAdvance(Phases.WAITING_FOR_TASK_SPLITS.value);
        log.debug("All tasks completed. Signalling {} to proceed to result collection.", (Object)watcherThreadName);
        this.taskQueue.add(POISON_PILL);
        this.phaser.awaitAdvance(Phases.WAITING_FOR_RESULTS.value);
        if (!this.throwables.isEmpty()) {
            CompositeException exception = new CompositeException();
            for (Throwable throwable : this.throwables) {
                exception.addSuppressed(throwable);
            }
            this.sortedFiles.add(MergeRunner.MERGE_FORCE_STOP_POISON_PILL);
            this.mergePhaser.awaitAdvance(0);
            throw exception;
        }
        log.debug("Result collection complete. Proceeding to final merge.");
        Stopwatch w = Stopwatch.createStarted();
        this.sortedFiles.add(MergeRunner.MERGE_POISON_PILL);
        this.mergePhaser.awaitAdvance(0);
        log.info("Merging of sorted files completed in {}", (Object)w);
        return sortedFile;
    }

    @Override
    public long getEntryCount() {
        return 0L;
    }

    static class DirectoryHelper {
        private static final String PREFIX = "sort-work-dir-";
        private static final String LAST_MODIFIED_TIME_DELIMITER = "-from-";
        private static final String STATUS_FILE_NAME = "last-saved";
        private static final String LAST_MODIFIED_UPPER_LIMIT = "last-modified-upper-limit";
        private static final String COMPLETION_MARKER_FILE_NAME = "completed";
        private static final Logger log = LoggerFactory.getLogger(DirectoryHelper.class);

        DirectoryHelper() {
        }

        static File createdSortWorkDir(File storeDir, String taskID, long lastModifiedLowerBound, long lastModifiedUpperBound) throws IOException {
            File sortedFileDir = new File(storeDir, PREFIX + taskID + LAST_MODIFIED_TIME_DELIMITER + lastModifiedLowerBound);
            FileUtils.forceMkdir((File)sortedFileDir);
            DirectoryHelper.setLastModifiedUpperLimit(sortedFileDir, lastModifiedUpperBound);
            return sortedFileDir;
        }

        static long getLastModifiedLowerLimit(File dir) {
            if (!dir.isDirectory()) {
                throw new IllegalArgumentException(dir.getAbsolutePath() + " is not a directory");
            }
            return Long.parseLong(dir.getName().substring(dir.getName().lastIndexOf(LAST_MODIFIED_TIME_DELIMITER) + LAST_MODIFIED_TIME_DELIMITER.length()));
        }

        static void setLastModifiedUpperLimit(File sortWorkDir, long lastModifiedUpperLimit) throws IOException {
            Files.write(Paths.get(sortWorkDir.getAbsolutePath() + "/last-modified-upper-limit", new String[0]), ("" + lastModifiedUpperLimit).getBytes(), StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
        }

        static long getLastModifiedUpperLimit(File sortWorkDir) throws IOException {
            File lastModifiedUpperLimitFile = new File(sortWorkDir.getAbsolutePath() + "/last-modified-upper-limit");
            if (!lastModifiedUpperLimitFile.exists()) {
                throw new IOException("Could not find file containing last modified upper limit in " + sortWorkDir.getAbsolutePath());
            }
            return Long.parseLong(Files.readAllLines(lastModifiedUpperLimitFile.toPath()).get(0));
        }

        static void markCompleted(File sortWorkDir) {
            try {
                Files.write(Paths.get(sortWorkDir.getAbsolutePath() + "/completed", new String[0]), COMPLETION_MARKER_FILE_NAME.getBytes(), StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
            }
            catch (IOException e) {
                log.warn("Resuming download will not be accurate. Could not mark the directory " + sortWorkDir.getAbsolutePath() + " completed.", (Throwable)e);
            }
        }

        static boolean hasCompleted(File sortWorkDir) {
            return new File(sortWorkDir + "/completed").exists();
        }

        static void markLastProcessedStatus(File sortWorkDir, long lastDownloadedDocLastModified, String lastDownloadedDocID) {
            try {
                Files.write(Paths.get(sortWorkDir.getAbsolutePath() + "/last-saved", new String[0]), new SavedState(lastDownloadedDocLastModified, lastDownloadedDocID).serialize().getBytes(), StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
            }
            catch (IOException e) {
                log.warn("Resuming download will not be accurate. Could not save last processed status = " + lastDownloadedDocID + " in " + sortWorkDir.getAbsolutePath(), (Throwable)e);
            }
        }

        static SavedState getIdOfLastDownloadedDocument(File sortWorkDir) throws IOException {
            File statusFile = new File(sortWorkDir.getAbsolutePath() + "/last-saved");
            if (!statusFile.exists()) {
                return null;
            }
            return SavedState.deserialize(Files.readAllLines(statusFile.toPath()).get(0));
        }

        static Stream<File> getDataFiles(File sortWorkDir) {
            return Arrays.stream(sortWorkDir.listFiles()).filter(f -> !STATUS_FILE_NAME.equals(f.getName()) && !COMPLETION_MARKER_FILE_NAME.equals(f.getName()) && !LAST_MODIFIED_UPPER_LIMIT.equals(f.getName()));
        }

        static class SavedState {
            long lastModified;
            String id;

            public SavedState(long lastModified, String id) {
                this.lastModified = lastModified;
                this.id = id;
            }

            String serialize() {
                return this.lastModified + ":" + this.id;
            }

            static SavedState deserialize(String s) {
                int colonIndex = s.indexOf(":");
                if (colonIndex == -1) {
                    throw new IllegalArgumentException("Invalid serialized string " + s);
                }
                long lastMod = Long.parseLong(s.substring(0, colonIndex));
                String id = s.substring(colonIndex + 1);
                return new SavedState(lastMod, id);
            }
        }
    }

    private class TaskRunner
    implements Runnable {
        private final ExecutorService executorService;
        private final int threadPoolSize = Integer.getInteger("oak.indexer.dataDumpThreadPoolSize", 8);

        public TaskRunner() {
            this.executorService = Executors.newFixedThreadPool(this.threadPoolSize);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                log.info("Using a thread pool of size {}", (Object)this.threadPoolSize);
                ArrayList results = Lists.newArrayList();
                while (true) {
                    Callable<List<File>> task;
                    if ((task = MultithreadedTraverseWithSortStrategy.this.taskQueue.take()) == POISON_PILL) break;
                    results.add(this.executorService.submit(task));
                }
                log.debug("Won't wait for new tasks now.");
                log.debug("Registering to phaser and waiting for results now.");
                MultithreadedTraverseWithSortStrategy.this.phaser.register();
                try {
                    boolean exceptionsCaught = false;
                    for (Future result : results) {
                        try {
                            result.get();
                        }
                        catch (Throwable e) {
                            MultithreadedTraverseWithSortStrategy.this.throwables.add(e);
                            exceptionsCaught = true;
                        }
                    }
                    log.debug("Completed result collection {}. Arriving at phaser now.", (Object)(!exceptionsCaught ? "fully" : "partially"));
                }
                finally {
                    MultithreadedTraverseWithSortStrategy.this.phaser.arrive();
                }
            }
            catch (InterruptedException e) {
                log.error("Could not complete task submissions", (Throwable)e);
            }
            this.executorService.shutdown();
        }
    }

    private static enum Phases {
        WAITING_FOR_TASK_SPLITS(0),
        WAITING_FOR_RESULTS(1);

        private final int value;

        private Phases(int value) {
            this.value = value;
        }

        public int getValue() {
            return this.value;
        }
    }
}

