From ce4b748c19d707d3985b801c5394f0079cf4e804 Mon Sep 17 00:00:00 2001 From: Gaius Date: Thu, 27 Mar 2025 22:37:43 +0800 Subject: [PATCH] feat: remove repeat piece from piece collector (#1062) Signed-off-by: Gaius --- .../src/resource/persistent_cache_task.rs | 3 +- .../src/resource/piece_collector.rs | 61 +++++++++++++------ dragonfly-client/src/resource/task.rs | 3 +- 3 files changed, 48 insertions(+), 19 deletions(-) diff --git a/dragonfly-client/src/resource/persistent_cache_task.rs b/dragonfly-client/src/resource/persistent_cache_task.rs index 3e665a78..d47b7e0c 100644 --- a/dragonfly-client/src/resource/persistent_cache_task.rs +++ b/dragonfly-client/src/resource/persistent_cache_task.rs @@ -991,7 +991,8 @@ impl PersistentCacheTask { host: peer.host, }) .collect(), - ); + ) + .await; let mut piece_collector_rx = piece_collector.run().await; // Initialize the interrupt. If download from parent failed with scheduler or download diff --git a/dragonfly-client/src/resource/piece_collector.rs b/dragonfly-client/src/resource/piece_collector.rs index 198cf2df..2f9197c4 100644 --- a/dragonfly-client/src/resource/piece_collector.rs +++ b/dragonfly-client/src/resource/piece_collector.rs @@ -15,15 +15,16 @@ */ use crate::grpc::dfdaemon_upload::DfdaemonUploadClient; -use dashmap::DashMap; use dragonfly_api::common::v2::Host; use dragonfly_api::dfdaemon::v2::{SyncPersistentCachePiecesRequest, SyncPiecesRequest}; use dragonfly_client_config::dfdaemon::Config; use dragonfly_client_core::{Error, Result}; use dragonfly_client_storage::metadata; +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc::{self, Receiver, Sender}; +use tokio::sync::Mutex; use tokio::task::JoinSet; use tokio_stream::StreamExt; use tracing::{error, info, instrument, Instrument}; @@ -68,24 +69,28 @@ pub struct PieceCollector { interested_pieces: Vec, /// collected_pieces is the pieces collected from peers. - collected_pieces: Arc>, + collected_pieces: Arc>>, } /// PieceCollector is used to collect pieces from peers. impl PieceCollector { /// new creates a new PieceCollector. #[instrument(skip_all)] - pub fn new( + pub async fn new( config: Arc, host_id: &str, task_id: &str, interested_pieces: Vec, parents: Vec, ) -> Self { - let collected_pieces = Arc::new(DashMap::with_capacity(interested_pieces.len())); + let collected_pieces = + Arc::new(Mutex::new(HashMap::with_capacity(interested_pieces.len()))); + + let mut collected_pieces_guard = collected_pieces.lock().await; for interested_piece in &interested_pieces { - collected_pieces.insert(interested_piece.number, String::new()); + collected_pieces_guard.insert(interested_piece.number, String::new()); } + drop(collected_pieces_guard); Self { config, @@ -140,7 +145,7 @@ impl PieceCollector { task_id: &str, parents: Vec, interested_pieces: Vec, - collected_pieces: Arc>, + collected_pieces: Arc>>, collected_piece_tx: Sender, collected_piece_timeout: Duration, ) -> Result<()> { @@ -154,7 +159,7 @@ impl PieceCollector { task_id: String, parent: CollectedParent, interested_pieces: Vec, - collected_pieces: Arc>, + collected_pieces: Arc>>, collected_piece_tx: Sender, collected_piece_timeout: Duration, ) -> Result { @@ -205,7 +210,12 @@ impl PieceCollector { // Remove the piece from collected_pieces, avoid to collect the same piece from // different parents. - collected_pieces.remove(&message.number); + { + let mut collected_pieces_guard = collected_pieces.lock().await; + if collected_pieces_guard.remove(&message.number).is_none() { + continue; + } + } info!( "received piece {}-{} metadata from parent {}", @@ -249,7 +259,11 @@ impl PieceCollector { info!("peer {} sync pieces finished", peer.id); // If all pieces are collected, abort all tasks. - if collected_pieces.is_empty() { + let collected_pieces_guard = collected_pieces.lock().await; + let is_empty = collected_pieces_guard.is_empty(); + drop(collected_pieces_guard); + + if !is_empty { info!("all pieces are collected, abort all tasks"); join_set.abort_all(); } @@ -285,24 +299,28 @@ pub struct PersistentCachePieceCollector { interested_pieces: Vec, /// collected_pieces is the pieces collected from peers. - collected_pieces: Arc>, + collected_pieces: Arc>>, } /// PersistentCachePieceCollector is used to collect persistent cache pieces from peers. impl PersistentCachePieceCollector { /// new creates a new PieceCollector. #[instrument(skip_all)] - pub fn new( + pub async fn new( config: Arc, host_id: &str, task_id: &str, interested_pieces: Vec, parents: Vec, ) -> Self { - let collected_pieces = Arc::new(DashMap::with_capacity(interested_pieces.len())); + let collected_pieces = + Arc::new(Mutex::new(HashMap::with_capacity(interested_pieces.len()))); + + let mut collected_pieces_guard = collected_pieces.lock().await; for interested_piece in &interested_pieces { - collected_pieces.insert(interested_piece.number, String::new()); + collected_pieces_guard.insert(interested_piece.number, String::new()); } + drop(collected_pieces_guard); Self { config, @@ -357,7 +375,7 @@ impl PersistentCachePieceCollector { task_id: &str, parents: Vec, interested_pieces: Vec, - collected_pieces: Arc>, + collected_pieces: Arc>>, collected_piece_tx: Sender, collected_piece_timeout: Duration, ) -> Result<()> { @@ -371,7 +389,7 @@ impl PersistentCachePieceCollector { task_id: String, parent: CollectedParent, interested_pieces: Vec, - collected_pieces: Arc>, + collected_pieces: Arc>>, collected_piece_tx: Sender, collected_piece_timeout: Duration, ) -> Result { @@ -428,7 +446,12 @@ impl PersistentCachePieceCollector { // Remove the piece from collected_pieces, avoid to collect the same piece from // different parents. - collected_pieces.remove(&message.number); + { + let mut collected_pieces_guard = collected_pieces.lock().await; + if collected_pieces_guard.remove(&message.number).is_none() { + continue; + } + } info!( "received persistent cache piece {}-{} metadata from parent {}", @@ -472,7 +495,11 @@ impl PersistentCachePieceCollector { info!("peer {} sync persistent cache pieces finished", peer.id); // If all pieces are collected, abort all tasks. - if collected_pieces.is_empty() { + let collected_pieces_guard = collected_pieces.lock().await; + let is_empty = collected_pieces_guard.is_empty(); + drop(collected_pieces_guard); + + if !is_empty { info!("all persistent cache pieces are collected, abort all tasks"); join_set.abort_all(); } diff --git a/dragonfly-client/src/resource/task.rs b/dragonfly-client/src/resource/task.rs index 5d7c0259..9b59244f 100644 --- a/dragonfly-client/src/resource/task.rs +++ b/dragonfly-client/src/resource/task.rs @@ -945,7 +945,8 @@ impl Task { host: peer.host, }) .collect(), - ); + ) + .await; let mut piece_collector_rx = piece_collector.run().await; // Initialize the interrupt. If download from parent failed with scheduler or download