diff --git a/Cargo.lock b/Cargo.lock index 91b506d5..52ffe03a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -903,7 +903,7 @@ dependencies = [ [[package]] name = "dragonfly-client" -version = "0.1.34" +version = "0.1.35" dependencies = [ "anyhow", "bytes", @@ -962,7 +962,7 @@ dependencies = [ [[package]] name = "dragonfly-client-backend" -version = "0.1.34" +version = "0.1.35" dependencies = [ "dragonfly-client-core", "futures", @@ -980,7 +980,7 @@ dependencies = [ [[package]] name = "dragonfly-client-config" -version = "0.1.34" +version = "0.1.35" dependencies = [ "dragonfly-client-core", "home", @@ -999,7 +999,7 @@ dependencies = [ [[package]] name = "dragonfly-client-core" -version = "0.1.34" +version = "0.1.35" dependencies = [ "libloading", "reqwest", @@ -1010,7 +1010,7 @@ dependencies = [ [[package]] name = "dragonfly-client-init" -version = "0.1.34" +version = "0.1.35" dependencies = [ "anyhow", "clap", @@ -1026,7 +1026,7 @@ dependencies = [ [[package]] name = "dragonfly-client-storage" -version = "0.1.34" +version = "0.1.35" dependencies = [ "base16ct", "chrono", @@ -1049,7 +1049,7 @@ dependencies = [ [[package]] name = "dragonfly-client-util" -version = "0.1.34" +version = "0.1.35" dependencies = [ "dragonfly-api", "dragonfly-client-core", @@ -1495,7 +1495,7 @@ dependencies = [ [[package]] name = "hdfs" -version = "0.1.34" +version = "0.1.35" dependencies = [ "dragonfly-client-backend", "dragonfly-client-core", diff --git a/dragonfly-client-storage/src/lib.rs b/dragonfly-client-storage/src/lib.rs index 0d9c3027..7f80bdf2 100644 --- a/dragonfly-client-storage/src/lib.rs +++ b/dragonfly-client-storage/src/lib.rs @@ -26,7 +26,6 @@ use tokio::io::AsyncRead; pub mod content; pub mod metadata; - pub mod storage_engine; // DEFAULT_WAIT_FOR_PIECE_FINISHED_INTERVAL is the default interval for waiting for the piece to be finished. diff --git a/dragonfly-client-storage/src/metadata.rs b/dragonfly-client-storage/src/metadata.rs index a1b201dd..c6e8eb12 100644 --- a/dragonfly-client-storage/src/metadata.rs +++ b/dragonfly-client-storage/src/metadata.rs @@ -58,8 +58,9 @@ pub struct Task { pub finished_at: Option, } +// Task implements the task database object. impl DatabaseObject for Task { - /// NAMESPACE is the namespace of [Task] objects. + // NAMESPACE is the namespace of [Task] objects. const NAMESPACE: &'static str = "task"; } @@ -146,8 +147,9 @@ pub struct Piece { pub finished_at: Option, } +// Piece implements the piece database object. impl DatabaseObject for Piece { - /// NAMESPACE is the namespace of [Piece] objects. + // NAMESPACE is the namespace of [Piece] objects. const NAMESPACE: &'static str = "piece"; } diff --git a/dragonfly-client-storage/src/storage_engine/mod.rs b/dragonfly-client-storage/src/storage_engine/mod.rs index 8c65ecc7..e22c10e9 100644 --- a/dragonfly-client-storage/src/storage_engine/mod.rs +++ b/dragonfly-client-storage/src/storage_engine/mod.rs @@ -1,3 +1,19 @@ +/* + * Copyright 2024 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + use dragonfly_client_core::{ error::{ErrorType, OrErr}, Result, @@ -41,12 +57,16 @@ impl StorageEngine<'db>> StorageEngineOwned for T {} pub trait Operations { /// get gets the object by key. fn get(&self, key: &[u8]) -> Result>; + /// put puts the object by key. fn put(&self, key: &[u8], value: &O) -> Result<()>; + /// delete deletes the object by key. fn delete(&self, key: &[u8]) -> Result<()>; + /// iter iterates all objects. fn iter(&self) -> Result, O)>>>; + /// prefix_iter iterates all objects with prefix. fn prefix_iter( &self, @@ -58,8 +78,10 @@ pub trait Operations { pub trait Transaction: Operations { /// get_for_update gets the object for update. fn get_for_update(&self, key: &[u8]) -> Result>; + /// commit commits the transaction. fn commit(self) -> Result<()>; + /// rollback rolls back the transaction. fn rollback(&self) -> Result<()>; } diff --git a/dragonfly-client-storage/src/storage_engine/rocksdb.rs b/dragonfly-client-storage/src/storage_engine/rocksdb.rs index 95614c8d..d01bf930 100644 --- a/dragonfly-client-storage/src/storage_engine/rocksdb.rs +++ b/dragonfly-client-storage/src/storage_engine/rocksdb.rs @@ -1,27 +1,45 @@ -use std::{ops::Deref, path::Path}; +/* + * Copyright 2024 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use crate::storage_engine::{DatabaseObject, Operations, StorageEngine, Transaction}; use dragonfly_client_core::{ error::{ErrorType, OrErr}, Error, Result, }; - +use std::{ops::Deref, path::Path}; use tracing::info; -use crate::storage_engine::{DatabaseObject, Operations, StorageEngine, Transaction}; - /// RocksdbStorageEngine is a storage engine based on rocksdb. pub struct RocksdbStorageEngine { + // inner is the inner rocksdb transaction db. inner: rocksdb::TransactionDB, } +// RocksdbStorageEngine implements deref of the storage engine. impl Deref for RocksdbStorageEngine { + // Target is the inner rocksdb transaction db. type Target = rocksdb::TransactionDB; + // deref returns the inner rocksdb transaction db. fn deref(&self) -> &Self::Target { &self.inner } } +// RocksdbStorageEngine implements the storage engine of the rocksdb. impl RocksdbStorageEngine { /// DEFAULT_DIR_NAME is the default directory name to store metadata. const DEFAULT_DIR_NAME: &'static str = "metadata"; @@ -75,7 +93,9 @@ impl RocksdbStorageEngine { } } +// RocksdbStorageEngine implements the storage engine operations. impl Operations for RocksdbStorageEngine { + // get gets the object by key. fn get(&self, key: &[u8]) -> Result> { let cf = cf_handle::(self)?; let value = self.get_cf(cf, key).or_err(ErrorType::StorageError)?; @@ -85,6 +105,7 @@ impl Operations for RocksdbStorageEngine { } } + // put puts the object by key. fn put(&self, key: &[u8], value: &O) -> Result<()> { let cf = cf_handle::(self)?; let serialized = value.serialized()?; @@ -93,12 +114,14 @@ impl Operations for RocksdbStorageEngine { Ok(()) } + // delete deletes the object by key. fn delete(&self, key: &[u8]) -> Result<()> { let cf = cf_handle::(self)?; self.delete_cf(cf, key).or_err(ErrorType::StorageError)?; Ok(()) } + // iter iterates all objects. fn iter(&self) -> Result, O)>>> { let cf = cf_handle::(self)?; let iter = self.iterator_cf(cf, rocksdb::IteratorMode::Start); @@ -108,6 +131,7 @@ impl Operations for RocksdbStorageEngine { })) } + // prefix_iter iterates all objects with prefix. fn prefix_iter( &self, prefix: &[u8], @@ -121,9 +145,12 @@ impl Operations for RocksdbStorageEngine { } } +// RocksdbStorageEngine implements the transaction of the storage engine. impl<'db> StorageEngine<'db> for RocksdbStorageEngine { + // Txn is the transaction type. type Txn = RocksdbTransaction<'db>; + // start_transaction starts a transaction. fn start_transaction(&'db self) -> RocksdbTransaction<'db> { let txn = self.transaction(); RocksdbTransaction { txn, db: self } @@ -132,11 +159,16 @@ impl<'db> StorageEngine<'db> for RocksdbStorageEngine { /// RocksdbTransaction wraps a rocksdb transaction. pub struct RocksdbTransaction<'db> { + // txn is the inner rocksdb transaction. txn: rocksdb::Transaction<'db, rocksdb::TransactionDB>, + + // db is the rocksdb storage engine. db: &'db rocksdb::TransactionDB, } +// RocksdbTransaction implements the transaction operations. impl Operations for RocksdbTransaction<'_> { + // get gets the object by key. fn get(&self, key: &[u8]) -> Result> { let cf = cf_handle::(self.db)?; let value = self.txn.get_cf(cf, key).or_err(ErrorType::StorageError)?; @@ -146,6 +178,7 @@ impl Operations for RocksdbTransaction<'_> { } } + // put puts the object by key. fn put(&self, key: &[u8], value: &O) -> Result<()> { let cf = cf_handle::(self.db)?; let serialized = value.serialized()?; @@ -155,6 +188,7 @@ impl Operations for RocksdbTransaction<'_> { Ok(()) } + // delete deletes the object by key. fn delete(&self, key: &[u8]) -> Result<()> { let cf = cf_handle::(self.db)?; self.txn @@ -163,6 +197,7 @@ impl Operations for RocksdbTransaction<'_> { Ok(()) } + // iter iterates all objects. fn iter(&self) -> Result, O)>>> { let cf = cf_handle::(self.db)?; let iter = self.txn.iterator_cf(cf, rocksdb::IteratorMode::Start); @@ -172,6 +207,7 @@ impl Operations for RocksdbTransaction<'_> { })) } + // prefix_iter iterates all objects with prefix. fn prefix_iter( &self, prefix: &[u8], @@ -185,7 +221,9 @@ impl Operations for RocksdbTransaction<'_> { } } +// RocksdbTransaction implements the transaction operations. impl Transaction for RocksdbTransaction<'_> { + // get_for_update gets the object for update. fn get_for_update(&self, key: &[u8]) -> Result> { let cf = cf_handle::(self.db)?; let value = self @@ -198,11 +236,13 @@ impl Transaction for RocksdbTransaction<'_> { } } + // commit commits the transaction. fn commit(self) -> Result<()> { self.txn.commit().or_err(ErrorType::StorageError)?; Ok(()) } + // rollback rolls back the transaction. fn rollback(&self) -> Result<()> { self.txn.rollback().or_err(ErrorType::StorageError)?; Ok(())