refactor: reduce the lock scope (#85)

This commit is contained in:
AngrySean 2023-09-11 09:15:35 +08:00 committed by GitHub
parent 60111f8851
commit 2a2dd73ad7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 88 additions and 42 deletions

View File

@ -110,42 +110,30 @@ impl Dynconfig {
// refresh refreshes the dynamic configuration of the dfdaemon. // refresh refreshes the dynamic configuration of the dfdaemon.
pub async fn refresh(&self) -> Result<()> { pub async fn refresh(&self) -> Result<()> {
// refresh the schedulers.
let schedulers = self.list_schedulers().await?;
// refresh the object storage configuration.
let object_storage = self.get_object_storage().await.ok();
// Get the data with read lock.
let read_guard = self.data.read().await;
let available_schedulers = self
.get_available_schedulers(&read_guard.schedulers.schedulers)
.await?;
drop(read_guard);
if available_schedulers.is_empty() {
return Err(Error::AvailableSchedulersNotFound());
}
// If the available scheduler cluster id is not set, set it to the first available scheduler.
let Some(available_scheduler) = available_schedulers.first() else {
return Err(Error::AvailableSchedulersNotFound());
};
// Get the data with write lock. // Get the data with write lock.
let mut data = self.data.write().await; let mut data = self.data.write().await;
data.schedulers = schedulers;
// refresh the schedulers. data.object_storage = object_storage;
data.schedulers = self.list_schedulers().await?;
// refresh the object storage configuration.
match self.get_object_storage().await {
Ok(object_storage) => {
data.object_storage = Some(object_storage);
}
Err(err) => {
info!("get object storage failed: {}", err);
data.object_storage = None;
}
}
// get the available schedulers.
let available_schedulers = self
.get_available_schedulers(&data.schedulers.schedulers)
.await?;
// If the available schedulers is empty, return error.
if !available_schedulers.is_empty() {
data.available_schedulers = available_schedulers;
} else {
return Err(Error::AvailableSchedulersNotFound());
}
// If the available scheduler cluster id is not set, set it to the first available scheduler.
if let Some(available_scheduler) = data.available_schedulers.first() {
data.available_scheduler_cluster_id = Some(available_scheduler.scheduler_cluster_id); data.available_scheduler_cluster_id = Some(available_scheduler.scheduler_cluster_id);
} else {
return Err(Error::AvailableSchedulersNotFound());
}
Ok(()) Ok(())
} }
@ -191,10 +179,7 @@ impl Dynconfig {
} }
// get_available_schedulers gets the available schedulers. // get_available_schedulers gets the available schedulers.
async fn get_available_schedulers( async fn get_available_schedulers(&self, schedulers: &[Scheduler]) -> Result<Vec<Scheduler>> {
&self,
schedulers: &Vec<Scheduler>,
) -> Result<Vec<Scheduler>> {
let mut available_schedulers: Vec<Scheduler> = Vec::new(); let mut available_schedulers: Vec<Scheduler> = Vec::new();
let mut available_scheduler_cluster_id: Option<u64> = None; let mut available_scheduler_cluster_id: Option<u64> = None;
for scheduler in schedulers { for scheduler in schedulers {
@ -208,7 +193,7 @@ impl Dynconfig {
// Check the health of the scheduler. // Check the health of the scheduler.
let health_client = let health_client =
HealthClient::new(format!("http://{}:{}", scheduler.ip, scheduler.port)).await?; HealthClient::new(&format!("http://{}:{}", scheduler.ip, scheduler.port)).await?;
match health_client match health_client
.check(HealthCheckRequest { .check(HealthCheckRequest {

View File

@ -14,7 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */
use crate::Result; use crate::{Error, Result};
use tonic::transport::Channel; use tonic::transport::Channel;
use tonic_health::pb::{ use tonic_health::pb::{
health_client::HealthClient as HealthGRPCClient, HealthCheckRequest, HealthCheckResponse, health_client::HealthClient as HealthGRPCClient, HealthCheckRequest, HealthCheckResponse,
@ -30,8 +30,9 @@ pub struct HealthClient {
// HealthClient implements the grpc client of the health. // HealthClient implements the grpc client of the health.
impl HealthClient { impl HealthClient {
// new creates a new HealthClient. // new creates a new HealthClient.
pub async fn new(addr: String) -> Result<Self> { pub async fn new(addr: &str) -> Result<Self> {
let channel = Channel::from_static(Box::leak(addr.into_boxed_str())) let channel = Channel::from_shared(addr.to_string())
.map_err(|_| Error::InvalidURI(addr.into()))?
.connect() .connect()
.await?; .await?;
let client = HealthGRPCClient::new(channel); let client = HealthGRPCClient::new(channel);
@ -47,3 +48,63 @@ impl HealthClient {
Ok(response.into_inner()) Ok(response.into_inner())
} }
} }
#[cfg(test)]
mod test {
use std::{net::SocketAddr, time::Duration};
use tokio::task::JoinHandle;
use tonic::transport::Server;
use tonic_health::{pb::HealthCheckRequest, ServingStatus};
use crate::grpc::health::HealthClient;
struct MockHealthServer {
addr: String,
handle: JoinHandle<()>,
}
impl Drop for MockHealthServer {
fn drop(&mut self) {
self.handle.abort()
}
}
async fn spawn_mock_app() -> MockHealthServer {
let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
health_reporter
.set_service_status("test", ServingStatus::Serving)
.await;
let addr = SocketAddr::from(([127, 0, 0, 1], 8081));
let handle = tokio::spawn(async move {
Server::builder()
.add_service(health_service)
.serve(addr)
.await
.expect("failed to start server");
});
tokio::time::sleep(Duration::from_secs(1)).await;
return MockHealthServer {
handle: handle,
addr: addr.to_string(),
};
}
#[tokio::test]
async fn test_get_avaliable_scheduler() {
let app = spawn_mock_app().await;
let health_client = HealthClient::new(&format!("http://{}", app.addr))
.await
.expect("failed to create health client");
let Ok(res) = health_client
.check(HealthCheckRequest {
service: String::new(),
})
.await
else {
panic!("failed to check health");
};
assert_eq!(res.status, ServingStatus::Serving as i32);
}
}