From 345d8813f52720c8aeac2c6fe1571d06cbf1b003 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AF=9B=E6=96=87=E8=B6=85?= Date: Sun, 16 Apr 2023 12:02:17 +0800 Subject: [PATCH 1/5] [ISSUES #132] enhance health check (#139) * [ISSUES #132] enhance health check - optimize payload convert logic - add ErrResponse error - add health check task - optimize bi default handle logic - remove unnecessary lock - remove switch server method - remove block method in bi channel --- src/api/error.rs | 3 + src/common/event_bus/mod.rs | 3 +- src/common/remote/grpc/bi_channel.rs | 36 --- src/common/remote/grpc/grpc_client.rs | 24 +- .../remote/grpc/handler/default_handler.rs | 13 +- src/common/remote/grpc/message/mod.rs | 103 ++++++-- src/common/remote/grpc/mod.rs | 246 +++++++++++------- src/naming/mod.rs | 2 +- .../redo_task_reconnect_event_subscriber.rs | 4 +- 9 files changed, 272 insertions(+), 162 deletions(-) diff --git a/src/api/error.rs b/src/api/error.rs index 23ddb33..1332ca3 100644 --- a/src/api/error.rs +++ b/src/api/error.rs @@ -26,6 +26,9 @@ pub enum Error { #[error("get result failed: {0}")] ErrResult(String), + #[error("request_id:{0:?} ret_code:{1} error_code:{2} message:{3:?}")] + ErrResponse(Option, i32, i32, Option), + /// Config not found. #[cfg(feature = "config")] #[error("config not found: {0}")] diff --git a/src/common/event_bus/mod.rs b/src/common/event_bus/mod.rs index d3016c1..23d20a6 100644 --- a/src/common/event_bus/mod.rs +++ b/src/common/event_bus/mod.rs @@ -126,7 +126,8 @@ mod __private { } } } else { - warn!("{key:?} has not been subscribed by anyone."); + let identity = event.event_identity(); + warn!("{identity} {key:?} has not been subscribed by anyone."); } } } diff --git a/src/common/remote/grpc/bi_channel.rs b/src/common/remote/grpc/bi_channel.rs index 5db593d..c6e8b27 100644 --- a/src/common/remote/grpc/bi_channel.rs +++ b/src/common/remote/grpc/bi_channel.rs @@ -43,18 +43,6 @@ impl ResponseWriter { } Ok(()) } - - pub(crate) fn blocking_write(&self, payload: Payload) -> Result<()> { - let ret = self.local_message_sender.blocking_send(Ok(payload)); - if let Err(e) = ret { - warn!("ResponseWriter write message occur an error. {e:?}"); - return Err(ErrResult( - "ResponseWriter write message occur an error".to_string(), - )); - } - - Ok(()) - } } impl BiChannel { @@ -140,18 +128,6 @@ impl BiChannel { Ok(()) } - #[instrument(fields(client_id = &self.client_id), skip_all)] - pub(crate) fn blocking_write(&self, payload: Payload) -> Result<()> { - let ret = self.local_message_sender.blocking_send(Ok(payload)); - - if let Err(e) = ret { - warn!("send message occur an error. {e:?}"); - return Err(ErrResult("send message occur an error".to_string())); - } - - Ok(()) - } - #[instrument(fields(client_id = &self.client_id), skip_all)] pub(crate) async fn close(&self) -> Result<()> { let ret = self @@ -165,18 +141,6 @@ impl BiChannel { Ok(()) } - #[instrument(fields(client_id = &self.client_id), skip_all)] - pub(crate) fn blocking_close(&self) -> Result<()> { - let ret = self - .local_message_sender - .blocking_send(Err(ErrResult("close bi channel".to_string()))); - if let Err(e) = ret { - warn!("close channel occur an error. {e:?}"); - return Err(ErrResult("close channel occur an error".to_string())); - } - Ok(()) - } - #[instrument(fields(client_id = &self.client_id), skip_all)] pub(crate) fn is_closed(&self) -> bool { self.local_message_sender.is_closed() diff --git a/src/common/remote/grpc/grpc_client.rs b/src/common/remote/grpc/grpc_client.rs index f0292cb..f8d49b3 100644 --- a/src/common/remote/grpc/grpc_client.rs +++ b/src/common/remote/grpc/grpc_client.rs @@ -7,9 +7,7 @@ use tracing::{debug, error, info, instrument, warn, Instrument}; use crate::api::error::Result; use crate::common::remote::grpc::events::ReconnectedEvent; -use crate::common::remote::grpc::events::{ - ConnectionHealthCheckEvent, DisconnectEvent, ShutdownEvent, -}; +use crate::common::remote::grpc::events::{DisconnectEvent, ShutdownEvent}; use crate::{api::error::Error::ClientUnhealthy, api::error::Error::ErrResult, common::event_bus}; use crate::{api::error::Error::GrpcioJoin, nacos_proto::v2::Payload}; use crate::{ @@ -65,7 +63,7 @@ impl GrpcClient { } #[instrument(fields(client_id = &self.client_id), skip_all)] - pub(crate) async fn shutdown(&mut self) { + pub(crate) fn shutdown(&self) { self.client_state.store( GrpcClientState::Shutdown.into(), std::sync::atomic::Ordering::Release, @@ -73,6 +71,11 @@ impl GrpcClient { info!("grpc client shutdown."); } + pub(crate) fn is_shutdown(&self) -> bool { + let state: GrpcClientState = self.client_state.load(Ordering::Acquire).into(); + matches!(state, GrpcClientState::Shutdown) + } + #[instrument(fields(client_id = &self.client_id), skip_all)] pub(crate) async fn open_bi_channel(&self, processor: F) -> Result where @@ -132,12 +135,8 @@ impl GrpcClient { let response_payload = response_payload.unwrap(); - let message = GrpcMessage::

::from_payload(response_payload); - if let Err(error) = message { - error!("unary_call_async response grpc message convert to message object occur an error. {error:?}"); - return Err(error); - } - Ok(message.unwrap()) + let message = GrpcMessage::

::from_payload(response_payload)?; + Ok(message) } #[instrument(parent = None , fields(client_id = &self.client_id), skip_all)] @@ -223,11 +222,6 @@ impl GrpcClient { } ConnectivityState::GRPC_CHANNEL_IDLE => { debug!("the current grpc connection state is in idle"); - // health check - event_bus::post(Arc::new(ConnectionHealthCheckEvent { - scope: client_id.clone() - })); - grpc_channel .wait_for_state_change(ConnectivityState::GRPC_CHANNEL_IDLE, deadline) .await; diff --git a/src/common/remote/grpc/handler/default_handler.rs b/src/common/remote/grpc/handler/default_handler.rs index 4f14a18..55b0264 100644 --- a/src/common/remote/grpc/handler/default_handler.rs +++ b/src/common/remote/grpc/handler/default_handler.rs @@ -7,6 +7,17 @@ pub(crate) struct DefaultHandler; impl GrpcPayloadHandler for DefaultHandler { fn hand(&self, _: ResponseWriter, payload: Payload) { - info!("DefaultHandler receive a bi payload: {payload:?}"); + let Payload { body, metadata } = payload; + + let body = body.unwrap_or_default(); + let p_type = body.type_url; + let r_body = String::from_utf8(body.value).unwrap_or_default(); + + let meta_data = metadata.unwrap_or_default(); + let r_type = meta_data.r#type; + let client_ip = meta_data.client_ip; + let headers = meta_data.headers; + + info!("unknown server request. type: {}, client_ip: {}, headers:{:?}, payload: {}, payload_type: {}", r_type, client_ip, headers, r_body, p_type); } } diff --git a/src/common/remote/grpc/message/mod.rs b/src/common/remote/grpc/message/mod.rs index bbdc3b5..d5e455a 100644 --- a/src/common/remote/grpc/message/mod.rs +++ b/src/common/remote/grpc/message/mod.rs @@ -2,10 +2,13 @@ use lazy_static::lazy_static; use prost_types::Any; use serde::{de::DeserializeOwned, Serialize}; use std::collections::HashMap; +use tracing::warn; +use crate::api::error::Error::ErrResponse; use crate::api::error::Error::ErrResult; use crate::api::error::Error::Serialization; use crate::api::error::Result; +use crate::common::remote::grpc::message::response::ErrorResponse; use crate::nacos_proto::v2::{Metadata, Payload}; use std::fmt::Debug; use tracing::error; @@ -64,30 +67,98 @@ where let body_any = body.unwrap(); - let body = T::from_proto_any(&body_any); + let meta_data = payload.metadata.unwrap_or_default(); + let r_type = meta_data.r#type; + let client_ip = meta_data.client_ip; + let headers = meta_data.headers; - if let Err(error) = body { - let body_str = String::from_utf8(body_any.value); - error!("Deserialize from Any[{body_str:?}] to GrpcMessage occur an error:{error:?}"); - return Err(error); - } - let body = body.unwrap(); + let de_body; - let client_ip; - let headers; - let meta_data = payload.metadata; + // try to serialize target type if r_type is not empty + if !r_type.is_empty() { + if T::identity().eq(&r_type) { + let ret: Result = T::from_proto_any(&body_any); + if let Err(error) = ret { + let payload_str = std::str::from_utf8(&body_any.value); + if payload_str.is_err() { + error!("can not convert to target type {}, this payload can not convert to string as well", T::identity()); + return Err(error); + } + let payload_str = payload_str.unwrap(); + error!( + "payload {} can not convert to {} occur an error:{:?}", + payload_str, + T::identity(), + error + ); + return Err(error); + } + de_body = ret.unwrap(); + } else { + warn!( + "payload type {}, target type {}, trying convert to ErrorResponse", + &r_type, + T::identity() + ); + // try to convert to Error Response + let ret: Result = ErrorResponse::from_proto_any(&body_any); + if let Err(error) = ret { + let payload_str = std::str::from_utf8(&body_any.value); + if payload_str.is_err() { + error!("can not convert to ErrorResponse, this payload can not convert to string as well"); + return Err(error); + } + let payload_str = payload_str.unwrap(); + error!( + "payload {} can not convert to ErrorResponse occur an error:{:?}", + payload_str, error + ); + return Err(error); + } - if let Some(meta_data) = meta_data { - client_ip = meta_data.client_ip; - headers = meta_data.headers; + let error_response = ret.unwrap(); + return Err(ErrResponse( + error_response.request_id, + error_response.result_code, + error_response.error_code, + error_response.message, + )); + } } else { - client_ip = "".to_string(); - headers = HashMap::new(); + warn!("payload type is empty!"); + let ret: Result = T::from_proto_any(&body_any); + if let Err(error) = ret { + let payload_str = std::str::from_utf8(&body_any.value); + if payload_str.is_err() { + error!("can not convert to target type {}, this payload can not convert to string as well", T::identity()); + return Err(error); + } + let payload_str = payload_str.unwrap(); + warn!( + "payload {} can not convert to {} occur an error:{:?}", + payload_str, + T::identity(), + error + ); + let ret: Result = ErrorResponse::from_proto_any(&body_any); + if let Err(e) = ret { + error!("trying convert to ErrorResponse occur an error:{:?}", e); + return Err(error); + } + let error_response = ret.unwrap(); + return Err(ErrResponse( + error_response.request_id, + error_response.result_code, + error_response.error_code, + error_response.message, + )); + } + de_body = ret.unwrap(); } Ok(GrpcMessage { headers, - body, + body: de_body, client_ip, }) } diff --git a/src/common/remote/grpc/mod.rs b/src/common/remote/grpc/mod.rs index 99bc46c..f5f8266 100644 --- a/src/common/remote/grpc/mod.rs +++ b/src/common/remote/grpc/mod.rs @@ -1,20 +1,23 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use serde::{Deserialize, Serialize}; -use tokio::sync::RwLock; +use tokio::sync::Semaphore; use tracing::{debug, error, info, instrument, warn, Instrument}; -use crate::common::{ - event_bus, executor, - remote::grpc::{ - events::ClientInitCompleteEvent, - handler::default_handler::DefaultHandler, - message::{ - request::{ConnectionSetupRequest, ServerCheckRequest}, - GrpcMessageBuilder, +use crate::{ + api::error::Error, + common::{ + event_bus, executor, + remote::grpc::{ + events::{ClientInitCompleteEvent, ReconnectedEvent}, + handler::default_handler::DefaultHandler, + message::{ + request::{ConnectionSetupRequest, ServerCheckRequest}, + GrpcMessageBuilder, + }, }, + remote::into_grpc_server_addr, }, - remote::into_grpc_server_addr, }; use self::{ @@ -24,12 +27,14 @@ use self::{ client_detection_request_handler::ClientDetectionRequestHandler, GrpcPayloadHandler, }, message::{ - request::ClientDetectionRequest, response::ServerCheckResponse, GrpcMessageData, - GrpcRequestMessage, GrpcResponseMessage, + request::{ClientDetectionRequest, HealthCheckRequest}, + response::{HealthCheckResponse, ServerCheckResponse}, + GrpcMessageData, GrpcRequestMessage, GrpcResponseMessage, }, subscribers::{ConnectionHealthCheckEventSubscriber, ReconnectedEventSubscriber}, }; use crate::api::error::Error::ClientUnhealthy; +use crate::api::error::Error::ErrResult; use crate::api::error::Result; pub(crate) mod bi_channel; @@ -44,11 +49,12 @@ const APP_FILED: &str = "app"; const DEFAULT_CALL_TIME_OUT: u64 = 3000; pub(crate) struct NacosGrpcClient { - grpc_client: RwLock, + grpc_client: Arc, pub(crate) connection_id: String, pub(crate) client_id: String, bi_handler_map: HandlerMap, app_name: String, + init_semaphore: Semaphore, } impl NacosGrpcClient { @@ -61,7 +67,7 @@ impl NacosGrpcClient { ) -> Result { let address = into_grpc_server_addr(address.as_str(), true, grpc_port)?; let grpc_client = GrpcClient::new(address.as_str(), client_id.clone()).await?; - let grpc_client = RwLock::new(grpc_client); + let grpc_client = Arc::new(grpc_client); let bi_handler_map = HashMap::new(); @@ -71,35 +77,81 @@ impl NacosGrpcClient { client_id, bi_handler_map, app_name, + init_semaphore: Semaphore::new(1), }) } #[instrument(skip_all)] - pub(crate) async fn switch_server( - &self, - address: String, - grpc_port: Option, - set_up: NacosServerSetUP, - ) -> Result<()> { - // switch server - warn!("switch server starting"); - { - let mut old_grpc_client = self.grpc_client.write().await; - old_grpc_client.shutdown().await; + pub(crate) fn health_check_task(&self) { + let client_id = self.client_id.clone(); + let app_name = self.app_name.clone(); + let grpc_client = self.grpc_client.clone(); + executor::spawn(async move { + loop { + if grpc_client.is_shutdown() { + info!("health check task quit. the grpc client has been shutdown."); + break; + } + let health_check_request = HealthCheckRequest::default(); + let grpc_message = GrpcMessageBuilder::new(health_check_request) + .header(APP_FILED.to_owned(), app_name.clone()) + .build(); - info!("create a new grpc client."); - let address = into_grpc_server_addr(address.as_str(), true, grpc_port)?; - let new_client = GrpcClient::new(address.as_str(), self.client_id.clone()).await?; - *old_grpc_client = new_client; - } - warn!("init new grpc client."); - - self.init(set_up).await + let response = grpc_client + .unary_call_async::( + grpc_message, + Duration::from_millis(DEFAULT_CALL_TIME_OUT), + ) + .await; + if let Err(e) = response { + match e { + crate::api::error::Error::ErrResponse( + request_id, + ret_code, + error_code, + message, + ) => { + error!("health check failed, ready to reinitialize grpc client. request_id:{request_id:?} ret_code:{ret_code} error_code:{error_code} message:{message:?}"); + // send event + event_bus::post(Arc::new(ReconnectedEvent { + scope: client_id.clone(), + })); + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + } + _ => { + //just ignore error + error!("health check failed. {e}"); + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + } + } + } + let response = response.unwrap().into_body(); + debug!("health check. {response:?}"); + tokio::time::sleep(Duration::from_secs(5)).await; + } + }); } #[instrument(skip_all)] pub(crate) async fn init(&self, set_up: NacosServerSetUP) -> Result<()> { debug!("init nacos grpc client."); + let semaphore = self.init_semaphore.try_acquire(); + if semaphore.is_err() { + debug!("the current grpc client is initializing. skip init."); + return Ok(()); + } + + // got permit , health check. + let health_check_request = HealthCheckRequest::default(); + let response = self + .unary_call_async::(health_check_request) + .await; + if response.is_ok() { + debug!("the current client health check pass, don't need to reinitialize"); + return Ok(()); + } let mut retry_count = 0; let retry_wait_time = 300; @@ -174,10 +226,10 @@ impl NacosGrpcClient { #[instrument(skip_all)] async fn open_bi_channel(&self) -> Result { - let grpc_client = self.grpc_client.read().await; let handler_map = self.bi_handler_map.clone(); debug!("open bi channel"); - let bi_channel = grpc_client + let bi_channel = self + .grpc_client .open_bi_channel(move |mut payload, response_writer| { let metadata = payload.metadata.take(); if metadata.is_none() { @@ -261,8 +313,8 @@ impl NacosGrpcClient { .headers(request_headers) .build(); - let grpc_client = self.grpc_client.read().await; - let ret = grpc_client + let ret = self + .grpc_client .unary_call_async::(grpc_message, Duration::from_millis(DEFAULT_CALL_TIME_OUT)) .in_current_span() .await?; @@ -298,6 +350,12 @@ impl NacosGrpcClient { } } +impl Drop for NacosGrpcClient { + fn drop(&mut self) { + self.grpc_client.shutdown(); + } +} + #[derive(Clone, Debug)] pub(crate) struct NacosServerSetUP { labels: HashMap, @@ -524,61 +582,69 @@ impl NacosGrpcClientBuilder { } pub(crate) fn build(self) -> Result> { - futures::executor::block_on( - async move { - let mut nacos_grpc_client = NacosGrpcClient::new( - self.address, - self.app_name, - self.grpc_port, - self.client_id.clone(), - ) - .await?; - let server_set_up = NacosServerSetUP { - labels: self.labels, - client_version: self.client_version, - abilities: self.abilities, - namespace: self.namespace, - }; + let build_fut = async move { + let mut nacos_grpc_client = NacosGrpcClient::new( + self.address, + self.app_name, + self.grpc_port, + self.client_id.clone(), + ) + .await?; + let server_set_up = NacosServerSetUP { + labels: self.labels, + client_version: self.client_version, + abilities: self.abilities, + namespace: self.namespace, + }; - // register grpc payload handler - for (key, handlers) in self.bi_call_handlers { - for handler in handlers { - nacos_grpc_client - .register_bi_call_handler(key.clone(), handler) - .await; - } + // register grpc payload handler + for (key, handlers) in self.bi_call_handlers { + for handler in handlers { + nacos_grpc_client + .register_bi_call_handler(key.clone(), handler) + .await; } - - // register default handler - nacos_grpc_client - .register_bi_call_handler( - ClientDetectionRequest::identity().to_string(), - Arc::new(ClientDetectionRequestHandler { - client_id: self.client_id.clone(), - }), - ) - .await; - - let nacos_grpc_client = Arc::new(nacos_grpc_client); - - // register event subscriber - let reconnect_subscriber = ReconnectedEventSubscriber { - nacos_grpc_client: nacos_grpc_client.clone(), - set_up_info: server_set_up.clone(), - scope: self.client_id.clone(), - }; - let health_check_subscriber = ConnectionHealthCheckEventSubscriber { - nacos_grpc_client: nacos_grpc_client.clone(), - scope: self.client_id.clone(), - }; - - event_bus::register(Arc::new(reconnect_subscriber)); - event_bus::register(Arc::new(health_check_subscriber)); - - nacos_grpc_client.init(server_set_up).await?; - Ok(nacos_grpc_client) } - .in_current_span(), - ) + + // register default handler + nacos_grpc_client + .register_bi_call_handler( + ClientDetectionRequest::identity().to_string(), + Arc::new(ClientDetectionRequestHandler { + client_id: self.client_id.clone(), + }), + ) + .await; + + let nacos_grpc_client = Arc::new(nacos_grpc_client); + + // register event subscriber + let reconnect_subscriber = ReconnectedEventSubscriber { + nacos_grpc_client: nacos_grpc_client.clone(), + set_up_info: server_set_up.clone(), + scope: self.client_id.clone(), + }; + let health_check_subscriber = ConnectionHealthCheckEventSubscriber { + nacos_grpc_client: nacos_grpc_client.clone(), + scope: self.client_id.clone(), + }; + + event_bus::register(Arc::new(reconnect_subscriber)); + event_bus::register(Arc::new(health_check_subscriber)); + + nacos_grpc_client.init(server_set_up).await?; + nacos_grpc_client.health_check_task(); + Ok::, Error>(nacos_grpc_client) + } + .in_current_span(); + + let ret = futures::executor::block_on(executor::spawn(build_fut)); + + if let Err(e) = ret { + error!("build client failed. {e}"); + return Err(ErrResult("build client failed.".to_string())); + } + + ret.unwrap() } } diff --git a/src/naming/mod.rs b/src/naming/mod.rs index 4573500..b97dd68 100644 --- a/src/naming/mod.rs +++ b/src/naming/mod.rs @@ -1284,7 +1284,7 @@ pub(crate) mod tests { info!("response. {ret:?}"); - let ten_millis = time::Duration::from_secs(300); + let ten_millis = time::Duration::from_secs(3000); thread::sleep(ten_millis); Ok(()) } diff --git a/src/naming/subscribers/redo_task_reconnect_event_subscriber.rs b/src/naming/subscribers/redo_task_reconnect_event_subscriber.rs index c2ff158..33f7c78 100644 --- a/src/naming/subscribers/redo_task_reconnect_event_subscriber.rs +++ b/src/naming/subscribers/redo_task_reconnect_event_subscriber.rs @@ -18,9 +18,9 @@ impl NacosEventSubscriber for RedoTaskReconnectEventSubscriber { type EventType = ClientInitCompleteEvent; fn on_event(&self, _: &Self::EventType) { - let _redo_task_disconnect_event_subscriber_span = warn_span!( + let _redo_task_reconnect_event_subscriber_span = warn_span!( parent: None, - "redo_task_disconnect_event_subscriber", + "redo_task_reconnect_event_subscriber", client_id = self.scope ) .entered(); From bd0b678b40c80651c1ccbf6e6160e36e059ce275 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AF=9B=E6=96=87=E8=B6=85?= Date: Sun, 16 Apr 2023 19:04:24 +0800 Subject: [PATCH 2/5] chore: upgrade version to 0.2.5 (#141) --- CHANGELOG.md | 8 ++++++++ Cargo.toml | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 06c0541..5f8bb1c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # 变更日志 | Change log +### 0.2.5 + +- 优化重连机制 + +--- + +- Enhance: optimize reconnect logic + ### 0.2.4 - 清理无用代码 diff --git a/Cargo.toml b/Cargo.toml index 4a3ae57..e1e5bc9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ [package] name = "nacos-sdk" -version = "0.2.4" +version = "0.2.5" edition = "2021" authors = ["nacos-group", "CheirshCai <785427346@qq.com>", "onewe <2583021406@qq.com>"] license = "Apache-2.0" From 79f53624a680256f3632c0f49d123b82e4645bca Mon Sep 17 00:00:00 2001 From: gy0801151351 Date: Wed, 19 Apr 2023 13:46:07 +0800 Subject: [PATCH 3/5] =?UTF-8?q?Fix:=20=E4=BF=AE=E5=A4=8D=E5=8D=95=E7=BA=BF?= =?UTF-8?q?=E7=A8=8Btokio=E4=B8=8BHttpLoginAuthPlugin=E7=9A=84login?= =?UTF-8?q?=E5=8F=91=E7=94=9F=E6=AD=BB=E9=94=81=E7=9A=84=E9=97=AE=E9=A2=98?= =?UTF-8?q?=20(#145)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 修复单线程tokio下HttpLoginAuthPlugin的login发生死锁的问题 --------- Co-authored-by: gongyan --- .gitignore | 3 ++- src/api/plugin/auth/auth_by_http.rs | 20 +++++++++++++------- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/.gitignore b/.gitignore index 10bb499..540bbb8 100644 --- a/.gitignore +++ b/.gitignore @@ -16,4 +16,5 @@ Cargo.lock # Customize ### google.protobuf.rs build by prost-build, exclude it because no content. **/google.protobuf.rs -.DS_Store \ No newline at end of file +.DS_Store +.history \ No newline at end of file diff --git a/src/api/plugin/auth/auth_by_http.rs b/src/api/plugin/auth/auth_by_http.rs index 9597639..f341c92 100644 --- a/src/api/plugin/auth/auth_by_http.rs +++ b/src/api/plugin/auth/auth_by_http.rs @@ -45,8 +45,8 @@ impl AuthPlugin for HttpLoginAuthPlugin { return; } - let username = auth_context.params.get(USERNAME).unwrap(); - let password = auth_context.params.get(PASSWORD).unwrap(); + let username = auth_context.params.get(USERNAME).unwrap().to_owned(); + let password = auth_context.params.get(PASSWORD).unwrap().to_owned(); let server_addr = { let mutex = self.server_list.read().unwrap(); @@ -66,7 +66,8 @@ impl AuthPlugin for HttpLoginAuthPlugin { tracing::debug!("Http login with username={username},password={password}"); - let future = async { + let (sender, receiver) = std::sync::mpsc::channel::>(); + let future = async move { let resp = reqwest::Client::new() .post(login_url) .query(&[(USERNAME, username), (PASSWORD, password)]) @@ -75,19 +76,24 @@ impl AuthPlugin for HttpLoginAuthPlugin { tracing::debug!("Http login resp={resp:?}"); if resp.is_err() { - return None; + sender.send(None).expect("send response failed"); + return; } let resp_text = resp.unwrap().text().await.unwrap(); let resp_obj = serde_json::from_str::(&resp_text); if resp_obj.is_err() { - return None; + sender.send(None).expect("send response failed"); + return; } - Some(resp_obj.unwrap()) + sender + .send(Some(resp_obj.unwrap())) + .expect("send response failed"); }; - let login_response = futures::executor::block_on(future); + crate::common::executor::spawn(future); + let login_response = receiver.recv().expect("receive response failed"); if let Some(login_response) = login_response { let delay_sec = login_response.token_ttl / 10; From 6f129a9c43bd1609a435f8644af362dbb08b662f Mon Sep 17 00:00:00 2001 From: gy0801151351 Date: Thu, 27 Apr 2023 20:19:54 +0800 Subject: [PATCH 4/5] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=9C=A8ServiceInfoUpdat?= =?UTF-8?q?eTask=E4=B8=AD=E5=8F=91=E9=80=81=E7=9A=84=E8=AF=B7=E6=B1=82?= =?UTF-8?q?=E4=B8=A2=E5=A4=B1auth=E7=9B=B8=E5=85=B3header=E7=9A=84?= =?UTF-8?q?=E9=97=AE=E9=A2=98=20(#151)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 修复在ServiceInfoUpdateTask中发送的请求丢失auth相关header的问题 --------- Co-authored-by: gongyan --- src/naming/mod.rs | 18 ++++++++++++++++-- src/naming/updater/mod.rs | 25 ++++++++++++++++++++----- 2 files changed, 36 insertions(+), 7 deletions(-) diff --git a/src/naming/mod.rs b/src/naming/mod.rs index b97dd68..6cb5194 100644 --- a/src/naming/mod.rs +++ b/src/naming/mod.rs @@ -161,6 +161,7 @@ impl NacosNamingService { nacos_grpc_client.clone(), namespace.clone(), client_id.clone(), + auth_plugin.clone(), )); Ok(NacosNamingService { @@ -826,7 +827,10 @@ pub(crate) mod tests { use tracing::{info, metadata::LevelFilter}; - use crate::api::{naming::NamingChangeEvent, plugin::NoopAuthPlugin}; + use crate::api::{ + naming::NamingChangeEvent, + plugin::{HttpLoginAuthPlugin, NoopAuthPlugin}, + }; use super::*; @@ -1244,7 +1248,17 @@ pub(crate) mod tests { metadata.insert("netType".to_string(), "external".to_string()); metadata.insert("version".to_string(), "2.0".to_string()); - let naming_service = NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default()))?; + let http_auth_plugin = HttpLoginAuthPlugin::default(); + http_auth_plugin.set_server_list(vec!["127.0.0.1:8848".to_string()]); + + let auth_context = AuthContext::default() + .add_param(crate::api::plugin::USERNAME, "test") + .add_param(crate::api::plugin::PASSWORD, "123456"); + + http_auth_plugin.login(auth_context.clone()); + // let login_identity_1 = http_auth_plugin.get_login_identity(); + + let naming_service = NacosNamingService::new(props, Arc::new(http_auth_plugin))?; let service_instance1 = ServiceInstance { ip: "127.0.0.1".to_string(), port: 9090, diff --git a/src/naming/updater/mod.rs b/src/naming/updater/mod.rs index 5560845..a3978a2 100644 --- a/src/naming/updater/mod.rs +++ b/src/naming/updater/mod.rs @@ -10,11 +10,17 @@ use std::{ use tokio::{sync::Mutex, time::sleep}; use tracing::{debug, error, info, instrument, warn, Instrument}; -use crate::common::{ - executor, - remote::{ - generate_request_id, - grpc::{message::GrpcResponseMessage, NacosGrpcClient}, +use crate::{ + api::plugin::AuthPlugin, + common::{ + executor, + remote::{ + generate_request_id, + grpc::{ + message::{GrpcRequestMessage, GrpcResponseMessage}, + NacosGrpcClient, + }, + }, }, }; @@ -30,6 +36,7 @@ pub(crate) struct ServiceInfoUpdater { namespace: String, task_map: Mutex>, client_id: String, + auth_plugin: Arc, } impl ServiceInfoUpdater { @@ -38,6 +45,7 @@ impl ServiceInfoUpdater { nacos_grpc_client: Arc, namespace: String, client_id: String, + auth_plugin: Arc, ) -> Self { Self { service_info_holder, @@ -45,6 +53,7 @@ impl ServiceInfoUpdater { namespace, task_map: Mutex::new(HashMap::default()), client_id, + auth_plugin, } } @@ -67,6 +76,7 @@ impl ServiceInfoUpdater { cluster, self.service_info_holder.clone(), self.nacos_grpc_client.clone(), + self.auth_plugin.clone(), ); update_task.start(); @@ -103,6 +113,7 @@ struct ServiceInfoUpdateTask { cluster: String, service_info_holder: Arc, nacos_grpc_client: Arc, + auth_plugin: Arc, } impl ServiceInfoUpdateTask { @@ -117,6 +128,7 @@ impl ServiceInfoUpdateTask { cluster: String, service_info_holder: Arc, nacos_grpc_client: Arc, + auth_plugin: Arc, ) -> Self { Self { running: Arc::new(AtomicBool::new(false)), @@ -126,6 +138,7 @@ impl ServiceInfoUpdateTask { cluster, service_info_holder, nacos_grpc_client, + auth_plugin, } } @@ -143,6 +156,7 @@ impl ServiceInfoUpdateTask { let service_info_holder = self.service_info_holder.clone(); let grpc_client = self.nacos_grpc_client.clone(); + let auth_plugin = self.auth_plugin.clone(); executor::spawn(async move { let mut delay_time = ServiceInfoUpdateTask::DEFAULT_DELAY; @@ -211,6 +225,7 @@ impl ServiceInfoUpdateTask { let mut request = request.clone(); request.request_id = Some(generate_request_id()); + request.add_headers(auth_plugin.get_login_identity().contexts); let ret = grpc_client .unary_call_async::(request) From f6b72df2759c827945cb35ef44d847efcf56f800 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AF=9B=E6=96=87=E8=B6=85?= Date: Wed, 3 May 2023 10:20:03 +0800 Subject: [PATCH 5/5] chore: upgrade version to 0.2.6 (#155) --- CHANGELOG.md | 8 ++++++++ Cargo.toml | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f8bb1c..d6c14b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # 变更日志 | Change log +### 0.2.6 + +- 修复 `ServiceInfoUpdateTask` 丢失 auth header + +--- + +- fix lose auth headers in ServiceInfoUpdateTask + ### 0.2.5 - 优化重连机制 diff --git a/Cargo.toml b/Cargo.toml index e1e5bc9..5320870 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ [package] name = "nacos-sdk" -version = "0.2.5" +version = "0.2.6" edition = "2021" authors = ["nacos-group", "CheirshCai <785427346@qq.com>", "onewe <2583021406@qq.com>"] license = "Apache-2.0"