refactor(dragonfly-client-storage): lru cache support for piece download/upload (#1084)

Signed-off-by: Gaius <gaius.qi@gmail.com>
Authored by Gaius on 2025-04-09 23:19:46 +08:00; committed via GitHub
parent 9c3019f88b
commit 2e08f6622a
4 changed files with 74 additions and 161 deletions

File 1 of 4: Criterion benchmarks for the storage cache

@@ -14,11 +14,11 @@
  * limitations under the License.
  */

+use bytes::Bytes;
 use bytesize::ByteSize;
 use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
 use dragonfly_client_config::dfdaemon::{Config, Storage};
 use dragonfly_client_storage::{cache::Cache, metadata::Piece};
-use std::io::Cursor;
 use std::sync::Arc;
 use tokio::io::AsyncReadExt;
 use tokio::runtime::Runtime;
@@ -60,11 +60,7 @@ pub fn put_task(c: &mut Criterion) {
         &ByteSize::mb(10),
         |b, size| {
             b.iter_batched(
-                || {
-                    rt.block_on(async {
-                        Cache::new(Arc::new(create_config(ByteSize::gb(2)))).unwrap()
-                    })
-                },
+                || rt.block_on(async { Cache::new(Arc::new(create_config(ByteSize::gb(2)))) }),
                 |mut cache| {
                     rt.block_on(async {
                         cache.put_task("task", black_box(size.as_u64())).await;
@@ -80,11 +76,7 @@ pub fn put_task(c: &mut Criterion) {
         &ByteSize::mb(100),
         |b, size| {
             b.iter_batched(
-                || {
-                    rt.block_on(async {
-                        Cache::new(Arc::new(create_config(ByteSize::gb(2)))).unwrap()
-                    })
-                },
+                || rt.block_on(async { Cache::new(Arc::new(create_config(ByteSize::gb(2)))) }),
                 |mut cache| {
                     rt.block_on(async {
                         cache.put_task("task", black_box(size.as_u64())).await;
@@ -100,11 +92,7 @@ pub fn put_task(c: &mut Criterion) {
         &ByteSize::gb(1),
         |b, size| {
             b.iter_batched(
-                || {
-                    rt.block_on(async {
-                        Cache::new(Arc::new(create_config(ByteSize::gb(2)))).unwrap()
-                    })
-                },
+                || rt.block_on(async { Cache::new(Arc::new(create_config(ByteSize::gb(2)))) }),
                 |mut cache| {
                     rt.block_on(async {
                         cache.put_task("task", black_box(size.as_u64())).await;
@@ -128,9 +116,8 @@ pub fn delete_task(c: &mut Criterion) {
         |b, size| {
             b.iter_batched(
                 || {
-                    let mut cache = rt.block_on(async {
-                        Cache::new(Arc::new(create_config(ByteSize::gb(2)))).unwrap()
-                    });
+                    let mut cache =
+                        rt.block_on(async { Cache::new(Arc::new(create_config(ByteSize::gb(2)))) });
                     rt.block_on(async {
                         cache.put_task("task", black_box(size.as_u64())).await;
                     });
@@ -152,9 +139,8 @@ pub fn delete_task(c: &mut Criterion) {
         |b, size| {
             b.iter_batched(
                 || {
-                    let mut cache = rt.block_on(async {
-                        Cache::new(Arc::new(create_config(ByteSize::gb(2)))).unwrap()
-                    });
+                    let mut cache =
+                        rt.block_on(async { Cache::new(Arc::new(create_config(ByteSize::gb(2)))) });
                     rt.block_on(async {
                         cache.put_task("task", black_box(size.as_u64())).await;
                     });
@@ -176,9 +162,8 @@ pub fn delete_task(c: &mut Criterion) {
         |b, size| {
             b.iter_batched(
                 || {
-                    let mut cache = rt.block_on(async {
-                        Cache::new(Arc::new(create_config(ByteSize::gb(2)))).unwrap()
-                    });
+                    let mut cache =
+                        rt.block_on(async { Cache::new(Arc::new(create_config(ByteSize::gb(2)))) });
                     rt.block_on(async {
                         cache.put_task("task", black_box(size.as_u64())).await;
                     });
@@ -211,7 +196,6 @@ pub fn write_piece(c: &mut Criterion) {
                 Cache::new(Arc::new(create_config(
                     ByteSize::mb(4) * PIECE_COUNT as u64 + 1u64,
                 )))
-                .unwrap()
             });

             rt.block_on(async {
@@ -224,13 +208,11 @@ pub fn write_piece(c: &mut Criterion) {
             |cache| {
                 rt.block_on(async {
                     for i in 0..PIECE_COUNT {
-                        let mut cursor = Cursor::new(data);
                         cache
                             .write_piece(
                                 "task",
                                 &format!("piece{}", i),
-                                &mut cursor,
-                                data.len() as u64,
+                                Bytes::copy_from_slice(data),
                             )
                             .await
                             .unwrap();
@@ -252,7 +234,6 @@ pub fn write_piece(c: &mut Criterion) {
                 Cache::new(Arc::new(create_config(
                     ByteSize::mb(10) * PIECE_COUNT as u64 + 1u64,
                 )))
-                .unwrap()
             });

             rt.block_on(async {
@@ -265,13 +246,11 @@ pub fn write_piece(c: &mut Criterion) {
             |cache| {
                 rt.block_on(async {
                     for i in 0..PIECE_COUNT {
-                        let mut cursor = Cursor::new(data);
                         cache
                             .write_piece(
                                 "task",
                                 &format!("piece{}", i),
-                                &mut cursor,
-                                data.len() as u64,
+                                Bytes::copy_from_slice(data),
                             )
                             .await
                             .unwrap();
@@ -293,7 +272,6 @@ pub fn write_piece(c: &mut Criterion) {
                 Cache::new(Arc::new(create_config(
                     ByteSize::mb(16) * PIECE_COUNT as u64 + 1u64,
                 )))
-                .unwrap()
             });

             rt.block_on(async {
@@ -306,13 +284,11 @@ pub fn write_piece(c: &mut Criterion) {
             |cache| {
                 rt.block_on(async {
                     for i in 0..PIECE_COUNT {
-                        let mut cursor = Cursor::new(data);
                         cache
                             .write_piece(
                                 "task",
                                 &format!("piece{}", i),
-                                &mut cursor,
-                                data.len() as u64,
+                                Bytes::copy_from_slice(data),
                             )
                             .await
                             .unwrap();
@@ -341,7 +317,6 @@ pub fn read_piece(c: &mut Criterion) {
                 Cache::new(Arc::new(create_config(
                     ByteSize::mb(4) * PIECE_COUNT as u64 + 1u64,
                 )))
-                .unwrap()
             });

             rt.block_on(async {
@@ -349,13 +324,11 @@ pub fn read_piece(c: &mut Criterion) {
                     .put_task("task", (ByteSize::mb(4) * PIECE_COUNT as u64).as_u64())
                     .await;
                 for i in 0..PIECE_COUNT {
-                    let mut cursor = Cursor::new(data);
                     cache
                         .write_piece(
                             "task",
                             &format!("piece{}", i),
-                            &mut cursor,
-                            data.len() as u64,
+                            Bytes::copy_from_slice(data),
                         )
                         .await
                         .unwrap();
@@ -395,7 +368,6 @@ pub fn read_piece(c: &mut Criterion) {
                 Cache::new(Arc::new(create_config(
                     ByteSize::mb(10) * PIECE_COUNT as u64 + 1u64,
                 )))
-                .unwrap()
             });

             rt.block_on(async {
@@ -403,13 +375,11 @@ pub fn read_piece(c: &mut Criterion) {
                     .put_task("task", (ByteSize::mb(10) * PIECE_COUNT as u64).as_u64())
                     .await;
                 for i in 0..PIECE_COUNT {
-                    let mut cursor = Cursor::new(data);
                     cache
                         .write_piece(
                             "task",
                             &format!("piece{}", i),
-                            &mut cursor,
-                            data.len() as u64,
+                            Bytes::copy_from_slice(data),
                         )
                         .await
                         .unwrap();
@@ -449,7 +419,6 @@ pub fn read_piece(c: &mut Criterion) {
                 Cache::new(Arc::new(create_config(
                     ByteSize::mb(16) * PIECE_COUNT as u64 + 1u64,
                 )))
-                .unwrap()
             });

             rt.block_on(async {
@@ -457,13 +426,11 @@ pub fn read_piece(c: &mut Criterion) {
                     .put_task("task", (ByteSize::mb(16) * PIECE_COUNT as u64).as_u64())
                     .await;
                 for i in 0..PIECE_COUNT {
-                    let mut cursor = Cursor::new(data);
                     cache
                         .write_piece(
                             "task",
                             &format!("piece{}", i),
-                            &mut cursor,
-                            data.len() as u64,
+                            Bytes::copy_from_slice(data),
                         )
                         .await
                         .unwrap();
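Throughout the benchmarks above, the piece payload is now passed as an owned bytes::Bytes built with Bytes::copy_from_slice(data), replacing the old Cursor-plus-length pair. A minimal sketch of the usual ways to build a Bytes value (illustrative only, not part of the commit):

    use bytes::Bytes;

    fn bytes_construction_sketch() {
        let data: &[u8] = &[0u8; 16];
        // Copies the borrowed slice into a new allocation -- what the benches do,
        // since `data` is only borrowed for the current iteration.
        let copied = Bytes::copy_from_slice(data);
        // Takes ownership of an existing Vec<u8> without copying.
        let owned = Bytes::from(vec![1u8, 2, 3]);
        // Zero-copy view of 'static data.
        let fixed = Bytes::from_static(b"immutable");
        assert_eq!((copied.len(), owned.len(), fixed.len()), (16, 3, 9));
    }

Because Bytes is reference-counted, cloning one copies a pointer rather than the payload, which is what makes handing owned values to the cache cheap.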

File 2 of 4: LRU cache implementation (cache::lru_cache)

@@ -274,7 +274,7 @@ impl<K: Hash + Eq, V> LruCache<K, V> {
         match self.map.remove(KeyWrapper::from_ref(k)) {
             None => None,
             Some(entry) => {
-                let entry_ptr: *mut Entry<K, V> = Box::into_raw(entry);
+                let entry_ptr = Box::into_raw(entry);
                 self.detach(entry_ptr);
                 unsafe {
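The only change to the LRU internals is dropping a redundant annotation: for a Box<Entry<K, V>>, Box::into_raw already returns *mut Entry<K, V>, so the pointer type is fully inferred. A self-contained sketch with a stand-in Entry type (hypothetical, for illustration):

    struct Entry<K, V> {
        key: K,
        value: V,
    }

    fn into_raw_inference_sketch() {
        let boxed = Box::new(Entry { key: "k", value: 1u64 });
        // Inferred as `*mut Entry<&str, u64>`; no annotation needed.
        let ptr = Box::into_raw(boxed);
        // Re-box the pointer so the sketch does not leak the allocation.
        let _entry = unsafe { Box::from_raw(ptr) };
    }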

File 3 of 4: cache module (dragonfly_client_storage::cache)

@@ -23,18 +23,15 @@ use std::cmp::{max, min};
 use std::collections::HashMap;
 use std::io::Cursor;
 use std::sync::Arc;
-use tokio::io::{AsyncRead, AsyncReadExt, BufReader};
+use tokio::io::{AsyncRead, BufReader};
 use tokio::sync::RwLock;
-use tracing::{debug, info};
+use tracing::info;

 pub mod lru_cache;

 /// Task is the task content in the cache.
 #[derive(Clone, Debug)]
 struct Task {
-    /// id is the id of the task.
-    id: String,
-
     /// content_length is the length of the task content.
     content_length: u64,
@@ -45,9 +42,8 @@ struct Task {
 /// Task implements the task content in the cache.
 impl Task {
     /// new creates a new task.
-    fn new(id: String, content_length: u64) -> Self {
+    fn new(content_length: u64) -> Self {
         Self {
-            id,
             content_length,
             pieces: Arc::new(RwLock::new(HashMap::new())),
         }
@@ -124,8 +120,8 @@ pub struct Cache {
 /// Cache implements the cache for storing piece content by LRU algorithm.
 impl Cache {
     /// new creates a new cache with the specified capacity.
-    pub fn new(config: Arc<Config>) -> Result<Self> {
-        Ok(Cache {
+    pub fn new(config: Arc<Config>) -> Self {
+        Cache {
             config: config.clone(),
             size: 0,
             capacity: config.storage.cache_capacity.as_u64(),
@@ -133,7 +129,7 @@ impl Cache {
             // by cache capacity(cache size) itself, and used pop_lru to evict the least recently
             // used task.
             tasks: Arc::new(RwLock::new(LruCache::new(usize::MAX))),
-        })
+        }
     }

     /// read_piece reads the piece from the cache.
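Since the constructor no longer returns a Result, every call site sheds its .unwrap() or match; the Storage change in the last file below is the main beneficiary. A hedged sketch of the new call shape (assuming the crate's default Config is valid):

    use std::sync::Arc;
    use dragonfly_client_config::dfdaemon::Config;
    use dragonfly_client_storage::cache::Cache;

    fn infallible_constructor_sketch() {
        // Before this commit: Cache::new(config).unwrap() or a match on the Result.
        let _cache = Cache::new(Arc::new(Config::default()));
    }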
@@ -182,35 +178,19 @@ impl Cache {
     }

     /// write_piece writes the piece content to the cache.
-    pub async fn write_piece<R: AsyncRead + Unpin + ?Sized>(
-        &self,
-        task_id: &str,
-        piece_id: &str,
-        reader: &mut R,
-        length: u64,
-    ) -> Result<()> {
+    pub async fn write_piece(&self, task_id: &str, piece_id: &str, content: Bytes) -> Result<()> {
         let mut tasks = self.tasks.write().await;
         let Some(task) = tasks.get(task_id) else {
             return Err(Error::TaskNotFound(task_id.to_string()));
         };

         if task.contains(piece_id).await {
-            debug!("piece {} already exists in cache", piece_id);
             return Ok(());
         }

-        let mut buffer = Vec::with_capacity(length as usize);
-        match reader.read_to_end(&mut buffer).await {
-            Ok(_) => {
-                task.write_piece(piece_id, bytes::Bytes::from(buffer)).await;
-                Ok(())
-            }
-            Err(err) => Err(Error::Unknown(format!(
-                "failed to read piece data for {}: {}",
-                piece_id, err
-            ))),
-        }
+        task.write_piece(piece_id, content).await;
+        Ok(())
     }

     /// put_task puts a new task into the cache, constrained by the capacity of the cache.
     pub async fn put_task(&mut self, task_id: &str, content_length: u64) {
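The slimmed-down write_piece keeps its idempotent semantics: writing a piece that already exists returns Ok(()) without touching the stored content, so concurrent writers of the same piece race safely and the first write wins. A hedged usage sketch (task and piece IDs are illustrative; `Result` is the crate's error alias as in the module above):

    use bytes::Bytes;

    async fn duplicate_write_sketch(cache: &mut Cache) -> Result<()> {
        cache.put_task("task1", 64).await;
        cache
            .write_piece("task1", "piece1", Bytes::from_static(b"v1"))
            .await?;
        // Accepted but ignored: "piece1" already exists, so the content stays "v1".
        cache
            .write_piece("task1", "piece1", Bytes::from_static(b"v2"))
            .await?;
        Ok(())
    }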
@@ -220,7 +200,7 @@ impl Cache {
         }

         // If the content length is larger than the cache capacity, the task cannot be cached.
-        if content_length >= self.capacity {
+        if content_length > self.capacity {
             info!(
                 "task {} is too large and cannot be cached: {}",
                 task_id, content_length
@@ -234,7 +214,6 @@ impl Cache {
             match tasks.pop_lru() {
                 Some((_, task)) => {
                     self.size -= task.content_length();
-                    debug!("evicted task in cache: {}", task.id);
                 }
                 None => {
                     break;
@@ -242,16 +221,17 @@ impl Cache {
             }
         }

-        let task = Task::new(task_id.to_string(), content_length);
+        let task = Task::new(content_length);
         tasks.put(task_id.to_string(), task);
         self.size += content_length;
     }

     pub async fn delete_task(&mut self, task_id: &str) -> Result<()> {
         let mut tasks = self.tasks.write().await;
-        let Some((_id, task)) = tasks.pop(task_id) else {
+        let Some((_, task)) = tasks.pop(task_id) else {
             return Err(Error::TaskNotFound(task_id.to_string()));
         };

         self.size -= task.content_length();
         Ok(())
     }
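Two behavioral points in put_task are worth spelling out: the capacity check is now strict (>), so a task exactly the size of the cache is admitted, and the eviction loop pops least recently used tasks until the newcomer fits. A worked sketch under a hypothetical 10 MiB capacity (config_with_capacity is an assumed test helper, not part of the commit):

    async fn eviction_sketch() {
        let mut cache = Cache::new(Arc::new(config_with_capacity(ByteSize::mib(10))));

        cache.put_task("a", ByteSize::mib(6).as_u64()).await; // size = 6 MiB
        cache.put_task("b", ByteSize::mib(6).as_u64()).await; // 6 + 6 > 10: "a" is evicted
        assert!(!cache.contains_task("a").await);
        assert!(cache.contains_task("b").await);

        // Exactly at capacity is now cacheable (the check is `>`, not `>=`).
        cache.put_task("c", ByteSize::mib(10).as_u64()).await; // evicts "b", then fits
        assert!(cache.contains_task("c").await);
    }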
@@ -280,7 +260,6 @@ mod tests {
     use bytesize::ByteSize;
     use dragonfly_api::common::v2::Range;
     use dragonfly_client_config::dfdaemon::Storage;
-    use std::io::Cursor;
     use tokio::io::AsyncReadExt;

     #[tokio::test]
@@ -315,7 +294,7 @@ mod tests {
         ];

         for (config, expected_size, expected_capacity) in test_cases {
-            let cache = Cache::new(Arc::new(config)).unwrap();
+            let cache = Cache::new(Arc::new(config));
             assert_eq!(cache.size, expected_size);
             assert_eq!(cache.capacity, expected_capacity);
         }
@@ -330,7 +309,7 @@ mod tests {
             },
             ..Default::default()
         };
-        let cache = Cache::new(Arc::new(config)).unwrap();
+        let cache = Cache::new(Arc::new(config));

         let test_cases = vec![
             // Test non-existent task.
@@ -355,7 +334,7 @@ mod tests {
                     assert_eq!(cache.contains_task(task_id).await, expected_result);
                 }
                 "add" => {
-                    let task = Task::new(task_id.to_string(), content_length);
+                    let task = Task::new(content_length);
                     cache.tasks.write().await.put(task_id.to_string(), task);
                     assert_eq!(cache.contains_task(task_id).await, expected_result);
                 }
@@ -377,13 +356,13 @@ mod tests {
             },
             ..Default::default()
         };
-        let mut cache = Cache::new(Arc::new(config)).unwrap();
+        let mut cache = Cache::new(Arc::new(config));

         let test_cases = vec![
             // Empty task should not be cached.
             ("empty_task", 0, false),
-            // Task equal to capacity should not be cached.
-            ("equal_capacity", ByteSize::mib(10).as_u64(), false),
+            // Task equal to capacity should be cached.
+            ("equal_capacity", ByteSize::mib(10).as_u64(), true),
             // Task exceeding capacity should not be cached.
             ("exceed_capacity", ByteSize::mib(10).as_u64() + 1, false),
             // Normal sized task should be cached.
@@ -408,7 +387,7 @@ mod tests {
             },
             ..Default::default()
         };
-        let mut cache = Cache::new(Arc::new(config)).unwrap();
+        let mut cache = Cache::new(Arc::new(config));

         let test_cases = vec![
             // Add tasks until eviction triggers.
@@ -439,7 +418,7 @@ mod tests {
             },
             ..Default::default()
         };
-        let mut cache = Cache::new(Arc::new(config)).unwrap();
+        let mut cache = Cache::new(Arc::new(config));

         cache.put_task("task1", ByteSize::mib(1).as_u64()).await;
         cache.put_task("task2", ByteSize::mib(1).as_u64()).await;
@@ -484,7 +463,7 @@ mod tests {
             },
             ..Default::default()
         };
-        let mut cache = Cache::new(Arc::new(config)).unwrap();
+        let mut cache = Cache::new(Arc::new(config));

         let test_cases = vec![
             // Check non-existent task.
@@ -519,9 +498,8 @@ mod tests {
                     assert!(cache.contains_task(task_id).await);
                 }
                 "add_piece" => {
-                    let mut cursor = Cursor::new(content);
                     cache
-                        .write_piece(task_id, piece_id, &mut cursor, content.len() as u64)
+                        .write_piece(task_id, piece_id, Bytes::from(content))
                         .await
                         .unwrap();
                     assert_eq!(
@@ -543,18 +521,12 @@ mod tests {
             },
             ..Default::default()
         };
-        let mut cache = Cache::new(Arc::new(config)).unwrap();
+        let mut cache = Cache::new(Arc::new(config));

         // Test writing to non-existent task.
         let test_data = b"test data".to_vec();
-        let mut cursor = Cursor::new(&test_data);
         let result = cache
-            .write_piece(
-                "non_existent",
-                "piece1",
-                &mut cursor,
-                test_data.len() as u64,
-            )
+            .write_piece("non_existent", "piece1", Bytes::from(test_data))
             .await;
         assert!(matches!(result, Err(Error::TaskNotFound(_))));
@@ -574,9 +546,8 @@ mod tests {
         ];

         for (piece_id, content) in &test_cases {
-            let mut cursor = Cursor::new(content);
             let result = cache
-                .write_piece("task1", piece_id, &mut cursor, content.len() as u64)
+                .write_piece("task1", piece_id, Bytes::copy_from_slice(content))
                 .await;
             assert!(result.is_ok());
             assert!(cache.contains_piece("task1", piece_id).await);
@@ -607,10 +578,9 @@ mod tests {
         // Test attempting to overwrite existing pieces.
         // The write should succeed (return Ok) but content should not change.
         for (piece_id, original_content) in &test_cases {
-            let new_content = format!("updated content for {}", piece_id).into_bytes();
-            let mut cursor = Cursor::new(&new_content);
+            let new_content = format!("updated content for {}", piece_id);
             let result = cache
-                .write_piece("task1", piece_id, &mut cursor, new_content.len() as u64)
+                .write_piece("task1", piece_id, Bytes::from(new_content))
                 .await;
             assert!(result.is_ok());
@@ -648,7 +618,7 @@ mod tests {
             },
             ..Default::default()
         };
-        let mut cache = Cache::new(Arc::new(config)).unwrap();
+        let mut cache = Cache::new(Arc::new(config));

         let piece = Piece {
             number: 0,
@@ -807,9 +777,8 @@ mod tests {
         // Write all pieces.
         for (id, content, _, _) in &test_pieces {
-            let mut cursor = Cursor::new(content);
             cache
-                .write_piece("task1", id, &mut cursor, content.len() as u64)
+                .write_piece("task1", id, Bytes::copy_from_slice(content))
                 .await
                 .unwrap();
         }
@@ -838,13 +807,12 @@ mod tests {
             },
             ..Default::default()
         };
-        let mut cache = Cache::new(Arc::new(config)).unwrap();
+        let mut cache = Cache::new(Arc::new(config));

         cache.put_task("task1", ByteSize::mib(1).as_u64()).await;
         let content = b"test data for concurrent read".to_vec();
-        let mut cursor = Cursor::new(&content);
         cache
-            .write_piece("task1", "piece1", &mut cursor, content.len() as u64)
+            .write_piece("task1", "piece1", Bytes::from(content.clone()))
             .await
             .unwrap();
@@ -909,7 +877,7 @@ mod tests {
             },
             ..Default::default()
         };
-        let mut cache = Cache::new(Arc::new(config)).unwrap();
+        let mut cache = Cache::new(Arc::new(config));

         cache.put_task("task1", ByteSize::mib(1).as_u64()).await;
         let cache_arc = Arc::new(cache);
@@ -921,10 +889,9 @@ mod tests {
             let content = format!("content for piece {}", i).into_bytes();

             join_set.spawn(async move {
-                let mut cursor = Cursor::new(&content);
                 let piece_id = format!("piece{}", i);
                 let result = cache_clone
-                    .write_piece("task1", &piece_id, &mut cursor, content.len() as u64)
+                    .write_piece("task1", &piece_id, Bytes::from(content.clone()))
                     .await;
                 assert!(result.is_ok());
@@ -966,18 +933,12 @@ mod tests {
             },
             ..Default::default()
         };
-        let mut cache = Cache::new(Arc::new(config)).unwrap();
+        let mut cache = Cache::new(Arc::new(config));

         cache.put_task("task1", ByteSize::mib(1).as_u64()).await;
         let original_content = b"original content".to_vec();
-        let mut cursor = Cursor::new(&original_content);
         cache
-            .write_piece(
-                "task1",
-                "piece1",
-                &mut cursor,
-                original_content.len() as u64,
-            )
+            .write_piece("task1", "piece1", Bytes::from(original_content.clone()))
             .await
             .unwrap();
@@ -990,9 +951,8 @@ mod tests {
             let new_content = format!("new content from writer {}", i).into_bytes();

             join_set.spawn(async move {
-                let mut cursor = Cursor::new(&new_content);
                 let result = cache_clone
-                    .write_piece("task1", "piece1", &mut cursor, new_content.len() as u64)
+                    .write_piece("task1", "piece1", Bytes::from(new_content))
                     .await;
                 assert!(result.is_ok());
             });
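The concurrency tests above share one pattern: do the mutable setup (put_task) first, then move the cache into an Arc so many Tokio tasks can call the &self method write_piece at once. A condensed sketch of that pattern (names are illustrative; `cache` is assumed to already contain "task1"):

    use std::sync::Arc;
    use bytes::Bytes;
    use tokio::task::JoinSet;

    async fn concurrent_writers_sketch(cache: Cache) {
        // Setup is done; only &self methods are used from here on.
        let cache = Arc::new(cache);
        let mut join_set = JoinSet::new();

        for i in 0..4 {
            let cache = cache.clone();
            join_set.spawn(async move {
                let piece_id = format!("piece{}", i);
                let content = Bytes::from(format!("content for piece {}", i));
                cache.write_piece("task1", &piece_id, content).await
            });
        }

        while let Some(result) = join_set.join_next().await {
            assert!(result.unwrap().is_ok());
        }
    }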

File 4 of 4: storage engine (Storage)

@@ -59,13 +59,7 @@ impl Storage {
     pub async fn new(config: Arc<Config>, dir: &Path, log_dir: PathBuf) -> Result<Self> {
         let metadata = metadata::Metadata::new(config.clone(), dir, &log_dir)?;
         let content = content::Content::new(config.clone(), dir).await?;
-        let cache = match cache::Cache::new(config.clone()) {
-            Ok(cache) => cache,
-            Err(err) => {
-                error!("create cache failed: {}", err);
-                return Err(err);
-            }
-        };
+        let cache = cache::Cache::new(config.clone());

         Ok(Storage {
             config,
@@ -136,7 +130,6 @@ impl Storage {
         if let Some(content_length) = content_length {
             let mut cache = self.cache.clone();
             cache.put_task(id, content_length).await;
-            debug!("put task in cache: {}", id);
         }
     }
@@ -430,26 +423,23 @@ impl Storage {
         load_to_cache: bool,
     ) -> Result<metadata::Piece> {
         let response = if load_to_cache {
-            let mut buffer = Vec::new();
-            let mut inspect_reader = InspectReader::new(reader, |bytes| {
+            let mut buffer = Vec::with_capacity(length as usize);
+            let mut tee = InspectReader::new(reader, |bytes| {
                 buffer.extend_from_slice(bytes);
             });

-            self.cache
-                .write_piece(task_id, piece_id, &mut inspect_reader, length)
-                .await
-                .inspect_err(|err| {
-                    error!("load piece to cache failed: {}", err);
-                })?;
+            let response = self.content.write_piece(task_id, offset, &mut tee).await?;

-            self.content
-                .write_piece(task_id, offset, &mut &buffer[..])
-                .await?
+            self.cache
+                .write_piece(task_id, piece_id, bytes::Bytes::from(buffer))
+                .await?;
+
+            response
         } else {
             self.content.write_piece(task_id, offset, reader).await?
         };

         let digest = Digest::new(Algorithm::Crc32, response.hash);

         self.metadata.download_piece_finished(
             piece_id,
             offset,
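Before this change the piece stream was consumed by the cache first and then replayed from a buffer into the content store; now a single pass serves both. InspectReader from tokio-util invokes its closure with every chunk the content writer pulls through it, filling the cache buffer as a side effect, and the buffer is handed to the cache as Bytes afterwards. A self-contained sketch of the tee pattern (tokio::io::sink() stands in for the content writer):

    use tokio::io::AsyncRead;
    use tokio_util::io::InspectReader;

    async fn tee_sketch<R: AsyncRead + Unpin>(
        reader: R,
        expected_len: usize,
    ) -> std::io::Result<Vec<u8>> {
        // Pre-sizing the buffer mirrors the Vec::with_capacity(length) change above.
        let mut buffer = Vec::with_capacity(expected_len);
        let mut tee = InspectReader::new(reader, |bytes| buffer.extend_from_slice(bytes));

        // One pass over the stream: the writer receives the data while the
        // closure mirrors each chunk into `buffer`.
        let mut sink = tokio::io::sink();
        tokio::io::copy(&mut tee, &mut sink).await?;

        drop(tee); // release the closure's mutable borrow of `buffer`
        Ok(buffer)
    }

Writing to disk first and to the cache second also means a failed disk write no longer leaves a stale cache entry behind.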
@@ -474,20 +464,18 @@ impl Storage {
         load_to_cache: bool,
     ) -> Result<metadata::Piece> {
         let response = if load_to_cache {
-            let mut buffer = Vec::new();
-            let mut inspect_reader = InspectReader::new(reader, |bytes| {
+            let mut buffer = Vec::with_capacity(length as usize);
+            let mut tee = InspectReader::new(reader, |bytes| {
                 buffer.extend_from_slice(bytes);
             });

-            self.cache
-                .write_piece(task_id, piece_id, &mut inspect_reader, length)
-                .await
-                .inspect_err(|err| {
-                    error!("load piece to cache failed: {}", err);
-                })?;
+            let response = self.content.write_piece(task_id, offset, &mut tee).await?;

-            self.content
-                .write_piece(task_id, offset, &mut &buffer[..])
-                .await?
+            self.cache
+                .write_piece(task_id, piece_id, bytes::Bytes::from(buffer))
+                .await?;
+
+            response
         } else {
             self.content.write_piece(task_id, offset, reader).await?
         };
@@ -537,19 +525,17 @@ impl Storage {
         match self.metadata.get_piece(piece_id) {
             Ok(Some(piece)) => {
                 if self.cache.contains_piece(task_id, piece_id).await {
-                    debug!("hit cache: {}", piece_id);
                     match self
                         .cache
                         .read_piece(task_id, piece_id, piece.clone(), range)
                         .await
                     {
-                        Ok(cache_reader) => {
+                        Ok(reader) => {
                             // Finish uploading the task.
                             self.metadata.upload_task_finished(task_id)?;
-                            return Ok(Either::Left(cache_reader));
+                            return Ok(Either::Left(reader));
                         }
                         Err(err) => {
-                            error!("read piece from cache failed: {}", err);
                             return Err(err);
                         }
                     }
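The cache hit path returns Either::Left(reader); the fall-through disk path (not shown in this hunk) supplies the Right arm. tokio_util::either::Either implements AsyncRead whenever both arms do, so callers consume either source through one type. A hedged sketch:

    use tokio::io::{AsyncRead, AsyncReadExt};
    use tokio_util::either::Either;

    // The caller does not care whether the bytes came from the in-memory cache
    // (Left) or from disk (Right); both arms implement AsyncRead.
    async fn read_either<L, R>(mut source: Either<L, R>) -> std::io::Result<Vec<u8>>
    where
        L: AsyncRead + Unpin,
        R: AsyncRead + Unpin,
    {
        let mut buf = Vec::new();
        source.read_to_end(&mut buf).await?;
        Ok(buf)
    }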