feat: add concurrency limit for grpc server (#982)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2025-02-14 18:31:51 +08:00 committed by GitHub
parent 991ecd34ac
commit d0fa28a879
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 51 additions and 2 deletions

View File

@ -60,6 +60,12 @@ pub fn default_download_unix_socket_path() -> PathBuf {
crate::default_root_dir().join("dfdaemon.sock") crate::default_root_dir().join("dfdaemon.sock")
} }
/// default_download_request_rate_limit is the default rate limit of the download request in the
/// download grpc server, default is 4000 req/s.
pub fn default_download_request_rate_limit() -> u64 {
4000
}
/// default_parent_selector_sync_interval is the default interval to sync host information. /// default_parent_selector_sync_interval is the default interval to sync host information.
#[inline] #[inline]
fn default_parent_selector_sync_interval() -> Duration { fn default_parent_selector_sync_interval() -> Duration {
@ -96,6 +102,12 @@ fn default_upload_grpc_server_port() -> u16 {
4000 4000
} }
/// default_upload_request_rate_limit is the default rate limit of the upload request in the
/// upload grpc server, default is 4000 req/s.
pub fn default_upload_request_rate_limit() -> u64 {
4000
}
/// default_upload_rate_limit is the default rate limit of the upload speed in GiB/Mib/Kib per second. /// default_upload_rate_limit is the default rate limit of the upload speed in GiB/Mib/Kib per second.
#[inline] #[inline]
fn default_upload_rate_limit() -> ByteSize { fn default_upload_rate_limit() -> ByteSize {
@ -426,6 +438,11 @@ pub struct DownloadServer {
/// socket_path is the unix socket path for dfdaemon gRPC service. /// socket_path is the unix socket path for dfdaemon gRPC service.
#[serde(default = "default_download_unix_socket_path")] #[serde(default = "default_download_unix_socket_path")]
pub socket_path: PathBuf, pub socket_path: PathBuf,
/// request_rate_limit is the rate limit of the download request in the download grpc server,
/// default is 4000 req/s.
#[serde(default = "default_download_request_rate_limit")]
pub request_rate_limit: u64,
} }
/// DownloadServer implements Default. /// DownloadServer implements Default.
@ -433,6 +450,7 @@ impl Default for DownloadServer {
fn default() -> Self { fn default() -> Self {
DownloadServer { DownloadServer {
socket_path: default_download_unix_socket_path(), socket_path: default_download_unix_socket_path(),
request_rate_limit: default_download_request_rate_limit(),
} }
} }
} }
@ -496,6 +514,11 @@ pub struct UploadServer {
/// key is the server key path with PEM format for the upload server and it is used for /// key is the server key path with PEM format for the upload server and it is used for
/// mutual TLS. /// mutual TLS.
pub key: Option<PathBuf>, pub key: Option<PathBuf>,
/// request_rate_limit is the rate limit of the upload request in the upload grpc server,
/// default is 4000 req/s.
#[serde(default = "default_upload_request_rate_limit")]
pub request_rate_limit: u64,
} }
/// UploadServer implements Default. /// UploadServer implements Default.
@ -507,6 +530,7 @@ impl Default for UploadServer {
ca_cert: None, ca_cert: None,
cert: None, cert: None,
key: None, key: None,
request_rate_limit: default_upload_request_rate_limit(),
} }
} }
} }

View File

@ -79,7 +79,7 @@ prometheus = { version = "0.13", features = ["process"] }
tonic-health = "0.12.3" tonic-health = "0.12.3"
bytes = "1.10" bytes = "1.10"
sysinfo = "0.32.1" sysinfo = "0.32.1"
tower = "0.4.13" tower = { version = "0.4.13", features = ["limit", "load-shed", "buffer"] }
indicatif = "0.17.11" indicatif = "0.17.11"
dashmap = "6.1.0" dashmap = "6.1.0"
hashring = "0.3.6" hashring = "0.3.6"

View File

