/*
 * Decompiled with CFR 0.152.
 */
package net.spy.memcached;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedSelectorException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import net.spy.memcached.BroadcastOpFactory;
import net.spy.memcached.CASResponse;
import net.spy.memcached.CASValue;
import net.spy.memcached.CachedData;
import net.spy.memcached.ConnectionFactory;
import net.spy.memcached.ConnectionObserver;
import net.spy.memcached.DefaultConnectionFactory;
import net.spy.memcached.KeyUtil;
import net.spy.memcached.MemcachedClientIF;
import net.spy.memcached.MemcachedConnection;
import net.spy.memcached.MemcachedNode;
import net.spy.memcached.NodeLocator;
import net.spy.memcached.OperationFactory;
import net.spy.memcached.OperationTimeoutException;
import net.spy.memcached.auth.AuthDescriptor;
import net.spy.memcached.auth.AuthThreadMonitor;
import net.spy.memcached.compat.SpyThread;
import net.spy.memcached.internal.BulkFuture;
import net.spy.memcached.internal.BulkGetFuture;
import net.spy.memcached.internal.CheckedOperationTimeoutException;
import net.spy.memcached.internal.GetFuture;
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.internal.SingleElementInfiniteIterator;
import net.spy.memcached.ops.CASOperation;
import net.spy.memcached.ops.CASOperationStatus;
import net.spy.memcached.ops.CancelledOperationStatus;
import net.spy.memcached.ops.ConcatenationOperation;
import net.spy.memcached.ops.ConcatenationType;
import net.spy.memcached.ops.DeleteOperation;
import net.spy.memcached.ops.FlushOperation;
import net.spy.memcached.ops.GetOperation;
import net.spy.memcached.ops.GetsOperation;
import net.spy.memcached.ops.Mutator;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.OperationState;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.StatsOperation;
import net.spy.memcached.ops.StoreOperation;
import net.spy.memcached.ops.StoreType;
import net.spy.memcached.plugin.LocalCacheManager;
import net.spy.memcached.transcoders.TranscodeService;
import net.spy.memcached.transcoders.Transcoder;

