/*
 * Decompiled with CFR 0.152.
 */
package io.modelcontextprotocol.client.transport;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.modelcontextprotocol.spec.McpClientTransport;
import io.modelcontextprotocol.spec.McpError;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.util.Assert;
import java.io.IOException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.SynchronousSink;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

public class WebFluxSseClientTransport
implements McpClientTransport {
    private static final Logger logger = LoggerFactory.getLogger(WebFluxSseClientTransport.class);
    private static final String MESSAGE_EVENT_TYPE = "message";
    private static final String ENDPOINT_EVENT_TYPE = "endpoint";
    private static final String DEFAULT_SSE_ENDPOINT = "/sse";
    private static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<ServerSentEvent<String>>(){};
    private final WebClient webClient;
    protected ObjectMapper objectMapper;
    private Disposable inboundSubscription;
    private volatile boolean isClosing = false;
    protected final Sinks.One<String> messageEndpointSink = Sinks.one();
    private String sseEndpoint;
    private BiConsumer<Retry.RetrySignal, SynchronousSink<Object>> inboundRetryHandler = (retrySpec, sink) -> {
        if (this.isClosing) {
            logger.debug("SSE connection closed during shutdown");
            sink.error(retrySpec.failure());
            return;
        }
        if (retrySpec.failure() instanceof IOException) {
            logger.debug("Retrying SSE connection after IO error");
            sink.next(retrySpec);
            return;
        }
        logger.error("Fatal SSE error, not retrying: {}", (Object)retrySpec.failure().getMessage());
        sink.error(retrySpec.failure());
    };

    public WebFluxSseClientTransport(WebClient.Builder webClientBuilder) {
        this(webClientBuilder, new ObjectMapper());
    }

    public WebFluxSseClientTransport(WebClient.Builder webClientBuilder, ObjectMapper objectMapper) {
        this(webClientBuilder, objectMapper, DEFAULT_SSE_ENDPOINT);
    }

    public WebFluxSseClientTransport(WebClient.Builder webClientBuilder, ObjectMapper objectMapper, String sseEndpoint) {
        Assert.notNull((Object)objectMapper, (String)"ObjectMapper must not be null");
        Assert.notNull((Object)webClientBuilder, (String)"WebClient.Builder must not be null");
        Assert.hasText((String)sseEndpoint, (String)"SSE endpoint must not be null or empty");
        this.objectMapper = objectMapper;
        this.webClient = webClientBuilder.build();
        this.sseEndpoint = sseEndpoint;
    }

    public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> handler) {
        Flux<ServerSentEvent<String>> events = this.eventStream();
        this.inboundSubscription = events.concatMap(event -> Mono.just((Object)event).handle((e, s) -> {
            if (ENDPOINT_EVENT_TYPE.equals(event.event())) {
                String messageEndpointUri = (String)event.data();
                if (this.messageEndpointSink.tryEmitValue((Object)messageEndpointUri).isSuccess()) {
                    s.complete();
                } else {
                    s.error((Throwable)new McpError((Object)"Failed to handle SSE endpoint event"));
                }
            } else if (MESSAGE_EVENT_TYPE.equals(event.event())) {
                try {
                    McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage((ObjectMapper)this.objectMapper, (String)((String)event.data()));
                    s.next((Object)message);
                }
                catch (IOException ioException) {
                    s.error((Throwable)ioException);
                }
            } else {
                s.error((Throwable)new McpError((Object)("Received unrecognized SSE event type: " + event.event())));
            }
        }).transform(handler)).subscribe();
        return this.messageEndpointSink.asMono().then();
    }

    public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
        return this.messageEndpointSink.asMono().flatMap(messageEndpointUri -> {
            if (this.isClosing) {
                return Mono.empty();
            }
            try {
                String jsonText = this.objectMapper.writeValueAsString((Object)message);
                return ((WebClient.RequestBodySpec)this.webClient.post().uri(messageEndpointUri, new Object[0])).contentType(MediaType.APPLICATION_JSON).bodyValue((Object)jsonText).retrieve().toBodilessEntity().doOnSuccess(response -> logger.debug("Message sent successfully")).doOnError(error -> {
                    if (!this.isClosing) {
                        logger.error("Error sending message: {}", (Object)error.getMessage());
                    }
                });
            }
            catch (IOException e) {
                if (!this.isClosing) {
                    return Mono.error((Throwable)new RuntimeException("Failed to serialize message", e));
                }
                return Mono.empty();
            }
        }).then();
    }

    protected Flux<ServerSentEvent<String>> eventStream() {
        return this.webClient.get().uri(this.sseEndpoint, new Object[0]).accept(new MediaType[]{MediaType.TEXT_EVENT_STREAM}).retrieve().bodyToFlux(SSE_TYPE).retryWhen(Retry.from(retrySignal -> retrySignal.handle(this.inboundRetryHandler)));
    }

    public Mono<Void> closeGracefully() {
        return Mono.fromRunnable(() -> {
            this.isClosing = true;
            if (this.inboundSubscription != null) {
                this.inboundSubscription.dispose();
            }
        }).then().subscribeOn(Schedulers.boundedElastic());
    }

    public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
        return (T)this.objectMapper.convertValue(data, typeRef);
    }

    public static Builder builder(WebClient.Builder webClientBuilder) {
        return new Builder(webClientBuilder);
    }

    public static class Builder {
        private final WebClient.Builder webClientBuilder;
        private String sseEndpoint = "/sse";
        private ObjectMapper objectMapper = new ObjectMapper();

        public Builder(WebClient.Builder webClientBuilder) {
            Assert.notNull((Object)webClientBuilder, (String)"WebClient.Builder must not be null");
            this.webClientBuilder = webClientBuilder;
        }

        public Builder sseEndpoint(String sseEndpoint) {
            Assert.hasText((String)sseEndpoint, (String)"sseEndpoint must not be empty");
            this.sseEndpoint = sseEndpoint;
            return this;
        }

        public Builder objectMapper(ObjectMapper objectMapper) {
            Assert.notNull((Object)objectMapper, (String)"objectMapper must not be null");
            this.objectMapper = objectMapper;
            return this;
        }

        public WebFluxSseClientTransport build() {
            return new WebFluxSseClientTransport(this.webClientBuilder, this.objectMapper, this.sseEndpoint);
        }
    }
}

