/*
 * Decompiled with CFR 0.152.
 */
package com.mule.extensions.amqp.internal.operation;

import com.mule.extensions.amqp.api.config.AckMode;
import com.mule.extensions.amqp.api.config.DeliveryMode;
import com.mule.extensions.amqp.api.exception.AmqpExtensionException;
import com.mule.extensions.amqp.api.exception.AmqpPublishConsumeErrorTypeProvider;
import com.mule.extensions.amqp.api.exception.AmqpPublishConsumeException;
import com.mule.extensions.amqp.api.message.AmqpAttributes;
import com.mule.extensions.amqp.api.message.AmqpMessageBuilder;
import com.mule.extensions.amqp.api.model.ExchangeDefinition;
import com.mule.extensions.amqp.internal.common.AmqpCommons;
import com.mule.extensions.amqp.internal.config.AmqpConfig;
import com.mule.extensions.amqp.internal.connection.AmqpTransactionalConnection;
import com.mule.extensions.amqp.internal.connection.channel.AmqpChannelManager;
import com.mule.extensions.amqp.internal.connection.channel.MuleAmqpChannel;
import com.mule.extensions.amqp.internal.consumer.AmqpMessageConsumer;
import com.mule.extensions.amqp.internal.exception.resolver.ConsumeExceptionResolver;
import com.mule.extensions.amqp.internal.exception.resolver.PublishExceptionResolver;
import com.mule.extensions.amqp.internal.message.AmqpMessage;
import com.mule.extensions.amqp.internal.message.AmqpResultFactory;
import com.mule.extensions.amqp.internal.metadata.AmqpOutputResolver;
import com.mule.extensions.amqp.internal.model.message.Message;
import com.mule.extensions.amqp.internal.publish.AmqpPublishParameters;
import com.mule.extensions.amqp.internal.publisher.AmqpPublisherCommons;
import com.mule.extensions.amqp.internal.publisher.DefaultAmqpMessagePublisher;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.mule.runtime.api.meta.ExpressionSupport;
import org.mule.runtime.api.transformation.TransformationService;
import org.mule.runtime.extension.api.annotation.Expression;
import org.mule.runtime.extension.api.annotation.error.Throws;
import org.mule.runtime.extension.api.annotation.metadata.OutputResolver;
import org.mule.runtime.extension.api.annotation.param.Config;
import org.mule.runtime.extension.api.annotation.param.ConfigOverride;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.MediaType;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.ParameterGroup;
import org.mule.runtime.extension.api.annotation.param.display.Example;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.parameter.CorrelationInfo;
import org.mule.runtime.extension.api.runtime.parameter.OutboundCorrelationStrategy;
import org.mule.runtime.extension.api.tx.OperationTransactionalAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class AmqpPublishConsume {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpPublishConsume.class);
    private PublishExceptionResolver publishExceptionResolver = new PublishExceptionResolver();
    private ConsumeExceptionResolver consumeExceptionResolver = new ConsumeExceptionResolver();
    @Inject
    private AmqpChannelManager channelManager;
    @Inject
    private TransformationService transformationService;
    private final AmqpResultFactory resultFactory = new AmqpResultFactory();

    @OutputResolver(output=AmqpOutputResolver.class)
    @Throws(value={AmqpPublishConsumeErrorTypeProvider.class})
    @MediaType(value="*/*", strict=false)
    public Result<InputStream, AmqpAttributes> publishConsume(@Config AmqpConfig config, @Connection AmqpTransactionalConnection connection, @Summary(value="The name of the exchange to publish the message to") String exchangeName, @Optional @Summary(value="The content type of the message body") @Example(value="application/json") String contentType, @Optional @Summary(value="The encoding of the message body") @Example(value="UTF-8") String encoding, @Optional @Summary(value="The queue exchange to use for exchange declaration in case there is no exchange with the exchangeName") @Expression(value=ExpressionSupport.NOT_SUPPORTED) ExchangeDefinition fallbackExchangeDefinition, @Optional @Summary(value="The routing key to publish to") String routingKey, @ConfigOverride @Optional @Summary(value="The delivery mode to use when publishing to the AMQP broker") DeliveryMode deliveryMode, @Optional(defaultValue="10000") @Summary(value="Maximum time to wait for a message to arrive before timeout") Long maximumWait, @Optional(defaultValue="MILLISECONDS") @Example(value="MILLISECONDS") @Summary(value="Time unit to be used in the maximumWaitTime configuration") TimeUnit maximumWaitUnit, @Summary(value="A builder for the message that will be published") @ParameterGroup(name="Message", showInDsl=true) AmqpMessageBuilder messageBuilder, @ParameterGroup(name="Publish Configuration") AmqpPublishParameters overrides, OperationTransactionalAction transactionalAction, @ConfigOverride OutboundCorrelationStrategy sendCorrelationId, @ConfigOverride boolean createFallbackExchange, CorrelationInfo correlationInfo) throws AmqpExtensionException {
        String replyQueueName = messageBuilder.getReplyToQueue();
        Message message = null;
        MuleAmqpChannel channel = null;
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Begin [publish-consume] to : [" + exchangeName + "]");
        }
        AmqpPublisherCommons.checkBrokerNotBlocked(connection);
        try {
            channel = AmqpCommons.createAmqpChannel(connection, this.channelManager, transactionalAction, config.getQualityOfService());
            channel = AmqpCommons.declareFallbackExchangeIfNeeded(connection, exchangeName, fallbackExchangeDefinition, transactionalAction, channel, this.channelManager, createFallbackExchange);
            DefaultAmqpMessagePublisher messagePublisher = DefaultAmqpMessagePublisher.Builder.newInstance().withChannel(channel).withExchangeName(exchangeName).withRequestBrokerConfirms(AmqpPublisherCommons.resolveRequestBrokerConfirms(config, overrides)).withReturnedMessageExchange(AmqpPublisherCommons.resolvedReturnedMessageExchange(config, overrides)).build();
            if (replyQueueName == null) {
                replyQueueName = channel.queueDeclare().getQueue();
                messageBuilder.setReplyTo(replyQueueName);
            }
            messageBuilder.overridePriorityIfNeeded(config.getPublisherConfig().getPriority());
            message = messageBuilder.build(AmqpCommons.resolveOverride(config.getPublisherConfig().isImmediate(), overrides.isImmediate()), AmqpCommons.resolveOverride(config.getPublisherConfig().isMandatory(), overrides.isMandatory()), routingKey, exchangeName, config.getContentType(), config.getEncoding(), deliveryMode, sendCorrelationId, correlationInfo, this.transformationService);
            messagePublisher.publish(message);
        }
        catch (IOException e) {
            throw this.publishExceptionResolver.resolveException(e);
        }
        catch (AmqpExtensionException e) {
            throw e;
        }
        catch (Exception e) {
            String msg = String.format("An error occurred while publishing a message from the exchange [%s]: %s", exchangeName, e.getMessage());
            throw new AmqpPublishConsumeException(msg, e);
        }
        try {
            AmqpMessageConsumer consumer = connection.createConsumer(connection.createChannel(), replyQueueName, maximumWaitUnit.toMillis(maximumWait), AckMode.IMMEDIATE.getInternalAckMode());
            AmqpMessage receivedMessage = consumer.consume(message.getProperties().getCorrelationId());
            String ackId = AmqpCommons.getMessageAckId(channel, receivedMessage.getEnvelope().getDeliveryTag());
            String resolvedContentType = AmqpCommons.resolveOverride(AmqpCommons.resolveMessageContentType(receivedMessage.getProperties().getContentType(), config.getContentType()), contentType);
            String resolvedEncoding = AmqpCommons.resolveOverride(AmqpCommons.resolveMessageEncoding(receivedMessage.getProperties().getContentEncoding(), config.getEncoding()), encoding);
            AmqpCommons.evaluateMessageAck(channel, receivedMessage, this.channelManager, AckMode.IMMEDIATE.getInternalAckMode(), ackId);
            Result<InputStream, AmqpAttributes> result = this.resultFactory.createResult(receivedMessage, resolvedContentType, resolvedEncoding, ackId);
            return result;
        }
        catch (IOException e) {
            AmqpExtensionException exception = this.consumeExceptionResolver.resolveException(e);
            throw exception;
        }
        catch (AmqpExtensionException e) {
            throw e;
        }
        catch (Exception e) {
            String msg = String.format("An error occurred while consuming a message from the queue [%s]: %s", replyQueueName, e.getMessage());
            throw new AmqpPublishConsumeException(msg, (Exception)((Object)this.consumeExceptionResolver.resolveException(e)));
        }
    }
}

