feat: remove write buffer and check piece length when write piece finished

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2025-07-02 23:36:06 +08:00
parent fb3be39b50
commit cad36b3a19
No known key found for this signature in database
GPG Key ID: 647A0EE86907F1AF
6 changed files with 66 additions and 26 deletions

16
Cargo.lock generated
View File

@ -954,7 +954,7 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-client" name = "dragonfly-client"
version = "1.0.0" version = "1.0.1"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"bytes", "bytes",
@ -1026,7 +1026,7 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-client-backend" name = "dragonfly-client-backend"
version = "1.0.0" version = "1.0.1"
dependencies = [ dependencies = [
"dragonfly-api", "dragonfly-api",
"dragonfly-client-core", "dragonfly-client-core",
@ -1057,7 +1057,7 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-client-config" name = "dragonfly-client-config"
version = "1.0.0" version = "1.0.1"
dependencies = [ dependencies = [
"bytesize", "bytesize",
"bytesize-serde", "bytesize-serde",
@ -1087,7 +1087,7 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-client-core" name = "dragonfly-client-core"
version = "1.0.0" version = "1.0.1"
dependencies = [ dependencies = [
"headers 0.4.1", "headers 0.4.1",
"hyper 1.6.0", "hyper 1.6.0",
@ -1105,7 +1105,7 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-client-init" name = "dragonfly-client-init"
version = "1.0.0" version = "1.0.1"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"clap", "clap",
@ -1123,7 +1123,7 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-client-storage" name = "dragonfly-client-storage"
version = "1.0.0" version = "1.0.1"
dependencies = [ dependencies = [
"bincode", "bincode",
"bytes", "bytes",
@ -1152,7 +1152,7 @@ dependencies = [
[[package]] [[package]]
name = "dragonfly-client-util" name = "dragonfly-client-util"
version = "1.0.0" version = "1.0.1"
dependencies = [ dependencies = [
"base64 0.22.1", "base64 0.22.1",
"bytesize", "bytesize",
@ -1561,7 +1561,7 @@ dependencies = [
[[package]] [[package]]
name = "hdfs" name = "hdfs"
version = "1.0.0" version = "1.0.1"
dependencies = [ dependencies = [
"dragonfly-client-backend", "dragonfly-client-backend",
"dragonfly-client-core", "dragonfly-client-core",

View File

@ -12,7 +12,7 @@ members = [
] ]
[workspace.package] [workspace.package]
version = "1.0.0" version = "1.0.1"
authors = ["The Dragonfly Developers"] authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/" homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git" repository = "https://github.com/dragonflyoss/client.git"
@ -22,13 +22,13 @@ readme = "README.md"
edition = "2021" edition = "2021"
[workspace.dependencies] [workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "1.0.0" } dragonfly-client = { path = "dragonfly-client", version = "1.0.1" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "1.0.0" } dragonfly-client-core = { path = "dragonfly-client-core", version = "1.0.1" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "1.0.0" } dragonfly-client-config = { path = "dragonfly-client-config", version = "1.0.1" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "1.0.0" } dragonfly-client-storage = { path = "dragonfly-client-storage", version = "1.0.1" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "1.0.0" } dragonfly-client-backend = { path = "dragonfly-client-backend", version = "1.0.1" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "1.0.0" } dragonfly-client-util = { path = "dragonfly-client-util", version = "1.0.1" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "1.0.0" } dragonfly-client-init = { path = "dragonfly-client-init", version = "1.0.1" }
dragonfly-api = "=2.1.40" dragonfly-api = "=2.1.40"
thiserror = "2.0" thiserror = "2.0"
futures = "0.3.31" futures = "0.3.31"

View File

@ -359,6 +359,7 @@ impl Content {
&self, &self,
task_id: &str, task_id: &str,
offset: u64, offset: u64,
expected_length: u64,
reader: &mut R, reader: &mut R,
) -> Result<WritePieceResponse> { ) -> Result<WritePieceResponse> {
// Open the file and seek to the offset. // Open the file and seek to the offset.
@ -393,6 +394,13 @@ impl Content {
error!("flush {:?} failed: {}", task_path, err); error!("flush {:?} failed: {}", task_path, err);
})?; })?;
if length != expected_length {
return Err(Error::Unknown(format!(
"expected length {} but got {}",
expected_length, length
)));
}
// Calculate the hash of the piece. // Calculate the hash of the piece.
Ok(WritePieceResponse { Ok(WritePieceResponse {
length, length,
@ -573,6 +581,7 @@ impl Content {
&self, &self,
task_id: &str, task_id: &str,
offset: u64, offset: u64,
expected_length: u64,
reader: &mut R, reader: &mut R,
) -> Result<WritePieceResponse> { ) -> Result<WritePieceResponse> {
// Open the file and seek to the offset. // Open the file and seek to the offset.
@ -607,6 +616,13 @@ impl Content {
error!("flush {:?} failed: {}", task_path, err); error!("flush {:?} failed: {}", task_path, err);
})?; })?;
if length != expected_length {
return Err(Error::Unknown(format!(
"expected length {} but got {}",
expected_length, length
)));
}
// Calculate the hash of the piece. // Calculate the hash of the piece.
Ok(WritePieceResponse { Ok(WritePieceResponse {
length, length,
@ -730,7 +746,10 @@ mod tests {
let data = b"hello, world!"; let data = b"hello, world!";
let mut reader = Cursor::new(data); let mut reader = Cursor::new(data);
content.write_piece(task_id, 0, &mut reader).await.unwrap(); content
.write_piece(task_id, 0, 13, &mut reader)
.await
.unwrap();
let mut reader = content.read_piece(task_id, 0, 13, None).await.unwrap(); let mut reader = content.read_piece(task_id, 0, 13, None).await.unwrap();
let mut buffer = Vec::new(); let mut buffer = Vec::new();
@ -765,7 +784,10 @@ mod tests {
let data = b"test"; let data = b"test";
let mut reader = Cursor::new(data); let mut reader = Cursor::new(data);
let response = content.write_piece(task_id, 0, &mut reader).await.unwrap(); let response = content
.write_piece(task_id, 0, 4, &mut reader)
.await
.unwrap();
assert_eq!(response.length, 4); assert_eq!(response.length, 4);
assert!(!response.hash.is_empty()); assert!(!response.hash.is_empty());
} }
@ -872,7 +894,7 @@ mod tests {
let data = b"hello, world!"; let data = b"hello, world!";
let mut reader = Cursor::new(data); let mut reader = Cursor::new(data);
content content
.write_persistent_cache_piece(task_id, 0, &mut reader) .write_persistent_cache_piece(task_id, 0, 13, &mut reader)
.await .await
.unwrap(); .unwrap();
@ -916,7 +938,7 @@ mod tests {
let data = b"test"; let data = b"test";
let mut reader = Cursor::new(data); let mut reader = Cursor::new(data);
let response = content let response = content
.write_persistent_cache_piece(task_id, 0, &mut reader) .write_persistent_cache_piece(task_id, 0, 4, &mut reader)
.await .await
.unwrap(); .unwrap();
assert_eq!(response.length, 4); assert_eq!(response.length, 4);

View File

@ -385,7 +385,7 @@ impl Storage {
) -> Result<metadata::Piece> { ) -> Result<metadata::Piece> {
let response = self let response = self
.content .content
.write_persistent_cache_piece(task_id, offset, reader) .write_persistent_cache_piece(task_id, offset, length, reader)
.await?; .await?;
let digest = Digest::new(Algorithm::Crc32, response.hash); let digest = Digest::new(Algorithm::Crc32, response.hash);
@ -454,7 +454,10 @@ impl Storage {
buffer.extend_from_slice(bytes); buffer.extend_from_slice(bytes);
}); });
let response = self.content.write_piece(task_id, offset, &mut tee).await?; let response = self
.content
.write_piece(task_id, offset, length, &mut tee)
.await?;
self.cache self.cache
.write_piece(task_id, piece_id, bytes::Bytes::from(buffer)) .write_piece(task_id, piece_id, bytes::Bytes::from(buffer))
@ -463,7 +466,9 @@ impl Storage {
response response
} else { } else {
self.content.write_piece(task_id, offset, reader).await? self.content
.write_piece(task_id, offset, length, reader)
.await?
}; };
let digest = Digest::new(Algorithm::Crc32, response.hash); let digest = Digest::new(Algorithm::Crc32, response.hash);
@ -521,7 +526,10 @@ impl Storage {
buffer.extend_from_slice(bytes); buffer.extend_from_slice(bytes);
}); });
let response = self.content.write_piece(task_id, offset, &mut tee).await?; let response = self
.content
.write_piece(task_id, offset, length, &mut tee)
.await?;
self.cache self.cache
.write_piece(task_id, piece_id, bytes::Bytes::from(buffer)) .write_piece(task_id, piece_id, bytes::Bytes::from(buffer))
@ -530,7 +538,9 @@ impl Storage {
response response
} else { } else {
self.content.write_piece(task_id, offset, reader).await? self.content
.write_piece(task_id, offset, length, reader)
.await?
}; };
let length = response.length; let length = response.length;
@ -668,6 +678,7 @@ impl Storage {
} }
/// download_persistent_cache_piece_from_parent_finished is used for downloading persistent cache piece from parent. /// download_persistent_cache_piece_from_parent_finished is used for downloading persistent cache piece from parent.
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)] #[instrument(skip_all)]
pub async fn download_persistent_cache_piece_from_parent_finished< pub async fn download_persistent_cache_piece_from_parent_finished<
R: AsyncRead + Unpin + ?Sized, R: AsyncRead + Unpin + ?Sized,
@ -676,13 +687,14 @@ impl Storage {
piece_id: &str, piece_id: &str,
task_id: &str, task_id: &str,
offset: u64, offset: u64,
length: u64,
expected_digest: &str, expected_digest: &str,
parent_id: &str, parent_id: &str,
reader: &mut R, reader: &mut R,
) -> Result<metadata::Piece> { ) -> Result<metadata::Piece> {
let response = self let response = self
.content .content
.write_persistent_cache_piece(task_id, offset, reader) .write_persistent_cache_piece(task_id, offset, length, reader)
.await?; .await?;
let length = response.length; let length = response.length;

View File

@ -840,6 +840,7 @@ impl Piece {
piece_id, piece_id,
task_id, task_id,
offset, offset,
length,
digest.as_str(), digest.as_str(),
parent.id.as_str(), parent.id.as_str(),
&mut reader, &mut reader,

View File

@ -1588,6 +1588,11 @@ impl Task {
} }
}; };
if !piece.is_finished() {
debug!("piece {} is not finished, skip it", piece_id);
continue;
}
// Fake the download from the local. // Fake the download from the local.
self.piece.download_from_local(task_id, piece.length); self.piece.download_from_local(task_id, piece.length);
info!("finished piece {} from local", piece_id,); info!("finished piece {} from local", piece_id,);