From af1e5c27842f709d73f11dd29db42c3a4b6200cd Mon Sep 17 00:00:00 2001
From: Gaius
Date: Mon, 20 Nov 2023 21:42:24 +0800
Subject: [PATCH] feat: add indicatif to dfget and change length to uint32 (#119)

Signed-off-by: Gaius
---
 Cargo.toml                |  3 +-
 src/bin/dfget/main.rs     | 27 +++++++++++++--
 src/grpc/dfdaemon.rs      | 60 +++++++++++++++++++++++++++++++--
 src/storage/metadata.rs   | 24 ++++++-------
 src/storage/mod.rs        | 18 +++++-----
 src/task/mod.rs           | 71 ++++++++++++---------------------------
 src/task/piece.rs         | 49 +++++++++++----------------
 src/utils/id_generator.rs | 12 +++----
 8 files changed, 151 insertions(+), 113 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index d336cb9a..786dc7aa 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -55,7 +55,7 @@ local-ip-address = "0.5.3"
 rocksdb = "0.21.0"
 num_cpus = "1.0"
 chrono = { version = "0.4.26", features = ["serde"] }
-dragonfly-api = "2.0.53"
+dragonfly-api = "2.0.56"
 sysinfo = "0.29.6"
 sha2 = "0.10"
 hex = "0.4"
@@ -68,3 +68,4 @@ http = "0.2"
 rand = "0.8.5"
 prost-wkt-types = "0.4"
 tower = "0.4.13"
+indicatif = "0.17.7"
diff --git a/src/bin/dfget/main.rs b/src/bin/dfget/main.rs
index d54985d4..be04e0b4 100644
--- a/src/bin/dfget/main.rs
+++ b/src/bin/dfget/main.rs
@@ -23,9 +23,11 @@ use dragonfly_client::config::dfget;
 use dragonfly_client::grpc::dfdaemon::DfdaemonClient;
 use dragonfly_client::tracing::init_tracing;
 use dragonfly_client::Error;
+use indicatif::{ProgressBar, ProgressState, ProgressStyle};
 use std::collections::HashMap;
 use std::path::PathBuf;
 use std::time::Duration;
+use std::{cmp::min, fmt::Write};
 use tracing::Level;
 use url::Url;
 
@@ -69,7 +71,7 @@ struct Args {
         default_value_t = 4194304,
         help = "Set the byte length of the piece"
     )]
-    piece_length: i32,
+    piece_length: u64,
 
     #[arg(
         long = "download-rate-limit",
@@ -178,13 +180,32 @@ async fn main() -> Result<(), anyhow::Error> {
         })
         .await?;
 
-    // TODO: Support to print progress.
+    // Initialize progress bar.
+    let pb = ProgressBar::new(0);
+    pb.set_style(
+        ProgressStyle::with_template(
+            "[{elapsed_precise}] [{wide_bar}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})",
+        )
+        .unwrap()
+        .with_key("eta", |state: &ProgressState, w: &mut dyn Write| {
+            write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap()
+        })
+        .progress_chars("#>-"),
+    );
+
+    // Download file.
+    let mut downloaded = 0;
     let mut out_stream = response.into_inner();
     while let Some(message) = out_stream.message().await? {
         let piece = message.piece.ok_or(Error::InvalidParameter())?;
-        println!("{:?}", piece)
+        pb.set_length(message.content_length);
+
+        downloaded += piece.length;
+        let position = min(downloaded, message.content_length);
+        pb.set_position(position);
     }
+    pb.finish_with_message("downloaded");
 
     Ok(())
 }
diff --git a/src/grpc/dfdaemon.rs b/src/grpc/dfdaemon.rs
index 2c330a6a..373adddb 100644
--- a/src/grpc/dfdaemon.rs
+++ b/src/grpc/dfdaemon.rs
@@ -16,6 +16,7 @@
 
 use crate::shutdown;
 use crate::task;
+use crate::utils::http::hashmap_to_headermap;
 use crate::Result as ClientResult;
 use dragonfly_api::common::v2::{Piece, Task};
 use dragonfly_api::dfdaemon::v2::{
@@ -154,7 +155,7 @@ impl Dfdaemon for DfdaemonServerHandler {
         let task = self.task.clone();
 
         // Get the piece numbers from the local storage.
-        let piece_numbers: Vec<i32> = task
+        let piece_numbers: Vec<u32> = task
             .piece
             .get_all(task_id.as_str())
             .map_err(|e| {
@@ -298,14 +299,67 @@ impl Dfdaemon for DfdaemonServerHandler {
         // Clone the task.
         let task = self.task.clone();
 
+        // Generate the task id.
+ let task_id = task + .id_generator + .task_id( + download.url.as_str(), + download.digest.as_deref(), + download.tag.as_deref(), + download.application.as_deref(), + download.piece_length, + download.filters.clone(), + ) + .map_err(|e| { + error!("generate task id: {}", e); + Status::invalid_argument(e.to_string()) + })?; + + // Generate the host id. + let host_id = task.id_generator.host_id(); + + // Generate the peer id. + let peer_id = task.id_generator.peer_id(); + + // Convert the header. + let header = hashmap_to_headermap(&download.header).map_err(|e| { + error!("convert header: {}", e); + Status::invalid_argument(e.to_string()) + })?; + + // Get the content length. + let content_length = task + .get_content_length( + task_id.as_str(), + download.url.as_str(), + header.clone(), + None, + ) + .await + .map_err(|e| { + error!("get content length: {}", e); + Status::internal(e.to_string()) + })?; + // Initialize stream channel. let (out_stream_tx, out_stream_rx) = mpsc::channel(128); tokio::spawn(async move { - match task.download_into_file(download).await { + match task + .download_into_file( + task_id.as_str(), + host_id.as_str(), + peer_id.as_str(), + content_length, + header.clone(), + download.clone(), + ) + .await + { Ok(mut download_progress_rx) => { while let Some(finished_piece) = download_progress_rx.recv().await { out_stream_tx .send(Ok(DownloadTaskResponse { + content_length, piece: Some(Piece { number: finished_piece.number, parent_id: None, @@ -419,7 +473,7 @@ impl DfdaemonClient { pub async fn get_piece_numbers( &self, request: GetPieceNumbersRequest, - ) -> ClientResult> { + ) -> ClientResult> { let mut request = tonic::Request::new(request); request.set_timeout(super::REQUEST_TIMEOUT); diff --git a/src/storage/metadata.rs b/src/storage/metadata.rs index b90c310c..fd86af19 100644 --- a/src/storage/metadata.rs +++ b/src/storage/metadata.rs @@ -51,13 +51,13 @@ pub struct Task { pub id: String, // piece_length is the length of the piece. - pub piece_length: i32, + pub piece_length: u64, // uploaded_count is the count of the task uploaded by other peers. pub uploaded_count: u64, // content_length is the length of the task. - pub content_length: Option, + pub content_length: Option, // updated_at is the time when the task metadata is updated. If the task is downloaded // by other peers, it will also update updated_at. @@ -87,7 +87,7 @@ impl Task { #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct Piece { // number is the piece number. - pub number: i32, + pub number: u32, // offset is the offset of the piece in the task. pub offset: u64, @@ -203,7 +203,7 @@ impl Metadata { } // download_task_started updates the metadata of the task when the task downloads started. - pub fn download_task_started(&self, id: &str, piece_length: i32) -> Result<()> { + pub fn download_task_started(&self, id: &str, piece_length: u64) -> Result<()> { let task = match self.get_task(id)? { // If the task exists, update the updated_at. Some(mut task) => { @@ -224,7 +224,7 @@ impl Metadata { } // set_task_content_length sets the content length of the task. - pub fn set_task_content_length(&self, id: &str, content_length: i64) -> Result<()> { + pub fn set_task_content_length(&self, id: &str, content_length: u64) -> Result<()> { if let Some(mut task) = self.get_task(id)? { task.content_length = Some(content_length); return self.put_task(id, &task); @@ -278,7 +278,7 @@ impl Metadata { } // download_piece_started updates the metadata of the piece when the piece downloads started. 
- pub fn download_piece_started(&self, task_id: &str, number: i32) -> Result<()> { + pub fn download_piece_started(&self, task_id: &str, number: u32) -> Result<()> { self.put_piece( task_id, &Piece { @@ -294,7 +294,7 @@ impl Metadata { pub fn download_piece_finished( &self, task_id: &str, - number: i32, + number: u32, offset: u64, length: u64, digest: &str, @@ -313,7 +313,7 @@ impl Metadata { } // download_piece_failed updates the metadata of the piece when the piece downloads failed. - pub fn download_piece_failed(&self, task_id: &str, number: i32) -> Result<()> { + pub fn download_piece_failed(&self, task_id: &str, number: u32) -> Result<()> { match self.get_piece(task_id, number)? { Some(_piece) => self.delete_piece(task_id, number), None => Err(Error::PieceNotFound(self.piece_id(task_id, number))), @@ -321,7 +321,7 @@ impl Metadata { } // upload_piece_finished updates the metadata of the piece when piece uploads finished. - pub fn upload_piece_finished(&self, task_id: &str, number: i32) -> Result<()> { + pub fn upload_piece_finished(&self, task_id: &str, number: u32) -> Result<()> { match self.get_piece(task_id, number)? { Some(mut piece) => { piece.uploaded_count += 1; @@ -333,7 +333,7 @@ impl Metadata { } // get_piece gets the piece metadata. - pub fn get_piece(&self, task_id: &str, number: i32) -> Result> { + pub fn get_piece(&self, task_id: &str, number: u32) -> Result> { let id = self.piece_id(task_id, number); let handle = self.cf_handle(PIECE_CF_NAME)?; match self.db.get_cf(handle, id.as_bytes())? { @@ -368,7 +368,7 @@ impl Metadata { } // delete_piece deletes the piece metadata. - fn delete_piece(&self, task_id: &str, number: i32) -> Result<()> { + fn delete_piece(&self, task_id: &str, number: u32) -> Result<()> { let id = self.piece_id(task_id, number); let handle = self.cf_handle(PIECE_CF_NAME)?; self.db.delete_cf(handle, id.as_bytes())?; @@ -376,7 +376,7 @@ impl Metadata { } // piece_id returns the piece id. - pub fn piece_id(&self, task_id: &str, number: i32) -> String { + pub fn piece_id(&self, task_id: &str, number: u32) -> String { format!("{}-{}", task_id, number) } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index e2ae7288..8cfb5574 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -41,12 +41,12 @@ impl Storage { } // download_task_started updates the metadata of the task when the task downloads started. - pub fn download_task_started(&self, id: &str, piece_length: i32) -> Result<()> { + pub fn download_task_started(&self, id: &str, piece_length: u64) -> Result<()> { self.metadata.download_task_started(id, piece_length) } // set_task_content_length sets the content length of the task. - pub fn set_task_content_length(&self, id: &str, content_length: i64) -> Result<()> { + pub fn set_task_content_length(&self, id: &str, content_length: u64) -> Result<()> { self.metadata.set_task_content_length(id, content_length) } @@ -68,7 +68,7 @@ impl Storage { // download_piece_started updates the metadata of the piece and writes // the data of piece to file when the piece downloads started. 
- pub fn download_piece_started(&self, task_id: &str, number: i32) -> Result<()> { + pub fn download_piece_started(&self, task_id: &str, number: u32) -> Result<()> { self.metadata.download_piece_started(task_id, number) } @@ -76,7 +76,7 @@ impl Storage { pub async fn download_piece_from_source_finished( &self, task_id: &str, - number: i32, + number: u32, offset: u64, length: u64, reader: &mut R, @@ -98,7 +98,7 @@ impl Storage { pub async fn download_piece_from_remote_peer_finished( &self, task_id: &str, - number: i32, + number: u32, offset: u64, expected_digest: &str, reader: &mut R, @@ -123,13 +123,13 @@ impl Storage { } // download_piece_failed updates the metadata of the piece when the piece downloads failed. - pub fn download_piece_failed(&self, task_id: &str, number: i32) -> Result<()> { + pub fn download_piece_failed(&self, task_id: &str, number: u32) -> Result<()> { self.metadata.download_piece_failed(task_id, number) } // upload_piece updates the metadata of the piece and // returns the data of the piece. - pub async fn upload_piece(&self, task_id: &str, number: i32) -> Result { + pub async fn upload_piece(&self, task_id: &str, number: u32) -> Result { match self.metadata.get_piece(task_id, number)? { Some(piece) => { let reader = self @@ -144,7 +144,7 @@ impl Storage { } // get_piece returns the piece metadata. - pub fn get_piece(&self, task_id: &str, number: i32) -> Result> { + pub fn get_piece(&self, task_id: &str, number: u32) -> Result> { let piece = self.metadata.get_piece(task_id, number)?; Ok(piece) } @@ -155,7 +155,7 @@ impl Storage { } // piece_id returns the piece id. - pub fn piece_id(&self, task_id: &str, number: i32) -> String { + pub fn piece_id(&self, task_id: &str, number: u32) -> String { self.metadata.piece_id(task_id, number) } } diff --git a/src/task/mod.rs b/src/task/mod.rs index 62612deb..4a90d9d7 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -17,7 +17,7 @@ use crate::backend::http::{Request as HTTPRequest, HTTP}; use crate::grpc::scheduler::SchedulerClient; use crate::storage::{metadata, Storage}; -use crate::utils::http::{hashmap_to_headermap, headermap_to_hashmap}; +use crate::utils::http::headermap_to_hashmap; use crate::utils::id_generator::IDGenerator; use crate::{Error, Result}; use dragonfly_api::common::v2::{Download, Piece, TrafficType}; @@ -44,7 +44,7 @@ pub mod piece; // Task represents a task manager. pub struct Task { // id_generator is the id generator. - id_generator: Arc, + pub id_generator: Arc, // manager_client is the grpc client of the manager. storage: Arc, @@ -92,33 +92,16 @@ impl Task { // download_into_file downloads a task into a file. pub async fn download_into_file( &self, + task_id: &str, + host_id: &str, + peer_id: &str, + content_length: u64, + header: HeaderMap, download: Download, ) -> Result> { // Initialize the download progress channel. let (download_progress_tx, download_progress_rx) = mpsc::channel(128); - // Generate the host id. - let host_id = self.id_generator.host_id(); - - // Generate the task id. - let task_id = self.id_generator.task_id( - download.url.clone(), - download.digest.clone(), - download.tag.clone(), - download.application.clone(), - download.piece_length, - download.filters.clone(), - )?; - - // Generate the peer id. - let peer_id = self.id_generator.peer_id(); - - // Get the output path. - let output_path = download.output_path.clone(); - - // Convert the header. - let header = hashmap_to_headermap(&download.header)?; - // Convert the timeout. 
let timeout: Option = match download.timeout.clone() { Some(timeout) => { @@ -130,17 +113,7 @@ impl Task { // Open the file. let mut f = OpenOptions::new() .write(true) - .open(output_path.as_str()) - .await?; - - // Get the content length of the task. - let content_length = self - .get_content_length( - task_id.as_str(), - download.url.as_str(), - header.clone(), - timeout, - ) + .open(download.output_path.as_str()) .await?; // Calculate the interested pieces to download. @@ -151,7 +124,7 @@ impl Task { )?; // Get the task from the local storage. - let task = self.get(task_id.as_str())?; + let task = self.get(task_id)?; match task { Some(task) => { // If the task is finished, return the file. @@ -160,7 +133,7 @@ impl Task { return self .download_partial_from_local_peer_into_file( &mut f, - task_id.as_str(), + task_id, interested_pieces, ) .await; @@ -171,7 +144,7 @@ impl Task { while let Some(finished_piece) = self .download_partial_from_local_peer_into_file( &mut f, - task_id.as_str(), + task_id, interested_pieces.clone(), ) .await? @@ -199,9 +172,9 @@ impl Task { match self .download_partial_with_scheduler_into_file( &mut f, - task_id.as_str(), - host_id.as_str(), - peer_id.as_str(), + task_id, + host_id, + peer_id, interested_pieces.clone(), download.clone(), ) @@ -215,7 +188,7 @@ impl Task { self.download_partial_from_source_into_file( &mut f, interested_pieces, - task_id.as_str(), + task_id, download.url.clone(), header.clone(), timeout, @@ -229,9 +202,9 @@ impl Task { match self .download_partial_with_scheduler_into_file( &mut f, - task_id.as_str(), - host_id.as_str(), - peer_id.as_str(), + task_id, + host_id, + peer_id, interested_pieces.clone(), download.clone(), ) @@ -245,7 +218,7 @@ impl Task { self.download_partial_from_source_into_file( &mut f, interested_pieces, - task_id.as_str(), + task_id, download.url.clone(), header.clone(), timeout, @@ -653,13 +626,13 @@ impl Task { } // get_content_length gets the content length of the task. - async fn get_content_length( + pub async fn get_content_length( &self, task_id: &str, url: &str, header: HeaderMap, timeout: Option, - ) -> Result { + ) -> Result { let task = self .storage .get_task(task_id)? @@ -686,7 +659,7 @@ impl Task { .ok_or(Error::InvalidContentLength())? .to_str() .map_err(|_| Error::InvalidContentLength())? - .parse::() + .parse::() .map_err(|_| Error::InvalidContentLength())?; // Set the content length of the task. diff --git a/src/task/piece.rs b/src/task/piece.rs index 17679b84..7e1510c1 100644 --- a/src/task/piece.rs +++ b/src/task/piece.rs @@ -40,7 +40,7 @@ use tracing::error; // CollectPiece represents a piece to collect. pub struct CollectPiece { // number is the piece number. - pub number: i32, + pub number: u32, // parent is the parent peer. pub parent: Peer, @@ -74,7 +74,7 @@ impl Piece { } // get gets a piece from the local storage. - pub fn get(&self, task_id: &str, number: i32) -> Result> { + pub fn get(&self, task_id: &str, number: u32) -> Result> { self.storage.get_piece(task_id, number) } @@ -116,20 +116,10 @@ impl Piece { // calculate_interested calculates the interested pieces by content_length and range. pub fn calculate_interested( &self, - piece_length: i32, - content_length: i64, + piece_length: u64, + content_length: u64, range: Option, ) -> Result> { - // piece_length must be greater than 0. - if piece_length <= 0 { - return Err(Error::InvalidParameter()); - } - - // content_length must be greater than 0. 
- if content_length < 0 { - return Err(Error::InvalidContentLength()); - } - // If content_length is 0, return empty piece. if content_length == 0 { return Ok(Vec::new()); @@ -137,7 +127,7 @@ impl Piece { // If range is not None, calculate the pieces by range. if let Some(range) = range { - if range.start < 0 || range.length <= 0 { + if range.length == 0 { return Err(Error::InvalidParameter()); } @@ -148,8 +138,7 @@ impl Piece { // If offset is greater than content_length, break the loop. if offset >= content_length { let mut piece = pieces.pop().ok_or(Error::InvalidParameter())?; - piece.length = - (piece_length + content_length as i32 - piece.offset as i32) as u64; + piece.length = piece_length + content_length - piece.offset; pieces.push(piece); break; } @@ -159,12 +148,12 @@ impl Piece { break; } - offset = i64::from((number + 1) * piece_length); + offset = (number + 1) * piece_length; if offset > range.start { pieces.push(metadata::Piece { - number, - offset: offset as u64, - length: piece_length as u64, + number: number as u32, + offset, + length: piece_length, digest: "".to_string(), uploaded_count: 0, updated_at: Utc::now().naive_utc(), @@ -187,16 +176,16 @@ impl Piece { // If offset is greater than content_length, break the loop. if offset >= content_length { let mut piece = pieces.pop().ok_or(Error::InvalidParameter())?; - piece.length = (piece_length + content_length as i32 - piece.offset as i32) as u64; + piece.length = piece_length + content_length - piece.offset; pieces.push(piece); break; } - offset = i64::from((number + 1) * piece_length); + offset = (number + 1) * piece_length; pieces.push(metadata::Piece { - number, - offset: offset as u64, - length: piece_length as u64, + number: number as u32, + offset, + length: piece_length, digest: "".to_string(), uploaded_count: 0, updated_at: Utc::now().naive_utc(), @@ -283,7 +272,7 @@ impl Piece { collect_pieces.shuffle(&mut rand::thread_rng()); // Filter the collect pieces and remove the duplicate pieces. - let mut visited: Vec = Vec::new(); + let mut visited: Vec = Vec::new(); collect_pieces.retain(|collect_piece| { interested_pieces .iter() @@ -304,7 +293,7 @@ impl Piece { pub async fn download_from_local_peer( &self, task_id: &str, - number: i32, + number: u32, ) -> Result { self.storage.upload_piece(task_id, number).await } @@ -313,7 +302,7 @@ impl Piece { pub async fn download_from_remote_peer( &self, task_id: &str, - number: i32, + number: u32, remote_peer: Peer, ) -> Result { // Create a dfdaemon client. @@ -400,7 +389,7 @@ impl Piece { pub async fn download_from_source( &self, task_id: &str, - number: i32, + number: u32, url: &str, offset: u64, length: u64, diff --git a/src/utils/id_generator.rs b/src/utils/id_generator.rs index fae1ffa7..440e2e2f 100644 --- a/src/utils/id_generator.rs +++ b/src/utils/id_generator.rs @@ -43,15 +43,15 @@ impl IDGenerator { // task_id generates the task id. pub fn task_id( &self, - url: String, - digest: Option, - tag: Option, - application: Option, - piece_length: i32, + url: &str, + digest: Option<&str>, + tag: Option<&str>, + application: Option<&str>, + piece_length: u64, filters: Vec, ) -> Result { // Filter the query parameters. - let url = Url::parse(url.as_str())?; + let url = Url::parse(url)?; let query = url .query_pairs() .filter(|(k, _)| filters.contains(&k.to_string()));
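
The dfget changes above wire indicatif's ProgressBar into the gRPC download stream: the bar length is taken from content_length on each response message and the position advances by piece.length. A minimal standalone sketch of the same wiring, with the stream replaced by a simulated loop and made-up byte counts (assuming indicatif 0.17, as pinned in Cargo.toml), might look like this:

    use std::{cmp::min, fmt::Write, thread, time::Duration};

    use indicatif::{ProgressBar, ProgressState, ProgressStyle};

    fn main() {
        // The real total is only known once the first response message
        // arrives, so the bar starts at 0 and the length is set later.
        let pb = ProgressBar::new(0);
        pb.set_style(
            ProgressStyle::with_template(
                "[{elapsed_precise}] [{wide_bar}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})",
            )
            .unwrap()
            .with_key("eta", |state: &ProgressState, w: &mut dyn Write| {
                write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap()
            })
            .progress_chars("#>-"),
        );

        // Simulated pieces standing in for the gRPC download stream.
        let content_length: u64 = 10 * 1024 * 1024;
        let piece_length: u64 = 4 * 1024 * 1024;
        pb.set_length(content_length);

        let mut downloaded: u64 = 0;
        while downloaded < content_length {
            thread::sleep(Duration::from_millis(200));
            downloaded = min(downloaded + piece_length, content_length);
            pb.set_position(downloaded);
        }

        pb.finish_with_message("downloaded");
    }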
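
With calculate_interested now doing the piece math entirely in u64, the old checks for negative piece_length and content_length disappear and only the zero cases need handling. A rough, self-contained sketch of the underlying arithmetic is below; split_pieces is a hypothetical helper for illustration, not the crate's own function, and it ignores the range and digest bookkeeping handled in the patch:

    // Split `content_length` bytes into (number, offset, length) pieces of at
    // most `piece_length` bytes, using only unsigned arithmetic.
    fn split_pieces(piece_length: u64, content_length: u64) -> Vec<(u32, u64, u64)> {
        assert!(piece_length > 0, "piece_length must be non-zero");

        let mut pieces = Vec::new();
        let mut offset: u64 = 0;
        let mut number: u32 = 0;
        while offset < content_length {
            // The final piece is truncated to the remaining bytes.
            let length = piece_length.min(content_length - offset);
            pieces.push((number, offset, length));
            offset += length;
            number += 1;
        }
        pieces
    }

    fn main() {
        // 10 MiB content with 4 MiB pieces -> 4 MiB, 4 MiB, 2 MiB.
        for (number, offset, length) in split_pieces(4 * 1024 * 1024, 10 * 1024 * 1024) {
            println!("piece {}: offset={} length={}", number, offset, length);
        }
    }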
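
get_content_length now returns u64, parsed from the Content-Length response header, with missing or non-numeric values mapped to Error::InvalidContentLength(). A minimal sketch of just that parsing step with the http crate (0.2, as pinned in Cargo.toml); content_length_from_header is a hypothetical helper and the crate's error type is replaced by a plain String for brevity:

    use http::header::{HeaderMap, HeaderValue, CONTENT_LENGTH};

    // Parse the Content-Length header into u64. Errors are collapsed into a
    // String here; the patch maps them to Error::InvalidContentLength() instead.
    fn content_length_from_header(header: &HeaderMap) -> Result<u64, String> {
        header
            .get(CONTENT_LENGTH)
            .ok_or_else(|| "missing Content-Length".to_string())?
            .to_str()
            .map_err(|_| "invalid Content-Length".to_string())?
            .parse::<u64>()
            .map_err(|_| "invalid Content-Length".to_string())
    }

    fn main() {
        let mut header = HeaderMap::new();
        header.insert(CONTENT_LENGTH, HeaderValue::from_static("4194304"));
        assert_eq!(content_length_from_header(&header), Ok(4_194_304));
    }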