/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.jms;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsDestination;
import org.apache.qpid.jms.JmsMessageAvailableConsumer;
import org.apache.qpid.jms.JmsMessageAvailableListener;
import org.apache.qpid.jms.JmsMessageDispatcher;
import org.apache.qpid.jms.JmsPrefetchPolicy;
import org.apache.qpid.jms.JmsSession;
import org.apache.qpid.jms.JmsTemporaryDestination;
import org.apache.qpid.jms.JmsTransactionContext;
import org.apache.qpid.jms.JmsTxSynchronization;
import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.message.JmsMessage;
import org.apache.qpid.jms.meta.JmsConsumerId;
import org.apache.qpid.jms.meta.JmsConsumerInfo;
import org.apache.qpid.jms.provider.Provider;
import org.apache.qpid.jms.provider.ProviderConstants;
import org.apache.qpid.jms.provider.ProviderFuture;
import org.apache.qpid.jms.util.FifoMessageQueue;
import org.apache.qpid.jms.util.MessageQueue;
import org.apache.qpid.jms.util.PriorityMessageQueue;

public class JmsMessageConsumer
implements MessageConsumer,
JmsMessageAvailableConsumer,
JmsMessageDispatcher {
    /** Session this consumer was created from; assigned once in the constructor. */
    protected final JmsSession session;
    /** Connection obtained from the session at construction time. */
    protected final JmsConnection connection;
    /** Descriptor populated in the constructor and handed to the connection for create/start/stop/destroy calls. */
    protected JmsConsumerInfo consumerInfo;
    /** Acknowledgement mode read from the session at construction time. */
    protected final int acknowledgementMode;
    /** Flipped to {@code true} by {@code shutdown()}; read by {@code checkClosed()} and {@code close()}. */
    protected final AtomicBoolean closed = new AtomicBoolean();
    /** Whether the consumer is started; written by {@code start()} and {@code stop(boolean)} while holding {@link #lock}. */
    protected boolean started;
    /** Asynchronous listener, or {@code null} when none has been set. */
    protected MessageListener messageListener;
    /** Notified from {@code onInboundMessage} when no started {@link #messageListener} takes the message; may be {@code null}. */
    protected JmsMessageAvailableListener availableListener;
    /** Local queue of dispatched envelopes; a priority queue or a FIFO queue depending on the connection setting. */
    protected final MessageQueue messageQueue;
    /** Held during inbound dispatch and during start/stop state changes. */
    protected final Lock lock = new ReentrantLock();
    /** Not referenced by any method visible in this class; presumably used by subclasses or collaborators - TODO confirm. */
    protected final AtomicBoolean suspendedConnection = new AtomicBoolean();
    /** Set by {@code ackFromReceive} once a message has been acknowledged; consulted by {@code close()} inside a transaction. */
    protected final AtomicBoolean delivered = new AtomicBoolean();

    /**
     * Creates a consumer without a subscription name by delegating to the
     * six-argument constructor with a {@code null} name.
     *
     * @param consumerId  identifier for the new consumer
     * @param session     owning session
     * @param destination destination to consume from
     * @param selector    message selector, passed through unchanged
     * @param noLocal     no-local flag, passed through unchanged
     * @throws JMSException if the delegated constructor fails
     */
    protected JmsMessageConsumer(JmsConsumerId consumerId, JmsSession session, JmsDestination destination, String selector, boolean noLocal) throws JMSException {
        this(consumerId, session, destination, null, selector, noLocal);
    }

    /**
     * Creates a consumer, fills in its {@link JmsConsumerInfo} and asks the
     * connection to create the matching resource.
     *
     * @param consumerId  identifier for the new consumer
     * @param session     owning session; also supplies the connection and acknowledgement mode
     * @param destination destination to consume from
     * @param name        subscription name stored on the consumer info; may be {@code null}
     * @param selector    message selector stored on the consumer info
     * @param noLocal     no-local flag stored on the consumer info
     * @throws JMSException if the temporary-destination check or the resource creation fails
     */
    protected JmsMessageConsumer(JmsConsumerId consumerId, JmsSession session, JmsDestination destination, String name, String selector, boolean noLocal) throws JMSException {
        final JmsConnection owner = session.getConnection();
        this.session = session;
        this.connection = owner;
        this.acknowledgementMode = session.acknowledgementMode();

        // Temporary destinations are validated by the connection before anything else is set up.
        if (destination.isTemporary()) {
            owner.checkConsumeFromTemporaryDestination((JmsTemporaryDestination) destination);
        }

        if (owner.isLocalMessagePriority()) {
            this.messageQueue = new PriorityMessageQueue();
        } else {
            this.messageQueue = new FifoMessageQueue();
        }

        final JmsPrefetchPolicy prefetchPolicy = owner.getPrefetchPolicy();
        final JmsConsumerInfo info = new JmsConsumerInfo(consumerId);
        this.consumerInfo = info;
        info.setClientId(owner.getClientID());
        info.setSelector(selector);
        info.setSubscriptionName(name);
        info.setDestination(destination);
        info.setAcknowledgementMode(this.acknowledgementMode);
        info.setNoLocal(noLocal);
        info.setBrowser(isBrowser());
        info.setPrefetchSize(getConfiguredPrefetch(destination, prefetchPolicy));

        session.getConnection().createResource(info);
    }

    /**
     * Registers this consumer with its session and then starts the consumer
     * resource on the connection.
     *
     * @throws JMSException if starting the resource fails
     */
    public void init() throws JMSException {
        session.add(this);
        startConsumerResource();
    }

    /**
     * Asks the session's connection to start this consumer's resource. If that
     * fails, the consumer is removed from the session before the exception is
     * rethrown.
     *
     * @throws JMSException the failure reported by the connection
     */
    private void startConsumerResource() throws JMSException {
        try {
            session.getConnection().startResource(consumerInfo);
        } catch (JMSException startFailure) {
            session.remove(this);
            throw startFailure;
        }
    }

    /**
     * Closes this consumer unless it is already closed.
     *
     * The close is routed through the session's transaction context as a
     * synchronization: outside a transaction, or when nothing has been
     * delivered yet, {@code validate} closes immediately and returns
     * {@code false}; otherwise it returns {@code true} and the close happens in
     * {@code afterCommit} / {@code afterRollback}.
     *
     * @throws JMSException if registering the synchronization fails
     */
    public void close() throws JMSException {
        if (closed.get()) {
            return;
        }

        session.getTransactionContext().addSynchronization(new JmsTxSynchronization() {

            @Override
            public boolean validate(JmsTransactionContext context) throws Exception {
                // Only defer when a transaction is active AND this consumer has delivered something.
                if (context.isInTransaction() && JmsMessageConsumer.this.delivered.get()) {
                    return true;
                }
                JmsMessageConsumer.this.doClose();
                return false;
            }

            @Override
            public void afterCommit() throws Exception {
                JmsMessageConsumer.this.doClose();
            }

            @Override
            public void afterRollback() throws Exception {
                JmsMessageConsumer.this.doClose();
            }
        });
    }

    /**
     * Shuts this consumer down locally and then asks the connection to destroy
     * the consumer resource.
     *
     * @throws JMSException if the shutdown or the resource destruction fails
     */
    protected void doClose() throws JMSException {
        shutdown();
        connection.destroyResource(consumerInfo);
    }

    /**
     * Marks the consumer closed. Only the first caller to flip the flag removes
     * the consumer from the session and stops it with the message queue closed;
     * later calls do nothing.
     *
     * @throws JMSException declared for callers; thrown by whatever the session removal raises
     */
    protected void shutdown() throws JMSException {
        final boolean firstCloser = closed.compareAndSet(false, true);
        if (!firstCloser) {
            return;
        }
        session.remove(this);
        stop(true);
    }

    /**
     * Receives the next message, waiting without a time limit.
     *
     * Issues a pull with timeout {@code 0} when applicable (see
     * {@code sendPullCommand}), then dequeues with {@code -1}, acknowledges the
     * envelope and returns a copy of its message.
     *
     * @return a copy of the received message, or {@code null} if the dequeued
     *         envelope is {@code null} or carries no message
     * @throws JMSException if the consumer is closed, the listener check fails,
     *         the wait is interrupted, or acknowledgement fails
     */
    public Message receive() throws JMSException {
        checkClosed();
        checkMessageListener();
        sendPullCommand(0L);
        try {
            return copy(ackFromReceive(messageQueue.dequeue(-1L)));
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw JmsExceptionSupport.create(e);
        } catch (Exception e) {
            throw JmsExceptionSupport.create(e);
        }
    }

    /**
     * Receives the next message that arrives within the given timeout.
     *
     * Per the JMS contract a timeout of zero never expires, so zero is mapped
     * to the same indefinite wait ({@code dequeue(-1)}) that {@link #receive()}
     * uses. Negative timeouts are not defined by JMS; they keep the previous
     * behaviour of returning {@code null} without waiting.
     *
     * @param timeout maximum wait in milliseconds; {@code 0} waits indefinitely
     * @return a copy of the received message, or {@code null} if none was
     *         obtained
     * @throws JMSException if the consumer is closed, the listener check fails,
     *         the wait is interrupted, or acknowledgement fails
     */
    public Message receive(long timeout) throws JMSException {
        checkClosed();
        checkMessageListener();
        sendPullCommand(timeout);

        if (timeout < 0L) {
            return null;
        }

        // The queue is asked to wait with -1 by receive(); reuse that for the "never expires" case.
        final long wait = timeout == 0L ? -1L : timeout;
        try {
            return copy(ackFromReceive(messageQueue.dequeue(wait)));
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw JmsExceptionSupport.create(e);
        }
    }

    /**
     * Receives a message only if one is immediately available in the local
     * queue. A pull with timeout {@code -1} is issued first when applicable
     * (see {@code sendPullCommand}).
     *
     * @return a copy of the message, or {@code null} if nothing was available
     * @throws JMSException if the consumer is closed, the listener check fails
     *         or acknowledgement fails
     */
    public Message receiveNoWait() throws JMSException {
        checkClosed();
        checkMessageListener();
        sendPullCommand(-1L);
        final JmsInboundMessageDispatch next = messageQueue.dequeueNoWait();
        return copy(ackFromReceive(next));
    }

    /**
     * Fails when this consumer has been closed.
     *
     * @throws IllegalStateException if the closed flag is set
     */
    protected void checkClosed() throws IllegalStateException {
        final boolean alreadyClosed = closed.get();
        if (alreadyClosed) {
            throw new IllegalStateException("The MessageConsumer is closed");
        }
    }

    /**
     * Returns a copy of the message carried by the envelope.
     *
     * @param envelope dispatch envelope; may be {@code null}
     * @return the copied message, or {@code null} when the envelope or its
     *         message is {@code null}
     * @throws JMSException if copying the message fails
     */
    JmsMessage copy(JmsInboundMessageDispatch envelope) throws JMSException {
        if (envelope == null) {
            return null;
        }
        final JmsMessage payload = envelope.getMessage();
        return payload == null ? null : payload.copy();
    }

    /**
     * Acknowledges an envelope that is about to be handed to the application.
     *
     * Messages that carry an acknowledge callback are acknowledged as
     * DELIVERED; all others are acknowledged as CONSUMED. In both cases the
     * {@code delivered} flag is then set. Envelopes that are {@code null} or
     * have no message are returned untouched.
     *
     * @param envelope dispatch envelope; may be {@code null}
     * @return the same envelope that was passed in
     * @throws JMSException if the acknowledgement fails
     */
    JmsInboundMessageDispatch ackFromReceive(JmsInboundMessageDispatch envelope) throws JMSException {
        if (envelope == null) {
            return null;
        }
        final JmsMessage payload = envelope.getMessage();
        if (payload == null) {
            return envelope;
        }

        if (payload.getAcknowledgeCallback() == null) {
            doAckConsumed(envelope);
        } else {
            doAckDelivered(envelope);
        }
        delivered.set(true);
        return envelope;
    }

    /**
     * Acknowledges the envelope as CONSUMED through the session, after
     * verifying the consumer is still open. A failure is reported to the
     * session's exception handler and then rethrown.
     *
     * @param envelope envelope to acknowledge
     * @return the same envelope
     * @throws JMSException if the consumer is closed or the acknowledgement fails
     */
    private JmsInboundMessageDispatch doAckConsumed(JmsInboundMessageDispatch envelope) throws JMSException {
        checkClosed();
        try {
            session.acknowledge(envelope, ProviderConstants.ACK_TYPE.CONSUMED);
            return envelope;
        } catch (JMSException ackFailure) {
            session.onException(ackFailure);
            throw ackFailure;
        }
    }

    /**
     * Acknowledges the envelope as DELIVERED through the session. A failure is
     * reported to the session's exception handler and then rethrown.
     *
     * @param envelope envelope to acknowledge
     * @return the same envelope
     * @throws JMSException if the acknowledgement fails
     */
    private JmsInboundMessageDispatch doAckDelivered(JmsInboundMessageDispatch envelope) throws JMSException {
        try {
            session.acknowledge(envelope, ProviderConstants.ACK_TYPE.DELIVERED);
            return envelope;
        } catch (JMSException ackFailure) {
            session.onException(ackFailure);
            throw ackFailure;
        }
    }

    /**
     * Acknowledges the envelope as RELEASED through the session. A failure is
     * reported to the session's exception handler and then rethrown.
     *
     * @param envelope envelope to release
     * @throws JMSException if the acknowledgement fails
     */
    private void doAckReleased(JmsInboundMessageDispatch envelope) throws JMSException {
        try {
            session.acknowledge(envelope, ProviderConstants.ACK_TYPE.RELEASED);
        } catch (JMSException ackFailure) {
            session.onException(ackFailure);
            throw ackFailure;
        }
    }

    /**
     * Accepts a dispatched envelope: queues it locally and schedules either
     * listener delivery or an availability notification on the session
     * executor. The whole operation runs while holding {@code lock}.
     *
     * When the acknowledgement mode is {@code 2} (the value of
     * {@code javax.jms.Session.CLIENT_ACKNOWLEDGE}; presumably the mode uses
     * the JMS constants - confirm) the message is given a callback that
     * acknowledges the session and then clears itself. An envelope without a
     * message is queued as-is, matching how {@code copy} and
     * {@code ackFromReceive} tolerate a {@code null} message.
     *
     * @param envelope the inbound dispatch; must not be {@code null}
     */
    @Override
    public void onInboundMessage(final JmsInboundMessageDispatch envelope) {
        lock.lock();
        try {
            final JmsMessage message = envelope.getMessage();
            if (message != null && acknowledgementMode == 2) {
                message.setAcknowledgeCallback(new Callable<Void>() {

                    @Override
                    public Void call() throws Exception {
                        if (JmsMessageConsumer.this.session.isClosed()) {
                            throw new IllegalStateException("Session closed.");
                        }
                        JmsMessageConsumer.this.session.acknowledge();
                        envelope.getMessage().setAcknowledgeCallback(null);
                        return null;
                    }
                });
            }

            if (envelope.isEnqueueFirst()) {
                messageQueue.enqueueFirst(envelope);
            } else {
                messageQueue.enqueue(envelope);
            }

            if (messageListener != null && started) {
                session.getExecutor().execute(new MessageDeliverTask());
            } else if (availableListener != null) {
                session.getExecutor().execute(new Runnable() {

                    @Override
                    public void run() {
                        if (JmsMessageConsumer.this.session.isStarted()) {
                            // Read the field once: it may have been cleared between scheduling and execution.
                            final JmsMessageAvailableListener listener = JmsMessageConsumer.this.availableListener;
                            if (listener != null) {
                                listener.onMessageAvailable(JmsMessageConsumer.this);
                            }
                        }
                    }
                });
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Starts the consumer: under {@code lock}, sets the started flag, starts
     * the message queue and schedules delivery of queued messages to the
     * listener if one is set.
     */
    public void start() {
        lock.lock();
        try {
            started = true;
            messageQueue.start();
            drainMessageQueueToListener();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Stops the consumer while leaving its message queue open (the queue is
     * stopped, not closed).
     */
    public void stop() {
        stop(false);
    }

    /**
     * Clears the started flag under {@code lock} and then either closes or
     * merely stops the message queue.
     *
     * @param closeMessageQueue {@code true} to close the queue, {@code false}
     *        to stop it
     */
    private void stop(boolean closeMessageQueue) {
        lock.lock();
        try {
            started = false;
            if (!closeMessageQueue) {
                messageQueue.stop();
            } else {
                messageQueue.close();
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Stops local delivery and asks the session's connection to stop this
     * consumer's resource.
     *
     * @throws JMSException if stopping the resource fails
     */
    void suspendForRollback() throws JMSException {
        stop();
        session.getConnection().stopResource(consumerInfo);
    }

    /**
     * Resumes the consumer after {@link #suspendForRollback()}: every envelope
     * still in the local queue is acknowledged as RELEASED, then the consumer
     * and its resource are started again.
     *
     * @throws JMSException if releasing an envelope or restarting the resource fails
     */
    void resumeAfterRollback() throws JMSException {
        if (!messageQueue.isEmpty()) {
            final List<JmsInboundMessageDispatch> leftovers = messageQueue.removeAll();
            for (final JmsInboundMessageDispatch pending : leftovers) {
                doAckReleased(pending);
            }
            leftovers.clear();
        }

        start();
        startConsumerResource();
    }

    /**
     * Schedules a delivery task on the session executor when a listener is set
     * and the consumer is started; otherwise does nothing.
     */
    void drainMessageQueueToListener() {
        if (messageListener == null || !started) {
            return;
        }
        session.getExecutor().execute(new MessageDeliverTask());
    }

    /**
     * @return the consumer id held by this consumer's {@link JmsConsumerInfo}
     */
    public JmsConsumerId getConsumerId() {
        final JmsConsumerInfo info = consumerInfo;
        return info.getConsumerId();
    }

    /**
     * @return the destination held by this consumer's {@link JmsConsumerInfo}
     */
    public JmsDestination getDestination() {
        final JmsConsumerInfo info = consumerInfo;
        return info.getDestination();
    }

    /**
     * @return the current message listener, or {@code null} if none is set
     * @throws JMSException if the consumer is closed
     */
    public MessageListener getMessageListener() throws JMSException {
        checkClosed();
        return messageListener;
    }

    /**
     * Installs the asynchronous message listener and schedules delivery of any
     * already-queued messages to it.
     *
     * @param listener the listener to install
     * @throws JMSException if the consumer is closed or its prefetch size is zero
     */
    public void setMessageListener(MessageListener listener) throws JMSException {
        checkClosed();
        if (consumerInfo.getPrefetchSize() == 0) {
            // Message text previously read "supportedfor" because of a missing space.
            throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
        }
        messageListener = listener;
        drainMessageQueueToListener();
    }

    /**
     * @return the selector stored on this consumer's {@link JmsConsumerInfo}
     * @throws JMSException if the consumer is closed
     */
    public String getMessageSelector() throws JMSException {
        checkClosed();
        final String selector = consumerInfo.getSelector();
        return selector;
    }

    /**
     * @return the prefetch size stored on this consumer's {@link JmsConsumerInfo}
     */
    public int getPrefetchSize() {
        final JmsConsumerInfo info = consumerInfo;
        return info.getPrefetchSize();
    }

    /**
     * Delegates to the session's own message-listener check. The exact rule it
     * enforces is defined by {@code JmsSession} and is not visible here.
     *
     * @throws JMSException if the session's check fails
     */
    protected void checkMessageListener() throws JMSException {
        final JmsSession owner = session;
        owner.checkMessageListener();
    }

    /**
     * @return {@code true} when a message listener is currently set
     */
    boolean hasMessageListener() {
        final MessageListener current = messageListener;
        return current != null;
    }

    /**
     * @param destination destination to compare against
     * @return {@code true} when this consumer's destination {@code equals} the given one
     */
    boolean isUsingDestination(JmsDestination destination) {
        final JmsDestination own = consumerInfo.getDestination();
        return own.equals(destination);
    }

    /**
     * @return the size reported by the local message queue
     */
    protected int getMessageQueueSize() {
        final int queued = messageQueue.size();
        return queued;
    }

    /**
     * @return the no-local flag stored on this consumer's {@link JmsConsumerInfo}
     */
    protected boolean isNoLocal() {
        final JmsConsumerInfo info = consumerInfo;
        return info.isNoLocal();
    }

    /**
     * Always {@code false} in this class. The constructor's prefetch selection
     * calls this method, so an overriding subclass changes which topic prefetch
     * value is used.
     *
     * @return {@code false}
     */
    public boolean isDurableSubscription() {
        return false;
    }

    /**
     * Always {@code false} in this class. The value is stored on the consumer
     * info during construction and is consulted by {@code sendPullCommand} and
     * the prefetch selection, so an overriding subclass changes those paths.
     *
     * @return {@code false}
     */
    public boolean isBrowser() {
        return false;
    }

    /**
     * Stores the listener that {@code onInboundMessage} notifies when a message
     * is queued and no started message listener handles it.
     *
     * @param availableListener the listener to store; {@code null} clears it
     */
    @Override
    public void setAvailableListener(JmsMessageAvailableListener availableListener) {
        final JmsMessageAvailableListener replacement = availableListener;
        this.availableListener = replacement;
    }

    /**
     * @return the stored message-available listener, or {@code null} if none is set
     */
    @Override
    public JmsMessageAvailableListener getAvailableListener() {
        final JmsMessageAvailableListener current = availableListener;
        return current;
    }

    /**
     * Discards every envelope currently held in the local message queue.
     */
    protected void onConnectionInterrupted() {
        final MessageQueue queue = messageQueue;
        queue.clear();
    }

    /**
     * Asks the given provider to create this consumer's resource, then calls
     * {@code sync()} on the request future (presumably blocking until the
     * provider completes it - confirm against {@code ProviderFuture}).
     *
     * @param provider provider to issue the create request to
     * @throws Exception if the create request or the sync fails
     */
    protected void onConnectionRecovery(Provider provider) throws Exception {
        final ProviderFuture createRequest = new ProviderFuture();
        provider.create(consumerInfo, createRequest);
        createRequest.sync();
    }

    /**
     * Asks the given provider to start this consumer's resource, then calls
     * {@code sync()} on the request future (presumably blocking until the
     * provider completes it - confirm against {@code ProviderFuture}).
     *
     * @param provider provider to issue the start request to
     * @throws Exception if the start request or the sync fails
     */
    protected void onConnectionRecovered(Provider provider) throws Exception {
        final ProviderFuture startRequest = new ProviderFuture();
        provider.start(consumerInfo, startRequest);
        startRequest.sync();
    }

    /**
     * Hook with an intentionally empty body in this class; subclasses may
     * override it.
     */
    protected void onConnectionRestored() {
    }

    /**
     * Issues a pull request to the connection when the local queue is empty and
     * the consumer either has a prefetch size of zero or is a browser.
     *
     * @param timeout timeout value forwarded unchanged to the connection's pull
     * @throws JMSException if the pull request fails
     */
    protected void sendPullCommand(long timeout) throws JMSException {
        if (!messageQueue.isEmpty()) {
            return;
        }
        if (getPrefetchSize() == 0 || isBrowser()) {
            connection.pull(getConsumerId(), timeout);
        }
    }

    /**
     * Selects the prefetch value from the policy that matches this consumer:
     * durable-topic or topic prefetch for topic destinations, queue-browser or
     * queue prefetch otherwise.
     *
     * @param destination destination the consumer is attached to
     * @param policy      prefetch policy to read from
     * @return the selected prefetch size
     */
    private int getConfiguredPrefetch(JmsDestination destination, JmsPrefetchPolicy policy) {
        if (destination.isTopic()) {
            if (isDurableSubscription()) {
                return policy.getDurableTopicPrefetch();
            }
            return policy.getTopicPrefetch();
        }
        if (isBrowser()) {
            return policy.getQueueBrowserPrefetch();
        }
        return policy.getQueuePrefetch();
    }

    /**
     * Executor task that drains the local queue into the message listener for
     * as long as the session is started, a listener is set and envelopes are
     * available.
     *
     * The literals {@code 1} and {@code 3} are the values of
     * {@code javax.jms.Session.AUTO_ACKNOWLEDGE} and
     * {@code DUPS_OK_ACKNOWLEDGE}; presumably the acknowledgement mode uses the
     * JMS constants - confirm. In those modes the envelope is acknowledged as
     * DELIVERED before the listener runs and as CONSUMED afterwards unless the
     * session reports it was recovered; in other modes {@code ackFromReceive}
     * decides.
     */
    private final class MessageDeliverTask
    implements Runnable {
        private MessageDeliverTask() {
        }

        @Override
        public void run() {
            while (JmsMessageConsumer.this.session.isStarted()) {
                // Snapshot the listener BEFORE taking a message: the listener may have been cleared
                // since this task was scheduled, and dequeuing first would lose the message to an NPE.
                final MessageListener listener = JmsMessageConsumer.this.messageListener;
                if (listener == null) {
                    return;
                }
                final JmsInboundMessageDispatch envelope = JmsMessageConsumer.this.messageQueue.dequeueNoWait();
                if (envelope == null) {
                    return;
                }
                try {
                    final boolean autoAckOrDupsOk = JmsMessageConsumer.this.acknowledgementMode == 1 || JmsMessageConsumer.this.acknowledgementMode == 3;
                    final JmsMessage delivery = autoAckOrDupsOk
                            ? JmsMessageConsumer.this.copy(JmsMessageConsumer.this.doAckDelivered(envelope))
                            : JmsMessageConsumer.this.copy(JmsMessageConsumer.this.ackFromReceive(envelope));
                    JmsMessageConsumer.this.session.clearSessionRecovered();
                    listener.onMessage((Message) delivery);
                    if (autoAckOrDupsOk && !JmsMessageConsumer.this.session.isSessionRecovered()) {
                        JmsMessageConsumer.this.doAckConsumed(envelope);
                    }
                } catch (Exception e) {
                    // Failures are routed to the connection's exception handler; the loop keeps going.
                    JmsMessageConsumer.this.session.getConnection().onException(e);
                }
            }
        }
    }
}

