/*
 * Decompiled with CFR 0.152.
 */
package com.metamatrix.common.messaging.jgroups;

import com.metamatrix.common.CommonPlugin;
import com.metamatrix.common.log.LogManager;
import com.metamatrix.common.messaging.MessageBus;
import com.metamatrix.common.messaging.MessagingException;
import com.metamatrix.common.messaging.RemoteMessagingException;
import com.metamatrix.common.messaging.jgroups.RPCStruct;
import com.metamatrix.common.messaging.jgroups.RemoteProxy;
import com.metamatrix.core.MetaMatrixRuntimeException;
import com.metamatrix.core.event.EventBroker;
import com.metamatrix.core.event.EventObjectListener;
import com.metamatrix.core.util.ArgCheck;
import com.metamatrix.platform.PlatformPlugin;
import com.metamatrix.server.ChannelProvider;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.EventObject;
import java.util.UUID;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.ChannelException;
import org.jgroups.Header;
import org.jgroups.MembershipListener;
import org.jgroups.Message;
import org.jgroups.MessageListener;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;
import org.jgroups.blocks.MethodCall;
import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.util.RspList;

/**
 * {@link MessageBus} implementation backed by a JGroups {@link Channel}.
 *
 * <p>Events handed to {@link #processEvent(EventObject)} are sent over the channel tagged
 * with {@link #MSG_HEADER}; tagged messages received from other members are forwarded to the
 * {@link EventBroker} supplied at construction. Objects registered through
 * {@link #export(Object, Class[])} are kept in a local registry shared with the
 * {@link RemoteProxy} served by the {@link RpcDispatcher}, and can be invoked through the
 * proxies returned by {@link #getRPCProxy(Object)}.
 */
