/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.query;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.implementation.DocumentClientRetryPolicy;
import com.azure.cosmos.implementation.GoneException;
import com.azure.cosmos.implementation.InvalidPartitionExceptionRetryPolicy;
import com.azure.cosmos.implementation.MetadataDiagnosticsContext;
import com.azure.cosmos.implementation.ObservableHelper;
import com.azure.cosmos.implementation.PartitionKeyRangeGoneRetryPolicy;
import com.azure.cosmos.implementation.PathsHelper;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RetryContext;
import com.azure.cosmos.implementation.RxDocumentClientImpl;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.ShouldRetryResult;
import com.azure.cosmos.implementation.changefeed.implementation.ChangeFeedState;
import com.azure.cosmos.implementation.feedranges.FeedRangeContinuation;
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.implementation.query.Fetcher;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.implementation.spark.OperationContextAndListenerTuple;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/**
 * {@link Fetcher} specialization that pages through a change feed.
 *
 * <p>Every outgoing request is populated from a shared {@link ChangeFeedState}, and the
 * continuation returned by the service is applied back onto that same state. Unless split
 * handling is disabled, each page fetch is wrapped in a retry policy chain whose outermost
 * element is {@link FeedRangeContinuationSplitRetryPolicy}.
 *
 * @param <T> type of the items carried by the returned {@link FeedResponse} pages
 */
class ChangeFeedFetcher<T> extends Fetcher<T> {
    /** Change feed state used to populate requests and to absorb server continuations. */
    private final ChangeFeedState changeFeedState;

    /**
     * Produces the base request for a page. When split handling is enabled this is a wrapper
     * that also notifies the retry policy via {@code onBeforeSendRequest}.
     */
    private final Supplier<RxDocumentServiceRequest> createRequestFunc;

    /** Retry policy applied around page fetches; {@code null} when split handling is disabled. */
    private final DocumentClientRetryPolicy feedRangeContinuationSplitRetryPolicy;

    /**
     * Creates a fetcher for a change feed.
     *
     * @param client client used to build the retry policy chain; must not be {@code null}
     * @param createRequestFunc supplier of the base request for each page; must not be {@code null}
     * @param executeFunc function executing a request, forwarded to the {@link Fetcher} base class
     * @param changeFeedState state tracking the change feed position; must not be {@code null}
     * @param requestOptionProperties request option properties handed to the retry policies
     * @param top forwarded to the {@link Fetcher} base class
     * @param maxItemCount forwarded to the {@link Fetcher} base class
     * @param isSplitHandlingDisabled when {@code true}, no retry policy is installed and
     *        {@link #nextPage()} fetches pages without any retry wrapper
     * @param operationContext forwarded to the {@link Fetcher} base class
     */
    public ChangeFeedFetcher(RxDocumentClientImpl client, Supplier<RxDocumentServiceRequest> createRequestFunc, Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeFunc, ChangeFeedState changeFeedState, Map<String, Object> requestOptionProperties, int top, int maxItemCount, boolean isSplitHandlingDisabled, OperationContextAndListenerTuple operationContext) {
        // NOTE(review): the literal 'true' is presumably the base class's "isChangeFeed" flag - confirm against Fetcher.
        super(executeFunc, true, top, maxItemCount, operationContext);
        Preconditions.checkNotNull(client, "Argument 'client' must not be null.");
        Preconditions.checkNotNull(createRequestFunc, "Argument 'createRequestFunc' must not be null.");
        Preconditions.checkNotNull(changeFeedState, "Argument 'changeFeedState' must not be null.");
        this.changeFeedState = changeFeedState;

        if (isSplitHandlingDisabled) {
            this.feedRangeContinuationSplitRetryPolicy = null;
            this.createRequestFunc = createRequestFunc;
        } else {
            // Build the chain inside-out; each policy wraps the previous one as its fallback.
            DocumentClientRetryPolicy retryPolicyInstance = client.getResetSessionTokenRetryPolicy().getRequestPolicy();
            String collectionLink = PathsHelper.generatePath(ResourceType.DocumentCollection, changeFeedState.getContainerRid(), false);
            retryPolicyInstance = new InvalidPartitionExceptionRetryPolicy(client.getCollectionCache(), retryPolicyInstance, collectionLink, requestOptionProperties);
            retryPolicyInstance = new PartitionKeyRangeGoneRetryPolicy(client, client.getCollectionCache(), client.getPartitionKeyRangeCache(), collectionLink, retryPolicyInstance, requestOptionProperties);
            this.feedRangeContinuationSplitRetryPolicy = new FeedRangeContinuationSplitRetryPolicy(client, this.changeFeedState, retryPolicyInstance, requestOptionProperties, retryPolicyInstance.getRetryContext(), () -> this.getOperationContextText());

            // 'createRequestFunc' below is the constructor parameter, not the field being assigned.
            this.createRequestFunc = () -> {
                RxDocumentServiceRequest request = createRequestFunc.get();
                this.feedRangeContinuationSplitRetryPolicy.onBeforeSendRequest(request);
                return request;
            };
        }
    }

