/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.directconnectivity.rntbd;

import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.GoneException;
import com.azure.cosmos.implementation.InvalidPartitionException;
import com.azure.cosmos.implementation.PartitionIsMigratingException;
import com.azure.cosmos.implementation.PartitionKeyRangeIsSplittingException;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.directconnectivity.AddressSelector;
import com.azure.cosmos.implementation.directconnectivity.Uri;
import com.azure.cosmos.implementation.directconnectivity.rntbd.ProactiveOpenConnectionsProcessor;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdConnectionEvent;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdConnectionStateListenerMetrics;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpoint;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdObjectMapper;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestManager;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import io.netty.channel.ConnectTimeoutException;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/**
 * Tracks the address URIs that requests are sent to through a single {@link RntbdEndpoint}
 * and reacts to connection-level failures reported for that endpoint.
 *
 * <p>When an I/O failure or an unhealthy-channel failure is reported via
 * {@link #onException(Throwable)}, every tracked {@link Uri} is marked unhealthy and the
 * outcome is recorded in {@link RntbdConnectionStateListenerMetrics}. The listener can also
 * submit an open-connection task for a tracked address and trigger a background address
 * refresh for requests that were cancelled on timeout.
 */
public class RntbdConnectionStateListener {
    private static final Logger logger = LoggerFactory.getLogger(RntbdConnectionStateListener.class);
    private final RntbdEndpoint endpoint;
    private final RntbdConnectionStateListenerMetrics metrics;
    /** Address URIs seen by {@link #onBeforeSendRequest(Uri)}, keyed by their string form. */
    private final ConcurrentHashMap<String, Uri> addressUriMap;
    /** May be {@code null}; {@link #openConnectionIfNeeded()} is then a no-op. */
    private final ProactiveOpenConnectionsProcessor proactiveOpenConnectionsProcessor;
    /** May be {@code null}; {@link #attemptBackgroundAddressRefresh} is then a no-op. */
    private final AddressSelector addressSelector;
    /** Ensures at most one open-connection task submitted by this listener is in flight. */
    private final AtomicBoolean endpointValidationInProgress = new AtomicBoolean(false);

    /**
     * Creates a listener bound to the given endpoint.
     *
     * @param endpoint the endpoint this listener observes; must not be {@code null}
     * @param proactiveOpenConnectionsProcessor processor used by {@link #openConnectionIfNeeded()}
     * @param addressSelector selector used by {@link #attemptBackgroundAddressRefresh}
     * @throws NullPointerException if {@code endpoint} is {@code null}
     */
    public RntbdConnectionStateListener(RntbdEndpoint endpoint, ProactiveOpenConnectionsProcessor proactiveOpenConnectionsProcessor, AddressSelector addressSelector) {
        this.endpoint = Preconditions.checkNotNull(endpoint, "expected non-null endpoint");
        this.metrics = new RntbdConnectionStateListenerMetrics();
        this.addressUriMap = new ConcurrentHashMap<>();
        this.proactiveOpenConnectionsProcessor = proactiveOpenConnectionsProcessor;
        this.addressSelector = addressSelector;
    }

    /**
     * Registers the address a request is about to be sent to. A later registration with the
     * same string form replaces the previously stored {@link Uri} instance.
     *
     * @param addressUri the target address; must not be {@code null}
     * @throws NullPointerException if {@code addressUri} is {@code null}
     */
    public void onBeforeSendRequest(Uri addressUri) {
        Preconditions.checkNotNull(addressUri, "Argument 'addressUri' should not be null");
        // Unconditionally keep the most recent Uri instance for this address string.
        this.addressUriMap.put(addressUri.getURIAsString(), addressUri);
    }

    /**
     * Records an exception and, for connection-level failures, marks the tracked addresses
     * unhealthy.
     *
     * <p>{@link ClosedChannelException} is treated as {@link RntbdConnectionEvent#READ_EOF};
     * any other {@link IOException} and
     * {@link RntbdRequestManager.UnhealthyChannelException} are treated as
     * {@link RntbdConnectionEvent#READ_FAILURE}. Other exceptions are only logged at debug level.
     *
     * @param exception the observed failure; must not be {@code null}
     * @throws NullPointerException if {@code exception} is {@code null}
     */
    public void onException(Throwable exception) {
        Preconditions.checkNotNull(exception, "expect non-null exception");
        // Every reported exception is counted, whether or not it raises a connection event.
        this.metrics.record();
        if (exception instanceof IOException) {
            RntbdConnectionEvent event = exception instanceof ClosedChannelException
                ? RntbdConnectionEvent.READ_EOF
                : RntbdConnectionEvent.READ_FAILURE;
            this.metrics.recordAddressUpdated(this.onConnectionEvent(event, exception));
        } else if (exception instanceof RntbdRequestManager.UnhealthyChannelException) {
            this.metrics.recordAddressUpdated(this.onConnectionEvent(RntbdConnectionEvent.READ_FAILURE, exception));
        } else if (logger.isDebugEnabled()) {
            logger.debug("Will not raise the connection state change event for error", exception);
        }
    }

    /**
     * Returns the metrics collected by this listener.
     *
     * @return the metrics instance created for this listener
     */
    public RntbdConnectionStateListenerMetrics getMetrics() {
        return this.metrics;
    }

