From 3d91c09b0a03f3b199d2c82dcaca48c82ef538fc Mon Sep 17 00:00:00 2001
From: Gaius
Date: Fri, 3 Jan 2025 17:15:50 +0800
Subject: [PATCH] feat: support transfer piece content from dfdaemon (#926)

Signed-off-by: Gaius
---
 Cargo.lock | 36 ++---
 Cargo.toml | 2 +-
 dragonfly-client-backend/src/http.rs | 6 +-
 dragonfly-client-backend/src/lib.rs | 3 +-
 .../src/object_storage.rs | 6 +-
 dragonfly-client-config/src/dfcache.rs | 4 +
 dragonfly-client-config/src/dfget.rs | 4 +
 dragonfly-client-init/src/bin/main.rs | 6 +-
 dragonfly-client-storage/src/content.rs | 102 +++++--------
 dragonfly-client/src/bin/dfcache/export.rs | 137 ++++++++++++-----
 dragonfly-client/src/bin/dfcache/import.rs | 36 ++---
 dragonfly-client/src/bin/dfcache/remove.rs | 28 ++--
 dragonfly-client/src/bin/dfcache/stat.rs | 28 ++--
 dragonfly-client/src/bin/dfdaemon/main.rs | 26 ++--
 dragonfly-client/src/bin/dfget/main.rs | 137 ++++++++++++-----
 dragonfly-client/src/bin/dfstore/main.rs | 2 +-
 .../src/grpc/dfdaemon_download.rs | 30 ++--
 dragonfly-client/src/grpc/dfdaemon_upload.rs | 49 +++---
 dragonfly-client/src/grpc/health.rs | 9 +-
 dragonfly-client/src/grpc/manager.rs | 6 +-
 dragonfly-client/src/grpc/mod.rs | 3 +-
 dragonfly-client/src/grpc/scheduler.rs | 15 +-
 dragonfly-client/src/proxy/mod.rs | 4 +-
 .../src/resource/persistent_cache_task.rs | 96 ++++++++----
 dragonfly-client/src/resource/piece.rs | 8 +-
 .../src/resource/piece_collector.rs | 9 +-
 dragonfly-client/src/resource/task.rs | 140 +++++++++++++-----
 27 files changed, 561 insertions(+), 371 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index a3d12f8d..2076bc31 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -908,11 +908,11 @@ dependencies = [
 
 [[package]]
 name = "dragonfly-api"
-version = "2.0.177"
+version = "2.1.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a0578dcfe9837e01a7ead547c67aca6e857d45d8ab171c370a5a272f1710a9e9"
+checksum = "da4ce0486d3b9c5a8cfa137f9180807d2079aee1b86464d28847e548dc8c3f9c"
 dependencies = [
- "prost 0.13.3",
+ "prost 0.13.4",
  "prost-types",
  "prost-wkt-types",
  "serde",
@@ -3239,12 +3239,12 @@ dependencies = [
 
 [[package]]
 name = "prost"
-version = "0.13.3"
+version = "0.13.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7b0487d90e047de87f984913713b85c601c05609aad5b0df4b4573fbf69aa13f"
+checksum = "2c0fef6c4230e4ccf618a35c59d7ede15dea37de8427500f50aff708806e42ec"
 dependencies = [
  "bytes",
- "prost-derive 0.13.3",
+ "prost-derive 0.13.4",
 ]
 
 [[package]]
@@ -3261,7 +3261,7 @@ dependencies = [
  "once_cell",
  "petgraph",
  "prettyplease",
- "prost 0.13.3",
+ "prost 0.13.4",
  "prost-types",
  "regex",
  "syn 2.0.90",
@@ -3283,9 +3283,9 @@ dependencies = [
 
 [[package]]
 name = "prost-derive"
-version = "0.13.3"
+version = "0.13.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5"
+checksum = "157c5a9d7ea5c2ed2d9fb8f495b64759f7816c7eaea54ba3978f0d63000162e3"
 dependencies = [
  "anyhow",
  "itertools 0.11.0",
@@ -3296,11 +3296,11 @@ dependencies = [
 
 [[package]]
 name = "prost-types"
-version = "0.13.3"
+version = "0.13.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4759aa0d3a6232fb8dbdb97b61de2c20047c68aca932c7ed76da9d788508d670"
+checksum = "cc2f1e56baa61e93533aebc21af4d2134b70f66275e0fcdf3cbe43d77ff7e8fc"
 dependencies = [
- "prost 0.13.3",
+ "prost 0.13.4",
 ]
 
 [[package]]
@@ -3311,7 +3311,7 @@ checksum = "a8d84e2bee181b04c2bac339f2bfe818c46a99750488cc6728ce4181d5aa8299"
 dependencies
= [ "chrono", "inventory", - "prost 0.13.3", + "prost 0.13.4", "serde", "serde_derive", "serde_json", @@ -3325,7 +3325,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a669d5acbe719010c6f62a64e6d7d88fdedc1fe46e419747949ecb6312e9b14" dependencies = [ "heck 0.5.0", - "prost 0.13.3", + "prost 0.13.4", "prost-build", "prost-types", "quote", @@ -3338,7 +3338,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "01ef068e9b82e654614b22e6b13699bd545b6c0e2e721736008b00b38aeb4f64" dependencies = [ "chrono", - "prost 0.13.3", + "prost 0.13.4", "prost-build", "prost-types", "prost-wkt", @@ -4729,7 +4729,7 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", - "prost 0.13.3", + "prost 0.13.4", "rustls-pemfile 2.2.0", "socket2", "tokio", @@ -4761,7 +4761,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1eaf34ddb812120f5c601162d5429933c9b527d901ab0e7f930d3147e33a09b2" dependencies = [ "async-stream", - "prost 0.13.3", + "prost 0.13.4", "tokio", "tokio-stream", "tonic", @@ -4773,7 +4773,7 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "878d81f52e7fcfd80026b7fdb6a9b578b3c3653ba987f87f0dce4b64043cba27" dependencies = [ - "prost 0.13.3", + "prost 0.13.4", "prost-types", "tokio", "tokio-stream", diff --git a/Cargo.toml b/Cargo.toml index 9e53d95d..3175d55f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,7 @@ dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.0 dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.0" } dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.0" } thiserror = "1.0" -dragonfly-api = "=2.0.177" +dragonfly-api = "=2.1.3" reqwest = { version = "0.12.4", features = [ "stream", "native-tls", diff --git a/dragonfly-client-backend/src/http.rs b/dragonfly-client-backend/src/http.rs index 7b089a46..61c02b2c 100644 --- a/dragonfly-client-backend/src/http.rs +++ b/dragonfly-client-backend/src/http.rs @@ -144,12 +144,11 @@ impl super::Backend for HTTP { .timeout(request.timeout) .send() .await - .map_err(|err| { + .inspect_err(|err| { error!( "head request failed {} {}: {}", request.task_id, request.url, err ); - err })?; let header = response.headers().clone(); @@ -190,12 +189,11 @@ impl super::Backend for HTTP { .timeout(request.timeout) .send() .await - .map_err(|err| { + .inspect_err(|err| { error!( "get request failed {} {} {}: {}", request.task_id, request.piece_id, request.url, err ); - err })?; let header = response.headers().clone(); diff --git a/dragonfly-client-backend/src/lib.rs b/dragonfly-client-backend/src/lib.rs index c002ab7a..934b6faf 100644 --- a/dragonfly-client-backend/src/lib.rs +++ b/dragonfly-client-backend/src/lib.rs @@ -234,9 +234,8 @@ impl BackendFactory { if let Some(plugin_dir) = plugin_dir { backend_factory .load_plugin_backends(plugin_dir) - .map_err(|err| { + .inspect_err(|err| { error!("failed to load plugin backends: {}", err); - err })?; } diff --git a/dragonfly-client-backend/src/object_storage.rs b/dragonfly-client-backend/src/object_storage.rs index 2386d0c9..ae2a5221 100644 --- a/dragonfly-client-backend/src/object_storage.rs +++ b/dragonfly-client-backend/src/object_storage.rs @@ -505,12 +505,11 @@ impl crate::Backend for ObjectStorage { .url .parse() .map_err(|_| ClientError::InvalidURI(request.url.clone()))?; - let parsed_url: super::object_storage::ParsedURL = url.try_into().map_err(|err| { + let parsed_url: 
super::object_storage::ParsedURL = url.try_into().inspect_err(|err| { error!( "parse head request url failed {} {}: {}", request.task_id, request.url, err ); - err })?; // Initialize the operator with the parsed URL, object storage, and timeout. @@ -594,12 +593,11 @@ impl crate::Backend for ObjectStorage { .url .parse() .map_err(|_| ClientError::InvalidURI(request.url.clone()))?; - let parsed_url: super::object_storage::ParsedURL = url.try_into().map_err(|err| { + let parsed_url: super::object_storage::ParsedURL = url.try_into().inspect_err(|err| { error!( "parse get request url failed {} {}: {}", request.piece_id, request.url, err ); - err })?; // Initialize the operator with the parsed URL, object storage, and timeout. diff --git a/dragonfly-client-config/src/dfcache.rs b/dragonfly-client-config/src/dfcache.rs index 021e03e0..da7d227b 100644 --- a/dragonfly-client-config/src/dfcache.rs +++ b/dragonfly-client-config/src/dfcache.rs @@ -19,6 +19,10 @@ use std::path::PathBuf; /// NAME is the name of dfcache. pub const NAME: &str = "dfcache"; +// DEFAULT_OUTPUT_FILE_MODE defines the default file mode for output files when downloading with dfcache +// using the `--transfer-from-dfdaemon=true` option. +pub const DEFAULT_OUTPUT_FILE_MODE: u32 = 0o644; + /// default_dfcache_log_dir is the default log directory for dfcache. #[inline] pub fn default_dfcache_log_dir() -> PathBuf { diff --git a/dragonfly-client-config/src/dfget.rs b/dragonfly-client-config/src/dfget.rs index 2e87dbde..bc2a8961 100644 --- a/dragonfly-client-config/src/dfget.rs +++ b/dragonfly-client-config/src/dfget.rs @@ -19,6 +19,10 @@ use std::path::PathBuf; /// NAME is the name of dfget. pub const NAME: &str = "dfget"; +// DEFAULT_OUTPUT_FILE_MODE defines the default file mode for output files when downloading with dfget +// using the `--transfer-from-dfdaemon=true` option. +pub const DEFAULT_OUTPUT_FILE_MODE: u32 = 0o644; + /// default_dfget_log_dir is the default log directory for dfget. pub fn default_dfget_log_dir() -> PathBuf { crate::default_log_dir().join(NAME) diff --git a/dragonfly-client-init/src/bin/main.rs b/dragonfly-client-init/src/bin/main.rs index e23a1f59..249b31b7 100644 --- a/dragonfly-client-init/src/bin/main.rs +++ b/dragonfly-client-init/src/bin/main.rs @@ -99,16 +99,14 @@ async fn main() -> Result<(), anyhow::Error> { ); // Load config. - let config = dfinit::Config::load(&args.config).map_err(|err| { + let config = dfinit::Config::load(&args.config).inspect_err(|err| { error!("failed to load config: {}", err); - err })?; // Handle features of the container runtime. let container_runtime = container_runtime::ContainerRuntime::new(&config); - container_runtime.run().await.map_err(|err| { + container_runtime.run().await.inspect_err(|err| { error!("failed to run container runtime: {}", err); - err })?; Ok(()) diff --git a/dragonfly-client-storage/src/content.rs b/dragonfly-client-storage/src/content.rs index d6bc74e8..006710da 100644 --- a/dragonfly-client-storage/src/content.rs +++ b/dragonfly-client-storage/src/content.rs @@ -100,9 +100,8 @@ impl Content { // ensure the file exists. 
if range.length == 0 { info!("range length is 0, no need to copy"); - File::create(to).await.map_err(|err| { + File::create(to).await.inspect_err(|err| { error!("create {:?} failed: {}", to, err); - err })?; return Ok(()); @@ -110,9 +109,8 @@ impl Content { self.copy_task_by_range(task.id.as_str(), to, range) .await - .map_err(|err| { + .inspect_err(|err| { error!("copy range {:?} to {:?} failed: {}", task_path, to, err); - err })?; info!("copy range {:?} to {:?} success", task_path, to); @@ -131,18 +129,18 @@ impl Content { // ensure the file exists. if task.is_empty() { info!("task is empty, no need to copy"); - File::create(to).await.map_err(|err| { + File::create(to).await.inspect_err(|err| { error!("create {:?} failed: {}", to, err); - err })?; return Ok(()); } - self.copy_task(task.id.as_str(), to).await.map_err(|err| { - error!("copy {:?} to {:?} failed: {}", task_path, to, err); - err - })?; + self.copy_task(task.id.as_str(), to) + .await + .inspect_err(|err| { + error!("copy {:?} to {:?} failed: {}", task_path, to, err); + })?; info!("copy {:?} to {:?} success", task_path, to); return Ok(()); @@ -165,9 +163,8 @@ impl Content { // Ensure the parent directory of the destination exists. if let Some(parent) = to.parent() { if !parent.exists() { - fs::create_dir_all(parent).await.map_err(|err| { + fs::create_dir_all(parent).await.inspect_err(|err| { error!("failed to create directory {:?}: {}", parent, err); - err })?; } } @@ -182,9 +179,8 @@ impl Content { // Ensure the parent directory of the destination exists. if let Some(parent) = to.parent() { if !parent.exists() { - fs::create_dir_all(parent).await.map_err(|err| { + fs::create_dir_all(parent).await.inspect_err(|err| { error!("failed to create directory {:?}: {}", parent, err); - err })?; } } @@ -212,18 +208,16 @@ impl Content { #[instrument(skip_all)] pub async fn read_task_by_range(&self, task_id: &str, range: Range) -> Result { let task_path = self.get_task_path(task_id); - let from_f = File::open(task_path.as_path()).await.map_err(|err| { + let from_f = File::open(task_path.as_path()).await.inspect_err(|err| { error!("open {:?} failed: {}", task_path, err); - err })?; let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, from_f); f_reader .seek(SeekFrom::Start(range.start)) .await - .map_err(|err| { + .inspect_err(|err| { error!("seek {:?} failed: {}", task_path, err); - err })?; let range_reader = f_reader.take(range.length); @@ -235,10 +229,11 @@ impl Content { pub async fn delete_task(&self, task_id: &str) -> Result<()> { info!("delete task content: {}", task_id); let task_path = self.get_task_path(task_id); - fs::remove_file(task_path.as_path()).await.map_err(|err| { - error!("remove {:?} failed: {}", task_path, err); - err - })?; + fs::remove_file(task_path.as_path()) + .await + .inspect_err(|err| { + error!("remove {:?} failed: {}", task_path, err); + })?; Ok(()) } @@ -256,18 +251,16 @@ impl Content { // Calculate the target offset and length based on the range. 
let (target_offset, target_length) = calculate_piece_range(offset, length, range); - let f = File::open(task_path.as_path()).await.map_err(|err| { + let f = File::open(task_path.as_path()).await.inspect_err(|err| { error!("open {:?} failed: {}", task_path, err); - err })?; let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f); f_reader .seek(SeekFrom::Start(target_offset)) .await - .map_err(|err| { + .inspect_err(|err| { error!("seek {:?} failed: {}", task_path, err); - err })?; Ok(f_reader.take(target_length)) @@ -288,34 +281,30 @@ impl Content { // Calculate the target offset and length based on the range. let (target_offset, target_length) = calculate_piece_range(offset, length, range); - let f = File::open(task_path.as_path()).await.map_err(|err| { + let f = File::open(task_path.as_path()).await.inspect_err(|err| { error!("open {:?} failed: {}", task_path, err); - err })?; let mut f_range_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f); f_range_reader .seek(SeekFrom::Start(target_offset)) .await - .map_err(|err| { + .inspect_err(|err| { error!("seek {:?} failed: {}", task_path, err); - err })?; let range_reader = f_range_reader.take(target_length); // Create full reader of the piece. - let f = File::open(task_path.as_path()).await.map_err(|err| { + let f = File::open(task_path.as_path()).await.inspect_err(|err| { error!("open {:?} failed: {}", task_path, err); - err })?; let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f); f_reader .seek(SeekFrom::Start(offset)) .await - .map_err(|err| { + .inspect_err(|err| { error!("seek {:?} failed: {}", task_path, err); - err })?; let reader = f_reader.take(length); @@ -339,14 +328,12 @@ impl Content { .write(true) .open(task_path.as_path()) .await - .map_err(|err| { + .inspect_err(|err| { error!("open {:?} failed: {}", task_path, err); - err })?; - f.seek(SeekFrom::Start(offset)).await.map_err(|err| { + f.seek(SeekFrom::Start(offset)).await.inspect_err(|err| { error!("seek {:?} failed: {}", task_path, err); - err })?; // Copy the piece to the file while updating the CRC32C value. @@ -403,21 +390,18 @@ impl Content { .write(true) .open(task_path.as_path()) .await - .map_err(|err| { + .inspect_err(|err| { error!("open {:?} failed: {}", task_path, err); - err })?; - f.seek(SeekFrom::Start(offset)).await.map_err(|err| { + f.seek(SeekFrom::Start(offset)).await.inspect_err(|err| { error!("seek {:?} failed: {}", task_path, err); - err })?; // Copy the piece to the file. let mut writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, f); - let length = io::copy(&mut tee, &mut writer).await.map_err(|err| { + let length = io::copy(&mut tee, &mut writer).await.inspect_err(|err| { error!("copy {:?} failed: {}", task_path, err); - err })?; // Calculate the hash of the piece. @@ -441,9 +425,8 @@ impl Content { #[instrument(skip_all)] async fn create_or_get_task_path(&self, task_id: &str) -> Result { let task_dir = self.dir.join(DEFAULT_TASK_DIR).join(&task_id[..3]); - fs::create_dir_all(&task_dir).await.map_err(|err| { + fs::create_dir_all(&task_dir).await.inspect_err(|err| { error!("create {:?} failed: {}", task_dir, err); - err })?; Ok(task_dir.join(task_id)) @@ -459,9 +442,8 @@ impl Content { // Ensure the parent directory of the destination exists. 
if let Some(parent) = to.parent() { if !parent.exists() { - fs::create_dir_all(parent).await.map_err(|err| { + fs::create_dir_all(parent).await.inspect_err(|err| { error!("failed to create directory {:?}: {}", parent, err); - err })?; } } @@ -481,18 +463,18 @@ impl Content { // ensure the file exists. if task.is_empty() { info!("persistent cache task is empty, no need to copy"); - File::create(to).await.map_err(|err| { + File::create(to).await.inspect_err(|err| { error!("create {:?} failed: {}", to, err); - err })?; return Ok(()); } - self.copy_task(task.id.as_str(), to).await.map_err(|err| { - error!("copy {:?} to {:?} failed: {}", task_path, to, err); - err - })?; + self.copy_task(task.id.as_str(), to) + .await + .inspect_err(|err| { + error!("copy {:?} to {:?} failed: {}", task_path, to, err); + })?; info!("copy {:?} to {:?} success", task_path, to); return Ok(()); @@ -516,14 +498,12 @@ impl Content { .create_or_get_persistent_cache_task_path(task_id) .await?; let to_f = OpenOptions::new() - .create(true) - .truncate(true) + .create_new(true) .write(true) .open(task_path.as_path()) .await - .map_err(|err| { + .inspect_err(|err| { error!("open {:?} failed: {}", task_path, err); - err })?; // Copy the content to the file while updating the CRC32C value. @@ -558,9 +538,8 @@ impl Content { let persistent_cache_task_path = self.get_persistent_cache_task_path(task_id); fs::remove_file(persistent_cache_task_path.as_path()) .await - .map_err(|err| { + .inspect_err(|err| { error!("remove {:?} failed: {}", persistent_cache_task_path, err); - err })?; Ok(()) } @@ -584,9 +563,8 @@ impl Content { .join(DEFAULT_PERSISTENT_CACHE_TASK_DIR) .join(&task_id[..3]); - fs::create_dir_all(&task_dir).await.map_err(|err| { + fs::create_dir_all(&task_dir).await.inspect_err(|err| { error!("create {:?} failed: {}", task_dir, err); - err })?; Ok(task_dir.join(task_id)) diff --git a/dragonfly-client/src/bin/dfcache/export.rs b/dragonfly-client/src/bin/dfcache/export.rs index 79218525..dae650e0 100644 --- a/dragonfly-client/src/bin/dfcache/export.rs +++ b/dragonfly-client/src/bin/dfcache/export.rs @@ -29,7 +29,9 @@ use std::path::{Path, PathBuf}; use std::time::Duration; use std::{cmp::min, fmt::Write}; use termion::{color, style}; -use tracing::{error, info}; +use tokio::fs::{self, OpenOptions}; +use tokio::io::{AsyncSeekExt, AsyncWriteExt, SeekFrom}; +use tracing::{debug, error, info}; use super::*; @@ -39,6 +41,13 @@ pub struct ExportCommand { #[arg(help = "Specify the persistent cache task ID to export")] id: String, + #[arg( + long = "transfer-from-dfdaemon", + default_value_t = false, + help = "Specify whether to transfer the content of downloading file from dfdaemon's unix domain socket. If it is true, dfcache will call dfdaemon to download the file, and dfdaemon will return the content of downloading file to dfcache via unix domain socket, and dfcache will copy the content to the output path. If it is false, dfdaemon will download the file and hardlink or copy the file to the output path." + )] + transfer_from_dfdaemon: bool, + #[arg( long = "application", default_value = "", @@ -75,7 +84,7 @@ impl ExportCommand { pub async fn execute(&self, endpoint: &Path) -> Result<()> { // Validate the command line arguments. 
if let Err(err) = self.validate_args() { - eprintln!( + println!( "{}{}{}Validating Failed!{}", color::Fg(color::Red), style::Italic, @@ -83,7 +92,7 @@ impl ExportCommand { style::Reset ); - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, @@ -91,7 +100,7 @@ impl ExportCommand { style::Reset ); - eprintln!( + println!( "{}{}{}Message:{} {}", color::Fg(color::Cyan), style::Italic, @@ -100,7 +109,7 @@ impl ExportCommand { err, ); - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, @@ -116,7 +125,7 @@ impl ExportCommand { match get_dfdaemon_download_client(endpoint.to_path_buf()).await { Ok(client) => client, Err(err) => { - eprintln!( + println!( "{}{}{}Connect Dfdaemon Failed!{}", color::Fg(color::Red), style::Italic, @@ -124,7 +133,7 @@ impl ExportCommand { style::Reset ); - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, @@ -132,7 +141,7 @@ impl ExportCommand { style::Reset ); - eprintln!( + println!( "{}{}{}Message:{}, can not connect {}, please check the unix socket {}", color::Fg(color::Cyan), style::Italic, @@ -142,7 +151,7 @@ impl ExportCommand { endpoint.to_string_lossy(), ); - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, @@ -160,7 +169,7 @@ impl ExportCommand { Error::TonicStatus(status) => { let details = status.details(); if let Ok(backend_err) = serde_json::from_slice::(details) { - eprintln!( + println!( "{}{}{}Exporting Failed!{}", color::Fg(color::Red), style::Italic, @@ -168,7 +177,7 @@ impl ExportCommand { style::Reset ); - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, @@ -177,7 +186,7 @@ impl ExportCommand { ); if let Some(status_code) = backend_err.status_code { - eprintln!( + println!( "{}{}{}Bad Status Code:{} {}", color::Fg(color::Red), style::Italic, @@ -187,7 +196,7 @@ impl ExportCommand { ); } - eprintln!( + println!( "{}{}{}Message:{} {}", color::Fg(color::Cyan), style::Italic, @@ -197,7 +206,7 @@ impl ExportCommand { ); if !backend_err.header.is_empty() { - eprintln!( + println!( "{}{}{}Header:{}", color::Fg(color::Cyan), style::Italic, @@ -205,11 +214,11 @@ impl ExportCommand { style::Reset ); for (key, value) in backend_err.header.iter() { - eprintln!(" [{}]: {}", key.as_str(), value.as_str()); + println!(" [{}]: {}", key.as_str(), value.as_str()); } } - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, @@ -217,7 +226,7 @@ impl ExportCommand { style::Reset ); } else { - eprintln!( + println!( "{}{}{}Exporting Failed!{}", color::Fg(color::Red), style::Italic, @@ -225,7 +234,7 @@ impl ExportCommand { style::Reset ); - eprintln!( + println!( "{}{}{}*********************************{}", color::Fg(color::Black), style::Italic, @@ -233,7 +242,7 @@ impl ExportCommand { style::Reset ); - eprintln!( + println!( "{}{}{}Bad Code:{} {}", color::Fg(color::Red), style::Italic, @@ -242,7 +251,7 @@ impl ExportCommand { status.code() ); - eprintln!( + println!( "{}{}{}Message:{} {}", color::Fg(color::Cyan), style::Italic, @@ -252,7 +261,7 @@ impl ExportCommand { ); if !status.details().is_empty() { - eprintln!( + println!( "{}{}{}Details:{} {}", color::Fg(color::Cyan), style::Italic, @@ -262,7 +271,7 @@ impl ExportCommand { ); } - eprintln!( + println!( 
"{}{}{}*********************************{}", color::Fg(color::Black), style::Italic, @@ -272,7 +281,7 @@ impl ExportCommand { } } Error::BackendError(err) => { - eprintln!( + println!( "{}{}{}Exporting Failed!{}", color::Fg(color::Red), style::Italic, @@ -280,7 +289,7 @@ impl ExportCommand { style::Reset ); - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, @@ -288,7 +297,7 @@ impl ExportCommand { style::Reset ); - eprintln!( + println!( "{}{}{}Message:{} {}", color::Fg(color::Red), style::Italic, @@ -298,7 +307,7 @@ impl ExportCommand { ); if err.header.is_some() { - eprintln!( + println!( "{}{}{}Header:{}", color::Fg(color::Cyan), style::Italic, @@ -306,11 +315,11 @@ impl ExportCommand { style::Reset ); for (key, value) in err.header.unwrap_or_default().iter() { - eprintln!(" [{}]: {}", key.as_str(), value.to_str().unwrap()); + println!(" [{}]: {}", key.as_str(), value.to_str().unwrap()); } } - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, @@ -319,7 +328,7 @@ impl ExportCommand { ); } err => { - eprintln!( + println!( "{}{}{}Exporting Failed!{}", color::Fg(color::Red), style::Italic, @@ -327,7 +336,7 @@ impl ExportCommand { style::Reset ); - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, @@ -335,7 +344,7 @@ impl ExportCommand { style::Reset ); - eprintln!( + println!( "{}{}{}Message:{} {}", color::Fg(color::Red), style::Italic, @@ -344,7 +353,7 @@ impl ExportCommand { err ); - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, @@ -362,9 +371,16 @@ impl ExportCommand { /// run runs the export command. async fn run(&self, dfdaemon_download_client: DfdaemonDownloadClient) -> Result<()> { - // Get the absolute path of the output file. - let absolute_path = Path::new(&self.output).absolutize()?; - info!("download file to: {}", absolute_path.to_string_lossy()); + // Dfcache needs to notify dfdaemon to transfer the piece content of downloading file via unix domain socket + // when the `transfer_from_dfdaemon` is true. Otherwise, dfdaemon will download the file and hardlink or + // copy the file to the output path. + let (output_path, need_piece_content) = if self.transfer_from_dfdaemon { + (None, true) + } else { + let absolute_path = Path::new(&self.output).absolutize()?; + info!("download file to: {}", absolute_path.to_string_lossy()); + (Some(absolute_path.to_string_lossy().to_string()), false) + }; // Create dfdaemon client. let response = dfdaemon_download_client @@ -375,18 +391,44 @@ impl ExportCommand { persistent: false, tag: Some(self.tag.clone()), application: Some(self.application.clone()), - output_path: absolute_path.to_string_lossy().to_string(), + output_path, timeout: Some( prost_wkt_types::Duration::try_from(self.timeout) .or_err(ErrorType::ParseError)?, ), + need_piece_content, }) .await - .map_err(|err| { + .inspect_err(|err| { error!("download persistent cache task failed: {}", err); - err })?; + // If transfer_from_dfdaemon is true, then dfcache needs to create the output file and write the + // piece content to the output file. 
+ let mut f = if self.transfer_from_dfdaemon { + if let Some(parent) = self.output.parent() { + if !parent.exists() { + fs::create_dir_all(parent).await.inspect_err(|err| { + error!("failed to create directory {:?}: {}", parent, err); + })?; + } + } + + let f = OpenOptions::new() + .create_new(true) + .write(true) + .mode(dfcache::DEFAULT_OUTPUT_FILE_MODE) + .open(&self.output) + .await + .inspect_err(|err| { + error!("open file {:?} failed: {}", self.output, err); + })?; + + Some(f) + } else { + None + }; + // Initialize progress bar. let pb = ProgressBar::new(0); pb.set_style( @@ -403,9 +445,8 @@ impl ExportCommand { // Download file. let mut downloaded = 0; let mut out_stream = response.into_inner(); - while let Some(message) = out_stream.message().await.map_err(|err| { + while let Some(message) = out_stream.message().await.inspect_err(|err| { error!("get message failed: {}", err); - err })? { match message.response { Some(download_persistent_cache_task_response::Response::DownloadPersistentCacheTaskStartedResponse( @@ -418,6 +459,22 @@ impl ExportCommand { )) => { let piece = response.piece.ok_or(Error::InvalidParameter)?; + // Dfcache needs to write the piece content to the output file. + if let Some(f) = &mut f { + f.seek(SeekFrom::Start(piece.offset)) + .await + .inspect_err(|err| { + error!("seek {:?} failed: {}", self.output, err); + })?; + + let content = piece.content.ok_or(Error::InvalidParameter)?; + f.write_all(&content).await.inspect_err(|err| { + error!("write {:?} failed: {}", self.output, err); + })?; + + debug!("copy piece {} to {:?} success", piece.number, self.output); + }; + downloaded += piece.length; let position = min(downloaded + piece.length, pb.length().unwrap_or(0)); pb.set_position(position); diff --git a/dragonfly-client/src/bin/dfcache/import.rs b/dragonfly-client/src/bin/dfcache/import.rs index 3f03752a..9d9d6693 100644 --- a/dragonfly-client/src/bin/dfcache/import.rs +++ b/dragonfly-client/src/bin/dfcache/import.rs @@ -81,7 +81,7 @@ impl ImportCommand { pub async fn execute(&self, endpoint: &Path) -> Result<()> { // Validate the command line arguments. 
if let Err(err) = self.validate_args() { - eprintln!( + println!( "{}{}{}Validating Failed!{}", color::Fg(color::Red), style::Italic, @@ -89,7 +89,7 @@ impl ImportCommand { style::Reset ); - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, @@ -97,7 +97,7 @@ impl ImportCommand { style::Reset ); - eprintln!( + println!( "{}{}{}Message:{} {}", color::Fg(color::Cyan), style::Italic, @@ -106,7 +106,7 @@ impl ImportCommand { err, ); - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, @@ -122,7 +122,7 @@ impl ImportCommand { match get_dfdaemon_download_client(endpoint.to_path_buf()).await { Ok(client) => client, Err(err) => { - eprintln!( + println!( "{}{}{}Connect Dfdaemon Failed!{}", color::Fg(color::Red), style::Italic, @@ -130,7 +130,7 @@ impl ImportCommand { style::Reset ); - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, @@ -138,7 +138,7 @@ impl ImportCommand { style::Reset ); - eprintln!( + println!( "{}{}{}Message:{}, can not connect {}, please check the unix socket {}", color::Fg(color::Cyan), style::Italic, @@ -148,7 +148,7 @@ impl ImportCommand { endpoint.to_string_lossy(), ); - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, @@ -164,7 +164,7 @@ impl ImportCommand { if let Err(err) = self.run(dfdaemon_download_client).await { match err { Error::TonicStatus(status) => { - eprintln!( + println!( "{}{}{}Importing Failed!{}", color::Fg(color::Red), style::Italic, @@ -172,7 +172,7 @@ impl ImportCommand { style::Reset, ); - eprintln!( + println!( "{}{}{}*********************************{}", color::Fg(color::Black), style::Italic, @@ -180,7 +180,7 @@ impl ImportCommand { style::Reset ); - eprintln!( + println!( "{}{}{}Bad Code:{} {}", color::Fg(color::Red), style::Italic, @@ -189,7 +189,7 @@ impl ImportCommand { status.code() ); - eprintln!( + println!( "{}{}{}Message:{} {}", color::Fg(color::Cyan), style::Italic, @@ -198,7 +198,7 @@ impl ImportCommand { status.message() ); - eprintln!( + println!( "{}{}{}Details:{} {}", color::Fg(color::Cyan), style::Italic, @@ -207,7 +207,7 @@ impl ImportCommand { std::str::from_utf8(status.details()).unwrap() ); - eprintln!( + println!( "{}{}{}*********************************{}", color::Fg(color::Black), style::Italic, @@ -216,7 +216,7 @@ impl ImportCommand { ); } err => { - eprintln!( + println!( "{}{}{}Importing Failed!{}", color::Fg(color::Red), style::Italic, @@ -224,7 +224,7 @@ impl ImportCommand { style::Reset ); - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, @@ -232,7 +232,7 @@ impl ImportCommand { style::Reset ); - eprintln!( + println!( "{}{}{}Message:{} {}", color::Fg(color::Red), style::Italic, @@ -241,7 +241,7 @@ impl ImportCommand { err ); - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, diff --git a/dragonfly-client/src/bin/dfcache/remove.rs b/dragonfly-client/src/bin/dfcache/remove.rs index 17c10503..63430990 100644 --- a/dragonfly-client/src/bin/dfcache/remove.rs +++ b/dragonfly-client/src/bin/dfcache/remove.rs @@ -43,7 +43,7 @@ impl RemoveCommand { match get_dfdaemon_download_client(endpoint.to_path_buf()).await { Ok(client) => client, Err(err) => { - eprintln!( + println!( "{}{}{}Connect Dfdaemon Failed!{}", color::Fg(color::Red), 
style::Italic, @@ -51,7 +51,7 @@ impl RemoveCommand { style::Reset ); - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, @@ -59,7 +59,7 @@ impl RemoveCommand { style::Reset ); - eprintln!( + println!( "{}{}{}Message:{}, can not connect {}, please check the unix socket {}", color::Fg(color::Cyan), style::Italic, @@ -69,7 +69,7 @@ impl RemoveCommand { endpoint.to_string_lossy(), ); - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, @@ -85,7 +85,7 @@ impl RemoveCommand { if let Err(err) = self.run(dfdaemon_download_client).await { match err { Error::TonicStatus(status) => { - eprintln!( + println!( "{}{}{}Removing Failed!{}", color::Fg(color::Red), style::Italic, @@ -93,7 +93,7 @@ impl RemoveCommand { style::Reset, ); - eprintln!( + println!( "{}{}{}*********************************{}", color::Fg(color::Black), style::Italic, @@ -101,7 +101,7 @@ impl RemoveCommand { style::Reset ); - eprintln!( + println!( "{}{}{}Bad Code:{} {}", color::Fg(color::Red), style::Italic, @@ -110,7 +110,7 @@ impl RemoveCommand { status.code() ); - eprintln!( + println!( "{}{}{}Message:{} {}", color::Fg(color::Cyan), style::Italic, @@ -119,7 +119,7 @@ impl RemoveCommand { status.message() ); - eprintln!( + println!( "{}{}{}Details:{} {}", color::Fg(color::Cyan), style::Italic, @@ -128,7 +128,7 @@ impl RemoveCommand { std::str::from_utf8(status.details()).unwrap() ); - eprintln!( + println!( "{}{}{}*********************************{}", color::Fg(color::Black), style::Italic, @@ -137,7 +137,7 @@ impl RemoveCommand { ); } err => { - eprintln!( + println!( "{}{}{}Removing Failed!{}", color::Fg(color::Red), style::Italic, @@ -145,7 +145,7 @@ impl RemoveCommand { style::Reset ); - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, @@ -153,7 +153,7 @@ impl RemoveCommand { style::Reset ); - eprintln!( + println!( "{}{}{}Message:{} {}", color::Fg(color::Red), style::Italic, @@ -162,7 +162,7 @@ impl RemoveCommand { err ); - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, diff --git a/dragonfly-client/src/bin/dfcache/stat.rs b/dragonfly-client/src/bin/dfcache/stat.rs index eb8172d0..8059c5a8 100644 --- a/dragonfly-client/src/bin/dfcache/stat.rs +++ b/dragonfly-client/src/bin/dfcache/stat.rs @@ -48,7 +48,7 @@ impl StatCommand { match get_dfdaemon_download_client(endpoint.to_path_buf()).await { Ok(client) => client, Err(err) => { - eprintln!( + println!( "{}{}{}Connect Dfdaemon Failed!{}", color::Fg(color::Red), style::Italic, @@ -56,7 +56,7 @@ impl StatCommand { style::Reset ); - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, @@ -64,7 +64,7 @@ impl StatCommand { style::Reset ); - eprintln!( + println!( "{}{}{}Message:{}, can not connect {}, please check the unix socket {}", color::Fg(color::Cyan), style::Italic, @@ -74,7 +74,7 @@ impl StatCommand { endpoint.to_string_lossy(), ); - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, @@ -90,7 +90,7 @@ impl StatCommand { if let Err(err) = self.run(dfdaemon_download_client).await { match err { Error::TonicStatus(status) => { - eprintln!( + println!( "{}{}{}Stating Failed!{}", color::Fg(color::Red), style::Italic, @@ -98,7 +98,7 @@ impl StatCommand { style::Reset, ); - eprintln!( + println!( 
"{}{}{}*********************************{}", color::Fg(color::Black), style::Italic, @@ -106,7 +106,7 @@ impl StatCommand { style::Reset ); - eprintln!( + println!( "{}{}{}Bad Code:{} {}", color::Fg(color::Red), style::Italic, @@ -115,7 +115,7 @@ impl StatCommand { status.code() ); - eprintln!( + println!( "{}{}{}Message:{} {}", color::Fg(color::Cyan), style::Italic, @@ -124,7 +124,7 @@ impl StatCommand { status.message() ); - eprintln!( + println!( "{}{}{}Details:{} {}", color::Fg(color::Cyan), style::Italic, @@ -133,7 +133,7 @@ impl StatCommand { std::str::from_utf8(status.details()).unwrap() ); - eprintln!( + println!( "{}{}{}*********************************{}", color::Fg(color::Black), style::Italic, @@ -142,7 +142,7 @@ impl StatCommand { ); } err => { - eprintln!( + println!( "{}{}{}Stating Failed!{}", color::Fg(color::Red), style::Italic, @@ -150,7 +150,7 @@ impl StatCommand { style::Reset ); - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, @@ -158,7 +158,7 @@ impl StatCommand { style::Reset ); - eprintln!( + println!( "{}{}{}Message:{} {}", color::Fg(color::Red), style::Italic, @@ -167,7 +167,7 @@ impl StatCommand { err ); - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, diff --git a/dragonfly-client/src/bin/dfdaemon/main.rs b/dragonfly-client/src/bin/dfdaemon/main.rs index 94e5cf54..c76226e5 100644 --- a/dragonfly-client/src/bin/dfdaemon/main.rs +++ b/dragonfly-client/src/bin/dfdaemon/main.rs @@ -94,7 +94,7 @@ struct Args { #[arg( long = "verbose", - default_value_t = false, + default_value_t = true, help = "Specify whether to print log" )] verbose: bool, @@ -119,7 +119,7 @@ async fn main() -> Result<(), anyhow::Error> { let config = match dfdaemon::Config::load(&args.config).await { Ok(config) => config, Err(err) => { - eprintln!( + println!( "{}{}Load config {} error: {}{}\n", color::Fg(color::Red), style::Bold, @@ -128,7 +128,7 @@ async fn main() -> Result<(), anyhow::Error> { style::Reset ); - eprintln!( + println!( "{}{}If the file does not exist, you need to new a default config file refer to: {}{}{}{}https://d7y.io/docs/next/reference/configuration/client/dfdaemon/{}", color::Fg(color::Yellow), style::Bold, @@ -158,9 +158,8 @@ async fn main() -> Result<(), anyhow::Error> { // Initialize storage. let storage = Storage::new(config.clone(), config.storage.dir.as_path(), args.log_dir) .await - .map_err(|err| { + .inspect_err(|err| { error!("initialize storage failed: {}", err); - err })?; let storage = Arc::new(storage); @@ -175,9 +174,8 @@ async fn main() -> Result<(), anyhow::Error> { // Initialize manager client. let manager_client = ManagerClient::new(config.clone(), config.manager.addr.clone()) .await - .map_err(|err| { + .inspect_err(|err| { error!("initialize manager client failed: {}", err); - err })?; let manager_client = Arc::new(manager_client); @@ -193,25 +191,22 @@ async fn main() -> Result<(), anyhow::Error> { shutdown_complete_tx.clone(), ) .await - .map_err(|err| { + .inspect_err(|err| { error!("initialize dynconfig server failed: {}", err); - err })?; let dynconfig = Arc::new(dynconfig); // Initialize scheduler client. 
let scheduler_client = SchedulerClient::new(config.clone(), dynconfig.clone()) .await - .map_err(|err| { + .inspect_err(|err| { error!("initialize scheduler client failed: {}", err); - err })?; let scheduler_client = Arc::new(scheduler_client); - let backend_factory = - BackendFactory::new(Some(config.server.plugin_dir.as_path())).map_err(|err| { + let backend_factory = BackendFactory::new(Some(config.server.plugin_dir.as_path())) + .inspect_err(|err| { error!("initialize backend factory failed: {}", err); - err })?; let backend_factory = Arc::new(backend_factory); @@ -281,9 +276,8 @@ async fn main() -> Result<(), anyhow::Error> { shutdown_complete_tx.clone(), ) .await - .map_err(|err| { + .inspect_err(|err| { error!("initialize scheduler announcer failed: {}", err); - err })?; // Initialize upload grpc server. diff --git a/dragonfly-client/src/bin/dfget/main.rs b/dragonfly-client/src/bin/dfget/main.rs index 0bbf2319..126a41fe 100644 --- a/dragonfly-client/src/bin/dfget/main.rs +++ b/dragonfly-client/src/bin/dfget/main.rs @@ -40,10 +40,11 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use std::{cmp::min, fmt::Write}; use termion::{color, style}; -use tokio::fs; +use tokio::fs::{self, OpenOptions}; +use tokio::io::{AsyncSeekExt, AsyncWriteExt, SeekFrom}; use tokio::sync::Semaphore; use tokio::task::JoinSet; -use tracing::{error, info, warn, Level}; +use tracing::{debug, error, info, warn, Level}; use url::Url; use uuid::Uuid; @@ -91,6 +92,13 @@ struct Args { #[arg(help = "Specify the URL to download")] url: Url, + #[arg( + long = "transfer-from-dfdaemon", + default_value_t = false, + help = "Specify whether to transfer the content of downloading file from dfdaemon's unix domain socket. If it is true, dfget will call dfdaemon to download the file, and dfdaemon will return the content of downloading file to dfget via unix domain socket, and dfget will copy the content to the output path. If it is false, dfdaemon will download the file and hardlink or copy the file to the output path." + )] + transfer_from_dfdaemon: bool, + #[arg( short = 'O', long = "output", @@ -281,7 +289,7 @@ async fn main() -> anyhow::Result<()> { // Validate command line arguments. 
if let Err(err) = validate_args(&args) { - eprintln!( + println!( "{}{}{}Validating Failed!{}", color::Fg(color::Red), style::Italic, @@ -289,7 +297,7 @@ async fn main() -> anyhow::Result<()> { style::Reset ); - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, @@ -297,7 +305,7 @@ async fn main() -> anyhow::Result<()> { style::Reset ); - eprintln!( + println!( "{}{}{}Message:{} {}", color::Fg(color::Cyan), style::Italic, @@ -306,7 +314,7 @@ async fn main() -> anyhow::Result<()> { err, ); - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, @@ -322,7 +330,7 @@ async fn main() -> anyhow::Result<()> { match get_dfdaemon_download_client(args.endpoint.to_path_buf()).await { Ok(client) => client, Err(err) => { - eprintln!( + println!( "{}{}{}Connect Dfdaemon Failed!{}", color::Fg(color::Red), style::Italic, @@ -330,7 +338,7 @@ async fn main() -> anyhow::Result<()> { style::Reset ); - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, @@ -338,7 +346,7 @@ async fn main() -> anyhow::Result<()> { style::Reset ); - eprintln!( + println!( "{}{}{}Message:{}, can not connect {}, please check the unix socket {}", color::Fg(color::Cyan), style::Italic, @@ -348,7 +356,7 @@ async fn main() -> anyhow::Result<()> { args.endpoint.to_string_lossy(), ); - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, @@ -366,7 +374,7 @@ async fn main() -> anyhow::Result<()> { Error::TonicStatus(status) => { let details = status.details(); if let Ok(backend_err) = serde_json::from_slice::(details) { - eprintln!( + println!( "{}{}{}Downloading Failed!{}", color::Fg(color::Red), style::Italic, @@ -374,7 +382,7 @@ async fn main() -> anyhow::Result<()> { style::Reset ); - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, @@ -383,7 +391,7 @@ async fn main() -> anyhow::Result<()> { ); if let Some(status_code) = backend_err.status_code { - eprintln!( + println!( "{}{}{}Bad Status Code:{} {}", color::Fg(color::Red), style::Italic, @@ -393,7 +401,7 @@ async fn main() -> anyhow::Result<()> { ); } - eprintln!( + println!( "{}{}{}Message:{} {}", color::Fg(color::Cyan), style::Italic, @@ -403,7 +411,7 @@ async fn main() -> anyhow::Result<()> { ); if !backend_err.header.is_empty() { - eprintln!( + println!( "{}{}{}Header:{}", color::Fg(color::Cyan), style::Italic, @@ -411,11 +419,11 @@ async fn main() -> anyhow::Result<()> { style::Reset ); for (key, value) in backend_err.header.iter() { - eprintln!(" [{}]: {}", key.as_str(), value.as_str()); + println!(" [{}]: {}", key.as_str(), value.as_str()); } } - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, @@ -423,7 +431,7 @@ async fn main() -> anyhow::Result<()> { style::Reset ); } else { - eprintln!( + println!( "{}{}{}Downloading Failed!{}", color::Fg(color::Red), style::Italic, @@ -431,7 +439,7 @@ async fn main() -> anyhow::Result<()> { style::Reset ); - eprintln!( + println!( "{}{}{}*********************************{}", color::Fg(color::Black), style::Italic, @@ -439,7 +447,7 @@ async fn main() -> anyhow::Result<()> { style::Reset ); - eprintln!( + println!( "{}{}{}Bad Code:{} {}", color::Fg(color::Red), style::Italic, @@ -448,7 +456,7 @@ async fn main() -> anyhow::Result<()> { status.code() ); - eprintln!( + 
println!( "{}{}{}Message:{} {}", color::Fg(color::Cyan), style::Italic, @@ -458,7 +466,7 @@ async fn main() -> anyhow::Result<()> { ); if !status.details().is_empty() { - eprintln!( + println!( "{}{}{}Details:{} {}", color::Fg(color::Cyan), style::Italic, @@ -468,7 +476,7 @@ async fn main() -> anyhow::Result<()> { ); } - eprintln!( + println!( "{}{}{}*********************************{}", color::Fg(color::Black), style::Italic, @@ -478,7 +486,7 @@ async fn main() -> anyhow::Result<()> { } } Error::BackendError(err) => { - eprintln!( + println!( "{}{}{}Downloading Failed!{}", color::Fg(color::Red), style::Italic, @@ -486,7 +494,7 @@ async fn main() -> anyhow::Result<()> { style::Reset ); - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, @@ -494,7 +502,7 @@ async fn main() -> anyhow::Result<()> { style::Reset ); - eprintln!( + println!( "{}{}{}Message:{} {}", color::Fg(color::Red), style::Italic, @@ -504,7 +512,7 @@ async fn main() -> anyhow::Result<()> { ); if err.header.is_some() { - eprintln!( + println!( "{}{}{}Header:{}", color::Fg(color::Cyan), style::Italic, @@ -512,11 +520,11 @@ async fn main() -> anyhow::Result<()> { style::Reset ); for (key, value) in err.header.unwrap_or_default().iter() { - eprintln!(" [{}]: {}", key.as_str(), value.to_str().unwrap()); + println!(" [{}]: {}", key.as_str(), value.to_str().unwrap()); } } - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, @@ -525,7 +533,7 @@ async fn main() -> anyhow::Result<()> { ); } err => { - eprintln!( + println!( "{}{}{}Downloading Failed!{}", color::Fg(color::Red), style::Italic, @@ -533,7 +541,7 @@ async fn main() -> anyhow::Result<()> { style::Reset ); - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, @@ -541,7 +549,7 @@ async fn main() -> anyhow::Result<()> { style::Reset ); - eprintln!( + println!( "{}{}{}Message:{} {}", color::Fg(color::Red), style::Italic, @@ -550,7 +558,7 @@ async fn main() -> anyhow::Result<()> { err ); - eprintln!( + println!( "{}{}{}****************************************{}", color::Fg(color::Black), style::Italic, @@ -631,9 +639,8 @@ async fn download_dir(args: Args, download_client: DfdaemonDownloadClient) -> Re // then download the file to the output directory. if entry.is_dir { let output_dir = make_output_by_entry(args.url.clone(), &args.output, entry)?; - fs::create_dir_all(&output_dir).await.map_err(|err| { + fs::create_dir_all(&output_dir).await.inspect_err(|err| { error!("create {} failed: {}", output_dir.to_string_lossy(), err); - err })?; } else { let mut entry_args = args.clone(); @@ -713,6 +720,15 @@ async fn download( .filtered_query_params .unwrap_or_else(dfdaemon::default_proxy_rule_filtered_query_params); + // Dfget needs to notify dfdaemon to transfer the piece content of downloading file via unix domain socket + // when the `transfer_from_dfdaemon` is true. Otherwise, dfdaemon will download the file and hardlink or + // copy the file to the output path. + let (output_path, need_piece_content) = if args.transfer_from_dfdaemon { + (None, true) + } else { + (Some(args.output.to_string_lossy().to_string()), false) + }; + // Create dfdaemon client. 
let response = download_client .download_task(DownloadTaskRequest { @@ -728,7 +744,7 @@ async fn download( filtered_query_params, request_header: header_vec_to_hashmap(args.header.unwrap_or_default())?, piece_length: None, - output_path: Some(args.output.to_string_lossy().to_string()), + output_path, timeout: Some( prost_wkt_types::Duration::try_from(args.timeout) .or_err(ErrorType::ParseError)?, @@ -738,16 +754,42 @@ async fn download( certificate_chain: Vec::new(), prefetch: false, is_prefetch: false, + need_piece_content, object_storage, hdfs, }), }) .await - .map_err(|err| { + .inspect_err(|err| { error!("download task failed: {}", err); - err })?; + // If transfer_from_dfdaemon is true, then dfget needs to create the output file and write the + // piece content to the output file. + let mut f = if args.transfer_from_dfdaemon { + if let Some(parent) = args.output.parent() { + if !parent.exists() { + fs::create_dir_all(parent).await.inspect_err(|err| { + error!("failed to create directory {:?}: {}", parent, err); + })?; + } + } + + let f = OpenOptions::new() + .create_new(true) + .write(true) + .mode(dfget::DEFAULT_OUTPUT_FILE_MODE) + .open(&args.output) + .await + .inspect_err(|err| { + error!("open file {:?} failed: {}", args.output, err); + })?; + + Some(f) + } else { + None + }; + // Get actual path rather than percentage encoded path as download path. let download_path = percent_decode_str(args.url.path()).decode_utf8_lossy(); progress_bar.set_style( @@ -765,9 +807,8 @@ async fn download( // Download file. let mut downloaded = 0; let mut out_stream = response.into_inner(); - while let Some(message) = out_stream.message().await.map_err(|err| { + while let Some(message) = out_stream.message().await.inspect_err(|err| { error!("get message failed: {}", err); - err })? { match message.response { Some(download_task_response::Response::DownloadTaskStartedResponse(response)) => { @@ -776,6 +817,22 @@ async fn download( Some(download_task_response::Response::DownloadPieceFinishedResponse(response)) => { let piece = response.piece.ok_or(Error::InvalidParameter)?; + // Dfget needs to write the piece content to the output file. + if let Some(f) = &mut f { + f.seek(SeekFrom::Start(piece.offset)) + .await + .inspect_err(|err| { + error!("seek {:?} failed: {}", args.output, err); + })?; + + let content = piece.content.ok_or(Error::InvalidParameter)?; + f.write_all(&content).await.inspect_err(|err| { + error!("write {:?} failed: {}", args.output, err); + })?; + + debug!("copy piece {} to {:?} success", piece.number, args.output); + } + downloaded += piece.length; let position = min( downloaded + piece.length, diff --git a/dragonfly-client/src/bin/dfstore/main.rs b/dragonfly-client/src/bin/dfstore/main.rs index ff941e6a..6e38831b 100644 --- a/dragonfly-client/src/bin/dfstore/main.rs +++ b/dragonfly-client/src/bin/dfstore/main.rs @@ -66,7 +66,7 @@ struct Args { #[arg( long = "verbose", - default_value_t = false, + default_value_t = true, help = "Specify whether to print log" )] verbose: bool, diff --git a/dragonfly-client/src/grpc/dfdaemon_download.rs b/dragonfly-client/src/grpc/dfdaemon_download.rs index a4073075..c51bf0b1 100644 --- a/dragonfly-client/src/grpc/dfdaemon_download.rs +++ b/dragonfly-client/src/grpc/dfdaemon_download.rs @@ -790,21 +790,20 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler { info!("download persistent cache task succeeded"); // Hard link or copy the persistent cache task content to the destination. 
- if let Err(err) = task_manager_clone - .hard_link_or_copy( - &task_clone, - Path::new(request_clone.output_path.as_str()), - ) - .await - { - error!("hard link or copy persistent cache task: {}", err); - out_stream_tx - .send(Err(Status::internal(err.to_string()))) + if let Some(output_path) = request_clone.output_path { + if let Err(err) = task_manager_clone + .hard_link_or_copy(&task_clone, Path::new(output_path.as_str())) .await - .unwrap_or_else(|err| { - error!("send download progress error: {:?}", err) - }); - }; + { + error!("hard link or copy persistent cache task: {}", err); + out_stream_tx + .send(Err(Status::internal(err.to_string()))) + .await + .unwrap_or_else(|err| { + error!("send download progress error: {:?}", err) + }); + }; + } } Err(e) => { // Download task failed. @@ -1023,9 +1022,8 @@ impl DfdaemonDownloadClient { } })) .await - .map_err(|err| { + .inspect_err(|err| { error!("connect failed: {}", err); - err }) .or_err(ErrorType::ConnectError)?; diff --git a/dragonfly-client/src/grpc/dfdaemon_upload.rs b/dragonfly-client/src/grpc/dfdaemon_upload.rs index 1e0abce9..90ca7e63 100644 --- a/dragonfly-client/src/grpc/dfdaemon_upload.rs +++ b/dragonfly-client/src/grpc/dfdaemon_upload.rs @@ -23,14 +23,14 @@ use crate::metrics::{ }; use crate::resource::{persistent_cache_task, task}; use crate::shutdown; -use dragonfly_api::common::v2::{PersistentCacheTask, Piece, Priority, Task, TaskType}; +use dragonfly_api::common::v2::{Host, PersistentCacheTask, Piece, Priority, Task, TaskType}; use dragonfly_api::dfdaemon::v2::{ dfdaemon_upload_client::DfdaemonUploadClient as DfdaemonUploadGRPCClient, dfdaemon_upload_server::{DfdaemonUpload, DfdaemonUploadServer as DfdaemonUploadGRPCServer}, DeletePersistentCacheTaskRequest, DeleteTaskRequest, DownloadPersistentCacheTaskRequest, DownloadPersistentCacheTaskResponse, DownloadPieceRequest, DownloadPieceResponse, DownloadTaskRequest, DownloadTaskResponse, StatPersistentCacheTaskRequest, StatTaskRequest, - SyncPiecesRequest, SyncPiecesResponse, + SyncHostRequest, SyncPiecesRequest, SyncPiecesResponse, }; use dragonfly_api::errordetails::v2::Backend; use dragonfly_client_config::dfdaemon::Config; @@ -858,6 +858,18 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { })) } + /// SyncHostStream is the stream of the sync host response. + type SyncHostStream = ReceiverStream>; + + /// sync_host syncs the host information. + #[instrument(skip_all)] + async fn sync_host( + &self, + _request: Request, + ) -> Result, Status> { + unimplemented!() + } + /// DownloadPersistentCacheTaskStream is the stream of the download persistent cache task response. type DownloadPersistentCacheTaskStream = ReceiverStream>; @@ -988,21 +1000,20 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { info!("download persistent cache task succeeded"); // Hard link or copy the persistent cache task content to the destination. 
- if let Err(err) = task_manager_clone - .hard_link_or_copy( - &task_clone, - Path::new(request_clone.output_path.as_str()), - ) - .await - { - error!("hard link or copy persistent cache task: {}", err); - out_stream_tx - .send(Err(Status::internal(err.to_string()))) + if let Some(output_path) = request_clone.output_path { + if let Err(err) = task_manager_clone + .hard_link_or_copy(&task_clone, Path::new(output_path.as_str())) .await - .unwrap_or_else(|err| { - error!("send download progress error: {:?}", err) - }); - }; + { + error!("hard link or copy persistent cache task: {}", err); + out_stream_tx + .send(Err(Status::internal(err.to_string()))) + .await + .unwrap_or_else(|err| { + error!("send download progress error: {:?}", err) + }); + }; + } } Err(e) => { // Download task failed. @@ -1131,9 +1142,8 @@ impl DfdaemonUploadClient { .timeout(super::REQUEST_TIMEOUT) .connect() .await - .map_err(|err| { + .inspect_err(|err| { error!("connect to {} failed: {}", addr, err); - err }) .or_err(ErrorType::ConnectError)? } @@ -1144,9 +1154,8 @@ impl DfdaemonUploadClient { .timeout(super::REQUEST_TIMEOUT) .connect() .await - .map_err(|err| { + .inspect_err(|err| { error!("connect to {} failed: {}", addr, err); - err }) .or_err(ErrorType::ConnectError)?, }; diff --git a/dragonfly-client/src/grpc/health.rs b/dragonfly-client/src/grpc/health.rs index 0aab1c17..21e8d2d6 100644 --- a/dragonfly-client/src/grpc/health.rs +++ b/dragonfly-client/src/grpc/health.rs @@ -54,9 +54,8 @@ impl HealthClient { .keep_alive_timeout(super::HTTP2_KEEP_ALIVE_TIMEOUT) .connect() .await - .map_err(|err| { + .inspect_err(|err| { error!("connect to {} failed: {}", addr, err); - err }) .or_err(ErrorType::ConnectError)?, None => Channel::from_shared(addr.to_string()) @@ -68,9 +67,8 @@ impl HealthClient { .keep_alive_timeout(super::HTTP2_KEEP_ALIVE_TIMEOUT) .connect() .await - .map_err(|err| { + .inspect_err(|err| { error!("connect to {} failed: {}", addr, err); - err }) .or_err(ErrorType::ConnectError)?, }; @@ -96,9 +94,8 @@ impl HealthClient { } })) .await - .map_err(|err| { + .inspect_err(|err| { error!("connect failed: {}", err); - err }) .or_err(ErrorType::ConnectError)?; let client = HealthGRPCClient::with_interceptor(channel, TracingInterceptor) diff --git a/dragonfly-client/src/grpc/manager.rs b/dragonfly-client/src/grpc/manager.rs index 415bd96a..88dd7147 100644 --- a/dragonfly-client/src/grpc/manager.rs +++ b/dragonfly-client/src/grpc/manager.rs @@ -79,9 +79,8 @@ impl ManagerClient { .keep_alive_timeout(super::HTTP2_KEEP_ALIVE_TIMEOUT) .connect() .await - .map_err(|err| { + .inspect_err(|err| { error!("connect to {} failed: {}", addr.to_string(), err); - err }) .or_err(ErrorType::ConnectError)?, None => Channel::from_shared(addr.clone()) @@ -94,9 +93,8 @@ impl ManagerClient { .keep_alive_timeout(super::HTTP2_KEEP_ALIVE_TIMEOUT) .connect() .await - .map_err(|err| { + .inspect_err(|err| { error!("connect to {} failed: {}", addr.to_string(), err); - err }) .or_err(ErrorType::ConnectError)?, }; diff --git a/dragonfly-client/src/grpc/mod.rs b/dragonfly-client/src/grpc/mod.rs index 2fe04afd..815dd163 100644 --- a/dragonfly-client/src/grpc/mod.rs +++ b/dragonfly-client/src/grpc/mod.rs @@ -95,9 +95,8 @@ pub async fn prefetch_task( let response = dfdaemon_download_client .download_task(request) .await - .map_err(|err| { + .inspect_err(|err| { error!("prefetch task failed: {}", err); - err })?; // Collect the prefetch task started metrics. 
diff --git a/dragonfly-client/src/grpc/scheduler.rs b/dragonfly-client/src/grpc/scheduler.rs
index ee8c1b6e..2c6a6a37 100644
--- a/dragonfly-client/src/grpc/scheduler.rs
+++ b/dragonfly-client/src/grpc/scheduler.rs
@@ -187,9 +187,8 @@ impl SchedulerClient {
             .timeout(super::REQUEST_TIMEOUT)
             .connect()
             .await
-            .map_err(|err| {
+            .inspect_err(|err| {
                 error!("connect to {} failed: {}", addr.to_string(), err);
-                err
             })
             .or_err(ErrorType::ConnectError)?;

@@ -241,9 +240,8 @@ impl SchedulerClient {
             .timeout(super::REQUEST_TIMEOUT)
             .connect()
             .await
-            .map_err(|err| {
+            .inspect_err(|err| {
                 error!("connect to {} failed: {}", addr.to_string(), err);
-                err
             })
             .or_err(ErrorType::ConnectError)?;

@@ -300,9 +298,8 @@ impl SchedulerClient {
             .timeout(super::REQUEST_TIMEOUT)
             .connect()
             .await
-            .map_err(|err| {
+            .inspect_err(|err| {
                 error!("connect to {} failed: {}", addr.to_string(), err);
-                err
             })
             .or_err(ErrorType::ConnectError)?;

@@ -498,9 +495,8 @@ impl SchedulerClient {
                 .keep_alive_timeout(super::HTTP2_KEEP_ALIVE_TIMEOUT)
                 .connect()
                 .await
-                .map_err(|err| {
+                .inspect_err(|err| {
                     error!("connect to {} failed: {}", addr.to_string(), err);
-                    err
                 })
                 .or_err(ErrorType::ConnectError)?,
             None => Channel::from_shared(addr.clone())
@@ -513,9 +509,8 @@ impl SchedulerClient {
                 .keep_alive_timeout(super::HTTP2_KEEP_ALIVE_TIMEOUT)
                 .connect()
                 .await
-                .map_err(|err| {
+                .inspect_err(|err| {
                     error!("connect to {} failed: {}", addr.to_string(), err);
-                    err
                 })
                 .or_err(ErrorType::ConnectError)?,
         };
diff --git a/dragonfly-client/src/proxy/mod.rs b/dragonfly-client/src/proxy/mod.rs
index 9292d2ce..3ebd325c 100644
--- a/dragonfly-client/src/proxy/mod.rs
+++ b/dragonfly-client/src/proxy/mod.rs
@@ -1029,9 +1029,8 @@ async fn proxy_via_https(
         .build();

     let client = Client::builder(TokioExecutor::new()).build(https);
-    let response = client.request(request).await.map_err(|err| {
+    let response = client.request(request).await.inspect_err(|err| {
         error!("request failed: {:?}", err);
-        err
     })?;

     Ok(response.map(|b| b.map_err(ClientError::from).boxed()))
@@ -1109,6 +1108,7 @@ fn make_download_task_request(
             object_storage: None,
             hdfs: None,
             is_prefetch: false,
+            need_piece_content: false,
         }),
     })
 }
diff --git a/dragonfly-client/src/resource/persistent_cache_task.rs b/dragonfly-client/src/resource/persistent_cache_task.rs
index 3a22634c..197c8489 100644
--- a/dragonfly-client/src/resource/persistent_cache_task.rs
+++ b/dragonfly-client/src/resource/persistent_cache_task.rs
@@ -46,6 +46,7 @@ use dragonfly_client_util::id_generator::IDGenerator;
 use std::path::Path;
 use std::sync::Arc;
 use std::time::Duration;
+use tokio::io::AsyncReadExt;
 use tokio::sync::{
     mpsc::{self, Sender},
     Semaphore,
@@ -121,9 +122,8 @@ impl PersistentCacheTask {

         // Get the content length of the file.
         let content_length = std::fs::metadata(path)
-            .map_err(|err| {
+            .inspect_err(|err| {
                 error!("get file metadata error: {}", err);
-                err
             })?
             .len();

@@ -151,9 +151,8 @@ impl PersistentCacheTask {
                 ttl: request.ttl,
             })
             .await
-            .map_err(|err| {
+            .inspect_err(|err| {
                 error!("upload persistent cache task started: {}", err);
-                err
             })?;

         // Create the persistent cache task.
@@ -194,9 +193,8 @@ impl PersistentCacheTask {
                 },
             )
             .await
-            .map_err(|err| {
+            .inspect_err(|err| {
                 error!("upload persistent cache task failed: {}", err);
-                err
             })?;

         // Delete the persistent cache task.
@@ -232,9 +230,8 @@ impl PersistentCacheTask {
                 description: Some(err.to_string()),
             })
             .await
-            .map_err(|err| {
+            .inspect_err(|err| {
                 error!("upload persistent cache task failed: {}", err);
-                err
             })?;

         // Delete the persistent cache task.
@@ -379,9 +376,8 @@ impl PersistentCacheTask {
                 REQUEST_TIMEOUT,
             )
             .await
-            .map_err(|err| {
+            .inspect_err(|err| {
                 error!("send DownloadPersistentCacheTaskStartedResponse failed: {:?}", err);
-                err
             })?;

         // Download the pieces from the local.
@@ -391,6 +387,7 @@ impl PersistentCacheTask {
                 task,
                 host_id,
                 peer_id,
+                request.need_piece_content,
                 interested_pieces.clone(),
                 download_progress_tx.clone(),
             )
@@ -525,9 +522,8 @@ impl PersistentCacheTask {
                 REQUEST_TIMEOUT,
             )
             .await
-            .map_err(|err| {
+            .inspect_err(|err| {
                 error!("send RegisterPersistentCachePeerRequest failed: {:?}", err);
-                err
             })?;

         info!("sent RegisterPersistentCachePeerRequest");
@@ -538,9 +534,8 @@ impl PersistentCacheTask {
             .scheduler_client
             .announce_persistent_cache_peer(task.id.as_str(), peer_id, request_stream)
             .await
-            .map_err(|err| {
+            .inspect_err(|err| {
                 error!("announce peer failed: {:?}", err);
-                err
             })?;

         info!("announced peer has been connected");
@@ -606,9 +601,8 @@ impl PersistentCacheTask {
                 REQUEST_TIMEOUT,
             )
             .await
-            .map_err(|err| {
+            .inspect_err(|err| {
                 error!("send DownloadPersistentCachePeerStartedRequest failed: {:?}", err);
-                err
             })?;

         info!("sent DownloadPersistentCachePeerStartedRequest");
@@ -630,9 +624,8 @@ impl PersistentCacheTask {
                 REQUEST_TIMEOUT,
             )
             .await
-            .map_err(|err| {
+            .inspect_err(|err| {
                 error!("send DownloadPersistentCachePeerFinishedRequest failed: {:?}", err);
-                err
             })?;

         info!("sent DownloadPersistentCachePeerFinishedRequest");
@@ -690,6 +683,7 @@ impl PersistentCacheTask {
                     host_id,
                     peer_id,
                     response.candidate_cache_parents.clone(),
+                    request.need_piece_content,
                     remaining_interested_pieces.clone(),
                     download_progress_tx.clone(),
                     in_stream_tx.clone(),
@@ -796,6 +790,7 @@ impl PersistentCacheTask {
         host_id: &str,
         peer_id: &str,
         parents: Vec<piece_collector::CollectedParent>,
+        need_piece_content: bool,
        interested_pieces: Vec<metadata::Piece>,
        download_progress_tx: Sender<Result<DownloadPersistentCacheTaskResponse, Status>>,
        in_stream_tx: Sender<AnnouncePersistentCachePeerRequest>,
@@ -831,6 +826,7 @@ impl PersistentCacheTask {
             peer_id: String,
             number: u32,
             length: u64,
+            need_piece_content: bool,
             parent: piece_collector::CollectedParent,
             piece_manager: Arc<piece::Piece>,
             storage: Arc<Storage>,
@@ -873,7 +869,7 @@ impl PersistentCacheTask {
                 })?;

             // Construct the piece.
-            let piece = Piece {
+            let mut piece = Piece {
                 number: metadata.number,
                 parent_id: metadata.parent_id.clone(),
                 offset: metadata.offset,
@@ -885,6 +881,30 @@ impl PersistentCacheTask {
                 created_at: Some(prost_wkt_types::Timestamp::from(metadata.created_at)),
             };

+            // If need_piece_content is true, read the piece content from the local.
+            if need_piece_content {
+                let mut reader = piece_manager
+                    .download_from_local_into_async_read(
+                        piece_id.as_str(),
+                        task_id.as_str(),
+                        metadata.length,
+                        None,
+                        true,
+                        false,
+                    )
+                    .await
+                    .inspect_err(|err| {
+                        error!("read piece {} failed: {:?}", piece_id, err);
+                    })?;
+
+                let mut content = vec![0; metadata.length as usize];
+                reader.read_exact(&mut content).await.inspect_err(|err| {
+                    error!("read piece {} failed: {:?}", piece_id, err);
+                })?;
+
+                piece.content = Some(content);
+            }
+
             // Send the download piece finished request.
             in_stream_tx
                 .send_timeout(
@@ -903,9 +923,8 @@ impl PersistentCacheTask {
                     REQUEST_TIMEOUT,
                 )
                 .await
-                .map_err(|err| {
+                .inspect_err(|err| {
                     error!("send DownloadPieceFinishedRequest failed: {:?}", err);
-                    err
                 })?;

             // Send the download progress.
@@ -926,9 +945,8 @@ impl PersistentCacheTask {
                     REQUEST_TIMEOUT,
                 )
                 .await
-                .map_err(|err| {
+                .inspect_err(|err| {
                     error!("send DownloadPieceFinishedResponse failed: {:?}", err);
-                    err
                 })?;

             info!(
@@ -946,6 +964,7 @@ impl PersistentCacheTask {
                 peer_id.to_string(),
                 collect_piece.number,
                 collect_piece.length,
+                need_piece_content,
                 collect_piece.parent.clone(),
                 self.piece.clone(),
                 self.storage.clone(),
@@ -1023,6 +1042,7 @@ impl PersistentCacheTask {
         task: &metadata::PersistentCacheTask,
         host_id: &str,
         peer_id: &str,
+        need_piece_content: bool,
         interested_pieces: Vec<metadata::Piece>,
         download_progress_tx: Sender<Result<DownloadPersistentCacheTaskResponse, Status>>,
     ) -> ClientResult<Vec<metadata::Piece>> {
@@ -1054,7 +1074,7 @@ impl PersistentCacheTask {
             info!("finished piece {} from local", piece_id);

             // Construct the piece.
-            let piece = Piece {
+            let mut piece = Piece {
                 number: piece.number,
                 parent_id: None,
                 offset: piece.offset,
@@ -1066,6 +1086,31 @@ impl PersistentCacheTask {
                 created_at: Some(prost_wkt_types::Timestamp::from(piece.created_at)),
             };

+            // If need_piece_content is true, read the piece content from the local.
+            if need_piece_content {
+                let mut reader = self
+                    .piece
+                    .download_from_local_into_async_read(
+                        piece_id.as_str(),
+                        task.id.as_str(),
+                        piece.length,
+                        None,
+                        true,
+                        false,
+                    )
+                    .await
+                    .inspect_err(|err| {
+                        error!("read piece {} failed: {:?}", piece_id, err);
+                    })?;
+
+                let mut content = vec![0; piece.length as usize];
+                reader.read_exact(&mut content).await.inspect_err(|err| {
+                    error!("read piece {} failed: {:?}", piece_id, err);
+                })?;
+
+                piece.content = Some(content);
+            }
+
             // Send the download progress.
             download_progress_tx
                 .send_timeout(
@@ -1084,9 +1129,8 @@ impl PersistentCacheTask {
                     REQUEST_TIMEOUT,
                 )
                 .await
-                .map_err(|err| {
+                .inspect_err(|err| {
                     error!("send DownloadPieceFinishedResponse failed: {:?}", err);
-                    err
                 })?;

             // Store the finished piece.
diff --git a/dragonfly-client/src/resource/piece.rs b/dragonfly-client/src/resource/piece.rs
index 9a908115..56771c5b 100644
--- a/dragonfly-client/src/resource/piece.rs
+++ b/dragonfly-client/src/resource/piece.rs
@@ -597,13 +597,11 @@ impl Piece {
         );

         // Download the piece from the source.
-        let backend = self.backend_factory.build(url).map_err(|err| {
+        let backend = self.backend_factory.build(url).inspect_err(|err| {
             error!("build backend failed: {}", err);
             if let Some(err) = self.storage.download_piece_failed(piece_id).err() {
                 error!("set piece metadata failed: {}", err)
             };
-
-            err
         })?;

         // Record the start time.
@@ -630,7 +628,7 @@ impl Piece {
                 hdfs,
             })
             .await
-            .map_err(|err| {
+            .inspect_err(|err| {
                 // Collect the backend request failure metrics.
                 collect_backend_request_failure_metrics(
                     backend.scheme().as_str(),
@@ -642,8 +640,6 @@ impl Piece {
                 if let Some(err) = self.storage.download_piece_failed(piece_id).err() {
                     error!("set piece metadata failed: {}", err)
                 };
-
-                err
             })?;

         if !response.success {
diff --git a/dragonfly-client/src/resource/piece_collector.rs b/dragonfly-client/src/resource/piece_collector.rs
index 0d89fb07..452482e0 100644
--- a/dragonfly-client/src/resource/piece_collector.rs
+++ b/dragonfly-client/src/resource/piece_collector.rs
@@ -171,12 +171,11 @@ impl PieceCollector {
             let dfdaemon_upload_client =
                 DfdaemonUploadClient::new(config, format!("http://{}:{}", host.ip, host.port))
                     .await
-                    .map_err(|err| {
+                    .inspect_err(|err| {
                         error!(
                             "create dfdaemon upload client from parent {} failed: {}",
                             parent.id, err
                         );
-                        err
                     })?;

             let response = dfdaemon_upload_client
@@ -189,9 +188,8 @@ impl PieceCollector {
                         .collect(),
                 })
                 .await
-                .map_err(|err| {
+                .inspect_err(|err| {
                     error!("sync pieces from parent {} failed: {}", parent.id, err);
-                    err
                 })?;

             // If the response repeating timeout exceeds the piece download timeout, the stream will return error.
@@ -229,9 +227,8 @@ impl PieceCollector {
                         parent: parent.clone(),
                     })
                     .await
-                    .map_err(|err| {
+                    .inspect_err(|err| {
                         error!("send CollectedPiece failed: {}", err);
-                        err
                     })?;

                 // Release the lock of the piece with parent_id.
diff --git a/dragonfly-client/src/resource/task.rs b/dragonfly-client/src/resource/task.rs
index da126ea3..544cd667 100644
--- a/dragonfly-client/src/resource/task.rs
+++ b/dragonfly-client/src/resource/task.rs
@@ -54,6 +54,7 @@ use std::sync::{
     Arc, Mutex,
 };
 use std::time::{Duration, Instant};
+use tokio::io::AsyncReadExt;
 use tokio::sync::{
     mpsc::{self, Sender},
     Semaphore,
@@ -134,10 +135,10 @@ impl Task {
         }

         // Handle the request header.
-        let mut request_header = hashmap_to_headermap(&request.request_header).map_err(|err| {
-            error!("convert header: {}", err);
-            err
-        })?;
+        let mut request_header =
+            hashmap_to_headermap(&request.request_header).inspect_err(|err| {
+                error!("convert header: {}", err);
+            })?;

         // Remove the range header to prevent the server from
         // returning a 206 partial content and returning
@@ -337,9 +338,8 @@ impl Task {
                 REQUEST_TIMEOUT,
             )
             .await
-            .map_err(|err| {
+            .inspect_err(|err| {
                 error!("send DownloadTaskStartedResponse failed: {:?}", err);
-                err
             })?;

         // Download the pieces from the local.
@@ -349,6 +349,7 @@ impl Task {
                 task,
                 host_id,
                 peer_id,
+                request.need_piece_content,
                 interested_pieces.clone(),
                 download_progress_tx.clone(),
             )
@@ -512,9 +513,8 @@ impl Task {
                 REQUEST_TIMEOUT,
             )
             .await
-            .map_err(|err| {
+            .inspect_err(|err| {
                 error!("send RegisterPeerRequest failed: {:?}", err);
-                err
             })?;

         info!("sent RegisterPeerRequest");
@@ -525,9 +525,8 @@ impl Task {
             .scheduler_client
             .announce_peer(task_id, peer_id, request_stream)
             .await
-            .map_err(|err| {
+            .inspect_err(|err| {
                 error!("announce peer failed: {:?}", err);
-                err
             })?;

         info!("announced peer has been connected");
@@ -591,9 +590,8 @@ impl Task {
                 REQUEST_TIMEOUT,
             )
             .await
-            .map_err(|err| {
+            .inspect_err(|err| {
                 error!("send DownloadPeerStartedRequest failed: {:?}", err);
-                err
             })?;

         info!("sent DownloadPeerStartedRequest");
@@ -616,9 +614,8 @@ impl Task {
                 REQUEST_TIMEOUT,
             )
             .await
-            .map_err(|err| {
+            .inspect_err(|err| {
                 error!("send DownloadPeerFinishedRequest failed: {:?}", err);
-                err
             })?;

         info!("sent DownloadPeerFinishedRequest");
@@ -676,6 +673,7 @@ impl Task {
                     response.candidate_parents.clone(),
                     remaining_interested_pieces.clone(),
                     request.is_prefetch,
+                    request.need_piece_content,
                     download_progress_tx.clone(),
                     in_stream_tx.clone(),
                 )
@@ -918,6 +916,7 @@ impl Task {
         parents: Vec<CollectedParent>,
         interested_pieces: Vec<metadata::Piece>,
         is_prefetch: bool,
+        need_piece_content: bool,
         download_progress_tx: Sender<Result<DownloadTaskResponse, Status>>,
         in_stream_tx: Sender<AnnouncePeerRequest>,
     ) -> ClientResult<Vec<metadata::Piece>> {
@@ -977,6 +976,7 @@ impl Task {
             interrupt: Arc<AtomicBool>,
             finished_pieces: Arc<Mutex<Vec<metadata::Piece>>>,
             is_prefetch: bool,
+            need_piece_content: bool,
         ) -> ClientResult<metadata::Piece> {
             // Limit the concurrent piece count.
             let _permit = semaphore.acquire().await.unwrap();
@@ -1013,7 +1013,7 @@ impl Task {
                 })?;

             // Construct the piece.
-            let piece = Piece {
+            let mut piece = Piece {
                 number: metadata.number,
                 parent_id: metadata.parent_id.clone(),
                 offset: metadata.offset,
@@ -1025,6 +1025,32 @@ impl Task {
                 created_at: Some(prost_wkt_types::Timestamp::from(metadata.created_at)),
             };

+            // If need_piece_content is true, read the piece content from the local.
+            if need_piece_content {
+                let mut reader = piece_manager
+                    .download_from_local_into_async_read(
+                        piece_id.as_str(),
+                        task_id.as_str(),
+                        metadata.length,
+                        None,
+                        true,
+                        false,
+                    )
+                    .await
+                    .inspect_err(|err| {
+                        error!("read piece {} failed: {:?}", piece_id, err);
+                        interrupt.store(true, Ordering::SeqCst);
+                    })?;
+
+                let mut content = vec![0; metadata.length as usize];
+                reader.read_exact(&mut content).await.inspect_err(|err| {
+                    error!("read piece {} failed: {:?}", piece_id, err);
+                    interrupt.store(true, Ordering::SeqCst);
+                })?;
+
+                piece.content = Some(content);
+            }
+
             // Send the download piece finished request.
             in_stream_tx
                 .send_timeout(
@@ -1043,13 +1069,12 @@ impl Task {
                     REQUEST_TIMEOUT,
                 )
                 .await
-                .map_err(|err| {
+                .inspect_err(|err| {
                     error!(
                         "send DownloadPieceFinishedRequest for piece {} failed: {:?}",
                         piece_id, err
                     );
                     interrupt.store(true, Ordering::SeqCst);
-                    err
                 })?;

             // Send the download progress.
@@ -1070,13 +1095,12 @@ impl Task {
                     REQUEST_TIMEOUT,
                 )
                 .await
-                .map_err(|err| {
+                .inspect_err(|err| {
                     error!(
                         "send DownloadPieceFinishedResponse for piece {} failed: {:?}",
                         piece_id, err
                     );
                     interrupt.store(true, Ordering::SeqCst);
-                    err
                 })?;

             info!(
@@ -1106,6 +1130,7 @@ impl Task {
                     interrupt.clone(),
                     finished_pieces.clone(),
                     is_prefetch,
+                    need_piece_content,
                 )
                 .in_current_span(),
             );
@@ -1218,6 +1243,7 @@ impl Task {
             length: u64,
             request_header: HeaderMap,
             is_prefetch: bool,
+            need_piece_content: bool,
             piece_manager: Arc<piece::Piece>,
             storage: Arc<Storage>,
             semaphore: Arc<Semaphore>,
@@ -1248,7 +1274,7 @@ impl Task {
                 .await?;

             // Construct the piece.
-            let piece = Piece {
+            let mut piece = Piece {
                 number: metadata.number,
                 parent_id: metadata.parent_id.clone(),
                 offset: metadata.offset,
@@ -1260,6 +1286,30 @@ impl Task {
                 created_at: Some(prost_wkt_types::Timestamp::from(metadata.created_at)),
             };

+            // If need_piece_content is true, read the piece content from the local.
+            if need_piece_content {
+                let mut reader = piece_manager
+                    .download_from_local_into_async_read(
+                        piece_id.as_str(),
+                        task_id.as_str(),
+                        metadata.length,
+                        None,
+                        true,
+                        false,
+                    )
+                    .await
+                    .inspect_err(|err| {
+                        error!("read piece {} failed: {:?}", piece_id, err);
+                    })?;
+
+                let mut content = vec![0; metadata.length as usize];
+                reader.read_exact(&mut content).await.inspect_err(|err| {
+                    error!("read piece {} failed: {:?}", piece_id, err);
+                })?;
+
+                piece.content = Some(content);
+            }
+
             // Send the download piece finished request.
             in_stream_tx
                 .send_timeout(
@@ -1277,9 +1327,8 @@ impl Task {
                     },
                     REQUEST_TIMEOUT,
                 )
-                .await.map_err(|err| {
+                .await.inspect_err(|err| {
                     error!("send DownloadPieceBackToSourceFinishedRequest for piece {} failed: {:?}", piece_id, err);
-                    err
                 })?;

             // Send the download progress.
@@ -1300,12 +1349,11 @@ impl Task {
                     REQUEST_TIMEOUT,
                 )
                 .await
-                .map_err(|err| {
+                .inspect_err(|err| {
                     error!(
                         "send DownloadPieceFinishedResponse for piece {} failed: {:?}",
                         piece_id, err
                     );
-                    err
                 })?;

             info!("finished piece {} from source", piece_id);
@@ -1323,6 +1371,7 @@ impl Task {
                 interested_piece.length,
                 request_header.clone(),
                 request.is_prefetch,
+                request.need_piece_content,
                 self.piece.clone(),
                 self.storage.clone(),
                 semaphore.clone(),
@@ -1444,6 +1493,7 @@ impl Task {
         task: &metadata::Task,
         host_id: &str,
         peer_id: &str,
+        need_piece_content: bool,
         interested_pieces: Vec<metadata::Piece>,
         download_progress_tx: Sender<Result<DownloadTaskResponse, Status>>,
     ) -> ClientResult<Vec<metadata::Piece>> {
@@ -1475,7 +1525,7 @@ impl Task {
             info!("finished piece {} from local", piece_id,);

             // Construct the piece.
-            let piece = Piece {
+            let mut piece = Piece {
                 number: piece.number,
                 parent_id: None,
                 offset: piece.offset,
@@ -1487,6 +1537,31 @@ impl Task {
                 created_at: Some(prost_wkt_types::Timestamp::from(piece.created_at)),
             };

+            // If need_piece_content is true, read the piece content from the local.
+            if need_piece_content {
+                let mut reader = self
+                    .piece
+                    .download_from_local_into_async_read(
+                        piece_id.as_str(),
+                        task_id,
+                        piece.length,
+                        None,
+                        true,
+                        false,
+                    )
+                    .await
+                    .inspect_err(|err| {
+                        error!("read piece {} failed: {:?}", piece_id, err);
+                    })?;
+
+                let mut content = vec![0; piece.length as usize];
+                reader.read_exact(&mut content).await.inspect_err(|err| {
+                    error!("read piece {} failed: {:?}", piece_id, err);
+                })?;
+
+                piece.content = Some(content);
+            }
+
             // Send the download progress.
             download_progress_tx
                 .send_timeout(
@@ -1505,12 +1580,11 @@ impl Task {
                     REQUEST_TIMEOUT,
                 )
                 .await
-                .map_err(|err| {
+                .inspect_err(|err| {
                     error!(
                         "send DownloadPieceFinishedResponse for piece {} failed: {:?}",
                         piece_id, err
                     );
-                    err
                 })?;

             // Store the finished piece.
@@ -1619,12 +1693,11 @@ impl Task {
                     REQUEST_TIMEOUT,
                 )
                 .await
-                .map_err(|err| {
+                .inspect_err(|err| {
                     error!(
                         "send DownloadPieceFinishedResponse for piece {} failed: {:?}",
                         piece_id, err
                     );
-                    err
                 })?;

             info!("finished piece {} from source", piece_id);
@@ -1696,9 +1769,8 @@ impl Task {
                 task_id: task_id.to_string(),
             })
             .await
-            .map_err(|err| {
+            .inspect_err(|err| {
                 error!("stat task failed: {}", err);
-                err
             })?;

         Ok(task)
@@ -1707,9 +1779,8 @@ impl Task {
     /// Delete a task and reclaim local storage.
     #[instrument(skip_all)]
     pub async fn delete(&self, task_id: &str, host_id: &str) -> ClientResult<()> {
-        let task = self.storage.get_task(task_id).map_err(|err| {
+        let task = self.storage.get_task(task_id).inspect_err(|err| {
             error!("get task {} from local storage error: {:?}", task_id, err);
-            err
         })?;

         match task {
@@ -1722,9 +1793,8 @@ impl Task {
                         task_id: task_id.to_string(),
                     })
                     .await
-                    .map_err(|err| {
+                    .inspect_err(|err| {
                         error!("delete task {} failed from scheduler: {:?}", task_id, err);
-                        err
                     })?;

                 info!("delete task {} from local storage", task.id);