/*
 * Decompiled with CFR 0.152.
 */
package io.github.stavshamir.springwolf.asyncapi;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.github.stavshamir.springwolf.asyncapi.controller.dtos.MessageDto;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocketService;
import io.github.stavshamir.springwolf.producer.SpringwolfKafkaProducer;
import java.text.MessageFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.http.HttpStatus;
import org.springframework.http.HttpStatusCode;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ResponseStatusException;

@RestController
@RequestMapping(value={"/springwolf/kafka"})
@ConditionalOnProperty(prefix="springwolf.plugin.kafka", name={"publishing.enabled"})
public class SpringwolfKafkaController {
    private static final Logger log = LoggerFactory.getLogger(SpringwolfKafkaController.class);
    private final AsyncApiDocketService asyncApiDocketService;
    private final SpringwolfKafkaProducer producer;
    private final ObjectMapper objectMapper = new ObjectMapper();

    @PostMapping(value={"/publish"})
    public void publish(@RequestParam String topic, @RequestBody MessageDto message) {
        if (!this.producer.isEnabled()) {
            log.debug("Kafka producer is not enabled - message will not be published");
            throw new ResponseStatusException((HttpStatusCode)HttpStatus.NOT_FOUND, "Kafka producer is not enabled");
        }
        String payloadType = message.getPayloadType();
        if (payloadType.startsWith(this.asyncApiDocketService.getAsyncApiDocket().getBasePackage())) {
            try {
                Class<?> payloadClass = Class.forName(payloadType);
                Object payload = this.objectMapper.readValue(message.getPayload(), payloadClass);
                String kafkaKey = message.getBindings() != null ? (String)message.getBindings().get("key") : null;
                log.debug("Publishing to kafka topic {} with key {}: {}", new Object[]{topic, kafkaKey, message});
                this.producer.send(topic, kafkaKey, message.getHeaders(), payload);
            }
            catch (JsonProcessingException | ClassNotFoundException ex) {
                throw new ResponseStatusException((HttpStatusCode)HttpStatus.BAD_REQUEST, MessageFormat.format("Unable to create payload {0} from data: {1}", payloadType, message.getPayload()));
            }
        } else {
            throw new ResponseStatusException((HttpStatusCode)HttpStatus.BAD_REQUEST, "No payloadType specified.");
        }
    }

    public SpringwolfKafkaController(AsyncApiDocketService asyncApiDocketService, SpringwolfKafkaProducer producer) {
        this.asyncApiDocketService = asyncApiDocketService;
        this.producer = producer;
    }
}

