/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.sample;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exception.ErrorCodeSupplier;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.exception.ServerErrorCode;
import org.apache.kylin.common.exception.code.ErrorCodeProducer;
import org.apache.kylin.common.exception.code.ErrorCodeServer;
import org.apache.kylin.common.msg.MsgPicker;
import org.apache.kylin.kafka.util.KafkaUtils;
import org.apache.kylin.loader.ParserClassLoaderState;
import org.apache.kylin.metadata.jar.JarInfo;
import org.apache.kylin.metadata.jar.JarInfoManager;
import org.apache.kylin.metadata.jar.JarTypeEnum;
import org.apache.kylin.metadata.streaming.DataParserInfo;
import org.apache.kylin.metadata.streaming.DataParserManager;
import org.apache.kylin.metadata.streaming.KafkaConfig;
import org.apache.kylin.parser.AbstractDataParser;
import org.apache.kylin.sample.StreamingSourceHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link StreamingSourceHandler} implementation that talks to a Kafka cluster:
 * it lists topics, probes individual brokers, pulls a handful of sample
 * messages and runs a sample message through the configured data parser.
 *
 * <p>NOTE(review): explicit casts to project interfaces are kept on calls into
 * {@code KylinException} and {@code AbstractDataParser} because their overload
 * sets are not visible from this file; confirm before removing them.
 */
