/*
 * Decompiled with CFR 0.152.
 */
package net.spy.memcached;

import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import net.spy.memcached.AddrUtil;
import net.spy.memcached.ArcusReplKetamaNodeLocator;
import net.spy.memcached.ArcusReplNodeAddress;
import net.spy.memcached.BroadcastOpFactory;
import net.spy.memcached.ConnectionFactory;
import net.spy.memcached.ConnectionObserver;
import net.spy.memcached.FailureMode;
import net.spy.memcached.MemcachedNode;
import net.spy.memcached.MemcachedReplicaGroup;
import net.spy.memcached.NodeLocator;
import net.spy.memcached.OperationFactory;
import net.spy.memcached.ReadPriority;
import net.spy.memcached.ReplicaPick;
import net.spy.memcached.compat.SpyObject;
import net.spy.memcached.compat.log.LoggerFactory;
import net.spy.memcached.internal.ReconnDelay;
import net.spy.memcached.ops.KeyedOperation;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.OperationException;
import net.spy.memcached.ops.OperationState;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.VersionOperation;

public final class MemcachedConnection
extends SpyObject {
    private static final int DOUBLE_CHECK_EMPTY = 256;
    private static final int EXCESSIVE_EMPTY = 0x1000000;
    private String connName;
    private volatile boolean shutDown = false;
    private final boolean shouldOptimize;
    private Selector selector = null;
    private final NodeLocator locator;
    private final FailureMode failureMode;
    private final long maxDelay;
    private int emptySelects = 0;
    private final ConcurrentLinkedQueue<MemcachedNode> addedQueue;
    private final SortedMap<Long, MemcachedNode> reconnectQueue;
    private final Collection<ConnectionObserver> connObservers = new ConcurrentLinkedQueue<ConnectionObserver>();
    private final OperationFactory opFact;
    private final int timeoutExceptionThreshold;
    private final int timeoutRatioThreshold;
    private BlockingQueue<String> _nodeManageQueue = new LinkedBlockingQueue<String>();
    private final ConnectionFactory f;
    private Set<MemcachedNode> nodesNeedVersionOp = new HashSet<MemcachedNode>();
    private boolean arcusReplEnabled;

    public MemcachedConnection(String name, int bufSize, ConnectionFactory f, List<InetSocketAddress> a, Collection<ConnectionObserver> obs, FailureMode fm, OperationFactory opfactory) throws IOException {
        this.f = f;
        this.connName = name;
        this.connObservers.addAll(obs);
        this.reconnectQueue = new TreeMap<Long, MemcachedNode>();
        this.addedQueue = new ConcurrentLinkedQueue();
        this.failureMode = fm;
        this.shouldOptimize = f.shouldOptimize();
        this.maxDelay = f.getMaxReconnectDelay();
        this.opFact = opfactory;
        this.timeoutExceptionThreshold = f.getTimeoutExceptionThreshold();
        this.timeoutRatioThreshold = f.getTimeoutRatioThreshold();
        this.selector = Selector.open();
        ArrayList<MemcachedNode> connections = new ArrayList<MemcachedNode>(a.size());
        for (SocketAddress socketAddress : a) {
            connections.add(this.attachMemcachedNode(this.connName, socketAddress));
        }
        this.locator = f.createLocator(connections);
    }

    void setArcusReplEnabled(boolean b) {
        this.arcusReplEnabled = b;
    }

    boolean getArcusReplEnabled() {
        return this.arcusReplEnabled;
    }

    private boolean selectorsMakeSense() {
        for (MemcachedNode qa : this.locator.getAll()) {
            int sops;
            if (qa.getSk() == null || !qa.getSk().isValid()) continue;
            if (qa.getChannel().isConnected()) {
                sops = qa.getSk().interestOps();
                int expected = 0;
                if (qa.hasReadOp()) {
                    expected |= 1;
                }
                if (qa.hasWriteOp()) {
                    expected |= 4;
                }
                if (qa.getBytesRemainingToWrite() > 0) {
                    expected |= 4;
                }
                assert (sops == expected) : "Invalid ops:  " + qa + ", expected " + expected + ", got " + sops;
                continue;
            }
            sops = qa.getSk().interestOps();
            assert (sops == 8) : "Not connected, and not watching for connect: " + sops;
        }
        this.getLogger().debug("Checked the selectors.");
        return true;
    }

    private void addVersionOpToVersionAbsentNodes() {
        Iterator<MemcachedNode> it = this.nodesNeedVersionOp.iterator();
        while (it.hasNext()) {
            MemcachedNode qa = it.next();
            try {
                this.prepareVersionInfo(qa);
            }
            catch (IllegalStateException e) {
                continue;
            }
            it.remove();
        }
    }

    public void handleIO() throws IOException {
        if (this.shutDown) {
            throw new IOException("No IO while shut down");
        }
        this.addVersionOpToVersionAbsentNodes();
        this.handleInputQueue();
        this.getLogger().debug("Done dealing with queue.");
        long delay = 0L;
        if (!this._nodeManageQueue.isEmpty()) {
            delay = 1L;
        } else if (!this.reconnectQueue.isEmpty()) {
            long now = System.currentTimeMillis();
            long then = this.reconnectQueue.firstKey();
            delay = Math.max(then - now, 1L);
        }
        this.getLogger().debug("Selecting with delay of %sms", delay);
        assert (this.selectorsMakeSense()) : "Selectors don't make sense.";
        int selected = this.selector.select(delay);
        Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
        if (selectedKeys.isEmpty() && !this.shutDown) {
            this.getLogger().debug("No selectors ready, interrupted: " + Thread.interrupted());
            if (++this.emptySelects > 256) {
                for (SelectionKey sk : this.selector.keys()) {
                    this.getLogger().info("%s has %s, interested in %s", sk, sk.readyOps(), sk.interestOps());
                    if (sk.readyOps() != 0) {
                        this.getLogger().info("%s has a ready op, handling IO", sk);
                        this.handleIO(sk);
                        continue;
                    }
                    this.lostConnection((MemcachedNode)sk.attachment(), ReconnDelay.DEFAULT, "too many empty selects");
                }
                assert (this.emptySelects < 0x1000000) : "Too many empty selects";
            }
        } else {
            this.getLogger().debug("Selected %d, selected %d keys", selected, selectedKeys.size());
            this.emptySelects = 0;
            for (SelectionKey sk : selectedKeys) {
                this.handleIO(sk);
            }
            selectedKeys.clear();
        }
        for (SelectionKey sk : this.selector.keys()) {
            MemcachedNode mn = (MemcachedNode)sk.attachment();
            if (mn.getContinuousTimeout() > this.timeoutExceptionThreshold) {
                this.getLogger().warn("%s exceeded continuous timeout threshold. >%s (%s)", mn.getSocketAddress().toString(), this.timeoutExceptionThreshold, mn.getStatus());
                this.lostConnection(mn, ReconnDelay.DEFAULT, "continuous timeout");
                continue;
            }
            if (this.timeoutRatioThreshold <= 0 || mn.getTimeoutRatioNow() <= this.timeoutRatioThreshold) continue;
            this.getLogger().warn("%s exceeded timeout ratio threshold. >%s (%s)", mn.getSocketAddress().toString(), this.timeoutRatioThreshold, mn.getStatus());
            this.lostConnection(mn, ReconnDelay.DEFAULT, "high timeout ratio");
        }
        this.handleNodeManageQueue();
        if (!this.shutDown && !this.reconnectQueue.isEmpty()) {
            this.attemptReconnects();
        }
    }

    private void handleNodesToRemove(List<MemcachedNode> nodesToRemove) {
        for (MemcachedNode node : nodesToRemove) {
            this.getLogger().info("old memcached node removed %s", node);
            for (Map.Entry<Long, MemcachedNode> each : this.reconnectQueue.entrySet()) {
                if (!node.equals(each.getValue())) continue;
                this.reconnectQueue.remove(each.getKey());
                break;
            }
            String cause = "node removed.";
            if (this.failureMode == FailureMode.Cancel) {
                this.cancelOperations(node.destroyReadQueue(false), cause);
                this.cancelOperations(node.destroyWriteQueue(false), cause);
                this.cancelOperations(node.destroyInputQueue(), cause);
                continue;
            }
            if (this.failureMode != FailureMode.Redistribute && this.failureMode != FailureMode.Retry) continue;
            this.redistributeOperations(node.destroyReadQueue(true), cause);
            this.redistributeOperations(node.destroyWriteQueue(true), cause);
            this.redistributeOperations(node.destroyInputQueue(), cause);
        }
    }

    private void updateConnections(List<InetSocketAddress> addrs) throws IOException {
        ArrayList<MemcachedNode> attachNodes = new ArrayList<MemcachedNode>();
        ArrayList<MemcachedNode> removeNodes = new ArrayList<MemcachedNode>();
        for (MemcachedNode memcachedNode : this.locator.getAll()) {
            if (addrs.contains((InetSocketAddress)memcachedNode.getSocketAddress())) {
                addrs.remove((InetSocketAddress)memcachedNode.getSocketAddress());
                continue;
            }
            removeNodes.add(memcachedNode);
        }
        for (SocketAddress socketAddress : addrs) {
            attachNodes.add(this.attachMemcachedNode(this.connName, socketAddress));
        }
        this.locator.update(attachNodes, removeNodes);
        this.handleNodesToRemove(removeNodes);
    }

    private void updateReplConnections(List<InetSocketAddress> addrs) throws IOException {
        ArrayList<MemcachedNode> attachNodes = new ArrayList<MemcachedNode>();
        ArrayList<MemcachedNode> removeNodes = new ArrayList<MemcachedNode>();
        ArrayList<MemcachedReplicaGroup> changeRoleGroups = new ArrayList<MemcachedReplicaGroup>();
        ArrayList<Task> taskList = new ArrayList<Task>();
        Map<String, List<ArcusReplNodeAddress>> newAllGroups = ArcusReplNodeAddress.makeGroupAddrsList(addrs);
        Map<String, MemcachedReplicaGroup> oldAllGroups = ((ArcusReplKetamaNodeLocator)this.locator).getAllGroups();
        for (Map.Entry<String, MemcachedReplicaGroup> entry : oldAllGroups.entrySet()) {
            ArcusReplNodeAddress newSlaveAddr;
            MemcachedReplicaGroup oldGroup = entry.getValue();
            MemcachedNode oldMasterNode = oldGroup.getMasterNode();
            MemcachedNode oldSlaveNode = oldGroup.getSlaveNode();
            List<ArcusReplNodeAddress> newGroupAddrs = newAllGroups.get(entry.getKey());
            this.getLogger().debug("New group nodes : " + newGroupAddrs);
            this.getLogger().debug("Old group nodes : [" + oldGroup + "]");
            if (newGroupAddrs == null) {
                removeNodes.add(oldMasterNode);
                if (oldSlaveNode == null) continue;
                removeNodes.add(oldSlaveNode);
                continue;
            }
            if (newGroupAddrs.size() == 0) {
                newAllGroups.remove(entry.getKey());
                continue;
            }
            ArcusReplNodeAddress oldMasterAddr = (ArcusReplNodeAddress)oldMasterNode.getSocketAddress();
            ArcusReplNodeAddress newMasterAddr = newGroupAddrs.get(0);
            assert (oldMasterAddr != null) : "invalid old rgroup";
            assert (newMasterAddr != null) : "invalid new rgroup";
            ArcusReplNodeAddress oldSlaveAddr = oldSlaveNode != null ? (ArcusReplNodeAddress)oldSlaveNode.getSocketAddress() : null;
            ArcusReplNodeAddress arcusReplNodeAddress = newSlaveAddr = newGroupAddrs.size() > 1 ? newGroupAddrs.get(1) : null;
            if (oldMasterAddr.getIPPort().equals(newMasterAddr.getIPPort())) {
                if (oldSlaveAddr == null) {
                    if (newSlaveAddr != null) {
                        attachNodes.add(this.attachMemcachedNode(this.connName, newSlaveAddr));
                    }
                } else if (newSlaveAddr == null) {
                    if (oldSlaveAddr != null) {
                        removeNodes.add(oldSlaveNode);
                        taskList.add(new MoveOperationTask(oldSlaveNode, oldMasterNode));
                    }
                } else if (!oldSlaveAddr.getIPPort().equals(newSlaveAddr.getIPPort())) {
                    attachNodes.add(this.attachMemcachedNode(this.connName, newSlaveAddr));
                    removeNodes.add(oldSlaveNode);
                    taskList.add(new MoveOperationTask(oldSlaveNode, oldMasterNode));
                }
            } else if (oldSlaveAddr != null && oldSlaveAddr.getIPPort().equals(newMasterAddr.getIPPort())) {
                if (newSlaveAddr != null && newSlaveAddr.getIPPort().equals(oldMasterAddr.getIPPort())) {
                    changeRoleGroups.add(oldGroup);
                    taskList.add(new MoveOperationTask(oldMasterNode, oldSlaveNode));
                    taskList.add(new QueueReconnectTask(oldMasterNode, ReconnDelay.IMMEDIATE, "Discarded all pending reading state operation to move operations."));
                } else {
                    changeRoleGroups.add(oldGroup);
                    removeNodes.add(oldMasterNode);
                    taskList.add(new SetupResendTask(oldMasterNode, false, "Discarded all pending reading state operation to move operations."));
                    taskList.add(new MoveOperationTask(oldMasterNode, oldSlaveNode));
                    if (newSlaveAddr != null) {
                        attachNodes.add(this.attachMemcachedNode(this.connName, newSlaveAddr));
                    }
                }
            } else {
                MemcachedNode newMasterNode = this.attachMemcachedNode(this.connName, newMasterAddr);
                attachNodes.add(newMasterNode);
                if (newSlaveAddr != null) {
                    attachNodes.add(this.attachMemcachedNode(this.connName, newSlaveAddr));
                }
                removeNodes.add(oldMasterNode);
                taskList.add(new SetupResendTask(oldMasterNode, false, "Discarded all pending reading state operation to move operations."));
                taskList.add(new MoveOperationTask(oldMasterNode, newMasterNode));
                if (oldSlaveNode != null) {
                    removeNodes.add(oldSlaveNode);
                    taskList.add(new MoveOperationTask(oldSlaveNode, newMasterNode));
                }
            }
            newAllGroups.remove(entry.getKey());
        }
        for (Map.Entry<String, Object> entry : newAllGroups.entrySet()) {
            List newGroupAddrs = (List)entry.getValue();
            if (newGroupAddrs.size() == 0) continue;
            attachNodes.add(this.attachMemcachedNode(this.connName, (SocketAddress)newGroupAddrs.get(0)));
            if (newGroupAddrs.size() <= 1) continue;
            attachNodes.add(this.attachMemcachedNode(this.connName, (SocketAddress)newGroupAddrs.get(1)));
        }
        ((ArcusReplKetamaNodeLocator)this.locator).update(attachNodes, removeNodes, changeRoleGroups);
        for (Task task : taskList) {
            task.doTask();
        }
        this.handleNodesToRemove(removeNodes);
    }

    private void switchoverMemcachedReplGroup(MemcachedNode node) {
        MemcachedReplicaGroup group = node.getReplicaGroup();
        if (group.getMasterNode() != null && group.getSlaveNode() != null) {
            if (((ArcusReplNodeAddress)node.getSocketAddress()).isMaster()) {
                if (node.moveOperations(group.getSlaveNode()) > 0) {
                    this.addedQueue.offer(group.getSlaveNode());
                }
                ((ArcusReplKetamaNodeLocator)this.locator).switchoverReplGroup(group);
            } else if (node.moveOperations(group.getMasterNode()) > 0) {
                this.addedQueue.offer(group.getMasterNode());
            }
            this.queueReconnect(node, ReconnDelay.IMMEDIATE, "Discarded all pending reading state operation to move operations.");
        } else {
            this.getLogger().warn("Delay switchover because invalid group state : " + group);
        }
    }

    MemcachedNode attachMemcachedNode(String name, SocketAddress sa) throws IOException {
        SocketChannel ch = SocketChannel.open();
        ch.configureBlocking(false);
        MemcachedNode qa = this.f.createMemcachedNode(name, sa, ch, this.f.getReadBufSize());
        if (this.timeoutRatioThreshold > 0) {
            qa.enableTimeoutRatio();
        }
        int ops = 0;
        ch.socket().setTcpNoDelay(!this.f.useNagleAlgorithm());
        ch.socket().setReuseAddress(true);
        try {
            if (ch.connect(sa)) {
                this.getLogger().info("new memcached node connected to %s immediately", qa);
                this.connected(qa);
            } else {
                this.getLogger().info("new memcached node added %s to connect queue", qa);
                ops = 8;
            }
            qa.setSk(ch.register(this.selector, ops, qa));
            assert (ch.isConnected() || qa.getSk().interestOps() == 8) : "Not connected, and not wanting to connect";
        }
        catch (SocketException e) {
            this.getLogger().warn("new memcached socket error on initial connect");
            this.queueReconnect(qa, ReconnDelay.DEFAULT, "initial connection error");
        }
        this.prepareVersionInfo(qa);
        return qa;
    }

    private void prepareVersionInfo(final MemcachedNode node) {
        VersionOperation op = this.opFact.version(new OperationCallback(){

            @Override
            public void receivedStatus(OperationStatus status) {
                if (status.isSuccess()) {
                    node.setVersion(status.getMessage());
                } else {
                    MemcachedConnection.this.getLogger().warn("VersionOp failed : " + status.getMessage());
                }
            }

            @Override
            public void complete() {
                if (node.getVersion() == null) {
                    MemcachedConnection.this.nodesNeedVersionOp.add(node);
                }
            }
        });
        this.addOperation(node, (Operation)op);
    }

    public void putMemcachedQueue(String addrs) {
        this._nodeManageQueue.offer(addrs);
        this.selector.wakeup();
    }

    void handleNodeManageQueue() throws IOException {
        if (!this._nodeManageQueue.isEmpty()) {
            String addrs = (String)this._nodeManageQueue.poll();
            if (this.arcusReplEnabled) {
                this.updateReplConnections(ArcusReplNodeAddress.getAddresses(addrs));
                return;
            }
            this.updateConnections(AddrUtil.getAddresses(addrs));
        }
    }

    private void handleInputQueue() {
        if (!this.addedQueue.isEmpty()) {
            MemcachedNode node;
            this.getLogger().debug("Handling queue");
            HashSet<MemcachedNode> toAdd = new HashSet<MemcachedNode>();
            HashSet<MemcachedNode> todo = new HashSet<MemcachedNode>();
            while ((node = this.addedQueue.poll()) != null) {
                todo.add(node);
            }
            for (MemcachedNode qa : todo) {
                boolean readyForIO = false;
                if (qa.isActive()) {
                    if (qa.getCurrentWriteOp() != null) {
                        readyForIO = true;
                        this.getLogger().debug("Handling queued write %s", qa);
                    }
                } else {
                    toAdd.add(qa);
                }
                qa.copyInputQueue();
                if (readyForIO) {
                    try {
                        if (qa.getWbuf().hasRemaining()) {
                            this.handleWrites(qa.getSk(), qa);
                        }
                    }
                    catch (IOException e) {
                        this.getLogger().warn((Object)"Exception handling write", e);
                        this.lostConnection(qa, ReconnDelay.DEFAULT, "exception handling write");
                    }
                }
                qa.fixupOps();
            }
            this.addedQueue.addAll(toAdd);
        }
    }

    public boolean addObserver(ConnectionObserver obs) {
        return this.connObservers.add(obs);
    }

    public boolean removeObserver(ConnectionObserver obs) {
        return this.connObservers.remove(obs);
    }

    private void connected(MemcachedNode qa) {
        assert (qa.getChannel().isConnected()) : "Not connected.";
        int rt = qa.getReconnectCount();
        qa.connected();
        for (ConnectionObserver observer : this.connObservers) {
            observer.connectionEstablished(qa.getSocketAddress(), rt);
        }
    }

    private void lostConnection(MemcachedNode qa, ReconnDelay type, String cause) {
        this.queueReconnect(qa, type, cause);
        for (ConnectionObserver observer : this.connObservers) {
            observer.connectionLost(qa.getSocketAddress());
        }
    }

    private void handleIO(SelectionKey sk) {
        MemcachedNode qa = (MemcachedNode)sk.attachment();
        try {
            this.getLogger().debug("Handling IO for:  %s (r=%s, w=%s, c=%s, op=%s)", sk, sk.isReadable(), sk.isWritable(), sk.isConnectable(), sk.attachment());
            if (sk.isConnectable()) {
                this.getLogger().info("Connection state changed for %s", qa);
                SocketChannel channel = qa.getChannel();
                if (channel.finishConnect()) {
                    this.connected(qa);
                    this.addedQueue.offer(qa);
                    if (qa.getWbuf().hasRemaining()) {
                        this.handleWrites(sk, qa);
                    }
                } else assert (!channel.isConnected()) : "connected";
            } else {
                if (sk.isValid() && sk.isReadable()) {
                    this.handleReads(sk, qa);
                }
                if (sk.isValid() && sk.isWritable()) {
                    this.handleWrites(sk, qa);
                }
            }
        }
        catch (ClosedChannelException e) {
            if (!this.shutDown) {
                this.getLogger().warn("Closed channel and not shutting down.  Queueing reconnect on %s", qa, e);
                this.lostConnection(qa, ReconnDelay.DEFAULT, "closed channel");
            }
        }
        catch (ConnectException e) {
            this.getLogger().warn("Reconnecting due to failure to connect to %s", qa, e);
            this.queueReconnect(qa, ReconnDelay.DEFAULT, "failure to connect");
        }
        catch (OperationException e) {
            qa.setupForAuth("operation exception");
            this.getLogger().warn("Reconnection due to exception handling a memcached exception on %s.", qa, e);
            this.lostConnection(qa, ReconnDelay.IMMEDIATE, "operation exception");
        }
        catch (Exception e) {
            qa.setupForAuth("due to exception");
            this.getLogger().warn("Reconnecting due to exception on %s", qa, e);
            this.lostConnection(qa, ReconnDelay.DEFAULT, "exception" + e);
        }
        qa.fixupOps();
    }

    private void handleWrites(SelectionKey sk, MemcachedNode qa) throws IOException {
        boolean canWriteMore;
        qa.fillWriteBuffer(this.shouldOptimize);
        boolean bl = canWriteMore = qa.getBytesRemainingToWrite() > 0;
        while (canWriteMore) {
            int wrote = qa.writeSome();
            qa.fillWriteBuffer(this.shouldOptimize);
            canWriteMore = wrote > 0 && qa.getBytesRemainingToWrite() > 0;
        }
    }

    private void handleReads(SelectionKey sk, MemcachedNode qa) throws IOException {
        Operation currentOp = qa.getCurrentReadOp();
        ByteBuffer rbuf = qa.getRbuf();
        SocketChannel channel = qa.getChannel();
        int read = channel.read(rbuf);
        while (read > 0) {
            this.getLogger().debug("Read %d bytes", read);
            rbuf.flip();
            while (rbuf.remaining() > 0) {
                if (currentOp == null) {
                    throw new IllegalStateException("No read operation.");
                }
                currentOp.readFromBuffer(rbuf);
                if (currentOp.getState() == OperationState.COMPLETE) {
                    this.getLogger().debug("Completed read op: %s and giving the next %d bytes", currentOp, rbuf.remaining());
                    Operation op = qa.removeCurrentReadOp();
                    assert (op == currentOp) : "Expected to pop " + currentOp + " got " + op;
                    currentOp = qa.getCurrentReadOp();
                    continue;
                }
                if (currentOp.getState() != OperationState.MOVING) continue;
            }
            if (currentOp != null && currentOp.getState() == OperationState.MOVING) {
                rbuf.clear();
                this.switchoverMemcachedReplGroup(qa);
                break;
            }
            rbuf.clear();
            read = channel.read(rbuf);
        }
        if (read < 0) {
            throw new IOException("Disconnected unexpected, will reconnect.");
        }
    }

    static String dbgBuffer(ByteBuffer b, int size) {
        StringBuilder sb = new StringBuilder();
        byte[] bytes = b.array();
        for (int i = 0; i < size; ++i) {
            char ch = (char)bytes[i];
            if (Character.isWhitespace(ch) || Character.isLetterOrDigit(ch)) {
                sb.append(ch);
                continue;
            }
            sb.append("\\x");
            sb.append(Integer.toHexString(bytes[i] & 0xFF));
        }
        return sb.toString();
    }

    private void queueReconnect(MemcachedNode qa, ReconnDelay type, String cause) {
        if (!this.shutDown) {
            long delay;
            this.getLogger().warn("Closing, and reopening %s, attempt %d.", qa, qa.getReconnectCount());
            if (qa.getSk() != null) {
                qa.getSk().cancel();
                assert (!qa.getSk().isValid()) : "Cancelled selection key is valid";
            }
            qa.reconnecting();
            try {
                if (qa.getChannel() != null) {
                    qa.getChannel().close();
                } else {
                    this.getLogger().info("The channel or socket was null for %s", qa);
                }
            }
            catch (IOException e) {
                this.getLogger().warn((Object)"IOException trying to close a socket", e);
            }
            qa.setChannel(null);
            switch (type) {
                case IMMEDIATE: {
                    delay = 0L;
                    break;
                }
                default: {
                    delay = (long)Math.min((double)this.maxDelay, Math.pow(2.0, qa.getReconnectCount())) * 1000L;
                }
            }
            long reconTime = System.currentTimeMillis() + delay;
            while (this.reconnectQueue.containsKey(reconTime)) {
                ++reconTime;
            }
            this.reconnectQueue.put(reconTime, qa);
            qa.setupResend(this.failureMode == FailureMode.Cancel && type == ReconnDelay.DEFAULT, cause);
            if (type == ReconnDelay.DEFAULT) {
                if (this.failureMode == FailureMode.Redistribute) {
                    this.redistributeOperations(qa.destroyInputQueue(), cause);
                } else if (this.failureMode == FailureMode.Cancel) {
                    this.cancelOperations(qa.destroyInputQueue(), cause);
                }
            }
        }
    }

    private void cancelOperations(Collection<Operation> ops, String cause) {
        for (Operation op : ops) {
            op.cancel(cause);
        }
    }

    private void redistributeOperations(Collection<Operation> ops, String cause) {
        for (Operation op : ops) {
            if (op instanceof KeyedOperation) {
                KeyedOperation ko = (KeyedOperation)op;
                int added = 0;
                for (String k : ko.getKeys()) {
                    for (Operation newop : this.opFact.clone(ko)) {
                        this.addOperation(k, newop);
                        ++added;
                    }
                }
                assert (added > 0) : "Didn't add any new operations when redistributing";
                continue;
            }
            op.cancel(cause);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    private void attemptReconnects() throws IOException {
        long now = System.currentTimeMillis();
        IdentityHashMap<MemcachedNode, Boolean> seen = new IdentityHashMap<MemcachedNode, Boolean>();
        ArrayList<MemcachedNode> rereQueue = new ArrayList<MemcachedNode>();
        SocketChannel ch = null;
        Iterator<MemcachedNode> i = this.reconnectQueue.headMap(now).values().iterator();
        while (i.hasNext()) {
            MemcachedNode qa;
            block26: {
                qa = i.next();
                i.remove();
                if (qa.getChannel() == null) break block26;
                this.getLogger().info("Skipping reconnect request that already reconnected to %s", qa);
                if (ch == null || ch.isConnected() || ch.isConnectionPending()) continue;
                try {
                    ch.close();
                }
                catch (IOException x) {
                    this.getLogger().error("Exception closing channel: %s", qa, x);
                }
                continue;
            }
            if (!seen.containsKey(qa)) {
                seen.put(qa, Boolean.TRUE);
                this.getLogger().info("Reconnecting %s", qa);
                ch = SocketChannel.open();
                ch.configureBlocking(false);
                ch.socket().setTcpNoDelay(!this.f.useNagleAlgorithm());
                ch.socket().setReuseAddress(true);
                int ops = 0;
                if (ch.connect(qa.getSocketAddress())) {
                    this.getLogger().info("Immediately reconnected to %s", qa);
                    this.connected(qa);
                    this.addedQueue.offer(qa);
                    assert (ch.isConnected());
                } else {
                    ops = 8;
                }
                qa.registerChannel(ch, ch.register(this.selector, ops, qa));
                assert (qa.getChannel() == ch) : "Channel was lost.";
            } else {
                this.getLogger().debug("Skipping duplicate reconnect request for %s", qa);
            }
            if (ch == null || ch.isConnected() || ch.isConnectionPending()) continue;
            try {
                ch.close();
            }
            catch (IOException x) {
                this.getLogger().error("Exception closing channel: %s", qa, x);
            }
            continue;
            catch (SocketException e) {
                this.getLogger().warn((Object)"Error on reconnect", e);
                rereQueue.add(qa);
                if (ch == null || ch.isConnected() || ch.isConnectionPending()) continue;
                try {
                    ch.close();
                }
                catch (IOException x) {
                    this.getLogger().error("Exception closing channel: %s", qa, x);
                }
                continue;
            }
            catch (Exception e2) {
                this.getLogger().error("Exception on reconnect, lost node %s", qa, e2);
                if (ch == null || ch.isConnected() || ch.isConnectionPending()) continue;
                {
                    catch (Throwable throwable) {
                        if (ch != null && !ch.isConnected() && !ch.isConnectionPending()) {
                            try {
                                ch.close();
                            }
                            catch (IOException x) {
                                this.getLogger().error("Exception closing channel: %s", qa, x);
                            }
                        }
                        throw throwable;
                    }
                }
                try {
                    ch.close();
                }
                catch (IOException x) {
                    this.getLogger().error("Exception closing channel: %s", qa, x);
                }
            }
        }
        for (MemcachedNode n : rereQueue) {
            this.queueReconnect(n, ReconnDelay.DEFAULT, "error on reconnect");
        }
    }

    NodeLocator getLocator() {
        return this.locator;
    }

    private ReplicaPick getReplicaPick(Operation o) {
        ReplicaPick pick = ReplicaPick.MASTER;
        if (o.isReadOperation()) {
            ReadPriority readPriority = this.f.getAPIReadPriority().get((Object)o.getAPIType());
            if (readPriority != null) {
                if (readPriority == ReadPriority.SLAVE) {
                    pick = ReplicaPick.SLAVE;
                } else if (readPriority == ReadPriority.RR) {
                    pick = ReplicaPick.RR;
                }
            } else {
                pick = this.getReplicaPick();
            }
        }
        return pick;
    }

    private ReplicaPick getReplicaPick() {
        ReadPriority readPriority = this.f.getReadPriority();
        ReplicaPick pick = ReplicaPick.MASTER;
        if (readPriority == ReadPriority.SLAVE) {
            pick = ReplicaPick.SLAVE;
        } else if (readPriority == ReadPriority.RR) {
            pick = ReplicaPick.RR;
        }
        return pick;
    }

    public MemcachedNode getPrimaryNode(String key) {
        if (this.arcusReplEnabled) {
            return ((ArcusReplKetamaNodeLocator)this.locator).getPrimary(key, this.getReplicaPick());
        }
        return this.locator.getPrimary(key);
    }

    public MemcachedNode getPrimaryNode(String key, Operation o) {
        if (this.arcusReplEnabled) {
            return ((ArcusReplKetamaNodeLocator)this.locator).getPrimary(key, this.getReplicaPick(o));
        }
        return this.locator.getPrimary(key);
    }

    public Iterator<MemcachedNode> getNodeSequence(String key) {
        if (this.arcusReplEnabled) {
            return ((ArcusReplKetamaNodeLocator)this.locator).getSequence(key, this.getReplicaPick());
        }
        return this.locator.getSequence(key);
    }

    public Iterator<MemcachedNode> getNodeSequence(String key, Operation o) {
        if (this.arcusReplEnabled) {
            return ((ArcusReplKetamaNodeLocator)this.locator).getSequence(key, this.getReplicaPick(o));
        }
        return this.locator.getSequence(key);
    }

    public void addOperation(String key, Operation o) {
        MemcachedNode placeIn = null;
        MemcachedNode primary = this.getPrimaryNode(key, o);
        if (primary == null) {
            o.cancel("no node");
        } else if (primary.isActive() || this.failureMode == FailureMode.Retry) {
            placeIn = primary;
        } else if (this.failureMode == FailureMode.Cancel) {
            o.setHandlingNode(primary);
            o.cancel("inactive node");
        } else {
            Iterator<MemcachedNode> iter = this.getNodeSequence(key, o);
            while (placeIn == null && iter.hasNext()) {
                MemcachedNode n = iter.next();
                if (!n.isActive()) continue;
                placeIn = n;
            }
            if (placeIn == null) {
                placeIn = primary;
            }
        }
        assert (o.isCancelled() || placeIn != null) : "No node found for key " + key;
        if (placeIn != null) {
            this.addOperation(placeIn, o);
        } else assert (o.isCancelled()) : "No not found for " + key + " (and not immediately cancelled)";
    }

    public void insertOperation(MemcachedNode node, Operation o) {
        o.setHandlingNode(node);
        o.initialize();
        node.insertOp(o);
        this.addedQueue.offer(node);
        Selector s = this.selector.wakeup();
        assert (s == this.selector) : "Wakeup returned the wrong selector.";
        this.getLogger().debug("Added %s to %s", o, node);
    }

    public void addOperation(MemcachedNode node, Operation o) {
        o.setHandlingNode(node);
        o.initialize();
        node.addOp(o);
        this.addedQueue.offer(node);
        Selector s = this.selector.wakeup();
        assert (s == this.selector) : "Wakeup returned the wrong selector.";
        this.getLogger().debug("Added %s to %s", o, node);
    }

    public void addOperations(Map<MemcachedNode, Operation> ops) {
        for (Map.Entry<MemcachedNode, Operation> me : ops.entrySet()) {
            MemcachedNode node = me.getKey();
            Operation o = me.getValue();
            o.setHandlingNode(node);
            o.initialize();
            node.addOp(o);
            this.addedQueue.offer(node);
        }
        Selector s = this.selector.wakeup();
        assert (s == this.selector) : "Wakeup returned the wrong selector.";
    }

    public CountDownLatch broadcastOperation(BroadcastOpFactory of) {
        return this.broadcastOperation(of, this.locator.getAll());
    }

    public CountDownLatch broadcastOperation(BroadcastOpFactory of, Collection<MemcachedNode> nodes) {
        CountDownLatch latch = new CountDownLatch(this.locator.getAll().size());
        for (MemcachedNode node : nodes) {
            Operation op = of.newOp(node, latch);
            op.setHandlingNode(node);
            op.initialize();
            node.addOp(op);
            this.addedQueue.offer(node);
        }
        Selector s = this.selector.wakeup();
        assert (s == this.selector) : "Wakeup returned the wrong selector.";
        return latch;
    }

    public void shutdown() throws IOException {
        this.shutDown = true;
        Selector s = this.selector.wakeup();
        assert (s == this.selector) : "Wakeup returned the wrong selector.";
        for (MemcachedNode qa : this.locator.getAll()) {
            qa.shutdown();
        }
        this.selector.close();
        this.getLogger().debug("Shut down selector %s", this.selector);
    }

    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("{MemcachedConnection to");
        for (MemcachedNode qa : this.locator.getAll()) {
            sb.append(" ");
            sb.append(qa.getSocketAddress());
        }
        sb.append("}");
        return sb.toString();
    }

    public static void opTimedOut(Operation op) {
        MemcachedConnection.setTimeout(op, true);
    }

    public static void opSucceeded(Operation op) {
        MemcachedConnection.setTimeout(op, false);
    }

    private static void setTimeout(Operation op, boolean isTimeout) {
        try {
            if (op == null) {
                LoggerFactory.getLogger(MemcachedConnection.class).debug("op is null.");
                return;
            }
            MemcachedNode node = op.getHandlingNode();
            if (node == null) {
                LoggerFactory.getLogger(MemcachedConnection.class).debug("handling node for operation is not set");
            } else if (isTimeout || !op.isCancelled()) {
                node.setContinuousTimeout(isTimeout);
            }
        }
        catch (Exception e) {
            LoggerFactory.getLogger(MemcachedConnection.class).error(e.getMessage());
        }
    }

    public MemcachedNode findNodeByKey(String key) {
        MemcachedNode placeIn = null;
        MemcachedNode primary = this.getPrimaryNode(key);
        if (primary == null) {
            return null;
        }
        if (primary.isActive() || this.failureMode == FailureMode.Retry) {
            placeIn = primary;
        } else {
            Iterator<MemcachedNode> iter = this.getNodeSequence(key);
            while (placeIn == null && iter.hasNext()) {
                MemcachedNode n = iter.next();
                if (!n.isActive()) continue;
                placeIn = n;
            }
            if (placeIn == null) {
                placeIn = primary;
            }
        }
        return placeIn;
    }

    public int getAddedQueueSize() {
        return this.addedQueue.size();
    }

    private class MoveOperationTask
    implements Task {
        private MemcachedNode fromNode;
        private MemcachedNode toNode;

        public MoveOperationTask(MemcachedNode from, MemcachedNode to) {
            this.fromNode = from;
            this.toNode = to;
        }

        @Override
        public void doTask() {
            if (this.fromNode.moveOperations(this.toNode) > 0) {
                MemcachedConnection.this.addedQueue.offer(this.toNode);
            }
        }
    }

    private class QueueReconnectTask
    implements Task {
        private MemcachedNode node;
        private ReconnDelay delay;
        private String cause;

        public QueueReconnectTask(MemcachedNode node, ReconnDelay delay, String cause) {
            this.node = node;
            this.delay = delay;
            this.cause = cause;
        }

        @Override
        public void doTask() {
            MemcachedConnection.this.queueReconnect(this.node, this.delay, this.cause);
        }
    }

    private class SetupResendTask
    implements Task {
        private MemcachedNode node;
        private boolean cancelWrite;
        private String cause;

        public SetupResendTask(MemcachedNode node, boolean cancelWrite, String cause) {
            this.node = node;
            this.cancelWrite = cancelWrite;
            this.cause = cause;
        }

        @Override
        public void doTask() {
            this.node.setupResend(this.cancelWrite, this.cause);
        }
    }

    private static interface Task {
        public void doTask();
    }
}

