/*
 * Decompiled with CFR 0.152.
 */
package io.smartcat.cassandra.diagnostics.reporter;

import io.smartcat.cassandra.diagnostics.GlobalConfiguration;
import io.smartcat.cassandra.diagnostics.Measurement;
import io.smartcat.cassandra.diagnostics.reporter.Reporter;
import io.smartcat.cassandra.diagnostics.reporter.ReporterConfiguration;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reporter that forwards each {@link Measurement} to a Kafka topic.
 *
 * <p>Options read from the {@link ReporterConfiguration}:
 * <ul>
 *   <li>{@code kafkaBootstrapServers} - handed to the producer as {@code bootstrap.servers}</li>
 *   <li>{@code kafkaTopic} - topic every record is sent to</li>
 * </ul>
 * If either option is empty, a warning is logged, no producer is created and
 * {@link #report(Measurement)} only logs a warning.
 */
public class KafkaReporter extends Reporter {
    private static final Logger logger = LoggerFactory.getLogger(KafkaReporter.class);
    private static final String SERVERS_PROP = "kafkaBootstrapServers";
    private static final String TOPIC_PROP = "kafkaTopic";

    /**
     * Producer owned by this reporter instance; {@code null} when initialization was aborted
     * or after {@link #stop()}. Kept per instance so that one reporter can neither replace
     * (and leak) nor close the producer used by another.
     */
    private Producer<String, String> producer;

    /** Record key attached to every message: {@code systemName + "_" + hostname}. */
    private String partitionKey;

    /** Destination topic taken from the {@code kafkaTopic} option. */
    private String topic;

    /**
     * Reads the Kafka options and, when both are present, creates the producer with
     * {@link StringSerializer} for keys and values.
     *
     * @param configuration       reporter configuration holding the Kafka options
     * @param globalConfiguration global configuration supplying {@code systemName} and {@code hostname}
     */
    public KafkaReporter(ReporterConfiguration configuration, GlobalConfiguration globalConfiguration) {
        super(configuration, globalConfiguration);

        String servers = (String) configuration.getDefaultOption(SERVERS_PROP, "");
        if (servers.isEmpty()) {
            logger.warn("Missing required property " + SERVERS_PROP + ". Aborting initialization.");
            return;
        }

        this.topic = (String) configuration.getDefaultOption(TOPIC_PROP, "");
        if (this.topic.isEmpty()) {
            logger.warn("Missing required property " + TOPIC_PROP + ". Aborting initialization.");
            return;
        }

        Properties properties = new Properties();
        properties.put("bootstrap.servers", servers);
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());

        this.producer = new KafkaProducer<String, String>(properties);
        this.partitionKey = globalConfiguration.systemName + "_" + globalConfiguration.hostname;
    }

    /**
     * Sends the measurement's {@code toJson()} representation to the configured topic, keyed by
     * the partition key. Logs a warning and does nothing when no producer is available
     * (initialization was aborted or the reporter has been stopped).
     *
     * @param measurement measurement to publish
     */
    public void report(Measurement measurement) {
        if (producer == null) {
            logger.warn("Kafka producer is not initialized.");
            return;
        }
        producer.send(new ProducerRecord<String, String>(this.topic, this.partitionKey, measurement.toJson()));
    }

    /**
     * Closes the producer, if one was created. The reference is cleared afterwards so that a
     * later {@link #report(Measurement)} hits the "not initialized" guard instead of sending on
     * a closed producer, and so that repeated calls are no-ops.
     */
    public void stop() {
        if (producer == null) {
            return;
        }
        try {
            producer.close();
        } finally {
            // Drop the reference even if close() fails; the producer must not be reused.
            producer = null;
        }
    }
}

