/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.util;

import com.azure.core.util.Context;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.IterableStream;
import com.azure.core.util.paging.ContinuablePagedFlux;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosDiagnostics;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.ClientSideRequestStatistics;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.CosmosPagedFluxOptions;
import com.azure.cosmos.implementation.FeedResponseDiagnostics;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.QueryMetrics;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.SerializationDiagnosticsContext;
import com.azure.cosmos.implementation.TracerProvider;
import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry;
import com.azure.cosmos.implementation.clienttelemetry.ReportPayload;
import com.azure.cosmos.implementation.query.QueryInfo;
import com.azure.cosmos.models.FeedResponse;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.Temporal;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import org.HdrHistogram.ConcurrentDoubleHistogram;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Signal;

public final class CosmosPagedFlux<T>
extends ContinuablePagedFlux<String, T, FeedResponse<T>> {
    /** Logger for this class; used when diagnostics cannot be serialized for the tracer. */
    private static final Logger LOGGER = LoggerFactory.getLogger(CosmosPagedFlux.class);
    /** Shared JSON serializer used to turn diagnostics objects into tracer event attributes. */
    private static final ObjectMapper mapper = new ObjectMapper();
    /** Produces the stream of pages for a given set of paging options. */
    private final Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> optionsFluxFunction;
    /** Optional callback invoked with every emitted page; {@code null} when no callback is registered. */
    private final Consumer<FeedResponse<T>> feedResponseConsumer;
    /** Accessor for internal diagnostics state; assigned exactly once in the constructor, hence final. */
    private final ImplementationBridgeHelpers.CosmosDiagnosticsHelper.CosmosDiagnosticsAccessor cosmosDiagnosticsAccessor;
    /** Page size applied to new paging options when positive; the shorter constructors pass -1 (not set). */
    private final int defaultPageSize;

    /**
     * Creates a paged flux without a page callback and without a default page size.
     *
     * @param optionsFluxFunction produces the page stream for a given set of paging options
     */
    CosmosPagedFlux(Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> optionsFluxFunction) {
        this(optionsFluxFunction, null);
    }

    /**
     * Creates a paged flux with a page callback and without a default page size.
     *
     * @param optionsFluxFunction produces the page stream for a given set of paging options
     * @param feedResponseConsumer callback invoked for every emitted page; may be {@code null}
     */
    CosmosPagedFlux(Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> optionsFluxFunction, Consumer<FeedResponse<T>> feedResponseConsumer) {
        // -1 means "no default page size"; only positive values are applied to the paging options.
        this(optionsFluxFunction, feedResponseConsumer, -1);
    }

    /**
     * Creates a paged flux; every other constructor funnels into this one.
     *
     * @param optionsFluxFunction produces the page stream for a given set of paging options
     * @param feedResponseConsumer callback invoked for every emitted page; may be {@code null}
     * @param defaultPageSize page size used when the caller does not pick one; ignored unless positive
     */
    CosmosPagedFlux(Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> optionsFluxFunction, Consumer<FeedResponse<T>> feedResponseConsumer, int defaultPageSize) {
        this.defaultPageSize = defaultPageSize;
        this.cosmosDiagnosticsAccessor = ImplementationBridgeHelpers.CosmosDiagnosticsHelper.getCosmosDiagnosticsAccessor();
        this.feedResponseConsumer = feedResponseConsumer;
        this.optionsFluxFunction = optionsFluxFunction;
    }

    /**
     * Returns a new {@code CosmosPagedFlux} that additionally invokes the given callback for every page.
     *
     * <p>If a callback is already registered, the existing callback runs first and the new one runs
     * after it. This instance is not modified.
     *
     * <p>The returned instance keeps this instance's default page size. Previously the copy was built
     * through the two-argument constructor, which silently reset the default page size to -1.
     *
     * @param newFeedResponseConsumer callback to invoke with each page
     * @return a new paged flux carrying the combined callback and the same default page size
     */
    public CosmosPagedFlux<T> handle(Consumer<FeedResponse<T>> newFeedResponseConsumer) {
        Consumer<FeedResponse<T>> combinedConsumer;
        if (this.feedResponseConsumer != null) {
            combinedConsumer = this.feedResponseConsumer.andThen(newFeedResponseConsumer);
        } else {
            combinedConsumer = newFeedResponseConsumer;
        }
        return new CosmosPagedFlux<T>(this.optionsFluxFunction, combinedConsumer, this.defaultPageSize);
    }

    /**
     * Returns the results one page at a time, using the default paging options of this instance.
     *
     * <p>The paging options are created when this method is called, not on subscription, so every
     * subscription to the returned flux works with the same options object.
     *
     * @return a flux of pages
     */
    @Override
    public Flux<FeedResponse<T>> byPage() {
        final CosmosPagedFluxOptions options = this.createCosmosPagedFluxOptions();
        return FluxUtil.fluxContext(ctx -> this.byPage(options, ctx));
    }

    /**
     * Returns the results one page at a time, starting from the given continuation token.
     *
     * @param continuationToken continuation token stored on the paging options as the request continuation
     * @return a flux of pages
     */
    @Override
    public Flux<FeedResponse<T>> byPage(String continuationToken) {
        final CosmosPagedFluxOptions options = this.createCosmosPagedFluxOptions();
        options.setRequestContinuation(continuationToken);
        return FluxUtil.fluxContext(ctx -> this.byPage(options, ctx));
    }

    /**
     * Returns the results one page at a time with the given preferred page size.
     *
     * <p>The preferred size is set after the default options are created, so it replaces any default
     * page size configured on this instance.
     *
     * @param preferredPageSize value stored on the paging options as the maximum item count
     * @return a flux of pages
     */
    @Override
    public Flux<FeedResponse<T>> byPage(int preferredPageSize) {
        final CosmosPagedFluxOptions options = this.createCosmosPagedFluxOptions();
        options.setMaxItemCount(preferredPageSize);
        return FluxUtil.fluxContext(ctx -> this.byPage(options, ctx));
    }

    /**
     * Returns the results one page at a time, starting from the given continuation token and using
     * the given preferred page size.
     *
     * @param continuationToken continuation token stored on the paging options as the request continuation
     * @param preferredPageSize value stored on the paging options as the maximum item count
     * @return a flux of pages
     */
    @Override
    public Flux<FeedResponse<T>> byPage(String continuationToken, int preferredPageSize) {
        final CosmosPagedFluxOptions options = this.createCosmosPagedFluxOptions();
        options.setRequestContinuation(continuationToken);
        options.setMaxItemCount(preferredPageSize);
        return FluxUtil.fluxContext(ctx -> this.byPage(options, ctx));
    }

    /**
     * Subscribes the given subscriber to the individual items of all pages.
     *
     * <p>Pages are obtained from {@link #byPage()} and each page's elements are flattened into the
     * item stream. A page whose element collection is {@code null} contributes no items.
     *
     * @param coreSubscriber the subscriber that receives the items
     */
    @Override
    public void subscribe(CoreSubscriber<? super T> coreSubscriber) {
        Flux<FeedResponse<T>> pagedResponse = this.byPage();
        pagedResponse.flatMap(tFeedResponse -> {
            // Typed (not raw) so the flattened flux is a Flux<T> and is checked against the subscriber type.
            IterableStream<T> elements = tFeedResponse.getElements();
            if (elements == null) {
                return Flux.empty();
            }
            return Flux.fromIterable(elements);
        }).subscribe(coreSubscriber);
    }

    /**
     * Returns a copy of this paged flux that uses the given default page size.
     *
     * <p>The page-producing function and the page callback are carried over unchanged.
     *
     * @param pageSize default page size for the copy; only applied to paging options when positive
     * @return a new paged flux with the given default page size
     */
    CosmosPagedFlux<T> withDefaultPageSize(int pageSize) {
        return new CosmosPagedFlux<>(this.optionsFluxFunction, this.feedResponseConsumer, pageSize);
    }

    /**
     * Creates a fresh set of paging options, pre-populated with the default page size when one is set.
     *
     * @return new paging options; the maximum item count is set only if the default page size is positive
     */
    private CosmosPagedFluxOptions createCosmosPagedFluxOptions() {
        final CosmosPagedFluxOptions options = new CosmosPagedFluxOptions();
        if (this.defaultPageSize <= 0) {
            // No usable default configured: leave the maximum item count untouched.
            return options;
        }
        options.setMaxItemCount(this.defaultPageSize);
        return options;
    }

    /**
     * Builds the page stream for the given options and attaches tracing, client telemetry and the
     * optional page callback to it.
     *
     * <ul>
     *   <li>On subscribe: starts a tracer span (when tracing is enabled) and records the start time.</li>
     *   <li>On complete: ends the span with status 200.</li>
     *   <li>On error: records client telemetry for {@link CosmosException}s (when telemetry is enabled
     *       and the diagnostics have not been captured yet) and ends the span with status 0.</li>
     *   <li>On each page: optionally adds diagnostics to the tracer, invokes the page callback, and
     *       records client telemetry.</li>
     * </ul>
     *
     * <p>Fixes compared to the previous implementation:
     * <ul>
     *   <li>The tracer branch and the telemetry branch each did their own
     *       {@code compareAndSet(false, true)} on the same "captured" flag. Once the tracer branch had
     *       claimed a page, the telemetry branch could never succeed, so telemetry was skipped whenever
     *       tracing was enabled. The claim made by the tracer branch is now reused.</li>
     *   <li>The serialization warning passed {@code ex.getMessage()} as a format argument to a message
     *       without a placeholder, so the detail was dropped; the exception itself is now logged.</li>
     *   <li>Raw types and redundant casts were removed, and the duplicated inline tracer checks now
     *       call {@code isTracerEnabled}.</li>
     * </ul>
     *
     * @param pagedFluxOptions paging options handed to the page-producing function
     * @param context context passed to the tracer when starting the span
     * @return the instrumented flux of pages
     */
    private Flux<FeedResponse<T>> byPage(CosmosPagedFluxOptions pagedFluxOptions, Context context) {
        final AtomicReference<Context> parentContext = new AtomicReference<>(Context.NONE);
        final AtomicReference<Instant> startTime = new AtomicReference<>();

        return this.optionsFluxFunction.apply(pagedFluxOptions).doOnSubscribe(ignoredValue -> {
            if (this.isTracerEnabled(pagedFluxOptions)) {
                parentContext.set(pagedFluxOptions.getTracerProvider().startSpan(
                    pagedFluxOptions.getTracerSpanName(),
                    pagedFluxOptions.getDatabaseId(),
                    pagedFluxOptions.getServiceEndpoint(),
                    context));
            }
            startTime.set(Instant.now());
        }).doOnComplete(() -> {
            if (this.isTracerEnabled(pagedFluxOptions)) {
                pagedFluxOptions.getTracerProvider().endSpan(parentContext.get(), Signal.complete(), 200);
            }
        }).doOnError(throwable -> {
            if (this.isClientTelemetryActive(pagedFluxOptions) && throwable instanceof CosmosException) {
                CosmosException cosmosException = (CosmosException) throwable;
                // Only the first observer of these diagnostics records telemetry for them.
                if (this.cosmosDiagnosticsAccessor.isDiagnosticsCapturedInPagedFlux(cosmosException.getDiagnostics()).compareAndSet(false, true)) {
                    // Failures are reported with status code 0.
                    this.recordClientTelemetry(pagedFluxOptions, 0, (float) cosmosException.getRequestCharge(), startTime.get());
                }
            }
            if (this.isTracerEnabled(pagedFluxOptions)) {
                pagedFluxOptions.getTracerProvider().endSpan(parentContext.get(), Signal.error(throwable), 0);
            }
            startTime.set(Instant.now());
        }).doOnNext(feedResponse -> {
            // True once this invocation has won the "captured" flag for the page's diagnostics.
            boolean claimedHere = false;
            if (this.isTracerEnabled(pagedFluxOptions)
                && this.cosmosDiagnosticsAccessor.isDiagnosticsCapturedInPagedFlux(feedResponse.getCosmosDiagnostics()).compareAndSet(false, true)) {
                claimedHere = true;
                try {
                    Duration threshold = pagedFluxOptions.getThresholdForDiagnosticsOnTracer();
                    if (threshold == null) {
                        threshold = pagedFluxOptions.getTracerProvider().QUERY_THRESHOLD_FOR_DIAGNOSTICS;
                    }
                    // Diagnostics are attached to the span only when the elapsed time exceeds the threshold.
                    if (Duration.between(startTime.get(), Instant.now()).compareTo(threshold) > 0) {
                        this.addDiagnosticsOnTracerEvent(pagedFluxOptions.getTracerProvider(), feedResponse.getCosmosDiagnostics(), parentContext.get());
                    }
                } catch (JsonProcessingException ex) {
                    // Pass the exception itself so its message and stack trace are actually logged.
                    LOGGER.warn("Error while serializing diagnostics for tracer", ex);
                }
            }
            if (this.feedResponseConsumer != null) {
                this.feedResponseConsumer.accept(feedResponse);
            }
            // Reuse the tracer's claim if there was one; a second compareAndSet would always fail.
            if (this.isClientTelemetryActive(pagedFluxOptions)
                && (claimedHere
                    || this.cosmosDiagnosticsAccessor.isDiagnosticsCapturedInPagedFlux(feedResponse.getCosmosDiagnostics()).compareAndSet(false, true))) {
                this.recordClientTelemetry(pagedFluxOptions, 200, (float) feedResponse.getRequestCharge(), startTime.get());
                // Restart the clock so the next page's latency is measured from here.
                startTime.set(Instant.now());
            }
        });
    }

    /** Returns whether the options carry a client and client telemetry is enabled for that client. */
    private boolean isClientTelemetryActive(CosmosPagedFluxOptions pagedFluxOptions) {
        CosmosAsyncClient client = pagedFluxOptions.getCosmosAsyncClient();
        return client != null && Configs.isClientTelemetryEnabled(BridgeInternal.isClientTelemetryEnabled(client));
    }

    /** Records latency (measured from {@code since} until now) and request charge for one page or failure. */
    private void recordClientTelemetry(CosmosPagedFluxOptions pagedFluxOptions, int statusCode, float requestCharge, Instant since) {
        CosmosAsyncClient client = pagedFluxOptions.getCosmosAsyncClient();
        this.fillClientTelemetry(
            client,
            statusCode,
            pagedFluxOptions.getContainerId(),
            pagedFluxOptions.getDatabaseId(),
            pagedFluxOptions.getOperationType(),
            pagedFluxOptions.getResourceType(),
            BridgeInternal.getContextClient(client).getConsistencyLevel(),
            requestCharge,
            Duration.between(since, Instant.now()));
    }

    /**
     * Records the latency and request charge of one operation in the client's telemetry histograms.
     *
     * <p>Two histograms are looked up in the telemetry operation map, keyed by a report payload built
     * from the given dimensions: one for "RequestLatency" (in "MilliSecond") and one for
     * "RequestCharge" (in "RU"). A missing histogram is created on first use.
     *
     * <p>The lookup uses {@code computeIfAbsent} instead of a separate get followed by put. With the
     * old get-then-put sequence, two callers that both saw a missing entry would each create a
     * histogram and the later put would discard the value recorded in the other one.
     * NOTE(review): this is atomic only if the operation map is a concurrent map — confirm against
     * the telemetry info class.
     *
     * @param cosmosAsyncClient client whose telemetry is updated
     * @param statusCode status code of the operation; 200 selects the higher-precision latency histogram
     * @param containerId container name reported in the payload
     * @param databaseId database name reported in the payload
     * @param operationType operation type reported in the payload
     * @param resourceType resource type reported in the payload
     * @param consistencyLevel consistency level reported in the payload; may be {@code null}
     * @param requestCharge request charge to record
     * @param latency latency to record, in whole milliseconds
     */
    private void fillClientTelemetry(CosmosAsyncClient cosmosAsyncClient, int statusCode, String containerId, String databaseId, OperationType operationType, ResourceType resourceType, ConsistencyLevel consistencyLevel, float requestCharge, Duration latency) {
        ClientTelemetry telemetry = BridgeInternal.getContextClient(cosmosAsyncClient).getClientTelemetry();

        ReportPayload latencyPayload = this.createReportPayload(cosmosAsyncClient, statusCode, containerId, databaseId, operationType, resourceType, consistencyLevel, "RequestLatency", "MilliSecond");
        // Successful (200) operations use 4 as the second constructor argument, everything else uses 2.
        ConcurrentDoubleHistogram latencyHistogram = telemetry.getClientTelemetryInfo().getOperationInfoMap()
            .computeIfAbsent(latencyPayload, ignored -> newAutoResizingHistogram(300000L, statusCode == 200 ? 4 : 2));
        ClientTelemetry.recordValue(latencyHistogram, latency.toMillis());

        ReportPayload requestChargePayload = this.createReportPayload(cosmosAsyncClient, statusCode, containerId, databaseId, operationType, resourceType, consistencyLevel, "RequestCharge", "RU");
        ConcurrentDoubleHistogram requestChargeHistogram = telemetry.getClientTelemetryInfo().getOperationInfoMap()
            .computeIfAbsent(requestChargePayload, ignored -> newAutoResizingHistogram(10000L, 2));
        ClientTelemetry.recordValue(requestChargeHistogram, requestCharge);
    }

    /**
     * Creates a histogram with auto-resize switched on.
     * NOTE(review): parameter names follow the HdrHistogram constructor — verify against its Javadoc.
     */
    private static ConcurrentDoubleHistogram newAutoResizingHistogram(long highestToLowestValueRatio, int significantDigits) {
        ConcurrentDoubleHistogram histogram = new ConcurrentDoubleHistogram(highestToLowestValueRatio, significantDigits);
        histogram.setAutoResize(true);
        return histogram;
    }

    /**
     * Builds the telemetry key describing one metric of one kind of operation.
     *
     * @param cosmosAsyncClient client used to look up the consistency level when none is given
     * @param statusCode status code stored on the payload
     * @param containerId container name stored on the payload
     * @param databaseId database name stored on the payload
     * @param operationType operation stored on the payload
     * @param resourceType resource stored on the payload
     * @param consistencyLevel consistency stored on the payload; when {@code null} the client's
     *     consistency level is used instead
     * @param metricsName metric name passed to the payload constructor
     * @param unitName unit name passed to the payload constructor
     * @return the populated payload
     */
    private ReportPayload createReportPayload(CosmosAsyncClient cosmosAsyncClient, int statusCode, String containerId, String databaseId, OperationType operationType, ResourceType resourceType, ConsistencyLevel consistencyLevel, String metricsName, String unitName) {
        final ReportPayload payload = new ReportPayload(metricsName, unitName);

        final ConsistencyLevel effectiveConsistency;
        if (consistencyLevel != null) {
            effectiveConsistency = consistencyLevel;
        } else {
            // Fall back to the consistency level configured on the client.
            effectiveConsistency = BridgeInternal.getContextClient(cosmosAsyncClient).getConsistencyLevel();
        }
        payload.setConsistency(effectiveConsistency);

        payload.setDatabaseName(databaseId);
        payload.setContainerName(containerId);
        payload.setOperation(operationType);
        payload.setResource(resourceType);
        payload.setStatusCode(statusCode);
        return payload;
    }

    /**
     * Publishes the contents of a diagnostics object as events on the tracer span.
     *
     * <p>Emits, in order:
     * <ol>
     *   <li>a "Query Plan Statistics" event when query plan diagnostics are present;</li>
     *   <li>one "Query Metrics for PKRange ..." event per entry of the query metrics map;</li>
     *   <li>one event per client-side request statistics entry, carrying store responses,
     *       supplemental store responses, retry context, address resolution, serialization
     *       diagnostics, gateway statistics, contacted regions, system information and client
     *       configuration as JSON attributes.</li>
     * </ol>
     *
     * <p>Fix compared to the previous implementation: the serialization-diagnostics loop replaced the
     * attribute map with a new one on every iteration. That discarded every attribute collected before
     * it (store responses, retry context, address resolution) and all but the last serialization
     * entry. The loop now adds to the existing map.
     *
     * @param tracerProvider tracer that receives the events
     * @param cosmosDiagnostics diagnostics to publish; nothing is emitted when {@code null}
     * @param parentContext tracing context the events are attached to
     * @throws JsonProcessingException if a diagnostics object cannot be serialized to JSON
     */
    private void addDiagnosticsOnTracerEvent(TracerProvider tracerProvider, CosmosDiagnostics cosmosDiagnostics, Context parentContext) throws JsonProcessingException {
        if (cosmosDiagnostics == null) {
            return;
        }

        // Resolved once and reused for both the query plan and the query metrics.
        FeedResponseDiagnostics feedResponseDiagnostics = this.cosmosDiagnosticsAccessor.getFeedResponseDiagnostics(cosmosDiagnostics);
        if (feedResponseDiagnostics != null) {
            QueryInfo.QueryPlanDiagnosticsContext queryPlanDiagnosticsContext = feedResponseDiagnostics.getQueryPlanDiagnosticsContext();
            if (queryPlanDiagnosticsContext != null) {
                Map<String, Object> queryPlanAttributes = new HashMap<>();
                queryPlanAttributes.put("JSON", mapper.writeValueAsString(queryPlanDiagnosticsContext));
                tracerProvider.addEvent("Query Plan Statistics", queryPlanAttributes, OffsetDateTime.ofInstant(queryPlanDiagnosticsContext.getStartTimeUTC(), ZoneOffset.UTC), parentContext);
            }

            if (feedResponseDiagnostics.getQueryMetricsMap() != null) {
                for (Map.Entry<String, QueryMetrics> entry : feedResponseDiagnostics.getQueryMetricsMap().entrySet()) {
                    Map<String, Object> queryMetricsAttributes = new HashMap<>();
                    queryMetricsAttributes.put("Query Metrics", entry.getValue().toString());
                    tracerProvider.addEvent("Query Metrics for PKRange " + entry.getKey(), queryMetricsAttributes, OffsetDateTime.now(), parentContext);
                }
            }
        }

        // Numbers only those events that cannot be named after a partition key range.
        int queryDiagnosticsCounter = 1;
        for (ClientSideRequestStatistics clientSideRequestStatistics : BridgeInternal.getClientSideRequestStatisticsList(cosmosDiagnostics)) {
            Map<String, Object> attributes = new HashMap<>();

            int counter = 1;
            for (ClientSideRequestStatistics.StoreResponseStatistics statistics : clientSideRequestStatistics.getResponseStatisticsList()) {
                attributes.put("StoreResponse" + counter++, mapper.writeValueAsString(statistics));
            }

            counter = 1;
            for (ClientSideRequestStatistics.StoreResponseStatistics statistics : ClientSideRequestStatistics.getCappedSupplementalResponseStatisticsList(clientSideRequestStatistics.getSupplementalResponseStatisticsList())) {
                attributes.put("Supplemental StoreResponse" + counter++, mapper.writeValueAsString(statistics));
            }

            // The retry context is reported only when it has a retry start time.
            if (clientSideRequestStatistics.getRetryContext().getRetryStartTime() != null) {
                attributes.put("Retry Context", mapper.writeValueAsString(clientSideRequestStatistics.getRetryContext()));
            }

            counter = 1;
            for (ClientSideRequestStatistics.AddressResolutionStatistics addressResolutionStatistics : clientSideRequestStatistics.getAddressResolutionStatistics().values()) {
                attributes.put("AddressResolutionStatistics" + counter++, mapper.writeValueAsString(addressResolutionStatistics));
            }

            if (clientSideRequestStatistics.getSerializationDiagnosticsContext().serializationDiagnosticsList != null) {
                counter = 1;
                for (SerializationDiagnosticsContext.SerializationDiagnostics serializationDiagnostics : clientSideRequestStatistics.getSerializationDiagnosticsContext().serializationDiagnosticsList) {
                    // Added to the shared map; it must not be replaced here or earlier attributes are lost.
                    attributes.put("SerializationDiagnostics" + counter++, mapper.writeValueAsString(serializationDiagnostics));
                }
            }

            if (clientSideRequestStatistics.getGatewayStatistics() != null) {
                attributes.put("GatewayStatistics", mapper.writeValueAsString(clientSideRequestStatistics.getGatewayStatistics()));
            }
            attributes.put("RegionContacted", mapper.writeValueAsString(clientSideRequestStatistics.getRegionsContacted()));
            attributes.put("SystemInformation", mapper.writeValueAsString(ClientSideRequestStatistics.fetchSystemInformation()));
            attributes.put("ClientCfgs", mapper.writeValueAsString(clientSideRequestStatistics.getDiagnosticsClientContext()));

            // Name the event after the partition key range when one is known: first from the first
            // store response, then from the gateway statistics; otherwise fall back to a running number.
            String eventName;
            if (clientSideRequestStatistics.getResponseStatisticsList() != null
                && !clientSideRequestStatistics.getResponseStatisticsList().isEmpty()
                && clientSideRequestStatistics.getResponseStatisticsList().get(0).getStoreResult() != null) {
                eventName = "Diagnostics for PKRange " + clientSideRequestStatistics.getResponseStatisticsList().get(0).getStoreResult().partitionKeyRangeId;
            } else if (clientSideRequestStatistics.getGatewayStatistics() != null) {
                eventName = "Diagnostics for PKRange " + clientSideRequestStatistics.getGatewayStatistics().getPartitionKeyRangeId();
            } else {
                eventName = "Diagnostics " + queryDiagnosticsCounter++;
            }
            tracerProvider.addEvent(eventName, attributes, OffsetDateTime.ofInstant(clientSideRequestStatistics.getRequestStartTimeUTC(), ZoneOffset.UTC), parentContext);
        }
    }

    /**
     * Tells whether tracing should be performed for the given paging options.
     *
     * @param pagedFluxOptions options that may carry a tracer provider
     * @return {@code true} only if a tracer provider is present and reports itself as enabled
     */
    private boolean isTracerEnabled(CosmosPagedFluxOptions pagedFluxOptions) {
        if (pagedFluxOptions.getTracerProvider() == null) {
            return false;
        }
        return pagedFluxOptions.getTracerProvider().isEnabled();
    }

    static {
        // Registers an accessor with the implementation bridge so that code holding the accessor
        // can create instances through the package-private constructor.
        // An anonymous class is used because the accessor method is generic.
        ImplementationBridgeHelpers.CosmosPageFluxHelper.CosmosPageFluxAccessor accessor =
            new ImplementationBridgeHelpers.CosmosPageFluxHelper.CosmosPageFluxAccessor() {

                @Override
                public <T> CosmosPagedFlux<T> getCosmosPagedFlux(Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> optionsFluxFunction) {
                    return new CosmosPagedFlux<>(optionsFluxFunction);
                }
            };
        ImplementationBridgeHelpers.CosmosPageFluxHelper.setCosmosPageFluxAccessor(accessor);
    }
}

