/*
 * Decompiled with CFR 0.152.
 */
package com.izettle.metrics.influxdb;

import com.izettle.metrics.influxdb.InfluxDbBaseSender;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

/**
 * Sender that publishes serialized metric lines to a Kafka topic instead of
 * writing them to InfluxDB directly.
 *
 * <p>The {@code database} constructor argument doubles as the Kafka target
 * description and must have the form {@code topic@host1,host2...}: everything
 * before the first {@code '@'} is the topic name and everything after it is
 * handed to the producer as its {@code bootstrap.servers} setting. The same
 * unmodified string is also passed on to the base class constructor.
 *
 * <p>NOTE(review): the producer created here is never closed by this class;
 * confirm whether the base class offers a shutdown hook that should close it.
 */
public class InfluxDBKafkaSender
extends InfluxDbBaseSender {
    private static final String KAFKA_CLIENT_ID = "metrics_influxdb_reporter";
    private final KafkaProducer<byte[], byte[]> kafkaProducer;
    private final String topic;

    /**
     * Creates a sender and its underlying Kafka producer.
     *
     * @param database          target description in the form {@code topic@host1,host2...}
     * @param timePrecision     forwarded unchanged to the base class
     * @param measurementPrefix forwarded unchanged to the base class
     * @throws IllegalArgumentException if {@code database} has no {@code '@'}
     *                                  separator, or if the topic or host list
     *                                  around it is empty
     */
    public InfluxDBKafkaSender(String database, TimeUnit timePrecision, String measurementPrefix) {
        super(database, timePrecision, measurementPrefix);
        int idx = database.indexOf('@');
        // Reject a missing separator as well as an empty topic ("@hosts") or an
        // empty host list ("topic@"): none of them can yield a usable producer,
        // so fail here with a message that states the expected format.
        if (idx <= 0 || idx == database.length() - 1) {
            throw new IllegalArgumentException("invalid database format: " + database + ", expected: topic@host1,host2...");
        }
        this.topic = database.substring(0, idx);
        String hosts = database.substring(idx + 1);
        Properties props = new Properties();
        props.put("bootstrap.servers", hosts);
        props.put("client.id", KAFKA_CLIENT_ID);
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());
        this.kafkaProducer = new KafkaProducer<byte[], byte[]>(props);
    }

    /**
     * Hands one serialized line to the Kafka producer as the value of a record
     * with a {@code null} key on the configured topic.
     *
     * <p>The {@code Future} returned by {@code send} is not inspected, so a
     * delivery failure is not reported to the caller from this method.
     *
     * @param line the bytes to publish as the record value
     * @return always {@code 0}
     * @throws Exception declared by the overridden method; anything thrown by
     *                   the producer's {@code send} call propagates unchanged
     */
    @Override
    protected int writeData(byte[] line) throws Exception {
        ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(this.topic, null, line);
        this.kafkaProducer.send(record);
        return 0;
    }
}

