From e2a907a05c61b0b62db678c88e2833c97d44d7a8 Mon Sep 17 00:00:00 2001 From: Gaius Date: Thu, 30 May 2024 15:05:28 +0800 Subject: [PATCH] feat: optimize mutex in dynconfig and add keepalive to hyper proxy (#513) Signed-off-by: Gaius --- Cargo.lock | 16 ++-- Cargo.toml | 16 ++-- .../src/grpc/dfdaemon_download.rs | 1 - dragonfly-client/src/grpc/scheduler.rs | 78 +++++++++++-------- dragonfly-client/src/proxy/mod.rs | 3 +- dragonfly-client/src/task/mod.rs | 12 ++- 6 files changed, 73 insertions(+), 53 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 57603145..f6072cae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -959,7 +959,7 @@ dependencies = [ [[package]] name = "dragonfly-client" -version = "0.1.72" +version = "0.1.73" dependencies = [ "anyhow", "bytes", @@ -1023,7 +1023,7 @@ dependencies = [ [[package]] name = "dragonfly-client-backend" -version = "0.1.72" +version = "0.1.73" dependencies = [ "dragonfly-api", "dragonfly-client-core", @@ -1042,7 +1042,7 @@ dependencies = [ [[package]] name = "dragonfly-client-config" -version = "0.1.72" +version = "0.1.73" dependencies = [ "dragonfly-client-core", "home", @@ -1061,7 +1061,7 @@ dependencies = [ [[package]] name = "dragonfly-client-core" -version = "0.1.72" +version = "0.1.73" dependencies = [ "libloading", "reqwest", @@ -1072,7 +1072,7 @@ dependencies = [ [[package]] name = "dragonfly-client-init" -version = "0.1.72" +version = "0.1.73" dependencies = [ "anyhow", "clap", @@ -1088,7 +1088,7 @@ dependencies = [ [[package]] name = "dragonfly-client-storage" -version = "0.1.72" +version = "0.1.73" dependencies = [ "base16ct", "blake3", @@ -1112,7 +1112,7 @@ dependencies = [ [[package]] name = "dragonfly-client-util" -version = "0.1.72" +version = "0.1.73" dependencies = [ "dragonfly-api", "dragonfly-client-core", @@ -1570,7 +1570,7 @@ dependencies = [ [[package]] name = "hdfs" -version = "0.1.72" +version = "0.1.73" dependencies = [ "dragonfly-client-backend", "dragonfly-client-core", diff --git a/Cargo.toml b/Cargo.toml index d1598c76..c5aadd88 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ members = [ ] [workspace.package] -version = "0.1.72" +version = "0.1.73" authors = ["The Dragonfly Developers"] homepage = "https://d7y.io/" repository = "https://github.com/dragonflyoss/client.git" @@ -22,13 +22,13 @@ readme = "README.md" edition = "2021" [workspace.dependencies] -dragonfly-client = { path = "dragonfly-client", version = "0.1.72" } -dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.72" } -dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.72" } -dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.72" } -dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.72" } -dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.72" } -dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.72" } +dragonfly-client = { path = "dragonfly-client", version = "0.1.73" } +dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.73" } +dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.73" } +dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.73" } +dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.73" } +dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.73" } +dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.73" } thiserror = "1.0" dragonfly-api = "2.0.114" reqwest = { version = "0.11.27", features = ["stream", "native-tls", "default-tls", "rustls-tls"] } diff --git a/dragonfly-client/src/grpc/dfdaemon_download.rs b/dragonfly-client/src/grpc/dfdaemon_download.rs index 71398022..37e5dd14 100644 --- a/dragonfly-client/src/grpc/dfdaemon_download.rs +++ b/dragonfly-client/src/grpc/dfdaemon_download.rs @@ -275,7 +275,6 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler { error!("missing content length in the response"); return Err(Status::internal("missing content length in the response")); }; - info!("content length: {}", content_length); // Download's range priority is higher than the request header's range. diff --git a/dragonfly-client/src/grpc/scheduler.rs b/dragonfly-client/src/grpc/scheduler.rs index d4af0d38..5ae5bfc0 100644 --- a/dragonfly-client/src/grpc/scheduler.rs +++ b/dragonfly-client/src/grpc/scheduler.rs @@ -85,10 +85,11 @@ impl SchedulerClient { pub async fn announce_peer( &self, task_id: &str, + peer_id: &str, request: impl tonic::IntoStreamingRequest, ) -> Result>> { let response = self - .client(task_id.to_string()) + .client(task_id, Some(peer_id)) .await? .announce_peer(request) .await?; @@ -99,11 +100,7 @@ impl SchedulerClient { #[instrument(skip(self))] pub async fn stat_peer(&self, task_id: &str, request: StatPeerRequest) -> Result { let request = Self::make_request(request); - let response = self - .client(task_id.to_string()) - .await? - .stat_peer(request) - .await?; + let response = self.client(task_id, None).await?.stat_peer(request).await?; Ok(response.into_inner()) } @@ -111,7 +108,7 @@ impl SchedulerClient { #[instrument(skip(self))] pub async fn leave_peer(&self, task_id: &str, request: LeavePeerRequest) -> Result<()> { let request = Self::make_request(request); - self.client(task_id.to_string()) + self.client(task_id, None) .await? .leave_peer(request) .await?; @@ -127,7 +124,7 @@ impl SchedulerClient { ) -> Result { let request = Self::make_request(request); let response = self - .client(task_id.to_string()) + .client(task_id, None) .await? .exchange_peer(request) .await?; @@ -138,11 +135,7 @@ impl SchedulerClient { #[instrument(skip(self))] pub async fn stat_task(&self, task_id: &str, request: StatTaskRequest) -> Result { let request = Self::make_request(request); - let response = self - .client(task_id.to_string()) - .await? - .stat_task(request) - .await?; + let response = self.client(task_id, None).await?.stat_task(request).await?; Ok(response.into_inner()) } @@ -150,7 +143,7 @@ impl SchedulerClient { #[instrument(skip(self))] pub async fn leave_task(&self, task_id: &str, request: LeaveTaskRequest) -> Result<()> { let request = Self::make_request(request); - self.client(task_id.to_string()) + self.client(task_id, None) .await? .leave_task(request) .await?; @@ -162,7 +155,10 @@ impl SchedulerClient { pub async fn init_announce_host(&self, request: AnnounceHostRequest) -> Result<()> { let mut join_set = JoinSet::new(); let available_scheduler_addrs = self.available_scheduler_addrs.read().await; - for available_scheduler_addr in available_scheduler_addrs.iter() { + let available_scheduler_addrs_clone = available_scheduler_addrs.clone(); + drop(available_scheduler_addrs); + + for available_scheduler_addr in available_scheduler_addrs_clone.iter() { let request = Self::make_request(request.clone()); async fn announce_host( addr: SocketAddr, @@ -214,7 +210,10 @@ impl SchedulerClient { // Announce the host to the scheduler. let mut join_set = JoinSet::new(); let available_scheduler_addrs = self.available_scheduler_addrs.read().await; - for available_scheduler_addr in available_scheduler_addrs.iter() { + let available_scheduler_addrs_clone = available_scheduler_addrs.clone(); + drop(available_scheduler_addrs); + + for available_scheduler_addr in available_scheduler_addrs_clone.iter() { let request = Self::make_request(request.clone()); async fn announce_host( addr: SocketAddr, @@ -265,7 +264,10 @@ impl SchedulerClient { // Leave the host from the scheduler. let mut join_set = JoinSet::new(); let available_scheduler_addrs = self.available_scheduler_addrs.read().await; - for available_scheduler_addr in available_scheduler_addrs.iter() { + let available_scheduler_addrs_clone = available_scheduler_addrs.clone(); + drop(available_scheduler_addrs); + + for available_scheduler_addr in available_scheduler_addrs_clone.iter() { let request = Self::make_request(request.clone()); async fn leave_host( addr: SocketAddr, @@ -309,16 +311,21 @@ impl SchedulerClient { // client gets the grpc client of the scheduler. #[instrument(skip(self))] - async fn client(&self, key: String) -> Result> { + async fn client( + &self, + task_id: &str, + peer_id: Option<&str>, + ) -> Result> { // Update scheduler addresses of the client. self.update_available_scheduler_addrs().await?; // Get the scheduler address from the hashring. - let addr = self.hashring.read().await; - let addr = addr - .get(&key[0..5].to_string()) - .ok_or_else(|| Error::HashRing(key.clone()))?; - info!("{} picked {:?}", key, addr); + let addrs = self.hashring.read().await; + let addr = *addrs + .get(&task_id[0..5].to_string()) + .ok_or_else(|| Error::HashRing(task_id.to_string()))?; + drop(addrs); + info!("picked {:?}", addr); let channel = match Channel::from_shared(format!("http://{}", addr)) .map_err(|_| Error::InvalidURI(addr.to_string()))? @@ -347,38 +354,40 @@ impl SchedulerClient { async fn update_available_scheduler_addrs(&self) -> Result<()> { // Get the endpoints of available schedulers. let data = self.dynconfig.data.read().await; + let data_available_schedulers_clone = data.available_schedulers.clone(); + drop(data); // Check if the available schedulers is empty. - if data.available_schedulers.is_empty() { + if data_available_schedulers_clone.is_empty() { return Err(Error::AvailableSchedulersNotFound); } // Get the available schedulers. let available_schedulers = self.available_schedulers.read().await; + let available_schedulers_clone = available_schedulers.clone(); + drop(available_schedulers); // Check if the available schedulers is not changed. - if data.available_schedulers.len() == available_schedulers.len() - && data - .available_schedulers + if data_available_schedulers_clone.len() == available_schedulers_clone.len() + && data_available_schedulers_clone .iter() - .zip(available_schedulers.iter()) + .zip(available_schedulers_clone.iter()) .all(|(a, b)| a == b) { info!( "available schedulers is not changed: {:?}", - data.available_schedulers + data_available_schedulers_clone .iter() .map(|s| s.ip.clone()) .collect::>() ); return Ok(()); } - drop(available_schedulers); let mut new_available_schedulers = Vec::new(); let mut new_available_scheduler_addrs = Vec::new(); let mut new_hashring = HashRing::new(); - for available_scheduler in data.available_schedulers.iter() { + for available_scheduler in data_available_schedulers_clone.iter() { let ip = match IpAddr::from_str(&available_scheduler.ip) { Ok(ip) => ip, Err(err) => { @@ -403,20 +412,25 @@ impl SchedulerClient { // Update the available schedulers. let mut available_schedulers = self.available_schedulers.write().await; *available_schedulers = new_available_schedulers; + drop(available_schedulers); // Update the addresses of available schedulers. let mut available_scheduler_addrs = self.available_scheduler_addrs.write().await; *available_scheduler_addrs = new_available_scheduler_addrs; + drop(available_scheduler_addrs); // Update the hashring. let mut hashring = self.hashring.write().await; *hashring = new_hashring; + drop(hashring); + + let available_scheduler_addrs = self.available_scheduler_addrs.read().await; info!( "refresh available scheduler addresses: {:?}", available_scheduler_addrs .iter() .map(|s| s.ip().to_string()) - .collect::>() + .collect::>(), ); Ok(()) } diff --git a/dragonfly-client/src/proxy/mod.rs b/dragonfly-client/src/proxy/mod.rs index 0eb8b7d4..2c9ea905 100644 --- a/dragonfly-client/src/proxy/mod.rs +++ b/dragonfly-client/src/proxy/mod.rs @@ -175,6 +175,7 @@ impl Proxy { let server_ca_cert = self.server_ca_cert.clone(); tokio::task::spawn(async move { if let Err(err) = http1::Builder::new() + .keep_alive(true) .preserve_header_case(true) .title_case_headers(true) .serve_connection( @@ -184,7 +185,7 @@ impl Proxy { .with_upgrades() .await { - error!("failed to serve connection: {}", err); + error!("failed to serve connection from {}: {}", remote_address, err); } }); } diff --git a/dragonfly-client/src/task/mod.rs b/dragonfly-client/src/task/mod.rs index 555b2b37..7f271f2f 100644 --- a/dragonfly-client/src/task/mod.rs +++ b/dragonfly-client/src/task/mod.rs @@ -418,7 +418,7 @@ impl Task { let mut finished_pieces: Vec = Vec::new(); // Initialize stream channel. - let (in_stream_tx, in_stream_rx) = mpsc::channel(1024); + let (in_stream_tx, in_stream_rx) = mpsc::channel(4096); // Send the register peer request. in_stream_tx @@ -444,10 +444,16 @@ impl Task { // Initialize the stream. let in_stream = ReceiverStream::new(in_stream_rx); + let request = Request::new(in_stream); let response = self .scheduler_client - .announce_peer(task.id.as_str(), Request::new(in_stream)) - .await?; + .announce_peer(task.id.as_str(), peer_id, request) + .await + .map_err(|err| { + error!("announce peer failed: {:?}", err); + err + })?; + info!("announced peer has been connected"); let out_stream = response .into_inner()