/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.event;

import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.ThreadLocalRandom;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.validation.constraints.NotNull;
import org.hswebframework.web.dict.EnumDict;
import org.jetlinks.core.Payload;
import org.jetlinks.core.codec.Codecs;
import org.jetlinks.core.codec.Decoder;
import org.jetlinks.core.codec.Encoder;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.event.TopicPayload;
import org.jetlinks.core.topic.Topic;
import org.jetlinks.supports.event.EventBroker;
import org.jetlinks.supports.event.EventConnection;
import org.jetlinks.supports.event.EventProducer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class BrokerEventBus
implements EventBus {
    private final Topic<SubscriptionInfo> root = Topic.createRoot();
    private final Map<String, EventBroker> brokers = new ConcurrentHashMap<String, EventBroker>(32);
    private final Map<String, EventConnection> connections = new ConcurrentHashMap<String, EventConnection>(512);
    private Scheduler publishScheduler = Schedulers.immediate();
    private Logger log = LoggerFactory.getLogger(BrokerEventBus.class);

    public <T> Flux<T> subscribe(@NotNull Subscription subscription, @NotNull Decoder<T> decoder) {
        return this.subscribe(subscription).flatMap(payload -> {
            try {
                Mono mono = Mono.justOrEmpty((Object)payload.decode(decoder, false));
                return mono;
            }
            catch (Throwable e) {
                this.log.error("decode message [{}] error", (Object)payload.getTopic(), (Object)e);
            }
            finally {
                ReferenceCountUtil.safeRelease((Object)payload);
            }
            return Mono.empty();
        }).publishOn(this.publishScheduler);
    }

    public Flux<TopicPayload> subscribe(Subscription subscription) {
        return Flux.create(sink -> {
            Disposable.Composite disposable = Disposables.composite();
            String subscriberId = subscription.getSubscriber();
            for (String topic : subscription.getTopics()) {
                Topic topicInfo = this.root.append(topic);
                SubscriptionInfo subInfo = SubscriptionInfo.of(subscriberId, EnumDict.toMask((EnumDict[])subscription.getFeatures()), (FluxSink<TopicPayload>)sink, false);
                topicInfo.subscribe((Object[])new SubscriptionInfo[]{subInfo});
                disposable.add(() -> {
                    topicInfo.unsubscribe((Object[])new SubscriptionInfo[]{subInfo});
                    subInfo.dispose();
                });
            }
            sink.onDispose((Disposable)disposable);
            if (subscription.hasFeature(Subscription.Feature.broker)) {
                this.doSubscribeBroker(subscription).doOnSuccess(nil -> {
                    if (subscription.getDoOnSubscribe() != null) {
                        subscription.getDoOnSubscribe().run();
                    }
                }).subscribe();
                disposable.add(() -> this.doUnsubscribeBroker(subscription).subscribe());
            } else if (subscription.getDoOnSubscribe() != null) {
                subscription.getDoOnSubscribe().run();
            }
            this.log.debug("local subscriber [{}],features:{},topics: {}", new Object[]{subscriberId, subscription.getFeatures(), subscription.getTopics()});
        });
    }

    public void addBroker(EventBroker broker) {
        this.brokers.put(broker.getId(), broker);
        this.startBroker(broker);
    }

    public void removeBroker(EventBroker broker) {
        this.brokers.remove(broker.getId());
    }

    public void removeBroker(String broker) {
        this.brokers.remove(broker);
    }

    public List<EventBroker> getBrokers() {
        return new ArrayList<EventBroker>(this.brokers.values());
    }

    private Mono<Void> doSubscribeBroker(Subscription subscription) {
        return Flux.fromIterable(this.connections.values()).filter(conn -> conn.isProducer() && conn.isAlive()).cast(EventProducer.class).flatMap(conn -> conn.subscribe(subscription)).then();
    }

    private Mono<Void> doUnsubscribeBroker(Subscription subscription) {
        return Flux.fromIterable(this.connections.values()).filter(conn -> conn.isProducer() && conn.isAlive()).cast(EventProducer.class).flatMap(conn -> conn.unsubscribe(subscription)).then();
    }

    private void startBroker(EventBroker broker) {
        broker.accept().subscribe(connection -> {
            String connectionId = broker.getId().concat(":").concat(connection.getId());
            EventConnection old = this.connections.put(connectionId, (EventConnection)connection);
            if (old == connection) {
                return;
            }
            if (old != null) {
                old.dispose();
            }
            connection.doOnDispose(() -> this.connections.remove(connectionId));
            connection.asProducer().flatMap(eventProducer -> this.root.getAllSubscriber().doOnNext(sub -> {
                for (SubscriptionInfo subscriber : sub.getSubscribers()) {
                    if (!subscriber.isLocal() || !subscriber.hasFeature(Subscription.Feature.broker)) continue;
                    eventProducer.subscribe(subscriber.toSubscription(sub.getTopic())).subscribe();
                }
            }).then(Mono.just((Object)eventProducer))).flatMapMany(EventProducer::subscribe).flatMap(payload -> this.doPublishFromBroker((TopicPayload)payload, sub -> {
                if (sub.isLocal()) {
                    return sub.hasFeature(Subscription.Feature.broker);
                }
                if (sub.isBroker()) {
                    if (sub.getEventBroker() == broker) {
                        if (sub.getEventConnection() == connection) {
                            return sub.hasConnectionFeature(EventConnection.Feature.consumeSameConnection);
                        }
                        return sub.hasConnectionFeature(EventConnection.Feature.consumeSameBroker);
                    }
                    return sub.hasConnectionFeature(EventConnection.Feature.consumeAnotherBroker);
                }
                return false;
            }), Integer.MAX_VALUE).onErrorContinue((err, obj) -> this.log.error(err.getMessage(), err)).subscribe();
            connection.asConsumer().subscribe(subscriber -> {
                subscriber.handleSubscribe().doOnNext(subscription -> this.handleBrokerSubscription((Subscription)subscription, SubscriptionInfo.of(subscription.getSubscriber(), EnumDict.toMask((EnumDict[])subscription.getFeatures()), subscriber.sink(), true).connection(broker, (EventConnection)connection), (EventConnection)connection)).onErrorContinue((err, obj) -> this.log.error(err.getMessage(), err)).subscribe();
                subscriber.handleUnSubscribe().doOnNext(subscription -> this.handleBrokerUnsubscription((Subscription)subscription, SubscriptionInfo.of(subscription.getSubscriber()), (EventConnection)connection)).onErrorContinue((err, obj) -> this.log.error(err.getMessage(), err)).subscribe();
            });
        });
    }

    private void handleBrokerUnsubscription(Subscription subscription, SubscriptionInfo info, EventConnection connection) {
        if (this.log.isDebugEnabled()) {
            this.log.debug("broker [{}] unsubscribe : {}", (Object)info, (Object)subscription.getTopics());
        }
        for (String topic : subscription.getTopics()) {
            AtomicBoolean unsub = new AtomicBoolean(false);
            this.root.append(topic).unsubscribe(sub -> sub.getEventConnection() == connection && sub.getSubscriber().equals(info.getSubscriber()) && unsub.compareAndSet(false, true));
        }
    }

    private void subAnotherBroker(Subscription subscription, SubscriptionInfo info, EventConnection connection) {
        Subscription sub = subscription.hasFeature(Subscription.Feature.shared) ? subscription.copy(new Subscription.Feature[]{Subscription.Feature.shared, Subscription.Feature.local}) : subscription.copy(new Subscription.Feature[]{Subscription.Feature.local});
        Flux.fromIterable(this.connections.values()).filter(conn -> {
            if (conn == connection) {
                return info.hasConnectionFeature(EventConnection.Feature.consumeSameConnection);
            }
            if (conn.getBroker() == connection.getBroker()) {
                return info.hasConnectionFeature(EventConnection.Feature.consumeSameBroker);
            }
            return true;
        }).flatMap(EventConnection::asProducer).flatMap(eventProducer -> eventProducer.subscribe(sub)).subscribe();
    }

    private void handleBrokerSubscription(Subscription subscription, SubscriptionInfo info, EventConnection connection) {
        if (this.log.isDebugEnabled()) {
            this.log.debug("broker [{}] subscribe : {}", (Object)info, (Object)subscription.getTopics());
        }
        for (String topic : subscription.getTopics()) {
            Topic topic_ = this.root.append(topic);
            topic_.subscribe((Object[])new SubscriptionInfo[]{info});
            info.onDispose(() -> topic_.unsubscribe((Object[])new SubscriptionInfo[]{info}));
        }
        if (subscription.hasFeature(Subscription.Feature.broker) && info.hasConnectionFeature(EventConnection.Feature.consumeAnotherBroker)) {
            this.subAnotherBroker(subscription, info, connection);
        }
    }

    private boolean doPublish(String topic, SubscriptionInfo info, TopicPayload payload) {
        try {
            if (info.sink.isCancelled()) {
                return false;
            }
            payload.retain();
            info.sink.next((Object)payload);
            if (this.log.isDebugEnabled()) {
                this.log.debug("publish [{}] to [{}] complete", (Object)topic, (Object)info);
            }
            return true;
        }
        catch (Throwable error) {
            this.log.error("publish [{}] to [{}] event error", new Object[]{topic, info, error});
            ReferenceCountUtil.safeRelease((Object)payload);
            return false;
        }
    }

    private Mono<Long> doPublish(String topic, Predicate<SubscriptionInfo> predicate, Function<Flux<SubscriptionInfo>, Mono<Long>> subscriberConsumer) {
        return (Mono)this.root.findTopic(topic).flatMapIterable(Topic::getSubscribers).filter(sub -> {
            if (sub.isBroker() && !sub.getEventConnection().isAlive()) {
                sub.dispose();
                return false;
            }
            return predicate.test((SubscriptionInfo)sub);
        }).groupBy(SubscriptionInfo::getSubscriber, Integer.MAX_VALUE).flatMap(group -> group.groupBy(sub -> sub.hasFeature(Subscription.Feature.shared)).flatMap(groups -> {
            if (Boolean.TRUE.equals(groups.key())) {
                return this.selectSharedSubscription((Flux<SubscriptionInfo>)groups);
            }
            return groups;
        }), Integer.MAX_VALUE).distinct(SubscriptionInfo::getSink).as(subscriberConsumer);
    }

    private Flux<SubscriptionInfo> selectSharedSubscription(Flux<SubscriptionInfo> subscriptionInfoFlux) {
        return subscriptionInfoFlux.collectList().flatMapMany(subs -> Flux.just(subs.get(ThreadLocalRandom.current().nextInt(0, subs.size()))));
    }

    private Mono<Long> doPublishFromBroker(TopicPayload payload, Predicate<SubscriptionInfo> predicate) {
        return this.doPublish(payload.getTopic(), predicate, (Flux<SubscriptionInfo> flux) -> flux.doOnNext(info -> {
            try {
                payload.retain();
                info.sink.next((Object)payload);
                if (this.log.isDebugEnabled()) {
                    this.log.debug("broker publish [{}] to [{}] complete", (Object)payload.getTopic(), info);
                }
            }
            catch (Throwable e) {
                this.log.warn("broker publish [{}] to [{}] error", new Object[]{payload.getTopic(), info, e});
            }
        }).count()).doFinally(i -> ReferenceCountUtil.safeRelease((Object)payload));
    }

    public <T> Mono<Long> publish(String topic, Publisher<T> event) {
        return this.publish(topic, msg -> Codecs.lookup(msg.getClass()).encode(msg), event);
    }

    public <T> Mono<Long> publish(String topic, Encoder<T> encoder, Publisher<? extends T> eventStream) {
        return this.publish(topic, encoder, eventStream, this.publishScheduler);
    }

    public <T> Mono<Long> publish(String topic, Encoder<T> encoder, Publisher<? extends T> eventStream, Scheduler publisher) {
        return (Mono)this.doPublish(topic, (SubscriptionInfo sub) -> !sub.isLocal() || sub.hasFeature(Subscription.Feature.local), (Flux<SubscriptionInfo> subscribers) -> {
            Flux cache = Flux.from((Publisher)eventStream).map(payload -> TopicPayload.of((String)topic, (Payload)Payload.of((Object)payload, (Encoder)encoder))).cache();
            return subscribers.flatMap(sub -> cache.map(payload -> this.doPublish(topic, (SubscriptionInfo)sub, (TopicPayload)payload)).count()).count().flatMap(s -> {
                if (s > 0L) {
                    return cache.map(payload -> {
                        ReferenceCountUtil.safeRelease((Object)payload);
                        return true;
                    }).then().thenReturn(s);
                }
                return Mono.just((Object)s);
            });
        }).as(res -> {
            if (this.log.isTraceEnabled()) {
                return res.doOnNext(subs -> this.log.trace("topic [{}] has {} subscriber", (Object)topic, subs));
            }
            return res;
        });
    }

    public void setPublishScheduler(Scheduler publishScheduler) {
        this.publishScheduler = publishScheduler;
    }

    public void setLog(Logger log) {
        this.log = log;
    }

    private static class SubscriptionInfo
    implements Disposable {
        String subscriber;
        long features;
        FluxSink<TopicPayload> sink;
        boolean broker;
        Disposable.Composite disposable;
        EventBroker eventBroker;
        EventConnection eventConnection;
        long connectionFeatures;

        public String toString() {
            return this.isLocal() ? this.subscriber + "@local" : this.subscriber + "@" + this.eventBroker.getId() + ":" + this.eventConnection.getId();
        }

        public Subscription toSubscription(String topic) {
            return Subscription.of((String)this.subscriber, (String[])new String[]{topic}, (Subscription.Feature[])EnumDict.getByMask(Subscription.Feature.class, (long)this.features).toArray(new Subscription.Feature[0]));
        }

        public SubscriptionInfo connection(EventBroker broker, EventConnection connection) {
            this.eventConnection = connection;
            this.eventBroker = broker;
            this.connectionFeatures = EnumDict.toMask((EnumDict[])connection.features());
            return this;
        }

        public static SubscriptionInfo of(String subscriber) {
            return SubscriptionInfo.of(subscriber, 0L, null, false);
        }

        public static SubscriptionInfo of(Subscription subscription, FluxSink<TopicPayload> sink, boolean remote) {
            return SubscriptionInfo.of(subscription.getSubscriber(), EnumDict.toMask((EnumDict[])subscription.getFeatures()), sink, remote);
        }

        public static SubscriptionInfo of(String subscriber, long features, FluxSink<TopicPayload> sink, boolean remote) {
            return new SubscriptionInfo(subscriber, features, sink, remote);
        }

        public SubscriptionInfo(String subscriber, long features, FluxSink<TopicPayload> sink, boolean broker) {
            this.subscriber = subscriber;
            this.features = features;
            this.sink = sink;
            this.broker = broker;
        }

        synchronized void onDispose(Disposable disposable) {
            if (this.disposable == null) {
                this.disposable = Disposables.composite((Disposable[])new Disposable[]{disposable});
            } else {
                this.disposable.add(disposable);
            }
        }

        public void dispose() {
            if (this.disposable != null) {
                this.disposable.dispose();
            }
        }

        boolean isLocal() {
            return !this.broker;
        }

        boolean hasFeature(Subscription.Feature feature) {
            return feature.in(this.features);
        }

        boolean hasConnectionFeature(EventConnection.Feature feature) {
            return feature.in(this.connectionFeatures);
        }

        private SubscriptionInfo(String subscriber, long features, FluxSink<TopicPayload> sink, boolean broker, Disposable.Composite disposable, EventBroker eventBroker, EventConnection eventConnection, long connectionFeatures) {
            this.subscriber = subscriber;
            this.features = features;
            this.sink = sink;
            this.broker = broker;
            this.disposable = disposable;
            this.eventBroker = eventBroker;
            this.eventConnection = eventConnection;
            this.connectionFeatures = connectionFeatures;
        }

        public static SubscriptionInfo of(String subscriber, long features, FluxSink<TopicPayload> sink, boolean broker, Disposable.Composite disposable, EventBroker eventBroker, EventConnection eventConnection, long connectionFeatures) {
            return new SubscriptionInfo(subscriber, features, sink, broker, disposable, eventBroker, eventConnection, connectionFeatures);
        }

        public String getSubscriber() {
            return this.subscriber;
        }

        public long getFeatures() {
            return this.features;
        }

        public FluxSink<TopicPayload> getSink() {
            return this.sink;
        }

        public Disposable.Composite getDisposable() {
            return this.disposable;
        }

        public EventBroker getEventBroker() {
            return this.eventBroker;
        }

        public EventConnection getEventConnection() {
            return this.eventConnection;
        }

        public long getConnectionFeatures() {
            return this.connectionFeatures;
        }

        public void setSubscriber(String subscriber) {
            this.subscriber = subscriber;
        }

        public void setFeatures(long features) {
            this.features = features;
        }

        public void setSink(FluxSink<TopicPayload> sink) {
            this.sink = sink;
        }

        public void setBroker(boolean broker) {
            this.broker = broker;
        }

        public void setDisposable(Disposable.Composite disposable) {
            this.disposable = disposable;
        }

        public void setEventBroker(EventBroker eventBroker) {
            this.eventBroker = eventBroker;
        }

        public void setEventConnection(EventConnection eventConnection) {
            this.eventConnection = eventConnection;
        }

        public void setConnectionFeatures(long connectionFeatures) {
            this.connectionFeatures = connectionFeatures;
        }

        public boolean isBroker() {
            return this.broker;
        }
    }
}

