/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.client.amqp.impl;

import com.rabbitmq.client.amqp.Address;
import com.rabbitmq.client.amqp.AmqpException;
import com.rabbitmq.client.amqp.Connection;
import com.rabbitmq.client.amqp.ConnectionSettings;
import com.rabbitmq.client.amqp.Consumer;
import com.rabbitmq.client.amqp.ConsumerBuilder;
import com.rabbitmq.client.amqp.Management;
import com.rabbitmq.client.amqp.ObservationCollector;
import com.rabbitmq.client.amqp.Publisher;
import com.rabbitmq.client.amqp.PublisherBuilder;
import com.rabbitmq.client.amqp.Resource;
import com.rabbitmq.client.amqp.RpcClient;
import com.rabbitmq.client.amqp.RpcClientBuilder;
import com.rabbitmq.client.amqp.RpcServer;
import com.rabbitmq.client.amqp.RpcServerBuilder;
import com.rabbitmq.client.amqp.UsernamePasswordCredentialsProvider;
import com.rabbitmq.client.amqp.impl.AmqpConnectionBuilder;
import com.rabbitmq.client.amqp.impl.AmqpConsumer;
import com.rabbitmq.client.amqp.impl.AmqpConsumerBuilder;
import com.rabbitmq.client.amqp.impl.AmqpEnvironment;
import com.rabbitmq.client.amqp.impl.AmqpManagement;
import com.rabbitmq.client.amqp.impl.AmqpManagementParameters;
import com.rabbitmq.client.amqp.impl.AmqpPublisher;
import com.rabbitmq.client.amqp.impl.AmqpPublisherBuilder;
import com.rabbitmq.client.amqp.impl.AmqpRpcClient;
import com.rabbitmq.client.amqp.impl.AmqpRpcServer;
import com.rabbitmq.client.amqp.impl.AsyncRetry;
import com.rabbitmq.client.amqp.impl.ClientProperties;
import com.rabbitmq.client.amqp.impl.Clock;
import com.rabbitmq.client.amqp.impl.ConnectionUtils;
import com.rabbitmq.client.amqp.impl.DefaultConnectionSettings;
import com.rabbitmq.client.amqp.impl.EntityRecovery;
import com.rabbitmq.client.amqp.impl.ExceptionUtils;
import com.rabbitmq.client.amqp.impl.RecordingTopologyListener;
import com.rabbitmq.client.amqp.impl.ResourceBase;
import com.rabbitmq.client.amqp.impl.RetryUtils;
import com.rabbitmq.client.amqp.impl.RpcSupport;
import com.rabbitmq.client.amqp.impl.SessionHandler;
import com.rabbitmq.client.amqp.impl.TopologyListener;
import com.rabbitmq.client.amqp.impl.Utils;
import com.rabbitmq.client.amqp.metrics.MetricsCollector;
import com.rabbitmq.qpid.protonj2.client.ConnectionOptions;
import com.rabbitmq.qpid.protonj2.client.DisconnectionEvent;
import com.rabbitmq.qpid.protonj2.client.Session;
import com.rabbitmq.qpid.protonj2.client.SslOptions;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientException;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientIOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class AmqpConnection
extends ResourceBase
implements Connection {
    private static final Predicate<Exception> RECOVERY_PREDICATE = t -> t instanceof AmqpException.AmqpConnectionException;
    private static final AtomicLong ID_SEQUENCE = new AtomicLong(0L);
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpConnection.class);
    private final long id;
    private final AmqpEnvironment environment;
    private final AmqpManagement management;
    private volatile com.rabbitmq.qpid.protonj2.client.Connection nativeConnection;
    private volatile Address connectionAddress;
    private volatile String connectionNodename;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private volatile Session nativeSession;
    private final List<AmqpPublisher> publishers = new CopyOnWriteArrayList<AmqpPublisher>();
    private final List<AmqpConsumer> consumers = new CopyOnWriteArrayList<AmqpConsumer>();
    private final List<RpcClient> rpcClients = new CopyOnWriteArrayList<RpcClient>();
    private final List<RpcServer> rpcServers = new CopyOnWriteArrayList<RpcServer>();
    private final TopologyListener topologyListener;
    private volatile EntityRecovery entityRecovery;
    private final AtomicBoolean recoveringConnection = new AtomicBoolean(false);
    private final DefaultConnectionSettings<?> connectionSettings;
    private final Supplier<SessionHandler> sessionHandlerSupplier;
    private final ConnectionUtils.AffinityContext affinity;
    private final ConnectionSettings.AffinityStrategy affinityStrategy;
    private final String name;
    private final Lock instanceLock = new ReentrantLock();
    private final boolean filterExpressionsSupported;
    private final boolean setTokenSupported;
    private volatile ExecutorService dispatchingExecutorService;

    AmqpConnection(AmqpConnectionBuilder builder) {
        super(builder.listeners());
        this.id = ID_SEQUENCE.getAndIncrement();
        this.name = builder.name();
        this.environment = builder.environment();
        this.connectionSettings = builder.connectionSettings().consolidate();
        this.sessionHandlerSupplier = builder.isolateResources() ? () -> new SessionHandler.SingleSessionSessionHandler(this) : () -> new SessionHandler.ConnectionNativeSessionSessionHandler(this);
        AmqpConnectionBuilder.AmqpRecoveryConfiguration recoveryConfiguration = builder.recoveryConfiguration();
        this.topologyListener = this.createTopologyListener(builder);
        BiConsumer<com.rabbitmq.qpid.protonj2.client.Connection, DisconnectionEvent> disconnectHandler = recoveryConfiguration.activated() ? this.recoveryDisconnectHandler(recoveryConfiguration, builder.name()) : (c, e) -> {
            AmqpException failureCause = ExceptionUtils.convert(e.failureCause(), "Connection disconnected", new Object[0]);
            this.close(failureCause);
        };
        if (((DefaultConnectionSettings.DefaultAffinity)this.connectionSettings.affinity()).activated()) {
            this.affinity = new ConnectionUtils.AffinityContext(((DefaultConnectionSettings.DefaultAffinity)this.connectionSettings.affinity()).queue(), ((DefaultConnectionSettings.DefaultAffinity)this.connectionSettings.affinity()).operation());
            this.affinityStrategy = ((DefaultConnectionSettings.DefaultAffinity)this.connectionSettings.affinity()).strategy();
        } else {
            this.affinity = null;
            this.affinityStrategy = null;
        }
        this.management = this.createManagement();
        LOGGER.debug("Opening native connection for connection '{}'...", (Object)this.name());
        NativeConnectionWrapper ncw = ConnectionUtils.enforceAffinity(addrs -> {
            NativeConnectionWrapper wrapper = this.connect(this.connectionSettings, builder.name(), disconnectHandler, (List<Address>)addrs);
            this.nativeConnection = wrapper.connection();
            return wrapper;
        }, this.management, this.affinity, this.environment.affinityCache(), this.affinityStrategy, ConnectionUtils.NO_RETRY_STRATEGY, this.name());
        this.sync(ncw);
        String brokerVesion = AmqpConnection.brokerVersion(this.nativeConnection);
        this.filterExpressionsSupported = Utils.supportFilterExpressions(brokerVesion);
        this.setTokenSupported = Utils.supportSetToken(brokerVesion);
        LOGGER.debug("Opened connection '{}' on node '{}'.", (Object)this.name(), (Object)this.connectionNodename());
        this.state(Resource.State.OPEN);
        this.environment.metricsCollector().openConnection();
    }

    @Override
    public Management management() {
        this.checkOpen();
        return this.managementNoCheck();
    }

    Management managementNoCheck() {
        this.management.init();
        return this.management;
    }

    AmqpManagement createManagement() {
        return new AmqpManagement(new AmqpManagementParameters(this).topologyListener(this.topologyListener));
    }

    @Override
    public PublisherBuilder publisherBuilder() {
        this.checkOpen();
        return new AmqpPublisherBuilder(this);
    }

    @Override
    public ConsumerBuilder consumerBuilder() {
        this.checkOpen();
        return new AmqpConsumerBuilder(this);
    }

    @Override
    public RpcClientBuilder rpcClientBuilder() {
        return new RpcSupport.AmqpRpcClientBuilder(this);
    }

    @Override
    public RpcServerBuilder rpcServerBuilder() {
        return new RpcSupport.AmqpRpcServerBuilder(this);
    }

    @Override
    public void close() {
        this.close(null);
    }

    private NativeConnectionWrapper connect(DefaultConnectionSettings<?> connectionSettings, String name, BiConsumer<com.rabbitmq.qpid.protonj2.client.Connection, DisconnectionEvent> disconnectHandler, List<Address> addresses) {
        ConnectionOptions connectionOptions = new ConnectionOptions();
        if (connectionSettings.credentialsProvider() instanceof UsernamePasswordCredentialsProvider) {
            UsernamePasswordCredentialsProvider credentialsProvider = (UsernamePasswordCredentialsProvider)connectionSettings.credentialsProvider();
            connectionOptions.user(credentialsProvider.getUsername());
            connectionOptions.password(credentialsProvider.getPassword());
        }
        connectionOptions.virtualHost("vhost:" + connectionSettings.virtualHost());
        connectionOptions.saslOptions().addAllowedMechanism(connectionSettings.saslMechanism());
        connectionOptions.idleTimeout(connectionSettings.idleTimeout().toMillis(), TimeUnit.MILLISECONDS);
        connectionOptions.disconnectedHandler(disconnectHandler);
        if (name == null) {
            connectionOptions.properties(ClientProperties.DEFAULT_CLIENT_PROPERTIES);
        } else {
            LinkedHashMap<String, Object> props = new LinkedHashMap<String, Object>(ClientProperties.DEFAULT_CLIENT_PROPERTIES);
            props.put("connection_name", name);
            connectionOptions.properties(Map.copyOf(props));
        }
        if (connectionSettings.tlsEnabled()) {
            DefaultConnectionSettings.DefaultTlsSettings<?> tlsSettings = connectionSettings.tlsSettings();
            connectionOptions.sslEnabled(true);
            SslOptions sslOptions = connectionOptions.sslOptions();
            sslOptions.sslContextOverride(tlsSettings.sslContext());
            sslOptions.verifyHost(tlsSettings.isHostnameVerification());
        }
        Address address = connectionSettings.selectAddress(addresses);
        Utils.StopWatch stopWatch = new Utils.StopWatch();
        try {
            LOGGER.trace("Connecting '{}' to {}...", (Object)this.name(), (Object)address);
            com.rabbitmq.qpid.protonj2.client.Connection connection = this.environment.client().connect(address.host(), address.port(), connectionOptions);
            LOGGER.debug("Created native connection instance for '{}'", (Object)this.name());
            ExceptionUtils.wrapGet(connection.openFuture());
            LOGGER.debug("Connection attempt '{}' succeeded", (Object)this.name());
            AmqpConnection.checkBrokerVersion(connection);
            NativeConnectionWrapper nativeConnectionWrapper = new NativeConnectionWrapper(connection, AmqpConnection.extractNode(connection), address);
            return nativeConnectionWrapper;
        }
        catch (ClientException e) {
            throw ExceptionUtils.convert(e);
        }
        finally {
            LOGGER.debug("Connection attempt for '{}' took {}", (Object)this.name(), (Object)stopWatch.stop());
        }
    }

    private void sync(NativeConnectionWrapper wrapper) {
        this.connectionAddress = wrapper.address();
        this.connectionNodename = wrapper.nodename();
        this.nativeConnection = wrapper.connection();
    }

    private static void checkBrokerVersion(com.rabbitmq.qpid.protonj2.client.Connection connection) throws ClientException {
        String version = (String)connection.properties().get("version");
        if (version == null) {
            throw new AmqpException("No broker version set in connection properties", new Object[0]);
        }
        if (!Utils.is4_0_OrMore(version)) {
            throw new AmqpException("The AMQP client library requires RabbitMQ 4.0 or more", new Object[0]);
        }
    }

    private static String brokerVersion(com.rabbitmq.qpid.protonj2.client.Connection connection) {
        try {
            return (String)connection.properties().get("version");
        }
        catch (ClientException e) {
            throw ExceptionUtils.convert(e);
        }
    }

    String brokerVersion() {
        return AmqpConnection.brokerVersion(this.nativeConnection);
    }

    private static String extractNode(com.rabbitmq.qpid.protonj2.client.Connection connection) throws ClientException {
        String node = (String)connection.properties().get("node");
        if (node == null) {
            throw new AmqpException("The broker node name is not available", new Object[0]);
        }
        return node;
    }

    TopologyListener createTopologyListener(AmqpConnectionBuilder builder) {
        TopologyListener topologyListener;
        if (builder.recoveryConfiguration().topology()) {
            RecordingTopologyListener rtl = new RecordingTopologyListener("topology-listener-connection-" + this.name(), this.environment.recoveryEventLoop());
            this.entityRecovery = new EntityRecovery(this, rtl);
            topologyListener = rtl;
        } else {
            topologyListener = TopologyListener.NO_OP;
        }
        return builder.topologyListener() == null ? topologyListener : TopologyListener.compose(List.of(builder.topologyListener(), topologyListener));
    }

    private BiConsumer<com.rabbitmq.qpid.protonj2.client.Connection, DisconnectionEvent> recoveryDisconnectHandler(AmqpConnectionBuilder.AmqpRecoveryConfiguration recoveryConfiguration, String name) {
        AtomicReference<BiConsumer<com.rabbitmq.qpid.protonj2.client.Connection, DisconnectionEvent>> resultReference = new AtomicReference<BiConsumer<com.rabbitmq.qpid.protonj2.client.Connection, DisconnectionEvent>>();
        BiConsumer<com.rabbitmq.qpid.protonj2.client.Connection, DisconnectionEvent> result = (conn, event) -> {
            ClientIOException ioex = event.failureCause();
            LOGGER.debug("Disconnect handler of '{}', error is the following: {}", (Object)this.name(), (Object)ioex.getMessage());
            if (this.state() == Resource.State.OPENING) {
                LOGGER.debug("Connection is still opening, disconnect handler skipped");
                return;
            }
            if (this.recoveringConnection.get()) {
                LOGGER.debug("Filtering recovery task scheduling, connection recovery of '{}' already in progress", (Object)this.name());
                return;
            }
            AmqpException exception = ExceptionUtils.convert(event.failureCause());
            LOGGER.debug("Converted native exception to {}", (Object)exception.getClass().getSimpleName());
            if (RECOVERY_PREDICATE.test(exception) && this.state() != Resource.State.OPENING) {
                LOGGER.debug("Queueing recovery task for '{}', error is {}", (Object)this.name(), (Object)exception.getMessage());
                this.environment.executorService().submit(() -> {
                    if (!this.recoveringConnection.get()) {
                        this.recoverAfterConnectionFailure(recoveryConfiguration, name, exception, resultReference);
                    }
                });
            } else {
                LOGGER.debug("Not recovering connection '{}' for error {}", (Object)this.name(), (Object)event.failureCause().getMessage());
                this.close(ExceptionUtils.convert(ioex));
            }
        };
        resultReference.set(result);
        return result;
    }

    private void recoverAfterConnectionFailure(AmqpConnectionBuilder.AmqpRecoveryConfiguration recoveryConfiguration, String connectionName, Exception failureCause, AtomicReference<BiConsumer<com.rabbitmq.qpid.protonj2.client.Connection, DisconnectionEvent>> disconnectedHandlerReference) {
        LOGGER.info("Connection '{}' to '{}' has been disconnected, trying to recover.", (Object)this.name(), (Object)this.currentConnectionLabel());
        LOGGER.debug("Notifying listeners of connection '{}'.", (Object)this.name());
        this.state(Resource.State.RECOVERING, failureCause);
        this.changeStateOfPublishers(Resource.State.RECOVERING, failureCause);
        this.changeStateOfConsumers(Resource.State.RECOVERING, failureCause);
        this.nativeConnection = null;
        this.nativeSession = null;
        this.connectionAddress = null;
        LOGGER.debug("Releasing management resource of connection '{}'.", (Object)this.name());
        this.releaseManagementResources();
        if (!this.recoveringConnection.compareAndSet(false, true)) {
            LOGGER.debug("Connection '{}' already recovering, returning.", (Object)this.name());
            return;
        }
        this.recoveringConnection.set(true);
        LOGGER.debug("Connection attempt for '{}'.", (Object)this.name());
        CompletableFuture<NativeConnectionWrapper> ncwFuture = this.recoverNativeConnection(recoveryConfiguration, connectionName, disconnectedHandlerReference);
        ((CompletableFuture)ncwFuture.thenAccept(ncw -> {
            block3: {
                this.sync((NativeConnectionWrapper)ncw);
                LOGGER.debug("Reconnected '{}' to {}", (Object)this.name(), (Object)this.currentConnectionLabel());
                this.recoveringConnection.set(false);
                try {
                    if (recoveryConfiguration.topology()) {
                        this.management.init();
                        LOGGER.debug("Recovering topology of connection '{}'...", (Object)this.name());
                        this.recoverTopology();
                        this.recoverConsumers();
                        this.recoverPublishers();
                        LOGGER.debug("Recovered topology of connection '{}'.", (Object)this.name());
                    }
                    LOGGER.info("Recovered connection '{}' to {}", (Object)this.name(), (Object)this.currentConnectionLabel());
                    this.state(Resource.State.OPEN);
                }
                catch (Exception ex) {
                    LOGGER.warn("Error while trying to recover topology for connection '{}': {}", (Object)this.name(), (Object)ex.getMessage());
                    if (!RECOVERY_PREDICATE.test(ex)) break block3;
                    LOGGER.debug("Error during topology recoverable, queueing recovery task for '{}', error is {}", (Object)this.name(), (Object)ex.getMessage());
                    this.environment.executorService().submit(() -> {
                        if (!this.recoveringConnection.get()) {
                            this.recoverAfterConnectionFailure(recoveryConfiguration, this.name, ex, disconnectedHandlerReference);
                        }
                    });
                }
            }
        })).exceptionally(t -> {
            this.recoveringConnection.set(false);
            if (t instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            this.close((Throwable)t);
            return null;
        });
    }

    private CompletableFuture<NativeConnectionWrapper> recoverNativeConnection(AmqpConnectionBuilder.AmqpRecoveryConfiguration recoveryConfiguration, String connectionName, AtomicReference<BiConsumer<com.rabbitmq.qpid.protonj2.client.Connection, DisconnectionEvent>> disconnectedHandlerReference) {
        return AsyncRetry.asyncRetry(() -> ConnectionUtils.enforceAffinity(addrs -> {
            NativeConnectionWrapper wrapper = this.connect(this.connectionSettings, connectionName, (BiConsumer)disconnectedHandlerReference.get(), (List<Address>)addrs);
            this.nativeConnection = wrapper.connection();
            return wrapper;
        }, this.management, this.affinity, this.environment.affinityCache(), this.affinityStrategy, new ConnectionUtils.RetryStrategy(){

            @Override
            public <T> T maybeRetry(Supplier<T> task) {
                return (T)RetryUtils.callAndMaybeRetry(task::get, e -> true, Duration.ofMillis(10L), 5, "Connection affinity operation", new Object[0]);
            }
        }, connectionName)).description("Trying to create native connection for '%s'.", connectionName).delayPolicy(recoveryConfiguration.backOffDelayPolicy()).retry(RECOVERY_PREDICATE).scheduler(this.scheduledExecutorService()).build();
    }

    private void recoverTopology() throws InterruptedException {
        if (this.entityRecovery != null) {
            Utils.throwIfInterrupted();
            this.entityRecovery.recover();
        }
    }

    private void recoverConsumers() throws InterruptedException {
        if (this.consumers.isEmpty()) {
            LOGGER.debug("No consumers to recover");
        } else {
            LOGGER.debug("{} consumer(s) to recover", (Object)this.consumers.size());
            ArrayList<AmqpConsumer> failedConsumers = new ArrayList<AmqpConsumer>();
            for (AmqpConsumer consumer : this.consumers) {
                Utils.throwIfInterrupted();
                try {
                    LOGGER.debug("Recovering consumer {} (queue '{}')", (Object)consumer.id(), (Object)consumer.queue());
                    consumer.recoverAfterConnectionFailure();
                    consumer.state(Resource.State.OPEN);
                    LOGGER.debug("Recovered consumer {} (queue '{}')", (Object)consumer.id(), (Object)consumer.queue());
                }
                catch (AmqpException.AmqpConnectionException ex) {
                    LOGGER.warn("Connection error while trying to recover consumer {} (queue '{}'), restarting recovery", new Object[]{consumer.id(), consumer.queue(), ex});
                    throw ex;
                }
                catch (Exception ex) {
                    LOGGER.warn("Error while trying to recover consumer {} (queue '{}')", new Object[]{consumer.id(), consumer.queue(), ex});
                    failedConsumers.add(consumer);
                }
            }
            failedConsumers.forEach(AmqpConsumer::close);
        }
    }

    private void recoverPublishers() throws InterruptedException {
        if (this.publishers.isEmpty()) {
            LOGGER.debug("No publishers to recover");
        } else {
            LOGGER.debug("{} publisher(s) to recover", (Object)this.publishers.size());
            ArrayList<AmqpPublisher> failedPublishers = new ArrayList<AmqpPublisher>();
            for (AmqpPublisher publisher : this.publishers) {
                Utils.throwIfInterrupted();
                try {
                    LOGGER.debug("Recovering publisher {} (address '{}')", (Object)publisher.id(), (Object)publisher.address());
                    publisher.recoverAfterConnectionFailure();
                    publisher.state(Resource.State.OPEN);
                    LOGGER.debug("Recovered publisher {} (address '{}')", (Object)publisher.id(), (Object)publisher.address());
                }
                catch (Exception ex) {
                    LOGGER.warn("Error while trying to recover publisher {} (address '{}')", new Object[]{publisher.id(), publisher.address(), ex});
                    failedPublishers.add(publisher);
                }
            }
            failedPublishers.forEach(AmqpPublisher::close);
        }
    }

    private void closeManagement() {
        this.management.close();
    }

    private void releaseManagementResources() {
        if (this.management != null) {
            this.management.releaseResources();
        }
    }

    Session nativeSession() {
        return this.nativeSession(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    Session nativeSession(boolean check) {
        Session result;
        if (check) {
            this.checkOpen();
        }
        if ((result = this.nativeSession) != null) {
            return result;
        }
        this.instanceLock.lock();
        try {
            if (this.nativeSession == null) {
                this.nativeSession = this.openSession(this.nativeConnection);
            }
            Session session = this.nativeSession;
            return session;
        }
        finally {
            this.instanceLock.unlock();
        }
    }

    private Session openSession(com.rabbitmq.qpid.protonj2.client.Connection connection) {
        try {
            return connection.openSession();
        }
        catch (ClientException e) {
            throw ExceptionUtils.convert(e, "Error while opening session", new Object[0]);
        }
    }

    com.rabbitmq.qpid.protonj2.client.Connection nativeConnection() {
        return this.nativeConnection;
    }

    AmqpEnvironment environment() {
        return this.environment;
    }

    ScheduledExecutorService scheduledExecutorService() {
        return this.environment.scheduledExecutorService();
    }

    ExecutorService dispatchingExecutorService() {
        this.checkOpen();
        ExecutorService result = this.dispatchingExecutorService;
        if (result != null) {
            return result;
        }
        this.instanceLock.lock();
        try {
            if (this.dispatchingExecutorService == null) {
                this.dispatchingExecutorService = Executors.newSingleThreadExecutor(Utils.threadFactory("dispatching-" + this.name + "-"));
            }
            ExecutorService executorService = this.dispatchingExecutorService;
            return executorService;
        }
        finally {
            this.instanceLock.unlock();
        }
    }

    Clock clock() {
        return this.environment.clock();
    }

    MetricsCollector metricsCollector() {
        return this.environment.metricsCollector();
    }

    ObservationCollector observationCollector() {
        return this.environment.observationCollector();
    }

    SessionHandler createSessionHandler() {
        return this.sessionHandlerSupplier.get();
    }

    Publisher createPublisher(AmqpPublisherBuilder builder) {
        AmqpPublisher publisher = new AmqpPublisher(builder);
        this.publishers.add(publisher);
        return publisher;
    }

    void removePublisher(AmqpPublisher publisher) {
        this.publishers.remove(publisher);
    }

    Consumer createConsumer(AmqpConsumerBuilder builder) {
        AmqpConsumer consumer = new AmqpConsumer(builder);
        this.consumers.add(consumer);
        this.topologyListener.consumerCreated(consumer.id(), builder.queue());
        return consumer;
    }

    void removeConsumer(AmqpConsumer consumer) {
        this.consumers.remove(consumer);
        this.topologyListener.consumerDeleted(consumer.id(), consumer.queue());
    }

    RpcClient createRpcClient(RpcSupport.AmqpRpcClientBuilder builder) {
        AmqpRpcClient rpcClient = new AmqpRpcClient(builder);
        this.rpcClients.add(rpcClient);
        return rpcClient;
    }

    void removeRpcClient(RpcClient rpcClient) {
        this.rpcClients.remove(rpcClient);
    }

    RpcServer createRpcServer(RpcSupport.AmqpRpcServerBuilder builder) {
        AmqpRpcServer rpcServer = new AmqpRpcServer(builder);
        this.rpcServers.add(rpcServer);
        return rpcServer;
    }

    void removeRpcServer(RpcServer rpcServer) {
        this.rpcServers.remove(rpcServer);
    }

    private void changeStateOfPublishers(Resource.State newState, Throwable failure) {
        this.changeStateOfResources(this.publishers, newState, failure);
    }

    private void changeStateOfConsumers(Resource.State newState, Throwable failure) {
        this.changeStateOfResources(this.consumers, newState, failure);
    }

    private void changeStateOfResources(List<? extends ResourceBase> resources, Resource.State newState, Throwable failure) {
        resources.forEach(r -> r.state(newState, failure));
    }

    private String currentConnectionLabel() {
        if (this.connectionAddress == null) {
            return "<null>";
        }
        return this.connectionAddress.host() + ":" + this.connectionAddress.port();
    }

    Address connectionAddress() {
        return this.connectionAddress;
    }

    String connectionNodename() {
        return this.connectionNodename;
    }

    String name() {
        return this.name == null ? "<no-name>" : this.name;
    }

    ConnectionUtils.AffinityContext affinity() {
        return this.affinity;
    }

    boolean filterExpressionsSupported() {
        return this.filterExpressionsSupported;
    }

    boolean setTokenSupported() {
        return this.setTokenSupported;
    }

    long id() {
        return this.id;
    }

    private void close(Throwable cause) {
        if (this.closed.compareAndSet(false, true)) {
            this.state(Resource.State.CLOSING, cause);
            this.environment.removeConnection(this);
            if (this.topologyListener instanceof AutoCloseable) {
                try {
                    ((AutoCloseable)((Object)this.topologyListener)).close();
                }
                catch (Exception e) {
                    LOGGER.info("Error while closing topology listener", (Throwable)e);
                }
            }
            this.closeManagement();
            for (RpcClient rpcClient : this.rpcClients) {
                rpcClient.close();
            }
            for (RpcServer rpcServer : this.rpcServers) {
                rpcServer.close();
            }
            for (AmqpPublisher publisher : this.publishers) {
                publisher.close(cause);
            }
            for (AmqpConsumer consumer : this.consumers) {
                consumer.close(cause);
            }
            try {
                this.dispatchingExecutorService.shutdownNow();
            }
            catch (Exception e) {
                LOGGER.info("Error while shutting down dispatching executor service for connection '{}': {}", (Object)this.name(), (Object)e.getMessage());
            }
            try {
                com.rabbitmq.qpid.protonj2.client.Connection nc = this.nativeConnection;
                if (nc != null) {
                    nc.close();
                }
            }
            catch (Exception e) {
                LOGGER.warn("Error while closing native connection", (Throwable)e);
            }
            this.state(Resource.State.CLOSED, cause);
            this.environment.metricsCollector().closeConnection();
        }
    }

    public String toString() {
        return this.environment.toString() + "-" + this.id;
    }

    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || this.getClass() != o.getClass()) {
            return false;
        }
        AmqpConnection that = (AmqpConnection)o;
        return this.id == that.id;
    }

    public int hashCode() {
        return Objects.hashCode(this.id);
    }

    static class NativeConnectionWrapper {
        private final com.rabbitmq.qpid.protonj2.client.Connection connection;
        private final String nodename;
        private final Address address;

        NativeConnectionWrapper(com.rabbitmq.qpid.protonj2.client.Connection connection, String nodename, Address address) {
            this.connection = connection;
            this.nodename = nodename;
            this.address = address;
        }

        String nodename() {
            return this.nodename;
        }

        Address address() {
            return this.address;
        }

        com.rabbitmq.qpid.protonj2.client.Connection connection() {
            return this.connection;
        }
    }
}

