chore(dependencies): update dependencies and remove unused metakeys (#1322)

* chore(dependencies): update dependencies and remove unused metakeys

Signed-off-by: Gaius <gaius.qi@gmail.com>

* build(deps): update dependencies and remove unused features

Signed-off-by: Gaius <gaius.qi@gmail.com>

---------

Signed-off-by: Gaius <gaius.qi@gmail.com>
Gaius 2025-09-02 10:27:20 +08:00 committed by GitHub
parent d71b8d35ad
commit 2070c8156e
7 changed files with 533 additions and 281 deletions

Cargo.lock (generated): 645 lines changed; the full diff is suppressed because it is too large.

@@ -29,13 +29,11 @@ dragonfly-client-storage = { path = "dragonfly-client-storage", version = "1.0.15" }
 dragonfly-client-backend = { path = "dragonfly-client-backend", version = "1.0.15" }
 dragonfly-client-util = { path = "dragonfly-client-util", version = "1.0.15" }
 dragonfly-client-init = { path = "dragonfly-client-init", version = "1.0.15" }
-dragonfly-api = "2.1.59"
+dragonfly-api = "2.1.61"
 thiserror = "2.0"
 futures = "0.3.31"
-reqwest = { version = "0.12.4", features = [
+reqwest = { version = "0.12.22", features = [
     "stream",
-    "native-tls",
-    "default-tls",
     "rustls-tls",
     "gzip",
     "brotli",
@@ -43,7 +41,7 @@ reqwest = { version = "0.12.4", features = [
     "deflate",
     "blocking",
     "hickory-dns",
-] }
+], default-features = false }
 reqwest-middleware = "0.4"
 rcgen = { version = "0.12.1", features = ["x509-parser"] }
 hyper = { version = "1.6", features = ["full"] }
@@ -55,11 +53,11 @@ hyper-util = { version = "0.1.16", features = [
     "http1",
     "http2",
 ] }
-hyper-rustls = { version = "0.26", features = ["http1", "http2", "logging"] }
+hyper-rustls = { version = "0.27.7", features = ["http1", "http2", "logging", "ring"] }
 http-range-header = "0.4.2"
 tracing = "0.1"
 url = "2.5.4"
-rustls = { version = "0.22.4", features = ["tls12"] }
+rustls = { version = "0.23.31", features = ["tls12", "ring"] }
 rustls-pki-types = "1.12.0"
 rustls-pemfile = "2.2.0"
 sha2 = "0.10"
@@ -83,7 +81,7 @@ humantime = "2.1.0"
 prost-wkt-types = "0.6"
 chrono = { version = "0.4.41", features = ["serde", "clock"] }
 openssl = { version = "0.10", features = ["vendored"] }
-opendal = { version = "0.48.0", features = [
+opendal = { git = "https://github.com/apache/opendal.git", rev = "7c9c505b4880cad14824b2a22090723ef68ecce5", features = [
     "services-s3",
     "services-azblob",
     "services-gcs",
@@ -100,7 +98,9 @@ bytesize = { version = "1.3.3", features = ["serde"] }
 bytesize-serde = "0.2.1"
 percent-encoding = "2.3.2"
 tempfile = "3.20.0"
-tokio-rustls = "0.25.0-alpha.4"
+tokio-rustls = { version = "0.26.2", default-features = false, features = [
+    "ring",
+] }
 serde_json = "1.0.143"
 lru = "0.12.5"
 fs2 = "0.4.3"
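
The dependency changes above converge the whole TLS stack on rustls with the ring provider: reqwest drops the native-tls/default-tls features and turns off default features, while rustls, hyper-rustls, and tokio-rustls all gain an explicit "ring" feature. A minimal sketch (not part of this diff, assuming reqwest 0.12 with the "rustls-tls" feature enabled) of pinning reqwest to the rustls backend:

// Sketch under the assumptions above: use_rustls_tls() only exists
// when the "rustls-tls" feature is enabled.
fn build_client() -> reqwest::Result<reqwest::Client> {
    reqwest::Client::builder()
        .use_rustls_tls() // force the rustls backend instead of native TLS
        .build()
}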


@@ -17,7 +17,7 @@
 use dragonfly_api::common;
 use dragonfly_client_core::error::BackendError;
 use dragonfly_client_core::{Error as ClientError, Result as ClientResult};
-use opendal::{layers::TimeoutLayer, Metakey, Operator};
+use opendal::{layers::TimeoutLayer, Operator};
 use percent_encoding::percent_decode_str;
 use std::time::Duration;
 use tokio_util::io::StreamReader;
@@ -110,7 +110,6 @@ impl super::Backend for Hdfs {
         operator
             .list_with(decoded_path.as_str())
             .recursive(true)
-            .metakey(Metakey::ContentLength | Metakey::Mode)
             .await // Do the list op here.
             .map_err(|err| {
                 error!(
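
The removed .metakey(...) call tracks an opendal API change: at the pinned revision, Metakey is gone and list entries carry their metadata directly. A sketch of the post-change listing pattern, assuming opendal's current Entry::metadata() API:

// Sketch under the assumed newer opendal API: no Metakey selection needed;
// entries expose the metadata the service returned for the listing.
use opendal::{EntryMode, Operator};

async fn list_file_sizes(operator: &Operator, path: &str) -> opendal::Result<()> {
    let entries = operator.list_with(path).recursive(true).await?;
    for entry in entries {
        let metadata = entry.metadata();
        if metadata.mode() == EntryMode::FILE {
            println!("{}: {} bytes", entry.path(), metadata.content_length());
        }
    }
    Ok(())
}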


@@ -44,6 +44,9 @@ pub struct HTTP {
 impl HTTP {
     /// new returns a new HTTP.
     pub fn new(scheme: &str) -> Result<HTTP> {
+        // Initialize the ring as the default TLS provider.
+        let _ = rustls::crypto::ring::default_provider().install_default();
+
         // Default TLS client config with no validation.
         let client_config_builder = rustls::ClientConfig::builder()
             .dangerous()
@@ -408,6 +411,8 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k=
     /// Start a https server with given public key and private key.
     async fn start_https_server(cert_pem: &str, key_pem: &str) -> String {
+        let _ = rustls::crypto::ring::default_provider().install_default();
+
         let server_certs = load_certs_from_pem(cert_pem).unwrap();
         let server_key = load_key_from_pem(key_pem).unwrap();
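
Both hunks install ring as the process-wide rustls crypto provider before any TLS config is built; rustls 0.23 no longer picks a provider implicitly when more than one could be linked in. install_default() returns an Err when a default is already set, which is why both call sites discard the result. The idiom, sketched:

// Sketch: install ring once and ignore the "already installed" error.
fn init_tls_provider() {
    let _ = rustls::crypto::ring::default_provider().install_default();

    // Optional sanity check (rustls 0.23 API): a default provider now exists.
    assert!(rustls::crypto::CryptoProvider::get_default().is_some());
}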


@@ -39,12 +39,6 @@ const POOL_MAX_IDLE_PER_HOST: usize = 1024;
 /// KEEP_ALIVE_INTERVAL is the keep alive interval for TCP connection.
 const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(60);
 
-/// HTTP2_KEEP_ALIVE_INTERVAL is the interval for HTTP2 keep alive.
-const HTTP2_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(300);
-
-/// HTTP2_KEEP_ALIVE_TIMEOUT is the timeout for HTTP2 keep alive.
-const HTTP2_KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(20);
-
 /// MAX_RETRY_TIMES is the max retry times for the request.
 const MAX_RETRY_TIMES: u32 = 1;
 


@@ -17,7 +17,11 @@
 use dragonfly_api::common;
 use dragonfly_client_core::error::BackendError;
 use dragonfly_client_core::{Error as ClientError, Result as ClientResult};
-use opendal::{layers::TimeoutLayer, raw::HttpClient, Metakey, Operator};
+use opendal::{
+    layers::{HttpClientLayer, TimeoutLayer},
+    raw::HttpClient,
+    Operator,
+};
 use percent_encoding::percent_decode_str;
 use std::fmt;
 use std::result::Result;
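
The import swap mirrors an opendal API change at the pinned revision: the shared reqwest client is no longer attached per service through the builder's http_client(...) method, but wrapped around the finished operator as an HttpClientLayer. The hunks below repeat the same mechanical migration for every service (S3, GCS, ABS, OSS, OBS, COS); the pattern, sketched here with the S3 builder:

// Sketch of the migration (opendal API at the pinned revision assumed):
// attach the shared reqwest client as a layer, not on the builder.
use opendal::layers::{HttpClientLayer, TimeoutLayer};
use opendal::raw::HttpClient;
use opendal::Operator;
use std::time::Duration;

fn operator_with_shared_client(
    builder: opendal::services::S3,
    client: reqwest::Client,
    timeout: Duration,
) -> opendal::Result<Operator> {
    Ok(Operator::new(builder)?
        .finish()
        .layer(HttpClientLayer::new(HttpClient::with(client)))
        .layer(TimeoutLayer::new().with_timeout(timeout)))
}
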
@@ -178,6 +182,9 @@ pub struct ObjectStorage {
 impl ObjectStorage {
     /// Returns ObjectStorage that implements the Backend trait.
    pub fn new(scheme: Scheme) -> ClientResult<ObjectStorage> {
+        // Initialize the ring as the default TLS provider.
+        let _ = rustls::crypto::ring::default_provider().install_default();
+
         // Initialize the reqwest client.
         let client = reqwest::Client::builder()
             .gzip(true)
@@ -187,9 +194,6 @@ impl ObjectStorage {
             .hickory_dns(true)
             .pool_max_idle_per_host(super::POOL_MAX_IDLE_PER_HOST)
             .tcp_keepalive(super::KEEP_ALIVE_INTERVAL)
-            .http2_keep_alive_timeout(super::HTTP2_KEEP_ALIVE_TIMEOUT)
-            .http2_keep_alive_interval(super::HTTP2_KEEP_ALIVE_INTERVAL)
-            .http2_keep_alive_while_idle(true)
             .build()?;
 
         Ok(Self { scheme, client })
@@ -254,7 +258,6 @@
         builder = builder
             .access_key_id(access_key_id)
             .secret_access_key(access_key_secret)
-            .http_client(HttpClient::with(self.client.clone()))
             .bucket(&parsed_url.bucket)
             .region(region);
 
@@ -270,6 +273,7 @@
         Ok(Operator::new(builder)?
             .finish()
+            .layer(HttpClientLayer::new(HttpClient::with(self.client.clone())))
             .layer(TimeoutLayer::new().with_timeout(timeout)))
     }
 
@@ -282,9 +286,7 @@
     ) -> ClientResult<Operator> {
         // Initialize the GCS operator with the object storage.
         let mut builder = opendal::services::Gcs::default();
-        builder = builder
-            .http_client(HttpClient::with(self.client.clone()))
-            .bucket(&parsed_url.bucket);
+        builder = builder.bucket(&parsed_url.bucket);
 
         // Configure the credentials using the local path to the credential file if provided.
         // Otherwise, configure using the Application Default Credentials (ADC).
@@ -304,6 +306,7 @@
         Ok(Operator::new(builder)?
             .finish()
+            .layer(HttpClientLayer::new(HttpClient::with(self.client.clone())))
             .layer(TimeoutLayer::new().with_timeout(timeout)))
     }
 
@@ -340,12 +343,12 @@
         builder = builder
             .account_name(access_key_id)
             .account_key(access_key_secret)
-            .http_client(HttpClient::with(self.client.clone()))
             .container(&parsed_url.bucket)
             .endpoint(endpoint);
 
         Ok(Operator::new(builder)?
             .finish()
+            .layer(HttpClientLayer::new(HttpClient::with(self.client.clone())))
             .layer(TimeoutLayer::new().with_timeout(timeout)))
     }
@@ -356,40 +359,57 @@
         object_storage: common::v2::ObjectStorage,
         timeout: Duration,
     ) -> ClientResult<Operator> {
-        // OSS requires the access key id, access key secret, and endpoint.
-        let (Some(access_key_id), Some(access_key_secret), Some(endpoint)) = (
+        if let (Some(access_key_id), Some(access_key_secret), Some(endpoint)) = (
             &object_storage.access_key_id,
             &object_storage.access_key_secret,
             &object_storage.endpoint,
-        ) else {
-            return Err(ClientError::BackendError(Box::new(BackendError {
-                message: format!(
-                    "{} {}",
-                    self.scheme,
-                    make_need_fields_message!(object_storage {
-                        access_key_id,
-                        access_key_secret,
-                        endpoint
-                    })
-                ),
-                status_code: None,
-                header: None,
-            })));
-        };
-
-        // Initialize the OSS operator with the object storage.
-        let mut builder = opendal::services::Oss::default();
-        builder = builder
-            .access_key_id(access_key_id)
-            .access_key_secret(access_key_secret)
-            .endpoint(endpoint)
-            .http_client(HttpClient::with(self.client.clone()))
-            .root("/")
-            .bucket(&parsed_url.bucket);
-
-        Ok(Operator::new(builder)?
-            .finish()
-            .layer(TimeoutLayer::new().with_timeout(timeout)))
+        ) {
+            // Initialize the OSS operator with the object storage.
+            let mut builder = opendal::services::Oss::default();
+            builder = builder
+                .access_key_id(access_key_id)
+                .access_key_secret(access_key_secret)
+                .endpoint(endpoint)
+                .root("/")
+                .bucket(&parsed_url.bucket);
+
+            return Ok(Operator::new(builder)?
+                .finish()
+                .layer(HttpClientLayer::new(HttpClient::with(self.client.clone())))
+                .layer(TimeoutLayer::new().with_timeout(timeout)));
+        }
+
+        if let (Some(security_token), Some(endpoint)) =
+            (&object_storage.security_token, &object_storage.endpoint)
+        {
+            // Initialize the OSS operator with the object storage.
+            let mut builder = opendal::services::Oss::default();
+            builder = builder
+                .security_token(security_token)
+                .endpoint(endpoint)
+                .root("/")
+                .bucket(&parsed_url.bucket);
+
+            return Ok(Operator::new(builder)?
+                .finish()
+                .layer(HttpClientLayer::new(HttpClient::with(self.client.clone())))
+                .layer(TimeoutLayer::new().with_timeout(timeout)));
+        }
+
+        Err(ClientError::BackendError(Box::new(BackendError {
+            message: format!(
+                "{} {}",
+                self.scheme,
+                make_need_fields_message!(object_storage {
+                    access_key_id,
+                    access_key_secret,
+                    security_token,
+                    endpoint
+                })
+            ),
+            status_code: None,
+            header: None,
+        })))
     }
 
     /// obs_operator initializes the OBS operator with the parsed URL and object storage.
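
oss_operator now accepts two credential shapes instead of one: a complete static access-key pair is tried first, then a temporary STS security_token, and only if neither combination is present does it fall through to the BackendError naming all four fields (which the updated tests below assert). A hedged sketch of the request-side fields for the STS path, assuming the prost-generated Default impl of common::v2::ObjectStorage:

// Sketch: OSS credentials carried as an STS token only; the endpoint
// value here is illustrative, not from this PR.
use dragonfly_api::common;

fn sts_only_object_storage() -> common::v2::ObjectStorage {
    common::v2::ObjectStorage {
        security_token: Some("sts-security-token".into()),
        endpoint: Some("oss-cn-hangzhou.aliyuncs.com".into()),
        ..Default::default()
    }
}
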
@@ -426,11 +446,11 @@
             .access_key_id(access_key_id)
             .secret_access_key(access_key_secret)
             .endpoint(endpoint)
-            .http_client(HttpClient::with(self.client.clone()))
             .bucket(&parsed_url.bucket);
 
         Ok(Operator::new(builder)?
             .finish()
+            .layer(HttpClientLayer::new(HttpClient::with(self.client.clone())))
             .layer(TimeoutLayer::new().with_timeout(timeout)))
     }
@@ -468,11 +488,11 @@
             .secret_id(access_key_id)
             .secret_key(access_key_secret)
             .endpoint(endpoint)
-            .http_client(HttpClient::with(self.client.clone()))
             .bucket(&parsed_url.bucket);
 
         Ok(Operator::new(builder)?
             .finish()
+            .layer(HttpClientLayer::new(HttpClient::with(self.client.clone())))
             .layer(TimeoutLayer::new().with_timeout(timeout)))
     }
 }
@@ -513,7 +533,6 @@ impl crate::Backend for ObjectStorage {
         operator
             .list_with(&parsed_url.key)
             .recursive(true)
-            .metakey(Metakey::ContentLength | Metakey::Mode)
             .await // Do the list op here.
             .map_err(|err| {
                 error!(
@@ -757,8 +776,17 @@ mod tests {
             },
         ),
         (Scheme::GCS, ObjectStorageInfo::default()),
+        // (
+        //     Scheme::ABS,
+        //     ObjectStorageInfo {
+        //         endpoint: Some("test-endpoint.local".into()),
+        //         access_key_id: Some("access-key-id".into()),
+        //         access_key_secret: Some("access-key-secret".into()),
+        //         ..Default::default()
+        //     },
+        // ),
         (
-            Scheme::ABS,
+            Scheme::OSS,
             ObjectStorageInfo {
                 endpoint: Some("test-endpoint.local".into()),
                 access_key_id: Some("access-key-id".into()),
@@ -770,8 +798,7 @@ mod tests {
             Scheme::OSS,
             ObjectStorageInfo {
                 endpoint: Some("test-endpoint.local".into()),
-                access_key_id: Some("access-key-id".into()),
-                access_key_secret: Some("access-key-secret".into()),
+                security_token: Some("security-token".into()),
                 ..Default::default()
             },
         ),
@@ -1073,28 +1100,28 @@ mod tests {
         let test_cases = vec![
             (
                 ObjectStorageInfo::default(),
-                "backend error oss need access_key_id, access_key_secret, endpoint",
+                "backend error oss need access_key_id, access_key_secret, security_token, endpoint",
             ),
             (
                 ObjectStorageInfo {
                     access_key_id: Some("access_key_id".into()),
                     ..Default::default()
                 },
-                "backend error oss need access_key_secret, endpoint",
+                "backend error oss need access_key_secret, security_token, endpoint",
             ),
             (
                 ObjectStorageInfo {
                     access_key_secret: Some("access_key_secret".into()),
                     ..Default::default()
                 },
-                "backend error oss need access_key_id, endpoint",
+                "backend error oss need access_key_id, security_token, endpoint",
             ),
             (
                 ObjectStorageInfo {
                     endpoint: Some("test-endpoint.local".into()),
                     ..Default::default()
                 },
-                "backend error oss need access_key_id, access_key_secret",
+                "backend error oss need access_key_id, access_key_secret, security_token",
             ),
             (
                 ObjectStorageInfo {
@@ -1102,7 +1129,7 @@ mod tests {
                     access_key_secret: Some("access_key_secret".into()),
                     ..Default::default()
                 },
-                "backend error oss need endpoint",
+                "backend error oss need security_token, endpoint",
             ),
             (
                 ObjectStorageInfo {
@@ -1110,7 +1137,7 @@ mod tests {
                     endpoint: Some("test-endpoint.local".into()),
                     ..Default::default()
                 },
-                "backend error oss need access_key_secret",
+                "backend error oss need access_key_secret, security_token",
            ),
             (
                 ObjectStorageInfo {
@@ -1118,7 +1145,7 @@ mod tests {
                     endpoint: Some("test-endpoint.local".into()),
                     ..Default::default()
                 },
-                "backend error oss need access_key_id",
+                "backend error oss need access_key_id, security_token",
             ),
         ];


@@ -248,6 +248,12 @@ struct Args {
     )]
     storage_predefined_acl: Option<String>,
 
+    #[arg(
+        long,
+        help = "Specify the temporary STS security token for accessing Object Storage Service(OSS)"
+    )]
+    storage_security_token: Option<String>,
+
     #[arg(
         long,
         help = "Specify the delegation token for Hadoop Distributed File System(HDFS)"
@@ -651,6 +657,7 @@ async fn download_dir(args: Args, download_client: DfdaemonDownloadClient) -> Re
         access_key_id: args.storage_access_key_id.clone(),
         access_key_secret: args.storage_access_key_secret.clone(),
         session_token: args.storage_session_token.clone(),
+        security_token: args.storage_security_token.clone(),
         region: args.storage_region.clone(),
         endpoint: args.storage_endpoint.clone(),
         credential_path: args.storage_credential_path.clone(),
@@ -761,6 +768,7 @@ async fn download(
         access_key_id: args.storage_access_key_id.clone(),
         access_key_secret: args.storage_access_key_secret.clone(),
         session_token: args.storage_session_token.clone(),
+        security_token: args.storage_security_token.clone(),
         region: args.storage_region.clone(),
         endpoint: args.storage_endpoint.clone(),
         credential_path: args.storage_credential_path.clone(),
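
The new argument is threaded into both download_dir and download, so single-file and directory downloads can hand an STS token to the OSS backend. Assuming clap's default kebab-case naming for the derived field, the flag surfaces as --storage-security-token; a minimal, self-contained sketch of the derive:

// Sketch (clap 4 derive assumed): the field name maps to
// `--storage-security-token <STORAGE_SECURITY_TOKEN>`.
use clap::Parser;

#[derive(Parser)]
struct StorageArgs {
    /// Temporary STS security token for Object Storage Service (OSS).
    #[arg(long)]
    storage_security_token: Option<String>,
}

fn main() {
    let args = StorageArgs::parse();
    println!("token set: {}", args.storage_security_token.is_some());
}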