feat: refresh addresses of available schedulers when call grpc api (#197)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2024-01-04 22:56:45 +08:00 committed by GitHub
parent a6f9896576
commit a68e50b7ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 70 additions and 22 deletions

2
Cargo.lock generated
View File

@ -545,7 +545,7 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-client" name = "dragonfly-client"
version = "0.1.9" version = "0.1.11"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-stream", "async-stream",

View File

@ -1,6 +1,6 @@
[package] [package]
name = "dragonfly-client" name = "dragonfly-client"
version = "0.1.10" version = "0.1.11"
authors = ["The Dragonfly Developers"] authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/" homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git" repository = "https://github.com/dragonflyoss/client.git"

View File

@ -18,6 +18,7 @@
use crate::dynconfig::Dynconfig; use crate::dynconfig::Dynconfig;
use crate::{Error, Result}; use crate::{Error, Result};
use dragonfly_api::common::v2::{Peer, Task}; use dragonfly_api::common::v2::{Peer, Task};
use dragonfly_api::manager::v2::Scheduler;
use dragonfly_api::scheduler::v2::{ use dragonfly_api::scheduler::v2::{
scheduler_client::SchedulerClient as SchedulerGRPCClient, AnnounceHostRequest, scheduler_client::SchedulerClient as SchedulerGRPCClient, AnnounceHostRequest,
AnnouncePeerRequest, AnnouncePeerResponse, ExchangePeerRequest, ExchangePeerResponse, AnnouncePeerRequest, AnnouncePeerResponse, ExchangePeerRequest, ExchangePeerResponse,
@ -49,8 +50,11 @@ pub struct SchedulerClient {
// dynconfig is the dynamic configuration of the dfdaemon. // dynconfig is the dynamic configuration of the dfdaemon.
dynconfig: Arc<Dynconfig>, dynconfig: Arc<Dynconfig>,
// available_schedulers is the endpoints of available schedulers. // available_schedulers is the available schedulers.
available_schedulers: Arc<RwLock<Vec<SocketAddr>>>, available_schedulers: Arc<RwLock<Vec<Scheduler>>>,
// available_scheduler_addrs is the addresses of available schedulers.
available_scheduler_addrs: Arc<RwLock<Vec<SocketAddr>>>,
// hashring is the hashring of the scheduler. // hashring is the hashring of the scheduler.
hashring: Arc<RwLock<HashRing<VNode>>>, hashring: Arc<RwLock<HashRing<VNode>>>,
@ -63,10 +67,11 @@ impl SchedulerClient {
let client = Self { let client = Self {
dynconfig, dynconfig,
available_schedulers: Arc::new(RwLock::new(Vec::new())), available_schedulers: Arc::new(RwLock::new(Vec::new())),
available_scheduler_addrs: Arc::new(RwLock::new(Vec::new())),
hashring: Arc::new(RwLock::new(HashRing::new())), hashring: Arc::new(RwLock::new(HashRing::new())),
}; };
client.refresh_scheduler_client().await?; client.refresh_available_scheduler_addrs().await?;
Ok(client) Ok(client)
} }
@ -140,8 +145,8 @@ impl SchedulerClient {
#[instrument(skip(self))] #[instrument(skip(self))]
pub async fn init_announce_host(&self, request: AnnounceHostRequest) -> Result<()> { pub async fn init_announce_host(&self, request: AnnounceHostRequest) -> Result<()> {
let mut join_set = JoinSet::new(); let mut join_set = JoinSet::new();
let available_schedulers = self.available_schedulers.read().await; let available_scheduler_addrs = self.available_scheduler_addrs.read().await;
for available_scheduler in available_schedulers.iter() { for available_scheduler_addr in available_scheduler_addrs.iter() {
let request = Self::make_request(request.clone()); let request = Self::make_request(request.clone());
async fn announce_host( async fn announce_host(
addr: SocketAddr, addr: SocketAddr,
@ -159,7 +164,7 @@ impl SchedulerClient {
Ok(()) Ok(())
} }
join_set.spawn(announce_host(*available_scheduler, request)); join_set.spawn(announce_host(*available_scheduler_addr, request));
} }
while let Some(message) = join_set.join_next().await { while let Some(message) = join_set.join_next().await {
@ -175,9 +180,13 @@ impl SchedulerClient {
// announce_host announces the host to the scheduler. // announce_host announces the host to the scheduler.
#[instrument(skip(self))] #[instrument(skip(self))]
pub async fn announce_host(&self, request: AnnounceHostRequest) -> Result<()> { pub async fn announce_host(&self, request: AnnounceHostRequest) -> Result<()> {
// Update scheduler addresses of the client.
self.update_available_scheduler_addrs().await?;
// Announce the host to the scheduler.
let mut join_set = JoinSet::new(); let mut join_set = JoinSet::new();
let available_schedulers = self.available_schedulers.read().await; let available_scheduler_addrs = self.available_scheduler_addrs.read().await;
for available_scheduler in available_schedulers.iter() { for available_scheduler_addr in available_scheduler_addrs.iter() {
let request = Self::make_request(request.clone()); let request = Self::make_request(request.clone());
async fn announce_host( async fn announce_host(
addr: SocketAddr, addr: SocketAddr,
@ -195,7 +204,7 @@ impl SchedulerClient {
Ok(()) Ok(())
} }
join_set.spawn(announce_host(*available_scheduler, request)); join_set.spawn(announce_host(*available_scheduler_addr, request));
} }
while let Some(message) = join_set.join_next().await { while let Some(message) = join_set.join_next().await {
@ -210,9 +219,13 @@ impl SchedulerClient {
// leave_host tells the scheduler that the host is leaving. // leave_host tells the scheduler that the host is leaving.
#[instrument(skip(self))] #[instrument(skip(self))]
pub async fn leave_host(&self, request: LeaveHostRequest) -> Result<()> { pub async fn leave_host(&self, request: LeaveHostRequest) -> Result<()> {
// Update scheduler addresses of the client.
self.update_available_scheduler_addrs().await?;
// Leave the host from the scheduler.
let mut join_set = JoinSet::new(); let mut join_set = JoinSet::new();
let available_schedulers = self.available_schedulers.read().await; let available_scheduler_addrs = self.available_scheduler_addrs.read().await;
for available_scheduler in available_schedulers.iter() { for available_scheduler_addr in available_scheduler_addrs.iter() {
let request = Self::make_request(request.clone()); let request = Self::make_request(request.clone());
async fn leave_host( async fn leave_host(
addr: SocketAddr, addr: SocketAddr,
@ -230,7 +243,7 @@ impl SchedulerClient {
Ok(()) Ok(())
} }
join_set.spawn(leave_host(*available_scheduler, request)); join_set.spawn(leave_host(*available_scheduler_addr, request));
} }
while let Some(message) = join_set.join_next().await { while let Some(message) = join_set.join_next().await {
@ -245,6 +258,10 @@ impl SchedulerClient {
// client gets the grpc client of the scheduler. // client gets the grpc client of the scheduler.
#[instrument(skip(self))] #[instrument(skip(self))]
async fn client(&self, key: String) -> Result<SchedulerGRPCClient<Channel>> { async fn client(&self, key: String) -> Result<SchedulerGRPCClient<Channel>> {
// Update scheduler addresses of the client.
self.update_available_scheduler_addrs().await?;
// Get the scheduler address from the hashring.
let addr = self.hashring.read().await; let addr = self.hashring.read().await;
let addr = addr.get(&key).ok_or_else(|| Error::HashRing(key.clone()))?; let addr = addr.get(&key).ok_or_else(|| Error::HashRing(key.clone()))?;
info!("{} picked {:?}", key, addr); info!("{} picked {:?}", key, addr);
@ -258,7 +275,7 @@ impl SchedulerClient {
Ok(channel) => channel, Ok(channel) => channel,
Err(err) => { Err(err) => {
error!("failed to connect to {:?}: {}", addr, err); error!("failed to connect to {:?}: {}", addr, err);
if let Err(err) = self.refresh_scheduler_client().await { if let Err(err) = self.refresh_available_scheduler_addrs().await {
error!("failed to refresh scheduler client: {}", err); error!("failed to refresh scheduler client: {}", err);
}; };
@ -269,12 +286,9 @@ impl SchedulerClient {
Ok(SchedulerGRPCClient::new(channel)) Ok(SchedulerGRPCClient::new(channel))
} }
// get_endpoints gets the endpoints of available schedulers. // update_available_scheduler_addrs updates the addresses of available schedulers.
#[instrument(skip(self))] #[instrument(skip(self))]
async fn refresh_scheduler_client(&self) -> Result<()> { async fn update_available_scheduler_addrs(&self) -> Result<()> {
// Refresh the dynamic configuration.
self.dynconfig.refresh().await?;
// Get the endpoints of available schedulers. // Get the endpoints of available schedulers.
let data = self.dynconfig.data.read().await; let data = self.dynconfig.data.read().await;
@ -283,9 +297,27 @@ impl SchedulerClient {
return Err(Error::AvailableSchedulersNotFound()); return Err(Error::AvailableSchedulersNotFound());
} }
// Get the available schedulers.
let available_schedulers = self.available_schedulers.read().await;
// Check if the available schedulers is not changed.
if data.available_schedulers.len() == available_schedulers.len()
&& data
.available_schedulers
.iter()
.all(|available_scheduler| available_schedulers.contains(available_scheduler))
{
info!("available schedulers is not changed");
return Ok(());
}
drop(available_schedulers);
// Get the available schedulers. // Get the available schedulers.
let mut available_schedulers = self.available_schedulers.write().await; let mut available_schedulers = self.available_schedulers.write().await;
// Get the addresses of available schedulers.
let mut available_scheduler_addrs = self.available_scheduler_addrs.write().await;
// Refresh the hashring. // Refresh the hashring.
let mut new_hashring = HashRing::new(); let mut new_hashring = HashRing::new();
for available_scheduler in data.available_schedulers.iter() { for available_scheduler in data.available_schedulers.iter() {
@ -298,7 +330,10 @@ impl SchedulerClient {
}; };
// Add the scheduler to the available schedulers. // Add the scheduler to the available schedulers.
available_schedulers.push(SocketAddr::new(ip, available_scheduler.port as u16)); available_schedulers.push(available_scheduler.clone());
// Add the scheduler address to the addresses of available schedulers.
available_scheduler_addrs.push(SocketAddr::new(ip, available_scheduler.port as u16));
// Add the scheduler to the hashring. // Add the scheduler to the hashring.
new_hashring.add(VNode { new_hashring.add(VNode {
@ -309,10 +344,23 @@ impl SchedulerClient {
// Update the hashring. // Update the hashring.
let mut hashring = self.hashring.write().await; let mut hashring = self.hashring.write().await;
*hashring = new_hashring; *hashring = new_hashring;
info!("refresh available schedulers: {:?}", available_schedulers); info!(
"refresh available scheduler addresses: {:?}",
available_scheduler_addrs
);
Ok(()) Ok(())
} }
// refresh_available_scheduler_addrs refreshes addresses of available schedulers.
#[instrument(skip(self))]
async fn refresh_available_scheduler_addrs(&self) -> Result<()> {
// Refresh the dynamic configuration.
self.dynconfig.refresh().await?;
// Update scheduler addresses of the client.
self.update_available_scheduler_addrs().await
}
// make_request creates a new request with timeout. // make_request creates a new request with timeout.
fn make_request<T>(request: T) -> tonic::Request<T> { fn make_request<T>(request: T) -> tonic::Request<T> {
let mut request = tonic::Request::new(request); let mut request = tonic::Request::new(request);