/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.directconnectivity;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.AuthorizationTokenType;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.DiagnosticsClientContext;
import com.azure.cosmos.implementation.DocumentCollection;
import com.azure.cosmos.implementation.Exceptions;
import com.azure.cosmos.implementation.IAuthorizationTokenProvider;
import com.azure.cosmos.implementation.JavaStreamUtils;
import com.azure.cosmos.implementation.MetadataDiagnosticsContext;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.PartitionKeyRangeGoneException;
import com.azure.cosmos.implementation.PathsHelper;
import com.azure.cosmos.implementation.RequestVerb;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.RxDocumentServiceResponse;
import com.azure.cosmos.implementation.UserAgentContainer;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
import com.azure.cosmos.implementation.caches.AsyncCache;
import com.azure.cosmos.implementation.directconnectivity.Address;
import com.azure.cosmos.implementation.directconnectivity.AddressInformation;
import com.azure.cosmos.implementation.directconnectivity.HttpClientUtils;
import com.azure.cosmos.implementation.directconnectivity.HttpUtils;
import com.azure.cosmos.implementation.directconnectivity.IAddressCache;
import com.azure.cosmos.implementation.directconnectivity.Protocol;
import com.azure.cosmos.implementation.directconnectivity.ServiceConfig;
import com.azure.cosmos.implementation.directconnectivity.WebExceptionUtility;
import com.azure.cosmos.implementation.http.HttpClient;
import com.azure.cosmos.implementation.http.HttpHeaders;
import com.azure.cosmos.implementation.http.HttpRequest;
import com.azure.cosmos.implementation.http.HttpResponse;
import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity;
import io.netty.handler.codec.http.HttpMethod;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * {@link IAddressCache} implementation that resolves replica addresses by issuing HTTP GET
 * requests against the {@code addresses} endpoint derived from the service endpoint.
 *
 * <p>Two kinds of entries are cached:
 * <ul>
 *   <li>the master partition (partition key range id {@code "M"}), kept in a single volatile
 *       field, and</li>
 *   <li>server partitions, kept in an {@link AsyncCache} keyed by
 *       {@link PartitionKeyRangeIdentity}.</li>
 * </ul>
 *
 * <p>When a lookup returns fewer than four addresses the entry is considered suboptimal and the
 * time of that observation is remembered; once the configured interval has elapsed the next lookup
 * forces a refresh.
 */
public class GatewayAddressCache implements IAddressCache {
    private static final Logger logger = LoggerFactory.getLogger(GatewayAddressCache.class);
    private static final String protocolFilterFormat = "%s eq %s";
    private static final int DefaultBatchSize = 50;
    private static final int DefaultSuboptimalPartitionForceRefreshIntervalInSeconds = 600;
    /** Partition key range id that routes a lookup to the master partition cache. */
    private static final String MASTER_PARTITION_KEY_RANGE_ID = "M";
    private final DiagnosticsClientContext clientContext;
    private final ServiceConfig serviceConfig = ServiceConfig.getInstance();
    private final String databaseFeedEntryUrl = PathsHelper.generatePath(ResourceType.Database, "", true);
    private final URI serviceEndpoint;
    private final URI addressEndpoint;
    private final AsyncCache<PartitionKeyRangeIdentity, AddressInformation[]> serverPartitionAddressCache;
    private final ConcurrentHashMap<PartitionKeyRangeIdentity, Instant> suboptimalServerPartitionTimestamps;
    private final long suboptimalPartitionForceRefreshIntervalInSeconds;
    private final String protocolScheme;
    private final String protocolFilter;
    private final IAuthorizationTokenProvider tokenProvider;
    private final HashMap<String, String> defaultRequestHeaders;
    private final HttpClient httpClient;
    private volatile Pair<PartitionKeyRangeIdentity, AddressInformation[]> masterPartitionAddressCache;
    private volatile Instant suboptimalMasterPartitionTimestamp;

