/*
 * Decompiled with CFR 0.152.
 */
package net.jxta.util;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketTimeoutException;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.jxta.credential.Credential;
import net.jxta.document.AdvertisementFactory;
import net.jxta.document.MimeMediaType;
import net.jxta.document.StructuredDocument;
import net.jxta.document.StructuredDocumentFactory;
import net.jxta.document.XMLDocument;
import net.jxta.endpoint.EndpointAddress;
import net.jxta.endpoint.EndpointService;
import net.jxta.endpoint.Message;
import net.jxta.endpoint.MessageElement;
import net.jxta.endpoint.Messenger;
import net.jxta.endpoint.StringMessageElement;
import net.jxta.endpoint.TextDocumentMessageElement;
import net.jxta.id.ID;
import net.jxta.impl.endpoint.tcp.TcpMessenger;
import net.jxta.impl.util.pipe.reliable.Defs;
import net.jxta.impl.util.pipe.reliable.FixedFlowControl;
import net.jxta.impl.util.pipe.reliable.OutgoingMsgrAdaptor;
import net.jxta.impl.util.pipe.reliable.ReliableInputStream;
import net.jxta.impl.util.pipe.reliable.ReliableOutputStream;
import net.jxta.logging.Logging;
import net.jxta.membership.MembershipService;
import net.jxta.peer.PeerID;
import net.jxta.peergroup.PeerGroup;
import net.jxta.pipe.InputPipe;
import net.jxta.pipe.OutputPipe;
import net.jxta.pipe.OutputPipeEvent;
import net.jxta.pipe.OutputPipeListener;
import net.jxta.pipe.PipeID;
import net.jxta.pipe.PipeMsgEvent;
import net.jxta.pipe.PipeMsgListener;
import net.jxta.pipe.PipeService;
import net.jxta.protocol.PeerAdvertisement;
import net.jxta.protocol.PipeAdvertisement;
import net.jxta.util.JxtaServerPipe;
import net.jxta.util.PipeEventListener;
import net.jxta.util.PipeStateListener;

/**
 * A bidirectional pipe built from one JXTA input pipe (for receiving) and one
 * {@link Messenger} (for sending).
 *
 * <p>The initiating side calls one of the {@code connect} methods: it resolves
 * an output pipe for the remote advertisement, sends an "open" message that
 * carries its own pipe and peer advertisements, and waits for the remote side
 * to answer with its pipe advertisement. The accepting side is created through
 * the protected constructors, which receive an already established messenger.
 *
 * <p>When the pipe is reliable and not "direct", traffic goes through a
 * {@link ReliableOutputStream} / {@link ReliableInputStream} pair; otherwise
 * messages are handed straight to the messenger.
 *
 * <p>Incoming messages are delivered either to the registered
 * {@link PipeMsgListener} or, when no listener is set, buffered in a bounded
 * queue that {@link #getMessage(int)} drains.
 */
public class JxtaBiDiPipe implements PipeMsgListener, OutputPipeListener, ReliableInputStream.MsgListener {
    private static final Logger LOG = Logger.getLogger(JxtaBiDiPipe.class.getName());

    /** Upper bound, in milliseconds, accepted by {@link #setMaxRetryTimeout(int)}. */
    private static final int MAXRETRYTIMEOUT = 120000;

    /** Message namespace used for every handshake and close element. */
    private static final String MSG_NAMESPACE = "JXTABIP";

    private PipeAdvertisement remotePipeAdv;
    private PeerAdvertisement remotePeerAdv;

    /** Connect/close timeout in milliseconds. */
    protected int timeout = 15000;
    /** Retry timeout in milliseconds handed to the reliability layer. */
    protected int retryTimeout = 60000;
    protected int maxRetryTimeout = MAXRETRYTIMEOUT;
    /** Capacity of the message queue and size of the fixed flow-control window. */
    protected int windowSize = 50;

    /** Buffer for incoming events while no message listener is registered; may be null. */
    private BlockingQueue<PipeMsgEvent> queue = null;

    protected PeerGroup group;
    protected PipeAdvertisement pipeAdv;
    protected PipeAdvertisement myPipeAdv;
    protected PipeService pipeSvc;
    protected InputPipe inputPipe;
    protected OutputPipe connectOutpipe;
    protected Messenger msgr;
    protected InputStream stream;

    /** Guards the {@link #closed} flag. */
    protected final Object closeLock = new Object();
    /** Signalled when the output pipe used for the open message is resolved. */
    protected final Object acceptLock = new Object();
    /** Signalled when the remote side has answered the open message. */
    protected final Object finalLock = new Object();

    protected boolean closed = false;
    protected boolean bound = false;
    protected PipeMsgListener msgListener;
    protected PipeEventListener eventListener;
    protected PipeStateListener stateListener;
    protected Credential credential = null;
    /** True between sending the open message and receiving the response. */
    protected boolean waiting;
    protected boolean isReliable = false;
    protected ReliableInputStream ris = null;
    protected ReliableOutputStream ros = null;
    protected volatile boolean direct = false;
    protected OutgoingMsgrAdaptor outgoing = null;
    protected StructuredDocument credentialDoc = null;
    protected Properties connectionProperties = null;

