feat(scheduler): add automatic scheduler switching feature (#591)

Signed-off-by: yuancr <2637554776@qq.com>
PoloDyBala 2024-09-29 12:01:16 +08:00 committed by GitHub
parent f536bab788
commit a0bc93c69a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 744 additions and 10 deletions

Cargo.lock (generated)

@@ -807,6 +807,12 @@ dependencies = [
"powerfmt",
]
[[package]]
name = "difflib"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8"
[[package]]
name = "digest"
version = "0.10.7"
@@ -839,6 +845,12 @@ dependencies = [
"const-random",
]
[[package]]
name = "downcast"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1"
[[package]]
name = "dragonfly-api"
version = "2.0.162"
@@ -886,6 +898,7 @@ dependencies = [
"lazy_static",
"leaky-bucket",
"libc",
"mockall",
"openssl",
"opentelemetry",
"opentelemetry-jaeger",
@@ -1137,6 +1150,15 @@ dependencies = [
"miniz_oxide",
]
[[package]]
name = "float-cmp"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "98de4bbd547a563b716d8dfa9aad1cb19bfab00f4fa09a6a4ed21dbcf44ce9c4"
dependencies = [
"num-traits",
]
[[package]]
name = "fnv"
version = "1.0.7"
@@ -1167,6 +1189,12 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "fragile"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
[[package]]
name = "fs2"
version = "0.4.3"
@@ -2113,6 +2141,33 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "mockall"
version = "0.11.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c84490118f2ee2d74570d114f3d0493cbf02790df303d2707606c3e14e07c96"
dependencies = [
"cfg-if",
"downcast",
"fragile",
"lazy_static",
"mockall_derive",
"predicates",
"predicates-tree",
]
[[package]]
name = "mockall_derive"
version = "0.11.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22ce75669015c4f47b289fd4d4f56e894e4c96003ffdf3ac51313126f94c6cbb"
dependencies = [
"cfg-if",
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "multer"
version = "2.1.0"
@@ -2201,6 +2256,12 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "normalize-line-endings"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be"
[[package]]
name = "ntapi"
version = "0.4.1"
@@ -2792,6 +2853,36 @@ version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "predicates"
version = "2.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59230a63c37f3e18569bdb90e4a89cbf5bf8b06fea0b84e65ea10cc4df47addd"
dependencies = [
"difflib",
"float-cmp",
"itertools 0.10.5",
"normalize-line-endings",
"predicates-core",
"regex",
]
[[package]]
name = "predicates-core"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae8177bee8e75d6846599c6b9ff679ed51e882816914eec639944d7c9aa11931"
[[package]]
name = "predicates-tree"
version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41b740d195ed3166cd147c8047ec98db0e22ec019eb8eeb76d343b795304fb13"
dependencies = [
"predicates-core",
"termtree",
]
[[package]]
name = "prettyplease"
version = "0.2.17"
@@ -3969,6 +4060,12 @@ dependencies = [
"redox_termios",
]
[[package]]
name = "termtree"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
[[package]]
name = "thiserror"
version = "1.0.63"


@@ -135,6 +135,29 @@ fn default_scheduler_schedule_timeout() -> Duration {
Duration::from_secs(10)
}
/// default_scheduler_cooldown_interval is the default cooldown duration for the scheduler.
/// This duration specifies the time period during which the scheduler will not be accessed
/// after it becomes unavailable due to an error.
#[inline]
fn default_scheduler_cooldown_interval() -> Duration {
Duration::from_secs(60)
}
/// default_scheduler_max_attempts is the default maximum number of attempts to connect to the scheduler.
/// If the connection to the scheduler fails, dfdaemon will retry up to this number of times.
#[inline]
fn default_scheduler_max_attempts() -> u32 {
3
}
/// default_scheduler_refresh_threshold is the default threshold for refreshing available scheduler nodes.
/// When the number of attempts to connect to the scheduler reaches this value,
/// dfdaemon will refresh the available scheduler nodes.
#[inline]
fn default_scheduler_refresh_threshold() -> u32 {
10
}
/// default_dynconfig_refresh_interval is the default interval to refresh dynamic configuration from manager.
#[inline]
fn default_dynconfig_refresh_interval() -> Duration {
@@ -492,6 +515,28 @@ pub struct Scheduler {
#[serde(default = "default_download_max_schedule_count")]
#[validate(range(min = 1))]
pub max_schedule_count: u32,
/// cooldown_interval is the scheduler's cooldown duration. When the scheduler becomes
/// unavailable due to an error, this field specifies the time period during which the
/// scheduler will not be accessed.
#[serde(
default = "default_scheduler_cooldown_interval",
with = "humantime_serde"
)]
pub cooldown_interval: Duration,
/// max_attempts is the maximum number of attempts to connect to the scheduler.
/// If the connection to the scheduler fails, dfdaemon will retry up to this number of times.
#[serde(default = "default_scheduler_max_attempts")]
#[validate(range(min = 1))]
pub max_attempts: u32,
/// refresh_threshold is the threshold for refreshing available scheduler nodes.
/// When the number of attempts to connect to the scheduler reaches this value,
/// dfdaemon will refresh the available scheduler nodes.
#[serde(default = "default_scheduler_refresh_threshold")]
#[validate(range(min = 1))]
pub refresh_threshold: u32,
}
/// Scheduler implements Default.
@@ -501,6 +546,9 @@ impl Default for Scheduler {
announce_interval: default_scheduler_announce_interval(),
schedule_timeout: default_scheduler_schedule_timeout(),
max_schedule_count: default_download_max_schedule_count(),
cooldown_interval: default_scheduler_cooldown_interval(),
max_attempts: default_scheduler_max_attempts(),
refresh_threshold: default_scheduler_refresh_threshold(),
}
}
}
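For illustration, a minimal sketch of overriding the new switching knobs in code, using only the fields and defaults introduced above (struct-update syntax; every other Scheduler field keeps its default):

use std::time::Duration;

// Hedged sketch: override the new scheduler switching settings, keep the rest at defaults.
fn example_scheduler_config() -> Scheduler {
    Scheduler {
        // Wait two minutes before retrying a scheduler that failed.
        cooldown_interval: Duration::from_secs(120),
        // Give up after five failed attempts to reach a scheduler.
        max_attempts: 5,
        // Refresh the available scheduler list after ten consecutive failures.
        refresh_threshold: 10,
        ..Default::default()
    }
}

In a serialized config, cooldown_interval is parsed through humantime_serde (per the attribute above), so human-readable durations such as "2m" should be accepted.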


@@ -98,6 +98,14 @@ pub enum DFError {
#[error{"scheduler client not found"}]
SchedulerClientNotFound,
/// ExceededMaxAttempts is the error when the maximum number of attempts is exceeded.
#[error("exceeded maximum number of attempts")]
ExceededMaxAttempts,
/// SchedulerNotServing is the error when the current scheduler is not serving.
#[error("current scheduler is not serving")]
SchedulerNotServing,
/// UnexpectedResponse is the error when the response is unexpected.
#[error{"unexpected response"}]
UnexpectedResponse,
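A hedged sketch of how a caller might branch on the two new variants; pick_scheduler_client is a hypothetical stand-in for whatever function surfaces these errors:

// Illustrative only: `pick_scheduler_client` is a hypothetical stand-in for the caller side.
match pick_scheduler_client().await {
    Ok(_client) => {
        // Use the scheduler gRPC client as usual.
    }
    Err(Error::ExceededMaxAttempts) => {
        // dfdaemon retried up to scheduler.max_attempts times and gave up.
    }
    Err(Error::SchedulerNotServing) => {
        // The health check reported a status other than SERVING.
    }
    Err(_err) => {
        // Any other error is handled as before this change.
    }
}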


@@ -91,6 +91,7 @@ futures-util = "0.3.30"
termion = "4.0.2"
tabled = "0.16.0"
path-absolutize = "3.1.1"
mockall = "0.11"
[dev-dependencies]
tempfile.workspace = true


@@ -110,6 +110,12 @@ impl Dynconfig {
}
}
/// get_config returns the config of the dfdaemon.
#[instrument(skip_all)]
pub async fn get_config(&self) -> Arc<Config> {
self.config.clone()
}
/// refresh refreshes the dynamic configuration of the dfdaemon.
#[instrument(skip_all)]
pub async fn refresh(&self) -> Result<()> {
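A minimal usage sketch of the new accessor, matching how the scheduler client reads these settings later in this commit (dynconfig is assumed to be an Arc<Dynconfig>):

// Hedged sketch: take a snapshot of the config and read the scheduler switching settings.
let config = dynconfig.get_config().await;
let cooldown_interval = config.scheduler.cooldown_interval;
let max_attempts = config.scheduler.max_attempts;
let refresh_threshold = config.scheduler.refresh_threshold;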


@@ -17,6 +17,7 @@
use crate::dynconfig::Dynconfig;
use dragonfly_api::common::v2::{Peer, PersistentCachePeer, PersistentCacheTask, Task};
use crate::grpc::health::HealthClient;
use dragonfly_api::manager::v2::Scheduler;
use dragonfly_api::scheduler::v2::{
scheduler_client::SchedulerClient as SchedulerGRPCClient, AnnounceHostRequest,
@@ -30,12 +31,15 @@ use dragonfly_api::scheduler::v2::{
use dragonfly_client_core::error::{ErrorType, ExternalError, OrErr};
use dragonfly_client_core::{Error, Result};
use hashring::HashRing;
use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::RwLock;
use tokio::task::JoinSet;
use tonic::transport::Channel;
use tonic_health::pb::health_check_response::ServingStatus;
use tracing::{error, info, instrument, Instrument};
/// VNode is the virtual node of the hashring.
@@ -67,6 +71,9 @@ pub struct SchedulerClient {
/// hashring is the hashring of the scheduler.
hashring: Arc<RwLock<HashRing<VNode>>>,
/// unavailable_scheduler_addrs is a map of unavailable scheduler addrs and the time they were marked as unavailable.
unavailable_scheduler_addrs: Arc<RwLock<HashMap<SocketAddr, Instant>>>,
}
/// SchedulerClient implements the grpc client of the scheduler.
@@ -79,6 +86,7 @@ impl SchedulerClient {
available_schedulers: Arc::new(RwLock::new(Vec::new())),
available_scheduler_addrs: Arc::new(RwLock::new(Vec::new())),
hashring: Arc::new(RwLock::new(HashRing::new())),
unavailable_scheduler_addrs: Arc::new(RwLock::new(HashMap::new())),
};
client.refresh_available_scheduler_addrs().await?;
@@ -456,15 +464,15 @@ impl SchedulerClient {
// Update scheduler addresses of the client.
self.update_available_scheduler_addrs().await?;
// Get the scheduler address from the hashring, trying the address derived from the task_id first.
let addrs = self.hashring.read().await;
let addr = *addrs
.get(&task_id[0..5].to_string())
.ok_or_else(|| Error::HashRing(task_id.to_string()))?;
drop(addrs);
info!("picked {:?}", addr);
info!("first picked {:?}", addr);
match Channel::from_shared(format!("http://{}", addr))
.map_err(|_| Error::InvalidURI(addr.to_string()))?
.buffer_size(super::BUFFER_SIZE)
.connect_timeout(super::CONNECT_TIMEOUT)
@@ -475,22 +483,130 @@
.connect()
.await
{
Ok(channel) => {
let mut unavailable_scheduler_addrs =
self.unavailable_scheduler_addrs.write().await;
unavailable_scheduler_addrs.remove(&addr.addr);
drop(unavailable_scheduler_addrs);
return Ok(SchedulerGRPCClient::new(channel)
.max_decoding_message_size(usize::MAX)
.max_encoding_message_size(usize::MAX));
}
Err(err) => {
error!("connect to {} failed: {}", addr.to_string(), err);
let mut unavailable_scheduler_addrs =
self.unavailable_scheduler_addrs.write().await;
unavailable_scheduler_addrs
.entry(addr.addr)
.or_insert_with(Instant::now);
drop(unavailable_scheduler_addrs);
}
}
// Read the scheduler configuration items.
let config = self.dynconfig.get_config().await;
let cooldown_interval = config.scheduler.cooldown_interval;
let max_attempts = config.scheduler.max_attempts;
let refresh_threshold = config.scheduler.refresh_threshold;
let mut attempts = 1;
let mut refresh_count = 1;
// Traverse the available schedulers in the hashring.
let hashring = self.hashring.read().await;
for vnode in hashring.clone().into_iter() {
let scheduler_addr = vnode.addr;
let unavailable_scheduler_addrs = self.unavailable_scheduler_addrs.read().await;
if let Some(&instant) = unavailable_scheduler_addrs.get(&scheduler_addr) {
if instant.elapsed() < cooldown_interval {
continue;
}
}
drop(unavailable_scheduler_addrs);
info!("try to picked {:?}", scheduler_addr);
match self.check_scheduler(&scheduler_addr).await {
Ok(channel) => {
let mut unavailable_scheduler_addrs =
self.unavailable_scheduler_addrs.write().await;
unavailable_scheduler_addrs.remove(&scheduler_addr);
drop(unavailable_scheduler_addrs);
info!("finally picked {:?}", scheduler_addr);
return Ok(SchedulerGRPCClient::new(channel)
.max_decoding_message_size(usize::MAX)
.max_encoding_message_size(usize::MAX));
}
Err(err) => {
error!("scheduler {} is not available: {}", scheduler_addr, err);
let mut unavailable_scheduler_addrs =
self.unavailable_scheduler_addrs.write().await;
unavailable_scheduler_addrs
.entry(scheduler_addr)
.or_insert_with(Instant::now);
drop(unavailable_scheduler_addrs);
attempts += 1;
refresh_count += 1;
if refresh_count >= refresh_threshold {
if let Err(err) = self.refresh_available_scheduler_addrs().await {
error!("failed to refresh scheduler client: {}", err);
};
refresh_count = 0;
}
if attempts >= max_attempts {
return Err(Error::ExceededMaxAttempts);
}
}
}
}
Err(Error::AvailableSchedulersNotFound)
}
/// check_scheduler checks the health of the scheduler and, if it is serving, returns a connected channel.
async fn check_scheduler(&self, scheduler_addr: &SocketAddr) -> Result<Channel> {
let addr = format!("http://{}:{}", scheduler_addr.ip(), scheduler_addr.port());
let health_client = match HealthClient::new(&addr).await {
Ok(client) => client,
Err(err) => {
error!(
"create health client for scheduler {}:{} failed: {}",
scheduler_addr.ip(),
scheduler_addr.port(),
err
);
return Err(ExternalError::new(ErrorType::ConnectError)
.with_cause(Box::new(err))
.into());
}
};
match health_client.check().await {
Ok(resp) => {
if resp.status == ServingStatus::Serving as i32 {
return Channel::from_shared(format!("http://{}", scheduler_addr))
.map_err(|_| Error::InvalidURI(scheduler_addr.to_string()))?
.connect_timeout(super::CONNECT_TIMEOUT)
.connect()
.await
.map_err(|err| {
error!("connect to {} failed: {}", scheduler_addr.to_string(), err);
ExternalError::new(ErrorType::ConnectError)
.with_cause(Box::new(err))
.into()
});
}
Err(Error::SchedulerNotServing)
}
Err(err) => {
error!("check scheduler health failed: {}", err);
Err(ExternalError::new(ErrorType::ConnectError)
.with_cause(Box::new(err))
.into())
}
}
}
/// update_available_scheduler_addrs updates the addresses of available schedulers.
@@ -568,6 +684,11 @@ impl SchedulerClient {
*hashring = new_hashring;
drop(hashring);
// Clean the unavailable scheduler addrs.
let mut unavailable_scheduler_addrs = self.unavailable_scheduler_addrs.write().await;
unavailable_scheduler_addrs.clear();
drop(unavailable_scheduler_addrs);
let available_scheduler_addrs = self.available_scheduler_addrs.read().await;
info!(
"refresh available scheduler addresses: {:?}",
@@ -597,3 +718,456 @@
request
}
}
// test
#[cfg(test)]
mod tests {
use super::*;
use mockall::mock;
use mockall::predicate;
use mockall::predicate::always;
use tokio::time::Duration;
use tonic::transport::Channel;
// Only mock the methods used in the client function.
mock! {
pub SchedulerClient {
async fn check_scheduler(&self, scheduler_addr: &SocketAddr) -> Result<Channel>;
async fn update_available_scheduler_addrs(&self) -> Result<()>;
async fn refresh_available_scheduler_addrs(&self) -> Result<()>;
}
}
mock! {
pub Dynconfig {
pub async fn get_config(&self) -> Arc<SchedulerConfig>;
}
}
#[derive(Clone)]
pub struct SchedulerConfig {
pub cooldown_interval: Duration,
pub max_attempts: u32,
pub refresh_threshold: u32,
}
// Reuse the client logic of SchedulerClient, retaining only the struct fields necessary for testing.
#[derive(Clone)]
pub struct SchedulerClient {
// available_scheduler_addrs is the addresses of available schedulers.
available_scheduler_addrs: Arc<RwLock<Vec<SocketAddr>>>,
// hashring is the hashring of the scheduler.
hashring: Arc<RwLock<HashRing<VNode>>>,
// unavailable_scheduler_addrs is a map of unavailable scheduler addrs and the time they were marked as unavailable.
unavailable_scheduler_addrs: Arc<RwLock<HashMap<SocketAddr, Instant>>>,
}
impl SchedulerClient {
// client gets the grpc client of the scheduler.
async fn client(
&self,
task_id: &str,
_peer_id: Option<&str>,
mock_scheduler_client: &MockSchedulerClient,
mock_dynconfig: &MockDynconfig,
) -> Result<SchedulerGRPCClient<Channel>> {
// Update scheduler addresses of the client.
mock_scheduler_client
.update_available_scheduler_addrs()
.await?;
// First try to get the address from the task_id.
let addrs = self.hashring.read().await;
let addr = *addrs
.get(&task_id[0..5].to_string())
.ok_or_else(|| Error::HashRing(task_id.to_string()))?;
drop(addrs);
match Channel::from_shared(format!("http://{}", addr))
.map_err(|_| Error::InvalidURI(addr.to_string()))?
.connect_timeout(super::super::CONNECT_TIMEOUT)
.connect()
.await
{
Ok(channel) => {
let mut unavailable_scheduler_addrs =
self.unavailable_scheduler_addrs.write().await;
unavailable_scheduler_addrs.remove(&addr.addr);
drop(unavailable_scheduler_addrs);
return Ok(SchedulerGRPCClient::new(channel)
.max_decoding_message_size(usize::MAX)
.max_encoding_message_size(usize::MAX));
}
Err(err) => {
error!("connect to {} failed: {}", addr.to_string(), err);
let mut unavailable_scheduler_addrs =
self.unavailable_scheduler_addrs.write().await;
unavailable_scheduler_addrs
.entry(addr.addr)
.or_insert_with(Instant::now);
drop(unavailable_scheduler_addrs);
}
}
// Read the scheduler configuration items.
let config = mock_dynconfig.get_config().await;
let cooldown_interval = config.cooldown_interval;
let max_attempts = config.max_attempts;
let refresh_threshold = config.refresh_threshold;
let mut attempts = 1;
let mut refresh_count = 1;
// Traverse the available schedulers in the hashring.
let hashring = self.hashring.read().await;
for vnode in hashring.clone().into_iter() {
let scheduler_addr = vnode.addr;
let unavailable_scheduler_addrs = self.unavailable_scheduler_addrs.read().await;
if let Some(&instant) = unavailable_scheduler_addrs.get(&scheduler_addr) {
if instant.elapsed() < cooldown_interval {
continue;
}
}
drop(unavailable_scheduler_addrs);
match mock_scheduler_client.check_scheduler(&scheduler_addr).await {
Ok(channel) => {
let mut unavailable_scheduler_addrs =
self.unavailable_scheduler_addrs.write().await;
unavailable_scheduler_addrs.remove(&scheduler_addr);
drop(unavailable_scheduler_addrs);
return Ok(SchedulerGRPCClient::new(channel)
.max_decoding_message_size(usize::MAX)
.max_encoding_message_size(usize::MAX));
}
Err(err) => {
error!("scheduler {} is not available: {}", scheduler_addr, err);
let mut unavailable_scheduler_addrs =
self.unavailable_scheduler_addrs.write().await;
unavailable_scheduler_addrs
.entry(scheduler_addr)
.or_insert_with(Instant::now);
drop(unavailable_scheduler_addrs);
attempts += 1;
refresh_count += 1;
if refresh_count >= refresh_threshold {
if let Err(err) = mock_scheduler_client
.refresh_available_scheduler_addrs()
.await
{
error!("failed to refresh scheduler client: {}", err);
};
refresh_count = 0;
}
if attempts >= max_attempts {
return Err(Error::ExceededMaxAttempts);
}
}
}
}
Err(Error::AvailableSchedulersNotFound)
}
}
async fn setup() -> (SchedulerClient, MockSchedulerClient, MockDynconfig) {
// Mock the SchedulerClient::check_scheduler().
let mock_scheduler_client = MockSchedulerClient::new();
// Mock the Dynconfig::get_config().
let mock_dynconfig = MockDynconfig::new();
let available_scheduler_addrs = vec![
"192.168.1.1:8080".parse().unwrap(),
"192.168.2.2:8080".parse().unwrap(),
"192.168.2.3:8080".parse().unwrap(),
"192.168.2.4:8080".parse().unwrap(),
"192.168.2.5:8080".parse().unwrap(),
"192.168.2.6:8080".parse().unwrap(),
"192.168.2.7:8080".parse().unwrap(),
"192.168.2.8:8080".parse().unwrap(),
"192.168.2.9:8080".parse().unwrap(),
"192.168.2.10:8080".parse().unwrap(),
"127.0.0.1:8080".parse().unwrap(),
];
let mut ring: HashRing<VNode> = HashRing::new();
let nodes: Vec<VNode> = available_scheduler_addrs
.iter()
.map(|&addr| VNode { addr })
.collect();
for node in nodes {
ring.add(node);
}
let unavailable_scheduler_addrs: HashMap<SocketAddr, Instant> = HashMap::new();
let scheduler_client = SchedulerClient {
available_scheduler_addrs: Arc::new(RwLock::new(available_scheduler_addrs)),
hashring: Arc::new(RwLock::new(ring)),
unavailable_scheduler_addrs: Arc::new(RwLock::new(unavailable_scheduler_addrs)),
};
(scheduler_client, mock_scheduler_client, mock_dynconfig)
}
#[tokio::test]
async fn test_client_exceeds_max_attempts() {
let (scheduler_client, mut mock_scheduler_client, mut mock_dynconfig) = setup().await;
mock_scheduler_client
.expect_check_scheduler()
.with(predicate::eq(
"127.0.0.1:8080".parse::<SocketAddr>().unwrap(),
))
.returning(|_| {
let channel = Channel::from_static("").connect_lazy();
Ok(channel)
});
mock_scheduler_client
.expect_check_scheduler()
.with(predicate::function(|addr: &SocketAddr| {
addr.ip().to_string().starts_with("192.168")
}))
.returning(|_| Err(Error::SchedulerNotServing));
mock_dynconfig.expect_get_config().returning(|| {
Arc::new(SchedulerConfig {
cooldown_interval: Duration::from_secs(60),
max_attempts: 3,
refresh_threshold: 10,
})
});
// Mock the SchedulerClient::update_available_scheduler_addrs().
mock_scheduler_client
.expect_update_available_scheduler_addrs()
.returning(|| Ok(()));
// Mock the SchedulerClient::refresh_available_scheduler_addrs().
mock_scheduler_client
.expect_refresh_available_scheduler_addrs()
.times(0)
.returning(|| Ok(()));
let task_id = "task12345";
let peer_id = Some("peer12345");
match scheduler_client
.client(task_id, peer_id, &mock_scheduler_client, &mock_dynconfig)
.await
{
Ok(_) => {
panic!("Client should not connect successfully.");
}
Err(e) => match e {
Error::ExceededMaxAttempts => {}
_ => {
panic!("Unexpected error type.");
}
},
}
}
#[tokio::test]
async fn test_available_schedulers_not_found() {
let (scheduler_client, mut mock_scheduler_client, mut mock_dynconfig) = setup().await;
let mut call_count = 0;
mock_scheduler_client
.expect_check_scheduler()
.with(always())
.returning_st(move |_| {
call_count += 1;
if call_count <= 12 {
Err(Error::SchedulerNotServing)
} else {
Ok(Channel::from_static("http://localhost:8080").connect_lazy())
}
});
mock_dynconfig.expect_get_config().returning(|| {
Arc::new(SchedulerConfig {
cooldown_interval: Duration::from_secs(60),
max_attempts: 100,
refresh_threshold: 10,
})
});
// Mock the SchedulerClient::update_available_scheduler_addrs().
mock_scheduler_client
.expect_update_available_scheduler_addrs()
.returning(|| Ok(()));
// Mock the SchedulerClient::refresh_available_scheduler_addrs().
mock_scheduler_client
.expect_refresh_available_scheduler_addrs()
.returning(|| Ok(()));
let task_id = "task12345";
let peer_id = Some("peer12345");
// Step 1: Simulate a situation where all schedulers are unavailable.
match scheduler_client
.client(task_id, peer_id, &mock_scheduler_client, &mock_dynconfig)
.await
{
Ok(_) => {
panic!("Client should not connect successfully.");
}
Err(e) => match e {
Error::AvailableSchedulersNotFound => {}
_ => {
panic!("Unexpected error type.");
}
},
}
// Step 2: Verify that unavailable_scheduler_addrs has recorded the failed addresses.
let available_addrs = scheduler_client.available_scheduler_addrs.read().await;
let unavailable_addrs = scheduler_client.unavailable_scheduler_addrs.read().await;
for addr in available_addrs.iter() {
assert!(
unavailable_addrs.contains_key(addr),
"Address {:?} is not in unavailable_scheduler_addrs",
addr
);
}
drop(available_addrs);
drop(unavailable_addrs);
// Step 3: Simulate an update operation to clear unavailable_scheduler_addrs.
let mut unavailable_addrs = scheduler_client.unavailable_scheduler_addrs.write().await;
unavailable_addrs.clear();
drop(unavailable_addrs);
// Step 4: Verify that the scheduler is available.
match scheduler_client
.client(task_id, peer_id, &mock_scheduler_client, &mock_dynconfig)
.await
{
Ok(_) => {}
Err(_e) => {
panic!("Client should connect successfully.")
}
}
}
#[tokio::test]
async fn test_available_schedulers_concurrent() {
let (scheduler_client, mut mock_scheduler_client, mut mock_dynconfig) = setup().await;
mock_dynconfig.expect_get_config().returning(|| {
Arc::new(SchedulerConfig {
cooldown_interval: Duration::from_secs(60),
max_attempts: 100,
refresh_threshold: 10,
})
});
mock_scheduler_client
.expect_update_available_scheduler_addrs()
.returning(|| Ok(()));
mock_scheduler_client
.expect_refresh_available_scheduler_addrs()
.returning(|| Ok(()));
let mut call_count = 0;
mock_scheduler_client
.expect_check_scheduler()
.with(always())
.times(1100)
.returning_st(move |_| {
call_count += 1;
if call_count <= 1000 {
Err(Error::SchedulerNotServing)
} else {
Ok(Channel::from_static("http://localhost:8080").connect_lazy())
}
});
let mock_dynconfig = Arc::new(mock_dynconfig);
let scheduler_client = Arc::new(scheduler_client);
let mock_scheduler_client = Arc::new(mock_scheduler_client);
let task_id = "12345";
let peer_id = Some("peer12345");
let handles: Vec<_> = (0..100)
.map(|_| {
let scheduler_client = Arc::clone(&scheduler_client);
let mock_scheduler_client = Arc::clone(&mock_scheduler_client);
let mock_dynconfig = Arc::clone(&mock_dynconfig);
let task_id = task_id.to_string();
let peer_id = peer_id.map(|id| id.to_string());
tokio::spawn(async move {
// Step 1: Simulate a situation where all schedulers are unavailable.
match scheduler_client
.client(
&task_id,
peer_id.as_deref(),
&mock_scheduler_client,
&mock_dynconfig,
)
.await
{
Ok(_) => {
panic!("Client should not connect successfully.");
}
Err(e) => match e {
Error::AvailableSchedulersNotFound => {}
_ => {
panic!("Unexpected error type.");
}
},
}
// Step 2: Verify that unavailable_scheduler_addrs has recorded the failed addresses.
let available_addrs = scheduler_client.available_scheduler_addrs.read().await;
let unavailable_addrs =
scheduler_client.unavailable_scheduler_addrs.read().await;
for addr in available_addrs.iter() {
assert!(
unavailable_addrs.contains_key(addr),
"Address {:?} is not in unavailable_scheduler_addrs.",
addr
);
}
drop(available_addrs);
drop(unavailable_addrs);
// Step 3: Simulate an update operation to clear unavailable_scheduler_addrs.
let mut unavailable_addrs =
scheduler_client.unavailable_scheduler_addrs.write().await;
unavailable_addrs.clear();
drop(unavailable_addrs);
// Step 4: Verify that all tasks can connect to the scheduler.
match scheduler_client
.client(
&task_id,
peer_id.as_deref(),
&mock_scheduler_client,
&mock_dynconfig,
)
.await
{
Ok(_) => {}
Err(_e) => {
panic!("Client should connect successfully.")
}
}
})
})
.collect();
for handle in handles {
if let Err(_e) = handle.await {
panic!("One of the tasks panicked.");
}
}
}
}
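A hedged way to run only these tests with cargo's substring filter (the package name below is an assumption based on the dependency hunks in this commit):

cargo test -p dragonfly-client scheduler::tests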