Compare commits

...

17 Commits

Author SHA1 Message Date
CZJCC df8f9d2dcd
Merge pull request #237 from Sunrisea/master-fix-login-bug
Fix bug of Inject security info
2025-07-17 19:08:17 +08:00
濯光 4600b89e45 fix bug of get_access_token 2025-07-17 19:00:22 +08:00
濯光 ce22e6a437 fix bug of get_access_token 2025-07-17 18:51:14 +08:00
CZJCC b3c26c7ba9
Merge pull request #235 from Sunrisea/master-fix-deadlock-bug
[ISSUE #233] Fix bug of deadlock and v1 config publish
2025-07-02 15:25:43 +08:00
濯光 cb17109542 fix bug of deadlock and v1 config publish 2025-07-02 14:39:16 +08:00
CZJCC 1892ebbe43
Merge pull request #232 from CZJCC/feature/v2-readme
fix typeing.Dict
2025-05-06 16:59:19 +08:00
CZJCC bcdda37351 fix typeing.Dict 2025-05-06 16:57:26 +08:00
CZJCC d3f9b0daec
Merge pull request #231 from CZJCC/feature/v2-readme
fix 安装命令
2025-05-06 16:34:42 +08:00
CZJCC 95236b1643 fix 安装命令 2025-05-06 16:32:33 +08:00
CZJCC 58019d4395
Merge pull request #230 from CZJCC/feature/v2-readme
fix typeing.List
2025-05-06 16:13:19 +08:00
CZJCC 096b1bf077 fix typeing.List 2025-05-06 16:04:36 +08:00
CZJCC a23ed4f7b6
Merge pull request #228 from CZJCC/feature/v2-readme
修复配置监听异步任务异常问题
2025-04-10 19:37:14 +08:00
CZJCC f37de79b9a 修复配置监听异步任务异常问题 2025-04-10 19:35:48 +08:00
CZJCC 13919631b1
Merge pull request #226 from yk125123/fix-issue-#222
Update client.py
2025-03-21 14:19:33 +08:00
kevin 3fa3bd353b Update client.py
fix: issue #222
resolve the issue of 403 forbidden after custom user added watchers.

/nacos/v1/cs/configs/listener?accessToken=your_token 403
2025-03-20 19:38:19 +08:00
CZJCC 7460efd9b7
Merge pull request #221 from libaiyun/custom-2.0.1
chore: Relax psutil dependency to >=5.9.5 for compatibility
2025-03-20 17:19:22 +08:00
libaiyun 81fac4c584 chore: Relax psutil dependency to >=5.9.5 for compatibility 2025-03-20 15:00:31 +08:00
22 changed files with 107 additions and 93 deletions

View File

@ -18,7 +18,7 @@ Supported Nacos version over 2.x
## Installation
```shell
pip install nacos-sdk-python==2.0.3
pip install nacos-sdk-python
```
## Client Configuration
@ -284,7 +284,7 @@ instance_list = await client.list_instances(ListInstanceParam(service_name='naco
### Subscribe
```angular2html
async def cb(instance_list: list[Instance]):
async def cb(instance_list: List[Instance]):
print('received subscribe callback', str(instance_list))
await client.subscribe(
@ -295,7 +295,7 @@ await client.subscribe(
### Unsubscribe
```angular2html
async def cb(instance_list: list[Instance]):
async def cb(instance_list: List[Instance]):
print('received subscribe callback', str(instance_list))
await client.unsubscribe(

View File

@ -425,7 +425,7 @@ class NacosClient:
params["type"] = config_type
try:
resp = self._do_sync_req("/nacos/v1/cs/configs", None, params, None,
resp = self._do_sync_req("/nacos/v1/cs/configs", None, None, params,
timeout or self.default_timeout, "POST")
c = resp.read()
logger.info("[publish] publish content, group:%s, data_id:%s, server response:%s" % (
@ -777,11 +777,13 @@ class NacosClient:
# if contains_init_key:
# headers["longPullingNoHangUp"] = "true"
params = {"tenant": self.namespace} if self.namespace else None
data = {"Listening-Configs": probe_update_string}
changed_keys = list()
try:
resp = self._do_sync_req("/nacos/v1/cs/configs/listener", headers, None, data,
resp = self._do_sync_req("/nacos/v1/cs/configs/listener", headers, params, data,
self.pulling_timeout + 10, "POST")
changed_keys = [group_key(*i) for i in parse_pulling_result(resp.read())]
logger.info("[do-pulling] following keys are changed from server %s" % truncate(str(changed_keys)))

View File

@ -4,6 +4,6 @@ alibabacloud_kms20160120>=2.2.3
alibabacloud_tea_openapi>=0.3.12
grpcio>=1.66.1
protobuf>=3.20.3
psutil>=5.9.6
psutil>=5.9.5
pycryptodome>=3.19.1
pydantic>=2.10.4

View File

@ -53,7 +53,7 @@ class UploadCommand(Command):
setup(
name="nacos-sdk-python",
version="2.0.3",
version="2.0.5",
packages=find_packages(
exclude=["test", "*.tests", "*.tests.*", "tests.*", "tests"]),
url="https://github.com/nacos-group/nacos-sdk-python",

View File

@ -1,6 +1,7 @@
import asyncio
import os
import unittest
from typing import List
from v2.nacos import ConfigParam
from v2.nacos.common.client_config import GRPCConfig
@ -82,7 +83,7 @@ class TestClientV2(unittest.IsolatedAsyncioTestCase):
client = await NacosNamingService.create_naming_service(client_config)
assert await client.server_health()
async def cb(instance_list: list[Instance]):
async def cb(instance_list: List[Instance]):
print('received subscribe callback', str(instance_list))
await client.subscribe(

View File

@ -9,13 +9,16 @@ from v2.nacos.config.nacos_config_service import NacosConfigService
from v2.nacos.common.auth import CredentialsProvider, Credentials
client_config = (ClientConfigBuilder()
.access_key(os.getenv('NACOS_ACCESS_KEY'))
.secret_key(os.getenv('NACOS_SECRET_KEY'))
# .access_key(os.getenv('NACOS_ACCESS_KEY'))
# .secret_key(os.getenv('NACOS_SECRET_KEY'))
.username(os.getenv('NACOS_USERNAME'))
.password(os.getenv('NACOS_PASSWORD'))
.server_address(os.getenv('NACOS_SERVER_ADDR', 'localhost:8848'))
.log_level('INFO')
.grpc_config(GRPCConfig(grpc_timeout=5000))
.build())
class CustomCredentialsProvider(CredentialsProvider):
def __init__(self, ak="", sk="", token=""):
self.credential = Credentials(ak, sk, token)
@ -23,6 +26,7 @@ class CustomCredentialsProvider(CredentialsProvider):
def get_credentials(self):
return self.credential
class TestClientV2(unittest.IsolatedAsyncioTestCase):
async def test_publish_config(self):
client = await NacosConfigService.create_config_service(client_config)
@ -46,7 +50,7 @@ class TestClientV2(unittest.IsolatedAsyncioTestCase):
assert res
print("success to publish")
await asyncio.sleep(0.1)
await asyncio.sleep(0.2)
content = await client.get_config(ConfigParam(
data_id=data_id,
group=group,
@ -268,7 +272,8 @@ class TestClientV2(unittest.IsolatedAsyncioTestCase):
async def test_gray_config_with_provider(self):
client_cfg = (ClientConfigBuilder()
.credentials_provider(CustomCredentialsProvider(os.getenv('NACOS_ACCESS_KEY'), os.getenv('NACOS_SECRET_KEY')))
.credentials_provider(
CustomCredentialsProvider(os.getenv('NACOS_ACCESS_KEY'), os.getenv('NACOS_SECRET_KEY')))
.server_address(os.getenv('NACOS_SERVER_ADDR', 'localhost:8848'))
.log_level('INFO')
.app_conn_labels({"k1": "v1", "k2": "v2", "nacos_config_gray_label": "gray"})

View File

@ -1,6 +1,6 @@
import asyncio
from logging import Logger
from typing import Optional, Callable
from typing import Optional, Callable, List, Dict
from v2.nacos.common.constants import Constants
from v2.nacos.config.cache.config_info_cache import ConfigInfoCache
@ -15,7 +15,7 @@ class ConfigSubscribeManager:
def __init__(self, logger: Logger, config_info_cache: ConfigInfoCache, namespace_id: str,
config_filter_chain_manager: ConfigFilterChainManager,
execute_config_listen_channel: asyncio.Queue):
self.subscribe_cache_map: dict[str, SubscribeCacheData] = {}
self.subscribe_cache_map: Dict[str, SubscribeCacheData] = {}
self.logger = logger
self.lock = asyncio.Lock()
self.namespace_id = namespace_id
@ -87,7 +87,7 @@ class ConfigSubscribeManager:
await subscribe_cache.execute_listener()
async def execute_listener_and_build_tasks(self, is_sync_all: bool):
listen_fetch_task_map: dict[int, list[SubscribeCacheData]] = {}
listen_fetch_task_map: Dict[int, List[SubscribeCacheData]] = {}
for cache_data in self.subscribe_cache_map.values():
if cache_data.is_sync_with_server:
await cache_data.execute_listener()

View File

@ -1,3 +1,5 @@
from typing import Dict
from v2.nacos.common.client_config import KMSConfig
from v2.nacos.common.constants import Constants
from v2.nacos.common.nacos_exception import NacosException, INVALID_PARAM
@ -11,7 +13,7 @@ from v2.nacos.config.model.config_param import HandlerParam
class KMSHandler:
def __init__(self, kms_config: KMSConfig):
self.kms_plugins: dict[str, EncryptionPlugin] = {}
self.kms_plugins: Dict[str, EncryptionPlugin] = {}
self.kms_client = KmsClient.create_kms_client(kms_config)
kms_aes_128_encryption_plugin = KmsAes128EncryptionPlugin(self.kms_client)
self.kms_plugins[kms_aes_128_encryption_plugin.algorithm_name()] = kms_aes_128_encryption_plugin

View File

@ -22,7 +22,7 @@ class ConfigPage(BaseModel):
totalCount: int = 0
pageNumber: int = 0
pagesAvailable: int = 0
pageItems: list[ConfigItem] = []
pageItems: List[ConfigItem] = []
class ConfigListenContext(BaseModel):

View File

@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Optional
from typing import Optional, List, Dict
from v2.nacos.config.model.config import ConfigListenContext
from v2.nacos.transport.model.rpc_request import Request
@ -24,7 +24,7 @@ class AbstractConfigRequest(Request, ABC):
class ConfigBatchListenRequest(AbstractConfigRequest):
listen: bool = True
configListenContexts: list[ConfigListenContext] = []
configListenContexts: List[ConfigListenContext] = []
def get_request_type(self):
return "ConfigBatchListenRequest"
@ -46,7 +46,7 @@ class ConfigQueryRequest(AbstractConfigRequest):
class ConfigPublishRequest(AbstractConfigRequest):
content: Optional[str]
casMd5: Optional[str]
additionMap: dict[str, str] = {}
additionMap: Dict[str, str] = {}
def get_request_type(self):
return "ConfigPublishRequest"

View File

@ -1,4 +1,4 @@
from typing import Optional
from typing import Optional, List
from pydantic import BaseModel
@ -12,7 +12,7 @@ class ConfigContext(BaseModel):
class ConfigChangeBatchListenResponse(Response):
changedConfigs: list[ConfigContext] = []
changedConfigs: List[ConfigContext] = []
def get_response_type(self) -> str:
return "ConfigChangeBatchListenResponse"

View File

@ -197,53 +197,53 @@ class ConfigGRPCClientProxy:
self.logger.debug("Timeout occurred")
except asyncio.CancelledError:
return
finally:
has_changed_keys = False
is_sync_all = (get_current_time_millis() - self.last_all_sync_time) >= 5 * 60 * 1000
listen_task_map = await self.config_subscribe_manager.execute_listener_and_build_tasks(is_sync_all)
if len(listen_task_map) == 0:
has_changed_keys = False
is_sync_all = (get_current_time_millis() - self.last_all_sync_time) >= 5 * 60 * 1000
listen_task_map = await self.config_subscribe_manager.execute_listener_and_build_tasks(is_sync_all)
if len(listen_task_map) == 0:
continue
for task_id, cache_data_list in listen_task_map.items():
if len(cache_data_list) == 0:
continue
request = ConfigBatchListenRequest(group='', dataId='', tenant='')
for cache_data in cache_data_list:
config_listen_context = ConfigListenContext(group=cache_data.group,
md5=cache_data.md5,
dataId=cache_data.data_id,
tenant=cache_data.tenant)
request.configListenContexts.append(config_listen_context)
try:
rpc_client = await self.fetch_rpc_client(task_id)
response: ConfigChangeBatchListenResponse = await self.request_config_server(
rpc_client, request, ConfigChangeBatchListenResponse)
if len(response.changedConfigs) > 0:
has_changed_keys = True
for config_ctx in response.changedConfigs:
change_key = get_config_cache_key(config_ctx.dataId, config_ctx.group, config_ctx.tenant)
try:
content, encrypted_data_key = await self.query_config(config_ctx.dataId,
config_ctx.group)
await self.config_subscribe_manager.update_subscribe_cache(config_ctx.dataId,
config_ctx.group,
self.namespace_id,
content,
encrypted_data_key)
except Exception as e:
self.logger.error(f"failed to refresh config:{change_key},error:{str(e)}")
continue
except Exception as e:
self.logger.error(f"failed to batch listen config ,error:{str(e)}")
continue
for task_id, cache_data_list in listen_task_map.items():
if len(cache_data_list) == 0:
continue
request = ConfigBatchListenRequest(group='', dataId='', tenant='')
for cache_data in cache_data_list:
config_listen_context = ConfigListenContext(group=cache_data.group,
md5=cache_data.md5,
dataId=cache_data.data_id,
tenant=cache_data.tenant)
request.configListenContexts.append(config_listen_context)
try:
rpc_client = await self.fetch_rpc_client(task_id)
response: ConfigChangeBatchListenResponse = await self.request_config_server(
rpc_client, request, ConfigChangeBatchListenResponse)
if is_sync_all:
self.last_all_sync_time = get_current_time_millis()
if len(response.changedConfigs) > 0:
has_changed_keys = True
for config_ctx in response.changedConfigs:
change_key = get_config_cache_key(config_ctx.dataId, config_ctx.group, config_ctx.tenant)
try:
content, encrypted_data_key = await self.query_config(config_ctx.dataId,
config_ctx.group)
await self.config_subscribe_manager.update_subscribe_cache(config_ctx.dataId,
config_ctx.group,
self.namespace_id,
content,
encrypted_data_key)
except Exception as e:
self.logger.error(f"failed to refresh config:{change_key},error:{str(e)}")
continue
except Exception as e:
self.logger.error(f"failed to batch listen config ,error:{str(e)}")
continue
if is_sync_all:
self.last_all_sync_time = get_current_time_millis()
if has_changed_keys:
await self.execute_config_listen_channel.put(None)
if has_changed_keys:
await self.execute_config_listen_channel.put(None)
async def server_health(self):
return (await self.fetch_rpc_client()).is_running()

View File

@ -2,7 +2,7 @@ import asyncio
import json
import logging
import os
from typing import Callable, Optional
from typing import Callable, Optional, List, Dict
from v2.nacos.common.client_config import ClientConfig
from v2.nacos.common.constants import Constants
@ -18,7 +18,7 @@ class ServiceInfoCache:
def __init__(self, client_config: ClientConfig):
self.logger = logging.getLogger(Constants.NAMING_MODULE)
self.cache_dir = os.path.join(client_config.cache_dir, Constants.NAMING_MODULE, client_config.namespace_id)
self.service_info_map: dict[str, Service] = {}
self.service_info_map: Dict[str, Service] = {}
self.update_time_map = {}
self.lock = asyncio.Lock()
self.sub_callback_manager = SubscribeManager()
@ -124,7 +124,7 @@ class ServiceInfoCache:
return old_instance != new_instance
@staticmethod
def sort_instances(instances: list[Instance]) -> list[Instance]:
def sort_instances(instances: List[Instance]) -> List[Instance]:
def instance_key(instance: Instance) -> (int, int):
ip_num = int(''.join(instance.ip.split('.')))
return ip_num, instance.port

View File

@ -1,4 +1,4 @@
from typing import Optional, Callable
from typing import Optional, Callable, List, Dict
from pydantic import BaseModel
from v2.nacos.common.constants import Constants
@ -10,7 +10,7 @@ class RegisterInstanceParam(BaseModel):
weight: float = 1.0
enabled: bool = True
healthy: bool = True
metadata: dict[str, str] = {}
metadata: Dict[str, str] = {}
cluster_name: str = ''
service_name: str
group_name: str = Constants.DEFAULT_GROUP
@ -20,7 +20,7 @@ class RegisterInstanceParam(BaseModel):
class BatchRegisterInstanceParam(BaseModel):
service_name: str
group_name: str = Constants.DEFAULT_GROUP
instances: list[RegisterInstanceParam] = []
instances: List[RegisterInstanceParam] = []
class DeregisterInstanceParam(BaseModel):
@ -35,7 +35,7 @@ class DeregisterInstanceParam(BaseModel):
class ListInstanceParam(BaseModel):
service_name: str
group_name: str = Constants.DEFAULT_GROUP
clusters: list[str] = []
clusters: List[str] = []
subscribe: bool = True
healthy_only: Optional[bool]
@ -43,14 +43,14 @@ class ListInstanceParam(BaseModel):
class SubscribeServiceParam(BaseModel):
service_name: str
group_name: str = Constants.DEFAULT_GROUP
clusters: list[str] = []
clusters: List[str] = []
subscribe_callback: Optional[Callable] = None
class GetServiceParam(BaseModel):
service_name: str
group_name: str = Constants.DEFAULT_GROUP
clusters: list[str] = []
clusters: List[str] = []
class ListServiceParam(BaseModel):

View File

@ -1,5 +1,5 @@
from abc import ABC
from typing import Optional, Any
from typing import Optional, Any, List
from v2.nacos.naming.model.instance import Instance
from v2.nacos.naming.model.service import Service
@ -34,7 +34,7 @@ class InstanceRequest(AbstractNamingRequest):
class BatchInstanceRequest(AbstractNamingRequest):
type: Optional[str]
instances: Optional[list[Instance]]
instances: Optional[List[Instance]]
def get_request_type(self) -> str:
return 'BatchInstanceRequest'

View File

@ -1,4 +1,4 @@
from typing import Optional, Any
from typing import Optional, Any, List
from v2.nacos.naming.model.service import Service
from v2.nacos.transport.model.rpc_response import Response
@ -31,7 +31,7 @@ class BatchInstanceResponse(Response):
class ServiceListResponse(Response):
count: int
serviceNames: list[str]
serviceNames: List[str]
def get_response_type(self) -> str:
return "ServiceListResponse"

View File

@ -1,6 +1,6 @@
import time
import urllib.parse
from typing import Optional
from typing import Optional, List
from pydantic import BaseModel
@ -22,7 +22,7 @@ class Service(BaseModel):
groupName: str
clusters: Optional[str] = ''
cacheMillis: int = 1000
hosts: list[Instance] = []
hosts: List[Instance] = []
lastRefTime: int = 0
checksum: str = ""
allIps: bool = False
@ -114,4 +114,4 @@ class Service(BaseModel):
class ServiceList(BaseModel):
count: int
services: list[str]
services: List[str]

View File

@ -1,4 +1,6 @@
import asyncio
from typing import List
from v2.nacos.common.client_config import ClientConfig
from v2.nacos.common.constants import Constants
from v2.nacos.common.nacos_exception import NacosException, INVALID_PARAM
@ -137,7 +139,7 @@ class NacosNamingService(NacosClient):
return await self.grpc_client_proxy.list_services(request)
async def list_instances(self, request: ListInstanceParam) -> list[Instance]:
async def list_instances(self, request: ListInstanceParam) -> List[Instance]:
if not request.service_name:
raise NacosException(INVALID_PARAM, "service_name can not be empty")

View File

@ -4,7 +4,7 @@ import hashlib
import hmac
import logging
import uuid
from typing import Optional
from typing import Optional, List
from v2.nacos.common.client_config import ClientConfig
from v2.nacos.common.constants import Constants
@ -107,7 +107,7 @@ class NamingGRPCClientProxy:
response = await self.request_naming_server(request, InstanceResponse)
return response.is_success()
async def batch_register_instance(self, service_name: str, group_name: str, instances: list[Instance]) -> bool:
async def batch_register_instance(self, service_name: str, group_name: str, instances: List[Instance]) -> bool:
self.logger.info("batch register instance service_name:%s, group_name:%s, namespace:%s,instances:%s" % (
service_name, group_name, self.namespace_id, str(instances)))

View File

@ -1,4 +1,5 @@
import asyncio
from typing import List
from v2.nacos.naming.model.instance import Instance
from v2.nacos.naming.model.service import Service
@ -48,7 +49,7 @@ class NamingGrpcConnectionEventListener(ConnectionEventListener):
async with self.lock:
self.registered_instance_cached[key] = instance
async def cache_instances_for_redo(self, service_name: str, group_name: str, instances: list[Instance]) -> None:
async def cache_instances_for_redo(self, service_name: str, group_name: str, instances: List[Instance]) -> None:
key = get_group_name(service_name, group_name)
async with self.lock:
self.registered_instance_cached[key] = instances

View File

@ -94,4 +94,6 @@ class NacosServerConnector:
async def inject_security_info(self, headers):
if self.client_config.username and self.client_config.password:
access_token = await self.auth_client.get_access_token(False)
headers[Constants.ACCESS_TOKEN] = access_token
if access_token is not None and access_token != "":
headers[Constants.ACCESS_TOKEN] = access_token
return

View File

@ -70,7 +70,7 @@ class RpcClient(ABC):
def __init__(self, logger, name: str, nacos_server: NacosServerConnector):
self.logger = logger
self.name = name
self.labels: dict[str, str] = {}
self.labels: Dict[str, str] = {}
self.current_connection = None
self.rpc_client_status = RpcClientStatus.INITIALIZED
self.event_chan = asyncio.Queue()
@ -137,11 +137,10 @@ class RpcClient(ABC):
self.logger.error("%s server healthy check fail, currentConnection=%s"
, self.name, self.current_connection.get_connection_id())
async with self.lock:
if self.rpc_client_status == RpcClientStatus.SHUTDOWN:
continue
self.rpc_client_status = RpcClientStatus.UNHEALTHY
await self.reconnect(ReconnectContext(server_info=None, on_request_fail=False))
if self.rpc_client_status == RpcClientStatus.SHUTDOWN:
continue
self.rpc_client_status = RpcClientStatus.UNHEALTHY
await self.reconnect(ReconnectContext(server_info=None, on_request_fail=False))
except asyncio.CancelledError:
break
except asyncio.CancelledError: