/*
 * Decompiled with CFR 0.152.
 */
package com.sap.cds.services.messaging.service;

import com.sap.cds.services.ErrorStatus;
import com.sap.cds.services.EventContext;
import com.sap.cds.services.ServiceDelegator;
import com.sap.cds.services.environment.CdsProperties;
import com.sap.cds.services.handler.Handler;
import com.sap.cds.services.handler.annotations.Before;
import com.sap.cds.services.handler.annotations.HandlerOrder;
import com.sap.cds.services.handler.annotations.On;
import com.sap.cds.services.messaging.MessagingErrorEventContext;
import com.sap.cds.services.messaging.MessagingService;
import com.sap.cds.services.messaging.TopicMessageEventContext;
import com.sap.cds.services.messaging.service.MessageQueue;
import com.sap.cds.services.messaging.service.MessageTopic;
import com.sap.cds.services.messaging.service.MessagingBrokerQueueListener;
import com.sap.cds.services.messaging.utils.CloudEventUtils;
import com.sap.cds.services.runtime.CdsRuntime;
import com.sap.cds.services.utils.CdsErrorStatuses;
import com.sap.cds.services.utils.CorrelationIdUtils;
import com.sap.cds.services.utils.ErrorStatusException;
import com.sap.cds.services.utils.StringUtils;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiPredicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractMessagingService
extends ServiceDelegator
implements MessagingService {
    /** Logger shared by all instances of this class. */
    private static final Logger logger = LoggerFactory.getLogger(AbstractMessagingService.class);
    /** Configured format value that {@link #isCloudEventsFormat()} compares against (trimmed, case-insensitive). */
    protected static final String FORMAT_CLOUDEVENTS = "cloudevents";
    /** Key of the headers-map entry that the private {@code emit} removes and copies onto the event context. */
    public static final String CONTEXT_PARAMETERS_KEY = "cds.context.parameters";
    /** Configuration this service was constructed with; source of the name, format and topic prefixes. */
    protected final CdsProperties.Messaging.MessagingServiceConfig serviceConfig;
    /** Runtime handed in at construction time; used to read environment properties. */
    protected final CdsRuntime runtime;
    /** Queue model of this service, created in the constructor via {@code MessageQueue.create(...)}. */
    protected final MessageQueue queue;
    /** Copy of {@code serviceConfig.getQueue().isForceListening()}; when set, the queue is set up even without topics. */
    protected final boolean forceListening;
    /** Copy of {@code serviceConfig.isStructured()}; decides whether {@code emit} fills data/headers maps or plain string data. */
    private final boolean isStructured;

    protected AbstractMessagingService(CdsProperties.Messaging.MessagingServiceConfig serviceConfig, CdsRuntime runtime) {
        super(serviceConfig.getName());
        this.serviceConfig = serviceConfig;
        this.runtime = runtime;
        this.forceListening = serviceConfig.getQueue().isForceListening();
        this.isStructured = serviceConfig.isStructured();
        this.queue = MessageQueue.create(serviceConfig, this.getTopicMatcher(), runtime.getEnvironment().getApplicationInfo());
    }

    /**
     * Sets up the queue, its topic subscriptions and the queue listener, unless the
     * {@code environment.command} property of the runtime is enabled, in which case nothing is done.
     */
    public void init() {
        boolean commandEnabled = runtime.getEnvironment().getCdsProperties().getEnvironment().getCommand().isEnabled().booleanValue();
        if (!commandEnabled) {
            createOrUpdateQueuesAndSubscriptions();
        }
    }

    /**
     * Stops the service. This base implementation intentionally does nothing;
     * subclasses may override it to release whatever they acquired.
     */
    public void stop() {
    }

    /**
     * Creates the queue of this service, subscribes all registered topics to it and registers
     * the queue listener.
     *
     * <p>If queue resetting is enabled in the messaging properties, the queue is first created
     * and immediately removed again (presumably so that removal does not fail on a missing
     * queue - confirm against the broker implementations). A failing reset is only logged.
     *
     * <p>A failure to subscribe a single topic is logged and does not influence the result.
     *
     * @return {@link InitStatus#NOOP} when there are no topics and force listening is off,
     *         {@link InitStatus#ERROR} when the listener could not be registered or any other
     *         exception occurred, {@link InitStatus#SUCCESS} otherwise
     */
    protected InitStatus createOrUpdateQueuesAndSubscriptions() {
        logger.info("Initializing subscriptions of messaging service '{}'", getName());
        String queueName = toFullyQualifiedQueueName(queue);
        try {
            if (runtime.getEnvironment().getCdsProperties().getMessaging().isResetQueues()) {
                try {
                    createQueue(queueName, queue.getProperties());
                    removeQueue(queueName);
                    logger.warn("Reset the queue '{}' of service '{}'", queueName, getName());
                } catch (IOException e) {
                    // Best effort only: keep going, but do not lose the cause.
                    logger.warn("Failed to reset queue '{}' of service '{}'", queueName, getName(), e);
                }
            }

            if (queue.getTopics().isEmpty() && !forceListening) {
                logger.warn("There are no queue subscriptions available for the service '{}'", getName());
                return InitStatus.NOOP;
            }

            createQueue(queueName, queue.getProperties());
            logger.info("Created queue '{}' for service '{}'", queueName, getName());

            for (MessageTopic topic : queue.getTopics()) {
                String topicName = topic.getBrokerName();
                try {
                    createQueueSubscription(queueName, topicName);
                    logger.info("Subscribed topic '{}' on queue '{}' for service '{}'", topicName, queueName, getName());
                } catch (IOException e) {
                    logger.error("Failed to subscribe topic '{}' on queue '{}' for service '{}'", topicName, queueName, getName(), e);
                }
            }

            try {
                registerQueueListener(queueName, new MessagingBrokerQueueListener(this, queueName, queue, runtime, isStructured));
            } catch (IOException | IllegalArgumentException e) {
                logger.error("Failed to register the listener to the queue '{}' for service '{}'", queueName, getName(), e);
                return InitStatus.ERROR;
            }
            return InitStatus.SUCCESS;
        } catch (Exception e) {
            logger.error("Failed to create queue '{}' for service '{}'", queueName, getName(), e);
            return InitStatus.ERROR;
        } finally {
            // Logged exactly once on every exit path.
            logger.debug("Finished initializing subscriptions of service '{}'", getName());
        }
    }

    /**
     * Tells whether the configured message format is CloudEvents.
     *
     * @return {@code true} if a format is configured and, after trimming, equals
     *         {@link #FORMAT_CLOUDEVENTS} ignoring case
     */
    protected boolean isCloudEventsFormat() {
        String format = serviceConfig.getFormat();
        if (format == null) {
            return false;
        }
        return format.trim().equalsIgnoreCase(FORMAT_CLOUDEVENTS);
    }

    /**
     * Emits a plain string message to the given topic.
     *
     * @param topic   topic (event name) to emit to
     * @param message message payload
     */
    public void emit(String topic, String message) {
        emit(topic, message, null, null);
    }

    /**
     * Emits a message given as a data map, without headers, to the given topic.
     *
     * @param topic   topic (event name) to emit to
     * @param dataMap message payload
     */
    public void emit(String topic, Map<String, Object> dataMap) {
        emit(topic, null, dataMap, null);
    }

    /**
     * Emits a message given as a data map plus headers to the given topic.
     *
     * <p>An entry stored under {@link #CONTEXT_PARAMETERS_KEY} is removed from {@code headersMap}
     * while the message is prepared.
     *
     * @param topic      topic (event name) to emit to
     * @param dataMap    message payload
     * @param headersMap message headers
     */
    public void emit(String topic, Map<String, Object> dataMap, Map<String, Object> headersMap) {
        emit(topic, null, dataMap, headersMap);
    }

    /**
     * Builds the {@link TopicMessageEventContext} for an outgoing message and emits it on this service.
     *
     * <p>Structured services receive the payload as data and headers maps; a plain string message
     * is wrapped as {@code {"message": <message>}} with empty headers. Unstructured services
     * receive string data: either the message itself, or the JSON form of the data map. When
     * headers are given, the JSON is the headers with the data map nested under {@code "data"};
     * a missing data map is then treated as empty instead of failing with a
     * {@link NullPointerException}.
     *
     * @param topic      topic (event name) the context is created for
     * @param message    plain string payload, or {@code null} when maps are used
     * @param dataMap    payload as map, may be {@code null}
     * @param headersMap headers, may be {@code null}; an entry under {@link #CONTEXT_PARAMETERS_KEY}
     *                   is removed from this map and copied onto the event context
     */
    private void emit(String topic, String message, Map<String, Object> dataMap, Map<String, Object> headersMap) {
        TopicMessageEventContext context = TopicMessageEventContext.create(topic);
        // Must run before the headers are used below: it strips the context parameters from them.
        retrieveContextParameters(headersMap)
                .ifPresent(parameters -> parameters.forEach((key, value) -> context.put(key, value)));

        if (isStructured) {
            if (message != null) {
                Map<String, Object> wrapped = new HashMap<>();
                wrapped.put("message", message);
                context.setDataMap(wrapped);
                context.setHeadersMap(new HashMap<>());
            } else {
                context.setDataMap(dataMap);
                context.setHeadersMap(headersMap != null ? headersMap : new HashMap<>());
            }
        } else if (message != null) {
            context.setData(message);
        } else if (headersMap != null) {
            Map<String, Object> envelope = new HashMap<>(headersMap);
            // Headers without data map: nest an empty map rather than throwing on new HashMap<>(null).
            envelope.put("data", dataMap != null ? new HashMap<>(dataMap) : new HashMap<String, Object>());
            context.setData(CloudEventUtils.toJson(envelope));
        } else {
            context.setData(CloudEventUtils.toJson(dataMap));
        }
        emit(context);
    }

    /**
     * Removes the entry stored under {@link #CONTEXT_PARAMETERS_KEY} from the given headers and
     * returns it.
     *
     * @param headersMap headers to take the entry from, may be {@code null}; modified in place
     * @return the removed map, or an empty optional if there are no headers or no such entry
     * @throws ClassCastException if the stored value is not a {@link Map}
     */
    @SuppressWarnings("unchecked") // only the raw Map type can be checked at runtime
    private Optional<Map<String, Object>> retrieveContextParameters(Map<String, Object> headersMap) {
        if (headersMap == null) {
            return Optional.empty();
        }
        Object parameters = headersMap.remove(CONTEXT_PARAMETERS_KEY);
        return Optional.ofNullable((Map<String, Object>) parameters);
    }

    /**
     * Late {@code On} handler (order {@code Integer.MAX_VALUE - 1}) that marks the event as
     * completed when the queue knows the event or force listening is enabled.
     *
     * @param context context of the event being processed
     */
    @On
    @HandlerOrder(Integer.MAX_VALUE - 1)
    private void autoComplete(EventContext context) {
        boolean knownToQueue = queue.hasEvent(context.getEvent());
        if (knownToQueue || forceListening) {
            context.setCompleted();
        }
    }

    /**
     * Earliest {@code Before} handler for topic messages; it only acts on non-inbound messages.
     *
     * <p>Rejects a message that has neither string data, nor a data map, nor headers. If exactly
     * one of data map and headers map is present, the other one is initialised with an empty map.
     * Finally, if headers exist and the MDC holds a correlation id, that id is written to the
     * headers under {@code correlation_id}.
     *
     * @param context context of the topic message
     * @throws ErrorStatusException with {@code NO_MESSAGE_PROVIDED} if the message carries nothing
     */
    @Before
    @HandlerOrder(Integer.MIN_VALUE)
    protected void validateEventContext(TopicMessageEventContext context) {
        if (Boolean.TRUE.equals(context.getIsInbound())) {
            return;
        }

        boolean hasDataMap = context.getDataMap() != null;
        boolean hasHeaders = context.getHeadersMap() != null;
        if (context.getData() == null && !hasDataMap && !hasHeaders) {
            throw new ErrorStatusException(CdsErrorStatuses.NO_MESSAGE_PROVIDED);
        }

        // Complete the pair: data map and headers map either both exist or both stay absent.
        if (hasHeaders && !hasDataMap) {
            context.setDataMap(new HashMap<>());
        } else if (hasDataMap && !hasHeaders) {
            context.setHeadersMap(new HashMap<>());
        }

        if (context.getHeadersMap() != null && CorrelationIdUtils.mdcHasEntry()) {
            context.getHeadersMap().put("correlation_id", CorrelationIdUtils.getFromMDC());
        }
    }

    /**
     * {@code Before} handler that converts non-inbound messages to the CloudEvents format when
     * {@link #isCloudEventsFormat()} holds. With a headers map present the headers are converted,
     * otherwise the string data is.
     *
     * @param context context of the topic message
     */
    @Before
    @HandlerOrder(11000)
    protected void cloudEventsFormatter(TopicMessageEventContext context) {
        if (Boolean.TRUE.equals(context.getIsInbound()) || !isCloudEventsFormat()) {
            return;
        }
        if (context.getHeadersMap() == null) {
            context.setData(CloudEventUtils.toCloudEvent(context.getData(), context.getEvent(), serviceConfig.getPublishPrefix()));
        } else {
            context.setHeadersMap(CloudEventUtils.toCloudEvent(context.getHeadersMap(), context.getEvent(), serviceConfig.getPublishPrefix()));
        }
    }

    /**
     * {@code On} handler that sends non-inbound messages: the event name is turned into the
     * fully qualified outbound topic, the message is handed to {@link #emitTopicMessage} of the
     * service found in the context, and the event is marked as completed.
     *
     * @param context context of the topic message
     */
    @On
    @HandlerOrder(-9900)
    protected void sendMessageEvent(TopicMessageEventContext context) {
        if (Boolean.TRUE.equals(context.getIsInbound())) {
            return;
        }
        AbstractMessagingService target = (AbstractMessagingService) context.getService();
        String event = context.getEvent();
        String brokerTopic = toFullyQualifiedTopicName(event, false);
        logger.debug("The service event '{}' is going to be emitted on service '{}' to topic '{}'", context.getEvent(), getName(), brokerTopic);
        target.emitTopicMessage(brokerTopic, context);
        context.setCompleted();
    }

    /**
     * Default handler for messaging errors. The result is {@code true} only when the error
     * status of the exception equals {@code TENANT_NOT_EXISTS}; every other error yields
     * {@code false}.
     *
     * @param context context of the messaging error
     */
    @On
    @HandlerOrder(11000)
    protected void defaultErrorHandler(MessagingErrorEventContext context) {
        boolean tenantMissing = context.getException().getErrorStatus().equals(CdsErrorStatuses.TENANT_NOT_EXISTS);
        if (tenantMissing) {
            logger.debug("Message of none-existing tenant '{}' acknowledged by default error handler.", context.getTenant());
        }
        context.setResult(tenantMissing);
    }

    /**
     * Registers the handler with the super class and additionally adds a topic to the queue for
     * every given event that is not empty, not the wildcard {@code "*"} and not
     * {@code "MESSAGING_ERROR"}.
     *
     * @param events   event names the handler is registered for
     * @param entities entity names, passed through to the super class
     * @param order    handler order, passed through to the super class
     * @param handler  handler to register
     */
    public void on(String[] events, String[] entities, int order, Handler handler) {
        super.on(events, entities, order, handler);
        for (String event : events) {
            boolean subscribable = !StringUtils.isEmpty(event) && !event.equals("*") && !"MESSAGING_ERROR".equals(event);
            if (subscribable) {
                String brokerTopic = toFullyQualifiedTopicName(event, true);
                queue.addTopic(new MessageTopic(event, brokerTopic));
            }
        }
    }

    /**
     * Returns the queue name handed to the queue operations of this class. This base
     * implementation returns the queue's own name unchanged.
     *
     * @param queue queue whose name is requested
     * @return {@code queue.getName()}
     */
    protected String toFullyQualifiedQueueName(MessageQueue queue) {
        return queue.getName();
    }

    /**
     * Prepends the configured prefix to an event name.
     *
     * @param event   event name
     * @param inbound {@code true} to use the subscribe prefix, {@code false} to use the publish prefix
     * @return prefix followed by {@code event}, or {@code event} itself if the prefix is not configured
     */
    protected String toFullyQualifiedTopicName(String event, boolean inbound) {
        String prefix = inbound ? serviceConfig.getSubscribePrefix() : serviceConfig.getPublishPrefix();
        return prefix == null ? event : prefix + event;
    }

    /**
     * Supplies the predicate that decides whether a registered topic corresponds to a topic name
     * coming from the broker. This default compares the topic's broker name with the given name
     * for equality, with two {@code null} values counting as equal.
     *
     * @return matcher taking the registered topic and the broker topic name
     */
    protected BiPredicate<MessageTopic, String> getTopicMatcher() {
        return (registeredTopic, brokerTopicName) -> {
            String expectedName = registeredTopic.getBrokerName();
            if (expectedName == null) {
                return brokerTopicName == null;
            }
            return expectedName.equals(brokerTopicName);
        };
    }

    /**
     * Removes a queue. Within this class it is only called while resetting queues, right after
     * the same queue has been created.
     *
     * @param var1 name of the queue (callers in this class pass the fully qualified queue name)
     * @throws IOException if the queue could not be removed
     */
    protected abstract void removeQueue(String var1) throws IOException;

    /**
     * Creates a queue.
     *
     * @param var1 name of the queue (callers in this class pass the fully qualified queue name)
     * @param var2 queue properties (callers in this class pass {@code queue.getProperties()})
     * @throws IOException if the queue could not be created
     */
    protected abstract void createQueue(String var1, Map<String, Object> var2) throws IOException;

    /**
     * Subscribes a topic to a queue.
     *
     * @param var1 name of the queue (callers in this class pass the fully qualified queue name)
     * @param var2 name of the topic (callers in this class pass the topic's broker name)
     * @throws IOException if the subscription could not be created
     */
    protected abstract void createQueueSubscription(String var1, String var2) throws IOException;

    /**
     * Registers the listener for a queue.
     *
     * @param var1 name of the queue (callers in this class pass the fully qualified queue name)
     * @param var2 listener to register
     * @throws IOException if the listener could not be registered
     */
    protected abstract void registerQueueListener(String var1, MessagingBrokerQueueListener var2) throws IOException;

    /**
     * Sends an outgoing message; invoked by {@code sendMessageEvent} for non-inbound messages.
     *
     * @param var1 topic to send to (the caller in this class passes the event name with the publish prefix applied)
     * @param var2 event context carrying the message
     */
    protected abstract void emitTopicMessage(String var1, TopicMessageEventContext var2);

    protected static enum InitStatus {
        NOOP,
        SUCCESS,
        ERROR;

    }
}

