/*
 * Decompiled with CFR 0.152.
 */
package org.codehaus.wadi.dindex.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import org.activecluster.Cluster;
import org.activecluster.ClusterEvent;
import org.activecluster.LocalNode;
import org.activecluster.Node;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.wadi.dindex.Partition;
import org.codehaus.wadi.dindex.PartitionConfig;
import org.codehaus.wadi.dindex.PartitionManager;
import org.codehaus.wadi.dindex.PartitionManagerConfig;
import org.codehaus.wadi.dindex.impl.DIndex;
import org.codehaus.wadi.dindex.impl.DummyPartition;
import org.codehaus.wadi.dindex.impl.LocalPartition;
import org.codehaus.wadi.dindex.impl.PartitionFacade;
import org.codehaus.wadi.dindex.impl.PartitionKeys;
import org.codehaus.wadi.dindex.impl.PartitionTransfer;
import org.codehaus.wadi.dindex.messages.PartitionEvacuationRequest;
import org.codehaus.wadi.dindex.messages.PartitionEvacuationResponse;
import org.codehaus.wadi.dindex.messages.PartitionRepopulateRequest;
import org.codehaus.wadi.dindex.messages.PartitionRepopulateResponse;
import org.codehaus.wadi.dindex.messages.PartitionTransferAcknowledgement;
import org.codehaus.wadi.dindex.messages.PartitionTransferCommand;
import org.codehaus.wadi.dindex.messages.PartitionTransferRequest;
import org.codehaus.wadi.dindex.messages.PartitionTransferResponse;
import org.codehaus.wadi.gridstate.Dispatcher;
import org.codehaus.wadi.gridstate.LockManager;
import org.codehaus.wadi.gridstate.PartitionMapper;
import org.codehaus.wadi.gridstate.activecluster.ActiveClusterDispatcher;
import org.codehaus.wadi.gridstate.impl.StupidLockManager;
import org.codehaus.wadi.impl.Quipu;

