/*
 * Decompiled with CFR 0.152.
 */
package com.azure.data.cosmos.internal.query;

import com.azure.data.cosmos.BridgeInternal;
import com.azure.data.cosmos.CosmosClientException;
import com.azure.data.cosmos.FeedOptions;
import com.azure.data.cosmos.FeedResponse;
import com.azure.data.cosmos.Resource;
import com.azure.data.cosmos.SqlQuerySpec;
import com.azure.data.cosmos.internal.IDocumentClientRetryPolicy;
import com.azure.data.cosmos.internal.PartitionKeyRange;
import com.azure.data.cosmos.internal.RequestChargeTracker;
import com.azure.data.cosmos.internal.ResourceType;
import com.azure.data.cosmos.internal.RxDocumentServiceRequest;
import com.azure.data.cosmos.internal.Utils;
import com.azure.data.cosmos.internal.query.CompositeContinuationToken;
import com.azure.data.cosmos.internal.query.DocumentProducer;
import com.azure.data.cosmos.internal.query.IDocumentQueryClient;
import com.azure.data.cosmos.internal.query.IDocumentQueryExecutionComponent;
import com.azure.data.cosmos.internal.query.ParallelDocumentQueryExecutionContextBase;
import com.azure.data.cosmos.internal.query.PartitionedQueryExecutionInfo;
import com.azure.data.cosmos.internal.query.TriFunction;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