    /**
     * Creates a cache bound to the given service endpoint.
     *
     * @param clientContext diagnostics context handed to response parsing and request creation
     * @param serviceEndpoint service endpoint; the address endpoint is resolved relative to it
     * @param protocol protocol whose scheme is used to filter the returned addresses
     * @param tokenProvider source of authorization tokens / headers for address requests
     * @param userAgent user agent sent with every request; a default container is used when {@code null}
     * @param httpClient HTTP client used to send address requests
     * @param suboptimalPartitionForceRefreshIntervalInSeconds number of seconds a suboptimal
     *        (incomplete) replica set may be served before a refresh is forced
     * @throws IllegalStateException if an address endpoint cannot be derived from {@code serviceEndpoint}
     */
    public GatewayAddressCache(DiagnosticsClientContext clientContext, URI serviceEndpoint, Protocol protocol, IAuthorizationTokenProvider tokenProvider, UserAgentContainer userAgent, HttpClient httpClient, long suboptimalPartitionForceRefreshIntervalInSeconds) {
        this.clientContext = clientContext;
        try {
            this.addressEndpoint = new URL(serviceEndpoint.toURL(), "addresses").toURI();
        } catch (MalformedURLException | URISyntaxException e) {
            logger.error("serviceEndpoint {} is invalid", serviceEndpoint, e);
            assert false;
            throw new IllegalStateException(e);
        }
        this.tokenProvider = tokenProvider;
        this.serviceEndpoint = serviceEndpoint;
        this.serverPartitionAddressCache = new AsyncCache<>();
        this.suboptimalServerPartitionTimestamps = new ConcurrentHashMap<>();
        // Instant.MAX is the sentinel for "no suboptimal observation recorded".
        this.suboptimalMasterPartitionTimestamp = Instant.MAX;
        this.suboptimalPartitionForceRefreshIntervalInSeconds = suboptimalPartitionForceRefreshIntervalInSeconds;
        this.protocolScheme = protocol.scheme();
        this.protocolFilter = String.format(protocolFilterFormat, "protocol", this.protocolScheme);
        this.httpClient = httpClient;
        UserAgentContainer effectiveUserAgent = userAgent != null ? userAgent : new UserAgentContainer();
        this.defaultRequestHeaders = new HashMap<>();
        this.defaultRequestHeaders.put("User-Agent", effectiveUserAgent.getUserAgent());
        this.defaultRequestHeaders.put("x-ms-version", "2018-12-31");
    }

    /**
     * Creates a cache that uses the default suboptimal-partition refresh interval
     * ({@code DefaultSuboptimalPartitionForceRefreshIntervalInSeconds}).
     */
    public GatewayAddressCache(DiagnosticsClientContext clientContext, URI serviceEndpoint, Protocol protocol, IAuthorizationTokenProvider tokenProvider, UserAgentContainer userAgent, HttpClient httpClient) {
        this(clientContext, serviceEndpoint, protocol, tokenProvider, userAgent, httpClient, DefaultSuboptimalPartitionForceRefreshIntervalInSeconds);
    }

    /**
     * Drops the cached addresses for the given partition key range.
     *
     * @param partitionKeyRangeIdentity identity of the entry to evict; must not be {@code null}
     * @throws NullPointerException if {@code partitionKeyRangeIdentity} is {@code null}
     */
    @Override
    public void removeAddress(PartitionKeyRangeIdentity partitionKeyRangeIdentity) {
        Objects.requireNonNull(partitionKeyRangeIdentity, "expected non-null partitionKeyRangeIdentity");
        // Null-safe comparison, consistent with tryGetAddresses.
        if (StringUtils.equals(partitionKeyRangeIdentity.getPartitionKeyRangeId(), MASTER_PARTITION_KEY_RANGE_ID)) {
            this.masterPartitionAddressCache = null;
        } else {
            this.serverPartitionAddressCache.remove(partitionKeyRangeIdentity);
        }
    }

