/*
 * Decompiled with CFR 0.152.
 */
package net.jxta.impl.endpoint.tcp;

import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.jxta.document.MimeMediaType;
import net.jxta.endpoint.EndpointAddress;
import net.jxta.endpoint.Message;
import net.jxta.endpoint.MessageElement;
import net.jxta.endpoint.StringMessageElement;
import net.jxta.endpoint.WireFormatMessage;
import net.jxta.endpoint.WireFormatMessageFactory;
import net.jxta.id.ID;
import net.jxta.impl.endpoint.BlockingMessenger;
import net.jxta.impl.endpoint.msgframing.MessagePackageHeader;
import net.jxta.impl.endpoint.msgframing.WelcomeMessage;
import net.jxta.impl.endpoint.tcp.TcpTransport;
import net.jxta.impl.endpoint.transportMeter.TransportBindingMeter;
import net.jxta.impl.endpoint.transportMeter.TransportMeterBuildSettings;
import net.jxta.impl.util.TimeUtils;
import net.jxta.logging.Logging;
import net.jxta.peer.PeerID;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class TcpMessenger
extends BlockingMessenger
implements Runnable {
    private static final Logger LOG = Logger.getLogger(TcpMessenger.class.getName());
    private static final int MAX_WRITE_ATTEMPTS = 3;
    private final EndpointAddress srcAddress;
    private final MessageElement srcAddressElement;
    private EndpointAddress logicalDestAddress;
    private final TcpTransport tcpTransport;
    private EndpointAddress dstAddress = null;
    private EndpointAddress origAddress = null;
    private EndpointAddress fullDstAddress = null;
    private InetAddress inetAddress = null;
    private int port = 0;
    private AtomicBoolean closed = new AtomicBoolean(false);
    private boolean closingDueToFailure = false;
    private WelcomeMessage itsWelcome = null;
    private final long createdAt = TimeUtils.timeNow();
    private long lastUsed = TimeUtils.timeNow();
    private SocketChannel socketChannel = null;
    private boolean selfDestruct = true;
    private TransportBindingMeter transportBindingMeter;
    private boolean initiator;
    private AtomicReference<readState> state = new AtomicReference<readState>(readState.WELCOME);
    private static final int MAX_LEN = 4096;
    private ByteBuffer buffer = ByteBuffer.allocate(4096);
    private MessagePackageHeader header = null;
    long receiveBeginTime = 0L;
    private final ReentrantLock writeLock = new ReentrantLock();
    private static final long NEXT_PRINT_DURATION = 120000L;
    private static volatile long maxQueue = 0L;
    private static volatile long lastMaxQueuePrinted = 0L;
    private static volatile long nextMaxQueuePrintTime = System.currentTimeMillis() + 120000L;

    TcpMessenger(SocketChannel socketChannel, TcpTransport transport) throws IOException {
        super(transport.group.getPeerGroupID(), new EndpointAddress(transport.getProtocolName(), socketChannel.socket().getInetAddress().getHostAddress() + ":" + socketChannel.socket().getPort(), null, null), true);
        this.initiator = false;
        this.socketChannel = socketChannel;
        this.tcpTransport = transport;
        this.srcAddress = transport.getPublicAddress();
        this.srcAddressElement = new StringMessageElement("EndpointSourceAddress", this.srcAddress.toString(), null);
        try {
            if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
                LOG.info("Connection from " + socketChannel.socket().getInetAddress().getHostAddress() + ":" + socketChannel.socket().getPort());
            }
            Socket socket = socketChannel.socket();
            int useBufferSize = Math.max(65536, socket.getSendBufferSize());
            socket.setSendBufferSize(useBufferSize);
            this.inetAddress = socketChannel.socket().getInetAddress();
            this.port = socketChannel.socket().getPort();
            socket.setKeepAlive(true);
            socket.setSoTimeout(TcpTransport.connectionTimeOut);
            socket.setTcpNoDelay(true);
            this.fullDstAddress = this.dstAddress = new EndpointAddress(this.tcpTransport.getProtocolName(), this.inetAddress.getHostAddress() + ":" + this.port, null, null);
            this.startMessenger();
        }
        catch (IOException io) {
            if (TransportMeterBuildSettings.TRANSPORT_METERING) {
                this.transportBindingMeter = this.tcpTransport.getUnicastTransportBindingMeter(null, this.dstAddress);
                if (this.transportBindingMeter != null) {
                    this.transportBindingMeter.connectionFailed(this.initiator, TimeUtils.timeNow() - this.createdAt);
                }
            }
            if (socketChannel != null) {
                socketChannel.close();
            }
            throw io;
        }
        if (TransportMeterBuildSettings.TRANSPORT_METERING) {
            this.transportBindingMeter = this.tcpTransport.getUnicastTransportBindingMeter((PeerID)this.getDestinationPeerID(), this.dstAddress);
            if (this.transportBindingMeter != null) {
                this.transportBindingMeter.connectionEstablished(this.initiator, TimeUtils.timeNow() - this.createdAt);
            }
        }
        if (!this.isConnected()) {
            throw new IOException("Failed to establish connection to " + this.dstAddress);
        }
    }

    TcpMessenger(EndpointAddress destaddr, TcpTransport tcpTransport) throws IOException {
        this(destaddr, tcpTransport, true);
    }

    TcpMessenger(EndpointAddress destaddr, TcpTransport tcpTransport, boolean selfDestruct) throws IOException {
        super(tcpTransport.group.getPeerGroupID(), destaddr, selfDestruct);
        this.selfDestruct = selfDestruct;
        this.origAddress = destaddr;
        this.initiator = true;
        this.tcpTransport = tcpTransport;
        this.fullDstAddress = destaddr;
        this.dstAddress = new EndpointAddress(destaddr, null, null);
        this.srcAddress = tcpTransport.getPublicAddress();
        this.srcAddressElement = new StringMessageElement("EndpointSourceAddress", this.srcAddress.toString(), null);
        String protoAddr = destaddr.getProtocolAddress();
        int portIndex = protoAddr.lastIndexOf(":");
        if (portIndex == -1) {
            throw new IllegalArgumentException("Invalid Protocol Address (port # missing) ");
        }
        String portString = protoAddr.substring(portIndex + 1);
        try {
            this.port = Integer.valueOf(portString);
        }
        catch (NumberFormatException caught) {
            throw new IllegalArgumentException("Invalid Protocol Address (port # invalid): " + portString);
        }
        if (this.port <= 0 || this.port > 65535) {
            throw new IllegalArgumentException("Invalid port number in Protocol Address : " + this.port);
        }
        String hostString = protoAddr.substring(0, portIndex);
        this.inetAddress = InetAddress.getByName(hostString);
        if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
            LOG.info("Creating new TCP Connection to : " + this.dstAddress + " / " + this.inetAddress.getHostAddress() + ":" + this.port);
        }
        try {
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("Connecting to " + this.inetAddress.getHostAddress() + ":" + this.port + " via " + this.tcpTransport.usingInterface.getHostAddress() + ":0");
            }
            this.socketChannel = SocketChannel.open();
            Socket socket = this.socketChannel.socket();
            InetSocketAddress bindAddress = new InetSocketAddress(this.tcpTransport.usingInterface, 0);
            socket.bind(bindAddress);
            int useBufferSize = Math.max(65536, socket.getSendBufferSize());
            socket.setSendBufferSize(useBufferSize);
            useBufferSize = Math.max(65536, socket.getReceiveBufferSize());
            socket.setReceiveBufferSize(useBufferSize);
            socket.setKeepAlive(true);
            socket.setSoTimeout(TcpTransport.connectionTimeOut);
            socket.setTcpNoDelay(true);
            InetSocketAddress connectAddress = new InetSocketAddress(this.inetAddress, this.port);
            this.socketChannel.connect(connectAddress);
            this.startMessenger();
        }
        catch (IOException io) {
            if (TransportMeterBuildSettings.TRANSPORT_METERING) {
                this.transportBindingMeter = this.tcpTransport.getUnicastTransportBindingMeter(null, this.dstAddress);
                if (this.transportBindingMeter != null) {
                    this.transportBindingMeter.connectionFailed(this.initiator, TimeUtils.timeNow() - this.createdAt);
                }
            }
            if (this.socketChannel != null) {
                this.socketChannel.close();
            }
            throw io;
        }
        if (TransportMeterBuildSettings.TRANSPORT_METERING) {
            this.transportBindingMeter = this.tcpTransport.getUnicastTransportBindingMeter((PeerID)this.getDestinationPeerID(), this.dstAddress);
            if (this.transportBindingMeter != null) {
                this.transportBindingMeter.connectionEstablished(this.initiator, TimeUtils.timeNow() - this.createdAt);
            }
        }
        if (!this.isConnected()) {
            throw new IOException("Failed to establish connection to " + this.dstAddress);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void finalize() throws Throwable {
        try {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.warning("Messenger being finalized. closing messenger");
            }
            this.closeImpl();
            Object var2_1 = null;
        }
        catch (Throwable throwable) {
            Object var2_2 = null;
            super.finalize();
            throw throwable;
        }
        super.finalize();
    }

    @Override
    public void closeImpl() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        super.close();
        this.setLastUsed(0L);
        if (this.socketChannel != null) {
            block9: {
                this.tcpTransport.unregister(this.socketChannel);
                try {
                    this.socketChannel.close();
                }
                catch (IOException e) {
                    if (!Logging.SHOW_WARNING || !LOG.isLoggable(Level.WARNING)) break block9;
                    LOG.log(Level.WARNING, "Failed to close messenger " + this.toString(), e);
                }
            }
            if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
                LOG.info((this.closingDueToFailure ? "Failure" : "Normal") + " close (open " + TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), this.createdAt) + "ms) of socket to : " + this.dstAddress + " / " + this.inetAddress.getHostAddress() + ":" + this.port);
                if (this.closingDueToFailure && Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                    LOG.log(Level.FINE, "stack trace", new Throwable("stack trace"));
                }
            }
        }
        if (TransportMeterBuildSettings.TRANSPORT_METERING && this.transportBindingMeter != null) {
            if (this.closingDueToFailure) {
                this.transportBindingMeter.connectionDropped(this.initiator, TimeUtils.timeNow() - this.createdAt);
            } else {
                this.transportBindingMeter.connectionClosed(this.initiator, TimeUtils.timeNow() - this.createdAt);
            }
        }
    }

    @Override
    public boolean isClosed() {
        if (this.isConnected()) {
            return false;
        }
        super.close();
        return true;
    }

    @Override
    public boolean isIdleImpl() {
        return false;
    }

    @Override
    public EndpointAddress getLogicalDestinationImpl() {
        return this.logicalDestAddress;
    }

    @Override
    public void sendMessageBImpl(Message message, String service, String serviceParam) throws IOException {
        this.sendMessageDirect(message, service, serviceParam, false);
    }

    public void sendMessageDirect(Message message, String service, String serviceParam, boolean direct) throws IOException {
        if (this.isClosed()) {
            IOException failure = new IOException("Messenger was closed, it cannot be used to send messages.");
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, failure.getMessage(), failure);
            }
            throw failure;
        }
        message.replaceMessageElement("jxta", this.srcAddressElement);
        EndpointAddress destAddressToUse = direct ? this.origAddress : this.getDestAddressToUse(service, serviceParam);
        StringMessageElement dstAddressElement = new StringMessageElement("EndpointDestinationAddress", destAddressToUse.toString(), null);
        message.replaceMessageElement("jxta", dstAddressElement);
        try {
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("Sending " + message + " to " + destAddressToUse + " on connection " + this.getDestinationAddress());
            }
            this.xmitMessage(message);
        }
        catch (IOException caught) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, "Message send failed for " + message, caught);
            }
            this.closeImpl();
            throw caught;
        }
    }

    private void startMessenger() throws IOException {
        this.socketChannel.configureBlocking(true);
        WelcomeMessage myWelcome = new WelcomeMessage(this.fullDstAddress, this.tcpTransport.getPublicAddress(), this.tcpTransport.group.getPeerID(), false);
        long written = this.write(new ByteBuffer[]{myWelcome.getByteBuffer()});
        this.tcpTransport.incrementBytesSent(written);
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("welcome message sent");
        }
        while (this.state.get() == readState.WELCOME) {
            if (TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), this.createdAt) > (long)TcpTransport.connectionTimeOut) {
                throw new SocketTimeoutException("Failed to receive remote welcome message before timeout.");
            }
            this.read();
            this.processBuffer();
        }
        if (!this.closed.get()) {
            this.socketChannel.configureBlocking(false);
            this.tcpTransport.register(this.socketChannel, this);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void xmitMessage(Message msg) throws IOException {
        if (this.closed.get()) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.warning("Connection was closed to : " + this.dstAddress);
            }
            throw new IOException("Connection was closed to : " + this.dstAddress);
        }
        long sendBeginTime = TimeUtils.timeNow();
        long size = 0L;
        try {
            long written;
            WireFormatMessage serialed = WireFormatMessageFactory.toWire(msg, WireFormatMessageFactory.DEFAULT_WIRE_MIME, null);
            MessagePackageHeader header = new MessagePackageHeader();
            header.setContentTypeHeader(serialed.getMimeType());
            size = serialed.getByteLength();
            header.setContentLengthHeader(size);
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("Sending " + msg + " (" + size + ") to " + this.dstAddress + " via " + this.inetAddress.getHostAddress() + ":" + this.port);
            }
            ArrayList<ByteBuffer> partBuffers = new ArrayList<ByteBuffer>();
            partBuffers.add(header.getByteBuffer());
            partBuffers.addAll(Arrays.asList(serialed.getByteBuffers()));
            this.writeLock.lock();
            try {
                written = this.write(partBuffers.toArray(new ByteBuffer[partBuffers.size()]));
                Object var12_12 = null;
                this.writeLock.unlock();
            }
            catch (Throwable throwable) {
                Object var12_13 = null;
                this.writeLock.unlock();
                throw throwable;
            }
            if (TransportMeterBuildSettings.TRANSPORT_METERING && this.transportBindingMeter != null) {
                this.transportBindingMeter.messageSent(this.initiator, msg, TimeUtils.timeNow() - sendBeginTime, written);
            }
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine(MessageFormat.format("Sent {0} bytes {1} successfully via {2}:{3}", written, msg, this.inetAddress.getHostAddress(), this.port));
            }
            this.tcpTransport.incrementBytesSent(written);
            this.tcpTransport.incrementMessagesSent();
            this.setLastUsed(TimeUtils.timeNow());
        }
        catch (SocketTimeoutException failed) {
            SocketTimeoutException failure = new SocketTimeoutException("Failed sending " + msg + " to : " + this.inetAddress.getHostAddress() + ":" + this.port);
            failure.initCause(failed);
            throw failure;
        }
        catch (IOException failed) {
            if (TransportMeterBuildSettings.TRANSPORT_METERING && this.transportBindingMeter != null) {
                this.transportBindingMeter.sendFailure(this.initiator, msg, TimeUtils.timeNow() - sendBeginTime, size);
            }
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, "Message send failed for " + this.inetAddress.getHostAddress() + ":" + this.port, failed);
            }
            this.closingDueToFailure = true;
            this.close();
            IOException failure = new IOException("Failed sending " + msg + " to : " + this.inetAddress.getHostAddress() + ":" + this.port);
            failure.initCause(failed);
            throw failure;
        }
    }

    private long write(ByteBuffer[] byteBuffers) throws IOException {
        long nBytes = 0L;
        for (ByteBuffer byteBuffer : byteBuffers) {
            nBytes += this.write(byteBuffer);
        }
        return nBytes;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private long write(ByteBuffer byteBuffer) throws IOException {
        long bytesToWrite = byteBuffer.remaining();
        if (bytesToWrite == 0L) {
            return 0L;
        }
        long bytesWritten = 0L;
        Selector writeSelector = null;
        SelectionKey wKey = null;
        int attempts = 1;
        try {
            block15: {
                while (true) {
                    block17: {
                        block16: {
                            long wroteBytes = this.socketChannel.write(byteBuffer);
                            bytesWritten += wroteBytes;
                            if (wroteBytes < 0L) {
                                throw new EOFException();
                            }
                            if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) {
                                LOG.finer(MessageFormat.format("Wrote {0} bytes", wroteBytes));
                            }
                            if (wroteBytes != 0L) continue;
                            if (bytesWritten == bytesToWrite) break block15;
                            if (++attempts > 3) {
                                throw new IOException(MessageFormat.format("Max write attempts ({0}) exceeded ({1})", attempts, 3));
                            }
                            if (writeSelector != null) break block16;
                            try {
                                writeSelector = this.tcpTransport.getSelector();
                            }
                            catch (InterruptedException woken) {
                                InterruptedIOException incompleteIO = new InterruptedIOException("Interrupted while acquiring write selector.");
                                incompleteIO.initCause(woken);
                                incompleteIO.bytesTransferred = (int)Math.min(bytesWritten, Integer.MAX_VALUE);
                                throw incompleteIO;
                            }
                            if (writeSelector == null) break block17;
                            wKey = this.socketChannel.register(writeSelector, 4);
                        }
                        long time = TimeUtils.timeNow();
                        int ready = writeSelector.select(TcpTransport.connectionTimeOut);
                        if (ready == 0 && TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), time) > (long)TcpTransport.connectionTimeOut) {
                            throw new SocketTimeoutException("Timeout during socket write");
                        }
                        --attempts;
                    }
                    if (attempts > 3) break;
                }
            }
            Object var15_12 = null;
            if (wKey != null) {
                wKey.cancel();
                wKey = null;
            }
            if (writeSelector == null) return bytesWritten;
        }
        catch (Throwable throwable) {
            Object var15_13 = null;
            if (wKey != null) {
                wKey.cancel();
                wKey = null;
            }
            if (writeSelector == null) throw throwable;
            writeSelector.selectNow();
            this.tcpTransport.returnSelector(writeSelector);
            throw throwable;
        }
        writeSelector.selectNow();
        this.tcpTransport.returnSelector(writeSelector);
        return bytesWritten;
    }

    private boolean processWelcome(ByteBuffer buffer) {
        try {
            if (this.itsWelcome == null) {
                this.itsWelcome = new WelcomeMessage();
            }
            if (!this.itsWelcome.read(buffer)) {
                return false;
            }
            this.dstAddress = this.itsWelcome.getPublicAddress();
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("Creating a logical address from : " + this.itsWelcome.getWelcomeString());
            }
            this.fullDstAddress = this.dstAddress;
            this.logicalDestAddress = new EndpointAddress("jxta", this.itsWelcome.getPeerID().getUniqueValue().toString(), null, null);
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("Hello from " + this.itsWelcome.getPublicAddress() + " [" + this.itsWelcome.getPeerID() + "] ");
            }
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("Registering Messenger from " + this.socketChannel.socket().getInetAddress().getHostAddress() + ":" + this.socketChannel.socket().getPort());
            }
            try {
                this.tcpTransport.messengerReadyEvent(this, this.getConnectionAddress());
            }
            catch (Throwable all) {
                if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
                    LOG.log(Level.SEVERE, "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all);
                }
                IOException failure = new IOException("Failure announcing messenger.");
                failure.initCause(all);
                throw failure;
            }
        }
        catch (IOException e) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, "Error while parsing the welcome message", e);
            }
            this.closeImpl();
            return false;
        }
        return true;
    }

    private boolean processHeader(ByteBuffer buffer) {
        block9: {
            if (null == this.header) {
                this.header = new MessagePackageHeader();
            }
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine(MessageFormat.format("{0} Processing message package header, buffer stats:{1}", Thread.currentThread(), buffer.toString()));
            }
            try {
                if (!this.header.readHeader(buffer)) {
                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                        LOG.fine(MessageFormat.format("{0} maintaining current state at header, buffer stats :{1}", Thread.currentThread(), buffer.toString()));
                    }
                    return false;
                }
            }
            catch (IOException e) {
                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                    LOG.log(Level.FINE, "Error while parsing the message header", e);
                }
                if (this.socketChannel.isConnected()) break block9;
                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                    LOG.warning("SocketChannel closed. Closing the messenger");
                }
                this.closeImpl();
            }
        }
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine(MessageFormat.format("{0} setting current state to body, Buffer stats :{1}, remaining elements {2}:", Thread.currentThread(), buffer.toString(), buffer.remaining()));
        }
        return true;
    }

    private Message processMessage(ByteBuffer buffer, MessagePackageHeader header) throws IOException {
        MimeMediaType msgMime = header.getContentTypeHeader();
        return WireFormatMessageFactory.fromBuffer(buffer, msgMime, null);
    }

    @Override
    public void run() {
        block6: {
            try {
                while (this.read()) {
                    List<Message> msgs = this.processBuffer();
                    for (Message msg : msgs) {
                        this.tcpTransport.executor.execute(new MessageProcessor(msg));
                        if (!Logging.SHOW_INFO || !LOG.isLoggable(Level.INFO)) continue;
                        int queuesize = ((ThreadPoolExecutor)this.tcpTransport.executor).getQueue().size();
                        if ((long)queuesize > maxQueue) {
                            maxQueue = queuesize;
                        }
                        if (maxQueue <= lastMaxQueuePrinted || System.currentTimeMillis() <= nextMaxQueuePrintTime) continue;
                        lastMaxQueuePrinted = maxQueue;
                        nextMaxQueuePrintTime = System.currentTimeMillis() + 120000L;
                        LOG.info("TcpMessenger executor.getQueue().maxSize=" + lastMaxQueuePrinted);
                    }
                }
                if (this.socketChannel != null) {
                    this.tcpTransport.register(this.socketChannel, this);
                }
            }
            catch (Throwable all) {
                if (!Logging.SHOW_SEVERE) break block6;
                LOG.log(Level.SEVERE, "Uncaught Throwable", all);
            }
        }
    }

    private boolean read() {
        if (this.closed.get() || this.socketChannel == null) {
            return false;
        }
        if (!this.socketChannel.isConnected()) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.warning("SocketChannel is not connected. closing connection");
            }
            this.closeImpl();
            return false;
        }
        try {
            int read;
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine(MessageFormat.format("{0} State before read(): {1}, buffer stats : {2}, remaining :{3}", new Object[]{Thread.currentThread(), this.state.get(), this.buffer.toString(), this.buffer.remaining()}));
            }
            if ((read = this.socketChannel.read(this.buffer)) < 0) {
                if (!this.socketChannel.isConnected() || read < 0) {
                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                        LOG.fine(MessageFormat.format("{0} Closing due to EOF", Thread.currentThread()));
                    }
                    if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                        LOG.warning("SocketChannel is not connected. closing connection");
                    }
                    this.closeImpl();
                }
                return false;
            }
            if (read == 0) {
                return false;
            }
            this.tcpTransport.incrementBytesReceived(read);
            this.buffer.flip();
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine(MessageFormat.format("{0} SocketChannel.read() == {1} bytes. Buffer stats:{2}, remaining {3}", Thread.currentThread(), read, this.buffer.toString(), this.buffer.remaining()));
            }
            return true;
        }
        catch (ClosedChannelException e) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, "Channel closed while reading data", e);
            }
            this.closeImpl();
            return false;
        }
        catch (InterruptedIOException woken) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.warning(MessageFormat.format("tcp receive - interrupted : read() {0} {1}:{2}", woken.bytesTransferred, this.inetAddress.getHostAddress(), this.port));
            }
        }
        catch (IOException ioe) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, "IOException occured while reading data", ioe);
            }
            this.closeImpl();
            return false;
        }
        catch (Throwable e) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, MessageFormat.format("tcp receive - Error on connection {0}:{1}", this.inetAddress.getHostAddress(), this.port), e);
            }
            this.closingDueToFailure = true;
            this.closeImpl();
            return false;
        }
        return (this.socketChannel.validOps() & 1) == 1;
    }

    public List<Message> processBuffer() {
        ArrayList<Message> msgs = new ArrayList<Message>();
        boolean done = false;
        while (!done) {
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine(MessageFormat.format("{0} processBuffer({1}). Buffer stats:{2}, elements remaining {3}", Thread.currentThread(), this.state.getClass(), this.buffer.toString(), this.buffer.remaining()));
            }
            switch (this.state.get()) {
                case WELCOME: {
                    boolean wseen = this.processWelcome(this.buffer);
                    if (wseen) {
                        this.state.set(readState.HEADER);
                    }
                    done = true;
                    break;
                }
                case HEADER: {
                    boolean hseen = this.processHeader(this.buffer);
                    if (!hseen) {
                        done = true;
                        break;
                    }
                    this.receiveBeginTime = TimeUtils.timeNow();
                    if (this.header.getContentLengthHeader() > (long)this.buffer.capacity()) {
                        ByteBuffer src = this.buffer;
                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                            LOG.fine(MessageFormat.format("{0} Reallocating a new buffer of size {1} to replace :{2}", Thread.currentThread(), this.header.getContentLengthHeader(), this.buffer.toString()));
                        }
                        this.buffer = ByteBuffer.allocate((int)this.header.getContentLengthHeader());
                        this.buffer.put(src);
                        this.buffer.flip();
                    }
                    this.state.set(readState.BODY);
                }
                case BODY: {
                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                        LOG.fine(MessageFormat.format(" {0} Proccessing Message Body. expecting {1}, {2} elements remaining {3}", Thread.currentThread(), this.header.getContentLengthHeader(), this.buffer.toString(), this.buffer.remaining()));
                    }
                    if (this.buffer.remaining() >= (int)this.header.getContentLengthHeader()) {
                        Message msg;
                        try {
                            msg = this.processMessage(this.buffer, this.header);
                        }
                        catch (IOException io) {
                            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                                LOG.log(Level.WARNING, "Failed to parse a message from buffer. closing connection", io);
                            }
                            this.closeImpl();
                            done = true;
                            break;
                        }
                        if (TransportMeterBuildSettings.TRANSPORT_METERING && this.transportBindingMeter != null) {
                            this.transportBindingMeter.messageReceived(this.initiator, msg, TimeUtils.timeNow() - this.receiveBeginTime, this.header.getContentLengthHeader());
                        }
                        this.tcpTransport.incrementMessagesReceived();
                        this.setLastUsed(TimeUtils.timeNow());
                        this.state.set(readState.HEADER);
                        this.header = null;
                        msgs.add(msg);
                        break;
                    }
                    done = true;
                }
            }
        }
        this.buffer.compact();
        return msgs;
    }

    private boolean isConnected() {
        return !this.closed.get();
    }

    private long getLastUsed() {
        return !this.selfDestruct ? System.currentTimeMillis() : this.lastUsed;
    }

    private void setLastUsed(long time) {
        this.lastUsed = time;
    }

    TransportBindingMeter getTransportBindingMeter() {
        return this.transportBindingMeter;
    }

    private EndpointAddress getConnectionAddress() {
        return this.itsWelcome.getDestinationAddress();
    }

    private ID getDestinationPeerID() {
        return this.itsWelcome.getPeerID();
    }

    private class MessageProcessor
    implements Runnable {
        private Message msg;

        MessageProcessor(Message msg) {
            this.msg = msg;
        }

        public void run() {
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine(MessageFormat.format("{0} calling EndpointService.demux({1})", Thread.currentThread(), this.msg, TcpMessenger.this.inetAddress.getHostAddress(), TcpMessenger.this.port));
            }
            ((TcpMessenger)TcpMessenger.this).tcpTransport.endpoint.demux(this.msg);
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    private static enum readState {
        WELCOME,
        HEADER,
        BODY;

    }
}

