feat(dragonfly-client): add support for content-based task ID generation in Dragonfly client (#1111)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2025-04-24 20:54:46 +08:00 committed by GitHub
parent e2c7d9000a
commit c21159037a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 313 additions and 206 deletions

20
Cargo.lock generated
View File

@ -938,9 +938,9 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-api" name = "dragonfly-api"
version = "2.1.36" version = "2.1.39"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff3f32ea719a832f5df4f0d87231c04e7e76d9c7748c3618e6810af4cbdfb1e0" checksum = "4ef3a36f55cedea2a004d17cff39bcfe906fc94579cb0b440cf185a0663b645d"
dependencies = [ dependencies = [
"prost 0.13.5", "prost 0.13.5",
"prost-types", "prost-types",
@ -953,7 +953,7 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-client" name = "dragonfly-client"
version = "0.2.24" version = "0.2.25"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"bytes", "bytes",
@ -1022,7 +1022,7 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-client-backend" name = "dragonfly-client-backend"
version = "0.2.24" version = "0.2.25"
dependencies = [ dependencies = [
"dragonfly-api", "dragonfly-api",
"dragonfly-client-core", "dragonfly-client-core",
@ -1053,7 +1053,7 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-client-config" name = "dragonfly-client-config"
version = "0.2.24" version = "0.2.25"
dependencies = [ dependencies = [
"bytesize", "bytesize",
"bytesize-serde", "bytesize-serde",
@ -1081,7 +1081,7 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-client-core" name = "dragonfly-client-core"
version = "0.2.24" version = "0.2.25"
dependencies = [ dependencies = [
"headers 0.4.0", "headers 0.4.0",
"hyper 1.6.0", "hyper 1.6.0",
@ -1099,7 +1099,7 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-client-init" name = "dragonfly-client-init"
version = "0.2.24" version = "0.2.25"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"clap", "clap",
@ -1117,7 +1117,7 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-client-storage" name = "dragonfly-client-storage"
version = "0.2.24" version = "0.2.25"
dependencies = [ dependencies = [
"bincode", "bincode",
"bytes", "bytes",
@ -1145,7 +1145,7 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-client-util" name = "dragonfly-client-util"
version = "0.2.24" version = "0.2.25"
dependencies = [ dependencies = [
"base64 0.22.1", "base64 0.22.1",
"bytesize", "bytesize",
@ -1558,7 +1558,7 @@ dependencies = [
[[package]] [[package]]
name = "hdfs" name = "hdfs"
version = "0.2.24" version = "0.2.25"
dependencies = [ dependencies = [
"dragonfly-client-backend", "dragonfly-client-backend",
"dragonfly-client-core", "dragonfly-client-core",

View File

@ -12,7 +12,7 @@ members = [
] ]
[workspace.package] [workspace.package]
version = "0.2.24" version = "0.2.25"
authors = ["The Dragonfly Developers"] authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/" homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git" repository = "https://github.com/dragonflyoss/client.git"
@ -22,14 +22,14 @@ readme = "README.md"
edition = "2021" edition = "2021"
[workspace.dependencies] [workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "0.2.24" } dragonfly-client = { path = "dragonfly-client", version = "0.2.25" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.2.24" } dragonfly-client-core = { path = "dragonfly-client-core", version = "0.2.25" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.2.24" } dragonfly-client-config = { path = "dragonfly-client-config", version = "0.2.25" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.2.24" } dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.2.25" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.24" } dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.25" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.24" } dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.25" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.24" } dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.25" }
dragonfly-api = "=2.1.36" dragonfly-api = "=2.1.39"
thiserror = "1.0" thiserror = "1.0"
futures = "0.3.31" futures = "0.3.31"
reqwest = { version = "0.12.4", features = [ reqwest = { version = "0.12.4", features = [

View File

@ -20,7 +20,7 @@ use dragonfly_client_core::{
Result, Result,
}; };
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use std::io::Read; use std::io::{self, Read};
use std::path::PathBuf; use std::path::PathBuf;
use tracing::instrument; use tracing::instrument;
use url::Url; use url::Url;
@ -32,6 +32,34 @@ const SEED_PEER_SUFFIX: &str = "seed";
/// PERSISTENT_CACHE_TASK_SUFFIX is the suffix of the persistent cache task. /// PERSISTENT_CACHE_TASK_SUFFIX is the suffix of the persistent cache task.
const PERSISTENT_CACHE_TASK_SUFFIX: &str = "persistent-cache-task"; const PERSISTENT_CACHE_TASK_SUFFIX: &str = "persistent-cache-task";
/// TaskIDParameter is the parameter of the task id.
pub enum TaskIDParameter {
/// Content uses the content to generate the task id.
Content(String),
/// URLBased uses the url, piece_length, tag, application and filtered_query_params to generate
/// the task id.
URLBased {
url: String,
piece_length: Option<u64>,
tag: Option<String>,
application: Option<String>,
filtered_query_params: Vec<String>,
},
}
/// PersistentCacheTaskIDParameter is the parameter of the persistent cache task id.
pub enum PersistentCacheTaskIDParameter {
/// Content uses the content to generate the persistent cache task id.
Content(String),
/// FileContentBased uses the file path, piece_length, tag and application to generate the persistent cache task id.
FileContentBased {
path: PathBuf,
piece_length: Option<u64>,
tag: Option<String>,
application: Option<String>,
},
}
/// IDGenerator is used to generate the id for the resources. /// IDGenerator is used to generate the id for the resources.
#[derive(Debug)] #[derive(Debug)]
pub struct IDGenerator { pub struct IDGenerator {
@ -71,57 +99,63 @@ impl IDGenerator {
/// task_id generates the task id. /// task_id generates the task id.
#[inline] #[inline]
#[instrument(skip_all)] #[instrument(skip_all)]
pub fn task_id( pub fn task_id(&self, parameter: TaskIDParameter) -> Result<String> {
&self, match parameter {
url: &str, TaskIDParameter::Content(content) => {
piece_length: Option<u64>, Ok(hex::encode(Sha256::digest(content.as_bytes())))
tag: Option<&str>, }
application: Option<&str>, TaskIDParameter::URLBased {
filtered_query_params: Vec<String>, url,
) -> Result<String> { piece_length,
// Filter the query parameters. tag,
let url = Url::parse(url).or_err(ErrorType::ParseError)?; application,
let query = url filtered_query_params,
.query_pairs() } => {
.filter(|(k, _)| !filtered_query_params.contains(&k.to_string())); // Filter the query parameters.
let url = Url::parse(url.as_str()).or_err(ErrorType::ParseError)?;
let query = url
.query_pairs()
.filter(|(k, _)| !filtered_query_params.contains(&k.to_string()));
let mut artifact_url = url.clone(); let mut artifact_url = url.clone();
if query.clone().count() == 0 { if query.clone().count() == 0 {
artifact_url.set_query(None); artifact_url.set_query(None);
} else { } else {
artifact_url.query_pairs_mut().clear().extend_pairs(query); artifact_url.query_pairs_mut().clear().extend_pairs(query);
}
let artifact_url_str = artifact_url.to_string();
let final_url = if artifact_url_str.ends_with('/') && artifact_url.path() == "/" {
artifact_url_str.trim_end_matches('/').to_string()
} else {
artifact_url_str
};
// Initialize the hasher.
let mut hasher = Sha256::new();
// Add the url to generate the task id.
hasher.update(final_url);
// Add the tag to generate the task id.
if let Some(tag) = tag {
hasher.update(tag);
}
// Add the application to generate the task id.
if let Some(application) = application {
hasher.update(application);
}
// Add the piece length to generate the task id.
if let Some(piece_length) = piece_length {
hasher.update(piece_length.to_string());
}
// Generate the task id.
Ok(hex::encode(hasher.finalize()))
}
} }
let artifact_url_str = artifact_url.to_string();
let final_url = if artifact_url_str.ends_with('/') && artifact_url.path() == "/" {
artifact_url_str.trim_end_matches('/').to_string()
} else {
artifact_url_str
};
// Initialize the hasher.
let mut hasher = Sha256::new();
// Add the url to generate the task id.
hasher.update(final_url);
// Add the tag to generate the task id.
if let Some(tag) = tag {
hasher.update(tag);
}
// Add the application to generate the task id.
if let Some(application) = application {
hasher.update(application);
}
// Add the piece length to generate the task id.
if let Some(piece_length) = piece_length {
hasher.update(piece_length.to_string());
}
// Generate the task id.
Ok(hex::encode(hasher.finalize()))
} }
/// persistent_cache_task_id generates the persistent cache task id. /// persistent_cache_task_id generates the persistent cache task id.
@ -129,42 +163,53 @@ impl IDGenerator {
#[instrument(skip_all)] #[instrument(skip_all)]
pub fn persistent_cache_task_id( pub fn persistent_cache_task_id(
&self, &self,
path: &PathBuf, parameter: PersistentCacheTaskIDParameter,
piece_length: Option<u64>,
tag: Option<&str>,
application: Option<&str>,
) -> Result<String> { ) -> Result<String> {
// Calculate the hash of the file.
let f = std::fs::File::open(path)?;
let mut buffer = [0; 4096];
let mut reader = std::io::BufReader::with_capacity(buffer.len(), f);
let mut hasher = crc32fast::Hasher::new(); let mut hasher = crc32fast::Hasher::new();
loop {
let n = reader.read(&mut buffer)?; match parameter {
if n == 0 { PersistentCacheTaskIDParameter::Content(content) => {
break; hasher.update(content.as_bytes());
Ok(hasher.finalize().to_string())
} }
PersistentCacheTaskIDParameter::FileContentBased {
path,
piece_length,
tag,
application,
} => {
// Calculate the hash of the file.
let f = std::fs::File::open(path)?;
let mut buffer = [0; 4096];
let mut reader = io::BufReader::with_capacity(buffer.len(), f);
loop {
match reader.read(&mut buffer) {
Ok(0) => break,
Ok(n) => hasher.update(&buffer[..n]),
Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue,
Err(err) => return Err(err.into()),
};
}
hasher.update(&buffer[..n]); // Add the tag to generate the persistent cache task id.
if let Some(tag) = tag {
hasher.update(tag.as_bytes());
}
// Add the application to generate the persistent cache task id.
if let Some(application) = application {
hasher.update(application.as_bytes());
}
// Add the piece length to generate the persistent cache task id.
if let Some(piece_length) = piece_length {
hasher.update(piece_length.to_string().as_bytes());
}
// Generate the task id by crc32.
Ok(hasher.finalize().to_string())
}
} }
// Add the tag to generate the persistent cache task id.
if let Some(tag) = tag {
hasher.update(tag.as_bytes());
}
// Add the application to generate the persistent cache task id.
if let Some(application) = application {
hasher.update(application.as_bytes());
}
// Add the piece length to generate the persistent cache task id.
if let Some(piece_length) = piece_length {
hasher.update(piece_length.to_string().as_bytes());
}
// Generate the task id by crc32.
Ok(hasher.finalize().to_string())
} }
/// peer_id generates the peer id. /// peer_id generates the peer id.
@ -225,116 +270,140 @@ mod tests {
let test_cases = vec![ let test_cases = vec![
( (
IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false), IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false),
"https://example.com", TaskIDParameter::URLBased {
Some(1024_u64), url: "https://example.com".to_string(),
Some("foo"), piece_length: Some(1024_u64),
Some("bar"), tag: Some("foo".to_string()),
vec![], application: Some("bar".to_string()),
filtered_query_params: vec![],
},
"99a47b38e9d3321aebebd715bea0483c1400cef2f767f84d97458f9dcedff221", "99a47b38e9d3321aebebd715bea0483c1400cef2f767f84d97458f9dcedff221",
), ),
( (
IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false), IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false),
"https://example.com", TaskIDParameter::URLBased {
None, url: "https://example.com".to_string(),
Some("foo"), piece_length: None,
Some("bar"), tag: Some("foo".to_string()),
vec![], application: Some("bar".to_string()),
filtered_query_params: vec![],
},
"160fa7f001d9d2e893130894fbb60a5fb006e1d61bff82955f2946582bc9de1d", "160fa7f001d9d2e893130894fbb60a5fb006e1d61bff82955f2946582bc9de1d",
), ),
( (
IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false), IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false),
"https://example.com", TaskIDParameter::URLBased {
None, url: "https://example.com".to_string(),
Some("foo"), piece_length: None,
None, tag: Some("foo".to_string()),
vec![], application: None,
filtered_query_params: vec![],
},
"2773851c628744fb7933003195db436ce397c1722920696c4274ff804d86920b", "2773851c628744fb7933003195db436ce397c1722920696c4274ff804d86920b",
), ),
( (
IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false), IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false),
"https://example.com", TaskIDParameter::URLBased {
None, url: "https://example.com".to_string(),
None, piece_length: None,
Some("bar"), tag: None,
vec![], application: Some("bar".to_string()),
filtered_query_params: vec![],
},
"63dee2822037636b0109876b58e95692233840753a882afa69b9b5ee82a6c57d", "63dee2822037636b0109876b58e95692233840753a882afa69b9b5ee82a6c57d",
), ),
( (
IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false), IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false),
"https://example.com", TaskIDParameter::URLBased {
Some(1024_u64), url: "https://example.com".to_string(),
None, piece_length: Some(1024_u64),
None, tag: None,
vec![], application: None,
filtered_query_params: vec![],
},
"40c21de3ad2f1470ca1a19a2ad2577803a1829851f6cf862ffa2d4577ae51d38", "40c21de3ad2f1470ca1a19a2ad2577803a1829851f6cf862ffa2d4577ae51d38",
), ),
( (
IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false), IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false),
"https://example.com?foo=foo&bar=bar", TaskIDParameter::URLBased {
None, url: "https://example.com?foo=foo&bar=bar".to_string(),
None, piece_length: None,
None, tag: None,
vec!["foo".to_string(), "bar".to_string()], application: None,
filtered_query_params: vec!["foo".to_string(), "bar".to_string()],
},
"100680ad546ce6a577f42f52df33b4cfdca756859e664b8d7de329b150d09ce9", "100680ad546ce6a577f42f52df33b4cfdca756859e664b8d7de329b150d09ce9",
), ),
(
IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false),
TaskIDParameter::Content("This is a test file".to_string()),
"e2d0fe1585a63ec6009c8016ff8dda8b17719a637405a4e23c0ff81339148249",
),
]; ];
for (generator, url, piece_length, tag, application, filtered_query_params, expected_id) in for (generator, parameter, expected_id) in test_cases {
test_cases let task_id = generator.task_id(parameter).unwrap();
{
let task_id = generator
.task_id(url, piece_length, tag, application, filtered_query_params)
.unwrap();
assert_eq!(task_id, expected_id); assert_eq!(task_id, expected_id);
} }
} }
#[test] #[test]
fn should_generate_persistent_cache_task_id() { fn should_generate_persistent_cache_task_id() {
let dir = tempdir().unwrap();
let file_path = dir.path().join("testfile");
let mut f = File::create(&file_path).unwrap();
f.write_all("This is a test file".as_bytes()).unwrap();
let test_cases = vec![ let test_cases = vec![
( (
IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false), IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false),
"This is a test file", PersistentCacheTaskIDParameter::FileContentBased {
Some(1024_u64), path: file_path.clone(),
Some("tag1"), piece_length: Some(1024_u64),
Some("app1"), tag: Some("tag1".to_string()),
application: Some("app1".to_string()),
},
"223755482", "223755482",
), ),
( (
IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false), IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false),
"This is a test file", PersistentCacheTaskIDParameter::FileContentBased {
None, path: file_path.clone(),
None, piece_length: None,
Some("app1"), tag: None,
application: Some("app1".to_string()),
},
"1152081721", "1152081721",
), ),
( (
IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false), IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false),
"This is a test file", PersistentCacheTaskIDParameter::FileContentBased {
None, path: file_path.clone(),
Some("tag1"), piece_length: None,
None, tag: Some("tag1".to_string()),
application: None,
},
"990623045", "990623045",
), ),
( (
IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false), IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false),
"This is a test file", PersistentCacheTaskIDParameter::FileContentBased {
Some(1024_u64), path: file_path.clone(),
None, piece_length: Some(1024_u64),
None, tag: None,
application: None,
},
"1293485139", "1293485139",
), ),
(
IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false),
PersistentCacheTaskIDParameter::Content("This is a test file".to_string()),
"107352521",
),
]; ];
for (generator, file_content, piece_length, tag, application, expected_id) in test_cases { for (generator, parameter, expected_id) in test_cases {
let dir = tempdir().unwrap(); let task_id = generator.persistent_cache_task_id(parameter).unwrap();
let file_path = dir.path().join("testfile");
let mut f = File::create(&file_path).unwrap();
f.write_all(file_content.as_bytes()).unwrap();
let task_id = generator
.persistent_cache_task_id(&file_path, piece_length, tag, application)
.unwrap();
assert_eq!(task_id, expected_id); assert_eq!(task_id, expected_id);
} }
} }

