diff --git a/dragonfly-client-core/src/error/errors.rs b/dragonfly-client-core/src/error/errors.rs index 32fd2197..8855f1fc 100644 --- a/dragonfly-client-core/src/error/errors.rs +++ b/dragonfly-client-core/src/error/errors.rs @@ -171,10 +171,10 @@ pub struct BackendError { pub header: Option, } -/// DownloadFromRemotePeerFailed is the error when the download from remote peer is failed. +/// DownloadFromParentFailed is the error when the download from parent is failed. #[derive(Debug, thiserror::Error)] -#[error("download piece {piece_number} from remote peer {parent_id} failed")] -pub struct DownloadFromRemotePeerFailed { +#[error("download piece {piece_number} from parent {parent_id} failed")] +pub struct DownloadFromParentFailed { /// piece_number is the number of the piece. pub piece_number: u32, diff --git a/dragonfly-client-core/src/error/mod.rs b/dragonfly-client-core/src/error/mod.rs index a8be7272..55f031d3 100644 --- a/dragonfly-client-core/src/error/mod.rs +++ b/dragonfly-client-core/src/error/mod.rs @@ -21,7 +21,7 @@ pub use errors::ErrorType; pub use errors::ExternalError; pub use errors::OrErr; -pub use errors::{BackendError, DownloadFromRemotePeerFailed}; +pub use errors::{BackendError, DownloadFromParentFailed}; /// DFError is the error for dragonfly. #[derive(thiserror::Error, Debug)] @@ -70,9 +70,9 @@ pub enum DFError { #[error{"available schedulers not found"}] AvailableSchedulersNotFound, - /// DownloadFromRemotePeerFailed is the error when the download from remote peer is failed. + /// DownloadFromParentFailed is the error when the download from parent is failed. #[error(transparent)] - DownloadFromRemotePeerFailed(DownloadFromRemotePeerFailed), + DownloadFromParentFailed(DownloadFromParentFailed), /// ColumnFamilyNotFound is the error when the column family is not found. #[error{"column family {0} not found"}] diff --git a/dragonfly-client-storage/src/lib.rs b/dragonfly-client-storage/src/lib.rs index e73c5ac3..b546294f 100644 --- a/dragonfly-client-storage/src/lib.rs +++ b/dragonfly-client-storage/src/lib.rs @@ -326,9 +326,9 @@ impl Storage { ) } - /// download_piece_from_remote_peer_finished is used for downloading piece from remote peer. + /// download_piece_from_parent_finished is used for downloading piece from parent. #[instrument(skip_all)] - pub async fn download_piece_from_remote_peer_finished( + pub async fn download_piece_from_parent_finished( &self, piece_id: &str, task_id: &str, diff --git a/dragonfly-client/src/grpc/dfdaemon_upload.rs b/dragonfly-client/src/grpc/dfdaemon_upload.rs index e559ae04..73b049aa 100644 --- a/dragonfly-client/src/grpc/dfdaemon_upload.rs +++ b/dragonfly-client/src/grpc/dfdaemon_upload.rs @@ -629,7 +629,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { /// SyncPiecesStream is the stream of the sync pieces response. type SyncPiecesStream = ReceiverStream>; - /// sync_pieces provides the piece metadata for remote peer. + /// sync_pieces provides the piece metadata for parent. #[instrument(skip_all, fields(host_id, remote_host_id, task_id))] async fn sync_pieces( &self, @@ -760,7 +760,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { Ok(Response::new(ReceiverStream::new(out_stream_rx))) } - /// download_piece provides the piece content for remote peer. + /// download_piece provides the piece content for parent. #[instrument(skip_all, fields(host_id, remote_host_id, task_id, piece_id))] async fn download_piece( &self, @@ -1183,7 +1183,7 @@ impl DfdaemonUploadClient { Ok(response) } - /// sync_pieces provides the piece metadata for remote peer. + /// sync_pieces provides the piece metadata for parent. #[instrument(skip_all)] pub async fn sync_pieces( &self, @@ -1194,7 +1194,7 @@ impl DfdaemonUploadClient { Ok(response) } - /// download_piece provides the piece content for remote peer. + /// download_piece provides the piece content for parent. #[instrument(skip_all)] pub async fn download_piece( &self, diff --git a/dragonfly-client/src/resource/persistent_cache_task.rs b/dragonfly-client/src/resource/persistent_cache_task.rs index 6753cbd7..e6c43027 100644 --- a/dragonfly-client/src/resource/persistent_cache_task.rs +++ b/dragonfly-client/src/resource/persistent_cache_task.rs @@ -36,7 +36,7 @@ use dragonfly_api::scheduler::v2::{ }; use dragonfly_client_backend::BackendFactory; use dragonfly_client_config::dfdaemon::Config; -use dragonfly_client_core::{error::DownloadFromRemotePeerFailed, Error}; +use dragonfly_client_core::{error::DownloadFromParentFailed, Error}; use dragonfly_client_core::{ error::{ErrorType, OrErr}, Result as ClientResult, @@ -643,7 +643,7 @@ impl PersistentCacheTask { announce_persistent_cache_peer_response::Response::NormalPersistentCacheTaskResponse( response, ) => { - // If the persistent cache task is normal, download the pieces from the remote peer. + // If the persistent cache task is normal, download the pieces from the parent. info!( "normal persistent cache task response: {:?}", response @@ -683,9 +683,9 @@ impl PersistentCacheTask { interested_pieces.clone(), ); - // Download the pieces from the remote peer. + // Download the pieces from the parent. let partial_finished_pieces = match self - .download_partial_with_scheduler_from_remote_peer( + .download_partial_with_scheduler_from_parent( task, host_id, peer_id, @@ -698,7 +698,7 @@ impl PersistentCacheTask { { Ok(partial_finished_pieces) => { info!( - "schedule {} finished {} pieces from remote peer", + "schedule {} finished {} pieces from parent", schedule_count, partial_finished_pieces.len() ); @@ -706,7 +706,7 @@ impl PersistentCacheTask { partial_finished_pieces } Err(err) => { - error!("download from remote peer error: {:?}", err); + error!("download from parent error: {:?}", err); return Ok(finished_pieces); } }; @@ -761,7 +761,7 @@ impl PersistentCacheTask { ReschedulePersistentCachePeerRequest { candidate_parents: response.candidate_cache_parents, description: Some( - "not all pieces are downloaded from remote peer" + "not all pieces are downloaded from parent" .to_string(), ), }, @@ -787,10 +787,10 @@ impl PersistentCacheTask { Ok(finished_pieces) } - /// download_partial_with_scheduler_from_remote_peer downloads a partial persistent cache task with scheduler from a remote peer. + /// download_partial_with_scheduler_from_parent downloads a partial persistent cache task with scheduler from a parent. #[allow(clippy::too_many_arguments)] #[instrument(skip_all)] - async fn download_partial_with_scheduler_from_remote_peer( + async fn download_partial_with_scheduler_from_parent( &self, task: &metadata::PersistentCacheTask, host_id: &str, @@ -823,9 +823,9 @@ impl PersistentCacheTask { self.config.download.concurrent_piece_count as usize, )); - // Download the pieces from the remote peers. + // Download the pieces from the parents. while let Some(collect_piece) = piece_collector_rx.recv().await { - async fn download_from_remote_peer( + async fn download_from_parent( task_id: String, host_id: String, peer_id: String, @@ -843,13 +843,13 @@ impl PersistentCacheTask { let piece_id = storage.piece_id(task_id.as_str(), number); info!( - "start to download piece {} from remote peer {:?}", + "start to download piece {} from parent {:?}", piece_id, parent.id.clone() ); let metadata = piece_manager - .download_from_remote_peer( + .download_from_parent( peer_id.as_str(), host_id.as_str(), task_id.as_str(), @@ -861,12 +861,12 @@ impl PersistentCacheTask { .await .map_err(|err| { error!( - "download piece {} from remote peer {:?} error: {:?}", + "download piece {} from parent {:?} error: {:?}", piece_id, parent.id.clone(), err ); - Error::DownloadFromRemotePeerFailed(DownloadFromRemotePeerFailed { + Error::DownloadFromParentFailed(DownloadFromParentFailed { piece_number: number, parent_id: parent.id.clone(), }) @@ -932,7 +932,7 @@ impl PersistentCacheTask { })?; info!( - "finished piece {} from remote peer {:?}", + "finished piece {} from parent {:?}", piece_id, metadata.parent_id ); @@ -940,7 +940,7 @@ impl PersistentCacheTask { } join_set.spawn( - download_from_remote_peer( + download_from_parent( task.id.clone(), host_id.to_string(), peer_id.to_string(), @@ -972,9 +972,9 @@ impl PersistentCacheTask { // Store the finished piece. finished_pieces.push(metadata.clone()); } - Err(Error::DownloadFromRemotePeerFailed(err)) => { + Err(Error::DownloadFromParentFailed(err)) => { error!( - "download piece {} from remote peer {} error: {:?}", + "download piece {} from parent {} error: {:?}", self.storage.piece_id(task.id.as_str(), err.piece_number), err.parent_id, err @@ -1007,7 +1007,7 @@ impl PersistentCacheTask { continue; } Err(err) => { - error!("download from remote peer error: {:?}", err); + error!("download from parent error: {:?}", err); continue; } } diff --git a/dragonfly-client/src/resource/piece.rs b/dragonfly-client/src/resource/piece.rs index 6235041c..b358b2d2 100644 --- a/dragonfly-client/src/resource/piece.rs +++ b/dragonfly-client/src/resource/piece.rs @@ -389,10 +389,10 @@ impl Piece { ); } - /// download_from_remote_peer downloads a single piece from a remote peer. + /// download_from_parent downloads a single piece from a parent. #[allow(clippy::too_many_arguments)] #[instrument(skip_all, fields(piece_id))] - pub async fn download_from_remote_peer( + pub async fn download_from_parent( &self, piece_id: &str, host_id: &str, @@ -455,7 +455,7 @@ impl Piece { // Record the finish of downloading piece. match self .storage - .download_piece_from_remote_peer_finished( + .download_piece_from_parent_finished( piece_id, task_id, offset, diff --git a/dragonfly-client/src/resource/piece_collector.rs b/dragonfly-client/src/resource/piece_collector.rs index aca60227..0d89fb07 100644 --- a/dragonfly-client/src/resource/piece_collector.rs +++ b/dragonfly-client/src/resource/piece_collector.rs @@ -29,7 +29,7 @@ use tokio::task::JoinSet; use tokio_stream::StreamExt; use tracing::{error, info, instrument, Instrument}; -/// CollectedParent is the parent peer collected from the remote peer. +/// CollectedParent is the parent peer collected from the parent. #[derive(Clone, Debug)] pub struct CollectedParent { /// id is the id of the parent. @@ -110,7 +110,7 @@ impl PieceCollector { let (collected_piece_tx, collected_piece_rx) = mpsc::channel(10 * 1024); tokio::spawn( async move { - Self::collect_from_remote_peers( + Self::collect_from_parents( config, &host_id, &task_id, @@ -131,10 +131,10 @@ impl PieceCollector { collected_piece_rx } - /// collect_from_remote_peers collects pieces from remote peers. + /// collect_from_parents collects pieces from parents. #[allow(clippy::too_many_arguments)] #[instrument(skip_all)] - async fn collect_from_remote_peers( + async fn collect_from_parents( config: Arc, host_id: &str, task_id: &str, diff --git a/dragonfly-client/src/resource/task.rs b/dragonfly-client/src/resource/task.rs index 6137e05c..0e5dadec 100644 --- a/dragonfly-client/src/resource/task.rs +++ b/dragonfly-client/src/resource/task.rs @@ -39,7 +39,7 @@ use dragonfly_api::scheduler::v2::{ use dragonfly_client_backend::{BackendFactory, HeadRequest}; use dragonfly_client_config::dfdaemon::Config; use dragonfly_client_core::{ - error::{BackendError, DownloadFromRemotePeerFailed, ErrorType, OrErr}, + error::{BackendError, DownloadFromParentFailed, ErrorType, OrErr}, Error, Result as ClientResult, }; use dragonfly_client_storage::{metadata, Storage}; @@ -627,7 +627,7 @@ impl Task { return Ok(Vec::new()); } announce_peer_response::Response::NormalTaskResponse(response) => { - // If the task is normal, download the pieces from the remote peer. + // If the task is normal, download the pieces from the parent. info!( "normal task response: {:?}", response @@ -667,9 +667,9 @@ impl Task { interested_pieces.clone(), ); - // Download the pieces from the remote peer. + // Download the pieces from the parent. let partial_finished_pieces = match self - .download_partial_with_scheduler_from_remote_peer( + .download_partial_with_scheduler_from_parent( task, host_id, peer_id, @@ -683,7 +683,7 @@ impl Task { { Ok(partial_finished_pieces) => { info!( - "schedule {} finished {} pieces from remote peer", + "schedule {} finished {} pieces from parent", schedule_count, partial_finished_pieces.len() ); @@ -691,7 +691,7 @@ impl Task { partial_finished_pieces } Err(err) => { - error!("download from remote peer error: {:?}", err); + error!("download from parent error: {:?}", err); return Ok(finished_pieces); } }; @@ -747,7 +747,7 @@ impl Task { ReschedulePeerRequest { candidate_parents: response.candidate_parents, description: Some( - "not all pieces are downloaded from remote peer" + "not all pieces are downloaded from parent" .to_string(), ), }, @@ -907,10 +907,10 @@ impl Task { Ok(finished_pieces) } - /// download_partial_with_scheduler_from_remote_peer downloads a partial task with scheduler from a remote peer. + /// download_partial_with_scheduler_from_parent downloads a partial task with scheduler from a parent. #[allow(clippy::too_many_arguments)] #[instrument(skip_all)] - async fn download_partial_with_scheduler_from_remote_peer( + async fn download_partial_with_scheduler_from_parent( &self, task: &metadata::Task, host_id: &str, @@ -940,7 +940,7 @@ impl Task { ); let mut piece_collector_rx = piece_collector.run().await; - // Initialize the interrupt. If download from remote peer failed with scheduler or download + // Initialize the interrupt. If download from parent failed with scheduler or download // progress, interrupt the collector and return the finished pieces. let interrupt = Arc::new(AtomicBool::new(false)); @@ -953,7 +953,7 @@ impl Task { self.config.download.concurrent_piece_count as usize, )); - // Download the pieces from the remote peers. + // Download the pieces from the parents. while let Some(collect_piece) = piece_collector_rx.recv().await { if interrupt.load(Ordering::SeqCst) { // If the interrupt is true, break the collector loop. @@ -962,7 +962,7 @@ impl Task { break; } - async fn download_from_remote_peer( + async fn download_from_parent( task_id: String, host_id: String, peer_id: String, @@ -983,13 +983,13 @@ impl Task { let piece_id = storage.piece_id(task_id.as_str(), number); info!( - "start to download piece {} from remote peer {:?}", + "start to download piece {} from parent {:?}", piece_id, parent.id.clone() ); let metadata = piece_manager - .download_from_remote_peer( + .download_from_parent( piece_id.as_str(), host_id.as_str(), task_id.as_str(), @@ -1001,12 +1001,12 @@ impl Task { .await .map_err(|err| { error!( - "download piece {} from remote peer {:?} error: {:?}", + "download piece {} from parent {:?} error: {:?}", piece_id, parent.id.clone(), err ); - Error::DownloadFromRemotePeerFailed(DownloadFromRemotePeerFailed { + Error::DownloadFromParentFailed(DownloadFromParentFailed { piece_number: number, parent_id: parent.id.clone(), }) @@ -1080,7 +1080,7 @@ impl Task { })?; info!( - "finished piece {} from remote peer {:?}", + "finished piece {} from parent {:?}", piece_id, metadata.parent_id ); @@ -1091,7 +1091,7 @@ impl Task { } join_set.spawn( - download_from_remote_peer( + download_from_parent( task_id.to_string(), host_id.to_string(), peer_id.to_string(), @@ -1120,7 +1120,7 @@ impl Task { { match message { Ok(_) => {} - Err(Error::DownloadFromRemotePeerFailed(err)) => { + Err(Error::DownloadFromParentFailed(err)) => { let (piece_number, parent_id) = (err.piece_number, err.parent_id); // Send the download piece failed request. @@ -1151,7 +1151,7 @@ impl Task { ) }); - // If the download failed from the remote peer, continue to download the next + // If the download failed from the parent, continue to download the next // piece and ignore the error. continue; } @@ -1159,13 +1159,13 @@ impl Task { join_set.detach_all(); // If the send timeout with scheduler or download progress, return the finished pieces. - // It will stop the download from the remote peer with scheduler + // It will stop the download from the parent with scheduler // and download from the source directly from middle. let finished_pieces = finished_pieces.lock().unwrap().clone(); return Ok(finished_pieces); } Err(err) => { - error!("download from remote peer error: {:?}", err); + error!("download from parent error: {:?}", err); // If the unknown error occurred, continue to download the next piece and // ignore the error.