From 2e08f6622a3c8dddc36dfe022c236d446012a63c Mon Sep 17 00:00:00 2001
From: Gaius
Date: Wed, 9 Apr 2025 23:19:46 +0800
Subject: [PATCH] refactor(dragonfly-client-storage): lru cache support for
 piece download/upload (#1084)

Signed-off-by: Gaius
---
 dragonfly-client-storage/benches/cache.rs |  65 +++-------
 .../src/cache/lru_cache.rs                |   2 +-
 dragonfly-client-storage/src/cache/mod.rs | 112 ++++++------
 dragonfly-client-storage/src/lib.rs       |  56 ++++-----
 4 files changed, 74 insertions(+), 161 deletions(-)

diff --git a/dragonfly-client-storage/benches/cache.rs b/dragonfly-client-storage/benches/cache.rs
index 556e4336..aeed9942 100644
--- a/dragonfly-client-storage/benches/cache.rs
+++ b/dragonfly-client-storage/benches/cache.rs
@@ -14,11 +14,11 @@
  * limitations under the License.
  */
 
+use bytes::Bytes;
 use bytesize::ByteSize;
 use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
 use dragonfly_client_config::dfdaemon::{Config, Storage};
 use dragonfly_client_storage::{cache::Cache, metadata::Piece};
-use std::io::Cursor;
 use std::sync::Arc;
 use tokio::io::AsyncReadExt;
 use tokio::runtime::Runtime;
@@ -60,11 +60,7 @@ pub fn put_task(c: &mut Criterion) {
         &ByteSize::mb(10),
         |b, size| {
             b.iter_batched(
-                || {
-                    rt.block_on(async {
-                        Cache::new(Arc::new(create_config(ByteSize::gb(2)))).unwrap()
-                    })
-                },
+                || rt.block_on(async { Cache::new(Arc::new(create_config(ByteSize::gb(2)))) }),
                 |mut cache| {
                     rt.block_on(async {
                         cache.put_task("task", black_box(size.as_u64())).await;
@@ -80,11 +76,7 @@ pub fn put_task(c: &mut Criterion) {
         &ByteSize::mb(100),
         |b, size| {
             b.iter_batched(
-                || {
-                    rt.block_on(async {
-                        Cache::new(Arc::new(create_config(ByteSize::gb(2)))).unwrap()
-                    })
-                },
+                || rt.block_on(async { Cache::new(Arc::new(create_config(ByteSize::gb(2)))) }),
                 |mut cache| {
                     rt.block_on(async {
                         cache.put_task("task", black_box(size.as_u64())).await;
@@ -100,11 +92,7 @@ pub fn put_task(c: &mut Criterion) {
         &ByteSize::gb(1),
         |b, size| {
             b.iter_batched(
-                || {
-                    rt.block_on(async {
-                        Cache::new(Arc::new(create_config(ByteSize::gb(2)))).unwrap()
-                    })
-                },
+                || rt.block_on(async { Cache::new(Arc::new(create_config(ByteSize::gb(2)))) }),
                 |mut cache| {
                     rt.block_on(async {
                         cache.put_task("task", black_box(size.as_u64())).await;
@@ -128,9 +116,8 @@ pub fn delete_task(c: &mut Criterion) {
         |b, size| {
             b.iter_batched(
                 || {
-                    let mut cache = rt.block_on(async {
-                        Cache::new(Arc::new(create_config(ByteSize::gb(2)))).unwrap()
-                    });
+                    let mut cache =
+                        rt.block_on(async { Cache::new(Arc::new(create_config(ByteSize::gb(2)))) });
                     rt.block_on(async {
                         cache.put_task("task", black_box(size.as_u64())).await;
                     });
@@ -152,9 +139,8 @@ pub fn delete_task(c: &mut Criterion) {
         |b, size| {
             b.iter_batched(
                 || {
-                    let mut cache = rt.block_on(async {
-                        Cache::new(Arc::new(create_config(ByteSize::gb(2)))).unwrap()
-                    });
+                    let mut cache =
+                        rt.block_on(async { Cache::new(Arc::new(create_config(ByteSize::gb(2)))) });
                     rt.block_on(async {
                         cache.put_task("task", black_box(size.as_u64())).await;
                     });
@@ -176,9 +162,8 @@ pub fn delete_task(c: &mut Criterion) {
         |b, size| {
             b.iter_batched(
                 || {
-                    let mut cache = rt.block_on(async {
-                        Cache::new(Arc::new(create_config(ByteSize::gb(2)))).unwrap()
-                    });
+                    let mut cache =
+                        rt.block_on(async { Cache::new(Arc::new(create_config(ByteSize::gb(2)))) });
                     rt.block_on(async {
                         cache.put_task("task", black_box(size.as_u64())).await;
                     });
@@ -211,7 +196,6 @@ pub fn write_piece(c: &mut Criterion) {
                 Cache::new(Arc::new(create_config(
                     ByteSize::mb(4) * PIECE_COUNT as u64 + 1u64,
                 )))
-                .unwrap()
             });
 
             rt.block_on(async {
@@ -224,13 +208,11 @@ pub fn write_piece(c: &mut Criterion) {
                 |cache| {
                     rt.block_on(async {
                         for i in 0..PIECE_COUNT {
-                            let mut cursor = Cursor::new(data);
                             cache
                                 .write_piece(
                                     "task",
                                     &format!("piece{}", i),
-                                    &mut cursor,
-                                    data.len() as u64,
+                                    Bytes::copy_from_slice(data),
                                 )
                                 .await
                                 .unwrap();
@@ -252,7 +234,6 @@ pub fn write_piece(c: &mut Criterion) {
                 Cache::new(Arc::new(create_config(
                     ByteSize::mb(10) * PIECE_COUNT as u64 + 1u64,
                 )))
-                .unwrap()
             });
 
             rt.block_on(async {
@@ -265,13 +246,11 @@ pub fn write_piece(c: &mut Criterion) {
                 |cache| {
                     rt.block_on(async {
                         for i in 0..PIECE_COUNT {
-                            let mut cursor = Cursor::new(data);
                             cache
                                 .write_piece(
                                     "task",
                                     &format!("piece{}", i),
-                                    &mut cursor,
-                                    data.len() as u64,
+                                    Bytes::copy_from_slice(data),
                                 )
                                 .await
                                 .unwrap();
@@ -293,7 +272,6 @@ pub fn write_piece(c: &mut Criterion) {
                 Cache::new(Arc::new(create_config(
                     ByteSize::mb(16) * PIECE_COUNT as u64 + 1u64,
                 )))
-                .unwrap()
             });
 
             rt.block_on(async {
@@ -306,13 +284,11 @@ pub fn write_piece(c: &mut Criterion) {
                 |cache| {
                     rt.block_on(async {
                         for i in 0..PIECE_COUNT {
-                            let mut cursor = Cursor::new(data);
                             cache
                                 .write_piece(
                                     "task",
                                     &format!("piece{}", i),
-                                    &mut cursor,
-                                    data.len() as u64,
+                                    Bytes::copy_from_slice(data),
                                 )
                                 .await
                                 .unwrap();
@@ -341,7 +317,6 @@ pub fn read_piece(c: &mut Criterion) {
                 Cache::new(Arc::new(create_config(
                     ByteSize::mb(4) * PIECE_COUNT as u64 + 1u64,
                 )))
-                .unwrap()
             });
 
             rt.block_on(async {
@@ -349,13 +324,11 @@ pub fn read_piece(c: &mut Criterion) {
                 cache
                     .put_task("task", (ByteSize::mb(4) * PIECE_COUNT as u64).as_u64())
                     .await;
                 for i in 0..PIECE_COUNT {
-                    let mut cursor = Cursor::new(data);
                     cache
                         .write_piece(
                             "task",
                             &format!("piece{}", i),
-                            &mut cursor,
-                            data.len() as u64,
+                            Bytes::copy_from_slice(data),
                         )
                         .await
                         .unwrap();
@@ -395,7 +368,6 @@ pub fn read_piece(c: &mut Criterion) {
                 Cache::new(Arc::new(create_config(
                     ByteSize::mb(10) * PIECE_COUNT as u64 + 1u64,
                 )))
-                .unwrap()
             });
 
             rt.block_on(async {
@@ -403,13 +375,11 @@ pub fn read_piece(c: &mut Criterion) {
                 cache
                     .put_task("task", (ByteSize::mb(10) * PIECE_COUNT as u64).as_u64())
                     .await;
                 for i in 0..PIECE_COUNT {
-                    let mut cursor = Cursor::new(data);
                     cache
                         .write_piece(
                             "task",
                             &format!("piece{}", i),
-                            &mut cursor,
-                            data.len() as u64,
+                            Bytes::copy_from_slice(data),
                         )
                         .await
                         .unwrap();
@@ -449,7 +419,6 @@ pub fn read_piece(c: &mut Criterion) {
                 Cache::new(Arc::new(create_config(
                     ByteSize::mb(16) * PIECE_COUNT as u64 + 1u64,
                 )))
-                .unwrap()
             });
 
             rt.block_on(async {
@@ -457,13 +426,11 @@ pub fn read_piece(c: &mut Criterion) {
                 cache
                     .put_task("task", (ByteSize::mb(16) * PIECE_COUNT as u64).as_u64())
                     .await;
                 for i in 0..PIECE_COUNT {
-                    let mut cursor = Cursor::new(data);
                     cache
                         .write_piece(
                             "task",
                             &format!("piece{}", i),
-                            &mut cursor,
-                            data.len() as u64,
+                            Bytes::copy_from_slice(data),
                         )
                         .await
                         .unwrap();
diff --git a/dragonfly-client-storage/src/cache/lru_cache.rs b/dragonfly-client-storage/src/cache/lru_cache.rs
index 9f6f9c3d..c76188f8 100644
--- a/dragonfly-client-storage/src/cache/lru_cache.rs
+++ b/dragonfly-client-storage/src/cache/lru_cache.rs
@@ -274,7 +274,7 @@ impl LruCache {
         match self.map.remove(KeyWrapper::from_ref(k)) {
             None => None,
             Some(entry) => {
-                let entry_ptr: *mut Entry<K, V> = Box::into_raw(entry);
+                let entry_ptr = Box::into_raw(entry);
                 self.detach(entry_ptr);
 
                 unsafe {
diff --git a/dragonfly-client-storage/src/cache/mod.rs b/dragonfly-client-storage/src/cache/mod.rs
index 7c0760c7..0bd55c2a 100644
--- a/dragonfly-client-storage/src/cache/mod.rs
+++ b/dragonfly-client-storage/src/cache/mod.rs
@@ -23,18 +23,15 @@
 use std::cmp::{max, min};
 use std::collections::HashMap;
 use std::io::Cursor;
 use std::sync::Arc;
-use tokio::io::{AsyncRead, AsyncReadExt, BufReader};
+use tokio::io::{AsyncRead, BufReader};
 use tokio::sync::RwLock;
-use tracing::{debug, info};
+use tracing::info;
 
 pub mod lru_cache;
 
 /// Task is the task content in the cache.
 #[derive(Clone, Debug)]
 struct Task {
-    /// id is the id of the task.
-    id: String,
-
     /// content_length is the length of the task content.
     content_length: u64,
 
@@ -45,9 +42,8 @@ struct Task {
 /// Task implements the task content in the cache.
 impl Task {
     /// new creates a new task.
-    fn new(id: String, content_length: u64) -> Self {
+    fn new(content_length: u64) -> Self {
         Self {
-            id,
             content_length,
             pieces: Arc::new(RwLock::new(HashMap::new())),
         }
@@ -124,8 +120,8 @@ pub struct Cache {
 /// Cache implements the cache for storing piece content by LRU algorithm.
 impl Cache {
     /// new creates a new cache with the specified capacity.
-    pub fn new(config: Arc<Config>) -> Result<Self> {
-        Ok(Cache {
+    pub fn new(config: Arc<Config>) -> Self {
+        Cache {
             config: config.clone(),
             size: 0,
             capacity: config.storage.cache_capacity.as_u64(),
@@ -133,7 +129,7 @@ impl Cache {
             // by cache capacity(cache size) itself, and used pop_lru to evict the least recently
             // used task.
             tasks: Arc::new(RwLock::new(LruCache::new(usize::MAX))),
-        })
+        }
     }
 
     /// read_piece reads the piece from the cache.
@@ -182,34 +178,18 @@ impl Cache {
     }
 
     /// write_piece writes the piece content to the cache.
-    pub async fn write_piece<R: AsyncRead + Unpin + ?Sized>(
-        &self,
-        task_id: &str,
-        piece_id: &str,
-        reader: &mut R,
-        length: u64,
-    ) -> Result<()> {
+    pub async fn write_piece(&self, task_id: &str, piece_id: &str, content: Bytes) -> Result<()> {
         let mut tasks = self.tasks.write().await;
         let Some(task) = tasks.get(task_id) else {
             return Err(Error::TaskNotFound(task_id.to_string()));
         };
 
         if task.contains(piece_id).await {
-            debug!("piece {} already exists in cache", piece_id);
             return Ok(());
         }
 
-        let mut buffer = Vec::with_capacity(length as usize);
-        match reader.read_to_end(&mut buffer).await {
-            Ok(_) => {
-                task.write_piece(piece_id, bytes::Bytes::from(buffer)).await;
-                Ok(())
-            }
-            Err(err) => Err(Error::Unknown(format!(
-                "failed to read piece data for {}: {}",
-                piece_id, err
-            ))),
-        }
+        task.write_piece(piece_id, content).await;
+        Ok(())
     }
 
     /// put_task puts a new task into the cache, constrained by the capacity of the cache.
@@ -220,7 +200,7 @@ impl Cache {
         }
 
-        // If the content length is larger than the cache capacity and the task cannot be cached.
+        // If the content length is larger than the cache capacity, the task cannot be cached.
-        if content_length >= self.capacity {
+        if content_length > self.capacity {
             info!(
                 "task {} is too large and cannot be cached: {}",
                 task_id, content_length
@@ -234,7 +214,6 @@ impl Cache {
             );
             return;
         }
 
         let mut tasks = self.tasks.write().await;
         while self.size + content_length > self.capacity {
             match tasks.pop_lru() {
                 Some((_, task)) => {
                     self.size -= task.content_length();
-                    debug!("evicted task in cache: {}", task.id);
                 }
                 None => {
                     break;
@@ -242,16 +221,17 @@ impl Cache {
                 }
             }
         }
 
-        let task = Task::new(task_id.to_string(), content_length);
+        let task = Task::new(content_length);
         tasks.put(task_id.to_string(), task);
         self.size += content_length;
     }
 
     pub async fn delete_task(&mut self, task_id: &str) -> Result<()> {
         let mut tasks = self.tasks.write().await;
-        let Some((_id, task)) = tasks.pop(task_id) else {
+        let Some((_, task)) = tasks.pop(task_id) else {
             return Err(Error::TaskNotFound(task_id.to_string()));
         };
+        self.size -= task.content_length();
         Ok(())
     }
@@ -280,7 +260,6 @@ mod tests {
     use bytesize::ByteSize;
     use dragonfly_api::common::v2::Range;
     use dragonfly_client_config::dfdaemon::Storage;
-    use std::io::Cursor;
     use tokio::io::AsyncReadExt;
 
     #[tokio::test]
@@ -315,7 +294,7 @@ mod tests {
         ];
 
         for (config, expected_size, expected_capacity) in test_cases {
-            let cache = Cache::new(Arc::new(config)).unwrap();
+            let cache = Cache::new(Arc::new(config));
             assert_eq!(cache.size, expected_size);
             assert_eq!(cache.capacity, expected_capacity);
         }
@@ -330,7 +309,7 @@ mod tests {
             },
             ..Default::default()
         };
-        let cache = Cache::new(Arc::new(config)).unwrap();
+        let cache = Cache::new(Arc::new(config));
 
         let test_cases = vec![
             // Test non-existent task.
@@ -355,7 +334,7 @@ mod tests {
                     assert_eq!(cache.contains_task(task_id).await, expected_result);
                 }
                 "add" => {
-                    let task = Task::new(task_id.to_string(), content_length);
+                    let task = Task::new(content_length);
                     cache.tasks.write().await.put(task_id.to_string(), task);
                     assert_eq!(cache.contains_task(task_id).await, expected_result);
                 }
@@ -377,13 +356,13 @@ mod tests {
             },
             ..Default::default()
        };
-        let mut cache = Cache::new(Arc::new(config)).unwrap();
+        let mut cache = Cache::new(Arc::new(config));
 
         let test_cases = vec![
             // Empty task should not be cached.
             ("empty_task", 0, false),
-            // Task equal to capacity should not be cached.
-            ("equal_capacity", ByteSize::mib(10).as_u64(), false),
+            // Task equal to capacity should be cached.
+            ("equal_capacity", ByteSize::mib(10).as_u64(), true),
             // Task exceeding capacity should not be cached.
             ("exceed_capacity", ByteSize::mib(10).as_u64() + 1, false),
             // Normal sized task should be cached.
@@ -408,7 +387,7 @@ mod tests {
             },
             ..Default::default()
        };
-        let mut cache = Cache::new(Arc::new(config)).unwrap();
+        let mut cache = Cache::new(Arc::new(config));
 
         let test_cases = vec![
             // Add tasks until eviction triggers.
@@ -439,7 +418,7 @@ mod tests {
             },
             ..Default::default()
        };
-        let mut cache = Cache::new(Arc::new(config)).unwrap();
+        let mut cache = Cache::new(Arc::new(config));
 
         cache.put_task("task1", ByteSize::mib(1).as_u64()).await;
         cache.put_task("task2", ByteSize::mib(1).as_u64()).await;
@@ -484,7 +463,7 @@ mod tests {
             },
             ..Default::default()
        };
-        let mut cache = Cache::new(Arc::new(config)).unwrap();
+        let mut cache = Cache::new(Arc::new(config));
 
         let test_cases = vec![
             // Check non-existent task.
@@ -519,9 +498,8 @@ mod tests {
                     assert!(cache.contains_task(task_id).await);
                 }
                 "add_piece" => {
-                    let mut cursor = Cursor::new(content);
                     cache
-                        .write_piece(task_id, piece_id, &mut cursor, content.len() as u64)
+                        .write_piece(task_id, piece_id, Bytes::from(content))
                         .await
                         .unwrap();
                     assert_eq!(
@@ -543,18 +521,12 @@ mod tests {
             },
             ..Default::default()
         };
-        let mut cache = Cache::new(Arc::new(config)).unwrap();
+        let mut cache = Cache::new(Arc::new(config));
 
         // Test writing to non-existent task.
         let test_data = b"test data".to_vec();
-        let mut cursor = Cursor::new(&test_data);
         let result = cache
-            .write_piece(
-                "non_existent",
-                "piece1",
-                &mut cursor,
-                test_data.len() as u64,
-            )
+            .write_piece("non_existent", "piece1", Bytes::from(test_data))
             .await;
         assert!(matches!(result, Err(Error::TaskNotFound(_))));
 
@@ -574,9 +546,8 @@ mod tests {
         ];
 
         for (piece_id, content) in &test_cases {
-            let mut cursor = Cursor::new(content);
             let result = cache
-                .write_piece("task1", piece_id, &mut cursor, content.len() as u64)
+                .write_piece("task1", piece_id, Bytes::copy_from_slice(content))
                 .await;
             assert!(result.is_ok());
             assert!(cache.contains_piece("task1", piece_id).await);
@@ -607,10 +578,9 @@ mod tests {
         // Test attempting to overwrite existing pieces.
         // The write should succeed (return Ok) but content should not change.
         for (piece_id, original_content) in &test_cases {
-            let new_content = format!("updated content for {}", piece_id).into_bytes();
-            let mut cursor = Cursor::new(&new_content);
+            let new_content = format!("updated content for {}", piece_id);
             let result = cache
-                .write_piece("task1", piece_id, &mut cursor, new_content.len() as u64)
+                .write_piece("task1", piece_id, Bytes::from(new_content))
                 .await;
             assert!(result.is_ok());
 
@@ -648,7 +618,7 @@ mod tests {
             },
             ..Default::default()
         };
-        let mut cache = Cache::new(Arc::new(config)).unwrap();
+        let mut cache = Cache::new(Arc::new(config));
 
         let piece = Piece {
             number: 0,
@@ -807,9 +777,8 @@ mod tests {
 
         // Write all pieces.
         for (id, content, _, _) in &test_pieces {
-            let mut cursor = Cursor::new(content);
             cache
-                .write_piece("task1", id, &mut cursor, content.len() as u64)
+                .write_piece("task1", id, Bytes::copy_from_slice(content))
                 .await
                 .unwrap();
         }
@@ -838,13 +807,12 @@ mod tests {
             },
             ..Default::default()
         };
-        let mut cache = Cache::new(Arc::new(config)).unwrap();
+        let mut cache = Cache::new(Arc::new(config));
 
         cache.put_task("task1", ByteSize::mib(1).as_u64()).await;
 
         let content = b"test data for concurrent read".to_vec();
-        let mut cursor = Cursor::new(&content);
         cache
-            .write_piece("task1", "piece1", &mut cursor, content.len() as u64)
+            .write_piece("task1", "piece1", Bytes::from(content.clone()))
            .await
            .unwrap();
@@ -909,7 +877,7 @@ mod tests {
             },
             ..Default::default()
         };
-        let mut cache = Cache::new(Arc::new(config)).unwrap();
+        let mut cache = Cache::new(Arc::new(config));
 
         cache.put_task("task1", ByteSize::mib(1).as_u64()).await;
         let cache_arc = Arc::new(cache);
@@ -921,10 +889,9 @@ mod tests {
 
             let content = format!("content for piece {}", i).into_bytes();
             join_set.spawn(async move {
-                let mut cursor = Cursor::new(&content);
                 let piece_id = format!("piece{}", i);
                 let result = cache_clone
-                    .write_piece("task1", &piece_id, &mut cursor, content.len() as u64)
+                    .write_piece("task1", &piece_id, Bytes::from(content.clone()))
                     .await;
                 assert!(result.is_ok());
@@ -966,18 +933,12 @@ mod tests {
             },
             ..Default::default()
         };
-        let mut cache = Cache::new(Arc::new(config)).unwrap();
+        let mut cache = Cache::new(Arc::new(config));
 
         cache.put_task("task1", ByteSize::mib(1).as_u64()).await;
 
         let original_content = b"original content".to_vec();
-        let mut cursor = Cursor::new(&original_content);
         cache
-            .write_piece(
-                "task1",
-                "piece1",
-                &mut cursor,
-                original_content.len() as u64,
-            )
+            .write_piece("task1", "piece1", Bytes::from(original_content.clone()))
             .await
             .unwrap();
@@ -990,9 +951,8 @@ mod tests {
 
             let new_content = format!("new content from writer {}", i).into_bytes();
             join_set.spawn(async move {
-                let mut cursor = Cursor::new(&new_content);
                 let result = cache_clone
-                    .write_piece("task1", "piece1", &mut cursor, new_content.len() as u64)
+                    .write_piece("task1", "piece1", Bytes::from(new_content))
                     .await;
                 assert!(result.is_ok());
             });
diff --git a/dragonfly-client-storage/src/lib.rs b/dragonfly-client-storage/src/lib.rs
index dc131e52..0e37e007 100644
--- a/dragonfly-client-storage/src/lib.rs
+++ b/dragonfly-client-storage/src/lib.rs
@@ -59,13 +59,7 @@ impl Storage {
     pub async fn new(config: Arc<Config>, dir: &Path, log_dir: PathBuf) -> Result<Self> {
         let metadata = metadata::Metadata::new(config.clone(), dir, &log_dir)?;
         let content = content::Content::new(config.clone(), dir).await?;
-        let cache = match cache::Cache::new(config.clone()) {
-            Ok(cache) => cache,
-            Err(err) => {
-                error!("create cache failed: {}", err);
-                return Err(err);
-            }
-        };
+        let cache = cache::Cache::new(config.clone());
 
         Ok(Storage {
             config,
@@ -136,7 +130,6 @@ impl Storage {
         if let Some(content_length) = content_length {
             let mut cache = self.cache.clone();
             cache.put_task(id, content_length).await;
-            debug!("put task in cache: {}", id);
         }
     }
 
@@ -430,26 +423,23 @@ impl Storage {
         load_to_cache: bool,
     ) -> Result<metadata::Piece> {
         let response = if load_to_cache {
-            let mut buffer = Vec::new();
-            let mut inspect_reader = InspectReader::new(reader, |bytes| {
+            let mut buffer = Vec::with_capacity(length as usize);
+            let mut tee = InspectReader::new(reader, |bytes| {
                 buffer.extend_from_slice(bytes);
             });
 
-            self.cache
-                .write_piece(task_id, piece_id, &mut inspect_reader, length)
length) - .await - .inspect_err(|err| { - error!("load piece to cache failed: {}", err); - })?; - self.content - .write_piece(task_id, offset, &mut &buffer[..]) - .await? + let response = self.content.write_piece(task_id, offset, &mut tee).await?; + + self.cache + .write_piece(task_id, piece_id, bytes::Bytes::from(buffer)) + .await?; + + response } else { self.content.write_piece(task_id, offset, reader).await? }; let digest = Digest::new(Algorithm::Crc32, response.hash); - self.metadata.download_piece_finished( piece_id, offset, @@ -474,20 +464,18 @@ impl Storage { load_to_cache: bool, ) -> Result { let response = if load_to_cache { - let mut buffer = Vec::new(); - let mut inspect_reader = InspectReader::new(reader, |bytes| { + let mut buffer = Vec::with_capacity(length as usize); + let mut tee = InspectReader::new(reader, |bytes| { buffer.extend_from_slice(bytes); }); - self.cache - .write_piece(task_id, piece_id, &mut inspect_reader, length) - .await - .inspect_err(|err| { - error!("load piece to cache failed: {}", err); - })?; - self.content - .write_piece(task_id, offset, &mut &buffer[..]) - .await? + let response = self.content.write_piece(task_id, offset, &mut tee).await?; + + self.cache + .write_piece(task_id, piece_id, bytes::Bytes::from(buffer)) + .await?; + + response } else { self.content.write_piece(task_id, offset, reader).await? }; @@ -537,19 +525,17 @@ impl Storage { match self.metadata.get_piece(piece_id) { Ok(Some(piece)) => { if self.cache.contains_piece(task_id, piece_id).await { - debug!("hit cache: {}", piece_id); match self .cache .read_piece(task_id, piece_id, piece.clone(), range) .await { - Ok(cache_reader) => { + Ok(reader) => { // Finish uploading the task. self.metadata.upload_task_finished(task_id)?; - return Ok(Either::Left(cache_reader)); + return Ok(Either::Left(reader)); } Err(err) => { - error!("read piece from cache failed: {}", err); return Err(err); } }