/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.cluster.event;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.jetlinks.core.Payload;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.core.codec.Codec;
import org.jetlinks.core.codec.Codecs;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.event.TopicPayload;
import org.jetlinks.supports.event.EventBroker;
import org.jetlinks.supports.event.EventConnection;
import org.jetlinks.supports.event.EventConsumer;
import org.jetlinks.supports.event.EventProducer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.RedisSerializer;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/**
 * {@link EventBroker} implementation that connects the current server to the other
 * nodes reported by the {@link ClusterManager}, using Redis as the transport.
 *
 * <p>For every remote node a {@link ClusterConnecting} is created (at most one per node id)
 * and published to subscribers of {@link #accept()}.
 */
public class RedisClusterEventBroker implements EventBroker {
    private static final Logger log = LoggerFactory.getLogger(RedisClusterEventBroker.class);

    private final ReactiveRedisOperations<String, byte[]> operations;
    private final String id;
    private final EmitterProcessor<EventConnection> processor = EmitterProcessor.create(false);
    private final Map<String, ClusterConnecting> connections = new ConcurrentHashMap<>();
    private final FluxSink<EventConnection> sink = this.processor.sink(FluxSink.OverflowStrategy.BUFFER);
    private final ClusterManager clusterManager;
    /** Holds broker-level subscriptions (currently the "server online" listener) so shutdown can cancel them. */
    private final Disposable.Composite lifecycle = Disposables.composite();
    private boolean started = false;
    private final Codec<Subscription> subscriptionCodec = Codecs.lookup(Subscription.class);
    private final Codec<TopicPayload> topicPayloadCodec = Codecs.lookup(TopicPayload.class);

    /**
     * Creates the broker and immediately calls {@link #startup()}.
     *
     * @param clusterManager source of the cluster name (used as broker id), node list and queues
     * @param factory        connection factory used to build a String-key / byte[]-value Redis template
     */
    public RedisClusterEventBroker(ClusterManager clusterManager, ReactiveRedisConnectionFactory factory) {
        this.id = clusterManager.getClusterName();
        this.operations = new ReactiveRedisTemplate<>(
                factory,
                RedisSerializationContext.<String, byte[]>newSerializationContext()
                        .key(RedisSerializer.string())
                        .hashKey(RedisSerializer.string())
                        .value(RedisSerializer.byteArray())
                        .hashValue(RedisSerializer.byteArray())
                        .build());
        this.clusterManager = clusterManager;
        this.startup();
    }

    /**
     * Connects to every currently known node except the current server and keeps listening
     * for nodes that come online later. Calls after the first one are no-ops.
     */
    public void startup() {
        if (this.started) {
            return;
        }
        this.started = true;

        this.clusterManager.getHaManager().getAllNode().forEach(node -> {
            if (!node.getId().equals(this.clusterManager.getCurrentServerId())) {
                this.handleRemoteConnection(this.clusterManager.getCurrentServerId(), node.getId());
            }
        });

        // Keep the handle: without it the listener could never be cancelled on shutdown.
        this.lifecycle.add(this.clusterManager.getHaManager()
                .subscribeServerOnline()
                .subscribe(node -> this.handleRemoteConnection(this.clusterManager.getCurrentServerId(), node.getId())));
    }

    /**
     * Stops listening for newly online servers and disposes every connection created so far.
     */
    public void shutdown() {
        this.lifecycle.dispose();
        for (ClusterConnecting connection : this.connections.values()) {
            connection.disposable.dispose();
        }
    }

    /**
     * Creates the connection to the given remote node if none is registered for its id yet,
     * and emits the new connection to {@link #accept()} subscribers.
     *
     * @param localId id of the current server
     * @param id      id of the remote node
     */
    private void handleRemoteConnection(String localId, String id) {
        this.connections.computeIfAbsent(id, remoteId -> {
            log.debug("handle redis connection:{}", id);
            ClusterConnecting connection = new ClusterConnecting(localId, remoteId);
            this.sink.next(connection);
            return connection;
        });
    }

    @Override
    public String getId() {
        return this.id;
    }

    /**
     * @return the already registered connections followed by connections created later,
     *         with duplicates removed
     */
    @Override
    public Flux<EventConnection> accept() {
        return Flux.<EventConnection>concat(Flux.fromIterable(this.connections.values()), this.processor)
                .distinct();
    }

