/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.http.server.netty.binders;

import io.micronaut.context.BeanLocator;
import io.micronaut.context.Qualifier;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.async.subscriber.TypedSubscriber;
import io.micronaut.core.bind.ArgumentBinder;
import io.micronaut.core.convert.ArgumentConversionContext;
import io.micronaut.core.convert.ConversionError;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.convert.exceptions.ConversionErrorException;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.bind.binders.DefaultBodyAnnotationBinder;
import io.micronaut.http.bind.binders.NonBlockingBodyArgumentBinder;
import io.micronaut.http.netty.stream.StreamedHttpRequest;
import io.micronaut.http.server.HttpServerConfiguration;
import io.micronaut.http.server.netty.DefaultHttpContentProcessor;
import io.micronaut.http.server.netty.HttpContentProcessor;
import io.micronaut.http.server.netty.HttpContentSubscriberFactory;
import io.micronaut.http.server.netty.NettyHttpRequest;
import io.micronaut.http.server.netty.NettyHttpServer;
import io.micronaut.web.router.exceptions.UnsatisfiedRouteException;
import io.micronaut.web.router.qualifier.ConsumesMediaTypeQualifier;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.EmptyByteBuf;
import io.netty.util.ReferenceCounted;
import java.util.Optional;
import javax.inject.Singleton;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
@Internal
public class PublisherBodyBinder
extends DefaultBodyAnnotationBinder<Publisher>
implements NonBlockingBodyArgumentBinder<Publisher> {
    private static final Logger LOG = LoggerFactory.getLogger(NettyHttpServer.class);
    private final BeanLocator beanLocator;
    private final HttpServerConfiguration httpServerConfiguration;

    public PublisherBodyBinder(ConversionService conversionService, BeanLocator beanLocator, HttpServerConfiguration httpServerConfiguration) {
        super(conversionService);
        this.beanLocator = beanLocator;
        this.httpServerConfiguration = httpServerConfiguration;
    }

    public Argument<Publisher> argumentType() {
        return Argument.of(Publisher.class);
    }

    public ArgumentBinder.BindingResult<Publisher> bind(final ArgumentConversionContext<Publisher> context, HttpRequest<?> source) {
        NettyHttpRequest nettyHttpRequest;
        io.netty.handler.codec.http.HttpRequest nativeRequest;
        if (source instanceof NettyHttpRequest && (nativeRequest = (nettyHttpRequest = (NettyHttpRequest)source).getNativeRequest()) instanceof StreamedHttpRequest) {
            Optional contentType = source.getContentType();
            final Argument targetType = context.getFirstTypeVariable().orElse(Argument.OBJECT_ARGUMENT);
            HttpContentProcessor processor = contentType.flatMap(type -> this.beanLocator.findBean(HttpContentSubscriberFactory.class, (Qualifier)new ConsumesMediaTypeQualifier(type))).map(factory -> factory.build(nettyHttpRequest)).orElse(new DefaultHttpContentProcessor(nettyHttpRequest, this.httpServerConfiguration));
            return () -> Optional.of(subscriber -> processor.subscribe((Subscriber)new TypedSubscriber<Object>(context.getArgument()){
                Subscription s;

                protected void doOnSubscribe(Subscription subscription) {
                    this.s = subscription;
                    subscriber.onSubscribe(subscription);
                }

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                protected void doOnNext(Object message) {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Server received streaming message for argument [{}]: {}", (Object)context.getArgument(), message);
                    }
                    ArgumentConversionContext conversionContext = context.with(targetType);
                    if (message instanceof ByteBufHolder && (message = ((ByteBufHolder)message).content()) instanceof EmptyByteBuf) {
                        return;
                    }
                    Optional converted = PublisherBodyBinder.this.conversionService.convert(message, conversionContext);
                    if (converted.isPresent()) {
                        subscriber.onNext(converted.get());
                    } else {
                        try {
                            Optional lastError = conversionContext.getLastError();
                            if (lastError.isPresent()) {
                                if (LOG.isDebugEnabled()) {
                                    LOG.debug("Cannot convert message for argument [" + context.getArgument() + "] and value: " + message, lastError.get());
                                }
                                subscriber.onError((Throwable)new ConversionErrorException(context.getArgument(), (ConversionError)lastError.get()));
                            } else {
                                if (LOG.isDebugEnabled()) {
                                    LOG.debug("Cannot convert message for argument [{}] and value: {}", (Object)context.getArgument(), message);
                                }
                                subscriber.onError((Throwable)new UnsatisfiedRouteException(context.getArgument()));
                            }
                        }
                        finally {
                            this.s.cancel();
                        }
                    }
                    if (message instanceof ReferenceCounted) {
                        ((ReferenceCounted)message).release();
                    }
                }

                protected void doOnError(Throwable t) {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Server received error for argument [" + context.getArgument() + "]: " + t.getMessage(), t);
                    }
                    try {
                        subscriber.onError(t);
                    }
                    finally {
                        this.s.cancel();
                    }
                }

                protected void doOnComplete() {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Done receiving messages for argument: {}", (Object)context.getArgument());
                    }
                    subscriber.onComplete();
                }
            }));
        }
        return ArgumentBinder.BindingResult.EMPTY;
    }
}

