feat: dio

This commit is contained in:
Gaius 2025-09-29 10:02:13 -04:00
parent 2ca55e8fec
commit 09eb695fb8
4 changed files with 147 additions and 16 deletions

37
Cargo.lock generated
View File

@ -1235,6 +1235,7 @@ dependencies = [
"socket2 0.6.0",
"tempfile",
"tokio",
"tokio-uring",
"tokio-util",
"tracing",
"vortex-protocol",
@ -2319,6 +2320,16 @@ version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f958d3d68f4167080a18141e10381e7634563984a537f2a49a30fd8e53ac5767"
[[package]]
name = "io-uring"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "595a0399f411a508feb2ec1e970a4a30c249351e30208960d58298de8660b0e5"
dependencies = [
"bitflags 1.3.2",
"libc",
]
[[package]]
name = "io-uring"
version = "0.7.8"
@ -4873,6 +4884,16 @@ version = "1.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7"
[[package]]
name = "socket2"
version = "0.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "socket2"
version = "0.5.9"
@ -5284,7 +5305,7 @@ checksum = "89e49afdadebb872d3145a5638b59eb0691ea23e46ca484037cfab3b76b95038"
dependencies = [
"backtrace",
"bytes",
"io-uring",
"io-uring 0.7.8",
"libc",
"mio",
"parking_lot 0.12.1",
@ -5363,6 +5384,20 @@ dependencies = [
"tungstenite",
]
[[package]]
name = "tokio-uring"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "748482e3e13584a34664a710168ad5068e8cb1d968aa4ffa887e83ca6dd27967"
dependencies = [
"futures-util",
"io-uring 0.6.4",
"libc",
"slab",
"socket2 0.4.10",
"tokio",
]
[[package]]
name = "tokio-util"
version = "0.7.16"

View File

@ -37,6 +37,7 @@ bincode = "1.3.3"
walkdir = "2.5.0"
quinn = "0.11.9"
socket2 = "0.6.0"
tokio-uring = { version = "0.5.0" }
[dev-dependencies]
tempfile.workspace = true

View File

@ -17,9 +17,12 @@
use dragonfly_api::common::v2::Range;
use dragonfly_client_config::dfdaemon::Config;
use dragonfly_client_core::Result;
use std::alloc::{alloc, Layout};
use std::cmp::{max, min};
use std::io::{self, ErrorKind};
use std::path::Path;
use std::sync::Arc;
use tokio_uring::buf::{IoBuf, IoBufMut};
#[cfg(target_os = "linux")]
pub type Content = super::content_linux::Content;
@ -72,6 +75,98 @@ pub fn calculate_piece_range(offset: u64, length: u64, range: Option<Range>) ->
}
}
/// A struct to hold aligned memory
pub struct AlignedBuffer {
ptr: *mut u8,
size: usize,
init: usize, // Tracks initialized length
}
impl AlignedBuffer {
/// Create a new aligned buffer
pub fn new(size: usize, alignment: usize) -> io::Result<Self> {
// Ensure alignment is a power of two
if !alignment.is_power_of_two() {
return Err(io::Error::new(
ErrorKind::InvalidInput,
"Alignment must be a power of two",
));
}
// Allocate aligned memory
unsafe {
let layout = Layout::from_size_align(size, alignment).map_err(|_| {
io::Error::new(ErrorKind::InvalidInput, "Invalid size or alignment")
})?;
let ptr = alloc(layout);
if ptr.is_null() {
return Err(io::Error::new(
ErrorKind::OutOfMemory,
"Failed to allocate memory",
));
}
Ok(Self { ptr, size, init: 0 })
}
}
}
impl Drop for AlignedBuffer {
fn drop(&mut self) {
unsafe {
let layout = Layout::from_size_align(self.size, 512).unwrap();
std::alloc::dealloc(self.ptr, layout);
}
}
}
impl std::ops::Deref for AlignedBuffer {
type Target = [u8];
fn deref(&self) -> &Self::Target {
unsafe { std::slice::from_raw_parts(self.ptr, self.size) }
}
}
impl std::ops::DerefMut for AlignedBuffer {
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { std::slice::from_raw_parts_mut(self.ptr, self.size) }
}
}
/// Implement `IoBuf` for `AlignedBuffer`
unsafe impl IoBuf for AlignedBuffer {
/// Return the pointer to the start of the buffer
fn stable_ptr(&self) -> *const u8 {
self.ptr
}
/// Return the number of initialized bytes
fn bytes_init(&self) -> usize {
self.init
}
/// Return the total size of the buffer
fn bytes_total(&self) -> usize {
self.size
}
}
/// Implement `IoBufMut` for `AlignedBuffer`
unsafe impl IoBufMut for AlignedBuffer {
/// Return the pointer to the start of the buffer
fn stable_mut_ptr(&mut self) -> *mut u8 {
self.ptr
}
/// Set the initialized length of the buffer
unsafe fn set_init(&mut self, init: usize) {
assert!(init <= self.size, "init length exceeds buffer size");
self.init = init;
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -21,8 +21,6 @@ use dragonfly_client_core::{Error, Result};
use dragonfly_client_util::fs::fallocate;
use nix::fcntl::{open, OFlag};
use nix::sys::stat::Mode;
use std::os::fd::AsRawFd;
use std::os::unix::io::FromRawFd;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::fs::{self, File, OpenOptions};
@ -386,24 +384,26 @@ impl Content {
let (target_offset, target_length) =
super::content::calculate_piece_range(offset, length, range);
let owned_fd = open(
task_path.as_path(),
OFlag::O_RDONLY | OFlag::O_DIRECT,
Mode::empty(),
)
use std::os::unix::fs::OpenOptionsExt;
use tokio_uring::fs::OpenOptions;
let f = OpenOptions::new()
.read(true)
.custom_flags(OFlag::O_DIRECT.bits())
.open(task_path.as_path())
.await
.inspect_err(|err| {
error!("open {:?} failed: {}", task_path, err);
})?;
let std_fd = std::fs::File::from(owned_fd);
let mut f = File::from_std(std_fd);
f.seek(SeekFrom::Start(target_offset))
.await
.inspect_err(|err| {
error!("seek {:?} failed: {}", task_path, err);
})?;
// Ensure the buffer is aligned for O_DIRECT
let aligned_buffer = super::content::AlignedBuffer::new(target_length as usize, 4096)?;
Ok(Box::new(f.take(target_length)))
// Submit the read operation with io_uring
let (_, aligned_buffer) = f.read_at(aligned_buffer, target_offset as u64).await;
// Box the result to return it as an AsyncRead
Ok(Box::new(aligned_buffer.take(target_length)))
}
/// write_piece writes the piece to the content and calculates the hash of the piece by crc32.