    /**
     * Bidirectional connection between the current server ({@code localId}) and one remote
     * node ({@code brokerId}).
     *
     * <ul>
     *   <li>Payloads are exchanged through the cluster queues {@code /broker/bus/{from}/{to}}.</li>
     *   <li>Subscription changes are announced on the Redis channels
     *       {@code /broker/{from}/{to}/sub} and {@code /broker/{from}/{to}/unsub}.</li>
     *   <li>Active subscriptions are stored in the Redis set {@code /broker/{from}/{to}/subs}.</li>
     * </ul>
     */
    class ClusterConnecting implements EventProducer, EventConsumer {
        private final String brokerId;
        private final String localId;
        private final EmitterProcessor<TopicPayload> processor = EmitterProcessor.create(false);
        private final FluxSink<TopicPayload> input = this.processor.sink(FluxSink.OverflowStrategy.BUFFER);
        FluxSink<TopicPayload> output;
        EmitterProcessor<Subscription> subProcessor = EmitterProcessor.create(false);
        FluxSink<Subscription> subSink = this.subProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
        EmitterProcessor<Subscription> unsubProcessor = EmitterProcessor.create(false);
        FluxSink<Subscription> unsubSink = this.unsubProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
        Disposable.Composite disposable = Disposables.composite();
        private final String allSubsInfoKey;

        /**
         * @param localId  id of the current server
         * @param brokerId id of the remote node this connection talks to
         */
        public ClusterConnecting(String localId, String brokerId) {
            this.brokerId = brokerId;
            this.localId = localId;
            this.allSubsInfoKey = "/broker/" + localId + "/" + brokerId + "/subs";

            this.disposable.add(() -> this.subProcessor.onComplete());
            this.disposable.add(() -> this.unsubProcessor.onComplete());
            this.disposable.add(() -> this.processor.onComplete());
            this.disposable.add(this.receiveRemotePayloads());
            this.disposable.add(this.listenSubscriptionChanges());
            this.disposable.add(this.loadStoredSubscriptions());
            this.disposable.add(this.forwardLocalPayloads());
        }

        /** Reads the remote-to-local queue and pushes decoded payloads into {@link #subscribe()}. */
        private Disposable receiveRemotePayloads() {
            return RedisClusterEventBroker.this.clusterManager
                    .<byte[]>getQueue("/broker/bus/" + this.brokerId + "/" + this.localId)
                    .subscribe()
                    .doOnNext(msg -> {
                        // Nobody is consuming: skip decoding entirely.
                        if (!this.processor.hasDownstreams()) {
                            return;
                        }
                        TopicPayload payload = RedisClusterEventBroker.this.topicPayloadCodec
                                .decode(Payload.of(Unpooled.wrappedBuffer(msg)));
                        log.trace("{} handle redis [{}] event {}", this.localId, this.brokerId, payload.getTopic());
                        this.input.next(payload);
                    })
                    .onErrorContinue((err, res) -> log.error(err.getMessage(), err))
                    .subscribe();
        }

        /** Listens to the remote node's sub/unsub channels and routes each message to the matching sink. */
        private Disposable listenSubscriptionChanges() {
            return RedisClusterEventBroker.this.operations
                    .listenToPattern("/broker/" + this.brokerId + "/" + this.localId + "/*")
                    .doOnNext(msg -> {
                        Subscription subscription = RedisClusterEventBroker.this.subscriptionCodec
                                .decode(Payload.of(Unpooled.wrappedBuffer(msg.getMessage())));
                        if (subscription == null) {
                            return;
                        }
                        String channel = msg.getChannel();
                        // "unsub" also ends with "sub": decide on it first and never fall through,
                        // otherwise an unsubscription with no listener would be emitted as a subscription.
                        if (channel.endsWith("unsub")) {
                            if (this.unsubProcessor.hasDownstreams()) {
                                this.unsubSink.next(subscription);
                            }
                            return;
                        }
                        if (channel.endsWith("sub") && this.subProcessor.hasDownstreams()) {
                            this.subSink.next(subscription);
                        }
                    })
                    // One undecodable message must not end the listener.
                    .onErrorContinue((err, res) -> log.error(err.getMessage(), err))
                    .subscribe();
        }