    /**
     * Returns the addresses for a partition key range, serving them from the cache when possible.
     *
     * <p>A refresh is forced when the caller asks for it, or when the entry was previously seen
     * with an incomplete replica set and the configured interval has since elapsed.
     *
     * @param request request on whose behalf the lookup is performed
     * @param partitionKeyRangeIdentity identity of the partition key range to resolve
     * @param forceRefreshPartitionAddresses whether the cached entry must be refreshed first
     * @return a holder with the addresses; the holder contains {@code null} when the lookup failed
     *         with status 404, status 410 or sub-status 1002. Any other failure is propagated.
     */
    @Override
    public Mono<Utils.ValueHolder<AddressInformation[]>> tryGetAddresses(RxDocumentServiceRequest request, PartitionKeyRangeIdentity partitionKeyRangeIdentity, boolean forceRefreshPartitionAddresses) {
        Utils.checkNotNullOrThrow(request, "request", "");
        Utils.checkNotNullOrThrow(partitionKeyRangeIdentity, "partitionKeyRangeIdentity", "");
        logger.debug("PartitionKeyRangeIdentity {}, forceRefreshPartitionAddresses {}", partitionKeyRangeIdentity, forceRefreshPartitionAddresses);

        if (StringUtils.equals(partitionKeyRangeIdentity.getPartitionKeyRangeId(), MASTER_PARTITION_KEY_RANGE_ID)) {
            return this.resolveMasterAsync(request, forceRefreshPartitionAddresses, request.properties)
                .map(masterAddressAndRange -> new Utils.ValueHolder<AddressInformation[]>(masterAddressAndRange.getRight()));
        }

        boolean refreshRequested = forceRefreshPartitionAddresses;
        Instant suboptimalServerPartitionTimestamp = this.suboptimalServerPartitionTimestamps.get(partitionKeyRangeIdentity);
        if (suboptimalServerPartitionTimestamp != null) {
            logger.debug("suboptimalServerPartitionTimestamp is {}", suboptimalServerPartitionTimestamp);
            boolean forceRefreshDueToSuboptimalPartitionReplicaSet =
                Duration.between(suboptimalServerPartitionTimestamp, Instant.now()).getSeconds() > this.suboptimalPartitionForceRefreshIntervalInSeconds;
            if (forceRefreshDueToSuboptimalPartitionReplicaSet) {
                // Swap the observed timestamp for Instant.MAX only if nobody changed it in the meantime.
                Instant newValue = this.suboptimalServerPartitionTimestamps.computeIfPresent(partitionKeyRangeIdentity, (key, oldVal) -> {
                    logger.debug("key = {}, oldValue = {}", key, oldVal);
                    if (suboptimalServerPartitionTimestamp.equals(oldVal)) {
                        return Instant.MAX;
                    }
                    return oldVal;
                });
                logger.debug("newValue is {}", newValue);
                if (!suboptimalServerPartitionTimestamp.equals(newValue)) {
                    logger.debug("setting forceRefreshPartitionAddresses to true");
                    refreshRequested = true;
                }
            }
        }

        // Final copy so the error handler below can capture it.
        final boolean forceRefreshPartitionAddressesModified = refreshRequested;
        if (forceRefreshPartitionAddressesModified) {
            logger.debug("refresh serverPartitionAddressCache for {}", partitionKeyRangeIdentity);
            this.serverPartitionAddressCache.refresh(partitionKeyRangeIdentity, () -> this.getAddressesForRangeId(request, partitionKeyRangeIdentity.getCollectionRid(), partitionKeyRangeIdentity.getPartitionKeyRangeId(), true));
            this.suboptimalServerPartitionTimestamps.remove(partitionKeyRangeIdentity);
        }

        Mono<Utils.ValueHolder<AddressInformation[]>> addressesObs = this.serverPartitionAddressCache
            .getAsync(partitionKeyRangeIdentity, null, () -> this.getAddressesForRangeId(request, partitionKeyRangeIdentity.getCollectionRid(), partitionKeyRangeIdentity.getPartitionKeyRangeId(), false))
            .map(Utils.ValueHolder::new);

        return addressesObs.map(addressesValueHolder -> {
            if (this.notAllReplicasAvailable(addressesValueHolder.v)) {
                if (logger.isDebugEnabled()) {
                    logger.debug("not all replicas available {}", JavaStreamUtils.info(addressesValueHolder.v));
                }
                // Remember when the incomplete replica set was first observed.
                this.suboptimalServerPartitionTimestamps.putIfAbsent(partitionKeyRangeIdentity, Instant.now());
            }
            return addressesValueHolder;
        }).onErrorResume(ex -> {
            Throwable unwrappedException = reactor.core.Exceptions.unwrap(ex);
            CosmosException dce = Utils.as(unwrappedException, CosmosException.class);
            if (dce == null) {
                logger.error("unexpected failure", ex);
                if (forceRefreshPartitionAddressesModified) {
                    this.suboptimalServerPartitionTimestamps.remove(partitionKeyRangeIdentity);
                }
                return Mono.error(unwrappedException);
            }
            logger.debug("tryGetAddresses dce", dce);
            if (Exceptions.isStatusCode(dce, 404) || Exceptions.isStatusCode(dce, 410) || Exceptions.isSubStatusCode(dce, 1002)) {
                this.suboptimalServerPartitionTimestamps.remove(partitionKeyRangeIdentity);
                logger.debug("tryGetAddresses: inner onErrorResumeNext return null", dce);
                return Mono.just(new Utils.ValueHolder<AddressInformation[]>(null));
            }
            return Mono.error(unwrappedException);
        });
    }

