feat: remove repeat piece from piece collector (#1062)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2025-03-27 22:37:43 +08:00 committed by GitHub
parent 4f3e2f6e4f
commit ce4b748c19
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 48 additions and 19 deletions

View File

@ -991,7 +991,8 @@ impl PersistentCacheTask {
host: peer.host,
})
.collect(),
);
)
.await;
let mut piece_collector_rx = piece_collector.run().await;
// Initialize the interrupt. If download from parent failed with scheduler or download

View File

@ -15,15 +15,16 @@
*/
use crate::grpc::dfdaemon_upload::DfdaemonUploadClient;
use dashmap::DashMap;
use dragonfly_api::common::v2::Host;
use dragonfly_api::dfdaemon::v2::{SyncPersistentCachePiecesRequest, SyncPiecesRequest};
use dragonfly_client_config::dfdaemon::Config;
use dragonfly_client_core::{Error, Result};
use dragonfly_client_storage::metadata;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::Mutex;
use tokio::task::JoinSet;
use tokio_stream::StreamExt;
use tracing::{error, info, instrument, Instrument};
@ -68,24 +69,28 @@ pub struct PieceCollector {
interested_pieces: Vec<metadata::Piece>,
/// collected_pieces is the pieces collected from peers.
collected_pieces: Arc<DashMap<u32, String>>,
collected_pieces: Arc<Mutex<HashMap<u32, String>>>,
}
/// PieceCollector is used to collect pieces from peers.
impl PieceCollector {
/// new creates a new PieceCollector.
#[instrument(skip_all)]
pub fn new(
pub async fn new(
config: Arc<Config>,
host_id: &str,
task_id: &str,
interested_pieces: Vec<metadata::Piece>,
parents: Vec<CollectedParent>,
) -> Self {
let collected_pieces = Arc::new(DashMap::with_capacity(interested_pieces.len()));
let collected_pieces =
Arc::new(Mutex::new(HashMap::with_capacity(interested_pieces.len())));
let mut collected_pieces_guard = collected_pieces.lock().await;
for interested_piece in &interested_pieces {
collected_pieces.insert(interested_piece.number, String::new());
collected_pieces_guard.insert(interested_piece.number, String::new());
}
drop(collected_pieces_guard);
Self {
config,
@ -140,7 +145,7 @@ impl PieceCollector {
task_id: &str,
parents: Vec<CollectedParent>,
interested_pieces: Vec<metadata::Piece>,
collected_pieces: Arc<DashMap<u32, String>>,
collected_pieces: Arc<Mutex<HashMap<u32, String>>>,
collected_piece_tx: Sender<CollectedPiece>,
collected_piece_timeout: Duration,
) -> Result<()> {
@ -154,7 +159,7 @@ impl PieceCollector {
task_id: String,
parent: CollectedParent,
interested_pieces: Vec<metadata::Piece>,
collected_pieces: Arc<DashMap<u32, String>>,
collected_pieces: Arc<Mutex<HashMap<u32, String>>>,
collected_piece_tx: Sender<CollectedPiece>,
collected_piece_timeout: Duration,
) -> Result<CollectedParent> {
@ -205,7 +210,12 @@ impl PieceCollector {
// Remove the piece from collected_pieces, avoid to collect the same piece from
// different parents.
collected_pieces.remove(&message.number);
{
let mut collected_pieces_guard = collected_pieces.lock().await;
if collected_pieces_guard.remove(&message.number).is_none() {
continue;
}
}
info!(
"received piece {}-{} metadata from parent {}",
@ -249,7 +259,11 @@ impl PieceCollector {
info!("peer {} sync pieces finished", peer.id);
// If all pieces are collected, abort all tasks.
if collected_pieces.is_empty() {
let collected_pieces_guard = collected_pieces.lock().await;
let is_empty = collected_pieces_guard.is_empty();
drop(collected_pieces_guard);
if !is_empty {
info!("all pieces are collected, abort all tasks");
join_set.abort_all();
}
@ -285,24 +299,28 @@ pub struct PersistentCachePieceCollector {
interested_pieces: Vec<metadata::Piece>,
/// collected_pieces is the pieces collected from peers.
collected_pieces: Arc<DashMap<u32, String>>,
collected_pieces: Arc<Mutex<HashMap<u32, String>>>,
}
/// PersistentCachePieceCollector is used to collect persistent cache pieces from peers.
impl PersistentCachePieceCollector {
/// new creates a new PieceCollector.
#[instrument(skip_all)]
pub fn new(
pub async fn new(
config: Arc<Config>,
host_id: &str,
task_id: &str,
interested_pieces: Vec<metadata::Piece>,
parents: Vec<CollectedParent>,
) -> Self {
let collected_pieces = Arc::new(DashMap::with_capacity(interested_pieces.len()));
let collected_pieces =
Arc::new(Mutex::new(HashMap::with_capacity(interested_pieces.len())));
let mut collected_pieces_guard = collected_pieces.lock().await;
for interested_piece in &interested_pieces {
collected_pieces.insert(interested_piece.number, String::new());
collected_pieces_guard.insert(interested_piece.number, String::new());
}
drop(collected_pieces_guard);
Self {
config,
@ -357,7 +375,7 @@ impl PersistentCachePieceCollector {
task_id: &str,
parents: Vec<CollectedParent>,
interested_pieces: Vec<metadata::Piece>,
collected_pieces: Arc<DashMap<u32, String>>,
collected_pieces: Arc<Mutex<HashMap<u32, String>>>,
collected_piece_tx: Sender<CollectedPiece>,
collected_piece_timeout: Duration,
) -> Result<()> {
@ -371,7 +389,7 @@ impl PersistentCachePieceCollector {
task_id: String,
parent: CollectedParent,
interested_pieces: Vec<metadata::Piece>,
collected_pieces: Arc<DashMap<u32, String>>,
collected_pieces: Arc<Mutex<HashMap<u32, String>>>,
collected_piece_tx: Sender<CollectedPiece>,
collected_piece_timeout: Duration,
) -> Result<CollectedParent> {
@ -428,7 +446,12 @@ impl PersistentCachePieceCollector {
// Remove the piece from collected_pieces, avoid to collect the same piece from
// different parents.
collected_pieces.remove(&message.number);
{
let mut collected_pieces_guard = collected_pieces.lock().await;
if collected_pieces_guard.remove(&message.number).is_none() {
continue;
}
}
info!(
"received persistent cache piece {}-{} metadata from parent {}",
@ -472,7 +495,11 @@ impl PersistentCachePieceCollector {
info!("peer {} sync persistent cache pieces finished", peer.id);
// If all pieces are collected, abort all tasks.
if collected_pieces.is_empty() {
let collected_pieces_guard = collected_pieces.lock().await;
let is_empty = collected_pieces_guard.is_empty();
drop(collected_pieces_guard);
if !is_empty {
info!("all persistent cache pieces are collected, abort all tasks");
join_set.abort_all();
}

View File

@ -945,7 +945,8 @@ impl Task {
host: peer.host,
})
.collect(),
);
)
.await;
let mut piece_collector_rx = piece_collector.run().await;
// Initialize the interrupt. If download from parent failed with scheduler or download