/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.sdk.iot.device.transport.amqps;

import com.microsoft.azure.proton.transport.proxy.ProxyAuthenticationType;
import com.microsoft.azure.proton.transport.proxy.ProxyConfiguration;
import com.microsoft.azure.proton.transport.proxy.ProxyHandler;
import com.microsoft.azure.proton.transport.proxy.impl.ProxyHandlerImpl;
import com.microsoft.azure.proton.transport.proxy.impl.ProxyImpl;
import com.microsoft.azure.proton.transport.ws.impl.WebSocketImpl;
import com.microsoft.azure.sdk.iot.device.ClientConfiguration;
import com.microsoft.azure.sdk.iot.device.IotHubMessageResult;
import com.microsoft.azure.sdk.iot.device.IotHubStatusCode;
import com.microsoft.azure.sdk.iot.device.Message;
import com.microsoft.azure.sdk.iot.device.ProxySettings;
import com.microsoft.azure.sdk.iot.device.auth.IotHubSSLContext;
import com.microsoft.azure.sdk.iot.device.transport.IotHubConnectionStatus;
import com.microsoft.azure.sdk.iot.device.transport.IotHubListener;
import com.microsoft.azure.sdk.iot.device.transport.IotHubTransportConnection;
import com.microsoft.azure.sdk.iot.device.transport.IotHubTransportMessage;
import com.microsoft.azure.sdk.iot.device.transport.MultiplexingDeviceUnauthorizedException;
import com.microsoft.azure.sdk.iot.device.transport.ProtocolException;
import com.microsoft.azure.sdk.iot.device.transport.TransportException;
import com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsCbsSessionHandler;
import com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsExceptionTranslator;
import com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsSasTokenRenewalHandler;
import com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsSessionHandler;
import com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsSessionStateCallback;
import com.microsoft.azure.sdk.iot.device.transport.amqps.ReactorRunner;
import com.microsoft.azure.sdk.iot.device.transport.amqps.ReactorRunnerStateCallback;
import com.microsoft.azure.sdk.iot.device.transport.amqps.SendResult;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.SslDomain;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.impl.TransportInternal;
import org.apache.qpid.proton.engine.impl.TransportLayer;
import org.apache.qpid.proton.reactor.Handshaker;
import org.apache.qpid.proton.reactor.Reactor;
import org.apache.qpid.proton.reactor.ReactorOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class AmqpsIotHubConnection
extends BaseHandler
implements IotHubTransportConnection,
AmqpsSessionStateCallback,
ReactorRunnerStateCallback {
    private static final Logger log = LoggerFactory.getLogger(AmqpsIotHubConnection.class);
    private static final int MAX_WAIT_TO_CLOSE_CONNECTION = 20000;
    private static final String WEB_SOCKET_PATH = "/$iothub/websocket";
    private static final String WEB_SOCKET_SUB_PROTOCOL = "AMQPWSB10";
    private static final String WEB_SOCKET_QUERY = "iothub-no-client-cert=true";
    private static final int MAX_MESSAGE_PAYLOAD_SIZE = 262144;
    private static final int MAX_FRAME_SIZE = 4096;
    private static final int WEB_SOCKET_PORT = 443;
    private static final int AMQP_PORT = 5671;
    private static final int REACTOR_COUNT = 1;
    private static final int CBS_SESSION_COUNT = 1;
    private final int sendInterval;
    private static final int MAX_MESSAGES_TO_SEND_PER_CALLBACK = 1000;
    private final Queue<Message> messagesToSend = new ConcurrentLinkedQueue<Message>();
    private String connectionId;
    private IotHubConnectionStatus state;
    private final String hostName;
    private SSLContext sslContext;
    private final boolean isWebsocketConnection;
    private final ClientConfiguration.AuthType authenticationType;
    private final Set<ClientConfiguration> clientConfigurations;
    private IotHubListener listener;
    private TransportException savedException;
    private final Object executorServiceLock = new Object();
    private ExecutorService executorService;
    private final ProxySettings proxySettings;
    private final String transportUniqueIdentifier;
    private CountDownLatch authenticationSessionOpenedLatch;
    private Map<String, CountDownLatch> deviceSessionsOpenedLatches;
    private CountDownLatch closeReactorLatch;
    private Connection connection;
    private Reactor reactor;
    private final Map<String, AmqpsSessionHandler> reconnectingDeviceSessionHandlers = new ConcurrentHashMap<String, AmqpsSessionHandler>();
    private final Map<String, AmqpsSessionHandler> sessionHandlers = new ConcurrentHashMap<String, AmqpsSessionHandler>();
    private final Queue<AmqpsSasTokenRenewalHandler> sasTokenRenewalHandlers = new ConcurrentLinkedQueue<AmqpsSasTokenRenewalHandler>();
    private AmqpsCbsSessionHandler amqpsCbsSessionHandler;
    private final Set<ClientConfiguration> multiplexingClientsToRegister;
    private final Map<ClientConfiguration, Boolean> multiplexingClientsToUnregister;
    private ClientConfiguration clientConfiguration = null;
    private final boolean isMultiplexing;
    private final int keepAliveInterval;
    private final Map<IotHubTransportMessage, IotHubMessageResult> queuedAcknowledgements = new ConcurrentHashMap<IotHubTransportMessage, IotHubMessageResult>();
    private final String threadNamePrefix;
    private final String threadNameSuffix;
    private final boolean useIdentifiableThreadNames;

    public AmqpsIotHubConnection(ClientConfiguration config, String transportUniqueIdentifier) {
        this.clientConfigurations = Collections.newSetFromMap(new ConcurrentHashMap());
        this.multiplexingClientsToRegister = Collections.newSetFromMap(new ConcurrentHashMap());
        this.multiplexingClientsToUnregister = new ConcurrentHashMap<ClientConfiguration, Boolean>();
        this.clientConfigurations.add(config);
        this.clientConfiguration = config;
        this.transportUniqueIdentifier = transportUniqueIdentifier;
        this.isWebsocketConnection = config.isUsingWebsocket();
        this.authenticationType = config.getAuthenticationType();
        this.proxySettings = config.getProxySettings();
        String gatewayHostname = config.getGatewayHostname();
        if (gatewayHostname != null && !gatewayHostname.isEmpty()) {
            log.debug("Gateway hostname was present in config, connecting to gateway rather than directly to hub");
            this.hostName = gatewayHostname;
        } else {
            log.trace("No gateway hostname was present in config, connecting directly to hub");
            this.hostName = config.getIotHubHostname();
        }
        this.add((Handler)new Handshaker());
        this.isMultiplexing = false;
        this.keepAliveInterval = config.getKeepAliveInterval();
        this.sendInterval = this.clientConfiguration.getSendInterval();
        this.useIdentifiableThreadNames = this.clientConfiguration.isUsingIdentifiableThreadNames();
        this.threadNamePrefix = this.clientConfiguration.getThreadNamePrefix();
        this.threadNameSuffix = this.clientConfiguration.getThreadNameSuffix();
        this.state = IotHubConnectionStatus.DISCONNECTED;
        log.trace("AmqpsIotHubConnection object is created successfully and will use port {}", (Object)(this.isWebsocketConnection ? 443 : 5671));
    }

    public AmqpsIotHubConnection(String hostName, String transportUniqueIdentifier, boolean isWebsocketConnection, SSLContext sslContext, ProxySettings proxySettings, int keepAliveInterval, int sendInterval, boolean useIdentifiableThreadNames, String threadNamePrefix, String threadNameSuffix) {
        this.clientConfigurations = Collections.newSetFromMap(new ConcurrentHashMap());
        this.multiplexingClientsToRegister = Collections.newSetFromMap(new ConcurrentHashMap());
        this.multiplexingClientsToUnregister = new ConcurrentHashMap<ClientConfiguration, Boolean>();
        this.isWebsocketConnection = isWebsocketConnection;
        this.authenticationType = ClientConfiguration.AuthType.SAS_TOKEN;
        this.hostName = hostName;
        this.transportUniqueIdentifier = transportUniqueIdentifier;
        this.proxySettings = proxySettings;
        this.sslContext = sslContext;
        this.add((Handler)new Handshaker());
        this.isMultiplexing = true;
        this.state = IotHubConnectionStatus.DISCONNECTED;
        log.trace("AmqpsIotHubConnection object is created successfully and will use port {}", (Object)(this.isWebsocketConnection ? 443 : 5671));
        this.keepAliveInterval = keepAliveInterval;
        this.sendInterval = sendInterval;
        this.useIdentifiableThreadNames = useIdentifiableThreadNames;
        this.threadNamePrefix = threadNamePrefix;
        this.threadNameSuffix = threadNameSuffix;
    }

    public void registerMultiplexedDevice(ClientConfiguration config) {
        if (this.state == IotHubConnectionStatus.CONNECTED) {
            log.trace("Queuing the registration of device {} to an active multiplexed connection", (Object)config.getDeviceId());
            this.deviceSessionsOpenedLatches.put(config.getDeviceId(), new CountDownLatch(1));
            this.multiplexingClientsToRegister.add(config);
        }
        this.clientConfigurations.add(config);
    }

    public void unregisterMultiplexedDevice(ClientConfiguration config, boolean willReconnect) {
        if (this.state == IotHubConnectionStatus.CONNECTED) {
            if (willReconnect) {
                log.trace("Queuing the unregistration of device {} from an active multiplexed connection. The device will be re-registered for reconnection purposes.", (Object)config.getDeviceId());
            } else {
                log.trace("Queuing the unregistration of device {} from an active multiplexed connection", (Object)config.getDeviceId());
            }
            this.multiplexingClientsToUnregister.put(config, willReconnect);
        }
        this.clientConfigurations.remove(config);
        this.deviceSessionsOpenedLatches.remove(config.getDeviceId());
    }

    @Override
    public void open() throws TransportException {
        log.debug("Opening amqp layer...");
        this.connectionId = UUID.randomUUID().toString();
        this.savedException = null;
        if (this.state == IotHubConnectionStatus.DISCONNECTED) {
            for (ClientConfiguration clientConfig : this.clientConfigurations) {
                this.addSessionHandler(clientConfig);
            }
            this.initializeStateLatches();
            try {
                ClientConfiguration config;
                boolean authenticationSessionOpenTimedOut;
                Iterator<ClientConfiguration> configsIterator;
                this.openAsync();
                if (this.authenticationType == ClientConfiguration.AuthType.SAS_TOKEN) {
                    log.trace("Waiting for authentication links to open...");
                }
                ClientConfiguration defaultConfig = (configsIterator = this.clientConfigurations.iterator()).hasNext() ? configsIterator.next() : null;
                int timeoutSeconds = 20;
                if (defaultConfig != null) {
                    timeoutSeconds = defaultConfig.getAmqpOpenAuthenticationSessionTimeout();
                }
                boolean bl = authenticationSessionOpenTimedOut = !this.authenticationSessionOpenedLatch.await(timeoutSeconds, TimeUnit.SECONDS);
                if (this.savedException != null) {
                    throw this.savedException;
                }
                if (authenticationSessionOpenTimedOut) {
                    this.closeConnectionWithException("Timed out waiting for authentication session to open", true);
                }
                log.trace("Waiting for device sessions to open...");
                boolean deviceSessionsOpenTimedOut = false;
                Iterator<ClientConfiguration> iterator = this.clientConfigurations.iterator();
                while (iterator.hasNext() && !(deviceSessionsOpenTimedOut = !this.deviceSessionsOpenedLatches.get((config = iterator.next()).getDeviceId()).await(config.getAmqpOpenDeviceSessionsTimeout(), TimeUnit.SECONDS))) {
                }
                if (this.savedException != null) {
                    throw this.savedException;
                }
                if (deviceSessionsOpenTimedOut) {
                    this.closeConnectionWithException("Timed out waiting for worker links to open", true);
                }
            }
            catch (TransportException e) {
                this.clearLocalState();
                this.closeNetworkResources();
                throw e;
            }
            catch (InterruptedException e) {
                this.clearLocalState();
                this.closeNetworkResources();
                TransportException interruptedTransportException = new TransportException("Interrupted while waiting for links to open for AMQP connection", e);
                interruptedTransportException.setRetryable(true);
                throw interruptedTransportException;
            }
        }
        this.state = IotHubConnectionStatus.CONNECTED;
        this.listener.onConnectionEstablished(this.connectionId);
        log.debug("Amqp connection opened successfully");
    }

    private void clearLocalState() {
        this.sessionHandlers.clear();
        this.sasTokenRenewalHandlers.clear();
    }

    private void closeNetworkResources() {
        try {
            this.reactor.free();
        }
        catch (IllegalStateException e) {
            log.trace("Failed to free the reactor. Moving forward with cleanup anyways.", (Throwable)e);
        }
        this.executorServicesCleanup();
    }

    @Override
    public void close() {
        log.debug("Shutting down amqp layer...");
        try {
            this.closeAsync();
            try {
                log.trace("Waiting for reactor to close...");
                this.closeReactorLatch.await(20000L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                log.warn("Interrupted while waiting on reactor to close gracefully. Forcefully closing the reactor now.", (Throwable)e);
            }
        }
        finally {
            this.clearLocalState();
            this.closeNetworkResources();
            this.state = IotHubConnectionStatus.DISCONNECTED;
            log.trace("Amqp connection closed successfully");
        }
    }

    public void onReactorInit(Event event) {
        this.reactor = event.getReactor();
        String hostName = this.hostName;
        int port = 5671;
        if (this.isWebsocketConnection) {
            if (this.proxySettings != null) {
                hostName = this.proxySettings.getHostname();
                port = this.proxySettings.getPort();
            } else {
                port = 443;
            }
        }
        this.reactor.connectionToHost(hostName, port, (Handler)this);
        this.reactor.schedule(this.sendInterval, (Handler)this);
    }

    public void onReactorFinal(Event event) {
        log.trace("Amqps reactor finalized");
        this.releaseLatch(this.authenticationSessionOpenedLatch);
        this.releaseDeviceSessionLatches();
        this.releaseLatch(this.closeReactorLatch);
        if (this.savedException != null) {
            this.reconnectingDeviceSessionHandlers.putAll(this.sessionHandlers);
            this.listener.onConnectionLost(this.savedException, this.connectionId);
        }
    }

    public void onConnectionInit(Event event) {
        this.connection = event.getConnection();
        this.connection.setHostname(this.hostName);
        this.connection.open();
    }

    public void onConnectionBound(Event event) {
        Transport transport = event.getTransport();
        transport.setIdleTimeout(this.keepAliveInterval * 1000);
        if (this.isWebsocketConnection) {
            this.addWebSocketLayer(transport);
        }
        try {
            ClientConfiguration defaultConfig;
            Iterator<ClientConfiguration> configsIterator = this.clientConfigurations.iterator();
            ClientConfiguration clientConfiguration = defaultConfig = configsIterator.hasNext() ? configsIterator.next() : null;
            SSLContext sslContext = defaultConfig != null ? defaultConfig.getAuthenticationProvider().getSSLContext() : (this.sslContext != null ? this.sslContext : new IotHubSSLContext().getSSLContext());
            if (this.authenticationType == ClientConfiguration.AuthType.SAS_TOKEN) {
                Sasl sasl = transport.sasl();
                sasl.setMechanisms(new String[]{"ANONYMOUS"});
            }
            SslDomain domain = Proton.sslDomain();
            domain.setSslContext(sslContext);
            domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER);
            domain.init(SslDomain.Mode.CLIENT);
            transport.ssl(domain);
        }
        catch (IOException e) {
            this.savedException = new TransportException(e);
            log.error("Encountered an exception while setting ssl domain for the amqp connection", (Throwable)this.savedException);
        }
        if (this.proxySettings != null) {
            this.addProxyLayer(transport, event.getConnection().getHostname() + ":" + 443);
        }
    }

    public void onConnectionLocalOpen(Event event) {
        log.trace("Amqp connection opened locally");
        if (this.authenticationType == ClientConfiguration.AuthType.SAS_TOKEN) {
            Session cbsSession = this.connection.session();
            this.amqpsCbsSessionHandler = new AmqpsCbsSessionHandler(cbsSession, this);
            for (AmqpsSasTokenRenewalHandler sasTokenRenewalHandler : this.sasTokenRenewalHandlers) {
                sasTokenRenewalHandler.close();
            }
            this.sasTokenRenewalHandlers.clear();
            for (AmqpsSessionHandler amqpsSessionHandler : this.sessionHandlers.values()) {
                amqpsSessionHandler.setSession(this.connection.session());
                this.sasTokenRenewalHandlers.add(new AmqpsSasTokenRenewalHandler(this.amqpsCbsSessionHandler, amqpsSessionHandler));
            }
        } else {
            AmqpsSessionHandler amqpsSessionHandler = this.sessionHandlers.values().iterator().next();
            amqpsSessionHandler.setSession(this.connection.session());
        }
    }

    public void onConnectionRemoteOpen(Event event) {
        log.trace("Amqp connection opened remotely");
    }

    public void onConnectionLocalClose(Event event) {
        log.debug("Amqp connection closed locally, shutting down all active sessions...");
        for (AmqpsSessionHandler amqpSessionHandler : this.sessionHandlers.values()) {
            amqpSessionHandler.closeSession();
        }
        if (this.amqpsCbsSessionHandler != null) {
            log.debug("Shutting down cbs session...");
            this.amqpsCbsSessionHandler.close();
        }
        log.trace("Closing reactor since connection has closed");
        event.getReactor().stop();
    }

    public void onConnectionRemoteClose(Event event) {
        Connection connection = event.getConnection();
        if (connection.getLocalState() == EndpointState.ACTIVE) {
            ErrorCondition errorCondition = connection.getRemoteCondition();
            this.savedException = AmqpsExceptionTranslator.convertFromAmqpException(errorCondition);
            log.error("Amqp connection was closed remotely", (Throwable)this.savedException);
            this.connection.close();
        } else {
            log.trace("Closing reactor since connection has closed");
            event.getReactor().stop();
        }
    }

    public void onTransportError(Event event) {
        super.onTransportError(event);
        this.state = IotHubConnectionStatus.DISCONNECTED;
        ErrorCondition errorCondition = event.getTransport().getRemoteCondition();
        boolean isALocalErrorCondition = false;
        if (errorCondition == null || errorCondition.getCondition() == null && errorCondition.getDescription() == null && errorCondition.getInfo() == null) {
            errorCondition = event.getTransport().getCondition();
            isALocalErrorCondition = true;
        }
        this.savedException = AmqpsExceptionTranslator.convertFromAmqpException(errorCondition);
        this.reconnectingDeviceSessionHandlers.putAll(this.sessionHandlers);
        if (event.getConnection().getLocalState() == EndpointState.CLOSED && isALocalErrorCondition) {
            log.error("Amqp transport error occurred, calling onConnectionLocalClose", (Throwable)this.savedException);
            this.onConnectionLocalClose(event);
        } else {
            log.error("Amqp transport error occurred, closing the AMQPS connection", (Throwable)this.savedException);
            event.getConnection().close();
        }
    }

    public void onTimerTask(Event event) {
        this.sendQueuedMessages();
        this.sendQueuedAcknowledgements();
        this.checkForNewlyUnregisteredMultiplexedClientsToStop();
        this.checkForNewlyRegisteredMultiplexedClientsToStart();
        event.getReactor().schedule(this.sendInterval, (Handler)this);
    }

    @Override
    public void setListener(IotHubListener listener) {
        this.listener = listener;
    }

    @Override
    public IotHubStatusCode sendMessage(Message message) {
        log.trace("Adding message to amqp message queue to be sent later ({})", (Object)message);
        this.messagesToSend.add(message);
        return IotHubStatusCode.OK;
    }

    @Override
    public boolean sendMessageResult(IotHubTransportMessage message, IotHubMessageResult result) {
        this.queuedAcknowledgements.put(message, result);
        return true;
    }

    private void sendQueuedAcknowledgements() {
        while (!this.queuedAcknowledgements.isEmpty()) {
            Released ackType;
            IotHubTransportMessage queuedAcknowledgement = this.queuedAcknowledgements.keySet().iterator().next();
            IotHubMessageResult result = this.queuedAcknowledgements.get(queuedAcknowledgement);
            this.queuedAcknowledgements.remove(queuedAcknowledgement);
            if (result == IotHubMessageResult.ABANDON) {
                ackType = Released.getInstance();
            } else if (result == IotHubMessageResult.REJECT) {
                ackType = new Rejected();
            } else if (result == IotHubMessageResult.COMPLETE) {
                ackType = Accepted.getInstance();
            } else {
                log.warn("Invalid IoT Hub message result {}", (Object)result.name());
                continue;
            }
            AmqpsSessionHandler sessionHandler = this.sessionHandlers.get(queuedAcknowledgement.getConnectionDeviceId());
            if (sessionHandler != null && sessionHandler.acknowledgeReceivedMessage(queuedAcknowledgement, (DeliveryState)ackType)) continue;
            log.warn("No sessions could acknowledge the message ({})", (Object)queuedAcknowledgement);
        }
    }

    @Override
    public String getConnectionId() {
        return this.connectionId;
    }

    @Override
    public void onDeviceSessionOpened(String deviceId) {
        if (this.deviceSessionsOpenedLatches.containsKey(deviceId)) {
            log.trace("Device session for device {} opened, counting down the device sessions opening latch", (Object)deviceId);
            this.deviceSessionsOpenedLatches.get(deviceId).countDown();
            if (this.isMultiplexing) {
                this.listener.onMultiplexedDeviceSessionEstablished(this.connectionId, deviceId);
            }
        } else {
            log.warn("Unrecognized deviceId {} reported its device session as opened, ignoring it.", (Object)deviceId);
        }
    }

    @Override
    public void onAuthenticationSessionOpened() {
        block9: {
            log.trace("Authentication session opened, counting down the authentication session opening latch");
            this.authenticationSessionOpenedLatch.countDown();
            if (this.authenticationType != ClientConfiguration.AuthType.SAS_TOKEN) break block9;
            if (this.isWebsocketConnection) {
                ArrayList<AmqpsSasTokenRenewalHandler> handlers = new ArrayList<AmqpsSasTokenRenewalHandler>(this.sasTokenRenewalHandlers);
                int maxInFlightAuthenticationMessages = 30;
                for (int i = 0; i < handlers.size() - maxInFlightAuthenticationMessages; ++i) {
                    if (i + maxInFlightAuthenticationMessages >= handlers.size()) continue;
                    ((AmqpsSasTokenRenewalHandler)handlers.get(i)).setNextToAuthenticate((AmqpsSasTokenRenewalHandler)handlers.get(i + maxInFlightAuthenticationMessages));
                }
                int min = Math.min(maxInFlightAuthenticationMessages, handlers.size());
                for (int i = 0; i < min; ++i) {
                    try {
                        ((AmqpsSasTokenRenewalHandler)handlers.get(i)).sendAuthenticationMessage(this.connection.getReactor());
                        continue;
                    }
                    catch (TransportException e) {
                        log.error("Failed to send CBS authentication message", (Throwable)e);
                        this.savedException = e;
                    }
                }
            } else {
                for (AmqpsSasTokenRenewalHandler amqpsSasTokenRenewalHandler : this.sasTokenRenewalHandlers) {
                    try {
                        amqpsSasTokenRenewalHandler.sendAuthenticationMessage(this.connection.getReactor());
                    }
                    catch (TransportException e) {
                        log.error("Failed to send CBS authentication message", (Throwable)e);
                        this.savedException = e;
                    }
                }
            }
        }
    }

    @Override
    public void onMessageAcknowledged(Message message, DeliveryState deliveryState, String deviceId) {
        if (deliveryState == Accepted.getInstance()) {
            this.listener.onMessageSent(message, deviceId, null);
        } else if (deliveryState instanceof Rejected) {
            TransportException ex = AmqpsExceptionTranslator.convertFromAmqpException(((Rejected)deliveryState).getError());
            this.listener.onMessageSent(message, deviceId, ex);
        } else if (deliveryState == Released.getInstance()) {
            ProtocolException protocolException = new ProtocolException("Message was released by the amqp server");
            protocolException.setRetryable(true);
            this.listener.onMessageSent(message, deviceId, protocolException);
        } else {
            log.warn("Unexpected delivery state for sent message ({})", (Object)message);
        }
    }

    @Override
    public void onMessageReceived(IotHubTransportMessage message) {
        this.listener.onMessageReceived(message, null);
    }

    @Override
    public void onAuthenticationFailed(String deviceId, TransportException transportException) {
        if (this.isMultiplexing) {
            if (this.state != IotHubConnectionStatus.CONNECTED) {
                if (this.savedException == null) {
                    this.savedException = new MultiplexingDeviceUnauthorizedException("One or more multiplexed devices failed to authenticate");
                }
                if (this.savedException instanceof MultiplexingDeviceUnauthorizedException) {
                    ((MultiplexingDeviceUnauthorizedException)this.savedException).addRegistrationException(deviceId, transportException);
                }
            } else {
                log.trace("Not saving the authentication failure locally. Just notifying upper layer directly.");
            }
        } else {
            this.savedException = transportException;
        }
        this.listener.onMultiplexedDeviceSessionRegistrationFailed(this.connectionId, deviceId, transportException);
        if (this.deviceSessionsOpenedLatches.containsKey(deviceId)) {
            this.deviceSessionsOpenedLatches.get(deviceId).countDown();
        } else {
            log.warn("Unrecognized device Id reported authentication failure, could not map it to a device session latch", (Throwable)transportException);
        }
    }

    @Override
    public void onSessionClosedUnexpectedly(ErrorCondition errorCondition, String deviceId) {
        CountDownLatch deviceSessionsOpenedLatch;
        this.savedException = AmqpsExceptionTranslator.convertFromAmqpException(errorCondition);
        if (this.isMultiplexing) {
            this.listener.onMultiplexedDeviceSessionRegistrationFailed(this.connectionId, deviceId, this.savedException);
        }
        if ((deviceSessionsOpenedLatch = this.deviceSessionsOpenedLatches.get(deviceId)) != null) {
            deviceSessionsOpenedLatch.countDown();
        }
        if (this.isMultiplexing) {
            log.error("Amqp session closed unexpectedly. notifying the transport layer to start reconnection logic...", (Throwable)this.savedException);
            this.reconnectingDeviceSessionHandlers.putAll(this.sessionHandlers);
            boolean isReconnecting = this.reconnectingDeviceSessionHandlers.containsKey(deviceId);
            this.listener.onMultiplexedDeviceSessionLost(this.savedException, this.connectionId, deviceId, isReconnecting);
        } else {
            log.error("Amqp session closed unexpectedly. Closing this connection...", (Throwable)this.savedException);
            this.connection.close();
        }
    }

    @Override
    public void onCBSSessionClosedUnexpectedly(ErrorCondition errorCondition) {
        this.savedException = AmqpsExceptionTranslator.convertFromAmqpException(errorCondition);
        log.error("Amqp CBS session closed unexpectedly. Closing this connection...", (Throwable)this.savedException);
        this.connection.close();
    }

    @Override
    public void onSessionClosedAsExpected(String deviceId) {
        if (this.isMultiplexing) {
            log.trace("onSessionClosedAsExpected callback executed, notifying transport layer");
            boolean isReconnecting = this.reconnectingDeviceSessionHandlers.containsKey(deviceId);
            this.listener.onMultiplexedDeviceSessionLost(this.savedException, this.connectionId, deviceId, isReconnecting);
        }
    }

    private void addWebSocketLayer(Transport transport) {
        log.debug("Adding websocket layer to amqp transport");
        WebSocketImpl webSocket = new WebSocketImpl(262144);
        webSocket.configure(this.hostName, WEB_SOCKET_PATH, WEB_SOCKET_QUERY, 443, WEB_SOCKET_SUB_PROTOCOL, null, null);
        ((TransportInternal)transport).addTransportLayer((TransportLayer)webSocket);
    }

    private void addProxyLayer(Transport transport, String hostName) {
        ProxyImpl proxy;
        log.debug("Adding proxy layer to amqp transport");
        if (this.proxySettings.getUsername() != null && this.proxySettings.getPassword() != null) {
            log.trace("Adding proxy username and password to amqp proxy configuration");
            ProxyConfiguration proxyConfiguration = new ProxyConfiguration(ProxyAuthenticationType.BASIC, this.proxySettings.getProxy(), this.proxySettings.getUsername(), new String(this.proxySettings.getPassword()));
            proxy = new ProxyImpl(proxyConfiguration);
        } else {
            log.trace("No proxy username and password will be used amqp proxy configuration");
            proxy = new ProxyImpl();
        }
        ProxyHandlerImpl proxyHandler = new ProxyHandlerImpl();
        proxy.configure(hostName, null, (ProxyHandler)proxyHandler, transport);
        ((TransportInternal)transport).addTransportLayer((TransportLayer)proxy);
    }

    private void sendQueuedMessages() {
        int messagesAttemptedToBeProcessed = 0;
        Message message = this.messagesToSend.poll();
        while (message != null && messagesAttemptedToBeProcessed < 1000) {
            TransportException transportException;
            ++messagesAttemptedToBeProcessed;
            SendResult sendResult = this.sendQueuedMessage(message);
            if (sendResult == SendResult.WRONG_DEVICE) {
                TransportException transportException2;
                AmqpsSessionHandler reconnectingDeviceSessionHandler = this.reconnectingDeviceSessionHandlers.get(message.getConnectionDeviceId());
                if (reconnectingDeviceSessionHandler != null) {
                    log.trace("Amqp message failed to send because its AMQP session is currently reconnecting. Adding it back to messages to send queue ({})", (Object)message);
                    transportException2 = new TransportException("Amqp message failed to send because its AMQP session is currently reconnecting");
                    transportException2.setRetryable(true);
                    this.listener.onMessageSent(message, message.getConnectionDeviceId(), transportException2);
                } else {
                    transportException2 = new TransportException("Message failed to send because it belonged to a device that was unregistered from the AMQP connetion");
                    transportException2.setRetryable(false);
                    this.listener.onMessageSent(message, message.getConnectionDeviceId(), transportException2);
                }
            } else if (sendResult == SendResult.DUPLICATE_SUBSCRIPTION_MESSAGE) {
                log.trace("Attempted to send subscription message while the subscription was already in progress. Discarding the message ({})", (Object)message);
            } else if (sendResult == SendResult.SUBSCRIPTION_IN_PROGRESS) {
                log.trace("Attempted to send twin/method message while the twin/method subscription was in progress. Adding it back to messages to send queue to try again after the subscription has finished ({})", (Object)message);
                transportException = new TransportException("Subscription in progress needs to be completed before this message can be sent");
                transportException.setRetryable(true);
                this.listener.onMessageSent(message, message.getConnectionDeviceId(), transportException);
            } else if (sendResult == SendResult.LINKS_NOT_OPEN) {
                log.warn("Failed to send a message because its AMQP links were not open yet. Adding it back to messages to send queue ({})", (Object)message);
                transportException = new TransportException("Amqp links not open for this message");
                transportException.setRetryable(true);
                this.listener.onMessageSent(message, message.getConnectionDeviceId(), transportException);
            } else if (sendResult == SendResult.UNKNOWN_FAILURE) {
                log.warn("Unknown failure occurred while attempting to send. Adding it back to messages to send queue ({})", (Object)message);
                transportException = new TransportException("Unknown failure");
                transportException.setRetryable(true);
                this.listener.onMessageSent(message, message.getConnectionDeviceId(), transportException);
            }
            message = this.messagesToSend.poll();
        }
        if (message != null) {
            this.messagesToSend.add(message);
        }
    }

    private SendResult sendQueuedMessage(Message message) {
        log.trace("Sending message over amqp ({})", (Object)message);
        AmqpsSessionHandler sessionHandler = this.sessionHandlers.get(message.getConnectionDeviceId());
        if (sessionHandler == null) {
            return SendResult.WRONG_DEVICE;
        }
        return sessionHandler.sendMessage(message);
    }

    private Reactor createReactor() throws TransportException {
        try {
            ReactorOptions options = new ReactorOptions();
            options.setMaxFrameSize(4096);
            if (this.authenticationType == ClientConfiguration.AuthType.X509_CERTIFICATE) {
                options.setEnableSaslByDefault(false);
            }
            return Proton.reactor((ReactorOptions)options, (Handler[])new Handler[]{this});
        }
        catch (IOException e) {
            throw new TransportException("Could not create Proton reactor", e);
        }
    }

    private void releaseLatch(CountDownLatch latch) {
        int i = 0;
        while ((long)i < latch.getCount()) {
            latch.countDown();
            ++i;
        }
    }

    private void releaseDeviceSessionLatches() {
        for (String deviceId : this.deviceSessionsOpenedLatches.keySet()) {
            this.releaseLatch(this.deviceSessionsOpenedLatches.get(deviceId));
        }
    }

    private AmqpsSessionHandler addSessionHandler(ClientConfiguration clientConfiguration) {
        String deviceId = clientConfiguration.getDeviceId();
        AmqpsSessionHandler amqpsSessionHandler = this.sessionHandlers.get(deviceId);
        if (amqpsSessionHandler != null) {
            return amqpsSessionHandler;
        }
        amqpsSessionHandler = this.reconnectingDeviceSessionHandlers.containsKey(deviceId) ? this.reconnectingDeviceSessionHandlers.remove(deviceId) : new AmqpsSessionHandler(clientConfiguration, this);
        this.sessionHandlers.put(deviceId, amqpsSessionHandler);
        return amqpsSessionHandler;
    }

    private void checkForNewlyRegisteredMultiplexedClientsToStart() {
        Iterator<ClientConfiguration> configsToRegisterIterator = this.multiplexingClientsToRegister.iterator();
        ClientConfiguration configToRegister = configsToRegisterIterator.hasNext() ? configsToRegisterIterator.next() : null;
        HashSet<ClientConfiguration> configsRegisteredSuccessfully = new HashSet<ClientConfiguration>();
        while (configToRegister != null) {
            AmqpsSessionHandler amqpsSessionHandler = this.addSessionHandler(configToRegister);
            log.trace("Adding device session for device {} to an active connection", (Object)configToRegister.getDeviceId());
            amqpsSessionHandler.setSession(this.connection.session());
            AmqpsSasTokenRenewalHandler amqpsSasTokenRenewalHandler = new AmqpsSasTokenRenewalHandler(this.amqpsCbsSessionHandler, amqpsSessionHandler);
            this.sasTokenRenewalHandlers.add(amqpsSasTokenRenewalHandler);
            try {
                amqpsSasTokenRenewalHandler.sendAuthenticationMessage(this.reactor);
                configsRegisteredSuccessfully.add(configToRegister);
            }
            catch (TransportException e) {
                log.warn("Failed to send authentication message for device {}; will try again.", (Object)amqpsSasTokenRenewalHandler.amqpsSessionHandler.getDeviceId());
                amqpsSasTokenRenewalHandler.close();
                this.sasTokenRenewalHandlers.remove(amqpsSasTokenRenewalHandler);
                return;
            }
            configToRegister = configsToRegisterIterator.hasNext() ? configsToRegisterIterator.next() : null;
        }
        this.multiplexingClientsToRegister.removeAll(configsRegisteredSuccessfully);
    }

    private void checkForNewlyUnregisteredMultiplexedClientsToStop() {
        Iterator<ClientConfiguration> configsToUnregisterIterator = this.multiplexingClientsToUnregister.keySet().iterator();
        ClientConfiguration configToUnregister = configsToUnregisterIterator.hasNext() ? configsToUnregisterIterator.next() : null;
        HashSet<ClientConfiguration> configsUnregisteredSuccessfully = new HashSet<ClientConfiguration>();
        while (configToUnregister != null) {
            String deviceId = configToUnregister.getDeviceId();
            AmqpsSessionHandler amqpsSessionHandler = this.sessionHandlers.get(deviceId);
            if (amqpsSessionHandler == null) {
                log.warn("Attempted to remove device session for device {} from multiplexed connection, but device was not currently registered.", (Object)deviceId);
            } else {
                log.trace("Removing session handler for device {}", (Object)deviceId);
                this.sessionHandlers.remove(deviceId);
                boolean isSessionReconnecting = this.multiplexingClientsToUnregister.get(configToUnregister);
                if (isSessionReconnecting) {
                    this.reconnectingDeviceSessionHandlers.put(deviceId, amqpsSessionHandler);
                } else {
                    this.reconnectingDeviceSessionHandlers.remove(deviceId);
                }
                AmqpsSasTokenRenewalHandler sasTokenRenewalHandlerToRemove = null;
                for (AmqpsSasTokenRenewalHandler existingSasTokenRenewalHandler : this.sasTokenRenewalHandlers) {
                    if (!existingSasTokenRenewalHandler.amqpsSessionHandler.getDeviceId().equals(configToUnregister.getDeviceId())) continue;
                    sasTokenRenewalHandlerToRemove = existingSasTokenRenewalHandler;
                    log.trace("Closing sas token renewal handler for device {}", (Object)configToUnregister.getDeviceId());
                    sasTokenRenewalHandlerToRemove.close();
                    break;
                }
                if (sasTokenRenewalHandlerToRemove != null) {
                    this.sasTokenRenewalHandlers.remove(sasTokenRenewalHandlerToRemove);
                }
                log.debug("Closing device session for multiplexed device {}", (Object)configToUnregister.getDeviceId());
                amqpsSessionHandler.closeSession();
            }
            configsUnregisteredSuccessfully.add(configToUnregister);
            configToUnregister = configsToUnregisterIterator.hasNext() ? configsToUnregisterIterator.next() : null;
        }
        for (ClientConfiguration successfullyUnregisteredConfig : configsUnregisteredSuccessfully) {
            this.multiplexingClientsToUnregister.remove(successfullyUnregisteredConfig);
        }
        this.clientConfigurations.removeAll(configsUnregisteredSuccessfully);
    }

    private void initializeStateLatches() {
        this.closeReactorLatch = new CountDownLatch(1);
        if (this.authenticationType == ClientConfiguration.AuthType.SAS_TOKEN) {
            log.trace("Initializing authentication link latch count to {}", (Object)1);
            this.authenticationSessionOpenedLatch = new CountDownLatch(1);
        } else {
            log.trace("Initializing authentication link latch count to 0 because x509 connections don't have authentication links");
            this.authenticationSessionOpenedLatch = new CountDownLatch(0);
        }
        this.deviceSessionsOpenedLatches = new ConcurrentHashMap<String, CountDownLatch>();
        for (AmqpsSessionHandler sessionHandler : this.sessionHandlers.values()) {
            String deviceId = sessionHandler.getDeviceId();
            log.trace("Initializing device session latch for device {}", (Object)deviceId);
            this.deviceSessionsOpenedLatches.put(deviceId, new CountDownLatch(1));
        }
    }

    private void closeConnectionWithException(String errorMessage, boolean isRetryable) throws TransportException {
        TransportException transportException = new TransportException(errorMessage);
        transportException.setRetryable(isRetryable);
        log.error(errorMessage, (Throwable)transportException);
        this.close();
        throw transportException;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void openAsync() throws TransportException {
        log.trace("OpenAsnyc called for amqp connection");
        Object object = this.executorServiceLock;
        synchronized (object) {
            if (this.executorService == null) {
                log.trace("Creating new executor service");
                this.executorService = Executors.newFixedThreadPool(1);
            }
        }
        this.reactor = this.createReactor();
        String runnerUniqueIdentifier = this.isMultiplexing ? "Multiplexed-" + this.transportUniqueIdentifier : this.clientConfiguration.getDeviceClientUniqueIdentifier();
        String reactorRunnerPrefix = this.hostName + "-" + runnerUniqueIdentifier + "-Cnx" + this.connectionId;
        String threadName = "";
        if (this.isMultiplexing) {
            if (this.useIdentifiableThreadNames) {
                threadName = threadName + reactorRunnerPrefix + "-" + "azure-iot-sdk-ReactorRunner" + "-ConnectionOwner";
            } else {
                if (this.threadNamePrefix != null && !this.threadNamePrefix.isEmpty()) {
                    threadName = threadName + this.threadNamePrefix;
                }
                threadName = threadName + "azure-iot-sdk-ReactorRunner";
                if (this.threadNameSuffix != null && !this.threadNameSuffix.isEmpty()) {
                    threadName = threadName + this.threadNameSuffix;
                }
            }
        } else if (this.useIdentifiableThreadNames) {
            threadName = threadName + reactorRunnerPrefix + "-" + "azure-iot-sdk-ReactorRunner" + "-ConnectionOwner";
        } else {
            if (this.threadNamePrefix != null && !this.threadNamePrefix.isEmpty()) {
                threadName = threadName + this.threadNamePrefix;
            }
            threadName = threadName + "azure-iot-sdk-ReactorRunner";
            if (this.threadNameSuffix != null && !this.threadNameSuffix.isEmpty()) {
                threadName = threadName + this.threadNameSuffix;
            }
        }
        ReactorRunner reactorRunner = new ReactorRunner(this.reactor, this.listener, this.connectionId, threadName, this);
        this.executorService.submit(reactorRunner);
    }

    private void closeAsync() {
        log.trace("CloseAsync called for amqp connection");
        if (this.connection == null && this.reactor == null) {
            this.releaseLatch(this.authenticationSessionOpenedLatch);
            this.releaseDeviceSessionLatches();
            this.releaseLatch(this.closeReactorLatch);
        } else if (this.connection == null) {
            this.reactor.stop();
        } else if (this.reactor == null) {
            log.warn("Connection was initialized without a reactor, connection is in an unknown state; closing connection anyways.");
            this.connection.close();
        } else if (this.connection.getLocalState() == EndpointState.CLOSED && this.connection.getRemoteState() == EndpointState.CLOSED) {
            log.trace("Closing amqp reactor since the connection was already closed");
            this.connection.getReactor().stop();
        } else {
            log.trace("Closing amqp connection");
            this.connection.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void executorServicesCleanup() {
        Object object = this.executorServiceLock;
        synchronized (object) {
            if (this.executorService != null) {
                log.trace("Shutdown of executor service has started");
                this.executorService.shutdownNow();
                this.executorService = null;
                log.trace("Shutdown of executor service completed");
            }
        }
    }

    @Override
    public void onReactorClosedUnexpectedly() {
        this.releaseLatch(this.authenticationSessionOpenedLatch);
        this.releaseDeviceSessionLatches();
        this.releaseLatch(this.closeReactorLatch);
    }
}

