/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.reactivemessaging.http.runtime;

import io.quarkus.reactivemessaging.http.runtime.config.TlsConfig;
import io.quarkus.reactivemessaging.http.runtime.serializers.Serializer;
import io.quarkus.reactivemessaging.http.runtime.serializers.SerializerFactoryBase;
import io.quarkus.tls.TlsConfiguration;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniRetry;
import io.smallrye.mutiny.vertx.AsyncResultUni;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.WebSocket;
import io.vertx.core.http.WebSocketConnectOptions;
import java.net.URI;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.jboss.logging.Logger;

class WebSocketSink {
    private static final Logger log = Logger.getLogger(WebSocketSink.class);
    private static final String WSS = "wss";
    private static final List<String> supportedSchemes = Arrays.asList("ws", "wss");
    private final URI uri;
    private final HttpClient httpClient;
    private final SubscriberBuilder<Message<?>, Void> subscriber;
    private final boolean ssl;
    private final String serializer;
    private final SerializerFactoryBase serializerFactory;
    private final AtomicReference<WebSocket> websocket = new AtomicReference();

    WebSocketSink(Vertx vertx, URI uri, String serializer, SerializerFactoryBase serializerFactory, int maxRetries, Optional<Duration> delay, double jitter, Optional<TlsConfiguration> tlsConfiguration) {
        this.uri = uri;
        this.serializerFactory = serializerFactory;
        this.serializer = serializer;
        String scheme = uri.getScheme().toLowerCase(Locale.getDefault());
        if (!supportedSchemes.contains(scheme)) {
            throw new IllegalArgumentException("Invalid scheme '" + scheme + "' for the websocket sink URL: " + String.valueOf(uri));
        }
        this.ssl = WSS.equals(scheme);
        HttpClientOptions options = new HttpClientOptions();
        tlsConfiguration.ifPresent(config -> TlsConfig.configure(options, config));
        this.httpClient = vertx.createHttpClient(options);
        this.subscriber = ReactiveStreams.builder().flatMapCompletionStage(m -> {
            Uni send = this.send((Message<?>)m);
            log.debugf("maxRetries: %d", maxRetries);
            if (maxRetries > 0) {
                UniRetry retry = send.onFailure().retry();
                if (delay.isPresent()) {
                    retry = retry.withBackOff((Duration)delay.get()).withJitter(jitter);
                }
                send = retry.atMost((long)maxRetries);
            }
            return send.onItemOrFailure().transformToUni((result, error) -> {
                if (error != null) {
                    return Uni.createFrom().completionStage(m.nack(error).thenApply(x -> {
                        log.debug((Object)"error responding", error);
                        return m;
                    }));
                }
                return Uni.createFrom().completionStage(m.ack().thenApply(x -> {
                    log.debug((Object)"responded with success", error);
                    return m;
                }));
            }).subscribeAsCompletionStage();
        }).ignore();
    }

    private void connect(WebSocketConnectOptions options, Handler<AsyncResult<WebSocket>> handler) {
        log.debug((Object)"using a new web socket connection");
        this.httpClient.webSocket(options, connectResult -> {
            if (connectResult.succeeded()) {
                WebSocket result = (WebSocket)connectResult.result();
                WebSocket oldWs = this.websocket.getAndSet(result);
                if (oldWs != null) {
                    log.debug((Object)"Closing previous web socket connection");
                    oldWs.close();
                }
                result.closeHandler(ignored -> {
                    log.debug((Object)"WebSocket disconnected");
                    this.websocket.compareAndSet(result, null);
                });
                handler.handle(connectResult);
            } else {
                handler.handle((Object)Future.failedFuture((Throwable)connectResult.cause()));
            }
        });
    }

    private Uni<Void> send(Message<?> message) {
        WebSocketConnectOptions options = this.options();
        Serializer<Object> serializer = this.serializerFactory.getSerializer(this.serializer, message.getPayload());
        Buffer serialized = serializer.serialize(message.getPayload());
        return AsyncResultUni.toUni(handler -> {
            WebSocket ws = this.websocket.get();
            if (ws != null && !ws.isClosed()) {
                log.debug((Object)"reusing a previous web socket connection");
                this._send(ws, serialized, (Handler<AsyncResult<Void>>)handler);
            } else {
                this.connect(options, (Handler<AsyncResult<WebSocket>>)((Handler)result -> {
                    if (result.succeeded()) {
                        this._send((WebSocket)result.result(), serialized, (Handler<AsyncResult<Void>>)handler);
                    } else {
                        handler.handle((Object)Future.failedFuture((Throwable)result.cause()));
                    }
                }));
            }
        });
    }

    private WebSocketConnectOptions options() {
        return new WebSocketConnectOptions().setSsl(Boolean.valueOf(this.ssl)).setHost(this.uri.getHost()).setPort(Integer.valueOf(this.uri.getPort())).setURI(this.uri.getPath());
    }

    private void _send(WebSocket webSocket, Buffer serialized, Handler<AsyncResult<Void>> handler) {
        log.debug((Object)"sending out the message");
        webSocket.write((Object)serialized, writeResult -> {
            if (writeResult.succeeded()) {
                log.debug((Object)"success");
            } else {
                Throwable cause = writeResult.cause();
                log.debug((Object)"failure", cause);
            }
            handler.handle(writeResult);
        });
    }

    SubscriberBuilder<Message<?>, Void> sink() {
        return this.subscriber;
    }
}

