Compare commits

...

17 Commits

Author SHA1 Message Date
CZJCC df8f9d2dcd
Merge pull request #237 from Sunrisea/master-fix-login-bug
Fix bug of Inject security info
2025-07-17 19:08:17 +08:00
濯光 4600b89e45 fix bug of get_access_token 2025-07-17 19:00:22 +08:00
濯光 ce22e6a437 fix bug of get_access_token 2025-07-17 18:51:14 +08:00
CZJCC b3c26c7ba9
Merge pull request #235 from Sunrisea/master-fix-deadlock-bug
[ISSUE #233] Fix bug of deadlock and v1 config publish
2025-07-02 15:25:43 +08:00
濯光 cb17109542 fix bug of deadlock and v1 config publish 2025-07-02 14:39:16 +08:00
CZJCC 1892ebbe43
Merge pull request #232 from CZJCC/feature/v2-readme
fix typeing.Dict
2025-05-06 16:59:19 +08:00
CZJCC bcdda37351 fix typeing.Dict 2025-05-06 16:57:26 +08:00
CZJCC d3f9b0daec
Merge pull request #231 from CZJCC/feature/v2-readme
fix 安装命令
2025-05-06 16:34:42 +08:00
CZJCC 95236b1643 fix 安装命令 2025-05-06 16:32:33 +08:00
CZJCC 58019d4395
Merge pull request #230 from CZJCC/feature/v2-readme
fix typeing.List
2025-05-06 16:13:19 +08:00
CZJCC 096b1bf077 fix typeing.List 2025-05-06 16:04:36 +08:00
CZJCC a23ed4f7b6
Merge pull request #228 from CZJCC/feature/v2-readme
修复配置监听异步任务异常问题
2025-04-10 19:37:14 +08:00
CZJCC f37de79b9a 修复配置监听异步任务异常问题 2025-04-10 19:35:48 +08:00
CZJCC 13919631b1
Merge pull request #226 from yk125123/fix-issue-#222
Update client.py
2025-03-21 14:19:33 +08:00
kevin 3fa3bd353b Update client.py
fix: issue #222
resolve the issue of 403 forbidden after custom user added watchers.

/nacos/v1/cs/configs/listener?accessToken=your_token 403
2025-03-20 19:38:19 +08:00
CZJCC 7460efd9b7
Merge pull request #221 from libaiyun/custom-2.0.1
chore: Relax psutil dependency to >=5.9.5 for compatibility
2025-03-20 17:19:22 +08:00
libaiyun 81fac4c584 chore: Relax psutil dependency to >=5.9.5 for compatibility 2025-03-20 15:00:31 +08:00
22 changed files with 107 additions and 93 deletions

View File

@ -18,7 +18,7 @@ Supported Nacos version over 2.x
## Installation ## Installation
```shell ```shell
pip install nacos-sdk-python==2.0.3 pip install nacos-sdk-python
``` ```
## Client Configuration ## Client Configuration
@ -284,7 +284,7 @@ instance_list = await client.list_instances(ListInstanceParam(service_name='naco
### Subscribe ### Subscribe
```angular2html ```angular2html
async def cb(instance_list: list[Instance]): async def cb(instance_list: List[Instance]):
print('received subscribe callback', str(instance_list)) print('received subscribe callback', str(instance_list))
await client.subscribe( await client.subscribe(
@ -295,7 +295,7 @@ await client.subscribe(
### Unsubscribe ### Unsubscribe
```angular2html ```angular2html
async def cb(instance_list: list[Instance]): async def cb(instance_list: List[Instance]):
print('received subscribe callback', str(instance_list)) print('received subscribe callback', str(instance_list))
await client.unsubscribe( await client.unsubscribe(

View File

@ -425,7 +425,7 @@ class NacosClient:
params["type"] = config_type params["type"] = config_type
try: try:
resp = self._do_sync_req("/nacos/v1/cs/configs", None, params, None, resp = self._do_sync_req("/nacos/v1/cs/configs", None, None, params,
timeout or self.default_timeout, "POST") timeout or self.default_timeout, "POST")
c = resp.read() c = resp.read()
logger.info("[publish] publish content, group:%s, data_id:%s, server response:%s" % ( logger.info("[publish] publish content, group:%s, data_id:%s, server response:%s" % (
@ -777,11 +777,13 @@ class NacosClient:
# if contains_init_key: # if contains_init_key:
# headers["longPullingNoHangUp"] = "true" # headers["longPullingNoHangUp"] = "true"
params = {"tenant": self.namespace} if self.namespace else None
data = {"Listening-Configs": probe_update_string} data = {"Listening-Configs": probe_update_string}
changed_keys = list() changed_keys = list()
try: try:
resp = self._do_sync_req("/nacos/v1/cs/configs/listener", headers, None, data, resp = self._do_sync_req("/nacos/v1/cs/configs/listener", headers, params, data,
self.pulling_timeout + 10, "POST") self.pulling_timeout + 10, "POST")
changed_keys = [group_key(*i) for i in parse_pulling_result(resp.read())] changed_keys = [group_key(*i) for i in parse_pulling_result(resp.read())]
logger.info("[do-pulling] following keys are changed from server %s" % truncate(str(changed_keys))) logger.info("[do-pulling] following keys are changed from server %s" % truncate(str(changed_keys)))

View File

@ -4,6 +4,6 @@ alibabacloud_kms20160120>=2.2.3
alibabacloud_tea_openapi>=0.3.12 alibabacloud_tea_openapi>=0.3.12
grpcio>=1.66.1 grpcio>=1.66.1
protobuf>=3.20.3 protobuf>=3.20.3
psutil>=5.9.6 psutil>=5.9.5
pycryptodome>=3.19.1 pycryptodome>=3.19.1
pydantic>=2.10.4 pydantic>=2.10.4

View File

@ -53,7 +53,7 @@ class UploadCommand(Command):
setup( setup(
name="nacos-sdk-python", name="nacos-sdk-python",
version="2.0.3", version="2.0.5",
packages=find_packages( packages=find_packages(
exclude=["test", "*.tests", "*.tests.*", "tests.*", "tests"]), exclude=["test", "*.tests", "*.tests.*", "tests.*", "tests"]),
url="https://github.com/nacos-group/nacos-sdk-python", url="https://github.com/nacos-group/nacos-sdk-python",

View File

@ -1,6 +1,7 @@
import asyncio import asyncio
import os import os
import unittest import unittest
from typing import List
from v2.nacos import ConfigParam from v2.nacos import ConfigParam
from v2.nacos.common.client_config import GRPCConfig from v2.nacos.common.client_config import GRPCConfig
@ -82,7 +83,7 @@ class TestClientV2(unittest.IsolatedAsyncioTestCase):
client = await NacosNamingService.create_naming_service(client_config) client = await NacosNamingService.create_naming_service(client_config)
assert await client.server_health() assert await client.server_health()
async def cb(instance_list: list[Instance]): async def cb(instance_list: List[Instance]):
print('received subscribe callback', str(instance_list)) print('received subscribe callback', str(instance_list))
await client.subscribe( await client.subscribe(

View File

@ -9,13 +9,16 @@ from v2.nacos.config.nacos_config_service import NacosConfigService
from v2.nacos.common.auth import CredentialsProvider, Credentials from v2.nacos.common.auth import CredentialsProvider, Credentials
client_config = (ClientConfigBuilder() client_config = (ClientConfigBuilder()
.access_key(os.getenv('NACOS_ACCESS_KEY')) # .access_key(os.getenv('NACOS_ACCESS_KEY'))
.secret_key(os.getenv('NACOS_SECRET_KEY')) # .secret_key(os.getenv('NACOS_SECRET_KEY'))
.username(os.getenv('NACOS_USERNAME'))
.password(os.getenv('NACOS_PASSWORD'))
.server_address(os.getenv('NACOS_SERVER_ADDR', 'localhost:8848')) .server_address(os.getenv('NACOS_SERVER_ADDR', 'localhost:8848'))
.log_level('INFO') .log_level('INFO')
.grpc_config(GRPCConfig(grpc_timeout=5000)) .grpc_config(GRPCConfig(grpc_timeout=5000))
.build()) .build())
class CustomCredentialsProvider(CredentialsProvider): class CustomCredentialsProvider(CredentialsProvider):
def __init__(self, ak="", sk="", token=""): def __init__(self, ak="", sk="", token=""):
self.credential = Credentials(ak, sk, token) self.credential = Credentials(ak, sk, token)
@ -23,6 +26,7 @@ class CustomCredentialsProvider(CredentialsProvider):
def get_credentials(self): def get_credentials(self):
return self.credential return self.credential
class TestClientV2(unittest.IsolatedAsyncioTestCase): class TestClientV2(unittest.IsolatedAsyncioTestCase):
async def test_publish_config(self): async def test_publish_config(self):
client = await NacosConfigService.create_config_service(client_config) client = await NacosConfigService.create_config_service(client_config)
@ -46,7 +50,7 @@ class TestClientV2(unittest.IsolatedAsyncioTestCase):
assert res assert res
print("success to publish") print("success to publish")
await asyncio.sleep(0.1) await asyncio.sleep(0.2)
content = await client.get_config(ConfigParam( content = await client.get_config(ConfigParam(
data_id=data_id, data_id=data_id,
group=group, group=group,
@ -268,7 +272,8 @@ class TestClientV2(unittest.IsolatedAsyncioTestCase):
async def test_gray_config_with_provider(self): async def test_gray_config_with_provider(self):
client_cfg = (ClientConfigBuilder() client_cfg = (ClientConfigBuilder()
.credentials_provider(CustomCredentialsProvider(os.getenv('NACOS_ACCESS_KEY'), os.getenv('NACOS_SECRET_KEY'))) .credentials_provider(
CustomCredentialsProvider(os.getenv('NACOS_ACCESS_KEY'), os.getenv('NACOS_SECRET_KEY')))
.server_address(os.getenv('NACOS_SERVER_ADDR', 'localhost:8848')) .server_address(os.getenv('NACOS_SERVER_ADDR', 'localhost:8848'))
.log_level('INFO') .log_level('INFO')
.app_conn_labels({"k1": "v1", "k2": "v2", "nacos_config_gray_label": "gray"}) .app_conn_labels({"k1": "v1", "k2": "v2", "nacos_config_gray_label": "gray"})

View File

@ -1,6 +1,6 @@
import asyncio import asyncio
from logging import Logger from logging import Logger
from typing import Optional, Callable from typing import Optional, Callable, List, Dict
from v2.nacos.common.constants import Constants from v2.nacos.common.constants import Constants
from v2.nacos.config.cache.config_info_cache import ConfigInfoCache from v2.nacos.config.cache.config_info_cache import ConfigInfoCache
@ -15,7 +15,7 @@ class ConfigSubscribeManager:
def __init__(self, logger: Logger, config_info_cache: ConfigInfoCache, namespace_id: str, def __init__(self, logger: Logger, config_info_cache: ConfigInfoCache, namespace_id: str,
config_filter_chain_manager: ConfigFilterChainManager, config_filter_chain_manager: ConfigFilterChainManager,
execute_config_listen_channel: asyncio.Queue): execute_config_listen_channel: asyncio.Queue):
self.subscribe_cache_map: dict[str, SubscribeCacheData] = {} self.subscribe_cache_map: Dict[str, SubscribeCacheData] = {}
self.logger = logger self.logger = logger
self.lock = asyncio.Lock() self.lock = asyncio.Lock()
self.namespace_id = namespace_id self.namespace_id = namespace_id
@ -87,7 +87,7 @@ class ConfigSubscribeManager:
await subscribe_cache.execute_listener() await subscribe_cache.execute_listener()
async def execute_listener_and_build_tasks(self, is_sync_all: bool): async def execute_listener_and_build_tasks(self, is_sync_all: bool):
listen_fetch_task_map: dict[int, list[SubscribeCacheData]] = {} listen_fetch_task_map: Dict[int, List[SubscribeCacheData]] = {}
for cache_data in self.subscribe_cache_map.values(): for cache_data in self.subscribe_cache_map.values():
if cache_data.is_sync_with_server: if cache_data.is_sync_with_server:
await cache_data.execute_listener() await cache_data.execute_listener()

View File

@ -1,3 +1,5 @@
from typing import Dict
from v2.nacos.common.client_config import KMSConfig from v2.nacos.common.client_config import KMSConfig
from v2.nacos.common.constants import Constants from v2.nacos.common.constants import Constants
from v2.nacos.common.nacos_exception import NacosException, INVALID_PARAM from v2.nacos.common.nacos_exception import NacosException, INVALID_PARAM
@ -11,7 +13,7 @@ from v2.nacos.config.model.config_param import HandlerParam
class KMSHandler: class KMSHandler:
def __init__(self, kms_config: KMSConfig): def __init__(self, kms_config: KMSConfig):
self.kms_plugins: dict[str, EncryptionPlugin] = {} self.kms_plugins: Dict[str, EncryptionPlugin] = {}
self.kms_client = KmsClient.create_kms_client(kms_config) self.kms_client = KmsClient.create_kms_client(kms_config)
kms_aes_128_encryption_plugin = KmsAes128EncryptionPlugin(self.kms_client) kms_aes_128_encryption_plugin = KmsAes128EncryptionPlugin(self.kms_client)
self.kms_plugins[kms_aes_128_encryption_plugin.algorithm_name()] = kms_aes_128_encryption_plugin self.kms_plugins[kms_aes_128_encryption_plugin.algorithm_name()] = kms_aes_128_encryption_plugin

View File

@ -22,7 +22,7 @@ class ConfigPage(BaseModel):
totalCount: int = 0 totalCount: int = 0
pageNumber: int = 0 pageNumber: int = 0
pagesAvailable: int = 0 pagesAvailable: int = 0
pageItems: list[ConfigItem] = [] pageItems: List[ConfigItem] = []
class ConfigListenContext(BaseModel): class ConfigListenContext(BaseModel):

View File

@ -1,5 +1,5 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Optional from typing import Optional, List, Dict
from v2.nacos.config.model.config import ConfigListenContext from v2.nacos.config.model.config import ConfigListenContext
from v2.nacos.transport.model.rpc_request import Request from v2.nacos.transport.model.rpc_request import Request
@ -24,7 +24,7 @@ class AbstractConfigRequest(Request, ABC):
class ConfigBatchListenRequest(AbstractConfigRequest): class ConfigBatchListenRequest(AbstractConfigRequest):
listen: bool = True listen: bool = True
configListenContexts: list[ConfigListenContext] = [] configListenContexts: List[ConfigListenContext] = []
def get_request_type(self): def get_request_type(self):
return "ConfigBatchListenRequest" return "ConfigBatchListenRequest"
@ -46,7 +46,7 @@ class ConfigQueryRequest(AbstractConfigRequest):
class ConfigPublishRequest(AbstractConfigRequest): class ConfigPublishRequest(AbstractConfigRequest):
content: Optional[str] content: Optional[str]
casMd5: Optional[str] casMd5: Optional[str]
additionMap: dict[str, str] = {} additionMap: Dict[str, str] = {}
def get_request_type(self): def get_request_type(self):
return "ConfigPublishRequest" return "ConfigPublishRequest"

View File

@ -1,4 +1,4 @@
from typing import Optional from typing import Optional, List
from pydantic import BaseModel from pydantic import BaseModel
@ -12,7 +12,7 @@ class ConfigContext(BaseModel):
class ConfigChangeBatchListenResponse(Response): class ConfigChangeBatchListenResponse(Response):
changedConfigs: list[ConfigContext] = [] changedConfigs: List[ConfigContext] = []
def get_response_type(self) -> str: def get_response_type(self) -> str:
return "ConfigChangeBatchListenResponse" return "ConfigChangeBatchListenResponse"

View File

@ -197,53 +197,53 @@ class ConfigGRPCClientProxy:
self.logger.debug("Timeout occurred") self.logger.debug("Timeout occurred")
except asyncio.CancelledError: except asyncio.CancelledError:
return return
finally:
has_changed_keys = False has_changed_keys = False
is_sync_all = (get_current_time_millis() - self.last_all_sync_time) >= 5 * 60 * 1000 is_sync_all = (get_current_time_millis() - self.last_all_sync_time) >= 5 * 60 * 1000
listen_task_map = await self.config_subscribe_manager.execute_listener_and_build_tasks(is_sync_all) listen_task_map = await self.config_subscribe_manager.execute_listener_and_build_tasks(is_sync_all)
if len(listen_task_map) == 0: if len(listen_task_map) == 0:
continue
for task_id, cache_data_list in listen_task_map.items():
if len(cache_data_list) == 0:
continue
request = ConfigBatchListenRequest(group='', dataId='', tenant='')
for cache_data in cache_data_list:
config_listen_context = ConfigListenContext(group=cache_data.group,
md5=cache_data.md5,
dataId=cache_data.data_id,
tenant=cache_data.tenant)
request.configListenContexts.append(config_listen_context)
try:
rpc_client = await self.fetch_rpc_client(task_id)
response: ConfigChangeBatchListenResponse = await self.request_config_server(
rpc_client, request, ConfigChangeBatchListenResponse)
if len(response.changedConfigs) > 0:
has_changed_keys = True
for config_ctx in response.changedConfigs:
change_key = get_config_cache_key(config_ctx.dataId, config_ctx.group, config_ctx.tenant)
try:
content, encrypted_data_key = await self.query_config(config_ctx.dataId,
config_ctx.group)
await self.config_subscribe_manager.update_subscribe_cache(config_ctx.dataId,
config_ctx.group,
self.namespace_id,
content,
encrypted_data_key)
except Exception as e:
self.logger.error(f"failed to refresh config:{change_key},error:{str(e)}")
continue
except Exception as e:
self.logger.error(f"failed to batch listen config ,error:{str(e)}")
continue continue
for task_id, cache_data_list in listen_task_map.items(): if is_sync_all:
if len(cache_data_list) == 0: self.last_all_sync_time = get_current_time_millis()
continue
request = ConfigBatchListenRequest(group='', dataId='', tenant='')
for cache_data in cache_data_list:
config_listen_context = ConfigListenContext(group=cache_data.group,
md5=cache_data.md5,
dataId=cache_data.data_id,
tenant=cache_data.tenant)
request.configListenContexts.append(config_listen_context)
try:
rpc_client = await self.fetch_rpc_client(task_id)
response: ConfigChangeBatchListenResponse = await self.request_config_server(
rpc_client, request, ConfigChangeBatchListenResponse)
if len(response.changedConfigs) > 0: if has_changed_keys:
has_changed_keys = True await self.execute_config_listen_channel.put(None)
for config_ctx in response.changedConfigs:
change_key = get_config_cache_key(config_ctx.dataId, config_ctx.group, config_ctx.tenant)
try:
content, encrypted_data_key = await self.query_config(config_ctx.dataId,
config_ctx.group)
await self.config_subscribe_manager.update_subscribe_cache(config_ctx.dataId,
config_ctx.group,
self.namespace_id,
content,
encrypted_data_key)
except Exception as e:
self.logger.error(f"failed to refresh config:{change_key},error:{str(e)}")
continue
except Exception as e:
self.logger.error(f"failed to batch listen config ,error:{str(e)}")
continue
if is_sync_all:
self.last_all_sync_time = get_current_time_millis()
if has_changed_keys:
await self.execute_config_listen_channel.put(None)
async def server_health(self): async def server_health(self):
return (await self.fetch_rpc_client()).is_running() return (await self.fetch_rpc_client()).is_running()

View File

@ -2,7 +2,7 @@ import asyncio
import json import json
import logging import logging
import os import os
from typing import Callable, Optional from typing import Callable, Optional, List, Dict
from v2.nacos.common.client_config import ClientConfig from v2.nacos.common.client_config import ClientConfig
from v2.nacos.common.constants import Constants from v2.nacos.common.constants import Constants
@ -18,7 +18,7 @@ class ServiceInfoCache:
def __init__(self, client_config: ClientConfig): def __init__(self, client_config: ClientConfig):
self.logger = logging.getLogger(Constants.NAMING_MODULE) self.logger = logging.getLogger(Constants.NAMING_MODULE)
self.cache_dir = os.path.join(client_config.cache_dir, Constants.NAMING_MODULE, client_config.namespace_id) self.cache_dir = os.path.join(client_config.cache_dir, Constants.NAMING_MODULE, client_config.namespace_id)
self.service_info_map: dict[str, Service] = {} self.service_info_map: Dict[str, Service] = {}
self.update_time_map = {} self.update_time_map = {}
self.lock = asyncio.Lock() self.lock = asyncio.Lock()
self.sub_callback_manager = SubscribeManager() self.sub_callback_manager = SubscribeManager()
@ -124,7 +124,7 @@ class ServiceInfoCache:
return old_instance != new_instance return old_instance != new_instance
@staticmethod @staticmethod
def sort_instances(instances: list[Instance]) -> list[Instance]: def sort_instances(instances: List[Instance]) -> List[Instance]:
def instance_key(instance: Instance) -> (int, int): def instance_key(instance: Instance) -> (int, int):
ip_num = int(''.join(instance.ip.split('.'))) ip_num = int(''.join(instance.ip.split('.')))
return ip_num, instance.port return ip_num, instance.port

View File

@ -1,4 +1,4 @@
from typing import Optional, Callable from typing import Optional, Callable, List, Dict
from pydantic import BaseModel from pydantic import BaseModel
from v2.nacos.common.constants import Constants from v2.nacos.common.constants import Constants
@ -10,7 +10,7 @@ class RegisterInstanceParam(BaseModel):
weight: float = 1.0 weight: float = 1.0
enabled: bool = True enabled: bool = True
healthy: bool = True healthy: bool = True
metadata: dict[str, str] = {} metadata: Dict[str, str] = {}
cluster_name: str = '' cluster_name: str = ''
service_name: str service_name: str
group_name: str = Constants.DEFAULT_GROUP group_name: str = Constants.DEFAULT_GROUP
@ -20,7 +20,7 @@ class RegisterInstanceParam(BaseModel):
class BatchRegisterInstanceParam(BaseModel): class BatchRegisterInstanceParam(BaseModel):
service_name: str service_name: str
group_name: str = Constants.DEFAULT_GROUP group_name: str = Constants.DEFAULT_GROUP
instances: list[RegisterInstanceParam] = [] instances: List[RegisterInstanceParam] = []
class DeregisterInstanceParam(BaseModel): class DeregisterInstanceParam(BaseModel):
@ -35,7 +35,7 @@ class DeregisterInstanceParam(BaseModel):
class ListInstanceParam(BaseModel): class ListInstanceParam(BaseModel):
service_name: str service_name: str
group_name: str = Constants.DEFAULT_GROUP group_name: str = Constants.DEFAULT_GROUP
clusters: list[str] = [] clusters: List[str] = []
subscribe: bool = True subscribe: bool = True
healthy_only: Optional[bool] healthy_only: Optional[bool]
@ -43,14 +43,14 @@ class ListInstanceParam(BaseModel):
class SubscribeServiceParam(BaseModel): class SubscribeServiceParam(BaseModel):
service_name: str service_name: str
group_name: str = Constants.DEFAULT_GROUP group_name: str = Constants.DEFAULT_GROUP
clusters: list[str] = [] clusters: List[str] = []
subscribe_callback: Optional[Callable] = None subscribe_callback: Optional[Callable] = None
class GetServiceParam(BaseModel): class GetServiceParam(BaseModel):
service_name: str service_name: str
group_name: str = Constants.DEFAULT_GROUP group_name: str = Constants.DEFAULT_GROUP
clusters: list[str] = [] clusters: List[str] = []
class ListServiceParam(BaseModel): class ListServiceParam(BaseModel):

View File

@ -1,5 +1,5 @@
from abc import ABC from abc import ABC
from typing import Optional, Any from typing import Optional, Any, List
from v2.nacos.naming.model.instance import Instance from v2.nacos.naming.model.instance import Instance
from v2.nacos.naming.model.service import Service from v2.nacos.naming.model.service import Service
@ -34,7 +34,7 @@ class InstanceRequest(AbstractNamingRequest):
class BatchInstanceRequest(AbstractNamingRequest): class BatchInstanceRequest(AbstractNamingRequest):
type: Optional[str] type: Optional[str]
instances: Optional[list[Instance]] instances: Optional[List[Instance]]
def get_request_type(self) -> str: def get_request_type(self) -> str:
return 'BatchInstanceRequest' return 'BatchInstanceRequest'

View File

@ -1,4 +1,4 @@
from typing import Optional, Any from typing import Optional, Any, List
from v2.nacos.naming.model.service import Service from v2.nacos.naming.model.service import Service
from v2.nacos.transport.model.rpc_response import Response from v2.nacos.transport.model.rpc_response import Response
@ -31,7 +31,7 @@ class BatchInstanceResponse(Response):
class ServiceListResponse(Response): class ServiceListResponse(Response):
count: int count: int
serviceNames: list[str] serviceNames: List[str]
def get_response_type(self) -> str: def get_response_type(self) -> str:
return "ServiceListResponse" return "ServiceListResponse"

View File

@ -1,6 +1,6 @@
import time import time
import urllib.parse import urllib.parse
from typing import Optional from typing import Optional, List
from pydantic import BaseModel from pydantic import BaseModel
@ -22,7 +22,7 @@ class Service(BaseModel):
groupName: str groupName: str
clusters: Optional[str] = '' clusters: Optional[str] = ''
cacheMillis: int = 1000 cacheMillis: int = 1000
hosts: list[Instance] = [] hosts: List[Instance] = []
lastRefTime: int = 0 lastRefTime: int = 0
checksum: str = "" checksum: str = ""
allIps: bool = False allIps: bool = False
@ -114,4 +114,4 @@ class Service(BaseModel):
class ServiceList(BaseModel): class ServiceList(BaseModel):
count: int count: int
services: list[str] services: List[str]

View File

@ -1,4 +1,6 @@
import asyncio import asyncio
from typing import List
from v2.nacos.common.client_config import ClientConfig from v2.nacos.common.client_config import ClientConfig
from v2.nacos.common.constants import Constants from v2.nacos.common.constants import Constants
from v2.nacos.common.nacos_exception import NacosException, INVALID_PARAM from v2.nacos.common.nacos_exception import NacosException, INVALID_PARAM
@ -137,7 +139,7 @@ class NacosNamingService(NacosClient):
return await self.grpc_client_proxy.list_services(request) return await self.grpc_client_proxy.list_services(request)
async def list_instances(self, request: ListInstanceParam) -> list[Instance]: async def list_instances(self, request: ListInstanceParam) -> List[Instance]:
if not request.service_name: if not request.service_name:
raise NacosException(INVALID_PARAM, "service_name can not be empty") raise NacosException(INVALID_PARAM, "service_name can not be empty")

View File

@ -4,7 +4,7 @@ import hashlib
import hmac import hmac
import logging import logging
import uuid import uuid
from typing import Optional from typing import Optional, List
from v2.nacos.common.client_config import ClientConfig from v2.nacos.common.client_config import ClientConfig
from v2.nacos.common.constants import Constants from v2.nacos.common.constants import Constants
@ -107,7 +107,7 @@ class NamingGRPCClientProxy:
response = await self.request_naming_server(request, InstanceResponse) response = await self.request_naming_server(request, InstanceResponse)
return response.is_success() return response.is_success()
async def batch_register_instance(self, service_name: str, group_name: str, instances: list[Instance]) -> bool: async def batch_register_instance(self, service_name: str, group_name: str, instances: List[Instance]) -> bool:
self.logger.info("batch register instance service_name:%s, group_name:%s, namespace:%s,instances:%s" % ( self.logger.info("batch register instance service_name:%s, group_name:%s, namespace:%s,instances:%s" % (
service_name, group_name, self.namespace_id, str(instances))) service_name, group_name, self.namespace_id, str(instances)))

View File

@ -1,4 +1,5 @@
import asyncio import asyncio
from typing import List
from v2.nacos.naming.model.instance import Instance from v2.nacos.naming.model.instance import Instance
from v2.nacos.naming.model.service import Service from v2.nacos.naming.model.service import Service
@ -48,7 +49,7 @@ class NamingGrpcConnectionEventListener(ConnectionEventListener):
async with self.lock: async with self.lock:
self.registered_instance_cached[key] = instance self.registered_instance_cached[key] = instance
async def cache_instances_for_redo(self, service_name: str, group_name: str, instances: list[Instance]) -> None: async def cache_instances_for_redo(self, service_name: str, group_name: str, instances: List[Instance]) -> None:
key = get_group_name(service_name, group_name) key = get_group_name(service_name, group_name)
async with self.lock: async with self.lock:
self.registered_instance_cached[key] = instances self.registered_instance_cached[key] = instances

View File

@ -94,4 +94,6 @@ class NacosServerConnector:
async def inject_security_info(self, headers): async def inject_security_info(self, headers):
if self.client_config.username and self.client_config.password: if self.client_config.username and self.client_config.password:
access_token = await self.auth_client.get_access_token(False) access_token = await self.auth_client.get_access_token(False)
headers[Constants.ACCESS_TOKEN] = access_token if access_token is not None and access_token != "":
headers[Constants.ACCESS_TOKEN] = access_token
return

View File

@ -70,7 +70,7 @@ class RpcClient(ABC):
def __init__(self, logger, name: str, nacos_server: NacosServerConnector): def __init__(self, logger, name: str, nacos_server: NacosServerConnector):
self.logger = logger self.logger = logger
self.name = name self.name = name
self.labels: dict[str, str] = {} self.labels: Dict[str, str] = {}
self.current_connection = None self.current_connection = None
self.rpc_client_status = RpcClientStatus.INITIALIZED self.rpc_client_status = RpcClientStatus.INITIALIZED
self.event_chan = asyncio.Queue() self.event_chan = asyncio.Queue()
@ -137,11 +137,10 @@ class RpcClient(ABC):
self.logger.error("%s server healthy check fail, currentConnection=%s" self.logger.error("%s server healthy check fail, currentConnection=%s"
, self.name, self.current_connection.get_connection_id()) , self.name, self.current_connection.get_connection_id())
async with self.lock: if self.rpc_client_status == RpcClientStatus.SHUTDOWN:
if self.rpc_client_status == RpcClientStatus.SHUTDOWN: continue
continue self.rpc_client_status = RpcClientStatus.UNHEALTHY
self.rpc_client_status = RpcClientStatus.UNHEALTHY await self.reconnect(ReconnectContext(server_info=None, on_request_fail=False))
await self.reconnect(ReconnectContext(server_info=None, on_request_fail=False))
except asyncio.CancelledError: except asyncio.CancelledError:
break break
except asyncio.CancelledError: except asyncio.CancelledError: