/*
 * Decompiled with CFR 0.152.
 */
package org.mule.service.http.netty.impl.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.BiFunction;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.streaming.Cursor;
import org.mule.runtime.api.util.MultiMap;
import org.mule.runtime.api.util.Reference;
import org.mule.runtime.http.api.client.HttpRequestOptions;
import org.mule.runtime.http.api.domain.entity.HttpEntity;
import org.mule.runtime.http.api.domain.entity.multipart.MultipartHttpEntity;
import org.mule.runtime.http.api.domain.message.request.HttpRequest;
import org.mule.runtime.http.api.domain.message.response.HttpResponse;
import org.mule.service.http.netty.impl.client.ChunkedHttpEntityPublisher;
import org.mule.service.http.netty.impl.client.MaxRedirectException;
import org.mule.service.http.netty.impl.message.HttpResponseCreator;
import org.mule.service.http.netty.impl.message.ReactorNettyResponseWrapper;
import org.mule.service.http.netty.impl.server.util.HttpParser;
import org.mule.service.http.netty.impl.util.HttpResponseCreatorUtils;
import org.mule.service.http.netty.impl.util.HttpUtils;
import org.mule.service.http.netty.impl.util.MuleToNettyUtils;
import org.mule.service.http.netty.impl.util.NettyUtils;
import org.mule.service.http.netty.impl.util.ReactorNettyUtils;
import org.mule.service.http.netty.impl.util.RedirectHelper;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.netty.ByteBufFlux;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientResponse;

/**
 * HTTP client that sends Mule {@link HttpRequest}s through a Reactor Netty {@link HttpClient} and reports the outcome
 * through a {@link CompletableFuture} of {@link HttpResponse}.
 *
 * <p>Responses are either streamed into a feedable response or aggregated into a single buffer, depending on the
 * {@code streamingEnabled} flag given at construction time.
 */