public class KafkaSourceHandler implements StreamingSourceHandler {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaSourceHandler.class);
    private static final String DEFAULT_CONSUMER_GROUP = "sample";
    private static final String DEFAULT_TOPIC = "__consumer_offsets";
    public static final String DEFAULT_PARSER = "org.apache.kylin.parser.TimedJsonStreamParser";
    /** Key under which the topic list is returned by {@link #getTopics}. */
    private static final String SINGLE_CLUSTER_NAME = "kafka-cluster-1";
    /** Column names accepted by {@link #checkColName}; compiled once. */
    private static final Pattern COL_PATTERN = Pattern.compile("^(?!\\d+|_)([0-9a-zA-Z_]{1,}$)");
    /** Topics whose whole name matches this pattern are hidden by {@link #isUsefulTopic}; compiled once. */
    private static final Pattern UUID_PATTERN = Pattern.compile("[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}");
    /** Upper bound on the number of sample messages collected by {@link #getMessages}. */
    private static final int SAMPLE_MSG_COUNT = 10;
    /** Timeout, in milliseconds, for listing topics through the consumer. */
    private static final long CONSUMER_LIST_TOPICS_TIMEOUT = 30000L;
    /** Timeout, in milliseconds, for the per-broker admin-client topic listing. */
    private static final int CLIENT_LIST_TOPICS_TIMEOUT = 5000;

    /**
     * Lists the topics visible through the configured bootstrap servers.
     *
     * @param kafkaConfig supplies the bootstrap servers
     * @param fuzzyTopic  case-insensitive substring a topic must contain; {@code null} or
     *                    empty keeps every topic
     * @return a sorted map with a single entry, {@code "kafka-cluster-1"}, whose value is the
     *         naturally ordered list of matching topic names
     * @throws KylinException with {@code BROKER_TIMEOUT_MESSAGE} when listing topics times out
     */
    @Override
    public Map<String, List<String>> getTopics(KafkaConfig kafkaConfig, String fuzzyTopic) {
        List<String> topics;
        try (Consumer<String, ByteBuffer> consumer = KafkaUtils.getKafkaConsumer(kafkaConfig.getKafkaBootstrapServers(), DEFAULT_CONSUMER_GROUP)) {
            Collection<String> topicNames;
            try {
                topicNames = consumer.listTopics(Duration.ofMillis(CONSUMER_LIST_TOPICS_TIMEOUT)).keySet();
            } catch (TimeoutException e) {
                throw new KylinException((ErrorCodeSupplier) ServerErrorCode.BROKER_TIMEOUT_MESSAGE, MsgPicker.getMsg().getBrokerTimeoutMessage());
            }
            boolean matchAll = StringUtils.isEmpty(fuzzyTopic);
            topics = topicNames.stream()
                    .filter(this::isUsefulTopic)
                    // containsIgnoreCase is already case-insensitive, so no explicit lower-casing is needed.
                    .filter(topic -> matchAll || StringUtils.containsIgnoreCase(topic, fuzzyTopic))
                    .sorted()
                    .collect(Collectors.toCollection(ArrayList::new));
        }
        Map<String, List<String>> clusterTopics = new TreeMap<>();
        clusterTopics.put(SINGLE_CLUSTER_NAME, topics);
        return clusterTopics;
    }

    /**
     * Probes every comma-separated bootstrap server on its own and reports the ones whose
     * topic listing failed.
     *
     * <p>All listing requests are issued first and awaited afterwards. Every admin client
     * that was created is closed before returning, including when creating a later client
     * or issuing a request throws.
     *
     * @param kafkaConfig supplies the comma-separated bootstrap servers
     * @return the broker addresses whose topic listing failed; empty when none failed
     */
    @Override
    public List<String> getBrokenBrokers(KafkaConfig kafkaConfig) {
        List<String> failList = new ArrayList<>();
        List<AdminClient> clients = new ArrayList<>();
        try {
            Map<String, ListTopicsResult> pending = new HashMap<>();
            for (String broker : kafkaConfig.getKafkaBootstrapServers().split(",")) {
                AdminClient client = KafkaUtils.getKafkaAdminClient(broker, DEFAULT_CONSUMER_GROUP);
                // Track the client before using it so the finally block closes it even if listTopics throws.
                clients.add(client);
                pending.put(broker, client.listTopics(new ListTopicsOptions().timeoutMs(CLIENT_LIST_TOPICS_TIMEOUT)));
            }
            for (Map.Entry<String, ListTopicsResult> entry : pending.entrySet()) {
                String broker = entry.getKey();
                try {
                    entry.getValue().names().get();
                } catch (ExecutionException | TimeoutException e) {
                    failList.add(broker);
                    log.warn("Broker [{}] cannot be connected, marked as failed", broker);
                } catch (InterruptedException e) {
                    log.error("The current thread is interrupted", e);
                    Thread.currentThread().interrupt();
                }
            }
        } finally {
            clients.forEach(Admin::close);
        }
        return failList;
    }

    /**
     * Tells whether a topic should be offered to the user.
     *
     * @param topic topic name to check
     * @return {@code false} when the whole name matches the UUID pattern or equals
     *         {@code __consumer_offsets}; {@code true} otherwise
     */
    private boolean isUsefulTopic(String topic) {
        if (UUID_PATTERN.matcher(topic).matches()) {
            return false;
        }
        return !StringUtils.equals(topic, DEFAULT_TOPIC);
    }

    /**
     * Collects up to {@value #SAMPLE_MSG_COUNT} sample messages from the subscribed topic,
     * visiting its partitions one at a time until enough messages were gathered.
     *
     * @param kafkaConfig supplies the bootstrap servers and the subscribed topic
     * @return the sampled message payloads; empty when the topic has no partitions or no
     *         messages were polled
     */
    @Override
    public List<ByteBuffer> getMessages(KafkaConfig kafkaConfig) {
        log.info("Start to get sample messages from Kafka.");
        long pollMsgTimeout = KylinConfig.getInstanceFromEnv().getKafkaPollMessageTimeout();
        String topic = kafkaConfig.getSubscribe();
        String brokers = kafkaConfig.getKafkaBootstrapServers();
        long startTime = System.currentTimeMillis();
        log.info("Trying to get messages from brokers: {}", brokers);
        List<ByteBuffer> samples = Lists.newArrayList();
        try (Consumer<String, ByteBuffer> consumer = KafkaUtils.getKafkaConsumer(brokers, DEFAULT_CONSUMER_GROUP)) {
            // emptyIfNull keeps the original tolerance for a null partition list.
            List<TopicPartition> partitions = CollectionUtils.emptyIfNull(consumer.partitionsFor(topic)).stream()
                    .map(info -> new TopicPartition(topic, info.partition()))
                    .collect(Collectors.toList());
            if (partitions.isEmpty()) {
                log.warn("There are no partitions in topic: {}", topic);
                return samples;
            }
            Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(partitions);
            for (TopicPartition partition : partitions) {
                if (samples.size() >= SAMPLE_MSG_COUNT) {
                    break;
                }
                pollMsg(partition, beginningOffsets.get(partition), consumer, topic, pollMsgTimeout, samples);
            }
        }
        log.info("Get sample message size is: {}, cost: {}ms", samples.size(), System.currentTimeMillis() - startTime);
        return samples;
    }

    /**
     * Polls one partition once, starting at most {@value #SAMPLE_MSG_COUNT} records before
     * its end offset, and appends the record values to {@code samples} until the sample
     * limit is reached.
     *
     * @param partition      partition to read from
     * @param beginOffset    beginning offset of {@code partition}
     * @param consumer       consumer used for the read; it is re-assigned to {@code partition}
     * @param topic          topic name, used for logging only
     * @param pollMsgTimeout poll timeout in milliseconds
     * @param samples        accumulator that receives the polled payloads
     */
    private void pollMsg(TopicPartition partition, Long beginOffset, Consumer<String, ByteBuffer> consumer, String topic, long pollMsgTimeout, List<ByteBuffer> samples) {
        List<TopicPartition> partitionList = Collections.singletonList(partition);
        consumer.assign(partitionList);
        consumer.seekToEnd(partitionList);
        long maxOffset = consumer.position(partition);
        long count = maxOffset - beginOffset;
        if (count <= 0L) {
            return;
        }
        // Start at the beginning when fewer than SAMPLE_MSG_COUNT records exist, otherwise at the tail.
        consumer.seek(partition, Math.max(beginOffset, maxOffset - SAMPLE_MSG_COUNT));
        log.info("Ready to poll messages. Topic: {}, Partition: {}, Partition beginning offset: {}, Offset: {}", topic, partition.partition(), beginOffset, maxOffset);
        ConsumerRecords<String, ByteBuffer> records = consumer.poll(Duration.ofMillis(pollMsgTimeout));
        for (ConsumerRecord<String, ByteBuffer> message : records) {
            if (samples.size() >= SAMPLE_MSG_COUNT) {
                break;
            }
            samples.add(message.value());
        }
    }

    /**
     * Runs a sample message through the parser named in {@code kafkaConfig} and validates
     * the resulting column names.
     *
     * @param kafkaConfig supplies the parser name, project and subscribed topic
     * @param msg         message text; it is UTF-8 encoded before being handed to the parser
     * @return the parser output
     * @throws KylinException with {@code STREAMING_PARSE_MESSAGE_ERROR} when registering or
     *         running the parser fails, or with {@code CUSTOM_PARSER_CHECK_COLUMN_NAME_FAILED}
     *         when a returned key is not an acceptable column name
     */
    @Override
    public Map<String, Object> parserMessage(KafkaConfig kafkaConfig, String msg) {
        String parserName = kafkaConfig.getParserName();
        String project = kafkaConfig.getProject();
        String topic = kafkaConfig.getSubscribe();
        Map<String, Object> result;
        try {
            ParserClassLoaderState loaderState = ParserClassLoaderState.getInstance(project);
            this.checkParserRegister(parserName, project, loaderState);
            result = AbstractDataParser.getDataParser(parserName, (ClassLoader) loaderState.getClassLoader()).process((Object) StandardCharsets.UTF_8.encode(msg));
        } catch (Exception e) {
            throw new KylinException((ErrorCodeProducer) ErrorCodeServer.STREAMING_PARSE_MESSAGE_ERROR, (Throwable) e, new Object[]{parserName, topic});
        }
        // Column-name validation stays outside the try so its own KylinException is not re-wrapped.
        KafkaSourceHandler.checkColName(result);
        return result;
    }

    /**
     * Makes sure the jar that provides a custom parser is registered with the given class
     * loader state. Nothing is done for the default parser or for a jar that is already loaded.
     *
     * @param parserName  fully qualified parser class name
     * @param project     project that owns the parser and jar metadata
     * @param loaderState class loader state that receives the jar registration
     */
    public void checkParserRegister(String parserName, String project, ParserClassLoaderState loaderState) {
        if (StringUtils.equals(DEFAULT_PARSER, parserName)) {
            return;
        }
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        DataParserInfo parserInfo = DataParserManager.getInstance(config, project).getDataParserInfo(parserName);
        JarInfo jarInfo = JarInfoManager.getInstance(config, project).getJarInfo(JarTypeEnum.STREAMING_CUSTOM_PARSER, parserInfo.getJarName());
        if (loaderState.getLoadedJars().contains(jarInfo.getJarPath())) {
            return;
        }
        loaderState.registerJars(Sets.newHashSet(jarInfo.getJarPath()));
    }

    /**
     * Rejects parser output whose keys are not acceptable column names.
     *
     * @param inputParserMap parser output keyed by column name
     * @throws KylinException with {@code CUSTOM_PARSER_CHECK_COLUMN_NAME_FAILED} on the first
     *         key that does not match the column-name pattern
     */
    private static void checkColName(Map<String, Object> inputParserMap) {
        for (String colName : inputParserMap.keySet()) {
            if (!COL_PATTERN.matcher(colName).matches()) {
                throw new KylinException((ErrorCodeProducer) ErrorCodeServer.CUSTOM_PARSER_CHECK_COLUMN_NAME_FAILED, new Object[0]);
            }
        }
    }
}

