profiles: Drive profile discovery on a daemon task (#156)

The profile router currently is responsible for driving the state of
profile discovery; but this means that, if a service is not polled for
traffic, the proxy may not drive discovery (so that requests may
timeout, etc).

This change moves this discovery onto a daemon task that sends profile
updates to the service over an mpsc with capacity of 1.
This commit is contained in:
Oliver Gould 2018-12-05 12:40:29 -08:00 committed by GitHub
parent b9ffbb7f93
commit 0065c13751
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 109 additions and 53 deletions

View File

@ -1,13 +1,17 @@
use futures::{Async, Future, Poll, Stream};
use bytes::IntoBuf;
use futures::sync::mpsc;
use futures::{Async, AsyncSink, Future, Poll, Sink, Stream};
use http;
use regex::Regex;
use std::fmt;
use std::time::Duration;
use tokio::executor::{DefaultExecutor, Executor};
use tokio_timer::{clock, Delay};
use tower_grpc::{self as grpc, Body, BoxBody};
use tower_http::HttpService;
use api::destination as api;
use never::Never;
use proxy::http::profiles;
use NameAddr;
@ -18,7 +22,9 @@ pub struct Client<T> {
backoff: Duration,
}
pub struct Rx<T>
pub struct Rx(mpsc::Receiver<profiles::Routes>);
struct Daemon<T>
where
T: HttpService<BoxBody>,
T::ResponseBody: Body,
@ -27,6 +33,7 @@ where
backoff: Duration,
service: Option<T>,
state: State<T>,
tx: mpsc::Sender<profiles::Routes>,
}
enum State<T>
@ -44,57 +51,125 @@ where
impl<T> Client<T>
where
T: HttpService<BoxBody> + Clone,
T::ResponseBody: Body,
T: HttpService<BoxBody> + Clone + Send + 'static,
T::ResponseBody: Body + Send + 'static,
<T::ResponseBody as Body>::Data: Send + 'static,
<<T::ResponseBody as Body>::Data as IntoBuf>::Buf: Send + 'static,
T::Error: fmt::Debug,
{
pub fn new(service: Option<T>, backoff: Duration) -> Self {
Self {
service,
backoff,
}
Self { service, backoff }
}
}
impl<T> profiles::GetRoutes for Client<T>
where
T: HttpService<BoxBody> + Clone,
T::ResponseBody: Body,
T: HttpService<BoxBody> + Clone + Send + 'static,
T::Future: Send + 'static,
T::ResponseBody: Body + Send + 'static,
<T::ResponseBody as Body>::Data: Send + 'static,
<<T::ResponseBody as Body>::Data as IntoBuf>::Buf: Send + 'static,
T::Error: fmt::Debug,
{
type Stream = Rx<T>;
{
type Stream = Rx;
fn get_routes(&self, dst: &NameAddr) -> Option<Self::Stream> {
Some(Rx {
let (tx, rx) = mpsc::channel(1);
let daemon = Daemon {
tx,
dst: format!("{}", dst),
state: State::Disconnected,
service: self.service.clone(),
backoff: self.backoff,
})
};
let spawn = DefaultExecutor::current().spawn(Box::new(daemon.map_err(|_| ())));
spawn.ok().map(|_| Rx(rx))
}
}
// === impl Rx ===
impl<T> Stream for Rx<T>
impl Stream for Rx {
type Item = profiles::Routes;
type Error = Never;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.0.poll().or_else(|_| Ok(None.into()))
}
}
// === impl Daemon ===
enum StreamState {
SendLost,
RecvDone,
}
impl<T> Daemon<T>
where
T: HttpService<BoxBody> + Clone,
T::ResponseBody: Body,
T::Error: fmt::Debug,
{
type Item = Vec<(profiles::RequestMatch, profiles::Route)>;
type Error = profiles::Error;
fn proxy_stream(
rx: &mut grpc::Streaming<api::DestinationProfile, T::ResponseBody>,
tx: &mut mpsc::Sender<profiles::Routes>,
) -> Async<StreamState> {
loop {
match tx.poll_ready() {
Ok(Async::NotReady) => return Async::NotReady,
Ok(Async::Ready(())) => {}
Err(_) => return StreamState::SendLost.into(),
}
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let service = match self.service {
Some(ref s) => s,
None => return Ok(Async::Ready(Some(Vec::new()))),
};
match rx.poll() {
Ok(Async::NotReady) => return Async::NotReady,
Ok(Async::Ready(None)) => return StreamState::RecvDone.into(),
Ok(Async::Ready(Some(profile))) => {
debug!("profile received: {:?}", profile);
let rs = profile.routes.into_iter().filter_map(convert_route);
match tx.start_send(rs.collect()) {
Ok(AsyncSink::Ready) => {} // continue
Ok(AsyncSink::NotReady(_)) => {
info!("dropping profile update due to a full buffer");
// This must have been because another task stole
// our tx slot? It seems pretty unlikely, but possible?
return Async::NotReady;
}
Err(_) => {
return StreamState::SendLost.into();
}
}
}
Err(e) => {
warn!("profile stream failed: {:?}", e);
return StreamState::RecvDone.into();
}
}
}
}
}
impl<T> Future for Daemon<T>
where
T: HttpService<BoxBody> + Clone,
T::ResponseBody: Body,
T::Error: fmt::Debug,
{
type Item = ();
type Error = Never;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
self.state = match self.state {
State::Disconnected => {
let mut client = api::client::Destination::new(service.clone());
let mut client = match self.service {
Some(ref svc) => api::client::Destination::new(svc.clone()),
None => return Ok(Async::Ready(())),
};
let req = api::GetDestination {
scheme: "k8s".to_owned(),
path: self.dst.clone(),
@ -114,19 +189,10 @@ where
State::Backoff(Delay::new(clock::now() + self.backoff))
}
},
State::Streaming(ref mut s) => match s.poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(Some(profile))) => {
debug!("profile received: {:?}", profile);
let rs = profile.routes.into_iter().filter_map(convert_route);
return Ok(Async::Ready(Some(rs.collect())));
}
Ok(Async::Ready(None)) => {
debug!("profile stream ended");
State::Backoff(Delay::new(clock::now() + self.backoff))
}
Err(e) => {
warn!("profile stream failed: {:?}", e);
State::Streaming(ref mut s) => match Self::proxy_stream(s, &mut self.tx) {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(StreamState::SendLost) => return Ok(().into()),
Async::Ready(StreamState::RecvDone) => {
State::Backoff(Delay::new(clock::now() + self.backoff))
}
},

View File

@ -6,7 +6,8 @@ use indexmap::IndexMap;
use regex::Regex;
use std::iter::FromIterator;
use std::sync::Arc;
use std::{error, fmt};
use never::Never;
use NameAddr;
@ -17,7 +18,7 @@ pub type Routes = Vec<(RequestMatch, Route)>;
/// The stream updates with all routes for the given destination. The stream
/// never ends and cannot fail.
pub trait GetRoutes {
type Stream: Stream<Item = Routes, Error = Error>;
type Stream: Stream<Item = Routes, Error = Never>;
fn get_routes(&self, dst: &NameAddr) -> Option<Self::Stream>;
}
@ -35,9 +36,6 @@ pub trait CanGetDestination {
fn get_destination(&self) -> Option<&NameAddr>;
}
#[derive(Debug)]
pub enum Error {}
#[derive(Clone, Debug, Default)]
pub struct Route {
labels: Arc<IndexMap<String, String>>,
@ -145,16 +143,6 @@ impl ResponseMatch {
}
}
// === impl Error ===
impl fmt::Display for Error {
fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result {
unreachable!()
}
}
impl error::Error for Error {}
/// A stack module that produces a Service that routes requests through alternate
/// middleware configurations
///
@ -170,6 +158,8 @@ pub mod router {
use http;
use std::{error, fmt};
use never::Never;
use dns;
use svc;
@ -323,7 +313,7 @@ pub mod router {
impl<G, T, R> Service<G, T, R>
where
G: Stream<Item = Routes, Error = super::Error>,
G: Stream<Item = Routes, Error = Never>,
T: WithRoute + Clone,
R: svc::Stack<T::Output> + Clone,
{
@ -347,7 +337,7 @@ pub mod router {
impl<G, T, R, B> svc::Service<http::Request<B>> for Service<G, T, R>
where
G: Stream<Item = Routes, Error = super::Error>,
G: Stream<Item = Routes, Error = Never>,
T: WithRoute + Clone,
R: svc::Stack<T::Output> + Clone,
R::Value: svc::Service<http::Request<B>>,