/*
 * Decompiled with CFR 0.152.
 */
package org.codehaus.stomp.jms;

import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.stomp.ProtocolException;
import org.codehaus.stomp.StompFrame;
import org.codehaus.stomp.StompFrameError;
import org.codehaus.stomp.StompHandler;
import org.codehaus.stomp.jms.StompSession;
import org.codehaus.stomp.jms.StompSubscription;
import org.codehaus.stomp.util.IntrospectionSupport;

/**
 * Translates incoming STOMP frames into JMS operations and reports the outcome
 * (CONNECTED, RECEIPT or ERROR frames) to the configured output handler.
 *
 * <p>One converter owns at most one JMS {@link Connection}. Sessions are created
 * lazily: one auto-acknowledge session, one client-acknowledge session and one
 * transacted session per STOMP transaction id. Messages registered through
 * {@link #addMessageToAck(Message)} are kept until the client sends an ACK frame
 * for them or the converter is closed.
 */
public class ProtocolConverter implements StompHandler {
    private static final Log log = LogFactory.getLog(ProtocolConverter.class);

    private final ConnectionFactory connectionFactory;
    private final StompHandler outputHandler;
    private Connection connection;
    private StompSession defaultSession;
    private StompSession clientAckSession;
    /** Transacted sessions keyed by STOMP transaction id. */
    private final Map<String, StompSession> transactedSessions = new ConcurrentHashMap<String, StompSession>();
    /** Active subscriptions keyed by subscription id. */
    private final Map<String, StompSubscription> subscriptions = new ConcurrentHashMap<String, StompSubscription>();
    /** Messages waiting for a STOMP ACK, keyed by JMS message id. */
    private final Map<String, Message> messages = new ConcurrentHashMap<String, Message>();

    /**
     * Creates a converter that is not yet connected to JMS.
     *
     * @param connectionFactory factory used to open the JMS connection on CONNECT
     * @param outputHandler     handler that receives every frame sent back to the STOMP client
     */
    public ProtocolConverter(ConnectionFactory connectionFactory, StompHandler outputHandler) {
        this.connectionFactory = connectionFactory;
        this.outputHandler = outputHandler;
    }

    /** @return the factory passed to the constructor */
    public ConnectionFactory getConnectionFactory() {
        return this.connectionFactory;
    }

    /** @return the handler that receives outgoing STOMP frames */
    public StompHandler getOutputHandler() {
        return this.outputHandler;
    }

    /**
     * Releases every JMS resource held by this converter.
     *
     * <p>Threads waiting on a pending message's monitor are notified first, then all
     * sessions and finally the connection are closed. A failure to close one session
     * does not stop the remaining sessions or the connection from being closed.
     * Internal state is reset even when closing fails, so the converter can accept a
     * new CONNECT afterwards.
     *
     * @throws JMSException the first failure raised while closing a session, or the
     *                      failure raised while closing the connection
     */
    public synchronized void close() throws JMSException {
        try {
            // Wake up anything blocked on a message that will now never be acknowledged.
            for (Message message : this.messages.values()) {
                synchronized (message) {
                    message.notify();
                }
            }

            ArrayList<StompSession> sessions = new ArrayList<StompSession>(this.transactedSessions.values());
            if (this.defaultSession != null) {
                sessions.add(this.defaultSession);
            }
            if (this.clientAckSession != null) {
                sessions.add(this.clientAckSession);
            }

            JMSException firstException = null;
            for (StompSession session : sessions) {
                // Logged separately so a failing ack-mode lookup cannot prevent the close below.
                logClosing(session);
                try {
                    session.close();
                } catch (JMSException e) {
                    if (firstException == null) {
                        firstException = e;
                    }
                }
            }

            if (this.connection != null) {
                this.connection.close();
            }
            if (firstException != null) {
                throw firstException;
            }
        } finally {
            this.connection = null;
            this.defaultSession = null;
            this.clientAckSession = null;
            this.transactedSessions.clear();
            this.subscriptions.clear();
            this.messages.clear();
        }
    }

    /**
     * Handles one frame received from the STOMP client.
     *
     * <p>Any exception raised while processing the frame is converted into an ERROR
     * frame (stack trace as body) and sent to the output handler instead of being
     * propagated. Only a failure to deliver that ERROR frame escapes this method.
     *
     * @param command the frame to process; a {@link StompFrameError} is reported as an error
     * @throws Exception if the ERROR frame itself cannot be built or delivered
     */
    public void onStompFrame(StompFrame command) throws Exception {
        try {
            if (log.isDebugEnabled()) {
                log.debug(">>>> " + command.getAction() + " headers: " + command.getHeaders());
            }
            if (command.getClass() == StompFrameError.class) {
                throw ((StompFrameError) command).getException();
            }
            dispatch(command);
        } catch (Exception e) {
            sendError(command, e);
        }
    }

