/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.connectors.mcp.internal.client.connection.transport.sse;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mulesoft.connectors.mcp.internal.MessagingManager;
import com.mulesoft.connectors.mcp.internal.client.connection.transport.AbstractHttpClientTransport;
import com.mulesoft.connectors.mcp.internal.error.McpErrorTypes;
import com.mulesoft.connectors.mcp.internal.util.HttpTransportUtils;
import io.modelcontextprotocol.spec.McpSchema;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.mule.extension.http.api.request.authentication.HttpRequestAuthentication;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.core.api.util.StringUtils;
import org.mule.runtime.http.api.HttpConstants;
import org.mule.runtime.http.api.client.HttpClient;
import org.mule.runtime.http.api.domain.entity.ByteArrayHttpEntity;
import org.mule.runtime.http.api.domain.entity.HttpEntity;
import org.mule.runtime.http.api.domain.message.request.HttpRequest;
import org.mule.runtime.http.api.domain.message.request.HttpRequestBuilder;
import org.mule.runtime.http.api.sse.client.SseFailureContext;
import org.mule.runtime.http.api.sse.client.SseListener;
import org.mule.runtime.http.api.sse.client.SseRetryConfig;
import org.mule.runtime.http.api.sse.client.SseSource;
import org.mule.runtime.http.api.sse.client.SseSourceConfig;
import org.mule.sdk.api.error.ErrorTypeDefinition;
import org.mule.sdk.api.exception.ModuleException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/**
 * Client transport that listens on an SSE source for {@code endpoint} and {@code message} events
 * and sends outgoing JSON-RPC messages with an HTTP POST to the URI announced by the
 * {@code endpoint} event.
 *
 * <p>SSE listeners are invoked by the SSE source while {@link #open()} blocks waiting for the
 * endpoint, so the fields shared between both sides are {@code volatile}.
 */
public class SseClientTransport extends AbstractHttpClientTransport {
    private static final Logger LOGGER = LoggerFactory.getLogger(SseClientTransport.class);

    /** Maximum time {@link #open()} waits for the server to announce the message endpoint. */
    private static final long MESSAGE_ENDPOINT_TIMEOUT_SECONDS = 10L;

    private final String sseEndpointPath;
    private final long reconnectDelay;
    private final TimeUnit reconnectDelayTimeUnit;
    private SseSource sseSource;
    // Written by connect() and read from the SSE "message" listener, hence volatile.
    private volatile Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> handler;
    private volatile URI messageEndpoint;
    private volatile CompletableFuture<Void> messageEndpointFuture;

    /**
     * Creates the transport; no connection is attempted until {@link #open()} or
     * {@link #connect(Function)} is invoked.
     *
     * @param serverUrl base URL that relative endpoint paths are appended to
     * @param sseEndpointPath absolute URI or server-relative path of the SSE stream
     * @param reconnectDelay delay handed to the SSE retry configuration
     * @param reconnectDelayTimeUnit unit of {@code reconnectDelay}
     * @param responseTimeout forwarded to the parent transport
     * @param httpClient client used for both the SSE stream and the POST requests
     * @param authentication optional request authentication, may be {@code null}
     * @param messagingManager source of per-message outbound context (extra headers)
     */
    public SseClientTransport(String serverUrl, String sseEndpointPath, long reconnectDelay, TimeUnit reconnectDelayTimeUnit, int responseTimeout, HttpClient httpClient, HttpRequestAuthentication authentication, MessagingManager messagingManager) {
        super(httpClient, serverUrl, responseTimeout, authentication, messagingManager);
        this.sseEndpointPath = sseEndpointPath;
        this.reconnectDelay = reconnectDelay;
        this.reconnectDelayTimeUnit = reconnectDelayTimeUnit;
    }

