/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.source.kafka.util;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.shaded.com.google.common.collect.Maps;
import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.source.kafka.config.BrokerConfig;
import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;

/**
 * Static helpers for creating Kafka consumers and for looking up partition
 * offsets of the Kafka topic configured for a cube's root fact table.
 *
 * <p>This class is not meant to be instantiated; its constructor always throws.
 */
public class KafkaClient {
    private KafkaClient() {
        throw new IllegalStateException("Class KafkaClient is an utility class !");
    }

    /**
     * Creates a consumer using the Kafka properties obtained from
     * {@code KafkaConsumerProperties.getInstanceFromEnv()} layered over this
     * class's defaults.
     *
     * @param brokers       value for {@code bootstrap.servers}
     * @param consumerGroup value for {@code group.id}
     * @return a new consumer; the caller is responsible for closing it
     */
    public static KafkaConsumer getKafkaConsumer(String brokers, String consumerGroup) {
        Properties properties = KafkaConsumerProperties.getInstanceFromEnv().extractKafkaConfigToProperties();
        return getKafkaConsumer(brokers, consumerGroup, properties);
    }

    /**
     * Creates a consumer from this class's default settings, overridden by the
     * entries of {@code properties}.
     *
     * <p>The return type is intentionally left raw: {@code properties} may
     * replace the default String deserializers, so the key/value types are not
     * known here, and existing callers depend on the raw signature.
     *
     * @param brokers       value for {@code bootstrap.servers}
     * @param consumerGroup value for {@code group.id}
     * @param properties    extra settings that take precedence over the defaults; may be {@code null}
     * @return a new consumer; the caller is responsible for closing it
     */
    public static KafkaConsumer getKafkaConsumer(String brokers, String consumerGroup, Properties properties) {
        Properties props = constructDefaultKafkaConsumerProperties(brokers, consumerGroup, properties);
        return new KafkaConsumer<>(props);
    }

    /**
     * Builds the consumer configuration: String key/value deserializers,
     * auto-commit disabled, the given brokers and group id, then every entry of
     * {@code properties} (if non-null) written on top.
     */
    private static Properties constructDefaultKafkaConsumerProperties(String brokers, String consumerGroup, Properties properties) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("group.id", consumerGroup);
        props.put("enable.auto.commit", "false");
        if (properties != null) {
            // Applied last so that caller-supplied settings win over the defaults above.
            props.putAll(properties);
        }
        return props;
    }

    /**
     * Joins every broker of every cluster in {@code kafkaConfig} into a
     * comma-separated {@code host:port} list.
     *
     * @param kafkaConfig the Kafka configuration to read brokers from
     * @return the broker list, e.g. {@code "h1:9092,h2:9092"}
     * @throws IllegalArgumentException if the configuration contains no broker at all
     */
    public static String getKafkaBrokers(KafkaConfig kafkaConfig) {
        StringBuilder brokers = new StringBuilder();
        for (KafkaClusterConfig clusterConfig : kafkaConfig.getKafkaClusterConfigs()) {
            for (BrokerConfig brokerConfig : clusterConfig.getBrokerConfigs()) {
                if (brokers.length() > 0) {
                    brokers.append(',');
                }
                brokers.append(brokerConfig.getHost()).append(':').append(brokerConfig.getPort());
            }
        }
        if (brokers.length() == 0) {
            throw new IllegalArgumentException("No cluster info in Kafka config '" + kafkaConfig.getName() + "'");
        }
        return brokers.toString();
    }

    /**
     * Returns the consumer's position after seeking to the beginning of the
     * given partition.
     *
     * <p>Side effect: the consumer is (re)assigned to exactly this one partition.
     *
     * @param consumer    the consumer to use
     * @param topic       topic name
     * @param partitionId partition number within the topic
     * @return the position reported by the consumer after seeking to the beginning
     */
    public static long getEarliestOffset(KafkaConsumer<?, ?> consumer, String topic, int partitionId) {
        TopicPartition topicPartition = new TopicPartition(topic, partitionId);
        List<TopicPartition> assignment = Arrays.asList(topicPartition);
        consumer.assign(assignment);
        consumer.seekToBeginning(assignment);
        return consumer.position(topicPartition);
    }

    /**
     * Returns the consumer's position after seeking to the end of the given
     * partition.
     *
     * <p>Side effect: the consumer is (re)assigned to exactly this one partition.
     *
     * @param consumer    the consumer to use
     * @param topic       topic name
     * @param partitionId partition number within the topic
     * @return the position reported by the consumer after seeking to the end
     */
    public static long getLatestOffset(KafkaConsumer<?, ?> consumer, String topic, int partitionId) {
        TopicPartition topicPartition = new TopicPartition(topic, partitionId);
        List<TopicPartition> assignment = Arrays.asList(topicPartition);
        consumer.assign(assignment);
        consumer.seekToEnd(assignment);
        return consumer.position(topicPartition);
    }

    /**
     * Looks up the end offset of every partition of the topic configured for
     * the cube's root fact table.
     *
     * @param cubeInstance the cube whose Kafka source is queried; its name is used as the consumer group
     * @return partition id mapped to the offset found by {@link #getLatestOffset}
     * @throws IllegalArgumentException if the Kafka configuration lists no brokers
     */
    public static Map<Integer, Long> getLatestOffsets(CubeInstance cubeInstance) {
        return collectOffsets(cubeInstance, true);
    }

    /**
     * Looks up the beginning offset of every partition of the topic configured
     * for the cube's root fact table.
     *
     * @param cubeInstance the cube whose Kafka source is queried; its name is used as the consumer group
     * @return partition id mapped to the offset found by {@link #getEarliestOffset}
     * @throws IllegalArgumentException if the Kafka configuration lists no brokers
     */
    public static Map<Integer, Long> getEarliestOffsets(CubeInstance cubeInstance) {
        return collectOffsets(cubeInstance, false);
    }

    /**
     * Opens a short-lived consumer for the cube's topic and records one offset
     * per partition: the latest when {@code fromEnd} is true, otherwise the earliest.
     */
    private static Map<Integer, Long> collectOffsets(CubeInstance cubeInstance, boolean fromEnd) {
        KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv())
                .getKafkaConfig(cubeInstance.getRootFactTable());
        String brokers = getKafkaBrokers(kafkaConfig);
        String topic = kafkaConfig.getTopic();
        Map<Integer, Long> offsets = new HashMap<>();
        // A wildcard-typed local keeps partitionsFor() returning List<PartitionInfo>;
        // on a raw consumer it would erase to a raw List and the loop would not compile.
        try (KafkaConsumer<?, ?> consumer = getKafkaConsumer(brokers, cubeInstance.getName())) {
            List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
            if (partitionInfos == null) {
                // NOTE(review): some Kafka client versions appear to return null for an
                // unknown topic - confirm against the client version in use. Treated
                // here as "no partitions" rather than failing with an NPE.
                return offsets;
            }
            for (PartitionInfo partitionInfo : partitionInfos) {
                int partition = partitionInfo.partition();
                long offset = fromEnd
                        ? getLatestOffset(consumer, topic, partition)
                        : getEarliestOffset(consumer, topic, partition);
                offsets.put(partition, offset);
            }
        }
        return offsets;
    }
}

