/*
 * Decompiled with CFR 0.152.
 */
package kieker.monitoring.writer.kafka;

import java.nio.ByteBuffer;
import java.util.Properties;
import kieker.common.configuration.Configuration;
import kieker.common.exception.InvalidConfigurationException;
import kieker.monitoring.writer.raw.IRawDataWriter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Raw data writer that publishes each chunk of monitoring data as one record
 * to a Kafka topic.
 *
 * <p>The writer reads its settings from a {@link Configuration} at construction
 * time, creates the underlying Kafka producer in {@link #onInitialization()},
 * and closes it again in {@link #onTermination()}. Records are sent without a
 * key; the value is the raw byte content handed to
 * {@link #writeData(ByteBuffer, int, int)}.
 */
public class KafkaWriter implements IRawDataWriter {
    /** Default for the Kafka {@code acks} producer setting. */
    private static final String DEFAULT_ACKS = "all";
    /** Default for the Kafka {@code batch.size} producer setting. */
    private static final int DEFAULT_BATCH_SIZE = 16384;
    /** Default for the Kafka {@code linger.ms} producer setting. */
    private static final int DEFAULT_LINGER_MS = 1;
    /** Default for the Kafka {@code buffer.memory} producer setting (32 MiB). */
    private static final int DEFAULT_BUFFER_MEMORY = 32 * 1024 * 1024;

    /** Common prefix of all configuration keys understood by this writer. */
    private static final String PREFIX = KafkaWriter.class.getName() + ".";

    /** Configuration key for the {@code acks} producer setting. */
    public static final String CONFIG_PROPERTY_ACKS = PREFIX + "acks";
    /** Configuration key for the {@code batch.size} producer setting. */
    public static final String CONFIG_PROPERTY_BATCH_SIZE = PREFIX + "batchSize";
    /** Configuration key for the {@code bootstrap.servers} producer setting (required). */
    public static final String CONFIG_PROPERTY_BOOTSTRAP_SERVERS = PREFIX + "bootstrapServers";
    /** Configuration key for the {@code buffer.memory} producer setting. */
    public static final String CONFIG_PROPERTY_BUFFER_MEMORY = PREFIX + "bufferMemory";
    /** Configuration key for the {@code linger.ms} producer setting. */
    public static final String CONFIG_PROPERTY_LINGER_MS = PREFIX + "lingerMs";
    /** Configuration key for the name of the topic to publish to (required). */
    public static final String CONFIG_PROPERTY_TOPIC_NAME = PREFIX + "topicName";

    private final String bootstrapServers;
    private final String topicName;
    private final String acknowledges;
    private final int lingerMs;
    private final int batchSize;
    private final int bufferMemory;

    /** Created in {@link #onInitialization()}; {@code null} until then. */
    private Producer<String, byte[]> producer;

    /**
     * Creates a writer from the given configuration and validates the
     * mandatory settings. No connection is opened here; the producer is
     * created in {@link #onInitialization()}.
     *
     * @param configuration source of the {@code CONFIG_PROPERTY_*} values
     * @throws InvalidConfigurationException if the bootstrap servers or the
     *         topic name are missing or empty
     */
    public KafkaWriter(Configuration configuration) {
        this.bootstrapServers = configuration.getStringProperty(CONFIG_PROPERTY_BOOTSTRAP_SERVERS);
        this.topicName = configuration.getStringProperty(CONFIG_PROPERTY_TOPIC_NAME);
        this.acknowledges = configuration.getStringProperty(CONFIG_PROPERTY_ACKS, DEFAULT_ACKS);
        // Use the named defaults rather than repeating the literals, so the
        // declared constants and the effective defaults cannot drift apart.
        this.lingerMs = configuration.getIntProperty(CONFIG_PROPERTY_LINGER_MS, DEFAULT_LINGER_MS);
        this.batchSize = configuration.getIntProperty(CONFIG_PROPERTY_BATCH_SIZE, DEFAULT_BATCH_SIZE);
        this.bufferMemory = configuration.getIntProperty(CONFIG_PROPERTY_BUFFER_MEMORY, DEFAULT_BUFFER_MEMORY);
        this.checkConfiguration();
    }

    /**
     * Verifies that the mandatory settings are present.
     *
     * @throws InvalidConfigurationException if the bootstrap servers or the
     *         topic name are missing or empty
     */
    private void checkConfiguration() {
        if (isNullOrEmpty(this.bootstrapServers)) {
            throw new InvalidConfigurationException("At least one bootstrap server must be provided.");
        }
        if (isNullOrEmpty(this.topicName)) {
            throw new InvalidConfigurationException("A topic name must be provided.");
        }
    }

    /** Returns {@code true} if the given string is {@code null} or has length zero. */
    private static boolean isNullOrEmpty(String value) {
        // The null check guards against a configuration lookup that yields
        // null, which would otherwise surface as a NullPointerException.
        return value == null || value.isEmpty();
    }

    /**
     * Builds the producer properties from the configured values and creates
     * the Kafka producer with a string key serializer and a byte-array value
     * serializer.
     */
    @Override
    public void onInitialization() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.bootstrapServers);
        properties.put("acks", this.acknowledges);
        properties.put("batch.size", this.batchSize);
        properties.put("linger.ms", this.lingerMs);
        properties.put("buffer.memory", this.bufferMemory);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        this.producer = new KafkaProducer<String, byte[]>(properties);
    }

    /**
     * Closes the producer if one was created; does nothing otherwise.
     */
    @Override
    public void onTermination() {
        if (this.producer != null) {
            this.producer.close();
        }
    }

    /**
     * Copies {@code length} bytes starting at {@code offset} out of the given
     * buffer and sends them as the value of a single keyless record to the
     * configured topic.
     *
     * <p>Note that this repositions {@code buffer}: after the call its
     * position is {@code offset + length}. The result of the send call is not
     * inspected.
     *
     * @param buffer buffer holding the raw data
     * @param offset absolute index in {@code buffer} of the first byte to send
     * @param length number of bytes to send
     */
    @Override
    public void writeData(ByteBuffer buffer, int offset, int length) {
        buffer.position(offset);
        byte[] rawDataAsBytes = new byte[length];
        buffer.get(rawDataAsBytes);
        ProducerRecord<String, byte[]> record =
                new ProducerRecord<String, byte[]>(this.topicName, rawDataAsBytes);
        this.producer.send(record);
    }
}

