diff --git a/Cargo.lock b/Cargo.lock index b9892041..d0ef1a71 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -986,7 +986,7 @@ dependencies = [ [[package]] name = "dragonfly-client" -version = "1.0.17" +version = "1.0.18" dependencies = [ "anyhow", "bytes", @@ -1059,7 +1059,7 @@ dependencies = [ [[package]] name = "dragonfly-client-backend" -version = "1.0.17" +version = "1.0.18" dependencies = [ "dragonfly-api", "dragonfly-client-core", @@ -1090,7 +1090,7 @@ dependencies = [ [[package]] name = "dragonfly-client-config" -version = "1.0.17" +version = "1.0.18" dependencies = [ "bytesize", "bytesize-serde", @@ -1120,7 +1120,7 @@ dependencies = [ [[package]] name = "dragonfly-client-core" -version = "1.0.17" +version = "1.0.18" dependencies = [ "headers 0.4.1", "hyper 1.6.0", @@ -1138,7 +1138,7 @@ dependencies = [ [[package]] name = "dragonfly-client-init" -version = "1.0.17" +version = "1.0.18" dependencies = [ "anyhow", "clap", @@ -1155,7 +1155,7 @@ dependencies = [ [[package]] name = "dragonfly-client-storage" -version = "1.0.17" +version = "1.0.18" dependencies = [ "bincode", "bytes", @@ -1182,7 +1182,7 @@ dependencies = [ [[package]] name = "dragonfly-client-util" -version = "1.0.17" +version = "1.0.18" dependencies = [ "base64 0.22.1", "bytesize", @@ -1593,7 +1593,7 @@ dependencies = [ [[package]] name = "hdfs" -version = "1.0.17" +version = "1.0.18" dependencies = [ "dragonfly-client-backend", "dragonfly-client-core", diff --git a/Cargo.toml b/Cargo.toml index 99e20c44..7eed81ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ members = [ ] [workspace.package] -version = "1.0.17" +version = "1.0.18" authors = ["The Dragonfly Developers"] homepage = "https://d7y.io/" repository = "https://github.com/dragonflyoss/client.git" @@ -22,13 +22,13 @@ readme = "README.md" edition = "2021" [workspace.dependencies] -dragonfly-client = { path = "dragonfly-client", version = "1.0.17" } -dragonfly-client-core = { path = "dragonfly-client-core", version = "1.0.17" } -dragonfly-client-config = { path = "dragonfly-client-config", version = "1.0.17" } -dragonfly-client-storage = { path = "dragonfly-client-storage", version = "1.0.17" } -dragonfly-client-backend = { path = "dragonfly-client-backend", version = "1.0.17" } -dragonfly-client-util = { path = "dragonfly-client-util", version = "1.0.17" } -dragonfly-client-init = { path = "dragonfly-client-init", version = "1.0.17" } +dragonfly-client = { path = "dragonfly-client", version = "1.0.18" } +dragonfly-client-core = { path = "dragonfly-client-core", version = "1.0.18" } +dragonfly-client-config = { path = "dragonfly-client-config", version = "1.0.18" } +dragonfly-client-storage = { path = "dragonfly-client-storage", version = "1.0.18" } +dragonfly-client-backend = { path = "dragonfly-client-backend", version = "1.0.18" } +dragonfly-client-util = { path = "dragonfly-client-util", version = "1.0.18" } +dragonfly-client-init = { path = "dragonfly-client-init", version = "1.0.18" } dragonfly-api = "=2.1.62" thiserror = "2.0" futures = "0.3.31" diff --git a/dragonfly-client-backend/src/object_storage.rs b/dragonfly-client-backend/src/object_storage.rs index 9b7bc8c2..4626a1ab 100644 --- a/dragonfly-client-backend/src/object_storage.rs +++ b/dragonfly-client-backend/src/object_storage.rs @@ -926,7 +926,7 @@ mod tests { assert!(result.is_err()); assert_eq!( result.unwrap_err().to_string(), - "backend error s3 need object_storage parameter" + "backend error: s3 need object_storage parameter" ) } @@ -935,28 +935,28 @@ mod tests { let test_cases = vec![ ( ObjectStorageInfo::default(), - "backend error s3 need access_key_id, access_key_secret, region", + "backend error: s3 need access_key_id, access_key_secret, region", ), ( ObjectStorageInfo { access_key_id: Some("access_key_id".into()), ..Default::default() }, - "backend error s3 need access_key_secret, region", + "backend error: s3 need access_key_secret, region", ), ( ObjectStorageInfo { access_key_secret: Some("access_key_secret".into()), ..Default::default() }, - "backend error s3 need access_key_id, region", + "backend error: s3 need access_key_id, region", ), ( ObjectStorageInfo { region: Some("test-region".into()), ..Default::default() }, - "backend error s3 need access_key_id, access_key_secret", + "backend error: s3 need access_key_id, access_key_secret", ), ( ObjectStorageInfo { @@ -964,7 +964,7 @@ mod tests { access_key_secret: Some("access_key_secret".into()), ..Default::default() }, - "backend error s3 need region", + "backend error: s3 need region", ), ( ObjectStorageInfo { @@ -972,7 +972,7 @@ mod tests { region: Some("test-region".into()), ..Default::default() }, - "backend error s3 need access_key_secret", + "backend error: s3 need access_key_secret", ), ( ObjectStorageInfo { @@ -980,7 +980,7 @@ mod tests { region: Some("test-region".into()), ..Default::default() }, - "backend error s3 need access_key_id", + "backend error: s3 need access_key_id", ), ]; @@ -1004,28 +1004,28 @@ mod tests { let test_cases = vec![ ( ObjectStorageInfo::default(), - "backend error abs need access_key_id, access_key_secret, endpoint", + "backend error: abs need access_key_id, access_key_secret, endpoint", ), ( ObjectStorageInfo { access_key_id: Some("access_key_id".into()), ..Default::default() }, - "backend error abs need access_key_secret, endpoint", + "backend error: abs need access_key_secret, endpoint", ), ( ObjectStorageInfo { access_key_secret: Some("access_key_secret".into()), ..Default::default() }, - "backend error abs need access_key_id, endpoint", + "backend error: abs need access_key_id, endpoint", ), ( ObjectStorageInfo { endpoint: Some("test-endpoint.local".into()), ..Default::default() }, - "backend error abs need access_key_id, access_key_secret", + "backend error: abs need access_key_id, access_key_secret", ), ( ObjectStorageInfo { @@ -1033,7 +1033,7 @@ mod tests { access_key_secret: Some("access_key_secret".into()), ..Default::default() }, - "backend error abs need endpoint", + "backend error: abs need endpoint", ), ( ObjectStorageInfo { @@ -1041,7 +1041,7 @@ mod tests { endpoint: Some("test-endpoint.local".into()), ..Default::default() }, - "backend error abs need access_key_secret", + "backend error: abs need access_key_secret", ), ( ObjectStorageInfo { @@ -1049,7 +1049,7 @@ mod tests { endpoint: Some("test-endpoint.local".into()), ..Default::default() }, - "backend error abs need access_key_id", + "backend error: abs need access_key_id", ), ]; @@ -1073,28 +1073,28 @@ mod tests { let test_cases = vec![ ( ObjectStorageInfo::default(), - "backend error oss need access_key_id, access_key_secret, endpoint", + "backend error: oss need access_key_id, access_key_secret, endpoint", ), ( ObjectStorageInfo { access_key_id: Some("access_key_id".into()), ..Default::default() }, - "backend error oss need access_key_secret, endpoint", + "backend error: oss need access_key_secret, endpoint", ), ( ObjectStorageInfo { access_key_secret: Some("access_key_secret".into()), ..Default::default() }, - "backend error oss need access_key_id, endpoint", + "backend error: oss need access_key_id, endpoint", ), ( ObjectStorageInfo { endpoint: Some("test-endpoint.local".into()), ..Default::default() }, - "backend error oss need access_key_id, access_key_secret", + "backend error: oss need access_key_id, access_key_secret", ), ( ObjectStorageInfo { @@ -1102,7 +1102,7 @@ mod tests { access_key_secret: Some("access_key_secret".into()), ..Default::default() }, - "backend error oss need endpoint", + "backend error: oss need endpoint", ), ( ObjectStorageInfo { @@ -1110,7 +1110,7 @@ mod tests { endpoint: Some("test-endpoint.local".into()), ..Default::default() }, - "backend error oss need access_key_secret", + "backend error: oss need access_key_secret", ), ( ObjectStorageInfo { @@ -1118,7 +1118,7 @@ mod tests { endpoint: Some("test-endpoint.local".into()), ..Default::default() }, - "backend error oss need access_key_id", + "backend error: oss need access_key_id", ), ]; @@ -1142,28 +1142,28 @@ mod tests { let test_cases = vec![ ( ObjectStorageInfo::default(), - "backend error obs need access_key_id, access_key_secret, endpoint", + "backend error: obs need access_key_id, access_key_secret, endpoint", ), ( ObjectStorageInfo { access_key_id: Some("access_key_id".into()), ..Default::default() }, - "backend error obs need access_key_secret, endpoint", + "backend error: obs need access_key_secret, endpoint", ), ( ObjectStorageInfo { access_key_secret: Some("access_key_secret".into()), ..Default::default() }, - "backend error obs need access_key_id, endpoint", + "backend error: obs need access_key_id, endpoint", ), ( ObjectStorageInfo { endpoint: Some("test-endpoint.local".into()), ..Default::default() }, - "backend error obs need access_key_id, access_key_secret", + "backend error: obs need access_key_id, access_key_secret", ), ( ObjectStorageInfo { @@ -1171,7 +1171,7 @@ mod tests { access_key_secret: Some("access_key_secret".into()), ..Default::default() }, - "backend error obs need endpoint", + "backend error: obs need endpoint", ), ( ObjectStorageInfo { @@ -1179,7 +1179,7 @@ mod tests { endpoint: Some("test-endpoint.local".into()), ..Default::default() }, - "backend error obs need access_key_secret", + "backend error: obs need access_key_secret", ), ( ObjectStorageInfo { @@ -1187,7 +1187,7 @@ mod tests { endpoint: Some("test-endpoint.local".into()), ..Default::default() }, - "backend error obs need access_key_id", + "backend error: obs need access_key_id", ), ]; @@ -1211,28 +1211,28 @@ mod tests { let test_cases = vec![ ( ObjectStorageInfo::default(), - "backend error cos need access_key_id, access_key_secret, endpoint", + "backend error: cos need access_key_id, access_key_secret, endpoint", ), ( ObjectStorageInfo { access_key_id: Some("access_key_id".into()), ..Default::default() }, - "backend error cos need access_key_secret, endpoint", + "backend error: cos need access_key_secret, endpoint", ), ( ObjectStorageInfo { access_key_secret: Some("access_key_secret".into()), ..Default::default() }, - "backend error cos need access_key_id, endpoint", + "backend error: cos need access_key_id, endpoint", ), ( ObjectStorageInfo { endpoint: Some("test-endpoint.local".into()), ..Default::default() }, - "backend error cos need access_key_id, access_key_secret", + "backend error: cos need access_key_id, access_key_secret", ), ( ObjectStorageInfo { @@ -1240,7 +1240,7 @@ mod tests { access_key_secret: Some("access_key_secret".into()), ..Default::default() }, - "backend error cos need endpoint", + "backend error: cos need endpoint", ), ( ObjectStorageInfo { @@ -1248,7 +1248,7 @@ mod tests { endpoint: Some("test-endpoint.local".into()), ..Default::default() }, - "backend error cos need access_key_secret", + "backend error: cos need access_key_secret", ), ( ObjectStorageInfo { @@ -1256,7 +1256,7 @@ mod tests { endpoint: Some("test-endpoint.local".into()), ..Default::default() }, - "backend error cos need access_key_id", + "backend error: cos need access_key_id", ), ]; diff --git a/dragonfly-client-core/src/error/errors.rs b/dragonfly-client-core/src/error/errors.rs index 8855f1fc..5687bbfb 100644 --- a/dragonfly-client-core/src/error/errors.rs +++ b/dragonfly-client-core/src/error/errors.rs @@ -159,7 +159,7 @@ impl OrErr for Result { /// BackendError is the error for backend. #[derive(Debug, thiserror::Error)] -#[error("backend error {message}")] +#[error("backend error: {message}")] pub struct BackendError { /// message is the error message. pub message: String, diff --git a/dragonfly-client-util/src/fs/mod.rs b/dragonfly-client-util/src/fs/mod.rs index ea1f404d..f7e97f70 100644 --- a/dragonfly-client-util/src/fs/mod.rs +++ b/dragonfly-client-util/src/fs/mod.rs @@ -16,6 +16,8 @@ use dragonfly_client_core::Result; use tokio::fs; + +#[cfg(target_os = "linux")] use tracing::warn; /// fallocate allocates the space for the file and fills it with zero, only on Linux. diff --git a/dragonfly-client/src/bin/dfget/main.rs b/dragonfly-client/src/bin/dfget/main.rs index f696b817..fac3b617 100644 --- a/dragonfly-client/src/bin/dfget/main.rs +++ b/dragonfly-client/src/bin/dfget/main.rs @@ -201,7 +201,7 @@ struct Args { short = 'I', long = "include-files", required = false, - help = "Filter files to download in a directory using glob patterns relative to the root URL's path. Examples: --include-files='file.txt' --include-files='subdir/file.txt' --include-files='subdir/dir/'" + help = "Filter files to download in a directory using glob patterns relative to the root URL's path. Examples: --include-files file.txt --include-files subdir/file.txt --include-files subdir/dir/" )] include_files: Option>, @@ -264,7 +264,7 @@ struct Args { #[arg( long, - default_value_t = 5, + default_value_t = 1, help = "Specify the max count of concurrent download files when downloading a directory" )] max_concurrent_requests: usize, @@ -709,23 +709,12 @@ async fn download_dir(args: Args, download_client: DfdaemonDownloadClient) -> Re entry_args.url = entry_url; let progress_bar = multi_progress_bar.add(ProgressBar::new(0)); - async fn download_entry( - args: Args, - progress_bar: ProgressBar, - download_client: DfdaemonDownloadClient, - semaphore: Arc, - ) -> Result<()> { - // Limit the concurrent download tasks. - let _permit = semaphore.acquire().await.unwrap(); - download(args, progress_bar, download_client).await - } - - join_set.spawn(download_entry( - entry_args, - progress_bar, - download_client.clone(), - semaphore.clone(), - )); + let download_client = download_client.clone(); + let permit = semaphore.clone().acquire_owned().await.unwrap(); + join_set.spawn(async move { + let _permit = permit; + download(entry_args, progress_bar, download_client).await + }); } } @@ -740,7 +729,7 @@ async fn download_dir(args: Args, download_client: DfdaemonDownloadClient) -> Re Ok(_) => continue, Err(err) => { error!("download entry failed: {}", err); - join_set.abort_all(); + join_set.shutdown().await; return Err(err); } } diff --git a/dragonfly-client/src/resource/persistent_cache_task.rs b/dragonfly-client/src/resource/persistent_cache_task.rs index be6696f2..10d9d6fa 100644 --- a/dragonfly-client/src/resource/persistent_cache_task.rs +++ b/dragonfly-client/src/resource/persistent_cache_task.rs @@ -272,7 +272,7 @@ impl PersistentCacheTask { finished_pieces.push(metadata.clone()); } Err(err) => { - join_set.detach_all(); + join_set.shutdown().await; // Delete the persistent cache task. self.storage @@ -1042,8 +1042,6 @@ impl PersistentCacheTask { let semaphore = Arc::new(Semaphore::new( self.config.download.concurrent_piece_count as usize, )); - - // Download the pieces from the parents. while let Some(collect_piece) = piece_collector_rx.recv().await { if interrupt.load(Ordering::SeqCst) { // If the interrupt is true, break the collector loop. @@ -1061,15 +1059,11 @@ impl PersistentCacheTask { need_piece_content: bool, parent: piece_collector::CollectedParent, piece_manager: Arc, - semaphore: Arc, download_progress_tx: Sender>, in_stream_tx: Sender, interrupt: Arc, finished_pieces: Arc>>, ) -> ClientResult { - // Limit the concurrent download count. - let _permit = semaphore.acquire().await.unwrap(); - let piece_id = piece_manager.persistent_cache_id(task_id.as_str(), number); info!( "start to download persistent cache piece {} from parent {:?}", @@ -1203,24 +1197,34 @@ impl PersistentCacheTask { Ok(metadata) } - join_set.spawn( + let task_id = task_id.to_string(); + let host_id = host_id.to_string(); + let peer_id = peer_id.to_string(); + let piece_manager = self.piece.clone(); + let download_progress_tx = download_progress_tx.clone(); + let in_stream_tx = in_stream_tx.clone(); + let interrupt = interrupt.clone(); + let finished_pieces = finished_pieces.clone(); + let permit = semaphore.clone().acquire_owned().await.unwrap(); + join_set.spawn(async move { + let _permit = permit; download_from_parent( - task.id.clone(), - host_id.to_string(), - peer_id.to_string(), + task_id, + host_id, + peer_id, collect_piece.number, collect_piece.length, need_piece_content, collect_piece.parent.clone(), - self.piece.clone(), - semaphore.clone(), - download_progress_tx.clone(), - in_stream_tx.clone(), - interrupt.clone(), - finished_pieces.clone(), + piece_manager, + download_progress_tx, + in_stream_tx, + interrupt, + finished_pieces, ) - .in_current_span(), - ); + .in_current_span() + .await + }); } // Wait for the pieces to be downloaded. @@ -1233,7 +1237,7 @@ impl PersistentCacheTask { match message { Ok(_) => {} Err(Error::DownloadFromParentFailed(err)) => { - join_set.detach_all(); + join_set.shutdown().await; // Send the download piece failed request. let (piece_number, parent_id) = (err.piece_number, err.parent_id.clone()); @@ -1263,14 +1267,14 @@ impl PersistentCacheTask { return Err(Error::DownloadFromParentFailed(err)); } Err(Error::SendTimeout) => { - join_set.detach_all(); + join_set.shutdown().await; // If the send timeout with scheduler or download progress, return the error // and interrupt the collector. return Err(Error::SendTimeout); } Err(err) => { - join_set.detach_all(); + join_set.shutdown().await; error!("download from parent error: {:?}", err); return Err(err); } diff --git a/dragonfly-client/src/resource/piece_collector.rs b/dragonfly-client/src/resource/piece_collector.rs index 9d0ebdc0..282b87f6 100644 --- a/dragonfly-client/src/resource/piece_collector.rs +++ b/dragonfly-client/src/resource/piece_collector.rs @@ -290,7 +290,7 @@ impl PieceCollector { // If all pieces are collected, abort all tasks. if collected_pieces.is_empty() { info!("all pieces are collected, abort all tasks"); - join_set.abort_all(); + join_set.shutdown().await; } } Ok(Err(err)) => { @@ -550,7 +550,7 @@ impl PersistentCachePieceCollector { // If all pieces are collected, abort all tasks. if collected_pieces.is_empty() { info!("all persistent cache pieces are collected, abort all tasks"); - join_set.abort_all(); + join_set.shutdown().await; } } Ok(Err(err)) => { diff --git a/dragonfly-client/src/resource/task.rs b/dragonfly-client/src/resource/task.rs index fb2cd463..bb5bbca1 100644 --- a/dragonfly-client/src/resource/task.rs +++ b/dragonfly-client/src/resource/task.rs @@ -1012,8 +1012,6 @@ impl Task { let semaphore = Arc::new(Semaphore::new( self.config.download.concurrent_piece_count as usize, )); - - // Download the pieces from the parents. while let Some(collect_piece) = piece_collector_rx.recv().await { if interrupt.load(Ordering::SeqCst) { // If the interrupt is true, break the collector loop. @@ -1030,7 +1028,6 @@ impl Task { length: u64, parent: piece_collector::CollectedParent, piece_manager: Arc, - semaphore: Arc, download_progress_tx: Sender>, in_stream_tx: Sender, interrupt: Arc, @@ -1038,9 +1035,6 @@ impl Task { is_prefetch: bool, need_piece_content: bool, ) -> ClientResult { - // Limit the concurrent piece count. - let _permit = semaphore.acquire().await.unwrap(); - let piece_id = piece_manager.id(task_id.as_str(), number); info!( "start to download piece {} from parent {:?}", @@ -1174,25 +1168,35 @@ impl Task { Ok(metadata) } - join_set.spawn( + let task_id = task_id.to_string(); + let host_id = host_id.to_string(); + let peer_id = peer_id.to_string(); + let piece_manager = self.piece.clone(); + let download_progress_tx = download_progress_tx.clone(); + let in_stream_tx = in_stream_tx.clone(); + let interrupt = interrupt.clone(); + let finished_pieces = finished_pieces.clone(); + let permit = semaphore.clone().acquire_owned().await.unwrap(); + join_set.spawn(async move { + let _permit = permit; download_from_parent( - task_id.to_string(), - host_id.to_string(), - peer_id.to_string(), + task_id, + host_id, + peer_id, collect_piece.number, collect_piece.length, collect_piece.parent.clone(), - self.piece.clone(), - semaphore.clone(), - download_progress_tx.clone(), - in_stream_tx.clone(), - interrupt.clone(), - finished_pieces.clone(), + piece_manager, + download_progress_tx, + in_stream_tx, + interrupt, + finished_pieces, is_prefetch, need_piece_content, ) - .in_current_span(), - ); + .in_current_span() + .await + }); } // Wait for the pieces to be downloaded. @@ -1240,7 +1244,7 @@ impl Task { continue; } Err(Error::SendTimeout) => { - join_set.detach_all(); + join_set.shutdown().await; // If the send timeout with scheduler or download progress, return the finished pieces. // It will stop the download from the parent with scheduler @@ -1304,15 +1308,11 @@ impl Task { is_prefetch: bool, need_piece_content: bool, piece_manager: Arc, - semaphore: Arc, download_progress_tx: Sender>, in_stream_tx: Sender, object_storage: Option, hdfs: Option, ) -> ClientResult { - // Limit the concurrent download count. - let _permit = semaphore.acquire().await.unwrap(); - let piece_id = piece_manager.id(task_id.as_str(), number); info!("start to download piece {} from source", piece_id); @@ -1418,27 +1418,39 @@ impl Task { Ok(metadata) } - join_set.spawn( + let task_id = task_id.to_string(); + let host_id = host_id.to_string(); + let peer_id = peer_id.to_string(); + let url = request.url.clone(); + let request_header = request_header.clone(); + let piece_manager = self.piece.clone(); + let download_progress_tx = download_progress_tx.clone(); + let in_stream_tx = in_stream_tx.clone(); + let object_storage = request.object_storage.clone(); + let hdfs = request.hdfs.clone(); + let permit = semaphore.clone().acquire_owned().await.unwrap(); + join_set.spawn(async move { + let _permit = permit; download_from_source( - task_id.to_string(), - host_id.to_string(), - peer_id.to_string(), + task_id, + host_id, + peer_id, interested_piece.number, - request.url.clone(), + url, interested_piece.offset, interested_piece.length, - request_header.clone(), + request_header, request.is_prefetch, request.need_piece_content, - self.piece.clone(), - semaphore.clone(), - download_progress_tx.clone(), - in_stream_tx.clone(), - request.object_storage.clone(), - request.hdfs.clone(), + piece_manager, + download_progress_tx, + in_stream_tx, + object_storage, + hdfs, ) - .in_current_span(), - ); + .in_current_span() + .await + }); } // Wait for the pieces to be downloaded. @@ -1454,7 +1466,7 @@ impl Task { finished_pieces.push(metadata.clone()); } Err(Error::BackendError(err)) => { - join_set.detach_all(); + join_set.shutdown().await; // Send the download piece http failed request. in_stream_tx.send_timeout(AnnouncePeerRequest { @@ -1483,7 +1495,7 @@ impl Task { return Err(Error::BackendError(err)); } Err(Error::SendTimeout) => { - join_set.detach_all(); + join_set.shutdown().await; // Send the download piece failed request. in_stream_tx.send_timeout(AnnouncePeerRequest { @@ -1510,7 +1522,7 @@ impl Task { return Ok(finished_pieces); } Err(err) => { - join_set.detach_all(); + join_set.shutdown().await; // Send the download piece failed request. in_stream_tx.send_timeout(AnnouncePeerRequest { @@ -1684,8 +1696,7 @@ impl Task { let semaphore = Arc::new(Semaphore::new( self.config.download.concurrent_piece_count as usize, )); - - for interested_piece in &interested_pieces { + for interested_piece in interested_pieces.clone() { async fn download_from_source( task_id: String, host_id: String, @@ -1698,14 +1709,10 @@ impl Task { is_prefetch: bool, need_piece_content: bool, piece_manager: Arc, - semaphore: Arc, download_progress_tx: Sender>, object_storage: Option, hdfs: Option, ) -> ClientResult { - // Limit the concurrent download count. - let _permit = semaphore.acquire().await.unwrap(); - let piece_id = piece_manager.id(task_id.as_str(), number); info!("start to download piece {} from source", piece_id); @@ -1790,26 +1797,37 @@ impl Task { Ok(metadata) } - join_set.spawn( + let task_id = task_id.to_string(); + let host_id = host_id.to_string(); + let peer_id = peer_id.to_string(); + let url = request.url.clone(); + let request_header = request_header.clone(); + let piece_manager = self.piece.clone(); + let download_progress_tx = download_progress_tx.clone(); + let object_storage = request.object_storage.clone(); + let hdfs = request.hdfs.clone(); + let permit = semaphore.clone().acquire_owned().await.unwrap(); + join_set.spawn(async move { + let _permit = permit; download_from_source( - task_id.to_string(), - host_id.to_string(), - peer_id.to_string(), + task_id, + host_id, + peer_id, interested_piece.number, - request.url.clone(), + url, interested_piece.offset, interested_piece.length, - request_header.clone(), + request_header, request.is_prefetch, request.need_piece_content, - self.piece.clone(), - semaphore.clone(), - download_progress_tx.clone(), - request.object_storage.clone(), - request.hdfs.clone(), + piece_manager, + download_progress_tx, + object_storage, + hdfs, ) - .in_current_span(), - ); + .in_current_span() + .await + }); } // Wait for the pieces to be downloaded. @@ -1825,7 +1843,7 @@ impl Task { finished_pieces.push(metadata.clone()); } Err(err) => { - join_set.detach_all(); + join_set.shutdown().await; // If the download failed from the source, return the error. // It will stop the download from the source.