    /**
     * Requests the addresses of the given partition key ranges of a collection from the address endpoint.
     *
     * @param request originating request; it is flagged as an address refresh and receives diagnostics
     * @param collectionRid resource id of the collection
     * @param partitionKeyRangeIds ids of the partition key ranges to resolve
     * @param forceRefresh whether to send the {@code x-ms-force-refresh} header
     * @return the addresses returned by the endpoint; failures are emitted as {@link CosmosException}
     *         unless the underlying throwable is not an {@link Exception}
     */
    public Mono<List<Address>> getServerAddressesViaGatewayAsync(RxDocumentServiceRequest request, String collectionRid, List<String> partitionKeyRangeIds, boolean forceRefresh) {
        if (logger.isDebugEnabled()) {
            logger.debug("getServerAddressesViaGatewayAsync collectionRid {}, partitionKeyRangeIds {}", collectionRid, JavaStreamUtils.toString(partitionKeyRangeIds, ","));
        }
        request.setAddressRefresh(true);
        String entryUrl = PathsHelper.generatePath(ResourceType.Document, collectionRid, true);
        HashMap<String, String> addressQuery = new HashMap<>();
        addressQuery.put("$resolveFor", HttpUtils.urlEncode(entryUrl));
        HashMap<String, String> headers = new HashMap<>(this.defaultRequestHeaders);
        if (forceRefresh) {
            headers.put("x-ms-force-refresh", "true");
        }
        if (request.forceCollectionRoutingMapRefresh) {
            headers.put("x-ms-collectionroutingmap-refresh", "true");
        }
        addressQuery.put("$filter", HttpUtils.urlEncode(this.protocolFilter));
        addressQuery.put("$partitionKeyRangeIds", String.join(",", partitionKeyRangeIds));
        headers.put("x-ms-date", Utils.nowAsRFC1123());
        if (this.tokenProvider.getAuthorizationTokenType() != AuthorizationTokenType.AadToken) {
            String token = this.tokenProvider.getUserAuthorizationToken(collectionRid, ResourceType.Document, RequestVerb.GET, headers, AuthorizationTokenType.PrimaryMasterKey, request.properties);
            if (token == null && request.getIsNameBased()) {
                // Retry the token lookup with the collection path derived from the request's resource address.
                String collectionAltLink = PathsHelper.getCollectionPath(request.getResourceAddress());
                token = this.tokenProvider.getUserAuthorizationToken(collectionAltLink, ResourceType.Document, RequestVerb.GET, headers, AuthorizationTokenType.PrimaryMasterKey, request.properties);
            }
            // NOTE(review): token may still be null here; behaviour then depends on HttpUtils.urlEncode - confirm.
            token = HttpUtils.urlEncode(token);
            headers.put("authorization", token);
        }
        URI targetEndpoint = Utils.setQuery(this.addressEndpoint.toString(), Utils.createQuery(addressQuery));
        String identifier = GatewayAddressCache.logAddressResolutionStart(request, targetEndpoint);
        HttpHeaders httpHeaders = new HttpHeaders(headers);
        Instant addressCallStartTime = Instant.now();
        HttpRequest httpRequest = new HttpRequest(HttpMethod.GET, targetEndpoint, targetEndpoint.getPort(), httpHeaders);
        Mono<HttpResponse> httpResponseMono = this.sendAddressRequest(httpRequest, httpHeaders);
        Mono<RxDocumentServiceResponse> dsrObs = HttpClientUtils.parseResponseAsync(this.clientContext, httpResponseMono, httpRequest);
        return dsrObs.map(dsr -> {
            GatewayAddressCache.recordAddressLookupDiagnostics(request, addressCallStartTime, MetadataDiagnosticsContext.MetadataType.SERVER_ADDRESS_LOOKUP);
            if (logger.isDebugEnabled()) {
                logger.debug("getServerAddressesViaGatewayAsync deserializes result");
            }
            GatewayAddressCache.logAddressResolutionEnd(request, identifier, null);
            return dsr.getQueryResponse(Address.class);
        }).onErrorResume(throwable -> this.handleAddressResolutionFailure(request, identifier, throwable));
    }

