/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.http.body;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.execution.DelayedExecutionFlow;
import io.micronaut.core.execution.ExecutionFlow;
import io.micronaut.core.io.buffer.ByteArrayBufferFactory;
import io.micronaut.http.body.ByteBody;
import io.micronaut.http.body.CloseableAvailableByteBody;
import io.micronaut.http.body.CloseableByteBody;
import io.micronaut.http.body.InternalByteBody;
import io.micronaut.http.body.stream.AvailableByteArrayBody;
import io.micronaut.http.body.stream.BaseSharedBuffer;
import io.micronaut.http.body.stream.BodySizeLimits;
import io.micronaut.http.body.stream.BufferConsumer;
import io.micronaut.http.body.stream.PublisherAsBlocking;
import io.micronaut.http.body.stream.UpstreamBalancer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

@Internal
public final class ReactiveByteBufferByteBody
implements CloseableByteBody,
InternalByteBody {
    private final SharedBuffer sharedBuffer;
    private BufferConsumer.Upstream upstream;

    public ReactiveByteBufferByteBody(SharedBuffer sharedBuffer) {
        this(sharedBuffer, sharedBuffer.getRootUpstream());
    }

    private ReactiveByteBufferByteBody(SharedBuffer sharedBuffer, BufferConsumer.Upstream upstream) {
        this.sharedBuffer = sharedBuffer;
        this.upstream = upstream;
    }

    BufferConsumer.Upstream primary(ByteBufferConsumer primary) {
        BufferConsumer.Upstream upstream = this.upstream;
        if (upstream == null) {
            BaseSharedBuffer.failClaim();
        }
        this.upstream = null;
        BaseSharedBuffer.logClaim();
        this.sharedBuffer.subscribe(primary, upstream);
        return upstream;
    }

    @Override
    @NonNull
    public CloseableByteBody split(@NonNull ByteBody.SplitBackpressureMode backpressureMode) {
        BufferConsumer.Upstream upstream = this.upstream;
        if (upstream == null) {
            BaseSharedBuffer.failClaim();
        }
        UpstreamBalancer.UpstreamPair pair = UpstreamBalancer.balancer(upstream, backpressureMode);
        this.upstream = pair.left();
        this.sharedBuffer.reserve();
        return new ReactiveByteBufferByteBody(this.sharedBuffer, pair.right());
    }

    @Override
    @NonNull
    public OptionalLong expectedLength() {
        return this.sharedBuffer.getExpectedLength();
    }

    private Flux<ByteBuffer> toNioBufferPublisher() {
        AsFlux asFlux = new AsFlux(this.sharedBuffer);
        BufferConsumer.Upstream upstream = this.primary(asFlux);
        return asFlux.asFlux(upstream);
    }

    @Override
    @NonNull
    public InputStream toInputStream() {
        final PublisherAsBlocking publisherAsBlocking = new PublisherAsBlocking();
        this.toNioBufferPublisher().subscribe(publisherAsBlocking);
        return new InputStream(){
            private ByteBuffer buffer;

            @Override
            public int read() throws IOException {
                byte[] arr = new byte[1];
                int n = this.read(arr);
                return n == -1 ? -1 : arr[0] & 0xFF;
            }

            @Override
            public int read(byte @NonNull [] b, int off, int len) throws IOException {
                while (this.buffer == null) {
                    try {
                        ByteBuffer o = (ByteBuffer)publisherAsBlocking.take();
                        if (o == null) {
                            Throwable failure = publisherAsBlocking.getFailure();
                            if (failure == null) {
                                return -1;
                            }
                            throw new IOException(failure);
                        }
                        if (!o.hasRemaining()) continue;
                        this.buffer = o;
                    }
                    catch (InterruptedException e) {
                        throw new InterruptedIOException();
                    }
                }
                int toRead = Math.min(len, this.buffer.remaining());
                this.buffer.get(b, off, toRead);
                if (this.buffer.remaining() == 0) {
                    this.buffer = null;
                }
                return toRead;
            }

            @Override
            public void close() {
                publisherAsBlocking.close();
            }
        };
    }

    @NonNull
    public Flux<byte[]> toByteArrayPublisher() {
        return this.toNioBufferPublisher().map(ReactiveByteBufferByteBody::toByteArray);
    }

    private static byte @NonNull [] toByteArray(ByteBuffer bb) {
        byte[] bytes = new byte[bb.remaining()];
        bb.get(bytes);
        return bytes;
    }

    @Override
    @NonNull
    public Publisher<io.micronaut.core.io.buffer.ByteBuffer<?>> toByteBufferPublisher() {
        return ((Flux)this.toByteArrayPublisher()).map(ByteArrayBufferFactory.INSTANCE::wrap);
    }

    @Override
    @NonNull
    public CloseableByteBody move() {
        BufferConsumer.Upstream upstream = this.upstream;
        if (upstream == null) {
            BaseSharedBuffer.failClaim();
        }
        this.upstream = null;
        return new ReactiveByteBufferByteBody(this.sharedBuffer, upstream);
    }

    @Override
    @NonNull
    public ExecutionFlow<? extends CloseableAvailableByteBody> bufferFlow() {
        BufferConsumer.Upstream upstream = this.upstream;
        if (upstream == null) {
            BaseSharedBuffer.failClaim();
        }
        this.upstream = null;
        BaseSharedBuffer.logClaim();
        upstream.start();
        upstream.onBytesConsumed(Long.MAX_VALUE);
        return this.sharedBuffer.subscribeFull(upstream).map(bb -> AvailableByteArrayBody.create(ByteArrayBufferFactory.INSTANCE, ReactiveByteBufferByteBody.toByteArray(bb)));
    }

    @Override
    public void close() {
        BufferConsumer.Upstream upstream = this.upstream;
        if (upstream == null) {
            return;
        }
        this.upstream = null;
        BaseSharedBuffer.logClaim();
        upstream.allowDiscard();
        upstream.disregardBackpressure();
        upstream.start();
        this.sharedBuffer.subscribe(null, upstream);
    }

    @Override
    @NonNull
    public CloseableByteBody allowDiscard() {
        BufferConsumer.Upstream upstream = this.upstream;
        if (upstream == null) {
            BaseSharedBuffer.failClaim();
        }
        upstream.allowDiscard();
        return this;
    }

    public static final class SharedBuffer
    extends BaseSharedBuffer<ByteBufferConsumer, ByteBuffer>
    implements ByteBufferConsumer {
        private final ReentrantLock lock = new ReentrantLock();
        private final ConcurrentLinkedQueue<Runnable> workQueue = new ConcurrentLinkedQueue();
        private SnapshotByteArrayOutputStream buffer;
        private ByteBuffer adding;

        public SharedBuffer(BodySizeLimits limits, BufferConsumer.Upstream rootUpstream) {
            super(limits, rootUpstream);
        }

        private void submit(Runnable task) {
            this.workQueue.add(task);
            while (!this.workQueue.isEmpty() && this.lock.tryLock()) {
                try {
                    Runnable todo = this.workQueue.poll();
                    if (todo == null) continue;
                    todo.run();
                }
                finally {
                    this.lock.unlock();
                }
            }
        }

        void reserve() {
            this.submit(() -> this.reserve0());
        }

        void subscribe(@Nullable ByteBufferConsumer consumer, BufferConsumer.Upstream upstream) {
            this.submit(() -> this.subscribe0(consumer, upstream));
        }

        public DelayedExecutionFlow<ByteBuffer> subscribeFull(BufferConsumer.Upstream specificUpstream) {
            DelayedExecutionFlow<ByteBuffer> flow = DelayedExecutionFlow.create();
            this.submit(() -> this.subscribeFull0(flow, specificUpstream, false));
            return flow;
        }

        @Override
        protected void forwardInitialBuffer(@Nullable ByteBufferConsumer subscriber, boolean last) {
            if (this.buffer != null) {
                if (subscriber != null) {
                    subscriber.add(this.buffer.snapshot());
                }
                if (last) {
                    this.buffer = null;
                }
            }
        }

        @Override
        protected ByteBuffer subscribeFullResult(boolean last) {
            if (this.buffer == null) {
                return ByteBuffer.allocate(0);
            }
            ByteBuffer snapshot = this.buffer.snapshot();
            if (last) {
                this.buffer = null;
            }
            return snapshot;
        }

        @Override
        protected void addForward(List<ByteBufferConsumer> consumers) {
            for (ByteBufferConsumer consumer : consumers) {
                consumer.add(this.adding.asReadOnlyBuffer());
            }
        }

        @Override
        protected void addBuffer() {
            if (this.buffer == null) {
                this.buffer = new SnapshotByteArrayOutputStream();
            }
            this.buffer.write(this.adding);
        }

        @Override
        protected void discardBuffer() {
            this.buffer = null;
        }

        @Override
        public void add(ByteBuffer buffer) {
            this.submit(() -> {
                this.adding = buffer;
                this.add(buffer.remaining());
                this.adding = null;
            });
        }

        @Override
        public void error(Throwable e) {
            this.submit(() -> super.error(e));
        }

        @Override
        public void complete() {
            this.submit(() -> super.complete());
        }
    }

    static interface ByteBufferConsumer
    extends BufferConsumer {
        public void add(@NonNull ByteBuffer var1);
    }

    private static final class AsFlux
    extends BaseSharedBuffer.AsFlux<ByteBuffer>
    implements ByteBufferConsumer {
        AsFlux(BaseSharedBuffer<?, ?> sharedBuffer) {
            super(sharedBuffer);
        }

        @Override
        protected int size(ByteBuffer buf) {
            return buf.remaining();
        }

        @Override
        public void add(ByteBuffer buffer) {
            this.add0(buffer);
        }
    }

    private static enum WorkState {
        CLEAN,
        WORKING_THEN_CLEAN,
        WORKING_THEN_DIRTY;

    }

    private static final class SnapshotByteArrayOutputStream
    extends ByteArrayOutputStream {
        private SnapshotByteArrayOutputStream() {
        }

        public ByteBuffer snapshot() {
            return ByteBuffer.wrap(this.buf, 0, this.count).asReadOnlyBuffer();
        }

        public void write(ByteBuffer buffer) {
            if (buffer.hasArray()) {
                this.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
            } else {
                byte[] b = new byte[buffer.remaining()];
                buffer.get(buffer.position(), b);
                this.write(b, 0, b.length);
            }
        }
    }
}

