/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.event;

import jakarta.annotation.Nonnull;
import java.lang.reflect.Array;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiFunction;
import java.util.function.Function;
import lombok.Generated;
import org.jetlinks.core.event.Cancelable;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.event.TopicPayload;
import org.jetlinks.core.topic.Topic;
import org.jetlinks.supports.event.InternalEventBus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.SignalType;
import reactor.util.context.Context;

/**
 * A local subscriber registered on the {@link InternalEventBus} topic tree.
 *
 * <p>On construction one {@link InternalEventBus.SubscriptionInfo} is created and subscribed for
 * every topic of the given {@link Subscription}. Payloads are dispatched to the configured handler
 * through {@link #apply(InternalEventBus.SubscriptionInfo, TopicPayload)}; {@link #dispose()}
 * removes every registration again.
 *
 * <p>Exactly one of {@link #handler} / {@link #handler0} is set by the constructors; the other one
 * is {@code null}.
 */
class LocalSubscriber
implements BiFunction<InternalEventBus.SubscriptionInfo, TopicPayload, Mono<Void>>,
Cancelable {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(LocalSubscriber.class);

    /** Sentinel stored in {@link #subscription} once this subscriber has been disposed. */
    private static final Object DISPOSED = new Object();

    /** Identity hash captured once; equality of subscribers is identity based. */
    private final int hashCode = System.identityHashCode(this);

    /** Handler receiving both the subscription info and the payload; {@code null} when {@link #handler0} is used. */
    final BiFunction<InternalEventBus.SubscriptionInfo, TopicPayload, Mono<Void>> handler;

    /** Handler receiving only the payload; {@code null} when {@link #handler} is used. */
    final Function<TopicPayload, Mono<Void>> handler0;

    private final InternalEventBus parent;

    static final AtomicReferenceFieldUpdater<LocalSubscriber, Object> SUBSCRIPTION = AtomicReferenceFieldUpdater.newUpdater(LocalSubscriber.class, Object.class, "subscription");

    /**
     * Current registration state. Holds one of:
     * {@code null} (no topics), a single {@link InternalEventBus.SubscriptionInfo},
     * an {@code InternalEventBus.SubscriptionInfo[]} (several topics), or {@link #DISPOSED}.
     */
    private volatile Object subscription;

    /**
     * Creates a subscriber whose handler receives the subscription info together with the payload.
     *
     * @param eventBus     owning event bus; its {@code root} topic is used for registration
     * @param subscription subscription describing the topics and features
     * @param handler      handler invoked for each payload
     */
    LocalSubscriber(InternalEventBus eventBus, Subscription subscription, BiFunction<InternalEventBus.SubscriptionInfo, TopicPayload, Mono<Void>> handler) {
        this.parent = eventBus;
        this.handler = handler;
        this.handler0 = null;
        this.init(this.parent.root, subscription);
    }

    /**
     * Creates a subscriber whose handler receives only the payload.
     *
     * @param eventBus     owning event bus; its {@code root} topic is used for registration
     * @param subscription subscription describing the topics and features
     * @param handler      handler invoked for each payload
     */
    LocalSubscriber(InternalEventBus eventBus, Subscription subscription, Function<TopicPayload, Mono<Void>> handler) {
        this.parent = eventBus;
        this.handler = null;
        this.handler0 = handler;
        this.init(this.parent.root, subscription);
    }

    /**
     * Registers this subscriber on every topic of {@code subscription}.
     *
     * <p>A single topic is stored directly in {@link #subscription}; several topics are stored in a
     * typed array that is published to the field before the first registration.
     *
     * @param root         root of the topic tree to append the topics to
     * @param subscription subscription providing the topics
     */
    private void init(Topic<InternalEventBus.SubscriptionInfo> root, Subscription subscription) {
        String[] topics = subscription.getTopics();
        int len = topics.length;

        // Only the multi-topic case needs an array; keep a typed local reference so that
        // elements can be stored without reflection.
        InternalEventBus.SubscriptionInfo[] infos = null;
        if (len > 1) {
            infos = new InternalEventBus.SubscriptionInfo[len];
            this.subscription = infos;
        }

        for (int i = 0; i < len; ++i) {
            Topic<InternalEventBus.SubscriptionInfo> node = root.append(topics[i]);
            InternalEventBus.SubscriptionInfo info = InternalEventBus.SubscriptionInfo.of(subscription, node, this);
            log.debug("subscribe: {}", info);

            if (infos == null) {
                this.subscription = info;
            } else {
                infos[i] = info;
            }

            node.subscribe(new InternalEventBus.SubscriptionInfo[]{info});
            if (info.hasFeature(Subscription.Feature.broker)) {
                this.parent.subscribeCluster(info);
            }
        }
    }

    /** Cancels this subscriber; equivalent to {@link #dispose()}. */
    public void cancel() {
        this.dispose();
    }

    /**
     * Wraps the dispatch of {@code payload} to the configured handler in a lazily evaluated
     * {@link Mono}; the handler is only invoked when the returned {@link Mono} is subscribed.
     *
     * @param info    subscription info the payload was matched against
     * @param payload payload to deliver
     * @return a {@link Mono} that invokes the handler on subscription
     */
    @Override
    public Mono<Void> apply(InternalEventBus.SubscriptionInfo info, TopicPayload payload) {
        return new LocalSubscriberMono(info, payload, this.handler, this.handler0);
    }

    @Override
    public int hashCode() {
        return this.hashCode;
    }

    /** Identity equality: two subscribers are equal only if they are the same instance. */
    @Override
    public boolean equals(Object o) {
        return this == o;
    }

    /**
     * @return {@code true} once {@link #dispose()} has been called
     */
    public boolean isDisposed() {
        return SUBSCRIPTION.get(this) == DISPOSED;
    }

    /**
     * Removes every registration made by this subscriber.
     *
     * <p>The state is swapped to {@link #DISPOSED} atomically, so only the first call performs the
     * unsubscription; later calls return immediately.
     */
    public void dispose() {
        Object previous = SUBSCRIPTION.getAndSet(this, DISPOSED);
        if (previous == DISPOSED) {
            return;
        }
        if (previous instanceof InternalEventBus.SubscriptionInfo) {
            this.unsub((InternalEventBus.SubscriptionInfo) previous);
            return;
        }
        if (previous instanceof InternalEventBus.SubscriptionInfo[]) {
            for (InternalEventBus.SubscriptionInfo info : (InternalEventBus.SubscriptionInfo[]) previous) {
                this.unsub(info);
            }
        }
    }

    /**
     * Removes a single registration from its topic and, for broker subscriptions, from the cluster.
     *
     * @param info registration to remove
     */
    private void unsub(InternalEventBus.SubscriptionInfo info) {
        if (info.topicRef != null) {
            info.topicRef.unsubscribe(new InternalEventBus.SubscriptionInfo[]{info});
        }
        if (info.hasFeature(Subscription.Feature.broker)) {
            this.parent.unsubscribeCluster(info);
        }
    }

    /**
     * {@link Mono} that invokes the handler when subscribed and relays its termination to the
     * downstream subscriber as a plain completion.
     */
    private static class LocalSubscriberMono
    extends Mono<Void> {
        private final InternalEventBus.SubscriptionInfo info;
        private final TopicPayload payload;
        private final BiFunction<InternalEventBus.SubscriptionInfo, TopicPayload, Mono<Void>> handler;
        private final Function<TopicPayload, Mono<Void>> handler0;

        protected LocalSubscriberMono(InternalEventBus.SubscriptionInfo info, TopicPayload payload, BiFunction<InternalEventBus.SubscriptionInfo, TopicPayload, Mono<Void>> handler, Function<TopicPayload, Mono<Void>> handler0) {
            this.info = info;
            this.payload = payload;
            this.handler = handler;
            this.handler0 = handler0;
        }

        /**
         * Invokes the configured handler ({@code handler0} takes precedence over {@code handler})
         * and subscribes to its result.
         *
         * <p>Anything thrown while doing so is recorded on the subscription info, logged, and the
         * downstream subscriber is completed. If no handler is configured the downstream subscriber
         * is completed immediately so that it is never left without a signal.
         *
         * @param actual downstream subscriber
         */
        @Override
        public void subscribe(@Nonnull CoreSubscriber<? super Void> actual) {
            try {
                Mono<Void> result;
                if (this.handler0 != null) {
                    result = this.handler0.apply(this.payload);
                } else if (this.handler != null) {
                    result = this.handler.apply(this.info, this.payload);
                } else {
                    // No handler at all: complete instead of leaving 'actual' hanging forever.
                    Operators.complete(actual);
                    return;
                }
                // Typed as CoreSubscriber so the CoreSubscriber overload of subscribe is selected.
                CoreSubscriber<Void> bridge = new Subscriber(actual, this.info, this.payload);
                result.subscribe(bridge);
            }
            catch (Throwable throwable) {
                this.info.error(throwable);
                log.warn("handle publish [{}] error", this.payload.getTopic(), throwable);
                Operators.complete(actual);
            }
        }

        /**
         * Bridges the handler's {@link Mono} to the downstream subscriber, tracking the delivery on
         * the subscription info via {@code in()} / {@code out()}.
         */
        private static class Subscriber
        extends BaseSubscriber<Void> {
            private final CoreSubscriber<? super Void> actual;
            private final InternalEventBus.SubscriptionInfo info;
            private final TopicPayload payload;

            /** Marks the delivery as started and hands this subscriber to downstream as its subscription. */
            @Override
            protected void hookOnSubscribe(@Nonnull org.reactivestreams.Subscription subscription) {
                this.info.in();
                this.actual.onSubscribe(this);
            }

            /** Exposes the downstream subscriber's context to the handler's publisher. */
            @Override
            @Nonnull
            public Context currentContext() {
                return this.actual.currentContext();
            }

            /**
             * Completes the downstream subscriber and marks the delivery as finished. The signal
             * type is not inspected. {@code out()} runs in a {@code finally} block so the
             * {@code in()} / {@code out()} pair stays balanced even if {@code onComplete} throws.
             */
            @Override
            protected void hookFinally(@Nonnull SignalType type) {
                try {
                    this.actual.onComplete();
                } finally {
                    this.info.out();
                }
            }

            /** Records and logs the handler error; the error is not forwarded to downstream here. */
            @Override
            protected void hookOnError(@Nonnull Throwable throwable) {
                this.info.error(throwable);
                log.warn("handle publish [{}] error", this.payload.getTopic(), throwable);
            }

            @Generated
            public Subscriber(CoreSubscriber<? super Void> actual, InternalEventBus.SubscriptionInfo info, TopicPayload payload) {
                this.actual = actual;
                this.info = info;
                this.payload = payload;
            }
        }
    }
}

