/*
 * Decompiled with CFR 0.152.
 */
package com.icthh.xm.commons.metric;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.stereotype.Component;

@Component
@ConditionalOnProperty(value={"application.kafkaHealth.enabled"}, matchIfMissing=false)
public class KafkaHealthIndicator
extends AbstractHealthIndicator {
    private static final Logger log = LoggerFactory.getLogger(KafkaHealthIndicator.class);
    private final KafkaAdmin kafkaAdmin;
    private final Integer connectionTimeout;
    private final String systemTopic;

    public KafkaHealthIndicator(KafkaAdmin kafkaAdmin, @Value(value="${application.kafkaHealth.connectionTimeout:1000}") Integer connectionTimeout, @Value(value="${application.kafka-system-queue:'system_queue'}") String systemTopic) {
        this.kafkaAdmin = kafkaAdmin;
        this.connectionTimeout = connectionTimeout;
        this.systemTopic = systemTopic;
    }

    protected void doHealthCheck(Health.Builder builder) {
        StopWatch executionTime = StopWatch.createStarted();
        DescribeTopicsOptions describeTopicsOptions = new DescribeTopicsOptions().timeoutMs(this.connectionTimeout);
        try (AdminClient adminClient = AdminClient.create((Map)this.kafkaAdmin.getConfigurationProperties());){
            DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(List.of(this.systemTopic), describeTopicsOptions);
            Map topicDescriptionMap = (Map)describeTopicsResult.all().get();
            boolean monitoringResult = Objects.nonNull(topicDescriptionMap);
            if (monitoringResult) {
                log.debug("Connection to kafka is {}, time: {}", (Object)monitoringResult, (Object)executionTime.getTime());
                builder.up();
            } else {
                log.error("Connection to kafka is {}, time: {}", (Object)monitoringResult, (Object)executionTime.getTime());
                builder.down();
            }
        }
        catch (Exception e) {
            log.error("Exception when try connect to kafka topic: {}, exception: {}, time: {}", new Object[]{this.systemTopic, e, executionTime.getTime()});
            builder.down((Throwable)e);
        }
    }
}

