/*
 * Decompiled with CFR 0.152.
 */
package com.azure.data.cosmos.internal.query;

import com.azure.data.cosmos.BridgeInternal;
import com.azure.data.cosmos.CosmosClientException;
import com.azure.data.cosmos.FeedResponse;
import com.azure.data.cosmos.Resource;
import com.azure.data.cosmos.internal.Utils;
import com.azure.data.cosmos.internal.query.AggregateDocumentQueryExecutionContext;
import com.azure.data.cosmos.internal.query.IDocumentQueryExecutionComponent;
import com.azure.data.cosmos.internal.query.ParallelDocumentQueryExecutionContextBase;
import com.azure.data.cosmos.internal.query.TakeContinuationToken;
import java.util.HashMap;
import java.util.function.Function;
import java.util.function.Predicate;
import reactor.core.publisher.Flux;

public class TopDocumentQueryExecutionContext<T extends Resource>
implements IDocumentQueryExecutionComponent<T> {
    private final IDocumentQueryExecutionComponent<T> component;
    private final int top;

    public TopDocumentQueryExecutionContext(IDocumentQueryExecutionComponent<T> component, int top) {
        this.component = component;
        this.top = top;
    }

    public static <T extends Resource> Flux<IDocumentQueryExecutionComponent<T>> createAsync(Function<String, Flux<IDocumentQueryExecutionComponent<T>>> createSourceComponentFunction, int topCount, String topContinuationToken) {
        TakeContinuationToken takeContinuationToken;
        if (topContinuationToken == null) {
            takeContinuationToken = new TakeContinuationToken(topCount, null);
        } else {
            Utils.ValueHolder<TakeContinuationToken> outTakeContinuationToken = new Utils.ValueHolder<TakeContinuationToken>();
            if (!TakeContinuationToken.tryParse(topContinuationToken, outTakeContinuationToken)) {
                String message = String.format("INVALID JSON in continuation token %s for Top~Context", topContinuationToken);
                CosmosClientException dce = BridgeInternal.createCosmosClientException(400, message);
                return Flux.error((Throwable)dce);
            }
            takeContinuationToken = (TakeContinuationToken)outTakeContinuationToken.v;
        }
        if (takeContinuationToken.getTakeCount() > topCount) {
            String message = String.format("top count in continuation token: %d can not be greater than the top count in the query: %d.", takeContinuationToken.getTakeCount(), topCount);
            CosmosClientException dce = BridgeInternal.createCosmosClientException(400, message);
            return Flux.error((Throwable)dce);
        }
        return createSourceComponentFunction.apply(takeContinuationToken.getSourceToken()).map(component -> new TopDocumentQueryExecutionContext(component, takeContinuationToken.getTakeCount()));
    }

    @Override
    public Flux<FeedResponse<T>> drainAsync(int maxPageSize) {
        ParallelDocumentQueryExecutionContextBase context = this.component instanceof AggregateDocumentQueryExecutionContext ? (ParallelDocumentQueryExecutionContextBase)((AggregateDocumentQueryExecutionContext)this.component).getComponent() : (ParallelDocumentQueryExecutionContextBase)this.component;
        context.setTop(this.top);
        return this.component.drainAsync(maxPageSize).takeUntil(new Predicate<FeedResponse<T>>(){
            private volatile int fetchedItems = 0;

            @Override
            public boolean test(FeedResponse<T> frp) {
                this.fetchedItems += frp.results().size();
                return this.fetchedItems >= TopDocumentQueryExecutionContext.this.top;
            }
        }).map(new Function<FeedResponse<T>, FeedResponse<T>>(){
            private volatile int collectedItems = 0;
            private volatile boolean lastPage = false;

            @Override
            public FeedResponse<T> apply(FeedResponse<T> t) {
                if (this.collectedItems + t.results().size() <= TopDocumentQueryExecutionContext.this.top) {
                    this.collectedItems += t.results().size();
                    HashMap<String, String> headers = new HashMap<String, String>(t.responseHeaders());
                    if (TopDocumentQueryExecutionContext.this.top != this.collectedItems) {
                        String sourceContinuationToken = t.continuationToken();
                        TakeContinuationToken takeContinuationToken = new TakeContinuationToken(TopDocumentQueryExecutionContext.this.top - this.collectedItems, sourceContinuationToken);
                        headers.put("x-ms-continuation", takeContinuationToken.toJson());
                    } else {
                        headers.put("x-ms-continuation", null);
                    }
                    return BridgeInternal.createFeedResponseWithQueryMetrics(t.results(), headers, BridgeInternal.queryMetricsFromFeedResponse(t));
                }
                assert (!this.lastPage);
                this.lastPage = true;
                int lastPageSize = TopDocumentQueryExecutionContext.this.top - this.collectedItems;
                this.collectedItems += lastPageSize;
                HashMap<String, String> headers = new HashMap<String, String>(t.responseHeaders());
                headers.put("x-ms-continuation", null);
                return BridgeInternal.createFeedResponseWithQueryMetrics(t.results().subList(0, lastPageSize), headers, BridgeInternal.queryMetricsFromFeedResponse(t));
            }
        });
    }
}

