/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.http.netty.body;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.http.body.AbstractBodyAdapter;
import io.micronaut.http.body.AvailableByteBody;
import io.micronaut.http.body.ByteBody;
import io.micronaut.http.body.stream.BodySizeLimits;
import io.micronaut.http.netty.EventLoopFlow;
import io.micronaut.http.netty.body.AvailableNettyByteBody;
import io.micronaut.http.netty.body.NettyByteBody;
import io.micronaut.http.netty.body.StreamingNettyByteBody;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.http.HttpHeaders;
import org.reactivestreams.Publisher;

@Internal
public final class NettyBodyAdapter
extends AbstractBodyAdapter<ByteBuf, StreamingNettyByteBody.SharedBuffer> {
    private final EventLoopFlow eventLoopFlow;

    private NettyBodyAdapter(EventLoop eventLoop, Publisher<ByteBuf> source, @Nullable Runnable onDiscard) {
        super(source, onDiscard);
        this.eventLoopFlow = new EventLoopFlow(eventLoop);
    }

    @NonNull
    public static NettyByteBody adapt(@NonNull ByteBody body, @NonNull EventLoop eventLoop) {
        if (body instanceof NettyByteBody) {
            NettyByteBody nbb = (NettyByteBody)body;
            return nbb;
        }
        if (body instanceof AvailableByteBody) {
            AvailableByteBody available = (AvailableByteBody)body;
            return new AvailableNettyByteBody(Unpooled.wrappedBuffer(available.toByteArray()));
        }
        NettyBodyAdapter adapter = new NettyBodyAdapter(eventLoop, NettyByteBody.toByteBufs(body), null);
        adapter.sharedBuffer = new StreamingNettyByteBody.SharedBuffer(eventLoop, BodySizeLimits.UNLIMITED, adapter);
        body.expectedLength().ifPresent(((StreamingNettyByteBody.SharedBuffer)adapter.sharedBuffer)::setExpectedLength);
        return new StreamingNettyByteBody((StreamingNettyByteBody.SharedBuffer)adapter.sharedBuffer);
    }

    public static StreamingNettyByteBody adapt(Publisher<ByteBuf> publisher, EventLoop eventLoop) {
        return NettyBodyAdapter.adapt(publisher, eventLoop, null, null);
    }

    public static StreamingNettyByteBody adapt(Publisher<ByteBuf> publisher, EventLoop eventLoop, @Nullable HttpHeaders headersForLength, @Nullable Runnable onDiscard) {
        NettyBodyAdapter adapter = new NettyBodyAdapter(eventLoop, publisher, onDiscard);
        adapter.sharedBuffer = new StreamingNettyByteBody.SharedBuffer(eventLoop, BodySizeLimits.UNLIMITED, adapter);
        if (headersForLength != null) {
            ((StreamingNettyByteBody.SharedBuffer)adapter.sharedBuffer).setExpectedLengthFrom(headersForLength);
        }
        return new StreamingNettyByteBody((StreamingNettyByteBody.SharedBuffer)adapter.sharedBuffer);
    }

    @Override
    public void onNext(ByteBuf bytes) {
        if (this.eventLoopFlow.executeNow(() -> this.onNext0(bytes))) {
            this.onNext0(bytes);
        }
    }

    private void onNext0(ByteBuf bytes) {
        long newDemand = this.demand.addAndGet(-bytes.readableBytes());
        ((StreamingNettyByteBody.SharedBuffer)this.sharedBuffer).add(bytes);
        if (newDemand > 0L) {
            this.subscription.request(1L);
        }
    }

    @Override
    public void onError(Throwable t2) {
        if (this.eventLoopFlow.executeNow(() -> super.onError(t2))) {
            super.onError(t2);
        }
    }

    @Override
    public void onComplete() {
        if (this.eventLoopFlow.executeNow(() -> super.onComplete())) {
            super.onComplete();
        }
    }
}

