/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.connectors.mcp.internal.server.connection.provider;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.mulesoft.connectors.mcp.internal.jackson.ObjectMapperFactory;
import com.mulesoft.connectors.mcp.internal.server.connection.MuleServerSession;
import com.mulesoft.connectors.mcp.internal.server.connection.observer.RpcRequestObserver;
import com.mulesoft.connectors.mcp.internal.server.connection.observer.SessionObserver;
import com.mulesoft.connectors.mcp.internal.server.session.SessionManager;
import com.mulesoft.connectors.mcp.internal.util.HttpTransportUtils;
import io.modelcontextprotocol.spec.McpServerSession;
import io.modelcontextprotocol.spec.McpServerTransportProvider;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.http.api.HttpConstants;
import org.mule.runtime.http.api.server.async.HttpResponseReadyCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Base class for MCP server transport providers.
 *
 * <p>It owns the open/closed lifecycle flag, holds the optional session and RPC request
 * observers together with the session factory, and offers helpers to broadcast a
 * notification to, or gracefully close, every session returned by
 * {@link SessionManager#getLocalSessions()}. Subclasses supply the transport-specific work
 * through {@link #doOpen()} and {@link #doCloseGracefully()}.
 */
public abstract class BaseServerTransportProvider implements McpServerTransportProvider {
    private static final Logger LOGGER = LoggerFactory.getLogger(BaseServerTransportProvider.class);

    /** Shared mapper obtained from {@link ObjectMapperFactory#get()}. */
    protected static final ObjectMapper objectMapper = ObjectMapperFactory.get();

    /** {@code true} between a successful {@link #open()} and the next {@link #closeGracefully()}. */
    protected final AtomicBoolean open = new AtomicBoolean(false);

    /** Name of the owning config; used in log and error messages. */
    protected final String refName;

    /** Source of the sessions that are notified and closed by this provider. */
    protected final SessionManager sessionManager;

    private McpServerSession.Factory sessionFactory;
    private SessionObserver sessionObserver = null;
    private RpcRequestObserver requestObserver = null;

    /**
     * @param refName        name of the config this provider belongs to
     * @param sessionManager manager used to enumerate and close sessions
     */
    public BaseServerTransportProvider(String refName, SessionManager sessionManager) {
        this.refName = refName;
        this.sessionManager = sessionManager;
    }

    /**
     * Registers the session observer, replacing any previous one.
     *
     * @param observer the observer to store; exposed through {@link #getSessionObserver()}
     */
    public void onNewSession(SessionObserver observer) {
        this.sessionObserver = observer;
    }

    /**
     * Registers the RPC request observer, replacing any previous one.
     *
     * @param observer the observer to store; exposed through {@link #getRequestObserver()}
     */
    public void onRPCRequest(RpcRequestObserver observer) {
        this.requestObserver = observer;
    }

    /**
     * Opens the transport if it is not already open.
     *
     * <p>Calling this on an already open provider only logs at debug level. If
     * {@link #doOpen()} throws, the provider is rolled back through
     * {@link #closeGracefully()} before the failure is reported.
     *
     * @throws ConnectionException wrapping any exception thrown by {@link #doOpen()}
     */
    public final void open() throws ConnectionException {
        if (!this.open.compareAndSet(false, true)) {
            LOGGER.debug("Transport provider of config '{}' already open", this.refName);
            return;
        }
        try {
            this.doOpen();
        } catch (Exception e) {
            // closeGracefully() only assembles a lazy Mono: without a subscription the
            // rollback (doCloseGracefully / sessionManager.close) would never execute.
            this.closeGracefully().subscribe(
                    unused -> { },
                    closeError -> LOGGER.error("Error while rolling back failed open of config '{}'",
                            this.refName, closeError));
            throw new ConnectionException("Exception initializing transport for config " + this.refName, e);
        }
    }

    /** Performs the transport-specific initialization; invoked by {@link #open()}. */
    protected abstract void doOpen();

    /**
     * Marks the provider as closed and returns a {@link Mono} that, once subscribed, closes
     * every local session and then releases the transport.
     *
     * <p>The open flag is flipped as soon as this method is called, not at subscription
     * time. If the provider was already closed an empty {@code Mono} is returned. Errors
     * raised while closing an individual session are logged and skipped so the remaining
     * sessions are still closed.
     *
     * @return a {@code Mono} completing after sessions are closed and the transport released
     */
    public final Mono<Void> closeGracefully() {
        if (!this.open.compareAndSet(true, false)) {
            LOGGER.debug("Transport provider of config '{}' already closed", this.refName);
            return Mono.empty();
        }
        Collection<MuleServerSession> localSessions = this.sessionManager.getLocalSessions();
        LOGGER.debug("Initiating graceful shutdown of config '{}', notifying {} active sessions",
                this.refName, localSessions.size());
        return Flux.fromIterable(localSessions)
                .flatMap(session -> session.closeGracefully().onErrorResume(e -> {
                    LOGGER.error("Error while closing session {} of config '{}'", session.getId(), this.refName, e);
                    return Mono.empty();
                }))
                .then()
                // then() yields a Mono<Void>, which never emits onNext, so a doOnNext hook
                // would be dead code. doOnTerminate runs on completion and on error; the
                // latter matters because the open flag is already reset, so a second call
                // could not retry the cleanup.
                .doOnTerminate(this::releaseTransport);
    }

    /** Closes the transport and session manager and drops both observers; never throws from doCloseGracefully. */
    private void releaseTransport() {
        try {
            this.doCloseGracefully();
        } catch (Exception e) {
            LOGGER.error("Error while closing transport provider of config '{}'", this.refName, e);
        } finally {
            this.sessionManager.close();
            this.sessionObserver = null;
            this.requestObserver = null;
        }
    }

    /** Performs the transport-specific shutdown; invoked once the local sessions have been closed. */
    protected abstract void doCloseGracefully();

    /**
     * Sends a notification to every local session.
     *
     * <p>A failure for one session is logged (message only) and does not prevent delivery
     * to the others.
     *
     * @param method notification method name forwarded to each session
     * @param params notification payload forwarded to each session
     * @return a {@code Mono} completing once every send has finished; empty when there are no sessions
     */
    public Mono<Void> notifyClients(String method, Object params) {
        Collection<MuleServerSession> localSessions = this.sessionManager.getLocalSessions();
        if (localSessions.isEmpty()) {
            LOGGER.debug("No active sessions to broadcast message to");
            return Mono.empty();
        }
        LOGGER.debug("Attempting to broadcast message to {} active sessions of config '{}'",
                localSessions.size(), this.refName);
        return Flux.fromIterable(localSessions)
                .flatMap(session -> session.sendNotification(method, params).onErrorResume(e -> {
                    LOGGER.error("Failed to send message to session {} of config {}: {}",
                            session.getId(), this.refName, e.getMessage());
                    return Mono.empty();
                }))
                .then();
    }

    /**
     * Checks that the transport is open, answering the request with 503 when it is not.
     *
     * @param responseCallback callback used to send the "Service stopped" response
     * @return {@code true} if the transport is open; {@code false} if the request was rejected
     */
    protected boolean assureTransportOpen(HttpResponseReadyCallback responseCallback) {
        if (!this.open.get()) {
            LOGGER.info("Received message but transport stopped. Rejecting...");
            HttpTransportUtils.sendHttpResponse(
                    HttpConstants.HttpStatus.SERVICE_UNAVAILABLE.getStatusCode(), "Service stopped", responseCallback);
            return false;
        }
        return true;
    }

    /**
     * Stores the factory used to create server sessions.
     *
     * @param sessionFactory the factory returned later by {@link #getSessionFactory()}
     */
    public void setSessionFactory(McpServerSession.Factory sessionFactory) {
        this.sessionFactory = sessionFactory;
    }

    /** @return the registered session observer, or empty if none is set */
    protected Optional<SessionObserver> getSessionObserver() {
        return Optional.ofNullable(this.sessionObserver);
    }

    /** @return the registered RPC request observer, or empty if none is set */
    protected Optional<RpcRequestObserver> getRequestObserver() {
        return Optional.ofNullable(this.requestObserver);
    }

    /** @return the session factory, or {@code null} if {@link #setSessionFactory} has not been called */
    protected McpServerSession.Factory getSessionFactory() {
        return this.sessionFactory;
    }
}

