/*
 * Decompiled with CFR 0.152.
 */
package org.mule.jms.commons.internal.operation;

import javax.inject.Inject;
import javax.jms.CompletionListener;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.JMSSecurityException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.Topic;
import org.mule.jms.commons.api.RequestReplyPattern;
import org.mule.jms.commons.api.RequestReplyPatternWrapper;
import org.mule.jms.commons.api.destination.ConsumerType;
import org.mule.jms.commons.api.destination.QueueConsumer;
import org.mule.jms.commons.api.destination.TopicConsumer;
import org.mule.jms.commons.api.exception.JmsConsumeException;
import org.mule.jms.commons.api.exception.JmsExtensionException;
import org.mule.jms.commons.api.exception.JmsPublishConsumeErrorTypeProvider;
import org.mule.jms.commons.api.exception.JmsPublishException;
import org.mule.jms.commons.api.exception.JmsSecurityException;
import org.mule.jms.commons.api.message.JmsAttributes;
import org.mule.jms.commons.api.message.JmsMessageBuilder;
import org.mule.jms.commons.internal.common.JmsCommons;
import org.mule.jms.commons.internal.config.InternalAckMode;
import org.mule.jms.commons.internal.config.JmsConfig;
import org.mule.jms.commons.internal.connection.JmsConnection;
import org.mule.jms.commons.internal.connection.session.JmsSession;
import org.mule.jms.commons.internal.connection.session.JmsSessionManager;
import org.mule.jms.commons.internal.consume.JmsConsumeParameters;
import org.mule.jms.commons.internal.consume.JmsMessageConsumer;
import org.mule.jms.commons.internal.message.JmsResultFactory;
import org.mule.jms.commons.internal.metadata.JmsOutputResolver;
import org.mule.jms.commons.internal.operation.DefaultRequestReplyPatternWrapper;
import org.mule.jms.commons.internal.operation.publishconsume.ResourceCloserCompletionListener;
import org.mule.jms.commons.internal.publish.JmsMessageProducer;
import org.mule.jms.commons.internal.publish.JmsPublishParameters;
import org.mule.jms.commons.internal.support.JmsSupport;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.lifecycle.Disposable;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.core.api.util.func.CheckedFunction;
import org.mule.runtime.extension.api.annotation.error.Throws;
import org.mule.runtime.extension.api.annotation.metadata.OutputResolver;
import org.mule.runtime.extension.api.annotation.param.Config;
import org.mule.runtime.extension.api.annotation.param.ConfigOverride;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.ParameterGroup;
import org.mule.runtime.extension.api.annotation.param.display.Placement;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.mule.runtime.extension.api.runtime.parameter.CorrelationInfo;
import org.mule.runtime.extension.api.runtime.parameter.OutboundCorrelationStrategy;
import org.mule.runtime.extension.api.runtime.process.CompletionCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JmsPublishConsume
implements Disposable {
    private static final Logger LOGGER = LoggerFactory.getLogger(JmsPublishConsume.class);
    private final Scheduler scheduler;
    private JmsResultFactory resultFactory = JmsResultFactory.getInstance();
    @Inject
    private JmsSessionManager sessionManager;
    private RequestReplyPatternWrapper requestReplyPatternWrapper = new DefaultRequestReplyPatternWrapper();

    public JmsPublishConsume(JmsSessionManager sessionManager, SchedulerService schedulerService) {
        this.sessionManager = sessionManager;
        this.scheduler = schedulerService.ioScheduler();
    }

    public JmsPublishConsume(JmsSessionManager sessionManager, SchedulerService schedulerService, RequestReplyPatternWrapper customRequestReplyPattern) {
        this.sessionManager = sessionManager;
        this.scheduler = schedulerService.ioScheduler();
        this.requestReplyPatternWrapper = customRequestReplyPattern;
    }

    @OutputResolver(output=JmsOutputResolver.class)
    @Throws(value={JmsPublishConsumeErrorTypeProvider.class})
    public void publishConsume(@Config JmsConfig config, @Connection JmsConnection connection, @Placement(order=0) @Summary(value="The name of the Queue destination where the Message should be sent") String destination, @Placement(order=1) @Summary(value="A builder for the message that will be published") @ParameterGroup(name="Message", showInDsl=true) JmsMessageBuilder messageBuilder, @Placement(order=2) @ParameterGroup(name="Publish Configuration", showInDsl=true) JmsPublishParameters publishParameters, @Placement(order=3) @ParameterGroup(name="Consume Configuration", showInDsl=true) JmsConsumeParameters consumeParameters, @ConfigOverride OutboundCorrelationStrategy sendCorrelationId, CorrelationInfo correlationInfo, RequestReplyPattern requestReplyPattern, CompletionCallback<Object, JmsAttributes> completionCallback) throws JmsExtensionException {
        JmsSession producerSession = null;
        InternalAckMode resolvedAckMode = JmsCommons.resolveOverride(JmsCommons.toInternalAckMode(config.getConsumerConfig().getAckMode()), JmsCommons.toInternalAckMode(consumeParameters.getAckMode()));
        try {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Begin [publish] of [publishConsume] to the QUEUE: [" + destination + "]");
            }
            JmsSupport jmsSupport = connection.getJmsSupport();
            producerSession = connection.createSession(resolvedAckMode, false);
            Message message = messageBuilder.build(jmsSupport, sendCorrelationId, correlationInfo, producerSession.get(), config);
            Destination replyDestination = this.setReplyDestination(messageBuilder, producerSession, jmsSupport, message);
            ConsumerType replyConsumerType = this.getConsumerType(replyDestination);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Message built, sending message to the QUEUE:  [" + destination + "]");
            }
            Destination publishDestination = jmsSupport.createDestination(producerSession.get(), destination, false, config);
            JmsMessageProducer producer = connection.createProducer(producerSession, publishDestination, false);
            AsyncPublishCompletionListener completionListener = new AsyncPublishCompletionListener(publishDestination, replyConsumerType, replyDestination, consumeParameters, resolvedAckMode.equals((Object)InternalAckMode.AUTO) ? InternalAckMode.IMMEDIATE : resolvedAckMode, completionCallback, requestReplyPattern, connection, config, this.requestReplyPatternWrapper);
            producer.publish(message, publishParameters, new ResourceCloserCompletionListener(completionListener, producerSession, producer, config.getResourceReleaserScheduler()));
        }
        catch (JMSSecurityException e) {
            String msg = String.format("A security error occurred while sending a message to the QUEUE: [%s]: %s", destination, e.getMessage());
            completionCallback.error((Throwable)((Object)new JmsSecurityException(msg, (Exception)((Object)e))));
        }
        catch (IllegalStateException e) {
            boolean isConnectionError = producerSession == null;
            String msg = String.format((isConnectionError ? "A connection error" : "An error") + " occurred while sending a message to the QUEUE: [%s]: %s", destination, e.getMessage());
            completionCallback.error((Throwable)(isConnectionError ? new ConnectionException((Throwable)e) : new JmsPublishException(msg, e)));
        }
        catch (Exception e) {
            String msg = String.format("An error occurred while sending a message to the QUEUE: [%s]: %s", destination, e.getMessage());
            completionCallback.error((Throwable)((Object)new JmsPublishException(msg, (Throwable)e)));
        }
    }

    private void deleteTemporaryQueue(Destination replyDestination) throws JMSException {
        if (replyDestination instanceof TemporaryQueue) {
            ((TemporaryQueue)replyDestination).delete();
        } else if (replyDestination instanceof TemporaryTopic) {
            ((TemporaryTopic)replyDestination).delete();
        }
    }

    private ConsumerType getConsumerType(Destination replyDestination) {
        ConsumerType replyConsumerType = replyDestination instanceof Queue ? new QueueConsumer() : new TopicConsumer();
        return replyConsumerType;
    }

    private Destination setReplyDestination(JmsMessageBuilder messageBuilder, JmsSession session, JmsSupport jmsSupport, Message message) throws JMSException {
        if (message.getJMSReplyTo() != null) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("Using provided destination: [%s]", messageBuilder.getReplyTo().getDestination()));
            }
            return message.getJMSReplyTo();
        }
        TemporaryQueue temporaryDestination = jmsSupport.createTemporaryDestination(session.get());
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("Using temporary destination: [%s]", temporaryDestination.getQueueName()));
        }
        message.setJMSReplyTo((Destination)temporaryDestination);
        return temporaryDestination;
    }

    private String getReplyDestinationName(Destination destination, ConsumerType replyConsumerType) {
        try {
            return replyConsumerType.topic() ? ((Topic)destination).getTopicName() : ((Queue)destination).getQueueName();
        }
        catch (JMSException e) {
            return destination.toString();
        }
    }

    public void dispose() {
        if (this.scheduler != null) {
            this.scheduler.stop();
        }
    }

    private class AsyncPublishCompletionListener
    implements CompletionListener {
        private final Destination publishDestination;
        private final ConsumerType replyConsumerType;
        private final Destination replyDestination;
        private final CheckedFunction<String, JmsMessageConsumer> consumerFactory;
        private JmsMessageConsumer consumer;
        private final JmsConsumeParameters consumeParameters;
        private final InternalAckMode resolvedAckMode;
        private final JmsSession consumerSession;
        private final CompletionCallback<Object, JmsAttributes> completionCallback;
        private RequestReplyPattern requestReplyPattern;
        private final JmsConnection connection;
        private final JmsConfig config;
        private String replyDestinationName;
        private RequestReplyPatternWrapper requestReplyWrapper = new DefaultRequestReplyPatternWrapper();

        AsyncPublishCompletionListener(Destination publishDestination, ConsumerType replyConsumerType, Destination replyDestination, JmsConsumeParameters consumeParameters, InternalAckMode resolvedAckMode, CompletionCallback<Object, JmsAttributes> completionCallback, RequestReplyPattern requestReplyPattern, JmsConnection connection, JmsConfig config) throws JMSException {
            this(publishDestination, replyConsumerType, replyDestination, consumeParameters, resolvedAckMode, completionCallback, requestReplyPattern, connection, config, null);
        }

        AsyncPublishCompletionListener(Destination publishDestination, ConsumerType replyConsumerType, Destination replyDestination, JmsConsumeParameters consumeParameters, InternalAckMode resolvedAckMode, CompletionCallback<Object, JmsAttributes> completionCallback, RequestReplyPattern requestReplyPattern, JmsConnection connection, JmsConfig config, RequestReplyPatternWrapper requestReplyWrapper) throws JMSException {
            this.publishDestination = publishDestination;
            this.replyConsumerType = replyConsumerType;
            this.replyDestination = replyDestination;
            this.consumeParameters = consumeParameters;
            this.resolvedAckMode = resolvedAckMode;
            this.completionCallback = completionCallback;
            this.requestReplyPattern = requestReplyPattern;
            this.connection = connection;
            this.config = config;
            this.requestReplyWrapper = requestReplyWrapper;
            this.replyDestinationName = JmsPublishConsume.this.getReplyDestinationName(replyDestination, replyConsumerType);
            this.consumerSession = connection.createSession(resolvedAckMode, false);
            this.consumerFactory = selector -> connection.createConsumer(this.consumerSession, replyDestination, (String)selector, replyConsumerType, true);
        }

        public void onCompletion(Message message) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("Finished [publish] of [publishConsume] to the %s: [%s]. Waiting for reply.", JmsCommons.getDestinationType(this.publishDestination), this.publishDestination));
            }
            try {
                this.consumer = (JmsMessageConsumer)this.consumerFactory.apply((Object)this.requestReplyWrapper.createSelector(this.requestReplyPattern, message));
                this.consumer.consume(this.consumeParameters.getMaximumWaitUnit().toMillis(this.consumeParameters.getMaximumWait()), JmsPublishConsume.this.scheduler, new AsyncConsumerCompletionListener());
            }
            catch (Exception e) {
                String msg = String.format("An error occurred while listening for the reply from the %s: [%s]: %s", JmsCommons.getDestinationType(this.replyConsumerType), this.replyDestinationName, e.getMessage());
                this.completionCallback.error((Throwable)((Object)new JmsConsumeException(msg, e)));
            }
        }

        public void onException(Message message, Exception exception) {
            if (exception instanceof JMSSecurityException) {
                String msg = String.format("A security error occurred while publishing a message to the %s: [%s]: %s", JmsCommons.getDestinationType(this.publishDestination), this.publishDestination, exception.getMessage());
                this.completionCallback.error((Throwable)((Object)new JmsSecurityException(msg, exception)));
            } else if (exception instanceof JmsExtensionException) {
                this.completionCallback.error((Throwable)exception);
            } else {
                String msg = String.format("An error occurred while publishing the message for the reply from the %s: [%s]: %s", JmsCommons.getDestinationType(this.publishDestination), this.publishDestination, exception.getMessage());
                this.completionCallback.error((Throwable)((Object)new JmsPublishException(msg, (Throwable)exception)));
            }
        }

        private void releaseConsumerResources() {
            JmsPublishConsume.this.scheduler.submit(() -> {
                JmsCommons.closeQuietly(this.consumer);
                JmsCommons.closeQuietly(this.consumerSession);
                try {
                    JmsPublishConsume.this.deleteTemporaryQueue(this.replyDestination);
                }
                catch (JMSException e) {
                    LOGGER.debug("Unexpected error when trying to delete temporary queue", (Throwable)e);
                }
            });
        }

        private class AsyncConsumerCompletionListener
        implements CompletionListener {
            private AsyncConsumerCompletionListener() {
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void onCompletion(Message received) {
                try {
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Received message from %s [%s]. Creating response result", (Object)JmsCommons.getDestinationType(AsyncPublishCompletionListener.this.replyConsumerType), (Object)AsyncPublishCompletionListener.this.replyDestinationName);
                    }
                    if (received == null) {
                        AsyncPublishCompletionListener.this.completionCallback.success(JmsPublishConsume.this.resultFactory.createEmptyResult());
                    } else {
                        JmsCommons.evaluateMessageAck(AsyncPublishCompletionListener.this.resolvedAckMode, AsyncPublishCompletionListener.this.consumerSession, received, JmsPublishConsume.this.sessionManager, null);
                        AsyncPublishCompletionListener.this.completionCallback.success(JmsPublishConsume.this.resultFactory.createResult(received, AsyncPublishCompletionListener.this.connection.getJmsSupport().getSpecification(), JmsCommons.resolveOverride(JmsCommons.resolveMessageContentType(received, AsyncPublishCompletionListener.this.config.getContentType()), AsyncPublishCompletionListener.this.consumeParameters.getInboundContentType()), JmsCommons.resolveOverride(JmsCommons.resolveMessageEncoding(received, AsyncPublishCompletionListener.this.config.getEncoding()), AsyncPublishCompletionListener.this.consumeParameters.getInboundEncoding()), AsyncPublishCompletionListener.this.consumerSession.getAckId()));
                    }
                }
                catch (JMSSecurityException e) {
                    String msg = String.format("A security error occurred while listening for the reply from the %s: [%s]: %s", JmsCommons.getDestinationType(AsyncPublishCompletionListener.this.replyConsumerType), AsyncPublishCompletionListener.this.replyDestinationName, e.getMessage());
                    AsyncPublishCompletionListener.this.completionCallback.error((Throwable)((Object)new JmsSecurityException(msg, (Exception)((Object)e))));
                }
                catch (Exception e) {
                    String msg = String.format("An error occurred while listening for the reply from the %s: [%s]: %s", JmsCommons.getDestinationType(AsyncPublishCompletionListener.this.replyConsumerType), AsyncPublishCompletionListener.this.replyDestinationName, e.getMessage());
                    AsyncPublishCompletionListener.this.completionCallback.error((Throwable)((Object)new JmsConsumeException(msg, e)));
                }
                finally {
                    AsyncPublishCompletionListener.this.releaseConsumerResources();
                }
            }

            public void onException(Message message, Exception exception) {
                AsyncPublishCompletionListener.this.releaseConsumerResources();
                if (exception instanceof JMSSecurityException) {
                    String msg = String.format("A security error occurred while consuming a message to the %s: [%s]: %s", JmsCommons.getDestinationType(AsyncPublishCompletionListener.this.publishDestination), AsyncPublishCompletionListener.this.publishDestination, exception.getMessage());
                    AsyncPublishCompletionListener.this.completionCallback.error((Throwable)((Object)new JmsSecurityException(msg, exception)));
                } else if (exception instanceof JmsExtensionException) {
                    AsyncPublishCompletionListener.this.completionCallback.error((Throwable)exception);
                } else {
                    String msg = String.format("An error occurred while listening for the message reply from the %s: [%s]: %s", JmsCommons.getDestinationType(AsyncPublishCompletionListener.this.publishDestination), AsyncPublishCompletionListener.this.publishDestination, exception.getMessage());
                    AsyncPublishCompletionListener.this.completionCallback.error((Throwable)((Object)new JmsConsumeException(msg, exception)));
                }
            }
        }
    }
}

