/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.configuration.lettuce.health;

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.reactive.BaseRedisReactiveCommands;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.micronaut.context.BeanContext;
import io.micronaut.context.annotation.Requires;
import io.micronaut.health.HealthStatus;
import io.micronaut.management.health.aggregator.HealthAggregator;
import io.micronaut.management.health.indicator.HealthIndicator;
import io.micronaut.management.health.indicator.HealthResult;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.function.Function;
import javax.inject.Singleton;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Singleton
@Requires(classes={HealthIndicator.class})
public class RedisHealthIndicator
implements HealthIndicator {
    public static final Logger LOG = LoggerFactory.getLogger(RedisHealthIndicator.class);
    public static final String NAME = "redis";
    private static final int TIMEOUT_SECONDS = 3;
    private static final int RETRY = 3;
    private final BeanContext beanContext;
    private final HealthAggregator<?> healthAggregator;
    private final RedisClient[] redisClients;
    private final RedisClusterClient[] redisClusterClients;

    public RedisHealthIndicator(BeanContext beanContext, HealthAggregator<?> healthAggregator, RedisClient[] redisClients, RedisClusterClient[] redisClusterClients) {
        this.beanContext = beanContext;
        this.healthAggregator = healthAggregator;
        this.redisClients = redisClients;
        this.redisClusterClients = redisClusterClients;
    }

    public Publisher<HealthResult> getResult() {
        Flux<HealthResult> clientResults = this.getResult(RedisClient.class, RedisClient::connect, StatefulRedisConnection::reactive);
        Flux<HealthResult> clusteredClientResults = this.getResult(RedisClusterClient.class, RedisClusterClient::connect, StatefulRedisClusterConnection::reactive);
        return this.healthAggregator.aggregate(NAME, (Publisher)Flux.concat((Publisher[])new Publisher[]{clientResults, clusteredClientResults}));
    }

    private <T, R extends StatefulConnection<K, V>, K, V> Flux<HealthResult> getResult(Class<T> type, Function<T, R> getConnection, Function<R, BaseRedisReactiveCommands<K, V>> getReactive) {
        Collection registrations = this.beanContext.getActiveBeanRegistrations(type);
        Flux redisClients = Flux.fromIterable((Iterable)registrations);
        return redisClients.flatMap(client -> {
            StatefulConnection connection;
            String connectionName = client.getIdentifier().getName();
            String dbName = "redis(" + connectionName + ")";
            try {
                connection = (StatefulConnection)getConnection.apply(client.getBean());
            }
            catch (Exception e) {
                HealthResult result = HealthResult.builder((String)dbName, (HealthStatus)HealthStatus.DOWN).exception((Throwable)e).build();
                return Flux.just((Object)result);
            }
            Mono pingCommand = ((BaseRedisReactiveCommands)getReactive.apply(connection)).ping();
            pingCommand = pingCommand.timeout(Duration.ofSeconds(3L)).retry(3L);
            return pingCommand.map(s -> {
                if (s.equalsIgnoreCase("pong")) {
                    return HealthResult.builder((String)dbName, (HealthStatus)HealthStatus.UP).build();
                }
                return HealthResult.builder((String)dbName, (HealthStatus)HealthStatus.DOWN).details(Collections.singletonMap("message", "Unexpected response: " + s)).build();
            }).onErrorResume(throwable -> Mono.just((Object)HealthResult.builder((String)dbName, (HealthStatus)HealthStatus.DOWN).exception(throwable).build())).doFinally(f -> {
                try {
                    LOG.trace("Closing connection on signal " + f);
                    connection.close();
                }
                catch (Exception e) {
                    LOG.error("Failed to close connection", (Throwable)e);
                }
            });
        });
    }
}