    /** Intentionally empty: nothing is released here. */
    public void dispose() {
    }

    /**
     * Returns the master partition addresses, refreshing them when forced, when nothing is cached,
     * or when the cached replica set is incomplete and the refresh interval has elapsed.
     *
     * <p>All decisions are made against a single snapshot of the volatile cache field so that a
     * concurrent {@link #removeAddress} or refresh cannot cause a {@code null} dereference or a
     * {@code null} emission.
     */
    private Mono<Pair<PartitionKeyRangeIdentity, AddressInformation[]>> resolveMasterAsync(RxDocumentServiceRequest request, boolean forceRefresh, Map<String, Object> properties) {
        logger.debug("resolveMasterAsync forceRefresh: {}", forceRefresh);
        Pair<PartitionKeyRangeIdentity, AddressInformation[]> masterAddressAndRangeInitial = this.masterPartitionAddressCache;

        boolean suboptimalRefreshDue = masterAddressAndRangeInitial != null
            && this.notAllReplicasAvailable(masterAddressAndRangeInitial.getRight())
            && Duration.between(this.suboptimalMasterPartitionTimestamp, Instant.now()).getSeconds() > this.suboptimalPartitionForceRefreshIntervalInSeconds;
        boolean refresh = forceRefresh || suboptimalRefreshDue;

        // Test the snapshot, not the field: re-reading the volatile field could disagree with the snapshot.
        if (refresh || masterAddressAndRangeInitial == null) {
            Mono<List<Address>> masterReplicaAddressesObs = this.getMasterAddressesViaGatewayAsync(request, ResourceType.Database, null, this.databaseFeedEntryUrl, refresh, false, properties);
            return masterReplicaAddressesObs.map(masterAddresses -> {
                Pair<PartitionKeyRangeIdentity, AddressInformation[]> masterAddressAndRangeRes = this.toPartitionAddressAndRange("", masterAddresses);
                this.masterPartitionAddressCache = masterAddressAndRangeRes;
                if (this.notAllReplicasAvailable(masterAddressAndRangeRes.getRight()) && this.suboptimalMasterPartitionTimestamp.equals(Instant.MAX)) {
                    this.suboptimalMasterPartitionTimestamp = Instant.now();
                } else {
                    this.suboptimalMasterPartitionTimestamp = Instant.MAX;
                }
                // Return the freshly built value; the field may already have been cleared by another thread.
                return masterAddressAndRangeRes;
            }).doOnError(e -> this.suboptimalMasterPartitionTimestamp = Instant.MAX);
        }

        if (this.notAllReplicasAvailable(masterAddressAndRangeInitial.getRight()) && this.suboptimalMasterPartitionTimestamp.equals(Instant.MAX)) {
            this.suboptimalMasterPartitionTimestamp = Instant.now();
        }
        return Mono.just(masterAddressAndRangeInitial);
    }

    /**
     * Fetches the addresses of a single partition key range.
     *
     * @return the addresses of the requested range, or an error signal carrying a
     *         {@link PartitionKeyRangeGoneException} when the response contains no address for it
     */
    private Mono<AddressInformation[]> getAddressesForRangeId(RxDocumentServiceRequest request, String collectionRid, String partitionKeyRangeId, boolean forceRefresh) {
        logger.debug("getAddressesForRangeId collectionRid {}, partitionKeyRangeId {}, forceRefresh {}", collectionRid, partitionKeyRangeId, forceRefresh);
        Mono<List<Address>> addressResponse = this.getServerAddressesViaGatewayAsync(request, collectionRid, Collections.singletonList(partitionKeyRangeId), forceRefresh);
        Mono<List<Pair<PartitionKeyRangeIdentity, AddressInformation[]>>> addressInfos = addressResponse.map(addresses -> {
            if (logger.isDebugEnabled()) {
                logger.debug("addresses from getServerAddressesViaGatewayAsync in getAddressesForRangeId {}", JavaStreamUtils.info(addresses));
            }
            // Each pair is built from its own protocol-filtered group, exactly as openAsync does.
            return this.toPartitionAddressAndRangeList(collectionRid, addresses);
        });
        Mono<List<Pair<PartitionKeyRangeIdentity, AddressInformation[]>>> result = addressInfos.map(addressInfo -> addressInfo.stream()
            .filter(a -> StringUtils.equals(a.getLeft().getPartitionKeyRangeId(), partitionKeyRangeId))
            .collect(Collectors.toList()));
        return result.flatMap(list -> {
            if (logger.isDebugEnabled()) {
                logger.debug("getAddressesForRangeId flatMap got result {}", JavaStreamUtils.info(list));
            }
            if (list.isEmpty()) {
                String errorMessage = String.format("PartitionKeyRange with id %s in collection %s doesn't exist", partitionKeyRangeId, collectionRid);
                PartitionKeyRangeGoneException e = new PartitionKeyRangeGoneException(errorMessage);
                BridgeInternal.setResourceAddress(e, collectionRid);
                return Mono.error(e);
            }
            return Mono.just(list.get(0).getRight());
        }).doOnError(e -> logger.debug("getAddressesForRangeId", e));
    }

