/*
 * Decompiled with CFR 0.152.
 */
package com.mule.extensions.amqp.internal.listener;

import com.mule.extensions.amqp.api.config.AckMode;
import com.mule.extensions.amqp.api.config.AmqpConsumerConfig;
import com.mule.extensions.amqp.api.config.ListenerQualityOfService;
import com.mule.extensions.amqp.api.exception.AmqpQueueNotFoundException;
import com.mule.extensions.amqp.api.listener.RecoverStrategy;
import com.mule.extensions.amqp.api.message.AmqpAttributes;
import com.mule.extensions.amqp.api.model.QueueDefinition;
import com.mule.extensions.amqp.internal.common.AmqpCommons;
import com.mule.extensions.amqp.internal.config.AmqpConfig;
import com.mule.extensions.amqp.internal.config.InternalAckMode;
import com.mule.extensions.amqp.internal.connection.AmqpTransactionalConnection;
import com.mule.extensions.amqp.internal.connection.channel.AmqpChannelManager;
import com.mule.extensions.amqp.internal.entity.AmqpQueueDeclarer;
import com.mule.extensions.amqp.internal.listener.MultiChannelReceiverManager;
import com.mule.extensions.amqp.internal.metadata.AmqpOutputResolver;
import com.mule.extensions.amqp.internal.model.message.Message;
import com.mule.extensions.amqp.internal.publisher.DefaultAmqpMessagePublisher;
import com.mule.extensions.amqp.internal.source.AmqpResponseMessageBuilder;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.TimeoutException;
import javax.inject.Inject;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.api.meta.ExpressionSupport;
import org.mule.runtime.api.transformation.TransformationService;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.exception.Errors;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.Expression;
import org.mule.runtime.extension.api.annotation.execution.OnError;
import org.mule.runtime.extension.api.annotation.execution.OnSuccess;
import org.mule.runtime.extension.api.annotation.execution.OnTerminate;
import org.mule.runtime.extension.api.annotation.metadata.MetadataScope;
import org.mule.runtime.extension.api.annotation.param.Config;
import org.mule.runtime.extension.api.annotation.param.ConfigOverride;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.MediaType;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.ParameterGroup;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.mule.runtime.extension.api.annotation.source.BackPressure;
import org.mule.runtime.extension.api.annotation.source.EmitsResponse;
import org.mule.runtime.extension.api.annotation.source.OnBackPressure;
import org.mule.runtime.extension.api.runtime.parameter.CorrelationInfo;
import org.mule.runtime.extension.api.runtime.parameter.OutboundCorrelationStrategy;
import org.mule.runtime.extension.api.runtime.source.BackPressureContext;
import org.mule.runtime.extension.api.runtime.source.BackPressureMode;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.mule.runtime.extension.api.runtime.source.SourceCompletionCallback;
import org.mule.runtime.extension.api.runtime.source.SourceResult;
import org.mule.runtime.extension.api.tx.SourceTransactionalAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Alias(value="listener")
@EmitsResponse
@MetadataScope(outputResolver=AmqpOutputResolver.class)
@BackPressure(defaultMode=BackPressureMode.WAIT, supportedModes={BackPressureMode.WAIT})
@MediaType(value="*/*", strict=false)
public class AmqpListener
extends Source<InputStream, AmqpAttributes> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpListener.class);
    static final String AMQP_CHANNEL = "AMQP_CHANNEL";
    static final String AMQP_DELIVERY_TAG = "AMQP_DELIVERY_TAG";
    static final String REPLY_TO_DESTINATION_VAR = "REPLY_TO_DESTINATION";
    @Inject
    private AmqpChannelManager channelManager;
    @Inject
    private TransformationService transformationService;
    @Connection
    protected ConnectionProvider<AmqpTransactionalConnection> connectionProvider;
    private AmqpTransactionalConnection connection;
    private SourceTransactionalAction transactionalAction;
    @Config
    protected AmqpConfig config;
    private MultiChannelReceiverManager multiChannelReceiverManager = null;
    @Parameter
    private String queueName;
    @Inject
    private MuleContext muleContext;
    @Parameter
    @Optional
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    @Summary(value="Declaration of a queue definition to use in case no queue with the queueName provided exists in the broker.")
    private QueueDefinition fallbackQueueDefinition;
    @Parameter
    @ConfigOverride
    @Optional
    @Summary(value="The acknowledgment mode to use when consuming from the AMQP broker.")
    private AckMode ackMode;
    @Parameter
    @ConfigOverride
    @Summary(value="The number of channels that are spawned per inbound endpoint to receive AMQP messages.")
    private int numberOfConsumers;
    @Parameter
    @Optional
    @Summary(value="A client-generated consumer tag to establish context.")
    private String consumerTag;
    @Parameter
    @Optional(defaultValue="REQUEUE")
    @Summary(value="Recovery strategy in case of error")
    private RecoverStrategy recoverStrategy;
    @Parameter
    @Optional
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    @Summary(value="The default encoding of the message body to be used if the message doesn't communicate it")
    private String inboundEncoding;
    @Parameter
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    @Optional
    @Summary(value="The content type encoding of the message body to be used if the message doesn't communicate it")
    private String inboundContentType;
    @Parameter
    @ConfigOverride
    @Summary(value="Whether non existing queues will be created according to the fallback definition or an error will be raised if they do not exist.")
    private boolean createFallbackQueue;
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    @ParameterGroup(name="Listener Quality of Service Config", showInDsl=true)
    @ConfigOverride
    @Summary(value="Listener Quality of Service Config")
    private ListenerQualityOfService qualityOfService;
    private InternalAckMode resolvedAckMode;

    public void onStart(SourceCallback<InputStream, AmqpAttributes> sourceCallback) throws MuleException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Starting AMQP Message Listener");
        }
        AmqpConsumerConfig consumerConfig = this.config.getConsumerConfig();
        this.resolvedAckMode = this.transactionalAction.equals((Object)SourceTransactionalAction.ALWAYS_BEGIN) ? InternalAckMode.TRANSACTED : AmqpCommons.resolveOverride(AmqpCommons.toInternalAckMode(consumerConfig.getAckMode()), AmqpCommons.toInternalAckMode(this.ackMode));
        this.configureConnection(sourceCallback);
        this.validateNumberOfConsumers(this.numberOfConsumers);
        this.declareTargetQueueIfNeeded(this.queueName, this.fallbackQueueDefinition, this.createFallbackQueue);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("Starting AMQP Listener with [%s] consumers on queue [%s] with AckMode [%s]", this.numberOfConsumers, this.queueName, this.resolvedAckMode.name()));
        }
        MultiChannelReceiverManager.Builder builder = new MultiChannelReceiverManager.Builder().withAckMode(this.resolvedAckMode).withConnection(this.connection).withConsumerTag(this.resolveConsumerTag()).withExclusiveConsumers(this.config.getConsumerConfig().isExclusiveConsumers()).withNoLocal(this.config.getConsumerConfig().isNoLocal()).withNumberOfConsumers(this.numberOfConsumers).withQueueName(this.queueName).withSourceCallback(sourceCallback).withInboundContentType(this.inboundContentType).withConfigContentType(this.config.getContentType()).withChannelManager(this.channelManager).withConfigEncoding(this.config.getEncoding()).withInboundEncoding(this.inboundEncoding).withTransactionalAction(this.transactionalAction).withQualityOfService(this.qualityOfService).withMuleContext(this.muleContext);
        this.multiChannelReceiverManager = builder.build();
        this.multiChannelReceiverManager.start();
    }

    protected void configureConnection(final SourceCallback<InputStream, AmqpAttributes> sourceCallback) throws ConnectionException {
        this.connection = (AmqpTransactionalConnection)this.connectionProvider.connect();
        this.connection.addShutdownListener(new ShutdownListener(){

            public void shutdownCompleted(ShutdownSignalException sse) {
                if (sse.isInitiatedByApplication()) {
                    return;
                }
                sourceCallback.onConnectionException(new ConnectionException((Throwable)sse, (Object)AmqpListener.this.connection));
            }
        });
    }

    private void validateNumberOfConsumers(int numberOfConsumers) {
        if (numberOfConsumers < 1) {
            throw new IllegalArgumentException("Invalid number of consumers: [" + numberOfConsumers + "]. The number should be 1 or greater.");
        }
    }

    @OnSuccess
    public void onSuccess(@ParameterGroup(name="Response", showInDsl=true) AmqpResponseMessageBuilder messageBuilder, CorrelationInfo correlationInfo, SourceCallbackContext callbackContext) {
        if (callbackContext.getVariable(AMQP_DELIVERY_TAG).isPresent() && callbackContext.getVariable(AMQP_CHANNEL).isPresent()) {
            Long deliveryTag = (Long)callbackContext.getVariable(AMQP_DELIVERY_TAG).get();
            Channel channel = (Channel)callbackContext.getVariable(AMQP_CHANNEL).get();
            if (this.ackMode == AckMode.AUTO) {
                try {
                    channel.basicAck(deliveryTag.longValue(), false);
                }
                catch (IOException e) {
                    LOGGER.error("Error while trying to acknowledge a message.");
                    return;
                }
            }
            callbackContext.getVariable(REPLY_TO_DESTINATION_VAR).ifPresent(replyTo -> this.doReply(messageBuilder, callbackContext, (String)replyTo, correlationInfo, channel));
        } else {
            LOGGER.debug("A successful was not acknowledged though an AUTO acknowledge mode was set. It was possibly previously ack'ed");
        }
    }

    @OnBackPressure
    public void onBackPressure(BackPressureContext ctx, SourceCompletionCallback completionCallback) {
        LOGGER.warn("Back pressure applied on the message source. The AMQP consumers will be temporarily suspended");
        this.multiChannelReceiverManager.suspendConsumers();
    }

    @OnTerminate
    public void onTerminate(SourceResult sourceResult) {
    }

    private void doReply(AmqpResponseMessageBuilder messageBuilder, SourceCallbackContext callbackContext, String replyTo, CorrelationInfo correlationInfo, Channel channel) {
        DefaultAmqpMessagePublisher messagePublisher = DefaultAmqpMessagePublisher.Builder.newInstance().withChannel(channel).withExchangeName("").withRequestBrokerConfirms(AmqpCommons.resolveOverride(this.config.getPublisherConfig().isRequestBrokerConfirms(), messageBuilder.isRequestBrokerConfirms())).withReturnedMessageExchange(AmqpCommons.resolveOverride(this.config.getPublisherConfig().getReturnedMessageExchange(), messageBuilder.getReturnedMessageExchange())).build();
        messageBuilder.overridePriorityIfNeeded(this.config.getPublisherConfig().getPriority());
        Message message = messageBuilder.build((Boolean)AmqpCommons.resolveOverride(this.config.getPublisherConfig().isImmediate(), null), (Boolean)AmqpCommons.resolveOverride(this.config.getPublisherConfig().isMandatory(), null), replyTo, "", this.config.getContentType(), this.config.getEncoding(), messageBuilder.getDeliveryMode(), OutboundCorrelationStrategy.AUTO, correlationInfo, this.transformationService);
        messagePublisher.publish(message);
    }

    @OnError
    public void onError(Error error, SourceCallbackContext callbackContext) {
        LOGGER.debug("An error ocurred when processing messages.");
        if (callbackContext.getVariable(AMQP_CHANNEL).isPresent() && callbackContext.getVariable(AMQP_DELIVERY_TAG).isPresent()) {
            Long deliveryTag = (Long)callbackContext.getVariable(AMQP_DELIVERY_TAG).get();
            Channel channel = (Channel)callbackContext.getVariable(AMQP_CHANNEL).get();
            if (this.mustApplyRecoverStrategy(error)) {
                this.applyRecoverStrategy(callbackContext, channel, deliveryTag);
            } else {
                this.rejectOnRedeliveryExhaustion(deliveryTag, channel);
            }
        }
    }

    private void rejectOnRedeliveryExhaustion(Long deliveryTag, Channel channel) {
        try {
            if (channel.isOpen()) {
                channel.basicReject(deliveryTag.longValue(), false);
            }
        }
        catch (IOException e) {
            LOGGER.warn("Error while rejecting a message after exhaustion of redeliveries", (Throwable)e);
        }
    }

    private boolean mustApplyRecoverStrategy(Error error) {
        if (error.getErrorType() != null && error.getErrorType().getIdentifier() == null) {
            LOGGER.warn("An error type without identifier was handled: {}", (Object)error.getErrorType());
            return true;
        }
        return error.getErrorType() == null || !error.getErrorType().getIdentifier().equals(Errors.ComponentIdentifiers.Handleable.REDELIVERY_EXHAUSTED.getName());
    }

    protected void applyRecoverStrategy(SourceCallbackContext callbackContext, Channel channel, Long deliveryTag) {
        switch (this.recoverStrategy) {
            case NONE: {
                break;
            }
            case NO_REQUEUE: {
                this.recoverMessageFromChannel(callbackContext, false, channel, deliveryTag);
                break;
            }
            case REQUEUE: {
                this.recoverMessageFromChannel(callbackContext, true, channel, deliveryTag);
            }
        }
    }

    private void recoverMessageFromChannel(SourceCallbackContext callbackContext, boolean requeue, Channel channel, Long deliveryTag) {
        try {
            if (channel.isOpen()) {
                channel.basicReject(deliveryTag.longValue(), requeue);
                LOGGER.debug("Applied " + (Object)((Object)this.recoverStrategy) + " recover strategy on channel: " + channel);
            } else {
                LOGGER.warn("Channel closed. Cannot reject message with delivery tag {} on channel {}", (Object)deliveryTag, (Object)channel);
            }
        }
        catch (IOException e) {
            LOGGER.error("Error while trying to recover a channel.");
        }
    }

    public void onStop() {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("Stopping AMQP Listener on queue [%s]", this.queueName));
        }
        if (this.multiChannelReceiverManager != null) {
            this.multiChannelReceiverManager.stop();
        }
        if (this.connection != null) {
            this.connectionProvider.disconnect((Object)this.connection);
        }
    }

    private String resolveConsumerTag() {
        return this.consumerTag == null ? "" : this.consumerTag;
    }

    private void declareTargetQueueIfNeeded(String queueName, QueueDefinition fallbackQueueDefinition, boolean createFallbackQueue) throws ConnectionException {
        try {
            Channel channel = this.connection.createChannel();
            AmqpQueueDeclarer queueDeclarer = new AmqpQueueDeclarer(channel, fallbackQueueDefinition, queueName);
            boolean existingQueue = queueDeclarer.queueExists();
            if (!(fallbackQueueDefinition != null && createFallbackQueue || existingQueue)) {
                throw new ConnectionException((Throwable)((Object)new AmqpQueueNotFoundException("Queue defined in listener was not found and no fallback definition declared.")), (Object)this.connection);
            }
            if (fallbackQueueDefinition != null && !existingQueue) {
                channel = this.connection.createChannel();
                queueDeclarer = new AmqpQueueDeclarer(channel, fallbackQueueDefinition, queueName);
                queueDeclarer.declareActive();
            }
            channel.close();
        }
        catch (IOException e) {
            String msg = "A Connection error occurred while declaring the queue with name " + queueName + ".";
            LOGGER.error(msg, (Throwable)e);
            throw new ConnectionException(msg, (Throwable)e, null, (Object)this.connection);
        }
        catch (TimeoutException e) {
            String msg = "A Timeout error occurred while declaring the queue with name " + queueName + ".";
            LOGGER.error(msg, (Throwable)e);
            throw new ConnectionException(msg, (Throwable)e, null, (Object)this.connection);
        }
    }
}

