/*
 * Decompiled with CFR 0.152.
 */
package org.mule.jms.commons.internal.source.polling;

import java.util.concurrent.CountDownLatch;
import javax.jms.JMSException;
import javax.jms.Message;
import org.mule.jms.commons.api.connection.JmsSpecification;
import org.mule.jms.commons.internal.config.InternalAckMode;
import org.mule.jms.commons.internal.config.JmsConfig;
import org.mule.jms.commons.internal.connection.JmsXaTransactionalConnection;
import org.mule.jms.commons.internal.connection.XaJmsResourceWrapper;
import org.mule.jms.commons.internal.connection.session.JmsSessionManager;
import org.mule.jms.commons.internal.connection.session.PollingXAJmsSession;
import org.mule.jms.commons.internal.consume.JmsMessageConsumer;
import org.mule.jms.commons.internal.source.JmsMessageDispatcher;
import org.mule.jms.commons.internal.source.NullJmsListenerLock;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.tx.TransactionException;
import org.mule.runtime.core.api.transaction.TransactionCoordination;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Runnable} that polls a {@link JmsMessageConsumer} for messages and hands each received
 * message to a {@link JmsMessageDispatcher}.
 *
 * <p>Before every receive attempt the polling session is bound to a transaction through the
 * {@link JmsSessionManager} and a connection is bound to a freshly created
 * {@link SourceCallbackContext}. When no message arrives within the receive timeout, or when a
 * stop has been requested, the current transaction is rolled back.
 *
 * <p>{@link #run()} is {@code synchronized}; {@link #stop()} is not, and only flips a
 * {@code volatile} flag that the polling loop checks.
 */
public class JmsXaMessageConsumer
implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(JmsXaMessageConsumer.class);

    /** Maximum time, in milliseconds, that a single {@code receive} call blocks. */
    private static final long RECEIVE_TIMEOUT_MILLIS = 2000L;

    /** Number of attempts made to bind the connection to the callback context. */
    private static final int MAX_BIND_ATTEMPTS = 10;

    /** Identifier used only to tag this consumer's log lines. */
    private final int id;
    /** Counted down once, after the first successful poll initialization; {@code null} afterwards. */
    private CountDownLatch initializationCountDownLatch;
    private final JmsMessageConsumer consumer;
    private final SourceCallback sourceCallback;
    private final PollingXAJmsSession session;
    private final JmsSessionManager sessionManager;
    private final ConnectionProvider connectionProvider;
    private final JmsMessageDispatcher dispatcher;
    /**
     * Set by {@link #stop()} and read by the polling loop. Declared {@code volatile} because
     * {@link #stop()} is not synchronized, so without it the write would have no visibility
     * guarantee towards the thread executing {@link #run()}.
     */
    private volatile boolean stopRequested = false;

    /**
     * Creates the consumer and the {@link JmsMessageDispatcher} it delegates to. The dispatcher is
     * built with {@link InternalAckMode#TRANSACTED} and a {@link NullJmsListenerLock}.
     *
     * @param consumer                     supplier of the JMS consumer to receive from
     * @param sourceCallback               callback used to create contexts and report connection errors
     * @param session                      polling session bound to a transaction on every poll
     * @param sessionManager               manager used to bind the session to the transaction
     * @param connectionProvider           provider of the connection bound to each callback context
     * @param config                       configuration forwarded to the dispatcher
     * @param inboundContentType           content type forwarded to the dispatcher
     * @param inboundEncoding              encoding forwarded to the dispatcher
     * @param specification                JMS specification forwarded to the dispatcher
     * @param id                           identifier used in log messages
     * @param initializationCountDownLatch latch counted down after the first poll initialization;
     *                                     may be {@code null}
     */
    JmsXaMessageConsumer(JmsMessageConsumer consumer, SourceCallback sourceCallback, PollingXAJmsSession session, JmsSessionManager sessionManager, ConnectionProvider connectionProvider, JmsConfig config, String inboundContentType, String inboundEncoding, JmsSpecification specification, int id, CountDownLatch initializationCountDownLatch) {
        this.consumer = consumer;
        this.sourceCallback = sourceCallback;
        this.session = session;
        this.sessionManager = sessionManager;
        this.connectionProvider = connectionProvider;
        this.id = id;
        this.initializationCountDownLatch = initializationCountDownLatch;
        this.dispatcher = new JmsMessageDispatcher(config, inboundContentType, inboundEncoding, specification, session, InternalAckMode.TRANSACTED, sessionManager, sourceCallback, new NullJmsListenerLock());
    }

    /**
     * Polls until a message has been dispatched or a stop has been requested.
     *
     * <p>A {@link JMSException} raised while polling is logged and the loop continues. Any other
     * exception ends the loop and is reported through
     * {@link SourceCallback#onConnectionException(ConnectionException)}. If a stop was requested,
     * the session is closed before returning.
     */
    @Override
    public synchronized void run() {
        try {
            boolean shouldKeepIterating = true;
            LOGGER.debug("[{}] : Starting to poll", this.id);
            while (shouldKeepIterating && !this.stopRequested) {
                try {
                    SourceCallbackContext context = this.initializePoll();
                    Message message = this.consumer.get().receive(RECEIVE_TIMEOUT_MILLIS);
                    shouldKeepIterating = this.dispatchMessage(message, context);
                }
                catch (JMSException e) {
                    // Trailing Throwable argument is logged as the exception, not as a placeholder value.
                    LOGGER.error("[{}] : Unknown error when trying to poll message", this.id, e);
                }
                finally {
                    // Still iterating means nothing was dispatched in this round (no message, stop
                    // requested, or a failure), so the transaction opened for it is discarded.
                    if (shouldKeepIterating) {
                        TransactionCoordination.getInstance().rollbackCurrentTransaction();
                    }
                }
            }
            if (this.stopRequested) {
                LOGGER.debug("[{}] : Stopping poll", this.id);
            }
            LOGGER.debug("[{}] : Finishing poll", this.id);
        }
        catch (ConnectionException e) {
            this.sourceCallback.onConnectionException(e);
        }
        catch (Exception e) {
            // Message first, cause second: the (Throwable, Object) overload would register the
            // text as the failing connection object instead of using it as the exception message.
            this.sourceCallback.onConnectionException(new ConnectionException("Unexpected error occurred trying to poll a message", e));
        }
        finally {
            if (this.stopRequested) {
                this.session.closeDefinetly();
            }
        }
    }

    /**
     * Prepares one polling round: creates a callback context, obtains a connection, binds the
     * session to the transaction and binds a connection to the context. On the first successful
     * call the initialization latch, if any, is counted down and then discarded.
     *
     * @return the context to be used when dispatching the message received in this round
     * @throws ConnectionException  if the connection cannot be obtained or bound
     * @throws TransactionException if binding to the transaction keeps failing
     */
    private SourceCallbackContext initializePoll() throws ConnectionException, TransactionException {
        LOGGER.trace("[{}] : initializing poll ", this.id);
        SourceCallbackContext context = this.sourceCallback.createContext();
        JmsXaTransactionalConnection connect = (JmsXaTransactionalConnection)this.connectionProvider.connect();
        this.sessionManager.bindToTransaction(connect, this.session, new XaJmsResourceWrapper(this.session, this.sessionManager));
        this.bindTransaction(context, connect);
        if (this.initializationCountDownLatch != null) {
            this.initializationCountDownLatch.countDown();
            this.initializationCountDownLatch = null;
        }
        return context;
    }

    /**
     * Dispatches the received message, or rolls back the current transaction when there is
     * nothing to dispatch.
     *
     * @param message the received message, or {@code null} if the receive timed out
     * @param context the callback context created for this polling round
     * @return {@code false} if the message was handed to the dispatcher (polling must end);
     *         {@code true} if the transaction was rolled back and polling may continue
     */
    private boolean dispatchMessage(Message message, SourceCallbackContext context) {
        if (this.stopRequested) {
            LOGGER.trace("[{}] : Stop has been requested, rolling back current transaction.", this.id);
            TransactionCoordination.getInstance().rollbackCurrentTransaction();
        } else if (message == null) {
            LOGGER.trace("[{}] : No message found, rolling back transaction.", this.id);
            TransactionCoordination.getInstance().rollbackCurrentTransaction();
        } else {
            LOGGER.trace("[{}] : received message, handling to the flow.", this.id);
            // Expose this consumer to whoever processes the context under the "CONSUMER" key.
            context.addVariable("CONSUMER", this);
            this.dispatcher.dispatchMessage(message, context);
            return false;
        }
        return true;
    }

    /**
     * Binds a connection to the given context, retrying up to {@link #MAX_BIND_ATTEMPTS} times
     * when a {@link TransactionException} is raised.
     *
     * @param context the context to bind the connection to
     * @param connect the connection obtained for this polling round (only used for logging here)
     * @throws ConnectionException  if obtaining the connection fails
     * @throws TransactionException if the last binding attempt fails
     */
    private void bindTransaction(SourceCallbackContext context, JmsXaTransactionalConnection connect) throws ConnectionException, TransactionException {
        for (int attempt = 1; attempt <= MAX_BIND_ATTEMPTS; ++attempt) {
            try {
                LOGGER.trace("[{}]: about to bind connection [{}] into context: [{}]", this.id, connect, context);
                // NOTE(review): the log line reports `connect`, but the value bound is the result of
                // a new connectionProvider.connect() call. Presumably the provider returns the same
                // connection each time — confirm, otherwise `connect` should be bound instead.
                context.bindConnection(this.connectionProvider.connect());
                return;
            }
            catch (TransactionException e) {
                if (attempt == MAX_BIND_ATTEMPTS) {
                    throw e;
                }
                LOGGER.debug("Internal error, Unable to bind transaction. trying again");
            }
        }
    }

    /**
     * Requests the polling loop to stop. The loop checks the request before each receive attempt
     * and again before dispatching, so a receive already in progress is not interrupted.
     */
    public void stop() {
        this.stopRequested = true;
    }
}

