From b7dc472df901b5c81e1b80e5f3fe903824f15690 Mon Sep 17 00:00:00 2001 From: Gaius Date: Thu, 25 Sep 2025 19:57:04 +0800 Subject: [PATCH] refactor(quic): integrate QUIC and downloader (#1378) Signed-off-by: Gaius --- Cargo.toml | 1 + dragonfly-client-core/Cargo.toml | 2 +- dragonfly-client-core/src/error/mod.rs | 64 ++++++++++----------- dragonfly-client-storage/Cargo.toml | 2 +- dragonfly-client-storage/src/client/quic.rs | 43 ++++++++------ dragonfly-client-storage/src/server/quic.rs | 64 +++++++++------------ dragonfly-client/src/bin/dfget/main.rs | 6 +- dragonfly-client/src/resource/piece.rs | 13 ++--- 8 files changed, 95 insertions(+), 100 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 31c9e66b..0ad55764 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -117,6 +117,7 @@ hostname = "^0.4" tonic-health = "0.12.3" hashring = "0.3.6" reqwest-tracing = "0.5" +quinn = "0.11.9" mocktail = "0.3.0" [profile.release] diff --git a/dragonfly-client-core/Cargo.toml b/dragonfly-client-core/Cargo.toml index 597cf5f0..3a8bb8ae 100644 --- a/dragonfly-client-core/Cargo.toml +++ b/dragonfly-client-core/Cargo.toml @@ -23,4 +23,4 @@ opendal.workspace = true url.workspace = true headers.workspace = true vortex-protocol.workspace = true -quinn = "0.11.9" +quinn.workspace = true diff --git a/dragonfly-client-core/src/error/mod.rs b/dragonfly-client-core/src/error/mod.rs index cdba6327..cd8a1847 100644 --- a/dragonfly-client-core/src/error/mod.rs +++ b/dragonfly-client-core/src/error/mod.rs @@ -170,6 +170,38 @@ pub enum DFError { #[error(transparent)] VortexProtocolError(#[from] vortex_protocol::error::Error), + /// QuinnConnectError is the error for quinn connect. + #[error(transparent)] + QuinnConnectError(#[from] quinn::ConnectError), + + /// QuinnConnectionError is the error for quinn connection. + #[error(transparent)] + QuinnConnectionError(#[from] quinn::ConnectionError), + + /// QuinnWriteError is the error for quinn write. + #[error(transparent)] + QuinnWriteError(#[from] quinn::WriteError), + + /// QuinnReadError is the error for quinn read. + #[error(transparent)] + QuinnReadError(#[from] quinn::ReadError), + + /// QuinnReadExactError is the error for quinn read exact. + #[error(transparent)] + QuinnReadExactError(#[from] quinn::ReadExactError), + + /// QuinnRustlsNoInitialCipherSuite is the error for quinn no initial cipher suite. + #[error(transparent)] + QuinnRustlsNoInitialCipherSuite(#[from] quinn::crypto::rustls::NoInitialCipherSuite), + + /// QuinnRustlsError is the error for rustls. + #[error(transparent)] + QuinnRustlsError(#[from] quinn::rustls::Error), + + /// QuinnClosedStream is the error for quinn closed stream. + #[error(transparent)] + QuinnClosedStream(#[from] quinn::ClosedStream), + /// TonicStatus is the error for tonic status. #[error(transparent)] TonicStatus(#[from] tonic::Status), @@ -250,38 +282,6 @@ impl From> for DFError { } } -// --- Conversions for quinn (QUIC) errors --- -// These allow using the ? operator directly with quinn APIs without repetitive map_err. -impl From for DFError { - fn from(err: quinn::ConnectError) -> Self { - DFError::Unknown(format!("quinn connect error: {}", err)) - } -} - -impl From for DFError { - fn from(err: quinn::ConnectionError) -> Self { - DFError::Unknown(format!("quinn connection error: {}", err)) - } -} - -impl From for DFError { - fn from(err: quinn::WriteError) -> Self { - DFError::Unknown(format!("quinn write error: {}", err)) - } -} - -impl From for DFError { - fn from(err: quinn::ReadError) -> Self { - DFError::Unknown(format!("quinn read error: {}", err)) - } -} - -impl From for DFError { - fn from(err: quinn::ReadExactError) -> Self { - DFError::Unknown(format!("quinn read exact error: {}", err)) - } -} - /// SendTimeoutError is the error for send timeout. impl From> for DFError { fn from(err: tokio::sync::mpsc::error::SendTimeoutError) -> Self { diff --git a/dragonfly-client-storage/Cargo.toml b/dragonfly-client-storage/Cargo.toml index c740f06c..bb6cec7d 100644 --- a/dragonfly-client-storage/Cargo.toml +++ b/dragonfly-client-storage/Cargo.toml @@ -31,10 +31,10 @@ bytesize.workspace = true leaky-bucket.workspace = true vortex-protocol.workspace = true rustls.workspace = true +quinn.workspace = true num_cpus = "1.17" bincode = "1.3.3" walkdir = "2.5.0" -quinn = "0.11.9" [dev-dependencies] tempfile.workspace = true diff --git a/dragonfly-client-storage/src/client/quic.rs b/dragonfly-client-storage/src/client/quic.rs index 7d886808..028fe049 100644 --- a/dragonfly-client-storage/src/client/quic.rs +++ b/dragonfly-client-storage/src/client/quic.rs @@ -16,11 +16,14 @@ use bytes::{Bytes, BytesMut}; use dragonfly_client_config::dfdaemon::Config; -use dragonfly_client_core::{Error as ClientError, Result as ClientResult}; +use dragonfly_client_core::{ + error::{ErrorType, OrErr}, + Error as ClientError, Result as ClientResult, +}; use quinn::crypto::rustls::QuicClientConfig; use quinn::{ClientConfig, Endpoint, RecvStream, SendStream}; use rustls_pki_types::{CertificateDer, ServerName, UnixTime}; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::net::SocketAddr; use std::sync::Arc; use tokio::io::AsyncRead; use tokio::time; @@ -172,26 +175,22 @@ impl QUICClient { &self, request: Bytes, ) -> ClientResult<(RecvStream, SendStream)> { - let crypto = quinn::rustls::ClientConfig::builder() - .dangerous() - .with_custom_certificate_verifier(NoVerifier::new()) - .with_no_client_auth(); + let client_config = ClientConfig::new(Arc::new(QuicClientConfig::try_from( + quinn::rustls::ClientConfig::builder() + .dangerous() + .with_custom_certificate_verifier(NoVerifier::new()) + .with_no_client_auth(), + )?)); - let client_config = - ClientConfig::new(Arc::new(QuicClientConfig::try_from(crypto).map_err( - |err| ClientError::Unknown(format!("failed to create quic client config: {}", err)), - )?)); + // Port is zero to let the OS assign an ephemeral port. + let mut endpoint = + Endpoint::client(SocketAddr::new(self.config.storage.server.ip.unwrap(), 0))?; + endpoint.set_default_client_config(client_config); - let endpoint = { - let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0); - let mut endpoint = Endpoint::client(bind_addr)?; - endpoint.set_default_client_config(client_config); - endpoint - }; - - let remote_addr: SocketAddr = self.addr.parse().unwrap(); + // Connect's server name used for verifying the certificate. Since we used + // NoVerifier, it can be anything. let connection = endpoint - .connect(remote_addr, "quic")? + .connect(self.addr.parse().or_err(ErrorType::ParseError)?, "d7y")? .await .inspect_err(|err| error!("failed to connect to {}: {}", self.addr, err))?; @@ -292,6 +291,7 @@ impl QUICClient { #[derive(Debug)] struct NoVerifier(Arc); +/// NoVerifier implements a no-op server certificate verifier. impl NoVerifier { pub fn new() -> Arc { Arc::new(Self(Arc::new( @@ -300,7 +300,9 @@ impl NoVerifier { } } +/// NoVerifier implements the ServerCertVerifier trait to skip certificate verification. impl quinn::rustls::client::danger::ServerCertVerifier for NoVerifier { + /// verify_server_cert always returns Ok, effectively skipping verification. fn verify_server_cert( &self, _end_entity: &CertificateDer<'_>, @@ -312,6 +314,7 @@ impl quinn::rustls::client::danger::ServerCertVerifier for NoVerifier { Ok(quinn::rustls::client::danger::ServerCertVerified::assertion()) } + /// verify_tls12_signature verifies TLS 1.2 signatures using the provided algorithms. fn verify_tls12_signature( &self, message: &[u8], @@ -326,6 +329,7 @@ impl quinn::rustls::client::danger::ServerCertVerifier for NoVerifier { ) } + /// verify_tls13_signature verifies TLS 1.3 signatures using the provided algorithms. fn verify_tls13_signature( &self, message: &[u8], @@ -340,6 +344,7 @@ impl quinn::rustls::client::danger::ServerCertVerifier for NoVerifier { ) } + /// supported_verify_schemes returns the supported signature schemes. fn supported_verify_schemes(&self) -> Vec { self.0.signature_verification_algorithms.supported_schemes() } diff --git a/dragonfly-client-storage/src/server/quic.rs b/dragonfly-client-storage/src/server/quic.rs index 91c036c7..7822bed7 100644 --- a/dragonfly-client-storage/src/server/quic.rs +++ b/dragonfly-client-storage/src/server/quic.rs @@ -83,23 +83,26 @@ impl QUICServer { /// Starts the storage quic server. pub async fn run(&mut self) -> ClientResult<()> { - // Configure TLS for the QUIC server. - let (certs, key) = generate_simple_self_signed_certs("quic", vec!["quic".into()])?; + let (certs, key) = generate_simple_self_signed_certs("d7y", vec!["d7y".into()])?; let server_config = ServerConfig::with_single_cert(certs, key).map_err(|err| { - ClientError::Unknown(format!("failed to create server config: {}", err)) + error!("failed to create server config: {}", err); + ClientError::QuinnRustlsError(err) })?; - // Create a QUIC endpoint and bind it. + let endpoint = Endpoint::server(server_config, self.addr)?; info!("storage quic server listening on {}", self.addr); loop { tokio::select! { - Some(conn) = endpoint.accept() => { - info!("QUIC connection incoming"); + Some(quic_accepted) = endpoint.accept() => { + let quic = quic_accepted.await?; + let remote_address = quic.remote_address(); + debug!("accepted connection from {}", remote_address); + let handler = self.handler.clone(); tokio::spawn(async move { - if let Err(err) = handler.handle(conn).await { - error!("failed to handle connection: {}", err); + if let Err(err) = handler.handle(quic, remote_address).await { + error!("failed to handle connection from {}: {}", remote_address, err); } }); }, @@ -131,18 +134,15 @@ pub struct QUICServerHandler { impl QUICServerHandler { /// handle handles a single QUIC connection. #[instrument(skip_all)] - async fn handle(&self, conn: quinn::Incoming) -> ClientResult<()> { - let connection = conn.await?; - let remote_address = connection.remote_address().to_string(); - debug!("accepted connection from {}", remote_address); - - // A QUIC connection can have multiple streams. + async fn handle( + &self, + connection: quinn::Connection, + remote_address: SocketAddr, + ) -> ClientResult<()> { loop { match connection.accept_bi().await { Ok((send, recv)) => { - info!("new bidirectional stream accepted"); let handler = self.clone(); - let remote_address = remote_address.clone(); tokio::spawn(async move { if let Err(err) = handler.handle_stream(recv, send, remote_address).await { error!("failed to handle stream: {}", err); @@ -150,13 +150,10 @@ impl QUICServerHandler { }); } Err(err) => { - // Downgrade common close cases to debug to reduce noisy logs. match err { - quinn::ConnectionError::ApplicationClosed(_) => { - debug!("peer closed connection (application closed): {}", err); - } - quinn::ConnectionError::LocallyClosed => { - debug!("connection closed locally: {}", err); + quinn::ConnectionError::ApplicationClosed(_) + | quinn::ConnectionError::LocallyClosed => { + debug!("connection closed: {}", err); } _ => { error!("failed to accept bidirectional stream: {}", err); @@ -181,7 +178,7 @@ impl QUICServerHandler { &self, mut reader: quinn::RecvStream, mut writer: quinn::SendStream, - remote_address: String, + remote_address: SocketAddr, ) -> ClientResult<()> { let header = self.read_header(&mut reader).await?; match header.tag() { @@ -203,7 +200,7 @@ impl QUICServerHandler { let piece_id = self.storage.piece_id(task_id, piece_number); Span::current().record("host_id", host_id); - Span::current().record("remote_address", remote_address.as_str()); + Span::current().record("remote_address", remote_address.to_string().as_str()); Span::current().record("task_id", task_id); Span::current().record("piece_id", piece_id.as_str()); @@ -225,10 +222,7 @@ impl QUICServerHandler { self.write_response(response.freeze(), &mut writer).await?; self.write_stream(&mut content_reader, &mut writer).await?; - // Gracefully finish the stream so the client sees EOF. - if let Err(err) = writer.finish() { - error!("failed to finish stream: {}", err); - } + writer.finish()?; } Err(err) => { // Collect upload piece failure metrics. @@ -237,9 +231,7 @@ impl QUICServerHandler { let error_response: Bytes = Vortex::Error(Header::new_error(err.len() as u32), err).into(); self.write_response(error_response, &mut writer).await?; - if let Err(err) = writer.finish() { - error!("failed to finish stream after error: {}", err); - } + writer.finish()?; } } @@ -263,7 +255,7 @@ impl QUICServerHandler { let piece_id = self.storage.piece_id(task_id, piece_number); Span::current().record("host_id", host_id); - Span::current().record("remote_address", remote_address.as_str()); + Span::current().record("remote_address", remote_address.to_string().as_str()); Span::current().record("task_id", task_id); Span::current().record("piece_id", piece_id.as_str()); @@ -292,9 +284,7 @@ impl QUICServerHandler { self.write_response(response.freeze(), &mut writer).await?; self.write_stream(&mut content_reader, &mut writer).await?; - if let Err(err) = writer.finish() { - error!("failed to finish stream: {}", err); - } + writer.finish()?; } Err(err) => { // Collect upload piece failure metrics. @@ -303,9 +293,7 @@ impl QUICServerHandler { let error_response: Bytes = Vortex::Error(Header::new_error(err.len() as u32), err).into(); self.write_response(error_response, &mut writer).await?; - if let Err(err) = writer.finish() { - error!("failed to finish stream after error: {}", err); - } + writer.finish()?; } } diff --git a/dragonfly-client/src/bin/dfget/main.rs b/dragonfly-client/src/bin/dfget/main.rs index ba118dce..b9932083 100644 --- a/dragonfly-client/src/bin/dfget/main.rs +++ b/dragonfly-client/src/bin/dfget/main.rs @@ -28,8 +28,10 @@ use dragonfly_client::tracing::init_command_tracing; use dragonfly_client_backend::{hdfs, object_storage, BackendFactory, DirEntry}; use dragonfly_client_config::VersionValueParser; use dragonfly_client_config::{self, dfdaemon, dfget}; -use dragonfly_client_core::error::{ErrorType, OrErr}; -use dragonfly_client_core::{Error, Result}; +use dragonfly_client_core::{ + error::{ErrorType, OrErr}, + Error, Result, +}; use dragonfly_client_util::{fs::fallocate, http::header_vec_to_hashmap}; use glob::Pattern; use indicatif::{MultiProgress, ProgressBar, ProgressState, ProgressStyle}; diff --git a/dragonfly-client/src/resource/piece.rs b/dragonfly-client/src/resource/piece.rs index bcee7f51..56d3e37b 100644 --- a/dragonfly-client/src/resource/piece.rs +++ b/dragonfly-client/src/resource/piece.rs @@ -73,6 +73,9 @@ pub struct Piece { /// tcp_downloader is the TCP piece downloader. tcp_downloader: Arc, + /// quic_downloader is the QUIC piece downloader. + quic_downloader: Arc, + /// backend_factory is the backend factory. backend_factory: Arc, @@ -106,6 +109,7 @@ impl Piece { .build(), tcp_downloader: piece_downloader::DownloaderFactory::new("tcp", config.clone())? .build(), + quic_downloader: piece_downloader::DownloaderFactory::new("quic", config)?.build(), backend_factory, download_rate_limiter, upload_rate_limiter, @@ -439,10 +443,7 @@ impl Piece { .await? } ("quic", Some(ip), _, Some(port)) => { - // Lazily create a QUIC downloader via factory to avoid holding all three. - let quic_downloader = - piece_downloader::DownloaderFactory::new("quic", self.config.clone())?.build(); - quic_downloader + self.quic_downloader .download_piece( format!("{}:{}", ip, port).as_str(), number, @@ -830,9 +831,7 @@ impl Piece { .await? } ("quic", Some(ip), _, Some(port)) => { - let quic_downloader = - piece_downloader::DownloaderFactory::new("quic", self.config.clone())?.build(); - quic_downloader + self.quic_downloader .download_persistent_cache_piece( format!("{}:{}", ip, port).as_str(), number,