/*
 * Decompiled with CFR 0.152.
 */
package dev.snowdrop.vertx.http.common;

import dev.snowdrop.vertx.http.common.WriteStreamSubscriber;
import dev.snowdrop.vertx.http.utils.BufferConverter;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.WebSocketBase;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.util.ObjectUtils;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.adapter.AbstractWebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

public class VertxWebSocketSession
extends AbstractWebSocketSession<WebSocketBase> {
    private final Logger logger = LoggerFactory.getLogger(((Object)((Object)this)).getClass());
    private final BufferConverter bufferConverter;

    public VertxWebSocketSession(WebSocketBase delegate, HandshakeInfo handshakeInfo, BufferConverter bufferConverter) {
        super((Object)delegate, ObjectUtils.getIdentityHexString((Object)delegate), handshakeInfo, (DataBufferFactory)bufferConverter.getDataBufferFactory());
        this.bufferConverter = bufferConverter;
    }

    public Flux<WebSocketMessage> receive() {
        return Flux.create(sink -> {
            this.logger.debug("{}Connecting to a web socket read stream", (Object)this.getLogPrefix());
            WebSocketBase socket = (WebSocketBase)this.getDelegate();
            socket.pause().textMessageHandler(payload -> {
                this.logger.debug("{}Received text '{}' from a web socket read stream", (Object)this.getLogPrefix(), payload);
                sink.next((Object)this.textMessage((String)payload));
            }).binaryMessageHandler(payload -> {
                this.logger.debug("{}Received binary '{}' from a web socket read stream", (Object)this.getLogPrefix(), payload);
                sink.next((Object)this.binaryMessage((Buffer)payload));
            }).pongHandler(payload -> {
                this.logger.debug("{}Received pong '{}' from a web socket read stream", (Object)this.getLogPrefix(), payload);
                sink.next((Object)this.pongMessage((Buffer)payload));
            }).exceptionHandler(throwable -> {
                this.logger.debug("{}Received exception '{}' from a web socket read stream", (Object)this.getLogPrefix(), throwable);
                sink.error(throwable);
            }).endHandler(e -> {
                this.logger.debug("{}Web socket read stream ended", (Object)this.getLogPrefix());
                sink.complete();
            });
            sink.onRequest(i -> {
                this.logger.debug("{}Fetching '{}' entries from a web socket read stream", (Object)this.getLogPrefix(), (Object)i);
                socket.fetch(i);
            });
        });
    }

    public Mono<Void> send(Publisher<WebSocketMessage> messages) {
        return Mono.create(sink -> {
            this.logger.debug("{}Subscribing to messages publisher", (Object)this.getLogPrefix());
            WriteStreamSubscriber<WebSocketBase, WebSocketMessage> subscriber = new WriteStreamSubscriber.Builder().writeStream((WebSocketBase)this.getDelegate()).nextHandler(this::messageHandler).endHook((MonoSink<Void>)sink).build();
            messages.subscribe(subscriber);
        });
    }

    public Mono<Void> close(CloseStatus status) {
        this.logger.debug("{}Closing web socket with status '{}'", (Object)this.getLogPrefix(), (Object)status);
        return Mono.create(sink -> ((WebSocketBase)this.getDelegate()).closeHandler(e -> {
            this.logger.debug("{}Web socket closed", (Object)this.getLogPrefix());
            sink.success();
        }).close((short)status.getCode(), status.getReason()));
    }

    private void messageHandler(WebSocketBase socket, WebSocketMessage message) {
        if (message.getType() == WebSocketMessage.Type.TEXT) {
            String payload = message.getPayloadAsText();
            socket.writeTextMessage(payload);
        } else {
            Buffer buffer = this.bufferConverter.toBuffer(message.getPayload());
            if (message.getType() == WebSocketMessage.Type.PING) {
                socket.writePing(buffer);
            } else if (message.getType() == WebSocketMessage.Type.PONG) {
                socket.writePong(buffer);
            } else {
                socket.writeBinaryMessage(buffer);
            }
        }
    }

    private WebSocketMessage binaryMessage(Buffer payloadBuffer) {
        DataBuffer payload = this.bufferConverter.toDataBuffer(payloadBuffer);
        return new WebSocketMessage(WebSocketMessage.Type.BINARY, payload);
    }

    private WebSocketMessage pongMessage(Buffer payloadBuffer) {
        DataBuffer payload = this.bufferConverter.toDataBuffer(payloadBuffer);
        return new WebSocketMessage(WebSocketMessage.Type.PONG, payload);
    }
}

