/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.query;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosDiagnostics;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.ClientSideRequestStatistics;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.DiagnosticsClientContext;
import com.azure.cosmos.implementation.DocumentClientRetryPolicy;
import com.azure.cosmos.implementation.DocumentCollection;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.QueryMetrics;
import com.azure.cosmos.implementation.RequestChargeTracker;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.ImmutablePair;
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
import com.azure.cosmos.implementation.query.CompositeContinuationToken;
import com.azure.cosmos.implementation.query.DocumentProducer;
import com.azure.cosmos.implementation.query.IDocumentQueryClient;
import com.azure.cosmos.implementation.query.IDocumentQueryExecutionComponent;
import com.azure.cosmos.implementation.query.ParallelDocumentQueryExecutionContextBase;
import com.azure.cosmos.implementation.query.PartitionMapper;
import com.azure.cosmos.implementation.query.PipelinedDocumentQueryParams;
import com.azure.cosmos.implementation.query.QueryInfo;
import com.azure.cosmos.implementation.query.TriFunction;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.SqlQuerySpec;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.concurrent.Queues;

/**
 * Query execution component that drains the {@link DocumentProducer}s held by the base class in
 * ascending order of their feed range's minimum and stitches their pages into one sequential
 * stream of {@link FeedResponse}s.
 *
 * <p>Every page emitted by {@link #drainAsync(int)} has its {@code x-ms-continuation} header
 * rewritten to a JSON-serialized {@link CompositeContinuationToken} (or {@code null} once there is
 * nothing left to read). {@link #createAsync} accepts such a token to resume a query.
 *
 * @param <T> the item type handed to the base class and to the document producers
 */
