/*
 * Decompiled with CFR 0.152.
 */
package ratpack.server.internal;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.LastHttpContent;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import ratpack.exec.internal.Continuation;
import ratpack.exec.internal.DefaultExecution;
import ratpack.func.Action;
import ratpack.server.internal.ResponseBodyWriter;
import ratpack.server.internal.ResponseWritingListener;

/**
 * Writes a response body by subscribing to a {@link Publisher} of {@link ByteBuf}s and
 * writing each emitted buffer to a {@link Channel} as a {@link DefaultHttpContent}.
 *
 * <p>Items are requested one at a time. While the channel reports that it is not writable,
 * no further item is requested; instead the current execution is delimited and the resulting
 * {@link Continuation} is held until {@link #onWritable()}, {@link #onClosed()} or a terminal
 * signal from the publisher resumes it.
 */
class StreamingResponseBodyWriter
implements ResponseBodyWriter,
ResponseWritingListener {
    private final Publisher<? extends ByteBuf> publisher;
    /** Set once the writer has finished (completed, failed, closed or disposed). */
    private boolean done;
    /** The upstream subscription; {@code null} until {@code onSubscribe} has been received. */
    private Subscription subscription;
    /** The promise returned from {@link #write(Channel)}; {@code null} until that method is called. */
    private ChannelPromise channelPromise;
    /** Held while waiting for the channel to become writable again; {@code null} otherwise. */
    @Nullable
    private Continuation continuation;

    /**
     * Creates a writer for the given publisher.
     *
     * @param publisher the source of body chunks; it is subscribed to by {@link #write(Channel)}
     */
    public StreamingResponseBodyWriter(Publisher<? extends ByteBuf> publisher) {
        this.publisher = publisher;
    }

    /**
     * Marks this writer as done so that subsequent signals are ignored.
     *
     * <p>Note that this does not cancel the subscription or complete the channel promise.
     */
    @Override
    public void dispose() {
        this.done = true;
    }

    /**
     * Handles the channel being closed: if not already done, marks the writer done, cancels the
     * subscription (if one was received) and completes the channel promise (if one was created)
     * successfully.
     */
    @Override
    public void onClosed() {
        this.exec(() -> {
            if (!this.done) {
                this.done = true;
                if (this.subscription != null) {
                    this.subscription.cancel();
                }
                if (this.channelPromise != null) {
                    this.channelPromise.setSuccess();
                }
            }
        });
    }

    /**
     * Handles the channel becoming writable again by requesting the next item, but only when a
     * continuation is being held (i.e. requesting was previously paused because the channel was
     * not writable).
     */
    @Override
    public void onWritable() {
        // A continuation is only stored when requesting was paused; without one there is nothing to resume.
        if (this.continuation == null) {
            return;
        }
        if (!this.done && this.subscription != null) {
            this.exec(() -> this.subscription.request(1L));
        }
    }

    /**
     * Starts streaming the publisher's items to the given channel.
     *
     * @param channel the channel to write the body to
     * @return a new promise of the channel that is completed when the last content has been
     *         written, failed when the publisher signals an error, and succeeded when the
     *         channel is closed before either of those
     */
    public ChannelPromise write(Channel channel) {
        this.channelPromise = channel.newPromise();
        this.publisher.subscribe(new Subscriber(channel));
        return this.channelPromise;
    }

    /**
     * Runs the given action, resuming the held continuation with it if there is one.
     *
     * <p>When a continuation is held, the field is cleared before resuming so that the
     * continuation is resumed at most once.
     *
     * @param rest the action to run
     */
    private void exec(Runnable rest) {
        if (this.continuation == null) {
            rest.run();
        } else {
            Continuation continuation = this.continuation;
            this.continuation = null;
            continuation.resume(rest::run);
        }
    }

    /**
     * The subscriber that bridges the publisher's signals to writes on the channel.
     */
    private class Subscriber
    implements org.reactivestreams.Subscriber<ByteBuf> {
        private final Channel channel;

        /**
         * @param channel the channel that received buffers are written to
         */
        public Subscriber(Channel channel) {
            this.channel = channel;
        }

        /**
         * Stores the subscription and requests the first item (or pauses if the channel is not writable).
         *
         * <p>The incoming subscription is cancelled straight away if a subscription has already been
         * received, or if the writer is already done (e.g. the channel closed before the publisher
         * delivered its subscription) so that the upstream is not left waiting forever.
         *
         * @param incomingSubscription the subscription from the publisher
         * @throws NullPointerException if {@code incomingSubscription} is {@code null}
         */
        @Override
        public void onSubscribe(Subscription incomingSubscription) {
            if (incomingSubscription == null) {
                throw new NullPointerException("'subscription' is null");
            }
            if (StreamingResponseBodyWriter.this.done || StreamingResponseBodyWriter.this.subscription != null) {
                incomingSubscription.cancel();
                return;
            }
            StreamingResponseBodyWriter.this.subscription = incomingSubscription;
            this.requestOrDelimit();
        }

        /**
         * Writes the buffer to the channel and requests the next item (or pauses if the channel is not writable).
         *
         * <p>The buffer is released without being written if the writer is already done. An empty
         * buffer is released and immediately replaced by requesting another item.
         *
         * @param o the next chunk of the body
         */
        @Override
        public void onNext(ByteBuf o) {
            o.touch();
            // Check for completion first so that no further demand is signalled once finished.
            if (StreamingResponseBodyWriter.this.done) {
                o.release();
                return;
            }
            if (o.readableBytes() == 0) {
                // Nothing to write for this item; ask for the next one straight away.
                o.release();
                StreamingResponseBodyWriter.this.subscription.request(1L);
                return;
            }
            this.channel.writeAndFlush(new DefaultHttpContent(o.touch()), this.channel.voidPromise());
            this.requestOrDelimit();
        }

        /**
         * Requests one more item if the channel is writable; otherwise delimits the current
         * execution and stores the resulting continuation on the enclosing writer.
         */
        private void requestOrDelimit() {
            if (this.channel.isWritable()) {
                StreamingResponseBodyWriter.this.subscription.request(1L);
            } else {
                // NOTE(review): presumably the delimited segment stays open until the stored continuation is resumed via exec() - confirm against DefaultExecution.
                DefaultExecution.require().delimit(Action.throwException(), continuation -> StreamingResponseBodyWriter.this.continuation = continuation);
            }
        }

        /**
         * Fails the channel promise with the given error, unless the writer is already done.
         *
         * @param t the error signalled by the publisher
         * @throws NullPointerException if {@code t} is {@code null}
         */
        @Override
        public void onError(Throwable t) {
            // Validate before exec() so that an invalid signal is reported to the caller and does not consume a held continuation.
            if (t == null) {
                throw new NullPointerException("error is null");
            }
            StreamingResponseBodyWriter.this.exec(() -> {
                if (!StreamingResponseBodyWriter.this.done) {
                    StreamingResponseBodyWriter.this.done = true;
                    StreamingResponseBodyWriter.this.channelPromise.setFailure(t);
                }
            });
        }

        /**
         * Writes the terminating {@link LastHttpContent#EMPTY_LAST_CONTENT} using the channel promise
         * and flushes the channel, unless the writer is already done.
         */
        @Override
        public void onComplete() {
            StreamingResponseBodyWriter.this.exec(() -> {
                if (!StreamingResponseBodyWriter.this.done) {
                    StreamingResponseBodyWriter.this.done = true;
                    this.channel.write(LastHttpContent.EMPTY_LAST_CONTENT, StreamingResponseBodyWriter.this.channelPromise);
                    this.channel.flush();
                }
            });
        }
    }
}

