/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.http.server.netty.multipart;

import io.micronaut.context.BeanLocator;
import io.micronaut.context.BeanProvider;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.async.subscriber.TypedSubscriber;
import io.micronaut.core.bind.ArgumentBinder;
import io.micronaut.core.convert.ArgumentConversionContext;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MediaType;
import io.micronaut.http.bind.binders.NonBlockingBodyArgumentBinder;
import io.micronaut.http.netty.stream.StreamedHttpRequest;
import io.micronaut.http.server.HttpServerConfiguration;
import io.micronaut.http.server.multipart.MultipartBody;
import io.micronaut.http.server.netty.DefaultHttpContentProcessor;
import io.micronaut.http.server.netty.HttpContentProcessor;
import io.micronaut.http.server.netty.HttpContentSubscriberFactory;
import io.micronaut.http.server.netty.NettyHttpRequest;
import io.micronaut.http.server.netty.NettyHttpServer;
import io.micronaut.http.server.netty.multipart.NettyCompletedAttribute;
import io.micronaut.http.server.netty.multipart.NettyCompletedFileUpload;
import io.micronaut.web.router.qualifier.ConsumesMediaTypeQualifier;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.EmptyByteBuf;
import io.netty.handler.codec.http.multipart.Attribute;
import io.netty.handler.codec.http.multipart.FileUpload;
import io.netty.handler.codec.http.multipart.HttpData;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Provider;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class MultipartBodyArgumentBinder
implements NonBlockingBodyArgumentBinder<MultipartBody> {
    private static final Logger LOG = LoggerFactory.getLogger(NettyHttpServer.class);
    private final BeanLocator beanLocator;
    private final BeanProvider<HttpServerConfiguration> httpServerConfiguration;

    @Deprecated
    public MultipartBodyArgumentBinder(BeanLocator beanLocator, Provider<HttpServerConfiguration> httpServerConfiguration) {
        this.beanLocator = beanLocator;
        this.httpServerConfiguration = httpServerConfiguration::get;
    }

    public MultipartBodyArgumentBinder(BeanLocator beanLocator, BeanProvider<HttpServerConfiguration> httpServerConfiguration) {
        this.beanLocator = beanLocator;
        this.httpServerConfiguration = httpServerConfiguration;
    }

    @Override
    @Deprecated
    public boolean supportsSuperTypes() {
        return false;
    }

    @Override
    public Argument<MultipartBody> argumentType() {
        return Argument.of(MultipartBody.class);
    }

    @Override
    public ArgumentBinder.BindingResult<MultipartBody> bind(final ArgumentConversionContext<MultipartBody> context, HttpRequest<?> source) {
        NettyHttpRequest nettyHttpRequest;
        io.netty.handler.codec.http.HttpRequest nativeRequest;
        if (source instanceof NettyHttpRequest && (nativeRequest = (nettyHttpRequest = (NettyHttpRequest)source).getNativeRequest()) instanceof StreamedHttpRequest) {
            HttpContentProcessor processor = this.beanLocator.findBean(HttpContentSubscriberFactory.class, new ConsumesMediaTypeQualifier(MediaType.MULTIPART_FORM_DATA_TYPE)).map(factory -> factory.build(nettyHttpRequest)).orElse(new DefaultHttpContentProcessor(nettyHttpRequest, this.httpServerConfiguration.get()));
            return () -> Optional.of(subscriber -> processor.subscribe(new TypedSubscriber<Object>(context.getArgument()){
                Subscription s;
                AtomicLong partsRequested;
                {
                    super(x0);
                    this.partsRequested = new AtomicLong(0L);
                }

                @Override
                protected void doOnSubscribe(final Subscription subscription) {
                    this.s = subscription;
                    subscriber.onSubscribe(new Subscription(){

                        @Override
                        public void request(long n) {
                            if (partsRequested.getAndUpdate(prev -> prev + n) == 0L) {
                                s.request(n);
                            }
                        }

                        @Override
                        public void cancel() {
                            subscription.cancel();
                        }
                    });
                }

                @Override
                protected void doOnNext(Object message) {
                    HttpData data;
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Server received streaming message for argument [{}]: {}", (Object)context.getArgument(), message);
                    }
                    if (message instanceof ByteBufHolder && ((ByteBufHolder)message).content() instanceof EmptyByteBuf) {
                        return;
                    }
                    if (message instanceof HttpData && (data = (HttpData)message).isCompleted()) {
                        this.partsRequested.decrementAndGet();
                        if (data instanceof FileUpload) {
                            subscriber.onNext(new NettyCompletedFileUpload((FileUpload)data, false));
                        } else if (data instanceof Attribute) {
                            subscriber.onNext(new NettyCompletedAttribute((Attribute)data, false));
                        }
                        if (data.refCnt() > 0) {
                            data.release();
                        }
                    }
                    if (this.partsRequested.get() > 0L) {
                        this.s.request(1L);
                    }
                }

                @Override
                protected void doOnError(Throwable t) {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Server received error for argument [" + context.getArgument() + "]: " + t.getMessage(), t);
                    }
                    try {
                        subscriber.onError(t);
                    }
                    finally {
                        this.s.cancel();
                    }
                }

                @Override
                protected void doOnComplete() {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Done receiving messages for argument: {}", (Object)context.getArgument());
                    }
                    subscriber.onComplete();
                }
            }));
        }
        return ArgumentBinder.BindingResult.EMPTY;
    }
}

