/*
 * Decompiled with CFR 0.152.
 */
package org.mule.jms.commons.internal.source.polling;

import javax.jms.JMSException;
import javax.jms.Message;
import org.mule.jms.commons.api.connection.JmsSpecification;
import org.mule.jms.commons.internal.config.InternalAckMode;
import org.mule.jms.commons.internal.config.JmsConfig;
import org.mule.jms.commons.internal.connection.JmsXaTransactionalConnection;
import org.mule.jms.commons.internal.connection.XaJmsResourceWrapper;
import org.mule.jms.commons.internal.connection.session.JmsSessionManager;
import org.mule.jms.commons.internal.connection.session.PollingXAJmsSession;
import org.mule.jms.commons.internal.consume.JmsMessageConsumer;
import org.mule.jms.commons.internal.source.JmsMessageDispatcher;
import org.mule.jms.commons.internal.source.NullJmsListenerLock;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.tx.TransactionException;
import org.mule.runtime.core.api.transaction.TransactionCoordination;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Polling task that repeatedly tries to receive a single JMS message inside a
 * transaction and, once one arrives, hands it to a {@link JmsMessageDispatcher}.
 *
 * <p>Each iteration of {@link #run()} obtains a connection from the
 * {@link ConnectionProvider}, binds the session to the current transaction through
 * the {@link JmsSessionManager}, binds the connection to a fresh
 * {@link SourceCallbackContext} and waits up to two seconds for a message. When no
 * message arrives, or a stop was requested, the current transaction is rolled back.
 * The loop ends after the first successfully dispatched message or when
 * {@link #stop()} has been called.
 *
 * <p>{@link #stop()} may be invoked from a thread other than the one executing
 * {@link #run()}; the stop flag is therefore {@code volatile}.
 */
public class JmsXaMessageConsumer
implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(JmsXaMessageConsumer.class);

    /** Maximum time, in milliseconds, a single receive call waits for a message. */
    private static final long RECEIVE_TIMEOUT_MILLIS = 2000L;

    /** Number of attempts made to bind the connection to the callback context. */
    private static final int BIND_ATTEMPTS = 10;

    private final int id;
    private final JmsMessageConsumer consumer;
    private final SourceCallback sourceCallback;
    private final PollingXAJmsSession session;
    private final JmsSessionManager sessionManager;
    private final ConnectionProvider connectionProvider;
    private final JmsMessageDispatcher dispatcher;

    // Written by stop() without holding this object's monitor while run() holds it for
    // the whole polling loop, so it must be volatile to be visible to the polling thread.
    private volatile boolean stopRequested = false;

    /**
     * Creates a polling consumer.
     *
     * @param consumer           wrapper providing the JMS consumer that messages are received from
     * @param sourceCallback     callback used to create contexts and to report connection failures
     * @param session            session that is bound to the transaction on every poll
     * @param sessionManager     manager used to bind the session to the transaction
     * @param connectionProvider provider the connection for each poll is obtained from
     * @param config             connector configuration, forwarded to the dispatcher
     * @param inboundContentType content type forwarded to the dispatcher
     * @param inboundEncoding    encoding forwarded to the dispatcher
     * @param specification      JMS specification forwarded to the dispatcher
     * @param id                 identifier used as prefix in this consumer's log messages
     */
    JmsXaMessageConsumer(JmsMessageConsumer consumer, SourceCallback sourceCallback, PollingXAJmsSession session, JmsSessionManager sessionManager, ConnectionProvider connectionProvider, JmsConfig config, String inboundContentType, String inboundEncoding, JmsSpecification specification, int id) {
        this.consumer = consumer;
        this.sourceCallback = sourceCallback;
        this.session = session;
        this.sessionManager = sessionManager;
        this.connectionProvider = connectionProvider;
        this.id = id;
        this.dispatcher = new JmsMessageDispatcher(config, inboundContentType, inboundEncoding, specification, session, InternalAckMode.TRANSACTED, sessionManager, sourceCallback, new NullJmsListenerLock());
    }

    /**
     * Polls until one message has been dispatched or a stop has been requested.
     *
     * <p>A {@link JMSException} raised while polling is logged and the loop continues.
     * Any other exception ends the loop and is reported through
     * {@link SourceCallback#onConnectionException(ConnectionException)}. If a stop was
     * requested, the session is closed before the method returns.
     */
    @Override
    public synchronized void run() {
        try {
            boolean shouldIterate = true;
            LOGGER.debug("[{}] : Starting to poll", this.id);
            while (shouldIterate && !this.stopRequested) {
                try {
                    LOGGER.trace("[{}] : about to poll ", this.id);
                    SourceCallbackContext context = this.sourceCallback.createContext();
                    JmsXaTransactionalConnection connect = (JmsXaTransactionalConnection)this.connectionProvider.connect();
                    this.sessionManager.bindToTransaction(connect, this.session, new XaJmsResourceWrapper(this.session, this.sessionManager));
                    this.bindTransaction(context, connect);
                    Message message = this.consumer.get().receive(RECEIVE_TIMEOUT_MILLIS);
                    if (this.stopRequested) {
                        // A message that arrived while stopping is not dispatched; rolling back
                        // discards the receive.
                        LOGGER.trace("[{}] : Stop has been requested, rolling back current transaction.", this.id);
                        TransactionCoordination.getInstance().rollbackCurrentTransaction();
                        continue;
                    }
                    if (message == null) {
                        LOGGER.trace("[{}] : No message found, rolling back transaction.", this.id);
                        TransactionCoordination.getInstance().rollbackCurrentTransaction();
                        continue;
                    }
                    LOGGER.trace("[{}] : received message, handling to the flow.", this.id);
                    context.addVariable("CONSUMER", this);
                    this.dispatcher.dispatchMessage(message, context);
                    // Only reached when the dispatch call returned normally.
                    shouldIterate = false;
                }
                catch (JMSException e) {
                    LOGGER.error("[" + this.id + "] : Unknown error when trying to poll message", e);
                }
                finally {
                    // Every iteration that did not dispatch a message (no message, stop
                    // requested, or an exception) ends with a rollback.
                    if (shouldIterate) {
                        TransactionCoordination.getInstance().rollbackCurrentTransaction();
                    }
                }
            }
            if (this.stopRequested) {
                LOGGER.debug("[{}] : Stopping poll", this.id);
            }
            LOGGER.debug("[{}] : Finishing poll", this.id);
        }
        catch (ConnectionException e) {
            this.sourceCallback.onConnectionException(e);
        }
        catch (Exception e) {
            // NOTE(review): the text is passed as the second constructor argument, not as a
            // message - confirm against the ConnectionException constructors that this is intended.
            this.sourceCallback.onConnectionException(new ConnectionException(e, "Unexpected error occurred trying to poll a message"));
        }
        finally {
            if (this.stopRequested) {
                this.session.closeDefinetly();
            }
        }
    }

    /**
     * Binds {@code connect} to {@code context}, retrying when the bind fails with a
     * {@link TransactionException}.
     *
     * @param context the callback context the connection is bound to
     * @param connect the connection obtained for the current poll
     * @throws ConnectionException  declared for callers; not raised by this method itself
     * @throws TransactionException if the last of {@value #BIND_ATTEMPTS} attempts fails
     */
    private void bindTransaction(SourceCallbackContext context, JmsXaTransactionalConnection connect) throws ConnectionException, TransactionException {
        for (int attempt = 1; attempt <= BIND_ATTEMPTS; ++attempt) {
            try {
                LOGGER.trace("[{}]: about to bind connection [{}] into context: [{}]", this.id, connect, context);
                // Bind the connection the session was already bound with; asking the provider
                // for another one here would bind a different connection on every attempt.
                context.bindConnection(connect);
                return;
            }
            catch (TransactionException e) {
                if (attempt == BIND_ATTEMPTS) {
                    throw e;
                }
                LOGGER.debug("Internal error, Unable to bind transaction. trying again");
            }
        }
    }

    /**
     * Requests the polling loop to end. The flag is checked before each poll and
     * again after each receive call returns.
     */
    public void stop() {
        this.stopRequested = true;
    }
}

