/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.conductor.contribs.queue.amqp;

import com.google.common.collect.Maps;
import com.netflix.conductor.contribs.queue.amqp.AMQPConnection;
import com.netflix.conductor.contribs.queue.amqp.config.AMQPEventQueueProperties;
import com.netflix.conductor.contribs.queue.amqp.config.AMQPRetryPattern;
import com.netflix.conductor.contribs.queue.amqp.util.AMQPConstants;
import com.netflix.conductor.contribs.queue.amqp.util.AMQPSettings;
import com.netflix.conductor.contribs.queue.amqp.util.ConnectionType;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import com.netflix.conductor.metrics.Monitors;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;

public class AMQPObservableQueue
implements ObservableQueue {
    private static final Logger LOGGER = LoggerFactory.getLogger(AMQPObservableQueue.class);
    private final AMQPSettings settings;
    private final AMQPRetryPattern retrySettings;
    private final String QUEUE_TYPE = "x-queue-type";
    private final int batchSize;
    private final boolean useExchange;
    private int pollTimeInMS;
    private AMQPConnection amqpConnection;
    protected LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue();
    private volatile boolean running;

    public AMQPObservableQueue(ConnectionFactory factory, Address[] addresses, boolean useExchange, AMQPSettings settings, AMQPRetryPattern retrySettings, int batchSize, int pollTimeInMS) {
        if (factory == null) {
            throw new IllegalArgumentException("Connection factory is undefined");
        }
        if (addresses == null || addresses.length == 0) {
            throw new IllegalArgumentException("Addresses are undefined");
        }
        if (settings == null) {
            throw new IllegalArgumentException("Settings are undefined");
        }
        if (batchSize <= 0) {
            throw new IllegalArgumentException("Batch size must be greater than 0");
        }
        if (pollTimeInMS <= 0) {
            throw new IllegalArgumentException("Poll time must be greater than 0 ms");
        }
        this.useExchange = useExchange;
        this.settings = settings;
        this.batchSize = batchSize;
        this.amqpConnection = AMQPConnection.getInstance(factory, addresses, retrySettings);
        this.retrySettings = retrySettings;
        this.setPollTimeInMS(pollTimeInMS);
    }

    public Observable<Message> observe() {
        Observable.OnSubscribe onSubscribe = null;
        if (this.settings.isSequentialProcessing()) {
            LOGGER.info("Subscribing for the message processing on schedule basis");
            this.receiveMessages();
            onSubscribe = subscriber -> {
                Observable interval = Observable.interval((long)this.pollTimeInMS, (TimeUnit)TimeUnit.MILLISECONDS);
                interval.flatMap(x -> {
                    if (!this.isRunning()) {
                        LOGGER.debug("Component stopped, skip listening for messages from RabbitMQ");
                        return Observable.from(Collections.emptyList());
                    }
                    LinkedList available = new LinkedList();
                    this.messages.drainTo(available);
                    if (!available.isEmpty()) {
                        AtomicInteger count = new AtomicInteger(0);
                        StringBuilder buffer = new StringBuilder();
                        available.forEach(msg -> {
                            buffer.append(msg.getId()).append("=").append(msg.getPayload());
                            count.incrementAndGet();
                            if (count.get() < available.size()) {
                                buffer.append(",");
                            }
                        });
                        LOGGER.info(String.format("Batch from %s to conductor is %s", this.settings.getQueueOrExchangeName(), buffer.toString()));
                    }
                    return Observable.from(available);
                }).subscribe(arg_0 -> ((Subscriber)subscriber).onNext(arg_0), arg_0 -> ((Subscriber)subscriber).onError(arg_0));
            };
            LOGGER.info("Subscribed for the message processing on schedule basis");
        } else {
            onSubscribe = subscriber -> {
                LOGGER.info("Subscribing for the event based AMQP message processing");
                this.receiveMessages((Subscriber<? super Message>)subscriber);
                LOGGER.info("Subscribed for the event based AMQP message processing");
            };
        }
        return Observable.create((Observable.OnSubscribe)onSubscribe);
    }

    public String getType() {
        return this.useExchange ? AMQPConstants.AMQP_EXCHANGE_TYPE : AMQPConstants.AMQP_QUEUE_TYPE;
    }

    public String getName() {
        return this.settings.getEventName();
    }

    public String getURI() {
        return this.settings.getQueueOrExchangeName();
    }

    public int getBatchSize() {
        return this.batchSize;
    }

    public AMQPSettings getSettings() {
        return this.settings;
    }

    public Address[] getAddresses() {
        return this.amqpConnection.getAddresses();
    }

    public List<String> ack(List<Message> messages) {
        ArrayList<String> processedDeliveryTags = new ArrayList<String>();
        block4: for (Message message : messages) {
            int retryIndex = 1;
            while (true) {
                try {
                    LOGGER.info("ACK message with delivery tag {}", (Object)message.getReceipt());
                    Channel chn = this.amqpConnection.getOrCreateChannel(ConnectionType.SUBSCRIBER, this.getSettings().getQueueOrExchangeName());
                    chn.basicAck(Long.parseLong(message.getReceipt()), false);
                    processedDeliveryTags.add(message.getReceipt());
                    LOGGER.info("Ack'ed the message with delivery tag {}", (Object)message.getReceipt());
                    continue block4;
                }
                catch (Exception e) {
                    AMQPRetryPattern retry = this.retrySettings;
                    if (retry == null) {
                        LOGGER.error("Cannot ACK message with delivery tag {}", (Object)message.getReceipt(), (Object)e);
                    }
                    try {
                        retry.continueOrPropogate(e, retryIndex);
                    }
                    catch (Exception ex) {
                        LOGGER.error("Retries completed. Cannot ACK message with delivery tag {}", (Object)message.getReceipt(), (Object)e);
                    }
                    ++retryIndex;
                    continue;
                }
                break;
            }
        }
        return processedDeliveryTags;
    }

    private static AMQP.BasicProperties buildBasicProperties(Message message, AMQPSettings settings) {
        return new AMQP.BasicProperties.Builder().messageId(StringUtils.isEmpty((CharSequence)message.getId()) ? UUID.randomUUID().toString() : message.getId()).correlationId(StringUtils.isEmpty((CharSequence)message.getReceipt()) ? UUID.randomUUID().toString() : message.getReceipt()).contentType(settings.getContentType()).contentEncoding(settings.getContentEncoding()).deliveryMode(Integer.valueOf(settings.getDeliveryMode())).build();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    private void publishMessage(Message message, String exchange, String routingKey) {
        Channel chn = null;
        int retryIndex = 1;
        while (true) {
            String payload = message.getPayload();
            chn = this.amqpConnection.getOrCreateChannel(ConnectionType.PUBLISHER, this.getSettings().getQueueOrExchangeName());
            chn.basicPublish(exchange, routingKey, AMQPObservableQueue.buildBasicProperties(message, this.settings), payload.getBytes(this.settings.getContentEncoding()));
            LOGGER.info(String.format("Published message to %s: %s", exchange, payload));
            if (chn == null) break;
            try {
                this.amqpConnection.returnChannel(ConnectionType.PUBLISHER, chn);
            }
            catch (Exception e) {
                LOGGER.error("Failed to return the channel of {}. {}", (Object)ConnectionType.PUBLISHER, (Object)e);
            }
            break;
            catch (Exception ex) {
                try {
                    AMQPRetryPattern retry = this.retrySettings;
                    if (retry == null) {
                        LOGGER.error("Failed to publish message {} to {}", new Object[]{message.getPayload(), exchange, ex});
                        throw new RuntimeException(ex);
                    }
                    try {
                        retry.continueOrPropogate(ex, retryIndex);
                    }
                    catch (Exception e) {
                        LOGGER.error("Retries completed. Failed to publish message {} to {}", new Object[]{message.getPayload(), exchange, ex});
                        throw new RuntimeException(ex);
                    }
                    ++retryIndex;
                    if (chn == null) continue;
                }
                catch (Throwable throwable) {
                    if (chn != null) {
                        try {
                            this.amqpConnection.returnChannel(ConnectionType.PUBLISHER, chn);
                        }
                        catch (Exception e) {
                            LOGGER.error("Failed to return the channel of {}. {}", (Object)ConnectionType.PUBLISHER, (Object)e);
                        }
                    }
                    throw throwable;
                }
                try {
                    this.amqpConnection.returnChannel(ConnectionType.PUBLISHER, chn);
                }
                catch (Exception e) {
                    LOGGER.error("Failed to return the channel of {}. {}", (Object)ConnectionType.PUBLISHER, (Object)e);
                }
                continue;
            }
            break;
        }
    }

    public void publish(List<Message> messages) {
        try {
            String routingKey;
            String exchange;
            if (this.useExchange) {
                this.getOrCreateExchange(ConnectionType.PUBLISHER, this.settings.getQueueOrExchangeName(), this.settings.getExchangeType(), this.settings.isDurable(), this.settings.autoDelete(), this.settings.getArguments());
                exchange = this.settings.getQueueOrExchangeName();
                routingKey = this.settings.getRoutingKey();
            } else {
                AMQP.Queue.DeclareOk declareOk = this.getOrCreateQueue(ConnectionType.PUBLISHER, this.settings.getQueueOrExchangeName(), this.settings.isDurable(), this.settings.isExclusive(), this.settings.autoDelete(), this.settings.getArguments());
                exchange = "";
                routingKey = declareOk.getQueue();
            }
            messages.forEach(message -> this.publishMessage((Message)message, exchange, routingKey));
        }
        catch (RuntimeException ex) {
            throw ex;
        }
        catch (Exception ex) {
            LOGGER.error("Failed to publish messages: {}", (Object)ex.getMessage(), (Object)ex);
            throw new RuntimeException(ex);
        }
    }

    public void setUnackTimeout(Message message, long unackTimeout) {
        throw new UnsupportedOperationException();
    }

    public long size() {
        Channel chn = null;
        try {
            chn = this.amqpConnection.getOrCreateChannel(ConnectionType.SUBSCRIBER, this.getSettings().getQueueOrExchangeName());
            long l = chn.messageCount(this.settings.getQueueOrExchangeName());
            return l;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        finally {
            if (chn != null) {
                try {
                    this.amqpConnection.returnChannel(ConnectionType.SUBSCRIBER, chn);
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public void close() {
        this.amqpConnection.close();
    }

    public void start() {
        LOGGER.info("Started listening to {}:{}", (Object)this.getClass().getSimpleName(), (Object)this.settings.getQueueOrExchangeName());
        this.running = true;
    }

    public void stop() {
        LOGGER.info("Stopped listening to {}:{}", (Object)this.getClass().getSimpleName(), (Object)this.settings.getQueueOrExchangeName());
        this.running = false;
    }

    public boolean isRunning() {
        return this.running;
    }

    private AMQP.Exchange.DeclareOk getOrCreateExchange(ConnectionType connectionType) throws Exception {
        return this.getOrCreateExchange(connectionType, this.settings.getQueueOrExchangeName(), this.settings.getExchangeType(), this.settings.isDurable(), this.settings.autoDelete(), this.settings.getArguments());
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private AMQP.Exchange.DeclareOk getOrCreateExchange(ConnectionType connectionType, String name, String type, boolean isDurable, boolean autoDelete, Map<String, Object> arguments) throws Exception {
        AMQP.Exchange.DeclareOk declareOk;
        if (StringUtils.isEmpty((CharSequence)name)) {
            throw new RuntimeException("Exchange name is undefined");
        }
        if (StringUtils.isEmpty((CharSequence)type)) {
            throw new RuntimeException("Exchange type is undefined");
        }
        Channel chn = null;
        try {
            LOGGER.debug("Creating exchange {} of type {}", (Object)name, (Object)type);
            chn = this.amqpConnection.getOrCreateChannel(connectionType, this.getSettings().getQueueOrExchangeName());
            declareOk = chn.exchangeDeclare(name, type, isDurable, autoDelete, arguments);
            if (chn == null) return declareOk;
        }
        catch (Exception e) {
            try {
                LOGGER.warn("Failed to create exchange {} of type {}", new Object[]{name, type, e});
                throw e;
            }
            catch (Throwable throwable) {
                if (chn == null) throw throwable;
                try {
                    this.amqpConnection.returnChannel(connectionType, chn);
                    throw throwable;
                }
                catch (Exception e2) {
                    LOGGER.error("Failed to return the channel of {}. {}", (Object)connectionType, (Object)e2);
                }
                throw throwable;
            }
        }
        try {
            this.amqpConnection.returnChannel(connectionType, chn);
            return declareOk;
        }
        catch (Exception e) {
            LOGGER.error("Failed to return the channel of {}. {}", (Object)connectionType, (Object)e);
        }
        return declareOk;
    }

    private AMQP.Queue.DeclareOk getOrCreateQueue(ConnectionType connectionType) throws Exception {
        return this.getOrCreateQueue(connectionType, this.settings.getQueueOrExchangeName(), this.settings.isDurable(), this.settings.isExclusive(), this.settings.autoDelete(), this.settings.getArguments());
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private AMQP.Queue.DeclareOk getOrCreateQueue(ConnectionType connectionType, String name, boolean isDurable, boolean isExclusive, boolean autoDelete, Map<String, Object> arguments) throws Exception {
        AMQP.Queue.DeclareOk declareOk;
        if (StringUtils.isEmpty((CharSequence)name)) {
            throw new RuntimeException("Queue name is undefined");
        }
        arguments.put("x-queue-type", this.settings.getQueueType());
        Channel chn = null;
        try {
            LOGGER.debug("Creating queue {}", (Object)name);
            chn = this.amqpConnection.getOrCreateChannel(connectionType, this.getSettings().getQueueOrExchangeName());
            declareOk = chn.queueDeclare(name, isDurable, isExclusive, autoDelete, arguments);
            if (chn == null) return declareOk;
        }
        catch (Exception e) {
            try {
                LOGGER.warn("Failed to create queue {}", (Object)name, (Object)e);
                throw e;
            }
            catch (Throwable throwable) {
                if (chn == null) throw throwable;
                try {
                    this.amqpConnection.returnChannel(connectionType, chn);
                    throw throwable;
                }
                catch (Exception e2) {
                    LOGGER.error("Failed to return the channel of {}. {}", (Object)connectionType, (Object)e2);
                }
                throw throwable;
            }
        }
        try {
            this.amqpConnection.returnChannel(connectionType, chn);
            return declareOk;
        }
        catch (Exception e) {
            LOGGER.error("Failed to return the channel of {}. {}", (Object)connectionType, (Object)e);
        }
        return declareOk;
    }

    private static Message asMessage(AMQPSettings settings, GetResponse response) throws Exception {
        if (response == null) {
            return null;
        }
        Message message = new Message();
        message.setId(response.getProps().getMessageId());
        message.setPayload(new String(response.getBody(), settings.getContentEncoding()));
        message.setReceipt(String.valueOf(response.getEnvelope().getDeliveryTag()));
        return message;
    }

    private void receiveMessagesFromQueue(String queueName) throws Exception {
        LOGGER.debug("Accessing channel for queue {}", (Object)queueName);
        DefaultConsumer consumer = new DefaultConsumer(this.amqpConnection.getOrCreateChannel(ConnectionType.SUBSCRIBER, this.getSettings().getQueueOrExchangeName())){

            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Message message = AMQPObservableQueue.asMessage(AMQPObservableQueue.this.settings, new GetResponse(envelope, properties, body, Integer.MAX_VALUE));
                    if (message != null) {
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("Got message with ID {} and receipt {}", (Object)message.getId(), (Object)message.getReceipt());
                        }
                        AMQPObservableQueue.this.messages.add(message);
                        LOGGER.info("receiveMessagesFromQueue- End method {}", AMQPObservableQueue.this.messages);
                    }
                }
                catch (InterruptedException e) {
                    LOGGER.error("Issue in handling the mesages for the subscriber with consumer tag {}. {}", (Object)consumerTag, (Object)e);
                    Thread.currentThread().interrupt();
                }
                catch (Exception e) {
                    LOGGER.error("Issue in handling the mesages for the subscriber with consumer tag {}. {}", (Object)consumerTag, (Object)e);
                }
            }

            public void handleCancel(String consumerTag) throws IOException {
                LOGGER.error("Recieved a consumer cancel notification for subscriber {}", (Object)consumerTag);
            }
        };
        this.amqpConnection.getOrCreateChannel(ConnectionType.SUBSCRIBER, this.getSettings().getQueueOrExchangeName()).basicConsume(queueName, false, (Consumer)consumer);
        Monitors.recordEventQueueMessagesProcessed((String)this.getType(), (String)queueName, (int)this.messages.size());
    }

    private void receiveMessagesFromQueue(String queueName, final Subscriber<? super Message> subscriber) throws Exception {
        LOGGER.debug("Accessing channel for queue {}", (Object)queueName);
        DefaultConsumer consumer = new DefaultConsumer(this.amqpConnection.getOrCreateChannel(ConnectionType.SUBSCRIBER, this.getSettings().getQueueOrExchangeName())){

            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Message message = AMQPObservableQueue.asMessage(AMQPObservableQueue.this.settings, new GetResponse(envelope, properties, body, Integer.MAX_VALUE));
                    if (message == null) {
                        return;
                    }
                    LOGGER.info("Got message with ID {} and receipt {}", (Object)message.getId(), (Object)message.getReceipt());
                    LOGGER.debug("Message content {}", (Object)message);
                    Thread newThread = new Thread(() -> {
                        LOGGER.info("Spawning a new thread for message with ID {}", (Object)message.getId());
                        subscriber.onNext((Object)message);
                    });
                    newThread.start();
                }
                catch (InterruptedException e) {
                    LOGGER.error("Issue in handling the mesages for the subscriber with consumer tag {}. {}", (Object)consumerTag, (Object)e);
                    Thread.currentThread().interrupt();
                }
                catch (Exception e) {
                    LOGGER.error("Issue in handling the mesages for the subscriber with consumer tag {}. {}", (Object)consumerTag, (Object)e);
                }
            }

            public void handleCancel(String consumerTag) throws IOException {
                LOGGER.error("Recieved a consumer cancel notification for subscriber {}", (Object)consumerTag);
            }
        };
        this.amqpConnection.getOrCreateChannel(ConnectionType.SUBSCRIBER, this.getSettings().getQueueOrExchangeName()).basicConsume(queueName, false, (Consumer)consumer);
    }

    protected void receiveMessages() {
        try {
            String queueName;
            this.amqpConnection.getOrCreateChannel(ConnectionType.SUBSCRIBER, this.getSettings().getQueueOrExchangeName()).basicQos(this.batchSize);
            if (this.useExchange) {
                this.getOrCreateExchange(ConnectionType.SUBSCRIBER);
                AMQP.Queue.DeclareOk declareOk = this.getOrCreateQueue(ConnectionType.SUBSCRIBER, String.format("bound_to_%s", this.settings.getQueueOrExchangeName()), this.settings.isDurable(), this.settings.isExclusive(), this.settings.autoDelete(), Maps.newHashMap());
                queueName = declareOk.getQueue();
                this.amqpConnection.getOrCreateChannel(ConnectionType.SUBSCRIBER, this.getSettings().getQueueOrExchangeName()).queueBind(queueName, this.settings.getQueueOrExchangeName(), this.settings.getRoutingKey());
            } else {
                queueName = this.getOrCreateQueue(ConnectionType.SUBSCRIBER).getQueue();
            }
            LOGGER.info("Consuming from queue {}", (Object)queueName);
            this.receiveMessagesFromQueue(queueName);
        }
        catch (Exception exception) {
            LOGGER.error("Exception while getting messages from RabbitMQ", (Throwable)exception);
            Monitors.recordObservableQMessageReceivedErrors((String)this.getType());
        }
    }

    protected void receiveMessages(Subscriber<? super Message> subscriber) {
        try {
            String queueName;
            this.amqpConnection.getOrCreateChannel(ConnectionType.SUBSCRIBER, this.getSettings().getQueueOrExchangeName()).basicQos(this.batchSize);
            if (this.useExchange) {
                this.getOrCreateExchange(ConnectionType.SUBSCRIBER);
                AMQP.Queue.DeclareOk declareOk = this.getOrCreateQueue(ConnectionType.SUBSCRIBER, String.format("bound_to_%s", this.settings.getQueueOrExchangeName()), this.settings.isDurable(), this.settings.isExclusive(), this.settings.autoDelete(), Maps.newHashMap());
                queueName = declareOk.getQueue();
                this.amqpConnection.getOrCreateChannel(ConnectionType.SUBSCRIBER, this.getSettings().getQueueOrExchangeName()).queueBind(queueName, this.settings.getQueueOrExchangeName(), this.settings.getRoutingKey());
            } else {
                queueName = this.getOrCreateQueue(ConnectionType.SUBSCRIBER).getQueue();
            }
            LOGGER.info("Consuming from queue {}", (Object)queueName);
            this.receiveMessagesFromQueue(queueName, subscriber);
        }
        catch (Exception exception) {
            LOGGER.error("Exception while getting messages from RabbitMQ", (Throwable)exception);
            Monitors.recordObservableQMessageReceivedErrors((String)this.getType());
        }
    }

    public int getPollTimeInMS() {
        return this.pollTimeInMS;
    }

    public void setPollTimeInMS(int pollTimeInMS) {
        this.pollTimeInMS = pollTimeInMS;
    }

    public static class Builder {
        private final Address[] addresses;
        private final int batchSize;
        private final int pollTimeInMS;
        private final ConnectionFactory factory;
        private final AMQPEventQueueProperties properties;

        public Builder(AMQPEventQueueProperties properties) {
            this.properties = properties;
            this.addresses = this.buildAddressesFromHosts();
            this.factory = this.buildConnectionFactory();
            this.batchSize = properties.getBatchSize();
            this.pollTimeInMS = (int)properties.getPollTimeDuration().toMillis();
        }

        private Address[] buildAddressesFromHosts() {
            String hosts = this.properties.getHosts();
            if (StringUtils.isEmpty((CharSequence)hosts)) {
                throw new IllegalArgumentException("Hosts are undefined");
            }
            return Address.parseAddresses((String)hosts);
        }

        private ConnectionFactory buildConnectionFactory() {
            ConnectionFactory factory = new ConnectionFactory();
            String username = this.properties.getUsername();
            if (StringUtils.isEmpty((CharSequence)username)) {
                throw new IllegalArgumentException("Username is null or empty");
            }
            factory.setUsername(username);
            String password = this.properties.getPassword();
            if (StringUtils.isEmpty((CharSequence)password)) {
                throw new IllegalArgumentException("Password is null or empty");
            }
            factory.setPassword(password);
            String virtualHost = this.properties.getVirtualHost();
            if (StringUtils.isEmpty((CharSequence)virtualHost)) {
                throw new IllegalArgumentException("Virtual host is null or empty");
            }
            factory.setVirtualHost(virtualHost);
            int port = this.properties.getPort();
            if (port <= 0) {
                throw new IllegalArgumentException("Port must be greater than 0");
            }
            factory.setPort(port);
            boolean useNio = this.properties.isUseNio();
            if (useNio) {
                factory.useNio();
            }
            factory.setConnectionTimeout(this.properties.getConnectionTimeoutInMilliSecs());
            factory.setRequestedHeartbeat(this.properties.getRequestHeartbeatTimeoutInSecs());
            factory.setNetworkRecoveryInterval(this.properties.getNetworkRecoveryIntervalInMilliSecs());
            factory.setHandshakeTimeout(this.properties.getHandshakeTimeoutInMilliSecs());
            factory.setAutomaticRecoveryEnabled(true);
            factory.setTopologyRecoveryEnabled(true);
            factory.setRequestedChannelMax(this.properties.getMaxChannelCount());
            return factory;
        }

        public AMQPObservableQueue build(boolean useExchange, String queueURI) {
            AMQPSettings settings = new AMQPSettings(this.properties).fromURI(queueURI);
            AMQPRetryPattern retrySettings = new AMQPRetryPattern(this.properties.getLimit(), this.properties.getDuration(), this.properties.getType());
            return new AMQPObservableQueue(this.factory, this.addresses, useExchange, settings, retrySettings, this.batchSize, this.pollTimeInMS);
        }
    }
}

