/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.directconnectivity;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.ApiType;
import com.azure.cosmos.implementation.AuthorizationTokenType;
import com.azure.cosmos.implementation.BackoffRetryUtility;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.ConnectionPolicy;
import com.azure.cosmos.implementation.DiagnosticsClientContext;
import com.azure.cosmos.implementation.DocumentCollection;
import com.azure.cosmos.implementation.Exceptions;
import com.azure.cosmos.implementation.GlobalEndpointManager;
import com.azure.cosmos.implementation.IAuthorizationTokenProvider;
import com.azure.cosmos.implementation.IOpenConnectionsHandler;
import com.azure.cosmos.implementation.JavaStreamUtils;
import com.azure.cosmos.implementation.MetadataDiagnosticsContext;
import com.azure.cosmos.implementation.OpenConnectionResponse;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.PartitionKeyRangeGoneException;
import com.azure.cosmos.implementation.PathsHelper;
import com.azure.cosmos.implementation.RequestVerb;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.RxDocumentServiceResponse;
import com.azure.cosmos.implementation.UnauthorizedException;
import com.azure.cosmos.implementation.UserAgentContainer;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
import com.azure.cosmos.implementation.caches.AsyncCache;
import com.azure.cosmos.implementation.directconnectivity.Address;
import com.azure.cosmos.implementation.directconnectivity.AddressInformation;
import com.azure.cosmos.implementation.directconnectivity.HttpClientUtils;
import com.azure.cosmos.implementation.directconnectivity.HttpUtils;
import com.azure.cosmos.implementation.directconnectivity.IAddressCache;
import com.azure.cosmos.implementation.directconnectivity.OpenConnectionAndInitCachesRetryPolicy;
import com.azure.cosmos.implementation.directconnectivity.Protocol;
import com.azure.cosmos.implementation.directconnectivity.WebExceptionUtility;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.implementation.http.HttpClient;
import com.azure.cosmos.implementation.http.HttpHeaders;
import com.azure.cosmos.implementation.http.HttpRequest;
import com.azure.cosmos.implementation.http.HttpResponse;
import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity;
import io.netty.handler.codec.http.HttpMethod;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class GatewayAddressCache
implements IAddressCache {
    // 30-second interval related to enforcing a collection routing map refresh.
    // NOTE(review): not referenced in the visible part of this file - presumably consumed by ForcedRefreshMetadata; confirm.
    private static final Duration minDurationBeforeEnforcingCollectionRoutingMapRefresh = Duration.ofSeconds(30L);
    private static final Logger logger = LoggerFactory.getLogger(GatewayAddressCache.class);
    // Template for the "$filter" query value; the constructor formats it with "protocol" and the protocol scheme.
    private static final String protocolFilterFormat = "%s eq %s";
    // NOTE(review): not referenced in the visible part of this file.
    private static final int DefaultBatchSize = 50;
    // Same value (600) as the interval the convenience constructor passes to the primary constructor.
    private static final int DefaultSuboptimalPartitionForceRefreshIntervalInSeconds = 600;
    // Handed to HttpClientUtils.parseResponseAsync when parsing gateway responses.
    private final DiagnosticsClientContext clientContext;
    // Entry URL sent as "$resolveFor" when resolving the master partition addresses.
    private final String databaseFeedEntryUrl = PathsHelper.generatePath(ResourceType.Database, "", true);
    // Service endpoint resolved against the relative path "addresses"; target of every address lookup.
    private final URI addressEndpoint;
    // Addresses of server (non-master) partitions, keyed by partition key range identity.
    private final AsyncCache<PartitionKeyRangeIdentity, AddressInformation[]> serverPartitionAddressCache;
    // Time at which a server partition was first observed with not all replicas available.
    private final ConcurrentHashMap<PartitionKeyRangeIdentity, Instant> suboptimalServerPartitionTimestamps;
    // A partition that stays suboptimal for longer than this many seconds gets a forced address refresh.
    private final long suboptimalPartitionForceRefreshIntervalInSeconds;
    // Scheme of the configured protocol; addresses with a different scheme are filtered out.
    private final String protocolScheme;
    // Pre-formatted "$filter" value ("protocol eq <scheme>").
    private final String protocolFilter;
    // Supplies the authorization token (or populates the authorization header for AAD) on address requests.
    private final IAuthorizationTokenProvider tokenProvider;
    // Headers copied into every address request (user agent, optional API type, service version).
    private final HashMap<String, String> defaultRequestHeaders;
    private final HttpClient httpClient;
    // Cached master partition addresses; written by resolveMasterAsync and reset to null by updateAddresses.
    private volatile Pair<PartitionKeyRangeIdentity, AddressInformation[]> masterPartitionAddressCache;
    // Instant.MAX while the master replica set is not flagged as suboptimal.
    private volatile Instant suboptimalMasterPartitionTimestamp;
    // Reverse index from a server key to the partition key ranges whose cached addresses include that server.
    private final ConcurrentHashMap<URI, Set<PartitionKeyRangeIdentity>> serverPartitionAddressToPkRangeIdMap;
    // When false, updateAddresses only logs a warning and the reverse index above is not maintained.
    private final boolean tcpConnectionEndpointRediscoveryEnabled;
    // Forced-refresh bookkeeping, keyed by collection resource id.
    private final ConcurrentHashMap<String, ForcedRefreshMetadata> lastForcedRefreshMap;
    // Passed to BridgeInternal.recordGatewayResponse when a gateway failure is recorded in diagnostics.
    private final GlobalEndpointManager globalEndpointManager;
    // Not final: can be replaced after construction through setOpenConnectionsHandler.
    private IOpenConnectionsHandler openConnectionsHandler;
    // NOTE(review): stored by the constructor; not read in the visible part of this file.
    private final ConnectionPolicy connectionPolicy;

    public GatewayAddressCache(DiagnosticsClientContext clientContext, URI serviceEndpoint, Protocol protocol, IAuthorizationTokenProvider tokenProvider, UserAgentContainer userAgent, HttpClient httpClient, long suboptimalPartitionForceRefreshIntervalInSeconds, boolean tcpConnectionEndpointRediscoveryEnabled, ApiType apiType, GlobalEndpointManager globalEndpointManager, ConnectionPolicy connectionPolicy, IOpenConnectionsHandler openConnectionsHandler) {
        this.clientContext = clientContext;
        try {
            this.addressEndpoint = new URL(serviceEndpoint.toURL(), "addresses").toURI();
        }
        catch (MalformedURLException | URISyntaxException e) {
            logger.error("serviceEndpoint {} is invalid", (Object)serviceEndpoint, (Object)e);
            assert (false);
            throw new IllegalStateException(e);
        }
        this.tokenProvider = tokenProvider;
        this.serverPartitionAddressCache = new AsyncCache();
        this.suboptimalServerPartitionTimestamps = new ConcurrentHashMap();
        this.suboptimalMasterPartitionTimestamp = Instant.MAX;
        this.suboptimalPartitionForceRefreshIntervalInSeconds = suboptimalPartitionForceRefreshIntervalInSeconds;
        this.protocolScheme = protocol.scheme();
        this.protocolFilter = String.format(protocolFilterFormat, "protocol", this.protocolScheme);
        this.httpClient = httpClient;
        if (userAgent == null) {
            userAgent = new UserAgentContainer();
        }
        this.defaultRequestHeaders = new HashMap();
        this.defaultRequestHeaders.put("User-Agent", userAgent.getUserAgent());
        if (apiType != null) {
            this.defaultRequestHeaders.put("x-ms-cosmos-apitype", apiType.toString());
        }
        this.defaultRequestHeaders.put("x-ms-version", "2020-07-15");
        this.serverPartitionAddressToPkRangeIdMap = new ConcurrentHashMap();
        this.tcpConnectionEndpointRediscoveryEnabled = tcpConnectionEndpointRediscoveryEnabled;
        this.lastForcedRefreshMap = new ConcurrentHashMap();
        this.globalEndpointManager = globalEndpointManager;
        this.openConnectionsHandler = openConnectionsHandler;
        this.connectionPolicy = connectionPolicy;
    }

    /**
     * Convenience constructor that delegates to the primary constructor with a suboptimal-partition
     * force-refresh interval of 600 seconds.
     */
    public GatewayAddressCache(
        DiagnosticsClientContext clientContext,
        URI serviceEndpoint,
        Protocol protocol,
        IAuthorizationTokenProvider tokenProvider,
        UserAgentContainer userAgent,
        HttpClient httpClient,
        boolean tcpConnectionEndpointRediscoveryEnabled,
        ApiType apiType,
        GlobalEndpointManager globalEndpointManager,
        ConnectionPolicy connectionPolicy,
        IOpenConnectionsHandler openConnectionsHandler) {

        this(
            clientContext,
            serviceEndpoint,
            protocol,
            tokenProvider,
            userAgent,
            httpClient,
            600L, // default force-refresh interval, in seconds
            tcpConnectionEndpointRediscoveryEnabled,
            apiType,
            globalEndpointManager,
            connectionPolicy,
            openConnectionsHandler);
    }

    /**
     * Evicts every cached address entry that references the given server and drops the server
     * from the reverse index.
     *
     * @param serverKey key of the server whose partitions should be invalidated; must not be {@code null}
     * @return number of cache entries that were invalidated (0 when rediscovery is disabled
     *         or the server is unknown)
     */
    @Override
    public int updateAddresses(URI serverKey) {
        Objects.requireNonNull(serverKey, "expected non-null serverKey");

        if (!this.tcpConnectionEndpointRediscoveryEnabled) {
            logger.warn("tcpConnectionEndpointRediscovery is not enabled, should not reach here.");
            return 0;
        }

        final AtomicInteger evictedEntries = new AtomicInteger(0);
        this.serverPartitionAddressToPkRangeIdMap.computeIfPresent(serverKey, (ignoredKey, affectedRanges) -> {
            for (PartitionKeyRangeIdentity affectedRange : affectedRanges) {
                boolean isMasterPartition = affectedRange.getPartitionKeyRangeId().equals("M");
                if (!isMasterPartition) {
                    this.serverPartitionAddressCache.remove(affectedRange);
                } else {
                    this.masterPartitionAddressCache = null;
                }
                evictedEntries.incrementAndGet();
            }
            // Returning null removes the server's mapping from the reverse index.
            return null;
        });
        return evictedEntries.get();
    }

    /**
     * Resolves the replica addresses of a partition, consulting the cache first and calling the
     * gateway when the entry is missing or a refresh is required.
     *
     * <p>A refresh is performed when the caller asks for one, or when the partition has been
     * recorded as suboptimal (not all replicas available) for longer than the configured interval.
     *
     * @param request request on whose behalf addresses are resolved
     * @param partitionKeyRangeIdentity partition to resolve; range id {@code "M"} selects the master partition
     * @param forceRefreshPartitionAddresses whether the cached entry must be refreshed
     * @return the addresses wrapped in a value holder; the holder contains {@code null} when the
     *         lookup fails with status 404, status 410 or sub-status 1002
     */
    @Override
    public Mono<Utils.ValueHolder<AddressInformation[]>> tryGetAddresses(RxDocumentServiceRequest request, PartitionKeyRangeIdentity partitionKeyRangeIdentity, boolean forceRefreshPartitionAddresses) {
        Utils.checkNotNullOrThrow(request, "request", "");
        Utils.checkNotNullOrThrow(partitionKeyRangeIdentity, "partitionKeyRangeIdentity", "");
        logger.debug("PartitionKeyRangeIdentity {}, forceRefreshPartitionAddresses {}", partitionKeyRangeIdentity, forceRefreshPartitionAddresses);

        if (StringUtils.equals(partitionKeyRangeIdentity.getPartitionKeyRangeId(), "M")) {
            // The master partition has its own single-entry cache.
            return this.resolveMasterAsync(request, forceRefreshPartitionAddresses, request.properties)
                .map(masterEntry -> new Utils.ValueHolder<AddressInformation[]>(masterEntry.getRight()));
        }

        this.evaluateCollectionRoutingMapRefreshForServerPartition(request, partitionKeyRangeIdentity, forceRefreshPartitionAddresses);

        boolean refreshRequested = forceRefreshPartitionAddresses;
        final Instant suboptimalSince = this.suboptimalServerPartitionTimestamps.get(partitionKeyRangeIdentity);
        if (suboptimalSince != null) {
            logger.debug("suboptimalServerPartitionTimestamp is {}", suboptimalSince);
            long secondsSuboptimal = Duration.between(suboptimalSince, Instant.now()).getSeconds();
            if (secondsSuboptimal > this.suboptimalPartitionForceRefreshIntervalInSeconds) {
                // Swap the observed timestamp for Instant.MAX only if nobody changed it in the meantime.
                Instant replacement = this.suboptimalServerPartitionTimestamps.computeIfPresent(partitionKeyRangeIdentity, (key, current) -> {
                    logger.debug("key = {}, oldValue = {}", key, current);
                    return suboptimalSince.equals(current) ? Instant.MAX : current;
                });
                logger.debug("newValue is {}", replacement);
                if (!suboptimalSince.equals(replacement)) {
                    logger.debug("setting forceRefreshPartitionAddresses to true");
                    refreshRequested = true;
                }
            }
        }

        final boolean forceRefreshPartitionAddressesModified = refreshRequested;
        if (forceRefreshPartitionAddressesModified) {
            logger.debug("refresh serverPartitionAddressCache for {}", partitionKeyRangeIdentity);
            this.serverPartitionAddressCache.refresh(partitionKeyRangeIdentity, () -> this.getAddressesForRangeId(request, partitionKeyRangeIdentity, true));
            this.suboptimalServerPartitionTimestamps.remove(partitionKeyRangeIdentity);
        }

        Mono<Utils.ValueHolder<AddressInformation[]>> cachedAddresses = this.serverPartitionAddressCache
            .getAsync(partitionKeyRangeIdentity, null, () -> this.getAddressesForRangeId(request, partitionKeyRangeIdentity, false))
            .map(Utils.ValueHolder::new);

        return cachedAddresses
            .map(holder -> {
                if (this.notAllReplicasAvailable(holder.v)) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("not all replicas available {}", JavaStreamUtils.info(holder.v));
                    }
                    // Remember when the partition was first seen as suboptimal.
                    this.suboptimalServerPartitionTimestamps.putIfAbsent(partitionKeyRangeIdentity, Instant.now());
                }
                return holder;
            })
            .onErrorResume(ex -> {
                Throwable unwrappedException = reactor.core.Exceptions.unwrap(ex);
                CosmosException dce = Utils.as(unwrappedException, CosmosException.class);

                if (dce != null) {
                    logger.debug("tryGetAddresses dce", dce);
                    boolean partitionIsGone = Exceptions.isStatusCode(dce, 404)
                        || Exceptions.isStatusCode(dce, 410)
                        || Exceptions.isSubStatusCode(dce, 1002);
                    if (!partitionIsGone) {
                        return Mono.error(unwrappedException);
                    }
                    this.suboptimalServerPartitionTimestamps.remove(partitionKeyRangeIdentity);
                    logger.debug("tryGetAddresses: inner onErrorResumeNext return null", dce);
                    return Mono.just(new Utils.ValueHolder<AddressInformation[]>(null));
                }

                logger.error("unexpected failure", ex);
                if (forceRefreshPartitionAddressesModified) {
                    this.suboptimalServerPartitionTimestamps.remove(partitionKeyRangeIdentity);
                }
                return Mono.error(unwrappedException);
            });
    }

    /**
     * Replaces the open-connections handler held by this cache.
     *
     * @param openConnectionsHandler the handler to store; no validation is performed
     */
    @Override
    public void setOpenConnectionsHandler(IOpenConnectionsHandler openConnectionsHandler) {
        this.openConnectionsHandler = openConnectionsHandler;
    }

    /**
     * Requests the replica addresses of the given partition key ranges from the gateway.
     *
     * <p>Unless the token provider uses AAD tokens, the request is signed with a token for the
     * collection rid; when that lookup is unauthorized and the request is name based, a token for
     * the collection's name-based path is used instead. For AAD, the authorization header is
     * populated asynchronously before the request is sent.
     *
     * @param request request on whose behalf addresses are resolved; it is marked as an address refresh
     * @param collectionRid resource id of the collection
     * @param partitionKeyRangeIds ids of the partition key ranges to resolve
     * @param forceRefresh whether the force-refresh header is sent to the gateway
     * @return the addresses returned by the gateway; every failure that is an {@link Exception}
     *         surfaces as a {@code CosmosException}
     */
    public Mono<List<Address>> getServerAddressesViaGatewayAsync(RxDocumentServiceRequest request, String collectionRid, List<String> partitionKeyRangeIds, boolean forceRefresh) {
        if (logger.isDebugEnabled()) {
            logger.debug("getServerAddressesViaGatewayAsync collectionRid {}, partitionKeyRangeIds {}", collectionRid, JavaStreamUtils.toString(partitionKeyRangeIds, ","));
        }
        request.setAddressRefresh(true, forceRefresh);

        // Query string of the address lookup.
        String resolveForUrl = PathsHelper.generatePath(ResourceType.Document, collectionRid, true);
        HashMap<String, String> queryParameters = new HashMap<>();
        queryParameters.put("$resolveFor", HttpUtils.urlEncode(resolveForUrl));
        queryParameters.put("$filter", HttpUtils.urlEncode(this.protocolFilter));
        queryParameters.put("$partitionKeyRangeIds", String.join(",", partitionKeyRangeIds));

        // Request headers: defaults plus refresh hints and the request date.
        HashMap<String, String> requestHeaders = new HashMap<>(this.defaultRequestHeaders);
        if (forceRefresh) {
            requestHeaders.put("x-ms-force-refresh", "true");
        }
        if (request.forceCollectionRoutingMapRefresh) {
            requestHeaders.put("x-ms-collectionroutingmap-refresh", "true");
        }
        requestHeaders.put("x-ms-date", Utils.nowAsRFC1123());

        if (this.tokenProvider.getAuthorizationTokenType() != AuthorizationTokenType.AadToken) {
            String authorizationToken = null;
            try {
                authorizationToken = this.tokenProvider.getUserAuthorizationToken(collectionRid, ResourceType.Document, RequestVerb.GET, requestHeaders, AuthorizationTokenType.PrimaryMasterKey, request.properties);
            } catch (UnauthorizedException unauthorized) {
                // No rid-based token available; the name-based fallback below may still succeed.
                if (logger.isDebugEnabled()) {
                    logger.debug("User doesn't have resource token for collection rid {}", collectionRid);
                }
            }
            if (authorizationToken == null && request.getIsNameBased()) {
                String collectionAltLink = PathsHelper.getCollectionPath(request.getResourceAddress());
                authorizationToken = this.tokenProvider.getUserAuthorizationToken(collectionAltLink, ResourceType.Document, RequestVerb.GET, requestHeaders, AuthorizationTokenType.PrimaryMasterKey, request.properties);
            }
            requestHeaders.put("authorization", HttpUtils.urlEncode(authorizationToken));
        }

        URI targetEndpoint = Utils.setQuery(this.addressEndpoint.toString(), Utils.createQuery(queryParameters));
        String identifier = GatewayAddressCache.logAddressResolutionStart(request, targetEndpoint, forceRefresh, request.forceCollectionRoutingMapRefresh);
        HttpHeaders httpHeaders = new HttpHeaders(requestHeaders);
        Instant addressCallStartTime = Instant.now();
        HttpRequest httpRequest = new HttpRequest(HttpMethod.GET, targetEndpoint, targetEndpoint.getPort(), httpHeaders);

        Mono<HttpResponse> httpResponseMono;
        if (this.tokenProvider.getAuthorizationTokenType() != AuthorizationTokenType.AadToken) {
            httpResponseMono = this.httpClient.send(httpRequest, Duration.ofSeconds(Configs.getAddressRefreshResponseTimeoutInSeconds()));
        } else {
            // AAD: the authorization header is filled in asynchronously before the request goes out.
            httpResponseMono = this.tokenProvider.populateAuthorizationHeader(httpHeaders)
                .flatMap(populatedHeaders -> this.httpClient.send(httpRequest, Duration.ofSeconds(Configs.getAddressRefreshResponseTimeoutInSeconds())));
        }

        return HttpClientUtils.parseResponseAsync(request, this.clientContext, httpResponseMono, httpRequest)
            .map(serviceResponse -> {
                MetadataDiagnosticsContext diagnosticsContext = BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics);
                if (diagnosticsContext != null) {
                    Instant addressCallEndTime = Instant.now();
                    diagnosticsContext.addMetaDataDiagnostic(
                        new MetadataDiagnosticsContext.MetadataDiagnostics(addressCallStartTime, addressCallEndTime, MetadataDiagnosticsContext.MetadataType.SERVER_ADDRESS_LOOKUP));
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("getServerAddressesViaGatewayAsync deserializes result");
                }
                GatewayAddressCache.logAddressResolutionEnd(request, identifier, null);
                return serviceResponse.getQueryResponse(null, Address.class);
            })
            .onErrorResume(throwable -> {
                Throwable failure = reactor.core.Exceptions.unwrap(throwable);
                GatewayAddressCache.logAddressResolutionEnd(request, identifier, failure.toString());

                // Errors that are not Exceptions are propagated untouched.
                if (!(failure instanceof Exception)) {
                    logger.error("Unexpected failure {}", failure.getMessage(), failure);
                    return Mono.error(failure);
                }

                Exception exception = (Exception) failure;
                CosmosException dce;
                if (exception instanceof CosmosException) {
                    dce = (CosmosException) exception;
                } else {
                    // Wrap any other exception; network failures map to 408 (read timeout) or 503.
                    logger.error("Network failure", exception);
                    int statusCode = 0;
                    if (WebExceptionUtility.isNetworkFailure(exception)) {
                        statusCode = WebExceptionUtility.isReadTimeoutException(exception) ? 408 : 503;
                    }
                    dce = BridgeInternal.createCosmosException(request.requestContext.resourcePhysicalAddress, statusCode, exception);
                    BridgeInternal.setRequestHeaders(dce, request.getHeaders());
                }

                if (WebExceptionUtility.isNetworkFailure(dce)) {
                    int subStatusCode = WebExceptionUtility.isReadTimeoutException(dce) ? 10002 : 10001;
                    BridgeInternal.setSubStatusCode(dce, subStatusCode);
                }
                if (request.requestContext.cosmosDiagnostics != null) {
                    BridgeInternal.recordGatewayResponse(request.requestContext.cosmosDiagnostics, request, dce, this.globalEndpointManager);
                }
                return Mono.error(dce);
            });
    }

    /**
     * Intentionally a no-op: this method releases nothing.
     */
    public void dispose() {
    }

    /**
     * Resolves the addresses of the master partition, using the cached entry when possible.
     *
     * <p>The gateway is called when the caller forces a refresh, when nothing is cached, or when
     * the cached replica set has been incomplete for longer than the configured interval.
     *
     * <p>The volatile cache field is read exactly once into a local snapshot and every decision is
     * made against that snapshot. {@code updateAddresses} may null the field, and a concurrent
     * resolve may populate it, at any time; re-reading the field between the null check and the
     * dereference could therefore cause a {@code NullPointerException}.
     *
     * @param request request on whose behalf the master addresses are resolved
     * @param forceRefresh whether the cached entry must be bypassed
     * @param properties request properties forwarded to the token provider
     * @return the master partition identity paired with its replica addresses
     */
    private Mono<Pair<PartitionKeyRangeIdentity, AddressInformation[]>> resolveMasterAsync(RxDocumentServiceRequest request, boolean forceRefresh, Map<String, Object> properties) {
        logger.debug("resolveMasterAsync forceRefresh: {}", forceRefresh);
        final Pair<PartitionKeyRangeIdentity, AddressInformation[]> masterAddressAndRangeInitial = this.masterPartitionAddressCache;

        // Also refresh when the cached replica set has been suboptimal for too long.
        forceRefresh = forceRefresh
            || (masterAddressAndRangeInitial != null
                && this.notAllReplicasAvailable(masterAddressAndRangeInitial.getRight())
                && Duration.between(this.suboptimalMasterPartitionTimestamp, Instant.now()).getSeconds() > this.suboptimalPartitionForceRefreshIntervalInSeconds);

        // Check the snapshot, not the field: the field may change between two reads.
        if (forceRefresh || masterAddressAndRangeInitial == null) {
            Mono<List<Address>> masterReplicaAddressesObs = this.getMasterAddressesViaGatewayAsync(request, ResourceType.Database, null, this.databaseFeedEntryUrl, forceRefresh, false, properties);
            return masterReplicaAddressesObs.map(masterAddresses -> {
                Pair<PartitionKeyRangeIdentity, AddressInformation[]> masterAddressAndRangeRes = this.toPartitionAddressAndRange("", masterAddresses);
                this.masterPartitionAddressCache = masterAddressAndRangeRes;
                this.suboptimalMasterPartitionTimestamp = this.notAllReplicasAvailable(masterAddressAndRangeRes.getRight()) && this.suboptimalMasterPartitionTimestamp.equals(Instant.MAX) ? Instant.now() : Instant.MAX;
                // Return the local result: the field may already have been reset to null by
                // updateAddresses, and a Reactor mapper must never return null.
                return masterAddressAndRangeRes;
            }).doOnError(e -> {
                this.suboptimalMasterPartitionTimestamp = Instant.MAX;
            });
        }

        // Cached entry is used as is; start the suboptimal clock if it is not running yet.
        if (this.notAllReplicasAvailable(masterAddressAndRangeInitial.getRight()) && this.suboptimalMasterPartitionTimestamp.equals(Instant.MAX)) {
            this.suboptimalMasterPartitionTimestamp = Instant.now();
        }
        return Mono.just(masterAddressAndRangeInitial);
    }

    /**
     * Records the kind of refresh being performed for a server partition and, when the per-collection
     * bookkeeping asks for it, upgrades an address-only refresh into one that also refreshes the
     * collection routing map by setting {@code request.forceCollectionRoutingMapRefresh}.
     *
     * @param request request being served; its {@code forceCollectionRoutingMapRefresh} flag may be set to {@code true}
     * @param pkRangeIdentity partition whose addresses are being resolved
     * @param forceRefreshPartitionAddresses whether the partition addresses are being force-refreshed
     */
    private void evaluateCollectionRoutingMapRefreshForServerPartition(RxDocumentServiceRequest request, PartitionKeyRangeIdentity pkRangeIdentity, boolean forceRefreshPartitionAddresses) {
        Utils.checkNotNullOrThrow(request, "request", "");
        this.validatePkRangeIdentity(pkRangeIdentity);

        final String collectionRid = pkRangeIdentity.getCollectionRid();
        final String partitionKeyRangeId = pkRangeIdentity.getPartitionKeyRangeId();

        // Bookkeeping is only touched when some kind of refresh is in play.
        if (forceRefreshPartitionAddresses || request.forceCollectionRoutingMapRefresh) {
            ForcedRefreshMetadata refreshMetadata = this.lastForcedRefreshMap.computeIfAbsent(collectionRid, rid -> new ForcedRefreshMetadata());

            if (!forceRefreshPartitionAddresses) {
                // Routing map refresh without a forced address refresh.
                refreshMetadata.signalCollectionRoutingMapRefresh(pkRangeIdentity, false);
            } else if (request.forceCollectionRoutingMapRefresh) {
                refreshMetadata.signalCollectionRoutingMapRefresh(pkRangeIdentity, true);
            } else if (refreshMetadata.shouldIncludeCollectionRoutingMapRefresh(pkRangeIdentity)) {
                // Upgrade the address-only refresh to include the routing map.
                request.forceCollectionRoutingMapRefresh = true;
                refreshMetadata.signalCollectionRoutingMapRefresh(pkRangeIdentity, true);
            } else {
                refreshMetadata.signalPartitionAddressOnlyRefresh(pkRangeIdentity);
            }
        }

        logger.debug(
            "evaluateCollectionRoutingMapRefreshForServerPartition collectionRid {}, partitionKeyRangeId {}, forceRefreshPartitionAddresses {}, forceCollectionRoutingMapRefresh {}",
            collectionRid,
            partitionKeyRangeId,
            forceRefreshPartitionAddresses,
            request.forceCollectionRoutingMapRefresh);
    }

    /**
     * Verifies that the identity and both of its components (collection rid and partition key
     * range id) are present, delegating each check to {@code Utils.checkNotNullOrThrow}.
     *
     * @param pkRangeIdentity identity to validate
     */
    private void validatePkRangeIdentity(PartitionKeyRangeIdentity pkRangeIdentity) {
        Utils.checkNotNullOrThrow(pkRangeIdentity, "pkRangeId", "");

        String collectionRid = pkRangeIdentity.getCollectionRid();
        Utils.checkNotNullOrThrow(collectionRid, "pkRangeId.getCollectionRid()", "");

        String partitionKeyRangeId = pkRangeIdentity.getPartitionKeyRangeId();
        Utils.checkNotNullOrThrow(partitionKeyRangeId, "pkRangeId.getPartitionKeyRangeId()", "");
    }

    /**
     * Fetches the replica addresses of a single partition key range from the gateway.
     *
     * <p>The gateway response is filtered to the configured protocol scheme, grouped by partition
     * key range id, and each group is converted separately. Only the group that matches the
     * requested range id is returned.
     *
     * @param request request on whose behalf addresses are resolved
     * @param pkRangeIdentity partition key range to resolve; must carry a collection rid and a range id
     * @param forceRefresh whether the gateway is asked to force-refresh its own view
     * @return the addresses of the requested range; fails with {@code PartitionKeyRangeGoneException}
     *         when the gateway returns no addresses for it
     */
    private Mono<AddressInformation[]> getAddressesForRangeId(RxDocumentServiceRequest request, PartitionKeyRangeIdentity pkRangeIdentity, boolean forceRefresh) {
        Utils.checkNotNullOrThrow(request, "request", "");
        this.validatePkRangeIdentity(pkRangeIdentity);
        String collectionRid = pkRangeIdentity.getCollectionRid();
        String partitionKeyRangeId = pkRangeIdentity.getPartitionKeyRangeId();
        logger.debug("getAddressesForRangeId collectionRid {}, partitionKeyRangeId {}, forceRefresh {}", collectionRid, partitionKeyRangeId, forceRefresh);

        Mono<List<Address>> addressResponse = this.getServerAddressesViaGatewayAsync(request, collectionRid, Collections.singletonList(partitionKeyRangeId), forceRefresh);

        Mono<List<Pair<PartitionKeyRangeIdentity, AddressInformation[]>>> addressInfos = addressResponse.map(addresses -> {
            if (logger.isDebugEnabled()) {
                logger.debug("addresses from getServerAddressesViaGatewayAsync in getAddressesForRangeId {}", JavaStreamUtils.info(addresses));
            }
            return addresses.stream()
                .filter(address -> this.protocolScheme.equals(address.getProtocolScheme()))
                .collect(Collectors.groupingBy(Address::getParitionKeyRangeId))
                .values()
                .stream()
                // Convert each group on its own. Passing the full, unfiltered list here would
                // ignore the protocol filter and attribute every address to one range.
                .map(groupedAddresses -> this.toPartitionAddressAndRange(collectionRid, groupedAddresses))
                .collect(Collectors.toList());
        });

        Mono<List<Pair<PartitionKeyRangeIdentity, AddressInformation[]>>> result = addressInfos.map(addressInfo -> addressInfo.stream()
            .filter(a -> StringUtils.equals(a.getLeft().getPartitionKeyRangeId(), partitionKeyRangeId))
            .collect(Collectors.toList()));

        return result.<AddressInformation[]>flatMap(list -> {
            if (logger.isDebugEnabled()) {
                logger.debug("getAddressesForRangeId flatMap got result {}", JavaStreamUtils.info(list));
            }
            if (list.isEmpty()) {
                // The gateway does not know the range any more.
                String errorMessage = String.format("PartitionKeyRange with id %s in collection %s doesn't exist", partitionKeyRangeId, collectionRid);
                PartitionKeyRangeGoneException e = new PartitionKeyRangeGoneException(errorMessage);
                BridgeInternal.setResourceAddress(e, collectionRid);
                return Mono.error(e);
            }
            return Mono.just(list.get(0).getRight());
        }).doOnError(e -> logger.debug("getAddressesForRangeId", e));
    }

    /**
     * Requests the replica addresses of the master partition from the gateway.
     *
     * <p>Unless the token provider uses AAD tokens, the request is signed with a token obtained for
     * {@code resourceAddress}/{@code resourceType}. For AAD, the authorization header is populated
     * asynchronously before the request is sent.
     *
     * @param request request on whose behalf addresses are resolved; it is marked as an address refresh
     * @param resourceType resource type used when obtaining the authorization token
     * @param resourceAddress resource address used when obtaining the authorization token
     * @param entryUrl value sent (url-encoded) as the {@code $resolveFor} query parameter
     * @param forceRefresh whether the force-refresh header is sent to the gateway
     * @param useMasterCollectionResolver whether the master-collection-resolver header is sent
     * @param properties request properties forwarded to the token provider
     * @return the addresses returned by the gateway; every failure that is an {@link Exception}
     *         surfaces as a {@code CosmosException}
     */
    public Mono<List<Address>> getMasterAddressesViaGatewayAsync(RxDocumentServiceRequest request, ResourceType resourceType, String resourceAddress, String entryUrl, boolean forceRefresh, boolean useMasterCollectionResolver, Map<String, Object> properties) {
        logger.debug(
            "getMasterAddressesViaGatewayAsync resourceType {}, resourceAddress {}, entryUrl {}, forceRefresh {}, useMasterCollectionResolver {}",
            resourceType,
            resourceAddress,
            entryUrl,
            forceRefresh,
            useMasterCollectionResolver);
        request.setAddressRefresh(true, forceRefresh);

        // Query string of the address lookup.
        HashMap<String, String> queryParameters = new HashMap<>();
        queryParameters.put("$resolveFor", HttpUtils.urlEncode(entryUrl));
        queryParameters.put("$filter", HttpUtils.urlEncode(this.protocolFilter));

        // Request headers: defaults plus refresh/resolver hints and the request date.
        HashMap<String, String> requestHeaders = new HashMap<>(this.defaultRequestHeaders);
        if (forceRefresh) {
            requestHeaders.put("x-ms-force-refresh", "true");
        }
        if (useMasterCollectionResolver) {
            requestHeaders.put("x-ms-use-master-collection-resolver", "true");
        }
        if (request.forceCollectionRoutingMapRefresh) {
            requestHeaders.put("x-ms-collectionroutingmap-refresh", "true");
        }
        requestHeaders.put("x-ms-date", Utils.nowAsRFC1123());

        if (this.tokenProvider.getAuthorizationTokenType() != AuthorizationTokenType.AadToken) {
            String authorizationToken = this.tokenProvider.getUserAuthorizationToken(resourceAddress, resourceType, RequestVerb.GET, requestHeaders, AuthorizationTokenType.PrimaryMasterKey, properties);
            requestHeaders.put("authorization", HttpUtils.urlEncode(authorizationToken));
        }

        URI targetEndpoint = Utils.setQuery(this.addressEndpoint.toString(), Utils.createQuery(queryParameters));
        // NOTE(review): both refresh flags are logged as true regardless of forceRefresh and
        // request.forceCollectionRoutingMapRefresh - confirm whether that is intended.
        String identifier = GatewayAddressCache.logAddressResolutionStart(request, targetEndpoint, true, true);
        HttpHeaders defaultHttpHeaders = new HttpHeaders(requestHeaders);
        HttpRequest httpRequest = new HttpRequest(HttpMethod.GET, targetEndpoint, targetEndpoint.getPort(), defaultHttpHeaders);
        Instant addressCallStartTime = Instant.now();

        Mono<HttpResponse> httpResponseMono;
        if (this.tokenProvider.getAuthorizationTokenType() != AuthorizationTokenType.AadToken) {
            httpResponseMono = this.httpClient.send(httpRequest, Duration.ofSeconds(Configs.getAddressRefreshResponseTimeoutInSeconds()));
        } else {
            // AAD: the authorization header is filled in asynchronously before the request goes out.
            httpResponseMono = this.tokenProvider.populateAuthorizationHeader(defaultHttpHeaders)
                .flatMap(populatedHeaders -> this.httpClient.send(httpRequest, Duration.ofSeconds(Configs.getAddressRefreshResponseTimeoutInSeconds())));
        }

        return HttpClientUtils.parseResponseAsync(request, this.clientContext, httpResponseMono, httpRequest)
            .map(serviceResponse -> {
                MetadataDiagnosticsContext diagnosticsContext = BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics);
                if (diagnosticsContext != null) {
                    Instant addressCallEndTime = Instant.now();
                    diagnosticsContext.addMetaDataDiagnostic(
                        new MetadataDiagnosticsContext.MetadataDiagnostics(addressCallStartTime, addressCallEndTime, MetadataDiagnosticsContext.MetadataType.MASTER_ADDRESS_LOOK_UP));
                }
                GatewayAddressCache.logAddressResolutionEnd(request, identifier, null);
                return serviceResponse.getQueryResponse(null, Address.class);
            })
            .onErrorResume(throwable -> {
                Throwable failure = reactor.core.Exceptions.unwrap(throwable);
                GatewayAddressCache.logAddressResolutionEnd(request, identifier, failure.toString());

                // Errors that are not Exceptions are propagated untouched.
                if (!(failure instanceof Exception)) {
                    logger.error("Unexpected failure {}", failure.getMessage(), failure);
                    return Mono.error(failure);
                }

                Exception exception = (Exception) failure;
                CosmosException dce;
                if (exception instanceof CosmosException) {
                    dce = (CosmosException) exception;
                } else {
                    // Wrap any other exception; network failures map to 408 (read timeout) or 503.
                    logger.error("Network failure", exception);
                    int statusCode = 0;
                    if (WebExceptionUtility.isNetworkFailure(exception)) {
                        statusCode = WebExceptionUtility.isReadTimeoutException(exception) ? 408 : 503;
                    }
                    dce = BridgeInternal.createCosmosException(request.requestContext.resourcePhysicalAddress, statusCode, exception);
                    BridgeInternal.setRequestHeaders(dce, request.getHeaders());
                }

                if (WebExceptionUtility.isNetworkFailure(dce)) {
                    int subStatusCode = WebExceptionUtility.isReadTimeoutException(dce) ? 10002 : 10001;
                    BridgeInternal.setSubStatusCode(dce, subStatusCode);
                }
                if (request.requestContext.cosmosDiagnostics != null) {
                    BridgeInternal.recordGatewayResponse(request.requestContext.cosmosDiagnostics, request, dce, this.globalEndpointManager);
                }
                return Mono.error(dce);
            });
    }

    /**
     * Builds the (partition key range identity, replica addresses) pair for a group of addresses.
     *
     * <p>The partition key range id is read from the first element of {@code addresses} only, so the
     * list must be non-empty. When {@code tcpConnectionEndpointRediscoveryEnabled} is set, every
     * converted address additionally registers the identity under its server key in
     * {@code serverPartitionAddressToPkRangeIdMap}.
     *
     * @param collectionRid resource id of the collection, used to build the identity
     * @param addresses     addresses to convert; the first element supplies the partition key range id
     * @return pair of the identity and the converted addresses, in the order of {@code addresses}
     */
    private Pair<PartitionKeyRangeIdentity, AddressInformation[]> toPartitionAddressAndRange(String collectionRid, List<Address> addresses) {
        if (logger.isDebugEnabled()) {
            logger.debug("toPartitionAddressAndRange");
        }

        final PartitionKeyRangeIdentity rangeIdentity =
            new PartitionKeyRangeIdentity(collectionRid, addresses.get(0).getParitionKeyRangeId());
        final AddressInformation[] replicaAddresses = addresses.stream()
            .map(GatewayAddressCache::toAddressInformation)
            .toArray(AddressInformation[]::new);

        if (!this.tcpConnectionEndpointRediscoveryEnabled) {
            return Pair.of(rangeIdentity, replicaAddresses);
        }

        for (AddressInformation replicaAddress : replicaAddresses) {
            if (logger.isDebugEnabled()) {
                logger.debug("Added address to serverPartitionAddressToPkRangeIdMap: ({\"partitionKeyRangeIdentity\":{},\"address\":{}})", rangeIdentity, replicaAddress);
            }
            // compute() keeps "create the set if missing, then add" as a single remapping step per key.
            this.serverPartitionAddressToPkRangeIdMap.compute(replicaAddress.getServerKey(), (serverKey, knownRanges) -> {
                if (knownRanges == null) {
                    knownRanges = ConcurrentHashMap.newKeySet();
                }
                knownRanges.add(rangeIdentity);
                return knownRanges;
            });
        }
        return Pair.of(rangeIdentity, replicaAddresses);
    }

    /**
     * Converts a gateway {@link Address} into an {@link AddressInformation}.
     *
     * <p>The first constructor argument is always passed as {@code true}; the remaining arguments are
     * copied from the address (primary flag, physical URI, protocol scheme).
     *
     * @param address the address to convert
     * @return a new {@link AddressInformation} built from {@code address}
     */
    private static AddressInformation toAddressInformation(Address address) {
        return new AddressInformation(
            true,
            address.isPrimary(),
            address.getPhyicalUri(),
            address.getProtocolScheme());
    }

    /**
     * Resolves addresses for the given partition key ranges, stores them in the address cache and
     * opens connections to the resolved replicas.
     *
     * <p>Partition key range ids are sent to {@code getServerAddressesViaGatewayWithRetry} in batches
     * of 50, and the batches are concatenated in order. From each batch result, only addresses whose
     * protocol scheme equals {@code this.protocolScheme} are kept. They are grouped by partition key
     * range id, and each group is written to {@code serverPartitionAddressCache}. When
     * {@code openConnectionsHandler} is {@code null}, nothing is opened for that group and an empty
     * flux is returned for it.
     *
     * @param collection                  the collection whose resource id identifies the addresses to resolve
     * @param partitionKeyRangeIdentities the partition key ranges to resolve; an empty list yields an empty flux
     * @return the responses emitted by {@code openConnectionsHandler.openConnections} for every group
     */
    public Flux<OpenConnectionResponse> openConnectionsAndInitCaches(DocumentCollection collection, List<PartitionKeyRangeIdentity> partitionKeyRangeIdentities) {
        Preconditions.checkNotNull(collection, "Argument 'collection' should not be null");
        Preconditions.checkNotNull(partitionKeyRangeIdentities, "Argument 'partitionKeyRangeIdentities' should not be null");
        if (logger.isDebugEnabled()) {
            logger.debug("openConnectionsAndInitCaches collection: {}, partitionKeyRangeIdentities: {}", collection.getResourceId(), JavaStreamUtils.toString(partitionKeyRangeIdentities, ","));
        }

        final int batchSize = 50;
        // Fully parameterized so the element types flow through concat/flatMap without casts.
        List<Flux<List<Address>>> tasks = new ArrayList<>();
        RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this.clientContext, OperationType.Read, collection.getResourceId(), ResourceType.DocumentCollection, Collections.emptyMap());

        for (int i = 0; i < partitionKeyRangeIdentities.size(); i += batchSize) {
            int endIndex = Math.min(i + batchSize, partitionKeyRangeIdentities.size());
            List<String> batchRangeIds = partitionKeyRangeIdentities.subList(i, endIndex)
                .stream()
                .map(PartitionKeyRangeIdentity::getPartitionKeyRangeId)
                .collect(Collectors.toList());
            tasks.add(this.getServerAddressesViaGatewayWithRetry(request, collection.getResourceId(), batchRangeIds, false).flux());
        }

        return Flux.concat(tasks).flatMap(list -> {
            List<Pair<PartitionKeyRangeIdentity, AddressInformation[]>> addressInfos = list.stream()
                .filter(addressInfo -> this.protocolScheme.equals(addressInfo.getProtocolScheme()))
                .collect(Collectors.groupingBy(Address::getParitionKeyRangeId))
                .values()
                .stream()
                .map(addresses -> this.toPartitionAddressAndRange(collection.getResourceId(), addresses))
                .collect(Collectors.toList());

            return Flux.fromIterable(addressInfos).flatMap(addressInfo -> {
                // The cache is populated even when no handler is available to open connections.
                this.serverPartitionAddressCache.set(addressInfo.getLeft(), addressInfo.getRight());
                if (this.openConnectionsHandler != null) {
                    return this.openConnectionsHandler.openConnections(
                        Arrays.stream(addressInfo.getRight())
                            .map(addressInformation -> addressInformation.getPhysicalUri())
                            .collect(Collectors.toList()));
                }
                logger.info("OpenConnectionHandler is null, can not open connections");
                return Flux.empty();
            });
        });
    }

    /**
     * Runs {@code getServerAddressesViaGatewayAsync} under a retry policy.
     *
     * <p>A new {@code OpenConnectionAndInitCachesRetryPolicy}, configured from the connection policy's
     * throttling retry options, is created for every call.
     *
     * @param request              the request passed through to the address lookup
     * @param collectionRid        resource id of the collection
     * @param partitionKeyRangeIds ids of the partition key ranges to look up
     * @param forceRefresh         passed through to the address lookup
     * @return the addresses produced by the lookup, as governed by the retry policy
     */
    private Mono<List<Address>> getServerAddressesViaGatewayWithRetry(RxDocumentServiceRequest request, String collectionRid, List<String> partitionKeyRangeIds, boolean forceRefresh) {
        OpenConnectionAndInitCachesRetryPolicy retryPolicy =
            new OpenConnectionAndInitCachesRetryPolicy(this.connectionPolicy.getThrottlingRetryOptions());

        return BackoffRetryUtility.executeRetry(
            () -> this.getServerAddressesViaGatewayAsync(request, collectionRid, partitionKeyRangeIds, forceRefresh),
            retryPolicy);
    }

    /**
     * Tells whether the given address array holds fewer than four entries.
     *
     * @param addressInformations the addresses to count; must not be {@code null}
     * @return {@code true} when the array length is below four
     */
    private boolean notAllReplicasAvailable(AddressInformation[] addressInformations) {
        final int replicaCountThreshold = 4;
        return addressInformations.length < replicaCountThreshold;
    }

    /**
     * Records the start of an address resolution on the request's diagnostics, if there are any.
     *
     * @param request                          the request whose {@code requestContext.cosmosDiagnostics} is used
     * @param targetEndpointUrl                endpoint passed to the diagnostics recorder
     * @param forceRefresh                     passed to the diagnostics recorder
     * @param forceCollectionRoutingMapRefresh passed to the diagnostics recorder
     * @return the value returned by {@code BridgeInternal.recordAddressResolutionStart}, or
     *         {@code null} when the request carries no diagnostics
     */
    private static String logAddressResolutionStart(RxDocumentServiceRequest request, URI targetEndpointUrl, boolean forceRefresh, boolean forceCollectionRoutingMapRefresh) {
        if (request.requestContext.cosmosDiagnostics == null) {
            return null;
        }
        return BridgeInternal.recordAddressResolutionStart(
            request.requestContext.cosmosDiagnostics,
            targetEndpointUrl,
            forceRefresh,
            forceCollectionRoutingMapRefresh);
    }

    /**
     * Records the end of an address resolution on the request's diagnostics, if there are any.
     *
     * @param request      the request whose {@code requestContext.cosmosDiagnostics} is used
     * @param identifier   identifier passed to the diagnostics recorder
     * @param errorMessage error text passed to the diagnostics recorder; callers pass {@code null} on success
     */
    private static void logAddressResolutionEnd(RxDocumentServiceRequest request, String identifier, String errorMessage) {
        if (request.requestContext.cosmosDiagnostics == null) {
            return;
        }
        BridgeInternal.recordAddressResolutionEnd(
            request.requestContext.cosmosDiagnostics,
            identifier,
            errorMessage);
    }

    /**
     * Remembers when refreshes were last signalled, so callers can decide whether a partition address
     * refresh should also include a collection routing map refresh.
     *
     * <p>NOTE(review): {@code lastCollectionRoutingMapRefresh} is a plain (non-volatile) field; confirm
     * whether instances are shared across threads.
     */
    private static class ForcedRefreshMetadata {
        /** Time of the last signalled partition address refresh, per partition key range identity. */
        private final ConcurrentHashMap<PartitionKeyRangeIdentity, Instant> lastPartitionAddressOnlyRefresh = new ConcurrentHashMap<>();
        /** Time of the last signalled collection routing map refresh; initialised to the creation time. */
        private Instant lastCollectionRoutingMapRefresh = Instant.now();

        /**
         * Records a collection routing map refresh.
         *
         * @param pk                           the partition key range the refresh was issued for
         * @param forcePartitionAddressRefresh when {@code true}, the same timestamp is also recorded as
         *                                     the partition address refresh time for {@code pk}
         */
        public void signalCollectionRoutingMapRefresh(PartitionKeyRangeIdentity pk, boolean forcePartitionAddressRefresh) {
            // One timestamp for both records, so neither is "after" the other.
            final Instant refreshedAt = Instant.now();
            if (forcePartitionAddressRefresh) {
                this.lastPartitionAddressOnlyRefresh.put(pk, refreshedAt);
            }
            this.lastCollectionRoutingMapRefresh = refreshedAt;
        }

        /**
         * Records a partition address refresh for {@code pk} at the current time.
         *
         * @param pk the partition key range whose addresses were refreshed
         */
        public void signalPartitionAddressOnlyRefresh(PartitionKeyRangeIdentity pk) {
            final Instant refreshedAt = Instant.now();
            this.lastPartitionAddressOnlyRefresh.put(pk, refreshedAt);
        }

        /**
         * Decides whether a collection routing map refresh should be included for {@code pk}.
         *
         * @param pk the partition key range being refreshed
         * @return {@code true} only when a partition address refresh for {@code pk} was recorded strictly
         *         after the last collection routing map refresh, and at least
         *         {@code minDurationBeforeEnforcingCollectionRoutingMapRefresh} has passed since that
         *         routing map refresh
         */
        public boolean shouldIncludeCollectionRoutingMapRefresh(PartitionKeyRangeIdentity pk) {
            final Instant partitionRefreshedAt = this.lastPartitionAddressOnlyRefresh.get(pk);
            final Instant routingMapRefreshedAt = this.lastCollectionRoutingMapRefresh;

            final boolean partitionRefreshedSinceRoutingMap =
                partitionRefreshedAt != null && partitionRefreshedAt.isAfter(routingMapRefreshedAt);
            if (!partitionRefreshedSinceRoutingMap) {
                return false;
            }

            final Duration sinceRoutingMapRefresh = Duration.between(routingMapRefreshedAt, Instant.now());
            return sinceRoutingMapRefresh.compareTo(minDurationBeforeEnforcingCollectionRoutingMapRefresh) >= 0;
        }
    }
}

