/*
 * Decompiled with CFR 0.152.
 */
package com.sap.cds.feature.messaging.kafka.service;

import com.sap.cds.feature.messaging.kafka.service.KafkaChannelMessagingService;
import com.sap.cds.feature.messaging.kafka.service.KafkaTopicMessagingService;
import com.sap.cds.services.Service;
import com.sap.cds.services.environment.CdsProperties;
import com.sap.cds.services.messaging.MessagingService;
import com.sap.cds.services.messaging.utils.MessagingOutboxUtils;
import com.sap.cds.services.runtime.CdsRuntime;
import com.sap.cds.services.runtime.CdsRuntimeConfiguration;
import com.sap.cds.services.runtime.CdsRuntimeConfigurer;
import com.sap.cds.services.utils.StringUtils;
import com.sap.cds.services.utils.environment.ServiceBindingUtils;
import com.sap.cloud.environment.servicebinding.api.ServiceBinding;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaMessagingServiceConfiguration
implements CdsRuntimeConfiguration {
    private static final Logger logger = LoggerFactory.getLogger(KafkaMessagingServiceConfiguration.class);
    public static final String CHANNEL_KIND_LABEL = "kafka-channel-messaging";
    public static final String TOPIC_KIND_LABEL = "kafka-topic-messaging";
    public static final String BINDING_LABEL = "kafka";

    public void services(CdsRuntimeConfigurer configurer) {
        CdsProperties.Messaging config = configurer.getCdsRuntime().getEnvironment().getCdsProperties().getMessaging();
        List<ServiceBinding> bindings = configurer.getCdsRuntime().getEnvironment().getServiceBindings().filter(binding -> ServiceBindingUtils.matches((ServiceBinding)binding, (String)BINDING_LABEL)).collect(Collectors.toList());
        if (bindings.isEmpty()) {
            logger.info("No service bindings with label '{}' found", (Object)BINDING_LABEL);
        } else {
            boolean isSingleBinding = bindings.size() == 1;
            bindings.forEach(binding -> {
                logger.debug("Starting the initialization of the Kafka service binding '{}'", binding.getName().get());
                boolean createDefaultService = true;
                List serviceConfigs = config.getServicesByBinding((String)binding.getName().get());
                if (!serviceConfigs.isEmpty()) {
                    createDefaultService = false;
                    serviceConfigs.forEach(serviceConfig -> {
                        if (Boolean.TRUE.equals(serviceConfig.isEnabled())) {
                            this.configureService(configurer, (ServiceBinding)binding, (CdsProperties.Messaging.MessagingServiceConfig)serviceConfig);
                        }
                    });
                }
                if (isSingleBinding) {
                    List serviceConfigsByKind = config.getServicesByKind(CHANNEL_KIND_LABEL);
                    serviceConfigsByKind.addAll(config.getServicesByKind(TOPIC_KIND_LABEL));
                    if (!serviceConfigsByKind.isEmpty()) {
                        createDefaultService = false;
                        serviceConfigsByKind.forEach(serviceConfig -> {
                            if (Boolean.TRUE.equals(serviceConfig.isEnabled()) && serviceConfigs.stream().noneMatch(c -> c.getName().equals(serviceConfig.getName()))) {
                                this.configureService(configurer, (ServiceBinding)binding, (CdsProperties.Messaging.MessagingServiceConfig)serviceConfig);
                            }
                        });
                    }
                }
                if (createDefaultService) {
                    CdsProperties.Messaging.MessagingServiceConfig defConfig = config.getService((String)binding.getName().get());
                    if (StringUtils.isEmpty((String)defConfig.getBinding()) && StringUtils.isEmpty((String)defConfig.getKind())) {
                        this.configureService(configurer, (ServiceBinding)binding, defConfig);
                    } else {
                        logger.warn("Could not create service for binding '{}': A configuration with the same name is already defined for another kind or binding.", binding.getName().get());
                    }
                }
                logger.debug("Finished the initialization of the Kafka service binding '{}'", binding.getName().get());
            });
        }
    }

    private void configureService(CdsRuntimeConfigurer configurer, ServiceBinding binding, CdsProperties.Messaging.MessagingServiceConfig serviceConfig) {
        if (Objects.equals(serviceConfig.getKind(), TOPIC_KIND_LABEL)) {
            KafkaTopicMessagingService topicService = new KafkaTopicMessagingService(serviceConfig, binding, configurer.getCdsRuntime());
            configurer.service((Service)MessagingOutboxUtils.outboxed((MessagingService)topicService, (CdsProperties.Messaging.MessagingServiceConfig)serviceConfig, (CdsRuntime)configurer.getCdsRuntime()));
        } else if (Objects.isNull(serviceConfig.getKind()) || Objects.equals(serviceConfig.getKind(), CHANNEL_KIND_LABEL)) {
            KafkaChannelMessagingService channelService = new KafkaChannelMessagingService(serviceConfig, binding, configurer.getCdsRuntime());
            configurer.service((Service)MessagingOutboxUtils.outboxed((MessagingService)channelService, (CdsProperties.Messaging.MessagingServiceConfig)serviceConfig, (CdsRuntime)configurer.getCdsRuntime()));
        } else {
            logger.warn("Could not create service '{}' for the binding '{}' as the kind '{}' is unknown!", new Object[]{serviceConfig.getName(), binding.getName().get(), serviceConfig.getKind()});
        }
    }
}

