From c5d47519327c70fbe4e8b58390a3261cd7aed2d9 Mon Sep 17 00:00:00 2001 From: Gaius Date: Thu, 11 Jan 2024 18:16:56 +0800 Subject: [PATCH] feat: add rate limit to download server and upload server (#212) Signed-off-by: Gaius --- Cargo.lock | 8 +++--- Cargo.toml | 2 +- src/bin/dfget/main.rs | 8 ------ src/config/dfdaemon.rs | 32 +++++++++++----------- src/grpc/dfdaemon_upload.rs | 20 +++----------- src/task/mod.rs | 10 +++++-- src/task/piece.rs | 53 +++++++++++++++++++++++++++++++++++-- src/task/piece_collector.rs | 21 ++++++++------- 8 files changed, 96 insertions(+), 58 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0b6658fe..ad039aae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -523,9 +523,9 @@ dependencies = [ [[package]] name = "dragonfly-api" -version = "2.0.75" +version = "2.0.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "caed4f4abde1457d99274ad0c6da115740645271db8ca115c8c02dfd4649cf39" +checksum = "358cffc9a9b0b2f0e08496b9c6e3fd044a916d1a3139760a27fcf60120deba3b" dependencies = [ "prost 0.11.9", "prost-types 0.12.3", @@ -838,9 +838,9 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "h2" -version = "0.3.22" +version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d6250322ef6e60f93f9a2162799302cd6f68f79f6e5d85c8c16f14d1d958178" +checksum = "b553656127a00601c8ae5590fcfdc118e4083a7924b6cf4ffc1ea4b99dc429d7" dependencies = [ "bytes", "fnv", diff --git a/Cargo.toml b/Cargo.toml index a5b14992..aa26c8cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,7 +55,7 @@ local-ip-address = "0.5.3" rocksdb = "0.21.0" num_cpus = "1.0" chrono = { version = "0.4.26", features = ["serde"] } -dragonfly-api = "2.0.75" +dragonfly-api = "2.0.78" sysinfo = "0.29.6" sha2 = "0.10" hex = "0.4" diff --git a/src/bin/dfget/main.rs b/src/bin/dfget/main.rs index 98909fdd..4810f4f8 100644 --- a/src/bin/dfget/main.rs +++ b/src/bin/dfget/main.rs @@ -83,13 +83,6 @@ struct Args { )] piece_length: u64, - #[arg( - long = "download-rate-limit", - default_value_t = 2147483648, - help = "Specify the rate limit of the downloading in bytes per second" - )] - download_rate_limit: u64, - #[arg( short = 'd', long = "digest", @@ -254,7 +247,6 @@ async fn main() -> Result<(), anyhow::Error> { piece_length: args.piece_length, output_path: args.output.into_os_string().into_string().unwrap(), timeout: Some(prost_wkt_types::Duration::try_from(args.timeout)?), - download_rate_limit: Some(args.download_rate_limit), need_back_to_source: false, }), }) diff --git a/src/config/dfdaemon.rs b/src/config/dfdaemon.rs index 58ef3687..ea4acb9e 100644 --- a/src/config/dfdaemon.rs +++ b/src/config/dfdaemon.rs @@ -72,11 +72,11 @@ fn default_upload_grpc_server_port() -> u16 { 4000 } -// default_upload_bandwidth is the default upload speed in bps(bytes per second). +// default_upload_rate_limit is the default rate limit of the upload speed in bps(bytes per second). #[inline] -fn default_upload_bandwidth() -> u64 { - // Default bandwidth is 8.192Gbps. - 8_192_000_000 +fn default_upload_rate_limit() -> u64 { + // Default rate limit is 10Gbps. + 10_000_000_000 } // default_metrics_server_port is the default port of the metrics server. @@ -85,11 +85,11 @@ fn default_metrics_server_port() -> u16 { 4001 } -// default_download_bandwidth is the default download speed in bps(bytes per second). +// default_download_rate_limit is the default rate limit of the download speed in bps(bytes per second). #[inline] -fn default_download_bandwidth() -> u64 { - // Default bandwidth is 8.192Gbps. - 8_192_000_000 +fn default_download_rate_limit() -> u64 { + // Default rate limit is 10Gbps. + 10_000_000_000 } // default_download_piece_timeout is the default timeout for downloading a piece from source. @@ -248,9 +248,9 @@ pub struct Download { // server is the download server configuration for dfdaemon. pub server: DownloadServer, - // bandwidth is the download speed in bps(bytes per second). - #[serde(default = "default_download_bandwidth")] - pub bandwidth: u64, + // rate_limit is the rate limit of the download speed in bps(bytes per second). + #[serde(default = "default_download_rate_limit")] + pub rate_limit: u64, // piece_timeout is the timeout for downloading a piece from source. #[serde(default = "default_download_piece_timeout", with = "humantime_serde")] @@ -267,7 +267,7 @@ impl Default for Download { fn default() -> Self { Download { server: DownloadServer::default(), - bandwidth: default_download_bandwidth(), + rate_limit: default_download_rate_limit(), piece_timeout: default_download_piece_timeout(), concurrent_piece_count: default_download_concurrent_piece_count(), } @@ -303,9 +303,9 @@ pub struct Upload { // server is the upload server configuration for dfdaemon. pub server: UploadServer, - // bandwidth is the upload speed in bps(bytes per second). - #[serde(default = "default_upload_bandwidth")] - pub bandwidth: u64, + // rate_limit is the rate limit of the upload speed in bps(bytes per second). + #[serde(default = "default_upload_rate_limit")] + pub rate_limit: u64, } // Upload implements Default. @@ -313,7 +313,7 @@ impl Default for Upload { fn default() -> Self { Upload { server: UploadServer::default(), - bandwidth: default_download_bandwidth(), + rate_limit: default_upload_rate_limit(), } } } diff --git a/src/grpc/dfdaemon_upload.rs b/src/grpc/dfdaemon_upload.rs index a990f630..38780b40 100644 --- a/src/grpc/dfdaemon_upload.rs +++ b/src/grpc/dfdaemon_upload.rs @@ -27,7 +27,6 @@ use dragonfly_api::dfdaemon::v2::{ DownloadPieceRequest, DownloadPieceResponse, DownloadTaskRequest, SyncPiecesRequest, SyncPiecesResponse, TriggerDownloadTaskRequest, }; -use leaky_bucket::RateLimiter; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; @@ -69,13 +68,6 @@ impl DfdaemonUploadServer { let service = DfdaemonUploadGRPCServer::new(DfdaemonUploadServerHandler { config: config.clone(), task, - rate_limiter: Arc::new( - RateLimiter::builder() - .initial(config.upload.bandwidth as usize) - .refill(config.upload.bandwidth as usize) - .interval(Duration::from_secs(1)) - .build(), - ), }) .max_decoding_message_size(usize::MAX); @@ -130,9 +122,6 @@ pub struct DfdaemonUploadServerHandler { // task is the task manager. task: Arc, - - // rate_limiter is the rate limiter of the upload speed. - rate_limiter: Arc, } // DfdaemonUploadServerHandler implements the dfdaemon upload grpc service. @@ -196,7 +185,9 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { if piece.is_finished() { out_stream_tx .send(Ok(SyncPiecesResponse { - piece_number: piece.number, + number: piece.number, + offset: piece.offset, + length: piece.length, })) .await .unwrap_or_else(|err| { @@ -280,14 +271,11 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { Status::not_found("piece metadata not found") })?; - // Acquire by the upload bandwidth. - self.rate_limiter.acquire(piece.length as usize).await; - // Get the piece content from the local storage. let mut reader = self .task .piece - .download_from_local_peer_into_async_read(task_id.as_str(), piece_number) + .upload_from_local_peer_into_async_read(task_id.as_str(), piece_number, piece.length) .await .map_err(|err| { error!("read piece content from local storage: {}", err); diff --git a/src/task/mod.rs b/src/task/mod.rs index 3b38016a..7b9df611 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -722,6 +722,7 @@ impl Task { async fn download_from_remote_peer( task_id: String, number: u32, + length: u64, parent: Peer, piece: Arc, storage: Arc, @@ -742,7 +743,7 @@ impl Task { })?; let metadata = piece - .download_from_remote_peer(task_id.as_str(), number, parent.clone()) + .download_from_remote_peer(task_id.as_str(), number, length, parent.clone()) .await .map_err(|err| { error!( @@ -764,6 +765,7 @@ impl Task { download_from_remote_peer( task_id.to_string(), collect_piece.number, + collect_piece.length, collect_piece.parent.clone(), self.piece.clone(), self.storage.clone(), @@ -1156,7 +1158,11 @@ impl Task { for interested_piece in interested_pieces { let mut reader = match self .piece - .download_from_local_peer_into_async_read(task_id, interested_piece.number) + .download_from_local_peer_into_async_read( + task_id, + interested_piece.number, + interested_piece.length, + ) .await { Ok(reader) => reader, diff --git a/src/task/piece.rs b/src/task/piece.rs index ab2f2e54..be160a93 100644 --- a/src/task/piece.rs +++ b/src/task/piece.rs @@ -23,9 +23,11 @@ use crate::{Error, HTTPError, Result}; use chrono::Utc; use dragonfly_api::common::v2::{Peer, Range}; use dragonfly_api::dfdaemon::v2::DownloadPieceRequest; +use leaky_bucket::RateLimiter; use reqwest::header::{self, HeaderMap}; use sha2::{Digest, Sha256}; use std::sync::Arc; +use std::time::Duration; use tokio::{ fs, io::{self, AsyncRead, AsyncReadExt, AsyncSeekExt, SeekFrom}, @@ -52,6 +54,12 @@ pub struct Piece { // http_client is the http client. http_client: Arc, + + // download_rate_limiter is the rate limiter of the download speed in bps(bytes per second). + download_rate_limiter: Arc, + + // upload_rate_limiter is the rate limiter of the upload speed in bps(bytes per second). + upload_rate_limiter: Arc, } // Piece implements the piece manager. @@ -59,9 +67,24 @@ impl Piece { // new returns a new Piece. pub fn new(config: Arc, storage: Arc, http_client: Arc) -> Self { Self { - config, + config: config.clone(), storage, http_client, + download_rate_limiter: Arc::new( + RateLimiter::builder() + .initial(config.download.rate_limit as usize) + .refill(config.download.rate_limit as usize) + .interval(Duration::from_secs(1)) + .fair(false) + .build(), + ), + upload_rate_limiter: Arc::new( + RateLimiter::builder() + .initial(config.upload.rate_limit as usize) + .refill(config.upload.rate_limit as usize) + .interval(Duration::from_secs(1)) + .build(), + ), } } @@ -229,12 +252,30 @@ impl Piece { .collect::>() } + // upload_from_local_peer_into_async_read uploads a single piece from a local peer. + pub async fn upload_from_local_peer_into_async_read( + &self, + task_id: &str, + number: u32, + length: u64, + ) -> Result { + // Acquire the upload rate limiter. + self.upload_rate_limiter.acquire(length as usize).await; + + // Upload the piece content. + self.storage.upload_piece(task_id, number).await + } + // download_from_local_peer_into_async_read downloads a single piece from a local peer. pub async fn download_from_local_peer_into_async_read( &self, task_id: &str, number: u32, + length: u64, ) -> Result { + // Acquire the download rate limiter. + self.download_rate_limiter.acquire(length as usize).await; + // Upload the piece content. self.storage.upload_piece(task_id, number).await } @@ -244,8 +285,12 @@ impl Piece { &self, task_id: &str, number: u32, + length: u64, parent: Peer, ) -> Result { + // Acquire the download rate limiter. + self.download_rate_limiter.acquire(length as usize).await; + // Record the start of downloading piece. self.storage.download_piece_started(task_id, number).await?; @@ -335,10 +380,11 @@ impl Piece { &self, task_id: &str, number: u32, + length: u64, parent: Peer, ) -> Result { // Download the piece from the remote peer. - self.download_from_remote_peer(task_id, number, parent) + self.download_from_remote_peer(task_id, number, length, parent) .await?; // Return reader of the piece. @@ -356,6 +402,9 @@ impl Piece { length: u64, header: HeaderMap, ) -> Result { + // Acquire the download rate limiter. + self.download_rate_limiter.acquire(length as usize).await; + // Record the start of downloading piece. self.storage.download_piece_started(task_id, number).await?; diff --git a/src/task/piece_collector.rs b/src/task/piece_collector.rs index 6e5dc521..97b23b5b 100644 --- a/src/task/piece_collector.rs +++ b/src/task/piece_collector.rs @@ -33,6 +33,9 @@ pub struct CollectedPiece { // number is the piece number. pub number: u32, + // length is the piece length. + pub length: u64, + // parent is the parent peer. pub parent: Peer, } @@ -168,20 +171,19 @@ impl PieceCollector { while let Some(message) = out_stream.try_next().await? { let message = message?; - collected_pieces - .entry(message.piece_number) - .and_modify(|peers| { - peers.insert(parent.id.clone()); - }); + collected_pieces.entry(message.number).and_modify(|peers| { + peers.insert(parent.id.clone()); + }); info!( "received piece metadata {} from parent {}", - message.piece_number, parent.id + message.number, parent.id ); - match collected_pieces.get(&message.piece_number) { + match collected_pieces.get(&message.number) { Some(parent_ids) => { if let Some(parent_id) = parent_ids.iter().next() { - let number = message.piece_number; + let number = message.number; + let length = message.length; let parent = parents .iter() .find(|parent| parent.id == parent_id.as_str()) @@ -193,6 +195,7 @@ impl PieceCollector { collected_piece_tx .send(CollectedPiece { number, + length, parent: parent.clone(), }) .await?; @@ -202,7 +205,7 @@ impl PieceCollector { }; // Remove the piece from collected_pieces. - collected_pieces.remove(&message.piece_number); + collected_pieces.remove(&message.number); } Ok(parent)