diff --git a/src/task/mod.rs b/src/task/mod.rs
index 71332947..361bb300 100644
--- a/src/task/mod.rs
+++ b/src/task/mod.rs
@@ -632,7 +632,7 @@ impl Task {
         task_id: &str,
         host_id: &str,
         peer_id: &str,
-        remote_peers: Vec<Peer>,
+        parents: Vec<Peer>,
         interested_pieces: Vec,
         content_length: u64,
         download_progress_tx: Sender>,
@@ -643,7 +643,7 @@ impl Task {
             self.config.clone(),
             task_id,
             interested_pieces.clone(),
-            remote_peers.clone(),
+            parents.clone(),
         );
 
         // Initialize the join set.
@@ -660,7 +660,7 @@ impl Task {
         async fn download_from_remote_peer(
             task_id: String,
             number: u32,
-            remote_peer: Peer,
+            parent: Peer,
             piece: Arc,
             semaphore: Arc,
         ) -> ClientResult {
@@ -668,27 +668,23 @@ impl Task {
                 error!("acquire semaphore error: {:?}", err);
                 Error::DownloadFromRemotePeerFailed(DownloadFromRemotePeerFailed {
                     piece_number: number,
-                    parent_id: remote_peer.id.clone(),
+                    parent_id: parent.id.clone(),
                 })
            })?;
 
            let metadata = piece
-                .download_from_remote_peer(
-                    task_id.as_str(),
-                    number,
-                    remote_peer.clone(),
-                )
+                .download_from_remote_peer(task_id.as_str(), number, parent.clone())
                .await
                .map_err(|err| {
                    error!(
                        "download piece {} from remote peer {:?} error: {:?}",
                        number,
-                        remote_peer.id.clone(),
+                        parent.id.clone(),
                        err
                    );
 
                    Error::DownloadFromRemotePeerFailed(DownloadFromRemotePeerFailed {
                        piece_number: number,
-                        parent_id: remote_peer.id.clone(),
+                        parent_id: parent.id.clone(),
                    })
                })?;
diff --git a/src/task/piece.rs b/src/task/piece.rs
index 45b866a9..0ce9e9b7 100644
--- a/src/task/piece.rs
+++ b/src/task/piece.rs
@@ -253,16 +253,16 @@ impl Piece {
         &self,
         task_id: &str,
         number: u32,
-        remote_peer: Peer,
+        parent: Peer,
     ) -> Result {
         // Record the start of downloading piece.
         self.storage.download_piece_started(task_id, number).await?;
 
         // Create a dfdaemon client.
-        let host = remote_peer
+        let host = parent
             .host
             .clone()
-            .ok_or(Error::InvalidPeer(remote_peer.id.clone()))?;
+            .ok_or(Error::InvalidPeer(parent.id.clone()))?;
         let dfdaemon_client =
             DfdaemonClient::new(format!("http://{}:{}", host.ip, host.port)).await?;
 
@@ -308,7 +308,7 @@ impl Piece {
                 number,
                 piece.offset,
                 piece.digest.as_str(),
-                remote_peer.id.as_str(),
+                parent.id.as_str(),
                 &mut content.as_slice(),
             )
             .await
@@ -336,10 +336,10 @@ impl Piece {
         &self,
         task_id: &str,
         number: u32,
-        remote_peer: Peer,
+        parent: Peer,
     ) -> Result {
         // Download the piece from the remote peer.
-        self.download_from_remote_peer(task_id, number, remote_peer)
+        self.download_from_remote_peer(task_id, number, parent)
             .await?;
 
         // Return reader of the piece.
diff --git a/src/task/piece_collector.rs b/src/task/piece_collector.rs
index f2b6a141..3a756e31 100644
--- a/src/task/piece_collector.rs
+++ b/src/task/piece_collector.rs
@@ -45,8 +45,8 @@ pub struct PieceCollector {
     // task_id is the id of the task.
     task_id: String,
 
-    // peers is the peers to collect pieces from.
-    peers: Vec<Peer>,
+    // parents is the parent peers.
+    parents: Vec<Peer>,
 
     // interested_pieces is the pieces interested by the collector.
     interested_pieces: Vec,
@@ -56,12 +56,12 @@ pub struct PieceCollector {
 }
 
 impl PieceCollector {
-    // NewPieceCollector returns a new PieceCollector.
+    // new creates a new PieceCollector.
     pub fn new(
         config: Arc,
         task_id: &str,
         interested_pieces: Vec,
-        peers: Vec<Peer>,
+        parents: Vec<Peer>,
     ) -> Self {
         // Initialize collected_pieces.
         let collected_pieces = Arc::new(DashMap::new());
@@ -75,24 +75,24 @@ impl PieceCollector {
         Self {
             config,
             task_id: task_id.to_string(),
-            peers,
+            parents,
             interested_pieces,
             collected_pieces,
         }
     }
 
-    // Run runs the collector.
+    // run runs the piece collector.
     pub async fn run(&self) -> Receiver {
         let task_id = self.task_id.clone();
-        let peers = self.peers.clone();
+        let parents = self.parents.clone();
         let interested_pieces = self.interested_pieces.clone();
         let collected_pieces = self.collected_pieces.clone();
         let collected_piece_timeout = self.config.download.piece_timeout;
         let (collected_piece_tx, collected_piece_rx) = mpsc::channel(128);
         tokio::spawn(async move {
-            Self::collect_from_peers(
+            Self::collect_from_remote_peers(
                 task_id,
-                peers,
+                parents,
                 interested_pieces,
                 collected_pieces,
                 collected_piece_tx,
@@ -107,10 +107,10 @@ impl PieceCollector {
         collected_piece_rx
     }
 
-    // collect collects a piece from peers.
-    async fn collect_from_peers(
+    // collect_from_remote_peers collects pieces from remote peers.
+    async fn collect_from_remote_peers(
         task_id: String,
-        peers: Vec<Peer>,
+        parents: Vec<Peer>,
         interested_pieces: Vec,
         collected_pieces: Arc>>,
         collected_piece_tx: Sender,
@@ -118,20 +118,20 @@ impl PieceCollector {
     ) -> Result<()> {
         // Create a task to collect pieces from peers.
         let mut join_set = JoinSet::new();
-        for peer in peers.iter() {
+        for parent in parents.iter() {
            async fn sync_pieces(
                task_id: String,
-                peer: Peer,
-                peers: Vec<Peer>,
+                parent: Peer,
+                parents: Vec<Peer>,
                interested_pieces: Vec,
                collected_pieces: Arc>>,
                collected_piece_tx: Sender,
                collected_piece_timeout: Duration,
            ) -> Result {
                // If candidate_parent.host is None, skip it.
-                let host = peer.host.clone().ok_or_else(|| {
-                    error!("peer {:?} host is empty", peer);
-                    Error::InvalidPeer(peer.id.clone())
+                let host = parent.host.clone().ok_or_else(|| {
+                    error!("peer {:?} host is empty", parent);
+                    Error::InvalidPeer(parent.id.clone())
                })?;
 
                // Create a dfdaemon client.
@@ -159,20 +159,20 @@ impl PieceCollector {
                    collected_pieces
                        .entry(message.piece_number)
                        .and_modify(|peers| {
-                            peers.insert(peer.id.clone());
+                            peers.insert(parent.id.clone());
                        });
 
                    match collected_pieces.get(&message.piece_number) {
-                        Some(parents) => {
-                            if let Some(parent) = parents.iter().next() {
+                        Some(parent_ids) => {
+                            if let Some(parent_id) = parent_ids.iter().next() {
                                let number = message.piece_number;
-                                let parent = peers
+                                let parent = parents
                                    .iter()
-                                    .find(|peer| peer.id == parent.as_str())
+                                    .find(|parent| parent.id == parent_id.as_str())
                                    .ok_or_else(|| {
-                                        error!("parent {} not found", parent.as_str());
-                                        Error::InvalidPeer(parent.clone())
-                                    })?;
+                                        error!("parent {} not found", parent_id.as_str());
+                                        Error::InvalidPeer(parent_id.clone())
+                                    })?;
 
                                collected_piece_tx
                                    .send(CollectedPiece {
@@ -188,13 +188,13 @@ impl PieceCollector {
                        };
                    }
 
-                    Ok(peer)
+                    Ok(parent)
                }
 
                join_set.spawn(sync_pieces(
                    task_id.clone(),
-                    peer.clone(),
-                    peers.clone(),
+                    parent.clone(),
+                    parents.clone(),
                    interested_pieces.clone(),
                    collected_pieces.clone(),
                    collected_piece_tx.clone(),
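To make the renamed API surface concrete, here is a minimal, self-contained sketch of the collector pattern after this change: `new(task_id, interested_pieces, parents)` plus `run()` returning a `Receiver` of collected pieces, with one `sync_pieces` task spawned per parent into a `JoinSet`. This is not the crate's real code: `Parent` and `CollectedPiece` are simplified stand-ins for `Peer` and the crate's `CollectedPiece`, pieces are plain `u32` numbers, the `config` parameter and dfdaemon client are omitted, and the announcement loop is simulated. It assumes tokio with the full feature set.

```rust
use std::time::Duration;

use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task::JoinSet;

// Hypothetical stand-in for the crate's `Peer` type.
#[derive(Clone, Debug)]
struct Parent {
    id: String,
}

// Hypothetical stand-in for the crate's `CollectedPiece`.
#[derive(Debug)]
struct CollectedPiece {
    number: u32,
    parent: Parent,
}

struct PieceCollector {
    task_id: String,
    parents: Vec<Parent>,
    interested_pieces: Vec<u32>,
}

impl PieceCollector {
    // new creates a new PieceCollector.
    fn new(task_id: &str, interested_pieces: Vec<u32>, parents: Vec<Parent>) -> Self {
        Self {
            task_id: task_id.to_string(),
            parents,
            interested_pieces,
        }
    }

    // run spawns one sync task per parent and returns the receiver side
    // of the collected-piece channel.
    async fn run(&self) -> Receiver<CollectedPiece> {
        let task_id = self.task_id.clone();
        let parents = self.parents.clone();
        let interested_pieces = self.interested_pieces.clone();
        let (collected_piece_tx, collected_piece_rx) = mpsc::channel(128);

        tokio::spawn(async move {
            let mut join_set = JoinSet::new();
            for parent in parents.iter() {
                join_set.spawn(Self::sync_pieces(
                    parent.clone(),
                    interested_pieces.clone(),
                    collected_piece_tx.clone(),
                ));
            }

            // Drain the join set so per-parent failures are surfaced.
            while let Some(result) = join_set.join_next().await {
                if let Err(err) = result {
                    eprintln!("task {task_id}: sync from parent failed: {err}");
                }
            }
        });

        collected_piece_rx
    }

    // sync_pieces simulates one parent announcing every interested piece;
    // the real collector streams piece numbers from the parent's dfdaemon
    // service instead.
    async fn sync_pieces(
        parent: Parent,
        interested_pieces: Vec<u32>,
        collected_piece_tx: Sender<CollectedPiece>,
    ) {
        for number in interested_pieces {
            tokio::time::sleep(Duration::from_millis(10)).await;
            let _ = collected_piece_tx
                .send(CollectedPiece {
                    number,
                    parent: parent.clone(),
                })
                .await;
        }
    }
}

#[tokio::main]
async fn main() {
    let parents = vec![
        Parent { id: "parent-1".to_string() },
        Parent { id: "parent-2".to_string() },
    ];
    let collector = PieceCollector::new("task-1", vec![0, 1, 2], parents);
    let mut rx = collector.run().await;
    while let Some(piece) = rx.recv().await {
        println!("collected piece {} from {}", piece.number, piece.parent.id);
    }
}
```

Keeping the channel bounded at 128, as in the diff's `mpsc::channel(128)`, applies backpressure to parents that announce pieces faster than the downloader consumes them.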