/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.connectors.mcp.internal.client.connection.transport.streamable;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.mulesoft.connectors.mcp.internal.MessagingManager;
import com.mulesoft.connectors.mcp.internal.client.connection.transport.AbstractHttpClientTransport;
import com.mulesoft.connectors.mcp.internal.client.connection.transport.streamable.StreamableSseHandler;
import com.mulesoft.connectors.mcp.internal.error.McpErrorTypes;
import com.mulesoft.connectors.mcp.internal.util.HttpTransportUtils;
import io.modelcontextprotocol.spec.McpSchema;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.mule.extension.http.api.request.authentication.HttpRequestAuthentication;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.core.api.util.StringUtils;
import org.mule.runtime.http.api.HttpConstants;
import org.mule.runtime.http.api.client.HttpClient;
import org.mule.runtime.http.api.domain.entity.ByteArrayHttpEntity;
import org.mule.runtime.http.api.domain.entity.HttpEntity;
import org.mule.runtime.http.api.domain.message.request.HttpRequest;
import org.mule.runtime.http.api.domain.message.request.HttpRequestBuilder;
import org.mule.runtime.http.api.domain.message.response.HttpResponse;
import org.mule.sdk.api.error.ErrorTypeDefinition;
import org.mule.sdk.api.exception.ModuleException;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

public class StreamableHttpClientTransport
extends AbstractHttpClientTransport {
    private static final String ACCEPT_HEADER_VALUE = "application/json, text/event-stream";
    private final String mcpEndpoint;
    private final Set<StreamableSseHandler> sseHandlers = ConcurrentHashMap.newKeySet();
    private Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> handler;
    private String sessionId;

    public StreamableHttpClientTransport(String serverUrl, String mcpEndpoint, int responseTimeout, HttpClient httpClient, HttpRequestAuthentication authentication, MessagingManager messagingManager) {
        super(httpClient, serverUrl, responseTimeout, authentication, messagingManager);
        this.mcpEndpoint = this.buildEndpointUrl(mcpEndpoint);
    }

    public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> handler) {
        this.handler = handler;
        return Mono.empty();
    }

    public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
        HttpRequest request;
        try {
            HttpRequestBuilder requestBuilder = (HttpRequestBuilder)((HttpRequestBuilder)((HttpRequestBuilder)HttpRequest.builder((boolean)true).uri(this.mcpEndpoint).method(HttpConstants.Method.POST).addHeader("Content-Type", "application/json")).addHeader("Accept", ACCEPT_HEADER_VALUE)).entity((HttpEntity)new ByteArrayHttpEntity(this.objectMapper.writeValueAsBytes((Object)message)));
            if (this.sessionId != null) {
                requestBuilder.addHeader("Mcp-Session-Id", this.sessionId);
            }
            this.messagingManager.popOutbound(message).ifPresent(ctx -> requestBuilder.headers(ctx.getAdditionalProperties()));
            if (this.authentication != null) {
                this.authentication.authenticate(requestBuilder);
            }
            request = requestBuilder.build();
        }
        catch (JsonProcessingException e) {
            return Mono.error((Throwable)new MuleRuntimeException(I18nMessageFactory.createStaticMessage((String)"Error serializing message"), (Throwable)e));
        }
        catch (Throwable e) {
            return Mono.error((Throwable)e);
        }
        return Mono.create(sink -> this.httpClient.sendAsync(request, this.requestOptions).whenComplete((response, e) -> {
            if (e != null) {
                sink.error(e);
            }
            this.processResponse((HttpResponse)response).doOnSuccess(arg_0 -> ((MonoSink)sink).success(arg_0)).doOnError(arg_0 -> ((MonoSink)sink).error(arg_0)).subscribe();
        }));
    }

    public Mono<Void> closeGracefully() {
        this.sessionId = null;
        this.sseHandlers.forEach(StreamableSseHandler::close);
        return Mono.empty();
    }

    private Mono<Void> processResponse(HttpResponse response) {
        int statusCode = response.getStatusCode();
        try {
            if (statusCode == HttpConstants.HttpStatus.ACCEPTED.getStatusCode()) {
                HttpTransportUtils.discardAndClose(response);
                return Mono.empty();
            }
            String responseContentType = response.getHeaderValue("Content-Type");
            if (responseContentType == null) {
                return Mono.error((Throwable)new ModuleException("Response with status code %d did not specified Content-Type".formatted(statusCode), (ErrorTypeDefinition)McpErrorTypes.SERVER_ERROR));
            }
            this.updateSessionId(response);
            if (responseContentType.contains("application/json")) {
                McpSchema.JSONRPCMessage responseMessage;
                try {
                    responseMessage = HttpTransportUtils.parseMessageFromBody(response);
                }
                catch (IOException ioException) {
                    return Mono.error((Throwable)new ModuleException("Could not read or parse server response with status code %d".formatted(statusCode), (ErrorTypeDefinition)McpErrorTypes.PARSE_ERROR, (Throwable)ioException));
                }
                this.handleMessage(responseMessage);
                return Mono.empty();
            }
            if (responseContentType.contains("text/event-stream")) {
                return Mono.create(sink -> {
                    StreamableSseHandler handler = new StreamableSseHandler(this.httpClient, (MonoSink<Void>)sink, response, this.objectMapper, this::handleMessage, this.sseHandlers::remove);
                    this.sseHandlers.add(handler);
                    handler.open();
                });
            }
            return Mono.error((Throwable)new ModuleException("Invalid response Content-Type: %s with statusCode %d".formatted(responseContentType, statusCode), (ErrorTypeDefinition)McpErrorTypes.SERVER_ERROR));
        }
        catch (Throwable t) {
            return Mono.error((Throwable)new ModuleException("Error processing response with status code " + statusCode, (ErrorTypeDefinition)McpErrorTypes.SERVER_ERROR, t));
        }
    }

    private void handleMessage(McpSchema.JSONRPCMessage responseMessage) {
        this.handler.apply((Mono<McpSchema.JSONRPCMessage>)Mono.just((Object)responseMessage)).subscribe();
    }

    private void updateSessionId(HttpResponse response) {
        String sessionHeader = response.getHeaderValue("Mcp-Session-Id");
        if (!StringUtils.isBlank((String)sessionHeader)) {
            this.sessionId = sessionHeader;
        }
    }

    private String buildEndpointUrl(String path) {
        if (this.serverUrl.endsWith("/")) {
            if (((String)path).startsWith("/")) {
                path = ((String)path).substring(1);
            }
        } else if (!((String)path).startsWith("/")) {
            path = "/" + (String)path;
        }
        return this.serverUrl + (String)path;
    }
}

