refactor(dfget): Improve logic and error handling in `filter_entries` (#1258)
- Updated the function signature to take references (`&Url`, `&[String]`) instead of owned values, avoiding per-call clones (see the sketch after this list).
- Improved error handling with detailed `ValidationError` messages instead of generic `UnexpectedResponse`.
- Renamed `rel_path_to_entry` to `entries_by_relative_path` for better clarity.
- Replaced `Vec` with `HashSet` for filtered entries to avoid duplicates.
- Simplified parent directory path construction using `join("")`.
- Enhanced doc comments to clearly describe functionality and behavior.
- Streamlined pattern compilation and iteration using `iter()`.
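As a rough illustration of what the borrowed signature buys (a minimal sketch with simplified types, not the real `filter_entries`): the caller keeps ownership of the URL and the pattern list, so nothing is cloned or consumed per call.

```rust
use url::Url;

// Hypothetical stand-in for the borrowing signature; the real function also
// takes and returns Vec<DirEntry>.
fn describe_filter(url: &Url, include_files: &[String]) -> String {
    // Read-only access through the borrows; nothing is cloned or moved.
    format!("{} pattern(s) against root '{}'", include_files.len(), url.path())
}

fn main() {
    let url = Url::parse("http://example.com/root/").unwrap();
    let include_files = vec!["dir/file.txt".to_string()];
    let summary = describe_filter(&url, &include_files);
    // Both values remain usable afterwards because they were only borrowed.
    assert_eq!(summary, "1 pattern(s) against root '/root/'");
    assert!(include_files.len() == 1 && url.path() == "/root/");
}
```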
Signed-off-by: Gaius <gaius.qi@gmail.com>
Parent: cca88b3eea
Commit: b30993eef0

Cargo.lock
@@ -954,7 +954,7 @@ dependencies = [
 
 [[package]]
 name = "dragonfly-client"
-version = "1.0.6"
+version = "1.0.7"
 dependencies = [
  "anyhow",
  "bytes",
@@ -1029,7 +1029,7 @@ dependencies = [
 
 [[package]]
 name = "dragonfly-client-backend"
-version = "1.0.6"
+version = "1.0.7"
 dependencies = [
  "dragonfly-api",
  "dragonfly-client-core",
@@ -1060,7 +1060,7 @@ dependencies = [
 
 [[package]]
 name = "dragonfly-client-config"
-version = "1.0.6"
+version = "1.0.7"
 dependencies = [
  "bytesize",
  "bytesize-serde",
@@ -1090,7 +1090,7 @@ dependencies = [
 
 [[package]]
 name = "dragonfly-client-core"
-version = "1.0.6"
+version = "1.0.7"
 dependencies = [
  "headers 0.4.1",
  "hyper 1.6.0",
@@ -1108,7 +1108,7 @@ dependencies = [
 
 [[package]]
 name = "dragonfly-client-init"
-version = "1.0.6"
+version = "1.0.7"
 dependencies = [
  "anyhow",
  "clap",
@@ -1126,7 +1126,7 @@ dependencies = [
 
 [[package]]
 name = "dragonfly-client-storage"
-version = "1.0.6"
+version = "1.0.7"
 dependencies = [
  "bincode",
  "bytes",
@@ -1155,7 +1155,7 @@ dependencies = [
 
 [[package]]
 name = "dragonfly-client-util"
-version = "1.0.6"
+version = "1.0.7"
 dependencies = [
  "base64 0.22.1",
  "bytesize",
@@ -1564,7 +1564,7 @@ dependencies = [
 
 [[package]]
 name = "hdfs"
-version = "1.0.6"
+version = "1.0.7"
 dependencies = [
  "dragonfly-client-backend",
  "dragonfly-client-core",
Cargo.toml (16 lines changed)
@@ -12,7 +12,7 @@ members = [
 ]
 
 [workspace.package]
-version = "1.0.6"
+version = "1.0.7"
 authors = ["The Dragonfly Developers"]
 homepage = "https://d7y.io/"
 repository = "https://github.com/dragonflyoss/client.git"
@@ -22,13 +22,13 @@ readme = "README.md"
 edition = "2021"
 
 [workspace.dependencies]
-dragonfly-client = { path = "dragonfly-client", version = "1.0.6" }
-dragonfly-client-core = { path = "dragonfly-client-core", version = "1.0.6" }
-dragonfly-client-config = { path = "dragonfly-client-config", version = "1.0.6" }
-dragonfly-client-storage = { path = "dragonfly-client-storage", version = "1.0.6" }
-dragonfly-client-backend = { path = "dragonfly-client-backend", version = "1.0.6" }
-dragonfly-client-util = { path = "dragonfly-client-util", version = "1.0.6" }
-dragonfly-client-init = { path = "dragonfly-client-init", version = "1.0.6" }
+dragonfly-client = { path = "dragonfly-client", version = "1.0.7" }
+dragonfly-client-core = { path = "dragonfly-client-core", version = "1.0.7" }
+dragonfly-client-config = { path = "dragonfly-client-config", version = "1.0.7" }
+dragonfly-client-storage = { path = "dragonfly-client-storage", version = "1.0.7" }
+dragonfly-client-backend = { path = "dragonfly-client-backend", version = "1.0.7" }
+dragonfly-client-util = { path = "dragonfly-client-util", version = "1.0.7" }
+dragonfly-client-init = { path = "dragonfly-client-init", version = "1.0.7" }
 dragonfly-api = "=2.1.49"
 thiserror = "2.0"
 futures = "0.3.31"
@@ -166,7 +166,7 @@ where
 }
 
 /// The File Entry of a directory, including some relevant file metadata.
-#[derive(Debug, PartialEq, Eq, Clone)]
+#[derive(Debug, PartialEq, Eq, Hash, Clone)]
 pub struct DirEntry {
     /// url is the url of the entry.
     pub url: String,
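A brief note on the `Hash` derive added above: `filter_entries` now collects results into a `HashSet<DirEntry>`, and `std::collections::HashSet` requires its element type to implement `Eq + Hash`. A minimal sketch with a simplified stand-in struct (not the real `DirEntry`):

```rust
use std::collections::HashSet;

// Simplified stand-in type; HashSet<T> needs T: Eq + Hash, hence the derive.
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
struct Entry {
    url: String,
    is_dir: bool,
}

fn main() {
    let mut seen = HashSet::new();
    let entry = Entry { url: "http://example.com/root/dir/".into(), is_dir: true };
    // Inserting the same value twice keeps a single element, so duplicate
    // entries (e.g. a parent directory matched via several files) collapse.
    seen.insert(entry.clone());
    seen.insert(entry);
    assert_eq!(seen.len(), 1);
}
```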
@@ -129,7 +129,13 @@ pub struct ExportCommand {
 
 /// Implement the execute for ExportCommand.
 impl ExportCommand {
-    /// execute executes the export command.
+    /// Executes the export command with comprehensive validation and advanced error handling.
+    ///
+    /// This function serves as the main entry point for the dfcache export command execution.
+    /// It handles the complete workflow including argument parsing, validation, logging setup,
+    /// dfdaemon client connection, and export operation execution. The function provides
+    /// sophisticated error reporting with colored terminal output, including specialized
+    /// handling for backend errors with HTTP status codes and headers.
     pub async fn execute(&self) -> Result<()> {
         // Parse command line arguments.
         Args::parse();
@@ -436,7 +442,13 @@ impl ExportCommand {
         Ok(())
     }
 
-    /// run runs the export command.
+    /// Executes the export operation to retrieve cached files from the persistent cache system.
+    ///
+    /// This function handles the core export functionality by downloading a cached file from the
+    /// dfdaemon persistent cache system. It supports two transfer modes: direct file transfer
+    /// by dfdaemon (hardlink/copy) or streaming piece content through the client for manual
+    /// file assembly. The operation provides real-time progress feedback and handles file
+    /// creation, directory setup, and efficient piece-by-piece writing with sparse file allocation.
     async fn run(&self, dfdaemon_download_client: DfdaemonDownloadClient) -> Result<()> {
         // Dfcache needs to notify dfdaemon to transfer the piece content of downloading file via unix domain socket
         // when the `transfer_from_dfdaemon` is true. Otherwise, dfdaemon will download the file and hardlink or
@@ -565,7 +577,12 @@ impl ExportCommand {
         Ok(())
     }
 
-    /// validate_args validates the command line arguments.
+    /// Validates command line arguments for the export operation to ensure safe file output.
+    ///
+    /// This function performs essential validation of the output path to prevent file conflicts
+    /// and ensure the target location is suitable for export operations. It checks parent
+    /// directory existence, prevents accidental file overwrites, and validates path accessibility
+    /// before allowing the export operation to proceed.
     fn validate_args(&self) -> Result<()> {
         let absolute_path = Path::new(&self.output).absolutize()?;
         match absolute_path.parent() {
@@ -128,7 +128,13 @@ pub struct ImportCommand {
 
 /// Implement the execute for ImportCommand.
 impl ImportCommand {
-    /// execute executes the import sub command.
+    /// Executes the import sub command with comprehensive validation and error handling.
+    ///
+    /// This function serves as the main entry point for the dfcache import command execution.
+    /// It handles the complete workflow including argument parsing, validation, logging setup,
+    /// dfdaemon client connection, and import operation execution. The function provides
+    /// detailed error reporting with colored terminal output and follows a fail-fast approach
+    /// with immediate process termination on any critical failures.
     pub async fn execute(&self) -> Result<()> {
         // Parse command line arguments.
         Args::parse();
@@ -326,7 +332,13 @@ impl ImportCommand {
         Ok(())
     }
 
-    /// run runs the import sub command.
+    /// Executes the cache import operation by uploading a file to the persistent cache system.
+    ///
+    /// This function handles the core import functionality by uploading a local file to the
+    /// dfdaemon persistent cache system. It provides visual feedback through a progress spinner,
+    /// converts the file path to absolute format, and configures the cache task with specified
+    /// parameters including TTL, replica count, and piece length. The operation is asynchronous
+    /// and provides completion feedback with the generated task ID.
     async fn run(&self, dfdaemon_download_client: DfdaemonDownloadClient) -> Result<()> {
         let absolute_path = Path::new(&self.path).absolutize()?;
         info!("import file: {}", absolute_path.to_string_lossy());
@@ -363,7 +375,12 @@ impl ImportCommand {
         Ok(())
     }
 
-    /// validate_args validates the command line arguments.
+    /// Validates command line arguments for the import operation to ensure safe and correct execution.
+    ///
+    /// This function performs comprehensive validation of import-specific parameters to prevent
+    /// invalid operations and ensure the import request meets all system requirements. It validates
+    /// TTL boundaries, file existence and type, and piece length constraints before allowing the
+    /// import operation to proceed.
     fn validate_args(&self) -> Result<()> {
         if self.ttl < Duration::from_secs(5 * 60)
             || self.ttl > Duration::from_secs(7 * 24 * 60 * 60)
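For reference, the TTL guard shown above bounds persistent cache lifetimes to between five minutes and seven days. A minimal standalone sketch of the same check, with the bounds pulled into named constants (the names are illustrative, not from the source):

```rust
use std::time::Duration;

// Illustrative constants; the original inlines these expressions.
const MIN_TTL: Duration = Duration::from_secs(5 * 60); // 5 minutes
const MAX_TTL: Duration = Duration::from_secs(7 * 24 * 60 * 60); // 7 days

fn validate_ttl(ttl: Duration) -> Result<(), String> {
    if ttl < MIN_TTL || ttl > MAX_TTL {
        return Err(format!("ttl must be between {:?} and {:?}", MIN_TTL, MAX_TTL));
    }
    Ok(())
}

fn main() {
    assert!(validate_ttl(Duration::from_secs(3600)).is_ok()); // 1 hour: within bounds.
    assert!(validate_ttl(Duration::from_secs(60)).is_err()); // 1 minute: too short.
}
```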
@@ -106,7 +106,12 @@ async fn main() -> anyhow::Result<()> {
     Ok(())
 }
 
-/// get_and_check_dfdaemon_download_client gets a dfdaemon download client and checks its health.
+/// Creates and validates a dfdaemon download client with health checking.
+///
+/// This function establishes a connection to the dfdaemon service via Unix domain socket
+/// and performs a health check to ensure the service is running and ready to handle
+/// download requests. Only after successful health verification does it return the
+/// download client for actual use.
 pub async fn get_dfdaemon_download_client(endpoint: PathBuf) -> Result<DfdaemonDownloadClient> {
     // Check dfdaemon's health.
     let health_client = HealthClient::new_unix(endpoint.clone()).await?;
@@ -74,7 +74,13 @@ pub struct StatCommand {
 
 /// Implement the execute for StatCommand.
 impl StatCommand {
-    /// execute executes the stat command.
+    /// Executes the stat command with comprehensive error handling and user feedback.
+    ///
+    /// This function serves as the main entry point for the dfcache stat command execution.
+    /// It handles the complete lifecycle including argument parsing, logging initialization,
+    /// dfdaemon client setup, and command execution with detailed error reporting. The
+    /// function provides colored terminal output for better user experience and exits
+    /// with appropriate status codes on failure.
     pub async fn execute(&self) -> Result<()> {
         // Parse command line arguments.
         Args::parse();
@@ -234,7 +240,12 @@ impl StatCommand {
         Ok(())
     }
 
-    /// run runs the stat command.
+    /// Executes the stat command to retrieve and display persistent cache task information.
+    ///
+    /// This function queries the dfdaemon service for detailed information about a specific
+    /// persistent cache task and presents it in a formatted table for user consumption.
+    /// It handles data conversion from raw protocol buffer values to human-readable formats
+    /// including byte sizes, durations, and timestamps with proper timezone conversion.
     async fn run(&self, dfdaemon_download_client: DfdaemonDownloadClient) -> Result<()> {
         let task = dfdaemon_download_client
             .stat_persistent_cache_task(StatPersistentCacheTaskRequest {
@@ -36,7 +36,8 @@ use indicatif::{MultiProgress, ProgressBar, ProgressState, ProgressStyle};
 use local_ip_address::local_ip;
 use path_absolutize::*;
 use percent_encoding::percent_decode_str;
-use std::path::{Path, PathBuf};
+use std::collections::{HashMap, HashSet};
+use std::path::{Component, Path, PathBuf};
 use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Duration;
@@ -110,7 +111,7 @@ struct Args {
 
     #[arg(
         long = "content-for-calculating-task-id",
-        help = "Specify the content used to calculate the task ID. If it is set, use its value to calculate the task ID, Otherwise, calculate the task ID based on url, piece-length, tag, application, and filtered-query-params."
+        help = "Specify the content used to calculate the task ID. If it is set, use its value to calculate the task ID, Otherwise, calculate the task ID based on URL, piece-length, tag, application, and filtered-query-params."
     )]
     content_for_calculating_task_id: Option<String>,
 
@@ -140,7 +141,7 @@ struct Args {
     #[arg(
         long = "digest",
         required = false,
-        help = "Verify the integrity of the downloaded file using the specified digest, support sha256, sha512, crc32. If the digest is not specified, the downloaded file will not be verified. Format: <algorithm>:<digest>, e.g. sha256:1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef, crc32:12345678"
+        help = "Verify the integrity of the downloaded file using the specified digest, support sha256, sha512, crc32. If the digest is not specified, the downloaded file will not be verified. Format: <algorithm>:<digest>. Examples: sha256:1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef, crc32:12345678"
     )]
     digest: Option<String>,
 
@@ -162,14 +163,14 @@ struct Args {
     #[arg(
         long = "application",
         default_value = "",
-        help = "Different applications for the same url will be divided into different tasks"
+        help = "Different applications for the same URL will be divided into different tasks"
     )]
     application: String,
 
     #[arg(
         long = "tag",
         default_value = "",
-        help = "Different tags for the same url will be divided into different tasks"
+        help = "Different tags for the same URL will be divided into different tasks"
     )]
     tag: String,
 
@@ -177,21 +178,21 @@ struct Args {
         short = 'H',
         long = "header",
         required = false,
-        help = "Specify the header for downloading file, e.g. --header='Content-Type: application/json' --header='Accept: application/json'"
+        help = "Specify the header for downloading file. Examples: --header='Content-Type: application/json' --header='Accept: application/json'"
     )]
     header: Option<Vec<String>>,
 
     #[arg(
         long = "filtered-query-param",
         required = false,
-        help = "Filter the query parameters of the downloaded URL. If the download URL is the same, it will be scheduled as the same task, e.g. --filtered-query-param='signature' --filtered-query-param='timeout'"
+        help = "Filter the query parameters of the downloaded URL. If the download URL is the same, it will be scheduled as the same task. Examples: --filtered-query-param='signature' --filtered-query-param='timeout'"
     )]
     filtered_query_params: Option<Vec<String>>,
 
     #[arg(
         long = "include-files",
         required = false,
-        help = "Specify which files to download when retrieving a directory. The file paths listed in include-files should be relative to the current directory. e.g. --include-files='*.txt' --include-files='example.txt'"
+        help = "Filter files to download in a directory using glob patterns relative to the root URL's path. Examples: --include-files='*.txt' --include-files='subdir/file.txt'"
     )]
     include_files: Option<Vec<String>>,
 
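The updated `--include-files` help text describes glob matching against paths relative to the root URL's path; the new `filter_entries` further below implements it with the `glob` crate's `Pattern`. A small sketch of how such patterns behave (paths illustrative):

```rust
use glob::Pattern;

fn main() {
    // Patterns are matched against entry paths relative to the root URL's path.
    let pattern = Pattern::new("subdir/*.txt").unwrap();
    assert!(pattern.matches("subdir/file.txt"));
    assert!(!pattern.matches("other/file.txt"));
    // Note: Pattern::matches uses the default MatchOptions, where `*` is not
    // required to stop at `/`; matches_with + require_literal_separator gives
    // stricter per-component matching if that is ever wanted.
}
```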
@@ -602,7 +603,12 @@ async fn main() -> anyhow::Result<()> {
     Ok(())
 }
 
-/// run runs the dfget command.
+/// Runs the dfget command to download files or directories from a given URL.
+///
+/// This function serves as the main entry point for the dfget download operation.
+/// It handles both single file downloads and directory downloads based on the URL format.
+/// The function performs path normalization, validates the URL scheme's capabilities,
+/// and delegates to the appropriate download handler.
 async fn run(mut args: Args, dfdaemon_download_client: DfdaemonDownloadClient) -> Result<()> {
     // Get the absolute path of the output file.
     args.output = Path::new(&args.output).absolutize()?.into();
@@ -622,7 +628,13 @@ async fn run(mut args: Args, dfdaemon_download_client: DfdaemonDownloadClient) -
     download(args, ProgressBar::new(0), dfdaemon_download_client).await
 }
 
-/// download_dir downloads all files in the directory.
+/// Downloads all files in a directory from various storage backends (object storage, HDFS, etc.).
+///
+/// This function handles directory-based downloads by recursively fetching all entries
+/// in the specified directory. It supports filtering files based on include patterns,
+/// enforces download limits, and performs concurrent downloads with configurable
+/// concurrency control. The function creates the necessary directory structure
+/// locally and downloads files while preserving the remote directory hierarchy.
 async fn download_dir(args: Args, download_client: DfdaemonDownloadClient) -> Result<()> {
     // Initialize the object storage config and the hdfs config.
     let object_storage = Some(ObjectStorage {
@@ -640,14 +652,10 @@ async fn download_dir(args: Args, download_client: DfdaemonDownloadClient) -> Re
     });
 
     // Get all entries in the directory.
-    let entries = get_entries(args.clone(), object_storage, hdfs, download_client.clone()).await?;
-    // Filter the entries by the include_files.
-    let entries = match args.include_files {
-        Some(ref include_files) => {
-            filter_entries(args.url.clone(), entries, include_files.clone())?
-        }
-        None => entries,
-    };
+    let mut entries = get_entries(&args, object_storage, hdfs, download_client.clone()).await?;
+    if let Some(ref include_files) = args.include_files {
+        entries = filter_entries(&args.url, entries, include_files)?;
+    }
 
     // If the entries is empty, then return directly.
     if entries.is_empty() {
@@ -724,7 +732,13 @@ async fn download_dir(args: Args, download_client: DfdaemonDownloadClient) -> Re
     Ok(())
 }
 
-/// download downloads the single file.
+/// Downloads a single file from various storage backends using the dfdaemon service.
+///
+/// This function handles single file downloads by communicating with a dfdaemon client.
+/// It supports multiple storage protocols (object storage, HDFS, HTTP/HTTPS) and provides
+/// two transfer modes: direct download by dfdaemon or streaming piece content through
+/// the client. The function includes progress tracking, file creation, and proper error
+/// handling throughout the download process.
 async fn download(
     args: Args,
     progress_bar: ProgressBar,
@@ -897,9 +911,15 @@ async fn download(
     Ok(())
 }
 
-/// get_entries gets all entries in the directory.
+/// Retrieves all directory entries from a remote storage location.
+///
+/// This function communicates with the dfdaemon service to list all entries
+/// (files and subdirectories) in the specified directory URL. It supports
+/// various storage backends including object storage and HDFS by passing
+/// the appropriate credentials and configuration. The function converts
+/// the gRPC response into a local `DirEntry` format for further processing.
 async fn get_entries(
-    args: Args,
+    args: &Args,
     object_storage: Option<ObjectStorage>,
     hdfs: Option<Hdfs>,
     download_client: DfdaemonDownloadClient,
@@ -910,7 +930,7 @@ async fn get_entries(
         .list_task_entries(ListTaskEntriesRequest {
             task_id: Uuid::new_v4().to_string(),
             url: args.url.to_string(),
-            request_header: header_vec_to_hashmap(args.header.unwrap_or_default())?,
+            request_header: header_vec_to_hashmap(args.header.clone().unwrap_or_default())?,
             timeout: None,
             certificate_chain: Vec::new(),
             object_storage,
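Since `get_entries` now borrows `Args`, fields the request message must own, such as the header vector, are cloned at the call site instead of moved. A minimal sketch of that borrow-then-clone pattern (simplified type, not the real `Args`):

```rust
#[derive(Clone, Default)]
struct Args {
    header: Option<Vec<String>>,
}

// Borrowing `args` lets the caller keep using it afterwards; only the
// field that must be owned by the request is cloned.
fn build_request_header(args: &Args) -> Vec<String> {
    args.header.clone().unwrap_or_default()
}

fn main() {
    let args = Args { header: Some(vec!["Accept: application/json".into()]) };
    let header = build_request_header(&args);
    // `args` is still usable here because it was only borrowed.
    assert_eq!(header.len(), 1);
    assert!(args.header.is_some());
}
```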
@@ -933,7 +953,74 @@
         .collect())
 }
 
-/// make_output_by_entry makes the output path by the entry information.
+/// Filters directory entries based on include patterns and validates their URLs.
+///
+/// This function takes a collection of directory entries and filters them based on
+/// glob patterns specified in `include_files`. It performs URL validation to ensure
+/// all entries have valid URLs and that their paths fall within the scope of the
+/// root URL. When an entry matches a pattern, both the entry and its parent
+/// directory (if it exists) are included in the result.
+fn filter_entries(
+    url: &Url,
+    entries: Vec<DirEntry>,
+    include_files: &[String],
+) -> Result<Vec<DirEntry>> {
+    let patterns: Vec<Pattern> = include_files
+        .iter()
+        .filter_map(|include_file| Pattern::new(include_file).ok())
+        .collect();
+
+    // Build a HashMap of DirEntry objects keyed by relative path for filtering,
+    // validating URLs and ensuring paths are within the root URL's scope.
+    let mut entries_by_relative_path = HashMap::with_capacity(entries.len());
+    for entry in entries {
+        let entry_url = Url::parse(&entry.url).map_err(|err| {
+            error!("failed to parse entry URL '{}': {}", entry.url, err);
+            Error::ValidationError(format!("invalid URL: {}", entry.url))
+        })?;
+
+        let entry_path = entry_url.path();
+        match entry_path.strip_prefix(url.path()) {
+            Some(relative_path) => entries_by_relative_path
+                .insert(relative_path.trim_start_matches('/').to_string(), entry),
+            None => {
+                error!(
+                    "entry path '{}' does not belong to the root path",
+                    entry_path
+                );
+                return Err(Error::ValidationError(format!(
+                    "path '{}' is outside the expected scope",
+                    entry_path
+                )));
+            }
+        };
+    }
+
+    // Filter entries by matching relative paths against patterns, including
+    // parent directories for matches.
+    let mut filtered_entries = HashSet::new();
+    for (relative_path, entry) in &entries_by_relative_path {
+        if patterns.iter().any(|pat| pat.matches(relative_path)) {
+            filtered_entries.insert(entry.clone());
+            if let Some(parent) = std::path::Path::new(relative_path).parent() {
+                if let Some(parent_entry) =
+                    entries_by_relative_path.get(&parent.join("").to_string_lossy().to_string())
+                {
+                    filtered_entries.insert(parent_entry.clone());
+                }
+            }
+        }
+    }
+
+    Ok(filtered_entries.into_iter().collect())
+}
+
+/// Constructs the local output path for a directory entry based on its remote URL.
+///
+/// This function maps a remote directory entry to its corresponding local file system
+/// path by replacing the remote root directory with the local output directory.
+/// It handles URL percent-decoding to ensure proper path construction and maintains
+/// the relative directory structure from the remote source.
 fn make_output_by_entry(url: Url, output: &Path, entry: DirEntry) -> Result<PathBuf> {
     // Get the root directory of the download directory and the output root directory.
     let root_dir = url.path().to_string();
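The relative keys in `entries_by_relative_path` above come from stripping the root URL's path off each entry path; entries outside the root fail validation. A small standalone sketch of that derivation (paths illustrative):

```rust
fn main() {
    let root_path = "/root/";
    let entry_path = "/root/dir/file.txt";
    // str::strip_prefix returns None when the prefix does not match,
    // which the function above turns into a ValidationError.
    let relative_path = entry_path
        .strip_prefix(root_path)
        .map(|p| p.trim_start_matches('/'))
        .unwrap();
    assert_eq!(relative_path, "dir/file.txt");
    assert!("/elsewhere/file.txt".strip_prefix(root_path).is_none());
}
```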
@@ -951,68 +1038,12 @@ fn make_output_by_entry(url: Url, output: &Path, entry: DirEntry) -> Result<Path
     .into())
 }
 
-/// filter_entries filters the entries by the include_files.
-fn filter_entries(
-    url: Url,
-    entries: Vec<DirEntry>,
-    include_files: Vec<String>,
-) -> Result<Vec<DirEntry>> {
-    let root_path = url.path();
-
-    // Compile all glob patterns from include_files.
-    let patterns: Vec<Pattern> = include_files
-        .into_iter()
-        .filter_map(|file| Pattern::new(&file).ok())
-        .collect();
-
-    // Map relative path (from root) to DirEntry for quick lookup.
-    let mut rel_path_to_entry = std::collections::HashMap::new();
-    for entry in entries {
-        let entry_url = Url::parse(&entry.url).map_err(|_| {
-            error!("backend url {} is not a right url", entry.url);
-            Error::UnexpectedResponse
-        })?;
-        let entry_path = entry_url.path();
-        let rel_path = entry_path
-            .strip_prefix(root_path)
-            .map(|p| p.trim_start_matches('/'))
-            .ok_or_else(|| {
-                error!("backend url {} is not a right url", entry_path);
-                Error::UnexpectedResponse
-            })?;
-        rel_path_to_entry.insert(rel_path.to_string(), entry);
-    }
-
-    let mut filtered_entries = Vec::new();
-    let mut seen_rel_paths = std::collections::HashSet::new();
-
-    // For each entry, if it matches any pattern, add it and its parent directory (if present).
-    for (rel_path, entry) in &rel_path_to_entry {
-        // Check if this entry matches any glob pattern.
-        if patterns.iter().any(|pat| pat.matches(rel_path)) {
-            // Add the entry itself if not already added.
-            if seen_rel_paths.insert(rel_path.clone()) {
-                filtered_entries.push(entry.clone());
-            }
-            // Add the immediate parent directory if present in the map and not already added.
-            if let Some(parent) = std::path::Path::new(rel_path).parent() {
-                // Always use trailing slash for directory keys.
-                let parent_str = format!("{}/", parent.to_string_lossy());
-                if !parent_str.is_empty() {
-                    if let Some(parent_entry) = rel_path_to_entry.get(&parent_str) {
-                        if seen_rel_paths.insert(parent_str.clone()) {
-                            filtered_entries.push(parent_entry.clone());
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    Ok(filtered_entries)
-}
-
-/// get_and_check_dfdaemon_download_client gets a dfdaemon download client and checks its health.
+/// Creates and validates a dfdaemon download client with health checking.
+///
+/// This function establishes a connection to the dfdaemon service via Unix domain socket
+/// and performs a health check to ensure the service is running and ready to handle
+/// download requests. Only after successful health verification does it return the
+/// download client for actual use.
 async fn get_dfdaemon_download_client(endpoint: PathBuf) -> Result<DfdaemonDownloadClient> {
     // Check dfdaemon's health.
     let health_client = HealthClient::new_unix(endpoint.clone()).await?;
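The commit message's `join("")` note refers to the parent-directory lookup in the new implementation: directory entries are keyed with a trailing slash, and joining an empty component onto a parent path appends exactly that separator, replacing the old `format!("{}/", ...)` construction. A minimal sketch (Unix-style paths assumed):

```rust
use std::path::Path;

fn main() {
    let relative_path = "dir/file.txt";
    // parent() strips the file name, leaving "dir".
    let parent = Path::new(relative_path).parent().unwrap();
    // Joining an empty component appends a trailing separator, yielding "dir/",
    // which matches how directory entries are keyed in the lookup map.
    let key = parent.join("").to_string_lossy().to_string();
    assert_eq!(key, "dir/");
}
```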
@@ -1023,7 +1054,13 @@ async fn get_dfdaemon_download_client(endpoint: PathBuf) -> Result<DfdaemonDownl
     Ok(dfdaemon_download_client)
 }
 
-/// validate_args validates the command line arguments.
+/// Validates command line arguments for consistency and safety requirements.
+///
+/// This function performs comprehensive validation of the download arguments to ensure
+/// they are logically consistent and safe to execute. It checks URL-output path matching,
+/// directory existence, file conflicts, piece length constraints, and glob pattern validity.
+/// The validation prevents common user errors and potential security issues before
+/// starting the download process.
 fn validate_args(args: &Args) -> Result<()> {
     // If the URL is a directory, the output path should be a directory.
     if args.url.path().ends_with('/') && !args.output.is_dir() {
@@ -1072,29 +1109,42 @@ fn validate_args(args: &Args) -> Result<()> {
         }
     }
 
-    // If the include_files is provided, then check the include_files.
     if let Some(ref include_files) = args.include_files {
-        fn is_valid_relative_path(p: &str) -> bool {
-            !p.is_empty()
-                && !p.starts_with('/')
-                && !p.contains("..")
-                && !p.chars().any(|c| c == '\0')
-                && Path::new(p).is_relative()
-        }
-        let invalid = include_files
-            .iter()
-            .any(|file| Pattern::new(file).is_err() || !is_valid_relative_path(file));
-        if invalid {
-            return Err(Error::ValidationError(format!(
-                "invalid include files: {}",
-                include_files.join(", ")
-            )));
-        }
+        for include_file in include_files {
+            if Pattern::new(include_file).is_err() {
+                return Err(Error::ValidationError(format!(
+                    "invalid glob pattern in include_files: '{}'",
+                    include_file
+                )));
+            }
+
+            if !is_normal_relative_path(include_file) {
+                return Err(Error::ValidationError(format!(
+                    "path is not a normal relative path in include_files: '{}'. It must not contain '..', '.', or start with '/'.",
+                    include_file
+                )));
+            }
+        }
     }
 
     Ok(())
 }
 
+/// Validates that a path string is a normal relative path without unsafe components.
+///
+/// This function ensures that a given path is both relative (doesn't start with '/')
+/// and contains only normal path components. It rejects paths with parent directory
+/// references ('..'), current directory references ('.'), or any other special
+/// path components that could be used for directory traversal attacks or unexpected
+/// file system navigation.
+fn is_normal_relative_path(path: &str) -> bool {
+    let path = Path::new(path);
+    path.is_relative()
+        && path
+            .components()
+            .all(|comp| matches!(comp, Component::Normal(_)))
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
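A quick standalone illustration of what `is_normal_relative_path` accepts and rejects, using the function body from the hunk above:

```rust
use std::path::{Component, Path};

fn is_normal_relative_path(path: &str) -> bool {
    let path = Path::new(path);
    path.is_relative()
        && path
            .components()
            .all(|comp| matches!(comp, Component::Normal(_)))
}

fn main() {
    assert!(is_normal_relative_path("dir/file.txt")); // plain relative path
    assert!(!is_normal_relative_path("/etc/passwd")); // absolute path
    assert!(!is_normal_relative_path("../secret")); // parent-directory traversal
    assert!(!is_normal_relative_path("./dir/file.txt")); // CurDir component
}
```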
@@ -1296,13 +1346,14 @@ mod tests {
         ];
 
         let filtered_entries =
-            filter_entries(url, entries, vec!["dir/file.txt".to_string()]).unwrap();
+            filter_entries(&url, entries, &["dir/file.txt".to_string()]).unwrap();
         assert_eq!(filtered_entries.len(), 2);
-        assert_eq!(
-            filtered_entries[0].url,
-            "http://example.com/root/dir/file.txt"
-        );
-        assert_eq!(filtered_entries[1].url, "http://example.com/root/dir/");
+        assert!(filtered_entries
+            .iter()
+            .any(|entry| entry.url == "http://example.com/root/dir/file.txt"));
+        assert!(filtered_entries
+            .iter()
+            .any(|entry| entry.url == "http://example.com/root/dir/"));
     }
 
     #[test]
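The assertion rewrite above follows from collecting results through a `HashSet`: the output order of `filter_entries` is now unspecified, so the tests check membership rather than position. A tiny sketch of why (illustrative values):

```rust
use std::collections::HashSet;

fn main() {
    let filtered: HashSet<&str> = ["b", "a"].into_iter().collect();
    // Draining a HashSet into a Vec yields an unspecified order, so assert
    // membership rather than position.
    let filtered: Vec<&str> = filtered.into_iter().collect();
    assert!(filtered.iter().any(|entry| *entry == "a"));
    assert!(filtered.iter().any(|entry| *entry == "b"));
}
```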
@@ -1342,16 +1393,14 @@ mod tests {
         ];
 
         let filtered_entries =
-            filter_entries(url, entries, vec!["dir/subdir/*.txt".to_string()]).unwrap();
+            filter_entries(&url, entries, &["dir/subdir/*.txt".to_string()]).unwrap();
         assert_eq!(filtered_entries.len(), 2);
-        assert_eq!(
-            filtered_entries[0].url,
-            "http://example.com/root/dir/subdir/file3.txt"
-        );
-        assert_eq!(
-            filtered_entries[1].url,
-            "http://example.com/root/dir/subdir/"
-        );
+        assert!(filtered_entries
+            .iter()
+            .any(|entry| entry.url == "http://example.com/root/dir/subdir/file3.txt"));
+        assert!(filtered_entries
+            .iter()
+            .any(|entry| entry.url == "http://example.com/root/dir/subdir/"));
     }
 
     #[test]
@@ -1391,7 +1440,7 @@ mod tests {
         ];
 
         let filtered_entries =
-            filter_entries(url, entries, vec!["dir/subdir/*".to_string()]).unwrap();
+            filter_entries(&url, entries, &["dir/subdir/*".to_string()]).unwrap();
         println!("{:?}", filtered_entries);
         assert_eq!(filtered_entries.len(), 4);
     }
@@ -1412,7 +1461,7 @@ mod tests {
             },
         ];
 
-        let filtered_entries = filter_entries(url, entries, vec!["dir/subdir/*".to_string()]);
-        assert!(matches!(filtered_entries, Err(Error::UnexpectedResponse)));
+        let filtered_entries = filter_entries(&url, entries, &["dir/subdir/*".to_string()]);
+        assert!(matches!(filtered_entries, Err(Error::ValidationError(_))));
     }
 }