From 9612b57a9710072dc49fdace7fc7f7ea43eefe42 Mon Sep 17 00:00:00 2001 From: Gaius Date: Fri, 26 Sep 2025 00:44:03 +0800 Subject: [PATCH] refactor(quic): QUIC downloader support (#1380) Signed-off-by: Gaius --- Cargo.lock | 18 +++--- Cargo.toml | 19 +++--- dragonfly-client-core/Cargo.toml | 2 +- dragonfly-client-core/src/error/mod.rs | 64 ++++++++++----------- dragonfly-client-storage/Cargo.toml | 2 +- dragonfly-client-storage/src/client/quic.rs | 26 +++++---- dragonfly-client-storage/src/server/quic.rs | 24 ++++++-- dragonfly-client/src/bin/dfget/main.rs | 6 +- dragonfly-client/src/resource/piece.rs | 4 +- 9 files changed, 91 insertions(+), 74 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2fe77d87..109443bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1018,7 +1018,7 @@ dependencies = [ [[package]] name = "dragonfly-client" -version = "1.0.24" +version = "1.0.25" dependencies = [ "anyhow", "bytes", @@ -1094,7 +1094,7 @@ dependencies = [ [[package]] name = "dragonfly-client-backend" -version = "1.0.24" +version = "1.0.25" dependencies = [ "dragonfly-api", "dragonfly-client-core", @@ -1125,7 +1125,7 @@ dependencies = [ [[package]] name = "dragonfly-client-config" -version = "1.0.24" +version = "1.0.25" dependencies = [ "bytesize", "bytesize-serde", @@ -1155,7 +1155,7 @@ dependencies = [ [[package]] name = "dragonfly-client-core" -version = "1.0.24" +version = "1.0.25" dependencies = [ "headers 0.4.1", "hyper 1.6.0", @@ -1175,7 +1175,7 @@ dependencies = [ [[package]] name = "dragonfly-client-init" -version = "1.0.24" +version = "1.0.25" dependencies = [ "anyhow", "clap", @@ -1192,7 +1192,7 @@ dependencies = [ [[package]] name = "dragonfly-client-metric" -version = "1.0.24" +version = "1.0.25" dependencies = [ "dragonfly-api", "dragonfly-client-config", @@ -1207,7 +1207,7 @@ dependencies = [ [[package]] name = "dragonfly-client-storage" -version = "1.0.24" +version = "1.0.25" dependencies = [ "bincode", "bytes", @@ -1241,7 +1241,7 @@ dependencies = [ [[package]] name = "dragonfly-client-util" -version = "1.0.24" +version = "1.0.25" dependencies = [ "base64 0.22.1", "bytes", @@ -1680,7 +1680,7 @@ dependencies = [ [[package]] name = "hdfs" -version = "1.0.24" +version = "1.0.25" dependencies = [ "dragonfly-client-backend", "dragonfly-client-core", diff --git a/Cargo.toml b/Cargo.toml index 0ad55764..f4473b41 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ members = [ ] [workspace.package] -version = "1.0.24" +version = "1.0.25" authors = ["The Dragonfly Developers"] homepage = "https://d7y.io/" repository = "https://github.com/dragonflyoss/client.git" @@ -23,14 +23,14 @@ readme = "README.md" edition = "2021" [workspace.dependencies] -dragonfly-client = { path = "dragonfly-client", version = "1.0.24" } -dragonfly-client-core = { path = "dragonfly-client-core", version = "1.0.24" } -dragonfly-client-config = { path = "dragonfly-client-config", version = "1.0.24" } -dragonfly-client-storage = { path = "dragonfly-client-storage", version = "1.0.24" } -dragonfly-client-backend = { path = "dragonfly-client-backend", version = "1.0.24" } -dragonfly-client-metric = { path = "dragonfly-client-metric", version = "1.0.24" } -dragonfly-client-util = { path = "dragonfly-client-util", version = "1.0.24" } -dragonfly-client-init = { path = "dragonfly-client-init", version = "1.0.24" } +dragonfly-client = { path = "dragonfly-client", version = "1.0.25" } +dragonfly-client-core = { path = "dragonfly-client-core", version = "1.0.25" } +dragonfly-client-config = { path = "dragonfly-client-config", version = "1.0.25" } +dragonfly-client-storage = { path = "dragonfly-client-storage", version = "1.0.25" } +dragonfly-client-backend = { path = "dragonfly-client-backend", version = "1.0.25" } +dragonfly-client-metric = { path = "dragonfly-client-metric", version = "1.0.25" } +dragonfly-client-util = { path = "dragonfly-client-util", version = "1.0.25" } +dragonfly-client-init = { path = "dragonfly-client-init", version = "1.0.25" } dragonfly-api = "=2.1.70" thiserror = "2.0" futures = "0.3.31" @@ -117,7 +117,6 @@ hostname = "^0.4" tonic-health = "0.12.3" hashring = "0.3.6" reqwest-tracing = "0.5" -quinn = "0.11.9" mocktail = "0.3.0" [profile.release] diff --git a/dragonfly-client-core/Cargo.toml b/dragonfly-client-core/Cargo.toml index 3a8bb8ae..597cf5f0 100644 --- a/dragonfly-client-core/Cargo.toml +++ b/dragonfly-client-core/Cargo.toml @@ -23,4 +23,4 @@ opendal.workspace = true url.workspace = true headers.workspace = true vortex-protocol.workspace = true -quinn.workspace = true +quinn = "0.11.9" diff --git a/dragonfly-client-core/src/error/mod.rs b/dragonfly-client-core/src/error/mod.rs index cd8a1847..cdba6327 100644 --- a/dragonfly-client-core/src/error/mod.rs +++ b/dragonfly-client-core/src/error/mod.rs @@ -170,38 +170,6 @@ pub enum DFError { #[error(transparent)] VortexProtocolError(#[from] vortex_protocol::error::Error), - /// QuinnConnectError is the error for quinn connect. - #[error(transparent)] - QuinnConnectError(#[from] quinn::ConnectError), - - /// QuinnConnectionError is the error for quinn connection. - #[error(transparent)] - QuinnConnectionError(#[from] quinn::ConnectionError), - - /// QuinnWriteError is the error for quinn write. - #[error(transparent)] - QuinnWriteError(#[from] quinn::WriteError), - - /// QuinnReadError is the error for quinn read. - #[error(transparent)] - QuinnReadError(#[from] quinn::ReadError), - - /// QuinnReadExactError is the error for quinn read exact. - #[error(transparent)] - QuinnReadExactError(#[from] quinn::ReadExactError), - - /// QuinnRustlsNoInitialCipherSuite is the error for quinn no initial cipher suite. - #[error(transparent)] - QuinnRustlsNoInitialCipherSuite(#[from] quinn::crypto::rustls::NoInitialCipherSuite), - - /// QuinnRustlsError is the error for rustls. - #[error(transparent)] - QuinnRustlsError(#[from] quinn::rustls::Error), - - /// QuinnClosedStream is the error for quinn closed stream. - #[error(transparent)] - QuinnClosedStream(#[from] quinn::ClosedStream), - /// TonicStatus is the error for tonic status. #[error(transparent)] TonicStatus(#[from] tonic::Status), @@ -282,6 +250,38 @@ impl From> for DFError { } } +// --- Conversions for quinn (QUIC) errors --- +// These allow using the ? operator directly with quinn APIs without repetitive map_err. +impl From for DFError { + fn from(err: quinn::ConnectError) -> Self { + DFError::Unknown(format!("quinn connect error: {}", err)) + } +} + +impl From for DFError { + fn from(err: quinn::ConnectionError) -> Self { + DFError::Unknown(format!("quinn connection error: {}", err)) + } +} + +impl From for DFError { + fn from(err: quinn::WriteError) -> Self { + DFError::Unknown(format!("quinn write error: {}", err)) + } +} + +impl From for DFError { + fn from(err: quinn::ReadError) -> Self { + DFError::Unknown(format!("quinn read error: {}", err)) + } +} + +impl From for DFError { + fn from(err: quinn::ReadExactError) -> Self { + DFError::Unknown(format!("quinn read exact error: {}", err)) + } +} + /// SendTimeoutError is the error for send timeout. impl From> for DFError { fn from(err: tokio::sync::mpsc::error::SendTimeoutError) -> Self { diff --git a/dragonfly-client-storage/Cargo.toml b/dragonfly-client-storage/Cargo.toml index 4b977986..4a367bd4 100644 --- a/dragonfly-client-storage/Cargo.toml +++ b/dragonfly-client-storage/Cargo.toml @@ -31,10 +31,10 @@ bytesize.workspace = true leaky-bucket.workspace = true vortex-protocol.workspace = true rustls.workspace = true -quinn.workspace = true num_cpus = "1.17" bincode = "1.3.3" walkdir = "2.5.0" +quinn = "0.11.9" nix = { version = "0.30.1", features = ["socket", "net"] } [dev-dependencies] diff --git a/dragonfly-client-storage/src/client/quic.rs b/dragonfly-client-storage/src/client/quic.rs index 58dcde03..ab496ea3 100644 --- a/dragonfly-client-storage/src/client/quic.rs +++ b/dragonfly-client-storage/src/client/quic.rs @@ -179,12 +179,17 @@ impl QUICClient { &self, request: Bytes, ) -> ClientResult<(RecvStream, SendStream)> { - let mut client_config = ClientConfig::new(Arc::new(QuicClientConfig::try_from( - quinn::rustls::ClientConfig::builder() - .dangerous() - .with_custom_certificate_verifier(NoVerifier::new()) - .with_no_client_auth(), - )?)); + let mut client_config = ClientConfig::new(Arc::new( + QuicClientConfig::try_from( + quinn::rustls::ClientConfig::builder() + .dangerous() + .with_custom_certificate_verifier(NoVerifier::new()) + .with_no_client_auth(), + ) + .map_err(|err| { + ClientError::Unknown(format!("failed to create quic client config: {}", err)) + })?, + )); let mut transport = TransportConfig::default(); transport.congestion_controller_factory(Arc::new(BbrConfig::default())); @@ -304,6 +309,7 @@ struct NoVerifier(Arc); /// NoVerifier implements a no-op server certificate verifier. impl NoVerifier { + /// Creates a new NoVerifier instance. pub fn new() -> Arc { Arc::new(Self(Arc::new( quinn::rustls::crypto::ring::default_provider(), @@ -313,7 +319,7 @@ impl NoVerifier { /// NoVerifier implements the ServerCertVerifier trait to skip certificate verification. impl quinn::rustls::client::danger::ServerCertVerifier for NoVerifier { - /// verify_server_cert always returns Ok, effectively skipping verification. + /// Verifies the server certificate. fn verify_server_cert( &self, _end_entity: &CertificateDer<'_>, @@ -325,7 +331,7 @@ impl quinn::rustls::client::danger::ServerCertVerifier for NoVerifier { Ok(quinn::rustls::client::danger::ServerCertVerified::assertion()) } - /// verify_tls12_signature verifies TLS 1.2 signatures using the provided algorithms. + /// Verifies a TLS 1.2 signature. fn verify_tls12_signature( &self, message: &[u8], @@ -340,7 +346,7 @@ impl quinn::rustls::client::danger::ServerCertVerifier for NoVerifier { ) } - /// verify_tls13_signature verifies TLS 1.3 signatures using the provided algorithms. + /// Verifies a TLS 1.3 signature. fn verify_tls13_signature( &self, message: &[u8], @@ -355,7 +361,7 @@ impl quinn::rustls::client::danger::ServerCertVerifier for NoVerifier { ) } - /// supported_verify_schemes returns the supported signature schemes. + /// Returns the supported signature schemes. fn supported_verify_schemes(&self) -> Vec { self.0.signature_verification_algorithms.supported_schemes() } diff --git a/dragonfly-client-storage/src/server/quic.rs b/dragonfly-client-storage/src/server/quic.rs index 12ce6a17..a9fd8c7b 100644 --- a/dragonfly-client-storage/src/server/quic.rs +++ b/dragonfly-client-storage/src/server/quic.rs @@ -86,8 +86,7 @@ impl QUICServer { pub async fn run(&mut self) -> ClientResult<()> { let (certs, key) = generate_simple_self_signed_certs("d7y", vec!["d7y".into()])?; let mut server_config = ServerConfig::with_single_cert(certs, key).map_err(|err| { - error!("failed to create server config: {}", err); - ClientError::QuinnRustlsError(err) + ClientError::Unknown(format!("failed to create server config: {}", err)) })?; let mut transport = TransportConfig::default(); @@ -158,6 +157,7 @@ impl QUICServerHandler { }); } Err(err) => { + // Downgrade common close cases to debug to reduce noisy logs. match err { quinn::ConnectionError::ApplicationClosed(_) | quinn::ConnectionError::LocallyClosed => { @@ -230,7 +230,10 @@ impl QUICServerHandler { self.write_response(response.freeze(), &mut writer).await?; self.write_stream(&mut content_reader, &mut writer).await?; - writer.finish()?; + + if let Err(err) = writer.finish() { + error!("failed to finish stream: {}", err); + } } Err(err) => { // Collect upload piece failure metrics. @@ -239,7 +242,10 @@ impl QUICServerHandler { let error_response: Bytes = Vortex::Error(Header::new_error(err.len() as u32), err).into(); self.write_response(error_response, &mut writer).await?; - writer.finish()?; + + if let Err(err) = writer.finish() { + error!("failed to finish stream: {}", err); + } } } @@ -292,7 +298,10 @@ impl QUICServerHandler { self.write_response(response.freeze(), &mut writer).await?; self.write_stream(&mut content_reader, &mut writer).await?; - writer.finish()?; + + if let Err(err) = writer.finish() { + error!("failed to finish stream: {}", err); + } } Err(err) => { // Collect upload piece failure metrics. @@ -301,7 +310,10 @@ impl QUICServerHandler { let error_response: Bytes = Vortex::Error(Header::new_error(err.len() as u32), err).into(); self.write_response(error_response, &mut writer).await?; - writer.finish()?; + + if let Err(err) = writer.finish() { + error!("failed to finish stream: {}", err); + } } } diff --git a/dragonfly-client/src/bin/dfget/main.rs b/dragonfly-client/src/bin/dfget/main.rs index 8524e2df..abbcd5da 100644 --- a/dragonfly-client/src/bin/dfget/main.rs +++ b/dragonfly-client/src/bin/dfget/main.rs @@ -28,10 +28,8 @@ use dragonfly_client::tracing::init_command_tracing; use dragonfly_client_backend::{hdfs, object_storage, BackendFactory, DirEntry}; use dragonfly_client_config::VersionValueParser; use dragonfly_client_config::{self, dfdaemon, dfget}; -use dragonfly_client_core::{ - error::{ErrorType, OrErr}, - Error, Result, -}; +use dragonfly_client_core::error::{ErrorType, OrErr}; +use dragonfly_client_core::{Error, Result}; use dragonfly_client_util::{fs::fallocate, http::header_vec_to_hashmap}; use glob::Pattern; use indicatif::{MultiProgress, ProgressBar, ProgressState, ProgressStyle}; diff --git a/dragonfly-client/src/resource/piece.rs b/dragonfly-client/src/resource/piece.rs index 56d3e37b..c2ee1f26 100644 --- a/dragonfly-client/src/resource/piece.rs +++ b/dragonfly-client/src/resource/piece.rs @@ -831,7 +831,9 @@ impl Piece { .await? } ("quic", Some(ip), _, Some(port)) => { - self.quic_downloader + let quic_downloader = + piece_downloader::DownloaderFactory::new("quic", self.config.clone())?.build(); + quic_downloader .download_persistent_cache_piece( format!("{}:{}", ip, port).as_str(), number,