/*
 * Decompiled with CFR 0.152.
 */
package org.codehaus.wadi.gridstate.activecluster;

import java.util.Collection;
import java.util.Map;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Topic;
import org.activecluster.Cluster;
import org.activecluster.ClusterListener;
import org.activecluster.LocalNode;
import org.activecluster.Node;
import org.activemq.ActiveMQConnection;
import org.activemq.ActiveMQConnectionFactory;
import org.activemq.broker.BrokerConnector;
import org.activemq.broker.BrokerContainer;
import org.activemq.store.vm.VMPersistenceAdapterFactory;
import org.activemq.transport.TransportChannel;
import org.apache.commons.logging.LogFactory;
import org.codehaus.wadi.dindex.impl.DIndex;
import org.codehaus.wadi.gridstate.DispatcherConfig;
import org.codehaus.wadi.gridstate.ExtendedCluster;
import org.codehaus.wadi.gridstate.activecluster.CustomClusterFactory;
import org.codehaus.wadi.gridstate.impl.AbstractDispatcher;

/**
 * Dispatcher implementation that sends and receives messages through an
 * ActiveCluster {@link Cluster} created on top of an ActiveMQ connection factory.
 *
 * <p>Lifecycle as visible in this class: construct, {@link #init(DispatcherConfig)}
 * (creates the connection factory, the cluster and two message consumers),
 * {@link #start()}, and finally {@link #stop()}.
 */
public class ActiveClusterDispatcher extends AbstractDispatcher {
    /** Name of the JMS string property that carries the incoming correlation id. */
    protected static String _incomingCorrelationIdKey = "incomingCorrelationId";
    /** Name of the JMS string property that carries the outgoing correlation id. */
    protected static String _outgoingCorrelationIdKey = "outgoingCorrelationId";
    /** The cluster; {@code null} until {@link #init(DispatcherConfig)} has succeeded. */
    protected Cluster _cluster;
    /** Consumer registered on the cluster-wide destination. */
    protected MessageConsumer _clusterConsumer;
    /** Consumer registered on the local node's own destination. */
    protected MessageConsumer _nodeConsumer;
    /** URI handed to the {@link ActiveMQConnectionFactory}. */
    protected final String _clusterUri;
    protected ActiveMQConnectionFactory _connectionFactory;
    public CustomClusterFactory _clusterFactory;

    /**
     * Creates a dispatcher; no connection is opened until {@link #init(DispatcherConfig)}.
     *
     * @param nodeName     name of the local node, passed to the superclass
     * @param clusterName  name of the cluster, passed to the superclass
     * @param clusterUri   URI used to build the ActiveMQ connection factory
     * @param inactiveTime inactivity time, passed to the superclass and later
     *                     applied to the cluster factory
     */
    public ActiveClusterDispatcher(String nodeName, String clusterName, String clusterUri, long inactiveTime) {
        super(nodeName, clusterName, inactiveTime);
        this._clusterUri = clusterUri;
        // Logger name is "<Class.toString()>#<nodeName>", so each node logs under its own name.
        this._log = LogFactory.getLog(this.getClass() + "#" + this._nodeName);
    }

    /**
     * @return the underlying cluster, or {@code null} if it has not been created yet
     */
    public Cluster getCluster() {
        return this._cluster;
    }

    /**
     * Creates a consumer on the given destination and registers this dispatcher
     * as its message listener. The consumer is created with the third
     * {@code createConsumer} argument set to {@code true} (named
     * {@code excludeSelf} here - presumably suppresses locally sent messages).
     *
     * @param destination destination to listen on
     * @return the newly created consumer
     * @throws JMSException if the consumer cannot be created or configured
     */
    public MessageConsumer addDestination(Destination destination) throws JMSException {
        final boolean excludeSelf = true;
        MessageConsumer consumer = this._cluster.createConsumer(destination, null, excludeSelf);
        consumer.setMessageListener((MessageListener)this);
        return consumer;
    }

