Compare commits

..

14 Commits
0.3.1 ... main

Author SHA1 Message Date
Sunrisea f24f12c4ba
Support service metadata, update nacos-sdk-python version (#22) 2025-08-08 11:00:09 +08:00
Sunrisea cf9477af7b
Update and optimize log (#19)
* update log

* version 1.0.8
2025-07-29 12:00:01 +08:00
Sunrisea 5ab63b71cd
version 1.0.7, update nacos-maintainer-sdk-python to 0.1.2 (#17)
* Fix bug

* version 1.0.7, update nacos-maintainer-sdk-python to 0.1.2
2025-07-17 21:04:52 +08:00
Sunrisea 291949b579
optimize register logic (#16) 2025-07-17 20:11:42 +08:00
Sunrisea 8aa1fe43f9
update nacos-sdk-python to version 2.0.7 (#15) 2025-07-11 16:08:26 +08:00
Sunrisea 710139761a
Update mcp version logic and fix bug (#14)
* Optimize version logic and fix bug

* update version to 1.0.3

* update readme
2025-07-02 19:10:56 +08:00
Sunrisea dce1cfb011
Fix bug and release version 1.0.2 (#13) 2025-07-01 20:44:53 +08:00
Sunrisea 3bf041460b
支持注册PORT 自定义 (#10) 2025-06-11 13:53:07 +08:00
Sunrisea 34442de68f
update README.md (#8) 2025-06-04 15:56:00 +08:00
Sunrisea 2950ad4fe6
fix dependency (#7) 2025-06-04 15:17:53 +08:00
Sunrisea c29e002875
Support Nacos 3.0.1 and streamable-http (#6) 2025-06-04 14:51:44 +08:00
fly 75f7543e46
Enhanced examples: (#1)
- Standardized comments in examples to pure English for consistency.
- Added a minimalist example [example/nacos_simple_example.py] to simplify user implementation.
- Included an SSE port configuration demo to facilitate fixing port conflicts.
2025-05-16 13:44:21 +08:00
Sunrisea 345571626a fix stdio transport protocol bug 2025-05-14 10:37:21 +08:00
Sunrisea 32e5f468f8 0.3.1 2025-05-08 11:39:46 +08:00
14 changed files with 776 additions and 203 deletions

View File

@ -8,6 +8,7 @@ Nacos-mcp-wrapper-python is a sdk that helps you quickly register your Mcp Serve
## Installation ## Installation
### Environment ### Environment
Starting from version 1.0.0 of nacos-mcp-wrapper-python, the Nacos server version must be greater than 3.0.1.
- python >=3.10 - python >=3.10
### Use pip ### Use pip
```bash ```bash
@ -43,7 +44,10 @@ from nacos_mcp_wrapper.server.nacos_settings import NacosSettings
# mcp = FastMCP("Demo") # mcp = FastMCP("Demo")
nacos_settings = NacosSettings() nacos_settings = NacosSettings()
nacos_settings.SERVER_ADDR = "<nacos_server_addr> e.g.127.0.0.1:8848" nacos_settings.SERVER_ADDR = "<nacos_server_addr> e.g.127.0.0.1:8848"
mcp = NacosMCP("nacos-mcp-python",nacos_settings=nacos_settings) mcp = NacosMCP("nacos-mcp-python", nacos_settings=nacos_settings,
port=18001,
instructions="This is a simple Nacos MCP server",
version="1.0.0")
# Add an addition tool # Add an addition tool
@ -192,6 +196,7 @@ async def list_tools() -> list[types.Tool]:
async def run(): async def run():
await server.register_to_nacos("stdio")
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await server.run( await server.run(
read_stream, read_stream,

View File

@ -9,6 +9,7 @@ Nacos 一个更易于构建云原生应用的动态服务发现、配置管理
## 安装 ## 安装
### 环境要求 ### 环境要求
nacos-mcp-wrapper-python 1.0.0及以上版本要求Nacos Sever版本 > 3.0.1
- python >=3.10 - python >=3.10
### 使用 PIP ### 使用 PIP
```bash ```bash
@ -44,7 +45,10 @@ from nacos_mcp_wrapper.server.nacos_settings import NacosSettings
# mcp = FastMCP("Demo") # mcp = FastMCP("Demo")
nacos_settings = NacosSettings() nacos_settings = NacosSettings()
nacos_settings.SERVER_ADDR = "<nacos_server_addr> e.g.127.0.0.1:8848" nacos_settings.SERVER_ADDR = "<nacos_server_addr> e.g.127.0.0.1:8848"
mcp = NacosMCP("nacos-mcp-python",nacos_settings=nacos_settings) mcp = NacosMCP("nacos-mcp-python", nacos_settings=nacos_settings,
port=18001,
instructions="This is a simple Nacos MCP server",
version="1.0.0")
# Add an addition tool # Add an addition tool
@ -191,6 +195,7 @@ async def list_tools() -> list[types.Tool]:
async def run(): async def run():
await server.register_to_nacos("stdio")
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await server.run( await server.run(
read_stream, read_stream,

View File

@ -1,34 +1,35 @@
from nacos_mcp_wrapper.server.nacos_mcp import NacosMCP from nacos_mcp_wrapper.server.nacos_mcp import NacosMCP
from nacos_mcp_wrapper.server.nacos_settings import NacosSettings from nacos_mcp_wrapper.server.nacos_settings import NacosSettings
# Create an MCP server # Create an MCP server instance
# mcp = FastMCP("Demo")
nacos_settings = NacosSettings() nacos_settings = NacosSettings()
nacos_settings.SERVER_ADDR = "<nacos_server_addr> e.g. 127.0.0.1:8848" nacos_settings.SERVER_ADDR = "127.0.0.1:8848" # <nacos_server_addr> e.g. 127.0.0.1:8848
mcp = NacosMCP("nacos-mcp-python",nacos_settings=nacos_settings) nacos_settings.USERNAME=""
# Add an addition tool nacos_settings.PASSWORD=""
mcp = NacosMCP("nacos-mcp-python", nacos_settings=nacos_settings,
port=18001,
instructions="This is a simple Nacos MCP server",
version="1.0.0")
# Register an addition tool
@mcp.tool() @mcp.tool()
def add(a: int, b: int) -> int: def add(a: int, b: int) -> int:
"""Add two numbers""" """Add two integers together"""
return a + b return a + b
# Register a subtraction tool
@mcp.tool() @mcp.tool()
def minus(a: int, b: int) -> int: def minus(a: int, b: int) -> int:
"""Subtract two numbers""" """Subtract two numbers"""
return a - b return a - b
# Register a prompt function
@mcp.prompt() @mcp.prompt()
def get_prompt(topic: str) -> str: def get_prompt(topic: str) -> str:
"""Get a personalized greeting""" """Get a personalized greeting"""
return f"Hello, {topic}!" return f"Hello, {topic}!"
@mcp.resource("greeting://{name}") # Register a dynamic resource endpoint
def get_resource(name: str) -> str:
"""Get a file"""
return f"Hello, {name}!"
# Add a dynamic greeting resource
@mcp.resource("greeting://{name}") @mcp.resource("greeting://{name}")
def get_greeting(name: str) -> str: def get_greeting(name: str) -> str:
"""Get a personalized greeting""" """Get a personalized greeting"""
@ -37,5 +38,5 @@ def get_greeting(name: str) -> str:
if __name__ == "__main__": if __name__ == "__main__":
try: try:
mcp.run(transport="sse") mcp.run(transport="sse")
except ValueError as e: except Exception as e:
print(f"运行时发生错误: {e}") print(f"Runtime error: {e}")

View File

@ -22,16 +22,19 @@ async def fetch_website(
@click.command() @click.command()
@click.option("--port", default=8000, help="Port to listen on for SSE") @click.option("--port", default=18002, help="Port to listen on for SSE")
@click.option("--server_addr", default="127.0.0.1:8848", help="Nacos server address")
@click.option( @click.option(
"--transport", "--transport",
type=click.Choice(["stdio", "sse"]), type=click.Choice(["stdio", "sse"]),
default="sse", default="sse",
help="Transport type", help="Transport type",
) )
def main(port: int, transport: str) -> int: def main(port: int, transport: str, server_addr: str) -> int:
nacos_settings = NacosSettings() nacos_settings = NacosSettings()
nacos_settings.SERVER_ADDR = "<nacos_server_addr> e.g. 127.0.0.1:8848" nacos_settings.SERVER_ADDR = server_addr
nacos_settings.USERNAME = ""
nacos_settings.PASSWORD = ""
# app = Server("mcp-website-fetcher") # app = Server("mcp-website-fetcher")
app = NacosServer("mcp-website-fetcher",nacos_settings=nacos_settings) app = NacosServer("mcp-website-fetcher",nacos_settings=nacos_settings)
@ -76,7 +79,7 @@ def main(port: int, transport: str) -> int:
async with sse.connect_sse( async with sse.connect_sse(
request.scope, request.receive, request._send request.scope, request.receive, request._send
) as streams: ) as streams:
# 0 进 1出 # 0 input stream, 1 output stream
await app.run( await app.run(
streams[0], streams[1], app.create_initialization_options() streams[0], streams[1], app.create_initialization_options()
) )

View File

@ -0,0 +1,30 @@
import click
from nacos_mcp_wrapper.server.nacos_mcp import NacosMCP
from nacos_mcp_wrapper.server.nacos_settings import NacosSettings
from datetime import datetime
@click.command()
@click.option("--port", default=18003, help="Port to listen on for SSE")
@click.option("--name", default="nacos-simple-mcp", help="The name of the MCP service")
@click.option("--server_addr", default="127.0.0.1:8848", help="Nacos server address")
def main(port: int, name: str, server_addr: str):
# Registration settings for Nacos
nacos_settings = NacosSettings()
nacos_settings.SERVER_ADDR = server_addr
nacos_settings.USERNAME = ""
nacos_settings.PASSWORD = ""
mcp = NacosMCP(name=name, nacos_settings=nacos_settings, port=port)
@mcp.tool()
def get_datetime() -> str:
"""Get current datetime as string"""
return datetime.now().isoformat() # 返回字符串格式的时间
try:
mcp.run(transport="sse")
except ValueError as e:
print(f"Runtime errors: {e}")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,105 @@
"""
In-memory event store for demonstrating resumability functionality.
This is a simple implementation intended for examples and testing,
not for production use where a persistent storage solution would be more appropriate.
"""
import logging
from collections import deque
from dataclasses import dataclass
from uuid import uuid4
from mcp.server.streamable_http import (
EventCallback,
EventId,
EventMessage,
EventStore,
StreamId,
)
from mcp.types import JSONRPCMessage
logger = logging.getLogger(__name__)
@dataclass
class EventEntry:
"""
Represents an event entry in the event store.
"""
event_id: EventId
stream_id: StreamId
message: JSONRPCMessage
class InMemoryEventStore(EventStore):
"""
Simple in-memory implementation of the EventStore interface for resumability.
This is primarily intended for examples and testing, not for production use
where a persistent storage solution would be more appropriate.
This implementation keeps only the last N events per stream for memory efficiency.
"""
def __init__(self, max_events_per_stream: int = 100):
"""Initialize the event store.
Args:
max_events_per_stream: Maximum number of events to keep per stream
"""
self.max_events_per_stream = max_events_per_stream
# for maintaining last N events per stream
self.streams: dict[StreamId, deque[EventEntry]] = {}
# event_id -> EventEntry for quick lookup
self.event_index: dict[EventId, EventEntry] = {}
async def store_event(
self, stream_id: StreamId, message: JSONRPCMessage
) -> EventId:
"""Stores an event with a generated event ID."""
event_id = str(uuid4())
event_entry = EventEntry(
event_id=event_id, stream_id=stream_id, message=message
)
# Get or create deque for this stream
if stream_id not in self.streams:
self.streams[stream_id] = deque(maxlen=self.max_events_per_stream)
# If deque is full, the oldest event will be automatically removed
# We need to remove it from the event_index as well
if len(self.streams[stream_id]) == self.max_events_per_stream:
oldest_event = self.streams[stream_id][0]
self.event_index.pop(oldest_event.event_id, None)
# Add new event
self.streams[stream_id].append(event_entry)
self.event_index[event_id] = event_entry
return event_id
async def replay_events_after(
self,
last_event_id: EventId,
send_callback: EventCallback,
) -> StreamId | None:
"""Replays events that occurred after the specified event ID."""
if last_event_id not in self.event_index:
logger.warning(f"Event ID {last_event_id} not found in store")
return None
# Get the stream and find events after the last one
last_event = self.event_index[last_event_id]
stream_id = last_event.stream_id
stream_events = self.streams.get(last_event.stream_id, deque())
# Events in deque are already in chronological order
found_last = False
for event in stream_events:
if found_last:
await send_callback(EventMessage(event.message, event.event_id))
elif event.event_id == last_event_id:
found_last = True
return stream_id

View File

@ -0,0 +1,170 @@
import asyncio
import contextlib
import logging
from collections.abc import AsyncIterator
import anyio
import mcp.types as types
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from pydantic import AnyUrl
from starlette.applications import Starlette
from starlette.routing import Mount
from starlette.types import Receive, Scope, Send
from nacos_mcp_wrapper.server.nacos_server import NacosServer
from nacos_mcp_wrapper.server.nacos_settings import NacosSettings
from event_store import InMemoryEventStore
# Configure logging
logger = logging.getLogger(__name__)
async def main(
log_level: str="INFO",
json_response: bool=True,
port: int=7001,
) -> int:
# Configure logging
logging.basicConfig(
level=getattr(logging, log_level.upper()),
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
nacos_settings = NacosSettings()
nacos_settings.SERVER_ADDR = "127.0.0.1:8848" # <nacos_server_addr> e.g. 127.0.0.1:8848
nacos_settings.USERNAME = ""
nacos_settings.PASSWORD = ""
app = NacosServer("mcp-streamable-http-demo",nacos_settings=nacos_settings)
@app.call_tool()
async def call_tool(
name: str, arguments: dict
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
ctx = app.request_context
interval = arguments.get("interval", 1.0)
count = arguments.get("count", 5)
caller = arguments.get("caller", "unknown")
# Send the specified number of notifications with the given interval
for i in range(count):
# Include more detailed message for resumability demonstration
notification_msg = (
f"[{i+1}/{count}] Event from '{caller}' - "
f"Use Last-Event-ID to resume if disconnected"
)
await ctx.session.send_log_message(
level="info",
data=notification_msg,
logger="notification_stream",
# Associates this notification with the original request
# Ensures notifications are sent to the correct response stream
# Without this, notifications will either go to:
# - a standalone SSE stream (if GET request is supported)
# - nowhere (if GET request isn't supported)
related_request_id=ctx.request_id,
)
logger.debug(f"Sent notification {i+1}/{count} for caller: {caller}")
if i < count - 1: # Don't wait after the last notification
await anyio.sleep(interval)
# This will send a resource notificaiton though standalone SSE
# established by GET request
await ctx.session.send_resource_updated(uri=AnyUrl("http:///test_resource"))
return [
types.TextContent(
type="text",
text=(
f"Sent {count} notifications with {interval}s interval"
f" for caller: {caller}"
),
)
]
@app.list_tools()
async def list_tools() -> list[types.Tool]:
return [
types.Tool(
name="start-notification-stream",
description=(
"Sends a stream of notifications with configurable count"
" and interval"
),
inputSchema={
"type": "object",
"required": ["interval", "count", "caller"],
"properties": {
"interval": {
"type": "number",
"description": "Interval between notifications in seconds",
},
"count": {
"type": "number",
"description": "Number of notifications to send",
},
"caller": {
"type": "string",
"description": (
"Identifier of the caller to include in notifications"
),
},
},
},
)
]
# Create event store for resumability
# The InMemoryEventStore enables resumability support for StreamableHTTP transport.
# It stores SSE events with unique IDs, allowing clients to:
# 1. Receive event IDs for each SSE message
# 2. Resume streams by sending Last-Event-ID in GET requests
# 3. Replay missed events after reconnection
# Note: This in-memory implementation is for demonstration ONLY.
# For production, use a persistent storage solution.
event_store = InMemoryEventStore()
# Create the session manager with our app and event store
session_manager = StreamableHTTPSessionManager(
app=app,
event_store=event_store, # Enable resumability
json_response=json_response,
)
# ASGI handler for streamable HTTP connections
async def handle_streamable_http(
scope: Scope, receive: Receive, send: Send
) -> None:
await session_manager.handle_request(scope, receive, send)
@contextlib.asynccontextmanager
async def lifespan(app: Starlette) -> AsyncIterator[None]:
"""Context manager for managing session manager lifecycle."""
async with session_manager.run():
logger.info("Application started with StreamableHTTP session manager!")
try:
yield
finally:
logger.info("Application shutting down...")
# Create an ASGI application using the transport
await app.register_to_nacos("streamable-http", port, "/mcp")
starlette_app = Starlette(
debug=True,
routes=[
Mount("/mcp", app=handle_streamable_http),
],
lifespan=lifespan,
)
import uvicorn
config = uvicorn.Config(
starlette_app,
host="127.0.0.1",
port=port,
)
server = uvicorn.Server(config)
await server.serve()
return 0
asyncio.run(main())

View File

@ -1,29 +0,0 @@
from pydantic import BaseModel
class ServiceRef(BaseModel):
namespaceId: str
groupName: str
serviceName: str
class RemoteServerConfig(BaseModel):
serviceRef: ServiceRef | None = None
exportPath: str = None
class MCPServerInfo(BaseModel):
protocol: str
name: str
description: str | None = None
version: str | None = None
enabled: bool = True
remoteServerConfig: RemoteServerConfig | None = None
localServerConfig: dict | None = None
toolsDescriptionRef: str | None = None
promptDescriptionRef: str | None = None
resourceDescriptionRef: str | None = None
class ToolMeta(BaseModel):
enabled: bool = True

View File

@ -4,33 +4,47 @@ from typing import Any
import uvicorn import uvicorn
from mcp import stdio_server from mcp import stdio_server
from mcp.server import FastMCP from mcp.server import FastMCP
from mcp.server.auth.provider import OAuthAuthorizationServerProvider
from mcp.server.fastmcp.server import lifespan_wrapper from mcp.server.fastmcp.server import lifespan_wrapper
from mcp.server.fastmcp.tools import Tool
from mcp.server.lowlevel.server import lifespan as default_lifespan from mcp.server.lowlevel.server import lifespan as default_lifespan
from mcp.server.streamable_http import EventStore
from nacos_mcp_wrapper.server.nacos_server import NacosServer from nacos_mcp_wrapper.server.nacos_server import NacosServer
from nacos_mcp_wrapper.server.nacos_settings import NacosSettings from nacos_mcp_wrapper.server.nacos_settings import NacosSettings
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class NacosMCP(FastMCP): class NacosMCP(FastMCP):
def __init__(self, def __init__(self,
name: str | None = None, name: str | None = None,
nacos_settings: NacosSettings | None = None, nacos_settings: NacosSettings | None = None,
instructions: str | None = None, instructions: str | None = None,
**settings: Any): auth_server_provider: OAuthAuthorizationServerProvider[
super().__init__(name, instructions, **settings) Any, Any, Any]
| None = None,
event_store: EventStore | None = None,
*,
tools: list[Tool] | None = None,
version: str | None = None,
**settings: Any,
):
if "host" not in settings:
settings["host"] = "0.0.0.0"
super().__init__(name, instructions, auth_server_provider, event_store,
tools=tools, **settings)
self._mcp_server = NacosServer( self._mcp_server = NacosServer(
nacos_settings=nacos_settings, nacos_settings=nacos_settings,
name=name or "FastMCP", name=name or "FastMCP",
instructions=instructions, instructions=instructions,
version=version,
lifespan=lifespan_wrapper(self, self.settings.lifespan) lifespan=lifespan_wrapper(self, self.settings.lifespan)
if self.settings.lifespan if self.settings.lifespan
else default_lifespan, else default_lifespan,
) )
self.dependencies = self.settings.dependencies
# Set up MCP protocol handlers # Set up MCP protocol handlers
self._setup_handlers() self._setup_handlers()
@ -44,10 +58,11 @@ class NacosMCP(FastMCP):
self._mcp_server.create_initialization_options(), self._mcp_server.create_initialization_options(),
) )
async def run_sse_async(self) -> None: async def run_sse_async(self, mount_path: str | None = None) -> None:
"""Run the server using SSE transport.""" """Run the server using SSE transport."""
starlette_app = self.sse_app() starlette_app = self.sse_app(mount_path)
await self._mcp_server.register_to_nacos("sse", self.settings.port, self.settings.sse_path) await self._mcp_server.register_to_nacos("sse", self.settings.port,
self.settings.sse_path)
config = uvicorn.Config( config = uvicorn.Config(
starlette_app, starlette_app,
host=self.settings.host, host=self.settings.host,
@ -57,3 +72,19 @@ class NacosMCP(FastMCP):
server = uvicorn.Server(config) server = uvicorn.Server(config)
await server.serve() await server.serve()
async def run_streamable_http_async(self) -> None:
"""Run the server using StreamableHTTP transport."""
import uvicorn
starlette_app = self.streamable_http_app()
await self._mcp_server.register_to_nacos("streamable-http",
self.settings.port,
self.settings.streamable_http_path)
config = uvicorn.Config(
starlette_app,
host=self.settings.host,
port=self.settings.port,
log_level=self.settings.log_level.lower(),
)
server = uvicorn.Server(config)
await server.serve()

View File

@ -1,24 +1,38 @@
import asyncio
import json import json
import logging import logging
from contextlib import AbstractAsyncContextManager, asynccontextmanager from contextlib import AbstractAsyncContextManager
from typing import Literal, Callable, AsyncIterator, Any from typing import Literal, Callable, Any
from importlib import metadata
import jsonref import jsonref
from maintainer.ai.model.nacos_mcp_info import McpToolMeta, McpServerDetailInfo, \
McpTool, McpServiceRef, McpToolSpecification, McpServerBasicInfo, \
McpEndpointSpec, McpServerRemoteServiceConfig
from maintainer.ai.model.registry_mcp_info import ServerVersionDetail
from maintainer.ai.nacos_mcp_service import NacosAIMaintainerService
from maintainer.common.ai_maintainer_client_config_builder import \
AIMaintainerClientConfigBuilder
from mcp import types, Tool from mcp import types, Tool
from mcp.server import Server from mcp.server import Server
from mcp.server.lowlevel.server import LifespanResultT from mcp.server.lowlevel.server import LifespanResultT, RequestT
from mcp.server.lowlevel.server import lifespan from mcp.server.lowlevel.server import lifespan
from v2.nacos import NacosConfigService, ConfigParam, \
NacosNamingService, RegisterInstanceParam, ClientConfigBuilder
from nacos_mcp_wrapper.server.mcp_server_info import MCPServerInfo, ServiceRef, \ from v2.nacos import NacosNamingService, RegisterInstanceParam, \
RemoteServerConfig ClientConfigBuilder, NacosException
from nacos_mcp_wrapper.server.nacos_settings import NacosSettings from nacos_mcp_wrapper.server.nacos_settings import NacosSettings
from nacos_mcp_wrapper.server.utils import get_first_non_loopback_ip, \ from nacos_mcp_wrapper.server.utils import get_first_non_loopback_ip, \
ConfigSuffix, jsonref_default jsonref_default, compare, pkg_version
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
TRANSPORT_MAP = {
"stdio": "stdio",
"sse": "mcp-sse",
"streamable-http": "mcp-streamable",
}
class NacosServer(Server): class NacosServer(Server):
def __init__( def __init__(
self, self,
@ -27,26 +41,44 @@ class NacosServer(Server):
version: str | None = None, version: str | None = None,
instructions: str | None = None, instructions: str | None = None,
lifespan: Callable[ lifespan: Callable[
[Server[LifespanResultT]], AbstractAsyncContextManager[ [Server[LifespanResultT, RequestT]],
LifespanResultT] AbstractAsyncContextManager[LifespanResultT],
] = lifespan, ] = lifespan,
): ):
if version is None:
version = pkg_version("mcp")
super().__init__(name, version, instructions, lifespan) super().__init__(name, version, instructions, lifespan)
if nacos_settings == None: if nacos_settings == None:
nacos_settings = NacosSettings() nacos_settings = NacosSettings()
if nacos_settings.SERVICE_NAMESPACE == "": if nacos_settings.NAMESPACE == "":
nacos_settings.SERVICE_NAMESPACE = "public" nacos_settings.NAMESPACE = "public"
self._nacos_settings = nacos_settings self._nacos_settings = nacos_settings
if self._nacos_settings.SERVICE_IP is None: if self._nacos_settings.SERVICE_IP is None:
self._nacos_settings.SERVICE_IP = get_first_non_loopback_ip() self._nacos_settings.SERVICE_IP = get_first_non_loopback_ip()
ai_client_config_builder = AIMaintainerClientConfigBuilder()
ai_client_config_builder.server_address(
self._nacos_settings.SERVER_ADDR).access_key(
self._nacos_settings.ACCESS_KEY).secret_key(
self._nacos_settings.SECRET_KEY).username(
self._nacos_settings.USERNAME).password(
self._nacos_settings.PASSWORD).app_conn_labels(
self._nacos_settings.APP_CONN_LABELS)
if self._nacos_settings.CREDENTIAL_PROVIDER is not None:
ai_client_config_builder.credentials_provider(
self._nacos_settings.CREDENTIAL_PROVIDER)
self._ai_client_config = ai_client_config_builder.build()
naming_client_config_builder = ClientConfigBuilder() naming_client_config_builder = ClientConfigBuilder()
naming_client_config_builder.server_address( naming_client_config_builder.server_address(
self._nacos_settings.SERVER_ADDR).endpoint( self._nacos_settings.SERVER_ADDR).namespace_id(
self._nacos_settings.SERVER_ENDPOINT).namespace_id( self._nacos_settings.NAMESPACE).access_key(
self._nacos_settings.SERVICE_NAMESPACE).access_key(
self._nacos_settings.ACCESS_KEY).secret_key( self._nacos_settings.ACCESS_KEY).secret_key(
self._nacos_settings.SECRET_KEY).username( self._nacos_settings.SECRET_KEY).username(
self._nacos_settings.USERNAME).password( self._nacos_settings.USERNAME).password(
@ -59,25 +91,8 @@ class NacosServer(Server):
self._naming_client_config = naming_client_config_builder.build() self._naming_client_config = naming_client_config_builder.build()
config_client_config_builder = ClientConfigBuilder()
config_client_config_builder.server_address(
self._nacos_settings.SERVER_ADDR).endpoint(
self._nacos_settings.SERVER_ENDPOINT).namespace_id(
"nacos-default-mcp").access_key(
self._nacos_settings.ACCESS_KEY).secret_key(
self._nacos_settings.SECRET_KEY).username(
self._nacos_settings.USERNAME).password(
self._nacos_settings.PASSWORD).app_conn_labels(
self._nacos_settings.APP_CONN_LABELS)
if self._nacos_settings.CREDENTIAL_PROVIDER is not None:
config_client_config_builder.credentials_provider(
self._nacos_settings.CREDENTIAL_PROVIDER)
self._config_client_config = config_client_config_builder.build()
self._tmp_tools: dict[str, Tool] = {} self._tmp_tools: dict[str, Tool] = {}
self._tools_meta = {} self._tools_meta: dict[str, McpToolMeta] = {}
self._tmp_tools_list_handler = None self._tmp_tools_list_handler = None
async def _list_tmp_tools(self) -> list[Tool]: async def _list_tmp_tools(self) -> list[Tool]:
@ -93,38 +108,43 @@ class NacosServer(Server):
] ]
def is_tool_enabled(self, tool_name: str) -> bool: def is_tool_enabled(self, tool_name: str) -> bool:
if self._tools_meta is None:
return True
if tool_name in self._tools_meta: if tool_name in self._tools_meta:
if "enabled" in self._tools_meta[tool_name]: mcp_tool_meta = self._tools_meta[tool_name]
if not self._tools_meta[tool_name]["enabled"]: if mcp_tool_meta.enabled is not None:
if not mcp_tool_meta.enabled:
return False return False
return True return True
async def tool_list_listener(self, tenant_id: str, group_id: str, def update_tools(self,server_detail_info:McpServerDetailInfo):
data_id: str, content: str):
self.update_local_tools(content)
def update_local_tools(self,nacos_tools:str):
def update_args_description(_local_args:dict[str, Any], _nacos_args:dict[str, Any]): def update_args_description(_local_args:dict[str, Any], _nacos_args:dict[str, Any]):
for key, value in _local_args.items(): for key, value in _local_args.items():
if key in _nacos_args and "description" in _nacos_args[key]: if key in _nacos_args and "description" in _nacos_args[key]:
_local_args[key]["description"] = _nacos_args[key][ _local_args[key]["description"] = _nacos_args[key][
"description"] "description"]
nacos_tools_dict = json.loads(nacos_tools) tool_spec = server_detail_info.toolSpec
if "toolsMeta" in nacos_tools_dict: if tool_spec is None:
self._tools_meta = nacos_tools_dict["toolsMeta"]
if "tools" not in nacos_tools_dict:
return return
for nacos_tool in nacos_tools_dict["tools"]: if tool_spec.toolsMeta is None:
if nacos_tool["name"] in self._tmp_tools: self._tools_meta = {}
local_tool = self._tmp_tools[nacos_tool["name"]] else:
if "description" in nacos_tool: self._tools_meta = tool_spec.toolsMeta
local_tool.description = nacos_tool["description"] if tool_spec.tools is None:
return
for tool in tool_spec.tools:
if tool.name in self._tmp_tools:
local_tool = self._tmp_tools[tool.name]
if tool.description is not None:
local_tool.description = tool.description
local_args = local_tool.inputSchema["properties"] local_args = local_tool.inputSchema["properties"]
nacos_args = nacos_tool["inputSchema"]["properties"] nacos_args = tool.inputSchema["properties"]
update_args_description(local_args, nacos_args) update_args_description(local_args, nacos_args)
break continue
async def init_tools_tmp(self): async def init_tools_tmp(self):
_tmp_tools = await self.request_handlers[ _tmp_tools = await self.request_handlers[
@ -141,105 +161,227 @@ class NacosServer(Server):
resolved_data = json.loads(resolved_data) resolved_data = json.loads(resolved_data)
tool.inputSchema = resolved_data tool.inputSchema = resolved_data
def check_tools_compatible(self,server_detail_info:McpServerDetailInfo) -> bool:
if (server_detail_info.toolSpec is None
or server_detail_info.toolSpec.tools is None or len(server_detail_info.toolSpec.tools) == 0):
return True
tools_spec = server_detail_info.toolSpec
tools_in_nacos = {}
for tool in tools_spec.tools:
tools_in_nacos[tool.name] = tool
tools_in_local = {}
for name,tool in self._tmp_tools.items():
tools_in_local[name] = McpTool(
name=tool.name,
description=tool.description,
inputSchema=tool.inputSchema,
)
if tools_in_nacos.keys() != tools_in_local.keys():
return False
for name,tool in tools_in_nacos.items():
str_tools_in_nacos = tool.model_dump_json(exclude_none=True)
str_tools_in_local = tools_in_local[name].model_dump_json(exclude_none=True)
if not compare(str_tools_in_nacos, str_tools_in_local):
return False
return True
def check_compatible(self,server_detail_info:McpServerDetailInfo) -> (bool,str):
if server_detail_info.version != self.version:
return False, f"version not compatible, local version:{self.version}, remote version:{server_detail_info.version}"
if server_detail_info.protocol != self.type:
return False, f"protocol not compatible, local protocol:{self.type}, remote protocol:{server_detail_info.protocol}"
if types.ListToolsRequest in self.request_handlers:
checkToolsResult = self.check_tools_compatible(server_detail_info)
if not checkToolsResult:
return False , f"tools not compatible, local tools:{self._tmp_tools}, remote tools:{server_detail_info.toolSpec}"
mcp_service_ref = server_detail_info.remoteServerConfig.serviceRef
is_same_service,error_msg = self.is_service_ref_same(mcp_service_ref)
if not is_same_service:
return False, error_msg
return True, ""
def is_service_ref_same(self,mcp_service_ref:McpServiceRef) -> (bool,str):
if self._nacos_settings.SERVICE_NAME is not None and self._nacos_settings.SERVICE_NAME != mcp_service_ref.serviceName:
return False, f"service name not compatible, local service name:{self._nacos_settings.SERVICE_NAME}, remote service name:{mcp_service_ref.serviceName}"
if self._nacos_settings.SERVICE_GROUP is not None and self._nacos_settings.SERVICE_GROUP != mcp_service_ref.groupName:
return False, f"group name not compatible, local group name:{self._nacos_settings.SERVICE_GROUP}, remote group name:{mcp_service_ref.groupName}"
if mcp_service_ref.namespaceId != self._nacos_settings.NAMESPACE:
return False, f"namespace id not compatible, local namespace id:{self._nacos_settings.NAMESPACE}, remote namespace id:{mcp_service_ref.namespaceId}"
return True, ""
def get_register_service_name(self) -> str:
if self._nacos_settings.SERVICE_NAME is not None:
return self._nacos_settings.SERVICE_NAME
else:
return self.name + "::" + self.version
async def subscribe(self):
while True:
try:
await asyncio.sleep(30)
except asyncio.TimeoutError:
logging.debug("Timeout occurred")
except asyncio.CancelledError:
return
try:
server_detail_info = await self.mcp_service.get_mcp_server_detail(
self._nacos_settings.NAMESPACE,
self.name,
self.version
)
if server_detail_info is not None:
self.update_tools(server_detail_info)
except Exception as e:
logging.info(
f"can not found McpServer info from nacos,{self.name},version:{self.version}")
async def register_to_nacos(self, async def register_to_nacos(self,
transport: Literal["stdio", "sse"] = "stdio", transport: Literal["stdio", "sse","streamable-http"] = "stdio",
port: int = 8000, port: int = 8000,
path: str = "/sse"): path: str = "/sse"):
try: try:
config_client = await NacosConfigService.create_config_service( self.type = TRANSPORT_MAP.get(transport, None)
self._config_client_config) self.mcp_service = await NacosAIMaintainerService.create_mcp_service(
self._ai_client_config
mcp_tools_data_id = self.name + ConfigSuffix.TOOLS.value )
mcp_servers_data_id = self.name + ConfigSuffix.MCP_SERVER.value self.naming_client = await NacosNamingService.create_naming_service(
self._naming_client_config)
server_detail_info = None
try:
server_detail_info = await self.mcp_service.get_mcp_server_detail(
self._nacos_settings.NAMESPACE,
self.name,
self.version
)
except Exception as e:
logging.info(f"can not found McpServer info from nacos,{self.name},version:{self.version}")
if types.ListToolsRequest in self.request_handlers: if types.ListToolsRequest in self.request_handlers:
await self.init_tools_tmp() await self.init_tools_tmp()
self.list_tools()(self._list_tmp_tools) self.list_tools()(self._list_tmp_tools)
nacos_tools = await config_client.get_config(ConfigParam( if server_detail_info is not None:
data_id=mcp_tools_data_id, group="mcp-tools" is_compatible, error_msg = self.check_compatible(server_detail_info)
)) if not is_compatible:
if nacos_tools is not None and nacos_tools != "": logging.error(f"mcp server info is not compatible,{self.name},version:{self.version},reason:{error_msg}")
self.update_local_tools(nacos_tools) raise NacosException(
_tmp_tools = await self.request_handlers[ f"mcp server info is not compatible,{self.name},version:{self.version},reason:{error_msg}"
types.ListToolsRequest]( )
self) if types.ListToolsRequest in self.request_handlers:
tools_dict = _tmp_tools.model_dump( self.update_tools(server_detail_info)
by_alias=True, mode="json", exclude_none=True asyncio.create_task(self.subscribe())
) if self._nacos_settings.SERVICE_REGISTER and (self.type == "mcp-sse"
tools_dict["toolsMeta"] = self._tools_meta or self.type == "mcp-streamable"):
await config_client.publish_config(ConfigParam( version = metadata.version('nacos-mcp-wrapper-python')
data_id=mcp_tools_data_id, group="mcp-tools", service_meta_data = {
content=json.dumps(tools_dict, indent=2) "source": f"nacos-mcp-wrapper-python-{version}",
)) **self._nacos_settings.SERVICE_META_DATA}
self.list_tools()(self._list_tmp_tools) await self.naming_client.register_instance(
await config_client.add_listener(mcp_tools_data_id, "mcp-tools",
self.tool_list_listener)
server_info_content = await config_client.get_config(ConfigParam(
data_id=mcp_servers_data_id, group="mcp-server"
))
server_description = self.name
if self.instructions is not None:
server_description = self.instructions
if server_info_content is not None and server_info_content != "":
server_info_dict = json.loads(server_info_content)
if "description" in server_info_dict:
server_description = server_info_dict["description"]
if transport == "stdio":
mcp_server_info = MCPServerInfo(
protocol="local",
name=self.name,
description=server_description,
version=self.version,
toolsDescriptionRef=mcp_tools_data_id,
)
mcp_server_info_dict = mcp_server_info.model_dump(
by_alias=True, mode="json", exclude_none=True
)
await config_client.publish_config(ConfigParam(
data_id=mcp_servers_data_id, group="mcp-server",
content=json.dumps(mcp_server_info_dict, indent=2)
))
elif transport == "sse":
if self._nacos_settings.SERVICE_REGISTER:
naming_client = await NacosNamingService.create_naming_service(
self._naming_client_config)
await naming_client.register_instance(
request=RegisterInstanceParam( request=RegisterInstanceParam(
group_name=self._nacos_settings.SERVICE_GROUP, group_name=server_detail_info.remoteServerConfig.serviceRef.groupName,
service_name=self.name + "-mcp-service", service_name=server_detail_info.remoteServerConfig.serviceRef.serviceName,
ip=self._nacos_settings.SERVICE_IP, ip=self._nacos_settings.SERVICE_IP,
port=port, port=self._nacos_settings.SERVICE_PORT if self._nacos_settings.SERVICE_PORT else port,
ephemeral=self._nacos_settings.SERVICE_EPHEMERAL, ephemeral=self._nacos_settings.SERVICE_EPHEMERAL,
metadata=service_meta_data
) )
) )
mcp_server_info = MCPServerInfo( logging.info(f"Register to nacos success,{self.name},version:{self.version}")
protocol="mcp-sse", return
name=self.name,
description=self.instructions, mcp_tool_specification = None
version=self.version, if types.ListToolsRequest in self.request_handlers:
remoteServerConfig=RemoteServerConfig( tool_spec = [
serviceRef=ServiceRef( McpTool(
namespaceId=self._nacos_settings.SERVICE_NAMESPACE, name=tool.name,
serviceName=self.name + "-mcp-service", description=tool.description,
groupName=self._nacos_settings.SERVICE_GROUP inputSchema=tool.inputSchema,
), )
exportPath=path, for tool in list(self._tmp_tools.values())
), ]
toolsDescriptionRef=mcp_tools_data_id, mcp_tool_specification = McpToolSpecification(
tools=tool_spec
) )
mcp_server_info_dict = mcp_server_info.model_dump(
by_alias=True, mode="json", exclude_none=True server_version_detail = ServerVersionDetail()
server_version_detail.version = self.version
server_basic_info = McpServerBasicInfo()
server_basic_info.name = self.name
server_basic_info.versionDetail = server_version_detail
server_basic_info.description = self.instructions or self.name
endpoint_spec = McpEndpointSpec()
if self.type == "stdio":
server_basic_info.protocol = self.type
server_basic_info.frontProtocol = self.type
else:
endpoint_spec.type = "REF"
data = {
"serviceName": self.get_register_service_name(),
"groupName": "DEFAULT_GROUP" if self._nacos_settings.SERVICE_GROUP is None else self._nacos_settings.SERVICE_GROUP,
"namespaceId": self._nacos_settings.NAMESPACE,
}
endpoint_spec.data = data
remote_server_config_info = McpServerRemoteServiceConfig()
remote_server_config_info.exportPath = path
server_basic_info.remoteServerConfig = remote_server_config_info
server_basic_info.protocol = self.type
server_basic_info.frontProtocol = self.type
try:
await self.mcp_service.create_mcp_server(self._nacos_settings.NAMESPACE,
self.name,
server_basic_info,
mcp_tool_specification,
endpoint_spec)
except Exception as e:
logger.info(f"Found MCP server {self.name} in Nacos,try to update it")
version_detail = None
try:
version_detail = await self.mcp_service.get_mcp_server_detail(
self._nacos_settings.NAMESPACE,
self.name,
self.version
)
except Exception as e_2:
logger.info(f" Version {self.version} of Mcp server {self.name} is not in Nacos, try to update it")
if version_detail is None:
await self.mcp_service.update_mcp_server(
self._nacos_settings.NAMESPACE,
self.name,
True,
server_basic_info,
mcp_tool_specification,
endpoint_spec
)
else:
_is_compatible,error_msg = self.check_compatible(version_detail)
if not _is_compatible:
logging.error(f"mcp server info is not compatible,{self.name},version:{self.version},reason:{error_msg}")
raise NacosException(
f"mcp server info is not compatible,{self.name},version:{self.version},reason:{error_msg}"
)
if self._nacos_settings.SERVICE_REGISTER:
version = metadata.version('nacos-mcp-wrapper-python')
service_meta_data = {"source": f"nacos-mcp-wrapper-python-{version}",**self._nacos_settings.SERVICE_META_DATA}
await self.naming_client.register_instance(
request=RegisterInstanceParam(
group_name="DEFAULT_GROUP" if self._nacos_settings.SERVICE_GROUP is None else self._nacos_settings.SERVICE_GROUP,
service_name=self.get_register_service_name(),
ip=self._nacos_settings.SERVICE_IP,
port=self._nacos_settings.SERVICE_PORT if self._nacos_settings.SERVICE_PORT else port,
ephemeral=self._nacos_settings.SERVICE_EPHEMERAL,
metadata=service_meta_data,
)
) )
await config_client.publish_config(ConfigParam( asyncio.create_task(self.subscribe())
data_id=mcp_servers_data_id, group="mcp-server", logging.info(f"Register to nacos success,{self.name},version:{self.version}")
content=json.dumps(mcp_server_info_dict, indent=2)
))
except Exception as e: except Exception as e:
logging.error(f"Failed to register MCP server to Nacos: {e}") logging.error(f"Failed to register MCP server to Nacos: {e}")

View File

@ -11,10 +11,6 @@ class NacosSettings(BaseSettings):
description="nacos server address", description="nacos server address",
default="127.0.0.1:8848") default="127.0.0.1:8848")
SERVER_ENDPOINT : Optional[str] = Field(
description="nacos server endpoint",
default=None)
SERVICE_REGISTER : bool = Field( SERVICE_REGISTER : bool = Field(
description="whether to register service to nacos", description="whether to register service to nacos",
default=True) default=True)
@ -23,18 +19,26 @@ class NacosSettings(BaseSettings):
description="whether to register service as ephemeral", description="whether to register service as ephemeral",
default=True) default=True)
SERVICE_NAMESPACE : str = Field( NAMESPACE : str = Field(
description="nacos service namespace", description="nacos namespace",
default="public") default="public")
SERVICE_GROUP : str = Field( SERVICE_GROUP : Optional[str] = Field(
description="nacos service group", description="nacos service group",
default="DEFAULT_GROUP") default=None)
SERVICE_NAME : Optional[str] = Field(
description="nacos service name",
default=None)
SERVICE_IP : Optional[str] = Field( SERVICE_IP : Optional[str] = Field(
description="nacos service ip", description="nacos service ip",
default=None) default=None)
SERVICE_PORT : Optional[int] = Field(
description="nacos service port",
default=None)
USERNAME : Optional[str] = Field( USERNAME : Optional[str] = Field(
description="nacos username for authentication", description="nacos username for authentication",
default=None) default=None)
@ -59,6 +63,10 @@ class NacosSettings(BaseSettings):
description="nacos connection labels", description="nacos connection labels",
default={}) default={})
SERVICE_META_DATA : Optional[dict] = Field(
description="nacos service metadata",
default={})
class Config: class Config:
env_prefix = "NACOS_MCP_SERVER_" env_prefix = "NACOS_MCP_SERVER_"

View File

@ -2,6 +2,7 @@ import asyncio
import socket import socket
import threading import threading
from enum import Enum from enum import Enum
import json
import jsonref import jsonref
import psutil import psutil
@ -14,6 +15,16 @@ def get_first_non_loopback_ip():
return addr.address return addr.address
return None return None
def pkg_version(package: str) -> str:
try:
from importlib.metadata import version
return version(package)
except Exception:
pass
return "1.0.0"
def jsonref_default(obj): def jsonref_default(obj):
if isinstance(obj, jsonref.JsonRef): if isinstance(obj, jsonref.JsonRef):
return obj.__subject__ return obj.__subject__
@ -24,4 +35,94 @@ class ConfigSuffix(Enum):
TOOLS = "-mcp-tools.json" TOOLS = "-mcp-tools.json"
PROMPTS = "-mcp-prompt.json" PROMPTS = "-mcp-prompt.json"
RESOURCES = "-mcp-resource.json" RESOURCES = "-mcp-resource.json"
MCP_SERVER = "-mcp-server.json" MCP_SERVER = "-mcp-server.json"
def compare(origin: str, target: str) -> bool:
try:
origin_node = json.loads(origin)
target_node = json.loads(target)
return compare_nodes(origin_node, target_node)
except Exception as e:
print(e)
return False
def compare_nodes(origin_node, target_node) -> bool:
if origin_node is None and target_node is None:
return True
if origin_node is None or target_node is None:
return False
origin_properties = origin_node.get("properties")
target_properties = target_node.get("properties")
if (origin_properties is None and target_properties is not None) or (
origin_properties is not None and target_properties is None
):
return False
if origin_properties is not None and target_properties is not None:
# 遍历原始 properties
for key, value_node in origin_properties.items():
if not isinstance(value_node, dict):
continue # 只处理 object 类型
type_node = value_node.get("type")
if not isinstance(type_node, str):
continue
type_ = type_node
if key not in target_properties:
return False
target_value_node = target_properties[key]
target_type_node = target_value_node.get("type")
target_type = target_type_node if isinstance(target_type_node, str) else ""
if type_ != target_type:
return False
# 如果是 object 类型,递归比较
if type_ == "object":
if not compare_nodes(value_node, target_value_node):
return False
# 如果是 array 类型,比较 items 内容
elif type_ == "array":
origin_items = value_node.get("items")
target_items = target_value_node.get("items")
if origin_items is not None and target_items is not None:
if not compare_nodes(origin_items, target_items):
return False
# 检查新增字段
for key in target_properties:
if key not in origin_properties:
return False
origin_required = origin_node.get("required")
target_required = target_node.get("required")
if origin_required is not None and target_required is not None:
if not isinstance(origin_required, list) or not isinstance(target_required, list):
return False
if len(origin_required) != len(target_required):
return False
origin_set = set()
for node in origin_required:
if isinstance(node, str):
origin_set.add(node)
else:
return False
target_set = set()
for node in target_required:
if isinstance(node, str):
target_set.add(node)
else:
return False
return origin_set == target_set
else:
return origin_required is None and target_required is None

View File

@ -1,8 +1,9 @@
psutil==7.0.0 psutil==7.0.0
anyio==4.9.0 anyio==4.9.0
mcp==1.6.0 mcp==1.9.2
nacos-sdk-python==2.0.4 nacos-sdk-python>=2.0.9
pydantic==2.11.3 pydantic==2.11.3
pydantic-settings==2.9.1 pydantic-settings==2.9.1
jsonref==1.1.0 jsonref==1.1.0
uvicorn==0.34.2 uvicorn==0.34.2
nacos-maintainer-sdk-python>=0.1.2

View File

@ -9,7 +9,7 @@ def read_requirements():
setup( setup(
name='nacos-mcp-wrapper-python', name='nacos-mcp-wrapper-python',
version='0.3.0', # 项目的版本号 version='1.0.9', # 项目的版本号
packages=find_packages( packages=find_packages(
exclude=["test", "*.tests", "*.tests.*", "tests.*", "tests"]), # 自动发现所有包 exclude=["test", "*.tests", "*.tests.*", "tests.*", "tests"]), # 自动发现所有包
url="https://github.com/nacos-group/nacos-mcp-wrapper-python", url="https://github.com/nacos-group/nacos-mcp-wrapper-python",