From c60dd7aa5fc0878a40e0e26103476294856745b3 Mon Sep 17 00:00:00 2001 From: imtsuki Date: Wed, 17 Apr 2024 15:20:11 +0800 Subject: [PATCH] Refactor: wrap rocksdb transaction operations in closures (#393) * refactor: wrap rocksdb txn operations in closures Signed-off-by: imtsuki * use generics for CF_NAME Signed-off-by: imtsuki * address review comments Change-Id: I86eccaf3cd9cc2b1128ec89c4a2cf041947a87ea Signed-off-by: imtsuki --------- Signed-off-by: imtsuki Signed-off-by: imtsuki --- dragonfly-client-storage/src/metadata.rs | 573 +++++++++-------------- 1 file changed, 233 insertions(+), 340 deletions(-) diff --git a/dragonfly-client-storage/src/metadata.rs b/dragonfly-client-storage/src/metadata.rs index dfd0d0df..f9c04a10 100644 --- a/dragonfly-client-storage/src/metadata.rs +++ b/dragonfly-client-storage/src/metadata.rs @@ -22,10 +22,10 @@ use dragonfly_client_core::{ use dragonfly_client_util::http::reqwest_headermap_to_hashmap; use reqwest::header::{self, HeaderMap}; use rocksdb::{ - BlockBasedOptions, Cache, ColumnFamily, IteratorMode, Options, TransactionDB, + BlockBasedOptions, Cache, ColumnFamily, IteratorMode, Options, Transaction, TransactionDB, TransactionDBOptions, }; -use serde::{Deserialize, Serialize}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; use std::collections::HashMap; use std::path::Path; use std::time::Duration; @@ -46,12 +46,18 @@ const DEFAULT_BLOCK_SIZE: usize = 64 * 1024; // DEFAULT_CACHE_SIZE is the default cache size for rocksdb. const DEFAULT_CACHE_SIZE: usize = 16 * 1024 * 1024; -// TASK_CF_NAME is the column family name of task. +/// TASK_CF_NAME is the column family name of [Task]. const TASK_CF_NAME: &str = "task"; -// PIECE_CF_NAME is the column family name of piece. +/// PIECE_CF_NAME is the column family name of [Piece]. const PIECE_CF_NAME: &str = "piece"; +/// ColumnFamilyDescriptor marks a type can be stored in rocksdb, which has a cf name. +trait ColumnFamilyDescriptor: Default + Serialize + DeserializeOwned { + /// CF_NAME returns the column family name. + const CF_NAME: &'static str; +} + // Task is the metadata of the task. #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct Task { @@ -81,6 +87,10 @@ pub struct Task { pub finished_at: Option, } +impl ColumnFamilyDescriptor for Task { + const CF_NAME: &'static str = TASK_CF_NAME; +} + // Task implements the task metadata. impl Task { // is_started returns whether the task downloads started. @@ -164,6 +174,10 @@ pub struct Piece { pub finished_at: Option, } +impl ColumnFamilyDescriptor for Piece { + const CF_NAME: &'static str = PIECE_CF_NAME; +} + // Piece implements the piece metadata. impl Piece { // is_started returns whether the piece downloads started. @@ -208,15 +222,14 @@ impl Piece { } } -// Metadata is the metadata of the task. +/// Metadata manages the metadata of [Task] and [Piece]. pub struct Metadata { - // db is the rocksdb instance. + /// db is the underlying rocksdb instance. db: TransactionDB, } -// Metadata implements the metadata storage. impl Metadata { - // new returns a new metadata. + /// new creates a new metadata instance. pub fn new(dir: &Path) -> Result { // Initialize rocksdb options. let mut options = Options::default(); @@ -240,7 +253,7 @@ impl Metadata { // Open rocksdb. let dir = dir.join(DEFAULT_DIR_NAME); - let cf_names = [TASK_CF_NAME, PIECE_CF_NAME]; + let cf_names = [Task::CF_NAME, Piece::CF_NAME]; let db = TransactionDB::open_cf(&options, &TransactionDBOptions::default(), &dir, cf_names) .or_err(ErrorType::StorageError)?; info!("metadata initialized directory: {:?}", dir); @@ -248,189 +261,143 @@ impl Metadata { Ok(Metadata { db }) } - // download_task_started updates the metadata of the task when the task downloads started. + /// with_txn executes the enclosed closure within a transaction. + fn with_txn(&self, f: impl FnOnce(&Transaction) -> Result) -> Result { + let txn = self.db.transaction(); + let result = f(&txn)?; + txn.commit().or_err(ErrorType::StorageError)?; + Ok(result) + } + + /// transactional_update_or_else gets the object from the database and updates + /// it within a transaction. + /// If the object does not exist, execute the `or_else` closure. + /// `or_else` can either return an new object, which means that a new object + /// will be created; or return an error to abort the transaction. + fn transactional_update_or_else( + &self, + key: &str, + update: impl FnOnce(T) -> Result, + or_else: impl FnOnce() -> Result, + ) -> Result + where + T: ColumnFamilyDescriptor, + { + self.with_txn(|txn| { + let handle = self.cf_handle::()?; + let object = match txn + .get_for_update_cf(handle, key, true) + .or_err(ErrorType::StorageError)? + { + Some(bytes) => serde_json::from_slice(&bytes).or_err(ErrorType::SerializeError)?, + None => or_else()?, + }; + let object = update(object)?; + let json = serde_json::to_string(&object).or_err(ErrorType::SerializeError)?; + txn.put_cf(handle, key.as_bytes(), json.as_bytes()) + .or_err(ErrorType::StorageError)?; + Ok(object) + }) + } + + /// download_task_started updates the metadata of the task when the task downloads started. pub fn download_task_started( &self, id: &str, piece_length: u64, response_header: Option, ) -> Result { - // Get the column family handle of task. - let handle = self.cf_handle(TASK_CF_NAME)?; - // Convert the response header to hashmap. - let response_header = match response_header { - Some(response_header) => reqwest_headermap_to_hashmap(&response_header), - None => HashMap::new(), + let get_response_header = || { + response_header + .as_ref() + .map(reqwest_headermap_to_hashmap) + .unwrap_or_default() }; - // Transaction is used to update the task metadata. - let txn = self.db.transaction(); - let task = match txn - .get_for_update_cf(handle, id, true) - .or_err(ErrorType::StorageError)? - { - Some(bytes) => { + self.transactional_update_or_else( + id, + |mut task: Task| { // If the task exists, update the task metadata. - let mut task: Task = - serde_json::from_slice(&bytes).or_err(ErrorType::SerializeError)?; task.updated_at = Utc::now().naive_utc(); - // If the task has the response header, the response header // will not be covered. if task.response_header.is_empty() { - task.response_header = response_header; + task.response_header = get_response_header(); } - task - } - // If the task does not exist, create a new task metadata. - None => Task { - id: id.to_string(), - piece_length, - response_header, - updated_at: Utc::now().naive_utc(), - created_at: Utc::now().naive_utc(), - ..Default::default() - }, - }; - // Put the task metadata. - let json = serde_json::to_string(&task).or_err(ErrorType::SerializeError)?; - txn.put_cf(handle, id.as_bytes(), json.as_bytes()) - .or_err(ErrorType::StorageError)?; - txn.commit().or_err(ErrorType::StorageError)?; - Ok(task) + Ok(task) + }, + || { + Ok(Task { + id: id.to_string(), + piece_length, + response_header: get_response_header(), + updated_at: Utc::now().naive_utc(), + created_at: Utc::now().naive_utc(), + ..Default::default() + }) + }, + ) } - // download_task_finished updates the metadata of the task when the task downloads finished. + /// download_task_finished updates the metadata of the task when the task downloads finished. pub fn download_task_finished(&self, id: &str) -> Result { - // Get the column family handle of task. - let handle = self.cf_handle(TASK_CF_NAME)?; - - // Transaction is used to update the task metadata. - let txn = self.db.transaction(); - let task = match txn - .get_for_update_cf(handle, id, true) - .or_err(ErrorType::StorageError)? - { - Some(bytes) => { - // If the task exists, update the task metadata. - let mut task: Task = - serde_json::from_slice(&bytes).or_err(ErrorType::SerializeError)?; + self.transactional_update_or_else( + id, + |mut task: Task| { task.updated_at = Utc::now().naive_utc(); task.finished_at = Some(Utc::now().naive_utc()); - task - } - // If the task does not exist, return error. - None => return Err(Error::TaskNotFound(id.to_string())), - }; - - // Put the task metadata. - let json = serde_json::to_string(&task).or_err(ErrorType::SerializeError)?; - txn.put_cf(handle, id.as_bytes(), json.as_bytes()) - .or_err(ErrorType::StorageError)?; - txn.commit().or_err(ErrorType::StorageError)?; - Ok(task) + Ok(task) + }, + || Err(Error::TaskNotFound(id.to_string())), + ) } - // upload_task_started updates the metadata of the task when task uploads started. + /// upload_task_started updates the metadata of the task when task uploads started. pub fn upload_task_started(&self, id: &str) -> Result { - // Get the column family handle of task. - let handle = self.cf_handle(TASK_CF_NAME)?; - - // Transaction is used to update the task metadata. - let txn = self.db.transaction(); - let task = match txn - .get_for_update_cf(handle, id, true) - .or_err(ErrorType::StorageError)? - { - Some(bytes) => { - // If the task exists, update the task metadata. - let mut task: Task = - serde_json::from_slice(&bytes).or_err(ErrorType::SerializeError)?; + self.transactional_update_or_else( + id, + |mut task: Task| { task.uploading_count += 1; task.updated_at = Utc::now().naive_utc(); - task - } - // If the task does not exist, return error. - None => return Err(Error::TaskNotFound(id.to_string())), - }; - - // Put the task metadata. - let json = serde_json::to_string(&task).or_err(ErrorType::SerializeError)?; - txn.put_cf(handle, id.as_bytes(), json.as_bytes()) - .or_err(ErrorType::StorageError)?; - txn.commit().or_err(ErrorType::StorageError)?; - Ok(task) + Ok(task) + }, + || Err(Error::TaskNotFound(id.to_string())), + ) } - // upload_task_finished updates the metadata of the task when task uploads finished. + /// upload_task_finished updates the metadata of the task when task uploads finished. pub fn upload_task_finished(&self, id: &str) -> Result { - // Get the column family handle of task. - let handle = self.cf_handle(TASK_CF_NAME)?; - - // Transaction is used to update the task metadata. - let txn = self.db.transaction(); - let task = match txn - .get_for_update_cf(handle, id, true) - .or_err(ErrorType::StorageError)? - { - Some(bytes) => { - // If the task exists, update the task metadata. - let mut task: Task = - serde_json::from_slice(&bytes).or_err(ErrorType::SerializeError)?; + self.transactional_update_or_else( + id, + |mut task: Task| { task.uploading_count -= 1; task.uploaded_count += 1; task.updated_at = Utc::now().naive_utc(); - task - } - // If the task does not exist, return error. - None => return Err(Error::TaskNotFound(id.to_string())), - }; - - // Put the task metadata. - let json = serde_json::to_string(&task).or_err(ErrorType::SerializeError)?; - txn.put_cf(handle, id.as_bytes(), json.as_bytes()) - .or_err(ErrorType::StorageError)?; - txn.commit().or_err(ErrorType::StorageError)?; - Ok(task) + Ok(task) + }, + || Err(Error::TaskNotFound(id.to_string())), + ) } - // upload_task_failed updates the metadata of the task when the task uploads failed. + /// upload_task_failed updates the metadata of the task when the task uploads failed. pub fn upload_task_failed(&self, id: &str) -> Result { - // Get the column family handle of task. - let handle = self.cf_handle(TASK_CF_NAME)?; - - // Transaction is used to update the task metadata. - let txn = self.db.transaction(); - let task = match txn - .get_for_update_cf(handle, id, true) - .or_err(ErrorType::StorageError)? - { - Some(bytes) => { - // If the task exists, update the task metadata. - let mut task: Task = - serde_json::from_slice(&bytes).or_err(ErrorType::SerializeError)?; + self.transactional_update_or_else( + id, + |mut task: Task| { task.uploading_count -= 1; task.updated_at = Utc::now().naive_utc(); - task - } - // If the task does not exist, return error. - None => return Err(Error::TaskNotFound(id.to_string())), - }; - - // Put the task metadata. - let json = serde_json::to_string(&task).or_err(ErrorType::SerializeError)?; - txn.put_cf(handle, id.as_bytes(), json.as_bytes()) - .or_err(ErrorType::StorageError)?; - txn.commit().or_err(ErrorType::StorageError)?; - Ok(task) + Ok(task) + }, + || Err(Error::TaskNotFound(id.to_string())), + ) } - // get_task gets the task metadata. + /// get_task gets the task metadata. pub fn get_task(&self, id: &str) -> Result> { // Get the column family handle of task. - let handle = self.cf_handle(TASK_CF_NAME)?; + let handle = self.cf_handle::()?; match self.db.get_cf(handle, id).or_err(ErrorType::StorageError)? { Some(bytes) => Ok(Some( serde_json::from_slice(&bytes).or_err(ErrorType::SerializeError)?, @@ -439,43 +406,43 @@ impl Metadata { } } - // get_tasks gets the task metadatas. + /// get_tasks gets the task metadatas. pub fn get_tasks(&self) -> Result> { // Get the column family handle of task. - let handle = self.cf_handle(TASK_CF_NAME)?; + let handle = self.cf_handle::()?; - // Transaction is used to get the task metadatas. - let txn = self.db.transaction(); - let iter = txn.iterator_cf(handle, IteratorMode::Start); + self.with_txn(|txn| { + let iter = txn.iterator_cf(handle, IteratorMode::Start); - // Iterate the task metadatas. - let mut tasks = Vec::new(); - for ele in iter { - let (_, value) = ele.or_err(ErrorType::StorageError)?; - let task: Task = serde_json::from_slice(&value).or_err(ErrorType::SerializeError)?; - tasks.push(task); - } + // Iterate the task metadatas. + let mut tasks = Vec::new(); + for ele in iter { + let (_, value) = ele.or_err(ErrorType::StorageError)?; + let task: Task = + serde_json::from_slice(&value).or_err(ErrorType::SerializeError)?; + tasks.push(task); + } - Ok(tasks) + Ok(tasks) + }) } - // delete_task deletes the task metadata. + /// delete_task deletes the task metadata. pub fn delete_task(&self, task_id: &str) -> Result<()> { // Get the column family handle of task. - let handle = self.cf_handle(TASK_CF_NAME)?; + let handle = self.cf_handle::()?; - // Transaction is used to delete the task metadata. - let txn = self.db.transaction(); - txn.delete_cf(handle, task_id) - .or_err(ErrorType::SerializeError)?; - txn.commit().or_err(ErrorType::SerializeError)?; - Ok(()) + self.with_txn(|txn| { + txn.delete_cf(handle, task_id) + .or_err(ErrorType::SerializeError)?; + Ok(()) + }) } - // download_piece_started updates the metadata of the piece when the piece downloads started. + /// download_piece_started updates the metadata of the piece when the piece downloads started. pub fn download_piece_started(&self, task_id: &str, number: u32) -> Result { // Get the column family handle of piece. - let handle = self.cf_handle(PIECE_CF_NAME)?; + let handle = self.cf_handle::()?; // Get the piece id. let id = self.piece_id(task_id, number); @@ -488,18 +455,16 @@ impl Metadata { ..Default::default() }; - // Transaction is used to update the piece metadata. - let txn = self.db.transaction(); - - // Put the piece metadata. - let json = serde_json::to_string(&piece).or_err(ErrorType::SerializeError)?; - txn.put_cf(handle, id.as_bytes(), json.as_bytes()) - .or_err(ErrorType::StorageError)?; - txn.commit().or_err(ErrorType::StorageError)?; - Ok(piece) + self.with_txn(|txn| { + // Put the piece metadata. + let json = serde_json::to_string(&piece).or_err(ErrorType::SerializeError)?; + txn.put_cf(handle, id.as_bytes(), json.as_bytes()) + .or_err(ErrorType::StorageError)?; + Ok(piece) + }) } - // download_piece_finished updates the metadata of the piece when the piece downloads finished. + /// download_piece_finished updates the metadata of the piece when the piece downloads finished. pub fn download_piece_finished( &self, task_id: &str, @@ -509,180 +474,106 @@ impl Metadata { digest: &str, parent_id: Option, ) -> Result { - // Get the column family handle of piece. - let handle = self.cf_handle(PIECE_CF_NAME)?; - // Get the piece id. let id = self.piece_id(task_id, number); - // Transaction is used to update the piece metadata. - let txn = self.db.transaction(); - let piece = match txn - .get_for_update_cf(handle, id.as_bytes(), true) - .or_err(ErrorType::StorageError)? - { - Some(bytes) => { - // If the piece exists, update the piece metadata. - let mut piece: Piece = - serde_json::from_slice(&bytes).or_err(ErrorType::SerializeError)?; + self.transactional_update_or_else( + id.as_str(), + |mut piece: Piece| { piece.offset = offset; piece.length = length; piece.digest = digest.to_string(); piece.parent_id = parent_id; piece.updated_at = Utc::now().naive_utc(); piece.finished_at = Some(Utc::now().naive_utc()); - piece - } - // If the piece does not exist, return error. - None => return Err(Error::PieceNotFound(id)), - }; - - // Put the piece metadata. - let json = serde_json::to_string(&piece).or_err(ErrorType::SerializeError)?; - txn.put_cf(handle, id.as_bytes(), json.as_bytes()) - .or_err(ErrorType::StorageError)?; - txn.commit().or_err(ErrorType::StorageError)?; - Ok(piece) + Ok(piece) + }, + || Err(Error::PieceNotFound(id.to_string())), + ) } - // download_piece_failed updates the metadata of the piece when the piece downloads failed. + /// download_piece_failed updates the metadata of the piece when the piece downloads failed. pub fn download_piece_failed(&self, task_id: &str, number: u32) -> Result { // Get the column family handle of piece. - let handle = self.cf_handle(PIECE_CF_NAME)?; + let handle = self.cf_handle::()?; // Get the piece id. let id = self.piece_id(task_id, number); - // Transaction is used to update the piece metadata. - let txn = self.db.transaction(); - let piece = match txn - .get_for_update_cf(handle, id.as_bytes(), true) - .or_err(ErrorType::StorageError)? - { - // If the piece exists, delete the piece metadata. - Some(bytes) => { - txn.delete_cf(handle, id.as_bytes()) - .or_err(ErrorType::StorageError)?; - let piece: Piece = - serde_json::from_slice(&bytes).or_err(ErrorType::SerializeError)?; - piece - } - // If the piece does not exist, return error. - None => return Err(Error::PieceNotFound(id)), - }; + self.with_txn(|txn| { + let piece = match txn + .get_for_update_cf(handle, id.as_bytes(), true) + .or_err(ErrorType::StorageError)? + { + // If the piece exists, delete the piece metadata. + Some(bytes) => { + txn.delete_cf(handle, id.as_bytes()) + .or_err(ErrorType::StorageError)?; + let piece: Piece = + serde_json::from_slice(&bytes).or_err(ErrorType::SerializeError)?; + piece + } + // If the piece does not exist, return error. + None => return Err(Error::PieceNotFound(id)), + }; - // Commit the transaction. - txn.commit().or_err(ErrorType::StorageError)?; - Ok(piece) + Ok(piece) + }) } - // upload_piece_started updates the metadata of the piece when piece uploads started. + /// upload_piece_started updates the metadata of the piece when piece uploads started. pub fn upload_piece_started(&self, task_id: &str, number: u32) -> Result { - // Get the column family handle of piece. - let handle = self.cf_handle(PIECE_CF_NAME)?; - // Get the piece id. let id = self.piece_id(task_id, number); - // Transaction is used to update the piece metadata. - let txn = self.db.transaction(); - let piece = match txn - .get_for_update_cf(handle, id.as_bytes(), true) - .or_err(ErrorType::StorageError)? - { - Some(bytes) => { - // If the piece exists, update the piece metadata. - let mut piece: Piece = - serde_json::from_slice(&bytes).or_err(ErrorType::SerializeError)?; + self.transactional_update_or_else( + id.as_str(), + |mut piece: Piece| { piece.uploading_count += 1; piece.updated_at = Utc::now().naive_utc(); - piece - } - // If the piece does not exist, return error. - None => return Err(Error::PieceNotFound(id)), - }; - - // Put the piece metadata. - let json = serde_json::to_string(&piece).or_err(ErrorType::SerializeError)?; - txn.put_cf(handle, id.as_bytes(), json.as_bytes()) - .or_err(ErrorType::StorageError)?; - txn.commit().or_err(ErrorType::StorageError)?; - Ok(piece) + Ok(piece) + }, + || Err(Error::PieceNotFound(id.to_string())), + ) } - // upload_piece_finished updates the metadata of the piece when piece uploads finished. + /// upload_piece_finished updates the metadata of the piece when piece uploads finished. pub fn upload_piece_finished(&self, task_id: &str, number: u32) -> Result { - // Get the column family handle of piece. - let handle = self.cf_handle(PIECE_CF_NAME)?; - // Get the piece id. let id = self.piece_id(task_id, number); - // Transaction is used to update the piece metadata. - let txn = self.db.transaction(); - let piece = match txn - .get_for_update_cf(handle, id.as_bytes(), true) - .or_err(ErrorType::StorageError)? - { - Some(bytes) => { - // If the piece exists, update the piece metadata. - let mut piece: Piece = - serde_json::from_slice(&bytes).or_err(ErrorType::SerializeError)?; + self.transactional_update_or_else( + id.as_str(), + |mut piece: Piece| { piece.uploading_count -= 1; piece.uploaded_count += 1; piece.updated_at = Utc::now().naive_utc(); - piece - } - // If the piece does not exist, return error. - None => return Err(Error::PieceNotFound(id)), - }; - - // Put the piece metadata. - let json = serde_json::to_string(&piece).or_err(ErrorType::SerializeError)?; - txn.put_cf(handle, id.as_bytes(), json.as_bytes()) - .or_err(ErrorType::StorageError)?; - txn.commit().or_err(ErrorType::StorageError)?; - Ok(piece) + Ok(piece) + }, + || Err(Error::PieceNotFound(id.to_string())), + ) } - // upload_piece_failed updates the metadata of the piece when the piece uploads failed. + /// upload_piece_failed updates the metadata of the piece when the piece uploads failed. pub fn upload_piece_failed(&self, task_id: &str, number: u32) -> Result { - // Get the column family handle of piece. - let handle = self.cf_handle(PIECE_CF_NAME)?; - // Get the piece id. let id = self.piece_id(task_id, number); - // Transaction is used to update the piece metadata. - let txn = self.db.transaction(); - let piece = match txn - .get_for_update_cf(handle, id.as_bytes(), true) - .or_err(ErrorType::StorageError)? - { - Some(bytes) => { - // If the piece exists, update the piece metadata. - let mut piece: Piece = - serde_json::from_slice(&bytes).or_err(ErrorType::SerializeError)?; + self.transactional_update_or_else( + id.as_str(), + |mut piece: Piece| { piece.uploading_count -= 1; piece.updated_at = Utc::now().naive_utc(); - piece - } - // If the piece does not exist, return error. - None => return Err(Error::PieceNotFound(id)), - }; - - // Put the piece metadata. - let json = serde_json::to_string(&piece).or_err(ErrorType::SerializeError)?; - txn.put_cf(handle, id.as_bytes(), json.as_bytes()) - .or_err(ErrorType::StorageError)?; - txn.commit().or_err(ErrorType::StorageError)?; - Ok(piece) + Ok(piece) + }, + || Err(Error::PieceNotFound(id.to_string())), + ) } - // get_piece gets the piece metadata. + /// get_piece gets the piece metadata. pub fn get_piece(&self, task_id: &str, number: u32) -> Result> { let id = self.piece_id(task_id, number); - let handle = self.cf_handle(PIECE_CF_NAME)?; + let handle = self.cf_handle::()?; match self .db .get_cf(handle, id.as_bytes()) @@ -695,53 +586,55 @@ impl Metadata { } } - // get_pieces gets the piece metadatas. + /// get_pieces gets the piece metadatas. pub fn get_pieces(&self, task_id: &str) -> Result> { // Get the column family handle of piece. - let handle = self.cf_handle(PIECE_CF_NAME)?; + let handle = self.cf_handle::()?; - // Transaction is used to get the piece metadatas. - let txn = self.db.transaction(); - let iter = txn.prefix_iterator_cf(handle, task_id.as_bytes()); + self.with_txn(|txn| { + let iter = txn.prefix_iterator_cf(handle, task_id.as_bytes()); - // Iterate the piece metadatas. - let mut pieces = Vec::new(); - for ele in iter { - let (_, value) = ele.or_err(ErrorType::StorageError)?; - let piece: Piece = serde_json::from_slice(&value).or_err(ErrorType::SerializeError)?; - pieces.push(piece); - } + // Iterate the piece metadatas. + let mut pieces = Vec::new(); + for ele in iter { + let (_, value) = ele.or_err(ErrorType::StorageError)?; + let piece: Piece = + serde_json::from_slice(&value).or_err(ErrorType::SerializeError)?; + pieces.push(piece); + } - Ok(pieces) + Ok(pieces) + }) } - // delete_pieces deletes the piece metadatas. + /// delete_pieces deletes the piece metadatas. pub fn delete_pieces(&self, task_id: &str) -> Result<()> { // Get the column family handle of piece. - let handle = self.cf_handle(PIECE_CF_NAME)?; + let handle = self.cf_handle::()?; - // Transaction is used to delete the piece metadatas. - let txn = self.db.transaction(); - let iter = txn.prefix_iterator_cf(handle, task_id.as_bytes()); + self.with_txn(|txn| { + let iter = txn.prefix_iterator_cf(handle, task_id.as_bytes()); - // Iterate the piece metadatas. - for ele in iter { - let (key, _) = ele.or_err(ErrorType::StorageError)?; - txn.delete_cf(handle, key).or_err(ErrorType::StorageError)?; - } - - // Commit the transaction. - txn.commit().or_err(ErrorType::StorageError)?; - Ok(()) + // Iterate the piece metadatas. + for ele in iter { + let (key, _) = ele.or_err(ErrorType::StorageError)?; + txn.delete_cf(handle, key).or_err(ErrorType::StorageError)?; + } + Ok(()) + }) } - // piece_id returns the piece id. + /// piece_id returns the piece id. pub fn piece_id(&self, task_id: &str, number: u32) -> String { format!("{}-{}", task_id, number) } // cf_handle returns the column family handle. - fn cf_handle(&self, cf_name: &str) -> Result<&ColumnFamily> { + fn cf_handle(&self) -> Result<&ColumnFamily> + where + T: ColumnFamilyDescriptor, + { + let cf_name = T::CF_NAME; self.db .cf_handle(cf_name) .ok_or_else(|| Error::ColumnFamilyNotFound(cf_name.to_string()))