    /**
     * Fetches the next page, going through the split retry policy when one is installed.
     *
     * @return a {@link Mono} producing the next page
     */
    @Override
    public Mono<FeedResponse<T>> nextPage() {
        if (this.feedRangeContinuationSplitRetryPolicy == null) {
            return this.nextPageInternal();
        }
        return ObservableHelper.inlineIfPossible(this::nextPageInternal, this.feedRangeContinuationSplitRetryPolicy);
    }

    /**
     * Fetches one page and re-subscribes for as long as the current continuation asks for an
     * immediate retry after processing the response.
     */
    private Mono<FeedResponse<T>> nextPageInternal() {
        return Mono.fromSupplier(this::nextPageCore)
            .flatMap(Function.identity())
            .flatMap(response -> {
                FeedRangeContinuation continuationSnapshot = this.changeFeedState.getContinuation();
                // Identity comparison against the RETRY_NOW constant is intentional and kept as-is.
                if (continuationSnapshot != null
                    && continuationSnapshot.handleChangeFeedNotModified(response) == ShouldRetryResult.RETRY_NOW) {

                    // Swallow this response; the empty completion triggers the repeat below.
                    this.reenableShouldFetchMoreForRetry();
                    return Mono.empty();
                }
                // No widening cast here: the element type must stay FeedResponse<T> for the chain to type-check.
                return Mono.just(response);
            })
            // The companion is passed through unchanged, so every empty completion requests a re-subscription.
            .repeatWhenEmpty(emptyCompletions -> emptyCompletions);
    }

    /**
     * Delegates to {@link ChangeFeedState#applyServerResponseContinuation} and returns its result.
     */
    @Override
    protected String applyServerResponseContinuation(String serverContinuationToken, RxDocumentServiceRequest request) {
        return this.changeFeedState.applyServerResponseContinuation(serverContinuationToken, request);
    }

    /**
     * Reports the feed as drained when the response carries no changes, or when a continuation
     * exists and reports itself as done. The {@code isChangeFeed} argument is not consulted.
     */
    @Override
    protected boolean isFullyDrained(boolean isChangeFeed, FeedResponse<T> response) {
        if (ModelBridgeInternal.noChanges(response)) {
            return true;
        }
        FeedRangeContinuation continuation = this.changeFeedState.getContinuation();
        return continuation != null && continuation.isDone();
    }

    /**
     * @return the JSON form of the current change feed state
     */
    @Override
    protected String getContinuationForLogging() {
        return this.changeFeedState.toJson();
    }

    /**
     * Creates the request for the next page and lets the change feed state populate it.
     *
     * @param maxItemCount passed on to {@link ChangeFeedState#populateRequest}
     * @return the populated request
     */
    @Override
    protected RxDocumentServiceRequest createRequest(int maxItemCount) {
        RxDocumentServiceRequest request = this.createRequestFunc.get();
        this.changeFeedState.populateRequest(request, maxItemCount);
        return request;
    }

