/*
 * Decompiled with CFR 0.152.
 */
package org.codehaus.wadi.gridstate.impl;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
import EDU.oswego.cs.dl.util.concurrent.TimeoutException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.wadi.gridstate.Dispatcher;
import org.codehaus.wadi.gridstate.DispatcherConfig;
import org.codehaus.wadi.impl.Quipu;

public abstract class AbstractDispatcher
implements Dispatcher,
MessageListener {
    /** Name of the local node, as supplied to the constructor. */
    protected final String _nodeName;
    /** Name of the cluster, as supplied to the constructor. */
    protected final String _clusterName;
    /** Inactivity period supplied to the constructor (units are not visible in this file - TODO confirm). */
    protected final long _inactiveTime;
    /**
     * Registered dispatchers keyed by message-body class; populated by the register
     * methods and consulted by onMessage(). This is a plain HashMap.
     */
    protected final Map _map;
    /** Executor on which DispatchRunner tasks are run. */
    protected final PooledExecutor _executor;
    /** Log used by onMessage() to trace incoming messages. */
    protected final Log _messageLog = LogFactory.getLog((String)"org.codehaus.wadi.MESSAGES");
    /** General-purpose log, named after the concrete runtime class. */
    protected Log _log = LogFactory.getLog(this.getClass());
    /** Configuration handed to init(); null until init() has been called. */
    protected DispatcherConfig _config;
    /** Pending rendezvous (Quipu) objects keyed by correlation id. */
    protected final Map _rvMap = new ConcurrentHashMap();
    /** Source of the correlation ids returned by nextCorrelationId(). */
    protected final SimpleCorrelationIDFactory _factory = new SimpleCorrelationIDFactory();

    /**
     * Records the node/cluster identity and creates the dispatcher table and the
     * executor. Executor threads are named "WADI Dispatcher (n)", with n counting
     * up from zero in creation order.
     *
     * @param nodeName     name of the local node
     * @param clusterName  name of the cluster
     * @param inactiveTime inactivity period, stored as given
     */
    public AbstractDispatcher(String nodeName, String clusterName, long inactiveTime) {
        _nodeName = nodeName;
        _clusterName = clusterName;
        _inactiveTime = inactiveTime;
        _map = new HashMap();
        _executor = new PooledExecutor();

        ThreadFactory namingFactory = new ThreadFactory() {
            protected int _count;

            public synchronized Thread newThread(Runnable runnable) {
                String threadName = "WADI Dispatcher (" + _count + ")";
                _count++;
                return new Thread(runnable, threadName);
            }
        };
        _executor.setThreadFactory(namingFactory);
    }

    /**
     * Stores the supplied configuration for later use.
     *
     * @param config the configuration to remember
     * @throws Exception never thrown by this implementation; declared so that
     *                   overriding implementations may fail
     */
    public void init(DispatcherConfig config) throws Exception {
        _config = config;
    }

    /**
     * Registers a handler method for messages whose body is of the given class.
     * The method is looked up on the target's class as a public method
     * {@code methodName(ObjectMessage, type)}.
     *
     * A later registration for the same type replaces the earlier one (a warning
     * is logged).
     *
     * @param target     object on which the handler method will be invoked
     * @param methodName name of the public handler method
     * @param type       class of the message body handled by the method
     * @return the dispatcher that was registered, or null if no matching method exists
     */
    public Dispatcher.InternalDispatcher register(Object target, String methodName, Class type) {
        try {
            // getMethod() never returns null: it throws NoSuchMethodException when
            // there is no match, so no null check is needed.
            Method method = target.getClass().getMethod(methodName, new Class[] {ObjectMessage.class, type});
            TargetDispatcher nuw = new TargetDispatcher(target, method);
            // _map is a raw Map, so put() returns Object; the previous value is only
            // null-checked and printed, so no cast is required.
            Object old = _map.put(type, nuw);
            if (old != null && _log.isWarnEnabled()) {
                _log.warn("later registration replaces earlier - multiple dispatch NYI: " + old + " -> " + nuw);
            }
            if (_log.isTraceEnabled()) {
                _log.trace("registering: " + type.getName() + "." + methodName + "()");
            }
            return nuw;
        }
        catch (NoSuchMethodException e) {
            if (_log.isErrorEnabled()) {
                _log.error("no method: " + methodName + "(" + type.getName() + ") on class: " + target.getClass().getName(), e);
            }
            return null;
        }
    }

    /**
     * Registers a request/response handler for messages whose body is of the given
     * class. The method is looked up on the target's class as a public method
     * {@code methodName(type)}; the dispatcher created for it sends the method's
     * return value back as a reply.
     *
     * A later registration for the same type replaces the earlier one (a warning
     * is logged).
     *
     * @param target     object on which the handler method will be invoked
     * @param methodName name of the public handler method
     * @param type       class of the message body handled by the method
     * @return the dispatcher that was registered, or null if no matching method exists
     */
    public Dispatcher.InternalDispatcher newRegister(Object target, String methodName, Class type) {
        try {
            // getMethod() never returns null: it throws NoSuchMethodException when
            // there is no match, so no null check is needed.
            Method method = target.getClass().getMethod(methodName, new Class[] {type});
            NewTargetDispatcher nuw = new NewTargetDispatcher(target, method);
            // _map is a raw Map, so put() returns Object; the previous value is only
            // null-checked and printed, so no cast is required.
            Object old = _map.put(type, nuw);
            if (old != null && _log.isWarnEnabled()) {
                _log.warn("later registration replaces earlier - multiple dispatch NYI: " + old + " -> " + nuw);
            }
            if (_log.isTraceEnabled()) {
                _log.trace("registering: " + type.getName() + "." + methodName + "()");
            }
            return nuw;
        }
        catch (NoSuchMethodException e) {
            if (_log.isErrorEnabled()) {
                _log.error("no method: " + methodName + "(" + type.getName() + ") on class: " + target.getClass().getName(), e);
            }
            return null;
        }
    }

    /**
     * Removes the dispatcher registered for the given body class, first waiting
     * for its in-flight dispatches to drain.
     *
     * The in-flight count is polled once per second, at most {@code timeout}
     * times. If the calling thread is interrupted while waiting, the wait is
     * abandoned, the interrupt status is restored and the dispatcher is removed
     * immediately.
     *
     * @param methodName unused by this implementation
     * @param type       body class whose dispatcher should be removed
     * @param timeout    maximum number of one-second polls to wait
     * @return true if no dispatches were in flight when the dispatcher was removed;
     *         false if nothing was registered for the type or work was still pending
     * @throws ClassCastException if the dispatcher registered for the type is not a
     *                            TargetDispatcher
     */
    public boolean deregister(String methodName, Class type, int timeout) {
        TargetDispatcher td = (TargetDispatcher) _map.get(type);
        if (td == null) {
            return false;
        }
        // The count is written under the dispatcher's monitor, so read it through the
        // synchronized accessor rather than directly from the field.
        for (int i = timeout; td.getCount() > 0 && i > 0; --i) {
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                // Do not swallow the interrupt: stop waiting and let the caller see it.
                Thread.currentThread().interrupt();
                break;
            }
        }
        _map.remove(type);
        return td.getCount() <= 0;
    }

    /**
     * Removes the request/response dispatcher registered for the given body class,
     * first waiting for its in-flight dispatches to drain.
     *
     * The in-flight count is polled once per second, at most {@code timeout}
     * times. If the calling thread is interrupted while waiting, the wait is
     * abandoned, the interrupt status is restored and the dispatcher is removed
     * immediately.
     *
     * @param methodName unused by this implementation
     * @param type       body class whose dispatcher should be removed
     * @param timeout    maximum number of one-second polls to wait
     * @return true if no dispatches were in flight when the dispatcher was removed;
     *         false if nothing was registered for the type or work was still pending
     * @throws ClassCastException if the dispatcher registered for the type is not a
     *                            NewTargetDispatcher
     */
    public boolean newDeregister(String methodName, Class type, int timeout) {
        NewTargetDispatcher td = (NewTargetDispatcher) _map.get(type);
        if (td == null) {
            return false;
        }
        // The count is written under the dispatcher's monitor, so read it through the
        // synchronized accessor rather than directly from the field.
        for (int i = timeout; td.getCount() > 0 && i > 0; --i) {
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                // Do not swallow the interrupt: stop waiting and let the caller see it.
                Thread.currentThread().interrupt();
                break;
            }
        }
        _map.remove(type);
        return td.getCount() <= 0;
    }

    /**
     * Registers a rendezvous dispatcher for the given body class, so that incoming
     * messages of that class are handed to whoever is waiting in the rendezvous map
     * under the message's incoming correlation id.
     *
     * @param type    class of the message body to treat as a response
     * @param timeout value passed through to the RendezVousDispatcher
     */
    public void register(Class type, long timeout) {
        RendezVousDispatcher rendezVous = new RendezVousDispatcher(_rvMap, timeout);
        _map.put(type, rendezVous);
        if (!_log.isTraceEnabled()) {
            return;
        }
        _log.trace("registering class: " + type.getName());
    }

    /**
     * @return the live map of pending rendezvous objects keyed by correlation id
     *         (not a copy)
     */
    public Map getRendezVousMap() {
        return _rvMap;
    }

    /**
     * @return a fresh correlation id obtained from the correlation id factory
     */
    public String nextCorrelationId() {
        String id = _factory.create();
        return id;
    }

    /**
     * Creates a rendezvous object and publishes it in the rendezvous map under the
     * given correlation id, replacing any entry already stored under that id.
     *
     * @param correlationId key under which the rendezvous is stored
     * @param numLlamas     value passed to the Quipu constructor
     * @return the newly created rendezvous object
     */
    public Quipu setRendezVous(String correlationId, int numLlamas) {
        Quipu rendezVous = new Quipu(numLlamas);
        _rvMap.put(correlationId, rendezVous);
        return rendezVous;
    }

    /**
     * Waits on the given rendezvous for a response and returns it.
     *
     * The wait is repeated for as long as the calling thread's interrupt flag is
     * found set after an attempt; note that {@code Thread.interrupted()} clears the
     * flag as it tests it. Whatever the outcome, including an unexpected runtime
     * failure, the entry for {@code correlationId} is removed from the rendezvous
     * map before this method returns.
     *
     * @param correlationId key of the rendezvous in the rendezvous map; may be null,
     *                      in which case nothing is removed
     * @param rv            the rendezvous to wait on
     * @param timeout       timeout passed to {@code rv.waitFor}; reported as
     *                      milliseconds in the log messages
     * @return the first result delivered to the rendezvous, or null if the wait was
     *         unsuccessful, timed out or was interrupted
     */
    public ObjectMessage attemptRendezVous(String correlationId, Quipu rv, long timeout) {
        ObjectMessage response = null;
        try {
            do {
                try {
                    long startTime = System.currentTimeMillis();
                    if (rv.waitFor(timeout)) {
                        response = (ObjectMessage) rv.getResults().toArray()[0];
                        if (_log.isTraceEnabled()) {
                            long elapsedTime = System.currentTimeMillis() - startTime;
                            _log.trace("successful message exchange within timeframe (" + elapsedTime + "<" + timeout + " millis) {" + correlationId + "}");
                        }
                    } else {
                        response = null;
                        if (_log.isWarnEnabled()) {
                            // The Exception is created only to get a stack trace into the log.
                            _log.warn("unsuccessful message exchange within timeframe (" + timeout + " millis) {" + correlationId + "}", new Exception());
                        }
                    }
                }
                catch (TimeoutException e) {
                    // Must stay ahead of the InterruptedException handler.
                    if (_log.isWarnEnabled()) {
                        _log.warn("no response to request within timeout (" + timeout + " millis)");
                    }
                }
                catch (InterruptedException e) {
                    if (_log.isWarnEnabled()) {
                        _log.warn("waiting for response - interruption ignored");
                    }
                }
            } while (Thread.interrupted());
        }
        finally {
            // Always tidy up so that abandoned rendezvous do not accumulate.
            if (correlationId != null) {
                _rvMap.remove(correlationId);
            }
        }
        return response;
    }

    /**
     * @return the executor on which incoming messages are dispatched
     */
    public PooledExecutor getExecutor() {
        return _executor;
    }

    /**
     * Performs a request/response exchange, retrying until a response arrives or
     * the given number of attempts has been made. A warning is logged after every
     * attempt that yields no response.
     *
     * @param from       reply-to destination for the request
     * @param to         destination the request is sent to
     * @param body       request payload
     * @param timeout    timeout for each individual attempt
     * @param iterations maximum number of attempts; no attempt is made if not positive
     * @return the first non-null response, or null if every attempt failed
     */
    public ObjectMessage exchangeSendLoop(Destination from, Destination to, Serializable body, long timeout, int iterations) {
        int attempt = 0;
        while (attempt < iterations) {
            ObjectMessage answer = exchangeSend(from, to, body, timeout);
            attempt++;
            if (answer != null) {
                return answer;
            }
            if (_log.isWarnEnabled()) {
                _log.warn("null response - retrying: " + attempt + "/" + iterations);
            }
        }
        return null;
    }

    /**
     * Performs a single request/response exchange without a target correlation id.
     *
     * @param from    reply-to destination for the request
     * @param to      destination the request is sent to
     * @param body    request payload
     * @param timeout timeout for the wait on the response
     * @return the response, or null if none was obtained
     */
    public ObjectMessage exchangeSend(Destination from, Destination to, Serializable body, long timeout) {
        final String noTargetCorrelationId = null;
        return exchangeSend(from, to, body, timeout, noTargetCorrelationId);
    }

    /**
     * JMS entry point: looks up the dispatcher registered for the class of the
     * message body and queues the message on the executor for dispatch.
     *
     * Messages that are not ObjectMessages, have a null body, or have a body class
     * with no registered dispatcher are logged as spurious and dropped. Any
     * exception raised while handling the message is logged and swallowed, so
     * nothing propagates back to the JMS provider.
     *
     * @param message the incoming message
     */
    public void onMessage(Message message) {
        try {
            ObjectMessage objectMessage = null;
            Serializable body = null;
            Dispatcher.InternalDispatcher dispatcher = null;
            if (message instanceof ObjectMessage) {
                objectMessage = (ObjectMessage) message;
                body = objectMessage.getObject();
                if (body != null) {
                    dispatcher = (Dispatcher.InternalDispatcher) _map.get(body.getClass());
                }
            }

            if (dispatcher == null) {
                if (_log.isWarnEnabled()) {
                    _log.warn("spurious message received: " + message);
                }
                return;
            }

            if (_messageLog.isTraceEnabled()) {
                _messageLog.trace("incoming: " + body + " {" + getNodeName(message.getJMSReplyTo()) + "->" + getNodeName(message.getJMSDestination()) + "} - " + getIncomingCorrelationId(objectMessage) + "/" + getOutgoingCorrelationId(objectMessage));
            }

            // Submit exactly once. The previous loop repeated whenever the thread's
            // interrupt flag was set afterwards, even if the executor had already
            // accepted the task (queuing the message twice), and gave up without
            // retrying if execute() itself was interrupted.
            boolean interrupted = false;
            try {
                for (;;) {
                    try {
                        // The count is guarded by the dispatcher's monitor; DispatchRunner
                        // decrements it under the same monitor.
                        synchronized (dispatcher) {
                            _executor.execute(new DispatchRunner(dispatcher, objectMessage, body));
                            dispatcher.incCount();
                        }
                        break;
                    }
                    catch (InterruptedException e) {
                        // Remember the interrupt and try again.
                        interrupted = true;
                    }
                }
            }
            finally {
                if (interrupted) {
                    // Preserve the interrupt for whoever owns this thread.
                    Thread.currentThread().interrupt();
                }
            }
        }
        catch (Exception e) {
            _log.warn("bad message", e);
        }
    }

    /**
     * Sends a reply to the sender of the given message. The reply is addressed to
     * the message's reply-to destination and carries the message's outgoing
     * correlation id as its own incoming correlation id.
     *
     * @param message the message being answered
     * @param body    payload of the reply
     * @return true if the reply was sent, false if any exception occurred (it is logged)
     */
    public boolean reply(ObjectMessage message, Serializable body) {
        try {
            ObjectMessage answer = createObjectMessage();

            Destination localDestination = getLocalDestination();
            answer.setJMSReplyTo(localDestination);

            Destination requester = message.getJMSReplyTo();
            answer.setJMSDestination(requester);

            String correlationId = getOutgoingCorrelationId(message);
            setIncomingCorrelationId(answer, correlationId);

            answer.setObject(body);

            if (_log.isTraceEnabled()) {
                _log.trace("reply: " + getNodeName(localDestination) + " -> " + getNodeName(requester) + " {" + correlationId + "} : " + body);
            }
            send(requester, answer);
        }
        catch (Exception e) {
            _log.error("problem replying to message", e);
            return false;
        }
        return true;
    }

    /**
     * Builds an object message with the given addressing, outgoing correlation id
     * and payload, and sends it without waiting for a response.
     *
     * @param from                  reply-to destination set on the message
     * @param to                    destination the message is sent to
     * @param outgoingCorrelationId outgoing correlation id set on the message
     * @param body                  payload
     * @return true if the message was sent, false if any exception occurred (it is logged)
     */
    public boolean send(Destination from, Destination to, String outgoingCorrelationId, Serializable body) {
        boolean sent;
        try {
            ObjectMessage outgoing = createObjectMessage();
            outgoing.setJMSReplyTo(from);
            outgoing.setJMSDestination(to);
            setOutgoingCorrelationId(outgoing, outgoingCorrelationId);
            outgoing.setObject(body);
            if (_log.isTraceEnabled()) {
                _log.trace("send {" + outgoingCorrelationId + "}: " + getNodeName(from) + " -> " + getNodeName(to) + " : " + body);
            }
            send(to, outgoing);
            sent = true;
        }
        catch (Exception e) {
            if (_log.isErrorEnabled()) {
                _log.error("problem sending " + body, e);
            }
            sent = false;
        }
        return sent;
    }

    /**
     * Sends a request under a freshly generated outgoing correlation id and waits
     * for the matching response.
     *
     * @param from                reply-to destination for the request
     * @param to                  destination the request is sent to
     * @param body                request payload
     * @param timeout             timeout for the wait on the response
     * @param targetCorrelationId if non-null, set as the request's incoming
     *                            correlation id
     * @return the response, or null if sending failed or no response was obtained
     */
    public ObjectMessage exchangeSend(Destination from, Destination to, Serializable body, long timeout, String targetCorrelationId) {
        // Non-null only once a rendezvous may have been stored under it.
        String correlationId = null;
        try {
            ObjectMessage om = createObjectMessage();
            om.setJMSReplyTo(from);
            om.setJMSDestination(to);
            om.setObject(body);
            correlationId = nextCorrelationId();
            setOutgoingCorrelationId(om, correlationId);
            if (targetCorrelationId != null) {
                setIncomingCorrelationId(om, targetCorrelationId);
            }
            // Register the rendezvous before sending so a fast response cannot be missed.
            Quipu rv = setRendezVous(correlationId, 1);
            if (_log.isTraceEnabled()) {
                _log.trace("exchangeSend {" + correlationId + "}: " + getNodeName(from) + " -> " + getNodeName(to) + " : " + body);
            }
            send(to, om);
            // attemptRendezVous() removes the rendezvous itself, whatever happens.
            return attemptRendezVous(correlationId, rv, timeout);
        }
        catch (Exception e) {
            // If we never reached attemptRendezVous() the rendezvous would otherwise
            // stay in the map for ever; removing an absent key is harmless.
            if (correlationId != null) {
                _rvMap.remove(correlationId);
            }
            if (_log.isErrorEnabled()) {
                _log.error("problem sending " + body, e);
            }
            return null;
        }
    }

    /**
     * Sends a request under a caller-supplied outgoing correlation id and waits for
     * the matching response.
     *
     * @param from                  reply-to destination for the request
     * @param to                    destination the request is sent to
     * @param outgoingCorrelationId correlation id under which the rendezvous is
     *                              registered and the request is sent
     * @param body                  request payload
     * @param timeout               timeout for the wait on the response
     * @return the response, or null if sending failed or no response was obtained
     */
    public ObjectMessage exchangeSend(Destination from, Destination to, String outgoingCorrelationId, Serializable body, long timeout) {
        // Register the rendezvous before sending so a fast response cannot be missed.
        Quipu rv = setRendezVous(outgoingCorrelationId, 1);
        if (send(from, to, outgoingCorrelationId, body)) {
            // attemptRendezVous() removes the rendezvous itself, whatever happens.
            return attemptRendezVous(outgoingCorrelationId, rv, timeout);
        }
        // Nothing was sent, so no response can arrive: do not leave the rendezvous
        // behind in the map.
        if (outgoingCorrelationId != null) {
            _rvMap.remove(outgoingCorrelationId);
        }
        return null;
    }

    /**
     * Builds a reply with explicit addressing and the given incoming correlation
     * id, and sends it without waiting for a further response.
     *
     * @param from                  reply-to destination set on the reply
     * @param to                    destination the reply is sent to
     * @param incomingCorrelationId incoming correlation id set on the reply
     * @param body                  payload of the reply
     * @return true if the reply was sent, false if any exception occurred (it is logged)
     */
    public boolean reply(Destination from, Destination to, String incomingCorrelationId, Serializable body) {
        boolean sent;
        try {
            ObjectMessage answer = createObjectMessage();
            answer.setJMSReplyTo(from);
            answer.setJMSDestination(to);
            setIncomingCorrelationId(answer, incomingCorrelationId);
            answer.setObject(body);
            if (_log.isTraceEnabled()) {
                _log.trace("reply: " + getNodeName(from) + " -> " + getNodeName(to) + " {" + incomingCorrelationId + "} : " + body);
            }
            send(to, answer);
            sent = true;
        }
        catch (Exception e) {
            if (_log.isErrorEnabled()) {
                _log.error("problem sending " + body, e);
            }
            sent = false;
        }
        return sent;
    }

    /**
     * Replies to the given message and waits for a further response to that reply.
     * The reply carries the original message's outgoing correlation id as its
     * incoming correlation id, plus a freshly generated outgoing correlation id
     * under which the response is awaited.
     *
     * @param message the message being answered
     * @param body    payload of the reply
     * @param timeout timeout for the wait on the response
     * @return the response, or null if sending failed or no response was obtained
     */
    public ObjectMessage exchangeReply(ObjectMessage message, Serializable body, long timeout) {
        // Non-null only once a rendezvous may have been stored under it.
        String outgoingCorrelationId = null;
        try {
            ObjectMessage om = createObjectMessage();
            Destination from = getLocalDestination();
            om.setJMSReplyTo(from);
            Destination to = message.getJMSReplyTo();
            om.setJMSDestination(to);
            String incomingCorrelationId = getOutgoingCorrelationId(message);
            setIncomingCorrelationId(om, incomingCorrelationId);
            outgoingCorrelationId = nextCorrelationId();
            setOutgoingCorrelationId(om, outgoingCorrelationId);
            om.setObject(body);
            // Register the rendezvous before sending so a fast response cannot be missed.
            Quipu rv = setRendezVous(outgoingCorrelationId, 1);
            if (_log.isTraceEnabled()) {
                // NOTE(review): the label reads "exchangeSend" although this is
                // exchangeReply; kept unchanged so log output stays the same.
                _log.trace("exchangeSend {" + outgoingCorrelationId + "}: " + getNodeName(from) + " -> " + getNodeName(to) + " {" + incomingCorrelationId + "} : " + body);
            }
            send(to, om);
            // attemptRendezVous() removes the rendezvous itself, whatever happens.
            return attemptRendezVous(outgoingCorrelationId, rv, timeout);
        }
        catch (Exception e) {
            // If we never reached attemptRendezVous() the rendezvous would otherwise
            // stay in the map for ever; removing an absent key is harmless.
            if (outgoingCorrelationId != null) {
                _rvMap.remove(outgoingCorrelationId);
            }
            if (_log.isErrorEnabled()) {
                _log.error("problem sending " + body, e);
            }
            return null;
        }
    }

    /**
     * Delegates to a single {@code exchangeReply} call; despite the name, this
     * implementation does not retry.
     *
     * @param message the message being answered
     * @param body    payload of the reply
     * @param timeout timeout for the wait on the response
     * @return the response, or null if none was obtained
     */
    public ObjectMessage exchangeReplyLoop(ObjectMessage message, Serializable body, long timeout) {
        ObjectMessage answer = exchangeReply(message, body, timeout);
        return answer;
    }

    /**
     * Forwards the given message, with its existing body, to another destination.
     *
     * @param message     the message to forward
     * @param destination where to send it
     * @return true if the message was forwarded, false if the body could not be
     *         read or the send failed
     */
    public boolean forward(ObjectMessage message, Destination destination) {
        Serializable payload;
        try {
            payload = message.getObject();
            return forward(message, destination, payload);
        }
        catch (JMSException e) {
            _log.error("problem forwarding message with new body", e);
        }
        return false;
    }

    /**
     * Forwards the given message to another destination with a replacement body,
     * keeping the original reply-to destination and outgoing correlation id.
     *
     * @param message     the message to forward
     * @param destination where to send it
     * @param body        payload to send in place of the original
     * @return true if the message was forwarded, false if any exception occurred
     *         or the send failed
     */
    public boolean forward(ObjectMessage message, Destination destination, Serializable body) {
        try {
            Destination originalReplyTo = message.getJMSReplyTo();
            String correlationId = getOutgoingCorrelationId(message);
            return send(originalReplyTo, destination, correlationId, body);
        }
        catch (Exception e) {
            _log.error("problem forwarding message", e);
        }
        return false;
    }

    /**
     * @return the name of the local node, as given to the constructor
     */
    public String getNodeName() {
        return _nodeName;
    }

    /**
     * @return the inactivity period given to the constructor
     */
    public long getInactiveTime() {
        return _inactiveTime;
    }

    /**
     * Produces correlation ids as decimal strings taken from a shared
     * synchronized counter that starts at zero.
     */
    class SimpleCorrelationIDFactory {
        protected final SynchronizedInt _count;

        SimpleCorrelationIDFactory() {
            _count = new SynchronizedInt(0);
        }

        /**
         * @return the decimal string form of the value returned by incrementing
         *         the counter
         */
        public String create() {
            int next = _count.increment();
            return String.valueOf(next);
        }
    }

    class DispatchRunner
    implements Runnable {
        protected final Dispatcher.InternalDispatcher _dispatcher;
        protected final ObjectMessage _objectMessage;
        protected final Serializable _serializable;

        public DispatchRunner(Dispatcher.InternalDispatcher dispatcher, ObjectMessage objectMessage, Serializable serializable) {
            this._dispatcher = dispatcher;
            this._objectMessage = objectMessage;
            this._serializable = serializable;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run() {
            try {
                this._dispatcher.dispatch(this._objectMessage, this._serializable);
                Dispatcher.InternalDispatcher internalDispatcher = this._dispatcher;
                synchronized (internalDispatcher) {
                    this._dispatcher.decCount();
                }
            }
            catch (Exception e) {
                AbstractDispatcher.this._log.error((Object)"problem dispatching message", (Throwable)e);
            }
        }
    }

    /**
     * Dispatcher for response messages: delivers each incoming message to the
     * rendezvous stored in the map under the message's incoming correlation id.
     */
    class RendezVousDispatcher implements Dispatcher.InternalDispatcher {
        /** Map of pending rendezvous keyed by correlation id. */
        protected final Map _rvMap2;
        /** Timeout supplied at construction; not read inside this class. */
        protected final long _timeout;
        /** Number of dispatches queued but not yet completed. */
        protected int _count;

        /**
         * @param rvMap   map in which waiting rendezvous are looked up
         * @param timeout stored as given
         */
        public RendezVousDispatcher(Map rvMap, long timeout) {
            _rvMap2 = rvMap;
            _timeout = timeout;
        }

        /**
         * Hands the message to the waiting rendezvous, if there is one; otherwise
         * logs a warning. Lookup and delivery both happen while holding the
         * map's monitor.
         *
         * @param om  the response message
         * @param obj the message's body (used only for logging)
         */
        public void dispatch(ObjectMessage om, Serializable obj) throws Exception {
            String correlationId = getIncomingCorrelationId(om);
            synchronized (_rvMap2) {
                Quipu waiter = (Quipu) _rvMap2.get(correlationId);
                if (waiter != null) {
                    if (AbstractDispatcher.this._log.isTraceEnabled()) {
                        AbstractDispatcher.this._log.trace("rendez-vous-ing with Quipu: " + correlationId);
                    }
                    waiter.putResult(om);
                    return;
                }
                if (AbstractDispatcher.this._log.isWarnEnabled()) {
                    AbstractDispatcher.this._log.warn("no-one waiting: {" + correlationId + "} - " + obj);
                }
            }
        }

        public String toString() {
            return "<RendezVousDispatcher>";
        }

        /** Not synchronized itself; callers in this file hold this object's monitor. */
        public void incCount() {
            _count++;
        }

        /** Not synchronized itself; callers in this file hold this object's monitor. */
        public void decCount() {
            _count--;
        }

        /** @return the current in-flight count */
        public synchronized int getCount() {
            return _count;
        }
    }

    /**
     * Request/response dispatcher: invokes a single-argument method on a target
     * object with the message body and sends the method's return value back as a
     * reply to the message.
     */
    class NewTargetDispatcher implements Dispatcher.InternalDispatcher {
        /** Object on which the handler method is invoked. */
        protected final Object _target;
        /** Handler method, taking the message body as its only argument. */
        protected final Method _method;
        /** Per-thread one-element argument array, reused across invocations. */
        protected final ThreadLocal _singleton = new ThreadLocal() {
            protected Object initialValue() {
                return new Object[1];
            }
        };
        /** Number of dispatches queued but not yet completed. */
        protected int _count;

        /**
         * @param target object on which the handler method will be invoked
         * @param method handler method accepting the message body
         */
        public NewTargetDispatcher(Object target, Method method) {
            _target = target;
            _method = method;
        }

        /**
         * Invokes the handler with the request and replies with its result.
         *
         * @param message the request message, used to address the reply
         * @param request the request body passed to the handler
         * @throws InvocationTargetException if the handler itself throws
         * @throws IllegalAccessException    if the handler is not accessible
         * @throws ClassCastException        if the handler's result is not Serializable
         */
        public void dispatch(ObjectMessage message, Serializable request) throws InvocationTargetException, IllegalAccessException {
            Object[] singleton = (Object[]) _singleton.get();
            singleton[0] = request;
            Object response;
            try {
                response = _method.invoke(_target, singleton);
            }
            finally {
                // Do not keep the request reachable from the thread after the call.
                singleton[0] = null;
            }
            AbstractDispatcher.this.reply(message, (Serializable) response);
        }

        public String toString() {
            return "<TargetDispatcher: " + _method + " dispatched on: " + _target + ">";
        }

        /** Not synchronized itself; callers in this file hold this object's monitor. */
        public void incCount() {
            ++_count;
        }

        /** Not synchronized itself; callers in this file hold this object's monitor. */
        public void decCount() {
            --_count;
        }

        /** @return the current in-flight count */
        public synchronized int getCount() {
            return _count;
        }
    }

    /**
     * One-way dispatcher: invokes a two-argument method on a target object with
     * the incoming message and its body.
     */
    class TargetDispatcher implements Dispatcher.InternalDispatcher {
        /** Object on which the handler method is invoked. */
        protected final Object _target;
        /** Handler method, taking the message and its body. */
        protected final Method _method;
        /** Per-thread two-element argument array, reused across invocations. */
        protected final ThreadLocal _pair = new ThreadLocal() {
            protected Object initialValue() {
                return new Object[2];
            }
        };
        /** Number of dispatches queued but not yet completed. */
        protected int _count;

        /**
         * @param target object on which the handler method will be invoked
         * @param method handler method accepting the message and its body
         */
        public TargetDispatcher(Object target, Method method) {
            _target = target;
            _method = method;
        }

        /**
         * Invokes the handler with the message and its body.
         *
         * @param om  the incoming message
         * @param obj the message's body
         * @throws InvocationTargetException if the handler itself throws
         * @throws IllegalAccessException    if the handler is not accessible
         */
        public void dispatch(ObjectMessage om, Serializable obj) throws InvocationTargetException, IllegalAccessException {
            Object[] pair = (Object[]) _pair.get();
            pair[0] = om;
            pair[1] = obj;
            try {
                _method.invoke(_target, pair);
            }
            finally {
                // Do not keep the message reachable from the thread after the call.
                pair[0] = null;
                pair[1] = null;
            }
        }

        public String toString() {
            return "<TargetDispatcher: " + _method + " dispatched on: " + _target + ">";
        }

        /** Not synchronized itself; callers in this file hold this object's monitor. */
        public void incCount() {
            ++_count;
        }

        /** Not synchronized itself; callers in this file hold this object's monitor. */
        public void decCount() {
            --_count;
        }

        /** @return the current in-flight count */
        public synchronized int getCount() {
            return _count;
        }
    }
}

