/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.server.protocol.v0_8;

import java.io.DataOutput;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.security.auth.Subject;
import javax.security.sasl.SaslServer;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQFrameDecodingException;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQProtocolHeaderException;
import org.apache.qpid.framing.AMQProtocolVersionException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.ConnectionStartBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.HeartbeatBody;
import org.apache.qpid.framing.MethodDispatcher;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.SessionModelListener;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQNoMethodHandlerException;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.util.BytesDataOutput;

public class AMQProtocolEngine
implements ServerProtocolEngine,
AMQProtocolSession<AMQProtocolEngine> {
    private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
    private static final int CHANNEL_CACHE_SIZE = 255;
    private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 66560;
    private final Port<?> _port;
    private final long _creationTime;
    private AMQShortString _contextKey;
    private String _clientVersion = null;
    private String _clientProduct = null;
    private String _remoteProcessPid = null;
    private VirtualHostImpl<?, ?, ?> _virtualHost;
    private final Map<Integer, AMQChannel<AMQProtocolEngine>> _channelMap = new HashMap<Integer, AMQChannel<AMQProtocolEngine>>();
    private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners = new CopyOnWriteArrayList();
    private final AMQChannel<AMQProtocolEngine>[] _cachedChannels = new AMQChannel[256];
    private final Set<AMQChannel<AMQProtocolEngine>> _channelsForCurrentMessage = new HashSet<AMQChannel<AMQProtocolEngine>>();
    private final AMQStateManager _stateManager;
    private AMQDecoder _decoder;
    private SaslServer _saslServer;
    private volatile boolean _closed;
    private long _maxNoOfChannels;
    private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
    private MethodRegistry _methodRegistry = MethodRegistry.getMethodRegistry((ProtocolVersion)this._protocolVersion);
    private final List<Action<? super AMQProtocolEngine>> _taskList = new CopyOnWriteArrayList<Action<? super AMQProtocolEngine>>();
    private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>();
    private ProtocolOutputConverter _protocolOutputConverter;
    private final Subject _authorizedSubject = new Subject();
    private MethodDispatcher _dispatcher;
    private final long _connectionID;
    private Object _reference = new Object();
    private LogSubject _logSubject;
    private long _lastIoTime;
    private long _writtenBytes;
    private long _maxFrameSize;
    private final AtomicBoolean _closing = new AtomicBoolean(false);
    private final StatisticsCounter _messagesDelivered;
    private final StatisticsCounter _dataDelivered;
    private final StatisticsCounter _messagesReceived;
    private final StatisticsCounter _dataReceived;
    private NetworkConnection _network;
    private Sender<ByteBuffer> _sender;
    private volatile boolean _deferFlush;
    private long _lastReceivedTime;
    private boolean _blocking;
    private final ReentrantLock _receivedLock;
    private AtomicLong _lastWriteTime = new AtomicLong(System.currentTimeMillis());
    private final Broker<?> _broker;
    private final Transport _transport;
    private volatile boolean _closeWhenNoRoute;
    private volatile boolean _stopped;
    private long _readBytes;
    private boolean _authenticated;
    private boolean _compressionSupported;
    private int _messageCompressionThreshold;
    private final byte[] _reusableBytes = new byte[66560];
    private final ByteBuffer _reusableByteBuffer = ByteBuffer.wrap(this._reusableBytes);
    private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(this._reusableBytes);

    public AMQProtocolEngine(Broker broker, final NetworkConnection network, long connectionId, Port port, Transport transport) {
        this._broker = broker;
        this._port = port;
        this._transport = transport;
        this._maxNoOfChannels = broker.getConnection_sessionCountLimit();
        this._receivedLock = new ReentrantLock();
        this._stateManager = new AMQStateManager(broker, this);
        this._decoder = new AMQDecoder(true, (AMQVersionAwareProtocolSession)this);
        this._connectionID = connectionId;
        this._logSubject = new ConnectionLogSubject((AMQConnectionModel)this);
        this._authorizedSubject.getPrincipals().add((Principal)new ConnectionPrincipal((AMQConnectionModel)this));
        this.runAsSubject(new PrivilegedAction<Void>(){

            @Override
            public Void run() {
                AMQProtocolEngine.this.setNetworkConnection(network);
                AMQProtocolEngine.this.getEventLogger().message(ConnectionMessages.OPEN(null, null, null, null, (boolean)false, (boolean)false, (boolean)false, (boolean)false));
                AMQProtocolEngine.this._closeWhenNoRoute = AMQProtocolEngine.this._broker.getConnection_closeWhenNoRoute();
                return null;
            }
        });
        this._messagesDelivered = new StatisticsCounter("messages-delivered-" + this.getSessionID());
        this._dataDelivered = new StatisticsCounter("data-delivered-" + this.getSessionID());
        this._messagesReceived = new StatisticsCounter("messages-received-" + this.getSessionID());
        this._dataReceived = new StatisticsCounter("data-received-" + this.getSessionID());
        this._creationTime = System.currentTimeMillis();
    }

    private <T> T runAsSubject(PrivilegedAction<T> action) {
        return Subject.doAs(this.getAuthorizedSubject(), action);
    }

    private boolean runningAsSubject() {
        return this.getAuthorizedSubject().equals(Subject.getSubject(AccessController.getContext()));
    }

    public Subject getSubject() {
        return this._authorizedSubject;
    }

    public void setNetworkConnection(NetworkConnection network) {
        this.setNetworkConnection(network, (Sender<ByteBuffer>)network.getSender());
    }

    public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) {
        this._network = network;
        this._sender = sender;
    }

    @Override
    public long getSessionID() {
        return this._connectionID;
    }

    @Override
    public void setMaxFrameSize(long frameMax) {
        this._maxFrameSize = frameMax;
        this._decoder.setMaxFrameSize(frameMax);
    }

    @Override
    public long getMaxFrameSize() {
        return this._maxFrameSize;
    }

    @Override
    public boolean isClosing() {
        return this._closing.get();
    }

    @Override
    public synchronized void flushBatched() {
        this._sender.flush();
    }

    @Override
    public ClientDeliveryMethod createDeliveryMethod(int channelId) {
        return new WriteDeliverMethod(channelId);
    }

    public void received(final ByteBuffer msg) {
        Subject.doAs(this._authorizedSubject, new PrivilegedAction<Void>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public Void run() {
                long arrivalTime = System.currentTimeMillis();
                if (!AMQProtocolEngine.this._authenticated && arrivalTime - AMQProtocolEngine.this._creationTime > (Long)AMQProtocolEngine.this._port.getContextValue(Long.class, "connection.maximumAuthenticationDelay")) {
                    _logger.warn((Object)("Connection has taken more than " + AMQProtocolEngine.this._port.getContextValue(Long.class, "connection.maximumAuthenticationDelay") + "ms to establish identity.  Closing as possible DoS."));
                    AMQProtocolEngine.this.getEventLogger().message(ConnectionMessages.IDLE_CLOSE());
                    AMQProtocolEngine.this.closeProtocolSession();
                }
                AMQProtocolEngine.this._lastReceivedTime = arrivalTime;
                AMQProtocolEngine.this._lastIoTime = arrivalTime;
                AMQProtocolEngine.this._readBytes += msg.remaining();
                AMQProtocolEngine.this._receivedLock.lock();
                try {
                    ArrayList dataBlocks = AMQProtocolEngine.this._decoder.decodeBuffer(msg);
                    for (AMQDataBlock dataBlock : dataBlocks) {
                        try {
                            AMQProtocolEngine.this.dataBlockReceived(dataBlock);
                        }
                        catch (AMQConnectionException e) {
                            if (!_logger.isDebugEnabled()) break;
                            _logger.debug((Object)"Caught AMQConnectionException but will simply stop processing data blocks - the connection should already be closed.", (Throwable)e);
                            break;
                        }
                        catch (Exception e) {
                            _logger.error((Object)"Unexpected exception when processing datablock", (Throwable)e);
                            AMQProtocolEngine.this.closeProtocolSession();
                            break;
                        }
                    }
                    AMQProtocolEngine.this.receivedComplete();
                }
                catch (ConnectionScopedRuntimeException e) {
                    _logger.error((Object)"Unexpected exception", (Throwable)e);
                    AMQProtocolEngine.this.closeProtocolSession();
                }
                catch (AMQProtocolVersionException e) {
                    _logger.error((Object)"Unexpected protocol version", (Throwable)e);
                    AMQProtocolEngine.this.closeProtocolSession();
                }
                catch (AMQFrameDecodingException e) {
                    _logger.error((Object)"Frame decoding", (Throwable)e);
                    AMQProtocolEngine.this.closeProtocolSession();
                }
                catch (IOException e) {
                    _logger.error((Object)"I/O Exception", (Throwable)e);
                    AMQProtocolEngine.this.closeProtocolSession();
                }
                finally {
                    AMQProtocolEngine.this._receivedLock.unlock();
                }
                return null;
            }
        });
    }

    private void receivedComplete() {
        RuntimeException exception = null;
        for (AMQChannel<AMQProtocolEngine> channel : this._channelsForCurrentMessage) {
            try {
                channel.receivedComplete();
            }
            catch (RuntimeException exceptionForThisChannel) {
                if (exception == null) {
                    exception = exceptionForThisChannel;
                }
                _logger.error((Object)("Error informing channel that receiving is complete. Channel: " + channel), (Throwable)exceptionForThisChannel);
            }
        }
        this._channelsForCurrentMessage.clear();
        if (exception != null) {
            throw exception;
        }
    }

    private void dataBlockReceived(AMQDataBlock message) throws Exception {
        if (message instanceof ProtocolInitiation) {
            this.protocolInitiationReceived((ProtocolInitiation)message);
        } else if (message instanceof AMQFrame) {
            AMQFrame frame = (AMQFrame)message;
            this.frameReceived(frame);
        } else {
            throw new AMQException("Unknown message type: " + message.getClass().getName() + ": " + message);
        }
    }

    private void frameReceived(AMQFrame frame) throws AMQException {
        int channelId = frame.getChannel();
        AMQChannel<AMQProtocolEngine> amqChannel = this._channelMap.get(channelId);
        if (amqChannel != null) {
            this._channelsForCurrentMessage.add(amqChannel);
        }
        AMQBody body = frame.getBodyFrame();
        long startTime = 0L;
        String frameToString = null;
        if (_logger.isDebugEnabled()) {
            startTime = System.currentTimeMillis();
            frameToString = frame.toString();
            _logger.debug((Object)("RECV: " + frame));
        }
        if (this.channelAwaitingClosure(channelId)) {
            if (frame.getBodyFrame() instanceof ChannelCloseOkBody) {
                if (_logger.isInfoEnabled()) {
                    _logger.info((Object)("Channel[" + channelId + "] awaiting closure - processing close-ok"));
                }
            } else {
                return;
            }
        }
        try {
            body.handle(channelId, (AMQVersionAwareProtocolSession)this);
        }
        catch (AMQConnectionException e) {
            _logger.info((Object)(e.getMessage() + " whilst processing frame: " + body));
            this.closeConnection(channelId, e);
            throw e;
        }
        catch (AMQException e) {
            this.closeChannel(channelId, e.getErrorCode() == null ? AMQConstant.INTERNAL_ERROR : e.getErrorCode(), e.getMessage());
            throw e;
        }
        catch (TransportException e) {
            this.closeChannel(channelId, AMQConstant.CHANNEL_ERROR, e.getMessage());
            throw e;
        }
        if (_logger.isDebugEnabled()) {
            _logger.debug((Object)("Frame handled in " + (System.currentTimeMillis() - startTime) + " ms. Frame: " + frameToString));
        }
    }

    private synchronized void protocolInitiationReceived(ProtocolInitiation pi) {
        this._decoder.setExpectProtocolInitiation(false);
        try {
            this.getEventLogger().message(ConnectionMessages.OPEN(null, (String)(pi.getProtocolMajor() + "-" + pi.getProtocolMinor()), null, null, (boolean)false, (boolean)true, (boolean)false, (boolean)false));
            ProtocolVersion pv = pi.checkVersion();
            this.setProtocolVersion(pv);
            StringBuilder mechanismBuilder = new StringBuilder();
            for (String mechanismName : this._broker.getSubjectCreator(this.getLocalAddress(), this._transport.isSecure()).getMechanisms()) {
                if (mechanismBuilder.length() != 0) {
                    mechanismBuilder.append(' ');
                }
                mechanismBuilder.append(mechanismName);
            }
            String mechanisms = mechanismBuilder.toString();
            String locales = "en_US";
            FieldTable serverProperties = FieldTableFactory.newFieldTable();
            serverProperties.setString("product", QpidProperties.getProductName());
            serverProperties.setString("version", QpidProperties.getReleaseVersion());
            serverProperties.setString("qpid.build", QpidProperties.getBuildVersion());
            serverProperties.setString("qpid.instance_name", this._broker.getName());
            serverProperties.setString("qpid.close_when_no_route", String.valueOf(this._closeWhenNoRoute));
            serverProperties.setString("qpid.message_compression_supported", String.valueOf(this._broker.isMessageCompressionEnabled()));
            ConnectionStartBody responseBody = this.getMethodRegistry().createConnectionStartBody((short)this.getProtocolMajorVersion(), (short)pv.getActualMinorVersion(), serverProperties, mechanisms.getBytes(), locales.getBytes());
            this._sender.send((Object)this.asByteBuffer((AMQDataBlock)responseBody.generateFrame(0)));
            this._sender.flush();
        }
        catch (AMQException e) {
            _logger.info((Object)("Received unsupported protocol initiation for protocol version: " + this.getProtocolVersion()));
            this._sender.send((Object)this.asByteBuffer((AMQDataBlock)new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())));
            this._sender.flush();
        }
    }

    private ByteBuffer asByteBuffer(AMQDataBlock block) {
        ByteBuffer buf;
        int size = (int)block.getSize();
        byte[] data = size > 66560 ? new byte[size] : this._reusableBytes;
        this._reusableDataOutput.setBuffer(data);
        try {
            block.writePayload((DataOutput)this._reusableDataOutput);
        }
        catch (IOException e) {
            throw new ServerScopedRuntimeException((Throwable)e);
        }
        if (size <= 66560) {
            buf = this._reusableByteBuffer;
            buf.position(0);
        } else {
            buf = ByteBuffer.wrap(data);
        }
        buf.limit(this._reusableDataOutput.length());
        return buf;
    }

    public void methodFrameReceived(int channelId, AMQMethodBody methodBody) {
        block10: {
            AMQMethodEvent evt = new AMQMethodEvent(channelId, methodBody);
            try {
                try {
                    boolean wasAnyoneInterested = this._stateManager.methodReceived(evt);
                    if (!wasAnyoneInterested) {
                        throw new AMQNoMethodHandlerException((AMQMethodEvent<AMQMethodBody>)evt);
                    }
                }
                catch (AMQChannelException e) {
                    if (this.getChannel(channelId) != null) {
                        if (_logger.isInfoEnabled()) {
                            _logger.info((Object)("Closing channel due to: " + e.getMessage()));
                        }
                        this.writeFrame((AMQDataBlock)e.getCloseFrame(channelId));
                        this.closeChannel(channelId, e.getErrorCode() == null ? AMQConstant.INTERNAL_ERROR : e.getErrorCode(), e.getMessage());
                        break block10;
                    }
                    if (_logger.isDebugEnabled()) {
                        _logger.debug((Object)("ChannelException occurred on non-existent channel:" + e.getMessage()));
                    }
                    if (_logger.isInfoEnabled()) {
                        _logger.info((Object)("Closing connection due to: " + e.getMessage()));
                    }
                    AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR, AMQConstant.CHANNEL_ERROR.getName().toString());
                    _logger.info((Object)(e.getMessage() + " whilst processing:" + methodBody));
                    this.closeConnection(channelId, ce);
                }
                catch (AMQConnectionException e) {
                    _logger.info((Object)(e.getMessage() + " whilst processing:" + methodBody));
                    this.closeConnection(channelId, e);
                }
            }
            catch (Exception e) {
                _logger.error((Object)"Unexpected exception while processing frame.  Closing connection.", (Throwable)e);
                this.closeProtocolSession();
            }
        }
    }

    public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException {
        AMQChannel<AMQProtocolEngine> channel = this.getAndAssertChannel(channelId);
        channel.publishContentHeader(body);
    }

    public void contentBodyReceived(int channelId, ContentBody body) throws AMQException {
        AMQChannel<AMQProtocolEngine> channel = this.getAndAssertChannel(channelId);
        channel.publishContentBody(body);
    }

    public void heartbeatBodyReceived(int channelId, HeartbeatBody body) {
    }

    public synchronized void writeFrame(AMQDataBlock frame) {
        long time;
        ByteBuffer buf = this.asByteBuffer(frame);
        this._writtenBytes += (long)buf.remaining();
        if (_logger.isDebugEnabled()) {
            _logger.debug((Object)("SEND: " + frame));
        }
        this._sender.send((Object)buf);
        this._lastIoTime = time = System.currentTimeMillis();
        this._lastWriteTime.set(time);
        if (!this._deferFlush) {
            this._sender.flush();
        }
    }

    @Override
    public AMQShortString getContextKey() {
        return this._contextKey;
    }

    @Override
    public void setContextKey(AMQShortString contextKey) {
        this._contextKey = contextKey;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<AMQChannel<AMQProtocolEngine>> getChannels() {
        Map<Integer, AMQChannel<AMQProtocolEngine>> map = this._channelMap;
        synchronized (map) {
            return new ArrayList<AMQChannel<AMQProtocolEngine>>(this._channelMap.values());
        }
    }

    public AMQChannel<AMQProtocolEngine> getAndAssertChannel(int channelId) throws AMQException {
        AMQChannel<AMQProtocolEngine> channel = this.getChannel(channelId);
        if (channel == null) {
            throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId);
        }
        return channel;
    }

    @Override
    public AMQChannel<AMQProtocolEngine> getChannel(int channelId) {
        AMQChannel<AMQProtocolEngine> channel;
        AMQChannel<AMQProtocolEngine> aMQChannel = channel = (channelId & 0xFF) == channelId ? this._cachedChannels[channelId] : this._channelMap.get(channelId);
        if (channel == null || channel.isClosing()) {
            return null;
        }
        return channel;
    }

    @Override
    public boolean channelAwaitingClosure(int channelId) {
        return !this._closingChannelsList.isEmpty() && this._closingChannelsList.containsKey(channelId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void addChannel(AMQChannel<AMQProtocolEngine> channel) throws AMQException {
        if (this._closed) {
            throw new AMQException("Session is closed");
        }
        int channelId = channel.getChannelId();
        if (this._closingChannelsList.containsKey(channelId)) {
            throw new AMQException("Session is marked awaiting channel close");
        }
        if ((long)this._channelMap.size() == this._maxNoOfChannels) {
            String errorMessage = this.toString() + ": maximum number of channels has been reached (" + this._maxNoOfChannels + "); can't create channel";
            _logger.error((Object)errorMessage);
            throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage);
        }
        Map<Integer, AMQChannel<AMQProtocolEngine>> map = this._channelMap;
        synchronized (map) {
            this._channelMap.put(channel.getChannelId(), channel);
            this.sessionAdded(channel);
            if (this._blocking) {
                channel.block();
            }
        }
        if ((channelId & 0xFF) == channelId) {
            this._cachedChannels[channelId] = channel;
        }
    }

    private void sessionAdded(AMQSessionModel<?, ?> session) {
        for (SessionModelListener l : this._sessionListeners) {
            l.sessionAdded(session);
        }
    }

    private void sessionRemoved(AMQSessionModel<?, ?> session) {
        for (SessionModelListener l : this._sessionListeners) {
            l.sessionRemoved(session);
        }
    }

    @Override
    public Long getMaximumNumberOfChannels() {
        return this._maxNoOfChannels;
    }

    @Override
    public void setMaximumNumberOfChannels(Long value) {
        this._maxNoOfChannels = value;
    }

    @Override
    public void closeChannel(int channelId) {
        this.closeChannel(channelId, null, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void closeChannel(int channelId, AMQConstant cause, String message) {
        AMQChannel<AMQProtocolEngine> channel = this.getChannel(channelId);
        if (channel == null) {
            throw new IllegalArgumentException("Unknown channel id");
        }
        try {
            channel.close(cause, message);
            this.markChannelAwaitingCloseOk(channelId);
        }
        finally {
            this.removeChannel(channelId);
        }
    }

    @Override
    public void closeChannelOk(int channelId) {
        this._closingChannelsList.remove(channelId);
    }

    private void markChannelAwaitingCloseOk(int channelId) {
        this._closingChannelsList.put(channelId, System.currentTimeMillis());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void removeChannel(int channelId) {
        AMQChannel<AMQProtocolEngine> session;
        Map<Integer, AMQChannel<AMQProtocolEngine>> map = this._channelMap;
        synchronized (map) {
            session = this._channelMap.remove(channelId);
            if ((channelId & 0xFF) == channelId) {
                this._cachedChannels[channelId] = null;
            }
        }
        this.sessionRemoved(session);
    }

    @Override
    public void initHeartbeats(int delay) {
        if (delay > 0) {
            this._network.setMaxWriteIdle(delay);
            this._network.setMaxReadIdle(BrokerProperties.HEARTBEAT_TIMEOUT_FACTOR * delay);
        } else {
            this._network.setMaxWriteIdle(0);
            this._network.setMaxReadIdle(0);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void closeAllChannels() {
        for (AMQChannel<AMQProtocolEngine> channel : this.getChannels()) {
            channel.close();
        }
        Map<Integer, AMQChannel<AMQProtocolEngine>> i$ = this._channelMap;
        synchronized (i$) {
            this._channelMap.clear();
        }
        for (int i = 0; i <= 255; ++i) {
            this._cachedChannels[i] = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    @Override
    public void closeSession() {
        if (this.runningAsSubject()) {
            if (this._closing.compareAndSet(false, true)) {
                this._receivedLock.lock();
                try {
                    this.receivedComplete();
                }
                finally {
                    this._receivedLock.unlock();
                }
                if (this._closed) return;
                if (this._virtualHost != null) {
                    this._virtualHost.getConnectionRegistry().deregisterConnection((AMQConnectionModel)this);
                }
                this.closeAllChannels();
                for (Action<? super AMQProtocolEngine> task : this._taskList) {
                    task.performAction((Object)this);
                }
                AMQProtocolEngine aMQProtocolEngine = this;
                synchronized (aMQProtocolEngine) {
                    this._closed = true;
                    this.notifyAll();
                }
                this.getEventLogger().message(this._logSubject, ConnectionMessages.CLOSE());
                return;
            }
            AMQProtocolEngine aMQProtocolEngine = this;
            synchronized (aMQProtocolEngine) {
                boolean lockHeld = this._receivedLock.isHeldByCurrentThread();
                while (!this._closed) {
                    try {
                        if (lockHeld) {
                            this._receivedLock.unlock();
                        }
                        this.wait(1000L);
                    }
                    catch (InterruptedException interruptedException) {}
                    continue;
                    finally {
                        if (!lockHeld) continue;
                        this._receivedLock.lock();
                    }
                }
                return;
            }
        }
        this.runAsSubject(new PrivilegedAction<Object>(){

            @Override
            public Object run() {
                AMQProtocolEngine.this.closeSession();
                return null;
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void closeConnection(int channelId, AMQConnectionException e) {
        try {
            if (_logger.isInfoEnabled()) {
                _logger.info((Object)("Closing connection due to: " + (Object)((Object)e)));
            }
            this.markChannelAwaitingCloseOk(channelId);
            this.closeSession();
        }
        finally {
            try {
                this._stateManager.changeState(AMQState.CONNECTION_CLOSING);
                this.writeFrame((AMQDataBlock)e.getCloseFrame(channelId));
            }
            finally {
                this.closeProtocolSession();
            }
        }
    }

    @Override
    public void closeProtocolSession() {
        this._network.close();
        try {
            this._stateManager.changeState(AMQState.CONNECTION_CLOSED);
        }
        catch (ConnectionScopedRuntimeException e) {
            _logger.info((Object)e.getMessage());
        }
        catch (TransportException e) {
            _logger.info((Object)e.getMessage());
        }
    }

    public String toString() {
        return this.getRemoteAddress() + "(" + (this.getAuthorizedPrincipal() == null ? "?" : this.getAuthorizedPrincipal().getName() + ")");
    }

    @Override
    public Object getKey() {
        return this.getRemoteAddress();
    }

    @Override
    public String getLocalFQDN() {
        SocketAddress address = this._network.getLocalAddress();
        if (address instanceof InetSocketAddress) {
            return ((InetSocketAddress)address).getHostName();
        }
        throw new IllegalArgumentException("Unsupported socket address class: " + address);
    }

    @Override
    public SaslServer getSaslServer() {
        return this._saslServer;
    }

    @Override
    public void setSaslServer(SaslServer saslServer) {
        this._saslServer = saslServer;
    }

    @Override
    public void setClientProperties(FieldTable clientProperties) {
        if (clientProperties != null) {
            String compressionSupported;
            String closeWhenNoRoute = clientProperties.getString("qpid.close_when_no_route");
            if (closeWhenNoRoute != null) {
                this._closeWhenNoRoute = Boolean.parseBoolean(closeWhenNoRoute);
                if (_logger.isDebugEnabled()) {
                    _logger.debug((Object)("Client set closeWhenNoRoute=" + this._closeWhenNoRoute + " for protocol engine " + this));
                }
            }
            if ((compressionSupported = clientProperties.getString("qpid.message_compression_supported")) != null) {
                this._compressionSupported = Boolean.parseBoolean(compressionSupported);
                if (_logger.isDebugEnabled()) {
                    _logger.debug((Object)("Client set compressionSupported=" + this._compressionSupported + " for protocol engine " + this));
                }
            }
            this._clientVersion = clientProperties.getString("version");
            this._clientProduct = clientProperties.getString("product");
            this._remoteProcessPid = clientProperties.getString("qpid.client_pid");
            String clientId = clientProperties.getString("instance");
            if (clientId != null) {
                this.setContextKey(new AMQShortString(clientId));
            }
            this.getEventLogger().message(ConnectionMessages.OPEN((String)clientId, (String)this._protocolVersion.toString(), (String)this._clientVersion, (String)this._clientProduct, (boolean)true, (boolean)true, (boolean)true, (boolean)true));
        }
    }

    private void setProtocolVersion(ProtocolVersion pv) {
        this._protocolVersion = pv;
        this._methodRegistry = MethodRegistry.getMethodRegistry((ProtocolVersion)this._protocolVersion);
        this._protocolOutputConverter = ProtocolOutputConverterRegistry.getConverter(this);
        this._dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(this._stateManager, this._protocolVersion);
    }

    public byte getProtocolMajorVersion() {
        return this._protocolVersion.getMajorVersion();
    }

    public ProtocolVersion getProtocolVersion() {
        return this._protocolVersion;
    }

    public byte getProtocolMinorVersion() {
        return this._protocolVersion.getMinorVersion();
    }

    public MethodRegistry getRegistry() {
        return this.getMethodRegistry();
    }

    @Override
    public VirtualHostImpl<?, ?, ?> getVirtualHost() {
        return this._virtualHost;
    }

    @Override
    public void setVirtualHost(VirtualHostImpl<?, ?, ?> virtualHost) throws AMQException {
        this._virtualHost = virtualHost;
        this._virtualHost.getConnectionRegistry().registerConnection((AMQConnectionModel)this);
        this._messageCompressionThreshold = (Integer)virtualHost.getContextValue(Integer.class, "connection.messageCompressionThresholdSize");
        if (this._messageCompressionThreshold <= 0) {
            this._messageCompressionThreshold = Integer.MAX_VALUE;
        }
    }

    public void addDeleteTask(Action<? super AMQProtocolEngine> task) {
        this._taskList.add(task);
    }

    public void removeDeleteTask(Action<? super AMQProtocolEngine> task) {
        this._taskList.remove(task);
    }

    @Override
    public ProtocolOutputConverter getProtocolOutputConverter() {
        return this._protocolOutputConverter;
    }

    @Override
    public void setAuthorizedSubject(Subject authorizedSubject) {
        if (authorizedSubject == null) {
            throw new IllegalArgumentException("authorizedSubject cannot be null");
        }
        this._authenticated = true;
        this._authorizedSubject.getPrincipals().addAll(authorizedSubject.getPrincipals());
        this._authorizedSubject.getPrivateCredentials().addAll(authorizedSubject.getPrivateCredentials());
        this._authorizedSubject.getPublicCredentials().addAll(authorizedSubject.getPublicCredentials());
    }

    public Subject getAuthorizedSubject() {
        return this._authorizedSubject;
    }

    public Principal getAuthorizedPrincipal() {
        return this._authorizedSubject.getPrincipals(AuthenticatedPrincipal.class).size() == 0 ? null : AuthenticatedPrincipal.getAuthenticatedPrincipalFromSubject((Subject)this._authorizedSubject);
    }

    @Override
    public SocketAddress getRemoteAddress() {
        return this._network.getRemoteAddress();
    }

    public String getRemoteProcessPid() {
        return this._remoteProcessPid;
    }

    @Override
    public SocketAddress getLocalAddress() {
        return this._network.getLocalAddress();
    }

    @Override
    public Principal getPeerPrincipal() {
        return this._network.getPeerPrincipal();
    }

    @Override
    public MethodRegistry getMethodRegistry() {
        return this._methodRegistry;
    }

    @Override
    public MethodDispatcher getMethodDispatcher() {
        return this._dispatcher;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void closed() {
        try {
            try {
                this.closeSession();
            }
            finally {
                this.closeProtocolSession();
            }
        }
        catch (ConnectionScopedRuntimeException e) {
            _logger.error((Object)"Could not close protocol engine", (Throwable)e);
        }
        catch (TransportException e) {
            _logger.error((Object)"Could not close protocol engine", (Throwable)e);
        }
    }

    public void readerIdle() {
        Subject.doAs(this._authorizedSubject, new PrivilegedAction<Object>(){

            @Override
            public Object run() {
                AMQProtocolEngine.this.getEventLogger().message(ConnectionMessages.IDLE_CLOSE());
                AMQProtocolEngine.this._network.close();
                return null;
            }
        });
    }

    public synchronized void writerIdle() {
        this.writeFrame((AMQDataBlock)HeartbeatBody.FRAME);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void exception(Throwable throwable) {
        if (throwable instanceof AMQProtocolHeaderException) {
            this.writeFrame((AMQDataBlock)new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
            this._sender.close();
            _logger.error((Object)("Error in protocol initiation " + this + ":" + this.getRemoteAddress() + " :" + throwable.getMessage()), throwable);
        } else if (throwable instanceof IOException) {
            _logger.info((Object)("IOException caught in " + this + ", connection closed implicitly: " + throwable));
        } else {
            try {
                _logger.error((Object)("Exception caught in " + this + ", closing connection explicitly: " + throwable), throwable);
                MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry((ProtocolVersion)this.getProtocolVersion());
                ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200, new AMQShortString(throwable.getMessage()), 0, 0);
                this.writeFrame((AMQDataBlock)closeBody.generateFrame(0));
                this._sender.close();
            }
            finally {
                if (throwable instanceof Error) {
                    throw (Error)throwable;
                }
                if (throwable instanceof ServerScopedRuntimeException) {
                    throw (ServerScopedRuntimeException)throwable;
                }
            }
        }
    }

    public void setSender(Sender<ByteBuffer> sender) {
    }

    public long getReadBytes() {
        return this._readBytes;
    }

    @Override
    public long getWrittenBytes() {
        return this._writtenBytes;
    }

    @Override
    public long getLastIoTime() {
        return this._lastIoTime;
    }

    public Port<?> getPort() {
        return this._port;
    }

    public Transport getTransport() {
        return this._transport;
    }

    public void stop() {
        this._stopped = true;
    }

    public boolean isStopped() {
        return this._stopped;
    }

    public String getVirtualHostName() {
        return this._virtualHost == null ? null : this._virtualHost.getName();
    }

    @Override
    public long getLastReceivedTime() {
        return this._lastReceivedTime;
    }

    @Override
    public String getClientVersion() {
        return this._clientVersion;
    }

    public String getClientProduct() {
        return this._clientProduct;
    }

    public long getSessionCountLimit() {
        return this.getMaximumNumberOfChannels();
    }

    public boolean isDurable() {
        return false;
    }

    public long getConnectionId() {
        return this.getSessionID();
    }

    public String getAddress() {
        return String.valueOf(this.getRemoteAddress());
    }

    public void closeSession(AMQChannel<AMQProtocolEngine> session, AMQConstant cause, String message) {
        int channelId = session.getChannelId();
        this.closeChannel(channelId, cause, message);
        MethodRegistry methodRegistry = this.getMethodRegistry();
        ChannelCloseBody responseBody = methodRegistry.createChannelCloseBody(cause.getCode(), new AMQShortString(message), 0, 0);
        this.writeFrame((AMQDataBlock)responseBody.generateFrame(channelId));
    }

    public void close(AMQConstant cause, String message) {
        this.closeConnection(0, new AMQConnectionException(cause, message, 0, 0, this.getProtocolOutputConverter().getProtocolMajorVersion(), this.getProtocolOutputConverter().getProtocolMinorVersion(), null));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void block() {
        Map<Integer, AMQChannel<AMQProtocolEngine>> map = this._channelMap;
        synchronized (map) {
            if (!this._blocking) {
                this._blocking = true;
                for (AMQChannel<AMQProtocolEngine> channel : this._channelMap.values()) {
                    channel.block();
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void unblock() {
        Map<Integer, AMQChannel<AMQProtocolEngine>> map = this._channelMap;
        synchronized (map) {
            if (this._blocking) {
                this._blocking = false;
                for (AMQChannel<AMQProtocolEngine> channel : this._channelMap.values()) {
                    channel.unblock();
                }
            }
        }
    }

    public boolean isClosed() {
        return this._closed;
    }

    public List<AMQChannel<AMQProtocolEngine>> getSessionModels() {
        return new ArrayList<AMQChannel<AMQProtocolEngine>>(this.getChannels());
    }

    public LogSubject getLogSubject() {
        return this._logSubject;
    }

    public void registerMessageDelivered(long messageSize) {
        this._messagesDelivered.registerEvent(1L);
        this._dataDelivered.registerEvent(messageSize);
        this._virtualHost.registerMessageDelivered(messageSize);
    }

    public void registerMessageReceived(long messageSize, long timestamp) {
        this._messagesReceived.registerEvent(1L, timestamp);
        this._dataReceived.registerEvent(messageSize, timestamp);
        this._virtualHost.registerMessageReceived(messageSize, timestamp);
    }

    public StatisticsCounter getMessageReceiptStatistics() {
        return this._messagesReceived;
    }

    public StatisticsCounter getDataReceiptStatistics() {
        return this._dataReceived;
    }

    public StatisticsCounter getMessageDeliveryStatistics() {
        return this._messagesDelivered;
    }

    public StatisticsCounter getDataDeliveryStatistics() {
        return this._dataDelivered;
    }

    public void resetStatistics() {
        this._messagesDelivered.reset();
        this._dataDelivered.reset();
        this._messagesReceived.reset();
        this._dataReceived.reset();
    }

    public boolean isSessionNameUnique(byte[] name) {
        return true;
    }

    public String getRemoteAddressString() {
        return String.valueOf(this.getRemoteAddress());
    }

    public String getClientId() {
        return String.valueOf(this.getContextKey());
    }

    public String getRemoteContainerName() {
        return String.valueOf(this.getContextKey());
    }

    public void addSessionListener(SessionModelListener listener) {
        this._sessionListeners.add(listener);
    }

    public void removeSessionListener(SessionModelListener listener) {
        this._sessionListeners.remove(listener);
    }

    @Override
    public void setDeferFlush(boolean deferFlush) {
        this._deferFlush = deferFlush;
    }

    @Override
    public Object getReference() {
        return this._reference;
    }

    @Override
    public Lock getReceivedLock() {
        return this._receivedLock;
    }

    public long getLastReadTime() {
        return this._lastReceivedTime;
    }

    public long getLastWriteTime() {
        return this._lastWriteTime.get();
    }

    @Override
    public boolean isCloseWhenNoRoute() {
        return this._closeWhenNoRoute;
    }

    @Override
    public boolean isCompressionSupported() {
        return this._compressionSupported && this._broker.isMessageCompressionEnabled();
    }

    @Override
    public int getMessageCompressionThreshold() {
        return this._messageCompressionThreshold;
    }

    public EventLogger getEventLogger() {
        if (this._virtualHost != null) {
            return this._virtualHost.getEventLogger();
        }
        return this._broker.getEventLogger();
    }

    public final class WriteDeliverMethod
    implements ClientDeliveryMethod {
        private final int _channelId;

        public WriteDeliverMethod(int channelId) {
            this._channelId = channelId;
        }

        @Override
        public long deliverToClient(ConsumerImpl sub, ServerMessage message, InstanceProperties props, long deliveryTag) {
            long size = AMQProtocolEngine.this._protocolOutputConverter.writeDeliver(message, props, this._channelId, deliveryTag, new AMQShortString(sub.getName()));
            AMQProtocolEngine.this.registerMessageDelivered(size);
            return size;
        }
    }
}