    /** Routes a frame to the handler matching its action prefix. */
    private void dispatch(StompFrame command) throws Exception {
        String action = command.getAction();
        if (action == null) {
            // Report a protocol error rather than failing with a NullPointerException.
            throw new ProtocolException("Unknown STOMP action: " + action);
        }
        if (action.startsWith("SEND")) {
            onStompSend(command);
        } else if (action.startsWith("ACK")) {
            onStompAck(command);
        } else if (action.startsWith("BEGIN")) {
            onStompBegin(command);
        } else if (action.startsWith("COMMIT")) {
            onStompCommit(command);
        } else if (action.startsWith("ABORT")) {
            onStompAbort(command);
        } else if (action.startsWith("SUB")) {
            onStompSubscribe(command);
        } else if (action.startsWith("UNSUB")) {
            onStompUnsubscribe(command);
        } else if (action.startsWith("CONNECT")) {
            onStompConnect(command);
        } else if (action.startsWith("DISCONNECT")) {
            onStompDisconnect(command);
        } else {
            throw new ProtocolException("Unknown STOMP action: " + action);
        }
    }

    /** Builds an ERROR frame describing {@code e} and sends it to the STOMP client. */
    private void sendError(StompFrame command, Exception e) throws Exception {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        PrintWriter stream = new PrintWriter(new OutputStreamWriter((OutputStream) baos, "UTF-8"));
        e.printStackTrace(stream);
        stream.close();

        HashMap<String, Object> headers = new HashMap<String, Object>();
        // Exceptions such as NullPointerException often carry no message; never emit a null header.
        String message = e.getMessage();
        headers.put("message", message != null ? message : e.toString());

        Map<String, Object> commandHeaders = command.getHeaders();
        String receiptId = commandHeaders != null ? (String) commandHeaders.get("receipt") : null;
        if (receiptId != null) {
            headers.put("receipt-id", receiptId);
        }
        sendToStomp(new StompFrame("ERROR", headers, baos.toByteArray()));
    }

    /** Logs an exception reported by the transport at error level. */
    public void onException(Exception e) {
        log.error("Caught: " + e, e);
    }

    /**
     * Registers a message so that a later ACK frame carrying its JMS message id can
     * acknowledge it.
     *
     * @param message the message awaiting acknowledgement
     * @throws JMSException if the message id cannot be read
     */
    public void addMessageToAck(Message message) throws JMSException {
        this.messages.put(message.getJMSMessageID(), message);
    }

    /**
     * Handles CONNECT: opens and starts the JMS connection and replies with CONNECTED.
     *
     * <p>Headers prefixed with {@code factory.} are applied to the connection factory and
     * headers prefixed with {@code connection.} to the new connection. The connection is
     * only stored once it has been fully configured and started; if any of those steps
     * fails it is closed again, so a later CONNECT can be attempted.
     *
     * @throws ProtocolException if a connection already exists
     */
    protected void onStompConnect(StompFrame command) throws Exception {
        if (this.connection != null) {
            throw new ProtocolException("Allready connected.");
        }
        Map<String, Object> headers = command.getHeaders();
        String login = (String) headers.get("login");
        String passcode = (String) headers.get("passcode");
        String clientId = (String) headers.get("client-id");

        ConnectionFactory factory = getConnectionFactory();
        IntrospectionSupport.setProperties(factory, headers, "factory.");

        Connection newConnection = login != null
                ? factory.createConnection(login, passcode)
                : factory.createConnection();
        String sessionId;
        try {
            if (clientId != null) {
                newConnection.setClientID(clientId);
            }
            IntrospectionSupport.setProperties(newConnection, headers, "connection.");
            newConnection.start();
            sessionId = newConnection.getClientID();
        } catch (Exception e) {
            // Do not leak a half-initialised connection or leave the converter "connected".
            closeQuietly(newConnection);
            throw e;
        }
        this.connection = newConnection;

        HashMap<String, Object> responseHeaders = new HashMap<String, Object>();
        responseHeaders.put("session", sessionId);
        String requestId = (String) headers.get("request-id");
        if (requestId == null) {
            requestId = (String) headers.get("receipt");
        }
        if (requestId != null) {
            responseHeaders.put("response-id", requestId);
            responseHeaders.put("receipt-id", requestId);
        }

        StompFrame connected = new StompFrame();
        connected.setAction("CONNECTED");
        connected.setHeaders(responseHeaders);
        sendToStomp(connected);
    }

    /** Closes a connection whose setup failed, logging (not propagating) any close failure. */
    private static void closeQuietly(Connection failed) {
        try {
            failed.close();
        } catch (JMSException closeFailure) {
            log.debug("Ignoring failure while closing connection after failed CONNECT: " + closeFailure, closeFailure);
        }
    }

