/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.amqp.rabbit;

import java.nio.charset.Charset;
import java.util.Date;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpMessageReturnedException;
import org.springframework.amqp.core.AmqpReplyTimeoutException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;

public class AsyncRabbitTemplate
implements SmartLifecycle,
MessageListener,
RabbitTemplate.ReturnCallback,
RabbitTemplate.ConfirmCallback,
BeanNameAware {
    public static final int DEFAULT_RECEIVE_TIMEOUT = 30000;
    private final Log logger = LogFactory.getLog(this.getClass());
    private final RabbitTemplate template;
    private final SimpleMessageListenerContainer container;
    private final String replyAddress;
    private final ConcurrentMap<String, RabbitFuture> pending = new ConcurrentHashMap<String, RabbitFuture>();
    private volatile boolean running;
    private volatile boolean enableConfirms;
    private volatile long receiveTimeout = 30000L;
    private int phase;
    private boolean autoStartup = true;
    private Charset charset = Charset.forName("UTF-8");
    private String beanName;
    private TaskScheduler taskScheduler;

    public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey, String replyQueue) {
        this(connectionFactory, exchange, routingKey, replyQueue, null);
    }

    public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey, String replyQueue, String replyAddress) {
        Assert.notNull((Object)connectionFactory, (String)"'connectionFactory' cannot be null");
        Assert.notNull((Object)routingKey, (String)"'routingKey' cannot be null");
        Assert.notNull((Object)replyQueue, (String)"'replyQueue' cannot be null");
        this.template = new RabbitTemplate(connectionFactory);
        this.template.setExchange(exchange == null ? "" : exchange);
        this.template.setRoutingKey(routingKey);
        this.container = new SimpleMessageListenerContainer(connectionFactory);
        this.container.setQueueNames(replyQueue);
        this.container.setMessageListener(this);
        this.container.afterPropertiesSet();
        this.replyAddress = replyAddress == null ? replyQueue : replyAddress;
    }

    public AsyncRabbitTemplate(RabbitTemplate template, SimpleMessageListenerContainer container) {
        this(template, container, null);
    }

    public AsyncRabbitTemplate(RabbitTemplate template, SimpleMessageListenerContainer container, String replyAddress) {
        Assert.notNull((Object)template, (String)"'template' cannot be null");
        Assert.notNull((Object)container, (String)"'container' cannot be null");
        this.template = template;
        this.container = container;
        this.container.setMessageListener(this);
        this.replyAddress = replyAddress == null ? container.getQueueNames()[0] : replyAddress;
    }

    public void setAutoStartup(boolean autoStartup) {
        this.autoStartup = autoStartup;
    }

    public void setPhase(int phase) {
        this.phase = phase;
    }

    public void setMandatory(boolean mandatory) {
        if (mandatory) {
            this.template.setReturnCallback(this);
        }
        this.template.setMandatory(mandatory);
    }

    public void setEnableConfirms(boolean enableConfirms) {
        this.enableConfirms = enableConfirms;
        if (enableConfirms) {
            this.template.setConfirmCallback(this);
        }
    }

    @Deprecated
    public void setCharset(Charset charset) {
        this.charset = charset;
    }

    public String getBeanName() {
        return this.beanName;
    }

    public void setBeanName(String beanName) {
        this.beanName = beanName;
    }

    public ConnectionFactory getConnectionFactory() {
        return this.template.getConnectionFactory();
    }

    public void setReceiveTimeout(long receiveTimeout) {
        this.receiveTimeout = receiveTimeout;
    }

    public void setTaskScheduler(TaskScheduler taskScheduler) {
        this.taskScheduler = taskScheduler;
    }

    public MessageConverter getMessageConverter() {
        return this.template.getMessageConverter();
    }

    public RabbitMessageFuture sendAndReceive(Message message) {
        return this.sendAndReceive(this.template.getExchange(), this.template.getRoutingKey(), message);
    }

    public RabbitMessageFuture sendAndReceive(String routingKey, Message message) {
        return this.sendAndReceive(this.template.getExchange(), routingKey, message);
    }

    public <C> RabbitConverterFuture<C> convertSendAndReceive(Object message) {
        return this.convertSendAndReceive(this.template.getExchange(), this.template.getRoutingKey(), message, null);
    }

    public <C> RabbitConverterFuture<C> convertSendAndReceive(String routingKey, Object message) throws AmqpException {
        return this.convertSendAndReceive(this.template.getExchange(), routingKey, message, null);
    }

    public <C> RabbitConverterFuture<C> convertSendAndReceive(String exchange, String routingKey, Object message) {
        return this.convertSendAndReceive(exchange, routingKey, message, null);
    }

    public <C> RabbitConverterFuture<C> convertSendAndReceive(Object message, MessagePostProcessor messagePostProcessor) {
        return this.convertSendAndReceive(this.template.getExchange(), this.template.getRoutingKey(), message, messagePostProcessor);
    }

    public <C> RabbitConverterFuture<C> convertSendAndReceive(String routingKey, Object message, MessagePostProcessor messagePostProcessor) {
        return this.convertSendAndReceive(this.template.getExchange(), routingKey, message, messagePostProcessor);
    }

    public RabbitMessageFuture sendAndReceive(String exchange, String routingKey, Message message) {
        String correlationId = this.getOrSetCorrelationIdAndSetReplyTo(message);
        RabbitMessageFuture future = new RabbitMessageFuture(correlationId, message);
        CorrelationData correlationData = null;
        if (this.enableConfirms) {
            correlationData = new CorrelationData(correlationId);
            future.setConfirm((ListenableFuture<Boolean>)new SettableListenableFuture());
        }
        this.pending.put(correlationId, future);
        this.template.send(exchange, routingKey, message, correlationData);
        return future;
    }

    public <C> RabbitConverterFuture<C> convertSendAndReceive(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) {
        CorrelationData correlationData = null;
        if (this.enableConfirms) {
            correlationData = new CorrelationData(null);
        }
        CorrelationMessagePostProcessor correlationPostProcessor = new CorrelationMessagePostProcessor(messagePostProcessor, correlationData);
        this.template.convertAndSend(exchange, routingKey, message, correlationPostProcessor, correlationData);
        return correlationPostProcessor.getFuture();
    }

    public synchronized void start() {
        if (!this.running) {
            if (this.taskScheduler == null) {
                ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
                scheduler.setThreadNamePrefix(this.getBeanName() == null ? "asyncTemplate-" : this.getBeanName() + "-");
                scheduler.afterPropertiesSet();
                this.taskScheduler = scheduler;
            }
            this.container.start();
        }
        this.running = true;
    }

    public synchronized void stop() {
        if (this.running) {
            this.container.stop();
            for (RabbitFuture future : this.pending.values()) {
                future.setNackCause("AsyncRabbitTemplate was stopped while waiting for reply");
                future.cancel(true);
            }
        }
        this.running = false;
    }

    public boolean isRunning() {
        return this.running;
    }

    public int getPhase() {
        return this.phase;
    }

    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    public void stop(Runnable callback) {
        this.stop();
        callback.run();
    }

    public void onMessage(Message message) {
        byte[] correlationId;
        MessageProperties messageProperties = message.getMessageProperties();
        if (messageProperties != null && (correlationId = messageProperties.getCorrelationId()) != null) {
            RabbitFuture future;
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("onMessage: " + message));
            }
            if ((future = (RabbitFuture)((Object)this.pending.remove(new String(correlationId, this.charset)))) != null) {
                if (future instanceof RabbitConverterFuture) {
                    Object converted = this.template.getMessageConverter().fromMessage(message);
                    ((RabbitConverterFuture)future).set(converted);
                } else {
                    ((RabbitMessageFuture)future).set(message);
                }
            } else if (this.logger.isWarnEnabled()) {
                this.logger.warn((Object)("No pending reply - perhaps timed out: " + message));
            }
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        RabbitFuture future;
        MessageProperties messageProperties = message.getMessageProperties();
        byte[] correlationId = messageProperties.getCorrelationId();
        if (correlationId != null && (future = (RabbitFuture)((Object)this.pending.remove(new String(correlationId, this.charset)))) != null) {
            future.setException((Throwable)new AmqpMessageReturnedException("Message returned", message, replyCode, replyText, exchange, routingKey));
        }
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String correlationId;
        if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object)("Confirm: " + correlationData + ", ack=" + ack + (cause == null ? "" : ", cause: " + cause)));
        }
        if ((correlationId = correlationData.getId()) != null) {
            RabbitFuture future = (RabbitFuture)((Object)this.pending.get(correlationId));
            if (future != null) {
                future.setNackCause(cause);
                ((SettableListenableFuture)future.getConfirm()).set((Object)ack);
            } else if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("Confirm: " + correlationData + ", ack=" + ack + (cause == null ? "" : ", cause: " + cause) + " no pending future - either canceled or the reply is already received"));
            }
        }
    }

    private String getOrSetCorrelationIdAndSetReplyTo(Message message) {
        String correlationId;
        MessageProperties messageProperties = message.getMessageProperties();
        Assert.notNull((Object)messageProperties, (String)"the message properties cannot be null");
        byte[] currentCorrelationId = messageProperties.getCorrelationId();
        if (currentCorrelationId == null) {
            correlationId = UUID.randomUUID().toString();
            messageProperties.setCorrelationId(correlationId.getBytes(this.charset));
            Assert.isNull((Object)messageProperties.getReplyTo(), (String)"'replyTo' property must be null");
        } else {
            correlationId = new String(currentCorrelationId, this.charset);
        }
        messageProperties.setReplyTo(this.replyAddress);
        return correlationId;
    }

    public String toString() {
        return this.beanName == null ? super.toString() : this.getClass().getSimpleName() + ": " + this.beanName;
    }

    private final class CorrelationMessagePostProcessor<C>
    implements MessagePostProcessor {
        private final MessagePostProcessor userPostProcessor;
        private final CorrelationData correlationData;
        private volatile RabbitConverterFuture<C> future;

        private CorrelationMessagePostProcessor(MessagePostProcessor userPostProcessor, CorrelationData correlationData) {
            this.userPostProcessor = userPostProcessor;
            this.correlationData = correlationData;
        }

        public Message postProcessMessage(Message message) throws AmqpException {
            Message messageToSend = message;
            if (this.userPostProcessor != null) {
                messageToSend = this.userPostProcessor.postProcessMessage(message);
            }
            String correlationId = AsyncRabbitTemplate.this.getOrSetCorrelationIdAndSetReplyTo(messageToSend);
            this.future = new RabbitConverterFuture(correlationId, message);
            if (this.correlationData != null && this.correlationData.getId() == null) {
                this.correlationData.setId(correlationId);
                this.future.setConfirm((ListenableFuture<Boolean>)new SettableListenableFuture());
            }
            AsyncRabbitTemplate.this.pending.put(correlationId, this.future);
            return messageToSend;
        }

        private RabbitConverterFuture<C> getFuture() {
            return this.future;
        }
    }

    public class RabbitConverterFuture<C>
    extends RabbitFuture<C> {
        public RabbitConverterFuture(String correlationId, Message requestMessage) {
            super(correlationId, requestMessage);
        }
    }

    public class RabbitMessageFuture
    extends RabbitFuture<Message> {
        public RabbitMessageFuture(String correlationId, Message requestMessage) {
            super(correlationId, requestMessage);
        }
    }

    public abstract class RabbitFuture<T>
    extends SettableListenableFuture<T> {
        private final String correlationId;
        private final Message requestMessage;
        private final ScheduledFuture<?> cancelTask;
        private volatile ListenableFuture<Boolean> confirm;
        private String nackCause;

        public RabbitFuture(String correlationId, Message requestMessage) {
            this.correlationId = correlationId;
            this.requestMessage = requestMessage;
            this.cancelTask = AsyncRabbitTemplate.this.receiveTimeout > 0L ? AsyncRabbitTemplate.this.taskScheduler.schedule((Runnable)new CancelTask(), new Date(System.currentTimeMillis() + AsyncRabbitTemplate.this.receiveTimeout)) : null;
        }

        public boolean cancel(boolean mayInterruptIfRunning) {
            if (this.cancelTask != null) {
                this.cancelTask.cancel(true);
            }
            AsyncRabbitTemplate.this.pending.remove(this.correlationId);
            return super.cancel(mayInterruptIfRunning);
        }

        public ListenableFuture<Boolean> getConfirm() {
            return this.confirm;
        }

        void setConfirm(ListenableFuture<Boolean> confirm) {
            this.confirm = confirm;
        }

        public String getNackCause() {
            return this.nackCause;
        }

        void setNackCause(String nackCause) {
            this.nackCause = nackCause;
        }

        private class CancelTask
        implements Runnable {
            private CancelTask() {
            }

            @Override
            public void run() {
                AsyncRabbitTemplate.this.pending.remove(RabbitFuture.this.correlationId);
                RabbitFuture.this.setException((Throwable)new AmqpReplyTimeoutException("Reply timed out", RabbitFuture.this.requestMessage));
            }
        }
    }
}