    /** Event code passed to listeners once the pipe has been closed. */
    public static final int PIPE_CLOSED_EVENT = 1;

    /**
     * Creates an already-connected pipe around an existing messenger.
     *
     * @param group      peer group providing the pipe service
     * @param msgr       messenger used for outgoing traffic; must not be null
     * @param pipe       advertisement of the local input pipe to create
     * @param credDoc    credential document, or null to use the group's default credential
     * @param isReliable whether the reliability layer should be used
     * @param direct     whether the messenger is a direct one (no reliability layer is created)
     * @throws IOException if {@code msgr} is null or the input pipe cannot be created
     */
    protected JxtaBiDiPipe(PeerGroup group, Messenger msgr, PipeAdvertisement pipe, StructuredDocument credDoc, boolean isReliable, boolean direct) throws IOException {
        if (msgr == null) {
            throw new IOException("Null Messenger");
        }
        this.direct = direct;
        this.group = group;
        this.pipeAdv = pipe;
        this.credentialDoc = credDoc != null ? credDoc : JxtaBiDiPipe.getCredDoc(group);
        this.pipeSvc = group.getPipeService();
        this.inputPipe = this.pipeSvc.createInputPipe(pipe, this);
        this.msgr = msgr;
        this.isReliable = isReliable;
        if (!direct) {
            this.createRLib();
        }
        this.setBound();
    }

    /**
     * Same as the six-argument constructor, additionally recording the
     * connection properties associated with this pipe.
     *
     * @param connectionProperties properties to associate with the connection; may be null
     * @throws IOException if {@code msgr} is null or the input pipe cannot be created
     */
    protected JxtaBiDiPipe(PeerGroup group, Messenger msgr, PipeAdvertisement pipe, StructuredDocument credDoc, boolean isReliable, boolean direct, Properties connectionProperties) throws IOException {
        this(group, msgr, pipe, credDoc, isReliable, direct);
        this.connectionProperties = connectionProperties;
    }

    /** Creates an unconnected pipe; call one of the {@code connect} methods to bind it. */
    public JxtaBiDiPipe() {
    }

    /**
     * Creates an unconnected pipe whose open message will carry the given properties.
     *
     * @param properties connection properties; may be null
     */
    public JxtaBiDiPipe(Properties properties) {
        this.connectionProperties = properties;
    }

    /**
     * Creates a pipe and connects it using the default timeout.
     *
     * @throws IOException if the connection cannot be established
     */
    public JxtaBiDiPipe(PeerGroup group, PipeAdvertisement pipeAd, PipeMsgListener msgListener) throws IOException {
        this.connect(group, null, pipeAd, this.timeout, msgListener);
    }

    /**
     * Creates a pipe and connects it using the given timeout (milliseconds).
     *
     * @throws IOException if the connection cannot be established
     */
    public JxtaBiDiPipe(PeerGroup group, PipeAdvertisement pipeAd, int timeout, PipeMsgListener msgListener) throws IOException {
        this.connect(group, null, pipeAd, timeout, msgListener);
    }

    /**
     * Creates a pipe and connects it using the given timeout and reliability setting.
     *
     * @throws IOException if the connection cannot be established
     */
    public JxtaBiDiPipe(PeerGroup group, PipeAdvertisement pipeAd, int timeout, PipeMsgListener msgListener, boolean reliable) throws IOException {
        this.connect(group, null, pipeAd, timeout, msgListener, reliable);
    }

    /**
     * Connects to the remote pipe using the current timeout and no message listener.
     *
     * @throws IOException if the connection cannot be established
     */
    public void connect(PeerGroup group, PipeAdvertisement pipeAd) throws IOException {
        this.connect(group, pipeAd, this.timeout);
    }

    /**
     * Connects to the remote pipe with the given timeout and no message listener.
     *
     * @throws IOException if the connection cannot be established
     */
    public void connect(PeerGroup group, PipeAdvertisement pipeAd, int timeout) throws IOException {
        this.connect(group, null, pipeAd, timeout, null);
    }

    /**
     * Connects to the remote pipe, keeping the current reliability setting.
     *
     * @throws IOException if the connection cannot be established
     */
    public void connect(PeerGroup group, PeerID peerid, PipeAdvertisement pipeAd, int timeout, PipeMsgListener msgListener) throws IOException {
        this.connect(group, peerid, pipeAd, timeout, msgListener, this.isReliable);
    }

