feat: add error log for backend (#522)
Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
parent
ccf60ca125
commit
d500a0f6c0
|
|
@ -1070,10 +1070,13 @@ dependencies = [
|
|||
name = "dragonfly-client-core"
|
||||
version = "0.1.75"
|
||||
dependencies = [
|
||||
"hyper 1.2.0",
|
||||
"hyper-util",
|
||||
"libloading",
|
||||
"reqwest",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tonic",
|
||||
]
|
||||
|
||||
|
|
|
|||
|
|
@ -14,10 +14,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
use dragonfly_client_core::{
|
||||
error::{ErrorType, OrErr},
|
||||
Error, Result,
|
||||
};
|
||||
use dragonfly_client_core::{Error, Result};
|
||||
use dragonfly_client_util::tls::NoVerifier;
|
||||
use futures::TryStreamExt;
|
||||
use rustls_pki_types::CertificateDer;
|
||||
|
|
@ -59,8 +56,7 @@ impl HTTP {
|
|||
|
||||
let client = reqwest::Client::builder()
|
||||
.use_preconfigured_tls(client_config_builder)
|
||||
.build()
|
||||
.or_err(ErrorType::HTTPError)?;
|
||||
.build()?;
|
||||
Ok(client)
|
||||
}
|
||||
}
|
||||
|
|
@ -70,7 +66,10 @@ impl HTTP {
|
|||
impl crate::Backend for HTTP {
|
||||
// head gets the header of the request.
|
||||
async fn head(&self, request: crate::HeadRequest) -> Result<crate::HeadResponse> {
|
||||
info!("get request: {} {:?}", request.url, request.http_header);
|
||||
info!(
|
||||
"get request {} {}: {:?}",
|
||||
request.task_id, request.url, request.http_header
|
||||
);
|
||||
// The header of the request is required.
|
||||
let header = request.http_header.ok_or(Error::InvalidParameter)?;
|
||||
|
||||
|
|
@ -85,15 +84,20 @@ impl crate::Backend for HTTP {
|
|||
.timeout(request.timeout)
|
||||
.send()
|
||||
.await
|
||||
.or_err(ErrorType::HTTPError)
|
||||
.map_err(|err| {
|
||||
error!("head request failed: {}", err);
|
||||
error!(
|
||||
"head request failed {} {}: {}",
|
||||
request.task_id, request.url, err
|
||||
);
|
||||
err
|
||||
})?;
|
||||
|
||||
let header = response.headers().clone();
|
||||
let status_code = response.status();
|
||||
info!("head response: {:?} {:?}", status_code, header);
|
||||
info!(
|
||||
"head response {} {}: {:?} {:?}",
|
||||
request.task_id, request.url, status_code, header
|
||||
);
|
||||
Ok(crate::HeadResponse {
|
||||
success: status_code.is_success(),
|
||||
content_length: response.content_length(),
|
||||
|
|
@ -105,7 +109,10 @@ impl crate::Backend for HTTP {
|
|||
|
||||
// get gets the content of the request.
|
||||
async fn get(&self, request: crate::GetRequest) -> Result<crate::GetResponse<crate::Body>> {
|
||||
info!("get request: {} {:?}", request.url, request.http_header);
|
||||
info!(
|
||||
"get request {} {}: {:?}",
|
||||
request.piece_id, request.url, request.http_header
|
||||
);
|
||||
// The header of the request is required.
|
||||
let header = request.http_header.ok_or(Error::InvalidParameter)?;
|
||||
let response = self
|
||||
|
|
@ -115,9 +122,11 @@ impl crate::Backend for HTTP {
|
|||
.timeout(request.timeout)
|
||||
.send()
|
||||
.await
|
||||
.or_err(ErrorType::HTTPError)
|
||||
.map_err(|err| {
|
||||
error!("get request failed: {}", err);
|
||||
error!(
|
||||
"get request failed {} {}: {}",
|
||||
request.piece_id, request.url, err
|
||||
);
|
||||
err
|
||||
})?;
|
||||
|
||||
|
|
@ -167,6 +176,7 @@ mod tests {
|
|||
let http_backend = HTTP::new();
|
||||
let resp = http_backend
|
||||
.head(HeadRequest {
|
||||
task_id: "test".to_string(),
|
||||
url: server.url("/head"),
|
||||
http_header: Some(HeaderMap::new()),
|
||||
timeout: std::time::Duration::from_secs(5),
|
||||
|
|
@ -191,6 +201,7 @@ mod tests {
|
|||
let http_backend = HTTP::new();
|
||||
let resp = http_backend
|
||||
.head(HeadRequest {
|
||||
task_id: "test".to_string(),
|
||||
url: server.url("/head"),
|
||||
http_header: None,
|
||||
timeout: std::time::Duration::from_secs(5),
|
||||
|
|
@ -214,6 +225,7 @@ mod tests {
|
|||
let http_backend = HTTP::new();
|
||||
let mut resp = http_backend
|
||||
.get(GetRequest {
|
||||
piece_id: "test".to_string(),
|
||||
url: server.url("/get"),
|
||||
range: None,
|
||||
http_header: Some(HeaderMap::new()),
|
||||
|
|
|
|||
|
|
@ -39,6 +39,9 @@ pub type Body = Box<dyn AsyncRead + Send + Unpin>;
|
|||
|
||||
// HeadRequest is the head request for backend.
|
||||
pub struct HeadRequest {
|
||||
// task_id is the id of the task.
|
||||
pub task_id: String,
|
||||
|
||||
// url is the url of the request.
|
||||
pub url: String,
|
||||
|
||||
|
|
@ -72,6 +75,9 @@ pub struct HeadResponse {
|
|||
|
||||
// GetRequest is the get request for backend.
|
||||
pub struct GetRequest {
|
||||
// piece_id is the id of the piece.
|
||||
pub piece_id: String,
|
||||
|
||||
// url is the url of the request.
|
||||
pub url: String,
|
||||
|
||||
|
|
|
|||
|
|
@ -14,4 +14,7 @@ reqwest.workspace = true
|
|||
thiserror.workspace = true
|
||||
tonic.workspace = true
|
||||
tokio.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
hyper.workspace = true
|
||||
hyper-util.workspace = true
|
||||
libloading = "0.8.3"
|
||||
|
|
|
|||
|
|
@ -22,7 +22,6 @@ use super::message::Message;
|
|||
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||
pub enum ErrorType {
|
||||
StorageError,
|
||||
HTTPError,
|
||||
ConfigError,
|
||||
SerializeError,
|
||||
ValidationError,
|
||||
|
|
@ -41,7 +40,6 @@ impl ErrorType {
|
|||
pub fn as_str(&self) -> &'static str {
|
||||
match self {
|
||||
ErrorType::StorageError => "StorageError",
|
||||
ErrorType::HTTPError => "HTTPError",
|
||||
ErrorType::ConfigError => "ConfigError",
|
||||
ErrorType::ValidationError => "ValidationError",
|
||||
ErrorType::ParseError => "ParseError",
|
||||
|
|
|
|||
|
|
@ -137,10 +137,26 @@ pub enum DFError {
|
|||
#[error(transparent)]
|
||||
TonicStatus(#[from] tonic::Status),
|
||||
|
||||
// TonicStreamElapsed is the error for tonic stream elapsed.
|
||||
#[error(transparent)]
|
||||
TokioStreamElapsed(#[from] tokio_stream::Elapsed),
|
||||
|
||||
// ReqwestError is the error for reqwest.
|
||||
#[error(transparent)]
|
||||
ReqwesError(#[from] reqwest::Error),
|
||||
|
||||
// HyperError is the error for hyper.
|
||||
#[error(transparent)]
|
||||
HyperError(#[from] hyper::Error),
|
||||
|
||||
// BackendError is the error for backend.
|
||||
#[error(transparent)]
|
||||
BackendError(BackendError),
|
||||
|
||||
// HyperUtilClientLegacyError is the error for hyper util client legacy.
|
||||
#[error(transparent)]
|
||||
HyperUtilClientLegacyError(#[from] hyper_util::client::legacy::Error),
|
||||
|
||||
// ExternalError is the error for external error.
|
||||
#[error(transparent)]
|
||||
ExternalError(#[from] ExternalError),
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ use std::path::Path;
|
|||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::io::AsyncRead;
|
||||
use tracing::info;
|
||||
|
||||
pub mod content;
|
||||
pub mod metadata;
|
||||
|
|
@ -298,8 +299,11 @@ impl Storage {
|
|||
|
||||
// If the piece is finished, return.
|
||||
if piece.is_finished() {
|
||||
info!("wait for piece finished success {}", self.piece_id(task_id, number));
|
||||
return Ok(piece);
|
||||
}
|
||||
|
||||
info!("wait for piece finished {}", self.piece_id(task_id, number));
|
||||
}
|
||||
_ = &mut piece_timeout => {
|
||||
return Err(Error::WaitForPieceFinishedTimeout(self.piece_id(task_id, number)));
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@ pub fn reqwest_headermap_to_hashmap(header: &HeaderMap<HeaderValue>) -> HashMap<
|
|||
pub fn hashmap_to_reqwest_headermap(
|
||||
header: &HashMap<String, String>,
|
||||
) -> Result<HeaderMap<HeaderValue>> {
|
||||
let header: HeaderMap = (header).try_into().or_err(ErrorType::HTTPError)?;
|
||||
let header: HeaderMap = (header).try_into().or_err(ErrorType::ParseError)?;
|
||||
Ok(header)
|
||||
}
|
||||
|
||||
|
|
@ -49,7 +49,7 @@ pub fn hashmap_to_reqwest_headermap(
|
|||
pub fn hashmap_to_hyper_header_map(
|
||||
header: &HashMap<String, String>,
|
||||
) -> Result<hyper::header::HeaderMap> {
|
||||
let header: hyper::header::HeaderMap = (header).try_into().or_err(ErrorType::HTTPError)?;
|
||||
let header: hyper::header::HeaderMap = (header).try_into().or_err(ErrorType::ParseError)?;
|
||||
Ok(header)
|
||||
}
|
||||
|
||||
|
|
@ -108,7 +108,7 @@ pub fn header_vec_to_hashmap(raw_header: Vec<String>) -> Result<HashMap<String,
|
|||
pub fn get_range(header: &HeaderMap, content_length: u64) -> Result<Option<Range>> {
|
||||
match header.get(reqwest::header::RANGE) {
|
||||
Some(range) => {
|
||||
let range = range.to_str().or_err(ErrorType::HTTPError)?;
|
||||
let range = range.to_str().or_err(ErrorType::ParseError)?;
|
||||
Ok(Some(parse_range_header(range, content_length)?))
|
||||
}
|
||||
None => Ok(None),
|
||||
|
|
@ -120,10 +120,10 @@ pub fn get_range(header: &HeaderMap, content_length: u64) -> Result<Option<Range
|
|||
// "Range": "bytes=150-", "Range": "bytes=0-0,-1".
|
||||
pub fn parse_range_header(range_header_value: &str, content_length: u64) -> Result<Range> {
|
||||
let parsed_ranges =
|
||||
http_range_header::parse_range_header(range_header_value).or_err(ErrorType::HTTPError)?;
|
||||
http_range_header::parse_range_header(range_header_value).or_err(ErrorType::ParseError)?;
|
||||
let valid_ranges = parsed_ranges
|
||||
.validate(content_length)
|
||||
.or_err(ErrorType::HTTPError)?;
|
||||
.or_err(ErrorType::ParseError)?;
|
||||
|
||||
// Not support multiple ranges.
|
||||
let valid_range = valid_ranges
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ use dragonfly_api::dfdaemon::v2::{
|
|||
};
|
||||
use dragonfly_api::errordetails::v2::Backend;
|
||||
use dragonfly_client_config::dfdaemon::{Config, Rule};
|
||||
use dragonfly_client_core::error::{ErrorType, ExternalError, OrErr};
|
||||
use dragonfly_client_core::error::{ErrorType, OrErr};
|
||||
use dragonfly_client_core::{Error as ClientError, Result as ClientResult};
|
||||
use dragonfly_client_util::{
|
||||
http::{
|
||||
|
|
@ -684,8 +684,7 @@ async fn proxy_http(request: Request<hyper::body::Incoming>) -> ClientResult<Res
|
|||
.preserve_header_case(true)
|
||||
.title_case_headers(true)
|
||||
.handshake(io)
|
||||
.await
|
||||
.or_err(ErrorType::HTTPError)?;
|
||||
.await?;
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
if let Err(err) = conn.await {
|
||||
|
|
@ -693,17 +692,8 @@ async fn proxy_http(request: Request<hyper::body::Incoming>) -> ClientResult<Res
|
|||
}
|
||||
});
|
||||
|
||||
let response = client
|
||||
.send_request(request)
|
||||
.await
|
||||
.or_err(ErrorType::HTTPError)?;
|
||||
//Ok(response.map(|b| b.map_err(ClientError::from).boxed()))
|
||||
Ok(response.map(|b| {
|
||||
b.map_err(|e| {
|
||||
ClientError::from(ExternalError::new(ErrorType::HTTPError).with_cause(Box::new(e)))
|
||||
})
|
||||
.boxed()
|
||||
}))
|
||||
let response = client.send_request(request).await?;
|
||||
Ok(response.map(|b| b.map_err(ClientError::from).boxed()))
|
||||
}
|
||||
|
||||
// proxy_https proxies the HTTPS request directly to the remote server.
|
||||
|
|
@ -737,13 +727,8 @@ async fn proxy_https(
|
|||
.build();
|
||||
|
||||
let client = Client::builder(TokioExecutor::new()).build::<_, hyper::body::Incoming>(https);
|
||||
let response = client.request(request).await.or_err(ErrorType::HTTPError)?;
|
||||
Ok(response.map(|b| {
|
||||
b.map_err(|e| {
|
||||
ClientError::from(ExternalError::new(ErrorType::HTTPError).with_cause(Box::new(e)))
|
||||
})
|
||||
.boxed()
|
||||
}))
|
||||
let response = client.request(request).await?;
|
||||
Ok(response.map(|b| b.map_err(ClientError::from).boxed()))
|
||||
}
|
||||
|
||||
// make_registry_mirror_request makes a registry mirror request by the request.
|
||||
|
|
|
|||
|
|
@ -134,6 +134,7 @@ impl Task {
|
|||
let backend = self.backend_factory.build(download.url.as_str())?;
|
||||
let response = backend
|
||||
.head(HeadRequest {
|
||||
task_id: id.to_string(),
|
||||
url: download.url,
|
||||
http_header: Some(request_header),
|
||||
timeout: self.config.download.piece_timeout,
|
||||
|
|
@ -460,7 +461,7 @@ impl Task {
|
|||
.timeout(self.config.scheduler.schedule_timeout);
|
||||
tokio::pin!(out_stream);
|
||||
|
||||
while let Some(message) = out_stream.try_next().await.or_err(ErrorType::HTTPError)? {
|
||||
while let Some(message) = out_stream.try_next().await? {
|
||||
// Check if the schedule count is exceeded.
|
||||
schedule_count += 1;
|
||||
if schedule_count >= self.config.scheduler.max_schedule_count {
|
||||
|
|
@ -1017,7 +1018,7 @@ impl Task {
|
|||
// Convert the header.
|
||||
let request_header: HeaderMap = (&download.request_header)
|
||||
.try_into()
|
||||
.or_err(ErrorType::HTTPError)?;
|
||||
.or_err(ErrorType::ParseError)?;
|
||||
|
||||
// Initialize the finished pieces.
|
||||
let mut finished_pieces: Vec<metadata::Piece> = Vec::new();
|
||||
|
|
@ -1308,7 +1309,7 @@ impl Task {
|
|||
// Convert the header.
|
||||
let request_header: HeaderMap = (&download.request_header)
|
||||
.try_into()
|
||||
.or_err(ErrorType::HTTPError)?;
|
||||
.or_err(ErrorType::ParseError)?;
|
||||
|
||||
// Initialize the finished pieces.
|
||||
let mut finished_pieces: Vec<metadata::Piece> = Vec::new();
|
||||
|
|
|
|||
|
|
@ -405,6 +405,7 @@ impl Piece {
|
|||
let backend = self.backend_factory.build(url)?;
|
||||
let mut response = backend
|
||||
.get(GetRequest {
|
||||
piece_id: self.storage.piece_id(task_id, number),
|
||||
url: url.to_string(),
|
||||
range: Some(Range {
|
||||
start: offset,
|
||||
|
|
|
|||
Loading…
Reference in New Issue