/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.cluster.redis;

import java.util.concurrent.atomic.AtomicBoolean;
import org.jetlinks.core.cluster.ClusterTopic;
import org.reactivestreams.Publisher;
import org.springframework.data.redis.connection.ReactiveSubscription;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

public class RedisClusterTopic<T>
implements ClusterTopic<T> {
    private final String topicName;
    private final ReactiveRedisOperations<Object, T> operations;
    private final FluxProcessor<ClusterTopic.TopicMessage<T>, ClusterTopic.TopicMessage<T>> processor;
    private final FluxSink<ClusterTopic.TopicMessage<T>> sink;
    private final AtomicBoolean subscribed = new AtomicBoolean();
    private Disposable disposable;

    public RedisClusterTopic(String topic, ReactiveRedisOperations<Object, T> operations) {
        this.topicName = topic;
        this.operations = operations;
        this.processor = EmitterProcessor.create((boolean)false);
        this.sink = this.processor.sink(FluxSink.OverflowStrategy.BUFFER);
    }

    private void doSubscribe() {
        if (this.subscribed.compareAndSet(false, true)) {
            this.disposable = this.operations.listenToPattern(new String[]{this.topicName}).subscribe(data -> {
                if (!this.processor.hasDownstreams()) {
                    this.disposable.dispose();
                    this.subscribed.compareAndSet(true, false);
                } else {
                    this.sink.next((Object)new ClusterTopic.TopicMessage<T>((ReactiveSubscription.Message)data){
                        final /* synthetic */ ReactiveSubscription.Message val$data;
                        {
                            this.val$data = message;
                        }

                        public String getTopic() {
                            return (String)this.val$data.getChannel();
                        }

                        public T getMessage() {
                            return this.val$data.getMessage();
                        }
                    });
                }
            });
        }
    }

    public Flux<ClusterTopic.TopicMessage<T>> subscribePattern() {
        return this.processor.doOnSubscribe(r -> this.doSubscribe());
    }

    public Mono<Integer> publish(Publisher<? extends T> publisher) {
        return Flux.from(publisher).flatMap(data -> this.operations.convertAndSend(this.topicName, data)).last((Object)1L).map(Number::intValue);
    }
}

