diff --git a/src/grpc/dfdaemon.rs b/src/grpc/dfdaemon.rs index 6f6ae460..a7eeb3c3 100644 --- a/src/grpc/dfdaemon.rs +++ b/src/grpc/dfdaemon.rs @@ -292,7 +292,7 @@ impl Dfdaemon for DfdaemonServerHandler { // Get the piece content from the local storage. let mut reader = match task .piece - .download_from_local_peer(&task_id, None, interested_piece_number) + .download_from_local_peer(&task_id, interested_piece_number) .await { Ok(reader) => reader, @@ -347,7 +347,7 @@ impl Dfdaemon for DfdaemonServerHandler { type DownloadTaskStream = ReceiverStream>; // download_task tells the dfdaemon to download the task. - #[instrument(skip_all, fields(task_id, peer_id), ret)] + #[instrument(skip_all, fields(task_id, peer_id))] async fn download_task( &self, request: Request, @@ -431,58 +431,17 @@ impl Dfdaemon for DfdaemonServerHandler { // Initialize stream channel. let (out_stream_tx, out_stream_rx) = mpsc::channel(128); tokio::spawn(async move { - match task - .download_into_file( - task_id.as_str(), - host_id.as_str(), - peer_id.as_str(), - content_length, - header.clone(), - download.clone(), - ) - .await - { - Ok(mut download_progress_rx) => { - while let Some(finished_piece) = download_progress_rx.recv().await { - out_stream_tx - .send(Ok(DownloadTaskResponse { - content_length, - piece: Some(Piece { - number: finished_piece.number, - parent_id: None, - offset: finished_piece.offset, - length: finished_piece.length, - digest: finished_piece.clone().digest, - content: None, - traffic_type: None, - cost: finished_piece.prost_cost(), - created_at: Some(prost_wkt_types::Timestamp::from( - finished_piece.created_at, - )), - }), - })) - .await - .unwrap_or_else(|e| { - error!("send to out stream: {}", e); - }); - } - } - Err(e) => { - error!("download task: {}", e); - out_stream_tx - .send(Err(Status::internal(e.to_string()))) - .await - .unwrap_or_else(|e| { - error!("send to out stream: {}", e); - }); - - // Download task failed. - task.download_task_failed(task_id.as_str()) - .unwrap_or_else(|e| { - error!("download task failed: {}", e); - }); - } - } + task.download_into_file( + task_id.as_str(), + host_id.as_str(), + peer_id.as_str(), + content_length, + header.clone(), + download.clone(), + out_stream_tx.clone(), + ) + .await; + drop(out_stream_tx); }); Ok(Response::new(ReceiverStream::new(out_stream_rx))) diff --git a/src/storage/content.rs b/src/storage/content.rs index 79566316..fbb76191 100644 --- a/src/storage/content.rs +++ b/src/storage/content.rs @@ -54,7 +54,7 @@ impl Content { } // read_piece reads the piece from the content. - #[instrument(skip(self, offset, length))] + #[instrument(skip_all)] pub async fn read_piece( &self, task_id: &str, @@ -67,7 +67,7 @@ impl Content { } // write_piece writes the piece to the content. - #[instrument(skip(self, offset, reader))] + #[instrument(skip_all)] pub async fn write_piece( &self, task_id: &str, diff --git a/src/storage/metadata.rs b/src/storage/metadata.rs index d54dee9a..d5aae926 100644 --- a/src/storage/metadata.rs +++ b/src/storage/metadata.rs @@ -205,7 +205,7 @@ impl Metadata { } // download_task_started updates the metadata of the task when the task downloads started. - #[instrument(skip(self, piece_length))] + #[instrument(skip_all)] pub fn download_task_started(&self, id: &str, piece_length: u64) -> Result<()> { let task = match self.get_task(id)? { // If the task exists, update the updated_at. @@ -227,7 +227,7 @@ impl Metadata { } // set_task_content_length sets the content length of the task. - #[instrument(skip(self, content_length))] + #[instrument(skip_all)] pub fn set_task_content_length(&self, id: &str, content_length: u64) -> Result<()> { if let Some(mut task) = self.get_task(id)? { task.content_length = Some(content_length); @@ -238,7 +238,7 @@ impl Metadata { } // upload_task_finished updates the metadata of the task when task uploads finished. - #[instrument(skip(self))] + #[instrument(skip_all)] pub fn upload_task_finished(&self, id: &str) -> Result<()> { match self.get_task(id)? { Some(mut task) => { @@ -251,7 +251,7 @@ impl Metadata { } // download_task_failed updates the metadata of the task when the task downloads failed. - #[instrument(skip(self))] + #[instrument(skip_all)] pub fn download_task_failed(&self, id: &str) -> Result<()> { match self.get_task(id)? { Some(_piece) => self.delete_task(id), @@ -260,7 +260,7 @@ impl Metadata { } // get_task gets the task metadata. - #[instrument(skip(self))] + #[instrument(skip_all)] pub fn get_task(&self, id: &str) -> Result> { let handle = self.cf_handle(TASK_CF_NAME)?; match self.db.get_cf(handle, id)? { @@ -270,7 +270,7 @@ impl Metadata { } // put_task puts the task metadata. - #[instrument(skip(self, task))] + #[instrument(skip_all)] fn put_task(&self, id: &str, task: &Task) -> Result<()> { let handle = self.cf_handle(TASK_CF_NAME)?; let json = serde_json::to_string(&task)?; @@ -279,7 +279,7 @@ impl Metadata { } // delete_task deletes the task metadata. - #[instrument(skip(self))] + #[instrument(skip_all)] fn delete_task(&self, id: &str) -> Result<()> { let handle = self.cf_handle(TASK_CF_NAME)?; self.db.delete_cf(handle, id.as_bytes())?; @@ -287,7 +287,7 @@ impl Metadata { } // download_piece_started updates the metadata of the piece when the piece downloads started. - #[instrument(skip(self))] + #[instrument(skip_all)] pub fn download_piece_started(&self, task_id: &str, number: u32) -> Result<()> { self.put_piece( task_id, @@ -301,7 +301,7 @@ impl Metadata { } // download_piece_finished updates the metadata of the piece when the piece downloads finished. - #[instrument(skip(self, offset, length, digest))] + #[instrument(skip_all)] pub fn download_piece_finished( &self, task_id: &str, @@ -324,7 +324,7 @@ impl Metadata { } // download_piece_failed updates the metadata of the piece when the piece downloads failed. - #[instrument(skip(self))] + #[instrument(skip_all)] pub fn download_piece_failed(&self, task_id: &str, number: u32) -> Result<()> { match self.get_piece(task_id, number)? { Some(_piece) => self.delete_piece(task_id, number), @@ -333,7 +333,7 @@ impl Metadata { } // upload_piece_finished updates the metadata of the piece when piece uploads finished. - #[instrument(skip(self))] + #[instrument(skip_all)] pub fn upload_piece_finished(&self, task_id: &str, number: u32) -> Result<()> { match self.get_piece(task_id, number)? { Some(mut piece) => { @@ -346,7 +346,7 @@ impl Metadata { } // get_piece gets the piece metadata. - #[instrument(skip(self))] + #[instrument(skip_all)] pub fn get_piece(&self, task_id: &str, number: u32) -> Result> { let id = self.piece_id(task_id, number); let handle = self.cf_handle(PIECE_CF_NAME)?; @@ -357,7 +357,7 @@ impl Metadata { } // get_pieces gets the pieces metadata. - #[instrument(skip(self))] + #[instrument(skip_all)] pub fn get_pieces(&self, task_id: &str) -> Result> { let handle = self.cf_handle(PIECE_CF_NAME)?; let iter = self.db.prefix_iterator_cf(handle, task_id.as_bytes()); @@ -374,7 +374,7 @@ impl Metadata { } // put_piece puts the piece metadata. - #[instrument(skip(self, piece))] + #[instrument(skip_all)] fn put_piece(&self, task_id: &str, piece: &Piece) -> Result<()> { let id = self.piece_id(task_id, piece.number); let handle = self.cf_handle(PIECE_CF_NAME)?; @@ -384,7 +384,7 @@ impl Metadata { } // delete_piece deletes the piece metadata. - #[instrument(skip(self))] + #[instrument(skip_all)] fn delete_piece(&self, task_id: &str, number: u32) -> Result<()> { let id = self.piece_id(task_id, number); let handle = self.cf_handle(PIECE_CF_NAME)?; @@ -393,7 +393,7 @@ impl Metadata { } // piece_id returns the piece id. - #[instrument(skip(self))] + #[instrument(skip_all)] pub fn piece_id(&self, task_id: &str, number: u32) -> String { format!("{}-{}", task_id, number) } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 8ee8cf57..b216bdec 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -42,31 +42,31 @@ impl Storage { } // download_task_started updates the metadata of the task when the task downloads started. - #[instrument(skip(self, piece_length))] + #[instrument(skip_all)] pub fn download_task_started(&self, id: &str, piece_length: u64) -> Result<()> { self.metadata.download_task_started(id, piece_length) } // set_task_content_length sets the content length of the task. - #[instrument(skip(self, content_length))] + #[instrument(skip_all)] pub fn set_task_content_length(&self, id: &str, content_length: u64) -> Result<()> { self.metadata.set_task_content_length(id, content_length) } // download_task_failed updates the metadata of the task when the task downloads failed. - #[instrument(skip(self))] + #[instrument(skip_all)] pub fn download_task_failed(&self, id: &str) -> Result<()> { self.metadata.download_task_failed(id) } // upload_task_finished updates the metadata of the task when task uploads finished. - #[instrument(skip(self))] + #[instrument(skip_all)] pub fn upload_task_finished(&self, id: &str) -> Result<()> { self.metadata.upload_task_finished(id) } // get_task returns the task metadata. - #[instrument(skip(self))] + #[instrument(skip_all)] pub fn get_task(&self, id: &str) -> Result> { let task = self.metadata.get_task(id)?; Ok(task) @@ -74,13 +74,13 @@ impl Storage { // download_piece_started updates the metadata of the piece and writes // the data of piece to file when the piece downloads started. - #[instrument(skip(self))] + #[instrument(skip_all)] pub fn download_piece_started(&self, task_id: &str, number: u32) -> Result<()> { self.metadata.download_piece_started(task_id, number) } // download_piece_from_source_finished is used for downloading piece from source. - #[instrument(skip(self, offset, length, reader))] + #[instrument(skip_all)] pub async fn download_piece_from_source_finished( &self, task_id: &str, @@ -103,7 +103,7 @@ impl Storage { } // download_piece_from_remote_peer_finished is used for downloading piece from remote peer. - #[instrument(skip(self, offset, expected_digest, reader))] + #[instrument(skip_all)] pub async fn download_piece_from_remote_peer_finished( &self, task_id: &str, @@ -132,14 +132,14 @@ impl Storage { } // download_piece_failed updates the metadata of the piece when the piece downloads failed. - #[instrument(skip(self))] + #[instrument(skip_all)] pub fn download_piece_failed(&self, task_id: &str, number: u32) -> Result<()> { self.metadata.download_piece_failed(task_id, number) } // upload_piece updates the metadata of the piece and // returns the data of the piece. - #[instrument(skip(self))] + #[instrument(skip_all)] pub async fn upload_piece(&self, task_id: &str, number: u32) -> Result { match self.metadata.get_piece(task_id, number)? { Some(piece) => { @@ -155,20 +155,20 @@ impl Storage { } // get_piece returns the piece metadata. - #[instrument(skip(self))] + #[instrument(skip_all)] pub fn get_piece(&self, task_id: &str, number: u32) -> Result> { let piece = self.metadata.get_piece(task_id, number)?; Ok(piece) } // get_pieces returns the pieces metadata. - #[instrument(skip(self))] + #[instrument(skip_all)] pub fn get_pieces(&self, task_id: &str) -> Result> { self.metadata.get_pieces(task_id) } // piece_id returns the piece id. - #[instrument(skip(self))] + #[instrument(skip_all)] pub fn piece_id(&self, task_id: &str, number: u32) -> String { self.metadata.piece_id(task_id, number) } diff --git a/src/task/mod.rs b/src/task/mod.rs index 7b9f3a0b..f2093817 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -19,14 +19,15 @@ use crate::grpc::scheduler::SchedulerClient; use crate::storage::{metadata, Storage}; use crate::utils::http::headermap_to_hashmap; use crate::utils::id_generator::IDGenerator; -use crate::{Error, Result}; +use crate::{Error, Result as ClientResult}; use dragonfly_api::common::v2::{Download, Piece, TrafficType}; +use dragonfly_api::dfdaemon::v2::DownloadTaskResponse; use dragonfly_api::scheduler::v2::{ announce_peer_request, announce_peer_response, download_piece_back_to_source_failed_request, AnnouncePeerRequest, DownloadPeerStartedRequest, DownloadPieceBackToSourceFailedRequest, DownloadPieceFailedRequest, DownloadPieceFinishedRequest, HttpResponse, RegisterPeerRequest, }; -use mpsc::Receiver; +use mpsc::Sender; use reqwest::header::{self, HeaderMap}; use std::sync::Arc; use std::time::Duration; @@ -37,6 +38,7 @@ use tokio::{ }; use tokio_stream::wrappers::ReceiverStream; use tonic::Request; +use tonic::Status; use tracing::{error, info, instrument}; pub mod piece; @@ -86,24 +88,25 @@ impl Task { // get gets a task metadata. #[instrument(skip(self))] - pub fn get(&self, task_id: &str) -> Result> { + pub fn get(&self, task_id: &str) -> ClientResult> { self.storage.get_task(task_id) } // download_task_started updates the metadata of the task when the task downloads started. #[instrument(skip(self, piece_length))] - pub fn download_task_started(&self, id: &str, piece_length: u64) -> Result<()> { + pub fn download_task_started(&self, id: &str, piece_length: u64) -> ClientResult<()> { self.storage.download_task_started(id, piece_length) } // download_task_failed updates the metadata of the task when the task downloads failed. #[instrument(skip(self))] - pub fn download_task_failed(&self, id: &str) -> Result<()> { + pub fn download_task_failed(&self, id: &str) -> ClientResult<()> { self.storage.download_task_failed(id) } // download_into_file downloads a task into a file. - #[instrument(skip(self, content_length, header, download))] + #[instrument(skip(self, content_length, header, download, download_progress_tx))] + #[allow(clippy::too_many_arguments)] pub async fn download_into_file( &self, task_id: &str, @@ -112,78 +115,154 @@ impl Task { content_length: u64, header: HeaderMap, download: Download, - ) -> Result> { - // Initialize the download progress channel. - let (download_progress_tx, download_progress_rx) = mpsc::channel(128); - + download_progress_tx: Sender>, + ) { // Convert the timeout. let timeout: Option = match download.timeout.clone() { - Some(timeout) => { - Some(Duration::try_from(timeout).map_err(|_| Error::InvalidParameter())?) - } + Some(timeout) => match Duration::try_from(timeout) { + Ok(timeout) => Some(timeout), + Err(err) => { + error!("convert timeout error: {:?}", err); + if let Err(err) = download_progress_tx + .send(Err(Status::invalid_argument("invalid timeout"))) + .await + { + error!("send download progress error: {:?}", err); + } + + return; + } + }, None => None, }; // Open the file. - let mut f = OpenOptions::new() + let mut f = match OpenOptions::new() .create(true) .write(true) .open(download.output_path.as_str()) - .await?; + .await + { + Ok(f) => f, + Err(err) => { + error!("open file error: {:?}", err); + if let Err(err) = download_progress_tx + .send(Err(Status::internal("open file error"))) + .await + { + error!("send download progress error: {:?}", err); + } + + return; + } + }; // Calculate the interested pieces to download. - let interested_pieces = self.piece.calculate_interested( - task_id, - peer_id, + let interested_pieces = match self.piece.calculate_interested( download.piece_length, content_length, download.range.clone(), - )?; + ) { + Ok(interested_pieces) => interested_pieces, + Err(err) => { + error!("calculate interested pieces error: {:?}", err); + if let Err(err) = download_progress_tx + .send(Err(Status::invalid_argument( + "calculate interested pieces error", + ))) + .await + { + error!("send download progress error: {:?}", err); + } + + return; + } + }; info!("interested pieces: {:?}", interested_pieces); // Get the task from the local storage. - let task = self - .get(task_id)? - .ok_or(Error::TaskNotFound(task_id.to_string()))?; + let task = match self.get(task_id) { + Ok(Some(task)) => task, + Ok(None) => { + error!("task not found"); + if let Err(err) = download_progress_tx + .send(Err(Status::not_found("task not found"))) + .await + { + error!("send download progress error: {:?}", err); + } + + return; + } + Err(err) => { + error!("get task error: {:?}", err); + if let Err(err) = download_progress_tx + .send(Err(Status::internal("get task error"))) + .await + { + error!("send download progress error: {:?}", err); + } + + return; + } + }; // If the task is finished, return the file. if task.is_finished() { info!("task is finished, download the pieces from the local peer"); // Download the pieces from the local peer. - return self + match self .download_partial_from_local_peer_into_file( &mut f, task_id, - peer_id, - interested_pieces, + interested_pieces.clone(), + content_length, + download_progress_tx.clone(), ) - .await; + .await + { + Ok(_) => {} + Err(err) => { + error!("download from local peer error: {:?}", err); + if let Err(err) = download_progress_tx + .send(Err(Status::internal("download from local peer error"))) + .await + { + error!("send download progress error: {:?}", err); + } + + return; + } + }; } info!("download the pieces from local peer"); // Download the pieces from the local peer. - let mut finished_pieces: Vec = Vec::new(); - while let Some(finished_piece) = self + let finished_pieces = match self .download_partial_from_local_peer_into_file( &mut f, task_id, - peer_id, interested_pieces.clone(), + content_length, + download_progress_tx.clone(), ) - .await? - .recv() .await { - info!("finished piece from local peer: {:?}", finished_piece); + Ok(finished_pieces) => finished_pieces, + Err(err) => { + error!("download from local peer error: {:?}", err); + if let Err(err) = download_progress_tx + .send(Err(Status::internal("download from local peer error"))) + .await + { + error!("send download progress error: {:?}", err); + } - // Send the download progress. - download_progress_tx.send(finished_piece.clone()).await?; - - // Store the finished piece. - finished_pieces.push(finished_piece); - } + return; + } + }; // Remove the finished pieces from the pieces. let interested_pieces = self @@ -197,7 +276,7 @@ impl Task { // Check if all pieces are downloaded. if interested_pieces.is_empty() { info!("all pieces are downloaded from local peer"); - return Ok(download_progress_rx); + return; }; info!("download the pieces with scheduler"); @@ -210,31 +289,50 @@ impl Task { host_id, peer_id, interested_pieces.clone(), + content_length, download.clone(), + download_progress_tx.clone(), ) .await { - Ok(download_progress_rx) => Ok(download_progress_rx), + Ok(_) => {} Err(err) => { - error!("download partial with scheduler into file error: {:?}", err); + error!("download with scheduler error: {:?}", err); // Download the pieces from the source. - self.download_partial_from_source_into_file( - &mut f, - interested_pieces, - task_id, - peer_id, - download.url.clone(), - header.clone(), - timeout, - ) - .await + match self + .download_partial_from_source_into_file( + &mut f, + task_id, + interested_pieces.clone(), + download.url.clone(), + header.clone(), + content_length, + timeout, + download_progress_tx.clone(), + ) + .await + { + Ok(_) => {} + Err(err) => { + error!("download from source error: {:?}", err); + if let Err(err) = download_progress_tx + .send(Err(Status::internal("download from source error"))) + .await + { + error!("send download progress error: {:?}", err); + } + + return; + } + }; } - } + }; } // download_partial_with_scheduler_into_file downloads a partial task with scheduler into a file. - #[instrument(skip(self, f, interested_pieces, download))] + #[instrument(skip_all)] + #[allow(clippy::too_many_arguments)] async fn download_partial_with_scheduler_into_file( &self, f: &mut fs::File, @@ -242,11 +340,10 @@ impl Task { host_id: &str, peer_id: &str, interested_pieces: Vec, + content_length: u64, download: Download, - ) -> Result> { - // Initialize the download progress channel. - let (download_progress_tx, download_progress_rx) = mpsc::channel(128); - + download_progress_tx: Sender>, + ) -> ClientResult> { // Convert the header. let header: HeaderMap = (&download.header).try_into()?; @@ -296,7 +393,8 @@ impl Task { announce_peer_response::Response::EmptyTaskResponse(response) => { // If the task is empty, return an empty vector. info!("empty task response: {:?}", response); - return Ok(download_progress_rx); + drop(in_stream_tx); + return Ok(Vec::new()); } announce_peer_response::Response::NormalTaskResponse(response) => { // If the task is normal, download the pieces from the remote peer. @@ -307,7 +405,6 @@ impl Task { .piece .collect_interested_from_remote_peer( task_id, - peer_id, interested_pieces.clone(), candidate_parents, ) @@ -318,7 +415,6 @@ impl Task { .piece .download_from_remote_peer( task_id, - peer_id, collect_interested_piece.number, collect_interested_piece.parent.clone(), ) @@ -365,6 +461,24 @@ impl Task { .write_into_file_and_verify(&mut reader, f, metadata.digest.as_str()) .await?; + info!( + "finished piece {} from remote peer {}", + collect_interested_piece.parent.id, metadata.number + ); + + // Construct the piece. + let piece = Piece { + number: metadata.number, + parent_id: Some(collect_interested_piece.parent.id.clone()), + offset: metadata.offset, + length: metadata.length, + digest: metadata.digest.clone(), + content: None, + traffic_type: Some(TrafficType::RemotePeer as i32), + cost: metadata.prost_cost(), + created_at: Some(prost_wkt_types::Timestamp::from(metadata.created_at)), + }; + // Send the download piece finished request. in_stream_tx .send(AnnouncePeerRequest { @@ -374,21 +488,7 @@ impl Task { request: Some( announce_peer_request::Request::DownloadPieceFinishedRequest( DownloadPieceFinishedRequest { - piece: Some(Piece { - number: metadata.number, - parent_id: Some( - collect_interested_piece.parent.id.clone(), - ), - offset: metadata.offset, - length: metadata.length, - digest: metadata.digest.clone(), - content: None, - traffic_type: Some(TrafficType::RemotePeer as i32), - cost: metadata.prost_cost(), - created_at: Some(prost_wkt_types::Timestamp::from( - metadata.created_at, - )), - }), + piece: Some(piece.clone()), }, ), ), @@ -396,11 +496,23 @@ impl Task { .await?; // Send the download progress. - download_progress_tx.send(metadata.clone()).await?; + download_progress_tx + .send(Ok(DownloadTaskResponse { + content_length, + piece: Some(piece.clone()), + })) + .await?; // Store the finished piece. finished_pieces.push(metadata.clone()); } + + // Check if all pieces are downloaded. + if finished_pieces.len() == interested_pieces.len() { + info!("all pieces are downloaded with scheduler"); + drop(in_stream_tx); + return Ok(finished_pieces); + } } announce_peer_response::Response::NeedBackToSourceResponse(response) => { // If the task need back to source, download the pieces from the source. @@ -410,7 +522,7 @@ impl Task { interested_pieces.clone(), ); - for interested_piece in interested_pieces { + for interested_piece in interested_pieces.clone() { // Seek to the offset of the piece. if let Err(err) = f.seek(SeekFrom::Start(interested_piece.offset)).await { error!("seek error: {:?}", err); @@ -422,7 +534,6 @@ impl Task { .piece .download_from_source( task_id, - peer_id, interested_piece.number, download.url.clone().as_str(), interested_piece.offset, @@ -496,21 +607,58 @@ impl Task { .write_into_file_and_verify(&mut reader, f, metadata.digest.as_str()) .await?; + info!("finished piece {} from source", metadata.number); + + // Construct the piece. + let piece = Piece { + number: metadata.number, + parent_id: None, + offset: metadata.offset, + length: metadata.length, + digest: metadata.digest.clone(), + content: None, + traffic_type: Some(TrafficType::BackToSource as i32), + cost: metadata.prost_cost(), + created_at: Some(prost_wkt_types::Timestamp::from(metadata.created_at)), + }; + + // Send the download piece finished request. + in_stream_tx + .send(AnnouncePeerRequest { + host_id: host_id.to_string(), + task_id: task_id.to_string(), + peer_id: peer_id.to_string(), + request: Some( + announce_peer_request::Request::DownloadPieceFinishedRequest( + DownloadPieceFinishedRequest { + piece: Some(piece.clone()), + }, + ), + ), + }) + .await?; + // Send the download progress. - download_progress_tx.send(metadata).await?; + download_progress_tx + .send(Ok(DownloadTaskResponse { + content_length, + piece: Some(piece.clone()), + })) + .await?; // Store the finished piece. - finished_pieces.push(interested_piece.clone()); + finished_pieces.push(metadata.clone()); + } + + if finished_pieces.len() == interested_pieces.len() { + info!("all pieces are downloaded from source"); + drop(in_stream_tx); + return Ok(finished_pieces); } } } } - // Check if all pieces are downloaded. - if finished_pieces.len() == interested_pieces.len() { - return Ok(download_progress_rx); - } - // If not all pieces are downloaded, return an error. Err(Error::Unknown( "not all pieces are downloaded with scheduler".to_string(), @@ -518,17 +666,15 @@ impl Task { } // download_partial_from_local_peer_into_file downloads a partial task from a local peer into a file. - #[instrument(skip(self, f, interested_pieces))] + #[instrument(skip_all)] async fn download_partial_from_local_peer_into_file( &self, f: &mut fs::File, task_id: &str, - peer_id: &str, interested_pieces: Vec, - ) -> Result> { - // Initialize the download progress channel. - let (download_progress_tx, download_progress_rx) = mpsc::channel(128); - + content_length: u64, + download_progress_tx: Sender>, + ) -> ClientResult> { // Initialize the finished pieces. let mut finished_pieces: Vec = Vec::new(); @@ -542,7 +688,7 @@ impl Task { // Download the piece from the local peer. let mut reader = match self .piece - .download_from_local_peer(task_id, Some(peer_id), interested_piece.number) + .download_from_local_peer(task_id, interested_piece.number) .await { Ok(reader) => reader, @@ -563,32 +709,50 @@ impl Task { .write_into_file_and_verify(&mut reader, f, metadata.digest.as_str()) .await?; + info!("finished piece {} from local peer", metadata.number); + + // Construct the piece. + let piece = Piece { + number: metadata.number, + parent_id: None, + offset: metadata.offset, + length: metadata.length, + digest: metadata.digest.clone(), + content: None, + traffic_type: Some(TrafficType::LocalPeer as i32), + cost: metadata.prost_cost(), + created_at: Some(prost_wkt_types::Timestamp::from(metadata.created_at)), + }; + // Send the download progress. - download_progress_tx.send(metadata).await?; + download_progress_tx + .send(Ok(DownloadTaskResponse { + content_length, + piece: Some(piece.clone()), + })) + .await?; // Store the finished piece. finished_pieces.push(interested_piece.clone()); } - Ok(download_progress_rx) + Ok(finished_pieces) } // download_partial_from_source_into_file downloads a partial task from the source into a file. - #[instrument(skip(self, f, interested_pieces, url, header, timeout))] + #[instrument(skip_all)] #[allow(clippy::too_many_arguments)] async fn download_partial_from_source_into_file( &self, f: &mut fs::File, - interested_pieces: Vec, task_id: &str, - peer_id: &str, + interested_pieces: Vec, url: String, header: HeaderMap, + content_length: u64, timeout: Option, - ) -> Result> { - // Initialize the download progress channel. - let (download_progress_tx, download_progress_rx) = mpsc::channel(128); - + download_progress_tx: Sender>, + ) -> ClientResult> { // Initialize the finished pieces. let mut finished_pieces: Vec = Vec::new(); @@ -599,7 +763,6 @@ impl Task { .piece .download_from_source( task_id, - peer_id, interested_piece.number, url.as_str(), interested_piece.offset, @@ -620,16 +783,37 @@ impl Task { .write_into_file_and_verify(&mut reader, f, metadata.digest.as_str()) .await?; + info!("finished piece {} from source", metadata.number); + + // Construct the piece. + let piece = Piece { + number: metadata.number, + parent_id: None, + offset: metadata.offset, + length: metadata.length, + digest: metadata.digest.clone(), + content: None, + traffic_type: Some(TrafficType::LocalPeer as i32), + cost: metadata.prost_cost(), + created_at: Some(prost_wkt_types::Timestamp::from(metadata.created_at)), + }; + // Send the download progress. - download_progress_tx.send(metadata).await?; + download_progress_tx + .send(Ok(DownloadTaskResponse { + content_length, + piece: Some(piece.clone()), + })) + .await?; // Store the finished piece. - finished_pieces.push(interested_piece.clone()); + finished_pieces.push(metadata.clone()); } // Check if all pieces are downloaded. if finished_pieces.len() == interested_pieces.len() { - return Ok(download_progress_rx); + info!("all pieces are downloaded from source"); + return Ok(finished_pieces); } // If not all pieces are downloaded, return an error. @@ -639,14 +823,14 @@ impl Task { } // get_content_length gets the content length of the task. - #[instrument(skip(self, url, header, timeout))] + #[instrument(skip_all)] pub async fn get_content_length( &self, task_id: &str, url: &str, header: HeaderMap, timeout: Option, - ) -> Result { + ) -> ClientResult { let task = self .storage .get_task(task_id)? diff --git a/src/task/piece.rs b/src/task/piece.rs index c03c8c1b..e2c68a59 100644 --- a/src/task/piece.rs +++ b/src/task/piece.rs @@ -117,11 +117,9 @@ impl Piece { } // calculate_interested calculates the interested pieces by content_length and range. - #[instrument(skip(self, piece_length, content_length, range))] + #[instrument(skip(self))] pub fn calculate_interested( &self, - task_id: &str, - peer_id: &str, piece_length: u64, content_length: u64, range: Option, @@ -146,7 +144,7 @@ impl Piece { // If offset is greater than content_length, break the loop. if offset >= content_length { let mut piece = pieces.pop().ok_or(Error::InvalidParameter())?; - piece.length = piece_length + content_length - piece.offset; + piece.length = piece_length + content_length - offset; pieces.push(piece); break; } @@ -156,7 +154,6 @@ impl Piece { break; } - offset = (number + 1) * piece_length; if offset > range.start { pieces.push(metadata::Piece { number: number as u32, @@ -170,6 +167,7 @@ impl Piece { }); } + offset = (number + 1) * piece_length; number += 1; } @@ -188,12 +186,11 @@ impl Piece { // If offset is greater than content_length, break the loop. if offset >= content_length { let mut piece = pieces.pop().ok_or(Error::InvalidParameter())?; - piece.length = piece_length + content_length - piece.offset; + piece.length = piece_length + content_length - offset; pieces.push(piece); break; } - offset = (number + 1) * piece_length; pieces.push(metadata::Piece { number: number as u32, offset, @@ -205,6 +202,7 @@ impl Piece { finished_at: None, }); + offset = (number + 1) * piece_length; number += 1; } @@ -234,11 +232,10 @@ impl Piece { } // collect_interested_from_remote_peer collects the interested pieces from remote peers. - #[instrument(skip(self, interested_pieces, candidate_parents))] + #[instrument(skip_all)] pub async fn collect_interested_from_remote_peer( &self, task_id: &str, - peer_id: &str, interested_pieces: Vec, candidate_parents: Vec, ) -> Vec { @@ -309,22 +306,20 @@ impl Piece { } // download_from_local_peer downloads a single piece from a local peer. - #[instrument(skip(self))] + #[instrument(skip_all, fields(number))] pub async fn download_from_local_peer( &self, task_id: &str, - peer_id: Option<&str>, number: u32, ) -> Result { self.storage.upload_piece(task_id, number).await } // download_from_remote_peer downloads a single piece from a remote peer. - #[instrument(skip(self, remote_peer))] + #[instrument(skip_all, fields(number))] pub async fn download_from_remote_peer( &self, task_id: &str, - peer_id: &str, number: u32, remote_peer: Peer, ) -> Result { @@ -408,12 +403,11 @@ impl Piece { } // download_from_source downloads a single piece from the source. - #[instrument(skip(self, url, offset, length, header, timeout))] + #[instrument(skip_all, fields(number))] #[allow(clippy::too_many_arguments)] pub async fn download_from_source( &self, task_id: &str, - peer_id: &str, number: u32, url: &str, offset: u64,