feat: add garbage collection for cache task (#610)

Signed-off-by: Gaius <gaius.qi@gmail.com>
Gaius authored 2024-07-18 11:54:05 +08:00, committed by GitHub
parent f9c8f2875f
commit c202b14fd2
GPG Key ID: B5690EEEBB952194 (no known key found for this signature in database)
5 changed files with 165 additions and 23 deletions

View File

@@ -151,6 +151,7 @@ impl Storage {
     pub async fn create_persistent_cache_task(
         &self,
         id: &str,
+        ttl: Duration,
         path: &Path,
         piece_length: u64,
         expected_digest: &str,
@@ -166,6 +167,7 @@ impl Storage {
         self.metadata.create_persistent_cache_task(
             id,
+            ttl,
             piece_length,
             response.length,
             digest.to_string().as_str(),
@@ -176,12 +178,13 @@ impl Storage {
     pub fn download_cache_task_started(
         &self,
         id: &str,
+        ttl: Duration,
         persistent: bool,
         piece_length: u64,
         content_length: u64,
     ) -> Result<metadata::CacheTask> {
         self.metadata
-            .download_cache_task_started(id, persistent, piece_length, content_length)
+            .download_cache_task_started(id, ttl, persistent, piece_length, content_length)
     }

     // download_cache_task_finished updates the metadata of the cache task when the cache task downloads finished.

View File

@@ -132,6 +132,9 @@ pub struct CacheTask {
     // not be deleted when dfdaemon runs garbage collection.
     pub persistent: bool,

+    // ttl is the time to live of the cache task.
+    pub ttl: Duration,
+
     // digest is the digest of the cache task.
     pub digest: String,
@@ -179,6 +182,14 @@ impl CacheTask {
         self.uploading_count > 0
     }

+    // is_expired returns whether the cache task is expired.
+    pub fn is_expired(&self) -> bool {
+        // When the scheduler runs garbage collection, it triggers dfdaemon to evict
+        // the cache task. But dfdaemon may not evict the cache task in time, so we
+        // use ttl * 2 as the expiration time to force-evict the cache task.
+        self.created_at + self.ttl * 2 < Utc::now().naive_utc()
+    }
+
     // is_failed returns whether the cache task downloads failed.
     pub fn is_failed(&self) -> bool {
         self.failed_at.is_some()
@@ -489,6 +500,7 @@ impl<E: StorageEngineOwned> Metadata<E> {
     pub fn create_persistent_cache_task(
         &self,
         id: &str,
+        ttl: Duration,
         piece_length: u64,
         content_length: u64,
         digest: &str,
@@ -496,6 +508,7 @@ impl<E: StorageEngineOwned> Metadata<E> {
         let task = CacheTask {
             id: id.to_string(),
             persistent: true,
+            ttl,
             piece_length,
             content_length,
             digest: digest.to_string(),
@@ -515,6 +528,7 @@ impl<E: StorageEngineOwned> Metadata<E> {
     pub fn download_cache_task_started(
         &self,
         id: &str,
+        ttl: Duration,
         persistent: bool,
         piece_length: u64,
         content_length: u64,
@@ -529,6 +543,7 @@ impl<E: StorageEngineOwned> Metadata<E> {
             None => CacheTask {
                 id: id.to_string(),
                 persistent,
+                ttl,
                 piece_length,
                 content_length,
                 updated_at: Utc::now().naive_utc(),

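The is_expired check above relies on chrono's arithmetic between a NaiveDateTime and a std::time::Duration (available in the chrono versions this crate depends on). A minimal standalone sketch of the same policy, with a hypothetical free function standing in for the method:

use chrono::{NaiveDateTime, Utc};
use std::time::Duration;

// Mirrors CacheTask::is_expired: a task is force-evicted once twice its ttl
// has elapsed since creation, leaving headroom for the scheduler-triggered
// eviction to happen first.
fn is_expired(created_at: NaiveDateTime, ttl: Duration) -> bool {
    created_at + ttl * 2 < Utc::now().naive_utc()
}

fn main() {
    // Created one hour ago.
    let created_at = Utc::now().naive_utc() - chrono::Duration::hours(1);

    // 20-minute ttl: its 40-minute grace window has passed, so it is expired.
    assert!(is_expired(created_at, Duration::from_secs(20 * 60)));

    // 2-hour ttl: still inside its window.
    assert!(!is_expired(created_at, Duration::from_secs(2 * 3600)));
}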
View File

@@ -17,9 +17,13 @@
 use chrono::{DateTime, Local};
 use clap::Parser;
 use dragonfly_api::dfdaemon::v2::StatCacheTaskRequest;
-use dragonfly_client_core::{Error, Result};
+use dragonfly_client_core::{
+    error::{ErrorType, OrErr},
+    Error, Result,
+};
 use humantime::format_duration;
 use std::path::Path;
+use std::time::Duration;
 use tabled::{
     settings::{object::Rows, Alignment, Modify, Style},
     Table, Tabled,
@@ -186,10 +190,9 @@ impl StatCommand {
         };

         // Convert ttl to human readable format.
-        if let Some(ttl) = task.ttl {
-            table_task.ttl =
-                format_duration(std::time::Duration::from_secs(ttl.seconds as u64)).to_string();
-        }
+        let ttl = Duration::try_from(task.ttl.ok_or(Error::InvalidParameter)?)
+            .or_err(ErrorType::ParseError)?;
+        table_task.ttl = format_duration(ttl).to_string();

         // Convert created_at to human readable format.
         if let Some(created_at) = task.created_at {

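For reference, the rewrite above replaces the manual seconds-based conversion with a fallible TryFrom: the wire ttl is a protobuf Duration with signed seconds and nanos, so negative values cannot map to a std::time::Duration. A minimal sketch of that conversion, assuming the ttl field is the prost-wkt-types Duration used by the dragonfly-api bindings:

use humantime::format_duration;
use std::time::Duration;

fn main() {
    // The wire type is signed; try_from fails for negative durations, which
    // the client maps to ErrorType::ParseError.
    let proto_ttl = prost_wkt_types::Duration {
        seconds: 5400,
        nanos: 0,
    };
    let ttl = Duration::try_from(proto_ttl).expect("ttl must be non-negative");

    // humantime renders the ttl for the stat table, e.g. "1h 30m".
    println!("{}", format_duration(ttl));
}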
View File

@@ -16,7 +16,7 @@

 use crate::grpc::scheduler::SchedulerClient;
 use crate::shutdown;
-use dragonfly_api::scheduler::v2::DeleteTaskRequest;
+use dragonfly_api::scheduler::v2::{DeleteCacheTaskRequest, DeleteTaskRequest};
 use dragonfly_client_config::dfdaemon::Config;
 use dragonfly_client_core::Result;
 use dragonfly_client_storage::{metadata, Storage};
@@ -75,14 +75,24 @@ impl GC {
         loop {
             tokio::select! {
                 _ = interval.tick() => {
-                    // Evict the cache by task ttl.
-                    if let Err(err) = self.evict_by_task_ttl().await {
-                        info!("failed to evict by task ttl: {}", err);
+                    // Evict the cache task by ttl.
+                    if let Err(err) = self.evict_cache_task_by_ttl().await {
+                        info!("failed to evict cache task by ttl: {}", err);
                     }

-                    // Evict the cache by disk usage.
-                    if let Err(err) = self.evict_by_disk_usage().await {
-                        info!("failed to evict by disk usage: {}", err);
+                    // Evict the cache task by disk usage.
+                    if let Err(err) = self.evict_cache_task_by_disk_usage().await {
+                        info!("failed to evict cache task by disk usage: {}", err);
+                    }
+
+                    // Evict the task by ttl.
+                    if let Err(err) = self.evict_task_by_ttl().await {
+                        info!("failed to evict task by ttl: {}", err);
+                    }
+
+                    // Evict the task by disk usage.
+                    if let Err(err) = self.evict_task_by_disk_usage().await {
+                        info!("failed to evict task by disk usage: {}", err);
                     }
                 }
                 _ = shutdown.recv() => {
@@ -94,8 +104,8 @@ impl GC {
         }
     }

-    // evict_by_task_ttl evicts the cache by task ttl.
-    async fn evict_by_task_ttl(&self) -> Result<()> {
+    // evict_task_by_ttl evicts the task by ttl.
+    async fn evict_task_by_ttl(&self) -> Result<()> {
         info!("start to evict by task ttl");
         for task in self.storage.get_tasks()? {
             // If the task is expired and not uploading, evict the task.
@@ -116,8 +126,8 @@ impl GC {
         Ok(())
     }

-    // evict_by_disk_usage evicts the cache by disk usage.
-    async fn evict_by_disk_usage(&self) -> Result<()> {
+    // evict_task_by_disk_usage evicts the task by disk usage.
+    async fn evict_task_by_disk_usage(&self) -> Result<()> {
         let stats = fs2::statvfs(self.config.storage.dir.as_path())?;
         let available_space = stats.available_space();
         let total_space = stats.total_space();
@@ -126,7 +136,7 @@ impl GC {
         let usage_percent = (100 - available_space * 100 / total_space) as u8;
         if usage_percent >= self.config.gc.policy.dist_high_threshold_percent {
             info!(
-                "start to evict by disk usage, disk usage {}% is higher than high threshold {}%",
+                "start to evict task by disk usage, disk usage {}% is higher than high threshold {}%",
                 usage_percent, self.config.gc.policy.dist_high_threshold_percent
             );
@@ -135,17 +145,17 @@ impl GC {
                 * ((usage_percent - self.config.gc.policy.dist_low_threshold_percent) as f64
                     / 100.0);

-            // Evict the cache by the need evict space.
-            if let Err(err) = self.evict_space(need_evict_space as u64).await {
-                info!("failed to evict by disk usage: {}", err);
+            // Evict the task by the need evict space.
+            if let Err(err) = self.evict_task_space(need_evict_space as u64).await {
+                info!("failed to evict task by disk usage: {}", err);
             }
         }

         Ok(())
     }

-    // evict_space evicts the cache by the given space.
-    async fn evict_space(&self, need_evict_space: u64) -> Result<()> {
+    // evict_task_space evicts the task by the given space.
+    async fn evict_task_space(&self, need_evict_space: u64) -> Result<()> {
         let mut tasks = self.storage.get_tasks()?;
         tasks.sort_by(|a, b| a.updated_at.cmp(&b.updated_at));
@@ -197,4 +207,106 @@ impl GC {
                 error!("failed to delete peer {}: {}", task.id, err);
             });
     }
+
+    // evict_cache_task_by_ttl evicts the cache task by ttl.
+    async fn evict_cache_task_by_ttl(&self) -> Result<()> {
+        info!("start to evict cache task by ttl");
+        for task in self.storage.get_cache_tasks()? {
+            // If the cache task is expired and not uploading, evict the cache task.
+            if task.is_expired() {
+                // If the cache task is uploading, skip it.
+                if task.is_uploading() {
+                    continue;
+                }
+
+                self.storage.delete_cache_task(&task.id).await;
+                info!("evict cache task {}", task.id);
+
+                self.delete_cache_task_from_scheduler(task.clone()).await;
+                info!("delete cache task {} from scheduler", task.id);
+            }
+        }
+
+        Ok(())
+    }
+
+    // evict_cache_task_by_disk_usage evicts the cache task by disk usage.
+    async fn evict_cache_task_by_disk_usage(&self) -> Result<()> {
+        let stats = fs2::statvfs(self.config.storage.dir.as_path())?;
+        let available_space = stats.available_space();
+        let total_space = stats.total_space();
+
+        // Calculate the usage percent.
+        let usage_percent = (100 - available_space * 100 / total_space) as u8;
+        if usage_percent >= self.config.gc.policy.dist_high_threshold_percent {
+            info!(
+                "start to evict cache task by disk usage, disk usage {}% is higher than high threshold {}%",
+                usage_percent, self.config.gc.policy.dist_high_threshold_percent
+            );
+
+            // Calculate the need evict space.
+            let need_evict_space = total_space as f64
+                * ((usage_percent - self.config.gc.policy.dist_low_threshold_percent) as f64
+                    / 100.0);
+
+            // Evict the cache task by the need evict space.
+            if let Err(err) = self.evict_cache_task_space(need_evict_space as u64).await {
+                info!("failed to evict cache task by disk usage: {}", err);
+            }
+        }

+        Ok(())
+    }
+
+    // evict_cache_task_space evicts the cache task by the given space.
+    async fn evict_cache_task_space(&self, need_evict_space: u64) -> Result<()> {
+        let mut tasks = self.storage.get_cache_tasks()?;
+        tasks.sort_by(|a, b| a.updated_at.cmp(&b.updated_at));
+
+        let mut evicted_space = 0;
+        for task in tasks {
+            // Evict enough space.
+            if evicted_space >= need_evict_space {
+                break;
+            }
+
+            // If the cache task is uploading, skip it.
+            if task.is_uploading() {
+                continue;
+            }
+
+            // If the cache task is persistent, skip it.
+            if task.is_persistent() {
+                continue;
+            }
+
+            let task_space = task.content_length();
+
+            // Evict the cache task.
+            self.storage.delete_cache_task(&task.id).await;
+
+            // Update the evicted space.
+            evicted_space += task_space;
+            info!("evict cache task {} size {}", task.id, task_space);
+
+            self.delete_cache_task_from_scheduler(task.clone()).await;
+            info!("delete cache task {} from scheduler", task.id);
+        }
+
+        info!("evict total size {}", evicted_space);
+        Ok(())
+    }
+
+    // delete_cache_task_from_scheduler deletes the cache task from the scheduler.
+    async fn delete_cache_task_from_scheduler(&self, task: metadata::CacheTask) {
+        self.scheduler_client
+            .delete_cache_task(DeleteCacheTaskRequest {
+                host_id: self.host_id.clone(),
+                task_id: task.id.clone(),
+            })
+            .await
+            .unwrap_or_else(|err| {
+                error!("failed to delete cache peer {}: {}", task.id, err);
+            });
+    }
 }

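Both disk-usage paths share the same policy arithmetic: once usage crosses dist_high_threshold_percent, the GC frees enough bytes to fall back to dist_low_threshold_percent, then walks tasks oldest-first (sorted by updated_at), skipping uploading and persistent entries, until the freed bytes reach that target. A standalone sketch of the calculation, with hypothetical threshold values standing in for config.gc.policy.*:

// Mirrors the need_evict_space math in evict_task_by_disk_usage and
// evict_cache_task_by_disk_usage above.
fn need_evict_space(
    total_space: u64,
    available_space: u64,
    high_threshold_percent: u8,
    low_threshold_percent: u8,
) -> u64 {
    // Usage as an integer percentage of total space.
    let usage_percent = (100 - available_space * 100 / total_space) as u8;
    if usage_percent < high_threshold_percent {
        // Below the high watermark: nothing to evict.
        return 0;
    }

    // Evict the slice of total space between current usage and the low watermark.
    (total_space as f64 * ((usage_percent - low_threshold_percent) as f64 / 100.0)) as u64
}

fn main() {
    // 100 GiB disk with 10 GiB free => 90% used. With high = 80% and low = 60%,
    // the GC aims to evict 30% of total space, i.e. about 30 GiB.
    let need = need_evict_space(100 << 30, 10 << 30, 80, 60);
    println!("need to evict {} bytes", need);
}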
View File

@@ -128,10 +128,14 @@ impl CacheTask {
                 err
             })?;

+        // Convert prost_wkt_types::Duration to std::time::Duration.
+        let ttl = Duration::try_from(request.ttl.clone().ok_or(Error::UnexpectedResponse)?)
+            .or_err(ErrorType::ParseError)?;
+
         // Create the persistent cache task.
         match self
             .storage
-            .create_persistent_cache_task(task_id, path, request.piece_length, digest)
+            .create_persistent_cache_task(task_id, ttl, path, request.piece_length, digest)
             .await
         {
             Ok(metadata) => {
@@ -219,8 +223,13 @@ impl CacheTask {
             })
             .await?;

+        // Convert prost_wkt_types::Duration to std::time::Duration.
+        let ttl = Duration::try_from(response.ttl.ok_or(Error::InvalidParameter)?)
+            .or_err(ErrorType::ParseError)?;
+
         self.storage.download_cache_task_started(
             task_id,
+            ttl,
             request.persistent,
             request.piece_length,
             response.content_length,