/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.directconnectivity.rntbd;

import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.IOpenConnectionsHandler;
import com.azure.cosmos.implementation.OpenConnectionResponse;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.directconnectivity.TransportClient;
import com.azure.cosmos.implementation.directconnectivity.Uri;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import java.net.URI;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class RntbdOpenConnectionsHandler
implements IOpenConnectionsHandler {
    private static final Logger logger = LoggerFactory.getLogger(RntbdOpenConnectionsHandler.class);
    private static int DEFAULT_CONNECTION_SEMAPHORE_TIMEOUT_IN_MINUTES = 30;
    private final TransportClient transportClient;
    private final Semaphore openConnectionsSemaphore;

    public RntbdOpenConnectionsHandler(TransportClient transportClient) {
        Preconditions.checkNotNull(transportClient, "Argument 'transportClient' can not be null");
        this.transportClient = transportClient;
        this.openConnectionsSemaphore = new Semaphore(Configs.getCPUCnt() * 10);
    }

    @Override
    public Flux<OpenConnectionResponse> openConnections(String collectionRid, URI serviceEndpoint, List<Uri> addresses) {
        Preconditions.checkNotNull(addresses, "Argument 'addresses' should not be null");
        Preconditions.checkArgument(StringUtils.isNotEmpty(collectionRid), "Argument 'collectionRid' cannot be null nor empty");
        if (logger.isDebugEnabled()) {
            logger.debug("Open connections for addresses {}", (Object)StringUtils.join(addresses, ","));
        }
        return Flux.fromIterable(addresses).flatMap(addressUri -> {
            try {
                if (this.openConnectionsSemaphore.tryAcquire(DEFAULT_CONNECTION_SEMAPHORE_TIMEOUT_IN_MINUTES, TimeUnit.MINUTES)) {
                    RxDocumentServiceRequest openConnectionRequest = this.getOpenConnectionRequest(collectionRid, serviceEndpoint, (Uri)addressUri);
                    return this.transportClient.openConnection((Uri)addressUri, openConnectionRequest).onErrorResume(throwable -> Mono.just((Object)new OpenConnectionResponse((Uri)addressUri, false, (Throwable)throwable))).doOnNext(response -> {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Connection result: isConnected [{}], address [{}]", (Object)response.isConnected(), (Object)response.getUri());
                        }
                    }).doOnTerminate(() -> this.openConnectionsSemaphore.release());
                }
            }
            catch (InterruptedException e) {
                logger.warn("Acquire connection semaphore failed", (Throwable)e);
            }
            return Mono.just((Object)new OpenConnectionResponse((Uri)addressUri, false, new IllegalStateException("Unable to acquire semaphore")));
        });
    }

    private RxDocumentServiceRequest getOpenConnectionRequest(String collectionRid, URI serviceEndpoint, Uri addressUri) {
        RxDocumentServiceRequest openConnectionRequest = RxDocumentServiceRequest.create(null, OperationType.Create, ResourceType.Connection);
        openConnectionRequest.requestContext.locationEndpointToRoute = serviceEndpoint;
        openConnectionRequest.requestContext.resolvedCollectionRid = collectionRid;
        openConnectionRequest.faultInjectionRequestContext.setLocationEndpointToRoute(serviceEndpoint);
        return openConnectionRequest;
    }
}

