/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.event;

import com.google.common.collect.Sets;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.ThreadLocalRandom;
import jakarta.annotation.Nonnull;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.jetlinks.core.Routable;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.topic.Topic;
import org.jetlinks.core.utils.HashUtils;
import org.jetlinks.supports.event.InternalEventBus;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.function.Function5;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

/**
 * A {@link Mono} that, when subscribed, looks up the subscriptions matching {@code topic} under
 * {@code parent.root}, hands them to {@code handler} together with {@code target}, and finally
 * emits the number of subscriptions that were handed over.
 *
 * <p>The instance passes itself to the topic lookup both as the {@link Consumer} receiving each
 * matched {@link Topic} and as the {@link Runnable} given as the last lookup argument.
 * Subscriptions carrying the {@code shared} feature are grouped by {@code getSubscriber0()} and
 * only the one picked by the group's {@link ShareSelector} is published.
 *
 * @param <T> type of the value handed to the handler
 */
class EventPublisher<T>
extends Mono<Long>
implements Consumer<Topic<InternalEventBus.SubscriptionInfo>>,
Runnable {
    /** Per-thread scratch set; installed as {@code readyToPub} on subscribe and cleared on release. */
    private static final FastThreadLocal<Set<InternalEventBus.SubscriptionInfo>> PUB_HANDLERS = new FastThreadLocal<Set<InternalEventBus.SubscriptionInfo>>(){

        protected Set<InternalEventBus.SubscriptionInfo> initialValue() {
            return new HashSet<InternalEventBus.SubscriptionInfo>();
        }
    };
    /** Per-thread scratch map; installed as {@code shared} on subscribe and cleared on release. */
    private static final FastThreadLocal<Map<CharSequence, ShareSelector>> SHARED = new FastThreadLocal<Map<CharSequence, ShareSelector>>(){

        protected Map<CharSequence, ShareSelector> initialValue() {
            return new HashMap<CharSequence, ShareSelector>();
        }
    };
    /** Atomic accessor for {@link #shared}; used to claim and release the field. */
    private static final AtomicReferenceFieldUpdater<EventPublisher, Map> SHARED_UPDATER = AtomicReferenceFieldUpdater.newUpdater(EventPublisher.class, Map.class, "shared");
    /** Selector per shared-subscription group, keyed by {@code getSubscriber0()}; null while idle. */
    private volatile Map<CharSequence, ShareSelector> shared;
    /** Atomic accessor for {@link #readyToPub}; used to claim and release the field. */
    private static final AtomicReferenceFieldUpdater<EventPublisher, Set> READY_TO_PUB_UPDATER = AtomicReferenceFieldUpdater.newUpdater(EventPublisher.class, Set.class, "readyToPub");
    /** Subscriptions collected for the current publish; null while idle. */
    private volatile Set<InternalEventBus.SubscriptionInfo> readyToPub;
    /** Atomic accessor for {@link #sharePublished}; used to take the map and reset the field. */
    private static final AtomicReferenceFieldUpdater<EventPublisher, Map> SHARE_PUBLISHED_UPDATER = AtomicReferenceFieldUpdater.newUpdater(EventPublisher.class, Map.class, "sharePublished");
    /** For each shared group, {@code getSubscriber0()} mapped to the {@code serverId()} of the chosen subscription; created lazily. */
    private volatile Map<CharSequence, String> sharePublished;
    /** Event bus whose {@code root} topic tree is searched. */
    private final InternalEventBus parent;
    /** Topic being published to. */
    private final CharSequence topic;
    /** Decides which subscriptions are collected ({@code prepare}) and which are finally published ({@code ready}). */
    private final SubscriptionFilter predicate;
    /** Value handed to the handler; also the hashing source for hash-based shared selection. */
    private final T target;
    /** Callback that performs the delivery: (topic, target, subscriptions, shared selections, subscriber context). */
    private final Function5<CharSequence, T, Collection<InternalEventBus.SubscriptionInfo>, Map<CharSequence, String>, ContextView, Mono<Void>> handler;
    /** Cached hash-based comparator; built on first use by {@code createHashComparator()}. */
    private Comparator<InternalEventBus.SubscriptionInfo> comparator;

    /**
     * Returns the map that records the chosen subscription of every shared group, creating it on
     * first use. A new map is pre-sized with the current number of entries in {@code shared}.
     *
     * @return the existing or freshly created map, never {@code null}
     */
    private Map<CharSequence, String> sharePublished() {
        Map<CharSequence, String> published = this.sharePublished;
        if (published == null) {
            published = new ConcurrentHashMap<CharSequence, String>(this.shared.size());
            this.sharePublished = published;
        }
        return published;
    }

    /**
     * Atomically takes the shared-selection map and resets the field to {@code null}.
     *
     * @return the map that was set, or an empty map when none had been created
     */
    private Map<CharSequence, String> getAndReleaseSharePublished() {
        Map<CharSequence, String> taken = SHARE_PUBLISHED_UPDATER.getAndSet(this, null);
        if (taken != null) {
            return taken;
        }
        return Collections.emptyMap();
    }

    /**
     * Atomically detaches the collected subscriptions and the shared-group selectors from this
     * instance and returns a private copy of the subscriptions accepted by
     * {@code predicate.ready}.
     *
     * <p>The detached collections are always cleared before returning, including when the filter
     * throws, because they are the reusable per-thread scratch collections installed by
     * {@code subscribe}.
     *
     * @return a new set holding the subscriptions that passed {@code ready}, or an empty set when
     *         nothing was collected
     */
    private Set<InternalEventBus.SubscriptionInfo> release() {
        Set<InternalEventBus.SubscriptionInfo> collected = READY_TO_PUB_UPDATER.getAndSet(this, null);
        Map<CharSequence, ShareSelector> selectors = SHARED_UPDATER.getAndSet(this, null);
        try {
            if (CollectionUtils.isEmpty(collected)) {
                return Collections.emptySet();
            }
            // Sets.filter is only a view; copy it so the result survives the clear() below.
            return new HashSet<InternalEventBus.SubscriptionInfo>(Sets.filter(collected, this.predicate::ready));
        } finally {
            // Either field may already be null (nothing claimed, or released before); the
            // emptiness check above tolerates that, so the cleanup must tolerate it too.
            if (collected != null) {
                collected.clear();
            }
            if (selectors != null) {
                selectors.clear();
            }
        }
    }

    /**
     * Moves the chosen subscription of every shared group into the publish set and records, per
     * group, which server id was chosen. Does nothing when no shared group was collected.
     */
    @Override
    public void run() {
        Map<CharSequence, ShareSelector> selectors = this.shared;
        if (MapUtils.isEmpty(selectors)) {
            return;
        }
        Set<InternalEventBus.SubscriptionInfo> pending = this.readyToPub;
        Map<CharSequence, String> published = this.sharePublished();
        for (ShareSelector selector : selectors.values()) {
            InternalEventBus.SubscriptionInfo chosen = selector.current();
            published.put(chosen.getSubscriber0(), chosen.serverId());
            pending.add(chosen);
        }
    }

    /**
     * Returns the comparator used for hash-based shared selection, building and caching it on
     * first use.
     *
     * <p>Subscriptions are ordered first by a hash involving their {@code serverId()} and then by
     * a hash involving the subscription itself. When {@code target} is {@link Routable} its own
     * {@code hash} is used; otherwise {@code HashUtils.murmur3_128} is applied with
     * {@code target} as the first argument.
     *
     * @return the cached comparator, never {@code null}
     */
    @Nonnull
    private Comparator<InternalEventBus.SubscriptionInfo> createHashComparator() {
        Comparator<InternalEventBus.SubscriptionInfo> cached = this.comparator;
        if (cached != null) {
            return cached;
        }
        T payload = this.target;
        // The explicit type witness is required: at the head of a call chain the lambda
        // parameter would otherwise be inferred as Object.
        if (payload instanceof Routable) {
            Routable routable = (Routable) payload;
            cached = Comparator
                .<InternalEventBus.SubscriptionInfo>comparingLong(info -> routable.hash(new Object[]{info.serverId()}))
                .thenComparingLong(info -> routable.hash(new Object[]{info}));
        } else {
            cached = Comparator
                .<InternalEventBus.SubscriptionInfo>comparingLong(info -> HashUtils.murmur3_128(payload, new Object[]{info.serverId()}))
                .thenComparingLong(info -> HashUtils.murmur3_128(payload, new Object[]{info}));
        }
        this.comparator = cached;
        return cached;
    }

    /**
     * Receives one matched topic from the lookup and sorts its subscriptions.
     *
     * <p>Subscriptions rejected by {@code predicate.prepare} are skipped. Subscriptions with the
     * {@code shared} feature are offered to their group's selector; all others go straight into
     * the publish set.
     *
     * @param subs a topic matched by the lookup
     */
    @Override
    public void accept(Topic<InternalEventBus.SubscriptionInfo> subs) {
        Set<InternalEventBus.SubscriptionInfo> subscriptions = subs.getSubscribers();
        if (subscriptions.isEmpty()) {
            return;
        }
        for (InternalEventBus.SubscriptionInfo sub : subscriptions) {
            if (!this.predicate.prepare(sub)) {
                continue;
            }
            if (sub.hasFeature(Subscription.Feature.shared)) {
                // Only one member per shared group is published; the selector decides which.
                this.getShareContainer(sub).add(sub);
            } else {
                this.readyToPub.add(sub);
            }
        }
    }

    /**
     * Collects the subscriptions matching the topic, passes them to the handler and signals the
     * number of subscriptions to {@code actual} once the handler's {@code Mono} completes.
     *
     * <p>The per-thread scratch collections are claimed for the duration of the lookup. If they
     * are already claimed, {@code actual} is failed with a duplicate-subscribe error. If the
     * lookup or filtering throws, the claimed collections are emptied and released before the
     * exception propagates, so nothing stale is left behind for the next publish on this thread.
     *
     * @param actual the subscriber to signal
     */
    @Override
    public void subscribe(@Nonnull CoreSubscriber<? super Long> actual) {
        if (!SHARED_UPDATER.compareAndSet(this, null, SHARED.get())
            || !READY_TO_PUB_UPDATER.compareAndSet(this, null, PUB_HANDLERS.get())) {
            Operators.error(actual, Exceptions.duplicateOnSubscribeException());
            return;
        }
        Map<CharSequence, String> sharedSelections;
        Set<InternalEventBus.SubscriptionInfo> subscriptions;
        boolean collected = false;
        try {
            // The lookup calls accept(...) for matched topics and run() via the Runnable argument.
            this.parent.root.findTopic(this.topic, this, this);
            sharedSelections = this.getAndReleaseSharePublished();
            subscriptions = this.release();
            collected = true;
        } finally {
            if (!collected) {
                this.discardCollected();
            }
        }
        long size = subscriptions.size();
        if (size == 0L) {
            actual.onSubscribe(Operators.emptySubscription());
            actual.onNext(size);
            actual.onComplete();
            return;
        }
        Context context = actual.currentContext();
        this.handler
            .apply(this.topic, this.target, subscriptions, sharedSelections, context)
            .then(Mono.just(size))
            .subscribe(actual);
    }

    /**
     * Empties and releases whatever a failed collection pass left behind. Tolerates fields that
     * were already released.
     */
    private void discardCollected() {
        Set<?> pending = READY_TO_PUB_UPDATER.getAndSet(this, null);
        if (pending != null) {
            pending.clear();
        }
        Map<?, ?> selectors = SHARED_UPDATER.getAndSet(this, null);
        if (selectors != null) {
            selectors.clear();
        }
        SHARE_PUBLISHED_UPDATER.set(this, null);
    }

    /**
     * Returns the selector for the shared group that {@code conn} belongs to, creating and
     * registering one when the group is seen for the first time.
     *
     * <p>The selector type follows the subscription's features: {@code sharedOldest} orders by
     * {@code comparatorByTime}, {@code sharedHashed} by the hash comparator,
     * {@code sharedMinimumLoad} by {@code comparatorByLoad}, and anything else uses a random
     * selector. With {@code sharedLocalFirst} the local-first variant of the selector is used.
     *
     * @param conn a subscription carrying the shared feature
     * @return the selector registered under {@code conn.getSubscriber0()}
     */
    private ShareSelector getShareContainer(InternalEventBus.SubscriptionInfo conn) {
        CharSequence group = conn.getSubscriber0();
        ShareSelector existing = this.shared.get(group);
        if (existing != null) {
            return existing;
        }

        boolean localFirst = conn.hasFeature(Subscription.Feature.sharedLocalFirst);
        ShareSelector created;
        if (conn.hasFeature(Subscription.Feature.sharedOldest)) {
            created = orderedSelector(InternalEventBus.SubscriptionInfo.comparatorByTime, localFirst);
        } else if (conn.hasFeature(Subscription.Feature.sharedHashed)) {
            created = orderedSelector(this.createHashComparator(), localFirst);
        } else if (conn.hasFeature(Subscription.Feature.sharedMinimumLoad)) {
            created = orderedSelector(InternalEventBus.SubscriptionInfo.comparatorByLoad, localFirst);
        } else if (localFirst) {
            created = new RandomLocalFirstShareSelector();
        } else {
            created = new RandomShareSelector();
        }

        ShareSelector previous = this.shared.putIfAbsent(group, created);
        return previous == null ? created : previous;
    }

    /** Builds a comparator-driven selector, in its local-first variant when requested. */
    private static ShareSelector orderedSelector(Comparator<InternalEventBus.SubscriptionInfo> order, boolean localFirst) {
        if (localFirst) {
            return new ShareLocalFirstSelector(order);
        }
        return new ShareSelector(order);
    }

    /**
     * Creates a publisher for one publish operation.
     *
     * @param parent    event bus whose {@code root} topic tree is searched
     * @param topic     topic to look up
     * @param predicate filter deciding which subscriptions are collected and published
     * @param target    value handed to {@code handler}
     * @param handler   callback receiving the topic, the target, the collected subscriptions, the
     *                  shared-group selections and the subscriber's context
     */
    @Generated
    public EventPublisher(InternalEventBus parent, CharSequence topic, SubscriptionFilter predicate, T target, Function5<CharSequence, T, Collection<InternalEventBus.SubscriptionInfo>, Map<CharSequence, String>, ContextView, Mono<Void>> handler) {
        this.handler = handler;
        this.target = target;
        this.predicate = predicate;
        this.topic = topic;
        this.parent = parent;
    }

    /**
     * Two-stage filter applied to subscriptions during a publish.
     */
    interface SubscriptionFilter {
        /**
         * First stage, consulted for every subscription of a matched topic.
         *
         * @param info the candidate subscription
         * @return {@code true} to keep the subscription as a candidate
         */
        boolean prepare(InternalEventBus.SubscriptionInfo info);

        /**
         * Second stage, consulted when the collected subscriptions are released for publishing.
         * Accepts everything by default.
         *
         * @param info a collected subscription
         * @return {@code true} to publish to the subscription
         */
        default boolean ready(InternalEventBus.SubscriptionInfo info) {
            return true;
        }
    }

    /**
     * Keeps the single preferred subscription of a shared group: the one that sorts lowest under
     * the supplied comparator, with the earlier offer winning ties.
     */
    static class ShareSelector {
        private final Comparator<InternalEventBus.SubscriptionInfo> comparator;
        /** The currently preferred subscription; {@code null} until the first offer. */
        InternalEventBus.SubscriptionInfo current;

        ShareSelector(Comparator<InternalEventBus.SubscriptionInfo> comparator) {
            this.comparator = comparator;
        }

        /** @return the currently preferred subscription, or {@code null} when none was offered */
        public InternalEventBus.SubscriptionInfo current() {
            return this.current;
        }

        /**
         * Offers a subscription; it replaces the current one only if it sorts strictly before it.
         *
         * @param subscriptionInfo the candidate
         */
        public void add(InternalEventBus.SubscriptionInfo subscriptionInfo) {
            InternalEventBus.SubscriptionInfo held = this.current;
            if (held == null || this.comparator.compare(held, subscriptionInfo) > 0) {
                this.current = subscriptionInfo;
            }
        }
    }

    /**
     * Comparator-based selector that prefers subscriptions for which {@code isCluster()} is
     * false: once such a subscription has been offered, cluster subscriptions are ignored.
     */
    static class ShareLocalFirstSelector
    extends ShareSelector {
        /** Set once a non-cluster subscription has been offered. */
        boolean hasLocal;

        ShareLocalFirstSelector(Comparator<InternalEventBus.SubscriptionInfo> comparator) {
            super(comparator);
        }

        @Override
        public void add(InternalEventBus.SubscriptionInfo sub) {
            if (sub.isCluster()) {
                // Cluster candidates only compete while no local one has been seen.
                if (!this.hasLocal) {
                    super.add(sub);
                }
                return;
            }
            if (this.current != null && !this.current.isCluster()) {
                // Local versus local: let the comparator decide.
                super.add(sub);
            } else {
                // First local candidate displaces nothing or a cluster one outright.
                this.current = sub;
            }
            this.hasLocal = true;
        }
    }

    /**
     * Random selector that prefers subscriptions for which {@code isCluster()} is false: once
     * such a subscription has been offered, cluster subscriptions are ignored and the weighted
     * choice is made among the non-cluster ones only.
     */
    static class RandomLocalFirstShareSelector
    extends RandomShareSelector {
        /** Set once a non-cluster subscription has been offered. */
        private boolean hasLocal;

        RandomLocalFirstShareSelector() {
        }

        @Override
        public void add(InternalEventBus.SubscriptionInfo sub) {
            if (!sub.isCluster()) {
                if (this.current == null || this.current.isCluster()) {
                    this.current = sub;
                    // Record this candidate's own weight. Otherwise later local candidates would
                    // be compared against a stale value (the initial 0 or the weight of the
                    // displaced cluster subscription), which skews the selection.
                    this.weight = this.seeds ^ sub.readRandom();
                } else {
                    super.add(sub);
                }
                this.hasLocal = true;
                return;
            }
            // Cluster candidates only compete while no local one has been seen.
            if (!this.hasLocal) {
                super.add(sub);
            }
        }
    }

    /**
     * Selector that keeps the subscription with the smallest weight, where a weight is this
     * selector's seed XOR-ed with the subscription's {@code readRandom()} value. No comparator
     * is used, so {@code null} is passed to the superclass.
     */
    static class RandomShareSelector
    extends ShareSelector {
        /** Seed drawn once per selector instance. */
        final long seeds = ThreadLocalRandom.current().nextInt();
        /** Weight of the currently held subscription. */
        long weight;

        RandomShareSelector() {
            super(null);
        }

        @Override
        public void add(InternalEventBus.SubscriptionInfo sub) {
            long candidate = this.seeds ^ sub.readRandom();
            if (this.current != null && candidate >= this.weight) {
                // An existing choice is kept unless the newcomer is strictly lighter.
                return;
            }
            this.current = sub;
            this.weight = candidate;
        }
    }
}