View File

@ -42,11 +42,10 @@ pub struct ImportCommand {
path: PathBuf, path: PathBuf,
#[arg( #[arg(
long = "id", long = "content-for-calculating-task-id",
required = false, help = "Specify the content used to calculate the persistent cache task ID. If it is set, use its value to calculate the task ID, Otherwise, calculate the persistent cache task ID based on url, piece-length, tag, application, and filtered-query-params."
help = "Specify the id of the persistent cache task. If id is none, dfdaemon will generate the new task id based on the file content, tag and application by crc32 algorithm."
)] )]
id: Option<String>, content_for_calculating_task_id: Option<String>,
#[arg( #[arg(
long = "persistent-replica-count", long = "persistent-replica-count",
@ -341,7 +340,7 @@ impl ImportCommand {
let persistent_cache_task = dfdaemon_download_client let persistent_cache_task = dfdaemon_download_client
.upload_persistent_cache_task(UploadPersistentCacheTaskRequest { .upload_persistent_cache_task(UploadPersistentCacheTaskRequest {
task_id: self.id.clone(), content_for_calculating_task_id: self.content_for_calculating_task_id.clone(),
path: absolute_path.to_string_lossy().to_string(), path: absolute_path.to_string_lossy().to_string(),
persistent_replica_count: self.persistent_replica_count, persistent_replica_count: self.persistent_replica_count,
tag: self.tag.clone(), tag: self.tag.clone(),
@ -372,15 +371,6 @@ impl ImportCommand {
))); )));
} }
if let Some(id) = self.id.as_ref() {
if id.len() != 64 {
return Err(Error::ValidationError(format!(
"id length must be 64 bytes, but got {}",
id.len()
)));
}
}
if self.path.is_dir() { if self.path.is_dir() {
return Err(Error::ValidationError(format!( return Err(Error::ValidationError(format!(
"path {} is a directory", "path {} is a directory",

View File

@ -108,6 +108,12 @@ struct Args {
)] )]
force_hard_link: bool, force_hard_link: bool,
#[arg(
long = "content-for-calculating-task-id",
help = "Specify the content used to calculate the task ID. If it is set, use its value to calculate the task ID, Otherwise, calculate the task ID based on url, piece-length, tag, application, and filtered-query-params."
)]
content_for_calculating_task_id: Option<String>,
#[arg( #[arg(
short = 'O', short = 'O',
long = "output", long = "output",
@ -774,6 +780,7 @@ async fn download(
hdfs, hdfs,
load_to_cache: false, load_to_cache: false,
force_hard_link: args.force_hard_link, force_hard_link: args.force_hard_link,
content_for_calculating_task_id: args.content_for_calculating_task_id,
}), }),
}) })
.await .await