    /**
     * Requests master partition addresses from the address endpoint.
     *
     * @param request originating request; it is flagged as an address refresh and receives diagnostics
     * @param resourceType resource type used when obtaining the authorization token
     * @param resourceAddress resource address used when obtaining the authorization token
     * @param entryUrl value sent (URL-encoded) as the {@code $resolveFor} query parameter
     * @param forceRefresh whether to send the {@code x-ms-force-refresh} header
     * @param useMasterCollectionResolver whether to send the {@code x-ms-use-master-collection-resolver} header
     * @param properties properties forwarded to the token provider
     * @return the addresses returned by the endpoint; failures are emitted as {@link CosmosException}
     *         unless the underlying throwable is not an {@link Exception}
     */
    public Mono<List<Address>> getMasterAddressesViaGatewayAsync(RxDocumentServiceRequest request, ResourceType resourceType, String resourceAddress, String entryUrl, boolean forceRefresh, boolean useMasterCollectionResolver, Map<String, Object> properties) {
        logger.debug("getMasterAddressesViaGatewayAsync resourceType {}, resourceAddress {}, entryUrl {}, forceRefresh {}, useMasterCollectionResolver {}", resourceType, resourceAddress, entryUrl, forceRefresh, useMasterCollectionResolver);
        request.setAddressRefresh(true);
        HashMap<String, String> queryParameters = new HashMap<>();
        queryParameters.put("$resolveFor", HttpUtils.urlEncode(entryUrl));
        HashMap<String, String> headers = new HashMap<>(this.defaultRequestHeaders);
        if (forceRefresh) {
            headers.put("x-ms-force-refresh", "true");
        }
        if (useMasterCollectionResolver) {
            headers.put("x-ms-use-master-collection-resolver", "true");
        }
        if (request.forceCollectionRoutingMapRefresh) {
            headers.put("x-ms-collectionroutingmap-refresh", "true");
        }
        queryParameters.put("$filter", HttpUtils.urlEncode(this.protocolFilter));
        headers.put("x-ms-date", Utils.nowAsRFC1123());
        if (this.tokenProvider.getAuthorizationTokenType() != AuthorizationTokenType.AadToken) {
            String token = this.tokenProvider.getUserAuthorizationToken(resourceAddress, resourceType, RequestVerb.GET, headers, AuthorizationTokenType.PrimaryMasterKey, properties);
            headers.put("authorization", HttpUtils.urlEncode(token));
        }
        URI targetEndpoint = Utils.setQuery(this.addressEndpoint.toString(), Utils.createQuery(queryParameters));
        String identifier = GatewayAddressCache.logAddressResolutionStart(request, targetEndpoint);
        HttpHeaders defaultHttpHeaders = new HttpHeaders(headers);
        HttpRequest httpRequest = new HttpRequest(HttpMethod.GET, targetEndpoint, targetEndpoint.getPort(), defaultHttpHeaders);
        Instant addressCallStartTime = Instant.now();
        Mono<HttpResponse> httpResponseMono = this.sendAddressRequest(httpRequest, defaultHttpHeaders);
        Mono<RxDocumentServiceResponse> dsrObs = HttpClientUtils.parseResponseAsync(this.clientContext, httpResponseMono, httpRequest);
        return dsrObs.map(dsr -> {
            GatewayAddressCache.recordAddressLookupDiagnostics(request, addressCallStartTime, MetadataDiagnosticsContext.MetadataType.MASTER_ADDRESS_LOOK_UP);
            GatewayAddressCache.logAddressResolutionEnd(request, identifier, null);
            return dsr.getQueryResponse(Address.class);
        }).onErrorResume(throwable -> this.handleAddressResolutionFailure(request, identifier, throwable));
    }

