refactor(quic): QUIC downloader support (#1380)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2025-09-26 00:44:03 +08:00 committed by GitHub
parent ce88f49d1c
commit 9612b57a97
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 91 additions and 74 deletions

18
Cargo.lock generated
View File

@ -1018,7 +1018,7 @@ dependencies = [
[[package]]
name = "dragonfly-client"
version = "1.0.24"
version = "1.0.25"
dependencies = [
"anyhow",
"bytes",
@ -1094,7 +1094,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-backend"
version = "1.0.24"
version = "1.0.25"
dependencies = [
"dragonfly-api",
"dragonfly-client-core",
@ -1125,7 +1125,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-config"
version = "1.0.24"
version = "1.0.25"
dependencies = [
"bytesize",
"bytesize-serde",
@ -1155,7 +1155,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-core"
version = "1.0.24"
version = "1.0.25"
dependencies = [
"headers 0.4.1",
"hyper 1.6.0",
@ -1175,7 +1175,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-init"
version = "1.0.24"
version = "1.0.25"
dependencies = [
"anyhow",
"clap",
@ -1192,7 +1192,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-metric"
version = "1.0.24"
version = "1.0.25"
dependencies = [
"dragonfly-api",
"dragonfly-client-config",
@ -1207,7 +1207,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-storage"
version = "1.0.24"
version = "1.0.25"
dependencies = [
"bincode",
"bytes",
@ -1241,7 +1241,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-util"
version = "1.0.24"
version = "1.0.25"
dependencies = [
"base64 0.22.1",
"bytes",
@ -1680,7 +1680,7 @@ dependencies = [
[[package]]
name = "hdfs"
version = "1.0.24"
version = "1.0.25"
dependencies = [
"dragonfly-client-backend",
"dragonfly-client-core",

View File

@ -13,7 +13,7 @@ members = [
]
[workspace.package]
version = "1.0.24"
version = "1.0.25"
authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git"
@ -23,14 +23,14 @@ readme = "README.md"
edition = "2021"
[workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "1.0.24" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "1.0.24" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "1.0.24" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "1.0.24" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "1.0.24" }
dragonfly-client-metric = { path = "dragonfly-client-metric", version = "1.0.24" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "1.0.24" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "1.0.24" }
dragonfly-client = { path = "dragonfly-client", version = "1.0.25" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "1.0.25" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "1.0.25" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "1.0.25" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "1.0.25" }
dragonfly-client-metric = { path = "dragonfly-client-metric", version = "1.0.25" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "1.0.25" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "1.0.25" }
dragonfly-api = "=2.1.70"
thiserror = "2.0"
futures = "0.3.31"
@ -117,7 +117,6 @@ hostname = "^0.4"
tonic-health = "0.12.3"
hashring = "0.3.6"
reqwest-tracing = "0.5"
quinn = "0.11.9"
mocktail = "0.3.0"
[profile.release]

View File

@ -23,4 +23,4 @@ opendal.workspace = true
url.workspace = true
headers.workspace = true
vortex-protocol.workspace = true
quinn.workspace = true
quinn = "0.11.9"

View File

@ -170,38 +170,6 @@ pub enum DFError {
#[error(transparent)]
VortexProtocolError(#[from] vortex_protocol::error::Error),
/// QuinnConnectError is the error for quinn connect.
#[error(transparent)]
QuinnConnectError(#[from] quinn::ConnectError),
/// QuinnConnectionError is the error for quinn connection.
#[error(transparent)]
QuinnConnectionError(#[from] quinn::ConnectionError),
/// QuinnWriteError is the error for quinn write.
#[error(transparent)]
QuinnWriteError(#[from] quinn::WriteError),
/// QuinnReadError is the error for quinn read.
#[error(transparent)]
QuinnReadError(#[from] quinn::ReadError),
/// QuinnReadExactError is the error for quinn read exact.
#[error(transparent)]
QuinnReadExactError(#[from] quinn::ReadExactError),
/// QuinnRustlsNoInitialCipherSuite is the error for quinn no initial cipher suite.
#[error(transparent)]
QuinnRustlsNoInitialCipherSuite(#[from] quinn::crypto::rustls::NoInitialCipherSuite),
/// QuinnRustlsError is the error for rustls.
#[error(transparent)]
QuinnRustlsError(#[from] quinn::rustls::Error),
/// QuinnClosedStream is the error for quinn closed stream.
#[error(transparent)]
QuinnClosedStream(#[from] quinn::ClosedStream),
/// TonicStatus is the error for tonic status.
#[error(transparent)]
TonicStatus(#[from] tonic::Status),
@ -282,6 +250,38 @@ impl<T> From<tokio::sync::mpsc::error::SendError<T>> for DFError {
}
}
// --- Conversions for quinn (QUIC) errors ---
// These allow using the ? operator directly with quinn APIs without repetitive map_err.
impl From<quinn::ConnectError> for DFError {
fn from(err: quinn::ConnectError) -> Self {
DFError::Unknown(format!("quinn connect error: {}", err))
}
}
impl From<quinn::ConnectionError> for DFError {
fn from(err: quinn::ConnectionError) -> Self {
DFError::Unknown(format!("quinn connection error: {}", err))
}
}
impl From<quinn::WriteError> for DFError {
fn from(err: quinn::WriteError) -> Self {
DFError::Unknown(format!("quinn write error: {}", err))
}
}
impl From<quinn::ReadError> for DFError {
fn from(err: quinn::ReadError) -> Self {
DFError::Unknown(format!("quinn read error: {}", err))
}
}
impl From<quinn::ReadExactError> for DFError {
fn from(err: quinn::ReadExactError) -> Self {
DFError::Unknown(format!("quinn read exact error: {}", err))
}
}
/// SendTimeoutError is the error for send timeout.
impl<T> From<tokio::sync::mpsc::error::SendTimeoutError<T>> for DFError {
fn from(err: tokio::sync::mpsc::error::SendTimeoutError<T>) -> Self {

View File

@ -31,10 +31,10 @@ bytesize.workspace = true
leaky-bucket.workspace = true
vortex-protocol.workspace = true
rustls.workspace = true
quinn.workspace = true
num_cpus = "1.17"
bincode = "1.3.3"
walkdir = "2.5.0"
quinn = "0.11.9"
nix = { version = "0.30.1", features = ["socket", "net"] }
[dev-dependencies]

View File

@ -179,12 +179,17 @@ impl QUICClient {
&self,
request: Bytes,
) -> ClientResult<(RecvStream, SendStream)> {
let mut client_config = ClientConfig::new(Arc::new(QuicClientConfig::try_from(
quinn::rustls::ClientConfig::builder()
.dangerous()
.with_custom_certificate_verifier(NoVerifier::new())
.with_no_client_auth(),
)?));
let mut client_config = ClientConfig::new(Arc::new(
QuicClientConfig::try_from(
quinn::rustls::ClientConfig::builder()
.dangerous()
.with_custom_certificate_verifier(NoVerifier::new())
.with_no_client_auth(),
)
.map_err(|err| {
ClientError::Unknown(format!("failed to create quic client config: {}", err))
})?,
));
let mut transport = TransportConfig::default();
transport.congestion_controller_factory(Arc::new(BbrConfig::default()));
@ -304,6 +309,7 @@ struct NoVerifier(Arc<quinn::rustls::crypto::CryptoProvider>);
/// NoVerifier implements a no-op server certificate verifier.
impl NoVerifier {
/// Creates a new NoVerifier instance.
pub fn new() -> Arc<Self> {
Arc::new(Self(Arc::new(
quinn::rustls::crypto::ring::default_provider(),
@ -313,7 +319,7 @@ impl NoVerifier {
/// NoVerifier implements the ServerCertVerifier trait to skip certificate verification.
impl quinn::rustls::client::danger::ServerCertVerifier for NoVerifier {
/// verify_server_cert always returns Ok, effectively skipping verification.
/// Verifies the server certificate.
fn verify_server_cert(
&self,
_end_entity: &CertificateDer<'_>,
@ -325,7 +331,7 @@ impl quinn::rustls::client::danger::ServerCertVerifier for NoVerifier {
Ok(quinn::rustls::client::danger::ServerCertVerified::assertion())
}
/// verify_tls12_signature verifies TLS 1.2 signatures using the provided algorithms.
/// Verifies a TLS 1.2 signature.
fn verify_tls12_signature(
&self,
message: &[u8],
@ -340,7 +346,7 @@ impl quinn::rustls::client::danger::ServerCertVerifier for NoVerifier {
)
}
/// verify_tls13_signature verifies TLS 1.3 signatures using the provided algorithms.
/// Verifies a TLS 1.3 signature.
fn verify_tls13_signature(
&self,
message: &[u8],
@ -355,7 +361,7 @@ impl quinn::rustls::client::danger::ServerCertVerifier for NoVerifier {
)
}
/// supported_verify_schemes returns the supported signature schemes.
/// Returns the supported signature schemes.
fn supported_verify_schemes(&self) -> Vec<quinn::rustls::SignatureScheme> {
self.0.signature_verification_algorithms.supported_schemes()
}

View File

@ -86,8 +86,7 @@ impl QUICServer {
pub async fn run(&mut self) -> ClientResult<()> {
let (certs, key) = generate_simple_self_signed_certs("d7y", vec!["d7y".into()])?;
let mut server_config = ServerConfig::with_single_cert(certs, key).map_err(|err| {
error!("failed to create server config: {}", err);
ClientError::QuinnRustlsError(err)
ClientError::Unknown(format!("failed to create server config: {}", err))
})?;
let mut transport = TransportConfig::default();
@ -158,6 +157,7 @@ impl QUICServerHandler {
});
}
Err(err) => {
// Downgrade common close cases to debug to reduce noisy logs.
match err {
quinn::ConnectionError::ApplicationClosed(_)
| quinn::ConnectionError::LocallyClosed => {
@ -230,7 +230,10 @@ impl QUICServerHandler {
self.write_response(response.freeze(), &mut writer).await?;
self.write_stream(&mut content_reader, &mut writer).await?;
writer.finish()?;
if let Err(err) = writer.finish() {
error!("failed to finish stream: {}", err);
}
}
Err(err) => {
// Collect upload piece failure metrics.
@ -239,7 +242,10 @@ impl QUICServerHandler {
let error_response: Bytes =
Vortex::Error(Header::new_error(err.len() as u32), err).into();
self.write_response(error_response, &mut writer).await?;
writer.finish()?;
if let Err(err) = writer.finish() {
error!("failed to finish stream: {}", err);
}
}
}
@ -292,7 +298,10 @@ impl QUICServerHandler {
self.write_response(response.freeze(), &mut writer).await?;
self.write_stream(&mut content_reader, &mut writer).await?;
writer.finish()?;
if let Err(err) = writer.finish() {
error!("failed to finish stream: {}", err);
}
}
Err(err) => {
// Collect upload piece failure metrics.
@ -301,7 +310,10 @@ impl QUICServerHandler {
let error_response: Bytes =
Vortex::Error(Header::new_error(err.len() as u32), err).into();
self.write_response(error_response, &mut writer).await?;
writer.finish()?;
if let Err(err) = writer.finish() {
error!("failed to finish stream: {}", err);
}
}
}

View File

@ -28,10 +28,8 @@ use dragonfly_client::tracing::init_command_tracing;
use dragonfly_client_backend::{hdfs, object_storage, BackendFactory, DirEntry};
use dragonfly_client_config::VersionValueParser;
use dragonfly_client_config::{self, dfdaemon, dfget};
use dragonfly_client_core::{
error::{ErrorType, OrErr},
Error, Result,
};
use dragonfly_client_core::error::{ErrorType, OrErr};
use dragonfly_client_core::{Error, Result};
use dragonfly_client_util::{fs::fallocate, http::header_vec_to_hashmap};
use glob::Pattern;
use indicatif::{MultiProgress, ProgressBar, ProgressState, ProgressStyle};

View File

@ -831,7 +831,9 @@ impl Piece {
.await?
}
("quic", Some(ip), _, Some(port)) => {
self.quic_downloader
let quic_downloader =
piece_downloader::DownloaderFactory::new("quic", self.config.clone())?.build();
quic_downloader
.download_persistent_cache_piece(
format!("{}:{}", ip, port).as_str(),
number,