/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.reactivemessaging.http.runtime;

import io.quarkus.reactivemessaging.http.runtime.HttpMessage;
import io.quarkus.reactivemessaging.http.runtime.HttpSink;
import io.quarkus.reactivemessaging.http.runtime.QuarkusHttpConnectorIncomingConfiguration;
import io.quarkus.reactivemessaging.http.runtime.QuarkusHttpConnectorOutgoingConfiguration;
import io.quarkus.reactivemessaging.http.runtime.ReactiveHttpHandlerBean;
import io.quarkus.reactivemessaging.http.runtime.serializers.SerializerFactoryBase;
import io.quarkus.runtime.configuration.DurationConverter;
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttributes;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpMethod;
import java.time.Duration;
import java.util.Arrays;
import java.util.Optional;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory;
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.jboss.logging.Logger;
import org.reactivestreams.Publisher;

@ConnectorAttributes(value={@ConnectorAttribute(name="url", type="string", direction=ConnectorAttribute.Direction.OUTGOING, description="The target URL", mandatory=true), @ConnectorAttribute(name="serializer", type="string", direction=ConnectorAttribute.Direction.OUTGOING, description="Message serializer"), @ConnectorAttribute(name="maxPoolSize", type="int", direction=ConnectorAttribute.Direction.OUTGOING, description="Maximum pool size for connections"), @ConnectorAttribute(name="maxWaitQueueSize", type="int", direction=ConnectorAttribute.Direction.OUTGOING, description="Maximum requests allowed in the wait queue of the underlying client.  If the value is set to a negative number then the queue will be unbounded"), @ConnectorAttribute(name="maxRetries", type="int", direction=ConnectorAttribute.Direction.OUTGOING, description="The number of attempts to make for sending a request to a remote endpoint. Must not be less than zero", defaultValue="0"), @ConnectorAttribute(name="jitter", type="string", direction=ConnectorAttribute.Direction.OUTGOING, description="Configures the random factor when using back-off with maxRetries > 0", defaultValue="0.5"), @ConnectorAttribute(name="delay", type="string", direction=ConnectorAttribute.Direction.OUTGOING, description="Configures a back-off delay between attempts to send a request. A random factor (jitter) is applied to increase the delay when several failures happen."), @ConnectorAttribute(name="method", type="string", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description="The HTTP method (either `POST` or `PUT`)", defaultValue="POST"), @ConnectorAttribute(name="path", type="string", direction=ConnectorAttribute.Direction.INCOMING, description="The path of the endpoint", mandatory=true), @ConnectorAttribute(name="buffer-size", type="string", direction=ConnectorAttribute.Direction.INCOMING, description="HTTP endpoint buffers messages if a consumer is not able to keep up. 
This setting specifies the size of the buffer.", defaultValue="8"), @ConnectorAttribute(name="broadcast", type="boolean", direction=ConnectorAttribute.Direction.INCOMING, description="Whether the messages should be dispatched to multiple consumers", defaultValue="false")})
@Connector(value="quarkus-http")
@ApplicationScoped
public class QuarkusHttpConnector
implements IncomingConnectorFactory,
OutgoingConnectorFactory {
    private static final Logger log = Logger.getLogger(QuarkusHttpConnector.class);
    static final String DEFAULT_JITTER = "0.5";
    static final String DEFAULT_MAX_ATTEMPTS_STR = "0";
    static final String DEFAULT_SOURCE_BUFFER_STR = "8";
    public static final Integer DEFAULT_SOURCE_BUFFER = Integer.valueOf("8");
    public static final String NAME = "quarkus-http";
    @Inject
    ReactiveHttpHandlerBean handlerBean;
    @Inject
    Vertx vertx;
    @Inject
    SerializerFactoryBase serializerFactory;

    public PublisherBuilder<HttpMessage<?>> getPublisherBuilder(Config configuration) {
        QuarkusHttpConnectorIncomingConfiguration config = new QuarkusHttpConnectorIncomingConfiguration(configuration);
        String methodAsString = config.getMethod();
        HttpMethod method = this.getMethod(methodAsString);
        Multi<HttpMessage<?>> processor = this.handlerBean.getProcessor(config.getPath(), method);
        boolean broadcast = config.getBroadcast();
        if (broadcast) {
            return ReactiveStreams.fromPublisher((Publisher)processor.broadcast().toAllSubscribers());
        }
        return ReactiveStreams.fromPublisher(processor);
    }

    private HttpMethod getMethod(String methodAsString) {
        try {
            return HttpMethod.valueOf((String)methodAsString);
        }
        catch (IllegalArgumentException e) {
            String error = "Unsupported HTTP method: " + methodAsString + ". The supported methods are: " + HttpMethod.values();
            log.warn((Object)error, (Throwable)e);
            throw new IllegalArgumentException(error);
        }
    }

    public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config configuration) {
        double jitter;
        QuarkusHttpConnectorOutgoingConfiguration config = new QuarkusHttpConnectorOutgoingConfiguration(configuration);
        String url = config.getUrl();
        String method = this.getMethod(config.getMethod()).name();
        String serializer = config.getSerializer().orElse(null);
        Optional<String> maybeDelay = config.getDelay();
        Optional<Duration> delay = maybeDelay.map(DurationConverter::parseDuration);
        String jitterAsString = config.getJitter();
        Integer maxRetries = config.getMaxRetries();
        Optional<Integer> maxPoolSize = config.getMaxPoolSize();
        Optional<Integer> maxWaitQueueSize = config.getMaxWaitQueueSize();
        try {
            jitter = Double.valueOf(jitterAsString);
        }
        catch (NumberFormatException e) {
            throw new IllegalArgumentException(String.format("Failed to parse jitter value '%s' to a double.", jitterAsString));
        }
        return new HttpSink(this.vertx, method, url, serializer, maxRetries, jitter, delay, maxPoolSize, maxWaitQueueSize, this.serializerFactory).sink();
    }
}

