/*
 * Decompiled with CFR 0.152.
 */
package org.jruby.rack.jms;

import java.io.ByteArrayInputStream;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.jruby.Ruby;
import org.jruby.RubyModule;
import org.jruby.RubyObjectAdapter;
import org.jruby.javasupport.JavaEmbedUtils;
import org.jruby.rack.RackApplication;
import org.jruby.rack.RackApplicationFactory;
import org.jruby.rack.RackContext;
import org.jruby.rack.jms.QueueManager;
import org.jruby.rack.servlet.ServletRackContext;
import org.jruby.runtime.builtin.IRubyObject;

/**
 * Default {@link QueueManager}: opens one JMS {@link Connection} per queue name
 * and forwards every received message to the Ruby-side
 * {@code JRuby::Rack::Queues::Registry#receive_message}.
 *
 * <p>The connection factory and JNDI context are either supplied through the
 * two-argument constructor or resolved from the rack configuration in
 * {@link #init(RackContext)}. Access to the queue-name/connection map is
 * guarded by this object's monitor ({@code listen}, {@code close} and
 * {@code destroy} are all {@code synchronized}).
 */
public class DefaultQueueManager
implements QueueManager {
    private ConnectionFactory connectionFactory = null;
    private ServletRackContext context;
    private Context jndiContext;
    /** Open connections keyed by the queue name they are listening on. */
    private final Map<String, Connection> queues = new HashMap<String, Connection>();
    private final RubyObjectAdapter rubyObjectAdapter = JavaEmbedUtils.newObjectAdapter();

    /**
     * Creates a manager whose connection factory and JNDI context are resolved
     * later by {@link #init(RackContext)}.
     */
    public DefaultQueueManager() {
    }

    /**
     * Creates a manager with a pre-built connection factory and JNDI context.
     *
     * @param qcf connection factory used to open connections in {@link #listen(String)}
     * @param ctx JNDI context used by {@link #lookup(String)}
     */
    public DefaultQueueManager(ConnectionFactory qcf, Context ctx) {
        this.connectionFactory = qcf;
        this.jndiContext = ctx;
    }

    /**
     * Stores the rack context and, when a connection-factory JNDI name is
     * configured and no factory has been supplied yet, builds an
     * {@link InitialContext} from the configured JNDI properties and looks the
     * factory up in it.
     *
     * @param context rack context; cast to {@link ServletRackContext}
     * @throws Exception if the JNDI properties cannot be parsed or the lookup fails
     */
    public void init(RackContext context) throws Exception {
        this.context = (ServletRackContext) context;
        String jndiName = context.getConfig().getJmsConnectionFactory();
        if (jndiName != null && this.connectionFactory == null) {
            Properties properties = new Properties();
            String jndiProperties = context.getConfig().getJmsJndiProperties();
            if (jndiProperties != null) {
                // Load from characters directly: Properties.load(InputStream) decodes
                // ISO-8859-1, so feeding it UTF-8 bytes would garble non-ASCII values.
                properties.load(new StringReader(jndiProperties));
            }
            this.jndiContext = new InitialContext(properties);
            this.connectionFactory = (ConnectionFactory) this.jndiContext.lookup(jndiName);
        }
    }

    /**
     * Starts listening on the named queue unless a connection for it is already
     * registered. Failures are logged to the rack context, not thrown; on
     * failure any connection that was opened is closed again and nothing is
     * registered, so a later call can retry.
     *
     * @param queueName JNDI name of the destination to consume from
     */
    public synchronized void listen(String queueName) {
        if (this.queues.containsKey(queueName)) {
            return;
        }
        Connection conn = null;
        try {
            if (this.connectionFactory == null) {
                throw new IllegalStateException("no JMS connection factory available");
            }
            conn = this.connectionFactory.createConnection();
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination dest = (Destination) this.lookup(queueName);
            MessageConsumer consumer = session.createConsumer(dest);
            consumer.setMessageListener(new RubyObjectMessageListener(queueName));
            conn.start();
            // Register only once the connection is fully set up and started.
            this.queues.put(queueName, conn);
        }
        catch (Exception e) {
            this.context.log("Unable to listen to '" + queueName + "': " + e.getMessage(), e);
            if (conn != null) {
                // Do not leak a half-initialised connection.
                this.closeConnection(conn);
            }
        }
    }

    /**
     * Stops listening on the named queue by closing and unregistering its
     * connection. Does nothing if no connection is registered for the name.
     *
     * @param queueName name previously passed to {@link #listen(String)}
     */
    public synchronized void close(String queueName) {
        Connection conn = this.queues.remove(queueName);
        if (conn != null) {
            this.closeConnection(conn);
        }
    }

    /**
     * @return the connection factory in use, or {@code null} if none has been
     *         set or after {@link #destroy()}
     */
    public ConnectionFactory getConnectionFactory() {
        return this.connectionFactory;
    }

    /**
     * Looks up a name in the manager's JNDI context.
     *
     * @param name JNDI name to resolve
     * @return the bound object
     * @throws NamingException if no JNDI context is available or the lookup fails
     */
    public Object lookup(String name) throws NamingException {
        if (this.jndiContext == null) {
            throw new NamingException("no JNDI context available to look up '" + name + "'");
        }
        return this.jndiContext.lookup(name);
    }

    /**
     * Closes every registered connection, empties the registry and drops the
     * connection factory reference.
     */
    public synchronized void destroy() {
        for (Connection conn : this.queues.values()) {
            this.closeConnection(conn);
        }
        this.queues.clear();
        this.connectionFactory = null;
    }

    /** Closes a connection, logging (not propagating) any failure. */
    private void closeConnection(Connection conn) {
        try {
            conn.close();
        }
        catch (Exception e) {
            this.context.log("exception while closing connection: " + e.getMessage(), e);
        }
    }

    /**
     * JMS listener that hands each message, together with the queue name, to
     * the Ruby {@code JRuby::Rack::Queues::Registry} object.
     */
    private class RubyObjectMessageListener
    implements MessageListener {
        private final String queueName;
        private final RackApplicationFactory rackFactory;

        /**
         * @param name queue name passed to the Ruby side with every message
         */
        public RubyObjectMessageListener(String name) {
            this.queueName = name;
            this.rackFactory = DefaultQueueManager.this.context.getRackFactory();
        }

        /**
         * Borrows a rack application, calls {@code receive_message(queueName, message)}
         * on the registry constant in its runtime, and always hands the
         * application back to the factory. Exceptions are logged, not rethrown.
         *
         * @param message the received JMS message
         */
        public void onMessage(Message message) {
            RackApplication app = null;
            try {
                app = this.rackFactory.getApplication();
                Ruby runtime = app.getRuntime();
                RubyModule mod = runtime.getClassFromPath("JRuby::Rack::Queues");
                IRubyObject registry = mod.getConstant("Registry");
                // The (Object) casts are kept from the original so the same
                // javaToRuby overload is selected.
                IRubyObject[] args = new IRubyObject[]{
                    JavaEmbedUtils.javaToRuby(runtime, (Object) this.queueName),
                    JavaEmbedUtils.javaToRuby(runtime, (Object) message)
                };
                DefaultQueueManager.this.rubyObjectAdapter.callMethod(registry, "receive_message", args);
            }
            catch (Exception e) {
                DefaultQueueManager.this.context.log("exception during message reception: " + e.getMessage(), e);
            }
            finally {
                if (app != null) {
                    this.rackFactory.finishedWithApplication(app);
                }
            }
        }
    }
}

