/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.eventhubs;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.amqp.ProxyAuthenticationType;
import com.azure.core.amqp.ProxyOptions;
import com.azure.core.amqp.implementation.AzureTokenManagerProvider;
import com.azure.core.amqp.implementation.CbsAuthorizationType;
import com.azure.core.amqp.implementation.ConnectionOptions;
import com.azure.core.amqp.implementation.ConnectionStringProperties;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.ReactorHandlerProvider;
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.StringUtil;
import com.azure.core.amqp.implementation.TokenManagerProvider;
import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.annotation.ServiceClientBuilder;
import com.azure.core.annotation.ServiceClientProtocol;
import com.azure.core.credential.TokenCredential;
import com.azure.core.util.ClientOptions;
import com.azure.core.util.Configuration;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.tracing.Tracer;
import com.azure.messaging.eventhubs.EventHubAsyncClient;
import com.azure.messaging.eventhubs.EventHubClient;
import com.azure.messaging.eventhubs.EventHubConsumerAsyncClient;
import com.azure.messaging.eventhubs.EventHubConsumerClient;
import com.azure.messaging.eventhubs.EventHubMessageSerializer;
import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.implementation.ClientConstants;
import com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor;
import com.azure.messaging.eventhubs.implementation.EventHubReactorAmqpConnection;
import com.azure.messaging.eventhubs.implementation.EventHubSharedKeyCredential;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.Proxy;
import java.net.URL;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import org.apache.qpid.proton.engine.SslDomain;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

@ServiceClientBuilder(serviceClients={EventHubProducerAsyncClient.class, EventHubProducerClient.class, EventHubConsumerAsyncClient.class, EventHubConsumerClient.class}, protocol=ServiceClientProtocol.AMQP)
public class EventHubClientBuilder {
    static final int DEFAULT_PREFETCH_COUNT = 500;
    static final int DEFAULT_PREFETCH_COUNT_FOR_SYNC_CLIENT = 1;
    public static final String DEFAULT_CONSUMER_GROUP_NAME = "$Default";
    private static final int MINIMUM_PREFETCH_COUNT = 1;
    private static final int MAXIMUM_PREFETCH_COUNT = 8000;
    private static final String EVENTHUBS_PROPERTIES_FILE = "azure-messaging-eventhubs.properties";
    private static final String NAME_KEY = "name";
    private static final String VERSION_KEY = "version";
    private static final String UNKNOWN = "UNKNOWN";
    private static final String AZURE_EVENT_HUBS_CONNECTION_STRING = "AZURE_EVENT_HUBS_CONNECTION_STRING";
    private static final AmqpRetryOptions DEFAULT_RETRY = new AmqpRetryOptions().setTryTimeout(ClientConstants.OPERATION_TIMEOUT);
    private static final Pattern HOST_PORT_PATTERN = Pattern.compile("^[^:]+:\\d+");
    private final ClientLogger logger = new ClientLogger(EventHubClientBuilder.class);
    private final Object connectionLock = new Object();
    private final AtomicBoolean isSharedConnection = new AtomicBoolean();
    private TokenCredential credentials;
    private Configuration configuration;
    private ProxyOptions proxyOptions;
    private AmqpRetryOptions retryOptions;
    private Scheduler scheduler;
    private AmqpTransportType transport;
    private String fullyQualifiedNamespace;
    private String eventHubName;
    private String consumerGroup;
    private EventHubConnectionProcessor eventHubConnectionProcessor;
    private Integer prefetchCount;
    private ClientOptions clientOptions;
    private SslDomain.VerifyMode verifyMode;
    private URL customEndpointAddress;
    private final AtomicInteger openClients = new AtomicInteger();

    public EventHubClientBuilder() {
        this.transport = AmqpTransportType.AMQP;
    }

    public EventHubClientBuilder connectionString(String connectionString) {
        ConnectionStringProperties properties = new ConnectionStringProperties(connectionString);
        TokenCredential tokenCredential = this.getTokenCredential(properties);
        return this.credential(properties.getEndpoint().getHost(), properties.getEntityPath(), tokenCredential);
    }

    private TokenCredential getTokenCredential(ConnectionStringProperties properties) {
        EventHubSharedKeyCredential tokenCredential = properties.getSharedAccessSignature() == null ? new EventHubSharedKeyCredential(properties.getSharedAccessKeyName(), properties.getSharedAccessKey(), ClientConstants.TOKEN_VALIDITY) : new EventHubSharedKeyCredential(properties.getSharedAccessSignature());
        return tokenCredential;
    }

