/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.reactivemessaging.http.runtime;

import io.quarkus.reactivemessaging.http.runtime.HttpCloudEventHelper;
import io.quarkus.reactivemessaging.http.runtime.OutgoingHttpMetadata;
import io.quarkus.reactivemessaging.http.runtime.config.TlsConfig;
import io.quarkus.reactivemessaging.http.runtime.serializers.Serializer;
import io.quarkus.reactivemessaging.http.runtime.serializers.SerializerFactoryBase;
import io.quarkus.tls.TlsConfiguration;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniRetry;
import io.vertx.core.Vertx;
import io.vertx.core.VertxException;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.ext.web.client.WebClientOptions;
import io.vertx.mutiny.core.buffer.Buffer;
import io.vertx.mutiny.ext.web.client.HttpRequest;
import io.vertx.mutiny.ext.web.client.HttpResponse;
import io.vertx.mutiny.ext.web.client.WebClient;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.jboss.logging.Logger;

class HttpSink {
    private static final Logger log = Logger.getLogger(HttpSink.class);
    private static final String[] SUPPORTED_SCHEMES = new String[]{"http:", "https:"};
    private final SubscriberBuilder<Message<?>, Void> subscriber;
    private final WebClient client;
    private final String method;
    private final String url;
    private final SerializerFactoryBase serializerFactory;
    private final String serializerName;

    HttpSink(Vertx vertx, String method, String url, String serializerName, int maxRetries, double jitter, Optional<Duration> delay, Optional<Integer> maxPoolSize, Optional<Integer> maxWaitQueueSize, SerializerFactoryBase serializerFactory, Optional<TlsConfiguration> tlsConfiguration) {
        this.method = method;
        this.url = url;
        this.serializerFactory = serializerFactory;
        this.serializerName = serializerName;
        WebClientOptions options = new WebClientOptions();
        if (maxPoolSize.isPresent()) {
            options.setMaxPoolSize(maxPoolSize.get().intValue());
        }
        if (maxWaitQueueSize.isPresent()) {
            options.setMaxWaitQueueSize(maxWaitQueueSize.get().intValue());
        }
        tlsConfiguration.ifPresent(config -> TlsConfig.configure((HttpClientOptions)options, config));
        this.client = WebClient.create((io.vertx.mutiny.core.Vertx)io.vertx.mutiny.core.Vertx.newInstance((Vertx)vertx), (WebClientOptions)options);
        if (Arrays.stream(SUPPORTED_SCHEMES).noneMatch(url.toLowerCase()::startsWith)) {
            throw new IllegalArgumentException("Unsupported scheme for the http connector in URL: " + url);
        }
        this.subscriber = ReactiveStreams.builder().flatMapCompletionStage(m -> {
            Uni send = this.send((Message<?>)m);
            if (maxRetries > 0) {
                UniRetry retry = send.onFailure().retry();
                if (delay.isPresent()) {
                    retry = retry.withBackOff((Duration)delay.get()).withJitter(jitter);
                }
                send = retry.atMost((long)maxRetries);
            }
            return send.onItemOrFailure().transformToUni((result, error) -> {
                if (error != null) {
                    return Uni.createFrom().completionStage(m.nack(error).thenApply(x -> m));
                }
                return Uni.createFrom().completionStage(m.ack().thenApply(x -> m));
            }).subscribeAsCompletionStage();
        }).ignore();
    }

    SubscriberBuilder<Message<?>, Void> sink() {
        return this.subscriber;
    }

    private Uni<Void> send(Message<?> message) {
        HttpRequest<?> request = this.toHttpRequest(message);
        return Uni.createFrom().item(message.getPayload()).onItem().transform(this::serialize).onItem().transformToUni(buffer -> this.invoke(request, (Buffer)buffer));
    }

    private <T> Buffer serialize(T payload) {
        Serializer<T> serializer = this.serializerFactory.getSerializer(this.serializerName, payload);
        return Buffer.newInstance((io.vertx.core.buffer.Buffer)serializer.serialize(payload));
    }

