/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.amqp.outbound;

import org.springframework.amqp.core.AmqpMessageReturnedException;
import org.springframework.amqp.core.AmqpReplyTimeoutException;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.integration.amqp.outbound.AbstractAmqpOutboundEndpoint;
import org.springframework.integration.amqp.support.MappingUtils;
import org.springframework.integration.handler.ReplyRequiredException;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFutureCallback;

public class AsyncAmqpOutboundGateway
extends AbstractAmqpOutboundEndpoint {
    private final AsyncRabbitTemplate template;
    private final MessageConverter messageConverter;

    public AsyncAmqpOutboundGateway(AsyncRabbitTemplate template) {
        Assert.notNull((Object)template, (String)"AsyncRabbitTemplate cannot be null");
        this.template = template;
        this.messageConverter = template.getMessageConverter();
        Assert.notNull((Object)this.messageConverter, (String)"the template's message converter cannot be null");
        this.setConnectionFactory(this.template.getConnectionFactory());
        this.setAsync(true);
    }

    public String getComponentType() {
        return "amqp:outbound-async-gateway";
    }

    protected Object handleRequestMessage(Message<?> requestMessage) {
        AsyncRabbitTemplate.RabbitMessageFuture future = this.template.sendAndReceive(this.generateExchangeName(requestMessage), this.generateRoutingKey(requestMessage), MappingUtils.mapMessage(requestMessage, this.messageConverter, this.getHeaderMapper(), this.getDefaultDeliveryMode()));
        future.addCallback((ListenableFutureCallback)new FutureCallback(requestMessage));
        CorrelationData correlationData = this.generateCorrelationData(requestMessage);
        if (correlationData != null && future.getConfirm() != null) {
            future.getConfirm().addCallback((ListenableFutureCallback)new CorrelationCallback(correlationData, future));
        }
        return null;
    }

    private final class CorrelationCallback
    implements ListenableFutureCallback<Boolean> {
        private final CorrelationData correlationData;
        private final AsyncRabbitTemplate.RabbitMessageFuture replyFuture;

        CorrelationCallback(CorrelationData correlationData, AsyncRabbitTemplate.RabbitMessageFuture replyFuture) {
            this.correlationData = correlationData;
            this.replyFuture = replyFuture;
        }

        public void onSuccess(Boolean result) {
            try {
                AsyncAmqpOutboundGateway.this.handleConfirm(this.correlationData, result, this.replyFuture.getNackCause());
            }
            catch (Exception e) {
                AsyncAmqpOutboundGateway.this.logger.error((Object)"Failed to send publisher confirm");
            }
        }

        public void onFailure(Throwable ex) {
        }
    }

    private final class FutureCallback
    implements ListenableFutureCallback<org.springframework.amqp.core.Message> {
        private final Message<?> requestMessage;

        FutureCallback(Message<?> requestMessage) {
            this.requestMessage = requestMessage;
        }

        public void onSuccess(org.springframework.amqp.core.Message result) {
            Message<?> replyMessage = null;
            try {
                replyMessage = AsyncAmqpOutboundGateway.this.buildReplyMessage(AsyncAmqpOutboundGateway.this.messageConverter, result);
                AsyncAmqpOutboundGateway.this.sendOutputs(replyMessage, this.requestMessage);
            }
            catch (Exception e) {
                Exception exceptionToLogAndSend = e;
                if (!(e instanceof MessagingException)) {
                    exceptionToLogAndSend = new MessageHandlingException(this.requestMessage, (Throwable)e);
                    if (replyMessage != null) {
                        exceptionToLogAndSend = new MessagingException(replyMessage, (Throwable)exceptionToLogAndSend);
                    }
                }
                AsyncAmqpOutboundGateway.this.logger.error((Object)("Failed to send async reply: " + result.toString()), (Throwable)exceptionToLogAndSend);
                AsyncAmqpOutboundGateway.this.sendErrorMessage(this.requestMessage, exceptionToLogAndSend);
            }
        }

        public void onFailure(Throwable ex) {
            Throwable exceptionToSend = ex;
            if (ex instanceof AmqpReplyTimeoutException) {
                if (AsyncAmqpOutboundGateway.this.getRequiresReply()) {
                    exceptionToSend = new ReplyRequiredException(this.requestMessage, "Timeout on async request/reply", ex);
                } else {
                    if (AsyncAmqpOutboundGateway.this.logger.isDebugEnabled()) {
                        AsyncAmqpOutboundGateway.this.logger.debug((Object)("Reply not required and async timeout for " + this.requestMessage));
                    }
                    return;
                }
            }
            if (ex instanceof AmqpMessageReturnedException) {
                if (AsyncAmqpOutboundGateway.this.getReturnChannel() == null) {
                    AsyncAmqpOutboundGateway.this.logger.error((Object)("Returned message received and no return channel " + ((AmqpMessageReturnedException)ex).getReturnedMessage()));
                } else {
                    AmqpMessageReturnedException amre = (AmqpMessageReturnedException)ex;
                    Message<?> returnedMessage = AsyncAmqpOutboundGateway.this.buildReturnedMessage(amre.getReturnedMessage(), amre.getReplyCode(), amre.getReplyText(), amre.getExchange(), amre.getRoutingKey(), AsyncAmqpOutboundGateway.this.messageConverter);
                    AsyncAmqpOutboundGateway.this.sendOutput(returnedMessage, AsyncAmqpOutboundGateway.this.getReturnChannel(), true);
                }
            } else {
                AsyncAmqpOutboundGateway.this.sendErrorMessage(this.requestMessage, exceptionToSend);
            }
        }
    }
}

