From 31e4d136d0552afe10227b26630fd8e72d724feb Mon Sep 17 00:00:00 2001 From: Gaius Date: Thu, 27 Feb 2025 00:23:03 +0800 Subject: [PATCH] feat(dragonfly-client-storage): optimize hard link handling and cache logic (#1005) - Bump all crate versions from 0.2.15 to 0.2.16 - Add cross-platform device/inode check for hard link operations - Refactor hard link logic to remove redundant helper methods - Implement proper file existence checks with same-device/inode validation Signed-off-by: Gaius --- Cargo.lock | 16 +-- Cargo.toml | 16 +-- dragonfly-client-storage/src/content.rs | 124 +++++++++++++++++------- dragonfly-client/src/proxy/header.rs | 16 ++- dragonfly-client/src/proxy/mod.rs | 59 ++++++----- 5 files changed, 150 insertions(+), 81 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 55894c0b..9ad79524 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -939,7 +939,7 @@ dependencies = [ [[package]] name = "dragonfly-client" -version = "0.2.15" +version = "0.2.16" dependencies = [ "anyhow", "blake3", @@ -1011,7 +1011,7 @@ dependencies = [ [[package]] name = "dragonfly-client-backend" -version = "0.2.15" +version = "0.2.16" dependencies = [ "dragonfly-api", "dragonfly-client-core", @@ -1042,7 +1042,7 @@ dependencies = [ [[package]] name = "dragonfly-client-config" -version = "0.2.15" +version = "0.2.16" dependencies = [ "bytesize", "bytesize-serde", @@ -1068,7 +1068,7 @@ dependencies = [ [[package]] name = "dragonfly-client-core" -version = "0.2.15" +version = "0.2.16" dependencies = [ "headers 0.4.0", "hyper 1.6.0", @@ -1087,7 +1087,7 @@ dependencies = [ [[package]] name = "dragonfly-client-init" -version = "0.2.15" +version = "0.2.16" dependencies = [ "anyhow", "clap", @@ -1105,7 +1105,7 @@ dependencies = [ [[package]] name = "dragonfly-client-storage" -version = "0.2.15" +version = "0.2.16" dependencies = [ "base16ct", "bincode", @@ -1131,7 +1131,7 @@ dependencies = [ [[package]] name = "dragonfly-client-util" -version = "0.2.15" +version = "0.2.16" dependencies = [ "base16ct", "base64 0.22.1", @@ -1532,7 +1532,7 @@ dependencies = [ [[package]] name = "hdfs" -version = "0.2.15" +version = "0.2.16" dependencies = [ "dragonfly-client-backend", "dragonfly-client-core", diff --git a/Cargo.toml b/Cargo.toml index 07c3c8dc..3aef3746 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ members = [ ] [workspace.package] -version = "0.2.15" +version = "0.2.16" authors = ["The Dragonfly Developers"] homepage = "https://d7y.io/" repository = "https://github.com/dragonflyoss/client.git" @@ -22,13 +22,13 @@ readme = "README.md" edition = "2021" [workspace.dependencies] -dragonfly-client = { path = "dragonfly-client", version = "0.2.15" } -dragonfly-client-core = { path = "dragonfly-client-core", version = "0.2.15" } -dragonfly-client-config = { path = "dragonfly-client-config", version = "0.2.15" } -dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.2.15" } -dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.15" } -dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.15" } -dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.15" } +dragonfly-client = { path = "dragonfly-client", version = "0.2.16" } +dragonfly-client-core = { path = "dragonfly-client-core", version = "0.2.16" } +dragonfly-client-config = { path = "dragonfly-client-config", version = "0.2.16" } +dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.2.16" } +dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.16" } +dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.16" } +dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.16" } thiserror = "1.0" dragonfly-api = "=2.1.30" reqwest = { version = "0.12.4", features = [ diff --git a/dragonfly-client-storage/src/content.rs b/dragonfly-client-storage/src/content.rs index 91d24163..04f98182 100644 --- a/dragonfly-client-storage/src/content.rs +++ b/dragonfly-client-storage/src/content.rs @@ -17,7 +17,7 @@ use crc::*; use dragonfly_api::common::v2::Range; use dragonfly_client_config::dfdaemon::Config; -use dragonfly_client_core::Result; +use dragonfly_client_core::{Error, Result}; use std::cmp::{max, min}; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -111,7 +111,41 @@ impl Content { Ok(true) } + /// is_same_dev_inode checks if the source and target are the same device and inode. + async fn is_same_dev_inode, Q: AsRef>( + &self, + source: P, + target: Q, + ) -> Result { + let source_metadata = fs::metadata(source).await?; + let target_metadata = fs::metadata(target).await?; + + #[cfg(unix)] + { + use std::os::unix::fs::MetadataExt; + Ok(source_metadata.dev() == target_metadata.dev() + && source_metadata.ino() == target_metadata.ino()) + } + + #[cfg(not(unix))] + { + Err(Error::IO(io::Error::new( + io::ErrorKind::Unsupported, + "platform not supported", + ))) + } + } + /// hard_link_or_copy_task hard links or copies the task content to the destination. + /// + /// 1. Destination exists: + /// 1.1. If the source and destination are the same device and inode, return directly. + /// 1.2. If the source and destination are not the same device and inode, return an error. + /// Because the destination already exists, it is not allowed to overwrite the + /// destination. + /// 2. Destination does not exist: + /// 2.1. Hard link the task content to the destination. + /// 2.2. If the hard link fails, copy the task content to the destination. #[instrument(skip_all)] pub async fn hard_link_or_copy_task( &self, @@ -120,12 +154,25 @@ impl Content { ) -> Result<()> { let task_path = self.get_task_path(task.id.as_str()); - // If the hard link fails, copy the task content to the destination. - fs::remove_file(to).await.unwrap_or_else(|err| { - info!("remove {:?} failed: {}", to, err); - }); + // If the destination exists, check if the source and destination are the same device and + // inode. If they are the same, return directly. If not, return an error. + if to.exists() { + return match self.is_same_dev_inode(&task_path, to).await { + Ok(true) => { + info!("hard already exists, no need to operate"); + Ok(()) + } + Ok(false) => Err(Error::IO(io::Error::new( + io::ErrorKind::AlreadyExists, + format!("{:?} already exists", to), + ))), + Err(err) => Err(err), + }; + } - if let Err(err) = self.hard_link_task(task.id.as_str(), to).await { + // If the destination does not exist, hard link the task content to the destination. + // If the hard link fails, copy the task content to the destination. + if let Err(err) = fs::hard_link(task_path.clone(), to).await { warn!("hard link {:?} to {:?} failed: {}", task_path, to, err); // If the task is empty, no need to copy. Need to open the file to @@ -153,13 +200,6 @@ impl Content { Ok(()) } - /// hard_link_task hard links the task content. - #[instrument(skip_all)] - async fn hard_link_task(&self, task_id: &str, link: &Path) -> Result<()> { - fs::hard_link(self.get_task_path(task_id), link).await?; - Ok(()) - } - /// copy_task copies the task content to the destination. #[instrument(skip_all)] async fn copy_task(&self, task_id: &str, to: &Path) -> Result<()> { @@ -365,33 +405,50 @@ impl Content { } /// hard_link_or_copy_persistent_cache_task hard links or copies the task content to the destination. + /// + /// 1. Destination exists: + /// 1.1. If the source and destination are the same device and inode, return directly. + /// 1.2. If the source and destination are not the same device and inode, return an error. + /// Because the destination already exists, it is not allowed to overwrite the + /// destination. + /// 2. Destination does not exist: + /// 2.1. Hard link the task content to the destination. + /// 2.2. If the hard link fails, copy the task content to the destination. #[instrument(skip_all)] pub async fn hard_link_or_copy_persistent_cache_task( &self, task: &crate::metadata::PersistentCacheTask, to: &Path, ) -> Result<()> { - // Ensure the parent directory of the destination exists. - if let Some(parent) = to.parent() { - if !parent.exists() { - fs::create_dir_all(parent).await.inspect_err(|err| { - error!("failed to create directory {:?}: {}", parent, err); - })?; - } - } - // Get the persistent cache task path. let task_path = self.get_persistent_cache_task_path(task.id.as_str()); - // If the hard link fails, copy the task content to the destination. - fs::remove_file(to).await.unwrap_or_else(|err| { - info!("remove {:?} failed: {}", to, err); - }); + // If the destination exists, check if the source and destination are the same device and + // inode. If they are the same, return directly. If not, return an error. + if to.exists() { + return match self.is_same_dev_inode(&task_path, to).await { + Ok(true) => { + info!("hard already exists, no need to operate"); + Ok(()) + } + Ok(false) => Err(Error::IO(io::Error::new( + io::ErrorKind::AlreadyExists, + format!("{:?} already exists", to), + ))), + Err(err) => { + error!( + "check if {:?} and {:?} are the same device and inode failed: {}", + task_path, to, err + ); - if let Err(err) = self - .hard_link_persistent_cache_task(task.id.as_str(), to) - .await - { + Err(err) + } + }; + } + + // If the destination does not exist, hard link the task content to the destination. + // If the hard link fails, copy the task content to the destination. + if let Err(err) = fs::hard_link(task_path.clone(), to).await { warn!("hard link {:?} to {:?} failed: {}", task_path, to, err); // If the persistent cache task is empty, no need to copy. Need to open the file to @@ -547,13 +604,6 @@ impl Content { }) } - /// hard_link_persistent_cache_task hard links the persistent cache task content. - #[instrument(skip_all)] - async fn hard_link_persistent_cache_task(&self, task_id: &str, link: &Path) -> Result<()> { - fs::hard_link(self.get_persistent_cache_task_path(task_id), link).await?; - Ok(()) - } - /// copy_persistent_cache_task copies the persistent cache task content to the destination. #[instrument(skip_all)] async fn copy_persistent_cache_task(&self, task_id: &str, to: &Path) -> Result<()> { diff --git a/dragonfly-client/src/proxy/header.rs b/dragonfly-client/src/proxy/header.rs index 491a00a1..63e1e2d6 100644 --- a/dragonfly-client/src/proxy/header.rs +++ b/dragonfly-client/src/proxy/header.rs @@ -52,10 +52,18 @@ pub const DRAGONFLY_USE_P2P_HEADER: &str = "X-Dragonfly-Use-P2P"; /// If the value is "false", the range request will fetch the range content. pub const DRAGONFLY_PREFETCH_HEADER: &str = "X-Dragonfly-Prefetch"; -/// DRAGONFLY_OUTPUT_PATH_HEADER is the header key of output path in http request. -/// If the value is set, the downloaded file will be saved to the specified path, -/// and response the file by the body. If the specified path is exist, the file will be -/// removed and saved to the specified path. +/// DRAGONFLY_OUTPUT_PATH_HEADER is the header key of absolute output path in http request. +/// +/// When this header is present in a request, following rules apply: +/// - If the path exists: +/// - If it's the same file as the cache (same dev/inode): Request succeeds +/// - If it's a different file: Request fails with error message +/// +/// - If the path doesn't exist: +/// - A new file will be created at the specified location +/// +/// Note: When X-Dragonfly-Output-Path is specified, the client expects to create the +/// file itself, and returning cached content would prevent proper file creation. pub const DRAGONFLY_OUTPUT_PATH_HEADER: &str = "X-Dragonfly-Output-Path"; /// DRAGONFLY_PIECE_LENGTH is the header key of piece length in http request. diff --git a/dragonfly-client/src/proxy/mod.rs b/dragonfly-client/src/proxy/mod.rs index f47a145d..2facd68e 100644 --- a/dragonfly-client/src/proxy/mod.rs +++ b/dragonfly-client/src/proxy/mod.rs @@ -655,32 +655,42 @@ async fn proxy_via_dfdaemon( } }; - // Get the content from the cache by the request. - match cache.get_by_request(&download_task_request).await { - Ok(None) => { - debug!("cache miss"); - } - Ok(Some(content)) => { - info!("cache hit"); + // Skip cache lookup if output_path is set in the download task request. + // Rationale: When output_path is specified, the client expects to create the file itself, + // and returning cached content would prevent proper file creation. + let has_output_path = download_task_request + .download + .as_ref() + .map_or(false, |d| d.output_path.is_some()); - // Collect the download piece traffic metrics and the proxy request via dfdaemon and - // cache hits metrics. - collect_proxy_request_via_dfdaemon_and_cache_hits_metrics(); - collect_download_piece_traffic_metrics( - &TrafficType::LocalPeer, - TaskType::Standard as i32, - content.len() as u64, - ); + if !has_output_path { + // Get the content from the cache by the request. + match cache.get_by_request(&download_task_request).await { + Ok(None) => { + debug!("cache miss"); + } + Ok(Some(content)) => { + info!("cache hit"); - let body_boxed = Full::new(content).map_err(ClientError::from).boxed(); - return Ok(Response::new(body_boxed)); - } - Err(err) => { - error!("get content from cache failed: {}", err); - return Ok(make_error_response( - http::StatusCode::INTERNAL_SERVER_ERROR, - None, - )); + // Collect the download piece traffic metrics and the proxy request via dfdaemon and + // cache hits metrics. + collect_proxy_request_via_dfdaemon_and_cache_hits_metrics(); + collect_download_piece_traffic_metrics( + &TrafficType::LocalPeer, + TaskType::Standard as i32, + content.len() as u64, + ); + + let body_boxed = Full::new(content).map_err(ClientError::from).boxed(); + return Ok(Response::new(body_boxed)); + } + Err(err) => { + error!("get content from cache failed: {}", err); + return Ok(make_error_response( + http::StatusCode::INTERNAL_SERVER_ERROR, + None, + )); + } } } @@ -1119,6 +1129,7 @@ fn make_download_task_request( ), request_header: headermap_to_hashmap(&header), piece_length, + // Need the absolute path. output_path: header::get_output_path(&header), timeout: None, need_back_to_source: false,