From 55a42d449ba35c1b01b0bd5576d7ae981ece3310 Mon Sep 17 00:00:00 2001 From: Gaius Date: Tue, 9 Jan 2024 12:31:47 +0800 Subject: [PATCH] feat: add log for collecter spawn (#204) Signed-off-by: Gaius --- src/grpc/scheduler.rs | 14 ++++-- src/task/mod.rs | 87 ++++++++++++++++++++++++------------- src/task/piece_collector.rs | 43 ++++++++++++------ 3 files changed, 98 insertions(+), 46 deletions(-) diff --git a/src/grpc/scheduler.rs b/src/grpc/scheduler.rs index 7c4a055d..dd4e1bd2 100644 --- a/src/grpc/scheduler.rs +++ b/src/grpc/scheduler.rs @@ -31,7 +31,7 @@ use std::sync::Arc; use tokio::sync::RwLock; use tokio::task::JoinSet; use tonic::transport::Channel; -use tracing::{error, info, instrument}; +use tracing::{error, info, instrument, Instrument}; #[derive(Debug, Copy, Clone, Hash, PartialEq)] struct VNode { @@ -152,6 +152,8 @@ impl SchedulerClient { addr: SocketAddr, request: tonic::Request, ) -> Result<()> { + info!("announce host to {:?}", addr); + // Connect to the scheduler. let channel = Channel::from_shared(format!("http://{}", addr)) .map_err(|_| Error::InvalidURI(addr.to_string()))? @@ -164,7 +166,7 @@ impl SchedulerClient { Ok(()) } - join_set.spawn(announce_host(*available_scheduler_addr, request)); + join_set.spawn(announce_host(*available_scheduler_addr, request).in_current_span()); } while let Some(message) = join_set.join_next().await { @@ -192,6 +194,8 @@ impl SchedulerClient { addr: SocketAddr, request: tonic::Request, ) -> Result<()> { + info!("announce host to {}", addr); + // Connect to the scheduler. let channel = Channel::from_shared(format!("http://{}", addr)) .map_err(|_| Error::InvalidURI(addr.to_string()))? @@ -204,7 +208,7 @@ impl SchedulerClient { Ok(()) } - join_set.spawn(announce_host(*available_scheduler_addr, request)); + join_set.spawn(announce_host(*available_scheduler_addr, request).in_current_span()); } while let Some(message) = join_set.join_next().await { @@ -231,6 +235,8 @@ impl SchedulerClient { addr: SocketAddr, request: tonic::Request, ) -> Result<()> { + info!("leave host from {}", addr); + // Connect to the scheduler. let channel = Channel::from_shared(format!("http://{}", addr)) .map_err(|_| Error::InvalidURI(addr.to_string()))? @@ -243,7 +249,7 @@ impl SchedulerClient { Ok(()) } - join_set.spawn(leave_host(*available_scheduler_addr, request)); + join_set.spawn(leave_host(*available_scheduler_addr, request).in_current_span()); } while let Some(message) = join_set.join_next().await { diff --git a/src/task/mod.rs b/src/task/mod.rs index 3b6d9dd2..63bfc265 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -45,7 +45,7 @@ use tokio::time::sleep; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tonic::Request; use tonic::Status; -use tracing::{error, info}; +use tracing::{error, info, Instrument}; pub mod piece; pub mod piece_collector; @@ -695,6 +695,12 @@ impl Task { storage: Arc, semaphore: Arc, ) -> ClientResult { + info!( + "start to download piece {} from remote peer {:?}", + storage.piece_id(task_id.as_str(), number), + parent.id.clone() + ); + let _permit = semaphore.acquire().await.map_err(|err| { error!("acquire semaphore error: {:?}", err); Error::DownloadFromRemotePeerFailed(DownloadFromRemotePeerFailed { @@ -722,14 +728,17 @@ impl Task { Ok(metadata) } - join_set.spawn(download_from_remote_peer( - task_id.to_string(), - collect_piece.number, - collect_piece.parent.clone(), - self.piece.clone(), - self.storage.clone(), - semaphore.clone(), - )); + join_set.spawn( + download_from_remote_peer( + task_id.to_string(), + collect_piece.number, + collect_piece.parent.clone(), + self.piece.clone(), + self.storage.clone(), + semaphore.clone(), + ) + .in_current_span(), + ); } // Initialize the finished pieces. @@ -890,8 +899,14 @@ impl Task { length: u64, header: HeaderMap, piece: Arc, + storage: Arc, semaphore: Arc, ) -> ClientResult { + info!( + "start to download piece {} from source", + storage.piece_id(task_id.as_str(), number) + ); + let _permit = semaphore.acquire().await?; let metadata = piece @@ -908,16 +923,20 @@ impl Task { Ok(metadata) } - join_set.spawn(download_from_source( - task_id.to_string(), - interested_piece.number, - url.clone(), - interested_piece.offset, - interested_piece.length, - header.clone(), - self.piece.clone(), - semaphore.clone(), - )); + join_set.spawn( + download_from_source( + task_id.to_string(), + interested_piece.number, + url.clone(), + interested_piece.offset, + interested_piece.length, + header.clone(), + self.piece.clone(), + self.storage.clone(), + semaphore.clone(), + ) + .in_current_span(), + ); } // Wait for the pieces to be downloaded. @@ -1209,8 +1228,14 @@ impl Task { length: u64, header: HeaderMap, piece: Arc, + storage: Arc, semaphore: Arc, ) -> ClientResult { + info!( + "start to download piece {} from source", + storage.piece_id(task_id.as_str(), number) + ); + let _permit = semaphore.acquire().await?; let metadata = piece @@ -1227,16 +1252,20 @@ impl Task { Ok(metadata) } - join_set.spawn(download_from_source( - task_id.to_string(), - interested_piece.number, - url.clone(), - interested_piece.offset, - interested_piece.length, - header.clone(), - self.piece.clone(), - semaphore.clone(), - )); + join_set.spawn( + download_from_source( + task_id.to_string(), + interested_piece.number, + url.clone(), + interested_piece.offset, + interested_piece.length, + header.clone(), + self.piece.clone(), + self.storage.clone(), + semaphore.clone(), + ) + .in_current_span(), + ); } // Wait for the pieces to be downloaded. diff --git a/src/task/piece_collector.rs b/src/task/piece_collector.rs index bf8e2ac4..6e5dc521 100644 --- a/src/task/piece_collector.rs +++ b/src/task/piece_collector.rs @@ -26,7 +26,7 @@ use std::time::Duration; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::task::JoinSet; use tokio_stream::StreamExt; -use tracing::{error, info}; +use tracing::{error, info, Instrument}; // CollectedPiece is the piece collected from a peer. pub struct CollectedPiece { @@ -101,7 +101,7 @@ impl PieceCollector { .await .unwrap_or_else(|err| { error!("collect pieces failed: {}", err); - }) + }); }); collected_piece_rx @@ -128,6 +128,8 @@ impl PieceCollector { collected_piece_tx: Sender, collected_piece_timeout: Duration, ) -> Result { + info!("sync pieces from parent {}", parent.id); + // If candidate_parent.host is None, skip it. let host = parent.host.clone().ok_or_else(|| { error!("peer {:?} host is empty", parent); @@ -136,7 +138,15 @@ impl PieceCollector { // Create a dfdaemon client. let dfdaemon_upload_client = - DfdaemonUploadClient::new(format!("http://{}:{}", host.ip, host.port)).await?; + DfdaemonUploadClient::new(format!("http://{}:{}", host.ip, host.port)) + .await + .map_err(|err| { + error!( + "create dfdaemon upload client from parent {} failed: {}", + parent.id, err + ); + err + })?; let response = dfdaemon_upload_client .sync_pieces(SyncPiecesRequest { @@ -146,7 +156,11 @@ impl PieceCollector { .map(|piece| piece.number) .collect(), }) - .await?; + .await + .map_err(|err| { + error!("sync pieces from parent {} failed: {}", parent.id, err); + err + })?; // If the response repeating timeout exceeds the piece download timeout, the stream will return error. let out_stream = response.into_inner().timeout(collected_piece_timeout); @@ -194,15 +208,18 @@ impl PieceCollector { Ok(parent) } - join_set.spawn(sync_pieces( - task_id.clone(), - parent.clone(), - parents.clone(), - interested_pieces.clone(), - collected_pieces.clone(), - collected_piece_tx.clone(), - collected_piece_timeout, - )); + join_set.spawn( + sync_pieces( + task_id.clone(), + parent.clone(), + parents.clone(), + interested_pieces.clone(), + collected_pieces.clone(), + collected_piece_tx.clone(), + collected_piece_timeout, + ) + .in_current_span(), + ); } // Wait for all tasks to finish.