feat: add an LRU cache module for preheat jobs (#945)

Signed-off-by: southwest miao <1403572259@qq.com>
Authored by SouthWest7 on 2025-03-14 13:47:53 +08:00, committed by GitHub
parent beff3f87b3
commit 9ca89909f3
9 changed files with 781 additions and 6 deletions

Cargo.lock (generated)

@@ -1093,6 +1093,7 @@ version = "0.2.19"
 dependencies = [
  "base16ct",
  "bincode",
+ "bytes",
  "chrono",
  "crc32fast",
  "dragonfly-api",
@@ -1100,6 +1101,7 @@ dependencies = [
  "dragonfly-client-core",
  "dragonfly-client-util",
  "fs2",
+ "lru",
  "num_cpus",
  "prost-wkt-types",
  "rayon",


@@ -104,6 +104,7 @@ serde_json = "1.0.140"
 lru = "0.12.5"
 fs2 = "0.4.3"
 lazy_static = "1.5"
+bytes = "1.9"

 [profile.release]
 opt-level = 3


@@ -200,6 +200,18 @@ fn default_storage_read_buffer_size() -> usize {
     4 * 1024 * 1024
 }

+/// default_storage_cache_capacity is the default cache capacity for the preheat job, default is 10GiB.
+#[inline]
+fn default_storage_cache_capacity() -> usize {
+    10 * 1024 * 1024 * 1024
+}
+
+/// default_storage_cache_tasks_capacity is the default capacity of the tasks that can be stored in the cache, default is 1.
+#[inline]
+fn default_storage_cache_tasks_capacity() -> usize {
+    1
+}
+
 /// default_seed_peer_cluster_id is the default cluster id of seed peer.
 #[inline]
 fn default_seed_peer_cluster_id() -> u64 {
@@ -968,7 +980,12 @@ pub struct Storage {
     /// |                                                  |
     /// +--------------------------------------------------+
     /// ```
-    pub cache_capacity: Option<usize>,
+    #[serde(default = "default_storage_cache_capacity")]
+    pub cache_capacity: usize,
+
+    /// cache_tasks_capacity is the capacity of the tasks that can be stored in the cache, default is 1.
+    #[serde(default = "default_storage_cache_tasks_capacity")]
+    pub cache_tasks_capacity: usize,
 }

 /// Storage implements Default.
@@ -980,7 +997,8 @@ impl Default for Storage {
             keep: default_storage_keep(),
             write_buffer_size: default_storage_write_buffer_size(),
             read_buffer_size: default_storage_read_buffer_size(),
-            cache_capacity: None,
+            cache_capacity: default_storage_cache_capacity(),
+            cache_tasks_capacity: default_storage_cache_tasks_capacity(),
         }
     }
 }
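For reference, a minimal sketch (not part of this commit) of what the new defaults amount to, assuming the config struct above is reachable as dragonfly_client_config::dfdaemon::Storage:

// Assumed import path for the Storage config struct shown in the hunk above.
use dragonfly_client_config::dfdaemon::Storage;

fn main() {
    let storage = Storage::default();

    // By default the preheat cache may hold up to 10 GiB of piece content...
    assert_eq!(storage.cache_capacity, 10 * 1024 * 1024 * 1024);

    // ...but it only tracks a single task unless cache_tasks_capacity is raised.
    assert_eq!(storage.cache_tasks_capacity, 1);
}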


@@ -26,6 +26,8 @@ sha2.workspace = true
 crc32fast.workspace = true
 base16ct.workspace = true
 fs2.workspace = true
+lru.workspace = true
+bytes.workspace = true
 num_cpus = "1.0"
 bincode = "1.3.3"
 rayon = "1.10.0"


@@ -0,0 +1,386 @@
use std::{borrow::Borrow, collections::HashMap, hash::Hash, hash::Hasher, num::NonZeroUsize, ptr};
#[derive(Debug, Clone, Copy)]
/// KeyRef is a reference to the key.
struct KeyRef<K> {
k: *const K,
}
impl<K: Hash> Hash for KeyRef<K> {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
unsafe {
let key = &*self.k;
key.hash(state)
}
}
}
impl<K: PartialEq> PartialEq for KeyRef<K> {
fn eq(&self, other: &Self) -> bool {
unsafe {
let key1 = &*self.k;
let key2 = &*other.k;
key1.eq(key2)
}
}
}
impl<K: Eq> Eq for KeyRef<K> {}
#[repr(transparent)]
/// KeyWrapper is a wrapper for the key.
struct KeyWrapper<K: ?Sized>(K);
impl<K: ?Sized> KeyWrapper<K> {
/// from_ref creates a new KeyWrapper from a reference to the key.
fn from_ref(key: &K) -> &Self {
unsafe { &*(key as *const K as *const KeyWrapper<K>) }
}
}
impl<K: ?Sized + Hash> Hash for KeyWrapper<K> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.0.hash(state)
}
}
impl<K: ?Sized + PartialEq> PartialEq for KeyWrapper<K> {
#![allow(unknown_lints)]
#[allow(clippy::unconditional_recursion)]
fn eq(&self, other: &Self) -> bool {
self.0.eq(&other.0)
}
}
impl<K: ?Sized + Eq> Eq for KeyWrapper<K> {}
impl<K, Q> Borrow<KeyWrapper<Q>> for KeyRef<K>
where
K: Borrow<Q>,
Q: ?Sized,
{
fn borrow(&self) -> &KeyWrapper<Q> {
unsafe {
let key = &*self.k;
KeyWrapper::from_ref(key.borrow())
}
}
}
struct Entry<K, V> {
key: K,
value: V,
prev: Option<*mut Entry<K, V>>,
next: Option<*mut Entry<K, V>>,
}
impl<K, V> Entry<K, V> {
fn new(key: K, value: V) -> Self {
Self {
key,
value,
prev: None,
next: None,
}
}
}
pub struct LruCache<K, V> {
cap: NonZeroUsize,
map: HashMap<KeyRef<K>, Box<Entry<K, V>>>,
head: Option<*mut Entry<K, V>>,
tail: Option<*mut Entry<K, V>>,
_marker: std::marker::PhantomData<K>,
}
impl<K: Hash + Eq, V> LruCache<K, V> {
/// new creates a new LruCache.
pub fn new(cap: NonZeroUsize) -> Self {
Self {
cap,
map: HashMap::with_capacity(cap.get()),
head: None,
tail: None,
_marker: std::marker::PhantomData,
}
}
/// get gets the value of the key.
pub fn get<'a, Q>(&'a mut self, k: &Q) -> Option<&'a V>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
if let Some(entry) = self.map.get_mut(KeyWrapper::from_ref(k)) {
let entry_ptr: *mut Entry<K, V> = &mut **entry;
unsafe {
self.detach(entry_ptr);
self.attach(entry_ptr);
}
Some(&unsafe { &*entry_ptr }.value)
} else {
None
}
}
/// put puts the key and value into the cache.
pub fn put(&mut self, key: K, value: V) -> Option<V> {
let key_ref = KeyRef { k: &key };
if let Some(existing_entry) = self.map.get_mut(&key_ref) {
let entry_ptr = existing_entry.as_mut() as *mut Entry<K, V>;
let old_value = unsafe {
let old_value = ptr::read(&(*entry_ptr).value);
ptr::write(&mut (*entry_ptr).value, value);
old_value
};
unsafe {
self.detach(entry_ptr);
self.attach(entry_ptr);
}
return Some(old_value);
}
let mut evicted_value = None;
if self.map.len() >= self.cap.get() {
if let Some(tail) = self.tail {
unsafe {
let tail_key_ref = KeyRef { k: &(*tail).key };
self.detach(tail);
if let Some(entry) = self.map.remove(&tail_key_ref) {
evicted_value = Some(entry.value);
}
}
}
}
let new_entry = Box::new(Entry::new(key, value));
let key_ptr = &new_entry.key as *const K;
let entry_ptr = Box::into_raw(new_entry);
unsafe {
self.attach(entry_ptr);
self.map
.insert(KeyRef { k: key_ptr }, Box::from_raw(entry_ptr));
}
evicted_value
}
/// detach detaches the entry from the cache.
unsafe fn detach(&mut self, entry: *mut Entry<K, V>) {
let prev = (*entry).prev;
let next = (*entry).next;
match prev {
Some(prev) => (*prev).next = next,
None => self.head = next,
}
match next {
Some(next) => (*next).prev = prev,
None => self.tail = prev,
}
(*entry).prev = None;
(*entry).next = None;
}
/// attach attaches the entry to the cache.
unsafe fn attach(&mut self, entry: *mut Entry<K, V>) {
match self.head {
Some(old_head) => {
(*entry).next = Some(old_head);
(*old_head).prev = Some(entry);
self.head = Some(entry);
}
None => {
self.head = Some(entry);
self.tail = Some(entry);
}
}
}
/// contains checks whether the key exists in the cache.
pub fn contains<Q>(&self, k: &Q) -> bool
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
self.map.contains_key(KeyWrapper::from_ref(k))
}
/// peek peeks the value of the key. It does not move the key to the front of the cache.
pub fn peek<'a, Q>(&'a self, k: &Q) -> Option<&'a V>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
self.map
.get(KeyWrapper::from_ref(k))
.map(|entry| &entry.value)
}
/// pop_lru pops the least recently used value from the cache.
pub fn pop_lru(&mut self) -> Option<(K, V)> {
if self.is_empty() {
return None;
}
let tail = self.tail?;
unsafe {
self.detach(tail);
let tail_key_ref = KeyRef { k: &(*tail).key };
self.map
.remove(&tail_key_ref)
.map(|entry| (entry.key, entry.value))
}
}
/// is_empty checks whether the cache is empty.
pub fn is_empty(&self) -> bool {
self.map.is_empty()
}
}
unsafe impl<K: Send, V: Send> Send for LruCache<K, V> {}
unsafe impl<K: Sync, V: Sync> Sync for LruCache<K, V> {}
impl<K, V> Drop for LruCache<K, V> {
fn drop(&mut self) {
self.map.clear();
self.head = None;
self.tail = None;
}
}
#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
#[test]
fn test_put() {
let mut cache = LruCache::new(NonZeroUsize::new(3).unwrap());
assert_eq!(cache.put("key1".to_string(), "value1".to_string()), None);
assert_eq!(cache.put("key2".to_string(), "value2".to_string()), None);
assert_eq!(cache.put("key3".to_string(), "value3".to_string()), None);
assert_eq!(
cache.put("key2".to_string(), "value2_updated".to_string()),
Some("value2".to_string())
);
assert_eq!(cache.get("key2"), Some(&"value2_updated".to_string()));
assert_eq!(
cache.put("key4".to_string(), "value4".to_string()),
Some("value1".to_string())
);
assert_eq!(cache.get("key1"), None);
assert!(cache.contains("key2"));
assert!(cache.contains("key3"));
assert!(cache.contains("key4"));
}
#[test]
fn test_get() {
let mut cache = LruCache::new(NonZeroUsize::new(3).unwrap());
cache.put("key1".to_string(), "value1".to_string());
cache.put("key2".to_string(), "value2".to_string());
cache.put("key3".to_string(), "value3".to_string());
assert_eq!(cache.get("key1"), Some(&"value1".to_string()));
assert_eq!(cache.get("key2"), Some(&"value2".to_string()));
assert_eq!(cache.get("key3"), Some(&"value3".to_string()));
assert_eq!(cache.get("key4"), None);
cache.put("key4".to_string(), "value4".to_string());
assert_eq!(cache.get("key1"), None);
assert_eq!(cache.get("key2"), Some(&"value2".to_string()));
assert_eq!(cache.get("key3"), Some(&"value3".to_string()));
assert_eq!(cache.get("key4"), Some(&"value4".to_string()));
}
#[test]
fn test_peek() {
let mut cache = LruCache::new(NonZeroUsize::new(3).unwrap());
cache.put("key1".to_string(), "value1".to_string());
cache.put("key2".to_string(), "value2".to_string());
cache.put("key3".to_string(), "value3".to_string());
assert_eq!(cache.peek("key1"), Some(&"value1".to_string()));
assert_eq!(cache.peek("key2"), Some(&"value2".to_string()));
assert_eq!(cache.peek("key3"), Some(&"value3".to_string()));
assert_eq!(cache.peek("key4"), None);
cache.put("key4".to_string(), "value4".to_string());
assert_eq!(cache.peek("key1"), None);
assert_eq!(cache.peek("key2"), Some(&"value2".to_string()));
assert_eq!(cache.peek("key3"), Some(&"value3".to_string()));
assert_eq!(cache.peek("key4"), Some(&"value4".to_string()));
let mut cache = LruCache::new(NonZeroUsize::new(2).unwrap());
cache.put("key1".to_string(), "value1".to_string());
cache.put("key2".to_string(), "value2".to_string());
cache.peek("key1");
cache.put("key3".to_string(), "value3".to_string());
assert_eq!(cache.peek("key1"), None);
let mut cache = LruCache::new(NonZeroUsize::new(2).unwrap());
cache.put("key1".to_string(), "value1".to_string());
cache.put("key2".to_string(), "value2".to_string());
cache.get("key1");
cache.put("key3".to_string(), "value3".to_string());
assert_eq!(cache.peek("key2"), None);
assert_eq!(cache.peek("key1"), Some(&"value1".to_string()));
}
#[test]
fn test_contains() {
let mut cache = LruCache::new(NonZeroUsize::new(5).unwrap());
let test_cases = vec![
("piece_1", Bytes::from("data 1"), false),
("piece_2", Bytes::from("data 2"), true),
("piece_3", Bytes::from("data 3"), false),
("piece_4", Bytes::from("data 4"), true),
("piece_5", Bytes::from("data 5"), true),
("piece_6", Bytes::from("data 6"), true),
("piece_7", Bytes::from("data 7"), true),
];
for (piece_id, piece_content, _) in test_cases[0..6].iter() {
cache.put(piece_id.to_string(), piece_content.clone());
}
let _ = cache.get("piece_2");
let (piece_id, piece_content, _) = &test_cases[6];
cache.put(piece_id.to_string(), piece_content.clone());
for (piece_id, _, expected_existence) in test_cases {
let exists = cache.contains(piece_id);
assert_eq!(exists, expected_existence);
}
}
}
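One more behavior-focused sketch (not part of this commit) that could sit alongside the tests above, exercising the eviction order, the value returned by put when it evicts, and pop_lru:

#[test]
fn test_eviction_order() {
    let mut cache = LruCache::new(NonZeroUsize::new(2).unwrap());

    assert_eq!(cache.put("a".to_string(), 1), None);
    assert_eq!(cache.put("b".to_string(), 2), None);

    // Touching "a" makes it the most recently used entry.
    assert_eq!(cache.get("a"), Some(&1));

    // Inserting a third entry evicts the least recently used entry ("b")
    // and returns its value.
    assert_eq!(cache.put("c".to_string(), 3), Some(2));
    assert!(!cache.contains("b"));

    // pop_lru removes and returns the current least recently used pair.
    assert_eq!(cache.pop_lru(), Some(("a".to_string(), 1)));
    assert_eq!(cache.pop_lru(), Some(("c".to_string(), 3)));
    assert!(cache.is_empty());
}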


@@ -0,0 +1,246 @@
/*
* Copyright 2025 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use bytes::Bytes;
use dragonfly_api::common::v2::Range;
use dragonfly_client_core::{Error, Result};
use lru_cache::LruCache;
use std::cmp::{max, min};
use std::collections::HashMap;
use std::io::Cursor;
use std::num::NonZeroUsize;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncReadExt, BufReader};
use tokio::sync::RwLock;
use tracing::info;
mod lru_cache;
#[derive(Clone, Debug)]
/// Task is the task content in the cache.
struct Task {
/// id is the id of the task.
id: String,
/// content_length is the length of the task content.
content_length: u64,
/// pieces is the pieces content of the task.
pieces: Arc<RwLock<HashMap<String, Bytes>>>,
}
impl Task {
/// new creates a new task.
fn new(id: String, content_length: u64) -> Self {
Self {
id,
content_length,
pieces: Arc::new(RwLock::new(HashMap::new())),
}
}
/// write_piece writes the piece content to the task.
async fn write_piece(&self, piece_id: &str, piece: Bytes) {
let mut pieces = self.pieces.write().await;
pieces.insert(piece_id.to_string(), piece);
}
/// read_piece reads the piece content from the task.
async fn read_piece(&self, piece_id: &str) -> Option<Bytes> {
let pieces = self.pieces.read().await;
pieces.get(piece_id).cloned()
}
/// contains checks whether the piece exists in the task.
async fn contains(&self, piece_id: &str) -> bool {
let pieces = self.pieces.read().await;
pieces.contains_key(piece_id)
}
/// content_length returns the content length of the task.
fn content_length(&self) -> u64 {
self.content_length
}
}
/// Cache is the cache for storing piece content by LRU algorithm.
///
/// Cache storage:
/// 1. Users can create preheating jobs and preheat tasks to memory and disk by setting `load_to_cache` to `true`.
/// For more details, refer to https://github.com/dragonflyoss/api/blob/main/proto/common.proto#L443.
/// 2. If the download hits the memory cache, it will be faster than reading from the disk, because there is no
/// page cache for the first read.
/// ```
///
/// 1.Preheat
/// |
/// |
/// +--------------------------------------------------+
/// | | Peer |
/// | | +-----------+ |
/// | | -- Partial -->| Cache | |
/// | | | +-----------+ |
/// | v | | | |
/// | Download | Miss | |
/// | Task -->| | --- Hit ------>|<-- 2.Download
/// | | | ^ |
/// | | v | |
/// | | +-----------+ | |
/// | -- Full -->| Disk |---------- |
/// | +-----------+ |
/// | |
/// +--------------------------------------------------+
/// ```
/// Task is the metadata of the task.
#[derive(Clone)]
pub struct Cache {
/// size is the size of the cache in bytes.
size: u64,
/// capacity is the maximum capacity of the cache in bytes.
capacity: u64,
/// tasks stores the tasks with their task id.
tasks: Arc<RwLock<LruCache<String, Task>>>,
}
/// Cache implements the cache for storing piece content by LRU algorithm.
impl Cache {
/// new creates a new cache with the specified capacity.
pub fn new(capacity: usize, tasks_capacity: usize) -> Result<Self> {
let capacity = NonZeroUsize::new(capacity).ok_or(Error::InvalidParameter)?;
let tasks_capacity = NonZeroUsize::new(tasks_capacity).ok_or(Error::InvalidParameter)?;
let tasks = Arc::new(RwLock::new(LruCache::new(tasks_capacity)));
Ok(Cache {
size: 0,
capacity: capacity.get() as u64,
tasks,
})
}
/// read_piece reads the piece from the cache.
pub async fn read_piece(
&self,
task_id: &str,
piece_id: &str,
offset: u64,
length: u64,
range: Option<Range>,
) -> Result<impl AsyncRead> {
// Try to get the piece content from the task.
let mut tasks = self.tasks.write().await;
let Some(task) = tasks.get(task_id) else {
return Err(Error::TaskNotFound(task_id.to_string()));
};
let Some(piece_content) = task.read_piece(piece_id).await else {
return Err(Error::PieceNotFound(piece_id.to_string()));
};
// Calculate the range of bytes to return based on the range provided.
let (target_offset, target_length) = if let Some(range) = range {
let target_offset = max(offset, range.start) - offset;
let target_length =
min(offset + length - 1, range.start + range.length - 1) - target_offset - offset
+ 1;
(target_offset as usize, target_length as usize)
} else {
(0, length as usize)
};
// Check if the target range is valid.
let begin = target_offset;
let end = target_offset + target_length;
if begin >= piece_content.len() || end > piece_content.len() {
return Err(Error::InvalidParameter);
}
let reader = BufReader::new(Cursor::new(piece_content.slice(begin..end)));
Ok(reader)
}
/// write_piece writes the piece content to the cache.
pub async fn write_piece<R: AsyncRead + Unpin + ?Sized>(
&self,
task_id: &str,
piece_id: &str,
reader: &mut R,
length: u64,
) -> Result<()> {
let mut tasks = self.tasks.write().await;
let Some(task) = tasks.get(task_id) else {
return Err(Error::TaskNotFound(task_id.to_string()));
};
// The piece already exists in the cache.
if task.contains(piece_id).await {
return Err(Error::Unknown(format!(
"overwrite existing piece {}",
piece_id
)));
}
let mut buffer = Vec::with_capacity(length as usize);
match reader.read_to_end(&mut buffer).await {
Ok(_) => {
task.write_piece(piece_id, Bytes::from(buffer)).await;
Ok(())
}
Err(err) => Err(Error::Unknown(format!(
"Failed to read piece data for {}: {}",
piece_id, err
))),
}
}
/// put_task puts a new task into the cache, constrained by the capacity of the cache.
pub async fn put_task(&mut self, task_id: &str, content_length: u64) {
if content_length == 0 {
return;
}
let mut tasks = self.tasks.write().await;
let new_task = Task::new(task_id.to_string(), content_length);
while self.size + content_length > self.capacity {
match tasks.pop_lru() {
Some((_, task)) => {
self.size -= task.content_length();
info!("evicted task in cache: {}", task.id);
}
None => {
break;
}
}
}
tasks.put(task_id.to_string(), new_task);
self.size += content_length;
}
/// contains_task checks whether the task exists in the cache.
pub async fn contains_task(&self, id: &str) -> bool {
let tasks = self.tasks.read().await;
tasks.contains(id)
}
/// contains_piece checks whether the piece exists in the specified task.
pub async fn contains_piece(&self, task_id: &str, piece_id: &str) -> bool {
let tasks = self.tasks.read().await;
if let Some(task) = tasks.peek(task_id) {
task.contains(piece_id).await
} else {
false
}
}
}
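A minimal end-to-end sketch (not part of this commit) of the Cache API above; the crate paths, the piece id format, and the use of dragonfly-client-core's Result for main are assumptions:

use bytes::Bytes;
use tokio::io::AsyncReadExt;

// Assumed import path; Cache is public in the storage crate's cache module.
use dragonfly_client_storage::cache::Cache;

#[tokio::main]
async fn main() -> dragonfly_client_core::Result<()> {
    // Up to 64 MiB of piece content overall, tracking at most 4 tasks.
    let mut cache = Cache::new(64 * 1024 * 1024, 4)?;

    // A task has to be registered before pieces can be written to it,
    // otherwise write_piece returns TaskNotFound.
    cache.put_task("task-1", 1024).await;
    assert!(cache.contains_task("task-1").await);

    // Write a piece, then read it back through the AsyncRead returned by read_piece.
    let content = Bytes::from("hello piece");
    let mut src: &[u8] = &content;
    cache
        .write_piece("task-1", "task-1-0", &mut src, content.len() as u64)
        .await?;

    let mut reader = cache
        .read_piece("task-1", "task-1-0", 0, content.len() as u64, None)
        .await?;
    let mut out = Vec::new();
    reader.read_to_end(&mut out).await?;
    assert_eq!(&out[..], &content[..]);
    Ok(())
}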


@@ -25,8 +25,11 @@ use std::path::PathBuf;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::io::AsyncRead;
-use tracing::{debug, error, instrument, warn};
+use tokio_util::either::Either;
+use tokio_util::io::InspectReader;
+use tracing::{debug, error, info, instrument, warn};

+pub mod cache;
 pub mod content;
 pub mod metadata;
 pub mod storage_engine;
@@ -44,6 +47,9 @@ pub struct Storage {
     /// content implements the content storage.
     content: content::Content,
+
+    /// cache implements the cache for preheat task.
+    cache: cache::Cache,
 }

 /// Storage implements the storage.
@@ -53,10 +59,16 @@ impl Storage {
     pub async fn new(config: Arc<Config>, dir: &Path, log_dir: PathBuf) -> Result<Self> {
         let metadata = metadata::Metadata::new(config.clone(), dir, &log_dir)?;
         let content = content::Content::new(config.clone(), dir).await?;
+        let cache = cache::Cache::new(
+            config.storage.cache_capacity,
+            config.storage.cache_tasks_capacity,
+        )?;

         Ok(Storage {
             config,
             metadata,
             content,
+            cache,
         })
     }
@@ -348,8 +360,34 @@ impl Storage {
         offset: u64,
         length: u64,
         reader: &mut R,
+        load_to_cache: bool,
     ) -> Result<metadata::Piece> {
-        let response = self.content.write_piece(task_id, offset, reader).await?;
+        let response = if load_to_cache {
+            // Load piece content to cache.
+            let mut buffer = Vec::new();
+            let mut inspect_reader = InspectReader::new(reader, |bytes| {
+                buffer.extend_from_slice(bytes);
+            });
+
+            match self
+                .cache
+                .write_piece(task_id, piece_id, &mut inspect_reader, length)
+                .await
+            {
+                Ok(_) => {
+                    info!("load piece to cache. piece-id: {}", piece_id);
+                }
+                Err(err) => {
+                    error!("load piece to cache failed: {}", err);
+                }
+            }
+
+            self.content
+                .write_piece(task_id, offset, &mut &buffer[..])
+                .await?
+        } else {
+            self.content.write_piece(task_id, offset, reader).await?
+        };

         let digest = Digest::new(Algorithm::Crc32, response.hash);

         self.metadata.download_piece_finished(
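The load-to-cache branch above tees the download stream with tokio-util's InspectReader so that a single pass over the bytes can feed both the in-memory cache and the disk writer. A self-contained sketch (not part of this commit) of that pattern:

use tokio::io::AsyncReadExt;
use tokio_util::io::InspectReader;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Stand-in for the piece download stream.
    let source: &[u8] = b"piece content";

    // Every chunk read through the InspectReader is also appended to buffer,
    // so the same bytes can be handed to a second consumer afterwards.
    let mut buffer = Vec::new();
    let mut inspect_reader = InspectReader::new(source, |bytes| {
        buffer.extend_from_slice(bytes);
    });

    // First consumer drains the reader (cache.write_piece in the code above).
    let mut drained = Vec::new();
    inspect_reader.read_to_end(&mut drained).await?;
    drop(inspect_reader);

    // Second consumer replays the captured bytes (content.write_piece above).
    assert_eq!(buffer, drained);
    Ok(())
}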
@@ -362,6 +400,7 @@ impl Storage {
     }

     /// download_piece_from_parent_finished is used for downloading piece from parent.
+    #[allow(clippy::too_many_arguments)]
     #[instrument(skip_all)]
     pub async fn download_piece_from_parent_finished<R: AsyncRead + Unpin + ?Sized>(
         &self,
@@ -371,8 +410,34 @@ impl Storage {
         expected_digest: &str,
         parent_id: &str,
         reader: &mut R,
+        load_to_cache: bool,
+        length: u64,
     ) -> Result<metadata::Piece> {
-        let response = self.content.write_piece(task_id, offset, reader).await?;
+        let response = if load_to_cache {
+            // Load piece content to cache.
+            let mut buffer = Vec::new();
+            let mut inspect_reader = InspectReader::new(reader, |bytes| {
+                buffer.extend_from_slice(bytes);
+            });
+
+            match self
+                .cache
+                .write_piece(task_id, piece_id, &mut inspect_reader, length)
+                .await
+            {
+                Ok(_) => {
+                    info!("load piece to cache. piece-id: {}", piece_id);
+                }
+                Err(err) => {
+                    error!("load piece to cache failed: {}", err);
+                }
+            }
+
+            self.content
+                .write_piece(task_id, offset, &mut &buffer[..])
+                .await?
+        } else {
+            self.content.write_piece(task_id, offset, reader).await?
+        };

         let length = response.length;
         let digest = Digest::new(Algorithm::Crc32, response.hash);
@@ -418,6 +483,23 @@ impl Storage {
             // Get the piece metadata and return the content of the piece.
             match self.metadata.get_piece(piece_id) {
                 Ok(Some(piece)) => {
+                    // Try to upload piece content from cache.
+                    if self.cache.contains_piece(task_id, piece_id).await {
+                        match self
+                            .cache
+                            .read_piece(task_id, piece_id, piece.offset, piece.length, range)
+                            .await
+                        {
+                            Ok(cache_reader) => {
+                                // Finish uploading the task.
+                                self.metadata.upload_task_finished(task_id)?;
+                                return Ok(Either::Left(cache_reader));
+                            }
+                            Err(_err) => {}
+                        }
+                    }
+
+                    // Upload piece content from storage when cache entry is not hit.
                     match self
                         .content
                         .read_piece(task_id, piece.offset, piece.length, range)
@@ -426,7 +508,7 @@ impl Storage {
                         Ok(reader) => {
                             // Finish uploading the task.
                             self.metadata.upload_task_finished(task_id)?;
-                            Ok(reader)
+                            Ok(Either::Right(reader))
                         }
                         Err(err) => {
                             // Failed uploading the task.
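Because the upload path can now hand back either an in-memory cache reader or a disk reader, it wraps them in tokio-util's Either, which implements AsyncRead when both sides do. A minimal sketch (not part of this commit) with illustrative stand-in reader types:

use std::io::Cursor;

use tokio::io::{AsyncRead, AsyncReadExt};
use tokio_util::either::Either;

// One concrete return type for two different AsyncRead implementations,
// mirroring the Either::Left(cache_reader) / Either::Right(reader) split above.
fn pick_reader(hit_cache: bool) -> impl AsyncRead + Unpin {
    if hit_cache {
        // Stand-in for the in-memory cache reader.
        Either::Left(Cursor::new(b"from cache".to_vec()))
    } else {
        // Stand-in for the on-disk reader.
        Either::Right(tokio::io::empty())
    }
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut out = Vec::new();
    pick_reader(true).read_to_end(&mut out).await?;
    assert_eq!(out, b"from cache".to_vec());
    Ok(())
}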
@@ -496,6 +578,15 @@ impl Storage {
         }
     }

+    /// add_task_to_cache adds the task to the cache.
+    #[instrument(skip_all)]
+    pub async fn add_task_to_cache(&self, task_id: &str, content_length: u64) {
+        let mut cache = self.cache.clone();
+        if !cache.contains_task(task_id).await {
+            cache.put_task(task_id, content_length).await;
+        }
+    }
+
     /// get_piece returns the piece metadata.
     #[instrument(skip_all)]
     pub fn get_piece(&self, piece_id: &str) -> Result<Option<metadata::Piece>> {


@@ -470,6 +470,7 @@ impl Piece {
         length: u64,
         parent: piece_collector::CollectedParent,
         is_prefetch: bool,
+        load_to_cache: bool,
     ) -> Result<metadata::Piece> {
         // Span record the piece_id.
         Span::current().record("piece_id", piece_id);
@@ -532,6 +533,8 @@ impl Piece {
                 digest.as_str(),
                 parent.id.as_str(),
                 &mut reader,
+                load_to_cache,
+                length,
             )
             .await
         {
@@ -568,6 +571,7 @@ impl Piece {
         length: u64,
         request_header: HeaderMap,
         is_prefetch: bool,
+        load_to_cache: bool,
         object_storage: Option<ObjectStorage>,
         hdfs: Option<Hdfs>,
     ) -> Result<metadata::Piece> {
@@ -691,6 +695,7 @@ impl Piece {
                 offset,
                 length,
                 &mut response.reader,
+                load_to_cache,
             )
             .await
         {


@@ -683,6 +683,7 @@ impl Task {
                 remaining_interested_pieces.clone(),
                 request.is_prefetch,
                 request.need_piece_content,
+                request.load_to_cache,
                 download_progress_tx.clone(),
                 in_stream_tx.clone(),
             )
@@ -926,6 +927,7 @@ impl Task {
         interested_pieces: Vec<metadata::Piece>,
         is_prefetch: bool,
         need_piece_content: bool,
+        load_to_cache: bool,
         download_progress_tx: Sender<Result<DownloadTaskResponse, Status>>,
         in_stream_tx: Sender<AnnouncePeerRequest>,
     ) -> ClientResult<Vec<metadata::Piece>> {
@@ -961,6 +963,12 @@ impl Task {
             self.config.download.concurrent_piece_count as usize,
         ));

+        if load_to_cache {
+            self.storage
+                .add_task_to_cache(task_id, task.content_length().unwrap_or(0))
+                .await;
+        }
+
         // Download the pieces from the parents.
         while let Some(collect_piece) = piece_collector_rx.recv().await {
             if interrupt.load(Ordering::SeqCst) {
@@ -985,6 +993,7 @@ impl Task {
             finished_pieces: Arc<Mutex<Vec<metadata::Piece>>>,
             is_prefetch: bool,
             need_piece_content: bool,
+            load_to_cache: bool,
         ) -> ClientResult<metadata::Piece> {
             // Limit the concurrent piece count.
             let _permit = semaphore.acquire().await.unwrap();
@@ -1005,6 +1014,7 @@ impl Task {
                     length,
                     parent.clone(),
                     is_prefetch,
+                    load_to_cache,
                 )
                 .await
                 .map_err(|err| {
@@ -1138,6 +1148,7 @@ impl Task {
                     finished_pieces.clone(),
                     is_prefetch,
                     need_piece_content,
+                    load_to_cache,
                 )
                 .in_current_span(),
             );
@@ -1234,6 +1245,13 @@ impl Task {
         // Initialize the finished pieces.
         let mut finished_pieces: Vec<metadata::Piece> = Vec::new();

+        if request.load_to_cache {
+            self.storage
+                .add_task_to_cache(task_id, task.content_length().unwrap_or(0))
+                .await;
+            info!("add task to cache: {}", task_id);
+        }
+
         // Download the piece from the local.
         let mut join_set = JoinSet::new();
         let semaphore = Arc::new(Semaphore::new(
@@ -1251,6 +1269,7 @@ impl Task {
             request_header: HeaderMap,
             is_prefetch: bool,
             need_piece_content: bool,
+            load_to_cache: bool,
             piece_manager: Arc<piece::Piece>,
             semaphore: Arc<Semaphore>,
             download_progress_tx: Sender<Result<DownloadTaskResponse, Status>>,
@@ -1274,6 +1293,7 @@ impl Task {
                     length,
                     request_header,
                     is_prefetch,
+                    load_to_cache,
                     object_storage,
                     hdfs,
                 )
@@ -1378,6 +1398,7 @@ impl Task {
                 request_header.clone(),
                 request.is_prefetch,
                 request.need_piece_content,
+                request.load_to_cache,
                 self.piece.clone(),
                 semaphore.clone(),
                 download_progress_tx.clone(),
@@ -1639,6 +1660,7 @@ impl Task {
             length: u64,
             request_header: HeaderMap,
             is_prefetch: bool,
+            load_to_cache: bool,
             piece_manager: Arc<piece::Piece>,
             semaphore: Arc<Semaphore>,
             download_progress_tx: Sender<Result<DownloadTaskResponse, Status>>,
@@ -1661,6 +1683,7 @@ impl Task {
                     length,
                     request_header,
                     is_prefetch,
+                    load_to_cache,
                     object_storage,
                     hdfs,
                 )
@@ -1719,6 +1742,7 @@ impl Task {
                 interested_piece.length,
                 request_header.clone(),
                 request.is_prefetch,
+                request.load_to_cache,
                 self.piece.clone(),
                 semaphore.clone(),
                 download_progress_tx.clone(),