/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.amqp.eventhandling.spring;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.axonframework.amqp.eventhandling.AMQPMessage;
import org.axonframework.amqp.eventhandling.AMQPMessageConverter;
import org.axonframework.amqp.eventhandling.DefaultAMQPMessageConverter;
import org.axonframework.amqp.eventhandling.EventPublicationFailedException;
import org.axonframework.amqp.eventhandling.PackageRoutingKeyResolver;
import org.axonframework.amqp.eventhandling.RoutingKeyResolver;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.Registration;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.messaging.SubscribableMessageSource;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.messaging.unitofwork.UnitOfWork;
import org.axonframework.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

public class SpringAMQPPublisher
implements InitializingBean,
ApplicationContextAware {
    private static final Logger logger = LoggerFactory.getLogger(SpringAMQPPublisher.class);
    private static final String DEFAULT_EXCHANGE_NAME = "Axon.EventBus";
    private final SubscribableMessageSource<EventMessage<?>> messageSource;
    private ConnectionFactory connectionFactory;
    private String exchangeName = "Axon.EventBus";
    private boolean isTransactional = false;
    private boolean isDurable = true;
    private AMQPMessageConverter messageConverter;
    private ApplicationContext applicationContext;
    private Serializer serializer;
    private RoutingKeyResolver routingKeyResolver;
    private boolean waitForAck;
    private long publisherAckTimeout;
    private Registration eventBusRegistration;

    public SpringAMQPPublisher(SubscribableMessageSource<EventMessage<?>> messageSource) {
        this.messageSource = messageSource;
    }

    public void start() {
        this.eventBusRegistration = this.messageSource.subscribe(this::send);
    }

    public void shutDown() {
        if (this.eventBusRegistration != null) {
            this.eventBusRegistration.cancel();
            this.eventBusRegistration = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void send(List<? extends EventMessage<?>> events) {
        Channel channel = this.connectionFactory.createConnection().createChannel(this.isTransactional);
        try {
            if (this.isTransactional) {
                channel.txSelect();
            } else if (this.waitForAck) {
                channel.confirmSelect();
            }
            for (EventMessage<?> event : events) {
                AMQPMessage amqpMessage = this.messageConverter.createAMQPMessage(event);
                this.doSendMessage(channel, amqpMessage);
            }
            if (CurrentUnitOfWork.isStarted()) {
                UnitOfWork unitOfWork = CurrentUnitOfWork.get();
                unitOfWork.onCommit(u -> {
                    if ((this.isTransactional || this.waitForAck) && !channel.isOpen()) {
                        throw new EventPublicationFailedException("Unable to Commit UnitOfWork changes to AMQP: Channel is closed.", channel.getCloseReason());
                    }
                });
                unitOfWork.afterCommit(u -> {
                    block7: {
                        try {
                            if (this.isTransactional) {
                                channel.txCommit();
                                break block7;
                            }
                            if (!this.waitForAck) break block7;
                            try {
                                channel.waitForConfirmsOrDie(this.publisherAckTimeout);
                            }
                            catch (IOException ex) {
                                throw new EventPublicationFailedException("Failed to receive acknowledgements for all events", ex);
                            }
                            catch (TimeoutException ex) {
                                throw new EventPublicationFailedException("Timeout while waiting for publisher acknowledgements", ex);
                            }
                        }
                        catch (IOException e) {
                            logger.warn("Unable to commit transaction on channel.", (Throwable)e);
                        }
                        catch (InterruptedException e) {
                            logger.warn("Interrupt received when waiting for message confirms.");
                            Thread.currentThread().interrupt();
                        }
                    }
                    this.tryClose(channel);
                });
                unitOfWork.onRollback(u -> {
                    try {
                        if (this.isTransactional) {
                            channel.txRollback();
                        }
                    }
                    catch (IOException ex) {
                        logger.warn("Unable to rollback transaction on channel.", (Throwable)ex);
                    }
                    this.tryClose(channel);
                });
            } else if (this.isTransactional) {
                channel.txCommit();
            } else if (this.waitForAck) {
                channel.waitForConfirmsOrDie();
            }
        }
        catch (IOException e) {
            if (this.isTransactional) {
                this.tryRollback(channel);
            }
            throw new EventPublicationFailedException("Failed to dispatch Events to the Message Broker.", e);
        }
        catch (ShutdownSignalException e) {
            throw new EventPublicationFailedException("Failed to dispatch Events to the Message Broker.", e);
        }
        catch (InterruptedException e) {
            logger.warn("Interrupt received when waiting for message confirms.");
            Thread.currentThread().interrupt();
        }
        finally {
            if (!CurrentUnitOfWork.isStarted()) {
                this.tryClose(channel);
            }
        }
    }

    private void tryClose(Channel channel) {
        try {
            channel.close();
        }
        catch (IOException | TimeoutException e) {
            logger.info("Unable to close channel. It might already be closed.", (Throwable)e);
        }
    }

    protected void doSendMessage(Channel channel, AMQPMessage amqpMessage) throws IOException {
        channel.basicPublish(this.exchangeName, amqpMessage.getRoutingKey(), amqpMessage.isMandatory(), amqpMessage.isImmediate(), amqpMessage.getProperties(), amqpMessage.getBody());
    }

    private void tryRollback(Channel channel) {
        try {
            channel.txRollback();
        }
        catch (IOException e) {
            logger.debug("Unable to rollback. The underlying channel might already be closed.", (Throwable)e);
        }
    }

    public void afterPropertiesSet() {
        if (this.connectionFactory == null) {
            this.connectionFactory = (ConnectionFactory)this.applicationContext.getBean(ConnectionFactory.class);
        }
        if (this.messageConverter == null) {
            if (this.serializer == null) {
                this.serializer = (Serializer)this.applicationContext.getBean(Serializer.class);
            }
            if (this.routingKeyResolver == null) {
                Map routingKeyResolverCandidates = this.applicationContext.getBeansOfType(RoutingKeyResolver.class);
                if (routingKeyResolverCandidates.size() > 1) {
                    throw new AxonConfigurationException("No MessageConverter was configured, but none can be created using autowired properties, as more than 1 RoutingKeyResolver is present in the ApplicationContent");
                }
                this.routingKeyResolver = routingKeyResolverCandidates.size() == 1 ? (RoutingKeyResolver)routingKeyResolverCandidates.values().iterator().next() : new PackageRoutingKeyResolver();
            }
            this.messageConverter = new DefaultAMQPMessageConverter(this.serializer, this.routingKeyResolver, this.isDurable);
        }
    }

    public void setTransactional(boolean transactional) {
        Assert.isTrue((!this.waitForAck || !transactional ? 1 : 0) != 0, () -> "Cannot set transactional behavior when 'waitForServerAck' is enabled.");
        this.isTransactional = transactional;
    }

    public void setWaitForPublisherAck(boolean waitForPublisherAck) {
        Assert.isTrue((!waitForPublisherAck || !this.isTransactional ? 1 : 0) != 0, () -> "Cannot set 'waitForPublisherAck' when using transactions.");
        this.waitForAck = waitForPublisherAck;
    }

    public void setPublisherAckTimeout(long publisherAckTimeout) {
        this.publisherAckTimeout = publisherAckTimeout;
    }

    public void setConnectionFactory(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    public void setMessageConverter(AMQPMessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }

    public void setDurable(boolean durable) {
        this.isDurable = durable;
    }

    public void setSerializer(Serializer serializer) {
        this.serializer = serializer;
    }

    public void setRoutingKeyResolver(RoutingKeyResolver routingKeyResolver) {
        this.routingKeyResolver = routingKeyResolver;
    }

    public void setExchangeName(String exchangeName) {
        this.exchangeName = exchangeName;
    }

    public void setExchange(Exchange exchange) {
        this.exchangeName = exchange.getName();
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