    /**
     * Opens the SSE stream and blocks until the {@code endpoint} event has been received.
     * Does nothing when the transport is already open.
     *
     * <p>If opening fails the transport is rolled back to the closed state and the SSE source is
     * closed, so that a later call actually retries instead of returning as if it were connected.
     *
     * @throws ConnectionException if the stream cannot be opened or the message endpoint is not
     *         received in time
     */
    public void open() throws ConnectionException {
        if (!this.opened.compareAndSet(false, true)) {
            return;
        }
        this.messageEndpointFuture = new CompletableFuture<>();
        try {
            String sseUrl = this.parseEndpointUri(this.sseEndpointPath).toString();
            long retryDelayMillis = this.reconnectDelayTimeUnit.toMillis(this.reconnectDelay);
            this.sseSource = this.httpClient.sseSource(SseSourceConfig.builder(sseUrl)
                    .withPreserveHeadersCase(true)
                    .withRetryConfig(new SseRetryConfig(true, retryDelayMillis, true))
                    .withRequestOptions(this.requestOptions)
                    .withRequestCustomizer(request -> {
                        if (this.authentication == null) {
                            return;
                        }
                        try {
                            this.authentication.authenticate(request);
                        }
                        catch (MuleException e) {
                            throw new ModuleException("Found exception applying request authentication", (ErrorTypeDefinition)McpErrorTypes.INTERNAL_ERROR, e);
                        }
                    })
                    .build());
            this.sseSource.register("endpoint", this.onEndpointEvent());
            this.sseSource.register("message", this.onMessageEvent());
            this.sseSource.register(this.onUnknownEvent());
            this.sseSource.doOnConnectionFailure(this::onSseError);
            this.sseSource.open();
        }
        catch (Exception e) {
            this.abortOpen();
            throw new ConnectionException(e);
        }
        try {
            this.awaitMessageEndpoint();
        }
        catch (ConnectionException e) {
            this.abortOpen();
            throw e;
        }
    }