    /**
     * Submits an open-connection task for one tracked address when that address is under the
     * open-connections flow and no task submitted by this listener is still in flight.
     *
     * <p>Does nothing when no processor was supplied, when no address has been registered yet,
     * or when the processor reports that the address is not under the open-connections flow.
     */
    public void openConnectionIfNeeded() {
        if (this.proactiveOpenConnectionsProcessor == null) {
            logger.warn("proactiveOpenConnectionsProcessor is null");
            return;
        }
        Optional<Uri> addressUriOptional = this.addressUriMap.values().stream().findFirst();
        if (!addressUriOptional.isPresent()) {
            logger.debug("addressUri cannot be null...");
            return;
        }
        Uri addressUri = addressUriOptional.get();
        if (!this.proactiveOpenConnectionsProcessor.isAddressUriUnderOpenConnectionsFlow(addressUri.getURIAsString())) {
            return;
        }
        if (!this.endpointValidationInProgress.compareAndSet(false, true)) {
            // Another submission from this listener has not completed yet.
            return;
        }

        final CompletableFuture<?> openConnectionTask;
        try {
            openConnectionTask = (CompletableFuture<?>) this.proactiveOpenConnectionsProcessor.submitOpenConnectionTaskOutsideLoop(
                "", this.endpoint.serviceEndpoint(), addressUri, this.endpoint.getMinChannelsRequired());
        } catch (RuntimeException | Error submissionFailure) {
            // The submission is evaluated eagerly, before doFinally is attached. Release the
            // latch here so a synchronous failure cannot block all future validations.
            this.endpointValidationInProgress.set(false);
            throw submissionFailure;
        }

        Mono.fromFuture(openConnectionTask)
            .doFinally(signalType -> this.endpointValidationInProgress.compareAndSet(true, false))
            .subscribeOn(CosmosSchedulers.OPEN_CONNECTIONS_BOUNDED_ELASTIC)
            .subscribe();
    }

    /**
     * Marks every tracked address unhealthy for read EOF / read failure events.
     *
     * @param event the connection event being raised
     * @param exception the failure that caused the event; must not be {@code null}
     * @return the number of tracked addresses for {@code READ_EOF} / {@code READ_FAILURE},
     *         otherwise {@code 0}
     */
    private int onConnectionEvent(RntbdConnectionEvent event, Throwable exception) {
        Preconditions.checkNotNull(exception, "expected non-null exception");
        if (event != RntbdConnectionEvent.READ_EOF && event != RntbdConnectionEvent.READ_FAILURE) {
            return 0;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("onConnectionEvent({\"event\":{},\"time\":{},\"endpoint\":{},\"cause\":{})", new Object[]{event, RntbdObjectMapper.toJson(Instant.now()), RntbdObjectMapper.toJson(this.endpoint), RntbdObjectMapper.toJson(exception)});
        }
        for (Uri addressUri : this.addressUriMap.values()) {
            addressUri.setUnhealthy();
        }
        return this.addressUriMap.size();
    }

    /**
     * Starts a fire-and-forget address resolution for a request that was cancelled on timeout
     * and failed with one of the exception types accepted by
     * {@link #shouldRefreshForException(Exception)}.
     *
     * <p>Nothing happens when the request has no request context, when it was not cancelled on
     * timeout, when the exception type does not qualify, or when no address selector was
     * supplied to this listener.
     *
     * @param request the request whose addresses should be re-resolved
     * @param exception the failure observed for the request
     */
    public void attemptBackgroundAddressRefresh(RxDocumentServiceRequest request, Exception exception) {
        if (request.requestContext == null) {
            return;
        }
        AtomicBoolean isRequestCancelledOnTimeout = request.requestContext.isRequestCancelledOnTimeout();
        if (isRequestCancelledOnTimeout == null || !isRequestCancelledOnTimeout.get() || !this.shouldRefreshForException(exception)) {
            return;
        }
        if (this.addressSelector == null) {
            // Mirror the handling of a missing proactiveOpenConnectionsProcessor: skip, don't NPE.
            logger.warn("addressSelector is null");
            return;
        }
        boolean forceAddressRefresh = request.requestContext.forceRefreshAddressCache;
        this.addressSelector
            .resolveAddressesAsync(request, forceAddressRefresh)
            .publishOn(Schedulers.boundedElastic())
            .doOnSubscribe(ignore -> logger.debug("Background refresh of addresses started!"))
            .doFinally(signalType -> logger.debug("Background refresh of addresses finished!"))
            .subscribe(
                ignoreResult -> {},
                throwable -> logger.warn("Background address refresh failed with {}", throwable.getMessage(), throwable));
    }

    /**
     * Tells whether the given failure type qualifies for a background address refresh.
     *
     * @param exception the failure observed for a request
     * @return {@code true} for connect timeouts, invalid-partition, partition-migrating,
     *         partition-key-range-splitting and gone exceptions
     */
    private boolean shouldRefreshForException(Exception exception) {
        return exception instanceof ConnectTimeoutException
            || exception instanceof InvalidPartitionException
            || exception instanceof PartitionIsMigratingException
            || exception instanceof PartitionKeyRangeIsSplittingException
            || exception instanceof GoneException;
    }
}

