From 832af14c913d34f71b72e9c00efbe9be1fd17e32 Mon Sep 17 00:00:00 2001 From: Gaius Date: Thu, 13 Jun 2024 15:51:54 +0800 Subject: [PATCH] feat: wait for piece finished before update piece metadata (#535) Signed-off-by: Gaius --- Cargo.lock | 16 ++++++++-------- Cargo.toml | 16 ++++++++-------- dragonfly-client-storage/src/lib.rs | 17 +++++------------ dragonfly-client-storage/src/metadata.rs | 18 +++++++++++++++--- 4 files changed, 36 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3f01a10f..75dca624 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -965,7 +965,7 @@ dependencies = [ [[package]] name = "dragonfly-client" -version = "0.1.79" +version = "0.1.80" dependencies = [ "anyhow", "bytes", @@ -1029,7 +1029,7 @@ dependencies = [ [[package]] name = "dragonfly-client-backend" -version = "0.1.79" +version = "0.1.80" dependencies = [ "dragonfly-api", "dragonfly-client-core", @@ -1049,7 +1049,7 @@ dependencies = [ [[package]] name = "dragonfly-client-config" -version = "0.1.79" +version = "0.1.80" dependencies = [ "dragonfly-client-core", "home", @@ -1068,7 +1068,7 @@ dependencies = [ [[package]] name = "dragonfly-client-core" -version = "0.1.79" +version = "0.1.80" dependencies = [ "hyper 1.2.0", "hyper-util", @@ -1082,7 +1082,7 @@ dependencies = [ [[package]] name = "dragonfly-client-init" -version = "0.1.79" +version = "0.1.80" dependencies = [ "anyhow", "clap", @@ -1098,7 +1098,7 @@ dependencies = [ [[package]] name = "dragonfly-client-storage" -version = "0.1.79" +version = "0.1.80" dependencies = [ "base16ct", "blake3", @@ -1122,7 +1122,7 @@ dependencies = [ [[package]] name = "dragonfly-client-util" -version = "0.1.79" +version = "0.1.80" dependencies = [ "dragonfly-api", "dragonfly-client-core", @@ -1581,7 +1581,7 @@ dependencies = [ [[package]] name = "hdfs" -version = "0.1.79" +version = "0.1.80" dependencies = [ "dragonfly-client-backend", "dragonfly-client-core", diff --git a/Cargo.toml b/Cargo.toml index 93d82b7b..c932d369 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ members = [ ] [workspace.package] -version = "0.1.79" +version = "0.1.80" authors = ["The Dragonfly Developers"] homepage = "https://d7y.io/" repository = "https://github.com/dragonflyoss/client.git" @@ -22,13 +22,13 @@ readme = "README.md" edition = "2021" [workspace.dependencies] -dragonfly-client = { path = "dragonfly-client", version = "0.1.79" } -dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.79" } -dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.79" } -dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.79" } -dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.79" } -dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.79" } -dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.79" } +dragonfly-client = { path = "dragonfly-client", version = "0.1.80" } +dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.80" } +dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.80" } +dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.80" } +dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.80" } +dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.80" } +dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.80" } thiserror = "1.0" dragonfly-api = "2.0.115" reqwest = { version = "0.12.4", features = ["stream", "native-tls", "default-tls", "rustls-tls"] } diff --git a/dragonfly-client-storage/src/lib.rs b/dragonfly-client-storage/src/lib.rs index 93b26a3a..3063e546 100644 --- a/dragonfly-client-storage/src/lib.rs +++ b/dragonfly-client-storage/src/lib.rs @@ -23,7 +23,7 @@ use std::path::Path; use std::sync::Arc; use std::time::Duration; use tokio::io::AsyncRead; -use tracing::info; +use tracing::{error, info}; pub mod content; pub mod metadata; @@ -210,6 +210,9 @@ impl Storage { number: u32, range: Option, ) -> Result { + // Wait for the piece to be finished. + self.wait_for_piece_finished(task_id, number).await?; + // Start uploading the task. self.metadata.upload_task_started(task_id)?; @@ -220,16 +223,6 @@ impl Storage { return Err(err); } - // Wait for the piece to be finished. - if let Err(err) = self.wait_for_piece_finished(task_id, number).await { - // Failed uploading the task. - self.metadata.upload_task_failed(task_id)?; - - // Failed uploading the piece. - self.metadata.upload_piece_failed(task_id, number)?; - return Err(err); - } - // Get the piece metadata and return the content of the piece. match self.metadata.get_piece(task_id, number)? { Some(piece) => { @@ -294,7 +287,6 @@ impl Storage { loop { tokio::select! { _ = interval.tick() => { - let piece = self .get_piece(task_id, number)? .ok_or_else(|| Error::PieceNotFound(self.piece_id(task_id, number)))?; @@ -311,6 +303,7 @@ impl Storage { wait_for_piece_count += 1; } _ = &mut piece_timeout => { + self.metadata.wait_for_piece_finished_failed(task_id, number).unwrap_or_else(|err| error!("delete piece metadata failed: {}", err)); return Err(Error::WaitForPieceFinishedTimeout(self.piece_id(task_id, number))); } } diff --git a/dragonfly-client-storage/src/metadata.rs b/dragonfly-client-storage/src/metadata.rs index 69254e82..6cf2b114 100644 --- a/dragonfly-client-storage/src/metadata.rs +++ b/dragonfly-client-storage/src/metadata.rs @@ -24,7 +24,7 @@ use std::collections::HashMap; use std::path::Path; use std::sync::Arc; use std::time::Duration; -use tracing::error; +use tracing::{error, info}; use crate::storage_engine::{rocksdb::RocksdbStorageEngine, DatabaseObject, StorageEngineOwned}; @@ -391,6 +391,7 @@ impl Metadata { /// delete_task deletes the task metadata. pub fn delete_task(&self, task_id: &str) -> Result<()> { + info!("delete task metadata {}", task_id); self.db.delete::(task_id.as_bytes()) } @@ -441,8 +442,12 @@ impl Metadata { /// download_piece_failed updates the metadata of the piece when the piece downloads failed. pub fn download_piece_failed(&self, task_id: &str, number: u32) -> Result<()> { - self.db - .delete::(self.piece_id(task_id, number).as_bytes()) + self.delete_piece(task_id, number) + } + + // wait_for_piece_finished_failed waits for the piece to be finished or failed. + pub fn wait_for_piece_finished_failed(&self, task_id: &str, number: u32) -> Result<()> { + self.delete_piece(task_id, number) } /// upload_piece_started updates the metadata of the piece when piece uploads started. @@ -508,6 +513,13 @@ impl Metadata { iter.map(|ele| ele.map(|(_, piece)| piece)).collect() } + /// delete_piece deletes the piece metadata. + pub fn delete_piece(&self, task_id: &str, number: u32) -> Result<()> { + info!("delete piece metadata {}", self.piece_id(task_id, number)); + self.db + .delete::(self.piece_id(task_id, number).as_bytes()) + } + /// delete_pieces deletes the piece metadatas. pub fn delete_pieces(&self, task_id: &str) -> Result<()> { let iter = self.db.prefix_iter::(task_id.as_bytes())?;