/*
 * Decompiled with CFR 0.152.
 */
package io.streamzi.openshift.dataflow.container.kafka;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.streamzi.cloudevents.CloudEvent;
import io.streamzi.openshift.dataflow.container.CloudEventOutput;
import io.streamzi.openshift.dataflow.container.config.EnvironmentResolver;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

/**
 * {@link CloudEventOutput} that serialises each {@link CloudEvent} to JSON with Jackson
 * and publishes it as a string record to a Kafka topic.
 *
 * <p>The bootstrap servers are looked up through {@link EnvironmentResolver} under
 * {@code STREAMZI_KAFKA_BOOTSTRAP_SERVER}; the topic name is looked up under the output
 * name passed to the constructor.
 *
 * <p>Lifecycle: events are only published between a successful {@link #startOutput()} and
 * the next {@link #stopOutput()}. After a stop the output can be started again.
 */
public class KafkaCloudEventOutputImpl
extends CloudEventOutput {
    private static final Logger logger = Logger.getLogger(KafkaCloudEventOutputImpl.class.getName());

    /** Jackson mapper used to turn events into the JSON record value. */
    private final ObjectMapper mapper;
    private final String bootstrapServers = EnvironmentResolver.get("STREAMZI_KAFKA_BOOTSTRAP_SERVER");
    /** Topic that records are written to, resolved from the output name. */
    private final String topicName;
    /** True only while a producer has been created and not yet stopped. */
    private volatile boolean connected = false;
    /** Current producer, or {@code null} when the output is not started. */
    private Producer<String, String> producer = null;

    /**
     * Creates the output and configures the JSON mapper (JDK8 and java.time modules,
     * indented output, dates written as text rather than timestamps).
     *
     * @param producerObject passed through to the {@link CloudEventOutput} constructor
     * @param outputName name of this output; also the key used to resolve the topic name
     */
    public KafkaCloudEventOutputImpl(Object producerObject, String outputName) {
        super(producerObject, outputName);
        this.topicName = EnvironmentResolver.get(outputName);
        this.mapper = new ObjectMapper();
        this.mapper.registerModule(new Jdk8Module());
        this.mapper.registerModule(new JavaTimeModule());
        this.mapper.configure(SerializationFeature.INDENT_OUTPUT, true);
        this.mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
    }

    /**
     * Publishes an event without a record key.
     *
     * @param event the event to publish
     */
    public void send(CloudEvent event) {
        this.send(null, event);
    }

    /**
     * Publishes an event as JSON to the configured topic.
     *
     * <p>Failures are logged and never propagated to the caller: this covers
     * serialisation errors, errors thrown by the producer call itself, and delivery
     * errors reported later by the producer's completion callback. If the output is not
     * started, the event is dropped and a warning is logged.
     *
     * @param key record key, may be {@code null}
     * @param event the event to publish
     */
    public void send(String key, CloudEvent event) {
        // Take a local snapshot: stopOutput() may clear the field while we are sending.
        Producer<String, String> activeProducer = this.producer;
        if (!this.connected || activeProducer == null) {
            logger.log(Level.WARNING, "Producer not connected");
            return;
        }
        try {
            String json = this.mapper.writeValueAsString(event);
            ProducerRecord<String, String> record = new ProducerRecord<>(this.topicName, key, json);
            // send() is asynchronous; without a callback delivery failures would go unnoticed.
            activeProducer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    logger.log(Level.SEVERE, "Error sending event to Kafka: " + exception.getMessage(), exception);
                }
            });
        }
        catch (Exception e) {
            logger.log(Level.SEVERE, "Error sending event to Kafka: " + e.getMessage(), e);
        }
    }

    /**
     * Creates the Kafka producer if there is none yet and marks the output as connected.
     * A failure to create the producer is logged (with its cause) and leaves the output
     * disconnected; calling this method again retries.
     */
    @Override
    public void startOutput() {
        if (this.producer != null) {
            return;
        }
        logger.info("Trying to connect to Kafka");
        try {
            this.producer = this.createProducer();
            this.connected = true;
            logger.info("Connected to Kafka");
        }
        catch (Exception e) {
            logger.log(Level.WARNING, "Cannot connect to Kafka: " + e.getMessage(), e);
        }
    }

    /**
     * Marks the output as disconnected and closes the producer, if any.
     * The producer reference is cleared so that a later {@link #startOutput()} creates a
     * fresh producer instead of silently staying disconnected.
     */
    @Override
    public void stopOutput() {
        this.connected = false;
        Producer<String, String> closing = this.producer;
        // Clear before closing so a restart is possible even if close() throws.
        this.producer = null;
        if (closing != null) {
            closing.close();
        }
    }

    /**
     * Builds a string/string Kafka producer from the resolved bootstrap servers, using the
     * inherited {@code processorUuid} as the client id.
     *
     * @return a new producer
     * @throws IllegalStateException if no bootstrap servers value was resolved
     */
    private Producer<String, String> createProducer() {
        if (this.bootstrapServers == null) {
            // Properties rejects null values with a message-less NPE; fail with a clear reason instead.
            throw new IllegalStateException("STREAMZI_KAFKA_BOOTSTRAP_SERVER is not set");
        }
        Properties props = new Properties();
        props.put("bootstrap.servers", this.bootstrapServers);
        props.put("client.id", this.processorUuid);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }
}

