diff --git a/src/config/dfdaemon.rs b/src/config/dfdaemon.rs index 92d137e8..57216c93 100644 --- a/src/config/dfdaemon.rs +++ b/src/config/dfdaemon.rs @@ -117,7 +117,7 @@ fn default_scheduler_enable_back_to_source() -> bool { // default_dynconfig_refresh_interval is the default interval to refresh dynamic configuration from manager. #[inline] fn default_dynconfig_refresh_interval() -> Duration { - Duration::from_secs(1800) + Duration::from_secs(300) } // default_seed_peer_cluster_id is the default cluster id of seed peer. diff --git a/src/grpc/scheduler.rs b/src/grpc/scheduler.rs index 2615b988..7c4a055d 100644 --- a/src/grpc/scheduler.rs +++ b/src/grpc/scheduler.rs @@ -263,7 +263,9 @@ impl SchedulerClient { // Get the scheduler address from the hashring. let addr = self.hashring.read().await; - let addr = addr.get(&key).ok_or_else(|| Error::HashRing(key.clone()))?; + let addr = addr + .get(&key[0..5].to_string()) + .ok_or_else(|| Error::HashRing(key.clone()))?; info!("{} picked {:?}", key, addr); let channel = match Channel::from_shared(format!("http://{}", addr.to_string())) @@ -307,7 +309,10 @@ impl SchedulerClient { .iter() .all(|available_scheduler| available_schedulers.contains(available_scheduler)) { - info!("available schedulers is not changed"); + info!( + "available schedulers is not changed: {:?}", + data.available_schedulers + ); return Ok(()); } drop(available_schedulers); diff --git a/src/task/mod.rs b/src/task/mod.rs index 863f32b8..3b6d9dd2 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -692,6 +692,7 @@ impl Task { number: u32, parent: Peer, piece: Arc, + storage: Arc, semaphore: Arc, ) -> ClientResult { let _permit = semaphore.acquire().await.map_err(|err| { @@ -708,7 +709,7 @@ impl Task { .map_err(|err| { error!( "download piece {} from remote peer {:?} error: {:?}", - number, + storage.piece_id(task_id.as_str(), number), parent.id.clone(), err ); @@ -726,6 +727,7 @@ impl Task { collect_piece.number, collect_piece.parent.clone(), self.piece.clone(), + self.storage.clone(), semaphore.clone(), )); } @@ -752,7 +754,8 @@ impl Task { info!( "finished piece {} from remote peer {:?}", - metadata.number, metadata.parent_id + self.storage.piece_id(task_id, metadata.number), + metadata.parent_id ); // Construct the piece. @@ -807,7 +810,9 @@ impl Task { Ok(Err(Error::DownloadFromRemotePeerFailed(err))) => { error!( "download piece {} from remote peer {} error: {:?}", - err.piece_number, err.parent_id, err + self.storage.piece_id(task_id, err.piece_number), + err.parent_id, + err ); // Send the download piece failed request. @@ -932,7 +937,10 @@ impl Task { ) .await?; - info!("finished piece {} from source", metadata.number); + info!( + "finished piece {} from source", + self.storage.piece_id(task_id, metadata.number) + ); // Construct the piece. let piece = Piece { @@ -1104,7 +1112,8 @@ impl Task { Err(err) => { info!( "download piece {} from local peer error: {:?}", - interested_piece.number, err + self.storage.piece_id(task_id, interested_piece.number), + err ); continue; } @@ -1126,7 +1135,10 @@ impl Task { ) .await?; - info!("finished piece {} from local peer", metadata.number); + info!( + "finished piece {} from local peer", + self.storage.piece_id(task_id, metadata.number) + ); // Construct the piece. let piece = Piece { @@ -1244,7 +1256,10 @@ impl Task { ) .await?; - info!("finished piece {} from source", metadata.number); + info!( + "finished piece {} from source", + self.storage.piece_id(task_id, metadata.number) + ); // Construct the piece. let piece = Piece {