/*
 * Decompiled with CFR 0.152.
 */
package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Locale;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import org.apache.jackrabbit.guava.common.base.Stopwatch;
import org.apache.jackrabbit.oak.commons.IOUtils;
import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.NodeStateEntryWriter;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.NodeStateEntryBatch;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedUtils;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.TransformStageStatistics;
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeState;
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
import org.apache.jackrabbit.oak.plugins.document.Path;
import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
import org.apache.jackrabbit.oak.plugins.document.cache.NodeDocumentCache;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStoreHelper;
import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Transform task of the pipelined flat-file build: takes batches of {@link NodeDocument}s from
 * {@code mongoDocQueue}, converts each document into zero or more {@link NodeStateEntry}s,
 * serializes the accepted entries to JSON and appends them to a {@link NodeStateEntryBatch}.
 * <p>
 * Batches are obtained from {@code emptyBatchesQueue}; when a batch reports that it is full it is
 * flipped and handed over through {@code nonEmptyBatchesQueue}, and a fresh one is taken. The task
 * runs until it dequeues {@link PipelinedStrategy#SENTINEL_MONGO_DOCUMENT} (compared by identity),
 * at which point the batch currently being filled is enqueued and a {@link Result} is returned.
 */
class PipelinedTransformTask implements Callable<PipelinedTransformTask.Result> {
    private static final Logger LOG = LoggerFactory.getLogger(PipelinedTransformTask.class);
    private static final AtomicInteger threadIdGenerator = new AtomicInteger();
    private static final String THREAD_NAME_PREFIX = "mongo-transform-";
    /** A progress line is logged every time this many Mongo documents have been processed. */
    private static final long PROGRESS_LOG_INTERVAL = 50000L;
    /** Number of bytes in a mebibyte, used to report batch sizes in the progress line. */
    private static final long BYTES_PER_MB = 1024L * 1024L;

    private final MongoDocumentStore mongoStore;
    private final DocumentNodeStore documentNodeStore;
    private final RevisionVector rootRevision;
    private final NodeStateEntryWriter entryWriter;
    private final Predicate<String> pathPredicate;
    private final ArrayBlockingQueue<NodeDocument[]> mongoDocQueue;
    private final ArrayBlockingQueue<NodeStateEntryBatch> nonEmptyBatchesQueue;
    private final ArrayBlockingQueue<NodeStateEntryBatch> emptyBatchesQueue;
    private final TransformStageStatistics statistics;
    /** Identifier assigned at construction time from a counter shared by all instances. */
    private final int threadId = threadIdGenerator.getAndIncrement();
    /** Accumulated time spent blocked in {@code nonEmptyBatchesQueue.put}. */
    private long totalEnqueueDelayMillis = 0L;
    /** Accumulated time spent waiting for a replacement batch after the current one filled up. */
    private long totalEmptyBatchQueueWaitTimeMillis = 0L;

    /**
     * Creates a transform task.
     *
     * @param mongoStore           store whose node document cache is populated with every processed document
     * @param documentNodeStore    node store used to resolve the node state of each document
     * @param rootRevision         revision at which node states are read
     * @param pathPredicate        filter deciding which (non-hidden) paths are accepted
     * @param entryWriter          serializer producing the JSON form of a node state
     * @param mongoDocQueue        source of document batches, terminated by the sentinel batch
     * @param emptyBatchesQueue    source of batches that can be filled
     * @param nonEmptyBatchesQueue destination of filled batches
     * @param statsCollector       collector updated with per-document and per-entry statistics
     */
    public PipelinedTransformTask(MongoDocumentStore mongoStore, DocumentNodeStore documentNodeStore, RevisionVector rootRevision, Predicate<String> pathPredicate, NodeStateEntryWriter entryWriter, ArrayBlockingQueue<NodeDocument[]> mongoDocQueue, ArrayBlockingQueue<NodeStateEntryBatch> emptyBatchesQueue, ArrayBlockingQueue<NodeStateEntryBatch> nonEmptyBatchesQueue, TransformStageStatistics statsCollector) {
        this.mongoStore = mongoStore;
        this.documentNodeStore = documentNodeStore;
        this.rootRevision = rootRevision;
        this.pathPredicate = pathPredicate;
        this.entryWriter = entryWriter;
        this.mongoDocQueue = mongoDocQueue;
        this.emptyBatchesQueue = emptyBatchesQueue;
        this.nonEmptyBatchesQueue = nonEmptyBatchesQueue;
        this.statistics = statsCollector;
    }

    /**
     * Runs the transform loop on the calling thread until the sentinel batch is dequeued.
     * <p>
     * The calling thread is renamed to {@code mongo-transform-<id>} for the duration of the call
     * and its original name is restored on exit, whether the call returns or throws.
     *
     * @return the id of this task and the number of node state entries it accepted
     * @throws InterruptedException if the thread is interrupted while blocked on one of the queues
     * @throws Exception            any other failure; it is logged and rethrown unchanged
     */
    @Override
    public Result call() throws Exception {
        Thread currentThread = Thread.currentThread();
        String originalName = currentThread.getName();
        String threadName = THREAD_NAME_PREFIX + this.threadId;
        currentThread.setName(threadName);
        try {
            LOG.info("[TASK:{}:START] Starting transform task", threadName.toUpperCase(Locale.ROOT));
            NodeDocumentCache nodeCache = MongoDocumentStoreHelper.getNodeDocumentCache(this.mongoStore);
            Stopwatch taskStartWatch = Stopwatch.createStarted();
            long totalDocumentQueueWaitTimeMillis = 0L;
            long totalEntryCount = 0L;
            long mongoObjectsProcessed = 0L;

            LOG.debug("Waiting for an empty buffer");
            NodeStateEntryBatch nseBatch = this.emptyBatchesQueue.take();
            LOG.debug("Obtained an empty buffer. Starting to convert Mongo documents to node state entries");

            // Reused for every document to avoid allocating a new list per document.
            ArrayList<NodeStateEntry> nodeStateEntries = new ArrayList<>();
            Stopwatch docQueueWaitStopwatch = Stopwatch.createUnstarted();
            while (true) {
                docQueueWaitStopwatch.reset().start();
                NodeDocument[] nodeDocumentBatch = this.mongoDocQueue.take();
                totalDocumentQueueWaitTimeMillis += docQueueWaitStopwatch.elapsed(TimeUnit.MILLISECONDS);

                // The sentinel is recognized by identity, not by content.
                if (nodeDocumentBatch == PipelinedStrategy.SENTINEL_MONGO_DOCUMENT) {
                    this.logFinalMetrics(threadName, taskStartWatch, totalEntryCount, totalDocumentQueueWaitTimeMillis);
                    // Hand over the batch that was being filled when the input ended.
                    // NOTE(review): this flips the underlying buffer directly while the buffer-full
                    // path calls nseBatch.flip(); confirm the two are equivalent.
                    nseBatch.getBuffer().flip();
                    this.tryEnqueue(nseBatch);
                    return new Result(this.threadId, totalEntryCount);
                }

                for (NodeDocument nodeDoc : nodeDocumentBatch) {
                    this.statistics.incrementMongoDocumentsTraversed();
                    mongoObjectsProcessed++;
                    if (mongoObjectsProcessed % PROGRESS_LOG_INTERVAL == 0L) {
                        LOG.info("Mongo objects: {}, total entries: {}, current batch: {}, Size: {}/{} MB",
                                mongoObjectsProcessed, totalEntryCount, nseBatch.numberOfEntries(),
                                nseBatch.sizeOfEntriesBytes() / BYTES_PER_MB, nseBatch.capacity() / BYTES_PER_MB);
                    }
                    nodeCache.put(nodeDoc);

                    if (nodeDoc.isSplitDocument()) {
                        this.statistics.addSplitDocument(nodeDoc.getId());
                        continue;
                    }

                    nodeStateEntries.clear();
                    this.extractNodeStateEntries(nodeDoc, nodeStateEntries);
                    if (nodeStateEntries.isEmpty()) {
                        this.statistics.addEmptyNodeStateEntry(nodeDoc.getId());
                        continue;
                    }

                    for (NodeStateEntry nse : nodeStateEntries) {
                        String path = nse.getPath();
                        // Evaluate each check once; both results are needed for the rejection statistics.
                        boolean hidden = NodeStateUtils.isHiddenPath(path);
                        boolean matchesFilter = this.pathPredicate.test(path);
                        if (hidden || !matchesFilter) {
                            this.statistics.incrementEntriesRejected();
                            if (hidden) {
                                this.statistics.addRejectedHiddenPath(path);
                            }
                            if (!matchesFilter) {
                                this.statistics.addRejectedFilteredPath(path);
                            }
                            continue;
                        }

                        this.statistics.incrementEntriesAccepted();
                        totalEntryCount++;
                        byte[] jsonBytes = this.entryWriter.asJson(nse.getNodeState()).getBytes(StandardCharsets.UTF_8);
                        int entrySize;
                        try {
                            entrySize = nseBatch.addEntry(path, jsonBytes);
                        } catch (NodeStateEntryBatch.BufferFullException e) {
                            nseBatch = this.handOffFullBatch(nseBatch, totalEntryCount);
                            // Retry on the fresh batch; if this fails again the exception propagates.
                            entrySize = nseBatch.addEntry(path, jsonBytes);
                        }
                        this.statistics.incrementTotalExtractedEntriesSize(entrySize);
                    }
                }
            }
        } catch (InterruptedException t) {
            LOG.warn("Thread interrupted", t);
            throw t;
        } catch (Throwable t) {
            LOG.warn("Thread terminating with exception", t);
            throw t;
        } finally {
            currentThread.setName(originalName);
        }
    }

    /**
     * Logs the end-of-task metrics line.
     *
     * @param threadName                       name given to the thread for this task
     * @param taskStartWatch                   stopwatch started when the task began
     * @param totalEntryCount                  number of node state entries accepted
     * @param totalDocumentQueueWaitTimeMillis accumulated time spent waiting on the document queue
     */
    private void logFinalMetrics(String threadName, Stopwatch taskStartWatch, long totalEntryCount, long totalDocumentQueueWaitTimeMillis) {
        long totalDurationMillis = taskStartWatch.elapsed(TimeUnit.MILLISECONDS);
        String totalDocumentQueueWaitPercentage = PipelinedUtils.formatAsPercentage(totalDocumentQueueWaitTimeMillis, totalDurationMillis);
        String totalEnqueueDelayPercentage = PipelinedUtils.formatAsPercentage(this.totalEnqueueDelayMillis, totalDurationMillis);
        String totalEmptyBatchQueueWaitPercentage = PipelinedUtils.formatAsPercentage(this.totalEmptyBatchQueueWaitTimeMillis, totalDurationMillis);
        String metrics = MetricsFormatter.newBuilder()
                .add("duration", FormattingUtils.formatToSeconds(taskStartWatch))
                .add("durationSeconds", totalDurationMillis / 1000L)
                .add("nodeStateEntriesGenerated", totalEntryCount)
                .add("enqueueDelayMillis", this.totalEnqueueDelayMillis)
                .add("enqueueDelayPercentage", totalEnqueueDelayPercentage)
                .add("documentQueueWaitMillis", totalDocumentQueueWaitTimeMillis)
                .add("documentQueueWaitPercentage", totalDocumentQueueWaitPercentage)
                .add("totalEmptyBatchQueueWaitTimeMillis", this.totalEmptyBatchQueueWaitTimeMillis)
                .add("totalEmptyBatchQueueWaitPercentage", totalEmptyBatchQueueWaitPercentage)
                .build();
        LOG.info("[TASK:{}:END] Metrics: {}", threadName.toUpperCase(Locale.ROOT), metrics);
    }

    /**
     * Flips and enqueues a batch that reported it is full, then blocks until an empty batch is
     * available. The wait for the empty batch is added to the empty-batch wait total.
     *
     * @param fullBatch       the batch that could not accept another entry
     * @param totalEntryCount number of entries accepted so far, used only for logging
     * @return the empty batch to continue filling
     * @throws InterruptedException if interrupted while blocked on either queue
     */
    private NodeStateEntryBatch handOffFullBatch(NodeStateEntryBatch fullBatch, long totalEntryCount) throws InterruptedException {
        LOG.info("Buffer full, passing buffer to sort task. Total entries: {}, entries in buffer {}, buffer size: {}",
                totalEntryCount, fullBatch.numberOfEntries(), IOUtils.humanReadableByteCountBin(fullBatch.sizeOfEntriesBytes()));
        fullBatch.flip();
        this.tryEnqueue(fullBatch);
        Stopwatch emptyBatchesQueueStopwatch = Stopwatch.createStarted();
        NodeStateEntryBatch emptyBatch = this.emptyBatchesQueue.take();
        this.totalEmptyBatchQueueWaitTimeMillis += emptyBatchesQueueStopwatch.elapsed(TimeUnit.MILLISECONDS);
        return emptyBatch;
    }

    /**
     * Puts a batch on {@code nonEmptyBatchesQueue}, blocking while the queue is full, and records
     * how long the put took. Delays above one millisecond are logged.
     *
     * @param nseBatch the batch to enqueue
     * @throws InterruptedException if interrupted while waiting for space in the queue
     */
    private void tryEnqueue(NodeStateEntryBatch nseBatch) throws InterruptedException {
        Stopwatch enqueueDelayStopwatch = Stopwatch.createStarted();
        this.nonEmptyBatchesQueue.put(nseBatch);
        long enqueueDelay = enqueueDelayStopwatch.elapsed(TimeUnit.MILLISECONDS);
        this.totalEnqueueDelayMillis += enqueueDelay;
        if (enqueueDelay > 1L) {
            LOG.info("Enqueuing of node state entries batch was delayed, took {} ms. nonEmptyBatchesQueue size {}. ", enqueueDelay, this.nonEmptyBatchesQueue.size());
        }
    }

    /**
     * Resolves the node state of a document at {@code rootRevision} and appends one entry for it,
     * followed by one entry per bundled node state, to the given list. Nothing is appended when
     * the node state is {@code null} or does not exist.
     *
     * @param doc              the document to convert
     * @param nodeStateEntries list receiving the generated entries; it is not cleared here
     */
    private void extractNodeStateEntries(NodeDocument doc, ArrayList<NodeStateEntry> nodeStateEntries) {
        Path path = doc.getPath();
        DocumentNodeState nodeState = this.documentNodeStore.getNode(path, this.rootRevision);
        if (nodeState == null || !nodeState.exists()) {
            return;
        }
        nodeStateEntries.add(this.toNodeStateEntry(doc, nodeState));
        for (DocumentNodeState dns : nodeState.getAllBundledNodesStates()) {
            nodeStateEntries.add(this.toNodeStateEntry(doc, dns));
        }
    }

    /**
     * Builds an entry for a node state, using the node state's own path and the id of the document
     * it came from. The last-modified value is set only when the document provides one.
     *
     * @param doc the document the node state was read from
     * @param dns the node state (the document's own or one bundled in it)
     * @return the built entry
     */
    private NodeStateEntry toNodeStateEntry(NodeDocument doc, DocumentNodeState dns) {
        NodeStateEntry.NodeStateEntryBuilder builder = new NodeStateEntry.NodeStateEntryBuilder(dns, dns.getPath().toString());
        if (doc.getModified() != null) {
            builder.withLastModified(doc.getModified());
        }
        builder.withID(doc.getId());
        return builder.build();
    }

    /**
     * Outcome of a completed transform task.
     */
    public static class Result {
        private final int transformThreadId;
        private final long entryCount;

        /**
         * @param threadId   id of the transform task that produced this result
         * @param entryCount number of node state entries the task accepted
         */
        public Result(int threadId, long entryCount) {
            this.transformThreadId = threadId;
            this.entryCount = entryCount;
        }

        /**
         * @return number of node state entries the task accepted
         */
        public long getEntryCount() {
            return this.entryCount;
        }

        /**
         * @return id of the transform task that produced this result
         */
        public int getThreadId() {
            return this.transformThreadId;
        }
    }
}

