feat: Add support for CacheTask in the storage layer (#1278)

This pull request modifies the Cache's size field type, adds storage layer support for CacheTask, and implements related processing for CacheTask metadata.

- Refactor the Cache struct, changing the size field from u64 to Arc<AtomicU64> to achieve interior mutability and state sharing across clones (see the sketch after this description).
- Create a new CacheTask metadata structure and implement its associated processing functions.
- Add CacheTask support to the storage layer, implementing all functions required for its download functionality.

This provides storage-layer support for CacheTask, laying the foundation for integrating CacheTask into the client in the future.
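
For illustration, a minimal sketch (not part of the diff) of why Arc<AtomicU64> gives interior mutability and shared state: every clone of the Arc points at the same counter, so a cloned Cache can update size without &mut self.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

fn main() {
    // Both handles point at the same atomic counter.
    let size = Arc::new(AtomicU64::new(0));
    let shared = size.clone();

    // Updating through one handle is visible through the other,
    // without any &mut access.
    shared.fetch_add(1024, Ordering::Relaxed);
    assert_eq!(size.load(Ordering::Relaxed), 1024);

    shared.fetch_sub(512, Ordering::Relaxed);
    assert_eq!(size.load(Ordering::Relaxed), 512);
}
```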

Signed-off-by: fu220 <2863318196@qq.com>
3 changed files with 749 additions and 7 deletions


@@ -22,6 +22,7 @@ use lru_cache::LruCache;
use std::cmp::{max, min};
use std::collections::HashMap;
use std::io::Cursor;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::io::{AsyncRead, BufReader};
use tokio::sync::RwLock;
@@ -110,7 +111,7 @@ pub struct Cache {
config: Arc<Config>,
/// size is the size of the cache in bytes.
size: Arc<AtomicU64>,
/// capacity is the maximum capacity of the cache in bytes.
capacity: u64,
@@ -119,13 +120,22 @@ pub struct Cache {
tasks: Arc<RwLock<LruCache<String, Task>>>,
}
/// WriteCachePieceResponse is the response of writing a cache piece.
pub struct WriteCachePieceResponse {
/// length is the length of the cache piece.
pub length: u64,
/// hash is the hash of the cache piece.
pub hash: String,
}
/// Cache implements the cache for storing piece content by LRU algorithm.
impl Cache {
/// new creates a new cache with the specified capacity.
pub fn new(config: Arc<Config>) -> Self {
Cache {
config: config.clone(),
size: Arc::new(AtomicU64::new(0)),
capacity: config.storage.cache_capacity.as_u64(),
// LRU cache capacity is set to usize::MAX to avoid evicting tasks. LRU cache will evict tasks
// by cache capacity (cache size) itself, and uses pop_lru to evict the least recently used task.
@@ -221,10 +231,11 @@ impl Cache {
}
let mut tasks = self.tasks.write().await;
while self.size.load(Ordering::Relaxed) + content_length > self.capacity {
match tasks.pop_lru() {
Some((_, task)) => {
self.size
.fetch_sub(task.content_length(), Ordering::Relaxed);
}
None => {
break;
@@ -234,7 +245,7 @@ impl Cache {
let task = Task::new(content_length);
tasks.put(task_id.to_string(), task);
self.size.fetch_add(content_length, Ordering::Relaxed);
}
pub async fn delete_task(&mut self, task_id: &str) -> Result<()> {
@@ -243,7 +254,8 @@ impl Cache {
return Err(Error::TaskNotFound(task_id.to_string()));
};
self.size
.fetch_sub(task.content_length(), Ordering::Relaxed);
Ok(())
}
@@ -306,7 +318,7 @@ mod tests {
for (config, expected_size, expected_capacity) in test_cases {
let cache = Cache::new(Arc::new(config));
assert_eq!(cache.size.load(Ordering::Relaxed), expected_size);
assert_eq!(cache.capacity, expected_capacity);
}
}
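
The eviction loop above combines the shared atomic size with LruCache::pop_lru. Below is a hedged, std-only sketch of the same pattern (hypothetical types; the real code uses the crate's LruCache and the Arc<AtomicU64> counter):

```rust
use std::collections::VecDeque;

// Hypothetical, std-only sketch of the eviction pattern above:
// pop least-recently-used entries until the new task fits.
fn put_task(
    lru: &mut VecDeque<(String, u64)>, // front = least recently used
    size: &mut u64,
    capacity: u64,
    task_id: &str,
    content_length: u64,
) {
    while *size + content_length > capacity {
        match lru.pop_front() {
            Some((_, evicted_length)) => *size -= evicted_length,
            None => break,
        }
    }
    lru.push_back((task_id.to_string(), content_length));
    *size += content_length;
}
```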


@@ -27,6 +27,7 @@ use std::time::Duration;
use tokio::io::AsyncRead;
use tokio::time::sleep;
use tokio_util::either::Either;
use tokio_util::io::InspectReader;
use tracing::{debug, error, info, instrument, warn};
pub mod cache;
@@ -363,6 +364,102 @@ impl Storage {
});
}
/// prepare_download_cache_task_started prepares the metadata of the cache task when the cache task downloads
/// started.
pub async fn prepare_download_cache_task_started(
&self,
id: &str,
) -> Result<metadata::CacheTask> {
self.metadata
.download_cache_task_started(id, None, None, None)
}
/// download_cache_task_started updates the metadata of the cache task and creates the cache task content
/// when the cache task downloads started.
#[instrument(skip_all)]
pub async fn download_cache_task_started(
&self,
id: &str,
piece_length: u64,
content_length: u64,
response_header: Option<HeaderMap>,
) -> Result<metadata::CacheTask> {
let mut cache = self.cache.clone();
cache.put_task(id, content_length).await;
self.metadata.download_cache_task_started(
id,
Some(piece_length),
Some(content_length),
response_header,
)
}
/// download_cache_task_finished updates the metadata of the cache task when the cache task downloads finished.
#[instrument(skip_all)]
pub fn download_cache_task_finished(&self, id: &str) -> Result<metadata::CacheTask> {
self.metadata.download_cache_task_finished(id)
}
/// download_cache_task_failed updates the metadata of the cache task when the cache task downloads failed.
#[instrument(skip_all)]
pub async fn download_cache_task_failed(&self, id: &str) -> Result<metadata::CacheTask> {
self.metadata.download_cache_task_failed(id)
}
/// prefetch_cache_task_started updates the metadata of the cache task when the cache task prefetches started.
#[instrument(skip_all)]
pub async fn prefetch_cache_task_started(&self, id: &str) -> Result<metadata::CacheTask> {
self.metadata.prefetch_cache_task_started(id)
}
/// prefetch_cache_task_failed updates the metadata of the cache task when the cache task prefetches failed.
#[instrument(skip_all)]
pub async fn prefetch_cache_task_failed(&self, id: &str) -> Result<metadata::CacheTask> {
self.metadata.prefetch_cache_task_failed(id)
}
/// upload_cache_task_finished updates the metadata of the cache task when the cache task uploads finished.
#[instrument(skip_all)]
pub fn upload_cache_task_finished(&self, id: &str) -> Result<metadata::CacheTask> {
self.metadata.upload_cache_task_finished(id)
}
/// get_cache_task returns the cache task metadata.
#[instrument(skip_all)]
pub fn get_cache_task(&self, id: &str) -> Result<Option<metadata::CacheTask>> {
self.metadata.get_cache_task(id)
}
/// is_cache_task_exists returns whether the cache task exists.
#[instrument(skip_all)]
pub fn is_cache_task_exists(&self, id: &str) -> Result<bool> {
self.metadata.is_cache_task_exists(id)
}
/// get_cache_tasks returns the cache task metadatas.
#[instrument(skip_all)]
pub fn get_cache_tasks(&self) -> Result<Vec<metadata::CacheTask>> {
self.metadata.get_cache_tasks()
}
/// delete_cache_task deletes the cache task metadatas, cache task content and piece metadatas.
#[instrument(skip_all)]
pub async fn delete_cache_task(&self, id: &str) {
self.metadata
.delete_cache_task(id)
.unwrap_or_else(|err| error!("delete cache task metadata failed: {}", err));
self.metadata.delete_pieces(id).unwrap_or_else(|err| {
error!("delete cache piece metadatas failed: {}", err);
});
let mut cache = self.cache.clone();
cache.delete_task(id).await.unwrap_or_else(|err| {
info!("delete cache task from cache failed: {}", err);
});
}
/// create_persistent_cache_piece creates a new persistent cache piece.
#[instrument(skip_all)]
pub async fn create_persistent_cache_piece<R: AsyncRead + Unpin + ?Sized>(
@@ -812,4 +909,269 @@ impl Storage {
}
}
}
/// download_cache_piece_started updates the metadata of the cache piece and writes
/// the data of cache piece to file when the cache piece downloads started.
#[instrument(skip_all)]
pub async fn download_cache_piece_started(
&self,
piece_id: &str,
number: u32,
) -> Result<metadata::Piece> {
// Wait for the piece to be finished.
match self.wait_for_cache_piece_finished(piece_id).await {
Ok(piece) => Ok(piece),
// If piece is not found or wait timeout, create piece metadata.
Err(_) => self.metadata.download_piece_started(piece_id, number),
}
}
/// download_cache_piece_from_source_finished is used for downloading cache piece from source.
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)]
pub async fn download_cache_piece_from_source_finished<R: AsyncRead + Unpin + ?Sized>(
&self,
piece_id: &str,
task_id: &str,
offset: u64,
length: u64,
reader: &mut R,
timeout: Duration,
) -> Result<metadata::Piece> {
tokio::select! {
piece = self.handle_downloaded_cache_piece_from_source_finished(piece_id, task_id, offset, length, reader) => {
piece
}
_ = sleep(timeout) => {
Err(Error::DownloadPieceFinished(piece_id.to_string()))
}
}
}
// handle_downloaded_cache_piece_from_source_finished handles the downloaded cache piece from source.
#[instrument(skip_all)]
async fn handle_downloaded_cache_piece_from_source_finished<R: AsyncRead + Unpin + ?Sized>(
&self,
piece_id: &str,
task_id: &str,
offset: u64,
length: u64,
reader: &mut R,
) -> Result<metadata::Piece> {
let mut buffer = Vec::with_capacity(length as usize);
let mut writer = std::io::Cursor::new(&mut buffer);
let mut hasher = crc32fast::Hasher::new();
let mut tee = InspectReader::new(reader, |bytes| {
hasher.update(bytes);
});
tokio::io::copy(&mut tee, &mut writer).await?;
let hash = hasher.finalize().to_string();
let content = bytes::Bytes::from(buffer);
let content_length = content.len() as u64;
self.cache.write_piece(task_id, piece_id, content).await?;
debug!("put piece to cache: {}", piece_id);
let digest = Digest::new(Algorithm::Crc32, hash);
self.metadata.download_piece_finished(
piece_id,
offset,
content_length,
digest.to_string().as_str(),
None,
)
}
/// download_cache_piece_from_parent_finished is used for downloading cache piece from parent.
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)]
pub async fn download_cache_piece_from_parent_finished<R: AsyncRead + Unpin + ?Sized>(
&self,
piece_id: &str,
task_id: &str,
offset: u64,
length: u64,
expected_digest: &str,
parent_id: &str,
reader: &mut R,
timeout: Duration,
) -> Result<metadata::Piece> {
tokio::select! {
piece = self.handle_downloaded_cache_piece_from_parent_finished(piece_id, task_id, offset, length, expected_digest, parent_id, reader) => {
piece
}
_ = sleep(timeout) => {
Err(Error::DownloadPieceFinished(piece_id.to_string()))
}
}
}
// handle_downloaded_cache_piece_from_parent_finished handles the downloaded cache piece from parent.
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)]
async fn handle_downloaded_cache_piece_from_parent_finished<R: AsyncRead + Unpin + ?Sized>(
&self,
piece_id: &str,
task_id: &str,
offset: u64,
length: u64,
expected_digest: &str,
parent_id: &str,
reader: &mut R,
) -> Result<metadata::Piece> {
let mut buffer = Vec::with_capacity(length as usize);
let mut writer = std::io::Cursor::new(&mut buffer);
let mut hasher = crc32fast::Hasher::new();
let mut tee = InspectReader::new(reader, |bytes| {
hasher.update(bytes);
});
tokio::io::copy(&mut tee, &mut writer).await?;
let hash = hasher.finalize().to_string();
let content = bytes::Bytes::from(buffer);
let content_length = content.len() as u64;
self.cache.write_piece(task_id, piece_id, content).await?;
debug!("put piece to cache: {}", piece_id);
let digest = Digest::new(Algorithm::Crc32, hash);
// Check the digest of the piece.
if expected_digest != digest.to_string() {
return Err(Error::DigestMismatch(
expected_digest.to_string(),
digest.to_string(),
));
}
self.metadata.download_piece_finished(
piece_id,
offset,
content_length,
digest.to_string().as_str(),
Some(parent_id.to_string()),
)
}
/// download_cache_piece_failed updates the metadata of the cache piece when the cache piece downloads failed.
#[instrument(skip_all)]
pub fn download_cache_piece_failed(&self, piece_id: &str) -> Result<()> {
self.metadata.download_piece_failed(piece_id)
}
/// upload_cache_piece updates the metadata of the piece and
/// returns the data of the piece.
#[instrument(skip_all)]
pub async fn upload_cache_piece(
&self,
piece_id: &str,
task_id: &str,
range: Option<Range>,
) -> Result<impl AsyncRead> {
// Wait for the cache piece to be finished.
self.wait_for_cache_piece_finished(piece_id).await?;
// Start uploading the task.
self.metadata.upload_cache_task_started(task_id)?;
// Get the piece metadata and return the content of the piece.
match self.metadata.get_piece(piece_id) {
Ok(Some(piece)) => {
if self.cache.contains_piece(task_id, piece_id).await {
match self
.cache
.read_piece(task_id, piece_id, piece.clone(), range)
.await
{
Ok(reader) => {
// Finish uploading the task.
self.metadata.upload_cache_task_finished(task_id)?;
debug!("get piece from cache: {}", piece_id);
Ok(reader)
}
Err(err) => {
// Failed uploading the cache task.
self.metadata.upload_cache_task_failed(task_id)?;
Err(err)
}
}
} else {
// Failed uploading the cache task.
self.metadata.upload_cache_task_failed(task_id)?;
Err(Error::PieceNotFound(piece_id.to_string()))
}
}
Ok(None) => {
// Failed uploading the cache task.
self.metadata.upload_cache_task_failed(task_id)?;
Err(Error::PieceNotFound(piece_id.to_string()))
}
Err(err) => {
// Failed uploading the cache task.
self.metadata.upload_cache_task_failed(task_id)?;
Err(err)
}
}
}
/// get_cache_piece returns the cache piece metadata.
pub fn get_cache_piece(&self, piece_id: &str) -> Result<Option<metadata::Piece>> {
self.metadata.get_piece(piece_id)
}
/// is_cache_piece_exists returns whether the cache piece exists.
#[instrument(skip_all)]
pub fn is_cache_piece_exists(&self, piece_id: &str) -> Result<bool> {
self.metadata.is_piece_exists(piece_id)
}
/// get_cache_pieces returns the cache piece metadatas.
#[instrument(skip_all)]
pub fn get_cache_pieces(&self, task_id: &str) -> Result<Vec<metadata::Piece>> {
self.metadata.get_pieces(task_id)
}
/// cache_piece_id returns the cache piece id.
#[inline]
pub fn cache_piece_id(&self, task_id: &str, number: u32) -> String {
self.metadata.piece_id(task_id, number)
}
/// wait_for_cache_piece_finished waits for the cache piece to be finished.
#[instrument(skip_all)]
async fn wait_for_cache_piece_finished(&self, piece_id: &str) -> Result<metadata::Piece> {
// Total timeout for downloading a piece, combining the download time and the time to write to storage.
let wait_timeout = tokio::time::sleep(
self.config.download.piece_timeout + self.config.storage.write_piece_timeout,
);
tokio::pin!(wait_timeout);
let mut interval = tokio::time::interval(DEFAULT_WAIT_FOR_PIECE_FINISHED_INTERVAL);
loop {
tokio::select! {
_ = interval.tick() => {
let piece = self
.get_cache_piece(piece_id)?
.ok_or_else(|| Error::PieceNotFound(piece_id.to_string()))?;
// If the piece is finished, return.
if piece.is_finished() {
debug!("wait piece finished success");
return Ok(piece);
}
}
_ = &mut wait_timeout => {
self.metadata.wait_for_piece_finished_failed(piece_id).unwrap_or_else(|err| error!("delete piece metadata failed: {}", err));
return Err(Error::WaitForPieceFinishedTimeout(piece_id.to_string()));
}
}
}
}
}
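
Taken together, these Storage functions form the cache-task download lifecycle. A hedged sketch of how a caller might drive it (the storage handle, the lengths, and the error handling are hypothetical; only the call order mirrors the API added above):

```rust
// Hypothetical driver; assumes `storage: &Storage` is already constructed.
async fn run_cache_task(storage: &Storage, id: &str) -> Result<()> {
    // Create or refresh the task metadata before downloading.
    storage.prepare_download_cache_task_started(id).await?;
    storage
        .download_cache_task_started(id, 4096, 8192, None)
        .await?;

    // ... download each piece via download_cache_piece_started and
    // download_cache_piece_from_source_finished ...

    match storage.download_cache_task_finished(id) {
        Ok(_) => Ok(()),
        Err(err) => {
            // Mark the task failed so a later attempt can retry it.
            storage.download_cache_task_failed(id).await?;
            Err(err)
        }
    }
}
```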


@@ -218,6 +218,101 @@ impl PersistentCacheTask {
}
}
/// CacheTask is the metadata of the cache task.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct CacheTask {
/// id is the task id.
pub id: String,
/// piece_length is the length of the piece.
pub piece_length: Option<u64>,
/// content_length is the length of the content.
pub content_length: Option<u64>,
/// response_header is the header of the response.
pub response_header: HashMap<String, String>,
/// uploading_count is the count of the task being uploaded by other peers.
pub uploading_count: i64,
/// uploaded_count is the count of the task has been uploaded by other peers.
pub uploaded_count: u64,
/// updated_at is the time when the task metadata is updated. If the task is downloaded
/// by other peers, it will also update updated_at.
pub updated_at: NaiveDateTime,
/// created_at is the time when the task metadata is created.
pub created_at: NaiveDateTime,
/// prefetched_at is the time when the task is prefetched.
pub prefetched_at: Option<NaiveDateTime>,
/// failed_at is the time when the task downloads failed.
pub failed_at: Option<NaiveDateTime>,
/// finished_at is the time when the task downloads finished.
pub finished_at: Option<NaiveDateTime>,
}
/// CacheTask implements the cache task database object.
impl DatabaseObject for CacheTask {
/// NAMESPACE is the namespace of [CacheTask] objects.
const NAMESPACE: &'static str = "cache_task";
}
/// CacheTask implements the cache task metadata.
impl CacheTask {
/// is_started returns whether the cache task downloads started.
pub fn is_started(&self) -> bool {
self.finished_at.is_none()
}
/// is_uploading returns whether the cache task is uploading.
pub fn is_uploading(&self) -> bool {
self.uploading_count > 0
}
/// is_expired returns whether the cache task is expired.
pub fn is_expired(&self, ttl: Duration) -> bool {
self.updated_at + ttl < Utc::now().naive_utc()
}
/// is_prefetched returns whether the cache task is prefetched.
pub fn is_prefetched(&self) -> bool {
self.prefetched_at.is_some()
}
/// is_failed returns whether the cache task downloads failed.
pub fn is_failed(&self) -> bool {
self.failed_at.is_some()
}
/// is_finished returns whether the cache task downloads finished.
pub fn is_finished(&self) -> bool {
self.finished_at.is_some()
}
/// is_empty returns whether the cache task is empty.
pub fn is_empty(&self) -> bool {
match self.content_length() {
Some(content_length) => content_length == 0,
None => false,
}
}
/// piece_length returns the piece length of the cache task.
pub fn piece_length(&self) -> Option<u64> {
self.piece_length
}
/// content_length returns the content length of the cache task.
pub fn content_length(&self) -> Option<u64> {
self.content_length
}
}
/// Piece is the metadata of the piece.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct Piece {
@@ -753,6 +848,218 @@ impl<E: StorageEngineOwned> Metadata<E> {
self.db.delete::<PersistentCacheTask>(id.as_bytes())
}
/// download_cache_task_started updates the metadata of the cache task when the cache task downloads started.
#[instrument(skip_all)]
pub fn download_cache_task_started(
&self,
id: &str,
piece_length: Option<u64>,
content_length: Option<u64>,
response_header: Option<HeaderMap>,
) -> Result<CacheTask> {
// Convert the response header to hashmap.
let response_header = response_header
.as_ref()
.map(headermap_to_hashmap)
.unwrap_or_default();
let task = match self.db.get::<CacheTask>(id.as_bytes())? {
Some(mut task) => {
// If the task exists, update the task metadata.
task.updated_at = Utc::now().naive_utc();
task.failed_at = None;
// Protect the content length from being overwritten by None.
if content_length.is_some() {
task.content_length = content_length;
}
// Protect the piece length from being overwritten by None.
if piece_length.is_some() {
task.piece_length = piece_length;
}
// If the task already has a response header, it will not be
// overwritten.
if task.response_header.is_empty() {
task.response_header = response_header;
}
task
}
None => CacheTask {
id: id.to_string(),
piece_length,
content_length,
response_header,
updated_at: Utc::now().naive_utc(),
created_at: Utc::now().naive_utc(),
..Default::default()
},
};
self.db.put(id.as_bytes(), &task)?;
Ok(task)
}
/// download_cache_task_finished updates the metadata of the cache task when the cache task downloads finished.
#[instrument(skip_all)]
pub fn download_cache_task_finished(&self, id: &str) -> Result<CacheTask> {
let task = match self.db.get::<CacheTask>(id.as_bytes())? {
Some(mut task) => {
task.updated_at = Utc::now().naive_utc();
task.failed_at = None;
task.finished_at = Some(Utc::now().naive_utc());
task
}
None => return Err(Error::TaskNotFound(id.to_string())),
};
self.db.put(id.as_bytes(), &task)?;
Ok(task)
}
/// download_cache_task_failed updates the metadata of the cache task when the cache task downloads failed.
#[instrument(skip_all)]
pub fn download_cache_task_failed(&self, id: &str) -> Result<CacheTask> {
let task = match self.db.get::<CacheTask>(id.as_bytes())? {
Some(mut task) => {
task.updated_at = Utc::now().naive_utc();
task.failed_at = Some(Utc::now().naive_utc());
task
}
None => return Err(Error::TaskNotFound(id.to_string())),
};
self.db.put(id.as_bytes(), &task)?;
Ok(task)
}
/// prefetch_cache_task_started updates the metadata of the cache task when the cache task prefetch started.
#[instrument(skip_all)]
pub fn prefetch_cache_task_started(&self, id: &str) -> Result<CacheTask> {
let task = match self.db.get::<CacheTask>(id.as_bytes())? {
Some(mut task) => {
// If the task is prefetched, return an error.
if task.is_prefetched() {
return Err(Error::InvalidState("prefetched".to_string()));
}
task.updated_at = Utc::now().naive_utc();
task.prefetched_at = Some(Utc::now().naive_utc());
task.failed_at = None;
task
}
None => return Err(Error::TaskNotFound(id.to_string())),
};
self.db.put(id.as_bytes(), &task)?;
Ok(task)
}
/// prefetch_cache_task_failed updates the metadata of the cache task when the cache task prefetch failed.
#[instrument(skip_all)]
pub fn prefetch_cache_task_failed(&self, id: &str) -> Result<CacheTask> {
let task = match self.db.get::<CacheTask>(id.as_bytes())? {
Some(mut task) => {
task.updated_at = Utc::now().naive_utc();
task.prefetched_at = None;
task.failed_at = Some(Utc::now().naive_utc());
task
}
None => return Err(Error::TaskNotFound(id.to_string())),
};
self.db.put(id.as_bytes(), &task)?;
Ok(task)
}
/// upload_cache_task_started updates the metadata of the cache task when the cache task uploads started.
#[instrument(skip_all)]
pub fn upload_cache_task_started(&self, id: &str) -> Result<CacheTask> {
let task = match self.db.get::<CacheTask>(id.as_bytes())? {
Some(mut task) => {
task.uploading_count += 1;
task.updated_at = Utc::now().naive_utc();
task
}
None => return Err(Error::TaskNotFound(id.to_string())),
};
self.db.put(id.as_bytes(), &task)?;
Ok(task)
}
/// upload_cache_task_finished updates the metadata of the cache task when the cache task uploads finished.
#[instrument(skip_all)]
pub fn upload_cache_task_finished(&self, id: &str) -> Result<CacheTask> {
let task = match self.db.get::<CacheTask>(id.as_bytes())? {
Some(mut task) => {
task.uploading_count -= 1;
task.uploaded_count += 1;
task.updated_at = Utc::now().naive_utc();
task
}
None => return Err(Error::TaskNotFound(id.to_string())),
};
self.db.put(id.as_bytes(), &task)?;
Ok(task)
}
/// upload_cache_task_failed updates the metadata of the cache task when the cache task uploads failed.
#[instrument(skip_all)]
pub fn upload_cache_task_failed(&self, id: &str) -> Result<CacheTask> {
let task = match self.db.get::<CacheTask>(id.as_bytes())? {
Some(mut task) => {
task.uploading_count -= 1;
task.updated_at = Utc::now().naive_utc();
task
}
None => return Err(Error::TaskNotFound(id.to_string())),
};
self.db.put(id.as_bytes(), &task)?;
Ok(task)
}
/// get_cache_task gets the cache task metadata.
#[instrument(skip_all)]
pub fn get_cache_task(&self, id: &str) -> Result<Option<CacheTask>> {
self.db.get(id.as_bytes())
}
/// is_cache_task_exists checks if the cache task exists.
#[instrument(skip_all)]
pub fn is_cache_task_exists(&self, id: &str) -> Result<bool> {
self.db.is_exist::<CacheTask>(id.as_bytes())
}
/// get_cache_tasks gets the cache task metadatas.
#[instrument(skip_all)]
pub fn get_cache_tasks(&self) -> Result<Vec<CacheTask>> {
let tasks = self
.db
.iter_raw::<CacheTask>()?
.map(|ele| {
let (_, value) = ele?;
Ok(value)
})
.collect::<Result<Vec<Box<[u8]>>>>()?;
tasks
.iter()
.map(|task| CacheTask::deserialize_from(task))
.collect()
}
/// delete_cache_task deletes the cache task metadata.
#[instrument(skip_all)]
pub fn delete_cache_task(&self, id: &str) -> Result<()> {
info!("delete cache task metadata {}", id);
self.db.delete::<CacheTask>(id.as_bytes())
}
/// create_persistent_cache_piece creates a new persistent cache piece, which is imported by
/// local.
#[instrument(skip_all)]
@@ -927,6 +1234,7 @@ impl Metadata<RocksdbStorageEngine> {
Task::NAMESPACE,
Piece::NAMESPACE,
PersistentCacheTask::NAMESPACE,
CacheTask::NAMESPACE,
],
config.storage.keep,
)?;
@@ -1026,6 +1334,66 @@ mod tests {
assert!(task.is_none());
}
#[test]
fn test_cache_task_lifecycle() {
let dir = tempdir().unwrap();
let log_dir = dir.path().join("log");
let metadata = Metadata::new(Arc::new(Config::default()), dir.path(), &log_dir).unwrap();
let task_id = "d3c4e940ad06c47fc36ac67801e6f8e36cb400e2391708620bc7e865b102062c";
// Test download_cache_task_started.
metadata
.download_cache_task_started(task_id, Some(1024), Some(1024), None)
.unwrap();
let task = metadata
.get_cache_task(task_id)
.unwrap()
.expect("task should exist after download_cache_task_started");
assert_eq!(task.id, task_id);
assert_eq!(task.piece_length, Some(1024));
assert_eq!(task.content_length, Some(1024));
assert!(task.response_header.is_empty());
assert_eq!(task.uploading_count, 0);
assert_eq!(task.uploaded_count, 0);
assert!(!task.is_finished());
// Test download_cache_task_finished.
metadata.download_cache_task_finished(task_id).unwrap();
let task = metadata.get_cache_task(task_id).unwrap().unwrap();
assert!(task.is_finished());
// Test upload_cache_task_started.
metadata.upload_cache_task_started(task_id).unwrap();
let task = metadata.get_cache_task(task_id).unwrap().unwrap();
assert_eq!(task.uploading_count, 1);
// Test upload_cache_task_finished.
metadata.upload_cache_task_finished(task_id).unwrap();
let task = metadata.get_cache_task(task_id).unwrap().unwrap();
assert_eq!(task.uploading_count, 0);
assert_eq!(task.uploaded_count, 1);
// Test upload_cache_task_failed.
let task = metadata.upload_cache_task_started(task_id).unwrap();
assert_eq!(task.uploading_count, 1);
let task = metadata.upload_cache_task_failed(task_id).unwrap();
assert_eq!(task.uploading_count, 0);
assert_eq!(task.uploaded_count, 1);
// Test get_cache_tasks.
let task_id = "a535b115f18d96870f0422ac891f91dd162f2f391e4778fb84279701fcd02dd1";
metadata
.download_cache_task_started(task_id, Some(1024), None, None)
.unwrap();
let tasks = metadata.get_cache_tasks().unwrap();
assert_eq!(tasks.len(), 2);
// Test delete_cache_task.
metadata.delete_cache_task(task_id).unwrap();
let task = metadata.get_cache_task(task_id).unwrap();
assert!(task.is_none());
}
#[test]
fn test_piece_lifecycle() {
let dir = tempdir().unwrap();