    /** Handles DISCONNECT by closing all JMS resources; no receipt is sent. */
    protected void onStompDisconnect(StompFrame command) throws Exception {
        checkConnected();
        close();
    }

    /**
     * Handles SEND: forwards the frame to JMS through the session of the named
     * transaction, or through the default session when no transaction header is set.
     */
    protected void onStompSend(StompFrame command) throws Exception {
        checkConnected();
        String stompTx = (String) command.getHeaders().get("transaction");
        StompSession session = stompTx != null ? getExistingTransactedSession(stompTx) : getDefaultSession();
        session.sendToJms(command);
        sendResponse(command);
    }

    /**
     * Handles BEGIN: creates and registers a transacted session for the given transaction id.
     *
     * @throws ProtocolException if the transaction header is missing or the id is already in use
     */
    protected void onStompBegin(StompFrame command) throws Exception {
        checkConnected();
        String stompTx = (String) command.getHeaders().get("transaction");
        if (stompTx == null) {
            throw new ProtocolException("Must specify the transaction you are beginning");
        }
        if (getTransactedSession(stompTx) != null) {
            throw new ProtocolException("The transaction was already started: " + stompTx);
        }
        setTransactedSession(stompTx, createTransactedSession(stompTx));
        sendResponse(command);
    }

    /**
     * Handles COMMIT: commits the JMS session registered for the transaction id.
     *
     * @throws ProtocolException if the transaction header is missing or unknown
     */
    protected void onStompCommit(StompFrame command) throws Exception {
        checkConnected();
        String stompTx = (String) command.getHeaders().get("transaction");
        if (stompTx == null) {
            throw new ProtocolException("Must specify the transaction you are committing");
        }
        StompSession session = getExistingTransactedSession(stompTx);
        session.getSession().commit();
        considerClosingTransactedSession(session, stompTx);
        sendResponse(command);
    }

    /**
     * Handles ABORT: rolls back the JMS session registered for the transaction id.
     *
     * @throws ProtocolException if the transaction header is missing or unknown
     */
    protected void onStompAbort(StompFrame command) throws Exception {
        checkConnected();
        String stompTx = (String) command.getHeaders().get("transaction");
        if (stompTx == null) {
            throw new ProtocolException("Must specify the transaction you are aborting");
        }
        StompSession session = getExistingTransactedSession(stompTx);
        session.getSession().rollback();
        considerClosingTransactedSession(session, stompTx);
        sendResponse(command);
    }

    /**
     * Handles SUBSCRIBE: creates a subscription on the transacted session (when a
     * transaction header is present), on the client-acknowledge session (when the
     * {@code ack} header is {@code client}) or otherwise on the default session.
     *
     * <p>When no {@code id} header is given, the id is derived from the destination.
     *
     * @throws ProtocolException if a subscription with the same id already exists
     */
    protected void onStompSubscribe(StompFrame command) throws Exception {
        checkConnected();
        Map<String, Object> headers = command.getHeaders();

        StompSession session;
        String stompTx = (String) headers.get("transaction");
        if (stompTx != null) {
            session = getExistingTransactedSession(stompTx);
        } else {
            String ackMode = (String) headers.get("ack");
            session = "client".equals(ackMode) ? getClientAckSession() : getDefaultSession();
        }

        String subscriptionId = (String) headers.get("id");
        if (subscriptionId == null) {
            subscriptionId = createSubscriptionId(headers);
        }
        if (this.subscriptions.get(subscriptionId) != null) {
            throw new ProtocolException("There already is a subscription for: " + subscriptionId + ". Either use unique subscription IDs or do not create multiple subscriptions for the same destination");
        }
        this.subscriptions.put(subscriptionId, new StompSubscription(session, subscriptionId, command));
        sendResponse(command);
    }

    /**
     * Handles UNSUBSCRIBE: removes and closes the subscription identified by the
     * {@code id} header, or by the id derived from the {@code destination} header.
     *
     * @throws ProtocolException if neither header is present or no such subscription exists
     */
    protected void onStompUnsubscribe(StompFrame command) throws Exception {
        checkConnected();
        Map<String, Object> headers = command.getHeaders();
        String destinationName = (String) headers.get("destination");
        String subscriptionId = (String) headers.get("id");
        if (subscriptionId == null) {
            if (destinationName == null) {
                throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
            }
            subscriptionId = createSubscriptionId(headers);
        }
        StompSubscription subscription = this.subscriptions.remove(subscriptionId);
        if (subscription == null) {
            throw new ProtocolException("Cannot unsubscribe as mo subscription exists for id: " + subscriptionId);
        }
        subscription.close();
        sendResponse(command);
    }

