Refactor: wrap rocksdb transaction operations in closures (#393)

* refactor: wrap rocksdb txn operations in closures

Signed-off-by: imtsuki <me@qjx.app>

* use generics for CF_NAME

Signed-off-by: imtsuki <me@qjx.app>

* address review comments

Change-Id: I86eccaf3cd9cc2b1128ec89c4a2cf041947a87ea
Signed-off-by: imtsuki <jianxin.qiu@outlook.com>

---------

Signed-off-by: imtsuki <me@qjx.app>
Signed-off-by: imtsuki <jianxin.qiu@outlook.com>
This commit is contained in:
imtsuki 2024-04-17 15:20:11 +08:00 committed by GitHub
parent a62f5b3960
commit c60dd7aa5f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 233 additions and 340 deletions

View File

@ -22,10 +22,10 @@ use dragonfly_client_core::{
use dragonfly_client_util::http::reqwest_headermap_to_hashmap; use dragonfly_client_util::http::reqwest_headermap_to_hashmap;
use reqwest::header::{self, HeaderMap}; use reqwest::header::{self, HeaderMap};
use rocksdb::{ use rocksdb::{
BlockBasedOptions, Cache, ColumnFamily, IteratorMode, Options, TransactionDB, BlockBasedOptions, Cache, ColumnFamily, IteratorMode, Options, Transaction, TransactionDB,
TransactionDBOptions, TransactionDBOptions,
}; };
use serde::{Deserialize, Serialize}; use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
use std::path::Path; use std::path::Path;
use std::time::Duration; use std::time::Duration;
@ -46,12 +46,18 @@ const DEFAULT_BLOCK_SIZE: usize = 64 * 1024;
// DEFAULT_CACHE_SIZE is the default cache size for rocksdb. // DEFAULT_CACHE_SIZE is the default cache size for rocksdb.
const DEFAULT_CACHE_SIZE: usize = 16 * 1024 * 1024; const DEFAULT_CACHE_SIZE: usize = 16 * 1024 * 1024;
// TASK_CF_NAME is the column family name of task. /// TASK_CF_NAME is the column family name of [Task].
const TASK_CF_NAME: &str = "task"; const TASK_CF_NAME: &str = "task";
// PIECE_CF_NAME is the column family name of piece. /// PIECE_CF_NAME is the column family name of [Piece].
const PIECE_CF_NAME: &str = "piece"; const PIECE_CF_NAME: &str = "piece";
/// ColumnFamilyDescriptor marks a type can be stored in rocksdb, which has a cf name.
trait ColumnFamilyDescriptor: Default + Serialize + DeserializeOwned {
/// CF_NAME returns the column family name.
const CF_NAME: &'static str;
}
// Task is the metadata of the task. // Task is the metadata of the task.
#[derive(Debug, Clone, Default, Serialize, Deserialize)] #[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Task { pub struct Task {
@ -81,6 +87,10 @@ pub struct Task {
pub finished_at: Option<NaiveDateTime>, pub finished_at: Option<NaiveDateTime>,
} }
impl ColumnFamilyDescriptor for Task {
const CF_NAME: &'static str = TASK_CF_NAME;
}
// Task implements the task metadata. // Task implements the task metadata.
impl Task { impl Task {
// is_started returns whether the task downloads started. // is_started returns whether the task downloads started.
@ -164,6 +174,10 @@ pub struct Piece {
pub finished_at: Option<NaiveDateTime>, pub finished_at: Option<NaiveDateTime>,
} }
impl ColumnFamilyDescriptor for Piece {
const CF_NAME: &'static str = PIECE_CF_NAME;
}
// Piece implements the piece metadata. // Piece implements the piece metadata.
impl Piece { impl Piece {
// is_started returns whether the piece downloads started. // is_started returns whether the piece downloads started.
@ -208,15 +222,14 @@ impl Piece {
} }
} }
// Metadata is the metadata of the task. /// Metadata manages the metadata of [Task] and [Piece].
pub struct Metadata { pub struct Metadata {
// db is the rocksdb instance. /// db is the underlying rocksdb instance.
db: TransactionDB, db: TransactionDB,
} }
// Metadata implements the metadata storage.
impl Metadata { impl Metadata {
// new returns a new metadata. /// new creates a new metadata instance.
pub fn new(dir: &Path) -> Result<Metadata> { pub fn new(dir: &Path) -> Result<Metadata> {
// Initialize rocksdb options. // Initialize rocksdb options.
let mut options = Options::default(); let mut options = Options::default();
@ -240,7 +253,7 @@ impl Metadata {
// Open rocksdb. // Open rocksdb.
let dir = dir.join(DEFAULT_DIR_NAME); let dir = dir.join(DEFAULT_DIR_NAME);
let cf_names = [TASK_CF_NAME, PIECE_CF_NAME]; let cf_names = [Task::CF_NAME, Piece::CF_NAME];
let db = TransactionDB::open_cf(&options, &TransactionDBOptions::default(), &dir, cf_names) let db = TransactionDB::open_cf(&options, &TransactionDBOptions::default(), &dir, cf_names)
.or_err(ErrorType::StorageError)?; .or_err(ErrorType::StorageError)?;
info!("metadata initialized directory: {:?}", dir); info!("metadata initialized directory: {:?}", dir);
@ -248,189 +261,143 @@ impl Metadata {
Ok(Metadata { db }) Ok(Metadata { db })
} }
// download_task_started updates the metadata of the task when the task downloads started. /// with_txn executes the enclosed closure within a transaction.
fn with_txn<T>(&self, f: impl FnOnce(&Transaction<TransactionDB>) -> Result<T>) -> Result<T> {
let txn = self.db.transaction();
let result = f(&txn)?;
txn.commit().or_err(ErrorType::StorageError)?;
Ok(result)
}
/// transactional_update_or_else gets the object from the database and updates
/// it within a transaction.
/// If the object does not exist, execute the `or_else` closure.
/// `or_else` can either return an new object, which means that a new object
/// will be created; or return an error to abort the transaction.
fn transactional_update_or_else<T>(
&self,
key: &str,
update: impl FnOnce(T) -> Result<T>,
or_else: impl FnOnce() -> Result<T>,
) -> Result<T>
where
T: ColumnFamilyDescriptor,
{
self.with_txn(|txn| {
let handle = self.cf_handle::<T>()?;
let object = match txn
.get_for_update_cf(handle, key, true)
.or_err(ErrorType::StorageError)?
{
Some(bytes) => serde_json::from_slice(&bytes).or_err(ErrorType::SerializeError)?,
None => or_else()?,
};
let object = update(object)?;
let json = serde_json::to_string(&object).or_err(ErrorType::SerializeError)?;
txn.put_cf(handle, key.as_bytes(), json.as_bytes())
.or_err(ErrorType::StorageError)?;
Ok(object)
})
}
/// download_task_started updates the metadata of the task when the task downloads started.
pub fn download_task_started( pub fn download_task_started(
&self, &self,
id: &str, id: &str,
piece_length: u64, piece_length: u64,
response_header: Option<HeaderMap>, response_header: Option<HeaderMap>,
) -> Result<Task> { ) -> Result<Task> {
// Get the column family handle of task.
let handle = self.cf_handle(TASK_CF_NAME)?;
// Convert the response header to hashmap. // Convert the response header to hashmap.
let response_header = match response_header { let get_response_header = || {
Some(response_header) => reqwest_headermap_to_hashmap(&response_header), response_header
None => HashMap::new(), .as_ref()
.map(reqwest_headermap_to_hashmap)
.unwrap_or_default()
}; };
// Transaction is used to update the task metadata. self.transactional_update_or_else(
let txn = self.db.transaction(); id,
let task = match txn |mut task: Task| {
.get_for_update_cf(handle, id, true)
.or_err(ErrorType::StorageError)?
{
Some(bytes) => {
// If the task exists, update the task metadata. // If the task exists, update the task metadata.
let mut task: Task =
serde_json::from_slice(&bytes).or_err(ErrorType::SerializeError)?;
task.updated_at = Utc::now().naive_utc(); task.updated_at = Utc::now().naive_utc();
// If the task has the response header, the response header // If the task has the response header, the response header
// will not be covered. // will not be covered.
if task.response_header.is_empty() { if task.response_header.is_empty() {
task.response_header = response_header; task.response_header = get_response_header();
} }
task
}
// If the task does not exist, create a new task metadata.
None => Task {
id: id.to_string(),
piece_length,
response_header,
updated_at: Utc::now().naive_utc(),
created_at: Utc::now().naive_utc(),
..Default::default()
},
};
// Put the task metadata. Ok(task)
let json = serde_json::to_string(&task).or_err(ErrorType::SerializeError)?; },
txn.put_cf(handle, id.as_bytes(), json.as_bytes()) || {
.or_err(ErrorType::StorageError)?; Ok(Task {
txn.commit().or_err(ErrorType::StorageError)?; id: id.to_string(),
Ok(task) piece_length,
response_header: get_response_header(),
updated_at: Utc::now().naive_utc(),
created_at: Utc::now().naive_utc(),
..Default::default()
})
},
)
} }
// download_task_finished updates the metadata of the task when the task downloads finished. /// download_task_finished updates the metadata of the task when the task downloads finished.
pub fn download_task_finished(&self, id: &str) -> Result<Task> { pub fn download_task_finished(&self, id: &str) -> Result<Task> {
// Get the column family handle of task. self.transactional_update_or_else(
let handle = self.cf_handle(TASK_CF_NAME)?; id,
|mut task: Task| {
// Transaction is used to update the task metadata.
let txn = self.db.transaction();
let task = match txn
.get_for_update_cf(handle, id, true)
.or_err(ErrorType::StorageError)?
{
Some(bytes) => {
// If the task exists, update the task metadata.
let mut task: Task =
serde_json::from_slice(&bytes).or_err(ErrorType::SerializeError)?;
task.updated_at = Utc::now().naive_utc(); task.updated_at = Utc::now().naive_utc();
task.finished_at = Some(Utc::now().naive_utc()); task.finished_at = Some(Utc::now().naive_utc());
task Ok(task)
} },
// If the task does not exist, return error. || Err(Error::TaskNotFound(id.to_string())),
None => return Err(Error::TaskNotFound(id.to_string())), )
};
// Put the task metadata.
let json = serde_json::to_string(&task).or_err(ErrorType::SerializeError)?;
txn.put_cf(handle, id.as_bytes(), json.as_bytes())
.or_err(ErrorType::StorageError)?;
txn.commit().or_err(ErrorType::StorageError)?;
Ok(task)
} }
// upload_task_started updates the metadata of the task when task uploads started. /// upload_task_started updates the metadata of the task when task uploads started.
pub fn upload_task_started(&self, id: &str) -> Result<Task> { pub fn upload_task_started(&self, id: &str) -> Result<Task> {
// Get the column family handle of task. self.transactional_update_or_else(
let handle = self.cf_handle(TASK_CF_NAME)?; id,
|mut task: Task| {
// Transaction is used to update the task metadata.
let txn = self.db.transaction();
let task = match txn
.get_for_update_cf(handle, id, true)
.or_err(ErrorType::StorageError)?
{
Some(bytes) => {
// If the task exists, update the task metadata.
let mut task: Task =
serde_json::from_slice(&bytes).or_err(ErrorType::SerializeError)?;
task.uploading_count += 1; task.uploading_count += 1;
task.updated_at = Utc::now().naive_utc(); task.updated_at = Utc::now().naive_utc();
task Ok(task)
} },
// If the task does not exist, return error. || Err(Error::TaskNotFound(id.to_string())),
None => return Err(Error::TaskNotFound(id.to_string())), )
};
// Put the task metadata.
let json = serde_json::to_string(&task).or_err(ErrorType::SerializeError)?;
txn.put_cf(handle, id.as_bytes(), json.as_bytes())
.or_err(ErrorType::StorageError)?;
txn.commit().or_err(ErrorType::StorageError)?;
Ok(task)
} }
// upload_task_finished updates the metadata of the task when task uploads finished. /// upload_task_finished updates the metadata of the task when task uploads finished.
pub fn upload_task_finished(&self, id: &str) -> Result<Task> { pub fn upload_task_finished(&self, id: &str) -> Result<Task> {
// Get the column family handle of task. self.transactional_update_or_else(
let handle = self.cf_handle(TASK_CF_NAME)?; id,
|mut task: Task| {
// Transaction is used to update the task metadata.
let txn = self.db.transaction();
let task = match txn
.get_for_update_cf(handle, id, true)
.or_err(ErrorType::StorageError)?
{
Some(bytes) => {
// If the task exists, update the task metadata.
let mut task: Task =
serde_json::from_slice(&bytes).or_err(ErrorType::SerializeError)?;
task.uploading_count -= 1; task.uploading_count -= 1;
task.uploaded_count += 1; task.uploaded_count += 1;
task.updated_at = Utc::now().naive_utc(); task.updated_at = Utc::now().naive_utc();
task Ok(task)
} },
// If the task does not exist, return error. || Err(Error::TaskNotFound(id.to_string())),
None => return Err(Error::TaskNotFound(id.to_string())), )
};
// Put the task metadata.
let json = serde_json::to_string(&task).or_err(ErrorType::SerializeError)?;
txn.put_cf(handle, id.as_bytes(), json.as_bytes())
.or_err(ErrorType::StorageError)?;
txn.commit().or_err(ErrorType::StorageError)?;
Ok(task)
} }
// upload_task_failed updates the metadata of the task when the task uploads failed. /// upload_task_failed updates the metadata of the task when the task uploads failed.
pub fn upload_task_failed(&self, id: &str) -> Result<Task> { pub fn upload_task_failed(&self, id: &str) -> Result<Task> {
// Get the column family handle of task. self.transactional_update_or_else(
let handle = self.cf_handle(TASK_CF_NAME)?; id,
|mut task: Task| {
// Transaction is used to update the task metadata.
let txn = self.db.transaction();
let task = match txn
.get_for_update_cf(handle, id, true)
.or_err(ErrorType::StorageError)?
{
Some(bytes) => {
// If the task exists, update the task metadata.
let mut task: Task =
serde_json::from_slice(&bytes).or_err(ErrorType::SerializeError)?;
task.uploading_count -= 1; task.uploading_count -= 1;
task.updated_at = Utc::now().naive_utc(); task.updated_at = Utc::now().naive_utc();
task Ok(task)
} },
// If the task does not exist, return error. || Err(Error::TaskNotFound(id.to_string())),
None => return Err(Error::TaskNotFound(id.to_string())), )
};
// Put the task metadata.
let json = serde_json::to_string(&task).or_err(ErrorType::SerializeError)?;
txn.put_cf(handle, id.as_bytes(), json.as_bytes())
.or_err(ErrorType::StorageError)?;
txn.commit().or_err(ErrorType::StorageError)?;
Ok(task)
} }
// get_task gets the task metadata. /// get_task gets the task metadata.
pub fn get_task(&self, id: &str) -> Result<Option<Task>> { pub fn get_task(&self, id: &str) -> Result<Option<Task>> {
// Get the column family handle of task. // Get the column family handle of task.
let handle = self.cf_handle(TASK_CF_NAME)?; let handle = self.cf_handle::<Task>()?;
match self.db.get_cf(handle, id).or_err(ErrorType::StorageError)? { match self.db.get_cf(handle, id).or_err(ErrorType::StorageError)? {
Some(bytes) => Ok(Some( Some(bytes) => Ok(Some(
serde_json::from_slice(&bytes).or_err(ErrorType::SerializeError)?, serde_json::from_slice(&bytes).or_err(ErrorType::SerializeError)?,
@ -439,43 +406,43 @@ impl Metadata {
} }
} }
// get_tasks gets the task metadatas. /// get_tasks gets the task metadatas.
pub fn get_tasks(&self) -> Result<Vec<Task>> { pub fn get_tasks(&self) -> Result<Vec<Task>> {
// Get the column family handle of task. // Get the column family handle of task.
let handle = self.cf_handle(TASK_CF_NAME)?; let handle = self.cf_handle::<Task>()?;
// Transaction is used to get the task metadatas. self.with_txn(|txn| {
let txn = self.db.transaction(); let iter = txn.iterator_cf(handle, IteratorMode::Start);
let iter = txn.iterator_cf(handle, IteratorMode::Start);
// Iterate the task metadatas. // Iterate the task metadatas.
let mut tasks = Vec::new(); let mut tasks = Vec::new();
for ele in iter { for ele in iter {
let (_, value) = ele.or_err(ErrorType::StorageError)?; let (_, value) = ele.or_err(ErrorType::StorageError)?;
let task: Task = serde_json::from_slice(&value).or_err(ErrorType::SerializeError)?; let task: Task =
tasks.push(task); serde_json::from_slice(&value).or_err(ErrorType::SerializeError)?;
} tasks.push(task);
}
Ok(tasks) Ok(tasks)
})
} }
// delete_task deletes the task metadata. /// delete_task deletes the task metadata.
pub fn delete_task(&self, task_id: &str) -> Result<()> { pub fn delete_task(&self, task_id: &str) -> Result<()> {
// Get the column family handle of task. // Get the column family handle of task.
let handle = self.cf_handle(TASK_CF_NAME)?; let handle = self.cf_handle::<Task>()?;
// Transaction is used to delete the task metadata. self.with_txn(|txn| {
let txn = self.db.transaction(); txn.delete_cf(handle, task_id)
txn.delete_cf(handle, task_id) .or_err(ErrorType::SerializeError)?;
.or_err(ErrorType::SerializeError)?; Ok(())
txn.commit().or_err(ErrorType::SerializeError)?; })
Ok(())
} }
// download_piece_started updates the metadata of the piece when the piece downloads started. /// download_piece_started updates the metadata of the piece when the piece downloads started.
pub fn download_piece_started(&self, task_id: &str, number: u32) -> Result<Piece> { pub fn download_piece_started(&self, task_id: &str, number: u32) -> Result<Piece> {
// Get the column family handle of piece. // Get the column family handle of piece.
let handle = self.cf_handle(PIECE_CF_NAME)?; let handle = self.cf_handle::<Piece>()?;
// Get the piece id. // Get the piece id.
let id = self.piece_id(task_id, number); let id = self.piece_id(task_id, number);
@ -488,18 +455,16 @@ impl Metadata {
..Default::default() ..Default::default()
}; };
// Transaction is used to update the piece metadata. self.with_txn(|txn| {
let txn = self.db.transaction(); // Put the piece metadata.
let json = serde_json::to_string(&piece).or_err(ErrorType::SerializeError)?;
// Put the piece metadata. txn.put_cf(handle, id.as_bytes(), json.as_bytes())
let json = serde_json::to_string(&piece).or_err(ErrorType::SerializeError)?; .or_err(ErrorType::StorageError)?;
txn.put_cf(handle, id.as_bytes(), json.as_bytes()) Ok(piece)
.or_err(ErrorType::StorageError)?; })
txn.commit().or_err(ErrorType::StorageError)?;
Ok(piece)
} }
// download_piece_finished updates the metadata of the piece when the piece downloads finished. /// download_piece_finished updates the metadata of the piece when the piece downloads finished.
pub fn download_piece_finished( pub fn download_piece_finished(
&self, &self,
task_id: &str, task_id: &str,
@ -509,180 +474,106 @@ impl Metadata {
digest: &str, digest: &str,
parent_id: Option<String>, parent_id: Option<String>,
) -> Result<Piece> { ) -> Result<Piece> {
// Get the column family handle of piece.
let handle = self.cf_handle(PIECE_CF_NAME)?;
// Get the piece id. // Get the piece id.
let id = self.piece_id(task_id, number); let id = self.piece_id(task_id, number);
// Transaction is used to update the piece metadata. self.transactional_update_or_else(
let txn = self.db.transaction(); id.as_str(),
let piece = match txn |mut piece: Piece| {
.get_for_update_cf(handle, id.as_bytes(), true)
.or_err(ErrorType::StorageError)?
{
Some(bytes) => {
// If the piece exists, update the piece metadata.
let mut piece: Piece =
serde_json::from_slice(&bytes).or_err(ErrorType::SerializeError)?;
piece.offset = offset; piece.offset = offset;
piece.length = length; piece.length = length;
piece.digest = digest.to_string(); piece.digest = digest.to_string();
piece.parent_id = parent_id; piece.parent_id = parent_id;
piece.updated_at = Utc::now().naive_utc(); piece.updated_at = Utc::now().naive_utc();
piece.finished_at = Some(Utc::now().naive_utc()); piece.finished_at = Some(Utc::now().naive_utc());
piece Ok(piece)
} },
// If the piece does not exist, return error. || Err(Error::PieceNotFound(id.to_string())),
None => return Err(Error::PieceNotFound(id)), )
};
// Put the piece metadata.
let json = serde_json::to_string(&piece).or_err(ErrorType::SerializeError)?;
txn.put_cf(handle, id.as_bytes(), json.as_bytes())
.or_err(ErrorType::StorageError)?;
txn.commit().or_err(ErrorType::StorageError)?;
Ok(piece)
} }
// download_piece_failed updates the metadata of the piece when the piece downloads failed. /// download_piece_failed updates the metadata of the piece when the piece downloads failed.
pub fn download_piece_failed(&self, task_id: &str, number: u32) -> Result<Piece> { pub fn download_piece_failed(&self, task_id: &str, number: u32) -> Result<Piece> {
// Get the column family handle of piece. // Get the column family handle of piece.
let handle = self.cf_handle(PIECE_CF_NAME)?; let handle = self.cf_handle::<Piece>()?;
// Get the piece id. // Get the piece id.
let id = self.piece_id(task_id, number); let id = self.piece_id(task_id, number);
// Transaction is used to update the piece metadata. self.with_txn(|txn| {
let txn = self.db.transaction(); let piece = match txn
let piece = match txn .get_for_update_cf(handle, id.as_bytes(), true)
.get_for_update_cf(handle, id.as_bytes(), true) .or_err(ErrorType::StorageError)?
.or_err(ErrorType::StorageError)? {
{ // If the piece exists, delete the piece metadata.
// If the piece exists, delete the piece metadata. Some(bytes) => {
Some(bytes) => { txn.delete_cf(handle, id.as_bytes())
txn.delete_cf(handle, id.as_bytes()) .or_err(ErrorType::StorageError)?;
.or_err(ErrorType::StorageError)?; let piece: Piece =
let piece: Piece = serde_json::from_slice(&bytes).or_err(ErrorType::SerializeError)?;
serde_json::from_slice(&bytes).or_err(ErrorType::SerializeError)?; piece
piece }
} // If the piece does not exist, return error.
// If the piece does not exist, return error. None => return Err(Error::PieceNotFound(id)),
None => return Err(Error::PieceNotFound(id)), };
};
// Commit the transaction. Ok(piece)
txn.commit().or_err(ErrorType::StorageError)?; })
Ok(piece)
} }
// upload_piece_started updates the metadata of the piece when piece uploads started. /// upload_piece_started updates the metadata of the piece when piece uploads started.
pub fn upload_piece_started(&self, task_id: &str, number: u32) -> Result<Piece> { pub fn upload_piece_started(&self, task_id: &str, number: u32) -> Result<Piece> {
// Get the column family handle of piece.
let handle = self.cf_handle(PIECE_CF_NAME)?;
// Get the piece id. // Get the piece id.
let id = self.piece_id(task_id, number); let id = self.piece_id(task_id, number);
// Transaction is used to update the piece metadata. self.transactional_update_or_else(
let txn = self.db.transaction(); id.as_str(),
let piece = match txn |mut piece: Piece| {
.get_for_update_cf(handle, id.as_bytes(), true)
.or_err(ErrorType::StorageError)?
{
Some(bytes) => {
// If the piece exists, update the piece metadata.
let mut piece: Piece =
serde_json::from_slice(&bytes).or_err(ErrorType::SerializeError)?;
piece.uploading_count += 1; piece.uploading_count += 1;
piece.updated_at = Utc::now().naive_utc(); piece.updated_at = Utc::now().naive_utc();
piece Ok(piece)
} },
// If the piece does not exist, return error. || Err(Error::PieceNotFound(id.to_string())),
None => return Err(Error::PieceNotFound(id)), )
};
// Put the piece metadata.
let json = serde_json::to_string(&piece).or_err(ErrorType::SerializeError)?;
txn.put_cf(handle, id.as_bytes(), json.as_bytes())
.or_err(ErrorType::StorageError)?;
txn.commit().or_err(ErrorType::StorageError)?;
Ok(piece)
} }
// upload_piece_finished updates the metadata of the piece when piece uploads finished. /// upload_piece_finished updates the metadata of the piece when piece uploads finished.
pub fn upload_piece_finished(&self, task_id: &str, number: u32) -> Result<Piece> { pub fn upload_piece_finished(&self, task_id: &str, number: u32) -> Result<Piece> {
// Get the column family handle of piece.
let handle = self.cf_handle(PIECE_CF_NAME)?;
// Get the piece id. // Get the piece id.
let id = self.piece_id(task_id, number); let id = self.piece_id(task_id, number);
// Transaction is used to update the piece metadata. self.transactional_update_or_else(
let txn = self.db.transaction(); id.as_str(),
let piece = match txn |mut piece: Piece| {
.get_for_update_cf(handle, id.as_bytes(), true)
.or_err(ErrorType::StorageError)?
{
Some(bytes) => {
// If the piece exists, update the piece metadata.
let mut piece: Piece =
serde_json::from_slice(&bytes).or_err(ErrorType::SerializeError)?;
piece.uploading_count -= 1; piece.uploading_count -= 1;
piece.uploaded_count += 1; piece.uploaded_count += 1;
piece.updated_at = Utc::now().naive_utc(); piece.updated_at = Utc::now().naive_utc();
piece Ok(piece)
} },
// If the piece does not exist, return error. || Err(Error::PieceNotFound(id.to_string())),
None => return Err(Error::PieceNotFound(id)), )
};
// Put the piece metadata.
let json = serde_json::to_string(&piece).or_err(ErrorType::SerializeError)?;
txn.put_cf(handle, id.as_bytes(), json.as_bytes())
.or_err(ErrorType::StorageError)?;
txn.commit().or_err(ErrorType::StorageError)?;
Ok(piece)
} }
// upload_piece_failed updates the metadata of the piece when the piece uploads failed. /// upload_piece_failed updates the metadata of the piece when the piece uploads failed.
pub fn upload_piece_failed(&self, task_id: &str, number: u32) -> Result<Piece> { pub fn upload_piece_failed(&self, task_id: &str, number: u32) -> Result<Piece> {
// Get the column family handle of piece.
let handle = self.cf_handle(PIECE_CF_NAME)?;
// Get the piece id. // Get the piece id.
let id = self.piece_id(task_id, number); let id = self.piece_id(task_id, number);
// Transaction is used to update the piece metadata. self.transactional_update_or_else(
let txn = self.db.transaction(); id.as_str(),
let piece = match txn |mut piece: Piece| {
.get_for_update_cf(handle, id.as_bytes(), true)
.or_err(ErrorType::StorageError)?
{
Some(bytes) => {
// If the piece exists, update the piece metadata.
let mut piece: Piece =
serde_json::from_slice(&bytes).or_err(ErrorType::SerializeError)?;
piece.uploading_count -= 1; piece.uploading_count -= 1;
piece.updated_at = Utc::now().naive_utc(); piece.updated_at = Utc::now().naive_utc();
piece Ok(piece)
} },
// If the piece does not exist, return error. || Err(Error::PieceNotFound(id.to_string())),
None => return Err(Error::PieceNotFound(id)), )
};
// Put the piece metadata.
let json = serde_json::to_string(&piece).or_err(ErrorType::SerializeError)?;
txn.put_cf(handle, id.as_bytes(), json.as_bytes())
.or_err(ErrorType::StorageError)?;
txn.commit().or_err(ErrorType::StorageError)?;
Ok(piece)
} }
// get_piece gets the piece metadata. /// get_piece gets the piece metadata.
pub fn get_piece(&self, task_id: &str, number: u32) -> Result<Option<Piece>> { pub fn get_piece(&self, task_id: &str, number: u32) -> Result<Option<Piece>> {
let id = self.piece_id(task_id, number); let id = self.piece_id(task_id, number);
let handle = self.cf_handle(PIECE_CF_NAME)?; let handle = self.cf_handle::<Piece>()?;
match self match self
.db .db
.get_cf(handle, id.as_bytes()) .get_cf(handle, id.as_bytes())
@ -695,53 +586,55 @@ impl Metadata {
} }
} }
// get_pieces gets the piece metadatas. /// get_pieces gets the piece metadatas.
pub fn get_pieces(&self, task_id: &str) -> Result<Vec<Piece>> { pub fn get_pieces(&self, task_id: &str) -> Result<Vec<Piece>> {
// Get the column family handle of piece. // Get the column family handle of piece.
let handle = self.cf_handle(PIECE_CF_NAME)?; let handle = self.cf_handle::<Piece>()?;
// Transaction is used to get the piece metadatas. self.with_txn(|txn| {
let txn = self.db.transaction(); let iter = txn.prefix_iterator_cf(handle, task_id.as_bytes());
let iter = txn.prefix_iterator_cf(handle, task_id.as_bytes());
// Iterate the piece metadatas. // Iterate the piece metadatas.
let mut pieces = Vec::new(); let mut pieces = Vec::new();
for ele in iter { for ele in iter {
let (_, value) = ele.or_err(ErrorType::StorageError)?; let (_, value) = ele.or_err(ErrorType::StorageError)?;
let piece: Piece = serde_json::from_slice(&value).or_err(ErrorType::SerializeError)?; let piece: Piece =
pieces.push(piece); serde_json::from_slice(&value).or_err(ErrorType::SerializeError)?;
} pieces.push(piece);
}
Ok(pieces) Ok(pieces)
})
} }
// delete_pieces deletes the piece metadatas. /// delete_pieces deletes the piece metadatas.
pub fn delete_pieces(&self, task_id: &str) -> Result<()> { pub fn delete_pieces(&self, task_id: &str) -> Result<()> {
// Get the column family handle of piece. // Get the column family handle of piece.
let handle = self.cf_handle(PIECE_CF_NAME)?; let handle = self.cf_handle::<Piece>()?;
// Transaction is used to delete the piece metadatas. self.with_txn(|txn| {
let txn = self.db.transaction(); let iter = txn.prefix_iterator_cf(handle, task_id.as_bytes());
let iter = txn.prefix_iterator_cf(handle, task_id.as_bytes());
// Iterate the piece metadatas. // Iterate the piece metadatas.
for ele in iter { for ele in iter {
let (key, _) = ele.or_err(ErrorType::StorageError)?; let (key, _) = ele.or_err(ErrorType::StorageError)?;
txn.delete_cf(handle, key).or_err(ErrorType::StorageError)?; txn.delete_cf(handle, key).or_err(ErrorType::StorageError)?;
} }
Ok(())
// Commit the transaction. })
txn.commit().or_err(ErrorType::StorageError)?;
Ok(())
} }
// piece_id returns the piece id. /// piece_id returns the piece id.
pub fn piece_id(&self, task_id: &str, number: u32) -> String { pub fn piece_id(&self, task_id: &str, number: u32) -> String {
format!("{}-{}", task_id, number) format!("{}-{}", task_id, number)
} }
// cf_handle returns the column family handle. // cf_handle returns the column family handle.
fn cf_handle(&self, cf_name: &str) -> Result<&ColumnFamily> { fn cf_handle<T>(&self) -> Result<&ColumnFamily>
where
T: ColumnFamilyDescriptor,
{
let cf_name = T::CF_NAME;
self.db self.db
.cf_handle(cf_name) .cf_handle(cf_name)
.ok_or_else(|| Error::ColumnFamilyNotFound(cf_name.to_string())) .ok_or_else(|| Error::ColumnFamilyNotFound(cf_name.to_string()))