Compare commits
17 Commits
Author | SHA1 | Date |
---|---|---|
|
df8f9d2dcd | |
|
4600b89e45 | |
|
ce22e6a437 | |
|
b3c26c7ba9 | |
|
cb17109542 | |
|
1892ebbe43 | |
|
bcdda37351 | |
|
d3f9b0daec | |
|
95236b1643 | |
|
58019d4395 | |
|
096b1bf077 | |
|
a23ed4f7b6 | |
|
f37de79b9a | |
|
13919631b1 | |
|
3fa3bd353b | |
|
7460efd9b7 | |
|
81fac4c584 |
|
@ -18,7 +18,7 @@ Supported Nacos version over 2.x
|
|||
## Installation
|
||||
|
||||
```shell
|
||||
pip install nacos-sdk-python==2.0.3
|
||||
pip install nacos-sdk-python
|
||||
```
|
||||
|
||||
## Client Configuration
|
||||
|
@ -284,7 +284,7 @@ instance_list = await client.list_instances(ListInstanceParam(service_name='naco
|
|||
### Subscribe
|
||||
|
||||
```angular2html
|
||||
async def cb(instance_list: list[Instance]):
|
||||
async def cb(instance_list: List[Instance]):
|
||||
print('received subscribe callback', str(instance_list))
|
||||
|
||||
await client.subscribe(
|
||||
|
@ -295,7 +295,7 @@ await client.subscribe(
|
|||
### Unsubscribe
|
||||
|
||||
```angular2html
|
||||
async def cb(instance_list: list[Instance]):
|
||||
async def cb(instance_list: List[Instance]):
|
||||
print('received subscribe callback', str(instance_list))
|
||||
|
||||
await client.unsubscribe(
|
||||
|
|
|
@ -425,7 +425,7 @@ class NacosClient:
|
|||
params["type"] = config_type
|
||||
|
||||
try:
|
||||
resp = self._do_sync_req("/nacos/v1/cs/configs", None, params, None,
|
||||
resp = self._do_sync_req("/nacos/v1/cs/configs", None, None, params,
|
||||
timeout or self.default_timeout, "POST")
|
||||
c = resp.read()
|
||||
logger.info("[publish] publish content, group:%s, data_id:%s, server response:%s" % (
|
||||
|
@ -777,11 +777,13 @@ class NacosClient:
|
|||
# if contains_init_key:
|
||||
# headers["longPullingNoHangUp"] = "true"
|
||||
|
||||
params = {"tenant": self.namespace} if self.namespace else None
|
||||
|
||||
data = {"Listening-Configs": probe_update_string}
|
||||
|
||||
changed_keys = list()
|
||||
try:
|
||||
resp = self._do_sync_req("/nacos/v1/cs/configs/listener", headers, None, data,
|
||||
resp = self._do_sync_req("/nacos/v1/cs/configs/listener", headers, params, data,
|
||||
self.pulling_timeout + 10, "POST")
|
||||
changed_keys = [group_key(*i) for i in parse_pulling_result(resp.read())]
|
||||
logger.info("[do-pulling] following keys are changed from server %s" % truncate(str(changed_keys)))
|
||||
|
|
|
@ -4,6 +4,6 @@ alibabacloud_kms20160120>=2.2.3
|
|||
alibabacloud_tea_openapi>=0.3.12
|
||||
grpcio>=1.66.1
|
||||
protobuf>=3.20.3
|
||||
psutil>=5.9.6
|
||||
psutil>=5.9.5
|
||||
pycryptodome>=3.19.1
|
||||
pydantic>=2.10.4
|
||||
|
|
2
setup.py
2
setup.py
|
@ -53,7 +53,7 @@ class UploadCommand(Command):
|
|||
|
||||
setup(
|
||||
name="nacos-sdk-python",
|
||||
version="2.0.3",
|
||||
version="2.0.5",
|
||||
packages=find_packages(
|
||||
exclude=["test", "*.tests", "*.tests.*", "tests.*", "tests"]),
|
||||
url="https://github.com/nacos-group/nacos-sdk-python",
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import asyncio
|
||||
import os
|
||||
import unittest
|
||||
from typing import List
|
||||
|
||||
from v2.nacos import ConfigParam
|
||||
from v2.nacos.common.client_config import GRPCConfig
|
||||
|
@ -82,7 +83,7 @@ class TestClientV2(unittest.IsolatedAsyncioTestCase):
|
|||
client = await NacosNamingService.create_naming_service(client_config)
|
||||
assert await client.server_health()
|
||||
|
||||
async def cb(instance_list: list[Instance]):
|
||||
async def cb(instance_list: List[Instance]):
|
||||
print('received subscribe callback', str(instance_list))
|
||||
|
||||
await client.subscribe(
|
||||
|
|
|
@ -9,13 +9,16 @@ from v2.nacos.config.nacos_config_service import NacosConfigService
|
|||
from v2.nacos.common.auth import CredentialsProvider, Credentials
|
||||
|
||||
client_config = (ClientConfigBuilder()
|
||||
.access_key(os.getenv('NACOS_ACCESS_KEY'))
|
||||
.secret_key(os.getenv('NACOS_SECRET_KEY'))
|
||||
# .access_key(os.getenv('NACOS_ACCESS_KEY'))
|
||||
# .secret_key(os.getenv('NACOS_SECRET_KEY'))
|
||||
.username(os.getenv('NACOS_USERNAME'))
|
||||
.password(os.getenv('NACOS_PASSWORD'))
|
||||
.server_address(os.getenv('NACOS_SERVER_ADDR', 'localhost:8848'))
|
||||
.log_level('INFO')
|
||||
.grpc_config(GRPCConfig(grpc_timeout=5000))
|
||||
.build())
|
||||
|
||||
|
||||
class CustomCredentialsProvider(CredentialsProvider):
|
||||
def __init__(self, ak="", sk="", token=""):
|
||||
self.credential = Credentials(ak, sk, token)
|
||||
|
@ -23,6 +26,7 @@ class CustomCredentialsProvider(CredentialsProvider):
|
|||
def get_credentials(self):
|
||||
return self.credential
|
||||
|
||||
|
||||
class TestClientV2(unittest.IsolatedAsyncioTestCase):
|
||||
async def test_publish_config(self):
|
||||
client = await NacosConfigService.create_config_service(client_config)
|
||||
|
@ -46,7 +50,7 @@ class TestClientV2(unittest.IsolatedAsyncioTestCase):
|
|||
assert res
|
||||
print("success to publish")
|
||||
|
||||
await asyncio.sleep(0.1)
|
||||
await asyncio.sleep(0.2)
|
||||
content = await client.get_config(ConfigParam(
|
||||
data_id=data_id,
|
||||
group=group,
|
||||
|
@ -268,7 +272,8 @@ class TestClientV2(unittest.IsolatedAsyncioTestCase):
|
|||
|
||||
async def test_gray_config_with_provider(self):
|
||||
client_cfg = (ClientConfigBuilder()
|
||||
.credentials_provider(CustomCredentialsProvider(os.getenv('NACOS_ACCESS_KEY'), os.getenv('NACOS_SECRET_KEY')))
|
||||
.credentials_provider(
|
||||
CustomCredentialsProvider(os.getenv('NACOS_ACCESS_KEY'), os.getenv('NACOS_SECRET_KEY')))
|
||||
.server_address(os.getenv('NACOS_SERVER_ADDR', 'localhost:8848'))
|
||||
.log_level('INFO')
|
||||
.app_conn_labels({"k1": "v1", "k2": "v2", "nacos_config_gray_label": "gray"})
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import asyncio
|
||||
from logging import Logger
|
||||
from typing import Optional, Callable
|
||||
from typing import Optional, Callable, List, Dict
|
||||
|
||||
from v2.nacos.common.constants import Constants
|
||||
from v2.nacos.config.cache.config_info_cache import ConfigInfoCache
|
||||
|
@ -15,7 +15,7 @@ class ConfigSubscribeManager:
|
|||
def __init__(self, logger: Logger, config_info_cache: ConfigInfoCache, namespace_id: str,
|
||||
config_filter_chain_manager: ConfigFilterChainManager,
|
||||
execute_config_listen_channel: asyncio.Queue):
|
||||
self.subscribe_cache_map: dict[str, SubscribeCacheData] = {}
|
||||
self.subscribe_cache_map: Dict[str, SubscribeCacheData] = {}
|
||||
self.logger = logger
|
||||
self.lock = asyncio.Lock()
|
||||
self.namespace_id = namespace_id
|
||||
|
@ -87,7 +87,7 @@ class ConfigSubscribeManager:
|
|||
await subscribe_cache.execute_listener()
|
||||
|
||||
async def execute_listener_and_build_tasks(self, is_sync_all: bool):
|
||||
listen_fetch_task_map: dict[int, list[SubscribeCacheData]] = {}
|
||||
listen_fetch_task_map: Dict[int, List[SubscribeCacheData]] = {}
|
||||
for cache_data in self.subscribe_cache_map.values():
|
||||
if cache_data.is_sync_with_server:
|
||||
await cache_data.execute_listener()
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
from typing import Dict
|
||||
|
||||
from v2.nacos.common.client_config import KMSConfig
|
||||
from v2.nacos.common.constants import Constants
|
||||
from v2.nacos.common.nacos_exception import NacosException, INVALID_PARAM
|
||||
|
@ -11,7 +13,7 @@ from v2.nacos.config.model.config_param import HandlerParam
|
|||
|
||||
class KMSHandler:
|
||||
def __init__(self, kms_config: KMSConfig):
|
||||
self.kms_plugins: dict[str, EncryptionPlugin] = {}
|
||||
self.kms_plugins: Dict[str, EncryptionPlugin] = {}
|
||||
self.kms_client = KmsClient.create_kms_client(kms_config)
|
||||
kms_aes_128_encryption_plugin = KmsAes128EncryptionPlugin(self.kms_client)
|
||||
self.kms_plugins[kms_aes_128_encryption_plugin.algorithm_name()] = kms_aes_128_encryption_plugin
|
||||
|
|
|
@ -22,7 +22,7 @@ class ConfigPage(BaseModel):
|
|||
totalCount: int = 0
|
||||
pageNumber: int = 0
|
||||
pagesAvailable: int = 0
|
||||
pageItems: list[ConfigItem] = []
|
||||
pageItems: List[ConfigItem] = []
|
||||
|
||||
|
||||
class ConfigListenContext(BaseModel):
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
from abc import ABC, abstractmethod
|
||||
from typing import Optional
|
||||
from typing import Optional, List, Dict
|
||||
|
||||
from v2.nacos.config.model.config import ConfigListenContext
|
||||
from v2.nacos.transport.model.rpc_request import Request
|
||||
|
@ -24,7 +24,7 @@ class AbstractConfigRequest(Request, ABC):
|
|||
|
||||
class ConfigBatchListenRequest(AbstractConfigRequest):
|
||||
listen: bool = True
|
||||
configListenContexts: list[ConfigListenContext] = []
|
||||
configListenContexts: List[ConfigListenContext] = []
|
||||
|
||||
def get_request_type(self):
|
||||
return "ConfigBatchListenRequest"
|
||||
|
@ -46,7 +46,7 @@ class ConfigQueryRequest(AbstractConfigRequest):
|
|||
class ConfigPublishRequest(AbstractConfigRequest):
|
||||
content: Optional[str]
|
||||
casMd5: Optional[str]
|
||||
additionMap: dict[str, str] = {}
|
||||
additionMap: Dict[str, str] = {}
|
||||
|
||||
def get_request_type(self):
|
||||
return "ConfigPublishRequest"
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from typing import Optional
|
||||
from typing import Optional, List
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
@ -12,7 +12,7 @@ class ConfigContext(BaseModel):
|
|||
|
||||
|
||||
class ConfigChangeBatchListenResponse(Response):
|
||||
changedConfigs: list[ConfigContext] = []
|
||||
changedConfigs: List[ConfigContext] = []
|
||||
|
||||
def get_response_type(self) -> str:
|
||||
return "ConfigChangeBatchListenResponse"
|
||||
|
|
|
@ -197,53 +197,53 @@ class ConfigGRPCClientProxy:
|
|||
self.logger.debug("Timeout occurred")
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
finally:
|
||||
has_changed_keys = False
|
||||
is_sync_all = (get_current_time_millis() - self.last_all_sync_time) >= 5 * 60 * 1000
|
||||
listen_task_map = await self.config_subscribe_manager.execute_listener_and_build_tasks(is_sync_all)
|
||||
if len(listen_task_map) == 0:
|
||||
|
||||
has_changed_keys = False
|
||||
is_sync_all = (get_current_time_millis() - self.last_all_sync_time) >= 5 * 60 * 1000
|
||||
listen_task_map = await self.config_subscribe_manager.execute_listener_and_build_tasks(is_sync_all)
|
||||
if len(listen_task_map) == 0:
|
||||
continue
|
||||
|
||||
for task_id, cache_data_list in listen_task_map.items():
|
||||
if len(cache_data_list) == 0:
|
||||
continue
|
||||
request = ConfigBatchListenRequest(group='', dataId='', tenant='')
|
||||
for cache_data in cache_data_list:
|
||||
config_listen_context = ConfigListenContext(group=cache_data.group,
|
||||
md5=cache_data.md5,
|
||||
dataId=cache_data.data_id,
|
||||
tenant=cache_data.tenant)
|
||||
request.configListenContexts.append(config_listen_context)
|
||||
try:
|
||||
rpc_client = await self.fetch_rpc_client(task_id)
|
||||
response: ConfigChangeBatchListenResponse = await self.request_config_server(
|
||||
rpc_client, request, ConfigChangeBatchListenResponse)
|
||||
|
||||
if len(response.changedConfigs) > 0:
|
||||
has_changed_keys = True
|
||||
|
||||
for config_ctx in response.changedConfigs:
|
||||
change_key = get_config_cache_key(config_ctx.dataId, config_ctx.group, config_ctx.tenant)
|
||||
try:
|
||||
content, encrypted_data_key = await self.query_config(config_ctx.dataId,
|
||||
config_ctx.group)
|
||||
await self.config_subscribe_manager.update_subscribe_cache(config_ctx.dataId,
|
||||
config_ctx.group,
|
||||
self.namespace_id,
|
||||
content,
|
||||
encrypted_data_key)
|
||||
except Exception as e:
|
||||
self.logger.error(f"failed to refresh config:{change_key},error:{str(e)}")
|
||||
continue
|
||||
except Exception as e:
|
||||
self.logger.error(f"failed to batch listen config ,error:{str(e)}")
|
||||
continue
|
||||
|
||||
for task_id, cache_data_list in listen_task_map.items():
|
||||
if len(cache_data_list) == 0:
|
||||
continue
|
||||
request = ConfigBatchListenRequest(group='', dataId='', tenant='')
|
||||
for cache_data in cache_data_list:
|
||||
config_listen_context = ConfigListenContext(group=cache_data.group,
|
||||
md5=cache_data.md5,
|
||||
dataId=cache_data.data_id,
|
||||
tenant=cache_data.tenant)
|
||||
request.configListenContexts.append(config_listen_context)
|
||||
try:
|
||||
rpc_client = await self.fetch_rpc_client(task_id)
|
||||
response: ConfigChangeBatchListenResponse = await self.request_config_server(
|
||||
rpc_client, request, ConfigChangeBatchListenResponse)
|
||||
if is_sync_all:
|
||||
self.last_all_sync_time = get_current_time_millis()
|
||||
|
||||
if len(response.changedConfigs) > 0:
|
||||
has_changed_keys = True
|
||||
|
||||
for config_ctx in response.changedConfigs:
|
||||
change_key = get_config_cache_key(config_ctx.dataId, config_ctx.group, config_ctx.tenant)
|
||||
try:
|
||||
content, encrypted_data_key = await self.query_config(config_ctx.dataId,
|
||||
config_ctx.group)
|
||||
await self.config_subscribe_manager.update_subscribe_cache(config_ctx.dataId,
|
||||
config_ctx.group,
|
||||
self.namespace_id,
|
||||
content,
|
||||
encrypted_data_key)
|
||||
except Exception as e:
|
||||
self.logger.error(f"failed to refresh config:{change_key},error:{str(e)}")
|
||||
continue
|
||||
except Exception as e:
|
||||
self.logger.error(f"failed to batch listen config ,error:{str(e)}")
|
||||
continue
|
||||
|
||||
if is_sync_all:
|
||||
self.last_all_sync_time = get_current_time_millis()
|
||||
|
||||
if has_changed_keys:
|
||||
await self.execute_config_listen_channel.put(None)
|
||||
if has_changed_keys:
|
||||
await self.execute_config_listen_channel.put(None)
|
||||
|
||||
async def server_health(self):
|
||||
return (await self.fetch_rpc_client()).is_running()
|
||||
|
|
|
@ -2,7 +2,7 @@ import asyncio
|
|||
import json
|
||||
import logging
|
||||
import os
|
||||
from typing import Callable, Optional
|
||||
from typing import Callable, Optional, List, Dict
|
||||
|
||||
from v2.nacos.common.client_config import ClientConfig
|
||||
from v2.nacos.common.constants import Constants
|
||||
|
@ -18,7 +18,7 @@ class ServiceInfoCache:
|
|||
def __init__(self, client_config: ClientConfig):
|
||||
self.logger = logging.getLogger(Constants.NAMING_MODULE)
|
||||
self.cache_dir = os.path.join(client_config.cache_dir, Constants.NAMING_MODULE, client_config.namespace_id)
|
||||
self.service_info_map: dict[str, Service] = {}
|
||||
self.service_info_map: Dict[str, Service] = {}
|
||||
self.update_time_map = {}
|
||||
self.lock = asyncio.Lock()
|
||||
self.sub_callback_manager = SubscribeManager()
|
||||
|
@ -124,7 +124,7 @@ class ServiceInfoCache:
|
|||
return old_instance != new_instance
|
||||
|
||||
@staticmethod
|
||||
def sort_instances(instances: list[Instance]) -> list[Instance]:
|
||||
def sort_instances(instances: List[Instance]) -> List[Instance]:
|
||||
def instance_key(instance: Instance) -> (int, int):
|
||||
ip_num = int(''.join(instance.ip.split('.')))
|
||||
return ip_num, instance.port
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from typing import Optional, Callable
|
||||
from typing import Optional, Callable, List, Dict
|
||||
from pydantic import BaseModel
|
||||
|
||||
from v2.nacos.common.constants import Constants
|
||||
|
@ -10,7 +10,7 @@ class RegisterInstanceParam(BaseModel):
|
|||
weight: float = 1.0
|
||||
enabled: bool = True
|
||||
healthy: bool = True
|
||||
metadata: dict[str, str] = {}
|
||||
metadata: Dict[str, str] = {}
|
||||
cluster_name: str = ''
|
||||
service_name: str
|
||||
group_name: str = Constants.DEFAULT_GROUP
|
||||
|
@ -20,7 +20,7 @@ class RegisterInstanceParam(BaseModel):
|
|||
class BatchRegisterInstanceParam(BaseModel):
|
||||
service_name: str
|
||||
group_name: str = Constants.DEFAULT_GROUP
|
||||
instances: list[RegisterInstanceParam] = []
|
||||
instances: List[RegisterInstanceParam] = []
|
||||
|
||||
|
||||
class DeregisterInstanceParam(BaseModel):
|
||||
|
@ -35,7 +35,7 @@ class DeregisterInstanceParam(BaseModel):
|
|||
class ListInstanceParam(BaseModel):
|
||||
service_name: str
|
||||
group_name: str = Constants.DEFAULT_GROUP
|
||||
clusters: list[str] = []
|
||||
clusters: List[str] = []
|
||||
subscribe: bool = True
|
||||
healthy_only: Optional[bool]
|
||||
|
||||
|
@ -43,14 +43,14 @@ class ListInstanceParam(BaseModel):
|
|||
class SubscribeServiceParam(BaseModel):
|
||||
service_name: str
|
||||
group_name: str = Constants.DEFAULT_GROUP
|
||||
clusters: list[str] = []
|
||||
clusters: List[str] = []
|
||||
subscribe_callback: Optional[Callable] = None
|
||||
|
||||
|
||||
class GetServiceParam(BaseModel):
|
||||
service_name: str
|
||||
group_name: str = Constants.DEFAULT_GROUP
|
||||
clusters: list[str] = []
|
||||
clusters: List[str] = []
|
||||
|
||||
|
||||
class ListServiceParam(BaseModel):
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
from abc import ABC
|
||||
from typing import Optional, Any
|
||||
from typing import Optional, Any, List
|
||||
|
||||
from v2.nacos.naming.model.instance import Instance
|
||||
from v2.nacos.naming.model.service import Service
|
||||
|
@ -34,7 +34,7 @@ class InstanceRequest(AbstractNamingRequest):
|
|||
|
||||
class BatchInstanceRequest(AbstractNamingRequest):
|
||||
type: Optional[str]
|
||||
instances: Optional[list[Instance]]
|
||||
instances: Optional[List[Instance]]
|
||||
|
||||
def get_request_type(self) -> str:
|
||||
return 'BatchInstanceRequest'
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from typing import Optional, Any
|
||||
from typing import Optional, Any, List
|
||||
|
||||
from v2.nacos.naming.model.service import Service
|
||||
from v2.nacos.transport.model.rpc_response import Response
|
||||
|
@ -31,7 +31,7 @@ class BatchInstanceResponse(Response):
|
|||
|
||||
class ServiceListResponse(Response):
|
||||
count: int
|
||||
serviceNames: list[str]
|
||||
serviceNames: List[str]
|
||||
|
||||
def get_response_type(self) -> str:
|
||||
return "ServiceListResponse"
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import time
|
||||
import urllib.parse
|
||||
from typing import Optional
|
||||
from typing import Optional, List
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
@ -22,7 +22,7 @@ class Service(BaseModel):
|
|||
groupName: str
|
||||
clusters: Optional[str] = ''
|
||||
cacheMillis: int = 1000
|
||||
hosts: list[Instance] = []
|
||||
hosts: List[Instance] = []
|
||||
lastRefTime: int = 0
|
||||
checksum: str = ""
|
||||
allIps: bool = False
|
||||
|
@ -114,4 +114,4 @@ class Service(BaseModel):
|
|||
|
||||
class ServiceList(BaseModel):
|
||||
count: int
|
||||
services: list[str]
|
||||
services: List[str]
|
||||
|
|
|
@ -1,4 +1,6 @@
|
|||
import asyncio
|
||||
from typing import List
|
||||
|
||||
from v2.nacos.common.client_config import ClientConfig
|
||||
from v2.nacos.common.constants import Constants
|
||||
from v2.nacos.common.nacos_exception import NacosException, INVALID_PARAM
|
||||
|
@ -137,7 +139,7 @@ class NacosNamingService(NacosClient):
|
|||
|
||||
return await self.grpc_client_proxy.list_services(request)
|
||||
|
||||
async def list_instances(self, request: ListInstanceParam) -> list[Instance]:
|
||||
async def list_instances(self, request: ListInstanceParam) -> List[Instance]:
|
||||
if not request.service_name:
|
||||
raise NacosException(INVALID_PARAM, "service_name can not be empty")
|
||||
|
||||
|
|
|
@ -4,7 +4,7 @@ import hashlib
|
|||
import hmac
|
||||
import logging
|
||||
import uuid
|
||||
from typing import Optional
|
||||
from typing import Optional, List
|
||||
|
||||
from v2.nacos.common.client_config import ClientConfig
|
||||
from v2.nacos.common.constants import Constants
|
||||
|
@ -107,7 +107,7 @@ class NamingGRPCClientProxy:
|
|||
response = await self.request_naming_server(request, InstanceResponse)
|
||||
return response.is_success()
|
||||
|
||||
async def batch_register_instance(self, service_name: str, group_name: str, instances: list[Instance]) -> bool:
|
||||
async def batch_register_instance(self, service_name: str, group_name: str, instances: List[Instance]) -> bool:
|
||||
self.logger.info("batch register instance service_name:%s, group_name:%s, namespace:%s,instances:%s" % (
|
||||
service_name, group_name, self.namespace_id, str(instances)))
|
||||
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import asyncio
|
||||
from typing import List
|
||||
|
||||
from v2.nacos.naming.model.instance import Instance
|
||||
from v2.nacos.naming.model.service import Service
|
||||
|
@ -48,7 +49,7 @@ class NamingGrpcConnectionEventListener(ConnectionEventListener):
|
|||
async with self.lock:
|
||||
self.registered_instance_cached[key] = instance
|
||||
|
||||
async def cache_instances_for_redo(self, service_name: str, group_name: str, instances: list[Instance]) -> None:
|
||||
async def cache_instances_for_redo(self, service_name: str, group_name: str, instances: List[Instance]) -> None:
|
||||
key = get_group_name(service_name, group_name)
|
||||
async with self.lock:
|
||||
self.registered_instance_cached[key] = instances
|
||||
|
|
|
@ -94,4 +94,6 @@ class NacosServerConnector:
|
|||
async def inject_security_info(self, headers):
|
||||
if self.client_config.username and self.client_config.password:
|
||||
access_token = await self.auth_client.get_access_token(False)
|
||||
headers[Constants.ACCESS_TOKEN] = access_token
|
||||
if access_token is not None and access_token != "":
|
||||
headers[Constants.ACCESS_TOKEN] = access_token
|
||||
return
|
||||
|
|
|
@ -70,7 +70,7 @@ class RpcClient(ABC):
|
|||
def __init__(self, logger, name: str, nacos_server: NacosServerConnector):
|
||||
self.logger = logger
|
||||
self.name = name
|
||||
self.labels: dict[str, str] = {}
|
||||
self.labels: Dict[str, str] = {}
|
||||
self.current_connection = None
|
||||
self.rpc_client_status = RpcClientStatus.INITIALIZED
|
||||
self.event_chan = asyncio.Queue()
|
||||
|
@ -137,11 +137,10 @@ class RpcClient(ABC):
|
|||
self.logger.error("%s server healthy check fail, currentConnection=%s"
|
||||
, self.name, self.current_connection.get_connection_id())
|
||||
|
||||
async with self.lock:
|
||||
if self.rpc_client_status == RpcClientStatus.SHUTDOWN:
|
||||
continue
|
||||
self.rpc_client_status = RpcClientStatus.UNHEALTHY
|
||||
await self.reconnect(ReconnectContext(server_info=None, on_request_fail=False))
|
||||
if self.rpc_client_status == RpcClientStatus.SHUTDOWN:
|
||||
continue
|
||||
self.rpc_client_status = RpcClientStatus.UNHEALTHY
|
||||
await self.reconnect(ReconnectContext(server_info=None, on_request_fail=False))
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except asyncio.CancelledError:
|
||||
|
|
Loading…
Reference in New Issue