/*
 * Decompiled with CFR 0.152.
 */
package com.mule.extensions.amqp.internal.operation;

import com.mule.extensions.amqp.api.config.AmqpConsumerConfig;
import com.mule.extensions.amqp.api.config.ConsumerAckMode;
import com.mule.extensions.amqp.api.exception.AmqpConsumeException;
import com.mule.extensions.amqp.api.exception.AmqpConsumerErrorTypeProvider;
import com.mule.extensions.amqp.api.exception.AmqpCreationNotAllowedException;
import com.mule.extensions.amqp.api.exception.AmqpExtensionException;
import com.mule.extensions.amqp.api.exception.AmqpQueueNotFoundException;
import com.mule.extensions.amqp.api.message.AmqpAttributes;
import com.mule.extensions.amqp.api.model.QueueDefinition;
import com.mule.extensions.amqp.internal.common.AmqpCommons;
import com.mule.extensions.amqp.internal.config.AmqpConfig;
import com.mule.extensions.amqp.internal.config.InternalAckMode;
import com.mule.extensions.amqp.internal.connection.AmqpTransactionalConnection;
import com.mule.extensions.amqp.internal.connection.channel.AmqpChannelManager;
import com.mule.extensions.amqp.internal.connection.channel.MuleAmqpChannel;
import com.mule.extensions.amqp.internal.consumer.AmqpMessageConsumer;
import com.mule.extensions.amqp.internal.entity.AmqpQueueDeclarer;
import com.mule.extensions.amqp.internal.exception.resolver.ConsumeExceptionResolver;
import com.mule.extensions.amqp.internal.message.AmqpMessage;
import com.mule.extensions.amqp.internal.message.AmqpResultFactory;
import com.mule.extensions.amqp.internal.metadata.AmqpOutputResolver;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.mule.runtime.api.meta.ExpressionSupport;
import org.mule.runtime.extension.api.annotation.Expression;
import org.mule.runtime.extension.api.annotation.error.Throws;
import org.mule.runtime.extension.api.annotation.metadata.OutputResolver;
import org.mule.runtime.extension.api.annotation.param.Config;
import org.mule.runtime.extension.api.annotation.param.ConfigOverride;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.MediaType;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.display.Example;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.tx.OperationTransactionalAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class AmqpConsume {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpConsume.class);
    private ConsumeExceptionResolver exceptionResolver = new ConsumeExceptionResolver();
    @Inject
    private AmqpChannelManager channelManager;
    private final AmqpResultFactory resultFactory = new AmqpResultFactory();

    @OutputResolver(output=AmqpOutputResolver.class)
    @Throws(value={AmqpConsumerErrorTypeProvider.class})
    @MediaType(value="*/*", strict=false)
    public Result<InputStream, AmqpAttributes> consume(@Config AmqpConfig config, @Connection AmqpTransactionalConnection connection, @Summary(value="The name of the queue from where the Message should be consumed") String queueName, @Optional @Summary(value="The content type of the message body") @Example(value="application/json") String contentType, @Optional @Summary(value="The encoding of the message body") @Example(value="UTF-8") String encoding, @Optional @Summary(value="The queue definition to use for queue declaration in case there is no queue with the queueName") @Expression(value=ExpressionSupport.NOT_SUPPORTED) QueueDefinition fallbackQueueDefinition, @Optional @Summary(value="The ACK mode to use when consuming a message") ConsumerAckMode ackMode, @Optional @Summary(value="The consumer tag to use for the consumer involved in the operation") String consumerTag, @Optional(defaultValue="10000") @Summary(value="Maximum time to wait for a message to arrive before timeout") Long maximumWait, @Optional(defaultValue="MILLISECONDS") @Example(value="MILLISECONDS") @Summary(value="Time unit to be used in the maximumWaitTime configuration") TimeUnit maximumWaitUnit, @ConfigOverride boolean createFallbackQueue, OperationTransactionalAction transactionalAction) throws AmqpExtensionException {
        Result<InputStream, AmqpAttributes> result;
        InternalAckMode resolvedAckMode = this.resolveAck(config.getConsumerConfig(), ackMode);
        MuleAmqpChannel channel = null;
        boolean onError = false;
        try {
            Result<InputStream, AmqpAttributes> result2;
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Begin [consume] on : [" + queueName + "]");
            }
            if ((channel = AmqpCommons.createAmqpChannel(connection, this.channelManager, transactionalAction, config.getQualityOfService(), true)).isInTransaction()) {
                resolvedAckMode = InternalAckMode.TRANSACTED;
            }
            channel = this.declareTargetQueueIfNeeded(connection, queueName, fallbackQueueDefinition, transactionalAction, channel, createFallbackQueue);
            AmqpMessage message = this.consumeMessage(connection, queueName, maximumWait, maximumWaitUnit, resolvedAckMode, consumerTag, channel);
            String ackId = AmqpCommons.getMessageAckId(channel, message.getEnvelope().getDeliveryTag());
            String resolvedContentType = AmqpCommons.resolveOverride(AmqpCommons.resolveMessageContentType(message.getProperties().getContentType(), config.getContentType()), contentType);
            String resolvedEncoding = AmqpCommons.resolveOverride(AmqpCommons.resolveMessageEncoding(message.getProperties().getContentEncoding(), config.getEncoding()), encoding);
            AmqpCommons.evaluateMessageAck(channel, message, this.channelManager, resolvedAckMode, ackId);
            result = result2 = this.resultFactory.createResult(message, resolvedContentType, resolvedEncoding, ackId);
        }
        catch (Exception e) {
            try {
                onError = true;
                String msg = String.format("An error occurred while consuming a message from the queue [%s]: %s", queueName, e.getMessage());
                throw new AmqpConsumeException(msg, (Exception)((Object)this.exceptionResolver.resolveException(e)));
            }
            catch (Throwable throwable) {
                if (!resolvedAckMode.equals((Object)InternalAckMode.MANUAL) || onError) {
                    AmqpCommons.releaseChannelIfNeeded(channel);
                }
                throw throwable;
            }
        }
        if (!resolvedAckMode.equals((Object)InternalAckMode.MANUAL) || onError) {
            AmqpCommons.releaseChannelIfNeeded(channel);
        }
        return result;
    }

    private AmqpMessage consumeMessage(AmqpTransactionalConnection connection, String queueName, Long maximumWait, TimeUnit maximumWaitUnit, InternalAckMode resolvedAckMode, String consumerTag, MuleAmqpChannel channel) throws IOException, InterruptedException {
        AmqpMessageConsumer consumer = connection.createConsumer(channel, queueName, maximumWaitUnit.toMillis(maximumWait), consumerTag, resolvedAckMode);
        AmqpMessage message = consumer.consume();
        return message;
    }

    private MuleAmqpChannel declareTargetQueueIfNeeded(AmqpTransactionalConnection connection, String queueName, QueueDefinition fallbackQueueDefinition, OperationTransactionalAction transactionalAction, MuleAmqpChannel channel, boolean createFallbackQueue) throws IOException {
        AmqpQueueDeclarer queueDeclarer = new AmqpQueueDeclarer(channel, fallbackQueueDefinition, queueName);
        boolean existingQueue = queueDeclarer.queueExists();
        if (fallbackQueueDefinition != null && !createFallbackQueue && !existingQueue) {
            throw new AmqpCreationNotAllowedException("Creation not allowed for queue: " + queueName + ". Set createFallbackQueue to true or create the queue.");
        }
        if (!(fallbackQueueDefinition != null && createFallbackQueue || existingQueue)) {
            throw new AmqpQueueNotFoundException("Queue was not found.");
        }
        if (fallbackQueueDefinition != null && !existingQueue) {
            channel = AmqpCommons.createAmqpChannel(connection, this.channelManager, transactionalAction, true);
            queueDeclarer = new AmqpQueueDeclarer(channel, fallbackQueueDefinition, queueName);
            queueDeclarer.declareActive();
        }
        return channel;
    }

    private InternalAckMode resolveAck(AmqpConsumerConfig config, ConsumerAckMode ackMode) {
        InternalAckMode fallbackAck = AmqpCommons.toInternalAckMode(config.getAckMode());
        if (InternalAckMode.AUTO.equals((Object)fallbackAck)) {
            fallbackAck = InternalAckMode.IMMEDIATE;
        }
        return AmqpCommons.resolveOverride(fallbackAck, AmqpCommons.toInternalAckMode(ackMode));
    }
}

