/*
 * Decompiled with CFR 0.152.
 */
package kieker.analysis.plugin.reader.kafka;

import java.util.Arrays;
import java.util.Properties;
import kieker.analysis.IProjectContext;
import kieker.analysis.plugin.annotation.OutputPort;
import kieker.analysis.plugin.annotation.Plugin;
import kieker.analysis.plugin.annotation.Property;
import kieker.analysis.plugin.reader.newio.AbstractRawDataReader;
import kieker.common.configuration.Configuration;
import kieker.common.record.IMonitoringRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

@Plugin(description="A plugin that reads monitoring records from a Kafka topic", outputPorts={@OutputPort(name="monitoringRecords", eventTypes={IMonitoringRecord.class}, description="Output port of the Kafka reader")}, configuration={@Property(name="topicName", defaultValue="kiekerRecords", description="Name of the Kafka topic to read the records from"), @Property(name="bootstrapServers", defaultValue="localhost:9092", description="Bootstrap servers for the Kafka cluster"), @Property(name="groupId", defaultValue="kieker", description="Group ID for the Kafka consumer group"), @Property(name="autoCommit", defaultValue="true", description="Auto-commit the current position?"), @Property(name="autoCommitIntervalMs", defaultValue="1000", description="Auto commit interval in milliseconds"), @Property(name="sessionTimeoutMs", defaultValue="30000", description="Session timeout interval in milliseconds")})
public class KafkaReader
extends AbstractRawDataReader {
    public static final String OUTPUT_PORT_NAME_RECORDS = "monitoringRecords";
    public static final String CONFIG_PROPERTY_DESERIALIZER = "deserializer";
    public static final String CONFIG_PROPERTY_TOPIC_NAME = "topicName";
    public static final String CONFIG_PROPERTY_BOOTSTRAP_SERVERS = "bootstrapServers";
    public static final String CONFIG_PROPERTY_GROUP_ID = "groupId";
    public static final String CONFIG_PROPERTY_AUTO_COMMIT = "autoCommit";
    public static final String CONFIG_PROPERTY_AUTO_COMMIT_INTERVAL_MS = "autoCommitIntervalMs";
    public static final String CONFIG_PROPERTY_SESSION_TIMEOUT_MS = "sessionTimeoutMs";
    private final String topicName;
    private final String bootstrapServers;
    private final String groupId;
    private final boolean enableAutoCommit;
    private final int autoCommitIntervalMs;
    private final int sessionTimeoutMs;
    private KafkaConsumer<String, byte[]> consumer;
    private volatile boolean terminated = false;

    public KafkaReader(Configuration configuration, IProjectContext projectContext) {
        super(configuration, projectContext, configuration.getStringProperty(CONFIG_PROPERTY_DESERIALIZER));
        this.topicName = configuration.getStringProperty(CONFIG_PROPERTY_TOPIC_NAME);
        this.bootstrapServers = configuration.getStringProperty(CONFIG_PROPERTY_BOOTSTRAP_SERVERS);
        this.groupId = configuration.getStringProperty(CONFIG_PROPERTY_GROUP_ID);
        this.enableAutoCommit = configuration.getBooleanProperty(CONFIG_PROPERTY_AUTO_COMMIT);
        this.autoCommitIntervalMs = configuration.getIntProperty(CONFIG_PROPERTY_AUTO_COMMIT_INTERVAL_MS);
        this.sessionTimeoutMs = configuration.getIntProperty(CONFIG_PROPERTY_SESSION_TIMEOUT_MS);
    }

    @Override
    public Configuration getCurrentConfiguration() {
        Configuration configuration = new Configuration();
        configuration.setProperty(CONFIG_PROPERTY_TOPIC_NAME, this.topicName);
        configuration.setProperty(CONFIG_PROPERTY_BOOTSTRAP_SERVERS, this.bootstrapServers);
        if (this.groupId != null) {
            configuration.setProperty(CONFIG_PROPERTY_GROUP_ID, this.groupId);
        }
        configuration.setProperty(CONFIG_PROPERTY_AUTO_COMMIT, this.enableAutoCommit);
        configuration.setProperty(CONFIG_PROPERTY_AUTO_COMMIT_INTERVAL_MS, this.autoCommitIntervalMs);
        configuration.setProperty(CONFIG_PROPERTY_SESSION_TIMEOUT_MS, this.sessionTimeoutMs);
        return configuration;
    }

    @Override
    public boolean init() {
        boolean superInitSuccessful = super.init();
        if (!superInitSuccessful) {
            return false;
        }
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.bootstrapServers);
        properties.put("group.id", this.groupId);
        properties.put("enable.auto.commit", (Object)this.enableAutoCommit);
        properties.put("auto.commit.interval.ms", (Object)this.autoCommitIntervalMs);
        properties.put("session.timeout.ms", (Object)this.sessionTimeoutMs);
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        this.consumer = new KafkaConsumer(properties);
        return true;
    }

    @Override
    public boolean read() {
        this.consumer.subscribe(Arrays.asList(this.topicName));
        try {
            while (!this.terminated) {
                ConsumerRecords records = this.consumer.poll(100L);
                this.processRecords((ConsumerRecords<String, byte[]>)records);
            }
        }
        finally {
            this.consumer.close();
        }
        return true;
    }

    private void processRecords(ConsumerRecords<String, byte[]> records) {
        for (ConsumerRecord record : records) {
            byte[] valueAsBytes = (byte[])record.value();
            this.decodeAndDeliverRecords(valueAsBytes, OUTPUT_PORT_NAME_RECORDS);
        }
    }

    @Override
    public void terminate(boolean error) {
        this.terminated = true;
    }
}

