/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.directconnectivity;

import com.azure.cosmos.CosmosContainerProactiveInitConfig;
import com.azure.cosmos.implementation.ApiType;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.ConnectionPolicy;
import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.DiagnosticsClientContext;
import com.azure.cosmos.implementation.DocumentCollection;
import com.azure.cosmos.implementation.GlobalEndpointManager;
import com.azure.cosmos.implementation.IAuthorizationTokenProvider;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.OpenConnectionResponse;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.UserAgentContainer;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.ImmutablePair;
import com.azure.cosmos.implementation.caches.RxCollectionCache;
import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache;
import com.azure.cosmos.implementation.directconnectivity.AddressInformation;
import com.azure.cosmos.implementation.directconnectivity.AddressResolver;
import com.azure.cosmos.implementation.directconnectivity.ContainerDirectConnectionMetadata;
import com.azure.cosmos.implementation.directconnectivity.GatewayAddressCache;
import com.azure.cosmos.implementation.directconnectivity.GatewayServiceConfigurationReader;
import com.azure.cosmos.implementation.directconnectivity.IAddressResolver;
import com.azure.cosmos.implementation.directconnectivity.Protocol;
import com.azure.cosmos.implementation.directconnectivity.rntbd.ProactiveOpenConnectionsProcessor;
import com.azure.cosmos.implementation.http.HttpClient;
import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper;
import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity;
import com.azure.cosmos.models.CosmosContainerIdentity;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * {@link IAddressResolver} that keeps one {@link GatewayAddressCache} / {@link AddressResolver}
 * pair per service endpoint and delegates each request to the pair belonging to the endpoint
 * chosen by the {@link GlobalEndpointManager}.
 *
 * <p>The per-endpoint entries live in {@link #addressCacheByEndpoint}; the map is trimmed back
 * towards {@code maxEndpoints} entries whenever a lookup leaves it above that limit.
 */
public class GlobalAddressResolver implements IAddressResolver {
    private static final Logger logger = LoggerFactory.getLogger(GlobalAddressResolver.class);
    // Number of extra endpoint slots granted when read-request fallback is enabled.
    private static final int MaxBackupReadRegions = 3;
    private final DiagnosticsClientContext diagnosticsClientContext;
    private final GlobalEndpointManager endpointManager;
    private final Protocol protocol;
    private final IAuthorizationTokenProvider tokenProvider;
    private final UserAgentContainer userAgentContainer;
    private final RxCollectionCache collectionCache;
    private final RxPartitionKeyRangeCache routingMapProvider;
    private final int maxEndpoints;
    private final GatewayServiceConfigurationReader serviceConfigReader;
    // Package-private (not private) in the original; left as is so same-package code keeps access.
    final Map<URI, EndpointCache> addressCacheByEndpoint;
    private final boolean tcpConnectionEndpointRediscoveryEnabled;
    private final ApiType apiType;
    private final HttpClient httpClient;
    // Assigned through setOpenConnectionsProcessor; null until that setter is called.
    private ProactiveOpenConnectionsProcessor proactiveOpenConnectionsProcessor;
    private final ConnectionPolicy connectionPolicy;

    /**
     * Creates the resolver and eagerly creates a cache entry for every write and read endpoint
     * currently reported by {@code endpointManager}.
     *
     * @param diagnosticsClientContext passed through to every {@link GatewayAddressCache}
     * @param httpClient passed through to every {@link GatewayAddressCache}
     * @param endpointManager source of the write/read endpoint lists and of per-request endpoint resolution
     * @param protocol passed through to every {@link GatewayAddressCache}
     * @param tokenProvider passed through to every {@link GatewayAddressCache}
     * @param collectionCache used to resolve containers by link and to initialize each {@link AddressResolver}
     * @param routingMapProvider used to look up partition key ranges and to initialize each {@link AddressResolver}
     * @param userAgentContainer passed through to every {@link GatewayAddressCache}
     * @param serviceConfigReader stored on the instance; not otherwise used in this class
     * @param connectionPolicy supplies the endpoint-rediscovery and read-fallback flags; also passed to every {@link GatewayAddressCache}
     * @param apiType passed through to every {@link GatewayAddressCache}
     */
    public GlobalAddressResolver(DiagnosticsClientContext diagnosticsClientContext, HttpClient httpClient, GlobalEndpointManager endpointManager, Protocol protocol, IAuthorizationTokenProvider tokenProvider, RxCollectionCache collectionCache, RxPartitionKeyRangeCache routingMapProvider, UserAgentContainer userAgentContainer, GatewayServiceConfigurationReader serviceConfigReader, ConnectionPolicy connectionPolicy, ApiType apiType) {
        this.diagnosticsClientContext = diagnosticsClientContext;
        this.httpClient = httpClient;
        this.endpointManager = endpointManager;
        this.protocol = protocol;
        this.tokenProvider = tokenProvider;
        this.userAgentContainer = userAgentContainer;
        this.collectionCache = collectionCache;
        this.routingMapProvider = routingMapProvider;
        this.serviceConfigReader = serviceConfigReader;
        this.tcpConnectionEndpointRediscoveryEnabled = connectionPolicy.isTcpConnectionEndpointRediscoveryEnabled();
        this.connectionPolicy = connectionPolicy;
        int maxBackupReadEndpoints = connectionPolicy.isReadRequestsFallbackEnabled() ? MaxBackupReadRegions : 0;
        // NOTE(review): the two extra slots are presumably reserved for write endpoints - confirm.
        this.maxEndpoints = maxBackupReadEndpoints + 2;
        this.addressCacheByEndpoint = new ConcurrentHashMap<>();
        this.apiType = apiType;
        for (URI endpoint : endpointManager.getWriteEndpoints()) {
            this.getOrAddEndpoint(endpoint);
        }
        for (URI endpoint : endpointManager.getReadEndpoints()) {
            this.getOrAddEndpoint(endpoint);
        }
    }

    /**
     * For every container listed in {@code proactiveContainerInitConfig}, resolves the container,
     * its partition key ranges and their addresses, and submits an open-connection task for each
     * resolved address in the first {@code getProactiveConnectionRegionsCount()} read endpoints.
     *
     * <p>Containers are processed on {@link CosmosSchedulers#OPEN_CONNECTIONS_BOUNDED_ELASTIC} with
     * concurrency and prefetch both set to {@link Configs#getCPUCnt()}; the read endpoints of a
     * single container are processed one at a time.
     *
     * @param proactiveContainerInitConfig the containers to warm up and the number of regions to cover
     * @return a {@link Flux} that emits no items and completes once all tasks have been submitted
     */
    @Override
    public Flux<Void> submitOpenConnectionTasksAndInitCaches(CosmosContainerProactiveInitConfig proactiveContainerInitConfig) {
        return Flux.fromIterable(proactiveContainerInitConfig.getCosmosContainerIdentities())
            .publishOn(CosmosSchedulers.OPEN_CONNECTIONS_BOUNDED_ELASTIC)
            .flatMap(
                cosmosContainerIdentity -> this.openConnectionsForContainer(proactiveContainerInitConfig, cosmosContainerIdentity),
                Configs.getCPUCnt(),
                Configs.getCPUCnt());
    }

    /** Resolves one container by its link and opens connections for all of its partition key ranges. */
    private Flux<Void> openConnectionsForContainer(CosmosContainerProactiveInitConfig proactiveContainerInitConfig, CosmosContainerIdentity cosmosContainerIdentity) {
        String containerLink = ImplementationBridgeHelpers.CosmosContainerIdentityHelper
            .getCosmosContainerIdentityAccessor()
            .getContainerLink(cosmosContainerIdentity);

        return this.collectionCache.resolveByNameAsync(null, containerLink, null).flatMapMany(collection -> {
            if (collection == null) {
                logger.warn("Can not find the collection, no connections will be opened");
                return Flux.empty();
            }
            return this.resolvePartitionKeyRangeIdentities(collection).flatMapMany(partitionKeyRangeIdentities ->
                this.openConnectionsInConfiguredRegions(proactiveContainerInitConfig, cosmosContainerIdentity, containerLink, collection, partitionKeyRangeIdentities));
        });
    }

    /** Looks up the partition key ranges covering the full range of {@code collection}; yields an empty list when none are found. */
    private Mono<List<PartitionKeyRangeIdentity>> resolvePartitionKeyRangeIdentities(DocumentCollection collection) {
        return this.routingMapProvider
            .tryGetOverlappingRangesAsync(null, collection.getResourceId(), PartitionKeyInternalHelper.FullRange, true, null)
            .map(valueHolder -> {
                List<PartitionKeyRangeIdentity> partitionKeyRangeIdentities;
                if (valueHolder == null || valueHolder.v == null || valueHolder.v.isEmpty()) {
                    logger.warn("There is no pkRanges found for collection {}, no connections will be opened", collection.getResourceId());
                    partitionKeyRangeIdentities = new ArrayList<>();
                } else {
                    partitionKeyRangeIdentities = valueHolder.v.stream()
                        .map(pkRange -> new PartitionKeyRangeIdentity(collection.getResourceId(), pkRange.getId()))
                        .collect(Collectors.toList());
                }
                return partitionKeyRangeIdentities;
            });
    }

    /** Walks the first N read endpoints (N = configured proactive region count, capped at what is available), one at a time. */
    private Flux<Void> openConnectionsInConfiguredRegions(CosmosContainerProactiveInitConfig proactiveContainerInitConfig, CosmosContainerIdentity cosmosContainerIdentity, String containerLink, DocumentCollection collection, List<PartitionKeyRangeIdentity> partitionKeyRangeIdentities) {
        int requestedRegionCount = proactiveContainerInitConfig.getProactiveConnectionRegionsCount();
        if (requestedRegionCount <= 0) {
            return Flux.empty();
        }

        // Take one snapshot of the list so the size check and subList see the same contents.
        List<URI> readEndpoints = this.endpointManager.getReadEndpoints();
        // Clamp: subList would throw IndexOutOfBoundsException if fewer read endpoints are
        // available than requested, which would fail the whole warm-up instead of covering
        // the endpoints that do exist.
        int regionCount = Math.min(requestedRegionCount, readEndpoints.size());
        if (regionCount < requestedRegionCount) {
            logger.warn("Proactive connection regions count {} exceeds the number of available read endpoints {}, connections will only be opened to the available read endpoints", requestedRegionCount, regionCount);
        }

        return Flux.fromIterable(readEndpoints.subList(0, regionCount))
            .flatMap(readEndpoint -> this.openConnectionsInRegion(readEndpoint, proactiveContainerInitConfig, cosmosContainerIdentity, containerLink, collection, partitionKeyRangeIdentities), 1);
    }

    /** Resolves addresses through the cache of {@code readEndpoint} and submits one open-connection task per address; failures are logged and swallowed. */
    private Flux<Void> openConnectionsInRegion(URI readEndpoint, CosmosContainerProactiveInitConfig proactiveContainerInitConfig, CosmosContainerIdentity cosmosContainerIdentity, String containerLink, DocumentCollection collection, List<PartitionKeyRangeIdentity> partitionKeyRangeIdentities) {
        // Single lookup instead of containsKey + get: getOrAddEndpoint may evict the entry
        // between the two calls, which would hand a null cache to the code below.
        EndpointCache endpointCache = this.addressCacheByEndpoint.get(readEndpoint);
        if (endpointCache == null) {
            return Flux.empty();
        }

        return this.resolveAddressesPerCollection(endpointCache, containerLink, collection, partitionKeyRangeIdentities)
            .flatMap(collectionToAddresses -> {
                ImmutablePair<String, DocumentCollection> containerLinkToCollection = collectionToAddresses.left;
                AddressInformation addressInformation = collectionToAddresses.right;
                Map<CosmosContainerIdentity, ContainerDirectConnectionMetadata> containerPropertiesMap =
                    ImplementationBridgeHelpers.CosmosContainerProactiveInitConfigHelper
                        .getCosmosContainerProactiveInitConfigAccessor()
                        .getContainerPropertiesMap(proactiveContainerInitConfig);
                ContainerDirectConnectionMetadata containerDirectConnectionMetadata = containerPropertiesMap.get(cosmosContainerIdentity);
                int connectionsPerEndpointCountForContainer = containerDirectConnectionMetadata.getMinConnectionPoolSizePerEndpointForContainer();
                return this.submitOpenConnectionInternal(endpointCache, addressInformation, containerLinkToCollection.getRight(), connectionsPerEndpointCountForContainer).then();
            })
            .onErrorResume(throwable -> {
                // Best effort: a failure in one region must not abort the remaining regions.
                Throwable unwrappedThrowable = Exceptions.unwrap(throwable);
                logger.warn("An exception occurred when resolving addresses for region : {}", readEndpoint, unwrappedThrowable);
                return Flux.empty();
            });
    }

    /** Delegates to the address cache of {@code endpointCache} to resolve addresses for the given partition key ranges. */
    private Flux<ImmutablePair<ImmutablePair<String, DocumentCollection>, AddressInformation>> resolveAddressesPerCollection(EndpointCache endpointCache, String containerLink, DocumentCollection collection, List<PartitionKeyRangeIdentity> partitionKeyRangeIdentities) {
        return endpointCache.addressCache.resolveAddressesAndInitCaches(containerLink, collection, partitionKeyRangeIdentities);
    }

    /** Delegates to the address cache of {@code endpointCache} to submit an open-connection task for {@code address}. */
    private Mono<OpenConnectionResponse> submitOpenConnectionInternal(EndpointCache endpointCache, AddressInformation address, DocumentCollection documentCollection, int connectionPerEndpointCount) {
        return endpointCache.addressCache.submitOpenConnectionTask(address, documentCollection, connectionPerEndpointCount);
    }

    /**
     * Stores the processor and pushes it to every address cache that already exists; caches
     * created afterwards receive it through their constructor.
     *
     * @param proactiveOpenConnectionsProcessor the processor to hand to the address caches
     */
    @Override
    public void setOpenConnectionsProcessor(ProactiveOpenConnectionsProcessor proactiveOpenConnectionsProcessor) {
        this.proactiveOpenConnectionsProcessor = proactiveOpenConnectionsProcessor;
        for (EndpointCache endpointCache : this.addressCacheByEndpoint.values()) {
            endpointCache.addressCache.setOpenConnectionsProcessor(this.proactiveOpenConnectionsProcessor);
        }
    }

    /**
     * Resolves addresses for {@code request} using the resolver of the endpoint that the
     * endpoint manager selects for that request.
     *
     * @param request the request to resolve addresses for
     * @param forceRefresh forwarded unchanged to the per-endpoint resolver
     * @return the addresses produced by the per-endpoint resolver
     */
    @Override
    public Mono<AddressInformation[]> resolveAsync(RxDocumentServiceRequest request, boolean forceRefresh) {
        IAddressResolver resolver = this.getAddressResolver(request);
        return resolver.resolveAsync(request, forceRefresh);
    }

    /** Disposes the address cache of every endpoint currently held in the map. */
    public void dispose() {
        for (EndpointCache endpointCache : this.addressCacheByEndpoint.values()) {
            endpointCache.addressCache.dispose();
        }
    }

    /** Returns the resolver for the endpoint the endpoint manager picks for the request, creating the entry if needed. */
    private IAddressResolver getAddressResolver(RxDocumentServiceRequest rxDocumentServiceRequest) {
        URI endpoint = this.endpointManager.resolveServiceEndpoint(rxDocumentServiceRequest);
        return this.getOrAddEndpoint(endpoint).addressResolver;
    }

    /**
     * Returns the cache entry for {@code endpoint}, creating it when absent, and then trims the
     * map while it holds more than {@code maxEndpoints} entries.
     *
     * <p>Eviction candidates are the endpoint manager's write endpoints followed by its read
     * endpoints, visited in reverse order. Endpoints absent from both lists are never evicted,
     * so the map can stay above the limit.
     */
    private EndpointCache getOrAddEndpoint(URI endpoint) {
        EndpointCache endpointCache = this.addressCacheByEndpoint.computeIfAbsent(endpoint, key -> {
            GatewayAddressCache gatewayAddressCache = new GatewayAddressCache(this.diagnosticsClientContext, endpoint, this.protocol, this.tokenProvider, this.userAgentContainer, this.httpClient, this.apiType, this.endpointManager, this.connectionPolicy, this.proactiveOpenConnectionsProcessor);
            AddressResolver addressResolver = new AddressResolver();
            addressResolver.initializeCaches(this.collectionCache, this.routingMapProvider, gatewayAddressCache);
            EndpointCache cache = new EndpointCache();
            cache.addressCache = gatewayAddressCache;
            cache.addressResolver = addressResolver;
            return cache;
        });

        if (this.addressCacheByEndpoint.size() > this.maxEndpoints) {
            List<URI> evictionCandidates = new ArrayList<>(this.endpointManager.getWriteEndpoints());
            evictionCandidates.addAll(this.endpointManager.getReadEndpoints());
            Collections.reverse(evictionCandidates);
            for (URI candidate : evictionCandidates) {
                if (this.addressCacheByEndpoint.size() <= this.maxEndpoints) {
                    break;
                }
                // NOTE(review): the evicted entry's address cache is not disposed here, and the
                // entry returned below may itself be among the evicted ones - confirm intended.
                this.addressCacheByEndpoint.remove(candidate);
            }
        }
        return endpointCache;
    }

    /** Holder pairing the address cache and the address resolver that belong to one endpoint. */
    static class EndpointCache {
        GatewayAddressCache addressCache;
        AddressResolver addressResolver;

        EndpointCache() {
        }
    }
}