    public EventHubClientBuilder clientOptions(ClientOptions clientOptions) {
        this.clientOptions = clientOptions;
        return this;
    }

    public EventHubClientBuilder connectionString(String connectionString, String eventHubName) {
        Objects.requireNonNull(connectionString, "'connectionString' cannot be null.");
        Objects.requireNonNull(eventHubName, "'eventHubName' cannot be null.");
        if (connectionString.isEmpty()) {
            throw this.logger.logExceptionAsError((RuntimeException)new IllegalArgumentException("'connectionString' cannot be an empty string."));
        }
        if (eventHubName.isEmpty()) {
            throw this.logger.logExceptionAsError((RuntimeException)new IllegalArgumentException("'eventHubName' cannot be an empty string."));
        }
        ConnectionStringProperties properties = new ConnectionStringProperties(connectionString);
        TokenCredential tokenCredential = this.getTokenCredential(properties);
        if (!CoreUtils.isNullOrEmpty((CharSequence)properties.getEntityPath()) && !eventHubName.equals(properties.getEntityPath())) {
            throw this.logger.logExceptionAsError((RuntimeException)new IllegalArgumentException(String.format(Locale.US, "'connectionString' contains an Event Hub name [%s] and it does not match the given 'eventHubName' parameter [%s]. Please use the credentials(String connectionString) overload. Or supply a 'connectionString' without 'EntityPath' in it.", properties.getEntityPath(), eventHubName)));
        }
        return this.credential(properties.getEndpoint().getHost(), eventHubName, tokenCredential);
    }

    public EventHubClientBuilder configuration(Configuration configuration) {
        this.configuration = configuration;
        return this;
    }

    public EventHubClientBuilder customEndpointAddress(String customEndpointAddress) {
        if (customEndpointAddress == null) {
            this.customEndpointAddress = null;
            return this;
        }
        try {
            this.customEndpointAddress = new URL(customEndpointAddress);
        }
        catch (MalformedURLException e) {
            throw this.logger.logExceptionAsError((RuntimeException)new IllegalArgumentException(customEndpointAddress + " : is not a valid URL.", e));
        }
        return this;
    }

    public EventHubClientBuilder shareConnection() {
        this.isSharedConnection.set(true);
        return this;
    }

