From 9ca89909f32b5b8214fde740ceead9e81037461a Mon Sep 17 00:00:00 2001 From: SouthWest7 <62731404+SouthWest7@users.noreply.github.com> Date: Fri, 14 Mar 2025 13:47:53 +0800 Subject: [PATCH] feat: add a LRU cache module for preheat jobs (#945) Signed-off-by: southwest miao <1403572259@qq.com> --- Cargo.lock | 2 + Cargo.toml | 1 + dragonfly-client-config/src/dfdaemon.rs | 22 +- dragonfly-client-storage/Cargo.toml | 2 + .../src/cache/lru_cache.rs | 386 ++++++++++++++++++ dragonfly-client-storage/src/cache/mod.rs | 246 +++++++++++ dragonfly-client-storage/src/lib.rs | 99 ++++- dragonfly-client/src/resource/piece.rs | 5 + dragonfly-client/src/resource/task.rs | 24 ++ 9 files changed, 781 insertions(+), 6 deletions(-) create mode 100644 dragonfly-client-storage/src/cache/lru_cache.rs create mode 100644 dragonfly-client-storage/src/cache/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 2f808135..b754784b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1093,6 +1093,7 @@ version = "0.2.19" dependencies = [ "base16ct", "bincode", + "bytes", "chrono", "crc32fast", "dragonfly-api", @@ -1100,6 +1101,7 @@ dependencies = [ "dragonfly-client-core", "dragonfly-client-util", "fs2", + "lru", "num_cpus", "prost-wkt-types", "rayon", diff --git a/Cargo.toml b/Cargo.toml index fe5c1538..1a1c8aff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -104,6 +104,7 @@ serde_json = "1.0.140" lru = "0.12.5" fs2 = "0.4.3" lazy_static = "1.5" +bytes = "1.9" [profile.release] opt-level = 3 diff --git a/dragonfly-client-config/src/dfdaemon.rs b/dragonfly-client-config/src/dfdaemon.rs index 711b41aa..391fe19a 100644 --- a/dragonfly-client-config/src/dfdaemon.rs +++ b/dragonfly-client-config/src/dfdaemon.rs @@ -200,6 +200,18 @@ fn default_storage_read_buffer_size() -> usize { 4 * 1024 * 1024 } +/// default_storage_cache_capacity is the default cache capacity for the preheat job, default is 10GiB. +#[inline] +fn default_storage_cache_capacity() -> usize { + 10 * 1024 * 1024 * 1024 +} + +/// default_storage_cache_tasks_capacity is the default capacity of the tasks that can be stored in the cache, default is 1. +#[inline] +fn default_storage_cache_tasks_capacity() -> usize { + 1 +} + /// default_seed_peer_cluster_id is the default cluster id of seed peer. #[inline] fn default_seed_peer_cluster_id() -> u64 { @@ -968,7 +980,12 @@ pub struct Storage { /// | | /// +--------------------------------------------------+ /// ``` - pub cache_capacity: Option, + #[serde(default = "default_storage_cache_capacity")] + pub cache_capacity: usize, + + /// cache_tasks_capacity is the capacity of the tasks that can be stored in the cache, default is 1. + #[serde(default = "default_storage_cache_tasks_capacity")] + pub cache_tasks_capacity: usize, } /// Storage implements Default. @@ -980,7 +997,8 @@ impl Default for Storage { keep: default_storage_keep(), write_buffer_size: default_storage_write_buffer_size(), read_buffer_size: default_storage_read_buffer_size(), - cache_capacity: None, + cache_capacity: default_storage_cache_capacity(), + cache_tasks_capacity: default_storage_cache_tasks_capacity(), } } } diff --git a/dragonfly-client-storage/Cargo.toml b/dragonfly-client-storage/Cargo.toml index a026e4ac..44e5b936 100644 --- a/dragonfly-client-storage/Cargo.toml +++ b/dragonfly-client-storage/Cargo.toml @@ -26,6 +26,8 @@ sha2.workspace = true crc32fast.workspace = true base16ct.workspace = true fs2.workspace = true +lru.workspace = true +bytes.workspace = true num_cpus = "1.0" bincode = "1.3.3" rayon = "1.10.0" diff --git a/dragonfly-client-storage/src/cache/lru_cache.rs b/dragonfly-client-storage/src/cache/lru_cache.rs new file mode 100644 index 00000000..77a14a87 --- /dev/null +++ b/dragonfly-client-storage/src/cache/lru_cache.rs @@ -0,0 +1,386 @@ +use std::{borrow::Borrow, collections::HashMap, hash::Hash, hash::Hasher, num::NonZeroUsize, ptr}; + +#[derive(Debug, Clone, Copy)] +/// KeyRef is a reference to the key. +struct KeyRef { + k: *const K, +} + +impl Hash for KeyRef { + fn hash(&self, state: &mut H) { + unsafe { + let key = &*self.k; + key.hash(state) + } + } +} + +impl PartialEq for KeyRef { + fn eq(&self, other: &Self) -> bool { + unsafe { + let key1 = &*self.k; + let key2 = &*other.k; + key1.eq(key2) + } + } +} + +impl Eq for KeyRef {} + +#[repr(transparent)] +/// KeyWrapper is a wrapper for the key. +struct KeyWrapper(K); + +impl KeyWrapper { + /// from_ref creates a new KeyWrapper from a reference to the key. + fn from_ref(key: &K) -> &Self { + unsafe { &*(key as *const K as *const KeyWrapper) } + } +} + +impl Hash for KeyWrapper { + fn hash(&self, state: &mut H) { + self.0.hash(state) + } +} + +impl PartialEq for KeyWrapper { + #![allow(unknown_lints)] + #[allow(clippy::unconditional_recursion)] + fn eq(&self, other: &Self) -> bool { + self.0.eq(&other.0) + } +} + +impl Eq for KeyWrapper {} + +impl Borrow> for KeyRef +where + K: Borrow, + Q: ?Sized, +{ + fn borrow(&self) -> &KeyWrapper { + unsafe { + let key = &*self.k; + KeyWrapper::from_ref(key.borrow()) + } + } +} + +struct Entry { + key: K, + value: V, + prev: Option<*mut Entry>, + next: Option<*mut Entry>, +} + +impl Entry { + fn new(key: K, value: V) -> Self { + Self { + key, + value, + prev: None, + next: None, + } + } +} + +pub struct LruCache { + cap: NonZeroUsize, + map: HashMap, Box>>, + head: Option<*mut Entry>, + tail: Option<*mut Entry>, + _marker: std::marker::PhantomData, +} + +impl LruCache { + /// new creates a new LruCache. + pub fn new(cap: NonZeroUsize) -> Self { + Self { + cap, + map: HashMap::with_capacity(cap.get()), + head: None, + tail: None, + _marker: std::marker::PhantomData, + } + } + /// get gets the value of the key. + pub fn get<'a, Q>(&'a mut self, k: &Q) -> Option<&'a V> + where + K: Borrow, + Q: Hash + Eq + ?Sized, + { + if let Some(entry) = self.map.get_mut(KeyWrapper::from_ref(k)) { + let entry_ptr: *mut Entry = &mut **entry; + + unsafe { + self.detach(entry_ptr); + self.attach(entry_ptr); + } + + Some(&unsafe { &*entry_ptr }.value) + } else { + None + } + } + + /// put puts the key and value into the cache. + pub fn put(&mut self, key: K, value: V) -> Option { + let key_ref = KeyRef { k: &key }; + + if let Some(existing_entry) = self.map.get_mut(&key_ref) { + let entry_ptr = existing_entry.as_mut() as *mut Entry; + + let old_value = unsafe { + let old_value = ptr::read(&(*entry_ptr).value); + ptr::write(&mut (*entry_ptr).value, value); + old_value + }; + + unsafe { + self.detach(entry_ptr); + self.attach(entry_ptr); + } + + return Some(old_value); + } + + let mut evicted_value = None; + if self.map.len() >= self.cap.get() { + if let Some(tail) = self.tail { + unsafe { + let tail_key_ref = KeyRef { k: &(*tail).key }; + + self.detach(tail); + + if let Some(entry) = self.map.remove(&tail_key_ref) { + evicted_value = Some(entry.value); + } + } + } + } + + let new_entry = Box::new(Entry::new(key, value)); + let key_ptr = &new_entry.key as *const K; + let entry_ptr = Box::into_raw(new_entry); + + unsafe { + self.attach(entry_ptr); + self.map + .insert(KeyRef { k: key_ptr }, Box::from_raw(entry_ptr)); + } + + evicted_value + } + + /// detach detaches the entry from the cache. + unsafe fn detach(&mut self, entry: *mut Entry) { + let prev = (*entry).prev; + let next = (*entry).next; + + match prev { + Some(prev) => (*prev).next = next, + None => self.head = next, + } + + match next { + Some(next) => (*next).prev = prev, + None => self.tail = prev, + } + + (*entry).prev = None; + (*entry).next = None; + } + + /// attach attaches the entry to the cache. + unsafe fn attach(&mut self, entry: *mut Entry) { + match self.head { + Some(old_head) => { + (*entry).next = Some(old_head); + (*old_head).prev = Some(entry); + self.head = Some(entry); + } + None => { + self.head = Some(entry); + self.tail = Some(entry); + } + } + } + + /// contains checks whether the key exists in the cache. + pub fn contains(&self, k: &Q) -> bool + where + K: Borrow, + Q: Hash + Eq + ?Sized, + { + self.map.contains_key(KeyWrapper::from_ref(k)) + } + + /// peek peeks the value of the key. It does not move the key to the front of the cache. + pub fn peek<'a, Q>(&'a self, k: &Q) -> Option<&'a V> + where + K: Borrow, + Q: Hash + Eq + ?Sized, + { + self.map + .get(KeyWrapper::from_ref(k)) + .map(|entry| &entry.value) + } + + /// pop_lru pops the least recently used value from the cache. + pub fn pop_lru(&mut self) -> Option<(K, V)> { + if self.is_empty() { + return None; + } + + let tail = self.tail?; + + unsafe { + self.detach(tail); + + let tail_key_ref = KeyRef { k: &(*tail).key }; + self.map + .remove(&tail_key_ref) + .map(|entry| (entry.key, entry.value)) + } + } + + /// is_empty checks whether the cache is empty. + pub fn is_empty(&self) -> bool { + self.map.is_empty() + } +} + +unsafe impl Send for LruCache {} +unsafe impl Sync for LruCache {} + +impl Drop for LruCache { + fn drop(&mut self) { + self.map.clear(); + self.head = None; + self.tail = None; + } +} + +#[cfg(test)] +mod tests { + use super::*; + use bytes::Bytes; + + #[test] + fn test_put() { + let mut cache = LruCache::new(NonZeroUsize::new(3).unwrap()); + + assert_eq!(cache.put("key1".to_string(), "value1".to_string()), None); + assert_eq!(cache.put("key2".to_string(), "value2".to_string()), None); + assert_eq!(cache.put("key3".to_string(), "value3".to_string()), None); + + assert_eq!( + cache.put("key2".to_string(), "value2_updated".to_string()), + Some("value2".to_string()) + ); + + assert_eq!(cache.get("key2"), Some(&"value2_updated".to_string())); + + assert_eq!( + cache.put("key4".to_string(), "value4".to_string()), + Some("value1".to_string()) + ); + + assert_eq!(cache.get("key1"), None); + + assert!(cache.contains("key2")); + assert!(cache.contains("key3")); + assert!(cache.contains("key4")); + } + + #[test] + fn test_get() { + let mut cache = LruCache::new(NonZeroUsize::new(3).unwrap()); + + cache.put("key1".to_string(), "value1".to_string()); + cache.put("key2".to_string(), "value2".to_string()); + cache.put("key3".to_string(), "value3".to_string()); + + assert_eq!(cache.get("key1"), Some(&"value1".to_string())); + assert_eq!(cache.get("key2"), Some(&"value2".to_string())); + assert_eq!(cache.get("key3"), Some(&"value3".to_string())); + + assert_eq!(cache.get("key4"), None); + + cache.put("key4".to_string(), "value4".to_string()); + + assert_eq!(cache.get("key1"), None); + assert_eq!(cache.get("key2"), Some(&"value2".to_string())); + assert_eq!(cache.get("key3"), Some(&"value3".to_string())); + assert_eq!(cache.get("key4"), Some(&"value4".to_string())); + } + + #[test] + fn test_peek() { + let mut cache = LruCache::new(NonZeroUsize::new(3).unwrap()); + + cache.put("key1".to_string(), "value1".to_string()); + cache.put("key2".to_string(), "value2".to_string()); + cache.put("key3".to_string(), "value3".to_string()); + + assert_eq!(cache.peek("key1"), Some(&"value1".to_string())); + assert_eq!(cache.peek("key2"), Some(&"value2".to_string())); + assert_eq!(cache.peek("key3"), Some(&"value3".to_string())); + + assert_eq!(cache.peek("key4"), None); + + cache.put("key4".to_string(), "value4".to_string()); + + assert_eq!(cache.peek("key1"), None); + assert_eq!(cache.peek("key2"), Some(&"value2".to_string())); + assert_eq!(cache.peek("key3"), Some(&"value3".to_string())); + assert_eq!(cache.peek("key4"), Some(&"value4".to_string())); + + let mut cache = LruCache::new(NonZeroUsize::new(2).unwrap()); + cache.put("key1".to_string(), "value1".to_string()); + cache.put("key2".to_string(), "value2".to_string()); + + cache.peek("key1"); + cache.put("key3".to_string(), "value3".to_string()); + assert_eq!(cache.peek("key1"), None); + + let mut cache = LruCache::new(NonZeroUsize::new(2).unwrap()); + cache.put("key1".to_string(), "value1".to_string()); + cache.put("key2".to_string(), "value2".to_string()); + + cache.get("key1"); + cache.put("key3".to_string(), "value3".to_string()); + assert_eq!(cache.peek("key2"), None); + assert_eq!(cache.peek("key1"), Some(&"value1".to_string())); + } + + #[test] + fn test_contains() { + let mut cache = LruCache::new(NonZeroUsize::new(5).unwrap()); + + let test_cases = vec![ + ("piece_1", Bytes::from("data 1"), false), + ("piece_2", Bytes::from("data 2"), true), + ("piece_3", Bytes::from("data 3"), false), + ("piece_4", Bytes::from("data 4"), true), + ("piece_5", Bytes::from("data 5"), true), + ("piece_6", Bytes::from("data 6"), true), + ("piece_7", Bytes::from("data 7"), true), + ]; + + for (piece_id, piece_content, _) in test_cases[0..6].iter() { + cache.put(piece_id.to_string(), piece_content.clone()); + } + + let _ = cache.get("piece_2"); + + let (piece_id, piece_content, _) = &test_cases[6]; + cache.put(piece_id.to_string(), piece_content.clone()); + + for (piece_id, _, expected_existence) in test_cases { + let exists = cache.contains(piece_id); + assert_eq!(exists, expected_existence); + } + } +} diff --git a/dragonfly-client-storage/src/cache/mod.rs b/dragonfly-client-storage/src/cache/mod.rs new file mode 100644 index 00000000..2c9ca894 --- /dev/null +++ b/dragonfly-client-storage/src/cache/mod.rs @@ -0,0 +1,246 @@ +/* + * Copyright 2025 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use bytes::Bytes; +use dragonfly_api::common::v2::Range; +use dragonfly_client_core::{Error, Result}; +use lru_cache::LruCache; +use std::cmp::{max, min}; +use std::collections::HashMap; +use std::io::Cursor; +use std::num::NonZeroUsize; +use std::sync::Arc; +use tokio::io::{AsyncRead, AsyncReadExt, BufReader}; +use tokio::sync::RwLock; +use tracing::info; +mod lru_cache; + +#[derive(Clone, Debug)] +/// Task is the task content in the cache. +struct Task { + /// id is the id of the task. + id: String, + + /// content_length is the length of the task content. + content_length: u64, + + /// pieces is the pieces content of the task. + pieces: Arc>>, +} + +impl Task { + /// new creates a new task. + fn new(id: String, content_length: u64) -> Self { + Self { + id, + content_length, + pieces: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// write_piece writes the piece content to the task. + async fn write_piece(&self, piece_id: &str, piece: Bytes) { + let mut pieces = self.pieces.write().await; + pieces.insert(piece_id.to_string(), piece); + } + + /// read_piece reads the piece content from the task. + async fn read_piece(&self, piece_id: &str) -> Option { + let pieces = self.pieces.read().await; + pieces.get(piece_id).cloned() + } + + /// contains checks whether the piece exists in the task. + async fn contains(&self, piece_id: &str) -> bool { + let pieces = self.pieces.read().await; + pieces.contains_key(piece_id) + } + + /// content_length returns the content length of the task. + fn content_length(&self) -> u64 { + self.content_length + } +} + +/// Cache is the cache for storing piece content by LRU algorithm. +/// +/// Cache storage: +/// 1. Users can create preheating jobs and preheat tasks to memory and disk by setting `load_to_cache` to `true`. +/// For more details, refer to https://github.com/dragonflyoss/api/blob/main/proto/common.proto#L443. +/// 2. If the download hits the memory cache, it will be faster than reading from the disk, because there is no +/// page cache for the first read. +/// ``` +/// +/// 1.Preheat +/// | +/// | +/// +--------------------------------------------------+ +/// | | Peer | +/// | | +-----------+ | +/// | | -- Partial -->| Cache | | +/// | | | +-----------+ | +/// | v | | | | +/// | Download | Miss | | +/// | Task -->| | --- Hit ------>|<-- 2.Download +/// | | | ^ | +/// | | v | | +/// | | +-----------+ | | +/// | -- Full -->| Disk |---------- | +/// | +-----------+ | +/// | | +/// +--------------------------------------------------+ +/// ``` +/// Task is the metadata of the task. +#[derive(Clone)] +pub struct Cache { + /// size is the size of the cache in bytes. + size: u64, + + /// capacity is the maximum capacity of the cache in bytes. + capacity: u64, + + /// tasks stores the tasks with their task id. + tasks: Arc>>, +} + +/// Cache implements the cache for storing piece content by LRU algorithm. +impl Cache { + /// new creates a new cache with the specified capacity. + pub fn new(capacity: usize, tasks_capacity: usize) -> Result { + let capacity = NonZeroUsize::new(capacity).ok_or(Error::InvalidParameter)?; + let tasks_capacity = NonZeroUsize::new(tasks_capacity).ok_or(Error::InvalidParameter)?; + let tasks = Arc::new(RwLock::new(LruCache::new(tasks_capacity))); + + Ok(Cache { + size: 0, + capacity: capacity.get() as u64, + tasks, + }) + } + + /// read_piece reads the piece from the cache. + pub async fn read_piece( + &self, + task_id: &str, + piece_id: &str, + offset: u64, + length: u64, + range: Option, + ) -> Result { + // Try to get the piece content from the task. + let mut tasks = self.tasks.write().await; + let Some(task) = tasks.get(task_id) else { + return Err(Error::TaskNotFound(task_id.to_string())); + }; + let Some(piece_content) = task.read_piece(piece_id).await else { + return Err(Error::PieceNotFound(piece_id.to_string())); + }; + + // Calculate the range of bytes to return based on the range provided. + let (target_offset, target_length) = if let Some(range) = range { + let target_offset = max(offset, range.start) - offset; + let target_length = + min(offset + length - 1, range.start + range.length - 1) - target_offset - offset + + 1; + (target_offset as usize, target_length as usize) + } else { + (0, length as usize) + }; + + // Check if the target range is valid. + let begin = target_offset; + let end = target_offset + target_length; + if begin >= piece_content.len() || end > piece_content.len() { + return Err(Error::InvalidParameter); + } + + let reader = BufReader::new(Cursor::new(piece_content.slice(begin..end))); + Ok(reader) + } + + /// write_piece writes the piece content to the cache. + pub async fn write_piece( + &self, + task_id: &str, + piece_id: &str, + reader: &mut R, + length: u64, + ) -> Result<()> { + let mut tasks = self.tasks.write().await; + let Some(task) = tasks.get(task_id) else { + return Err(Error::TaskNotFound(task_id.to_string())); + }; + + // The piece already exists in the cache. + if task.contains(piece_id).await { + return Err(Error::Unknown(format!( + "overwrite existing piece {}", + piece_id + ))); + } + + let mut buffer = Vec::with_capacity(length as usize); + match reader.read_to_end(&mut buffer).await { + Ok(_) => { + task.write_piece(piece_id, Bytes::from(buffer)).await; + Ok(()) + } + Err(err) => Err(Error::Unknown(format!( + "Failed to read piece data for {}: {}", + piece_id, err + ))), + } + } + + /// put_task puts a new task into the cache, constrained by the capacity of the cache. + pub async fn put_task(&mut self, task_id: &str, content_length: u64) { + if content_length == 0 { + return; + } + + let mut tasks = self.tasks.write().await; + let new_task = Task::new(task_id.to_string(), content_length); + while self.size + content_length > self.capacity { + match tasks.pop_lru() { + Some((_, task)) => { + self.size -= task.content_length(); + info!("evicted task in cache: {}", task.id); + } + None => { + break; + } + } + } + tasks.put(task_id.to_string(), new_task); + self.size += content_length; + } + + /// contains_task checks whether the task exists in the cache. + pub async fn contains_task(&self, id: &str) -> bool { + let tasks = self.tasks.read().await; + tasks.contains(id) + } + + /// contains_piece checks whether the piece exists in the specified task. + pub async fn contains_piece(&self, task_id: &str, piece_id: &str) -> bool { + let tasks = self.tasks.read().await; + if let Some(task) = tasks.peek(task_id) { + task.contains(piece_id).await + } else { + false + } + } +} diff --git a/dragonfly-client-storage/src/lib.rs b/dragonfly-client-storage/src/lib.rs index fef9c4e5..68746271 100644 --- a/dragonfly-client-storage/src/lib.rs +++ b/dragonfly-client-storage/src/lib.rs @@ -25,8 +25,11 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use tokio::io::AsyncRead; -use tracing::{debug, error, instrument, warn}; +use tokio_util::either::Either; +use tokio_util::io::InspectReader; +use tracing::{debug, error, info, instrument, warn}; +pub mod cache; pub mod content; pub mod metadata; pub mod storage_engine; @@ -44,6 +47,9 @@ pub struct Storage { /// content implements the content storage. content: content::Content, + + /// cache implements the cache for preheat task. + cache: cache::Cache, } /// Storage implements the storage. @@ -53,10 +59,16 @@ impl Storage { pub async fn new(config: Arc, dir: &Path, log_dir: PathBuf) -> Result { let metadata = metadata::Metadata::new(config.clone(), dir, &log_dir)?; let content = content::Content::new(config.clone(), dir).await?; + let cache = cache::Cache::new( + config.storage.cache_capacity, + config.storage.cache_tasks_capacity, + )?; + Ok(Storage { config, metadata, content, + cache, }) } @@ -348,8 +360,34 @@ impl Storage { offset: u64, length: u64, reader: &mut R, + load_to_cache: bool, ) -> Result { - let response = self.content.write_piece(task_id, offset, reader).await?; + let response = if load_to_cache { + // Load piece content to cache. + let mut buffer = Vec::new(); + let mut inspect_reader = InspectReader::new(reader, |bytes| { + buffer.extend_from_slice(bytes); + }); + match self + .cache + .write_piece(task_id, piece_id, &mut inspect_reader, length) + .await + { + Ok(_) => { + info!("load piece to cache. piece-id: {}", piece_id); + } + Err(err) => { + error!("load piece to cache failed: {}", err); + } + } + + self.content + .write_piece(task_id, offset, &mut &buffer[..]) + .await? + } else { + self.content.write_piece(task_id, offset, reader).await? + }; + let digest = Digest::new(Algorithm::Crc32, response.hash); self.metadata.download_piece_finished( @@ -362,6 +400,7 @@ impl Storage { } /// download_piece_from_parent_finished is used for downloading piece from parent. + #[allow(clippy::too_many_arguments)] #[instrument(skip_all)] pub async fn download_piece_from_parent_finished( &self, @@ -371,8 +410,34 @@ impl Storage { expected_digest: &str, parent_id: &str, reader: &mut R, + load_to_cache: bool, + length: u64, ) -> Result { - let response = self.content.write_piece(task_id, offset, reader).await?; + let response = if load_to_cache { + // Load piece content to cache. + let mut buffer = Vec::new(); + let mut inspect_reader = InspectReader::new(reader, |bytes| { + buffer.extend_from_slice(bytes); + }); + match self + .cache + .write_piece(task_id, piece_id, &mut inspect_reader, length) + .await + { + Ok(_) => { + info!("load piece to cache. piece-id: {}", piece_id); + } + Err(err) => { + error!("load piece to cache failed: {}", err); + } + } + + self.content + .write_piece(task_id, offset, &mut &buffer[..]) + .await? + } else { + self.content.write_piece(task_id, offset, reader).await? + }; let length = response.length; let digest = Digest::new(Algorithm::Crc32, response.hash); @@ -418,6 +483,23 @@ impl Storage { // Get the piece metadata and return the content of the piece. match self.metadata.get_piece(piece_id) { Ok(Some(piece)) => { + // Try to upload piece content from cache. + if self.cache.contains_piece(task_id, piece_id).await { + match self + .cache + .read_piece(task_id, piece_id, piece.offset, piece.length, range) + .await + { + Ok(cache_reader) => { + // Finish uploading the task. + self.metadata.upload_task_finished(task_id)?; + return Ok(Either::Left(cache_reader)); + } + Err(_err) => {} + } + } + + // Upload piece content from storage when cache entry is not hit. match self .content .read_piece(task_id, piece.offset, piece.length, range) @@ -426,7 +508,7 @@ impl Storage { Ok(reader) => { // Finish uploading the task. self.metadata.upload_task_finished(task_id)?; - Ok(reader) + Ok(Either::Right(reader)) } Err(err) => { // Failed uploading the task. @@ -496,6 +578,15 @@ impl Storage { } } + /// add_task_to_cache adds the task to the cache. + #[instrument(skip_all)] + pub async fn add_task_to_cache(&self, task_id: &str, content_length: u64) { + let mut cache = self.cache.clone(); + if !cache.contains_task(task_id).await { + cache.put_task(task_id, content_length).await; + } + } + /// get_piece returns the piece metadata. #[instrument(skip_all)] pub fn get_piece(&self, piece_id: &str) -> Result> { diff --git a/dragonfly-client/src/resource/piece.rs b/dragonfly-client/src/resource/piece.rs index 7a37ecc8..31dc918d 100644 --- a/dragonfly-client/src/resource/piece.rs +++ b/dragonfly-client/src/resource/piece.rs @@ -470,6 +470,7 @@ impl Piece { length: u64, parent: piece_collector::CollectedParent, is_prefetch: bool, + load_to_cache: bool, ) -> Result { // Span record the piece_id. Span::current().record("piece_id", piece_id); @@ -532,6 +533,8 @@ impl Piece { digest.as_str(), parent.id.as_str(), &mut reader, + load_to_cache, + length, ) .await { @@ -568,6 +571,7 @@ impl Piece { length: u64, request_header: HeaderMap, is_prefetch: bool, + load_to_cache: bool, object_storage: Option, hdfs: Option, ) -> Result { @@ -691,6 +695,7 @@ impl Piece { offset, length, &mut response.reader, + load_to_cache, ) .await { diff --git a/dragonfly-client/src/resource/task.rs b/dragonfly-client/src/resource/task.rs index 5d7c0259..e093bb2e 100644 --- a/dragonfly-client/src/resource/task.rs +++ b/dragonfly-client/src/resource/task.rs @@ -683,6 +683,7 @@ impl Task { remaining_interested_pieces.clone(), request.is_prefetch, request.need_piece_content, + request.load_to_cache, download_progress_tx.clone(), in_stream_tx.clone(), ) @@ -926,6 +927,7 @@ impl Task { interested_pieces: Vec, is_prefetch: bool, need_piece_content: bool, + load_to_cache: bool, download_progress_tx: Sender>, in_stream_tx: Sender, ) -> ClientResult> { @@ -961,6 +963,12 @@ impl Task { self.config.download.concurrent_piece_count as usize, )); + if load_to_cache { + self.storage + .add_task_to_cache(task_id, task.content_length().unwrap_or(0)) + .await; + } + // Download the pieces from the parents. while let Some(collect_piece) = piece_collector_rx.recv().await { if interrupt.load(Ordering::SeqCst) { @@ -985,6 +993,7 @@ impl Task { finished_pieces: Arc>>, is_prefetch: bool, need_piece_content: bool, + load_to_cache: bool, ) -> ClientResult { // Limit the concurrent piece count. let _permit = semaphore.acquire().await.unwrap(); @@ -1005,6 +1014,7 @@ impl Task { length, parent.clone(), is_prefetch, + load_to_cache, ) .await .map_err(|err| { @@ -1138,6 +1148,7 @@ impl Task { finished_pieces.clone(), is_prefetch, need_piece_content, + load_to_cache, ) .in_current_span(), ); @@ -1234,6 +1245,13 @@ impl Task { // Initialize the finished pieces. let mut finished_pieces: Vec = Vec::new(); + if request.load_to_cache { + self.storage + .add_task_to_cache(task_id, task.content_length().unwrap_or(0)) + .await; + info!("add task to cache: {}", task_id); + } + // Download the piece from the local. let mut join_set = JoinSet::new(); let semaphore = Arc::new(Semaphore::new( @@ -1251,6 +1269,7 @@ impl Task { request_header: HeaderMap, is_prefetch: bool, need_piece_content: bool, + load_to_cache: bool, piece_manager: Arc, semaphore: Arc, download_progress_tx: Sender>, @@ -1274,6 +1293,7 @@ impl Task { length, request_header, is_prefetch, + load_to_cache, object_storage, hdfs, ) @@ -1378,6 +1398,7 @@ impl Task { request_header.clone(), request.is_prefetch, request.need_piece_content, + request.load_to_cache, self.piece.clone(), semaphore.clone(), download_progress_tx.clone(), @@ -1639,6 +1660,7 @@ impl Task { length: u64, request_header: HeaderMap, is_prefetch: bool, + load_to_cache: bool, piece_manager: Arc, semaphore: Arc, download_progress_tx: Sender>, @@ -1661,6 +1683,7 @@ impl Task { length, request_header, is_prefetch, + load_to_cache, object_storage, hdfs, ) @@ -1719,6 +1742,7 @@ impl Task { interested_piece.length, request_header.clone(), request.is_prefetch, + request.load_to_cache, self.piece.clone(), semaphore.clone(), download_progress_tx.clone(),