/*
 * Decompiled with CFR 0.152.
 */
package io.lettuce.core.cluster;

import io.lettuce.core.RedisURI;
import io.lettuce.core.cluster.AbstractNodeSelection;
import io.lettuce.core.cluster.AsyncClusterConnectionProvider;
import io.lettuce.core.cluster.ClusterConnectionProvider;
import io.lettuce.core.cluster.ClusterDistributionChannelWriter;
import io.lettuce.core.cluster.NodeSelectionInvocationHandler;
import io.lettuce.core.cluster.StatefulRedisClusterPubSubConnectionImpl;
import io.lettuce.core.cluster.api.NodeSelectionSupport;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.cluster.pubsub.api.reactive.NodeSelectionPubSubReactiveCommands;
import io.lettuce.core.cluster.pubsub.api.reactive.PubSubReactiveNodeSelection;
import io.lettuce.core.cluster.pubsub.api.reactive.RedisClusterPubSubReactiveCommands;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.pubsub.RedisPubSubReactiveCommandsImpl;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class RedisClusterPubSubReactiveCommandsImpl<K, V>
extends RedisPubSubReactiveCommandsImpl<K, V>
implements RedisClusterPubSubReactiveCommands<K, V> {
    public RedisClusterPubSubReactiveCommandsImpl(StatefulRedisClusterPubSubConnection<K, V> connection, RedisCodec<K, V> codec) {
        super(connection, codec);
    }

    @Override
    public StatefulRedisClusterPubSubConnection<K, V> getStatefulConnection() {
        return (StatefulRedisClusterPubSubConnection)super.getStatefulConnection();
    }

    @Override
    public PubSubReactiveNodeSelection<K, V> nodes(Predicate<RedisClusterNode> predicate) {
        StaticPubSubReactiveNodeSelection selection = new StaticPubSubReactiveNodeSelection(this.getStatefulConnection(), predicate);
        NodeSelectionInvocationHandler h = new NodeSelectionInvocationHandler(selection, RedisPubSubReactiveCommands.class, NodeSelectionInvocationHandler.ExecutionModel.REACTIVE);
        return (PubSubReactiveNodeSelection)Proxy.newProxyInstance(NodeSelectionSupport.class.getClassLoader(), new Class[]{NodeSelectionPubSubReactiveCommands.class, PubSubReactiveNodeSelection.class}, (InvocationHandler)h);
    }

    private static class StaticPubSubReactiveNodeSelection<K, V>
    extends AbstractNodeSelection<RedisPubSubReactiveCommands<K, V>, NodeSelectionPubSubReactiveCommands<K, V>, K, V>
    implements PubSubReactiveNodeSelection<K, V> {
        private final List<RedisClusterNode> redisClusterNodes;
        private final ClusterDistributionChannelWriter writer;

        public StaticPubSubReactiveNodeSelection(StatefulRedisClusterPubSubConnection<K, V> globalConnection, Predicate<RedisClusterNode> selector) {
            this.redisClusterNodes = globalConnection.getPartitions().getPartitions().stream().filter(selector).collect(Collectors.toList());
            this.writer = ((StatefulRedisClusterPubSubConnectionImpl)globalConnection).getClusterDistributionChannelWriter();
        }

        @Override
        protected CompletableFuture<RedisPubSubReactiveCommands<K, V>> getApi(RedisClusterNode redisClusterNode) {
            return this.getConnection(redisClusterNode).thenApply(StatefulRedisPubSubConnection::reactive);
        }

        @Override
        protected List<RedisClusterNode> nodes() {
            return this.redisClusterNodes;
        }

        @Override
        protected CompletableFuture<StatefulRedisPubSubConnection<K, V>> getConnection(RedisClusterNode redisClusterNode) {
            RedisURI uri = redisClusterNode.getUri();
            AsyncClusterConnectionProvider async = (AsyncClusterConnectionProvider)((Object)this.writer.getClusterConnectionProvider());
            return async.getConnectionAsync(ClusterConnectionProvider.Intent.WRITE, uri.getHost(), uri.getPort()).thenApply(it -> (StatefulRedisPubSubConnection)it);
        }
    }
}

