/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.scalecube.event;

import com.alibaba.fastjson.JSON;
import io.scalecube.cluster.ClusterMessageHandler;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.transport.api.Message;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import org.jctools.maps.NonBlockingHashMap;
import org.jetlinks.core.NativePayload;
import org.jetlinks.core.Payload;
import org.jetlinks.core.cache.Caches;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.event.TopicPayload;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.supports.event.EventBroker;
import org.jetlinks.supports.event.EventConnection;
import org.jetlinks.supports.event.EventConsumer;
import org.jetlinks.supports.event.EventProducer;
import org.jetlinks.supports.scalecube.ExtendedCluster;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/**
 * {@link EventBroker} that exchanges event-bus traffic with the other members of a
 * scalecube cluster.
 *
 * <p>Every remote member is represented by one {@link MemberEventConnection}. Incoming
 * cluster messages are routed to the connection identified by the {@code _f} (from) header;
 * messages whose sender cannot be resolved to a cluster member yet are parked in
 * {@link #earlyMessage} and replayed when the connection is created.
 */
public class ScalecubeEventBusBroker implements EventBroker, Disposable {
    private static final Logger log = LoggerFactory.getLogger(ScalecubeEventBusBroker.class);
    private static final String SUB_QUALIFIER = "/jeb/_sub";
    private static final String UNSUB_QUALIFIER = "/jeb/_unsub";
    private static final String HELLO_QUALIFIER = "/jeb/_hello";
    private static final String PUB_QUALIFIER = "/jeb/_pub";
    /** Header carrying the id of the sending member. */
    private static final String FROM_HEADER = "_f";
    /** Header carrying the topic of a published payload. */
    private static final String TOPIC_HEADER = "_t";
    /** Header carrying the JSON-encoded payload headers. */
    private static final String TOPIC_HEADER_HEADER = "_th";

    final ExtendedCluster cluster;
    private final Disposable.Composite disposable = Disposables.composite();
    /** Connections keyed by member id. */
    private final Map<String, MemberEventConnection> cachedConnections = new NonBlockingHashMap<>();
    /** Connections created after construction; replayed to subscribers of {@link #accept()}. */
    private final Sinks.Many<EventConnection> connections =
        Sinks.many().multicast().onBackpressureBuffer(Integer.MAX_VALUE, false);
    /**
     * Messages received from senders that are not (yet) resolvable as cluster members,
     * keyed by sender id. Presumably entries expire after the given duration - confirm
     * against {@code Caches.newCache}.
     */
    private final Map<String, List<Message>> earlyMessage = Caches.newCache(Duration.ofMinutes(10L));

    /**
     * Creates the broker and immediately registers its message handler on the cluster.
     *
     * @param cluster cluster used to send and receive event-bus messages
     */
    public ScalecubeEventBusBroker(ExtendedCluster cluster) {
        this.cluster = cluster;
        this.init();
    }

    /**
     * Registers the cluster message handler, the dispose-time cleanup, and a connection for
     * every member that is already part of the cluster.
     */
    private void init() {
        // Release per-member connections (and their publishing subscriptions) on dispose().
        this.disposable.add(this::releaseConnections);

        this.cluster.handler(extendedCluster -> new ClusterMessageHandler() {

            @Override
            public void onMessage(Message message) {
                if (ScalecubeEventBusBroker.this.isDisposed()) {
                    // Do not recreate connections once the broker has been shut down.
                    return;
                }
                String from = message.header(FROM_HEADER);
                if (!StringUtils.hasLength(from)) {
                    // Not an event-bus message (no sender id): ignore.
                    return;
                }
                MemberEventConnection connection = ScalecubeEventBusBroker.this.getOrCreateConnection(from);
                if (connection != null) {
                    ScalecubeEventBusBroker.this.handleMessage(connection, message);
                } else {
                    // Sender is not a known member yet: park the message for later replay.
                    log.info("received early message {} {}", from, message.data());
                    ScalecubeEventBusBroker.this.earlyMessage
                        .computeIfAbsent(from, id -> new CopyOnWriteArrayList<>())
                        .add(message);
                }
            }

            @Override
            public void onGossip(Message gossip) {
                this.onMessage(gossip);
            }

            @Override
            public void onMembershipEvent(MembershipEvent event) {
                if (ScalecubeEventBusBroker.this.isDisposed()) {
                    return;
                }
                if (event.isLeaving() || event.isRemoved()) {
                    String memberId = event.member().id();
                    ScalecubeEventBusBroker.this.earlyMessage.remove(memberId);
                    MemberEventConnection connection = ScalecubeEventBusBroker.this.cachedConnections.remove(memberId);
                    if (connection != null) {
                        log.debug("remove event broker {}", event.member().address());
                        connection.dispose();
                    }
                }
                if (event.isAdded() || event.isUpdated()) {
                    ScalecubeEventBusBroker.this.getOrCreateConnection(event.member());
                }
            }
        });

        for (Member member : this.cluster.otherMembers()) {
            this.registerInitialConnection(member);
        }
    }