    /**
     * Sends an address request with the configured address-refresh timeout. For AAD tokens the
     * authorization header is populated asynchronously before the request is sent.
     */
    private Mono<HttpResponse> sendAddressRequest(HttpRequest httpRequest, HttpHeaders httpHeaders) {
        if (this.tokenProvider.getAuthorizationTokenType() != AuthorizationTokenType.AadToken) {
            return this.httpClient.send(httpRequest, Duration.ofSeconds(Configs.getAddressRefreshResponseTimeoutInSeconds()));
        }
        return this.tokenProvider.populateAuthorizationHeader(httpHeaders)
            .flatMap(valueHttpHeaders -> this.httpClient.send(httpRequest, Duration.ofSeconds(Configs.getAddressRefreshResponseTimeoutInSeconds())));
    }

    /** Adds an address-lookup entry to the request's metadata diagnostics, when such a context exists. */
    private static void recordAddressLookupDiagnostics(RxDocumentServiceRequest request, Instant addressCallStartTime, MetadataDiagnosticsContext.MetadataType metadataType) {
        MetadataDiagnosticsContext metadataDiagnosticsContext = BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics);
        if (metadataDiagnosticsContext != null) {
            Instant addressCallEndTime = Instant.now();
            MetadataDiagnosticsContext.MetadataDiagnostics metaDataDiagnostic = new MetadataDiagnosticsContext.MetadataDiagnostics(addressCallStartTime, addressCallEndTime, metadataType);
            metadataDiagnosticsContext.addMetaDataDiagnostic(metaDataDiagnostic);
        }
    }

    /**
     * Shared failure path of the address requests: records the end of the address resolution,
     * converts non-Cosmos exceptions into a {@link CosmosException} with status code 0, sets the
     * sub-status code on network failures (10002 for read timeouts, 10001 otherwise), attaches
     * diagnostics and re-emits the error. Throwables that are not {@link Exception}s are re-emitted as is.
     */
    private Mono<List<Address>> handleAddressResolutionFailure(RxDocumentServiceRequest request, String identifier, Throwable throwable) {
        Throwable unwrappedException = reactor.core.Exceptions.unwrap(throwable);
        GatewayAddressCache.logAddressResolutionEnd(request, identifier, unwrappedException.toString());
        if (!(unwrappedException instanceof Exception)) {
            logger.error("Unexpected failure {}", unwrappedException.getMessage(), unwrappedException);
            return Mono.error(unwrappedException);
        }
        Exception exception = (Exception) unwrappedException;
        CosmosException dce;
        if (exception instanceof CosmosException) {
            dce = (CosmosException) exception;
        } else {
            logger.error("Network failure", exception);
            dce = BridgeInternal.createCosmosException(0, exception);
            BridgeInternal.setRequestHeaders(dce, request.getHeaders());
        }
        if (WebExceptionUtility.isNetworkFailure(dce)) {
            if (WebExceptionUtility.isReadTimeoutException(dce)) {
                BridgeInternal.setSubStatusCode(dce, 10002);
            } else {
                BridgeInternal.setSubStatusCode(dce, 10001);
            }
        }
        if (request.requestContext.cosmosDiagnostics != null) {
            BridgeInternal.recordGatewayResponse(request.requestContext.cosmosDiagnostics, request, null, dce);
            BridgeInternal.setCosmosDiagnostics(dce, request.requestContext.cosmosDiagnostics);
        }
        return Mono.error(dce);
    }

    /**
     * Keeps only the addresses matching this cache's protocol scheme, groups them by partition key
     * range id and converts every group into an identity/addresses pair.
     */
    private List<Pair<PartitionKeyRangeIdentity, AddressInformation[]>> toPartitionAddressAndRangeList(String collectionRid, List<Address> addresses) {
        return addresses.stream()
            .filter(address -> this.protocolScheme.equals(address.getProtocolScheme()))
            .collect(Collectors.groupingBy(Address::getParitionKeyRangeId))
            .values().stream()
            .map(groupedAddresses -> this.toPartitionAddressAndRange(collectionRid, groupedAddresses))
            .collect(Collectors.toList());
    }

    /**
     * Converts a non-empty list of addresses into a pair of partition key range identity and
     * address information array. The partition key range id is taken from the first address.
     */
    private Pair<PartitionKeyRangeIdentity, AddressInformation[]> toPartitionAddressAndRange(String collectionRid, List<Address> addresses) {
        logger.debug("toPartitionAddressAndRange");
        Address address = addresses.get(0);
        AddressInformation[] addressInfos = addresses.stream()
            .map(GatewayAddressCache::toAddressInformation)
            .toArray(AddressInformation[]::new);
        return Pair.of(new PartitionKeyRangeIdentity(collectionRid, address.getParitionKeyRangeId()), addressInfos);
    }

    /** Maps a gateway {@link Address} to an {@link AddressInformation}. */
    private static AddressInformation toAddressInformation(Address address) {
        return new AddressInformation(true, address.isPrimary(), address.getPhyicalUri(), address.getProtocolScheme());
    }

    /**
     * Pre-populates the server partition cache for the given partition key ranges of a collection.
     * Addresses are requested in batches of {@code DefaultBatchSize} ranges, one batch after another.
     *
     * @param collection collection whose resource id is used for the lookups and cache keys
     * @param partitionKeyRangeIdentities partition key ranges to resolve
     * @return a {@link Mono} completing once every batch has been processed
     */
    public Mono<Void> openAsync(DocumentCollection collection, List<PartitionKeyRangeIdentity> partitionKeyRangeIdentities) {
        if (logger.isDebugEnabled()) {
            logger.debug("openAsync collection: {}, partitionKeyRangeIdentities: {}", collection, JavaStreamUtils.toString(partitionKeyRangeIdentities, ","));
        }
        List<Flux<List<Address>>> tasks = new ArrayList<>();
        int batchSize = DefaultBatchSize;
        RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this.clientContext, OperationType.Read, collection.getResourceId(), ResourceType.DocumentCollection, Collections.emptyMap());
        for (int i = 0; i < partitionKeyRangeIdentities.size(); i += batchSize) {
            int endIndex = Math.min(i + batchSize, partitionKeyRangeIdentities.size());
            List<String> batchRangeIds = partitionKeyRangeIdentities.subList(i, endIndex).stream()
                .map(PartitionKeyRangeIdentity::getPartitionKeyRangeId)
                .collect(Collectors.toList());
            tasks.add(this.getServerAddressesViaGatewayAsync(request, collection.getResourceId(), batchRangeIds, false).flux());
        }
        return Flux.concat(tasks).doOnNext(list -> {
            List<Pair<PartitionKeyRangeIdentity, AddressInformation[]>> addressInfos = this.toPartitionAddressAndRangeList(collection.getResourceId(), list);
            for (Pair<PartitionKeyRangeIdentity, AddressInformation[]> addressInfo : addressInfos) {
                this.serverPartitionAddressCache.set(new PartitionKeyRangeIdentity(collection.getResourceId(), addressInfo.getLeft().getPartitionKeyRangeId()), addressInfo.getRight());
            }
        }).then();
    }

    /** Returns {@code true} when fewer than four addresses are present. */
    private boolean notAllReplicasAvailable(AddressInformation[] addressInformations) {
        return addressInformations.length < 4;
    }

    /**
     * Records the start of an address resolution in the request's diagnostics.
     *
     * @return the identifier returned by the diagnostics, or {@code null} when the request has no diagnostics
     */
    private static String logAddressResolutionStart(RxDocumentServiceRequest request, URI targetEndpointUrl) {
        if (request.requestContext.cosmosDiagnostics != null) {
            return BridgeInternal.recordAddressResolutionStart(request.requestContext.cosmosDiagnostics, targetEndpointUrl);
        }
        return null;
    }

    /** Records the end of an address resolution in the request's diagnostics, when present. */
    private static void logAddressResolutionEnd(RxDocumentServiceRequest request, String identifier, String errorMessage) {
        if (request.requestContext.cosmosDiagnostics != null) {
            BridgeInternal.recordAddressResolutionEnd(request.requestContext.cosmosDiagnostics, identifier, errorMessage);
        }
    }
}

