/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.configuration.kafka.streams.health;

import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder;
import io.micronaut.configuration.kafka.streams.KafkaStreamsFactory;
import io.micronaut.context.annotation.Requirements;
import io.micronaut.context.annotation.Requires;
import io.micronaut.core.annotation.Internal;
import io.micronaut.health.HealthStatus;
import io.micronaut.management.health.aggregator.HealthAggregator;
import io.micronaut.management.health.indicator.HealthIndicator;
import io.micronaut.management.health.indicator.HealthResult;
import jakarta.inject.Singleton;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.TaskMetadata;
import org.apache.kafka.streams.ThreadMetadata;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;

@Singleton
@Requirements(value={@Requires(classes={HealthIndicator.class}), @Requires(property="kafka.health.streams.enabled", value="true", defaultValue="true")})
public class KafkaStreamsHealth
implements HealthIndicator {
    public static final String ENABLED_PROPERTY = "kafka.health.streams.enabled";
    private static final String NAME = "kafkaStreams";
    private static final String METADATA_PARTITIONS = "partitions";
    private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsHealth.class);
    private final KafkaStreamsFactory kafkaStreamsFactory;
    private final HealthAggregator<?> healthAggregator;

    public KafkaStreamsHealth(KafkaStreamsFactory kafkaStreamsFactory, HealthAggregator<?> healthAggregator) {
        this.kafkaStreamsFactory = kafkaStreamsFactory;
        this.healthAggregator = healthAggregator;
    }

    public Publisher<HealthResult> getResult() {
        Flux kafkaStreamHealth = Flux.fromIterable(this.kafkaStreamsFactory.getStreams().keySet()).map(kafkaStreams -> Pair.of(this.getApplicationId((KafkaStreams)kafkaStreams), kafkaStreams)).flatMap(pair -> Flux.just((Object)pair).filter(p -> ((KafkaStreams)p.getValue()).state().isRunningOrRebalancing()).map(p -> HealthResult.builder((String)((String)p.getKey()), (HealthStatus)HealthStatus.UP).details(this.buildDetails((KafkaStreams)p.getValue()))).switchIfEmpty((Publisher)Flux.create(emitter -> {
            String key = (String)pair.getKey();
            KafkaStreams value = (KafkaStreams)pair.getValue();
            if (value.state().isRunningOrRebalancing()) {
                HealthResult.Builder upResult = HealthResult.builder((String)key, (HealthStatus)HealthStatus.UP).details(this.buildDetails(value));
                emitter.next((Object)upResult);
            } else {
                HealthResult.Builder downResult = HealthResult.builder((String)key, (HealthStatus)HealthStatus.DOWN).details(this.buildDownDetails(value.state(), key));
                emitter.next((Object)downResult);
            }
            emitter.complete();
        })).onErrorResume(e -> Flux.just((Object)HealthResult.builder((String)((String)pair.getKey()), (HealthStatus)HealthStatus.DOWN).details(this.buildDownDetails(e.getMessage(), ((KafkaStreams)pair.getValue()).state(), (String)pair.getKey(), (Throwable)e))))).map(HealthResult.Builder::build);
        return this.healthAggregator.aggregate(NAME, (Publisher)kafkaStreamHealth);
    }

    private Map<String, String> buildDownDetails(KafkaStreams.State state, String streamId) {
        return this.buildDownDetails("Processor appears to be down", state, streamId, null);
    }

    private Map<String, String> buildDownDetails(String message, KafkaStreams.State state, String streamId, Throwable e) {
        if (e != null) {
            LOG.debug("Reporting Kafka health DOWN. Kafka stream [{}] in state [{}] is DOWN. Reason: {}", new Object[]{streamId, state, message});
            LOG.debug(e.getMessage(), e);
        } else {
            LOG.debug("Reporting Kafka health DOWN. Kafka stream [{}] in state [{}] is DOWN. Reason: {}", new Object[]{streamId, state, message});
        }
        HashMap<String, String> details = new HashMap<String, String>();
        details.put("threadState", state.name());
        details.put("error", message);
        return details;
    }

    private Map<String, Object> buildDetails(KafkaStreams kafkaStreams) {
        HashMap<String, Object> streamDetails = new HashMap<String, Object>();
        if (kafkaStreams.state().isRunningOrRebalancing()) {
            for (ThreadMetadata metadata : kafkaStreams.metadataForLocalThreads()) {
                HashMap<String, Object> threadDetails = new HashMap<String, Object>();
                threadDetails.put("threadName", metadata.threadName());
                threadDetails.put("threadState", metadata.threadState());
                threadDetails.put("adminClientId", metadata.adminClientId());
                threadDetails.put("consumerClientId", metadata.consumerClientId());
                threadDetails.put("restoreConsumerClientId", metadata.restoreConsumerClientId());
                threadDetails.put("producerClientIds", metadata.producerClientIds());
                threadDetails.put("activeTasks", KafkaStreamsHealth.taskDetails(metadata.activeTasks()));
                threadDetails.put("standbyTasks", KafkaStreamsHealth.taskDetails(metadata.standbyTasks()));
                streamDetails.put(metadata.threadName(), threadDetails);
            }
        } else {
            streamDetails.put("error", "The processor is down");
        }
        return streamDetails;
    }

    private String getApplicationId(KafkaStreams kafkaStreams) {
        try {
            ConfiguredStreamBuilder configuredStreamBuilder = this.kafkaStreamsFactory.getStreams().get(kafkaStreams);
            if (configuredStreamBuilder != null) {
                Properties configuration = configuredStreamBuilder.getConfiguration();
                return (String)configuration.getOrDefault((Object)"application.id", configuration.getProperty("client.id"));
            }
            return KafkaStreamsHealth.getDefaultStreamName(kafkaStreams);
        }
        catch (Exception e) {
            return KafkaStreamsHealth.getDefaultStreamName(kafkaStreams);
        }
    }

    private static String getDefaultStreamName(KafkaStreams kafkaStreams) {
        return Optional.ofNullable(kafkaStreams).filter(kafkaStreams1 -> kafkaStreams1.state().isRunningOrRebalancing()).map(KafkaStreams::metadataForLocalThreads).map(Collection::stream).flatMap(Stream::findFirst).map(ThreadMetadata::threadName).orElse("unidentified");
    }

    private static Map<String, Object> taskDetails(Set<TaskMetadata> taskMetadataSet) {
        HashMap<String, Object> details = new HashMap<String, Object>();
        for (TaskMetadata taskMetadata : taskMetadataSet) {
            details.put("taskId", taskMetadata.taskId());
            if (details.containsKey(METADATA_PARTITIONS)) {
                List partitionsInfo = (List)details.get(METADATA_PARTITIONS);
                partitionsInfo.addAll(KafkaStreamsHealth.addPartitionsInfo(taskMetadata));
                continue;
            }
            details.put(METADATA_PARTITIONS, KafkaStreamsHealth.addPartitionsInfo(taskMetadata));
        }
        return details;
    }

    private static List<String> addPartitionsInfo(TaskMetadata metadata) {
        return metadata.topicPartitions().stream().map(p -> "partition=" + p.partition() + ", topic=" + p.topic()).collect(Collectors.toList());
    }

    @Internal
    private static class Pair<K, V> {
        private final K key;
        private final V value;

        public Pair(K key, V value) {
            this.key = key;
            this.value = value;
        }

        public static <K, V> Pair<K, V> of(K key, V value) {
            return new Pair<K, V>(key, value);
        }

        public K getKey() {
            return this.key;
        }

        public V getValue() {
            return this.value;
        }
    }
}

