/*
 * Decompiled with CFR 0.152.
 */
package reactor.ipc.netty.channel;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.util.ReferenceCountUtil;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Operators;
import reactor.ipc.netty.channel.ChannelOperations;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.concurrent.QueueSupplier;

final class FluxReceive
extends Flux<Object>
implements Subscription,
Disposable {
    final Channel channel;
    final ChannelOperations<?, ?> parent;
    final EventLoop eventLoop;
    Subscriber<? super Object> receiver;
    boolean receiverFastpath;
    long receiverDemand;
    Queue<Object> receiverQueue;
    volatile boolean inboundDone;
    Throwable inboundError;
    volatile Disposable receiverCancel;
    volatile int wip;
    static final AtomicIntegerFieldUpdater<FluxReceive> WIP = AtomicIntegerFieldUpdater.newUpdater(FluxReceive.class, "wip");
    static final AtomicReferenceFieldUpdater<FluxReceive, Disposable> CANCEL = AtomicReferenceFieldUpdater.newUpdater(FluxReceive.class, Disposable.class, "receiverCancel");
    static final Disposable CANCELLED = () -> {};
    static final Logger log = Loggers.getLogger(FluxReceive.class);

    FluxReceive(ChannelOperations<?, ?> parent) {
        this.parent = parent;
        this.channel = parent.channel;
        this.eventLoop = this.channel.eventLoop();
        CANCEL.lazySet(this, () -> {
            if (this.eventLoop.inEventLoop()) {
                this.unsubscribeReceiver();
            } else {
                this.eventLoop.execute(this::unsubscribeReceiver);
            }
        });
    }

    public void cancel() {
        this.cancelReceiver();
        this.drainReceiver();
    }

    final long getPending() {
        return this.receiverQueue != null ? (long)this.receiverQueue.size() : 0L;
    }

    final boolean isCancelled() {
        return this.receiverCancel == CANCELLED;
    }

    public void dispose() {
        this.cancel();
    }

    public boolean isDisposed() {
        return this.inboundDone && (this.receiverQueue == null || this.receiverQueue.isEmpty());
    }

    public void request(long n) {
        if (Operators.validate((long)n)) {
            if (this.eventLoop.inEventLoop()) {
                this.receiverDemand = Operators.addCap((long)this.receiverDemand, (long)n);
                this.drainReceiver();
            } else {
                this.eventLoop.execute(() -> {
                    this.receiverDemand = Operators.addCap((long)this.receiverDemand, (long)n);
                    this.drainReceiver();
                });
            }
        }
    }

    public void subscribe(Subscriber<? super Object> s) {
        if (s == null) {
            throw Exceptions.argumentIsNullException();
        }
        if (this.eventLoop.inEventLoop()) {
            this.startReceiver(s);
        } else {
            this.eventLoop.execute(() -> this.startReceiver(s));
        }
    }

    final boolean cancelReceiver() {
        Disposable c = this.receiverCancel;
        if (c != CANCELLED && (c = CANCEL.getAndSet(this, CANCELLED)) != CANCELLED) {
            c.dispose();
            return true;
        }
        return false;
    }

    final void cleanQueue(Queue<Object> q) {
        if (q != null) {
            Object o;
            while ((o = q.poll()) != null) {
                if (log.isDebugEnabled()) {
                    log.debug("Dropping frame {}, {} in buffer", new Object[]{o, this.getPending()});
                }
                ReferenceCountUtil.release((Object)o);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     */
    final boolean drainReceiver() {
        if (FluxReceive.WIP.getAndIncrement(this) != 0) {
            return false;
        }
        missed = 1;
        do lbl-1000:
        // 3 sources

        {
            block18: {
                q = this.receiverQueue;
                a = this.receiver;
                d = this.inboundDone;
                if (a != null) break block18;
                if (d && this.getPending() == 0L) {
                    ex = this.inboundError;
                    if (ex != null) {
                        this.parent.context.fireContextError(ex);
                    } else if (this.parent.shouldEmitEmptyContext()) {
                        this.parent.context.fireContextActive(null);
                    } else {
                        this.parent.context.fireContextActive(this.parent);
                    }
                    return false;
                }
                if ((missed = FluxReceive.WIP.addAndGet(this, -missed)) != 0) ** GOTO lbl-1000
                break;
            }
            r = this.receiverDemand;
            for (e = 0L; e != r; ++e) {
                if (this.isCancelled()) {
                    this.cleanQueue(q);
                    return false;
                }
                d = this.inboundDone;
                v = q != null ? q.poll() : null;
                v0 = empty = v == null;
                if (d && empty) {
                    this.terminateReceiver(q, a);
                    return false;
                }
                if (empty) break;
                try {
                    a.onNext(v);
                    continue;
                }
                finally {
                    ReferenceCountUtil.safeRelease((Object)v);
                }
            }
            if (this.isCancelled()) {
                this.cleanQueue(q);
                return false;
            }
            if (this.inboundDone && (q == null || q.isEmpty())) {
                this.terminateReceiver(q, a);
                return false;
            }
            if (r == 0x7FFFFFFFFFFFFFFFL) {
                this.channel.config().setAutoRead(true);
                this.channel.read();
                missed = FluxReceive.WIP.addAndGet(this, -missed);
                if (missed != 0) {
                    return true;
                }
                break;
            }
            if ((this.receiverDemand -= e) <= 0L && e <= 0L) continue;
            this.channel.read();
        } while ((missed = FluxReceive.WIP.addAndGet(this, -missed)) != 0);
        return false;
    }

    final void startReceiver(Subscriber<? super Object> s) {
        if (this.receiver == null) {
            if (log.isDebugEnabled()) {
                log.debug("{} Subscribing inbound receiver [pending: {}, cancelled:{}, inboundDone: {}]", new Object[]{this.channel.toString(), this.getPending(), this.isCancelled(), this.inboundDone});
            }
            if (this.inboundDone && this.getPending() == 0L) {
                if (this.inboundError != null) {
                    Operators.error(s, (Throwable)this.inboundError);
                    return;
                }
                Operators.complete(s);
                return;
            }
            this.receiver = s;
            s.onSubscribe((Subscription)this);
        } else {
            Operators.error(s, (Throwable)new IllegalStateException("Only one connection receive subscriber allowed."));
        }
    }

    final void onInboundNext(Object msg) {
        if (this.inboundDone || this.isCancelled()) {
            if (log.isDebugEnabled()) {
                log.debug("Dropping frame {}, {} in buffer", new Object[]{msg, this.getPending()});
            }
            ReferenceCountUtil.release((Object)msg);
            return;
        }
        if (this.receiverFastpath && this.receiver != null) {
            try {
                if (log.isDebugEnabled()) {
                    if (msg instanceof ByteBuf) {
                        ((ByteBuf)msg).touch((Object)"Unbounded receiver, bypass inbound buffer queue");
                    } else if (msg instanceof ByteBufHolder) {
                        ((ByteBufHolder)msg).touch((Object)"Unbounded receiver, bypass inbound buffer queue");
                    }
                }
                this.receiver.onNext(msg);
            }
            finally {
                ReferenceCountUtil.safeRelease((Object)msg);
            }
        } else {
            Queue q = this.receiverQueue;
            if (q == null) {
                this.receiverQueue = q = (Queue)QueueSupplier.unbounded().get();
            }
            if (log.isDebugEnabled()) {
                if (msg instanceof ByteBuf) {
                    ((ByteBuf)msg).touch((Object)"Buffered ByteBuf in Inbound Flux Queue");
                } else if (msg instanceof ByteBufHolder) {
                    ((ByteBufHolder)msg).touch((Object)"Buffered ByteBufHolder in Inbound Flux Queue");
                }
            }
            q.offer(msg);
            if (this.drainReceiver()) {
                this.receiverFastpath = true;
            }
        }
    }

    final boolean onInboundComplete() {
        if (this.inboundDone) {
            return false;
        }
        this.inboundDone = true;
        Subscriber<? super Object> receiver = this.receiver;
        if (this.receiverFastpath && receiver != null) {
            receiver.onComplete();
            return true;
        }
        this.drainReceiver();
        return false;
    }

    final boolean onInboundError(Throwable err) {
        if (this.isCancelled() || this.inboundDone) {
            Operators.onErrorDropped((Throwable)err);
            return false;
        }
        Subscriber<? super Object> receiver = this.receiver;
        this.inboundError = err;
        this.inboundDone = true;
        if (this.channel.isActive()) {
            this.channel.close();
        }
        if (this.receiverFastpath && receiver != null) {
            this.parent.context.fireContextError(err);
            receiver.onError(err);
            return true;
        }
        this.drainReceiver();
        return false;
    }

    final void terminateReceiver(Queue<?> q, Subscriber<?> a) {
        Throwable ex;
        if (q != null) {
            q.clear();
        }
        if ((ex = this.inboundError) != null) {
            this.parent.context.fireContextError(ex);
            a.onError(ex);
        } else {
            a.onComplete();
        }
    }

    final void unsubscribeReceiver() {
        this.receiverDemand = 0L;
        this.receiver = null;
        if (this.isCancelled()) {
            this.parent.onInboundCancel();
        }
    }
}

