/*
 * Decompiled with CFR 0.152.
 */
package reactor.ipc.netty.common;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.EmptyByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Cancellation;
import reactor.core.Exceptions;
import reactor.core.Loopback;
import reactor.core.Producer;
import reactor.core.Receiver;
import reactor.core.Trackable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Schedulers;
import reactor.ipc.netty.common.ChannelBridge;
import reactor.ipc.netty.common.NettyChannel;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.concurrent.QueueSupplier;

public class NettyChannelHandler<C extends NettyChannel>
extends ChannelDuplexHandler
implements Producer,
Publisher<Object> {
    /** Logger shared by this handler and its nested subscriber classes. */
    protected static final Logger log = Loggers.getLogger(NettyChannelHandler.class);
    /** User function applied to the bridged channel in {@code channelActive}; its result is subscribed with a {@code CloseSubscriber}. */
    protected final Function<? super NettyChannel, ? extends Publisher<Void>> handler;
    /** Creates the channel bridge that is passed to {@link #handler} when the channel becomes active. */
    protected final ChannelBridge<C> bridgeFactory;
    /** Inbound stream handed to the bridge: built from this handler, or inherited from the parent handler when one is supplied. */
    protected final Flux<Object> input;
    /** Sink receiving inbound messages; shared with the parent handler when one is supplied. */
    final InboundSink inboundEmitter;

    /**
     * Creates a root handler (no parent) that owns its own inbound sink.
     *
     * @param handler       function applied to the bridged channel once it is active
     * @param bridgeFactory factory producing the channel bridge given to {@code handler}
     * @param ch            channel whose inbound data this handler emits
     */
    public NettyChannelHandler(Function<? super NettyChannel, ? extends Publisher<Void>> handler, ChannelBridge<C> bridgeFactory, Channel ch) {
        // Delegates with a null parent, which makes the 4-arg constructor create a fresh sink.
        this(handler, bridgeFactory, ch, null);
    }

    /**
     * Creates a handler that either owns a new inbound sink or shares the one of {@code parent}.
     *
     * @param handler       function applied to the bridged channel once it is active
     * @param bridgeFactory factory producing the channel bridge given to {@code handler}
     * @param ch            channel used for the new sink and for the subscribe-on scheduler
     *                      (only read when {@code parent} is {@code null})
     * @param parent        handler whose sink and input flux are reused, or {@code null}
     */
    public NettyChannelHandler(Function<? super NettyChannel, ? extends Publisher<Void>> handler, ChannelBridge<C> bridgeFactory, Channel ch, NettyChannelHandler parent) {
        this.handler = handler;
        this.bridgeFactory = bridgeFactory;
        if (parent != null) {
            // Child handler: reuse the parent's sink and inbound stream as-is.
            this.inboundEmitter = parent.inboundEmitter;
            this.input = parent.input;
            return;
        }
        // Root handler: own sink, and subscription to this publisher is moved onto the channel's event loop.
        this.inboundEmitter = new InboundSink(ch);
        this.input = Flux.from((Publisher)this).subscribeOn(Schedulers.fromExecutor((Executor)ch.eventLoop()));
    }

    /**
     * Exposes the inbound sink of this handler.
     *
     * @return the sink that inbound channel messages are pushed into
     */
    public FluxSink<Object> downstream() {
        return inboundEmitter;
    }

    /**
     * Propagates the active event, then bridges the channel, applies the user handler to it
     * and subscribes a {@code CloseSubscriber} to the publisher the handler returns.
     *
     * @param ctx context of the channel that became active
     * @throws Exception if the superclass propagation fails
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        NettyChannel bridged = (NettyChannel)bridgeFactory.createChannelBridge(ctx.channel(), input, new Object[0]);
        Publisher<Void> completion = handler.apply(bridged);
        completion.subscribe(new CloseSubscriber(ctx));
    }

    /**
     * Completes the inbound sink when the channel goes inactive, then propagates the event.
     * Any non-fatal failure raised along the way is routed to the sink as an error.
     *
     * @param ctx context of the channel that became inactive
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        try {
            inboundEmitter.complete();
            super.channelInactive(ctx);
        }
        catch (Throwable failure) {
            // Fatal errors are rethrown by throwIfFatal; everything else goes to the sink.
            Exceptions.throwIfFatal(failure);
            inboundEmitter.error(failure);
        }
    }

    /**
     * Hands every message read from the channel to {@link #doRead(Object)}.
     *
     * @param ctx context of the channel (not used here)
     * @param msg message that was read
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        doRead(msg);
    }

    /**
     * Pushes an inbound message into the sink, ignoring {@code null} and empty buffers.
     * Non-fatal failures raised while emitting are forwarded to the sink as an error.
     *
     * @param msg message read from the channel; may be {@code null}
     */
    protected final void doRead(Object msg) {
        boolean nothingToEmit = msg == null
                || msg == Unpooled.EMPTY_BUFFER
                || msg instanceof EmptyByteBuf;
        if (nothingToEmit) {
            return;
        }
        try {
            inboundEmitter.next(msg);
        }
        catch (Throwable failure) {
            Exceptions.throwIfFatal(failure);
            inboundEmitter.error(failure);
        }
    }

    /**
     * Asks for another read while the sink still has outstanding demand; otherwise leaves
     * reading paused (logging that at debug level). The event is always propagated.
     *
     * @param ctx context of the channel whose read batch finished
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        boolean noDemand = inboundEmitter.requested == 0L;
        if (!noDemand) {
            ctx.read();
        } else if (log.isDebugEnabled()) {
            log.debug("Pausing read due to lack of request");
        }
        ctx.fireChannelReadComplete();
    }

    /**
     * Intercepts {@code ChannelWriter} messages and subscribes to their stream with a subscriber
     * chosen by flush mode; every other message is passed to the default write handling.
     *
     * @param ctx     context used for the actual writes
     * @param msg     outbound message, possibly a {@code ChannelWriter}
     * @param promise promise handed to the chosen subscriber (or to the default write)
     */
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (!(msg instanceof ChannelWriter)) {
            super.write(ctx, msg, promise);
            return;
        }
        ChannelWriter writer = (ChannelWriter)msg;
        Subscriber flusher;
        if (writer.flushMode == FlushMode.MANUAL_COMPLETE) {
            // Only MANUAL_COMPLETE defers flushing until the stream terminates.
            flusher = new FlushOnTerminateSubscriber(ctx, promise);
        } else {
            flusher = new FlushOnEachSubscriber(ctx, promise);
        }
        writer.writeStream.subscribe(flusher);
    }

    /**
     * Routes a pipeline exception to the inbound sink; fatal errors are rethrown first.
     *
     * @param ctx context of the channel (not used here)
     * @param err exception raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable err) throws Exception {
        Exceptions.throwIfFatal(err);
        inboundEmitter.error(err);
    }

    /**
     * Writes one item to the channel (without flushing), skipping the shared empty buffer.
     *
     * @param data item to write
     * @param ctx  context whose channel receives the write
     * @return the future of the write, or {@code null} when {@code data} is
     *         {@code Unpooled.EMPTY_BUFFER} and nothing was written
     */
    protected ChannelFuture doOnWrite(Object data, ChannelHandlerContext ctx) {
        if (data == Unpooled.EMPTY_BUFFER) {
            return null;
        }
        return ctx.channel().write(data);
    }

    /**
     * Settles the write {@code promise} once an outbound stream has terminated.
     *
     * <p>When the channel is open and a last write future is available, the promise follows that
     * future (after a flush). In every other case the promise is settled immediately: failed with
     * {@code exception} when one is given, succeeded otherwise.
     *
     * <p>Previously the "channel open, {@code last == null}" case did nothing at all, so the
     * promise was never completed. That case occurs for every stream handled by
     * {@code FlushOnEachSubscriber}, which always passes {@code null}.
     *
     * @param ctx       context used to flush and to query the channel state
     * @param last      future of the last write, or {@code null} when none is tracked
     * @param promise   promise to settle; only the {@code try*} variants are used, so an already
     *                  settled promise is left untouched
     * @param exception error that terminated the stream, or {@code null} on normal completion
     */
    protected void doOnTerminate(ChannelHandlerContext ctx, ChannelFuture last, ChannelPromise promise, Throwable exception) {
        boolean open = ctx.channel().isOpen();
        if (open && last != null) {
            ChannelFutureListener listener = future -> {
                if (exception != null) {
                    promise.tryFailure(exception);
                } else if (future.isSuccess()) {
                    promise.trySuccess();
                } else {
                    promise.tryFailure(future.cause());
                }
            };
            last.addListener((GenericFutureListener)listener);
            ctx.flush();
            return;
        }
        if (open) {
            // No future to wait on: still flush anything that may be pending, then settle directly
            // so the caller's promise cannot hang forever.
            ctx.flush();
        }
        if (exception != null) {
            promise.tryFailure(exception);
        } else {
            promise.trySuccess();
        }
    }

    /**
     * Returns the user function supplied at construction time.
     *
     * @return the function applied to the bridged channel when it becomes active
     */
    public Function<? super NettyChannel, ? extends Publisher<Void>> getHandler() {
        return handler;
    }

    /**
     * Attaches the single allowed inbound subscriber to the sink.
     *
     * <p>A second subscriber is rejected with an {@code IllegalStateException}. If the sink has
     * already terminated, the subscriber is errored right away when an error is stored, or
     * completed right away when nothing is buffered; otherwise it is installed so buffered items
     * can still be drained.
     *
     * @param s subscriber to attach
     */
    @Override
    public void subscribe(Subscriber<? super Object> s) {
        InboundSink sink = inboundEmitter;
        if (log.isDebugEnabled()) {
            log.debug("Subscribing inbound receiver [pending: " + sink.getPending() + ", done: " + sink.done + "]");
        }
        if (sink.actual != null) {
            Operators.error(s, new IllegalStateException("Only one connection receive subscriber allowed."));
            return;
        }
        if (sink.done) {
            if (sink.error != null) {
                Operators.error(s, sink.error);
                return;
            }
            if (sink.getPending() == 0L) {
                Operators.complete(s);
                return;
            }
        }
        sink.init(s);
        s.onSubscribe(sink);
    }

    static final class InboundSink
    implements FluxSink<Object>,
    Trackable,
    Cancellation,
    Subscription,
    Producer {
        /** Channel whose reads and auto-read flag are driven by this sink. */
        final Channel ch;
        /** Current subscriber; set by {@code init}, cleared by {@code dereference}. */
        Subscriber<? super Object> actual;
        /** Set once {@code drain()} returns true; from then on {@code next} delivers directly instead of queueing. */
        boolean caughtUp;
        /** Buffer for items that cannot be delivered directly; created lazily in {@code next}. */
        Queue<Object> queue;
        /** True once {@code complete} or {@code error} has been accepted. */
        boolean done;
        /** Terminal error stored for delivery by {@code drain()}, if any. */
        Throwable error;
        /** Outstanding downstream demand; read directly by the enclosing handler. */
        long requested;
        /**
         * Work-in-progress counter guarding {@code drain()}; a plain int updated with ++/-=.
         * NOTE(review): not atomic — presumably only touched from the channel's event loop; confirm.
         */
        int wip;
        /** Cancellation slot: initialised to this sink, swapped to {@link #CANCELLED} by {@code cancelResource}. */
        volatile Cancellation cancel;
        static final AtomicReferenceFieldUpdater<InboundSink, Cancellation> CANCEL = AtomicReferenceFieldUpdater.newUpdater(InboundSink.class, Cancellation.class, "cancel");
        /** No-op sentinel marking the sink as cancelled. */
        static final Cancellation CANCELLED = () -> {};

        /**
         * Creates a sink bound to {@code channel} and registers the sink itself as its own
         * cancellation resource.
         *
         * @param channel channel this sink reads from
         */
        public InboundSink(Channel channel) {
            ch = channel;
            CANCEL.lazySet(this, this);
        }

        /**
         * Installs a subscriber and resets the sink so it can be drained again.
         *
         * @param s subscriber that will receive the inbound items
         */
        void init(Subscriber<? super Object> s) {
            this.actual = s;
            // Re-arm the cancellation slot with this sink (it may hold CANCELLED from an earlier run).
            CANCEL.lazySet(this, this);
            // drain() can return early with wip left non-zero (e.g. when there was no subscriber); clear it.
            this.wip = 0;
        }

        /**
         * Accepts one inbound item.
         *
         * <p>A {@code null} item is turned into an error; an item arriving after termination is
         * dropped. Once the sink has caught up and a subscriber is present, the item is delivered
         * directly, then released, and another channel read is requested. Otherwise it is queued
         * and a drain is attempted.
         *
         * @param value item read from the channel
         */
        public void next(Object value) {
            if (value == null) {
                error(new NullPointerException("value is null"));
                return;
            }
            if (done) {
                Operators.onNextDropped(value);
                return;
            }
            boolean deliverDirectly = caughtUp && actual != null;
            if (!deliverDirectly) {
                Queue<Object> pending = queue;
                if (pending == null) {
                    pending = QueueSupplier.<Object>unbounded().get();
                    queue = pending;
                }
                pending.offer(value);
                if (drain()) {
                    caughtUp = true;
                }
                return;
            }
            try {
                actual.onNext(value);
            }
            finally {
                // Runs even when onNext throws: keep reading and drop our reference to the item.
                ch.read();
                ReferenceCountUtil.release(value);
            }
        }

        /**
         * Terminates the sink with an error.
         *
         * <p>A {@code null} argument is replaced by a {@code NullPointerException}. If the sink is
         * already cancelled or terminated the error is dropped. Otherwise it is delivered directly
         * when the sink has caught up and a subscriber is present, or stored and left to
         * {@code drain()}.
         *
         * @param error failure to signal
         */
        public void error(Throwable error) {
            Throwable cause = error != null ? error : new NullPointerException("error is null");
            if (isCancelled() || done) {
                Operators.onErrorDropped(cause);
                return;
            }
            done = true;
            if (!caughtUp || actual == null) {
                // Not caught up (or nobody subscribed yet): keep the error for the drain loop.
                this.error = cause;
                drain();
                return;
            }
            actual.onError(cause);
        }

        /**
         * Tells whether the cancellation slot holds the cancelled sentinel.
         *
         * @return {@code true} once {@code cancelResource} has swapped in {@code CANCELLED}
         */
        public boolean isCancelled() {
            return CANCELLED == cancel;
        }

        /**
         * Terminates the sink normally, signalling {@code onComplete} downstream at most once.
         *
         * <p>Does nothing if the sink is already cancelled or terminated. When the sink has caught
         * up and a subscriber is present, completion is delivered here directly; otherwise it is
         * left to {@code drain()}, which delivers it after any buffered items.
         *
         * <p>The direct path used to fall through to {@code drain()} as well. With {@code done}
         * set and nothing left to poll, {@code drain()} then called {@code onComplete()} a second
         * time. The direct path now performs the same resource cancellation {@code drain()} would
         * have done and returns.
         */
        public void complete() {
            if (this.isCancelled() || this.done) {
                return;
            }
            this.done = true;
            // Capture first: cancelResource() may clear the 'actual' field via dispose().
            Subscriber<? super Object> a = this.actual;
            if (this.caughtUp && a != null) {
                this.cancelResource();
                a.onComplete();
                return;
            }
            this.drain();
        }

        /**
         * Delivers queued items to the subscriber up to the outstanding demand and emits the
         * terminal signal once the sink is done and the queue is exhausted.
         *
         * <p>Only the caller that moves {@code wip} from 0 runs the loop; any other call just
         * increments {@code wip} and returns {@code false}, and the running loop repeats while
         * {@code wip} shows missed calls. Note that the early {@code return false} exits (no
         * subscriber, cancelled, terminal signal sent) leave {@code wip} non-zero; {@code init}
         * resets it.
         *
         * <p>NOTE(review): the decompiler reported "Removed try catching itself - possible
         * behaviour change" for this method, so the exact try/finally shape of the original
         * source may differ from what is shown here.
         *
         * @return {@code true} only when the loop finished with unbounded demand
         *         ({@code Long.MAX_VALUE}), after switching the channel to auto-read;
         *         {@code false} in every other case
         */
        boolean drain() {
            long r;
            // Someone else is already draining: record the missed call and leave.
            if (this.wip++ != 0) {
                return false;
            }
            int missed = 1;
            do {
                long e;
                Queue<Object> q = this.queue;
                Subscriber<? super Object> a = this.actual;
                // Nothing can be delivered before a subscriber is installed.
                if (a == null) {
                    return false;
                }
                r = this.requested;
                // Emit up to r items; e counts how many were delivered in this pass.
                for (e = 0L; e != r; ++e) {
                    boolean empty;
                    if (this.isCancelled()) {
                        return false;
                    }
                    boolean d = this.done;
                    Object v = q != null ? q.poll() : null;
                    boolean bl = empty = v == null;
                    // Terminated and nothing left to deliver: release resources and send the terminal signal.
                    if (d && empty) {
                        Throwable ex;
                        this.cancelResource();
                        if (q != null) {
                            q.clear();
                        }
                        if ((ex = this.error) != null) {
                            a.onError(ex);
                        } else {
                            a.onComplete();
                        }
                        return false;
                    }
                    // Queue exhausted but demand remains: ask the channel for more data.
                    if (empty) {
                        this.ch.read();
                        break;
                    }
                    try {
                        a.onNext(v);
                        continue;
                    }
                    finally {
                        // Runs whether or not onNext throws: release the item and request another read.
                        ReferenceCountUtil.release((Object)v);
                        this.ch.read();
                    }
                }
                // Demand fully consumed (or zero): the terminal signal may still be deliverable.
                if (e == r) {
                    if (this.isCancelled()) {
                        return false;
                    }
                    if (this.done && (q == null || q.isEmpty())) {
                        Throwable ex;
                        this.cancelResource();
                        if (q != null) {
                            q.clear();
                        }
                        if ((ex = this.error) != null) {
                            a.onError(ex);
                        } else {
                            a.onComplete();
                        }
                        return false;
                    }
                }
                // Subtract what was delivered from the demand (unless unbounded); read again only if demand is left.
                if (e == 0L || r == Long.MAX_VALUE || (this.requested -= e) <= 0L) continue;
                this.ch.read();
            } while ((missed = (this.wip -= missed)) != 0);
            // Unbounded demand: let Netty read continuously and report "caught up" to the caller.
            if (r == Long.MAX_VALUE) {
                this.ch.config().setAutoRead(true);
                this.ch.read();
                return true;
            }
            return false;
        }

        /**
         * Tries to install {@code c} into an empty cancellation slot. If the slot is occupied and
         * the sink is not cancelled, a non-null {@code c} is disposed instead.
         *
         * @param c cancellation resource to register; may be {@code null}
         */
        public void setCancellation(Cancellation c) {
            boolean installed = CANCEL.compareAndSet(this, null, c);
            if (installed) {
                return;
            }
            if (cancel != CANCELLED && c != null) {
                c.dispose();
            }
        }

        /**
         * Adds {@code n} to the outstanding demand (capped addition) and attempts a drain.
         * Requests rejected by {@code Operators.validate} are ignored.
         *
         * @param n number of additional items requested
         */
        @Override
        public void request(long n) {
            if (!Operators.validate(n)) {
                return;
            }
            requested = Operators.addCap(requested, n);
            drain();
        }

        /**
         * Marks the sink cancelled and disposes whatever resource was registered before, if any.
         * The demand is reset to zero when a resource is actually disposed. Calling it again
         * after cancellation is a no-op.
         */
        void cancelResource() {
            if (cancel == CANCELLED) {
                return;
            }
            Cancellation previous = CANCEL.getAndSet(this, CANCELLED);
            if (previous != null && previous != CANCELLED) {
                requested = 0L;
                previous.dispose();
            }
        }

        /**
         * Cancels the sink and, when no drain is in progress, releases every item still queued.
         */
        @Override
        public void cancel() {
            cancelResource();
            boolean ownsDrain = wip++ == 0;
            if (!ownsDrain) {
                return;
            }
            Queue<Object> pending = queue;
            if (pending == null) {
                return;
            }
            for (Object item = pending.poll(); item != null; item = pending.poll()) {
                ReferenceCountUtil.release(item);
            }
        }

        /**
         * Returns this same sink; no wrapping is applied.
         *
         * @return {@code this}
         */
        public FluxSink<Object> serialize() {
            return this;
        }

        /**
         * Reports the demand currently outstanding.
         *
         * @return the value of the {@code requested} counter
         */
        public long requestedFromDownstream() {
            return requested;
        }

        /**
         * Reports the capacity of this sink.
         *
         * @return always {@code Long.MAX_VALUE}
         */
        public long getCapacity() {
            return Long.MAX_VALUE;
        }

        /**
         * Counts the items currently buffered.
         *
         * @return the queue size, or {@code 0} when no queue has been created yet
         */
        public long getPending() {
            if (queue == null) {
                return 0L;
            }
            return (long)queue.size();
        }

        /**
         * Returns the stored terminal error.
         *
         * @return the error kept for the drain loop, or {@code null} when none is stored
         */
        public Throwable getError() {
            return error;
        }

        /**
         * Returns the subscriber currently attached to this sink.
         *
         * @return the subscriber, or {@code null} when none is attached
         */
        public Object downstream() {
            return actual;
        }

        /** Detaches the current subscriber by clearing the {@code actual} reference. */
        void dereference() {
            actual = null;
        }

        /**
         * Detaches the subscriber on the channel's event loop (immediately when already on it,
         * otherwise via a scheduled task) and turns channel auto-read off.
         */
        public void dispose() {
            if (!ch.eventLoop().inEventLoop()) {
                ch.eventLoop().execute(this::dereference);
            } else {
                dereference();
            }
            ch.config().setAutoRead(false);
        }
    }

    final class FlushOnEachSubscriber
    implements Subscriber<Object>,
    ChannelFutureListener,
    Loopback,
    Trackable,
    Receiver {
        /** Context used to write, flush and reach the channel. */
        private final ChannelHandlerContext ctx;
        /** Promise of the overall write operation; failed here when an individual write fails. */
        private final ChannelPromise promise;
        /** Upstream subscription; null before onSubscribe and after termination or remote close. */
        volatile Subscription subscription;
        /**
         * Listener attached to each write future: on failure it fails the promise (and logs at
         * debug level); on success it requests the next item from upstream if still subscribed.
         */
        private final ChannelFutureListener writeListener = new ChannelFutureListener(){

            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    FlushOnEachSubscriber.this.promise.tryFailure(future.cause());
                    if (log.isDebugEnabled()) {
                        log.debug("Write error", future.cause());
                    }
                    // No further request after a failed write.
                    return;
                }
                Subscription subscription = FlushOnEachSubscriber.this.subscription;
                if (subscription != null) {
                    subscription.request(1L);
                }
            }
        };

        /**
         * Creates a subscriber that writes and flushes every item it receives.
         *
         * @param ctx     context used for writing and flushing
         * @param promise promise representing the whole write operation
         */
        public FlushOnEachSubscriber(ChannelHandlerContext ctx, ChannelPromise promise) {
            this.promise = promise;
            this.ctx = ctx;
        }

        /**
         * Reports cancellation in terms of the channel state.
         *
         * @return {@code true} when the channel is no longer open
         */
        public boolean isCancelled() {
            boolean open = ctx.channel().isOpen();
            return !open;
        }

        /**
         * Reports whether an upstream subscription is currently held.
         *
         * @return {@code true} while {@code subscription} is non-null
         */
        public boolean isStarted() {
            return null != subscription;
        }

        /**
         * Reports termination in terms of the channel state.
         *
         * @return {@code true} when the channel is no longer open
         */
        public boolean isTerminated() {
            boolean open = ctx.channel().isOpen();
            return !open;
        }

        /**
         * Identifies the input side this subscriber is connected to.
         *
         * @return the enclosing {@code NettyChannelHandler} instance
         */
        public Object connectedInput() {
            return NettyChannelHandler.this;
        }

        /**
         * Stores the subscription, starts listening for channel close and requests the first item.
         * Subscriptions rejected by {@code Operators.validate} are ignored here.
         *
         * @param s upstream subscription
         */
        @Override
        public void onSubscribe(Subscription s) {
            if (!Operators.validate(subscription, s)) {
                return;
            }
            subscription = s;
            ctx.channel().closeFuture().addListener((GenericFutureListener)this);
            s.request(1L);
        }

        /**
         * Writes one item and flushes immediately.
         *
         * <p>The next item is normally requested by {@code writeListener} once the write future
         * succeeds. When {@code doOnWrite} returns no future (the base implementation does so for
         * {@code Unpooled.EMPTY_BUFFER}), no listener can fire, so the next item is requested
         * here instead. Previously that case issued no request and the write stream stalled.
         *
         * @param w item to write; must not be {@code null}
         * @throws RuntimeException as produced by {@code Exceptions.argumentIsNullException()} for
         *         a {@code null} item, or by {@code Exceptions.failWithCancel()} when there is no
         *         active subscription or the write attempt itself threw
         */
        public void onNext(Object w) {
            if (w == null) {
                throw Exceptions.argumentIsNullException();
            }
            if (this.subscription == null) {
                throw Exceptions.failWithCancel();
            }
            ChannelFuture cf;
            try {
                cf = NettyChannelHandler.this.doOnWrite(w, this.ctx);
                if (cf != null) {
                    cf.addListener((GenericFutureListener)this.writeListener);
                }
                this.ctx.flush();
            }
            catch (Throwable t) {
                log.error("Write error for " + w, t);
                this.onError(t);
                throw Exceptions.failWithCancel();
            }
            if (cf == null) {
                // Nothing was written, so no write future will trigger the next request.
                Subscription s = this.subscription;
                if (s != null) {
                    s.request(1L);
                }
            }
        }

        /**
         * Handles an upstream error: logs it, drops the subscription, stops listening for channel
         * close and schedules {@code doOnTerminate} (with no last-write future) on the event loop.
         *
         * @param t upstream error; must not be {@code null}
         * @throws IllegalStateException if no subscription is held any more
         */
        @Override
        public void onError(Throwable t) {
            if (t == null) {
                throw Exceptions.argumentIsNullException();
            }
            if (subscription == null) {
                throw new IllegalStateException("already flushed", t);
            }
            log.error("Write error", t);
            subscription = null;
            Channel channel = ctx.channel();
            channel.closeFuture().removeListener((GenericFutureListener)this);
            channel.eventLoop().execute(() -> NettyChannelHandler.this.doOnTerminate(ctx, null, promise, t));
        }

        /**
         * Handles upstream completion: drops the subscription, stops listening for channel close
         * and schedules {@code doOnTerminate} (no last-write future, no error) on the event loop.
         *
         * @throws IllegalStateException if no subscription is held any more
         */
        @Override
        public void onComplete() {
            if (subscription == null) {
                throw new IllegalStateException("already flushed");
            }
            subscription = null;
            if (log.isDebugEnabled()) {
                log.debug("Flush Connection");
            }
            Channel channel = ctx.channel();
            channel.closeFuture().removeListener((GenericFutureListener)this);
            channel.eventLoop().execute(() -> NettyChannelHandler.this.doOnTerminate(ctx, null, promise, null));
        }

        /**
         * Close-future callback: forgets the subscription and cancels it, provided one was held
         * and the inbound sink has nothing buffered.
         *
         * @param future the channel's close future (not inspected)
         */
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            Subscription held = subscription;
            subscription = null;
            if (held == null || NettyChannelHandler.this.inboundEmitter.getPending() != 0L) {
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug("Cancel from remotely closed connection");
            }
            held.cancel();
        }

        /**
         * Returns the upstream subscription.
         *
         * @return the current subscription, or {@code null} when none is held
         */
        public Object upstream() {
            return subscription;
        }
    }

    final class FlushOnTerminateSubscriber
    implements Subscriber<Object>,
    ChannelFutureListener,
    Loopback {
        /** Context used to write and to reach the channel. */
        private final ChannelHandlerContext ctx;
        /** Promise of the overall write operation; handed to doOnTerminate on termination. */
        private final ChannelPromise promise;
        /** Result of the most recent doOnWrite call (may be null); handed to doOnTerminate on termination. */
        ChannelFuture lastWrite;
        /** Upstream subscription; null before onSubscribe and after termination or remote close. */
        Subscription subscription;

        /**
         * Creates a subscriber that writes every item and leaves flushing to termination.
         *
         * @param ctx     context used for writing
         * @param promise promise representing the whole write operation
         */
        public FlushOnTerminateSubscriber(ChannelHandlerContext ctx, ChannelPromise promise) {
            this.promise = promise;
            this.ctx = ctx;
        }

        /**
         * Close-future callback: forgets the subscription and cancels it, provided one was held
         * and the inbound sink has nothing buffered.
         *
         * @param future the channel's close future (not inspected)
         */
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            Subscription held = subscription;
            subscription = null;
            if (held == null || NettyChannelHandler.this.inboundEmitter.getPending() != 0L) {
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug("Cancel from remotely closed connection");
            }
            held.cancel();
        }

        /**
         * Identifies the input side this subscriber is connected to.
         *
         * @return the enclosing {@code NettyChannelHandler} instance
         */
        public Object connectedInput() {
            return NettyChannelHandler.this;
        }

        /**
         * Stores the subscription, starts listening for channel close and requests everything
         * ({@code Long.MAX_VALUE}). Subscriptions rejected by {@code Operators.validate} are
         * ignored here.
         *
         * @param s upstream subscription
         */
        @Override
        public void onSubscribe(Subscription s) {
            if (!Operators.validate(subscription, s)) {
                return;
            }
            subscription = s;
            ctx.channel().closeFuture().addListener((GenericFutureListener)this);
            s.request(Long.MAX_VALUE);
        }

        /**
         * Writes one item without flushing and remembers its future as the last write.
         *
         * <p>When {@code doOnWrite} returns no future (the base implementation does so for
         * {@code Unpooled.EMPTY_BUFFER}), the previously remembered future is kept. It used to be
         * overwritten with {@code null}, so a skipped item at the end of the stream made
         * termination lose track of the earlier, real writes.
         *
         * @param w item to write; must not be {@code null}
         * @throws RuntimeException as produced by {@code Exceptions.argumentIsNullException()} for
         *         a {@code null} item, or by {@code Exceptions.failWithCancel()} when there is no
         *         active subscription
         */
        public void onNext(final Object w) {
            if (w == null) {
                throw Exceptions.argumentIsNullException();
            }
            if (this.subscription == null) {
                throw Exceptions.failWithCancel();
            }
            try {
                ChannelFuture cf = NettyChannelHandler.this.doOnWrite(w, this.ctx);
                if (cf == null) {
                    // Nothing was written: keep the future of the last real write.
                    return;
                }
                this.lastWrite = cf;
                // NOTE(review): the failure listener (error log + reader-index reset) is attached
                // only when debug logging is enabled, as in the original code; confirm that tying
                // the reset to the log level is intended.
                if (log.isDebugEnabled()) {
                    cf.addListener((GenericFutureListener)new ChannelFutureListener(){

                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {
                                log.error("write error :" + w, future.cause());
                                if (w instanceof ByteBuf) {
                                    ((ByteBuf)w).resetReaderIndex();
                                }
                            }
                        }
                    });
                }
            }
            catch (Throwable t) {
                log.error("Write error for " + w, t);
                this.onError(t);
            }
        }

        /**
         * Handles an upstream error: logs it, drops the subscription, stops listening for channel
         * close and schedules {@code doOnTerminate} with the last write future on the event loop.
         *
         * @param t upstream error; must not be {@code null}
         * @throws IllegalStateException if no subscription is held any more
         */
        @Override
        public void onError(Throwable t) {
            if (t == null) {
                throw Exceptions.argumentIsNullException();
            }
            if (subscription == null) {
                throw new IllegalStateException("already flushed", t);
            }
            log.error("Write error", t);
            subscription = null;
            Channel channel = ctx.channel();
            channel.closeFuture().removeListener((GenericFutureListener)this);
            // lastWrite is read when the task runs, not when it is scheduled.
            channel.eventLoop().execute(() -> NettyChannelHandler.this.doOnTerminate(ctx, lastWrite, promise, t));
        }

        /**
         * Handles upstream completion: drops the subscription, stops listening for channel close
         * and schedules {@code doOnTerminate} with the last write future on the event loop.
         *
         * @throws IllegalStateException if no subscription is held any more
         */
        @Override
        public void onComplete() {
            if (subscription == null) {
                throw new IllegalStateException("already flushed");
            }
            subscription = null;
            Channel channel = ctx.channel();
            channel.closeFuture().removeListener((GenericFutureListener)this);
            // lastWrite is read when the task runs, not when it is scheduled.
            channel.eventLoop().execute(() -> NettyChannelHandler.this.doOnTerminate(ctx, lastWrite, promise, null));
        }
    }

    static final class CloseSubscriber
    implements Subscriber<Void> {
        private final ChannelHandlerContext ctx;

        public CloseSubscriber(ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }

        public void onSubscribe(Subscription s) {
            s.request(Long.MAX_VALUE);
        }

        public void onNext(Void aVoid) {
        }

        public void onError(Throwable t) {
            if (t instanceof IOException && t.getMessage().contains("Broken pipe")) {
                if (log.isDebugEnabled()) {
                    log.debug("Connection closed remotely", t);
                }
                return;
            }
            log.error("Error processing connection. Closing the channel.", t);
            this.ctx.channel().eventLoop().execute(() -> ((ChannelHandlerContext)this.ctx).close());
        }

        public void onComplete() {
            if (log.isDebugEnabled()) {
                log.debug("Closing connection");
            }
            this.ctx.channel().eventLoop().execute(() -> ((ChannelHandlerContext)this.ctx).close());
        }
    }

    /**
     * Outbound message recognised by {@code write(...)}: pairs a stream of items to write with
     * the flush mode that selects the subscriber used to write them.
     */
    public static final class ChannelWriter {
        /** Stream whose items are written to the channel. */
        final Publisher<?> writeStream;
        /** Mode deciding which flushing subscriber is used. */
        final FlushMode flushMode;

        /**
         * @param writeStream stream of items to write
         * @param flushMode   flush mode for this stream
         */
        public ChannelWriter(Publisher<?> writeStream, FlushMode flushMode) {
            this.flushMode = flushMode;
            this.writeStream = writeStream;
        }
    }

    /**
     * Flush strategies that can be attached to a {@code ChannelWriter}.
     *
     * <p>Within this file only {@code MANUAL_COMPLETE} is distinguished: {@code write(...)} uses
     * the flush-on-terminate subscriber for it and the flush-on-each subscriber for every other
     * constant.
     */
    public static enum FlushMode {
        AUTO_EACH,
        AUTO_LOOP,
        MANUAL_COMPLETE,
        MANUAL_BOUNDARY;

    }
}

