/*
 * Decompiled with CFR 0.152.
 */
package rapture.exchange.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.FlowListener;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ReturnListener;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
import rapture.common.RapturePipelineTask;
import rapture.common.exception.ExceptionToString;
import rapture.common.exception.RaptureException;
import rapture.common.exception.RaptureExceptionFactory;
import rapture.common.impl.jackson.JacksonUtil;
import rapture.common.model.RaptureExchange;
import rapture.common.model.RaptureExchangeQueue;
import rapture.config.MultiValueConfigLoader;
import rapture.exchange.ExchangeHandler;
import rapture.exchange.QueueHandler;
import rapture.exchange.RPCMessage;
import rapture.exchange.TopicMessageHandler;
import rapture.exchange.rabbitmq.MessageConsumer;
import rapture.exchange.rabbitmq.Messages;
import rapture.exchange.rabbitmq.SubscriptionThread;
import rapture.util.IDGenerator;

public class RabbitExchangeHandler
implements ExchangeHandler {
    private Connection connection;
    private Channel channel;
    private ExecutorService service = Executors.newCachedThreadPool();
    private static Logger logger = Logger.getLogger(RabbitExchangeHandler.class);
    private String instanceName = "default";
    private int messageCounter = 1;
    private String replyQueueName;
    private Map<String, String> queueNameRegistry = new ConcurrentHashMap<String, String>();
    private QueueingConsumer consumer;
    private Set<String> exchangesTested = Collections.synchronizedSet(new HashSet());
    private AtomicLong subscriptionHandler = new AtomicLong(0L);
    private Map<Long, SubscriptionThread> subscriberMap = new HashMap<Long, SubscriptionThread>();

    public void setInstanceName(String instanceName) {
        this.instanceName = instanceName;
    }

    public synchronized void setConfig(Map<String, String> config) {
        ConnectionFactory factory = new ConnectionFactory();
        logger.info((Object)Messages.getString("RabbitExchangeHandler.config"));
        try {
            String uri = MultiValueConfigLoader.getConfig((String)("RABBITMQ-" + this.instanceName));
            if (uri == null || uri.isEmpty()) {
                uri = "amqp://guest:guest@localhost:5672/%2f";
            }
            factory.setUri(uri);
            factory.setAutomaticRecoveryEnabled(true);
            logger.debug((Object)Messages.getString("RabbitExchangeHandler.creatingChannel"));
            this.connection = factory.newConnection();
            this.connection.addShutdownListener(new ShutdownListener(){

                public void shutdownCompleted(ShutdownSignalException cause) {
                    logger.info((Object)"Reconnected to RabbitMQ");
                }
            });
            logger.debug((Object)Messages.getString("RabbitExchangeHandler.connectionMade"));
            this.channel = this.connection.createChannel();
            this.channel.addShutdownListener(new ShutdownListener(){

                public void shutdownCompleted(ShutdownSignalException cause) {
                    logger.info((Object)("Disconnected from RabbitMQ. Cause :" + cause.getMessage()));
                    logger.debug((Object)ExceptionToString.format((Throwable)cause));
                }
            });
            logger.debug((Object)Messages.getString("RabbitExchangeHandler.channelCreated"));
            this.channel.basicQos(100);
            this.channel.addReturnListener(new ReturnListener(){

                public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    logger.debug((Object)String.format(Messages.getString("RabbitExchangeHandler.returnListener"), replyCode, replyText));
                }
            });
            this.channel.addFlowListener(new FlowListener(){

                public void handleFlow(boolean active) throws IOException {
                    logger.debug((Object)String.format(Messages.getString("RabbitExchangeHandler.Flow"), active));
                }
            });
            this.replyQueueName = this.channel.queueDeclare().getQueue();
            logger.info((Object)("RPC reply queue is " + this.replyQueueName));
            this.consumer = new QueueingConsumer(this.channel);
            this.channel.basicConsume(this.replyQueueName, true, (Consumer)this.consumer);
        }
        catch (Exception e) {
            String message = Messages.getString("RabbitExchangeHandler.noConnect");
            throw RaptureExceptionFactory.create((Integer)500, (String)message, (Throwable)e);
        }
    }

    public void setupExchange(RaptureExchange exchange) {
        if (this.exchangesTested.contains(exchange.getName())) {
            if (exchange.getQueueBindings() != null) {
                for (RaptureExchangeQueue queue : exchange.getQueueBindings()) {
                    try {
                        this.bindQueue(exchange, queue);
                    }
                    catch (IOException e) {
                        logger.error((Object)("Unable to bind " + queue.getName() + " to exchange " + exchange.getName()));
                        logger.info((Object)ExceptionToString.format((Throwable)e));
                    }
                }
            }
            return;
        }
        logger.debug((Object)String.format(Messages.getString("RabbitExchangeHandler.ensureAvail"), exchange.getName()));
        try {
            logger.debug((Object)String.format(Messages.getString("RabbitExchangeHandler.ExchangeDeclare"), exchange.getName()));
            logger.debug((Object)String.format(Messages.getString("RabbitExchangeHandler.exchangeType"), exchange.getExchangeType()));
            this.channel.exchangeDeclare(exchange.getName(), exchange.getExchangeType().name().toLowerCase(), true);
            logger.debug((Object)Messages.getString("RabbitExchangeHandler.bindingQueues"));
            if (exchange.getQueueBindings() != null) {
                for (RaptureExchangeQueue queue : exchange.getQueueBindings()) {
                    this.bindQueue(exchange, queue);
                }
            }
            this.exchangesTested.add(exchange.getName());
        }
        catch (IOException e) {
            String message = Messages.getString("RabbitExchangeHandler.noExchange");
            try {
                if (this.channel.isOpen()) {
                    this.channel.close();
                }
            }
            catch (IOException e1) {
                logger.error((Object)ExceptionToString.format((Throwable)e1));
            }
            throw RaptureExceptionFactory.create((Integer)500, (String)message, (Throwable)e);
        }
        catch (Throwable t) {
            try {
                if (this.channel.isOpen()) {
                    this.channel.close();
                }
            }
            catch (IOException e1) {
                logger.error((Object)ExceptionToString.format((Throwable)e1));
            }
            String message = String.format("Caught throwable during exchangeDeclare for exchange %s:\n%s", exchange.getName(), ExceptionToString.format((Throwable)t));
            logger.info((Object)message);
            throw RaptureExceptionFactory.create((Integer)500, (String)message, (Throwable)t);
        }
    }

    private void bindQueue(RaptureExchange exchange, RaptureExchangeQueue queue) throws IOException {
        String underlying;
        if (this.isAnonymousQueue(queue.getName())) {
            if (this.queueNameRegistry.containsKey(queue.getName())) {
                underlying = this.queueNameRegistry.get(queue.getName());
            } else {
                underlying = this.channel.queueDeclare().getQueue();
                this.queueNameRegistry.put(queue.getName(), underlying);
            }
        } else {
            underlying = this.getUnderlyingQueueName(exchange.getName(), queue.getName());
            logger.debug((Object)String.format(Messages.getString("RabbitExchangeHandler.underlyingQueue"), underlying));
            this.channel.queueDeclare(underlying, true, false, false, null);
        }
        if (queue.getRouteBindings().isEmpty()) {
            logger.debug((Object)String.format(Messages.getString("RabbitExchangeHandler.noRoute"), exchange.getName()));
            this.channel.queueBind(underlying, exchange.getName(), "");
        } else {
            for (String bindingKey : queue.getRouteBindings()) {
                logger.debug((Object)String.format(Messages.getString("RabbitExchangeHandler.binding"), bindingKey, exchange.getName()));
                this.channel.queueBind(underlying, exchange.getName(), bindingKey);
            }
        }
    }

    private boolean isAnonymousQueue(String queue) {
        return queue != null && queue.startsWith("$anonoymous");
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public String startConsuming(String exchange, String queue, QueueHandler handler) {
        boolean autoAck = Boolean.valueOf(MultiValueConfigLoader.getConfig((String)"RABBITMQ-autoAck", (String)"false"));
        String tag = IDGenerator.getUUID();
        try {
            String underlyingQueue;
            if (this.isAnonymousQueue(queue)) {
                if (!this.queueNameRegistry.containsKey(queue)) throw RaptureExceptionFactory.create((String)String.format("Error! Cannot start consuming on undefined anonymous queue %s, on exchange %s", queue, exchange));
                underlyingQueue = this.queueNameRegistry.get(queue);
            } else {
                underlyingQueue = this.getUnderlyingQueueName(exchange, queue);
            }
            logger.debug((Object)String.format(Messages.getString("RabbitExchangeHandler.startConsuming"), underlyingQueue));
            logger.debug((Object)String.format(Messages.getString("RabbitExchangeHandler.underly"), underlyingQueue));
            this.channel.basicConsume(underlyingQueue, autoAck, tag, (Consumer)new MessageConsumer(this.channel, tag, handler, underlyingQueue));
            return tag;
        }
        catch (IOException e) {
            String message = String.format(Messages.getString("RabbitExchangeHandler.noStartConsuming"), String.format("%s (%s)", queue, exchange));
            try {
                this.channel.close();
                throw RaptureExceptionFactory.create((Integer)500, (String)message, (Throwable)e);
            }
            catch (IOException e1) {
                logger.error((Object)ExceptionToString.format((Throwable)e1));
            }
            throw RaptureExceptionFactory.create((Integer)500, (String)message, (Throwable)e);
        }
    }

    private String getUnderlyingQueueName(String exchange, String queue) {
        return exchange + "-" + queue;
    }

    public void tearDownExchange(RaptureExchange exchange) {
        try {
            for (RaptureExchangeQueue queue : exchange.getQueueBindings()) {
                this.unbindQueue(exchange, queue);
            }
            this.channel.exchangeDelete(exchange.getName());
            this.exchangesTested.remove(exchange.getName());
        }
        catch (IOException e) {
            String message = String.format(Messages.getString("RabbitExchangeHandler.noDelete"), new Object[0]);
            throw RaptureExceptionFactory.create((Integer)500, (String)message, (Throwable)e);
        }
    }

    private void unbindQueue(RaptureExchange exchange, RaptureExchangeQueue queue) throws IOException {
        String underlying = this.getUnderlyingQueueName(exchange.getName(), queue.getName());
        for (String routingKey : queue.getRouteBindings()) {
            this.channel.queueUnbind(underlying, exchange.getName(), routingKey);
        }
        this.channel.queueDelete(underlying);
    }

    public void putTaskOnExchange(String exchange, RapturePipelineTask task, String routingKey) {
        logger.debug((Object)Messages.getString("RabbitExchangeHandler.puttingTask"));
        Submitter s = new Submitter(exchange, task, routingKey);
        this.service.submit(s);
        RaptureException ex = s.getError();
        if (ex != null) {
            throw ex;
        }
    }

    public String subscribeToExchange(String exchange, List<String> routingKeys, QueueHandler handler) {
        throw RaptureExceptionFactory.create((Integer)500, (String)Messages.getString("RabbitExchangeHandler.notYetSupported"));
    }

    public void ensureExchangeUnAvailable(RaptureExchange exchangeConfig) {
        this.exchangesTested.remove(exchangeConfig.getName());
    }

    public Map<String, Object> makeRPC(String queueName, String fnName, Map<String, Object> params, long timeoutInSeconds) {
        try {
            QueueingConsumer.Delivery delivery;
            String corrId = UUID.randomUUID().toString();
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(this.replyQueueName).build();
            RPCMessage messageObj = new RPCMessage();
            messageObj.setFnName(fnName);
            messageObj.setParams(params);
            String message = JacksonUtil.jsonFromObject((Object)messageObj);
            logger.debug((Object)("Will make call on queue name " + queueName));
            logger.debug((Object)("Message is " + message));
            this.channel.basicPublish("", queueName, props, message.getBytes("UTF-8"));
            String response = null;
            do {
                if ((delivery = this.consumer.nextDelivery(timeoutInSeconds * 1000L)) != null) continue;
                throw RaptureExceptionFactory.create((String)"Timed out while waiting for response");
            } while (!delivery.getProperties().getCorrelationId().equals(corrId));
            response = new String(delivery.getBody());
            Map ret = null;
            if (response != null) {
                ret = JacksonUtil.getMapFromJson((String)response);
            }
            return ret;
        }
        catch (IOException e) {
            throw RaptureExceptionFactory.create((String)"Could not create reply rpc queue", (Throwable)e);
        }
        catch (ShutdownSignalException e) {
            throw RaptureExceptionFactory.create((String)"Shutdown", (Throwable)e);
        }
        catch (ConsumerCancelledException e) {
            throw RaptureExceptionFactory.create((String)"Cancelled", (Throwable)e);
        }
        catch (InterruptedException e) {
            throw RaptureExceptionFactory.create((String)"Interrupted", (Throwable)e);
        }
    }

    public void publishTopicMessage(String exchange, String topic, String message) {
        Publisher p = new Publisher(exchange, topic, message);
        this.service.submit(p);
    }

    public long subscribeTopic(String exchange, String topic, TopicMessageHandler messageHandler) {
        SubscriptionThread t = new SubscriptionThread(exchange, topic, this.channel, messageHandler);
        long handleId = this.subscriptionHandler.incrementAndGet();
        this.subscriberMap.put(handleId, t);
        t.start();
        return handleId;
    }

    public void unsubscribeTopic(long handle) {
        SubscriptionThread t = this.subscriberMap.remove(handle);
        if (t != null) {
            t.closeSubscription();
        }
    }

    private class Publisher
    implements Runnable {
        final String exchange;
        final String topic;
        final String message;

        public Publisher(String exchange, String topic, String message) {
            this.exchange = exchange;
            this.topic = topic;
            this.message = message;
        }

        @Override
        public void run() {
            try {
                RabbitExchangeHandler.this.channel.basicPublish(this.exchange, this.topic, null, this.message.getBytes());
            }
            catch (IOException e) {
                throw RaptureExceptionFactory.create((String)"Could not publish on topic queue", (Throwable)e);
            }
        }
    }

    private class Submitter
    implements Runnable {
        final String exchange;
        final RapturePipelineTask task;
        final String routingKey;
        private RaptureException error = null;

        public Submitter(String exchange, RapturePipelineTask task, String routingKey) {
            this.exchange = exchange;
            this.task = task;
            this.routingKey = routingKey;
        }

        @Override
        public void run() {
            try {
                byte[] messageBody = JacksonUtil.bytesJsonFromObject((Object)this.task);
                RabbitExchangeHandler.this.messageCounter++;
                Integer deliveryMode = Integer.parseInt(MultiValueConfigLoader.getConfig((String)"RABBITMQ-deliveryMode", (String)"1"));
                AMQP.BasicProperties props = new AMQP.BasicProperties().builder().contentType(this.task.getContentType()).deliveryMode(deliveryMode).priority(Integer.valueOf(this.task.getPriority())).messageId("" + RabbitExchangeHandler.this.messageCounter).build();
                logger.debug((Object)String.format(Messages.getString("RabbitExchangeHandler.publishMessage"), this.exchange, this.routingKey, this.task.getContentType()));
                try {
                    RabbitExchangeHandler.this.channel.basicPublish(this.exchange, this.routingKey, props, messageBody);
                    logger.debug((Object)String.format(Messages.getString("RabbitExchangeHandler.sendMessage"), props.getMessageId(), this.task.getContent(), this.exchange, this.routingKey));
                }
                catch (IOException e) {
                    String message = Messages.getString("RabbitExchangeHandler.noPublish");
                    throw RaptureExceptionFactory.create((Integer)500, (String)message, (Throwable)e);
                }
            }
            catch (RaptureException e) {
                this.error = e;
            }
        }

        public RaptureException getError() {
            return this.error;
        }
    }
}

