/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.cluster.redis;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import lombok.Generated;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.core.cluster.ClusterNotifier;
import org.jetlinks.supports.cluster.redis.NotifierMessage;
import org.jetlinks.supports.cluster.redis.NotifierMessageReply;
import org.jetlinks.supports.cluster.redis.NotifyException;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class RedisClusterNotifier
implements ClusterNotifier {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(RedisClusterNotifier.class);
    private String currentServerId;
    private String clusterName;
    private ClusterManager clusterManager;
    private Map<String, EmitterProcessor> replyHandlers = new ConcurrentHashMap<String, EmitterProcessor>();

    public RedisClusterNotifier(String clusterName, String currentServerId, ClusterManager clusterManager) {
        this.currentServerId = currentServerId;
        this.clusterManager = clusterManager;
        this.clusterName = clusterName;
    }

    private String getNotifyTopicKey(String serverName, String address) {
        return this.clusterName.concat("").concat("__notify:").concat(serverName).concat(":").concat(address);
    }

    public void startup() {
        this.clusterManager.getTopic(this.currentServerId.concat(":notify-reply")).subscribe().subscribe(reply -> {
            EmitterProcessor processor = this.replyHandlers.get(reply.getMessageId());
            if (processor != null && !processor.isCancelled()) {
                if (reply.isSuccess()) {
                    if (reply.isComplete()) {
                        processor.onComplete();
                        this.replyHandlers.remove(reply.getMessageId());
                        log.debug("complete notify reply [{}:{}]", (Object)reply.getAddress(), (Object)reply.getMessageId());
                    } else {
                        log.debug("handle notify reply [{}:{}] : {}", new Object[]{reply.getAddress(), reply.getMessageId(), reply.getPayload()});
                        processor.onNext(reply.getPayload());
                    }
                } else {
                    this.replyHandlers.remove(reply.getMessageId());
                    processor.onError((Throwable)new NotifyException(reply.getAddress(), reply.getErrorMessage()));
                }
            } else {
                log.warn("no notify[{}] reply [{}] handler : {}", new Object[]{reply.getAddress(), reply.getMessageId(), reply});
            }
        });
    }

    public Mono<Boolean> sendNotify(String serverNodeId, String address, Publisher<?> payload) {
        return ((Mono)Flux.from(payload).map(data -> NotifierMessage.of(UUID.randomUUID().toString(), this.currentServerId, address, data)).doOnNext(notify -> log.debug("send notify [{}] to [{}] : [{}]", new Object[]{address, serverNodeId, notify})).as(stream -> this.clusterManager.getTopic(this.getNotifyTopicKey(serverNodeId, address)).publish((Publisher)stream))).map(i -> i > 0);
    }

    public <T> Flux<T> sendNotifyAndReceive(String serverNodeId, String address, Publisher<?> payload) {
        String messageId = UUID.randomUUID().toString();
        EmitterProcessor processor = EmitterProcessor.create((boolean)true);
        this.replyHandlers.put(messageId, processor);
        return ((Mono)Flux.from(payload).map(data -> NotifierMessage.of(messageId, this.currentServerId, address, data)).doOnNext(notify -> log.debug("send notify [{}] to [{}] : {}", new Object[]{address, serverNodeId, notify})).as(stream -> this.clusterManager.getTopic(this.getNotifyTopicKey(serverNodeId, address)).publish((Publisher)stream))).flatMap(i -> {
            if (i < 0) {
                return Mono.error((Throwable)new NotifyException(address, "no server handle address notify"));
            }
            return Mono.just((Object)i);
        }).thenMany((Publisher)processor.map(Function.identity())).doOnCancel(() -> log.debug("cancel receive notify [{}] reply [{}]", (Object)address, (Object)messageId)).doFinally(f -> this.replyHandlers.remove(messageId));
    }

    public <T> Flux<T> handleNotify(String address) {
        return this.clusterManager.getTopic(this.getNotifyTopicKey(this.currentServerId, address)).subscribe().map(NotifierMessage::getPayload).map(r -> r);
    }

    public <T, R> Mono<Void> handleNotify(String address, Function<T, Publisher<R>> replyHandler) {
        return this.clusterManager.getTopic(this.getNotifyTopicKey(this.currentServerId, address)).subscribe().flatMap(msg -> {
            String msgId = msg.getMessageId();
            log.debug("handle notify [{}] from [{}]", (Object)address, (Object)msg.getFromServer());
            try {
                return Flux.from((Publisher)((Publisher)replyHandler.apply(msg.getPayload()))).map(res -> NotifierMessageReply.success(address, msgId, res)).doOnError(error -> log.warn("handle notify error", error)).onErrorResume(err -> Mono.just((Object)NotifierMessageReply.fail(address, msgId, err))).switchIfEmpty((Publisher)Mono.just((Object)NotifierMessageReply.success(address, msgId, null))).flatMap(reply -> this.clusterManager.getTopic(msg.getFromServer().concat(":notify-reply")).publish((Publisher)Mono.just((Object)reply))).doOnComplete(() -> this.clusterManager.getTopic(msg.getFromServer().concat(":notify-reply")).publish((Publisher)Mono.just((Object)NotifierMessageReply.complete(address, msgId))).subscribe()).doOnNext(len -> {
                    if (len <= 0) {
                        log.warn("reply notify [{}] to server[{}] fail ", (Object)address, (Object)msg.getFromServer());
                    }
                });
            }
            catch (Exception e) {
                log.warn("handle notify error", (Throwable)e);
                return this.clusterManager.getTopic(msg.getFromServer().concat(":notify-reply")).publish((Publisher)Mono.just((Object)NotifierMessageReply.fail(address, msgId, e))).doOnNext(len -> {
                    if (len <= 0) {
                        log.warn("reply notify [{}] to server[{}] fail ", (Object)address, (Object)msg.getFromServer());
                    }
                });
            }
        }).onErrorContinue((err, val) -> log.error(err.getMessage(), err)).then();
    }
}

