diff --git a/Cargo.lock b/Cargo.lock
index e8b00df8..fc797a3e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1160,6 +1160,7 @@ dependencies = [
  "headers 0.4.1",
  "hyper 1.6.0",
  "hyper-util",
+ "nix 0.30.1",
  "opendal",
  "quinn",
  "reqwest",
diff --git a/Cargo.toml b/Cargo.toml
index f6a290b3..b7dee5df 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -117,6 +117,7 @@ hostname = "^0.4"
 tonic-health = "0.12.3"
 hashring = "0.3.6"
 reqwest-tracing = "0.5"
+nix = { version = "0.30.1", features = ["socket", "net", "fs"] }
 mocktail = "0.3.0"
 
 [profile.release]
diff --git a/dragonfly-client-config/src/dfdaemon.rs b/dragonfly-client-config/src/dfdaemon.rs
index 1c60e753..d041e412 100644
--- a/dragonfly-client-config/src/dfdaemon.rs
+++ b/dragonfly-client-config/src/dfdaemon.rs
@@ -212,6 +212,12 @@ fn default_storage_keep() -> bool {
     false
 }
 
+/// default_storage_directio is the default value for whether to enable direct I/O when reading or writing pieces to storage.
+#[inline]
+fn default_storage_directio() -> bool {
+    false
+}
+
 /// default_storage_write_piece_timeout is the default timeout for writing a piece to storage(e.g., disk
 /// or cache).
 #[inline]
@@ -1006,6 +1012,10 @@ pub struct Storage {
     #[serde(default = "default_storage_keep")]
     pub keep: bool,
 
+    /// directio indicates whether to enable direct I/O when reading or writing pieces to storage.
+    #[serde(default = "default_storage_directio")]
+    pub directio: bool,
+
     /// write_piece_timeout is the timeout for writing a piece to storage(e.g., disk
     /// or cache).
     #[serde(
@@ -1063,6 +1073,7 @@ impl Default for Storage {
             server: StorageServer::default(),
             dir: crate::default_storage_dir(),
             keep: default_storage_keep(),
+            directio: default_storage_directio(),
             write_piece_timeout: default_storage_write_piece_timeout(),
             write_buffer_size: default_storage_write_buffer_size(),
             read_buffer_size: default_storage_read_buffer_size(),
diff --git a/dragonfly-client-core/Cargo.toml b/dragonfly-client-core/Cargo.toml
index 597cf5f0..2fc38c1c 100644
--- a/dragonfly-client-core/Cargo.toml
+++ b/dragonfly-client-core/Cargo.toml
@@ -23,4 +23,5 @@ opendal.workspace = true
 url.workspace = true
 headers.workspace = true
 vortex-protocol.workspace = true
+nix.workspace = true
 quinn = "0.11.9"
diff --git a/dragonfly-client-core/src/error/mod.rs b/dragonfly-client-core/src/error/mod.rs
index cdba6327..b0689687 100644
--- a/dragonfly-client-core/src/error/mod.rs
+++ b/dragonfly-client-core/src/error/mod.rs
@@ -170,6 +170,10 @@ pub enum DFError {
     #[error(transparent)]
     VortexProtocolError(#[from] vortex_protocol::error::Error),
 
+    /// NixError is the error for nix.
+    #[error(transparent)]
+    NixError(#[from] nix::Error),
+
     /// TonicStatus is the error for tonic status.
     #[error(transparent)]
     TonicStatus(#[from] tonic::Status),
diff --git a/dragonfly-client-storage/Cargo.toml b/dragonfly-client-storage/Cargo.toml
index e4cadacf..0c8ab623 100644
--- a/dragonfly-client-storage/Cargo.toml
+++ b/dragonfly-client-storage/Cargo.toml
@@ -31,12 +31,12 @@ bytesize.workspace = true
 leaky-bucket.workspace = true
 vortex-protocol.workspace = true
 rustls.workspace = true
+nix.workspace = true
 num_cpus = "1.17"
 bincode = "1.3.3"
 walkdir = "2.5.0"
 quinn = "0.11.9"
 socket2 = "0.6.0"
-nix = { version = "0.30.1", features = ["socket", "net"] }
 
 [dev-dependencies]
 tempfile.workspace = true
diff --git a/dragonfly-client-storage/src/content_linux.rs b/dragonfly-client-storage/src/content_linux.rs
index 5ce27ac0..140da9cc 100644
--- a/dragonfly-client-storage/src/content_linux.rs
+++ b/dragonfly-client-storage/src/content_linux.rs
@@ -19,6 +19,10 @@ use dragonfly_api::common::v2::Range;
 use dragonfly_client_config::dfdaemon::Config;
 use dragonfly_client_core::{Error, Result};
 use dragonfly_client_util::fs::fallocate;
+use nix::fcntl::{open, OFlag};
+use nix::sys::stat::Mode;
+use std::os::fd::AsRawFd;
+use std::os::unix::io::FromRawFd;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
 use tokio::fs::{self, File, OpenOptions};
@@ -214,6 +218,22 @@ impl Content {
     /// copy_task_by_range copies the task content to the destination by range.
     #[instrument(skip_all)]
     async fn copy_task_by_range(&self, task_id: &str, to: &Path, range: Range) -> Result<()> {
+        if self.config.storage.directio {
+            self.copy_task_direct_by_range(task_id, to, range).await
+        } else {
+            self.copy_task_standard_by_range(task_id, to, range).await
+        }
+    }
+
+    /// copy_task_standard_by_range copies the task content to the destination by range using
+    /// standard I/O.
+    #[instrument(skip_all)]
+    async fn copy_task_standard_by_range(
+        &self,
+        task_id: &str,
+        to: &Path,
+        range: Range,
+    ) -> Result<()> {
         // Ensure the parent directory of the destination exists.
         if let Some(parent) = to.parent() {
             if !parent.exists() {
                 fs::create_dir_all(parent).await.inspect_err(|err| {
                     error!("failed to create directory {:?}: {}", parent, err);
                 })?;
             }
         }
@@ -242,6 +262,56 @@ impl Content {
         Ok(())
     }
 
+    /// copy_task_direct_by_range copies the task content to the destination by range using
+    /// O_DIRECT for direct I/O.
+    #[instrument(skip_all)]
+    async fn copy_task_direct_by_range(
+        &self,
+        task_id: &str,
+        to: &Path,
+        range: Range,
+    ) -> Result<()> {
+        // Ensure the parent directory of the destination exists.
+        if let Some(parent) = to.parent() {
+            if !parent.exists() {
+                fs::create_dir_all(parent).await.inspect_err(|err| {
+                    error!("failed to create directory {:?}: {}", parent, err);
+                })?;
+            }
+        }
+
+        // Open the source file with O_DIRECT flag for direct I/O and make a reader for the range.
+        let from = self.get_task_path(task_id);
+        let owned_from_fd = open(
+            from.as_path(),
+            OFlag::O_RDONLY | OFlag::O_DIRECT,
+            Mode::empty(),
+        )
+        .inspect_err(|err| {
+            error!("open {:?} failed: {}", from, err);
+        })?;
+        let std_from_fd = std::fs::File::from(owned_from_fd);
+        let mut from_f = File::from_std(std_from_fd);
+        from_f.seek(SeekFrom::Start(range.start)).await?;
+        let mut range_reader = from_f.take(range.length);
+
+        // Open the destination file with O_DIRECT flag for direct I/O.
+        let owned_to_fd = open(
+            to.as_os_str(),
+            OFlag::O_WRONLY | OFlag::O_CREAT | OFlag::O_DIRECT,
+            Mode::S_IRUSR | Mode::S_IWUSR,
+        )
+        .inspect_err(|err| {
+            error!("open {:?} failed: {}", to, err);
+        })?;
+        let std_to_fd = std::fs::File::from(owned_to_fd);
+        let mut to_f = File::from_std(std_to_fd);
+
+        // Copy the range to the destination.
+        io::copy(&mut range_reader, &mut to_f).await?;
+        Ok(())
+    }
+
     /// delete_task deletes the task content.
     pub async fn delete_task(&self, task_id: &str) -> Result<()> {
         info!("delete task content: {}", task_id);
@@ -254,7 +324,7 @@ impl Content {
         Ok(())
     }
 
-    /// read_piece reads the piece from the content.
+    /// read_piece reads the piece from the content.
     #[instrument(skip_all)]
     pub async fn read_piece(
         &self,
@@ -262,38 +332,24 @@ impl Content {
         offset: u64,
         length: u64,
         range: Option<Range>,
-    ) -> Result<impl AsyncRead> {
-        let task_path = self.get_task_path(task_id);
-
-        // Calculate the target offset and length based on the range.
-        let (target_offset, target_length) =
-            super::content::calculate_piece_range(offset, length, range);
-
-        let f = File::open(task_path.as_path()).await.inspect_err(|err| {
-            error!("open {:?} failed: {}", task_path, err);
-        })?;
-        let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
-
-        f_reader
-            .seek(SeekFrom::Start(target_offset))
-            .await
-            .inspect_err(|err| {
-                error!("seek {:?} failed: {}", task_path, err);
-            })?;
-
-        Ok(f_reader.take(target_length))
+    ) -> Result<Box<dyn AsyncRead + Send + Unpin>> {
+        if self.config.storage.directio {
+            self.read_piece_direct(task_id, offset, length, range).await
+        } else {
+            self.read_piece_standard(task_id, offset, length, range)
+                .await
+        }
     }
 
-    /// read_piece_with_dual_read return two readers, one is the range reader, and the other is the
-    /// full reader of the piece. It is used for cache the piece content to the proxy cache.
+    /// read_piece_standard reads the piece from the content using standard I/O.
     #[instrument(skip_all)]
-    pub async fn read_piece_with_dual_read(
+    pub async fn read_piece_standard(
         &self,
         task_id: &str,
         offset: u64,
         length: u64,
         range: Option<Range>,
-    ) -> Result<(impl AsyncRead, impl AsyncRead)> {
+    ) -> Result<Box<dyn AsyncRead + Send + Unpin>> {
         let task_path = self.get_task_path(task_id);
 
         // Calculate the target offset and length based on the range.
@@ -303,31 +359,51 @@ impl Content {
         let f = File::open(task_path.as_path()).await.inspect_err(|err| {
             error!("open {:?} failed: {}", task_path, err);
         })?;
-        let mut f_range_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
+        let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
 
-        f_range_reader
+        f_reader
             .seek(SeekFrom::Start(target_offset))
             .await
             .inspect_err(|err| {
                 error!("seek {:?} failed: {}", task_path, err);
             })?;
-        let range_reader = f_range_reader.take(target_length);
-
-        // Create full reader of the piece.
-        let f = File::open(task_path.as_path()).await.inspect_err(|err| {
+
+        Ok(Box::new(f_reader.take(target_length)))
+    }
+
+    /// read_piece_direct reads the piece from the content using O_DIRECT for direct I/O.
+    #[instrument(skip_all)]
+    pub async fn read_piece_direct(
+        &self,
+        task_id: &str,
+        offset: u64,
+        length: u64,
+        range: Option<Range>,
+    ) -> Result<Box<dyn AsyncRead + Send + Unpin>> {
+        let task_path = self.get_task_path(task_id);
+
+        // Calculate the target offset and length based on the range.
+        let (target_offset, target_length) =
+            super::content::calculate_piece_range(offset, length, range);
+
+        let owned_fd = open(
+            task_path.as_path(),
+            OFlag::O_RDONLY | OFlag::O_DIRECT,
+            Mode::empty(),
+        )
+        .inspect_err(|err| {
             error!("open {:?} failed: {}", task_path, err);
         })?;
-        let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
-
-        f_reader
-            .seek(SeekFrom::Start(offset))
+        let std_fd = std::fs::File::from(owned_fd);
+        let mut f = File::from_std(std_fd);
+        f.seek(SeekFrom::Start(target_offset))
             .await
             .inspect_err(|err| {
                 error!("seek {:?} failed: {}", task_path, err);
             })?;
-        let reader = f_reader.take(length);
-
-        Ok((range_reader, reader))
+        Ok(Box::new(f.take(target_length)))
     }
 
     /// write_piece writes the piece to the content and calculates the hash of the piece by crc32.
@@ -338,6 +414,24 @@ impl Content {
         offset: u64,
         expected_length: u64,
         reader: &mut R,
+    ) -> Result<super::content::WritePieceResponse> {
+        if self.config.storage.directio {
+            self.write_piece_direct(task_id, offset, expected_length, reader)
+                .await
+        } else {
+            self.write_piece_standard(task_id, offset, expected_length, reader)
+                .await
+        }
+    }
+
+    /// write_piece_standard writes the piece to the content and calculates the hash of the piece by crc32 using standard I/O.
+    #[instrument(skip_all)]
+    pub async fn write_piece_standard<R: AsyncRead + Unpin + ?Sized>(
+        &self,
+        task_id: &str,
+        offset: u64,
+        expected_length: u64,
+        reader: &mut R,
     ) -> Result<super::content::WritePieceResponse> {
         // Open the file and seek to the offset.
         let task_path = self.get_task_path(task_id);
@@ -385,6 +479,55 @@ impl Content {
         })
     }
 
+    /// write_piece_direct writes the piece to the content and calculates the hash of the piece by crc32 using O_DIRECT for direct I/O.
+    #[instrument(skip_all)]
+    pub async fn write_piece_direct<R: AsyncRead + Unpin + ?Sized>(
+        &self,
+        task_id: &str,
+        offset: u64,
+        expected_length: u64,
+        reader: &mut R,
+    ) -> Result<super::content::WritePieceResponse> {
+        let task_path = self.get_task_path(task_id);
+        let owned_fd = open(
+            task_path.as_path(),
+            OFlag::O_WRONLY | OFlag::O_DIRECT,
+            Mode::empty(),
+        )
+        .inspect_err(|err| {
+            error!("open {:?} failed: {}", task_path, err);
+        })?;
+        let std_fd = std::fs::File::from(owned_fd);
+        let mut f = File::from_std(std_fd);
+        f.seek(SeekFrom::Start(offset)).await.inspect_err(|err| {
+            error!("seek {:?} failed: {}", task_path, err);
+        })?;
+
+        // Copy the piece to the file while updating the CRC32 value.
+        let reader = BufReader::with_capacity(self.config.storage.write_buffer_size, reader);
+        let mut hasher = crc32fast::Hasher::new();
+        let mut tee = InspectReader::new(reader, |bytes| {
+            hasher.update(bytes);
+        });
+
+        let length = io::copy(&mut tee, &mut f).await.inspect_err(|err| {
+            error!("copy {:?} failed: {}", task_path, err);
+        })?;
+
+        if length != expected_length {
+            return Err(Error::Unknown(format!(
+                "expected length {} but got {}",
+                expected_length, length
+            )));
+        }
+
+        // Calculate the hash of the piece.
+        Ok(super::content::WritePieceResponse {
+            length,
+            hash: hasher.finalize().to_string(),
+        })
+    }
+
     /// get_task_path returns the task path by task id.
     fn get_task_path(&self, task_id: &str) -> PathBuf {
         // The task needs split by the first 3 characters of task id(sha256) to
@@ -479,7 +622,6 @@ impl Content {
         info!("copy to {:?} success", to);
         Ok(())
     }
-
     /// read_persistent_cache_piece reads the persistent cache piece from the content.
     #[instrument(skip_all)]
     pub async fn read_persistent_cache_piece(
@@ -488,38 +630,26 @@ impl Content {
         offset: u64,
         length: u64,
         range: Option<Range>,
-    ) -> Result<impl AsyncRead> {
-        let task_path = self.get_persistent_cache_task_path(task_id);
-
-        // Calculate the target offset and length based on the range.
-        let (target_offset, target_length) =
-            super::content::calculate_piece_range(offset, length, range);
-
-        let f = File::open(task_path.as_path()).await.inspect_err(|err| {
-            error!("open {:?} failed: {}", task_path, err);
-        })?;
-        let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
-
-        f_reader
-            .seek(SeekFrom::Start(target_offset))
-            .await
-            .inspect_err(|err| {
-                error!("seek {:?} failed: {}", task_path, err);
-            })?;
-
-        Ok(f_reader.take(target_length))
+    ) -> Result<Box<dyn AsyncRead + Send + Unpin>> {
+        if self.config.storage.directio {
+            self.read_persistent_cache_piece_direct(task_id, offset, length, range)
+                .await
+        } else {
+            self.read_persistent_cache_piece_standard(task_id, offset, length, range)
+                .await
+        }
     }
 
-    /// read_persistent_cache_piece_with_dual_read return two readers, one is the range reader, and the other is the
-    /// full reader of the persistent cache piece. It is used for cache the piece content to the proxy cache.
+    /// read_persistent_cache_piece_standard reads the persistent cache piece from the content
+    /// using standard I/O.
     #[instrument(skip_all)]
-    pub async fn read_persistent_cache_piece_with_dual_read(
+    pub async fn read_persistent_cache_piece_standard(
        &self,
         task_id: &str,
         offset: u64,
         length: u64,
         range: Option<Range>,
-    ) -> Result<(impl AsyncRead, impl AsyncRead)> {
+    ) -> Result<Box<dyn AsyncRead + Send + Unpin>> {
         let task_path = self.get_persistent_cache_task_path(task_id);
 
         // Calculate the target offset and length based on the range.
@@ -529,31 +659,52 @@ impl Content {
         let f = File::open(task_path.as_path()).await.inspect_err(|err| {
             error!("open {:?} failed: {}", task_path, err);
         })?;
-        let mut f_range_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
+        let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
 
-        f_range_reader
+        f_reader
             .seek(SeekFrom::Start(target_offset))
             .await
             .inspect_err(|err| {
                 error!("seek {:?} failed: {}", task_path, err);
             })?;
-        let range_reader = f_range_reader.take(target_length);
-
-        // Create full reader of the piece.
-        let f = File::open(task_path.as_path()).await.inspect_err(|err| {
+
+        Ok(Box::new(f_reader.take(target_length)))
+    }
+
+    /// read_persistent_cache_piece_direct reads the persistent cache piece from the content
+    /// using O_DIRECT for direct I/O.
+    #[instrument(skip_all)]
+    pub async fn read_persistent_cache_piece_direct(
+        &self,
+        task_id: &str,
+        offset: u64,
+        length: u64,
+        range: Option<Range>,
+    ) -> Result<Box<dyn AsyncRead + Send + Unpin>> {
+        let task_path = self.get_persistent_cache_task_path(task_id);
+
+        // Calculate the target offset and length based on the range.
+        let (target_offset, target_length) =
+            super::content::calculate_piece_range(offset, length, range);
+
+        let owned_fd = open(
+            task_path.as_path(),
+            OFlag::O_RDONLY | OFlag::O_DIRECT,
+            Mode::empty(),
+        )
+        .inspect_err(|err| {
             error!("open {:?} failed: {}", task_path, err);
         })?;
-        let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
-
-        f_reader
-            .seek(SeekFrom::Start(offset))
+        let std_fd = std::fs::File::from(owned_fd);
+        let mut f = File::from_std(std_fd);
+        f.seek(SeekFrom::Start(target_offset))
             .await
             .inspect_err(|err| {
                 error!("seek {:?} failed: {}", task_path, err);
             })?;
-        let reader = f_reader.take(length);
-
-        Ok((range_reader, reader))
+        Ok(Box::new(f.take(target_length)))
     }
 
     /// write_persistent_cache_piece writes the persistent cache piece to the content and
@@ -565,6 +716,25 @@ impl Content {
         offset: u64,
         expected_length: u64,
         reader: &mut R,
+    ) -> Result<super::content::WritePieceResponse> {
+        if self.config.storage.directio {
+            self.write_persistent_cache_piece_direct(task_id, offset, expected_length, reader)
+                .await
+        } else {
+            self.write_persistent_cache_piece_standard(task_id, offset, expected_length, reader)
+                .await
+        }
+    }
+
+    /// write_persistent_cache_piece_standard writes the persistent cache piece to the content and
+    /// calculates the hash of the piece by crc32 using standard I/O.
+    #[instrument(skip_all)]
+    pub async fn write_persistent_cache_piece_standard<R: AsyncRead + Unpin + ?Sized>(
+        &self,
+        task_id: &str,
+        offset: u64,
+        expected_length: u64,
+        reader: &mut R,
     ) -> Result<super::content::WritePieceResponse> {
         // Open the file and seek to the offset.
         let task_path = self.get_persistent_cache_task_path(task_id);
@@ -612,6 +782,56 @@ impl Content {
         })
     }
 
+    /// write_persistent_cache_piece_direct writes the persistent cache piece to the content and
+    /// calculates the hash of the piece by crc32 using O_DIRECT for direct I/O.
+    #[instrument(skip_all)]
+    pub async fn write_persistent_cache_piece_direct<R: AsyncRead + Unpin + ?Sized>(
+        &self,
+        task_id: &str,
+        offset: u64,
+        expected_length: u64,
+        reader: &mut R,
+    ) -> Result<super::content::WritePieceResponse> {
+        let task_path = self.get_persistent_cache_task_path(task_id);
+        let owned_fd = open(
+            task_path.as_path(),
+            OFlag::O_WRONLY | OFlag::O_DIRECT,
+            Mode::empty(),
+        )
+        .inspect_err(|err| {
+            error!("open {:?} failed: {}", task_path, err);
+        })?;
+        let std_fd = std::fs::File::from(owned_fd);
+        let mut f = File::from_std(std_fd);
+        f.seek(SeekFrom::Start(offset)).await.inspect_err(|err| {
+            error!("seek {:?} failed: {}", task_path, err);
+        })?;
+
+        // Copy the piece to the file while updating the CRC32 value.
+        let reader = BufReader::with_capacity(self.config.storage.write_buffer_size, reader);
+        let mut hasher = crc32fast::Hasher::new();
+        let mut tee = InspectReader::new(reader, |bytes| {
+            hasher.update(bytes);
+        });
+
+        let length = io::copy(&mut tee, &mut f).await.inspect_err(|err| {
+            error!("copy {:?} failed: {}", task_path, err);
+        })?;
+
+        if length != expected_length {
+            return Err(Error::Unknown(format!(
+                "expected length {} but got {}",
+                expected_length, length
+            )));
+        }
+
+        // Calculate the hash of the piece.
+        Ok(super::content::WritePieceResponse {
+            length,
+            hash: hasher.finalize().to_string(),
+        })
+    }
+
     /// delete_task deletes the persistent cache task content.
     pub async fn delete_persistent_cache_task(&self, task_id: &str) -> Result<()> {
         info!("delete persistent cache task content: {}", task_id);
diff --git a/dragonfly-client-storage/src/content_macos.rs b/dragonfly-client-storage/src/content_macos.rs
index 5ce27ac0..e27ccc45 100644
--- a/dragonfly-client-storage/src/content_macos.rs
+++ b/dragonfly-client-storage/src/content_macos.rs
@@ -284,52 +284,6 @@ impl Content {
         Ok(f_reader.take(target_length))
     }
 
-    /// read_piece_with_dual_read return two readers, one is the range reader, and the other is the
-    /// full reader of the piece. It is used for cache the piece content to the proxy cache.
-    #[instrument(skip_all)]
-    pub async fn read_piece_with_dual_read(
-        &self,
-        task_id: &str,
-        offset: u64,
-        length: u64,
-        range: Option<Range>,
-    ) -> Result<(impl AsyncRead, impl AsyncRead)> {
-        let task_path = self.get_task_path(task_id);
-
-        // Calculate the target offset and length based on the range.
-        let (target_offset, target_length) =
-            super::content::calculate_piece_range(offset, length, range);
-
-        let f = File::open(task_path.as_path()).await.inspect_err(|err| {
-            error!("open {:?} failed: {}", task_path, err);
-        })?;
-        let mut f_range_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
-
-        f_range_reader
-            .seek(SeekFrom::Start(target_offset))
-            .await
-            .inspect_err(|err| {
-                error!("seek {:?} failed: {}", task_path, err);
-            })?;
-        let range_reader = f_range_reader.take(target_length);
-
-        // Create full reader of the piece.
-        let f = File::open(task_path.as_path()).await.inspect_err(|err| {
-            error!("open {:?} failed: {}", task_path, err);
-        })?;
-        let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
-
-        f_reader
-            .seek(SeekFrom::Start(offset))
-            .await
-            .inspect_err(|err| {
-                error!("seek {:?} failed: {}", task_path, err);
-            })?;
-        let reader = f_reader.take(length);
-
-        Ok((range_reader, reader))
-    }
-
     /// write_piece writes the piece to the content and calculates the hash of the piece by crc32.
     #[instrument(skip_all)]
     pub async fn write_piece<R: AsyncRead + Unpin + ?Sized>(
@@ -510,52 +464,6 @@ impl Content {
         Ok(f_reader.take(target_length))
     }
 
-    /// read_persistent_cache_piece_with_dual_read return two readers, one is the range reader, and the other is the
-    /// full reader of the persistent cache piece. It is used for cache the piece content to the proxy cache.
-    #[instrument(skip_all)]
-    pub async fn read_persistent_cache_piece_with_dual_read(
-        &self,
-        task_id: &str,
-        offset: u64,
-        length: u64,
-        range: Option<Range>,
-    ) -> Result<(impl AsyncRead, impl AsyncRead)> {
-        let task_path = self.get_persistent_cache_task_path(task_id);
-
-        // Calculate the target offset and length based on the range.
-        let (target_offset, target_length) =
-            super::content::calculate_piece_range(offset, length, range);
-
-        let f = File::open(task_path.as_path()).await.inspect_err(|err| {
-            error!("open {:?} failed: {}", task_path, err);
-        })?;
-        let mut f_range_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
-
-        f_range_reader
-            .seek(SeekFrom::Start(target_offset))
-            .await
-            .inspect_err(|err| {
-                error!("seek {:?} failed: {}", task_path, err);
-            })?;
-        let range_reader = f_range_reader.take(target_length);
-
-        // Create full reader of the piece.
-        let f = File::open(task_path.as_path()).await.inspect_err(|err| {
-            error!("open {:?} failed: {}", task_path, err);
-        })?;
-        let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
-
-        f_reader
-            .seek(SeekFrom::Start(offset))
-            .await
-            .inspect_err(|err| {
-                error!("seek {:?} failed: {}", task_path, err);
-            })?;
-        let reader = f_reader.take(length);
-
-        Ok((range_reader, reader))
-    }
-
     /// write_persistent_cache_piece writes the persistent cache piece to the content and
     /// calculates the hash of the piece by crc32.
     #[instrument(skip_all)]
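For reference, a minimal sketch of the open-with-O_DIRECT pattern the new `*_direct` helpers in content_linux.rs rely on: open(2) with `OFlag::O_DIRECT` via nix, convert the returned `OwnedFd` into a `std::fs::File`, and wrap it in a tokio `File`. The `open_direct` helper name and the path in `main` are illustrative only and not part of this change; it also assumes, as the diff does, that nix 0.30's `open` returns an `OwnedFd`. Keep in mind that O_DIRECT transfers on Linux generally expect the file offset, buffer address, and transfer size to be aligned to the underlying device's logical block size.

```rust
use std::path::Path;

use nix::fcntl::{open, OFlag};
use nix::sys::stat::Mode;
use tokio::fs::File;

/// Opens `path` with O_DIRECT and wraps it in a tokio `File`, mirroring the
/// conversion chain used above: OwnedFd -> std::fs::File -> tokio::fs::File.
fn open_direct(path: &Path) -> Result<File, Box<dyn std::error::Error>> {
    let fd = open(path, OFlag::O_RDONLY | OFlag::O_DIRECT, Mode::empty())?;
    Ok(File::from_std(std::fs::File::from(fd)))
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder path for illustration only.
    let f = open_direct(Path::new("/var/lib/dragonfly/content/example"))?;
    println!("opened with O_DIRECT, len = {}", f.metadata().await?.len());
    Ok(())
}
```

The behavior stays opt-in: with `storage.directio` left at its default of `false`, reads and writes continue to go through the existing buffered paths.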