/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.cluster.redis;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.jetlinks.core.cluster.ClusterCache;
import org.jetlinks.core.cluster.ClusterCounter;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.core.cluster.ClusterNotifier;
import org.jetlinks.core.cluster.ClusterQueue;
import org.jetlinks.core.cluster.ClusterSet;
import org.jetlinks.core.cluster.ClusterTopic;
import org.jetlinks.core.cluster.HaManager;
import org.jetlinks.core.cluster.ServerNode;
import org.jetlinks.supports.cluster.redis.RedisClusterCache;
import org.jetlinks.supports.cluster.redis.RedisClusterCounter;
import org.jetlinks.supports.cluster.redis.RedisClusterNotifier;
import org.jetlinks.supports.cluster.redis.RedisClusterQueue;
import org.jetlinks.supports.cluster.redis.RedisClusterSet;
import org.jetlinks.supports.cluster.redis.RedisClusterTopic;
import org.jetlinks.supports.cluster.redis.RedisHaManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.RedisSerializer;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

public class RedisClusterManager
implements ClusterManager {
    private static final Logger log = LoggerFactory.getLogger(RedisClusterManager.class);
    private String clusterName;
    private String serverId;
    private Map<String, RedisClusterQueue> queues = new ConcurrentHashMap<String, RedisClusterQueue>();
    private Map<String, ClusterTopic> topics = new ConcurrentHashMap<String, ClusterTopic>();
    private Map<String, ClusterCache> caches = new ConcurrentHashMap<String, ClusterCache>();
    private Map<String, ClusterSet> sets = new ConcurrentHashMap<String, ClusterSet>();
    private ReactiveRedisTemplate<?, ?> commonOperations;
    private RedisHaManager haManager;
    private RedisClusterNotifier notifier;
    private ReactiveRedisOperations<String, String> stringOperations;
    private ReactiveRedisTemplate<String, ?> queueRedisTemplate;

    public RedisClusterManager(String name, ServerNode serverNode, ReactiveRedisTemplate<?, ?> operations) {
        this.clusterName = name;
        this.commonOperations = operations;
        this.notifier = new RedisClusterNotifier(name, serverNode.getId(), this);
        this.serverId = serverNode.getId();
        this.haManager = new RedisHaManager(name, serverNode, this, (ReactiveRedisOperations<String, ServerNode>)operations);
        this.stringOperations = new ReactiveRedisTemplate(operations.getConnectionFactory(), RedisSerializationContext.string());
        this.queueRedisTemplate = new ReactiveRedisTemplate(operations.getConnectionFactory(), RedisSerializationContext.newSerializationContext().key(RedisSerializer.string()).value(operations.getSerializationContext().getValueSerializationPair()).hashKey(RedisSerializer.string()).hashValue(operations.getSerializationContext().getHashValueSerializationPair()).build());
    }

    public RedisClusterManager(String name, String serverId, ReactiveRedisTemplate<?, ?> operations) {
        this(name, ServerNode.builder().id(serverId).build(), operations);
    }

    public String getCurrentServerId() {
        return this.serverId;
    }

    public void startup() {
        this.notifier.startup();
        this.haManager.startup();
        Flux.interval((Duration)Duration.ofSeconds(5L)).flatMap(i -> Flux.fromIterable(this.queues.values())).subscribe(RedisClusterQueue::tryPoll);
        this.queueRedisTemplate.listenToPattern(new String[]{"queue:data:produced"}).doOnError(err -> log.error(err.getMessage(), err)).subscribe(sub -> {
            RedisClusterQueue queue = this.queues.get(sub.getMessage());
            if (queue != null) {
                queue.tryPoll();
            }
        });
    }

    public void shutdown() {
        this.haManager.shutdown();
    }

    public HaManager getHaManager() {
        return this.haManager;
    }

    protected <K, V> ReactiveRedisTemplate<K, V> getRedis() {
        return this.commonOperations;
    }

    public String getClusterName() {
        return this.clusterName;
    }

    public ClusterNotifier getNotifier() {
        return this.notifier;
    }

    public <T> ClusterQueue<T> getQueue(String queueId) {
        return this.queues.computeIfAbsent(queueId, id -> new RedisClusterQueue((String)id, this.queueRedisTemplate));
    }

    public <T> ClusterTopic<T> getTopic(String topic) {
        return this.topics.computeIfAbsent(topic, id -> new RedisClusterTopic((String)id, this.getRedis()));
    }

    public <K, V> ClusterCache<K, V> getCache(String cache) {
        return this.caches.computeIfAbsent(cache, id -> new RedisClusterCache((String)id, (ReactiveRedisOperations<Object, Object>)this.getRedis()));
    }

    public <V> ClusterSet<V> getSet(String name) {
        return this.sets.computeIfAbsent(name, id -> new RedisClusterSet((String)id, (ReactiveRedisOperations<Object, Object>)this.getRedis()));
    }

    public ClusterCounter getCounter(String name) {
        return new RedisClusterCounter(this.stringOperations, this.clusterName + ":counter:" + name);
    }
}

