/*
 * Decompiled with CFR 0.152.
 */
package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;

import com.mongodb.MongoClientSettings;
import com.mongodb.MongoException;
import com.mongodb.MongoIncompatibleDriverException;
import com.mongodb.MongoInterruptedException;
import com.mongodb.ReadPreference;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Sorts;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.jackrabbit.guava.common.base.Preconditions;
import org.apache.jackrabbit.guava.common.base.Stopwatch;
import org.apache.jackrabbit.oak.commons.IOUtils;
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.ConfigHelper;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.DownloadRange;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.NodeDocumentCodecProvider;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedUtils;
import org.apache.jackrabbit.oak.plugins.document.Collection;
import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
import org.apache.jackrabbit.oak.plugins.index.MetricsUtils;
import org.apache.jackrabbit.oak.spi.filter.PathFilter;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.bson.BsonDocument;
import org.bson.codecs.configuration.CodecProvider;
import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Callable} that reads {@link NodeDocument}s from the MongoDB nodes collection and hands them
 * over, in arrays of documents, to a {@link BlockingQueue}.
 * <p>
 * Depending on configuration the download either sorts by {@code (_modified, _id)} and retries on
 * connection errors, or traverses the collection once with a {@code $natural} hint. When regex path
 * filtering is enabled and the path filters reduce to a single included path, only the ancestors and
 * descendants of that path are requested.
 */
