/*
 * Decompiled with CFR 0.152.
 */
package com.sap.cds.services.messaging.jms;

import com.sap.cds.impl.util.Pair;
import com.sap.cds.services.ErrorStatus;
import com.sap.cds.services.messaging.jms.TopicAccessor;
import com.sap.cds.services.messaging.service.MessagingBrokerQueueListener;
import com.sap.cds.services.messaging.utils.MessagingUtils;
import com.sap.cds.services.utils.CdsErrorStatuses;
import com.sap.cds.services.utils.ErrorStatusException;
import com.sap.cds.services.utils.StringUtils;
import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class MessageQueueReader {
    private static final Logger logger = LoggerFactory.getLogger(MessageQueueReader.class);
    private final String queueName;
    private final MessagingBrokerQueueListener listener;
    private final TopicAccessor topicAccessor;

    MessageQueueReader(String queueName, MessagingBrokerQueueListener listener, TopicAccessor topicAccessor) {
        this.topicAccessor = topicAccessor;
        this.queueName = queueName;
        this.listener = listener;
    }

    public void startListening(Connection connection) throws JMSException {
        new MessageQueueReaderThread(connection).start();
    }

    private class MessageQueueReaderThread
    extends Thread {
        private final Connection connection;
        private Session session;
        private MessageConsumer consumer;

        public MessageQueueReaderThread(Connection connection) throws JMSException {
            super(MessageQueueReader.this.queueName + " - Listener");
            this.connection = connection;
            this.initSession();
        }

        private void initSession() throws JMSException {
            try (Session prevSession = this.session;){
                this.session = this.connection.createSession(2);
                Queue queue = this.session.createQueue(MessageQueueReader.this.queueName);
                this.consumer = this.session.createConsumer((Destination)queue);
            }
        }

        @Override
        public void run() {
            block4: while (true) {
                try {
                    while (true) {
                        Message jmsMessage;
                        if ((jmsMessage = this.consumer.receive()) == null) {
                            logger.debug("The message consumer of the queue reader '{}' was closed", (Object)MessageQueueReader.this.queueName);
                            break block4;
                        }
                        ReceivedMessage message = null;
                        try {
                            message = new ReceivedMessage(jmsMessage, MessageQueueReader.this.topicAccessor);
                            MessageQueueReader.this.listener.receivedMessage(message);
                            continue block4;
                        }
                        catch (Throwable th) {
                            if (message != null) {
                                logger.error("The received message of the queue '{}' and topic '{}' could not be handled", new Object[]{MessageQueueReader.this.queueName, !StringUtils.isEmpty((String)message.getBrokerTopic()) ? message.getBrokerTopic() : "???", th});
                            } else {
                                logger.error("The received message of the queue '{}' could not be handled", (Object)MessageQueueReader.this.queueName, (Object)th);
                            }
                            this.initSession();
                            continue;
                        }
                        break;
                    }
                }
                catch (JMSException e) {
                    logger.error("The queue reader '{}' was interrupted", (Object)MessageQueueReader.this.queueName, (Object)e);
                    break;
                }
            }
        }
    }

    private static class ReceivedMessage
    implements MessagingBrokerQueueListener.MessageAccess {
        private final Message jmsMessage;
        private final String message;
        private final String topic;
        private final String id;
        private Map<String, Object> dataMap;
        private Map<String, Object> headersMap;

        public ReceivedMessage(Message message, TopicAccessor topicAccessor) throws JMSException {
            this.id = message.getJMSMessageID();
            this.jmsMessage = message;
            if (message instanceof TextMessage) {
                TextMessage txtMsg = (TextMessage)message;
                this.message = txtMsg.getText();
                this.topic = topicAccessor.getFromTopic(message);
            } else if (message instanceof BytesMessage) {
                BytesMessage byteMsg = (BytesMessage)message;
                byte[] byteData = new byte[(int)byteMsg.getBodyLength()];
                byteMsg.readBytes(byteData);
                byteMsg.reset();
                this.message = new String(byteData, StandardCharsets.UTF_8);
                this.topic = topicAccessor.getFromTopic(message);
            } else {
                throw new JMSException("Unknown event message format: " + message.getClass().getName());
            }
        }

        @Override
        public String getMessage() {
            return this.message;
        }

        @Override
        public Map<String, Object> getDataMap() {
            if (this.dataMap == null) {
                this.populateMaps();
            }
            return this.dataMap;
        }

        @Override
        public Map<String, Object> getHeadersMap() {
            if (this.headersMap == null) {
                this.populateMaps();
            }
            return this.headersMap;
        }

        private void populateMaps() {
            Pair<Map<String, Object>, Map<String, Object>> maps = MessagingUtils.toStructuredMessage(this.message);
            this.dataMap = (Map)maps.left;
            this.headersMap = (Map)maps.right;
        }

        @Override
        public String getId() {
            return this.id;
        }

        @Override
        public String getBrokerTopic() {
            return this.topic;
        }

        @Override
        public void acknowledge() {
            try {
                this.jmsMessage.acknowledge();
            }
            catch (JMSException e) {
                throw new ErrorStatusException((ErrorStatus)CdsErrorStatuses.ACKNOWLEDGMENT_FAILED, new Object[]{this.topic, e});
            }
        }
    }
}

