diff --git a/Cargo.lock b/Cargo.lock index 499c5f4b..ad62dec6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -950,9 +950,9 @@ dependencies = [ [[package]] name = "dragonfly-api" -version = "2.0.123" +version = "2.0.124" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86b83bf914712fc12df2129ed54ed889dbdddf0714af3013b68e85f9f7b844f7" +checksum = "1ab958fa6959f459c170f87cccae5adae0010fc3be09b9f98a433b898a7f4158" dependencies = [ "prost 0.11.9", "prost-types 0.12.6", diff --git a/Cargo.toml b/Cargo.toml index 3acce7bb..eb96a5b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,7 @@ dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.8 dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.81" } dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.81" } thiserror = "1.0" -dragonfly-api = "2.0.123" +dragonfly-api = "2.0.124" reqwest = { version = "0.12.4", features = ["stream", "native-tls", "default-tls", "rustls-tls"] } rcgen = { version = "0.12.1", features = ["x509-parser"] } hyper = { version = "1.2", features = ["full"] } diff --git a/dragonfly-client/src/grpc/scheduler.rs b/dragonfly-client/src/grpc/scheduler.rs index 952f00c2..5b418b2e 100644 --- a/dragonfly-client/src/grpc/scheduler.rs +++ b/dragonfly-client/src/grpc/scheduler.rs @@ -16,12 +16,14 @@ // use crate::dynconfig::Dynconfig; use crate::dynconfig::Dynconfig; -use dragonfly_api::common::v2::{Peer, Task}; +use dragonfly_api::common::v2::{CachePeer, CacheTask, Peer, Task}; use dragonfly_api::manager::v2::Scheduler; use dragonfly_api::scheduler::v2::{ - scheduler_client::SchedulerClient as SchedulerGRPCClient, AnnounceHostRequest, - AnnouncePeerRequest, AnnouncePeerResponse, DeleteHostRequest, DeletePeerRequest, - DeleteTaskRequest, StatPeerRequest, StatTaskRequest, + scheduler_client::SchedulerClient as SchedulerGRPCClient, AnnounceCachePeerRequest, + AnnounceCachePeerResponse, AnnounceHostRequest, AnnouncePeerRequest, AnnouncePeerResponse, + DeleteCachePeerRequest, DeleteCacheTaskRequest, DeleteHostRequest, DeletePeerRequest, + DeleteTaskRequest, StatCachePeerRequest, StatCacheTaskRequest, StatPeerRequest, + StatTaskRequest, UploadCacheTaskRequest, }; use dragonfly_client_core::error::{ErrorType, ExternalError, OrErr}; use dragonfly_client_core::{Error, Result}; @@ -134,57 +136,6 @@ impl SchedulerClient { Ok(()) } - // init_announce_host announces the host to the scheduler. - #[instrument(skip(self))] - pub async fn init_announce_host(&self, request: AnnounceHostRequest) -> Result<()> { - let mut join_set = JoinSet::new(); - let available_scheduler_addrs = self.available_scheduler_addrs.read().await; - let available_scheduler_addrs_clone = available_scheduler_addrs.clone(); - drop(available_scheduler_addrs); - - for available_scheduler_addr in available_scheduler_addrs_clone.iter() { - let request = Self::make_request(request.clone()); - async fn announce_host( - addr: SocketAddr, - request: tonic::Request, - ) -> Result<()> { - info!("announce host to {:?}", addr); - - // Connect to the scheduler. - let channel = Channel::from_shared(format!("http://{}", addr)) - .map_err(|_| Error::InvalidURI(addr.to_string()))? - .connect_timeout(super::CONNECT_TIMEOUT) - .connect() - .await - .map_err(|err| { - error!("connect to {} failed: {}", addr.to_string(), err); - err - }) - .or_err(ErrorType::ConnectError)?; - - let mut client = SchedulerGRPCClient::new(channel); - client.announce_host(request).await?; - Ok(()) - } - - join_set.spawn(announce_host(*available_scheduler_addr, request).in_current_span()); - } - - while let Some(message) = join_set - .join_next() - .await - .transpose() - .or_err(ErrorType::AsyncRuntimeError)? - { - if let Err(err) = message { - error!("failed to init announce host: {}", err); - return Err(err); - } - } - - Ok(()) - } - // announce_host announces the host to the scheduler. #[instrument(skip(self))] pub async fn announce_host(&self, request: AnnounceHostRequest) -> Result<()> { @@ -239,6 +190,57 @@ impl SchedulerClient { Ok(()) } + // init_announce_host announces the host to the scheduler. + #[instrument(skip(self))] + pub async fn init_announce_host(&self, request: AnnounceHostRequest) -> Result<()> { + let mut join_set = JoinSet::new(); + let available_scheduler_addrs = self.available_scheduler_addrs.read().await; + let available_scheduler_addrs_clone = available_scheduler_addrs.clone(); + drop(available_scheduler_addrs); + + for available_scheduler_addr in available_scheduler_addrs_clone.iter() { + let request = Self::make_request(request.clone()); + async fn announce_host( + addr: SocketAddr, + request: tonic::Request, + ) -> Result<()> { + info!("announce host to {:?}", addr); + + // Connect to the scheduler. + let channel = Channel::from_shared(format!("http://{}", addr)) + .map_err(|_| Error::InvalidURI(addr.to_string()))? + .connect_timeout(super::CONNECT_TIMEOUT) + .connect() + .await + .map_err(|err| { + error!("connect to {} failed: {}", addr.to_string(), err); + err + }) + .or_err(ErrorType::ConnectError)?; + + let mut client = SchedulerGRPCClient::new(channel); + client.announce_host(request).await?; + Ok(()) + } + + join_set.spawn(announce_host(*available_scheduler_addr, request).in_current_span()); + } + + while let Some(message) = join_set + .join_next() + .await + .transpose() + .or_err(ErrorType::AsyncRuntimeError)? + { + if let Err(err) = message { + error!("failed to init announce host: {}", err); + return Err(err); + } + } + + Ok(()) + } + // delete_host tells the scheduler that the host is deleting. #[instrument(skip(self))] pub async fn delete_host(&self, request: DeleteHostRequest) -> Result<()> { @@ -293,6 +295,100 @@ impl SchedulerClient { Ok(()) } + // announce_cache_peer announces the cache peer to the scheduler. + #[instrument(skip_all)] + pub async fn announce_cache_peer( + &self, + task_id: &str, + peer_id: &str, + request: impl tonic::IntoStreamingRequest, + ) -> Result>> { + let response = self + .client(task_id, Some(peer_id)) + .await? + .announce_cache_peer(request) + .await?; + Ok(response) + } + + // stat_cache_peer gets the status of the cache peer. + #[instrument(skip(self))] + pub async fn stat_cache_peer( + &self, + task_id: &str, + request: StatCachePeerRequest, + ) -> Result { + let request = Self::make_request(request); + let response = self + .client(task_id, None) + .await? + .stat_cache_peer(request) + .await?; + Ok(response.into_inner()) + } + + // delete_cache_peer tells the scheduler that the cache peer is deleting. + #[instrument(skip(self))] + pub async fn delete_cache_peer( + &self, + task_id: &str, + request: DeleteCachePeerRequest, + ) -> Result<()> { + let request = Self::make_request(request); + self.client(task_id, None) + .await? + .delete_cache_peer(request) + .await?; + Ok(()) + } + + // upload_cache_task uploads the metadata of the cache task. + #[instrument(skip(self))] + pub async fn upload_cache_task( + &self, + task_id: &str, + request: UploadCacheTaskRequest, + ) -> Result { + let request = Self::make_request(request); + let response = self + .client(task_id, None) + .await? + .upload_cache_task(request) + .await?; + Ok(response.into_inner()) + } + + // stat_cache_task gets the status of the cache task. + #[instrument(skip(self))] + pub async fn stat_cache_task( + &self, + task_id: &str, + request: StatCacheTaskRequest, + ) -> Result { + let request = Self::make_request(request); + let response = self + .client(task_id, None) + .await? + .stat_cache_task(request) + .await?; + Ok(response.into_inner()) + } + + // delete_cache_task tells the scheduler that the cache task is deleting. + #[instrument(skip(self))] + pub async fn delete_cache_task( + &self, + task_id: &str, + request: DeleteCacheTaskRequest, + ) -> Result<()> { + let request = Self::make_request(request); + self.client(task_id, None) + .await? + .delete_cache_task(request) + .await?; + Ok(()) + } + // client gets the grpc client of the scheduler. #[instrument(skip(self))] async fn client(