public class ReactorNettyClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReactorNettyClient.class);

    /** Maximum number of redirects followed for a single request before it is failed. */
    private static final int MAX_REDIRECTS = 5;

    /** Channel attribute holding the future of the request entity's trailers. */
    static final AttributeKey<CompletableFuture<MultiMap<String, String>>> TRAILERS_FUTURE = AttributeKey.valueOf("TRAILERS_FUTURE");

    /** Channel attribute holding the request entity; only set when the request carries an {@code expect} header. */
    static final AttributeKey<HttpEntity> REQUEST_ENTITY_KEY = AttributeKey.valueOf("REQUEST_ENTITY");

    /** Channel attribute mirroring {@link HttpRequestOptions#shouldSendBodyAlways()}. */
    static final AttributeKey<Boolean> ALWAYS_SEND_BODY_KEY = AttributeKey.valueOf("ALWAYS_SEND_BODY");

    /** Channel attribute mirroring {@code RedirectHelper#shouldChangeMethod()}. */
    static final AttributeKey<Boolean> REDIRECT_CHANGE_METHOD = AttributeKey.valueOf("REDIRECT_CHANGE_METHOD");

    private final String clientName;
    private final HttpClient httpClient;
    private final Executor ioExecutor;
    private final boolean streamingEnabled;

    /**
     * @param clientName       name of this client, used in log messages
     * @param httpClient       the Reactor Netty client every request is derived from
     * @param ioExecutor       executor used to complete the response future in the streaming path
     * @param streamingEnabled whether response bodies are streamed ({@code true}) or aggregated ({@code false})
     */
    public ReactorNettyClient(String clientName, HttpClient httpClient, Executor ioExecutor, boolean streamingEnabled) {
        this.clientName = clientName;
        this.httpClient = httpClient;
        this.ioExecutor = ioExecutor;
        this.streamingEnabled = streamingEnabled;
    }

    /**
     * Builds the reactive pipeline that sends {@code request} and hands the response to {@code responseFunction}.
     *
     * <p>Errors are mapped through {@code ReactorNettyUtils.onErrorMap}, reported to {@code result} and then
     * swallowed, so the returned flux completes normally even when the exchange fails.
     *
     * @param request          the request to send
     * @param options          per-request options (redirects, response timeout, body handling)
     * @param headersToAdd     headers added to the outgoing request
     * @param responseFunction function that consumes the response and its content
     * @param result           future completed exceptionally when the pipeline fails
     * @return the flux produced by {@code responseFunction}; nothing is sent until it is subscribed
     * @throws MuleRuntimeException from the redirect predicate, wrapping a {@link MaxRedirectException}, once
     *                              {@value #MAX_REDIRECTS} redirects have already been followed
     */
    public Flux<ByteBuf> sendAsyncRequest(HttpRequest request, HttpRequestOptions options, HttpHeaders headersToAdd, BiFunction<HttpClientResponse, ByteBufFlux, Publisher<ByteBuf>> responseFunction, CompletableFuture<HttpResponse> result) {
        RedirectHelper redirectHelper = new RedirectHelper(headersToAdd);
        URI uri = uriWithQueryParams(request);
        // MDC of the calling thread; may be null when the MDC is empty.
        Map<String, String> propagatedMdc = MDC.getCopyOfContextMap();
        // MDC found on the connection thread, restored once the exchange completes.
        Reference<Map<String, String>> overriddenMdc = new Reference<>();
        LOGGER.debug("Sending request to {} with headers {}", uri, headersToAdd);

        HttpClient configuredClient = this.httpClient
            .followRedirect((req, res) -> {
                if (req.redirectedFrom().length >= MAX_REDIRECTS) {
                    throw new MuleRuntimeException((Throwable) new MaxRedirectException("Max redirects limit reached."));
                }
                return redirectHelper.isRedirectStatusCode(res.status().code()) && options.isFollowsRedirect();
            }, redirectHelper::addCookiesToRedirectedRequest)
            .doOnRedirect((response, connection) -> {
                redirectHelper.handleRedirectResponse(response);
                boolean shouldPreserveMethod = Boolean.FALSE.equals(redirectHelper.shouldChangeMethod());
                // The body is going to be sent again, so the stream has to be read from the start.
                if (shouldPreserveMethod || options.shouldSendBodyAlways()) {
                    this.rewindStreamContent(request.getEntity());
                }
            })
            .doOnConnected(connection -> {
                overriddenMdc.set(MDC.getCopyOfContextMap());
                setMdcContext(propagatedMdc);
                connection.channel().attr(ALWAYS_SEND_BODY_KEY).set(options.shouldSendBodyAlways());
                connection.channel().attr(REDIRECT_CHANGE_METHOD).set(redirectHelper.shouldChangeMethod());
                connection.channel().attr(AttributeKey.valueOf("removeContentLength")).set(MuleToNettyUtils.calculateShouldRemoveContentLength(request));
                if (request.getEntity() != null && this.isExpect(headersToAdd)) {
                    connection.channel().attr(REQUEST_ENTITY_KEY).set(request.getEntity());
                }
                connection.channel().attr(TRAILERS_FUTURE).set(HttpResponseCreatorUtils.trailersAsFuture(request.getEntity()));
            })
            .responseTimeout(Duration.ofMillis(options.getResponseTimeout()))
            .headers(h -> h.add(headersToAdd));

        HttpClient.RequestSender sender = configuredClient
            .request(HttpMethod.valueOf(request.getMethod()))
            .uri(uri);

        return sender
            .send((httpClientRequest, nettyOutbound) -> {
                if (this.isExpect(httpClientRequest.requestHeaders())) {
                    // With an expect header no body is written here; the entity was stored on the channel instead.
                    return nettyOutbound.send(Mono.<ByteBuf>empty());
                }
                if (request.getEntity() != null) {
                    return nettyOutbound.send(this.entityPublisher(request), buffer -> false);
                }
                // NOTE(review): null (not an empty publisher) is returned when there is no entity -
                // confirm Reactor Netty accepts a null sender result before changing this.
                return null;
            })
            .response(responseFunction)
            .onErrorMap(ReactorNettyUtils::onErrorMap)
            .doOnError(result::completeExceptionally)
            .doOnComplete(() -> setMdcContext(overriddenMdc.get()))
            .onErrorComplete();
    }

    /**
     * Installs {@code contextMap} as the MDC of the current thread, clearing the MDC when the map is {@code null}.
     * {@link MDC#getCopyOfContextMap()} is allowed to return {@code null}, and not every MDC adapter accepts a
     * {@code null} map in {@link MDC#setContextMap(Map)}.
     */
    private static void setMdcContext(Map<String, String> contextMap) {
        if (contextMap == null) {
            MDC.clear();
        } else {
            MDC.setContextMap(contextMap);
        }
    }

    /** @return whether {@code entries} contains an {@code expect} header */
    private boolean isExpect(HttpHeaders entries) {
        return entries.contains("expect");
    }

    /**
     * Resolves the target URI of {@code request}, appending its query parameters when there are any.
     */
    private static URI uriWithQueryParams(HttpRequest request) {
        MultiMap<String, String> queryParams = request.getQueryParams();
        if (queryParams.isEmpty()) {
            return HttpUtils.ensureSchemeAndHost(request.getUri());
        }
        return URI.create(HttpUtils.buildUriString(request.getUri(), queryParams));
    }

    /**
     * Chooses the publisher that emits the body of {@code request}. The request entity must not be {@code null}.
     *
     * @return an empty publisher for a known zero-length entity, a flux fed by the entity callbacks for reactive
     *         entities, and a {@link ChunkedHttpEntityPublisher} otherwise
     */
    private Publisher<? extends ByteBuf> entityPublisher(HttpRequest request) {
        HttpEntity entity = request.getEntity();
        if (entity.getBytesLength().isPresent() && entity.getBytesLength().getAsLong() == 0L) {
            return Mono.empty();
        }
        if (entity.isComposed()) {
            try {
                return new ChunkedHttpEntityPublisher((HttpEntity) HttpParser.fromMultipartEntity(request.getHeaderValue("Content-Type"), (MultipartHttpEntity) entity, ct -> {}, Collections.singletonMap("Content-Transfer-Encoding", "binary")));
            }
            catch (IOException e) {
                // Best effort: keep the previous fallback, but leave a trace of why the multipart conversion was skipped.
                LOGGER.warn("Unable to convert the multipart entity, sending the original entity instead", e);
                return new ChunkedHttpEntityPublisher(entity);
            }
        }
        if (entity.isReactive()) {
            return this.reactiveEntityToByteBufFlux(entity);
        }
        return new ChunkedHttpEntityPublisher(entity);
    }

    /**
     * Consumes the response content, streaming or aggregating it according to this client's configuration.
     *
     * @param response the response metadata
     * @param content  the response body
     * @param result   future completed with the resulting {@link HttpResponse}, or exceptionally on failure
     * @return the publisher that drives the consumption of {@code content}
     */
    public Publisher<ByteBuf> receiveContent(HttpClientResponse response, ByteBufFlux content, CompletableFuture<HttpResponse> result) {
        LOGGER.debug("Received response with headers {} and status {}", response.responseHeaders(), response.status());
        if (this.streamingEnabled) {
            return this.handleResponseStreaming(response, content, result);
        }
        return this.handleResponseNonStreaming(response, content, result);
    }

    /**
     * Resets and then marks the content of a non-reactive streaming entity whose stream supports marking.
     * Entities that do not meet those conditions (or are {@code null}) are left untouched.
     */
    void prepareContentForRepeatability(HttpEntity entity) {
        if (entity != null && !entity.isReactive() && entity.isStreaming() && entity.getContent().markSupported()) {
            this.doReset(entity.getContent());
            entity.getContent().mark(0);
        }
    }

    /**
     * Moves the content of a streaming entity back to its beginning so it can be sent again. A {@link Cursor} is
     * rewound with {@code seek(0)}, any other stream with {@code reset()} when marking is supported; failures and
     * non-rewindable streams are only logged.
     */
    void rewindStreamContent(HttpEntity entity) {
        if (entity == null || !entity.isStreaming()) {
            return;
        }
        InputStream inputStream = entity.getContent();
        if (inputStream instanceof Cursor) {
            Cursor cursor = (Cursor) inputStream;
            try {
                cursor.seek(0L);
            }
            catch (IOException e) {
                LOGGER.warn("Unable to perform seek(0) on input stream being sent by {}: {}", this.clientName, e.getMessage());
            }
        } else if (entity.getContent().markSupported()) {
            this.doReset(entity.getContent());
        } else {
            LOGGER.warn("Stream '{}' cannot be rewinded, payload cannot be resent by {}", entity.getContent().getClass(), this.clientName);
        }
    }

    /** Resets {@code getContent}, logging (rather than propagating) an {@link IOException}. */
    private void doReset(InputStream getContent) {
        try {
            getContent.reset();
        }
        catch (IOException e) {
            LOGGER.warn("Unable to reset the input stream: {}", e.getMessage());
        }
    }

    /**
     * Streaming path: completes {@code result} with a feedable response as soon as it is created and then pushes
     * every received buffer into it. When the content ends the response is completed, with the trailers if any.
     */
    private Publisher<ByteBuf> handleResponseStreaming(HttpClientResponse response, ByteBufFlux content, CompletableFuture<HttpResponse> result) {
        try {
            ReactorNettyResponseWrapper feedableResponse = new HttpResponseCreator().createFeedable(response);
            this.tryCompleteAsyncWithCallerRunsFallback(result, feedableResponse);
            return content.retain().doOnNext(data -> {
                try {
                    feedableResponse.feed(NettyUtils.toNioBuffer(data));
                }
                catch (Exception e) {
                    result.completeExceptionally(e);
                    feedableResponse.error(e);
                }
                finally {
                    // Balances the retain() applied to the content above.
                    ReferenceCountUtil.release(data);
                }
            }).doOnComplete(() -> {
                try {
                    HttpResponseCreatorUtils.trailersAsFuture(response).whenComplete((trailers, throwable) -> {
                        try {
                            if (throwable != null) {
                                feedableResponse.error(asException(throwable));
                                return;
                            }
                            if (trailers.isEmpty()) {
                                feedableResponse.complete();
                                return;
                            }
                            feedableResponse.completeWithTrailers((MultiMap<String, String>) trailers);
                        }
                        catch (Exception e) {
                            feedableResponse.error(e);
                        }
                    });
                }
                catch (Exception e) {
                    feedableResponse.error(e);
                }
            }).doOnError(error -> {
                Exception mappedError = ReactorNettyUtils.onErrorMap(error);
                feedableResponse.error(mappedError);
            });
        }
        catch (Exception e) {
            result.completeExceptionally(e);
            return content;
        }
    }

    /**
     * Returns {@code throwable} itself when it already is an {@link Exception}; any other throwable is wrapped in
     * a {@link MuleRuntimeException} so the original cause is reported instead of a {@link ClassCastException}.
     */
    private static Exception asException(Throwable throwable) {
        if (throwable instanceof Exception) {
            return (Exception) throwable;
        }
        return new MuleRuntimeException(throwable);
    }

    /**
     * Completes {@code result} with {@code response} on the I/O executor, or on the calling thread when the
     * executor rejects the task.
     */
    private void tryCompleteAsyncWithCallerRunsFallback(CompletableFuture<HttpResponse> result, ReactorNettyResponseWrapper response) {
        try {
            result.completeAsync(() -> response, this.ioExecutor);
        }
        catch (RejectedExecutionException e) {
            result.complete(response);
        }
    }

    /**
     * Non-streaming path: aggregates the whole body and completes {@code result} with a response built from it.
     */
    private Publisher<ByteBuf> handleResponseNonStreaming(HttpClientResponse response, ByteBufFlux content, CompletableFuture<HttpResponse> result) {
        // NOTE(review): doOnSuccess receives null when the aggregated Mono completes empty -
        // confirm HttpResponseCreator.create copes with a null buffer.
        return content.aggregate()
            .doOnError(error -> result.completeExceptionally(ReactorNettyUtils.onErrorMap(error)))
            .doOnSuccess(byteBuf -> result.complete(new HttpResponseCreator().create(response, byteBuf)));
    }

    /**
     * Adapts a reactive entity to a flux of buffers. The sink is created with the {@code ERROR} overflow strategy.
     */
    private Flux<ByteBuf> reactiveEntityToByteBufFlux(HttpEntity entity) {
        return Flux.create(sink -> this.propagateDataFromEntityToSink(entity, sink), FluxSink.OverflowStrategy.ERROR);
    }

    /**
     * Registers callbacks on {@code entity} that forward each chunk to {@code sink} as a wrapped buffer and
     * terminate the sink with an error or a completion signal.
     */
    private void propagateDataFromEntityToSink(HttpEntity entity, FluxSink<ByteBuf> sink) {
        entity.onData(data -> sink.next(Unpooled.wrappedBuffer((ByteBuffer) data)));
        entity.onComplete((ts, err) -> {
            if (err != null) {
                sink.error(err);
            } else {
                sink.complete();
            }
        });
    }
}

