diff --git a/dragonfly-client-storage/src/lib.rs b/dragonfly-client-storage/src/lib.rs index 7485b01c..46f342a7 100644 --- a/dragonfly-client-storage/src/lib.rs +++ b/dragonfly-client-storage/src/lib.rs @@ -151,6 +151,7 @@ impl Storage { pub async fn create_persistent_cache_task( &self, id: &str, + ttl: Duration, path: &Path, piece_length: u64, expected_digest: &str, @@ -166,6 +167,7 @@ impl Storage { self.metadata.create_persistent_cache_task( id, + ttl, piece_length, response.length, digest.to_string().as_str(), @@ -176,12 +178,13 @@ impl Storage { pub fn download_cache_task_started( &self, id: &str, + ttl: Duration, persistent: bool, piece_length: u64, content_length: u64, ) -> Result { self.metadata - .download_cache_task_started(id, persistent, piece_length, content_length) + .download_cache_task_started(id, ttl, persistent, piece_length, content_length) } // download_cache_task_finished updates the metadata of the cache task when the cache task downloads finished. diff --git a/dragonfly-client-storage/src/metadata.rs b/dragonfly-client-storage/src/metadata.rs index 295a4ec0..678bb07a 100644 --- a/dragonfly-client-storage/src/metadata.rs +++ b/dragonfly-client-storage/src/metadata.rs @@ -132,6 +132,9 @@ pub struct CacheTask { // not be deleted when dfdamon runs garbage collection. pub persistent: bool, + // ttl is the time to live of the cache task. + pub ttl: Duration, + // digests is the digests of the cache task. pub digest: String, @@ -179,6 +182,14 @@ impl CacheTask { self.uploading_count > 0 } + // is_expired returns whether the cache task is expired. + pub fn is_expired(&self) -> bool { + // When scheduler runs garbage collection, it will trigger dfdaemon to evict the cache task. + // But sometimes the dfdaemon may not evict the cache task in time, so we select the ttl * 2 + // as the expired time to force evict the cache task. + self.created_at + self.ttl * 2 < Utc::now().naive_utc() + } + // is_failed returns whether the cache task downloads failed. 
pub fn is_failed(&self) -> bool { self.failed_at.is_some() @@ -489,6 +500,7 @@ impl Metadata { pub fn create_persistent_cache_task( &self, id: &str, + ttl: Duration, piece_length: u64, content_length: u64, digest: &str, @@ -496,6 +508,7 @@ impl Metadata { let task = CacheTask { id: id.to_string(), persistent: true, + ttl, piece_length, content_length, digest: digest.to_string(), @@ -515,6 +528,7 @@ impl Metadata { pub fn download_cache_task_started( &self, id: &str, + ttl: Duration, persistent: bool, piece_length: u64, content_length: u64, @@ -529,6 +543,7 @@ impl Metadata { None => CacheTask { id: id.to_string(), persistent, + ttl, piece_length, content_length, updated_at: Utc::now().naive_utc(), diff --git a/dragonfly-client/src/bin/dfcache/stat.rs b/dragonfly-client/src/bin/dfcache/stat.rs index fb450977..f573f454 100644 --- a/dragonfly-client/src/bin/dfcache/stat.rs +++ b/dragonfly-client/src/bin/dfcache/stat.rs @@ -17,9 +17,13 @@ use chrono::{DateTime, Local}; use clap::Parser; use dragonfly_api::dfdaemon::v2::StatCacheTaskRequest; -use dragonfly_client_core::{Error, Result}; +use dragonfly_client_core::{ + error::{ErrorType, OrErr}, + Error, Result, +}; use humantime::format_duration; use std::path::Path; +use std::time::Duration; use tabled::{ settings::{object::Rows, Alignment, Modify, Style}, Table, Tabled, @@ -186,10 +190,9 @@ impl StatCommand { }; // Convert ttl to human readable format. - if let Some(ttl) = task.ttl { - table_task.ttl = - format_duration(std::time::Duration::from_secs(ttl.seconds as u64)).to_string(); - } + let ttl = Duration::try_from(task.ttl.ok_or(Error::InvalidParameter)?) + .or_err(ErrorType::ParseError)?; + table_task.ttl = format_duration(ttl).to_string(); // Convert created_at to human readable format. 
if let Some(created_at) = task.created_at { diff --git a/dragonfly-client/src/gc/mod.rs b/dragonfly-client/src/gc/mod.rs index cdafd599..0ad3c00b 100644 --- a/dragonfly-client/src/gc/mod.rs +++ b/dragonfly-client/src/gc/mod.rs @@ -16,7 +16,7 @@ use crate::grpc::scheduler::SchedulerClient; use crate::shutdown; -use dragonfly_api::scheduler::v2::DeleteTaskRequest; +use dragonfly_api::scheduler::v2::{DeleteCacheTaskRequest, DeleteTaskRequest}; use dragonfly_client_config::dfdaemon::Config; use dragonfly_client_core::Result; use dragonfly_client_storage::{metadata, Storage}; @@ -75,14 +75,24 @@ impl GC { loop { tokio::select! { _ = interval.tick() => { - // Evict the cache by task ttl. - if let Err(err) = self.evict_by_task_ttl().await { - info!("failed to evict by task ttl: {}", err); + // Evict the cache task by ttl. + if let Err(err) = self.evict_cache_task_by_ttl().await { + info!("failed to evict cache task by ttl: {}", err); } // Evict the cache by disk usage. - if let Err(err) = self.evict_by_disk_usage().await { - info!("failed to evict by disk usage: {}", err); + if let Err(err) = self.evict_cache_task_by_disk_usage().await { + info!("failed to evict cache task by disk usage: {}", err); + } + + // Evict the task by ttl. + if let Err(err) = self.evict_task_by_ttl().await { + info!("failed to evict task by ttl: {}", err); + } + + // Evict the task by disk usage. + if let Err(err) = self.evict_task_by_disk_usage().await { + info!("failed to evict task by disk usage: {}", err); } } _ = shutdown.recv() => { @@ -94,8 +104,8 @@ impl GC { } } - // evict_by_task_ttl evicts the cache by task ttl. - async fn evict_by_task_ttl(&self) -> Result<()> { + // evict_task_by_ttl evicts the task by ttl. + async fn evict_task_by_ttl(&self) -> Result<()> { info!("start to evict by task ttl"); for task in self.storage.get_tasks()? { // If the task is expired and not uploading, evict the task. 
@@ -116,8 +126,8 @@ impl GC { Ok(()) } - // evict_by_disk_usage evicts the cache by disk usage. - async fn evict_by_disk_usage(&self) -> Result<()> { + // evict_task_by_disk_usage evicts the task by disk usage. + async fn evict_task_by_disk_usage(&self) -> Result<()> { let stats = fs2::statvfs(self.config.storage.dir.as_path())?; let available_space = stats.available_space(); let total_space = stats.total_space(); @@ -126,7 +136,7 @@ impl GC { let usage_percent = (100 - available_space * 100 / total_space) as u8; if usage_percent >= self.config.gc.policy.dist_high_threshold_percent { info!( - "start to evict by disk usage, disk usage {}% is higher than high threshold {}%", + "start to evict task by disk usage, disk usage {}% is higher than high threshold {}%", usage_percent, self.config.gc.policy.dist_high_threshold_percent ); @@ -135,17 +145,17 @@ impl GC { * ((usage_percent - self.config.gc.policy.dist_low_threshold_percent) as f64 / 100.0); - // Evict the cache by the need evict space. - if let Err(err) = self.evict_space(need_evict_space as u64).await { - info!("failed to evict by disk usage: {}", err); + // Evict the task by the need evict space. + if let Err(err) = self.evict_task_space(need_evict_space as u64).await { + info!("failed to evict task by disk usage: {}", err); } } Ok(()) } - // evict_space evicts the cache by the given space. - async fn evict_space(&self, need_evict_space: u64) -> Result<()> { + // evict_task_space evicts the task by the given space. + async fn evict_task_space(&self, need_evict_space: u64) -> Result<()> { let mut tasks = self.storage.get_tasks()?; tasks.sort_by(|a, b| a.updated_at.cmp(&b.updated_at)); @@ -197,4 +207,106 @@ impl GC { error!("failed to delete peer {}: {}", task.id, err); }); } + + // evict_cache_task_by_ttl evicts the cache task by ttl. + async fn evict_cache_task_by_ttl(&self) -> Result<()> { + info!("start to evict by cache task ttl * 2"); + for task in self.storage.get_cache_tasks()? 
{ + // If the cache task is expired and not uploading, evict the cache task. + if task.is_expired() { + // If the cache task is uploading, skip it. + if task.is_uploading() { + continue; + } + + self.storage.delete_cache_task(&task.id).await; + info!("evict cache task {}", task.id); + + self.delete_cache_task_from_scheduler(task.clone()).await; + info!("delete cache task {} from scheduler", task.id); + } + } + + Ok(()) + } + + // evict_cache_task_by_disk_usage evicts the cache task by disk usage. + async fn evict_cache_task_by_disk_usage(&self) -> Result<()> { + let stats = fs2::statvfs(self.config.storage.dir.as_path())?; + let available_space = stats.available_space(); + let total_space = stats.total_space(); + + // Calculate the usage percent. + let usage_percent = (100 - available_space * 100 / total_space) as u8; + if usage_percent >= self.config.gc.policy.dist_high_threshold_percent { + info!( + "start to evict cache task by disk usage, disk usage {}% is higher than high threshold {}%", + usage_percent, self.config.gc.policy.dist_high_threshold_percent + ); + + // Calculate the need evict space. + let need_evict_space = total_space as f64 + * ((usage_percent - self.config.gc.policy.dist_low_threshold_percent) as f64 + / 100.0); + + // Evict the cache task by the need evict space. + if let Err(err) = self.evict_cache_task_space(need_evict_space as u64).await { + info!("failed to evict cache task by disk usage: {}", err); + } + } + + Ok(()) + } + + // evict_cache_task_space evicts the cache task by the given space. + async fn evict_cache_task_space(&self, need_evict_space: u64) -> Result<()> { + let mut tasks = self.storage.get_cache_tasks()?; + tasks.sort_by(|a, b| a.updated_at.cmp(&b.updated_at)); + + let mut evicted_space = 0; + for task in tasks { + // Evict enough space. + if evicted_space >= need_evict_space { + break; + } + + // If the cache task is uploading, skip it. + if task.is_uploading() { + continue; + } + + // If the cache task is persistent, skip it. 
+ if task.is_persistent() { + continue; + } + + let task_space = task.content_length(); + + // Evict the cache task. + self.storage.delete_cache_task(&task.id).await; + + // Update the evicted space. + evicted_space += task_space; + info!("evict cache task {} size {}", task.id, task_space); + + self.delete_cache_task_from_scheduler(task.clone()).await; + info!("delete cache task {} from scheduler", task.id); + } + + info!("evict total size {}", evicted_space); + Ok(()) + } + + // delete_cache_task_from_scheduler deletes the cache task from the scheduler. + async fn delete_cache_task_from_scheduler(&self, task: metadata::CacheTask) { + self.scheduler_client + .delete_cache_task(DeleteCacheTaskRequest { + host_id: self.host_id.clone(), + task_id: task.id.clone(), + }) + .await + .unwrap_or_else(|err| { + error!("failed to delete cache peer {}: {}", task.id, err); + }); + } } diff --git a/dragonfly-client/src/resource/cache_task.rs b/dragonfly-client/src/resource/cache_task.rs index 56582116..87b468c5 100644 --- a/dragonfly-client/src/resource/cache_task.rs +++ b/dragonfly-client/src/resource/cache_task.rs @@ -128,10 +128,14 @@ impl CacheTask { err })?; + // Convert prost_wkt_types::Duration to std::time::Duration. + let ttl = Duration::try_from(request.ttl.clone().ok_or(Error::UnexpectedResponse)?) + .or_err(ErrorType::ParseError)?; + // Create the persistent cache task. match self .storage - .create_persistent_cache_task(task_id, path, request.piece_length, digest) + .create_persistent_cache_task(task_id, ttl, path, request.piece_length, digest) .await { Ok(metadata) => { @@ -219,8 +223,13 @@ impl CacheTask { }) .await?; + // Convert prost_wkt_types::Duration to std::time::Duration. + let ttl = Duration::try_from(response.ttl.ok_or(Error::InvalidParameter)?) + .or_err(ErrorType::ParseError)?; + self.storage.download_cache_task_started( task_id, + ttl, request.persistent, request.piece_length, response.content_length,