diff --git a/dragonfly-client-init/src/bin/main.rs b/dragonfly-client-init/src/bin/main.rs index 4d0557e2..581120e5 100644 --- a/dragonfly-client-init/src/bin/main.rs +++ b/dragonfly-client-init/src/bin/main.rs @@ -78,7 +78,7 @@ async fn main() -> Result<(), anyhow::Error> { // Initialize tracing. let _guards = init_tracing( dfinit::NAME, - &args.log_dir, + args.log_dir, args.log_level, args.log_max_files, None, diff --git a/dragonfly-client-storage/src/lib.rs b/dragonfly-client-storage/src/lib.rs index 46f342a7..3c2d3945 100644 --- a/dragonfly-client-storage/src/lib.rs +++ b/dragonfly-client-storage/src/lib.rs @@ -20,6 +20,7 @@ use dragonfly_client_core::{Error, Result}; use dragonfly_client_util::digest::{Algorithm, Digest}; use reqwest::header::HeaderMap; use std::path::Path; +use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use tokio::io::AsyncRead; @@ -47,8 +48,8 @@ pub struct Storage { // Storage implements the storage. impl Storage { // new returns a new storage. - pub async fn new(config: Arc, dir: &Path) -> Result { - let metadata = metadata::Metadata::new(config.clone(), dir)?; + pub async fn new(config: Arc, dir: &Path, log_dir: PathBuf) -> Result { + let metadata = metadata::Metadata::new(config.clone(), dir, &log_dir)?; let content = content::Content::new(config.clone(), dir).await?; Ok(Storage { config, @@ -322,8 +323,8 @@ impl Storage { } // Get the piece metadata and return the content of the piece. - match self.metadata.get_piece(task_id, number)? { - Some(piece) => { + match self.metadata.get_piece(task_id, number) { + Ok(Some(piece)) => { match self .content .read_piece(task_id, piece.offset, piece.length, range) @@ -347,7 +348,7 @@ impl Storage { } } } - None => { + Ok(None) => { // Failed uploading the task. self.metadata.upload_task_failed(task_id)?; @@ -355,6 +356,14 @@ impl Storage { self.metadata.upload_piece_failed(task_id, number)?; Err(Error::PieceNotFound(self.piece_id(task_id, number))) } + Err(err) => { + // Failed uploading the task. + self.metadata.upload_task_failed(task_id)?; + + // Failed uploading the piece. + self.metadata.upload_piece_failed(task_id, number)?; + Err(err) + } } } diff --git a/dragonfly-client-storage/src/metadata.rs b/dragonfly-client-storage/src/metadata.rs index 678bb07a..9dc986f5 100644 --- a/dragonfly-client-storage/src/metadata.rs +++ b/dragonfly-client-storage/src/metadata.rs @@ -22,6 +22,7 @@ use reqwest::header::HeaderMap; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::path::Path; +use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use tracing::{error, info}; @@ -785,6 +786,8 @@ impl Metadata { let iter = self.db.prefix_iter::(task_id.as_bytes())?; for ele in iter { let (key, _) = ele?; + + info!("delete piece metadata {}", task_id); self.db.delete::(&key)?; } @@ -800,9 +803,14 @@ impl Metadata { // Metadata implements the metadata of the storage engine. impl Metadata { // new creates a new metadata instance. - pub fn new(config: Arc, dir: &Path) -> Result> { + pub fn new( + config: Arc, + dir: &Path, + log_dir: &PathBuf, + ) -> Result> { let db = RocksdbStorageEngine::open( dir, + log_dir, &[Task::NAMESPACE, Piece::NAMESPACE, CacheTask::NAMESPACE], config.storage.keep, )?; @@ -820,7 +828,8 @@ mod tests { #[test] fn should_create_metadata_db() { let dir = TempDir::new("metadata_db").unwrap(); - let metadata = Metadata::new(Arc::new(Config::default()), dir.path()).unwrap(); + let log_dir = dir.path().join("log"); + let metadata = Metadata::new(Arc::new(Config::default()), dir.path(), &log_dir).unwrap(); assert!(metadata.get_tasks().unwrap().is_empty()); assert!(metadata.get_pieces("task").unwrap().is_empty()); } @@ -828,7 +837,8 @@ mod tests { #[test] fn test_task_lifecycle() { let dir = TempDir::new("metadata_db").unwrap(); - let metadata = Metadata::new(Arc::new(Config::default()), dir.path()).unwrap(); + let log_dir = dir.path().join("log"); + let metadata = Metadata::new(Arc::new(Config::default()), dir.path(), &log_dir).unwrap(); let task_id = "task1"; @@ -906,7 +916,8 @@ mod tests { #[test] fn test_piece_lifecycle() { let dir = TempDir::new("metadata_db").unwrap(); - let metadata = Metadata::new(Arc::new(Config::default()), dir.path()).unwrap(); + let log_dir = dir.path().join("log"); + let metadata = Metadata::new(Arc::new(Config::default()), dir.path(), &log_dir).unwrap(); let task_id = "task3"; // Test download_piece_started. diff --git a/dragonfly-client-storage/src/storage_engine/rocksdb.rs b/dragonfly-client-storage/src/storage_engine/rocksdb.rs index 19ac8ce0..fa3006f8 100644 --- a/dragonfly-client-storage/src/storage_engine/rocksdb.rs +++ b/dragonfly-client-storage/src/storage_engine/rocksdb.rs @@ -20,7 +20,10 @@ use dragonfly_client_core::{ Error, Result, }; use rocksdb::{ReadOptions, WriteOptions}; -use std::{ops::Deref, path::Path}; +use std::{ + ops::Deref, + path::{Path, PathBuf}, +}; use tracing::{info, warn}; /// RocksdbStorageEngine is a storage engine based on rocksdb. @@ -45,8 +48,8 @@ impl RocksdbStorageEngine { /// DEFAULT_DIR_NAME is the default directory name to store metadata. const DEFAULT_DIR_NAME: &'static str = "metadata"; - /// DEFAULT_MEMTABLE_MEMORY_BUDGET is the default memory budget for memtable, default is 64MB. - const DEFAULT_MEMTABLE_MEMORY_BUDGET: usize = 64 * 1024 * 1024; + /// DEFAULT_MEMTABLE_MEMORY_BUDGET is the default memory budget for memtable, default is 256MB. + const DEFAULT_MEMTABLE_MEMORY_BUDGET: usize = 256 * 1024 * 1024; /// DEFAULT_MAX_OPEN_FILES is the default max open files for rocksdb. const DEFAULT_MAX_OPEN_FILES: i32 = 10_000; @@ -57,8 +60,14 @@ impl RocksdbStorageEngine { /// DEFAULT_CACHE_SIZE is the default cache size for rocksdb. const DEFAULT_CACHE_SIZE: usize = 32 * 1024 * 1024; + // DEFAULT_LOG_MAX_SIZE is the default max log size for rocksdb, default is 64MB. + const DEFAULT_LOG_MAX_SIZE: usize = 64 * 1024 * 1024; + + // DEFAULT_LOG_MAX_FILES is the default max log files for rocksdb. + const DEFAULT_LOG_MAX_FILES: usize = 10; + /// open opens a rocksdb storage engine with the given directory and column families. - pub fn open(dir: &Path, cf_names: &[&str], keep: bool) -> Result { + pub fn open(dir: &Path, log_dir: &PathBuf, cf_names: &[&str], keep: bool) -> Result { info!("initializing metadata directory: {:?} {:?}", dir, cf_names); // Initialize rocksdb options. let mut options = rocksdb::Options::default(); @@ -68,6 +77,12 @@ impl RocksdbStorageEngine { options.increase_parallelism(num_cpus::get() as i32); options.set_max_open_files(Self::DEFAULT_MAX_OPEN_FILES); + // Set rocksdb log options. + options.set_db_log_dir(log_dir); + options.set_log_level(rocksdb::LogLevel::Debug); + options.set_max_log_file_size(Self::DEFAULT_LOG_MAX_SIZE); + options.set_keep_log_file_num(Self::DEFAULT_LOG_MAX_FILES); + // Initialize rocksdb block based table options. let mut block_options = rocksdb::BlockBasedOptions::default(); block_options.set_block_cache(&rocksdb::Cache::new_lru_cache(Self::DEFAULT_CACHE_SIZE)); diff --git a/dragonfly-client/src/bin/dfcache/main.rs b/dragonfly-client/src/bin/dfcache/main.rs index c83458cc..e8468b71 100644 --- a/dragonfly-client/src/bin/dfcache/main.rs +++ b/dragonfly-client/src/bin/dfcache/main.rs @@ -140,7 +140,7 @@ async fn main() -> anyhow::Result<()> { // Initialize tracing. let _guards = init_tracing( dfcache::NAME, - &args.log_dir, + args.log_dir, args.log_level, args.log_max_files, None, diff --git a/dragonfly-client/src/bin/dfdaemon/main.rs b/dragonfly-client/src/bin/dfdaemon/main.rs index c59076e5..85e5cbdb 100644 --- a/dragonfly-client/src/bin/dfdaemon/main.rs +++ b/dragonfly-client/src/bin/dfdaemon/main.rs @@ -108,7 +108,7 @@ async fn main() -> Result<(), anyhow::Error> { // Initialize tracing. let _guards = init_tracing( dfdaemon::NAME, - &args.log_dir, + args.log_dir.clone(), args.log_level, args.log_max_files, config.tracing.addr.to_owned(), @@ -117,7 +117,7 @@ async fn main() -> Result<(), anyhow::Error> { ); // Initialize storage. - let storage = Storage::new(config.clone(), config.storage.dir.as_path()) + let storage = Storage::new(config.clone(), config.storage.dir.as_path(), args.log_dir) .await .map_err(|err| { error!("initialize storage failed: {}", err); diff --git a/dragonfly-client/src/bin/dfget/main.rs b/dragonfly-client/src/bin/dfget/main.rs index 13e6ea11..3bc16979 100644 --- a/dragonfly-client/src/bin/dfget/main.rs +++ b/dragonfly-client/src/bin/dfget/main.rs @@ -253,7 +253,7 @@ async fn main() -> anyhow::Result<()> { // Initialize tracing. let _guards = init_tracing( dfget::NAME, - &args.log_dir, + args.log_dir.clone(), args.log_level, args.log_max_files, None, diff --git a/dragonfly-client/src/bin/dfstore/main.rs b/dragonfly-client/src/bin/dfstore/main.rs index 69066a39..e630cac3 100644 --- a/dragonfly-client/src/bin/dfstore/main.rs +++ b/dragonfly-client/src/bin/dfstore/main.rs @@ -110,7 +110,7 @@ fn main() { // Initialize tracing. let _guards = init_tracing( dfstore::NAME, - &args.log_dir, + args.log_dir, args.log_level, args.log_max_files, None, diff --git a/dragonfly-client/src/gc/mod.rs b/dragonfly-client/src/gc/mod.rs index 0ad3c00b..03a8c9b1 100644 --- a/dragonfly-client/src/gc/mod.rs +++ b/dragonfly-client/src/gc/mod.rs @@ -110,11 +110,6 @@ impl GC { for task in self.storage.get_tasks()? { // If the task is expired and not uploading, evict the task. if task.is_expired(self.config.gc.policy.task_ttl) { - // If the task is uploading, skip it. - if task.is_uploading() { - continue; - } - self.storage.delete_task(&task.id).await; info!("evict task {}", task.id); @@ -166,11 +161,6 @@ impl GC { break; } - // If the task is uploading, skip it. - if task.is_uploading() { - continue; - } - // If the task has no content length, skip it. let task_space = match task.content_length() { Some(content_length) => content_length, @@ -214,11 +204,6 @@ impl GC { for task in self.storage.get_cache_tasks()? { // If the cache task is expired and not uploading, evict the cache task. if task.is_expired() { - // If the cache task is uploading, skip it. - if task.is_uploading() { - continue; - } - self.storage.delete_cache_task(&task.id).await; info!("evict cache task {}", task.id); @@ -270,11 +255,6 @@ impl GC { break; } - // If the cache task is uploading, skip it. - if task.is_uploading() { - continue; - } - // If the cache task is persistent, skip it. if task.is_persistent() { continue; diff --git a/dragonfly-client/src/tracing/mod.rs b/dragonfly-client/src/tracing/mod.rs index 300aa743..c8e24e22 100644 --- a/dragonfly-client/src/tracing/mod.rs +++ b/dragonfly-client/src/tracing/mod.rs @@ -19,7 +19,7 @@ use rolling_file::*; use std::fs; use std::fs::OpenOptions; use std::os::unix::io::AsRawFd; -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use tracing::{info, Level}; use tracing_appender::non_blocking::WorkerGuard; use tracing_log::LogTracer; @@ -32,7 +32,7 @@ use tracing_subscriber::{ pub fn init_tracing( name: &str, - log_dir: &PathBuf, + log_dir: PathBuf, log_level: Level, log_max_files: usize, jaeger_addr: Option, @@ -63,13 +63,14 @@ pub fn init_tracing( guards.push(stdout_guard); // Setup file layer. - fs::create_dir_all(log_dir).expect("failed to create log directory"); + fs::create_dir_all(log_dir.clone()).expect("failed to create log directory"); let rolling_appender = BasicRollingFileAppender::new( log_dir.join(name).with_extension("log"), RollingConditionBasic::new().hourly(), log_max_files, ) .expect("failed to create rolling file appender"); + println!("log_dir: {:?}", log_dir); let (rolling_writer, rolling_writer_guard) = tracing_appender::non_blocking(rolling_appender); let file_logging_layer = Layer::new() @@ -128,7 +129,7 @@ pub fn init_tracing( } // Redirect stderr to file. -fn redirect_stderr_to_file(log_dir: &Path) { +fn redirect_stderr_to_file(log_dir: PathBuf) { let log_path = log_dir.join("stderr.log"); let file = OpenOptions::new() .create(true)