/*
 * Decompiled with CFR 0.152.
 */
package net.osomahe.esk.eventstore.control;

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import javax.ejb.ConcurrencyManagement;
import javax.ejb.ConcurrencyManagementType;
import javax.ejb.Singleton;
import javax.inject.Inject;
import net.osomahe.esk.config.boundary.ConfigurationBoundary;
import net.osomahe.esk.eventstore.entity.EventStoreEvent;
import net.osomahe.esk.eventstore.entity.TopicName;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

@Singleton
@ConcurrencyManagement(value=ConcurrencyManagementType.BEAN)
public class TopicService {
    private final Map<String, Integer> mapPartitionCount = new ConcurrentHashMap<String, Integer>();
    private static final String DEFAULT_TOPIC = "eventstore";
    @Inject
    private ConfigurationBoundary config;

    public String getTopicName(Class<? extends EventStoreEvent> eventClass) {
        TopicName topicName = eventClass.getAnnotation(TopicName.class);
        if (topicName == null) {
            return DEFAULT_TOPIC;
        }
        return topicName.value();
    }

    public int getPartitionCount(Class<? extends EventStoreEvent> eventClass) {
        return this.getPartitionCount(this.getTopicName(eventClass));
    }

    public synchronized int getPartitionCount(String topicName) {
        if (this.mapPartitionCount.containsKey(topicName)) {
            return this.mapPartitionCount.get(topicName);
        }
        Integer count = this.loadPartitionCount(topicName);
        this.mapPartitionCount.put(topicName, count);
        return count;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Integer loadPartitionCount(String topicName) {
        Properties props = this.config.getKafkaConsumerConfig();
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        KafkaConsumer consumer = new KafkaConsumer(props);
        try {
            Integer n = consumer.partitionsFor(topicName).size();
            return n;
        }
        finally {
            consumer.close(10L, TimeUnit.SECONDS);
        }
    }
}

