From 44d7ee06ca0ba5189051de9a51a07eaec7398175 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Thu, 21 Jun 2018 15:41:41 -0700 Subject: [PATCH] proxy: Implement a WatchService (#1177) WatchService is a middleware that rebinds its inner service each time a Watch updates. This is planned to be used to rebind endpoint stacks when TLS configuration changes. Later, it should probably be moved into the tower repo. --- proxy/src/lib.rs | 4 +- proxy/src/watch_service.rs | 120 +++++++++++++++++++++++++++++++++++++ 2 files changed, 123 insertions(+), 1 deletion(-) create mode 100644 proxy/src/watch_service.rs diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index b2f4e7fbd..c8b649d38 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -89,8 +89,10 @@ mod transparency; mod transport; pub mod timeout; mod tower_fn; // TODO: move to tower-fn +mod watch_service; // TODO: move to tower use bind::Bind; +use conditional::Conditional; use connection::BoundPort; use inbound::Inbound; use map_err::MapErr; @@ -98,7 +100,7 @@ use task::MainRuntime; use transparency::{HttpBody, Server}; pub use transport::{AddrInfo, GetOriginalDst, SoOriginalDst, tls}; use outbound::Outbound; -use conditional::Conditional; +pub use watch_service::WatchService; /// Runs a sidecar proxy. /// diff --git a/proxy/src/watch_service.rs b/proxy/src/watch_service.rs new file mode 100644 index 000000000..08b20694f --- /dev/null +++ b/proxy/src/watch_service.rs @@ -0,0 +1,120 @@ +use futures::{Async, Poll, Stream}; +use futures_watch::Watch; +use tower_service::Service; + +pub trait Rebind { + type Service: Service; + fn rebind(&mut self, t: &T) -> Self::Service; +} + +/// A Service that updates itself as a Watch updates. +#[derive(Debug)] +pub struct WatchService> { + watch: Watch, + rebind: R, + inner: R::Service, +} + +impl> WatchService { + pub fn new(watch: Watch, mut rebind: R) -> WatchService { + let inner = rebind.rebind(&*watch.borrow()); + WatchService { watch, rebind, inner } + } +} + +impl> Service for WatchService { + type Request = ::Request; + type Response = ::Response; + type Error = ::Error; + type Future = ::Future; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + // Check to see if the watch has been updated and, if so, rebind the service. + // + // `watch.poll()` can't actually fail; so errors are not considered. + while let Ok(Async::Ready(Some(()))) = self.watch.poll() { + self.inner = self.rebind.rebind(&*self.watch.borrow()); + } + + self.inner.poll_ready() + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + self.inner.call(req) + } +} + +impl Rebind for F +where + S: Service, + for<'t> F: FnMut(&'t T) -> S, +{ + type Service = S; + fn rebind(&mut self, t: &T) -> S { + (self)(t) + } +} + +#[cfg(test)] +mod tests { + use futures::future; + use std::time::Duration; + use task::test_util::BlockOnFor; + use tokio::runtime::current_thread::Runtime; + use super::*; + + const TIMEOUT: Duration = Duration::from_secs(60); + + #[test] + fn rebind() { + struct Svc(usize); + impl Service for Svc { + type Request = (); + type Response = usize; + type Error = (); + type Future = future::FutureResult; + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(().into()) + } + fn call(&mut self, _: ()) -> Self::Future { + future::ok(self.0) + } + } + + let mut rt = Runtime::new().unwrap(); + macro_rules! assert_ready { + ($svc:expr) => { + rt.block_on_for(TIMEOUT, future::poll_fn(|| $svc.poll_ready())) + .expect("ready") + }; + } + macro_rules! call { + ($svc:expr) => { + rt.block_on_for(TIMEOUT, $svc.call(())) + .expect("call") + }; + } + + let (watch, mut store) = Watch::new(1); + let mut svc = WatchService::new(watch, |n: &usize| Svc(*n)); + + assert_ready!(svc); + assert_eq!(call!(svc), 1); + + assert_ready!(svc); + assert_eq!(call!(svc), 1); + + store.store(2).expect("store"); + assert_ready!(svc); + assert_eq!(call!(svc), 2); + + store.store(3).expect("store"); + store.store(4).expect("store"); + assert_ready!(svc); + assert_eq!(call!(svc), 4); + + drop(store); + assert_ready!(svc); + assert_eq!(call!(svc), 4); + } +}