/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.http.netty.body;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.execution.DelayedExecutionFlow;
import io.micronaut.core.execution.ExecutionFlow;
import io.micronaut.core.util.SupplierUtil;
import io.micronaut.http.body.ByteBody;
import io.micronaut.http.body.CloseableAvailableByteBody;
import io.micronaut.http.body.CloseableByteBody;
import io.micronaut.http.body.stream.BaseSharedBuffer;
import io.micronaut.http.body.stream.BodySizeLimits;
import io.micronaut.http.body.stream.BufferConsumer;
import io.micronaut.http.body.stream.PublisherAsBlocking;
import io.micronaut.http.body.stream.UpstreamBalancer;
import io.micronaut.http.netty.PublisherAsStream;
import io.micronaut.http.netty.body.AvailableNettyByteBody;
import io.micronaut.http.netty.body.ByteBufConsumer;
import io.micronaut.http.netty.body.NettyByteBody;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.util.ReferenceCounted;
import io.netty.util.ResourceLeakDetector;
import io.netty.util.ResourceLeakDetectorFactory;
import io.netty.util.ResourceLeakTracker;
import java.io.InputStream;
import java.util.List;
import java.util.OptionalLong;
import java.util.function.Supplier;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;

/**
 * Streaming {@link NettyByteBody} whose data is delivered through a {@link SharedBuffer}.
 *
 * <p>Each instance holds at most one {@link BufferConsumer.Upstream} handle. The handle is
 * cleared (set to {@code null}) when the body is claimed through {@link #primary},
 * {@link #bufferFlow()}, {@link #move()} or {@link #close()}. Operations that need the
 * handle call {@code BaseSharedBuffer.failClaim()} when it is already gone.
 *
 * <p>NOTE(review): every call site dereferences the upstream right after
 * {@code failClaim()}, so that method is presumably expected to throw. Confirm against
 * {@code BaseSharedBuffer}.
 */
