/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.proto;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.bookkeeper.util.SafeRunnable;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.CorruptedFrameException;
import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.TooLongFrameException;
import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ChannelPipelineCoverage(value="one")
public class PerChannelBookieClient
extends SimpleChannelHandler
implements ChannelPipelineFactory {
    /** Logger shared by this handler and its nested helper classes. */
    static final Logger LOG = LoggerFactory.getLogger(PerChannelBookieClient.class);
    /** One fifth of the JVM's maximum heap size, computed once at class initialisation. */
    static final long maxMemory = Runtime.getRuntime().maxMemory() / 5L;
    /** Frame-size limit in bytes (0x200000 = 2 MiB); getPipeline() passes the same value to its frame decoder. */
    public static final int MAX_FRAME_LENGTH = 0x200000;
    /** Address of the bookie this client connects to. */
    InetSocketAddress addr;
    /** Counter supplied by the creator; stored here but not read by the code visible in this class. */
    AtomicLong totalBytesOutstanding;
    /** Netty channel factory used to bootstrap each connection attempt. */
    ClientSocketChannelFactory channelFactory;
    /** Executor on which response and error callbacks are submitted, keyed by ledger id. */
    OrderedSafeExecutor executor;
    /** Optional scheduler for the periodic timeout task; may be {@code null}. */
    ScheduledExecutorService timeoutExecutor;
    /** Outstanding add requests awaiting a response, keyed by (ledgerId, entryId). */
    ConcurrentHashMap<CompletionKey, AddCompletion> addCompletions = new ConcurrentHashMap<CompletionKey, AddCompletion>();
    /** Outstanding read requests awaiting a response, keyed by (ledgerId, entryId). */
    ConcurrentHashMap<CompletionKey, ReadCompletion> readCompletions = new ConcurrentHashMap<CompletionKey, ReadCompletion>();
    /** Operations queued while a connection attempt is in flight; accessed while holding this object's monitor. */
    Queue<BookkeeperInternalCallbacks.GenericCallback<Void>> pendingOps = new ArrayDeque<BookkeeperInternalCallbacks.GenericCallback<Void>>();
    /** Currently active channel, or {@code null} when there is none. */
    volatile Channel channel = null;
    /** Current connection state; transitions are made while holding this object's monitor. */
    volatile ConnectionState state;
    /** Client configuration supplying timeouts and socket options. */
    private final ClientConfiguration conf;

    /**
     * Scans all outstanding add and read requests and errors out every one whose
     * elapsed time has reached the configured timeout.
     *
     * <p>The configured timeouts are scaled by 1000 before being compared against
     * the key's elapsed milliseconds (presumably the configuration is expressed in
     * seconds -- confirm against {@code ClientConfiguration}). The scaling is done
     * in {@code long} arithmetic so that a large configured value cannot overflow
     * {@code int} and wrap to a bogus (possibly negative) timeout.
     *
     * <p>Any throwable raised during the scan is logged and swallowed so that the
     * caller is never interrupted by a failure here.
     */
    private void errorOutTimedOutEntries() {
        int numAdd = 0;
        int numRead = 0;
        int total = 0;
        try {
            // Loop-invariant: look the timeout up once and widen before multiplying.
            final long addTimeoutMs = this.conf.getAddEntryTimeout() * 1000L;
            for (CompletionKey key : this.addCompletions.keySet()) {
                ++total;
                if (key.shouldTimeout(addTimeoutMs)) {
                    this.errorOutAddKey(key);
                    ++numAdd;
                }
            }
            final long readTimeoutMs = this.conf.getReadEntryTimeout() * 1000L;
            for (CompletionKey key : this.readCompletions.keySet()) {
                ++total;
                if (key.shouldTimeout(readTimeoutMs)) {
                    this.errorOutReadKey(key);
                    ++numRead;
                }
            }
        }
        catch (Throwable t) {
            LOG.error("Caught RuntimeException while erroring out timed out entries : ", t);
        }
        // Only report when something actually timed out, to keep the log quiet.
        if (numAdd + numRead > 0) {
            LOG.info("Timeout task iterated through a total of {} keys.", (Object)total);
            LOG.info("Timeout Task errored out {} add entry requests.", (Object)numAdd);
            LOG.info("Timeout Task errored out {} read entry requests.", (Object)numRead);
        }
    }

    /**
     * Creates a client with a freshly constructed {@link ClientConfiguration} and the
     * given timeout scheduler; delegates to the six-argument constructor.
     *
     * @param executor executor used to run callbacks
     * @param channelFactory Netty channel factory used to open connections
     * @param addr address of the bookie
     * @param totalBytesOutstanding counter stored on the instance
     * @param timeoutExecutor scheduler for the periodic timeout task, or {@code null} for none
     */
    public PerChannelBookieClient(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory, InetSocketAddress addr, AtomicLong totalBytesOutstanding, ScheduledExecutorService timeoutExecutor) {
        this(new ClientConfiguration(), executor, channelFactory, addr, totalBytesOutstanding, timeoutExecutor);
    }

    /**
     * Creates a client with a freshly constructed {@link ClientConfiguration} and no
     * timeout scheduler, so no periodic timeout task is scheduled; delegates to the
     * six-argument constructor.
     *
     * @param executor executor used to run callbacks
     * @param channelFactory Netty channel factory used to open connections
     * @param addr address of the bookie
     * @param totalBytesOutstanding counter stored on the instance
     */
    public PerChannelBookieClient(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory, InetSocketAddress addr, AtomicLong totalBytesOutstanding) {
        this(new ClientConfiguration(), executor, channelFactory, addr, totalBytesOutstanding, null);
    }

    public PerChannelBookieClient(ClientConfiguration conf, OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory, InetSocketAddress addr, AtomicLong totalBytesOutstanding, ScheduledExecutorService timeoutExecutor) {
        this.conf = conf;
        this.addr = addr;
        this.executor = executor;
        this.totalBytesOutstanding = totalBytesOutstanding;
        this.channelFactory = channelFactory;
        this.state = ConnectionState.DISCONNECTED;
        this.timeoutExecutor = timeoutExecutor;
        if (null != this.timeoutExecutor) {
            this.timeoutExecutor.scheduleWithFixedDelay(new TimeoutTask(), conf.getTimeoutTaskIntervalMillis(), conf.getTimeoutTaskIntervalMillis(), TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Starts an asynchronous connection attempt to the bookie and registers a
     * listener that, once the attempt finishes, updates the connection state and
     * completes every operation queued in {@code pendingOps}.
     *
     * <p>Callers are expected to have moved the state to {@code CONNECTING}
     * beforehand (as {@code connectIfNeededAndDoOp} does); the listener decides what
     * to do based on the state it observes when the attempt completes.
     */
    private void connect() {
        LOG.info("Connecting to bookie: {}", (Object)this.addr);
        ClientBootstrap bootstrap = new ClientBootstrap((ChannelFactory)this.channelFactory);
        // This object doubles as the pipeline factory (see getPipeline()).
        bootstrap.setPipelineFactory((ChannelPipelineFactory)this);
        bootstrap.setOption("tcpNoDelay", (Object)this.conf.getClientTcpNoDelay());
        bootstrap.setOption("keepAlive", (Object)true);
        ChannelFuture future = bootstrap.connect((SocketAddress)this.addr);
        future.addListener(new ChannelFutureListener(){

            /*
             * Invoked when the connect attempt completes, successfully or not.
             * State is inspected and updated under the client's monitor; the queued
             * callbacks are invoked after the monitor has been released.
             */
            public void operationComplete(ChannelFuture future) throws Exception {
                Queue<BookkeeperInternalCallbacks.GenericCallback<Void>> oldPendingOps;
                int rc;
                LOG.debug("Channel connected ({}) {}", (Object)future.isSuccess(), (Object)future.getChannel());
                PerChannelBookieClient perChannelBookieClient = PerChannelBookieClient.this;
                synchronized (perChannelBookieClient) {
                    if (future.isSuccess() && PerChannelBookieClient.this.state == ConnectionState.CONNECTING) {
                        // Normal case: adopt the new channel.
                        LOG.info("Successfully connected to bookie: {}", (Object)future.getChannel());
                        rc = 0;
                        PerChannelBookieClient.this.channel = future.getChannel();
                        PerChannelBookieClient.this.state = ConnectionState.CONNECTED;
                    } else if (future.isSuccess() && (PerChannelBookieClient.this.state == ConnectionState.CLOSED || PerChannelBookieClient.this.state == ConnectionState.DISCONNECTED)) {
                        // The client was closed/disconnected while connecting: discard the
                        // new channel and fail the queued operations. State is left as is.
                        LOG.warn("Closed before connection completed, clean up: {}, current state {}", (Object)future.getChannel(), (Object)PerChannelBookieClient.this.state);
                        PerChannelBookieClient.this.closeChannel(future.getChannel());
                        rc = -8;
                        PerChannelBookieClient.this.channel = null;
                    } else {
                        if (future.isSuccess() && PerChannelBookieClient.this.state == ConnectionState.CONNECTED) {
                            // Another attempt already won: drop this channel and return
                            // without draining pendingOps.
                            LOG.debug("Already connected with another channel({}), so close the new channel({})", (Object)PerChannelBookieClient.this.channel, (Object)future.getChannel());
                            PerChannelBookieClient.this.closeChannel(future.getChannel());
                            return;
                        }
                        // Remaining case: the attempt did not succeed, or the state was unexpected.
                        LOG.error("Could not connect to bookie: {}/{}, current state {} : ", new Object[]{future.getChannel(), PerChannelBookieClient.this.addr, PerChannelBookieClient.this.state, future.getCause()});
                        rc = -8;
                        PerChannelBookieClient.this.closeChannel(future.getChannel());
                        PerChannelBookieClient.this.channel = null;
                        // CLOSED is sticky; anything else falls back to DISCONNECTED.
                        if (PerChannelBookieClient.this.state != ConnectionState.CLOSED) {
                            PerChannelBookieClient.this.state = ConnectionState.DISCONNECTED;
                        }
                    }
                    // Swap the queue out under the lock so callbacks can run outside it.
                    oldPendingOps = PerChannelBookieClient.this.pendingOps;
                    PerChannelBookieClient.this.pendingOps = new ArrayDeque<BookkeeperInternalCallbacks.GenericCallback<Void>>();
                }
                // rc is 0 on success and -8 otherwise (presumably BKException codes -- confirm).
                for (BookkeeperInternalCallbacks.GenericCallback genericCallback : oldPendingOps) {
                    genericCallback.operationComplete(rc, null);
                }
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs {@code op} once a connection is available, starting a connection attempt
     * if none is in progress.
     *
     * <ul>
     *   <li>Already connected: {@code op} is completed immediately with code 0.</li>
     *   <li>Client closed: {@code op} is completed immediately with code -8.</li>
     *   <li>Otherwise {@code op} is queued in {@code pendingOps}; if no attempt is in
     *       flight the state becomes {@code CONNECTING} and {@code connect()} is
     *       called. The queued op is completed by the connect listener.</li>
     * </ul>
     *
     * @param op callback to complete; it is always invoked outside this object's monitor
     */
    void connectIfNeededAndDoOp(BookkeeperInternalCallbacks.GenericCallback<Void> op) {
        // Fast path without taking the monitor: both fields are volatile.
        if (this.channel != null && this.state == ConnectionState.CONNECTED) {
            op.operationComplete(0, null);
            return;
        }

        int immediateRc = 0;
        boolean startConnecting = false;
        synchronized (this) {
            // Re-check under the monitor; the state may have changed since the fast path.
            if (this.channel != null && this.state == ConnectionState.CONNECTED) {
                immediateRc = 0;
            } else if (this.state == ConnectionState.CLOSED) {
                immediateRc = -8;
            } else {
                this.pendingOps.add(op);
                if (this.state == ConnectionState.CONNECTING) {
                    // Someone else is already connecting; their listener will drain the queue.
                    return;
                }
                this.state = ConnectionState.CONNECTING;
                startConnecting = true;
            }
        }

        if (startConnecting) {
            this.connect();
            return;
        }
        op.operationComplete(immediateRc, null);
    }

    /**
     * Sends an add-entry request for {@code (ledgerId, entryId)} to the bookie and
     * registers {@code cb} to be completed when the response arrives.
     *
     * <p>If there is no active channel, or building/writing the request fails, the
     * registered add completion is errored out via {@code errorOutAddKey}, so the
     * callback is still invoked.
     *
     * @param ledgerId ledger the entry belongs to
     * @param masterKey ledger master key; its first 20 bytes are written to the request
     * @param entryId id of the entry being added
     * @param toSend entry payload, appended after the request header
     * @param cb callback completed with the result of the add
     * @param ctx opaque context handed back to {@code cb}
     * @param options flags placed in the packet header (narrowed to {@code short})
     */
    void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ChannelBuffer toSend, BookkeeperInternalCallbacks.WriteCallback cb, Object ctx, int options) {
        final int entrySize = toSend.readableBytes();
        final CompletionKey completionKey = new CompletionKey(ledgerId, entryId);
        this.addCompletions.put(completionKey, new AddCompletion(cb, entrySize, ctx));
        // Read the volatile field exactly once so the null check and every later use
        // refer to the same channel, even if it is cleared concurrently.
        final Channel c = this.channel;
        if (c == null) {
            // This is an add request: fail the ADD completion (not a read completion
            // that happens to share the same ledger/entry key).
            this.errorOutAddKey(completionKey);
            return;
        }
        // 4-byte frame length + 4-byte packet header + 20-byte master key.
        final int totalHeaderSize = 28;
        try {
            ChannelBuffer header = c.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
            // The length field does not count its own 4 bytes.
            header.writeInt(totalHeaderSize - 4 + entrySize);
            // Opcode 1 matches the add-response branch in messageReceived().
            header.writeInt(new BookieProtocol.PacketHeader(2, 1, (short)options).toInt());
            header.writeBytes(masterKey, 0, 20);
            ChannelBuffer wrappedBuffer = ChannelBuffers.wrappedBuffer((ChannelBuffer[])new ChannelBuffer[]{header, toSend});
            ChannelFuture future = c.write((Object)wrappedBuffer);
            future.addListener(new ChannelFutureListener(){

                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Successfully wrote request for adding entry: " + entryId + " ledger-id: " + ledgerId + " bookie: " + c.getRemoteAddress() + " entry length: " + entrySize);
                        }
                    } else {
                        // A closed channel is an expected failure mode; don't warn about it.
                        if (!(future.getCause() instanceof ClosedChannelException)) {
                            LOG.warn("Writing addEntry(lid={}, eid={}) to channel {} failed : ", new Object[]{ledgerId, entryId, c, future.getCause()});
                        }
                        PerChannelBookieClient.this.errorOutAddKey(completionKey);
                    }
                }
            });
        }
        catch (Throwable e) {
            LOG.warn("Add entry operation failed", e);
            this.errorOutAddKey(completionKey);
        }
    }

    /**
     * Sends a read request with the packet-header flag set to 1 and the ledger
     * master key attached (a fencing read, per the method name), and registers
     * {@code cb} to be completed when the response arrives.
     *
     * <p>If there is no active channel, or building/writing the request fails, the
     * registered read completion is errored out via {@code errorOutReadKey} instead
     * of an exception escaping with the completion left behind. This mirrors the
     * error handling of {@code readEntry}.
     *
     * @param ledgerId ledger to read from
     * @param masterKey ledger master key; its first 20 bytes are written to the request
     * @param entryId id of the entry to read
     * @param cb callback completed with the result of the read
     * @param ctx opaque context handed back to {@code cb}
     */
    public void readEntryAndFenceLedger(final long ledgerId, byte[] masterKey, final long entryId, BookkeeperInternalCallbacks.ReadEntryCallback cb, Object ctx) {
        final CompletionKey key = new CompletionKey(ledgerId, entryId);
        this.readCompletions.put(key, new ReadCompletion(cb, ctx));
        // Read the volatile field exactly once so the null check and every later use
        // refer to the same channel, even if it is cleared concurrently.
        final Channel c = this.channel;
        if (c == null) {
            this.errorOutReadKey(key);
            return;
        }
        // 4-byte frame length + 4-byte packet header + 8-byte ledger id
        // + 8-byte entry id + 20-byte master key.
        final int totalHeaderSize = 44;
        try {
            ChannelBuffer tmpEntry = c.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
            // The length field does not count its own 4 bytes.
            tmpEntry.writeInt(totalHeaderSize - 4);
            // Opcode 2 matches the read-response branch in messageReceived().
            tmpEntry.writeInt(new BookieProtocol.PacketHeader(2, 2, 1).toInt());
            tmpEntry.writeLong(ledgerId);
            tmpEntry.writeLong(entryId);
            tmpEntry.writeBytes(masterKey, 0, 20);
            ChannelFuture future = c.write((Object)tmpEntry);
            future.addListener(new ChannelFutureListener(){

                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Successfully wrote request for reading entry: " + entryId + " ledger-id: " + ledgerId + " bookie: " + c.getRemoteAddress());
                        }
                    } else {
                        // A closed channel is an expected failure mode; don't warn about it.
                        if (!(future.getCause() instanceof ClosedChannelException)) {
                            LOG.warn("Writing readEntryAndFenceLedger(lid={}, eid={}) to channel {} failed : ", new Object[]{ledgerId, entryId, c, future.getCause()});
                        }
                        PerChannelBookieClient.this.errorOutReadKey(key);
                    }
                }
            });
        }
        catch (Throwable e) {
            LOG.warn("Read entry and fence ledger operation failed", e);
            this.errorOutReadKey(key);
        }
    }

    /**
     * Sends a plain read request for {@code (ledgerId, entryId)} and registers
     * {@code cb} to be completed when the response arrives.
     *
     * <p>If there is no active channel, or building/writing the request fails, the
     * registered read completion is errored out via {@code errorOutReadKey}, so the
     * callback is still invoked.
     *
     * @param ledgerId ledger to read from
     * @param entryId id of the entry to read
     * @param cb callback completed with the result of the read
     * @param ctx opaque context handed back to {@code cb}
     */
    public void readEntry(final long ledgerId, final long entryId, BookkeeperInternalCallbacks.ReadEntryCallback cb, Object ctx) {
        final CompletionKey key = new CompletionKey(ledgerId, entryId);
        this.readCompletions.put(key, new ReadCompletion(cb, ctx));
        // Read the volatile field exactly once, before any use, so a missing channel
        // is handled by the explicit check rather than by catching an NPE below.
        final Channel c = this.channel;
        if (c == null) {
            this.errorOutReadKey(key);
            return;
        }
        // 4-byte frame length + 4-byte packet header + 8-byte ledger id + 8-byte entry id.
        final int totalHeaderSize = 24;
        try {
            ChannelBuffer tmpEntry = c.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
            // The length field does not count its own 4 bytes.
            tmpEntry.writeInt(totalHeaderSize - 4);
            // Opcode 2 matches the read-response branch in messageReceived().
            tmpEntry.writeInt(new BookieProtocol.PacketHeader(2, 2, 0).toInt());
            tmpEntry.writeLong(ledgerId);
            tmpEntry.writeLong(entryId);
            ChannelFuture future = c.write((Object)tmpEntry);
            future.addListener(new ChannelFutureListener(){

                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Successfully wrote request for reading entry: " + entryId + " ledger-id: " + ledgerId + " bookie: " + c.getRemoteAddress());
                        }
                    } else {
                        // A closed channel is an expected failure mode; don't warn about it.
                        if (!(future.getCause() instanceof ClosedChannelException)) {
                            LOG.warn("Writing readEntry(lid={}, eid={}) to channel {} failed : ", new Object[]{ledgerId, entryId, c, future.getCause()});
                        }
                        PerChannelBookieClient.this.errorOutReadKey(key);
                    }
                }
            });
        }
        catch (Throwable e) {
            LOG.warn("Read entry operation failed", e);
            this.errorOutReadKey(key);
        }
    }

    /**
     * Closes the current channel, if any, without closing the client permanently:
     * the state becomes {@code DISCONNECTED} unless it is already {@code CLOSED}.
     * Blocks until the channel close has completed.
     */
    public void disconnect() {
        this.closeInternal(false);
    }

    /**
     * Permanently closes this client: the state becomes {@code CLOSED} and the
     * current channel, if any, is closed. Blocks until the channel close has
     * completed. Once closed, {@code connectIfNeededAndDoOp} fails operations
     * immediately with code -8.
     */
    public void close() {
        this.closeInternal(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Shared implementation of {@code close()} and {@code disconnect()}.
     *
     * <p>Under the monitor the state is updated and the current channel is
     * detached from the client; the detached channel is then closed outside the
     * monitor, waiting uninterruptibly for the close to finish.
     *
     * @param permanent {@code true} to move to {@code CLOSED}; {@code false} to move
     *                  to {@code DISCONNECTED} unless the client is already {@code CLOSED}
     */
    private void closeInternal(boolean permanent) {
        final Channel detached;
        synchronized (this) {
            if (permanent) {
                this.state = ConnectionState.CLOSED;
            } else if (this.state != ConnectionState.CLOSED) {
                // CLOSED is sticky: a plain disconnect never re-opens a closed client.
                this.state = ConnectionState.DISCONNECTED;
            }
            detached = this.channel;
            this.channel = null;
        }
        if (detached == null) {
            return;
        }
        this.closeChannel(detached).awaitUninterruptibly();
    }

    /**
     * Closes the given channel, first releasing the external resources of a
     * {@link ReadTimeoutHandler} if one is installed in the channel's pipeline.
     *
     * @param c channel to close; must not be {@code null}
     * @return the future of the asynchronous close operation
     */
    private ChannelFuture closeChannel(Channel c) {
        LOG.debug("Closing channel {}", (Object)c);
        final ChannelPipeline pipeline = c.getPipeline();
        final ReadTimeoutHandler readTimeout = (ReadTimeoutHandler)pipeline.get(ReadTimeoutHandler.class);
        if (null != readTimeout) {
            readTimeout.releaseExternalResources();
        }
        return c.close();
    }

    /**
     * Fails the outstanding read request registered under {@code key}, if any.
     *
     * <p>The work is submitted to the ordered executor under the key's ledger id.
     * There the completion is removed from {@code readCompletions} and, if it was
     * still present, its callback is invoked with code -8 and a {@code null} buffer.
     *
     * @param key identifies the read request to fail
     */
    void errorOutReadKey(final CompletionKey key) {
        this.executor.submitOrdered(key.ledgerId, new SafeRunnable(){

            @Override
            public void safeRun() {
                ReadCompletion readCompletion = PerChannelBookieClient.this.readCompletions.remove(key);
                if (readCompletion == null) {
                    // Already answered or already errored out; nothing to do.
                    return;
                }
                // The completion has been removed, so nothing below may throw before the
                // callback runs. The channel's remote address can be null (e.g. once the
                // channel is no longer connected), hence String.valueOf instead of toString().
                Channel c = PerChannelBookieClient.this.channel;
                String bAddress = c == null ? "null" : String.valueOf(c.getRemoteAddress());
                LOG.debug("Could not write request for reading entry: {} ledger-id: {} bookie: {}", new Object[]{key.entryId, key.ledgerId, bAddress});
                readCompletion.cb.readEntryComplete(-8, key.ledgerId, key.entryId, null, readCompletion.ctx);
            }
        });
    }

    /**
     * Fails the outstanding add request registered under {@code key}, if any.
     *
     * <p>The work is submitted to the ordered executor under the key's ledger id.
     * There the completion is removed from {@code addCompletions} and, if it was
     * still present, its callback is invoked with code -8 and this client's bookie
     * address.
     *
     * @param key identifies the add request to fail
     */
    void errorOutAddKey(final CompletionKey key) {
        this.executor.submitOrdered(key.ledgerId, new SafeRunnable(){

            @Override
            public void safeRun() {
                AddCompletion addCompletion = PerChannelBookieClient.this.addCompletions.remove(key);
                if (addCompletion == null) {
                    // Already answered or already errored out; nothing to do.
                    return;
                }
                // The completion has been removed, so nothing below may throw before the
                // callback runs. The channel's remote address can be null (e.g. once the
                // channel is no longer connected), hence String.valueOf instead of toString().
                Channel c = PerChannelBookieClient.this.channel;
                String bAddress = c == null ? "null" : String.valueOf(c.getRemoteAddress());
                LOG.debug("Could not write request for adding entry: {} ledger-id: {} bookie: {}", new Object[]{key.entryId, key.ledgerId, bAddress});
                addCompletion.cb.writeComplete(-8, key.ledgerId, key.entryId, PerChannelBookieClient.this.addr, addCompletion.ctx);
                LOG.debug("Invoked callback method: {}", (Object)key.entryId);
            }
        });
    }

    /**
     * Errors out every request currently registered in {@code addCompletions} and
     * {@code readCompletions}: adds first, then reads.
     */
    void errorOutOutstandingEntries() {
        final Iterator<CompletionKey> pendingAdds = this.addCompletions.keySet().iterator();
        while (pendingAdds.hasNext()) {
            this.errorOutAddKey(pendingAdds.next());
        }
        final Iterator<CompletionKey> pendingReads = this.readCompletions.keySet().iterator();
        while (pendingReads.hasNext()) {
            this.errorOutReadKey(pendingReads.next());
        }
    }

    /**
     * Builds the pipeline for a new channel: a length-field frame decoder followed
     * by this object as the main handler.
     *
     * <p>The decoder accepts frames of up to 0x200000 bytes (the same value as
     * {@code MAX_FRAME_LENGTH}), reads a 4-byte length field at offset 0 with no
     * length adjustment, and strips those 4 bytes from the decoded frame.
     *
     * @return the newly assembled pipeline
     */
    public ChannelPipeline getPipeline() throws Exception {
        final ChannelHandler frameDecoder = (ChannelHandler)new LengthFieldBasedFrameDecoder(0x200000, 0, 4, 0, 4);
        final ChannelPipeline result = Channels.pipeline();
        result.addLast("lengthbasedframedecoder", frameDecoder);
        result.addLast("mainhandler", (ChannelHandler)this);
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Handles a channel disconnect: closes the channel, errors out all outstanding
     * requests, and, if the disconnected channel is the client's current one and the
     * client is not {@code CLOSED}, moves the state to {@code DISCONNECTED}.
     *
     * @param ctx handler context whose channel was disconnected
     * @param e the state event (not inspected)
     */
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        final Channel disconnected = ctx.getChannel();
        LOG.info("Disconnected from bookie channel {}", (Object)disconnected);
        if (null != disconnected) {
            this.closeChannel(disconnected);
        }
        this.errorOutOutstandingEntries();
        synchronized (this) {
            // Ignore disconnects of stale channels that were already replaced.
            final boolean isCurrentChannel = this.channel == disconnected;
            if (isCurrentChannel && this.state != ConnectionState.CLOSED) {
                this.state = ConnectionState.DISCONNECTED;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Logs exceptions raised in the channel pipeline; takes no other action.
     *
     * <ul>
     *   <li>Frame-decoding failures are logged as errors with the remote address.</li>
     *   <li>{@link IOException}s are ignored silently.</li>
     *   <li>Anything else is logged at debug level if the client is {@code CLOSED},
     *       otherwise at error level.</li>
     * </ul>
     *
     * @param ctx handler context (not used)
     * @param e event carrying the cause and the channel
     */
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        final Throwable cause = e.getCause();
        final boolean badFrame = cause instanceof CorruptedFrameException || cause instanceof TooLongFrameException;
        if (badFrame) {
            LOG.error("Corrupted frame received from bookie: {}", (Object)e.getChannel().getRemoteAddress());
            return;
        }
        if (cause instanceof IOException) {
            return;
        }
        synchronized (this) {
            if (this.state != ConnectionState.CLOSED) {
                LOG.error("Unexpected exception caught by bookie client channel handler", cause);
            } else {
                LOG.debug("Unexpected exception caught by bookie client channel handler, but the client is closed, so it isn't important", cause);
            }
        }
    }

    /**
     * Decodes a response frame and dispatches it on the ordered executor.
     *
     * <p>Messages that are not {@link ChannelBuffer}s are passed upstream untouched.
     * Otherwise the packet header, result code, ledger id and entry id are read from
     * the buffer (in that order); a frame too short to contain them is logged and
     * dropped. Handling is then submitted under the ledger id: opcode 1 goes to
     * {@code handleAddResponse}, opcode 2 to {@code handleReadResponse} (which also
     * receives the remaining buffer), and any other opcode is logged and ignored.
     *
     * @param ctx handler context, used to forward non-buffer messages
     * @param e the received message event
     */
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        final Object message = e.getMessage();
        if (!(message instanceof ChannelBuffer)) {
            ctx.sendUpstream((ChannelEvent)e);
            return;
        }
        final ChannelBuffer buffer = (ChannelBuffer)message;

        final BookieProtocol.PacketHeader header;
        final int rc;
        final long ledgerId;
        final long entryId;
        try {
            header = BookieProtocol.PacketHeader.fromInt(buffer.readInt());
            rc = buffer.readInt();
            ledgerId = buffer.readLong();
            entryId = buffer.readLong();
        }
        catch (IndexOutOfBoundsException ex) {
            LOG.error("Unparseable response from bookie: " + this.addr, (Throwable)ex);
            return;
        }

        this.executor.submitOrdered(ledgerId, new SafeRunnable(){

            @Override
            public void safeRun() {
                final int opCode = header.getOpCode();
                if (opCode == 1) {
                    PerChannelBookieClient.this.handleAddResponse(ledgerId, entryId, rc);
                } else if (opCode == 2) {
                    PerChannelBookieClient.this.handleReadResponse(ledgerId, entryId, rc, buffer);
                } else {
                    LOG.error("Unexpected response, type: " + header.getOpCode() + " received from bookie: " + PerChannelBookieClient.this.addr + " , ignoring");
                }
            }
        });
    }

    /**
     * Completes the outstanding add request for {@code (ledgerId, entryId)} with a
     * result code translated from the code the bookie sent.
     *
     * <p>Translation: 0 -> 0, 103 -> -16, 104 -> -101, 102 -> -102, 105 -> -104;
     * any other value is logged as unknown and mapped to -12. (Presumably wire
     * codes mapped onto client-side BKException codes -- confirm.) A response with
     * no matching completion is logged at debug level and ignored.
     *
     * @param ledgerId ledger id from the response
     * @param entryId entry id from the response
     * @param rc result code as received from the bookie
     */
    void handleAddResponse(long ledgerId, long entryId, int rc) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got response for add request from bookie: {} for ledger: {} entry: {} rc: {}", new Object[]{this.addr, ledgerId, entryId, rc});
        }

        final int translatedRc;
        if (rc == 0) {
            translatedRc = 0;
        } else if (rc == 103) {
            translatedRc = -16;
        } else if (rc == 104) {
            translatedRc = -101;
        } else if (rc == 102) {
            translatedRc = -102;
        } else if (rc == 105) {
            translatedRc = -104;
        } else {
            LOG.warn("Add for ledger: {}, entry: {} failed on bookie: {} with unknown code: {}", new Object[]{ledgerId, entryId, this.addr, rc});
            translatedRc = -12;
        }

        final CompletionKey lookupKey = new CompletionKey(ledgerId, entryId);
        final AddCompletion completion = this.addCompletions.remove(lookupKey);
        if (completion != null) {
            completion.cb.writeComplete(translatedRc, ledgerId, entryId, this.addr, completion.ctx);
            return;
        }
        LOG.debug("Unexpected add response received from bookie: {} for ledger: {}, entry: {}, ignoring", new Object[]{this.addr, ledgerId, entryId});
    }

    /**
     * Completes the outstanding read request for {@code (ledgerId, entryId)} with a
     * result code translated from the code the bookie sent.
     *
     * <p>Translation: 0 -> 0, 1 or 2 -> -13, 103 -> -16, 102 -> -102; any other
     * value is logged as unknown and mapped to -1. (Presumably wire codes mapped
     * onto client-side BKException codes -- confirm.)
     *
     * <p>The completion is looked up under the exact key first and, failing that,
     * under {@code (ledgerId, -1)} (presumably a request that did not name a
     * specific entry -- confirm against callers). A response with no matching
     * completion is logged at debug level and ignored. The callback always receives
     * a slice of the remaining buffer, whatever the result code.
     *
     * @param ledgerId ledger id from the response
     * @param entryId entry id from the response
     * @param rc result code as received from the bookie
     * @param buffer response buffer positioned after the fixed response fields
     */
    void handleReadResponse(long ledgerId, long entryId, int rc, ChannelBuffer buffer) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got response for read request from bookie: {} for ledger: {} entry: {} rc: {} entry length: {}", new Object[]{this.addr, ledgerId, entryId, rc, buffer.readableBytes()});
        }

        final int translatedRc;
        switch (rc) {
            case 0: {
                translatedRc = 0;
                break;
            }
            case 1:
            case 2: {
                translatedRc = -13;
                break;
            }
            case 103: {
                translatedRc = -16;
                break;
            }
            case 102: {
                translatedRc = -102;
                break;
            }
            default: {
                LOG.warn("Read for ledger: {}, entry: {} failed on bookie: {} with unknown code: {}", new Object[]{ledgerId, entryId, this.addr, rc});
                translatedRc = -1;
            }
        }

        ReadCompletion completion = this.readCompletions.remove(new CompletionKey(ledgerId, entryId));
        if (completion == null) {
            // Fall back to a completion registered under entry id -1.
            completion = this.readCompletions.remove(new CompletionKey(ledgerId, -1L));
        }
        if (completion != null) {
            completion.cb.readEntryComplete(translatedRc, ledgerId, entryId, buffer.slice(), completion.ctx);
            return;
        }
        LOG.debug("Unexpected read response received from bookie: {} for ledger: {}, entry: {} , ignoring", new Object[]{this.addr, ledgerId, entryId});
    }

    /**
     * Creates a new {@link CompletionKey} for the given ledger and entry; the key
     * records its own creation time.
     *
     * @param ledgerId ledger id of the request
     * @param entryId entry id of the request
     * @return a fresh key
     */
    CompletionKey newCompletionKey(long ledgerId, long entryId) {
        return new CompletionKey(ledgerId, entryId);
    }

    /**
     * Map key identifying an outstanding request by ledger id and entry id.
     *
     * <p>Equality and hashing use only the two ids. The creation timestamp is kept
     * alongside purely to decide whether the request has timed out.
     */
    static class CompletionKey {
        long ledgerId;
        long entryId;
        /** Creation time as reported by {@code MathUtils.nowInNano()}. */
        final long requestAt;

        CompletionKey(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.requestAt = MathUtils.nowInNano();
        }

        @Override
        public boolean equals(Object obj) {
            // instanceof is false for null, so no separate null check is needed.
            if (obj instanceof CompletionKey) {
                final CompletionKey other = (CompletionKey)obj;
                return this.ledgerId == other.ledgerId && this.entryId == other.entryId;
            }
            return false;
        }

        @Override
        public int hashCode() {
            final int ledgerBits = (int)this.ledgerId << 16;
            final int entryBits = (int)this.entryId;
            return ledgerBits ^ entryBits;
        }

        @Override
        public String toString() {
            return String.format("LedgerEntry(%d, %d)", this.ledgerId, this.entryId);
        }

        /**
         * @param timeout threshold, in the same unit as {@link #elapsedTime()}
         * @return {@code true} once the elapsed time has reached {@code timeout}
         */
        public boolean shouldTimeout(long timeout) {
            final long elapsed = this.elapsedTime();
            return elapsed >= timeout;
        }

        /**
         * @return time since this key was created, as computed by
         *         {@code MathUtils.elapsedMSec}
         */
        public long elapsedTime() {
            return MathUtils.elapsedMSec(this.requestAt);
        }
    }

    /**
     * Holder for what is needed to complete an add request: the write callback and
     * the caller's opaque context.
     */
    static class AddCompletion {
        final BookkeeperInternalCallbacks.WriteCallback cb;
        final Object ctx;

        /**
         * @param cb callback to invoke when the add completes
         * @param size entry size; accepted but not stored
         * @param ctx opaque context handed back to {@code cb}
         */
        public AddCompletion(BookkeeperInternalCallbacks.WriteCallback cb, long size, Object ctx) {
            this.ctx = ctx;
            this.cb = cb;
        }
    }

    /**
     * Holder for what is needed to complete a read request: the read callback and
     * the caller's opaque context.
     */
    static class ReadCompletion {
        final BookkeeperInternalCallbacks.ReadEntryCallback cb;
        final Object ctx;

        /**
         * @param cb callback to invoke when the read completes
         * @param ctx opaque context handed back to {@code cb}
         */
        public ReadCompletion(BookkeeperInternalCallbacks.ReadEntryCallback cb, Object ctx) {
            this.ctx = ctx;
            this.cb = cb;
        }
    }

    /** Lifecycle states of the connection to the bookie. */
    enum ConnectionState {
        /** No channel is active; a new connection attempt may be started. */
        DISCONNECTED,
        /** A connection attempt is in flight. */
        CONNECTING,
        /** A channel is established. */
        CONNECTED,
        /** The client was closed permanently; other transitions never leave this state. */
        CLOSED
    }

    /**
     * Periodic task that errors out requests of the enclosing client that have
     * exceeded their timeout. Scheduled from the constructor when a timeout
     * executor is supplied.
     */
    private class TimeoutTask implements Runnable {
        private TimeoutTask() {
            // Instantiated only by the enclosing client.
        }

        @Override
        public void run() {
            errorOutTimedOutEntries();
        }
    }
}