    /**
     * Retry policy that first consults a wrapped policy and, only when that policy declines to
     * retry a {@link GoneException}, asks the change feed state's {@link FeedRangeContinuation}
     * to handle the failure via {@code handleSplit}.
     */
    private static final class FeedRangeContinuationSplitRetryPolicy
        extends DocumentClientRetryPolicy {

        private static final Logger LOGGER = LoggerFactory.getLogger(FeedRangeContinuationSplitRetryPolicy.class);
        private final ChangeFeedState state;
        private final RxDocumentClientImpl client;
        private final DocumentClientRetryPolicy nextRetryPolicy;
        private final Map<String, Object> requestOptionProperties;
        /** Captured from the most recent request seen by {@link #onBeforeSendRequest}; {@code null} until then. */
        private MetadataDiagnosticsContext diagnosticsContext;
        private final RetryContext retryContext;
        private final Supplier<String> operationContextTextProvider;

        /**
         * @param client client used to resolve caches and handed to {@code handleSplit}
         * @param state change feed state whose continuation is created and/or updated on retry
         * @param nextRetryPolicy wrapped policy that is always consulted first
         * @param requestOptionProperties request option properties used when resolving the collection
         * @param retryContext value returned from {@link #getRetryContext()}
         * @param operationContextTextProvider supplies the context text used in log messages;
         *        must not be {@code null}
         */
        public FeedRangeContinuationSplitRetryPolicy(RxDocumentClientImpl client, ChangeFeedState state, DocumentClientRetryPolicy nextRetryPolicy, Map<String, Object> requestOptionProperties, RetryContext retryContext, Supplier<String> operationContextTextProvider) {
            Preconditions.checkNotNull(operationContextTextProvider, "Argument 'operationContextTextProvider' must not be null.");
            this.client = client;
            this.state = state;
            this.nextRetryPolicy = nextRetryPolicy;
            this.requestOptionProperties = requestOptionProperties;
            this.diagnosticsContext = null;
            this.retryContext = retryContext;
            this.operationContextTextProvider = operationContextTextProvider;
        }

        /**
         * Remembers the diagnostics context of the request, then forwards to the wrapped policy.
         */
        @Override
        public void onBeforeSendRequest(RxDocumentServiceRequest request) {
            this.diagnosticsContext = BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics);
            this.nextRetryPolicy.onBeforeSendRequest(request);
        }

        /**
         * Decides whether the failed operation should be retried.
         *
         * <ol>
         *   <li>If the wrapped policy wants to retry, its result is returned unchanged.</li>
         *   <li>Otherwise, anything that is not a {@link GoneException} is not retried.</li>
         *   <li>For a {@link GoneException}, a continuation is created from the feed range's
         *       normalized effective range if none exists yet, and the continuation's
         *       {@code handleSplit} result is returned.</li>
         * </ol>
         *
         * @param e the failure to evaluate
         * @return a {@link Mono} producing the retry decision
         */
        @Override
        public Mono<ShouldRetryResult> shouldRetry(Exception e) {
            return this.nextRetryPolicy.shouldRetry(e).flatMap(shouldRetryResult -> {
                if (shouldRetryResult.shouldRetry) {
                    LOGGER.trace("Retrying due to inner retry policy");
                    return Mono.just(shouldRetryResult);
                }

                if (!(e instanceof GoneException)) {
                    LOGGER.warn("Exception not applicable - will fail the request. Context: {}", this.operationContextTextProvider.get(), e);
                    return Mono.just(ShouldRetryResult.noRetry());
                }

                GoneException goneException = (GoneException) e;

                if (this.state.getContinuation() == null) {
                    // No continuation yet: create one covering the feed range before handling the failure.
                    FeedRangeInternal feedRange = this.state.getFeedRange();
                    Mono<Range<String>> effectiveRangeMono = feedRange.getNormalizedEffectiveRange(
                        this.client.getPartitionKeyRangeCache(),
                        this.diagnosticsContext,
                        this.client.getCollectionCache().resolveByRidAsync(this.diagnosticsContext, this.state.getContainerRid(), this.requestOptionProperties));

                    return effectiveRangeMono
                        .map(effectiveRange -> this.state.setContinuation(
                            FeedRangeContinuation.create(this.state.getContainerRid(), this.state.getFeedRange(), effectiveRange)))
                        .flatMap(updatedState -> updatedState.getContinuation().handleSplit(this.client, goneException));
                }

                return this.state.getContinuation().handleSplit(this.client, goneException).flatMap(splitShouldRetryResult -> {
                    if (!splitShouldRetryResult.shouldRetry) {
                        LOGGER.warn("No partition split error - will fail the request. Context: {}", this.operationContextTextProvider.get(), e);
                    } else {
                        LOGGER.debug("HandleSplit will retry. Context: {}", this.operationContextTextProvider.get(), e);
                    }
                    return Mono.just(splitShouldRetryResult);
                });
            });
        }

        /**
         * @return the retry context supplied at construction time
         */
        @Override
        public RetryContext getRetryContext() {
            return this.retryContext;
        }
    }
}

