/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.query;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosClientException;
import com.azure.cosmos.implementation.DocumentClientRetryPolicy;
import com.azure.cosmos.implementation.Exceptions;
import com.azure.cosmos.implementation.ObservableHelper;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.QueryMetrics;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.ImmutablePair;
import com.azure.cosmos.implementation.query.IDocumentQueryClient;
import com.azure.cosmos.implementation.query.Paginator;
import com.azure.cosmos.implementation.query.TriFunction;
import com.azure.cosmos.implementation.query.metrics.ClientSideMetrics;
import com.azure.cosmos.implementation.query.metrics.FetchExecutionRangeAccumulator;
import com.azure.cosmos.implementation.query.metrics.SchedulingStopwatch;
import com.azure.cosmos.implementation.query.metrics.SchedulingTimeSpan;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.models.FeedOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.Resource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class DocumentProducer<T extends Resource> {
    private static final Logger logger = LoggerFactory.getLogger(DocumentProducer.class);
    private int retries;
    protected final IDocumentQueryClient client;
    protected final String collectionRid;
    protected final FeedOptions feedOptions;
    protected final Class<T> resourceType;
    protected final PartitionKeyRange targetRange;
    protected final String collectionLink;
    protected final TriFunction<PartitionKeyRange, String, Integer, RxDocumentServiceRequest> createRequestFunc;
    protected final Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeRequestFuncWithRetries;
    protected final Callable<DocumentClientRetryPolicy> createRetryPolicyFunc;
    protected final int pageSize;
    protected final UUID correlatedActivityId;
    public int top;
    private volatile String lastResponseContinuationToken;
    private final SchedulingStopwatch fetchSchedulingMetrics;
    private SchedulingStopwatch moveNextSchedulingMetrics;
    private final FetchExecutionRangeAccumulator fetchExecutionRangeAccumulator;

    public DocumentProducer(IDocumentQueryClient client, String collectionResourceId, FeedOptions feedOptions, TriFunction<PartitionKeyRange, String, Integer, RxDocumentServiceRequest> createRequestFunc, Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeRequestFunc, PartitionKeyRange targetRange, String collectionLink, Callable<DocumentClientRetryPolicy> createRetryPolicyFunc, Class<T> resourceType, UUID correlatedActivityId, int initialPageSize, String initialContinuationToken, int top) {
        this.client = client;
        this.collectionRid = collectionResourceId;
        this.createRequestFunc = createRequestFunc;
        this.fetchSchedulingMetrics = new SchedulingStopwatch();
        this.fetchSchedulingMetrics.ready();
        this.fetchExecutionRangeAccumulator = new FetchExecutionRangeAccumulator(targetRange.getId());
        this.executeRequestFuncWithRetries = request -> {
            this.retries = -1;
            this.fetchSchedulingMetrics.start();
            this.fetchExecutionRangeAccumulator.beginFetchRange();
            DocumentClientRetryPolicy retryPolicy = null;
            if (createRetryPolicyFunc != null) {
                try {
                    retryPolicy = (DocumentClientRetryPolicy)createRetryPolicyFunc.call();
                }
                catch (Exception e) {
                    return Mono.error((Throwable)e);
                }
                retryPolicy.onBeforeSendRequest((RxDocumentServiceRequest)request);
            }
            return ObservableHelper.inlineIfPossibleAsObs(() -> {
                ++this.retries;
                return (Mono)executeRequestFunc.apply((RxDocumentServiceRequest)request);
            }, retryPolicy);
        };
        this.correlatedActivityId = correlatedActivityId;
        this.feedOptions = feedOptions != null ? feedOptions : new FeedOptions();
        ModelBridgeInternal.setFeedOptionsContinuationToken(this.feedOptions, initialContinuationToken);
        this.lastResponseContinuationToken = initialContinuationToken;
        this.resourceType = resourceType;
        this.targetRange = targetRange;
        this.collectionLink = collectionLink;
        this.createRetryPolicyFunc = createRetryPolicyFunc;
        this.pageSize = initialPageSize;
        this.top = top;
    }

    public Flux<DocumentProducerFeedResponse> produceAsync() {
        BiFunction<String, Integer, RxDocumentServiceRequest> sourcePartitionCreateRequestFunc = (token, maxItemCount) -> this.createRequestFunc.apply(this.targetRange, (String)token, (Integer)maxItemCount);
        Flux obs = Paginator.getPaginatedQueryResultAsObservable(this.feedOptions.getRequestContinuation(), sourcePartitionCreateRequestFunc, this.executeRequestFuncWithRetries, this.resourceType, this.top, this.pageSize).map(rsp -> {
            this.lastResponseContinuationToken = rsp.getContinuationToken();
            this.fetchExecutionRangeAccumulator.endFetchRange(rsp.getActivityId(), rsp.getResults().size(), this.retries);
            this.fetchSchedulingMetrics.stop();
            return rsp;
        });
        return this.splitProof((Flux<DocumentProducerFeedResponse>)obs.map(x$0 -> new DocumentProducerFeedResponse(x$0)));
    }

    private Flux<DocumentProducerFeedResponse> splitProof(Flux<DocumentProducerFeedResponse> sourceFeedResponseObservable) {
        return sourceFeedResponseObservable.onErrorResume(t -> {
            CosmosClientException dce = Utils.as(t, CosmosClientException.class);
            if (dce == null || !this.isSplit(dce)) {
                logger.error("Unexpected failure", t);
                return Flux.error((Throwable)t);
            }
            logger.info("DocumentProducer handling a partition split in [{}], detail:[{}]", (Object)this.targetRange, (Object)dce);
            Mono<Utils.ValueHolder<List<PartitionKeyRange>>> replacementRangesObs = this.getReplacementRanges(this.targetRange.toRange());
            Flux replacementProducers = replacementRangesObs.flux().flatMap(partitionKeyRangesValueHolder -> {
                if (logger.isDebugEnabled()) {
                    logger.info("Cross Partition Query Execution detected partition [{}] split into [{}] partitions, last continuation token is [{}].", new Object[]{this.targetRange.toJson(), ((List)partitionKeyRangesValueHolder.v).stream().map(ModelBridgeInternal::toJsonFromJsonSerializable).collect(Collectors.joining(", ")), this.lastResponseContinuationToken});
                }
                return Flux.fromIterable(this.createReplacingDocumentProducersOnSplit((List)partitionKeyRangesValueHolder.v));
            });
            return this.produceOnSplit(replacementProducers);
        });
    }

    protected Flux<DocumentProducerFeedResponse> produceOnSplit(Flux<DocumentProducer<T>> replacingDocumentProducers) {
        return replacingDocumentProducers.flatMap(DocumentProducer::produceAsync, 1);
    }

    private List<DocumentProducer<T>> createReplacingDocumentProducersOnSplit(List<PartitionKeyRange> partitionKeyRanges) {
        ArrayList<DocumentProducer<T>> replacingDocumentProducers = new ArrayList<DocumentProducer<T>>(partitionKeyRanges.size());
        for (PartitionKeyRange pkr : partitionKeyRanges) {
            replacingDocumentProducers.add(this.createChildDocumentProducerOnSplit(pkr, this.lastResponseContinuationToken));
        }
        return replacingDocumentProducers;
    }

    protected DocumentProducer<T> createChildDocumentProducerOnSplit(PartitionKeyRange targetRange, String initialContinuationToken) {
        return new DocumentProducer<T>(this.client, this.collectionRid, this.feedOptions, this.createRequestFunc, this.executeRequestFuncWithRetries, targetRange, this.collectionLink, null, this.resourceType, this.correlatedActivityId, this.pageSize, initialContinuationToken, this.top);
    }

    private Mono<Utils.ValueHolder<List<PartitionKeyRange>>> getReplacementRanges(Range<String> range) {
        return this.client.getPartitionKeyRangeCache().tryGetOverlappingRangesAsync(null, this.collectionRid, range, true, this.feedOptions.getProperties());
    }

    private boolean isSplit(CosmosClientException e) {
        return Exceptions.isPartitionSplit(e);
    }

    class DocumentProducerFeedResponse {
        FeedResponse<T> pageResult;
        PartitionKeyRange sourcePartitionKeyRange;

        DocumentProducerFeedResponse(FeedResponse<T> pageResult) {
            this.pageResult = pageResult;
            this.sourcePartitionKeyRange = DocumentProducer.this.targetRange;
            this.populatePartitionedQueryMetrics();
        }

        DocumentProducerFeedResponse(FeedResponse<T> pageResult, PartitionKeyRange pkr) {
            this.pageResult = pageResult;
            this.sourcePartitionKeyRange = pkr;
            this.populatePartitionedQueryMetrics();
        }

        void populatePartitionedQueryMetrics() {
            String queryMetricsDelimitedString = this.pageResult.getResponseHeaders().get("x-ms-documentdb-query-metrics");
            if (!StringUtils.isEmpty(queryMetricsDelimitedString)) {
                queryMetricsDelimitedString = queryMetricsDelimitedString + String.format(";%s=%.2f", "requestCharge", this.pageResult.getRequestCharge());
                ImmutablePair<String, SchedulingTimeSpan> schedulingTimeSpanMap = new ImmutablePair<String, SchedulingTimeSpan>(DocumentProducer.this.targetRange.getId(), DocumentProducer.this.fetchSchedulingMetrics.getElapsedTime());
                QueryMetrics qm = BridgeInternal.createQueryMetricsFromDelimitedStringAndClientSideMetrics(queryMetricsDelimitedString, new ClientSideMetrics(DocumentProducer.this.retries, this.pageResult.getRequestCharge(), DocumentProducer.this.fetchExecutionRangeAccumulator.getExecutionRanges(), Arrays.asList(schedulingTimeSpanMap)), this.pageResult.getActivityId());
                BridgeInternal.putQueryMetricsIntoMap(this.pageResult, DocumentProducer.this.targetRange.getId(), qm);
            }
        }
    }
}

