feat: enhance client otel tracing and linking to scheduler & manager (#873)

This commit is contained in:
Junduo Dong 2024-12-01 12:45:29 +08:00 committed by GitHub
parent 189f09bf45
commit c291a8cfff
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 70 additions and 18 deletions

View File

@ -54,6 +54,7 @@ use tokio::net::{UnixListener, UnixStream};
use tokio::sync::mpsc;
use tokio::sync::Barrier;
use tokio_stream::wrappers::{ReceiverStream, UnixListenerStream};
use tonic::service::interceptor::InterceptedService;
use tonic::{
transport::{Channel, Endpoint, Server, Uri},
Code, Request, Response, Status,
@ -61,6 +62,8 @@ use tonic::{
use tower::service_fn;
use tracing::{debug, error, info, instrument, Instrument, Span};
use super::tracing_grpc::TracingInterceptor;
/// DfdaemonDownloadServer is the grpc unix server of the download.
pub struct DfdaemonDownloadServer {
/// socket_path is the path of the unix domain socket.
@ -982,7 +985,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
#[derive(Clone)]
pub struct DfdaemonDownloadClient {
/// client is the grpc client of the dfdaemon.
pub client: DfdaemonDownloadGRPCClient<Channel>,
pub client: DfdaemonDownloadGRPCClient<InterceptedService<Channel, TracingInterceptor>>,
}
/// DfdaemonDownloadClient implements the grpc client of the dfdaemon download.
@ -1014,7 +1017,8 @@ impl DfdaemonDownloadClient {
err
})
.or_err(ErrorType::ConnectError)?;
let client = DfdaemonDownloadGRPCClient::new(channel)
let client = DfdaemonDownloadGRPCClient::with_interceptor(channel, TracingInterceptor)
.max_decoding_message_size(usize::MAX)
.max_encoding_message_size(usize::MAX);
Ok(Self { client })

View File

@ -47,6 +47,7 @@ use tokio::io::AsyncReadExt;
use tokio::sync::mpsc;
use tokio::sync::Barrier;
use tokio_stream::wrappers::ReceiverStream;
use tonic::service::interceptor::InterceptedService;
use tonic::{
transport::{Channel, Server},
Code, Request, Response, Status,
@ -54,6 +55,8 @@ use tonic::{
use tracing::{debug, error, info, instrument, Instrument, Span};
use url::Url;
use super::tracing_grpc::TracingInterceptor;
/// DfdaemonUploadServer is the grpc server of the upload.
pub struct DfdaemonUploadServer {
/// config is the configuration of the dfdaemon.
@ -1086,7 +1089,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
#[derive(Clone)]
pub struct DfdaemonUploadClient {
/// client is the grpc client of the dfdaemon upload.
pub client: DfdaemonUploadGRPCClient<Channel>,
pub client: DfdaemonUploadGRPCClient<InterceptedService<Channel, TracingInterceptor>>,
}
/// DfdaemonUploadClient implements the dfdaemon upload grpc client.
@ -1137,7 +1140,7 @@ impl DfdaemonUploadClient {
.or_err(ErrorType::ConnectError)?,
};
let client = DfdaemonUploadGRPCClient::new(channel)
let client = DfdaemonUploadGRPCClient::with_interceptor(channel, TracingInterceptor)
.max_decoding_message_size(usize::MAX)
.max_encoding_message_size(usize::MAX);
Ok(Self { client })

View File

@ -21,19 +21,21 @@ use dragonfly_client_core::{
use hyper_util::rt::TokioIo;
use std::path::PathBuf;
use tokio::net::UnixStream;
use tonic::transport::ClientTlsConfig;
use tonic::transport::{Channel, Endpoint, Uri};
use tonic::{service::interceptor::InterceptedService, transport::ClientTlsConfig};
use tonic_health::pb::{
health_client::HealthClient as HealthGRPCClient, HealthCheckRequest, HealthCheckResponse,
};
use tower::service_fn;
use tracing::{error, instrument};
use super::tracing_grpc::TracingInterceptor;
/// HealthClient is a wrapper of HealthGRPCClient.
#[derive(Clone)]
pub struct HealthClient {
/// client is the grpc client of the certificate.
client: HealthGRPCClient<Channel>,
client: HealthGRPCClient<InterceptedService<Channel, TracingInterceptor>>,
}
/// HealthClient implements the grpc client of the health.
@ -73,7 +75,7 @@ impl HealthClient {
.or_err(ErrorType::ConnectError)?,
};
let client = HealthGRPCClient::new(channel)
let client = HealthGRPCClient::with_interceptor(channel, TracingInterceptor)
.max_decoding_message_size(usize::MAX)
.max_encoding_message_size(usize::MAX);
Ok(Self { client })
@ -99,7 +101,7 @@ impl HealthClient {
err
})
.or_err(ErrorType::ConnectError)?;
let client = HealthGRPCClient::new(channel)
let client = HealthGRPCClient::with_interceptor(channel, TracingInterceptor)
.max_decoding_message_size(usize::MAX)
.max_encoding_message_size(usize::MAX);
Ok(Self { client })

View File

@ -25,16 +25,18 @@ use dragonfly_client_core::{
Error, Result,
};
use std::sync::Arc;
use tonic::transport::Channel;
use tonic::{service::interceptor::InterceptedService, transport::Channel};
use tonic_health::pb::health_check_response::ServingStatus;
use tracing::{error, instrument, warn};
use url::Url;
use super::tracing_grpc::TracingInterceptor;
/// ManagerClient is a wrapper of ManagerGRPCClient.
#[derive(Clone)]
pub struct ManagerClient {
/// client is the grpc client of the manager.
pub client: ManagerGRPCClient<Channel>,
pub client: ManagerGRPCClient<InterceptedService<Channel, TracingInterceptor>>,
}
/// ManagerClient implements the grpc client of the manager.
@ -99,7 +101,7 @@ impl ManagerClient {
.or_err(ErrorType::ConnectError)?,
};
let client = ManagerGRPCClient::new(channel)
let client = ManagerGRPCClient::with_interceptor(channel, TracingInterceptor)
.max_decoding_message_size(usize::MAX)
.max_encoding_message_size(usize::MAX);
Ok(Self { client })

View File

@ -29,6 +29,7 @@ pub mod dfdaemon_upload;
pub mod health;
pub mod manager;
pub mod scheduler;
pub mod tracing_grpc;
/// CONNECT_TIMEOUT is the timeout for GRPC connection.
pub const CONNECT_TIMEOUT: Duration = Duration::from_secs(2);

View File

@ -16,6 +16,7 @@
// use crate::dynconfig::Dynconfig;
use crate::dynconfig::Dynconfig;
use crate::grpc::tracing_grpc::TracingInterceptor;
use dragonfly_api::common::v2::{Peer, PersistentCachePeer, PersistentCacheTask, Task};
use dragonfly_api::manager::v2::Scheduler;
use dragonfly_api::scheduler::v2::{
@ -36,6 +37,7 @@ use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::task::JoinSet;
use tonic::service::interceptor::InterceptedService;
use tonic::transport::Channel;
use tracing::{error, info, instrument, Instrument};
use url::Url;
@ -191,7 +193,7 @@ impl SchedulerClient {
})
.or_err(ErrorType::ConnectError)?;
let mut client = SchedulerGRPCClient::new(channel)
let mut client = SchedulerGRPCClient::with_interceptor(channel, TracingInterceptor)
.max_decoding_message_size(usize::MAX)
.max_encoding_message_size(usize::MAX);
client.announce_host(request).await?;
@ -245,7 +247,7 @@ impl SchedulerClient {
})
.or_err(ErrorType::ConnectError)?;
let mut client = SchedulerGRPCClient::new(channel)
let mut client = SchedulerGRPCClient::with_interceptor(channel, TracingInterceptor)
.max_decoding_message_size(usize::MAX)
.max_encoding_message_size(usize::MAX);
client.announce_host(request).await?;
@ -304,7 +306,7 @@ impl SchedulerClient {
})
.or_err(ErrorType::ConnectError)?;
let mut client = SchedulerGRPCClient::new(channel)
let mut client = SchedulerGRPCClient::with_interceptor(channel, TracingInterceptor)
.max_decoding_message_size(usize::MAX)
.max_encoding_message_size(usize::MAX);
client.delete_host(request).await?;
@ -458,7 +460,7 @@ impl SchedulerClient {
&self,
task_id: &str,
peer_id: Option<&str>,
) -> Result<SchedulerGRPCClient<Channel>> {
) -> Result<SchedulerGRPCClient<InterceptedService<Channel, TracingInterceptor>>> {
// Update scheduler addresses of the client.
self.update_available_scheduler_addrs().await?;
@ -518,9 +520,11 @@ impl SchedulerClient {
.or_err(ErrorType::ConnectError)?,
};
Ok(SchedulerGRPCClient::new(channel)
.max_decoding_message_size(usize::MAX)
.max_encoding_message_size(usize::MAX))
Ok(
SchedulerGRPCClient::with_interceptor(channel, TracingInterceptor)
.max_decoding_message_size(usize::MAX)
.max_encoding_message_size(usize::MAX),
)
}
/// update_available_scheduler_addrs updates the addresses of available schedulers.

View File

@ -0,0 +1,36 @@
use {
tonic::{metadata, service::Interceptor, Request, Status},
tracing_opentelemetry::OpenTelemetrySpanExt,
};
/// MetadataMap is a tracing meda data map container.
struct MetadataMap<'a>(&'a mut metadata::MetadataMap);
/// MetadataMap implements the otel tracing Injector.
impl<'a> opentelemetry::propagation::Injector for MetadataMap<'a> {
/// set a key-value pair to the injector.
fn set(&mut self, key: &str, value: String) {
if let Ok(key) = metadata::MetadataKey::from_bytes(key.as_bytes()) {
if let Ok(val) = metadata::MetadataValue::try_from(&value) {
self.0.insert(key, val);
}
}
}
}
/// TracingInterceptor is a auto-inject tracing gRPC interceptor.
#[derive(Clone)]
pub struct TracingInterceptor;
/// TracingInterceptor implements the tonic Interceptor interface.
impl Interceptor for TracingInterceptor {
/// call and inject tracing context into lgobal propagator.
fn call(&mut self, mut request: Request<()>) -> std::result::Result<Request<()>, Status> {
let context = tracing::Span::current().context();
opentelemetry::global::get_text_map_propagator(|prop| {
prop.inject_context(&context, &mut MetadataMap(request.metadata_mut()));
});
Ok(request)
}
}