    /**
     * Connects this pipe to a remote pipe.
     *
     * <p>Steps: create the local input pipe (once), resolve an output pipe for
     * {@code pipeAd}, send the open message through it, then wait for the remote
     * response that supplies the messenger used for subsequent traffic.
     *
     * @param group       peer group providing the pipe service
     * @param peerid      peer to resolve the output pipe on, or null for any peer
     * @param pipeAd      advertisement of the remote pipe
     * @param timeout     time in milliseconds to wait for each handshake phase; must be positive
     * @param msgListener listener for incoming messages, or null to queue them for {@link #getMessage(int)}
     * @param reliable    whether to request reliable delivery
     * @throws IOException              if the pipe is already bound, the calling thread is interrupted,
     *                                  or the open message cannot be built or sent
     * @throws SocketTimeoutException   if a handshake phase does not complete within {@code timeout}
     * @throws IllegalArgumentException if {@code timeout} is not positive
     */
    public void connect(PeerGroup group, PeerID peerid, PipeAdvertisement pipeAd, int timeout, PipeMsgListener msgListener, boolean reliable) throws IOException {
        if (this.isBound()) {
            throw new IOException("Pipe already bound");
        }
        if (timeout <= 0) {
            throw new IllegalArgumentException("Invalid timeout :" + timeout);
        }
        this.pipeAdv = pipeAd;
        this.group = group;
        this.msgListener = msgListener;
        if (msgListener == null) {
            this.queue = new ArrayBlockingQueue<PipeMsgEvent>(this.windowSize);
        }
        this.isReliable = reliable;
        this.pipeSvc = group.getPipeService();
        // timeout is known to be positive here, so it is stored as-is.
        this.timeout = timeout;
        if (this.myPipeAdv == null) {
            this.myPipeAdv = JxtaServerPipe.newInputPipe(group, pipeAd);
            this.inputPipe = this.pipeSvc.createInputPipe(this.myPipeAdv, this);
        }
        if (this.credentialDoc == null) {
            this.credentialDoc = JxtaBiDiPipe.getCredDoc(group);
        }
        Message openMsg = this.createOpenMessage(group, this.myPipeAdv);
        // Resolution is asynchronous; the result arrives through outputPipeEvent().
        if (peerid == null) {
            this.pipeSvc.createOutputPipe(pipeAd, this);
        } else {
            this.pipeSvc.createOutputPipe(pipeAd, Collections.singleton(peerid), this);
        }
        this.awaitOutputPipe(timeout);

        this.waiting = true;
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("Sending a backchannel message");
        }
        this.connectOutpipe.send(openMsg);
        this.awaitBackChannel(timeout);

