feat: add buffer size config for dfdaemon (#434)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2024-04-29 14:44:46 +08:00 committed by GitHub
parent 9d945195a7
commit 4849535dc8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 80 additions and 30 deletions

16
Cargo.lock generated
View File

@ -934,7 +934,7 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-client" name = "dragonfly-client"
version = "0.1.43" version = "0.1.44"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"bytes", "bytes",
@ -997,7 +997,7 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-client-backend" name = "dragonfly-client-backend"
version = "0.1.43" version = "0.1.44"
dependencies = [ dependencies = [
"dragonfly-client-core", "dragonfly-client-core",
"futures", "futures",
@ -1015,7 +1015,7 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-client-config" name = "dragonfly-client-config"
version = "0.1.43" version = "0.1.44"
dependencies = [ dependencies = [
"dragonfly-client-core", "dragonfly-client-core",
"home", "home",
@ -1034,7 +1034,7 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-client-core" name = "dragonfly-client-core"
version = "0.1.43" version = "0.1.44"
dependencies = [ dependencies = [
"libloading", "libloading",
"reqwest", "reqwest",
@ -1045,7 +1045,7 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-client-init" name = "dragonfly-client-init"
version = "0.1.43" version = "0.1.44"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"clap", "clap",
@ -1061,7 +1061,7 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-client-storage" name = "dragonfly-client-storage"
version = "0.1.43" version = "0.1.44"
dependencies = [ dependencies = [
"base16ct", "base16ct",
"chrono", "chrono",
@ -1084,7 +1084,7 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-client-util" name = "dragonfly-client-util"
version = "0.1.43" version = "0.1.44"
dependencies = [ dependencies = [
"dragonfly-api", "dragonfly-api",
"dragonfly-client-core", "dragonfly-client-core",
@ -1542,7 +1542,7 @@ dependencies = [
[[package]] [[package]]
name = "hdfs" name = "hdfs"
version = "0.1.43" version = "0.1.44"
dependencies = [ dependencies = [
"dragonfly-client-backend", "dragonfly-client-backend",
"dragonfly-client-core", "dragonfly-client-core",

View File

@ -12,7 +12,7 @@ members = [
] ]
[workspace.package] [workspace.package]
version = "0.1.43" version = "0.1.44"
authors = ["The Dragonfly Developers"] authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/" homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git" repository = "https://github.com/dragonflyoss/client.git"
@ -22,13 +22,13 @@ readme = "README.md"
edition = "2021" edition = "2021"
[workspace.dependencies] [workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "0.1.43" } dragonfly-client = { path = "dragonfly-client", version = "0.1.44" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.43" } dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.44" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.43" } dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.44" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.43" } dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.44" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.43" } dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.44" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.43" } dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.44" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.43" } dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.44" }
thiserror = "1.0" thiserror = "1.0"
dragonfly-api = "2.0.110" dragonfly-api = "2.0.110"
reqwest = { version = "0.11.27", features = ["stream", "native-tls", "default-tls", "rustls-tls"] } reqwest = { version = "0.11.27", features = ["stream", "native-tls", "default-tls", "rustls-tls"] }

View File

@ -145,6 +145,18 @@ fn default_dynconfig_refresh_interval() -> Duration {
Duration::from_secs(300) Duration::from_secs(300)
} }
// default_storage_write_buffer_size is the default buffer size for writing piece to disk, default is 16KB.
#[inline]
fn default_storage_write_buffer_size() -> usize {
16 * 1024
}
// default_storage_read_buffer_size is the default buffer size for reading piece from disk, default is 16KB.
#[inline]
fn default_storage_read_buffer_size() -> usize {
16 * 1024
}
// default_seed_peer_cluster_id is the default cluster id of seed peer. // default_seed_peer_cluster_id is the default cluster id of seed peer.
#[inline] #[inline]
fn default_seed_peer_cluster_id() -> u64 { fn default_seed_peer_cluster_id() -> u64 {
@ -187,6 +199,12 @@ pub fn default_proxy_server_port() -> u16 {
4001 4001
} }
// default_proxy_read_buffer_size is the default buffer size the proxy uses when reading a piece to serve a response, default is 16KB.
#[inline]
pub fn default_proxy_read_buffer_size() -> usize {
16 * 1024
}
// default_s3_filtered_query_params is the default filtered query params with s3 protocol to generate the task id. // default_s3_filtered_query_params is the default filtered query params with s3 protocol to generate the task id.
#[inline] #[inline]
fn s3_filtered_query_params() -> Vec<String> { fn s3_filtered_query_params() -> Vec<String> {
@ -578,6 +596,14 @@ pub struct Storage {
// dir is the directory to store task's metadata and content. // dir is the directory to store task's metadata and content.
#[serde(default = "crate::default_storage_dir")] #[serde(default = "crate::default_storage_dir")]
pub dir: PathBuf, pub dir: PathBuf,
// write_buffer_size is the buffer size for writing piece to disk, default is 16KB.
#[serde(default = "default_storage_write_buffer_size")]
pub write_buffer_size: usize,
// read_buffer_size is the buffer size for reading piece from disk, default is 16KB.
#[serde(default = "default_storage_read_buffer_size")]
pub read_buffer_size: usize,
} }
// Storage implements Default. // Storage implements Default.
@ -585,6 +611,8 @@ impl Default for Storage {
fn default() -> Self { fn default() -> Self {
Storage { Storage {
dir: crate::default_storage_dir(), dir: crate::default_storage_dir(),
write_buffer_size: default_storage_write_buffer_size(),
read_buffer_size: default_storage_read_buffer_size(),
} }
} }
} }
@ -761,7 +789,7 @@ impl Default for RegistryMirror {
} }
// Proxy is the proxy configuration for dfdaemon. // Proxy is the proxy configuration for dfdaemon.
#[derive(Debug, Clone, Default, Validate, Deserialize)] #[derive(Debug, Clone, Validate, Deserialize)]
#[serde(default, rename_all = "camelCase")] #[serde(default, rename_all = "camelCase")]
pub struct Proxy { pub struct Proxy {
// server is the proxy server configuration for dfdaemon. // server is the proxy server configuration for dfdaemon.
@ -779,6 +807,24 @@ pub struct Proxy {
// prefetch pre-downloads full of the task when download with range request. // prefetch pre-downloads full of the task when download with range request.
pub prefetch: bool, pub prefetch: bool,
// read_buffer_size is the buffer size the proxy uses when reading a piece to serve a response, default is 16KB.
#[serde(default = "default_proxy_read_buffer_size")]
pub read_buffer_size: usize,
}
// Proxy implements Default.
impl Default for Proxy {
fn default() -> Self {
Self {
server: ProxyServer::default(),
rules: None,
registry_mirror: RegistryMirror::default(),
disable_back_to_source: false,
prefetch: false,
read_buffer_size: default_proxy_read_buffer_size(),
}
}
} }
// Security is the security configuration for dfdaemon. // Security is the security configuration for dfdaemon.

View File

@ -15,10 +15,12 @@
*/ */
use dragonfly_api::common::v2::Range; use dragonfly_api::common::v2::Range;
use dragonfly_client_config::dfdaemon::Config;
use dragonfly_client_core::Result; use dragonfly_client_core::Result;
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use std::cmp::{max, min}; use std::cmp::{max, min};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::fs::{self, File, OpenOptions}; use tokio::fs::{self, File, OpenOptions};
use tokio::io::{self, AsyncRead, AsyncReadExt, AsyncSeekExt, BufReader, SeekFrom}; use tokio::io::{self, AsyncRead, AsyncReadExt, AsyncSeekExt, BufReader, SeekFrom};
use tokio_util::io::InspectReader; use tokio_util::io::InspectReader;
@ -27,11 +29,11 @@ use tracing::{error, info, warn};
// DEFAULT_DIR_NAME is the default directory name to store content. // DEFAULT_DIR_NAME is the default directory name to store content.
const DEFAULT_DIR_NAME: &str = "content"; const DEFAULT_DIR_NAME: &str = "content";
// DEFAULT_BUFFER_SIZE is the buffer size to read and write, default is 32KB.
const DEFAULT_BUFFER_SIZE: usize = 32 * 1024;
// Content is the content of a piece. // Content is the content of a piece.
pub struct Content { pub struct Content {
// config is the configuration of the dfdaemon.
config: Arc<Config>,
// dir is the directory to store content. // dir is the directory to store content.
dir: PathBuf, dir: PathBuf,
} }
@ -48,12 +50,12 @@ pub struct WritePieceResponse {
// Content implements the content storage. // Content implements the content storage.
impl Content { impl Content {
// new returns a new content. // new returns a new content.
pub async fn new(dir: &Path) -> Result<Content> { pub async fn new(config: Arc<Config>, dir: &Path) -> Result<Content> {
let dir = dir.join(DEFAULT_DIR_NAME); let dir = dir.join(DEFAULT_DIR_NAME);
fs::create_dir_all(&dir).await?; fs::create_dir_all(&dir).await?;
info!("content initialized directory: {:?}", dir); info!("content initialized directory: {:?}", dir);
Ok(Content { dir }) Ok(Content { config, dir })
} }
// hard_link_or_copy_task hard links or copies the task content to the destination. // hard_link_or_copy_task hard links or copies the task content to the destination.
@ -144,7 +146,8 @@ impl Content {
let range_reader = from_f.take(range.length); let range_reader = from_f.take(range.length);
// Use a buffer to read the range. // Use a buffer to read the range.
let mut range_reader = BufReader::with_capacity(DEFAULT_BUFFER_SIZE, range_reader); let mut range_reader =
BufReader::with_capacity(self.config.storage.read_buffer_size, range_reader);
let mut to_f = OpenOptions::new() let mut to_f = OpenOptions::new()
.create(true) .create(true)
@ -240,7 +243,7 @@ impl Content {
let task_path = self.dir.join(task_id); let task_path = self.dir.join(task_id);
// Use a buffer to read the piece. // Use a buffer to read the piece.
let reader = BufReader::with_capacity(DEFAULT_BUFFER_SIZE, reader); let reader = BufReader::with_capacity(self.config.storage.write_buffer_size, reader);
// Sha256 is used to calculate the hash of the piece. // Sha256 is used to calculate the hash of the piece.
let mut hasher = Sha256::new(); let mut hasher = Sha256::new();

View File

@ -48,7 +48,7 @@ impl Storage {
// new returns a new storage. // new returns a new storage.
pub async fn new(config: Arc<Config>, dir: &Path) -> Result<Self> { pub async fn new(config: Arc<Config>, dir: &Path) -> Result<Self> {
let metadata = metadata::Metadata::new(dir)?; let metadata = metadata::Metadata::new(dir)?;
let content = content::Content::new(dir).await?; let content = content::Content::new(config.clone(), dir).await?;
Ok(Storage { Ok(Storage {
config, config,
metadata, metadata,

View File

@ -65,9 +65,6 @@ use tracing::{error, info, instrument, Span};
pub mod header; pub mod header;
// DEFAULT_BUFFER_SIZE is the buffer size to read and write, default is 32KB.
const DEFAULT_BUFFER_SIZE: usize = 32 * 1024;
// Response is the response of the proxy server. // Response is the response of the proxy server.
pub type Response = hyper::Response<BoxBody<Bytes, ClientError>>; pub type Response = hyper::Response<BoxBody<Bytes, ClientError>>;
@ -474,7 +471,7 @@ async fn proxy_by_dfdaemon(
}; };
// Make the download task request. // Make the download task request.
let download_task_request = match make_download_task_request(config, rule, request) { let download_task_request = match make_download_task_request(config.clone(), rule, request) {
Ok(download_task_request) => download_task_request, Ok(download_task_request) => download_task_request,
Err(err) => { Err(err) => {
error!("make download task request failed: {}", err); error!("make download task request failed: {}", err);
@ -562,6 +559,9 @@ async fn proxy_by_dfdaemon(
*response.headers_mut() = make_response_headers(download_task_started_response.clone())?; *response.headers_mut() = make_response_headers(download_task_started_response.clone())?;
*response.status_mut() = http::StatusCode::OK; *response.status_mut() = http::StatusCode::OK;
// Get the read buffer size from the config.
let read_buffer_size = config.proxy.read_buffer_size;
// Write task data to pipe. If grpc received error message, // Write task data to pipe. If grpc received error message,
// shutdown the writer. // shutdown the writer.
tokio::spawn(async move { tokio::spawn(async move {
@ -616,8 +616,9 @@ async fn proxy_by_dfdaemon(
return; return;
} }
}; };
// Use a buffer to read the piece. // Use a buffer to read the piece.
let piece_reader = BufReader::with_capacity(DEFAULT_BUFFER_SIZE, piece_reader); let piece_reader = BufReader::with_capacity(read_buffer_size, piece_reader);
// Write the piece data to the pipe in order. // Write the piece data to the pipe in order.
finished_piece_readers.insert(piece.number, piece_reader); finished_piece_readers.insert(piece.number, piece_reader);