public class PipelinedMongoDownloadTask
implements Callable<Result> {
    private static final Logger LOG = LoggerFactory.getLogger(PipelinedMongoDownloadTask.class);
    // Configuration property: whether the download retries after MongoDB connection errors.
    public static final String OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS = "oak.indexer.pipelined.retryOnConnectionErrors";
    public static final boolean DEFAULT_OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS = true;
    // Configuration property: for how many seconds a series of consecutive failures is retried before giving up.
    public static final String OAK_INDEXER_PIPELINED_MONGO_CONNECTION_RETRY_SECONDS = "oak.indexer.pipelined.mongoConnectionRetrySeconds";
    public static final int DEFAULT_OAK_INDEXER_PIPELINED_MONGO_CONNECTION_RETRY_SECONDS = 300;
    // Configuration property: whether to restrict the download to a single included path using regex filters.
    public static final String OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING = "oak.indexer.pipelined.mongoRegexPathFiltering";
    public static final boolean DEFAULT_OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING = false;
    // Back-off bounds in milliseconds; these equal the 100 ms start value and 10000 ms cap used by the retry loop.
    private static final long retryInitialIntervalMillis = 100L;
    private static final long retryMaxIntervalMillis = 10000L;
    // Maximum time tryEnqueueCopy waits for space in the queue before failing with a TimeoutException.
    private static final Duration MONGO_QUEUE_OFFER_TIMEOUT = Duration.ofMinutes(30L);
    // Minimum number of seconds between two "enqueuing was delayed" log messages.
    private static final int MIN_INTERVAL_BETWEEN_DELAYED_ENQUEUING_MESSAGES = 10;
    // Index hints passed to MongoDB: natural (storage) order, and the _id index.
    private static final BsonDocument NATURAL_HINT = BsonDocument.parse((String)"{ $natural: 1 }");
    private static final BsonDocument ID_INDEX_HINT = BsonDocument.parse((String)"{ _id: 1 }");
    // Name given to the executing thread while call() runs.
    private static final String THREAD_NAME = "mongo-dump";
    // A batch is flushed once it holds this many documents ...
    private final int maxBatchNumberOfDocuments;
    // Destination queue; each element is one batch of downloaded documents.
    private final BlockingQueue<NodeDocument[]> mongoDocQueue;
    private final List<PathFilter> pathFilters;
    private final int retryDuringSeconds;
    private final boolean retryOnConnectionErrors;
    private final boolean regexPathFiltering;
    // Separate logger (class name + ".traversal") that receives the id of every downloaded document at TRACE level.
    private final Logger traversalLog = LoggerFactory.getLogger((String)(PipelinedMongoDownloadTask.class.getName() + ".traversal"));
    private final MongoCollection<NodeDocument> dbCollection;
    private final ReadPreference readPreference;
    // Started in call(); used for progress rates and the final duration metrics.
    private final Stopwatch downloadStartWatch = Stopwatch.createUnstarted();
    // ... or once the sum of the estimated document sizes reaches this many bytes.
    private final int maxBatchSizeBytes;
    private final StatisticsProvider statisticsProvider;
    // Accumulated time spent waiting in BlockingQueue.offer().
    private long totalEnqueueWaitTimeMillis = 0L;
    private Instant lastDelayedEnqueueWarningMessageLoggedTimestamp = Instant.now();
    private long documentsRead = 0L;
    // Resume position for the retry logic: _modified value and _id of the last document read.
    private long nextLastModified = 0L;
    private String lastIdDownloaded = null;

    /**
     * Creates the download task.
     *
     * @param mongoDatabase             database holding the nodes collection
     * @param mongoDocStore             document store handed to the {@link NodeDocumentCodecProvider}
     * @param maxBatchSizeBytes         estimated size in bytes at which a batch is flushed to the queue
     * @param maxBatchNumberOfDocuments number of documents at which a batch is flushed to the queue
     * @param queue                     queue that receives the batches of downloaded documents
     * @param pathFilters               path filters used to derive the optional regex path filter; may be null
     * @param statisticsProvider        provider used to publish the enqueue-delay metric
     * @throws IllegalArgumentException if the configured retry duration is not greater than zero
     */
    public PipelinedMongoDownloadTask(MongoDatabase mongoDatabase, MongoDocumentStore mongoDocStore, int maxBatchSizeBytes, int maxBatchNumberOfDocuments, BlockingQueue<NodeDocument[]> queue, List<PathFilter> pathFilters, StatisticsProvider statisticsProvider) {
        this.statisticsProvider = statisticsProvider;
        this.maxBatchSizeBytes = maxBatchSizeBytes;
        this.maxBatchNumberOfDocuments = maxBatchNumberOfDocuments;
        this.mongoDocQueue = queue;
        this.pathFilters = pathFilters;

        // Register the NodeDocument codec ahead of the driver defaults so that the collection
        // decodes results directly into NodeDocument instances.
        NodeDocumentCodecProvider codecProvider = new NodeDocumentCodecProvider(mongoDocStore, Collection.NODES);
        CodecRegistry codecRegistry = CodecRegistries.fromRegistries(
                CodecRegistries.fromProviders(codecProvider),
                MongoClientSettings.getDefaultCodecRegistry());
        this.dbCollection = mongoDatabase
                .withCodecRegistry(codecRegistry)
                .getCollection(Collection.NODES.toString(), NodeDocument.class);

        this.retryDuringSeconds = ConfigHelper.getSystemPropertyAsInt(
                OAK_INDEXER_PIPELINED_MONGO_CONNECTION_RETRY_SECONDS,
                DEFAULT_OAK_INDEXER_PIPELINED_MONGO_CONNECTION_RETRY_SECONDS);
        Preconditions.checkArgument(this.retryDuringSeconds > 0,
                (Object) ("Property " + OAK_INDEXER_PIPELINED_MONGO_CONNECTION_RETRY_SECONDS + " must be > 0. Was: " + this.retryDuringSeconds));
        this.retryOnConnectionErrors = ConfigHelper.getSystemPropertyAsBoolean(
                OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS,
                DEFAULT_OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS);
        this.regexPathFiltering = ConfigHelper.getSystemPropertyAsBoolean(
                OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING,
                DEFAULT_OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING);
        this.readPreference = ReadPreference.secondaryPreferred();

        LOG.info("maxBatchSizeBytes: {}, maxBatchNumberOfDocuments: {}, readPreference: {}",
                maxBatchSizeBytes, maxBatchNumberOfDocuments, this.readPreference.getName());
    }

    /**
     * Runs the download on the calling thread.
     * <p>
     * The thread is renamed to {@code mongo-dump} while the task runs and gets its previous name back
     * in all cases. On success the collected metrics are logged and the enqueue-delay percentage is
     * published through the statistics provider.
     *
     * @return the number of documents that were read
     * @throws Exception any failure raised by the download; it is logged and rethrown unchanged
     */
    @Override
    public Result call() throws Exception {
        final Thread worker = Thread.currentThread();
        final String previousThreadName = worker.getName();
        worker.setName(THREAD_NAME);
        LOG.info("[TASK:{}:START] Starting to download from MongoDB", THREAD_NAME.toUpperCase(Locale.ROOT));
        try {
            // Start from a clean resume position.
            nextLastModified = 0L;
            lastIdDownloaded = null;

            downloadStartWatch.start();
            if (retryOnConnectionErrors) {
                downloadWithRetryOnConnectionErrors();
            } else {
                downloadWithNaturalOrdering();
            }

            final long elapsedMillis = downloadStartWatch.elapsed(TimeUnit.MILLISECONDS);
            final String delayPercentage = PipelinedUtils.formatAsPercentage(totalEnqueueWaitTimeMillis, elapsedMillis);
            final String metrics = MetricsFormatter.newBuilder()
                    .add("duration", FormattingUtils.formatToSeconds(downloadStartWatch))
                    .add("durationSeconds", elapsedMillis / 1000L)
                    .add("documentsDownloaded", documentsRead)
                    .add("enqueueingDelayMillis", totalEnqueueWaitTimeMillis)
                    .add("enqueueingDelayPercentage", delayPercentage)
                    .build();
            MetricsUtils.setCounterOnce(statisticsProvider,
                    "oak_indexer_pipelined_mongoDownloadEnqueueDelayPercentage",
                    (long) PipelinedUtils.toPercentage(totalEnqueueWaitTimeMillis, elapsedMillis));
            LOG.info("[TASK:{}:END] Metrics: {}", THREAD_NAME.toUpperCase(Locale.ROOT), metrics);
            return new Result(documentsRead);
        } catch (InterruptedException interrupted) {
            LOG.warn("Thread interrupted", interrupted);
            throw interrupted;
        } catch (Throwable failure) {
            LOG.warn("Thread terminating with exception.", failure);
            throw failure;
        } finally {
            Thread.currentThread().setName(previousThreadName);
        }
    }

    /**
     * Logs a progress line every 10000 documents and traces every document id on the traversal logger.
     *
     * @param id id of the document that was just read
     */
    private void reportProgress(String id) {
        if (this.documentsRead % 10000L == 0L) {
            // Compute the rate from milliseconds rather than whole seconds: during the first second the
            // elapsed time in seconds is 0, which would produce an infinite rate. The floor of 1 ms
            // keeps the division well-defined.
            long elapsedMillis = Math.max(1L, this.downloadStartWatch.elapsed(TimeUnit.MILLISECONDS));
            double rate = (double) this.documentsRead * 1000.0 / (double) elapsedMillis;
            String formattedRate = String.format(Locale.ROOT, "%1.2f nodes/s, %1.2f nodes/hr", rate, rate * 3600.0);
            LOG.info("Dumping from NSET Traversed #{} {} [{}] (Elapsed {})",
                    this.documentsRead, id, formattedRate, FormattingUtils.formatToSeconds(this.downloadStartWatch));
        }
        this.traversalLog.trace(id);
    }

    /**
     * Downloads the documents sorted by {@code (_modified, _id)} and resumes after recoverable
     * {@link MongoException}s.
     * <p>
     * After a failure the download first finishes the {@code _modified} value that was being read
     * (passing the last downloaded id to the {@link DownloadRange}) and then continues with all larger
     * {@code _modified} values. Retries use an exponential back-off and stop once a series of
     * consecutive failures has lasted longer than the configured retry duration.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping or enqueuing
     * @throws TimeoutException     if a batch cannot be enqueued in time
     * @throws RetryException       if failures persist for longer than the configured retry duration
     */
    private void downloadWithRetryOnConnectionErrors() throws InterruptedException, TimeoutException {
        Bson childrenFilter;
        String regexBasePath = this.getPathForRegexFiltering();
        if (regexBasePath == null) {
            childrenFilter = null;
        } else {
            this.downloadAncestors(regexBasePath);
            childrenFilter = PipelinedMongoDownloadTask.descendantsFilter(regexBasePath);
        }

        Instant failuresStartTimestamp = null;
        long retryIntervalMs = retryInitialIntervalMillis;
        int numberOfFailures = 0;
        boolean downloadCompleted = false;
        // Count of each distinct failure, reported when giving up.
        Map<String, Integer> exceptions = new HashMap<>();

        // The ancestors download above also moves the resume position; start the sorted traversal clean.
        this.nextLastModified = 0L;
        this.lastIdDownloaded = null;
        while (!downloadCompleted) {
            try {
                if (this.lastIdDownloaded != null) {
                    LOG.info("Recovering from broken connection, finishing downloading documents with _modified={}", this.nextLastModified);
                    this.downloadRange(new DownloadRange(this.nextLastModified, this.nextLastModified + 1L, this.lastIdDownloaded), childrenFilter);
                    // Reconnected successfully: the current series of failures is over, so the failure
                    // tracking and the back-off interval all start afresh for any later failure.
                    failuresStartTimestamp = null;
                    numberOfFailures = 0;
                    retryIntervalMs = retryInitialIntervalMillis;
                    // Continue with everything after the _modified value that was just completed.
                    this.downloadRange(new DownloadRange(this.nextLastModified + 1L, Long.MAX_VALUE, null), childrenFilter);
                } else {
                    this.downloadRange(new DownloadRange(this.nextLastModified, Long.MAX_VALUE, null), childrenFilter);
                }
                downloadCompleted = true;
            } catch (MongoException e) {
                if (e instanceof MongoInterruptedException || e instanceof MongoIncompatibleDriverException) {
                    // Not recoverable by retrying.
                    throw e;
                }
                if (failuresStartTimestamp == null) {
                    failuresStartTimestamp = Instant.now().truncatedTo(ChronoUnit.SECONDS);
                }
                LOG.warn("Connection error downloading from MongoDB.", e);
                long secondsSinceStartOfFailures = Duration.between(failuresStartTimestamp, Instant.now()).toSeconds();
                if (secondsSinceStartOfFailures > (long) this.retryDuringSeconds) {
                    // Give up and report how often each kind of failure was seen.
                    StringBuilder summary = new StringBuilder();
                    for (Map.Entry<String, Integer> entry : exceptions.entrySet()) {
                        summary.append("\n\t").append(entry.getValue()).append("x: ").append(entry.getKey());
                    }
                    throw new RetryException(this.retryDuringSeconds, summary.toString(), e);
                }
                numberOfFailures++;
                LOG.warn("Retrying download in {} ms; number of times failed: {}; current series of failures started at: {} ({} seconds ago)",
                        retryIntervalMs, numberOfFailures, failuresStartTimestamp, secondsSinceStartOfFailures);
                exceptions.merge(e.getClass().getSimpleName() + " - " + e.getMessage(), 1, Integer::sum);
                Thread.sleep(retryIntervalMs);
                // Exponential back-off, capped at the maximum interval.
                retryIntervalMs = Math.min(retryMaxIntervalMillis, retryIntervalMs * 2L);
            }
        }
    }

    /**
     * Downloads the documents selected by {@code range}, sorted by {@code _modified} and then {@code _id}.
     *
     * @param range  range whose find query selects the documents to download
     * @param filter additional filter that is AND-ed with the range query, or null for none
     * @throws InterruptedException if the thread is interrupted while enqueuing
     * @throws TimeoutException     if a batch cannot be enqueued in time
     */
    private void downloadRange(DownloadRange range, Bson filter) throws InterruptedException, TimeoutException {
        // Typed as Bson: Filters.and returns a Bson, which cannot be assigned to a BsonDocument variable.
        Bson findQuery = range.getFindQuery();
        if (filter != null) {
            findQuery = Filters.and(findQuery, filter);
        }
        LOG.info("Traversing: {}. Query: {}", range, findQuery);
        FindIterable<NodeDocument> mongoIterable = this.dbCollection
                .withReadPreference(this.readPreference)
                .find(findQuery)
                .sort(Sorts.ascending("_modified", "_id"));
        this.download(mongoIterable);
    }

    /**
     * Downloads the documents of {@code basePath} and of all its ancestors, using the {@code _id} index.
     *
     * @param basePath path whose ancestor chain is downloaded
     * @throws InterruptedException if the thread is interrupted while enqueuing
     * @throws TimeoutException     if a batch cannot be enqueued in time
     */
    private void downloadAncestors(String basePath) throws InterruptedException, TimeoutException {
        final Bson query = ancestorsFilter(basePath);
        LOG.info("Downloading using regex path filtering. Base path: {}, Ancestors query: {}.", basePath, query);
        final MongoCollection<NodeDocument> collection = dbCollection.withReadPreference(readPreference);
        final FindIterable<NodeDocument> ancestors = collection.find(query).hint(ID_INDEX_HINT);
        download(ancestors);
    }

    /**
     * Downloads in a single pass using the {@code $natural} hint, without any retry logic.
     * <p>
     * Without a regex base path the whole collection is read. With one, the ancestors of the base
     * path are downloaded first and then the documents matching the descendants filter.
     *
     * @throws InterruptedException if the thread is interrupted while enqueuing
     * @throws TimeoutException     if a batch cannot be enqueued in time
     */
    private void downloadWithNaturalOrdering() throws InterruptedException, TimeoutException {
        final String basePath = getPathForRegexFiltering();
        if (basePath != null) {
            downloadAncestors(basePath);
            final Bson descendantsQuery = descendantsFilter(basePath);
            LOG.info("Downloading using regex path filtering. Downloading children: {}.", descendantsQuery);
            final FindIterable<NodeDocument> descendants = dbCollection
                    .withReadPreference(readPreference)
                    .find(descendantsQuery)
                    .hint(NATURAL_HINT);
            download(descendants);
            return;
        }

        LOG.info("Downloading full repository using natural order");
        final FindIterable<NodeDocument> everything = dbCollection
                .withReadPreference(readPreference)
                .find()
                .hint(NATURAL_HINT);
        download(everything);
    }

    /**
     * Returns the base path to use for regex filtering.
     *
     * @return the single included path of the configured path filters, or null when regex path
     *         filtering is disabled or the filters do not reduce to exactly one included path
     */
    private String getPathForRegexFiltering() {
        if (regexPathFiltering) {
            return getSingleIncludedPath(pathFilters);
        }
        LOG.info("Regex path filtering disabled.");
        return null;
    }

    /**
     * Reduces a list of path filters to a single included path, if possible.
     *
     * @param pathFilters the path filters to inspect; may be null
     * @return the only included path when, across all filters, there is exactly one distinct included
     *         path and no excluded path; null otherwise (including when {@code pathFilters} is null)
     */
    static String getSingleIncludedPath(List<PathFilter> pathFilters) {
        // Parameterized logging: the list is only rendered when INFO is enabled.
        LOG.info("Creating regex filter from pathFilters: {}", pathFilters);
        if (pathFilters == null) {
            return null;
        }
        // Sets collapse duplicates, so the same path included by several filters counts once.
        Set<String> includedPaths = pathFilters.stream()
                .flatMap(pathFilter -> pathFilter.getIncludedPaths().stream())
                .collect(Collectors.toSet());
        Set<String> excludedPaths = pathFilters.stream()
                .flatMap(pathFilter -> pathFilter.getExcludedPaths().stream())
                .collect(Collectors.toSet());
        if (excludedPaths.isEmpty() && includedPaths.size() == 1) {
            return includedPaths.iterator().next();
        }
        return null;
    }

    /**
     * Builds a filter that matches documents located below {@code path}.
     * <p>
     * A document matches when its {@code _id} consists of one to three digits, a colon and a value
     * starting with the path, or when its {@code _path} field contains the path.
     *
     * @param path base path; a trailing slash is appended when missing
     * @return the OR of the {@code _id} and {@code _path} regex filters
     */
    private static Bson descendantsFilter(String path) {
        final String basePath = path.endsWith("/") ? path : path + "/";
        // Quote the path so that it is matched literally inside the regular expressions.
        final String literalPath = Pattern.quote(basePath);
        final Pattern idPattern = Pattern.compile("^[0-9]{1,3}:" + literalPath + ".*$");
        // NOTE(review): this pattern has no leading '^', so it matches the path anywhere inside
        // _path, not only as a prefix - confirm that this is intended.
        final Pattern pathPattern = Pattern.compile(literalPath + ".*$");
        return Filters.or(
                Filters.regex("_id", idPattern),
                Filters.regex("_path", pathPattern));
    }

    /**
     * Builds a filter that matches, by {@code _id}, the document of {@code path} and of each of its
     * ancestors up to and including the root.
     *
     * @param path path whose ancestor chain should be matched
     * @return the OR of one {@code _id} equality filter per path in the chain
     */
    private static Bson ancestorsFilter(String path) {
        final List<Bson> idFilters = new ArrayList<>();
        String cursor = path;
        idFilters.add(Filters.eq("_id", (Object) Utils.getIdFromPath(cursor)));
        // Walk upwards until the root has been added.
        while (!PathUtils.denotesRoot(cursor)) {
            cursor = PathUtils.getParentPath(cursor);
            idFilters.add(Filters.eq("_id", (Object) Utils.getIdFromPath(cursor)));
        }
        return Filters.or(idFilters);
    }

    /**
     * Iterates over the query result and enqueues the documents in batches.
     * <p>
     * A batch is flushed when the sum of the estimated document sizes reaches
     * {@code maxBatchSizeBytes} or when it holds {@code maxBatchNumberOfDocuments} documents. The
     * resume position ({@code lastIdDownloaded} and, when retrying is enabled,
     * {@code nextLastModified}) is updated for every document read. If a recoverable
     * {@link MongoException} interrupts the iteration, the documents collected so far are enqueued
     * before the exception is rethrown, so that a later resume does not skip them.
     *
     * @param mongoIterable query to iterate over
     * @throws InterruptedException if the thread is interrupted while enqueuing
     * @throws TimeoutException     if a batch cannot be enqueued in time
     */
    private void download(FindIterable<NodeDocument> mongoIterable) throws InterruptedException, TimeoutException {
        try (MongoCursor<NodeDocument> cursor = mongoIterable.iterator()) {
            NodeDocument[] batch = new NodeDocument[this.maxBatchNumberOfDocuments];
            int nextIndex = 0;
            int batchSize = 0;
            try {
                while (cursor.hasNext()) {
                    NodeDocument next = cursor.next();
                    String id = next.getId();
                    if (this.retryOnConnectionErrors) {
                        this.nextLastModified = next.getModified();
                    }
                    this.lastIdDownloaded = id;
                    this.documentsRead++;
                    this.reportProgress(id);

                    // Count the document as part of the batch BEFORE testing the flush conditions.
                    // Incrementing inside a short-circuited condition would leave the document out of
                    // the enqueued copy whenever the size threshold triggers the flush.
                    batch[nextIndex] = next;
                    nextIndex++;
                    int docSize = (Integer) next.remove("__ESTIMATED_SIZE__");
                    batchSize += docSize;
                    if (batchSize >= this.maxBatchSizeBytes || nextIndex == batch.length) {
                        LOG.trace("Enqueuing block with {} elements, estimated size: {} bytes", nextIndex, batchSize);
                        this.tryEnqueueCopy(batch, nextIndex);
                        nextIndex = 0;
                        batchSize = 0;
                    }
                }
                if (nextIndex > 0) {
                    LOG.info("Enqueueing last block with {} elements, estimated size: {}",
                            nextIndex, IOUtils.humanReadableByteCountBin((long) batchSize));
                    this.tryEnqueueCopy(batch, nextIndex);
                }
            } catch (MongoException e) {
                if (e instanceof MongoInterruptedException || e instanceof MongoIncompatibleDriverException) {
                    // Not recoverable: propagate without enqueuing the partial batch.
                    throw e;
                }
                if (nextIndex > 0) {
                    // These documents already advanced the resume position, so hand them over now.
                    LOG.info("Connection interrupted with recoverable failure. Enqueueing partial block with {} elements, estimated size: {}",
                            nextIndex, IOUtils.humanReadableByteCountBin((long) batchSize));
                    this.tryEnqueueCopy(batch, nextIndex);
                }
                throw e;
            }
        }
    }

    /**
     * Enqueues a copy of the first {@code nextIndex} elements of {@code batch}, waiting up to
     * {@code MONGO_QUEUE_OFFER_TIMEOUT} for space in the queue.
     * <p>
     * The time spent waiting is added to the total enqueue wait time. Waits longer than one
     * millisecond are reported through a rate-limited log message.
     *
     * @param batch     buffer holding the documents; it is copied, so the caller may reuse it
     * @param nextIndex number of leading elements of {@code batch} to enqueue
     * @throws TimeoutException     if the queue does not accept the batch within the timeout
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private void tryEnqueueCopy(NodeDocument[] batch, int nextIndex) throws TimeoutException, InterruptedException {
        final NodeDocument[] snapshot = Arrays.copyOfRange(batch, 0, nextIndex);
        final Stopwatch waitTimer = Stopwatch.createStarted();
        final boolean accepted = mongoDocQueue.offer(snapshot, MONGO_QUEUE_OFFER_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
        if (!accepted) {
            throw new TimeoutException("Timeout trying to enqueue batch of MongoDB documents. Waited " + MONGO_QUEUE_OFFER_TIMEOUT);
        }

        final long waitedMillis = waitTimer.elapsed(TimeUnit.MILLISECONDS);
        totalEnqueueWaitTimeMillis += waitedMillis;
        if (waitedMillis <= 1L) {
            return;
        }
        logWithRateLimit(() -> LOG.info(
                "Enqueuing of Mongo document batch was delayed, took {} ms. mongoDocQueue size {}. Consider increasing the number of Transform threads. (This message is logged at most once every {} seconds)",
                waitedMillis, mongoDocQueue.size(), MIN_INTERVAL_BETWEEN_DELAYED_ENQUEUING_MESSAGES));
    }

    /**
     * Runs {@code f} only if more than {@code MIN_INTERVAL_BETWEEN_DELAYED_ENQUEUING_MESSAGES} seconds
     * have passed since the last time it ran (or since this task was created).
     *
     * @param f the logging action to run
     */
    private void logWithRateLimit(Runnable f) {
        final Instant now = Instant.now();
        final long secondsSinceLastMessage =
                Duration.between(lastDelayedEnqueueWarningMessageLoggedTimestamp, now).toSeconds();
        if (secondsSinceLastMessage <= MIN_INTERVAL_BETWEEN_DELAYED_ENQUEUING_MESSAGES) {
            return;
        }
        f.run();
        lastDelayedEnqueueWarningMessageLoggedTimestamp = now;
    }

    /**
     * Thrown when the download gives up after failing for longer than the allowed retry duration.
     */
    private static class RetryException
    extends RuntimeException {
        /** Retry duration, in seconds, that is reported by {@link #toString()}. */
        private final int retrialDurationSeconds;

        /**
         * @param retrialDurationSeconds retry duration in seconds to report
         * @param message                detail message
         * @param cause                  the failure that triggered giving up
         */
        public RetryException(int retrialDurationSeconds, String message, Throwable cause) {
            super(message, cause);
            this.retrialDurationSeconds = retrialDurationSeconds;
        }

        /** Prefixes the standard exception description with the retry duration. */
        @Override
        public String toString() {
            StringBuilder description = new StringBuilder("Tried for ");
            description.append(retrialDurationSeconds).append(" seconds: \n");
            description.append(super.toString());
            return description.toString();
        }
    }

    /**
     * Outcome of a download run.
     */
    public static class Result {
        /** Number of documents read during the run. */
        private final long documentsDownloaded;

        /**
         * @param documentsDownloaded number of documents that were read
         */
        public Result(long documentsDownloaded) {
            this.documentsDownloaded = documentsDownloaded;
        }

        /**
         * @return the number of documents that were read
         */
        public long getDocumentsDownloaded() {
            return documentsDownloaded;
        }
    }
}

