/*
 * Decompiled with CFR 0.152.
 */
package reactor.netty.channel;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.util.ReferenceCountUtil;
import java.nio.channels.ClosedChannelException;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nullable;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Operators;
import reactor.netty.ReactorNetty;
import reactor.netty.channel.ChannelOperations;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.concurrent.Queues;

final class FluxReceive
extends Flux<Object>
implements Subscription,
Disposable {
    final Channel channel;
    final ChannelOperations<?, ?> parent;
    final EventLoop eventLoop;
    CoreSubscriber<? super Object> receiver;
    boolean receiverFastpath;
    long receiverDemand;
    Queue<Object> receiverQueue;
    volatile boolean inboundDone;
    Throwable inboundError;
    volatile Disposable receiverCancel;
    volatile int wip;
    static final AtomicIntegerFieldUpdater<FluxReceive> WIP = AtomicIntegerFieldUpdater.newUpdater(FluxReceive.class, "wip");
    static final AtomicReferenceFieldUpdater<FluxReceive, Disposable> CANCEL = AtomicReferenceFieldUpdater.newUpdater(FluxReceive.class, Disposable.class, "receiverCancel");
    static final Disposable CANCELLED = () -> {};
    static final Logger log = Loggers.getLogger(FluxReceive.class);

    FluxReceive(ChannelOperations<?, ?> parent) {
        this.parent = parent;
        this.channel = parent.channel();
        this.eventLoop = this.channel.eventLoop();
        CANCEL.lazySet(this, () -> {
            if (this.eventLoop.inEventLoop()) {
                this.unsubscribeReceiver();
            } else {
                this.eventLoop.execute(this::unsubscribeReceiver);
            }
        });
    }

    public void cancel() {
        this.cancelReceiver();
        this.drainReceiver();
    }

    final long getPending() {
        return this.receiverQueue != null ? (long)this.receiverQueue.size() : 0L;
    }

    final boolean isCancelled() {
        return this.receiverCancel == CANCELLED;
    }

    public void dispose() {
        this.cancel();
    }

    public boolean isDisposed() {
        return this.inboundDone && (this.receiverQueue == null || this.receiverQueue.isEmpty());
    }

    public void request(long n) {
        if (Operators.validate((long)n)) {
            if (this.eventLoop.inEventLoop()) {
                this.receiverDemand = Operators.addCap((long)this.receiverDemand, (long)n);
                this.drainReceiver();
            } else {
                this.eventLoop.execute(() -> {
                    this.receiverDemand = Operators.addCap((long)this.receiverDemand, (long)n);
                    this.drainReceiver();
                });
            }
        }
    }

    public void subscribe(CoreSubscriber<? super Object> s) {
        if (this.eventLoop.inEventLoop()) {
            this.startReceiver(s);
        } else {
            this.eventLoop.execute(() -> this.startReceiver(s));
        }
    }

    final boolean cancelReceiver() {
        Disposable c = this.receiverCancel;
        if (c != CANCELLED && (c = CANCEL.getAndSet(this, CANCELLED)) != CANCELLED) {
            c.dispose();
            return true;
        }
        return false;
    }

    final void cleanQueue(@Nullable Queue<Object> q) {
        if (q != null) {
            Object o;
            while ((o = q.poll()) != null) {
                if (log.isDebugEnabled()) {
                    log.debug(ReactorNetty.format(this.channel, "Dropping frame {}, {} in buffer"), new Object[]{o, this.getPending()});
                }
                ReferenceCountUtil.release((Object)o);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     */
    final void drainReceiver() {
        if (FluxReceive.WIP.getAndIncrement(this) != 0) {
            return;
        }
        missed = 1;
        do lbl-1000:
        // 3 sources

        {
            block19: {
                q = this.receiverQueue;
                a = this.receiver;
                d = this.inboundDone;
                if (a != null) break block19;
                if (this.isCancelled()) {
                    this.cleanQueue(q);
                    return;
                }
                if (d && this.getPending() == 0L) {
                    ex = this.inboundError;
                    if (ex != null) {
                        this.parent.listener.onUncaughtException(this.parent, ex);
                    }
                    return;
                }
                if ((missed = FluxReceive.WIP.addAndGet(this, -missed)) != 0) ** GOTO lbl-1000
                break;
            }
            r = this.receiverDemand;
            for (e = 0L; e != r; ++e) {
                if (this.isCancelled()) {
                    this.cleanQueue(q);
                    this.terminateReceiver(q, a);
                    return;
                }
                d = this.inboundDone;
                v = q != null ? q.poll() : null;
                v0 = empty = v == null;
                if (d && empty) {
                    this.terminateReceiver(q, a);
                    return;
                }
                if (empty) break;
                try {
                    a.onNext(v);
                    continue;
                }
                finally {
                    try {
                        ReferenceCountUtil.release((Object)v);
                    }
                    catch (Throwable t) {
                        this.inboundError = t;
                        this.cleanQueue(q);
                        this.terminateReceiver(q, a);
                    }
                }
            }
            if (this.isCancelled()) {
                this.cleanQueue(q);
                this.terminateReceiver(q, a);
                return;
            }
            if (this.inboundDone && (q == null || q.isEmpty())) {
                this.terminateReceiver(q, a);
                return;
            }
            if (r == 0x7FFFFFFFFFFFFFFFL) {
                this.channel.config().setAutoRead(true);
                this.channel.read();
                missed = FluxReceive.WIP.addAndGet(this, -missed);
                if (missed == 0) break;
                this.receiverFastpath = true;
            }
            if ((this.receiverDemand -= e) <= 0L && e <= 0L) continue;
            this.channel.read();
        } while ((missed = FluxReceive.WIP.addAndGet(this, -missed)) != 0);
    }

    final void startReceiver(CoreSubscriber<? super Object> s) {
        if (this.receiver == null) {
            if (log.isDebugEnabled()) {
                log.debug(ReactorNetty.format(this.channel, "Subscribing inbound receiver [pending: {}, cancelled:{}, inboundDone: {}]"), new Object[]{this.getPending(), this.isCancelled(), this.inboundDone});
            }
            if (this.inboundDone && this.getPending() == 0L) {
                if (this.inboundError != null) {
                    Operators.error(s, (Throwable)this.inboundError);
                    return;
                }
                Operators.complete(s);
                return;
            }
            this.receiver = s;
            s.onSubscribe((Subscription)this);
        } else {
            Operators.error(s, (Throwable)new IllegalStateException("Only one connection receive subscriber allowed."));
        }
    }

    final void onInboundNext(Object msg) {
        if (this.inboundDone || this.isCancelled()) {
            if (log.isDebugEnabled()) {
                log.debug(ReactorNetty.format(this.channel, "Dropping frame {}, {} in buffer"), new Object[]{msg, this.getPending()});
            }
            ReferenceCountUtil.release((Object)msg);
            return;
        }
        if (this.receiverFastpath && this.receiver != null) {
            try {
                if (log.isDebugEnabled()) {
                    if (msg instanceof ByteBuf) {
                        ((ByteBuf)msg).touch((Object)ReactorNetty.format(this.channel, "Unbounded receiver, bypass inbound buffer queue"));
                    } else if (msg instanceof ByteBufHolder) {
                        ((ByteBufHolder)msg).touch((Object)ReactorNetty.format(this.channel, "Unbounded receiver, bypass inbound buffer queue"));
                    }
                }
                this.receiver.onNext(msg);
            }
            finally {
                ReferenceCountUtil.release((Object)msg);
            }
        } else {
            Queue q = this.receiverQueue;
            if (q == null) {
                this.receiverQueue = q = (Queue)Queues.unbounded().get();
            }
            if (log.isDebugEnabled()) {
                if (msg instanceof ByteBuf) {
                    ((ByteBuf)msg).touch((Object)ReactorNetty.format(this.channel, "Buffered ByteBuf in Inbound Flux Queue"));
                } else if (msg instanceof ByteBufHolder) {
                    ((ByteBufHolder)msg).touch((Object)ReactorNetty.format(this.channel, "Buffered ByteBufHolder in Inbound Flux Queue"));
                }
            }
            q.offer(msg);
            this.drainReceiver();
        }
    }

    final void onInboundComplete() {
        if (this.inboundDone) {
            return;
        }
        this.inboundDone = true;
        if (this.receiverFastpath) {
            CoreSubscriber<? super Object> receiver = this.receiver;
            if (receiver != null) {
                receiver.onComplete();
            }
            return;
        }
        this.drainReceiver();
    }

    final void onInboundError(Throwable err) {
        if (this.isCancelled() || this.inboundDone) {
            if (log.isDebugEnabled()) {
                log.warn(ReactorNetty.format(this.channel, "An exception has been observed post termination"), err);
            } else if (log.isWarnEnabled()) {
                log.warn(ReactorNetty.format(this.channel, "An exception has been observed post termination, use DEBUG level to see the full stack: {}"), new Object[]{err.toString()});
            }
            return;
        }
        CoreSubscriber<? super Object> receiver = this.receiver;
        this.inboundDone = true;
        if (this.channel.isActive()) {
            this.parent.markPersistent(false);
        }
        if (err instanceof OutOfMemoryError) {
            if (log.isWarnEnabled()) {
                // empty if block
            }
            this.inboundError = ReactorNetty.wrapException(err);
            this.parent.terminate();
        } else {
            this.inboundError = err instanceof ClosedChannelException ? ReactorNetty.wrapException(err) : err;
        }
        if (this.receiverFastpath && receiver != null) {
            receiver.onError(err);
        } else {
            this.drainReceiver();
        }
    }

    final void terminateReceiver(@Nullable Queue<?> q, CoreSubscriber<?> a) {
        if (q != null) {
            q.clear();
        }
        Throwable ex = this.inboundError;
        this.receiver = null;
        if (ex != null) {
            a.onError(ex);
        } else {
            a.onComplete();
        }
    }

    final void unsubscribeReceiver() {
        this.receiverDemand = 0L;
        this.receiver = null;
        if (this.isCancelled()) {
            this.parent.onInboundCancel();
        }
    }

    public String toString() {
        return "FluxReceive{receiverQueueSize=" + (this.receiverQueue != null ? this.receiverQueue.size() : 0) + ", inboundDone=" + this.inboundDone + ",inboundError=" + this.inboundError + '}';
    }
}

