/*
 * Decompiled with CFR 0.152.
 */
package com.mule.extensions.amqp.internal.consumer;

import com.mule.extensions.amqp.api.exception.AmqpConsumeTimeoutException;
import com.mule.extensions.amqp.internal.client.SingleMessageQueueingConsumer;
import com.mule.extensions.amqp.internal.common.AmqpCommons;
import com.mule.extensions.amqp.internal.config.InternalAckMode;
import com.mule.extensions.amqp.internal.consumer.AmqpMessageConsumer;
import com.mule.extensions.amqp.internal.message.AmqpMessage;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.GetResponse;
import java.io.IOException;
import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link AmqpMessageConsumer} that pulls a single message from one queue over a
 * RabbitMQ {@link Channel}.
 *
 * <p>A configured timeout of {@code 0} performs a non-blocking {@code basicGet};
 * any other value registers a temporary subscription and waits for one delivery
 * ({@code -1} waits without a time limit).
 *
 * <p>When no consumer tag is supplied at construction time, the tag returned by
 * the first subscription is stored in this instance and reused by later calls.
 */
public class DefaultAmqpMessageConsumer implements AmqpMessageConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAmqpMessageConsumer.class);

    private final Channel channel;
    private final String queue;
    private final long timeout;
    private final InternalAckMode ackMode;
    // Not final: filled in with the tag returned by basicConsume when none was supplied.
    private String consumerTag;

    /**
     * @param channel     channel used for every broker operation
     * @param queue       name of the queue to read from
     * @param timeout     wait in milliseconds; {@code 0} means do not wait, {@code -1} means wait without limit
     * @param ackMode     decides whether the message is acknowledged as soon as it is received
     * @param consumerTag tag to subscribe with, or {@code null} to use the one returned by {@code basicConsume}
     */
    public DefaultAmqpMessageConsumer(Channel channel, String queue, long timeout, InternalAckMode ackMode, String consumerTag) {
        this.channel = channel;
        this.queue = queue;
        this.timeout = timeout;
        this.ackMode = ackMode;
        this.consumerTag = consumerTag;
    }

    /**
     * Retrieves one message, choosing the non-blocking or the waiting strategy
     * from the configured timeout.
     *
     * @param correlationId passed to the temporary consumer used by the waiting strategy;
     *                      not used when the timeout is {@code 0}
     * @return the retrieved message
     * @throws AmqpConsumeTimeoutException if no message could be retrieved
     */
    @Override
    public AmqpMessage consume(String correlationId) throws IOException, InterruptedException {
        if (this.timeout == 0L) {
            return this.receiveNoWait();
        }
        return this.receiveWithTimeout(this.timeout, correlationId);
    }

    /**
     * Fetches a message with {@code basicGet}, without waiting.
     *
     * @throws AmqpConsumeTimeoutException if the broker had no message to hand out
     */
    private AmqpMessage receiveNoWait() throws IOException, AmqpConsumeTimeoutException {
        GetResponse response = this.channel.basicGet(this.queue, this.ackMode.isImmediateAck());
        // basicGet yields null for an empty queue; report it the same way the waiting
        // path reports "no message" instead of failing with a NullPointerException.
        if (response == null) {
            throw this.noMessageRetrieved();
        }
        return new AmqpMessage(null, response.getEnvelope(), AmqpCommons.getPropertiesFromBasicProperties(response.getProps()), response.getProps().getHeaders(), response.getBody());
    }

    /**
     * Subscribes a temporary manual-ack consumer, waits for a single delivery and
     * always cancels the subscription afterwards.
     *
     * @param timeoutMillis maximum wait in milliseconds, or {@code -1} to wait without limit
     * @param correlationId passed to the temporary consumer
     * @throws AmqpConsumeTimeoutException if no delivery was obtained
     */
    private AmqpMessage receiveWithTimeout(long timeoutMillis, String correlationId) throws IOException, InterruptedException, AmqpConsumeTimeoutException {
        LOGGER.debug("Waiting for a message, timeout will be in [{}] millis", timeoutMillis);

        SingleMessageQueueingConsumer consumer = new SingleMessageQueueingConsumer(this.channel, correlationId);
        if (this.consumerTag == null) {
            this.consumerTag = this.channel.basicConsume(this.queue, false, consumer);
        } else {
            this.channel.basicConsume(this.queue, false, this.consumerTag, consumer);
        }

        Delivery delivery;
        try {
            delivery = timeoutMillis != -1L ? consumer.nextDelivery(timeoutMillis) : consumer.nextDelivery();
            // Any null delivery means nothing was received; there is no message to ack or return.
            if (delivery == null) {
                throw this.noMessageRetrieved();
            }
            if (this.ackMode.isImmediateAck()) {
                this.channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        } finally {
            // Runs on success, timeout, interruption and ack failure alike, so the
            // subscription never stays registered on the channel after this call.
            this.cancelSubscription();
        }
        return new AmqpMessage(this.consumerTag, delivery.getEnvelope(), AmqpCommons.getPropertiesFromBasicProperties(delivery.getProperties()), delivery.getProperties().getHeaders(), delivery.getBody());
    }

    /** Best-effort cancel of the current subscription; a failure is only logged at debug level. */
    private void cancelSubscription() {
        try {
            this.channel.basicCancel(this.consumerTag);
        } catch (Exception e) {
            LOGGER.debug("Subscription to channel with consumerTag {} could not be closed.", this.consumerTag, e);
        }
    }

    /** Builds the exception reported whenever no message could be obtained from the queue. */
    private AmqpConsumeTimeoutException noMessageRetrieved() {
        return new AmqpConsumeTimeoutException(String.format("Failed to retrieve a Message on queue [%s]: operation timed out.", this.queue));
    }

    /**
     * Retrieves one message without a correlation id.
     *
     * @see #consume(String)
     */
    @Override
    public AmqpMessage consume() throws IOException, InterruptedException {
        return this.consume(null);
    }
}

