/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker.region;

import java.io.IOException;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.LockOwner;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.PrefetchSubscription;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.QueueMessageReference;
import org.apache.activemq.broker.region.group.MessageGroupMap;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class QueueSubscription
extends PrefetchSubscription
implements LockOwner {
    private static final Log LOG = LogFactory.getLog(QueueSubscription.class);

    public QueueSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
        super(broker, usageManager, context, info);
    }

    protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference n) throws IOException {
        Destination q = n.getRegionDestination();
        q.acknowledge(context, this, ack, n);
        final QueueMessageReference node = (QueueMessageReference)n;
        final Queue queue = (Queue)q;
        if (!ack.isInTransaction()) {
            node.drop();
            queue.dropEvent();
        } else {
            node.setAcked(true);
            context.getTransaction().addSynchronization(new Synchronization(){

                public void afterCommit() throws Exception {
                    node.drop();
                    queue.dropEvent();
                }

                public void afterRollback() throws Exception {
                    node.setAcked(false);
                }
            });
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected boolean canDispatch(MessageReference n) throws IOException {
        QueueMessageReference node = (QueueMessageReference)n;
        if (node.isAcked()) {
            return false;
        }
        String groupId = node.getGroupID();
        int sequence = node.getGroupSequence();
        if (groupId != null) {
            ConsumerId groupOwner;
            MessageGroupMap messageGroupOwners = ((Queue)node.getRegionDestination()).getMessageGroupOwners();
            if (sequence == 1) {
                if (node.lock(this)) {
                    this.assignGroupToMe(messageGroupOwners, n, groupId);
                    return true;
                }
                return false;
            }
            QueueMessageReference queueMessageReference = node;
            synchronized (queueMessageReference) {
                groupOwner = messageGroupOwners.get(groupId);
                if (groupOwner == null) {
                    if (node.lock(this)) {
                        this.assignGroupToMe(messageGroupOwners, n, groupId);
                        return true;
                    }
                    return false;
                }
            }
            if (groupOwner.equals(this.info.getConsumerId())) {
                if (sequence < 0) {
                    messageGroupOwners.removeGroup(groupId);
                }
                return true;
            }
            return false;
        }
        return node.lock(this);
    }

    protected void assignGroupToMe(MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException {
        messageGroupOwners.put(groupId, this.info.getConsumerId());
        Message message = n.getMessage();
        if (message instanceof ActiveMQMessage) {
            ActiveMQMessage activeMessage = (ActiveMQMessage)message;
            try {
                activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true, false);
            }
            catch (JMSException e) {
                LOG.warn((Object)("Failed to set boolean header: " + e), (Throwable)e);
            }
        }
    }

    public synchronized String toString() {
        return "QueueSubscription: consumer=" + this.info.getConsumerId() + ", destinations=" + this.destinations.size() + ", dispatched=" + this.dispatched.size() + ", delivered=" + this.prefetchExtension + ", pending=" + this.getPendingQueueSize();
    }

    public int getLockPriority() {
        return this.info.getPriority();
    }

    public boolean isLockExclusive() {
        return this.info.isExclusive();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected boolean dispatch(MessageReference node) throws IOException {
        boolean rc = false;
        node.incrementReferenceCount();
        try {
            rc = super.dispatch(node);
        }
        finally {
            if (!rc) {
                node.decrementReferenceCount();
            }
        }
        return rc;
    }

    protected void onDispatch(MessageReference node, Message message) {
        node.decrementReferenceCount();
        super.onDispatch(node, message);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void sendToDLQ(ConnectionContext context, MessageReference node) throws IOException, Exception {
        node.incrementReferenceCount();
        try {
            super.sendToDLQ(context, node);
        }
        finally {
            node.decrementReferenceCount();
        }
    }

    public void destroy() {
    }
}