    /**
     * Handles ACK: removes the pending message with the given {@code message-id},
     * notifies a thread waiting on its monitor and acknowledges it on JMS.
     *
     * @throws ProtocolException if the header is missing or no such message is pending
     */
    protected void onStompAck(StompFrame command) throws Exception {
        checkConnected();
        String messageId = (String) command.getHeaders().get("message-id");
        if (messageId == null) {
            throw new ProtocolException("ACK received without a message-id to acknowledge!");
        }
        Message message = this.messages.remove(messageId);
        if (message == null) {
            throw new ProtocolException("No such message for message-id: " + messageId);
        }
        synchronized (message) {
            message.notify();
        }
        message.acknowledge();
        sendResponse(command);
    }

    /**
     * Verifies that a JMS connection has been established.
     *
     * @throws ProtocolException if no CONNECT has succeeded yet (or the converter was closed)
     */
    protected void checkConnected() throws ProtocolException {
        if (this.connection == null) {
            throw new ProtocolException("Not connected.");
        }
    }

    /**
     * Derives a subscription id from the {@code destination} header.
     *
     * @param headers the frame headers
     * @return {@code "/subscription-to/"} followed by the destination value
     */
    protected String createSubscriptionId(Map<?, ?> headers) {
        return "/subscription-to/" + headers.get("destination");
    }

    /** Returns the auto-acknowledge session, creating it on first use. */
    protected StompSession getDefaultSession() throws JMSException {
        if (this.defaultSession == null) {
            this.defaultSession = createSession(Session.AUTO_ACKNOWLEDGE);
        }
        return this.defaultSession;
    }

    /** Returns the client-acknowledge session, creating it on first use. */
    protected StompSession getClientAckSession() throws JMSException {
        if (this.clientAckSession == null) {
            this.clientAckSession = createSession(Session.CLIENT_ACKNOWLEDGE);
        }
        return this.clientAckSession;
    }

    /**
     * Looks up the session of a transaction that must already have been begun.
     *
     * @throws ProtocolException if no session is registered for {@code stompTx}
     */
    protected StompSession getExistingTransactedSession(String stompTx) throws ProtocolException, JMSException {
        StompSession session = getTransactedSession(stompTx);
        if (session == null) {
            throw new ProtocolException("Invalid transaction id: " + stompTx);
        }
        return session;
    }

    /** Returns the session registered for {@code stompTx}, or {@code null} if there is none. */
    protected StompSession getTransactedSession(String stompTx) throws ProtocolException, JMSException {
        return this.transactedSessions.get(stompTx);
    }

    /** Registers {@code session} as the session of transaction {@code stompTx}. */
    protected void setTransactedSession(String stompTx, StompSession session) {
        this.transactedSessions.put(stompTx, session);
    }

    /**
     * Creates a non-transacted JMS session on the current connection.
     *
     * @param ackMode JMS acknowledge mode, e.g. {@link Session#AUTO_ACKNOWLEDGE}
     */
    protected StompSession createSession(int ackMode) throws JMSException {
        Session session = this.connection.createSession(false, ackMode);
        if (log.isDebugEnabled()) {
            log.debug("Created session with ack mode: " + session.getAcknowledgeMode());
        }
        return new StompSession(this, session);
    }

    /** Creates a transacted JMS session on the current connection. */
    protected StompSession createTransactedSession(String stompTx) throws JMSException {
        Session session = this.connection.createSession(true, Session.SESSION_TRANSACTED);
        return new StompSession(this, session);
    }

    /**
     * Sends a RECEIPT frame when the command carries a {@code receipt} header;
     * does nothing otherwise.
     */
    protected void sendResponse(StompFrame command) throws Exception {
        String receiptId = (String) command.getHeaders().get("receipt");
        if (receiptId == null) {
            return;
        }
        StompFrame receipt = new StompFrame();
        receipt.setAction("RECEIPT");
        receipt.setHeaders(new HashMap<String, Object>(1));
        receipt.getHeaders().put("receipt-id", receiptId);
        sendToStomp(receipt);
    }

    /** Passes a frame to the output handler, logging it at debug level first. */
    protected void sendToStomp(StompFrame frame) throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("<<<< " + frame.getAction() + " headers: " + frame.getHeaders());
        }
        this.outputHandler.onStompFrame(frame);
    }

    /**
     * Hook invoked after a transaction has been committed or rolled back.
     *
     * <p>This implementation does nothing, so the session stays registered under its
     * transaction id until the converter is closed.
     */
    protected void considerClosingTransactedSession(StompSession session, String stompTx) {
    }

    /** Writes the "closing session" debug line without letting a JMS failure escape. */
    private static void logClosing(StompSession session) {
        if (!log.isDebugEnabled()) {
            return;
        }
        try {
            log.debug("Closing session: " + session + " with ack mode: " + session.getSession().getAcknowledgeMode());
        } catch (JMSException e) {
            log.debug("Closing session: " + session + " (ack mode unavailable: " + e + ")");
        }
    }
}

