/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.connectors.mcp.internal.server.source;

import com.mulesoft.connectors.mcp.internal.MessagingManager;
import com.mulesoft.connectors.mcp.internal.server.config.ServerConfig;
import com.mulesoft.connectors.mcp.internal.server.connection.observer.InboundRequestContext;
import com.mulesoft.connectors.mcp.internal.tracing.TracingUtils;
import io.modelcontextprotocol.server.McpAsyncServer;
import java.util.NoSuchElementException;
import java.util.Optional;
import javax.inject.Inject;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleException;
import org.mule.sdk.api.annotation.param.Config;
import org.mule.sdk.api.annotation.param.Connection;
import org.mule.sdk.api.runtime.source.Source;
import org.mule.sdk.api.runtime.source.SourceCallback;
import org.mule.sdk.api.runtime.source.SourceCallbackContext;
import reactor.core.publisher.MonoSink;

/**
 * Base class for request-listening sources backed by an {@link McpAsyncServer}.
 *
 * <p>On start the server is obtained from the injected {@link ConnectionProvider} and the
 * subclass hook {@link #doStart(SourceCallback)} is invoked; on stop the subclass hook
 * {@link #doStop()} is invoked and the server reference held by this instance is cleared.
 * The class also offers helpers to stash a {@link MonoSink} in a
 * {@link SourceCallbackContext} and read it back later.
 *
 * @param <T> payload type of the messages produced by the source
 * @param <A> attributes type of the messages produced by the source
 */
public abstract class AbstractRequestListener<T, A> extends Source<T, A> {

    /** Name of the callback-context variable under which the response sink is stored. */
    private static final String SINK_CTX_VAR = "_sink";

    @Inject
    protected MessagingManager messagingManager;

    @Config
    protected ServerConfig serverConfig;

    @Connection
    private ConnectionProvider<McpAsyncServer> connectionProvider;

    /** Server obtained in {@link #onStart(SourceCallback)}; {@code null} after {@link #onStop()}. */
    private McpAsyncServer server;

    /**
     * Connects to the server and then delegates to {@link #doStart(SourceCallback)}.
     *
     * @param sourceCallback callback handed through to the subclass
     * @throws MuleException if connecting or the subclass start logic fails
     */
    public final void onStart(SourceCallback<T, A> sourceCallback) throws MuleException {
        this.server = this.connectionProvider.connect();
        this.doStart(sourceCallback);
    }

    /**
     * Subclass start hook, invoked after the server has been obtained.
     *
     * @param sourceCallback callback received in {@link #onStart(SourceCallback)}
     * @throws MuleException if the subclass cannot start
     */
    protected abstract void doStart(SourceCallback<T, A> sourceCallback) throws MuleException;

    /**
     * Stores {@code sink} in {@code ctx} so it can later be retrieved with
     * {@link #getSink(SourceCallbackContext)}.
     *
     * @param sink the sink to remember
     * @param ctx  the callback context that will carry the sink
     * @param <R>  element type of the sink (independent of the class-level type parameters)
     */
    protected <R> void setInContext(MonoSink<R> sink, SourceCallbackContext ctx) {
        ctx.addVariable(SINK_CTX_VAR, sink);
    }

    /**
     * Retrieves the sink previously stored with
     * {@link #setInContext(MonoSink, SourceCallbackContext)}.
     *
     * <p>The element type is chosen by the caller and cannot be verified at runtime; asking
     * for a type different from the one that was stored surfaces later as a
     * {@link ClassCastException} at the point of use.
     *
     * @param ctx the callback context to read from
     * @param <R> element type of the sink expected by the caller
     * @return the stored sink
     * @throws NoSuchElementException if no sink is present in {@code ctx}
     */
    protected <R> MonoSink<R> getSink(SourceCallbackContext ctx) {
        return ctx.<MonoSink<R>>getVariable(SINK_CTX_VAR)
                .orElseThrow(() -> new NoSuchElementException(
                        "No sink found in source callback context variable '" + SINK_CTX_VAR + "'"));
    }

    /**
     * Delegates to {@link #doStop()} and always clears the server reference afterwards,
     * even if the subclass hook throws.
     */
    public final void onStop() {
        try {
            this.doStop();
        } finally {
            // NOTE(review): the server is not handed back via connectionProvider.disconnect(...);
            // only the local reference is dropped. Confirm this is intentional.
            this.server = null;
        }
    }

    /**
     * Pops the pending inbound request context from the {@link MessagingManager} and, when
     * one is present, passes it to {@link TracingUtils#traceInboundRequest} together with the
     * distributed trace context of {@code ctx}.
     *
     * @param ctx the callback context supplying the distributed trace context
     * @return the popped inbound request context, or an empty {@link Optional} if there was none
     */
    protected Optional<InboundRequestContext> getTracedInboundRequestContext(SourceCallbackContext ctx) {
        Optional<InboundRequestContext> inbound = this.messagingManager.popInbound();
        inbound.ifPresent(request ->
                TracingUtils.traceInboundRequest(request, ctx.getDistributedSourceTraceContext()));
        return inbound;
    }

    /** Subclass stop hook, invoked from {@link #onStop()} before the server reference is cleared. */
    protected abstract void doStop();

    /**
     * Returns the server held by this source.
     *
     * @return the server obtained in {@link #onStart(SourceCallback)}, or {@code null} if the
     *         source has not been started or has already been stopped
     */
    protected McpAsyncServer getServer() {
        return this.server;
    }
}

