feat: rename argument remote_peer to parent (#141)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2023-12-12 20:28:35 +08:00 committed by GitHub
parent ee07f83e44
commit 32dd257c39
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 42 additions and 46 deletions

View File

@@ -632,7 +632,7 @@ impl Task {
task_id: &str, task_id: &str,
host_id: &str, host_id: &str,
peer_id: &str, peer_id: &str,
remote_peers: Vec<Peer>, parents: Vec<Peer>,
interested_pieces: Vec<metadata::Piece>, interested_pieces: Vec<metadata::Piece>,
content_length: u64, content_length: u64,
download_progress_tx: Sender<Result<DownloadTaskResponse, Status>>, download_progress_tx: Sender<Result<DownloadTaskResponse, Status>>,
@@ -643,7 +643,7 @@ impl Task {
self.config.clone(), self.config.clone(),
task_id, task_id,
interested_pieces.clone(), interested_pieces.clone(),
remote_peers.clone(), parents.clone(),
); );
// Initialize the join set. // Initialize the join set.
@@ -660,7 +660,7 @@ impl Task {
async fn download_from_remote_peer( async fn download_from_remote_peer(
task_id: String, task_id: String,
number: u32, number: u32,
remote_peer: Peer, parent: Peer,
piece: Arc<piece::Piece>, piece: Arc<piece::Piece>,
semaphore: Arc<Semaphore>, semaphore: Arc<Semaphore>,
) -> ClientResult<metadata::Piece> { ) -> ClientResult<metadata::Piece> {
@@ -668,27 +668,23 @@ impl Task {
error!("acquire semaphore error: {:?}", err); error!("acquire semaphore error: {:?}", err);
Error::DownloadFromRemotePeerFailed(DownloadFromRemotePeerFailed { Error::DownloadFromRemotePeerFailed(DownloadFromRemotePeerFailed {
piece_number: number, piece_number: number,
parent_id: remote_peer.id.clone(), parent_id: parent.id.clone(),
}) })
})?; })?;
let metadata = piece let metadata = piece
.download_from_remote_peer( .download_from_remote_peer(task_id.as_str(), number, parent.clone())
task_id.as_str(),
number,
remote_peer.clone(),
)
.await .await
.map_err(|err| { .map_err(|err| {
error!( error!(
"download piece {} from remote peer {:?} error: {:?}", "download piece {} from remote peer {:?} error: {:?}",
number, number,
remote_peer.id.clone(), parent.id.clone(),
err err
); );
Error::DownloadFromRemotePeerFailed(DownloadFromRemotePeerFailed { Error::DownloadFromRemotePeerFailed(DownloadFromRemotePeerFailed {
piece_number: number, piece_number: number,
parent_id: remote_peer.id.clone(), parent_id: parent.id.clone(),
}) })
})?; })?;

View File

@@ -253,16 +253,16 @@ impl Piece {
&self, &self,
task_id: &str, task_id: &str,
number: u32, number: u32,
remote_peer: Peer, parent: Peer,
) -> Result<metadata::Piece> { ) -> Result<metadata::Piece> {
// Record the start of downloading piece. // Record the start of downloading piece.
self.storage.download_piece_started(task_id, number).await?; self.storage.download_piece_started(task_id, number).await?;
// Create a dfdaemon client. // Create a dfdaemon client.
let host = remote_peer let host = parent
.host .host
.clone() .clone()
.ok_or(Error::InvalidPeer(remote_peer.id.clone()))?; .ok_or(Error::InvalidPeer(parent.id.clone()))?;
let dfdaemon_client = let dfdaemon_client =
DfdaemonClient::new(format!("http://{}:{}", host.ip, host.port)).await?; DfdaemonClient::new(format!("http://{}:{}", host.ip, host.port)).await?;
@@ -308,7 +308,7 @@ impl Piece {
number, number,
piece.offset, piece.offset,
piece.digest.as_str(), piece.digest.as_str(),
remote_peer.id.as_str(), parent.id.as_str(),
&mut content.as_slice(), &mut content.as_slice(),
) )
.await .await
@@ -336,10 +336,10 @@ impl Piece {
&self, &self,
task_id: &str, task_id: &str,
number: u32, number: u32,
remote_peer: Peer, parent: Peer,
) -> Result<impl AsyncRead> { ) -> Result<impl AsyncRead> {
// Download the piece from the remote peer. // Download the piece from the remote peer.
self.download_from_remote_peer(task_id, number, remote_peer) self.download_from_remote_peer(task_id, number, parent)
.await?; .await?;
// Return reader of the piece. // Return reader of the piece.

View File

@@ -45,8 +45,8 @@ pub struct PieceCollector {
// task_id is the id of the task. // task_id is the id of the task.
task_id: String, task_id: String,
// peers is the peers to collect pieces from. // parents is the parent peers.
peers: Vec<Peer>, parents: Vec<Peer>,
// interested_pieces is the pieces interested by the collector. // interested_pieces is the pieces interested by the collector.
interested_pieces: Vec<metadata::Piece>, interested_pieces: Vec<metadata::Piece>,
@@ -56,12 +56,12 @@ pub struct PieceCollector {
} }
impl PieceCollector { impl PieceCollector {
// NewPieceCollector returns a new PieceCollector. // new creates a new PieceCollector.
pub fn new( pub fn new(
config: Arc<Config>, config: Arc<Config>,
task_id: &str, task_id: &str,
interested_pieces: Vec<metadata::Piece>, interested_pieces: Vec<metadata::Piece>,
peers: Vec<Peer>, parents: Vec<Peer>,
) -> Self { ) -> Self {
// Initialize collected_pieces. // Initialize collected_pieces.
let collected_pieces = Arc::new(DashMap::new()); let collected_pieces = Arc::new(DashMap::new());
@@ -75,24 +75,24 @@ impl PieceCollector {
Self { Self {
config, config,
task_id: task_id.to_string(), task_id: task_id.to_string(),
peers, parents,
interested_pieces, interested_pieces,
collected_pieces, collected_pieces,
} }
} }
// Run runs the collector. // run runs the piece collector.
pub async fn run(&self) -> Receiver<CollectedPiece> { pub async fn run(&self) -> Receiver<CollectedPiece> {
let task_id = self.task_id.clone(); let task_id = self.task_id.clone();
let peers = self.peers.clone(); let parents = self.parents.clone();
let interested_pieces = self.interested_pieces.clone(); let interested_pieces = self.interested_pieces.clone();
let collected_pieces = self.collected_pieces.clone(); let collected_pieces = self.collected_pieces.clone();
let collected_piece_timeout = self.config.download.piece_timeout; let collected_piece_timeout = self.config.download.piece_timeout;
let (collected_piece_tx, collected_piece_rx) = mpsc::channel(128); let (collected_piece_tx, collected_piece_rx) = mpsc::channel(128);
tokio::spawn(async move { tokio::spawn(async move {
Self::collect_from_peers( Self::collect_from_remote_peers(
task_id, task_id,
peers, parents,
interested_pieces, interested_pieces,
collected_pieces, collected_pieces,
collected_piece_tx, collected_piece_tx,
@@ -107,10 +107,10 @@ impl PieceCollector {
collected_piece_rx collected_piece_rx
} }
// collect collects a piece from peers. // collect_from_remote_peers collects pieces from remote peers.
async fn collect_from_peers( async fn collect_from_remote_peers(
task_id: String, task_id: String,
peers: Vec<Peer>, parents: Vec<Peer>,
interested_pieces: Vec<metadata::Piece>, interested_pieces: Vec<metadata::Piece>,
collected_pieces: Arc<DashMap<u32, DashSet<String>>>, collected_pieces: Arc<DashMap<u32, DashSet<String>>>,
collected_piece_tx: Sender<CollectedPiece>, collected_piece_tx: Sender<CollectedPiece>,
@@ -118,20 +118,20 @@ impl PieceCollector {
) -> Result<()> { ) -> Result<()> {
// Create a task to collect pieces from peers. // Create a task to collect pieces from peers.
let mut join_set = JoinSet::new(); let mut join_set = JoinSet::new();
for peer in peers.iter() { for parent in parents.iter() {
async fn sync_pieces( async fn sync_pieces(
task_id: String, task_id: String,
peer: Peer, parent: Peer,
peers: Vec<Peer>, parents: Vec<Peer>,
interested_pieces: Vec<metadata::Piece>, interested_pieces: Vec<metadata::Piece>,
collected_pieces: Arc<DashMap<u32, DashSet<String>>>, collected_pieces: Arc<DashMap<u32, DashSet<String>>>,
collected_piece_tx: Sender<CollectedPiece>, collected_piece_tx: Sender<CollectedPiece>,
collected_piece_timeout: Duration, collected_piece_timeout: Duration,
) -> Result<Peer> { ) -> Result<Peer> {
// If candidate_parent.host is None, skip it. // If candidate_parent.host is None, skip it.
let host = peer.host.clone().ok_or_else(|| { let host = parent.host.clone().ok_or_else(|| {
error!("peer {:?} host is empty", peer); error!("peer {:?} host is empty", parent);
Error::InvalidPeer(peer.id.clone()) Error::InvalidPeer(parent.id.clone())
})?; })?;
// Create a dfdaemon client. // Create a dfdaemon client.
@@ -159,20 +159,20 @@ impl PieceCollector {
collected_pieces collected_pieces
.entry(message.piece_number) .entry(message.piece_number)
.and_modify(|peers| { .and_modify(|peers| {
peers.insert(peer.id.clone()); peers.insert(parent.id.clone());
}); });
match collected_pieces.get(&message.piece_number) { match collected_pieces.get(&message.piece_number) {
Some(parents) => { Some(parent_ids) => {
if let Some(parent) = parents.iter().next() { if let Some(parent_id) = parent_ids.iter().next() {
let number = message.piece_number; let number = message.piece_number;
let parent = peers let parent = parents
.iter() .iter()
.find(|peer| peer.id == parent.as_str()) .find(|parent| parent.id == parent_id.as_str())
.ok_or_else(|| { .ok_or_else(|| {
error!("parent {} not found", parent.as_str()); error!("parent {} not found", parent_id.as_str());
Error::InvalidPeer(parent.clone()) Error::InvalidPeer(parent_id.clone())
})?; })?;
collected_piece_tx collected_piece_tx
.send(CollectedPiece { .send(CollectedPiece {
@@ -188,13 +188,13 @@ impl PieceCollector {
}; };
} }
Ok(peer) Ok(parent)
} }
join_set.spawn(sync_pieces( join_set.spawn(sync_pieces(
task_id.clone(), task_id.clone(),
peer.clone(), parent.clone(),
peers.clone(), parents.clone(),
interested_pieces.clone(), interested_pieces.clone(),
collected_pieces.clone(), collected_pieces.clone(),
collected_piece_tx.clone(), collected_piece_tx.clone(),