/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.connectors.mcp.internal.client.connection.transport.streamable;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.mulesoft.connectors.mcp.internal.error.McpErrorTypes;
import io.modelcontextprotocol.spec.McpSchema;
import java.util.function.Consumer;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.http.api.client.HttpClient;
import org.mule.runtime.http.api.domain.message.response.HttpResponse;
import org.mule.runtime.http.api.sse.ServerSentEvent;
import org.mule.runtime.http.api.sse.client.SseListener;
import org.mule.runtime.http.api.sse.client.SseSource;
import org.mule.runtime.http.api.sse.client.SseSourceConfig;
import org.mule.sdk.api.error.ErrorTypeDefinition;
import org.mule.sdk.api.exception.ModuleException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.MonoSink;

class StreamableSseHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamableSseHandler.class);
    private final MonoSink<Void> sink;
    private final SseSource sseSource;
    private final Consumer<McpSchema.JSONRPCMessage> onMessage;
    private final Consumer<StreamableSseHandler> onClose;
    private final ObjectMapper objectMapper;

    public StreamableSseHandler(HttpClient httpClient, MonoSink<Void> sink, HttpResponse response, ObjectMapper objectMapper, Consumer<McpSchema.JSONRPCMessage> onMessage, Consumer<StreamableSseHandler> onClose) {
        this.sink = sink;
        this.objectMapper = objectMapper;
        this.onMessage = onMessage;
        this.onClose = onClose;
        this.sseSource = httpClient.sseSource(SseSourceConfig.fromResponse((HttpResponse)response).build());
        this.sseSource.register(this.onMessage());
        this.sseSource.doOnConnectionFailure(ctx -> sink.error((Throwable)new ConnectionException("Connection error consuming SSE response", ctx.error())));
    }

    public void open() {
        this.sseSource.open();
    }

    public void close() {
        try {
            this.sseSource.close();
        }
        catch (Exception e) {
            LOGGER.error("Error while closing SSE source", (Throwable)e);
        }
    }

    private SseListener onMessage() {
        return new SseListener(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void onEvent(ServerSentEvent event) {
                McpSchema.JSONRPCMessage message;
                if (!"message".equals(event.getName())) {
                    LOGGER.error("Received unexpected SSE event type: {}", (Object)event.getName());
                    return;
                }
                try {
                    message = McpSchema.deserializeJsonRpcMessage((ObjectMapper)StreamableSseHandler.this.objectMapper, (String)event.getData());
                }
                catch (Exception e) {
                    StreamableSseHandler.this.sink.error((Throwable)new ModuleException("Could not read or parse server sent event", (ErrorTypeDefinition)McpErrorTypes.PARSE_ERROR, (Throwable)e));
                    StreamableSseHandler.this.close();
                    return;
                }
                try {
                    StreamableSseHandler.this.onMessage.accept(message);
                }
                catch (Exception e) {
                    StreamableSseHandler.this.sink.error((Throwable)e);
                    StreamableSseHandler.this.close();
                }
                finally {
                    if (message instanceof McpSchema.JSONRPCResponse) {
                        StreamableSseHandler.this.sink.success();
                    }
                }
            }

            public void onClose() {
                StreamableSseHandler.this.onClose.accept(StreamableSseHandler.this);
            }
        };
    }
}

