feat: add host_id to span and add instrument to health grpc (#155)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2023-12-20 10:25:51 +08:00 committed by GitHub
parent df33010fb2
commit 54193ea93a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 41 additions and 26 deletions

View File

@ -142,7 +142,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
type DownloadTaskStream = ReceiverStream<Result<DownloadTaskResponse, Status>>;
// download_task tells the dfdaemon to download the task.
#[instrument(skip_all, fields(task_id, peer_id))]
#[instrument(skip_all, fields(host_id, task_id, peer_id))]
async fn download_task(
&self,
request: Request<DownloadTaskRequest>,
@ -178,7 +178,8 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
// Generate the peer id.
let peer_id = self.task.id_generator.peer_id();
// Span record the task id and peer id.
// Span record the host id, task id and peer id.
Span::current().record("host_id", host_id.as_str());
Span::current().record("task_id", task_id.as_str());
Span::current().record("peer_id", peer_id.as_str());
@ -277,7 +278,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
}
// stat_task gets the status of the task.
#[instrument(skip_all, fields(task_id))]
#[instrument(skip_all, fields(host_id, task_id))]
async fn stat_task(
&self,
request: Request<DfdaemonStatTaskRequest>,
@ -285,10 +286,14 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
// Clone the request.
let request = request.into_inner();
// Generate the host id.
let host_id = self.task.id_generator.host_id();
// Get the task id from the request.
let task_id = request.task_id;
// Span record the task id and peer id.
// Span record the host id and task id.
Span::current().record("host_id", host_id.as_str());
Span::current().record("task_id", task_id.as_str());
// Get the task from the scheduler.
@ -349,20 +354,16 @@ impl DfdaemonDownloadClient {
&self,
request: DownloadTaskRequest,
) -> ClientResult<tonic::Response<tonic::codec::Streaming<DownloadTaskResponse>>> {
// Get the timeout from the request.
let timeout = request
.clone()
.download
.ok_or_else(|| {
tonic::Status::invalid_argument("missing download in download task request")
})?
.timeout;
// Get the download from the request.
let download = request.clone().download.ok_or_else(|| {
tonic::Status::invalid_argument("missing download in download task request")
})?;
// Initialize the request.
let mut request = tonic::Request::new(request);
// Set the timeout to the request.
if let Some(timeout) = timeout {
if let Some(timeout) = download.timeout {
request.set_timeout(
Duration::try_from(timeout)
.map_err(|_| tonic::Status::invalid_argument("invalid timeout"))?,

View File

@ -134,7 +134,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
type SyncPiecesStream = ReceiverStream<Result<SyncPiecesResponse, Status>>;
// get_piece_numbers gets the piece numbers.
#[instrument(skip_all, fields(task_id))]
#[instrument(skip_all, fields(host_id, task_id))]
async fn sync_pieces(
&self,
request: Request<SyncPiecesRequest>,
@ -142,10 +142,14 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
// Clone the request.
let request = request.into_inner();
// Generate the host id.
let host_id = self.task.id_generator.host_id();
// Get the task id from tae request.
let task_id = request.task_id;
// Span record the task id.
// Span record the host id and task id.
Span::current().record("host_id", host_id.clone());
Span::current().record("task_id", task_id.clone());
// Get the interested piece numbers from the request.
@ -232,7 +236,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
}
// sync_pieces syncs the pieces.
#[instrument(skip_all, fields(task_id, piece_number))]
#[instrument(skip_all, fields(host_id, task_id, piece_number))]
async fn download_piece(
&self,
request: Request<DownloadPieceRequest>,
@ -240,23 +244,23 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
// Clone the request.
let request = request.into_inner();
// Clone the task.
let task = self.task.clone();
// Generate the host id.
let host_id = self.task.id_generator.host_id();
// Get the task id from the request.
let task_id = request.task_id;
// Span record the task id.
Span::current().record("task_id", task_id.as_str());
// Get the interested piece number from the request.
let piece_number = request.piece_number;
// Span record the piece number.
// Span record the host id, task id and piece number.
Span::current().record("host_id", host_id.as_str());
Span::current().record("task_id", task_id.as_str());
Span::current().record("piece_number", piece_number);
// Get the piece metadata from the local storage.
let piece = task
let piece = self
.task
.piece
.get(task_id.as_str(), piece_number)
.map_err(|err| {
@ -269,7 +273,8 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
})?;
// Get the piece content from the local storage.
let mut reader = task
let mut reader = self
.task
.piece
.download_from_local_peer_into_async_read(task_id.as_str(), piece_number)
.await
@ -302,7 +307,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
}
// trigger_download_task triggers the download task.
#[instrument(skip_all, fields(url))]
#[instrument(skip_all, fields(host_id, url))]
async fn trigger_download_task(
&self,
request: Request<TriggerDownloadTaskRequest>,
@ -310,13 +315,17 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
// Clone the request.
let request = request.into_inner();
// Generate the host id.
let host_id = self.task.id_generator.host_id();
// Get the download from the request.
let download = request.download.ok_or_else(|| {
error!("download not found");
Status::invalid_argument("download not found")
})?;
// Span record the download url.
// Span record the host id and download url.
Span::current().record("host_id", host_id.as_str());
Span::current().record("url", download.url.as_str());
// Initialize the dfdaemon download client.

View File

@ -48,6 +48,7 @@ pub struct HealthServer<T: Health> {
// HealthServer implements the grpc server of the health.
impl<T: Health> HealthServer<T> {
// new creates a new HealthServer.
#[instrument(skip_all)]
pub fn new(
addr: SocketAddr,
service: HealthGRPCServer<T>,
@ -110,6 +111,7 @@ impl HealthClient {
}
// new_unix creates a new HealthClient with unix domain socket.
#[instrument(skip_all)]
pub async fn new_unix(socket_path: PathBuf) -> Result<Self> {
// Ignore the uri because it is not used.
let channel = Endpoint::try_from("http://[::]:50051")
@ -123,6 +125,7 @@ impl HealthClient {
}
// check checks the health of the server.
#[instrument(skip_all)]
pub async fn check(&self) -> Result<HealthCheckResponse> {
let services = vec![
"dfdaemon.v2.DfdaemonDownload".to_string(),
@ -156,12 +159,14 @@ impl HealthClient {
}
// check_dfdaemon_download checks the health of the dfdaemon download service.
#[instrument(skip_all)]
pub async fn check_dfdaemon_download(&self) -> Result<HealthCheckResponse> {
self.check_service("dfdaemon.v2.DfdaemonDownload".to_string())
.await
}
// check_dfdaemon_upload checks the health of the dfdaemon upload service.
#[instrument(skip_all)]
pub async fn check_dfdaemon_upload(&self) -> Result<HealthCheckResponse> {
self.check_service("dfdaemon.v2.DfdaemonUpload".to_string())
.await