public class ParallelDocumentQueryExecutionContext<T>
    extends ParallelDocumentQueryExecutionContextBase<T> {

    private static final Logger logger = LoggerFactory.getLogger(ParallelDocumentQueryExecutionContext.class);

    /** Query options; read for parallelism, buffering, page size and empty-page handling. */
    private final CosmosQueryRequestOptions cosmosQueryRequestOptions;

    /** Continuation token to start from per feed range; a {@code null} value means "no token". */
    private final Map<FeedRangeEpkImpl, String> partitionKeyRangeToContinuationTokenMap;

    private ParallelDocumentQueryExecutionContext(DiagnosticsClientContext diagnosticsClientContext, IDocumentQueryClient client, ResourceType resourceTypeEnum, Class<T> resourceType, SqlQuerySpec query, CosmosQueryRequestOptions cosmosQueryRequestOptions, String resourceLink, String rewrittenQuery, UUID correlatedActivityId, boolean shouldUnwrapSelectValue) {
        super(diagnosticsClientContext, client, resourceTypeEnum, resourceType, query, cosmosQueryRequestOptions, resourceLink, rewrittenQuery, correlatedActivityId, shouldUnwrapSelectValue);
        this.cosmosQueryRequestOptions = cosmosQueryRequestOptions;
        this.partitionKeyRangeToContinuationTokenMap = new HashMap<>();
    }

    /**
     * Builds a context from pipelined query parameters and initializes it for the given collection.
     *
     * @param diagnosticsClientContext diagnostics context forwarded to the base class
     * @param client query client forwarded to the base class
     * @param initParams query parameters: query info, feed ranges, page size, options, top, ...
     * @param collection collection the query runs against
     * @param <T> item type
     * @return a {@link Flux} emitting the initialized context, or a {@link Flux} that signals the
     *         {@link CosmosException} raised during initialization (e.g. an unparsable
     *         continuation token)
     */
    public static <T> Flux<IDocumentQueryExecutionComponent<T>> createAsync(DiagnosticsClientContext diagnosticsClientContext, IDocumentQueryClient client, PipelinedDocumentQueryParams<T> initParams, DocumentCollection collection) {
        QueryInfo queryInfo = initParams.getQueryInfo();
        ParallelDocumentQueryExecutionContext<T> context = new ParallelDocumentQueryExecutionContext<T>(
            diagnosticsClientContext,
            client,
            initParams.getResourceTypeEnum(),
            initParams.getResourceType(),
            initParams.getQuery(),
            initParams.getCosmosQueryRequestOptions(),
            initParams.getResourceLink(),
            queryInfo.getRewrittenQuery(),
            initParams.getCorrelatedActivityId(),
            // SELECT VALUE is unwrapped here only when none of these other query features is present.
            queryInfo.hasSelectValue() && !queryInfo.hasOrderBy() && !queryInfo.hasAggregates() && !queryInfo.hasGroupBy() && !queryInfo.hasDCount() && !queryInfo.hasDistinct());
        context.setTop(initParams.getTop());
        try {
            // Must be invoked on the new instance: `super` is not available in a static method.
            context.initialize(collection, initParams.getFeedRanges(), initParams.getInitialPageSize(), ModelBridgeInternal.getRequestContinuationFromQueryRequestOptions(initParams.getCosmosQueryRequestOptions()));
            return Flux.just(context);
        } catch (CosmosException dce) {
            return Flux.error(dce);
        }
    }

    /**
     * Builds a context for a read-many operation, where each partition key range has its own query.
     *
     * @param diagnosticsClientContext diagnostics context forwarded to the base class
     * @param queryClient query client forwarded to the base class
     * @param sqlQuery query spec; its text is also passed as the rewritten query
     * @param rangeQueryMap query to run for each partition key range
     * @param cosmosQueryRequestOptions query options
     * @param collectionRid resource id of the collection
     * @param collectionLink link of the collection, passed as the resource link
     * @param activityId correlated activity id
     * @param klass item class
     * @param resourceTypeEnum resource type being queried
     * @param <T> item type
     * @return a {@link Flux} emitting the initialized context
     */
    public static <T> Flux<IDocumentQueryExecutionComponent<T>> createReadManyQueryAsync(DiagnosticsClientContext diagnosticsClientContext, IDocumentQueryClient queryClient, SqlQuerySpec sqlQuery, Map<PartitionKeyRange, SqlQuerySpec> rangeQueryMap, CosmosQueryRequestOptions cosmosQueryRequestOptions, String collectionRid, String collectionLink, UUID activityId, Class<T> klass, ResourceType resourceTypeEnum) {
        ParallelDocumentQueryExecutionContext<T> context = new ParallelDocumentQueryExecutionContext<T>(diagnosticsClientContext, queryClient, resourceTypeEnum, klass, sqlQuery, cosmosQueryRequestOptions, collectionLink, sqlQuery.getQueryText(), activityId, false);
        context.initializeReadMany(rangeQueryMap, cosmosQueryRequestOptions, collectionRid);
        return Flux.just(context);
    }

    /**
     * Fills the feed-range-to-continuation map and hands it to the base class initializer.
     *
     * @param collection collection the query runs against
     * @param feedRanges feed ranges the query targets
     * @param initialPageSize page size forwarded to the base class
     * @param continuationToken serialized {@link CompositeContinuationToken} to resume from, or
     *        {@code null} to start from the beginning
     * @throws CosmosException with status 400 when {@code continuationToken} cannot be parsed
     */
    private void initialize(DocumentCollection collection, List<FeedRangeEpkImpl> feedRanges, int initialPageSize, String continuationToken) {
        if (continuationToken == null) {
            // Fresh query: every feed range starts without a continuation token.
            for (FeedRangeEpkImpl feedRangeEpk : feedRanges) {
                this.partitionKeyRangeToContinuationTokenMap.put(feedRangeEpk, null);
            }
        } else {
            Utils.ValueHolder<CompositeContinuationToken> outCompositeContinuationToken = new Utils.ValueHolder<CompositeContinuationToken>();
            if (!CompositeContinuationToken.tryParse(continuationToken, outCompositeContinuationToken)) {
                String message = String.format("INVALID JSON in continuation token %s for Parallel~Context", continuationToken);
                throw BridgeInternal.createCosmosException(400, message);
            }
            CompositeContinuationToken compositeContinuationToken = (CompositeContinuationToken) outCompositeContinuationToken.v;
            PartitionMapper.PartitionMapping<CompositeContinuationToken> partitionMapping = PartitionMapper.getPartitionMapping(feedRanges, Collections.singletonList(compositeContinuationToken));
            // Only the target mapping and the ranges to its right are registered for resumption.
            this.populatePartitionToContinuationMap(partitionMapping.getTargetMapping());
            this.populatePartitionToContinuationMap(partitionMapping.getMappingRightOfTarget());
        }
        super.initialize(collection, this.partitionKeyRangeToContinuationTokenMap, initialPageSize, this.querySpec);
    }

    /**
     * Copies a feed-range mapping into the continuation map, storing the backend token of each
     * composite token, or {@code null} where the mapping has no composite token.
     *
     * @param partitionMapping feed range to (possibly {@code null}) composite continuation token
     */
    private void populatePartitionToContinuationMap(Map<FeedRangeEpkImpl, CompositeContinuationToken> partitionMapping) {
        for (Map.Entry<FeedRangeEpkImpl, CompositeContinuationToken> entry : partitionMapping.entrySet()) {
            CompositeContinuationToken compositeToken = entry.getValue();
            String backendToken = compositeToken != null ? compositeToken.getToken() : null;
            this.partitionKeyRangeToContinuationTokenMap.put(entry.getKey(), backendToken);
        }
    }

    /**
     * Logs, at INFO level, the client-side request statistics of an empty page serialized as JSON.
     * A serialization failure is logged at WARN level instead of being propagated.
     *
     * @param cosmosDiagnostics diagnostics of the empty page
     * @param correlatedActivityId correlated activity id of the query
     * @param activityId activity id of the empty page's response
     * @param operationContextTextProvider supplies the operation context text appended to the log
     */
    static void logEmptyPageDiagnostics(CosmosDiagnostics cosmosDiagnostics, UUID correlatedActivityId, String activityId, Supplier<String> operationContextTextProvider) {
        List<ClientSideRequestStatistics> requestStatistics = BridgeInternal.getClientSideRequestStatisticsList(cosmosDiagnostics);
        try {
            // Guarded so the JSON serialization is skipped when INFO logging is disabled.
            if (logger.isInfoEnabled()) {
                logger.info("Empty page request diagnostics for correlatedActivityId [{}] - activityId [{}] - [{}], Context: {}", correlatedActivityId, activityId, Utils.getSimpleObjectMapper().writeValueAsString(requestStatistics), operationContextTextProvider.get());
            }
        } catch (JsonProcessingException e) {
            // Trailing throwable argument: SLF4J logs it as the exception, not as a placeholder value.
            logger.warn("Failed to log empty page diagnostics. Context: {}", operationContextTextProvider.get(), e);
        }
    }

    /**
     * Merges the pages of all document producers sequentially, in ascending order of each
     * producer's feed range minimum, then filters empty pages and attaches composite
     * continuation tokens via {@link EmptyPagesFilterTransformer}.
     *
     * @param maxPageSize page size used when computing the merge prefetch
     * @return the merged stream of pages
     */
    @Override
    public Flux<FeedResponse<T>> drainAsync(int maxPageSize) {
        // NOTE(review): raw List kept from the original; parameterizing it requires the declaration
        // of DocumentProducer.DocumentProducerFeedResponse, which is not visible in this file.
        List obs = this.documentProducers.stream()
            .sorted(Comparator.comparing(dp -> dp.feedRange.getRange().getMin()))
            .map(DocumentProducer::produceAsync)
            .collect(Collectors.toList());
        int fluxConcurrency = this.fluxSequentialMergeConcurrency(this.cosmosQueryRequestOptions, obs.size());
        int fluxPrefetch = this.fluxSequentialMergePrefetch(this.cosmosQueryRequestOptions, obs.size(), maxPageSize, fluxConcurrency);
        if (logger.isDebugEnabled()) {
            logger.debug("ParallelQuery: flux mergeSequential concurrency {}, prefetch {}, Context: {}", fluxConcurrency, fluxPrefetch, this.getOperationContextTextProvider().get());
        }
        // NOTE(review): the transformer (and its mutable state) is created once per drainAsync call,
        // not once per subscription of the returned Flux — confirm single subscription is intended.
        return Flux.mergeSequential(obs, fluxConcurrency, fluxPrefetch).transformDeferred(new EmptyPagesFilterTransformer<T>(new RequestChargeTracker(), this.cosmosQueryRequestOptions, this.correlatedActivityId, this.getOperationContextTextProvider()));
    }

    /**
     * Drains the query using the max item count configured on the query request options.
     *
     * @return the merged stream of pages
     */
    @Override
    public Flux<FeedResponse<T>> executeAsync() {
        return this.drainAsync(ModelBridgeInternal.getMaxItemCountFromQueryRequestOptions(this.cosmosQueryRequestOptions));
    }

    /**
     * Creates the {@link DocumentProducer} for one feed range.
     *
     * <p>{@code querySpecForInit} and {@code commonRequestHeaders} are not used by this
     * implementation. NOTE(review): {@code collectionRid} is passed for two separate constructor
     * arguments of {@link DocumentProducer}; verify against that constructor's signature.
     */
    @Override
    protected DocumentProducer<T> createDocumentProducer(String collectionRid, String initialContinuationToken, int initialPageSize, CosmosQueryRequestOptions cosmosQueryRequestOptions, SqlQuerySpec querySpecForInit, Map<String, String> commonRequestHeaders, TriFunction<FeedRangeEpkImpl, String, Integer, RxDocumentServiceRequest> createRequestFunc, Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeFunc, Callable<DocumentClientRetryPolicy> createRetryPolicyFunc, FeedRangeEpkImpl feedRange) {
        return new DocumentProducer<T>(this.client, collectionRid, cosmosQueryRequestOptions, createRequestFunc, executeFunc, collectionRid, createRetryPolicyFunc, this.resourceType, this.correlatedActivityId, initialPageSize, initialContinuationToken, this.top, feedRange, this.getOperationContextTextProvider());
    }

    /**
     * Computes the concurrency for the sequential merge: the configured max degree of parallelism
     * (CPU count when negative, 1 when zero), capped at the number of partitions.
     */
    private int fluxSequentialMergeConcurrency(CosmosQueryRequestOptions options, int numberOfPartitions) {
        int parallelism = options.getMaxDegreeOfParallelism();
        if (parallelism < 0) {
            parallelism = Configs.getCPUCnt();
        } else if (parallelism == 0) {
            parallelism = 1;
        }
        return Math.min(numberOfPartitions, parallelism);
    }

    /**
     * Computes the per-source prefetch for the sequential merge from the buffered-item budget.
     * When no positive budget is configured, it defaults to
     * {@code CPU count * partitions * pageSize}, capped at 100000. The result is at least 1 and
     * at most {@link Queues#XS_BUFFER_SIZE}.
     */
    private int fluxSequentialMergePrefetch(CosmosQueryRequestOptions options, int numberOfPartitions, int pageSize, int fluxConcurrency) {
        int maxBufferedItemCount = options.getMaxBufferedItemCount();
        if (maxBufferedItemCount <= 0) {
            maxBufferedItemCount = Math.min(Configs.getCPUCnt() * numberOfPartitions * pageSize, 100000);
        }
        // Inner max(…, 1) keeps the divisor positive; outer max(…, 1) keeps the prefetch at least 1.
        int fluxPrefetch = Math.max(maxBufferedItemCount / Math.max(fluxConcurrency * pageSize, 1), 1);
        return Math.min(fluxPrefetch, Queues.XS_BUFFER_SIZE);
    }

    /**
     * Pipeline stage that drops empty pages (unless the query options allow them), folds the
     * request charge and query metrics of dropped pages into the next page that is emitted, and
     * rewrites every emitted page's continuation header to a composite continuation token.
     *
     * <p>Instances are stateful ({@link #previousPage}, {@link #cosmosDiagnostics}, the metrics map
     * and the charge tracker are mutated while the stream is processed).
     */
    private static class EmptyPagesFilterTransformer<T>
        implements Function<Flux<DocumentProducer.DocumentProducerFeedResponse>, Flux<FeedResponse<T>>> {
        /** Accumulates the request charge of dropped empty pages. */
        private final RequestChargeTracker tracker;
        /** Last element seen by the pairing stage; used to build (current, next) pairs. */
        private DocumentProducer.DocumentProducerFeedResponse previousPage;
        private final CosmosQueryRequestOptions cosmosQueryRequestOptions;
        private final UUID correlatedActivityId;
        /** Accumulates the query metrics of dropped empty pages. */
        private final ConcurrentMap<String, QueryMetrics> emptyPageQueryMetricsMap = new ConcurrentHashMap<String, QueryMetrics>();
        /** Diagnostics of the most recently dropped empty page. */
        private CosmosDiagnostics cosmosDiagnostics;
        private final Supplier<String> operationContextTextProvider;

        /**
         * @param tracker accumulator for the charge of dropped pages; must not be {@code null}
         * @param options query options consulted for empty-page handling and diagnostics logging
         * @param correlatedActivityId correlated activity id used when logging empty pages
         * @param operationContextTextProvider supplies context text for logging; must not be
         *        {@code null}
         * @throws IllegalArgumentException if {@code tracker} or
         *         {@code operationContextTextProvider} is {@code null}
         */
        public EmptyPagesFilterTransformer(RequestChargeTracker tracker, CosmosQueryRequestOptions options, UUID correlatedActivityId, Supplier<String> operationContextTextProvider) {
            if (tracker == null) {
                throw new IllegalArgumentException("Request Charge Tracker must not be null.");
            }
            if (operationContextTextProvider == null) {
                throw new IllegalArgumentException("Parameter 'operationContextTextProvider' must not be null.");
            }
            this.tracker = tracker;
            this.previousPage = null;
            this.cosmosQueryRequestOptions = options;
            this.correlatedActivityId = correlatedActivityId;
            this.operationContextTextProvider = operationContextTextProvider;
        }

        /**
         * Replaces the response's page with a copy using the given headers; results, query
         * metrics, query plan diagnostics and diagnostics are taken from the current page.
         * The passed response object is mutated and returned.
         */
        private DocumentProducer.DocumentProducerFeedResponse withHeaders(DocumentProducer.DocumentProducerFeedResponse documentProducerFeedResponse, Map<String, String> headers) {
            FeedResponse page = documentProducerFeedResponse.pageResult;
            documentProducerFeedResponse.pageResult = BridgeInternal.createFeedResponseWithQueryMetrics(page.getResults(), headers, BridgeInternal.queryMetricsFromFeedResponse(page), ModelBridgeInternal.getQueryPlanDiagnosticsContext(page), false, false, page.getCosmosDiagnostics());
            return documentProducerFeedResponse;
        }

        /**
         * Adds {@code charge} to the page's request charge by rewriting its
         * {@code x-ms-request-charge} header.
         */
        private DocumentProducer.DocumentProducerFeedResponse plusCharge(DocumentProducer.DocumentProducerFeedResponse documentProducerFeedResponse, double charge) {
            FeedResponse page = documentProducerFeedResponse.pageResult;
            Map<String, String> headers = new HashMap<String, String>(page.getResponseHeaders());
            double totalCharge = page.getRequestCharge() + charge;
            headers.put("x-ms-request-charge", String.valueOf(totalCharge));
            return this.withHeaders(documentProducerFeedResponse, headers);
        }

        /**
         * Sets the page's {@code x-ms-continuation} header to the given (possibly {@code null})
         * composite continuation token.
         */
        private DocumentProducer.DocumentProducerFeedResponse addCompositeContinuationToken(DocumentProducer.DocumentProducerFeedResponse documentProducerFeedResponse, String compositeContinuationToken) {
            FeedResponse page = documentProducerFeedResponse.pageResult;
            Map<String, String> headers = new HashMap<String, String>(page.getResponseHeaders());
            headers.put("x-ms-continuation", compositeContinuationToken);
            return this.withHeaders(documentProducerFeedResponse, headers);
        }

        /** Builds a header map containing only the given request charge. */
        private static Map<String, String> headerResponse(double requestCharge) {
            return Utils.immutableMapOf("x-ms-request-charge", String.valueOf(requestCharge));
        }

        /**
         * Applies the empty-page filtering, charge/metrics folding and continuation-token
         * rewriting described on the class to {@code source}.
         *
         * @param source pages in the order they should be emitted
         * @return the transformed pages; if nothing survives the filter, a single empty page
         *         carrying the accumulated charge, metrics and last seen diagnostics
         */
        @Override
        public Flux<FeedResponse<T>> apply(Flux<DocumentProducer.DocumentProducerFeedResponse> source) {
            return source.filter(documentProducerFeedResponse -> {
                if (documentProducerFeedResponse.pageResult.getResults().isEmpty() && !ModelBridgeInternal.getEmptyPagesAllowedFromQueryRequestOptions(this.cosmosQueryRequestOptions)) {
                    // Drop the empty page but keep its charge, metrics and diagnostics for later.
                    this.tracker.addCharge(documentProducerFeedResponse.pageResult.getRequestCharge());
                    ConcurrentMap<String, QueryMetrics> currentQueryMetrics = BridgeInternal.queryMetricsFromFeedResponse(documentProducerFeedResponse.pageResult);
                    QueryMetrics.mergeQueryMetricsMap(this.emptyPageQueryMetricsMap, currentQueryMetrics);
                    this.cosmosDiagnostics = documentProducerFeedResponse.pageResult.getCosmosDiagnostics();
                    if (ImplementationBridgeHelpers.CosmosQueryRequestOptionsHelper.getCosmosQueryRequestOptionsAccessor().isEmptyPageDiagnosticsEnabled(this.cosmosQueryRequestOptions)) {
                        ParallelDocumentQueryExecutionContext.logEmptyPageDiagnostics(this.cosmosDiagnostics, this.correlatedActivityId, documentProducerFeedResponse.pageResult.getActivityId(), this.operationContextTextProvider);
                    }
                    return false;
                }
                return true;
            }).map(documentProducerFeedResponse -> {
                // Fold metrics and charge accumulated from dropped pages into this surviving page.
                if (!this.emptyPageQueryMetricsMap.isEmpty()) {
                    ConcurrentMap<String, QueryMetrics> currentQueryMetrics = BridgeInternal.queryMetricsFromFeedResponse(documentProducerFeedResponse.pageResult);
                    QueryMetrics.mergeQueryMetricsMap(currentQueryMetrics, this.emptyPageQueryMetricsMap);
                    this.emptyPageQueryMetricsMap.clear();
                }
                double charge = this.tracker.getAndResetCharge();
                if (charge > 0.0) {
                    return new Utils.ValueHolder<DocumentProducer.DocumentProducerFeedResponse>(this.plusCharge(documentProducerFeedResponse, charge));
                }
                return new Utils.ValueHolder<DocumentProducer.DocumentProducerFeedResponse>(documentProducerFeedResponse);
            // Append a null-holding sentinel so the last real page also gets a "next" (null) below.
            // The holder is typed like the upstream elements so the pipeline keeps its element type.
            }).concatWith(Flux.just(new Utils.ValueHolder<DocumentProducer.DocumentProducerFeedResponse>(null))).map(heldValue -> {
                // Pair each element with its predecessor: 1, 2, null -> (null, 1), (1, 2), (2, null).
                DocumentProducer.DocumentProducerFeedResponse documentProducerFeedResponse = (DocumentProducer.DocumentProducerFeedResponse) heldValue.v;
                ImmutablePair<DocumentProducer.DocumentProducerFeedResponse, DocumentProducer.DocumentProducerFeedResponse> previousCurrent = new ImmutablePair<DocumentProducer.DocumentProducerFeedResponse, DocumentProducer.DocumentProducerFeedResponse>(this.previousPage, documentProducerFeedResponse);
                this.previousPage = documentProducerFeedResponse;
                return previousCurrent;
            // The first pair is (null, first page); skipping it leaves (current, next) pairs.
            }).skip(1L).map(currentNext -> {
                DocumentProducer.DocumentProducerFeedResponse current = (DocumentProducer.DocumentProducerFeedResponse) currentNext.left;
                DocumentProducer.DocumentProducerFeedResponse next = (DocumentProducer.DocumentProducerFeedResponse) currentNext.right;
                String backendContinuationToken = current.pageResult.getContinuationToken();
                String compositeContinuationToken;
                if (backendContinuationToken != null) {
                    // Current range still has data: resume inside it with the backend token.
                    compositeContinuationToken = new CompositeContinuationToken(backendContinuationToken, current.sourceFeedRange.getRange()).toJson();
                } else if (next != null) {
                    // Current range is exhausted: resume at the start of the next page's range.
                    compositeContinuationToken = new CompositeContinuationToken(null, next.sourceFeedRange.getRange()).toJson();
                } else {
                    // No backend token and no further page: nothing left to resume.
                    compositeContinuationToken = null;
                }
                return this.addCompositeContinuationToken(current, compositeContinuationToken);
            // NOTE(review): the raw Publisher cast is kept from the original; it makes the result a
            // raw Flux that is unchecked-converted to the declared return type.
            }).map(documentProducerFeedResponse -> documentProducerFeedResponse.pageResult).switchIfEmpty((Publisher) Flux.defer(() -> Flux.just(BridgeInternal.createFeedResponseWithQueryMetrics(Utils.immutableListOf(), EmptyPagesFilterTransformer.headerResponse(this.tracker.getAndResetCharge()), this.emptyPageQueryMetricsMap, null, false, false, this.cosmosDiagnostics))));
        }
    }
}

