/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.configuration.kafka.health;

import io.micronaut.configuration.kafka.config.KafkaDefaultConfiguration;
import io.micronaut.configuration.kafka.reactor.KafkaReactorUtil;
import io.micronaut.context.annotation.Requirements;
import io.micronaut.context.annotation.Requires;
import io.micronaut.core.util.CollectionUtils;
import io.micronaut.health.HealthStatus;
import io.micronaut.management.health.indicator.HealthIndicator;
import io.micronaut.management.health.indicator.HealthResult;
import jakarta.inject.Singleton;
import java.util.Collections;
import java.util.Optional;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.common.config.ConfigResource;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Singleton
@Requirements(value={@Requires(beans={AdminClient.class}), @Requires(property="kafka.health.enabled", value="true", defaultValue="true")})
public class KafkaHealthIndicator
implements HealthIndicator {
    private static final String ID = "kafka";
    private static final String REPLICATION_PROPERTY = "offsets.topic.replication.factor";
    private static final String DEFAULT_REPLICATION_PROPERTY = "default.replication.factor";
    private final AdminClient adminClient;
    private final KafkaDefaultConfiguration defaultConfiguration;

    public KafkaHealthIndicator(AdminClient adminClient, KafkaDefaultConfiguration defaultConfiguration) {
        this.adminClient = adminClient;
        this.defaultConfiguration = defaultConfiguration;
    }

    public static int getClusterReplicationFactor(Config config) {
        ConfigEntry ce = Optional.ofNullable(config.get(REPLICATION_PROPERTY)).orElseGet(() -> config.get(DEFAULT_REPLICATION_PROPERTY));
        return ce != null ? Integer.parseInt(ce.value()) : Integer.MAX_VALUE;
    }

    public Flux<HealthResult> getResult() {
        DescribeClusterResult result = this.adminClient.describeCluster(new DescribeClusterOptions().timeoutMs(Integer.valueOf((int)this.defaultConfiguration.getHealthTimeout().toMillis())));
        Mono clusterId = KafkaReactorUtil.fromKafkaFuture(() -> ((DescribeClusterResult)result).clusterId());
        Mono nodes = KafkaReactorUtil.fromKafkaFuture(() -> ((DescribeClusterResult)result).nodes());
        Mono controller = KafkaReactorUtil.fromKafkaFuture(() -> ((DescribeClusterResult)result).controller());
        return controller.flux().switchMap(node -> {
            String brokerId = node.idString();
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId);
            DescribeConfigsResult configResult = this.adminClient.describeConfigs(Collections.singletonList(configResource));
            Mono configs = KafkaReactorUtil.fromKafkaFuture(() -> ((DescribeConfigsResult)configResult).all());
            return configs.flux().switchMap(resources -> {
                Config config = (Config)resources.get(configResource);
                int replicationFactor = KafkaHealthIndicator.getClusterReplicationFactor(config);
                return nodes.flux().switchMap(nodeList -> clusterId.map(clusterIdString -> {
                    int nodeCount = nodeList.size();
                    HealthResult.Builder builder = nodeCount >= replicationFactor ? HealthResult.builder((String)ID, (HealthStatus)HealthStatus.UP) : HealthResult.builder((String)ID, (HealthStatus)HealthStatus.DOWN);
                    return builder.details((Object)CollectionUtils.mapOf((Object[])new Object[]{"brokerId", brokerId, "clusterId", clusterIdString, "nodes", nodeCount})).build();
                }));
            });
        }).onErrorResume(throwable -> Mono.just((Object)HealthResult.builder((String)ID, (HealthStatus)HealthStatus.DOWN).exception(throwable).build()));
    }
}