public class SimplePartitionManager
implements PartitionManager,
PartitionConfig {
    protected static final String _partitionKeysKey = "partitionKeys";
    protected static final String _timeStampKey = "timeStamp";
    protected static final String _correlationIDMapKey = "correlationIDMap";
    protected final String _nodeName;
    protected final Log _log;
    protected final int _numPartitions;
    protected final PartitionFacade[] _partitions;
    protected final Cluster _cluster;
    protected final Dispatcher _dispatcher;
    protected final Map _distributedState;
    protected final long _inactiveTime;
    protected final boolean _allowRegenerationOfMissingPartitions = true;
    protected final Callback _callback;
    protected final PartitionMapper _mapper;
    protected final LockManager _pmSyncs;
    protected PartitionManagerConfig _config;
    static final /* synthetic */ boolean $assertionsDisabled;

    public SimplePartitionManager(Dispatcher dispatcher, int numPartitions, Map distributedState, Callback callback, PartitionMapper mapper) {
        this._dispatcher = dispatcher;
        this._nodeName = this._dispatcher.getNodeName();
        this._pmSyncs = new StupidLockManager(this._nodeName);
        this._log = LogFactory.getLog((String)(this.getClass().getName() + "#" + this._nodeName));
        this._numPartitions = numPartitions;
        this._partitions = new PartitionFacade[this._numPartitions];
        long timeStamp = System.currentTimeMillis();
        boolean queueing = true;
        for (int i = 0; i < this._numPartitions; ++i) {
            this._partitions[i] = new PartitionFacade(i, timeStamp, new DummyPartition(i), queueing, this);
        }
        this._cluster = ((ActiveClusterDispatcher)this._dispatcher).getCluster();
        this._distributedState = distributedState;
        this._inactiveTime = this._dispatcher.getInactiveTime();
        this._callback = callback;
        this._mapper = mapper;
    }

    public void init(PartitionManagerConfig config) {
        this._config = config;
        this._log.trace((Object)"init");
        this._dispatcher.register(this, "onPartitionTransferCommand", PartitionTransferCommand.class);
        this._dispatcher.register(PartitionTransferAcknowledgement.class, this._inactiveTime);
        this._dispatcher.register(this, "onPartitionTransferRequest", PartitionTransferRequest.class);
        this._dispatcher.register(PartitionTransferResponse.class, this._inactiveTime);
        this._dispatcher.register(this, "onPartitionEvacuationRequest", PartitionEvacuationRequest.class);
        this._dispatcher.register(PartitionEvacuationResponse.class, this._inactiveTime);
        this._dispatcher.register(this, "onPartitionRepopulateRequest", PartitionRepopulateRequest.class);
        this._dispatcher.register(PartitionRepopulateResponse.class, this._inactiveTime);
    }

    public void start() throws Exception {
        this._log.trace((Object)"starting...");
        this._log.trace((Object)"...started");
    }

    public void evacuate() throws Exception {
        this._log.info((Object)"evacuating...");
        PartitionEvacuationRequest request = new PartitionEvacuationRequest();
        LocalNode localNode = this._cluster.getLocalNode();
        String correlationId = this._cluster.getLocalNode().getName();
        if (this._log.isTraceEnabled()) {
            this._log.trace((Object)("evacuating partitions...: " + this._dispatcher.getNodeName(localNode.getDestination()) + " -> " + this._config.getCoordinatorNode().getState().get("nodeName")));
        }
        while (this._dispatcher.exchangeSend(localNode.getDestination(), this._config.getCoordinatorNode().getDestination(), correlationId, request, this._inactiveTime) == null) {
            if (this._log.isWarnEnabled()) {
                this._log.warn((Object)("could not contact Coordinator - backing off for " + this._inactiveTime + " millis..."));
            }
            Thread.sleep(this._config.getInactiveTime());
        }
        this._log.info((Object)"...evacuated");
    }

    public void stop() throws Exception {
        this._log.info((Object)"stopping...");
        this._dispatcher.deregister("onPartitionTransferCommand", PartitionTransferCommand.class, 5000);
        this._dispatcher.deregister("onPartitionTransferRequest", PartitionTransferRequest.class, 5000);
        this._dispatcher.deregister("onPartitionEvacuationRequest", PartitionEvacuationRequest.class, 5000);
        this._dispatcher.deregister("onPartitionRepopulateRequest", PartitionRepopulateRequest.class, 5000);
        this._log.info((Object)"...stopped");
    }

    public PartitionFacade getPartition(int partition) {
        return this._partitions[partition];
    }

    public void onPartitionEvacuationRequest(ObjectMessage om, PartitionEvacuationRequest request) {
        Object from;
        try {
            Destination destination = om.getJMSReplyTo();
            LocalNode local = this._cluster.getLocalNode();
            from = destination.equals(local.getDestination()) ? local : (Node)this._cluster.getNodes().get(destination);
        }
        catch (JMSException e) {
            this._log.warn((Object)"could not read src node from message", (Throwable)e);
            from = null;
        }
        if (!$assertionsDisabled && from == null) {
            throw new AssertionError();
        }
        this._callback.onNodeRemoved(new ClusterEvent(this._cluster, (Node)from, 3));
    }

    public void onPartitionRepopulateRequest(ObjectMessage om, PartitionRepopulateRequest request) {
        int[] keys = request.getKeys();
        if (this._log.isTraceEnabled()) {
            this._log.trace((Object)("PartitionRepopulateRequest ARRIVED: " + keys));
        }
        Collection[] c = this.createResultSet(this._numPartitions, keys);
        try {
            this._log.trace((Object)"findRelevantSessionNames - starting");
            this._config.findRelevantSessionNames(this._numPartitions, c);
            this._log.trace((Object)"findRelevantSessionNames - finished");
        }
        catch (Throwable t) {
            this._log.warn((Object)"ERROR", t);
        }
        if (!this._dispatcher.reply(om, new PartitionRepopulateResponse(c))) {
            this._log.warn((Object)"unexpected problem responding to partition repopulation request");
        }
    }

    public void onPartitionTransferCommand(ObjectMessage om, PartitionTransferCommand command) {
        PartitionTransfer[] transfers = command.getTransfers();
        for (int i = 0; i < transfers.length; ++i) {
            PartitionTransfer transfer = transfers[i];
            int amount = transfer.getAmount();
            Destination destination = transfer.getDestination();
            LocalPartition[] acquired = null;
            try {
                ArrayList<Partition> c = new ArrayList<Partition>();
                for (int j = 0; j < this._numPartitions && c.size() < amount; ++j) {
                    PartitionFacade facade = this._partitions[j];
                    if (!facade.isLocal()) continue;
                    Partition partition = facade.getContent();
                    c.add(partition);
                }
                acquired = c.toArray(new LocalPartition[c.size()]);
                if (!$assertionsDisabled && amount != acquired.length) {
                    throw new AssertionError();
                }
                long timeStamp = System.currentTimeMillis();
                if (this._log.isTraceEnabled()) {
                    this._log.trace((Object)("local state (before giving): " + this.getPartitionKeys()));
                }
                PartitionTransferRequest request = new PartitionTransferRequest(timeStamp, acquired);
                ObjectMessage om3 = this._dispatcher.exchangeSend(this._dispatcher.getLocalDestination(), destination, request, this._inactiveTime);
                if (om3 != null && ((PartitionTransferResponse)om3.getObject()).getSuccess()) {
                    for (int j = 0; j < acquired.length; ++j) {
                        PartitionFacade facade = null;
                        facade = this._partitions[acquired[j].getKey()];
                        facade.setContentRemote(timeStamp, this._dispatcher, destination);
                    }
                    if (!this._log.isDebugEnabled()) continue;
                    this._log.debug((Object)("released " + acquired.length + " partition[s] to " + this._dispatcher.getNodeName(destination)));
                    continue;
                }
                this._log.warn((Object)"transfer unsuccessful");
                continue;
            }
            catch (Throwable t) {
                this._log.warn((Object)"unexpected problem", t);
            }
        }
        try {
            PartitionKeys keys = this.getPartitionKeys();
            this._distributedState.put(_partitionKeysKey, keys);
            this._distributedState.put(_timeStampKey, new Long(System.currentTimeMillis()));
            if (this._log.isTraceEnabled()) {
                this._log.trace((Object)("local state (after giving): " + keys));
            }
            String correlationID = this._dispatcher.getOutgoingCorrelationId(om);
            if (this._log.isTraceEnabled()) {
                this._log.trace((Object)("CORRELATIONID: " + correlationID));
            }
            Map correlationIDMap = (Map)this._distributedState.get(_correlationIDMapKey);
            Destination from = om.getJMSReplyTo();
            correlationIDMap.put(from, correlationID);
            this._dispatcher.setDistributedState(this._distributedState);
            if (this._log.isTraceEnabled()) {
                this._log.trace((Object)("distributed state updated: " + this._dispatcher.getDistributedState()));
            }
            this.correlateStateUpdate(this._distributedState);
            correlationIDMap.remove(from);
        }
        catch (Exception e) {
            this._log.warn((Object)"could not acknowledge safe transfer to Coordinator", (Throwable)e);
        }
    }

    public synchronized void onPartitionTransferRequest(ObjectMessage om, PartitionTransferRequest request) {
        long timeStamp = request.getTimeStamp();
        LocalPartition[] partitions = request.getPartitions();
        boolean success = false;
        if (this._log.isTraceEnabled()) {
            this._log.trace((Object)("local state (before receiving): " + this.getPartitionKeys()));
        }
        for (int i = 0; i < partitions.length; ++i) {
            LocalPartition partition = partitions[i];
            partition.init(this);
            PartitionFacade facade = this.getPartition(partition.getKey());
            facade.setContent(timeStamp, partition);
        }
        success = true;
        try {
            PartitionKeys keys = this.getPartitionKeys();
            this._distributedState.put(_partitionKeysKey, keys);
            this._distributedState.put(_timeStampKey, new Long(System.currentTimeMillis()));
            if (this._log.isTraceEnabled()) {
                this._log.trace((Object)("local state (after receiving): " + keys));
            }
            this._dispatcher.setDistributedState(this._distributedState);
            if (this._log.isTraceEnabled()) {
                this._log.trace((Object)("distributed state updated: " + this._dispatcher.getDistributedState()));
            }
        }
        catch (Exception e) {
            this._log.error((Object)"could not update distributed state", (Throwable)e);
        }
        if (this._dispatcher.reply(om, new PartitionTransferResponse(success))) {
            try {
                if (this._log.isDebugEnabled()) {
                    this._log.debug((Object)("acquired " + partitions.length + " partition[s] from " + this._dispatcher.getNodeName(om.getJMSReplyTo())));
                }
            }
            catch (JMSException e) {
                this._log.warn((Object)"problem reading incoming message's source", (Throwable)e);
            }
        } else {
            this._log.warn((Object)"problem acknowledging reciept of IndexPartitions - donor may have died");
        }
    }

    public Collection[] createResultSet(int numPartitions, int[] keys) {
        Collection[] c = new Collection[numPartitions];
        for (int i = 0; i < keys.length; ++i) {
            c[keys[i]] = new ArrayList();
        }
        return c;
    }

    public void update(Node node) {
        Map state = node.getState();
        long timeStamp = (Long)state.get(_timeStampKey);
        PartitionKeys keys = (PartitionKeys)state.get(_partitionKeysKey);
        Destination location = node.getDestination();
        int[] k = keys._keys;
        for (int i = 0; i < k.length; ++i) {
            int key = k[i];
            PartitionFacade facade = this._partitions[key];
            facade.setContentRemote(timeStamp, this._dispatcher, location);
        }
    }

    public void markExistingPartitions(Node[] nodes, boolean[] partitionIsPresent) {
        for (int i = 0; i < nodes.length; ++i) {
            PartitionKeys keys;
            Node node = nodes[i];
            if (node == null || (keys = DIndex.getPartitionKeys(node)) == null) continue;
            int[] k = keys.getKeys();
            for (int j = 0; j < k.length; ++j) {
                int index = k[j];
                if (partitionIsPresent[index]) {
                    if (!this._log.isErrorEnabled()) continue;
                    this._log.error((Object)("partition " + index + " found on more than one node"));
                    continue;
                }
                partitionIsPresent[index] = true;
            }
        }
    }

    public void regenerateMissingPartitions(Node[] living, Node[] leaving) {
        boolean[] partitionIsPresent = new boolean[this._numPartitions];
        this.markExistingPartitions(living, partitionIsPresent);
        this.markExistingPartitions(leaving, partitionIsPresent);
        ArrayList<Integer> missingPartitions = new ArrayList<Integer>();
        for (int i = 0; i < partitionIsPresent.length; ++i) {
            if (partitionIsPresent[i]) continue;
            missingPartitions.add(new Integer(i));
        }
        int numKeys = missingPartitions.size();
        if (numKeys > 0) {
            int[] missingKeys = new int[numKeys];
            int key = 0;
            Iterator i = missingPartitions.iterator();
            while (i.hasNext()) {
                missingKeys[key++] = (Integer)i.next();
            }
            if (this._log.isWarnEnabled()) {
                this._log.warn((Object)("RECREATING PARTITIONS...: " + missingPartitions));
            }
            long time = System.currentTimeMillis();
            for (int i2 = 0; i2 < missingKeys.length; ++i2) {
                int k = missingKeys[i2];
                PartitionFacade facade = this._partitions[k];
                LocalPartition local = new LocalPartition(k);
                local.init(this);
                facade.setContent(time, local);
            }
            PartitionKeys newKeys = this.getPartitionKeys();
            if (this._log.isWarnEnabled()) {
                this._log.warn((Object)("REPOPULATING PARTITIONS...: " + missingPartitions));
            }
            String correlationId = this._dispatcher.nextCorrelationId();
            Quipu rv = this._dispatcher.setRendezVous(correlationId, this._dispatcher.getNumNodes() - 1);
            if (!this._dispatcher.send(this._dispatcher.getLocalDestination(), this._dispatcher.getClusterDestination(), correlationId, new PartitionRepopulateRequest(missingKeys))) {
                this._log.error((Object)"unexpected problem repopulating lost index");
            }
            Collection[] c = this.createResultSet(this._numPartitions, missingKeys);
            this._config.findRelevantSessionNames(this._numPartitions, c);
            this.repopulate(this._dispatcher.getLocalDestination(), c);
            try {
                rv.waitFor(this._inactiveTime);
            }
            catch (InterruptedException e) {
                this._log.warn((Object)"unexpected interruption", (Throwable)e);
            }
            Collection results = rv.getResults();
            Iterator i3 = results.iterator();
            while (i3.hasNext()) {
                ObjectMessage message = (ObjectMessage)i3.next();
                try {
                    Destination from = message.getJMSReplyTo();
                    PartitionRepopulateResponse response = (PartitionRepopulateResponse)message.getObject();
                    Collection[] relevantKeys = response.getKeys();
                    this.repopulate(from, relevantKeys);
                }
                catch (JMSException e) {
                    this._log.warn((Object)"unexpected problem interrogating response", (Throwable)e);
                }
            }
            if (this._log.isWarnEnabled()) {
                this._log.warn((Object)("...PARTITIONS REPOPULATED: " + missingPartitions));
            }
            this._distributedState.put(_partitionKeysKey, newKeys);
            try {
                this._dispatcher.setDistributedState(this._distributedState);
                if (this._log.isTraceEnabled()) {
                    this._log.trace((Object)("distributed state updated: " + this._dispatcher.getDistributedState()));
                }
            }
            catch (Exception e) {
                this._log.error((Object)"could not update distributed state", (Throwable)e);
            }
        }
    }

    public PartitionKeys getPartitionKeys() {
        return new PartitionKeys(this._partitions);
    }

    public void repopulate(Destination location, Collection[] keys) {
        if (!$assertionsDisabled && location == null) {
            throw new AssertionError();
        }
        for (int i = 0; i < this._numPartitions; ++i) {
            Collection c = keys[i];
            if (c == null) continue;
            PartitionFacade facade = this._partitions[i];
            LocalPartition local = (LocalPartition)facade.getContent();
            Iterator j = c.iterator();
            while (j.hasNext()) {
                String name = (String)j.next();
                local.put(name, location);
            }
        }
    }

    public void localise() {
        if (this._log.isTraceEnabled()) {
            this._log.trace((Object)("allocating " + this._numPartitions + " partitions"));
        }
        long timeStamp = System.currentTimeMillis();
        for (int i = 0; i < this._numPartitions; ++i) {
            PartitionFacade facade = this._partitions[i];
            LocalPartition partition = new LocalPartition(i);
            partition.init(this);
            facade.setContent(timeStamp, partition);
        }
    }

    protected void correlateStateUpdate(Map state) {
        Destination local;
        Map correlationIDMap = (Map)state.get(_correlationIDMapKey);
        String correlationID = (String)correlationIDMap.get(local = this._dispatcher.getLocalDestination());
        if (correlationID != null) {
            Quipu rv = (Quipu)((Object)this._dispatcher.getRendezVousMap().get(correlationID));
            if (rv == null) {
                if (this._log.isWarnEnabled()) {
                    this._log.warn((Object)("no one waiting for: " + correlationID));
                }
            } else {
                if (this._log.isTraceEnabled()) {
                    this._log.trace((Object)("successful correlation: " + correlationID));
                }
                rv.putResult(state);
            }
        }
    }

    public int getNumPartitions() {
        return this._numPartitions;
    }

    public PartitionFacade getPartition(Object key) {
        return this._partitions[this._mapper.map(key)];
    }

    public Dispatcher getDispatcher() {
        return this._dispatcher;
    }

    public Cluster getCluster() {
        return this._cluster;
    }

    public String getNodeName(Destination destination) {
        return this._dispatcher.getNodeName(destination);
    }

    public long getInactiveTime() {
        return this._inactiveTime;
    }

    public String getLocalNodeName() {
        return this._nodeName;
    }

    public LockManager getPMSyncs() {
        return this._pmSyncs;
    }

    static {
        $assertionsDisabled = !SimplePartitionManager.class.desiredAssertionStatus();
    }

    public static interface Callback {
        public void onNodeRemoved(ClusterEvent var1);
    }
}

