/*
 * Decompiled with CFR 0.152.
 */
package com.solacesystems.jms;

import com.solacesystems.jcsmp.BytesXMLMessage;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jms.SolConnection;
import com.solacesystems.jms.SolConnectionConsumerIF;
import com.solacesystems.jms.SolConnectionIF;
import com.solacesystems.jms.SolDestinationInfo;
import com.solacesystems.jms.SolSession;
import com.solacesystems.jms.encoding.JMSDecoder;
import com.solacesystems.jms.impl.AckHandler;
import com.solacesystems.jms.impl.ConnectionConsumerFakeTransactionStrategy;
import com.solacesystems.jms.impl.ConnectionConsumerLocalTxStrategy;
import com.solacesystems.jms.impl.ConnectionConsumerNoTransactionStrategy;
import com.solacesystems.jms.impl.ConnectionConsumerTransactionStrategy;
import com.solacesystems.jms.impl.ConsumerFactory;
import com.solacesystems.jms.impl.JMSState;
import com.solacesystems.jms.impl.MessageAckHandlerImpl;
import com.solacesystems.jms.impl.SessionProperties;
import com.solacesystems.jms.impl.SessionTransactionType;
import com.solacesystems.jms.impl.Validator;
import com.solacesystems.jms.message.SolMessage;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class SolConnectionConsumer
implements SolConnectionConsumerIF {
    protected static final String Component = "ConnectionConsumer";
    static final Log log = LogFactory.getLog(SolConnectionConsumer.class);
    private final SolConnectionIF mConnection;
    private final SolDestinationInfo mDestInfo;
    private final ServerSessionPool mServerPool;
    private final int mMaxMessages;
    private final SessionProperties mSessionProps;
    private final AckHandler mAckHandler;
    private final ConnectionConsumerTransactionStrategy mAckTransaction;
    protected volatile JMSState mState;

    public SolConnectionConsumer(SolConnectionIF conn, SolDestinationInfo destInfo, String messageSelector, ServerSessionPool serverPool, int maxMessages, JMSState state) throws JMSException {
        Validator.checkServerSessionPool(serverPool);
        Validator.checkConnectionConsumerMaxMessages(maxMessages);
        this.mServerPool = serverPool;
        this.mConnection = conn;
        this.mDestInfo = destInfo;
        this.mState = state;
        boolean noLocal = false;
        this.mSessionProps = SolConnectionConsumer.sniffSessionProperties(conn, serverPool);
        boolean isDirect = this.mConnection.getProperties().getPropertyBean().getDirectTransport();
        Validator.checkTransactedAndAckMode(this.mSessionProps.getTransactionType(), this.mSessionProps.getAcknowledgeMode(), isDirect);
        Validator.checkTransactedAndLargeMessaging(this.mSessionProps.getTransactionType(), this.mConnection.getJCSMPProperties().getBooleanProperty("large_messaging"));
        ConsumerFactory cmrFactory = new ConsumerFactory(messageSelector, noLocal, this.mSessionProps, conn, null);
        this.mAckHandler = new MessageAckHandlerImpl(this.mSessionProps.getAcknowledgeMode());
        ((SolConnection)this.mConnection).addConnectionConsumer(this);
        ConnectionConsumerTransactionStrategy.InitProperties txStrategyProp = new ConnectionConsumerTransactionStrategy.InitProperties();
        txStrategyProp.withConnection(this.mConnection).withConsumerFactory(cmrFactory).withDestinationInfo(this.mDestInfo).withSessionProperties(this.mSessionProps);
        if (this.mSessionProps.getAcknowledgeMode() == 0) {
            maxMessages = Math.min(maxMessages, 200);
            this.mAckTransaction = this.mDestInfo.isDurable() ? new ConnectionConsumerFakeTransactionStrategy(txStrategyProp) : new ConnectionConsumerLocalTxStrategy(txStrategyProp);
        } else {
            this.mAckTransaction = new ConnectionConsumerNoTransactionStrategy(txStrategyProp);
        }
        this.mMaxMessages = maxMessages;
        this.createConsumer();
        this.startWorkerThread();
        if (log.isDebugEnabled()) {
            log.debug((Object)("SolConnectionConsumer created. Connection: " + conn.toString() + "   Destination: " + destInfo.toString()));
        }
    }

    private static SessionProperties sniffSessionProperties(SolConnectionIF connection, ServerSessionPool serverPool) throws JMSException {
        ServerSession sniffServerSession = serverPool.getServerSession();
        Session sniffSession = sniffServerSession.getSession();
        boolean transacted = sniffSession.getTransacted();
        int acknowledgeMode = sniffSession.getAcknowledgeMode();
        log.debug((Object)String.format("sniffSession: transacted:%s ackMode:%s sniffSession:%s", transacted, acknowledgeMode, sniffSession));
        MessageAckHandlerImpl ackHandler = new MessageAckHandlerImpl(acknowledgeMode);
        sniffServerSession.start();
        return new SessionProperties(connection.getProperties(), transacted ? SessionTransactionType.LocalTransaction : SessionTransactionType.NoTransaction, acknowledgeMode, ackHandler);
    }

    public ServerSessionPool getServerSessionPool() throws JMSException {
        return this.mServerPool;
    }

    @Override
    public void start() throws JMSException {
        Validator.checkClosed(this.mState, Component);
        if (this.mState == JMSState.Stopped) {
            if (log.isDebugEnabled()) {
                log.debug((Object)"Starting Connection Consumer.");
            }
            this.mState = JMSState.Started;
            this.startConsumer();
            if (log.isDebugEnabled()) {
                log.debug((Object)"Connection consumer started.");
            }
        }
    }

    @Override
    public void stop() throws JMSException {
        Validator.checkClosed(this.mState, Component);
        if (this.mState == JMSState.Started) {
            if (log.isDebugEnabled()) {
                log.debug((Object)"Stopping Connection Consumer.");
            }
            this.mState = JMSState.Stopped;
            this.stopConsumer();
            if (log.isDebugEnabled()) {
                log.debug((Object)"Connection Consumer stopped.");
            }
        }
    }

    private void startConsumer() throws JMSException {
        try {
            this.mAckTransaction.start();
        }
        catch (JMSException e) {
            this.deliverException(e);
            throw e;
        }
    }

    private void stopConsumer() throws JMSException {
        this.mAckTransaction.stop();
    }

    protected void createConsumer() throws JMSException {
        this.mAckTransaction.createConsumer();
        if (this.mState == JMSState.Started) {
            this.startConsumer();
        }
    }

    @Override
    public void close() throws JMSException {
        if (log.isDebugEnabled()) {
            log.debug((Object)"Connection Consumer closed.");
        }
        this.mState = JMSState.Closed;
        this.mAckTransaction.closeConsumer();
        this.mAckTransaction.close();
        ((SolConnection)this.mConnection).removeConnectionConsumer(this);
    }

    public String toString() {
        String fmt = String.format("SolConnectionConsumer(Destination:%s SubscriptionName:%s SessionProperties:%s TransactionState:%s)", new Object[]{this.mDestInfo.destination, this.mDestInfo.subscriptionName, this.mSessionProps, this.mAckTransaction.getTxState()});
        return fmt;
    }

    protected SolMessage createMessage(BytesXMLMessage xmlMessage) throws JMSException {
        if (xmlMessage != null) {
            SolMessage jmsMessage = JMSDecoder.createJMSMessage(xmlMessage);
            jmsMessage.setAckHandler(this.mAckHandler);
            this.mAckHandler.onMessageCreate(jmsMessage);
            this.mAckHandler.onMessageSent(jmsMessage);
            return jmsMessage;
        }
        return null;
    }

    private void startWorkerThread() {
        Thread worker = new Thread(new Runnable(){

            @Override
            public void run() {
                SolConnectionConsumer.this.mainLoop();
            }
        });
        String randStr = UUID.randomUUID().toString();
        randStr = randStr.substring(randStr.length() - 8);
        worker.setName("SolConnectionConsumerWorker-" + randStr);
        worker.setDaemon(true);
        worker.start();
    }

    void mainLoop() {
        block13: {
            ArrayList<Message> msgBatch = new ArrayList<Message>(this.mMaxMessages);
            try {
                while (this.mState != JMSState.Closed) {
                    block12: {
                        if (this.mAckTransaction.getTxState() == ConnectionConsumerTransactionStrategy.TransactionState.ACTIVE || this.mAckTransaction.getTxState() == ConnectionConsumerTransactionStrategy.TransactionState.NONTRANSACTED) {
                            try {
                                msgBatch.clear();
                                while (msgBatch.size() < this.mMaxMessages) {
                                    int pre_sz = msgBatch.size();
                                    this.pollMessage(msgBatch);
                                    if (msgBatch.size() != pre_sz) continue;
                                    break;
                                }
                                if (msgBatch.size() > 0) {
                                    this.deliver(msgBatch);
                                    this.mAckTransaction.afterDelivery();
                                    msgBatch.clear();
                                }
                            }
                            catch (JCSMPException ex) {
                                if (this.mState == JMSState.Closed) break block12;
                                throw Validator.createJMSException("soljms.operation.recv", (Throwable)((Object)ex));
                            }
                        }
                    }
                    this.mAckTransaction.afterPollLoop();
                }
            }
            catch (JMSException ex) {
                if (this.mState == JMSState.Closed) break block13;
                try {
                    this.close();
                }
                catch (JMSException jMSException) {
                    // empty catch block
                }
                this.deliverException(ex);
            }
            catch (Exception ex) {
                JMSException jmsEx = Validator.createJMSException(null, ex);
                this.deliverException(jmsEx);
            }
        }
        if (log.isDebugEnabled()) {
            log.debug((Object)"SolConnectionConsumer.mainLoop() exit.");
        }
    }

    private void pollMessage(List<Message> batch) throws JCSMPException, JMSException {
        BytesXMLMessage msg = null;
        msg = batch.size() == 0 ? this.mAckTransaction.getConsumer().receive(20) : this.mAckTransaction.getConsumer().receiveNoWait();
        if (msg != null) {
            batch.add(this.createMessage(msg));
        }
    }

    private void deliver(Collection<Message> msgs) throws JMSException {
        ArrayList<Message> deliveryBatch = new ArrayList<Message>(msgs);
        this.mAckTransaction.onMessage(deliveryBatch);
        ServerSession deliverySession = this.mServerPool.getServerSession();
        ((SolSession)deliverySession.getSession()).loadFromConnectionConsumer(deliveryBatch, this);
        deliverySession.start();
    }

    private void deliverException(JMSException ex) {
        ExceptionListener listener = this.mConnection.getProperties().getExceptionListener();
        log.debug((Object)String.format("%s Delivering exception to connection exception listener (%s): %s", this.toString(), listener, ex.toString()));
        if (listener != null) {
            listener.onException(ex);
        }
    }

    @Override
    public void commitBatch(Collection<Message> msgs) throws JMSException {
        if (log.isDebugEnabled()) {
            log.debug((Object)"Entering commitBatch()");
        }
        this.mAckTransaction.onCommit(msgs);
        if (log.isDebugEnabled()) {
            log.debug((Object)("Leaving commitBatch().  Number of messages: " + msgs.size()));
        }
    }

    @Override
    public void rollbackBatch(Collection<Message> msgs) throws JMSException {
        if (log.isDebugEnabled()) {
            log.debug((Object)"Entering rollbackBatch()");
        }
        this.mAckTransaction.onRollback(msgs);
        if (log.isDebugEnabled()) {
            log.debug((Object)("Leaving rollbackBatch().  Number of messages: " + msgs.size()));
        }
    }
}

