/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventhandling.amqp.spring;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.domain.EventMessage;
import org.axonframework.eventhandling.Cluster;
import org.axonframework.eventhandling.ClusterMetaData;
import org.axonframework.eventhandling.EventBusTerminal;
import org.axonframework.eventhandling.amqp.AMQPConsumerConfiguration;
import org.axonframework.eventhandling.amqp.AMQPMessage;
import org.axonframework.eventhandling.amqp.AMQPMessageConverter;
import org.axonframework.eventhandling.amqp.DefaultAMQPConsumerConfiguration;
import org.axonframework.eventhandling.amqp.DefaultAMQPMessageConverter;
import org.axonframework.eventhandling.amqp.EventPublicationFailedException;
import org.axonframework.eventhandling.amqp.PackageRoutingKeyResolver;
import org.axonframework.eventhandling.amqp.RoutingKeyResolver;
import org.axonframework.eventhandling.amqp.spring.ListenerContainerLifecycleManager;
import org.axonframework.serializer.Serializer;
import org.axonframework.unitofwork.CurrentUnitOfWork;
import org.axonframework.unitofwork.UnitOfWork;
import org.axonframework.unitofwork.UnitOfWorkListener;
import org.axonframework.unitofwork.UnitOfWorkListenerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

/**
 * {@link EventBusTerminal} that publishes event messages to an AMQP exchange using a Spring AMQP
 * {@link ConnectionFactory}, and registers clusters with a {@link ListenerContainerLifecycleManager}.
 * <p>
 * Collaborators that are not set explicitly (connection factory, serializer, routing key resolver,
 * listener container lifecycle manager) are looked up in the Spring {@link ApplicationContext}.
 * <p>
 * Publication either uses channel transactions ({@link #setTransactional(boolean)}) or publisher
 * acknowledgements ({@link #setWaitForPublisherAck(boolean)}); the setters reject enabling both.
 */
public class SpringAMQPTerminal implements EventBusTerminal, InitializingBean, ApplicationContextAware {

    private static final Logger logger = LoggerFactory.getLogger(SpringAMQPTerminal.class);
    private static final String DEFAULT_EXCHANGE_NAME = "Axon.EventBus";

    private ConnectionFactory connectionFactory;
    private String exchangeName = DEFAULT_EXCHANGE_NAME;
    private boolean isTransactional = false;
    private boolean isDurable = true;
    private ListenerContainerLifecycleManager listenerContainerLifecycleManager;
    private AMQPMessageConverter messageConverter;
    private ApplicationContext applicationContext;
    private Serializer serializer;
    private RoutingKeyResolver routingKeyResolver;
    private boolean waitForAck;
    private long publisherAckTimeout;

    /**
     * Converts the given events to AMQP messages and publishes them on a freshly created channel.
     * <p>
     * When a Unit of Work is active, committing the channel transaction (or waiting for publisher
     * acknowledgements) and closing the channel is delegated to a listener registered with that
     * Unit of Work. Otherwise the transaction is committed, or acknowledgements are awaited using
     * the configured publisher acknowledgement timeout, before this method returns and the channel
     * is closed.
     * <p>
     * If the thread is interrupted while waiting for acknowledgements, a warning is logged, the
     * interrupt flag is restored and the method returns normally.
     *
     * @param events the events to publish
     * @throws EventPublicationFailedException when sending, committing or confirming fails
     */
    public void publish(EventMessage... events) {
        Channel channel = connectionFactory.createConnection().createChannel(isTransactional);
        // Becomes true only once a Unit of Work listener has taken over responsibility for the channel.
        boolean handedOverToUnitOfWork = false;
        try {
            if (waitForAck) {
                channel.confirmSelect();
            }
            for (EventMessage event : events) {
                AMQPMessage amqpMessage = messageConverter.createAMQPMessage(event);
                doSendMessage(channel, amqpMessage);
            }
            if (CurrentUnitOfWork.isStarted()) {
                CurrentUnitOfWork.get().registerListener(new ChannelTransactionUnitOfWorkListener(channel));
                handedOverToUnitOfWork = true;
            } else if (isTransactional) {
                channel.txCommit();
            } else if (waitForAck) {
                // Same timeout as used by the Unit of Work listener, so both paths behave alike.
                channel.waitForConfirmsOrDie(publisherAckTimeout);
            }
        } catch (IOException e) {
            if (isTransactional) {
                tryRollback(channel);
            }
            throw new EventPublicationFailedException("Failed to dispatch Events to the Message Broker.", e);
        } catch (ShutdownSignalException e) {
            throw new EventPublicationFailedException("Failed to dispatch Events to the Message Broker.", e);
        } catch (TimeoutException e) {
            throw new EventPublicationFailedException("Timeout while waiting for publisher acknowledgements", e);
        } catch (InterruptedException e) {
            logger.warn("Interrupt received when waiting for message confirms.");
            Thread.currentThread().interrupt();
        } finally {
            // Close here unless a registered listener will do so. Checking the hand-over flag rather
            // than "is a Unit of Work started" also releases the channel when publication fails
            // inside a Unit of Work before the listener could be registered.
            if (!handedOverToUnitOfWork) {
                tryClose(channel);
            }
        }
    }

