From e0eb83f5abd0726e3a316333af67ce7a19104edc Mon Sep 17 00:00:00 2001 From: Gaius Date: Wed, 7 Jun 2023 22:10:01 +0800 Subject: [PATCH] feat: add graceful shutdown to tokio (#41) Signed-off-by: Gaius --- src/bin/dfdaemon/main.rs | 32 ++++++++++++++++++++++++++++---- src/health/mod.rs | 18 +++++++++++++++--- src/metrics/mod.rs | 17 ++++++++++++++--- 3 files changed, 57 insertions(+), 10 deletions(-) diff --git a/src/bin/dfdaemon/main.rs b/src/bin/dfdaemon/main.rs index 3dcdce20..1f8ab648 100644 --- a/src/bin/dfdaemon/main.rs +++ b/src/bin/dfdaemon/main.rs @@ -23,7 +23,8 @@ use client::metrics::Metrics; use client::tracing::init_tracing; use std::error::Error; use std::path::PathBuf; -use tracing::Level; +use tokio::signal::unix::{signal, SignalKind}; +use tracing::{info, Level}; #[derive(Debug, Parser)] #[command( @@ -61,7 +62,7 @@ struct Args { } #[tokio::main] -pub async fn main() -> Result<(), Box> { +async fn main() -> Result<(), Box> { // Parse command line arguments. let args = Args::parse(); @@ -73,11 +74,34 @@ pub async fn main() -> Result<(), Box> { // Start metrics server. let metrics = Metrics::new(config.network.enable_ipv6); - tokio::spawn(async move { metrics.serve().await }); + let metrics_handle = tokio::spawn(async move { metrics.run(shutdown_signal()).await }); // Start health server. let health = Health::new(config.network.enable_ipv6); - health.serve().await; + let health_handle = tokio::spawn(async move { health.run(shutdown_signal()).await }); + + // Wait for servers to exit. + tokio::try_join!(metrics_handle, health_handle)?; Ok(()) } + +// shutdown_signal returns a future that will resolve when a SIGINT, SIGTERM or SIGQUIT signal is +// received by the process. +async fn shutdown_signal() { + let mut sigint = signal(SignalKind::interrupt()).unwrap(); + let mut sigterm = signal(SignalKind::terminate()).unwrap(); + let mut sigquit = signal(SignalKind::quit()).unwrap(); + + tokio::select! { + _ = sigint.recv() => { + info!("received SIGINT, shutting down"); + }, + _ = sigterm.recv() => { + info!("received SIGTERM, shutting down"); + } + _ = sigquit.recv() => { + info!("received SIGQUIT, shutting down"); + } + } +} diff --git a/src/health/mod.rs b/src/health/mod.rs index 97b1da50..7af68396 100644 --- a/src/health/mod.rs +++ b/src/health/mod.rs @@ -14,7 +14,9 @@ * limitations under the License. */ +use std::future::Future; use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; +use tracing::info; use warp::{Filter, Rejection, Reply}; // Health is the health server. @@ -39,13 +41,23 @@ impl Health { } } - // serve starts the metrics server. - pub async fn serve(&self) { + // run starts the metrics server. + pub async fn run(&self, shutdown: impl Future) { let health_route = warp::path!("healthy") .and(warp::get()) .and(warp::path::end()) .and_then(Self::health_handler); - warp::serve(health_route).run(self.addr).await; + + tokio::select! { + _ = warp::serve(health_route).run(self.addr) => { + // Health server ended. + info!("health server ended"); + } + _ = shutdown => { + // Health server shutting down with signals. + info!("health server shutting down"); + } + } } // health_handler handles the health check request. diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs index e196804b..997eaaee 100644 --- a/src/metrics/mod.rs +++ b/src/metrics/mod.rs @@ -17,8 +17,10 @@ use crate::config::{NAME, SERVICE_NAME}; use lazy_static::lazy_static; use prometheus::{gather, Encoder, IntCounterVec, IntGaugeVec, Opts, Registry, TextEncoder}; +use std::future::Future; use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; use tracing::error; +use tracing::info; use warp::{Filter, Rejection, Reply}; lazy_static! { @@ -62,12 +64,21 @@ impl Metrics { } } - // serve starts the metrics server. - pub async fn serve(&self) { + // run starts the metrics server. + pub async fn run(&self, shutdown: impl Future) { self.register_custom_metrics(); let metrics_route = warp::path!("metrics").and_then(Self::metrics_handler); - warp::serve(metrics_route).run(self.addr).await; + tokio::select! { + _ = warp::serve(metrics_route).run(self.addr) => { + // Metrics server ended. + info!("metrics server ended"); + } + _ = shutdown => { + // Metrics server shutting down with signals. + info!("metrics server shutting down"); + } + } } // register_custom_metrics registers all custom metrics.