From 2109f375316536a86b9d5f29ebcad2be3e162d9c Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 26 Oct 2018 13:30:09 -0700 Subject: [PATCH] Make a generic Watch stack (#104) The TLS-configuration-watching logic in `app::outbound::tls_config` need not be specific to the outbound types, or even TLS configuration. Instead, this change extends the `watch` stack module with a Stack type that can satisfy the TLS use case independently of the concrete types at play. --- lib/stack/src/watch.rs | 160 +++++++++++++++++++++++++++++++++++------ src/app/main.rs | 4 +- src/app/outbound.rs | 107 +++++---------------------- 3 files changed, 158 insertions(+), 113 deletions(-) diff --git a/lib/stack/src/watch.rs b/lib/stack/src/watch.rs index 9232566f4..0d0f8c6ec 100644 --- a/lib/stack/src/watch.rs +++ b/lib/stack/src/watch.rs @@ -1,15 +1,35 @@ extern crate futures_watch; +use self::futures_watch::Watch; use futures::{future::MapErr, Async, Future, Poll, Stream}; use std::{error, fmt}; use svc; +/// Implemented by targets that can be updated by a `Watch` +pub trait WithUpdate { + type Updated; + + fn with_update(&self, update: &U) -> Self::Updated; +} + +#[derive(Debug)] +pub struct Layer { + watch: Watch, +} + +#[derive(Debug)] +pub struct Stack { + watch: Watch, + inner: M, +} + /// A Service that updates itself as a Watch updates. -#[derive(Clone, Debug)] -pub struct Service> { - watch: futures_watch::Watch, - make: M, +#[derive(Debug)] +pub struct Service, U, M: super::Stack> { + watch: Watch, + target: T, + stack: M, inner: M::Value, } @@ -19,23 +39,77 @@ pub enum Error { Inner(I), } -impl Service +/// A special implemtation of WithUpdate that clones the observed update value. +#[derive(Clone, Debug)] +pub struct CloneUpdate {} + +// === impl Layer === + +pub fn layer(watch: Watch) -> Layer { + Layer { watch } +} + +impl Clone for Layer { + fn clone(&self) -> Self { + Self { + watch: self.watch.clone(), + } + } +} + +impl super::Layer for Layer where - M: super::Stack, + T: WithUpdate + Clone, + M: super::Stack + Clone, { - pub fn try(watch: futures_watch::Watch, make: M) -> Result { - let inner = make.make(&*watch.borrow())?; - Ok(Self { - watch, - make, + type Value = as super::Stack>::Value; + type Error = as super::Stack>::Error; + type Stack = Stack; + + fn bind(&self, inner: M) -> Self::Stack { + Stack { inner, + watch: self.watch.clone(), + } + } +} + +// === impl Stack === + +impl Clone for Stack { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + watch: self.watch.clone(), + } + } +} + +impl super::Stack for Stack +where + T: WithUpdate + Clone, + M: super::Stack + Clone, +{ + type Value = Service; + type Error = M::Error; + + fn make(&self, target: &T) -> Result { + let inner = self.inner.make(&target.with_update(&*self.watch.borrow()))?; + Ok(Service { + inner, + watch: self.watch.clone(), + target: target.clone(), + stack: self.inner.clone(), }) } } -impl svc::Service for Service +// === impl Service === + +impl svc::Service for Service where - M: super::Stack, + T: WithUpdate, + M: super::Stack, M::Value: svc::Service, { type Request = ::Request; @@ -51,11 +125,11 @@ where // // `watch.poll()` can't actually fail; so errors are not considered. while let Ok(Async::Ready(Some(()))) = self.watch.poll() { - let target = self.watch.borrow(); - // `inner` is only updated if `target` is valid. The caller may + let updated = self.target.with_update(&*self.watch.borrow()); + // `inner` is only updated if `updated` is valid. The caller may // choose to continue using the service or discard as is // appropriate. - self.inner = self.make.make(&*target).map_err(Error::Stack)?; + self.inner = self.stack.make(&updated).map_err(Error::Stack)?; } self.inner.poll_ready().map_err(Error::Inner) @@ -66,6 +140,51 @@ where } } +impl Service +where + U: Clone, + M: super::Stack, + M::Value: svc::Service, +{ + pub fn try(watch: Watch, stack: M) -> Result { + let inner = stack.make(&*watch.borrow())?; + Ok(Self { + inner, + watch, + stack, + target: CloneUpdate {}, + }) + } +} + +impl Clone for Service +where + T: WithUpdate + Clone, + M: super::Stack + Clone, + M::Value: svc::Service + Clone, +{ + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + watch: self.watch.clone(), + stack: self.stack.clone(), + target: self.target.clone(), + } + } +} + +// === impl CloneUpdate === + +impl WithUpdate for CloneUpdate { + type Updated = U; + + fn with_update(&self, update: &U) -> U { + update.clone() + } +} + +// === impl Error === + impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { @@ -82,11 +201,11 @@ mod tests { extern crate linkerd2_task as task; extern crate tokio; - use futures::future; use self::task::test_util::BlockOnFor; use self::tokio::runtime::current_thread::Runtime; - use std::time::Duration; use super::*; + use futures::future; + use std::time::Duration; use svc::Service as _Service; const TIMEOUT: Duration = Duration::from_secs(60); @@ -116,8 +235,7 @@ mod tests { } macro_rules! call { ($svc:expr) => { - rt.block_on_for(TIMEOUT, $svc.call(())) - .expect("call") + rt.block_on_for(TIMEOUT, $svc.call(())).expect("call") }; } @@ -130,7 +248,7 @@ mod tests { } } - let (watch, mut store) = futures_watch::Watch::new(1); + let (watch, mut store) = Watch::new(1); let mut svc = Service::try(watch, Stack).unwrap(); assert_ready!(svc); diff --git a/src/app/main.rs b/src/app/main.rs index 450fdcae3..379092be7 100644 --- a/src/app/main.rs +++ b/src/app/main.rs @@ -23,7 +23,7 @@ use proxy::{ http::{ balance, client, insert_target, metrics::timestamp_request_open, normalize_uri, router, }, - limit, reconnect, timeout, + limit, reconnect, timeout }; use svc::{self, Layer as _Layer, Stack as _Stack}; use tap; @@ -260,7 +260,7 @@ where resolver, ))) .and_then(outbound::orig_proto_upgrade::Layer::new()) - .and_then(outbound::tls_config::Layer::new(tls_client_config)) + .and_then(svc::watch::layer(tls_client_config)) .and_then(proxy::http::metrics::Layer::new( http_metrics, classify::Classify, diff --git a/src/app/outbound.rs b/src/app/outbound.rs index 1f852f105..a66224a60 100644 --- a/src/app/outbound.rs +++ b/src/app/outbound.rs @@ -4,9 +4,9 @@ use std::fmt; use app::Destination; use control::destination::{Metadata, ProtocolHint}; use proxy::http::{client, router, normalize_uri::ShouldNormalizeUri}; -use svc::stack_per_request::ShouldStackPerRequest; +use svc::{self, stack_per_request::ShouldStackPerRequest}; use tap; -use transport::connect; +use transport::{connect, tls}; #[derive(Clone, Debug)] pub struct Endpoint { @@ -42,6 +42,21 @@ impl ShouldStackPerRequest for Endpoint { } } +impl svc::watch::WithUpdate for Endpoint { + type Updated = Self; + + fn with_update(&self, client_config: &tls::ConditionalClientConfig) -> Self::Updated { + let mut ep = self.clone(); + ep.connect.tls = ep.metadata.tls_identity().and_then(|identity| { + client_config.as_ref().map(|config| tls::ConnectionConfig { + server_identity: identity.clone(), + config: config.clone(), + }) + }); + ep + } +} + impl fmt::Display for Endpoint { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { self.connect.addr.fmt(f) @@ -242,91 +257,3 @@ pub mod orig_proto_upgrade { } } } - -pub mod tls_config { - use futures_watch::Watch; - - use super::Endpoint; - use svc; - use transport::tls; - - #[derive(Debug, Clone)] - pub struct Layer{ - watch: Watch, - } - - #[derive(Clone, Debug)] - pub struct Stack> { - watch: Watch, - inner: M, - } - - #[derive(Clone, Debug)] - pub struct StackEndpointWithTls> { - endpoint: Endpoint, - inner: M, - } - - impl Layer { - pub fn new(watch: Watch) -> Self { - Layer { - watch, - } - } - } - - impl svc::Layer for Layer - where - M: svc::Stack + Clone, - { - type Value = as svc::Stack>::Value; - type Error = as svc::Stack>::Error; - type Stack = Stack; - - fn bind(&self, inner: M) -> Self::Stack { - Stack { - inner, - watch: self.watch.clone(), - } - } - } - - impl svc::Stack for Stack - where - M: svc::Stack + Clone, - { - type Value = svc::watch::Service>; - type Error = M::Error; - - fn make(&self, endpoint: &Endpoint) -> Result { - let inner = StackEndpointWithTls { - endpoint: endpoint.clone(), - inner: self.inner.clone(), - }; - svc::watch::Service::try(self.watch.clone(), inner) - } - } - - impl svc::Stack for StackEndpointWithTls - where - M: svc::Stack, - { - type Value = M::Value; - type Error = M::Error; - - fn make( - &self, - client_config: &tls::ConditionalClientConfig, - ) -> Result { - let mut endpoint = self.endpoint.clone(); - endpoint.connect.tls = endpoint.metadata.tls_identity().and_then(|identity| { - client_config.as_ref().map(|config| tls::ConnectionConfig { - server_identity: identity.clone(), - config: config.clone(), - }) - }); - - self.inner.make(&endpoint) - } - } -}