/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.cluster.event;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.core.codec.defaults.TopicPayloadCodec;
import org.jetlinks.core.event.TopicPayload;
import org.jetlinks.supports.cluster.event.AbstractClusterEventBroker;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class RedisClusterEventBroker
extends AbstractClusterEventBroker {
    private static final Logger log = LoggerFactory.getLogger(RedisClusterEventBroker.class);

    public RedisClusterEventBroker(ClusterManager clusterManager, ReactiveRedisConnectionFactory factory) {
        super(clusterManager, factory);
    }

    @Override
    protected Flux<TopicPayload> listen(String localId, String brokerId) {
        return this.clusterManager.getQueue("/broker/bus/" + brokerId + "/" + localId).subscribe().map(msg -> TopicPayloadCodec.doDecode((ByteBuf)Unpooled.wrappedBuffer((byte[])msg)));
    }

    @Override
    protected Mono<Void> dispatch(String localId, String brokerId, TopicPayload payload) {
        try {
            ByteBuf byteBuf = TopicPayloadCodec.doEncode((TopicPayload)payload);
            byte[] body = ByteBufUtil.getBytes((ByteBuf)byteBuf);
            ReferenceCountUtil.safeRelease((Object)payload);
            ReferenceCountUtil.safeRelease((Object)byteBuf);
            return this.clusterManager.getQueue("/broker/bus/" + localId + "/" + brokerId).add((Publisher)Mono.just((Object)body)).then();
        }
        catch (Throwable e) {
            log.error(e.getMessage(), e);
            return Mono.empty();
        }
    }
}

