/*
 * Decompiled with CFR 0.152.
 */
package com.sap.cds.services.utils.messaging.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.sap.cds.reflect.CdsEvent;
import com.sap.cds.services.ErrorStatus;
import com.sap.cds.services.ErrorStatuses;
import com.sap.cds.services.EventContext;
import com.sap.cds.services.Service;
import com.sap.cds.services.ServiceDelegator;
import com.sap.cds.services.environment.CdsProperties;
import com.sap.cds.services.handler.Handler;
import com.sap.cds.services.handler.annotations.HandlerOrder;
import com.sap.cds.services.handler.annotations.On;
import com.sap.cds.services.messaging.MessagingService;
import com.sap.cds.services.messaging.TopicMessageEventContext;
import com.sap.cds.services.outbox.OutboxService;
import com.sap.cds.services.runtime.CdsRuntime;
import com.sap.cds.services.utils.ErrorStatusException;
import com.sap.cds.services.utils.StringUtils;
import com.sap.cds.services.utils.messaging.service.MessageQueue;
import com.sap.cds.services.utils.messaging.service.MessageTopic;
import com.sap.cds.services.utils.messaging.service.MessagingBrokerQueueListener;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractMessagingService
extends ServiceDelegator
implements MessagingService {
    private static final Logger logger = LoggerFactory.getLogger(AbstractMessagingService.class);
    private static final ObjectMapper mapper = new ObjectMapper();
    private static final String IS_OUTBOXED = "IS_OUTBOXED";
    protected final CdsProperties.Messaging.MessagingServiceConfig serviceConfig;
    protected final CdsRuntime runtime;
    protected final MessageQueue queue;
    private final boolean outboxed;

    protected AbstractMessagingService(CdsProperties.Messaging.MessagingServiceConfig serviceConfig, CdsRuntime runtime) {
        super(serviceConfig.getName());
        this.serviceConfig = serviceConfig;
        this.runtime = runtime;
        this.queue = MessageQueue.create(serviceConfig, runtime.getEnvironment().getApplicationInfo());
        this.outboxed = serviceConfig.getOutbox().isEnabled();
    }

    public void init() {
        this.createOrUpdateQueuesAndSubscriptions();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected boolean createOrUpdateQueuesAndSubscriptions() {
        logger.info("Initializing subscriptions of messaging service '{}'", (Object)this.getName());
        String queueName = this.toFullyQualifiedQueueName(this.queue);
        try {
            if (this.runtime.getEnvironment().getCdsProperties().getMessaging().isResetQueues()) {
                try {
                    this.createQueue(queueName, this.queue.getProperties());
                    this.removeQueue(queueName);
                    logger.warn("Reset the queue '{}' of service '{}'", (Object)queueName, (Object)this.getName());
                }
                catch (IOException e) {
                    logger.warn("Failed to reset queue '{}' of service '{}'", (Object)queueName, (Object)this.getName());
                }
            }
            if (!this.queue.getTopics().isEmpty()) {
                this.createQueue(queueName, this.queue.getProperties());
                logger.info("Created queue '{}' for service '{}'", (Object)queueName, (Object)this.getName());
                for (MessageTopic topic : this.queue.getTopics()) {
                    String topicName = topic.getBrokerName();
                    try {
                        this.createQueueSubscription(queueName, topicName);
                        logger.info("Subscribed topic '{}' on queue '{}' for service '{}'", new Object[]{topicName, queueName, this.getName()});
                    }
                    catch (IOException e) {
                        logger.error("Failed to subscribe topic '{}' on queue '{}' for service '{}'", new Object[]{topicName, queueName, this.getName(), e});
                    }
                }
                try {
                    this.registerQueueListener(queueName, new MessagingBrokerQueueListener((Service)this, queueName, this.queue));
                }
                catch (IOException | IllegalArgumentException e) {
                    logger.error("Failed to register the listener to the queue '{}' for service '{}'", new Object[]{queueName, this.getName(), e});
                }
                boolean e = true;
                return e;
            }
            logger.warn("There are no queue subscriptions available for the service '{}'", (Object)this.getName());
            boolean e = false;
            return e;
        }
        catch (Exception e) {
            logger.error("Failed to create queue '{}' for service '{}'", new Object[]{queueName, this.getName(), e});
            boolean bl = false;
            return bl;
        }
        finally {
            logger.debug("Finished initializing subscriptions of service '{}'", (Object)this.getName());
        }
    }

    public void emit(EventContext context) {
        if (this.shouldBeOutboxed(context)) {
            OutboxService outbox = (OutboxService)this.runtime.getServiceCatalog().getService(OutboxService.class, "OutboxService$Default");
            context.put(IS_OUTBOXED, (Object)true);
            outbox.enroll((Service)this, context);
        } else {
            super.emit(context);
        }
    }

    private boolean isInbound(EventContext context) {
        return context instanceof TopicMessageEventContext && ((TopicMessageEventContext)context).getIsInbound() != false;
    }

    private boolean shouldBeOutboxed(EventContext context) {
        return this.outboxed && context.get(IS_OUTBOXED) == null && !this.isInbound(context);
    }

    public void emit(String topic, String message) {
        TopicMessageEventContext context = TopicMessageEventContext.create((String)topic);
        context.setData(message);
        this.emit((EventContext)context);
    }

    public void emit(String topic, Map<String, Object> message) {
        this.emit(topic, this.toJson(message));
    }

    @On
    @HandlerOrder(value=0x7FFFFFFE)
    private void autoComplete(EventContext context) {
        if (this.queue.hasEvent(context.getEvent())) {
            context.setCompleted();
        }
    }

    @On
    @HandlerOrder(value=-9900)
    protected void sendMessageEvent(TopicMessageEventContext context) {
        if (!Boolean.TRUE.equals(context.getIsInbound())) {
            AbstractMessagingService service = (AbstractMessagingService)context.getService();
            String topic = context.getModel().findEvent(context.getEvent()).map(this::toTopicName).orElse(context.getEvent());
            service.emitTopicMessage(this.toFullyQualifiedTopicName(topic), context.getData());
            context.setCompleted();
        }
    }

    private String toJson(Object object) {
        try {
            return mapper.writeValueAsString(object);
        }
        catch (JsonProcessingException e) {
            throw new ErrorStatusException((ErrorStatus)ErrorStatuses.SERVER_ERROR, new Object[]{e});
        }
    }

    public void on(String[] events, String[] entities, int order, Handler handler) {
        super.on(events, entities, order, handler);
        this.checkEvents(events);
    }

    private void checkEvents(String[] events) {
        Arrays.stream(events).filter(event -> !StringUtils.isEmpty(event) && !event.equals("*")).forEach(event -> {
            String topic = this.runtime.getCdsModel().findEvent(event).map(this::toTopicName).orElse((String)event);
            this.queue.addTopic(new MessageTopic((String)event, this.toFullyQualifiedTopicName(topic)));
        });
    }

    protected String toTopicName(CdsEvent event) {
        return event.getQualifiedName().replace('.', '/');
    }

    protected String toFullyQualifiedQueueName(MessageQueue queue) {
        return queue.getName();
    }

    protected String toFullyQualifiedTopicName(String topic) {
        return topic;
    }

    protected abstract void removeQueue(String var1) throws IOException;

    protected abstract void createQueue(String var1, Map<String, String> var2) throws IOException;

    protected abstract void createQueueSubscription(String var1, String var2) throws IOException;

    protected abstract void registerQueueListener(String var1, MessagingBrokerQueueListener var2) throws IOException;

    protected abstract void emitTopicMessage(String var1, String var2);
}