        this.setBound();
        // 2 is the "opened" event code; presumably PipeStateListener.PIPE_OPENED_EVENT - confirm.
        this.notifyListeners(2);
    }

    /**
     * Blocks until {@link #outputPipeEvent(OutputPipeEvent)} has stored the
     * output pipe or the timeout elapses. The wait is re-checked in a loop so
     * that a spurious wake-up does not end it early.
     */
    private void awaitOutputPipe(int timeout) throws IOException {
        long deadline = System.currentTimeMillis() + (long) timeout;
        try {
            synchronized (this.acceptLock) {
                if (this.connectOutpipe == null && Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                    LOG.fine("Waiting for " + timeout + " msec");
                }
                long remaining = timeout;
                while (this.connectOutpipe == null && remaining > 0L) {
                    this.acceptLock.wait(remaining);
                    remaining = deadline - System.currentTimeMillis();
                }
            }
        } catch (InterruptedException ie) {
            throw JxtaBiDiPipe.interruptedFailure(ie);
        }
        if (this.connectOutpipe == null) {
            throw new SocketTimeoutException("Connection timeout");
        }
    }

    /**
     * Blocks until the remote response to the open message has been processed
     * or the timeout elapses. Fails when no response arrived in time or when the
     * response did not yield a messenger.
     */
    private void awaitBackChannel(int timeout) throws IOException {
        long deadline = System.currentTimeMillis() + (long) timeout;
        try {
            synchronized (this.finalLock) {
                if (this.waiting && Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                    LOG.fine("Waiting for " + timeout + " msec for back channel to be established");
                }
                long remaining = timeout;
                while (this.waiting && remaining > 0L) {
                    this.finalLock.wait(remaining);
                    remaining = deadline - System.currentTimeMillis();
                }
                // Checked even when no wait was needed: a response that arrived
                // early may still have failed to produce a messenger.
                if (this.waiting || this.msgr == null) {
                    throw new SocketTimeoutException("Connection timeout");
                }
            }
        } catch (InterruptedException ie) {
            throw JxtaBiDiPipe.interruptedFailure(ie);
        }
    }

    /**
     * Converts an interruption during connect into an IOException while
     * restoring the thread's interrupt status for the caller.
     */
    private static IOException interruptedFailure(InterruptedException ie) {
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.log(Level.FINE, "Interrupted", ie);
        }
        Thread.currentThread().interrupt();
        IOException failure = new IOException("Interrupted");
        failure.initCause(ie);
        return failure;
    }

    /** Lazily creates the reliability layer objects when the pipe is reliable. */
    private void createRLib() {
        if (!this.isReliable) {
            return;
        }
        if (this.outgoing == null) {
            this.outgoing = new OutgoingMsgrAdaptor(this.msgr, this.retryTimeout);
        }
        if (this.ros == null) {
            this.ros = new ReliableOutputStream(this.outgoing, new FixedFlowControl(this.windowSize));
        }
        if (this.ris == null) {
            this.ris = new ReliableInputStream(this.outgoing, this.retryTimeout, this);
        }
    }

    /**
     * Turns reliable delivery on or off.
     *
     * @throws IOException if the pipe is already bound
     */
    public void setReliable(boolean reliable) throws IOException {
        if (this.isBound()) {
            throw new IOException("Can not set reliability after pipe is bound");
        }
        this.isReliable = reliable;
    }

    /**
     * Obtains the document of the group's default credential.
     *
     * @param group peer group whose membership service is queried
     * @return the credential document, or null if there is no default credential
     *         or it could not be obtained (the failure is logged)
     */
    protected static StructuredDocument getCredDoc(PeerGroup group) {
        try {
            MembershipService membership = group.getMembershipService();
            Credential credential = membership.getDefaultCredential();
            if (credential != null) {
                return credential.getDocument(MimeMediaType.XMLUTF8);
            }
        } catch (Exception e) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, "failed to get credential", e);
            }
        }
        return null;
    }

    /** @return the credential document sent with the open message; may be null */
    public StructuredDocument getCredentialDoc() {
        return this.credentialDoc;
    }

    /** @return a copy of the connection properties, or null if none were set */
    public Properties getConnectionProperties() {
        if (this.connectionProperties == null) {
            return null;
        }
        return (Properties) this.connectionProperties.clone();
    }

    private String getConnectionPropertiesString() {
        return this.propertiesToString(this.connectionProperties);
    }

    /**
     * Serializes properties in {@link Properties#store(java.io.OutputStream, String)} format.
     *
     * @return the serialized text, or null if {@code properties} is null or
     *         serialization fails
     */
    private String propertiesToString(Properties properties) {
        if (properties == null) {
            return null;
        }
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try {
            properties.store(bos, null);
            // store() writes ISO 8859-1; decode with the same charset rather
            // than the platform default.
            return bos.toString("ISO-8859-1");
        } catch (IOException e) {
            // Not expected for an in-memory stream; report rather than hide it.
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, "failed to serialize connection properties", e);
            }
            return null;
        }
    }

    /** Sets the credential document sent with the open message. */
    public void setCredentialDoc(StructuredDocument doc) {
        this.credentialDoc = doc;
    }

    /**
     * Builds the message that asks the remote side to open the back channel.
     * It carries the credential (when available), the local pipe advertisement,
     * the reliability flag, the direct-messenger flag, the optional connection
     * properties and the local peer advertisement.
     *
     * @param group  peer group supplying the local peer advertisement
     * @param pipeAd advertisement of the local input pipe
     * @return the open message; never null
     * @throws IOException if a secure pipe is requested without credentials, or
     *                     the message elements cannot be created
     */
    protected Message createOpenMessage(PeerGroup group, PipeAdvertisement pipeAd) throws IOException {
        Message msg = new Message();
        PeerAdvertisement peerAdv = group.getPeerAdvertisement();
        if (this.credentialDoc == null) {
            this.credentialDoc = JxtaBiDiPipe.getCredDoc(group);
        }
        if (this.credentialDoc == null && pipeAd.getType().equals("JxtaUnicastSecure")) {
            throw new IOException("No credentials established to initiate a secure connection");
        }
        try {
            if (this.credentialDoc != null) {
                msg.addMessageElement(MSG_NAMESPACE, new TextDocumentMessageElement("Cred", (XMLDocument) this.credentialDoc, null));
            }
            msg.addMessageElement(MSG_NAMESPACE, new TextDocumentMessageElement("reqPipe", (XMLDocument) pipeAd.getDocument(MimeMediaType.XMLUTF8), null));
            msg.addMessageElement(MSG_NAMESPACE, new StringMessageElement("reliable", Boolean.toString(this.isReliable), null));
            msg.addMessageElement(MSG_NAMESPACE, new StringMessageElement("direct", Boolean.toString(true), null));
            String connectionPropertiesString = this.getConnectionPropertiesString();
            if (connectionPropertiesString != null) {
                msg.addMessageElement(MSG_NAMESPACE, new StringMessageElement("connectionproperties", connectionPropertiesString, null));
            }
            msg.addMessageElement(MSG_NAMESPACE, new TextDocumentMessageElement("remPeer", (XMLDocument) peerAdv.getDocument(MimeMediaType.XMLUTF8), null));
            return msg;
        } catch (Exception e) {
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.log(Level.FINE, "error getting element stream", e);
            }
            // Fail here instead of handing a null message to OutputPipe.send().
            IOException failure = new IOException("Failed to build the connection request message");
            failure.initCause(e);
            throw failure;
        }
    }

    /** Marks the pipe as bound. */
    void setBound() {
        this.bound = true;
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("Pipe Bound :true");
        }
    }

    /** @return true once the pipe has been connected and not yet closed */
    public boolean isBound() {
        return this.bound;
    }

    /** @return the local input pipe; may be null before connect */
    public InputPipe getInputPipe() throws IOException {
        return this.inputPipe;
    }

    /** @return the remote peer's advertisement as received during the handshake; may be null */
    public PeerAdvertisement getRemotePeerAdvertisement() {
        return this.remotePeerAdv;
    }

    /** @return the remote pipe's advertisement as received during the handshake; may be null */
    public PipeAdvertisement getRemotePipeAdvertisement() {
        return this.remotePipeAdv;
    }

    protected void setRemotePeerAdvertisement(PeerAdvertisement peer) {
        this.remotePeerAdv = peer;
    }

    protected void setRemotePipeAdvertisement(PipeAdvertisement pipe) {
        this.remotePipeAdv = pipe;
    }

    /**
     * Sends a close request to the remote side and releases local resources.
     *
     * @throws IOException if closing fails
     */
    public void close() throws IOException {
        this.sendClose();
        this.closePipe(false);
        this.bound = false;
    }

    /**
     * Closes the pipe once; later calls return immediately.
     *
     * <p>Unless {@code fastClose} is set, a reliable non-direct pipe first waits
     * for outstanding output to be acknowledged. The input pipe, the messenger
     * and the reliable output stream are released even when that wait fails, so
     * a failed linger does not leave them open.
     *
     * @param fastClose true to skip waiting for pending reliable output
     * @throws SocketTimeoutException if pending output was not drained in time
     * @throws IOException            if the wait is interrupted or a stream fails to close
     */
    protected void closePipe(boolean fastClose) throws IOException {
        synchronized (this.closeLock) {
            if (this.closed) {
                return;
            }
            this.closed = true;
            this.bound = false;
        }
        try {
            if (!fastClose && this.isReliable && !this.direct) {
                this.lingerForPendingOutput();
                // We initiated the close, so nothing more needs to be received.
                if (this.ris != null) {
                    this.ris.close();
                }
            }
        } finally {
            this.releaseResources();
        }
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("Pipe close complete");
        }
        this.notifyListeners(PIPE_CLOSED_EVENT);
    }

    /**
     * Waits (once) for the reliable output stream to report a queue event when
     * it still has unacknowledged data.
     */
    private void lingerForPendingOutput() throws IOException {
        long quitAt = System.currentTimeMillis() + (long) this.timeout;
        if (this.ros == null || this.ros.getMaxAck() == this.ros.getSeqNumber()) {
            return;
        }
        // A zero timeout leaves the wait argument at 0.
        long left = 0L;
        if (this.timeout != 0) {
            left = quitAt - System.currentTimeMillis();
            if (left < 0L) {
                this.sendClose();
                throw new SocketTimeoutException("Close timeout");
            }
        }
        try {
            if (!this.ros.isQueueEmpty()) {
                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                    LOG.fine("Waiting for Output stream queue event");
                }
                this.ros.waitQueueEvent(left);
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            IOException failure = new IOException("Close interrupted");
            failure.initCause(ie);
            throw failure;
        }
    }

    /** Closes whichever of the output stream, input pipe and messenger exist. */
    private void releaseResources() throws IOException {
        try {
            if (this.isReliable && this.ros != null) {
                this.ros.close();
            }
        } finally {
            // A never-connected pipe has neither of these.
            if (this.inputPipe != null) {
                this.inputPipe.close();
            }
            if (this.msgr != null) {
                this.msgr.close();
            }
        }
    }

    /**
     * Delivers an event to the pipe event listener or, if none is set, to the
     * state listener. Exceptions thrown by listeners are logged and suppressed.
     */
    private void notifyListeners(int event) {
        try {
            if (this.eventListener != null) {
                this.eventListener.pipeEvent(event);
            } else if (this.stateListener != null) {
                this.stateListener.stateEvent(this, event);
            }
        } catch (Throwable th) {
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.log(Level.FINE, "error during pipe event callback", th);
            }
        }
    }

    protected void setInputPipe(InputPipe inputPipe) {
        this.inputPipe = inputPipe;
    }

    /**
     * Handles a message arriving on the local input pipe.
     *
     * <p>While the pipe is unbound, a message carrying a remote pipe
     * advertisement is treated as the response to the open message. Otherwise
     * the message goes to the reliability layer (reliable, non-direct pipes) or
     * is checked for a close request and then delivered to the application.
     */
    public void pipeMsgEvent(PipeMsgEvent event) {
        Message message = event.getMessage();
        if (message == null) {
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("Empty event");
            }
            return;
        }
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("Pipe message arrived");
        }
        if (!this.bound) {
            MessageElement remotePipeElement = message.getMessageElement(MSG_NAMESPACE, "remPipe");
            if (remotePipeElement != null) {
                this.handleConnectResponse(message, remotePipeElement);
                return;
            }
        }
        if (this.isReliable && !this.direct) {
            this.receiveMessage(message);
            return;
        }
        if (!this.hasClose(message)) {
            this.push(event);
        }
    }

    /**
     * Processes the remote response to the open message: records the remote
     * advertisements, verifies credentials for secure pipes, picks up the
     * negotiated reliability, creates the messenger and finally wakes the
     * thread blocked in connect. Invalid responses are logged and ignored, which
     * leaves connect to time out.
     */
    private void handleConnectResponse(Message message, MessageElement remotePipeElement) {
        try {
            XMLDocument remotePipeDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(remotePipeElement);
            this.remotePipeAdv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(remotePipeDoc);
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("Recevied a pipe Advertisement :" + this.remotePipeAdv.getName());
            }

            MessageElement element = message.getMessageElement(MSG_NAMESPACE, "remPeer");
            if (element == null) {
                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                    LOG.warning(" BAD connect response");
                }
                return;
            }
            XMLDocument remotePeerDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(element);
            this.remotePeerAdv = (PeerAdvertisement) AdvertisementFactory.newAdvertisement(remotePeerDoc);
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("Recevied an Peer Advertisement :" + this.remotePeerAdv.getName());
            }

            XMLDocument remoteCredDoc = null;
            element = message.getMessageElement(MSG_NAMESPACE, "Cred");
            if (element != null) {
                remoteCredDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(element);
            }
            if (this.pipeAdv.getType().equals("JxtaUnicastSecure") && (remoteCredDoc == null || !this.checkCred(remoteCredDoc))) {
                if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
                    LOG.severe("Missing remote credential doc");
                }
                return;
            }

            element = message.getMessageElement(MSG_NAMESPACE, "reliable");
            if (element != null) {
                this.isReliable = Boolean.parseBoolean(element.toString());
            }
            boolean directSupported = false;
            element = message.getMessageElement(MSG_NAMESPACE, "direct");
            if (element != null) {
                directSupported = Boolean.parseBoolean(element.toString());
            }

            // Prefer a direct messenger when the remote side supports it; fall
            // back to a regular endpoint messenger otherwise.
            Messenger directMessenger = directSupported
                    ? JxtaBiDiPipe.getDirectMessenger(this.group, this.remotePipeAdv, this.remotePeerAdv)
                    : null;
            if (directMessenger != null) {
                this.msgr = directMessenger;
                this.direct = true;
            } else {
                this.msgr = JxtaBiDiPipe.lightweightOutputPipe(this.group, this.remotePipeAdv, this.remotePeerAdv);
            }

            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("Reliability set to :" + this.isReliable);
            }
            if (this.isReliable && !this.direct) {
                this.createRLib();
            }
            synchronized (this.finalLock) {
                this.waiting = false;
                this.finalLock.notifyAll();
            }
        } catch (IOException e) {
            if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
                LOG.log(Level.SEVERE, "failed to process response message", e);
            }
        }
    }

    /**
     * Checks a message for a close request and, if present, closes the pipe.
     *
     * @return true if the message was a close request
     */
    private boolean hasClose(Message message) {
        MessageElement closeElement = message.getMessageElement(MSG_NAMESPACE, "close");
        if (closeElement == null) {
            return false;
        }
        try {
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("Received a pipe close request, closing pipes");
            }
            if (this.ros != null) {
                this.ros.hardClose();
            }
            this.closePipe(false);
        } catch (IOException ie) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, "failed during close", ie);
            }
        }
        return true;
    }

    /**
     * Routes a reliability-layer message: ACKs go to the output stream, data
     * blocks go to the input stream once the handshake has completed.
     */
    private void receiveMessage(Message message) {
        Message.ElementIterator i = message.getMessageElements("jxtarel", Defs.MIME_TYPE_ACK);
        if (i.hasNext()) {
            if (this.ros != null) {
                this.ros.recv(message);
            }
            return;
        }
        i = message.getMessageElements("jxtarel", Defs.MIME_TYPE_BLOCK);
        if (i.hasNext()) {
            try {
                // Data may arrive before the connect handshake has finished.
                synchronized (this.finalLock) {
                    while (this.waiting) {
                        this.finalLock.wait(this.timeout);
                    }
                }
            } catch (InterruptedException ie) {
                // Stop waiting, but keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
            }
            if (this.ris != null) {
                this.ris.recv(message);
            }
        }
    }

    /** @return the maximum retry timeout in milliseconds */
    public synchronized int getMaxRetryTimeout() {
        return this.maxRetryTimeout;
    }

    /**
     * Sets the maximum retry timeout.
     *
     * @param maxRetryTimeout value in milliseconds; must be positive and at most 120000
     * @throws IllegalArgumentException if the value is out of range
     */
    public synchronized void setMaxRetryTimeout(int maxRetryTimeout) {
        if (maxRetryTimeout <= 0 || maxRetryTimeout > MAXRETRYTIMEOUT) {
            throw new IllegalArgumentException("Invalid Maximum retry timeout :" + maxRetryTimeout + " Exceed Global maximum retry timeout :" + MAXRETRYTIMEOUT);
        }
        this.maxRetryTimeout = maxRetryTimeout;
    }

    /** @return the retry timeout in milliseconds */
    public synchronized int getRetryTimeout() {
        return this.retryTimeout;
    }

    /**
     * Sets the retry timeout and forwards it to the outgoing adaptor when one exists.
     *
     * @param retryTimeout value in milliseconds; must be positive
     * @throws IllegalArgumentException if {@code retryTimeout} is not positive
     * @throws IOException              declared for compatibility with existing callers
     */
    public synchronized void setRetryTimeout(int retryTimeout) throws IOException {
        // Validate the argument itself, not the unrelated connect timeout.
        if (retryTimeout <= 0) {
            throw new IllegalArgumentException("Invalid Socket timeout :" + retryTimeout);
        }
        this.retryTimeout = retryTimeout;
        if (this.outgoing != null) {
            this.outgoing.setTimeout(retryTimeout);
        }
    }

    /** @return the window size used for the message queue and flow control */
    public synchronized int getWindowSize() {
        return this.windowSize;
    }

    /**
     * Sets the window size.
     *
     * @throws IOException if the pipe is already bound
     */
    public synchronized void setWindowSize(int windowSize) throws IOException {
        if (this.isBound()) {
            throw new IOException("Socket bound. Can not change the window size");
        }
        this.windowSize = windowSize;
    }

    /**
     * Callback from the reliable input stream for a fully received message.
     * Close requests are handled here; anything else is delivered to the application.
     */
    public void processIncomingMessage(Message message) {
        if (!this.hasClose(message)) {
            PipeMsgEvent event = new PipeMsgEvent(this, message, (PipeID) this.pipeAdv.getID());
            this.push(event);
        }
    }

    /**
     * Delivers an event to the queue when one exists; if it could not be queued
     * and a listener is registered, the listener is invoked instead.
     */
    private void push(PipeMsgEvent event) {
        boolean queued = false;
        // Work on snapshots: setMessageListener() may swap these concurrently.
        BlockingQueue<PipeMsgEvent> currentQueue = this.queue;
        if (currentQueue != null) {
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("push message onto queue");
            }
            queued = currentQueue.offer(event);
        }
        PipeMsgListener currentListener = this.msgListener;
        if (!queued && currentListener != null) {
            currentListener.pipeMsgEvent(event);
        }
    }

    /**
     * Sends a message to the remote side.
     *
     * <p>Reliable, non-direct pipes write to the reliable output stream. All
     * other pipes use the messenger directly; a send that times out is retried
     * once, and any other I/O failure closes the pipe.
     *
     * @param msg message to send
     * @return true if the message was accepted for delivery
     * @throws IOException if the pipe is not connected or the send fails
     */
    public boolean sendMessage(Message msg) throws IOException {
        if (this.isReliable && !this.direct) {
            if (this.ros == null) {
                throw new IOException("Pipe is not connected");
            }
            int seqn = this.ros.send(msg);
            return seqn > 0;
        }
        if (this.msgr == null) {
            throw new IOException("Pipe is not connected");
        }
        try {
            return this.transmit(msg);
        } catch (SocketTimeoutException timedOut) {
            // Single retry; a failure here propagates to the caller unchanged.
            return this.transmit(msg);
        } catch (IOException io) {
            this.closePipe(true);
            IOException exp = new IOException("IO error occured during sendMessage()");
            exp.initCause(io);
            throw exp;
        }
    }

    /** Hands the message to the messenger, using the direct TCP path when available. */
    private boolean transmit(Message msg) throws IOException {
        if (this.msgr instanceof TcpMessenger) {
            ((TcpMessenger) this.msgr).sendMessageDirect(msg, null, null, true);
            return true;
        }
        return this.msgr.sendMessage(msg, null, null);
    }

    /**
     * Receives the output pipe resolved for the remote advertisement. The first
     * matching pipe is kept for sending the open message; any later one is closed.
     */
    public void outputPipeEvent(OutputPipeEvent event) {
        OutputPipe op = event.getOutputPipe();
        if (op.getAdvertisement() == null && Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
            LOG.warning("The output pipe has no internal pipe advertisement. Continueing anyway.");
        }
        if (op.getAdvertisement() == null || this.pipeAdv.equals(op.getAdvertisement())) {
            synchronized (this.acceptLock) {
                if (this.connectOutpipe == null) {
                    this.connectOutpipe = op;
                    // Ownership transferred; do not close it below.
                    op = null;
                }
                this.acceptLock.notifyAll();
            }
            if (op != null) {
                op.close();
            }
        } else if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
            LOG.warning("Unexpected OutputPipe :" + op);
        }
    }

    /**
     * Requests a direct messenger to the remote pipe.
     *
     * @return the messenger, or null when the pipe type is not "JxtaUnicast" or
     *         the endpoint service returns none
     * @throws IllegalArgumentException if the pipe is a propagate pipe
     */
    protected static Messenger getDirectMessenger(PeerGroup group, PipeAdvertisement pipeAdv, PeerAdvertisement peer) {
        if (pipeAdv.getType().equals("JxtaPropagate")) {
            throw new IllegalArgumentException("Invalid pipe type " + pipeAdv.getType());
        }
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("Creating a Direct Messenger");
        }
        if (pipeAdv.getType().equals("JxtaUnicast")) {
            EndpointService endpoint = group.getEndpointService();
            EndpointAddress pipeEndpoint = new EndpointAddress("jxta", peer.getPeerID().getUniqueValue().toString(), "PipeService", pipeAdv.getPipeID().toString());
            return endpoint.getDirectMessenger(pipeEndpoint, peer, true);
        }
        return null;
    }

    /**
     * Requests a regular endpoint messenger addressed to the remote pipe.
     *
     * @return the messenger, or null when the pipe type is neither unicast nor
     *         secure unicast, or the endpoint service returns none
     */
    protected static Messenger lightweightOutputPipe(PeerGroup group, PipeAdvertisement pipeAdv, PeerAdvertisement peer) {
        EndpointService endpoint = group.getEndpointService();
        ID opId = pipeAdv.getPipeID();
        String destPeer = peer.getPeerID().getUniqueValue().toString();
        EndpointAddress addr;
        if (pipeAdv.getType().equals("JxtaUnicast")) {
            addr = new EndpointAddress("jxta", destPeer, "PipeService", opId.toString());
        } else if (pipeAdv.getType().equals("JxtaUnicastSecure")) {
            addr = new EndpointAddress("jxtatls", destPeer, "PipeService", opId.toString());
        } else {
            return null;
        }
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("Creating a lightweightOutputPipe()");
        }
        return endpoint.getMessenger(addr);
    }

    /**
     * Validates the remote credential of a secure pipe. This implementation
     * accepts every credential; subclasses may override it.
     */
    protected boolean checkCred(StructuredDocument cred) {
        return true;
    }

    /**
     * Sends a close request to the remote side, best effort. Nothing is sent
     * when the pipe was never connected or the reliable output stream is
     * already closed; send failures are logged.
     */
    private void sendClose() {
        if (!this.direct && this.isReliable) {
            if (this.ros == null) {
                // Never connected: there is no stream to carry the request.
                return;
            }
            if (this.ros.isClosed()) {
                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                    LOG.fine("ReliableOutputStream is already closed. Skipping close message");
                }
                return;
            }
        } else if (this.msgr == null) {
            // Never connected: there is no messenger to carry the request.
            return;
        }
        Message msg = new Message();
        msg.addMessageElement(MSG_NAMESPACE, new StringMessageElement("close", "close", null));
        try {
            this.sendMessage(msg);
            if (!this.direct && this.ros != null) {
                this.ros.close();
            }
        } catch (IOException ie) {
            // Guard with the same level the message is logged at.
            if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
                LOG.log(Level.SEVERE, "failed during close", ie);
            }
        }
    }

    /** @deprecated use {@link #getMessageListener()} */
    @Deprecated
    public PipeMsgListener getListener() {
        return this.getMessageListener();
    }

    /** @return the registered message listener; may be null */
    public PipeMsgListener getMessageListener() {
        return this.msgListener;
    }

    /** @deprecated use {@link #setMessageListener(PipeMsgListener)} */
    @Deprecated
    public void setListener(PipeMsgListener msgListener) {
        this.setMessageListener(msgListener);
    }

    /**
     * Registers the message listener. When a listener is installed, queued
     * events are pushed through the normal delivery path; when it is cleared, a
     * fresh queue is created for {@link #getMessage(int)}.
     *
     * @param msgListener the listener, or null to switch to queued delivery
     */
    public void setMessageListener(PipeMsgListener msgListener) {
        BlockingQueue<PipeMsgEvent> drainQueue = null;
        synchronized (this) {
            this.msgListener = msgListener;
            if (msgListener != null) {
                drainQueue = this.queue;
                this.queue = null;
            } else {
                this.queue = new ArrayBlockingQueue<PipeMsgEvent>(this.windowSize);
            }
        }
        // Deliver outside the lock so listener code never runs while it is held.
        if (drainQueue != null) {
            while (!drainQueue.isEmpty()) {
                PipeMsgEvent event = drainQueue.poll();
                if (event != null) {
                    this.push(event);
                }
            }
        }
    }

    /** @deprecated use {@link #setPipeEventListener(PipeEventListener)} */
    @Deprecated
    public void setListener(PipeEventListener eventListener) {
        this.setPipeEventListener(eventListener);
    }

    public void setPipeEventListener(PipeEventListener eventListener) {
        this.eventListener = eventListener;
    }

    public PipeEventListener getPipeEventListener() {
        return this.eventListener;
    }

    public void setPipeStateListener(PipeStateListener stateListener) {
        this.stateListener = stateListener;
    }

    public PipeStateListener getPipeStateListener() {
        return this.stateListener;
    }

    /**
     * Retrieves the next queued message.
     *
     * @param timeout time to wait in milliseconds; 0 is replaced by {@code Integer.MAX_VALUE}
     * @return the message, or null if none arrived in time or messages are
     *         being delivered to a listener instead of the queue
     * @throws InterruptedException if interrupted while waiting
     */
    public Message getMessage(int timeout) throws InterruptedException {
        BlockingQueue<PipeMsgEvent> currentQueue = this.queue;
        if (currentQueue == null) {
            return null;
        }
        if (timeout == 0) {
            timeout = Integer.MAX_VALUE;
        }
        PipeMsgEvent ev = currentQueue.poll(timeout, TimeUnit.MILLISECONDS);
        return ev != null ? ev.getMessage() : null;
    }

    /** @return the advertisement this pipe was connected with or created for */
    public PipeAdvertisement getPipeAdvertisement() {
        return this.pipeAdv;
    }

    /**
     * Safety net: closes a pipe that was never closed explicitly and logs a
     * warning, since that indicates a bug in the calling code.
     */
    @Override
    protected void finalize() throws Throwable {
        try {
            if (!this.closed) {
                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                    LOG.warning("JxtaBiDiPipe is being finalized without being previously closed. This is likely a users bug.");
                }
                this.close();
            }
        } finally {
            super.finalize();
        }
    }
}

