/*
 * Decompiled with CFR 0.152.
 */
package com.sap.cds.services.messaging.service;

import com.sap.cds.services.ErrorStatus;
import com.sap.cds.services.EventContext;
import com.sap.cds.services.Service;
import com.sap.cds.services.ServiceException;
import com.sap.cds.services.environment.CdsProperties;
import com.sap.cds.services.messaging.MessagingErrorEventContext;
import com.sap.cds.services.messaging.MessagingService;
import com.sap.cds.services.messaging.TopicMessageEventContext;
import com.sap.cds.services.messaging.service.MessageQueue;
import com.sap.cds.services.messaging.service.MessageTopic;
import com.sap.cds.services.runtime.CdsRuntime;
import com.sap.cds.services.utils.CdsErrorStatuses;
import com.sap.cds.services.utils.ErrorStatusException;
import com.sap.cds.services.utils.lib.mt.TenantUtils;
import com.sap.cds.services.utils.outbox.OutboxUtils;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Dispatches messages received on a single broker queue to a {@link MessagingService}.
 *
 * <p>For every incoming message the listener looks up the topics registered on the queue for the
 * message's broker topic, emits one {@link TopicMessageEventContext} per topic on the service and
 * acknowledges the message on the broker once processing succeeded (or once the error handler
 * asked for acknowledgement).
 *
 * <p>If the inbox is enabled in the service configuration, message events are emitted through an
 * outboxed wrapper of the service; error events are always emitted on the original service.
 */
public class MessagingBrokerQueueListener {
    private static final Logger logger = LoggerFactory.getLogger(MessagingBrokerQueueListener.class);
    private static final String HEADER_CORRELATION_ID = "correlation_id";
    private static final String HEADER_ID = "id";

    /** Service used to emit message events; may be an outboxed wrapper of {@link #errorService}. */
    private final MessagingService service;
    /** The service as passed to the constructor; used to emit error events. */
    private final MessagingService errorService;
    private final String queueName;
    private final MessageQueue queue;
    private final CdsRuntime runtime;

    /**
     * Creates a listener using a default {@link CdsProperties.Messaging.MessagingServiceConfig}.
     *
     * @param service the messaging service that receives the events
     * @param queueName the name of the broker queue
     * @param queue the queue definition used to resolve topics
     * @param runtime the CDS runtime used to open request contexts
     * @param structured ignored
     * @deprecated use {@link #MessagingBrokerQueueListener(MessagingService,
     *     CdsProperties.Messaging.MessagingServiceConfig, String, MessageQueue, CdsRuntime)} instead
     */
    @Deprecated(forRemoval=true, since="4.0.0")
    public MessagingBrokerQueueListener(MessagingService service, String queueName, MessageQueue queue, CdsRuntime runtime, boolean structured) {
        this(service, new CdsProperties.Messaging.MessagingServiceConfig(), queueName, queue, runtime);
    }

    /**
     * Creates a listener for the given queue.
     *
     * @param service the messaging service that receives the events
     * @param serviceConfig the service configuration; its inbox settings decide whether events are
     *     emitted through an outboxed wrapper of {@code service}
     * @param queueName the name of the broker queue
     * @param queue the queue definition used to resolve topics
     * @param runtime the CDS runtime used to open request contexts
     */
    public MessagingBrokerQueueListener(MessagingService service, CdsProperties.Messaging.MessagingServiceConfig serviceConfig, String queueName, MessageQueue queue, CdsRuntime runtime) {
        this.errorService = service;
        if (serviceConfig.getInbox().isEnabled()) {
            this.service = (MessagingService) OutboxUtils.outboxed(service, serviceConfig.getInbox().getName(), runtime);
        } else {
            this.service = service;
        }
        this.queueName = queueName;
        this.queue = queue;
        this.runtime = runtime;
    }

    /**
     * Returns the name of the broker queue this listener was created for.
     *
     * @return the queue name
     */
    public String getQueueName() {
        return this.queueName;
    }