    /**
     * Closes a consumer previously returned by {@link #addDestination(Destination)}.
     *
     * @param consumer the consumer to close
     * @throws JMSException if closing fails
     */
    public void removeDestination(MessageConsumer consumer) throws JMSException {
        consumer.close();
    }

    /**
     * Initialises the dispatcher: starts the ActiveMQ connection factory, creates
     * the cluster, and registers this dispatcher as listener on both the cluster
     * destination and the local node's destination.
     *
     * @param config configuration forwarded to the superclass
     * @throws Exception if the superclass initialisation, the cluster start-up or
     *                   the consumer registration fails
     */
    public void init(DispatcherConfig config) throws Exception {
        super.init(config);
        try {
            this._connectionFactory = new ActiveMQConnectionFactory(this._clusterUri);
            this._connectionFactory.start();
            System.setProperty("activemq.persistenceAdapterFactory", VMPersistenceAdapterFactory.class.getName());
            this._clusterFactory = new CustomClusterFactory((ConnectionFactory)this._connectionFactory);
            this._clusterFactory.setInactiveTime(this._inactiveTime);
            this._cluster = (ExtendedCluster)this._clusterFactory.createCluster(this._clusterName);
        }
        catch (Exception e) {
            this._log.error("problem starting Cluster", e);
            // Propagate the real cause: continuing would only dereference a cluster
            // that was never created and fail with an uninformative NullPointerException.
            throw e;
        }
        // Both consumers are created with excludeSelf == false.
        final boolean excludeSelf = false;
        this._clusterConsumer = this._cluster.createConsumer((Destination)this._cluster.getDestination(), null, excludeSelf);
        this._clusterConsumer.setMessageListener((MessageListener)this);
        this._nodeConsumer = this._cluster.createConsumer(this._cluster.getLocalNode().getDestination(), null, excludeSelf);
        this._nodeConsumer.setMessageListener((MessageListener)this);
    }

    /**
     * Starts the underlying cluster.
     *
     * @throws Exception if the cluster fails to start
     */
    public void start() throws Exception {
        this._cluster.start();
    }

    /**
     * Stops the embedded broker container (when the connection has one), then the
     * cluster, then the connection factory, and finally pauses for five seconds.
     *
     * <p>Each shutdown step is attempted even if an earlier one throws, so a
     * failure in one component does not leave the others running. If several
     * steps fail, the exception of the last failing step is the one propagated.
     *
     * @throws Exception if any shutdown step fails or the final pause is interrupted
     */
    public void stop() throws Exception {
        // Walk connection -> transport channel -> embedded connector -> container,
        // tolerating a missing link at every hop.
        ActiveMQConnection connection = (ActiveMQConnection)((ExtendedCluster)this._cluster).getConnection();
        TransportChannel channel = connection == null ? null : connection.getTransportChannel();
        BrokerConnector connector = channel == null ? null : channel.getEmbeddedBrokerConnector();
        BrokerContainer container = connector == null ? null : connector.getBrokerContainer();
        try {
            if (container != null) {
                container.stop();
            }
        }
        finally {
            try {
                this._cluster.stop();
            }
            finally {
                this._connectionFactory.stop();
            }
        }
        // NOTE(review): fixed 5 s pause after a clean shutdown; the reason is not
        // visible here - presumably lets asynchronous teardown settle. Confirm.
        Thread.sleep(5000L);
    }

    /**
     * @return the size of the cluster's node map plus one (presumably the map
     *         excludes the local node - verify against ActiveCluster)
     */
    public int getNumNodes() {
        return this._cluster.getNodes().size() + 1;
    }

    /**
     * @return a new, empty object message created by the cluster
     * @throws Exception if the message cannot be created
     */
    public ObjectMessage createObjectMessage() throws Exception {
        return this._cluster.createObjectMessage();
    }

