From 09eb695fb81349534cd875622d0678c059be41dd Mon Sep 17 00:00:00 2001 From: Gaius Date: Mon, 29 Sep 2025 10:02:13 -0400 Subject: [PATCH] feat: dio --- Cargo.lock | 37 +++++++- dragonfly-client-storage/Cargo.toml | 1 + dragonfly-client-storage/src/content.rs | 95 +++++++++++++++++++ dragonfly-client-storage/src/content_linux.rs | 30 +++--- 4 files changed, 147 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fc797a3e..a8853050 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1235,6 +1235,7 @@ dependencies = [ "socket2 0.6.0", "tempfile", "tokio", + "tokio-uring", "tokio-util", "tracing", "vortex-protocol", @@ -2319,6 +2320,16 @@ version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f958d3d68f4167080a18141e10381e7634563984a537f2a49a30fd8e53ac5767" +[[package]] +name = "io-uring" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "595a0399f411a508feb2ec1e970a4a30c249351e30208960d58298de8660b0e5" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "io-uring" version = "0.7.8" @@ -4873,6 +4884,16 @@ version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" +[[package]] +name = "socket2" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "socket2" version = "0.5.9" @@ -5284,7 +5305,7 @@ checksum = "89e49afdadebb872d3145a5638b59eb0691ea23e46ca484037cfab3b76b95038" dependencies = [ "backtrace", "bytes", - "io-uring", + "io-uring 0.7.8", "libc", "mio", "parking_lot 0.12.1", @@ -5363,6 +5384,20 @@ dependencies = [ "tungstenite", ] +[[package]] +name = "tokio-uring" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "748482e3e13584a34664a710168ad5068e8cb1d968aa4ffa887e83ca6dd27967" +dependencies = [ + "futures-util", + "io-uring 0.6.4", + "libc", + "slab", + "socket2 0.4.10", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.16" diff --git a/dragonfly-client-storage/Cargo.toml b/dragonfly-client-storage/Cargo.toml index 0c8ab623..d1b4d332 100644 --- a/dragonfly-client-storage/Cargo.toml +++ b/dragonfly-client-storage/Cargo.toml @@ -37,6 +37,7 @@ bincode = "1.3.3" walkdir = "2.5.0" quinn = "0.11.9" socket2 = "0.6.0" +tokio-uring = { version = "0.5.0" } [dev-dependencies] tempfile.workspace = true diff --git a/dragonfly-client-storage/src/content.rs b/dragonfly-client-storage/src/content.rs index 68306059..e2b72cb5 100644 --- a/dragonfly-client-storage/src/content.rs +++ b/dragonfly-client-storage/src/content.rs @@ -17,9 +17,12 @@ use dragonfly_api::common::v2::Range; use dragonfly_client_config::dfdaemon::Config; use dragonfly_client_core::Result; +use std::alloc::{alloc, Layout}; use std::cmp::{max, min}; +use std::io::{self, ErrorKind}; use std::path::Path; use std::sync::Arc; +use tokio_uring::buf::{IoBuf, IoBufMut}; #[cfg(target_os = "linux")] pub type Content = super::content_linux::Content; @@ -72,6 +75,98 @@ pub fn calculate_piece_range(offset: u64, length: u64, range: Option) -> } } +/// A struct to hold aligned memory +pub struct AlignedBuffer { + ptr: *mut u8, + size: usize, + init: usize, // Tracks initialized length +} + +impl AlignedBuffer { + /// Create a new aligned buffer + pub fn new(size: usize, alignment: usize) -> io::Result { + // Ensure alignment is a power of two + if !alignment.is_power_of_two() { + return Err(io::Error::new( + ErrorKind::InvalidInput, + "Alignment must be a power of two", + )); + } + + // Allocate aligned memory + unsafe { + let layout = Layout::from_size_align(size, alignment).map_err(|_| { + io::Error::new(ErrorKind::InvalidInput, "Invalid size or alignment") + })?; + let ptr = alloc(layout); + + if ptr.is_null() { + return Err(io::Error::new( + ErrorKind::OutOfMemory, + "Failed to allocate memory", + )); + } + + Ok(Self { ptr, size, init: 0 }) + } + } +} + +impl Drop for AlignedBuffer { + fn drop(&mut self) { + unsafe { + let layout = Layout::from_size_align(self.size, 512).unwrap(); + std::alloc::dealloc(self.ptr, layout); + } + } +} + +impl std::ops::Deref for AlignedBuffer { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + unsafe { std::slice::from_raw_parts(self.ptr, self.size) } + } +} + +impl std::ops::DerefMut for AlignedBuffer { + fn deref_mut(&mut self) -> &mut Self::Target { + unsafe { std::slice::from_raw_parts_mut(self.ptr, self.size) } + } +} + +/// Implement `IoBuf` for `AlignedBuffer` +unsafe impl IoBuf for AlignedBuffer { + /// Return the pointer to the start of the buffer + fn stable_ptr(&self) -> *const u8 { + self.ptr + } + + /// Return the number of initialized bytes + fn bytes_init(&self) -> usize { + self.init + } + + /// Return the total size of the buffer + fn bytes_total(&self) -> usize { + self.size + } +} + +/// Implement `IoBufMut` for `AlignedBuffer` +unsafe impl IoBufMut for AlignedBuffer { + /// Return the pointer to the start of the buffer + fn stable_mut_ptr(&mut self) -> *mut u8 { + self.ptr + } + + /// Set the initialized length of the buffer + unsafe fn set_init(&mut self, init: usize) { + assert!(init <= self.size, "init length exceeds buffer size"); + self.init = init; + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/dragonfly-client-storage/src/content_linux.rs b/dragonfly-client-storage/src/content_linux.rs index 140da9cc..4553eaba 100644 --- a/dragonfly-client-storage/src/content_linux.rs +++ b/dragonfly-client-storage/src/content_linux.rs @@ -21,8 +21,6 @@ use dragonfly_client_core::{Error, Result}; use dragonfly_client_util::fs::fallocate; use nix::fcntl::{open, OFlag}; use nix::sys::stat::Mode; -use std::os::fd::AsRawFd; -use std::os::unix::io::FromRawFd; use std::path::{Path, PathBuf}; use std::sync::Arc; use tokio::fs::{self, File, OpenOptions}; @@ -386,24 +384,26 @@ impl Content { let (target_offset, target_length) = super::content::calculate_piece_range(offset, length, range); - let owned_fd = open( - task_path.as_path(), - OFlag::O_RDONLY | OFlag::O_DIRECT, - Mode::empty(), - ) - .inspect_err(|err| { - error!("open {:?} failed: {}", task_path, err); - })?; + use std::os::unix::fs::OpenOptionsExt; + use tokio_uring::fs::OpenOptions; - let std_fd = std::fs::File::from(owned_fd); - let mut f = File::from_std(std_fd); - f.seek(SeekFrom::Start(target_offset)) + let f = OpenOptions::new() + .read(true) + .custom_flags(OFlag::O_DIRECT.bits()) + .open(task_path.as_path()) .await .inspect_err(|err| { - error!("seek {:?} failed: {}", task_path, err); + error!("open {:?} failed: {}", task_path, err); })?; - Ok(Box::new(f.take(target_length))) + // Ensure the buffer is aligned for O_DIRECT + let aligned_buffer = super::content::AlignedBuffer::new(target_length as usize, 4096)?; + + // Submit the read operation with io_uring + let (_, aligned_buffer) = f.read_at(aligned_buffer, target_offset as u64).await; + + // Box the result to return it as an AsyncRead + Ok(Box::new(aligned_buffer.take(target_length))) } /// write_piece writes the piece to the content and calculates the hash of the piece by crc32.