diff --git a/Cargo.lock b/Cargo.lock
index f4663ac5..913691ba 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -807,6 +807,12 @@ dependencies = [
  "powerfmt",
 ]
 
+[[package]]
+name = "difflib"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8"
+
 [[package]]
 name = "digest"
 version = "0.10.7"
@@ -839,6 +845,12 @@ dependencies = [
  "const-random",
 ]
 
+[[package]]
+name = "downcast"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1"
+
 [[package]]
 name = "dragonfly-api"
 version = "2.0.162"
@@ -886,6 +898,7 @@ dependencies = [
  "lazy_static",
  "leaky-bucket",
  "libc",
+ "mockall",
  "openssl",
  "opentelemetry",
  "opentelemetry-jaeger",
@@ -1137,6 +1150,15 @@ dependencies = [
  "miniz_oxide",
 ]
 
+[[package]]
+name = "float-cmp"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "98de4bbd547a563b716d8dfa9aad1cb19bfab00f4fa09a6a4ed21dbcf44ce9c4"
+dependencies = [
+ "num-traits",
+]
+
 [[package]]
 name = "fnv"
 version = "1.0.7"
@@ -1167,6 +1189,12 @@ dependencies = [
  "percent-encoding",
 ]
 
+[[package]]
+name = "fragile"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
+
 [[package]]
 name = "fs2"
 version = "0.4.3"
@@ -2113,6 +2141,33 @@ dependencies = [
  "windows-sys 0.52.0",
 ]
 
+[[package]]
+name = "mockall"
+version = "0.11.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4c84490118f2ee2d74570d114f3d0493cbf02790df303d2707606c3e14e07c96"
+dependencies = [
+ "cfg-if",
+ "downcast",
+ "fragile",
+ "lazy_static",
+ "mockall_derive",
+ "predicates",
+ "predicates-tree",
+]
+
+[[package]]
+name = "mockall_derive"
+version = "0.11.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "22ce75669015c4f47b289fd4d4f56e894e4c96003ffdf3ac51313126f94c6cbb"
+dependencies = [
+ "cfg-if",
+ "proc-macro2",
+ "quote",
+ "syn 1.0.109",
+]
+
 [[package]]
 name = "multer"
 version = "2.1.0"
@@ -2201,6 +2256,12 @@ dependencies = [
  "minimal-lexical",
 ]
 
+[[package]]
+name = "normalize-line-endings"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be"
+
 [[package]]
 name = "ntapi"
 version = "0.4.1"
@@ -2792,6 +2853,36 @@ version = "0.2.17"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
 
+[[package]]
+name = "predicates"
+version = "2.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "59230a63c37f3e18569bdb90e4a89cbf5bf8b06fea0b84e65ea10cc4df47addd"
+dependencies = [
+ "difflib",
+ "float-cmp",
+ "itertools 0.10.5",
+ "normalize-line-endings",
+ "predicates-core",
+ "regex",
+]
+
+[[package]]
+name = "predicates-core"
+version = "1.0.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ae8177bee8e75d6846599c6b9ff679ed51e882816914eec639944d7c9aa11931"
+
+[[package]]
+name = "predicates-tree"
+version = "1.0.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "41b740d195ed3166cd147c8047ec98db0e22ec019eb8eeb76d343b795304fb13"
+dependencies = [
+ "predicates-core",
+ "termtree",
+]
+
 [[package]]
 name = "prettyplease"
version = "0.2.17" @@ -3969,6 +4060,12 @@ dependencies = [ "redox_termios", ] +[[package]] +name = "termtree" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" + [[package]] name = "thiserror" version = "1.0.63" diff --git a/dragonfly-client-config/src/dfdaemon.rs b/dragonfly-client-config/src/dfdaemon.rs index a11e2daf..d176e995 100644 --- a/dragonfly-client-config/src/dfdaemon.rs +++ b/dragonfly-client-config/src/dfdaemon.rs @@ -135,6 +135,29 @@ fn default_scheduler_schedule_timeout() -> Duration { Duration::from_secs(10) } +/// default_scheduler_cooldown_interval is the default cooldown time (in seconds) for the scheduler. +/// This duration specifies the time period during which the scheduler will not be accessed +/// after it becomes unavailable due to an exception. +#[inline] +fn default_scheduler_cooldown_interval() -> Duration { + Duration::from_secs(60) +} + +/// default_scheduler_max_attempts is the default maximum number of attempts to connect to the scheduler. +/// If the connection to the scheduler fails, dfdaemon will retry up to this number of times. +#[inline] +fn default_scheduler_max_attempts() -> u32 { + 3 +} + +/// default_scheduler_refresh_threshold is the default threshold for refreshing available scheduler nodes. +/// When the number of attempts to connect to the scheduler reaches this value, +/// dfdaemon will refresh the available scheduler nodes. +#[inline] +fn default_scheduler_refresh_threshold() -> u32 { + 10 +} + /// default_dynconfig_refresh_interval is the default interval to refresh dynamic configuration from manager. #[inline] fn default_dynconfig_refresh_interval() -> Duration { @@ -492,6 +515,28 @@ pub struct Scheduler { #[serde(default = "default_download_max_schedule_count")] #[validate(range(min = 1))] pub max_schedule_count: u32, + + // cooldown_duration is the scheduler's cooldown time (in seconds). + // When the scheduler becomes unavailable due to an exception, this field specifies + // the time period during which the scheduler will not be accessed. + #[serde( + default = "default_scheduler_cooldown_interval", + with = "humantime_serde" + )] + pub cooldown_interval: Duration, + + // max_attempts is the maximum number of attempts to connect to the scheduler. + // If the connection to the scheduler fails, dfdaemon will retry up to this number of times. + #[serde(default = "default_scheduler_max_attempts")] + #[validate(range(min = 1))] + pub max_attempts: u32, + + // refresh_threshold is the threshold for refreshing available scheduler nodes. + // When the number of attempts to connect to the scheduler reaches this value, + // dfdaemon will refresh the available scheduler nodes. + #[serde(default = "default_scheduler_refresh_threshold")] + #[validate(range(min = 1))] + pub refresh_threshold: u32, } /// Scheduler implements Default. 
@@ -501,6 +546,9 @@ impl Default for Scheduler {
             announce_interval: default_scheduler_announce_interval(),
             schedule_timeout: default_scheduler_schedule_timeout(),
             max_schedule_count: default_download_max_schedule_count(),
+            cooldown_interval: default_scheduler_cooldown_interval(),
+            max_attempts: default_scheduler_max_attempts(),
+            refresh_threshold: default_scheduler_refresh_threshold(),
         }
     }
 }
diff --git a/dragonfly-client-core/src/error/mod.rs b/dragonfly-client-core/src/error/mod.rs
index 65b1f732..7c270f21 100644
--- a/dragonfly-client-core/src/error/mod.rs
+++ b/dragonfly-client-core/src/error/mod.rs
@@ -98,6 +98,14 @@ pub enum DFError {
     #[error("scheduler client not found")]
     SchedulerClientNotFound,
 
+    /// ExceededMaxAttempts is the error when the maximum number of attempts is exceeded.
+    #[error("exceeded maximum number of attempts")]
+    ExceededMaxAttempts,
+
+    /// SchedulerNotServing is the error when the current scheduler is not serving.
+    #[error("current scheduler is not serving")]
+    SchedulerNotServing,
+
     /// UnexpectedResponse is the error when the response is unexpected.
     #[error("unexpected response")]
     UnexpectedResponse,
diff --git a/dragonfly-client/Cargo.toml b/dragonfly-client/Cargo.toml
index 17e797c7..bf6be677 100644
--- a/dragonfly-client/Cargo.toml
+++ b/dragonfly-client/Cargo.toml
@@ -91,6 +91,7 @@ futures-util = "0.3.30"
 termion = "4.0.2"
 tabled = "0.16.0"
 path-absolutize = "3.1.1"
 
 [dev-dependencies]
 tempfile.workspace = true
+mockall = "0.11"
diff --git a/dragonfly-client/src/dynconfig/mod.rs b/dragonfly-client/src/dynconfig/mod.rs
index 8ba716b8..66393af5 100644
--- a/dragonfly-client/src/dynconfig/mod.rs
+++ b/dragonfly-client/src/dynconfig/mod.rs
@@ -110,6 +110,12 @@ impl Dynconfig {
         }
     }
 
+    /// get_config returns the configuration of the dfdaemon.
+    #[instrument(skip_all)]
+    pub async fn get_config(&self) -> Arc<Config> {
+        self.config.clone()
+    }
+
     /// refresh refreshes the dynamic configuration of the dfdaemon.
     #[instrument(skip_all)]
     pub async fn refresh(&self) -> Result<()> {
diff --git a/dragonfly-client/src/grpc/scheduler.rs b/dragonfly-client/src/grpc/scheduler.rs
index 023c24c3..dbb1f1ba 100644
--- a/dragonfly-client/src/grpc/scheduler.rs
+++ b/dragonfly-client/src/grpc/scheduler.rs
@@ -17,6 +17,7 @@
 use crate::dynconfig::Dynconfig;
+use crate::grpc::health::HealthClient;
 use dragonfly_api::common::v2::{Peer, PersistentCachePeer, PersistentCacheTask, Task};
 use dragonfly_api::manager::v2::Scheduler;
 use dragonfly_api::scheduler::v2::{
     scheduler_client::SchedulerClient as SchedulerGRPCClient, AnnounceHostRequest,
@@ -30,12 +31,15 @@ use dragonfly_api::scheduler::v2::{
 use dragonfly_client_core::error::{ErrorType, ExternalError, OrErr};
 use dragonfly_client_core::{Error, Result};
 use hashring::HashRing;
+use std::collections::HashMap;
 use std::net::{IpAddr, SocketAddr};
 use std::str::FromStr;
 use std::sync::Arc;
+use std::time::Instant;
 use tokio::sync::RwLock;
 use tokio::task::JoinSet;
 use tonic::transport::Channel;
+use tonic_health::pb::health_check_response::ServingStatus;
 use tracing::{error, info, instrument, Instrument};
 
 /// VNode is the virtual node of the hashring.
@@ -67,6 +71,9 @@ pub struct SchedulerClient {
 
     /// hashring is the hashring of the scheduler.
     hashring: Arc<RwLock<HashRing<VNode>>>,
+
+    /// unavailable_scheduler_addrs is a map of unavailable scheduler addrs and the time they were marked as unavailable.
+    unavailable_scheduler_addrs: Arc<RwLock<HashMap<SocketAddr, Instant>>>,
 }
 
 /// SchedulerClient implements the grpc client of the scheduler.
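Before the hunks that wire `unavailable_scheduler_addrs` into the connection path, here is a distilled, stand-alone sketch of the bookkeeping contract this new field implements: first failure records a timestamp, repeated failures keep the original timestamp, and an address is skipped while it is still inside the cooldown window. This is illustrative only; method names like `mark_unavailable` are not part of the PR:

```rust
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;

/// Hypothetical stand-alone model of the cooldown bookkeeping added above.
#[derive(Clone, Default)]
struct CooldownTracker {
    unavailable: Arc<RwLock<HashMap<SocketAddr, Instant>>>,
}

impl CooldownTracker {
    /// Record the first failure time; later failures keep the original timestamp,
    /// mirroring `entry(addr).or_insert_with(Instant::now)` in the client.
    async fn mark_unavailable(&self, addr: SocketAddr) {
        self.unavailable
            .write()
            .await
            .entry(addr)
            .or_insert_with(Instant::now);
    }

    /// An address is skipped while it is still inside the cooldown window.
    async fn is_cooling_down(&self, addr: &SocketAddr, cooldown: Duration) -> bool {
        match self.unavailable.read().await.get(addr) {
            Some(instant) => instant.elapsed() < cooldown,
            None => false,
        }
    }

    /// A successful connection (or a refresh of the scheduler list) clears state.
    async fn mark_available(&self, addr: &SocketAddr) {
        self.unavailable.write().await.remove(addr);
    }
}

#[tokio::main]
async fn main() {
    let tracker = CooldownTracker::default();
    let addr: SocketAddr = "127.0.0.1:8002".parse().unwrap();

    tracker.mark_unavailable(addr).await;
    assert!(tracker.is_cooling_down(&addr, Duration::from_secs(60)).await);

    tracker.mark_available(&addr).await;
    assert!(!tracker.is_cooling_down(&addr, Duration::from_secs(60)).await);
}
```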
@@ -79,6 +86,7 @@ impl SchedulerClient {
             available_schedulers: Arc::new(RwLock::new(Vec::new())),
             available_scheduler_addrs: Arc::new(RwLock::new(Vec::new())),
             hashring: Arc::new(RwLock::new(HashRing::new())),
+            unavailable_scheduler_addrs: Arc::new(RwLock::new(HashMap::new())),
         };
 
         client.refresh_available_scheduler_addrs().await?;
@@ -456,15 +464,15 @@ impl SchedulerClient {
         // Update scheduler addresses of the client.
         self.update_available_scheduler_addrs().await?;
 
-        // Get the scheduler address from the hashring.
+        // First, pick a scheduler from the hashring by the task_id.
         let addrs = self.hashring.read().await;
         let addr = *addrs
             .get(&task_id[0..5].to_string())
             .ok_or_else(|| Error::HashRing(task_id.to_string()))?;
         drop(addrs);
-        info!("picked {:?}", addr);
+        info!("first picked {:?}", addr);
 
-        let channel = match Channel::from_shared(format!("http://{}", addr))
+        match Channel::from_shared(format!("http://{}", addr))
             .map_err(|_| Error::InvalidURI(addr.to_string()))?
             .buffer_size(super::BUFFER_SIZE)
             .connect_timeout(super::CONNECT_TIMEOUT)
@@ -475,22 +483,130 @@ impl SchedulerClient {
             .connect()
             .await
         {
-            Ok(channel) => channel,
+            Ok(channel) => {
+                let mut unavailable_scheduler_addrs =
+                    self.unavailable_scheduler_addrs.write().await;
+                unavailable_scheduler_addrs.remove(&addr.addr);
+                drop(unavailable_scheduler_addrs);
+                return Ok(SchedulerGRPCClient::new(channel)
+                    .max_decoding_message_size(usize::MAX)
+                    .max_encoding_message_size(usize::MAX));
+            }
+
+            Err(err) => {
                 error!("connect to {} failed: {}", addr.to_string(), err);
-                if let Err(err) = self.refresh_available_scheduler_addrs().await {
-                    error!("failed to refresh scheduler client: {}", err);
-                };
+                let mut unavailable_scheduler_addrs =
+                    self.unavailable_scheduler_addrs.write().await;
+                unavailable_scheduler_addrs
+                    .entry(addr.addr)
+                    .or_insert_with(Instant::now);
+                drop(unavailable_scheduler_addrs);
+            }
+        }
+
+        // Read the retry and cooldown configuration items.
+        let config = self.dynconfig.get_config().await;
+        let cooldown_interval = config.scheduler.cooldown_interval;
+        let max_attempts = config.scheduler.max_attempts;
+        let refresh_threshold = config.scheduler.refresh_threshold;
+        let mut attempts = 1;
+        let mut refresh_count = 1;
+
+        // Iterate over the available schedulers in the hashring.
+        // Clone the hashring and release the read lock immediately, because
+        // refresh_available_scheduler_addrs below takes the write lock on it.
+        let hashring = self.hashring.read().await.clone();
+        for vnode in hashring.into_iter() {
+            let scheduler_addr = vnode.addr;
+            let unavailable_scheduler_addrs = self.unavailable_scheduler_addrs.read().await;
+            if let Some(&instant) = unavailable_scheduler_addrs.get(&scheduler_addr) {
+                if instant.elapsed() < cooldown_interval {
+                    continue;
+                }
+            }
+            drop(unavailable_scheduler_addrs);
+
+            info!("try to pick {:?}", scheduler_addr);
+            match self.check_scheduler(&scheduler_addr).await {
+                Ok(channel) => {
+                    let mut unavailable_scheduler_addrs =
+                        self.unavailable_scheduler_addrs.write().await;
+                    unavailable_scheduler_addrs.remove(&scheduler_addr);
+                    drop(unavailable_scheduler_addrs);
+                    info!("finally picked {:?}", scheduler_addr);
+                    return Ok(SchedulerGRPCClient::new(channel)
+                        .max_decoding_message_size(usize::MAX)
+                        .max_encoding_message_size(usize::MAX));
+                }
+                Err(err) => {
+                    error!("scheduler {} is not available: {}", scheduler_addr, err);
+                    let mut unavailable_scheduler_addrs =
+                        self.unavailable_scheduler_addrs.write().await;
+                    unavailable_scheduler_addrs
+                        .entry(scheduler_addr)
+                        .or_insert_with(Instant::now);
+                    drop(unavailable_scheduler_addrs);
+
+                    attempts += 1;
+                    refresh_count += 1;
+
+                    // Refresh the available scheduler nodes when the threshold is reached.
+                    if refresh_count >= refresh_threshold {
+                        if let Err(err) = self.refresh_available_scheduler_addrs().await {
+                            error!("failed to refresh scheduler client: {}", err);
+                        };
+                        refresh_count = 0;
+                    }
+
+                    // Give up when the maximum number of attempts is exceeded.
+                    if attempts >= max_attempts {
+                        return Err(Error::ExceededMaxAttempts);
+                    }
+                }
+            }
+        }
+
+        Err(Error::AvailableSchedulersNotFound)
+    }
+
+    /// check_scheduler checks the health of the scheduler and connects to it if it is serving.
+    async fn check_scheduler(&self, scheduler_addr: &SocketAddr) -> Result<Channel> {
+        let addr = format!("http://{}:{}", scheduler_addr.ip(), scheduler_addr.port());
+        let health_client = match HealthClient::new(&addr).await {
+            Ok(client) => client,
+            Err(err) => {
+                error!(
+                    "create health client for scheduler {}:{} failed: {}",
+                    scheduler_addr.ip(),
+                    scheduler_addr.port(),
+                    err
+                );
                 return Err(ExternalError::new(ErrorType::ConnectError)
                     .with_cause(Box::new(err))
                     .into());
             }
         };
 
-        Ok(SchedulerGRPCClient::new(channel)
-            .max_decoding_message_size(usize::MAX)
-            .max_encoding_message_size(usize::MAX))
+        match health_client.check().await {
+            Ok(resp) => {
+                if resp.status == ServingStatus::Serving as i32 {
+                    return Channel::from_shared(format!("http://{}", scheduler_addr))
+                        .map_err(|_| Error::InvalidURI(scheduler_addr.to_string()))?
+                        .connect_timeout(super::CONNECT_TIMEOUT)
+                        .connect()
+                        .await
+                        .map_err(|err| {
+                            error!("connect to {} failed: {}", scheduler_addr.to_string(), err);
+                            ExternalError::new(ErrorType::ConnectError)
+                                .with_cause(Box::new(err))
+                                .into()
+                        });
+                }
+
+                Err(Error::SchedulerNotServing)
+            }
+            Err(err) => {
+                error!("check scheduler health failed: {}", err);
+                Err(ExternalError::new(ErrorType::ConnectError)
+                    .with_cause(Box::new(err))
+                    .into())
+            }
+        }
     }
 
     /// update_available_scheduler_addrs updates the addresses of available schedulers.
@@ -568,6 +684,11 @@ impl SchedulerClient {
         *hashring = new_hashring;
         drop(hashring);
 
+        // Clear the unavailable scheduler addrs after refreshing.
+        let mut unavailable_scheduler_addrs = self.unavailable_scheduler_addrs.write().await;
+        unavailable_scheduler_addrs.clear();
+        drop(unavailable_scheduler_addrs);
+
         let available_scheduler_addrs = self.available_scheduler_addrs.read().await;
         info!(
             "refresh available scheduler addresses: {:?}",
@@ -597,3 +718,456 @@ impl SchedulerClient {
         request
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use mockall::mock;
+    use mockall::predicate;
+    use mockall::predicate::always;
+    use tokio::time::Duration;
+    use tonic::transport::Channel;
+
+    // Only mock the methods used in the client function.
+    mock! {
+        pub SchedulerClient {
+            async fn check_scheduler(&self, scheduler_addr: &SocketAddr) -> Result<Channel>;
+            async fn update_available_scheduler_addrs(&self) -> Result<()>;
+            async fn refresh_available_scheduler_addrs(&self) -> Result<()>;
+        }
+    }
+
+    mock! {
+        pub Dynconfig {
+            pub async fn get_config(&self) -> Arc<SchedulerConfig>;
+        }
+    }
+
+    #[derive(Clone)]
+    pub struct SchedulerConfig {
+        pub cooldown_interval: Duration,
+        pub max_attempts: u32,
+        pub refresh_threshold: u32,
+    }
+
+    // Reimplement the client logic of SchedulerClient, retaining only the fields
+    // necessary for testing.
+    #[derive(Clone)]
+    pub struct SchedulerClient {
+        // available_scheduler_addrs is the addresses of available schedulers.
+        available_scheduler_addrs: Arc<RwLock<Vec<SocketAddr>>>,
+
+        // hashring is the hashring of the scheduler.
+        hashring: Arc<RwLock<HashRing<VNode>>>,
+
+        // unavailable_scheduler_addrs is a map of unavailable scheduler addrs and the time they were marked as unavailable.
+        unavailable_scheduler_addrs: Arc<RwLock<HashMap<SocketAddr, Instant>>>,
+    }
+
+    impl SchedulerClient {
+        // client gets the grpc client of the scheduler.
+        async fn client(
+            &self,
+            task_id: &str,
+            _peer_id: Option<&str>,
+            mock_scheduler_client: &MockSchedulerClient,
+            mock_dynconfig: &MockDynconfig,
+        ) -> Result<SchedulerGRPCClient<Channel>> {
+            // Update scheduler addresses of the client.
+            mock_scheduler_client
+                .update_available_scheduler_addrs()
+                .await?;
+
+            // First, pick a scheduler from the hashring by the task_id.
+            let addrs = self.hashring.read().await;
+            let addr = *addrs
+                .get(&task_id[0..5].to_string())
+                .ok_or_else(|| Error::HashRing(task_id.to_string()))?;
+            drop(addrs);
+
+            match Channel::from_shared(format!("http://{}", addr))
+                .map_err(|_| Error::InvalidURI(addr.to_string()))?
+                .connect_timeout(super::super::CONNECT_TIMEOUT)
+                .connect()
+                .await
+            {
+                Ok(channel) => {
+                    let mut unavailable_scheduler_addrs =
+                        self.unavailable_scheduler_addrs.write().await;
+                    unavailable_scheduler_addrs.remove(&addr.addr);
+                    drop(unavailable_scheduler_addrs);
+                    return Ok(SchedulerGRPCClient::new(channel)
+                        .max_decoding_message_size(usize::MAX)
+                        .max_encoding_message_size(usize::MAX));
+                }
+
+                Err(err) => {
+                    error!("connect to {} failed: {}", addr.to_string(), err);
+                    let mut unavailable_scheduler_addrs =
+                        self.unavailable_scheduler_addrs.write().await;
+                    unavailable_scheduler_addrs
+                        .entry(addr.addr)
+                        .or_insert_with(Instant::now);
+                    drop(unavailable_scheduler_addrs);
+                }
+            }
+
+            // Read the retry and cooldown configuration items.
+            let config = mock_dynconfig.get_config().await;
+            let cooldown_interval = config.cooldown_interval;
+            let max_attempts = config.max_attempts;
+            let refresh_threshold = config.refresh_threshold;
+            let mut attempts = 1;
+            let mut refresh_count = 1;
+
+            // Iterate over the available schedulers in the hashring.
+            // Clone the hashring and release the read lock immediately.
+            let hashring = self.hashring.read().await.clone();
+            for vnode in hashring.into_iter() {
+                let scheduler_addr = vnode.addr;
+                let unavailable_scheduler_addrs = self.unavailable_scheduler_addrs.read().await;
+                if let Some(&instant) = unavailable_scheduler_addrs.get(&scheduler_addr) {
+                    if instant.elapsed() < cooldown_interval {
+                        continue;
+                    }
+                }
+                drop(unavailable_scheduler_addrs);
+
+                match mock_scheduler_client.check_scheduler(&scheduler_addr).await {
+                    Ok(channel) => {
+                        let mut unavailable_scheduler_addrs =
+                            self.unavailable_scheduler_addrs.write().await;
+                        unavailable_scheduler_addrs.remove(&scheduler_addr);
+                        drop(unavailable_scheduler_addrs);
+                        return Ok(SchedulerGRPCClient::new(channel)
+                            .max_decoding_message_size(usize::MAX)
+                            .max_encoding_message_size(usize::MAX));
+                    }
+                    Err(err) => {
+                        error!("scheduler {} is not available: {}", scheduler_addr, err);
+                        let mut unavailable_scheduler_addrs =
+                            self.unavailable_scheduler_addrs.write().await;
+                        unavailable_scheduler_addrs
+                            .entry(scheduler_addr)
+                            .or_insert_with(Instant::now);
+                        drop(unavailable_scheduler_addrs);
+
+                        attempts += 1;
+                        refresh_count += 1;
+
+                        if refresh_count >= refresh_threshold {
+                            if let Err(err) = mock_scheduler_client
+                                .refresh_available_scheduler_addrs()
+                                .await
+                            {
+                                error!("failed to refresh scheduler client: {}", err);
+                            };
+                            refresh_count = 0;
+                        }
+
+                        if attempts >= max_attempts {
+                            return Err(Error::ExceededMaxAttempts);
+                        }
+                    }
+                }
+            }
+
+            Err(Error::AvailableSchedulersNotFound)
+        }
+    }
+
+    async fn setup() -> (SchedulerClient, MockSchedulerClient, MockDynconfig) {
+        // Mock the SchedulerClient::check_scheduler().
+        let mock_scheduler_client = MockSchedulerClient::new();
+
+        // Mock the Dynconfig::get_config().
+        let mock_dynconfig = MockDynconfig::new();
+
+        let available_scheduler_addrs: Vec<SocketAddr> = vec![
+            "192.168.1.1:8080".parse().unwrap(),
+            "192.168.2.2:8080".parse().unwrap(),
+            "192.168.2.3:8080".parse().unwrap(),
+            "192.168.2.4:8080".parse().unwrap(),
+            "192.168.2.5:8080".parse().unwrap(),
+            "192.168.2.6:8080".parse().unwrap(),
+            "192.168.2.7:8080".parse().unwrap(),
+            "192.168.2.8:8080".parse().unwrap(),
+            "192.168.2.9:8080".parse().unwrap(),
+            "192.168.2.10:8080".parse().unwrap(),
+            "127.0.0.1:8080".parse().unwrap(),
+        ];
+
+        let mut ring: HashRing<VNode> = HashRing::new();
+        let nodes: Vec<VNode> = available_scheduler_addrs
+            .iter()
+            .map(|&addr| VNode { addr })
+            .collect();
+        for node in nodes {
+            ring.add(node);
+        }
+
+        let unavailable_scheduler_addrs: HashMap<SocketAddr, Instant> = HashMap::new();
+
+        let scheduler_client = SchedulerClient {
+            available_scheduler_addrs: Arc::new(RwLock::new(available_scheduler_addrs)),
+            hashring: Arc::new(RwLock::new(ring)),
+            unavailable_scheduler_addrs: Arc::new(RwLock::new(unavailable_scheduler_addrs)),
+        };
+
+        (scheduler_client, mock_scheduler_client, mock_dynconfig)
+    }
+
+    #[tokio::test]
+    async fn test_client_exceeds_max_attempts() {
+        let (scheduler_client, mut mock_scheduler_client, mut mock_dynconfig) = setup().await;
+
+        mock_scheduler_client
+            .expect_check_scheduler()
+            .with(predicate::eq(
+                "127.0.0.1:8080".parse::<SocketAddr>().unwrap(),
+            ))
+            .returning(|_| {
+                // A lazy channel is enough here; no real connection is made.
+                let channel = Channel::from_static("http://localhost:8080").connect_lazy();
+                Ok(channel)
+            });
+
+        mock_scheduler_client
+            .expect_check_scheduler()
+            .with(predicate::function(|addr: &SocketAddr| {
+                addr.ip().to_string().starts_with("192.168")
+            }))
+            .returning(|_| Err(Error::SchedulerNotServing));
+
+        mock_dynconfig.expect_get_config().returning(|| {
+            Arc::new(SchedulerConfig {
+                cooldown_interval: Duration::from_secs(60),
+                max_attempts: 3,
+                refresh_threshold: 10,
+            })
+        });
+
+        // Mock the SchedulerClient::update_available_scheduler_addrs().
+        mock_scheduler_client
+            .expect_update_available_scheduler_addrs()
+            .returning(|| Ok(()));
+
+        // Mock the SchedulerClient::refresh_available_scheduler_addrs().
+        mock_scheduler_client
+            .expect_refresh_available_scheduler_addrs()
+            .times(0)
+            .returning(|| Ok(()));
+
+        let task_id = "task12345";
+        let peer_id = Some("peer12345");
+        match scheduler_client
+            .client(task_id, peer_id, &mock_scheduler_client, &mock_dynconfig)
+            .await
+        {
+            Ok(_) => {
+                panic!("Client should not connect successfully.");
+            }
+            Err(e) => match e {
+                Error::ExceededMaxAttempts => {}
+                _ => {
+                    panic!("Unexpected error type.");
+                }
+            },
+        }
+    }
+
+    #[tokio::test]
+    async fn test_available_schedulers_not_found() {
+        let (scheduler_client, mut mock_scheduler_client, mut mock_dynconfig) = setup().await;
+
+        let mut call_count = 0;
+        mock_scheduler_client
+            .expect_check_scheduler()
+            .with(always())
+            .returning_st(move |_| {
+                call_count += 1;
+                if call_count <= 12 {
+                    Err(Error::SchedulerNotServing)
+                } else {
+                    Ok(Channel::from_static("http://localhost:8080").connect_lazy())
+                }
+            });
+
+        mock_dynconfig.expect_get_config().returning(|| {
+            Arc::new(SchedulerConfig {
+                cooldown_interval: Duration::from_secs(60),
+                max_attempts: 100,
+                refresh_threshold: 10,
+            })
+        });
+
+        // Mock the SchedulerClient::update_available_scheduler_addrs().
+        mock_scheduler_client
+            .expect_update_available_scheduler_addrs()
+            .returning(|| Ok(()));
+
+        // Mock the SchedulerClient::refresh_available_scheduler_addrs().
+        mock_scheduler_client
+            .expect_refresh_available_scheduler_addrs()
+            .returning(|| Ok(()));
+
+        let task_id = "task12345";
+        let peer_id = Some("peer12345");
+
+        // Step 1: Simulate a situation where all schedulers are unavailable.
+        match scheduler_client
+            .client(task_id, peer_id, &mock_scheduler_client, &mock_dynconfig)
+            .await
+        {
+            Ok(_) => {
+                panic!("Client should not connect successfully.");
+            }
+            Err(e) => match e {
+                Error::AvailableSchedulersNotFound => {}
+                _ => {
+                    panic!("Unexpected error type.");
+                }
+            },
+        }
+
+        // Step 2: Verify that the unavailable scheduler addrs have been recorded.
+        let available_addrs = scheduler_client.available_scheduler_addrs.read().await;
+        let unavailable_addrs = scheduler_client.unavailable_scheduler_addrs.read().await;
+        for addr in available_addrs.iter() {
+            assert!(
+                unavailable_addrs.contains_key(addr),
+                "Address {:?} is not in unavailable_scheduler_addrs",
+                addr
+            );
+        }
+        drop(available_addrs);
+        drop(unavailable_addrs);
+
+        // Step 3: Simulate an update operation to clear the unavailable scheduler addrs.
+        let mut unavailable_addrs = scheduler_client.unavailable_scheduler_addrs.write().await;
+        unavailable_addrs.clear();
+        drop(unavailable_addrs);
+
+        // Step 4: Verify that the scheduler is available again.
+        match scheduler_client
+            .client(task_id, peer_id, &mock_scheduler_client, &mock_dynconfig)
+            .await
+        {
+            Ok(_) => {}
+            Err(_e) => {
+                panic!("Client should connect successfully.");
+            }
+        }
+    }
+
+    #[tokio::test]
+    async fn test_available_schedulers_concurrent() {
+        let (scheduler_client, mut mock_scheduler_client, mut mock_dynconfig) = setup().await;
+
+        mock_dynconfig.expect_get_config().returning(|| {
+            Arc::new(SchedulerConfig {
+                cooldown_interval: Duration::from_secs(60),
+                max_attempts: 100,
+                refresh_threshold: 10,
+            })
+        });
+
+        mock_scheduler_client
+            .expect_update_available_scheduler_addrs()
+            .returning(|| Ok(()));
+
+        mock_scheduler_client
+            .expect_refresh_available_scheduler_addrs()
+            .returning(|| Ok(()));
+
+        let mut call_count = 0;
+        mock_scheduler_client
+            .expect_check_scheduler()
+            .with(always())
+            .times(1100)
+            .returning_st(move |_| {
+                call_count += 1;
+                if call_count <= 1000 {
+                    Err(Error::SchedulerNotServing)
+                } else {
+                    Ok(Channel::from_static("http://localhost:8080").connect_lazy())
+                }
+            });
+
+        let mock_dynconfig = Arc::new(mock_dynconfig);
+        let scheduler_client = Arc::new(scheduler_client);
+        let mock_scheduler_client = Arc::new(mock_scheduler_client);
+
+        let task_id = "12345";
+        let peer_id = Some("peer12345");
+
+        let handles: Vec<_> = (0..100)
+            .map(|_| {
+                let scheduler_client = Arc::clone(&scheduler_client);
+                let mock_scheduler_client = Arc::clone(&mock_scheduler_client);
+                let mock_dynconfig = Arc::clone(&mock_dynconfig);
+                let task_id = task_id.to_string();
+                let peer_id = peer_id.map(|id| id.to_string());
+
+                tokio::spawn(async move {
+                    // Step 1: Simulate a situation where all schedulers are unavailable.
+                    match scheduler_client
+                        .client(
+                            &task_id,
+                            peer_id.as_deref(),
+                            &mock_scheduler_client,
+                            &mock_dynconfig,
+                        )
+                        .await
+                    {
+                        Ok(_) => {
+                            panic!("Client should not connect successfully.");
+                        }
+                        Err(e) => match e {
+                            Error::AvailableSchedulersNotFound => {}
+                            _ => {
+                                panic!("Unexpected error type.");
+                            }
+                        },
+                    }
+
+                    // Step 2: Verify that the unavailable scheduler addrs have been recorded.
+                    let available_addrs =
+                        scheduler_client.available_scheduler_addrs.read().await;
+                    let unavailable_addrs =
+                        scheduler_client.unavailable_scheduler_addrs.read().await;
+                    for addr in available_addrs.iter() {
+                        assert!(
+                            unavailable_addrs.contains_key(addr),
+                            "Address {:?} is not in unavailable_scheduler_addrs.",
+                            addr
+                        );
+                    }
+                    drop(available_addrs);
+                    drop(unavailable_addrs);
+
+                    // Step 3: Simulate an update operation to clear the unavailable scheduler addrs.
+                    let mut unavailable_addrs =
+                        scheduler_client.unavailable_scheduler_addrs.write().await;
+                    unavailable_addrs.clear();
+                    drop(unavailable_addrs);
+
+                    // Step 4: Verify that all tasks can connect to the scheduler.
+                    match scheduler_client
+                        .client(
+                            &task_id,
+                            peer_id.as_deref(),
+                            &mock_scheduler_client,
+                            &mock_dynconfig,
+                        )
+                        .await
+                    {
+                        Ok(_) => {}
+                        Err(_e) => {
+                            panic!("Client should connect successfully.");
+                        }
+                    }
+                })
+            })
+            .collect();
+
+        for handle in handles {
+            if let Err(_e) = handle.await {
+                panic!("One of the tasks panicked.");
+            }
+        }
+    }
+}
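A closing note on the retry accounting that the tests above exercise: both `attempts` and `refresh_count` start at 1 because the initial hashring pick already consumed one connection attempt before the failover loop begins. Here is a small stand-alone simulation of those counters (plain Rust, no scheduler required; the `println!`s stand in for the real refresh and error paths in `SchedulerClient::client`):

```rust
fn main() {
    let (max_attempts, refresh_threshold) = (3u32, 10u32);

    // Both counters start at 1: the initial hashring pick already
    // consumed one attempt before the failover loop begins.
    let mut attempts = 1u32;
    let mut refresh_count = 1u32;

    // Walk through failed probes until the loop gives up, mirroring the
    // bookkeeping in the failover loop above.
    for probe in 1.. {
        attempts += 1;
        refresh_count += 1;

        // Refresh check happens first, exactly as in the real loop.
        if refresh_count >= refresh_threshold {
            println!("probe {probe}: would refresh available schedulers");
            refresh_count = 0;
        }

        if attempts >= max_attempts {
            println!("probe {probe}: would return Error::ExceededMaxAttempts");
            break;
        }
    }

    // With max_attempts = 3, the loop aborts on the second failed probe:
    // attempts goes 1 -> 2 -> 3, i.e. three connection attempts in total,
    // which is what test_client_exceeds_max_attempts asserts indirectly.
}
```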