feat: rename remote peer to parent (#901)

Signed-off-by: Gaius <gaius.qi@gmail.com>
Gaius authored 2024-12-17 16:19:28 +08:00; committed by GitHub
parent d6b613e333
commit c67f13e231
8 changed files with 61 additions and 61 deletions


@ -171,10 +171,10 @@ pub struct BackendError {
pub header: Option<reqwest::header::HeaderMap>,
}
/// DownloadFromRemotePeerFailed is the error when the download from remote peer is failed.
/// DownloadFromParentFailed is the error when the download from parent is failed.
#[derive(Debug, thiserror::Error)]
#[error("download piece {piece_number} from remote peer {parent_id} failed")]
pub struct DownloadFromRemotePeerFailed {
#[error("download piece {piece_number} from parent {parent_id} failed")]
pub struct DownloadFromParentFailed {
/// piece_number is the number of the piece.
pub piece_number: u32,
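
For reference, a minimal self-contained sketch of the renamed error type as it reads after this commit. The String type of parent_id is an inference from the parent.id.clone() usage later in this diff, and the main function is only illustrative:

use thiserror::Error;

/// Mirrors the renamed struct above; `parent_id` as String is inferred from
/// `parent.id.clone()` elsewhere in this commit, not stated in this hunk.
#[derive(Debug, Error)]
#[error("download piece {piece_number} from parent {parent_id} failed")]
pub struct DownloadFromParentFailed {
    /// piece_number is the number of the piece.
    pub piece_number: u32,
    /// parent_id is the id of the parent peer.
    pub parent_id: String,
}

fn main() {
    let err = DownloadFromParentFailed {
        piece_number: 3,
        parent_id: "parent-host-1".to_string(),
    };
    // thiserror derives Display from the #[error(...)] template, so this prints:
    // download piece 3 from parent parent-host-1 failed
    println!("{}", err);
}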


@ -21,7 +21,7 @@ pub use errors::ErrorType;
pub use errors::ExternalError;
pub use errors::OrErr;
pub use errors::{BackendError, DownloadFromRemotePeerFailed};
pub use errors::{BackendError, DownloadFromParentFailed};
/// DFError is the error for dragonfly.
#[derive(thiserror::Error, Debug)]
@ -70,9 +70,9 @@ pub enum DFError {
#[error{"available schedulers not found"}]
AvailableSchedulersNotFound,
/// DownloadFromRemotePeerFailed is the error when the download from remote peer is failed.
/// DownloadFromParentFailed is the error when the download from parent is failed.
#[error(transparent)]
DownloadFromRemotePeerFailed(DownloadFromRemotePeerFailed),
DownloadFromParentFailed(DownloadFromParentFailed),
/// ColumnFamilyNotFound is the error when the column family is not found.
#[error{"column family {0} not found"}]
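
To show how the renamed transparent variant behaves, here is a trimmed sketch: #[error(transparent)] forwards Display and source() to the inner DownloadFromParentFailed, and callers can still match on the variant to recover piece_number and parent_id, which is how the download loops later in this commit handle it. The stub types below are stand-ins so the example compiles on its own:

use thiserror::Error;

// Stand-in for the struct defined in errors.rs, re-declared so this sketch
// compiles alone.
#[derive(Debug, Error)]
#[error("download piece {piece_number} from parent {parent_id} failed")]
pub struct DownloadFromParentFailed {
    pub piece_number: u32,
    pub parent_id: String,
}

// Trimmed-down DFError keeping only the variants relevant to this hunk.
#[allow(dead_code)]
#[derive(Debug, Error)]
pub enum DFError {
    #[error("available schedulers not found")]
    AvailableSchedulersNotFound,

    #[error(transparent)]
    DownloadFromParentFailed(DownloadFromParentFailed),
}

fn main() {
    let err = DFError::DownloadFromParentFailed(DownloadFromParentFailed {
        piece_number: 7,
        parent_id: "parent-a".to_string(),
    });

    // Matching on the variant recovers the structured fields for logging or
    // rescheduling, as the task download loops do further down in this commit.
    if let DFError::DownloadFromParentFailed(inner) = &err {
        eprintln!("piece {} failed via parent {}", inner.piece_number, inner.parent_id);
    }

    // The transparent attribute forwards Display to the inner error.
    println!("{}", err);
}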


@ -326,9 +326,9 @@ impl Storage {
)
}
/// download_piece_from_remote_peer_finished is used for downloading piece from remote peer.
/// download_piece_from_parent_finished is used for downloading piece from parent.
#[instrument(skip_all)]
pub async fn download_piece_from_remote_peer_finished<R: AsyncRead + Unpin + ?Sized>(
pub async fn download_piece_from_parent_finished<R: AsyncRead + Unpin + ?Sized>(
&self,
piece_id: &str,
task_id: &str,
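
As a side note, a small sketch of what the R: AsyncRead + Unpin + ?Sized bound buys the renamed storage method: the finished-piece path can drain content from any async reader passed by mutable reference (a gRPC byte stream, a file, or an in-memory buffer). drain_piece below is a hypothetical stand-in, not the real Storage API:

use tokio::io::{AsyncRead, AsyncReadExt};

// Hypothetical stand-in for the storage write path; it only demonstrates the
// generic reader bound, not the real piece bookkeeping.
async fn drain_piece<R: AsyncRead + Unpin + ?Sized>(reader: &mut R) -> std::io::Result<u64> {
    let mut buf = vec![0u8; 64 * 1024];
    let mut total = 0u64;
    loop {
        let n = reader.read(&mut buf).await?;
        if n == 0 {
            break;
        }
        total += n as u64;
    }
    Ok(total)
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // &[u8] implements AsyncRead, so an in-memory buffer works for testing.
    let mut content: &[u8] = b"piece content";
    let read = drain_piece(&mut content).await?;
    println!("read {} bytes", read);
    Ok(())
}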


@ -629,7 +629,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
/// SyncPiecesStream is the stream of the sync pieces response.
type SyncPiecesStream = ReceiverStream<Result<SyncPiecesResponse, Status>>;
/// sync_pieces provides the piece metadata for remote peer.
/// sync_pieces provides the piece metadata for parent.
#[instrument(skip_all, fields(host_id, remote_host_id, task_id))]
async fn sync_pieces(
&self,
@ -760,7 +760,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
Ok(Response::new(ReceiverStream::new(out_stream_rx)))
}
/// download_piece provides the piece content for remote peer.
/// download_piece provides the piece content for parent.
#[instrument(skip_all, fields(host_id, remote_host_id, task_id, piece_id))]
async fn download_piece(
&self,
@ -1183,7 +1183,7 @@ impl DfdaemonUploadClient {
Ok(response)
}
/// sync_pieces provides the piece metadata for remote peer.
/// sync_pieces provides the piece metadata for parent.
#[instrument(skip_all)]
pub async fn sync_pieces(
&self,
@ -1194,7 +1194,7 @@ impl DfdaemonUploadClient {
Ok(response)
}
/// download_piece provides the piece content for remote peer.
/// download_piece provides the piece content for parent.
#[instrument(skip_all)]
pub async fn download_piece(
&self,
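
For context, a minimal sketch of the server-streaming shape visible in this file: the handler spawns a producer that pushes responses into an mpsc channel and returns the receiver wrapped in a ReceiverStream, which is what Ok(Response::new(ReceiverStream::new(out_stream_rx))) does in the real sync_pieces. PieceMetadata and the String error type below are stand-ins for the real SyncPiecesResponse and tonic::Status:

use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};

// Stand-in for the piece metadata carried by the real SyncPiecesResponse.
#[derive(Debug)]
struct PieceMetadata {
    number: u32,
}

fn sync_pieces_stream() -> ReceiverStream<Result<PieceMetadata, String>> {
    let (out_stream_tx, out_stream_rx) = mpsc::channel(10 * 1024);
    tokio::spawn(async move {
        for number in 0..3 {
            if out_stream_tx.send(Ok(PieceMetadata { number })).await.is_err() {
                // The receiving side hung up; stop producing.
                break;
            }
        }
    });
    ReceiverStream::new(out_stream_rx)
}

#[tokio::main]
async fn main() {
    let mut stream = sync_pieces_stream();
    while let Some(item) = stream.next().await {
        println!("{:?}", item);
    }
}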


@ -36,7 +36,7 @@ use dragonfly_api::scheduler::v2::{
};
use dragonfly_client_backend::BackendFactory;
use dragonfly_client_config::dfdaemon::Config;
use dragonfly_client_core::{error::DownloadFromRemotePeerFailed, Error};
use dragonfly_client_core::{error::DownloadFromParentFailed, Error};
use dragonfly_client_core::{
error::{ErrorType, OrErr},
Result as ClientResult,
@ -643,7 +643,7 @@ impl PersistentCacheTask {
announce_persistent_cache_peer_response::Response::NormalPersistentCacheTaskResponse(
response,
) => {
// If the persistent cache task is normal, download the pieces from the remote peer.
// If the persistent cache task is normal, download the pieces from the parent.
info!(
"normal persistent cache task response: {:?}",
response
@ -683,9 +683,9 @@ impl PersistentCacheTask {
interested_pieces.clone(),
);
// Download the pieces from the remote peer.
// Download the pieces from the parent.
let partial_finished_pieces = match self
.download_partial_with_scheduler_from_remote_peer(
.download_partial_with_scheduler_from_parent(
task,
host_id,
peer_id,
@ -698,7 +698,7 @@ impl PersistentCacheTask {
{
Ok(partial_finished_pieces) => {
info!(
"schedule {} finished {} pieces from remote peer",
"schedule {} finished {} pieces from parent",
schedule_count,
partial_finished_pieces.len()
);
@ -706,7 +706,7 @@ impl PersistentCacheTask {
partial_finished_pieces
}
Err(err) => {
error!("download from remote peer error: {:?}", err);
error!("download from parent error: {:?}", err);
return Ok(finished_pieces);
}
};
@ -761,7 +761,7 @@ impl PersistentCacheTask {
ReschedulePersistentCachePeerRequest {
candidate_parents: response.candidate_cache_parents,
description: Some(
"not all pieces are downloaded from remote peer"
"not all pieces are downloaded from parent"
.to_string(),
),
},
@ -787,10 +787,10 @@ impl PersistentCacheTask {
Ok(finished_pieces)
}
/// download_partial_with_scheduler_from_remote_peer downloads a partial persistent cache task with scheduler from a remote peer.
/// download_partial_with_scheduler_from_parent downloads a partial persistent cache task with scheduler from a parent.
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)]
async fn download_partial_with_scheduler_from_remote_peer(
async fn download_partial_with_scheduler_from_parent(
&self,
task: &metadata::PersistentCacheTask,
host_id: &str,
@ -823,9 +823,9 @@ impl PersistentCacheTask {
self.config.download.concurrent_piece_count as usize,
));
// Download the pieces from the remote peers.
// Download the pieces from the parents.
while let Some(collect_piece) = piece_collector_rx.recv().await {
async fn download_from_remote_peer(
async fn download_from_parent(
task_id: String,
host_id: String,
peer_id: String,
@ -843,13 +843,13 @@ impl PersistentCacheTask {
let piece_id = storage.piece_id(task_id.as_str(), number);
info!(
"start to download piece {} from remote peer {:?}",
"start to download piece {} from parent {:?}",
piece_id,
parent.id.clone()
);
let metadata = piece_manager
.download_from_remote_peer(
.download_from_parent(
peer_id.as_str(),
host_id.as_str(),
task_id.as_str(),
@ -861,12 +861,12 @@ impl PersistentCacheTask {
.await
.map_err(|err| {
error!(
"download piece {} from remote peer {:?} error: {:?}",
"download piece {} from parent {:?} error: {:?}",
piece_id,
parent.id.clone(),
err
);
Error::DownloadFromRemotePeerFailed(DownloadFromRemotePeerFailed {
Error::DownloadFromParentFailed(DownloadFromParentFailed {
piece_number: number,
parent_id: parent.id.clone(),
})
@ -932,7 +932,7 @@ impl PersistentCacheTask {
})?;
info!(
"finished piece {} from remote peer {:?}",
"finished piece {} from parent {:?}",
piece_id, metadata.parent_id
);
@ -940,7 +940,7 @@ impl PersistentCacheTask {
}
join_set.spawn(
download_from_remote_peer(
download_from_parent(
task.id.clone(),
host_id.to_string(),
peer_id.to_string(),
@ -972,9 +972,9 @@ impl PersistentCacheTask {
// Store the finished piece.
finished_pieces.push(metadata.clone());
}
Err(Error::DownloadFromRemotePeerFailed(err)) => {
Err(Error::DownloadFromParentFailed(err)) => {
error!(
"download piece {} from remote peer {} error: {:?}",
"download piece {} from parent {} error: {:?}",
self.storage.piece_id(task.id.as_str(), err.piece_number),
err.parent_id,
err
@ -1007,7 +1007,7 @@ impl PersistentCacheTask {
continue;
}
Err(err) => {
error!("download from remote peer error: {:?}", err);
error!("download from parent error: {:?}", err);
continue;
}
}
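
Summarizing the loop this file implements after the rename: pieces arriving from the collector are handed to a per-piece download_from_parent task, a semaphore caps in-flight downloads (concurrent_piece_count in the real config), and failures are matched so the log names the piece and the parent that failed. The sketch below mirrors that shape with simplified stand-in types; it is not the real dragonfly API:

use std::sync::Arc;
use tokio::{sync::Semaphore, task::JoinSet};

// Simplified stand-in for the error carried back to the scheduling loop.
#[derive(Debug)]
struct DownloadFromParentFailed {
    piece_number: u32,
    parent_id: String,
}

// Stand-in for the real per-piece download; even piece numbers "succeed".
async fn download_from_parent(
    piece_number: u32,
    parent_id: String,
    semaphore: Arc<Semaphore>,
) -> Result<u32, DownloadFromParentFailed> {
    // Limit the number of in-flight piece downloads.
    let _permit = semaphore.acquire().await.unwrap();
    if piece_number % 2 == 0 {
        Ok(piece_number)
    } else {
        Err(DownloadFromParentFailed { piece_number, parent_id })
    }
}

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(4));
    let mut join_set = JoinSet::new();
    for piece_number in 0..8u32 {
        join_set.spawn(download_from_parent(
            piece_number,
            "parent-a".to_string(),
            semaphore.clone(),
        ));
    }

    let mut finished_pieces = Vec::new();
    while let Some(message) = join_set.join_next().await {
        match message.unwrap() {
            Ok(piece_number) => finished_pieces.push(piece_number),
            Err(err) => eprintln!(
                "download piece {} from parent {} error",
                err.piece_number, err.parent_id
            ),
        }
    }
    println!("finished pieces: {:?}", finished_pieces);
}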


@ -389,10 +389,10 @@ impl Piece {
);
}
/// download_from_remote_peer downloads a single piece from a remote peer.
/// download_from_parent downloads a single piece from a parent.
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all, fields(piece_id))]
pub async fn download_from_remote_peer(
pub async fn download_from_parent(
&self,
piece_id: &str,
host_id: &str,
@ -455,7 +455,7 @@ impl Piece {
// Record the finish of downloading piece.
match self
.storage
.download_piece_from_remote_peer_finished(
.download_piece_from_parent_finished(
piece_id,
task_id,
offset,


@ -29,7 +29,7 @@ use tokio::task::JoinSet;
use tokio_stream::StreamExt;
use tracing::{error, info, instrument, Instrument};
/// CollectedParent is the parent peer collected from the remote peer.
/// CollectedParent is the parent peer collected from the parent.
#[derive(Clone, Debug)]
pub struct CollectedParent {
/// id is the id of the parent.
@ -110,7 +110,7 @@ impl PieceCollector {
let (collected_piece_tx, collected_piece_rx) = mpsc::channel(10 * 1024);
tokio::spawn(
async move {
Self::collect_from_remote_peers(
Self::collect_from_parents(
config,
&host_id,
&task_id,
@ -131,10 +131,10 @@ impl PieceCollector {
collected_piece_rx
}
/// collect_from_remote_peers collects pieces from remote peers.
/// collect_from_parents collects pieces from parents.
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)]
async fn collect_from_remote_peers(
async fn collect_from_parents(
config: Arc<Config>,
host_id: &str,
task_id: &str,
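
A small sketch of the collector shape shown here: run spawns a background task that asks each parent which pieces it has and forwards them through an mpsc channel, handing the receiver back to the download loop. CollectedPiece and the per-parent query below are simplified stand-ins, not the real PieceCollector:

use tokio::sync::mpsc;

// Stand-in for the collected piece metadata forwarded to the download loop.
#[derive(Debug)]
struct CollectedPiece {
    number: u32,
    parent_id: String,
}

fn run(parents: Vec<String>) -> mpsc::Receiver<CollectedPiece> {
    let (collected_piece_tx, collected_piece_rx) = mpsc::channel(10 * 1024);
    tokio::spawn(async move {
        // Stand-in for collect_from_parents: pretend each parent has pieces 0..3.
        for parent_id in parents {
            for number in 0..3u32 {
                let piece = CollectedPiece {
                    number,
                    parent_id: parent_id.clone(),
                };
                if collected_piece_tx.send(piece).await.is_err() {
                    // The download loop dropped the receiver; stop collecting.
                    return;
                }
            }
        }
    });
    collected_piece_rx
}

#[tokio::main]
async fn main() {
    let mut rx = run(vec!["parent-a".to_string(), "parent-b".to_string()]);
    while let Some(piece) = rx.recv().await {
        println!("collected piece {} from parent {}", piece.number, piece.parent_id);
    }
}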


@ -39,7 +39,7 @@ use dragonfly_api::scheduler::v2::{
use dragonfly_client_backend::{BackendFactory, HeadRequest};
use dragonfly_client_config::dfdaemon::Config;
use dragonfly_client_core::{
error::{BackendError, DownloadFromRemotePeerFailed, ErrorType, OrErr},
error::{BackendError, DownloadFromParentFailed, ErrorType, OrErr},
Error, Result as ClientResult,
};
use dragonfly_client_storage::{metadata, Storage};
@ -627,7 +627,7 @@ impl Task {
return Ok(Vec::new());
}
announce_peer_response::Response::NormalTaskResponse(response) => {
// If the task is normal, download the pieces from the remote peer.
// If the task is normal, download the pieces from the parent.
info!(
"normal task response: {:?}",
response
@ -667,9 +667,9 @@ impl Task {
interested_pieces.clone(),
);
// Download the pieces from the remote peer.
// Download the pieces from the parent.
let partial_finished_pieces = match self
.download_partial_with_scheduler_from_remote_peer(
.download_partial_with_scheduler_from_parent(
task,
host_id,
peer_id,
@ -683,7 +683,7 @@ impl Task {
{
Ok(partial_finished_pieces) => {
info!(
"schedule {} finished {} pieces from remote peer",
"schedule {} finished {} pieces from parent",
schedule_count,
partial_finished_pieces.len()
);
@ -691,7 +691,7 @@ impl Task {
partial_finished_pieces
}
Err(err) => {
error!("download from remote peer error: {:?}", err);
error!("download from parent error: {:?}", err);
return Ok(finished_pieces);
}
};
@ -747,7 +747,7 @@ impl Task {
ReschedulePeerRequest {
candidate_parents: response.candidate_parents,
description: Some(
"not all pieces are downloaded from remote peer"
"not all pieces are downloaded from parent"
.to_string(),
),
},
@ -907,10 +907,10 @@ impl Task {
Ok(finished_pieces)
}
/// download_partial_with_scheduler_from_remote_peer downloads a partial task with scheduler from a remote peer.
/// download_partial_with_scheduler_from_parent downloads a partial task with scheduler from a parent.
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)]
async fn download_partial_with_scheduler_from_remote_peer(
async fn download_partial_with_scheduler_from_parent(
&self,
task: &metadata::Task,
host_id: &str,
@ -940,7 +940,7 @@ impl Task {
);
let mut piece_collector_rx = piece_collector.run().await;
// Initialize the interrupt. If download from remote peer failed with scheduler or download
// Initialize the interrupt. If download from parent failed with scheduler or download
// progress, interrupt the collector and return the finished pieces.
let interrupt = Arc::new(AtomicBool::new(false));
@ -953,7 +953,7 @@ impl Task {
self.config.download.concurrent_piece_count as usize,
));
// Download the pieces from the remote peers.
// Download the pieces from the parents.
while let Some(collect_piece) = piece_collector_rx.recv().await {
if interrupt.load(Ordering::SeqCst) {
// If the interrupt is true, break the collector loop.
@ -962,7 +962,7 @@ impl Task {
break;
}
async fn download_from_remote_peer(
async fn download_from_parent(
task_id: String,
host_id: String,
peer_id: String,
@ -983,13 +983,13 @@ impl Task {
let piece_id = storage.piece_id(task_id.as_str(), number);
info!(
"start to download piece {} from remote peer {:?}",
"start to download piece {} from parent {:?}",
piece_id,
parent.id.clone()
);
let metadata = piece_manager
.download_from_remote_peer(
.download_from_parent(
piece_id.as_str(),
host_id.as_str(),
task_id.as_str(),
@ -1001,12 +1001,12 @@ impl Task {
.await
.map_err(|err| {
error!(
"download piece {} from remote peer {:?} error: {:?}",
"download piece {} from parent {:?} error: {:?}",
piece_id,
parent.id.clone(),
err
);
Error::DownloadFromRemotePeerFailed(DownloadFromRemotePeerFailed {
Error::DownloadFromParentFailed(DownloadFromParentFailed {
piece_number: number,
parent_id: parent.id.clone(),
})
@ -1080,7 +1080,7 @@ impl Task {
})?;
info!(
"finished piece {} from remote peer {:?}",
"finished piece {} from parent {:?}",
piece_id, metadata.parent_id
);
@ -1091,7 +1091,7 @@ impl Task {
}
join_set.spawn(
download_from_remote_peer(
download_from_parent(
task_id.to_string(),
host_id.to_string(),
peer_id.to_string(),
@ -1120,7 +1120,7 @@ impl Task {
{
match message {
Ok(_) => {}
Err(Error::DownloadFromRemotePeerFailed(err)) => {
Err(Error::DownloadFromParentFailed(err)) => {
let (piece_number, parent_id) = (err.piece_number, err.parent_id);
// Send the download piece failed request.
@ -1151,7 +1151,7 @@ impl Task {
)
});
// If the download failed from the remote peer, continue to download the next
// If the download failed from the parent, continue to download the next
// piece and ignore the error.
continue;
}
@ -1159,13 +1159,13 @@ impl Task {
join_set.detach_all();
// If the send timeout with scheduler or download progress, return the finished pieces.
// It will stop the download from the remote peer with scheduler
// It will stop the download from the parent with scheduler
// and download from the source directly from middle.
let finished_pieces = finished_pieces.lock().unwrap().clone();
return Ok(finished_pieces);
}
Err(err) => {
error!("download from remote peer error: {:?}", err);
error!("download from parent error: {:?}", err);
// If the unknown error occurred, continue to download the next piece and
// ignore the error.
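
Finally, a minimal sketch of the interrupt flag this file relies on: when sending to the scheduler or reporting download progress fails, a piece task sets the shared AtomicBool and the collector loop stops handing out further pieces, returning whatever already finished. The loop body below is a stand-in for the real per-piece download:

use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

fn main() {
    let interrupt = Arc::new(AtomicBool::new(false));
    let mut finished_pieces: Vec<u32> = Vec::new();

    for piece_number in 0..10u32 {
        if interrupt.load(Ordering::SeqCst) {
            // A previous piece failed in a way that should stop scheduling;
            // keep the pieces that already finished and bail out.
            break;
        }

        // Stand-in for a piece download; pretend piece 5 hits a fatal error.
        if piece_number == 5 {
            interrupt.store(true, Ordering::SeqCst);
            continue;
        }
        finished_pieces.push(piece_number);
    }

    println!("finished pieces before interrupt: {:?}", finished_pieces);
}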