/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.configuration.kafka.health;

import io.micronaut.configuration.kafka.config.KafkaDefaultConfiguration;
import io.micronaut.configuration.kafka.config.KafkaHealthConfiguration;
import io.micronaut.configuration.kafka.config.KafkaHealthConfigurationProperties;
import io.micronaut.configuration.kafka.health.DefaultNetworkClientCreator;
import io.micronaut.configuration.kafka.health.NetworkClientCreator;
import io.micronaut.configuration.kafka.reactor.KafkaReactorUtil;
import io.micronaut.context.BeanContext;
import io.micronaut.context.annotation.Requirements;
import io.micronaut.context.annotation.Requires;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.util.SupplierUtil;
import io.micronaut.health.HealthStatus;
import io.micronaut.management.health.indicator.HealthIndicator;
import io.micronaut.management.health.indicator.HealthResult;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NetworkClientUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.ClusterResourceListener;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.utils.Time;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Singleton
@Requirements(value={@Requires(bean=KafkaDefaultConfiguration.class), @Requires(property="kafka.health.enabled", value="true", defaultValue="true")})
public class KafkaHealthIndicator
implements HealthIndicator,
ClusterResourceListener {
    private static final String ID = "kafka";
    private static final String MIN_INSYNC_REPLICAS_PROPERTY = "min.insync.replicas";
    private static final String REPLICATION_PROPERTY = "offsets.topic.replication.factor";
    private static final String DEFAULT_REPLICATION_PROPERTY = "default.replication.factor";
    private static final String DETAILS_BROKER_ID = "brokerId";
    private static final String DETAILS_CLUSTER_ID = "clusterId";
    private static final String DETAILS_NODES = "nodes";
    private final Supplier<AdminClient> adminClientSupplier;
    private final KafkaDefaultConfiguration defaultConfiguration;
    private final Supplier<NetworkClient> networkClientSupplier;
    private final KafkaHealthConfiguration kafkaHealthConfiguration;
    private String clusterId;

    @Inject
    public KafkaHealthIndicator(BeanContext beanContext, KafkaDefaultConfiguration defaultConfiguration, NetworkClientCreator networkClientCreator, KafkaHealthConfiguration kafkaHealthConfiguration) {
        this.adminClientSupplier = SupplierUtil.memoized(() -> (AdminClient)beanContext.getBean(AdminClient.class));
        this.defaultConfiguration = defaultConfiguration;
        this.networkClientSupplier = SupplierUtil.memoized(() -> networkClientCreator.create(this));
        this.kafkaHealthConfiguration = kafkaHealthConfiguration;
    }

    @Deprecated(forRemoval=true)
    public KafkaHealthIndicator(AdminClient adminClient, KafkaDefaultConfiguration defaultConfiguration) {
        this.adminClientSupplier = () -> adminClient;
        this.defaultConfiguration = defaultConfiguration;
        this.networkClientSupplier = SupplierUtil.memoized(() -> new DefaultNetworkClientCreator(defaultConfiguration).create(this));
        this.kafkaHealthConfiguration = new KafkaHealthConfigurationProperties();
    }

    public void onUpdate(ClusterResource clusterResource) {
        this.clusterId = Optional.ofNullable(clusterResource).map(ClusterResource::clusterId).orElse(null);
    }

    public static int getClusterReplicationFactor(Config config) {
        ConfigEntry ce = Optional.ofNullable(config.get(REPLICATION_PROPERTY)).orElseGet(() -> config.get(DEFAULT_REPLICATION_PROPERTY));
        return ce != null ? Integer.parseInt(ce.value()) : Integer.MAX_VALUE;
    }

    public static int getMinNodeCount(Config config) {
        return Optional.ofNullable(config.get(MIN_INSYNC_REPLICAS_PROPERTY)).map(ConfigEntry::value).map(Integer::parseInt).orElseGet(() -> KafkaHealthIndicator.getClusterReplicationFactor(config));
    }

    public Flux<HealthResult> getResult() {
        if (this.kafkaHealthConfiguration.isRestricted()) {
            try {
                NetworkClient client = this.networkClientSupplier.get();
                return Flux.just((Object)this.hasReadyNodes(client).orElseGet(() -> this.waitForLeastLoadedNode(client)));
            }
            catch (Exception e) {
                return Flux.just((Object)KafkaHealthIndicator.failure(e, Collections.emptyMap()));
            }
        }
        AdminClient adminClient = this.adminClientSupplier.get();
        DescribeClusterResult result = adminClient.describeCluster(new DescribeClusterOptions().timeoutMs(Integer.valueOf((int)this.defaultConfiguration.getHealthTimeout().toMillis())));
        Mono clusterId = KafkaReactorUtil.fromKafkaFuture(() -> ((DescribeClusterResult)result).clusterId());
        Mono nodes = KafkaReactorUtil.fromKafkaFuture(() -> ((DescribeClusterResult)result).nodes());
        Mono controller = KafkaReactorUtil.fromKafkaFuture(() -> ((DescribeClusterResult)result).controller());
        return controller.flux().switchMap(node -> {
            String brokerId = node.idString();
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId);
            DescribeConfigsResult configResult = adminClient.describeConfigs(Collections.singletonList(configResource));
            Mono configs = KafkaReactorUtil.fromKafkaFuture(() -> ((DescribeConfigsResult)configResult).all());
            return configs.flux().switchMap(resources -> {
                Config config = (Config)resources.get(configResource);
                int minNodeCount = KafkaHealthIndicator.getMinNodeCount(config);
                return nodes.flux().switchMap(nodeList -> clusterId.map(clusterIdString -> {
                    int nodeCount = nodeList.size();
                    return KafkaHealthIndicator.getHealthResult(nodeCount >= minNodeCount, clusterIdString, nodeCount, brokerId);
                }));
            });
        }).onErrorResume(throwable -> Mono.just((Object)HealthResult.builder((String)ID, (HealthStatus)HealthStatus.DOWN).exception(throwable).build()));
    }

    private static HealthResult getHealthResult(boolean up, @Nullable String clusterId, @Nullable Integer nodeCount, @Nullable String brokerId) {
        HashMap<String, Object> details = new HashMap<String, Object>();
        if (clusterId != null) {
            details.put(DETAILS_CLUSTER_ID, clusterId);
        }
        if (brokerId != null) {
            details.put(DETAILS_BROKER_ID, brokerId);
        }
        if (nodeCount != null) {
            details.put(DETAILS_NODES, nodeCount);
        }
        return KafkaHealthIndicator.result(up, details).build();
    }

    private static HealthResult getHealthResult(boolean up, @Nullable String clusterId) {
        return KafkaHealthIndicator.getHealthResult(up, clusterId, null, null);
    }

    private static HealthResult.Builder result(boolean up, Map<String, Object> details) {
        return HealthResult.builder((String)ID, (HealthStatus)(up ? HealthStatus.UP : HealthStatus.DOWN)).details(details);
    }

    private static HealthResult success(Map<String, Object> details) {
        return KafkaHealthIndicator.result(true, details).build();
    }

    private static HealthResult failure(Throwable error, Map<String, Object> details) {
        return KafkaHealthIndicator.result(false, details).exception(error).build();
    }

    private Optional<HealthResult> hasReadyNodes(NetworkClient networkClient) {
        return networkClient.hasReadyNodes(Time.SYSTEM.milliseconds()) ? Optional.of(KafkaHealthIndicator.getHealthResult(true, this.clusterId)) : Optional.empty();
    }

    private HealthResult waitForLeastLoadedNode(NetworkClient networkClient) {
        long requestTimeoutMs = this.defaultConfiguration.getHealthTimeout().toMillis();
        Node node = networkClient.leastLoadedNode(Time.SYSTEM.milliseconds());
        try {
            return KafkaHealthIndicator.result(NetworkClientUtils.awaitReady((KafkaClient)networkClient, (Node)node, (Time)Time.SYSTEM, (long)requestTimeoutMs), null).build();
        }
        catch (IOException e) {
            return KafkaHealthIndicator.failure(e, Collections.emptyMap());
        }
    }
}

