fix: txn memory leak in storage (#466)

fix: txn memory leak in storage

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2024-05-14 21:37:47 +08:00 committed by GitHub
parent 9732a92283
commit 17955f3702
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 175 additions and 221 deletions

16
Cargo.lock generated
View File

@ -959,7 +959,7 @@ dependencies = [
[[package]]
name = "dragonfly-client"
version = "0.1.53"
version = "0.1.54"
dependencies = [
"anyhow",
"bytes",
@ -1022,7 +1022,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-backend"
version = "0.1.53"
version = "0.1.54"
dependencies = [
"dragonfly-client-core",
"futures",
@ -1040,7 +1040,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-config"
version = "0.1.53"
version = "0.1.54"
dependencies = [
"dragonfly-client-core",
"home",
@ -1059,7 +1059,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-core"
version = "0.1.53"
version = "0.1.54"
dependencies = [
"libloading",
"reqwest",
@ -1070,7 +1070,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-init"
version = "0.1.53"
version = "0.1.54"
dependencies = [
"anyhow",
"clap",
@ -1086,7 +1086,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-storage"
version = "0.1.53"
version = "0.1.54"
dependencies = [
"base16ct",
"blake3",
@ -1110,7 +1110,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-util"
version = "0.1.53"
version = "0.1.54"
dependencies = [
"dragonfly-api",
"dragonfly-client-core",
@ -1568,7 +1568,7 @@ dependencies = [
[[package]]
name = "hdfs"
version = "0.1.53"
version = "0.1.54"
dependencies = [
"dragonfly-client-backend",
"dragonfly-client-core",

View File

@ -12,7 +12,7 @@ members = [
]
[workspace.package]
version = "0.1.53"
version = "0.1.54"
authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git"
@ -22,13 +22,13 @@ readme = "README.md"
edition = "2021"
[workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "0.1.53" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.53" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.53" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.53" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.53" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.53" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.53" }
dragonfly-client = { path = "dragonfly-client", version = "0.1.54" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.54" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.54" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.54" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.54" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.54" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.54" }
thiserror = "1.0"
dragonfly-api = "2.0.112"
reqwest = { version = "0.11.27", features = ["stream", "native-tls", "default-tls", "rustls-tls"] }

View File

@ -187,7 +187,7 @@ impl Storage {
}
// download_piece_failed deletes the metadata of the piece when the piece download fails.
pub fn download_piece_failed(&self, task_id: &str, number: u32) -> Result<metadata::Piece> {
pub fn download_piece_failed(&self, task_id: &str, number: u32) -> Result<()> {
self.metadata.download_piece_failed(task_id, number)
}

View File

@ -25,8 +25,7 @@ use std::time::Duration;
use tracing::error;
use crate::storage_engine::{
rocksdb::RocksdbStorageEngine, DatabaseObject, Operations, StorageEngine, StorageEngineOwned,
Transaction,
rocksdb::RocksdbStorageEngine, DatabaseObject, Operations, StorageEngineOwned, Transaction,
};
// Task is the metadata of the task.
@ -209,21 +208,6 @@ where
}
impl<E: StorageEngineOwned> Metadata<E> {
/// with_txn runs the closure `f` inside a transaction obtained from
/// `self.db.start_transaction()`.
///
/// The closure receives a reference to the open transaction. When it returns
/// `Ok`, the transaction is committed and the closure's value is handed back;
/// when it returns `Err`, the transaction is rolled back and that error is
/// handed back. A failure of the commit or of the rollback itself is
/// propagated with `?` and therefore replaces the closure's own outcome.
fn with_txn<T>(&self, f: impl FnOnce(&<E as StorageEngine>::Txn) -> Result<T>) -> Result<T> {
    let txn = self.db.start_transaction();
    let outcome = f(&txn);

    // Finish the transaction according to the closure's outcome before
    // returning that outcome to the caller.
    if outcome.is_ok() {
        txn.commit()?;
    } else {
        txn.rollback()?;
    }

    outcome
}
/// download_task_started updates the metadata of the task when the task downloads started.
pub fn download_task_started(
&self,
@ -233,111 +217,109 @@ impl<E: StorageEngineOwned> Metadata<E> {
response_header: Option<HeaderMap>,
) -> Result<Task> {
// Convert the response header to hashmap.
let get_response_header = || {
response_header
.as_ref()
.map(reqwest_headermap_to_hashmap)
.unwrap_or_default()
let response_header = response_header
.as_ref()
.map(reqwest_headermap_to_hashmap)
.unwrap_or_default();
let txn = self.db.start_transaction();
let task = match txn.get_for_update::<Task>(id.as_bytes())? {
Some(mut task) => {
// If the task exists, update the task metadata.
task.updated_at = Utc::now().naive_utc();
task.peer_ids.insert(peer_id.to_string());
// If the task already has a response header, it will not be
// overwritten.
if task.response_header.is_empty() {
task.response_header = response_header;
}
task
}
None => Task {
id: id.to_string(),
peer_ids: vec![peer_id.to_string()].into_iter().collect(),
piece_length,
response_header,
updated_at: Utc::now().naive_utc(),
created_at: Utc::now().naive_utc(),
..Default::default()
},
};
self.with_txn(|txn| {
let task = match txn.get_for_update::<Task>(id.as_bytes())? {
Some(mut task) => {
// If the task exists, update the task metadata.
task.updated_at = Utc::now().naive_utc();
task.peer_ids.insert(peer_id.to_string());
// If the task already has a response header, it will not be
// overwritten.
if task.response_header.is_empty() {
task.response_header = get_response_header();
}
task
}
None => Task {
id: id.to_string(),
peer_ids: vec![peer_id.to_string()].into_iter().collect(),
piece_length,
response_header: get_response_header(),
updated_at: Utc::now().naive_utc(),
created_at: Utc::now().naive_utc(),
..Default::default()
},
};
txn.put(id.as_bytes(), &task)?;
Ok(task)
})
txn.put(id.as_bytes(), &task)?;
txn.commit()?;
Ok(task)
}
/// download_task_finished updates the metadata of the task when the task downloads finished.
pub fn download_task_finished(&self, id: &str) -> Result<Task> {
self.with_txn(|txn| {
let task = match txn.get_for_update::<Task>(id.as_bytes())? {
Some(mut task) => {
task.updated_at = Utc::now().naive_utc();
task.finished_at = Some(Utc::now().naive_utc());
task
}
None => return Err(Error::TaskNotFound(id.to_string())),
};
let txn = self.db.start_transaction();
let task = match txn.get_for_update::<Task>(id.as_bytes())? {
Some(mut task) => {
task.updated_at = Utc::now().naive_utc();
task.finished_at = Some(Utc::now().naive_utc());
task
}
None => return Err(Error::TaskNotFound(id.to_string())),
};
txn.put(id.as_bytes(), &task)?;
Ok(task)
})
txn.put(id.as_bytes(), &task)?;
txn.commit()?;
Ok(task)
}
/// upload_task_started updates the metadata of the task when task uploads started.
pub fn upload_task_started(&self, id: &str) -> Result<Task> {
self.with_txn(|txn| {
let task = match txn.get_for_update::<Task>(id.as_bytes())? {
Some(mut task) => {
task.uploading_count += 1;
task.updated_at = Utc::now().naive_utc();
task
}
None => return Err(Error::TaskNotFound(id.to_string())),
};
let txn = self.db.start_transaction();
let task = match txn.get_for_update::<Task>(id.as_bytes())? {
Some(mut task) => {
task.uploading_count += 1;
task.updated_at = Utc::now().naive_utc();
task
}
None => return Err(Error::TaskNotFound(id.to_string())),
};
txn.put(id.as_bytes(), &task)?;
Ok(task)
})
txn.put(id.as_bytes(), &task)?;
txn.commit()?;
Ok(task)
}
/// upload_task_finished updates the metadata of the task when task uploads finished.
pub fn upload_task_finished(&self, id: &str) -> Result<Task> {
self.with_txn(|txn| {
let task = match txn.get_for_update::<Task>(id.as_bytes())? {
Some(mut task) => {
task.uploading_count -= 1;
task.uploaded_count += 1;
task.updated_at = Utc::now().naive_utc();
task
}
None => return Err(Error::TaskNotFound(id.to_string())),
};
let txn = self.db.start_transaction();
let task = match txn.get_for_update::<Task>(id.as_bytes())? {
Some(mut task) => {
task.uploading_count -= 1;
task.uploaded_count += 1;
task.updated_at = Utc::now().naive_utc();
task
}
None => return Err(Error::TaskNotFound(id.to_string())),
};
txn.put(id.as_bytes(), &task)?;
Ok(task)
})
txn.put(id.as_bytes(), &task)?;
txn.commit()?;
Ok(task)
}
/// upload_task_failed updates the metadata of the task when the task uploads failed.
pub fn upload_task_failed(&self, id: &str) -> Result<Task> {
self.with_txn(|txn| {
let task = match txn.get_for_update::<Task>(id.as_bytes())? {
Some(mut task) => {
task.uploading_count -= 1;
task.updated_at = Utc::now().naive_utc();
task
}
None => return Err(Error::TaskNotFound(id.to_string())),
};
let txn = self.db.start_transaction();
let task = match txn.get_for_update::<Task>(id.as_bytes())? {
Some(mut task) => {
task.uploading_count -= 1;
task.updated_at = Utc::now().naive_utc();
task
}
None => return Err(Error::TaskNotFound(id.to_string())),
};
txn.put(id.as_bytes(), &task)?;
Ok(task)
})
txn.put(id.as_bytes(), &task)?;
txn.commit()?;
Ok(task)
}
/// get_task gets the task metadata.
@ -347,22 +329,18 @@ impl<E: StorageEngineOwned> Metadata<E> {
/// get_tasks gets the metadata of all tasks.
pub fn get_tasks(&self) -> Result<Vec<Task>> {
self.with_txn(|txn| txn.iter()?.map(|ele| ele.map(|(_, task)| task)).collect())
let txn = self.db.start_transaction();
let iter = txn.iter::<Task>()?;
iter.map(|ele| ele.map(|(_, task)| task)).collect()
}
/// delete_task deletes the task metadata.
pub fn delete_task(&self, task_id: &str) -> Result<()> {
self.with_txn(|txn| {
txn.delete::<Task>(task_id.as_bytes())?;
Ok(())
})
self.db.delete::<Task>(task_id.as_bytes())
}
/// download_piece_started updates the metadata of the piece when the piece downloads started.
pub fn download_piece_started(&self, task_id: &str, number: u32) -> Result<Piece> {
// Get the piece id.
let id = self.piece_id(task_id, number);
// Construct the piece metadata.
let piece = Piece {
number,
@ -371,11 +349,10 @@ impl<E: StorageEngineOwned> Metadata<E> {
..Default::default()
};
self.with_txn(|txn| {
// Put the piece metadata.
txn.put(id.as_bytes(), &piece)?;
Ok(piece)
})
// Put the piece metadata.
self.db
.put(self.piece_id(task_id, number).as_bytes(), &piece)?;
Ok(piece)
}
/// download_piece_finished updates the metadata of the piece when the piece downloads finished.
@ -390,135 +367,112 @@ impl<E: StorageEngineOwned> Metadata<E> {
) -> Result<Piece> {
// Get the piece id.
let id = self.piece_id(task_id, number);
let txn = self.db.start_transaction();
let piece = match txn.get_for_update::<Piece>(id.as_bytes())? {
Some(mut piece) => {
piece.offset = offset;
piece.length = length;
piece.digest = digest.to_string();
piece.parent_id = parent_id;
piece.updated_at = Utc::now().naive_utc();
piece.finished_at = Some(Utc::now().naive_utc());
piece
}
None => return Err(Error::PieceNotFound(id)),
};
self.with_txn(|txn| {
let piece = match txn.get_for_update::<Piece>(id.as_bytes())? {
Some(mut piece) => {
piece.offset = offset;
piece.length = length;
piece.digest = digest.to_string();
piece.parent_id = parent_id;
piece.updated_at = Utc::now().naive_utc();
piece.finished_at = Some(Utc::now().naive_utc());
piece
}
None => return Err(Error::PieceNotFound(id)),
};
txn.put(id.as_bytes(), &piece)?;
Ok(piece)
})
txn.put(id.as_bytes(), &piece)?;
txn.commit()?;
Ok(piece)
}
/// download_piece_failed deletes the metadata of the piece when the piece download fails.
pub fn download_piece_failed(&self, task_id: &str, number: u32) -> Result<Piece> {
// Get the piece id.
let id = self.piece_id(task_id, number);
self.with_txn(|txn| {
let piece = match txn.get_for_update(id.as_bytes())? {
// If the piece exists, delete the piece metadata.
Some(piece) => {
txn.delete::<Piece>(id.as_bytes())?;
piece
}
// If the piece does not exist, return error.
None => return Err(Error::PieceNotFound(id)),
};
Ok(piece)
})
pub fn download_piece_failed(&self, task_id: &str, number: u32) -> Result<()> {
self.db
.delete::<Piece>(self.piece_id(task_id, number).as_bytes())
}
/// upload_piece_started updates the metadata of the piece when piece uploads started.
pub fn upload_piece_started(&self, task_id: &str, number: u32) -> Result<Piece> {
// Get the piece id.
let id = self.piece_id(task_id, number);
let txn = self.db.start_transaction();
let piece = match txn.get_for_update::<Piece>(id.as_bytes())? {
Some(mut piece) => {
piece.uploading_count += 1;
piece.updated_at = Utc::now().naive_utc();
piece
}
None => return Err(Error::PieceNotFound(id)),
};
self.with_txn(|txn| {
let piece = match txn.get_for_update::<Piece>(id.as_bytes())? {
Some(mut piece) => {
piece.uploading_count += 1;
piece.updated_at = Utc::now().naive_utc();
piece
}
None => return Err(Error::PieceNotFound(id)),
};
txn.put(id.as_bytes(), &piece)?;
Ok(piece)
})
txn.put(id.as_bytes(), &piece)?;
txn.commit()?;
Ok(piece)
}
/// upload_piece_finished updates the metadata of the piece when piece uploads finished.
pub fn upload_piece_finished(&self, task_id: &str, number: u32) -> Result<Piece> {
// Get the piece id.
let id = self.piece_id(task_id, number);
let txn = self.db.start_transaction();
let piece = match txn.get_for_update::<Piece>(id.as_bytes())? {
Some(mut piece) => {
piece.uploading_count -= 1;
piece.uploaded_count += 1;
piece.updated_at = Utc::now().naive_utc();
piece
}
None => return Err(Error::PieceNotFound(id)),
};
self.with_txn(|txn| {
let piece = match txn.get_for_update::<Piece>(id.as_bytes())? {
Some(mut piece) => {
piece.uploading_count -= 1;
piece.uploaded_count += 1;
piece.updated_at = Utc::now().naive_utc();
piece
}
None => return Err(Error::PieceNotFound(id)),
};
txn.put(id.as_bytes(), &piece)?;
Ok(piece)
})
txn.put(id.as_bytes(), &piece)?;
txn.commit()?;
Ok(piece)
}
/// upload_piece_failed updates the metadata of the piece when the piece uploads failed.
pub fn upload_piece_failed(&self, task_id: &str, number: u32) -> Result<Piece> {
// Get the piece id.
let id = self.piece_id(task_id, number);
let txn = self.db.start_transaction();
let piece = match txn.get_for_update::<Piece>(id.as_bytes())? {
Some(mut piece) => {
piece.uploading_count -= 1;
piece.updated_at = Utc::now().naive_utc();
piece
}
None => return Err(Error::PieceNotFound(id)),
};
self.with_txn(|txn| {
let piece = match txn.get_for_update::<Piece>(id.as_bytes())? {
Some(mut piece) => {
piece.uploading_count -= 1;
piece.updated_at = Utc::now().naive_utc();
piece
}
None => return Err(Error::PieceNotFound(id)),
};
txn.put(id.as_bytes(), &piece)?;
Ok(piece)
})
txn.put(id.as_bytes(), &piece)?;
txn.commit()?;
Ok(piece)
}
/// get_piece gets the piece metadata.
pub fn get_piece(&self, task_id: &str, number: u32) -> Result<Option<Piece>> {
let id = self.piece_id(task_id, number);
self.db.get(id.as_bytes())
self.db.get(self.piece_id(task_id, number).as_bytes())
}
/// get_pieces gets the metadata of all pieces of the task.
pub fn get_pieces(&self, task_id: &str) -> Result<Vec<Piece>> {
self.with_txn(|txn| {
// Iterate the piece metadatas.
txn.prefix_iter(task_id.as_bytes())?
.map(|ele| ele.map(|(_, piece)| piece))
.collect()
})
let txn = self.db.start_transaction();
let iter = txn.prefix_iter::<Piece>(task_id.as_bytes())?;
iter.map(|ele| ele.map(|(_, piece)| piece)).collect()
}
/// delete_pieces deletes the metadata of all pieces of the task.
pub fn delete_pieces(&self, task_id: &str) -> Result<()> {
self.with_txn(|txn| {
let iter = txn.prefix_iter::<Piece>(task_id.as_bytes())?;
let txn = self.db.start_transaction();
let iter = txn.prefix_iter::<Piece>(task_id.as_bytes())?;
for ele in iter {
let (key, _) = ele?;
txn.delete::<Piece>(&key)?;
}
// Iterate the piece metadatas.
for ele in iter {
let (key, _) = ele?;
txn.delete::<Piece>(&key)?;
}
Ok(())
})
txn.commit()?;
Ok(())
}
/// piece_id returns the piece id.