diff --git a/dragonfly-client/src/bin/dfget/main.rs b/dragonfly-client/src/bin/dfget/main.rs index ec451def..2a6a2575 100644 --- a/dragonfly-client/src/bin/dfget/main.rs +++ b/dragonfly-client/src/bin/dfget/main.rs @@ -18,7 +18,7 @@ use bytesize::ByteSize; use clap::Parser; use dragonfly_api::common::v2::{Download, Hdfs, ObjectStorage, TaskType}; use dragonfly_api::dfdaemon::v2::{ - download_task_response, DownloadTaskRequest, ListTaskEntriesRequest, ListTaskEntriesResponse, + download_task_response, DownloadTaskRequest, ListTaskEntriesRequest, }; use dragonfly_api::errordetails::v2::Backend; use dragonfly_client::grpc::dfdaemon_download::DfdaemonDownloadClient; @@ -712,7 +712,7 @@ async fn download_dir(args: Args, download_client: DfdaemonDownloadClient) -> Re /// Get all entries in the directory with include files filter. async fn get_all_entries( - url: &Url, + base_url: &Url, header: Option>, include_files: Option>, object_storage: Option, @@ -723,7 +723,7 @@ async fn get_all_entries( Some(files) => { let mut urls = HashSet::with_capacity(files.len()); for file in files { - let url = url.join(&file).or_err(ErrorType::ParseError)?; + let url = base_url.join(&file).or_err(ErrorType::ParseError)?; urls.insert(url); } @@ -731,7 +731,7 @@ async fn get_all_entries( } None => { let mut urls = HashSet::with_capacity(1); - urls.insert(url.clone()); + urls.insert(base_url.clone()); urls } }; @@ -750,7 +750,7 @@ async fn get_all_entries( }); let parent = url.join(".").or_err(ErrorType::ParseError)?; - if parent.path() != "/" { + if parent.path() != base_url.path() { entries.insert(DirEntry { url: parent.to_string(), content_length: 0, @@ -784,7 +784,7 @@ async fn get_all_entries( .join(".") .or_err(ErrorType::ParseError)?; - if parent.path() != "/" { + if parent.path() != base_url.path() { dir_entries.push(DirEntry { url: parent.to_string(), content_length: 0, @@ -796,9 +796,10 @@ async fn get_all_entries( let mut seen = HashSet::new(); entries.retain(|entry| seen.insert(entry.clone())); entries.extend(dir_entries.clone()); - info!("add entries {:?} by dir url: {}", dir_entries, url); + info!("add entries {:?} by dir url: {}", entries, url); } + info!("get all entries: {:?}", entries); Ok(entries.into_iter().collect()) } @@ -1008,7 +1009,6 @@ async fn get_entries( download_client: DfdaemonDownloadClient, ) -> Result> { info!("list task entries: {:?}", url); - // List task entries. let response = download_client .list_task_entries(ListTaskEntriesRequest { task_id: Uuid::new_v4().to_string(), @@ -1025,6 +1025,7 @@ async fn get_entries( error!("list task entries failed: {}", err); })?; + info!("list task entries response: {:?}", response.entries); Ok(response .entries .into_iter() @@ -1184,7 +1185,9 @@ fn is_normal_relative_path(path: &str) -> bool { #[cfg(test)] mod tests { use super::*; + use dragonfly_api::dfdaemon::v2::{Entry, ListTaskEntriesResponse}; use mocktail::prelude::*; + use std::collections::HashMap; use tempfile::tempdir; #[test] @@ -1403,12 +1406,16 @@ mod tests { } #[tokio::test] - async fn should_get_all_entries() { + async fn should_get_empty_entries() { let mut mocks = MockSet::new(); mocks.mock(|when, then| { - when.path("/dfdaemon.v2.DfdaemonDownload/ListTaskEntries") - .pb(ListTaskEntriesResponse {}); - then.pb(ListTaskEntriesRequest { hosts: vec![] }); + when.path("/dfdaemon.v2.DfdaemonDownload/ListTaskEntries"); + then.pb(ListTaskEntriesResponse { + content_length: 0, + response_header: HashMap::new(), + status_code: None, + entries: vec![], + }); }); let server = MockServer::new_grpc("dfdaemon.v2.DfdaemonDownload").with_mocks(mocks); @@ -1429,6 +1436,281 @@ mod tests { None, dfdaemon_download_client, ) - .await; + .await + .unwrap(); + + assert_eq!(entries.len(), 0); + } + + #[tokio::test] + async fn should_get_all_entries_in_subdir() { + let mut mocks = MockSet::new(); + mocks.mock(|when, then| { + when.path("/dfdaemon.v2.DfdaemonDownload/ListTaskEntries"); + then.pb(ListTaskEntriesResponse { + content_length: 0, + response_header: HashMap::new(), + status_code: None, + entries: vec![ + Entry { + url: "http://example.com/root/dir1/file1.txt".to_string(), + content_length: 100, + is_dir: false, + }, + Entry { + url: "http://example.com/root/dir1/file2.txt".to_string(), + content_length: 100, + is_dir: false, + }, + Entry { + url: "http://example.com/root/dir2/file1.txt".to_string(), + content_length: 200, + is_dir: false, + }, + Entry { + url: "http://example.com/root/dir2/file2.txt".to_string(), + content_length: 200, + is_dir: false, + }, + ], + }); + }); + + let server = MockServer::new_grpc("dfdaemon.v2.DfdaemonDownload").with_mocks(mocks); + server.start().await.unwrap(); + + let dfdaemon_download_client = DfdaemonDownloadClient::new( + Arc::new(dfdaemon::Config::default()), + format!("http://0.0.0.0:{}", server.port().unwrap()), + ) + .await + .unwrap(); + + let entries = get_all_entries( + &Url::parse("http://example.com/root/").unwrap(), + None, + None, + None, + None, + dfdaemon_download_client, + ) + .await + .unwrap(); + + assert_eq!( + entries.into_iter().collect::>(), + vec![ + DirEntry { + url: "http://example.com/root/dir1/file1.txt".to_string(), + content_length: 100, + is_dir: false, + }, + DirEntry { + url: "http://example.com/root/dir1/file2.txt".to_string(), + content_length: 100, + is_dir: false, + }, + DirEntry { + url: "http://example.com/root/dir1/".to_string(), + content_length: 0, + is_dir: true, + }, + DirEntry { + url: "http://example.com/root/dir2/file1.txt".to_string(), + content_length: 200, + is_dir: false, + }, + DirEntry { + url: "http://example.com/root/dir2/file2.txt".to_string(), + content_length: 200, + is_dir: false, + }, + DirEntry { + url: "http://example.com/root/dir2/".to_string(), + content_length: 0, + is_dir: true, + }, + ] + .into_iter() + .collect::>() + ); + } + + #[tokio::test] + async fn should_get_all_entries_in_rootdir() { + let mut mocks = MockSet::new(); + mocks.mock(|when, then| { + when.path("/dfdaemon.v2.DfdaemonDownload/ListTaskEntries"); + then.pb(ListTaskEntriesResponse { + content_length: 0, + response_header: HashMap::new(), + status_code: None, + entries: vec![ + Entry { + url: "http://example.com/root/file1.txt".to_string(), + content_length: 100, + is_dir: false, + }, + Entry { + url: "http://example.com/root/file2.txt".to_string(), + content_length: 200, + is_dir: false, + }, + ], + }); + }); + + let server = MockServer::new_grpc("dfdaemon.v2.DfdaemonDownload").with_mocks(mocks); + server.start().await.unwrap(); + + let dfdaemon_download_client = DfdaemonDownloadClient::new( + Arc::new(dfdaemon::Config::default()), + format!("http://0.0.0.0:{}", server.port().unwrap()), + ) + .await + .unwrap(); + + let entries = get_all_entries( + &Url::parse("http://example.com/root/").unwrap(), + None, + None, + None, + None, + dfdaemon_download_client, + ) + .await + .unwrap(); + + assert_eq!( + entries.into_iter().collect::>(), + vec![ + DirEntry { + url: "http://example.com/root/file1.txt".to_string(), + content_length: 100, + is_dir: false, + }, + DirEntry { + url: "http://example.com/root/file2.txt".to_string(), + content_length: 200, + is_dir: false, + }, + ] + .into_iter() + .collect::>() + ); + } + + #[tokio::test] + async fn should_get_all_entries_in_rootdir_and_subdir() { + let mut mocks = MockSet::new(); + mocks.mock(|when, then| { + when.path("/dfdaemon.v2.DfdaemonDownload/ListTaskEntries"); + then.pb(ListTaskEntriesResponse { + content_length: 0, + response_header: HashMap::new(), + status_code: None, + entries: vec![ + Entry { + url: "http://example.com/root/file1.txt".to_string(), + content_length: 100, + is_dir: false, + }, + Entry { + url: "http://example.com/root/file2.txt".to_string(), + content_length: 200, + is_dir: false, + }, + Entry { + url: "http://example.com/root/dir1/file1.txt".to_string(), + content_length: 100, + is_dir: false, + }, + Entry { + url: "http://example.com/root/dir1/file2.txt".to_string(), + content_length: 100, + is_dir: false, + }, + Entry { + url: "http://example.com/root/dir2/file1.txt".to_string(), + content_length: 200, + is_dir: false, + }, + Entry { + url: "http://example.com/root/dir2/file2.txt".to_string(), + content_length: 200, + is_dir: false, + }, + ], + }); + }); + + let server = MockServer::new_grpc("dfdaemon.v2.DfdaemonDownload").with_mocks(mocks); + server.start().await.unwrap(); + + let dfdaemon_download_client = DfdaemonDownloadClient::new( + Arc::new(dfdaemon::Config::default()), + format!("http://0.0.0.0:{}", server.port().unwrap()), + ) + .await + .unwrap(); + + let entries = get_all_entries( + &Url::parse("http://example.com/root/").unwrap(), + None, + None, + None, + None, + dfdaemon_download_client, + ) + .await + .unwrap(); + + assert_eq!( + entries.into_iter().collect::>(), + vec![ + DirEntry { + url: "http://example.com/root/file1.txt".to_string(), + content_length: 100, + is_dir: false, + }, + DirEntry { + url: "http://example.com/root/file2.txt".to_string(), + content_length: 200, + is_dir: false, + }, + DirEntry { + url: "http://example.com/root/dir1/file1.txt".to_string(), + content_length: 100, + is_dir: false, + }, + DirEntry { + url: "http://example.com/root/dir1/file2.txt".to_string(), + content_length: 100, + is_dir: false, + }, + DirEntry { + url: "http://example.com/root/dir1/".to_string(), + content_length: 0, + is_dir: true, + }, + DirEntry { + url: "http://example.com/root/dir2/file1.txt".to_string(), + content_length: 200, + is_dir: false, + }, + DirEntry { + url: "http://example.com/root/dir2/file2.txt".to_string(), + content_length: 200, + is_dir: false, + }, + DirEntry { + url: "http://example.com/root/dir2/".to_string(), + content_length: 0, + is_dir: true, + }, + ] + .into_iter() + .collect::>() + ); } }