refactor(quic): integrate QUIC and downloader (#1378)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2025-09-25 19:57:04 +08:00 committed by GitHub
parent 677049c769
commit b7dc472df9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 95 additions and 100 deletions

View File

@ -117,6 +117,7 @@ hostname = "^0.4"
tonic-health = "0.12.3" tonic-health = "0.12.3"
hashring = "0.3.6" hashring = "0.3.6"
reqwest-tracing = "0.5" reqwest-tracing = "0.5"
quinn = "0.11.9"
mocktail = "0.3.0" mocktail = "0.3.0"
[profile.release] [profile.release]

View File

@ -23,4 +23,4 @@ opendal.workspace = true
url.workspace = true url.workspace = true
headers.workspace = true headers.workspace = true
vortex-protocol.workspace = true vortex-protocol.workspace = true
quinn = "0.11.9" quinn.workspace = true

View File

@ -170,6 +170,38 @@ pub enum DFError {
#[error(transparent)] #[error(transparent)]
VortexProtocolError(#[from] vortex_protocol::error::Error), VortexProtocolError(#[from] vortex_protocol::error::Error),
/// QuinnConnectError is the error for quinn connect.
#[error(transparent)]
QuinnConnectError(#[from] quinn::ConnectError),
/// QuinnConnectionError is the error for quinn connection.
#[error(transparent)]
QuinnConnectionError(#[from] quinn::ConnectionError),
/// QuinnWriteError is the error for quinn write.
#[error(transparent)]
QuinnWriteError(#[from] quinn::WriteError),
/// QuinnReadError is the error for quinn read.
#[error(transparent)]
QuinnReadError(#[from] quinn::ReadError),
/// QuinnReadExactError is the error for quinn read exact.
#[error(transparent)]
QuinnReadExactError(#[from] quinn::ReadExactError),
/// QuinnRustlsNoInitialCipherSuite is the error for quinn no initial cipher suite.
#[error(transparent)]
QuinnRustlsNoInitialCipherSuite(#[from] quinn::crypto::rustls::NoInitialCipherSuite),
/// QuinnRustlsError is the error for rustls.
#[error(transparent)]
QuinnRustlsError(#[from] quinn::rustls::Error),
/// QuinnClosedStream is the error for quinn closed stream.
#[error(transparent)]
QuinnClosedStream(#[from] quinn::ClosedStream),
/// TonicStatus is the error for tonic status. /// TonicStatus is the error for tonic status.
#[error(transparent)] #[error(transparent)]
TonicStatus(#[from] tonic::Status), TonicStatus(#[from] tonic::Status),
@ -250,38 +282,6 @@ impl<T> From<tokio::sync::mpsc::error::SendError<T>> for DFError {
} }
} }
// --- Conversions for quinn (QUIC) errors ---
// These allow using the ? operator directly with quinn APIs without repetitive map_err.
impl From<quinn::ConnectError> for DFError {
fn from(err: quinn::ConnectError) -> Self {
DFError::Unknown(format!("quinn connect error: {}", err))
}
}
impl From<quinn::ConnectionError> for DFError {
fn from(err: quinn::ConnectionError) -> Self {
DFError::Unknown(format!("quinn connection error: {}", err))
}
}
impl From<quinn::WriteError> for DFError {
fn from(err: quinn::WriteError) -> Self {
DFError::Unknown(format!("quinn write error: {}", err))
}
}
impl From<quinn::ReadError> for DFError {
fn from(err: quinn::ReadError) -> Self {
DFError::Unknown(format!("quinn read error: {}", err))
}
}
impl From<quinn::ReadExactError> for DFError {
fn from(err: quinn::ReadExactError) -> Self {
DFError::Unknown(format!("quinn read exact error: {}", err))
}
}
/// SendTimeoutError is the error for send timeout. /// SendTimeoutError is the error for send timeout.
impl<T> From<tokio::sync::mpsc::error::SendTimeoutError<T>> for DFError { impl<T> From<tokio::sync::mpsc::error::SendTimeoutError<T>> for DFError {
fn from(err: tokio::sync::mpsc::error::SendTimeoutError<T>) -> Self { fn from(err: tokio::sync::mpsc::error::SendTimeoutError<T>) -> Self {

View File

@ -31,10 +31,10 @@ bytesize.workspace = true
leaky-bucket.workspace = true leaky-bucket.workspace = true
vortex-protocol.workspace = true vortex-protocol.workspace = true
rustls.workspace = true rustls.workspace = true
quinn.workspace = true
num_cpus = "1.17" num_cpus = "1.17"
bincode = "1.3.3" bincode = "1.3.3"
walkdir = "2.5.0" walkdir = "2.5.0"
quinn = "0.11.9"
[dev-dependencies] [dev-dependencies]
tempfile.workspace = true tempfile.workspace = true

View File

@ -16,11 +16,14 @@
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use dragonfly_client_config::dfdaemon::Config; use dragonfly_client_config::dfdaemon::Config;
use dragonfly_client_core::{Error as ClientError, Result as ClientResult}; use dragonfly_client_core::{
error::{ErrorType, OrErr},
Error as ClientError, Result as ClientResult,
};
use quinn::crypto::rustls::QuicClientConfig; use quinn::crypto::rustls::QuicClientConfig;
use quinn::{ClientConfig, Endpoint, RecvStream, SendStream}; use quinn::{ClientConfig, Endpoint, RecvStream, SendStream};
use rustls_pki_types::{CertificateDer, ServerName, UnixTime}; use rustls_pki_types::{CertificateDer, ServerName, UnixTime};
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use tokio::io::AsyncRead; use tokio::io::AsyncRead;
use tokio::time; use tokio::time;
@ -172,26 +175,22 @@ impl QUICClient {
&self, &self,
request: Bytes, request: Bytes,
) -> ClientResult<(RecvStream, SendStream)> { ) -> ClientResult<(RecvStream, SendStream)> {
let crypto = quinn::rustls::ClientConfig::builder() let client_config = ClientConfig::new(Arc::new(QuicClientConfig::try_from(
.dangerous() quinn::rustls::ClientConfig::builder()
.with_custom_certificate_verifier(NoVerifier::new()) .dangerous()
.with_no_client_auth(); .with_custom_certificate_verifier(NoVerifier::new())
.with_no_client_auth(),
)?));
let client_config = // Port is zero to let the OS assign an ephemeral port.
ClientConfig::new(Arc::new(QuicClientConfig::try_from(crypto).map_err( let mut endpoint =
|err| ClientError::Unknown(format!("failed to create quic client config: {}", err)), Endpoint::client(SocketAddr::new(self.config.storage.server.ip.unwrap(), 0))?;
)?)); endpoint.set_default_client_config(client_config);
let endpoint = { // Connect's server name used for verifying the certificate. Since we used
let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0); // NoVerifier, it can be anything.
let mut endpoint = Endpoint::client(bind_addr)?;
endpoint.set_default_client_config(client_config);
endpoint
};
let remote_addr: SocketAddr = self.addr.parse().unwrap();
let connection = endpoint let connection = endpoint
.connect(remote_addr, "quic")? .connect(self.addr.parse().or_err(ErrorType::ParseError)?, "d7y")?
.await .await
.inspect_err(|err| error!("failed to connect to {}: {}", self.addr, err))?; .inspect_err(|err| error!("failed to connect to {}: {}", self.addr, err))?;
@ -292,6 +291,7 @@ impl QUICClient {
#[derive(Debug)] #[derive(Debug)]
struct NoVerifier(Arc<quinn::rustls::crypto::CryptoProvider>); struct NoVerifier(Arc<quinn::rustls::crypto::CryptoProvider>);
/// NoVerifier implements a no-op server certificate verifier.
impl NoVerifier { impl NoVerifier {
pub fn new() -> Arc<Self> { pub fn new() -> Arc<Self> {
Arc::new(Self(Arc::new( Arc::new(Self(Arc::new(
@ -300,7 +300,9 @@ impl NoVerifier {
} }
} }
/// NoVerifier implements the ServerCertVerifier trait to skip certificate verification.
impl quinn::rustls::client::danger::ServerCertVerifier for NoVerifier { impl quinn::rustls::client::danger::ServerCertVerifier for NoVerifier {
/// verify_server_cert always returns Ok, effectively skipping verification.
fn verify_server_cert( fn verify_server_cert(
&self, &self,
_end_entity: &CertificateDer<'_>, _end_entity: &CertificateDer<'_>,
@ -312,6 +314,7 @@ impl quinn::rustls::client::danger::ServerCertVerifier for NoVerifier {
Ok(quinn::rustls::client::danger::ServerCertVerified::assertion()) Ok(quinn::rustls::client::danger::ServerCertVerified::assertion())
} }
/// verify_tls12_signature verifies TLS 1.2 signatures using the provided algorithms.
fn verify_tls12_signature( fn verify_tls12_signature(
&self, &self,
message: &[u8], message: &[u8],
@ -326,6 +329,7 @@ impl quinn::rustls::client::danger::ServerCertVerifier for NoVerifier {
) )
} }
/// verify_tls13_signature verifies TLS 1.3 signatures using the provided algorithms.
fn verify_tls13_signature( fn verify_tls13_signature(
&self, &self,
message: &[u8], message: &[u8],
@ -340,6 +344,7 @@ impl quinn::rustls::client::danger::ServerCertVerifier for NoVerifier {
) )
} }
/// supported_verify_schemes returns the supported signature schemes.
fn supported_verify_schemes(&self) -> Vec<quinn::rustls::SignatureScheme> { fn supported_verify_schemes(&self) -> Vec<quinn::rustls::SignatureScheme> {
self.0.signature_verification_algorithms.supported_schemes() self.0.signature_verification_algorithms.supported_schemes()
} }

View File

@ -83,23 +83,26 @@ impl QUICServer {
/// Starts the storage quic server. /// Starts the storage quic server.
pub async fn run(&mut self) -> ClientResult<()> { pub async fn run(&mut self) -> ClientResult<()> {
// Configure TLS for the QUIC server. let (certs, key) = generate_simple_self_signed_certs("d7y", vec!["d7y".into()])?;
let (certs, key) = generate_simple_self_signed_certs("quic", vec!["quic".into()])?;
let server_config = ServerConfig::with_single_cert(certs, key).map_err(|err| { let server_config = ServerConfig::with_single_cert(certs, key).map_err(|err| {
ClientError::Unknown(format!("failed to create server config: {}", err)) error!("failed to create server config: {}", err);
ClientError::QuinnRustlsError(err)
})?; })?;
// Create a QUIC endpoint and bind it.
let endpoint = Endpoint::server(server_config, self.addr)?; let endpoint = Endpoint::server(server_config, self.addr)?;
info!("storage quic server listening on {}", self.addr); info!("storage quic server listening on {}", self.addr);
loop { loop {
tokio::select! { tokio::select! {
Some(conn) = endpoint.accept() => { Some(quic_accepted) = endpoint.accept() => {
info!("QUIC connection incoming"); let quic = quic_accepted.await?;
let remote_address = quic.remote_address();
debug!("accepted connection from {}", remote_address);
let handler = self.handler.clone(); let handler = self.handler.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(err) = handler.handle(conn).await { if let Err(err) = handler.handle(quic, remote_address).await {
error!("failed to handle connection: {}", err); error!("failed to handle connection from {}: {}", remote_address, err);
} }
}); });
}, },
@ -131,18 +134,15 @@ pub struct QUICServerHandler {
impl QUICServerHandler { impl QUICServerHandler {
/// handle handles a single QUIC connection. /// handle handles a single QUIC connection.
#[instrument(skip_all)] #[instrument(skip_all)]
async fn handle(&self, conn: quinn::Incoming) -> ClientResult<()> { async fn handle(
let connection = conn.await?; &self,
let remote_address = connection.remote_address().to_string(); connection: quinn::Connection,
debug!("accepted connection from {}", remote_address); remote_address: SocketAddr,
) -> ClientResult<()> {
// A QUIC connection can have multiple streams.
loop { loop {
match connection.accept_bi().await { match connection.accept_bi().await {
Ok((send, recv)) => { Ok((send, recv)) => {
info!("new bidirectional stream accepted");
let handler = self.clone(); let handler = self.clone();
let remote_address = remote_address.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(err) = handler.handle_stream(recv, send, remote_address).await { if let Err(err) = handler.handle_stream(recv, send, remote_address).await {
error!("failed to handle stream: {}", err); error!("failed to handle stream: {}", err);
@ -150,13 +150,10 @@ impl QUICServerHandler {
}); });
} }
Err(err) => { Err(err) => {
// Downgrade common close cases to debug to reduce noisy logs.
match err { match err {
quinn::ConnectionError::ApplicationClosed(_) => { quinn::ConnectionError::ApplicationClosed(_)
debug!("peer closed connection (application closed): {}", err); | quinn::ConnectionError::LocallyClosed => {
} debug!("connection closed: {}", err);
quinn::ConnectionError::LocallyClosed => {
debug!("connection closed locally: {}", err);
} }
_ => { _ => {
error!("failed to accept bidirectional stream: {}", err); error!("failed to accept bidirectional stream: {}", err);
@ -181,7 +178,7 @@ impl QUICServerHandler {
&self, &self,
mut reader: quinn::RecvStream, mut reader: quinn::RecvStream,
mut writer: quinn::SendStream, mut writer: quinn::SendStream,
remote_address: String, remote_address: SocketAddr,
) -> ClientResult<()> { ) -> ClientResult<()> {
let header = self.read_header(&mut reader).await?; let header = self.read_header(&mut reader).await?;
match header.tag() { match header.tag() {
@ -203,7 +200,7 @@ impl QUICServerHandler {
let piece_id = self.storage.piece_id(task_id, piece_number); let piece_id = self.storage.piece_id(task_id, piece_number);
Span::current().record("host_id", host_id); Span::current().record("host_id", host_id);
Span::current().record("remote_address", remote_address.as_str()); Span::current().record("remote_address", remote_address.to_string().as_str());
Span::current().record("task_id", task_id); Span::current().record("task_id", task_id);
Span::current().record("piece_id", piece_id.as_str()); Span::current().record("piece_id", piece_id.as_str());
@ -225,10 +222,7 @@ impl QUICServerHandler {
self.write_response(response.freeze(), &mut writer).await?; self.write_response(response.freeze(), &mut writer).await?;
self.write_stream(&mut content_reader, &mut writer).await?; self.write_stream(&mut content_reader, &mut writer).await?;
// Gracefully finish the stream so the client sees EOF. writer.finish()?;
if let Err(err) = writer.finish() {
error!("failed to finish stream: {}", err);
}
} }
Err(err) => { Err(err) => {
// Collect upload piece failure metrics. // Collect upload piece failure metrics.
@ -237,9 +231,7 @@ impl QUICServerHandler {
let error_response: Bytes = let error_response: Bytes =
Vortex::Error(Header::new_error(err.len() as u32), err).into(); Vortex::Error(Header::new_error(err.len() as u32), err).into();
self.write_response(error_response, &mut writer).await?; self.write_response(error_response, &mut writer).await?;
if let Err(err) = writer.finish() { writer.finish()?;
error!("failed to finish stream after error: {}", err);
}
} }
} }
@ -263,7 +255,7 @@ impl QUICServerHandler {
let piece_id = self.storage.piece_id(task_id, piece_number); let piece_id = self.storage.piece_id(task_id, piece_number);
Span::current().record("host_id", host_id); Span::current().record("host_id", host_id);
Span::current().record("remote_address", remote_address.as_str()); Span::current().record("remote_address", remote_address.to_string().as_str());
Span::current().record("task_id", task_id); Span::current().record("task_id", task_id);
Span::current().record("piece_id", piece_id.as_str()); Span::current().record("piece_id", piece_id.as_str());
@ -292,9 +284,7 @@ impl QUICServerHandler {
self.write_response(response.freeze(), &mut writer).await?; self.write_response(response.freeze(), &mut writer).await?;
self.write_stream(&mut content_reader, &mut writer).await?; self.write_stream(&mut content_reader, &mut writer).await?;
if let Err(err) = writer.finish() { writer.finish()?;
error!("failed to finish stream: {}", err);
}
} }
Err(err) => { Err(err) => {
// Collect upload piece failure metrics. // Collect upload piece failure metrics.
@ -303,9 +293,7 @@ impl QUICServerHandler {
let error_response: Bytes = let error_response: Bytes =
Vortex::Error(Header::new_error(err.len() as u32), err).into(); Vortex::Error(Header::new_error(err.len() as u32), err).into();
self.write_response(error_response, &mut writer).await?; self.write_response(error_response, &mut writer).await?;
if let Err(err) = writer.finish() { writer.finish()?;
error!("failed to finish stream after error: {}", err);
}
} }
} }

View File

@ -28,8 +28,10 @@ use dragonfly_client::tracing::init_command_tracing;
use dragonfly_client_backend::{hdfs, object_storage, BackendFactory, DirEntry}; use dragonfly_client_backend::{hdfs, object_storage, BackendFactory, DirEntry};
use dragonfly_client_config::VersionValueParser; use dragonfly_client_config::VersionValueParser;
use dragonfly_client_config::{self, dfdaemon, dfget}; use dragonfly_client_config::{self, dfdaemon, dfget};
use dragonfly_client_core::error::{ErrorType, OrErr}; use dragonfly_client_core::{
use dragonfly_client_core::{Error, Result}; error::{ErrorType, OrErr},
Error, Result,
};
use dragonfly_client_util::{fs::fallocate, http::header_vec_to_hashmap}; use dragonfly_client_util::{fs::fallocate, http::header_vec_to_hashmap};
use glob::Pattern; use glob::Pattern;
use indicatif::{MultiProgress, ProgressBar, ProgressState, ProgressStyle}; use indicatif::{MultiProgress, ProgressBar, ProgressState, ProgressStyle};

View File

@ -73,6 +73,9 @@ pub struct Piece {
/// tcp_downloader is the TCP piece downloader. /// tcp_downloader is the TCP piece downloader.
tcp_downloader: Arc<dyn piece_downloader::Downloader>, tcp_downloader: Arc<dyn piece_downloader::Downloader>,
/// quic_downloader is the QUIC piece downloader.
quic_downloader: Arc<dyn piece_downloader::Downloader>,
/// backend_factory is the backend factory. /// backend_factory is the backend factory.
backend_factory: Arc<BackendFactory>, backend_factory: Arc<BackendFactory>,
@ -106,6 +109,7 @@ impl Piece {
.build(), .build(),
tcp_downloader: piece_downloader::DownloaderFactory::new("tcp", config.clone())? tcp_downloader: piece_downloader::DownloaderFactory::new("tcp", config.clone())?
.build(), .build(),
quic_downloader: piece_downloader::DownloaderFactory::new("quic", config)?.build(),
backend_factory, backend_factory,
download_rate_limiter, download_rate_limiter,
upload_rate_limiter, upload_rate_limiter,
@ -439,10 +443,7 @@ impl Piece {
.await? .await?
} }
("quic", Some(ip), _, Some(port)) => { ("quic", Some(ip), _, Some(port)) => {
// Lazily create a QUIC downloader via factory to avoid holding all three. self.quic_downloader
let quic_downloader =
piece_downloader::DownloaderFactory::new("quic", self.config.clone())?.build();
quic_downloader
.download_piece( .download_piece(
format!("{}:{}", ip, port).as_str(), format!("{}:{}", ip, port).as_str(),
number, number,
@ -830,9 +831,7 @@ impl Piece {
.await? .await?
} }
("quic", Some(ip), _, Some(port)) => { ("quic", Some(ip), _, Some(port)) => {
let quic_downloader = self.quic_downloader
piece_downloader::DownloaderFactory::new("quic", self.config.clone())?.build();
quic_downloader
.download_persistent_cache_piece( .download_persistent_cache_piece(
format!("{}:{}", ip, port).as_str(), format!("{}:{}", ip, port).as_str(),
number, number,