feat: directio

This commit is contained in:
Gaius 2025-09-29 06:18:47 -04:00
parent 47342a9663
commit 2ca55e8fec
8 changed files with 309 additions and 163 deletions

1
Cargo.lock generated
View File

@ -1160,6 +1160,7 @@ dependencies = [
"headers 0.4.1",
"hyper 1.6.0",
"hyper-util",
"nix 0.30.1",
"opendal",
"quinn",
"reqwest",

View File

@ -117,6 +117,7 @@ hostname = "^0.4"
tonic-health = "0.12.3"
hashring = "0.3.6"
reqwest-tracing = "0.5"
nix = { version = "0.30.1", features = ["socket", "net", "fs"] }
mocktail = "0.3.0"
[profile.release]

View File

@ -212,6 +212,12 @@ fn default_storage_keep() -> bool {
false
}
/// default_storage_directio is the default whether enable direct I/O when reading or writing piece to storage.
#[inline]
fn default_storage_directio() -> bool {
false
}
/// default_storage_write_piece_timeout is the default timeout for writing a piece to storage(e.g., disk
/// or cache).
#[inline]
@ -1006,6 +1012,10 @@ pub struct Storage {
#[serde(default = "default_storage_keep")]
pub keep: bool,
/// directio indicates whether enable direct I/O when reading or writing piece to storage.
#[serde(default = "default_storage_directio")]
pub directio: bool,
/// write_piece_timeout is the timeout for writing a piece to storage(e.g., disk
/// or cache).
#[serde(
@ -1063,6 +1073,7 @@ impl Default for Storage {
server: StorageServer::default(),
dir: crate::default_storage_dir(),
keep: default_storage_keep(),
directio: default_storage_directio(),
write_piece_timeout: default_storage_write_piece_timeout(),
write_buffer_size: default_storage_write_buffer_size(),
read_buffer_size: default_storage_read_buffer_size(),

View File

@ -23,4 +23,5 @@ opendal.workspace = true
url.workspace = true
headers.workspace = true
vortex-protocol.workspace = true
nix.workspace = true
quinn = "0.11.9"

View File

@ -170,6 +170,10 @@ pub enum DFError {
#[error(transparent)]
VortexProtocolError(#[from] vortex_protocol::error::Error),
/// NixError is the error for nix.
#[error(transparent)]
NixError(#[from] nix::Error),
/// TonicStatus is the error for tonic status.
#[error(transparent)]
TonicStatus(#[from] tonic::Status),

View File

@ -31,12 +31,12 @@ bytesize.workspace = true
leaky-bucket.workspace = true
vortex-protocol.workspace = true
rustls.workspace = true
nix.workspace = true
num_cpus = "1.17"
bincode = "1.3.3"
walkdir = "2.5.0"
quinn = "0.11.9"
socket2 = "0.6.0"
nix = { version = "0.30.1", features = ["socket", "net"] }
[dev-dependencies]
tempfile.workspace = true

View File

@ -19,6 +19,10 @@ use dragonfly_api::common::v2::Range;
use dragonfly_client_config::dfdaemon::Config;
use dragonfly_client_core::{Error, Result};
use dragonfly_client_util::fs::fallocate;
use nix::fcntl::{open, OFlag};
use nix::sys::stat::Mode;
use std::os::fd::AsRawFd;
use std::os::unix::io::FromRawFd;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::fs::{self, File, OpenOptions};
@ -214,6 +218,22 @@ impl Content {
/// copy_task_by_range copies the task content to the destination by range.
#[instrument(skip_all)]
async fn copy_task_by_range(&self, task_id: &str, to: &Path, range: Range) -> Result<()> {
if self.config.storage.directio {
self.copy_task_direct_by_range(task_id, to, range).await
} else {
self.copy_task_standard_by_range(task_id, to, range).await
}
}
/// copy_task_standard_by_range copies the task content to the destination by range using
/// standard I/O.
#[instrument(skip_all)]
async fn copy_task_standard_by_range(
&self,
task_id: &str,
to: &Path,
range: Range,
) -> Result<()> {
// Ensure the parent directory of the destination exists.
if let Some(parent) = to.parent() {
if !parent.exists() {
@ -242,6 +262,56 @@ impl Content {
Ok(())
}
/// copy_task_direct_by_range copies the task content to the destination by range using
/// O_DIRECT for direct I/O.
#[instrument(skip_all)]
async fn copy_task_direct_by_range(
&self,
task_id: &str,
to: &Path,
range: Range,
) -> Result<()> {
// Ensure the parent directory of the destination exists.
if let Some(parent) = to.parent() {
if !parent.exists() {
fs::create_dir_all(parent).await.inspect_err(|err| {
error!("failed to create directory {:?}: {}", parent, err);
})?;
}
}
// Open the source file with O_DIRECT flag for direct I/O and make a reader for the range.
let from = self.get_task_path(task_id);
let owned_from_fd = open(
from.as_path(),
OFlag::O_RDONLY | OFlag::O_DIRECT,
Mode::empty(),
)
.inspect_err(|err| {
error!("open {:?} failed: {}", from, err);
})?;
let std_from_fd = std::fs::File::from(owned_from_fd);
let mut from_f = File::from_std(std_from_fd);
from_f.seek(SeekFrom::Start(range.start)).await?;
let mut range_reader = from_f.take(range.length);
// Open the destination file with O_DIRECT flag for direct I/O.
let owned_to_fd = open(
to.as_os_str(),
OFlag::O_WRONLY | OFlag::O_CREAT | OFlag::O_DIRECT,
Mode::S_IRUSR | Mode::S_IWUSR,
)
.inspect_err(|err| {
error!("open {:?} failed: {}", to, err);
})?;
let std_to_fd = std::fs::File::from(owned_to_fd);
let mut to_f = File::from_std(std_to_fd);
// Copy the range to the destination.
io::copy(&mut range_reader, &mut to_f).await?;
Ok(())
}
/// delete_task deletes the task content.
pub async fn delete_task(&self, task_id: &str) -> Result<()> {
info!("delete task content: {}", task_id);
@ -254,7 +324,7 @@ impl Content {
Ok(())
}
/// read_piece reads the piece from the content.
// read_piece reads the piece from the content.
#[instrument(skip_all)]
pub async fn read_piece(
&self,
@ -262,38 +332,24 @@ impl Content {
offset: u64,
length: u64,
range: Option<Range>,
) -> Result<impl AsyncRead> {
let task_path = self.get_task_path(task_id);
// Calculate the target offset and length based on the range.
let (target_offset, target_length) =
super::content::calculate_piece_range(offset, length, range);
let f = File::open(task_path.as_path()).await.inspect_err(|err| {
error!("open {:?} failed: {}", task_path, err);
})?;
let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
f_reader
.seek(SeekFrom::Start(target_offset))
) -> Result<Box<dyn AsyncRead + Send + Unpin>> {
if self.config.storage.directio {
self.read_piece_direct(task_id, offset, length, range).await
} else {
self.read_piece_standard(task_id, offset, length, range)
.await
.inspect_err(|err| {
error!("seek {:?} failed: {}", task_path, err);
})?;
Ok(f_reader.take(target_length))
}
}
/// read_piece_with_dual_read return two readers, one is the range reader, and the other is the
/// full reader of the piece. It is used for cache the piece content to the proxy cache.
/// read_piece_standard reads the piece from the content using standard I/O.
#[instrument(skip_all)]
pub async fn read_piece_with_dual_read(
pub async fn read_piece_standard(
&self,
task_id: &str,
offset: u64,
length: u64,
range: Option<Range>,
) -> Result<(impl AsyncRead, impl AsyncRead)> {
) -> Result<Box<dyn AsyncRead + Send + Unpin>> {
let task_path = self.get_task_path(task_id);
// Calculate the target offset and length based on the range.
@ -303,31 +359,51 @@ impl Content {
let f = File::open(task_path.as_path()).await.inspect_err(|err| {
error!("open {:?} failed: {}", task_path, err);
})?;
let mut f_range_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
f_range_reader
f_reader
.seek(SeekFrom::Start(target_offset))
.await
.inspect_err(|err| {
error!("seek {:?} failed: {}", task_path, err);
})?;
let range_reader = f_range_reader.take(target_length);
// Create full reader of the piece.
let f = File::open(task_path.as_path()).await.inspect_err(|err| {
Ok(Box::new(f_reader.take(target_length)))
}
/// read_piece_direct reads the piece from the content using O_DIRECT for direct I/O.
#[instrument(skip_all)]
pub async fn read_piece_direct(
&self,
task_id: &str,
offset: u64,
length: u64,
range: Option<Range>,
) -> Result<Box<dyn AsyncRead + Send + Unpin>> {
let task_path = self.get_task_path(task_id);
// Calculate the target offset and length based on the range.
let (target_offset, target_length) =
super::content::calculate_piece_range(offset, length, range);
let owned_fd = open(
task_path.as_path(),
OFlag::O_RDONLY | OFlag::O_DIRECT,
Mode::empty(),
)
.inspect_err(|err| {
error!("open {:?} failed: {}", task_path, err);
})?;
let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
f_reader
.seek(SeekFrom::Start(offset))
let std_fd = std::fs::File::from(owned_fd);
let mut f = File::from_std(std_fd);
f.seek(SeekFrom::Start(target_offset))
.await
.inspect_err(|err| {
error!("seek {:?} failed: {}", task_path, err);
})?;
let reader = f_reader.take(length);
Ok((range_reader, reader))
Ok(Box::new(f.take(target_length)))
}
/// write_piece writes the piece to the content and calculates the hash of the piece by crc32.
@ -338,6 +414,24 @@ impl Content {
offset: u64,
expected_length: u64,
reader: &mut R,
) -> Result<super::content::WritePieceResponse> {
if self.config.storage.directio {
self.write_piece_direct(task_id, offset, expected_length, reader)
.await
} else {
self.write_piece_standard(task_id, offset, expected_length, reader)
.await
}
}
/// write_piece_standard writes the piece to the content and calculates the hash of the piece by crc32 using standard I/O.
#[instrument(skip_all)]
pub async fn write_piece_standard<R: AsyncRead + Unpin + ?Sized>(
&self,
task_id: &str,
offset: u64,
expected_length: u64,
reader: &mut R,
) -> Result<super::content::WritePieceResponse> {
// Open the file and seek to the offset.
let task_path = self.get_task_path(task_id);
@ -385,6 +479,55 @@ impl Content {
})
}
/// write_piece_direct writes the piece to the content and calculates the hash of the piece by crc32 using O_DIRECT for direct I/O.
#[instrument(skip_all)]
pub async fn write_piece_direct<R: AsyncRead + Unpin + ?Sized>(
&self,
task_id: &str,
offset: u64,
expected_length: u64,
reader: &mut R,
) -> Result<super::content::WritePieceResponse> {
let task_path = self.get_task_path(task_id);
let owned_fd = open(
task_path.as_path(),
OFlag::O_WRONLY | OFlag::O_DIRECT,
Mode::empty(),
)
.inspect_err(|err| {
error!("open {:?} failed: {}", task_path, err);
})?;
let std_fd = std::fs::File::from(owned_fd);
let mut f = File::from_std(std_fd);
f.seek(SeekFrom::Start(offset)).await.inspect_err(|err| {
error!("seek {:?} failed: {}", task_path, err);
})?;
// Copy the piece to the file while updating the CRC32 value.
let reader = BufReader::with_capacity(self.config.storage.write_buffer_size, reader);
let mut hasher = crc32fast::Hasher::new();
let mut tee = InspectReader::new(reader, |bytes| {
hasher.update(bytes);
});
let length = io::copy(&mut tee, &mut f).await.inspect_err(|err| {
error!("copy {:?} failed: {}", task_path, err);
})?;
if length != expected_length {
return Err(Error::Unknown(format!(
"expected length {} but got {}",
expected_length, length
)));
}
// Calculate the hash of the piece.
Ok(super::content::WritePieceResponse {
length,
hash: hasher.finalize().to_string(),
})
}
/// get_task_path returns the task path by task id.
fn get_task_path(&self, task_id: &str) -> PathBuf {
// The task needs split by the first 3 characters of task id(sha256) to
@ -479,7 +622,6 @@ impl Content {
info!("copy to {:?} success", to);
Ok(())
}
/// read_persistent_cache_piece reads the persistent cache piece from the content.
#[instrument(skip_all)]
pub async fn read_persistent_cache_piece(
@ -488,38 +630,26 @@ impl Content {
offset: u64,
length: u64,
range: Option<Range>,
) -> Result<impl AsyncRead> {
let task_path = self.get_persistent_cache_task_path(task_id);
// Calculate the target offset and length based on the range.
let (target_offset, target_length) =
super::content::calculate_piece_range(offset, length, range);
let f = File::open(task_path.as_path()).await.inspect_err(|err| {
error!("open {:?} failed: {}", task_path, err);
})?;
let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
f_reader
.seek(SeekFrom::Start(target_offset))
) -> Result<Box<dyn AsyncRead + Send + Unpin>> {
if self.config.storage.directio {
self.read_persistent_cache_piece_direct(task_id, offset, length, range)
.await
.inspect_err(|err| {
error!("seek {:?} failed: {}", task_path, err);
})?;
Ok(f_reader.take(target_length))
} else {
self.read_persistent_cache_piece_standard(task_id, offset, length, range)
.await
}
}
/// read_persistent_cache_piece_with_dual_read return two readers, one is the range reader, and the other is the
/// full reader of the persistent cache piece. It is used for cache the piece content to the proxy cache.
/// read_persistent_cache_piece_standard reads the persistent cache piece from the content
/// using standard I/O.
#[instrument(skip_all)]
pub async fn read_persistent_cache_piece_with_dual_read(
pub async fn read_persistent_cache_piece_standard(
&self,
task_id: &str,
offset: u64,
length: u64,
range: Option<Range>,
) -> Result<(impl AsyncRead, impl AsyncRead)> {
) -> Result<Box<dyn AsyncRead + Send + Unpin>> {
let task_path = self.get_persistent_cache_task_path(task_id);
// Calculate the target offset and length based on the range.
@ -529,31 +659,52 @@ impl Content {
let f = File::open(task_path.as_path()).await.inspect_err(|err| {
error!("open {:?} failed: {}", task_path, err);
})?;
let mut f_range_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
f_range_reader
f_reader
.seek(SeekFrom::Start(target_offset))
.await
.inspect_err(|err| {
error!("seek {:?} failed: {}", task_path, err);
})?;
let range_reader = f_range_reader.take(target_length);
// Create full reader of the piece.
let f = File::open(task_path.as_path()).await.inspect_err(|err| {
Ok(Box::new(f_reader.take(target_length)))
}
/// read_persistent_cache_piece_direct reads the persistent cache piece from the content
/// using O_DIRECT for direct I/O.
#[instrument(skip_all)]
pub async fn read_persistent_cache_piece_direct(
&self,
task_id: &str,
offset: u64,
length: u64,
range: Option<Range>,
) -> Result<Box<dyn AsyncRead + Send + Unpin>> {
let task_path = self.get_persistent_cache_task_path(task_id);
// Calculate the target offset and length based on the range.
let (target_offset, target_length) =
super::content::calculate_piece_range(offset, length, range);
let owned_fd = open(
task_path.as_path(),
OFlag::O_RDONLY | OFlag::O_DIRECT,
Mode::empty(),
)
.inspect_err(|err| {
error!("open {:?} failed: {}", task_path, err);
})?;
let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
f_reader
.seek(SeekFrom::Start(offset))
let std_fd = std::fs::File::from(owned_fd);
let mut f = File::from_std(std_fd);
f.seek(SeekFrom::Start(target_offset))
.await
.inspect_err(|err| {
error!("seek {:?} failed: {}", task_path, err);
})?;
let reader = f_reader.take(length);
Ok((range_reader, reader))
Ok(Box::new(f.take(target_length)))
}
/// write_persistent_cache_piece writes the persistent cache piece to the content and
@ -565,6 +716,25 @@ impl Content {
offset: u64,
expected_length: u64,
reader: &mut R,
) -> Result<super::content::WritePieceResponse> {
if self.config.storage.directio {
self.write_persistent_cache_piece_direct(task_id, offset, expected_length, reader)
.await
} else {
self.write_persistent_cache_piece_standard(task_id, offset, expected_length, reader)
.await
}
}
/// write_persistent_cache_piece_standard writes the persistent cache piece to the content and
/// calculates the hash of the piece by crc32 using standard I/O.
#[instrument(skip_all)]
pub async fn write_persistent_cache_piece_standard<R: AsyncRead + Unpin + ?Sized>(
&self,
task_id: &str,
offset: u64,
expected_length: u64,
reader: &mut R,
) -> Result<super::content::WritePieceResponse> {
// Open the file and seek to the offset.
let task_path = self.get_persistent_cache_task_path(task_id);
@ -612,6 +782,56 @@ impl Content {
})
}
/// write_persistent_cache_piece writes the persistent cache piece to the content and
/// calculates the hash of the piece by crc32 using O_DIRECT for direct I/O.
#[instrument(skip_all)]
pub async fn write_persistent_cache_piece_direct<R: AsyncRead + Unpin + ?Sized>(
&self,
task_id: &str,
offset: u64,
expected_length: u64,
reader: &mut R,
) -> Result<super::content::WritePieceResponse> {
let task_path = self.get_persistent_cache_task_path(task_id);
let owned_fd = open(
task_path.as_path(),
OFlag::O_WRONLY | OFlag::O_DIRECT,
Mode::empty(),
)
.inspect_err(|err| {
error!("open {:?} failed: {}", task_path, err);
})?;
let std_fd = std::fs::File::from(owned_fd);
let mut f = File::from_std(std_fd);
f.seek(SeekFrom::Start(offset)).await.inspect_err(|err| {
error!("seek {:?} failed: {}", task_path, err);
})?;
// Copy the piece to the file while updating the CRC32 value.
let reader = BufReader::with_capacity(self.config.storage.write_buffer_size, reader);
let mut hasher = crc32fast::Hasher::new();
let mut tee = InspectReader::new(reader, |bytes| {
hasher.update(bytes);
});
let length = io::copy(&mut tee, &mut f).await.inspect_err(|err| {
error!("copy {:?} failed: {}", task_path, err);
})?;
if length != expected_length {
return Err(Error::Unknown(format!(
"expected length {} but got {}",
expected_length, length
)));
}
// Calculate the hash of the piece.
Ok(super::content::WritePieceResponse {
length,
hash: hasher.finalize().to_string(),
})
}
/// delete_task deletes the persistent cache task content.
pub async fn delete_persistent_cache_task(&self, task_id: &str) -> Result<()> {
info!("delete persistent cache task content: {}", task_id);

View File

@ -284,52 +284,6 @@ impl Content {
Ok(f_reader.take(target_length))
}
/// read_piece_with_dual_read return two readers, one is the range reader, and the other is the
/// full reader of the piece. It is used for cache the piece content to the proxy cache.
#[instrument(skip_all)]
pub async fn read_piece_with_dual_read(
&self,
task_id: &str,
offset: u64,
length: u64,
range: Option<Range>,
) -> Result<(impl AsyncRead, impl AsyncRead)> {
let task_path = self.get_task_path(task_id);
// Calculate the target offset and length based on the range.
let (target_offset, target_length) =
super::content::calculate_piece_range(offset, length, range);
let f = File::open(task_path.as_path()).await.inspect_err(|err| {
error!("open {:?} failed: {}", task_path, err);
})?;
let mut f_range_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
f_range_reader
.seek(SeekFrom::Start(target_offset))
.await
.inspect_err(|err| {
error!("seek {:?} failed: {}", task_path, err);
})?;
let range_reader = f_range_reader.take(target_length);
// Create full reader of the piece.
let f = File::open(task_path.as_path()).await.inspect_err(|err| {
error!("open {:?} failed: {}", task_path, err);
})?;
let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
f_reader
.seek(SeekFrom::Start(offset))
.await
.inspect_err(|err| {
error!("seek {:?} failed: {}", task_path, err);
})?;
let reader = f_reader.take(length);
Ok((range_reader, reader))
}
/// write_piece writes the piece to the content and calculates the hash of the piece by crc32.
#[instrument(skip_all)]
pub async fn write_piece<R: AsyncRead + Unpin + ?Sized>(
@ -510,52 +464,6 @@ impl Content {
Ok(f_reader.take(target_length))
}
/// read_persistent_cache_piece_with_dual_read return two readers, one is the range reader, and the other is the
/// full reader of the persistent cache piece. It is used for cache the piece content to the proxy cache.
#[instrument(skip_all)]
pub async fn read_persistent_cache_piece_with_dual_read(
&self,
task_id: &str,
offset: u64,
length: u64,
range: Option<Range>,
) -> Result<(impl AsyncRead, impl AsyncRead)> {
let task_path = self.get_persistent_cache_task_path(task_id);
// Calculate the target offset and length based on the range.
let (target_offset, target_length) =
super::content::calculate_piece_range(offset, length, range);
let f = File::open(task_path.as_path()).await.inspect_err(|err| {
error!("open {:?} failed: {}", task_path, err);
})?;
let mut f_range_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
f_range_reader
.seek(SeekFrom::Start(target_offset))
.await
.inspect_err(|err| {
error!("seek {:?} failed: {}", task_path, err);
})?;
let range_reader = f_range_reader.take(target_length);
// Create full reader of the piece.
let f = File::open(task_path.as_path()).await.inspect_err(|err| {
error!("open {:?} failed: {}", task_path, err);
})?;
let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);
f_reader
.seek(SeekFrom::Start(offset))
.await
.inspect_err(|err| {
error!("seek {:?} failed: {}", task_path, err);
})?;
let reader = f_reader.take(length);
Ok((range_reader, reader))
}
/// write_persistent_cache_piece writes the persistent cache piece to the content and
/// calculates the hash of the piece by crc32.
#[instrument(skip_all)]