From 0065c13751e1a51d557866818c8d405a48f54ec9 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Wed, 5 Dec 2018 12:40:29 -0800 Subject: [PATCH] profiles: Drive profile discovery on a daemon task (#156) The profile router currently is responsible for driving the state of profile discovery; but this means that, if a service is not polled for traffic, the proxy may not drive discovery (so that requests may timeout, etc). This change moves this discovery onto a daemon task that sends profile updates to the service over an mpsc with capacity of 1. --- src/app/profiles.rs | 138 +++++++++++++++++++++++++++---------- src/proxy/http/profiles.rs | 24 ++----- 2 files changed, 109 insertions(+), 53 deletions(-) diff --git a/src/app/profiles.rs b/src/app/profiles.rs index 4db0c9ed5..179e8c7af 100644 --- a/src/app/profiles.rs +++ b/src/app/profiles.rs @@ -1,13 +1,17 @@ -use futures::{Async, Future, Poll, Stream}; +use bytes::IntoBuf; +use futures::sync::mpsc; +use futures::{Async, AsyncSink, Future, Poll, Sink, Stream}; use http; use regex::Regex; use std::fmt; use std::time::Duration; +use tokio::executor::{DefaultExecutor, Executor}; use tokio_timer::{clock, Delay}; use tower_grpc::{self as grpc, Body, BoxBody}; use tower_http::HttpService; use api::destination as api; +use never::Never; use proxy::http::profiles; use NameAddr; @@ -18,7 +22,9 @@ pub struct Client { backoff: Duration, } -pub struct Rx +pub struct Rx(mpsc::Receiver); + +struct Daemon where T: HttpService, T::ResponseBody: Body, @@ -27,6 +33,7 @@ where backoff: Duration, service: Option, state: State, + tx: mpsc::Sender, } enum State @@ -44,57 +51,125 @@ where impl Client where - T: HttpService + Clone, - T::ResponseBody: Body, + T: HttpService + Clone + Send + 'static, + T::ResponseBody: Body + Send + 'static, + ::Data: Send + 'static, + <::Data as IntoBuf>::Buf: Send + 'static, T::Error: fmt::Debug, { pub fn new(service: Option, backoff: Duration) -> Self { - Self { - service, - backoff, - } + Self { service, backoff } } } impl profiles::GetRoutes for Client where - T: HttpService + Clone, - T::ResponseBody: Body, + T: HttpService + Clone + Send + 'static, + T::Future: Send + 'static, + T::ResponseBody: Body + Send + 'static, + ::Data: Send + 'static, + <::Data as IntoBuf>::Buf: Send + 'static, T::Error: fmt::Debug, -{ - type Stream = Rx; + { + type Stream = Rx; fn get_routes(&self, dst: &NameAddr) -> Option { - Some(Rx { + let (tx, rx) = mpsc::channel(1); + + let daemon = Daemon { + tx, dst: format!("{}", dst), state: State::Disconnected, service: self.service.clone(), backoff: self.backoff, - }) + }; + let spawn = DefaultExecutor::current().spawn(Box::new(daemon.map_err(|_| ()))); + + spawn.ok().map(|_| Rx(rx)) } } // === impl Rx === -impl Stream for Rx +impl Stream for Rx { + type Item = profiles::Routes; + type Error = Never; + + fn poll(&mut self) -> Poll, Self::Error> { + self.0.poll().or_else(|_| Ok(None.into())) + } +} + +// === impl Daemon === + +enum StreamState { + SendLost, + RecvDone, +} + +impl Daemon where T: HttpService + Clone, T::ResponseBody: Body, T::Error: fmt::Debug, { - type Item = Vec<(profiles::RequestMatch, profiles::Route)>; - type Error = profiles::Error; + fn proxy_stream( + rx: &mut grpc::Streaming, + tx: &mut mpsc::Sender, + ) -> Async { + loop { + match tx.poll_ready() { + Ok(Async::NotReady) => return Async::NotReady, + Ok(Async::Ready(())) => {} + Err(_) => return StreamState::SendLost.into(), + } - fn poll(&mut self) -> Poll, Self::Error> { - let service = match self.service { - Some(ref s) => s, - None => return Ok(Async::Ready(Some(Vec::new()))), - }; + match rx.poll() { + Ok(Async::NotReady) => return Async::NotReady, + Ok(Async::Ready(None)) => return StreamState::RecvDone.into(), + Ok(Async::Ready(Some(profile))) => { + debug!("profile received: {:?}", profile); + let rs = profile.routes.into_iter().filter_map(convert_route); + match tx.start_send(rs.collect()) { + Ok(AsyncSink::Ready) => {} // continue + Ok(AsyncSink::NotReady(_)) => { + info!("dropping profile update due to a full buffer"); + // This must have been because another task stole + // our tx slot? It seems pretty unlikely, but possible? + return Async::NotReady; + } + Err(_) => { + return StreamState::SendLost.into(); + } + } + } + Err(e) => { + warn!("profile stream failed: {:?}", e); + return StreamState::RecvDone.into(); + } + } + } + } +} +impl Future for Daemon +where + T: HttpService + Clone, + T::ResponseBody: Body, + T::Error: fmt::Debug, +{ + type Item = (); + type Error = Never; + + fn poll(&mut self) -> Poll { loop { self.state = match self.state { State::Disconnected => { - let mut client = api::client::Destination::new(service.clone()); + let mut client = match self.service { + Some(ref svc) => api::client::Destination::new(svc.clone()), + None => return Ok(Async::Ready(())), + }; + let req = api::GetDestination { scheme: "k8s".to_owned(), path: self.dst.clone(), @@ -114,19 +189,10 @@ where State::Backoff(Delay::new(clock::now() + self.backoff)) } }, - State::Streaming(ref mut s) => match s.poll() { - Ok(Async::NotReady) => return Ok(Async::NotReady), - Ok(Async::Ready(Some(profile))) => { - debug!("profile received: {:?}", profile); - let rs = profile.routes.into_iter().filter_map(convert_route); - return Ok(Async::Ready(Some(rs.collect()))); - } - Ok(Async::Ready(None)) => { - debug!("profile stream ended"); - State::Backoff(Delay::new(clock::now() + self.backoff)) - } - Err(e) => { - warn!("profile stream failed: {:?}", e); + State::Streaming(ref mut s) => match Self::proxy_stream(s, &mut self.tx) { + Async::NotReady => return Ok(Async::NotReady), + Async::Ready(StreamState::SendLost) => return Ok(().into()), + Async::Ready(StreamState::RecvDone) => { State::Backoff(Delay::new(clock::now() + self.backoff)) } }, diff --git a/src/proxy/http/profiles.rs b/src/proxy/http/profiles.rs index a682934ef..16528fc61 100644 --- a/src/proxy/http/profiles.rs +++ b/src/proxy/http/profiles.rs @@ -6,7 +6,8 @@ use indexmap::IndexMap; use regex::Regex; use std::iter::FromIterator; use std::sync::Arc; -use std::{error, fmt}; + +use never::Never; use NameAddr; @@ -17,7 +18,7 @@ pub type Routes = Vec<(RequestMatch, Route)>; /// The stream updates with all routes for the given destination. The stream /// never ends and cannot fail. pub trait GetRoutes { - type Stream: Stream; + type Stream: Stream; fn get_routes(&self, dst: &NameAddr) -> Option; } @@ -35,9 +36,6 @@ pub trait CanGetDestination { fn get_destination(&self) -> Option<&NameAddr>; } -#[derive(Debug)] -pub enum Error {} - #[derive(Clone, Debug, Default)] pub struct Route { labels: Arc>, @@ -145,16 +143,6 @@ impl ResponseMatch { } } -// === impl Error === - -impl fmt::Display for Error { - fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result { - unreachable!() - } -} - -impl error::Error for Error {} - /// A stack module that produces a Service that routes requests through alternate /// middleware configurations /// @@ -170,6 +158,8 @@ pub mod router { use http; use std::{error, fmt}; + use never::Never; + use dns; use svc; @@ -323,7 +313,7 @@ pub mod router { impl Service where - G: Stream, + G: Stream, T: WithRoute + Clone, R: svc::Stack + Clone, { @@ -347,7 +337,7 @@ pub mod router { impl svc::Service> for Service where - G: Stream, + G: Stream, T: WithRoute + Clone, R: svc::Stack + Clone, R::Value: svc::Service>,