fix endless loop in graceful shutdown of config server (#43)
* fix endless loop in graceful shutdown of config server Signed-off-by: xixi <i@hexilee.me> * graceful shutdown for connection Signed-off-by: xixi <i@hexilee.me> * exit forcedly Signed-off-by: xixi <i@hexilee.me> * add comments Signed-off-by: xixi <i@hexilee.me> * continue to serve after errors occur Signed-off-by: xixi <i@hexilee.me>
This commit is contained in:
parent
f279d67de9
commit
490cfa3567
|
@ -7,7 +7,7 @@ use std::task::{Context, Poll};
|
|||
use anyhow::Error;
|
||||
use futures::TryStreamExt;
|
||||
use http::{Method, Request, Response, StatusCode};
|
||||
use hyper::server::conn::Http;
|
||||
use hyper::server::conn::{Connection, Http};
|
||||
use hyper::service::Service;
|
||||
use hyper::Body;
|
||||
use tokio::select;
|
||||
|
@ -41,29 +41,25 @@ impl ConfigServer {
|
|||
}
|
||||
|
||||
pub fn serve_interactive(&mut self) {
|
||||
let rx = self.rx.take().unwrap();
|
||||
let mut rx = self.rx.take().unwrap();
|
||||
let mut service = ConfigService(self.proxy.clone());
|
||||
self.task = Some(tokio::spawn(async move {
|
||||
select! {
|
||||
_ = rx => {
|
||||
tracing::trace!("catch signal in config server.");
|
||||
return Ok(());
|
||||
},
|
||||
_ = async {
|
||||
loop {
|
||||
let stream = StdStream::default();
|
||||
let conn = Http::new()
|
||||
.serve_connection(stream, &mut service);
|
||||
if let Err(e) = conn.await {
|
||||
tracing::error!("{}",e);
|
||||
return Err(anyhow::anyhow!("{}",e));
|
||||
}
|
||||
let rx_mut = &mut rx;
|
||||
loop {
|
||||
let stream = StdStream::default();
|
||||
let mut conn = Http::new().serve_connection(stream, &mut service);
|
||||
let conn_mut = &mut conn;
|
||||
select! {
|
||||
_ = &mut *rx_mut => {
|
||||
tracing::trace!("catch signal in config server.");
|
||||
Connection::graceful_shutdown(Pin::new(conn_mut));
|
||||
return Ok(());
|
||||
},
|
||||
ret = &mut *conn_mut => if let Err(e) = ret {
|
||||
tracing::error!("{}",e);
|
||||
}
|
||||
#[allow(unreachable_code)]
|
||||
Ok::<_, anyhow::Error>(())
|
||||
} => {}
|
||||
};
|
||||
Ok(())
|
||||
};
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
|
|
|
@ -49,6 +49,9 @@ async fn main() -> anyhow::Result<()> {
|
|||
let mut signals = Signals::from_kinds(&[SignalKind::interrupt(), SignalKind::terminate()])?;
|
||||
signals.wait().await?;
|
||||
config_server.stop().await?;
|
||||
|
||||
// Currently we cannot graceful shutdown the config server.
|
||||
exit(0);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use chaos_tproxy_proxy::handler::http::action::{apply_request_action, Actions, ReplaceAction};
|
||||
use http::header::CONTENT_LENGTH;
|
||||
use http::HeaderMap;
|
||||
use hyper::{Body, Client, Method, Request};
|
||||
use chaos_tproxy_proxy::handler::http::action::{apply_request_action, Actions, ReplaceAction};
|
||||
|
||||
#[tokio::test]
|
||||
#[ignore]
|
||||
|
|
Loading…
Reference in New Issue