From db2008f06194d38bcc94eab82e62d6962f0d0f84 Mon Sep 17 00:00:00 2001 From: Gaius Date: Wed, 17 Jul 2024 15:32:13 +0800 Subject: [PATCH] feat: implement stat command for cache task (#601) Signed-off-by: Gaius --- Cargo.lock | 52 +++++ Cargo.toml | 3 +- dragonfly-client/Cargo.toml | 2 + dragonfly-client/src/bin/dfcache/import.rs | 44 +++- dragonfly-client/src/bin/dfcache/stat.rs | 216 +++++++++++++++++- dragonfly-client/src/bin/dfget/main.rs | 2 +- .../src/grpc/dfdaemon_download.rs | 8 +- dragonfly-client/src/grpc/dfdaemon_upload.rs | 8 +- dragonfly-client/src/resource/cache_task.rs | 1 - 9 files changed, 314 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9a5f3f71..761043a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -646,6 +646,12 @@ version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" +[[package]] +name = "bytecount" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ce89b21cab1437276d2650d57e971f9d548a2d9037cc231abdc0562b97498ce" + [[package]] name = "bytemuck" version = "1.15.0" @@ -664,6 +670,15 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a12916984aab3fa6e39d655a33e09c0071eb36d6ab3aea5c2d78551f1df6d952" +[[package]] +name = "bytesize" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3e368af43e418a04d52505cf3dbc23dda4e3407ae2fa99fd0e4f308ce546acc" +dependencies = [ + "serde", +] + [[package]] name = "bzip2-sys" version = "0.1.11+1.0.8" @@ -1098,6 +1113,7 @@ dependencies = [ "anyhow", "blake3", "bytes", + "bytesize", "chrono", "clap", "dashmap 6.0.1", @@ -1136,6 +1152,7 @@ dependencies = [ "serde", "serde_json", "sysinfo", + "tabled", "termion", "tikv-jemallocator", "tokio", @@ -3005,6 +3022,17 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "papergrid" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ad43c07024ef767f9160710b3a6773976194758c7919b17e63b863db0bdf7fb" +dependencies = [ + "bytecount", + "fnv", + "unicode-width", +] + [[package]] name = "parking" version = "2.2.0" @@ -4452,6 +4480,30 @@ dependencies = [ "libc", ] +[[package]] +name = "tabled" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c998b0c8b921495196a48aabaf1901ff28be0760136e31604f7967b0792050e" +dependencies = [ + "papergrid", + "tabled_derive", + "unicode-width", +] + +[[package]] +name = "tabled_derive" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c138f99377e5d653a371cdad263615634cfc8467685dfe8e73e2b8e98f44b17" +dependencies = [ + "heck 0.4.1", + "proc-macro-error", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "tempdir" version = "0.3.7" diff --git a/Cargo.toml b/Cargo.toml index 4ceea232..4536ddb4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,7 +68,7 @@ headers = "0.4.0" regex = "1.10.5" humantime = "2.1.0" prost-wkt-types = "0.4" -chrono = { version = "0.4.35", features = ["serde"] } +chrono = { version = "0.4.35", features = ["serde", "clock"] } openssl = { version = "0.10", features = ["vendored"] } opendal = { version = "0.47.3", features = [ "services-s3", @@ -83,6 +83,7 @@ anyhow = "1.0.86" toml_edit = "0.22.14" toml = "0.8.13" base16ct = { version = "0.2", features = ["alloc"] } +bytesize = {version = "1.2.0", features = ["serde"]} [profile.release] debug = true diff --git a/dragonfly-client/Cargo.toml b/dragonfly-client/Cargo.toml index 166093b3..7c45128e 100644 --- a/dragonfly-client/Cargo.toml +++ b/dragonfly-client/Cargo.toml @@ -59,6 +59,7 @@ openssl.workspace = true clap.workspace = true anyhow.workspace = true blake3.workspace = true +bytesize.workspace = true tracing-log = "0.2" tracing-subscriber = { version = "0.3", features = ["env-filter", "time", "chrono"] } tracing-appender = "0.2.3" @@ -85,6 +86,7 @@ tokio-rustls = "0.25.0-alpha.4" http-body-util = "0.1.2" futures-util = "0.3.30" termion = "4.0.2" +tabled = "0.15.0" [target.'cfg(not(target_env = "msvc"))'.dependencies] tikv-jemallocator = { version = "0.5.4", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms", "background_threads"] } diff --git a/dragonfly-client/src/bin/dfcache/import.rs b/dragonfly-client/src/bin/dfcache/import.rs index 9611ca65..e1949f21 100644 --- a/dragonfly-client/src/bin/dfcache/import.rs +++ b/dragonfly-client/src/bin/dfcache/import.rs @@ -87,19 +87,20 @@ pub struct ImportCommand { // Implement the execute for ImportCommand. impl ImportCommand { + // execute executes the import sub command. pub async fn execute(&self, endpoint: &Path) -> Result<()> { // Run import sub command. if let Err(err) = self.run(endpoint).await { match err { Error::TonicStatus(status) => { eprintln!( - "{}{}{}Importing failed, bad code:{} {}", + "{}{}{}Importing Failed!{}", color::Fg(color::Red), style::Italic, style::Bold, style::Reset, - status.code() ); + eprintln!( "{}{}{}*********************************{}", color::Fg(color::Black), @@ -107,6 +108,16 @@ impl ImportCommand { style::Bold, style::Reset ); + + eprintln!( + "{}{}{}Bad Code:{} {}", + color::Fg(color::Red), + style::Italic, + style::Bold, + style::Reset, + status.code() + ); + eprintln!( "{}{}{}Message:{} {}", color::Fg(color::Cyan), @@ -115,6 +126,7 @@ impl ImportCommand { style::Reset, status.message() ); + eprintln!( "{}{}{}Details:{} {}", color::Fg(color::Cyan), @@ -123,6 +135,7 @@ impl ImportCommand { style::Reset, std::str::from_utf8(status.details()).unwrap() ); + eprintln!( "{}{}{}*********************************{}", color::Fg(color::Black), @@ -133,13 +146,37 @@ impl ImportCommand { } err => { eprintln!( - "{}{}{}Importing failed, error message:{} {}", + "{}{}{}Importing Failed!{}", + color::Fg(color::Red), + style::Italic, + style::Bold, + style::Reset + ); + + eprintln!( + "{}{}{}****************************************{}", + color::Fg(color::Black), + style::Italic, + style::Bold, + style::Reset + ); + + eprintln!( + "{}{}{}Message:{} {}", color::Fg(color::Red), style::Italic, style::Bold, style::Reset, err ); + + eprintln!( + "{}{}{}****************************************{}", + color::Fg(color::Black), + style::Italic, + style::Bold, + style::Reset + ); } } @@ -149,6 +186,7 @@ impl ImportCommand { Ok(()) } + // run runs the import sub command. async fn run(&self, endpoint: &Path) -> Result<()> { let dfdaemon_download_client = self .get_dfdaemon_download_client(endpoint.to_path_buf()) diff --git a/dragonfly-client/src/bin/dfcache/stat.rs b/dragonfly-client/src/bin/dfcache/stat.rs index d24786a1..723bbb3e 100644 --- a/dragonfly-client/src/bin/dfcache/stat.rs +++ b/dragonfly-client/src/bin/dfcache/stat.rs @@ -14,9 +14,20 @@ * limitations under the License. */ +use chrono::{DateTime, Local}; use clap::Parser; -use dragonfly_client_core::Result; -use std::path::Path; +use dragonfly_api::dfdaemon::v2::StatCacheTaskRequest; +use dragonfly_client::grpc::dfdaemon_download::DfdaemonDownloadClient; +use dragonfly_client::grpc::health::HealthClient; +use dragonfly_client_core::{Error, Result}; +use humantime::format_duration; +use std::path::{Path, PathBuf}; +use tabled::{ + settings::{object::Rows, Alignment, Modify, Style}, + Table, Tabled, +}; +use termion::{color, style}; +use tracing::error; // StatCommand is the subcommand of stat. #[derive(Debug, Clone, Parser)] @@ -27,8 +38,205 @@ pub struct StatCommand { // Implement the execute for StatCommand. impl StatCommand { - pub async fn execute(&self, _endpoint: &Path) -> Result<()> { - println!("StatCommand is executed!"); + // execute executes the stat command. + pub async fn execute(&self, endpoint: &Path) -> Result<()> { + // Run stat sub command. + if let Err(err) = self.run(endpoint).await { + match err { + Error::TonicStatus(status) => { + eprintln!( + "{}{}{}Stating Failed!{}", + color::Fg(color::Red), + style::Italic, + style::Bold, + style::Reset, + ); + + eprintln!( + "{}{}{}*********************************{}", + color::Fg(color::Black), + style::Italic, + style::Bold, + style::Reset + ); + + eprintln!( + "{}{}{}Bad Code:{} {}", + color::Fg(color::Red), + style::Italic, + style::Bold, + style::Reset, + status.code() + ); + + eprintln!( + "{}{}{}Message:{} {}", + color::Fg(color::Cyan), + style::Italic, + style::Bold, + style::Reset, + status.message() + ); + + eprintln!( + "{}{}{}Details:{} {}", + color::Fg(color::Cyan), + style::Italic, + style::Bold, + style::Reset, + std::str::from_utf8(status.details()).unwrap() + ); + + eprintln!( + "{}{}{}*********************************{}", + color::Fg(color::Black), + style::Italic, + style::Bold, + style::Reset + ); + } + err => { + eprintln!( + "{}{}{}Stating Failed!{}", + color::Fg(color::Red), + style::Italic, + style::Bold, + style::Reset + ); + + eprintln!( + "{}{}{}****************************************{}", + color::Fg(color::Black), + style::Italic, + style::Bold, + style::Reset + ); + + eprintln!( + "{}{}{}Message:{} {}", + color::Fg(color::Red), + style::Italic, + style::Bold, + style::Reset, + err + ); + + eprintln!( + "{}{}{}****************************************{}", + color::Fg(color::Black), + style::Italic, + style::Bold, + style::Reset + ); + } + } + + std::process::exit(1); + } + Ok(()) } + + // run runs the stat command. + async fn run(&self, endpoint: &Path) -> Result<()> { + let dfdaemon_download_client = self + .get_dfdaemon_download_client(endpoint.to_path_buf()) + .await + .map_err(|err| { + error!("initialize dfdaemon download client failed: {}", err); + err + })?; + + let task = dfdaemon_download_client + .stat_cache_task(StatCacheTaskRequest { + task_id: self.id.clone(), + }) + .await?; + + // Define the table struct for printing. + #[derive(Debug, Default, Tabled)] + #[tabled(rename_all = "UPPERCASE")] + struct TableTask { + id: String, + state: String, + #[tabled(rename = "CONTENT LENGTH")] + content_length: String, + #[tabled(rename = "PIECE LENGTH")] + piece_length: String, + #[tabled(rename = "PERSISTENT REPLICA COUNT")] + persistent_replica_count: u64, + #[tabled(rename = "REPLICA COUNT")] + replica_count: u64, + ttl: String, + #[tabled(rename = "CREATED")] + created_at: String, + #[tabled(rename = "UPDATED")] + updated_at: String, + } + + let mut table_task = TableTask { + id: task.id, + state: task.state, + // Convert content_length to human readable format. + content_length: bytesize::to_string(task.content_length, true), + // Convert piece_length to human readable format. + piece_length: bytesize::to_string(task.piece_length, true), + persistent_replica_count: task.persistent_replica_count, + replica_count: task.replica_count, + ..Default::default() + }; + + // Convert ttl to human readable format. + if let Some(ttl) = task.ttl { + table_task.ttl = + format_duration(std::time::Duration::from_secs(ttl.seconds as u64)).to_string(); + } + + // Convert created_at to human readable format. + if let Some(created_at) = task.created_at { + if let Some(date_time) = + DateTime::from_timestamp(created_at.seconds, created_at.nanos as u32) + { + table_task.created_at = date_time + .with_timezone(&Local) + .format("%Y-%m-%d %H:%M:%S") + .to_string(); + } + } + + // Convert updated_at to human readable format. + if let Some(updated_at) = task.updated_at { + if let Some(date_time) = + DateTime::from_timestamp(updated_at.seconds, updated_at.nanos as u32) + { + table_task.updated_at = date_time + .with_timezone(&Local) + .format("%Y-%m-%d %H:%M:%S") + .to_string(); + } + } + + // Create a table and print it. + let mut table = Table::new(vec![table_task]); + table + .with(Style::blank()) + .with(Modify::new(Rows::first()).with(Alignment::center())); + println!("{table}"); + + Ok(()) + } + + // get_and_check_dfdaemon_download_client gets a dfdaemon download client and checks its health. + async fn get_dfdaemon_download_client( + &self, + endpoint: PathBuf, + ) -> Result { + // Check dfdaemon's health. + let health_client = HealthClient::new_unix(endpoint.clone()).await?; + health_client.check_dfdaemon_download().await?; + + // Get dfdaemon download client. + let dfdaemon_download_client = DfdaemonDownloadClient::new_unix(endpoint).await?; + Ok(dfdaemon_download_client) + } } diff --git a/dragonfly-client/src/bin/dfget/main.rs b/dragonfly-client/src/bin/dfget/main.rs index a906e299..c58ff3a2 100644 --- a/dragonfly-client/src/bin/dfget/main.rs +++ b/dragonfly-client/src/bin/dfget/main.rs @@ -365,7 +365,7 @@ async fn main() -> anyhow::Result<()> { ); eprintln!( - "{}{}{}Bad code:{} {}", + "{}{}{}Bad Code:{} {}", color::Fg(color::Red), style::Italic, style::Bold, diff --git a/dragonfly-client/src/grpc/dfdaemon_download.rs b/dragonfly-client/src/grpc/dfdaemon_download.rs index fa70fce7..aed58806 100644 --- a/dragonfly-client/src/grpc/dfdaemon_download.rs +++ b/dragonfly-client/src/grpc/dfdaemon_download.rs @@ -883,13 +883,9 @@ impl DfdaemonDownloadClient { // stat_cache_task stats the cache task. #[instrument(skip_all)] - pub async fn stat_cache_task( - &self, - request: StatCacheTaskRequest, - timeout: Duration, - ) -> ClientResult { + pub async fn stat_cache_task(&self, request: StatCacheTaskRequest) -> ClientResult { let mut request = tonic::Request::new(request); - request.set_timeout(timeout); + request.set_timeout(super::CONNECT_TIMEOUT); let response = self.client.clone().stat_cache_task(request).await?; Ok(response.into_inner()) diff --git a/dragonfly-client/src/grpc/dfdaemon_upload.rs b/dragonfly-client/src/grpc/dfdaemon_upload.rs index b61bbe7f..6c7552ef 100644 --- a/dragonfly-client/src/grpc/dfdaemon_upload.rs +++ b/dragonfly-client/src/grpc/dfdaemon_upload.rs @@ -876,13 +876,9 @@ impl DfdaemonUploadClient { // stat_cache_task stats the cache task. #[instrument(skip_all)] - pub async fn stat_cache_task( - &self, - request: StatCacheTaskRequest, - timeout: Duration, - ) -> ClientResult { + pub async fn stat_cache_task(&self, request: StatCacheTaskRequest) -> ClientResult { let mut request = tonic::Request::new(request); - request.set_timeout(timeout); + request.set_timeout(super::CONNECT_TIMEOUT); let response = self.client.clone().stat_cache_task(request).await?; Ok(response.into_inner()) diff --git a/dragonfly-client/src/resource/cache_task.rs b/dragonfly-client/src/resource/cache_task.rs index ab49d9c5..8bfb3fe4 100644 --- a/dragonfly-client/src/resource/cache_task.rs +++ b/dragonfly-client/src/resource/cache_task.rs @@ -189,7 +189,6 @@ impl CacheTask { err })?; - self.storage.delete_cache_task(task_id).await; Ok(()) } }