/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.cluster.redis;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.core.cluster.ClusterTopic;
import org.jetlinks.core.cluster.HaManager;
import org.jetlinks.core.cluster.ServerNode;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.ReactiveHashOperations;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;

/**
 * {@link HaManager} implementation that keeps cluster membership in a Redis hash
 * and uses two {@link ClusterTopic}s (keep-alive and offline) to propagate changes.
 *
 * <p>Every node periodically writes itself into the hash {@code __ha_all_node:<name>}
 * and publishes itself on the keep-alive topic. A node that has not been heard from
 * for longer than {@link #OFFLINE_TIMEOUT_MILLIS} is removed from the hash and announced
 * on the offline topic. The leader is the known node with the smallest uptime value.
 */
public class RedisHaManager
implements HaManager {
    private static final Logger log = LoggerFactory.getLogger(RedisHaManager.class);

    /** Period between two consecutive {@link #checkAlive()} rounds. */
    private static final Duration KEEPALIVE_INTERVAL = Duration.ofSeconds(5L);

    /** A node without a keep-alive for longer than this is considered offline. */
    private static final long OFFLINE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(30L);

    /**
     * Leader ordering: smallest uptime value first; the node id breaks ties so that the
     * result does not depend on map iteration order.
     */
    private static final Comparator<ServerNode> LEADER_ORDER =
            Comparator.comparing(ServerNode::getUptime).thenComparing(ServerNode::getId);

    private final ServerNode current;
    private final String haName;
    private final ClusterTopic<ServerNode> offlineTopic;
    private ClusterManager clusterManager;
    private final Map<String, ServerNode> allNode = new ConcurrentHashMap<>();
    private final ReactiveRedisOperations<String, ServerNode> operations;
    private final ClusterTopic<ServerNode> keepalive;
    private final ReactiveHashOperations<String, String, ServerNode> inRedisNode;
    private final String allNodeHashKey;
    private final FluxProcessor<ServerNode, ServerNode> onlineProcessor = EmitterProcessor.create(false);
    private final FluxProcessor<ServerNode, ServerNode> offlineProcessor = EmitterProcessor.create(false);
    private volatile boolean started = false;

    /** Handle for the keep-alive timer and topic subscriptions created by {@link #startup()}. */
    private volatile Disposable background;

    /**
     * Creates a manager for the HA group {@code name}.
     *
     * @param name           HA group name; used as suffix of the Redis hash key and topic names
     * @param current        description of the local node; a copy is taken, its uptime is set
     *                       to the current time and its leader flag is cleared
     * @param clusterManager source of the keep-alive and offline topics
     * @param operations     Redis operations used to access the shared node hash
     */
    public RedisHaManager(String name, ServerNode current, ClusterManager clusterManager, ReactiveRedisOperations<String, ServerNode> operations) {
        this.haName = name;
        this.current = current.copy();
        this.current.setUptime(System.currentTimeMillis());
        this.current.setLeader(false);
        this.clusterManager = clusterManager;
        this.operations = operations;
        this.inRedisNode = this.operations.opsForHash();
        this.offlineTopic = clusterManager.getTopic("__ha_offline_topic:".concat(this.haName));
        this.keepalive = clusterManager.getTopic("__ha_keepalive:".concat(this.haName));
        this.allNodeHashKey = "__ha_all_node:".concat(this.haName);
    }

    /**
     * Runs one keep-alive round: refreshes the local node in Redis, publishes a keep-alive
     * message, and announces as offline every other known node whose last keep-alive is
     * older than {@link #OFFLINE_TIMEOUT_MILLIS} and that is still present in the Redis hash.
     *
     * <p>All Redis/topic operations are fire-and-forget; failures are logged.
     */
    public void checkAlive() {
        long now = System.currentTimeMillis();
        this.current.setLastKeepAlive(now);

        this.inRedisNode.put(this.allNodeHashKey, this.current.getId(), this.current)
                .subscribe(
                        ignored -> { },
                        error -> log.warn("[{}]:failed to refresh current node in redis", this.haName, error));
        this.keepalive.publish(Mono.just(this.current))
                .subscribe(
                        ignored -> { },
                        error -> log.warn("[{}]:failed to publish keepalive", this.haName, error));

        Map<String, ServerNode> maybeOffline = this.getAllNode().stream()
                .filter(node -> now - node.getLastKeepAlive() > OFFLINE_TIMEOUT_MILLIS)
                .filter(node -> !node.isSame(this.current))
                .collect(Collectors.toMap(ServerNode::getId, Function.identity()));
        if (maybeOffline.isEmpty()) {
            // Nothing looks stale, so there is no need to query Redis.
            return;
        }

        // Only announce nodes that are still registered in Redis; remove them there first.
        this.inRedisNode.keys(this.allNodeHashKey)
                .filter(maybeOffline::containsKey)
                .map(maybeOffline::get)
                .collectList()
                .filter(list -> !list.isEmpty())
                .flatMapMany(list -> this.inRedisNode
                        .remove(this.allNodeHashKey, list.stream().map(ServerNode::getId).toArray())
                        .thenMany(Flux.fromIterable(list)))
                .as(this.offlineTopic::publish)
                .subscribe(
                        ignored -> { },
                        error -> log.warn("[{}]:failed to announce offline nodes", this.haName, error));
    }

    /**
     * Clears the leader flag on every known node and sets it on the first node according to
     * {@link #LEADER_ORDER}. Does nothing beyond clearing when no node is known.
     */
    private void electionLeader() {
        ServerNode leader = null;
        for (ServerNode node : this.allNode.values()) {
            node.setLeader(false);
            if (leader == null || LEADER_ORDER.compare(node, leader) < 0) {
                leader = node;
            }
        }
        if (leader != null) {
            leader.setLeader(true);
        }
    }

    /**
     * Stops the keep-alive timer and topic subscriptions started by {@link #startup()},
     * then removes the local node from Redis and announces it on the offline topic.
     * Blocks until the Redis removal and the offline announcement have completed.
     */
    public void shutdown() {
        // Stop background work first; otherwise the timer would re-register this node
        // in Redis right after it has been removed.
        Disposable running = this.background;
        if (running != null) {
            this.background = null;
            running.dispose();
        }
        this.inRedisNode.remove(this.allNodeHashKey, this.current.getId())
                .then(this.offlineTopic.publish(Mono.just(this.current)))
                .block();
    }

    /**
     * Registers the local node in Redis, loads the nodes already registered there, elects a
     * leader, and starts the periodic keep-alive plus the offline/keep-alive topic listeners.
     *
     * <p>Blocks until the initial Redis round-trip has finished. Calling it again after a
     * successful start is a no-op; if the initial registration fails, the exception
     * propagates and the method may be called again.
     */
    public synchronized void startup() {
        if (this.started) {
            return;
        }
        this.allNode.put(this.current.getId(), this.current);

        List<ServerNode> registered = this.inRedisNode
                .put(this.allNodeHashKey, this.current.getId(), this.current)
                .flatMapMany(ok -> this.inRedisNode.values(this.allNodeHashKey))
                .collectList()
                .block();
        // Only mark as started once the registration round-trip has succeeded.
        this.started = true;

        if (registered != null) {
            long now = System.currentTimeMillis();
            for (ServerNode node : registered) {
                if (this.current.getId().equals(node.getId())) {
                    // Keep the local instance in the map: replacing it with the copy read
                    // back from Redis would make the election mark the copy instead of
                    // the object returned by currentServer().
                    continue;
                }
                node.setLastKeepAlive(now);
                this.allNode.put(node.getId(), node);
            }
        }
        this.electionLeader();

        Disposable timer = Flux.interval(Duration.ZERO, KEEPALIVE_INTERVAL)
                .subscribe(tick -> this.checkAliveSafely());
        Disposable offlineSubscription = this.offlineTopic.subscribe().subscribe(this::handleOffline);
        Disposable keepaliveSubscription = this.keepalive.subscribe().subscribe(this::handleKeepalive);
        this.background = () -> {
            timer.dispose();
            offlineSubscription.dispose();
            keepaliveSubscription.dispose();
        };
    }

    /** Runs {@link #checkAlive()} without letting an exception terminate the periodic timer. */
    private void checkAliveSafely() {
        try {
            this.checkAlive();
        } catch (RuntimeException e) {
            // Timer boundary: one failed round must not stop all future keep-alives.
            log.warn("[{}]:keepalive check failed", this.haName, e);
        }
    }

    /** Handles an offline announcement: forgets the node, re-elects and notifies listeners. */
    private void handleOffline(ServerNode serverNode) {
        if (this.currentServer().isSame(serverNode)) {
            return;
        }
        if (this.allNode.remove(serverNode.getId()) == null) {
            // Unknown or already removed node: nothing to do.
            return;
        }
        log.debug("[{}]:server node [{}] offline", this.haName, serverNode.getId());
        this.inRedisNode.remove(this.allNodeHashKey, serverNode.getId())
                .subscribe(
                        ignored -> { },
                        error -> log.warn("[{}]:failed to remove offline node from redis", this.haName, error));
        this.electionLeader();
        if (this.offlineProcessor.hasDownstreams()) {
            this.offlineProcessor.onNext(serverNode);
        }
    }

    /**
     * Handles a keep-alive message: refreshes the timestamp of a known node, or registers a
     * new node, re-elects and notifies online listeners.
     */
    private void handleKeepalive(ServerNode serverNode) {
        if (this.currentServer().isSame(serverNode)) {
            return;
        }
        long now = System.currentTimeMillis();
        serverNode.setLastKeepAlive(now);
        // Single atomic step: either the node is new and gets registered, or the existing
        // entry is returned and only its timestamp is refreshed.
        ServerNode known = this.allNode.putIfAbsent(serverNode.getId(), serverNode);
        if (known != null) {
            known.setLastKeepAlive(now);
            return;
        }
        this.electionLeader();
        log.debug("[{}]:server node [{}] online", this.haName, serverNode.getId());
        if (this.onlineProcessor.hasDownstreams()) {
            this.onlineProcessor.onNext(serverNode);
        }
    }

    /**
     * @return the local node instance managed by this object
     */
    public ServerNode currentServer() {
        return this.current;
    }

    /**
     * @return stream of nodes that came online, excluding the local node
     */
    public Flux<ServerNode> subscribeServerOnline() {
        return this.onlineProcessor.filter(node -> !node.getId().equals(this.current.getId()));
    }

    /**
     * @return stream of nodes that went offline, excluding the local node
     */
    public Flux<ServerNode> subscribeServerOffline() {
        return this.offlineProcessor.filter(node -> !node.getId().equals(this.current.getId()));
    }

    /**
     * @return a snapshot copy of all currently known nodes
     */
    public List<ServerNode> getAllNode() {
        return new ArrayList<>(this.allNode.values());
    }

    /**
     * Not implemented: the callback is never invoked.
     *
     * @param runnable ignored
     * @return a {@link Disposable} whose {@code dispose()} does nothing
     */
    public Disposable doOnReBalance(Consumer<List<ServerNode>> runnable) {
        return () -> {};
    }
}