    /**
     * Closes the given channel on a best-effort basis; failures are logged and never propagated.
     *
     * @param channel the channel to close
     */
    private void tryClose(Channel channel) {
        try {
            channel.close();
        } catch (IOException e) {
            logger.info("Unable to close channel. It might already be closed.", e);
        } catch (ShutdownSignalException e) {
            // Unchecked variant raised by the client when the channel has already been shut down.
            logger.info("Unable to close channel. It might already be closed.", e);
        }
    }

    /**
     * Publishes a single AMQP message on the given channel, using the configured exchange name.
     * Subclasses may override this to customise how messages are sent.
     *
     * @param channel     the channel to publish on
     * @param amqpMessage the message to publish
     * @throws IOException when the channel fails to publish the message
     */
    protected void doSendMessage(Channel channel, AMQPMessage amqpMessage) throws IOException {
        channel.basicPublish(exchangeName,
                             amqpMessage.getRoutingKey(),
                             amqpMessage.isMandatory(),
                             amqpMessage.isImmediate(),
                             amqpMessage.getProperties(),
                             amqpMessage.getBody());
    }

    /**
     * Rolls back the transaction on the given channel on a best-effort basis; failures are logged
     * and never propagated, so that they cannot mask the exception that triggered the rollback.
     *
     * @param channel the channel whose transaction should be rolled back
     */
    private void tryRollback(Channel channel) {
        try {
            channel.txRollback();
        } catch (IOException e) {
            logger.debug("Unable to rollback. The underlying channel might already be closed.", e);
        } catch (ShutdownSignalException e) {
            // Unchecked variant raised by the client when the channel has already been shut down.
            logger.debug("Unable to rollback. The underlying channel might already be closed.", e);
        }
    }

    /**
     * Registers the given cluster with the listener container lifecycle manager. The consumer
     * configuration is taken from the cluster's {@code "AMQP.Config"} meta data property when that
     * holds an {@link AMQPConsumerConfiguration}; otherwise a default configuration based on the
     * cluster name is used.
     *
     * @param cluster the cluster that was created
     */
    public void onClusterCreated(Cluster cluster) {
        Object configuredValue = cluster.getMetaData().getProperty("AMQP.Config");
        AMQPConsumerConfiguration config;
        if (configuredValue instanceof AMQPConsumerConfiguration) {
            config = (AMQPConsumerConfiguration) configuredValue;
        } else {
            config = new DefaultAMQPConsumerConfiguration(cluster.getName());
        }
        getListenerContainerLifecycleManager().registerCluster(cluster, config, messageConverter);
    }

