feat: calculate piece length by file length (#661)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2024-08-12 21:09:10 +08:00 committed by GitHub
parent 175be388a0
commit e7f7c50fa4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 182 additions and 138 deletions

20
Cargo.lock generated
View File

@ -1105,9 +1105,9 @@ dependencies = [
[[package]]
name = "dragonfly-api"
version = "2.0.143"
version = "2.0.147"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bff3ba3f7d411a6aa67e8c812f94e927ed06b782a2f44aa4dbe3df99593da5b7"
checksum = "51247ca7774a0a4b65167904050e1b4df8e6112b0b02e94164e30819683041c5"
dependencies = [
"prost 0.13.1",
"prost-types",
@ -1120,7 +1120,7 @@ dependencies = [
[[package]]
name = "dragonfly-client"
version = "0.1.97"
version = "0.1.98"
dependencies = [
"anyhow",
"blake3",
@ -1190,7 +1190,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-backend"
version = "0.1.97"
version = "0.1.98"
dependencies = [
"dragonfly-api",
"dragonfly-client-core",
@ -1212,7 +1212,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-config"
version = "0.1.97"
version = "0.1.98"
dependencies = [
"bytesize",
"bytesize-serde",
@ -1233,7 +1233,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-core"
version = "0.1.97"
version = "0.1.98"
dependencies = [
"hyper 1.4.0",
"hyper-util",
@ -1248,7 +1248,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-init"
version = "0.1.97"
version = "0.1.98"
dependencies = [
"anyhow",
"clap",
@ -1264,7 +1264,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-storage"
version = "0.1.97"
version = "0.1.98"
dependencies = [
"base16ct",
"chrono",
@ -1288,7 +1288,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-util"
version = "0.1.97"
version = "0.1.98"
dependencies = [
"base16ct",
"blake3",
@ -1758,7 +1758,7 @@ dependencies = [
[[package]]
name = "hdfs"
version = "0.1.97"
version = "0.1.98"
dependencies = [
"dragonfly-client-backend",
"dragonfly-client-core",

View File

@ -12,7 +12,7 @@ members = [
]
[workspace.package]
version = "0.1.97"
version = "0.1.98"
authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git"
@ -22,15 +22,15 @@ readme = "README.md"
edition = "2021"
[workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "0.1.97" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.97" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.97" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.97" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.97" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.97" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.97" }
dragonfly-client = { path = "dragonfly-client", version = "0.1.98" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.98" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.98" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.98" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.98" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.98" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.98" }
thiserror = "1.0"
dragonfly-api = "2.0.143"
dragonfly-api = "2.0.147"
reqwest = { version = "0.12.4", features = ["stream", "native-tls", "default-tls", "rustls-tls"] }
rcgen = { version = "0.12.1", features = ["x509-parser"] }
hyper = { version = "1.4", features = ["full"] }

View File

@ -113,8 +113,8 @@ impl super::Backend for HTTP {
// get gets the content of the request.
async fn get(&self, request: super::GetRequest) -> Result<super::GetResponse<super::Body>> {
info!(
"get request {} {}: {:?}",
request.piece_id, request.url, request.http_header
"get request {} {} {}: {:?}",
request.task_id, request.piece_id, request.url, request.http_header
);
// The header of the request is required.
@ -128,8 +128,8 @@ impl super::Backend for HTTP {
.await
.map_err(|err| {
error!(
"get request failed {} {}: {}",
request.piece_id, request.url, err
"get request failed {} {} {}: {}",
request.task_id, request.piece_id, request.url, err
);
err
})?;
@ -230,6 +230,7 @@ mod tests {
let http_backend = http::HTTP::new();
let mut resp = http_backend
.get(GetRequest {
task_id: "test".to_string(),
piece_id: "test".to_string(),
url: server.url("/get"),
range: None,

View File

@ -82,6 +82,9 @@ pub struct HeadResponse {
// GetRequest is the get request for backend.
pub struct GetRequest {
// task_id is the id of the task.
pub task_id: String,
// piece_id is the id of the piece.
pub piece_id: String,

View File

@ -99,8 +99,3 @@ pub fn default_cache_dir() -> PathBuf {
#[cfg(target_os = "macos")]
return home::home_dir().unwrap().join(".dragonfly").join("cache");
}
// default_piece_length is the default piece length for task.
//
// The default is 4 MiB, expressed here as a shift for clarity.
pub fn default_piece_length() -> u64 {
    4 << 20
}

View File

@ -118,6 +118,10 @@ pub enum DFError {
#[error("invalid content length")]
InvalidContentLength,
// InvalidPieceLength is the error when the piece length is invalid.
#[error("invalid piece length")]
InvalidPieceLength,
// InvalidParameter is the error when the parameter is invalid.
#[error("invalid parameter")]
InvalidParameter,

View File

@ -81,7 +81,7 @@ impl Storage {
pub fn download_task_started(
&self,
id: &str,
piece_length: u64,
piece_length: Option<u64>,
content_length: Option<u64>,
response_header: Option<HeaderMap>,
) -> Result<metadata::Task> {
@ -155,6 +155,7 @@ impl Storage {
ttl: Duration,
path: &Path,
piece_length: u64,
content_length: u64,
expected_digest: &str,
) -> Result<metadata::CacheTask> {
let response = self.content.write_cache_task(id, path).await?;
@ -170,7 +171,7 @@ impl Storage {
id,
ttl,
piece_length,
response.length,
content_length,
digest.to_string().as_str(),
)
}

View File

@ -36,7 +36,7 @@ pub struct Task {
pub id: String,
// piece_length is the length of the piece.
pub piece_length: u64,
pub piece_length: Option<u64>,
// content_length is the length of the content.
pub content_length: Option<u64>,
@ -116,6 +116,11 @@ impl Task {
false
}
// piece_length returns the piece length of the task.
// Returns None while the piece length has not been recorded yet —
// presumably it is set once a download determines a concrete value
// (see download_task_started); confirm against the metadata update path.
pub fn piece_length(&self) -> Option<u64> {
self.piece_length
}
// content_length returns the content length of the task.
pub fn content_length(&self) -> Option<u64> {
self.content_length
@ -215,6 +220,11 @@ impl CacheTask {
self.persistent
}
// piece_length returns the piece length of the cache task.
// Unlike Task::piece_length, this is always known for a cache task,
// so no Option is needed.
pub fn piece_length(&self) -> u64 {
self.piece_length
}
// content_length returns the content length of the cache task.
pub fn content_length(&self) -> u64 {
self.content_length
@ -320,7 +330,7 @@ impl<E: StorageEngineOwned> Metadata<E> {
pub fn download_task_started(
&self,
id: &str,
piece_length: u64,
piece_length: Option<u64>,
content_length: Option<u64>,
response_header: Option<HeaderMap>,
) -> Result<Task> {
@ -341,6 +351,11 @@ impl<E: StorageEngineOwned> Metadata<E> {
task.content_length = content_length;
}
// Protect piece length to be overwritten by None.
if piece_length.is_some() {
task.piece_length = piece_length;
}
// If the task has the response header, the response header
// will not be covered.
if task.response_header.is_empty() {
@ -847,14 +862,14 @@ mod tests {
// Test download_task_started.
metadata
.download_task_started(task_id, 1024, Some(1024), None)
.download_task_started(task_id, Some(1024), Some(1024), None)
.unwrap();
let task = metadata
.get_task(task_id)
.unwrap()
.expect("task should exist after download_task_started");
assert_eq!(task.id, task_id);
assert_eq!(task.piece_length, 1024);
assert_eq!(task.piece_length, Some(1024));
assert_eq!(task.content_length, Some(1024));
assert!(task.response_header.is_empty());
assert_eq!(task.uploading_count, 0);
@ -905,7 +920,7 @@ mod tests {
let task_id = "task2";
metadata
.download_task_started(task_id, 1024, None, None)
.download_task_started(task_id, Some(1024), None, None)
.unwrap();
let tasks = metadata.get_tasks().unwrap();
assert_eq!(tasks.len(), 2, "should get 2 tasks in total");

View File

@ -73,7 +73,6 @@ impl IDGenerator {
digest: Option<&str>,
tag: Option<&str>,
application: Option<&str>,
piece_length: u64,
filtered_query_params: Vec<String>,
) -> Result<String> {
// Filter the query parameters.
@ -105,9 +104,6 @@ impl IDGenerator {
hasher.update(application);
}
// Add the piece length to generate the task id.
hasher.update(piece_length.to_string());
// Generate the task id.
Ok(hex::encode(hasher.finalize()))
}
@ -118,7 +114,6 @@ impl IDGenerator {
path: &PathBuf,
tag: Option<&str>,
application: Option<&str>,
piece_length: u64,
) -> Result<String> {
// Initialize the hasher.
let mut hasher = blake3::Hasher::new();
@ -137,9 +132,6 @@ impl IDGenerator {
hasher.update(application.as_bytes());
}
// Add the piece length to generate the cache task id.
hasher.update(piece_length.to_string().as_bytes());
// Generate the cache task id.
Ok(hasher.finalize().to_hex().to_string())
}

View File

@ -17,7 +17,6 @@
use clap::Parser;
use dragonfly_api::dfdaemon::v2::{download_cache_task_response, DownloadCacheTaskRequest};
use dragonfly_api::errordetails::v2::Backend;
use dragonfly_client_config::default_piece_length;
use dragonfly_client_core::{
error::{ErrorType, OrErr},
Error, Result,
@ -52,13 +51,6 @@ pub struct ExportCommand {
)]
tag: String,
#[arg(
long = "piece-length",
default_value_t = default_piece_length(),
help = "Specify the byte length of the piece"
)]
piece_length: u64,
#[arg(
short = 'O',
long = "output",
@ -139,7 +131,7 @@ impl ExportCommand {
);
eprintln!(
"{}{}{}Message:{}, can not connect {}, please check the unix socket.{}",
"{}{}{}Message:{}, can not connect {}, please check the unix socket {}",
color::Fg(color::Cyan),
style::Italic,
style::Bold,
@ -381,7 +373,6 @@ impl ExportCommand {
persistent: false,
tag: Some(self.tag.clone()),
application: Some(self.application.clone()),
piece_length: self.piece_length,
output_path: absolute_path.to_string_lossy().to_string(),
timeout: Some(
prost_wkt_types::Duration::try_from(self.timeout)

View File

@ -16,9 +16,7 @@
use clap::Parser;
use dragonfly_api::dfdaemon::v2::UploadCacheTaskRequest;
use dragonfly_client_config::{
default_piece_length, dfcache::default_dfcache_persistent_replica_count,
};
use dragonfly_client_config::dfcache::default_dfcache_persistent_replica_count;
use dragonfly_client_core::{
error::{ErrorType, OrErr},
Error, Result,
@ -60,13 +58,6 @@ pub struct ImportCommand {
)]
tag: Option<String>,
#[arg(
long = "piece-length",
default_value_t = default_piece_length(),
help = "Specify the byte length of the piece"
)]
piece_length: u64,
#[arg(
long = "ttl",
value_parser= humantime::parse_duration,
@ -148,7 +139,7 @@ impl ImportCommand {
);
eprintln!(
"{}{}{}Message:{}, can not connect {}, please check the unix socket.{}",
"{}{}{}Message:{}, can not connect {}, please check the unix socket {}",
color::Fg(color::Cyan),
style::Italic,
style::Bold,
@ -283,7 +274,6 @@ impl ImportCommand {
persistent_replica_count: self.persistent_replica_count,
tag: self.tag.clone(),
application: self.application.clone(),
piece_length: self.piece_length,
ttl: Some(
prost_wkt_types::Duration::try_from(self.ttl).or_err(ErrorType::ParseError)?,
),

View File

@ -60,7 +60,7 @@ impl RemoveCommand {
);
eprintln!(
"{}{}{}Message:{}, can not connect {}, please check the unix socket.{}",
"{}{}{}Message:{}, can not connect {}, please check the unix socket {}",
color::Fg(color::Cyan),
style::Italic,
style::Bold,

View File

@ -65,7 +65,7 @@ impl StatCommand {
);
eprintln!(
"{}{}{}Message:{}, can not connect {}, please check the unix socket.{}",
"{}{}{}Message:{}, can not connect {}, please check the unix socket {}",
color::Fg(color::Cyan),
style::Italic,
style::Bold,

View File

@ -22,7 +22,7 @@ use dragonfly_client::grpc::dfdaemon_download::DfdaemonDownloadClient;
use dragonfly_client::grpc::health::HealthClient;
use dragonfly_client::tracing::init_tracing;
use dragonfly_client_backend::{object_storage, BackendFactory, DirEntry, HeadRequest};
use dragonfly_client_config::{self, default_piece_length, dfdaemon, dfget};
use dragonfly_client_config::{self, dfdaemon, dfget};
use dragonfly_client_core::error::{BackendError, ErrorType, OrErr};
use dragonfly_client_core::{Error, Result};
use dragonfly_client_util::http::{header_vec_to_hashmap, header_vec_to_reqwest_headermap};
@ -105,13 +105,6 @@ struct Args {
)]
timeout: Duration,
#[arg(
long = "piece-length",
default_value_t = default_piece_length(),
help = "Specify the byte length of the piece"
)]
piece_length: u64,
#[arg(
short = 'd',
long = "digest",
@ -321,7 +314,7 @@ async fn main() -> anyhow::Result<()> {
);
eprintln!(
"{}{}{}Message:{}, can not connect {}, please check the unix socket.{}",
"{}{}{}Message:{}, can not connect {}, please check the unix socket {}",
color::Fg(color::Cyan),
style::Italic,
style::Bold,
@ -704,7 +697,7 @@ async fn download(
priority: args.priority,
filtered_query_params: args.filtered_query_params.unwrap_or_default(),
request_header: header_vec_to_hashmap(args.header.unwrap_or_default())?,
piece_length: args.piece_length,
piece_length: None,
output_path: Some(args.output.to_string_lossy().to_string()),
timeout: Some(
prost_wkt_types::Duration::try_from(args.timeout)

View File

@ -201,7 +201,6 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
download.digest.as_deref(),
download.tag.as_deref(),
download.application.as_deref(),
download.piece_length,
download.filtered_query_params.clone(),
)
.map_err(|e| {
@ -291,7 +290,11 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
error!("missing content length in the response");
return Err(Status::internal("missing content length in the response"));
};
info!("content length: {}", content_length);
info!(
"content length {}, piece length {}",
content_length,
task.piece_length().unwrap_or_default()
);
// Download's range priority is higher than the request header's range.
// If download protocol is http, use the range of the request header.
@ -721,6 +724,11 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
task
}
};
info!(
"content length {}, piece length {}",
task.content_length(),
task.piece_length()
);
// Clone the cache task.
let task_manager = self.cache_task.clone();
@ -834,7 +842,6 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
&path.to_path_buf(),
request.tag.as_deref(),
request.application.as_deref(),
request.piece_length,
)
.map_err(|err| {
error!("generate task id: {}", err);

View File

@ -184,7 +184,6 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
download.digest.as_deref(),
download.tag.as_deref(),
download.application.as_deref(),
download.piece_length,
download.filtered_query_params.clone(),
)
.map_err(|e| {
@ -275,7 +274,11 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
return Err(Status::internal("missing content length in the response"));
};
info!("content length: {}", content_length);
info!(
"content length {}, piece length {}",
content_length,
task.piece_length().unwrap_or_default()
);
// Download's range priority is higher than the request header's range.
// If download protocol is http, use the range of the request header.
@ -828,6 +831,11 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
task
}
};
info!(
"content length {}, piece length {}",
task.content_length(),
task.piece_length()
);
// Clone the cache task.
let task_manager = self.cache_task.clone();

View File

@ -39,10 +39,6 @@ pub const DRAGONFLY_REGISTRY_HEADER: &str = "X-Dragonfly-Registry";
// Default value includes the filtered query params of s3, gcs, oss, obs, cos.
pub const DRAGONFLY_FILTERED_QUERY_PARAMS_HEADER: &str = "X-Dragonfly-Filtered-Query-Params";
// DRAGONFLY_PIECE_LENGTH_HEADER is the header key of piece length in http request,
// it specifies the piece length of the task.
pub const DRAGONFLY_PIECE_LENGTH_HEADER: &str = "X-Dragonfly-Piece-Length";
// get_tag gets the tag from http header.
pub fn get_tag(header: &HeaderMap) -> Option<String> {
match header.get(DRAGONFLY_TAG_HEADER) {
@ -122,23 +118,3 @@ pub fn get_filtered_query_params(
None => default_filtered_query_params,
}
}
// get_piece_length gets the piece length from http header.
//
// Falls back to the configured default piece length when the header is
// absent, is not valid UTF-8, or does not parse as a u64; parse and
// decode failures are logged before falling back.
pub fn get_piece_length(header: &HeaderMap) -> u64 {
    let Some(value) = header.get(DRAGONFLY_PIECE_LENGTH_HEADER) else {
        return dragonfly_client_config::default_piece_length();
    };

    // The header value must be valid UTF-8 before it can be parsed.
    let piece_length = match value.to_str() {
        Ok(piece_length) => piece_length,
        Err(err) => {
            error!("get piece length from header failed: {}", err);
            return dragonfly_client_config::default_piece_length();
        }
    };

    match piece_length.parse::<u64>() {
        Ok(piece_length) => piece_length,
        Err(err) => {
            error!("parse piece length from header failed: {}", err);
            dragonfly_client_config::default_piece_length()
        }
    }
}

View File

@ -859,7 +859,7 @@ fn make_download_task_request(
rule.filtered_query_params.clone(),
),
request_header: reqwest_headermap_to_hashmap(&reqwest_request_header),
piece_length: header::get_piece_length(&reqwest_request_header),
piece_length: None,
output_path: None,
timeout: None,
need_back_to_source: false,

View File

@ -109,6 +109,24 @@ impl CacheTask {
digest: &str,
request: UploadCacheTaskRequest,
) -> ClientResult<CommonCacheTask> {
// Convert prost_wkt_types::Duration to std::time::Duration.
let ttl = Duration::try_from(request.ttl.ok_or(Error::UnexpectedResponse)?)
.or_err(ErrorType::ParseError)?;
// Get the content length of the file.
let content_length = std::fs::metadata(path)
.map_err(|err| {
error!("get file metadata error: {}", err);
err
})?
.len();
// Get the piece length of the file.
let piece_length = self.piece.calculate_piece_length(
piece::PieceLengthStrategy::OptimizeByFileLength,
content_length,
);
// Notify the scheduler that the cache task is started.
self.scheduler_client
.upload_cache_task_started(UploadCacheTaskStartedRequest {
@ -118,7 +136,7 @@ impl CacheTask {
persistent_replica_count: request.persistent_replica_count,
tag: request.tag.clone(),
application: request.application.clone(),
piece_length: request.piece_length,
piece_length,
ttl: request.ttl,
timeout: request.timeout,
})
@ -128,14 +146,10 @@ impl CacheTask {
err
})?;
// Convert prost_wkt_types::Duration to std::time::Duration.
let ttl = Duration::try_from(request.ttl.ok_or(Error::UnexpectedResponse)?)
.or_err(ErrorType::ParseError)?;
// Create the persistent cache task.
match self
.storage
.create_persistent_cache_task(task_id, ttl, path, request.piece_length, digest)
.create_persistent_cache_task(task_id, ttl, path, piece_length, content_length, digest)
.await
{
Ok(metadata) => {
@ -177,7 +191,7 @@ impl CacheTask {
digest: digest.to_string(),
tag: request.tag,
application: request.application,
piece_length: request.piece_length,
piece_length: metadata.piece_length,
content_length: metadata.content_length,
piece_count: response.piece_count,
state: response.state,
@ -231,7 +245,7 @@ impl CacheTask {
task_id,
ttl,
request.persistent,
request.piece_length,
response.piece_length,
response.content_length,
)
}
@ -270,7 +284,7 @@ impl CacheTask {
let interested_pieces =
match self
.piece
.calculate_interested(request.piece_length, task.content_length, None)
.calculate_interested(task.piece_length, task.content_length, None)
{
Ok(interested_pieces) => interested_pieces,
Err(err) => {

View File

@ -36,6 +36,24 @@ use tracing::{error, info, instrument, Span};
use super::*;
// MAX_PIECE_COUNT is the maximum piece count. If the piece count is greater
// than MAX_PIECE_COUNT, the piece length will be optimized by the file length.
// When the piece length reaches MAX_PIECE_LENGTH, the piece count will
// probably still be greater than MAX_PIECE_COUNT.
const MAX_PIECE_COUNT: u64 = 500;
// MIN_PIECE_LENGTH is the minimum piece length.
const MIN_PIECE_LENGTH: u64 = 4 * 1024 * 1024;
// MAX_PIECE_LENGTH is the maximum piece length.
const MAX_PIECE_LENGTH: u64 = 16 * 1024 * 1024;
// PieceLengthStrategy sets the optimization strategy of piece length.
pub enum PieceLengthStrategy {
// OptimizeByFileLength optimizes the piece length by the file length.
OptimizeByFileLength,
}
// Piece represents a piece manager.
pub struct Piece {
// config is the configuration of the dfdaemon.
@ -254,6 +272,29 @@ impl Piece {
pieces.into_values().collect()
}
// calculate_piece_length calculates the piece length by content_length.
pub fn calculate_piece_length(
    &self,
    strategy: PieceLengthStrategy,
    content_length: u64,
) -> u64 {
    match strategy {
        PieceLengthStrategy::OptimizeByFileLength => {
            // Aim for roughly MAX_PIECE_COUNT pieces, then round the
            // candidate length up to the next power of two.
            let candidate = (content_length as f64 / MAX_PIECE_COUNT as f64) as u64;
            let rounded = candidate.next_power_of_two();

            // Keep the result within [MIN_PIECE_LENGTH, MAX_PIECE_LENGTH].
            rounded.clamp(MIN_PIECE_LENGTH, MAX_PIECE_LENGTH)
        }
    }
}
// upload_from_local_peer_into_async_read uploads a single piece from a local peer.
#[instrument(skip_all, fields(piece_id))]
pub async fn upload_from_local_peer_into_async_read(
@ -493,6 +534,7 @@ impl Piece {
let mut response = backend
.get(GetRequest {
task_id: task_id.to_string(),
piece_id: self.storage.piece_id(task_id, number),
url: url.to_string(),
range: Some(Range {

View File

@ -114,9 +114,10 @@ impl Task {
id: &str,
request: Download,
) -> ClientResult<metadata::Task> {
let task = self
.storage
.download_task_started(id, request.piece_length, None, None)?;
let task = self.storage.download_task_started(id, None, None, None)?;
if task.content_length.is_some() && task.piece_length.is_some() {
return Ok(task);
}
// Handle the request header.
let mut request_header =
@ -125,10 +126,6 @@ impl Task {
err
})?;
if task.content_length.is_some() {
return Ok(task);
}
// Remove the range header to prevent the server from
// returning a 206 partial content and returning
// a 200 full content.
@ -156,10 +153,20 @@ impl Task {
}));
}
let content_length = match response.content_length {
Some(content_length) => content_length,
None => return Err(Error::InvalidContentLength),
};
let piece_length = self.piece.calculate_piece_length(
piece::PieceLengthStrategy::OptimizeByFileLength,
content_length,
);
self.storage.download_task_started(
id,
request.piece_length,
response.content_length,
Some(piece_length),
Some(content_length),
response.http_header,
)
}
@ -171,8 +178,7 @@ impl Task {
// download_failed updates the metadata of the task when the task downloads failed.
pub async fn download_failed(&self, id: &str) -> ClientResult<()> {
let _ = self.storage.download_task_failed(id).await?;
Ok(())
self.storage.download_task_failed(id).await.map(|_| ())
}
// prefetch_task_started updates the metadata of the task when the task prefetch started.
@ -211,18 +217,24 @@ impl Task {
return Err(Error::InvalidContentLength);
};
// Calculate the interested pieces to download.
let interested_pieces = match self.piece.calculate_interested(
request.piece_length,
content_length,
request.range,
) {
Ok(interested_pieces) => interested_pieces,
Err(err) => {
error!("calculate interested pieces error: {:?}", err);
return Err(err);
}
// Get the piece length from the task.
let Some(piece_length) = task.piece_length() else {
error!("piece length not found");
return Err(Error::InvalidPieceLength);
};
// Calculate the interested pieces to download.
let interested_pieces =
match self
.piece
.calculate_interested(piece_length, content_length, request.range)
{
Ok(interested_pieces) => interested_pieces,
Err(err) => {
error!("calculate interested pieces error: {:?}", err);
return Err(err);
}
};
info!(
"interested pieces: {:?}",
interested_pieces