    /**
     * Registers a connection for a member that was already present at start-up, without
     * emitting it on {@link #connections} ({@link #accept()} replays the cached values).
     *
     * <p>The connection is only constructed when no mapping exists, and is disposed again if
     * a concurrent registration wins, so no orphaned publishing subscription is left behind.
     */
    private void registerInitialConnection(Member member) {
        String memberId = member.id();
        if (this.cachedConnections.containsKey(memberId)) {
            return;
        }
        MemberEventConnection created = new MemberEventConnection(member);
        if (this.cachedConnections.putIfAbsent(memberId, created) != null) {
            created.dispose();
        }
    }

    /** Disposes every cached connection and drops all parked early messages. */
    private void releaseConnections() {
        for (MemberEventConnection connection : this.cachedConnections.values()) {
            connection.dispose();
        }
        this.cachedConnections.clear();
        this.earlyMessage.clear();
    }

    /**
     * Resolves the member by id and returns its connection.
     *
     * @return the connection, or {@code null} when the cluster does not know the member
     */
    private MemberEventConnection getOrCreateConnection(String memberId) {
        Member member = this.cluster.member(memberId).orElse(null);
        return member == null ? null : this.getOrCreateConnection(member);
    }

    /**
     * Dispatches an incoming message to the matching sink of the connection, based on the
     * message qualifier. Messages with any other qualifier are ignored.
     */
    private void handleMessage(MemberEventConnection connection, Message message) {
        String qualifier = message.qualifier();
        if (Objects.equals(qualifier, PUB_QUALIFIER)) {
            String topic = message.header(TOPIC_HEADER);
            String headers = message.header(TOPIC_HEADER_HEADER);
            Object data = message.data();
            // Raw bytes are wrapped as a binary payload; anything else is passed through natively.
            TopicPayload topicPayload = data instanceof byte[]
                ? TopicPayload.of(topic, Payload.of((byte[]) data))
                : TopicPayload.of(topic, NativePayload.of(data));
            if (StringUtils.hasText(headers)) {
                // Raw cast retained from the decompiled form: the parameter type of
                // addHeader is not visible here.
                topicPayload.addHeader((Map) JSON.parseObject(headers));
            }
            log.trace("publish from {} : {}", connection, topic);
            connection.subscriber.emitNext(topicPayload, Reactors.emitFailureHandler());
        } else if (Objects.equals(qualifier, SUB_QUALIFIER)) {
            log.debug("subscribe from {} : {}", connection, message.data());
            connection.subscriptions.emitNext(message.data(), Reactors.emitFailureHandler());
        } else if (Objects.equals(qualifier, UNSUB_QUALIFIER)) {
            log.debug("unsubscribe from {} : {}", connection, message.data());
            connection.unSubscriptions.emitNext(message.data(), Reactors.emitFailureHandler());
        }
    }

    /**
     * Returns the connection of the member, creating and announcing it when absent. For an
     * existing connection only the member reference is refreshed.
     *
     * <p>NOTE(review): the mapping function has side effects (emits the connection, replays
     * early messages); whether the map implementation guarantees a single invocation under
     * contention is not visible here - confirm.
     */
    private MemberEventConnection getOrCreateConnection(Member member) {
        return this.cachedConnections.compute(member.id(), (key, old) -> {
            if (old != null) {
                old.setMember(member);
                return old;
            }
            log.debug("add event broker {}", member.address());
            MemberEventConnection connection = new MemberEventConnection(member);
            // Retry the emission while the sink reports a non-serialized or zero-subscriber failure.
            this.connections.emitNext(
                connection,
                (signalType, emitResult) -> emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED
                    || emitResult == Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER);
            // Replay anything that arrived before the member was resolvable.
            List<Message> early = this.earlyMessage.remove(member.id());
            if (early != null) {
                for (Message message : early) {
                    this.handleMessage(connection, message);
                }
                early.clear();
            }
            return connection;
        });
    }

    /** Builds a message with the given qualifier and data, stamped with the local member id. */
    private Message createMessage(String qualifier, Object data) {
        return Message.builder()
            .qualifier(qualifier)
            .data(data)
            .header(FROM_HEADER, this.cluster.member().id())
            .build();
    }

    @Override
    public String getId() {
        return "scalecube";
    }

    /**
     * @return the currently cached connections followed by every connection created later
     */
    @Override
    public Flux<EventConnection> accept() {
        return Flux.concat(Flux.fromIterable(this.cachedConnections.values()), this.connections.asFlux());
    }

    /** Disposes the broker together with all of its member connections. */
    @Override
    public void dispose() {
        this.disposable.dispose();
    }

    @Override
    public boolean isDisposed() {
        return this.disposable.isDisposed();
    }

