feat(dragonfly-client/proxy): add switch for cache in proxy (#1010)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2025-03-03 17:45:10 +08:00 committed by GitHub
parent 545bbe5902
commit cd9f5ca356
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 85 additions and 89 deletions

18
Cargo.lock generated
View File

@ -1,6 +1,6 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
version = 4
[[package]]
name = "addr2line"
@ -939,7 +939,7 @@ dependencies = [
[[package]]
name = "dragonfly-client"
version = "0.2.16"
version = "0.2.17"
dependencies = [
"anyhow",
"blake3",
@ -1011,7 +1011,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-backend"
version = "0.2.16"
version = "0.2.17"
dependencies = [
"dragonfly-api",
"dragonfly-client-core",
@ -1042,7 +1042,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-config"
version = "0.2.16"
version = "0.2.17"
dependencies = [
"bytesize",
"bytesize-serde",
@ -1068,7 +1068,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-core"
version = "0.2.16"
version = "0.2.17"
dependencies = [
"headers 0.4.0",
"hyper 1.6.0",
@ -1087,7 +1087,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-init"
version = "0.2.16"
version = "0.2.17"
dependencies = [
"anyhow",
"clap",
@ -1105,7 +1105,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-storage"
version = "0.2.16"
version = "0.2.17"
dependencies = [
"base16ct",
"bincode",
@ -1131,7 +1131,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-util"
version = "0.2.16"
version = "0.2.17"
dependencies = [
"base16ct",
"base64 0.22.1",
@ -1532,7 +1532,7 @@ dependencies = [
[[package]]
name = "hdfs"
version = "0.2.16"
version = "0.2.17"
dependencies = [
"dragonfly-client-backend",
"dragonfly-client-core",

View File

@ -12,7 +12,7 @@ members = [
]
[workspace.package]
version = "0.2.16"
version = "0.2.17"
authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git"
@ -22,13 +22,13 @@ readme = "README.md"
edition = "2021"
[workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "0.2.16" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.2.16" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.2.16" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.2.16" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.16" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.16" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.16" }
dragonfly-client = { path = "dragonfly-client", version = "0.2.17" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.2.17" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.2.17" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.2.17" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.17" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.17" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.17" }
thiserror = "1.0"
dragonfly-api = "=2.1.30"
reqwest = { version = "0.12.4", features = [

View File

@ -200,13 +200,6 @@ fn default_storage_read_buffer_size() -> usize {
128 * 1024
}
/// default_storage_cache_capacity is the default cache capacity for the preheat job, default is
/// 100.
#[inline]
fn default_storage_cache_capacity() -> usize {
100
}
/// default_seed_peer_cluster_id is the default cluster id of seed peer.
#[inline]
fn default_seed_peer_cluster_id() -> u64 {
@ -249,12 +242,6 @@ pub fn default_proxy_server_port() -> u16 {
4001
}
/// default_proxy_cache_capacity is the default cache capacity for the proxy server, default is 150.
#[inline]
pub fn default_proxy_cache_capacity() -> usize {
150
}
/// default_proxy_read_buffer_size is the default buffer size for reading piece, default is 32KB.
#[inline]
pub fn default_proxy_read_buffer_size() -> usize {
@ -981,8 +968,7 @@ pub struct Storage {
/// | |
/// +--------------------------------------------------+
/// ```
#[serde(default = "default_storage_cache_capacity")]
pub cache_capacity: usize,
pub cache_capacity: Option<usize>,
}
/// Storage implements Default.
@ -994,7 +980,7 @@ impl Default for Storage {
keep: default_storage_keep(),
write_buffer_size: default_storage_write_buffer_size(),
read_buffer_size: default_storage_read_buffer_size(),
cache_capacity: default_storage_cache_capacity(),
cache_capacity: None,
}
}
}
@ -1264,8 +1250,7 @@ pub struct Proxy {
/// The cache is used to store the hot piece content of the task, piece length is 4MB~16MB.
/// If the capacity is 150, the cache size is 600MB~2.4GB, need to adjust according to the
/// memory size of the host.
#[serde(default = "default_proxy_cache_capacity")]
pub cache_capacity: usize,
pub cache_capacity: Option<usize>,
/// read_buffer_size is the buffer size for reading piece from disk, default is 1KB.
#[serde(default = "default_proxy_read_buffer_size")]
@ -1282,7 +1267,7 @@ impl Default for Proxy {
disable_back_to_source: false,
prefetch: false,
prefetch_rate_limit: default_prefetch_rate_limit(),
cache_capacity: default_proxy_cache_capacity(),
cache_capacity: None,
read_buffer_size: default_proxy_read_buffer_size(),
}
}

View File

@ -24,6 +24,7 @@ use std::cmp::{max, min};
use std::num::NonZeroUsize;
use std::sync::{Arc, Mutex};
/// TODO(Gaius): Fix the memory leak issue.
/// Cache is the cache for storing http response by LRU algorithm.
#[derive(Clone)]
pub struct Cache {

View File

@ -76,7 +76,7 @@ pub struct Proxy {
config: Arc<Config>,
/// cache is the cache manager for storing the piece content.
cache: Arc<cache::Cache>,
cache: Option<Arc<cache::Cache>>,
/// task is the task manager.
task: Arc<Task>,
@ -110,7 +110,7 @@ impl Proxy {
) -> Self {
let mut proxy = Self {
config: config.clone(),
cache: Arc::new(cache::Cache::new(config.proxy.cache_capacity, task.clone()).unwrap()),
cache: None,
task: task.clone(),
addr: SocketAddr::new(config.proxy.server.ip.unwrap(), config.proxy.server.port),
registry_cert: Arc::new(None),
@ -119,6 +119,10 @@ impl Proxy {
_shutdown_complete: shutdown_complete_tx,
};
if let Some(cache_capacity) = config.proxy.cache_capacity {
proxy.cache = Some(Arc::new(cache::Cache::new(cache_capacity, task).unwrap()));
}
// Load and generate the registry certificates from the PEM format file.
proxy.registry_cert = match config.proxy.registry_mirror.load_cert_der() {
Ok(registry_cert) => {
@ -221,7 +225,7 @@ impl Proxy {
#[instrument(skip_all, fields(uri, method))]
pub async fn handler(
config: Arc<Config>,
cache: Arc<cache::Cache>,
cache: Option<Arc<cache::Cache>>,
task: Arc<Task>,
request: Request<hyper::body::Incoming>,
dfdaemon_download_client: DfdaemonDownloadClient,
@ -292,7 +296,7 @@ pub async fn handler(
#[instrument(skip_all)]
pub async fn registry_mirror_http_handler(
config: Arc<Config>,
cache: Arc<cache::Cache>,
cache: Option<Arc<cache::Cache>>,
task: Arc<Task>,
request: Request<hyper::body::Incoming>,
dfdaemon_download_client: DfdaemonDownloadClient,
@ -314,7 +318,7 @@ pub async fn registry_mirror_http_handler(
#[instrument(skip_all)]
pub async fn registry_mirror_https_handler(
config: Arc<Config>,
cache: Arc<cache::Cache>,
cache: Option<Arc<cache::Cache>>,
task: Arc<Task>,
request: Request<hyper::body::Incoming>,
dfdaemon_download_client: DfdaemonDownloadClient,
@ -338,7 +342,7 @@ pub async fn registry_mirror_https_handler(
#[instrument(skip_all)]
pub async fn http_handler(
config: Arc<Config>,
cache: Arc<cache::Cache>,
cache: Option<Arc<cache::Cache>>,
task: Arc<Task>,
request: Request<hyper::body::Incoming>,
dfdaemon_download_client: DfdaemonDownloadClient,
@ -423,7 +427,7 @@ pub async fn http_handler(
#[instrument(skip_all)]
pub async fn https_handler(
config: Arc<Config>,
cache: Arc<cache::Cache>,
cache: Option<Arc<cache::Cache>>,
task: Arc<Task>,
request: Request<hyper::body::Incoming>,
dfdaemon_download_client: DfdaemonDownloadClient,
@ -470,7 +474,7 @@ pub async fn https_handler(
#[instrument(skip_all)]
async fn upgraded_tunnel(
config: Arc<Config>,
cache: Arc<cache::Cache>,
cache: Option<Arc<cache::Cache>>,
task: Arc<Task>,
upgraded: Upgraded,
host: String,
@ -540,7 +544,7 @@ async fn upgraded_tunnel(
#[instrument(skip_all, fields(uri, method))]
pub async fn upgraded_handler(
config: Arc<Config>,
cache: Arc<cache::Cache>,
cache: Option<Arc<cache::Cache>>,
task: Arc<Task>,
host: String,
mut request: Request<hyper::body::Incoming>,
@ -634,7 +638,7 @@ pub async fn upgraded_handler(
#[instrument(skip_all, fields(host_id, task_id, peer_id))]
async fn proxy_via_dfdaemon(
config: Arc<Config>,
cache: Arc<cache::Cache>,
cache: Option<Arc<cache::Cache>>,
task: Arc<Task>,
rule: &Rule,
request: Request<hyper::body::Incoming>,
@ -663,33 +667,35 @@ async fn proxy_via_dfdaemon(
.as_ref()
.is_some_and(|d| d.output_path.is_some());
// If has_output_path is false and cache is enabled, check the cache first.
if !has_output_path {
// Get the content from the cache by the request.
match cache.get_by_request(&download_task_request).await {
Ok(None) => {
debug!("cache miss");
}
Ok(Some(content)) => {
info!("cache hit");
if let Some(cache) = cache.as_ref() {
match cache.get_by_request(&download_task_request).await {
Ok(None) => {
debug!("cache miss");
}
Ok(Some(content)) => {
info!("cache hit");
// Collect the download piece traffic metrics and the proxy request via dfdaemon and
// cache hits metrics.
collect_proxy_request_via_dfdaemon_and_cache_hits_metrics();
collect_download_piece_traffic_metrics(
&TrafficType::LocalPeer,
TaskType::Standard as i32,
content.len() as u64,
);
// Collect the download piece traffic metrics and the proxy request via dfdaemon and
// cache hits metrics.
collect_proxy_request_via_dfdaemon_and_cache_hits_metrics();
collect_download_piece_traffic_metrics(
&TrafficType::LocalPeer,
TaskType::Standard as i32,
content.len() as u64,
);
let body_boxed = Full::new(content).map_err(ClientError::from).boxed();
return Ok(Response::new(body_boxed));
}
Err(err) => {
error!("get content from cache failed: {}", err);
return Ok(make_error_response(
http::StatusCode::INTERNAL_SERVER_ERROR,
None,
));
let body_boxed = Full::new(content).map_err(ClientError::from).boxed();
return Ok(Response::new(body_boxed));
}
Err(err) => {
error!("get content from cache failed: {}", err);
return Ok(make_error_response(
http::StatusCode::INTERNAL_SERVER_ERROR,
None,
));
}
}
}
}
@ -884,30 +890,34 @@ async fn proxy_via_dfdaemon(
return;
}
// If the piece is not in the cache, add it to the cache.
let piece_id =
task.piece.id(message.task_id.as_str(), need_piece_number);
// If cache is enabled, check the cache first.
if let Some(cache) = cache.as_ref() {
// If the piece is not in the cache, add it to the cache.
let piece_id =
task.piece.id(message.task_id.as_str(), need_piece_number);
if !cache.contains_piece(&piece_id) {
let mut content =
bytes::BytesMut::with_capacity(piece.length as usize);
loop {
let n = match piece_reader.read_buf(&mut content).await {
Ok(n) => n,
Err(err) => {
error!("read piece reader error: {}", err);
if !cache.contains_piece(&piece_id) {
let mut content =
bytes::BytesMut::with_capacity(piece.length as usize);
loop {
let n = match piece_reader.read_buf(&mut content).await
{
Ok(n) => n,
Err(err) => {
error!("read piece reader error: {}", err);
break;
}
};
// When the piece reader reads to the end, add the piece
// to the cache.
if n == 0 {
cache.add_piece(&piece_id, content.freeze());
break;
}
};
// When the piece reader reads to the end, add the piece
// to the cache.
if n == 0 {
cache.add_piece(&piece_id, content.freeze());
break;
}
}
}
};
need_piece_number += 1;
}