/*
 * Decompiled with CFR 0.152.
 */
package com.sap.cds.services.utils.messaging.jms;

import com.sap.cds.services.utils.StringUtils;
import com.sap.cds.services.utils.messaging.jms.TopicAccessor;
import com.sap.cds.services.utils.messaging.service.MessagingBrokerQueueListener;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reads messages from one JMS queue on a dedicated thread and forwards them to a
 * {@link MessagingBrokerQueueListener}.
 *
 * <p>Messages are consumed with client acknowledgement: a message is acknowledged after the
 * listener returned without throwing. If handling fails and {@code maxFailedAttempts} is positive,
 * the failure is counted per JMS message ID; the session is re-created after each failure until
 * the limit is reached, at which point the message is acknowledged anyway.
 */
class MessageQueueReader {
    private static final Logger logger = LoggerFactory.getLogger(MessageQueueReader.class);
    private final String queueName;
    private final MessagingBrokerQueueListener listener;
    private final TopicAccessor topicAccessor;
    private final int maxFailedAttempts;
    /**
     * Failed handling attempts per JMS message ID. An entry is dropped as soon as its message is
     * acknowledged (successfully handled or given up on).
     * NOTE(review): this is a plain HashMap shared by every thread started through
     * {@link #startListening(Connection)} - confirm only one reader thread is active per instance.
     */
    private final Map<String, Integer> failedCounter = new HashMap<>();

    /**
     * @param queueName name of the queue to consume from
     * @param listener receiver of the message payload, topic and JMS message ID
     * @param topicAccessor used to determine the topic of a received message
     * @param maxFailedAttempts number of failed attempts after which a message is acknowledged
     *     regardless; values {@code <= 0} disable the failure bookkeeping entirely
     */
    MessageQueueReader(String queueName, MessagingBrokerQueueListener listener, TopicAccessor topicAccessor, int maxFailedAttempts) {
        this.maxFailedAttempts = maxFailedAttempts;
        this.topicAccessor = topicAccessor;
        this.queueName = queueName;
        this.listener = listener;
    }

    /**
     * Creates a session and consumer on the given connection and starts a new reader thread.
     *
     * @param connection the JMS connection to create the session on
     * @throws JMSException if the session, queue or consumer cannot be created
     */
    public void startListening(Connection connection) throws JMSException {
        new MessageQueueReaderThread(connection).start();
    }

    /** Payload and topic extracted from a JMS text or bytes message. */
    private static class ReceivedMessage {
        private final String message;
        private final String topic;

        /**
         * @param message the JMS message; must be a {@link TextMessage} or {@link BytesMessage}
         * @param topicAccessor used to look up the topic of the message
         * @throws JMSException if the message is of another type or the JMS provider fails
         */
        public ReceivedMessage(Message message, TopicAccessor topicAccessor) throws JMSException {
            String payload;
            if (message instanceof TextMessage) {
                payload = ((TextMessage)message).getText();
            } else if (message instanceof BytesMessage) {
                BytesMessage byteMsg = (BytesMessage)message;
                byte[] byteData = new byte[(int)byteMsg.getBodyLength()];
                byteMsg.readBytes(byteData);
                // Rewind so the body can be read again, e.g. when the message is redelivered.
                byteMsg.reset();
                payload = new String(byteData, StandardCharsets.UTF_8);
            } else {
                throw new JMSException("Unknown event message format: " + message.getClass().getName());
            }
            this.message = payload;
            this.topic = topicAccessor.getFromTopic(message);
        }

        public String getMessage() {
            return this.message;
        }

        public String getTopic() {
            return this.topic;
        }
    }

    /** Thread that blocks on the queue consumer and dispatches every received message. */
    private class MessageQueueReaderThread
    extends Thread {
        private final Connection connection;
        private Session session;
        private MessageConsumer consumer;

        /**
         * @param connection the JMS connection used to create (and re-create) the session
         * @throws JMSException if the initial session or consumer cannot be created
         */
        public MessageQueueReaderThread(Connection connection) throws JMSException {
            super(MessageQueueReader.this.queueName + " - Listener");
            this.connection = connection;
            this.initSession();
        }

        /**
         * Creates a fresh client-acknowledge session with a consumer for the queue. A previously
         * created session (if any) is closed afterwards by the try-with-resources statement.
         */
        private void initSession() throws JMSException {
            try (Session prevSession = this.session) {
                this.session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                Queue queue = this.session.createQueue(MessageQueueReader.this.queueName);
                this.consumer = this.session.createConsumer((Destination)queue);
            }
        }

        /**
         * Receives messages until the consumer returns {@code null} or a {@link JMSException}
         * escapes from receiving or from the failure bookkeeping.
         */
        @Override
        public void run() {
            try {
                while (true) {
                    Message message = this.consumer.receive();
                    if (message == null) {
                        logger.debug("The message consumer of the queue reader '{}' was closed", MessageQueueReader.this.queueName);
                        return;
                    }
                    this.handleMessage(message);
                }
            }
            catch (JMSException e) {
                logger.error("The queue reader '{}' was interrupted", MessageQueueReader.this.queueName, e);
            }
        }

        /**
         * Passes one message to the listener and acknowledges it on success. Any failure is logged
         * and delegated to {@link #checkUnhandledMessage(Message)}.
         *
         * @throws JMSException only if the failure bookkeeping itself fails
         */
        private void handleMessage(Message message) throws JMSException {
            ReceivedMessage received = null;
            try {
                received = new ReceivedMessage(message, MessageQueueReader.this.topicAccessor);
                String messageId = message.getJMSMessageID();
                MessageQueueReader.this.listener.receivedMessage(received.getMessage(), received.getTopic(), messageId);
                message.acknowledge();
                // Forget earlier failures of this message; otherwise the entry would stay in the
                // map forever for messages that only succeeded on a later delivery.
                MessageQueueReader.this.failedCounter.remove(messageId);
            }
            catch (JMSException e) {
                logger.warn("Failed to parse JMS message on queue '{}'", MessageQueueReader.this.queueName, e);
                this.checkUnhandledMessage(message);
            }
            catch (Throwable th) {
                // Deliberately broad: whatever the listener throws must not end the reader thread.
                if (received != null) {
                    String topic = !StringUtils.isEmpty(received.getTopic()) ? received.getTopic() : "???";
                    logger.error("The received message of the queue '{}' and topic '{}' could not be handled", MessageQueueReader.this.queueName, topic, th);
                } else {
                    logger.error("The received message of the queue '{}' could not be handled", MessageQueueReader.this.queueName, th);
                }
                this.checkUnhandledMessage(message);
            }
        }

        /**
         * Records a failed attempt for the message. Once the configured maximum is reached the
         * message is acknowledged and its counter dropped; otherwise the session is re-created.
         * Does nothing when {@code maxFailedAttempts} is not positive.
         */
        private void checkUnhandledMessage(Message message) throws JMSException {
            if (MessageQueueReader.this.maxFailedAttempts <= 0) {
                return;
            }
            String messageId = message.getJMSMessageID();
            Integer previousAttempts = MessageQueueReader.this.failedCounter.get(messageId);
            int failedAttempts = (previousAttempts == null ? 0 : previousAttempts) + 1;
            if (failedAttempts >= MessageQueueReader.this.maxFailedAttempts) {
                logger.debug("Auto-acknowledged message with ID '{}', as maximum failed attempts were reached", messageId);
                message.acknowledge();
                MessageQueueReader.this.failedCounter.remove(messageId);
            } else {
                MessageQueueReader.this.failedCounter.put(messageId, failedAttempts);
                this.initSession();
            }
        }
    }
}

