diff --git a/Cargo.lock b/Cargo.lock index 58060d79..f33d1ea6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -545,7 +545,7 @@ dependencies = [ [[package]] name = "dragonfly-client" -version = "0.1.9" +version = "0.1.11" dependencies = [ "anyhow", "async-stream", diff --git a/Cargo.toml b/Cargo.toml index 04e5f6ae..1f0cdb64 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dragonfly-client" -version = "0.1.10" +version = "0.1.11" authors = ["The Dragonfly Developers"] homepage = "https://d7y.io/" repository = "https://github.com/dragonflyoss/client.git" diff --git a/src/grpc/scheduler.rs b/src/grpc/scheduler.rs index 3d0174b6..2615b988 100644 --- a/src/grpc/scheduler.rs +++ b/src/grpc/scheduler.rs @@ -18,6 +18,7 @@ use crate::dynconfig::Dynconfig; use crate::{Error, Result}; use dragonfly_api::common::v2::{Peer, Task}; +use dragonfly_api::manager::v2::Scheduler; use dragonfly_api::scheduler::v2::{ scheduler_client::SchedulerClient as SchedulerGRPCClient, AnnounceHostRequest, AnnouncePeerRequest, AnnouncePeerResponse, ExchangePeerRequest, ExchangePeerResponse, @@ -49,8 +50,11 @@ pub struct SchedulerClient { // dynconfig is the dynamic configuration of the dfdaemon. dynconfig: Arc, - // available_schedulers is the endpoints of available schedulers. - available_schedulers: Arc>>, + // available_schedulers is the available schedulers. + available_schedulers: Arc>>, + + // available_scheduler_addrs is the addresses of available schedulers. + available_scheduler_addrs: Arc>>, // hashring is the hashring of the scheduler. hashring: Arc>>, @@ -63,10 +67,11 @@ impl SchedulerClient { let client = Self { dynconfig, available_schedulers: Arc::new(RwLock::new(Vec::new())), + available_scheduler_addrs: Arc::new(RwLock::new(Vec::new())), hashring: Arc::new(RwLock::new(HashRing::new())), }; - client.refresh_scheduler_client().await?; + client.refresh_available_scheduler_addrs().await?; Ok(client) } @@ -140,8 +145,8 @@ impl SchedulerClient { #[instrument(skip(self))] pub async fn init_announce_host(&self, request: AnnounceHostRequest) -> Result<()> { let mut join_set = JoinSet::new(); - let available_schedulers = self.available_schedulers.read().await; - for available_scheduler in available_schedulers.iter() { + let available_scheduler_addrs = self.available_scheduler_addrs.read().await; + for available_scheduler_addr in available_scheduler_addrs.iter() { let request = Self::make_request(request.clone()); async fn announce_host( addr: SocketAddr, @@ -159,7 +164,7 @@ impl SchedulerClient { Ok(()) } - join_set.spawn(announce_host(*available_scheduler, request)); + join_set.spawn(announce_host(*available_scheduler_addr, request)); } while let Some(message) = join_set.join_next().await { @@ -175,9 +180,13 @@ impl SchedulerClient { // announce_host announces the host to the scheduler. #[instrument(skip(self))] pub async fn announce_host(&self, request: AnnounceHostRequest) -> Result<()> { + // Update scheduler addresses of the client. + self.update_available_scheduler_addrs().await?; + + // Announce the host to the scheduler. let mut join_set = JoinSet::new(); - let available_schedulers = self.available_schedulers.read().await; - for available_scheduler in available_schedulers.iter() { + let available_scheduler_addrs = self.available_scheduler_addrs.read().await; + for available_scheduler_addr in available_scheduler_addrs.iter() { let request = Self::make_request(request.clone()); async fn announce_host( addr: SocketAddr, @@ -195,7 +204,7 @@ impl SchedulerClient { Ok(()) } - join_set.spawn(announce_host(*available_scheduler, request)); + join_set.spawn(announce_host(*available_scheduler_addr, request)); } while let Some(message) = join_set.join_next().await { @@ -210,9 +219,13 @@ impl SchedulerClient { // leave_host tells the scheduler that the host is leaving. #[instrument(skip(self))] pub async fn leave_host(&self, request: LeaveHostRequest) -> Result<()> { + // Update scheduler addresses of the client. + self.update_available_scheduler_addrs().await?; + + // Leave the host from the scheduler. let mut join_set = JoinSet::new(); - let available_schedulers = self.available_schedulers.read().await; - for available_scheduler in available_schedulers.iter() { + let available_scheduler_addrs = self.available_scheduler_addrs.read().await; + for available_scheduler_addr in available_scheduler_addrs.iter() { let request = Self::make_request(request.clone()); async fn leave_host( addr: SocketAddr, @@ -230,7 +243,7 @@ impl SchedulerClient { Ok(()) } - join_set.spawn(leave_host(*available_scheduler, request)); + join_set.spawn(leave_host(*available_scheduler_addr, request)); } while let Some(message) = join_set.join_next().await { @@ -245,6 +258,10 @@ impl SchedulerClient { // client gets the grpc client of the scheduler. #[instrument(skip(self))] async fn client(&self, key: String) -> Result> { + // Update scheduler addresses of the client. + self.update_available_scheduler_addrs().await?; + + // Get the scheduler address from the hashring. let addr = self.hashring.read().await; let addr = addr.get(&key).ok_or_else(|| Error::HashRing(key.clone()))?; info!("{} picked {:?}", key, addr); @@ -258,7 +275,7 @@ impl SchedulerClient { Ok(channel) => channel, Err(err) => { error!("failed to connect to {:?}: {}", addr, err); - if let Err(err) = self.refresh_scheduler_client().await { + if let Err(err) = self.refresh_available_scheduler_addrs().await { error!("failed to refresh scheduler client: {}", err); }; @@ -269,12 +286,9 @@ impl SchedulerClient { Ok(SchedulerGRPCClient::new(channel)) } - // get_endpoints gets the endpoints of available schedulers. + // update_available_scheduler_addrs updates the addresses of available schedulers. #[instrument(skip(self))] - async fn refresh_scheduler_client(&self) -> Result<()> { - // Refresh the dynamic configuration. - self.dynconfig.refresh().await?; - + async fn update_available_scheduler_addrs(&self) -> Result<()> { // Get the endpoints of available schedulers. let data = self.dynconfig.data.read().await; @@ -283,9 +297,27 @@ impl SchedulerClient { return Err(Error::AvailableSchedulersNotFound()); } + // Get the available schedulers. + let available_schedulers = self.available_schedulers.read().await; + + // Check if the available schedulers is not changed. + if data.available_schedulers.len() == available_schedulers.len() + && data + .available_schedulers + .iter() + .all(|available_scheduler| available_schedulers.contains(available_scheduler)) + { + info!("available schedulers is not changed"); + return Ok(()); + } + drop(available_schedulers); + // Get the available schedulers. let mut available_schedulers = self.available_schedulers.write().await; + // Get the addresses of available schedulers. + let mut available_scheduler_addrs = self.available_scheduler_addrs.write().await; + // Refresh the hashring. let mut new_hashring = HashRing::new(); for available_scheduler in data.available_schedulers.iter() { @@ -298,7 +330,10 @@ impl SchedulerClient { }; // Add the scheduler to the available schedulers. - available_schedulers.push(SocketAddr::new(ip, available_scheduler.port as u16)); + available_schedulers.push(available_scheduler.clone()); + + // Add the scheduler address to the addresses of available schedulers. + available_scheduler_addrs.push(SocketAddr::new(ip, available_scheduler.port as u16)); // Add the scheduler to the hashring. new_hashring.add(VNode { @@ -309,10 +344,23 @@ impl SchedulerClient { // Update the hashring. let mut hashring = self.hashring.write().await; *hashring = new_hashring; - info!("refresh available schedulers: {:?}", available_schedulers); + info!( + "refresh available scheduler addresses: {:?}", + available_scheduler_addrs + ); Ok(()) } + // refresh_available_scheduler_addrs refreshes addresses of available schedulers. + #[instrument(skip(self))] + async fn refresh_available_scheduler_addrs(&self) -> Result<()> { + // Refresh the dynamic configuration. + self.dynconfig.refresh().await?; + + // Update scheduler addresses of the client. + self.update_available_scheduler_addrs().await + } + // make_request creates a new request with timeout. fn make_request(request: T) -> tonic::Request { let mut request = tonic::Request::new(request);