    /**
     * Registers the handler for incoming messages, opening the transport first when no message
     * endpoint has been received yet.
     *
     * @param handler function applied to every message received on the SSE stream
     * @return an empty {@link Mono}, or an error {@link Mono} if opening the transport failed
     */
    public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> handler) {
        if (this.messageEndpoint == null) {
            try {
                this.open();
            }
            catch (Exception e) {
                return Mono.error(e);
            }
        }
        this.handler = handler;
        return Mono.empty();
    }

    /**
     * Marks the transport as closed and closes the SSE source; failures while closing are logged,
     * not propagated.
     *
     * @return an empty {@link Mono}
     */
    public Mono<Void> closeGracefully() {
        this.opened.set(false);
        this.closeSseSource();
        return Mono.empty();
    }

    /**
     * Serializes {@code message} as JSON and POSTs it to the message endpoint.
     *
     * <p>The request is built eagerly; it is sent when the returned {@link Mono} is subscribed.
     *
     * @param message the JSON-RPC message to send
     * @return a {@link Mono} that completes on a 2xx response (or when the transport is already
     *         closed at subscription time) and errors on request-building failures, transport
     *         failures or a non-2xx status
     */
    public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
        HttpRequest request;
        try {
            this.assureMessageTargetReceived();
            HttpRequestBuilder requestBuilder = HttpRequest.builder(true)
                    .uri(this.messageEndpoint)
                    .method(HttpConstants.Method.POST)
                    .addHeader("Content-Type", "application/json")
                    .entity(new ByteArrayHttpEntity(this.objectMapper.writeValueAsBytes(message)));
            this.messagingManager.popOutbound(message).ifPresent(ctx -> requestBuilder.headers(ctx.getAdditionalProperties()));
            if (this.authentication != null) {
                this.authentication.authenticate(requestBuilder);
            }
            request = requestBuilder.build();
        }
        catch (JsonProcessingException e) {
            return Mono.error(new MuleRuntimeException(I18nMessageFactory.createStaticMessage("Error serializing message"), e));
        }
        catch (Throwable e) {
            return Mono.error(e);
        }
        return Mono.create(sink -> {
            if (!this.opened.get()) {
                LOGGER.debug("sendMessage() invoked but transport closed. Ignoring");
                // Always signal the sink: a Mono that never terminates would hang its subscriber.
                sink.success();
                return;
            }
            this.httpClient.sendAsync(request, this.requestOptions).whenComplete((response, e) -> {
                if (e != null) {
                    sink.error(e);
                    return;
                }
                int statusCode = response.getStatusCode();
                String reasonPhrase = response.getReasonPhrase();
                // Release the response on every outcome, not just on success.
                HttpTransportUtils.discardAndClose(response);
                if (statusCode >= 200 && statusCode < 300) {
                    sink.success();
                } else {
                    sink.error(new ModuleException("Unexpected response status: " + statusCode + ". " + reasonPhrase, (ErrorTypeDefinition)McpErrorTypes.SERVER_ERROR));
                }
            });
        });
    }

    /**
     * Rolls back a failed {@link #open()}: flags the transport as closed first (so that failures
     * reported while closing are ignored by {@link #onSseError}) and then closes the SSE source.
     */
    private void abortOpen() {
        this.opened.set(false);
        this.closeSseSource();
        this.sseSource = null;
    }

    /** Closes the SSE source if there is one, logging instead of propagating any failure. */
    private void closeSseSource() {
        SseSource source = this.sseSource;
        if (source == null) {
            return;
        }
        try {
            source.close();
        }
        catch (Exception e) {
            LOGGER.error("Exception found closing sseEventSource", e);
        }
    }

    /**
     * Blocks until the {@code endpoint} event arrives, the SSE connection fails or the timeout
     * elapses.
     *
     * @throws ConnectionException on timeout, interruption, SSE failure or missing endpoint
     */
    private void awaitMessageEndpoint() throws ConnectionException {
        try {
            this.messageEndpointFuture.get(MESSAGE_ENDPOINT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        }
        catch (TimeoutException e) {
            throw new ConnectionException("Timeout waiting for message endpoint", e);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new ConnectionException("Interrupted waiting for message endpoint", e);
        }
        catch (ExecutionException e) {
            throw new ConnectionException("Exception waiting for message endpoint", e.getCause());
        }
        this.assureMessageTargetReceived();
    }

    /**
     * @throws ConnectionException if no message endpoint has been received yet
     */
    private void assureMessageTargetReceived() throws ConnectionException {
        if (this.messageEndpoint == null) {
            throw new ConnectionException("Message endpoint not received");
        }
    }

    /**
     * Connection-failure callback of the SSE source. Ignored once the transport is closed;
     * otherwise the error is logged and fails a pending wait for the message endpoint.
     */
    private void onSseError(SseFailureContext context) {
        if (!this.opened.get()) {
            return;
        }
        LOGGER.error("SSE error found", context.error());
        CompletableFuture<Void> pending = this.messageEndpointFuture;
        if (pending != null) {
            pending.completeExceptionally(context.error());
        }
    }

    /**
     * Resolves {@code uriString} against the server URL. Absolute URIs are returned untouched;
     * otherwise the path is appended to the server URL with exactly one {@code /} in between and
     * the query string, if any, is preserved.
     *
     * <p>The raw (still percent-encoded) path and query are used so that escaped characters
     * survive the round trip through {@link URI#create(String)}.
     *
     * @throws IllegalArgumentException if the input or the resulting URL is not a valid URI
     */
    private URI parseEndpointUri(String uriString) {
        URI maybeEndpoint = URI.create(uriString);
        if (maybeEndpoint.isAbsolute()) {
            return maybeEndpoint;
        }
        String path = maybeEndpoint.getRawPath();
        boolean pathHasSlash = path.startsWith("/");
        if (this.serverUrl.endsWith("/")) {
            if (pathHasSlash) {
                path = path.substring(1);
            }
        } else if (!pathHasSlash) {
            path = "/" + path;
        }
        String url = this.serverUrl + path;
        String query = maybeEndpoint.getRawQuery();
        if (!StringUtils.isBlank(query)) {
            url = url + "?" + query;
        }
        return URI.create(url);
    }

    /**
     * Listener for {@code endpoint} events: stores the announced message endpoint and releases
     * the thread waiting in {@link #awaitMessageEndpoint()}, or fails that wait if the event data
     * cannot be parsed.
     */
    private SseListener onEndpointEvent() {
        return event -> {
            try {
                this.messageEndpoint = this.parseEndpointUri(event.getData());
                this.messageEndpointFuture.complete(null);
            }
            catch (Exception e) {
                // Fail the waiter first so it is released even if logging itself fails.
                this.messageEndpointFuture.completeExceptionally(e);
                LOGGER.error("Error reading endpoint event", e);
            }
        };
    }

    /**
     * Listener for {@code message} events: deserializes the event data and hands it to the
     * connected handler. Events are dropped when the transport is closed or no handler has been
     * registered yet; processing failures are logged and never propagated to the SSE source.
     */
    private SseListener onMessageEvent() {
        return event -> {
            if (!this.opened.get()) {
                LOGGER.debug("received SSE event for path {} but transport closed. Ignoring", (Object)this.sseEndpointPath);
                return;
            }
            Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> currentHandler = this.handler;
            if (currentHandler == null) {
                LOGGER.debug("Received message but client not connected to transport yet. Ignoring");
                return;
            }
            try {
                McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.objectMapper, event.getData());
                currentHandler.apply(Mono.just(message)).subscribe();
            }
            catch (IOException | RuntimeException e) {
                // Unchecked failures (e.g. an unrecognized message shape) must not escape into
                // the SSE dispatcher either.
                LOGGER.error("Exception processing message event", e);
            }
        };
    }

    /** Fallback listener that logs any SSE event without a dedicated listener. */
    private SseListener onUnknownEvent() {
        return event -> LOGGER.error("Received unexpected SSE event type: {}", (Object)event.getName());
    }
}

