/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.reactivemessaging.http.runtime;

import io.quarkus.reactivemessaging.http.runtime.ReactiveHandlerBeanBase;
import io.quarkus.reactivemessaging.http.runtime.RequestMetadata;
import io.quarkus.reactivemessaging.http.runtime.StrictQueueSizeGuard;
import io.quarkus.reactivemessaging.http.runtime.WebSocketMessage;
import io.quarkus.reactivemessaging.http.runtime.config.ReactiveHttpConfig;
import io.quarkus.reactivemessaging.http.runtime.config.WebSocketStreamConfig;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.MultiEmitter;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.ext.web.RoutingContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.util.Collection;
import org.jboss.logging.Logger;

@Singleton
public class ReactiveWebSocketHandlerBean
extends ReactiveHandlerBeanBase<WebSocketStreamConfig, WebSocketMessage<?>> {
    private static final Logger log = Logger.getLogger(ReactiveWebSocketHandlerBean.class);
    @Inject
    ReactiveHttpConfig config;

    @Override
    protected void handleRequest(RoutingContext event, MultiEmitter<? super WebSocketMessage<?>> emitter, StrictQueueSizeGuard guard, String path) {
        event.request().toWebSocket(webSocket -> {
            if (webSocket.failed()) {
                log.error((Object)"failed to connect web socket", webSocket.cause());
            } else {
                ServerWebSocket serverWebSocket = (ServerWebSocket)webSocket.result();
                serverWebSocket.handler(b -> {
                    if (emitter == null) {
                        this.onUnexpectedError(serverWebSocket, null, "No consumer subscribed for messages sent to Reactive Messaging WebSocket endpoint on path: " + path);
                    } else if (guard.prepareToEmit()) {
                        try {
                            emitter.emit(new WebSocketMessage<Buffer>((Buffer)b, new RequestMetadata(event), () -> serverWebSocket.write((Object)Buffer.buffer((String)"ACK")), error -> this.onUnexpectedError(serverWebSocket, (Throwable)error, "Failed to process incoming web socket message.")));
                        }
                        catch (Exception error2) {
                            guard.dequeue();
                            this.onUnexpectedError(serverWebSocket, error2, "Emitting message failed");
                        }
                    } else {
                        serverWebSocket.write((Object)Buffer.buffer((String)"BUFFER_OVERFLOW"));
                    }
                });
            }
        });
    }

    @Override
    protected String description(WebSocketStreamConfig config) {
        return String.format("path %s", config.path);
    }

    @Override
    protected String key(WebSocketStreamConfig config) {
        return config.path;
    }

    @Override
    protected String key(RoutingContext context) {
        return context.currentRoute().getPath();
    }

    @Override
    protected Collection<WebSocketStreamConfig> configs() {
        return this.config.getWebSocketConfigs();
    }

    private void onUnexpectedError(ServerWebSocket serverWebSocket, Throwable error, String message) {
        log.error((Object)message, error);
        serverWebSocket.close((short)3500, "Unexpected error while processing the message");
    }

    Multi<WebSocketMessage<?>> getProcessor(String path) {
        ReactiveHandlerBeanBase.Bundle bundle = (ReactiveHandlerBeanBase.Bundle)this.processors.get(path);
        if (bundle == null) {
            throw new IllegalStateException("No incoming stream defined for path " + path);
        }
        return bundle.getProcessor();
    }
}

