/*
 * Decompiled with CFR 0.152.
 */
package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import org.apache.jackrabbit.guava.common.collect.ArrayListMultimap;
import org.apache.jackrabbit.guava.common.util.concurrent.Striped;
import org.apache.jackrabbit.oak.commons.PerfLogger;
import org.apache.jackrabbit.oak.commons.concurrent.NotifyingFutureTask;
import org.apache.jackrabbit.oak.commons.conditions.Validate;
import org.apache.jackrabbit.oak.plugins.index.lucene.IndexTracker;
import org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexNode;
import org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.IndexingQueue;
import org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.LuceneDoc;
import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriter;
import org.apache.jackrabbit.oak.stats.CounterStats;
import org.apache.jackrabbit.oak.stats.MeterStats;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.apache.jackrabbit.oak.stats.StatsOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bounded, in-memory queue of {@link LuceneDoc}s that are written to the local
 * Lucene index writers of the indexes they belong to.
 *
 * <p>Documents are either queued ({@link #add(LuceneDoc)},
 * {@link #addIfNotFullWithoutWait(LuceneDoc)}) and later drained in batches on the
 * supplied {@link Executor}, or written on the calling thread
 * ({@link #addAllSynchronously(Map)}). Writes to a given index path are serialized
 * through a striped lock keyed by that path.
 *
 * <p>Queue depth and throughput are reported through the {@code HYBRID_QUEUE_SIZE}
 * counter and the {@code HYBRID_ADDED} / {@code HYBRID_DROPPED} meters.
 */
public class DocumentQueue implements Closeable, IndexingQueue {
    private static final PerfLogger PERF_LOGGER =
            new PerfLogger(LoggerFactory.getLogger(DocumentQueue.class.getName() + ".perf"));

    /**
     * Sentinel placed in the queue by {@link #close()}. It is recognised by identity
     * ({@code ==}) in the drain task, never by content.
     */
    private static final LuceneDoc STOP = LuceneDoc.forUpdate("", "", Collections.emptyList());

    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final IndexTracker tracker;
    private final BlockingQueue<LuceneDoc> docsQueue;
    private final Executor executor;
    private final CounterStats queueSizeStats;
    private final MeterStats added;
    private final MeterStats dropped;

    /** One lock stripe per index path so that two batches never write the same index concurrently. */
    private final Striped<Lock> locks = Striped.lock(64);

    /**
     * Optional external handler, no-op by default. Volatile because it is replaced via
     * {@link #setExceptionHandler} and read from whichever thread runs the drain task.
     */
    private volatile Thread.UncaughtExceptionHandler delegate = (t, e) -> {};

    private final long queueOfferTimeoutMillis;
    private volatile boolean stopped;

    /** Logs any throwable that escapes the drain task. */
    private final Thread.UncaughtExceptionHandler exceptionHandler =
            (t, e) -> log.error("Uncaught exception", e);

    /** The most recently submitted drain task; starts out as an already completed task. */
    private volatile NotifyingFutureTask currentTask = NotifyingFutureTask.completed();

    /**
     * Registered on {@link #currentTask} via {@code onComplete}; when run it wraps the
     * drain logic in a fresh task, publishes it as the current task and submits it to
     * the executor.
     */
    private final Runnable completionHandler = new Runnable() {
        private final Callable<Void> task = new Callable<Void>() {

            /**
             * Drains whatever is queued at the time of the call, groups the documents
             * by index path, writes them and re-schedules itself. Never lets a
             * throwable escape: failures go to both exception handlers.
             */
            @Override
            public Void call() throws Exception {
                try {
                    long start = PERF_LOGGER.start();
                    // Snapshot the size so a single run handles a bounded batch.
                    int maxSize = docsQueue.size();
                    List<LuceneDoc> docs = new ArrayList<>(maxSize);
                    ArrayListMultimap<String, LuceneDoc> docsPerIndex = ArrayListMultimap.create();

                    int count = docsQueue.drainTo(docs, maxSize);
                    if (count == 0) {
                        return null;
                    }
                    queueSizeStats.dec(count);

                    for (LuceneDoc doc : docs) {
                        if (doc == STOP) {
                            // Queue was closed: drop the batch and do not re-schedule.
                            return null;
                        }
                        docsPerIndex.put(doc.indexPath, doc);
                    }

                    addDocsToIndex(docsPerIndex.asMap(), true);
                    // More documents may have arrived while this batch was being written.
                    scheduleQueuedDocsProcessing();
                    PERF_LOGGER.end(start, 1L, "Processed {} docs from queue", count);
                } catch (Throwable t) {
                    exceptionHandler.uncaughtException(Thread.currentThread(), t);
                    delegate.uncaughtException(Thread.currentThread(), t);
                }
                return null;
            }
        };

        @Override
        public void run() {
            // Use a local so that the task handed to the executor is exactly the one
            // just published, without re-reading the volatile field.
            NotifyingFutureTask next = new NotifyingFutureTask(task);
            currentTask = next;
            executor.execute(next);
        }
    };

    /**
     * Creates a queue with a 100 ms offer timeout and no-op statistics.
     *
     * @param maxQueueSize capacity of the underlying bounded queue
     * @param tracker      source of the {@link LuceneIndexNode}s to write to
     * @param executor     executor on which queued documents are processed
     */
    public DocumentQueue(int maxQueueSize, IndexTracker tracker, Executor executor) {
        this(maxQueueSize, 100L, tracker, executor, StatisticsProvider.NOOP);
    }

    /**
     * Creates a queue.
     *
     * @param maxQueueSize            capacity of the underlying bounded queue
     * @param queueOfferTimeoutMillis how long {@link #add(LuceneDoc)} waits for free
     *                                space before giving up, in milliseconds
     * @param tracker                 source of the {@link LuceneIndexNode}s to write to
     * @param executor                executor on which queued documents are processed
     * @param sp                      provider of the queue-size counter and the
     *                                added/dropped meters
     */
    public DocumentQueue(int maxQueueSize, long queueOfferTimeoutMillis, IndexTracker tracker,
                         Executor executor, StatisticsProvider sp) {
        this.docsQueue = new LinkedBlockingDeque<LuceneDoc>(maxQueueSize);
        this.tracker = tracker;
        this.executor = executor;
        this.queueOfferTimeoutMillis = queueOfferTimeoutMillis;
        this.queueSizeStats = sp.getCounterStats("HYBRID_QUEUE_SIZE", StatsOptions.DEFAULT);
        this.added = sp.getMeter("HYBRID_ADDED", StatsOptions.DEFAULT);
        this.dropped = sp.getMeter("HYBRID_DROPPED", StatsOptions.DEFAULT);
    }

    /**
     * Enqueues {@code doc} only if there is room right now. Unlike
     * {@link #add(LuceneDoc)} this neither waits, schedules processing, nor marks the
     * dropped meter on failure.
     *
     * @param doc the document to enqueue
     * @return {@code true} if the document was enqueued
     * @throws IllegalStateException presumably, via {@code Validate.checkState}, if the
     *                               queue has been closed
     */
    @Override
    public boolean addIfNotFullWithoutWait(LuceneDoc doc) {
        Validate.checkState(!stopped);
        boolean accepted = docsQueue.offer(doc);
        if (accepted) {
            queueSizeStats.inc();
            if (log.isTraceEnabled()) {
                log.trace("Adding {} without wait to queue at size {}", doc, docsQueue.size());
            }
        }
        return accepted;
    }

    /**
     * Enqueues {@code doc}, waiting up to the configured offer timeout for free space,
     * and schedules processing of the queue. A document that could not be enqueued
     * (timeout or interruption) is counted as dropped.
     *
     * @param doc the document to enqueue
     * @return {@code true} if the document was enqueued
     * @throws IllegalStateException presumably, via {@code Validate.checkState}, if the
     *                               queue has been closed
     */
    @Override
    public boolean add(LuceneDoc doc) {
        Validate.checkState(!stopped);
        boolean accepted = false;
        try {
            accepted = docsQueue.offer(doc, queueOfferTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller; the doc is treated as not added.
            Thread.currentThread().interrupt();
        }
        // Scheduled even when the offer failed so that a full queue gets drained.
        scheduleQueuedDocsProcessing();
        if (accepted) {
            queueSizeStats.inc();
            if (log.isTraceEnabled()) {
                log.trace("Adding {} to queue at size {}", doc, docsQueue.size());
            }
        } else {
            dropped.mark();
        }
        return accepted;
    }

    /**
     * Registers the completion handler on the current task so that a new drain task is
     * submitted to the executor once the current one has completed.
     */
    @Override
    public void scheduleQueuedDocsProcessing() {
        currentTask.onComplete(completionHandler);
    }

    /**
     * Writes the given documents to their indexes on the calling thread, bypassing the
     * queue.
     *
     * @param docsPerIndex documents to write, keyed by index path
     */
    @Override
    public void addAllSynchronously(Map<String, Collection<LuceneDoc>> docsPerIndex) {
        addDocsToIndex(docsPerIndex, false);
    }

    /**
     * Sets an additional handler that is notified of indexing failures.
     *
     * @param delegate the handler to notify; must not be {@code null} since it is
     *                 invoked unconditionally
     */
    public void setExceptionHandler(Thread.UncaughtExceptionHandler delegate) {
        this.delegate = delegate;
    }

    /**
     * Writes each index's documents while holding the lock stripe for that index path,
     * then marks the {@code added} meter with the size of the collection.
     *
     * @param docsPerIndex  documents to write, keyed by index path
     * @param docsFromQueue {@code true} if the documents were drained from the queue,
     *                      {@code false} if supplied directly; only affects logging
     */
    private void addDocsToIndex(Map<String, Collection<LuceneDoc>> docsPerIndex, boolean docsFromQueue) {
        for (Map.Entry<String, Collection<LuceneDoc>> e : docsPerIndex.entrySet()) {
            String indexPath = e.getKey();
            Lock indexingLock = locks.get(indexPath);
            indexingLock.lock();
            try {
                processDocs(indexPath, e.getValue(), docsFromQueue);
            } finally {
                indexingLock.unlock();
            }
            added.mark(e.getValue().size());
        }
    }

    /**
     * Returns a snapshot copy of the documents currently in the queue.
     *
     * @return a new list holding the queued documents
     */
    List<LuceneDoc> getQueuedDocs() {
        return new ArrayList<LuceneDoc>(docsQueue);
    }

    /**
     * Applies the given documents to the local writer of the index at
     * {@code indexPath}. Does nothing if the queue is stopped, if no index node can be
     * acquired, or if the node has no local writer. Documents already marked processed
     * are skipped. Exceptions are logged and forwarded to the delegate handler rather
     * than propagated; the index node is always released.
     *
     * @param indexPath     path of the index to update
     * @param docs          documents to apply
     * @param docsFromQueue {@code true} if the documents came from the queue; only
     *                      affects the trace log prefix
     */
    private void processDocs(String indexPath, Iterable<LuceneDoc> docs, boolean docsFromQueue) {
        if (stopped) {
            return;
        }
        LuceneIndexNode indexNode = tracker.acquireIndexNode(indexPath);
        if (indexNode == null) {
            log.debug("No LuceneIndexNode found for index [{}].", indexPath);
            return;
        }
        try {
            LuceneIndexWriter writer = indexNode.getLocalWriter();
            boolean docAdded = false;
            for (LuceneDoc doc : docs) {
                // Checked inside the loop so the log line can name the skipped entry;
                // nothing is logged for an empty batch.
                if (writer == null) {
                    log.debug("No local IndexWriter found for index [{}]. Skipping index entry for [{}]",
                            indexPath, doc.docPath);
                    return;
                }
                if (doc.isProcessed()) {
                    continue;
                }
                // Marked before writing so the same doc is not applied a second time.
                doc.markProcessed();
                if (doc.delete) {
                    writer.deleteDocuments(doc.docPath);
                } else {
                    writer.updateDocument(doc.docPath, doc.doc);
                }
                docAdded = true;
                String prefix = docsFromQueue ? "Queued" : "Direct";
                log.trace("[{}] Updated index with doc {}", prefix, doc);
            }
            if (docAdded) {
                indexNode.refreshReadersOnWriteIfRequired();
            }
        } catch (Exception e) {
            log.warn("Error occurred while indexing index [{}]", indexPath, e);
            delegate.uncaughtException(Thread.currentThread(), e);
        } finally {
            indexNode.release();
        }
    }

    /**
     * Stops the queue: rejects further additions, discards pending documents and
     * enqueues the {@link #STOP} sentinel for the drain task.
     *
     * <p>The stopped flag is raised first so that producers are turned away and
     * in-flight batches bail out before the queue is emptied. A producer that passed
     * its state check just before the flag was raised may still enqueue afterwards;
     * such a document is not written because processing checks the flag as well.
     */
    @Override
    public void close() throws IOException {
        stopped = true;

        // Drain rather than clear so the size counter can be reduced by exactly the
        // number of documents that were discarded.
        List<LuceneDoc> discarded = new ArrayList<>();
        int discardedCount = docsQueue.drainTo(discarded);
        if (discardedCount > 0) {
            queueSizeStats.dec(discardedCount);
        }

        // offer() instead of add(): a racing producer may have refilled the bounded
        // queue, and close() must not fail with "queue full" in that case. The counter
        // is incremented because the drain task decrements for every drained element,
        // sentinel included.
        if (docsQueue.offer(STOP)) {
            queueSizeStats.inc();
        }
    }
}

