diff --git a/Cargo.lock b/Cargo.lock index 49bbe871..4b8e66d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -938,9 +938,9 @@ dependencies = [ [[package]] name = "dragonfly-api" -version = "2.1.36" +version = "2.1.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff3f32ea719a832f5df4f0d87231c04e7e76d9c7748c3618e6810af4cbdfb1e0" +checksum = "4ef3a36f55cedea2a004d17cff39bcfe906fc94579cb0b440cf185a0663b645d" dependencies = [ "prost 0.13.5", "prost-types", @@ -953,7 +953,7 @@ dependencies = [ [[package]] name = "dragonfly-client" -version = "0.2.24" +version = "0.2.25" dependencies = [ "anyhow", "bytes", @@ -1022,7 +1022,7 @@ dependencies = [ [[package]] name = "dragonfly-client-backend" -version = "0.2.24" +version = "0.2.25" dependencies = [ "dragonfly-api", "dragonfly-client-core", @@ -1053,7 +1053,7 @@ dependencies = [ [[package]] name = "dragonfly-client-config" -version = "0.2.24" +version = "0.2.25" dependencies = [ "bytesize", "bytesize-serde", @@ -1081,7 +1081,7 @@ dependencies = [ [[package]] name = "dragonfly-client-core" -version = "0.2.24" +version = "0.2.25" dependencies = [ "headers 0.4.0", "hyper 1.6.0", @@ -1099,7 +1099,7 @@ dependencies = [ [[package]] name = "dragonfly-client-init" -version = "0.2.24" +version = "0.2.25" dependencies = [ "anyhow", "clap", @@ -1117,7 +1117,7 @@ dependencies = [ [[package]] name = "dragonfly-client-storage" -version = "0.2.24" +version = "0.2.25" dependencies = [ "bincode", "bytes", @@ -1145,7 +1145,7 @@ dependencies = [ [[package]] name = "dragonfly-client-util" -version = "0.2.24" +version = "0.2.25" dependencies = [ "base64 0.22.1", "bytesize", @@ -1558,7 +1558,7 @@ dependencies = [ [[package]] name = "hdfs" -version = "0.2.24" +version = "0.2.25" dependencies = [ "dragonfly-client-backend", "dragonfly-client-core", diff --git a/Cargo.toml b/Cargo.toml index 155a84b9..6959a4be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ members = [ ] [workspace.package] -version = "0.2.24" 
+version = "0.2.25" authors = ["The Dragonfly Developers"] homepage = "https://d7y.io/" repository = "https://github.com/dragonflyoss/client.git" @@ -22,14 +22,14 @@ readme = "README.md" edition = "2021" [workspace.dependencies] -dragonfly-client = { path = "dragonfly-client", version = "0.2.24" } -dragonfly-client-core = { path = "dragonfly-client-core", version = "0.2.24" } -dragonfly-client-config = { path = "dragonfly-client-config", version = "0.2.24" } -dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.2.24" } -dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.24" } -dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.24" } -dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.24" } -dragonfly-api = "=2.1.36" +dragonfly-client = { path = "dragonfly-client", version = "0.2.25" } +dragonfly-client-core = { path = "dragonfly-client-core", version = "0.2.25" } +dragonfly-client-config = { path = "dragonfly-client-config", version = "0.2.25" } +dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.2.25" } +dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.25" } +dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.25" } +dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.25" } +dragonfly-api = "=2.1.39" thiserror = "1.0" futures = "0.3.31" reqwest = { version = "0.12.4", features = [ diff --git a/dragonfly-client-util/src/id_generator/mod.rs b/dragonfly-client-util/src/id_generator/mod.rs index a2484af4..f9f3ed72 100644 --- a/dragonfly-client-util/src/id_generator/mod.rs +++ b/dragonfly-client-util/src/id_generator/mod.rs @@ -20,7 +20,7 @@ use dragonfly_client_core::{ Result, }; use sha2::{Digest, Sha256}; -use std::io::Read; +use std::io::{self, Read}; use std::path::PathBuf; use tracing::instrument; use url::Url; @@ -32,6 +32,34 @@ const SEED_PEER_SUFFIX: &str = "seed"; /// 
PERSISTENT_CACHE_TASK_SUFFIX is the suffix of the persistent cache task. const PERSISTENT_CACHE_TASK_SUFFIX: &str = "persistent-cache-task"; +/// TaskIDParameter is the parameter of the task id. +pub enum TaskIDParameter { + /// Content uses the content to generate the task id. + Content(String), + /// URLBased uses the url, piece_length, tag, application and filtered_query_params to generate + /// the task id. + URLBased { + url: String, + piece_length: Option<u64>, + tag: Option<String>, + application: Option<String>, + filtered_query_params: Vec<String>, + }, +} + +/// PersistentCacheTaskIDParameter is the parameter of the persistent cache task id. +pub enum PersistentCacheTaskIDParameter { + /// Content uses the content to generate the persistent cache task id. + Content(String), + /// FileContentBased uses the content of the file at path, piece_length, tag and application to generate the persistent cache task id. + FileContentBased { + path: PathBuf, + piece_length: Option<u64>, + tag: Option<String>, + application: Option<String>, + }, +} + /// IDGenerator is used to generate the id for the resources. #[derive(Debug)] pub struct IDGenerator { @@ -71,57 +99,63 @@ impl IDGenerator { /// task_id generates the task id. #[inline] #[instrument(skip_all)] - pub fn task_id( - &self, - url: &str, - piece_length: Option<u64>, - tag: Option<&str>, - application: Option<&str>, - filtered_query_params: Vec<String>, - ) -> Result<String> { - // Filter the query parameters. - let url = Url::parse(url).or_err(ErrorType::ParseError)?; - let query = url - .query_pairs() - .filter(|(k, _)| !filtered_query_params.contains(&k.to_string())); + pub fn task_id(&self, parameter: TaskIDParameter) -> Result<String> { + match parameter { + TaskIDParameter::Content(content) => { + Ok(hex::encode(Sha256::digest(content.as_bytes()))) + } + TaskIDParameter::URLBased { + url, + piece_length, + tag, + application, + filtered_query_params, + } => { + // Filter the query parameters.
+ let url = Url::parse(url.as_str()).or_err(ErrorType::ParseError)?; + let query = url + .query_pairs() + .filter(|(k, _)| !filtered_query_params.contains(&k.to_string())); - let mut artifact_url = url.clone(); - if query.clone().count() == 0 { - artifact_url.set_query(None); - } else { - artifact_url.query_pairs_mut().clear().extend_pairs(query); + let mut artifact_url = url.clone(); + if query.clone().count() == 0 { + artifact_url.set_query(None); + } else { + artifact_url.query_pairs_mut().clear().extend_pairs(query); + } + + let artifact_url_str = artifact_url.to_string(); + let final_url = if artifact_url_str.ends_with('/') && artifact_url.path() == "/" { + artifact_url_str.trim_end_matches('/').to_string() + } else { + artifact_url_str + }; + + // Initialize the hasher. + let mut hasher = Sha256::new(); + + // Add the url to generate the task id. + hasher.update(final_url); + + // Add the tag to generate the task id. + if let Some(tag) = tag { + hasher.update(tag); + } + + // Add the application to generate the task id. + if let Some(application) = application { + hasher.update(application); + } + + // Add the piece length to generate the task id. + if let Some(piece_length) = piece_length { + hasher.update(piece_length.to_string()); + } + + // Generate the task id. + Ok(hex::encode(hasher.finalize())) + } } - - let artifact_url_str = artifact_url.to_string(); - let final_url = if artifact_url_str.ends_with('/') && artifact_url.path() == "/" { - artifact_url_str.trim_end_matches('/').to_string() - } else { - artifact_url_str - }; - - // Initialize the hasher. - let mut hasher = Sha256::new(); - - // Add the url to generate the task id. - hasher.update(final_url); - - // Add the tag to generate the task id. - if let Some(tag) = tag { - hasher.update(tag); - } - - // Add the application to generate the task id. - if let Some(application) = application { - hasher.update(application); - } - - // Add the piece length to generate the task id. 
- if let Some(piece_length) = piece_length { - hasher.update(piece_length.to_string()); - } - - // Generate the task id. - Ok(hex::encode(hasher.finalize())) } /// persistent_cache_task_id generates the persistent cache task id. @@ -129,42 +163,53 @@ impl IDGenerator { #[instrument(skip_all)] pub fn persistent_cache_task_id( &self, - path: &PathBuf, - piece_length: Option<u64>, - tag: Option<&str>, - application: Option<&str>, + parameter: PersistentCacheTaskIDParameter, ) -> Result<String> { - // Calculate the hash of the file. - let f = std::fs::File::open(path)?; - let mut buffer = [0; 4096]; - let mut reader = std::io::BufReader::with_capacity(buffer.len(), f); let mut hasher = crc32fast::Hasher::new(); - loop { - let n = reader.read(&mut buffer)?; - if n == 0 { - break; + + match parameter { + PersistentCacheTaskIDParameter::Content(content) => { + hasher.update(content.as_bytes()); + Ok(hasher.finalize().to_string()) } + PersistentCacheTaskIDParameter::FileContentBased { + path, + piece_length, + tag, + application, + } => { + // Calculate the hash of the file. + let f = std::fs::File::open(path)?; + let mut buffer = [0; 4096]; + let mut reader = io::BufReader::with_capacity(buffer.len(), f); + loop { + match reader.read(&mut buffer) { + Ok(0) => break, + Ok(n) => hasher.update(&buffer[..n]), + Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue, + Err(err) => return Err(err.into()), + }; + } - hasher.update(&buffer[..n]); + // Add the tag to generate the persistent cache task id. + if let Some(tag) = tag { + hasher.update(tag.as_bytes()); + } + + // Add the application to generate the persistent cache task id. + if let Some(application) = application { + hasher.update(application.as_bytes()); + } + + // Add the piece length to generate the persistent cache task id. + if let Some(piece_length) = piece_length { + hasher.update(piece_length.to_string().as_bytes()); + } + + // Generate the task id by crc32.
+ Ok(hasher.finalize().to_string()) + } } - - // Add the tag to generate the persistent cache task id. - if let Some(tag) = tag { - hasher.update(tag.as_bytes()); - } - - // Add the application to generate the persistent cache task id. - if let Some(application) = application { - hasher.update(application.as_bytes()); - } - - // Add the piece length to generate the persistent cache task id. - if let Some(piece_length) = piece_length { - hasher.update(piece_length.to_string().as_bytes()); - } - - // Generate the task id by crc32. - Ok(hasher.finalize().to_string()) } /// peer_id generates the peer id. @@ -225,116 +270,140 @@ mod tests { let test_cases = vec![ ( IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false), - "https://example.com", - Some(1024_u64), - Some("foo"), - Some("bar"), - vec![], + TaskIDParameter::URLBased { + url: "https://example.com".to_string(), + piece_length: Some(1024_u64), + tag: Some("foo".to_string()), + application: Some("bar".to_string()), + filtered_query_params: vec![], + }, "99a47b38e9d3321aebebd715bea0483c1400cef2f767f84d97458f9dcedff221", ), ( IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false), - "https://example.com", - None, - Some("foo"), - Some("bar"), - vec![], + TaskIDParameter::URLBased { + url: "https://example.com".to_string(), + piece_length: None, + tag: Some("foo".to_string()), + application: Some("bar".to_string()), + filtered_query_params: vec![], + }, "160fa7f001d9d2e893130894fbb60a5fb006e1d61bff82955f2946582bc9de1d", ), ( IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false), - "https://example.com", - None, - Some("foo"), - None, - vec![], + TaskIDParameter::URLBased { + url: "https://example.com".to_string(), + piece_length: None, + tag: Some("foo".to_string()), + application: None, + filtered_query_params: vec![], + }, "2773851c628744fb7933003195db436ce397c1722920696c4274ff804d86920b", ), ( IDGenerator::new("127.0.0.1".to_string(), 
"localhost".to_string(), false), - "https://example.com", - None, - None, - Some("bar"), - vec![], + TaskIDParameter::URLBased { + url: "https://example.com".to_string(), + piece_length: None, + tag: None, + application: Some("bar".to_string()), + filtered_query_params: vec![], + }, "63dee2822037636b0109876b58e95692233840753a882afa69b9b5ee82a6c57d", ), ( IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false), - "https://example.com", - Some(1024_u64), - None, - None, - vec![], + TaskIDParameter::URLBased { + url: "https://example.com".to_string(), + piece_length: Some(1024_u64), + tag: None, + application: None, + filtered_query_params: vec![], + }, "40c21de3ad2f1470ca1a19a2ad2577803a1829851f6cf862ffa2d4577ae51d38", ), ( IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false), - "https://example.com?foo=foo&bar=bar", - None, - None, - None, - vec!["foo".to_string(), "bar".to_string()], + TaskIDParameter::URLBased { + url: "https://example.com?foo=foo&bar=bar".to_string(), + piece_length: None, + tag: None, + application: None, + filtered_query_params: vec!["foo".to_string(), "bar".to_string()], + }, "100680ad546ce6a577f42f52df33b4cfdca756859e664b8d7de329b150d09ce9", ), + ( + IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false), + TaskIDParameter::Content("This is a test file".to_string()), + "e2d0fe1585a63ec6009c8016ff8dda8b17719a637405a4e23c0ff81339148249", + ), ]; - for (generator, url, piece_length, tag, application, filtered_query_params, expected_id) in - test_cases - { - let task_id = generator - .task_id(url, piece_length, tag, application, filtered_query_params) - .unwrap(); + for (generator, parameter, expected_id) in test_cases { + let task_id = generator.task_id(parameter).unwrap(); assert_eq!(task_id, expected_id); } } #[test] fn should_generate_persistent_cache_task_id() { + let dir = tempdir().unwrap(); + let file_path = dir.path().join("testfile"); + let mut f = 
File::create(&file_path).unwrap(); + f.write_all("This is a test file".as_bytes()).unwrap(); + let test_cases = vec![ ( IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false), - "This is a test file", - Some(1024_u64), - Some("tag1"), - Some("app1"), + PersistentCacheTaskIDParameter::FileContentBased { + path: file_path.clone(), + piece_length: Some(1024_u64), + tag: Some("tag1".to_string()), + application: Some("app1".to_string()), + }, "223755482", ), ( IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false), - "This is a test file", - None, - None, - Some("app1"), + PersistentCacheTaskIDParameter::FileContentBased { + path: file_path.clone(), + piece_length: None, + tag: None, + application: Some("app1".to_string()), + }, "1152081721", ), ( IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false), - "This is a test file", - None, - Some("tag1"), - None, + PersistentCacheTaskIDParameter::FileContentBased { + path: file_path.clone(), + piece_length: None, + tag: Some("tag1".to_string()), + application: None, + }, "990623045", ), ( IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false), - "This is a test file", - Some(1024_u64), - None, - None, + PersistentCacheTaskIDParameter::FileContentBased { + path: file_path.clone(), + piece_length: Some(1024_u64), + tag: None, + application: None, + }, "1293485139", ), + ( + IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false), + PersistentCacheTaskIDParameter::Content("This is a test file".to_string()), + "107352521", + ), ]; - for (generator, file_content, piece_length, tag, application, expected_id) in test_cases { - let dir = tempdir().unwrap(); - let file_path = dir.path().join("testfile"); - let mut f = File::create(&file_path).unwrap(); - f.write_all(file_content.as_bytes()).unwrap(); - - let task_id = generator - .persistent_cache_task_id(&file_path, piece_length, tag, application) - .unwrap(); + for (generator, parameter, 
expected_id) in test_cases { + let task_id = generator.persistent_cache_task_id(parameter).unwrap(); assert_eq!(task_id, expected_id); } } diff --git a/dragonfly-client/src/bin/dfcache/import.rs b/dragonfly-client/src/bin/dfcache/import.rs index 02f3c5a1..91814ab1 100644 --- a/dragonfly-client/src/bin/dfcache/import.rs +++ b/dragonfly-client/src/bin/dfcache/import.rs @@ -42,11 +42,10 @@ pub struct ImportCommand { path: PathBuf, #[arg( - long = "id", - required = false, - help = "Specify the id of the persistent cache task. If id is none, dfdaemon will generate the new task id based on the file content, tag and application by crc32 algorithm." + long = "content-for-calculating-task-id", + help = "Specify the content used to calculate the persistent cache task ID. If it is set, use its value to calculate the task ID. Otherwise, calculate the persistent cache task ID based on the file content, piece-length, tag, and application." )] - id: Option<String>, + content_for_calculating_task_id: Option<String>, #[arg( long = "persistent-replica-count", @@ -341,7 +340,7 @@ impl ImportCommand { let persistent_cache_task = dfdaemon_download_client .upload_persistent_cache_task(UploadPersistentCacheTaskRequest { - task_id: self.id.clone(), + content_for_calculating_task_id: self.content_for_calculating_task_id.clone(), path: absolute_path.to_string_lossy().to_string(), persistent_replica_count: self.persistent_replica_count, tag: self.tag.clone(), @@ -372,15 +371,6 @@ impl ImportCommand { ))); } - if let Some(id) = self.id.as_ref() { - if id.len() != 64 { - return Err(Error::ValidationError(format!( - "id length must be 64 bytes, but got {}", - id.len() - ))); - } - } - if self.path.is_dir() { return Err(Error::ValidationError(format!( "path {} is a directory", diff --git a/dragonfly-client/src/bin/dfget/main.rs b/dragonfly-client/src/bin/dfget/main.rs index 72ac1a21..2e506c82 100644 --- a/dragonfly-client/src/bin/dfget/main.rs +++ b/dragonfly-client/src/bin/dfget/main.rs @@ -108,6
+108,12 @@ struct Args { )] force_hard_link: bool, + #[arg( + long = "content-for-calculating-task-id", + help = "Specify the content used to calculate the task ID. If it is set, use its value to calculate the task ID. Otherwise, calculate the task ID based on url, piece-length, tag, application, and filtered-query-params." + )] + content_for_calculating_task_id: Option<String>, + #[arg( short = 'O', long = "output", @@ -774,6 +780,7 @@ async fn download( hdfs, load_to_cache: false, force_hard_link: args.force_hard_link, + content_for_calculating_task_id: args.content_for_calculating_task_id, }), }) .await diff --git a/dragonfly-client/src/grpc/dfdaemon_download.rs b/dragonfly-client/src/grpc/dfdaemon_download.rs index 5faa0456..be340146 100644 --- a/dragonfly-client/src/grpc/dfdaemon_download.rs +++ b/dragonfly-client/src/grpc/dfdaemon_download.rs @@ -41,7 +41,10 @@ use dragonfly_client_core::{ error::{ErrorType, OrErr}, Error as ClientError, Result as ClientResult, }; -use dragonfly_client_util::http::{get_range, hashmap_to_headermap, headermap_to_hashmap}; +use dragonfly_client_util::{ + http::{get_range, hashmap_to_headermap, headermap_to_hashmap}, + id_generator::{PersistentCacheTaskIDParameter, TaskIDParameter}, +}; use hyper_util::rt::TokioIo; use std::os::unix::fs::PermissionsExt; use std::path::{Path, PathBuf}; @@ -234,13 +237,16 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler { let task_id = self .task .id_generator - .task_id( - download.url.as_str(), - download.piece_length, - download.tag.as_deref(), - download.application.as_deref(), - download.filtered_query_params.clone(), - ) + .task_id(match download.content_for_calculating_task_id.clone() { + Some(content) => TaskIDParameter::Content(content), + None => TaskIDParameter::URLBased { + url: download.url.clone(), + piece_length: download.piece_length, + tag: download.tag.clone(), + application: download.application.clone(), + filtered_query_params: download.filtered_query_params.clone(), + }, + })
.map_err(|e| { error!("generate task id: {}", e); Status::invalid_argument(e.to_string()) @@ -950,22 +956,22 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler { info!("upload persistent cache task {:?}", request); // Generate the task id. - let task_id = match request.task_id.as_deref() { - Some(task_id) => task_id.to_string(), - None => self - .task - .id_generator - .persistent_cache_task_id( - &path.to_path_buf(), - request.piece_length, - request.tag.as_deref(), - request.application.as_deref(), - ) - .map_err(|err| { - error!("generate persistent cache task id: {}", err); - Status::invalid_argument(err.to_string()) - })?, - }; + let task_id = self + .task + .id_generator + .persistent_cache_task_id(match request.content_for_calculating_task_id.clone() { + Some(content) => PersistentCacheTaskIDParameter::Content(content), + None => PersistentCacheTaskIDParameter::FileContentBased { + path: path.to_path_buf(), + piece_length: request.piece_length, + tag: request.tag.clone(), + application: request.application.clone(), + }, + }) + .map_err(|err| { + error!("generate persistent cache task id: {}", err); + Status::invalid_argument(err.to_string()) + })?; info!("generate persistent cache task id: {}", task_id); // Generate the host id. 
diff --git a/dragonfly-client/src/grpc/dfdaemon_upload.rs b/dragonfly-client/src/grpc/dfdaemon_upload.rs index fdd2a2ff..e5816c6d 100644 --- a/dragonfly-client/src/grpc/dfdaemon_upload.rs +++ b/dragonfly-client/src/grpc/dfdaemon_upload.rs @@ -48,6 +48,7 @@ use dragonfly_client_core::{ }; use dragonfly_client_util::{ http::{get_range, hashmap_to_headermap, headermap_to_hashmap}, + id_generator::TaskIDParameter, net::{get_interface_info, Interface}, }; use std::net::SocketAddr; @@ -235,13 +236,16 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { let task_id = self .task .id_generator - .task_id( - download.url.as_str(), - download.piece_length, - download.tag.as_deref(), - download.application.as_deref(), - download.filtered_query_params.clone(), - ) + .task_id(match download.content_for_calculating_task_id.clone() { + Some(content) => TaskIDParameter::Content(content), + None => TaskIDParameter::URLBased { + url: download.url.clone(), + piece_length: download.piece_length, + tag: download.tag.clone(), + application: download.application.clone(), + filtered_query_params: download.filtered_query_params.clone(), + }, + }) .map_err(|e| { error!("generate task id: {}", e); Status::invalid_argument(e.to_string()) diff --git a/dragonfly-client/src/proxy/header.rs b/dragonfly-client/src/proxy/header.rs index 360ce903..2ecd5907 100644 --- a/dragonfly-client/src/proxy/header.rs +++ b/dragonfly-client/src/proxy/header.rs @@ -73,6 +73,12 @@ pub const DRAGONFLY_FORCE_HARD_LINK_HEADER: &str = "X-Dragonfly-Force-Hard-Link" /// to 4mib, for example: 4mib, 1gib pub const DRAGONFLY_PIECE_LENGTH: &str = "X-Dragonfly-Piece-Length"; +/// DRAGONFLY_CONTENT_FOR_CALCULATING_TASK_ID is the header key of content for calculating task id. +/// If DRAGONFLY_CONTENT_FOR_CALCULATING_TASK_ID is set, use its value to calculate the task ID. +/// Otherwise, calculate the task ID based on `url`, `piece_length`, `tag`, `application`, and `filtered_query_params`. 
+pub const DRAGONFLY_CONTENT_FOR_CALCULATING_TASK_ID: &str = + "X-Dragonfly-Content-For-Calculating-Task-ID"; + /// get_tag gets the tag from http header. #[instrument(skip_all)] pub fn get_tag(header: &HeaderMap) -> Option<String> { @@ -213,6 +219,14 @@ pub fn get_piece_length(header: &HeaderMap) -> Option<ByteSize> { } } +/// get_content_for_calculating_task_id gets the content for calculating task id from http header, or None when the header is absent or its value cannot be converted to a string. +pub fn get_content_for_calculating_task_id(header: &HeaderMap) -> Option<String> { + header + .get(DRAGONFLY_CONTENT_FOR_CALCULATING_TASK_ID) + .and_then(|content| content.to_str().ok()) + .map(|content| content.to_string()) +} + #[cfg(test)] mod tests { use super::*; @@ -365,4 +379,20 @@ mod tests { headers.insert(DRAGONFLY_PIECE_LENGTH, HeaderValue::from_static("0")); assert_eq!(get_piece_length(&headers), Some(ByteSize::b(0))); } + + #[test] + fn test_get_content_for_calculating_task_id() { + let mut headers = HeaderMap::new(); + headers.insert( + DRAGONFLY_CONTENT_FOR_CALCULATING_TASK_ID, + HeaderValue::from_static("test-content"), + ); + assert_eq!( + get_content_for_calculating_task_id(&headers), + Some("test-content".to_string()) + ); + + let empty_headers = HeaderMap::new(); + assert_eq!(get_content_for_calculating_task_id(&empty_headers), None); + } } diff --git a/dragonfly-client/src/proxy/mod.rs b/dragonfly-client/src/proxy/mod.rs index b47769b7..72f99e5c 100644 --- a/dragonfly-client/src/proxy/mod.rs +++ b/dragonfly-client/src/proxy/mod.rs @@ -1067,6 +1067,7 @@ fn make_download_task_request( need_piece_content: false, load_to_cache: false, force_hard_link: header::get_force_hard_link(&header), + content_for_calculating_task_id: header::get_content_for_calculating_task_id(&header), }), }) }