feat: add graceful shutdown to tokio (#41)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2023-06-07 22:10:01 +08:00 committed by GitHub
parent 4f03015614
commit e0eb83f5ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 57 additions and 10 deletions

View File

@ -23,7 +23,8 @@ use client::metrics::Metrics;
use client::tracing::init_tracing; use client::tracing::init_tracing;
use std::error::Error; use std::error::Error;
use std::path::PathBuf; use std::path::PathBuf;
use tracing::Level; use tokio::signal::unix::{signal, SignalKind};
use tracing::{info, Level};
#[derive(Debug, Parser)] #[derive(Debug, Parser)]
#[command( #[command(
@ -61,7 +62,7 @@ struct Args {
} }
#[tokio::main] #[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> { async fn main() -> Result<(), Box<dyn Error>> {
// Parse command line arguments. // Parse command line arguments.
let args = Args::parse(); let args = Args::parse();
@ -73,11 +74,34 @@ pub async fn main() -> Result<(), Box<dyn Error>> {
// Start metrics server. // Start metrics server.
let metrics = Metrics::new(config.network.enable_ipv6); let metrics = Metrics::new(config.network.enable_ipv6);
tokio::spawn(async move { metrics.serve().await }); let metrics_handle = tokio::spawn(async move { metrics.run(shutdown_signal()).await });
// Start health server. // Start health server.
let health = Health::new(config.network.enable_ipv6); let health = Health::new(config.network.enable_ipv6);
health.serve().await; let health_handle = tokio::spawn(async move { health.run(shutdown_signal()).await });
// Wait for servers to exit.
tokio::try_join!(metrics_handle, health_handle)?;
Ok(()) Ok(())
} }
// shutdown_signal returns a future that will resolve when a SIGINT, SIGTERM or SIGQUIT signal is
// received by the process.
async fn shutdown_signal() {
let mut sigint = signal(SignalKind::interrupt()).unwrap();
let mut sigterm = signal(SignalKind::terminate()).unwrap();
let mut sigquit = signal(SignalKind::quit()).unwrap();
tokio::select! {
_ = sigint.recv() => {
info!("received SIGINT, shutting down");
},
_ = sigterm.recv() => {
info!("received SIGTERM, shutting down");
}
_ = sigquit.recv() => {
info!("received SIGQUIT, shutting down");
}
}
}

View File

@ -14,7 +14,9 @@
* limitations under the License. * limitations under the License.
*/ */
use std::future::Future;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
use tracing::info;
use warp::{Filter, Rejection, Reply}; use warp::{Filter, Rejection, Reply};
// Health is the health server. // Health is the health server.
@ -39,13 +41,23 @@ impl Health {
} }
} }
// serve starts the metrics server. // run starts the metrics server.
pub async fn serve(&self) { pub async fn run(&self, shutdown: impl Future) {
let health_route = warp::path!("healthy") let health_route = warp::path!("healthy")
.and(warp::get()) .and(warp::get())
.and(warp::path::end()) .and(warp::path::end())
.and_then(Self::health_handler); .and_then(Self::health_handler);
warp::serve(health_route).run(self.addr).await;
tokio::select! {
_ = warp::serve(health_route).run(self.addr) => {
// Health server ended.
info!("health server ended");
}
_ = shutdown => {
// Health server shutting down with signals.
info!("health server shutting down");
}
}
} }
// health_handler handles the health check request. // health_handler handles the health check request.

View File

@ -17,8 +17,10 @@
use crate::config::{NAME, SERVICE_NAME}; use crate::config::{NAME, SERVICE_NAME};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use prometheus::{gather, Encoder, IntCounterVec, IntGaugeVec, Opts, Registry, TextEncoder}; use prometheus::{gather, Encoder, IntCounterVec, IntGaugeVec, Opts, Registry, TextEncoder};
use std::future::Future;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
use tracing::error; use tracing::error;
use tracing::info;
use warp::{Filter, Rejection, Reply}; use warp::{Filter, Rejection, Reply};
lazy_static! { lazy_static! {
@ -62,12 +64,21 @@ impl Metrics {
} }
} }
// serve starts the metrics server. // run starts the metrics server.
pub async fn serve(&self) { pub async fn run(&self, shutdown: impl Future) {
self.register_custom_metrics(); self.register_custom_metrics();
let metrics_route = warp::path!("metrics").and_then(Self::metrics_handler); let metrics_route = warp::path!("metrics").and_then(Self::metrics_handler);
warp::serve(metrics_route).run(self.addr).await; tokio::select! {
_ = warp::serve(metrics_route).run(self.addr) => {
// Metrics server ended.
info!("metrics server ended");
}
_ = shutdown => {
// Metrics server shutting down with signals.
info!("metrics server shutting down");
}
}
} }
// register_custom_metrics registers all custom metrics. // register_custom_metrics registers all custom metrics.