diff --git a/Cargo.lock b/Cargo.lock index 9cec8f29..499c5f4b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -950,9 +950,9 @@ dependencies = [ [[package]] name = "dragonfly-api" -version = "2.0.115" +version = "2.0.123" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fae032d643b28b11dc0fa1d80499eec18c603f088ee07fed69ceecdcb6147f6d" +checksum = "86b83bf914712fc12df2129ed54ed889dbdddf0714af3013b68e85f9f7b844f7" dependencies = [ "prost 0.11.9", "prost-types 0.12.6", diff --git a/Cargo.toml b/Cargo.toml index dab726df..3acce7bb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,7 @@ dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.8 dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.81" } dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.81" } thiserror = "1.0" -dragonfly-api = "2.0.115" +dragonfly-api = "2.0.123" reqwest = { version = "0.12.4", features = ["stream", "native-tls", "default-tls", "rustls-tls"] } rcgen = { version = "0.12.1", features = ["x509-parser"] } hyper = { version = "1.2", features = ["full"] } diff --git a/dragonfly-client-config/src/dfcache.rs b/dragonfly-client-config/src/dfcache.rs index dfff5f93..756d12d5 100644 --- a/dragonfly-client-config/src/dfcache.rs +++ b/dragonfly-client-config/src/dfcache.rs @@ -25,7 +25,7 @@ pub fn default_dfcache_log_dir() -> PathBuf { crate::default_log_dir().join(NAME) } -// default_persistent_replica_count is the default replica count of the persistent cache task. +// default_dfcache_persistent_replica_count is the default replica count of the persistent cache task. 
#[inline] pub fn default_dfcache_persistent_replica_count() -> u64 { 2 diff --git a/dragonfly-client-init/src/container_runtime/docker.rs b/dragonfly-client-init/src/container_runtime/docker.rs index 75247af7..384504bc 100644 --- a/dragonfly-client-init/src/container_runtime/docker.rs +++ b/dragonfly-client-init/src/container_runtime/docker.rs @@ -39,7 +39,7 @@ impl Docker { } } - // TODO Implement the run method for Docker. + // TODO: Implement the run method for Docker. // // run runs the docker runtime to initialize // runtime environment for the dfdaemon. diff --git a/dragonfly-client/src/announcer/mod.rs b/dragonfly-client/src/announcer/mod.rs index 7631413a..26a0aadc 100644 --- a/dragonfly-client/src/announcer/mod.rs +++ b/dragonfly-client/src/announcer/mod.rs @@ -18,7 +18,7 @@ use crate::grpc::{manager::ManagerClient, scheduler::SchedulerClient}; use crate::shutdown; use dragonfly_api::common::v2::{Build, Cpu, Host, Memory, Network}; use dragonfly_api::manager::v2::{DeleteSeedPeerRequest, SourceType, UpdateSeedPeerRequest}; -use dragonfly_api::scheduler::v2::{AnnounceHostRequest, LeaveHostRequest}; +use dragonfly_api::scheduler::v2::{AnnounceHostRequest, DeleteHostRequest}; use dragonfly_client_config::{ dfdaemon::{Config, HostType}, CARGO_PKG_RUSTC_VERSION, CARGO_PKG_VERSION, GIT_HASH, @@ -175,10 +175,10 @@ impl SchedulerAnnouncer { } _ = shutdown.recv() => { // Announce to scheduler shutting down with signals. - if let Err(err) = self.scheduler_client.leave_host(LeaveHostRequest{ - id: self.host_id.clone(), + if let Err(err) = self.scheduler_client.delete_host(DeleteHostRequest{ + host_id: self.host_id.clone(), }).await { - error!("leave host from scheduler failed: {}", err); + error!("delete host from scheduler failed: {}", err); } info!("announce to scheduler shutting down"); @@ -211,7 +211,7 @@ impl SchedulerAnnouncer { percent: sys.global_cpu_info().cpu_usage() as f64, process_percent: process.cpu_usage() as f64, - // TODO Get the cpu times. 
+ // TODO: Get the cpu times. times: None, }; @@ -222,17 +222,17 @@ impl SchedulerAnnouncer { used: sys.used_memory(), used_percent: (sys.used_memory() / sys.total_memory()) as f64, - // TODO Get the process used memory. + // TODO: Get the process used memory. process_used_percent: 0 as f64, free: sys.free_memory(), }; // Get the network information. let network = Network { - // TODO Get the count of the tcp connection. + // TODO: Get the count of the tcp connection. tcp_connection_count: 0, - // TODO Get the count of the upload tcp connection. + // TODO: Get the count of the upload tcp connection. upload_tcp_connection_count: 0, idc: self.config.host.idc.clone(), location: self.config.host.location.clone(), @@ -264,11 +264,11 @@ impl SchedulerAnnouncer { memory: Some(memory), network: Some(network), - // TODO Get the disk information. + // TODO: Get the disk information. disk: None, build: Some(build), - // TODO Get scheduler cluster id from dynconfig. + // TODO: Get scheduler cluster id from dynconfig. scheduler_cluster_id: 0, }; diff --git a/dragonfly-client/src/bin/dfcache/main.rs b/dragonfly-client/src/bin/dfcache/main.rs index 959d58b3..e23482e7 100644 --- a/dragonfly-client/src/bin/dfcache/main.rs +++ b/dragonfly-client/src/bin/dfcache/main.rs @@ -116,6 +116,7 @@ pub enum Command { Remove(remove::RemoveCommand), } +// Implement the execute for Command. 
impl Command { #[allow(unused)] pub async fn execute(self) -> Result<(), anyhow::Error> { diff --git a/dragonfly-client/src/gc/mod.rs b/dragonfly-client/src/gc/mod.rs index eb1aadd2..8bac049a 100644 --- a/dragonfly-client/src/gc/mod.rs +++ b/dragonfly-client/src/gc/mod.rs @@ -16,7 +16,7 @@ use crate::grpc::scheduler::SchedulerClient; use crate::shutdown; -use dragonfly_api::scheduler::v2::LeaveTaskRequest; +use dragonfly_api::scheduler::v2::DeleteTaskRequest; use dragonfly_client_config::dfdaemon::Config; use dragonfly_client_core::Result; use dragonfly_client_storage::{metadata, Storage}; @@ -113,8 +113,8 @@ impl GC { }); info!("evict task {}", task.id); - self.leave_task_from_scheduler(task.clone()).await; - info!("leave task {} from scheduler", task.id); + self.delete_task_from_scheduler(task.clone()).await; + info!("delete task {} from scheduler", task.id); } } @@ -185,27 +185,27 @@ impl GC { evicted_space += task_space; info!("evict task {} size {}", task.id, task_space); - self.leave_task_from_scheduler(task.clone()).await; - info!("leave task {} from scheduler", task.id); + self.delete_task_from_scheduler(task.clone()).await; + info!("delete task {} from scheduler", task.id); } info!("evict total size {}", evicted_space); Ok(()) } - // leave_task_from_scheduler leaves the task from the scheduler. - async fn leave_task_from_scheduler(&self, task: metadata::Task) { + // delete_task_from_scheduler deletes the task from the scheduler. 
+ async fn delete_task_from_scheduler(&self, task: metadata::Task) { self.scheduler_client - .leave_task( + .delete_task( task.id.as_str(), - LeaveTaskRequest { + DeleteTaskRequest { host_id: self.host_id.clone(), task_id: task.id.clone(), }, ) .await .unwrap_or_else(|err| { - error!("failed to leave peer {}: {}", task.id, err); + error!("failed to delete peer {}: {}", task.id, err); }); } } diff --git a/dragonfly-client/src/grpc/dfdaemon_download.rs b/dragonfly-client/src/grpc/dfdaemon_download.rs index 012285fb..8df92054 100644 --- a/dragonfly-client/src/grpc/dfdaemon_download.rs +++ b/dragonfly-client/src/grpc/dfdaemon_download.rs @@ -20,18 +20,19 @@ use crate::metrics::{ }; use crate::shutdown; use crate::task; -use dragonfly_api::common::v2::Task; +use dragonfly_api::common::v2::{CacheTask, Task}; use dragonfly_api::dfdaemon::v2::{ dfdaemon_download_client::DfdaemonDownloadClient as DfdaemonDownloadGRPCClient, dfdaemon_download_server::{ DfdaemonDownload, DfdaemonDownloadServer as DfdaemonDownloadGRPCServer, }, - DeleteTaskRequest, DownloadTaskRequest, DownloadTaskResponse, - StatTaskRequest as DfdaemonStatTaskRequest, UploadTaskRequest, + DeleteCacheTaskRequest, DeleteTaskRequest, DownloadCacheTaskRequest, DownloadCacheTaskResponse, + DownloadTaskRequest, DownloadTaskResponse, StatCacheTaskRequest, + StatTaskRequest as DfdaemonStatTaskRequest, UploadCacheTaskRequest, }; use dragonfly_api::errordetails::v2::Backend; use dragonfly_api::scheduler::v2::{ - LeaveHostRequest as SchedulerLeaveHostRequest, StatTaskRequest as SchedulerStatTaskRequest, + DeleteHostRequest as SchedulerDeleteHostRequest, StatTaskRequest as SchedulerStatTaskRequest, }; use dragonfly_client_core::{ error::{ErrorType, OrErr}, @@ -468,16 +469,6 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler { Ok(Response::new(ReceiverStream::new(out_stream_rx))) } - // upload_task tells the dfdaemon to upload the task. 
- #[instrument(skip_all)] - async fn upload_task( - &self, - request: Request, - ) -> Result, Status> { - println!("upload_task: {:?}", request); - Err(Status::unimplemented("not implemented")) - } - // stat_task gets the status of the task. #[instrument(skip_all, fields(host_id, task_id))] async fn stat_task( @@ -504,7 +495,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler { .stat_task( task_id.as_str(), SchedulerStatTaskRequest { - id: task_id.clone(), + task_id: task_id.clone(), }, ) .await @@ -526,22 +517,62 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler { Err(Status::unimplemented("not implemented")) } - // leave_host calls the scheduler to leave the host. + // delete_host calls the scheduler to delete the host. #[instrument(skip_all)] - async fn leave_host(&self, _: Request<()>) -> Result, Status> { + async fn delete_host(&self, _: Request<()>) -> Result, Status> { self.task .scheduler_client - .leave_host(SchedulerLeaveHostRequest { - id: self.task.id_generator.host_id(), + .delete_host(SchedulerDeleteHostRequest { + host_id: self.task.id_generator.host_id(), }) .await .map_err(|e| { - error!("leave host: {}", e); + error!("delete host: {}", e); Status::internal(e.to_string()) })?; Ok(Response::new(())) } + + // DownloadCacheTaskStream is the stream of the download cache task response. + type DownloadCacheTaskStream = ReceiverStream>; + + // TODO: Implement this. + // download_cache_task downloads the task. + #[instrument(skip_all, fields(host_id, task_id, peer_id))] + async fn download_cache_task( + &self, + _request: Request, + ) -> Result, Status> { + unimplemented!() + } + + // TODO: Implement this. + // upload_cache_task uploads the cache task. + async fn upload_cache_task( + &self, + _request: Request, + ) -> Result, Status> { + unimplemented!() + } + + // TODO: Implement this. + // stat_cache_task stats the cache task. 
+ async fn stat_cache_task( + &self, + _request: Request, + ) -> Result, Status> { + unimplemented!() + } + + // TODO: Implement this. + // delete_cache_task deletes the cache task. + async fn delete_cache_task( + &self, + _request: Request, + ) -> Result, Status> { + unimplemented!() + } } // DfdaemonDownloadClient is a wrapper of DfdaemonDownloadGRPCClient. @@ -597,14 +628,6 @@ impl DfdaemonDownloadClient { Ok(response) } - // upload_task tells the dfdaemon to upload the task. - #[instrument(skip_all)] - pub async fn upload_task(&self, request: UploadTaskRequest) -> ClientResult<()> { - let request = Self::make_request(request); - self.client.clone().upload_task(request).await?; - Ok(()) - } - // stat_task gets the status of the task. #[instrument(skip_all)] pub async fn stat_task(&self, request: DfdaemonStatTaskRequest) -> ClientResult { @@ -621,6 +644,72 @@ impl DfdaemonDownloadClient { Ok(()) } + // download_cache_task downloads the cache task. + #[instrument(skip_all)] + pub async fn download_cache_task( + &self, + request: DownloadCacheTaskRequest, + ) -> ClientResult>> { + // Clone the request. + let request_clone = request.clone(); + + // Initialize the request. + let mut request = tonic::Request::new(request); + + // Set the timeout to the request. + if let Some(timeout) = request_clone.timeout { + request.set_timeout( + Duration::try_from(timeout) + .map_err(|_| tonic::Status::invalid_argument("invalid timeout"))?, + ); + } + + let response = self.client.clone().download_cache_task(request).await?; + Ok(response) + } + + // upload_cache_task uploads the cache task. + #[instrument(skip_all)] + pub async fn upload_cache_task( + &self, + request: UploadCacheTaskRequest, + timeout: Duration, + ) -> ClientResult { + let mut request = tonic::Request::new(request); + request.set_timeout(timeout); + + let response = self.client.clone().upload_cache_task(request).await?; + Ok(response.into_inner()) + } + + // stat_cache_task stats the cache task. 
+ #[instrument(skip_all)] + pub async fn stat_cache_task( + &self, + request: StatCacheTaskRequest, + timeout: Duration, + ) -> ClientResult { + let mut request = tonic::Request::new(request); + request.set_timeout(timeout); + + let response = self.client.clone().stat_cache_task(request).await?; + Ok(response.into_inner()) + } + + // delete_cache_task deletes the cache task. + #[instrument(skip_all)] + pub async fn delete_cache_task( + &self, + request: DeleteCacheTaskRequest, + timeout: Duration, + ) -> ClientResult<()> { + let mut request = tonic::Request::new(request); + request.set_timeout(timeout); + + let _response = self.client.clone().delete_cache_task(request).await?; + Ok(()) + } + // make_request creates a new request with timeout. fn make_request(request: T) -> tonic::Request { let mut request = tonic::Request::new(request); diff --git a/dragonfly-client/src/grpc/dfdaemon_upload.rs b/dragonfly-client/src/grpc/dfdaemon_upload.rs index eca247db..faadffdf 100644 --- a/dragonfly-client/src/grpc/dfdaemon_upload.rs +++ b/dragonfly-client/src/grpc/dfdaemon_upload.rs @@ -21,12 +21,13 @@ use crate::metrics::{ }; use crate::shutdown; use crate::task; -use dragonfly_api::common::v2::Piece; +use dragonfly_api::common::v2::{CacheTask, Piece}; use dragonfly_api::dfdaemon::v2::{ dfdaemon_upload_client::DfdaemonUploadClient as DfdaemonUploadGRPCClient, dfdaemon_upload_server::{DfdaemonUpload, DfdaemonUploadServer as DfdaemonUploadGRPCServer}, + DeleteCacheTaskRequest, DownloadCacheTaskRequest, DownloadCacheTaskResponse, DownloadPieceRequest, DownloadPieceResponse, DownloadTaskRequest, DownloadTaskResponse, - SyncPiecesRequest, SyncPiecesResponse, + StatCacheTaskRequest, SyncPiecesRequest, SyncPiecesResponse, }; use dragonfly_api::errordetails::v2::Backend; use dragonfly_client_config::dfdaemon::Config; @@ -138,238 +139,6 @@ pub struct DfdaemonUploadServerHandler { // DfdaemonUploadServerHandler implements the dfdaemon upload grpc service. 
#[tonic::async_trait] impl DfdaemonUpload for DfdaemonUploadServerHandler { - // SyncPiecesStream is the stream of the sync pieces response. - type SyncPiecesStream = ReceiverStream>; - - // get_piece_numbers gets the piece numbers. - #[instrument(skip_all, fields(host_id, remote_host_id, task_id))] - async fn sync_pieces( - &self, - request: Request, - ) -> Result, Status> { - // Clone the request. - let request = request.into_inner(); - - // Generate the host id. - let host_id = self.task.id_generator.host_id(); - - // Get the remote host id from the request. - let remote_host_id = request.host_id; - - // Get the task id from tae request. - let task_id = request.task_id; - - // Span record the host id and task id. - Span::current().record("host_id", host_id.clone()); - Span::current().record("remote_host_id", remote_host_id.as_str()); - Span::current().record("task_id", task_id.clone()); - - // Get the interested piece numbers from the request. - let mut interested_piece_numbers = request.interested_piece_numbers.clone(); - - // Clone the task. - let task_manager = self.task.clone(); - - // Initialize stream channel. - let (out_stream_tx, out_stream_rx) = mpsc::channel(1024); - tokio::spawn( - async move { - loop { - let mut has_started_piece = false; - let mut finished_piece_numbers = Vec::new(); - for interested_piece_number in interested_piece_numbers.iter() { - let piece = match task_manager - .piece - .get(task_id.as_str(), *interested_piece_number) - { - Ok(Some(piece)) => piece, - Ok(None) => continue, - Err(err) => { - error!( - "send piece metadata {}-{}: {}", - task_id, interested_piece_number, err - ); - out_stream_tx - .send(Err(Status::internal(err.to_string()))) - .await - .unwrap_or_else(|err| { - error!( - "send piece metadata {}-{} to stream: {}", - task_id, interested_piece_number, err - ); - }); - - drop(out_stream_tx); - return; - } - }; - - // Send the piece metadata to the stream. 
- if piece.is_finished() { - out_stream_tx - .send(Ok(SyncPiecesResponse { - number: piece.number, - offset: piece.offset, - length: piece.length, - })) - .await - .unwrap_or_else(|err| { - error!( - "send finished piece {}-{} to stream: {}", - task_id, interested_piece_number, err - ); - }); - info!("send piece metadata {}-{}", task_id, piece.number); - - // Add the finished piece number to the finished piece numbers. - finished_piece_numbers.push(piece.number); - continue; - } - - // Check whether the piece is started. - if piece.is_started() { - has_started_piece = true; - } - } - - // Remove the finished piece numbers from the interested piece numbers. - interested_piece_numbers - .retain(|number| !finished_piece_numbers.contains(number)); - - // If all the interested pieces are finished, return. - if interested_piece_numbers.is_empty() { - info!("all the interested pieces are finished"); - drop(out_stream_tx); - return; - } - - // If there is no started piece, return. - if !has_started_piece { - info!("there is no started piece"); - drop(out_stream_tx); - return; - } - - // Wait for the piece to be finished. - tokio::time::sleep( - dragonfly_client_storage::DEFAULT_WAIT_FOR_PIECE_FINISHED_INTERVAL, - ) - .await; - } - } - .in_current_span(), - ); - - Ok(Response::new(ReceiverStream::new(out_stream_rx))) - } - - // sync_pieces syncs the pieces. - #[instrument(skip_all, fields(host_id, remote_host_id, task_id, piece_number))] - async fn download_piece( - &self, - request: Request, - ) -> Result, Status> { - // Clone the request. - let request = request.into_inner(); - - // Generate the host id. - let host_id = self.task.id_generator.host_id(); - - // Get the remote host id from the request. - let remote_host_id = request.host_id; - - // Get the task id from the request. - let task_id = request.task_id; - - // Get the interested piece number from the request. - let piece_number = request.piece_number; - - // Span record the host id, task id and piece number. 
- Span::current().record("host_id", host_id.as_str()); - Span::current().record("remote_host_id", remote_host_id.as_str()); - Span::current().record("task_id", task_id.as_str()); - Span::current().record("piece_number", piece_number); - - // Get the piece metadata from the local storage. - let piece = self - .task - .piece - .get(task_id.as_str(), piece_number) - .map_err(|err| { - error!( - "upload piece metadata {}-{} from local storage: {}", - task_id, piece_number, err - ); - Status::internal(err.to_string()) - })? - .ok_or_else(|| { - error!( - "upload piece metadata {}-{} not found", - task_id, piece_number - ); - Status::not_found("piece metadata not found") - })?; - - // Collect upload piece started metrics. - collect_upload_piece_started_metrics(); - info!("start upload piece content {}-{}", task_id, piece_number); - - // Get the piece content from the local storage. - let mut reader = self - .task - .piece - .upload_from_local_peer_into_async_read( - task_id.as_str(), - piece_number, - piece.length, - None, - false, - ) - .await - .map_err(|err| { - // Collect upload piece failure metrics. - collect_upload_piece_failure_metrics(); - - error!( - "upload piece content {}-{} from local storage: {}", - task_id, piece_number, err - ); - Status::internal(err.to_string()) - })?; - - // Read the content of the piece. - let mut content = Vec::new(); - reader.read_to_end(&mut content).await.map_err(|err| { - // Collect upload piece failure metrics. - collect_upload_piece_failure_metrics(); - - error!( - "upload piece content {}-{} failed: {}", - task_id, piece_number, err - ); - Status::internal(err.to_string()) - })?; - - // Collect upload piece finished metrics. - collect_upload_piece_finished_metrics(); - info!("finished upload piece content {}-{}", task_id, piece_number); - - // Return the piece. 
- Ok(Response::new(DownloadPieceResponse { - piece: Some(Piece { - number: piece.number, - parent_id: piece.parent_id, - offset: piece.offset, - length: piece.length, - digest: piece.digest, - content: Some(content), - traffic_type: None, - cost: None, - created_at: None, - }), - })) - } - // DownloadTaskStream is the stream of the download task response. type DownloadTaskStream = ReceiverStream>; @@ -685,6 +454,269 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { Ok(Response::new(ReceiverStream::new(out_stream_rx))) } + + // SyncPiecesStream is the stream of the sync pieces response. + type SyncPiecesStream = ReceiverStream>; + + // sync_pieces provides the piece metadata for remote peer. + #[instrument(skip_all, fields(host_id, remote_host_id, task_id))] + async fn sync_pieces( + &self, + request: Request, + ) -> Result, Status> { + // Clone the request. + let request = request.into_inner(); + + // Generate the host id. + let host_id = self.task.id_generator.host_id(); + + // Get the remote host id from the request. + let remote_host_id = request.host_id; + + // Get the task id from tae request. + let task_id = request.task_id; + + // Span record the host id and task id. + Span::current().record("host_id", host_id.clone()); + Span::current().record("remote_host_id", remote_host_id.as_str()); + Span::current().record("task_id", task_id.clone()); + + // Get the interested piece numbers from the request. + let mut interested_piece_numbers = request.interested_piece_numbers.clone(); + + // Clone the task. + let task_manager = self.task.clone(); + + // Initialize stream channel. 
+ let (out_stream_tx, out_stream_rx) = mpsc::channel(1024); + tokio::spawn( + async move { + loop { + let mut has_started_piece = false; + let mut finished_piece_numbers = Vec::new(); + for interested_piece_number in interested_piece_numbers.iter() { + let piece = match task_manager + .piece + .get(task_id.as_str(), *interested_piece_number) + { + Ok(Some(piece)) => piece, + Ok(None) => continue, + Err(err) => { + error!( + "send piece metadata {}-{}: {}", + task_id, interested_piece_number, err + ); + out_stream_tx + .send(Err(Status::internal(err.to_string()))) + .await + .unwrap_or_else(|err| { + error!( + "send piece metadata {}-{} to stream: {}", + task_id, interested_piece_number, err + ); + }); + + drop(out_stream_tx); + return; + } + }; + + // Send the piece metadata to the stream. + if piece.is_finished() { + out_stream_tx + .send(Ok(SyncPiecesResponse { + number: piece.number, + offset: piece.offset, + length: piece.length, + })) + .await + .unwrap_or_else(|err| { + error!( + "send finished piece {}-{} to stream: {}", + task_id, interested_piece_number, err + ); + }); + info!("send piece metadata {}-{}", task_id, piece.number); + + // Add the finished piece number to the finished piece numbers. + finished_piece_numbers.push(piece.number); + continue; + } + + // Check whether the piece is started. + if piece.is_started() { + has_started_piece = true; + } + } + + // Remove the finished piece numbers from the interested piece numbers. + interested_piece_numbers + .retain(|number| !finished_piece_numbers.contains(number)); + + // If all the interested pieces are finished, return. + if interested_piece_numbers.is_empty() { + info!("all the interested pieces are finished"); + drop(out_stream_tx); + return; + } + + // If there is no started piece, return. + if !has_started_piece { + info!("there is no started piece"); + drop(out_stream_tx); + return; + } + + // Wait for the piece to be finished. 
+ tokio::time::sleep( + dragonfly_client_storage::DEFAULT_WAIT_FOR_PIECE_FINISHED_INTERVAL, + ) + .await; + } + } + .in_current_span(), + ); + + Ok(Response::new(ReceiverStream::new(out_stream_rx))) + } + + // download_piece provides the piece content for remote peer. + #[instrument(skip_all, fields(host_id, remote_host_id, task_id, piece_number))] + async fn download_piece( + &self, + request: Request, + ) -> Result, Status> { + // Clone the request. + let request = request.into_inner(); + + // Generate the host id. + let host_id = self.task.id_generator.host_id(); + + // Get the remote host id from the request. + let remote_host_id = request.host_id; + + // Get the task id from the request. + let task_id = request.task_id; + + // Get the interested piece number from the request. + let piece_number = request.piece_number; + + // Span record the host id, task id and piece number. + Span::current().record("host_id", host_id.as_str()); + Span::current().record("remote_host_id", remote_host_id.as_str()); + Span::current().record("task_id", task_id.as_str()); + Span::current().record("piece_number", piece_number); + + // Get the piece metadata from the local storage. + let piece = self + .task + .piece + .get(task_id.as_str(), piece_number) + .map_err(|err| { + error!( + "upload piece metadata {}-{} from local storage: {}", + task_id, piece_number, err + ); + Status::internal(err.to_string()) + })? + .ok_or_else(|| { + error!( + "upload piece metadata {}-{} not found", + task_id, piece_number + ); + Status::not_found("piece metadata not found") + })?; + + // Collect upload piece started metrics. + collect_upload_piece_started_metrics(); + info!("start upload piece content {}-{}", task_id, piece_number); + + // Get the piece content from the local storage. 
+ let mut reader = self + .task + .piece + .upload_from_local_peer_into_async_read( + task_id.as_str(), + piece_number, + piece.length, + None, + false, + ) + .await + .map_err(|err| { + // Collect upload piece failure metrics. + collect_upload_piece_failure_metrics(); + + error!( + "upload piece content {}-{} from local storage: {}", + task_id, piece_number, err + ); + Status::internal(err.to_string()) + })?; + + // Read the content of the piece. + let mut content = Vec::new(); + reader.read_to_end(&mut content).await.map_err(|err| { + // Collect upload piece failure metrics. + collect_upload_piece_failure_metrics(); + + error!( + "upload piece content {}-{} failed: {}", + task_id, piece_number, err + ); + Status::internal(err.to_string()) + })?; + + // Collect upload piece finished metrics. + collect_upload_piece_finished_metrics(); + info!("finished upload piece content {}-{}", task_id, piece_number); + + // Return the piece. + Ok(Response::new(DownloadPieceResponse { + piece: Some(Piece { + number: piece.number, + parent_id: piece.parent_id, + offset: piece.offset, + length: piece.length, + digest: piece.digest, + content: Some(content), + traffic_type: None, + cost: None, + created_at: None, + }), + })) + } + + // DownloadCacheTaskStream is the stream of the download cache task response. + type DownloadCacheTaskStream = ReceiverStream>; + + // TODO: Implement this. + // download_cache_task downloads the task. + #[instrument(skip_all, fields(host_id, task_id, peer_id))] + async fn download_cache_task( + &self, + _request: Request, + ) -> Result, Status> { + unimplemented!() + } + + // TODO: Implement this. + // stat_cache_task stats the cache task. + async fn stat_cache_task( + &self, + _request: Request, + ) -> Result, Status> { + unimplemented!() + } + + // TODO: Implement this. + // delete_cache_task deletes the cache task. 
+ async fn delete_cache_task( + &self, + _request: Request, + ) -> Result, Status> { + unimplemented!() + } } // DfdaemonUploadClient is a wrapper of DfdaemonUploadGRPCClient. @@ -711,32 +743,7 @@ impl DfdaemonUploadClient { Ok(Self { client }) } - // get_piece_numbers gets the piece numbers. - #[instrument(skip_all)] - pub async fn sync_pieces( - &self, - request: SyncPiecesRequest, - ) -> ClientResult>> { - let request = Self::make_request(request); - let response = self.client.clone().sync_pieces(request).await?; - Ok(response) - } - - // sync_pieces syncs the pieces. - #[instrument(skip_all)] - pub async fn download_piece( - &self, - request: DownloadPieceRequest, - timeout: Duration, - ) -> ClientResult { - let mut request = tonic::Request::new(request); - request.set_timeout(timeout); - - let response = self.client.clone().download_piece(request).await?; - Ok(response.into_inner()) - } - - // trigger_download_task triggers the download task. + // download_task downloads the task. #[instrument(skip_all)] pub async fn download_task( &self, @@ -762,6 +769,83 @@ impl DfdaemonUploadClient { Ok(response) } + // sync_pieces provides the piece metadata for remote peer. + #[instrument(skip_all)] + pub async fn sync_pieces( + &self, + request: SyncPiecesRequest, + ) -> ClientResult>> { + let request = Self::make_request(request); + let response = self.client.clone().sync_pieces(request).await?; + Ok(response) + } + + // download_piece provides the piece content for remote peer. + #[instrument(skip_all)] + pub async fn download_piece( + &self, + request: DownloadPieceRequest, + timeout: Duration, + ) -> ClientResult { + let mut request = tonic::Request::new(request); + request.set_timeout(timeout); + + let response = self.client.clone().download_piece(request).await?; + Ok(response.into_inner()) + } + + // download_cache_task downloads the cache task. 
+ #[instrument(skip_all)] + pub async fn download_cache_task( + &self, + request: DownloadCacheTaskRequest, + ) -> ClientResult>> { + // Clone the request. + let request_clone = request.clone(); + + // Initialize the request. + let mut request = tonic::Request::new(request); + + // Set the timeout to the request. + if let Some(timeout) = request_clone.timeout { + request.set_timeout( + Duration::try_from(timeout) + .map_err(|_| tonic::Status::invalid_argument("invalid timeout"))?, + ); + } + + let response = self.client.clone().download_cache_task(request).await?; + Ok(response) + } + + // stat_cache_task stats the cache task. + #[instrument(skip_all)] + pub async fn stat_cache_task( + &self, + request: StatCacheTaskRequest, + timeout: Duration, + ) -> ClientResult { + let mut request = tonic::Request::new(request); + request.set_timeout(timeout); + + let response = self.client.clone().stat_cache_task(request).await?; + Ok(response.into_inner()) + } + + // delete_cache_task deletes the cache task. + #[instrument(skip_all)] + pub async fn delete_cache_task( + &self, + request: DeleteCacheTaskRequest, + timeout: Duration, + ) -> ClientResult<()> { + let mut request = tonic::Request::new(request); + request.set_timeout(timeout); + + let _response = self.client.clone().delete_cache_task(request).await?; + Ok(()) + } + // make_request creates a new request with timeout. 
fn make_request<T>(request: T) -> tonic::Request<T> { let mut request = tonic::Request::new(request); diff --git a/dragonfly-client/src/grpc/scheduler.rs b/dragonfly-client/src/grpc/scheduler.rs index 55cb9520..952f00c2 100644 --- a/dragonfly-client/src/grpc/scheduler.rs +++ b/dragonfly-client/src/grpc/scheduler.rs @@ -20,8 +20,8 @@ use dragonfly_api::common::v2::{Peer, Task}; use dragonfly_api::manager::v2::Scheduler; use dragonfly_api::scheduler::v2::{ scheduler_client::SchedulerClient as SchedulerGRPCClient, AnnounceHostRequest, - AnnouncePeerRequest, AnnouncePeerResponse, ExchangePeerRequest, ExchangePeerResponse, - LeaveHostRequest, LeavePeerRequest, LeaveTaskRequest, StatPeerRequest, StatTaskRequest, + AnnouncePeerRequest, AnnouncePeerResponse, DeleteHostRequest, DeletePeerRequest, + DeleteTaskRequest, StatPeerRequest, StatTaskRequest, }; use dragonfly_client_core::error::{ErrorType, ExternalError, OrErr}; use dragonfly_client_core::{Error, Result}; @@ -104,33 +104,17 @@ impl SchedulerClient { Ok(response.into_inner()) } - // leave_peer tells the scheduler that the peer is leaving. + // delete_peer tells the scheduler that the peer is deleting. #[instrument(skip(self))] - pub async fn leave_peer(&self, task_id: &str, request: LeavePeerRequest) -> Result<()> { + pub async fn delete_peer(&self, task_id: &str, request: DeletePeerRequest) -> Result<()> { let request = Self::make_request(request); self.client(task_id, None) .await? - .leave_peer(request) + .delete_peer(request) .await?; Ok(()) } - - // exchange_peer exchanges the peer with the scheduler. - #[instrument(skip(self))] - pub async fn exchange_peer( - &self, - task_id: &str, - request: ExchangePeerRequest, - ) -> Result<ExchangePeerResponse> { - let request = Self::make_request(request); - let response = self - .client(task_id, None) - .await? - .exchange_peer(request) - .await?; - Ok(response.into_inner()) - } - - // stat_task gets the status of the task. 
#[instrument(skip(self))] pub async fn stat_task(&self, task_id: &str, request: StatTaskRequest) -> Result<Task> { @@ -139,13 +123,13 @@ impl SchedulerClient { Ok(response.into_inner()) } - // leave_task tells the scheduler that the task is leaving. + // delete_task tells the scheduler that the task is deleting. #[instrument(skip(self))] - pub async fn leave_task(&self, task_id: &str, request: LeaveTaskRequest) -> Result<()> { + pub async fn delete_task(&self, task_id: &str, request: DeleteTaskRequest) -> Result<()> { let request = Self::make_request(request); self.client(task_id, None) .await? - .leave_task(request) + .delete_task(request) .await?; Ok(()) } @@ -255,13 +239,13 @@ impl SchedulerClient { Ok(()) } - // leave_host tells the scheduler that the host is leaving. + // delete_host tells the scheduler that the host is deleting. #[instrument(skip(self))] - pub async fn leave_host(&self, request: LeaveHostRequest) -> Result<()> { + pub async fn delete_host(&self, request: DeleteHostRequest) -> Result<()> { // Update scheduler addresses of the client. self.update_available_scheduler_addrs().await?; - // Leave the host from the scheduler. + // Delete the host from the scheduler. let mut join_set = JoinSet::new(); let available_scheduler_addrs = self.available_scheduler_addrs.read().await; let available_scheduler_addrs_clone = available_scheduler_addrs.clone(); @@ -269,11 +253,11 @@ impl SchedulerClient { for available_scheduler_addr in available_scheduler_addrs_clone.iter() { let request = Self::make_request(request.clone()); - async fn leave_host( + async fn delete_host( addr: SocketAddr, - request: tonic::Request<LeaveHostRequest>, + request: tonic::Request<DeleteHostRequest>, ) -> Result<()> { - info!("leave host from {}", addr); + info!("delete host from {}", addr); // Connect to the scheduler. 
let channel = Channel::from_shared(format!("http://{}", addr)) @@ -288,11 +272,11 @@ impl SchedulerClient { .or_err(ErrorType::ConnectError)?; let mut client = SchedulerGRPCClient::new(channel); - client.leave_host(request).await?; + client.delete_host(request).await?; Ok(()) } - join_set.spawn(leave_host(*available_scheduler_addr, request).in_current_span()); + join_set.spawn(delete_host(*available_scheduler_addr, request).in_current_span()); } while let Some(message) = join_set @@ -302,7 +286,7 @@ impl SchedulerClient { .or_err(ErrorType::AsyncRuntimeError)? { if let Err(err) = message { - error!("failed to leave host: {}", err); + error!("failed to delete host: {}", err); } } diff --git a/dragonfly-client/src/task/mod.rs b/dragonfly-client/src/task/mod.rs index a6d6dc24..c2ef7a2e 100644 --- a/dragonfly-client/src/task/mod.rs +++ b/dragonfly-client/src/task/mod.rs @@ -28,7 +28,7 @@ use dragonfly_api::scheduler::v2::{ DownloadPeerFailedRequest, DownloadPeerFinishedRequest, DownloadPeerStartedRequest, DownloadPieceBackToSourceFailedRequest, DownloadPieceBackToSourceFinishedRequest, DownloadPieceFailedRequest, DownloadPieceFinishedRequest, RegisterPeerRequest, - RescheduleRequest, + ReschedulePeerRequest, }; use dragonfly_client_backend::{BackendFactory, HeadRequest}; use dragonfly_client_config::dfdaemon::Config; @@ -727,23 +727,25 @@ impl Task { host_id: host_id.to_string(), task_id: task.id.clone(), peer_id: peer_id.to_string(), - request: Some(announce_peer_request::Request::RescheduleRequest( - RescheduleRequest { - candidate_parents: response.candidate_parents, - description: Some( - "not all pieces are downloaded from remote peer" - .to_string(), - ), - }, - )), + request: Some( + announce_peer_request::Request::ReschedulePeerRequest( + ReschedulePeerRequest { + candidate_parents: response.candidate_parents, + description: Some( + "not all pieces are downloaded from remote peer" + .to_string(), + ), + }, + ), + ), }, REQUEST_TIMEOUT, ) .await { - Ok(_) => 
info!("sent RescheduleRequest"), + Ok(_) => info!("sent ReschedulePeerRequest"), Err(err) => { - error!("send RescheduleRequest failed: {:?}", err); + error!("send ReschedulePeerRequest failed: {:?}", err); return Ok(finished_pieces); } };