    public EventHubClientBuilder credential(String fullyQualifiedNamespace, String eventHubName, TokenCredential credential) {
        this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace, "'fullyQualifiedNamespace' cannot be null.");
        this.credentials = Objects.requireNonNull(credential, "'credential' cannot be null.");
        this.eventHubName = Objects.requireNonNull(eventHubName, "'eventHubName' cannot be null.");
        if (CoreUtils.isNullOrEmpty((CharSequence)fullyQualifiedNamespace)) {
            throw this.logger.logExceptionAsError((RuntimeException)new IllegalArgumentException("'host' cannot be an empty string."));
        }
        if (CoreUtils.isNullOrEmpty((CharSequence)eventHubName)) {
            throw this.logger.logExceptionAsError((RuntimeException)new IllegalArgumentException("'eventHubName' cannot be an empty string."));
        }
        return this;
    }

    public EventHubClientBuilder proxyOptions(ProxyOptions proxyOptions) {
        this.proxyOptions = proxyOptions;
        return this;
    }

    public EventHubClientBuilder transportType(AmqpTransportType transport) {
        this.transport = transport;
        return this;
    }

    public EventHubClientBuilder retry(AmqpRetryOptions retryOptions) {
        this.retryOptions = retryOptions;
        return this;
    }

    public EventHubClientBuilder consumerGroup(String consumerGroup) {
        this.consumerGroup = consumerGroup;
        return this;
    }

    public EventHubClientBuilder prefetchCount(int prefetchCount) {
        if (prefetchCount < 1) {
            throw this.logger.logExceptionAsError((RuntimeException)new IllegalArgumentException(String.format(Locale.US, "PrefetchCount, '%s' has to be above %s", prefetchCount, 1)));
        }
        if (prefetchCount > 8000) {
            throw this.logger.logExceptionAsError((RuntimeException)new IllegalArgumentException(String.format(Locale.US, "PrefetchCount, '%s', has to be below %s", prefetchCount, 8000)));
        }
        this.prefetchCount = prefetchCount;
        return this;
    }

    EventHubClientBuilder scheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
        return this;
    }

    EventHubClientBuilder verifyMode(SslDomain.VerifyMode verifyMode) {
        this.verifyMode = verifyMode;
        return this;
    }

    public EventHubConsumerAsyncClient buildAsyncConsumerClient() {
        if (CoreUtils.isNullOrEmpty((CharSequence)this.consumerGroup)) {
            throw this.logger.logExceptionAsError((RuntimeException)new IllegalArgumentException("'consumerGroup' cannot be null or an empty string. using EventHubClientBuilder.consumerGroup(String)"));
        }
        return this.buildAsyncClient().createConsumer(this.consumerGroup, this.prefetchCount);
    }

    public EventHubConsumerClient buildConsumerClient() {
        return this.buildClient().createConsumer(this.consumerGroup, this.prefetchCount);
    }

    public EventHubProducerAsyncClient buildAsyncProducerClient() {
        return this.buildAsyncClient().createProducer();
    }

    public EventHubProducerClient buildProducerClient() {
        return this.buildClient().createProducer();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    EventHubAsyncClient buildAsyncClient() {
        EventHubConnectionProcessor processor;
        if (this.retryOptions == null) {
            this.retryOptions = DEFAULT_RETRY;
        }
        if (this.scheduler == null) {
            this.scheduler = Schedulers.boundedElastic();
        }
        if (this.prefetchCount == null) {
            this.prefetchCount = 500;
        }
        EventHubMessageSerializer messageSerializer = new EventHubMessageSerializer();
        if (this.isSharedConnection.get()) {
            Object object = this.connectionLock;
            synchronized (object) {
                if (this.eventHubConnectionProcessor == null) {
                    this.eventHubConnectionProcessor = this.buildConnectionProcessor(messageSerializer);
                }
            }
            processor = this.eventHubConnectionProcessor;
            int numberOfOpenClients = this.openClients.incrementAndGet();
            this.logger.info("# of open clients with shared connection: {}", new Object[]{numberOfOpenClients});
        } else {
            processor = this.buildConnectionProcessor(messageSerializer);
        }
        TracerProvider tracerProvider = new TracerProvider(ServiceLoader.load(Tracer.class));
        return new EventHubAsyncClient(processor, tracerProvider, messageSerializer, this.scheduler, this.isSharedConnection.get(), this::onClientClose);
    }

    EventHubClient buildClient() {
        if (this.prefetchCount == null) {
            this.prefetchCount = 1;
        }
        EventHubAsyncClient client = this.buildAsyncClient();
        return new EventHubClient(client, this.retryOptions);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void onClientClose() {
        Object object = this.connectionLock;
        synchronized (object) {
            int numberOfOpenClients = this.openClients.decrementAndGet();
            this.logger.info("Closing a dependent client. # of open clients: {}", new Object[]{numberOfOpenClients});
            if (numberOfOpenClients > 0) {
                return;
            }
            if (numberOfOpenClients < 0) {
                this.logger.warning("There should not be less than 0 clients. actual: {}", new Object[]{numberOfOpenClients});
            }
            this.logger.info("No more open clients, closing shared connection.");
            if (this.eventHubConnectionProcessor != null) {
                this.eventHubConnectionProcessor.dispose();
                this.eventHubConnectionProcessor = null;
            } else {
                this.logger.warning("Shared EventHubConnectionProcessor was already disposed.");
            }
        }
    }

    private EventHubConnectionProcessor buildConnectionProcessor(MessageSerializer messageSerializer) {
        ConnectionOptions connectionOptions = this.getConnectionOptions();
        Flux connectionFlux = Flux.create(sink -> sink.onRequest(request -> {
            if (request == 0L) {
                return;
            }
            if (request > 1L) {
                sink.error((Throwable)this.logger.logExceptionAsWarning((RuntimeException)new IllegalArgumentException("Requested more than one connection. Only emitting one. Request: " + request)));
                return;
            }
            String connectionId = StringUtil.getRandomString((String)"MF");
            this.logger.info("connectionId[{}]: Emitting a single connection.", new Object[]{connectionId});
            AzureTokenManagerProvider tokenManagerProvider = new AzureTokenManagerProvider(connectionOptions.getAuthorizationType(), connectionOptions.getFullyQualifiedNamespace(), "https://eventhubs.azure.net/.default");
            ReactorProvider provider = new ReactorProvider();
            ReactorHandlerProvider handlerProvider = new ReactorHandlerProvider(provider);
            EventHubReactorAmqpConnection connection = new EventHubReactorAmqpConnection(connectionId, connectionOptions, this.eventHubName, provider, handlerProvider, (TokenManagerProvider)tokenManagerProvider, messageSerializer);
            sink.next((Object)connection);
        }));
        return (EventHubConnectionProcessor)connectionFlux.subscribeWith((Subscriber)new EventHubConnectionProcessor(connectionOptions.getFullyQualifiedNamespace(), this.eventHubName, connectionOptions.getRetry()));
    }

    private ConnectionOptions getConnectionOptions() {
        Configuration configuration = this.configuration = this.configuration == null ? Configuration.getGlobalConfiguration().clone() : this.configuration;
        if (this.credentials == null) {
            String connectionString = this.configuration.get(AZURE_EVENT_HUBS_CONNECTION_STRING);
            if (CoreUtils.isNullOrEmpty((CharSequence)connectionString)) {
                throw this.logger.logExceptionAsError((RuntimeException)new IllegalArgumentException("Credentials have not been set. They can be set using: connectionString(String), connectionString(String, String), credentials(String, String, TokenCredential), or setting the environment variable 'AZURE_EVENT_HUBS_CONNECTION_STRING' with a connection string"));
            }
            this.connectionString(connectionString);
        }
        if (this.proxyOptions == null) {
            this.proxyOptions = this.getDefaultProxyConfiguration(this.configuration);
        }
        if (this.proxyOptions != null && this.proxyOptions.isProxyAddressConfigured() && this.transport != AmqpTransportType.AMQP_WEB_SOCKETS) {
            throw this.logger.logExceptionAsError((RuntimeException)new IllegalArgumentException("Cannot use a proxy when TransportType is not AMQP Web Sockets."));
        }
        CbsAuthorizationType authorizationType = this.credentials instanceof EventHubSharedKeyCredential ? CbsAuthorizationType.SHARED_ACCESS_SIGNATURE : CbsAuthorizationType.JSON_WEB_TOKEN;
        SslDomain.VerifyMode verificationMode = this.verifyMode != null ? this.verifyMode : SslDomain.VerifyMode.VERIFY_PEER_NAME;
        ClientOptions options = this.clientOptions != null ? this.clientOptions : new ClientOptions();
        Map properties = CoreUtils.getProperties((String)EVENTHUBS_PROPERTIES_FILE);
        String product = properties.getOrDefault(NAME_KEY, UNKNOWN);
        String clientVersion = properties.getOrDefault(VERSION_KEY, UNKNOWN);
        if (this.customEndpointAddress == null) {
            return new ConnectionOptions(this.fullyQualifiedNamespace, this.credentials, authorizationType, this.transport, this.retryOptions, this.proxyOptions, this.scheduler, options, verificationMode, product, clientVersion);
        }
        return new ConnectionOptions(this.fullyQualifiedNamespace, this.credentials, authorizationType, this.transport, this.retryOptions, this.proxyOptions, this.scheduler, options, verificationMode, product, clientVersion, this.customEndpointAddress.getHost(), this.customEndpointAddress.getPort());
    }

    private ProxyOptions getDefaultProxyConfiguration(Configuration configuration) {
        String proxyAddress;
        ProxyAuthenticationType authentication = ProxyAuthenticationType.NONE;
        if (this.proxyOptions != null) {
            authentication = this.proxyOptions.getAuthentication();
        }
        if (CoreUtils.isNullOrEmpty((CharSequence)(proxyAddress = configuration.get("HTTP_PROXY")))) {
            return ProxyOptions.SYSTEM_DEFAULTS;
        }
        return this.getProxyOptions(authentication, proxyAddress);
    }

    private ProxyOptions getProxyOptions(ProxyAuthenticationType authentication, String proxyAddress) {
        if (HOST_PORT_PATTERN.matcher(proxyAddress.trim()).find()) {
            String[] hostPort = proxyAddress.split(":");
            String host = hostPort[0];
            int port = Integer.parseInt(hostPort[1]);
            Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(host, port));
            String username = this.configuration.get("PROXY_USERNAME");
            String password = this.configuration.get("PROXY_PASSWORD");
            return new ProxyOptions(authentication, proxy, username, password);
        }
        com.azure.core.http.ProxyOptions coreProxyOptions = com.azure.core.http.ProxyOptions.fromConfiguration((Configuration)this.configuration);
        return new ProxyOptions(authentication, new Proxy(coreProxyOptions.getType().toProxyType(), coreProxyOptions.getAddress()), coreProxyOptions.getUsername(), coreProxyOptions.getPassword());
    }
}

