/*
 * Decompiled with CFR 0.152.
 */
package com.sap.cds.feature.messaging.kafka.service;

import com.google.common.annotations.VisibleForTesting;
import com.sap.cds.feature.messaging.kafka.client.KafkaClientFactory;
import com.sap.cds.feature.messaging.kafka.client.KafkaMessagingConsumer;
import com.sap.cds.feature.messaging.kafka.client.KafkaMessagingProducer;
import com.sap.cds.feature.messaging.kafka.utils.KafkaServiceBinding;
import com.sap.cds.feature.messaging.kafka.utils.KafkaUtils;
import com.sap.cds.impl.parser.token.Jsonizer;
import com.sap.cds.services.ErrorStatus;
import com.sap.cds.services.environment.CdsProperties;
import com.sap.cds.services.messaging.CloudEventMessageEventContext;
import com.sap.cds.services.messaging.TopicMessageEventContext;
import com.sap.cds.services.messaging.service.AbstractMessagingService;
import com.sap.cds.services.messaging.service.MessagingBrokerQueueListener;
import com.sap.cds.services.messaging.utils.CloudEventUtils;
import com.sap.cds.services.request.UserInfo;
import com.sap.cds.services.runtime.CdsRuntime;
import com.sap.cds.services.utils.CdsErrorStatuses;
import com.sap.cds.services.utils.ErrorStatusException;
import com.sap.cds.services.utils.StringUtils;
import com.sap.cloud.environment.servicebinding.api.ServiceBinding;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Messaging service implementation that publishes to and consumes from Kafka topics.
 *
 * <p>All Kafka clients (producer, consumer, topic admin client) are obtained from a
 * {@link KafkaClientFactory}. Queue creation and removal are no-ops in this implementation;
 * topics are created on demand through the topic admin client before subscribing or emitting.
 */
public class KafkaTopicMessagingService extends AbstractMessagingService {
    /** Name of the record header carrying the tenant of the emitting user, if any. */
    public static final String TENANT_ID_HEADER = "x-sap-cap-tenant-id";
    /** Name of the record header carrying a random UUID generated per emitted message. */
    public static final String MESSAGE_ID_HEADER = "x-sap-cap-message-id";
    /** Name of the record header set to {@code "true"} when the raw string payload is sent instead of the data map. */
    public static final String HEADERS_IN_MESSAGE_HEADER = "x-sap-cap-headers-in-message";
    private static final Logger logger = LoggerFactory.getLogger(KafkaTopicMessagingService.class);
    /** Context key under which callers may pass an explicit target partition. */
    private static final String PARTITION = "partition";
    /** Context key under which callers may pass an explicit record key. */
    private static final String OBJECT_KEY = "objectKey";
    /** Context key and record header name of the effective topic. */
    private static final String EFFECTIVE_TOPIC_HEADER = "x-sap-cap-effective-topic";
    /** Header names written by this service itself; user-supplied headers with these names are dropped. */
    private static final Set<String> INTERNAL_HEADERS = Set.of(EFFECTIVE_TOPIC_HEADER, TENANT_ID_HEADER, MESSAGE_ID_HEADER, HEADERS_IN_MESSAGE_HEADER);
    private final KafkaClientFactory kafkaClientFactory;

    /**
     * Creates the service with an optional, externally supplied client factory.
     *
     * @param serviceConfig      the messaging service configuration
     * @param binding            the service binding, wrapped into a {@link KafkaServiceBinding} when a factory is created
     * @param runtime            the CDS runtime
     * @param kafkaClientFactory the factory to use; when {@code null} a factory is created via
     *                           {@code KafkaClientFactory.create(...)}
     */
    KafkaTopicMessagingService(CdsProperties.Messaging.MessagingServiceConfig serviceConfig, ServiceBinding binding, CdsRuntime runtime, KafkaClientFactory kafkaClientFactory) {
        super(serviceConfig, runtime);
        this.kafkaClientFactory = Objects.nonNull(kafkaClientFactory) ? kafkaClientFactory : KafkaClientFactory.create(serviceConfig.getName(), this.queue.getName(), new KafkaServiceBinding(binding), serviceConfig.getQueue().getConfig(), runtime);
    }

    /**
     * Creates the service with a client factory derived from the given configuration and binding.
     *
     * @param serviceConfig the messaging service configuration
     * @param binding       the service binding
     * @param runtime       the CDS runtime
     */
    public KafkaTopicMessagingService(CdsProperties.Messaging.MessagingServiceConfig serviceConfig, ServiceBinding binding, CdsRuntime runtime) {
        this(serviceConfig, binding, runtime, null);
    }