public class ParallelDocumentQueryExecutionContext<T extends Resource>
extends ParallelDocumentQueryExecutionContextBase<T> {
    private ParallelDocumentQueryExecutionContext(IDocumentQueryClient client, List<PartitionKeyRange> partitionKeyRanges, ResourceType resourceTypeEnum, Class<T> resourceType, SqlQuerySpec query, FeedOptions feedOptions, String resourceLink, String rewrittenQuery, String collectionRid, boolean isContinuationExpected, boolean getLazyFeedResponse, UUID correlatedActivityId) {
        super(client, partitionKeyRanges, resourceTypeEnum, resourceType, query, feedOptions, resourceLink, rewrittenQuery, isContinuationExpected, getLazyFeedResponse, correlatedActivityId);
    }

    public static <T extends Resource> Flux<IDocumentQueryExecutionComponent<T>> createAsync(IDocumentQueryClient client, ResourceType resourceTypeEnum, Class<T> resourceType, SqlQuerySpec query, FeedOptions feedOptions, String resourceLink, String collectionRid, PartitionedQueryExecutionInfo partitionedQueryExecutionInfo, List<PartitionKeyRange> targetRanges, int initialPageSize, boolean isContinuationExpected, boolean getLazyFeedResponse, UUID correlatedActivityId) {
        ParallelDocumentQueryExecutionContext<T> context = new ParallelDocumentQueryExecutionContext<T>(client, targetRanges, resourceTypeEnum, resourceType, query, feedOptions, resourceLink, partitionedQueryExecutionInfo.getQueryInfo().getRewrittenQuery(), collectionRid, isContinuationExpected, getLazyFeedResponse, correlatedActivityId);
        try {
            super.initialize(collectionRid, targetRanges, initialPageSize, feedOptions.requestContinuation());
            return Flux.just(context);
        }
        catch (CosmosClientException dce) {
            return Flux.error((Throwable)dce);
        }
    }

    private void initialize(String collectionRid, List<PartitionKeyRange> targetRanges, int initialPageSize, String continuationToken) throws CosmosClientException {
        HashMap<PartitionKeyRange, String> partitionKeyRangeToContinuationTokenMap = new HashMap<PartitionKeyRange, String>();
        if (continuationToken == null) {
            for (PartitionKeyRange targetRange : targetRanges) {
                partitionKeyRangeToContinuationTokenMap.put(targetRange, null);
            }
        } else {
            Utils.ValueHolder<CompositeContinuationToken> outCompositeContinuationToken = new Utils.ValueHolder<CompositeContinuationToken>();
            if (!CompositeContinuationToken.tryParse(continuationToken, outCompositeContinuationToken)) {
                String message = String.format("INVALID JSON in continuation token %s for Parallel~Context", continuationToken);
                throw BridgeInternal.createCosmosClientException(400, message);
            }
            CompositeContinuationToken compositeContinuationToken = (CompositeContinuationToken)outCompositeContinuationToken.v;
            List<PartitionKeyRange> filteredPartitionKeyRanges = this.getPartitionKeyRangesForContinuation(compositeContinuationToken, targetRanges);
            partitionKeyRangeToContinuationTokenMap.put(filteredPartitionKeyRanges.get(0), compositeContinuationToken.getToken());
            for (int i = 1; i < filteredPartitionKeyRanges.size(); ++i) {
                partitionKeyRangeToContinuationTokenMap.put(filteredPartitionKeyRanges.get(i), null);
            }
        }
        super.initialize(collectionRid, partitionKeyRangeToContinuationTokenMap, initialPageSize, this.querySpec);
    }

    private List<PartitionKeyRange> getPartitionKeyRangesForContinuation(CompositeContinuationToken compositeContinuationToken, List<PartitionKeyRange> partitionKeyRanges) throws CosmosClientException {
        int startIndex = this.FindTargetRangeAndExtractContinuationTokens(partitionKeyRanges, compositeContinuationToken.getRange());
        ArrayList<PartitionKeyRange> rightHandSideRanges = new ArrayList<PartitionKeyRange>();
        for (int i = startIndex; i < partitionKeyRanges.size(); ++i) {
            rightHandSideRanges.add(partitionKeyRanges.get(i));
        }
        return rightHandSideRanges;
    }

    @Override
    public Flux<FeedResponse<T>> drainAsync(int maxPageSize) {
        List obs = this.documentProducers.stream().sorted(Comparator.comparing(dp -> dp.targetRange.getMinInclusive())).map(DocumentProducer::produceAsync).collect(Collectors.toList());
        return Flux.concat(obs).compose(new EmptyPagesFilterTransformer(new RequestChargeTracker()));
    }

    @Override
    public Flux<FeedResponse<T>> executeAsync() {
        return this.drainAsync(this.feedOptions.maxItemCount());
    }

    @Override
    protected DocumentProducer<T> createDocumentProducer(String collectionRid, PartitionKeyRange targetRange, String initialContinuationToken, int initialPageSize, FeedOptions feedOptions, SqlQuerySpec querySpecForInit, Map<String, String> commonRequestHeaders, TriFunction<PartitionKeyRange, String, Integer, RxDocumentServiceRequest> createRequestFunc, Function<RxDocumentServiceRequest, Flux<FeedResponse<T>>> executeFunc, Callable<IDocumentClientRetryPolicy> createRetryPolicyFunc) {
        return new DocumentProducer<T>(this.client, collectionRid, feedOptions, createRequestFunc, executeFunc, targetRange, collectionRid, () -> this.client.getResetSessionTokenRetryPolicy().getRequestPolicy(), this.resourceType, this.correlatedActivityId, initialPageSize, initialContinuationToken, this.top);
    }

    private static class EmptyPagesFilterTransformer<T extends Resource>
    implements Function<Flux<DocumentProducer.DocumentProducerFeedResponse>, Flux<FeedResponse<T>>> {
        private final RequestChargeTracker tracker;
        private DocumentProducer.DocumentProducerFeedResponse previousPage;

        public EmptyPagesFilterTransformer(RequestChargeTracker tracker) {
            if (tracker == null) {
                throw new IllegalArgumentException("Request Charge Tracker must not be null.");
            }
            this.tracker = tracker;
            this.previousPage = null;
        }

        private DocumentProducer.DocumentProducerFeedResponse plusCharge(DocumentProducer.DocumentProducerFeedResponse documentProducerFeedResponse, double charge) {
            FeedResponse page = documentProducerFeedResponse.pageResult;
            HashMap<String, String> headers = new HashMap<String, String>(page.responseHeaders());
            double pageCharge = page.requestCharge();
            headers.put("x-ms-request-charge", String.valueOf(pageCharge += charge));
            FeedResponse newPage = BridgeInternal.createFeedResponseWithQueryMetrics(page.results(), headers, BridgeInternal.queryMetricsFromFeedResponse(page));
            documentProducerFeedResponse.pageResult = newPage;
            return documentProducerFeedResponse;
        }

        private DocumentProducer.DocumentProducerFeedResponse addCompositeContinuationToken(DocumentProducer.DocumentProducerFeedResponse documentProducerFeedResponse, String compositeContinuationToken) {
            FeedResponse page = documentProducerFeedResponse.pageResult;
            HashMap<String, String> headers = new HashMap<String, String>(page.responseHeaders());
            headers.put("x-ms-continuation", compositeContinuationToken);
            FeedResponse newPage = BridgeInternal.createFeedResponseWithQueryMetrics(page.results(), headers, BridgeInternal.queryMetricsFromFeedResponse(page));
            documentProducerFeedResponse.pageResult = newPage;
            return documentProducerFeedResponse;
        }

        private static Map<String, String> headerResponse(double requestCharge) {
            return Utils.immutableMapOf("x-ms-request-charge", String.valueOf(requestCharge));
        }

        @Override
        public Flux<FeedResponse<T>> apply(Flux<DocumentProducer.DocumentProducerFeedResponse> source) {
            return source.filter(documentProducerFeedResponse -> {
                if (documentProducerFeedResponse.pageResult.results().isEmpty()) {
                    this.tracker.addCharge(documentProducerFeedResponse.pageResult.requestCharge());
                    return false;
                }
                return true;
            }).map(documentProducerFeedResponse -> {
                double charge = this.tracker.getAndResetCharge();
                if (charge > 0.0) {
                    return new Utils.ValueHolder<DocumentProducer.DocumentProducerFeedResponse>(this.plusCharge((DocumentProducer.DocumentProducerFeedResponse)documentProducerFeedResponse, charge));
                }
                return new Utils.ValueHolder<DocumentProducer.DocumentProducerFeedResponse>((DocumentProducer.DocumentProducerFeedResponse)documentProducerFeedResponse);
            }).concatWith((Publisher)Flux.just(new Utils.ValueHolder<Object>(null))).map(heldValue -> {
                DocumentProducer.DocumentProducerFeedResponse documentProducerFeedResponse = (DocumentProducer.DocumentProducerFeedResponse)heldValue.v;
                ImmutablePair previousCurrent = new ImmutablePair((Object)this.previousPage, (Object)documentProducerFeedResponse);
                this.previousPage = documentProducerFeedResponse;
                return previousCurrent;
            }).skip(1L).map(currentNext -> {
                CompositeContinuationToken compositeContinuationTokenDom;
                String compositeContinuationToken;
                DocumentProducer.DocumentProducerFeedResponse current = (DocumentProducer.DocumentProducerFeedResponse)currentNext.left;
                DocumentProducer.DocumentProducerFeedResponse next = (DocumentProducer.DocumentProducerFeedResponse)currentNext.right;
                String backendContinuationToken = current.pageResult.continuationToken();
                if (backendContinuationToken == null) {
                    if (next == null) {
                        compositeContinuationToken = null;
                    } else {
                        compositeContinuationTokenDom = new CompositeContinuationToken(null, next.sourcePartitionKeyRange.toRange());
                        compositeContinuationToken = compositeContinuationTokenDom.toJson();
                    }
                } else {
                    compositeContinuationTokenDom = new CompositeContinuationToken(backendContinuationToken, current.sourcePartitionKeyRange.toRange());
                    compositeContinuationToken = compositeContinuationTokenDom.toJson();
                }
                DocumentProducer.DocumentProducerFeedResponse page = current;
                page = this.addCompositeContinuationToken(page, compositeContinuationToken);
                return page;
            }).map(documentProducerFeedResponse -> documentProducerFeedResponse.pageResult).switchIfEmpty((Publisher)Flux.defer(() -> Flux.just(BridgeInternal.createFeedResponse(Utils.immutableListOf(), EmptyPagesFilterTransformer.headerResponse(this.tracker.getAndResetCharge())))));
        }
    }
}

