feat: remove is_uploading condition in gc (#646)

Signed-off-by: Gaius <gaius.qi@gmail.com>
Gaius 2024-08-05 16:31:18 +08:00 committed by GitHub
parent 6e760e877e
commit 50fa71e4c9
10 changed files with 59 additions and 43 deletions

View File

@@ -78,7 +78,7 @@ async fn main() -> Result<(), anyhow::Error> {
     // Initialize tracing.
     let _guards = init_tracing(
         dfinit::NAME,
-        &args.log_dir,
+        args.log_dir,
         args.log_level,
         args.log_max_files,
         None,

View File

@@ -20,6 +20,7 @@ use dragonfly_client_core::{Error, Result};
 use dragonfly_client_util::digest::{Algorithm, Digest};
 use reqwest::header::HeaderMap;
 use std::path::Path;
+use std::path::PathBuf;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::io::AsyncRead;
@@ -47,8 +48,8 @@ pub struct Storage {
 // Storage implements the storage.
 impl Storage {
     // new returns a new storage.
-    pub async fn new(config: Arc<Config>, dir: &Path) -> Result<Self> {
-        let metadata = metadata::Metadata::new(config.clone(), dir)?;
+    pub async fn new(config: Arc<Config>, dir: &Path, log_dir: PathBuf) -> Result<Self> {
+        let metadata = metadata::Metadata::new(config.clone(), dir, &log_dir)?;
         let content = content::Content::new(config.clone(), dir).await?;
         Ok(Storage {
             config,
@@ -322,8 +323,8 @@ impl Storage {
         }

         // Get the piece metadata and return the content of the piece.
-        match self.metadata.get_piece(task_id, number)? {
-            Some(piece) => {
+        match self.metadata.get_piece(task_id, number) {
+            Ok(Some(piece)) => {
                 match self
                     .content
                     .read_piece(task_id, piece.offset, piece.length, range)
@@ -347,7 +348,7 @@ impl Storage {
                     }
                 }
             }
-            None => {
+            Ok(None) => {
                 // Failed uploading the task.
                 self.metadata.upload_task_failed(task_id)?;
@@ -355,6 +356,14 @@ impl Storage {
                 self.metadata.upload_piece_failed(task_id, number)?;
                 Err(Error::PieceNotFound(self.piece_id(task_id, number)))
             }
+            Err(err) => {
+                // Failed uploading the task.
+                self.metadata.upload_task_failed(task_id)?;
+
+                // Failed uploading the piece.
+                self.metadata.upload_piece_failed(task_id, number)?;
+                Err(err)
+            }
         }
     }
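Note on the hunks above: with the old `self.metadata.get_piece(task_id, number)?`, a metadata lookup error returned early and skipped the upload-failure bookkeeping; matching on the whole Result lets every non-success arm record the failure first. A toy, self-contained sketch of that control-flow change (the types and names below are illustrative, not the dragonfly-client ones):

fn lookup(found: bool, fail: bool) -> Result<Option<u32>, String> {
    if fail {
        Err("metadata lookup failed".to_string())
    } else if found {
        Ok(Some(42))
    } else {
        Ok(None)
    }
}

fn read_piece(found: bool, fail: bool) -> Result<u32, String> {
    // Matching the full Result instead of unwrapping with `?` keeps the
    // error path inside the match, so failure bookkeeping runs for it too.
    match lookup(found, fail) {
        Ok(Some(piece)) => Ok(piece),
        Ok(None) => {
            // upload_task_failed / upload_piece_failed would be recorded here...
            Err("piece not found".to_string())
        }
        Err(err) => {
            // ...and now here as well, instead of being skipped by `?`.
            Err(err)
        }
    }
}

fn main() {
    assert_eq!(read_piece(true, false), Ok(42));
    assert!(read_piece(false, false).is_err());
    assert!(read_piece(true, true).is_err());
}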

View File

@@ -22,6 +22,7 @@ use reqwest::header::HeaderMap;
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use std::path::Path;
+use std::path::PathBuf;
 use std::sync::Arc;
 use std::time::Duration;
 use tracing::{error, info};
@@ -785,6 +786,8 @@ impl<E: StorageEngineOwned> Metadata<E> {
         let iter = self.db.prefix_iter::<Piece>(task_id.as_bytes())?;
         for ele in iter {
             let (key, _) = ele?;
+
+            info!("delete piece metadata {}", task_id);
             self.db.delete::<Piece>(&key)?;
         }
@@ -800,9 +803,14 @@
 // Metadata implements the metadata of the storage engine.
 impl Metadata<RocksdbStorageEngine> {
     // new creates a new metadata instance.
-    pub fn new(config: Arc<Config>, dir: &Path) -> Result<Metadata<RocksdbStorageEngine>> {
+    pub fn new(
+        config: Arc<Config>,
+        dir: &Path,
+        log_dir: &PathBuf,
+    ) -> Result<Metadata<RocksdbStorageEngine>> {
         let db = RocksdbStorageEngine::open(
             dir,
+            log_dir,
             &[Task::NAMESPACE, Piece::NAMESPACE, CacheTask::NAMESPACE],
             config.storage.keep,
         )?;
@@ -820,7 +828,8 @@ mod tests {
     #[test]
     fn should_create_metadata_db() {
         let dir = TempDir::new("metadata_db").unwrap();
-        let metadata = Metadata::new(Arc::new(Config::default()), dir.path()).unwrap();
+        let log_dir = dir.path().join("log");
+        let metadata = Metadata::new(Arc::new(Config::default()), dir.path(), &log_dir).unwrap();
         assert!(metadata.get_tasks().unwrap().is_empty());
         assert!(metadata.get_pieces("task").unwrap().is_empty());
     }
@@ -828,7 +837,8 @@ mod tests {
     #[test]
     fn test_task_lifecycle() {
         let dir = TempDir::new("metadata_db").unwrap();
-        let metadata = Metadata::new(Arc::new(Config::default()), dir.path()).unwrap();
+        let log_dir = dir.path().join("log");
+        let metadata = Metadata::new(Arc::new(Config::default()), dir.path(), &log_dir).unwrap();
         let task_id = "task1";
@@ -906,7 +916,8 @@ mod tests {
     #[test]
     fn test_piece_lifecycle() {
         let dir = TempDir::new("metadata_db").unwrap();
-        let metadata = Metadata::new(Arc::new(Config::default()), dir.path()).unwrap();
+        let log_dir = dir.path().join("log");
+        let metadata = Metadata::new(Arc::new(Config::default()), dir.path(), &log_dir).unwrap();
         let task_id = "task3";

         // Test download_piece_started.

View File

@@ -20,7 +20,10 @@ use dragonfly_client_core::{
     Error, Result,
 };
 use rocksdb::{ReadOptions, WriteOptions};
-use std::{ops::Deref, path::Path};
+use std::{
+    ops::Deref,
+    path::{Path, PathBuf},
+};
 use tracing::{info, warn};

 /// RocksdbStorageEngine is a storage engine based on rocksdb.
@@ -45,8 +48,8 @@ impl RocksdbStorageEngine {
     /// DEFAULT_DIR_NAME is the default directory name to store metadata.
     const DEFAULT_DIR_NAME: &'static str = "metadata";

-    /// DEFAULT_MEMTABLE_MEMORY_BUDGET is the default memory budget for memtable, default is 64MB.
-    const DEFAULT_MEMTABLE_MEMORY_BUDGET: usize = 64 * 1024 * 1024;
+    /// DEFAULT_MEMTABLE_MEMORY_BUDGET is the default memory budget for memtable, default is 256MB.
+    const DEFAULT_MEMTABLE_MEMORY_BUDGET: usize = 256 * 1024 * 1024;

     /// DEFAULT_MAX_OPEN_FILES is the default max open files for rocksdb.
     const DEFAULT_MAX_OPEN_FILES: i32 = 10_000;
@@ -57,8 +60,14 @@ impl RocksdbStorageEngine {
     /// DEFAULT_CACHE_SIZE is the default cache size for rocksdb.
     const DEFAULT_CACHE_SIZE: usize = 32 * 1024 * 1024;

+    // DEFAULT_LOG_MAX_SIZE is the default max log size for rocksdb, default is 64MB.
+    const DEFAULT_LOG_MAX_SIZE: usize = 64 * 1024 * 1024;
+
+    // DEFAULT_LOG_MAX_FILES is the default max log files for rocksdb.
+    const DEFAULT_LOG_MAX_FILES: usize = 10;
+
     /// open opens a rocksdb storage engine with the given directory and column families.
-    pub fn open(dir: &Path, cf_names: &[&str], keep: bool) -> Result<Self> {
+    pub fn open(dir: &Path, log_dir: &PathBuf, cf_names: &[&str], keep: bool) -> Result<Self> {
         info!("initializing metadata directory: {:?} {:?}", dir, cf_names);

         // Initialize rocksdb options.
         let mut options = rocksdb::Options::default();
@@ -68,6 +77,12 @@ impl RocksdbStorageEngine {
         options.increase_parallelism(num_cpus::get() as i32);
         options.set_max_open_files(Self::DEFAULT_MAX_OPEN_FILES);

+        // Set rocksdb log options.
+        options.set_db_log_dir(log_dir);
+        options.set_log_level(rocksdb::LogLevel::Debug);
+        options.set_max_log_file_size(Self::DEFAULT_LOG_MAX_SIZE);
+        options.set_keep_log_file_num(Self::DEFAULT_LOG_MAX_FILES);
+
         // Initialize rocksdb block based table options.
         let mut block_options = rocksdb::BlockBasedOptions::default();
         block_options.set_block_cache(&rocksdb::Cache::new_lru_cache(Self::DEFAULT_CACHE_SIZE));
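For reference, a minimal standalone sketch of the RocksDB logging setup introduced above, using the same rust-rocksdb options that appear in the hunk. The helper function name, the example path, and the hard-coded sizes (mirroring DEFAULT_LOG_MAX_SIZE and DEFAULT_LOG_MAX_FILES) are illustrative only.

use std::path::Path;

// Sketch: route RocksDB's internal LOG files to a separate directory and
// bound their size, mirroring the options set in the hunk above.
fn rocksdb_options_with_log_dir(log_dir: &Path) -> rocksdb::Options {
    let mut options = rocksdb::Options::default();
    // Write LOG / LOG.old.* under `log_dir` instead of the data directory.
    options.set_db_log_dir(log_dir);
    options.set_log_level(rocksdb::LogLevel::Debug);
    // Rotate at 64 MiB and keep at most 10 rotated files.
    options.set_max_log_file_size(64 * 1024 * 1024);
    options.set_keep_log_file_num(10);
    options
}

fn main() {
    // Illustrative path; the options would normally be passed to DB::open_cf.
    let options = rocksdb_options_with_log_dir(Path::new("/var/log/dragonfly/rocksdb"));
    let _ = options;
}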

View File

@@ -140,7 +140,7 @@ async fn main() -> anyhow::Result<()> {
     // Initialize tracing.
     let _guards = init_tracing(
         dfcache::NAME,
-        &args.log_dir,
+        args.log_dir,
         args.log_level,
         args.log_max_files,
         None,

View File

@@ -108,7 +108,7 @@ async fn main() -> Result<(), anyhow::Error> {
     // Initialize tracing.
     let _guards = init_tracing(
         dfdaemon::NAME,
-        &args.log_dir,
+        args.log_dir.clone(),
         args.log_level,
         args.log_max_files,
         config.tracing.addr.to_owned(),
@@ -117,7 +117,7 @@ async fn main() -> Result<(), anyhow::Error> {
     );

     // Initialize storage.
-    let storage = Storage::new(config.clone(), config.storage.dir.as_path())
+    let storage = Storage::new(config.clone(), config.storage.dir.as_path(), args.log_dir)
         .await
         .map_err(|err| {
             error!("initialize storage failed: {}", err);
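The `.clone()` at the dfdaemon (and dfget) call sites is needed because `init_tracing` now takes the log directory by value; a caller that still needs the path afterwards, as dfdaemon does to hand it to `Storage::new`, must clone it, while the other binaries can simply move it. A small standalone illustration of that ownership rule (the function and path below are illustrative, not project code):

use std::path::PathBuf;

// Stands in for a function that now takes PathBuf by value.
fn takes_ownership(dir: PathBuf) -> PathBuf {
    dir
}

fn main() {
    let log_dir = PathBuf::from("/var/log/dragonfly");
    let _ = takes_ownership(log_dir.clone()); // clone: log_dir is used again below
    let _ = takes_ownership(log_dir); // the last use can move the value
}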

View File

@@ -253,7 +253,7 @@ async fn main() -> anyhow::Result<()> {
     // Initialize tracing.
     let _guards = init_tracing(
         dfget::NAME,
-        &args.log_dir,
+        args.log_dir.clone(),
         args.log_level,
         args.log_max_files,
         None,

View File

@@ -110,7 +110,7 @@ fn main() {
     // Initialize tracing.
     let _guards = init_tracing(
         dfstore::NAME,
-        &args.log_dir,
+        args.log_dir,
         args.log_level,
         args.log_max_files,
         None,

View File

@@ -110,11 +110,6 @@ impl GC {
         for task in self.storage.get_tasks()? {
             // If the task is expired and not uploading, evict the task.
             if task.is_expired(self.config.gc.policy.task_ttl) {
-                // If the task is uploading, skip it.
-                if task.is_uploading() {
-                    continue;
-                }
-
                 self.storage.delete_task(&task.id).await;
                 info!("evict task {}", task.id);
@@ -166,11 +161,6 @@ impl GC {
                 break;
             }

-            // If the task is uploading, skip it.
-            if task.is_uploading() {
-                continue;
-            }
-
             // If the task has no content length, skip it.
             let task_space = match task.content_length() {
                 Some(content_length) => content_length,
@@ -214,11 +204,6 @@ impl GC {
         for task in self.storage.get_cache_tasks()? {
             // If the cache task is expired and not uploading, evict the cache task.
             if task.is_expired() {
-                // If the cache task is uploading, skip it.
-                if task.is_uploading() {
-                    continue;
-                }
-
                 self.storage.delete_cache_task(&task.id).await;
                 info!("evict cache task {}", task.id);
@@ -270,11 +255,6 @@ impl GC {
                 break;
             }

-            // If the cache task is uploading, skip it.
-            if task.is_uploading() {
-                continue;
-            }
-
             // If the cache task is persistent, skip it.
             if task.is_persistent() {
                 continue;
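To summarize the behavioral change this commit makes in the GC hunks above: an expired task or cache task is now evicted even while it is still being uploaded, whereas the removed guard previously skipped it. A self-contained toy sketch of the before/after predicate (the `Task` type and function names are illustrative, not the dragonfly-client ones):

// Toy model of the GC eviction predicate before and after this commit.
struct Task {
    expired: bool,
    uploading: bool,
}

impl Task {
    fn is_expired(&self) -> bool {
        self.expired
    }

    fn is_uploading(&self) -> bool {
        self.uploading
    }
}

// Old behavior: an in-flight upload protected an expired task from eviction.
fn should_evict_before(task: &Task) -> bool {
    task.is_expired() && !task.is_uploading()
}

// New behavior: expiration alone decides eviction.
fn should_evict_after(task: &Task) -> bool {
    task.is_expired()
}

fn main() {
    let task = Task { expired: true, uploading: true };
    assert!(!should_evict_before(&task));
    assert!(should_evict_after(&task));
}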

View File

@@ -19,7 +19,7 @@ use rolling_file::*;
 use std::fs;
 use std::fs::OpenOptions;
 use std::os::unix::io::AsRawFd;
-use std::path::{Path, PathBuf};
+use std::path::PathBuf;
 use tracing::{info, Level};
 use tracing_appender::non_blocking::WorkerGuard;
 use tracing_log::LogTracer;
@@ -32,7 +32,7 @@ use tracing_subscriber::{
 pub fn init_tracing(
     name: &str,
-    log_dir: &PathBuf,
+    log_dir: PathBuf,
     log_level: Level,
     log_max_files: usize,
     jaeger_addr: Option<String>,
@@ -63,13 +63,14 @@ pub fn init_tracing(
     guards.push(stdout_guard);

     // Setup file layer.
-    fs::create_dir_all(log_dir).expect("failed to create log directory");
+    fs::create_dir_all(log_dir.clone()).expect("failed to create log directory");
     let rolling_appender = BasicRollingFileAppender::new(
         log_dir.join(name).with_extension("log"),
         RollingConditionBasic::new().hourly(),
         log_max_files,
     )
     .expect("failed to create rolling file appender");
+    println!("log_dir: {:?}", log_dir);

     let (rolling_writer, rolling_writer_guard) = tracing_appender::non_blocking(rolling_appender);
     let file_logging_layer = Layer::new()
@@ -128,7 +129,7 @@ pub fn init_tracing(
 }

 // Redirect stderr to file.
-fn redirect_stderr_to_file(log_dir: &Path) {
+fn redirect_stderr_to_file(log_dir: PathBuf) {
     let log_path = log_dir.join("stderr.log");
     let file = OpenOptions::new()
         .create(true)