/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.amqp.outbound;

import org.springframework.amqp.core.AmqpMessageReturnedException;
import org.springframework.amqp.core.AmqpReplyTimeoutException;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.integration.amqp.outbound.AbstractAmqpOutboundEndpoint;
import org.springframework.integration.amqp.support.MappingUtils;
import org.springframework.integration.handler.ReplyRequiredException;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFutureCallback;

public class AsyncAmqpOutboundGateway
extends AbstractAmqpOutboundEndpoint {
    private final AsyncRabbitTemplate template;
    private final MessageConverter messageConverter;

    public AsyncAmqpOutboundGateway(AsyncRabbitTemplate template) {
        Assert.notNull((Object)template, (String)"AsyncRabbitTemplate cannot be null");
        this.template = template;
        this.messageConverter = template.getMessageConverter();
        Assert.notNull((Object)this.messageConverter, (String)"the template's message converter cannot be null");
        this.setConnectionFactory(this.template.getConnectionFactory());
        this.setAsync(true);
    }

    public String getComponentType() {
        return "amqp:outbound-async-gateway";
    }

    @Override
    protected RabbitTemplate getRabbitTemplate() {
        return this.template.getRabbitTemplate();
    }

    @Override
    protected void doStart() {
        super.doStart();
        this.template.start();
    }

    @Override
    protected void doStop() {
        this.template.stop();
        super.doStop();
    }

    protected Object handleRequestMessage(Message<?> requestMessage) {
        org.springframework.amqp.core.Message amqpMessage = MappingUtils.mapMessage(requestMessage, this.messageConverter, this.getHeaderMapper(), this.getDefaultDeliveryMode(), this.isHeadersMappedLast());
        this.addDelayProperty(requestMessage, amqpMessage);
        AsyncRabbitTemplate.RabbitMessageFuture future = this.template.sendAndReceive(this.generateExchangeName(requestMessage), this.generateRoutingKey(requestMessage), amqpMessage);
        CorrelationData correlationData = this.generateCorrelationData(requestMessage);
        if (correlationData != null && future.getConfirm() != null) {
            future.getConfirm().addCallback((ListenableFutureCallback)new CorrelationCallback(correlationData, future));
        }
        future.addCallback((ListenableFutureCallback)new FutureCallback(requestMessage, correlationData));
        return null;
    }

    private final class CorrelationCallback
    implements ListenableFutureCallback<Boolean> {
        private final CorrelationData correlationData;
        private final AsyncRabbitTemplate.RabbitMessageFuture replyFuture;

        CorrelationCallback(CorrelationData correlationData, AsyncRabbitTemplate.RabbitMessageFuture replyFuture) {
            this.correlationData = correlationData;
            this.replyFuture = replyFuture;
        }

        public void onSuccess(Boolean result) {
            try {
                AsyncAmqpOutboundGateway.this.handleConfirm(this.correlationData, result, this.replyFuture.getNackCause());
            }
            catch (Exception e) {
                AsyncAmqpOutboundGateway.this.logger.error((CharSequence)"Failed to send publisher confirm");
            }
        }

        public void onFailure(Throwable ex) {
        }
    }

    private final class FutureCallback
    implements ListenableFutureCallback<org.springframework.amqp.core.Message> {
        private final Message<?> requestMessage;
        private final AbstractAmqpOutboundEndpoint.CorrelationDataWrapper correlationData;

        FutureCallback(Message<?> requestMessage, CorrelationData correlationData) {
            this.requestMessage = requestMessage;
            this.correlationData = (AbstractAmqpOutboundEndpoint.CorrelationDataWrapper)correlationData;
        }

        public void onSuccess(org.springframework.amqp.core.Message result) {
            AbstractIntegrationMessageBuilder<?> replyMessageBuilder = null;
            try {
                replyMessageBuilder = AsyncAmqpOutboundGateway.this.buildReply(AsyncAmqpOutboundGateway.this.messageConverter, result);
                AsyncAmqpOutboundGateway.this.sendOutputs(replyMessageBuilder, this.requestMessage);
            }
            catch (Exception ex) {
                Exception exceptionToLogAndSend = ex;
                if (!(ex instanceof MessagingException)) {
                    exceptionToLogAndSend = new MessageHandlingException(this.requestMessage, "failed to handle a message in the [" + (Object)((Object)AsyncAmqpOutboundGateway.this) + ']', (Throwable)ex);
                    if (replyMessageBuilder != null) {
                        exceptionToLogAndSend = new MessagingException(replyMessageBuilder.build(), (Throwable)exceptionToLogAndSend);
                    }
                }
                AsyncAmqpOutboundGateway.this.logger.error((Throwable)exceptionToLogAndSend, () -> "Failed to send async reply: " + result.toString());
                AsyncAmqpOutboundGateway.this.sendErrorMessage(this.requestMessage, exceptionToLogAndSend);
            }
        }

        public void onFailure(Throwable ex) {
            Throwable exceptionToSend = ex;
            if (ex instanceof AmqpReplyTimeoutException) {
                if (AsyncAmqpOutboundGateway.this.getRequiresReply()) {
                    exceptionToSend = new ReplyRequiredException(this.requestMessage, "Timeout on async request/reply", ex);
                } else {
                    AsyncAmqpOutboundGateway.this.logger.debug(() -> "Reply not required and async timeout for " + this.requestMessage);
                    return;
                }
            }
            if (ex instanceof AmqpMessageReturnedException) {
                AmqpMessageReturnedException amre = (AmqpMessageReturnedException)ex;
                MessageChannel returnChannel = AsyncAmqpOutboundGateway.this.getReturnChannel();
                if (returnChannel != null) {
                    Message<?> returnedMessage = AsyncAmqpOutboundGateway.this.buildReturnedMessage(new ReturnedMessage(amre.getReturnedMessage(), amre.getReplyCode(), amre.getReplyText(), amre.getExchange(), amre.getRoutingKey()), AsyncAmqpOutboundGateway.this.messageConverter);
                    AsyncAmqpOutboundGateway.this.sendOutput(returnedMessage, returnChannel, true);
                }
                this.correlationData.setReturnedMessage(amre.getReturnedMessage());
                this.correlationData.getFuture().set((Object)new CorrelationData.Confirm(true, null));
            } else {
                AsyncAmqpOutboundGateway.this.sendErrorMessage(this.requestMessage, exceptionToSend);
            }
        }
    }
}