@ -292,6 +292,7 @@ async fn main() -> Result<(), anyhow::Error> {
// Initialize download grpc server. // Initialize download grpc server.
let mut dfdaemon_download_grpc = DfdaemonDownloadServer::new( let mut dfdaemon_download_grpc = DfdaemonDownloadServer::new(
config.clone(),
config.download.server.socket_path.clone(), config.download.server.socket_path.clone(),
task.clone(), task.clone(),
persistent_cache_task.clone(), persistent_cache_task.clone(),

View File

@ -36,6 +36,7 @@ use dragonfly_api::dfdaemon::v2::{
}; };
use dragonfly_api::errordetails::v2::Backend; use dragonfly_api::errordetails::v2::Backend;
use dragonfly_api::scheduler::v2::DeleteHostRequest as SchedulerDeleteHostRequest; use dragonfly_api::scheduler::v2::DeleteHostRequest as SchedulerDeleteHostRequest;
use dragonfly_client_config::dfdaemon::Config;
use dragonfly_client_core::{ use dragonfly_client_core::{
error::{ErrorType, OrErr}, error::{ErrorType, OrErr},
Error as ClientError, Result as ClientResult, Error as ClientError, Result as ClientResult,
@ -56,13 +57,16 @@ use tonic::{
transport::{Channel, Endpoint, Server, Uri}, transport::{Channel, Endpoint, Server, Uri},
Code, Request, Response, Status, Code, Request, Response, Status,
}; };
use tower::service_fn; use tower::{service_fn, ServiceBuilder};
use tracing::{error, info, instrument, Instrument, Span}; use tracing::{error, info, instrument, Instrument, Span};
use super::interceptor::TracingInterceptor; use super::interceptor::TracingInterceptor;
/// DfdaemonDownloadServer is the grpc unix server of the download. /// DfdaemonDownloadServer is the grpc unix server of the download.
pub struct DfdaemonDownloadServer { pub struct DfdaemonDownloadServer {
/// config is the configuration of the dfdaemon.
config: Arc<Config>,
/// socket_path is the path of the unix domain socket. /// socket_path is the path of the unix domain socket.
socket_path: PathBuf, socket_path: PathBuf,
@ -81,6 +85,7 @@ impl DfdaemonDownloadServer {
/// new creates a new DfdaemonServer. /// new creates a new DfdaemonServer.
#[instrument(skip_all)] #[instrument(skip_all)]
pub fn new( pub fn new(
config: Arc<Config>,
socket_path: PathBuf, socket_path: PathBuf,
task: Arc<task::Task>, task: Arc<task::Task>,
persistent_cache_task: Arc<persistent_cache_task::PersistentCacheTask>, persistent_cache_task: Arc<persistent_cache_task::PersistentCacheTask>,
@ -97,6 +102,7 @@ impl DfdaemonDownloadServer {
.max_encoding_message_size(usize::MAX); .max_encoding_message_size(usize::MAX);
Self { Self {
config,
socket_path, socket_path,
service, service,
shutdown, shutdown,
@ -130,16 +136,25 @@ impl DfdaemonDownloadServer {
fs::remove_file(&self.socket_path).await?; fs::remove_file(&self.socket_path).await?;
} }
// Bind the unix domain socket and set the permissions for the socket.
let uds = UnixListener::bind(&self.socket_path)?; let uds = UnixListener::bind(&self.socket_path)?;
let perms = std::fs::Permissions::from_mode(0o660); let perms = std::fs::Permissions::from_mode(0o660);
fs::set_permissions(&self.socket_path, perms).await?; fs::set_permissions(&self.socket_path, perms).await?;
// TODO(Gaius): RateLimitLayer is not implemented Clone, so we can't use it here.
// Only use the LoadShed layer and the ConcurrencyLimit layer.
let layer = ServiceBuilder::new()
.concurrency_limit(self.config.download.server.request_rate_limit as usize)
.load_shed()
.into_inner();
let uds_stream = UnixListenerStream::new(uds); let uds_stream = UnixListenerStream::new(uds);
let server = Server::builder() let server = Server::builder()
.max_frame_size(super::MAX_FRAME_SIZE) .max_frame_size(super::MAX_FRAME_SIZE)
.tcp_keepalive(Some(super::TCP_KEEPALIVE)) .tcp_keepalive(Some(super::TCP_KEEPALIVE))
.http2_keepalive_interval(Some(super::HTTP2_KEEP_ALIVE_INTERVAL)) .http2_keepalive_interval(Some(super::HTTP2_KEEP_ALIVE_INTERVAL))
.http2_keepalive_timeout(Some(super::HTTP2_KEEP_ALIVE_TIMEOUT)) .http2_keepalive_timeout(Some(super::HTTP2_KEEP_ALIVE_TIMEOUT))
.layer(layer)
.add_service(reflection.clone()) .add_service(reflection.clone())
.add_service(health_service) .add_service(health_service)
.add_service(self.service.clone()) .add_service(self.service.clone())

View File

@ -55,6 +55,7 @@ use tonic::{
transport::{Channel, Server}, transport::{Channel, Server},
Code, Request, Response, Status, Code, Request, Response, Status,
}; };
use tower::ServiceBuilder;
use tracing::{error, info, instrument, Instrument, Span}; use tracing::{error, info, instrument, Instrument, Span};
use url::Url; use url::Url;
@ -127,6 +128,13 @@ impl DfdaemonUploadServer {
.set_serving::<DfdaemonUploadGRPCServer<DfdaemonUploadServerHandler>>() .set_serving::<DfdaemonUploadGRPCServer<DfdaemonUploadServerHandler>>()
.await; .await;
// TODO(Gaius): RateLimitLayer is not implemented Clone, so we can't use it here.
// Only use the LoadShed layer and the ConcurrencyLimit layer.
let layer = ServiceBuilder::new()
.concurrency_limit(self.config.upload.server.request_rate_limit as usize)
.load_shed()
.into_inner();
// Start upload grpc server. // Start upload grpc server.
let mut server_builder = Server::builder(); let mut server_builder = Server::builder();
if let Ok(Some(server_tls_config)) = if let Ok(Some(server_tls_config)) =
@ -143,6 +151,7 @@ impl DfdaemonUploadServer {
.tcp_keepalive(Some(super::TCP_KEEPALIVE)) .tcp_keepalive(Some(super::TCP_KEEPALIVE))
.http2_keepalive_interval(Some(super::HTTP2_KEEP_ALIVE_INTERVAL)) .http2_keepalive_interval(Some(super::HTTP2_KEEP_ALIVE_INTERVAL))
.http2_keepalive_timeout(Some(super::HTTP2_KEEP_ALIVE_TIMEOUT)) .http2_keepalive_timeout(Some(super::HTTP2_KEEP_ALIVE_TIMEOUT))
.layer(layer)
.add_service(reflection.clone()) .add_service(reflection.clone())
.add_service(health_service) .add_service(health_service)
.add_service(self.service.clone()) .add_service(self.service.clone())