    private Uni<Void> invoke(HttpRequest<?> request, Buffer buffer) {
        log.debugf("Invoking request: ", (Object)this.toString(request, buffer));
        return request.sendBuffer(buffer).onItem().transform(resp -> {
            if (resp.statusCode() >= 200 && resp.statusCode() < 300) {
                return null;
            }
            throw new VertxException("Http request: " + this.toString(request, buffer) + " failed with response: " + this.toString((HttpResponse<?>)resp));
        });
    }

    private String toString(HttpRequest<?> req, Buffer buffer) {
        return "URI:" + req.uri() + " Method:" + String.valueOf(req.method()) + " Headers: " + String.valueOf(req.headers()) + " Body: " + String.valueOf(buffer);
    }

    private String toString(HttpResponse<?> resp) {
        return "Code: " + resp.statusCode() + " Message: " + resp.statusMessage();
    }

    private HttpRequest<?> toHttpRequest(Message<?> message) {
        try {
            OutgoingHttpMetadata metadata = message.getMetadata(OutgoingHttpMetadata.class).orElse(null);
            Map<String, String> cloudEventHeaders = HttpCloudEventHelper.getCloudEventHeaders(message);
            Map<String, List<String>> httpHeaders = metadata != null ? metadata.getHeaders() : Collections.emptyMap();
            httpHeaders = this.safeAddAll(httpHeaders, cloudEventHeaders);
            Map<String, List<String>> query = metadata != null ? metadata.getQuery() : Collections.emptyMap();
            Map<String, String> pathParams = metadata != null ? metadata.getPathParameters() : Collections.emptyMap();
            String url = this.prepareUrl(pathParams);
            HttpRequest<Buffer> request = this.createRequest(url);
            this.addHeaders(request, httpHeaders);
            this.addQueryParameters(query, request);
            return request;
        }
        catch (Exception any) {
            log.error((Object)"Failed to transform message to http request", (Throwable)any);
            throw any;
        }
    }

    private Map<String, List<String>> safeAddAll(Map<String, List<String>> httpHeaders, Map<String, String> cloudEventHeaders) {
        if (cloudEventHeaders.isEmpty()) {
            return httpHeaders;
        }
        if (httpHeaders.isEmpty()) {
            return cloudEventHeaders.entrySet().stream().collect(Collectors.toMap(e -> (String)e.getKey(), e -> List.of((String)e.getValue())));
        }
        HashMap<String, List<String>> mergedMap = new HashMap<String, List<String>>(httpHeaders);
        for (Map.Entry<String, String> entry : cloudEventHeaders.entrySet()) {
            mergedMap.put(entry.getKey(), List.of(entry.getValue()));
        }
        return mergedMap;
    }

    private HttpRequest<Buffer> createRequest(String url) {
        return switch (this.method) {
            case "POST" -> this.client.postAbs(url);
            case "PUT" -> this.client.putAbs(url);
            default -> throw new IllegalArgumentException("Unsupported HTTP method: " + this.method + "only PUT and POST are supported");
        };
    }

    private void addQueryParameters(Map<String, List<String>> query, HttpRequest<Buffer> request) {
        for (Map.Entry<String, List<String>> queryParam : query.entrySet()) {
            for (String queryParamValue : queryParam.getValue()) {
                request.addQueryParam(queryParam.getKey(), queryParamValue);
            }
        }
    }

    private void addHeaders(HttpRequest<Buffer> request, Map<String, List<String>> httpHeaders) {
        if (!httpHeaders.isEmpty()) {
            for (Map.Entry<String, List<String>> header : httpHeaders.entrySet()) {
                request.putHeader(header.getKey(), (Iterable)header.getValue());
            }
        }
    }

    private String prepareUrl(Map<String, String> pathParams) {
        String result = this.url;
        for (Map.Entry<String, String> pathParamEntry : pathParams.entrySet()) {
            String toReplace = String.format("{%s}", pathParamEntry.getKey());
            if (this.url.contains(toReplace)) {
                result = this.url.replace(toReplace, pathParamEntry.getValue());
                continue;
            }
            log.warnf("Failed to find %s in the URL that would correspond to the %s path parameter", (Object)toReplace, (Object)pathParamEntry.getKey());
        }
        return result;
    }
}

