From 0e4bcbe0466fd08bcb51fe6d3cd1910d4fa3500b Mon Sep 17 00:00:00 2001 From: Gaius Date: Wed, 28 Feb 2024 14:04:31 +0800 Subject: [PATCH] feat: if download back-to-source failed, proxy returns http code and header (#279) Signed-off-by: Gaius --- Cargo.lock | 23 +++- Cargo.toml | 5 +- src/bin/dfget/main.rs | 2 + src/grpc/dfdaemon_download.rs | 38 +++++- src/lib.rs | 4 + src/proxy/mod.rs | 234 +++++++++++++++++++--------------- src/task/mod.rs | 8 +- 7 files changed, 197 insertions(+), 117 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ec96c886..f3e5d77a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -586,9 +586,9 @@ dependencies = [ [[package]] name = "dragonfly-api" -version = "2.0.95" +version = "2.0.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12be8059402e09ac11a1bed5640074b22c283279f7800b110060fae2384dba7e" +checksum = "bfdd1f0811fdcc14ae36c565f894b405334e3810de68d062521c0b27be69b7dc" dependencies = [ "prost 0.11.9", "prost-types 0.12.3", @@ -627,6 +627,7 @@ dependencies = [ "humantime-serde", "hyper 1.1.0", "hyper-rustls", + "hyper-tls 0.6.0", "hyper-util", "indicatif", "lazy_static", @@ -1240,6 +1241,22 @@ dependencies = [ "tokio-native-tls", ] +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper 1.1.0", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.2" @@ -2340,7 +2357,7 @@ dependencies = [ "http 0.2.11", "http-body 0.4.6", "hyper 0.14.28", - "hyper-tls", + "hyper-tls 0.5.0", "ipnet", "js-sys", "log", diff --git a/Cargo.toml b/Cargo.toml index dd721f82..30480c6b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,7 +55,7 @@ hostname = "^0.3" local-ip-address = "0.5.3" rocksdb = "0.22.0" num_cpus = "1.0" -dragonfly-api = "2.0.95" +dragonfly-api = "2.0.100" chrono = { version = "0.4.34", features = ["serde"] } sysinfo = "0.29.6" sha2 = "0.10" @@ -79,7 +79,8 @@ openssl = { version = "0.10", features = ["vendored"] } humantime-serde = "1.1.1" leaky-bucket = "1.0.1" hyper = { version = "1.1", features = ["full"] } -hyper-util = { version = "0.1.2", features = ["tokio", "server-auto"] } +hyper-util = { version = "0.1.2", features = ["client-legacy", "tokio", "server-auto", "http1"] } +hyper-tls = "0.6.0" tokio-rustls = "0.25" hyper-rustls = "0.26" http-body-util = "0.1.0" diff --git a/src/bin/dfget/main.rs b/src/bin/dfget/main.rs index f02b37fb..018069d5 100644 --- a/src/bin/dfget/main.rs +++ b/src/bin/dfget/main.rs @@ -247,6 +247,8 @@ async fn main() -> Result<(), anyhow::Error> { output_path: Some(args.output.into_os_string().into_string().unwrap()), timeout: Some(prost_wkt_types::Duration::try_from(args.timeout)?), need_back_to_source: false, + certificate: None, + tls_verify: false, }), }) .await diff --git a/src/grpc/dfdaemon_download.rs b/src/grpc/dfdaemon_download.rs index 4d0fa18b..f6b6a81a 100644 --- a/src/grpc/dfdaemon_download.rs +++ b/src/grpc/dfdaemon_download.rs @@ -16,8 +16,8 @@ use crate::shutdown; use crate::task; -use crate::utils::http::{get_range, hashmap_to_reqwest_headermap}; -use crate::Result as ClientResult; +use crate::utils::http::{get_range, hashmap_to_reqwest_headermap, reqwest_headermap_to_hashmap}; +use crate::{Error as ClientError, Result as ClientResult}; use dragonfly_api::common::v2::Task; use dragonfly_api::dfdaemon::v2::{ dfdaemon_download_client::DfdaemonDownloadClient as DfdaemonDownloadGRPCClient, @@ -27,6 +27,7 @@ use dragonfly_api::dfdaemon::v2::{ DeleteTaskRequest, DownloadTaskRequest, DownloadTaskResponse, StatTaskRequest as DfdaemonStatTaskRequest, UploadTaskRequest, }; +use dragonfly_api::errordetails::v2::Http; use dragonfly_api::scheduler::v2::{ LeaveHostRequest as SchedulerLeaveHostRequest, StatTaskRequest as SchedulerStatTaskRequest, }; @@ -38,6 +39,7 @@ use tokio::fs; use tokio::net::{UnixListener, UnixStream}; use tokio::sync::mpsc; use tokio_stream::wrappers::{ReceiverStream, UnixListenerStream}; +use tonic::Code; use tonic::{ transport::{Channel, Endpoint, Server, Uri}, Request, Response, Status, @@ -193,7 +195,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler { // Download task started. info!("download task started: {:?}", download); - let task = self + let task = match self .task .download_started( task_id.as_str(), @@ -202,10 +204,32 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler { request_header.clone(), ) .await - .map_err(|e| { - error!("download task started: {}", e); - Status::internal(e.to_string()) - })?; + { + Err(ClientError::HTTP(err)) => { + error!("download started failed by HTTP error: {}", err); + match serde_json::to_vec::(&Http { + header: reqwest_headermap_to_hashmap(&err.header), + status_code: err.status_code.as_u16() as i32, + }) { + Ok(json) => { + return Err(Status::with_details( + Code::Internal, + err.to_string(), + json.into(), + )); + } + Err(e) => { + error!("serialize HTTP error: {}", e); + return Err(Status::internal(e.to_string())); + } + } + } + Err(err) => { + error!("download started failed: {}", err); + return Err(Status::internal(err.to_string())); + } + Ok(task) => task, + }; // Download's range priority is higher than the request header's range. // If download protocol is http, use the range of the request header. diff --git a/src/lib.rs b/src/lib.rs index f7cc3186..88f2ef59 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -89,6 +89,10 @@ pub enum Error { #[error(transparent)] Http(#[from] hyper::http::Error), + // HyperUtilClientLegacyError is the error for hyper util client legacy. + #[error(transparent)] + HyperUtilClientLegacyError(#[from] hyper_util::client::legacy::Error), + // Validation is the error for validation. #[error(transparent)] Validation(#[from] validator::ValidationErrors), diff --git a/src/proxy/mod.rs b/src/proxy/mod.rs index a0ac46d7..8998b15c 100644 --- a/src/proxy/mod.rs +++ b/src/proxy/mod.rs @@ -31,6 +31,7 @@ use dragonfly_api::common::v2::{Download, TaskType}; use dragonfly_api::dfdaemon::v2::{ download_task_response, DownloadTaskRequest, DownloadTaskStartedResponse, }; +use dragonfly_api::errordetails::v2::Http; use futures_util::TryStreamExt; use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full, StreamBody}; use hyper::body::Frame; @@ -39,7 +40,11 @@ use hyper::server::conn::http1; use hyper::service::service_fn; use hyper::upgrade::Upgraded; use hyper::{Method, Request}; -use hyper_util::rt::{tokio::TokioIo, TokioExecutor}; +use hyper_tls::HttpsConnector; +use hyper_util::{ + client::legacy::Client, + rt::{tokio::TokioIo, TokioExecutor}, +}; use rcgen::Certificate; use rustls::ServerConfig; use std::collections::HashMap; @@ -176,9 +181,6 @@ pub async fn handler( request: Request, ca_cert: Arc>, ) -> ClientResult { - info!("handle request: {:?}", request); - - // TODO: Handle the mirror request. // If host is not set, it is the mirror request. if request.uri().host().is_none() { // Handle CONNECT request. @@ -201,81 +203,69 @@ pub async fn handler( return http_handler(config, task, request).await; } -// registry_mirror_http_handler handles the http request for the registry mirror. +// registry_mirror_http_handler handles the http request for the registry mirror by client. #[instrument(skip_all)] pub async fn registry_mirror_http_handler( config: Arc, task: Arc, - mut request: Request, + request: Request, ) -> ClientResult { - let registry_mirror_uri = http::Uri::from_static(Box::leak( - config.proxy.registry_mirror.addr.clone().into_boxed_str(), - )); - - *request.uri_mut() = registry_mirror_uri.clone(); - request.headers_mut().insert( - hyper::header::HOST, - registry_mirror_uri - .host() - .ok_or_else(|| ClientError::Unknown("registry mirror host is not set".to_string()))? - .parse()?, - ); - + let request = make_registry_mirror_request(config.clone(), request)?; return http_handler(config, task, request).await; } -// registry_mirror_https_handler handles the https request for the registry mirror. +// registry_mirror_https_handler handles the https request for the registry mirror by client. #[instrument(skip_all)] pub async fn registry_mirror_https_handler( config: Arc, task: Arc, - mut request: Request, + request: Request, ca_cert: Arc>, ) -> ClientResult { - let registry_mirror_uri = http::Uri::from_static(Box::leak( - config.proxy.registry_mirror.addr.clone().into_boxed_str(), - )); - - *request.uri_mut() = registry_mirror_uri.clone(); - request.headers_mut().insert( - hyper::header::HOST, - registry_mirror_uri - .host() - .ok_or_else(|| ClientError::Unknown("registry mirror host is not set".to_string()))? - .parse()?, - ); - + let request = make_registry_mirror_request(config.clone(), request)?; return https_handler(config, task, request, ca_cert).await; } -// http_handler handles the http request. +// http_handler handles the http request by client. #[instrument(skip_all)] pub async fn http_handler( config: Arc, task: Arc, request: Request, ) -> ClientResult { + info!("handle HTTP request: {:?}", request); + + // If find the matching rule, proxy the request via the dfdaemon. let request_uri = request.uri(); if let Some(rule) = find_matching_rule(config.proxy.rules.clone(), request_uri.to_string().as_str()) { info!( - "proxy HTTP request via dfdaemon for Method: {}, URI: {}", + "proxy HTTP request via dfdaemon for method: {}, uri: {}", request.method(), request_uri ); return proxy_by_dfdaemon(config, task, rule.clone(), request).await; } + if request.uri().scheme().cloned() == Some(http::uri::Scheme::HTTPS) { + info!( + "proxy HTTPS request directly to remote server for method: {}, uri: {}", + request.method(), + request.uri() + ); + return proxy_https(request).await; + } + info!( - "proxy HTTP request directly to remote server for Method: {}, URI: {}", + "proxy HTTP request directly to remote server for method: {}, uri: {}", request.method(), - request_uri + request.uri() ); - proxy_http(request).await + return proxy_http(request).await; } -// https_handler handles the https request. +// https_handler handles the https request by client. #[instrument(skip_all)] pub async fn https_handler( config: Arc, @@ -283,6 +273,8 @@ pub async fn https_handler( request: Request, ca_cert: Arc>, ) -> ClientResult { + info!("handle HTTPS request: {:?}", request); + // Proxy the request directly to the remote server. if let Some(host) = request.uri().host() { let host = host.to_string(); @@ -302,6 +294,7 @@ pub async fn https_handler( return Ok(make_error_response( http::StatusCode::BAD_REQUEST, "CONNECT must be to a socket address", + None, )); } } @@ -363,27 +356,38 @@ pub async fn upgraded_handler( Span::current().record("uri", request.uri().to_string().as_str()); Span::current().record("method", request.method().as_str()); + // If find the matching rule, proxy the request via the dfdaemon. let request_uri = request.uri(); if let Some(rule) = find_matching_rule(config.proxy.rules.clone(), request_uri.to_string().as_str()) { info!( - "proxy HTTPS request via dfdaemon for Method: {}, URI: {}", + "proxy HTTPS request via dfdaemon for method: {}, uri: {}", request.method(), request_uri ); return proxy_by_dfdaemon(config, task, rule.clone(), request).await; } + if request.uri().scheme().cloned() == Some(http::uri::Scheme::HTTPS) { + info!( + "proxy HTTPS request directly to remote server for method: {}, uri: {}", + request.method(), + request.uri() + ); + return proxy_https(request).await; + } + info!( - "proxy HTTPS request directly to remote server for Method: {}, URI: {}", + "proxy HTTP request directly to remote server for method: {}, uri: {}", request.method(), - request_uri + request.uri() ); - proxy_https(request).await + return proxy_http(request).await; } // proxy_by_dfdaemon proxies the request via the dfdaemon. +#[instrument(skip_all)] async fn proxy_by_dfdaemon( config: Arc, task: Arc, @@ -399,6 +403,7 @@ async fn proxy_by_dfdaemon( return Ok(make_error_response( http::StatusCode::INTERNAL_SERVER_ERROR, err.to_string().as_str(), + None, )); } }; @@ -411,6 +416,7 @@ async fn proxy_by_dfdaemon( return Ok(make_error_response( http::StatusCode::INTERNAL_SERVER_ERROR, err.to_string().as_str(), + None, )); } }; @@ -421,13 +427,37 @@ async fn proxy_by_dfdaemon( .await { Ok(response) => response, - Err(err) => { - error!("initiate download task failed: {}", err); - return Ok(make_error_response( - http::StatusCode::INTERNAL_SERVER_ERROR, - err.to_string().as_str(), - )); - } + Err(err) => match err { + ClientError::TonicStatus(err) => { + match serde_json::from_slice::(err.details()) { + Ok(http) => { + error!("download task failed by HTTP error: {:?}", http); + return Ok(make_error_response( + http::StatusCode::from_u16(http.status_code as u16) + .unwrap_or(http::StatusCode::INTERNAL_SERVER_ERROR), + "download task failed", + Some(hashmap_to_hyper_header_map(&http.header)?), + )); + } + Err(err) => { + error!("download task failed by tonic status: {}", err.to_string()); + return Ok(make_error_response( + http::StatusCode::INTERNAL_SERVER_ERROR, + err.to_string().as_str(), + None, + )); + } + }; + } + _ => { + error!("download task failed: {}", err); + return Ok(make_error_response( + http::StatusCode::INTERNAL_SERVER_ERROR, + err.to_string().as_str(), + None, + )); + } + }, }; // Handle the response from the download grpc server. @@ -437,6 +467,7 @@ async fn proxy_by_dfdaemon( return Ok(make_error_response( http::StatusCode::INTERNAL_SERVER_ERROR, "response message failed", + None, )); }; @@ -449,6 +480,7 @@ async fn proxy_by_dfdaemon( return Ok(make_error_response( http::StatusCode::INTERNAL_SERVER_ERROR, "response is not started", + None, )); }; @@ -555,19 +587,21 @@ async fn proxy_by_dfdaemon( } // proxy_http proxies the HTTP request directly to the remote server. +#[instrument(skip_all)] async fn proxy_http(request: Request) -> ClientResult { let Some(host) = request.uri().host() else { error!("CONNECT host is not socket addr: {:?}", request.uri()); return Ok(make_error_response( http::StatusCode::BAD_REQUEST, "CONNECT must be to a socket address", + None, )); }; let port = request.uri().port_u16().unwrap_or(80); let stream = TcpStream::connect((host, port)).await?; let io = TokioIo::new(stream); - let (mut sender, conn) = Builder::new() + let (mut client, conn) = Builder::new() .preserve_header_case(true) .title_case_headers(true) .handshake(io) @@ -579,61 +613,44 @@ async fn proxy_http(request: Request) -> ClientResult) -> ClientResult { - let Some(host) = request.uri().host() else { - error!("CONNECT host is not socket addr: {:?}", request.uri()); - return Ok(make_error_response( - http::StatusCode::BAD_REQUEST, - "CONNECT must be to a socket address", - )); - }; - let port = request.uri().port_u16().unwrap_or(443); - - let stream = TcpStream::connect((host, port)).await?; - let io = TokioIo::new(stream); - let (mut sender, conn) = Builder::new() - .preserve_header_case(true) - .title_case_headers(true) - .handshake(io) - .await?; - - tokio::task::spawn(async move { - if let Err(err) = conn.await { - error!("connection failed: {:?}", err); - } - }); - - // Get the authority and path from the request. - let Some(authority) = request.uri().authority() else { - error!("CONNECT authority is not set: {:?}", request.uri()); - return Ok(make_error_response( - http::StatusCode::BAD_REQUEST, - "CONNECT authority is not set", - )); - }; - let path = request.uri().path(); - - // TODO: When body is not empty, the request will be blocked. - // Construct the new request. - let mut new_request = Request::builder() - .uri(path) - .header(hyper::header::HOST, authority.as_str()) - .body(Empty::::new())?; - - // Copy the headers from the original request to the new request. - for (name, value) in request.headers() { - new_request.headers_mut().insert(name, value.clone()); - } - - let response = sender.send_request(new_request).await?; + let https = HttpsConnector::new(); + let client = Client::builder(TokioExecutor::new()).build::<_, hyper::body::Incoming>(https); + let response = client.request(request).await?; Ok(response.map(|b| b.map_err(ClientError::from).boxed())) } +// make_registry_mirror_request makes a registry mirror request by the request. +#[instrument(skip_all)] +fn make_registry_mirror_request( + config: Arc, + mut request: Request, +) -> ClientResult> { + let registry_mirror_uri = format!( + "{}{}", + config.proxy.registry_mirror.addr, + request.uri().path() + ) + .parse::()?; + + *request.uri_mut() = registry_mirror_uri.clone(); + request.headers_mut().insert( + hyper::header::HOST, + registry_mirror_uri + .host() + .ok_or_else(|| ClientError::Unknown("registry mirror host is not set".to_string()))? + .parse()?, + ); + + Ok(request) +} + // make_download_task_requet makes a download task request by the request. #[instrument(skip_all)] fn make_download_task_request( @@ -641,7 +658,10 @@ fn make_download_task_request( rule: Rule, ) -> ClientResult { // Convert the Reqwest header to the Hyper header. - let reqwest_request_header = hyper_headermap_to_reqwest_headermap(request.headers()); + let mut reqwest_request_header = hyper_headermap_to_reqwest_headermap(request.headers()); + + // Registry will return the 403 status code if the Host header is set. + reqwest_request_header.remove(reqwest::header::HOST); // Construct the download url. Ok(DownloadTaskRequest { @@ -663,6 +683,8 @@ fn make_download_task_request( output_path: None, timeout: None, need_back_to_source: false, + certificate: None, + tls_verify: false, }), }) } @@ -726,9 +748,19 @@ fn find_matching_rule(rules: Option>, url: &str) -> Option { // make_error_response makes an error response with the given status and message. #[instrument(skip_all)] -fn make_error_response(status: http::StatusCode, message: &str) -> Response { +fn make_error_response( + status: http::StatusCode, + message: &str, + header: Option, +) -> Response { let mut response = Response::new(full(message.as_bytes().to_vec())); *response.status_mut() = status; + if let Some(header) = header { + for (k, v) in header.iter() { + response.headers_mut().insert(k, v.clone()); + } + } + response } diff --git a/src/task/mod.rs b/src/task/mod.rs index bf1ea57c..a9c6a51f 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -27,13 +27,14 @@ use dragonfly_api::dfdaemon::{ self, v2::{download_task_response, DownloadTaskResponse}, }; +use dragonfly_api::errordetails::v2::Http; use dragonfly_api::scheduler::v2::{ announce_peer_request, announce_peer_response, download_piece_back_to_source_failed_request, AnnouncePeerRequest, DownloadPeerBackToSourceFailedRequest, DownloadPeerBackToSourceFinishedRequest, DownloadPeerBackToSourceStartedRequest, DownloadPeerFailedRequest, DownloadPeerFinishedRequest, DownloadPeerStartedRequest, DownloadPieceBackToSourceFailedRequest, DownloadPieceBackToSourceFinishedRequest, - DownloadPieceFailedRequest, DownloadPieceFinishedRequest, HttpResponse, RegisterPeerRequest, + DownloadPieceFailedRequest, DownloadPieceFinishedRequest, RegisterPeerRequest, RescheduleRequest, }; use reqwest::header::HeaderMap; @@ -1066,11 +1067,10 @@ impl Task { request: Some(announce_peer_request::Request::DownloadPieceBackToSourceFailedRequest( DownloadPieceBackToSourceFailedRequest{ piece_number: None, - response: Some(download_piece_back_to_source_failed_request::Response::HttpResponse( - HttpResponse{ + response: Some(download_piece_back_to_source_failed_request::Response::Http( + Http{ header: reqwest_headermap_to_hashmap(&err.header), status_code: err.status_code.as_u16() as i32, - status: err.status_code.canonical_reason().unwrap_or("").to_string(), } )), }