/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.http.server.netty.handler;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.util.SupplierUtil;
import io.micronaut.http.exceptions.MessageBodyException;
import io.micronaut.http.netty.body.NettyWriteContext;
import io.micronaut.http.netty.stream.DelegateStreamedHttpRequest;
import io.micronaut.http.netty.stream.EmptyHttpRequest;
import io.micronaut.http.server.netty.SmartHttpContentCompressor;
import io.micronaut.http.server.netty.handler.RequestHandler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.EventLoop;
import io.netty.channel.FileRegion;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpChunkedInput;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.stream.ChunkedFile;
import io.netty.handler.stream.ChunkedInput;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ResourceLeakDetector;
import io.netty.util.ResourceLeakDetectorFactory;
import io.netty.util.ResourceLeakTracker;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.util.concurrent.Queues;

@Internal
public final class PipeliningServerHandler
extends ChannelInboundHandlerAdapter {
    public static final Supplier<AttributeKey<SmartHttpContentCompressor>> ZERO_COPY_PREDICATE = SupplierUtil.memoized(() -> AttributeKey.newInstance((String)"zero-copy-predicate"));
    private static final int LENGTH_8K = 8192;
    private static final Logger LOG = LoggerFactory.getLogger(PipeliningServerHandler.class);
    private final RequestHandler requestHandler;
    private final DroppingInboundHandler droppingInboundHandler = new DroppingInboundHandler();
    private final InboundHandler baseInboundHandler = new MessageInboundHandler();
    private final OptimisticBufferingInboundHandler optimisticBufferingInboundHandler = new OptimisticBufferingInboundHandler();
    private InboundHandler inboundHandler = this.baseInboundHandler;
    private final Queue<OutboundAccess> outboundQueue = new ArrayDeque<OutboundAccess>(1);
    @Nullable
    private OutboundHandler outboundHandler = null;
    private ChannelHandlerContext ctx;
    private boolean reading = false;
    private boolean moreRequested = false;
    private boolean removed = false;
    private boolean flushPending = false;
    private boolean writing = false;

    public PipeliningServerHandler(RequestHandler requestHandler) {
        this.requestHandler = requestHandler;
    }

    public static boolean canHaveBody(HttpResponseStatus status) {
        return status != HttpResponseStatus.CONTINUE && status != HttpResponseStatus.SWITCHING_PROTOCOLS && status != HttpResponseStatus.PROCESSING && status != HttpResponseStatus.NO_CONTENT && status != HttpResponseStatus.NOT_MODIFIED;
    }

    private static boolean hasBody(HttpRequest request) {
        int contentLength;
        if (request.decoderResult().isFailure()) {
            return false;
        }
        try {
            contentLength = HttpUtil.getContentLength((HttpMessage)request, (int)0);
        }
        catch (NumberFormatException e) {
            contentLength = 0;
        }
        return contentLength != 0 || HttpUtil.isTransferEncodingChunked((HttpMessage)request);
    }

    private void setNeedMore(boolean needMore) {
        boolean oldMoreRequested = this.moreRequested;
        this.moreRequested = needMore;
        if (!oldMoreRequested && !this.reading && needMore) {
            this.ctx.read();
        }
    }

    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        this.ctx = ctx;
    }

    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        this.removed = true;
        if (this.outboundHandler != null) {
            this.outboundHandler.discard();
        }
        for (OutboundAccess queued : this.outboundQueue) {
            if (queued.handler == null) continue;
            queued.handler.discard();
        }
        this.outboundQueue.clear();
        this.requestHandler.removed();
    }

    public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg) throws Exception {
        this.reading = true;
        this.inboundHandler.read(msg);
    }

    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        this.inboundHandler.readComplete();
        this.reading = false;
        if (this.flushPending) {
            ctx.flush();
            this.flushPending = false;
        }
        if (this.moreRequested) {
            ctx.read();
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        this.inboundHandler.handleUpstreamError(cause);
    }

    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        this.writeSome();
    }

    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        IdleStateEvent idleStateEvent;
        IdleState state;
        if (evt instanceof IdleStateEvent && (state = (idleStateEvent = (IdleStateEvent)evt).state()) == IdleState.ALL_IDLE) {
            ctx.close();
        }
        super.userEventTriggered(ctx, evt);
    }

    private void write(Object message, boolean flush, boolean close) {
        if (close) {
            this.ctx.writeAndFlush(message).addListener((GenericFutureListener)ChannelFutureListener.CLOSE);
        } else if (flush) {
            if (this.reading) {
                this.ctx.write(message, this.ctx.voidPromise());
                this.flushPending = true;
            } else {
                this.ctx.writeAndFlush(message, this.ctx.voidPromise());
            }
        } else {
            this.ctx.write(message, this.ctx.voidPromise());
        }
    }

    private void writeSome() {
        if (this.writing) {
            return;
        }
        this.writing = true;
        try {
            while (this.ctx.channel().isWritable()) {
                if (this.outboundHandler == null) {
                    OutboundAccess next = this.outboundQueue.peek();
                    if (next != null && next.handler != null) {
                        this.outboundQueue.poll();
                        this.outboundHandler = next.handler;
                    } else {
                        return;
                    }
                }
                OutboundHandler oldHandler = this.outboundHandler;
                oldHandler.writeSome();
                if (this.outboundHandler != oldHandler) continue;
                break;
            }
        }
        finally {
            this.writing = false;
        }
    }

    private final class DroppingInboundHandler
    extends InboundHandler {
        private DroppingInboundHandler() {
        }

        @Override
        void read(Object message) {
            ((HttpContent)message).release();
            if (message instanceof LastHttpContent) {
                PipeliningServerHandler.this.inboundHandler = PipeliningServerHandler.this.baseInboundHandler;
            }
        }

        @Override
        void handleUpstreamError(Throwable cause) {
            PipeliningServerHandler.this.requestHandler.handleUnboundError(cause);
        }
    }

    private final class MessageInboundHandler
    extends InboundHandler {
        private MessageInboundHandler() {
        }

        @Override
        void read(Object message) {
            HttpRequest request = (HttpRequest)message;
            OutboundAccess outboundAccess = new OutboundAccess();
            PipeliningServerHandler.this.outboundQueue.add(outboundAccess);
            if (request instanceof FullHttpRequest) {
                FullHttpRequest full = (FullHttpRequest)request;
                PipeliningServerHandler.this.requestHandler.accept(PipeliningServerHandler.this.ctx, (HttpRequest)full, outboundAccess);
            } else if (!PipeliningServerHandler.hasBody(request)) {
                PipeliningServerHandler.this.inboundHandler = PipeliningServerHandler.this.droppingInboundHandler;
                if (message instanceof HttpContent) {
                    PipeliningServerHandler.this.inboundHandler.read(message);
                }
                PipeliningServerHandler.this.requestHandler.accept(PipeliningServerHandler.this.ctx, (HttpRequest)new EmptyHttpRequest(request), outboundAccess);
            } else {
                PipeliningServerHandler.this.optimisticBufferingInboundHandler.init(request, outboundAccess);
                PipeliningServerHandler.this.inboundHandler = PipeliningServerHandler.this.optimisticBufferingInboundHandler;
            }
        }

        @Override
        void handleUpstreamError(Throwable cause) {
            PipeliningServerHandler.this.requestHandler.handleUnboundError(cause);
        }
    }

    private static abstract class InboundHandler {
        private InboundHandler() {
        }

        abstract void read(Object var1);

        abstract void handleUpstreamError(Throwable var1);

        void readComplete() {
        }
    }

    private final class OptimisticBufferingInboundHandler
    extends InboundHandler {
        private HttpRequest request;
        private OutboundAccess outboundAccess;
        private final List<HttpContent> buffer = new ArrayList<HttpContent>();

        private OptimisticBufferingInboundHandler() {
        }

        void init(HttpRequest request, OutboundAccess outboundAccess) {
            assert (this.buffer.isEmpty());
            assert (!(request instanceof HttpContent));
            this.request = request;
            this.outboundAccess = outboundAccess;
        }

        @Override
        void read(Object message) {
            HttpContent content = (HttpContent)message;
            if (content.content().isReadable()) {
                this.buffer.add(content);
            } else {
                content.release();
            }
            if (message instanceof LastHttpContent) {
                ByteBuf fullBody;
                LastHttpContent last = (LastHttpContent)message;
                if (this.buffer.size() == 0) {
                    fullBody = Unpooled.EMPTY_BUFFER;
                } else if (this.buffer.size() == 1) {
                    fullBody = this.buffer.get(0).content();
                } else {
                    CompositeByteBuf composite = PipeliningServerHandler.this.ctx.alloc().compositeBuffer();
                    for (HttpContent c : this.buffer) {
                        composite.addComponent(true, c.content());
                    }
                    fullBody = composite;
                }
                this.buffer.clear();
                DefaultFullHttpRequest fullRequest = new DefaultFullHttpRequest(this.request.protocolVersion(), this.request.method(), this.request.uri(), fullBody, this.request.headers(), last.trailingHeaders());
                fullRequest.setDecoderResult(this.request.decoderResult());
                this.request = null;
                OutboundAccess outboundAccess = this.outboundAccess;
                this.outboundAccess = null;
                PipeliningServerHandler.this.requestHandler.accept(PipeliningServerHandler.this.ctx, (HttpRequest)fullRequest, outboundAccess);
                PipeliningServerHandler.this.inboundHandler = PipeliningServerHandler.this.baseInboundHandler;
            }
        }

        @Override
        void readComplete() {
            this.devolveToStreaming();
            PipeliningServerHandler.this.inboundHandler.readComplete();
        }

        @Override
        void handleUpstreamError(Throwable cause) {
            this.devolveToStreaming();
            PipeliningServerHandler.this.inboundHandler.handleUpstreamError(cause);
        }

        private void devolveToStreaming() {
            final StreamingInboundHandler streamingInboundHandler = new StreamingInboundHandler();
            for (HttpContent content : this.buffer) {
                streamingInboundHandler.read(content);
            }
            this.buffer.clear();
            HttpRequest request = this.request;
            OutboundAccess outboundAccess = this.outboundAccess;
            this.request = null;
            this.outboundAccess = null;
            PipeliningServerHandler.this.inboundHandler = streamingInboundHandler;
            Flux flux = streamingInboundHandler.flux();
            if (HttpUtil.is100ContinueExpected((HttpMessage)request)) {
                flux = flux.doOnSubscribe(s -> outboundAccess.writeContinue());
            }
            PipeliningServerHandler.this.requestHandler.accept(PipeliningServerHandler.this.ctx, (HttpRequest)new DelegateStreamedHttpRequest(request, (Publisher)flux){

                public void closeIfNoSubscriber() {
                    streamingInboundHandler.closeIfNoSubscriber();
                }
            }, outboundAccess);
        }
    }

    private static abstract class OutboundHandler {
        final OutboundAccess outboundAccess;

        private OutboundHandler(OutboundAccess outboundAccess) {
            this.outboundAccess = outboundAccess;
        }

        abstract void writeSome();

        abstract void discard();
    }

    public final class OutboundAccess
    implements NettyWriteContext {
        private OutboundHandler handler;
        private Object attachment = null;
        private boolean closeAfterWrite = false;

        private OutboundAccess() {
        }

        public ByteBufAllocator alloc() {
            return PipeliningServerHandler.this.ctx.alloc();
        }

        public void attachment(Object attachment) {
            this.attachment = attachment;
        }

        public void closeAfterWrite() {
            this.closeAfterWrite = true;
        }

        private void preprocess(HttpResponse message) {
            if (message.protocolVersion().isKeepAliveDefault()) {
                if (message.headers().contains((CharSequence)HttpHeaderNames.CONNECTION, (CharSequence)HttpHeaderValues.CLOSE, true)) {
                    this.closeAfterWrite();
                } else if (this.closeAfterWrite) {
                    message.headers().add((CharSequence)HttpHeaderNames.CONNECTION, (Object)HttpHeaderValues.CLOSE);
                }
            } else if (!message.headers().contains((CharSequence)HttpHeaderNames.CONNECTION, (CharSequence)HttpHeaderValues.KEEP_ALIVE, true)) {
                this.closeAfterWrite();
            } else if (this.closeAfterWrite) {
                message.headers().remove((CharSequence)HttpHeaderNames.CONNECTION);
            }
            if (!HttpUtil.isContentLengthSet((HttpMessage)message) && !HttpUtil.isTransferEncodingChunked((HttpMessage)message) && PipeliningServerHandler.canHaveBody(message.status())) {
                HttpUtil.setKeepAlive((HttpMessage)message, (boolean)false);
                this.closeAfterWrite();
            }
        }

        private void writeContinue() {
            if (this.handler == null) {
                this.write(new ContinueOutboundHandler());
            }
        }

        private void write(OutboundHandler handler) {
            if (this.handler != null && !(this.handler instanceof ContinueOutboundHandler)) {
                throw new IllegalStateException("Only one response per request");
            }
            EventLoop eventLoop = PipeliningServerHandler.this.ctx.channel().eventLoop();
            if (!eventLoop.inEventLoop()) {
                eventLoop.execute(() -> this.write(handler));
                return;
            }
            OutboundHandler outboundHandler = this.handler;
            if (outboundHandler instanceof ContinueOutboundHandler) {
                ContinueOutboundHandler cont = (ContinueOutboundHandler)outboundHandler;
                cont.next = handler;
                PipeliningServerHandler.this.writeSome();
            } else {
                this.handler = handler;
                if (PipeliningServerHandler.this.outboundQueue.peek() == this) {
                    PipeliningServerHandler.this.writeSome();
                }
            }
        }

        public void writeFull(FullHttpResponse response, boolean headResponse) {
            response.headers().remove((CharSequence)HttpHeaderNames.TRANSFER_ENCODING);
            if (PipeliningServerHandler.canHaveBody(response.status())) {
                if (!headResponse) {
                    response.headers().set((CharSequence)HttpHeaderNames.CONTENT_LENGTH, (Object)response.content().readableBytes());
                }
            } else {
                response.headers().remove((CharSequence)HttpHeaderNames.CONTENT_LENGTH);
            }
            this.preprocess((HttpResponse)response);
            this.write(new FullOutboundHandler(this, response));
        }

        public void writeStreamed(HttpResponse response, Publisher<HttpContent> content) {
            response.headers().remove((CharSequence)HttpHeaderNames.CONTENT_LENGTH);
            if (PipeliningServerHandler.canHaveBody(response.status())) {
                response.headers().set((CharSequence)HttpHeaderNames.TRANSFER_ENCODING, (Object)HttpHeaderValues.CHUNKED);
            } else {
                response.headers().remove((CharSequence)HttpHeaderNames.TRANSFER_ENCODING);
            }
            this.preprocess(response);
            content.subscribe((Subscriber)new StreamingOutboundHandler(this, response));
        }

        private void writeStreamed(CustomResponse response) {
            this.preprocess(response.response());
            this.write(new ChunkedOutboundHandler(this, response));
        }

        public void writeChunked(HttpResponse response, HttpChunkedInput chunkedInput) {
            this.writeStreamed(new CustomResponse(response, chunkedInput, false));
        }

        public void writeFile(HttpResponse response, RandomAccessFile randomAccessFile, long position, long contentLength) {
            SmartHttpContentCompressor predicate = (SmartHttpContentCompressor)((Object)PipeliningServerHandler.this.ctx.channel().attr(ZERO_COPY_PREDICATE.get()).get());
            if (predicate != null && predicate.shouldSkip(response)) {
                this.writeStreamed(new CustomResponse(response, (Object)new TrackedDefaultFileRegion(randomAccessFile.getChannel(), position, contentLength), true));
            } else {
                try {
                    HttpChunkedInput chunkedInput = new HttpChunkedInput((ChunkedInput)new TrackedChunkedFile(randomAccessFile, position, contentLength, 8192));
                    this.writeStreamed(new CustomResponse(response, chunkedInput, false));
                }
                catch (IOException e) {
                    throw new MessageBodyException("Could not read file", (Throwable)e);
                }
            }
        }
    }

    private static class TrackedChunkedFile
    extends ChunkedFile {
        private static final Supplier<ResourceLeakDetector<TrackedChunkedFile>> LEAK_DETECTOR = SupplierUtil.memoized(() -> ResourceLeakDetectorFactory.instance().newResourceLeakDetector(TrackedChunkedFile.class));
        private final ResourceLeakTracker<TrackedChunkedFile> tracker = LEAK_DETECTOR.get().track((Object)this);

        public TrackedChunkedFile(RandomAccessFile file, long offset, long length, int chunkSize) throws IOException {
            super(file, offset, length, chunkSize);
        }

        public void close() throws Exception {
            super.close();
            if (this.tracker != null) {
                this.tracker.close((Object)this);
            }
        }
    }

    private static class TrackedDefaultFileRegion
    extends DefaultFileRegion {
        private static final Supplier<ResourceLeakDetector<TrackedDefaultFileRegion>> LEAK_DETECTOR = SupplierUtil.memoized(() -> ResourceLeakDetectorFactory.instance().newResourceLeakDetector(TrackedDefaultFileRegion.class));
        private final ResourceLeakTracker<TrackedDefaultFileRegion> tracker = LEAK_DETECTOR.get().track((Object)this);

        public TrackedDefaultFileRegion(FileChannel fileChannel, long position, long count) {
            super(fileChannel, position, count);
        }

        protected void deallocate() {
            super.deallocate();
            if (this.tracker != null) {
                this.tracker.close((Object)this);
            }
        }
    }

    private final class ChunkedOutboundHandler
    extends OutboundHandler {
        private final CustomResponse message;

        ChunkedOutboundHandler(OutboundAccess outboundAccess, CustomResponse message) {
            super(outboundAccess);
            this.message = message;
        }

        @Override
        void writeSome() {
            boolean responseIsLast = this.message.body() == null && !this.message.needLast();
            PipeliningServerHandler.this.write(this.message.response(), responseIsLast, responseIsLast && this.outboundAccess.closeAfterWrite);
            if (this.message.body() != null) {
                boolean bodyIsLast = !this.message.needLast();
                PipeliningServerHandler.this.write(this.message.body(), bodyIsLast, bodyIsLast && this.outboundAccess.closeAfterWrite);
            }
            if (this.message.needLast()) {
                PipeliningServerHandler.this.write(LastHttpContent.EMPTY_LAST_CONTENT, true, this.outboundAccess.closeAfterWrite);
            }
            PipeliningServerHandler.this.outboundHandler = null;
            PipeliningServerHandler.this.requestHandler.responseWritten(this.outboundAccess.attachment);
            PipeliningServerHandler.this.writeSome();
        }

        @Override
        void discard() {
            ReferenceCountUtil.release((Object)this.message.response());
            Object object = this.message.body();
            if (object instanceof ChunkedInput) {
                ChunkedInput ci = (ChunkedInput)object;
                try {
                    ci.close();
                }
                catch (Exception e) {
                    if (LOG.isWarnEnabled()) {
                        LOG.warn("Failed to close ChunkedInput", (Throwable)e);
                    }
                }
            } else {
                object = this.message.body();
                if (object instanceof FileRegion) {
                    FileRegion fr = (FileRegion)object;
                    fr.release();
                }
            }
            PipeliningServerHandler.this.outboundHandler = null;
        }
    }

    private final class StreamingOutboundHandler
    extends OutboundHandler
    implements Subscriber<HttpContent> {
        private final OutboundAccess outboundAccess;
        private HttpResponse initialMessage;
        private Subscription subscription;
        private boolean earlyComplete;
        private boolean writtenLast;

        StreamingOutboundHandler(OutboundAccess outboundAccess, HttpResponse initialMessage) {
            super(outboundAccess);
            this.earlyComplete = false;
            this.writtenLast = false;
            if (initialMessage instanceof FullHttpResponse) {
                throw new IllegalArgumentException("Cannot have a full response as the initial message of a streaming response");
            }
            this.outboundAccess = outboundAccess;
            this.initialMessage = Objects.requireNonNull(initialMessage, "initialMessage");
        }

        @Override
        void writeSome() {
            if (this.initialMessage != null) {
                PipeliningServerHandler.this.write(this.initialMessage, false, false);
                this.initialMessage = null;
            }
            if (this.earlyComplete) {
                this.onComplete();
            } else {
                this.subscription.request(1L);
            }
        }

        public void onSubscribe(Subscription s) {
            this.subscription = s;
            this.outboundAccess.write(this);
        }

        public void onNext(HttpContent httpContent) {
            EventLoop eventLoop = PipeliningServerHandler.this.ctx.channel().eventLoop();
            if (!eventLoop.inEventLoop()) {
                eventLoop.execute(() -> this.onNext(httpContent));
                return;
            }
            if (PipeliningServerHandler.this.outboundHandler != this) {
                throw new IllegalStateException("onNext before request?");
            }
            if (this.writtenLast) {
                throw new IllegalStateException("Already written a LastHttpContent");
            }
            if (!PipeliningServerHandler.this.removed) {
                if (httpContent instanceof LastHttpContent) {
                    this.writtenLast = true;
                }
                PipeliningServerHandler.this.write(httpContent, true, false);
                if (PipeliningServerHandler.this.ctx.channel().isWritable()) {
                    this.subscription.request(1L);
                }
            } else {
                httpContent.release();
            }
        }

        public void onError(Throwable t) {
            EventLoop eventLoop = PipeliningServerHandler.this.ctx.channel().eventLoop();
            if (!eventLoop.inEventLoop()) {
                eventLoop.execute(() -> this.onError(t));
                return;
            }
            if (!PipeliningServerHandler.this.removed) {
                if (LOG.isWarnEnabled()) {
                    LOG.warn("Reactive response received an error after some data has already been written. This error cannot be forwarded to the client.", t);
                }
                PipeliningServerHandler.this.ctx.close();
                PipeliningServerHandler.this.requestHandler.responseWritten(this.outboundAccess.attachment);
            }
        }

        public void onComplete() {
            EventLoop eventLoop = PipeliningServerHandler.this.ctx.channel().eventLoop();
            if (!eventLoop.inEventLoop()) {
                eventLoop.execute(this::onComplete);
                return;
            }
            if (PipeliningServerHandler.this.outboundHandler != this) {
                this.earlyComplete = true;
                return;
            }
            PipeliningServerHandler.this.outboundHandler = null;
            if (!PipeliningServerHandler.this.removed) {
                if (this.initialMessage != null) {
                    PipeliningServerHandler.this.write(this.initialMessage, false, false);
                    this.initialMessage = null;
                }
                if (!this.writtenLast) {
                    PipeliningServerHandler.this.write(LastHttpContent.EMPTY_LAST_CONTENT, true, this.outboundAccess.closeAfterWrite);
                }
                PipeliningServerHandler.this.requestHandler.responseWritten(this.outboundAccess.attachment);
                PipeliningServerHandler.this.writeSome();
            }
        }

        @Override
        void discard() {
            PipeliningServerHandler.this.requestHandler.responseWritten(this.outboundAccess.attachment);
            this.subscription.cancel();
            PipeliningServerHandler.this.outboundHandler = null;
        }
    }

    private final class FullOutboundHandler
    extends OutboundHandler {
        private final FullHttpResponse message;

        FullOutboundHandler(OutboundAccess outboundAccess, FullHttpResponse message) {
            super(outboundAccess);
            this.message = message;
        }

        @Override
        void writeSome() {
            PipeliningServerHandler.this.write(this.message, true, this.outboundAccess.closeAfterWrite);
            PipeliningServerHandler.this.outboundHandler = null;
            PipeliningServerHandler.this.requestHandler.responseWritten(this.outboundAccess.attachment);
            PipeliningServerHandler.this.writeSome();
        }

        @Override
        void discard() {
            this.message.release();
            PipeliningServerHandler.this.outboundHandler = null;
        }
    }

    private final class ContinueOutboundHandler
    extends OutboundHandler {
        private static final FullHttpResponse CONTINUE = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE, Unpooled.EMPTY_BUFFER);
        boolean written;
        OutboundHandler next;

        private ContinueOutboundHandler() {
            super(null);
            this.written = false;
        }

        @Override
        void writeSome() {
            if (!this.written) {
                PipeliningServerHandler.this.write(CONTINUE, true, false);
                this.written = true;
            }
            if (this.next != null) {
                PipeliningServerHandler.this.outboundHandler = this.next;
            }
        }

        @Override
        void discard() {
            if (this.next != null) {
                this.next.discard();
                this.next = null;
            }
        }
    }

    private final class StreamingInboundHandler
    extends InboundHandler {
        private final Queue<HttpContent> queue = (Queue)Queues.unbounded().get();
        private final Sinks.Many<HttpContent> sink = Sinks.many().unicast().onBackpressureBuffer(this.queue);
        private long requested = 0L;

        private StreamingInboundHandler() {
        }

        @Override
        void read(Object message) {
            --this.requested;
            HttpContent content = (HttpContent)message;
            if (this.sink.tryEmitNext((Object)content.touch()) != Sinks.EmitResult.OK) {
                content.release();
            }
            if (message instanceof LastHttpContent) {
                this.sink.tryEmitComplete();
                PipeliningServerHandler.this.inboundHandler = PipeliningServerHandler.this.baseInboundHandler;
            }
            PipeliningServerHandler.this.setNeedMore(this.requested > 0L);
        }

        @Override
        void handleUpstreamError(Throwable cause) {
            this.releaseQueue();
            if (this.sink.tryEmitError(cause) != Sinks.EmitResult.OK) {
                PipeliningServerHandler.this.requestHandler.handleUnboundError(cause);
            }
        }

        private void request(long n) {
            EventLoop eventLoop = PipeliningServerHandler.this.ctx.channel().eventLoop();
            if (!eventLoop.inEventLoop()) {
                eventLoop.execute(() -> this.request(n));
                return;
            }
            long newRequested = this.requested + n;
            if (newRequested < this.requested) {
                newRequested = Long.MAX_VALUE;
            }
            this.requested = newRequested;
            PipeliningServerHandler.this.setNeedMore(newRequested > 0L);
        }

        Flux<HttpContent> flux() {
            return this.sink.asFlux().doOnRequest(this::request).doOnCancel(this::releaseQueue);
        }

        void closeIfNoSubscriber() {
            if (this.sink.currentSubscriberCount() == 0) {
                this.releaseQueue();
                if (PipeliningServerHandler.this.inboundHandler == this) {
                    PipeliningServerHandler.this.inboundHandler = PipeliningServerHandler.this.droppingInboundHandler;
                }
            }
        }

        private void releaseQueue() {
            HttpContent c;
            while ((c = this.queue.poll()) != null) {
                c.release();
            }
        }
    }

    private record CustomResponse(HttpResponse response, @Nullable Object body, boolean needLast) {
        CustomResponse(HttpResponse response, @Nullable Object body, boolean needLast) {
            if (response instanceof FullHttpResponse) {
                throw new IllegalArgumentException("Response must not be a FullHttpResponse to send a special body");
            }
        }
    }
}

