/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.http.server.netty.body;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.http.server.netty.FormRouteCompleter;
import io.micronaut.http.server.netty.body.ManagedBody;
import io.micronaut.http.server.netty.body.MultiObjectBody;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufHolder;
import io.netty.util.ReferenceCountUtil;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;

@Internal
public final class StreamingMultiObjectBody
extends ManagedBody<Publisher<?>>
implements MultiObjectBody {
    StreamingMultiObjectBody(Publisher<?> publisher) {
        super(publisher);
    }

    @Override
    void release(Publisher<?> value) {
    }

    @Override
    public InputStream coerceToInputStream(ByteBufAllocator alloc) {
        PublisherAsBlocking publisherAsBlocking = new PublisherAsBlocking();
        ((Publisher)this.claim()).subscribe(publisherAsBlocking);
        return new PublisherAsStream(publisherAsBlocking);
    }

    @Override
    public Publisher<?> asPublisher() {
        return (Publisher)this.claim();
    }

    @Override
    public MultiObjectBody mapNotNull(Function<Object, Object> transform) {
        return this.next(new StreamingMultiObjectBody((Publisher<?>)Flux.from((Publisher)this.prepareClaim()).mapNotNull(transform)));
    }

    @Override
    public void handleForm(FormRouteCompleter formRouteCompleter) {
        ((Publisher)this.prepareClaim()).subscribe(formRouteCompleter);
        this.next(formRouteCompleter);
    }

    private static final class PublisherAsBlocking
    implements Subscriber<Object>,
    Closeable {
        private final Lock lock = new ReentrantLock();
        private final Condition newDataCondition = this.lock.newCondition();
        private boolean pendingDemand;
        private Object swap;
        private Subscription subscription;
        private boolean done;
        private boolean closed;
        private Throwable failure;

        private PublisherAsBlocking() {
        }

        @Override
        public void onSubscribe(Subscription s) {
            boolean pendingDemand;
            this.lock.lock();
            try {
                this.subscription = s;
                pendingDemand = this.pendingDemand;
            }
            finally {
                this.lock.unlock();
            }
            if (pendingDemand) {
                s.request(1L);
            }
        }

        @Override
        public void onNext(Object o) {
            this.lock.lock();
            try {
                if (this.closed) {
                    ReferenceCountUtil.release(o);
                    return;
                }
                this.swap = o;
                this.newDataCondition.signalAll();
            }
            finally {
                this.lock.unlock();
            }
        }

        @Override
        public void onError(Throwable t) {
            this.lock.lock();
            try {
                if (this.swap != null) {
                    ReferenceCountUtil.release(this.swap);
                    this.swap = null;
                }
                this.failure = t;
                this.done = true;
                this.newDataCondition.signalAll();
            }
            finally {
                this.lock.unlock();
            }
        }

        @Override
        public void onComplete() {
            this.lock.lock();
            try {
                this.done = true;
                this.newDataCondition.signalAll();
            }
            finally {
                this.lock.unlock();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Nullable
        public Object take() throws InterruptedException {
            boolean demanded = false;
            while (true) {
                Subscription subscription;
                this.lock.lock();
                try {
                    Object swap = this.swap;
                    if (swap != null) {
                        this.swap = null;
                        Object object = swap;
                        return object;
                    }
                    if (this.done) {
                        Object var4_5 = null;
                        return var4_5;
                    }
                    if (demanded) {
                        this.newDataCondition.await();
                    }
                    if ((subscription = this.subscription) == null) {
                        this.pendingDemand = true;
                    }
                }
                finally {
                    this.lock.unlock();
                }
                if (demanded) continue;
                demanded = true;
                if (subscription == null) continue;
                subscription.request(1L);
            }
        }

        @Override
        public void close() {
            this.lock.lock();
            try {
                this.closed = true;
                if (this.swap != null) {
                    ReferenceCountUtil.release(this.swap);
                    this.swap = null;
                }
            }
            finally {
                this.lock.unlock();
            }
        }
    }

    private static final class PublisherAsStream
    extends InputStream {
        private final PublisherAsBlocking publisherAsBlocking;
        private ByteBuf buffer;

        private PublisherAsStream(PublisherAsBlocking publisherAsBlocking) {
            this.publisherAsBlocking = publisherAsBlocking;
        }

        @Override
        public int read() throws IOException {
            byte[] arr = new byte[1];
            int n = this.read(arr);
            return n == -1 ? -1 : arr[0] & 0xFF;
        }

        @Override
        public int read(@NonNull byte[] b, int off, int len) throws IOException {
            while (this.buffer == null) {
                try {
                    ByteBuf buf;
                    Object o = this.publisherAsBlocking.take();
                    if (o == null) {
                        if (this.publisherAsBlocking.failure == null) {
                            return -1;
                        }
                        throw new IOException(this.publisherAsBlocking.failure);
                    }
                    if (o instanceof ByteBufHolder) {
                        ByteBufHolder holder = (ByteBufHolder)o;
                        v0 = holder.content();
                    } else {
                        v0 = buf = (ByteBuf)o;
                    }
                    if (!buf.isReadable()) continue;
                    this.buffer = buf;
                }
                catch (InterruptedException e) {
                    throw new InterruptedIOException();
                }
            }
            int toRead = Math.min(len, this.buffer.readableBytes());
            this.buffer.readBytes(b, off, toRead);
            if (!this.buffer.isReadable()) {
                this.buffer.release();
                this.buffer = null;
            }
            return toRead;
        }

        @Override
        public void close() throws IOException {
            if (this.buffer != null) {
                this.buffer.release();
                this.buffer = null;
            }
            this.publisherAsBlocking.close();
        }
    }
}

