/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.server.protocol.v0_8;

import java.nio.ByteBuffer;
import java.security.AccessControlException;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import javax.security.auth.Subject;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ChannelFlowBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.filter.SimpleFilterManager;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.logging.EventLoggerProvider;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.protocol.ConsumerListener;
import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
import org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8;
import org.apache.qpid.server.protocol.v0_8.ExtractResendAndRequeue;
import org.apache.qpid.server.protocol.v0_8.IncomingMessage;
import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod;
import org.apache.qpid.server.protocol.v0_8.UnacknowledgedMessageMap;
import org.apache.qpid.server.protocol.v0_8.UnacknowledgedMessageMapImpl;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.TransportException;

public class AMQChannel<T extends AMQProtocolSession<T>>
implements AMQSessionModel<AMQChannel<T>, T>,
AsyncAutoCommitTransaction.FutureRecorder {
    public static final int DEFAULT_PREFETCH = 4096;
    private static final Logger _logger = Logger.getLogger(AMQChannel.class);
    private boolean _messageAuthorizationRequired = Boolean.getBoolean("qpid.broker_msg_auth");
    private final int _channelId;
    private final Pre0_10CreditManager _creditManager = new Pre0_10CreditManager(0L, 0L);
    private long _deliveryTag = 0L;
    private AMQQueue _defaultQueue;
    private int _consumerTag;
    private IncomingMessage _currentMessage;
    private final Map<AMQShortString, ConsumerTarget_0_8> _tag2SubscriptionTargetMap = new HashMap<AMQShortString, ConsumerTarget_0_8>();
    private final MessageStore _messageStore;
    private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList();
    private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(4096);
    private final AtomicBoolean _suspended = new AtomicBoolean(false);
    private ServerTransaction _transaction;
    private final AtomicLong _txnStarts = new AtomicLong(0L);
    private final AtomicLong _txnCommits = new AtomicLong(0L);
    private final AtomicLong _txnRejects = new AtomicLong(0L);
    private final AtomicLong _txnCount = new AtomicLong(0L);
    private final T _session;
    private AtomicBoolean _closing = new AtomicBoolean(false);
    private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet());
    private final AtomicBoolean _blocking = new AtomicBoolean(false);
    private LogSubject _logSubject;
    private volatile boolean _rollingBack;
    private static final Runnable NULL_TASK = new Runnable(){

        @Override
        public void run() {
        }
    };
    private List<MessageInstance> _resendList = new ArrayList<MessageInstance>();
    private static final AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible.");
    private long _createTime = System.currentTimeMillis();
    private final ClientDeliveryMethod _clientDeliveryMethod;
    private final TransactionTimeoutHelper _transactionTimeoutHelper;
    private final UUID _id = UUID.randomUUID();
    private final List<Action<? super AMQChannel<T>>> _taskList = new CopyOnWriteArrayList<Action<? super AMQChannel<T>>>();
    private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction();
    private final ImmediateAction _immediateAction = new ImmediateAction();
    private Subject _subject;
    private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList();
    private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
    private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList();
    private Session<?> _modelObject;
    private final String id = "(" + System.identityHashCode(this) + ")";
    private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod(){

        @Override
        public void recordMessageDelivery(ConsumerImpl sub, MessageInstance entry, long deliveryTag) {
            AMQChannel.this.addUnacknowledgedMessage(entry, deliveryTag, sub);
        }
    };

    public AMQChannel(T session, int channelId, MessageStore messageStore) throws AMQException {
        this._session = session;
        this._channelId = channelId;
        this._subject = new Subject(false, session.getAuthorizedSubject().getPrincipals(), session.getAuthorizedSubject().getPublicCredentials(), session.getAuthorizedSubject().getPrivateCredentials());
        this._subject.getPrincipals().add((Principal)new SessionPrincipal((AMQSessionModel)this));
        this._logSubject = new ChannelLogSubject((AMQSessionModel)this);
        this._messageStore = messageStore;
        this._transaction = new AsyncAutoCommitTransaction(this._messageStore, (AsyncAutoCommitTransaction.FutureRecorder)this);
        this._clientDeliveryMethod = session.createDeliveryMethod(this._channelId);
        this._transactionTimeoutHelper = new TransactionTimeoutHelper(this._logSubject, new TransactionTimeoutHelper.CloseAction(){

            public void doTimeoutAction(String reason) {
                try {
                    AMQChannel.this.closeConnection(reason);
                }
                catch (AMQException e) {
                    throw new ConnectionScopedRuntimeException((Throwable)e);
                }
            }
        }, (EventLoggerProvider)this.getVirtualHost());
        Subject.doAs(this._subject, new PrivilegedAction<Object>(){

            @Override
            public Object run() {
                AMQChannel.this.getVirtualHost().getEventLogger().message(ChannelMessages.CREATE());
                return null;
            }
        });
    }

    public void setLocalTransactional() {
        this._transaction = new LocalTransaction(this._messageStore, new LocalTransaction.ActivityTimeAccessor(){

            public long getActivityTime() {
                return AMQChannel.this._session.getLastReceivedTime();
            }
        });
        this._txnStarts.incrementAndGet();
    }

    public boolean isTransactional() {
        return this._transaction.isTransactional();
    }

    public void receivedComplete() {
        this.sync();
    }

    private void incrementOutstandingTxnsIfNecessary() {
        if (this.isTransactional()) {
            this._txnCount.compareAndSet(0L, 1L);
        }
    }

    private void decrementOutstandingTxnsIfNecessary() {
        if (this.isTransactional()) {
            this._txnCount.compareAndSet(1L, 0L);
        }
    }

    public Long getTxnCommits() {
        return this._txnCommits.get();
    }

    public Long getTxnRejects() {
        return this._txnRejects.get();
    }

    public Long getTxnCount() {
        return this._txnCount.get();
    }

    public Long getTxnStart() {
        return this._txnStarts.get();
    }

    public int getChannelId() {
        return this._channelId;
    }

    public void setPublishFrame(MessagePublishInfo info, MessageDestination e) {
        String routingKey = info.getRoutingKey() == null ? null : info.getRoutingKey().asString();
        VirtualHostImpl virtualHost = this.getVirtualHost();
        SecurityManager securityManager = virtualHost.getSecurityManager();
        securityManager.authorisePublish(info.isImmediate(), routingKey, e.getName(), virtualHost.getName());
        this._currentMessage = new IncomingMessage(info);
        this._currentMessage.setMessageDestination(e);
    }

    public void publishContentHeader(ContentHeaderBody contentHeaderBody) throws AMQException {
        if (this._currentMessage == null) {
            throw new AMQException("Received content header without previously receiving a BasicPublish frame");
        }
        if (_logger.isDebugEnabled()) {
            _logger.debug((Object)("Content header received on channel " + this._channelId));
        }
        this._currentMessage.setContentHeaderBody(contentHeaderBody);
        this.deliverCurrentMessageIfComplete();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void deliverCurrentMessageIfComplete() throws AMQException {
        if (this._currentMessage.allContentReceived()) {
            try {
                MessageMetaData messageMetaData = new MessageMetaData(this._currentMessage.getMessagePublishInfo(), this._currentMessage.getContentHeader(), this.getProtocolSession().getLastReceivedTime());
                StoredMessage handle = this._messageStore.addMessage((StorableMessageMetaData)messageMetaData);
                final AMQMessage amqMessage = this.createAMQMessage(this._currentMessage, (StoredMessage<MessageMetaData>)handle);
                MessageReference reference = amqMessage.newReference();
                try {
                    int bodyCount = this._currentMessage.getBodyCount();
                    if (bodyCount > 0) {
                        long bodyLengthReceived = 0L;
                        for (int i = 0; i < bodyCount; ++i) {
                            ContentBody contentChunk = this._currentMessage.getContentChunk(i);
                            handle.addContent((int)bodyLengthReceived, ByteBuffer.wrap(contentChunk.getPayload()));
                            bodyLengthReceived += (long)contentChunk.getSize();
                        }
                    }
                    if (!this.checkMessageUserId(this._currentMessage.getContentHeader())) {
                        this._transaction.addPostTransactionAction((ServerTransaction.Action)new WriteReturnAction(AMQConstant.ACCESS_REFUSED, "Access Refused", amqMessage));
                    } else {
                        final boolean immediate = this._currentMessage.getMessagePublishInfo().isImmediate();
                        InstanceProperties instanceProperties = new InstanceProperties(){

                            public Object getProperty(InstanceProperties.Property prop) {
                                switch (prop) {
                                    case EXPIRATION: {
                                        return amqMessage.getExpiration();
                                    }
                                    case IMMEDIATE: {
                                        return immediate;
                                    }
                                    case PERSISTENT: {
                                        return amqMessage.isPersistent();
                                    }
                                    case MANDATORY: {
                                        return AMQChannel.this._currentMessage.getMessagePublishInfo().isMandatory();
                                    }
                                    case REDELIVERED: {
                                        return false;
                                    }
                                }
                                return null;
                            }
                        };
                        int enqueues = this._currentMessage.getDestination().send((ServerMessage)amqMessage, amqMessage.getInitialRoutingAddress(), instanceProperties, this._transaction, (Action)(immediate ? this._immediateAction : this._capacityCheckAction));
                        if (enqueues == 0) {
                            this.handleUnroutableMessage(amqMessage);
                        } else {
                            this.incrementOutstandingTxnsIfNecessary();
                        }
                    }
                }
                finally {
                    reference.release();
                }
            }
            finally {
                long bodySize = this._currentMessage.getSize();
                long timestamp = this._currentMessage.getContentHeader().getProperties().getTimestamp();
                this._session.registerMessageReceived(bodySize, timestamp);
                this._currentMessage = null;
            }
        }
    }

    private void handleUnroutableMessage(AMQMessage message) throws AMQConnectionException {
        boolean mandatory = message.isMandatory();
        String description = this.currentMessageDescription();
        boolean closeOnNoRoute = this._session.isCloseWhenNoRoute();
        if (_logger.isDebugEnabled()) {
            _logger.debug((Object)String.format("Unroutable message %s, mandatory=%s, transactionalSession=%s, closeOnNoRoute=%s", description, mandatory, this.isTransactional(), closeOnNoRoute));
        }
        if (mandatory && this.isTransactional() && this._session.isCloseWhenNoRoute()) {
            throw new AMQConnectionException(AMQConstant.NO_ROUTE, "No route for message " + this.currentMessageDescription(), 0, 0, this.getProtocolSession().getProtocolVersion().getMajorVersion(), this.getProtocolSession().getProtocolVersion().getMinorVersion(), (Throwable)null);
        }
        if (mandatory || message.isImmediate()) {
            this._transaction.addPostTransactionAction((ServerTransaction.Action)new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message " + this.currentMessageDescription(), message));
        } else {
            AMQShortString exchangeName = this._currentMessage.getExchangeName();
            AMQShortString routingKey = this._currentMessage.getMessagePublishInfo().getRoutingKey();
            this.getVirtualHost().getEventLogger().message(ExchangeMessages.DISCARDMSG((String)(exchangeName == null ? null : exchangeName.asString()), (String)(routingKey == null ? null : routingKey.asString())));
        }
    }

    private String currentMessageDescription() {
        if (this._currentMessage == null || !this._currentMessage.allContentReceived()) {
            throw new IllegalStateException("Cannot create message description for message: " + this._currentMessage);
        }
        return String.format("[Exchange: %s, Routing key: %s]", this._currentMessage.getExchangeName(), this._currentMessage.getMessagePublishInfo().getRoutingKey() == null ? null : this._currentMessage.getMessagePublishInfo().getRoutingKey().toString());
    }

    public void publishContentBody(ContentBody contentBody) throws AMQException {
        if (this._currentMessage == null) {
            throw new AMQException("Received content body without previously receiving a Content Header");
        }
        if (_logger.isDebugEnabled()) {
            _logger.debug((Object)(this.debugIdentity() + " content body received on channel " + this._channelId));
        }
        try {
            this._currentMessage.addContentBodyFrame(contentBody);
            this.deliverCurrentMessageIfComplete();
        }
        catch (AMQException e) {
            this._currentMessage = null;
            throw e;
        }
        catch (RuntimeException e) {
            this._currentMessage = null;
            throw e;
        }
    }

    public long getNextDeliveryTag() {
        return ++this._deliveryTag;
    }

    public int getNextConsumerTag() {
        return ++this._consumerTag;
    }

    public ConsumerImpl getSubscription(AMQShortString tag) {
        ConsumerTarget_0_8 target = this._tag2SubscriptionTargetMap.get(tag);
        return target == null ? null : target.getConsumer();
    }

    public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks, FieldTable filters, boolean exclusive, boolean noLocal) throws AMQException, MessageSource.ExistingConsumerPreventsExclusive, MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException, MessageSource.ConsumerAccessRefused {
        ConsumerTarget_0_8 target;
        if (tag == null) {
            tag = new AMQShortString("sgen_" + this.getNextConsumerTag());
        }
        if (this._tag2SubscriptionTargetMap.containsKey(tag)) {
            throw new AMQException("Consumer already exists with same tag: " + tag);
        }
        EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class);
        if (filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue()))) {
            target = ConsumerTarget_0_8.createBrowserTarget(this, tag, filters, (FlowCreditManager)this._creditManager);
        } else if (acks) {
            target = ConsumerTarget_0_8.createAckTarget(this, tag, filters, (FlowCreditManager)this._creditManager);
            options.add(ConsumerImpl.Option.ACQUIRES);
            options.add(ConsumerImpl.Option.SEES_REQUEUES);
        } else {
            target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, (FlowCreditManager)this._creditManager);
            options.add(ConsumerImpl.Option.ACQUIRES);
            options.add(ConsumerImpl.Option.SEES_REQUEUES);
        }
        if (exclusive) {
            options.add(ConsumerImpl.Option.EXCLUSIVE);
        }
        this._tag2SubscriptionTargetMap.put(tag, target);
        try {
            ConsumerImpl sub;
            FilterManager filterManager = FilterManagerFactory.createManager((Map)FieldTable.convertToMap((FieldTable)filters));
            if (noLocal) {
                if (filterManager == null) {
                    filterManager = new SimpleFilterManager();
                }
                final Object connectionReference = this.getConnectionReference();
                filterManager.add(new MessageFilter(){

                    public boolean matches(Filterable message) {
                        return message.getConnectionReference() != connectionReference;
                    }
                });
            }
            if ((sub = source.addConsumer((ConsumerTarget)target, filterManager, AMQMessage.class, AMQShortString.toString((AMQShortString)tag), options)) instanceof Consumer) {
                Consumer modelConsumer = (Consumer)sub;
                this.consumerAdded(modelConsumer);
                modelConsumer.addChangeListener(this._consumerClosedListener);
                this._consumers.add(modelConsumer);
            }
        }
        catch (AccessControlException e) {
            this._tag2SubscriptionTargetMap.remove(tag);
            throw e;
        }
        catch (MessageSource.ExistingExclusiveConsumer e) {
            this._tag2SubscriptionTargetMap.remove(tag);
            throw e;
        }
        catch (MessageSource.ExistingConsumerPreventsExclusive e) {
            this._tag2SubscriptionTargetMap.remove(tag);
            throw e;
        }
        catch (AMQInvalidArgumentException e) {
            this._tag2SubscriptionTargetMap.remove(tag);
            throw e;
        }
        catch (MessageSource.ConsumerAccessRefused e) {
            this._tag2SubscriptionTargetMap.remove(tag);
            throw e;
        }
        return tag;
    }

    public boolean unsubscribeConsumer(AMQShortString consumerTag) {
        ConsumerImpl sub;
        ConsumerTarget_0_8 target = this._tag2SubscriptionTargetMap.remove(consumerTag);
        ConsumerImpl consumerImpl = sub = target == null ? null : target.getConsumer();
        if (sub != null) {
            sub.close();
            if (sub instanceof Consumer) {
                this._consumers.remove(sub);
            }
            return true;
        }
        _logger.warn((Object)("Attempt to unsubscribe consumer with tag '" + consumerTag + "' which is not registered."));
        return false;
    }

    public void close() {
        this.close(null, null);
    }

    public void close(AMQConstant cause, String message) {
        if (!this._closing.compareAndSet(false, true)) {
            return;
        }
        LogMessage operationalLogMessage = cause == null ? ChannelMessages.CLOSE() : ChannelMessages.CLOSE_FORCED((Number)cause.getCode(), (String)message);
        this.getVirtualHost().getEventLogger().message(this._logSubject, operationalLogMessage);
        this.unsubscribeAllConsumers();
        for (Action<? super AMQChannel<T>> action : this._taskList) {
            action.performAction((Object)this);
        }
        this._transaction.rollback();
        try {
            this.requeue();
        }
        catch (TransportException e) {
            _logger.error((Object)("Caught TransportException whilst attempting to requeue:" + (Object)((Object)e)));
        }
    }

    private void unsubscribeAllConsumers() {
        if (_logger.isInfoEnabled()) {
            if (!this._tag2SubscriptionTargetMap.isEmpty()) {
                _logger.info((Object)("Unsubscribing all consumers on channel " + this.toString()));
            } else {
                _logger.info((Object)("No consumers to unsubscribe on channel " + this.toString()));
            }
        }
        for (Map.Entry<AMQShortString, ConsumerTarget_0_8> me : this._tag2SubscriptionTargetMap.entrySet()) {
            ConsumerImpl sub;
            if (_logger.isInfoEnabled()) {
                _logger.info((Object)("Unsubscribing consumer '" + me.getKey() + "' on channel " + this.toString()));
            }
            if ((sub = me.getValue().getConsumer()) == null) continue;
            sub.close();
        }
        this._tag2SubscriptionTargetMap.clear();
    }

    public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, ConsumerImpl consumer) {
        if (_logger.isDebugEnabled()) {
            _logger.debug((Object)(this.debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag + ") for " + consumer + " on " + entry.getOwningResource().getName()));
        }
        this._unacknowledgedMessageMap.add(deliveryTag, entry);
    }

    public String debugIdentity() {
        return this._channelId + this.id;
    }

    public void requeue() {
        Collection<MessageInstance> messagesToBeDelivered = this._unacknowledgedMessageMap.cancelAllMessages();
        if (!messagesToBeDelivered.isEmpty() && _logger.isInfoEnabled()) {
            _logger.info((Object)("Requeuing " + messagesToBeDelivered.size() + " unacked messages. for " + this.toString()));
        }
        for (MessageInstance unacked : messagesToBeDelivered) {
            unacked.setRedelivered();
            unacked.release();
        }
    }

    public void requeue(long deliveryTag) {
        MessageInstance unacked = this._unacknowledgedMessageMap.remove(deliveryTag);
        if (unacked != null) {
            unacked.setRedelivered();
            unacked.release();
        } else {
            _logger.warn((Object)("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists." + this._unacknowledgedMessageMap.size()));
        }
    }

    public boolean isMaxDeliveryCountEnabled(long deliveryTag) {
        MessageInstance queueEntry = this._unacknowledgedMessageMap.get(deliveryTag);
        if (queueEntry != null) {
            int maximumDeliveryCount = queueEntry.getMaximumDeliveryCount();
            return maximumDeliveryCount > 0;
        }
        return false;
    }

    public boolean isDeliveredTooManyTimes(long deliveryTag) {
        MessageInstance queueEntry = this._unacknowledgedMessageMap.get(deliveryTag);
        if (queueEntry != null) {
            int maximumDeliveryCount = queueEntry.getMaximumDeliveryCount();
            int numDeliveries = queueEntry.getDeliveryCount();
            return maximumDeliveryCount != 0 && numDeliveries >= maximumDeliveryCount;
        }
        return false;
    }

    public void resend() throws AMQException {
        long deliveryTag;
        MessageInstance message;
        LinkedHashMap<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>();
        LinkedHashMap<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>();
        if (_logger.isDebugEnabled()) {
            _logger.debug((Object)("unacked map Size:" + this._unacknowledgedMessageMap.size()));
        }
        this._unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(this._unacknowledgedMessageMap, msgToRequeue, msgToResend));
        if (_logger.isDebugEnabled()) {
            if (!msgToResend.isEmpty()) {
                _logger.debug((Object)("Preparing (" + msgToResend.size() + ") message to resend."));
            } else {
                _logger.debug((Object)"No message to resend.");
            }
        }
        for (Map.Entry entry : msgToResend.entrySet()) {
            message = (MessageInstance)entry.getValue();
            deliveryTag = (Long)entry.getKey();
            message.decrementDeliveryCount();
            message.setRedelivered();
            if (message.resend()) continue;
            msgToRequeue.put(deliveryTag, message);
        }
        if (_logger.isInfoEnabled() && !msgToRequeue.isEmpty()) {
            _logger.info((Object)("Preparing (" + msgToRequeue.size() + ") message to requeue to."));
        }
        for (Map.Entry entry : msgToRequeue.entrySet()) {
            message = (MessageInstance)entry.getValue();
            deliveryTag = (Long)entry.getKey();
            message.decrementDeliveryCount();
            this._unacknowledgedMessageMap.remove(deliveryTag);
            message.setRedelivered();
            message.release();
        }
    }

    public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException {
        Collection<MessageInstance> ackedMessages = this.getAckedMessages(deliveryTag, multiple);
        this._transaction.dequeue(ackedMessages, (ServerTransaction.Action)new MessageAcknowledgeAction(ackedMessages));
    }

    private Collection<MessageInstance> getAckedMessages(long deliveryTag, boolean multiple) {
        return this._unacknowledgedMessageMap.acknowledge(deliveryTag, multiple);
    }

    public UnacknowledgedMessageMap getUnacknowledgedMessageMap() {
        return this._unacknowledgedMessageMap;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setSuspended(boolean suspended) {
        boolean wasSuspended = this._suspended.getAndSet(suspended);
        if (wasSuspended != suspended) {
            if (!suspended) {
                this.getVirtualHost().getEventLogger().message(this._logSubject, ChannelMessages.FLOW((String)"Started"));
            }
            if (wasSuspended) {
                for (ConsumerTarget_0_8 s : this._tag2SubscriptionTargetMap.values()) {
                    s.getConsumer().externalStateChange();
                }
            }
            if (!wasSuspended) {
                for (ConsumerTarget_0_8 s : this._tag2SubscriptionTargetMap.values()) {
                    try {
                        s.getConsumer().getSendLock();
                    }
                    finally {
                        s.getConsumer().releaseSendLock();
                    }
                }
            }
            if (suspended) {
                this.getVirtualHost().getEventLogger().message(this._logSubject, ChannelMessages.FLOW((String)"Stopped"));
            }
        }
    }

    public boolean isSuspended() {
        return this._suspended.get() || this._closing.get() || this._session.isClosing();
    }

    public void commit() throws AMQException {
        this.commit(null, false);
    }

    public void commit(final Runnable immediateAction, boolean async) throws AMQException {
        if (!this.isTransactional()) {
            throw new AMQException("Fatal error: commit called on non-transactional channel");
        }
        if (async && this._transaction instanceof LocalTransaction) {
            ((LocalTransaction)this._transaction).commitAsync(new Runnable(){

                @Override
                public void run() {
                    immediateAction.run();
                    AMQChannel.this._txnCommits.incrementAndGet();
                    AMQChannel.this._txnStarts.incrementAndGet();
                    AMQChannel.this.decrementOutstandingTxnsIfNecessary();
                }
            });
        } else {
            this._transaction.commit(immediateAction);
            this._txnCommits.incrementAndGet();
            this._txnStarts.incrementAndGet();
            this.decrementOutstandingTxnsIfNecessary();
        }
    }

    public void rollback() throws AMQException {
        this.rollback(NULL_TASK);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void rollback(Runnable postRollbackTask) throws AMQException {
        if (!this.isTransactional()) {
            throw new AMQException("Fatal error: commit called on non-transactional channel");
        }
        this._rollingBack = true;
        boolean requiresSuspend = this._suspended.compareAndSet(false, true);
        for (ConsumerTarget_0_8 sub : this._tag2SubscriptionTargetMap.values()) {
            sub.getConsumer().getSendLock();
            sub.getConsumer().releaseSendLock();
        }
        try {
            this._transaction.rollback();
        }
        finally {
            this._rollingBack = false;
            this._txnRejects.incrementAndGet();
            this._txnStarts.incrementAndGet();
            this.decrementOutstandingTxnsIfNecessary();
        }
        postRollbackTask.run();
        for (MessageInstance entry : this._resendList) {
            ConsumerImpl sub = entry.getDeliveredConsumer();
            if (sub == null || sub.isClosed()) {
                entry.release();
                continue;
            }
            entry.resend();
        }
        this._resendList.clear();
        if (requiresSuspend) {
            this._suspended.set(false);
            for (ConsumerTarget_0_8 sub : this._tag2SubscriptionTargetMap.values()) {
                sub.getConsumer().externalStateChange();
            }
        }
    }

    public String toString() {
        return "[" + this._session.toString() + ":" + this._channelId + "]";
    }

    public void setDefaultQueue(AMQQueue queue) {
        this._defaultQueue = queue;
    }

    public AMQQueue getDefaultQueue() {
        return this._defaultQueue;
    }

    public boolean isClosing() {
        return this._closing.get();
    }

    public AMQProtocolSession getProtocolSession() {
        return this._session;
    }

    public FlowCreditManager getCreditManager() {
        return this._creditManager;
    }

    public void setCredit(long prefetchSize, int prefetchCount) {
        this.getVirtualHost().getEventLogger().message(ChannelMessages.PREFETCH_SIZE((Number)prefetchSize, (Number)prefetchCount));
        this._creditManager.setCreditLimits(prefetchSize, (long)prefetchCount);
    }

    public MessageStore getMessageStore() {
        return this._messageStore;
    }

    public ClientDeliveryMethod getClientDeliveryMethod() {
        return this._clientDeliveryMethod;
    }

    public RecordDeliveryMethod getRecordDeliveryMethod() {
        return this._recordDeliveryMethod;
    }

    private AMQMessage createAMQMessage(IncomingMessage incomingMessage, StoredMessage<MessageMetaData> handle) throws AMQException {
        AMQMessage message = new AMQMessage(handle, this._session.getReference());
        BasicContentHeaderProperties properties = incomingMessage.getContentHeader().getProperties();
        return message;
    }

    private boolean checkMessageUserId(ContentHeaderBody header) {
        AMQShortString userID = header.getProperties().getUserId();
        return !this._messageAuthorizationRequired || this._session.getAuthorizedPrincipal().getName().equals(userID == null ? "" : userID.toString());
    }

    public UUID getId() {
        return this._id;
    }

    public T getConnectionModel() {
        return this._session;
    }

    public String getClientID() {
        return String.valueOf(this._session.getContextKey());
    }

    public LogSubject getLogSubject() {
        return this._logSubject;
    }

    public int compareTo(AMQChannel o) {
        return this.getId().compareTo(o.getId());
    }

    public void addDeleteTask(Action<? super AMQChannel<T>> task) {
        this._taskList.add(task);
    }

    public void removeDeleteTask(Action<? super AMQChannel<T>> task) {
        this._taskList.remove(task);
    }

    public Subject getSubject() {
        return this._subject;
    }

    public synchronized void block() {
        if (this._blockingEntities.add(this) && this._blocking.compareAndSet(false, true)) {
            this.getVirtualHost().getEventLogger().message(this._logSubject, ChannelMessages.FLOW_ENFORCED((String)"** All Queues **"));
            this.flow(false);
        }
    }

    public synchronized void unblock() {
        if (this._blockingEntities.remove(this) && this._blockingEntities.isEmpty() && this._blocking.compareAndSet(true, false)) {
            this.getVirtualHost().getEventLogger().message(this._logSubject, ChannelMessages.FLOW_REMOVED());
            this.flow(true);
        }
    }

    public synchronized void block(AMQQueue queue) {
        if (this._blockingEntities.add(queue) && this._blocking.compareAndSet(false, true)) {
            this.getVirtualHost().getEventLogger().message(this._logSubject, ChannelMessages.FLOW_ENFORCED((String)queue.getName()));
            this.flow(false);
        }
    }

    public synchronized void unblock(AMQQueue queue) {
        if (this._blockingEntities.remove(queue) && this._blockingEntities.isEmpty() && this._blocking.compareAndSet(true, false) && !this.isClosing()) {
            this.getVirtualHost().getEventLogger().message(this._logSubject, ChannelMessages.FLOW_REMOVED());
            this.flow(true);
        }
    }

    public Object getConnectionReference() {
        return this.getProtocolSession().getReference();
    }

    public int getUnacknowledgedMessageCount() {
        return this.getUnacknowledgedMessageMap().size();
    }

    private void flow(boolean flow) {
        MethodRegistry methodRegistry = this._session.getMethodRegistry();
        ChannelFlowBody responseBody = methodRegistry.createChannelFlowBody(flow);
        this._session.writeFrame((AMQDataBlock)responseBody.generateFrame(this._channelId));
    }

    public boolean getBlocking() {
        return this._blocking.get();
    }

    public VirtualHostImpl getVirtualHost() {
        return this.getProtocolSession().getVirtualHost();
    }

    public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) {
        this._transactionTimeoutHelper.checkIdleOrOpenTimes(this._transaction, openWarn, openClose, idleWarn, idleClose);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void closeConnection(String reason) throws AMQException {
        Lock receivedLock = this._session.getReceivedLock();
        receivedLock.lock();
        try {
            this._session.close(AMQConstant.RESOURCE_ERROR, reason);
        }
        finally {
            receivedLock.unlock();
        }
    }

    public void deadLetter(long deliveryTag) throws AMQException {
        UnacknowledgedMessageMap unackedMap = this.getUnacknowledgedMessageMap();
        MessageInstance rejectedQueueEntry = unackedMap.remove(deliveryTag);
        if (rejectedQueueEntry == null) {
            _logger.warn((Object)("No message found, unable to DLQ delivery tag: " + deliveryTag));
        } else {
            TransactionLogResource owningResource;
            final ServerMessage msg = rejectedQueueEntry.getMessage();
            int requeues = rejectedQueueEntry.routeToAlternate((Action)new Action<MessageInstance>(){

                public void performAction(MessageInstance requeueEntry) {
                    AMQChannel.this.getVirtualHost().getEventLogger().message(AMQChannel.this._logSubject, ChannelMessages.DEADLETTERMSG((Number)msg.getMessageNumber(), (String)requeueEntry.getOwningResource().getName()));
                }
            }, null);
            if (requeues == 0 && (owningResource = rejectedQueueEntry.getOwningResource()) instanceof AMQQueue) {
                AMQQueue queue = (AMQQueue)owningResource;
                Exchange altExchange = queue.getAlternateExchange();
                if (altExchange == null) {
                    _logger.debug((Object)("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag));
                    this.getVirtualHost().getEventLogger().message(this._logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH((Number)msg.getMessageNumber(), (String)queue.getName(), (String)msg.getInitialRoutingAddress()));
                } else {
                    _logger.debug((Object)("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag));
                    this.getVirtualHost().getEventLogger().message(this._logSubject, ChannelMessages.DISCARDMSG_NOROUTE((Number)msg.getMessageNumber(), (String)altExchange.getName()));
                }
            }
        }
    }

    public void recordFuture(StoreFuture future, ServerTransaction.Action action) {
        this._unfinishedCommandsQueue.add(new AsyncCommand(future, action));
    }

    public void sync() {
        AsyncCommand cmd;
        if (_logger.isDebugEnabled()) {
            _logger.debug((Object)("sync() called on channel " + this.debugIdentity()));
        }
        while ((cmd = this._unfinishedCommandsQueue.poll()) != null) {
            cmd.awaitReadyForCompletion();
            cmd.complete();
        }
        if (this._transaction instanceof LocalTransaction) {
            ((LocalTransaction)this._transaction).sync();
        }
    }

    public int getConsumerCount() {
        return this._tag2SubscriptionTargetMap.size();
    }

    public Collection<Consumer<?>> getConsumers() {
        return Collections.unmodifiableCollection(this._consumers);
    }

    private void consumerAdded(Consumer<?> consumer) {
        for (ConsumerListener l : this._consumerListeners) {
            l.consumerAdded(consumer);
        }
    }

    private void consumerRemoved(Consumer<?> consumer) {
        for (ConsumerListener l : this._consumerListeners) {
            l.consumerRemoved(consumer);
        }
    }

    public void addConsumerListener(ConsumerListener listener) {
        this._consumerListeners.add(listener);
    }

    public void removeConsumerListener(ConsumerListener listener) {
        this._consumerListeners.remove(listener);
    }

    public void setModelObject(Session<?> session) {
        this._modelObject = session;
    }

    public Session<?> getModelObject() {
        return this._modelObject;
    }

    public long getTransactionStartTime() {
        ServerTransaction serverTransaction = this._transaction;
        if (serverTransaction.isTransactional()) {
            return serverTransaction.getTransactionStartTime();
        }
        return 0L;
    }

    public long getTransactionUpdateTime() {
        ServerTransaction serverTransaction = this._transaction;
        if (serverTransaction.isTransactional()) {
            return serverTransaction.getTransactionUpdateTime();
        }
        return 0L;
    }

    private class ConsumerClosedListener
    implements ConfigurationChangeListener {
        private ConsumerClosedListener() {
        }

        public void stateChanged(ConfiguredObject object, State oldState, State newState) {
            if (newState == State.DELETED) {
                AMQChannel.this.consumerRemoved((Consumer)object);
            }
        }

        public void childAdded(ConfiguredObject object, ConfiguredObject child) {
        }

        public void childRemoved(ConfiguredObject object, ConfiguredObject child) {
        }

        public void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue, Object newAttributeValue) {
        }
    }

    private static class AsyncCommand {
        private final StoreFuture _future;
        private ServerTransaction.Action _action;

        public AsyncCommand(StoreFuture future, ServerTransaction.Action action) {
            this._future = future;
            this._action = action;
        }

        void awaitReadyForCompletion() {
            this._future.waitForCompletion();
        }

        void complete() {
            if (!this._future.isComplete()) {
                this._future.waitForCompletion();
            }
            this._action.postCommit();
            this._action = null;
        }
    }

    private class WriteReturnAction
    implements ServerTransaction.Action {
        private final AMQConstant _errorCode;
        private final String _description;
        private final MessageReference<AMQMessage> _reference;

        public WriteReturnAction(AMQConstant errorCode, String description, AMQMessage message) {
            this._errorCode = errorCode;
            this._description = description;
            this._reference = message.newReference();
        }

        public void postCommit() {
            AMQMessage message = (AMQMessage)this._reference.getMessage();
            AMQChannel.this._session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(), message.getContentHeaderBody(), (MessageContentSource)message, AMQChannel.this._channelId, this._errorCode.getCode(), AMQShortString.validValueOf((Object)this._description));
            this._reference.release();
        }

        public void onRollback() {
            this._reference.release();
        }
    }

    private class MessageAcknowledgeAction
    implements ServerTransaction.Action {
        private final Collection<MessageInstance> _ackedMessages;

        public MessageAcknowledgeAction(Collection<MessageInstance> ackedMessages) {
            this._ackedMessages = ackedMessages;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void postCommit() {
            try {
                for (MessageInstance entry : this._ackedMessages) {
                    entry.delete();
                }
            }
            finally {
                this._ackedMessages.clear();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onRollback() {
            if (AMQChannel.this._rollingBack) {
                for (MessageInstance entry : this._ackedMessages) {
                    entry.unlockAcquisition();
                }
                AMQChannel.this._resendList.addAll(this._ackedMessages);
            } else {
                try {
                    for (MessageInstance entry : this._ackedMessages) {
                        entry.release();
                    }
                }
                finally {
                    this._ackedMessages.clear();
                }
            }
        }
    }

    private final class CapacityCheckAction
    implements Action<MessageInstance> {
        private CapacityCheckAction() {
        }

        public void performAction(MessageInstance entry) {
            TransactionLogResource queue = entry.getOwningResource();
            if (queue instanceof CapacityChecker) {
                ((CapacityChecker)queue).checkCapacity((AMQSessionModel)AMQChannel.this);
            }
        }
    }

    private class ImmediateAction
    implements Action<MessageInstance> {
        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void performAction(MessageInstance entry) {
            TransactionLogResource queue = entry.getOwningResource();
            if (!entry.getDeliveredToConsumer() && entry.acquire()) {
                LocalTransaction txn = new LocalTransaction(AMQChannel.this._messageStore);
                final AMQMessage message = (AMQMessage)entry.getMessage();
                MessageReference ref = message.newReference();
                try {
                    entry.delete();
                    txn.dequeue(queue, (EnqueueableMessage)message, new ServerTransaction.Action(){

                        public void postCommit() {
                            ProtocolOutputConverter outputConverter = AMQChannel.this._session.getProtocolOutputConverter();
                            outputConverter.writeReturn(message.getMessagePublishInfo(), message.getContentHeaderBody(), (MessageContentSource)message, AMQChannel.this._channelId, AMQConstant.NO_CONSUMERS.getCode(), IMMEDIATE_DELIVERY_REPLY_TEXT);
                        }

                        public void onRollback() {
                        }
                    });
                    txn.commit();
                }
                finally {
                    ref.release();
                }
            } else if (queue instanceof CapacityChecker) {
                ((CapacityChecker)queue).checkCapacity((AMQSessionModel)AMQChannel.this);
            }
        }
    }
}

