diff --git a/Cargo.lock b/Cargo.lock index 913691ba..f4663ac5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -807,12 +807,6 @@ dependencies = [ "powerfmt", ] -[[package]] -name = "difflib" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8" - [[package]] name = "digest" version = "0.10.7" @@ -845,12 +839,6 @@ dependencies = [ "const-random", ] -[[package]] -name = "downcast" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" - [[package]] name = "dragonfly-api" version = "2.0.162" @@ -898,7 +886,6 @@ dependencies = [ "lazy_static", "leaky-bucket", "libc", - "mockall", "openssl", "opentelemetry", "opentelemetry-jaeger", @@ -1150,15 +1137,6 @@ dependencies = [ "miniz_oxide", ] -[[package]] -name = "float-cmp" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98de4bbd547a563b716d8dfa9aad1cb19bfab00f4fa09a6a4ed21dbcf44ce9c4" -dependencies = [ - "num-traits", -] - [[package]] name = "fnv" version = "1.0.7" @@ -1189,12 +1167,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "fragile" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" - [[package]] name = "fs2" version = "0.4.3" @@ -2141,33 +2113,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "mockall" -version = "0.11.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c84490118f2ee2d74570d114f3d0493cbf02790df303d2707606c3e14e07c96" -dependencies = [ - "cfg-if", - "downcast", - "fragile", - "lazy_static", - "mockall_derive", - "predicates", - "predicates-tree", -] - -[[package]] -name = "mockall_derive" -version = "0.11.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22ce75669015c4f47b289fd4d4f56e894e4c96003ffdf3ac51313126f94c6cbb" -dependencies = [ - "cfg-if", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "multer" version = "2.1.0" @@ -2256,12 +2201,6 @@ dependencies = [ "minimal-lexical", ] -[[package]] -name = "normalize-line-endings" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" - [[package]] name = "ntapi" version = "0.4.1" @@ -2853,36 +2792,6 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" -[[package]] -name = "predicates" -version = "2.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59230a63c37f3e18569bdb90e4a89cbf5bf8b06fea0b84e65ea10cc4df47addd" -dependencies = [ - "difflib", - "float-cmp", - "itertools 0.10.5", - "normalize-line-endings", - "predicates-core", - "regex", -] - -[[package]] -name = "predicates-core" -version = "1.0.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae8177bee8e75d6846599c6b9ff679ed51e882816914eec639944d7c9aa11931" - -[[package]] -name = "predicates-tree" -version = "1.0.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41b740d195ed3166cd147c8047ec98db0e22ec019eb8eeb76d343b795304fb13" -dependencies = [ - "predicates-core", - "termtree", -] - [[package]] name = "prettyplease" version = "0.2.17" @@ -4060,12 +3969,6 @@ dependencies = [ "redox_termios", ] -[[package]] -name = "termtree" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" - [[package]] name = "thiserror" version = "1.0.63" diff --git a/dragonfly-client-config/src/dfdaemon.rs b/dragonfly-client-config/src/dfdaemon.rs index d176e995..a11e2daf 100644 --- a/dragonfly-client-config/src/dfdaemon.rs +++ b/dragonfly-client-config/src/dfdaemon.rs @@ -135,29 +135,6 @@ fn default_scheduler_schedule_timeout() -> Duration { Duration::from_secs(10) } -/// default_scheduler_cooldown_interval is the default cooldown time (in seconds) for the scheduler. -/// This duration specifies the time period during which the scheduler will not be accessed -/// after it becomes unavailable due to an exception. -#[inline] -fn default_scheduler_cooldown_interval() -> Duration { - Duration::from_secs(60) -} - -/// default_scheduler_max_attempts is the default maximum number of attempts to connect to the scheduler. -/// If the connection to the scheduler fails, dfdaemon will retry up to this number of times. -#[inline] -fn default_scheduler_max_attempts() -> u32 { - 3 -} - -/// default_scheduler_refresh_threshold is the default threshold for refreshing available scheduler nodes. -/// When the number of attempts to connect to the scheduler reaches this value, -/// dfdaemon will refresh the available scheduler nodes. -#[inline] -fn default_scheduler_refresh_threshold() -> u32 { - 10 -} - /// default_dynconfig_refresh_interval is the default interval to refresh dynamic configuration from manager. #[inline] fn default_dynconfig_refresh_interval() -> Duration { @@ -515,28 +492,6 @@ pub struct Scheduler { #[serde(default = "default_download_max_schedule_count")] #[validate(range(min = 1))] pub max_schedule_count: u32, - - // cooldown_duration is the scheduler's cooldown time (in seconds). - // When the scheduler becomes unavailable due to an exception, this field specifies - // the time period during which the scheduler will not be accessed. - #[serde( - default = "default_scheduler_cooldown_interval", - with = "humantime_serde" - )] - pub cooldown_interval: Duration, - - // max_attempts is the maximum number of attempts to connect to the scheduler. - // If the connection to the scheduler fails, dfdaemon will retry up to this number of times. - #[serde(default = "default_scheduler_max_attempts")] - #[validate(range(min = 1))] - pub max_attempts: u32, - - // refresh_threshold is the threshold for refreshing available scheduler nodes. - // When the number of attempts to connect to the scheduler reaches this value, - // dfdaemon will refresh the available scheduler nodes. - #[serde(default = "default_scheduler_refresh_threshold")] - #[validate(range(min = 1))] - pub refresh_threshold: u32, } /// Scheduler implements Default. @@ -546,9 +501,6 @@ impl Default for Scheduler { announce_interval: default_scheduler_announce_interval(), schedule_timeout: default_scheduler_schedule_timeout(), max_schedule_count: default_download_max_schedule_count(), - cooldown_interval: default_scheduler_cooldown_interval(), - max_attempts: default_scheduler_max_attempts(), - refresh_threshold: default_scheduler_refresh_threshold(), } } } diff --git a/dragonfly-client-core/src/error/mod.rs b/dragonfly-client-core/src/error/mod.rs index 7c270f21..65b1f732 100644 --- a/dragonfly-client-core/src/error/mod.rs +++ b/dragonfly-client-core/src/error/mod.rs @@ -98,14 +98,6 @@ pub enum DFError { #[error{"scheduler client not found"}] SchedulerClientNotFound, - /// ExceededMaxAttempts is the error when the maximum number of attempts is exceeded. - #[error("exceeded maximum number of attempts")] - ExceededMaxAttempts, - - /// SchedulerNotServing is the error when the current scheduler is not serving. - #[error("current scheduler is not serving")] - SchedulerNotServing, - /// UnexpectedResponse is the error when the response is unexpected. #[error{"unexpected response"}] UnexpectedResponse, diff --git a/dragonfly-client/Cargo.toml b/dragonfly-client/Cargo.toml index bf6be677..17e797c7 100644 --- a/dragonfly-client/Cargo.toml +++ b/dragonfly-client/Cargo.toml @@ -91,7 +91,6 @@ futures-util = "0.3.30" termion = "4.0.2" tabled = "0.16.0" path-absolutize = "3.1.1" -mockall = "0.11" [dev-dependencies] tempfile.workspace = true diff --git a/dragonfly-client/src/dynconfig/mod.rs b/dragonfly-client/src/dynconfig/mod.rs index 66393af5..8ba716b8 100644 --- a/dragonfly-client/src/dynconfig/mod.rs +++ b/dragonfly-client/src/dynconfig/mod.rs @@ -110,12 +110,6 @@ impl Dynconfig { } } - /// get the config. - #[instrument(skip_all)] - pub async fn get_config(&self) -> Arc { - self.config.clone() - } - /// refresh refreshes the dynamic configuration of the dfdaemon. #[instrument(skip_all)] pub async fn refresh(&self) -> Result<()> { diff --git a/dragonfly-client/src/grpc/scheduler.rs b/dragonfly-client/src/grpc/scheduler.rs index dbb1f1ba..023c24c3 100644 --- a/dragonfly-client/src/grpc/scheduler.rs +++ b/dragonfly-client/src/grpc/scheduler.rs @@ -17,7 +17,6 @@ // use crate::dynconfig::Dynconfig; use crate::dynconfig::Dynconfig; use dragonfly_api::common::v2::{Peer, PersistentCachePeer, PersistentCacheTask, Task}; -use crate::grpc::health::HealthClient; use dragonfly_api::manager::v2::Scheduler; use dragonfly_api::scheduler::v2::{ scheduler_client::SchedulerClient as SchedulerGRPCClient, AnnounceHostRequest, @@ -31,15 +30,12 @@ use dragonfly_api::scheduler::v2::{ use dragonfly_client_core::error::{ErrorType, ExternalError, OrErr}; use dragonfly_client_core::{Error, Result}; use hashring::HashRing; -use std::collections::HashMap; use std::net::{IpAddr, SocketAddr}; use std::str::FromStr; use std::sync::Arc; -use std::time::Instant; use tokio::sync::RwLock; use tokio::task::JoinSet; use tonic::transport::Channel; -use tonic_health::pb::health_check_response::ServingStatus; use tracing::{error, info, instrument, Instrument}; /// VNode is the virtual node of the hashring. @@ -71,9 +67,6 @@ pub struct SchedulerClient { /// hashring is the hashring of the scheduler. hashring: Arc>>, - - /// unavailable_scheduler_addrs is a map of unavailable scheduler addrs and the time they were marked as unavailable. - unavailable_scheduler_addrs: Arc>>, } /// SchedulerClient implements the grpc client of the scheduler. @@ -86,7 +79,6 @@ impl SchedulerClient { available_schedulers: Arc::new(RwLock::new(Vec::new())), available_scheduler_addrs: Arc::new(RwLock::new(Vec::new())), hashring: Arc::new(RwLock::new(HashRing::new())), - unavailable_scheduler_addrs: Arc::new(RwLock::new(HashMap::new())), }; client.refresh_available_scheduler_addrs().await?; @@ -464,15 +456,15 @@ impl SchedulerClient { // Update scheduler addresses of the client. self.update_available_scheduler_addrs().await?; - // First try to get the address from the task_id. + // Get the scheduler address from the hashring. let addrs = self.hashring.read().await; let addr = *addrs .get(&task_id[0..5].to_string()) .ok_or_else(|| Error::HashRing(task_id.to_string()))?; drop(addrs); - info!("first picked {:?}", addr); + info!("picked {:?}", addr); - match Channel::from_shared(format!("http://{}", addr)) + let channel = match Channel::from_shared(format!("http://{}", addr)) .map_err(|_| Error::InvalidURI(addr.to_string()))? .buffer_size(super::BUFFER_SIZE) .connect_timeout(super::CONNECT_TIMEOUT) @@ -483,130 +475,22 @@ impl SchedulerClient { .connect() .await { - Ok(channel) => { - let mut unavailable_scheduler_addrs = - self.unavailable_scheduler_addrs.write().await; - unavailable_scheduler_addrs.remove(&addr.addr); - drop(unavailable_scheduler_addrs); - return Ok(SchedulerGRPCClient::new(channel) - .max_decoding_message_size(usize::MAX) - .max_encoding_message_size(usize::MAX)); - } - + Ok(channel) => channel, Err(err) => { error!("connect to {} failed: {}", addr.to_string(), err); - let mut unavailable_scheduler_addrs = - self.unavailable_scheduler_addrs.write().await; - unavailable_scheduler_addrs - .entry(addr.addr) - .or_insert_with(Instant::now); - drop(unavailable_scheduler_addrs); - } - } + if let Err(err) = self.refresh_available_scheduler_addrs().await { + error!("failed to refresh scheduler client: {}", err); + }; - // Read the shooting configuration items. - let config = self.dynconfig.get_config().await; - let cooldown_interval = config.scheduler.cooldown_interval; - let max_attempts = config.scheduler.max_attempts; - let refresh_threshold = config.scheduler.refresh_threshold; - let mut attempts = 1; - let mut refresh_count = 1; - - // Traverse through available scheduler collections. - let hashring = self.hashring.read().await; - for vnode in hashring.clone().into_iter() { - let scheduler_addr = vnode.addr; - let unavailable_scheduler_addrs = self.unavailable_scheduler_addrs.read().await; - if let Some(&instant) = unavailable_scheduler_addrs.get(&scheduler_addr) { - if instant.elapsed() < cooldown_interval { - continue; - } - } - drop(unavailable_scheduler_addrs); - info!("try to picked {:?}", scheduler_addr); - match self.check_scheduler(&scheduler_addr).await { - Ok(channel) => { - let mut unavailable_scheduler_addrs = - self.unavailable_scheduler_addrs.write().await; - unavailable_scheduler_addrs.remove(&scheduler_addr); - drop(unavailable_scheduler_addrs); - info!("finally picked {:?}", scheduler_addr); - return Ok(SchedulerGRPCClient::new(channel) - .max_decoding_message_size(usize::MAX) - .max_encoding_message_size(usize::MAX)); - } - Err(err) => { - error!("scheduler {} is not available: {}", scheduler_addr, err); - let mut unavailable_scheduler_addrs = - self.unavailable_scheduler_addrs.write().await; - unavailable_scheduler_addrs - .entry(scheduler_addr) - .or_insert_with(Instant::now); - drop(unavailable_scheduler_addrs); - - attempts += 1; - refresh_count += 1; - - if refresh_count >= refresh_threshold { - if let Err(err) = self.refresh_available_scheduler_addrs().await { - error!("failed to refresh scheduler client: {}", err); - }; - refresh_count = 0; - } - - if attempts >= max_attempts { - return Err(Error::ExceededMaxAttempts); - } - } - } - } - - Err(Error::AvailableSchedulersNotFound) - } - - /// Check the health of the scheduler. - async fn check_scheduler(&self, scheduler_addr: &SocketAddr) -> Result { - let addr = format!("http://{}:{}", scheduler_addr.ip(), scheduler_addr.port()); - let health_client = match HealthClient::new(&addr).await { - Ok(client) => client, - Err(err) => { - error!( - "create health client for scheduler {}:{} failed: {}", - scheduler_addr.ip(), - scheduler_addr.port(), - err - ); return Err(ExternalError::new(ErrorType::ConnectError) .with_cause(Box::new(err)) .into()); } }; - match health_client.check().await { - Ok(resp) => { - if resp.status == ServingStatus::Serving as i32 { - return Channel::from_shared(format!("http://{}", scheduler_addr)) - .map_err(|_| Error::InvalidURI(scheduler_addr.to_string()))? - .connect_timeout(super::CONNECT_TIMEOUT) - .connect() - .await - .map_err(|err| { - error!("connect to {} failed: {}", scheduler_addr.to_string(), err); - ExternalError::new(ErrorType::ConnectError) - .with_cause(Box::new(err)) - .into() - }); - } - - Err(Error::SchedulerNotServing) - } - Err(err) => { - error!("check scheduler health failed: {}", err); - Err(ExternalError::new(ErrorType::ConnectError) - .with_cause(Box::new(err)) - .into()) - } - } + Ok(SchedulerGRPCClient::new(channel) + .max_decoding_message_size(usize::MAX) + .max_encoding_message_size(usize::MAX)) } /// update_available_scheduler_addrs updates the addresses of available schedulers. @@ -684,11 +568,6 @@ impl SchedulerClient { *hashring = new_hashring; drop(hashring); - // Clean the unavailable scheduler addrs. - let mut unavailable_scheduler_addrs = self.unavailable_scheduler_addrs.write().await; - unavailable_scheduler_addrs.clear(); - drop(unavailable_scheduler_addrs); - let available_scheduler_addrs = self.available_scheduler_addrs.read().await; info!( "refresh available scheduler addresses: {:?}", @@ -718,456 +597,3 @@ impl SchedulerClient { request } } - -// test -#[cfg(test)] -mod tests { - use super::*; - use mockall::mock; - use mockall::predicate; - use mockall::predicate::always; - use tokio::time::Duration; - use tonic::transport::Channel; - - // Only mock the methods used in the client function. - mock! { - pub SchedulerClient { - async fn check_scheduler(&self, scheduler_addr: &SocketAddr) -> Result; - async fn update_available_scheduler_addrs(&self) -> Result<()>; - async fn refresh_available_scheduler_addrs(&self) -> Result<()>; - } - } - - mock! { - pub Dynconfig { - pub async fn get_config(&self) -> Arc; - } - } - - #[derive(Clone)] - pub struct SchedulerConfig { - pub cooldown_interval: Duration, - pub max_attempts: u32, - pub refresh_threshold: u32, - } - - // Reuse the Client logic in the Scheduler, retaining only the structural fields necessary for testing. - #[derive(Clone)] - pub struct SchedulerClient { - // available_scheduler_addrs is the addresses of available schedulers. - available_scheduler_addrs: Arc>>, - - // hashring is the hashring of the scheduler. - hashring: Arc>>, - - // unavailable_scheduler_addrs is a map of unavailable scheduler addrs and the time they were marked as unavailable. - unavailable_scheduler_addrs: Arc>>, - } - - impl SchedulerClient { - // client gets the grpc client of the scheduler. - async fn client( - &self, - task_id: &str, - _peer_id: Option<&str>, - mock_scheduler_client: &MockSchedulerClient, - mock_dynconfig: &MockDynconfig, - ) -> Result> { - // Update scheduler addresses of the client. - mock_scheduler_client - .update_available_scheduler_addrs() - .await?; - - // First try to get the address from the task_id. - let addrs = self.hashring.read().await; - let addr = *addrs - .get(&task_id[0..5].to_string()) - .ok_or_else(|| Error::HashRing(task_id.to_string()))?; - drop(addrs); - - match Channel::from_shared(format!("http://{}", addr)) - .map_err(|_| Error::InvalidURI(addr.to_string()))? - .connect_timeout(super::super::CONNECT_TIMEOUT) - .connect() - .await - { - Ok(channel) => { - let mut unavailable_scheduler_addrs = - self.unavailable_scheduler_addrs.write().await; - unavailable_scheduler_addrs.remove(&addr.addr); - drop(unavailable_scheduler_addrs); - return Ok(SchedulerGRPCClient::new(channel) - .max_decoding_message_size(usize::MAX) - .max_encoding_message_size(usize::MAX)); - } - - Err(err) => { - error!("connect to {} failed: {}", addr.to_string(), err); - let mut unavailable_scheduler_addrs = - self.unavailable_scheduler_addrs.write().await; - unavailable_scheduler_addrs - .entry(addr.addr) - .or_insert_with(Instant::now); - drop(unavailable_scheduler_addrs); - } - } - - // Read the shooting configuration items. - let config = mock_dynconfig.get_config().await; - let cooldown_interval = config.cooldown_interval; - let max_attempts = config.max_attempts; - let refresh_threshold = config.refresh_threshold; - let mut attempts = 1; - let mut refresh_count = 1; - - // Traverse through available scheduler collections. - let hashring = self.hashring.read().await; - for vnode in hashring.clone().into_iter() { - let scheduler_addr = vnode.addr; - let unavailable_scheduler_addrs = self.unavailable_scheduler_addrs.read().await; - if let Some(&instant) = unavailable_scheduler_addrs.get(&scheduler_addr) { - if instant.elapsed() < cooldown_interval { - continue; - } - } - drop(unavailable_scheduler_addrs); - match mock_scheduler_client.check_scheduler(&scheduler_addr).await { - Ok(channel) => { - let mut unavailable_scheduler_addrs = - self.unavailable_scheduler_addrs.write().await; - unavailable_scheduler_addrs.remove(&scheduler_addr); - drop(unavailable_scheduler_addrs); - return Ok(SchedulerGRPCClient::new(channel) - .max_decoding_message_size(usize::MAX) - .max_encoding_message_size(usize::MAX)); - } - Err(err) => { - error!("scheduler {} is not available: {}", scheduler_addr, err); - let mut unavailable_scheduler_addrs = - self.unavailable_scheduler_addrs.write().await; - unavailable_scheduler_addrs - .entry(scheduler_addr) - .or_insert_with(Instant::now); - drop(unavailable_scheduler_addrs); - - attempts += 1; - refresh_count += 1; - - if refresh_count >= refresh_threshold { - if let Err(err) = mock_scheduler_client - .refresh_available_scheduler_addrs() - .await - { - error!("failed to refresh scheduler client: {}", err); - }; - refresh_count = 0; - } - - if attempts >= max_attempts { - return Err(Error::ExceededMaxAttempts); - } - } - } - } - Err(Error::AvailableSchedulersNotFound) - } - } - - async fn setup() -> (SchedulerClient, MockSchedulerClient, MockDynconfig) { - // Mock the SchedulerClient::check_scheduler(). - let mock_scheduler_client = MockSchedulerClient::new(); - - // Mock the Dynconfig::get_config(). - let mock_dynconfig = MockDynconfig::new(); - - let available_scheduler_addrs = vec![ - "192.168.1.1:8080".parse().unwrap(), - "192.168.2.2:8080".parse().unwrap(), - "192.168.2.3:8080".parse().unwrap(), - "192.168.2.4:8080".parse().unwrap(), - "192.168.2.5:8080".parse().unwrap(), - "192.168.2.6:8080".parse().unwrap(), - "192.168.2.7:8080".parse().unwrap(), - "192.168.2.8:8080".parse().unwrap(), - "192.168.2.9:8080".parse().unwrap(), - "192.168.2.10:8080".parse().unwrap(), - "127.0.0.1:8080".parse().unwrap(), - ]; - let mut ring: HashRing = HashRing::new(); - let nodes: Vec = available_scheduler_addrs - .iter() - .map(|&addr| VNode { addr }) - .collect(); - for node in nodes { - ring.add(node); - } - let unavailable_scheduler_addrs: HashMap = HashMap::new(); - - let scheduler_client = SchedulerClient { - available_scheduler_addrs: Arc::new(RwLock::new(available_scheduler_addrs)), - hashring: Arc::new(RwLock::new(ring)), - unavailable_scheduler_addrs: Arc::new(RwLock::new(unavailable_scheduler_addrs)), - }; - - (scheduler_client, mock_scheduler_client, mock_dynconfig) - } - - #[tokio::test] - async fn test_client_exceeds_max_attempts() { - let (scheduler_client, mut mock_scheduler_client, mut mock_dynconfig) = setup().await; - - mock_scheduler_client - .expect_check_scheduler() - .with(predicate::eq( - "127.0.0.1:8080".parse::().unwrap(), - )) - .returning(|_| { - let channel = Channel::from_static("").connect_lazy(); - Ok(channel) - }); - - mock_scheduler_client - .expect_check_scheduler() - .with(predicate::function(|addr: &SocketAddr| { - addr.ip().to_string().starts_with("192.168") - })) - .returning(|_| Err(Error::SchedulerNotServing)); - - mock_dynconfig.expect_get_config().returning(|| { - Arc::new(SchedulerConfig { - cooldown_interval: Duration::from_secs(60), - max_attempts: 3, - refresh_threshold: 10, - }) - }); - - // Mock the SchedulerClient::update_available_scheduler_addrs(). - mock_scheduler_client - .expect_update_available_scheduler_addrs() - .returning(|| Ok(())); - - // Mock the SchedulerClient::refresh_available_scheduler_addrs(). - mock_scheduler_client - .expect_refresh_available_scheduler_addrs() - .times(0) - .returning(|| Ok(())); - - let task_id = "task12345"; - let peer_id = Some("peer12345"); - match scheduler_client - .client(task_id, peer_id, &mock_scheduler_client, &mock_dynconfig) - .await - { - Ok(_) => { - panic!("Client should not connect successfully."); - } - Err(e) => match e { - Error::ExceededMaxAttempts => {} - _ => { - panic!("Unexpected error type."); - } - }, - } - } - - #[tokio::test] - async fn test_available_schedulers_not_found() { - let (scheduler_client, mut mock_scheduler_client, mut mock_dynconfig) = setup().await; - - let mut call_count = 0; - mock_scheduler_client - .expect_check_scheduler() - .with(always()) - .returning_st(move |_| { - call_count += 1; - if call_count <= 12 { - Err(Error::SchedulerNotServing) - } else { - Ok(Channel::from_static("http://localhost:8080").connect_lazy()) - } - }); - - mock_dynconfig.expect_get_config().returning(|| { - Arc::new(SchedulerConfig { - cooldown_interval: Duration::from_secs(60), - max_attempts: 100, - refresh_threshold: 10, - }) - }); - - // Mock the SchedulerClient::update_available_scheduler_addrs(). - mock_scheduler_client - .expect_update_available_scheduler_addrs() - .returning(|| Ok(())); - - // Mock the SchedulerClient::refresh_available_scheduler_addrs(). - mock_scheduler_client - .expect_refresh_available_scheduler_addrs() - .returning(|| Ok(())); - - let task_id = "task12345"; - let peer_id = Some("peer12345"); - - // Step 1: Simulate a situation where all schedulers are unavailable. - match scheduler_client - .client(task_id, peer_id, &mock_scheduler_client, &mock_dynconfig) - .await - { - Ok(_) => { - panic!("Client should not connect successfully."); - } - Err(e) => match e { - Error::AvailableSchedulersNotFound => {} - _ => { - panic!("Unexpected error type."); - } - }, - } - - // Step 2: Verify if unavailable scheduler_addrs have stored content. - let available_addrs = scheduler_client.available_scheduler_addrs.read().await; - let unavailable_addrs = scheduler_client.unavailable_scheduler_addrs.read().await; - for addr in available_addrs.iter() { - assert!( - unavailable_addrs.contains_key(addr), - "Address {:?} is not in unavailable_scheduler_addrs", - addr - ); - } - drop(available_addrs); - drop(unavailable_addrs); - - // Step: 3: Simulate a update operation to clear unavailable scheduler_addrs. - let mut unavailable_addrs = scheduler_client.unavailable_scheduler_addrs.write().await; - unavailable_addrs.clear(); - drop(unavailable_addrs); - - // Step: 4: Verify that the scheduler is available. - match scheduler_client - .client(task_id, peer_id, &mock_scheduler_client, &mock_dynconfig) - .await - { - Ok(_) => {} - Err(_e) => { - panic!("Client should connect successfully.") - } - } - } - - #[tokio::test] - async fn test_available_schedulers_concurrent() { - let (scheduler_client, mut mock_scheduler_client, mut mock_dynconfig) = setup().await; - - mock_dynconfig.expect_get_config().returning(|| { - Arc::new(SchedulerConfig { - cooldown_interval: Duration::from_secs(60), - max_attempts: 100, - refresh_threshold: 10, - }) - }); - - mock_scheduler_client - .expect_update_available_scheduler_addrs() - .returning(|| Ok(())); - - mock_scheduler_client - .expect_refresh_available_scheduler_addrs() - .returning(|| Ok(())); - - let mut call_count = 0; - mock_scheduler_client - .expect_check_scheduler() - .with(always()) - .times(1100) - .returning_st(move |_| { - call_count += 1; - if call_count <= 1000 { - Err(Error::SchedulerNotServing) - } else { - Ok(Channel::from_static("http://localhost:8080").connect_lazy()) - } - }); - - let mock_dynconfig = Arc::new(mock_dynconfig); - let scheduler_client = Arc::new(scheduler_client); - let mock_scheduler_client = Arc::new(mock_scheduler_client); - - let task_id = "12345"; - let peer_id = Some("peer12345"); - - let handles: Vec<_> = (0..100) - .map(|_| { - let scheduler_client = Arc::clone(&scheduler_client); - let mock_scheduler_client = Arc::clone(&mock_scheduler_client); - let mock_dynconfig = Arc::clone(&mock_dynconfig); - let task_id = task_id.to_string(); - let peer_id = peer_id.map(|id| id.to_string()); - - tokio::spawn(async move { - // Step 1: Simulate a situation where all schedulers are unavailable. - match scheduler_client - .client( - &task_id, - peer_id.as_deref(), - &mock_scheduler_client, - &mock_dynconfig, - ) - .await - { - Ok(_) => { - panic!("Client should not connect successfully."); - } - Err(e) => match e { - Error::AvailableSchedulersNotFound => {} - _ => { - panic!("Unexpected error type."); - } - }, - } - - // Step 2: Verify if unavailable scheduler_addrs have stored content. - let available_addrs = scheduler_client.available_scheduler_addrs.read().await; - let unavailable_addrs = - scheduler_client.unavailable_scheduler_addrs.read().await; - for addr in available_addrs.iter() { - assert!( - unavailable_addrs.contains_key(addr), - "Address {:?} is not in unavailable_scheduler_addrs.", - addr - ); - } - drop(available_addrs); - drop(unavailable_addrs); - - // Step 3: Simulate an update operation to clear unavailable scheduler_addrs. - let mut unavailable_addrs = - scheduler_client.unavailable_scheduler_addrs.write().await; - unavailable_addrs.clear(); - drop(unavailable_addrs); - - // Step 4: Verify that all tasks can connect to the scheduler. - match scheduler_client - .client( - &task_id, - peer_id.as_deref(), - &mock_scheduler_client, - &mock_dynconfig, - ) - .await - { - Ok(_) => {} - Err(_e) => { - panic!("Client should connect successfully.") - } - } - }) - }) - .collect(); - - for handle in handles { - if let Err(_e) = handle.await { - panic!("One of the tasks panicked."); - } - } - } -}