/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.server.protocol.v0_8;

import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.nio.charset.StandardCharsets;
import java.security.AccessControlException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.qpid.server.QpidException;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.ConnectionPropertyEnricher;
import org.apache.qpid.server.protocol.ConnectionClosingTicker;
import org.apache.qpid.server.protocol.ProtocolVersion;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQConnectionException;
import org.apache.qpid.server.protocol.v0_8.AMQFrameDecodingException;
import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8;
import org.apache.qpid.server.protocol.v0_8.AMQShortString;
import org.apache.qpid.server.protocol.v0_8.BrokerDecoder;
import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
import org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8;
import org.apache.qpid.server.protocol.v0_8.FieldTable;
import org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl;
import org.apache.qpid.server.protocol.v0_8.ServerDecoder;
import org.apache.qpid.server.protocol.v0_8.transport.AMQDataBlock;
import org.apache.qpid.server.protocol.v0_8.transport.AMQFrame;
import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseBody;
import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionRedirectBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionSecureBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionTuneBody;
import org.apache.qpid.server.protocol.v0_8.transport.HeartbeatBody;
import org.apache.qpid.server.protocol.v0_8.transport.MethodRegistry;
import org.apache.qpid.server.protocol.v0_8.transport.ProtocolInitiation;
import org.apache.qpid.server.protocol.v0_8.transport.ServerChannelMethodProcessor;
import org.apache.qpid.server.protocol.v0_8.transport.ServerMethodDispatcher;
import org.apache.qpid.server.protocol.v0_8.transport.ServerMethodProcessor;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import org.apache.qpid.server.security.auth.sasl.SaslNegotiator;
import org.apache.qpid.server.security.auth.sasl.SaslSettings;
import org.apache.qpid.server.session.AMQPSession;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.transport.AggregateTicker;
import org.apache.qpid.server.transport.ByteBufferSender;
import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.transport.ServerNetworkConnection;
import org.apache.qpid.server.transport.TransportException;
import org.apache.qpid.server.transport.network.Ticker;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.ConnectionEstablishmentPolicy;
import org.apache.qpid.server.virtualhost.NoopConnectionEstablishmentPolicy;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQPConnection_0_8Impl
extends AbstractAMQPConnection<AMQPConnection_0_8Impl, AMQPConnection_0_8Impl>
implements ServerMethodProcessor<ServerChannelMethodProcessor>,
AMQPConnection_0_8<AMQPConnection_0_8Impl> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AMQPConnection_0_8Impl.class);
    private static final String BROKER_DEBUG_BINARY_DATA_LENGTH = "broker.debug.binaryDataLength";
    private static final int DEFAULT_DEBUG_BINARY_DATA_LENGTH = 80;
    private final AtomicBoolean _stateChanged = new AtomicBoolean();
    private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference();
    private final Object _channelAddRemoveLock = new Object();
    private final Map<Integer, AMQChannel> _channelMap = new ConcurrentHashMap<Integer, AMQChannel>();
    private volatile ConnectionState _state = ConnectionState.INIT;
    private final Set<AMQChannel> _channelsForCurrentMessage = Collections.newSetFromMap(new ConcurrentHashMap());
    private final ServerDecoder _decoder;
    private volatile SaslNegotiator _saslNegotiator;
    private volatile int _maxNoOfChannels;
    private volatile ProtocolVersion _protocolVersion;
    private volatile MethodRegistry _methodRegistry;
    private final Queue<Action<? super AMQPConnection_0_8Impl>> _asyncTaskList = new ConcurrentLinkedQueue<Action<? super AMQPConnection_0_8Impl>>();
    private final Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>();
    private volatile ProtocolOutputConverter _protocolOutputConverter;
    private final Object _reference = new Object();
    private volatile int _maxFrameSize;
    private final AtomicBoolean _orderlyClose = new AtomicBoolean(false);
    private final ByteBufferSender _sender;
    private volatile boolean _deferFlush;
    private boolean _blocking;
    private volatile boolean _closeWhenNoRoute;
    private volatile boolean _compressionSupported;
    private volatile boolean _sendQueueDeleteOkRegardless;
    private final Pattern _sendQueueDeleteOkRegardlessClientVerRegexp;
    private volatile int _currentClassId;
    private volatile int _currentMethodId;
    private final int _binaryDataLimit;
    private volatile boolean _transportBlockedForWriting;
    private volatile SubjectAuthenticationResult _successfulAuthenticationResult;
    private final Set<AMQPSession<?, ?>> _sessionsWithWork = Collections.newSetFromMap(new ConcurrentHashMap());
    private volatile int _heartBeatDelay;
    private volatile String _closeCause;
    private volatile int _closeCauseCode;

    public AMQPConnection_0_8Impl(Broker<?> broker, ServerNetworkConnection network, AmqpPort<?> port, Transport transport, Protocol protocol, long connectionId, AggregateTicker aggregateTicker) {
        super(broker, network, port, transport, protocol, connectionId, aggregateTicker);
        this._maxNoOfChannels = port.getSessionCountLimit();
        this._decoder = new BrokerDecoder(this);
        this._binaryDataLimit = this.getBroker().getContextKeys(false).contains(BROKER_DEBUG_BINARY_DATA_LENGTH) ? (Integer)this.getBroker().getContextValue(Integer.class, BROKER_DEBUG_BINARY_DATA_LENGTH) : 80;
        String sendQueueDeleteOkRegardlessRegexp = this.getBroker().getContextKeys(false).contains("connection.sendQueueDeleteOkRegardlessClientVerRegexp") ? (String)this.getBroker().getContextValue(String.class, "connection.sendQueueDeleteOkRegardlessClientVerRegexp") : "";
        this._sendQueueDeleteOkRegardlessClientVerRegexp = Pattern.compile(sendQueueDeleteOkRegardlessRegexp);
        this._sender = network.getSender();
        this._closeWhenNoRoute = port.getCloseWhenNoRoute();
    }

    public boolean isTransportBlockedForWriting() {
        return this._transportBlockedForWriting;
    }

    public void setTransportBlockedForWriting(boolean blocked) {
        if (this._transportBlockedForWriting != blocked) {
            this._transportBlockedForWriting = blocked;
            for (AMQChannel channel : this._channelMap.values()) {
                channel.transportStateChanged();
            }
        }
    }

    public void setMaxFrameSize(int frameMax) {
        this._maxFrameSize = frameMax;
        this._decoder.setMaxFrameSize(frameMax);
    }

    public long getMaxFrameSize() {
        return this._maxFrameSize;
    }

    private int getDefaultMaxFrameSize() {
        Broker broker = this.getBroker();
        return broker.getNetworkBufferSize() - AMQFrame.getFrameOverhead();
    }

    public boolean isClosing() {
        return this._orderlyClose.get();
    }

    @Override
    public ClientDeliveryMethod createDeliveryMethod(int channelId) {
        return new WriteDeliverMethod(channelId);
    }

    protected void onReceive(QpidByteBuffer msg) {
        try {
            this._decoder.decodeBuffer(msg);
            this.receivedCompleteAllChannels();
        }
        catch (IOException | AMQFrameDecodingException e) {
            LOGGER.error("Unexpected exception", e);
            throw new ConnectionScopedRuntimeException(e);
        }
    }

    private void receivedCompleteAllChannels() {
        RuntimeException exception = null;
        for (AMQChannel channel : this._channelsForCurrentMessage) {
            try {
                channel.receivedComplete();
            }
            catch (RuntimeException exceptionForThisChannel) {
                if (exception == null) {
                    exception = exceptionForThisChannel;
                }
                LOGGER.error("Error informing channel that receiving is complete. Channel: " + channel, (Throwable)exceptionForThisChannel);
            }
        }
        this._channelsForCurrentMessage.clear();
        if (exception != null) {
            throw exception;
        }
    }

    void channelRequiresSync(AMQChannel amqChannel) {
        this._channelsForCurrentMessage.add(amqChannel);
    }

    private synchronized void protocolInitiationReceived(ProtocolInitiation pi) {
        this._decoder.setExpectProtocolInitiation(false);
        try {
            ProtocolVersion pv = pi.checkVersion();
            this.setProtocolVersion(pv);
            StringBuilder mechanismBuilder = new StringBuilder();
            for (String mechanismName : this.getPort().getAuthenticationProvider().getAvailableMechanisms(this.getTransport().isSecure())) {
                if (mechanismBuilder.length() != 0) {
                    mechanismBuilder.append(' ');
                }
                mechanismBuilder.append(mechanismName);
            }
            String mechanisms = mechanismBuilder.toString();
            String locales = "en_US";
            Map props = Collections.emptyMap();
            for (ConnectionPropertyEnricher enricher : this.getPort().getConnectionPropertyEnrichers()) {
                props = enricher.addConnectionProperties((AMQPConnection)this, props);
            }
            FieldTable serverProperties = FieldTable.convertToFieldTable(props);
            ConnectionStartBody responseBody = this.getMethodRegistry().createConnectionStartBody(this.getProtocolMajorVersion(), pv.getActualMinorVersion(), serverProperties, mechanisms.getBytes(StandardCharsets.US_ASCII), locales.getBytes(StandardCharsets.US_ASCII));
            this.writeFrame(responseBody.generateFrame(0));
            this._state = ConnectionState.AWAIT_START_OK;
            this._sender.flush();
        }
        catch (QpidException e) {
            LOGGER.debug("Received unsupported protocol initiation for protocol version: {} ", (Object)this.getProtocolVersion());
            this.writeFrame(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
            this._sender.flush();
        }
    }

    @Override
    public synchronized void writeFrame(AMQDataBlock frame) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("SEND: " + frame);
        }
        frame.writePayload(this._sender);
        this.updateLastWriteTime();
        if (!this._deferFlush) {
            this._sender.flush();
        }
    }

    public AMQChannel getChannel(int channelId) {
        AMQChannel channel = this._channelMap.get(channelId);
        if (channel == null || channel.isClosing()) {
            return null;
        }
        return channel;
    }

    @Override
    public boolean channelAwaitingClosure(int channelId) {
        return this.ignoreAllButCloseOk() || !this._closingChannelsList.isEmpty() && this._closingChannelsList.containsKey(channelId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void addChannel(AMQChannel channel) {
        Object object = this._channelAddRemoveLock;
        synchronized (object) {
            this._channelMap.put(channel.getChannelId(), channel);
            if (this._blocking) {
                channel.block();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void removeChannel(int channelId) {
        Object object = this._channelAddRemoveLock;
        synchronized (object) {
            this._channelMap.remove(channelId);
        }
    }

    @Override
    public void closeChannel(AMQChannel channel) {
        this.closeChannel(channel, 0, null, false);
    }

    @Override
    public void closeChannelAndWriteFrame(AMQChannel channel, int cause, String message) {
        this.writeFrame(new AMQFrame(channel.getChannelId(), this.getMethodRegistry().createChannelCloseBody(cause, AMQShortString.validValueOf((Object)message), this._currentClassId, this._currentMethodId)));
        this.closeChannel(channel, cause, message, true);
    }

    public void closeChannel(int channelId, int cause, String message) {
        AMQChannel channel = this.getChannel(channelId);
        if (channel == null) {
            throw new IllegalArgumentException("Unknown channel id");
        }
        this.closeChannel(channel, cause, message, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void closeChannel(AMQChannel channel, int cause, String message, boolean mark) {
        int channelId = channel.getChannelId();
        try {
            channel.close(cause, message);
            if (mark) {
                this.markChannelAwaitingCloseOk(channelId);
            }
        }
        finally {
            this.removeChannel(channelId);
        }
    }

    @Override
    public void closeChannelOk(int channelId) {
        this._closingChannelsList.remove(channelId);
    }

    private void markChannelAwaitingCloseOk(int channelId) {
        this._closingChannelsList.put(channelId, System.currentTimeMillis());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void closeAllChannels() {
        try {
            RuntimeException firstException = null;
            for (AMQChannel aMQChannel : this.getSessionModels()) {
                try {
                    aMQChannel.close(this._closeCauseCode, this._closeCause);
                }
                catch (RuntimeException re) {
                    if (!(re instanceof ConnectionScopedRuntimeException)) {
                        LOGGER.error("Unexpected exception closing channel", (Throwable)re);
                    }
                    firstException = re;
                }
            }
            if (firstException != null) {
                throw firstException;
            }
        }
        finally {
            Object object = this._channelAddRemoveLock;
            synchronized (object) {
                this._channelMap.clear();
            }
        }
    }

    private void completeAndCloseAllChannels() {
        try {
            this.receivedCompleteAllChannels();
        }
        finally {
            this.closeAllChannels();
        }
    }

    @Override
    public void sendConnectionClose(int errorCode, String message, int channelId) {
        this.sendConnectionClose(channelId, new AMQFrame(0, new ConnectionCloseBody(this.getProtocolVersion(), errorCode, AMQShortString.validValueOf((Object)message), this._currentClassId, this._currentMethodId)));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sendConnectionClose(int channelId, AMQFrame frame) {
        if (this._orderlyClose.compareAndSet(false, true)) {
            try {
                this.markChannelAwaitingCloseOk(channelId);
                this.completeAndCloseAllChannels();
            }
            finally {
                try {
                    this.writeFrame(frame);
                }
                finally {
                    long timeoutTime = System.currentTimeMillis() + (Long)this.getContextValue(Long.class, "connection.closeResponseTimeout");
                    this.getAggregateTicker().addTicker((Ticker)new ConnectionClosingTicker(timeoutTime, this.getNetwork()));
                    this.notifyWork();
                }
            }
        }
    }

    public void closeNetworkConnection() {
        this.getNetwork().close();
    }

    @Override
    public boolean isSendQueueDeleteOkRegardless() {
        return this._sendQueueDeleteOkRegardless;
    }

    void setSendQueueDeleteOkRegardless(boolean sendQueueDeleteOkRegardless) {
        this._sendQueueDeleteOkRegardless = sendQueueDeleteOkRegardless;
    }

    private void setClientProperties(FieldTable clientProperties) {
        if (clientProperties != null) {
            Object compressionSupported;
            Object closeWhenNoRoute = clientProperties.get("qpid.close_when_no_route");
            if (closeWhenNoRoute != null) {
                this._closeWhenNoRoute = Boolean.parseBoolean(String.valueOf(closeWhenNoRoute));
                LOGGER.debug("Client set closeWhenNoRoute={} for connection {}", (Object)this._closeWhenNoRoute, (Object)this);
            }
            if ((compressionSupported = clientProperties.get("qpid.message_compression_supported")) != null) {
                this._compressionSupported = Boolean.parseBoolean(String.valueOf(compressionSupported));
                LOGGER.debug("Client set compressionSupported={} for connection {}", (Object)this._compressionSupported, (Object)this);
            }
            String clientId = Objects.toString(clientProperties.get("instance"), null);
            String clientVersion = Objects.toString(clientProperties.get("version"), null);
            String clientProduct = Objects.toString(clientProperties.get("product"), null);
            String remoteProcessPid = Objects.toString(clientProperties.get("qpid.client_pid"), null);
            boolean mightBeQpidClient = clientProduct != null && (clientProduct.toLowerCase().contains("qpid") || clientProduct.toLowerCase().equals("unknown"));
            boolean sendQueueDeleteOkRegardless = mightBeQpidClient && (clientVersion == null || this._sendQueueDeleteOkRegardlessClientVerRegexp.matcher(clientVersion).matches());
            this.setSendQueueDeleteOkRegardless(sendQueueDeleteOkRegardless);
            if (sendQueueDeleteOkRegardless) {
                LOGGER.debug("Peer is an older Qpid client, queue delete-ok response will be sent regardless for connection {}", (Object)this);
            }
            this.setClientVersion(clientVersion);
            this.setClientProduct(clientProduct);
            this.setRemoteProcessPid(remoteProcessPid);
            this.setClientId(clientId == null ? UUID.randomUUID().toString() : clientId);
        }
    }

    private void setProtocolVersion(ProtocolVersion pv) {
        this._protocolVersion = pv;
        this._methodRegistry = new MethodRegistry(this._protocolVersion);
        this._protocolOutputConverter = new ProtocolOutputConverterImpl(this);
    }

    public byte getProtocolMajorVersion() {
        return this._protocolVersion.getMajorVersion();
    }

    @Override
    public ProtocolVersion getProtocolVersion() {
        return this._protocolVersion;
    }

    public byte getProtocolMinorVersion() {
        return this._protocolVersion.getMinorVersion();
    }

    public MethodRegistry getRegistry() {
        return this.getMethodRegistry();
    }

    @Override
    public ProtocolOutputConverter getProtocolOutputConverter() {
        return this._protocolOutputConverter;
    }

    @Override
    public MethodRegistry getMethodRegistry() {
        return this._methodRegistry;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void closed() {
        try {
            try {
                if (!this._orderlyClose.get()) {
                    this.completeAndCloseAllChannels();
                }
            }
            finally {
                this.performDeleteTasks();
                NamedAddressSpace virtualHost = this.getAddressSpace();
                if (virtualHost != null) {
                    virtualHost.deregisterConnection((AMQPConnection)this);
                }
            }
        }
        catch (TransportException | ConnectionScopedRuntimeException e) {
            LOGGER.error("Could not close protocol engine", e);
        }
        finally {
            this.markTransportClosed();
        }
    }

    protected boolean isOrderlyClose() {
        return this._orderlyClose.get();
    }

    protected String getCloseCause() {
        if (this._closeCause == null) {
            return null;
        }
        return this._closeCauseCode + " - " + this._closeCause;
    }

    public void encryptedTransport() {
    }

    public void readerIdle() {
        AccessController.doPrivileged(new PrivilegedAction<Object>(){

            @Override
            public Object run() {
                AMQPConnection_0_8Impl.this.getEventLogger().message(ConnectionMessages.IDLE_CLOSE((String)("Current connection state: " + (Object)((Object)AMQPConnection_0_8Impl.this._state)), (boolean)true));
                AMQPConnection_0_8Impl.this.getNetwork().close();
                return null;
            }
        }, this.getAccessControllerContext());
    }

    public synchronized void writerIdle() {
        this.writeFrame(HeartbeatBody.FRAME);
    }

    public int getSessionCountLimit() {
        return this._maxNoOfChannels;
    }

    @Override
    public int getHeartbeatDelay() {
        return this._heartBeatDelay;
    }

    public String getAddress() {
        return String.valueOf(this.getNetwork().getRemoteAddress());
    }

    public void closeSessionAsync(final AMQPSession<?, ?> session, AMQPConnection.CloseReason reason, final String message) {
        int cause;
        switch (reason) {
            case MANAGEMENT: {
                cause = 320;
                break;
            }
            case TRANSACTION_TIMEOUT: {
                cause = 506;
                break;
            }
            default: {
                cause = 541;
            }
        }
        this.addAsyncTask((Action<? super AMQPConnection_0_8Impl>)new Action<AMQPConnection_0_8Impl>(){

            public void performAction(AMQPConnection_0_8Impl object) {
                int channelId = session.getChannelId();
                AMQPConnection_0_8Impl.this.closeChannel(channelId, cause, message);
                MethodRegistry methodRegistry = AMQPConnection_0_8Impl.this.getMethodRegistry();
                ChannelCloseBody responseBody = methodRegistry.createChannelCloseBody(cause, AMQShortString.validValueOf((Object)message), 0, 0);
                AMQPConnection_0_8Impl.this.writeFrame(responseBody.generateFrame(channelId));
            }
        });
    }

    public void sendConnectionCloseAsync(AMQPConnection.CloseReason reason, final String description) {
        int cause;
        this.stopConnection();
        switch (reason) {
            case MANAGEMENT: {
                cause = 320;
                break;
            }
            case TRANSACTION_TIMEOUT: {
                cause = 506;
                break;
            }
            default: {
                cause = 541;
            }
        }
        this._closeCauseCode = cause;
        this._closeCause = description;
        Action<AMQPConnection_0_8Impl> action = new Action<AMQPConnection_0_8Impl>(){

            public void performAction(AMQPConnection_0_8Impl object) {
                AMQConnectionException e = new AMQConnectionException(cause, description, 0, 0, AMQPConnection_0_8Impl.this.getMethodRegistry(), null);
                AMQPConnection_0_8Impl.this.sendConnectionClose(0, e.getCloseFrame());
            }
        };
        this.addAsyncTask((Action<? super AMQPConnection_0_8Impl>)action);
    }

    protected void addAsyncTask(Action<? super AMQPConnection_0_8Impl> action) {
        this._asyncTaskList.add(action);
        this.notifyWork();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void block() {
        Object object = this._channelAddRemoveLock;
        synchronized (object) {
            if (!this._blocking) {
                this._blocking = true;
                for (AMQChannel channel : this._channelMap.values()) {
                    channel.block();
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void unblock() {
        Object object = this._channelAddRemoveLock;
        synchronized (object) {
            if (this._blocking) {
                this._blocking = false;
                for (AMQChannel channel : this._channelMap.values()) {
                    channel.unblock();
                }
            }
        }
    }

    public Collection<? extends AMQChannel> getSessionModels() {
        return Collections.unmodifiableCollection(this._channelMap.values());
    }

    public String getRemoteContainerName() {
        return this.getClientId();
    }

    @Override
    public void setDeferFlush(boolean deferFlush) {
        this._deferFlush = deferFlush;
    }

    public boolean hasSessionWithName(byte[] name) {
        return false;
    }

    public Iterator<ServerTransaction> getOpenTransactions() {
        return this._channelMap.values().stream().filter(channel -> channel.getTransaction() instanceof LocalTransaction).map(AMQChannel::getTransaction).iterator();
    }

    @Override
    public void receiveChannelOpen(int channelId) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV[" + channelId + "] ChannelOpen");
        }
        this.assertState(ConnectionState.OPEN);
        NamedAddressSpace virtualHost = this.getAddressSpace();
        if (virtualHost == null) {
            this.sendConnectionClose(503, "Virtualhost has not yet been set. ConnectionOpen has not been called.", channelId);
        } else if (this.getChannel(channelId) != null || this.channelAwaitingClosure(channelId)) {
            this.sendConnectionClose(504, "Channel " + channelId + " already exists", channelId);
        } else if (channelId > this.getSessionCountLimit()) {
            this.sendConnectionClose(504, "Channel " + channelId + " cannot be created as the max allowed channel id is " + this.getSessionCountLimit(), channelId);
        } else {
            LOGGER.debug("Connecting to: {}", (Object)virtualHost.getName());
            AMQChannel channel = new AMQChannel(this, channelId, virtualHost.getMessageStore());
            channel.create();
            this.addChannel(channel);
            ChannelOpenOkBody response = this.getMethodRegistry().createChannelOpenOkBody();
            this.writeFrame(response.generateFrame(channelId));
        }
    }

    void assertState(ConnectionState requiredState) {
        if (this._state != requiredState) {
            String replyText = "Command Invalid, expected " + (Object)((Object)requiredState) + " but was " + (Object)((Object)this._state);
            this.sendConnectionClose(503, replyText, 0);
            throw new ConnectionScopedRuntimeException(replyText);
        }
    }

    @Override
    public void receiveConnectionOpen(AMQShortString virtualHostName, AMQShortString capabilities, boolean insist) {
        NamedAddressSpace addressSpace;
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV ConnectionOpen[ virtualHost: " + virtualHostName + " capabilities: " + capabilities + " insist: " + insist + " ]");
        }
        this.assertState(ConnectionState.AWAIT_OPEN);
        String virtualHostStr = AMQShortString.toString((AMQShortString)virtualHostName);
        if (virtualHostStr != null && virtualHostStr.charAt(0) == '/') {
            virtualHostStr = virtualHostStr.substring(1);
        }
        if ((addressSpace = this.getPort().getAddressSpace(virtualHostStr)) == null) {
            this.sendConnectionClose(404, "Unknown virtual host: '" + virtualHostName + "'", 0);
        } else if (!addressSpace.isActive()) {
            String redirectHost = addressSpace.getRedirectHost(this.getPort());
            if (redirectHost != null) {
                this.sendConnectionClose(0, new AMQFrame(0, new ConnectionRedirectBody(this.getProtocolVersion(), AMQShortString.valueOf((String)redirectHost), null)));
            } else {
                this.sendConnectionClose(320, "Virtual host '" + addressSpace.getName() + "' is not active", 0);
            }
        } else {
            try {
                addressSpace.registerConnection((AMQPConnection)this, (ConnectionEstablishmentPolicy)new NoopConnectionEstablishmentPolicy());
                this.setAddressSpace(addressSpace);
                if (addressSpace.authoriseCreateConnection((AMQPConnection)this)) {
                    MethodRegistry methodRegistry = this.getMethodRegistry();
                    ConnectionOpenOkBody responseBody = methodRegistry.createConnectionOpenOkBody(virtualHostName);
                    this.writeFrame(responseBody.generateFrame(0));
                    this._state = ConnectionState.OPEN;
                } else {
                    this.sendConnectionClose(403, "Connection refused", 0);
                }
            }
            catch (AccessControlException | VirtualHostUnavailableException e) {
                this.sendConnectionClose(403, e.getMessage(), 0);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void receiveConnectionClose(int replyCode, AMQShortString replyText, int classId, int methodId) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV ConnectionClose[ replyCode: " + replyCode + " replyText: " + replyText + " classId: " + classId + " methodId: " + methodId + " ]");
        }
        try {
            if (this._orderlyClose.compareAndSet(false, true)) {
                this.completeAndCloseAllChannels();
            }
            MethodRegistry methodRegistry = this.getMethodRegistry();
            ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody();
            this.writeFrame(responseBody.generateFrame(0));
        }
        catch (Exception e) {
            LOGGER.error("Error closing connection for " + this.getRemoteAddressString(), (Throwable)e);
        }
        finally {
            this.closeNetworkConnection();
        }
    }

    @Override
    public void receiveConnectionCloseOk() {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV ConnectionCloseOk");
        }
        this.closeNetworkConnection();
    }

    @Override
    public void receiveConnectionSecureOk(byte[] response) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV ConnectionSecureOk[ response: ******** ] ");
        }
        this.assertState(ConnectionState.AWAIT_SECURE_OK);
        this.processSaslResponse(response, this.getSubjectCreator());
    }

    private void disposeSaslNegotiator() {
        this._saslNegotiator.dispose();
        this._saslNegotiator = null;
    }

    @Override
    public void receiveConnectionStartOk(FieldTable clientProperties, AMQShortString mechanism, byte[] response, AMQShortString locale) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV ConnectionStartOk[ clientProperties: " + clientProperties + " mechanism: " + mechanism + " response: ******** locale: " + locale + " ]");
        }
        this.assertState(ConnectionState.AWAIT_START_OK);
        LOGGER.debug("SASL Mechanism selected: {} Locale : {}", (Object)mechanism, (Object)locale);
        if (mechanism == null || mechanism.length() == 0) {
            this.sendConnectionClose(320, "No Sasl mechanism was specified", 0);
            return;
        }
        SubjectCreator subjectCreator = this.getSubjectCreator();
        this._saslNegotiator = subjectCreator.createSaslNegotiator(String.valueOf(mechanism), (SaslSettings)this);
        if (this._saslNegotiator == null) {
            this.sendConnectionClose(320, "No SaslServer could be created for mechanism: " + mechanism, 0);
        } else {
            this.setClientProperties(clientProperties);
            this.processSaslResponse(response, subjectCreator);
        }
    }

    private void processSaslResponse(byte[] response, SubjectCreator subjectCreator) {
        MethodRegistry methodRegistry = this.getMethodRegistry();
        SubjectAuthenticationResult authResult = this._successfulAuthenticationResult;
        byte[] challenge = null;
        if (authResult == null) {
            authResult = subjectCreator.authenticate(this._saslNegotiator, response);
            challenge = authResult.getChallenge();
        }
        switch (authResult.getStatus()) {
            case ERROR: {
                Exception cause = authResult.getCause();
                LOGGER.debug("Authentication failed: {}", (Object)(cause == null ? "" : cause.getMessage()));
                this.sendConnectionClose(530, "Authentication failed", 0);
                this.disposeSaslNegotiator();
                break;
            }
            case SUCCESS: {
                this._successfulAuthenticationResult = authResult;
                if (challenge == null || challenge.length == 0) {
                    LOGGER.debug("Connected as: {}", (Object)authResult.getSubject());
                    this.setSubject(authResult.getSubject());
                    int frameMax = this.getDefaultMaxFrameSize();
                    if (frameMax <= 0) {
                        frameMax = Integer.MAX_VALUE;
                    }
                    ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(this.getPort().getSessionCountLimit(), frameMax, this.getPort().getHeartbeatDelay());
                    this.writeFrame(tuneBody.generateFrame(0));
                    this._state = ConnectionState.AWAIT_TUNE_OK;
                    this.disposeSaslNegotiator();
                    break;
                }
                this.continueSaslNegotiation(challenge);
                break;
            }
            case CONTINUE: {
                this.continueSaslNegotiation(challenge);
            }
        }
    }

    private void continueSaslNegotiation(byte[] challenge) {
        ConnectionSecureBody secureBody = this.getMethodRegistry().createConnectionSecureBody(challenge);
        this.writeFrame(secureBody.generateFrame(0));
        this._state = ConnectionState.AWAIT_SECURE_OK;
    }

    @Override
    public void receiveConnectionTuneOk(int channelMax, long frameMax, int heartbeat) {
        int brokerFrameMax;
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV ConnectionTuneOk[ channelMax: " + channelMax + " frameMax: " + frameMax + " heartbeat: " + heartbeat + " ]");
        }
        this.assertState(ConnectionState.AWAIT_TUNE_OK);
        if (heartbeat > 0) {
            this._heartBeatDelay = heartbeat;
            long writerDelay = 1000L * (long)heartbeat;
            long readerDelay = 1000L * (long)((Integer)this.getContextValue(Integer.class, "qpid.broker_heartbeat_timeout_factor")).intValue() * (long)heartbeat;
            this.initialiseHeartbeating(writerDelay, readerDelay);
        }
        if ((brokerFrameMax = this.getDefaultMaxFrameSize()) <= 0) {
            brokerFrameMax = Integer.MAX_VALUE;
        }
        if (frameMax > (long)brokerFrameMax) {
            this.sendConnectionClose(502, "Attempt to set max frame size to " + frameMax + " greater than the broker will allow: " + brokerFrameMax, 0);
        } else if (frameMax > 0L && frameMax < 4096L) {
            this.sendConnectionClose(502, "Attempt to set max frame size to " + frameMax + " which is smaller than the specification defined minimum: " + 4096, 0);
        } else {
            int value;
            int calculatedFrameMax = frameMax == 0L ? brokerFrameMax : (int)frameMax;
            this.setMaxFrameSize(calculatedFrameMax);
            this._maxNoOfChannels = value = channelMax == 0 || channelMax > 65535 ? 65535 : channelMax;
        }
        this._state = ConnectionState.AWAIT_OPEN;
    }

    @Override
    public int getBinaryDataLimit() {
        return this._binaryDataLimit;
    }

    @Override
    public Object getReference() {
        return this._reference;
    }

    @Override
    public boolean isCloseWhenNoRoute() {
        return this._closeWhenNoRoute;
    }

    public boolean isCompressionSupported() {
        return this._compressionSupported && this.getBroker().isMessageCompressionEnabled();
    }

    private SubjectCreator getSubjectCreator() {
        return this.getPort().getSubjectCreator(this.getTransport().isSecure(), this.getNetwork().getSelectedHost());
    }

    @Override
    public ServerChannelMethodProcessor getChannelMethodProcessor(final int channelId) {
        this.assertState(ConnectionState.OPEN);
        ServerChannelMethodProcessor channelMethodProcessor = this.getChannel(channelId);
        if (channelMethodProcessor == null) {
            channelMethodProcessor = (ServerChannelMethodProcessor)Proxy.newProxyInstance(ServerMethodDispatcher.class.getClassLoader(), new Class[]{ServerChannelMethodProcessor.class}, new InvocationHandler(){

                @Override
                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                    if (method.getName().equals("receiveChannelCloseOk") && AMQPConnection_0_8Impl.this.channelAwaitingClosure(channelId)) {
                        AMQPConnection_0_8Impl.this.closeChannelOk(channelId);
                    } else if (method.getName().startsWith("receive")) {
                        AMQPConnection_0_8Impl.this.sendConnectionClose(504, "Unknown channel id: " + channelId, channelId);
                    } else if (method.getName().equals("ignoreAllButCloseOk")) {
                        return AMQPConnection_0_8Impl.this.channelAwaitingClosure(channelId);
                    }
                    return null;
                }
            });
        }
        return channelMethodProcessor;
    }

    @Override
    public void receiveHeartbeat() {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV Heartbeat");
        }
    }

    @Override
    public void receiveProtocolHeader(ProtocolInitiation protocolInitiation) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("RECV ProtocolHeader [" + protocolInitiation + " ]");
        }
        this.protocolInitiationReceived(protocolInitiation);
    }

    @Override
    public void setCurrentMethod(int classId, int methodId) {
        this._currentClassId = classId;
        this._currentMethodId = methodId;
    }

    @Override
    public boolean ignoreAllButCloseOk() {
        return this.isClosing();
    }

    public boolean hasWork() {
        return this._stateChanged.get();
    }

    public void notifyWork() {
        this._stateChanged.set(true);
        Action<ProtocolEngine> listener = this._workListener.get();
        if (listener != null) {
            listener.performAction((Object)this);
        }
    }

    public void notifyWork(AMQPSession<?, ?> sessionModel) {
        this._sessionsWithWork.add(sessionModel);
        this.notifyWork();
    }

    public void clearWork() {
        this._stateChanged.set(false);
    }

    public void setWorkListener(Action<ProtocolEngine> listener) {
        this._workListener.set(listener);
    }

    public Iterator<Runnable> processPendingIterator() {
        if (!this.isIOThread()) {
            return Collections.emptyIterator();
        }
        return new ProcessPendingIterator();
    }

    protected boolean isOpeningInProgress() {
        switch (this._state) {
            case INIT: 
            case AWAIT_START_OK: 
            case AWAIT_SECURE_OK: 
            case AWAIT_TUNE_OK: 
            case AWAIT_OPEN: {
                return true;
            }
            case OPEN: {
                return false;
            }
        }
        throw new IllegalStateException("Unsupported state " + (Object)((Object)this._state));
    }

    private class ProcessPendingIterator
    implements Iterator<Runnable> {
        private Iterator<? extends AMQPSession<?, ?>> _sessionIterator;

        private ProcessPendingIterator() {
            this._sessionIterator = AMQPConnection_0_8Impl.this._sessionsWithWork.iterator();
        }

        @Override
        public boolean hasNext() {
            return !AMQPConnection_0_8Impl.this._sessionsWithWork.isEmpty() && !AMQPConnection_0_8Impl.this.isClosing() && !AMQPConnection_0_8Impl.this.isConnectionStopped() || !AMQPConnection_0_8Impl.this._asyncTaskList.isEmpty();
        }

        @Override
        public Runnable next() {
            if (!AMQPConnection_0_8Impl.this._sessionsWithWork.isEmpty()) {
                if (AMQPConnection_0_8Impl.this.isClosing() || AMQPConnection_0_8Impl.this.isConnectionStopped()) {
                    final Action asyncAction = (Action)AMQPConnection_0_8Impl.this._asyncTaskList.poll();
                    if (asyncAction != null) {
                        return new Runnable(){

                            @Override
                            public void run() {
                                asyncAction.performAction((Object)AMQPConnection_0_8Impl.this);
                            }
                        };
                    }
                    return new Runnable(){

                        @Override
                        public void run() {
                        }
                    };
                }
                if (!this._sessionIterator.hasNext()) {
                    this._sessionIterator = AMQPConnection_0_8Impl.this._sessionsWithWork.iterator();
                }
                final AMQPSession<?, ?> session = this._sessionIterator.next();
                return new Runnable(){

                    @Override
                    public void run() {
                        ProcessPendingIterator.this._sessionIterator.remove();
                        if (session.processPending()) {
                            AMQPConnection_0_8Impl.this._sessionsWithWork.add(session);
                        }
                    }
                };
            }
            if (!AMQPConnection_0_8Impl.this._asyncTaskList.isEmpty()) {
                final Action asyncAction = (Action)AMQPConnection_0_8Impl.this._asyncTaskList.poll();
                return new Runnable(){

                    @Override
                    public void run() {
                        asyncAction.performAction((Object)AMQPConnection_0_8Impl.this);
                    }
                };
            }
            throw new NoSuchElementException();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }

    public final class WriteDeliverMethod
    implements ClientDeliveryMethod {
        private final int _channelId;

        public WriteDeliverMethod(int channelId) {
            this._channelId = channelId;
        }

        @Override
        public long deliverToClient(ConsumerTarget_0_8 target, AMQMessage message, InstanceProperties props, long deliveryTag) {
            long size = AMQPConnection_0_8Impl.this._protocolOutputConverter.writeDeliver(message, props, this._channelId, deliveryTag, target.getConsumerTag());
            AMQPConnection_0_8Impl.this.registerMessageDelivered(size);
            if (target.getChannel().isTransactional()) {
                AMQPConnection_0_8Impl.this.registerTransactedMessageDelivered();
            }
            return size;
        }
    }

    static enum ConnectionState {
        INIT,
        AWAIT_START_OK,
        AWAIT_SECURE_OK,
        AWAIT_TUNE_OK,
        AWAIT_OPEN,
        OPEN;

    }
}