    /**
     * Connection to one remote cluster member. Acts both as producer (remote subscriptions
     * and payloads arrive here) and consumer (local payloads are sent to the member).
     */
    public class MemberEventConnection implements EventConnection, EventProducer, EventConsumer {
        // Reassigned through setMember() by the cluster handler while publish paths read it.
        private volatile Member member;
        private final Sinks.Many<Subscription> subscriptions =
            Sinks.many().multicast().onBackpressureBuffer(Integer.MAX_VALUE, false);
        private final Sinks.Many<Subscription> unSubscriptions =
            Sinks.many().multicast().onBackpressureBuffer(Integer.MAX_VALUE, false);
        private final Sinks.Many<TopicPayload> subscriber =
            Sinks.many().multicast().onBackpressureBuffer(Integer.MAX_VALUE, false);
        private final Disposable.Composite disposable = Disposables.composite();
        private FluxSink<TopicPayload> publisher;

        /**
         * Creates the connection and starts the pipeline that forwards everything pushed
         * into {@link #sink()} to the remote member.
         *
         * @param member remote member this connection talks to
         */
        public MemberEventConnection(Member member) {
            this.member = member;
            // The explicit type witness is required: without it the sink is inferred as
            // FluxSink<Object> and cannot be stored in the publisher field.
            Disposable publishing = Flux
                .<TopicPayload>create(sink -> this.publisher = sink)
                .flatMap(this::doPublish)
                .subscribe();
            // Added directly instead of via the overridable doOnDispose().
            this.disposable.add(publishing);
        }

        /**
         * Creates a connection around an externally supplied publishing sink; no forwarding
         * pipeline is started.
         */
        public MemberEventConnection(Member member, FluxSink<TopicPayload> publisher) {
            this.member = member;
            this.publisher = publisher;
        }

        /**
         * Sends one payload to the remote member. Failures are logged and swallowed so a
         * single failing payload does not terminate the publishing pipeline.
         */
        private Mono<Void> doPublish(TopicPayload payload) {
            try {
                String topic = payload.getTopic();
                Object payloadObj;
                if (payload.getPayload() instanceof NativePayload) {
                    payloadObj = ((NativePayload) payload.getPayload()).getNativeObject();
                    payload.release();
                } else {
                    payloadObj = payload.getBytes();
                }
                String headers = null;
                if (payload.getHeaders() != null) {
                    headers = JSON.toJSONString(payload.getHeaders());
                }
                Message message = Message.builder()
                    .qualifier(PUB_QUALIFIER)
                    .header(TOPIC_HEADER, topic)
                    .header(TOPIC_HEADER_HEADER, headers)
                    .header(FROM_HEADER, ScalecubeEventBusBroker.this.cluster.member().id())
                    .data(payloadObj)
                    .build();
                return ScalecubeEventBusBroker.this.cluster
                    .send(this.member, message)
                    .onErrorResume(err -> {
                        log.error(err.getMessage(), err);
                        return Mono.empty();
                    });
            } catch (Throwable err) {
                log.error(err.getMessage(), err);
                return Mono.empty();
            }
        }

        @Override
        public String getId() {
            return this.member.id();
        }

        /** @return {@code true} while the cluster still knows the member of this connection */
        @Override
        public boolean isAlive() {
            return ScalecubeEventBusBroker.this.cluster.member(this.member.id()).isPresent();
        }

        @Override
        public void doOnDispose(Disposable disposable) {
            this.disposable.add(disposable);
        }

        @Override
        public EventBroker getBroker() {
            return ScalecubeEventBusBroker.this;
        }

        @Override
        public Flux<Subscription> handleSubscribe() {
            return this.subscriptions.asFlux();
        }

        @Override
        public Flux<Subscription> handleUnSubscribe() {
            return this.unSubscriptions.asFlux();
        }

        @Override
        public FluxSink<TopicPayload> sink() {
            return this.publisher;
        }

        /** Tells the remote member that this node subscribes to the given subscription. */
        @Override
        public Mono<Void> subscribe(Subscription subscription) {
            Message message = ScalecubeEventBusBroker.this.createMessage(SUB_QUALIFIER, subscription);
            return ScalecubeEventBusBroker.this.cluster.send(this.member, message).then();
        }

        /** Tells the remote member that this node drops the given subscription. */
        @Override
        public Mono<Void> unsubscribe(Subscription subscription) {
            Message message = ScalecubeEventBusBroker.this.createMessage(UNSUB_QUALIFIER, subscription);
            return ScalecubeEventBusBroker.this.cluster.send(this.member, message).then();
        }

        /** @return payloads published by the remote member */
        @Override
        public Flux<TopicPayload> subscribe() {
            return this.subscriber.asFlux();
        }

        /** Disposes everything registered on this connection, including the publishing pipeline. */
        public void dispose() {
            this.disposable.dispose();
        }

        /** @return {@code true} once {@link #dispose()} has been called */
        public boolean isDisposed() {
            return this.disposable.isDisposed();
        }

        @Override
        public String toString() {
            Member current = this.member;
            return current.alias() + "@" + current.address();
        }

        public void setMember(Member member) {
            this.member = member;
        }
    }
}

