/*
 * Decompiled with CFR 0.152.
 */
package com.sap.cds.services.messaging.service;

import com.sap.cds.services.ErrorStatus;
import com.sap.cds.services.EventContext;
import com.sap.cds.services.ServiceException;
import com.sap.cds.services.messaging.MessagingErrorEventContext;
import com.sap.cds.services.messaging.MessagingService;
import com.sap.cds.services.messaging.TopicMessageEventContext;
import com.sap.cds.services.messaging.service.MessageQueue;
import com.sap.cds.services.messaging.service.MessageTopic;
import com.sap.cds.services.runtime.CdsRuntime;
import com.sap.cds.services.utils.CdsErrorStatuses;
import com.sap.cds.services.utils.ErrorStatusException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Listener bound to a single broker queue. Each message received on the queue is mapped to
 * the topics registered on that queue and emitted on the messaging service as a
 * {@link TopicMessageEventContext}, one emit per matching topic.
 */
public class MessagingBrokerQueueListener {
    private static final Logger logger = LoggerFactory.getLogger(MessagingBrokerQueueListener.class);
    private final MessagingService service;
    private final String queueName;
    private final MessageQueue queue;
    private final CdsRuntime runtime;
    private final boolean isStructured;

    /**
     * Creates a listener for the given queue.
     *
     * @param service the messaging service on which received messages are emitted
     * @param queueName the queue name, used for logging and error reporting
     * @param queue the queue used to look up the topics matching a broker topic
     * @param runtime the runtime used to open the request context for each emit
     * @param isStructured if {@code true}, the data map and headers map of the message are
     *     passed to the event context; otherwise the plain message string is passed as data
     */
    public MessagingBrokerQueueListener(MessagingService service, String queueName, MessageQueue queue, CdsRuntime runtime, boolean isStructured) {
        this.service = service;
        this.queueName = queueName;
        this.queue = queue;
        this.runtime = runtime;
        this.isStructured = isStructured;
    }

    /**
     * Returns the name of the queue this listener was created for.
     *
     * @return the queue name
     */
    public String getQueueName() {
        return this.queueName;
    }

    /**
     * Emits the received message on the service, once for every topic of the queue that
     * matches the broker topic of the message.
     *
     * <p>If emitting fails, a {@link MessagingErrorEventContext} carrying the failure is
     * emitted on the service. When the result of that error event is {@code false}, the
     * message is not acknowledged. The failure is rethrown in any case, so processing stops
     * at the first failing topic. Unless acknowledgement was suppressed this way, the message
     * is acknowledged when this method exits, whether normally or exceptionally.
     *
     * @param message accessor for the received broker message
     * @throws ErrorStatusException with {@code EVENT_PROCESSING_FAILED} if an
     *     {@link Exception} escapes the processing of a topic
     */
    public void receivedMessage(MessageAccess message) {
        logger.debug("Received message on service '{}' from topic '{}' and queue '{}'.", this.service.getName(), message.getBrokerTopic(), this.queueName);
        List<MessageTopic> topics = this.queue.findTopic(message.getBrokerTopic());
        // Mutable holder because the flag is flipped from inside the lambda below.
        AtomicBoolean acknowledge = new AtomicBoolean(true);
        try {
            for (MessageTopic topic : topics) {
                try {
                    logger.debug("The message from topic '{}' on service '{}' is going to be emitted as a service event '{}'", topic.getBrokerName(), this.service.getName(), topic.getEventName());
                    this.runtime.requestContext().privilegedUser().run(req -> {
                        try {
                            TopicMessageEventContext context = this.getContext(message, topic.getEventName());
                            this.service.emit(context);
                        }
                        catch (Throwable th) {
                            // Hand the failure to the error event; its result decides
                            // whether the message may still be acknowledged.
                            MessagingErrorEventContext errorContext = MessagingErrorEventContext.create();
                            ServiceException e = th instanceof ServiceException ? (ServiceException)th : new ServiceException(th);
                            errorContext.setException(e);
                            this.service.emit(errorContext);
                            if (!errorContext.getResult()) {
                                acknowledge.set(false);
                            }
                            // Always propagate the original failure after error handling.
                            throw th;
                        }
                    });
                }
                catch (Exception e) {
                    // The caught exception is passed as the last argument.
                    // NOTE(review): presumably it is picked up as the cause - confirm.
                    throw new ErrorStatusException(CdsErrorStatuses.EVENT_PROCESSING_FAILED, topic.getEventName(), this.service.getName(), this.queueName, e);
                }
            }
        }
        finally {
            // Runs on success and on failure; skipped only if the error event said not to acknowledge.
            if (acknowledge.get()) {
                message.acknowledge();
            }
        }
    }

    /**
     * Builds the inbound event context for the given message and event name.
     *
     * <p>All technical headers are copied into the context as plain entries. Depending on
     * {@code isStructured}, either the data and headers maps or the plain message string
     * are set as payload.
     *
     * @param message accessor for the received broker message
     * @param eventTopicName the event name the context is created for
     * @return the populated event context, marked as inbound
     */
    private TopicMessageEventContext getContext(MessageAccess message, String eventTopicName) {
        TopicMessageEventContext context = TopicMessageEventContext.create(eventTopicName);
        for (Map.Entry<String, String> header : message.getTechnicalHeaders().entrySet()) {
            context.put(header.getKey(), header.getValue());
        }
        if (this.isStructured) {
            context.setDataMap(message.getDataMap());
            context.setHeadersMap(message.getHeadersMap());
        } else {
            context.setData(message.getMessage());
        }
        context.setMessageId(message.getId());
        context.setIsInbound(true);
        return context;
    }

    /**
     * Broker-independent view on a received message, used by the listener to read the
     * message content and to acknowledge it.
     */
    public static interface MessageAccess {
        /** Returns the message id that is set on the event context. */
        public String getId();

        /** Returns the message as a plain string; used as data when the listener is not structured. */
        public String getMessage();

        /** Returns the broker topic used to look up the matching topics of the queue. */
        public String getBrokerTopic();

        /** Acknowledges the message. */
        public void acknowledge();

        /** Returns the message data as a map; used when the listener is structured. */
        public Map<String, Object> getDataMap();

        /** Returns the message headers as a map; used when the listener is structured. */
        public Map<String, Object> getHeadersMap();

        /**
         * Returns the technical headers that are copied as entries into the event context.
         * The default implementation returns an empty map.
         */
        default public Map<String, String> getTechnicalHeaders() {
            return Collections.emptyMap();
        }
    }
}