    /**
     * Processes a message received from the broker.
     *
     * <p>For each topic the queue resolves for the message's broker topic, a privileged system-user
     * request context for the message's tenant is opened and the message is emitted as a service
     * event. On failure a {@link MessagingErrorEventContext} is emitted on the original service and
     * the failure is rethrown; the message is acknowledged in a {@code finally} block if either an
     * event was emitted successfully or the error handler requested acknowledgement.
     *
     * @param message accessor for the received broker message
     * @throws ErrorStatusException with {@link CdsErrorStatuses#EVENT_PROCESSING_FAILED} if the
     *     request context could not be created or emitting the event failed
     */
    public void receivedMessage(MessageAccess message) {
        logger.debug("Received message on service '{}' from topic '{}' and queue '{}'.", this.service.getName(), message.getBrokerTopic(), this.queueName);
        List<MessageTopic> topics = this.queue.findTopic(message.getBrokerTopic());
        AtomicBoolean acknowledge = new AtomicBoolean(false);
        // set by the inner handler so the outer catch does not run error handling a second time
        AtomicBoolean error = new AtomicBoolean(false);
        try {
            for (MessageTopic topic : topics) {
                try {
                    this.runtime.requestContext().systemUser(message.getTenant()).privilegedUser().modifyParameters(params -> {
                        Map<String, Object> headers = message.getHeadersMap();
                        if (headers != null && headers.containsKey(HEADER_CORRELATION_ID)) {
                            params.setCorrelationId((String) headers.get(HEADER_CORRELATION_ID));
                        }
                    }).run(req -> {
                        try {
                            TopicMessageEventContext context = this.getContext(message, topic.getEventName());
                            logger.debug("The message 'id:{}' from topic '{}' on service '{}' is going to be emitted as a service event '{}'", context.getMessageId(), topic.getBrokerName(), this.service.getName(), topic.getEventName());
                            this.service.emit(context);
                            acknowledge.set(true);
                        }
                        catch (Throwable processingFailure) {
                            error.set(true);
                            this.performErrorHandling(processingFailure, acknowledge, message);
                            throw processingFailure;
                        }
                    });
                }
                catch (Throwable th) {
                    Throwable cause = th;
                    if (!error.get()) {
                        // the failure happened before the event handler ran, i.e. while setting up the request context
                        logger.debug("The tenant request context for '{}' cannot be created", message.getTenant());
                        if (TenantUtils.isUnknownTenant(th)) {
                            cause = new ErrorStatusException(CdsErrorStatuses.TENANT_NOT_EXISTS, message.getTenant(), th);
                        }
                        this.performErrorHandling(cause, acknowledge, message);
                    }
                    throw new ErrorStatusException(CdsErrorStatuses.EVENT_PROCESSING_FAILED, topic.getEventName(), this.service.getName(), this.queueName, cause);
                }
            }
        }
        finally {
            if (acknowledge.get()) {
                message.acknowledge();
            }
        }
    }

    /**
     * Emits a {@link MessagingErrorEventContext} describing the failure on the original service and
     * flags the message for acknowledgement if the context's result is {@code true} afterwards.
     *
     * @param th the failure; wrapped into a {@link ServiceException} unless it already is one
     * @param acknowledge flag that is set to {@code true} when the message should be acknowledged
     * @param message the message whose processing failed
     */
    private void performErrorHandling(Throwable th, AtomicBoolean acknowledge, MessageAccess message) {
        MessagingErrorEventContext errorContext = MessagingErrorEventContext.create();
        ServiceException exception = th instanceof ServiceException ? (ServiceException) th : new ServiceException(th);
        errorContext.setException(exception);
        errorContext.setTenant(message.getTenant());
        errorContext.setMessageHeaders(message.getHeadersMap());
        errorContext.setMessageData(message.getDataMap());
        this.errorService.emit(errorContext);
        if (errorContext.getResult()) {
            acknowledge.set(true);
        }
    }

    /**
     * Builds the inbound event context for the given message.
     *
     * <p>Technical headers are copied into the context as plain key/value entries. The message id is
     * taken from {@link MessageAccess#getId()}; if that is {@code null}, the {@code "id"} entry of
     * the headers map is used instead (or {@code null} if there is no such entry or no headers map).
     *
     * @param message the received message
     * @param eventTopicName the event name the context is created for
     * @return the populated event context, marked as inbound
     */
    private TopicMessageEventContext getContext(MessageAccess message, String eventTopicName) {
        TopicMessageEventContext context = TopicMessageEventContext.create(eventTopicName);
        for (Map.Entry<String, String> header : message.getTechnicalHeaders().entrySet()) {
            context.put(header.getKey(), header.getValue());
        }
        Map<String, Object> headers = message.getHeadersMap();
        context.setDataMap(message.getDataMap());
        context.setHeadersMap(headers);
        String messageId = message.getId();
        if (messageId == null && headers != null) {
            // receivedMessage treats the headers map as nullable, so guard the fallback lookup as well
            messageId = (String) headers.get(HEADER_ID);
        }
        context.setMessageId(messageId);
        context.setIsInbound(true);
        return context;
    }

    /**
     * Broker-independent view of a received message as consumed by
     * {@link MessagingBrokerQueueListener#receivedMessage(MessageAccess)}.
     */
    public interface MessageAccess {
        /**
         * @return the message id; may be {@code null}, in which case the listener falls back to the
         *     {@code "id"} entry of {@link #getHeadersMap()}
         */
        String getId();

        /**
         * @return the tenant used for the system-user request context and reported in error events
         */
        String getTenant();

        /**
         * @return the message as a string (not used by the listener itself; presumably the raw
         *     payload - verify against implementations)
         */
        String getMessage();

        /**
         * @return the broker topic the message was received on; used to resolve the queue's topics
         */
        String getBrokerTopic();

        /**
         * Acknowledges the message. Called by the listener after successful processing or when the
         * error handler requested acknowledgement.
         */
        void acknowledge();

        /**
         * @return the message data passed on to the event context and to error events
         */
        Map<String, Object> getDataMap();

        /**
         * @return the message headers passed on to the event context and to error events; the
         *     listener tolerates {@code null}
         */
        Map<String, Object> getHeadersMap();

        /**
         * @return additional headers that are copied into the event context as key/value entries;
         *     empty by default
         */
        default Map<String, String> getTechnicalHeaders() {
            return Collections.emptyMap();
        }
    }
}