        /** Replays the subscriptions the remote node has stored for this server into the subscribe sink. */
        private Disposable loadStoredSubscriptions() {
            String loadSubsInfoKey = "/broker/" + this.brokerId + "/" + this.localId + "/subs";
            return RedisClusterEventBroker.this.operations
                    .opsForSet()
                    .members(loadSubsInfoKey)
                    .doOnNext(msg -> {
                        Subscription subscription = RedisClusterEventBroker.this.subscriptionCodec
                                .decode(Payload.of(Unpooled.wrappedBuffer(msg)));
                        // Same guard as the channel listener: skip entries that decode to nothing.
                        if (subscription != null) {
                            this.subSink.next(subscription);
                        }
                    })
                    .onErrorContinue((err, v) -> log.warn(err.getMessage(), err))
                    .subscribe();
        }

        /** Exposes {@link #sink()} and writes everything pushed into it to the local-to-remote queue. */
        private Disposable forwardLocalPayloads() {
            return Flux.<TopicPayload>create(emitter -> this.output = emitter)
                    .flatMap(payload -> {
                        // NOTE(review): retain() precedes getBytes(true); presumably balances a release
                        // done while encoding - confirm against the Payload/Codec contract.
                        payload.retain();
                        Payload encoded = RedisClusterEventBroker.this.topicPayloadCodec.encode(payload);
                        byte[] body = encoded.getBytes(true);
                        return RedisClusterEventBroker.this.clusterManager
                                .<byte[]>getQueue("/broker/bus/" + this.localId + "/" + this.brokerId)
                                .add(Mono.just(body));
                    })
                    .onErrorContinue((err, res) -> log.error(err.getMessage(), err))
                    .subscribe();
        }

        /**
         * Stores the subscription in this connection's Redis set, then announces it on the
         * {@code sub} channel.
         */
        @Override
        public Mono<Void> subscribe(Subscription subscription) {
            byte[] sub = RedisClusterEventBroker.this.subscriptionCodec.encode(subscription).getBytes(true);
            String topic = "/broker/" + this.localId + "/" + this.brokerId + "/sub";
            return RedisClusterEventBroker.this.operations
                    .opsForSet()
                    .add(this.allSubsInfoKey, new byte[][]{sub})
                    .then(RedisClusterEventBroker.this.operations.convertAndSend(topic, sub))
                    .then();
        }

        /**
         * Removes the subscription from this connection's Redis set, then announces it on the
         * {@code unsub} channel.
         */
        @Override
        public Mono<Void> unsubscribe(Subscription subscription) {
            byte[] sub = RedisClusterEventBroker.this.subscriptionCodec.encode(subscription).getBytes(true);
            String topic = "/broker/" + this.localId + "/" + this.brokerId + "/unsub";
            return RedisClusterEventBroker.this.operations
                    .opsForSet()
                    .remove(this.allSubsInfoKey, new Object[]{sub})
                    .then(RedisClusterEventBroker.this.operations.convertAndSend(topic, sub))
                    .then();
        }

        /** @return payloads received from the remote node */
        @Override
        public Flux<TopicPayload> subscribe() {
            return this.processor;
        }

        /** @return id of the remote node */
        @Override
        public String getId() {
            return this.brokerId;
        }

        /** @return {@code true} until this connection has been disposed */
        @Override
        public boolean isAlive() {
            return !this.isDisposed();
        }

        /** Registers a resource to be disposed together with this connection. */
        @Override
        public void doOnDispose(Disposable disposable) {
            this.disposable.add(disposable);
        }

        @Override
        public EventBroker getBroker() {
            return RedisClusterEventBroker.this;
        }

        @Override
        public EventConnection.Feature[] features() {
            return new EventConnection.Feature[]{EventConnection.Feature.consumeAnotherBroker};
        }

        /** @return subscriptions announced by (or stored for) the remote node */
        @Override
        public Flux<Subscription> handleSubscribe() {
            return this.subProcessor;
        }

        /** @return unsubscriptions announced by the remote node */
        @Override
        public Flux<Subscription> handleUnSubscribe() {
            return this.unsubProcessor;
        }

        /** @return sink whose payloads are written to the local-to-remote queue */
        @Override
        public FluxSink<TopicPayload> sink() {
            return this.output;
        }

        public void dispose() {
            this.disposable.dispose();
        }

        /** @return whether {@link #dispose()} (or the broker's shutdown) has disposed this connection */
        public boolean isDisposed() {
            return this.disposable.isDisposed();
        }
    }
}

