/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.cluster.event;

import io.netty.util.ReferenceCountUtil;
import org.jetlinks.core.Payload;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.core.codec.Decoder;
import org.jetlinks.core.event.TopicPayload;
import org.jetlinks.supports.cluster.event.AbstractClusterEventBroker;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Cluster event broker that moves encoded {@link TopicPayload} messages between
 * nodes through {@link ClusterManager} queues.
 *
 * <p>Queues are named {@code /broker/bus/<sender>/<receiver>}: this node reads
 * from the queue whose sender is the remote broker and whose receiver is the
 * local node, and writes to the queue with the two ids swapped.
 */
public class RedisClusterEventBroker extends AbstractClusterEventBroker {
    private static final Logger log = LoggerFactory.getLogger(RedisClusterEventBroker.class);

    /**
     * Creates the broker; both arguments are handed to the superclass unchanged.
     *
     * @param clusterManager cluster manager passed to the superclass
     * @param factory        reactive Redis connection factory passed to the superclass
     */
    public RedisClusterEventBroker(ClusterManager clusterManager, ReactiveRedisConnectionFactory factory) {
        super(clusterManager, factory);
    }

    /**
     * Subscribes to the queue {@code /broker/bus/<brokerId>/<localId>} and decodes
     * every received element into a {@link TopicPayload}.
     *
     * @param localId  id used as the receiver segment of the queue name
     * @param brokerId id used as the sender segment of the queue name
     * @return the decoded payloads emitted by the queue subscription
     */
    @Override
    protected Flux<TopicPayload> listen(String localId, String brokerId) {
        return this.clusterManager
                .getQueue(RedisClusterEventBroker.queueName(brokerId, localId))
                .subscribe()
                .map(this::decodeFrame);
    }

    /**
     * Encodes {@code payload} to a byte array, releases it, and adds the bytes to
     * the queue {@code /broker/bus/<localId>/<brokerId>}.
     *
     * <p>Encoding and the release happen when this method is called, not when the
     * returned {@link Mono} is subscribed.
     *
     * <p>NOTE(review): if encoding or {@code getBytes} throws, the release call
     * below is never reached; confirm whether the caller releases the payload
     * in that case.
     *
     * @param localId  id used as the sender segment of the queue name
     * @param brokerId id used as the receiver segment of the queue name
     * @param payload  message to send; released via {@code safeRelease} after encoding
     * @return the result of {@code then()} on the queue's add operation
     */
    @Override
    protected Mono<Void> dispatch(String localId, String brokerId, TopicPayload payload) {
        // Extract the bytes first, then drop our reference to the source payload.
        byte[] body = this.topicPayloadCodec.encode((Object)payload).getBytes(true);
        ReferenceCountUtil.safeRelease((Object)payload);

        String target = RedisClusterEventBroker.queueName(localId, brokerId);
        return this.clusterManager
                .getQueue(target)
                .add((Publisher)Mono.just((Object)body))
                .then();
    }

    /** Builds the queue name {@code /broker/bus/<from>/<to>}. */
    private static String queueName(String from, String to) {
        return "/broker/bus/" + from + "/" + to;
    }

    /** Wraps a raw queue element (cast to {@code byte[]}) in a payload and decodes it with the topic payload codec. */
    private TopicPayload decodeFrame(Object raw) {
        return (TopicPayload)Payload.of((byte[])raw).decode((Decoder)this.topicPayloadCodec, false);
    }
}