public class MemcachedClient
extends SpyThread
implements MemcachedClientIF,
ConnectionObserver {
    private volatile boolean running = true;
    private volatile boolean shuttingDown = false;
    protected LocalCacheManager localCacheManager = null;
    protected final long operationTimeout;
    private final MemcachedConnection conn;
    protected final OperationFactory opFact;
    protected final Transcoder<Object> transcoder;
    private final TranscodeService tcService;
    private final AuthDescriptor authDescriptor;
    private final byte delimiter;
    private static final String DEFAULT_MEMCACHED_CLIENT_NAME = "MemcachedClient";
    private static final int GET_BULK_CHUNK_SIZE = 200;
    private final AuthThreadMonitor authMonitor = new AuthThreadMonitor();

    public MemcachedClient(String name, InetSocketAddress ... ia) throws IOException {
        this(new DefaultConnectionFactory(), name, Arrays.asList(ia));
    }

    public MemcachedClient(InetSocketAddress ... ia) throws IOException {
        this(new DefaultConnectionFactory(), DEFAULT_MEMCACHED_CLIENT_NAME, Arrays.asList(ia));
    }

    public MemcachedClient(List<InetSocketAddress> addrs) throws IOException {
        this(new DefaultConnectionFactory(), DEFAULT_MEMCACHED_CLIENT_NAME, addrs);
    }

    public MemcachedClient(ConnectionFactory cf, List<InetSocketAddress> addrs) throws IOException {
        this(cf, DEFAULT_MEMCACHED_CLIENT_NAME, addrs);
    }

    public MemcachedClient(ConnectionFactory cf, String name, List<InetSocketAddress> addrs) throws IOException {
        if (cf == null) {
            throw new NullPointerException("Connection factory required");
        }
        if (name == null) {
            throw new NullPointerException("Client name required");
        }
        if (addrs == null) {
            throw new NullPointerException("Server list required");
        }
        if (addrs.isEmpty()) {
            throw new IllegalArgumentException("You must have at least one server to connect to");
        }
        if (cf.getOperationTimeout() <= 0L) {
            throw new IllegalArgumentException("Operation timeout must be positive.");
        }
        this.tcService = new TranscodeService(cf.isDaemon());
        this.transcoder = cf.getDefaultTranscoder();
        this.opFact = cf.getOperationFactory();
        assert (this.opFact != null) : "Connection factory failed to make op factory";
        this.conn = cf.createConnection(name, addrs);
        assert (this.conn != null) : "Connection factory failed to make a connection";
        this.operationTimeout = cf.getOperationTimeout();
        this.authDescriptor = cf.getAuthDescriptor();
        if (this.authDescriptor != null) {
            this.addObserver(this);
        }
        this.delimiter = cf.getDelimiter();
        this.setName("Memcached IO over " + this.conn);
        this.setDaemon(cf.isDaemon());
        this.start();
    }

    @Override
    public Collection<SocketAddress> getAvailableServers() {
        ArrayList<SocketAddress> rv = new ArrayList<SocketAddress>();
        for (MemcachedNode node : this.conn.getLocator().getAll()) {
            if (!node.isActive()) continue;
            rv.add(node.getSocketAddress());
        }
        return rv;
    }

    @Override
    public Collection<SocketAddress> getUnavailableServers() {
        ArrayList<SocketAddress> rv = new ArrayList<SocketAddress>();
        for (MemcachedNode node : this.conn.getLocator().getAll()) {
            if (node.isActive()) continue;
            rv.add(node.getSocketAddress());
        }
        return rv;
    }

    @Override
    public NodeLocator getNodeLocator() {
        return this.conn.getLocator().getReadonlyCopy();
    }

    @Override
    public Transcoder<Object> getTranscoder() {
        return this.transcoder;
    }

    protected void validateKey(String key) {
        boolean hasPrefix = false;
        byte[] keyBytes = KeyUtil.getKeyBytes(key);
        if (keyBytes.length > 32000) {
            throw new IllegalArgumentException("Key is too long (maxlen = 32000)");
        }
        if (keyBytes.length == 0) {
            throw new IllegalArgumentException("Key must contain at least one character.");
        }
        for (byte b : keyBytes) {
            if (b == 32 || b == 10 || b == 13 || b == 0) {
                throw new IllegalArgumentException("Key contains invalid characters:  ``" + key + "''");
            }
            if (b != this.delimiter) continue;
            hasPrefix = true;
        }
        if (hasPrefix) {
            if (keyBytes[0] == 45) {
                throw new IllegalArgumentException("Key contains invalid prefix: ``" + key + "''");
            }
            for (byte b : keyBytes) {
                if (b == this.delimiter) break;
                if (97 <= b && b <= 122 || 65 <= b && b <= 90 || 48 <= b && b <= 57 || b == 95 || b == 45 || b == 43 || b == 46) continue;
                throw new IllegalArgumentException("Key contains invalid prefix: ``" + key + "''");
            }
        }
    }

    protected void checkState() {
        if (this.shuttingDown) {
            throw new IllegalStateException("Shutting down");
        }
        assert (this.isAlive()) : "IO Thread is not running.";
    }

    protected Operation addOp(String key, Operation op) {
        this.validateKey(key);
        this.checkState();
        this.conn.addOperation(key, op);
        return op;
    }

    protected CountDownLatch broadcastOp(BroadcastOpFactory of) {
        return this.broadcastOp(of, this.conn.getLocator().getAll(), true);
    }

    CountDownLatch broadcastOp(BroadcastOpFactory of, Collection<MemcachedNode> nodes) {
        return this.broadcastOp(of, nodes, true);
    }

    private CountDownLatch broadcastOp(BroadcastOpFactory of, Collection<MemcachedNode> nodes, boolean checkShuttingDown) {
        if (checkShuttingDown && this.shuttingDown) {
            throw new IllegalStateException("Shutting down");
        }
        return this.conn.broadcastOperation(of, nodes);
    }

    private <T> Future<Boolean> asyncStore(StoreType storeType, String key, int exp, T value, Transcoder<T> tc) {
        CachedData co = tc.encode(value);
        final CountDownLatch latch = new CountDownLatch(1);
        final OperationFuture<Boolean> rv = new OperationFuture<Boolean>(latch, this.operationTimeout);
        StoreOperation op = this.opFact.store(storeType, key, co.getFlags(), exp, co.getData(), new OperationCallback(){

            @Override
            public void receivedStatus(OperationStatus val) {
                rv.set(val.isSuccess(), val);
            }

            @Override
            public void complete() {
                latch.countDown();
            }
        });
        rv.setOperation(op);
        this.addOp(key, op);
        return rv;
    }

    private Future<Boolean> asyncStore(StoreType storeType, String key, int exp, Object value) {
        return this.asyncStore(storeType, key, exp, value, this.transcoder);
    }

    private <T> Future<Boolean> asyncCat(ConcatenationType catType, long cas, String key, T value, Transcoder<T> tc) {
        CachedData co = tc.encode(value);
        final CountDownLatch latch = new CountDownLatch(1);
        final OperationFuture<Boolean> rv = new OperationFuture<Boolean>(latch, this.operationTimeout);
        ConcatenationOperation op = this.opFact.cat(catType, cas, key, co.getData(), new OperationCallback(){

            @Override
            public void receivedStatus(OperationStatus val) {
                rv.set(val.isSuccess(), val);
            }

            @Override
            public void complete() {
                latch.countDown();
            }
        });
        rv.setOperation(op);
        this.addOp(key, op);
        return rv;
    }

    @Override
    public Future<Boolean> append(long cas, String key, Object val) {
        return this.append(cas, key, val, this.transcoder);
    }

    @Override
    public <T> Future<Boolean> append(long cas, String key, T val, Transcoder<T> tc) {
        return this.asyncCat(ConcatenationType.append, cas, key, val, tc);
    }

    @Override
    public Future<Boolean> prepend(long cas, String key, Object val) {
        return this.prepend(cas, key, val, this.transcoder);
    }

    @Override
    public <T> Future<Boolean> prepend(long cas, String key, T val, Transcoder<T> tc) {
        return this.asyncCat(ConcatenationType.prepend, cas, key, val, tc);
    }

    @Override
    public <T> Future<CASResponse> asyncCAS(String key, long casId, T value, Transcoder<T> tc) {
        return this.asyncCAS(key, casId, 0, value, tc);
    }

    @Override
    public <T> Future<CASResponse> asyncCAS(String key, long casId, int exp, T value, Transcoder<T> tc) {
        CachedData co = tc.encode(value);
        final CountDownLatch latch = new CountDownLatch(1);
        final OperationFuture<CASResponse> rv = new OperationFuture<CASResponse>(latch, this.operationTimeout);
        CASOperation op = this.opFact.cas(StoreType.set, key, casId, co.getFlags(), exp, co.getData(), new OperationCallback(){

            @Override
            public void receivedStatus(OperationStatus val) {
                if (val instanceof CASOperationStatus) {
                    rv.set(((CASOperationStatus)val).getCASResponse(), val);
                } else if (val instanceof CancelledOperationStatus) {
                    rv.set(CASResponse.CANCELED, val);
                } else {
                    rv.set(CASResponse.UNDEFINED, val);
                }
            }

            @Override
            public void complete() {
                latch.countDown();
            }
        });
        rv.setOperation(op);
        this.addOp(key, op);
        return rv;
    }

    @Override
    public Future<CASResponse> asyncCAS(String key, long casId, Object value) {
        return this.asyncCAS(key, casId, value, this.transcoder);
    }

    @Override
    public Future<CASResponse> asyncCAS(String key, long casId, int exp, Object value) {
        return this.asyncCAS(key, casId, exp, value, this.transcoder);
    }

    @Override
    public <T> CASResponse cas(String key, long casId, T value, Transcoder<T> tc) {
        return this.cas(key, casId, 0, value, tc);
    }

    @Override
    public <T> CASResponse cas(String key, long casId, int exp, T value, Transcoder<T> tc) {
        try {
            return this.asyncCAS(key, casId, exp, value, tc).get(this.operationTimeout, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Interrupted waiting for value", e);
        }
        catch (ExecutionException e) {
            throw new RuntimeException("Exception waiting for value", e);
        }
        catch (TimeoutException e) {
            throw new OperationTimeoutException("Timeout waiting for value", e);
        }
    }

    @Override
    public CASResponse cas(String key, long casId, Object value) {
        return this.cas(key, casId, value, this.transcoder);
    }

    @Override
    public CASResponse cas(String key, long casId, int exp, Object value) {
        return this.cas(key, casId, exp, value, this.transcoder);
    }

    @Override
    public <T> Future<Boolean> add(String key, int exp, T o, Transcoder<T> tc) {
        return this.asyncStore(StoreType.add, key, exp, o, tc);
    }

    @Override
    public Future<Boolean> add(String key, int exp, Object o) {
        return this.asyncStore(StoreType.add, key, exp, o, this.transcoder);
    }

    @Override
    public <T> Future<Boolean> set(String key, int exp, T o, Transcoder<T> tc) {
        return this.asyncStore(StoreType.set, key, exp, o, tc);
    }

    @Override
    public Future<Boolean> set(String key, int exp, Object o) {
        return this.asyncStore(StoreType.set, key, exp, o, this.transcoder);
    }

    @Override
    public <T> Future<Boolean> replace(String key, int exp, T o, Transcoder<T> tc) {
        return this.asyncStore(StoreType.replace, key, exp, o, tc);
    }

    @Override
    public Future<Boolean> replace(String key, int exp, Object o) {
        return this.asyncStore(StoreType.replace, key, exp, o, this.transcoder);
    }

    @Override
    public <T> Future<T> asyncGet(final String key, final Transcoder<T> tc) {
        final CountDownLatch latch = new CountDownLatch(1);
        final GetFuture rv = new GetFuture(latch, this.operationTimeout);
        GetOperation op = this.opFact.get(key, new GetOperation.Callback(){
            private Future<T> val = null;

            @Override
            public void receivedStatus(OperationStatus status) {
                rv.set(this.val, status);
            }

            @Override
            public void gotData(String k, int flags, byte[] data) {
                assert (key.equals(k)) : "Wrong key returned";
                this.val = MemcachedClient.this.tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize()));
            }

            @Override
            public void complete() {
                if (MemcachedClient.this.localCacheManager != null) {
                    MemcachedClient.this.localCacheManager.put(key, this.val, MemcachedClient.this.operationTimeout);
                }
                latch.countDown();
            }
        });
        rv.setOperation(op);
        this.addOp(key, op);
        return rv;
    }

    @Override
    public Future<Object> asyncGet(String key) {
        return this.asyncGet(key, this.transcoder);
    }

    @Override
    public <T> Future<CASValue<T>> asyncGets(final String key, final Transcoder<T> tc) {
        final CountDownLatch latch = new CountDownLatch(1);
        final OperationFuture<CASValue<T>> rv = new OperationFuture<CASValue<T>>(latch, this.operationTimeout);
        GetsOperation op = this.opFact.gets(key, new GetsOperation.Callback(){
            private CASValue<T> val = null;

            @Override
            public void receivedStatus(OperationStatus status) {
                rv.set(this.val, status);
            }

            @Override
            public void gotData(String k, int flags, long cas, byte[] data) {
                assert (key.equals(k)) : "Wrong key returned";
                assert (cas > 0L) : "CAS was less than zero:  " + cas;
                this.val = new CASValue(cas, tc.decode(new CachedData(flags, data, tc.getMaxSize())));
            }

            @Override
            public void complete() {
                latch.countDown();
            }
        });
        rv.setOperation(op);
        this.addOp(key, op);
        return rv;
    }

    @Override
    public Future<CASValue<Object>> asyncGets(String key) {
        return this.asyncGets(key, this.transcoder);
    }

    @Override
    public <T> CASValue<T> gets(String key, Transcoder<T> tc) {
        try {
            return this.asyncGets(key, tc).get(this.operationTimeout, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Interrupted waiting for value", e);
        }
        catch (ExecutionException e) {
            throw new RuntimeException("Exception waiting for value", e);
        }
        catch (TimeoutException e) {
            throw new OperationTimeoutException("Timeout waiting for value", e);
        }
    }

    @Override
    public CASValue<Object> gets(String key) {
        return this.gets(key, this.transcoder);
    }

    @Override
    public <T> T get(String key, Transcoder<T> tc) {
        Future<T> future = this.asyncGet(key, tc);
        try {
            return future.get(this.operationTimeout, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            future.cancel(true);
            throw new RuntimeException("Interrupted waiting for value", e);
        }
        catch (ExecutionException e) {
            future.cancel(true);
            throw new RuntimeException("Exception waiting for value", e);
        }
        catch (TimeoutException e) {
            future.cancel(true);
            throw new OperationTimeoutException("Timeout waiting for value", e);
        }
    }

    @Override
    public Object get(String key) {
        return this.get(key, this.transcoder);
    }

    @Override
    public <T> BulkFuture<Map<String, T>> asyncGetBulk(Collection<String> keys, Iterator<Transcoder<T>> tc_iter) {
        final ConcurrentHashMap m = new ConcurrentHashMap();
        final HashMap<String, Object> tc_map = new HashMap<String, Object>();
        HashMap chunks = new HashMap();
        HashMap<MemcachedNode, Integer> chunkCount = new HashMap<MemcachedNode, Integer>();
        NodeLocator locator = this.conn.getLocator();
        Iterator<String> key_iter = keys.iterator();
        while (key_iter.hasNext() && tc_iter.hasNext()) {
            Object cachedData;
            String key = key_iter.next();
            Transcoder<T> tc = tc_iter.next();
            if (this.localCacheManager != null && (cachedData = this.localCacheManager.get(key, tc)) != null) {
                m.put(key, new LocalCacheManager.Task(new Callable<T>(){

                    @Override
                    public T call() throws Exception {
                        return cachedData;
                    }
                }));
                continue;
            }
            tc_map.put(key, tc);
            this.validateKey(key);
            MemcachedNode primaryNode = this.conn.getPrimaryNode(key);
            MemcachedNode node = null;
            if (primaryNode == null) {
                node = null;
            } else if (primaryNode.isActive()) {
                node = primaryNode;
            } else {
                Iterator<MemcachedNode> iter = this.conn.getNodeSequence(key);
                while (node == null && iter.hasNext()) {
                    MemcachedNode n = iter.next();
                    if (!n.isActive()) continue;
                    node = n;
                }
                if (node == null) {
                    node = primaryNode;
                }
            }
            ArrayList lks = (ArrayList)chunks.get(node);
            if (lks == null) {
                lks = new ArrayList();
                ArrayList ts = new ArrayList();
                lks.add(0, ts);
                chunks.put(node, lks);
                chunkCount.put(node, 0);
            }
            if (((Collection)lks.get((Integer)chunkCount.get(node))).size() >= 200) {
                int count = (Integer)chunkCount.get(node) + 1;
                ArrayList ts = new ArrayList();
                lks.add(count, ts);
                chunkCount.put(node, count);
            }
            Collection ks = (Collection)lks.get((Integer)chunkCount.get(node));
            ks.add(key);
        }
        int chunk_size = 0;
        for (Map.Entry counts : chunkCount.entrySet()) {
            chunk_size += (Integer)counts.getValue() + 1;
        }
        final CountDownLatch latch = new CountDownLatch(chunk_size);
        ArrayList<Operation> ops = new ArrayList<Operation>(chunk_size);
        GetOperation.Callback cb = new GetOperation.Callback(){

            @Override
            public void receivedStatus(OperationStatus status) {
                if (!status.isSuccess()) {
                    MemcachedClient.this.getLogger().warn("Unsuccessful get:  %s", status);
                }
            }

            @Override
            public void gotData(String k, int flags, byte[] data) {
                Transcoder tc = (Transcoder)tc_map.get(k);
                m.put(k, MemcachedClient.this.tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize())));
            }

            @Override
            public void complete() {
                latch.countDown();
            }
        };
        this.checkState();
        for (Map.Entry me : chunks.entrySet()) {
            MemcachedNode node = (MemcachedNode)me.getKey();
            for (int i = 0; i <= (Integer)chunkCount.get(node); ++i) {
                GetOperation op;
                if (node == null) {
                    op = this.opFact.mget((Collection)((List)me.getValue()).get(i), cb);
                    op.cancel("no node");
                } else {
                    op = node.enabledMGetOp() ? this.opFact.mget((Collection)((List)me.getValue()).get(i), cb) : this.opFact.get((Collection)((List)me.getValue()).get(i), cb);
                    this.conn.addOperation(node, (Operation)op);
                }
                ops.add(op);
            }
        }
        return new BulkGetFuture(m, ops, latch, this.localCacheManager);
    }

    @Override
    public <T> BulkFuture<Map<String, T>> asyncGetBulk(Collection<String> keys, Transcoder<T> tc) {
        return this.asyncGetBulk(keys, new SingleElementInfiniteIterator<Transcoder<T>>(tc));
    }

    @Override
    public BulkFuture<Map<String, Object>> asyncGetBulk(Collection<String> keys) {
        return this.asyncGetBulk(keys, this.transcoder);
    }

    @Override
    public <T> BulkFuture<Map<String, T>> asyncGetBulk(Transcoder<T> tc, String ... keys) {
        return this.asyncGetBulk(Arrays.asList(keys), tc);
    }

    @Override
    public BulkFuture<Map<String, Object>> asyncGetBulk(String ... keys) {
        return this.asyncGetBulk(Arrays.asList(keys), this.transcoder);
    }

    @Override
    public <T> Map<String, T> getBulk(Collection<String> keys, Transcoder<T> tc) {
        try {
            return (Map)this.asyncGetBulk(keys, tc).get(this.operationTimeout, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Interrupted getting bulk values", e);
        }
        catch (ExecutionException e) {
            throw new RuntimeException("Failed getting bulk values", e);
        }
        catch (TimeoutException e) {
            throw new OperationTimeoutException("Timeout waiting for bulkvalues", e);
        }
    }

    @Override
    public Map<String, Object> getBulk(Collection<String> keys) {
        return this.getBulk(keys, this.transcoder);
    }

    @Override
    public <T> Map<String, T> getBulk(Transcoder<T> tc, String ... keys) {
        return this.getBulk(Arrays.asList(keys), tc);
    }

    @Override
    public Map<String, Object> getBulk(String ... keys) {
        return this.getBulk(Arrays.asList(keys), this.transcoder);
    }

    @Override
    public Map<SocketAddress, String> getVersions() {
        final ConcurrentHashMap<SocketAddress, String> rv = new ConcurrentHashMap<SocketAddress, String>();
        CountDownLatch blatch = this.broadcastOp(new BroadcastOpFactory(){

            @Override
            public Operation newOp(MemcachedNode n, final CountDownLatch latch) {
                final SocketAddress sa = n.getSocketAddress();
                return MemcachedClient.this.opFact.version(new OperationCallback(){

                    @Override
                    public void receivedStatus(OperationStatus s) {
                        rv.put(sa, s.getMessage());
                    }

                    @Override
                    public void complete() {
                        latch.countDown();
                    }
                });
            }
        });
        try {
            blatch.await(this.operationTimeout, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Interrupted waiting for versions", e);
        }
        return rv;
    }

    @Override
    public Map<SocketAddress, Map<String, String>> getStats() {
        return this.getStats(null);
    }

    @Override
    public Map<SocketAddress, Map<String, String>> getStats(final String arg) {
        final HashMap<SocketAddress, Map<String, String>> rv = new HashMap<SocketAddress, Map<String, String>>();
        CountDownLatch blatch = this.broadcastOp(new BroadcastOpFactory(){

            @Override
            public Operation newOp(MemcachedNode n, final CountDownLatch latch) {
                final SocketAddress sa = n.getSocketAddress();
                rv.put(sa, new HashMap());
                return MemcachedClient.this.opFact.stats(arg, new StatsOperation.Callback(){

                    @Override
                    public void gotStat(String name, String val) {
                        ((Map)rv.get(sa)).put(name, val);
                    }

                    @Override
                    public void receivedStatus(OperationStatus status) {
                        if (!status.isSuccess()) {
                            MemcachedClient.this.getLogger().warn("Unsuccessful stat fetch:  %s", status);
                        }
                    }

                    @Override
                    public void complete() {
                        latch.countDown();
                    }
                });
            }
        });
        try {
            blatch.await(this.operationTimeout, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Interrupted waiting for stats", e);
        }
        return rv;
    }

    private long mutate(Mutator m, String key, int by, long def, int exp) {
        final AtomicLong rv = new AtomicLong();
        final CountDownLatch latch = new CountDownLatch(1);
        this.addOp(key, this.opFact.mutate(m, key, by, def, exp, new OperationCallback(){

            @Override
            public void receivedStatus(OperationStatus s) {
                rv.set(new Long(s.isSuccess() ? s.getMessage() : "-1"));
            }

            @Override
            public void complete() {
                latch.countDown();
            }
        }));
        try {
            if (!latch.await(this.operationTimeout, TimeUnit.MILLISECONDS)) {
                throw new OperationTimeoutException("Mutate operation timed out, unable to modify counter [" + key + "]");
            }
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Interrupted", e);
        }
        this.getLogger().debug("Mutation returned %s", rv);
        return rv.get();
    }

    @Override
    public long incr(String key, int by) {
        return this.mutate(Mutator.incr, key, by, -1L, 0);
    }

    @Override
    public long decr(String key, int by) {
        return this.mutate(Mutator.decr, key, by, -1L, 0);
    }

    @Override
    public long incr(String key, int by, long def, int exp) {
        return this.mutate(Mutator.incr, key, by, def, exp);
    }

    @Override
    public long decr(String key, int by, long def, int exp) {
        return this.mutate(Mutator.decr, key, by, def, exp);
    }

    private long mutateWithDefault(Mutator t, String key, int by, long def, int exp) {
        long rv = this.mutate(t, key, by, def, exp);
        if (rv == -1L) {
            Future<Boolean> f = this.asyncStore(StoreType.add, key, exp, String.valueOf(def));
            try {
                if (f.get(this.operationTimeout, TimeUnit.MILLISECONDS).booleanValue()) {
                    rv = def;
                } else {
                    rv = this.mutate(t, key, by, 0L, exp);
                    assert (rv != -1L) : "Failed to mutate or init value";
                }
            }
            catch (InterruptedException e) {
                throw new RuntimeException("Interrupted waiting for store", e);
            }
            catch (ExecutionException e) {
                throw new RuntimeException("Failed waiting for store", e);
            }
            catch (TimeoutException e) {
                throw new OperationTimeoutException("Timeout waiting to mutate or init value", e);
            }
        }
        return rv;
    }

    private Future<Long> asyncMutate(Mutator m, String key, int by, long def, int exp) {
        final CountDownLatch latch = new CountDownLatch(1);
        final OperationFuture<Long> rv = new OperationFuture<Long>(latch, this.operationTimeout);
        Operation op = this.addOp(key, this.opFact.mutate(m, key, by, def, exp, new OperationCallback(){

            @Override
            public void receivedStatus(OperationStatus s) {
                rv.set(new Long(s.isSuccess() ? s.getMessage() : "-1"), s);
            }

            @Override
            public void complete() {
                latch.countDown();
            }
        }));
        rv.setOperation(op);
        return rv;
    }

    @Override
    public Future<Long> asyncIncr(String key, int by) {
        return this.asyncMutate(Mutator.incr, key, by, -1L, 0);
    }

    @Override
    public Future<Long> asyncIncr(String key, int by, long def, int exp) {
        return this.asyncMutate(Mutator.incr, key, by, def, exp);
    }

    @Override
    public Future<Long> asyncDecr(String key, int by) {
        return this.asyncMutate(Mutator.decr, key, by, -1L, 0);
    }

    @Override
    public Future<Long> asyncDecr(String key, int by, long def, int exp) {
        return this.asyncMutate(Mutator.decr, key, by, def, exp);
    }

    @Override
    public long incr(String key, int by, long def) {
        return this.mutateWithDefault(Mutator.incr, key, by, def, 0);
    }

    @Override
    public long decr(String key, int by, long def) {
        return this.mutateWithDefault(Mutator.decr, key, by, def, 0);
    }

    @Deprecated
    public Future<Boolean> delete(String key, int hold) {
        return this.delete(key);
    }

    @Override
    public Future<Boolean> delete(String key) {
        final CountDownLatch latch = new CountDownLatch(1);
        final OperationFuture<Boolean> rv = new OperationFuture<Boolean>(latch, this.operationTimeout);
        DeleteOperation op = this.opFact.delete(key, new OperationCallback(){

            @Override
            public void receivedStatus(OperationStatus s) {
                rv.set(s.isSuccess(), s);
            }

            @Override
            public void complete() {
                latch.countDown();
            }
        });
        rv.setOperation(op);
        this.addOp(key, op);
        return rv;
    }

    @Override
    public Future<Boolean> flush(final int delay) {
        final AtomicReference<Object> flushResult = new AtomicReference<Object>(null);
        final ConcurrentLinkedQueue ops = new ConcurrentLinkedQueue();
        final CountDownLatch blatch = this.broadcastOp(new BroadcastOpFactory(){

            @Override
            public Operation newOp(MemcachedNode n, final CountDownLatch latch) {
                FlushOperation op = MemcachedClient.this.opFact.flush(delay, new OperationCallback(){

                    @Override
                    public void receivedStatus(OperationStatus s) {
                        flushResult.set(s.isSuccess());
                    }

                    @Override
                    public void complete() {
                        latch.countDown();
                    }
                });
                ops.add(op);
                return op;
            }
        });
        return new OperationFuture<Boolean>(blatch, flushResult, this.operationTimeout){

            @Override
            public boolean cancel(boolean ign) {
                boolean rv = false;
                for (Operation op : ops) {
                    op.cancel("by application.");
                    rv |= op.getState() == OperationState.WRITE_QUEUED;
                }
                return rv;
            }

            @Override
            public boolean isCancelled() {
                for (Operation op : ops) {
                    if (!op.isCancelled()) continue;
                    return true;
                }
                return false;
            }

            @Override
            public Boolean get(long duration, TimeUnit units) throws InterruptedException, TimeoutException, ExecutionException {
                if (!blatch.await(duration, units)) {
                    for (Operation op : ops) {
                        MemcachedConnection.opTimedOut(op);
                    }
                    throw new CheckedOperationTimeoutException("Timed out waiting for operation. >" + duration + " " + (Object)((Object)units), ops);
                }
                for (Operation op : ops) {
                    MemcachedConnection.opSucceeded(op);
                }
                for (Operation op : ops) {
                    if (op != null && op.hasErrored()) {
                        throw new ExecutionException(op.getException());
                    }
                    if (op == null || !op.isCancelled()) continue;
                    throw new ExecutionException(new RuntimeException(op.getCancelCause()));
                }
                return (Boolean)flushResult.get();
            }

            @Override
            public boolean isDone() {
                for (Operation op : ops) {
                    if (op.getState() == OperationState.COMPLETE || op.isCancelled()) continue;
                    return false;
                }
                return true;
            }
        };
    }

    @Override
    public Future<Boolean> flush() {
        return this.flush(-1);
    }

    @Override
    public Set<String> listSaslMechanisms() {
        final ConcurrentHashMap rv = new ConcurrentHashMap();
        CountDownLatch blatch = this.broadcastOp(new BroadcastOpFactory(){

            @Override
            public Operation newOp(MemcachedNode n, final CountDownLatch latch) {
                return MemcachedClient.this.opFact.saslMechs(new OperationCallback(){

                    @Override
                    public void receivedStatus(OperationStatus status) {
                        for (String s : status.getMessage().split(" ")) {
                            rv.put(s, s);
                        }
                    }

                    @Override
                    public void complete() {
                        latch.countDown();
                    }
                });
            }
        });
        try {
            blatch.await();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rv.keySet();
    }

    private void logRunException(Exception e) {
        if (this.shuttingDown) {
            this.getLogger().debug((Object)"Exception occurred during shutdown", e);
        } else {
            this.getLogger().warn((Object)"Problem handling memcached IO", e);
        }
    }

    @Override
    public void run() {
        while (this.running) {
            try {
                this.conn.handleIO();
            }
            catch (IOException e) {
                this.logRunException(e);
            }
            catch (CancelledKeyException e) {
                this.logRunException(e);
            }
            catch (ClosedSelectorException e) {
                this.logRunException(e);
            }
            catch (IllegalStateException e) {
                this.logRunException(e);
            }
            catch (ConcurrentModificationException e) {
                this.logRunException(e);
            }
        }
        this.getLogger().info("Shut down memcached client");
    }

    @Override
    public void shutdown() {
        this.shutdown(-1L, TimeUnit.MILLISECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean shutdown(long timeout, TimeUnit unit) {
        if (this.shuttingDown) {
            this.getLogger().info("Suppressing duplicate attempt to shut down");
            return false;
        }
        this.shuttingDown = true;
        String baseName = this.getName();
        this.setName(baseName + " - SHUTTING DOWN");
        boolean rv = false;
        try {
            if (timeout > 0L) {
                this.setName(baseName + " - SHUTTING DOWN (waiting)");
                rv = this.waitForQueues(timeout, unit);
            }
        }
        finally {
            try {
                this.setName(baseName + " - SHUTTING DOWN (telling client)");
                this.running = false;
                this.conn.shutdown();
                this.setName(baseName + " - SHUTTING DOWN (informed client)");
                this.tcService.shutdown();
            }
            catch (IOException e) {
                this.getLogger().warn((Object)"exception while shutting down", e);
            }
        }
        return rv;
    }

    @Override
    public boolean waitForQueues(long timeout, TimeUnit unit) {
        CountDownLatch blatch = this.broadcastOp(new BroadcastOpFactory(){

            @Override
            public Operation newOp(MemcachedNode n, final CountDownLatch latch) {
                return MemcachedClient.this.opFact.noop(new OperationCallback(){

                    @Override
                    public void complete() {
                        latch.countDown();
                    }

                    @Override
                    public void receivedStatus(OperationStatus s) {
                    }
                });
            }
        }, this.conn.getLocator().getAll(), false);
        try {
            return blatch.await(timeout, unit);
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Interrupted waiting for queues", e);
        }
    }

    @Override
    public boolean addObserver(ConnectionObserver obs) {
        boolean rv = this.conn.addObserver(obs);
        if (rv) {
            for (MemcachedNode node : this.conn.getLocator().getAll()) {
                if (!node.isActive()) continue;
                obs.connectionEstablished(node.getSocketAddress(), -1);
            }
        }
        return rv;
    }

    @Override
    public boolean removeObserver(ConnectionObserver obs) {
        return this.conn.removeObserver(obs);
    }

    @Override
    public void connectionEstablished(SocketAddress sa, int reconnectCount) {
        if (this.authDescriptor != null) {
            if (this.authDescriptor.authThresholdReached()) {
                this.shutdown();
            }
            this.authMonitor.authConnection(this.conn, this.opFact, this.authDescriptor, this.findNode(sa));
        }
    }

    private MemcachedNode findNode(SocketAddress sa) {
        MemcachedNode node = null;
        for (MemcachedNode n : this.conn.getLocator().getAll()) {
            if (!n.getSocketAddress().equals(sa)) continue;
            node = n;
        }
        assert (node != null) : "Couldn't find node connected to " + sa;
        return node;
    }

    @Override
    public void connectionLost(SocketAddress sa) {
    }

    public MemcachedConnection getMemcachedConnection() {
        return this.conn;
    }

    int getAddedQueueSize() {
        return this.conn.getAddedQueueSize();
    }

    Collection<MemcachedNode> getAllNodes() {
        return this.conn.getLocator().getAll();
    }

    public LocalCacheManager getLocalCacheManager() {
        return this.localCacheManager;
    }
}

