/*
 * Decompiled with CFR 0.152.
 */
package com.sap.cds.feature.messaging.em.mt.service;

import com.fasterxml.jackson.databind.node.ArrayNode;
import com.sap.cds.feature.messaging.em.client.EnterpriseMessagingWebhookManagementClient;
import com.sap.cds.feature.messaging.em.mt.service.EnterpriseMessagingTenantStatus;
import com.sap.cds.feature.messaging.em.service.EnterpriseMessagingService;
import com.sap.cds.services.ErrorStatus;
import com.sap.cds.services.environment.CdsProperties;
import com.sap.cds.services.messaging.MessagingService;
import com.sap.cds.services.messaging.TopicMessageEventContext;
import com.sap.cds.services.messaging.service.MessagingBrokerQueueListener;
import com.sap.cds.services.messaging.utils.MessagingUtils;
import com.sap.cds.services.request.RequestContext;
import com.sap.cds.services.request.UserInfo;
import com.sap.cds.services.runtime.CdsRuntime;
import com.sap.cds.services.utils.CdsErrorStatuses;
import com.sap.cds.services.utils.ErrorStatusException;
import com.sap.cds.services.utils.StringUtils;
import com.sap.cloud.environment.servicebinding.api.ServiceBinding;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EnterpriseMessagingMtService
extends EnterpriseMessagingService {
    private static final Logger logger = LoggerFactory.getLogger(EnterpriseMessagingMtService.class);
    private final MessagingBrokerQueueListener queueListener;
    private final String webhookUrl;
    private final EnterpriseMessagingWebhookManagementClient webhookManagementClient;

    public EnterpriseMessagingMtService(CdsProperties.Messaging.MessagingServiceConfig serviceConfig, ServiceBinding binding, CdsRuntime runtime) {
        super(serviceConfig, binding, null, runtime);
        this.queueListener = new MessagingBrokerQueueListener((MessagingService)this, this.toFullyQualifiedQueueName(this.queue), this.queue, runtime, serviceConfig.isStructured());
        Object webhookUrl = runtime.getEnvironment().getCdsProperties().getMessaging().getWebhooks().getUrl();
        if (webhookUrl == null) {
            webhookUrl = runtime.getEnvironment().getApplicationInfo().getUrl();
        }
        if (webhookUrl == null) {
            throw new ErrorStatusException((ErrorStatus)CdsErrorStatuses.NO_WEBHOOK_URL, new Object[0]);
        }
        if (URI.create((String)webhookUrl).getScheme() == null) {
            webhookUrl = "https://" + (String)webhookUrl;
        }
        this.webhookUrl = (String)webhookUrl + "/messaging/v1.0/em";
        this.webhookManagementClient = new EnterpriseMessagingWebhookManagementClient(binding, serviceConfig.getConnection().getConnectionPool());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void init(String tenantId) {
        String subdomain = this.checkSubdomain(tenantId);
        try {
            logger.info("Initializing the enterprise-messaging service '{}' for tenant '{}'", (Object)this.getName(), (Object)tenantId);
            if (this.checkTenantReadiness(tenantId, false) && this.createOrUpdateQueuesAndSubscriptions()) {
                this.webhookManagementClient.createOrUpdateWebhookRegistration(this.getName(), this.toFullyQualifiedQueueName(this.queue), this.webhookUrl, subdomain);
            }
        }
        catch (IOException e) {
            logger.error("Failed to initialize the enterprise-messaging service '{}' for tenant '{}'", new Object[]{this.getName(), tenantId, e});
        }
        finally {
            logger.debug("Finished initializing the enterprise-messaging service '{}' for tenant '{}'", (Object)this.getName(), (Object)tenantId);
        }
    }

    public EnterpriseMessagingTenantStatus getTenantStatus(String tenantId, boolean verbose) {
        EnterpriseMessagingTenantStatus result = new EnterpriseMessagingTenantStatus(tenantId);
        EnterpriseMessagingTenantStatus.QueueStatus queueStatus = new EnterpriseMessagingTenantStatus.QueueStatus();
        String queueName = this.toFullyQualifiedQueueName(this.queue);
        result.getServices().put(this.getName(), queueStatus);
        try {
            if (this.checkTenantReadiness(tenantId, true)) {
                ArrayNode createdQueues = this.managementClient.getQueues();
                createdQueues.forEach(createdQueue -> {
                    String createdQueueName = createdQueue.get("name").asText();
                    if (createdQueueName.equals(queueName)) {
                        queueStatus.setQueue(verbose ? createdQueue : createdQueueName);
                        try {
                            ArrayNode topicsFromBroker = this.managementClient.getQueueSubscriptions(createdQueueName);
                            Set topicsFromModel = this.queue.getTopics().stream().map(t -> t.getBrokerName()).collect(Collectors.toSet());
                            if (topicsFromBroker != null && topicsFromBroker.size() > 0) {
                                topicsFromBroker.forEach(t -> {
                                    String topic = t.get("topicPattern").asText();
                                    queueStatus.getTopics().add(topic);
                                    if (!topicsFromModel.remove(topic)) {
                                        queueStatus.getUnmanagedTopics().add(topic);
                                    }
                                });
                                if (!topicsFromModel.isEmpty()) {
                                    queueStatus.setError("Missing topic subscriptions: " + topicsFromModel.stream().collect(Collectors.joining(", ")));
                                }
                                if (!queueStatus.getUnmanagedTopics().isEmpty()) {
                                    queueStatus.setWarning("There are unmanaged topics subscribed. This could potentially lead to unexpected messages received by the queue.");
                                }
                            } else if (!topicsFromModel.isEmpty()) {
                                queueStatus.getUnsubscribedTopics().addAll(topicsFromModel);
                                queueStatus.setError("Missing topic subscriptions: " + topicsFromModel.stream().collect(Collectors.joining(", ")));
                            }
                            ArrayNode webhooks = this.webhookManagementClient.getRegisteredWebhooks();
                            webhooks.forEach(w -> {
                                String webhookAddress = w.get("address").asText();
                                if (webhookAddress.endsWith(createdQueueName)) {
                                    queueStatus.getWebhooks().add(verbose ? w : w.get("name").asText());
                                } else {
                                    result.getUnmanagedWebhooks().add(verbose ? w : w.get("name").asText());
                                }
                            });
                            if (queueStatus.getWebhooks().isEmpty() && !queueStatus.getTopics().isEmpty()) {
                                queueStatus.setError("No webhook registration available.");
                            }
                        }
                        catch (IOException e) {
                            logger.debug("Could not retrieve status of tenant '{}' for queue '{}' of service '{}'", new Object[]{tenantId, createdQueueName, this.getName(), e});
                        }
                    } else {
                        result.getUnmanagedQueues().add(verbose ? createdQueue : createdQueueName);
                    }
                });
                if (queueStatus.getQueue() == null) {
                    queueStatus.setInitial();
                }
            } else {
                queueStatus.setOnboarding();
            }
        }
        catch (IOException e) {
            queueStatus.setError("The status of the tenant could not be retrieved.");
            logger.debug("The status of the tenant '{}' on service '{}' could not be retrieved.", new Object[]{tenantId, this.getName(), e});
        }
        return result;
    }

    private boolean checkTenantReadiness(String tenantId, boolean skipOnOnboarding) throws IOException {
        while (!this.managementClient.checkTenantInstanceReadiness()) {
            logger.info("The messaging bus for service '{}' is not yet ready for the tenant '{}'", (Object)this.getName(), (Object)tenantId);
            if (skipOnOnboarding) {
                return false;
            }
            try {
                Thread.sleep(10000L);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        logger.debug("The messaging bus for service '{}' is ready for the tenant '{}'", (Object)this.getName(), (Object)tenantId);
        return true;
    }

    private String checkSubdomain(String tenantId) {
        UserInfo userInfo = RequestContext.getCurrent((CdsRuntime)this.runtime).getUserInfo();
        if (!Objects.equals(userInfo.getTenant(), tenantId) || StringUtils.isEmpty((String)((String)userInfo.getAdditionalAttribute("subDomain")))) {
            throw new ErrorStatusException((ErrorStatus)CdsErrorStatuses.NO_TENANT_INFO, new Object[0]);
        }
        return (String)userInfo.getAdditionalAttribute("subDomain");
    }

    public MessagingBrokerQueueListener getQueueListener() {
        return this.queueListener;
    }

    @Override
    public void init() {
    }

    @Override
    protected void registerQueueListener(String queue, MessagingBrokerQueueListener listener) throws IOException {
    }

    @Override
    protected void emitTopicMessage(String topic, TopicMessageEventContext messageEventContext) {
        try {
            this.webhookManagementClient.sendMessage(topic, MessagingUtils.toStringMessage((TopicMessageEventContext)messageEventContext));
        }
        catch (IOException e) {
            throw new ErrorStatusException((ErrorStatus)CdsErrorStatuses.EVENT_EMITTING_FAILED, new Object[]{topic, e});
        }
    }
}

