/*
 * Decompiled with CFR 0.152.
 */
package reactor.ipc.netty.channel;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.EmptyByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.FileRegion;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.GenericFutureListener;
import java.nio.charset.Charset;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Exceptions;
import reactor.core.publisher.Operators;
import reactor.ipc.netty.NettyPipeline;
import reactor.ipc.netty.channel.AbortedException;
import reactor.ipc.netty.channel.ChannelOperations;
import reactor.ipc.netty.channel.ContextHandler;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.concurrent.QueueSupplier;

final class ChannelOperationsHandler
extends ChannelDuplexHandler
implements NettyPipeline.SendOptions {
    final PublisherSender inner = new PublisherSender(this);
    final BiConsumer<?, ? super ByteBuf> encoder = NOOP_ENCODER;
    final int prefetch;
    final ContextHandler<?> originContext;
    BiPredicate<ChannelPromise, Object> pendingWriteOffer;
    Queue<?> pendingWrites;
    ChannelHandlerContext ctx;
    boolean flushOnEach;
    long pendingBytes;
    ContextHandler<?> lastContext = null;
    volatile boolean innerActive;
    volatile boolean removed;
    volatile int wip;
    static final AtomicIntegerFieldUpdater<ChannelOperationsHandler> WIP = AtomicIntegerFieldUpdater.newUpdater(ChannelOperationsHandler.class, "wip");
    static final Logger log = Loggers.getLogger(ChannelOperationsHandler.class);
    static final BiConsumer<?, ? super ByteBuf> NOOP_ENCODER = (a, b) -> {};

    ChannelOperationsHandler(ContextHandler<?> contextHandler) {
        this.prefetch = 32;
        this.originContext = contextHandler;
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.originContext.createOperations(ctx.channel(), null);
    }

    public final void channelInactive(ChannelHandlerContext ctx) throws Exception {
        try {
            ChannelOperations<?, ?> ops = ChannelOperations.get(ctx.channel());
            if (ops != null) {
                ops.onHandlerTerminate();
            } else if (this.lastContext != null) {
                this.lastContext.terminateChannel(ctx.channel());
                this.lastContext.fireContextError(new AbortedException());
            }
        }
        catch (Throwable err) {
            Exceptions.throwIfFatal((Throwable)err);
            this.exceptionCaught(ctx, err);
        }
    }

    public final void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg == null || msg == Unpooled.EMPTY_BUFFER || msg instanceof EmptyByteBuf) {
            return;
        }
        try {
            ChannelOperations<?, ?> ops = ChannelOperations.get(ctx.channel());
            if (ops != null) {
                ChannelOperations.get(ctx.channel()).onInboundNext(ctx, msg);
            } else {
                if (log.isDebugEnabled()) {
                    if (msg instanceof ByteBufHolder) {
                        msg = ((ByteBufHolder)msg).content().toString(Charset.defaultCharset());
                    }
                    log.debug("{} No ChannelOperation attached. Dropping: {}", new Object[]{ctx.channel().toString(), msg});
                }
                ReferenceCountUtil.release((Object)msg);
            }
        }
        catch (Throwable err) {
            ReferenceCountUtil.release((Object)msg);
            Exceptions.throwIfFatal((Throwable)err);
            this.exceptionCaught(ctx, err);
        }
    }

    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("{} Write state change {}", new Object[]{ctx.channel(), ctx.channel().isWritable()});
        }
        if (ctx.channel().isWritable()) {
            this.inner.request(this.prefetch);
        }
        this.drain();
    }

    public final void exceptionCaught(ChannelHandlerContext ctx, Throwable err) throws Exception {
        Exceptions.throwIfFatal((Throwable)err);
        ChannelOperations<?, ?> ops = ChannelOperations.get(ctx.channel());
        if (ops != null) {
            ops.onInboundError(err);
        } else if (this.lastContext != null) {
            this.lastContext.terminateChannel(ctx.channel());
            this.lastContext.fireContextError(err);
        }
    }

    public void flush(ChannelHandlerContext ctx) throws Exception {
        this.drain();
    }

    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        this.ctx = ctx;
        this.inner.request(this.prefetch);
    }

    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        if (!this.removed) {
            this.removed = true;
            this.inner.cancel();
            this.drain();
        }
    }

    public final void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (log.isTraceEnabled()) {
            log.trace("{} End of the pipeline, User event {}", new Object[]{ctx.channel(), evt});
        }
        if (evt == NettyPipeline.handlerTerminatedEvent()) {
            ContextHandler<?> c = this.lastContext;
            if (c == null) {
                if (log.isDebugEnabled()) {
                    log.debug("{} No context to dispose", new Object[]{ctx.channel()});
                }
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug("{} Disposing context {}", new Object[]{ctx.channel(), c});
            }
            this.lastContext = null;
            c.terminateChannel(ctx.channel());
            return;
        }
        if (evt instanceof NettyPipeline.SendOptionsChangeEvent) {
            if (log.isDebugEnabled()) {
                log.debug("{} New sending options", new Object[]{ctx.channel()});
            }
            ((NettyPipeline.SendOptionsChangeEvent)evt).configurator().accept(this);
            return;
        }
        ctx.fireUserEventTriggered(evt);
    }

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("{} Writing object {}", new Object[]{ctx.channel(), msg});
        }
        if (this.pendingWrites == null) {
            this.pendingWrites = (Queue)QueueSupplier.unbounded().get();
            this.pendingWriteOffer = (BiPredicate)((Object)this.pendingWrites);
        }
        if (!this.pendingWriteOffer.test(promise, msg)) {
            promise.setFailure((Throwable)new IllegalStateException("Send Queue full?!"));
        }
    }

    @Override
    public NettyPipeline.SendOptions flushOnBoundary() {
        this.flushOnEach = false;
        return this;
    }

    @Override
    public NettyPipeline.SendOptions flushOnEach() {
        this.flushOnEach = true;
        return this;
    }

    ChannelFuture doWrite(Object msg, ChannelPromise promise, PublisherSender inner) {
        if (this.flushOnEach || inner == null && this.pendingWrites.isEmpty() || !this.ctx.channel().isWritable()) {
            this.pendingBytes = 0L;
            if (inner != null) {
                inner.justFlushed = true;
            }
            return this.ctx.writeAndFlush(msg, promise);
        }
        if (msg instanceof ByteBuf) {
            this.pendingBytes = Operators.addCap((long)this.pendingBytes, (long)((ByteBuf)msg).readableBytes());
        } else if (msg instanceof ByteBufHolder) {
            this.pendingBytes = Operators.addCap((long)this.pendingBytes, (long)((ByteBufHolder)msg).content().readableBytes());
        } else if (msg instanceof FileRegion) {
            this.pendingBytes = Operators.addCap((long)this.pendingBytes, (long)((FileRegion)msg).count());
        }
        if (log.isTraceEnabled()) {
            log.trace("{} Pending write size = {}", new Object[]{this.ctx.channel(), this.pendingBytes});
        }
        if (inner != null && inner.justFlushed) {
            inner.justFlushed = false;
        }
        return this.ctx.write(msg, promise);
    }

    void discard() {
        while (this.pendingWrites != null && !this.pendingWrites.isEmpty()) {
            ChannelPromise promise;
            Object v = this.pendingWrites.poll();
            try {
                promise = (ChannelPromise)v;
            }
            catch (Throwable e) {
                this.ctx.fireExceptionCaught(e);
                return;
            }
            v = this.pendingWrites.poll();
            if (log.isDebugEnabled()) {
                log.debug("{} Terminated ChannelOperation. Dropping Pending Write: {}", new Object[]{this.ctx.channel().toString(), v});
            }
            ReferenceCountUtil.release(v);
            promise.tryFailure((Throwable)new AbortedException("Connection has been closed"));
        }
        return;
    }

    void drain() {
        if (WIP.getAndIncrement(this) == 0) {
            while (true) {
                boolean empty;
                ChannelPromise promise;
                if (this.removed) {
                    this.discard();
                    return;
                }
                if (this.pendingWrites == null || this.innerActive || !this.ctx.channel().isWritable()) {
                    if (WIP.decrementAndGet(this) != 0) continue;
                    break;
                }
                Object v = this.pendingWrites.poll();
                try {
                    promise = (ChannelPromise)v;
                }
                catch (Throwable e) {
                    this.ctx.fireExceptionCaught(e);
                    return;
                }
                boolean bl = empty = promise == null;
                if (empty) {
                    if (WIP.decrementAndGet(this) != 0) continue;
                    break;
                }
                v = this.pendingWrites.poll();
                if (v instanceof Publisher) {
                    Publisher p = (Publisher)v;
                    if (p instanceof Callable) {
                        Object vr;
                        Callable supplier = (Callable)p;
                        try {
                            vr = supplier.call();
                        }
                        catch (Throwable e) {
                            promise.setFailure(e);
                            continue;
                        }
                        if (vr == null) {
                            promise.setSuccess();
                            continue;
                        }
                        if (this.inner.unbounded) {
                            this.doWrite(vr, promise, null);
                            continue;
                        }
                        this.innerActive = true;
                        this.inner.promise = promise;
                        this.inner.onSubscribe(Operators.scalarSubscription((Subscriber)this.inner, vr));
                        continue;
                    }
                    this.innerActive = true;
                    this.inner.promise = promise;
                    p.subscribe((Subscriber)this.inner);
                    continue;
                }
                this.doWrite(v, promise, null);
            }
        }
    }

    static final class PublisherSender
    implements Subscriber<Object>,
    Subscription,
    ChannelFutureListener {
        final ChannelOperationsHandler parent;
        volatile Subscription missedSubscription;
        volatile long missedRequested;
        volatile long missedProduced;
        volatile int wip;
        boolean inactive;
        boolean justFlushed;
        long requested;
        boolean unbounded;
        Subscription actual;
        long produced;
        ChannelPromise promise;
        ChannelFuture lastWrite;
        static final AtomicReferenceFieldUpdater<PublisherSender, Subscription> MISSED_SUBSCRIPTION = AtomicReferenceFieldUpdater.newUpdater(PublisherSender.class, Subscription.class, "missedSubscription");
        static final AtomicLongFieldUpdater<PublisherSender> MISSED_REQUESTED = AtomicLongFieldUpdater.newUpdater(PublisherSender.class, "missedRequested");
        static final AtomicLongFieldUpdater<PublisherSender> MISSED_PRODUCED = AtomicLongFieldUpdater.newUpdater(PublisherSender.class, "missedProduced");
        static final AtomicIntegerFieldUpdater<PublisherSender> WIP = AtomicIntegerFieldUpdater.newUpdater(PublisherSender.class, "wip");

        PublisherSender(ChannelOperationsHandler parent) {
            this.parent = parent;
        }

        public final void cancel() {
            if (!this.inactive) {
                this.inactive = true;
                this.drain();
            }
        }

        public void onComplete() {
            long p = this.produced;
            ChannelFuture f = this.lastWrite;
            this.parent.innerActive = false;
            if (p != 0L) {
                this.produced = 0L;
                this.produced(p);
                if (!this.justFlushed) {
                    if (this.parent.ctx.channel().isActive()) {
                        this.justFlushed = true;
                        this.parent.ctx.flush();
                    } else {
                        this.promise.setFailure((Throwable)new AbortedException("Connection has been closed"));
                        return;
                    }
                }
            }
            if (f != null) {
                f.addListener((GenericFutureListener)this);
            } else {
                this.promise.setSuccess();
                this.parent.drain();
            }
        }

        public void onError(Throwable t) {
            long p = this.produced;
            ChannelFuture f = this.lastWrite;
            this.parent.innerActive = false;
            if (p != 0L) {
                this.produced = 0L;
                this.produced(p);
                if (this.parent.ctx.channel().isActive()) {
                    this.justFlushed = true;
                    this.parent.ctx.flush();
                } else {
                    this.promise.setFailure((Throwable)new AbortedException("Connection has been closed"));
                    return;
                }
            }
            if (f != null) {
                f.addListener((GenericFutureListener)this);
            } else {
                this.promise.setFailure(t);
                this.parent.drain();
            }
        }

        public void onNext(Object t) {
            ++this.produced;
            this.lastWrite = this.parent.doWrite(t, this.parent.ctx.newPromise(), this);
            if (this.parent.ctx.channel().isWritable()) {
                this.request(1L);
            }
        }

        public void onSubscribe(Subscription s) {
            if (this.inactive) {
                s.cancel();
                return;
            }
            Objects.requireNonNull(s);
            if (this.wip == 0 && WIP.compareAndSet(this, 0, 1)) {
                this.actual = s;
                long r = this.requested;
                if (WIP.decrementAndGet(this) != 0) {
                    this.drainLoop();
                }
                if (r != 0L) {
                    s.request(r);
                }
                return;
            }
            MISSED_SUBSCRIPTION.set(this, s);
            this.drain();
        }

        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                this.promise.setSuccess();
            } else {
                this.promise.setFailure(future.cause());
            }
        }

        public final void request(long n) {
            if (Operators.validate((long)n)) {
                if (this.unbounded) {
                    return;
                }
                if (this.wip == 0 && WIP.compareAndSet(this, 0, 1)) {
                    long r = this.requested;
                    if (r != Long.MAX_VALUE) {
                        this.requested = r = Operators.addCap((long)r, (long)n);
                        if (r == Long.MAX_VALUE) {
                            this.unbounded = true;
                        }
                    }
                    Subscription a = this.actual;
                    if (WIP.decrementAndGet(this) != 0) {
                        this.drainLoop();
                    }
                    if (a != null) {
                        a.request(n);
                    }
                    return;
                }
                Operators.getAndAddCap(MISSED_REQUESTED, (Object)this, (long)n);
                this.drain();
            }
        }

        final void drain() {
            if (WIP.getAndIncrement(this) != 0) {
                return;
            }
            this.drainLoop();
        }

        final void drainLoop() {
            int missed = 1;
            long requestAmount = 0L;
            Subscription requestTarget = null;
            do {
                long mp;
                long mr;
                Subscription ms;
                if ((ms = this.missedSubscription) != null) {
                    ms = MISSED_SUBSCRIPTION.getAndSet(this, null);
                }
                if ((mr = this.missedRequested) != 0L) {
                    mr = MISSED_REQUESTED.getAndSet(this, 0L);
                }
                if ((mp = this.missedProduced) != 0L) {
                    mp = MISSED_PRODUCED.getAndSet(this, 0L);
                }
                Subscription a = this.actual;
                if (this.inactive) {
                    if (a != null) {
                        a.cancel();
                        this.actual = null;
                    }
                    if (ms == null) continue;
                    ms.cancel();
                    continue;
                }
                long r = this.requested;
                if (r != Long.MAX_VALUE) {
                    long u = Operators.addCap((long)r, (long)mr);
                    if (u != Long.MAX_VALUE) {
                        long v = u - mp;
                        if (v < 0L) {
                            Operators.reportMoreProduced();
                            v = 0L;
                        }
                        r = v;
                    } else {
                        r = u;
                    }
                    this.requested = r;
                }
                if (ms != null) {
                    this.actual = ms;
                    if (r == 0L) continue;
                    requestAmount = Operators.addCap((long)requestAmount, (long)r);
                    requestTarget = ms;
                    continue;
                }
                if (mr == 0L || a == null) continue;
                requestAmount = Operators.addCap((long)requestAmount, (long)mr);
                requestTarget = a;
            } while ((missed = WIP.addAndGet(this, -missed)) != 0);
            if (requestAmount != 0L) {
                requestTarget.request(requestAmount);
            }
        }

        final void produced(long n) {
            if (this.unbounded) {
                return;
            }
            if (this.wip == 0 && WIP.compareAndSet(this, 0, 1)) {
                long r = this.requested;
                if (r != Long.MAX_VALUE) {
                    long u = r - n;
                    if (u < 0L) {
                        Operators.reportMoreProduced();
                        u = 0L;
                    }
                    this.requested = u;
                } else {
                    this.unbounded = true;
                }
                if (WIP.decrementAndGet(this) == 0) {
                    return;
                }
                this.drainLoop();
                return;
            }
            Operators.getAndAddCap(MISSED_PRODUCED, (Object)this, (long)n);
            this.drain();
        }
    }
}

