/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.http.body;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.execution.DelayedExecutionFlow;
import io.micronaut.core.execution.ExecutionFlow;
import io.micronaut.core.io.buffer.ReadBuffer;
import io.micronaut.core.io.buffer.ReadBufferFactory;
import io.micronaut.http.body.ByteBody;
import io.micronaut.http.body.CloseableAvailableByteBody;
import io.micronaut.http.body.CloseableByteBody;
import io.micronaut.http.body.stream.AvailableByteArrayBody;
import io.micronaut.http.body.stream.BaseSharedBuffer;
import io.micronaut.http.body.stream.BaseStreamingByteBody;
import io.micronaut.http.body.stream.BodySizeLimits;
import io.micronaut.http.body.stream.BufferConsumer;
import io.micronaut.http.body.stream.UpstreamBalancer;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Streaming {@link CloseableByteBody} whose data is held by a {@link SharedBuffer}.
 *
 * <p>Each instance owns one {@link BufferConsumer.Upstream} handle. The handle is cleared as soon
 * as the body is claimed ({@link #primary(BufferConsumer)}, {@link #bufferFlow()}) or closed
 * ({@link #close()}); a later claim attempt is routed to {@code failClaim()}.
 */
@Internal
public final class ReactiveByteBufferByteBody
        extends BaseStreamingByteBody<ReactiveByteBufferByteBody.SharedBuffer>
        implements CloseableByteBody {

    /**
     * Creates a body that uses the root upstream of the given shared buffer.
     *
     * @param sharedBuffer the buffer backing this body
     */
    public ReactiveByteBufferByteBody(SharedBuffer sharedBuffer) {
        this(sharedBuffer, sharedBuffer.getRootUpstream());
    }

    /**
     * Creates a body over an existing shared buffer with an explicit upstream handle.
     *
     * @param sharedBuffer the buffer backing this body
     * @param upstream     the upstream handle owned by the new body
     */
    private ReactiveByteBufferByteBody(SharedBuffer sharedBuffer, BufferConsumer.Upstream upstream) {
        super(sharedBuffer, upstream);
    }

    /**
     * Claims this body and attaches the given consumer to the shared buffer.
     *
     * @param primary the consumer to subscribe
     * @return the upstream handle that was owned by this body
     */
    @Override
    public BufferConsumer.Upstream primary(BufferConsumer primary) {
        BufferConsumer.Upstream upstream = this.upstream;
        if (upstream == null) {
            // NOTE(review): failClaim() is presumably expected to throw; confirm in the superclass.
            this.failClaim();
        }
        this.recordPrimaryOp();
        // Clearing the field marks this body as claimed.
        this.upstream = null;
        BaseSharedBuffer.logClaim();
        ((SharedBuffer) this.sharedBuffer).subscribe(primary, upstream);
        return upstream;
    }

    /**
     * Creates another body over the same shared buffer using the supplied upstream.
     *
     * @param upstream the upstream handle for the derived body
     * @return a new body sharing this body's buffer
     */
    protected ReactiveByteBufferByteBody derive(BufferConsumer.Upstream upstream) {
        return new ReactiveByteBufferByteBody((SharedBuffer) this.sharedBuffer, upstream);
    }

    /**
     * Splits this body in two. The current upstream is replaced by the left side of an
     * {@link UpstreamBalancer} pair, and the returned body receives the right side.
     *
     * @param backpressureMode the mode passed to the balancer
     * @return a new body sharing this body's buffer
     */
    @Override
    @NonNull
    public CloseableByteBody split(@NonNull ByteBody.SplitBackpressureMode backpressureMode) {
        BufferConsumer.Upstream upstream = this.upstream;
        if (upstream == null) {
            this.failClaim();
        }
        UpstreamBalancer.UpstreamPair pair = UpstreamBalancer.balancer(upstream, backpressureMode);
        this.upstream = pair.left();
        // Register the additional body with the shared buffer before handing it out.
        ((SharedBuffer) this.sharedBuffer).reserve();
        return new ReactiveByteBufferByteBody((SharedBuffer) this.sharedBuffer, pair.right());
    }

    /**
     * Claims this body and requests the complete content as a single available body.
     *
     * @return a flow that yields the fully buffered body
     */
    @Override
    @NonNull
    public ExecutionFlow<? extends CloseableAvailableByteBody> bufferFlow() {
        BufferConsumer.Upstream upstream = this.upstream;
        if (upstream == null) {
            this.failClaim();
        }
        this.recordPrimaryOp();
        this.upstream = null;
        BaseSharedBuffer.logClaim();
        upstream.start();
        // Signal the largest possible consumption up front; presumably this lifts
        // backpressure for full buffering (confirm against BufferConsumer.Upstream).
        upstream.onBytesConsumed(Long.MAX_VALUE);
        return ((SharedBuffer) this.sharedBuffer).subscribeFull(upstream).map(AvailableByteArrayBody::create);
    }

    /**
     * Releases this body without consuming it. Does nothing if the body has already been
     * claimed or closed.
     */
    @Override
    public void close() {
        BufferConsumer.Upstream upstream = this.upstream;
        if (upstream == null) {
            return;
        }
        this.recordClosed();
        this.upstream = null;
        BaseSharedBuffer.logClaim();
        upstream.allowDiscard();
        upstream.disregardBackpressure();
        upstream.start();
        // Subscribe with a null consumer: this body takes its slot but receives no data.
        ((SharedBuffer) this.sharedBuffer).subscribe(null, upstream);
    }

    /**
     * Shared buffer whose operations are funnelled through a work queue so that they run one
     * at a time under {@link #lock}.
     */
    public static final class SharedBuffer extends BaseSharedBuffer implements BufferConsumer {
        private final ReentrantLock lock = new ReentrantLock();
        private final ConcurrentLinkedQueue<Runnable> workQueue = new ConcurrentLinkedQueue<>();

        SharedBuffer(ReadBufferFactory readBufferFactory, BodySizeLimits limits, BufferConsumer.Upstream rootUpstream) {
            super(readBufferFactory, limits, rootUpstream);
        }

        /**
         * Enqueues a task and then tries to drain the queue.
         *
         * <p>Callers never block: a thread that fails {@code tryLock()} simply returns and
         * leaves its task for the current lock holder. The queue is re-checked after every
         * unlock so that tasks enqueued while the lock was held are still picked up.
         *
         * @param task the operation to run under the lock
         */
        private void submit(Runnable task) {
            this.workQueue.add(task);
            while (!this.workQueue.isEmpty() && this.lock.tryLock()) {
                try {
                    Runnable todo = this.workQueue.poll();
                    // Another thread may have taken the task between isEmpty() and poll().
                    if (todo != null) {
                        todo.run();
                    }
                } finally {
                    this.lock.unlock();
                }
            }
        }

        /** Queues a call to {@code reserve0()}. */
        void reserve() {
            this.submit(() -> this.reserve0());
        }

        /**
         * Queues a call to {@code subscribe0(consumer, upstream)}.
         *
         * @param consumer the consumer to attach, or {@code null}
         * @param upstream the upstream handle associated with the subscription
         */
        void subscribe(@Nullable BufferConsumer consumer, BufferConsumer.Upstream upstream) {
            this.submit(() -> this.subscribe0(consumer, upstream));
        }

        /**
         * Queues a call to {@code subscribeFull0} and returns the flow passed to it.
         *
         * @param specificUpstream the upstream handle associated with the subscription
         * @return the flow handed to {@code subscribeFull0}
         */
        public DelayedExecutionFlow<ReadBuffer> subscribeFull(BufferConsumer.Upstream specificUpstream) {
            DelayedExecutionFlow<ReadBuffer> flow = DelayedExecutionFlow.create();
            this.submit(() -> this.subscribeFull0(flow, specificUpstream, false));
            return flow;
        }

        /** Queues the superclass {@code add} for the given buffer. */
        @Override
        public void add(ReadBuffer rb) {
            this.submit(() -> super.add(rb));
        }

        /** Queues the superclass {@code error} for the given failure. */
        @Override
        public void error(Throwable e) {
            this.submit(() -> super.error(e));
        }

        /** Queues the superclass {@code complete}. */
        @Override
        public void complete() {
            this.submit(() -> super.complete());
        }
    }
}

