/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.directconnectivity.rntbd;

import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.directconnectivity.Uri;
import com.azure.cosmos.implementation.directconnectivity.rntbd.ProactiveOpenConnectionsProcessor;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdConnectionEvent;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdConnectionStateListenerMetrics;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpoint;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdObjectMapper;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestManager;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.time.Instant;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

public class RntbdConnectionStateListener {
    private static final Logger logger = LoggerFactory.getLogger(RntbdConnectionStateListener.class);
    private final RntbdEndpoint endpoint;
    private final RntbdConnectionStateListenerMetrics metrics;
    private final Set<Uri> addressUris;
    private final ProactiveOpenConnectionsProcessor proactiveOpenConnectionsProcessor;
    private final AtomicBoolean endpointValidationInProgress = new AtomicBoolean(false);

    public RntbdConnectionStateListener(RntbdEndpoint endpoint, ProactiveOpenConnectionsProcessor proactiveOpenConnectionsProcessor) {
        this.endpoint = Preconditions.checkNotNull(endpoint, "expected non-null endpoint");
        this.metrics = new RntbdConnectionStateListenerMetrics();
        this.addressUris = ConcurrentHashMap.newKeySet();
        this.proactiveOpenConnectionsProcessor = proactiveOpenConnectionsProcessor;
    }

    public void onBeforeSendRequest(Uri addressUri) {
        Preconditions.checkNotNull(addressUri, "Argument 'addressUri' should not be null");
        this.addressUris.add(addressUri);
    }

    public void onException(Throwable exception) {
        Preconditions.checkNotNull(exception, "expect non-null exception");
        this.metrics.record();
        if (exception instanceof IOException) {
            if (exception instanceof ClosedChannelException) {
                this.metrics.recordAddressUpdated(this.onConnectionEvent(RntbdConnectionEvent.READ_EOF, exception));
            } else {
                this.metrics.recordAddressUpdated(this.onConnectionEvent(RntbdConnectionEvent.READ_FAILURE, exception));
            }
        } else if (exception instanceof RntbdRequestManager.UnhealthyChannelException) {
            this.metrics.recordAddressUpdated(this.onConnectionEvent(RntbdConnectionEvent.READ_FAILURE, exception));
        } else if (logger.isDebugEnabled()) {
            logger.debug("Will not raise the connection state change event for error", exception);
        }
    }

    public RntbdConnectionStateListenerMetrics getMetrics() {
        return this.metrics;
    }

    public void openConnectionIfNeeded() {
        if (this.proactiveOpenConnectionsProcessor == null) {
            logger.warn("proactiveOpenConnectionsProcessor is null");
            return;
        }
        Optional addressUriOptional = this.addressUris.stream().findFirst();
        if (!addressUriOptional.isPresent()) {
            logger.debug("addressUri cannot be null...");
            return;
        }
        Uri addressUri = (Uri)addressUriOptional.get();
        if (!this.proactiveOpenConnectionsProcessor.isAddressUriUnderOpenConnectionsFlow(addressUri.getURIAsString())) {
            return;
        }
        if (this.endpointValidationInProgress.compareAndSet(false, true)) {
            Mono.fromFuture((CompletableFuture)this.proactiveOpenConnectionsProcessor.submitOpenConnectionTaskOutsideLoop("", this.endpoint.serviceEndpoint(), addressUri, this.endpoint.getMinChannelsRequired())).doFinally(signalType -> this.endpointValidationInProgress.compareAndSet(true, false)).subscribeOn(CosmosSchedulers.OPEN_CONNECTIONS_BOUNDED_ELASTIC).subscribe();
        }
    }

    private int onConnectionEvent(RntbdConnectionEvent event, Throwable exception) {
        Preconditions.checkNotNull(exception, "expected non-null exception");
        if (event == RntbdConnectionEvent.READ_EOF || event == RntbdConnectionEvent.READ_FAILURE) {
            if (logger.isDebugEnabled()) {
                logger.debug("onConnectionEvent({\"event\":{},\"time\":{},\"endpoint\":{},\"cause\":{})", new Object[]{event, RntbdObjectMapper.toJson(Instant.now()), RntbdObjectMapper.toJson(this.endpoint), RntbdObjectMapper.toJson(exception)});
            }
            for (Uri addressUri : this.addressUris) {
                addressUri.setUnhealthy();
            }
            return this.addressUris.size();
        }
        return 0;
    }
}

