fix(dfget): return correct entries and add tests for entry retrieval functions (#1373)

* feat(grpc): add new DfdaemonDownloadClient creation method

Signed-off-by: Gaius <gaius.qi@gmail.com>

* test(dfget): add tests for entry retrieval functions

Signed-off-by: Gaius <gaius.qi@gmail.com>

---------

Signed-off-by: Gaius <gaius.qi@gmail.com>
Signed-off-by: LXDgit2018 <1289504283@qq.com>
Authored by Gaius on 2025-09-24 12:05:38 +08:00; committed by LXDgit2018
parent c58a9ec84f
commit 7a6768dcc5
6 changed files with 380 additions and 9 deletions
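
The headline change adds DfdaemonDownloadClient::new, a TCP/HTTP counterpart to the existing unix-socket constructor, and mocktail-based gRPC mocks to test dfget's entry retrieval. As a minimal sketch of how the new constructor is meant to be called, mirroring the tests below (the address is illustrative and assumes a dfdaemon download server is listening there):

    use std::sync::Arc;

    // Illustrative only: assumes a dfdaemon download server at this address.
    async fn connect() -> ClientResult<DfdaemonDownloadClient> {
        let config = Arc::new(dfdaemon::Config::default());
        DfdaemonDownloadClient::new(config, "http://127.0.0.1:4000".to_string()).await
    }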

Cargo.lock (generated)
View File

@@ -1050,6 +1050,7 @@ dependencies = [
  "lazy_static",
  "leaky-bucket",
  "local-ip-address",
+ "mocktail",
  "openssl",
  "opentelemetry",
  "opentelemetry-otlp",

View File

@@ -117,6 +117,7 @@ hostname = "^0.4"
 tonic-health = "0.12.3"
 hashring = "0.3.6"
 reqwest-tracing = "0.5"
+mocktail = "0.3.0"
 
 [profile.release]
 opt-level = 3

View File

@@ -52,4 +52,4 @@ quinn = "0.11.9"
 
 [dev-dependencies]
 tempfile.workspace = true
-mocktail = "0.3.0"
+mocktail.workspace = true

View File

@@ -93,6 +93,7 @@ console-subscriber = "0.4.1"
 
 [dev-dependencies]
 tempfile.workspace = true
+mocktail.workspace = true
 
 [target.'cfg(not(target_env = "msvc"))'.dependencies]
 tikv-jemallocator = { version = "0.5.4", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms", "background_threads"] }

View File

@@ -712,7 +712,7 @@ async fn download_dir(args: Args, download_client: DfdaemonDownloadClient) -> Re
 
 /// Get all entries in the directory with include files filter.
 async fn get_all_entries(
-    url: &Url,
+    base_url: &Url,
     header: Option<Vec<String>>,
     include_files: Option<Vec<String>>,
     object_storage: Option<ObjectStorage>,
@@ -723,7 +723,7 @@ async fn get_all_entries(
         Some(files) => {
             let mut urls = HashSet::with_capacity(files.len());
             for file in files {
-                let url = url.join(&file).or_err(ErrorType::ParseError)?;
+                let url = base_url.join(&file).or_err(ErrorType::ParseError)?;
                 urls.insert(url);
             }
@@ -731,7 +731,7 @@ async fn get_all_entries(
         }
         None => {
             let mut urls = HashSet::with_capacity(1);
-            urls.insert(url.clone());
+            urls.insert(base_url.clone());
             urls
         }
     };
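
For reference, this branch relies on Url::join resolving each include-file path relative to the directory URL, which is why base_url keeps its trailing slash. A small standalone illustration with the url crate (values are illustrative):

    use url::Url;

    fn main() {
        let base = Url::parse("http://example.com/root/").unwrap();
        // Relative include-file paths resolve under the base directory;
        // without the trailing slash, "root" itself would be replaced.
        let file = base.join("dir1/file1.txt").unwrap();
        assert_eq!(file.as_str(), "http://example.com/root/dir1/file1.txt");
    }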
@@ -750,7 +750,7 @@ async fn get_all_entries(
                 });
 
                 let parent = url.join(".").or_err(ErrorType::ParseError)?;
-                if parent.path() != "/" {
+                if parent.path() != base_url.path() {
                     entries.insert(DirEntry {
                         url: parent.to_string(),
                         content_length: 0,
@@ -784,7 +784,7 @@ async fn get_all_entries(
                     .join(".")
                     .or_err(ErrorType::ParseError)?;
-                if parent.path() != "/" {
+                if parent.path() != base_url.path() {
                     dir_entries.push(DirEntry {
                         url: parent.to_string(),
                         content_length: 0,
@@ -796,9 +796,10 @@ async fn get_all_entries(
 
         let mut seen = HashSet::new();
         entries.retain(|entry| seen.insert(entry.clone()));
         entries.extend(dir_entries.clone());
-        info!("add entries {:?} by dir url: {}", dir_entries, url);
+        info!("add entries {:?} by dir url: {}", entries, url);
     }
 
+    info!("get all entries: {:?}", entries);
     Ok(entries.into_iter().collect())
 }
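
The fix replaces the hard-coded "/" comparison with base_url.path(): parent directories are still synthesized for entries below the requested root, but the requested directory itself is no longer wrongly emitted as an entry. A short sketch of the join(".") parent resolution this relies on (values are illustrative):

    use url::Url;

    fn main() {
        let base = Url::parse("http://example.com/root/").unwrap();
        let entry = Url::parse("http://example.com/root/dir1/file1.txt").unwrap();

        // join(".") resolves to the entry's parent directory.
        let parent = entry.join(".").unwrap();
        assert_eq!(parent.as_str(), "http://example.com/root/dir1/");

        // New check: skip the requested base directory itself, not just "/".
        assert_ne!(parent.path(), base.path());
    }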
@@ -1008,7 +1009,6 @@ async fn get_entries(
     download_client: DfdaemonDownloadClient,
 ) -> Result<Vec<DirEntry>> {
     info!("list task entries: {:?}", url);
-    // List task entries.
     let response = download_client
         .list_task_entries(ListTaskEntriesRequest {
             task_id: Uuid::new_v4().to_string(),
@@ -1025,6 +1025,7 @@ async fn get_entries(
             error!("list task entries failed: {}", err);
         })?;
 
+    info!("list task entries response: {:?}", response.entries);
     Ok(response
         .entries
         .into_iter()
@@ -1184,6 +1185,9 @@ fn is_normal_relative_path(path: &str) -> bool {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use dragonfly_api::dfdaemon::v2::{Entry, ListTaskEntriesResponse};
+    use mocktail::prelude::*;
+    use std::collections::HashMap;
     use tempfile::tempdir;
 
     #[test]
@@ -1400,4 +1404,313 @@ mod tests {
         let result = make_output_by_entry(url, output, entry);
         assert!(result.is_err());
     }
+
+    #[tokio::test]
+    async fn should_get_empty_entries() {
+        let mut mocks = MockSet::new();
+        mocks.mock(|when, then| {
+            when.path("/dfdaemon.v2.DfdaemonDownload/ListTaskEntries");
+            then.pb(ListTaskEntriesResponse {
+                content_length: 0,
+                response_header: HashMap::new(),
+                status_code: None,
+                entries: vec![],
+            });
+        });
+
+        let server = MockServer::new_grpc("dfdaemon.v2.DfdaemonDownload").with_mocks(mocks);
+        server.start().await.unwrap();
+
+        let dfdaemon_download_client = DfdaemonDownloadClient::new(
+            Arc::new(dfdaemon::Config::default()),
+            format!("http://0.0.0.0:{}", server.port().unwrap()),
+        )
+        .await
+        .unwrap();
+
+        let entries = get_all_entries(
+            &Url::parse("http://example.com/root/").unwrap(),
+            None,
+            None,
+            None,
+            None,
+            dfdaemon_download_client,
+        )
+        .await
+        .unwrap();
+        assert_eq!(entries.len(), 0);
+    }
+
+    #[tokio::test]
+    async fn should_get_all_entries_in_subdir() {
+        let mut mocks = MockSet::new();
+        mocks.mock(|when, then| {
+            when.path("/dfdaemon.v2.DfdaemonDownload/ListTaskEntries");
+            then.pb(ListTaskEntriesResponse {
+                content_length: 0,
+                response_header: HashMap::new(),
+                status_code: None,
+                entries: vec![
+                    Entry {
+                        url: "http://example.com/root/dir1/file1.txt".to_string(),
+                        content_length: 100,
+                        is_dir: false,
+                    },
+                    Entry {
+                        url: "http://example.com/root/dir1/file2.txt".to_string(),
+                        content_length: 100,
+                        is_dir: false,
+                    },
+                    Entry {
+                        url: "http://example.com/root/dir2/file1.txt".to_string(),
+                        content_length: 200,
+                        is_dir: false,
+                    },
+                    Entry {
+                        url: "http://example.com/root/dir2/file2.txt".to_string(),
+                        content_length: 200,
+                        is_dir: false,
+                    },
+                ],
+            });
+        });
+
+        let server = MockServer::new_grpc("dfdaemon.v2.DfdaemonDownload").with_mocks(mocks);
+        server.start().await.unwrap();
+
+        let dfdaemon_download_client = DfdaemonDownloadClient::new(
+            Arc::new(dfdaemon::Config::default()),
+            format!("http://0.0.0.0:{}", server.port().unwrap()),
+        )
+        .await
+        .unwrap();
+
+        let entries = get_all_entries(
+            &Url::parse("http://example.com/root/").unwrap(),
+            None,
+            None,
+            None,
+            None,
+            dfdaemon_download_client,
+        )
+        .await
+        .unwrap();
+        assert_eq!(
+            entries.into_iter().collect::<HashSet<_>>(),
+            vec![
+                DirEntry {
+                    url: "http://example.com/root/dir1/file1.txt".to_string(),
+                    content_length: 100,
+                    is_dir: false,
+                },
+                DirEntry {
+                    url: "http://example.com/root/dir1/file2.txt".to_string(),
+                    content_length: 100,
+                    is_dir: false,
+                },
+                DirEntry {
+                    url: "http://example.com/root/dir1/".to_string(),
+                    content_length: 0,
+                    is_dir: true,
+                },
+                DirEntry {
+                    url: "http://example.com/root/dir2/file1.txt".to_string(),
+                    content_length: 200,
+                    is_dir: false,
+                },
+                DirEntry {
+                    url: "http://example.com/root/dir2/file2.txt".to_string(),
+                    content_length: 200,
+                    is_dir: false,
+                },
+                DirEntry {
+                    url: "http://example.com/root/dir2/".to_string(),
+                    content_length: 0,
+                    is_dir: true,
+                },
+            ]
+            .into_iter()
+            .collect::<HashSet<_>>()
+        );
+    }
+
+    #[tokio::test]
+    async fn should_get_all_entries_in_rootdir() {
+        let mut mocks = MockSet::new();
+        mocks.mock(|when, then| {
+            when.path("/dfdaemon.v2.DfdaemonDownload/ListTaskEntries");
+            then.pb(ListTaskEntriesResponse {
+                content_length: 0,
+                response_header: HashMap::new(),
+                status_code: None,
+                entries: vec![
+                    Entry {
+                        url: "http://example.com/root/file1.txt".to_string(),
+                        content_length: 100,
+                        is_dir: false,
+                    },
+                    Entry {
+                        url: "http://example.com/root/file2.txt".to_string(),
+                        content_length: 200,
+                        is_dir: false,
+                    },
+                ],
+            });
+        });
+
+        let server = MockServer::new_grpc("dfdaemon.v2.DfdaemonDownload").with_mocks(mocks);
+        server.start().await.unwrap();
+
+        let dfdaemon_download_client = DfdaemonDownloadClient::new(
+            Arc::new(dfdaemon::Config::default()),
+            format!("http://0.0.0.0:{}", server.port().unwrap()),
+        )
+        .await
+        .unwrap();
+
+        let entries = get_all_entries(
+            &Url::parse("http://example.com/root/").unwrap(),
+            None,
+            None,
+            None,
+            None,
+            dfdaemon_download_client,
+        )
+        .await
+        .unwrap();
+        assert_eq!(
+            entries.into_iter().collect::<HashSet<_>>(),
+            vec![
+                DirEntry {
+                    url: "http://example.com/root/file1.txt".to_string(),
+                    content_length: 100,
+                    is_dir: false,
+                },
+                DirEntry {
+                    url: "http://example.com/root/file2.txt".to_string(),
+                    content_length: 200,
+                    is_dir: false,
+                },
+            ]
+            .into_iter()
+            .collect::<HashSet<_>>()
+        );
+    }
+
+    #[tokio::test]
+    async fn should_get_all_entries_in_rootdir_and_subdir() {
+        let mut mocks = MockSet::new();
+        mocks.mock(|when, then| {
+            when.path("/dfdaemon.v2.DfdaemonDownload/ListTaskEntries");
+            then.pb(ListTaskEntriesResponse {
+                content_length: 0,
+                response_header: HashMap::new(),
+                status_code: None,
+                entries: vec![
+                    Entry {
+                        url: "http://example.com/root/file1.txt".to_string(),
+                        content_length: 100,
+                        is_dir: false,
+                    },
+                    Entry {
+                        url: "http://example.com/root/file2.txt".to_string(),
+                        content_length: 200,
+                        is_dir: false,
+                    },
+                    Entry {
+                        url: "http://example.com/root/dir1/file1.txt".to_string(),
+                        content_length: 100,
+                        is_dir: false,
+                    },
+                    Entry {
+                        url: "http://example.com/root/dir1/file2.txt".to_string(),
+                        content_length: 100,
+                        is_dir: false,
+                    },
+                    Entry {
+                        url: "http://example.com/root/dir2/file1.txt".to_string(),
+                        content_length: 200,
+                        is_dir: false,
+                    },
+                    Entry {
+                        url: "http://example.com/root/dir2/file2.txt".to_string(),
+                        content_length: 200,
+                        is_dir: false,
+                    },
+                ],
+            });
+        });
+
+        let server = MockServer::new_grpc("dfdaemon.v2.DfdaemonDownload").with_mocks(mocks);
+        server.start().await.unwrap();
+
+        let dfdaemon_download_client = DfdaemonDownloadClient::new(
+            Arc::new(dfdaemon::Config::default()),
+            format!("http://0.0.0.0:{}", server.port().unwrap()),
+        )
+        .await
+        .unwrap();
+
+        let entries = get_all_entries(
+            &Url::parse("http://example.com/root/").unwrap(),
+            None,
+            None,
+            None,
+            None,
+            dfdaemon_download_client,
+        )
+        .await
+        .unwrap();
+        assert_eq!(
+            entries.into_iter().collect::<HashSet<_>>(),
+            vec![
+                DirEntry {
+                    url: "http://example.com/root/file1.txt".to_string(),
+                    content_length: 100,
+                    is_dir: false,
+                },
+                DirEntry {
+                    url: "http://example.com/root/file2.txt".to_string(),
+                    content_length: 200,
+                    is_dir: false,
+                },
+                DirEntry {
+                    url: "http://example.com/root/dir1/file1.txt".to_string(),
+                    content_length: 100,
+                    is_dir: false,
+                },
+                DirEntry {
+                    url: "http://example.com/root/dir1/file2.txt".to_string(),
+                    content_length: 100,
+                    is_dir: false,
+                },
+                DirEntry {
+                    url: "http://example.com/root/dir1/".to_string(),
+                    content_length: 0,
+                    is_dir: true,
+                },
+                DirEntry {
+                    url: "http://example.com/root/dir2/file1.txt".to_string(),
+                    content_length: 200,
+                    is_dir: false,
+                },
+                DirEntry {
+                    url: "http://example.com/root/dir2/file2.txt".to_string(),
+                    content_length: 200,
+                    is_dir: false,
+                },
+                DirEntry {
+                    url: "http://example.com/root/dir2/".to_string(),
+                    content_length: 0,
+                    is_dir: true,
+                },
+            ]
+            .into_iter()
+            .collect::<HashSet<_>>()
+        );
+    }
 }

View File

@@ -70,6 +70,7 @@ use tonic::{
 use tower::{service_fn, ServiceBuilder};
 use tracing::{error, info, instrument, Instrument, Span};
 use tracing_opentelemetry::OpenTelemetrySpanExt;
+use url::Url;
 
 use super::interceptor::{ExtractTracingInterceptor, InjectTracingInterceptor};
@@ -1343,7 +1344,61 @@ pub struct DfdaemonDownloadClient {
 
 /// DfdaemonDownloadClient implements the grpc client of the dfdaemon download.
 impl DfdaemonDownloadClient {
-    /// new_unix creates a new DfdaemonDownloadClient with unix domain socket.
+    /// Creates a new DfdaemonDownloadClient.
+    pub async fn new(config: Arc<Config>, addr: String) -> ClientResult<Self> {
+        let domain_name = Url::parse(addr.as_str())?
+            .host_str()
+            .ok_or(ClientError::InvalidParameter)
+            .inspect_err(|_err| {
+                error!("invalid address: {}", addr);
+            })?
+            .to_string();
+
+        let channel = match config
+            .upload
+            .client
+            .load_client_tls_config(domain_name.as_str())
+            .await?
+        {
+            Some(client_tls_config) => {
+                Channel::from_static(Box::leak(addr.clone().into_boxed_str()))
+                    .tls_config(client_tls_config)?
+                    .buffer_size(super::BUFFER_SIZE)
+                    .connect_timeout(super::CONNECT_TIMEOUT)
+                    .timeout(super::REQUEST_TIMEOUT)
+                    .tcp_keepalive(Some(super::TCP_KEEPALIVE))
+                    .http2_keep_alive_interval(super::HTTP2_KEEP_ALIVE_INTERVAL)
+                    .keep_alive_timeout(super::HTTP2_KEEP_ALIVE_TIMEOUT)
+                    .connect()
+                    .await
+                    .inspect_err(|err| {
+                        error!("connect to {} failed: {}", addr, err);
+                    })
+                    .or_err(ErrorType::ConnectError)?
+            }
+            None => Channel::from_static(Box::leak(addr.clone().into_boxed_str()))
+                .buffer_size(super::BUFFER_SIZE)
+                .connect_timeout(super::CONNECT_TIMEOUT)
+                .timeout(super::REQUEST_TIMEOUT)
+                .tcp_keepalive(Some(super::TCP_KEEPALIVE))
+                .http2_keep_alive_interval(super::HTTP2_KEEP_ALIVE_INTERVAL)
+                .keep_alive_timeout(super::HTTP2_KEEP_ALIVE_TIMEOUT)
+                .connect()
+                .await
+                .inspect_err(|err| {
+                    error!("connect to {} failed: {}", addr, err);
+                })
+                .or_err(ErrorType::ConnectError)?,
+        };
+
+        let client =
+            DfdaemonDownloadGRPCClient::with_interceptor(channel, InjectTracingInterceptor)
+                .max_decoding_message_size(usize::MAX)
+                .max_encoding_message_size(usize::MAX);
+        Ok(Self { client })
+    }
+
+    /// Creates a new DfdaemonDownloadClient with unix domain socket.
     pub async fn new_unix(socket_path: PathBuf) -> ClientResult<Self> {
         // Ignore the uri because it is not used.
         let channel = Endpoint::try_from("http://[::]:50051")