Compare commits

...

14 Commits
0.3.1 ... main

Author SHA1 Message Date
Sunrisea f24f12c4ba
Support service metadata, update nacos-sdk-python version (#22) 2025-08-08 11:00:09 +08:00
Sunrisea cf9477af7b
Update and optimize log (#19)
* update log

* version 1.0.8
2025-07-29 12:00:01 +08:00
Sunrisea 5ab63b71cd
version 1.0.7, update nacos-maintainer-sdk-python to 0.1.2 (#17)
* Fix bug

* version 1.0.7, update nacos-maintainer-sdk-python to 0.1.2
2025-07-17 21:04:52 +08:00
Sunrisea 291949b579
optimize register logic (#16) 2025-07-17 20:11:42 +08:00
Sunrisea 8aa1fe43f9
update nacos-sdk-python to version 2.0.7 (#15) 2025-07-11 16:08:26 +08:00
Sunrisea 710139761a
Update mcp version logic and fix bug (#14)
* Optimize version logic and fix bug

* update version to 1.0.3

* update readme
2025-07-02 19:10:56 +08:00
Sunrisea dce1cfb011
Fix bug and release version 1.0.2 (#13) 2025-07-01 20:44:53 +08:00
Sunrisea 3bf041460b
支持注册PORT 自定义 (#10) 2025-06-11 13:53:07 +08:00
Sunrisea 34442de68f
update README.md (#8) 2025-06-04 15:56:00 +08:00
Sunrisea 2950ad4fe6
fix dependency (#7) 2025-06-04 15:17:53 +08:00
Sunrisea c29e002875
Support Nacos 3.0.1 and streamable-http (#6) 2025-06-04 14:51:44 +08:00
fly 75f7543e46
Enhanced examples: (#1)
- Standardized comments in examples to pure English for consistency.
- Added a minimalist example [example/nacos_simple_example.py] to simplify user implementation.
- Included an SSE port configuration demo to facilitate fixing port conflicts.
2025-05-16 13:44:21 +08:00
Sunrisea 345571626a fix stdio transport protocol bug 2025-05-14 10:37:21 +08:00
Sunrisea 32e5f468f8 0.3.1 2025-05-08 11:39:46 +08:00
14 changed files with 776 additions and 203 deletions

View File

@ -8,6 +8,7 @@ Nacos-mcp-wrapper-python is a sdk that helps you quickly register your Mcp Serve
## Installation
### Environment
Starting from version 1.0.0 of nacos-mcp-wrapper-python, the Nacos server version must be greater than 3.0.1.
- python >=3.10
### Use pip
```bash
@ -43,7 +44,10 @@ from nacos_mcp_wrapper.server.nacos_settings import NacosSettings
# mcp = FastMCP("Demo")
nacos_settings = NacosSettings()
nacos_settings.SERVER_ADDR = "<nacos_server_addr> e.g.127.0.0.1:8848"
mcp = NacosMCP("nacos-mcp-python",nacos_settings=nacos_settings)
mcp = NacosMCP("nacos-mcp-python", nacos_settings=nacos_settings,
port=18001,
instructions="This is a simple Nacos MCP server",
version="1.0.0")
# Add an addition tool
@ -192,6 +196,7 @@ async def list_tools() -> list[types.Tool]:
async def run():
await server.register_to_nacos("stdio")
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,

View File

@ -9,6 +9,7 @@ Nacos 一个更易于构建云原生应用的动态服务发现、配置管理
## 安装
### 环境要求
nacos-mcp-wrapper-python 1.0.0及以上版本要求Nacos Sever版本 > 3.0.1
- python >=3.10
### 使用 PIP
```bash
@ -44,7 +45,10 @@ from nacos_mcp_wrapper.server.nacos_settings import NacosSettings
# mcp = FastMCP("Demo")
nacos_settings = NacosSettings()
nacos_settings.SERVER_ADDR = "<nacos_server_addr> e.g.127.0.0.1:8848"
mcp = NacosMCP("nacos-mcp-python",nacos_settings=nacos_settings)
mcp = NacosMCP("nacos-mcp-python", nacos_settings=nacos_settings,
port=18001,
instructions="This is a simple Nacos MCP server",
version="1.0.0")
# Add an addition tool
@ -191,6 +195,7 @@ async def list_tools() -> list[types.Tool]:
async def run():
await server.register_to_nacos("stdio")
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,

View File

@ -1,34 +1,35 @@
from nacos_mcp_wrapper.server.nacos_mcp import NacosMCP
from nacos_mcp_wrapper.server.nacos_settings import NacosSettings
# Create an MCP server
# mcp = FastMCP("Demo")
# Create an MCP server instance
nacos_settings = NacosSettings()
nacos_settings.SERVER_ADDR = "<nacos_server_addr> e.g. 127.0.0.1:8848"
mcp = NacosMCP("nacos-mcp-python",nacos_settings=nacos_settings)
# Add an addition tool
nacos_settings.SERVER_ADDR = "127.0.0.1:8848" # <nacos_server_addr> e.g. 127.0.0.1:8848
nacos_settings.USERNAME=""
nacos_settings.PASSWORD=""
mcp = NacosMCP("nacos-mcp-python", nacos_settings=nacos_settings,
port=18001,
instructions="This is a simple Nacos MCP server",
version="1.0.0")
# Register an addition tool
@mcp.tool()
def add(a: int, b: int) -> int:
"""Add two numbers"""
"""Add two integers together"""
return a + b
# Register a subtraction tool
@mcp.tool()
def minus(a: int, b: int) -> int:
"""Subtract two numbers"""
return a - b
# Register a prompt function
@mcp.prompt()
def get_prompt(topic: str) -> str:
"""Get a personalized greeting"""
return f"Hello, {topic}!"
@mcp.resource("greeting://{name}")
def get_resource(name: str) -> str:
"""Get a file"""
return f"Hello, {name}!"
# Add a dynamic greeting resource
# Register a dynamic resource endpoint
@mcp.resource("greeting://{name}")
def get_greeting(name: str) -> str:
"""Get a personalized greeting"""
@ -37,5 +38,5 @@ def get_greeting(name: str) -> str:
if __name__ == "__main__":
try:
mcp.run(transport="sse")
except ValueError as e:
print(f"运行时发生错误: {e}")
except Exception as e:
print(f"Runtime error: {e}")

View File

@ -22,16 +22,19 @@ async def fetch_website(
@click.command()
@click.option("--port", default=8000, help="Port to listen on for SSE")
@click.option("--port", default=18002, help="Port to listen on for SSE")
@click.option("--server_addr", default="127.0.0.1:8848", help="Nacos server address")
@click.option(
"--transport",
type=click.Choice(["stdio", "sse"]),
default="sse",
help="Transport type",
)
def main(port: int, transport: str) -> int:
def main(port: int, transport: str, server_addr: str) -> int:
nacos_settings = NacosSettings()
nacos_settings.SERVER_ADDR = "<nacos_server_addr> e.g. 127.0.0.1:8848"
nacos_settings.SERVER_ADDR = server_addr
nacos_settings.USERNAME = ""
nacos_settings.PASSWORD = ""
# app = Server("mcp-website-fetcher")
app = NacosServer("mcp-website-fetcher",nacos_settings=nacos_settings)
@ -76,7 +79,7 @@ def main(port: int, transport: str) -> int:
async with sse.connect_sse(
request.scope, request.receive, request._send
) as streams:
# 0 进 1出
# 0 input stream, 1 output stream
await app.run(
streams[0], streams[1], app.create_initialization_options()
)

View File

@ -0,0 +1,30 @@
import click
from nacos_mcp_wrapper.server.nacos_mcp import NacosMCP
from nacos_mcp_wrapper.server.nacos_settings import NacosSettings
from datetime import datetime
@click.command()
@click.option("--port", default=18003, help="Port to listen on for SSE")
@click.option("--name", default="nacos-simple-mcp", help="The name of the MCP service")
@click.option("--server_addr", default="127.0.0.1:8848", help="Nacos server address")
def main(port: int, name: str, server_addr: str):
# Registration settings for Nacos
nacos_settings = NacosSettings()
nacos_settings.SERVER_ADDR = server_addr
nacos_settings.USERNAME = ""
nacos_settings.PASSWORD = ""
mcp = NacosMCP(name=name, nacos_settings=nacos_settings, port=port)
@mcp.tool()
def get_datetime() -> str:
"""Get current datetime as string"""
return datetime.now().isoformat() # 返回字符串格式的时间
try:
mcp.run(transport="sse")
except ValueError as e:
print(f"Runtime errors: {e}")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,105 @@
"""
In-memory event store for demonstrating resumability functionality.
This is a simple implementation intended for examples and testing,
not for production use where a persistent storage solution would be more appropriate.
"""
import logging
from collections import deque
from dataclasses import dataclass
from uuid import uuid4
from mcp.server.streamable_http import (
EventCallback,
EventId,
EventMessage,
EventStore,
StreamId,
)
from mcp.types import JSONRPCMessage
logger = logging.getLogger(__name__)
@dataclass
class EventEntry:
"""
Represents an event entry in the event store.
"""
event_id: EventId
stream_id: StreamId
message: JSONRPCMessage
class InMemoryEventStore(EventStore):
"""
Simple in-memory implementation of the EventStore interface for resumability.
This is primarily intended for examples and testing, not for production use
where a persistent storage solution would be more appropriate.
This implementation keeps only the last N events per stream for memory efficiency.
"""
def __init__(self, max_events_per_stream: int = 100):
"""Initialize the event store.
Args:
max_events_per_stream: Maximum number of events to keep per stream
"""
self.max_events_per_stream = max_events_per_stream
# for maintaining last N events per stream
self.streams: dict[StreamId, deque[EventEntry]] = {}
# event_id -> EventEntry for quick lookup
self.event_index: dict[EventId, EventEntry] = {}
async def store_event(
self, stream_id: StreamId, message: JSONRPCMessage
) -> EventId:
"""Stores an event with a generated event ID."""
event_id = str(uuid4())
event_entry = EventEntry(
event_id=event_id, stream_id=stream_id, message=message
)
# Get or create deque for this stream
if stream_id not in self.streams:
self.streams[stream_id] = deque(maxlen=self.max_events_per_stream)
# If deque is full, the oldest event will be automatically removed
# We need to remove it from the event_index as well
if len(self.streams[stream_id]) == self.max_events_per_stream:
oldest_event = self.streams[stream_id][0]
self.event_index.pop(oldest_event.event_id, None)
# Add new event
self.streams[stream_id].append(event_entry)
self.event_index[event_id] = event_entry
return event_id
async def replay_events_after(
self,
last_event_id: EventId,
send_callback: EventCallback,
) -> StreamId | None:
"""Replays events that occurred after the specified event ID."""
if last_event_id not in self.event_index:
logger.warning(f"Event ID {last_event_id} not found in store")
return None
# Get the stream and find events after the last one
last_event = self.event_index[last_event_id]
stream_id = last_event.stream_id
stream_events = self.streams.get(last_event.stream_id, deque())
# Events in deque are already in chronological order
found_last = False
for event in stream_events:
if found_last:
await send_callback(EventMessage(event.message, event.event_id))
elif event.event_id == last_event_id:
found_last = True
return stream_id

View File

@ -0,0 +1,170 @@
import asyncio
import contextlib
import logging
from collections.abc import AsyncIterator
import anyio
import mcp.types as types
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from pydantic import AnyUrl
from starlette.applications import Starlette
from starlette.routing import Mount
from starlette.types import Receive, Scope, Send
from nacos_mcp_wrapper.server.nacos_server import NacosServer
from nacos_mcp_wrapper.server.nacos_settings import NacosSettings
from event_store import InMemoryEventStore
# Configure logging
logger = logging.getLogger(__name__)
async def main(
log_level: str="INFO",
json_response: bool=True,
port: int=7001,
) -> int:
# Configure logging
logging.basicConfig(
level=getattr(logging, log_level.upper()),
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
nacos_settings = NacosSettings()
nacos_settings.SERVER_ADDR = "127.0.0.1:8848" # <nacos_server_addr> e.g. 127.0.0.1:8848
nacos_settings.USERNAME = ""
nacos_settings.PASSWORD = ""
app = NacosServer("mcp-streamable-http-demo",nacos_settings=nacos_settings)
@app.call_tool()
async def call_tool(
name: str, arguments: dict
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
ctx = app.request_context
interval = arguments.get("interval", 1.0)
count = arguments.get("count", 5)
caller = arguments.get("caller", "unknown")
# Send the specified number of notifications with the given interval
for i in range(count):
# Include more detailed message for resumability demonstration
notification_msg = (
f"[{i+1}/{count}] Event from '{caller}' - "
f"Use Last-Event-ID to resume if disconnected"
)
await ctx.session.send_log_message(
level="info",
data=notification_msg,
logger="notification_stream",
# Associates this notification with the original request
# Ensures notifications are sent to the correct response stream
# Without this, notifications will either go to:
# - a standalone SSE stream (if GET request is supported)
# - nowhere (if GET request isn't supported)
related_request_id=ctx.request_id,
)
logger.debug(f"Sent notification {i+1}/{count} for caller: {caller}")
if i < count - 1: # Don't wait after the last notification
await anyio.sleep(interval)
# This will send a resource notificaiton though standalone SSE
# established by GET request
await ctx.session.send_resource_updated(uri=AnyUrl("http:///test_resource"))
return [
types.TextContent(
type="text",
text=(
f"Sent {count} notifications with {interval}s interval"
f" for caller: {caller}"
),
)
]
@app.list_tools()
async def list_tools() -> list[types.Tool]:
return [
types.Tool(
name="start-notification-stream",
description=(
"Sends a stream of notifications with configurable count"
" and interval"
),
inputSchema={
"type": "object",
"required": ["interval", "count", "caller"],
"properties": {
"interval": {
"type": "number",
"description": "Interval between notifications in seconds",
},
"count": {
"type": "number",
"description": "Number of notifications to send",
},
"caller": {
"type": "string",
"description": (
"Identifier of the caller to include in notifications"
),
},
},
},
)
]
# Create event store for resumability
# The InMemoryEventStore enables resumability support for StreamableHTTP transport.
# It stores SSE events with unique IDs, allowing clients to:
# 1. Receive event IDs for each SSE message
# 2. Resume streams by sending Last-Event-ID in GET requests
# 3. Replay missed events after reconnection
# Note: This in-memory implementation is for demonstration ONLY.
# For production, use a persistent storage solution.
event_store = InMemoryEventStore()
# Create the session manager with our app and event store
session_manager = StreamableHTTPSessionManager(
app=app,
event_store=event_store, # Enable resumability
json_response=json_response,
)
# ASGI handler for streamable HTTP connections
async def handle_streamable_http(
scope: Scope, receive: Receive, send: Send
) -> None:
await session_manager.handle_request(scope, receive, send)
@contextlib.asynccontextmanager
async def lifespan(app: Starlette) -> AsyncIterator[None]:
"""Context manager for managing session manager lifecycle."""
async with session_manager.run():
logger.info("Application started with StreamableHTTP session manager!")
try:
yield
finally:
logger.info("Application shutting down...")
# Create an ASGI application using the transport
await app.register_to_nacos("streamable-http", port, "/mcp")
starlette_app = Starlette(
debug=True,
routes=[
Mount("/mcp", app=handle_streamable_http),
],
lifespan=lifespan,
)
import uvicorn
config = uvicorn.Config(
starlette_app,
host="127.0.0.1",
port=port,
)
server = uvicorn.Server(config)
await server.serve()
return 0
asyncio.run(main())

View File

@ -1,29 +0,0 @@
from pydantic import BaseModel
class ServiceRef(BaseModel):
namespaceId: str
groupName: str
serviceName: str
class RemoteServerConfig(BaseModel):
serviceRef: ServiceRef | None = None
exportPath: str = None
class MCPServerInfo(BaseModel):
protocol: str
name: str
description: str | None = None
version: str | None = None
enabled: bool = True
remoteServerConfig: RemoteServerConfig | None = None
localServerConfig: dict | None = None
toolsDescriptionRef: str | None = None
promptDescriptionRef: str | None = None
resourceDescriptionRef: str | None = None
class ToolMeta(BaseModel):
enabled: bool = True

View File

@ -4,33 +4,47 @@ from typing import Any
import uvicorn
from mcp import stdio_server
from mcp.server import FastMCP
from mcp.server.auth.provider import OAuthAuthorizationServerProvider
from mcp.server.fastmcp.server import lifespan_wrapper
from mcp.server.fastmcp.tools import Tool
from mcp.server.lowlevel.server import lifespan as default_lifespan
from mcp.server.streamable_http import EventStore
from nacos_mcp_wrapper.server.nacos_server import NacosServer
from nacos_mcp_wrapper.server.nacos_settings import NacosSettings
logger = logging.getLogger(__name__)
class NacosMCP(FastMCP):
def __init__(self,
name: str | None = None,
nacos_settings: NacosSettings | None = None,
instructions: str | None = None,
**settings: Any):
super().__init__(name, instructions, **settings)
auth_server_provider: OAuthAuthorizationServerProvider[
Any, Any, Any]
| None = None,
event_store: EventStore | None = None,
*,
tools: list[Tool] | None = None,
version: str | None = None,
**settings: Any,
):
if "host" not in settings:
settings["host"] = "0.0.0.0"
super().__init__(name, instructions, auth_server_provider, event_store,
tools=tools, **settings)
self._mcp_server = NacosServer(
nacos_settings=nacos_settings,
name=name or "FastMCP",
instructions=instructions,
version=version,
lifespan=lifespan_wrapper(self, self.settings.lifespan)
if self.settings.lifespan
else default_lifespan,
)
self.dependencies = self.settings.dependencies
# Set up MCP protocol handlers
self._setup_handlers()
@ -44,10 +58,11 @@ class NacosMCP(FastMCP):
self._mcp_server.create_initialization_options(),
)
async def run_sse_async(self) -> None:
async def run_sse_async(self, mount_path: str | None = None) -> None:
"""Run the server using SSE transport."""
starlette_app = self.sse_app()
await self._mcp_server.register_to_nacos("sse", self.settings.port, self.settings.sse_path)
starlette_app = self.sse_app(mount_path)
await self._mcp_server.register_to_nacos("sse", self.settings.port,
self.settings.sse_path)
config = uvicorn.Config(
starlette_app,
host=self.settings.host,
@ -57,3 +72,19 @@ class NacosMCP(FastMCP):
server = uvicorn.Server(config)
await server.serve()
async def run_streamable_http_async(self) -> None:
"""Run the server using StreamableHTTP transport."""
import uvicorn
starlette_app = self.streamable_http_app()
await self._mcp_server.register_to_nacos("streamable-http",
self.settings.port,
self.settings.streamable_http_path)
config = uvicorn.Config(
starlette_app,
host=self.settings.host,
port=self.settings.port,
log_level=self.settings.log_level.lower(),
)
server = uvicorn.Server(config)
await server.serve()

View File

@ -1,24 +1,38 @@
import asyncio
import json
import logging
from contextlib import AbstractAsyncContextManager, asynccontextmanager
from typing import Literal, Callable, AsyncIterator, Any
from contextlib import AbstractAsyncContextManager
from typing import Literal, Callable, Any
from importlib import metadata
import jsonref
from maintainer.ai.model.nacos_mcp_info import McpToolMeta, McpServerDetailInfo, \
McpTool, McpServiceRef, McpToolSpecification, McpServerBasicInfo, \
McpEndpointSpec, McpServerRemoteServiceConfig
from maintainer.ai.model.registry_mcp_info import ServerVersionDetail
from maintainer.ai.nacos_mcp_service import NacosAIMaintainerService
from maintainer.common.ai_maintainer_client_config_builder import \
AIMaintainerClientConfigBuilder
from mcp import types, Tool
from mcp.server import Server
from mcp.server.lowlevel.server import LifespanResultT
from mcp.server.lowlevel.server import LifespanResultT, RequestT
from mcp.server.lowlevel.server import lifespan
from v2.nacos import NacosConfigService, ConfigParam, \
NacosNamingService, RegisterInstanceParam, ClientConfigBuilder
from nacos_mcp_wrapper.server.mcp_server_info import MCPServerInfo, ServiceRef, \
RemoteServerConfig
from v2.nacos import NacosNamingService, RegisterInstanceParam, \
ClientConfigBuilder, NacosException
from nacos_mcp_wrapper.server.nacos_settings import NacosSettings
from nacos_mcp_wrapper.server.utils import get_first_non_loopback_ip, \
ConfigSuffix, jsonref_default
jsonref_default, compare, pkg_version
logger = logging.getLogger(__name__)
TRANSPORT_MAP = {
"stdio": "stdio",
"sse": "mcp-sse",
"streamable-http": "mcp-streamable",
}
class NacosServer(Server):
def __init__(
self,
@ -27,26 +41,44 @@ class NacosServer(Server):
version: str | None = None,
instructions: str | None = None,
lifespan: Callable[
[Server[LifespanResultT]], AbstractAsyncContextManager[
LifespanResultT]
[Server[LifespanResultT, RequestT]],
AbstractAsyncContextManager[LifespanResultT],
] = lifespan,
):
if version is None:
version = pkg_version("mcp")
super().__init__(name, version, instructions, lifespan)
if nacos_settings == None:
nacos_settings = NacosSettings()
if nacos_settings.SERVICE_NAMESPACE == "":
nacos_settings.SERVICE_NAMESPACE = "public"
if nacos_settings.NAMESPACE == "":
nacos_settings.NAMESPACE = "public"
self._nacos_settings = nacos_settings
if self._nacos_settings.SERVICE_IP is None:
self._nacos_settings.SERVICE_IP = get_first_non_loopback_ip()
ai_client_config_builder = AIMaintainerClientConfigBuilder()
ai_client_config_builder.server_address(
self._nacos_settings.SERVER_ADDR).access_key(
self._nacos_settings.ACCESS_KEY).secret_key(
self._nacos_settings.SECRET_KEY).username(
self._nacos_settings.USERNAME).password(
self._nacos_settings.PASSWORD).app_conn_labels(
self._nacos_settings.APP_CONN_LABELS)
if self._nacos_settings.CREDENTIAL_PROVIDER is not None:
ai_client_config_builder.credentials_provider(
self._nacos_settings.CREDENTIAL_PROVIDER)
self._ai_client_config = ai_client_config_builder.build()
naming_client_config_builder = ClientConfigBuilder()
naming_client_config_builder.server_address(
self._nacos_settings.SERVER_ADDR).endpoint(
self._nacos_settings.SERVER_ENDPOINT).namespace_id(
self._nacos_settings.SERVICE_NAMESPACE).access_key(
self._nacos_settings.SERVER_ADDR).namespace_id(
self._nacos_settings.NAMESPACE).access_key(
self._nacos_settings.ACCESS_KEY).secret_key(
self._nacos_settings.SECRET_KEY).username(
self._nacos_settings.USERNAME).password(
@ -59,25 +91,8 @@ class NacosServer(Server):
self._naming_client_config = naming_client_config_builder.build()
config_client_config_builder = ClientConfigBuilder()
config_client_config_builder.server_address(
self._nacos_settings.SERVER_ADDR).endpoint(
self._nacos_settings.SERVER_ENDPOINT).namespace_id(
"nacos-default-mcp").access_key(
self._nacos_settings.ACCESS_KEY).secret_key(
self._nacos_settings.SECRET_KEY).username(
self._nacos_settings.USERNAME).password(
self._nacos_settings.PASSWORD).app_conn_labels(
self._nacos_settings.APP_CONN_LABELS)
if self._nacos_settings.CREDENTIAL_PROVIDER is not None:
config_client_config_builder.credentials_provider(
self._nacos_settings.CREDENTIAL_PROVIDER)
self._config_client_config = config_client_config_builder.build()
self._tmp_tools: dict[str, Tool] = {}
self._tools_meta = {}
self._tools_meta: dict[str, McpToolMeta] = {}
self._tmp_tools_list_handler = None
async def _list_tmp_tools(self) -> list[Tool]:
@ -93,38 +108,43 @@ class NacosServer(Server):
]
def is_tool_enabled(self, tool_name: str) -> bool:
if self._tools_meta is None:
return True
if tool_name in self._tools_meta:
if "enabled" in self._tools_meta[tool_name]:
if not self._tools_meta[tool_name]["enabled"]:
mcp_tool_meta = self._tools_meta[tool_name]
if mcp_tool_meta.enabled is not None:
if not mcp_tool_meta.enabled:
return False
return True
async def tool_list_listener(self, tenant_id: str, group_id: str,
data_id: str, content: str):
self.update_local_tools(content)
def update_tools(self,server_detail_info:McpServerDetailInfo):
def update_local_tools(self,nacos_tools:str):
def update_args_description(_local_args:dict[str, Any], _nacos_args:dict[str, Any]):
for key, value in _local_args.items():
if key in _nacos_args and "description" in _nacos_args[key]:
_local_args[key]["description"] = _nacos_args[key][
"description"]
nacos_tools_dict = json.loads(nacos_tools)
if "toolsMeta" in nacos_tools_dict:
self._tools_meta = nacos_tools_dict["toolsMeta"]
if "tools" not in nacos_tools_dict:
tool_spec = server_detail_info.toolSpec
if tool_spec is None:
return
for nacos_tool in nacos_tools_dict["tools"]:
if nacos_tool["name"] in self._tmp_tools:
local_tool = self._tmp_tools[nacos_tool["name"]]
if "description" in nacos_tool:
local_tool.description = nacos_tool["description"]
if tool_spec.toolsMeta is None:
self._tools_meta = {}
else:
self._tools_meta = tool_spec.toolsMeta
if tool_spec.tools is None:
return
for tool in tool_spec.tools:
if tool.name in self._tmp_tools:
local_tool = self._tmp_tools[tool.name]
if tool.description is not None:
local_tool.description = tool.description
local_args = local_tool.inputSchema["properties"]
nacos_args = nacos_tool["inputSchema"]["properties"]
nacos_args = tool.inputSchema["properties"]
update_args_description(local_args, nacos_args)
break
continue
async def init_tools_tmp(self):
_tmp_tools = await self.request_handlers[
@ -141,105 +161,227 @@ class NacosServer(Server):
resolved_data = json.loads(resolved_data)
tool.inputSchema = resolved_data
def check_tools_compatible(self,server_detail_info:McpServerDetailInfo) -> bool:
if (server_detail_info.toolSpec is None
or server_detail_info.toolSpec.tools is None or len(server_detail_info.toolSpec.tools) == 0):
return True
tools_spec = server_detail_info.toolSpec
tools_in_nacos = {}
for tool in tools_spec.tools:
tools_in_nacos[tool.name] = tool
tools_in_local = {}
for name,tool in self._tmp_tools.items():
tools_in_local[name] = McpTool(
name=tool.name,
description=tool.description,
inputSchema=tool.inputSchema,
)
if tools_in_nacos.keys() != tools_in_local.keys():
return False
for name,tool in tools_in_nacos.items():
str_tools_in_nacos = tool.model_dump_json(exclude_none=True)
str_tools_in_local = tools_in_local[name].model_dump_json(exclude_none=True)
if not compare(str_tools_in_nacos, str_tools_in_local):
return False
return True
def check_compatible(self,server_detail_info:McpServerDetailInfo) -> (bool,str):
if server_detail_info.version != self.version:
return False, f"version not compatible, local version:{self.version}, remote version:{server_detail_info.version}"
if server_detail_info.protocol != self.type:
return False, f"protocol not compatible, local protocol:{self.type}, remote protocol:{server_detail_info.protocol}"
if types.ListToolsRequest in self.request_handlers:
checkToolsResult = self.check_tools_compatible(server_detail_info)
if not checkToolsResult:
return False , f"tools not compatible, local tools:{self._tmp_tools}, remote tools:{server_detail_info.toolSpec}"
mcp_service_ref = server_detail_info.remoteServerConfig.serviceRef
is_same_service,error_msg = self.is_service_ref_same(mcp_service_ref)
if not is_same_service:
return False, error_msg
return True, ""
def is_service_ref_same(self,mcp_service_ref:McpServiceRef) -> (bool,str):
if self._nacos_settings.SERVICE_NAME is not None and self._nacos_settings.SERVICE_NAME != mcp_service_ref.serviceName:
return False, f"service name not compatible, local service name:{self._nacos_settings.SERVICE_NAME}, remote service name:{mcp_service_ref.serviceName}"
if self._nacos_settings.SERVICE_GROUP is not None and self._nacos_settings.SERVICE_GROUP != mcp_service_ref.groupName:
return False, f"group name not compatible, local group name:{self._nacos_settings.SERVICE_GROUP}, remote group name:{mcp_service_ref.groupName}"
if mcp_service_ref.namespaceId != self._nacos_settings.NAMESPACE:
return False, f"namespace id not compatible, local namespace id:{self._nacos_settings.NAMESPACE}, remote namespace id:{mcp_service_ref.namespaceId}"
return True, ""
def get_register_service_name(self) -> str:
if self._nacos_settings.SERVICE_NAME is not None:
return self._nacos_settings.SERVICE_NAME
else:
return self.name + "::" + self.version
async def subscribe(self):
while True:
try:
await asyncio.sleep(30)
except asyncio.TimeoutError:
logging.debug("Timeout occurred")
except asyncio.CancelledError:
return
try:
server_detail_info = await self.mcp_service.get_mcp_server_detail(
self._nacos_settings.NAMESPACE,
self.name,
self.version
)
if server_detail_info is not None:
self.update_tools(server_detail_info)
except Exception as e:
logging.info(
f"can not found McpServer info from nacos,{self.name},version:{self.version}")
async def register_to_nacos(self,
transport: Literal["stdio", "sse"] = "stdio",
transport: Literal["stdio", "sse","streamable-http"] = "stdio",
port: int = 8000,
path: str = "/sse"):
try:
config_client = await NacosConfigService.create_config_service(
self._config_client_config)
mcp_tools_data_id = self.name + ConfigSuffix.TOOLS.value
mcp_servers_data_id = self.name + ConfigSuffix.MCP_SERVER.value
self.type = TRANSPORT_MAP.get(transport, None)
self.mcp_service = await NacosAIMaintainerService.create_mcp_service(
self._ai_client_config
)
self.naming_client = await NacosNamingService.create_naming_service(
self._naming_client_config)
server_detail_info = None
try:
server_detail_info = await self.mcp_service.get_mcp_server_detail(
self._nacos_settings.NAMESPACE,
self.name,
self.version
)
except Exception as e:
logging.info(f"can not found McpServer info from nacos,{self.name},version:{self.version}")
if types.ListToolsRequest in self.request_handlers:
await self.init_tools_tmp()
self.list_tools()(self._list_tmp_tools)
nacos_tools = await config_client.get_config(ConfigParam(
data_id=mcp_tools_data_id, group="mcp-tools"
))
if nacos_tools is not None and nacos_tools != "":
self.update_local_tools(nacos_tools)
_tmp_tools = await self.request_handlers[
types.ListToolsRequest](
self)
tools_dict = _tmp_tools.model_dump(
by_alias=True, mode="json", exclude_none=True
)
tools_dict["toolsMeta"] = self._tools_meta
await config_client.publish_config(ConfigParam(
data_id=mcp_tools_data_id, group="mcp-tools",
content=json.dumps(tools_dict, indent=2)
))
self.list_tools()(self._list_tmp_tools)
await config_client.add_listener(mcp_tools_data_id, "mcp-tools",
self.tool_list_listener)
server_info_content = await config_client.get_config(ConfigParam(
data_id=mcp_servers_data_id, group="mcp-server"
))
server_description = self.name
if self.instructions is not None:
server_description = self.instructions
if server_info_content is not None and server_info_content != "":
server_info_dict = json.loads(server_info_content)
if "description" in server_info_dict:
server_description = server_info_dict["description"]
if transport == "stdio":
mcp_server_info = MCPServerInfo(
protocol="local",
name=self.name,
description=server_description,
version=self.version,
toolsDescriptionRef=mcp_tools_data_id,
)
mcp_server_info_dict = mcp_server_info.model_dump(
by_alias=True, mode="json", exclude_none=True
)
await config_client.publish_config(ConfigParam(
data_id=mcp_servers_data_id, group="mcp-server",
content=json.dumps(mcp_server_info_dict, indent=2)
))
elif transport == "sse":
if self._nacos_settings.SERVICE_REGISTER:
naming_client = await NacosNamingService.create_naming_service(
self._naming_client_config)
await naming_client.register_instance(
if server_detail_info is not None:
is_compatible, error_msg = self.check_compatible(server_detail_info)
if not is_compatible:
logging.error(f"mcp server info is not compatible,{self.name},version:{self.version},reason:{error_msg}")
raise NacosException(
f"mcp server info is not compatible,{self.name},version:{self.version},reason:{error_msg}"
)
if types.ListToolsRequest in self.request_handlers:
self.update_tools(server_detail_info)
asyncio.create_task(self.subscribe())
if self._nacos_settings.SERVICE_REGISTER and (self.type == "mcp-sse"
or self.type == "mcp-streamable"):
version = metadata.version('nacos-mcp-wrapper-python')
service_meta_data = {
"source": f"nacos-mcp-wrapper-python-{version}",
**self._nacos_settings.SERVICE_META_DATA}
await self.naming_client.register_instance(
request=RegisterInstanceParam(
group_name=self._nacos_settings.SERVICE_GROUP,
service_name=self.name + "-mcp-service",
group_name=server_detail_info.remoteServerConfig.serviceRef.groupName,
service_name=server_detail_info.remoteServerConfig.serviceRef.serviceName,
ip=self._nacos_settings.SERVICE_IP,
port=port,
port=self._nacos_settings.SERVICE_PORT if self._nacos_settings.SERVICE_PORT else port,
ephemeral=self._nacos_settings.SERVICE_EPHEMERAL,
metadata=service_meta_data
)
)
mcp_server_info = MCPServerInfo(
protocol="mcp-sse",
name=self.name,
description=self.instructions,
version=self.version,
remoteServerConfig=RemoteServerConfig(
serviceRef=ServiceRef(
namespaceId=self._nacos_settings.SERVICE_NAMESPACE,
serviceName=self.name + "-mcp-service",
groupName=self._nacos_settings.SERVICE_GROUP
),
exportPath=path,
),
toolsDescriptionRef=mcp_tools_data_id,
logging.info(f"Register to nacos success,{self.name},version:{self.version}")
return
mcp_tool_specification = None
if types.ListToolsRequest in self.request_handlers:
tool_spec = [
McpTool(
name=tool.name,
description=tool.description,
inputSchema=tool.inputSchema,
)
for tool in list(self._tmp_tools.values())
]
mcp_tool_specification = McpToolSpecification(
tools=tool_spec
)
mcp_server_info_dict = mcp_server_info.model_dump(
by_alias=True, mode="json", exclude_none=True
server_version_detail = ServerVersionDetail()
server_version_detail.version = self.version
server_basic_info = McpServerBasicInfo()
server_basic_info.name = self.name
server_basic_info.versionDetail = server_version_detail
server_basic_info.description = self.instructions or self.name
endpoint_spec = McpEndpointSpec()
if self.type == "stdio":
server_basic_info.protocol = self.type
server_basic_info.frontProtocol = self.type
else:
endpoint_spec.type = "REF"
data = {
"serviceName": self.get_register_service_name(),
"groupName": "DEFAULT_GROUP" if self._nacos_settings.SERVICE_GROUP is None else self._nacos_settings.SERVICE_GROUP,
"namespaceId": self._nacos_settings.NAMESPACE,
}
endpoint_spec.data = data
remote_server_config_info = McpServerRemoteServiceConfig()
remote_server_config_info.exportPath = path
server_basic_info.remoteServerConfig = remote_server_config_info
server_basic_info.protocol = self.type
server_basic_info.frontProtocol = self.type
try:
await self.mcp_service.create_mcp_server(self._nacos_settings.NAMESPACE,
self.name,
server_basic_info,
mcp_tool_specification,
endpoint_spec)
except Exception as e:
logger.info(f"Found MCP server {self.name} in Nacos,try to update it")
version_detail = None
try:
version_detail = await self.mcp_service.get_mcp_server_detail(
self._nacos_settings.NAMESPACE,
self.name,
self.version
)
except Exception as e_2:
logger.info(f" Version {self.version} of Mcp server {self.name} is not in Nacos, try to update it")
if version_detail is None:
await self.mcp_service.update_mcp_server(
self._nacos_settings.NAMESPACE,
self.name,
True,
server_basic_info,
mcp_tool_specification,
endpoint_spec
)
else:
_is_compatible,error_msg = self.check_compatible(version_detail)
if not _is_compatible:
logging.error(f"mcp server info is not compatible,{self.name},version:{self.version},reason:{error_msg}")
raise NacosException(
f"mcp server info is not compatible,{self.name},version:{self.version},reason:{error_msg}"
)
if self._nacos_settings.SERVICE_REGISTER:
version = metadata.version('nacos-mcp-wrapper-python')
service_meta_data = {"source": f"nacos-mcp-wrapper-python-{version}",**self._nacos_settings.SERVICE_META_DATA}
await self.naming_client.register_instance(
request=RegisterInstanceParam(
group_name="DEFAULT_GROUP" if self._nacos_settings.SERVICE_GROUP is None else self._nacos_settings.SERVICE_GROUP,
service_name=self.get_register_service_name(),
ip=self._nacos_settings.SERVICE_IP,
port=self._nacos_settings.SERVICE_PORT if self._nacos_settings.SERVICE_PORT else port,
ephemeral=self._nacos_settings.SERVICE_EPHEMERAL,
metadata=service_meta_data,
)
)
await config_client.publish_config(ConfigParam(
data_id=mcp_servers_data_id, group="mcp-server",
content=json.dumps(mcp_server_info_dict, indent=2)
))
asyncio.create_task(self.subscribe())
logging.info(f"Register to nacos success,{self.name},version:{self.version}")
except Exception as e:
logging.error(f"Failed to register MCP server to Nacos: {e}")

View File

@ -11,10 +11,6 @@ class NacosSettings(BaseSettings):
description="nacos server address",
default="127.0.0.1:8848")
SERVER_ENDPOINT : Optional[str] = Field(
description="nacos server endpoint",
default=None)
SERVICE_REGISTER : bool = Field(
description="whether to register service to nacos",
default=True)
@ -23,18 +19,26 @@ class NacosSettings(BaseSettings):
description="whether to register service as ephemeral",
default=True)
SERVICE_NAMESPACE : str = Field(
description="nacos service namespace",
NAMESPACE : str = Field(
description="nacos namespace",
default="public")
SERVICE_GROUP : str = Field(
SERVICE_GROUP : Optional[str] = Field(
description="nacos service group",
default="DEFAULT_GROUP")
default=None)
SERVICE_NAME : Optional[str] = Field(
description="nacos service name",
default=None)
SERVICE_IP : Optional[str] = Field(
description="nacos service ip",
default=None)
SERVICE_PORT : Optional[int] = Field(
description="nacos service port",
default=None)
USERNAME : Optional[str] = Field(
description="nacos username for authentication",
default=None)
@ -59,6 +63,10 @@ class NacosSettings(BaseSettings):
description="nacos connection labels",
default={})
SERVICE_META_DATA : Optional[dict] = Field(
description="nacos service metadata",
default={})
class Config:
env_prefix = "NACOS_MCP_SERVER_"

View File

@ -2,6 +2,7 @@ import asyncio
import socket
import threading
from enum import Enum
import json
import jsonref
import psutil
@ -14,6 +15,16 @@ def get_first_non_loopback_ip():
return addr.address
return None
def pkg_version(package: str) -> str:
try:
from importlib.metadata import version
return version(package)
except Exception:
pass
return "1.0.0"
def jsonref_default(obj):
if isinstance(obj, jsonref.JsonRef):
return obj.__subject__
@ -24,4 +35,94 @@ class ConfigSuffix(Enum):
TOOLS = "-mcp-tools.json"
PROMPTS = "-mcp-prompt.json"
RESOURCES = "-mcp-resource.json"
MCP_SERVER = "-mcp-server.json"
MCP_SERVER = "-mcp-server.json"
def compare(origin: str, target: str) -> bool:
try:
origin_node = json.loads(origin)
target_node = json.loads(target)
return compare_nodes(origin_node, target_node)
except Exception as e:
print(e)
return False
def compare_nodes(origin_node, target_node) -> bool:
if origin_node is None and target_node is None:
return True
if origin_node is None or target_node is None:
return False
origin_properties = origin_node.get("properties")
target_properties = target_node.get("properties")
if (origin_properties is None and target_properties is not None) or (
origin_properties is not None and target_properties is None
):
return False
if origin_properties is not None and target_properties is not None:
# 遍历原始 properties
for key, value_node in origin_properties.items():
if not isinstance(value_node, dict):
continue # 只处理 object 类型
type_node = value_node.get("type")
if not isinstance(type_node, str):
continue
type_ = type_node
if key not in target_properties:
return False
target_value_node = target_properties[key]
target_type_node = target_value_node.get("type")
target_type = target_type_node if isinstance(target_type_node, str) else ""
if type_ != target_type:
return False
# 如果是 object 类型,递归比较
if type_ == "object":
if not compare_nodes(value_node, target_value_node):
return False
# 如果是 array 类型,比较 items 内容
elif type_ == "array":
origin_items = value_node.get("items")
target_items = target_value_node.get("items")
if origin_items is not None and target_items is not None:
if not compare_nodes(origin_items, target_items):
return False
# 检查新增字段
for key in target_properties:
if key not in origin_properties:
return False
origin_required = origin_node.get("required")
target_required = target_node.get("required")
if origin_required is not None and target_required is not None:
if not isinstance(origin_required, list) or not isinstance(target_required, list):
return False
if len(origin_required) != len(target_required):
return False
origin_set = set()
for node in origin_required:
if isinstance(node, str):
origin_set.add(node)
else:
return False
target_set = set()
for node in target_required:
if isinstance(node, str):
target_set.add(node)
else:
return False
return origin_set == target_set
else:
return origin_required is None and target_required is None

View File

@ -1,8 +1,9 @@
psutil==7.0.0
anyio==4.9.0
mcp==1.6.0
nacos-sdk-python==2.0.4
mcp==1.9.2
nacos-sdk-python>=2.0.9
pydantic==2.11.3
pydantic-settings==2.9.1
jsonref==1.1.0
uvicorn==0.34.2
uvicorn==0.34.2
nacos-maintainer-sdk-python>=0.1.2

View File

@ -9,7 +9,7 @@ def read_requirements():
setup(
name='nacos-mcp-wrapper-python',
version='0.3.0', # 项目的版本号
version='1.0.9', # 项目的版本号
packages=find_packages(
exclude=["test", "*.tests", "*.tests.*", "tests.*", "tests"]), # 自动发现所有包
url="https://github.com/nacos-group/nacos-mcp-wrapper-python",