diff --git a/Cargo.lock b/Cargo.lock
index 6c49ec54..c8968956 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1024,7 +1024,7 @@ dependencies = [
 
 [[package]]
 name = "dragonfly-client"
-version = "1.0.27"
+version = "1.0.28"
 dependencies = [
  "anyhow",
  "bytes",
@@ -1100,7 +1100,7 @@ dependencies = [
 
 [[package]]
 name = "dragonfly-client-backend"
-version = "1.0.27"
+version = "1.0.28"
 dependencies = [
  "dragonfly-api",
  "dragonfly-client-core",
@@ -1131,7 +1131,7 @@ dependencies = [
 
 [[package]]
 name = "dragonfly-client-config"
-version = "1.0.27"
+version = "1.0.28"
 dependencies = [
  "bytesize",
  "bytesize-serde",
@@ -1161,7 +1161,7 @@ dependencies = [
 
 [[package]]
 name = "dragonfly-client-core"
-version = "1.0.27"
+version = "1.0.28"
 dependencies = [
  "headers 0.4.1",
  "hyper 1.6.0",
@@ -1181,7 +1181,7 @@ dependencies = [
 
 [[package]]
 name = "dragonfly-client-init"
-version = "1.0.27"
+version = "1.0.28"
 dependencies = [
  "anyhow",
  "clap",
@@ -1198,7 +1198,7 @@ dependencies = [
 
 [[package]]
 name = "dragonfly-client-metric"
-version = "1.0.27"
+version = "1.0.28"
 dependencies = [
  "dragonfly-api",
  "dragonfly-client-config",
@@ -1213,7 +1213,7 @@ dependencies = [
 
 [[package]]
 name = "dragonfly-client-storage"
-version = "1.0.27"
+version = "1.0.28"
 dependencies = [
  "bincode",
  "bytes",
@@ -1250,7 +1250,7 @@ dependencies = [
 
 [[package]]
 name = "dragonfly-client-util"
-version = "1.0.27"
+version = "1.0.28"
 dependencies = [
  "base64 0.22.1",
  "bytes",
@@ -1695,7 +1695,7 @@ dependencies = [
 
 [[package]]
 name = "hdfs"
-version = "1.0.27"
+version = "1.0.28"
 dependencies = [
  "dragonfly-client-backend",
  "dragonfly-client-core",
diff --git a/Cargo.toml b/Cargo.toml
index ca3f7a63..f6a290b3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,7 +13,7 @@ members = [
 ]
 
 [workspace.package]
-version = "1.0.27"
+version = "1.0.28"
 authors = ["The Dragonfly Developers"]
 homepage = "https://d7y.io/"
 repository = "https://github.com/dragonflyoss/client.git"
@@ -23,14 +23,14 @@ readme = "README.md"
 edition = "2021"
 
 [workspace.dependencies]
-dragonfly-client = { path = "dragonfly-client", version = "1.0.27" }
-dragonfly-client-core = { path = "dragonfly-client-core", version = "1.0.27" }
-dragonfly-client-config = { path = "dragonfly-client-config", version = "1.0.27" }
-dragonfly-client-storage = { path = "dragonfly-client-storage", version = "1.0.27" }
-dragonfly-client-backend = { path = "dragonfly-client-backend", version = "1.0.27" }
-dragonfly-client-metric = { path = "dragonfly-client-metric", version = "1.0.27" }
-dragonfly-client-util = { path = "dragonfly-client-util", version = "1.0.27" }
-dragonfly-client-init = { path = "dragonfly-client-init", version = "1.0.27" }
+dragonfly-client = { path = "dragonfly-client", version = "1.0.28" }
+dragonfly-client-core = { path = "dragonfly-client-core", version = "1.0.28" }
+dragonfly-client-config = { path = "dragonfly-client-config", version = "1.0.28" }
+dragonfly-client-storage = { path = "dragonfly-client-storage", version = "1.0.28" }
+dragonfly-client-backend = { path = "dragonfly-client-backend", version = "1.0.28" }
+dragonfly-client-metric = { path = "dragonfly-client-metric", version = "1.0.28" }
+dragonfly-client-util = { path = "dragonfly-client-util", version = "1.0.28" }
+dragonfly-client-init = { path = "dragonfly-client-init", version = "1.0.28" }
 dragonfly-api = "=2.1.70"
 thiserror = "2.0"
 futures = "0.3.31"
diff --git a/dragonfly-client-storage/Cargo.toml b/dragonfly-client-storage/Cargo.toml
index d0e78bed..5b54b860 100644
--- a/dragonfly-client-storage/Cargo.toml
+++ b/dragonfly-client-storage/Cargo.toml
@@ -38,7 +38,6 @@ walkdir = "2.5.0"
 quinn = "0.11.9"
 socket2 = "0.6.0"
 mockall = "0.13.1"
-nix = { version = "0.30.1", features = ["socket", "net"] }
 
 [dev-dependencies]
 tempfile.workspace = true
diff --git a/dragonfly-client-storage/src/client/tcp.rs b/dragonfly-client-storage/src/client/tcp.rs
index 193c12ef..a840cf04 100644
--- a/dragonfly-client-storage/src/client/tcp.rs
+++ b/dragonfly-client-storage/src/client/tcp.rs
@@ -65,18 +65,6 @@ impl Client for TCPClient {
         socket.set_tcp_keepalive(
             &TcpKeepalive::new().with_interval(super::DEFAULT_KEEPALIVE_INTERVAL),
         )?;
-        #[cfg(target_os = "linux")]
-        {
-            use nix::sys::socket::{setsockopt, sockopt::TcpFastOpenConnect};
-            use std::os::fd::AsFd;
-            use tracing::{info, warn};
-
-            if let Err(err) = setsockopt(&socket.as_fd(), TcpFastOpenConnect, &true) {
-                warn!("failed to set tcp fast open: {}", err);
-            } else {
-                info!("set tcp fast open to true");
-            }
-        }
 
         let (reader, mut writer) = stream.into_split();
         writer.write_all(&request).await.inspect_err(|err| {
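With the TCP Fast Open branch removed, the connect path above keeps only the socket2 keepalive tuning. A minimal standalone sketch of that setup, assuming the socket2 crate; the 15-second interval is an assumed stand-in for the module's DEFAULT_KEEPALIVE_INTERVAL constant:

```rust
use std::time::Duration;

use socket2::{Domain, Protocol, Socket, TcpKeepalive, Type};

fn new_keepalive_socket() -> std::io::Result<Socket> {
    // Same shape as the surviving tcp.rs code: create the socket, then
    // enable keepalive probes at a fixed interval before connecting.
    let socket = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP))?;
    socket.set_tcp_keepalive(&TcpKeepalive::new().with_interval(Duration::from_secs(15)))?;
    Ok(socket)
}
```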
diff --git a/dragonfly-client-storage/src/content.rs b/dragonfly-client-storage/src/content.rs
index c52eda61..68306059 100644
--- a/dragonfly-client-storage/src/content.rs
+++ b/dragonfly-client-storage/src/content.rs
@@ -14,21 +14,18 @@
  * limitations under the License.
  */
 
-use bytesize::ByteSize;
 use dragonfly_api::common::v2::Range;
 use dragonfly_client_config::dfdaemon::Config;
-use dragonfly_client_core::{Error, Result};
-use dragonfly_client_util::fs::fallocate;
+use dragonfly_client_core::Result;
 use std::cmp::{max, min};
-use std::path::{Path, PathBuf};
+use std::path::Path;
 use std::sync::Arc;
-use tokio::fs::{self, File, OpenOptions};
-use tokio::io::{
-    self, AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, BufWriter, SeekFrom,
-};
-use tokio_util::io::InspectReader;
-use tracing::{error, info, instrument, warn};
-use walkdir::WalkDir;
+
+#[cfg(target_os = "linux")]
+pub type Content = super::content_linux::Content;
+
+#[cfg(target_os = "macos")]
+pub type Content = super::content_macos::Content;
 
 /// DEFAULT_CONTENT_DIR is the default directory for store content.
 pub const DEFAULT_CONTENT_DIR: &str = "content";
@@ -39,15 +36,6 @@ pub const DEFAULT_TASK_DIR: &str = "tasks";
 /// DEFAULT_PERSISTENT_CACHE_TASK_DIR is the default directory for store persistent cache task.
 pub const DEFAULT_PERSISTENT_CACHE_TASK_DIR: &str = "persistent-cache-tasks";
 
-/// Content is the content of a piece.
-pub struct Content {
-    /// config is the configuration of the dfdaemon.
-    config: Arc<Config>,
-
-    /// dir is the directory to store content.
-    dir: PathBuf,
-}
-
 /// WritePieceResponse is the response of writing a piece.
 pub struct WritePieceResponse {
     /// length is the length of the piece.
     pub length: u64,
@@ -66,591 +54,9 @@ pub struct WritePersistentCacheTaskResponse {
     pub hash: String,
 }
 
-/// Content implements the content storage.
-impl Content {
-    /// new returns a new content.
-    pub async fn new(config: Arc<Config>, dir: &Path) -> Result<Content> {
-        let dir = dir.join(DEFAULT_CONTENT_DIR);
-
-        // If the storage is not kept, remove the directory.
-        if !config.storage.keep {
-            fs::remove_dir_all(&dir).await.unwrap_or_else(|err| {
-                warn!("remove {:?} failed: {}", dir, err);
-            });
-        }
-
-        fs::create_dir_all(&dir.join(DEFAULT_TASK_DIR)).await?;
-        fs::create_dir_all(&dir.join(DEFAULT_PERSISTENT_CACHE_TASK_DIR)).await?;
-        info!("content initialized directory: {:?}", dir);
-        Ok(Content { config, dir })
-    }
-
-    /// available_space returns the available space of the disk.
-    pub fn available_space(&self) -> Result<u64> {
-        let dist_threshold = self.config.gc.policy.dist_threshold;
-        if dist_threshold != ByteSize::default() {
-            let usage_space = WalkDir::new(&self.dir)
-                .into_iter()
-                .filter_map(|entry| entry.ok())
-                .filter_map(|entry| entry.metadata().ok())
-                .filter(|metadata| metadata.is_file())
-                .fold(0, |acc, m| acc + m.len());
-
-            if usage_space >= dist_threshold.as_u64() {
-                warn!(
-                    "usage space {} is greater than dist threshold {}, no need to calculate available space",
-                    usage_space, dist_threshold
-                );
-
-                return Ok(0);
-            }
-
-            return Ok(dist_threshold.as_u64() - usage_space);
-        }
-
-        let stat = fs2::statvfs(&self.dir)?;
-        Ok(stat.available_space())
-    }
-
-    /// total_space returns the total space of the disk.
-    pub fn total_space(&self) -> Result<u64> {
-        // If the dist_threshold is set, return it directly.
-        let dist_threshold = self.config.gc.policy.dist_threshold;
-        if dist_threshold != ByteSize::default() {
-            return Ok(dist_threshold.as_u64());
-        }
-
-        let stat = fs2::statvfs(&self.dir)?;
-        Ok(stat.total_space())
-    }
-
-    /// has_enough_space checks if the storage has enough space to store the content.
-    pub fn has_enough_space(&self, content_length: u64) -> Result<bool> {
-        let available_space = self.available_space()?;
-        if available_space < content_length {
-            warn!(
-                "not enough space to store the persistent cache task: available_space={}, content_length={}",
-                available_space, content_length
-            );
-
-            return Ok(false);
-        }
-
-        Ok(true)
-    }
-
-    /// is_same_dev_inode checks if the source and target are the same device and inode.
-    async fn is_same_dev_inode<P: AsRef<Path>, Q: AsRef<Path>>(
-        &self,
-        source: P,
-        target: Q,
-    ) -> Result<bool> {
-        let source_metadata = fs::metadata(source).await?;
-        let target_metadata = fs::metadata(target).await?;
-
-        #[cfg(unix)]
-        {
-            use std::os::unix::fs::MetadataExt;
-            Ok(source_metadata.dev() == target_metadata.dev()
-                && source_metadata.ino() == target_metadata.ino())
-        }
-
-        #[cfg(not(unix))]
-        {
-            Err(Error::IO(io::Error::new(
-                io::ErrorKind::Unsupported,
-                "platform not supported",
-            )))
-        }
-    }
-
-    /// is_same_dev_inode_as_task checks if the task and target are the same device and inode.
-    pub async fn is_same_dev_inode_as_task(&self, task_id: &str, to: &Path) -> Result<bool> {
-        let task_path = self.get_task_path(task_id);
-        self.is_same_dev_inode(&task_path, to).await
-    }
-
-    /// create_task creates a new task content.
-    ///
-    /// Behavior of `create_task`:
-    /// 1. If the task already exists, return the task path.
-    /// 2. If the task does not exist, create the task directory and file.
-    #[instrument(skip_all)]
-    pub async fn create_task(&self, task_id: &str, length: u64) -> Result<PathBuf> {
-        let task_path = self.get_task_path(task_id);
-        if task_path.exists() {
-            return Ok(task_path);
-        }
-
-        let task_dir = self.dir.join(DEFAULT_TASK_DIR).join(&task_id[..3]);
-        fs::create_dir_all(&task_dir).await.inspect_err(|err| {
-            error!("create {:?} failed: {}", task_dir, err);
-        })?;
-
-        let f = fs::File::create(task_dir.join(task_id))
-            .await
-            .inspect_err(|err| {
-                error!("create {:?} failed: {}", task_dir, err);
-            })?;
-
-        fallocate(&f, length).await.inspect_err(|err| {
-            error!("fallocate {:?} failed: {}", task_dir, err);
-        })?;
-
-        Ok(task_dir.join(task_id))
-    }
-
-    /// Hard links the task content to the destination.
-    ///
-    /// Behavior of `hard_link_task`:
-    /// 1. If the destination exists:
-    ///    1.1. If the source and destination share the same device and inode, return immediately.
-    ///    1.2. Otherwise, return an error.
-    /// 2. If the destination does not exist:
-    ///    2.1. If the hard link succeeds, return immediately.
-    ///    2.2. If the hard link fails, copy the task content to the destination once the task is finished, then return immediately.
-    #[instrument(skip_all)]
-    pub async fn hard_link_task(&self, task_id: &str, to: &Path) -> Result<()> {
-        let task_path = self.get_task_path(task_id);
-        if let Err(err) = fs::hard_link(task_path.clone(), to).await {
-            if err.kind() == std::io::ErrorKind::AlreadyExists {
-                if let Ok(true) = self.is_same_dev_inode(&task_path, to).await {
-                    info!("hard already exists, no need to operate");
-                    return Ok(());
-                }
-            }
-
-            warn!("hard link {:?} to {:?} failed: {}", task_path, to, err);
-            return Err(Error::IO(err));
-        }
-
-        info!("hard link {:?} to {:?} success", task_path, to);
-        Ok(())
-    }
-
-    /// copy_task copies the task content to the destination.
-    #[instrument(skip_all)]
-    pub async fn copy_task(&self, task_id: &str, to: &Path) -> Result<()> {
-        fs::copy(self.get_task_path(task_id), to).await?;
-        info!("copy to {:?} success", to);
-        Ok(())
-    }
-
-    /// copy_task_by_range copies the task content to the destination by range.
-    #[instrument(skip_all)]
-    async fn copy_task_by_range(&self, task_id: &str, to: &Path, range: Range) -> Result<()> {
-        // Ensure the parent directory of the destination exists.
-        if let Some(parent) = to.parent() {
-            if !parent.exists() {
-                fs::create_dir_all(parent).await.inspect_err(|err| {
-                    error!("failed to create directory {:?}: {}", parent, err);
-                })?;
-            }
-        }
-
-        let mut from_f = File::open(self.get_task_path(task_id)).await?;
-        from_f.seek(SeekFrom::Start(range.start)).await?;
-        let range_reader = from_f.take(range.length);
-
-        // Use a buffer to read the range.
-        let mut range_reader =
-            BufReader::with_capacity(self.config.storage.read_buffer_size, range_reader);
-
-        let mut to_f = OpenOptions::new()
-            .create(true)
-            .truncate(false)
-            .write(true)
-            .open(to.as_os_str())
-            .await?;
-
-        io::copy(&mut range_reader, &mut to_f).await?;
-        Ok(())
-    }
-
-    /// delete_task deletes the task content.
-    pub async fn delete_task(&self, task_id: &str) -> Result<()> {
-        info!("delete task content: {}", task_id);
-        let task_path = self.get_task_path(task_id);
-        fs::remove_file(task_path.as_path())
-            .await
-            .inspect_err(|err| {
-                error!("remove {:?} failed: {}", task_path, err);
-            })?;
-        Ok(())
-    }
-
-    /// read_piece reads the piece from the content.
-    #[instrument(skip_all)]
-    pub async fn read_piece(
-        &self,
-        task_id: &str,
-        offset: u64,
-        length: u64,
-        range: Option<Range>,
-    ) -> Result<impl AsyncRead> {
-        let task_path = self.get_task_path(task_id);
-
-        // Calculate the target offset and length based on the range.
-        let (target_offset, target_length) = calculate_piece_range(offset, length, range);
-
-        let f = File::open(task_path.as_path()).await.inspect_err(|err| {
-            error!("open {:?} failed: {}", task_path, err);
-        })?;
-        let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
-
-        f_reader
-            .seek(SeekFrom::Start(target_offset))
-            .await
-            .inspect_err(|err| {
-                error!("seek {:?} failed: {}", task_path, err);
-            })?;
-
-        Ok(f_reader.take(target_length))
-    }
-
-    /// read_piece_with_dual_read return two readers, one is the range reader, and the other is the
-    /// full reader of the piece. It is used for cache the piece content to the proxy cache.
-    #[instrument(skip_all)]
-    pub async fn read_piece_with_dual_read(
-        &self,
-        task_id: &str,
-        offset: u64,
-        length: u64,
-        range: Option<Range>,
-    ) -> Result<(impl AsyncRead, impl AsyncRead)> {
-        let task_path = self.get_task_path(task_id);
-
-        // Calculate the target offset and length based on the range.
-        let (target_offset, target_length) = calculate_piece_range(offset, length, range);
-
-        let f = File::open(task_path.as_path()).await.inspect_err(|err| {
-            error!("open {:?} failed: {}", task_path, err);
-        })?;
-        let mut f_range_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
-
-        f_range_reader
-            .seek(SeekFrom::Start(target_offset))
-            .await
-            .inspect_err(|err| {
-                error!("seek {:?} failed: {}", task_path, err);
-            })?;
-        let range_reader = f_range_reader.take(target_length);
-
-        // Create full reader of the piece.
-        let f = File::open(task_path.as_path()).await.inspect_err(|err| {
-            error!("open {:?} failed: {}", task_path, err);
-        })?;
-        let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
-
-        f_reader
-            .seek(SeekFrom::Start(offset))
-            .await
-            .inspect_err(|err| {
-                error!("seek {:?} failed: {}", task_path, err);
-            })?;
-        let reader = f_reader.take(length);
-
-        Ok((range_reader, reader))
-    }
-
-    /// write_piece writes the piece to the content and calculates the hash of the piece by crc32.
-    #[instrument(skip_all)]
-    pub async fn write_piece<R: AsyncRead + Unpin + ?Sized>(
-        &self,
-        task_id: &str,
-        offset: u64,
-        expected_length: u64,
-        reader: &mut R,
-    ) -> Result<WritePieceResponse> {
-        // Open the file and seek to the offset.
-        let task_path = self.get_task_path(task_id);
-        let mut f = OpenOptions::new()
-            .truncate(false)
-            .write(true)
-            .open(task_path.as_path())
-            .await
-            .inspect_err(|err| {
-                error!("open {:?} failed: {}", task_path, err);
-            })?;
-
-        f.seek(SeekFrom::Start(offset)).await.inspect_err(|err| {
-            error!("seek {:?} failed: {}", task_path, err);
-        })?;
-
-        let reader = BufReader::with_capacity(self.config.storage.write_buffer_size, reader);
-        let mut writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, f);
-
-        // Copy the piece to the file while updating the CRC32 value.
-        let mut hasher = crc32fast::Hasher::new();
-        let mut tee = InspectReader::new(reader, |bytes| {
-            hasher.update(bytes);
-        });
-
-        let length = io::copy(&mut tee, &mut writer).await.inspect_err(|err| {
-            error!("copy {:?} failed: {}", task_path, err);
-        })?;
-
-        writer.flush().await.inspect_err(|err| {
-            error!("flush {:?} failed: {}", task_path, err);
-        })?;
-
-        if length != expected_length {
-            return Err(Error::Unknown(format!(
-                "expected length {} but got {}",
-                expected_length, length
-            )));
-        }
-
-        // Calculate the hash of the piece.
-        Ok(WritePieceResponse {
-            length,
-            hash: hasher.finalize().to_string(),
-        })
-    }
-
-    /// get_task_path returns the task path by task id.
-    fn get_task_path(&self, task_id: &str) -> PathBuf {
-        // The task needs split by the first 3 characters of task id(sha256) to
-        // avoid too many files in one directory.
-        let sub_dir = &task_id[..3];
-        self.dir.join(DEFAULT_TASK_DIR).join(sub_dir).join(task_id)
-    }
-
-    /// is_same_dev_inode_as_persistent_cache_task checks if the persistent cache task and target
-    /// are the same device and inode.
-    pub async fn is_same_dev_inode_as_persistent_cache_task(
-        &self,
-        task_id: &str,
-        to: &Path,
-    ) -> Result<bool> {
-        let task_path = self.get_persistent_cache_task_path(task_id);
-        self.is_same_dev_inode(&task_path, to).await
-    }
-
-    /// create_persistent_cache_task creates a new persistent cache task content.
-    ///
-    /// Behavior of `create_persistent_cache_task`:
-    /// 1. If the persistent cache task already exists, return the persistent cache task path.
-    /// 2. If the persistent cache task does not exist, create the persistent cache task directory and file.
-    #[instrument(skip_all)]
-    pub async fn create_persistent_cache_task(
-        &self,
-        task_id: &str,
-        length: u64,
-    ) -> Result<PathBuf> {
-        let task_path = self.get_persistent_cache_task_path(task_id);
-        if task_path.exists() {
-            return Ok(task_path);
-        }
-
-        let task_dir = self
-            .dir
-            .join(DEFAULT_PERSISTENT_CACHE_TASK_DIR)
-            .join(&task_id[..3]);
-        fs::create_dir_all(&task_dir).await.inspect_err(|err| {
-            error!("create {:?} failed: {}", task_dir, err);
-        })?;
-
-        let f = fs::File::create(task_dir.join(task_id))
-            .await
-            .inspect_err(|err| {
-                error!("create {:?} failed: {}", task_dir, err);
-            })?;
-
-        fallocate(&f, length).await.inspect_err(|err| {
-            error!("fallocate {:?} failed: {}", task_dir, err);
-        })?;
-
-        Ok(task_dir.join(task_id))
-    }
-
-    /// Hard links the persistent cache task content to the destination.
-    ///
-    /// Behavior of `hard_link_persistent_cache_task`:
-    /// 1. If the destination exists:
-    ///    1.1. If the source and destination share the same device and inode, return immediately.
-    ///    1.2. Otherwise, return an error.
-    /// 2. If the destination does not exist:
-    ///    2.1. If the hard link succeeds, return immediately.
-    ///    2.2. If the hard link fails, copy the persistent cache task content to the destination once the task is finished, then return immediately.
-    #[instrument(skip_all)]
-    pub async fn hard_link_persistent_cache_task(&self, task_id: &str, to: &Path) -> Result<()> {
-        let task_path = self.get_persistent_cache_task_path(task_id);
-        if let Err(err) = fs::hard_link(task_path.clone(), to).await {
-            if err.kind() == std::io::ErrorKind::AlreadyExists {
-                if let Ok(true) = self.is_same_dev_inode(&task_path, to).await {
-                    info!("hard already exists, no need to operate");
-                    return Ok(());
-                }
-            }
-
-            warn!("hard link {:?} to {:?} failed: {}", task_path, to, err);
-            return Err(Error::IO(err));
-        }
-
-        info!("hard link {:?} to {:?} success", task_path, to);
-        Ok(())
-    }
-
-    /// copy_persistent_cache_task copies the persistent cache task content to the destination.
-    #[instrument(skip_all)]
-    pub async fn copy_persistent_cache_task(&self, task_id: &str, to: &Path) -> Result<()> {
-        fs::copy(self.get_persistent_cache_task_path(task_id), to).await?;
-        info!("copy to {:?} success", to);
-        Ok(())
-    }
-
-    /// read_persistent_cache_piece reads the persistent cache piece from the content.
-    #[instrument(skip_all)]
-    pub async fn read_persistent_cache_piece(
-        &self,
-        task_id: &str,
-        offset: u64,
-        length: u64,
-        range: Option<Range>,
-    ) -> Result<impl AsyncRead> {
-        let task_path = self.get_persistent_cache_task_path(task_id);
-
-        // Calculate the target offset and length based on the range.
-        let (target_offset, target_length) = calculate_piece_range(offset, length, range);
-
-        let f = File::open(task_path.as_path()).await.inspect_err(|err| {
-            error!("open {:?} failed: {}", task_path, err);
-        })?;
-        let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
-
-        f_reader
-            .seek(SeekFrom::Start(target_offset))
-            .await
-            .inspect_err(|err| {
-                error!("seek {:?} failed: {}", task_path, err);
-            })?;
-
-        Ok(f_reader.take(target_length))
-    }
-
-    /// read_persistent_cache_piece_with_dual_read return two readers, one is the range reader, and the other is the
-    /// full reader of the persistent cache piece. It is used for cache the piece content to the proxy cache.
-    #[instrument(skip_all)]
-    pub async fn read_persistent_cache_piece_with_dual_read(
-        &self,
-        task_id: &str,
-        offset: u64,
-        length: u64,
-        range: Option<Range>,
-    ) -> Result<(impl AsyncRead, impl AsyncRead)> {
-        let task_path = self.get_persistent_cache_task_path(task_id);
-
-        // Calculate the target offset and length based on the range.
-        let (target_offset, target_length) = calculate_piece_range(offset, length, range);
-
-        let f = File::open(task_path.as_path()).await.inspect_err(|err| {
-            error!("open {:?} failed: {}", task_path, err);
-        })?;
-        let mut f_range_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
-
-        f_range_reader
-            .seek(SeekFrom::Start(target_offset))
-            .await
-            .inspect_err(|err| {
-                error!("seek {:?} failed: {}", task_path, err);
-            })?;
-        let range_reader = f_range_reader.take(target_length);
-
-        // Create full reader of the piece.
-        let f = File::open(task_path.as_path()).await.inspect_err(|err| {
-            error!("open {:?} failed: {}", task_path, err);
-        })?;
-        let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
-
-        f_reader
-            .seek(SeekFrom::Start(offset))
-            .await
-            .inspect_err(|err| {
-                error!("seek {:?} failed: {}", task_path, err);
-            })?;
-        let reader = f_reader.take(length);
-
-        Ok((range_reader, reader))
-    }
-
-    /// write_persistent_cache_piece writes the persistent cache piece to the content and
-    /// calculates the hash of the piece by crc32.
-    #[instrument(skip_all)]
-    pub async fn write_persistent_cache_piece<R: AsyncRead + Unpin + ?Sized>(
-        &self,
-        task_id: &str,
-        offset: u64,
-        expected_length: u64,
-        reader: &mut R,
-    ) -> Result<WritePieceResponse> {
-        // Open the file and seek to the offset.
-        let task_path = self.get_persistent_cache_task_path(task_id);
-        let mut f = OpenOptions::new()
-            .truncate(false)
-            .write(true)
-            .open(task_path.as_path())
-            .await
-            .inspect_err(|err| {
-                error!("open {:?} failed: {}", task_path, err);
-            })?;
-
-        f.seek(SeekFrom::Start(offset)).await.inspect_err(|err| {
-            error!("seek {:?} failed: {}", task_path, err);
-        })?;
-
-        let reader = BufReader::with_capacity(self.config.storage.write_buffer_size, reader);
-        let mut writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, f);
-
-        // Copy the piece to the file while updating the CRC32 value.
-        let mut hasher = crc32fast::Hasher::new();
-        let mut tee = InspectReader::new(reader, |bytes| {
-            hasher.update(bytes);
-        });
-
-        let length = io::copy(&mut tee, &mut writer).await.inspect_err(|err| {
-            error!("copy {:?} failed: {}", task_path, err);
-        })?;
-
-        writer.flush().await.inspect_err(|err| {
-            error!("flush {:?} failed: {}", task_path, err);
-        })?;
-
-        if length != expected_length {
-            return Err(Error::Unknown(format!(
-                "expected length {} but got {}",
-                expected_length, length
-            )));
-        }
-
-        // Calculate the hash of the piece.
-        Ok(WritePieceResponse {
-            length,
-            hash: hasher.finalize().to_string(),
-        })
-    }
-
-    /// delete_task deletes the persistent cache task content.
-    pub async fn delete_persistent_cache_task(&self, task_id: &str) -> Result<()> {
-        info!("delete persistent cache task content: {}", task_id);
-        let persistent_cache_task_path = self.get_persistent_cache_task_path(task_id);
-        fs::remove_file(persistent_cache_task_path.as_path())
-            .await
-            .inspect_err(|err| {
-                error!("remove {:?} failed: {}", persistent_cache_task_path, err);
-            })?;
-        Ok(())
-    }
-
-    /// get_persistent_cache_task_path returns the persistent cache task path by task id.
-    fn get_persistent_cache_task_path(&self, task_id: &str) -> PathBuf {
-        // The persistent cache task needs split by the first 3 characters of task id(sha256) to
-        // avoid too many files in one directory.
-        self.dir
-            .join(DEFAULT_PERSISTENT_CACHE_TASK_DIR)
-            .join(&task_id[..3])
-            .join(task_id)
+/// new_content creates a new Content instance for the target platform (Linux or macOS).
+pub async fn new_content(config: Arc<Config>, dir: &Path) -> Result<Content> {
+    Content::new(config, dir).await
 }
 
 /// calculate_piece_range calculates the target offset and length based on the piece range and
@@ -669,316 +75,6 @@ pub fn calculate_piece_range(offset: u64, length: u64, range: Option<Range>) ->
 #[cfg(test)]
 mod tests {
     use super::*;
-    use std::io::Cursor;
-    use tempfile::tempdir;
-
-    #[tokio::test]
-    async fn test_create_task() {
-        let temp_dir = tempdir().unwrap();
-        let config = Arc::new(Config::default());
-        let content = Content::new(config, temp_dir.path()).await.unwrap();
-
-        let task_id = "60409bd0ec44160f44c53c39b3fe1c5fdfb23faded0228c68bee83bc15a200e3";
-        let task_path = content.create_task(task_id, 0).await.unwrap();
-        assert!(task_path.exists());
-        assert_eq!(task_path, temp_dir.path().join("content/tasks/604/60409bd0ec44160f44c53c39b3fe1c5fdfb23faded0228c68bee83bc15a200e3"));
-
-        let task_path_exists = content.create_task(task_id, 0).await.unwrap();
-        assert_eq!(task_path, task_path_exists);
-    }
-
-    #[tokio::test]
-    async fn test_hard_link_task() {
-        let temp_dir = tempdir().unwrap();
-        let config = Arc::new(Config::default());
-        let content = Content::new(config, temp_dir.path()).await.unwrap();
-
-        let task_id = "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4";
-        content.create_task(task_id, 0).await.unwrap();
-
-        let to = temp_dir
-            .path()
-            .join("c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4");
-        content.hard_link_task(task_id, &to).await.unwrap();
-        assert!(to.exists());
-
-        content.hard_link_task(task_id, &to).await.unwrap();
-    }
-
-    #[tokio::test]
-    async fn test_copy_task() {
-        let temp_dir = tempdir().unwrap();
-        let config = Arc::new(Config::default());
-        let content = Content::new(config, temp_dir.path()).await.unwrap();
-
-        let task_id = "bfd3c02fb31a7373e25b405fd5fd3082987ccfbaf210889153af9e65bbf13002";
-        content.create_task(task_id, 64).await.unwrap();
-
-        let to = temp_dir
-            .path()
-            .join("bfd3c02fb31a7373e25b405fd5fd3082987ccfbaf210889153af9e65bbf13002");
-        content.copy_task(task_id, &to).await.unwrap();
-        assert!(to.exists());
-    }
-
-    #[tokio::test]
-    async fn test_delete_task() {
-        let temp_dir = tempdir().unwrap();
-        let config = Arc::new(Config::default());
-        let content = Content::new(config, temp_dir.path()).await.unwrap();
-
-        let task_id = "4e19f03b0fceb38f23ff4f657681472a53ef335db3660ae5494912570b7a2bb7";
-        let task_path = content.create_task(task_id, 0).await.unwrap();
-        assert!(task_path.exists());
-
-        content.delete_task(task_id).await.unwrap();
-        assert!(!task_path.exists());
-    }
-
-    #[tokio::test]
-    async fn test_read_piece() {
-        let temp_dir = tempdir().unwrap();
-        let config = Arc::new(Config::default());
-        let content = Content::new(config, temp_dir.path()).await.unwrap();
-
-        let task_id = "c794a3bbae81e06d1c8d362509bdd42a7c105b0fb28d80ffe27f94b8f04fc845";
-        content.create_task(task_id, 13).await.unwrap();
-
-        let data = b"hello, world!";
-        let mut reader = Cursor::new(data);
-        content
-            .write_piece(task_id, 0, 13, &mut reader)
-            .await
-            .unwrap();
-
-        let mut reader = content.read_piece(task_id, 0, 13, None).await.unwrap();
-        let mut buffer = Vec::new();
-        reader.read_to_end(&mut buffer).await.unwrap();
-        assert_eq!(buffer, data);
-
-        let mut reader = content
-            .read_piece(
-                task_id,
-                0,
-                13,
-                Some(Range {
-                    start: 0,
-                    length: 5,
-                }),
-            )
-            .await
-            .unwrap();
-        let mut buffer = Vec::new();
-        reader.read_to_end(&mut buffer).await.unwrap();
-        assert_eq!(buffer, b"hello");
-    }
-
-    #[tokio::test]
-    async fn test_write_piece() {
-        let temp_dir = tempdir().unwrap();
-        let config = Arc::new(Config::default());
-        let content = Content::new(config, temp_dir.path()).await.unwrap();
-
-        let task_id = "60b48845606946cea72084f14ed5cce61ec96e69f80a30f891a6963dccfd5b4f";
-        content.create_task(task_id, 4).await.unwrap();
-
-        let data = b"test";
-        let mut reader = Cursor::new(data);
-        let response = content
-            .write_piece(task_id, 0, 4, &mut reader)
-            .await
-            .unwrap();
-        assert_eq!(response.length, 4);
-        assert!(!response.hash.is_empty());
-    }
-
-    #[tokio::test]
-    async fn test_create_persistent_task() {
-        let temp_dir = tempdir().unwrap();
-        let config = Arc::new(Config::default());
-        let content = Content::new(config, temp_dir.path()).await.unwrap();
-
-        let task_id = "c4f108ab1d2b8cfdffe89ea9676af35123fa02e3c25167d62538f630d5d44745";
-        let task_path = content
-            .create_persistent_cache_task(task_id, 0)
-            .await
-            .unwrap();
-        assert!(task_path.exists());
-        assert_eq!(task_path, temp_dir.path().join("content/persistent-cache-tasks/c4f/c4f108ab1d2b8cfdffe89ea9676af35123fa02e3c25167d62538f630d5d44745"));
-
-        let task_path_exists = content
-            .create_persistent_cache_task(task_id, 0)
-            .await
-            .unwrap();
-        assert_eq!(task_path, task_path_exists);
-    }
-
-    #[tokio::test]
-    async fn test_hard_link_persistent_cache_task() {
-        let temp_dir = tempdir().unwrap();
-        let config = Arc::new(Config::default());
-        let content = Content::new(config, temp_dir.path()).await.unwrap();
-
-        let task_id = "5e81970eb2b048910cc84cab026b951f2ceac0a09c72c0717193bb6e466e11cd";
-        content
-            .create_persistent_cache_task(task_id, 0)
-            .await
-            .unwrap();
-
-        let to = temp_dir
-            .path()
-            .join("5e81970eb2b048910cc84cab026b951f2ceac0a09c72c0717193bb6e466e11cd");
-        content
-            .hard_link_persistent_cache_task(task_id, &to)
-            .await
-            .unwrap();
-        assert!(to.exists());
-
-        content
-            .hard_link_persistent_cache_task(task_id, &to)
-            .await
-            .unwrap();
-    }
-
-    #[tokio::test]
-    async fn test_copy_persistent_cache_task() {
-        let temp_dir = tempdir().unwrap();
-        let config = Arc::new(Config::default());
-        let content = Content::new(config, temp_dir.path()).await.unwrap();
-
-        let task_id = "194b9c2018429689fb4e596a506c7e9db564c187b9709b55b33b96881dfb6dd5";
-        content
-            .create_persistent_cache_task(task_id, 64)
-            .await
-            .unwrap();
-
-        let to = temp_dir
-            .path()
-            .join("194b9c2018429689fb4e596a506c7e9db564c187b9709b55b33b96881dfb6dd5");
-        content
-            .copy_persistent_cache_task(task_id, &to)
-            .await
-            .unwrap();
-        assert!(to.exists());
-    }
-
-    #[tokio::test]
-    async fn test_delete_persistent_cache_task() {
-        let temp_dir = tempdir().unwrap();
-        let config = Arc::new(Config::default());
-        let content = Content::new(config, temp_dir.path()).await.unwrap();
-
-        let task_id = "17430ba545c3ce82790e9c9f77e64dca44bb6d6a0c9e18be175037c16c73713d";
-        let task_path = content
-            .create_persistent_cache_task(task_id, 0)
-            .await
-            .unwrap();
-        assert!(task_path.exists());
-
-        content.delete_persistent_cache_task(task_id).await.unwrap();
-        assert!(!task_path.exists());
-    }
-
-    #[tokio::test]
-    async fn test_read_persistent_cache_piece() {
-        let temp_dir = tempdir().unwrap();
-        let config = Arc::new(Config::default());
-        let content = Content::new(config, temp_dir.path()).await.unwrap();
-
-        let task_id = "9cb27a4af09aee4eb9f904170217659683f4a0ea7cd55e1a9fbcb99ddced659a";
-        content
-            .create_persistent_cache_task(task_id, 13)
-            .await
-            .unwrap();
-
-        let data = b"hello, world!";
-        let mut reader = Cursor::new(data);
-        content
-            .write_persistent_cache_piece(task_id, 0, 13, &mut reader)
-            .await
-            .unwrap();
-
-        let mut reader = content
-            .read_persistent_cache_piece(task_id, 0, 13, None)
-            .await
-            .unwrap();
-        let mut buffer = Vec::new();
-        reader.read_to_end(&mut buffer).await.unwrap();
-        assert_eq!(buffer, data);
-
-        let mut reader = content
-            .read_persistent_cache_piece(
-                task_id,
-                0,
-                13,
-                Some(Range {
-                    start: 0,
-                    length: 5,
-                }),
-            )
-            .await
-            .unwrap();
-        let mut buffer = Vec::new();
-        reader.read_to_end(&mut buffer).await.unwrap();
-        assert_eq!(buffer, b"hello");
-    }
-
-    #[tokio::test]
-    async fn test_write_persistent_cache_piece() {
-        let temp_dir = tempdir().unwrap();
-        let config = Arc::new(Config::default());
-        let content = Content::new(config, temp_dir.path()).await.unwrap();
-
-        let task_id = "ca1afaf856e8a667fbd48093ca3ca1b8eeb4bf735912fbe551676bc5817a720a";
-        content
-            .create_persistent_cache_task(task_id, 4)
-            .await
-            .unwrap();
-
-        let data = b"test";
-        let mut reader = Cursor::new(data);
-        let response = content
-            .write_persistent_cache_piece(task_id, 0, 4, &mut reader)
-            .await
-            .unwrap();
-        assert_eq!(response.length, 4);
-        assert!(!response.hash.is_empty());
-    }
-
-    #[tokio::test]
-    async fn test_has_enough_space() {
-        let config = Arc::new(Config::default());
-        let temp_dir = tempdir().unwrap();
-        let content = Content::new(config, temp_dir.path()).await.unwrap();
-
-        let has_space = content.has_enough_space(1).unwrap();
-        assert!(has_space);
-
-        let has_space = content.has_enough_space(u64::MAX).unwrap();
-        assert!(!has_space);
-
-        let mut config = Config::default();
-        config.gc.policy.dist_threshold = ByteSize::mib(10);
-        let config = Arc::new(config);
-        let content = Content::new(config, temp_dir.path()).await.unwrap();
-
-        let file_path = Path::new(temp_dir.path())
-            .join(DEFAULT_CONTENT_DIR)
-            .join(DEFAULT_TASK_DIR)
-            .join("1mib");
-        let mut file = File::create(&file_path).await.unwrap();
-        let buffer = vec![0u8; ByteSize::mib(1).as_u64() as usize];
-        file.write_all(&buffer).await.unwrap();
-        file.flush().await.unwrap();
-
-        let has_space = content
-            .has_enough_space(ByteSize::mib(9).as_u64() + 1)
-            .unwrap();
-        assert!(!has_space);
-
-        let has_space = content.has_enough_space(ByteSize::mib(9).as_u64()).unwrap();
-        assert!(has_space);
-    }
 
     #[tokio::test]
     async fn test_calculate_piece_range() {
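The refactor above leaves content.rs as a thin facade: the shared constants, the response structs, calculate_piece_range, and a cfg-gated Content alias that resolves at compile time. A minimal sketch of how a caller might construct the platform-specific backend through new_content (hypothetical setup code; the module path crate::content matches the imports used in the tests below):

```rust
use std::path::Path;
use std::sync::Arc;

use dragonfly_client_config::dfdaemon::Config;
use dragonfly_client_core::Result;

// `Content` is the cfg-gated alias: content_linux::Content on Linux and
// content_macos::Content on macOS, so callers stay platform-agnostic.
async fn init_content(dir: &Path) -> Result<crate::content::Content> {
    let config = Arc::new(Config::default());
    crate::content::new_content(config, dir).await
}
```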
diff --git a/dragonfly-client-storage/src/content_linux.rs b/dragonfly-client-storage/src/content_linux.rs
new file mode 100644
index 00000000..5ce27ac0
--- /dev/null
+++ b/dragonfly-client-storage/src/content_linux.rs
@@ -0,0 +1,952 @@
+/*
+ * Copyright 2025 The Dragonfly Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use bytesize::ByteSize;
+use dragonfly_api::common::v2::Range;
+use dragonfly_client_config::dfdaemon::Config;
+use dragonfly_client_core::{Error, Result};
+use dragonfly_client_util::fs::fallocate;
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+use tokio::fs::{self, File, OpenOptions};
+use tokio::io::{
+    self, AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, BufWriter, SeekFrom,
+};
+use tokio_util::io::InspectReader;
+use tracing::{error, info, instrument, warn};
+use walkdir::WalkDir;
+
+/// Content is the content of a piece.
+pub struct Content {
+    /// config is the configuration of the dfdaemon.
+    pub config: Arc<Config>,
+
+    /// dir is the directory to store content.
+    pub dir: PathBuf,
+}
+
+/// Content implements the content storage.
+impl Content {
+    /// new returns a new content.
+    pub async fn new(config: Arc<Config>, dir: &Path) -> Result<Content> {
+        let dir = dir.join(super::content::DEFAULT_CONTENT_DIR);
+
+        // If the storage is not kept, remove the directory.
+        if !config.storage.keep {
+            fs::remove_dir_all(&dir).await.unwrap_or_else(|err| {
+                warn!("remove {:?} failed: {}", dir, err);
+            });
+        }
+
+        fs::create_dir_all(&dir.join(super::content::DEFAULT_TASK_DIR)).await?;
+        fs::create_dir_all(&dir.join(super::content::DEFAULT_PERSISTENT_CACHE_TASK_DIR)).await?;
+        info!("content initialized directory: {:?}", dir);
+        Ok(Content { config, dir })
+    }
+
+    /// available_space returns the available space of the disk.
+    pub fn available_space(&self) -> Result<u64> {
+        let dist_threshold = self.config.gc.policy.dist_threshold;
+        if dist_threshold != ByteSize::default() {
+            let usage_space = WalkDir::new(&self.dir)
+                .into_iter()
+                .filter_map(|entry| entry.ok())
+                .filter_map(|entry| entry.metadata().ok())
+                .filter(|metadata| metadata.is_file())
+                .fold(0, |acc, m| acc + m.len());
+
+            if usage_space >= dist_threshold.as_u64() {
+                warn!(
+                    "usage space {} is greater than or equal to dist threshold {}, no need to calculate available space",
+                    usage_space, dist_threshold
+                );
+
+                return Ok(0);
+            }
+
+            return Ok(dist_threshold.as_u64() - usage_space);
+        }
+
+        let stat = fs2::statvfs(&self.dir)?;
+        Ok(stat.available_space())
+    }
+
+    /// total_space returns the total space of the disk.
+    pub fn total_space(&self) -> Result<u64> {
+        // If the dist_threshold is set, return it directly.
+        let dist_threshold = self.config.gc.policy.dist_threshold;
+        if dist_threshold != ByteSize::default() {
+            return Ok(dist_threshold.as_u64());
+        }
+
+        let stat = fs2::statvfs(&self.dir)?;
+        Ok(stat.total_space())
+    }
+
+    /// has_enough_space checks if the storage has enough space to store the content.
+    pub fn has_enough_space(&self, content_length: u64) -> Result<bool> {
+        let available_space = self.available_space()?;
+        if available_space < content_length {
+            warn!(
+                "not enough space to store the persistent cache task: available_space={}, content_length={}",
+                available_space, content_length
+            );
+
+            return Ok(false);
+        }
+
+        Ok(true)
+    }
+
+    /// is_same_dev_inode checks if the source and target are the same device and inode.
+    async fn is_same_dev_inode<P: AsRef<Path>, Q: AsRef<Path>>(
+        &self,
+        source: P,
+        target: Q,
+    ) -> Result<bool> {
+        let source_metadata = fs::metadata(source).await?;
+        let target_metadata = fs::metadata(target).await?;
+
+        #[cfg(unix)]
+        {
+            use std::os::unix::fs::MetadataExt;
+            Ok(source_metadata.dev() == target_metadata.dev()
+                && source_metadata.ino() == target_metadata.ino())
+        }
+
+        #[cfg(not(unix))]
+        {
+            Err(Error::IO(io::Error::new(
+                io::ErrorKind::Unsupported,
+                "platform not supported",
+            )))
+        }
+    }
+
+    /// is_same_dev_inode_as_task checks if the task and target are the same device and inode.
+    pub async fn is_same_dev_inode_as_task(&self, task_id: &str, to: &Path) -> Result<bool> {
+        let task_path = self.get_task_path(task_id);
+        self.is_same_dev_inode(&task_path, to).await
+    }
+
+    /// create_task creates a new task content.
+    ///
+    /// Behavior of `create_task`:
+    /// 1. If the task already exists, return the task path.
+    /// 2. If the task does not exist, create the task directory and file.
+    #[instrument(skip_all)]
+    pub async fn create_task(&self, task_id: &str, length: u64) -> Result<PathBuf> {
+        let task_path = self.get_task_path(task_id);
+        if task_path.exists() {
+            return Ok(task_path);
+        }
+
+        let task_dir = self
+            .dir
+            .join(super::content::DEFAULT_TASK_DIR)
+            .join(&task_id[..3]);
+        fs::create_dir_all(&task_dir).await.inspect_err(|err| {
+            error!("create {:?} failed: {}", task_dir, err);
+        })?;
+
+        let f = fs::File::create(task_dir.join(task_id))
+            .await
+            .inspect_err(|err| {
+                error!("create {:?} failed: {}", task_dir, err);
+            })?;
+
+        fallocate(&f, length).await.inspect_err(|err| {
+            error!("fallocate {:?} failed: {}", task_dir, err);
+        })?;
+
+        Ok(task_dir.join(task_id))
+    }
+
+    /// Hard links the task content to the destination.
+    ///
+    /// Behavior of `hard_link_task`:
+    /// 1. If the destination exists:
+    ///    1.1. If the source and destination share the same device and inode, return immediately.
+    ///    1.2. Otherwise, return an error.
+    /// 2. If the destination does not exist:
+    ///    2.1. If the hard link succeeds, return immediately.
+    ///    2.2. If the hard link fails, copy the task content to the destination once the task is finished, then return immediately.
+    #[instrument(skip_all)]
+    pub async fn hard_link_task(&self, task_id: &str, to: &Path) -> Result<()> {
+        let task_path = self.get_task_path(task_id);
+        if let Err(err) = fs::hard_link(task_path.clone(), to).await {
+            if err.kind() == std::io::ErrorKind::AlreadyExists {
+                if let Ok(true) = self.is_same_dev_inode(&task_path, to).await {
+                    info!("hard link already exists, no need to operate");
+                    return Ok(());
+                }
+            }
+
+            warn!("hard link {:?} to {:?} failed: {}", task_path, to, err);
+            return Err(Error::IO(err));
+        }
+
+        info!("hard link {:?} to {:?} success", task_path, to);
+        Ok(())
+    }
+
+    /// copy_task copies the task content to the destination.
+    #[instrument(skip_all)]
+    pub async fn copy_task(&self, task_id: &str, to: &Path) -> Result<()> {
+        fs::copy(self.get_task_path(task_id), to).await?;
+        info!("copy to {:?} success", to);
+        Ok(())
+    }
+
+    /// copy_task_by_range copies the task content to the destination by range.
+    #[instrument(skip_all)]
+    async fn copy_task_by_range(&self, task_id: &str, to: &Path, range: Range) -> Result<()> {
+        // Ensure the parent directory of the destination exists.
+        if let Some(parent) = to.parent() {
+            if !parent.exists() {
+                fs::create_dir_all(parent).await.inspect_err(|err| {
+                    error!("failed to create directory {:?}: {}", parent, err);
+                })?;
+            }
+        }
+
+        let mut from_f = File::open(self.get_task_path(task_id)).await?;
+        from_f.seek(SeekFrom::Start(range.start)).await?;
+        let range_reader = from_f.take(range.length);
+
+        // Use a buffer to read the range.
+        let mut range_reader =
+            BufReader::with_capacity(self.config.storage.read_buffer_size, range_reader);
+
+        let mut to_f = OpenOptions::new()
+            .create(true)
+            .truncate(false)
+            .write(true)
+            .open(to.as_os_str())
+            .await?;
+
+        io::copy(&mut range_reader, &mut to_f).await?;
+        Ok(())
+    }
+
+    /// delete_task deletes the task content.
+    pub async fn delete_task(&self, task_id: &str) -> Result<()> {
+        info!("delete task content: {}", task_id);
+        let task_path = self.get_task_path(task_id);
+        fs::remove_file(task_path.as_path())
+            .await
+            .inspect_err(|err| {
+                error!("remove {:?} failed: {}", task_path, err);
+            })?;
+        Ok(())
+    }
+
+    /// read_piece reads the piece from the content.
+    #[instrument(skip_all)]
+    pub async fn read_piece(
+        &self,
+        task_id: &str,
+        offset: u64,
+        length: u64,
+        range: Option<Range>,
+    ) -> Result<impl AsyncRead> {
+        let task_path = self.get_task_path(task_id);
+
+        // Calculate the target offset and length based on the range.
+        let (target_offset, target_length) =
+            super::content::calculate_piece_range(offset, length, range);
+
+        let f = File::open(task_path.as_path()).await.inspect_err(|err| {
+            error!("open {:?} failed: {}", task_path, err);
+        })?;
+        let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
+
+        f_reader
+            .seek(SeekFrom::Start(target_offset))
+            .await
+            .inspect_err(|err| {
+                error!("seek {:?} failed: {}", task_path, err);
+            })?;
+
+        Ok(f_reader.take(target_length))
+    }
+
+    /// read_piece_with_dual_read returns two readers: one is the range reader, and the other is
+    /// the full reader of the piece. It is used to cache the piece content in the proxy cache.
+    #[instrument(skip_all)]
+    pub async fn read_piece_with_dual_read(
+        &self,
+        task_id: &str,
+        offset: u64,
+        length: u64,
+        range: Option<Range>,
+    ) -> Result<(impl AsyncRead, impl AsyncRead)> {
+        let task_path = self.get_task_path(task_id);
+
+        // Calculate the target offset and length based on the range.
+        let (target_offset, target_length) =
+            super::content::calculate_piece_range(offset, length, range);
+
+        let f = File::open(task_path.as_path()).await.inspect_err(|err| {
+            error!("open {:?} failed: {}", task_path, err);
+        })?;
+        let mut f_range_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
+
+        f_range_reader
+            .seek(SeekFrom::Start(target_offset))
+            .await
+            .inspect_err(|err| {
+                error!("seek {:?} failed: {}", task_path, err);
+            })?;
+        let range_reader = f_range_reader.take(target_length);
+
+        // Create full reader of the piece.
+        let f = File::open(task_path.as_path()).await.inspect_err(|err| {
+            error!("open {:?} failed: {}", task_path, err);
+        })?;
+        let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
+
+        f_reader
+            .seek(SeekFrom::Start(offset))
+            .await
+            .inspect_err(|err| {
+                error!("seek {:?} failed: {}", task_path, err);
+            })?;
+        let reader = f_reader.take(length);
+
+        Ok((range_reader, reader))
+    }
+
+    /// write_piece writes the piece to the content and calculates the hash of the piece by crc32.
+    #[instrument(skip_all)]
+    pub async fn write_piece<R: AsyncRead + Unpin + ?Sized>(
+        &self,
+        task_id: &str,
+        offset: u64,
+        expected_length: u64,
+        reader: &mut R,
+    ) -> Result<super::content::WritePieceResponse> {
+        // Open the file and seek to the offset.
+        let task_path = self.get_task_path(task_id);
+        let mut f = OpenOptions::new()
+            .truncate(false)
+            .write(true)
+            .open(task_path.as_path())
+            .await
+            .inspect_err(|err| {
+                error!("open {:?} failed: {}", task_path, err);
+            })?;
+
+        f.seek(SeekFrom::Start(offset)).await.inspect_err(|err| {
+            error!("seek {:?} failed: {}", task_path, err);
+        })?;
+
+        let reader = BufReader::with_capacity(self.config.storage.write_buffer_size, reader);
+        let mut writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, f);
+
+        // Copy the piece to the file while updating the CRC32 value.
+        let mut hasher = crc32fast::Hasher::new();
+        let mut tee = InspectReader::new(reader, |bytes| {
+            hasher.update(bytes);
+        });
+
+        let length = io::copy(&mut tee, &mut writer).await.inspect_err(|err| {
+            error!("copy {:?} failed: {}", task_path, err);
+        })?;
+
+        writer.flush().await.inspect_err(|err| {
+            error!("flush {:?} failed: {}", task_path, err);
+        })?;
+
+        if length != expected_length {
+            return Err(Error::Unknown(format!(
+                "expected length {} but got {}",
+                expected_length, length
+            )));
+        }
+
+        // Calculate the hash of the piece.
+        Ok(super::content::WritePieceResponse {
+            length,
+            hash: hasher.finalize().to_string(),
+        })
+    }
+
+    /// get_task_path returns the task path by task id.
+    fn get_task_path(&self, task_id: &str) -> PathBuf {
+        // The task needs to be split by the first 3 characters of the task id (sha256) to
+        // avoid too many files in one directory.
+        let sub_dir = &task_id[..3];
+        self.dir
+            .join(super::content::DEFAULT_TASK_DIR)
+            .join(sub_dir)
+            .join(task_id)
+    }
+
+    /// is_same_dev_inode_as_persistent_cache_task checks if the persistent cache task and target
+    /// are the same device and inode.
+    pub async fn is_same_dev_inode_as_persistent_cache_task(
+        &self,
+        task_id: &str,
+        to: &Path,
+    ) -> Result<bool> {
+        let task_path = self.get_persistent_cache_task_path(task_id);
+        self.is_same_dev_inode(&task_path, to).await
+    }
+
+    /// create_persistent_cache_task creates a new persistent cache task content.
+    ///
+    /// Behavior of `create_persistent_cache_task`:
+    /// 1. If the persistent cache task already exists, return the persistent cache task path.
+    /// 2. If the persistent cache task does not exist, create the persistent cache task directory and file.
+    #[instrument(skip_all)]
+    pub async fn create_persistent_cache_task(
+        &self,
+        task_id: &str,
+        length: u64,
+    ) -> Result<PathBuf> {
+        let task_path = self.get_persistent_cache_task_path(task_id);
+        if task_path.exists() {
+            return Ok(task_path);
+        }
+
+        let task_dir = self
+            .dir
+            .join(super::content::DEFAULT_PERSISTENT_CACHE_TASK_DIR)
+            .join(&task_id[..3]);
+        fs::create_dir_all(&task_dir).await.inspect_err(|err| {
+            error!("create {:?} failed: {}", task_dir, err);
+        })?;
+
+        let f = fs::File::create(task_dir.join(task_id))
+            .await
+            .inspect_err(|err| {
+                error!("create {:?} failed: {}", task_dir, err);
+            })?;
+
+        fallocate(&f, length).await.inspect_err(|err| {
+            error!("fallocate {:?} failed: {}", task_dir, err);
+        })?;
+
+        Ok(task_dir.join(task_id))
+    }
+
+    /// Hard links the persistent cache task content to the destination.
+    ///
+    /// Behavior of `hard_link_persistent_cache_task`:
+    /// 1. If the destination exists:
+    ///    1.1. If the source and destination share the same device and inode, return immediately.
+    ///    1.2. Otherwise, return an error.
+    /// 2. If the destination does not exist:
+    ///    2.1. If the hard link succeeds, return immediately.
+    ///    2.2. If the hard link fails, copy the persistent cache task content to the destination once the task is finished, then return immediately.
+    #[instrument(skip_all)]
+    pub async fn hard_link_persistent_cache_task(&self, task_id: &str, to: &Path) -> Result<()> {
+        let task_path = self.get_persistent_cache_task_path(task_id);
+        if let Err(err) = fs::hard_link(task_path.clone(), to).await {
+            if err.kind() == std::io::ErrorKind::AlreadyExists {
+                if let Ok(true) = self.is_same_dev_inode(&task_path, to).await {
+                    info!("hard link already exists, no need to operate");
+                    return Ok(());
+                }
+            }
+
+            warn!("hard link {:?} to {:?} failed: {}", task_path, to, err);
+            return Err(Error::IO(err));
+        }
+
+        info!("hard link {:?} to {:?} success", task_path, to);
+        Ok(())
+    }
+
+    /// copy_persistent_cache_task copies the persistent cache task content to the destination.
+    #[instrument(skip_all)]
+    pub async fn copy_persistent_cache_task(&self, task_id: &str, to: &Path) -> Result<()> {
+        fs::copy(self.get_persistent_cache_task_path(task_id), to).await?;
+        info!("copy to {:?} success", to);
+        Ok(())
+    }
+
+    /// read_persistent_cache_piece reads the persistent cache piece from the content.
+    #[instrument(skip_all)]
+    pub async fn read_persistent_cache_piece(
+        &self,
+        task_id: &str,
+        offset: u64,
+        length: u64,
+        range: Option<Range>,
+    ) -> Result<impl AsyncRead> {
+        let task_path = self.get_persistent_cache_task_path(task_id);
+
+        // Calculate the target offset and length based on the range.
+        let (target_offset, target_length) =
+            super::content::calculate_piece_range(offset, length, range);
+
+        let f = File::open(task_path.as_path()).await.inspect_err(|err| {
+            error!("open {:?} failed: {}", task_path, err);
+        })?;
+        let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
+
+        f_reader
+            .seek(SeekFrom::Start(target_offset))
+            .await
+            .inspect_err(|err| {
+                error!("seek {:?} failed: {}", task_path, err);
+            })?;
+
+        Ok(f_reader.take(target_length))
+    }
+
+    /// read_persistent_cache_piece_with_dual_read returns two readers: one is the range reader,
+    /// and the other is the full reader of the persistent cache piece. It is used to cache the
+    /// piece content in the proxy cache.
+    #[instrument(skip_all)]
+    pub async fn read_persistent_cache_piece_with_dual_read(
+        &self,
+        task_id: &str,
+        offset: u64,
+        length: u64,
+        range: Option<Range>,
+    ) -> Result<(impl AsyncRead, impl AsyncRead)> {
+        let task_path = self.get_persistent_cache_task_path(task_id);
+
+        // Calculate the target offset and length based on the range.
+        let (target_offset, target_length) =
+            super::content::calculate_piece_range(offset, length, range);
+
+        let f = File::open(task_path.as_path()).await.inspect_err(|err| {
+            error!("open {:?} failed: {}", task_path, err);
+        })?;
+        let mut f_range_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
+
+        f_range_reader
+            .seek(SeekFrom::Start(target_offset))
+            .await
+            .inspect_err(|err| {
+                error!("seek {:?} failed: {}", task_path, err);
+            })?;
+        let range_reader = f_range_reader.take(target_length);
+
+        // Create full reader of the piece.
+        let f = File::open(task_path.as_path()).await.inspect_err(|err| {
+            error!("open {:?} failed: {}", task_path, err);
+        })?;
+        let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
+
+        f_reader
+            .seek(SeekFrom::Start(offset))
+            .await
+            .inspect_err(|err| {
+                error!("seek {:?} failed: {}", task_path, err);
+            })?;
+        let reader = f_reader.take(length);
+
+        Ok((range_reader, reader))
+    }
+
+    /// write_persistent_cache_piece writes the persistent cache piece to the content and
+    /// calculates the hash of the piece by crc32.
+    #[instrument(skip_all)]
+    pub async fn write_persistent_cache_piece<R: AsyncRead + Unpin + ?Sized>(
+        &self,
+        task_id: &str,
+        offset: u64,
+        expected_length: u64,
+        reader: &mut R,
+    ) -> Result<super::content::WritePieceResponse> {
+        // Open the file and seek to the offset.
+        let task_path = self.get_persistent_cache_task_path(task_id);
+        let mut f = OpenOptions::new()
+            .truncate(false)
+            .write(true)
+            .open(task_path.as_path())
+            .await
+            .inspect_err(|err| {
+                error!("open {:?} failed: {}", task_path, err);
+            })?;
+
+        f.seek(SeekFrom::Start(offset)).await.inspect_err(|err| {
+            error!("seek {:?} failed: {}", task_path, err);
+        })?;
+
+        let reader = BufReader::with_capacity(self.config.storage.write_buffer_size, reader);
+        let mut writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, f);
+
+        // Copy the piece to the file while updating the CRC32 value.
+        let mut hasher = crc32fast::Hasher::new();
+        let mut tee = InspectReader::new(reader, |bytes| {
+            hasher.update(bytes);
+        });
+
+        let length = io::copy(&mut tee, &mut writer).await.inspect_err(|err| {
+            error!("copy {:?} failed: {}", task_path, err);
+        })?;
+
+        writer.flush().await.inspect_err(|err| {
+            error!("flush {:?} failed: {}", task_path, err);
+        })?;
+
+        if length != expected_length {
+            return Err(Error::Unknown(format!(
+                "expected length {} but got {}",
+                expected_length, length
+            )));
+        }
+
+        // Calculate the hash of the piece.
+        Ok(super::content::WritePieceResponse {
+            length,
+            hash: hasher.finalize().to_string(),
+        })
+    }
+
+    /// delete_persistent_cache_task deletes the persistent cache task content.
+    pub async fn delete_persistent_cache_task(&self, task_id: &str) -> Result<()> {
+        info!("delete persistent cache task content: {}", task_id);
+        let persistent_cache_task_path = self.get_persistent_cache_task_path(task_id);
+        fs::remove_file(persistent_cache_task_path.as_path())
+            .await
+            .inspect_err(|err| {
+                error!("remove {:?} failed: {}", persistent_cache_task_path, err);
+            })?;
+        Ok(())
+    }
+
+    /// get_persistent_cache_task_path returns the persistent cache task path by task id.
+    fn get_persistent_cache_task_path(&self, task_id: &str) -> PathBuf {
+        // The persistent cache task needs to be split by the first 3 characters of the task id
+        // (sha256) to avoid too many files in one directory.
+        self.dir
+            .join(super::content::DEFAULT_PERSISTENT_CACHE_TASK_DIR)
+            .join(&task_id[..3])
+            .join(task_id)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::content;
+    use std::io::Cursor;
+    use tempfile::tempdir;
+
+    #[tokio::test]
+    async fn test_create_task() {
+        let temp_dir = tempdir().unwrap();
+        let config = Arc::new(Config::default());
+        let content = Content::new(config, temp_dir.path()).await.unwrap();
+
+        let task_id = "60409bd0ec44160f44c53c39b3fe1c5fdfb23faded0228c68bee83bc15a200e3";
+        let task_path = content.create_task(task_id, 0).await.unwrap();
+        assert!(task_path.exists());
+        assert_eq!(task_path, temp_dir.path().join("content/tasks/604/60409bd0ec44160f44c53c39b3fe1c5fdfb23faded0228c68bee83bc15a200e3"));
+
+        let task_path_exists = content.create_task(task_id, 0).await.unwrap();
+        assert_eq!(task_path, task_path_exists);
+    }
+
+    #[tokio::test]
+    async fn test_hard_link_task() {
+        let temp_dir = tempdir().unwrap();
+        let config = Arc::new(Config::default());
+        let content = Content::new(config, temp_dir.path()).await.unwrap();
+
+        let task_id = "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4";
+        content.create_task(task_id, 0).await.unwrap();
+
+        let to = temp_dir
+            .path()
+            .join("c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4");
+        content.hard_link_task(task_id, &to).await.unwrap();
+        assert!(to.exists());
+
+        content.hard_link_task(task_id, &to).await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_copy_task() {
+        let temp_dir = tempdir().unwrap();
+        let config = Arc::new(Config::default());
+        let content = Content::new(config, temp_dir.path()).await.unwrap();
+
+        let task_id = "bfd3c02fb31a7373e25b405fd5fd3082987ccfbaf210889153af9e65bbf13002";
+        content.create_task(task_id, 64).await.unwrap();
+
+        let to = temp_dir
+            .path()
+            .join("bfd3c02fb31a7373e25b405fd5fd3082987ccfbaf210889153af9e65bbf13002");
+        content.copy_task(task_id, &to).await.unwrap();
+        assert!(to.exists());
+    }
+
+    #[tokio::test]
+    async fn test_delete_task() {
+        let temp_dir = tempdir().unwrap();
+        let config = Arc::new(Config::default());
+        let content = Content::new(config, temp_dir.path()).await.unwrap();
+
+        let task_id = "4e19f03b0fceb38f23ff4f657681472a53ef335db3660ae5494912570b7a2bb7";
+        let task_path = content.create_task(task_id, 0).await.unwrap();
+        assert!(task_path.exists());
+
+        content.delete_task(task_id).await.unwrap();
+        assert!(!task_path.exists());
+    }
+
+    #[tokio::test]
+    async fn test_read_piece() {
+        let temp_dir = tempdir().unwrap();
+        let config = Arc::new(Config::default());
+        let content = Content::new(config, temp_dir.path()).await.unwrap();
+
+        let task_id = "c794a3bbae81e06d1c8d362509bdd42a7c105b0fb28d80ffe27f94b8f04fc845";
+        content.create_task(task_id, 13).await.unwrap();
+
+        let data = b"hello, world!";
+        let mut reader = Cursor::new(data);
+        content
+            .write_piece(task_id, 0, 13, &mut reader)
+            .await
+            .unwrap();
+
+        let mut reader = content.read_piece(task_id, 0, 13, None).await.unwrap();
+        let mut buffer = Vec::new();
+        reader.read_to_end(&mut buffer).await.unwrap();
+        assert_eq!(buffer, data);
+
+        let mut reader = content
+            .read_piece(
+                task_id,
+                0,
+                13,
+                Some(Range {
+                    start: 0,
+                    length: 5,
+                }),
+            )
+            .await
+            .unwrap();
+        let mut buffer = Vec::new();
+        reader.read_to_end(&mut buffer).await.unwrap();
assert_eq!(buffer, b"hello"); + } + + #[tokio::test] + async fn test_write_piece() { + let temp_dir = tempdir().unwrap(); + let config = Arc::new(Config::default()); + let content = Content::new(config, temp_dir.path()).await.unwrap(); + + let task_id = "60b48845606946cea72084f14ed5cce61ec96e69f80a30f891a6963dccfd5b4f"; + content.create_task(task_id, 4).await.unwrap(); + + let data = b"test"; + let mut reader = Cursor::new(data); + let response = content + .write_piece(task_id, 0, 4, &mut reader) + .await + .unwrap(); + assert_eq!(response.length, 4); + assert!(!response.hash.is_empty()); + } + + #[tokio::test] + async fn test_create_persistent_task() { + let temp_dir = tempdir().unwrap(); + let config = Arc::new(Config::default()); + let content = Content::new(config, temp_dir.path()).await.unwrap(); + + let task_id = "c4f108ab1d2b8cfdffe89ea9676af35123fa02e3c25167d62538f630d5d44745"; + let task_path = content + .create_persistent_cache_task(task_id, 0) + .await + .unwrap(); + assert!(task_path.exists()); + assert_eq!(task_path, temp_dir.path().join("content/persistent-cache-tasks/c4f/c4f108ab1d2b8cfdffe89ea9676af35123fa02e3c25167d62538f630d5d44745")); + + let task_path_exists = content + .create_persistent_cache_task(task_id, 0) + .await + .unwrap(); + assert_eq!(task_path, task_path_exists); + } + + #[tokio::test] + async fn test_hard_link_persistent_cache_task() { + let temp_dir = tempdir().unwrap(); + let config = Arc::new(Config::default()); + let content = Content::new(config, temp_dir.path()).await.unwrap(); + + let task_id = "5e81970eb2b048910cc84cab026b951f2ceac0a09c72c0717193bb6e466e11cd"; + content + .create_persistent_cache_task(task_id, 0) + .await + .unwrap(); + + let to = temp_dir + .path() + .join("5e81970eb2b048910cc84cab026b951f2ceac0a09c72c0717193bb6e466e11cd"); + content + .hard_link_persistent_cache_task(task_id, &to) + .await + .unwrap(); + assert!(to.exists()); + + content + .hard_link_persistent_cache_task(task_id, &to) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_copy_persistent_cache_task() { + let temp_dir = tempdir().unwrap(); + let config = Arc::new(Config::default()); + let content = Content::new(config, temp_dir.path()).await.unwrap(); + + let task_id = "194b9c2018429689fb4e596a506c7e9db564c187b9709b55b33b96881dfb6dd5"; + content + .create_persistent_cache_task(task_id, 64) + .await + .unwrap(); + + let to = temp_dir + .path() + .join("194b9c2018429689fb4e596a506c7e9db564c187b9709b55b33b96881dfb6dd5"); + content + .copy_persistent_cache_task(task_id, &to) + .await + .unwrap(); + assert!(to.exists()); + } + + #[tokio::test] + async fn test_delete_persistent_cache_task() { + let temp_dir = tempdir().unwrap(); + let config = Arc::new(Config::default()); + let content = Content::new(config, temp_dir.path()).await.unwrap(); + + let task_id = "17430ba545c3ce82790e9c9f77e64dca44bb6d6a0c9e18be175037c16c73713d"; + let task_path = content + .create_persistent_cache_task(task_id, 0) + .await + .unwrap(); + assert!(task_path.exists()); + + content.delete_persistent_cache_task(task_id).await.unwrap(); + assert!(!task_path.exists()); + } + + #[tokio::test] + async fn test_read_persistent_cache_piece() { + let temp_dir = tempdir().unwrap(); + let config = Arc::new(Config::default()); + let content = Content::new(config, temp_dir.path()).await.unwrap(); + + let task_id = "9cb27a4af09aee4eb9f904170217659683f4a0ea7cd55e1a9fbcb99ddced659a"; + content + .create_persistent_cache_task(task_id, 13) + .await + .unwrap(); + + let data = b"hello, world!"; + let mut 
reader = Cursor::new(data); + content + .write_persistent_cache_piece(task_id, 0, 13, &mut reader) + .await + .unwrap(); + + let mut reader = content + .read_persistent_cache_piece(task_id, 0, 13, None) + .await + .unwrap(); + let mut buffer = Vec::new(); + reader.read_to_end(&mut buffer).await.unwrap(); + assert_eq!(buffer, data); + + let mut reader = content + .read_persistent_cache_piece( + task_id, + 0, + 13, + Some(Range { + start: 0, + length: 5, + }), + ) + .await + .unwrap(); + let mut buffer = Vec::new(); + reader.read_to_end(&mut buffer).await.unwrap(); + assert_eq!(buffer, b"hello"); + } + + #[tokio::test] + async fn test_write_persistent_cache_piece() { + let temp_dir = tempdir().unwrap(); + let config = Arc::new(Config::default()); + let content = Content::new(config, temp_dir.path()).await.unwrap(); + + let task_id = "ca1afaf856e8a667fbd48093ca3ca1b8eeb4bf735912fbe551676bc5817a720a"; + content + .create_persistent_cache_task(task_id, 4) + .await + .unwrap(); + + let data = b"test"; + let mut reader = Cursor::new(data); + let response = content + .write_persistent_cache_piece(task_id, 0, 4, &mut reader) + .await + .unwrap(); + assert_eq!(response.length, 4); + assert!(!response.hash.is_empty()); + } + + #[tokio::test] + async fn test_has_enough_space() { + let config = Arc::new(Config::default()); + let temp_dir = tempdir().unwrap(); + let content = Content::new(config, temp_dir.path()).await.unwrap(); + + let has_space = content.has_enough_space(1).unwrap(); + assert!(has_space); + + let has_space = content.has_enough_space(u64::MAX).unwrap(); + assert!(!has_space); + + let mut config = Config::default(); + config.gc.policy.dist_threshold = ByteSize::mib(10); + let config = Arc::new(config); + let content = Content::new(config, temp_dir.path()).await.unwrap(); + + let file_path = Path::new(temp_dir.path()) + .join(content::DEFAULT_CONTENT_DIR) + .join(content::DEFAULT_TASK_DIR) + .join("1mib"); + let mut file = File::create(&file_path).await.unwrap(); + let buffer = vec![0u8; ByteSize::mib(1).as_u64() as usize]; + file.write_all(&buffer).await.unwrap(); + file.flush().await.unwrap(); + + let has_space = content + .has_enough_space(ByteSize::mib(9).as_u64() + 1) + .unwrap(); + assert!(!has_space); + + let has_space = content.has_enough_space(ByteSize::mib(9).as_u64()).unwrap(); + assert!(has_space); + } +} diff --git a/dragonfly-client-storage/src/content_macos.rs b/dragonfly-client-storage/src/content_macos.rs new file mode 100644 index 00000000..5ce27ac0 --- /dev/null +++ b/dragonfly-client-storage/src/content_macos.rs @@ -0,0 +1,952 @@ +/* + * Copyright 2025 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +use bytesize::ByteSize; +use dragonfly_api::common::v2::Range; +use dragonfly_client_config::dfdaemon::Config; +use dragonfly_client_core::{Error, Result}; +use dragonfly_client_util::fs::fallocate; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use tokio::fs::{self, File, OpenOptions}; +use tokio::io::{ + self, AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, BufWriter, SeekFrom, +}; +use tokio_util::io::InspectReader; +use tracing::{error, info, instrument, warn}; +use walkdir::WalkDir; + +/// Content manages the task content stored on disk. +pub struct Content { + /// config is the configuration of the dfdaemon. + pub config: Arc<Config>, + + /// dir is the directory to store content. + pub dir: PathBuf, +} + +/// Content implements the content storage. +impl Content { + /// new returns a new content. + pub async fn new(config: Arc<Config>, dir: &Path) -> Result<Content> { + let dir = dir.join(super::content::DEFAULT_CONTENT_DIR); + + // If the storage is not kept, remove the directory. + if !config.storage.keep { + fs::remove_dir_all(&dir).await.unwrap_or_else(|err| { + warn!("remove {:?} failed: {}", dir, err); + }); + } + + fs::create_dir_all(&dir.join(super::content::DEFAULT_TASK_DIR)).await?; + fs::create_dir_all(&dir.join(super::content::DEFAULT_PERSISTENT_CACHE_TASK_DIR)).await?; + info!("content initialized directory: {:?}", dir); + Ok(Content { config, dir }) + } + + /// available_space returns the available space of the disk. + pub fn available_space(&self) -> Result<u64> { + let dist_threshold = self.config.gc.policy.dist_threshold; + if dist_threshold != ByteSize::default() { + let usage_space = WalkDir::new(&self.dir) + .into_iter() + .filter_map(|entry| entry.ok()) + .filter_map(|entry| entry.metadata().ok()) + .filter(|metadata| metadata.is_file()) + .fold(0, |acc, m| acc + m.len()); + + if usage_space >= dist_threshold.as_u64() { + warn!( + "usage space {} is greater than dist threshold {}, no need to calculate available space", + usage_space, dist_threshold + ); + + return Ok(0); + } + + return Ok(dist_threshold.as_u64() - usage_space); + } + + let stat = fs2::statvfs(&self.dir)?; + Ok(stat.available_space()) + } + + /// total_space returns the total space of the disk. + pub fn total_space(&self) -> Result<u64> { + // If the dist_threshold is set, return it directly. + let dist_threshold = self.config.gc.policy.dist_threshold; + if dist_threshold != ByteSize::default() { + return Ok(dist_threshold.as_u64()); + } + + let stat = fs2::statvfs(&self.dir)?; + Ok(stat.total_space()) + } + + /// has_enough_space checks if the storage has enough space to store the content. + pub fn has_enough_space(&self, content_length: u64) -> Result<bool> { + let available_space = self.available_space()?; + if available_space < content_length { + warn!( + "not enough space to store the persistent cache task: available_space={}, content_length={}", + available_space, content_length + ); + + return Ok(false); + } + + Ok(true) + } +
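+ // Note: when dist_threshold is set, available_space() is computed from the bytes + // already stored under the content directory rather than from the filesystem, which + // bounds the daemon's disk usage to the configured threshold. +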
/// is_same_dev_inode checks if the source and target are the same device and inode. + async fn is_same_dev_inode<P: AsRef<Path>, Q: AsRef<Path>>( + &self, + source: P, + target: Q, + ) -> Result<bool> { + let source_metadata = fs::metadata(source).await?; + let target_metadata = fs::metadata(target).await?; + + #[cfg(unix)] + { + use std::os::unix::fs::MetadataExt; + Ok(source_metadata.dev() == target_metadata.dev() + && source_metadata.ino() == target_metadata.ino()) + } + + #[cfg(not(unix))] + { + Err(Error::IO(io::Error::new( + io::ErrorKind::Unsupported, + "platform not supported", + ))) + } + } + + /// is_same_dev_inode_as_task checks if the task and target are the same device and inode. + pub async fn is_same_dev_inode_as_task(&self, task_id: &str, to: &Path) -> Result<bool> { + let task_path = self.get_task_path(task_id); + self.is_same_dev_inode(&task_path, to).await + } + + /// create_task creates a new task content. + /// + /// Behavior of `create_task`: + /// 1. If the task already exists, return the task path. + /// 2. If the task does not exist, create the task directory and file. + #[instrument(skip_all)] + pub async fn create_task(&self, task_id: &str, length: u64) -> Result<PathBuf> { + let task_path = self.get_task_path(task_id); + if task_path.exists() { + return Ok(task_path); + } + + let task_dir = self + .dir + .join(super::content::DEFAULT_TASK_DIR) + .join(&task_id[..3]); + fs::create_dir_all(&task_dir).await.inspect_err(|err| { + error!("create {:?} failed: {}", task_dir, err); + })?; + + let f = fs::File::create(task_dir.join(task_id)) + .await + .inspect_err(|err| { + error!("create {:?} failed: {}", task_dir, err); + })?; + + fallocate(&f, length).await.inspect_err(|err| { + error!("fallocate {:?} failed: {}", task_dir, err); + })?; + + Ok(task_dir.join(task_id)) + } + + /// Hard links the task content to the destination. + /// + /// Behavior of `hard_link_task`: + /// 1. If the destination exists: + /// 1.1. If the source and destination share the same device and inode, return immediately. + /// 1.2. Otherwise, return an error. + /// 2. If the destination does not exist: + /// 2.1. If the hard link succeeds, return immediately. + /// 2.2. If the hard link fails, copy the task content to the destination once the task is finished, then return immediately. + #[instrument(skip_all)] + pub async fn hard_link_task(&self, task_id: &str, to: &Path) -> Result<()> { + let task_path = self.get_task_path(task_id); + if let Err(err) = fs::hard_link(task_path.clone(), to).await { + if err.kind() == std::io::ErrorKind::AlreadyExists { + if let Ok(true) = self.is_same_dev_inode(&task_path, to).await { + info!("hard link already exists, no need to operate"); + return Ok(()); + } + } + + warn!("hard link {:?} to {:?} failed: {}", task_path, to, err); + return Err(Error::IO(err)); + } + + info!("hard link {:?} to {:?} success", task_path, to); + Ok(()) + } + + /// copy_task copies the task content to the destination. + #[instrument(skip_all)] + pub async fn copy_task(&self, task_id: &str, to: &Path) -> Result<()> { + fs::copy(self.get_task_path(task_id), to).await?; + info!("copy to {:?} success", to); + Ok(()) + } +
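+ // Callers are expected to try hard_link_task() first and fall back to copy_task() + // (or copy_task_by_range() for ranged exports) when the hard link fails, e.g. when + // the destination is on a different filesystem. +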
/// copy_task_by_range copies the task content to the destination by range. + #[instrument(skip_all)] + async fn copy_task_by_range(&self, task_id: &str, to: &Path, range: Range) -> Result<()> { + // Ensure the parent directory of the destination exists. + if let Some(parent) = to.parent() { + if !parent.exists() { + fs::create_dir_all(parent).await.inspect_err(|err| { + error!("failed to create directory {:?}: {}", parent, err); + })?; + } + } + + let mut from_f = File::open(self.get_task_path(task_id)).await?; + from_f.seek(SeekFrom::Start(range.start)).await?; + let range_reader = from_f.take(range.length); + + // Use a buffer to read the range. + let mut range_reader = + BufReader::with_capacity(self.config.storage.read_buffer_size, range_reader); + + let mut to_f = OpenOptions::new() + .create(true) + .truncate(false) + .write(true) + .open(to.as_os_str()) + .await?; + + io::copy(&mut range_reader, &mut to_f).await?; + Ok(()) + } + + /// delete_task deletes the task content. + pub async fn delete_task(&self, task_id: &str) -> Result<()> { + info!("delete task content: {}", task_id); + let task_path = self.get_task_path(task_id); + fs::remove_file(task_path.as_path()) + .await + .inspect_err(|err| { + error!("remove {:?} failed: {}", task_path, err); + })?; + Ok(()) + } + + /// read_piece reads the piece from the content. + #[instrument(skip_all)] + pub async fn read_piece( + &self, + task_id: &str, + offset: u64, + length: u64, + range: Option<Range>, + ) -> Result<impl AsyncRead> { + let task_path = self.get_task_path(task_id); + + // Calculate the target offset and length based on the range. + let (target_offset, target_length) = + super::content::calculate_piece_range(offset, length, range); + + let f = File::open(task_path.as_path()).await.inspect_err(|err| { + error!("open {:?} failed: {}", task_path, err); + })?; + let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f); + + f_reader + .seek(SeekFrom::Start(target_offset)) + .await + .inspect_err(|err| { + error!("seek {:?} failed: {}", task_path, err); + })?; + + Ok(f_reader.take(target_length)) + } + + /// read_piece_with_dual_read returns two readers: one for the requested range and one for the + /// full piece. It is used to cache the piece content in the proxy cache. + #[instrument(skip_all)] + pub async fn read_piece_with_dual_read( + &self, + task_id: &str, + offset: u64, + length: u64, + range: Option<Range>, + ) -> Result<(impl AsyncRead, impl AsyncRead)> { + let task_path = self.get_task_path(task_id); + + // Calculate the target offset and length based on the range. + let (target_offset, target_length) = + super::content::calculate_piece_range(offset, length, range); + + let f = File::open(task_path.as_path()).await.inspect_err(|err| { + error!("open {:?} failed: {}", task_path, err); + })?; + let mut f_range_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f); + + f_range_reader + .seek(SeekFrom::Start(target_offset)) + .await + .inspect_err(|err| { + error!("seek {:?} failed: {}", task_path, err); + })?; + let range_reader = f_range_reader.take(target_length); + + // Create full reader of the piece. + let f = File::open(task_path.as_path()).await.inspect_err(|err| { + error!("open {:?} failed: {}", task_path, err); + })?; + let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f); + + f_reader + .seek(SeekFrom::Start(offset)) + .await + .inspect_err(|err| { + error!("seek {:?} failed: {}", task_path, err); + })?; + let reader = f_reader.take(length); + + Ok((range_reader, reader)) + } +
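+ // Sketch of the intended use: the range reader serves the client's requested bytes + // while the full reader is drained into the proxy cache, so a ranged request can + // still populate the complete piece. +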
/// write_piece writes the piece to the content and calculates the crc32 hash of the piece. + #[instrument(skip_all)] + pub async fn write_piece<R: AsyncRead + Unpin + ?Sized>( + &self, + task_id: &str, + offset: u64, + expected_length: u64, + reader: &mut R, + ) -> Result<super::content::WritePieceResponse> { + // Open the file and seek to the offset. + let task_path = self.get_task_path(task_id); + let mut f = OpenOptions::new() + .truncate(false) + .write(true) + .open(task_path.as_path()) + .await + .inspect_err(|err| { + error!("open {:?} failed: {}", task_path, err); + })?; + + f.seek(SeekFrom::Start(offset)).await.inspect_err(|err| { + error!("seek {:?} failed: {}", task_path, err); + })?; + + let reader = BufReader::with_capacity(self.config.storage.write_buffer_size, reader); + let mut writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, f); + + // Copy the piece to the file while updating the CRC32 value. + let mut hasher = crc32fast::Hasher::new(); + let mut tee = InspectReader::new(reader, |bytes| { + hasher.update(bytes); + }); + + let length = io::copy(&mut tee, &mut writer).await.inspect_err(|err| { + error!("copy {:?} failed: {}", task_path, err); + })?; + + writer.flush().await.inspect_err(|err| { + error!("flush {:?} failed: {}", task_path, err); + })?; + + if length != expected_length { + return Err(Error::Unknown(format!( + "expected length {} but got {}", + expected_length, length + ))); + } + + // Calculate the hash of the piece. + Ok(super::content::WritePieceResponse { + length, + hash: hasher.finalize().to_string(), + }) + } + + /// get_task_path returns the task path by task id. + fn get_task_path(&self, task_id: &str) -> PathBuf { + // The task content is sharded by the first 3 characters of the task id (sha256) to + // avoid too many files in one directory. + let sub_dir = &task_id[..3]; + self.dir + .join(super::content::DEFAULT_TASK_DIR) + .join(sub_dir) + .join(task_id) + } + + /// is_same_dev_inode_as_persistent_cache_task checks if the persistent cache task and target + /// are the same device and inode. + pub async fn is_same_dev_inode_as_persistent_cache_task( + &self, + task_id: &str, + to: &Path, + ) -> Result<bool> { + let task_path = self.get_persistent_cache_task_path(task_id); + self.is_same_dev_inode(&task_path, to).await + } + + /// create_persistent_cache_task creates a new persistent cache task content. + /// + /// Behavior of `create_persistent_cache_task`: + /// 1. If the persistent cache task already exists, return the persistent cache task path. + /// 2. If the persistent cache task does not exist, create the persistent cache task directory and file. + #[instrument(skip_all)] + pub async fn create_persistent_cache_task( + &self, + task_id: &str, + length: u64, + ) -> Result<PathBuf> { + let task_path = self.get_persistent_cache_task_path(task_id); + if task_path.exists() { + return Ok(task_path); + } + + let task_dir = self + .dir + .join(super::content::DEFAULT_PERSISTENT_CACHE_TASK_DIR) + .join(&task_id[..3]); + fs::create_dir_all(&task_dir).await.inspect_err(|err| { + error!("create {:?} failed: {}", task_dir, err); + })?; + + let f = fs::File::create(task_dir.join(task_id)) + .await + .inspect_err(|err| { + error!("create {:?} failed: {}", task_dir, err); + })?; + + fallocate(&f, length).await.inspect_err(|err| { + error!("fallocate {:?} failed: {}", task_dir, err); + })?; + + Ok(task_dir.join(task_id)) + } +
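+ // Preallocating the file to its final length with fallocate() lets piece writers + // seek directly to their offsets without growing the file as they write. +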
/// Hard links the persistent cache task content to the destination. + /// + /// Behavior of `hard_link_persistent_cache_task`: + /// 1. If the destination exists: + /// 1.1. If the source and destination share the same device and inode, return immediately. + /// 1.2. Otherwise, return an error. + /// 2. If the destination does not exist: + /// 2.1. If the hard link succeeds, return immediately. + /// 2.2. If the hard link fails, copy the persistent cache task content to the destination once the task is finished, then return immediately. + #[instrument(skip_all)] + pub async fn hard_link_persistent_cache_task(&self, task_id: &str, to: &Path) -> Result<()> { + let task_path = self.get_persistent_cache_task_path(task_id); + if let Err(err) = fs::hard_link(task_path.clone(), to).await { + if err.kind() == std::io::ErrorKind::AlreadyExists { + if let Ok(true) = self.is_same_dev_inode(&task_path, to).await { + info!("hard link already exists, no need to operate"); + return Ok(()); + } + } + + warn!("hard link {:?} to {:?} failed: {}", task_path, to, err); + return Err(Error::IO(err)); + } + + info!("hard link {:?} to {:?} success", task_path, to); + Ok(()) + } + + /// copy_persistent_cache_task copies the persistent cache task content to the destination. + #[instrument(skip_all)] + pub async fn copy_persistent_cache_task(&self, task_id: &str, to: &Path) -> Result<()> { + fs::copy(self.get_persistent_cache_task_path(task_id), to).await?; + info!("copy to {:?} success", to); + Ok(()) + } + + /// read_persistent_cache_piece reads the persistent cache piece from the content. + #[instrument(skip_all)] + pub async fn read_persistent_cache_piece( + &self, + task_id: &str, + offset: u64, + length: u64, + range: Option<Range>, + ) -> Result<impl AsyncRead> { + let task_path = self.get_persistent_cache_task_path(task_id); + + // Calculate the target offset and length based on the range. + let (target_offset, target_length) = + super::content::calculate_piece_range(offset, length, range); + + let f = File::open(task_path.as_path()).await.inspect_err(|err| { + error!("open {:?} failed: {}", task_path, err); + })?; + let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f); + + f_reader + .seek(SeekFrom::Start(target_offset)) + .await + .inspect_err(|err| { + error!("seek {:?} failed: {}", task_path, err); + })?; + + Ok(f_reader.take(target_length)) + } + + /// read_persistent_cache_piece_with_dual_read returns two readers: one for the requested range and one for the + /// full persistent cache piece. It is used to cache the piece content in the proxy cache. + #[instrument(skip_all)] + pub async fn read_persistent_cache_piece_with_dual_read( + &self, + task_id: &str, + offset: u64, + length: u64, + range: Option<Range>, + ) -> Result<(impl AsyncRead, impl AsyncRead)> { + let task_path = self.get_persistent_cache_task_path(task_id); + + // Calculate the target offset and length based on the range. + let (target_offset, target_length) = + super::content::calculate_piece_range(offset, length, range); + + let f = File::open(task_path.as_path()).await.inspect_err(|err| { + error!("open {:?} failed: {}", task_path, err); + })?; + let mut f_range_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f); + + f_range_reader + .seek(SeekFrom::Start(target_offset)) + .await + .inspect_err(|err| { + error!("seek {:?} failed: {}", task_path, err); + })?; + let range_reader = f_range_reader.take(target_length); + + // Create full reader of the piece.
+ let f = File::open(task_path.as_path()).await.inspect_err(|err| { + error!("open {:?} failed: {}", task_path, err); + })?; + let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f); + + f_reader + .seek(SeekFrom::Start(offset)) + .await + .inspect_err(|err| { + error!("seek {:?} failed: {}", task_path, err); + })?; + let reader = f_reader.take(length); + + Ok((range_reader, reader)) + } + + /// write_persistent_cache_piece writes the persistent cache piece to the content and + /// calculates the crc32 hash of the piece. + #[instrument(skip_all)] + pub async fn write_persistent_cache_piece<R: AsyncRead + Unpin + ?Sized>( + &self, + task_id: &str, + offset: u64, + expected_length: u64, + reader: &mut R, + ) -> Result<super::content::WritePieceResponse> { + // Open the file and seek to the offset. + let task_path = self.get_persistent_cache_task_path(task_id); + let mut f = OpenOptions::new() + .truncate(false) + .write(true) + .open(task_path.as_path()) + .await + .inspect_err(|err| { + error!("open {:?} failed: {}", task_path, err); + })?; + + f.seek(SeekFrom::Start(offset)).await.inspect_err(|err| { + error!("seek {:?} failed: {}", task_path, err); + })?; + + let reader = BufReader::with_capacity(self.config.storage.write_buffer_size, reader); + let mut writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, f); + + // Copy the piece to the file while updating the CRC32 value. + let mut hasher = crc32fast::Hasher::new(); + let mut tee = InspectReader::new(reader, |bytes| { + hasher.update(bytes); + }); + + let length = io::copy(&mut tee, &mut writer).await.inspect_err(|err| { + error!("copy {:?} failed: {}", task_path, err); + })?; + + writer.flush().await.inspect_err(|err| { + error!("flush {:?} failed: {}", task_path, err); + })?; + + if length != expected_length { + return Err(Error::Unknown(format!( + "expected length {} but got {}", + expected_length, length + ))); + } + + // Calculate the hash of the piece. + Ok(super::content::WritePieceResponse { + length, + hash: hasher.finalize().to_string(), + }) + } + + /// delete_persistent_cache_task deletes the persistent cache task content. + pub async fn delete_persistent_cache_task(&self, task_id: &str) -> Result<()> { + info!("delete persistent cache task content: {}", task_id); + let persistent_cache_task_path = self.get_persistent_cache_task_path(task_id); + fs::remove_file(persistent_cache_task_path.as_path()) + .await + .inspect_err(|err| { + error!("remove {:?} failed: {}", persistent_cache_task_path, err); + })?; + Ok(()) + } + + /// get_persistent_cache_task_path returns the persistent cache task path by task id. + fn get_persistent_cache_task_path(&self, task_id: &str) -> PathBuf { + // The persistent cache task content is sharded by the first 3 characters of the task id (sha256) to + // avoid too many files in one directory.
+ self.dir + .join(super::content::DEFAULT_PERSISTENT_CACHE_TASK_DIR) + .join(&task_id[..3]) + .join(task_id) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::content; + use std::io::Cursor; + use tempfile::tempdir; + + #[tokio::test] + async fn test_create_task() { + let temp_dir = tempdir().unwrap(); + let config = Arc::new(Config::default()); + let content = Content::new(config, temp_dir.path()).await.unwrap(); + + let task_id = "60409bd0ec44160f44c53c39b3fe1c5fdfb23faded0228c68bee83bc15a200e3"; + let task_path = content.create_task(task_id, 0).await.unwrap(); + assert!(task_path.exists()); + assert_eq!(task_path, temp_dir.path().join("content/tasks/604/60409bd0ec44160f44c53c39b3fe1c5fdfb23faded0228c68bee83bc15a200e3")); + + let task_path_exists = content.create_task(task_id, 0).await.unwrap(); + assert_eq!(task_path, task_path_exists); + } + + #[tokio::test] + async fn test_hard_link_task() { + let temp_dir = tempdir().unwrap(); + let config = Arc::new(Config::default()); + let content = Content::new(config, temp_dir.path()).await.unwrap(); + + let task_id = "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4"; + content.create_task(task_id, 0).await.unwrap(); + + let to = temp_dir + .path() + .join("c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4"); + content.hard_link_task(task_id, &to).await.unwrap(); + assert!(to.exists()); + + content.hard_link_task(task_id, &to).await.unwrap(); + } + + #[tokio::test] + async fn test_copy_task() { + let temp_dir = tempdir().unwrap(); + let config = Arc::new(Config::default()); + let content = Content::new(config, temp_dir.path()).await.unwrap(); + + let task_id = "bfd3c02fb31a7373e25b405fd5fd3082987ccfbaf210889153af9e65bbf13002"; + content.create_task(task_id, 64).await.unwrap(); + + let to = temp_dir + .path() + .join("bfd3c02fb31a7373e25b405fd5fd3082987ccfbaf210889153af9e65bbf13002"); + content.copy_task(task_id, &to).await.unwrap(); + assert!(to.exists()); + } + + #[tokio::test] + async fn test_delete_task() { + let temp_dir = tempdir().unwrap(); + let config = Arc::new(Config::default()); + let content = Content::new(config, temp_dir.path()).await.unwrap(); + + let task_id = "4e19f03b0fceb38f23ff4f657681472a53ef335db3660ae5494912570b7a2bb7"; + let task_path = content.create_task(task_id, 0).await.unwrap(); + assert!(task_path.exists()); + + content.delete_task(task_id).await.unwrap(); + assert!(!task_path.exists()); + } + + #[tokio::test] + async fn test_read_piece() { + let temp_dir = tempdir().unwrap(); + let config = Arc::new(Config::default()); + let content = Content::new(config, temp_dir.path()).await.unwrap(); + + let task_id = "c794a3bbae81e06d1c8d362509bdd42a7c105b0fb28d80ffe27f94b8f04fc845"; + content.create_task(task_id, 13).await.unwrap(); + + let data = b"hello, world!"; + let mut reader = Cursor::new(data); + content + .write_piece(task_id, 0, 13, &mut reader) + .await + .unwrap(); + + let mut reader = content.read_piece(task_id, 0, 13, None).await.unwrap(); + let mut buffer = Vec::new(); + reader.read_to_end(&mut buffer).await.unwrap(); + assert_eq!(buffer, data); + + let mut reader = content + .read_piece( + task_id, + 0, + 13, + Some(Range { + start: 0, + length: 5, + }), + ) + .await + .unwrap(); + let mut buffer = Vec::new(); + reader.read_to_end(&mut buffer).await.unwrap(); + assert_eq!(buffer, b"hello"); + } + + #[tokio::test] + async fn test_write_piece() { + let temp_dir = tempdir().unwrap(); + let config = Arc::new(Config::default()); + let content = Content::new(config, 
temp_dir.path()).await.unwrap(); + + let task_id = "60b48845606946cea72084f14ed5cce61ec96e69f80a30f891a6963dccfd5b4f"; + content.create_task(task_id, 4).await.unwrap(); + + let data = b"test"; + let mut reader = Cursor::new(data); + let response = content + .write_piece(task_id, 0, 4, &mut reader) + .await + .unwrap(); + assert_eq!(response.length, 4); + assert!(!response.hash.is_empty()); + } + + #[tokio::test] + async fn test_create_persistent_task() { + let temp_dir = tempdir().unwrap(); + let config = Arc::new(Config::default()); + let content = Content::new(config, temp_dir.path()).await.unwrap(); + + let task_id = "c4f108ab1d2b8cfdffe89ea9676af35123fa02e3c25167d62538f630d5d44745"; + let task_path = content + .create_persistent_cache_task(task_id, 0) + .await + .unwrap(); + assert!(task_path.exists()); + assert_eq!(task_path, temp_dir.path().join("content/persistent-cache-tasks/c4f/c4f108ab1d2b8cfdffe89ea9676af35123fa02e3c25167d62538f630d5d44745")); + + let task_path_exists = content + .create_persistent_cache_task(task_id, 0) + .await + .unwrap(); + assert_eq!(task_path, task_path_exists); + } + + #[tokio::test] + async fn test_hard_link_persistent_cache_task() { + let temp_dir = tempdir().unwrap(); + let config = Arc::new(Config::default()); + let content = Content::new(config, temp_dir.path()).await.unwrap(); + + let task_id = "5e81970eb2b048910cc84cab026b951f2ceac0a09c72c0717193bb6e466e11cd"; + content + .create_persistent_cache_task(task_id, 0) + .await + .unwrap(); + + let to = temp_dir + .path() + .join("5e81970eb2b048910cc84cab026b951f2ceac0a09c72c0717193bb6e466e11cd"); + content + .hard_link_persistent_cache_task(task_id, &to) + .await + .unwrap(); + assert!(to.exists()); + + content + .hard_link_persistent_cache_task(task_id, &to) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_copy_persistent_cache_task() { + let temp_dir = tempdir().unwrap(); + let config = Arc::new(Config::default()); + let content = Content::new(config, temp_dir.path()).await.unwrap(); + + let task_id = "194b9c2018429689fb4e596a506c7e9db564c187b9709b55b33b96881dfb6dd5"; + content + .create_persistent_cache_task(task_id, 64) + .await + .unwrap(); + + let to = temp_dir + .path() + .join("194b9c2018429689fb4e596a506c7e9db564c187b9709b55b33b96881dfb6dd5"); + content + .copy_persistent_cache_task(task_id, &to) + .await + .unwrap(); + assert!(to.exists()); + } + + #[tokio::test] + async fn test_delete_persistent_cache_task() { + let temp_dir = tempdir().unwrap(); + let config = Arc::new(Config::default()); + let content = Content::new(config, temp_dir.path()).await.unwrap(); + + let task_id = "17430ba545c3ce82790e9c9f77e64dca44bb6d6a0c9e18be175037c16c73713d"; + let task_path = content + .create_persistent_cache_task(task_id, 0) + .await + .unwrap(); + assert!(task_path.exists()); + + content.delete_persistent_cache_task(task_id).await.unwrap(); + assert!(!task_path.exists()); + } + + #[tokio::test] + async fn test_read_persistent_cache_piece() { + let temp_dir = tempdir().unwrap(); + let config = Arc::new(Config::default()); + let content = Content::new(config, temp_dir.path()).await.unwrap(); + + let task_id = "9cb27a4af09aee4eb9f904170217659683f4a0ea7cd55e1a9fbcb99ddced659a"; + content + .create_persistent_cache_task(task_id, 13) + .await + .unwrap(); + + let data = b"hello, world!"; + let mut reader = Cursor::new(data); + content + .write_persistent_cache_piece(task_id, 0, 13, &mut reader) + .await + .unwrap(); + + let mut reader = content + .read_persistent_cache_piece(task_id, 0, 13, None) + 
.await + .unwrap(); + let mut buffer = Vec::new(); + reader.read_to_end(&mut buffer).await.unwrap(); + assert_eq!(buffer, data); + + let mut reader = content + .read_persistent_cache_piece( + task_id, + 0, + 13, + Some(Range { + start: 0, + length: 5, + }), + ) + .await + .unwrap(); + let mut buffer = Vec::new(); + reader.read_to_end(&mut buffer).await.unwrap(); + assert_eq!(buffer, b"hello"); + } + + #[tokio::test] + async fn test_write_persistent_cache_piece() { + let temp_dir = tempdir().unwrap(); + let config = Arc::new(Config::default()); + let content = Content::new(config, temp_dir.path()).await.unwrap(); + + let task_id = "ca1afaf856e8a667fbd48093ca3ca1b8eeb4bf735912fbe551676bc5817a720a"; + content + .create_persistent_cache_task(task_id, 4) + .await + .unwrap(); + + let data = b"test"; + let mut reader = Cursor::new(data); + let response = content + .write_persistent_cache_piece(task_id, 0, 4, &mut reader) + .await + .unwrap(); + assert_eq!(response.length, 4); + assert!(!response.hash.is_empty()); + } + + #[tokio::test] + async fn test_has_enough_space() { + let config = Arc::new(Config::default()); + let temp_dir = tempdir().unwrap(); + let content = Content::new(config, temp_dir.path()).await.unwrap(); + + let has_space = content.has_enough_space(1).unwrap(); + assert!(has_space); + + let has_space = content.has_enough_space(u64::MAX).unwrap(); + assert!(!has_space); + + let mut config = Config::default(); + config.gc.policy.dist_threshold = ByteSize::mib(10); + let config = Arc::new(config); + let content = Content::new(config, temp_dir.path()).await.unwrap(); + + let file_path = Path::new(temp_dir.path()) + .join(content::DEFAULT_CONTENT_DIR) + .join(content::DEFAULT_TASK_DIR) + .join("1mib"); + let mut file = File::create(&file_path).await.unwrap(); + let buffer = vec![0u8; ByteSize::mib(1).as_u64() as usize]; + file.write_all(&buffer).await.unwrap(); + file.flush().await.unwrap(); + + let has_space = content + .has_enough_space(ByteSize::mib(9).as_u64() + 1) + .unwrap(); + assert!(!has_space); + + let has_space = content.has_enough_space(ByteSize::mib(9).as_u64()).unwrap(); + assert!(has_space); + } +} diff --git a/dragonfly-client-storage/src/lib.rs b/dragonfly-client-storage/src/lib.rs index 593f8e4f..9dffff8f 100644 --- a/dragonfly-client-storage/src/lib.rs +++ b/dragonfly-client-storage/src/lib.rs @@ -32,6 +32,12 @@ use tokio_util::either::Either; use tokio_util::io::InspectReader; use tracing::{debug, error, info, instrument, warn}; +#[cfg(target_os = "linux")] +mod content_linux; + +#[cfg(target_os = "macos")] +mod content_macos; + pub mod cache; pub mod client; pub mod content; @@ -62,7 +68,7 @@ impl Storage { /// new returns a new storage. 
pub async fn new(config: Arc<Config>, dir: &Path, log_dir: PathBuf) -> Result<Storage> { let metadata = metadata::Metadata::new(config.clone(), dir, &log_dir)?; - let content = content::Content::new(config.clone(), dir).await?; + let content = content::new_content(config.clone(), dir).await?; let cache = cache::Cache::new(config.clone()); Ok(Storage { diff --git a/dragonfly-client-storage/src/server/tcp.rs b/dragonfly-client-storage/src/server/tcp.rs index 8271449a..0fd47eea 100644 --- a/dragonfly-client-storage/src/server/tcp.rs +++ b/dragonfly-client-storage/src/server/tcp.rs @@ -99,8 +99,6 @@ impl TCPServer { )?; #[cfg(target_os = "linux")] { - use nix::sys::socket::{setsockopt, sockopt::TcpFastOpenConnect}; - use std::os::fd::AsFd; use tracing::{info, warn}; if let Err(err) = socket.set_tcp_congestion("cubic".as_bytes()) { @@ -108,12 +106,6 @@ } else { info!("set tcp congestion to cubic"); } - - if let Err(err) = setsockopt(&socket.as_fd(), TcpFastOpenConnect, &true) { - warn!("failed to set tcp fast open: {}", err); - } else { - info!("set tcp fast open to true"); - } } socket.bind(&self.addr.into())?; diff --git a/dragonfly-client/src/bin/dfcache/export.rs b/dragonfly-client/src/bin/dfcache/export.rs index ae449e9d..700f2a29 100644 --- a/dragonfly-client/src/bin/dfcache/export.rs +++ b/dragonfly-client/src/bin/dfcache/export.rs @@ -518,11 +518,14 @@ impl ExportCommand { response, )) => { if let Some(f) = &f { - fallocate(f, response.content_length) - .await - .inspect_err(|err| { - error!("fallocate {:?} failed: {}", self.output, err); + if let Err(err) = fallocate(f, response.content_length).await { + error!("fallocate {:?} failed: {}", self.output, err); + fs::remove_file(&self.output).await.inspect_err(|err| { + error!("remove file {:?} failed: {}", self.output, err); })?; + + return Err(err); + }; } progress_bar.set_length(response.content_length); @@ -530,25 +533,58 @@ Some(download_persistent_cache_task_response::Response::DownloadPieceFinishedResponse( response, )) => { - let piece = response.piece.ok_or(Error::InvalidParameter).inspect_err(|_err| { - error!("response piece is missing"); - })?; + let piece = match response.piece { + Some(piece) => piece, + None => { + error!("response piece is missing"); + fs::remove_file(&self.output).await.inspect_err(|err| { + error!("remove file {:?} failed: {}", self.output, err); + })?; + + return Err(Error::InvalidParameter); + } + }; + + // Dfcache needs to write the piece content to the output file.
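+ // On any failure below, the partially written output file is removed before the + // error is returned, so a failed export never leaves a truncated file behind.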
if let Some(f) = &mut f { - f.seek(SeekFrom::Start(piece.offset)) - .await - .inspect_err(|err| { - error!("seek {:?} failed: {}", self.output, err); + if let Err(err) = f.seek(SeekFrom::Start(piece.offset)).await { + error!("seek {:?} failed: {}", self.output, err); + fs::remove_file(&self.output).await.inspect_err(|err| { + error!("remove file {:?} failed: {}", self.output, err); })?; - let content = piece.content.ok_or(Error::InvalidParameter).inspect_err(|_err| { - error!("piece content is missing"); - })?; + return Err(Error::IO(err)); + }; - f.write_all(&content).await.inspect_err(|err| { + let content = match piece.content { + Some(content) => content, + None => { + error!("piece content is missing"); + fs::remove_file(&self.output).await.inspect_err(|err| { + error!("remove file {:?} failed: {}", self.output, err); + })?; + + return Err(Error::InvalidParameter); + } + }; + + if let Err(err) = f.write_all(&content).await { error!("write {:?} failed: {}", self.output, err); - })?; + fs::remove_file(&self.output).await.inspect_err(|err| { + error!("remove file {:?} failed: {}", self.output, err); + })?; + + return Err(Error::IO(err)); + } + + if let Err(err) = f.flush().await { + error!("flush {:?} failed: {}", self.output, err); + fs::remove_file(&self.output).await.inspect_err(|err| { + error!("remove file {:?} failed: {}", self.output, err); + })?; + + return Err(Error::IO(err)); + } debug!("copy piece {} to {:?} success", piece.number, self.output); }; diff --git a/dragonfly-client/src/bin/dfget/main.rs b/dragonfly-client/src/bin/dfget/main.rs index abbcd5da..834800ff 100644 --- a/dragonfly-client/src/bin/dfget/main.rs +++ b/dragonfly-client/src/bin/dfget/main.rs @@ -951,11 +951,14 @@ async fn download( response, )) => { if let Some(f) = &f { - fallocate(f, response.content_length) - .await - .inspect_err(|err| { - error!("fallocate {:?} failed: {}", args.output, err); + if let Err(err) = fallocate(f, response.content_length).await { + error!("fallocate {:?} failed: {}", args.output, err); + fs::remove_file(&args.output).await.inspect_err(|err| { + error!("remove file {:?} failed: {}", args.output, err); })?; + + return Err(err); + } } progress_bar.set_length(response.content_length); @@ -963,32 +966,61 @@ Some(download_task_response::Response::DownloadPieceFinishedResponse( response, )) => { - let piece = - response - .piece - .ok_or(Error::InvalidParameter) - .inspect_err(|_err| { - error!("response piece is missing"); + let piece = match response.piece { + Some(piece) => piece, + None => { + error!("response piece is missing"); + fs::remove_file(&args.output).await.inspect_err(|err| { + error!("remove file {:?} failed: {}", args.output, err); })?; + return Err(Error::InvalidParameter); + } + }; + // Dfget needs to write the piece content to the output file.
if let Some(f) = &mut f { - f.seek(SeekFrom::Start(piece.offset)) - .await - .inspect_err(|err| { - error!("seek {:?} failed: {}", args.output, err); + if let Err(err) = f.seek(SeekFrom::Start(piece.offset)).await { + error!("seek {:?} failed: {}", args.output, err); + fs::remove_file(&args.output).await.inspect_err(|err| { + error!("remove file {:?} failed: {}", args.output, err); })?; - let content = piece - .content - .ok_or(Error::InvalidParameter) - .inspect_err(|_err| { + return Err(Error::IO(err)); + } + + let content = match piece.content { + Some(content) => content, + None => { error!("piece content is missing"); + fs::remove_file(&args.output).await.inspect_err(|err| { + error!("remove file {:?} failed: {}", args.output, err); + })?; + + return Err(Error::InvalidParameter); + } + }; + + if let Err(err) = f.write_all(&content).await { + error!( + "write piece {} to {:?} failed: {}", + piece.number, args.output, err + ); + fs::remove_file(&args.output).await.inspect_err(|err| { + error!("remove file {:?} failed: {}", args.output, err); })?; - f.write_all(&content).await.inspect_err(|err| { - error!("write {:?} failed: {}", args.output, err); - })?; + return Err(Error::IO(err)); + } + + if let Err(err) = f.flush().await { + error!("flush {:?} failed: {}", args.output, err); + fs::remove_file(&args.output).await.inspect_err(|err| { + error!("remove file {:?} failed: {}", args.output, err); + })?; + + return Err(Error::IO(err)); + } debug!("copy piece {} to {:?} success", piece.number, args.output); }
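Both the dfcache and dfget changes above repeat the same failure sequence on every write path: log the error, remove the partially written output file, then surface the original error. A minimal sketch of how that sequence could be factored into a shared helper is below; the cleanup_output name and the plain std::io::Error stand-in for dragonfly_client_core::Error are illustrative assumptions, not part of this patch.

use std::path::Path;

use tokio::{fs, io::AsyncWriteExt};
use tracing::error;

/// Removes the partially written output file, then hands back the original
/// error, mirroring the inline cleanup added to dfget and dfcache.
async fn cleanup_output(output: &Path, err: std::io::Error) -> std::io::Error {
    if let Err(remove_err) = fs::remove_file(output).await {
        error!("remove file {:?} failed: {}", output, remove_err);
    }
    err
}

/// Example call site (hypothetical): a failed piece write triggers cleanup
/// before the error propagates to the caller.
async fn write_piece_or_cleanup(
    f: &mut fs::File,
    content: &[u8],
    output: &Path,
) -> Result<(), std::io::Error> {
    if let Err(err) = f.write_all(content).await {
        error!("write {:?} failed: {}", output, err);
        return Err(cleanup_output(output, err).await);
    }

    Ok(())
}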