/*
 * Decompiled with CFR 0.152.
 */
package io.apicurio.registry.events.kafka;

import io.apicurio.common.apps.config.Info;
import io.apicurio.registry.events.EventSink;
import io.apicurio.registry.utils.RegistryProperties;
import io.apicurio.registry.utils.kafka.AsyncProducer;
import io.apicurio.registry.utils.kafka.ProducerActions;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.Message;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;

/**
 * {@link EventSink} that forwards event-bus messages to a Kafka topic.
 *
 * <p>Each message body is sent as the record value, and a fixed set of
 * {@code ce_*} headers plus {@code content-type} is attached to the record.
 * The sink reports itself as configured only when
 * {@code registry.events.kafka.topic} is set.
 */
@ApplicationScoped
public class KafkaEventSink
implements EventSink {
    @Inject
    Logger log;
    @Inject
    @RegistryProperties(value={"registry.events.kafka.config"}, empties={"ssl.endpoint.identification.algorithm="})
    Properties producerProperties;
    /** Created on first use by {@link #getProducer()}; {@code null} until then. */
    private ProducerActions<String, byte[]> producer;
    /** Target partition, or {@code null} when no partition is configured. */
    private Integer partition;
    @ConfigProperty(name="registry.events.kafka.topic")
    @Info(category="kafka", description="Events Kafka topic", availableSince="2.0.0.Final")
    Optional<String> eventsTopic;
    @ConfigProperty(name="registry.events.kafka.topic-partition")
    @Info(category="kafka", description="Events Kafka topic partition", availableSince="2.0.0.Final")
    Optional<Integer> eventsTopicPartition;

    /**
     * Unwraps the optional partition setting once, so that {@link #handle(Message)}
     * can pass it straight to the record constructor.
     */
    @PostConstruct
    void init() {
        this.partition = this.eventsTopicPartition.orElse(null);
    }

    /**
     * @return the fixed display name of this sink
     */
    @Override
    public String name() {
        return "Kafka Sink";
    }

    /**
     * @return {@code true} when an events topic has been configured
     */
    @Override
    public boolean isConfigured() {
        return this.eventsTopic.isPresent();
    }

    /**
     * Publishes one event-bus message to the configured Kafka topic.
     *
     * <p>The record key is the {@code artifactId} message header when present;
     * otherwise the freshly generated event id is used as the key.
     *
     * @param message event-bus message; its {@code type} header becomes the
     *                {@code ce_type} record header and its body the record value
     * @throws NullPointerException if the message carries no {@code type} header
     * @throws java.util.NoSuchElementException if no events topic is configured
     */
    @Override
    public void handle(Message<Buffer> message) {
        String type = message.headers().get("type");
        String artifactId = message.headers().get("artifactId");
        this.log.info("Firing event {}", type);

        UUID eventId = UUID.randomUUID();
        RecordHeaders headers = buildHeaders(eventId, type);
        String key = artifactId != null ? artifactId : eventId.toString();

        ProducerRecord<String, byte[]> record = new ProducerRecord<>(
                this.eventsTopic.get(),
                this.partition, // optional, may be null
                key,
                message.body().getBytes(),
                headers);
        this.getProducer().apply(record);
    }

    /**
     * Returns the shared producer, creating it from {@code producerProperties}
     * on the first call. Synchronized so that concurrent first calls create
     * only one producer.
     *
     * @return the producer used to send event records
     */
    public synchronized ProducerActions<String, byte[]> getProducer() {
        if (this.producer == null) {
            this.producer = new AsyncProducer<>(this.producerProperties, Serdes.String().serializer(), Serdes.ByteArray().serializer());
        }
        return this.producer;
    }

    /**
     * Builds the record headers describing one event.
     *
     * <p>All values are encoded as UTF-8 explicitly, so the bytes on the wire
     * do not depend on the JVM's default charset.
     *
     * @param eventId unique id written to {@code ce_id}
     * @param type    event type written to {@code ce_type}; must not be {@code null}
     * @return a new header collection for the outgoing record
     */
    private static RecordHeaders buildHeaders(UUID eventId, String type) {
        RecordHeaders headers = new RecordHeaders();
        headers.add("ce_id", utf8(eventId.toString()));
        headers.add("ce_specversion", utf8("1.0"));
        headers.add("ce_source", utf8("apicurio-registry"));
        headers.add("ce_type", utf8(type));
        headers.add("ce_time", utf8(Instant.now().toString()));
        headers.add("content-type", utf8("application/json"));
        return headers;
    }

    /** Encodes {@code value} as UTF-8 bytes. */
    private static byte[] utf8(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }
}

