/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.event;

import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.ThreadLocalRandom;
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;
import lombok.Generated;
import org.hswebframework.web.dict.EnumDict;
import org.jetlinks.core.event.Cancelable;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.event.TopicPayload;
import org.jetlinks.core.lang.SeparatedCharSequence;
import org.jetlinks.core.lang.SeparatedString;
import org.jetlinks.core.lang.SharedPathString;
import org.jetlinks.core.message.HeaderKey;
import org.jetlinks.core.metadata.Jsonable;
import org.jetlinks.core.topic.Topic;
import org.jetlinks.core.trace.TraceHolder;
import org.jetlinks.core.utils.RecyclerUtils;
import org.jetlinks.core.utils.SerializeUtils;
import org.jetlinks.core.utils.json.ObjectMappers;
import org.jetlinks.supports.event.EventPublisher;
import org.jetlinks.supports.event.EventSubscribeFlux;
import org.jetlinks.supports.event.LocalSubscriber;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.function.Function5;
import reactor.util.context.ContextView;

public class InternalEventBus
implements EventBus {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(InternalEventBus.class);
    static final HeaderKey<Map<CharSequence, String>> SHARE_PUBLISHED = HeaderKey.of((String)"_spd", Collections.emptyMap());
    private static final FastThreadLocal<Set<Object>> DISTINCT = new FastThreadLocal<Set<Object>>(){

        protected Set<Object> initialValue() {
            return new HashSet<Object>();
        }
    };
    final Topic<SubscriptionInfo> root = Topic.createRoot();
    private final Disposable.Composite disposable = Disposables.composite();
    int maxBufferSize = 1000000;
    private ObjectName objectName;

    public InternalEventBus() {
        this.init();
    }

    public Topic<SubscriptionInfo> getSubscriptionTable() {
        return this.root;
    }

    private void init() {
        this.disposable.add(Flux.interval((Duration)Duration.ofMinutes(5L)).onBackpressureDrop().doOnNext(ignore -> this.cleanup()).subscribe());
        this.registerMBean();
    }

    public void shutdown() {
        log.info("shutdown eventbus");
        this.disposable.dispose();
        this.unregisterMBean();
    }

    void cleanup() {
        this.root.cleanup();
    }

    public Flux<TopicPayload> subscribe(Subscription subscription) {
        return new EventSubscribeFlux<TopicPayload>(subscription, this, Function.identity());
    }

    public <T> Flux<T> subscribe(Subscription subscription, Class<T> type) {
        return new EventSubscribeFlux<Object>(subscription, this, payload -> {
            try {
                return payload.decode(type);
            }
            catch (Throwable e) {
                log.error("decode message [{}] error", (Object)payload.getTopic(), (Object)e);
                return null;
            }
        });
    }

    public Cancelable subscribe(Subscription subscription, Function<TopicPayload, Mono<Void>> handler) {
        return new LocalSubscriber(this, subscription, handler);
    }

    protected void subscribeCluster(SubscriptionInfo info) {
    }

    protected void unsubscribeCluster(SubscriptionInfo info) {
    }

    public <T> Mono<Long> publish(String topic, Publisher<T> event) {
        return this.doPublish(topic, event, (t, e, p, s, f) -> Flux.from((Publisher)e).flatMap(val -> this.publishFromLocal((CharSequence)t, (Object)val, (Collection<SubscriptionInfo>)p, (Map<CharSequence, String>)s, (ContextView)f)).then(), sub -> true);
    }

    public <T> Mono<Long> publish(String topic, T event, Scheduler scheduler) {
        return this.publish(topic, event).subscribeOn(scheduler);
    }

    public <T> Mono<Long> publish(String topic, T event) {
        return this.doPublish(topic, event, this::publishFromLocal, sub -> true);
    }

    public <T> Mono<Long> publish(CharSequence topic, T event) {
        return this.doPublish(topic, event, this::publishFromLocal, sub -> true);
    }

    public <T> Mono<Long> publish(CharSequence topic, Publisher<T> event) {
        return this.doPublish(topic, event, (t, e, p, s, f) -> Flux.from((Publisher)e).flatMap(val -> this.publishFromLocal((CharSequence)t, (Object)val, (Collection<SubscriptionInfo>)p, (Map<CharSequence, String>)s, (ContextView)f)).then(), sub -> true);
    }

    public <T> Mono<Long> publish(CharSequence topic, Supplier<T> event) {
        return this.doPublish(topic, event, (t, e, p, s, f) -> this.publishFromLocal((CharSequence)t, e.get(), (Collection<SubscriptionInfo>)p, (Map<CharSequence, String>)s, (ContextView)f), sub -> sub.hasFeature(Subscription.Feature.local));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Mono<Void> doPublish0(CharSequence topic, TopicPayload payload, Collection<SubscriptionInfo> subs, Map<CharSequence, String> published, ContextView ctx) {
        int subSize = subs.size();
        TreeMap<Integer, Map> priority = subSize == 1 ? null : new TreeMap<Integer, Map>();
        Mono task = null;
        for (SubscriptionInfo sub : subs) {
            log.trace("publish {} to {}", (Object)topic, (Object)sub);
            BiFunction<SubscriptionInfo, TopicPayload, Mono<Void>> handler = sub.handler;
            task = handler.apply(sub, payload);
            if (subSize <= 1 || task == null) continue;
            priority.computeIfAbsent(sub.priority, ignore -> Maps.newHashMapWithExpectedSize((int)subSize)).put(handler, task);
        }
        if (task == null) {
            return Mono.empty();
        }
        if (subSize != 1) {
            if (priority.size() == 1) {
                Collection handlers = ((Map)priority.firstEntry().getValue()).values();
                task = handlers.size() == 1 ? (Mono)handlers.iterator().next() : Flux.merge(handlers).then();
            } else {
                Set distinct = (Set)DISTINCT.get();
                Flux all = null;
                try {
                    for (Map value : priority.values()) {
                        Flux that = null;
                        for (Map.Entry entry : value.entrySet()) {
                            if (!distinct.add(entry.getKey())) continue;
                            if (that == null) {
                                that = ((Mono)entry.getValue()).flux();
                                continue;
                            }
                            that = that.mergeWith((Publisher)entry.getValue());
                        }
                        if (that == null) continue;
                        if (all == null) {
                            all = that;
                            continue;
                        }
                        all = all.concatWith(that);
                    }
                }
                finally {
                    distinct.clear();
                }
                task = all == null ? Mono.empty() : all.then();
            }
        }
        return task;
    }

    private <T> Mono<Void> publishFromLocal(CharSequence topic, T value, Collection<SubscriptionInfo> subs, Map<CharSequence, String> published, ContextView ctx) {
        TopicPayload payload = TopicPayload.of((CharSequence)topic, value);
        payload.addHeader(SHARE_PUBLISHED.getKey(), published);
        TraceHolder.writeContextTo((ContextView)ctx, (Object)payload, TopicPayload::addHeader);
        return this.doPublish0(topic, payload, subs, published, ctx);
    }

    private <T> Mono<Long> doPublish(CharSequence topic, T arg, Function5<CharSequence, T, Collection<SubscriptionInfo>, Map<CharSequence, String>, ContextView, Mono<Void>> handler, EventPublisher.SubscriptionFilter predicate) {
        return new EventPublisher<T>(this, topic, predicate, arg, handler);
    }

    void registerMBean() {
        MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
        try {
            this.objectName = new ObjectName("org.jetlinks:type=EventBus,name=InternalEventBus");
            mBeanServer.registerMBean(new StandardMBean(new EventBusMBeanImpl(), EventBusMBean.class), this.objectName);
        }
        catch (InstanceAlreadyExistsException instanceAlreadyExistsException) {
        }
        catch (Exception e) {
            log.warn(e.getMessage(), (Throwable)e);
        }
    }

    void unregisterMBean() {
        try {
            if (this.objectName != null) {
                MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
                mBeanServer.unregisterMBean(this.objectName);
            }
        }
        catch (Throwable throwable) {
            // empty catch block
        }
    }

    @Generated
    public void setMaxBufferSize(int maxBufferSize) {
        this.maxBufferSize = maxBufferSize;
    }

    @Generated
    public int getMaxBufferSize() {
        return this.maxBufferSize;
    }

    public static class SubscriptionInfo
    implements Disposable,
    Jsonable {
        @Generated
        private transient int $hashCodeCache;
        private static final long baseTime = 1704038400000L;
        static final long RANDOM_MASK = 0xFFFFFFL;
        static final AtomicLongFieldUpdater<SubscriptionInfo> IN = AtomicLongFieldUpdater.newUpdater(SubscriptionInfo.class, "in");
        static final AtomicLongFieldUpdater<SubscriptionInfo> OUT = AtomicLongFieldUpdater.newUpdater(SubscriptionInfo.class, "out");
        static final AtomicLongFieldUpdater<SubscriptionInfo> ERROR = AtomicLongFieldUpdater.newUpdater(SubscriptionInfo.class, "error");
        static final AtomicLongFieldUpdater<SubscriptionInfo> DROPPED = AtomicLongFieldUpdater.newUpdater(SubscriptionInfo.class, "dropped");
        static final Comparator<SubscriptionInfo> comparatorByTime = Comparator.comparingLong(SubscriptionInfo::getTime).thenComparing(SubscriptionInfo::getId);
        static final Comparator<SubscriptionInfo> comparatorByLoad = Comparator.comparingLong(SubscriptionInfo::getIn).thenComparingLong(SubscriptionInfo::getId);
        long id;
        @JsonIgnore
        private CharSequence subscriber0;
        @JsonIgnore
        private SharedPathString topic;
        private long features;
        @JsonIgnore
        transient BiFunction<SubscriptionInfo, TopicPayload, Mono<Void>> handler;
        @JsonIgnore
        transient Topic<SubscriptionInfo> topicRef;
        private int priority;
        private volatile transient long in;
        private volatile transient long out;
        private volatile transient long error;
        private volatile transient long dropped = 0L;

        public static long newId(long time) {
            if (time == 0L) {
                time = System.currentTimeMillis();
            }
            long t = Math.abs(time - 1704038400000L);
            int r = SubscriptionInfo.newRandom();
            return t << 24 | (long)r & 0xFFFFFFL;
        }

        public static int newRandom() {
            return ThreadLocalRandom.current().nextInt(65535, 0x1000000);
        }

        public long readRandom() {
            return this.id & 0xFFFFFFL;
        }

        public static long readTime(long id) {
            return (id >> 24) + 1704038400000L;
        }

        public long getTime() {
            return SubscriptionInfo.readTime(this.id);
        }

        @JsonIgnore
        public SeparatedCharSequence getTopic() {
            return this.topic == null ? this.topicRef : this.topic;
        }

        @JsonIgnore
        public String getTopicString() {
            return this.topic == null ? this.topicRef.getTopic() : this.topic.toString();
        }

        public boolean isCluster() {
            return false;
        }

        public String serverId() {
            return "default";
        }

        public String getSubscriber() {
            return this.subscriber0.toString();
        }

        public static SubscriptionInfo of(Subscription subscription, Topic<SubscriptionInfo> topic, BiFunction<SubscriptionInfo, TopicPayload, Mono<Void>> handler) {
            SubscriptionInfo info = new SubscriptionInfo();
            info.id = SubscriptionInfo.newId(subscription.getTime());
            info.topicRef = topic;
            info.subscriber0 = SeparatedString.of((char)':', (String)subscription.getSubscriber());
            info.features = EnumDict.toMask((EnumDict[])subscription.getFeatures());
            info.priority = subscription.getPriority();
            info.handler = handler;
            return info;
        }

        public void dropped(TopicPayload payload) {
            DROPPED.incrementAndGet(this);
        }

        public void error(Throwable error) {
            ERROR.incrementAndGet(this);
        }

        public void in() {
            IN.incrementAndGet(this);
        }

        public void out() {
            OUT.incrementAndGet(this);
        }

        public void dispose() {
        }

        SubscriptionInfo intern() {
            if (this.subscriber0 instanceof SeparatedCharSequence) {
                SeparatedCharSequence c = (SeparatedCharSequence)this.subscriber0;
                this.subscriber0 = c.internInner().intern();
            } else {
                this.subscriber0 = (CharSequence)RecyclerUtils.intern((Object)this.subscriber0);
            }
            return this;
        }

        public void setTopicRef(Topic<SubscriptionInfo> topicRef) {
            this.topicRef = topicRef;
            this.topic = null;
        }

        @JsonIgnore
        public boolean isDisposed() {
            return super.isDisposed();
        }

        boolean hasFeature(Subscription.Feature feature) {
            return feature.in(this.features);
        }

        public String toString() {
            return this.getSubscriber() + "::" + this.getTopicString();
        }

        public JSONObject toJson() {
            JSONObject obj = (JSONObject)ObjectMappers.parseJson((byte[])ObjectMappers.toJsonBytes((Object)this), JSONObject.class);
            obj.put("features", (Object)Lists.transform((List)EnumDict.getByMask(Subscription.Feature.class, (long)this.features), Subscription.Feature::getValue));
            return obj;
        }

        @Generated
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof SubscriptionInfo)) {
                return false;
            }
            SubscriptionInfo other = (SubscriptionInfo)o;
            if (!other.canEqual(this)) {
                return false;
            }
            if (this.getId() != other.getId()) {
                return false;
            }
            CharSequence this$subscriber0 = this.getSubscriber0();
            CharSequence other$subscriber0 = other.getSubscriber0();
            if (this$subscriber0 == null ? other$subscriber0 != null : !this$subscriber0.equals(other$subscriber0)) {
                return false;
            }
            Topic<SubscriptionInfo> this$topicRef = this.getTopicRef();
            Topic<SubscriptionInfo> other$topicRef = other.getTopicRef();
            return !(this$topicRef == null ? other$topicRef != null : !this$topicRef.equals(other$topicRef));
        }

        @Generated
        protected boolean canEqual(Object other) {
            return other instanceof SubscriptionInfo;
        }

        @Generated
        public int hashCode() {
            if (this.$hashCodeCache != 0) {
                return this.$hashCodeCache;
            }
            int PRIME = 59;
            int result = 1;
            long $id = this.getId();
            result = result * 59 + (int)($id >>> 32 ^ $id);
            CharSequence $subscriber0 = this.getSubscriber0();
            result = result * 59 + ($subscriber0 == null ? 43 : $subscriber0.hashCode());
            Topic<SubscriptionInfo> $topicRef = this.getTopicRef();
            if ((result = result * 59 + ($topicRef == null ? 43 : $topicRef.hashCode())) == 0) {
                result = Integer.MIN_VALUE;
            }
            this.$hashCodeCache = result;
            return result;
        }

        @Generated
        public long getId() {
            return this.id;
        }

        @Generated
        public CharSequence getSubscriber0() {
            return this.subscriber0;
        }

        @Generated
        public long getFeatures() {
            return this.features;
        }

        @Generated
        public BiFunction<SubscriptionInfo, TopicPayload, Mono<Void>> getHandler() {
            return this.handler;
        }

        @Generated
        public Topic<SubscriptionInfo> getTopicRef() {
            return this.topicRef;
        }

        @Generated
        public int getPriority() {
            return this.priority;
        }

        @Generated
        public long getIn() {
            return this.in;
        }

        @Generated
        public long getOut() {
            return this.out;
        }

        @Generated
        public long getError() {
            return this.error;
        }

        @Generated
        public long getDropped() {
            return this.dropped;
        }

        @Generated
        public SubscriptionInfo(long id, CharSequence subscriber0, SharedPathString topic, long features, BiFunction<SubscriptionInfo, TopicPayload, Mono<Void>> handler, Topic<SubscriptionInfo> topicRef, int priority, long in, long out, long error, long dropped) {
            this.id = id;
            this.subscriber0 = subscriber0;
            this.topic = topic;
            this.features = features;
            this.handler = handler;
            this.topicRef = topicRef;
            this.priority = priority;
            this.in = in;
            this.out = out;
            this.error = error;
            this.dropped = dropped;
        }

        @Generated
        public SubscriptionInfo() {
        }

        @Generated
        private void setHandler(BiFunction<SubscriptionInfo, TopicPayload, Mono<Void>> handler) {
            this.handler = handler;
        }
    }

    class EventBusMBeanImpl
    implements EventBusMBean {
        EventBusMBeanImpl() {
        }

        @Override
        public int getMaxBufferSize() {
            return InternalEventBus.this.maxBufferSize;
        }

        @Override
        public void setMaxBufferSize(int bufferSize) {
            InternalEventBus.this.maxBufferSize = bufferSize;
        }

        @Override
        public long getTotalSubscribers() {
            return InternalEventBus.this.root.getTotalSubscriber();
        }

        @Override
        public long getTotalTopics() {
            return InternalEventBus.this.root.getTotalTopic();
        }

        @Override
        public Set<Object> getSubscribers(String topic) {
            return InternalEventBus.this.root.getTopic(topic).map(Topic::getSubscribers).orElse(Collections.emptySet()).stream().map(SerializeUtils::convertToSafelySerializable).collect(Collectors.toSet());
        }

        @Override
        public Object view(String prefix) {
            return InternalEventBus.this.root.getTopic(prefix).map(Topic::view).map(SerializeUtils::convertToSafelySerializable).orElse(null);
        }

        @Override
        public void cleanup() {
            this.cleanup();
        }
    }

    public static interface EventBusMBean {
        public int getMaxBufferSize();

        public void setMaxBufferSize(int var1);

        public long getTotalSubscribers();

        public long getTotalTopics();

        public Set<Object> getSubscribers(String var1);

        public void cleanup();

        public Object view(String var1);
    }
}

