/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.reactivemessaging.http.runtime;

import io.quarkus.reactivemessaging.http.runtime.QuarkusWebSocketConnectorIncomingConfiguration;
import io.quarkus.reactivemessaging.http.runtime.QuarkusWebSocketConnectorOutgoingConfiguration;
import io.quarkus.reactivemessaging.http.runtime.ReactiveWebSocketHandlerBean;
import io.quarkus.reactivemessaging.http.runtime.WebSocketMessage;
import io.quarkus.reactivemessaging.http.runtime.WebSocketSink;
import io.quarkus.reactivemessaging.http.runtime.config.TlsConfig;
import io.quarkus.reactivemessaging.http.runtime.serializers.SerializerFactoryBase;
import io.quarkus.runtime.configuration.DurationConverter;
import io.quarkus.tls.TlsConfiguration;
import io.quarkus.tls.TlsConfigurationRegistry;
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttributes;
import io.vertx.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import java.net.URI;
import java.time.Duration;
import java.util.Optional;
import mutiny.zero.flow.adapters.AdaptersToReactiveStreams;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory;
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.Publisher;

@Connector(value="quarkus-websocket")
@ConnectorAttributes(value={@ConnectorAttribute(name="url", type="string", direction=ConnectorAttribute.Direction.OUTGOING, description="The target URL", mandatory=true), @ConnectorAttribute(name="serializer", type="string", direction=ConnectorAttribute.Direction.OUTGOING, description="Message serializer"), @ConnectorAttribute(name="maxRetries", type="int", direction=ConnectorAttribute.Direction.OUTGOING, description="The number of retries to make for sending a message to a remote websocket endpoint. A value greater than 0 is advised. Otherwise, a web socket timeout can result in a dropped message", defaultValue="1"), @ConnectorAttribute(name="jitter", type="double", direction=ConnectorAttribute.Direction.OUTGOING, description="Configures the random factor when using back-off with maxAttempts > 1", defaultValue="0.5"), @ConnectorAttribute(name="delay", type="string", direction=ConnectorAttribute.Direction.OUTGOING, description="Configures a back-off delay between attempts to send a request. A random factor (jitter) is applied to increase the delay when several failures happen."), @ConnectorAttribute(name="tlsConfigurationName", type="string", direction=ConnectorAttribute.Direction.OUTGOING, description="Name of the TLS configuration to be used from TLS registry."), @ConnectorAttribute(name="path", type="string", direction=ConnectorAttribute.Direction.INCOMING, description="The path of the endpoint", mandatory=true), @ConnectorAttribute(name="buffer-size", type="string", direction=ConnectorAttribute.Direction.INCOMING, description="Web socket endpoint buffers messages if a consumer is not able to keep up. This setting specifies the size of the buffer.", defaultValue="8")})
@ApplicationScoped
public class QuarkusWebSocketConnector
implements IncomingConnectorFactory,
OutgoingConnectorFactory {
    public static final String NAME = "quarkus-websocket";
    static final String DEFAULT_JITTER = "0.5";
    static final String DEFAULT_MAX_ATTEMPTS_STR = "1";
    static final String DEFAULT_SOURCE_BUFFER_STR = "8";
    public static final Integer DEFAULT_SOURCE_BUFFER = Integer.valueOf("8");
    @Inject
    ReactiveWebSocketHandlerBean handlerBean;
    @Inject
    SerializerFactoryBase serializerFactory;
    @Inject
    Vertx vertx;
    @Inject
    Instance<TlsConfigurationRegistry> tlsRegistry;

    public PublisherBuilder<WebSocketMessage<?>> getPublisherBuilder(Config configuration) {
        QuarkusWebSocketConnectorIncomingConfiguration config = new QuarkusWebSocketConnectorIncomingConfiguration(configuration);
        String path = config.getPath();
        Multi<WebSocketMessage<?>> processor = this.handlerBean.getProcessor(path);
        return ReactiveStreams.fromPublisher((Publisher)AdaptersToReactiveStreams.publisher(processor));
    }

    public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config configuration) {
        QuarkusWebSocketConnectorOutgoingConfiguration config = new QuarkusWebSocketConnectorOutgoingConfiguration(configuration);
        String serializer = config.getSerializer().orElse(null);
        Optional<Duration> delay = config.getDelay().map(DurationConverter::parseDuration);
        Double jitter = config.getJitter();
        Integer maxRetries = config.getMaxRetries();
        URI url = URI.create(config.getUrl());
        Optional<TlsConfiguration> tlsConfiguration = TlsConfig.lookupConfig(config.getTlsConfigurationName(), this.tlsRegistry.isResolvable() ? Optional.of((TlsConfigurationRegistry)this.tlsRegistry.get()) : Optional.empty());
        return new WebSocketSink(this.vertx, url, serializer, this.serializerFactory, maxRetries, delay, jitter, tlsConfiguration).sink();
    }
}

