diff --git a/Cargo.lock b/Cargo.lock index 1104b61a..353f2dd5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -959,7 +959,7 @@ dependencies = [ [[package]] name = "dragonfly-client" -version = "0.1.53" +version = "0.1.54" dependencies = [ "anyhow", "bytes", @@ -1022,7 +1022,7 @@ dependencies = [ [[package]] name = "dragonfly-client-backend" -version = "0.1.53" +version = "0.1.54" dependencies = [ "dragonfly-client-core", "futures", @@ -1040,7 +1040,7 @@ dependencies = [ [[package]] name = "dragonfly-client-config" -version = "0.1.53" +version = "0.1.54" dependencies = [ "dragonfly-client-core", "home", @@ -1059,7 +1059,7 @@ dependencies = [ [[package]] name = "dragonfly-client-core" -version = "0.1.53" +version = "0.1.54" dependencies = [ "libloading", "reqwest", @@ -1070,7 +1070,7 @@ dependencies = [ [[package]] name = "dragonfly-client-init" -version = "0.1.53" +version = "0.1.54" dependencies = [ "anyhow", "clap", @@ -1086,7 +1086,7 @@ dependencies = [ [[package]] name = "dragonfly-client-storage" -version = "0.1.53" +version = "0.1.54" dependencies = [ "base16ct", "blake3", @@ -1110,7 +1110,7 @@ dependencies = [ [[package]] name = "dragonfly-client-util" -version = "0.1.53" +version = "0.1.54" dependencies = [ "dragonfly-api", "dragonfly-client-core", @@ -1568,7 +1568,7 @@ dependencies = [ [[package]] name = "hdfs" -version = "0.1.53" +version = "0.1.54" dependencies = [ "dragonfly-client-backend", "dragonfly-client-core", diff --git a/Cargo.toml b/Cargo.toml index 7005cdfc..500a697f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ members = [ ] [workspace.package] -version = "0.1.53" +version = "0.1.54" authors = ["The Dragonfly Developers"] homepage = "https://d7y.io/" repository = "https://github.com/dragonflyoss/client.git" @@ -22,13 +22,13 @@ readme = "README.md" edition = "2021" [workspace.dependencies] -dragonfly-client = { path = "dragonfly-client", version = "0.1.53" } -dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.53" } -dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.53" } -dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.53" } -dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.53" } -dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.53" } -dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.53" } +dragonfly-client = { path = "dragonfly-client", version = "0.1.54" } +dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.54" } +dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.54" } +dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.54" } +dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.54" } +dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.54" } +dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.54" } thiserror = "1.0" dragonfly-api = "2.0.112" reqwest = { version = "0.11.27", features = ["stream", "native-tls", "default-tls", "rustls-tls"] } diff --git a/dragonfly-client-storage/src/lib.rs b/dragonfly-client-storage/src/lib.rs index b7b3c4af..64090e5e 100644 --- a/dragonfly-client-storage/src/lib.rs +++ b/dragonfly-client-storage/src/lib.rs @@ -187,7 +187,7 @@ impl Storage { } // download_piece_failed updates the metadata of the piece when the piece downloads failed. - pub fn download_piece_failed(&self, task_id: &str, number: u32) -> Result { + pub fn download_piece_failed(&self, task_id: &str, number: u32) -> Result<()> { self.metadata.download_piece_failed(task_id, number) } diff --git a/dragonfly-client-storage/src/metadata.rs b/dragonfly-client-storage/src/metadata.rs index 670783b9..cfb576eb 100644 --- a/dragonfly-client-storage/src/metadata.rs +++ b/dragonfly-client-storage/src/metadata.rs @@ -25,8 +25,7 @@ use std::time::Duration; use tracing::error; use crate::storage_engine::{ - rocksdb::RocksdbStorageEngine, DatabaseObject, Operations, StorageEngine, StorageEngineOwned, - Transaction, + rocksdb::RocksdbStorageEngine, DatabaseObject, Operations, StorageEngineOwned, Transaction, }; // Task is the metadata of the task. @@ -209,21 +208,6 @@ where } impl Metadata { - /// with_txn executes the enclosed closure within a transaction. - fn with_txn(&self, f: impl FnOnce(&::Txn) -> Result) -> Result { - let txn = self.db.start_transaction(); - match f(&txn) { - Ok(result) => { - txn.commit()?; - Ok(result) - } - Err(err) => { - txn.rollback()?; - Err(err) - } - } - } - /// download_task_started updates the metadata of the task when the task downloads started. pub fn download_task_started( &self, @@ -233,111 +217,109 @@ impl Metadata { response_header: Option, ) -> Result { // Convert the response header to hashmap. - let get_response_header = || { - response_header - .as_ref() - .map(reqwest_headermap_to_hashmap) - .unwrap_or_default() + let response_header = response_header + .as_ref() + .map(reqwest_headermap_to_hashmap) + .unwrap_or_default(); + + let txn = self.db.start_transaction(); + let task = match txn.get_for_update::(id.as_bytes())? { + Some(mut task) => { + // If the task exists, update the task metadata. + task.updated_at = Utc::now().naive_utc(); + task.peer_ids.insert(peer_id.to_string()); + + // If the task has the response header, the response header + // will not be covered. + if task.response_header.is_empty() { + task.response_header = response_header; + } + + task + } + None => Task { + id: id.to_string(), + peer_ids: vec![peer_id.to_string()].into_iter().collect(), + piece_length, + response_header, + updated_at: Utc::now().naive_utc(), + created_at: Utc::now().naive_utc(), + ..Default::default() + }, }; - self.with_txn(|txn| { - let task = match txn.get_for_update::(id.as_bytes())? { - Some(mut task) => { - // If the task exists, update the task metadata. - task.updated_at = Utc::now().naive_utc(); - task.peer_ids.insert(peer_id.to_string()); - - // If the task has the response header, the response header - // will not be covered. - if task.response_header.is_empty() { - task.response_header = get_response_header(); - } - - task - } - None => Task { - id: id.to_string(), - peer_ids: vec![peer_id.to_string()].into_iter().collect(), - piece_length, - response_header: get_response_header(), - updated_at: Utc::now().naive_utc(), - created_at: Utc::now().naive_utc(), - ..Default::default() - }, - }; - - txn.put(id.as_bytes(), &task)?; - Ok(task) - }) + txn.put(id.as_bytes(), &task)?; + txn.commit()?; + Ok(task) } /// download_task_finished updates the metadata of the task when the task downloads finished. pub fn download_task_finished(&self, id: &str) -> Result { - self.with_txn(|txn| { - let task = match txn.get_for_update::(id.as_bytes())? { - Some(mut task) => { - task.updated_at = Utc::now().naive_utc(); - task.finished_at = Some(Utc::now().naive_utc()); - task - } - None => return Err(Error::TaskNotFound(id.to_string())), - }; + let txn = self.db.start_transaction(); + let task = match txn.get_for_update::(id.as_bytes())? { + Some(mut task) => { + task.updated_at = Utc::now().naive_utc(); + task.finished_at = Some(Utc::now().naive_utc()); + task + } + None => return Err(Error::TaskNotFound(id.to_string())), + }; - txn.put(id.as_bytes(), &task)?; - Ok(task) - }) + txn.put(id.as_bytes(), &task)?; + txn.commit()?; + Ok(task) } /// upload_task_started updates the metadata of the task when task uploads started. pub fn upload_task_started(&self, id: &str) -> Result { - self.with_txn(|txn| { - let task = match txn.get_for_update::(id.as_bytes())? { - Some(mut task) => { - task.uploading_count += 1; - task.updated_at = Utc::now().naive_utc(); - task - } - None => return Err(Error::TaskNotFound(id.to_string())), - }; + let txn = self.db.start_transaction(); + let task = match txn.get_for_update::(id.as_bytes())? { + Some(mut task) => { + task.uploading_count += 1; + task.updated_at = Utc::now().naive_utc(); + task + } + None => return Err(Error::TaskNotFound(id.to_string())), + }; - txn.put(id.as_bytes(), &task)?; - Ok(task) - }) + txn.put(id.as_bytes(), &task)?; + txn.commit()?; + Ok(task) } /// upload_task_finished updates the metadata of the task when task uploads finished. pub fn upload_task_finished(&self, id: &str) -> Result { - self.with_txn(|txn| { - let task = match txn.get_for_update::(id.as_bytes())? { - Some(mut task) => { - task.uploading_count -= 1; - task.uploaded_count += 1; - task.updated_at = Utc::now().naive_utc(); - task - } - None => return Err(Error::TaskNotFound(id.to_string())), - }; + let txn = self.db.start_transaction(); + let task = match txn.get_for_update::(id.as_bytes())? { + Some(mut task) => { + task.uploading_count -= 1; + task.uploaded_count += 1; + task.updated_at = Utc::now().naive_utc(); + task + } + None => return Err(Error::TaskNotFound(id.to_string())), + }; - txn.put(id.as_bytes(), &task)?; - Ok(task) - }) + txn.put(id.as_bytes(), &task)?; + txn.commit()?; + Ok(task) } /// upload_task_failed updates the metadata of the task when the task uploads failed. pub fn upload_task_failed(&self, id: &str) -> Result { - self.with_txn(|txn| { - let task = match txn.get_for_update::(id.as_bytes())? { - Some(mut task) => { - task.uploading_count -= 1; - task.updated_at = Utc::now().naive_utc(); - task - } - None => return Err(Error::TaskNotFound(id.to_string())), - }; + let txn = self.db.start_transaction(); + let task = match txn.get_for_update::(id.as_bytes())? { + Some(mut task) => { + task.uploading_count -= 1; + task.updated_at = Utc::now().naive_utc(); + task + } + None => return Err(Error::TaskNotFound(id.to_string())), + }; - txn.put(id.as_bytes(), &task)?; - Ok(task) - }) + txn.put(id.as_bytes(), &task)?; + txn.commit()?; + Ok(task) } /// get_task gets the task metadata. @@ -347,22 +329,18 @@ impl Metadata { /// get_tasks gets the task metadatas. pub fn get_tasks(&self) -> Result> { - self.with_txn(|txn| txn.iter()?.map(|ele| ele.map(|(_, task)| task)).collect()) + let txn = self.db.start_transaction(); + let iter = txn.iter::()?; + iter.map(|ele| ele.map(|(_, task)| task)).collect() } /// delete_task deletes the task metadata. pub fn delete_task(&self, task_id: &str) -> Result<()> { - self.with_txn(|txn| { - txn.delete::(task_id.as_bytes())?; - Ok(()) - }) + self.db.delete::(task_id.as_bytes()) } /// download_piece_started updates the metadata of the piece when the piece downloads started. pub fn download_piece_started(&self, task_id: &str, number: u32) -> Result { - // Get the piece id. - let id = self.piece_id(task_id, number); - // Construct the piece metadata. let piece = Piece { number, @@ -371,11 +349,10 @@ impl Metadata { ..Default::default() }; - self.with_txn(|txn| { - // Put the piece metadata. - txn.put(id.as_bytes(), &piece)?; - Ok(piece) - }) + // Put the piece metadata. + self.db + .put(self.piece_id(task_id, number).as_bytes(), &piece)?; + Ok(piece) } /// download_piece_finished updates the metadata of the piece when the piece downloads finished. @@ -390,135 +367,112 @@ impl Metadata { ) -> Result { // Get the piece id. let id = self.piece_id(task_id, number); + let txn = self.db.start_transaction(); + let piece = match txn.get_for_update::(id.as_bytes())? { + Some(mut piece) => { + piece.offset = offset; + piece.length = length; + piece.digest = digest.to_string(); + piece.parent_id = parent_id; + piece.updated_at = Utc::now().naive_utc(); + piece.finished_at = Some(Utc::now().naive_utc()); + piece + } + None => return Err(Error::PieceNotFound(id)), + }; - self.with_txn(|txn| { - let piece = match txn.get_for_update::(id.as_bytes())? { - Some(mut piece) => { - piece.offset = offset; - piece.length = length; - piece.digest = digest.to_string(); - piece.parent_id = parent_id; - piece.updated_at = Utc::now().naive_utc(); - piece.finished_at = Some(Utc::now().naive_utc()); - piece - } - None => return Err(Error::PieceNotFound(id)), - }; - - txn.put(id.as_bytes(), &piece)?; - Ok(piece) - }) + txn.put(id.as_bytes(), &piece)?; + txn.commit()?; + Ok(piece) } /// download_piece_failed updates the metadata of the piece when the piece downloads failed. - pub fn download_piece_failed(&self, task_id: &str, number: u32) -> Result { - // Get the piece id. - let id = self.piece_id(task_id, number); - - self.with_txn(|txn| { - let piece = match txn.get_for_update(id.as_bytes())? { - // If the piece exists, delete the piece metadata. - Some(piece) => { - txn.delete::(id.as_bytes())?; - piece - } - // If the piece does not exist, return error. - None => return Err(Error::PieceNotFound(id)), - }; - - Ok(piece) - }) + pub fn download_piece_failed(&self, task_id: &str, number: u32) -> Result<()> { + self.db + .delete::(self.piece_id(task_id, number).as_bytes()) } /// upload_piece_started updates the metadata of the piece when piece uploads started. pub fn upload_piece_started(&self, task_id: &str, number: u32) -> Result { // Get the piece id. let id = self.piece_id(task_id, number); + let txn = self.db.start_transaction(); + let piece = match txn.get_for_update::(id.as_bytes())? { + Some(mut piece) => { + piece.uploading_count += 1; + piece.updated_at = Utc::now().naive_utc(); + piece + } + None => return Err(Error::PieceNotFound(id)), + }; - self.with_txn(|txn| { - let piece = match txn.get_for_update::(id.as_bytes())? { - Some(mut piece) => { - piece.uploading_count += 1; - piece.updated_at = Utc::now().naive_utc(); - piece - } - None => return Err(Error::PieceNotFound(id)), - }; - - txn.put(id.as_bytes(), &piece)?; - Ok(piece) - }) + txn.put(id.as_bytes(), &piece)?; + txn.commit()?; + Ok(piece) } /// upload_piece_finished updates the metadata of the piece when piece uploads finished. pub fn upload_piece_finished(&self, task_id: &str, number: u32) -> Result { // Get the piece id. let id = self.piece_id(task_id, number); + let txn = self.db.start_transaction(); + let piece = match txn.get_for_update::(id.as_bytes())? { + Some(mut piece) => { + piece.uploading_count -= 1; + piece.uploaded_count += 1; + piece.updated_at = Utc::now().naive_utc(); + piece + } + None => return Err(Error::PieceNotFound(id)), + }; - self.with_txn(|txn| { - let piece = match txn.get_for_update::(id.as_bytes())? { - Some(mut piece) => { - piece.uploading_count -= 1; - piece.uploaded_count += 1; - piece.updated_at = Utc::now().naive_utc(); - piece - } - None => return Err(Error::PieceNotFound(id)), - }; - - txn.put(id.as_bytes(), &piece)?; - Ok(piece) - }) + txn.put(id.as_bytes(), &piece)?; + txn.commit()?; + Ok(piece) } /// upload_piece_failed updates the metadata of the piece when the piece uploads failed. pub fn upload_piece_failed(&self, task_id: &str, number: u32) -> Result { // Get the piece id. let id = self.piece_id(task_id, number); + let txn = self.db.start_transaction(); + let piece = match txn.get_for_update::(id.as_bytes())? { + Some(mut piece) => { + piece.uploading_count -= 1; + piece.updated_at = Utc::now().naive_utc(); + piece + } + None => return Err(Error::PieceNotFound(id)), + }; - self.with_txn(|txn| { - let piece = match txn.get_for_update::(id.as_bytes())? { - Some(mut piece) => { - piece.uploading_count -= 1; - piece.updated_at = Utc::now().naive_utc(); - piece - } - None => return Err(Error::PieceNotFound(id)), - }; - - txn.put(id.as_bytes(), &piece)?; - Ok(piece) - }) + txn.put(id.as_bytes(), &piece)?; + txn.commit()?; + Ok(piece) } /// get_piece gets the piece metadata. pub fn get_piece(&self, task_id: &str, number: u32) -> Result> { - let id = self.piece_id(task_id, number); - self.db.get(id.as_bytes()) + self.db.get(self.piece_id(task_id, number).as_bytes()) } /// get_pieces gets the piece metadatas. pub fn get_pieces(&self, task_id: &str) -> Result> { - self.with_txn(|txn| { - // Iterate the piece metadatas. - txn.prefix_iter(task_id.as_bytes())? - .map(|ele| ele.map(|(_, piece)| piece)) - .collect() - }) + let txn = self.db.start_transaction(); + let iter = txn.prefix_iter::(task_id.as_bytes())?; + iter.map(|ele| ele.map(|(_, piece)| piece)).collect() } /// delete_pieces deletes the piece metadatas. pub fn delete_pieces(&self, task_id: &str) -> Result<()> { - self.with_txn(|txn| { - let iter = txn.prefix_iter::(task_id.as_bytes())?; + let txn = self.db.start_transaction(); + let iter = txn.prefix_iter::(task_id.as_bytes())?; + for ele in iter { + let (key, _) = ele?; + txn.delete::(&key)?; + } - // Iterate the piece metadatas. - for ele in iter { - let (key, _) = ele?; - txn.delete::(&key)?; - } - Ok(()) - }) + txn.commit()?; + Ok(()) } /// piece_id returns the piece id.