From cd9f5ca3568d810252fbddcbb75cc6fc9be8d2dd Mon Sep 17 00:00:00 2001 From: Gaius Date: Mon, 3 Mar 2025 17:45:10 +0800 Subject: [PATCH] feat(dragonfly-client/proxy): add switch for cache in proxy (#1010) Signed-off-by: Gaius --- Cargo.lock | 18 ++-- Cargo.toml | 16 ++-- dragonfly-client-config/src/dfdaemon.rs | 23 +---- dragonfly-client/src/proxy/cache.rs | 1 + dragonfly-client/src/proxy/mod.rs | 116 +++++++++++++----------- 5 files changed, 85 insertions(+), 89 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9ad79524..6dd2f450 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -939,7 +939,7 @@ dependencies = [ [[package]] name = "dragonfly-client" -version = "0.2.16" +version = "0.2.17" dependencies = [ "anyhow", "blake3", @@ -1011,7 +1011,7 @@ dependencies = [ [[package]] name = "dragonfly-client-backend" -version = "0.2.16" +version = "0.2.17" dependencies = [ "dragonfly-api", "dragonfly-client-core", @@ -1042,7 +1042,7 @@ dependencies = [ [[package]] name = "dragonfly-client-config" -version = "0.2.16" +version = "0.2.17" dependencies = [ "bytesize", "bytesize-serde", @@ -1068,7 +1068,7 @@ dependencies = [ [[package]] name = "dragonfly-client-core" -version = "0.2.16" +version = "0.2.17" dependencies = [ "headers 0.4.0", "hyper 1.6.0", @@ -1087,7 +1087,7 @@ dependencies = [ [[package]] name = "dragonfly-client-init" -version = "0.2.16" +version = "0.2.17" dependencies = [ "anyhow", "clap", @@ -1105,7 +1105,7 @@ dependencies = [ [[package]] name = "dragonfly-client-storage" -version = "0.2.16" +version = "0.2.17" dependencies = [ "base16ct", "bincode", @@ -1131,7 +1131,7 @@ dependencies = [ [[package]] name = "dragonfly-client-util" -version = "0.2.16" +version = "0.2.17" dependencies = [ "base16ct", "base64 0.22.1", @@ -1532,7 +1532,7 @@ dependencies = [ [[package]] name = "hdfs" -version = "0.2.16" +version = "0.2.17" dependencies = [ "dragonfly-client-backend", "dragonfly-client-core", diff --git a/Cargo.toml b/Cargo.toml index 5d857d4f..636b54e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ members = [ ] [workspace.package] -version = "0.2.16" +version = "0.2.17" authors = ["The Dragonfly Developers"] homepage = "https://d7y.io/" repository = "https://github.com/dragonflyoss/client.git" @@ -22,13 +22,13 @@ readme = "README.md" edition = "2021" [workspace.dependencies] -dragonfly-client = { path = "dragonfly-client", version = "0.2.16" } -dragonfly-client-core = { path = "dragonfly-client-core", version = "0.2.16" } -dragonfly-client-config = { path = "dragonfly-client-config", version = "0.2.16" } -dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.2.16" } -dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.16" } -dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.16" } -dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.16" } +dragonfly-client = { path = "dragonfly-client", version = "0.2.17" } +dragonfly-client-core = { path = "dragonfly-client-core", version = "0.2.17" } +dragonfly-client-config = { path = "dragonfly-client-config", version = "0.2.17" } +dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.2.17" } +dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.17" } +dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.17" } +dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.17" } thiserror = "1.0" dragonfly-api = "=2.1.30" reqwest = { version = "0.12.4", features = [ diff --git a/dragonfly-client-config/src/dfdaemon.rs b/dragonfly-client-config/src/dfdaemon.rs index c799aa58..24ed5e35 100644 --- a/dragonfly-client-config/src/dfdaemon.rs +++ b/dragonfly-client-config/src/dfdaemon.rs @@ -200,13 +200,6 @@ fn default_storage_read_buffer_size() -> usize { 128 * 1024 } -/// default_storage_cache_capacity is the default cache capacity for the preheat job, default is -/// 100. -#[inline] -fn default_storage_cache_capacity() -> usize { - 100 -} - /// default_seed_peer_cluster_id is the default cluster id of seed peer. #[inline] fn default_seed_peer_cluster_id() -> u64 { @@ -249,12 +242,6 @@ pub fn default_proxy_server_port() -> u16 { 4001 } -/// default_proxy_cache_capacity is the default cache capacity for the proxy server, default is 150. -#[inline] -pub fn default_proxy_cache_capacity() -> usize { - 150 -} - /// default_proxy_read_buffer_size is the default buffer size for reading piece, default is 32KB. #[inline] pub fn default_proxy_read_buffer_size() -> usize { @@ -981,8 +968,7 @@ pub struct Storage { /// | | /// +--------------------------------------------------+ /// ``` - #[serde(default = "default_storage_cache_capacity")] - pub cache_capacity: usize, + pub cache_capacity: Option, } /// Storage implements Default. @@ -994,7 +980,7 @@ impl Default for Storage { keep: default_storage_keep(), write_buffer_size: default_storage_write_buffer_size(), read_buffer_size: default_storage_read_buffer_size(), - cache_capacity: default_storage_cache_capacity(), + cache_capacity: None, } } } @@ -1264,8 +1250,7 @@ pub struct Proxy { /// The cache is used to store the hot piece content of the task, piece length is 4MB~16MB. /// If the capacity is 150, the cache size is 600MB~2.4GB, need to adjust according to the /// memory size of the host. - #[serde(default = "default_proxy_cache_capacity")] - pub cache_capacity: usize, + pub cache_capacity: Option, /// read_buffer_size is the buffer size for reading piece from disk, default is 1KB. #[serde(default = "default_proxy_read_buffer_size")] @@ -1282,7 +1267,7 @@ impl Default for Proxy { disable_back_to_source: false, prefetch: false, prefetch_rate_limit: default_prefetch_rate_limit(), - cache_capacity: default_proxy_cache_capacity(), + cache_capacity: None, read_buffer_size: default_proxy_read_buffer_size(), } } diff --git a/dragonfly-client/src/proxy/cache.rs b/dragonfly-client/src/proxy/cache.rs index 3c3462c8..4ea7468c 100644 --- a/dragonfly-client/src/proxy/cache.rs +++ b/dragonfly-client/src/proxy/cache.rs @@ -24,6 +24,7 @@ use std::cmp::{max, min}; use std::num::NonZeroUsize; use std::sync::{Arc, Mutex}; +/// TODO(Gaius): Fix the memory leak issue. /// Cache is the cache for storing http response by LRU algorithm. #[derive(Clone)] pub struct Cache { diff --git a/dragonfly-client/src/proxy/mod.rs b/dragonfly-client/src/proxy/mod.rs index ab6df170..eeed674b 100644 --- a/dragonfly-client/src/proxy/mod.rs +++ b/dragonfly-client/src/proxy/mod.rs @@ -76,7 +76,7 @@ pub struct Proxy { config: Arc, /// cache is the cache manager for storing the piece content. - cache: Arc, + cache: Option>, /// task is the task manager. task: Arc, @@ -110,7 +110,7 @@ impl Proxy { ) -> Self { let mut proxy = Self { config: config.clone(), - cache: Arc::new(cache::Cache::new(config.proxy.cache_capacity, task.clone()).unwrap()), + cache: None, task: task.clone(), addr: SocketAddr::new(config.proxy.server.ip.unwrap(), config.proxy.server.port), registry_cert: Arc::new(None), @@ -119,6 +119,10 @@ impl Proxy { _shutdown_complete: shutdown_complete_tx, }; + if let Some(cache_capacity) = config.proxy.cache_capacity { + proxy.cache = Some(Arc::new(cache::Cache::new(cache_capacity, task).unwrap())); + } + // Load and generate the registry certificates from the PEM format file. proxy.registry_cert = match config.proxy.registry_mirror.load_cert_der() { Ok(registry_cert) => { @@ -221,7 +225,7 @@ impl Proxy { #[instrument(skip_all, fields(uri, method))] pub async fn handler( config: Arc, - cache: Arc, + cache: Option>, task: Arc, request: Request, dfdaemon_download_client: DfdaemonDownloadClient, @@ -292,7 +296,7 @@ pub async fn handler( #[instrument(skip_all)] pub async fn registry_mirror_http_handler( config: Arc, - cache: Arc, + cache: Option>, task: Arc, request: Request, dfdaemon_download_client: DfdaemonDownloadClient, @@ -314,7 +318,7 @@ pub async fn registry_mirror_http_handler( #[instrument(skip_all)] pub async fn registry_mirror_https_handler( config: Arc, - cache: Arc, + cache: Option>, task: Arc, request: Request, dfdaemon_download_client: DfdaemonDownloadClient, @@ -338,7 +342,7 @@ pub async fn registry_mirror_https_handler( #[instrument(skip_all)] pub async fn http_handler( config: Arc, - cache: Arc, + cache: Option>, task: Arc, request: Request, dfdaemon_download_client: DfdaemonDownloadClient, @@ -423,7 +427,7 @@ pub async fn http_handler( #[instrument(skip_all)] pub async fn https_handler( config: Arc, - cache: Arc, + cache: Option>, task: Arc, request: Request, dfdaemon_download_client: DfdaemonDownloadClient, @@ -470,7 +474,7 @@ pub async fn https_handler( #[instrument(skip_all)] async fn upgraded_tunnel( config: Arc, - cache: Arc, + cache: Option>, task: Arc, upgraded: Upgraded, host: String, @@ -540,7 +544,7 @@ async fn upgraded_tunnel( #[instrument(skip_all, fields(uri, method))] pub async fn upgraded_handler( config: Arc, - cache: Arc, + cache: Option>, task: Arc, host: String, mut request: Request, @@ -634,7 +638,7 @@ pub async fn upgraded_handler( #[instrument(skip_all, fields(host_id, task_id, peer_id))] async fn proxy_via_dfdaemon( config: Arc, - cache: Arc, + cache: Option>, task: Arc, rule: &Rule, request: Request, @@ -663,33 +667,35 @@ async fn proxy_via_dfdaemon( .as_ref() .is_some_and(|d| d.output_path.is_some()); + // If has_output_path is false and cache is enabled, check the cache first. if !has_output_path { - // Get the content from the cache by the request. - match cache.get_by_request(&download_task_request).await { - Ok(None) => { - debug!("cache miss"); - } - Ok(Some(content)) => { - info!("cache hit"); + if let Some(cache) = cache.as_ref() { + match cache.get_by_request(&download_task_request).await { + Ok(None) => { + debug!("cache miss"); + } + Ok(Some(content)) => { + info!("cache hit"); - // Collect the download piece traffic metrics and the proxy request via dfdaemon and - // cache hits metrics. - collect_proxy_request_via_dfdaemon_and_cache_hits_metrics(); - collect_download_piece_traffic_metrics( - &TrafficType::LocalPeer, - TaskType::Standard as i32, - content.len() as u64, - ); + // Collect the download piece traffic metrics and the proxy request via dfdaemon and + // cache hits metrics. + collect_proxy_request_via_dfdaemon_and_cache_hits_metrics(); + collect_download_piece_traffic_metrics( + &TrafficType::LocalPeer, + TaskType::Standard as i32, + content.len() as u64, + ); - let body_boxed = Full::new(content).map_err(ClientError::from).boxed(); - return Ok(Response::new(body_boxed)); - } - Err(err) => { - error!("get content from cache failed: {}", err); - return Ok(make_error_response( - http::StatusCode::INTERNAL_SERVER_ERROR, - None, - )); + let body_boxed = Full::new(content).map_err(ClientError::from).boxed(); + return Ok(Response::new(body_boxed)); + } + Err(err) => { + error!("get content from cache failed: {}", err); + return Ok(make_error_response( + http::StatusCode::INTERNAL_SERVER_ERROR, + None, + )); + } } } } @@ -884,30 +890,34 @@ async fn proxy_via_dfdaemon( return; } - // If the piece is not in the cache, add it to the cache. - let piece_id = - task.piece.id(message.task_id.as_str(), need_piece_number); + // If cache is enabled, check the cache first. + if let Some(cache) = cache.as_ref() { + // If the piece is not in the cache, add it to the cache. + let piece_id = + task.piece.id(message.task_id.as_str(), need_piece_number); - if !cache.contains_piece(&piece_id) { - let mut content = - bytes::BytesMut::with_capacity(piece.length as usize); - loop { - let n = match piece_reader.read_buf(&mut content).await { - Ok(n) => n, - Err(err) => { - error!("read piece reader error: {}", err); + if !cache.contains_piece(&piece_id) { + let mut content = + bytes::BytesMut::with_capacity(piece.length as usize); + loop { + let n = match piece_reader.read_buf(&mut content).await + { + Ok(n) => n, + Err(err) => { + error!("read piece reader error: {}", err); + break; + } + }; + + // When the piece reader reads to the end, add the piece + // to the cache. + if n == 0 { + cache.add_piece(&piece_id, content.freeze()); break; } - }; - - // When the piece reader reads to the end, add the piece - // to the cache. - if n == 0 { - cache.add_piece(&piece_id, content.freeze()); - break; } } - } + }; need_piece_number += 1; }