/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.client;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.IllegalStateException;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.QpidException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.Closeable;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.filter.JMSSelectorFilter;
import org.apache.qpid.client.filter.MessageFilter;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.CloseConsumerMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.util.JMSExceptionHelper;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Common base for message consumers bound to an {@link AMQSession}.
 *
 * <p>Incoming messages are either handed straight to a registered
 * {@link MessageListener} or parked on an internal blocking queue from which
 * {@link #receive()}, {@link #receive(long)} and {@link #receiveNoWait()} take them.
 * Besides messages, that queue may also carry a {@link CloseConsumerMessage}
 * (see {@link #notifyCloseMessage}) or a {@link Throwable} (see {@link #notifyError}).
 *
 * @param <U> the protocol-specific unprocessed message type that subclasses convert
 *            into an {@link AbstractJMSMessage}
 */
public abstract class BasicMessageConsumer<U>
extends Closeable
implements MessageConsumer {
    private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
    private final AMQConnection _connection;
    /** Parsed selector, or {@code null} when no (or a blank) selector was supplied. */
    private final MessageFilter _messageSelectorFilter;
    private final boolean _noLocal;
    private AMQDestination _destination;
    /** True while one thread owns the synchronous receive path. */
    private final AtomicBoolean _receiving = new AtomicBoolean(false);
    private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>();
    private int _consumerTag;
    private final int _channelId;
    /** Holds AbstractJMSMessage entries plus the occasional CloseConsumerMessage or Throwable marker. */
    private final BlockingQueue<Object> _synchronousQueue;
    private final MessageFactoryRegistry _messageFactory;
    private final AMQSession _session;
    private final Map<String, Object> _arguments;
    private final int _prefetchHigh;
    private final int _prefetchLow;
    private boolean _exclusive;
    private final int _acknowledgeMode;
    private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>();
    /** Thread currently blocked in a synchronous receive; interrupted by {@link #close(boolean)}. */
    private volatile Thread _receivingThread;
    private String _queuename;
    private final boolean _autoClose;
    private final boolean _browseOnly;
    private boolean _isDurableSubscriber = false;
    private int _addressType = 3;

    /**
     * Creates the consumer and assembles the argument map later exposed by {@link #getArguments()}.
     *
     * @param messageSelector JMS selector; {@code null} or blank means no selector filter is created
     * @param rawSelector     extra arguments merged on top of the destination's consumer arguments;
     *                        may be {@code null}
     * @param acknowledgeMode requested acknowledge mode; ignored (replaced by 257) when
     *                        {@code browseOnly} is set
     * @throws JMSException an {@link InvalidSelectorException} chained to the cause when the
     *                      selector cannot be turned into a filter
     */
    protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, Map<String, Object> rawSelector, int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException {
        this._channelId = channelId;
        this._connection = connection;
        this._noLocal = noLocal;
        this._destination = destination;
        this._messageFactory = messageFactory;
        this._session = session;
        this._prefetchHigh = prefetchHigh;
        this._prefetchLow = prefetchLow;
        this._exclusive = exclusive;
        this._synchronousQueue = new LinkedBlockingQueue<Object>();
        this._autoClose = autoClose;
        this._browseOnly = browseOnly;
        try {
            this._messageSelectorFilter = messageSelector == null || "".equals(messageSelector.trim()) ? null : new JMSSelectorFilter(messageSelector);
        }
        catch (AMQInternalException ie) {
            throw JMSExceptionHelper.chainJMSException((JMSException)new InvalidSelectorException("cannot create consumer because of selector issue"), ie);
        }
        // NOTE(review): 257 looks like Qpid's NO_ACKNOWLEDGE extension mode - confirm against the Session constants.
        this._acknowledgeMode = this._browseOnly ? 257 : acknowledgeMode;

        // Later puts win: destination arguments < rawSelector < selector / no-local entries.
        HashMap<String, Object> ft = new HashMap<String, Object>();
        if (destination.getConsumerArguments() != null) {
            ft.putAll(destination.getConsumerArguments());
        }
        if (rawSelector != null) {
            ft.putAll(rawSelector);
        }
        ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector);
        if (noLocal) {
            ft.put(AMQPFilterTypes.NO_LOCAL.getValue(), noLocal);
        }
        this._arguments = ft;
        this._addressType = this._destination.getAddressType();
    }

    /** Returns the destination this consumer was created for (or last set via {@link #setDestination}). */
    public AMQDestination getDestination() {
        return this._destination;
    }

    /**
     * Returns the selector text of the selector filter, or {@code null} when there is no filter.
     *
     * @throws JMSException if the consumer or its session is closed
     */
    public String getMessageSelector() throws JMSException {
        this.checkPreConditions();
        return this._messageSelectorFilter == null ? null : this._messageSelectorFilter.getSelector();
    }

    /**
     * Returns the currently registered listener, or {@code null} if none is set.
     *
     * @throws JMSException if the consumer or its session is closed
     */
    public MessageListener getMessageListener() throws JMSException {
        this.checkPreConditions();
        return this._messageListener.get();
    }

    /** Returns the effective acknowledge mode (257 for browse-only consumers). */
    public int getAcknowledgeMode() {
        return this._acknowledgeMode;
    }

    /** Tells whether a listener is registered, without the closed-state checks of {@link #getMessageListener()}. */
    protected boolean isMessageListenerSet() {
        return this._messageListener.get() != null;
    }

    /**
     * Registers the asynchronous listener.
     *
     * <p>While the connection is not started the listener is simply replaced. Once it is started,
     * a listener may only be installed when none is set and no thread is in a synchronous receive;
     * messages already parked on the synchronous queue are then re-dispatched through
     * {@link #notifyMessage(AbstractJMSMessage)} while holding the session monitor.
     *
     * @throws JMSException an {@link IllegalStateException} when the listener cannot be changed,
     *                      or if the consumer or its session is closed
     */
    public void setMessageListener(MessageListener messageListener) throws JMSException {
        this.checkPreConditions();
        if (!this._session.getAMQConnection().started()) {
            this._messageListener.set(messageListener);
            this._session.setHasMessageListeners();
            if (_logger.isDebugEnabled()) {
                _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination " + this._destination);
            }
            return;
        }

        if (this._receiving.get()) {
            throw new IllegalStateException("Another thread is already receiving synchronously.");
        }
        if (!this._messageListener.compareAndSet(null, messageListener)) {
            throw new IllegalStateException("Attempt to alter listener while session is started.");
        }
        if (_logger.isDebugEnabled()) {
            _logger.debug("Message listener set for destination " + this._destination);
        }
        if (messageListener != null) {
            synchronized (this._session) {
                this._messageListener.set(messageListener);
                this._session.setHasMessageListeners();
                this._session.startDispatcherIfNecessary();
                // Hand over anything that was queued for synchronous receivers.
                for (Object o = this._synchronousQueue.poll(); o != null; o = this._synchronousQueue.poll()) {
                    this.notifyMessage((AbstractJMSMessage)o);
                }
            }
        }
    }

    /**
     * Claims the synchronous receive path for the calling thread.
     *
     * @param immediate when {@code true} and the connection is failing over, give up instead of blocking
     * @return {@code false} only when {@code immediate} is set and a failover is in progress
     * @throws JMSException an {@link IllegalStateException} if a listener is set or another thread is receiving
     * @throws InterruptedException propagated from waiting for failover to finish
     */
    private boolean acquireReceiving(boolean immediate) throws JMSException, InterruptedException {
        if (this._connection.isFailingOver()) {
            if (immediate) {
                return false;
            }
            this._connection.blockUntilNotFailingOver();
        }
        if (this.isMessageListenerSet()) {
            throw new IllegalStateException("A listener has already been set.");
        }
        if (!this._receiving.compareAndSet(false, true)) {
            throw new IllegalStateException("Another thread is already receiving.");
        }
        this._receivingThread = Thread.currentThread();
        return true;
    }

    /**
     * Gives up the synchronous receive path.
     *
     * <p>The thread reference is cleared before the flag is released: once the flag is
     * {@code false} another thread may acquire and publish itself in {@code _receivingThread},
     * and clearing afterwards would wipe that newer reference so close() could not interrupt it.
     */
    private void releaseReceiving() {
        this._receivingThread = null;
        this._receiving.set(false);
    }

    /** Returns the argument map built in the constructor (the live map, not a copy). */
    public Map<String, Object> getArguments() {
        return this._arguments;
    }

    /** Returns the high prefetch mark; same value as {@link #getPrefetchHigh()}. */
    public int getPrefetch() {
        return this._prefetchHigh;
    }

    public int getPrefetchHigh() {
        return this._prefetchHigh;
    }

    public int getPrefetchLow() {
        return this._prefetchLow;
    }

    public boolean isNoLocal() {
        return this._noLocal;
    }

    /**
     * Tells whether this consumer is exclusive. For ADDR-syntax destinations the answer comes
     * from the destination (always {@code true} for address type 2, otherwise the link's
     * subscription setting); for every other syntax the constructor flag is returned.
     */
    public boolean isExclusive() {
        AMQDestination dest = this.getDestination();
        if (dest.getDestSyntax() != AMQDestination.DestSyntax.ADDR) {
            return this._exclusive;
        }
        // NOTE(review): 2 is presumably the "topic" address type - confirm against AMQDestination.
        if (dest.getAddressType() == 2) {
            return true;
        }
        return dest.getLink().getSubscription().isExclusive();
    }

    /** Tells whether a thread currently owns the synchronous receive path. */
    public boolean isReceiving() {
        return this._receiving.get();
    }

    /** Returns the selector filter, or {@code null} when no selector was supplied. */
    public MessageFilter getMessageSelectorFilter() {
        return this._messageSelectorFilter;
    }

    /** Blocks until a message is available; equivalent to {@code receive(0)}. */
    public Message receive() throws JMSException {
        return this.receive(0L);
    }

    /**
     * Receives the next message synchronously.
     *
     * @param l timeout in milliseconds; {@code 0} blocks indefinitely, a negative value polls once
     *          (see {@link #getMessageFromQueue(long)})
     * @return the message, or {@code null} on timeout, on interruption, or when a
     *         {@link CloseConsumerMessage} was dequeued
     * @throws JMSException if the consumer/session is closed, a listener is set, another thread is
     *                      receiving, a queued error is surfaced, or a transport error occurs
     */
    public Message receive(long l) throws JMSException {
        this.checkPreConditions();
        try {
            this.acquireReceiving(false);
        }
        catch (InterruptedException e) {
            _logger.warn("Interrupted acquire: " + e);
            if (this.isClosed()) {
                return null;
            }
            // NOTE(review): when not closed we carry on without having claimed _receiving,
            // and the finally block below still releases it - behaviour kept from the original.
        }
        this._session.startDispatcherIfNecessary();
        try {
            Object o = this.getMessageFromQueue(l);
            // No longer blocked, so close() has nothing left to interrupt.
            this._receivingThread = null;
            AbstractJMSMessage m = this.returnMessageOrThrow(o);
            if (m != null) {
                this.preDeliver(m);
                this.postDeliver(m);
            }
            return m;
        }
        catch (InterruptedException e) {
            _logger.warn("Interrupted: " + e);
            return null;
        }
        catch (TransportException e) {
            throw this._session.toJMSException("Exception while receiving:" + e.getMessage(), e);
        }
        finally {
            this.releaseReceiving();
            // Clears the calling thread's interrupt flag; presumably so an interrupt issued by
            // close() does not leak into the caller's code - TODO confirm intent.
            Thread.interrupted();
        }
    }

    /**
     * Takes the next entry off the synchronous queue.
     *
     * @param l {@code > 0}: wait up to that many milliseconds; {@code < 0}: do not wait;
     *          {@code 0}: wait indefinitely
     * @return the entry, or {@code null} if none became available in time
     * @throws InterruptedException if interrupted while waiting
     */
    public Object getMessageFromQueue(long l) throws InterruptedException {
        if (l > 0L) {
            return this._synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
        }
        if (l < 0L) {
            return this._synchronousQueue.poll();
        }
        return this._synchronousQueue.take();
    }

    abstract Message receiveBrowse() throws JMSException;

    /**
     * Returns a message only if one is immediately available.
     *
     * @return the message, or {@code null} if none is queued, a failover is in progress,
     *         the thread was interrupted, or a {@link CloseConsumerMessage} was dequeued
     * @throws JMSException under the same conditions as {@link #receive(long)}
     */
    public Message receiveNoWait() throws JMSException {
        this.checkPreConditions();
        try {
            if (!this.acquireReceiving(true)) {
                return null;
            }
        }
        catch (InterruptedException e) {
            return null;
        }
        this._session.startDispatcherIfNecessary();
        try {
            Object o = this.getMessageFromQueue(-1L);
            AbstractJMSMessage m = this.returnMessageOrThrow(o);
            if (m != null) {
                this.preDeliver(m);
                this.postDeliver(m);
            }
            return m;
        }
        catch (InterruptedException e) {
            _logger.warn("Interrupted: " + e);
            return null;
        }
        catch (TransportException e) {
            throw this._session.toJMSException("Exception while receiving:" + e.getMessage(), e);
        }
        finally {
            this.releaseReceiving();
        }
    }

    /**
     * Interprets an entry taken from the synchronous queue.
     *
     * @return the message, or {@code null} for a {@code null} entry or a {@link CloseConsumerMessage}
     *         (in which case the consumer is marked closed and deregistered)
     * @throws JMSException wrapping the entry when it is a {@link Throwable}
     */
    private AbstractJMSMessage returnMessageOrThrow(Object o) throws JMSException {
        if (o instanceof Throwable) {
            throw JMSExceptionHelper.chainJMSException(new JMSException("Message consumer forcibly closed due to error: " + o), (Throwable)o);
        }
        if (o instanceof CloseConsumerMessage) {
            this.setClosed();
            this.deregisterConsumer();
            return null;
        }
        return (AbstractJMSMessage)o;
    }

    /** Closes the consumer and sends a cancel; equivalent to {@code close(true)}. */
    @Override
    public void close() throws JMSException {
        this.close(true);
    }

    /**
     * Closes the consumer. Does nothing beyond debug logging when {@code setClosed()} returns
     * {@code true} (presumably meaning it was already closed - confirm against Closeable).
     *
     * @param sendClose {@code true} to send a cancel via {@link #sendCancel()} while the session is
     *                  open or closing; {@code false} to just deregister from the session
     * @throws JMSException if sending the cancel fails
     */
    public void close(boolean sendClose) throws JMSException {
        if (_logger.isDebugEnabled()) {
            _logger.debug("Closing consumer:" + this.debugIdentity());
        }
        if (this.setClosed()) {
            return;
        }
        this.setClosing(true);
        if (sendClose) {
            try {
                // Spin until the delivery lock is obtained, giving up if the session closes meanwhile.
                while (!this._session.isClosed() || this._session.isClosing()) {
                    if (!this._session.tryLockMessageDelivery()) {
                        continue;
                    }
                    try {
                        synchronized (this._connection.getFailoverMutex()) {
                            this.sendCancel();
                        }
                    }
                    finally {
                        this._session.unlockMessageDelivery();
                    }
                    // The cancel has been sent once; without this exit the loop would keep
                    // re-sending it for as long as the session stays open.
                    break;
                }
            }
            catch (QpidException e) {
                throw JMSExceptionHelper.chainJMSException(new JMSException("Error closing consumer: " + e.getMessage()), e);
            }
            catch (FailoverException e) {
                throw JMSExceptionHelper.chainJMSException(new JMSException("FailoverException interrupted basic cancel."), e);
            }
            catch (TransportException e) {
                throw this._session.toJMSException("Exception while closing consumer: " + e.getMessage(), e);
            }
        } else {
            this.deregisterConsumer();
        }

        // Wake up a thread blocked in receive(); read the reference once so the logged
        // thread and the interrupted thread are the same.
        if (this._receiving.get()) {
            Thread receivingThread = this._receivingThread;
            if (_logger.isInfoEnabled()) {
                _logger.info("Interrupting thread: " + receivingThread);
            }
            if (receivingThread != null) {
                receivingThread.interrupt();
            }
        }
        if (!this.isBrowseOnly() && !this.getSession().isClosing()) {
            this.releasePendingMessages();
        }
    }

    abstract void sendCancel() throws QpidException, FailoverException;

    /** Marks the consumer closed and deregisters it from the session without sending a cancel. */
    void markClosed() {
        this.setClosed();
        this.deregisterConsumer();
    }

    /**
     * Queues a close marker for the synchronous receive path. With a listener set the marker is
     * dropped and only a warning is logged.
     */
    public void notifyCloseMessage(CloseConsumerMessage closeMessage) {
        if (this.isMessageListenerSet()) {
            _logger.warn("Using an AutoCloseconsumer with message listener is not supported.");
            return;
        }
        try {
            this._synchronousQueue.put(closeMessage);
        }
        catch (InterruptedException e) {
            _logger.info(" SynchronousQueue.put interupted. Usually result of connection closing,but we shouldn't have close yet");
        }
    }

    /**
     * Converts an unprocessed message into a JMS message and dispatches it. Close markers are
     * routed to {@link #notifyCloseMessage}. Any exception is logged and swallowed.
     */
    void notifyMessage(U messageFrame) {
        if (messageFrame instanceof CloseConsumerMessage) {
            this.notifyCloseMessage((CloseConsumerMessage)messageFrame);
            return;
        }
        try {
            AbstractJMSMessage jmsMessage = this.createJMSMessageFromUnprocessedMessage(this._session.getMessageDelegateFactory(), messageFrame);
            if (_logger.isDebugEnabled()) {
                _logger.debug("Message is of type: " + jmsMessage.getClass().getName());
            }
            this.notifyMessage(jmsMessage);
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                _logger.info("SynchronousQueue.put interupted. Usually result of connection closing");
            }
            _logger.error("Caught exception (dump follows) - ignoring...", (Throwable)e);
        }
    }

    public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory var1, U var2) throws Exception;

    /**
     * Delivers a JMS message: to the listener (wrapped in pre/post delivery handling) when one is
     * set, otherwise onto the synchronous queue. Any exception, including one thrown by the
     * listener, is logged and swallowed.
     */
    public void notifyMessage(AbstractJMSMessage jmsMessage) {
        try {
            if (this.isMessageListenerSet()) {
                this.preDeliver(jmsMessage);
                this.getMessageListener().onMessage((Message)jmsMessage);
                this.postDeliver(jmsMessage);
            } else {
                this._synchronousQueue.put(jmsMessage);
            }
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                _logger.info("reNotification : SynchronousQueue.put interupted. Usually result of connection closing");
            }
            _logger.error("reNotification : Caught exception (dump follows) - ignoring...", (Throwable)e);
        }
    }

    /**
     * Acknowledge-mode bookkeeping performed just before a message reaches the application.
     * Modes other than those listed below need no action here.
     */
    protected void preDeliver(AbstractJMSMessage msg) {
        this._session.setInRecovery(false);
        switch (this._acknowledgeMode) {
            // NOTE(review): 258 is presumably Qpid's PRE_ACKNOWLEDGE extension mode - confirm.
            case 258: {
                this._session.acknowledgeMessage(msg.getDeliveryTag(), false);
                break;
            }
            // 1 and 3 are the values of javax.jms.Session.AUTO_ACKNOWLEDGE and DUPS_OK_ACKNOWLEDGE.
            case 1: 
            case 3: {
                this._session.addUnacknowledgedMessage(msg.getDeliveryTag());
                break;
            }
            // 2 is the value of javax.jms.Session.CLIENT_ACKNOWLEDGE.
            case 2: {
                msg.setAMQSession(this._session);
                this._session.addUnacknowledgedMessage(msg.getDeliveryTag());
                this._session.markDirty();
                break;
            }
            // 0 is the value of javax.jms.Session.SESSION_TRANSACTED.
            case 0: {
                this._session.addDeliveredMessage(msg.getDeliveryTag());
                this._session.markDirty();
                break;
            }
            default: {
                break;
            }
        }
    }

    /**
     * Acknowledges the message after delivery for modes 1 and 3 (the javax.jms.Session
     * AUTO_ACKNOWLEDGE / DUPS_OK_ACKNOWLEDGE values), unless the session is in recovery.
     */
    void postDeliver(AbstractJMSMessage msg) {
        switch (this._acknowledgeMode) {
            case 1: 
            case 3: {
                if (this._session.isInRecovery()) break;
                this._session.acknowledgeMessage(msg.getDeliveryTag(), false);
                break;
            }
            default: {
                break;
            }
        }
    }

    /**
     * Closes the consumer because of an error. Without a listener the cause is offered to the
     * synchronous queue so that a receive call can rethrow it.
     */
    void notifyError(Throwable cause) {
        this.setClosed();
        if (!this.isMessageListenerSet() && this._synchronousQueue.offer(cause)) {
            _logger.debug("Passed exception to synchronous queue for propagation to receive()");
        }
        this.deregisterConsumer();
    }

    private void deregisterConsumer() {
        this._session.deregisterConsumer(this);
    }

    public int getConsumerTag() {
        return this._consumerTag;
    }

    public void setConsumerTag(int consumerTag) {
        this._consumerTag = consumerTag;
    }

    public AMQSession getSession() {
        return this._session;
    }

    /**
     * Verifies that neither this consumer nor its session is closed.
     *
     * @throws JMSException from {@code checkNotClosed()}, or an {@link IllegalStateException}
     *                      when the session is {@code null} or closed
     */
    private void checkPreConditions() throws JMSException {
        this.checkNotClosed();
        if (this._session == null || this._session.isClosed()) {
            throw new IllegalStateException("Invalid Session");
        }
    }

    public boolean isAutoClose() {
        return this._autoClose;
    }

    public boolean isBrowseOnly() {
        return this._browseOnly;
    }

    /**
     * Rejects (with requeue flag {@code true}) every message parked on the synchronous queue and
     * discards any non-message entry. If entries arrive while this runs the method calls itself
     * again, and the queue is cleared at the end of each pass.
     */
    void releasePendingMessages() {
        if (this._synchronousQueue.size() > 0) {
            if (_logger.isDebugEnabled()) {
                _logger.debug("Rejecting the messages(" + this._synchronousQueue.size() + ") in _syncQueue (PRQ)" + "for consumer with tag:" + this._consumerTag);
            }
            Iterator<Object> iterator = this._synchronousQueue.iterator();
            int initialSize = this._synchronousQueue.size();
            boolean removed = false;
            while (iterator.hasNext()) {
                Object o = iterator.next();
                if (o instanceof AbstractJMSMessage) {
                    this._session.rejectMessage((AbstractJMSMessage)o, true);
                    if (_logger.isDebugEnabled()) {
                        _logger.debug("Rejected message:" + ((AbstractJMSMessage)o).getDeliveryTag());
                    }
                } else {
                    _logger.error("Queue contained a :" + o.getClass() + " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
                }
                iterator.remove();
                removed = true;
            }
            if (removed && initialSize == this._synchronousQueue.size()) {
                _logger.error("Queue had content removed but didn't change in size." + initialSize);
            }
            if (this._synchronousQueue.size() != 0) {
                _logger.warn("Queue was not empty after rejecting all messages Remaining:" + this._synchronousQueue.size());
                this.releasePendingMessages();
            }
            this.clearReceiveQueue();
        }
    }

    /** Returns {@code "<consumerTag>[<identityHashCode>]"} for log messages. */
    public String debugIdentity() {
        return String.valueOf(this._consumerTag) + "[" + System.identityHashCode(this) + "]";
    }

    /** Discards everything on the synchronous queue without rejecting or acknowledging it. */
    public void clearReceiveQueue() {
        this._synchronousQueue.clear();
    }

    /**
     * Removes every queued message from the synchronous queue and returns their delivery tags
     * in iteration order.
     *
     * <p>Entries that are not an {@link AbstractJMSMessage} (a {@link Throwable} offered by
     * {@link #notifyError} or a {@link CloseConsumerMessage} queued by
     * {@link #notifyCloseMessage}) carry no delivery tag; they are left on the queue so a later
     * receive call can still act on them, instead of aborting the drain with a
     * {@link ClassCastException} after some messages were already removed.
     *
     * @return the delivery tags of the removed messages; never {@code null}
     */
    public List<Long> drainReceiverQueueAndRetrieveDeliveryTags() {
        Iterator<Object> iterator = this._synchronousQueue.iterator();
        ArrayList<Long> tags = new ArrayList<Long>(this._synchronousQueue.size());
        while (iterator.hasNext()) {
            Object entry = iterator.next();
            if (!(entry instanceof AbstractJMSMessage)) {
                continue;
            }
            tags.add(((AbstractJMSMessage)entry).getDeliveryTag());
            iterator.remove();
        }
        return tags;
    }

    public String getQueuename() {
        return this._queuename;
    }

    public void setQueuename(String queuename) {
        this._queuename = queuename;
    }

    /** Delegates to the session to add a binding key for this consumer. */
    public void addBindingKey(AMQDestination amqd, String routingKey) throws QpidException {
        this._session.addBindingKey(this, amqd, routingKey);
    }

    /** Failover hook: drops everything on the synchronous queue. */
    public void failedOverPre() {
        this.clearReceiveQueue();
    }

    /** Failover hook: intentionally does nothing in this base class. */
    public void failedOverPost() {
    }

    protected AMQConnection getConnection() {
        return this._connection;
    }

    protected void setDestination(AMQDestination destination) {
        this._destination = destination;
    }

    protected int getChannelId() {
        return this._channelId;
    }

    /** Returns the live synchronous queue (raw type kept for existing subclasses). */
    protected BlockingQueue getSynchronousQueue() {
        return this._synchronousQueue;
    }

    protected MessageFactoryRegistry getMessageFactory() {
        return this._messageFactory;
    }

    protected boolean isDurableSubscriber() {
        return this._isDurableSubscriber;
    }

    protected void markAsDurableSubscriber() {
        this._isDurableSubscriber = true;
    }

    void setAddressType(int addressType) {
        this._addressType = addressType;
    }

    int getAddressType() {
        return this._addressType;
    }
}

