/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.query;

import com.azure.cosmos.implementation.GoneException;
import com.azure.cosmos.implementation.IRetryPolicy;
import com.azure.cosmos.implementation.ObservableHelper;
import com.azure.cosmos.implementation.Resource;
import com.azure.cosmos.implementation.RetryPolicyWithDiagnostics;
import com.azure.cosmos.implementation.RxDocumentClientImpl;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.ShouldRetryResult;
import com.azure.cosmos.implementation.changefeed.implementation.ChangeFeedState;
import com.azure.cosmos.implementation.feedranges.FeedRangeContinuation;
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.implementation.query.Fetcher;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import java.util.function.Function;
import java.util.function.Supplier;
import reactor.core.publisher.Mono;

class ChangeFeedFetcher<T extends Resource>
extends Fetcher<T> {
    private final ChangeFeedState changeFeedState;
    private final Supplier<RxDocumentServiceRequest> createRequestFunc;
    private final IRetryPolicy feedRangeContinuationSplitRetryPolicy;

    public ChangeFeedFetcher(RxDocumentClientImpl client, Supplier<RxDocumentServiceRequest> createRequestFunc, Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeFunc, ChangeFeedState changeFeedState, int top, int maxItemCount, boolean isSplitHandlingDisabled) {
        super(executeFunc, true, top, maxItemCount);
        Preconditions.checkNotNull(client, "Argument 'client' must not be null.");
        Preconditions.checkNotNull(createRequestFunc, "Argument 'createRequestFunc' must not be null.");
        Preconditions.checkNotNull(changeFeedState, "Argument 'changeFeedState' must not be null.");
        this.createRequestFunc = createRequestFunc;
        this.changeFeedState = changeFeedState;
        this.feedRangeContinuationSplitRetryPolicy = isSplitHandlingDisabled ? null : new FeedRangeContinuationSplitRetryPolicy(client, this.changeFeedState);
    }

    @Override
    public Mono<FeedResponse<T>> nextPage() {
        if (this.feedRangeContinuationSplitRetryPolicy == null) {
            return this.nextPageInternal();
        }
        return ObservableHelper.inlineIfPossible(this::nextPageInternal, this.feedRangeContinuationSplitRetryPolicy);
    }

    private Mono<FeedResponse<T>> nextPageInternal() {
        return Mono.fromSupplier(this::nextPageCore).flatMap(Function.identity()).flatMap(r -> {
            FeedRangeContinuation continuationSnapshot = this.changeFeedState.getContinuation();
            if (continuationSnapshot != null && continuationSnapshot.handleChangeFeedNotModified(r) == ShouldRetryResult.RETRY_NOW) {
                this.reenableShouldFetchMoreForRetry();
                return Mono.empty();
            }
            return Mono.just((Object)r);
        }).repeatWhenEmpty(o -> o);
    }

    @Override
    protected String applyServerResponseContinuation(String serverContinuationToken, RxDocumentServiceRequest request) {
        return this.changeFeedState.applyServerResponseContinuation(serverContinuationToken, request);
    }

    @Override
    protected boolean isFullyDrained(boolean isChangeFeed, FeedResponse<T> response) {
        if (ModelBridgeInternal.noChanges(response)) {
            return true;
        }
        FeedRangeContinuation continuation = this.changeFeedState.getContinuation();
        return continuation != null && continuation.isDone();
    }

    @Override
    protected String getContinuationForLogging() {
        return this.changeFeedState.toJson();
    }

    @Override
    protected RxDocumentServiceRequest createRequest(int maxItemCount) {
        RxDocumentServiceRequest request = this.createRequestFunc.get();
        this.changeFeedState.populateRequest(request, maxItemCount);
        return request;
    }

    private static final class FeedRangeContinuationSplitRetryPolicy
    extends RetryPolicyWithDiagnostics {
        private final ChangeFeedState state;
        private final RxDocumentClientImpl client;

        public FeedRangeContinuationSplitRetryPolicy(RxDocumentClientImpl client, ChangeFeedState state) {
            this.client = client;
            this.state = state;
        }

        @Override
        public Mono<ShouldRetryResult> shouldRetry(Exception e) {
            if (!(e instanceof GoneException)) {
                return Mono.just((Object)ShouldRetryResult.noRetry());
            }
            if (this.state.getContinuation() == null) {
                FeedRangeInternal feedRange = this.state.getFeedRange();
                Mono<Range<String>> effectiveRangeMono = feedRange.getEffectiveRange(this.client.getPartitionKeyRangeCache(), null, this.client.getCollectionCache().resolveByRidAsync(null, this.state.getContainerRid(), null));
                return effectiveRangeMono.map(effectiveRange -> this.state.setContinuation(FeedRangeContinuation.create(this.state.getContainerRid(), this.state.getFeedRange(), effectiveRange))).flatMap(state -> state.getContinuation().handleSplit(this.client, (GoneException)((Object)((Object)e))));
            }
            return this.state.getContinuation().handleSplit(this.client, (GoneException)((Object)e));
        }
    }
}

