feat(dragonfly-client-storage): optimize hard link handling and cache logic (#1005)

- Bump all crate versions from 0.2.15 to 0.2.16
- Add cross-platform device/inode check for hard link operations
- Refactor hard link logic to remove redundant helper methods
- Implement proper file existence checks with same-device/inode validation

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2025-02-27 00:23:03 +08:00 committed by GitHub
parent 300bb7b786
commit 31e4d136d0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 150 additions and 81 deletions

16
Cargo.lock generated
View File

@ -939,7 +939,7 @@ dependencies = [
[[package]]
name = "dragonfly-client"
version = "0.2.15"
version = "0.2.16"
dependencies = [
"anyhow",
"blake3",
@ -1011,7 +1011,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-backend"
version = "0.2.15"
version = "0.2.16"
dependencies = [
"dragonfly-api",
"dragonfly-client-core",
@ -1042,7 +1042,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-config"
version = "0.2.15"
version = "0.2.16"
dependencies = [
"bytesize",
"bytesize-serde",
@ -1068,7 +1068,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-core"
version = "0.2.15"
version = "0.2.16"
dependencies = [
"headers 0.4.0",
"hyper 1.6.0",
@ -1087,7 +1087,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-init"
version = "0.2.15"
version = "0.2.16"
dependencies = [
"anyhow",
"clap",
@ -1105,7 +1105,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-storage"
version = "0.2.15"
version = "0.2.16"
dependencies = [
"base16ct",
"bincode",
@ -1131,7 +1131,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-util"
version = "0.2.15"
version = "0.2.16"
dependencies = [
"base16ct",
"base64 0.22.1",
@ -1532,7 +1532,7 @@ dependencies = [
[[package]]
name = "hdfs"
version = "0.2.15"
version = "0.2.16"
dependencies = [
"dragonfly-client-backend",
"dragonfly-client-core",

View File

@ -12,7 +12,7 @@ members = [
]
[workspace.package]
version = "0.2.15"
version = "0.2.16"
authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git"
@ -22,13 +22,13 @@ readme = "README.md"
edition = "2021"
[workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "0.2.15" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.2.15" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.2.15" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.2.15" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.15" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.15" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.15" }
dragonfly-client = { path = "dragonfly-client", version = "0.2.16" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.2.16" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.2.16" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.2.16" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.16" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.16" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.16" }
thiserror = "1.0"
dragonfly-api = "=2.1.30"
reqwest = { version = "0.12.4", features = [

View File

@ -17,7 +17,7 @@
use crc::*;
use dragonfly_api::common::v2::Range;
use dragonfly_client_config::dfdaemon::Config;
use dragonfly_client_core::Result;
use dragonfly_client_core::{Error, Result};
use std::cmp::{max, min};
use std::path::{Path, PathBuf};
use std::sync::Arc;
@ -111,7 +111,41 @@ impl Content {
Ok(true)
}
/// is_same_dev_inode reports whether the source and target paths refer to the same file,
/// i.e. whether their metadata carries an identical device id and inode number.
///
/// Metadata for both paths is loaded first, so a failed lookup on either path is returned
/// as an error on every platform. On non-unix platforms the comparison itself is not
/// implemented and an `Unsupported` IO error is returned instead of a boolean.
async fn is_same_dev_inode<P: AsRef<Path>, Q: AsRef<Path>>(
    &self,
    source: P,
    target: Q,
) -> Result<bool> {
    let src_meta = fs::metadata(source).await?;
    let dst_meta = fs::metadata(target).await?;

    #[cfg(unix)]
    {
        use std::os::unix::fs::MetadataExt;

        // Treat the paths as the same file only when both the device and the inode agree.
        let same_device = src_meta.dev() == dst_meta.dev();
        let same_inode = src_meta.ino() == dst_meta.ino();
        Ok(same_device && same_inode)
    }

    #[cfg(not(unix))]
    {
        let unsupported = io::Error::new(io::ErrorKind::Unsupported, "platform not supported");
        Err(Error::IO(unsupported))
    }
}
/// hard_link_or_copy_task hard links or copies the task content to the destination.
///
/// 1. Destination exists:
/// 1.1. If the source and destination are the same device and inode, return directly.
/// 1.2. If the source and destination are not the same device and inode, return an error.
/// Because the destination already exists, it is not allowed to overwrite the
/// destination.
/// 2. Destination does not exist:
/// 2.1. Hard link the task content to the destination.
/// 2.2. If the hard link fails, copy the task content to the destination.
#[instrument(skip_all)]
pub async fn hard_link_or_copy_task(
&self,
@ -120,12 +154,25 @@ impl Content {
) -> Result<()> {
let task_path = self.get_task_path(task.id.as_str());
// If the hard link fails, copy the task content to the destination.
fs::remove_file(to).await.unwrap_or_else(|err| {
info!("remove {:?} failed: {}", to, err);
});
// If the destination exists, check if the source and destination are the same device and
// inode. If they are the same, return directly. If not, return an error.
if to.exists() {
return match self.is_same_dev_inode(&task_path, to).await {
Ok(true) => {
info!("hard already exists, no need to operate");
Ok(())
}
Ok(false) => Err(Error::IO(io::Error::new(
io::ErrorKind::AlreadyExists,
format!("{:?} already exists", to),
))),
Err(err) => Err(err),
};
}
if let Err(err) = self.hard_link_task(task.id.as_str(), to).await {
// If the destination does not exist, hard link the task content to the destination.
// If the hard link fails, copy the task content to the destination.
if let Err(err) = fs::hard_link(task_path.clone(), to).await {
warn!("hard link {:?} to {:?} failed: {}", task_path, to, err);
// If the task is empty, no need to copy. Need to open the file to
@ -153,13 +200,6 @@ impl Content {
Ok(())
}
/// hard_link_task creates a hard link at `link` that points to the stored content of the
/// task identified by `task_id`. Any error from the link call is propagated to the caller.
#[instrument(skip_all)]
async fn hard_link_task(&self, task_id: &str, link: &Path) -> Result<()> {
    // Resolve the on-disk location of the task content before linking.
    let task_path = self.get_task_path(task_id);
    fs::hard_link(task_path, link).await?;
    Ok(())
}
/// copy_task copies the task content to the destination.
#[instrument(skip_all)]
async fn copy_task(&self, task_id: &str, to: &Path) -> Result<()> {
@ -365,33 +405,50 @@ impl Content {
}
/// hard_link_or_copy_persistent_cache_task hard links or copies the task content to the destination.
///
/// 1. Destination exists:
/// 1.1. If the source and destination are the same device and inode, return directly.
/// 1.2. If the source and destination are not the same device and inode, return an error.
/// Because the destination already exists, it is not allowed to overwrite the
/// destination.
/// 2. Destination does not exist:
/// 2.1. Hard link the task content to the destination.
/// 2.2. If the hard link fails, copy the task content to the destination.
#[instrument(skip_all)]
pub async fn hard_link_or_copy_persistent_cache_task(
&self,
task: &crate::metadata::PersistentCacheTask,
to: &Path,
) -> Result<()> {
// Ensure the parent directory of the destination exists.
if let Some(parent) = to.parent() {
if !parent.exists() {
fs::create_dir_all(parent).await.inspect_err(|err| {
error!("failed to create directory {:?}: {}", parent, err);
})?;
}
}
// Get the persistent cache task path.
let task_path = self.get_persistent_cache_task_path(task.id.as_str());
// If the hard link fails, copy the task content to the destination.
fs::remove_file(to).await.unwrap_or_else(|err| {
info!("remove {:?} failed: {}", to, err);
});
// If the destination exists, check if the source and destination are the same device and
// inode. If they are the same, return directly. If not, return an error.
if to.exists() {
return match self.is_same_dev_inode(&task_path, to).await {
Ok(true) => {
info!("hard already exists, no need to operate");
Ok(())
}
Ok(false) => Err(Error::IO(io::Error::new(
io::ErrorKind::AlreadyExists,
format!("{:?} already exists", to),
))),
Err(err) => {
error!(
"check if {:?} and {:?} are the same device and inode failed: {}",
task_path, to, err
);
if let Err(err) = self
.hard_link_persistent_cache_task(task.id.as_str(), to)
.await
{
Err(err)
}
};
}
// If the destination does not exist, hard link the task content to the destination.
// If the hard link fails, copy the task content to the destination.
if let Err(err) = fs::hard_link(task_path.clone(), to).await {
warn!("hard link {:?} to {:?} failed: {}", task_path, to, err);
// If the persistent cache task is empty, no need to copy. Need to open the file to
@ -547,13 +604,6 @@ impl Content {
})
}
/// hard_link_persistent_cache_task creates a hard link at `link` that points to the stored
/// content of the persistent cache task identified by `task_id`. Any error from the link
/// call is propagated to the caller.
#[instrument(skip_all)]
async fn hard_link_persistent_cache_task(&self, task_id: &str, link: &Path) -> Result<()> {
    // Resolve the on-disk location of the persistent cache task content before linking.
    let task_path = self.get_persistent_cache_task_path(task_id);
    fs::hard_link(task_path, link).await?;
    Ok(())
}
/// copy_persistent_cache_task copies the persistent cache task content to the destination.
#[instrument(skip_all)]
async fn copy_persistent_cache_task(&self, task_id: &str, to: &Path) -> Result<()> {

View File

@ -52,10 +52,18 @@ pub const DRAGONFLY_USE_P2P_HEADER: &str = "X-Dragonfly-Use-P2P";
/// If the value is "false", the range request will fetch the range content.
pub const DRAGONFLY_PREFETCH_HEADER: &str = "X-Dragonfly-Prefetch";
/// DRAGONFLY_OUTPUT_PATH_HEADER is the header key of output path in http request.
/// If the value is set, the downloaded file will be saved to the specified path,
/// and the file will be returned in the response body. If the specified path already exists,
/// the existing file will be removed and the download saved to the specified path.
/// DRAGONFLY_OUTPUT_PATH_HEADER is the header key of absolute output path in http request.
///
/// When this header is present in a request, the following rules apply:
/// - If the path exists:
/// - If it's the same file as the cache (same dev/inode): Request succeeds
/// - If it's a different file: Request fails with error message
///
/// - If the path doesn't exist:
/// - A new file will be created at the specified location
///
/// Note: When X-Dragonfly-Output-Path is specified, the client expects to create the
/// file itself, and returning cached content would prevent proper file creation.
pub const DRAGONFLY_OUTPUT_PATH_HEADER: &str = "X-Dragonfly-Output-Path";
/// DRAGONFLY_PIECE_LENGTH is the header key of piece length in http request.

View File

@ -655,32 +655,42 @@ async fn proxy_via_dfdaemon(
}
};
// Get the content from the cache by the request.
match cache.get_by_request(&download_task_request).await {
Ok(None) => {
debug!("cache miss");
}
Ok(Some(content)) => {
info!("cache hit");
// Skip cache lookup if output_path is set in the download task request.
// Rationale: When output_path is specified, the client expects to create the file itself,
// and returning cached content would prevent proper file creation.
let has_output_path = download_task_request
.download
.as_ref()
.map_or(false, |d| d.output_path.is_some());
// Collect the download piece traffic metrics and the proxy request via dfdaemon and
// cache hits metrics.
collect_proxy_request_via_dfdaemon_and_cache_hits_metrics();
collect_download_piece_traffic_metrics(
&TrafficType::LocalPeer,
TaskType::Standard as i32,
content.len() as u64,
);
if !has_output_path {
// Get the content from the cache by the request.
match cache.get_by_request(&download_task_request).await {
Ok(None) => {
debug!("cache miss");
}
Ok(Some(content)) => {
info!("cache hit");
let body_boxed = Full::new(content).map_err(ClientError::from).boxed();
return Ok(Response::new(body_boxed));
}
Err(err) => {
error!("get content from cache failed: {}", err);
return Ok(make_error_response(
http::StatusCode::INTERNAL_SERVER_ERROR,
None,
));
// Collect the download piece traffic metrics and the proxy request via dfdaemon and
// cache hits metrics.
collect_proxy_request_via_dfdaemon_and_cache_hits_metrics();
collect_download_piece_traffic_metrics(
&TrafficType::LocalPeer,
TaskType::Standard as i32,
content.len() as u64,
);
let body_boxed = Full::new(content).map_err(ClientError::from).boxed();
return Ok(Response::new(body_boxed));
}
Err(err) => {
error!("get content from cache failed: {}", err);
return Ok(make_error_response(
http::StatusCode::INTERNAL_SERVER_ERROR,
None,
));
}
}
}
@ -1119,6 +1129,7 @@ fn make_download_task_request(
),
request_header: headermap_to_hashmap(&header),
piece_length,
// Need the absolute path.
output_path: header::get_output_path(&header),
timeout: None,
need_back_to_source: false,