/*
 * Decompiled with CFR 0.152.
 */
package net.jxta.impl.pipe;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.jxta.document.AdvertisementFactory;
import net.jxta.document.StructuredDocumentFactory;
import net.jxta.document.XMLDocument;
import net.jxta.endpoint.EndpointAddress;
import net.jxta.endpoint.EndpointListener;
import net.jxta.endpoint.EndpointService;
import net.jxta.endpoint.Message;
import net.jxta.endpoint.MessageElement;
import net.jxta.id.ID;
import net.jxta.impl.pipe.InputPipeImpl;
import net.jxta.impl.pipe.NonBlockingWireOutputPipe;
import net.jxta.impl.pipe.PipeResolver;
import net.jxta.impl.pipe.WireHeader;
import net.jxta.impl.pipe.WirePipe;
import net.jxta.logging.Logging;
import net.jxta.peergroup.PeerGroup;
import net.jxta.pipe.InputPipe;
import net.jxta.pipe.PipeMsgListener;
import net.jxta.protocol.PipeAdvertisement;
import net.jxta.rendezvous.RendezVousService;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
/**
 * Registry and endpoint-side dispatcher for {@link WirePipe} instances of one peer group.
 *
 * <p>The class keeps one {@link WirePipe} per pipe ID, hands out input and output pipes
 * bound to those wire pipes, and, as an {@link EndpointListener} registered under
 * {@link #WIRE_SERVICE_NAME}, routes incoming messages carrying a wire header to the
 * wire pipe named in that header.
 *
 * <p>All access to the pipe registry is guarded by the registry map's own monitor.
 */