    /**
     * Initializes the service on a separate daemon thread.
     *
     * <p>After the superclass initialization, the consumer is started only when the queue has
     * topics and the environment "command" property is not enabled.
     */
    public void init() {
        this.run(() -> {
            super.init();
            boolean hasTopics = !this.queue.getTopics().isEmpty();
            if (hasTopics && !this.runtime.getEnvironment().getCdsProperties().getEnvironment().getCommand().isEnabled().booleanValue()) {
                this.kafkaClientFactory.startConsumer();
            }
        });
    }

    /**
     * Closes the producer, closes the consumer (when the queue has topics) and stops the superclass.
     *
     * <p>Every step is attempted even if an earlier one fails, so that a failing producer shutdown
     * does not leave the consumer open or skip the superclass shutdown. An
     * {@link InterruptedException} is translated into a restored interrupt flag.
     */
    public void stop() {
        try {
            try {
                this.kafkaClientFactory.closeProducer();
            } finally {
                try {
                    if (!this.queue.getTopics().isEmpty()) {
                        this.kafkaClientFactory.closeConsumer();
                    }
                } finally {
                    super.stop();
                }
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Intentionally empty: this implementation performs no action when a queue is removed. */
    protected void removeQueue(String name) {
    }

    /** Intentionally empty: this implementation performs no action when a queue is created. */
    protected void createQueue(String name, Map<String, Object> properties) {
    }

    /**
     * Maps an event name to a topic name by replacing every {@code '/'} with {@code '_'}.
     *
     * @param event   the event name
     * @param inbound not used by this implementation
     * @return the event name with slashes replaced by underscores
     */
    protected String toFullyQualifiedTopicName(String event, boolean inbound) {
        return event.replace('/', '_');
    }

    /**
     * Ensures the topic exists and subscribes the consumer to it.
     *
     * @param queue not used by this implementation
     * @param topic the topic to create (if missing) and subscribe to
     */
    protected void createQueueSubscription(String queue, String topic) {
        this.kafkaClientFactory.getOrCreateTopicAdminClient().createTopicIfNotExisting(topic);
        this.consumer().subscribe(topic);
    }

    /**
     * Registers the listener as the message consumer of the Kafka consumer.
     *
     * @param queue    not used by this implementation
     * @param listener the listener receiving consumed messages
     */
    protected void registerQueueListener(String queue, MessagingBrokerQueueListener listener) {
        this.consumer().setMessageConsumer(listener);
    }

    /**
     * Publishes the message described by the context to the given topic.
     *
     * <p>The topic is created first if it does not exist. When the context has a data map it is
     * serialized to JSON; otherwise the raw string payload is sent and the
     * {@link #HEADERS_IN_MESSAGE_HEADER} header is added with value {@code "true"}.
     *
     * @param topic   the target topic
     * @param context the event context providing payload, headers, partition and key
     * @throws ErrorStatusException with {@code EVENT_EMITTING_FAILED} when publishing fails, times
     *                              out or the thread is interrupted
     */
    protected void emitTopicMessage(String topic, TopicMessageEventContext context) {
        this.kafkaClientFactory.getOrCreateTopicAdminClient().createTopicIfNotExisting(topic);
        try {
            Integer partition = this.retrievePartition(context);
            String objectKey = this.retrieveObjectKey(context);
            List<Header> headers = this.buildHeaders(context);
            String message;
            if (context.getDataMap() != null) {
                message = CloudEventUtils.toJson(context.getDataMap());
            } else {
                message = context.getData();
                headers.add(this.createRecordHeader(HEADERS_IN_MESSAGE_HEADER, "true"));
            }
            this.producer().publish(topic, partition, objectKey, message, headers);
        }
        catch (ExecutionException | TimeoutException e) {
            throw new ErrorStatusException((ErrorStatus)CdsErrorStatuses.EVENT_EMITTING_FAILED, new Object[]{topic, e});
        }
        catch (InterruptedException e) {
            logger.error("Error while emitting a message for topic '{}', thread interrupted.", (Object)topic, (Object)e);
            // Restore the flag for code further up the stack, then fail the emit: returning
            // normally here would make the caller believe the message was published.
            Thread.currentThread().interrupt();
            throw new ErrorStatusException((ErrorStatus)CdsErrorStatuses.EVENT_EMITTING_FAILED, new Object[]{topic, e});
        }
    }

    /**
     * Reads the optional target partition from the context.
     *
     * @return the value stored under {@code "partition"}, or {@code null} if absent; the value is
     *         cast to {@link Integer}, so any other type fails with a {@link ClassCastException}
     */
    private Integer retrievePartition(TopicMessageEventContext context) {
        return (Integer)context.get(PARTITION);
    }

    /**
     * Determines the record key for the message.
     *
     * <p>An explicit {@code "objectKey"} in the context wins. Otherwise, if the model contains the
     * event named by {@code "cds.eventName"} and that event declares key elements, the key is the
     * JSON of a name-sorted map of those key elements taken from the cloud event data.
     *
     * @return the record key, or {@code null} when none can be derived
     */
    private String retrieveObjectKey(TopicMessageEventContext context) {
        String explicitKey = (String)context.get(OBJECT_KEY);
        if (explicitKey != null) {
            return explicitKey;
        }
        String eventName = (String)context.get("cds.eventName");
        return context.getModel().findEvent(eventName).filter(e -> e.keyElements().findAny().isPresent()).map(event -> {
            CloudEventMessageEventContext cloudEvent = CloudEventUtils.toCloudEventMessageContext(context, event.getName());
            Map<?, ?> data = cloudEvent.getData();
            if (data == null) {
                return null;
            }
            // TreeMap keeps the key elements ordered by name, so the serialized key is stable.
            Map<String, Object> keys = event.keyElements().collect(TreeMap::new, (m, k) -> m.put(k.getName(), data.get(k.getName())), TreeMap::putAll);
            return Jsonizer.json((Object)keys);
        }).orElse(null);
    }

    /**
     * Builds the record headers for an outgoing message.
     *
     * <p>User-supplied headers are copied except for internal header names and entries with a
     * {@code null} value. The effective topic (if present in the context), the tenant (if not
     * empty) and a freshly generated message id are then added.
     *
     * @return a mutable list of headers
     */
    private List<Header> buildHeaders(TopicMessageEventContext context) {
        List<Header> headers = new ArrayList<>();
        if (context.getHeadersMap() != null) {
            context.getHeadersMap().entrySet().stream()
                .filter(this::isNotInternalHeader)
                // A null value cannot be rendered as a header value; skip it instead of failing the emit.
                .filter(entry -> entry.getValue() != null)
                .map(this::createRecordHeader)
                .forEach(headers::add);
        }
        String effectiveTopic = (String)context.get(EFFECTIVE_TOPIC_HEADER);
        if (effectiveTopic != null) {
            headers.add(this.createRecordHeader(EFFECTIVE_TOPIC_HEADER, effectiveTopic));
        }
        UserInfo userInfo = context.getUserInfo();
        if (!StringUtils.isEmpty(userInfo.getTenant())) {
            headers.add(this.createRecordHeader(TENANT_ID_HEADER, userInfo.getTenant()));
        }
        headers.add(this.createRecordHeader(MESSAGE_ID_HEADER, UUID.randomUUID().toString()));
        return headers;
    }

    /** Returns {@code true} when the entry's key is not one of the header names reserved by this service. */
    private boolean isNotInternalHeader(Map.Entry<String, Object> entry) {
        return !INTERNAL_HEADERS.contains(entry.getKey());
    }

    /** Creates a record header from a map entry, using the value's {@code toString()}; the value must not be {@code null}. */
    private Header createRecordHeader(Map.Entry<String, Object> entry) {
        return this.createRecordHeader(entry.getKey(), entry.getValue().toString());
    }

    /** Creates a record header with the given name and the value converted by {@code KafkaUtils.toBytes}. */
    private Header createRecordHeader(String name, String value) {
        return new RecordHeader(name, KafkaUtils.toBytes(value));
    }

    /** Returns the producer obtained from the client factory. */
    @VisibleForTesting
    KafkaMessagingProducer producer() {
        return this.kafkaClientFactory.getOrCreateProducer();
    }

    /** Returns the consumer obtained from the client factory. */
    @VisibleForTesting
    KafkaMessagingConsumer consumer() {
        return this.kafkaClientFactory.getOrCreateConsumer();
    }

    /** Runs the action on a new daemon thread named {@code "<service name> - Initializer"}. */
    private void run(Runnable action) {
        Thread runner = new Thread(action, this.getName() + " - Initializer");
        runner.setDaemon(true);
        runner.start();
    }
}

