/*
 * Decompiled with CFR 0.152.
 */
package com.sap.cds.feature.messaging.kafka.service;

import com.sap.cds.feature.messaging.kafka.client.KafkaClientFactory;
import com.sap.cds.feature.messaging.kafka.service.KafkaTopicMessagingService;
import com.sap.cds.reflect.CdsAnnotatable;
import com.sap.cds.services.EventContext;
import com.sap.cds.services.environment.CdsProperties;
import com.sap.cds.services.messaging.TopicMessageEventContext;
import com.sap.cds.services.messaging.service.MessageTopic;
import com.sap.cds.services.runtime.CdsRuntime;
import com.sap.cds.services.utils.StringUtils;
import com.sap.cds.services.utils.model.CdsAnnotations;
import com.sap.cloud.environment.servicebinding.api.ServiceBinding;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaChannelMessagingService
extends KafkaTopicMessagingService {
    private static final Logger logger = LoggerFactory.getLogger(KafkaChannelMessagingService.class);
    public static final String EFFECTIVE_TOPIC_HEADER = "x-sap-cap-effective-topic";
    private final String defaultChannel;
    private final Set<String> subscribedChannels = new HashSet<String>();
    private final Map<String, String> topicToChannel = new HashMap<String, String>();

    KafkaChannelMessagingService(CdsProperties.Messaging.MessagingServiceConfig serviceConfig, ServiceBinding binding, CdsRuntime runtime, KafkaClientFactory clientFactory) {
        super(serviceConfig, binding, runtime, clientFactory);
        this.defaultChannel = serviceConfig.getQueue().getConfig().getOrDefault("channel", "cds.default");
        this.preprocessModel();
    }

    KafkaChannelMessagingService(CdsProperties.Messaging.MessagingServiceConfig serviceConfig, ServiceBinding binding, CdsRuntime runtime) {
        this(serviceConfig, binding, runtime, null);
    }

    private void preprocessModel() {
        this.runtime.getCdsModel().events().forEach(event -> {
            String channel;
            String topic = (String)CdsAnnotations.TOPIC.getOrDefault((CdsAnnotatable)event);
            if (StringUtils.isEmpty((String)topic)) {
                topic = event.getQualifiedName();
            }
            if (StringUtils.isEmpty((String)(channel = (String)CdsAnnotations.KAFKA_TOPIC.getOrDefault((CdsAnnotatable)event)))) {
                channel = this.defaultChannel;
            }
            this.topicToChannel.put(this.toFullyQualifiedTopicName(topic, false), channel);
        });
    }

    @Override
    public void init() {
        if (!this.queue.getTopics().isEmpty() || this.forceListening) {
            HashSet<String> channels = new HashSet<String>();
            channels.add(this.defaultChannel);
            for (Map.Entry<String, String> entry : this.topicToChannel.entrySet()) {
                if (!this.queue.hasTopic(entry.getKey())) continue;
                channels.add(entry.getValue());
            }
            for (String channel : channels) {
                logger.info("Creating channel handler for channel topic '{}'", (Object)channel);
                this.on(channel, null, this::dispatchToHandler);
            }
        }
        super.init();
    }

    private void dispatchToHandler(EventContext ctx) {
        String topic;
        if (Boolean.TRUE.equals(((TopicMessageEventContext)ctx.as(TopicMessageEventContext.class)).getIsInbound()) && (topic = (String)ctx.get(EFFECTIVE_TOPIC_HEADER)) != null && this.queue.hasTopic(topic) && !this.subscribedChannels.contains(topic)) {
            List events = this.queue.findTopic(topic);
            for (MessageTopic event : events) {
                TopicMessageEventContext eventContext = TopicMessageEventContext.create((String)event.getEventName());
                ctx.keySet().forEach(key -> eventContext.put(key, ctx.get(key)));
                logger.debug("Dispatching topic '{}' received on channel '{}' to event '{}'", new Object[]{topic, ctx.getEvent(), event.getEventName()});
                this.emit((EventContext)eventContext);
            }
        }
    }

    @Override
    protected void emitTopicMessage(String topic, TopicMessageEventContext context) {
        context.put(EFFECTIVE_TOPIC_HEADER, (Object)topic);
        String channel = this.topicToChannel.getOrDefault(topic, this.defaultChannel);
        super.emitTopicMessage(channel, context);
    }

    @Override
    protected void createQueueSubscription(String queue, String topic) {
        String channel = this.topicToChannel.getOrDefault(topic, this.defaultChannel);
        if (this.subscribedChannels.add(channel)) {
            super.createQueueSubscription(queue, channel);
        }
    }
}

