feat: collect pieces from multiple parents with a load-balancing strategy (#1173)

* feat: collect pieces from multiple parents with a load-balancing strategy

Signed-off-by: Gaius <gaius.qi@gmail.com>

* feat: update

Signed-off-by: Gaius <gaius.qi@gmail.com>

* feat: update

Signed-off-by: Gaius <gaius.qi@gmail.com>

* feat: update

Signed-off-by: Gaius <gaius.qi@gmail.com>

---------

Signed-off-by: Gaius <gaius.qi@gmail.com>
Gaius 2025-06-05 19:19:23 +08:00 committed by GitHub
parent 7f5b517f37
commit d899d9982f
6 changed files with 152 additions and 81 deletions
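
For orientation, here is a minimal standalone sketch (not the Dragonfly implementation itself; the names are made up) of the collection pattern this change introduces, assuming the dashmap 6.x and fastrand 2.x APIs: every parent that announces a piece is appended to a per-piece candidate list, the collector waits briefly so other parents can announce the same piece, and then one candidate is removed and picked at random.

use std::sync::Arc;
use std::time::Duration;

use dashmap::DashMap;

#[derive(Clone, Debug)]
struct Parent {
    id: String,
}

#[tokio::main]
async fn main() {
    // Piece number -> parents that have announced the piece so far.
    let collected: Arc<DashMap<u32, Vec<Parent>>> = Arc::new(DashMap::new());
    collected.insert(1, Vec::new());

    // A parent announces piece 1; append it to the candidate list.
    if let Some(mut parents) = collected.get_mut(&1) {
        parents.push(Parent { id: "parent-a".to_string() });
    }

    // Wait briefly so other parents can announce the same piece.
    tokio::time::sleep(Duration::from_millis(5)).await;

    // Remove the entry and pick one candidate at random.
    if let Some((_, parents)) = collected.remove(&1) {
        if !parents.is_empty() {
            let parent = &parents[fastrand::usize(..parents.len())];
            println!("piece 1 will be fetched from {}", parent.id);
        }
    }
}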

Cargo.lock (generated, 36 changed lines)

@@ -824,6 +824,20 @@ dependencies = [
  "typenum",
 ]
 
+[[package]]
+name = "dashmap"
+version = "6.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
+dependencies = [
+ "cfg-if",
+ "crossbeam-utils",
+ "hashbrown 0.14.3",
+ "lock_api",
+ "once_cell",
+ "parking_lot_core 0.9.9",
+]
+
 [[package]]
 name = "data-encoding"
 version = "2.5.0"
@@ -940,19 +954,21 @@ dependencies = [
 [[package]]
 name = "dragonfly-client"
-version = "0.2.33"
+version = "0.2.34"
 dependencies = [
  "anyhow",
  "bytes",
  "bytesize",
  "chrono",
  "clap",
+ "dashmap",
  "dragonfly-api",
  "dragonfly-client-backend",
  "dragonfly-client-config",
  "dragonfly-client-core",
  "dragonfly-client-storage",
  "dragonfly-client-util",
+ "fastrand",
  "fs2",
  "fslock",
  "futures",
@@ -1010,7 +1026,7 @@ dependencies = [
 [[package]]
 name = "dragonfly-client-backend"
-version = "0.2.33"
+version = "0.2.34"
 dependencies = [
  "dragonfly-api",
  "dragonfly-client-core",
@@ -1041,7 +1057,7 @@ dependencies = [
 [[package]]
 name = "dragonfly-client-config"
-version = "0.2.33"
+version = "0.2.34"
 dependencies = [
  "bytesize",
  "bytesize-serde",
@@ -1069,7 +1085,7 @@ dependencies = [
 [[package]]
 name = "dragonfly-client-core"
-version = "0.2.33"
+version = "0.2.34"
 dependencies = [
  "headers 0.4.0",
  "hyper 1.6.0",
@@ -1087,7 +1103,7 @@ dependencies = [
 [[package]]
 name = "dragonfly-client-init"
-version = "0.2.33"
+version = "0.2.34"
 dependencies = [
  "anyhow",
  "clap",
@@ -1105,7 +1121,7 @@ dependencies = [
 [[package]]
 name = "dragonfly-client-storage"
-version = "0.2.33"
+version = "0.2.34"
 dependencies = [
  "bincode",
  "bytes",
@@ -1133,7 +1149,7 @@ dependencies = [
 [[package]]
 name = "dragonfly-client-util"
-version = "0.2.33"
+version = "0.2.34"
 dependencies = [
  "base64 0.22.1",
  "bytesize",
@@ -1230,9 +1246,9 @@ dependencies = [
 [[package]]
 name = "fastrand"
-version = "2.1.1"
+version = "2.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6"
+checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
 
 [[package]]
 name = "findshlibs"
@@ -1548,7 +1564,7 @@ dependencies = [
 [[package]]
 name = "hdfs"
-version = "0.2.33"
+version = "0.2.34"
 dependencies = [
  "dragonfly-client-backend",
  "dragonfly-client-core",

Cargo.toml (workspace)

@@ -12,7 +12,7 @@ members = [
 ]
 
 [workspace.package]
-version = "0.2.33"
+version = "0.2.34"
 authors = ["The Dragonfly Developers"]
 homepage = "https://d7y.io/"
 repository = "https://github.com/dragonflyoss/client.git"
@@ -22,13 +22,13 @@ readme = "README.md"
 edition = "2021"
 
 [workspace.dependencies]
-dragonfly-client = { path = "dragonfly-client", version = "0.2.33" }
-dragonfly-client-core = { path = "dragonfly-client-core", version = "0.2.33" }
-dragonfly-client-config = { path = "dragonfly-client-config", version = "0.2.33" }
-dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.2.33" }
-dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.33" }
-dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.33" }
-dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.33" }
+dragonfly-client = { path = "dragonfly-client", version = "0.2.34" }
+dragonfly-client-core = { path = "dragonfly-client-core", version = "0.2.34" }
+dragonfly-client-config = { path = "dragonfly-client-config", version = "0.2.34" }
+dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.2.34" }
+dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.34" }
+dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.34" }
+dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.34" }
 dragonfly-api = "=2.1.39"
 thiserror = "2.0"
 futures = "0.3.31"

dragonfly-client/Cargo.toml

@@ -84,6 +84,8 @@ http-body-util = "0.1.3"
 termion = "4.0.5"
 tabled = "0.19.0"
 path-absolutize = "3.1.1"
+dashmap = "6.1.0"
+fastrand = "2.3.0"
 
 [dev-dependencies]
 tempfile.workspace = true

dragonfly-client/src/grpc/dfdaemon_download.rs

@@ -133,11 +133,6 @@ impl DfdaemonDownloadServer {
         // Initialize health reporter.
         let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
 
-        // Set the serving status of the download grpc server.
-        health_reporter
-            .set_serving::<DfdaemonDownloadGRPCServer<DfdaemonDownloadServerHandler>>()
-            .await;
-
         // Start download grpc server with unix domain socket.
         fs::create_dir_all(self.socket_path.parent().unwrap()).await?;
         fs::remove_file(self.socket_path.clone())
@@ -175,6 +170,12 @@ impl DfdaemonDownloadServer {
             // Notify the download grpc server is started.
             _ = grpc_server_started_barrier.wait() => {
                 info!("download server is ready to start");
+
+                health_reporter
+                    .set_serving::<DfdaemonDownloadGRPCServer<DfdaemonDownloadServerHandler>>()
+                    .await;
+
+                info!("download server's health status set to serving");
             }
             // Wait for shutdown signal.
             _ = shutdown.recv() => {
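
The change above only flips the health status to serving after the startup barrier is reached, instead of before the server is even spawned. A simplified, self-contained model of that ordering (a tokio watch channel stands in for the tonic_health reporter; the names are made up) might look like this:

use std::sync::Arc;

use tokio::sync::{watch, Barrier};

#[tokio::main]
async fn main() {
    // `false` = NOT_SERVING, `true` = SERVING (stand-in for the health reporter).
    let (health_tx, health_rx) = watch::channel(false);
    let started = Arc::new(Barrier::new(2));

    // Simulated gRPC server task: signals the barrier once it is listening.
    let server_started = started.clone();
    let server = tokio::spawn(async move {
        // ... bind the socket, register services ...
        server_started.wait().await;
        // ... serve requests ...
    });

    // Only mark the service as serving once the server reports that it started.
    started.wait().await;
    health_tx.send(true).ok();
    println!("health status set to serving: {}", *health_rx.borrow());

    server.await.unwrap();
}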

dragonfly-client/src/grpc/dfdaemon_upload.rs

@@ -143,11 +143,6 @@ impl DfdaemonUploadServer {
         // Initialize health reporter.
         let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
 
-        // Set the serving status of the upload grpc server.
-        health_reporter
-            .set_serving::<DfdaemonUploadGRPCServer<DfdaemonUploadServerHandler>>()
-            .await;
-
         // TODO(Gaius): RateLimitLayer is not implemented Clone, so we can't use it here.
         // Only use the LoadShed layer and the ConcurrencyLimit layer.
         let rate_limit_layer = ServiceBuilder::new()
@@ -180,7 +175,13 @@ impl DfdaemonUploadServer {
         tokio::select! {
             // Notify the upload grpc server is started.
             _ = grpc_server_started_barrier.wait() => {
-                info!("upload server is ready");
+                info!("upload server is ready to start");
+
+                health_reporter
+                    .set_serving::<DfdaemonUploadGRPCServer<DfdaemonUploadServerHandler>>()
+                    .await;
+
+                info!("upload server's health status set to serving");
             }
             // Wait for shutdown signal.
             _ = shutdown.recv() => {

piece_collector.rs (dragonfly-client)

@@ -15,20 +15,21 @@
  */
 
 use crate::grpc::dfdaemon_upload::DfdaemonUploadClient;
+use dashmap::DashMap;
 use dragonfly_api::common::v2::Host;
 use dragonfly_api::dfdaemon::v2::{SyncPersistentCachePiecesRequest, SyncPiecesRequest};
 use dragonfly_client_config::dfdaemon::Config;
 use dragonfly_client_core::{Error, Result};
 use dragonfly_client_storage::metadata;
-use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::mpsc::{self, Receiver, Sender};
-use tokio::sync::Mutex;
 use tokio::task::JoinSet;
 use tokio_stream::StreamExt;
 use tracing::{error, info, instrument, Instrument};
 
+const DEFAULT_WAIT_FOR_PIECE_FROM_DIFFERENT_PARENTS: Duration = Duration::from_millis(5);
+
 /// CollectedParent is the parent peer collected from the parent.
 #[derive(Clone, Debug)]
 pub struct CollectedParent {
@@ -68,8 +69,8 @@ pub struct PieceCollector {
     /// interested_pieces is the pieces interested by the collector.
     interested_pieces: Vec<metadata::Piece>,
 
-    /// collected_pieces is the pieces collected from peers.
-    collected_pieces: Arc<Mutex<HashMap<u32, String>>>,
+    /// collected_pieces is a map to store the collected pieces from different parents.
+    collected_pieces: Arc<DashMap<u32, Vec<CollectedParent>>>,
 }
 
 /// PieceCollector is used to collect pieces from peers.
@@ -82,14 +83,10 @@ impl PieceCollector {
         interested_pieces: Vec<metadata::Piece>,
         parents: Vec<CollectedParent>,
     ) -> Self {
-        let collected_pieces =
-            Arc::new(Mutex::new(HashMap::with_capacity(interested_pieces.len())));
-        let mut collected_pieces_guard = collected_pieces.lock().await;
+        let collected_pieces = Arc::new(DashMap::with_capacity(interested_pieces.len()));
         for interested_piece in &interested_pieces {
-            collected_pieces_guard.insert(interested_piece.number, String::new());
+            collected_pieces.insert(interested_piece.number, Vec::new());
         }
-        drop(collected_pieces_guard);
 
         Self {
             config,
@@ -135,7 +132,25 @@ impl PieceCollector {
         collected_piece_rx
     }
 
-    /// collect_from_parents collects pieces from parents.
+    /// collect_from_parents collects pieces from multiple parents with load balancing strategy.
+    ///
+    /// The collection process works in two phases:
+    /// 1. **Synchronization Phase**: Waits for a configured duration (DEFAULT_WAIT_FOR_PIECE_FROM_DIFFERENT_PARENTS)
+    ///    to collect the same piece information from different parents. This allows the collector
+    ///    to gather multiple sources for each piece.
+    ///
+    /// 2. **Selection Phase**: After the wait period, randomly selects one parent from the available
+    ///    candidates for each piece and forwards it to the piece downloader.
+    ///
+    /// **Load Balancing Strategy**:
+    /// The random parent selection is designed to distribute download load across multiple parents
+    /// during concurrent piece downloads. This approach ensures:
+    /// - Optimal utilization of bandwidth from multiple parent nodes
+    /// - Prevention of overwhelming any single parent with too many requests
+    /// - Better overall download performance through parallel connections
+    ///
+    /// This strategy is particularly effective when downloading multiple pieces simultaneously,
+    /// as it naturally spreads the workload across the available parent pool.
     #[allow(clippy::too_many_arguments)]
     #[instrument(skip_all)]
     async fn collect_from_parents(
@@ -144,7 +159,7 @@ impl PieceCollector {
         task_id: &str,
         parents: Vec<CollectedParent>,
         interested_pieces: Vec<metadata::Piece>,
-        collected_pieces: Arc<Mutex<HashMap<u32, String>>>,
+        collected_pieces: Arc<DashMap<u32, Vec<CollectedParent>>>,
         collected_piece_tx: Sender<CollectedPiece>,
         collected_piece_timeout: Duration,
     ) -> Result<()> {
@@ -158,7 +173,7 @@ impl PieceCollector {
             task_id: String,
             parent: CollectedParent,
             interested_pieces: Vec<metadata::Piece>,
-            collected_pieces: Arc<Mutex<HashMap<u32, String>>>,
+            collected_pieces: Arc<DashMap<u32, Vec<CollectedParent>>>,
             collected_piece_tx: Sender<CollectedPiece>,
             collected_piece_timeout: Duration,
         ) -> Result<CollectedParent> {
@@ -206,18 +221,33 @@ impl PieceCollector {
                 error!("sync pieces from parent {} failed: {}", parent.id, err);
             })? {
                 let message = message?;
 
-                // Remove the piece from collected_pieces, avoid to collect the same piece from
-                // different parents.
-                {
-                    let mut collected_pieces_guard = collected_pieces.lock().await;
-                    if collected_pieces_guard.remove(&message.number).is_none() {
-                        continue;
-                    }
-                }
+                if let Some(mut parents) = collected_pieces.get_mut(&message.number) {
+                    parents.push(parent.clone());
+                } else {
+                    continue;
+                }
+
+                // Wait for collecting the piece from different parents when the first
+                // piece is collected.
+                tokio::time::sleep(DEFAULT_WAIT_FOR_PIECE_FROM_DIFFERENT_PARENTS).await;
+
+                let parents = match collected_pieces.remove(&message.number) {
+                    Some((_, parents)) => parents,
+                    None => continue,
+                };
+
+                let parent = match parents.get(fastrand::usize(..parents.len())) {
+                    Some(parent) => parent,
+                    None => {
+                        error!(
+                            "collected_pieces does not contain parent for piece {}",
+                            message.number
+                        );
+                        continue;
+                    }
+                };
 
                 info!(
-                    "received piece {}-{} metadata from parent {}",
+                    "picked up piece {}-{} metadata from parent {}",
                     task_id, message.number, parent.id
                 );
@@ -258,11 +288,7 @@ impl PieceCollector {
                     info!("peer {} sync pieces finished", peer.id);
 
                     // If all pieces are collected, abort all tasks.
-                    let collected_pieces_guard = collected_pieces.lock().await;
-                    let is_empty = collected_pieces_guard.is_empty();
-                    drop(collected_pieces_guard);
-
-                    if is_empty {
+                    if collected_pieces.is_empty() {
                         info!("all pieces are collected, abort all tasks");
                         join_set.abort_all();
                     }
@@ -297,8 +323,8 @@ pub struct PersistentCachePieceCollector {
     /// interested_pieces is the pieces interested by the collector.
     interested_pieces: Vec<metadata::Piece>,
 
-    /// collected_pieces is the pieces collected from peers.
-    collected_pieces: Arc<Mutex<HashMap<u32, String>>>,
+    /// collected_pieces is a map to store the collected pieces from different parents.
+    collected_pieces: Arc<DashMap<u32, Vec<CollectedParent>>>,
 }
 
 /// PersistentCachePieceCollector is used to collect persistent cache pieces from peers.
@@ -311,14 +337,10 @@ impl PersistentCachePieceCollector {
         interested_pieces: Vec<metadata::Piece>,
         parents: Vec<CollectedParent>,
     ) -> Self {
-        let collected_pieces =
-            Arc::new(Mutex::new(HashMap::with_capacity(interested_pieces.len())));
-        let mut collected_pieces_guard = collected_pieces.lock().await;
+        let collected_pieces = Arc::new(DashMap::with_capacity(interested_pieces.len()));
        for interested_piece in &interested_pieces {
-            collected_pieces_guard.insert(interested_piece.number, String::new());
+            collected_pieces.insert(interested_piece.number, Vec::new());
         }
-        drop(collected_pieces_guard);
 
         Self {
             config,
@@ -364,7 +386,25 @@ impl PersistentCachePieceCollector {
         collected_piece_rx
     }
 
-    /// collect_from_parents collects pieces from parents.
+    /// collect_from_parents collects pieces from multiple parents with load balancing strategy.
+    ///
+    /// The collection process works in two phases:
+    /// 1. **Synchronization Phase**: Waits for a configured duration (DEFAULT_WAIT_FOR_PIECE_FROM_DIFFERENT_PARENTS)
+    ///    to collect the same piece information from different parents. This allows the collector
+    ///    to gather multiple sources for each piece.
+    ///
+    /// 2. **Selection Phase**: After the wait period, randomly selects one parent from the available
+    ///    candidates for each piece and forwards it to the piece downloader.
+    ///
+    /// **Load Balancing Strategy**:
+    /// The random parent selection is designed to distribute download load across multiple parents
+    /// during concurrent piece downloads. This approach ensures:
+    /// - Optimal utilization of bandwidth from multiple parent nodes
+    /// - Prevention of overwhelming any single parent with too many requests
+    /// - Better overall download performance through parallel connections
+    ///
+    /// This strategy is particularly effective when downloading multiple pieces simultaneously,
+    /// as it naturally spreads the workload across the available parent pool.
     #[allow(clippy::too_many_arguments)]
     #[instrument(skip_all)]
     async fn collect_from_parents(
@@ -373,7 +413,7 @@ impl PersistentCachePieceCollector {
         task_id: &str,
         parents: Vec<CollectedParent>,
         interested_pieces: Vec<metadata::Piece>,
-        collected_pieces: Arc<Mutex<HashMap<u32, String>>>,
+        collected_pieces: Arc<DashMap<u32, Vec<CollectedParent>>>,
         collected_piece_tx: Sender<CollectedPiece>,
         collected_piece_timeout: Duration,
     ) -> Result<()> {
@@ -387,7 +427,7 @@ impl PersistentCachePieceCollector {
             task_id: String,
             parent: CollectedParent,
             interested_pieces: Vec<metadata::Piece>,
-            collected_pieces: Arc<Mutex<HashMap<u32, String>>>,
+            collected_pieces: Arc<DashMap<u32, Vec<CollectedParent>>>,
             collected_piece_tx: Sender<CollectedPiece>,
             collected_piece_timeout: Duration,
         ) -> Result<CollectedParent> {
@@ -441,18 +481,33 @@ impl PersistentCachePieceCollector {
                 );
             })? {
                 let message = message?;
 
-                // Remove the piece from collected_pieces, avoid to collect the same piece from
-                // different parents.
-                {
-                    let mut collected_pieces_guard = collected_pieces.lock().await;
-                    if collected_pieces_guard.remove(&message.number).is_none() {
-                        continue;
-                    }
-                }
+                if let Some(mut parents) = collected_pieces.get_mut(&message.number) {
+                    parents.push(parent.clone());
+                } else {
+                    continue;
+                }
+
+                // Wait for collecting the piece from different parents when the first
+                // piece is collected.
+                tokio::time::sleep(DEFAULT_WAIT_FOR_PIECE_FROM_DIFFERENT_PARENTS).await;
+
+                let parents = match collected_pieces.remove(&message.number) {
+                    Some((_, parents)) => parents,
+                    None => continue,
+                };
+
+                let parent = match parents.get(fastrand::usize(..parents.len())) {
+                    Some(parent) => parent,
+                    None => {
+                        error!(
+                            "collected_pieces does not contain parent for piece {}",
+                            message.number
+                        );
+                        continue;
+                    }
+                };
 
                 info!(
-                    "received persistent cache piece {}-{} metadata from parent {}",
+                    "picked up piece {}-{} metadata from parent {}",
                     task_id, message.number, parent.id
                 );
@@ -493,11 +548,7 @@ impl PersistentCachePieceCollector {
                    info!("peer {} sync persistent cache pieces finished", peer.id);
 
                     // If all pieces are collected, abort all tasks.
-                    let collected_pieces_guard = collected_pieces.lock().await;
-                    let is_empty = collected_pieces_guard.is_empty();
-                    drop(collected_pieces_guard);
-
-                    if is_empty {
+                    if collected_pieces.is_empty() {
                         info!("all persistent cache pieces are collected, abort all tasks");
                         join_set.abort_all();
                     }
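
As a quick sanity check of the load-balancing claim in the doc comments above, the following standalone snippet (not part of the client; the parent names are made up) picks a parent with fastrand for a large number of pieces and counts how often each one is chosen; the counts come out roughly equal.

use std::collections::HashMap;

fn main() {
    let parents = ["parent-a", "parent-b", "parent-c"];
    let mut counts: HashMap<&str, u32> = HashMap::new();

    // Simulate picking a parent for 30_000 pieces, as the collector does per piece.
    for _ in 0..30_000 {
        let parent = parents[fastrand::usize(..parents.len())];
        *counts.entry(parent).or_insert(0) += 1;
    }

    // Each parent should be chosen roughly 10_000 times.
    for (parent, count) in &counts {
        println!("{parent}: {count}");
    }
}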