From d500a0f6c04c033d90d23cdfcccb366cabefb933 Mon Sep 17 00:00:00 2001 From: Gaius Date: Wed, 5 Jun 2024 14:19:17 +0800 Subject: [PATCH] feat: add error log for backend (#522) Signed-off-by: Gaius --- Cargo.lock | 3 ++ dragonfly-client-backend/src/http.rs | 38 +++++++++++++++-------- dragonfly-client-backend/src/lib.rs | 6 ++++ dragonfly-client-core/Cargo.toml | 3 ++ dragonfly-client-core/src/error/errors.rs | 2 -- dragonfly-client-core/src/error/mod.rs | 16 ++++++++++ dragonfly-client-storage/src/lib.rs | 4 +++ dragonfly-client-util/src/http/mod.rs | 10 +++--- dragonfly-client/src/proxy/mod.rs | 27 ++++------------ dragonfly-client/src/task/mod.rs | 7 +++-- dragonfly-client/src/task/piece.rs | 1 + 11 files changed, 73 insertions(+), 44 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5e6477d6..dd894669 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1070,10 +1070,13 @@ dependencies = [ name = "dragonfly-client-core" version = "0.1.75" dependencies = [ + "hyper 1.2.0", + "hyper-util", "libloading", "reqwest", "thiserror", "tokio", + "tokio-stream", "tonic", ] diff --git a/dragonfly-client-backend/src/http.rs b/dragonfly-client-backend/src/http.rs index 011d01e1..cbd6c27b 100644 --- a/dragonfly-client-backend/src/http.rs +++ b/dragonfly-client-backend/src/http.rs @@ -14,10 +14,7 @@ * limitations under the License. */ -use dragonfly_client_core::{ - error::{ErrorType, OrErr}, - Error, Result, -}; +use dragonfly_client_core::{Error, Result}; use dragonfly_client_util::tls::NoVerifier; use futures::TryStreamExt; use rustls_pki_types::CertificateDer; @@ -59,8 +56,7 @@ impl HTTP { let client = reqwest::Client::builder() .use_preconfigured_tls(client_config_builder) - .build() - .or_err(ErrorType::HTTPError)?; + .build()?; Ok(client) } } @@ -70,7 +66,10 @@ impl HTTP { impl crate::Backend for HTTP { // head gets the header of the request. async fn head(&self, request: crate::HeadRequest) -> Result { - info!("get request: {} {:?}", request.url, request.http_header); + info!( + "get request {} {}: {:?}", + request.task_id, request.url, request.http_header + ); // The header of the request is required. let header = request.http_header.ok_or(Error::InvalidParameter)?; @@ -85,15 +84,20 @@ impl crate::Backend for HTTP { .timeout(request.timeout) .send() .await - .or_err(ErrorType::HTTPError) .map_err(|err| { - error!("head request failed: {}", err); + error!( + "head request failed {} {}: {}", + request.task_id, request.url, err + ); err })?; let header = response.headers().clone(); let status_code = response.status(); - info!("head response: {:?} {:?}", status_code, header); + info!( + "head response {} {}: {:?} {:?}", + request.task_id, request.url, status_code, header + ); Ok(crate::HeadResponse { success: status_code.is_success(), content_length: response.content_length(), @@ -105,7 +109,10 @@ impl crate::Backend for HTTP { // get gets the content of the request. async fn get(&self, request: crate::GetRequest) -> Result> { - info!("get request: {} {:?}", request.url, request.http_header); + info!( + "get request {} {}: {:?}", + request.piece_id, request.url, request.http_header + ); // The header of the request is required. let header = request.http_header.ok_or(Error::InvalidParameter)?; let response = self @@ -115,9 +122,11 @@ impl crate::Backend for HTTP { .timeout(request.timeout) .send() .await - .or_err(ErrorType::HTTPError) .map_err(|err| { - error!("get request failed: {}", err); + error!( + "get request failed {} {}: {}", + request.piece_id, request.url, err + ); err })?; @@ -167,6 +176,7 @@ mod tests { let http_backend = HTTP::new(); let resp = http_backend .head(HeadRequest { + task_id: "test".to_string(), url: server.url("/head"), http_header: Some(HeaderMap::new()), timeout: std::time::Duration::from_secs(5), @@ -191,6 +201,7 @@ mod tests { let http_backend = HTTP::new(); let resp = http_backend .head(HeadRequest { + task_id: "test".to_string(), url: server.url("/head"), http_header: None, timeout: std::time::Duration::from_secs(5), @@ -214,6 +225,7 @@ mod tests { let http_backend = HTTP::new(); let mut resp = http_backend .get(GetRequest { + piece_id: "test".to_string(), url: server.url("/get"), range: None, http_header: Some(HeaderMap::new()), diff --git a/dragonfly-client-backend/src/lib.rs b/dragonfly-client-backend/src/lib.rs index fa270586..673ec230 100644 --- a/dragonfly-client-backend/src/lib.rs +++ b/dragonfly-client-backend/src/lib.rs @@ -39,6 +39,9 @@ pub type Body = Box; // HeadRequest is the head request for backend. pub struct HeadRequest { + // task_id is the id of the task. + pub task_id: String, + // url is the url of the request. pub url: String, @@ -72,6 +75,9 @@ pub struct HeadResponse { // GetRequest is the get request for backend. pub struct GetRequest { + // piece_id is the id of the piece. + pub piece_id: String, + // url is the url of the request. pub url: String, diff --git a/dragonfly-client-core/Cargo.toml b/dragonfly-client-core/Cargo.toml index 59093411..186a0c9e 100644 --- a/dragonfly-client-core/Cargo.toml +++ b/dragonfly-client-core/Cargo.toml @@ -14,4 +14,7 @@ reqwest.workspace = true thiserror.workspace = true tonic.workspace = true tokio.workspace = true +tokio-stream.workspace = true +hyper.workspace = true +hyper-util.workspace = true libloading = "0.8.3" diff --git a/dragonfly-client-core/src/error/errors.rs b/dragonfly-client-core/src/error/errors.rs index 31793f95..34fd2d55 100644 --- a/dragonfly-client-core/src/error/errors.rs +++ b/dragonfly-client-core/src/error/errors.rs @@ -22,7 +22,6 @@ use super::message::Message; #[derive(Debug, PartialEq, Eq, Clone)] pub enum ErrorType { StorageError, - HTTPError, ConfigError, SerializeError, ValidationError, @@ -41,7 +40,6 @@ impl ErrorType { pub fn as_str(&self) -> &'static str { match self { ErrorType::StorageError => "StorageError", - ErrorType::HTTPError => "HTTPError", ErrorType::ConfigError => "ConfigError", ErrorType::ValidationError => "ValidationError", ErrorType::ParseError => "ParseError", diff --git a/dragonfly-client-core/src/error/mod.rs b/dragonfly-client-core/src/error/mod.rs index bc8501ca..3ed9deba 100644 --- a/dragonfly-client-core/src/error/mod.rs +++ b/dragonfly-client-core/src/error/mod.rs @@ -137,10 +137,26 @@ pub enum DFError { #[error(transparent)] TonicStatus(#[from] tonic::Status), + // TonicStreamElapsed is the error for tonic stream elapsed. + #[error(transparent)] + TokioStreamElapsed(#[from] tokio_stream::Elapsed), + + // ReqwestError is the error for reqwest. + #[error(transparent)] + ReqwesError(#[from] reqwest::Error), + + // HyperError is the error for hyper. + #[error(transparent)] + HyperError(#[from] hyper::Error), + // BackendError is the error for backend. #[error(transparent)] BackendError(BackendError), + // HyperUtilClientLegacyError is the error for hyper util client legacy. + #[error(transparent)] + HyperUtilClientLegacyError(#[from] hyper_util::client::legacy::Error), + // ExternalError is the error for external error. #[error(transparent)] ExternalError(#[from] ExternalError), diff --git a/dragonfly-client-storage/src/lib.rs b/dragonfly-client-storage/src/lib.rs index 10b904a7..91d5b989 100644 --- a/dragonfly-client-storage/src/lib.rs +++ b/dragonfly-client-storage/src/lib.rs @@ -23,6 +23,7 @@ use std::path::Path; use std::sync::Arc; use std::time::Duration; use tokio::io::AsyncRead; +use tracing::info; pub mod content; pub mod metadata; @@ -298,8 +299,11 @@ impl Storage { // If the piece is finished, return. if piece.is_finished() { + info!("wait for piece finished success {}", self.piece_id(task_id, number)); return Ok(piece); } + + info!("wait for piece finished {}", self.piece_id(task_id, number)); } _ = &mut piece_timeout => { return Err(Error::WaitForPieceFinishedTimeout(self.piece_id(task_id, number))); diff --git a/dragonfly-client-util/src/http/mod.rs b/dragonfly-client-util/src/http/mod.rs index 340800dd..741fb8eb 100644 --- a/dragonfly-client-util/src/http/mod.rs +++ b/dragonfly-client-util/src/http/mod.rs @@ -41,7 +41,7 @@ pub fn reqwest_headermap_to_hashmap(header: &HeaderMap) -> HashMap< pub fn hashmap_to_reqwest_headermap( header: &HashMap, ) -> Result> { - let header: HeaderMap = (header).try_into().or_err(ErrorType::HTTPError)?; + let header: HeaderMap = (header).try_into().or_err(ErrorType::ParseError)?; Ok(header) } @@ -49,7 +49,7 @@ pub fn hashmap_to_reqwest_headermap( pub fn hashmap_to_hyper_header_map( header: &HashMap, ) -> Result { - let header: hyper::header::HeaderMap = (header).try_into().or_err(ErrorType::HTTPError)?; + let header: hyper::header::HeaderMap = (header).try_into().or_err(ErrorType::ParseError)?; Ok(header) } @@ -108,7 +108,7 @@ pub fn header_vec_to_hashmap(raw_header: Vec) -> Result Result> { match header.get(reqwest::header::RANGE) { Some(range) => { - let range = range.to_str().or_err(ErrorType::HTTPError)?; + let range = range.to_str().or_err(ErrorType::ParseError)?; Ok(Some(parse_range_header(range, content_length)?)) } None => Ok(None), @@ -120,10 +120,10 @@ pub fn get_range(header: &HeaderMap, content_length: u64) -> Result Result { let parsed_ranges = - http_range_header::parse_range_header(range_header_value).or_err(ErrorType::HTTPError)?; + http_range_header::parse_range_header(range_header_value).or_err(ErrorType::ParseError)?; let valid_ranges = parsed_ranges .validate(content_length) - .or_err(ErrorType::HTTPError)?; + .or_err(ErrorType::ParseError)?; // Not support multiple ranges. let valid_range = valid_ranges diff --git a/dragonfly-client/src/proxy/mod.rs b/dragonfly-client/src/proxy/mod.rs index f42f89c2..e8eabc6b 100644 --- a/dragonfly-client/src/proxy/mod.rs +++ b/dragonfly-client/src/proxy/mod.rs @@ -27,7 +27,7 @@ use dragonfly_api::dfdaemon::v2::{ }; use dragonfly_api::errordetails::v2::Backend; use dragonfly_client_config::dfdaemon::{Config, Rule}; -use dragonfly_client_core::error::{ErrorType, ExternalError, OrErr}; +use dragonfly_client_core::error::{ErrorType, OrErr}; use dragonfly_client_core::{Error as ClientError, Result as ClientResult}; use dragonfly_client_util::{ http::{ @@ -684,8 +684,7 @@ async fn proxy_http(request: Request) -> ClientResult) -> ClientResult(https); - let response = client.request(request).await.or_err(ErrorType::HTTPError)?; - Ok(response.map(|b| { - b.map_err(|e| { - ClientError::from(ExternalError::new(ErrorType::HTTPError).with_cause(Box::new(e))) - }) - .boxed() - })) + let response = client.request(request).await?; + Ok(response.map(|b| b.map_err(ClientError::from).boxed())) } // make_registry_mirror_request makes a registry mirror request by the request. diff --git a/dragonfly-client/src/task/mod.rs b/dragonfly-client/src/task/mod.rs index 7f271f2f..9da212c4 100644 --- a/dragonfly-client/src/task/mod.rs +++ b/dragonfly-client/src/task/mod.rs @@ -134,6 +134,7 @@ impl Task { let backend = self.backend_factory.build(download.url.as_str())?; let response = backend .head(HeadRequest { + task_id: id.to_string(), url: download.url, http_header: Some(request_header), timeout: self.config.download.piece_timeout, @@ -460,7 +461,7 @@ impl Task { .timeout(self.config.scheduler.schedule_timeout); tokio::pin!(out_stream); - while let Some(message) = out_stream.try_next().await.or_err(ErrorType::HTTPError)? { + while let Some(message) = out_stream.try_next().await? { // Check if the schedule count is exceeded. schedule_count += 1; if schedule_count >= self.config.scheduler.max_schedule_count { @@ -1017,7 +1018,7 @@ impl Task { // Convert the header. let request_header: HeaderMap = (&download.request_header) .try_into() - .or_err(ErrorType::HTTPError)?; + .or_err(ErrorType::ParseError)?; // Initialize the finished pieces. let mut finished_pieces: Vec = Vec::new(); @@ -1308,7 +1309,7 @@ impl Task { // Convert the header. let request_header: HeaderMap = (&download.request_header) .try_into() - .or_err(ErrorType::HTTPError)?; + .or_err(ErrorType::ParseError)?; // Initialize the finished pieces. let mut finished_pieces: Vec = Vec::new(); diff --git a/dragonfly-client/src/task/piece.rs b/dragonfly-client/src/task/piece.rs index c96dda92..abec4f99 100644 --- a/dragonfly-client/src/task/piece.rs +++ b/dragonfly-client/src/task/piece.rs @@ -405,6 +405,7 @@ impl Piece { let backend = self.backend_factory.build(url)?; let mut response = backend .get(GetRequest { + piece_id: self.storage.piece_id(task_id, number), url: url.to_string(), range: Some(Range { start: offset,