Compare commits

..

No commits in common. "master" and "2.0.3" have entirely different histories.

22 changed files with 93 additions and 107 deletions

View File

@ -18,7 +18,7 @@ Supported Nacos version over 2.x
## Installation ## Installation
```shell ```shell
pip install nacos-sdk-python pip install nacos-sdk-python==2.0.3
``` ```
## Client Configuration ## Client Configuration
@ -284,7 +284,7 @@ instance_list = await client.list_instances(ListInstanceParam(service_name='naco
### Subscribe ### Subscribe
```angular2html ```angular2html
async def cb(instance_list: List[Instance]): async def cb(instance_list: list[Instance]):
print('received subscribe callback', str(instance_list)) print('received subscribe callback', str(instance_list))
await client.subscribe( await client.subscribe(
@ -295,7 +295,7 @@ await client.subscribe(
### Unsubscribe ### Unsubscribe
```angular2html ```angular2html
async def cb(instance_list: List[Instance]): async def cb(instance_list: list[Instance]):
print('received subscribe callback', str(instance_list)) print('received subscribe callback', str(instance_list))
await client.unsubscribe( await client.unsubscribe(

View File

@ -425,7 +425,7 @@ class NacosClient:
params["type"] = config_type params["type"] = config_type
try: try:
resp = self._do_sync_req("/nacos/v1/cs/configs", None, None, params, resp = self._do_sync_req("/nacos/v1/cs/configs", None, params, None,
timeout or self.default_timeout, "POST") timeout or self.default_timeout, "POST")
c = resp.read() c = resp.read()
logger.info("[publish] publish content, group:%s, data_id:%s, server response:%s" % ( logger.info("[publish] publish content, group:%s, data_id:%s, server response:%s" % (
@ -777,13 +777,11 @@ class NacosClient:
# if contains_init_key: # if contains_init_key:
# headers["longPullingNoHangUp"] = "true" # headers["longPullingNoHangUp"] = "true"
params = {"tenant": self.namespace} if self.namespace else None
data = {"Listening-Configs": probe_update_string} data = {"Listening-Configs": probe_update_string}
changed_keys = list() changed_keys = list()
try: try:
resp = self._do_sync_req("/nacos/v1/cs/configs/listener", headers, params, data, resp = self._do_sync_req("/nacos/v1/cs/configs/listener", headers, None, data,
self.pulling_timeout + 10, "POST") self.pulling_timeout + 10, "POST")
changed_keys = [group_key(*i) for i in parse_pulling_result(resp.read())] changed_keys = [group_key(*i) for i in parse_pulling_result(resp.read())]
logger.info("[do-pulling] following keys are changed from server %s" % truncate(str(changed_keys))) logger.info("[do-pulling] following keys are changed from server %s" % truncate(str(changed_keys)))

View File

@ -4,6 +4,6 @@ alibabacloud_kms20160120>=2.2.3
alibabacloud_tea_openapi>=0.3.12 alibabacloud_tea_openapi>=0.3.12
grpcio>=1.66.1 grpcio>=1.66.1
protobuf>=3.20.3 protobuf>=3.20.3
psutil>=5.9.5 psutil>=5.9.6
pycryptodome>=3.19.1 pycryptodome>=3.19.1
pydantic>=2.10.4 pydantic>=2.10.4

View File

@ -53,7 +53,7 @@ class UploadCommand(Command):
setup( setup(
name="nacos-sdk-python", name="nacos-sdk-python",
version="2.0.5", version="2.0.3",
packages=find_packages( packages=find_packages(
exclude=["test", "*.tests", "*.tests.*", "tests.*", "tests"]), exclude=["test", "*.tests", "*.tests.*", "tests.*", "tests"]),
url="https://github.com/nacos-group/nacos-sdk-python", url="https://github.com/nacos-group/nacos-sdk-python",

View File

@ -1,7 +1,6 @@
import asyncio import asyncio
import os import os
import unittest import unittest
from typing import List
from v2.nacos import ConfigParam from v2.nacos import ConfigParam
from v2.nacos.common.client_config import GRPCConfig from v2.nacos.common.client_config import GRPCConfig
@ -83,7 +82,7 @@ class TestClientV2(unittest.IsolatedAsyncioTestCase):
client = await NacosNamingService.create_naming_service(client_config) client = await NacosNamingService.create_naming_service(client_config)
assert await client.server_health() assert await client.server_health()
async def cb(instance_list: List[Instance]): async def cb(instance_list: list[Instance]):
print('received subscribe callback', str(instance_list)) print('received subscribe callback', str(instance_list))
await client.subscribe( await client.subscribe(

View File

@ -9,16 +9,13 @@ from v2.nacos.config.nacos_config_service import NacosConfigService
from v2.nacos.common.auth import CredentialsProvider, Credentials from v2.nacos.common.auth import CredentialsProvider, Credentials
client_config = (ClientConfigBuilder() client_config = (ClientConfigBuilder()
# .access_key(os.getenv('NACOS_ACCESS_KEY')) .access_key(os.getenv('NACOS_ACCESS_KEY'))
# .secret_key(os.getenv('NACOS_SECRET_KEY')) .secret_key(os.getenv('NACOS_SECRET_KEY'))
.username(os.getenv('NACOS_USERNAME'))
.password(os.getenv('NACOS_PASSWORD'))
.server_address(os.getenv('NACOS_SERVER_ADDR', 'localhost:8848')) .server_address(os.getenv('NACOS_SERVER_ADDR', 'localhost:8848'))
.log_level('INFO') .log_level('INFO')
.grpc_config(GRPCConfig(grpc_timeout=5000)) .grpc_config(GRPCConfig(grpc_timeout=5000))
.build()) .build())
class CustomCredentialsProvider(CredentialsProvider): class CustomCredentialsProvider(CredentialsProvider):
def __init__(self, ak="", sk="", token=""): def __init__(self, ak="", sk="", token=""):
self.credential = Credentials(ak, sk, token) self.credential = Credentials(ak, sk, token)
@ -26,7 +23,6 @@ class CustomCredentialsProvider(CredentialsProvider):
def get_credentials(self): def get_credentials(self):
return self.credential return self.credential
class TestClientV2(unittest.IsolatedAsyncioTestCase): class TestClientV2(unittest.IsolatedAsyncioTestCase):
async def test_publish_config(self): async def test_publish_config(self):
client = await NacosConfigService.create_config_service(client_config) client = await NacosConfigService.create_config_service(client_config)
@ -50,7 +46,7 @@ class TestClientV2(unittest.IsolatedAsyncioTestCase):
assert res assert res
print("success to publish") print("success to publish")
await asyncio.sleep(0.2) await asyncio.sleep(0.1)
content = await client.get_config(ConfigParam( content = await client.get_config(ConfigParam(
data_id=data_id, data_id=data_id,
group=group, group=group,
@ -272,8 +268,7 @@ class TestClientV2(unittest.IsolatedAsyncioTestCase):
async def test_gray_config_with_provider(self): async def test_gray_config_with_provider(self):
client_cfg = (ClientConfigBuilder() client_cfg = (ClientConfigBuilder()
.credentials_provider( .credentials_provider(CustomCredentialsProvider(os.getenv('NACOS_ACCESS_KEY'), os.getenv('NACOS_SECRET_KEY')))
CustomCredentialsProvider(os.getenv('NACOS_ACCESS_KEY'), os.getenv('NACOS_SECRET_KEY')))
.server_address(os.getenv('NACOS_SERVER_ADDR', 'localhost:8848')) .server_address(os.getenv('NACOS_SERVER_ADDR', 'localhost:8848'))
.log_level('INFO') .log_level('INFO')
.app_conn_labels({"k1": "v1", "k2": "v2", "nacos_config_gray_label": "gray"}) .app_conn_labels({"k1": "v1", "k2": "v2", "nacos_config_gray_label": "gray"})

View File

@ -1,6 +1,6 @@
import asyncio import asyncio
from logging import Logger from logging import Logger
from typing import Optional, Callable, List, Dict from typing import Optional, Callable
from v2.nacos.common.constants import Constants from v2.nacos.common.constants import Constants
from v2.nacos.config.cache.config_info_cache import ConfigInfoCache from v2.nacos.config.cache.config_info_cache import ConfigInfoCache
@ -15,7 +15,7 @@ class ConfigSubscribeManager:
def __init__(self, logger: Logger, config_info_cache: ConfigInfoCache, namespace_id: str, def __init__(self, logger: Logger, config_info_cache: ConfigInfoCache, namespace_id: str,
config_filter_chain_manager: ConfigFilterChainManager, config_filter_chain_manager: ConfigFilterChainManager,
execute_config_listen_channel: asyncio.Queue): execute_config_listen_channel: asyncio.Queue):
self.subscribe_cache_map: Dict[str, SubscribeCacheData] = {} self.subscribe_cache_map: dict[str, SubscribeCacheData] = {}
self.logger = logger self.logger = logger
self.lock = asyncio.Lock() self.lock = asyncio.Lock()
self.namespace_id = namespace_id self.namespace_id = namespace_id
@ -87,7 +87,7 @@ class ConfigSubscribeManager:
await subscribe_cache.execute_listener() await subscribe_cache.execute_listener()
async def execute_listener_and_build_tasks(self, is_sync_all: bool): async def execute_listener_and_build_tasks(self, is_sync_all: bool):
listen_fetch_task_map: Dict[int, List[SubscribeCacheData]] = {} listen_fetch_task_map: dict[int, list[SubscribeCacheData]] = {}
for cache_data in self.subscribe_cache_map.values(): for cache_data in self.subscribe_cache_map.values():
if cache_data.is_sync_with_server: if cache_data.is_sync_with_server:
await cache_data.execute_listener() await cache_data.execute_listener()

View File

@ -1,5 +1,3 @@
from typing import Dict
from v2.nacos.common.client_config import KMSConfig from v2.nacos.common.client_config import KMSConfig
from v2.nacos.common.constants import Constants from v2.nacos.common.constants import Constants
from v2.nacos.common.nacos_exception import NacosException, INVALID_PARAM from v2.nacos.common.nacos_exception import NacosException, INVALID_PARAM
@ -13,7 +11,7 @@ from v2.nacos.config.model.config_param import HandlerParam
class KMSHandler: class KMSHandler:
def __init__(self, kms_config: KMSConfig): def __init__(self, kms_config: KMSConfig):
self.kms_plugins: Dict[str, EncryptionPlugin] = {} self.kms_plugins: dict[str, EncryptionPlugin] = {}
self.kms_client = KmsClient.create_kms_client(kms_config) self.kms_client = KmsClient.create_kms_client(kms_config)
kms_aes_128_encryption_plugin = KmsAes128EncryptionPlugin(self.kms_client) kms_aes_128_encryption_plugin = KmsAes128EncryptionPlugin(self.kms_client)
self.kms_plugins[kms_aes_128_encryption_plugin.algorithm_name()] = kms_aes_128_encryption_plugin self.kms_plugins[kms_aes_128_encryption_plugin.algorithm_name()] = kms_aes_128_encryption_plugin

View File

@ -22,7 +22,7 @@ class ConfigPage(BaseModel):
totalCount: int = 0 totalCount: int = 0
pageNumber: int = 0 pageNumber: int = 0
pagesAvailable: int = 0 pagesAvailable: int = 0
pageItems: List[ConfigItem] = [] pageItems: list[ConfigItem] = []
class ConfigListenContext(BaseModel): class ConfigListenContext(BaseModel):

View File

@ -1,5 +1,5 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Optional, List, Dict from typing import Optional
from v2.nacos.config.model.config import ConfigListenContext from v2.nacos.config.model.config import ConfigListenContext
from v2.nacos.transport.model.rpc_request import Request from v2.nacos.transport.model.rpc_request import Request
@ -24,7 +24,7 @@ class AbstractConfigRequest(Request, ABC):
class ConfigBatchListenRequest(AbstractConfigRequest): class ConfigBatchListenRequest(AbstractConfigRequest):
listen: bool = True listen: bool = True
configListenContexts: List[ConfigListenContext] = [] configListenContexts: list[ConfigListenContext] = []
def get_request_type(self): def get_request_type(self):
return "ConfigBatchListenRequest" return "ConfigBatchListenRequest"
@ -46,7 +46,7 @@ class ConfigQueryRequest(AbstractConfigRequest):
class ConfigPublishRequest(AbstractConfigRequest): class ConfigPublishRequest(AbstractConfigRequest):
content: Optional[str] content: Optional[str]
casMd5: Optional[str] casMd5: Optional[str]
additionMap: Dict[str, str] = {} additionMap: dict[str, str] = {}
def get_request_type(self): def get_request_type(self):
return "ConfigPublishRequest" return "ConfigPublishRequest"

View File

@ -1,4 +1,4 @@
from typing import Optional, List from typing import Optional
from pydantic import BaseModel from pydantic import BaseModel
@ -12,7 +12,7 @@ class ConfigContext(BaseModel):
class ConfigChangeBatchListenResponse(Response): class ConfigChangeBatchListenResponse(Response):
changedConfigs: List[ConfigContext] = [] changedConfigs: list[ConfigContext] = []
def get_response_type(self) -> str: def get_response_type(self) -> str:
return "ConfigChangeBatchListenResponse" return "ConfigChangeBatchListenResponse"

View File

@ -197,53 +197,53 @@ class ConfigGRPCClientProxy:
self.logger.debug("Timeout occurred") self.logger.debug("Timeout occurred")
except asyncio.CancelledError: except asyncio.CancelledError:
return return
finally:
has_changed_keys = False has_changed_keys = False
is_sync_all = (get_current_time_millis() - self.last_all_sync_time) >= 5 * 60 * 1000 is_sync_all = (get_current_time_millis() - self.last_all_sync_time) >= 5 * 60 * 1000
listen_task_map = await self.config_subscribe_manager.execute_listener_and_build_tasks(is_sync_all) listen_task_map = await self.config_subscribe_manager.execute_listener_and_build_tasks(is_sync_all)
if len(listen_task_map) == 0: if len(listen_task_map) == 0:
continue
for task_id, cache_data_list in listen_task_map.items():
if len(cache_data_list) == 0:
continue
request = ConfigBatchListenRequest(group='', dataId='', tenant='')
for cache_data in cache_data_list:
config_listen_context = ConfigListenContext(group=cache_data.group,
md5=cache_data.md5,
dataId=cache_data.data_id,
tenant=cache_data.tenant)
request.configListenContexts.append(config_listen_context)
try:
rpc_client = await self.fetch_rpc_client(task_id)
response: ConfigChangeBatchListenResponse = await self.request_config_server(
rpc_client, request, ConfigChangeBatchListenResponse)
if len(response.changedConfigs) > 0:
has_changed_keys = True
for config_ctx in response.changedConfigs:
change_key = get_config_cache_key(config_ctx.dataId, config_ctx.group, config_ctx.tenant)
try:
content, encrypted_data_key = await self.query_config(config_ctx.dataId,
config_ctx.group)
await self.config_subscribe_manager.update_subscribe_cache(config_ctx.dataId,
config_ctx.group,
self.namespace_id,
content,
encrypted_data_key)
except Exception as e:
self.logger.error(f"failed to refresh config:{change_key},error:{str(e)}")
continue
except Exception as e:
self.logger.error(f"failed to batch listen config ,error:{str(e)}")
continue continue
if is_sync_all: for task_id, cache_data_list in listen_task_map.items():
self.last_all_sync_time = get_current_time_millis() if len(cache_data_list) == 0:
continue
request = ConfigBatchListenRequest(group='', dataId='', tenant='')
for cache_data in cache_data_list:
config_listen_context = ConfigListenContext(group=cache_data.group,
md5=cache_data.md5,
dataId=cache_data.data_id,
tenant=cache_data.tenant)
request.configListenContexts.append(config_listen_context)
try:
rpc_client = await self.fetch_rpc_client(task_id)
response: ConfigChangeBatchListenResponse = await self.request_config_server(
rpc_client, request, ConfigChangeBatchListenResponse)
if has_changed_keys: if len(response.changedConfigs) > 0:
await self.execute_config_listen_channel.put(None) has_changed_keys = True
for config_ctx in response.changedConfigs:
change_key = get_config_cache_key(config_ctx.dataId, config_ctx.group, config_ctx.tenant)
try:
content, encrypted_data_key = await self.query_config(config_ctx.dataId,
config_ctx.group)
await self.config_subscribe_manager.update_subscribe_cache(config_ctx.dataId,
config_ctx.group,
self.namespace_id,
content,
encrypted_data_key)
except Exception as e:
self.logger.error(f"failed to refresh config:{change_key},error:{str(e)}")
continue
except Exception as e:
self.logger.error(f"failed to batch listen config ,error:{str(e)}")
continue
if is_sync_all:
self.last_all_sync_time = get_current_time_millis()
if has_changed_keys:
await self.execute_config_listen_channel.put(None)
async def server_health(self): async def server_health(self):
return (await self.fetch_rpc_client()).is_running() return (await self.fetch_rpc_client()).is_running()

View File

@ -2,7 +2,7 @@ import asyncio
import json import json
import logging import logging
import os import os
from typing import Callable, Optional, List, Dict from typing import Callable, Optional
from v2.nacos.common.client_config import ClientConfig from v2.nacos.common.client_config import ClientConfig
from v2.nacos.common.constants import Constants from v2.nacos.common.constants import Constants
@ -18,7 +18,7 @@ class ServiceInfoCache:
def __init__(self, client_config: ClientConfig): def __init__(self, client_config: ClientConfig):
self.logger = logging.getLogger(Constants.NAMING_MODULE) self.logger = logging.getLogger(Constants.NAMING_MODULE)
self.cache_dir = os.path.join(client_config.cache_dir, Constants.NAMING_MODULE, client_config.namespace_id) self.cache_dir = os.path.join(client_config.cache_dir, Constants.NAMING_MODULE, client_config.namespace_id)
self.service_info_map: Dict[str, Service] = {} self.service_info_map: dict[str, Service] = {}
self.update_time_map = {} self.update_time_map = {}
self.lock = asyncio.Lock() self.lock = asyncio.Lock()
self.sub_callback_manager = SubscribeManager() self.sub_callback_manager = SubscribeManager()
@ -124,7 +124,7 @@ class ServiceInfoCache:
return old_instance != new_instance return old_instance != new_instance
@staticmethod @staticmethod
def sort_instances(instances: List[Instance]) -> List[Instance]: def sort_instances(instances: list[Instance]) -> list[Instance]:
def instance_key(instance: Instance) -> (int, int): def instance_key(instance: Instance) -> (int, int):
ip_num = int(''.join(instance.ip.split('.'))) ip_num = int(''.join(instance.ip.split('.')))
return ip_num, instance.port return ip_num, instance.port

View File

@ -1,4 +1,4 @@
from typing import Optional, Callable, List, Dict from typing import Optional, Callable
from pydantic import BaseModel from pydantic import BaseModel
from v2.nacos.common.constants import Constants from v2.nacos.common.constants import Constants
@ -10,7 +10,7 @@ class RegisterInstanceParam(BaseModel):
weight: float = 1.0 weight: float = 1.0
enabled: bool = True enabled: bool = True
healthy: bool = True healthy: bool = True
metadata: Dict[str, str] = {} metadata: dict[str, str] = {}
cluster_name: str = '' cluster_name: str = ''
service_name: str service_name: str
group_name: str = Constants.DEFAULT_GROUP group_name: str = Constants.DEFAULT_GROUP
@ -20,7 +20,7 @@ class RegisterInstanceParam(BaseModel):
class BatchRegisterInstanceParam(BaseModel): class BatchRegisterInstanceParam(BaseModel):
service_name: str service_name: str
group_name: str = Constants.DEFAULT_GROUP group_name: str = Constants.DEFAULT_GROUP
instances: List[RegisterInstanceParam] = [] instances: list[RegisterInstanceParam] = []
class DeregisterInstanceParam(BaseModel): class DeregisterInstanceParam(BaseModel):
@ -35,7 +35,7 @@ class DeregisterInstanceParam(BaseModel):
class ListInstanceParam(BaseModel): class ListInstanceParam(BaseModel):
service_name: str service_name: str
group_name: str = Constants.DEFAULT_GROUP group_name: str = Constants.DEFAULT_GROUP
clusters: List[str] = [] clusters: list[str] = []
subscribe: bool = True subscribe: bool = True
healthy_only: Optional[bool] healthy_only: Optional[bool]
@ -43,14 +43,14 @@ class ListInstanceParam(BaseModel):
class SubscribeServiceParam(BaseModel): class SubscribeServiceParam(BaseModel):
service_name: str service_name: str
group_name: str = Constants.DEFAULT_GROUP group_name: str = Constants.DEFAULT_GROUP
clusters: List[str] = [] clusters: list[str] = []
subscribe_callback: Optional[Callable] = None subscribe_callback: Optional[Callable] = None
class GetServiceParam(BaseModel): class GetServiceParam(BaseModel):
service_name: str service_name: str
group_name: str = Constants.DEFAULT_GROUP group_name: str = Constants.DEFAULT_GROUP
clusters: List[str] = [] clusters: list[str] = []
class ListServiceParam(BaseModel): class ListServiceParam(BaseModel):

View File

@ -1,5 +1,5 @@
from abc import ABC from abc import ABC
from typing import Optional, Any, List from typing import Optional, Any
from v2.nacos.naming.model.instance import Instance from v2.nacos.naming.model.instance import Instance
from v2.nacos.naming.model.service import Service from v2.nacos.naming.model.service import Service
@ -34,7 +34,7 @@ class InstanceRequest(AbstractNamingRequest):
class BatchInstanceRequest(AbstractNamingRequest): class BatchInstanceRequest(AbstractNamingRequest):
type: Optional[str] type: Optional[str]
instances: Optional[List[Instance]] instances: Optional[list[Instance]]
def get_request_type(self) -> str: def get_request_type(self) -> str:
return 'BatchInstanceRequest' return 'BatchInstanceRequest'

View File

@ -1,4 +1,4 @@
from typing import Optional, Any, List from typing import Optional, Any
from v2.nacos.naming.model.service import Service from v2.nacos.naming.model.service import Service
from v2.nacos.transport.model.rpc_response import Response from v2.nacos.transport.model.rpc_response import Response
@ -31,7 +31,7 @@ class BatchInstanceResponse(Response):
class ServiceListResponse(Response): class ServiceListResponse(Response):
count: int count: int
serviceNames: List[str] serviceNames: list[str]
def get_response_type(self) -> str: def get_response_type(self) -> str:
return "ServiceListResponse" return "ServiceListResponse"

View File

@ -1,6 +1,6 @@
import time import time
import urllib.parse import urllib.parse
from typing import Optional, List from typing import Optional
from pydantic import BaseModel from pydantic import BaseModel
@ -22,7 +22,7 @@ class Service(BaseModel):
groupName: str groupName: str
clusters: Optional[str] = '' clusters: Optional[str] = ''
cacheMillis: int = 1000 cacheMillis: int = 1000
hosts: List[Instance] = [] hosts: list[Instance] = []
lastRefTime: int = 0 lastRefTime: int = 0
checksum: str = "" checksum: str = ""
allIps: bool = False allIps: bool = False
@ -114,4 +114,4 @@ class Service(BaseModel):
class ServiceList(BaseModel): class ServiceList(BaseModel):
count: int count: int
services: List[str] services: list[str]

View File

@ -1,6 +1,4 @@
import asyncio import asyncio
from typing import List
from v2.nacos.common.client_config import ClientConfig from v2.nacos.common.client_config import ClientConfig
from v2.nacos.common.constants import Constants from v2.nacos.common.constants import Constants
from v2.nacos.common.nacos_exception import NacosException, INVALID_PARAM from v2.nacos.common.nacos_exception import NacosException, INVALID_PARAM
@ -139,7 +137,7 @@ class NacosNamingService(NacosClient):
return await self.grpc_client_proxy.list_services(request) return await self.grpc_client_proxy.list_services(request)
async def list_instances(self, request: ListInstanceParam) -> List[Instance]: async def list_instances(self, request: ListInstanceParam) -> list[Instance]:
if not request.service_name: if not request.service_name:
raise NacosException(INVALID_PARAM, "service_name can not be empty") raise NacosException(INVALID_PARAM, "service_name can not be empty")

View File

@ -4,7 +4,7 @@ import hashlib
import hmac import hmac
import logging import logging
import uuid import uuid
from typing import Optional, List from typing import Optional
from v2.nacos.common.client_config import ClientConfig from v2.nacos.common.client_config import ClientConfig
from v2.nacos.common.constants import Constants from v2.nacos.common.constants import Constants
@ -107,7 +107,7 @@ class NamingGRPCClientProxy:
response = await self.request_naming_server(request, InstanceResponse) response = await self.request_naming_server(request, InstanceResponse)
return response.is_success() return response.is_success()
async def batch_register_instance(self, service_name: str, group_name: str, instances: List[Instance]) -> bool: async def batch_register_instance(self, service_name: str, group_name: str, instances: list[Instance]) -> bool:
self.logger.info("batch register instance service_name:%s, group_name:%s, namespace:%s,instances:%s" % ( self.logger.info("batch register instance service_name:%s, group_name:%s, namespace:%s,instances:%s" % (
service_name, group_name, self.namespace_id, str(instances))) service_name, group_name, self.namespace_id, str(instances)))

View File

@ -1,5 +1,4 @@
import asyncio import asyncio
from typing import List
from v2.nacos.naming.model.instance import Instance from v2.nacos.naming.model.instance import Instance
from v2.nacos.naming.model.service import Service from v2.nacos.naming.model.service import Service
@ -49,7 +48,7 @@ class NamingGrpcConnectionEventListener(ConnectionEventListener):
async with self.lock: async with self.lock:
self.registered_instance_cached[key] = instance self.registered_instance_cached[key] = instance
async def cache_instances_for_redo(self, service_name: str, group_name: str, instances: List[Instance]) -> None: async def cache_instances_for_redo(self, service_name: str, group_name: str, instances: list[Instance]) -> None:
key = get_group_name(service_name, group_name) key = get_group_name(service_name, group_name)
async with self.lock: async with self.lock:
self.registered_instance_cached[key] = instances self.registered_instance_cached[key] = instances

View File

@ -94,6 +94,4 @@ class NacosServerConnector:
async def inject_security_info(self, headers): async def inject_security_info(self, headers):
if self.client_config.username and self.client_config.password: if self.client_config.username and self.client_config.password:
access_token = await self.auth_client.get_access_token(False) access_token = await self.auth_client.get_access_token(False)
if access_token is not None and access_token != "": headers[Constants.ACCESS_TOKEN] = access_token
headers[Constants.ACCESS_TOKEN] = access_token
return

View File

@ -70,7 +70,7 @@ class RpcClient(ABC):
def __init__(self, logger, name: str, nacos_server: NacosServerConnector): def __init__(self, logger, name: str, nacos_server: NacosServerConnector):
self.logger = logger self.logger = logger
self.name = name self.name = name
self.labels: Dict[str, str] = {} self.labels: dict[str, str] = {}
self.current_connection = None self.current_connection = None
self.rpc_client_status = RpcClientStatus.INITIALIZED self.rpc_client_status = RpcClientStatus.INITIALIZED
self.event_chan = asyncio.Queue() self.event_chan = asyncio.Queue()
@ -137,10 +137,11 @@ class RpcClient(ABC):
self.logger.error("%s server healthy check fail, currentConnection=%s" self.logger.error("%s server healthy check fail, currentConnection=%s"
, self.name, self.current_connection.get_connection_id()) , self.name, self.current_connection.get_connection_id())
if self.rpc_client_status == RpcClientStatus.SHUTDOWN: async with self.lock:
continue if self.rpc_client_status == RpcClientStatus.SHUTDOWN:
self.rpc_client_status = RpcClientStatus.UNHEALTHY continue
await self.reconnect(ReconnectContext(server_info=None, on_request_fail=False)) self.rpc_client_status = RpcClientStatus.UNHEALTHY
await self.reconnect(ReconnectContext(server_info=None, on_request_fail=False))
except asyncio.CancelledError: except asyncio.CancelledError:
break break
except asyncio.CancelledError: except asyncio.CancelledError: