/*
 * Decompiled with CFR 0.152.
 */
package org.codehaus.wadi.dindex.impl;

import EDU.oswego.cs.dl.util.concurrent.Latch;
import EDU.oswego.cs.dl.util.concurrent.Sync;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import org.activecluster.Cluster;
import org.activecluster.ClusterEvent;
import org.activecluster.ClusterListener;
import org.activecluster.LocalNode;
import org.activecluster.Node;
import org.activecluster.election.ElectionStrategy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.wadi.Immoter;
import org.codehaus.wadi.InvocationContext;
import org.codehaus.wadi.InvocationException;
import org.codehaus.wadi.Motable;
import org.codehaus.wadi.dindex.CoordinatorConfig;
import org.codehaus.wadi.dindex.PartitionManager;
import org.codehaus.wadi.dindex.PartitionManagerConfig;
import org.codehaus.wadi.dindex.StateManager;
import org.codehaus.wadi.dindex.StateManagerConfig;
import org.codehaus.wadi.dindex.impl.Coordinator;
import org.codehaus.wadi.dindex.impl.PartitionFacade;
import org.codehaus.wadi.dindex.impl.PartitionKeys;
import org.codehaus.wadi.dindex.impl.SeniorityElectionStrategy;
import org.codehaus.wadi.dindex.impl.SimplePartitionManager;
import org.codehaus.wadi.dindex.impl.SimpleStateManager;
import org.codehaus.wadi.dindex.messages.DIndexForwardRequest;
import org.codehaus.wadi.dindex.messages.DIndexRelocationRequest;
import org.codehaus.wadi.dindex.messages.RelocationRequest;
import org.codehaus.wadi.dindex.newmessages.DeleteIMToPM;
import org.codehaus.wadi.dindex.newmessages.InsertIMToPM;
import org.codehaus.wadi.dindex.newmessages.InsertPMToIM;
import org.codehaus.wadi.dindex.newmessages.MoveIMToPM;
import org.codehaus.wadi.dindex.newmessages.MoveIMToSM;
import org.codehaus.wadi.dindex.newmessages.MovePMToIM;
import org.codehaus.wadi.dindex.newmessages.MoveSMToIM;
import org.codehaus.wadi.gridstate.Dispatcher;
import org.codehaus.wadi.gridstate.PartitionMapper;
import org.codehaus.wadi.gridstate.activecluster.ActiveClusterDispatcher;
import org.codehaus.wadi.impl.AbstractChainedEmoter;
import org.codehaus.wadi.impl.Quipu;
import org.codehaus.wadi.impl.SimpleMotable;
import org.codehaus.wadi.impl.Utils;

public class DIndex
implements ClusterListener,
CoordinatorConfig,
SimplePartitionManager.Callback,
StateManagerConfig {
    // Keys under which init() and start() store entries in the distributed-state map.
    protected static final String _nodeNameKey = "nodeName";
    protected static final String _partitionKeysKey = "partitionKeys";
    protected static final String _timeStampKey = "timeStamp";
    protected static final String _birthTimeKey = "birthTime";
    protected static final String _correlationIDMapKey = "correlationIDMap";
    // Map supplied to the constructor; filled in init() and handed to the dispatcher in start().
    protected final Map _distributedState;
    // Released at the end of onCoordinatorChanged(); start() waits on it for up to _inactiveTime.
    protected final Latch _coordinatorLatch = new Latch();
    // Monitor held by onCoordinatorChanged(), isCoordinator() and getCoordinator().
    protected final Object _coordinatorLock = new Object();
    protected final Dispatcher _dispatcher;
    // Obtained from the dispatcher in the constructor.
    protected final Cluster _cluster;
    protected final String _nodeName;
    // Logger named "<class name>#<node name>" (see constructor).
    protected final Log _log;
    protected final long _inactiveTime;
    protected final PartitionManager _partitionManager;
    protected final StateManager _stateManager;
    // Not referenced by any other member visible in this class.
    protected final Log _lockLog = LogFactory.getLog((String)"org.codehaus.wadi.LOCKS");
    // Node carried by the most recent onCoordinatorChanged() event that differed from the previous one.
    protected Node _coordinatorNode;
    // Created in onElection(); cleared in onDismissal() and stop().
    protected Coordinator _coordinator;
    // Assigned in init().
    protected PartitionManagerConfig _config;
    // Destinations of nodes seen by onNodeRemoved(); entries are removed again in onNodeFailed().
    protected final Collection _leavers = Collections.synchronizedCollection(new ArrayList());
    // Only removed from (onNodeFailed) and exposed (getLeft) here; presumably filled by another class - TODO confirm.
    protected final Collection _left = Collections.synchronizedCollection(new ArrayList());

    public DIndex(String nodeName, int numPartitions, long inactiveTime, Dispatcher dispatcher, Map distributedState, PartitionMapper mapper) {
        this._nodeName = nodeName;
        this._log = LogFactory.getLog((String)(this.getClass().getName() + "#" + this._nodeName));
        this._inactiveTime = inactiveTime;
        this._dispatcher = dispatcher;
        this._cluster = ((ActiveClusterDispatcher)this._dispatcher).getCluster();
        this._distributedState = distributedState;
        this._partitionManager = new SimplePartitionManager(this._dispatcher, numPartitions, this._distributedState, this, mapper);
        this._stateManager = new SimpleStateManager(this._dispatcher, this._inactiveTime);
    }

    /**
     * Records the configuration, installs the election strategy and cluster listener,
     * seeds the distributed-state map and initialises the partition and state managers.
     *
     * @param config configuration retained for later use and passed to the partition manager
     */
    public void init(PartitionManagerConfig config) {
        _log.info("init-ing...");
        _config = config;
        _cluster.setElectionStrategy((ElectionStrategy)new SeniorityElectionStrategy());
        _dispatcher.setClusterListener(this);

        // Seed the state map: name, empty correlation map, birth time, partition keys, timestamp.
        _distributedState.put(_nodeNameKey, _nodeName);
        _distributedState.put(_correlationIDMapKey, new HashMap());
        _distributedState.put(_birthTimeKey, new Long(System.currentTimeMillis()));
        PartitionKeys localKeys = _partitionManager.getPartitionKeys();
        _distributedState.put(_partitionKeysKey, localKeys);
        _distributedState.put(_timeStampKey, new Long(System.currentTimeMillis()));
        if (_log.isInfoEnabled()) {
            _log.info("local state: " + localKeys);
        }

        _partitionManager.init(config);
        _stateManager.init(this);
        _log.info("...init-ed");
    }

    /**
     * Starts the partition manager, then waits up to the inactive time for the
     * coordinator latch. If the latch is not acquired in time, this node localises
     * its partitions, republishes its state, raises a coordinator-changed event
     * naming itself and queues a rebalancing.
     *
     * @throws Exception propagated from the partition manager or the latch wait
     */
    public void start() throws Exception {
        _log.info("starting...");
        _partitionManager.start();

        _log.info("sleeping...");
        // In this class the latch is only released by onCoordinatorChanged().
        boolean latchAcquired = _coordinatorLatch.attempt(_inactiveTime);
        _log.info("...waking");

        if (latchAcquired) {
            _log.info("...started");
            return;
        }

        _partitionManager.localise();
        PartitionKeys localKeys = _partitionManager.getPartitionKeys();
        _distributedState.put(_partitionKeysKey, localKeys);
        _distributedState.put(_timeStampKey, new Long(System.currentTimeMillis()));
        if (_log.isInfoEnabled()) {
            _log.info("local state: " + localKeys);
        }
        _dispatcher.setDistributedState(_distributedState);
        if (_log.isTraceEnabled()) {
            _log.trace("distributed state updated: " + _dispatcher.getDistributedState());
        }

        // NOTE(review): 5 is an undocumented ClusterEvent type code - confirm which constant it is.
        ClusterEvent selfElected = new ClusterEvent(_cluster, (Node)_cluster.getLocalNode(), 5);
        onCoordinatorChanged(selfElected);
        _coordinator.queueRebalancing();
        _log.info("...started");
    }

    /**
     * Stops the state manager, the coordinator (if one exists) and the partition manager,
     * in that order.
     *
     * @throws Exception propagated from any of the stopped components
     */
    public void stop() throws Exception {
        _log.info("stopping...");
        // Reads and clears the calling thread's interrupt flag; the result is discarded.
        Thread.interrupted();
        _stateManager.stop();
        Coordinator running = _coordinator;
        if (running != null) {
            running.stop();
            _coordinator = null;
        }
        _partitionManager.stop();
        _log.info("...stopped");
    }

    /**
     * @return the cluster obtained from the dispatcher at construction time
     */
    public Cluster getCluster() {
        return _cluster;
    }

    /**
     * @return the dispatcher passed to the constructor
     */
    public Dispatcher getDispatcher() {
        return _dispatcher;
    }

    /**
     * @return the partition manager created by the constructor
     */
    public PartitionManager getPartitionManager() {
        return _partitionManager;
    }

    /**
     * Picks one of the partition keys stored in the local distributed state at random.
     *
     * @return a randomly selected entry of the stored key array
     */
    public int getPartition() {
        PartitionKeys localKeys = (PartitionKeys)_distributedState.get(_partitionKeysKey);
        // NOTE(review): if size() is 0 the index is 0 - confirm callers never hit an empty key set.
        int slot = Math.abs((int)(Math.random() * (double)localKeys.size()));
        return localKeys.getKeys()[slot];
    }

    /**
     * Cluster callback: forwards the updated node to the partition manager and then
     * checks the node's state for a correlation ID addressed to this node.
     *
     * @param event event carrying the updated node
     */
    public void onNodeUpdate(ClusterEvent event) {
        Node updated = event.getNode();
        if (_log.isTraceEnabled()) {
            _log.trace("onNodeUpdate: " + DIndex.getNodeName(updated) + ": " + updated.getState());
        }
        _partitionManager.update(updated);
        // State is read after the partition manager has seen the node, as before.
        correlateStateUpdate(updated.getState());
    }

    /**
     * Looks in a node's state for a correlation ID keyed by this node's destination and,
     * if someone is waiting on it in the dispatcher's rendezvous map, hands them the state.
     * <p>
     * A state that is {@code null} or has no correlation-ID map is ignored; previously
     * either case raised a {@link NullPointerException} on the listener thread.
     *
     * @param state the state map published by a node; may be {@code null}
     */
    protected void correlateStateUpdate(Map state) {
        if (state == null) {
            return;
        }
        Map correlationIDMap = (Map)state.get(_correlationIDMapKey);
        if (correlationIDMap == null) {
            // The publishing node has not stored a correlation map - nothing to match.
            return;
        }
        Destination local = this._dispatcher.getLocalDestination();
        String correlationID = (String)correlationIDMap.get(local);
        if (correlationID == null) {
            return;
        }
        Quipu rv = (Quipu)this._dispatcher.getRendezVousMap().get(correlationID);
        if (rv == null) {
            if (this._log.isWarnEnabled()) {
                this._log.warn("no one waiting for: " + correlationID);
            }
            return;
        }
        if (this._log.isTraceEnabled()) {
            this._log.trace("successful correlation: " + correlationID);
        }
        rv.putResult(state);
    }

    /**
     * Cluster callback: when the local node is the recorded coordinator, queues a
     * rebalancing; then forwards the new node to the partition manager.
     * <p>
     * The coordinator reference is null-checked because {@link #stop()} clears
     * {@code _coordinator} without clearing {@code _coordinatorNode}, and
     * {@link #onElection(ClusterEvent)} may fail to create it.
     *
     * @param event event carrying the node that joined
     */
    public void onNodeAdd(ClusterEvent event) {
        Node node = event.getNode();
        if (this._log.isDebugEnabled()) {
            this._log.debug("node joined: " + DIndex.getNodeName(node));
        }
        if (this._cluster.getLocalNode() == this._coordinatorNode) {
            // Read the field once so a concurrent stop() cannot null it between check and use.
            Coordinator coordinator = this._coordinator;
            if (coordinator != null) {
                coordinator.queueRebalancing();
            } else {
                this._log.warn("coordinator thread not running");
            }
        }
        this._partitionManager.update(node);
    }

    /**
     * Cluster callback: records the departing node's destination among the leavers and,
     * if a coordinator object exists locally, queues a rebalancing.
     *
     * @param event event carrying the node that left
     */
    public void onNodeRemoved(ClusterEvent event) {
        Node departed = event.getNode();
        if (_log.isDebugEnabled()) {
            _log.debug("node left: " + DIndex.getNodeName(departed));
        }
        _leavers.add(departed.getDestination());
        if (_coordinator == null) {
            return;
        }
        _coordinator.queueRebalancing();
    }

    /**
     * Tells whether the recorded coordinator's destination equals the local destination.
     * <p>
     * Returns {@code false} when no coordinator has been recorded yet; previously that
     * case raised a {@link NullPointerException}.
     *
     * @return {@code true} if this node is the recorded coordinator
     */
    public boolean amCoordinator() {
        // Single read of the field so the null check and the dereference see the same value.
        Node coordinatorNode = this._coordinatorNode;
        if (coordinatorNode == null) {
            return false;
        }
        return coordinatorNode.getDestination().equals(this._dispatcher.getLocalDestination());
    }

    /**
     * Cluster callback for a failed node.
     * <p>
     * If the node's destination was recorded by {@link #onNodeRemoved(ClusterEvent)} the
     * failure is treated as an already-handled departure. Otherwise it is logged as an
     * error and, when this node is coordinator, a rebalancing is queued.
     * <p>
     * The opening message is now logged at debug level to match its
     * {@code isDebugEnabled()} guard and the other node callbacks; it was emitted at info.
     *
     * @param event event carrying the node that failed
     */
    public void onNodeFailed(ClusterEvent event) {
        Node node = event.getNode();
        if (this._log.isDebugEnabled()) {
            this._log.debug("node failed: " + DIndex.getNodeName(node));
        }
        if (this._leavers.remove(node.getDestination())) {
            // NOTE(review): _leavers holds Destinations but the Node itself is removed from _left - confirm what _left stores.
            this._left.remove(node);
            if (this._log.isTraceEnabled()) {
                this._log.trace("onNodeFailed:" + DIndex.getNodeName(node) + "- already evacuated - ignoring");
            }
            return;
        }
        if (this._log.isErrorEnabled()) {
            this._log.error("onNodeFailed: " + DIndex.getNodeName(node));
        }
        if (!this.amCoordinator()) {
            return;
        }
        if (this._log.isErrorEnabled()) {
            this._log.error("CATASTROPHIC FAILURE on: " + DIndex.getNodeName(node));
        }
        if (this._coordinator != null) {
            this._coordinator.queueRebalancing();
        } else {
            this._log.warn("coordinator thread not running");
        }
    }

    /**
     * Cluster callback: records the newly elected coordinator while holding the
     * coordinator lock. If the coordinator changed, the local node is dismissed when it
     * was the previous coordinator and elected when it is the new one. The coordinator
     * latch is released at the end of every call.
     *
     * @param event event carrying the elected node
     */
    public void onCoordinatorChanged(ClusterEvent event) {
        synchronized (_coordinatorLock) {
            if (_log.isDebugEnabled()) {
                _log.debug("coordinator elected: " + DIndex.getNodeName(event.getNode()));
            }
            Node elected = event.getNode();
            boolean changed = elected != _coordinatorNode;
            if (changed) {
                // Step down first if we were the outgoing coordinator...
                if (_coordinatorNode == _cluster.getLocalNode()) {
                    onDismissal(event);
                }
                _coordinatorNode = elected;
                // ...then take over if we are the incoming one.
                if (_coordinatorNode == _cluster.getLocalNode()) {
                    onElection(event);
                }
            }
            _coordinatorLatch.release();
        }
    }

    /**
     * Builds an array with one slot per partition and places a fresh, empty
     * {@link ArrayList} in each slot named by {@code keys}; all other slots stay {@code null}.
     *
     * @param numPartitions length of the returned array
     * @param keys          indices of the slots to populate
     * @return the partially populated array
     */
    public Collection[] createResultSet(int numPartitions, int[] keys) {
        Collection[] results = new Collection[numPartitions];
        int position = 0;
        while (position < keys.length) {
            int partition = keys[position];
            results[partition] = new ArrayList();
            position++;
        }
        return results;
    }

    /**
     * Called when the local node becomes coordinator: creates and starts a
     * {@link Coordinator} and queues an initial rebalancing.
     * <p>
     * A failure is logged together with its cause; the exception was previously
     * dropped from the log.
     *
     * @param event the coordinator-changed event (not inspected here)
     */
    public void onElection(ClusterEvent event) {
        this._log.info("accepting coordinatorship");
        try {
            this._coordinator = new Coordinator(this);
            this._coordinator.start();
            this._coordinator.queueRebalancing();
        }
        catch (Exception e) {
            this._log.error("problem starting Coordinator", e);
        }
    }

    /**
     * Called when the local node stops being coordinator: stops the
     * {@link Coordinator} and forgets it.
     * <p>
     * Changes from the earlier behaviour: the failure message now names the operation
     * that failed (it read "problem starting Balancer"), the cause is logged, the
     * reference is cleared even when {@code stop()} throws, and a missing coordinator is
     * reported with a warning rather than through a caught NullPointerException.
     *
     * @param event the coordinator-changed event (not inspected here)
     */
    public void onDismissal(ClusterEvent event) {
        this._log.info("resigning coordinatorship");
        Coordinator outgoing = this._coordinator;
        if (outgoing == null) {
            this._log.warn("coordinator thread not running");
            return;
        }
        try {
            outgoing.stop();
        }
        catch (Exception e) {
            this._log.error("problem stopping Coordinator", e);
        }
        finally {
            // This node is no longer coordinator whether or not the stop succeeded.
            this._coordinator = null;
        }
    }

    /**
     * Reads the node name stored in a node's state.
     *
     * @param node the node to describe; may be {@code null}
     * @return the value stored under the node-name key, or {@code "<unknown>"} for a null node
     */
    public static String getNodeName(Node node) {
        if (node == null) {
            return "<unknown>";
        }
        return (String)node.getState().get(_nodeNameKey);
    }

    /**
     * Compares, under the coordinator lock, the cluster's local node with the recorded
     * coordinator node by reference.
     *
     * @return {@code true} if they are the same object
     */
    public boolean isCoordinator() {
        synchronized (_coordinatorLock) {
            Node local = _cluster.getLocalNode();
            return local == _coordinatorNode;
        }
    }

    /**
     * Reads the recorded coordinator node under the coordinator lock.
     *
     * @return the recorded coordinator node, or {@code null} if none has been recorded
     */
    public Node getCoordinator() {
        synchronized (_coordinatorLock) {
            return _coordinatorNode;
        }
    }

    /**
     * @return the partition count reported by the partition manager
     */
    public int getNumPartitions() {
        int count = _partitionManager.getNumPartitions();
        return count;
    }

    /**
     * @return the cluster's local node
     */
    public Node getLocalNode() {
        return _cluster.getLocalNode();
    }

    /**
     * @return the values of the cluster's node map
     */
    public Collection getRemoteNodes() {
        return _cluster.getNodes().values();
    }

    /**
     * @return the dispatcher's rendezvous map
     */
    public Map getRendezVousMap() {
        return _dispatcher.getRendezVousMap();
    }

    /**
     * @return the live (not copied) synchronized collection of leaver destinations
     */
    public Collection getLeavers() {
        return _leavers;
    }

    /**
     * @return the live (not copied) synchronized "left" collection
     */
    public Collection getLeft() {
        return _left;
    }

    /**
     * Logs a node's name and partition keys at info level.
     * <p>
     * Unless the argument is the local node object, the node is looked up again in the
     * cluster's node map by destination. A node missing from the map is logged as unknown.
     *
     * @param node the node to report on
     * @return the node's number of partition keys, or 0 if the lookup found no node
     */
    protected int printNode(Node node) {
        Node current = node;
        if (current != _cluster.getLocalNode()) {
            current = (Node)_cluster.getNodes().get(node.getDestination());
        }

        if (current != null) {
            PartitionKeys keys = DIndex.getPartitionKeys(current);
            int amount = keys.size();
            if (_log.isInfoEnabled()) {
                _log.info(DIndex.getNodeName(current) + " : " + amount + " - " + keys);
            }
            return amount;
        }

        if (_log.isInfoEnabled()) {
            // current is null here, so the name renders as the "<unknown>" placeholder.
            _log.info(DIndex.getNodeName(current) + " : <unknown> - {?...}");
        }
        return 0;
    }

    /**
     * Sends an insert request for {@code name} to the partition that owns it and
     * reports the success flag of the reply.
     *
     * @param name    key to insert
     * @param timeout timeout passed to the exchange
     * @return the reply's success flag, or {@code false} if anything throws
     */
    public boolean insert(String name, long timeout) {
        try {
            InsertIMToPM request = new InsertIMToPM(name);
            ObjectMessage reply = this.getPartition(name).exchange(request, timeout);
            InsertPMToIM response = (InsertPMToIM)reply.getObject();
            return response.getSuccess();
        }
        catch (Exception e) {
            _log.warn("problem inserting session key into DHT", e);
            return false;
        }
    }

    /**
     * Sends a delete request for {@code name} to the partition that owns it, using the
     * inactive time as the exchange timeout. Failures are logged at info level and swallowed.
     *
     * @param name key to delete
     */
    public void remove(String name) {
        try {
            DeleteIMToPM deletion = new DeleteIMToPM(name);
            PartitionFacade owner = this.getPartition(name);
            owner.exchange(deletion, _inactiveTime);
        }
        catch (Exception e) {
            _log.info("oops...", e);
        }
    }

    /**
     * Sends a relocation request for {@code name} to the partition that owns it, using
     * the inactive time as the exchange timeout. Failures are logged at info level and swallowed.
     *
     * @param name key whose relocation is requested
     */
    public void relocate(String name) {
        try {
            DIndexRelocationRequest relocation = new DIndexRelocationRequest(name);
            PartitionFacade owner = this.getPartition(name);
            owner.exchange(relocation, _inactiveTime);
        }
        catch (Exception e) {
            _log.info("oops...", e);
        }
    }

    /**
     * Builds a {@link RelocationRequest} and forwards it to the partition owning
     * {@code sessionName}, returning whatever the exchange returns.
     * <p>
     * An {@code ObjectMessage} used to be created and populated here but was never sent
     * or returned; that dead code has been removed.
     *
     * @param sessionName              session to relocate; also selects the partition
     * @param nodeName                 passed through to the request
     * @param concurrentRequestThreads passed through to the request
     * @param shuttingDown             passed through to the request
     * @param timeout                  timeout passed to the exchange
     * @return the reply from {@link #forwardAndExchange(String, RelocationRequest, long)}
     * @throws Exception propagated from the exchange
     */
    public ObjectMessage relocate(String sessionName, String nodeName, int concurrentRequestThreads, boolean shuttingDown, long timeout) throws Exception {
        RelocationRequest request = new RelocationRequest(sessionName, nodeName, concurrentRequestThreads, shuttingDown);
        return this.forwardAndExchange(sessionName, request, timeout);
    }

    /**
     * Asks the partition owning {@code sessionName} to move the session here and, when
     * the reply carries the session bytes, immotes them through the configured immoter.
     *
     * @param sessionName              session to fetch; also selects the partition
     * @param nodeName                 passed through to the request
     * @param concurrentRequestThreads passed through to the request
     * @param shuttingDown             passed through to the request
     * @param timeout                  timeout passed to the exchange
     * @return the result of {@code Utils.mote}, or {@code null} when there is no reply,
     *         the reply carries no bytes, the reply is of another type, or its body
     *         cannot be extracted
     * @throws Exception propagated from the exchange or from handling the session bytes
     */
    public Motable relocate2(String sessionName, String nodeName, int concurrentRequestThreads, boolean shuttingDown, long timeout) throws Exception {
        MoveIMToPM request = new MoveIMToPM(sessionName, nodeName, concurrentRequestThreads, shuttingDown);
        ObjectMessage message = this.getPartition(sessionName).exchange(request, timeout);
        if (message == null) {
            _log.error("something went wrong - what should we do?");
            return null;
        }
        try {
            Serializable payload = message.getObject();

            // Anything other than a state-carrying reply ends here without a session.
            if (!(payload instanceof MoveSMToIM)) {
                if (payload instanceof MovePMToIM) {
                    if (_log.isTraceEnabled()) {
                        _log.trace("unknown session: " + sessionName);
                    }
                } else {
                    _log.warn("unexpected response returned - what should I do? : " + payload);
                }
                return null;
            }

            MoveSMToIM stateReply = (MoveSMToIM)payload;
            byte[] bytes = (byte[])stateReply.getValue();
            if (bytes == null) {
                _log.warn("failed relocation - 0 bytes arrived: " + sessionName);
                return null;
            }

            SimpleMotable emotable = new SimpleMotable();
            emotable.setBodyAsByteArray(bytes);
            emotable.setLastAccessedTime(System.currentTimeMillis());
            boolean withinTimeframe = emotable.checkTimeframe(System.currentTimeMillis());
            if (!withinTimeframe && _log.isWarnEnabled()) {
                _log.warn("immigrating session has come from the future!: " + emotable.getName());
            }

            // The emoter replies to the sender of 'message' when the motion commits.
            SMToIMEmoter emoter = new SMToIMEmoter(_config.getNodeName(message.getJMSReplyTo()), message);
            Immoter immoter = _config.getImmoter(sessionName, emotable);
            return Utils.mote(emoter, immoter, emotable, sessionName);
        }
        catch (JMSException e) {
            _log.warn("could not extract message body", e);
            return null;
        }
    }

    /**
     * Wraps {@code request} in a {@link DIndexForwardRequest} and exchanges it with the
     * partition that owns {@code name}.
     *
     * @param name    key selecting the partition
     * @param request request to wrap
     * @param timeout timeout passed to the exchange
     * @return the reply returned by the exchange
     * @throws Exception propagated from the exchange
     */
    public ObjectMessage forwardAndExchange(String name, RelocationRequest request, long timeout) throws Exception {
        _log.trace("wrapping request");
        DIndexForwardRequest wrapped = new DIndexForwardRequest(request);
        PartitionFacade owner = this.getPartition(name);
        return owner.exchange(wrapped, timeout);
    }

    /**
     * @param key key passed to the partition manager's lookup
     * @return the partition the partition manager returns for {@code key}
     */
    public PartitionFacade getPartition(Object key) {
        return _partitionManager.getPartition(key);
    }

    /**
     * Resolves a destination to the name of the node that owns it.
     * <p>
     * The local node is used when the destination is its own; otherwise the node is
     * looked up in the cluster's node map. The intermediate variable is typed as
     * {@link Node}: the lookup arm yields a {@code Node}, which is not assignable to
     * the {@code LocalNode} the variable was declared as.
     *
     * @param destination destination to resolve
     * @return the node's name, or {@code "<unknown>"} if no node is found for the destination
     */
    public String getNodeName(Destination destination) {
        LocalNode local = this._cluster.getLocalNode();
        Node node;
        if (destination.equals(local.getDestination())) {
            node = local;
        } else {
            node = (Node)this._cluster.getNodes().get(destination);
        }
        return DIndex.getNodeName(node);
    }

    /**
     * @return the inactive time passed to the constructor
     */
    public long getInactiveTime() {
        return _inactiveTime;
    }

    /**
     * Delegates to the partition manager.
     *
     * @param living  passed through unchanged
     * @param leaving passed through unchanged
     */
    public void regenerateMissingPartitions(Node[] living, Node[] leaving) {
        _partitionManager.regenerateMissingPartitions(living, leaving);
    }

    /**
     * Reads the partition keys stored in a node's state.
     *
     * @param node node whose state is read; must not be {@code null}
     * @return the value stored under the partition-keys key (may be {@code null} if absent)
     */
    public static PartitionKeys getPartitionKeys(Node node) {
        Object stored = node.getState().get(_partitionKeysKey);
        return (PartitionKeys)stored;
    }

    /**
     * @param key integer key passed to the partition manager's lookup
     * @return the partition the partition manager returns for {@code key}
     */
    public PartitionFacade getPartition(int key) {
        return _partitionManager.getPartition(key);
    }

    /**
     * @return the state manager created by the constructor
     */
    public StateManager getStateManager() {
        return _stateManager;
    }

    /**
     * @return the node name passed to the constructor
     */
    public String getLocalNodeName() {
        return _nodeName;
    }

    /**
     * Delegates to the configuration supplied to {@link #init(PartitionManagerConfig)};
     * all arguments are passed through unchanged.
     *
     * @return whatever the configuration's {@code contextualise} returns
     * @throws InvocationException propagated from the configuration
     */
    public boolean contextualise(InvocationContext invocationContext, String id, Immoter immoter, Sync motionLock, boolean exclusiveOnly) throws InvocationException {
        boolean handled = _config.contextualise(invocationContext, id, immoter, motionLock, exclusiveOnly);
        return handled;
    }

    /**
     * Delegates to the configuration supplied to {@link #init(PartitionManagerConfig)}.
     *
     * @param name passed through unchanged
     * @return the lock returned by the configuration
     */
    public Sync getInvocationLock(String name) {
        return _config.getInvocationLock(name);
    }

    /**
     * Emoter used by {@code relocate2} for a session that has arrived in a reply
     * message: preparing copies the arriving motable into the target, committing replies
     * to the sender and destroys the arriving motable.
     */
    class SMToIMEmoter
    extends AbstractChainedEmoter {
        protected final Log _log = LogFactory.getLog(this.getClass());
        protected final String _nodeName;
        // Message that commit() replies to.
        protected final ObjectMessage _message;
        // Neither lock is assigned or read anywhere in this class.
        protected Sync _invocationLock;
        protected Sync _stateLock;

        /**
         * @param nodeName name reported by {@link #getInfo()}
         * @param message  message that {@link #commit(String, Motable)} replies to
         */
        public SMToIMEmoter(String nodeName, ObjectMessage message) {
            this._nodeName = nodeName;
            this._message = message;
        }

        /**
         * Copies {@code emotable} into {@code immotable}.
         *
         * @return {@code true} on success, {@code false} if the copy throws (logged at warn)
         */
        public boolean prepare(String name, Motable emotable, Motable immotable) {
            try {
                immotable.copy(emotable);
            }
            catch (Exception e) {
                this._log.warn("oops", e);
                return false;
            }
            return true;
        }

        /**
         * Replies to the original message with a positive {@link MoveIMToSM} and destroys
         * the emotable.
         *
         * @throws UnsupportedOperationException if the reply or the destroy fails; the
         *         underlying exception is attached as the cause (it was previously discarded)
         */
        public void commit(String name, Motable emotable) {
            try {
                MoveIMToSM response = new MoveIMToSM(true);
                DIndex.this._dispatcher.reply(this._message, response);
                emotable.destroy();
            }
            catch (Exception e) {
                // initCause keeps the exception type and message callers already see.
                UnsupportedOperationException failure = new UnsupportedOperationException("NYI");
                failure.initCause(e);
                throw failure;
            }
        }

        /**
         * Not implemented.
         *
         * @throws RuntimeException always
         */
        public void rollback(String name, Motable motable) {
            throw new RuntimeException("NYI");
        }

        /**
         * @return {@code "immigration:"} followed by the node name given at construction
         */
        public String getInfo() {
            return "immigration:" + this._nodeName;
        }
    }
}

