diff --git a/dragonfly-client/src/grpc/dfdaemon_download.rs b/dragonfly-client/src/grpc/dfdaemon_download.rs index bf2f4353..9b169bac 100644 --- a/dragonfly-client/src/grpc/dfdaemon_download.rs +++ b/dragonfly-client/src/grpc/dfdaemon_download.rs @@ -54,6 +54,7 @@ use tokio::net::{UnixListener, UnixStream}; use tokio::sync::mpsc; use tokio::sync::Barrier; use tokio_stream::wrappers::{ReceiverStream, UnixListenerStream}; +use tonic::service::interceptor::InterceptedService; use tonic::{ transport::{Channel, Endpoint, Server, Uri}, Code, Request, Response, Status, @@ -61,6 +62,8 @@ use tonic::{ use tower::service_fn; use tracing::{debug, error, info, instrument, Instrument, Span}; +use super::tracing_grpc::TracingInterceptor; + /// DfdaemonDownloadServer is the grpc unix server of the download. pub struct DfdaemonDownloadServer { /// socket_path is the path of the unix domain socket. @@ -982,7 +985,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler { #[derive(Clone)] pub struct DfdaemonDownloadClient { /// client is the grpc client of the dfdaemon. - pub client: DfdaemonDownloadGRPCClient, + pub client: DfdaemonDownloadGRPCClient>, } /// DfdaemonDownloadClient implements the grpc client of the dfdaemon download. @@ -1014,7 +1017,8 @@ impl DfdaemonDownloadClient { err }) .or_err(ErrorType::ConnectError)?; - let client = DfdaemonDownloadGRPCClient::new(channel) + + let client = DfdaemonDownloadGRPCClient::with_interceptor(channel, TracingInterceptor) .max_decoding_message_size(usize::MAX) .max_encoding_message_size(usize::MAX); Ok(Self { client }) diff --git a/dragonfly-client/src/grpc/dfdaemon_upload.rs b/dragonfly-client/src/grpc/dfdaemon_upload.rs index b89bb244..88a794b1 100644 --- a/dragonfly-client/src/grpc/dfdaemon_upload.rs +++ b/dragonfly-client/src/grpc/dfdaemon_upload.rs @@ -47,6 +47,7 @@ use tokio::io::AsyncReadExt; use tokio::sync::mpsc; use tokio::sync::Barrier; use tokio_stream::wrappers::ReceiverStream; +use tonic::service::interceptor::InterceptedService; use tonic::{ transport::{Channel, Server}, Code, Request, Response, Status, @@ -54,6 +55,8 @@ use tonic::{ use tracing::{debug, error, info, instrument, Instrument, Span}; use url::Url; +use super::tracing_grpc::TracingInterceptor; + /// DfdaemonUploadServer is the grpc server of the upload. pub struct DfdaemonUploadServer { /// config is the configuration of the dfdaemon. @@ -1086,7 +1089,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { #[derive(Clone)] pub struct DfdaemonUploadClient { /// client is the grpc client of the dfdaemon upload. - pub client: DfdaemonUploadGRPCClient, + pub client: DfdaemonUploadGRPCClient>, } /// DfdaemonUploadClient implements the dfdaemon upload grpc client. @@ -1137,7 +1140,7 @@ impl DfdaemonUploadClient { .or_err(ErrorType::ConnectError)?, }; - let client = DfdaemonUploadGRPCClient::new(channel) + let client = DfdaemonUploadGRPCClient::with_interceptor(channel, TracingInterceptor) .max_decoding_message_size(usize::MAX) .max_encoding_message_size(usize::MAX); Ok(Self { client }) diff --git a/dragonfly-client/src/grpc/health.rs b/dragonfly-client/src/grpc/health.rs index 5e643c6a..40123fef 100644 --- a/dragonfly-client/src/grpc/health.rs +++ b/dragonfly-client/src/grpc/health.rs @@ -21,19 +21,21 @@ use dragonfly_client_core::{ use hyper_util::rt::TokioIo; use std::path::PathBuf; use tokio::net::UnixStream; -use tonic::transport::ClientTlsConfig; use tonic::transport::{Channel, Endpoint, Uri}; +use tonic::{service::interceptor::InterceptedService, transport::ClientTlsConfig}; use tonic_health::pb::{ health_client::HealthClient as HealthGRPCClient, HealthCheckRequest, HealthCheckResponse, }; use tower::service_fn; use tracing::{error, instrument}; +use super::tracing_grpc::TracingInterceptor; + /// HealthClient is a wrapper of HealthGRPCClient. #[derive(Clone)] pub struct HealthClient { /// client is the grpc client of the certificate. - client: HealthGRPCClient, + client: HealthGRPCClient>, } /// HealthClient implements the grpc client of the health. @@ -73,7 +75,7 @@ impl HealthClient { .or_err(ErrorType::ConnectError)?, }; - let client = HealthGRPCClient::new(channel) + let client = HealthGRPCClient::with_interceptor(channel, TracingInterceptor) .max_decoding_message_size(usize::MAX) .max_encoding_message_size(usize::MAX); Ok(Self { client }) @@ -99,7 +101,7 @@ impl HealthClient { err }) .or_err(ErrorType::ConnectError)?; - let client = HealthGRPCClient::new(channel) + let client = HealthGRPCClient::with_interceptor(channel, TracingInterceptor) .max_decoding_message_size(usize::MAX) .max_encoding_message_size(usize::MAX); Ok(Self { client }) diff --git a/dragonfly-client/src/grpc/manager.rs b/dragonfly-client/src/grpc/manager.rs index 1bfb2be3..753cba85 100644 --- a/dragonfly-client/src/grpc/manager.rs +++ b/dragonfly-client/src/grpc/manager.rs @@ -25,16 +25,18 @@ use dragonfly_client_core::{ Error, Result, }; use std::sync::Arc; -use tonic::transport::Channel; +use tonic::{service::interceptor::InterceptedService, transport::Channel}; use tonic_health::pb::health_check_response::ServingStatus; use tracing::{error, instrument, warn}; use url::Url; +use super::tracing_grpc::TracingInterceptor; + /// ManagerClient is a wrapper of ManagerGRPCClient. #[derive(Clone)] pub struct ManagerClient { /// client is the grpc client of the manager. - pub client: ManagerGRPCClient, + pub client: ManagerGRPCClient>, } /// ManagerClient implements the grpc client of the manager. @@ -99,7 +101,7 @@ impl ManagerClient { .or_err(ErrorType::ConnectError)?, }; - let client = ManagerGRPCClient::new(channel) + let client = ManagerGRPCClient::with_interceptor(channel, TracingInterceptor) .max_decoding_message_size(usize::MAX) .max_encoding_message_size(usize::MAX); Ok(Self { client }) diff --git a/dragonfly-client/src/grpc/mod.rs b/dragonfly-client/src/grpc/mod.rs index 992e2728..1ed369de 100644 --- a/dragonfly-client/src/grpc/mod.rs +++ b/dragonfly-client/src/grpc/mod.rs @@ -29,6 +29,7 @@ pub mod dfdaemon_upload; pub mod health; pub mod manager; pub mod scheduler; +pub mod tracing_grpc; /// CONNECT_TIMEOUT is the timeout for GRPC connection. pub const CONNECT_TIMEOUT: Duration = Duration::from_secs(2); diff --git a/dragonfly-client/src/grpc/scheduler.rs b/dragonfly-client/src/grpc/scheduler.rs index a141bdec..96d6605e 100644 --- a/dragonfly-client/src/grpc/scheduler.rs +++ b/dragonfly-client/src/grpc/scheduler.rs @@ -16,6 +16,7 @@ // use crate::dynconfig::Dynconfig; use crate::dynconfig::Dynconfig; +use crate::grpc::tracing_grpc::TracingInterceptor; use dragonfly_api::common::v2::{Peer, PersistentCachePeer, PersistentCacheTask, Task}; use dragonfly_api::manager::v2::Scheduler; use dragonfly_api::scheduler::v2::{ @@ -36,6 +37,7 @@ use std::str::FromStr; use std::sync::Arc; use tokio::sync::RwLock; use tokio::task::JoinSet; +use tonic::service::interceptor::InterceptedService; use tonic::transport::Channel; use tracing::{error, info, instrument, Instrument}; use url::Url; @@ -191,7 +193,7 @@ impl SchedulerClient { }) .or_err(ErrorType::ConnectError)?; - let mut client = SchedulerGRPCClient::new(channel) + let mut client = SchedulerGRPCClient::with_interceptor(channel, TracingInterceptor) .max_decoding_message_size(usize::MAX) .max_encoding_message_size(usize::MAX); client.announce_host(request).await?; @@ -245,7 +247,7 @@ impl SchedulerClient { }) .or_err(ErrorType::ConnectError)?; - let mut client = SchedulerGRPCClient::new(channel) + let mut client = SchedulerGRPCClient::with_interceptor(channel, TracingInterceptor) .max_decoding_message_size(usize::MAX) .max_encoding_message_size(usize::MAX); client.announce_host(request).await?; @@ -304,7 +306,7 @@ impl SchedulerClient { }) .or_err(ErrorType::ConnectError)?; - let mut client = SchedulerGRPCClient::new(channel) + let mut client = SchedulerGRPCClient::with_interceptor(channel, TracingInterceptor) .max_decoding_message_size(usize::MAX) .max_encoding_message_size(usize::MAX); client.delete_host(request).await?; @@ -458,7 +460,7 @@ impl SchedulerClient { &self, task_id: &str, peer_id: Option<&str>, - ) -> Result> { + ) -> Result>> { // Update scheduler addresses of the client. self.update_available_scheduler_addrs().await?; @@ -518,9 +520,11 @@ impl SchedulerClient { .or_err(ErrorType::ConnectError)?, }; - Ok(SchedulerGRPCClient::new(channel) - .max_decoding_message_size(usize::MAX) - .max_encoding_message_size(usize::MAX)) + Ok( + SchedulerGRPCClient::with_interceptor(channel, TracingInterceptor) + .max_decoding_message_size(usize::MAX) + .max_encoding_message_size(usize::MAX), + ) } /// update_available_scheduler_addrs updates the addresses of available schedulers. diff --git a/dragonfly-client/src/grpc/tracing_grpc.rs b/dragonfly-client/src/grpc/tracing_grpc.rs new file mode 100644 index 00000000..6e6c4e7c --- /dev/null +++ b/dragonfly-client/src/grpc/tracing_grpc.rs @@ -0,0 +1,36 @@ +use { + tonic::{metadata, service::Interceptor, Request, Status}, + tracing_opentelemetry::OpenTelemetrySpanExt, +}; + +/// MetadataMap is a tracing meda data map container. +struct MetadataMap<'a>(&'a mut metadata::MetadataMap); + +/// MetadataMap implements the otel tracing Injector. +impl<'a> opentelemetry::propagation::Injector for MetadataMap<'a> { + /// set a key-value pair to the injector. + fn set(&mut self, key: &str, value: String) { + if let Ok(key) = metadata::MetadataKey::from_bytes(key.as_bytes()) { + if let Ok(val) = metadata::MetadataValue::try_from(&value) { + self.0.insert(key, val); + } + } + } +} + +/// TracingInterceptor is a auto-inject tracing gRPC interceptor. +#[derive(Clone)] +pub struct TracingInterceptor; + +/// TracingInterceptor implements the tonic Interceptor interface. +impl Interceptor for TracingInterceptor { + /// call and inject tracing context into lgobal propagator. + fn call(&mut self, mut request: Request<()>) -> std::result::Result, Status> { + let context = tracing::Span::current().context(); + opentelemetry::global::get_text_map_propagator(|prop| { + prop.inject_context(&context, &mut MetadataMap(request.metadata_mut())); + }); + + Ok(request) + } +}