@Internal
public final class StreamingNettyByteBody
extends NettyByteBody
implements CloseableByteBody {
    /** Buffer this body subscribes to; shared with bodies created by {@link #split} and {@link #move()}. */
    private final SharedBuffer sharedBuffer;
    /**
     * Passed as the {@code forceDelay} argument of {@link SharedBuffer#subscribe} and
     * {@link SharedBuffer#subscribeFull}. It is {@code true} for bodies created by
     * {@link #split} when {@link SharedBuffer#reserve()} had to schedule the reservation
     * on the event loop instead of running it inline.
     */
    private final boolean forceDelaySubscribe;
    /** Upstream handle of this body, or {@code null} once the body has been claimed. */
    private BufferConsumer.Upstream upstream;

    /**
     * Creates a body that uses the root upstream of the given shared buffer and never
     * forces delayed subscription.
     *
     * @param sharedBuffer the buffer to read from
     */
    public StreamingNettyByteBody(SharedBuffer sharedBuffer) {
        this(sharedBuffer, false, sharedBuffer.getRootUpstream());
    }

    private StreamingNettyByteBody(SharedBuffer sharedBuffer, boolean forceDelaySubscribe, BufferConsumer.Upstream upstream) {
        this.sharedBuffer = sharedBuffer;
        this.forceDelaySubscribe = forceDelaySubscribe;
        this.upstream = upstream;
    }

    /**
     * Claims this body and subscribes the given consumer to the shared buffer.
     *
     * @param primary the consumer that will receive the buffers
     * @return the upstream handle that this body held before the claim
     */
    public BufferConsumer.Upstream primary(ByteBufConsumer primary) {
        BufferConsumer.Upstream upstream = this.upstream;
        if (upstream == null) {
            BaseSharedBuffer.failClaim();
        }
        // Clear the handle first so that a second claim attempt is detected.
        this.upstream = null;
        BaseSharedBuffer.logClaim();
        this.sharedBuffer.subscribe(primary, upstream, this.forceDelaySubscribe);
        return upstream;
    }

    /**
     * Splits this body in two. The current upstream is replaced by a pair obtained from
     * {@link UpstreamBalancer#balancer}: this body keeps {@code pair.left()}, the returned
     * body gets {@code pair.right()}. A reservation is made on the shared buffer for the
     * new body, and the result of that call becomes its delayed-subscribe flag.
     *
     * @param backpressureMode mode handed to the upstream balancer
     * @return a new body reading from the same shared buffer
     */
    @Override
    @NonNull
    public CloseableByteBody split(@NonNull ByteBody.SplitBackpressureMode backpressureMode) {
        BufferConsumer.Upstream upstream = this.upstream;
        if (upstream == null) {
            BaseSharedBuffer.failClaim();
        }
        UpstreamBalancer.UpstreamPair pair = UpstreamBalancer.balancer(upstream, backpressureMode);
        this.upstream = pair.left();
        boolean forceDelaySubscribe = this.sharedBuffer.reserve();
        return new StreamingNettyByteBody(this.sharedBuffer, forceDelaySubscribe, pair.right());
    }

    /**
     * Forwards {@code allowDiscard()} to the upstream without claiming this body.
     *
     * @return this body
     */
    @Override
    @NonNull
    public StreamingNettyByteBody allowDiscard() {
        BufferConsumer.Upstream upstream = this.upstream;
        if (upstream == null) {
            BaseSharedBuffer.failClaim();
        }
        upstream.allowDiscard();
        return this;
    }

    /**
     * Claims this body with an {@link AsFlux} consumer and exposes it as a {@link Flux}.
     * Buffers that Reactor discards are released through the discard hook.
     *
     * @return the flux of buffers
     */
    @Override
    protected Flux<ByteBuf> toByteBufPublisher() {
        AsFlux asFlux = new AsFlux(this.sharedBuffer);
        BufferConsumer.Upstream upstream = this.primary(asFlux);
        return asFlux.asFlux(upstream).doOnDiscard(ByteBuf.class, ReferenceCounted::release);
    }

    /**
     * @return the expected length reported by the shared buffer
     */
    @Override
    @NonNull
    public OptionalLong expectedLength() {
        return this.sharedBuffer.getExpectedLength();
    }

    /**
     * Claims this body and exposes it as a blocking {@link InputStream}. The publisher
     * from {@link #toByteBufPublisher()} is subscribed with a {@link PublisherAsBlocking}
     * whose release hook releases each {@link ByteBuf}.
     *
     * @return the stream wrapping the blocking subscriber
     */
    @Override
    @NonNull
    public InputStream toInputStream() {
        PublisherAsBlocking<ByteBuf> blocking = new PublisherAsBlocking<ByteBuf>(){

            @Override
            protected void release(ByteBuf item) {
                item.release();
            }
        };
        this.toByteBufPublisher().subscribe((Subscriber<ByteBuf>)blocking);
        return new PublisherAsStream(blocking);
    }

    /**
     * Claims this body and requests the complete content as a single buffer. The
     * upstream is started and told that {@link Long#MAX_VALUE} bytes were consumed before
     * the full subscription is registered on the shared buffer.
     *
     * @return a flow that yields an {@link AvailableNettyByteBody} wrapping the buffer
     */
    @Override
    @NonNull
    public ExecutionFlow<? extends CloseableAvailableByteBody> bufferFlow() {
        BufferConsumer.Upstream upstream = this.upstream;
        if (upstream == null) {
            BaseSharedBuffer.failClaim();
        }
        this.upstream = null;
        BaseSharedBuffer.logClaim();
        upstream.start();
        upstream.onBytesConsumed(Long.MAX_VALUE);
        return this.sharedBuffer.subscribeFull(upstream, this.forceDelaySubscribe).map(AvailableNettyByteBody::new);
    }

    /**
     * Transfers the upstream handle to a new body that shares this body's buffer and
     * delayed-subscribe flag. This body no longer holds a handle afterwards.
     *
     * @return the new body owning the upstream
     */
    @Override
    @NonNull
    public CloseableByteBody move() {
        BufferConsumer.Upstream upstream = this.upstream;
        if (upstream == null) {
            BaseSharedBuffer.failClaim();
        }
        this.upstream = null;
        return new StreamingNettyByteBody(this.sharedBuffer, this.forceDelaySubscribe, upstream);
    }

    /**
     * Closes this body. Does nothing when the body has already been claimed. Otherwise
     * the upstream is told that data may be discarded and that backpressure is to be
     * disregarded, it is started, and the shared buffer is subscribed with a
     * {@code null} consumer.
     */
    @Override
    public void close() {
        BufferConsumer.Upstream upstream = this.upstream;
        if (upstream == null) {
            return;
        }
        this.upstream = null;
        BaseSharedBuffer.logClaim();
        upstream.allowDiscard();
        upstream.disregardBackpressure();
        upstream.start();
        this.sharedBuffer.subscribe(null, upstream, this.forceDelaySubscribe);
    }

    /**
     * {@link BaseSharedBuffer} specialization for Netty {@link ByteBuf}s. Incoming buffers
     * are forwarded to subscribed consumers and/or accumulated in a
     * {@link CompositeByteBuf}. Subscription and reservation run inline only when the
     * caller is on the event loop and no {@link #add(ByteBuf)} call is in progress;
     * otherwise they are scheduled on the event loop.
     */
    public static final class SharedBuffer
    extends BaseSharedBuffer<ByteBufConsumer, ByteBuf>
    implements ByteBufConsumer {
        /** Lazily created leak detector shared by all instances. */
        private static final Supplier<ResourceLeakDetector<SharedBuffer>> LEAK_DETECTOR = SupplierUtil.memoized(() -> ResourceLeakDetectorFactory.instance().newResourceLeakDetector(SharedBuffer.class));
        /** Leak tracker for this instance; every use below is guarded by a null check. */
        @Nullable
        private final ResourceLeakTracker<SharedBuffer> tracker = LEAK_DETECTOR.get().track(this);
        /** Event loop on which deferred reserve/subscribe operations are executed. */
        private final EventLoop eventLoop;
        /** Accumulated data, created lazily by {@link #addBuffer()}; {@code null} when nothing is buffered. */
        private CompositeByteBuf buffer;
        // NOTE(review): not read or written anywhere in this class — candidate for removal.
        private List<@NonNull DelayedExecutionFlow<ByteBuf>> fullSubscribers;
        /** Buffer currently being processed by {@link #add(ByteBuf)}; {@code null} outside that call. */
        private ByteBuf addingBuffer;

        /**
         * @param loop         event loop used for deferred operations
         * @param limits       size limits passed to the base class
         * @param rootUpstream root upstream passed to the base class
         */
        public SharedBuffer(EventLoop loop, BodySizeLimits limits, BufferConsumer.Upstream rootUpstream) {
            super(limits, rootUpstream);
            this.eventLoop = loop;
        }

        /**
         * Sets the expected length from the {@code Content-Length} header value.
         *
         * @param headers headers to read the value from
         */
        public void setExpectedLengthFrom(HttpHeaders headers) {
            this.setExpectedLengthFrom(headers.get(HttpHeaderNames.CONTENT_LENGTH));
        }

        /**
         * Performs {@link #reserve0()} inline when called on the event loop outside of
         * {@link #add(ByteBuf)}; otherwise schedules it on the event loop.
         *
         * @return {@code true} if the reservation was scheduled rather than run inline
         */
        boolean reserve() {
            if (this.eventLoop.inEventLoop() && this.addingBuffer == null) {
                this.reserve0();
                return false;
            }
            this.eventLoop.execute(this::reserve0);
            return true;
        }

        /** Delegates to the base implementation, then records an access on the leak tracker if one exists. */
        @Override
        protected void reserve0() {
            super.reserve0();
            if (this.tracker != null) {
                this.tracker.record();
            }
        }

        /**
         * Subscribes a consumer through {@code subscribe0}. This runs inline when
         * {@code forceDelay} is {@code false}, the caller is on the event loop and no
         * {@link #add(ByteBuf)} is in progress; otherwise it is scheduled on the event loop.
         *
         * @param subscriber       consumer to subscribe, may be {@code null}
         * @param specificUpstream upstream handle associated with the subscription
         * @param forceDelay       {@code true} to always schedule on the event loop
         */
        void subscribe(@Nullable ByteBufConsumer subscriber, BufferConsumer.Upstream specificUpstream, boolean forceDelay) {
            if (!forceDelay && this.eventLoop.inEventLoop() && this.addingBuffer == null) {
                this.subscribe0(subscriber, specificUpstream);
            } else {
                this.eventLoop.execute(() -> this.subscribe0(subscriber, specificUpstream));
            }
        }

        /**
         * Hands the data buffered so far to a new subscriber.
         *
         * @param subscriber the new subscriber, or {@code null} if there is none
         * @param last       whether the buffered data is given up by this call; when
         *                   {@code true} the {@link #buffer} field is cleared
         */
        @Override
        protected void forwardInitialBuffer(@Nullable ByteBufConsumer subscriber, boolean last) {
            if (subscriber != null) {
                if (this.buffer != null) {
                    if (last) {
                        // Non-retained slice: the subscriber receives the existing reference
                        // and this class drops its own pointer to the buffer.
                        subscriber.add(this.buffer.slice());
                        this.buffer = null;
                    } else {
                        // The buffer is kept, so the subscriber gets its own reference.
                        subscriber.add(this.buffer.retainedSlice());
                    }
                }
            } else if (this.buffer != null && last) {
                // No subscriber to hand the data to, so release it.
                this.buffer.release();
                this.buffer = null;
            }
        }

        /**
         * Updates the leak tracker after a subscription: closes it when {@code last} is
         * {@code true}, otherwise records an access.
         *
         * @param last value forwarded by the base class
         */
        @Override
        protected void afterSubscribe(boolean last) {
            if (this.tracker != null) {
                if (last) {
                    this.tracker.close(this);
                } else {
                    this.tracker.record();
                }
            }
        }

        /**
         * Produces the buffer handed to a full-body subscriber.
         *
         * @param last when {@code true} the buffer itself is returned and the field is
         *             cleared; otherwise a retained slice is returned
         * @return the buffered data, or {@link Unpooled#EMPTY_BUFFER} if nothing is buffered
         */
        @Override
        protected ByteBuf subscribeFullResult(boolean last) {
            if (this.buffer == null) {
                return Unpooled.EMPTY_BUFFER;
            }
            if (last) {
                CompositeByteBuf buf = this.buffer;
                this.buffer = null;
                return buf;
            }
            return this.buffer.retainedSlice();
        }

        /**
         * Registers a full-body subscription through {@code subscribeFull0}. When it can
         * run inline (same conditions as {@link #subscribe}) the flow returned by
         * {@code subscribeFull0} is returned directly. Otherwise the call is scheduled on
         * the event loop and the pre-created delayed flow is returned.
         *
         * @param specificUpstream upstream handle associated with the subscription
         * @param forceDelay       {@code true} to always schedule on the event loop
         * @return flow yielding the full buffer
         */
        ExecutionFlow<ByteBuf> subscribeFull(BufferConsumer.Upstream specificUpstream, boolean forceDelay) {
            DelayedExecutionFlow<ByteBuf> asyncFlow = DelayedExecutionFlow.create();
            if (!forceDelay && this.eventLoop.inEventLoop() && this.addingBuffer == null) {
                return this.subscribeFull0(asyncFlow, specificUpstream, true);
            }
            this.eventLoop.execute(() -> {
                ExecutionFlow<ByteBuf> res = this.subscribeFull0(asyncFlow, specificUpstream, false);
                // The caller already holds asyncFlow, so the deferred call must complete that same flow.
                assert res == asyncFlow;
            });
            return asyncFlow;
        }

        /**
         * Accepts an incoming buffer. The buffer is exposed through {@link #addingBuffer}
         * while the base class {@code add(int)} dispatches to {@link #addForward},
         * {@link #addBuffer()} or {@link #addDoNotBuffer()}.
         *
         * @param buf the incoming buffer
         */
        @Override
        public void add(ByteBuf buf) {
            this.addingBuffer = buf.touch();
            try {
                this.add(buf.readableBytes());
            } finally {
                // Always clear the marker: a stale non-null value would make every later
                // reserve/subscribe call take the deferred path and keep a dangling reference.
                this.addingBuffer = null;
            }
        }

        /**
         * Gives each consumer its own retained slice of the buffer being added.
         *
         * @param consumers consumers to forward to
         */
        @Override
        protected void addForward(List<ByteBufConsumer> consumers) {
            for (ByteBufConsumer consumer : consumers) {
                consumer.add(this.addingBuffer.retainedSlice());
            }
        }

        /**
         * Appends the buffer being added to the composite {@link #buffer}, creating the
         * composite from the incoming buffer's allocator on first use.
         */
        @Override
        protected void addBuffer() {
            if (this.buffer == null) {
                this.buffer = this.addingBuffer.alloc().compositeBuffer();
            }
            // 'true' advances the composite's writer index by the size of the component.
            this.buffer.addComponent(true, this.addingBuffer);
        }

        /** Releases the buffer being added instead of storing it. */
        @Override
        protected void addDoNotBuffer() {
            this.addingBuffer.release();
        }

        /** Releases and clears the accumulated data, if any. */
        @Override
        protected void discardBuffer() {
            if (this.buffer != null) {
                this.buffer.release();
                this.buffer = null;
            }
        }
    }

    /**
     * {@link ByteBufConsumer} that feeds a {@link BaseSharedBuffer.AsFlux}; used by
     * {@link #toByteBufPublisher()}.
     */
    private static final class AsFlux
    extends BaseSharedBuffer.AsFlux<ByteBuf>
    implements ByteBufConsumer {
        public AsFlux(BaseSharedBuffer<?, ?> sharedBuffer) {
            super(sharedBuffer);
        }

        /**
         * Offers the buffer to {@code add0} and releases it when {@code add0} returns
         * {@code false}.
         *
         * @param buf the incoming buffer
         */
        @Override
        public void add(ByteBuf buf) {
            if (!this.add0(buf)) {
                buf.release();
            }
        }

        /**
         * @param buf buffer to measure
         * @return the number of readable bytes in the buffer
         */
        @Override
        protected int size(ByteBuf buf) {
            return buf.readableBytes();
        }
    }
}