public class WirePipeImpl
implements EndpointListener {
    private static final Logger LOG = Logger.getLogger(WirePipeImpl.class.getName());

    /** Service name under which this listener is registered with the endpoint service. */
    static final String WIRE_SERVICE_NAME = "jxta.service.wirepipe";

    /** Namespace of the message element that carries the wire header. */
    static final String WIRE_HEADER_ELEMENT_NAMESPACE = "jxta";

    /** Name of the message element that carries the wire header. */
    static final String WIRE_HEADER_ELEMENT_NAME = "JxtaWireHeader";

    /** String form of the peer group ID's unique value; returned by {@link #getServiceParameter()}. */
    private final String wireParam;

    /** Known wire pipes keyed by pipe ID. Every access must hold this map's monitor. */
    private final Map<ID, WirePipe> wirePipes = new HashMap<ID, WirePipe>();

    private final PeerGroup group;
    private final PipeResolver pipeResolver;

    /** Endpoint service; {@code null} until {@link #startApp} obtains it and after {@link #stopApp}. */
    private EndpointService endpoint = null;

    /** Rendezvous service; {@code null} until {@link #startApp} obtains it and after {@link #stopApp}. */
    private RendezVousService rendezvous = null;

    /**
     * Creates the wire pipe registry for a peer group.
     *
     * @param group        peer group this instance works in
     * @param pipeResolver resolver passed on to every {@link WirePipe} created here
     */
    WirePipeImpl(PeerGroup group, PipeResolver pipeResolver) {
        this.group = group;
        this.pipeResolver = pipeResolver;
        this.wireParam = group.getPeerGroupID().getUniqueValue().toString();
    }

    /**
     * Returns the service parameter derived from the peer group ID.
     *
     * @return string form of the peer group ID's unique value
     */
    public String getServiceParameter() {
        return this.wireParam;
    }

    /**
     * Looks up the endpoint and rendezvous services and registers this object as the
     * incoming message listener for {@link #WIRE_SERVICE_NAME}.
     *
     * @param arg start arguments; not used
     * @return {@code 0} when the listener was registered, {@code 2} when the endpoint or
     *         rendezvous service is not available yet (NOTE(review): presumably the
     *         module "start again, stalled" code; confirm against the Module contract)
     * @throws IllegalStateException if registering the listener fails; the original
     *         failure is attached as the cause
     */
    public int startApp(String[] arg) {
        this.endpoint = this.group.getEndpointService();
        if (null == this.endpoint) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.warning("Stalled until there is an endpoint service");
            }
            return 2;
        }
        this.rendezvous = this.group.getRendezVousService();
        if (null == this.rendezvous) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.warning("Stalled until there is a rendezvous service");
            }
            return 2;
        }
        try {
            this.endpoint.addIncomingMessageListener(this, WIRE_SERVICE_NAME, null);
        }
        catch (Exception e) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, "Failed registering Endpoint Listener", e);
            }
            // Keep the root cause so callers can see why registration failed.
            throw new IllegalStateException("Failed registering Endpoint Listener", e);
        }
        return 0;
    }

    /**
     * Closes every known wire pipe, empties the registry, unregisters the endpoint
     * listener and drops the service references.
     *
     * <p>Safe to call when {@link #startApp} never completed or when already stopped.
     */
    public void stopApp() {
        // Snapshot under the registry lock: the map is a plain HashMap and every other
        // accessor in this class synchronizes on it.
        ArrayList<WirePipe> allWirePipes;
        synchronized (this.wirePipes) {
            allWirePipes = new ArrayList<WirePipe>(this.wirePipes.values());
        }
        // Close outside the lock so close() never runs while the registry is held.
        for (WirePipe aWirePipe : allWirePipes) {
            aWirePipe.close();
        }
        synchronized (this.wirePipes) {
            this.wirePipes.clear();
        }
        // The endpoint is null if startApp stalled or stopApp already ran.
        EndpointService currentEndpoint = this.endpoint;
        if (null != currentEndpoint) {
            currentEndpoint.removeIncomingMessageListener(WIRE_SERVICE_NAME, null);
        }
        this.endpoint = null;
        this.rendezvous = null;
    }

    /**
     * Creates an input pipe attached to the wire pipe for the advertised pipe ID,
     * creating that wire pipe first if it is not yet registered.
     *
     * @param adv      advertisement of the pipe
     * @param listener listener handed to the new input pipe
     * @return the new input pipe
     * @throws IOException declared by the signature; propagated from input pipe creation
     */
    InputPipe createInputPipe(PipeAdvertisement adv, PipeMsgListener listener) throws IOException {
        WirePipe wirePipe = this.getWirePipe(adv);
        return new InputPipeImpl(wirePipe, adv, listener);
    }

    /**
     * Creates a non-blocking output pipe attached to the wire pipe for the advertised
     * pipe ID, creating that wire pipe first if it is not yet registered.
     *
     * @param adv   advertisement of the pipe
     * @param peers peer IDs handed to the new output pipe
     * @return the new output pipe
     */
    NonBlockingWireOutputPipe createOutputPipe(PipeAdvertisement adv, Set<? extends ID> peers) {
        WirePipe wirePipe = this.getWirePipe(adv);
        return new NonBlockingWireOutputPipe(this.group, wirePipe, adv, peers);
    }

    /**
     * Returns the wire pipe registered for the advertisement's pipe ID, creating and
     * registering one from the advertisement when none exists.
     *
     * @param adv advertisement of the pipe
     * @return the existing or newly created wire pipe; never {@code null}
     */
    private WirePipe getWirePipe(PipeAdvertisement adv) {
        ID pipeID = adv.getPipeID();
        synchronized (this.wirePipes) {
            WirePipe wirePipe = this.wirePipes.get(pipeID);
            if (null == wirePipe) {
                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                    LOG.fine("Creating new wire pipe for " + pipeID);
                }
                wirePipe = new WirePipe(this.group, this.pipeResolver, this, adv);
                this.wirePipes.put(pipeID, wirePipe);
            }
            return wirePipe;
        }
    }

    /**
     * Returns the wire pipe registered for a pipe ID. When none exists and
     * {@code create} is set, a wire pipe is built from a fresh advertisement of type
     * {@code "JxtaPropagate"} carrying that ID, registered and returned.
     *
     * @param pipeID ID of the pipe
     * @param create whether to create a missing wire pipe
     * @return the wire pipe, or {@code null} when none is registered and
     *         {@code create} is {@code false}
     */
    private WirePipe getWirePipe(ID pipeID, boolean create) {
        synchronized (this.wirePipes) {
            WirePipe wirePipe = this.wirePipes.get(pipeID);
            if (null == wirePipe && create) {
                PipeAdvertisement adv = (PipeAdvertisement)AdvertisementFactory.newAdvertisement(PipeAdvertisement.getAdvertisementType());
                adv.setPipeID(pipeID);
                adv.setType("JxtaPropagate");
                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                    LOG.fine("Creating new wire pipe for " + adv.getPipeID());
                }
                wirePipe = new WirePipe(this.group, this.pipeResolver, this, adv);
                this.wirePipes.put(pipeID, wirePipe);
            }
            return wirePipe;
        }
    }

    /**
     * Removes the wire pipe registered for a pipe ID.
     *
     * @param pipeID ID of the pipe to forget
     * @return {@code true} if a wire pipe was registered under that ID and has been removed
     */
    boolean forgetWirePipe(ID pipeID) {
        synchronized (this.wirePipes) {
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("Removing wire pipe for " + pipeID);
            }
            return null != this.wirePipes.remove(pipeID);
        }
    }

    /**
     * Dispatches an incoming message to the wire pipe named in its wire header.
     *
     * <p>Messages without a wire header element, with an unparsable header, or arriving
     * while no rendezvous service is held (before start or after stop) are discarded.
     * A wire pipe is created on demand only when this peer currently acts as a rendezvous.
     *
     * @param message the received message
     * @param srcAddr source address, passed through to the wire pipe
     * @param dstAddr destination address, passed through to the wire pipe
     */
    @Override
    public void processIncomingMessage(Message message, EndpointAddress srcAddr, EndpointAddress dstAddr) {
        MessageElement elem = message.getMessageElement(WIRE_HEADER_ELEMENT_NAMESPACE, WIRE_HEADER_ELEMENT_NAME);
        if (null == elem) {
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("No JxtaWireHeader element. Discarding " + message);
            }
            return;
        }
        WireHeader header;
        try {
            XMLDocument doc = (XMLDocument)StructuredDocumentFactory.newStructuredDocument(elem);
            header = new WireHeader(doc);
        }
        catch (Exception e) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, "bad wire header for " + message, e);
            }
            return;
        }
        // Read the field once: stopApp() nulls it, and a message handled after that
        // must be dropped rather than fail with a NullPointerException.
        RendezVousService currentRendezvous = this.rendezvous;
        if (null == currentRendezvous) {
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("Wire pipe service is not running. Discarding " + message);
            }
            return;
        }
        WirePipe wirePipe = this.getWirePipe(header.getPipeID(), currentRendezvous.isRendezVous());
        if (null != wirePipe) {
            wirePipe.processIncomingMessage(message, header, srcAddr, dstAddr);
        } else if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("Ignoring message " + message + " for id " + header.getPipeID());
        }
    }
}

