/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.event;

import jakarta.annotation.Nonnull;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.event.TopicPayload;
import org.jetlinks.supports.event.InternalEventBus;
import org.jetlinks.supports.event.LocalSubscriber;
import org.reactivestreams.Subscriber;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;

class EventSubscribeFlux<T>
extends Flux<T>
implements org.reactivestreams.Subscription,
Scannable,
BiFunction<InternalEventBus.SubscriptionInfo, TopicPayload, Mono<Void>> {
    private Subscription subscription;
    private final Function<TopicPayload, T> converter;
    private final InternalEventBus parent;
    private final Consumer<TopicPayload> dropListener;
    volatile boolean cancelled;
    volatile int wip;
    static final AtomicIntegerFieldUpdater<EventSubscribeFlux> WIP = AtomicIntegerFieldUpdater.newUpdater(EventSubscribeFlux.class, "wip");
    volatile long requested;
    static final AtomicLongFieldUpdater<EventSubscribeFlux> REQUESTED = AtomicLongFieldUpdater.newUpdater(EventSubscribeFlux.class, "requested");
    volatile long remainder;
    static final AtomicLongFieldUpdater<EventSubscribeFlux> REMAINDER = AtomicLongFieldUpdater.newUpdater(EventSubscribeFlux.class, "remainder");
    private Scannable.Attr.RunStyle runStyle;
    private LocalSubscriber subscriber;
    private CoreSubscriber<? super T> actual;
    private volatile Queue<TopicPayload> buffer;

    EventSubscribeFlux(Subscription subscription, InternalEventBus parent, Function<TopicPayload, T> converter) {
        this.subscription = subscription;
        this.dropListener = subscription.getDropListener();
        this.parent = parent;
        this.converter = converter;
        if (parent.maxBufferSize < 0) {
            this.remainder = Long.MAX_VALUE;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void subscribe(@Nonnull CoreSubscriber<? super T> actual) {
        EventSubscribeFlux eventSubscribeFlux = this;
        synchronized (eventSubscribeFlux) {
            if (this.subscriber != null) {
                actual.onError((Throwable)Exceptions.duplicateOnSubscribeException());
                return;
            }
            this.actual = actual;
            this.runStyle = (Scannable.Attr.RunStyle)Scannable.from(actual).scan(Scannable.Attr.RUN_STYLE);
            this.subscriber = new LocalSubscriber(this.parent, this.subscription, this);
        }
        this.subscription = null;
        actual.onSubscribe((org.reactivestreams.Subscription)this);
    }

    @Override
    public Mono<Void> apply(InternalEventBus.SubscriptionInfo info, TopicPayload payload) {
        this.next(info, payload);
        return Mono.empty();
    }

    void dropped(TopicPayload payload) {
        if (this.dropListener == null) {
            return;
        }
        this.dropListener.accept(payload);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void tryCreateBuffer() {
        if ((this.requested < Integer.MAX_VALUE || this.runStyle != Scannable.Attr.RunStyle.ASYNC) && this.buffer == null) {
            EventSubscribeFlux eventSubscribeFlux = this;
            synchronized (eventSubscribeFlux) {
                if (this.buffer == null) {
                    this.buffer = this.newBuffer();
                }
            }
        }
    }

    void next(InternalEventBus.SubscriptionInfo info, TopicPayload payload) {
        this.tryCreateBuffer();
        Queue<TopicPayload> buffer = this.buffer;
        if (buffer != null) {
            if (this.remainder == Long.MAX_VALUE) {
                if (!buffer.offer(payload)) {
                    info.dropped(payload);
                    this.dropped(payload);
                }
            } else {
                long size = REMAINDER.incrementAndGet(this);
                if (size >= (long)this.parent.maxBufferSize) {
                    info.dropped(payload);
                    this.dropped(payload);
                    REMAINDER.decrementAndGet(this);
                } else if (!buffer.offer(payload)) {
                    info.dropped(payload);
                    this.dropped(payload);
                    REMAINDER.decrementAndGet(this);
                }
            }
            this.drain();
        } else {
            this.next0(payload);
        }
    }

    private void next0(TopicPayload payload) {
        T result = this.converter.apply(payload);
        if (result != null) {
            this.actual.onNext(result);
        } else {
            Operators.onDiscard((Object)payload, (Context)this.actual.currentContext());
            this.dropped(payload);
        }
    }

    void drain() {
        Queue<TopicPayload> buffer = this.buffer;
        if (buffer == null) {
            return;
        }
        if (WIP.getAndIncrement(this) != 0) {
            return;
        }
        CoreSubscriber<? super T> a = this.actual;
        int missed = 1;
        do {
            long e;
            long r = this.requested;
            for (e = 0L; r != e; ++e) {
                boolean empty;
                boolean d = this.cancelled;
                TopicPayload t = buffer.poll();
                boolean bl = empty = t == null;
                if (this.checkTerminated(d, (Subscriber<? super T>)a)) {
                    return;
                }
                if (empty) break;
                this.next0(t);
            }
            if (r == e && this.checkTerminated(this.cancelled, (Subscriber<? super T>)a)) {
                return;
            }
            if (e == 0L) continue;
            if (this.remainder != Long.MAX_VALUE) {
                Operators.produced(REMAINDER, (Object)this, (long)e);
            }
            if (r == Long.MAX_VALUE) continue;
            Operators.produced(REQUESTED, (Object)this, (long)e);
        } while ((missed = WIP.addAndGet(this, -missed)) != 0);
    }

    boolean checkTerminated(boolean done, Subscriber<? super T> a) {
        if (done) {
            a.onComplete();
            return true;
        }
        return false;
    }

    private Queue<TopicPayload> newBuffer() {
        return (Queue)Queues.unboundedMultiproducer().get();
    }

    public void request(long n) {
        if (Operators.validate((long)n)) {
            Operators.addCap(REQUESTED, (Object)this, (long)n);
            this.drain();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cancel() {
        EventSubscribeFlux eventSubscribeFlux = this;
        synchronized (eventSubscribeFlux) {
            if (this.cancelled) {
                return;
            }
            this.cancelled = true;
        }
        Queue<TopicPayload> buffer = this.buffer;
        if (buffer != null) {
            TopicPayload payload;
            while ((payload = buffer.poll()) != null) {
                Operators.onDiscard((Object)payload, (Context)this.actual.currentContext());
                this.dropped(payload);
            }
        }
        if (null != this.subscriber) {
            this.subscriber.dispose();
        }
    }

    public Object scanUnsafe(@Nonnull Scannable.Attr key) {
        if (key == Scannable.Attr.ACTUAL) {
            return this.actual;
        }
        if (key == Scannable.Attr.BUFFERED) {
            return this.buffer == null ? 0 : this.buffer.size();
        }
        if (key == Scannable.Attr.REQUESTED_FROM_DOWNSTREAM) {
            return this.requested;
        }
        if (key == Scannable.Attr.CANCELLED) {
            return this.cancelled;
        }
        if (key == Scannable.Attr.RUN_STYLE) {
            return Scannable.Attr.RunStyle.ASYNC;
        }
        return null;
    }
}