    /**
     * Resolves collaborators that were not explicitly configured from the application context. When
     * no message converter was set, a {@link DefaultAMQPMessageConverter} is created from the
     * serializer, the routing key resolver and the durability setting.
     *
     * @throws AxonConfigurationException when no routing key resolver was set and more than one is
     *                                    available in the application context
     */
    public void afterPropertiesSet() throws Exception {
        if (connectionFactory == null) {
            connectionFactory = applicationContext.getBean(ConnectionFactory.class);
        }
        if (messageConverter != null) {
            return;
        }
        if (serializer == null) {
            serializer = applicationContext.getBean(Serializer.class);
        }
        if (routingKeyResolver == null) {
            Map<String, RoutingKeyResolver> routingKeyResolverCandidates =
                    applicationContext.getBeansOfType(RoutingKeyResolver.class);
            if (routingKeyResolverCandidates.size() > 1) {
                throw new AxonConfigurationException("No MessageConverter was configured, but none can be created using autowired properties, as more than 1 RoutingKeyResolver is present in the ApplicationContent");
            }
            if (routingKeyResolverCandidates.size() == 1) {
                routingKeyResolver = routingKeyResolverCandidates.values().iterator().next();
            } else {
                routingKeyResolver = new PackageRoutingKeyResolver();
            }
        }
        messageConverter = new DefaultAMQPMessageConverter(serializer, routingKeyResolver, isDurable);
    }

    /**
     * Returns the listener container lifecycle manager, looking it up in the application context
     * on first use when none was set explicitly.
     */
    private ListenerContainerLifecycleManager getListenerContainerLifecycleManager() {
        if (listenerContainerLifecycleManager == null) {
            listenerContainerLifecycleManager = applicationContext.getBean(ListenerContainerLifecycleManager.class);
        }
        return listenerContainerLifecycleManager;
    }

    /**
     * Sets whether messages are published on a transactional channel. Cannot be enabled while
     * waiting for publisher acknowledgements is enabled.
     *
     * @param transactional {@code true} to publish using channel transactions
     */
    public void setTransactional(boolean transactional) {
        Assert.isTrue(!waitForAck || !transactional,
                      "Cannot set transactional behavior when 'waitForServerAck' is enabled.");
        this.isTransactional = transactional;
    }

    /**
     * Sets whether publication waits for publisher acknowledgements from the broker. Cannot be
     * enabled while transactional publication is enabled.
     *
     * @param waitForPublisherAck {@code true} to wait for publisher acknowledgements
     */
    public void setWaitForPublisherAck(boolean waitForPublisherAck) {
        Assert.isTrue(!waitForPublisherAck || !isTransactional,
                      "Cannot set 'waitForPublisherAck' when using transactions.");
        this.waitForAck = waitForPublisherAck;
    }

    /**
     * Sets the timeout handed to {@code Channel.waitForConfirmsOrDie(long)} when waiting for
     * publisher acknowledgements. NOTE(review): per the RabbitMQ client documentation the value is
     * in milliseconds and {@code 0} (the default here) means "wait indefinitely" - confirm against
     * the client version in use.
     *
     * @param publisherAckTimeout the timeout to use when waiting for acknowledgements
     */
    public void setPublisherAckTimeout(long publisherAckTimeout) {
        this.publisherAckTimeout = publisherAckTimeout;
    }

