/*
 * Decompiled with CFR 0.152.
 */
package de.telekom.eni.pandora.horizon.autoconfigure.kafka;

import de.telekom.eni.pandora.horizon.kafka.config.Compression;
import de.telekom.eni.pandora.horizon.kafka.config.KafkaProperties;
import de.telekom.eni.pandora.horizon.kafka.event.EventWriter;
import de.telekom.eni.pandora.horizon.model.meta.HorizonComponentId;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

/**
 * Spring configuration that wires the Kafka client beans from {@link KafkaProperties}:
 * a String/String {@link ConsumerFactory}, a {@link KafkaTemplate} named
 * {@code kafkaTemplate}, and an {@link EventWriter} that publishes through that template.
 *
 * <p>The producer factory is built by a private helper and is therefore not exposed as a
 * bean of its own; it is only reachable through the template.
 */
@Configuration
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaAutoConfiguration {
    private static final Logger log = LoggerFactory.getLogger(KafkaAutoConfiguration.class);

    /**
     * Builds the producer factory used by the {@code kafkaTemplate} bean.
     *
     * <p>Keys and values are serialized as strings. {@code compression.type} and
     * {@code linger.ms} are only set when compression is enabled in the properties;
     * otherwise both are left at the client defaults.
     *
     * @param kafkaProperties bound Kafka configuration properties
     * @return a new String/String producer factory
     */
    private ProducerFactory<String, String> producerFactory(KafkaProperties kafkaProperties) {
        log.debug("Initialized new ProducerFactory");
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);
        props.put("acks", kafkaProperties.getAcks());
        props.put("max.request.size", kafkaProperties.getMaxRequestSize());

        Compression compression = kafkaProperties.getCompression();
        boolean compressionEnabled = compression.isEnabled();
        log.info("Compression is {}", compressionEnabled ? "enabled" : "disabled");
        if (compressionEnabled) {
            // Parameterized logging: the message is only rendered when DEBUG is active.
            log.debug("Using compression: {}", compression.getType());
            props.put("compression.type", compression.getType());
            props.put("linger.ms", kafkaProperties.getLingerMs());
        }
        return new DefaultKafkaProducerFactory<>(props);
    }

    /**
     * Creates the consumer factory bean with string deserializers for keys and values.
     *
     * <p>{@code group.id} is omitted when {@code kafkaProperties.isDisableGroupId()} is
     * {@code true}. {@code allow.auto.create.topics} is passed as a string value.
     *
     * @param kafkaProperties bound Kafka configuration properties
     * @return a new String/String consumer factory
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
        log.debug("Initialized new consumer factory");
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
        if (!kafkaProperties.isDisableGroupId()) {
            props.put("group.id", kafkaProperties.getGroupId());
        }
        props.put("allow.auto.create.topics", String.valueOf(kafkaProperties.isAutoCreateTopics()));
        props.put("auto.offset.reset", kafkaProperties.getAutoOffsetReset());
        props.put("isolation.level", kafkaProperties.getIsolationLevel());
        props.put("max.poll.records", kafkaProperties.getMaxPollRecords());
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new StringDeserializer());
    }

    /**
     * Creates the {@code kafkaTemplate} bean on top of a freshly built producer factory and
     * attaches the given consumer factory to it.
     *
     * @param kafkaProperties bound Kafka configuration properties
     * @param consumerFactory consumer factory to set on the template
     * @return the configured template
     */
    @Bean(name = "kafkaTemplate")
    public KafkaTemplate<String, String> kafkaTemplate(KafkaProperties kafkaProperties, ConsumerFactory<String, String> consumerFactory) {
        log.debug("Initialized new kafka template");
        KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory(kafkaProperties));
        template.setConsumerFactory(consumerFactory);
        return template;
    }

    /**
     * Creates the {@link EventWriter} bean that writes through the {@code kafkaTemplate} bean.
     *
     * <p>The component id is derived from the configured group id via
     * {@link HorizonComponentId#fromGroupId}; the group id is read here regardless of the
     * {@code disableGroupId} setting.
     *
     * @param kafkaTemplate the template qualified as {@code kafkaTemplate}
     * @param kafkaProperties bound Kafka configuration properties
     * @return a new event writer
     */
    @Bean
    public EventWriter eventWriter(@Qualifier("kafkaTemplate") KafkaTemplate<String, String> kafkaTemplate, KafkaProperties kafkaProperties) {
        return new EventWriter(kafkaTemplate, HorizonComponentId.fromGroupId(kafkaProperties.getGroupId()));
    }
}

