diff --git a/Cargo.lock b/Cargo.lock index cdb6edf5..e7b32608 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -954,7 +954,7 @@ dependencies = [ [[package]] name = "dragonfly-client" -version = "1.0.0" +version = "1.0.1" dependencies = [ "anyhow", "bytes", @@ -1026,7 +1026,7 @@ dependencies = [ [[package]] name = "dragonfly-client-backend" -version = "1.0.0" +version = "1.0.1" dependencies = [ "dragonfly-api", "dragonfly-client-core", @@ -1057,7 +1057,7 @@ dependencies = [ [[package]] name = "dragonfly-client-config" -version = "1.0.0" +version = "1.0.1" dependencies = [ "bytesize", "bytesize-serde", @@ -1087,7 +1087,7 @@ dependencies = [ [[package]] name = "dragonfly-client-core" -version = "1.0.0" +version = "1.0.1" dependencies = [ "headers 0.4.1", "hyper 1.6.0", @@ -1105,7 +1105,7 @@ dependencies = [ [[package]] name = "dragonfly-client-init" -version = "1.0.0" +version = "1.0.1" dependencies = [ "anyhow", "clap", @@ -1123,7 +1123,7 @@ dependencies = [ [[package]] name = "dragonfly-client-storage" -version = "1.0.0" +version = "1.0.1" dependencies = [ "bincode", "bytes", @@ -1152,7 +1152,7 @@ dependencies = [ [[package]] name = "dragonfly-client-util" -version = "1.0.0" +version = "1.0.1" dependencies = [ "base64 0.22.1", "bytesize", @@ -1561,7 +1561,7 @@ dependencies = [ [[package]] name = "hdfs" -version = "1.0.0" +version = "1.0.1" dependencies = [ "dragonfly-client-backend", "dragonfly-client-core", diff --git a/Cargo.toml b/Cargo.toml index 3528c157..a2a919da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ members = [ ] [workspace.package] -version = "1.0.0" +version = "1.0.1" authors = ["The Dragonfly Developers"] homepage = "https://d7y.io/" repository = "https://github.com/dragonflyoss/client.git" @@ -22,13 +22,13 @@ readme = "README.md" edition = "2021" [workspace.dependencies] -dragonfly-client = { path = "dragonfly-client", version = "1.0.0" } -dragonfly-client-core = { path = "dragonfly-client-core", version = "1.0.0" } -dragonfly-client-config = { path = "dragonfly-client-config", version = "1.0.0" } -dragonfly-client-storage = { path = "dragonfly-client-storage", version = "1.0.0" } -dragonfly-client-backend = { path = "dragonfly-client-backend", version = "1.0.0" } -dragonfly-client-util = { path = "dragonfly-client-util", version = "1.0.0" } -dragonfly-client-init = { path = "dragonfly-client-init", version = "1.0.0" } +dragonfly-client = { path = "dragonfly-client", version = "1.0.1" } +dragonfly-client-core = { path = "dragonfly-client-core", version = "1.0.1" } +dragonfly-client-config = { path = "dragonfly-client-config", version = "1.0.1" } +dragonfly-client-storage = { path = "dragonfly-client-storage", version = "1.0.1" } +dragonfly-client-backend = { path = "dragonfly-client-backend", version = "1.0.1" } +dragonfly-client-util = { path = "dragonfly-client-util", version = "1.0.1" } +dragonfly-client-init = { path = "dragonfly-client-init", version = "1.0.1" } dragonfly-api = "=2.1.40" thiserror = "2.0" futures = "0.3.31" diff --git a/dragonfly-client-storage/src/content.rs b/dragonfly-client-storage/src/content.rs index 0455c96e..c52eda61 100644 --- a/dragonfly-client-storage/src/content.rs +++ b/dragonfly-client-storage/src/content.rs @@ -359,6 +359,7 @@ impl Content { &self, task_id: &str, offset: u64, + expected_length: u64, reader: &mut R, ) -> Result { // Open the file and seek to the offset. @@ -393,6 +394,13 @@ impl Content { error!("flush {:?} failed: {}", task_path, err); })?; + if length != expected_length { + return Err(Error::Unknown(format!( + "expected length {} but got {}", + expected_length, length + ))); + } + // Calculate the hash of the piece. Ok(WritePieceResponse { length, @@ -573,6 +581,7 @@ impl Content { &self, task_id: &str, offset: u64, + expected_length: u64, reader: &mut R, ) -> Result { // Open the file and seek to the offset. @@ -607,6 +616,13 @@ impl Content { error!("flush {:?} failed: {}", task_path, err); })?; + if length != expected_length { + return Err(Error::Unknown(format!( + "expected length {} but got {}", + expected_length, length + ))); + } + // Calculate the hash of the piece. Ok(WritePieceResponse { length, @@ -730,7 +746,10 @@ mod tests { let data = b"hello, world!"; let mut reader = Cursor::new(data); - content.write_piece(task_id, 0, &mut reader).await.unwrap(); + content + .write_piece(task_id, 0, 13, &mut reader) + .await + .unwrap(); let mut reader = content.read_piece(task_id, 0, 13, None).await.unwrap(); let mut buffer = Vec::new(); @@ -765,7 +784,10 @@ mod tests { let data = b"test"; let mut reader = Cursor::new(data); - let response = content.write_piece(task_id, 0, &mut reader).await.unwrap(); + let response = content + .write_piece(task_id, 0, 4, &mut reader) + .await + .unwrap(); assert_eq!(response.length, 4); assert!(!response.hash.is_empty()); } @@ -872,7 +894,7 @@ mod tests { let data = b"hello, world!"; let mut reader = Cursor::new(data); content - .write_persistent_cache_piece(task_id, 0, &mut reader) + .write_persistent_cache_piece(task_id, 0, 13, &mut reader) .await .unwrap(); @@ -916,7 +938,7 @@ mod tests { let data = b"test"; let mut reader = Cursor::new(data); let response = content - .write_persistent_cache_piece(task_id, 0, &mut reader) + .write_persistent_cache_piece(task_id, 0, 4, &mut reader) .await .unwrap(); assert_eq!(response.length, 4); diff --git a/dragonfly-client-storage/src/lib.rs b/dragonfly-client-storage/src/lib.rs index 2e627c78..9160d0d9 100644 --- a/dragonfly-client-storage/src/lib.rs +++ b/dragonfly-client-storage/src/lib.rs @@ -385,7 +385,7 @@ impl Storage { ) -> Result { let response = self .content - .write_persistent_cache_piece(task_id, offset, reader) + .write_persistent_cache_piece(task_id, offset, length, reader) .await?; let digest = Digest::new(Algorithm::Crc32, response.hash); @@ -454,7 +454,10 @@ impl Storage { buffer.extend_from_slice(bytes); }); - let response = self.content.write_piece(task_id, offset, &mut tee).await?; + let response = self + .content + .write_piece(task_id, offset, length, &mut tee) + .await?; self.cache .write_piece(task_id, piece_id, bytes::Bytes::from(buffer)) @@ -463,7 +466,9 @@ impl Storage { response } else { - self.content.write_piece(task_id, offset, reader).await? + self.content + .write_piece(task_id, offset, length, reader) + .await? }; let digest = Digest::new(Algorithm::Crc32, response.hash); @@ -521,7 +526,10 @@ impl Storage { buffer.extend_from_slice(bytes); }); - let response = self.content.write_piece(task_id, offset, &mut tee).await?; + let response = self + .content + .write_piece(task_id, offset, length, &mut tee) + .await?; self.cache .write_piece(task_id, piece_id, bytes::Bytes::from(buffer)) @@ -530,7 +538,9 @@ impl Storage { response } else { - self.content.write_piece(task_id, offset, reader).await? + self.content + .write_piece(task_id, offset, length, reader) + .await? }; let length = response.length; @@ -668,6 +678,7 @@ impl Storage { } /// download_persistent_cache_piece_from_parent_finished is used for downloading persistent cache piece from parent. + #[allow(clippy::too_many_arguments)] #[instrument(skip_all)] pub async fn download_persistent_cache_piece_from_parent_finished< R: AsyncRead + Unpin + ?Sized, @@ -676,13 +687,14 @@ impl Storage { piece_id: &str, task_id: &str, offset: u64, + length: u64, expected_digest: &str, parent_id: &str, reader: &mut R, ) -> Result { let response = self .content - .write_persistent_cache_piece(task_id, offset, reader) + .write_persistent_cache_piece(task_id, offset, length, reader) .await?; let length = response.length; diff --git a/dragonfly-client/src/resource/piece.rs b/dragonfly-client/src/resource/piece.rs index eb8bd97d..900e39a6 100644 --- a/dragonfly-client/src/resource/piece.rs +++ b/dragonfly-client/src/resource/piece.rs @@ -840,6 +840,7 @@ impl Piece { piece_id, task_id, offset, + length, digest.as_str(), parent.id.as_str(), &mut reader, diff --git a/dragonfly-client/src/resource/task.rs b/dragonfly-client/src/resource/task.rs index c36f8ce9..7ddf6b2e 100644 --- a/dragonfly-client/src/resource/task.rs +++ b/dragonfly-client/src/resource/task.rs @@ -1588,6 +1588,11 @@ impl Task { } }; + if !piece.is_finished() { + debug!("piece {} is not finished, skip it", piece_id); + continue; + } + // Fake the download from the local. self.piece.download_from_local(task_id, piece.length); info!("finished piece {} from local", piece_id,);