/*
 * Decompiled with CFR 0.152.
 */
package net.spy.memcached.protocol;

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.StringTokenizer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import net.spy.memcached.ArcusReplNodeAddress;
import net.spy.memcached.MemcachedNode;
import net.spy.memcached.MemcachedReplicaGroup;
import net.spy.memcached.compat.SpyObject;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationState;

public abstract class TCPMemcachedNodeImpl
extends SpyObject
implements MemcachedNode {
    private String nodeName;
    private final SocketAddress socketAddress;
    private final ByteBuffer rbuf;
    private final ByteBuffer wbuf;
    protected final BlockingQueue<Operation> writeQ;
    private final BlockingQueue<Operation> readQ;
    private final BlockingQueue<Operation> inputQueue;
    private final long opQueueMaxBlockTime;
    private volatile int reconnectAttempt = 1;
    private SocketChannel channel;
    private int toWrite = 0;
    protected Operation optimizedOp = null;
    private volatile SelectionKey sk = null;
    private boolean shouldAuth = false;
    private CountDownLatch authLatch;
    private ArrayList<Operation> reconnectBlocked;
    private String version = null;
    private boolean isAsciiProtocol = true;
    private boolean enabledMGetOp = false;
    private boolean enabledSpaceSeparate = false;
    private final AtomicInteger continuousTimeout = new AtomicInteger(0);
    private boolean toRatioEnabled = false;
    private int[] toCountArray;
    private static final int MAX_TOCOUNT = 100;
    private int toCountIdx;
    private int toRatioMax;
    private int toRatioNow;
    private Lock toRatioLock = new ReentrantLock();
    private volatile long addOpCount;
    private MemcachedReplicaGroup replicaGroup;

    private void resetTimeoutRatioCount() {
        if (this.toRatioEnabled) {
            this.toRatioLock.lock();
            for (int i = 0; i < 100; ++i) {
                this.toCountArray[i] = 0;
            }
            this.toCountIdx = -1;
            this.toRatioMax = 0;
            this.toRatioNow = 0;
            this.toRatioLock.unlock();
        }
    }

    private void addTimeoutRatioCount(boolean timedOut) {
        if (this.toRatioEnabled) {
            this.toRatioLock.lock();
            if (++this.toCountIdx >= 100) {
                this.toCountIdx = 0;
            }
            if (this.toCountArray[this.toCountIdx] > 0) {
                this.toRatioNow -= this.toCountArray[this.toCountIdx];
                this.toCountArray[this.toCountIdx] = 0;
            }
            if (timedOut) {
                this.toCountArray[this.toCountIdx] = 1;
                ++this.toRatioNow;
                if (this.toRatioNow > this.toRatioMax) {
                    this.toRatioMax = this.toRatioNow;
                }
            }
            this.toRatioLock.unlock();
        }
    }

    public TCPMemcachedNodeImpl(String name, SocketAddress sa, SocketChannel c, int bufSize, BlockingQueue<Operation> rq, BlockingQueue<Operation> wq, BlockingQueue<Operation> iq, long opQueueMaxBlockTime, boolean waitForAuth, boolean asciiProtocol) {
        assert (sa != null) : "No SocketAddress";
        assert (c != null) : "No SocketChannel";
        assert (bufSize > 0) : "Invalid buffer size: " + bufSize;
        assert (rq != null) : "No operation read queue";
        assert (wq != null) : "No operation write queue";
        assert (iq != null) : "No input queue";
        this.socketAddress = sa instanceof ArcusReplNodeAddress ? new ArcusReplNodeAddress((ArcusReplNodeAddress)sa) : sa;
        this.nodeName = name + " " + sa;
        this.setChannel(c);
        this.rbuf = ByteBuffer.allocate(bufSize);
        this.wbuf = ByteBuffer.allocate(bufSize);
        this.getWbuf().clear();
        this.readQ = rq;
        this.writeQ = wq;
        this.inputQueue = iq;
        this.addOpCount = 0L;
        this.opQueueMaxBlockTime = opQueueMaxBlockTime;
        this.shouldAuth = waitForAuth;
        this.isAsciiProtocol = asciiProtocol;
        this.setupForAuth("init authentication");
    }

    @Override
    public final void copyInputQueue() {
        ArrayList tmp = new ArrayList();
        this.inputQueue.drainTo(tmp, this.writeQ.remainingCapacity());
        this.writeQ.addAll(tmp);
    }

    @Override
    public Collection<Operation> destroyInputQueue() {
        ArrayList<Operation> rv = new ArrayList<Operation>();
        this.inputQueue.drainTo(rv);
        return rv;
    }

    private Collection<Operation> destroyQueue(BlockingQueue<Operation> queue, boolean resend) {
        ArrayList<Operation> rv = new ArrayList<Operation>();
        queue.drainTo(rv);
        if (resend) {
            for (Operation o : rv) {
                if ((o.getState() == OperationState.WRITE_QUEUED || o.getState() == OperationState.WRITING) && o.getBuffer() != null) {
                    o.getBuffer().reset();
                    continue;
                }
                o.initialize();
            }
        }
        return rv;
    }

    @Override
    public Collection<Operation> destroyWriteQueue(boolean resend) {
        return this.destroyQueue(this.writeQ, resend);
    }

    @Override
    public Collection<Operation> destroyReadQueue(boolean resend) {
        return this.destroyQueue(this.readQ, resend);
    }

    @Override
    public final void setupResend(boolean cancelWrite, String cause) {
        Operation op = this.getCurrentWriteOp();
        if ((!cancelWrite && !this.shouldAuth || op == null) && op != null) {
            ByteBuffer buf = op.getBuffer();
            if (buf != null) {
                buf.reset();
            } else {
                this.getLogger().warn("No buffer for current write op, removing");
                this.removeCurrentWriteOp();
            }
        }
        while (this.hasReadOp()) {
            op = this.removeCurrentReadOp();
            if (op == this.getCurrentWriteOp()) continue;
            this.getLogger().warn("Discarding partially completed op: %s", op);
            op.cancel(cause);
        }
        while ((cancelWrite || this.shouldAuth) && this.hasWriteOp()) {
            op = this.removeCurrentWriteOp();
            this.getLogger().warn("Discarding partially completed op: %s", op);
            op.cancel(cause);
        }
        this.getWbuf().clear();
        this.getRbuf().clear();
        this.toWrite = 0;
    }

    private boolean preparePending() {
        this.copyInputQueue();
        Operation nextOp = this.getCurrentWriteOp();
        while (nextOp != null && nextOp.isCancelled()) {
            this.getLogger().info("Removing cancelled operation: %s", nextOp);
            this.removeCurrentWriteOp();
            nextOp = this.getCurrentWriteOp();
        }
        return nextOp != null;
    }

    @Override
    public final void fillWriteBuffer(boolean shouldOptimize) {
        if (this.toWrite == 0 && this.readQ.remainingCapacity() > 0) {
            this.getWbuf().clear();
            Operation o = this.getNextWritableOp();
            while (o != null && this.toWrite < this.getWbuf().capacity()) {
                assert (o.getState() == OperationState.WRITING);
                if (!this.readQ.contains(o)) {
                    this.readQ.add(o);
                }
                ByteBuffer obuf = o.getBuffer();
                assert (obuf != null) : "Didn't get a write buffer from " + o;
                int bytesToCopy = Math.min(this.getWbuf().remaining(), obuf.remaining());
                byte[] b = new byte[bytesToCopy];
                obuf.get(b);
                this.getWbuf().put(b);
                this.getLogger().debug("After copying stuff from %s: %s", o, this.getWbuf());
                if (!o.getBuffer().hasRemaining()) {
                    o.writeComplete();
                    this.transitionWriteItem();
                    this.preparePending();
                    if (shouldOptimize) {
                        this.optimize();
                    }
                    o = this.readQ.remainingCapacity() > 0 ? this.getNextWritableOp() : null;
                }
                this.toWrite += bytesToCopy;
            }
            this.getWbuf().flip();
            assert (this.toWrite <= this.getWbuf().capacity()) : "toWrite exceeded capacity: " + this;
            assert (this.toWrite == this.getWbuf().remaining()) : "Expected " + this.toWrite + " remaining, got " + this.getWbuf().remaining();
        } else {
            this.getLogger().debug("Buffer is full, skipping");
        }
    }

    @Override
    public final void transitionWriteItem() {
        Operation op = this.removeCurrentWriteOp();
        assert (op != null) : "There is no write item to transition";
        this.getLogger().debug("Finished writing %s", op);
    }

    protected abstract void optimize();

    @Override
    public final Operation getCurrentReadOp() {
        return (Operation)this.readQ.peek();
    }

    @Override
    public final Operation removeCurrentReadOp() {
        return (Operation)this.readQ.remove();
    }

    @Override
    public final Operation getCurrentWriteOp() {
        return this.optimizedOp == null ? (Operation)this.writeQ.peek() : this.optimizedOp;
    }

    private Operation getNextWritableOp() {
        Operation o = this.getCurrentWriteOp();
        while (o != null && o.getState() == OperationState.WRITE_QUEUED) {
            if (o.isCancelled()) {
                this.getLogger().debug("Not writing cancelled op.");
                Operation cancelledOp = this.removeCurrentWriteOp();
                assert (o == cancelledOp);
            } else {
                o.writing();
                return o;
            }
            o = this.getCurrentWriteOp();
        }
        return o;
    }

    @Override
    public final Operation removeCurrentWriteOp() {
        Operation rv = this.optimizedOp;
        if (rv == null) {
            rv = (Operation)this.writeQ.remove();
        } else {
            this.optimizedOp = null;
        }
        return rv;
    }

    @Override
    public final boolean hasReadOp() {
        return !this.readQ.isEmpty();
    }

    @Override
    public final boolean hasWriteOp() {
        return this.optimizedOp != null || !this.writeQ.isEmpty();
    }

    @Override
    public final void addOp(Operation op) {
        try {
            if (!this.authLatch.await(1L, TimeUnit.SECONDS)) {
                op.cancel("authentication timeout");
                this.getLogger().warn("Operation canceled because authentication or reconnection and authentication has taken more than one second to complete.");
                this.getLogger().debug("Canceled operation %s", op.toString());
                return;
            }
            if (!this.inputQueue.offer(op, this.opQueueMaxBlockTime, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("Timed out waiting to add " + op + "(max wait=" + this.opQueueMaxBlockTime + "ms)");
            }
            ++this.addOpCount;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting to add " + op);
        }
    }

    @Override
    public final void insertOp(Operation op) {
        ArrayList<Operation> tmp = new ArrayList<Operation>(this.inputQueue.size() + 1);
        tmp.add(op);
        this.inputQueue.drainTo(tmp);
        this.inputQueue.addAll(tmp);
        ++this.addOpCount;
    }

    @Override
    public final int getSelectionOps() {
        int rv = 0;
        if (this.getChannel().isConnected()) {
            if (this.hasReadOp()) {
                rv |= 1;
            }
            if (this.toWrite > 0 || this.hasWriteOp()) {
                rv |= 4;
            }
        } else {
            rv = 8;
        }
        return rv;
    }

    @Override
    public final ByteBuffer getRbuf() {
        return this.rbuf;
    }

    @Override
    public final ByteBuffer getWbuf() {
        return this.wbuf;
    }

    @Override
    public final SocketAddress getSocketAddress() {
        return this.socketAddress;
    }

    @Override
    public final boolean isActive() {
        return this.reconnectAttempt == 0 && this.getChannel() != null && this.getChannel().isConnected();
    }

    @Override
    public final void reconnecting() {
        ++this.reconnectAttempt;
        this.continuousTimeout.set(0);
        this.resetTimeoutRatioCount();
    }

    @Override
    public final void connected() {
        this.reconnectAttempt = 0;
        this.continuousTimeout.set(0);
        this.resetTimeoutRatioCount();
    }

    @Override
    public final int getReconnectCount() {
        return this.reconnectAttempt;
    }

    public final String toString() {
        int sops = 0;
        if (this.getSk() != null && this.getSk().isValid()) {
            sops = this.getSk().interestOps();
        }
        int rsize = this.readQ.size() + (this.optimizedOp == null ? 0 : 1);
        int wsize = this.writeQ.size();
        int isize = this.inputQueue.size();
        return "{QA name=" + this.nodeName + ", #Rops=" + rsize + ", #Wops=" + wsize + ", #iq=" + isize + ", topRop=" + this.getCurrentReadOp() + ", topWop=" + this.getCurrentWriteOp() + ", toWrite=" + this.toWrite + ", interested=" + sops + "}";
    }

    @Override
    public final void registerChannel(SocketChannel ch, SelectionKey skey) {
        this.setChannel(ch);
        this.setSk(skey);
    }

    @Override
    public final void setChannel(SocketChannel to) {
        assert (this.channel == null || !this.channel.isOpen()) : "Attempting to overwrite channel";
        this.channel = to;
    }

    @Override
    public final SocketChannel getChannel() {
        return this.channel;
    }

    @Override
    public final void setSk(SelectionKey to) {
        this.sk = to;
    }

    @Override
    public final SelectionKey getSk() {
        return this.sk;
    }

    @Override
    public final void setVersion(String vr) {
        this.version = vr;
        this.setEnableMGetOp();
        this.setEnableSpaceSeparate();
    }

    @Override
    public final String getVersion() {
        return this.version;
    }

    private final void setEnableMGetOp() {
        if (this.isAsciiProtocol) {
            StringTokenizer tokens = new StringTokenizer(this.version, ".");
            int majorVersion = Integer.parseInt(tokens.nextToken());
            int minorVersion = Integer.parseInt(tokens.nextToken());
            this.enabledMGetOp = this.version.contains("E") ? majorVersion > 0 || majorVersion == 0 && minorVersion > 6 : majorVersion > 1 || majorVersion == 1 && minorVersion > 10;
        }
    }

    private final void setEnableSpaceSeparate() {
        if (this.isAsciiProtocol) {
            StringTokenizer tokens = new StringTokenizer(this.version, ".");
            int majorVersion = Integer.parseInt(tokens.nextToken());
            int minorVersion = Integer.parseInt(tokens.nextToken());
            this.enabledSpaceSeparate = this.version.contains("E") ? majorVersion > 0 || majorVersion == 0 && minorVersion > 6 : majorVersion > 1 || majorVersion == 1 && minorVersion > 10;
        }
    }

    @Override
    public final boolean enabledMGetOp() {
        return this.enabledMGetOp;
    }

    @Override
    public final boolean enabledSpaceSeparate() {
        return this.enabledSpaceSeparate;
    }

    @Override
    public final int getBytesRemainingToWrite() {
        return this.toWrite;
    }

    @Override
    public final int writeSome() throws IOException {
        int wrote = this.channel.write(this.wbuf);
        assert (wrote >= 0) : "Wrote negative bytes?";
        this.toWrite -= wrote;
        assert (this.toWrite >= 0) : "toWrite went negative after writing " + wrote + " bytes for " + this;
        this.getLogger().debug("Wrote %d bytes", wrote);
        return wrote;
    }

    @Override
    public void setContinuousTimeout(boolean timedOut) {
        if (this.isActive()) {
            this.addTimeoutRatioCount(timedOut);
        }
        if (timedOut && this.isActive()) {
            this.continuousTimeout.incrementAndGet();
        } else {
            this.continuousTimeout.set(0);
        }
    }

    @Override
    public int getContinuousTimeout() {
        return this.continuousTimeout.get();
    }

    @Override
    public void enableTimeoutRatio() {
        this.toRatioEnabled = true;
        this.toCountArray = new int[100];
        this.resetTimeoutRatioCount();
    }

    @Override
    public int getTimeoutRatioNow() {
        int ratio = -1;
        if (this.toRatioEnabled) {
            this.toRatioLock.lock();
            ratio = this.toRatioNow;
            this.toRatioLock.unlock();
        }
        return ratio;
    }

    @Override
    public final void fixupOps() {
        SelectionKey s = this.sk;
        if (s != null && s.isValid()) {
            int iops = this.getSelectionOps();
            this.getLogger().debug("Setting interested opts to %d", iops);
            s.interestOps(iops);
        } else {
            this.getLogger().debug("Selection key is not valid.");
        }
    }

    @Override
    public final void authComplete() {
        if (this.reconnectBlocked != null && this.reconnectBlocked.size() > 0) {
            this.inputQueue.addAll(this.reconnectBlocked);
        }
        this.authLatch.countDown();
    }

    @Override
    public final void setupForAuth(String cause) {
        if (this.shouldAuth) {
            this.authLatch = new CountDownLatch(1);
            if (this.inputQueue.size() > 0) {
                this.reconnectBlocked = new ArrayList(this.inputQueue.size() + 1);
                this.inputQueue.drainTo(this.reconnectBlocked);
            }
            assert (this.inputQueue.size() == 0);
            this.setupResend(false, cause);
        } else {
            this.authLatch = new CountDownLatch(0);
        }
    }

    @Override
    public final void shutdown() throws IOException {
        if (this.channel != null) {
            this.channel.close();
            this.sk = null;
            if (this.toWrite > 0) {
                this.getLogger().warn("Shut down with %d bytes remaining to write", this.toWrite);
            }
            this.getLogger().debug("Shut down channel %s", this.channel);
        }
    }

    public int getInputQueueSize() {
        return this.inputQueue.size();
    }

    public int getWriteQueueSize() {
        return this.writeQ.size();
    }

    public int getReadQueueSize() {
        return this.readQ.size();
    }

    @Override
    public String getStatus() {
        StringBuilder sb = new StringBuilder();
        sb.append("#Tops=").append(this.addOpCount);
        sb.append(" #iq=").append(this.getInputQueueSize());
        sb.append(" #Wops=").append(this.getWriteQueueSize());
        sb.append(" #Rops=").append(this.getReadQueueSize());
        sb.append(" #CT=").append(this.getContinuousTimeout());
        sb.append(" #TR=").append(this.getTimeoutRatioNow());
        return sb.toString();
    }

    @Override
    public void setReplicaGroup(MemcachedReplicaGroup g) {
        this.replicaGroup = g;
    }

    @Override
    public MemcachedReplicaGroup getReplicaGroup() {
        return this.replicaGroup;
    }

    private BlockingQueue<Operation> getAllOperations() {
        LinkedBlockingQueue<Operation> allOp = new LinkedBlockingQueue<Operation>();
        if (this.hasReadOp()) {
            this.readQ.drainTo(allOp);
        }
        while (this.hasWriteOp()) {
            Operation op = this.removeCurrentWriteOp();
            if (!allOp.contains(op)) {
                allOp.add(op);
                continue;
            }
            this.getLogger().warn("Duplicate operation exist in " + this + " : " + op);
        }
        if (this.inputQueue.size() > 0) {
            this.inputQueue.drainTo(allOp);
        }
        return allOp;
    }

    @Override
    public int addAllOpToWriteQ(BlockingQueue<Operation> allOp) {
        int movedOpCount = 0;
        for (Operation op : allOp) {
            op.setHandlingNode(this);
            if ((op.getState() == OperationState.WRITE_QUEUED || op.getState() == OperationState.WRITING) && op.getBuffer() != null) {
                op.getBuffer().reset();
            } else {
                op.initialize();
                op.resetState();
            }
            if (this.writeQ.offer(op)) {
                op.setMoved(true);
                ++movedOpCount;
                continue;
            }
            op.cancel("by moving operations");
        }
        this.addOpCount += (long)movedOpCount;
        return movedOpCount;
    }

    @Override
    public int moveOperations(MemcachedNode toNode) {
        BlockingQueue<Operation> allOp = this.getAllOperations();
        int opCount = allOp.size();
        int movedOpCount = 0;
        if (opCount > 0) {
            movedOpCount = toNode.addAllOpToWriteQ(allOp);
            this.getLogger().info("Total %d operations have been moved to %s and %d operations have been canceled.", movedOpCount, toNode, opCount - movedOpCount);
        }
        return movedOpCount;
    }
}

