diff --git a/dragonfly-client-storage/src/cache/mod.rs b/dragonfly-client-storage/src/cache/mod.rs index aaa4e2d8..0532971d 100644 --- a/dragonfly-client-storage/src/cache/mod.rs +++ b/dragonfly-client-storage/src/cache/mod.rs @@ -22,6 +22,7 @@ use lru_cache::LruCache; use std::cmp::{max, min}; use std::collections::HashMap; use std::io::Cursor; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use tokio::io::{AsyncRead, BufReader}; use tokio::sync::RwLock; @@ -110,7 +111,7 @@ pub struct Cache { config: Arc, /// size is the size of the cache in bytes. - size: u64, + size: Arc, /// capacity is the maximum capacity of the cache in bytes. capacity: u64, @@ -119,13 +120,22 @@ pub struct Cache { tasks: Arc>>, } +/// WriteCachePieceResponse is the response of writing a cache piece. +pub struct WriteCachePieceResponse { + /// length is the length of the cache piece. + pub length: u64, + + /// hash is the hash of the cache piece. + pub hash: String, +} + /// Cache implements the cache for storing piece content by LRU algorithm. impl Cache { /// new creates a new cache with the specified capacity. pub fn new(config: Arc) -> Self { Cache { config: config.clone(), - size: 0, + size: Arc::new(AtomicU64::new(0)), capacity: config.storage.cache_capacity.as_u64(), // LRU cache capacity is set to usize::MAX to avoid evicting tasks. LRU cache will evict tasks // by cache capacity(cache size) itself, and used pop_lru to evict the least recently @@ -221,10 +231,11 @@ impl Cache { } let mut tasks = self.tasks.write().await; - while self.size + content_length > self.capacity { + while self.size.load(Ordering::Relaxed) + content_length > self.capacity { match tasks.pop_lru() { Some((_, task)) => { - self.size -= task.content_length(); + self.size + .fetch_sub(task.content_length(), Ordering::Relaxed); } None => { break; @@ -234,7 +245,7 @@ impl Cache { let task = Task::new(content_length); tasks.put(task_id.to_string(), task); - self.size += content_length; + self.size.fetch_add(content_length, Ordering::Relaxed); } pub async fn delete_task(&mut self, task_id: &str) -> Result<()> { @@ -243,7 +254,8 @@ impl Cache { return Err(Error::TaskNotFound(task_id.to_string())); }; - self.size -= task.content_length(); + self.size + .fetch_sub(task.content_length(), Ordering::Relaxed); Ok(()) } @@ -306,7 +318,7 @@ mod tests { for (config, expected_size, expected_capacity) in test_cases { let cache = Cache::new(Arc::new(config)); - assert_eq!(cache.size, expected_size); + assert_eq!(cache.size.load(Ordering::Relaxed), expected_size); assert_eq!(cache.capacity, expected_capacity); } } diff --git a/dragonfly-client-storage/src/lib.rs b/dragonfly-client-storage/src/lib.rs index ae400835..8d99b409 100644 --- a/dragonfly-client-storage/src/lib.rs +++ b/dragonfly-client-storage/src/lib.rs @@ -27,6 +27,7 @@ use std::time::Duration; use tokio::io::AsyncRead; use tokio::time::sleep; use tokio_util::either::Either; +use tokio_util::io::InspectReader; use tracing::{debug, error, info, instrument, warn}; pub mod cache; @@ -363,6 +364,102 @@ impl Storage { }); } + /// prepare_download_cache_task_started prepares the metadata of the cache task when the cache task downloads + /// started. + pub async fn prepare_download_cache_task_started( + &self, + id: &str, + ) -> Result { + self.metadata + .download_cache_task_started(id, None, None, None) + } + + /// download_cache_task_started updates the metadata of the cache task and create cache task content + /// when the cache task downloads started. + #[instrument(skip_all)] + pub async fn download_cache_task_started( + &self, + id: &str, + piece_length: u64, + content_length: u64, + response_header: Option, + ) -> Result { + let mut cache = self.cache.clone(); + cache.put_task(id, content_length).await; + + self.metadata.download_cache_task_started( + id, + Some(piece_length), + Some(content_length), + response_header, + ) + } + + /// download_cache_task_finished updates the metadata of the cache task when the cache task downloads finished. + #[instrument(skip_all)] + pub fn download_cache_task_finished(&self, id: &str) -> Result { + self.metadata.download_cache_task_finished(id) + } + + /// download_cache_task_failed updates the metadata of the cache task when the cache task downloads failed. + #[instrument(skip_all)] + pub async fn download_cache_task_failed(&self, id: &str) -> Result { + self.metadata.download_cache_task_failed(id) + } + + /// prefetch_cache_task_started updates the metadata of the cache task when the cache task prefetches started. + #[instrument(skip_all)] + pub async fn prefetch_cache_task_started(&self, id: &str) -> Result { + self.metadata.prefetch_cache_task_started(id) + } + + /// prefetch_cache_task_failed updates the metadata of the cache task when the cache task prefetches failed. + #[instrument(skip_all)] + pub async fn prefetch_cache_task_failed(&self, id: &str) -> Result { + self.metadata.prefetch_cache_task_failed(id) + } + + /// upload_cache_task_finished updates the metadata of the cache task when the cache task uploads finished. + #[instrument(skip_all)] + pub fn upload_cache_task_finished(&self, id: &str) -> Result { + self.metadata.upload_cache_task_finished(id) + } + + /// get_cache_task returns the cache task metadata. + #[instrument(skip_all)] + pub fn get_cache_task(&self, id: &str) -> Result> { + self.metadata.get_cache_task(id) + } + + /// is_cache_task_exists returns whether the cache task exists. + #[instrument(skip_all)] + pub fn is_cache_task_exists(&self, id: &str) -> Result { + self.metadata.is_cache_task_exists(id) + } + + /// get_cache_tasks returns the cache task metadatas. + #[instrument(skip_all)] + pub fn get_cache_tasks(&self) -> Result> { + self.metadata.get_cache_tasks() + } + + /// delete_cache_task deletes the cache task metadatas, cache task content and piece metadatas. + #[instrument(skip_all)] + pub async fn delete_cache_task(&self, id: &str) { + self.metadata + .delete_cache_task(id) + .unwrap_or_else(|err| error!("delete cache task metadata failed: {}", err)); + + self.metadata.delete_pieces(id).unwrap_or_else(|err| { + error!("delete cache piece metadatas failed: {}", err); + }); + + let mut cache = self.cache.clone(); + cache.delete_task(id).await.unwrap_or_else(|err| { + info!("delete cache task from cache failed: {}", err); + }); + } + /// create_persistent_cache_piece creates a new persistent cache piece. #[instrument(skip_all)] pub async fn create_persistent_cache_piece( @@ -812,4 +909,269 @@ impl Storage { } } } + + /// download_cache_piece_started updates the metadata of the cache piece and writes + /// the data of cache piece to file when the cache piece downloads started. + #[instrument(skip_all)] + pub async fn download_cache_piece_started( + &self, + piece_id: &str, + number: u32, + ) -> Result { + // Wait for the piece to be finished. + match self.wait_for_cache_piece_finished(piece_id).await { + Ok(piece) => Ok(piece), + // If piece is not found or wait timeout, create piece metadata. + Err(_) => self.metadata.download_piece_started(piece_id, number), + } + } + + /// download_cache_piece_from_source_finished is used for downloading cache piece from source. + #[allow(clippy::too_many_arguments)] + #[instrument(skip_all)] + pub async fn download_cache_piece_from_source_finished( + &self, + piece_id: &str, + task_id: &str, + offset: u64, + length: u64, + reader: &mut R, + timeout: Duration, + ) -> Result { + tokio::select! { + piece = self.handle_downloaded_cache_piece_from_source_finished(piece_id, task_id, offset, length, reader) => { + piece + } + _ = sleep(timeout) => { + Err(Error::DownloadPieceFinished(piece_id.to_string())) + } + } + } + + // handle_downloaded_cache_piece_from_source_finished handles the downloaded cache piece from source. + #[instrument(skip_all)] + async fn handle_downloaded_cache_piece_from_source_finished( + &self, + piece_id: &str, + task_id: &str, + offset: u64, + length: u64, + reader: &mut R, + ) -> Result { + let mut buffer = Vec::with_capacity(length as usize); + let mut writer = std::io::Cursor::new(&mut buffer); + let mut hasher = crc32fast::Hasher::new(); + + let mut tee = InspectReader::new(reader, |bytes| { + hasher.update(bytes); + }); + + tokio::io::copy(&mut tee, &mut writer).await?; + + let hash = hasher.finalize().to_string(); + let content = bytes::Bytes::from(buffer); + let content_length = content.len() as u64; + + self.cache.write_piece(task_id, piece_id, content).await?; + + debug!("put piece to cache: {}", piece_id); + + let digest = Digest::new(Algorithm::Crc32, hash); + + self.metadata.download_piece_finished( + piece_id, + offset, + content_length, + digest.to_string().as_str(), + None, + ) + } + + /// download_cache_piece_from_parent_finished is used for downloading cache piece from parent. + #[allow(clippy::too_many_arguments)] + #[instrument(skip_all)] + pub async fn download_cache_piece_from_parent_finished( + &self, + piece_id: &str, + task_id: &str, + offset: u64, + length: u64, + expected_digest: &str, + parent_id: &str, + reader: &mut R, + timeout: Duration, + ) -> Result { + tokio::select! { + piece = self.handle_downloaded_cache_piece_from_parent_finished(piece_id, task_id, offset, length, expected_digest, parent_id, reader) => { + piece + } + _ = sleep(timeout) => { + Err(Error::DownloadPieceFinished(piece_id.to_string())) + } + } + } + + // handle_downloaded_cache_piece_from_parent_finished handles the downloaded cache piece from parent. + #[allow(clippy::too_many_arguments)] + #[instrument(skip_all)] + async fn handle_downloaded_cache_piece_from_parent_finished( + &self, + piece_id: &str, + task_id: &str, + offset: u64, + length: u64, + expected_digest: &str, + parent_id: &str, + reader: &mut R, + ) -> Result { + let mut buffer = Vec::with_capacity(length as usize); + let mut writer = std::io::Cursor::new(&mut buffer); + let mut hasher = crc32fast::Hasher::new(); + + let mut tee = InspectReader::new(reader, |bytes| { + hasher.update(bytes); + }); + + tokio::io::copy(&mut tee, &mut writer).await?; + + let hash = hasher.finalize().to_string(); + let content = bytes::Bytes::from(buffer); + let content_length = content.len() as u64; + + self.cache.write_piece(task_id, piece_id, content).await?; + + debug!("put piece to cache: {}", piece_id); + + let digest = Digest::new(Algorithm::Crc32, hash); + + // Check the digest of the piece. + if expected_digest != digest.to_string() { + return Err(Error::DigestMismatch( + expected_digest.to_string(), + digest.to_string(), + )); + } + + self.metadata.download_piece_finished( + piece_id, + offset, + content_length, + digest.to_string().as_str(), + Some(parent_id.to_string()), + ) + } + /// download_cache_piece_failed updates the metadata of the cache piece when the cache piece downloads failed. + #[instrument(skip_all)] + pub fn download_cache_piece_failed(&self, piece_id: &str) -> Result<()> { + self.metadata.download_piece_failed(piece_id) + } + + /// upload_cache_piece updates the metadata of the piece and + /// returns the data of the piece. + #[instrument(skip_all)] + pub async fn upload_cache_piece( + &self, + piece_id: &str, + task_id: &str, + range: Option, + ) -> Result { + // Wait for the cache piece to be finished. + self.wait_for_cache_piece_finished(piece_id).await?; + + // Start uploading the task. + self.metadata.upload_cache_task_started(task_id)?; + + // Get the piece metadata and return the content of the piece. + match self.metadata.get_piece(piece_id) { + Ok(Some(piece)) => { + if self.cache.contains_piece(task_id, piece_id).await { + match self + .cache + .read_piece(task_id, piece_id, piece.clone(), range) + .await + { + Ok(reader) => { + // Finish uploading the task. + self.metadata.upload_cache_task_finished(task_id)?; + debug!("get piece from cache: {}", piece_id); + Ok(reader) + } + Err(err) => { + // Failed uploading the cache task. + self.metadata.upload_cache_task_failed(task_id)?; + Err(err) + } + } + } else { + // Failed uploading the cache task. + self.metadata.upload_cache_task_failed(task_id)?; + Err(Error::PieceNotFound(piece_id.to_string())) + } + } + Ok(None) => { + // Failed uploading the cache task. + self.metadata.upload_cache_task_failed(task_id)?; + Err(Error::PieceNotFound(piece_id.to_string())) + } + Err(err) => { + // Failed uploading the cache task. + self.metadata.upload_cache_task_failed(task_id)?; + Err(err) + } + } + } + + /// get_cache_piece returns the cache piece metadata. + pub fn get_cache_piece(&self, piece_id: &str) -> Result> { + self.metadata.get_piece(piece_id) + } + + /// is_cache_piece_exists returns whether the cache piece exists. + #[instrument(skip_all)] + pub fn is_cache_piece_exists(&self, piece_id: &str) -> Result { + self.metadata.is_piece_exists(piece_id) + } + + /// get_cache_pieces returns the cache piece metadatas. + #[instrument(skip_all)] + pub fn get_cache_pieces(&self, task_id: &str) -> Result> { + self.metadata.get_pieces(task_id) + } + + /// cache_piece_id returns the cache piece id. + #[inline] + pub fn cache_piece_id(&self, task_id: &str, number: u32) -> String { + self.metadata.piece_id(task_id, number) + } + + /// wait_for_cache_piece_finished waits for the cache piece to be finished. + #[instrument(skip_all)] + async fn wait_for_cache_piece_finished(&self, piece_id: &str) -> Result { + // Total timeout for downloading a piece, combining the download time and the time to write to storage. + let wait_timeout = tokio::time::sleep( + self.config.download.piece_timeout + self.config.storage.write_piece_timeout, + ); + tokio::pin!(wait_timeout); + + let mut interval = tokio::time::interval(DEFAULT_WAIT_FOR_PIECE_FINISHED_INTERVAL); + loop { + tokio::select! { + _ = interval.tick() => { + let piece = self + .get_cache_piece(piece_id)? + .ok_or_else(|| Error::PieceNotFound(piece_id.to_string()))?; + + // If the piece is finished, return. + if piece.is_finished() { + debug!("wait piece finished success"); + return Ok(piece); + } + } + _ = &mut wait_timeout => { + self.metadata.wait_for_piece_finished_failed(piece_id).unwrap_or_else(|err| error!("delete piece metadata failed: {}", err)); + return Err(Error::WaitForPieceFinishedTimeout(piece_id.to_string())); + } + } + } + } } diff --git a/dragonfly-client-storage/src/metadata.rs b/dragonfly-client-storage/src/metadata.rs index b1a3fb57..fa3b58a7 100644 --- a/dragonfly-client-storage/src/metadata.rs +++ b/dragonfly-client-storage/src/metadata.rs @@ -218,6 +218,101 @@ impl PersistentCacheTask { } } +/// CacheTask is the metadata of the cache task. +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] +pub struct CacheTask { + /// id is the task id. + pub id: String, + + /// piece_length is the length of the piece. + pub piece_length: Option, + + /// content_length is the length of the content. + pub content_length: Option, + + /// header is the header of the response. + pub response_header: HashMap, + + /// uploading_count is the count of the task being uploaded by other peers. + pub uploading_count: i64, + + /// uploaded_count is the count of the task has been uploaded by other peers. + pub uploaded_count: u64, + + /// updated_at is the time when the task metadata is updated. If the task is downloaded + /// by other peers, it will also update updated_at. + pub updated_at: NaiveDateTime, + + /// created_at is the time when the task metadata is created. + pub created_at: NaiveDateTime, + + /// prefetched_at is the time when the task prefetched. + pub prefetched_at: Option, + + /// failed_at is the time when the task downloads failed. + pub failed_at: Option, + + /// finished_at is the time when the task downloads finished. + pub finished_at: Option, +} + +/// CacheTask implements the cache task database object. +impl DatabaseObject for CacheTask { + /// NAMESPACE is the namespace of [CacheTask] objects. + const NAMESPACE: &'static str = "cache_task"; +} + +/// CacheTask implements the cache task metadata. +impl CacheTask { + /// is_started returns whether the cache task downloads started. + pub fn is_started(&self) -> bool { + self.finished_at.is_none() + } + + /// is_uploading returns whether the cache task is uploading. + pub fn is_uploading(&self) -> bool { + self.uploading_count > 0 + } + + /// is_expired returns whether the cache task is expired. + pub fn is_expired(&self, ttl: Duration) -> bool { + self.updated_at + ttl < Utc::now().naive_utc() + } + + /// is_prefetched returns whether the cache task is prefetched. + pub fn is_prefetched(&self) -> bool { + self.prefetched_at.is_some() + } + + /// is_failed returns whether the cache task downloads failed. + pub fn is_failed(&self) -> bool { + self.failed_at.is_some() + } + + /// is_finished returns whether the cache task downloads finished. + pub fn is_finished(&self) -> bool { + self.finished_at.is_some() + } + + /// is_empty returns whether the cache task is empty. + pub fn is_empty(&self) -> bool { + match self.content_length() { + Some(content_length) => content_length == 0, + None => false, + } + } + + /// piece_length returns the piece length of the cache task. + pub fn piece_length(&self) -> Option { + self.piece_length + } + + /// content_length returns the content length of the cache task. + pub fn content_length(&self) -> Option { + self.content_length + } +} + /// Piece is the metadata of the piece. #[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] pub struct Piece { @@ -753,6 +848,218 @@ impl Metadata { self.db.delete::(id.as_bytes()) } + /// download_cache_task_started updates the metadata of the cache task when the cache task downloads started. + #[instrument(skip_all)] + pub fn download_cache_task_started( + &self, + id: &str, + piece_length: Option, + content_length: Option, + response_header: Option, + ) -> Result { + // Convert the response header to hashmap. + let response_header = response_header + .as_ref() + .map(headermap_to_hashmap) + .unwrap_or_default(); + + let task = match self.db.get::(id.as_bytes())? { + Some(mut task) => { + // If the task exists, update the task metadata. + task.updated_at = Utc::now().naive_utc(); + task.failed_at = None; + + // Protect content length to be overwritten by None. + if content_length.is_some() { + task.content_length = content_length; + } + + // Protect piece length to be overwritten by None. + if piece_length.is_some() { + task.piece_length = piece_length; + } + + // If the task has the response header, the response header + // will not be covered. + if task.response_header.is_empty() { + task.response_header = response_header; + } + + task + } + None => CacheTask { + id: id.to_string(), + piece_length, + content_length, + response_header, + updated_at: Utc::now().naive_utc(), + created_at: Utc::now().naive_utc(), + ..Default::default() + }, + }; + + self.db.put(id.as_bytes(), &task)?; + Ok(task) + } + + /// download_cache_task_finished updates the metadata of the cache task when the cache task downloads finished. + #[instrument(skip_all)] + pub fn download_cache_task_finished(&self, id: &str) -> Result { + let task = match self.db.get::(id.as_bytes())? { + Some(mut task) => { + task.updated_at = Utc::now().naive_utc(); + task.failed_at = None; + task.finished_at = Some(Utc::now().naive_utc()); + task + } + None => return Err(Error::TaskNotFound(id.to_string())), + }; + + self.db.put(id.as_bytes(), &task)?; + Ok(task) + } + + /// download_cache_task_failed updates the metadata of the cache task when the cache task downloads failed. + #[instrument(skip_all)] + pub fn download_cache_task_failed(&self, id: &str) -> Result { + let task = match self.db.get::(id.as_bytes())? { + Some(mut task) => { + task.updated_at = Utc::now().naive_utc(); + task.failed_at = Some(Utc::now().naive_utc()); + task + } + None => return Err(Error::TaskNotFound(id.to_string())), + }; + + self.db.put(id.as_bytes(), &task)?; + Ok(task) + } + + /// prefetch_cache_task_started updates the metadata of the cache task when the cache task prefetch started. + #[instrument(skip_all)] + pub fn prefetch_cache_task_started(&self, id: &str) -> Result { + let task = match self.db.get::(id.as_bytes())? { + Some(mut task) => { + // If the task is prefetched, return an error. + if task.is_prefetched() { + return Err(Error::InvalidState("prefetched".to_string())); + } + + task.updated_at = Utc::now().naive_utc(); + task.prefetched_at = Some(Utc::now().naive_utc()); + task.failed_at = None; + task + } + None => return Err(Error::TaskNotFound(id.to_string())), + }; + + self.db.put(id.as_bytes(), &task)?; + Ok(task) + } + + /// prefetch_cache_task_failed updates the metadata of the cache task when the cache task prefetch failed. + #[instrument(skip_all)] + pub fn prefetch_cache_task_failed(&self, id: &str) -> Result { + let task = match self.db.get::(id.as_bytes())? { + Some(mut task) => { + task.updated_at = Utc::now().naive_utc(); + task.prefetched_at = None; + task.failed_at = Some(Utc::now().naive_utc()); + task + } + None => return Err(Error::TaskNotFound(id.to_string())), + }; + + self.db.put(id.as_bytes(), &task)?; + Ok(task) + } + + /// upload_cache_task_started updates the metadata of the cache task when the cache task uploads started. + #[instrument(skip_all)] + pub fn upload_cache_task_started(&self, id: &str) -> Result { + let task = match self.db.get::(id.as_bytes())? { + Some(mut task) => { + task.uploading_count += 1; + task.updated_at = Utc::now().naive_utc(); + task + } + None => return Err(Error::TaskNotFound(id.to_string())), + }; + + self.db.put(id.as_bytes(), &task)?; + Ok(task) + } + + /// upload_cache_task_finished updates the metadata of the cache task when the cache task uploads finished. + #[instrument(skip_all)] + pub fn upload_cache_task_finished(&self, id: &str) -> Result { + let task = match self.db.get::(id.as_bytes())? { + Some(mut task) => { + task.uploading_count -= 1; + task.uploaded_count += 1; + task.updated_at = Utc::now().naive_utc(); + task + } + None => return Err(Error::TaskNotFound(id.to_string())), + }; + + self.db.put(id.as_bytes(), &task)?; + Ok(task) + } + + /// upload_cache_task_failed updates the metadata of the cache task when the cache task uploads failed. + #[instrument(skip_all)] + pub fn upload_cache_task_failed(&self, id: &str) -> Result { + let task = match self.db.get::(id.as_bytes())? { + Some(mut task) => { + task.uploading_count -= 1; + task.updated_at = Utc::now().naive_utc(); + task + } + None => return Err(Error::TaskNotFound(id.to_string())), + }; + + self.db.put(id.as_bytes(), &task)?; + Ok(task) + } + + /// get_cache_task gets the cache task metadata. + #[instrument(skip_all)] + pub fn get_cache_task(&self, id: &str) -> Result> { + self.db.get(id.as_bytes()) + } + + /// is_cache_task_exists checks if the cache task exists. + #[instrument(skip_all)] + pub fn is_cache_task_exists(&self, id: &str) -> Result { + self.db.is_exist::(id.as_bytes()) + } + + /// get_cache_tasks gets the cache task metadatas. + #[instrument(skip_all)] + pub fn get_cache_tasks(&self) -> Result> { + let tasks = self + .db + .iter_raw::()? + .map(|ele| { + let (_, value) = ele?; + Ok(value) + }) + .collect::>>>()?; + + tasks + .iter() + .map(|task| CacheTask::deserialize_from(task)) + .collect() + } + + /// delete_cache_task deletes the cache task metadata. + #[instrument(skip_all)] + pub fn delete_cache_task(&self, id: &str) -> Result<()> { + info!("delete cache task metadata {}", id); + self.db.delete::(id.as_bytes()) + } + /// create_persistent_cache_piece creates a new persistent cache piece, which is imported by /// local. #[instrument(skip_all)] @@ -927,6 +1234,7 @@ impl Metadata { Task::NAMESPACE, Piece::NAMESPACE, PersistentCacheTask::NAMESPACE, + CacheTask::NAMESPACE, ], config.storage.keep, )?; @@ -1026,6 +1334,66 @@ mod tests { assert!(task.is_none()); } + #[test] + fn test_cache_task_lifecycle() { + let dir = tempdir().unwrap(); + let log_dir = dir.path().join("log"); + let metadata = Metadata::new(Arc::new(Config::default()), dir.path(), &log_dir).unwrap(); + let task_id = "d3c4e940ad06c47fc36ac67801e6f8e36cb400e2391708620bc7e865b102062c"; + + // Test download_task_started. + metadata + .download_cache_task_started(task_id, Some(1024), Some(1024), None) + .unwrap(); + let task = metadata + .get_cache_task(task_id) + .unwrap() + .expect("task should exist after download_cache_task_started"); + assert_eq!(task.id, task_id); + assert_eq!(task.piece_length, Some(1024)); + assert_eq!(task.content_length, Some(1024)); + assert!(task.response_header.is_empty()); + assert_eq!(task.uploading_count, 0); + assert_eq!(task.uploaded_count, 0); + assert!(!task.is_finished()); + + // Test download_cache_task_finished. + metadata.download_cache_task_finished(task_id).unwrap(); + let task = metadata.get_cache_task(task_id).unwrap().unwrap(); + assert!(task.is_finished()); + + // Test upload_cache_task_started. + metadata.upload_cache_task_started(task_id).unwrap(); + let task = metadata.get_cache_task(task_id).unwrap().unwrap(); + assert_eq!(task.uploading_count, 1); + + // Test upload_cache_task_finished. + metadata.upload_cache_task_finished(task_id).unwrap(); + let task = metadata.get_cache_task(task_id).unwrap().unwrap(); + assert_eq!(task.uploading_count, 0); + assert_eq!(task.uploaded_count, 1); + + // Test upload_cache_task_failed. + let task = metadata.upload_cache_task_started(task_id).unwrap(); + assert_eq!(task.uploading_count, 1); + let task = metadata.upload_cache_task_failed(task_id).unwrap(); + assert_eq!(task.uploading_count, 0); + assert_eq!(task.uploaded_count, 1); + + // Test get_cache_tasks. + let task_id = "a535b115f18d96870f0422ac891f91dd162f2f391e4778fb84279701fcd02dd1"; + metadata + .download_cache_task_started(task_id, Some(1024), None, None) + .unwrap(); + let tasks = metadata.get_cache_tasks().unwrap(); + assert_eq!(tasks.len(), 2); + + // Test delete_cache_task. + metadata.delete_cache_task(task_id).unwrap(); + let task = metadata.get_cache_task(task_id).unwrap(); + assert!(task.is_none()); + } + #[test] fn test_piece_lifecycle() { let dir = tempdir().unwrap();