/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.query;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.implementation.ClientSideRequestStatistics;
import com.azure.cosmos.implementation.Document;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.QueryMetrics;
import com.azure.cosmos.implementation.RequestChargeTracker;
import com.azure.cosmos.implementation.Resource;
import com.azure.cosmos.implementation.query.DocumentProducer;
import com.azure.cosmos.implementation.query.orderbyquery.OrderByRowResult;
import com.azure.cosmos.implementation.query.orderbyquery.OrderbyRowComparer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

public class NonStreamingOrderByUtils {
    private static final ImplementationBridgeHelpers.CosmosDiagnosticsHelper.CosmosDiagnosticsAccessor diagnosticsAccessor = ImplementationBridgeHelpers.CosmosDiagnosticsHelper.getCosmosDiagnosticsAccessor();

    public static <T extends Resource> Flux<OrderByRowResult<Document>> nonStreamingOrderedMerge(OrderbyRowComparer<Document> consumeComparer, RequestChargeTracker tracker, List<DocumentProducer<Document>> documentProducers, int initialPageSize, Map<String, QueryMetrics> queryMetricsMap, Collection<ClientSideRequestStatistics> clientSideRequestStatistics) {
        Flux[] fluxes = (Flux[])documentProducers.subList(0, documentProducers.size()).stream().map(producer -> NonStreamingOrderByUtils.toNonStreamingOrderByQueryResultObservable(producer, tracker, queryMetricsMap, initialPageSize, consumeComparer, clientSideRequestStatistics)).toArray(Flux[]::new);
        return Flux.mergeComparingDelayError((int)1, consumeComparer, (Publisher[])fluxes);
    }

    private static Flux<OrderByRowResult<Document>> toNonStreamingOrderByQueryResultObservable(DocumentProducer<Document> producer, RequestChargeTracker tracker, Map<String, QueryMetrics> queryMetricsMap, int initialPageSize, OrderbyRowComparer<Document> consumeComparer, Collection<ClientSideRequestStatistics> clientSideRequestStatisticsList) {
        return producer.produceAsync().transformDeferred((Function)new PageToItemTransformer(tracker, queryMetricsMap, initialPageSize, consumeComparer, clientSideRequestStatisticsList));
    }

    private static class PageToItemTransformer
    implements Function<Flux<DocumentProducer.DocumentProducerFeedResponse>, Flux<OrderByRowResult<Document>>> {
        private final RequestChargeTracker tracker;
        private final Map<String, QueryMetrics> queryMetricsMap;
        private final Integer initialPageSize;
        private final OrderbyRowComparer<Document> consumeComparer;
        private final Collection<ClientSideRequestStatistics> clientSideRequestStatistics;

        private PageToItemTransformer(RequestChargeTracker tracker, Map<String, QueryMetrics> queryMetricsMap, Integer initialPageSize, OrderbyRowComparer<Document> consumeComparer, Collection<ClientSideRequestStatistics> clientSideRequestStatistics) {
            this.tracker = tracker;
            this.queryMetricsMap = queryMetricsMap;
            this.initialPageSize = initialPageSize;
            this.consumeComparer = consumeComparer;
            this.clientSideRequestStatistics = clientSideRequestStatistics;
        }

        @Override
        public Flux<OrderByRowResult<Document>> apply(Flux<DocumentProducer.DocumentProducerFeedResponse> source) {
            PriorityBlockingQueue<Document> priorityQueue = new PriorityBlockingQueue<Document>(this.initialPageSize + 1, this.consumeComparer);
            return source.flatMap(documentProducerFeedResponse -> {
                this.clientSideRequestStatistics.addAll(diagnosticsAccessor.getClientSideRequestStatisticsForQueryPipelineAggregations(documentProducerFeedResponse.pageResult.getCosmosDiagnostics()));
                QueryMetrics.mergeQueryMetricsMap(this.queryMetricsMap, BridgeInternal.queryMetricsFromFeedResponse(documentProducerFeedResponse.pageResult));
                List<Document> results = documentProducerFeedResponse.pageResult.getResults();
                results.forEach(r -> {
                    OrderByRowResult orderByRowResult = new OrderByRowResult(r.toJson(), documentProducerFeedResponse.sourceFeedRange, null);
                    priorityQueue.add(orderByRowResult);
                    if (priorityQueue.size() > this.initialPageSize) {
                        PriorityBlockingQueue<Document> tempPriorityQueue = new PriorityBlockingQueue<Document>(this.initialPageSize + 1, this.consumeComparer);
                        for (int i = 0; i < this.initialPageSize; ++i) {
                            tempPriorityQueue.add((OrderByRowResult)priorityQueue.poll());
                        }
                        priorityQueue.clear();
                        priorityQueue.addAll(tempPriorityQueue);
                    }
                });
                this.tracker.addCharge(documentProducerFeedResponse.pageResult.getRequestCharge());
                return Flux.empty();
            }, 1).thenMany((Publisher)Flux.defer(() -> Flux.fromIterable((Iterable)priorityQueue)));
        }
    }
}

