/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.amqp.rabbit.listener;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.utility.Utility;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils;
import org.springframework.amqp.rabbit.connection.RabbitUtils;
import org.springframework.amqp.rabbit.listener.ActiveObjectCounter;
import org.springframework.amqp.rabbit.listener.ConsumerCancelledException;
import org.springframework.amqp.rabbit.listener.FatalListenerStartupException;
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class BlockingQueueConsumer {
    private static Log logger = LogFactory.getLog(BlockingQueueConsumer.class);
    private final BlockingQueue<Delivery> queue = new LinkedBlockingQueue<Delivery>();
    private volatile ShutdownSignalException shutdown;
    private final String[] queues;
    private final int prefetchCount;
    private final boolean transactional;
    private Channel channel;
    private InternalConsumer consumer;
    private final AtomicBoolean cancelled = new AtomicBoolean(false);
    private final AtomicBoolean cancelReceived = new AtomicBoolean(false);
    private final AcknowledgeMode acknowledgeMode;
    private final ConnectionFactory connectionFactory;
    private final MessagePropertiesConverter messagePropertiesConverter;
    private final ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter;
    private Set<Long> deliveryTags = new LinkedHashSet<Long>();
    private final boolean defaultRequeuRejected;

    public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, String ... queues) {
        this(connectionFactory, messagePropertiesConverter, activeObjectCounter, acknowledgeMode, transactional, prefetchCount, true, queues);
    }

    public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, String ... queues) {
        this.connectionFactory = connectionFactory;
        this.messagePropertiesConverter = messagePropertiesConverter;
        this.activeObjectCounter = activeObjectCounter;
        this.acknowledgeMode = acknowledgeMode;
        this.transactional = transactional;
        this.prefetchCount = prefetchCount;
        this.defaultRequeuRejected = defaultRequeueRejected;
        this.queues = queues;
    }

    public Channel getChannel() {
        return this.channel;
    }

    public String getConsumerTag() {
        return this.consumer.getConsumerTag();
    }

    private void checkShutdown() {
        if (this.shutdown != null) {
            throw (ShutdownSignalException)Utility.fixStackTrace((Throwable)this.shutdown);
        }
    }

    private Message handle(Delivery delivery) throws InterruptedException {
        if (delivery == null && this.shutdown != null) {
            throw this.shutdown;
        }
        if (delivery == null) {
            return null;
        }
        byte[] body = delivery.getBody();
        Envelope envelope = delivery.getEnvelope();
        MessageProperties messageProperties = this.messagePropertiesConverter.toMessageProperties(delivery.getProperties(), envelope, "UTF-8");
        messageProperties.setMessageCount(Integer.valueOf(0));
        Message message = new Message(body, messageProperties);
        if (logger.isDebugEnabled()) {
            logger.debug((Object)("Received message: " + message));
        }
        this.deliveryTags.add(messageProperties.getDeliveryTag());
        return message;
    }

    public Message nextMessage() throws InterruptedException, ShutdownSignalException {
        logger.trace((Object)("Retrieving delivery for " + this));
        return this.handle(this.queue.take());
    }

    public Message nextMessage(long timeout) throws InterruptedException, ShutdownSignalException {
        if (logger.isDebugEnabled()) {
            logger.debug((Object)("Retrieving delivery for " + this));
        }
        this.checkShutdown();
        Message message = this.handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));
        if (message == null && this.cancelReceived.get()) {
            throw new ConsumerCancelledException();
        }
        return message;
    }

    public void start() throws AmqpException {
        int i;
        if (logger.isDebugEnabled()) {
            logger.debug((Object)("Starting consumer " + this));
        }
        this.channel = ConnectionFactoryUtils.getTransactionalResourceHolder(this.connectionFactory, this.transactional).getChannel();
        this.consumer = new InternalConsumer(this.channel);
        this.deliveryTags.clear();
        this.activeObjectCounter.add(this);
        int passiveDeclareTries = 3;
        do {
            try {
                if (!this.acknowledgeMode.isAutoAck()) {
                    this.channel.basicQos(this.prefetchCount);
                }
                for (i = 0; i < this.queues.length; ++i) {
                    this.channel.queueDeclarePassive(this.queues[i]);
                }
                passiveDeclareTries = 0;
            }
            catch (IOException e) {
                if (passiveDeclareTries > 0 && this.channel.isOpen()) {
                    if (!logger.isWarnEnabled()) continue;
                    logger.warn((Object)("Reconnect failed; retries left=" + (passiveDeclareTries - 1)), (Throwable)e);
                    try {
                        Thread.sleep(5000L);
                    }
                    catch (InterruptedException e1) {
                        Thread.currentThread().interrupt();
                    }
                    continue;
                }
                this.activeObjectCounter.release(this);
                throw new FatalListenerStartupException("Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.", e);
            }
        } while (passiveDeclareTries-- > 0);
        try {
            for (i = 0; i < this.queues.length; ++i) {
                this.channel.basicConsume(this.queues[i], this.acknowledgeMode.isAutoAck(), (Consumer)this.consumer);
                if (!logger.isDebugEnabled()) continue;
                logger.debug((Object)("Started on queue '" + this.queues[i] + "': " + this));
            }
        }
        catch (IOException e) {
            throw RabbitUtils.convertRabbitAccessException(e);
        }
    }

    public void stop() {
        block4: {
            this.cancelled.set(true);
            if (this.consumer != null && this.consumer.getChannel() != null && this.consumer.getConsumerTag() != null && !this.cancelReceived.get()) {
                try {
                    RabbitUtils.closeMessageConsumer(this.consumer.getChannel(), this.consumer.getConsumerTag(), this.transactional);
                }
                catch (Exception e) {
                    if (!logger.isDebugEnabled()) break block4;
                    logger.debug((Object)"Error closing consumer", (Throwable)e);
                }
            }
        }
        if (logger.isDebugEnabled()) {
            logger.debug((Object)("Closing Rabbit Channel: " + this.channel));
        }
        RabbitUtils.setPhysicalCloseRequired(true);
        RabbitUtils.closeChannel(this.channel);
        this.deliveryTags.clear();
        this.consumer = null;
    }

    public String toString() {
        return "Consumer: tag=[" + (this.consumer != null ? this.consumer.getConsumerTag() : null) + "], channel=" + this.channel + ", acknowledgeMode=" + this.acknowledgeMode + " local queue size=" + this.queue.size();
    }

    public void rollbackOnExceptionIfNecessary(Throwable ex) throws Exception {
        boolean ackRequired = !this.acknowledgeMode.isAutoAck() && !this.acknowledgeMode.isManual();
        try {
            if (this.transactional) {
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)("Initiating transaction rollback on application exception: " + ex));
                }
                RabbitUtils.rollbackIfNecessary(this.channel);
            }
            if (ackRequired) {
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)"Rejecting messages");
                }
                boolean shouldRequeue = this.defaultRequeuRejected;
                for (Throwable t = ex; shouldRequeue && t != null; t = t.getCause()) {
                    if (!(t instanceof AmqpRejectAndDontRequeueException)) continue;
                    shouldRequeue = false;
                }
                for (Long deliveryTag : this.deliveryTags) {
                    this.channel.basicReject(deliveryTag.longValue(), shouldRequeue);
                }
                if (this.transactional) {
                    RabbitUtils.commitIfNecessary(this.channel);
                }
            }
        }
        catch (Exception e) {
            logger.error((Object)"Application exception overridden by rollback exception", ex);
            throw e;
        }
        finally {
            this.deliveryTags.clear();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean commitIfNecessary(boolean locallyTransacted) throws IOException {
        if (this.deliveryTags.isEmpty()) {
            return false;
        }
        try {
            boolean ackRequired;
            boolean bl = ackRequired = !this.acknowledgeMode.isAutoAck() && !this.acknowledgeMode.isManual();
            if (ackRequired) {
                if (this.transactional && !locallyTransacted) {
                    for (Long deliveryTag : this.deliveryTags) {
                        ConnectionFactoryUtils.registerDeliveryTag(this.connectionFactory, this.channel, deliveryTag);
                    }
                } else if (!this.deliveryTags.isEmpty()) {
                    long deliveryTag = new ArrayList<Long>(this.deliveryTags).get(this.deliveryTags.size() - 1);
                    this.channel.basicAck(deliveryTag, true);
                }
            }
            if (locallyTransacted) {
                RabbitUtils.commitIfNecessary(this.channel);
            }
        }
        finally {
            this.deliveryTags.clear();
        }
        return true;
    }

    private static class Delivery {
        private final Envelope envelope;
        private final AMQP.BasicProperties properties;
        private final byte[] body;

        public Delivery(Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
            this.envelope = envelope;
            this.properties = properties;
            this.body = body;
        }

        public Envelope getEnvelope() {
            return this.envelope;
        }

        public AMQP.BasicProperties getProperties() {
            return this.properties;
        }

        public byte[] getBody() {
            return this.body;
        }
    }

    private class InternalConsumer
    extends DefaultConsumer {
        public InternalConsumer(Channel channel) {
            super(channel);
        }

        public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
            if (logger.isDebugEnabled()) {
                logger.debug((Object)("Received shutdown signal for consumer tag=" + consumerTag), (Throwable)sig);
            }
            BlockingQueueConsumer.this.shutdown = sig;
            BlockingQueueConsumer.this.deliveryTags.clear();
        }

        public void handleCancel(String consumerTag) throws IOException {
            if (logger.isWarnEnabled()) {
                logger.warn((Object)"Cancel received");
            }
            BlockingQueueConsumer.this.cancelReceived.set(true);
        }

        public void handleCancelOk(String consumerTag) {
            if (logger.isDebugEnabled()) {
                logger.debug((Object)("Received cancellation notice for " + BlockingQueueConsumer.this));
            }
            BlockingQueueConsumer.this.activeObjectCounter.release(BlockingQueueConsumer.this);
        }

        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            if (BlockingQueueConsumer.this.cancelled.get() && BlockingQueueConsumer.this.acknowledgeMode.isTransactionAllowed()) {
                return;
            }
            if (logger.isDebugEnabled()) {
                logger.debug((Object)("Storing delivery for " + BlockingQueueConsumer.this));
            }
            try {
                BlockingQueueConsumer.this.queue.put(new Delivery(envelope, properties, body));
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

