/*
 * Decompiled with CFR 0.152.
 */
package org.mule.jms.commons.internal.source.polling;

import java.lang.reflect.Field;
import java.util.concurrent.CountDownLatch;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import org.mule.jms.commons.api.connection.JmsSpecification;
import org.mule.jms.commons.internal.common.JmsCommons;
import org.mule.jms.commons.internal.config.InternalAckMode;
import org.mule.jms.commons.internal.config.JmsConfig;
import org.mule.jms.commons.internal.connection.XaJmsTransactionalConnection;
import org.mule.jms.commons.internal.connection.session.JmsSession;
import org.mule.jms.commons.internal.connection.session.JmsSessionManager;
import org.mule.jms.commons.internal.source.JmsConnectionExceptionResolver;
import org.mule.jms.commons.internal.source.JmsMessageDispatcher;
import org.mule.jms.commons.internal.source.NullJmsListenerLock;
import org.mule.jms.commons.internal.source.polling.MessageConsumerFactory;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.tx.TransactionException;
import org.mule.runtime.api.util.Pair;
import org.mule.runtime.core.api.transaction.TransactionCoordination;
import org.mule.runtime.extension.api.connectivity.XATransactionalConnection;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JmsXaMessageConsumer
implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(JmsXaMessageConsumer.class);
    private static final long POLLING_TIMEOUT = 2000L;
    private final int id;
    private CountDownLatch initializationCountDownLatch;
    private final MessageConsumerFactory consumer;
    private final SourceCallback sourceCallback;
    private JmsSession session;
    private final JmsSessionManager sessionManager;
    private final ConnectionProvider connectionProvider;
    private final JmsMessageDispatcher dispatcher;
    private boolean stopRequested = false;

    JmsXaMessageConsumer(MessageConsumerFactory consumerFactory, SourceCallback sourceCallback, JmsSessionManager sessionManager, ConnectionProvider connectionProvider, JmsConfig config, String inboundContentType, String inboundEncoding, JmsSpecification specification, int id, CountDownLatch initializationCountDownLatch, JmsConnectionExceptionResolver exceptionResolver) {
        this.consumer = consumerFactory;
        this.sourceCallback = sourceCallback;
        this.sessionManager = sessionManager;
        this.connectionProvider = connectionProvider;
        this.id = id;
        this.initializationCountDownLatch = initializationCountDownLatch;
        this.dispatcher = new JmsMessageDispatcher(config, inboundContentType, inboundEncoding, specification, () -> this.session, InternalAckMode.TRANSACTED, sessionManager, sourceCallback, new NullJmsListenerLock(), exceptionResolver);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    @Override
    public synchronized void run() {
        MessageConsumer messageConsumer = null;
        try {
            boolean shouldKeepIterating = true;
            LOGGER.debug("[{}] : Starting to poll", (Object)this.id);
            while (shouldKeepIterating && !this.stopRequested) {
                try {
                    Pair<SourceCallbackContext, JmsSession> jmsContext = this.initializePoll();
                    this.session = (JmsSession)jmsContext.getSecond();
                    messageConsumer = this.consumer.createConsumer(this.session).get();
                    Message message = messageConsumer.receive(2000L);
                    shouldKeepIterating = this.dispatchMessage(message, (SourceCallbackContext)jmsContext.getFirst());
                }
                catch (JMSException e) {
                    LOGGER.error("[" + this.id + "] : Unknown error when trying to poll message", (Throwable)e);
                }
                catch (Exception e) {
                    throw e;
                }
                finally {
                    if (!shouldKeepIterating) continue;
                    TransactionCoordination.getInstance().rollbackCurrentTransaction();
                }
            }
            if (this.stopRequested) {
                LOGGER.debug("[{}] : Stopping poll", (Object)this.id);
            }
            LOGGER.debug("[{}] : Finishing poll", (Object)this.id);
            this.sessionManager.unbindSession();
        }
        catch (ConnectionException e) {
            LOGGER.debug("[{}] : Finishing poll due to {}:{}", new Object[]{this.id, ((Object)((Object)e)).getClass(), e.getMessage()});
            this.sourceCallback.onConnectionException(e);
            this.sessionManager.unbindSession();
            JmsCommons.closeQuietly((AutoCloseable)messageConsumer);
        }
        catch (Exception e2) {
            LOGGER.debug("[{}] : Finishing poll due to {}:{}", new Object[]{this.id, e2.getClass(), e2.getMessage()});
            this.sourceCallback.onConnectionException(new ConnectionException((Throwable)e2, (Object)"Unexpected error occurred trying to poll a message"));
            this.sessionManager.unbindSession();
            {
                catch (Throwable throwable) {
                    this.sessionManager.unbindSession();
                    JmsCommons.closeQuietly(messageConsumer);
                    throw throwable;
                }
            }
            JmsCommons.closeQuietly((AutoCloseable)messageConsumer);
        }
        JmsCommons.closeQuietly(messageConsumer);
    }

    private Pair<SourceCallbackContext, JmsSession> initializePoll() throws ConnectionException, TransactionException {
        try {
            LOGGER.trace("[{}] : initializing poll ", (Object)this.id);
            SourceCallbackContext context = this.sourceCallback.createContext();
            XaJmsTransactionalConnection connect = (XaJmsTransactionalConnection)this.connectionProvider.connect();
            this.bindTransaction(context, connect);
            try {
                Pair pair = new Pair((Object)context, (Object)connect.getSession(InternalAckMode.TRANSACTED, false));
                return pair;
            }
            catch (JMSException e) {
                throw new MuleRuntimeException((Throwable)e);
            }
        }
        finally {
            if (this.initializationCountDownLatch != null) {
                this.initializationCountDownLatch.countDown();
                this.initializationCountDownLatch = null;
            }
        }
    }

    private boolean dispatchMessage(Message message, SourceCallbackContext context) {
        if (this.stopRequested) {
            LOGGER.trace("[{}] : Stop has been requested, rolling back current transaction.", (Object)this.id);
            TransactionCoordination.getInstance().rollbackCurrentTransaction();
        } else if (message == null) {
            LOGGER.trace("[{}] : No message found, rolling back transaction.", (Object)this.id);
            TransactionCoordination.getInstance().rollbackCurrentTransaction();
        } else {
            LOGGER.trace("[{}] : received message, handling to the flow.", (Object)this.id);
            context.addVariable("CONSUMER", (Object)this);
            try {
                this.dispatcher.dispatchMessage(message, context);
                return false;
            }
            catch (Exception e) {
                LOGGER.trace("[{}] : Message dispatch failed, rolling back transaction.", (Object)this.id);
                TransactionCoordination.getInstance().rollbackCurrentTransaction();
            }
        }
        return true;
    }

    private void bindTransaction(SourceCallbackContext context, XATransactionalConnection connect) throws ConnectionException, TransactionException {
        int timesToTry = 10;
        for (int i = 1; i <= timesToTry; ++i) {
            try {
                LOGGER.trace("[{}] : about to bind connection [{}] into context: [{}]", new Object[]{this.id, connect, context});
                context.bindConnection(this.connectionProvider.connect());
                return;
            }
            catch (TransactionException e) {
                if (i == timesToTry) {
                    throw e;
                }
                LOGGER.debug("Internal error, Unable to bind transaction. trying again");
                try {
                    Field connectionField = context.getClass().getDeclaredField("connection");
                    connectionField.setAccessible(true);
                    connectionField.set(context, null);
                    continue;
                }
                catch (Exception e1) {
                    throw new MuleRuntimeException((Throwable)e1);
                }
            }
        }
    }

    public void stop() {
        this.stopRequested = true;
    }
}