    /**
     * Sends a message to the given destination through the cluster. When trace
     * logging is enabled on the message log, a one-line summary of the message is
     * logged first; a {@link JMSException} raised while building that summary is
     * logged as a warning and does not prevent the send.
     *
     * @param to      destination to send to
     * @param message message to send
     * @throws Exception if the send fails
     */
    public void send(Destination to, ObjectMessage message) throws Exception {
        if (this._messageLog.isTraceEnabled()) {
            try {
                this._messageLog.trace("outgoing: " + message.getObject() + " {" + this.getNodeName(message.getJMSReplyTo()) + "->" + this.getNodeName(message.getJMSDestination()) + "} - " + this.getIncomingCorrelationId(message) + "/" + this.getOutgoingCorrelationId(message) + " on " + Thread.currentThread().getName());
            }
            catch (JMSException e) {
                this._log.warn("problem extracting message content", e);
            }
        }
        this._cluster.send(to, (Message)message);
    }

    /**
     * @return the destination of the local node
     */
    public Destination getLocalDestination() {
        return this._cluster.getLocalNode().getDestination();
    }

    /**
     * @return the cluster-wide destination
     */
    public Destination getClusterDestination() {
        return this._cluster.getDestination();
    }

    /**
     * @return the state map held by the local node
     */
    public Map getDistributedState() {
        return this._cluster.getLocalNode().getState();
    }

    /**
     * Replaces the local node's state map.
     *
     * @param state the new state
     * @throws Exception if the cluster rejects the update
     */
    public void setDistributedState(Map state) throws Exception {
        this._cluster.getLocalNode().setState(state);
    }

    /**
     * Resolves a destination to a human-readable name for logging.
     *
     * @param destination destination to describe; may be {@code null}
     * @return {@code "<NULL-DESTINATION>"} for {@code null}, the local node's name
     *         for the local destination, {@code "cluster"} for the cluster
     *         destination, the name of the matching known node, or
     *         {@code "<unknown>"} when nothing matches
     */
    public String getNodeName(Destination destination) {
        // Guard first so a null destination needs no cluster lookups at all.
        if (destination == null) {
            return "<NULL-DESTINATION>";
        }
        LocalNode localNode = this._cluster.getLocalNode();
        if (destination.equals(localNode.getDestination())) {
            return DIndex.getNodeName((Node)localNode);
        }
        Topic clusterDestination = this._cluster.getDestination();
        if (destination.equals(clusterDestination)) {
            return "cluster";
        }
        // The cluster's node map is looked up by destination.
        Node node = (Node)this._cluster.getNodes().get(destination);
        if (node != null) {
            return DIndex.getNodeName(node);
        }
        return "<unknown>";
    }

    /**
     * @param message message to inspect
     * @return value of the incoming-correlation-id property
     * @throws Exception if the property cannot be read
     */
    public String getIncomingCorrelationId(ObjectMessage message) throws Exception {
        return message.getStringProperty(_incomingCorrelationIdKey);
    }

    /**
     * @param message message to modify
     * @param id      value to store in the incoming-correlation-id property
     * @throws JMSException if the property cannot be written
     */
    public void setIncomingCorrelationId(ObjectMessage message, String id) throws JMSException {
        message.setStringProperty(_incomingCorrelationIdKey, id);
    }

    /**
     * @param message message to inspect
     * @return value of the outgoing-correlation-id property
     * @throws JMSException if the property cannot be read
     */
    public String getOutgoingCorrelationId(ObjectMessage message) throws JMSException {
        return message.getStringProperty(_outgoingCorrelationIdKey);
    }

    /**
     * @param message message to modify
     * @param id      value to store in the outgoing-correlation-id property
     * @throws JMSException if the property cannot be written
     */
    public void setOutgoingCorrelationId(ObjectMessage message, String id) throws JMSException {
        message.setStringProperty(_outgoingCorrelationIdKey, id);
    }

    /**
     * Not yet implemented for this dispatcher.
     *
     * @throws UnsupportedOperationException always
     */
    public void findRelevantSessionNames(int numPartitions, Collection[] resultSet) {
        throw new UnsupportedOperationException("NYI");
    }

    /**
     * Adds a listener to the underlying cluster. Despite the "set" name, the
     * listener is added via {@code addClusterListener}, not substituted for
     * existing ones.
     *
     * @param listener listener to add
     */
    public void setClusterListener(ClusterListener listener) {
        this._cluster.addClusterListener(listener);
    }
}

