/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.directconnectivity;

import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosContainerProactiveInitConfig;
import com.azure.cosmos.CosmosDiagnostics;
import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig;
import com.azure.cosmos.implementation.BackoffRetryUtility;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.DiagnosticsClientContext;
import com.azure.cosmos.implementation.IAuthorizationTokenProvider;
import com.azure.cosmos.implementation.ISessionContainer;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.Quadruple;
import com.azure.cosmos.implementation.ReplicatedResourceClientUtils;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.directconnectivity.AddressSelector;
import com.azure.cosmos.implementation.directconnectivity.ConsistencyReader;
import com.azure.cosmos.implementation.directconnectivity.ConsistencyWriter;
import com.azure.cosmos.implementation.directconnectivity.GatewayServiceConfigurationReader;
import com.azure.cosmos.implementation.directconnectivity.GoneAndRetryWithRetryPolicy;
import com.azure.cosmos.implementation.directconnectivity.Protocol;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import com.azure.cosmos.implementation.directconnectivity.TimeoutHelper;
import com.azure.cosmos.implementation.directconnectivity.TransportClient;
import com.azure.cosmos.implementation.directconnectivity.speculativeprocessors.SpeculativeProcessor;
import com.azure.cosmos.implementation.directconnectivity.speculativeprocessors.ThresholdBasedSpeculation;
import com.azure.cosmos.implementation.faultinjection.IFaultInjectorProvider;
import com.azure.cosmos.implementation.throughputControl.ThroughputControlStore;
import com.azure.cosmos.models.CosmosContainerIdentity;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Entry point for executing a {@link RxDocumentServiceRequest} against replicas.
 *
 * <p>Each request is dispatched to either the {@link ConsistencyReader} or the
 * {@link ConsistencyWriter} depending on its operation type, and the dispatch is wrapped in
 * {@link BackoffRetryUtility#executeAsync} together with a {@link GoneAndRetryWithRetryPolicy}.
 * When a {@link SpeculativeProcessor} is configured and the request carries an enabled
 * end-to-end latency policy, eligible requests are additionally issued (after a delay) against
 * other read endpoints and the first response that carries a value is returned.
 */
public class ReplicatedResourceClient {
    private static final Logger logger = LoggerFactory.getLogger(ReplicatedResourceClient.class);

    /** Retry timeout handed to the retry policy when the default consistency level is not STRONG. */
    private static final int GONE_AND_RETRY_WITH_TIMEOUT_IN_SECONDS = 30;
    /** Retry timeout handed to the retry policy when the default consistency level is STRONG. */
    private static final int STRONG_GONE_AND_RETRY_WITH_RETRY_TIMEOUT_SECONDS = 60;
    /** Minimum back-off duration (in seconds) passed to {@link BackoffRetryUtility#executeAsync}. */
    private static final int MIN_BACKOFF_FOR_FAILLING_BACK_TO_OTHER_REGIONS_FOR_READ_REQUESTS_IN_SECONDS = 1;

    private final DiagnosticsClientContext diagnosticsClientContext;
    private final AddressSelector addressSelector;
    private final ConsistencyReader consistencyReader;
    private final ConsistencyWriter consistencyWriter;
    private final Protocol protocol;
    private final TransportClient transportClient;
    private final boolean enableReadRequestsFallback;
    private final GatewayServiceConfigurationReader serviceConfigReader;
    private final Configs configs;
    /** {@code null} when speculation is not enabled through {@link Configs#getSpeculationType()}. */
    private final SpeculativeProcessor speculativeProcessor;

    /**
     * Creates the client and the reader/writer pair it delegates to.
     *
     * @param diagnosticsClientContext context forwarded to the reader and the writer
     * @param configs configuration; supplies the transport protocol
     * @param addressSelector selector shared by the reader, the writer and the retry utility
     * @param sessionContainer session container forwarded to the reader and the writer
     * @param transportClient transport used to reach the replicas
     * @param serviceConfigReader source of the account's default consistency level
     * @param authorizationTokenProvider token provider forwarded to the reader and the writer
     * @param enableReadRequestsFallback stored on the instance; not read anywhere in this class
     * @param useMultipleWriteLocations forwarded to the {@link ConsistencyWriter}
     * @throws IllegalArgumentException if the configured protocol is neither HTTPS nor TCP
     */
    public ReplicatedResourceClient(DiagnosticsClientContext diagnosticsClientContext, Configs configs, AddressSelector addressSelector, ISessionContainer sessionContainer, TransportClient transportClient, GatewayServiceConfigurationReader serviceConfigReader, IAuthorizationTokenProvider authorizationTokenProvider, boolean enableReadRequestsFallback, boolean useMultipleWriteLocations) {
        this.diagnosticsClientContext = diagnosticsClientContext;
        this.configs = configs;
        this.protocol = configs.getProtocol();
        this.addressSelector = addressSelector;
        if (this.protocol != Protocol.HTTPS && this.protocol != Protocol.TCP) {
            throw new IllegalArgumentException("protocol");
        }
        this.transportClient = transportClient;
        this.serviceConfigReader = serviceConfigReader;
        this.consistencyReader = new ConsistencyReader(diagnosticsClientContext, configs, this.addressSelector, sessionContainer, transportClient, serviceConfigReader, authorizationTokenProvider);
        this.consistencyWriter = new ConsistencyWriter(diagnosticsClientContext, this.addressSelector, sessionContainer, transportClient, authorizationTokenProvider, serviceConfigReader, useMultipleWriteLocations);
        this.enableReadRequestsFallback = enableReadRequestsFallback;
        // NOTE(review): 1 presumably identifies the threshold-based speculation type - confirm
        // against the constants declared on SpeculativeProcessor/Configs.
        this.speculativeProcessor = Configs.getSpeculationType() == 1 ? new ThresholdBasedSpeculation() : null;
    }

    /**
     * Enables throughput control on the underlying transport client.
     *
     * @param throughputControlStore the store handed to the transport client
     */
    public void enableThroughputControl(ThroughputControlStore throughputControlStore) {
        this.transportClient.enableThroughputControl(throughputControlStore);
    }

    /**
     * Delegates to {@link ReplicatedResourceClientUtils#isReadingFromMaster(ResourceType, OperationType)}.
     *
     * @param resourceType the resource type of the request
     * @param operationType the operation type of the request
     * @return the result of the delegated check
     */
    public static boolean isReadingFromMaster(ResourceType resourceType, OperationType operationType) {
        return ReplicatedResourceClientUtils.isReadingFromMaster(resourceType, operationType);
    }

    /**
     * Delegates to {@link ReplicatedResourceClientUtils#isMasterResource(ResourceType)}.
     *
     * @param resourceType the resource type to check
     * @return the result of the delegated check
     */
    public static boolean isMasterResource(ResourceType resourceType) {
        return ReplicatedResourceClientUtils.isMasterResource(resourceType);
    }

    /**
     * @return always {@code true}
     */
    public static boolean isGlobalStrongEnabled() {
        return true;
    }

    /**
     * Executes the request with Gone/RetryWith retry handling.
     *
     * <p>On every attempt the retry-attempt count and the remaining client time are written to
     * the request headers before the request is dispatched.
     *
     * @param request the request to execute
     * @param prepareRequestAsyncDelegate optional hook invoked with {@code request} before every
     *     attempt; may be {@code null}
     * @return a {@link Mono} emitting the store response
     */
    public Mono<StoreResponse> invokeAsync(RxDocumentServiceRequest request, Function<RxDocumentServiceRequest, Mono<RxDocumentServiceRequest>> prepareRequestAsyncDelegate) {
        BiFunction<Quadruple<Boolean, Boolean, Duration, Integer>, RxDocumentServiceRequest, Mono<StoreResponse>> mainFuncDelegate = (forceRefreshAndTimeout, documentServiceRequest) -> {
            documentServiceRequest.getHeaders().put("x-ms-client-retry-attempt-count", forceRefreshAndTimeout.getValue3().toString());
            documentServiceRequest.getHeaders().put("x-ms-remaining-time-in-ms-on-client", Long.toString(forceRefreshAndTimeout.getValue2().toMillis()));
            // NOTE(review): the headers above are set on documentServiceRequest, while the calls
            // below dispatch the captured outer request. The two are the same object unless the
            // prepare delegate emits a different instance - confirm this is intended.
            if (this.shouldSpeculate(request)) {
                logger.debug("Speculating request {}", request.getOperationType());
                return this.getStoreResponseMonoWithSpeculation(request, forceRefreshAndTimeout);
            }
            return this.getStoreResponseMono(request, forceRefreshAndTimeout);
        };
        Function<Quadruple<Boolean, Boolean, Duration, Integer>, Mono<StoreResponse>> funcDelegate = forceRefreshAndTimeout -> {
            if (prepareRequestAsyncDelegate != null) {
                return prepareRequestAsyncDelegate.apply(request)
                    .flatMap(preparedRequest -> mainFuncDelegate.apply(forceRefreshAndTimeout, preparedRequest));
            }
            return mainFuncDelegate.apply(forceRefreshAndTimeout, request);
        };
        int retryTimeout = this.serviceConfigReader.getDefaultConsistencyLevel() == ConsistencyLevel.STRONG
            ? STRONG_GONE_AND_RETRY_WITH_RETRY_TIMEOUT_SECONDS
            : GONE_AND_RETRY_WITH_TIMEOUT_IN_SECONDS;
        return BackoffRetryUtility.executeAsync(
            funcDelegate,
            new GoneAndRetryWithRetryPolicy(request, retryTimeout),
            null,
            Duration.ofSeconds(MIN_BACKOFF_FOR_FAILLING_BACK_TO_OTHER_REGIONS_FOR_READ_REQUESTS_IN_SECONDS),
            request,
            this.addressSelector);
    }

    /**
     * Issues the request against its current route (when the processor asks for it) and, after a
     * per-region delay, against every other region returned by the speculative processor. The
     * first response that carries a value wins.
     *
     * @param request the original request
     * @param forceRefreshAndTimeout retry state (force-refresh flag, in-retry flag, remaining
     *     time, attempt count) shared by all issued requests
     * @return a {@link Mono} emitting the first store response received
     */
    private Mono<StoreResponse> getStoreResponseMonoWithSpeculation(RxDocumentServiceRequest request, Quadruple<Boolean, Boolean, Duration, Integer> forceRefreshAndTimeout) {
        CosmosEndToEndOperationLatencyPolicyConfig config = request.requestContext.getEndToEndOperationLatencyPolicyConfig();
        List<Mono<StoreResponse>> monoList = new ArrayList<>();
        List<RxDocumentServiceRequest> requests = new ArrayList<>();
        if (this.speculativeProcessor.shouldIncludeOriginalRequestRegion()) {
            monoList.add(this.getStoreResponseMono(request, forceRefreshAndTimeout));
            requests.add(request);
        }
        for (URI endpoint : this.speculativeProcessor.getRegionsToSpeculate(config, this.transportClient.getGlobalEndpointManager().getReadEndpoints())) {
            // Compare by value: an equal but distinct URI instance must also be recognised as the
            // region the original request is already routed to, otherwise that region is hit twice.
            if (isSameEndpoint(request.requestContext.locationEndpointToRoute, endpoint)) {
                continue;
            }
            RxDocumentServiceRequest newRequest = request.clone();
            newRequest.requestContext.routeToLocation(endpoint);
            requests.add(newRequest);
            // The step index is taken before this request is appended to monoList.
            int stepIndex = monoList.size() - 1;
            monoList.add(this.getStoreResponseMono(newRequest, forceRefreshAndTimeout)
                .delaySubscription(this.speculativeProcessor.getThreshold(config)
                    .plus(this.speculativeProcessor.getThresholdStepDuration(config, stepIndex))));
        }
        return Mono.firstWithValue(monoList).map(storeResponse -> {
            // Report the latency of whichever issued request produced the winning response.
            for (RxDocumentServiceRequest candidate : requests) {
                CosmosDiagnostics diagnostics = candidate.requestContext.cosmosDiagnostics;
                if (!candidate.getActivityId().toString().equals(storeResponse.getActivityId())) {
                    continue;
                }
                this.speculativeProcessor.onResponseReceived(candidate.requestContext.locationEndpointToRoute, diagnostics.getDuration());
            }
            return storeResponse;
        });
    }

    /**
     * Null-safe value comparison of two endpoints.
     */
    private static boolean isSameEndpoint(URI left, URI right) {
        return left == right || (left != null && left.equals(right));
    }

    /**
     * Decides whether the request should be speculatively issued against additional regions.
     *
     * <p>Speculation requires a configured processor, a read-only request on a Document
     * resource, and an enabled end-to-end latency policy on the request. Requests that are not
     * read-only are never speculated because the extra copies are routed to read endpoints.
     *
     * @param request the request about to be dispatched
     * @return {@code true} if the request should be speculated
     */
    private boolean shouldSpeculate(RxDocumentServiceRequest request) {
        if (this.speculativeProcessor == null) {
            return false;
        }
        // Either condition alone disqualifies the request. Using && here would let Document
        // writes (and read-only requests on other resource types) through.
        if (!request.isReadOnlyRequest() || request.getResourceType() != ResourceType.Document) {
            return false;
        }
        CosmosEndToEndOperationLatencyPolicyConfig config = request.requestContext.getEndToEndOperationLatencyPolicyConfig();
        return config != null && config.isEnabled();
    }

    /**
     * Unpacks the retry state and dispatches the request to the reader or the writer.
     *
     * @param request the request to dispatch
     * @param forceRefreshAndTimeout retry state: value0 = force refresh, value1 = in retry,
     *     value2 = remaining time
     * @return a {@link Mono} emitting the store response
     */
    private Mono<StoreResponse> getStoreResponseMono(RxDocumentServiceRequest request, Quadruple<Boolean, Boolean, Duration, Integer> forceRefreshAndTimeout) {
        return this.invokeAsync(request, new TimeoutHelper(forceRefreshAndTimeout.getValue2()), forceRefreshAndTimeout.getValue1(), forceRefreshAndTimeout.getValue0());
    }

    /**
     * Forwards the "open connections and init caches completed" notification to the transport client.
     *
     * @param cosmosContainerIdentities the containers the notification applies to
     */
    public void recordOpenConnectionsAndInitCachesCompleted(List<CosmosContainerIdentity> cosmosContainerIdentities) {
        this.transportClient.recordOpenConnectionsAndInitCachesCompleted(cosmosContainerIdentities);
    }

    /**
     * Forwards the "open connections and init caches started" notification to the transport client.
     *
     * @param cosmosContainerIdentities the containers the notification applies to
     */
    public void recordOpenConnectionsAndInitCachesStarted(List<CosmosContainerIdentity> cosmosContainerIdentities) {
        this.transportClient.recordOpenConnectionsAndInitCachesStarted(cosmosContainerIdentities);
    }

    /**
     * Routes the request to the consistency reader or writer based on its operation type.
     *
     * @param request the request to dispatch
     * @param timeout helper tracking the remaining time for the operation
     * @param isInRetry whether this attempt is a retry; only forwarded to the reader
     * @param forceRefresh forwarded to the reader or the writer
     * @return a {@link Mono} emitting the store response
     * @throws IllegalArgumentException if the request is neither a script execution, a write
     *     operation, nor a read-only request
     */
    private Mono<StoreResponse> invokeAsync(RxDocumentServiceRequest request, TimeoutHelper timeout, boolean isInRetry, boolean forceRefresh) {
        OperationType operationType = request.getOperationType();
        if (operationType.equals(OperationType.ExecuteJavaScript)) {
            // Scripts go to the reader only when flagged read-only; otherwise they are writes.
            if (request.isReadOnlyScript()) {
                return this.consistencyReader.readAsync(request, timeout, isInRetry, forceRefresh);
            }
            return this.consistencyWriter.writeAsync(request, timeout, forceRefresh);
        }
        if (operationType.isWriteOperation()) {
            return this.consistencyWriter.writeAsync(request, timeout, forceRefresh);
        }
        if (request.isReadOnlyRequest()) {
            return this.consistencyReader.readAsync(request, timeout, isInRetry, forceRefresh);
        }
        throw new IllegalArgumentException(String.format("Unexpected operation type %s", operationType));
    }

    /**
     * Delegates proactive connection opening and cache initialisation to the address selector.
     *
     * @param proactiveContainerInitConfig the proactive-init configuration
     * @return the {@link Flux} returned by the address selector
     */
    public Flux<Void> submitOpenConnectionTasksAndInitCaches(CosmosContainerProactiveInitConfig proactiveContainerInitConfig) {
        return this.addressSelector.submitOpenConnectionTasksAndInitCaches(proactiveContainerInitConfig);
    }

    /**
     * Configures the fault injector provider on the underlying transport client.
     *
     * @param injectorProvider the provider handed to the transport client
     */
    public void configureFaultInjectorProvider(IFaultInjectorProvider injectorProvider) {
        this.transportClient.configureFaultInjectorProvider(injectorProvider);
    }
}