    /**
     * Sets the connection factory used to create channels. When not set, it is looked up in the
     * application context.
     *
     * @param connectionFactory the connection factory to use
     */
    public void setConnectionFactory(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    /**
     * Sets the converter used to turn event messages into AMQP messages. When set, the serializer,
     * routing key resolver and durability settings are not used to build a default converter.
     *
     * @param messageConverter the message converter to use
     */
    public void setMessageConverter(AMQPMessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }

    /**
     * Sets the durability flag passed to the default message converter. Defaults to {@code true}.
     *
     * @param durable the durability flag for the default message converter
     */
    public void setDurable(boolean durable) {
        this.isDurable = durable;
    }

    /**
     * Sets the serializer passed to the default message converter. When not set, it is looked up
     * in the application context.
     *
     * @param serializer the serializer to use
     */
    public void setSerializer(Serializer serializer) {
        this.serializer = serializer;
    }

    /**
     * Sets the routing key resolver passed to the default message converter. When not set, the
     * single resolver found in the application context is used, or a
     * {@link PackageRoutingKeyResolver} when the context contains none.
     *
     * @param routingKeyResolver the routing key resolver to use
     */
    public void setRoutingKeyResolver(RoutingKeyResolver routingKeyResolver) {
        this.routingKeyResolver = routingKeyResolver;
    }

    /**
     * Sets the name of the exchange to publish to. Defaults to {@code "Axon.EventBus"}.
     *
     * @param exchangeName the name of the exchange
     */
    public void setExchangeName(String exchangeName) {
        this.exchangeName = exchangeName;
    }

    /**
     * Sets the exchange to publish to; only its name is retained.
     *
     * @param exchange the exchange whose name is used for publication
     */
    public void setExchange(Exchange exchange) {
        this.exchangeName = exchange.getName();
    }

    /**
     * Sets the manager that clusters are registered with. When not set, it is looked up in the
     * application context on first use.
     *
     * @param listenerContainerLifecycleManager the lifecycle manager to use
     */
    public void setListenerContainerLifecycleManager(ListenerContainerLifecycleManager listenerContainerLifecycleManager) {
        this.listenerContainerLifecycleManager = listenerContainerLifecycleManager;
    }

    /**
     * Stores the application context used to look up collaborators that were not set explicitly.
     *
     * @param applicationContext the application context this bean runs in
     */
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * Unit of Work listener that completes publication on a channel when the Unit of Work commits
     * (transaction commit or waiting for publisher acknowledgements) or rolls back, and closes the
     * channel afterwards.
     */
    private class ChannelTransactionUnitOfWorkListener extends UnitOfWorkListenerAdapter {

        // True while this listener still owns a channel that has not been closed by it.
        private boolean isOpen;
        private final Channel channel;

        /**
         * @param channel the channel on which the events of the Unit of Work were published
         */
        public ChannelTransactionUnitOfWorkListener(Channel channel) {
            this.channel = channel;
            this.isOpen = true;
        }

        /**
         * Fails the commit when transactions or acknowledgements are in use and the channel was
         * closed by something other than this listener.
         *
         * @throws EventPublicationFailedException when the channel is unexpectedly closed
         */
        public void onPrepareTransactionCommit(UnitOfWork unitOfWork, Object transaction) {
            boolean needsLiveChannel = SpringAMQPTerminal.this.isTransactional
                    || SpringAMQPTerminal.this.waitForAck;
            if (needsLiveChannel && this.isOpen && !this.channel.isOpen()) {
                throw new EventPublicationFailedException(
                        "Unable to Commit UnitOfWork changes to AMQP: Channel is closed.",
                        this.channel.getCloseReason());
            }
        }

        /**
         * Commits the channel transaction or waits for publisher acknowledgements, then closes the
         * channel. The channel is closed even when committing or confirming fails.
         */
        public void afterCommit(UnitOfWork unitOfWork) {
            if (!this.isOpen) {
                return;
            }
            try {
                if (SpringAMQPTerminal.this.isTransactional) {
                    this.channel.txCommit();
                } else if (SpringAMQPTerminal.this.waitForAck) {
                    waitForConfirmations();
                }
            } catch (IOException e) {
                logger.warn("Unable to commit transaction on channel.", e);
            } catch (InterruptedException e) {
                logger.warn("Interrupt received when waiting for message confirms.");
                Thread.currentThread().interrupt();
            } finally {
                // In a finally block so unchecked failures (e.g. a confirmation timeout) cannot leak the channel.
                SpringAMQPTerminal.this.tryClose(this.channel);
                this.isOpen = false;
            }
        }

        /**
         * Waits for the broker to acknowledge all published messages, using the configured timeout.
         *
         * @throws InterruptedException            when interrupted while waiting
         * @throws EventPublicationFailedException when waiting fails or times out
         */
        private void waitForConfirmations() throws InterruptedException {
            try {
                this.channel.waitForConfirmsOrDie(SpringAMQPTerminal.this.publisherAckTimeout);
            } catch (IOException ex) {
                throw new EventPublicationFailedException("Failed to receive acknowledgements for all events", ex);
            } catch (TimeoutException ex) {
                throw new EventPublicationFailedException("Timeout while waiting for publisher acknowledgements", ex);
            }
        }

        /**
         * Rolls back the channel transaction (when transactional) and closes the channel. The
         * channel is closed and marked as such even when the rollback itself fails.
         */
        public void onRollback(UnitOfWork unitOfWork, Throwable failureCause) {
            try {
                if (SpringAMQPTerminal.this.isTransactional) {
                    this.channel.txRollback();
                }
            } catch (IOException e) {
                logger.warn("Unable to rollback transaction on channel.", e);
            } finally {
                SpringAMQPTerminal.this.tryClose(this.channel);
                this.isOpen = false;
            }
        }
    }
}

