diff --git a/Cargo.lock b/Cargo.lock index dbd677b5..f15a26c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1050,6 +1050,7 @@ dependencies = [ "lazy_static", "leaky-bucket", "local-ip-address", + "mocktail", "openssl", "opentelemetry", "opentelemetry-otlp", diff --git a/Cargo.toml b/Cargo.toml index b441eb2c..d3b39046 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -117,6 +117,7 @@ hostname = "^0.4" tonic-health = "0.12.3" hashring = "0.3.6" reqwest-tracing = "0.5" +mocktail = "0.3.0" [profile.release] opt-level = 3 diff --git a/dragonfly-client-util/Cargo.toml b/dragonfly-client-util/Cargo.toml index 0a85d927..e2ec5c53 100644 --- a/dragonfly-client-util/Cargo.toml +++ b/dragonfly-client-util/Cargo.toml @@ -52,4 +52,4 @@ quinn = "0.11.9" [dev-dependencies] tempfile.workspace = true -mocktail = "0.3.0" +mocktail.workspace = true diff --git a/dragonfly-client/Cargo.toml b/dragonfly-client/Cargo.toml index a360ffbf..c1f82e53 100644 --- a/dragonfly-client/Cargo.toml +++ b/dragonfly-client/Cargo.toml @@ -93,6 +93,7 @@ console-subscriber = "0.4.1" [dev-dependencies] tempfile.workspace = true +mocktail.workspace = true [target.'cfg(not(target_env = "msvc"))'.dependencies] tikv-jemallocator = { version = "0.5.4", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms", "background_threads"] } diff --git a/dragonfly-client/src/bin/dfget/main.rs b/dragonfly-client/src/bin/dfget/main.rs index 69264699..2a6a2575 100644 --- a/dragonfly-client/src/bin/dfget/main.rs +++ b/dragonfly-client/src/bin/dfget/main.rs @@ -712,7 +712,7 @@ async fn download_dir(args: Args, download_client: DfdaemonDownloadClient) -> Re /// Get all entries in the directory with include files filter. async fn get_all_entries( - url: &Url, + base_url: &Url, header: Option>, include_files: Option>, object_storage: Option, @@ -723,7 +723,7 @@ async fn get_all_entries( Some(files) => { let mut urls = HashSet::with_capacity(files.len()); for file in files { - let url = url.join(&file).or_err(ErrorType::ParseError)?; + let url = base_url.join(&file).or_err(ErrorType::ParseError)?; urls.insert(url); } @@ -731,7 +731,7 @@ async fn get_all_entries( } None => { let mut urls = HashSet::with_capacity(1); - urls.insert(url.clone()); + urls.insert(base_url.clone()); urls } }; @@ -750,7 +750,7 @@ async fn get_all_entries( }); let parent = url.join(".").or_err(ErrorType::ParseError)?; - if parent.path() != "/" { + if parent.path() != base_url.path() { entries.insert(DirEntry { url: parent.to_string(), content_length: 0, @@ -784,7 +784,7 @@ async fn get_all_entries( .join(".") .or_err(ErrorType::ParseError)?; - if parent.path() != "/" { + if parent.path() != base_url.path() { dir_entries.push(DirEntry { url: parent.to_string(), content_length: 0, @@ -796,9 +796,10 @@ async fn get_all_entries( let mut seen = HashSet::new(); entries.retain(|entry| seen.insert(entry.clone())); entries.extend(dir_entries.clone()); - info!("add entries {:?} by dir url: {}", dir_entries, url); + info!("add entries {:?} by dir url: {}", entries, url); } + info!("get all entries: {:?}", entries); Ok(entries.into_iter().collect()) } @@ -1008,7 +1009,6 @@ async fn get_entries( download_client: DfdaemonDownloadClient, ) -> Result> { info!("list task entries: {:?}", url); - // List task entries. let response = download_client .list_task_entries(ListTaskEntriesRequest { task_id: Uuid::new_v4().to_string(), @@ -1025,6 +1025,7 @@ async fn get_entries( error!("list task entries failed: {}", err); })?; + info!("list task entries response: {:?}", response.entries); Ok(response .entries .into_iter() @@ -1184,6 +1185,9 @@ fn is_normal_relative_path(path: &str) -> bool { #[cfg(test)] mod tests { use super::*; + use dragonfly_api::dfdaemon::v2::{Entry, ListTaskEntriesResponse}; + use mocktail::prelude::*; + use std::collections::HashMap; use tempfile::tempdir; #[test] @@ -1400,4 +1404,313 @@ mod tests { let result = make_output_by_entry(url, output, entry); assert!(result.is_err()); } + + #[tokio::test] + async fn should_get_empty_entries() { + let mut mocks = MockSet::new(); + mocks.mock(|when, then| { + when.path("/dfdaemon.v2.DfdaemonDownload/ListTaskEntries"); + then.pb(ListTaskEntriesResponse { + content_length: 0, + response_header: HashMap::new(), + status_code: None, + entries: vec![], + }); + }); + + let server = MockServer::new_grpc("dfdaemon.v2.DfdaemonDownload").with_mocks(mocks); + server.start().await.unwrap(); + + let dfdaemon_download_client = DfdaemonDownloadClient::new( + Arc::new(dfdaemon::Config::default()), + format!("http://0.0.0.0:{}", server.port().unwrap()), + ) + .await + .unwrap(); + + let entries = get_all_entries( + &Url::parse("http://example.com/root/").unwrap(), + None, + None, + None, + None, + dfdaemon_download_client, + ) + .await + .unwrap(); + + assert_eq!(entries.len(), 0); + } + + #[tokio::test] + async fn should_get_all_entries_in_subdir() { + let mut mocks = MockSet::new(); + mocks.mock(|when, then| { + when.path("/dfdaemon.v2.DfdaemonDownload/ListTaskEntries"); + then.pb(ListTaskEntriesResponse { + content_length: 0, + response_header: HashMap::new(), + status_code: None, + entries: vec![ + Entry { + url: "http://example.com/root/dir1/file1.txt".to_string(), + content_length: 100, + is_dir: false, + }, + Entry { + url: "http://example.com/root/dir1/file2.txt".to_string(), + content_length: 100, + is_dir: false, + }, + Entry { + url: "http://example.com/root/dir2/file1.txt".to_string(), + content_length: 200, + is_dir: false, + }, + Entry { + url: "http://example.com/root/dir2/file2.txt".to_string(), + content_length: 200, + is_dir: false, + }, + ], + }); + }); + + let server = MockServer::new_grpc("dfdaemon.v2.DfdaemonDownload").with_mocks(mocks); + server.start().await.unwrap(); + + let dfdaemon_download_client = DfdaemonDownloadClient::new( + Arc::new(dfdaemon::Config::default()), + format!("http://0.0.0.0:{}", server.port().unwrap()), + ) + .await + .unwrap(); + + let entries = get_all_entries( + &Url::parse("http://example.com/root/").unwrap(), + None, + None, + None, + None, + dfdaemon_download_client, + ) + .await + .unwrap(); + + assert_eq!( + entries.into_iter().collect::>(), + vec![ + DirEntry { + url: "http://example.com/root/dir1/file1.txt".to_string(), + content_length: 100, + is_dir: false, + }, + DirEntry { + url: "http://example.com/root/dir1/file2.txt".to_string(), + content_length: 100, + is_dir: false, + }, + DirEntry { + url: "http://example.com/root/dir1/".to_string(), + content_length: 0, + is_dir: true, + }, + DirEntry { + url: "http://example.com/root/dir2/file1.txt".to_string(), + content_length: 200, + is_dir: false, + }, + DirEntry { + url: "http://example.com/root/dir2/file2.txt".to_string(), + content_length: 200, + is_dir: false, + }, + DirEntry { + url: "http://example.com/root/dir2/".to_string(), + content_length: 0, + is_dir: true, + }, + ] + .into_iter() + .collect::>() + ); + } + + #[tokio::test] + async fn should_get_all_entries_in_rootdir() { + let mut mocks = MockSet::new(); + mocks.mock(|when, then| { + when.path("/dfdaemon.v2.DfdaemonDownload/ListTaskEntries"); + then.pb(ListTaskEntriesResponse { + content_length: 0, + response_header: HashMap::new(), + status_code: None, + entries: vec![ + Entry { + url: "http://example.com/root/file1.txt".to_string(), + content_length: 100, + is_dir: false, + }, + Entry { + url: "http://example.com/root/file2.txt".to_string(), + content_length: 200, + is_dir: false, + }, + ], + }); + }); + + let server = MockServer::new_grpc("dfdaemon.v2.DfdaemonDownload").with_mocks(mocks); + server.start().await.unwrap(); + + let dfdaemon_download_client = DfdaemonDownloadClient::new( + Arc::new(dfdaemon::Config::default()), + format!("http://0.0.0.0:{}", server.port().unwrap()), + ) + .await + .unwrap(); + + let entries = get_all_entries( + &Url::parse("http://example.com/root/").unwrap(), + None, + None, + None, + None, + dfdaemon_download_client, + ) + .await + .unwrap(); + + assert_eq!( + entries.into_iter().collect::>(), + vec![ + DirEntry { + url: "http://example.com/root/file1.txt".to_string(), + content_length: 100, + is_dir: false, + }, + DirEntry { + url: "http://example.com/root/file2.txt".to_string(), + content_length: 200, + is_dir: false, + }, + ] + .into_iter() + .collect::>() + ); + } + + #[tokio::test] + async fn should_get_all_entries_in_rootdir_and_subdir() { + let mut mocks = MockSet::new(); + mocks.mock(|when, then| { + when.path("/dfdaemon.v2.DfdaemonDownload/ListTaskEntries"); + then.pb(ListTaskEntriesResponse { + content_length: 0, + response_header: HashMap::new(), + status_code: None, + entries: vec![ + Entry { + url: "http://example.com/root/file1.txt".to_string(), + content_length: 100, + is_dir: false, + }, + Entry { + url: "http://example.com/root/file2.txt".to_string(), + content_length: 200, + is_dir: false, + }, + Entry { + url: "http://example.com/root/dir1/file1.txt".to_string(), + content_length: 100, + is_dir: false, + }, + Entry { + url: "http://example.com/root/dir1/file2.txt".to_string(), + content_length: 100, + is_dir: false, + }, + Entry { + url: "http://example.com/root/dir2/file1.txt".to_string(), + content_length: 200, + is_dir: false, + }, + Entry { + url: "http://example.com/root/dir2/file2.txt".to_string(), + content_length: 200, + is_dir: false, + }, + ], + }); + }); + + let server = MockServer::new_grpc("dfdaemon.v2.DfdaemonDownload").with_mocks(mocks); + server.start().await.unwrap(); + + let dfdaemon_download_client = DfdaemonDownloadClient::new( + Arc::new(dfdaemon::Config::default()), + format!("http://0.0.0.0:{}", server.port().unwrap()), + ) + .await + .unwrap(); + + let entries = get_all_entries( + &Url::parse("http://example.com/root/").unwrap(), + None, + None, + None, + None, + dfdaemon_download_client, + ) + .await + .unwrap(); + + assert_eq!( + entries.into_iter().collect::>(), + vec![ + DirEntry { + url: "http://example.com/root/file1.txt".to_string(), + content_length: 100, + is_dir: false, + }, + DirEntry { + url: "http://example.com/root/file2.txt".to_string(), + content_length: 200, + is_dir: false, + }, + DirEntry { + url: "http://example.com/root/dir1/file1.txt".to_string(), + content_length: 100, + is_dir: false, + }, + DirEntry { + url: "http://example.com/root/dir1/file2.txt".to_string(), + content_length: 100, + is_dir: false, + }, + DirEntry { + url: "http://example.com/root/dir1/".to_string(), + content_length: 0, + is_dir: true, + }, + DirEntry { + url: "http://example.com/root/dir2/file1.txt".to_string(), + content_length: 200, + is_dir: false, + }, + DirEntry { + url: "http://example.com/root/dir2/file2.txt".to_string(), + content_length: 200, + is_dir: false, + }, + DirEntry { + url: "http://example.com/root/dir2/".to_string(), + content_length: 0, + is_dir: true, + }, + ] + .into_iter() + .collect::>() + ); + } } diff --git a/dragonfly-client/src/grpc/dfdaemon_download.rs b/dragonfly-client/src/grpc/dfdaemon_download.rs index 8b3c5ed8..f2122cd0 100644 --- a/dragonfly-client/src/grpc/dfdaemon_download.rs +++ b/dragonfly-client/src/grpc/dfdaemon_download.rs @@ -70,6 +70,7 @@ use tonic::{ use tower::{service_fn, ServiceBuilder}; use tracing::{error, info, instrument, Instrument, Span}; use tracing_opentelemetry::OpenTelemetrySpanExt; +use url::Url; use super::interceptor::{ExtractTracingInterceptor, InjectTracingInterceptor}; @@ -1343,7 +1344,61 @@ pub struct DfdaemonDownloadClient { /// DfdaemonDownloadClient implements the grpc client of the dfdaemon download. impl DfdaemonDownloadClient { - /// new_unix creates a new DfdaemonDownloadClient with unix domain socket. + /// Creates a new DfdaemonDownloadClient. + pub async fn new(config: Arc, addr: String) -> ClientResult { + let domain_name = Url::parse(addr.as_str())? + .host_str() + .ok_or(ClientError::InvalidParameter) + .inspect_err(|_err| { + error!("invalid address: {}", addr); + })? + .to_string(); + + let channel = match config + .upload + .client + .load_client_tls_config(domain_name.as_str()) + .await? + { + Some(client_tls_config) => { + Channel::from_static(Box::leak(addr.clone().into_boxed_str())) + .tls_config(client_tls_config)? + .buffer_size(super::BUFFER_SIZE) + .connect_timeout(super::CONNECT_TIMEOUT) + .timeout(super::REQUEST_TIMEOUT) + .tcp_keepalive(Some(super::TCP_KEEPALIVE)) + .http2_keep_alive_interval(super::HTTP2_KEEP_ALIVE_INTERVAL) + .keep_alive_timeout(super::HTTP2_KEEP_ALIVE_TIMEOUT) + .connect() + .await + .inspect_err(|err| { + error!("connect to {} failed: {}", addr, err); + }) + .or_err(ErrorType::ConnectError)? + } + None => Channel::from_static(Box::leak(addr.clone().into_boxed_str())) + .buffer_size(super::BUFFER_SIZE) + .connect_timeout(super::CONNECT_TIMEOUT) + .timeout(super::REQUEST_TIMEOUT) + .tcp_keepalive(Some(super::TCP_KEEPALIVE)) + .http2_keep_alive_interval(super::HTTP2_KEEP_ALIVE_INTERVAL) + .keep_alive_timeout(super::HTTP2_KEEP_ALIVE_TIMEOUT) + .connect() + .await + .inspect_err(|err| { + error!("connect to {} failed: {}", addr, err); + }) + .or_err(ErrorType::ConnectError)?, + }; + + let client = + DfdaemonDownloadGRPCClient::with_interceptor(channel, InjectTracingInterceptor) + .max_decoding_message_size(usize::MAX) + .max_encoding_message_size(usize::MAX); + Ok(Self { client }) + } + + /// Creates a new DfdaemonDownloadClient with unix domain socket. pub async fn new_unix(socket_path: PathBuf) -> ClientResult { // Ignore the uri because it is not used. let channel = Endpoint::try_from("http://[::]:50051")