proxy: Implement a WatchService (#1177)

WatchService is a middleware that rebinds its inner service
each time a Watch updates.

This is planned to be used to rebind endpoint stacks when TLS
configuration changes. Later, it should probably be moved into
the tower repo.
This commit is contained in:
Oliver Gould 2018-06-21 15:41:41 -07:00 committed by GitHub
parent c4d570aa26
commit 44d7ee06ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 123 additions and 1 deletions

View File

@ -89,8 +89,10 @@ mod transparency;
mod transport;
pub mod timeout;
mod tower_fn; // TODO: move to tower-fn
mod watch_service; // TODO: move to tower
use bind::Bind;
use conditional::Conditional;
use connection::BoundPort;
use inbound::Inbound;
use map_err::MapErr;
@ -98,7 +100,7 @@ use task::MainRuntime;
use transparency::{HttpBody, Server};
pub use transport::{AddrInfo, GetOriginalDst, SoOriginalDst, tls};
use outbound::Outbound;
use conditional::Conditional;
pub use watch_service::WatchService;
/// Runs a sidecar proxy.
///

120
proxy/src/watch_service.rs Normal file
View File

@ -0,0 +1,120 @@
use futures::{Async, Poll, Stream};
use futures_watch::Watch;
use tower_service::Service;
pub trait Rebind<T> {
type Service: Service;
fn rebind(&mut self, t: &T) -> Self::Service;
}
/// A Service that updates itself as a Watch updates.
#[derive(Debug)]
pub struct WatchService<T, R: Rebind<T>> {
watch: Watch<T>,
rebind: R,
inner: R::Service,
}
impl<T, R: Rebind<T>> WatchService<T, R> {
pub fn new(watch: Watch<T>, mut rebind: R) -> WatchService<T, R> {
let inner = rebind.rebind(&*watch.borrow());
WatchService { watch, rebind, inner }
}
}
impl<T, R: Rebind<T>> Service for WatchService<T, R> {
type Request = <R::Service as Service>::Request;
type Response = <R::Service as Service>::Response;
type Error = <R::Service as Service>::Error;
type Future = <R::Service as Service>::Future;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
// Check to see if the watch has been updated and, if so, rebind the service.
//
// `watch.poll()` can't actually fail; so errors are not considered.
while let Ok(Async::Ready(Some(()))) = self.watch.poll() {
self.inner = self.rebind.rebind(&*self.watch.borrow());
}
self.inner.poll_ready()
}
fn call(&mut self, req: Self::Request) -> Self::Future {
self.inner.call(req)
}
}
impl<T, S, F> Rebind<T> for F
where
S: Service,
for<'t> F: FnMut(&'t T) -> S,
{
type Service = S;
fn rebind(&mut self, t: &T) -> S {
(self)(t)
}
}
#[cfg(test)]
mod tests {
use futures::future;
use std::time::Duration;
use task::test_util::BlockOnFor;
use tokio::runtime::current_thread::Runtime;
use super::*;
const TIMEOUT: Duration = Duration::from_secs(60);
#[test]
fn rebind() {
struct Svc(usize);
impl Service for Svc {
type Request = ();
type Response = usize;
type Error = ();
type Future = future::FutureResult<usize, ()>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(().into())
}
fn call(&mut self, _: ()) -> Self::Future {
future::ok(self.0)
}
}
let mut rt = Runtime::new().unwrap();
macro_rules! assert_ready {
($svc:expr) => {
rt.block_on_for(TIMEOUT, future::poll_fn(|| $svc.poll_ready()))
.expect("ready")
};
}
macro_rules! call {
($svc:expr) => {
rt.block_on_for(TIMEOUT, $svc.call(()))
.expect("call")
};
}
let (watch, mut store) = Watch::new(1);
let mut svc = WatchService::new(watch, |n: &usize| Svc(*n));
assert_ready!(svc);
assert_eq!(call!(svc), 1);
assert_ready!(svc);
assert_eq!(call!(svc), 1);
store.store(2).expect("store");
assert_ready!(svc);
assert_eq!(call!(svc), 2);
store.store(3).expect("store");
store.store(4).expect("store");
assert_ready!(svc);
assert_eq!(call!(svc), 4);
drop(store);
assert_ready!(svc);
assert_eq!(call!(svc), 4);
}
}