View File

@ -41,7 +41,10 @@ use dragonfly_client_core::{
error::{ErrorType, OrErr}, error::{ErrorType, OrErr},
Error as ClientError, Result as ClientResult, Error as ClientError, Result as ClientResult,
}; };
use dragonfly_client_util::http::{get_range, hashmap_to_headermap, headermap_to_hashmap}; use dragonfly_client_util::{
http::{get_range, hashmap_to_headermap, headermap_to_hashmap},
id_generator::{PersistentCacheTaskIDParameter, TaskIDParameter},
};
use hyper_util::rt::TokioIo; use hyper_util::rt::TokioIo;
use std::os::unix::fs::PermissionsExt; use std::os::unix::fs::PermissionsExt;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
@ -234,13 +237,16 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
let task_id = self let task_id = self
.task .task
.id_generator .id_generator
.task_id( .task_id(match download.content_for_calculating_task_id.clone() {
download.url.as_str(), Some(content) => TaskIDParameter::Content(content),
download.piece_length, None => TaskIDParameter::URLBased {
download.tag.as_deref(), url: download.url.clone(),
download.application.as_deref(), piece_length: download.piece_length,
download.filtered_query_params.clone(), tag: download.tag.clone(),
) application: download.application.clone(),
filtered_query_params: download.filtered_query_params.clone(),
},
})
.map_err(|e| { .map_err(|e| {
error!("generate task id: {}", e); error!("generate task id: {}", e);
Status::invalid_argument(e.to_string()) Status::invalid_argument(e.to_string())
@ -950,22 +956,22 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
info!("upload persistent cache task {:?}", request); info!("upload persistent cache task {:?}", request);
// Generate the task id. // Generate the task id.
let task_id = match request.task_id.as_deref() { let task_id = self
Some(task_id) => task_id.to_string(), .task
None => self .id_generator
.task .persistent_cache_task_id(match request.content_for_calculating_task_id.clone() {
.id_generator Some(content) => PersistentCacheTaskIDParameter::Content(content),
.persistent_cache_task_id( None => PersistentCacheTaskIDParameter::FileContentBased {
&path.to_path_buf(), path: path.to_path_buf(),
request.piece_length, piece_length: request.piece_length,
request.tag.as_deref(), tag: request.tag.clone(),
request.application.as_deref(), application: request.application.clone(),
) },
.map_err(|err| { })
error!("generate persistent cache task id: {}", err); .map_err(|err| {
Status::invalid_argument(err.to_string()) error!("generate persistent cache task id: {}", err);
})?, Status::invalid_argument(err.to_string())
}; })?;
info!("generate persistent cache task id: {}", task_id); info!("generate persistent cache task id: {}", task_id);
// Generate the host id. // Generate the host id.

View File

@ -48,6 +48,7 @@ use dragonfly_client_core::{
}; };
use dragonfly_client_util::{ use dragonfly_client_util::{
http::{get_range, hashmap_to_headermap, headermap_to_hashmap}, http::{get_range, hashmap_to_headermap, headermap_to_hashmap},
id_generator::TaskIDParameter,
net::{get_interface_info, Interface}, net::{get_interface_info, Interface},
}; };
use std::net::SocketAddr; use std::net::SocketAddr;
@ -235,13 +236,16 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
let task_id = self let task_id = self
.task .task
.id_generator .id_generator
.task_id( .task_id(match download.content_for_calculating_task_id.clone() {
download.url.as_str(), Some(content) => TaskIDParameter::Content(content),
download.piece_length, None => TaskIDParameter::URLBased {
download.tag.as_deref(), url: download.url.clone(),
download.application.as_deref(), piece_length: download.piece_length,
download.filtered_query_params.clone(), tag: download.tag.clone(),
) application: download.application.clone(),
filtered_query_params: download.filtered_query_params.clone(),
},
})
.map_err(|e| { .map_err(|e| {
error!("generate task id: {}", e); error!("generate task id: {}", e);
Status::invalid_argument(e.to_string()) Status::invalid_argument(e.to_string())

View File

@ -73,6 +73,12 @@ pub const DRAGONFLY_FORCE_HARD_LINK_HEADER: &str = "X-Dragonfly-Force-Hard-Link"
/// to 4mib, for example: 4mib, 1gib /// to 4mib, for example: 4mib, 1gib
pub const DRAGONFLY_PIECE_LENGTH: &str = "X-Dragonfly-Piece-Length"; pub const DRAGONFLY_PIECE_LENGTH: &str = "X-Dragonfly-Piece-Length";
/// DRAGONFLY_CONTENT_FOR_CALCULATING_TASK_ID is the header key of content for calculating task id.
/// If DRAGONFLY_CONTENT_FOR_CALCULATING_TASK_ID is set, use its value to calculate the task ID.
/// Otherwise, calculate the task ID based on `url`, `piece_length`, `tag`, `application`, and `filtered_query_params`.
pub const DRAGONFLY_CONTENT_FOR_CALCULATING_TASK_ID: &str =
"X-Dragonfly-Content-For-Calculating-Task-ID";
/// get_tag gets the tag from http header. /// get_tag gets the tag from http header.
#[instrument(skip_all)] #[instrument(skip_all)]
pub fn get_tag(header: &HeaderMap) -> Option<String> { pub fn get_tag(header: &HeaderMap) -> Option<String> {
@ -213,6 +219,14 @@ pub fn get_piece_length(header: &HeaderMap) -> Option<ByteSize> {
} }
} }
/// get_content_for_calculating_task_id gets the content for calculating task id from http header.
pub fn get_content_for_calculating_task_id(header: &HeaderMap) -> Option<String> {
header
.get(DRAGONFLY_CONTENT_FOR_CALCULATING_TASK_ID)
.and_then(|content| content.to_str().ok())
.map(|content| content.to_string())
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@ -365,4 +379,20 @@ mod tests {
headers.insert(DRAGONFLY_PIECE_LENGTH, HeaderValue::from_static("0")); headers.insert(DRAGONFLY_PIECE_LENGTH, HeaderValue::from_static("0"));
assert_eq!(get_piece_length(&headers), Some(ByteSize::b(0))); assert_eq!(get_piece_length(&headers), Some(ByteSize::b(0)));
} }
#[test]
fn test_get_content_for_calculating_task_id() {
let mut headers = HeaderMap::new();
headers.insert(
DRAGONFLY_CONTENT_FOR_CALCULATING_TASK_ID,
HeaderValue::from_static("test-content"),
);
assert_eq!(
get_content_for_calculating_task_id(&headers),
Some("test-content".to_string())
);
let empty_headers = HeaderMap::new();
assert_eq!(get_registry(&empty_headers), None);
}
} }

View File

@ -1067,6 +1067,7 @@ fn make_download_task_request(
need_piece_content: false, need_piece_content: false,
load_to_cache: false, load_to_cache: false,
force_hard_link: header::get_force_hard_link(&header), force_hard_link: header::get_force_hard_link(&header),
content_for_calculating_task_id: header::get_content_for_calculating_task_id(&header),
}), }),
}) })
} }