refactor(downloader): streamline download concurrency handling (#1341)

* style(error): update error message formatting

Signed-off-by: Gaius <gaius.qi@gmail.com>

* refactor(downloader): streamline download concurrency handling

Signed-off-by: Gaius <gaius.qi@gmail.com>

---------

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2025-09-09 15:57:00 +08:00 committed by GitHub
parent af354b7e59
commit 79fa36ca97
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 170 additions and 157 deletions

16
Cargo.lock generated
View File

@ -986,7 +986,7 @@ dependencies = [
[[package]]
name = "dragonfly-client"
version = "1.0.17"
version = "1.0.18"
dependencies = [
"anyhow",
"bytes",
@ -1059,7 +1059,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-backend"
version = "1.0.17"
version = "1.0.18"
dependencies = [
"dragonfly-api",
"dragonfly-client-core",
@ -1090,7 +1090,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-config"
version = "1.0.17"
version = "1.0.18"
dependencies = [
"bytesize",
"bytesize-serde",
@ -1120,7 +1120,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-core"
version = "1.0.17"
version = "1.0.18"
dependencies = [
"headers 0.4.1",
"hyper 1.6.0",
@ -1138,7 +1138,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-init"
version = "1.0.17"
version = "1.0.18"
dependencies = [
"anyhow",
"clap",
@ -1155,7 +1155,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-storage"
version = "1.0.17"
version = "1.0.18"
dependencies = [
"bincode",
"bytes",
@ -1182,7 +1182,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-util"
version = "1.0.17"
version = "1.0.18"
dependencies = [
"base64 0.22.1",
"bytesize",
@ -1593,7 +1593,7 @@ dependencies = [
[[package]]
name = "hdfs"
version = "1.0.17"
version = "1.0.18"
dependencies = [
"dragonfly-client-backend",
"dragonfly-client-core",

View File

@ -12,7 +12,7 @@ members = [
]
[workspace.package]
version = "1.0.17"
version = "1.0.18"
authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git"
@ -22,13 +22,13 @@ readme = "README.md"
edition = "2021"
[workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "1.0.17" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "1.0.17" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "1.0.17" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "1.0.17" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "1.0.17" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "1.0.17" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "1.0.17" }
dragonfly-client = { path = "dragonfly-client", version = "1.0.18" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "1.0.18" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "1.0.18" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "1.0.18" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "1.0.18" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "1.0.18" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "1.0.18" }
dragonfly-api = "=2.1.62"
thiserror = "2.0"
futures = "0.3.31"

View File

@ -926,7 +926,7 @@ mod tests {
assert!(result.is_err());
assert_eq!(
result.unwrap_err().to_string(),
"backend error s3 need object_storage parameter"
"backend error: s3 need object_storage parameter"
)
}
@ -935,28 +935,28 @@ mod tests {
let test_cases = vec![
(
ObjectStorageInfo::default(),
"backend error s3 need access_key_id, access_key_secret, region",
"backend error: s3 need access_key_id, access_key_secret, region",
),
(
ObjectStorageInfo {
access_key_id: Some("access_key_id".into()),
..Default::default()
},
"backend error s3 need access_key_secret, region",
"backend error: s3 need access_key_secret, region",
),
(
ObjectStorageInfo {
access_key_secret: Some("access_key_secret".into()),
..Default::default()
},
"backend error s3 need access_key_id, region",
"backend error: s3 need access_key_id, region",
),
(
ObjectStorageInfo {
region: Some("test-region".into()),
..Default::default()
},
"backend error s3 need access_key_id, access_key_secret",
"backend error: s3 need access_key_id, access_key_secret",
),
(
ObjectStorageInfo {
@ -964,7 +964,7 @@ mod tests {
access_key_secret: Some("access_key_secret".into()),
..Default::default()
},
"backend error s3 need region",
"backend error: s3 need region",
),
(
ObjectStorageInfo {
@ -972,7 +972,7 @@ mod tests {
region: Some("test-region".into()),
..Default::default()
},
"backend error s3 need access_key_secret",
"backend error: s3 need access_key_secret",
),
(
ObjectStorageInfo {
@ -980,7 +980,7 @@ mod tests {
region: Some("test-region".into()),
..Default::default()
},
"backend error s3 need access_key_id",
"backend error: s3 need access_key_id",
),
];
@ -1004,28 +1004,28 @@ mod tests {
let test_cases = vec![
(
ObjectStorageInfo::default(),
"backend error abs need access_key_id, access_key_secret, endpoint",
"backend error: abs need access_key_id, access_key_secret, endpoint",
),
(
ObjectStorageInfo {
access_key_id: Some("access_key_id".into()),
..Default::default()
},
"backend error abs need access_key_secret, endpoint",
"backend error: abs need access_key_secret, endpoint",
),
(
ObjectStorageInfo {
access_key_secret: Some("access_key_secret".into()),
..Default::default()
},
"backend error abs need access_key_id, endpoint",
"backend error: abs need access_key_id, endpoint",
),
(
ObjectStorageInfo {
endpoint: Some("test-endpoint.local".into()),
..Default::default()
},
"backend error abs need access_key_id, access_key_secret",
"backend error: abs need access_key_id, access_key_secret",
),
(
ObjectStorageInfo {
@ -1033,7 +1033,7 @@ mod tests {
access_key_secret: Some("access_key_secret".into()),
..Default::default()
},
"backend error abs need endpoint",
"backend error: abs need endpoint",
),
(
ObjectStorageInfo {
@ -1041,7 +1041,7 @@ mod tests {
endpoint: Some("test-endpoint.local".into()),
..Default::default()
},
"backend error abs need access_key_secret",
"backend error: abs need access_key_secret",
),
(
ObjectStorageInfo {
@ -1049,7 +1049,7 @@ mod tests {
endpoint: Some("test-endpoint.local".into()),
..Default::default()
},
"backend error abs need access_key_id",
"backend error: abs need access_key_id",
),
];
@ -1073,28 +1073,28 @@ mod tests {
let test_cases = vec![
(
ObjectStorageInfo::default(),
"backend error oss need access_key_id, access_key_secret, endpoint",
"backend error: oss need access_key_id, access_key_secret, endpoint",
),
(
ObjectStorageInfo {
access_key_id: Some("access_key_id".into()),
..Default::default()
},
"backend error oss need access_key_secret, endpoint",
"backend error: oss need access_key_secret, endpoint",
),
(
ObjectStorageInfo {
access_key_secret: Some("access_key_secret".into()),
..Default::default()
},
"backend error oss need access_key_id, endpoint",
"backend error: oss need access_key_id, endpoint",
),
(
ObjectStorageInfo {
endpoint: Some("test-endpoint.local".into()),
..Default::default()
},
"backend error oss need access_key_id, access_key_secret",
"backend error: oss need access_key_id, access_key_secret",
),
(
ObjectStorageInfo {
@ -1102,7 +1102,7 @@ mod tests {
access_key_secret: Some("access_key_secret".into()),
..Default::default()
},
"backend error oss need endpoint",
"backend error: oss need endpoint",
),
(
ObjectStorageInfo {
@ -1110,7 +1110,7 @@ mod tests {
endpoint: Some("test-endpoint.local".into()),
..Default::default()
},
"backend error oss need access_key_secret",
"backend error: oss need access_key_secret",
),
(
ObjectStorageInfo {
@ -1118,7 +1118,7 @@ mod tests {
endpoint: Some("test-endpoint.local".into()),
..Default::default()
},
"backend error oss need access_key_id",
"backend error: oss need access_key_id",
),
];
@ -1142,28 +1142,28 @@ mod tests {
let test_cases = vec![
(
ObjectStorageInfo::default(),
"backend error obs need access_key_id, access_key_secret, endpoint",
"backend error: obs need access_key_id, access_key_secret, endpoint",
),
(
ObjectStorageInfo {
access_key_id: Some("access_key_id".into()),
..Default::default()
},
"backend error obs need access_key_secret, endpoint",
"backend error: obs need access_key_secret, endpoint",
),
(
ObjectStorageInfo {
access_key_secret: Some("access_key_secret".into()),
..Default::default()
},
"backend error obs need access_key_id, endpoint",
"backend error: obs need access_key_id, endpoint",
),
(
ObjectStorageInfo {
endpoint: Some("test-endpoint.local".into()),
..Default::default()
},
"backend error obs need access_key_id, access_key_secret",
"backend error: obs need access_key_id, access_key_secret",
),
(
ObjectStorageInfo {
@ -1171,7 +1171,7 @@ mod tests {
access_key_secret: Some("access_key_secret".into()),
..Default::default()
},
"backend error obs need endpoint",
"backend error: obs need endpoint",
),
(
ObjectStorageInfo {
@ -1179,7 +1179,7 @@ mod tests {
endpoint: Some("test-endpoint.local".into()),
..Default::default()
},
"backend error obs need access_key_secret",
"backend error: obs need access_key_secret",
),
(
ObjectStorageInfo {
@ -1187,7 +1187,7 @@ mod tests {
endpoint: Some("test-endpoint.local".into()),
..Default::default()
},
"backend error obs need access_key_id",
"backend error: obs need access_key_id",
),
];
@ -1211,28 +1211,28 @@ mod tests {
let test_cases = vec![
(
ObjectStorageInfo::default(),
"backend error cos need access_key_id, access_key_secret, endpoint",
"backend error: cos need access_key_id, access_key_secret, endpoint",
),
(
ObjectStorageInfo {
access_key_id: Some("access_key_id".into()),
..Default::default()
},
"backend error cos need access_key_secret, endpoint",
"backend error: cos need access_key_secret, endpoint",
),
(
ObjectStorageInfo {
access_key_secret: Some("access_key_secret".into()),
..Default::default()
},
"backend error cos need access_key_id, endpoint",
"backend error: cos need access_key_id, endpoint",
),
(
ObjectStorageInfo {
endpoint: Some("test-endpoint.local".into()),
..Default::default()
},
"backend error cos need access_key_id, access_key_secret",
"backend error: cos need access_key_id, access_key_secret",
),
(
ObjectStorageInfo {
@ -1240,7 +1240,7 @@ mod tests {
access_key_secret: Some("access_key_secret".into()),
..Default::default()
},
"backend error cos need endpoint",
"backend error: cos need endpoint",
),
(
ObjectStorageInfo {
@ -1248,7 +1248,7 @@ mod tests {
endpoint: Some("test-endpoint.local".into()),
..Default::default()
},
"backend error cos need access_key_secret",
"backend error: cos need access_key_secret",
),
(
ObjectStorageInfo {
@ -1256,7 +1256,7 @@ mod tests {
endpoint: Some("test-endpoint.local".into()),
..Default::default()
},
"backend error cos need access_key_id",
"backend error: cos need access_key_id",
),
];

View File

@ -159,7 +159,7 @@ impl<T, E> OrErr<T, E> for Result<T, E> {
/// BackendError is the error for backend.
#[derive(Debug, thiserror::Error)]
#[error("backend error {message}")]
#[error("backend error: {message}")]
pub struct BackendError {
/// message is the error message.
pub message: String,

View File

@ -16,6 +16,8 @@
use dragonfly_client_core::Result;
use tokio::fs;
#[cfg(target_os = "linux")]
use tracing::warn;
/// fallocate allocates the space for the file and fills it with zero, only on Linux.

View File

@ -201,7 +201,7 @@ struct Args {
short = 'I',
long = "include-files",
required = false,
help = "Filter files to download in a directory using glob patterns relative to the root URL's path. Examples: --include-files='file.txt' --include-files='subdir/file.txt' --include-files='subdir/dir/'"
help = "Filter files to download in a directory using glob patterns relative to the root URL's path. Examples: --include-files file.txt --include-files subdir/file.txt --include-files subdir/dir/"
)]
include_files: Option<Vec<String>>,
@ -264,7 +264,7 @@ struct Args {
#[arg(
long,
default_value_t = 5,
default_value_t = 1,
help = "Specify the max count of concurrent download files when downloading a directory"
)]
max_concurrent_requests: usize,
@ -709,23 +709,12 @@ async fn download_dir(args: Args, download_client: DfdaemonDownloadClient) -> Re
entry_args.url = entry_url;
let progress_bar = multi_progress_bar.add(ProgressBar::new(0));
async fn download_entry(
args: Args,
progress_bar: ProgressBar,
download_client: DfdaemonDownloadClient,
semaphore: Arc<Semaphore>,
) -> Result<()> {
// Limit the concurrent download tasks.
let _permit = semaphore.acquire().await.unwrap();
download(args, progress_bar, download_client).await
}
join_set.spawn(download_entry(
entry_args,
progress_bar,
download_client.clone(),
semaphore.clone(),
));
let download_client = download_client.clone();
let permit = semaphore.clone().acquire_owned().await.unwrap();
join_set.spawn(async move {
let _permit = permit;
download(entry_args, progress_bar, download_client).await
});
}
}
@ -740,7 +729,7 @@ async fn download_dir(args: Args, download_client: DfdaemonDownloadClient) -> Re
Ok(_) => continue,
Err(err) => {
error!("download entry failed: {}", err);
join_set.abort_all();
join_set.shutdown().await;
return Err(err);
}
}

View File

@ -272,7 +272,7 @@ impl PersistentCacheTask {
finished_pieces.push(metadata.clone());
}
Err(err) => {
join_set.detach_all();
join_set.shutdown().await;
// Delete the persistent cache task.
self.storage
@ -1042,8 +1042,6 @@ impl PersistentCacheTask {
let semaphore = Arc::new(Semaphore::new(
self.config.download.concurrent_piece_count as usize,
));
// Download the pieces from the parents.
while let Some(collect_piece) = piece_collector_rx.recv().await {
if interrupt.load(Ordering::SeqCst) {
// If the interrupt is true, break the collector loop.
@ -1061,15 +1059,11 @@ impl PersistentCacheTask {
need_piece_content: bool,
parent: piece_collector::CollectedParent,
piece_manager: Arc<super::piece::Piece>,
semaphore: Arc<Semaphore>,
download_progress_tx: Sender<Result<DownloadPersistentCacheTaskResponse, Status>>,
in_stream_tx: Sender<AnnouncePersistentCachePeerRequest>,
interrupt: Arc<AtomicBool>,
finished_pieces: Arc<Mutex<Vec<metadata::Piece>>>,
) -> ClientResult<metadata::Piece> {
// Limit the concurrent download count.
let _permit = semaphore.acquire().await.unwrap();
let piece_id = piece_manager.persistent_cache_id(task_id.as_str(), number);
info!(
"start to download persistent cache piece {} from parent {:?}",
@ -1203,24 +1197,34 @@ impl PersistentCacheTask {
Ok(metadata)
}
join_set.spawn(
let task_id = task_id.to_string();
let host_id = host_id.to_string();
let peer_id = peer_id.to_string();
let piece_manager = self.piece.clone();
let download_progress_tx = download_progress_tx.clone();
let in_stream_tx = in_stream_tx.clone();
let interrupt = interrupt.clone();
let finished_pieces = finished_pieces.clone();
let permit = semaphore.clone().acquire_owned().await.unwrap();
join_set.spawn(async move {
let _permit = permit;
download_from_parent(
task.id.clone(),
host_id.to_string(),
peer_id.to_string(),
task_id,
host_id,
peer_id,
collect_piece.number,
collect_piece.length,
need_piece_content,
collect_piece.parent.clone(),
self.piece.clone(),
semaphore.clone(),
download_progress_tx.clone(),
in_stream_tx.clone(),
interrupt.clone(),
finished_pieces.clone(),
piece_manager,
download_progress_tx,
in_stream_tx,
interrupt,
finished_pieces,
)
.in_current_span(),
);
.in_current_span()
.await
});
}
// Wait for the pieces to be downloaded.
@ -1233,7 +1237,7 @@ impl PersistentCacheTask {
match message {
Ok(_) => {}
Err(Error::DownloadFromParentFailed(err)) => {
join_set.detach_all();
join_set.shutdown().await;
// Send the download piece failed request.
let (piece_number, parent_id) = (err.piece_number, err.parent_id.clone());
@ -1263,14 +1267,14 @@ impl PersistentCacheTask {
return Err(Error::DownloadFromParentFailed(err));
}
Err(Error::SendTimeout) => {
join_set.detach_all();
join_set.shutdown().await;
// If the send timeout with scheduler or download progress, return the error
// and interrupt the collector.
return Err(Error::SendTimeout);
}
Err(err) => {
join_set.detach_all();
join_set.shutdown().await;
error!("download from parent error: {:?}", err);
return Err(err);
}

View File

@ -290,7 +290,7 @@ impl PieceCollector {
// If all pieces are collected, abort all tasks.
if collected_pieces.is_empty() {
info!("all pieces are collected, abort all tasks");
join_set.abort_all();
join_set.shutdown().await;
}
}
Ok(Err(err)) => {
@ -550,7 +550,7 @@ impl PersistentCachePieceCollector {
// If all pieces are collected, abort all tasks.
if collected_pieces.is_empty() {
info!("all persistent cache pieces are collected, abort all tasks");
join_set.abort_all();
join_set.shutdown().await;
}
}
Ok(Err(err)) => {

View File

@ -1012,8 +1012,6 @@ impl Task {
let semaphore = Arc::new(Semaphore::new(
self.config.download.concurrent_piece_count as usize,
));
// Download the pieces from the parents.
while let Some(collect_piece) = piece_collector_rx.recv().await {
if interrupt.load(Ordering::SeqCst) {
// If the interrupt is true, break the collector loop.
@ -1030,7 +1028,6 @@ impl Task {
length: u64,
parent: piece_collector::CollectedParent,
piece_manager: Arc<piece::Piece>,
semaphore: Arc<Semaphore>,
download_progress_tx: Sender<Result<DownloadTaskResponse, Status>>,
in_stream_tx: Sender<AnnouncePeerRequest>,
interrupt: Arc<AtomicBool>,
@ -1038,9 +1035,6 @@ impl Task {
is_prefetch: bool,
need_piece_content: bool,
) -> ClientResult<metadata::Piece> {
// Limit the concurrent piece count.
let _permit = semaphore.acquire().await.unwrap();
let piece_id = piece_manager.id(task_id.as_str(), number);
info!(
"start to download piece {} from parent {:?}",
@ -1174,25 +1168,35 @@ impl Task {
Ok(metadata)
}
join_set.spawn(
let task_id = task_id.to_string();
let host_id = host_id.to_string();
let peer_id = peer_id.to_string();
let piece_manager = self.piece.clone();
let download_progress_tx = download_progress_tx.clone();
let in_stream_tx = in_stream_tx.clone();
let interrupt = interrupt.clone();
let finished_pieces = finished_pieces.clone();
let permit = semaphore.clone().acquire_owned().await.unwrap();
join_set.spawn(async move {
let _permit = permit;
download_from_parent(
task_id.to_string(),
host_id.to_string(),
peer_id.to_string(),
task_id,
host_id,
peer_id,
collect_piece.number,
collect_piece.length,
collect_piece.parent.clone(),
self.piece.clone(),
semaphore.clone(),
download_progress_tx.clone(),
in_stream_tx.clone(),
interrupt.clone(),
finished_pieces.clone(),
piece_manager,
download_progress_tx,
in_stream_tx,
interrupt,
finished_pieces,
is_prefetch,
need_piece_content,
)
.in_current_span(),
);
.in_current_span()
.await
});
}
// Wait for the pieces to be downloaded.
@ -1240,7 +1244,7 @@ impl Task {
continue;
}
Err(Error::SendTimeout) => {
join_set.detach_all();
join_set.shutdown().await;
// If the send timeout with scheduler or download progress, return the finished pieces.
// It will stop the download from the parent with scheduler
@ -1304,15 +1308,11 @@ impl Task {
is_prefetch: bool,
need_piece_content: bool,
piece_manager: Arc<piece::Piece>,
semaphore: Arc<Semaphore>,
download_progress_tx: Sender<Result<DownloadTaskResponse, Status>>,
in_stream_tx: Sender<AnnouncePeerRequest>,
object_storage: Option<ObjectStorage>,
hdfs: Option<Hdfs>,
) -> ClientResult<metadata::Piece> {
// Limit the concurrent download count.
let _permit = semaphore.acquire().await.unwrap();
let piece_id = piece_manager.id(task_id.as_str(), number);
info!("start to download piece {} from source", piece_id);
@ -1418,27 +1418,39 @@ impl Task {
Ok(metadata)
}
join_set.spawn(
let task_id = task_id.to_string();
let host_id = host_id.to_string();
let peer_id = peer_id.to_string();
let url = request.url.clone();
let request_header = request_header.clone();
let piece_manager = self.piece.clone();
let download_progress_tx = download_progress_tx.clone();
let in_stream_tx = in_stream_tx.clone();
let object_storage = request.object_storage.clone();
let hdfs = request.hdfs.clone();
let permit = semaphore.clone().acquire_owned().await.unwrap();
join_set.spawn(async move {
let _permit = permit;
download_from_source(
task_id.to_string(),
host_id.to_string(),
peer_id.to_string(),
task_id,
host_id,
peer_id,
interested_piece.number,
request.url.clone(),
url,
interested_piece.offset,
interested_piece.length,
request_header.clone(),
request_header,
request.is_prefetch,
request.need_piece_content,
self.piece.clone(),
semaphore.clone(),
download_progress_tx.clone(),
in_stream_tx.clone(),
request.object_storage.clone(),
request.hdfs.clone(),
piece_manager,
download_progress_tx,
in_stream_tx,
object_storage,
hdfs,
)
.in_current_span(),
);
.in_current_span()
.await
});
}
// Wait for the pieces to be downloaded.
@ -1454,7 +1466,7 @@ impl Task {
finished_pieces.push(metadata.clone());
}
Err(Error::BackendError(err)) => {
join_set.detach_all();
join_set.shutdown().await;
// Send the download piece http failed request.
in_stream_tx.send_timeout(AnnouncePeerRequest {
@ -1483,7 +1495,7 @@ impl Task {
return Err(Error::BackendError(err));
}
Err(Error::SendTimeout) => {
join_set.detach_all();
join_set.shutdown().await;
// Send the download piece failed request.
in_stream_tx.send_timeout(AnnouncePeerRequest {
@ -1510,7 +1522,7 @@ impl Task {
return Ok(finished_pieces);
}
Err(err) => {
join_set.detach_all();
join_set.shutdown().await;
// Send the download piece failed request.
in_stream_tx.send_timeout(AnnouncePeerRequest {
@ -1684,8 +1696,7 @@ impl Task {
let semaphore = Arc::new(Semaphore::new(
self.config.download.concurrent_piece_count as usize,
));
for interested_piece in &interested_pieces {
for interested_piece in interested_pieces.clone() {
async fn download_from_source(
task_id: String,
host_id: String,
@ -1698,14 +1709,10 @@ impl Task {
is_prefetch: bool,
need_piece_content: bool,
piece_manager: Arc<piece::Piece>,
semaphore: Arc<Semaphore>,
download_progress_tx: Sender<Result<DownloadTaskResponse, Status>>,
object_storage: Option<ObjectStorage>,
hdfs: Option<Hdfs>,
) -> ClientResult<metadata::Piece> {
// Limit the concurrent download count.
let _permit = semaphore.acquire().await.unwrap();
let piece_id = piece_manager.id(task_id.as_str(), number);
info!("start to download piece {} from source", piece_id);
@ -1790,26 +1797,37 @@ impl Task {
Ok(metadata)
}
join_set.spawn(
let task_id = task_id.to_string();
let host_id = host_id.to_string();
let peer_id = peer_id.to_string();
let url = request.url.clone();
let request_header = request_header.clone();
let piece_manager = self.piece.clone();
let download_progress_tx = download_progress_tx.clone();
let object_storage = request.object_storage.clone();
let hdfs = request.hdfs.clone();
let permit = semaphore.clone().acquire_owned().await.unwrap();
join_set.spawn(async move {
let _permit = permit;
download_from_source(
task_id.to_string(),
host_id.to_string(),
peer_id.to_string(),
task_id,
host_id,
peer_id,
interested_piece.number,
request.url.clone(),
url,
interested_piece.offset,
interested_piece.length,
request_header.clone(),
request_header,
request.is_prefetch,
request.need_piece_content,
self.piece.clone(),
semaphore.clone(),
download_progress_tx.clone(),
request.object_storage.clone(),
request.hdfs.clone(),
piece_manager,
download_progress_tx,
object_storage,
hdfs,
)
.in_current_span(),
);
.in_current_span()
.await
});
}
// Wait for the pieces to be downloaded.
@ -1825,7 +1843,7 @@ impl Task {
finished_pieces.push(metadata.clone());
}
Err(err) => {
join_set.detach_all();
join_set.shutdown().await;
// If the download failed from the source, return the error.
// It will stop the download from the source.