public class JGroupsMessageBus
implements MessageBus {
    /** Header key under which {@link #MSG_HEADER} is attached to outgoing event messages. */
    public static final String MESSAGE_KEY = "MessageKey";
    /** Timeout passed to the RPC dispatcher for remote calls (presumably milliseconds - confirm against the JGroups version in use). */
    public static final int REMOTE_TIMEOUT = 30000;
    /** Marker header used to recognize event messages sent by this bus implementation. */
    static final FederateHeader MSG_HEADER = new FederateHeader(456188434);
    private final Channel channel;
    private volatile boolean shutdown;
    /** Exported objects keyed by their generated id; the same map instance is handed to the RemoteProxy. */
    private final ConcurrentHashMap<UUID, RPCStruct> rpcStructs = new ConcurrentHashMap<UUID, RPCStruct>();
    private final RpcDispatcher rpcDispatcher;

    /**
     * Obtains the RPC channel from the provider, installs the message/membership receiver
     * and RPC dispatcher on it, and connects the channel to the named cluster.
     *
     * @param channelProvider source of the {@code ChannelProvider.ChannelID.RPC} channel
     * @param eventBroker receives events arriving from other cluster members
     * @param clusterName name of the cluster to connect to
     * @throws ChannelException if connecting the channel fails
     * @throws MetaMatrixRuntimeException if the provider returns no channel or a closed one
     */
    public JGroupsMessageBus(ChannelProvider channelProvider, final EventBroker eventBroker, final String clusterName) throws ChannelException {
        Channel c = channelProvider.get(ChannelProvider.ChannelID.RPC);
        if (c == null || !c.isOpen()) {
            throw new MetaMatrixRuntimeException("Channel is not open");
        }
        this.channel = c;
        ReceiverAdapter receiver = new ReceiverAdapter(){

            public void receive(Message msg) {
                // Skip our own messages; only messages carrying an equal marker header are events.
                // The header on a received message is a different instance than MSG_HEADER, so this
                // relies on FederateHeader.equals comparing by value.
                boolean fromSelf = msg.getSrc().equals(JGroupsMessageBus.this.channel.getLocalAddress());
                if (!fromSelf && MSG_HEADER.equals(msg.getHeader(JGroupsMessageBus.MESSAGE_KEY))) {
                    eventBroker.processEvent((EventObject)msg.getObject());
                }
            }

            public void viewAccepted(View view) {
                super.viewAccepted(view);
                LogManager.logInfo("CONTROLLER", view + "is added to cluster:" + clusterName);
            }
        };
        this.rpcDispatcher = new RpcDispatcher(this.channel, (MessageListener)receiver, (MembershipListener)receiver, new RemoteProxy(this.rpcStructs));
        this.channel.connect(clusterName);
    }

    /**
     * Removes a previously exported object from the local registry.
     *
     * @param object the handle returned by {@link #export(Object, Class[])}; {@code null} is ignored
     */
    @Override
    public void unExport(Object object) {
        if (object == null) {
            return;
        }
        ArgCheck.isInstanceOf(RPCStruct.class, object);
        RPCStruct struct = (RPCStruct)object;
        this.rpcStructs.remove(struct.objectId);
    }

    /**
     * Registers an object under a freshly generated id so it can be invoked remotely.
     *
     * @param object the object to export
     * @param targetClasses the interfaces a remote proxy for this object should implement
     * @return a serializable handle describing the export, or {@code null} if {@code object}
     *         is {@code null} or the bus has been shut down
     */
    @Override
    public Serializable export(Object object, Class[] targetClasses) {
        if (object == null || this.shutdown) {
            return null;
        }
        RPCStruct struct = new RPCStruct(this.channel.getLocalAddress(), UUID.randomUUID(), targetClasses, object);
        this.rpcStructs.put(struct.objectId, struct);
        return struct;
    }

    /**
     * Creates a dynamic proxy that forwards every method call to the member that exported
     * the object described by the given handle.
     *
     * @param object an {@link RPCStruct} handle as produced by {@link #export(Object, Class[])}
     * @return a proxy implementing the handle's target classes, or {@code null} if
     *         {@code object} is {@code null} or the bus has been shut down
     */
    @Override
    public Object getRPCProxy(Object object) {
        if (object == null || this.shutdown) {
            return null;
        }
        ArgCheck.isInstanceOf(RPCStruct.class, object);
        final RPCStruct struct = (RPCStruct)object;
        // The dispatcher takes a Vector of destinations; the only target is the exporting member.
        final Vector<Address> dest = new Vector<Address>();
        dest.add(struct.address);
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), struct.getTargetClasses(), new InvocationHandler(){

            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Object[] invokeArgs = new Object[]{struct.objectId, method.getName(), method.getParameterTypes(), args};
                // Mode 1 is presumably GroupRequest.GET_FIRST - confirm against the JGroups version in use.
                RspList rsp_list = JGroupsMessageBus.this.rpcDispatcher.callRemoteMethods(dest, new MethodCall(RemoteProxy.getInvokeMethod(), invokeArgs), 1, (long)REMOTE_TIMEOUT);
                if (rsp_list.isEmpty()) {
                    throw new RemoteMessagingException(PlatformPlugin.Util.getString("JGroupsMessageBus.noResponse"));
                }
                // NOTE(review): if the dispatcher reports remote failures as Throwable response
                // values, they are returned here rather than rethrown - verify and handle if so.
                return rsp_list.getFirst();
            }
        });
    }

    /**
     * Sends the event over the channel, tagged with {@link #MSG_HEADER}.
     *
     * @param obj the event to send; {@code null} is ignored
     * @throws MessagingException if building or sending the message fails
     */
    @Override
    public void processEvent(EventObject obj) throws MessagingException {
        if (obj == null) {
            return;
        }
        try {
            Message msg = new Message(null, null, (Serializable)obj);
            msg.putHeader(MESSAGE_KEY, MSG_HEADER);
            this.channel.send(msg);
        }
        catch (Exception e) {
            throw new MessagingException(e, "ERR.003.017.0004", CommonPlugin.Util.getString("ERR.003.017.0004"));
        }
    }

    /**
     * Marks the bus as shut down, closes the channel, stops the RPC dispatcher and clears
     * the registry of exported objects.
     */
    @Override
    public synchronized void shutdown() throws MessagingException {
        this.shutdown = true;
        this.channel.close();
        this.rpcDispatcher.stop();
        this.rpcStructs.clear();
    }

    /** No-op in this implementation. */
    @Override
    public void addListener(Class eventClass, EventObjectListener listener) throws MessagingException {
    }

    /** No-op in this implementation. */
    @Override
    public void removeListener(Class eventClass, EventObjectListener listener) throws MessagingException {
    }

    /** No-op in this implementation. */
    @Override
    public void removeListener(EventObjectListener listener) throws MessagingException {
    }

    /**
     * Message header carrying a single integer type tag. Two headers are equal when their
     * tags are equal, so a header read from a received message can be matched against
     * {@link JGroupsMessageBus#MSG_HEADER}.
     */
    public static class FederateHeader
    extends Header {
        int type;

        /** Public no-arg constructor so the header can be instantiated before {@link #readExternal(ObjectInput)} fills it in. */
        public FederateHeader() {
        }

        /**
         * @param type the tag identifying the kind of message this header marks
         */
        public FederateHeader(int type) {
            this.type = type;
        }

        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeInt(this.type);
        }

        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            this.type = in.readInt();
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof FederateHeader)) {
                return false;
            }
            return this.type == ((FederateHeader)obj).type;
        }

        @Override
        public int hashCode() {
            return this.type;
        }
    }
}

