From 91299c67a008d45d38ffd2d37540249b5df66bf0 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Thu, 19 Apr 2018 15:36:01 -0700 Subject: [PATCH] proxy: Remove the `Labeled` middleware in favor of client context labels (#812) The `Labeled` middleware is used to add `DstLabels` to each request. Now that each client context maintains a watch on its endpoint's `DstLabels`, the `Labeled` middleware can safely be removed. This has one subtle behavior change: labels are associated with requests _lazily_, whereas before they were determined _eagerly_. This means that if an endpoints labels are updated before the telemetry system captures the labels for the request, it may use the newer labels. Previously, it would only use the labels at the time that the request originated. --- proxy/src/control/discovery.rs | 5 +- proxy/src/ctx/http.rs | 13 --- proxy/src/outbound.rs | 6 +- proxy/src/telemetry/metrics/labels.rs | 146 +------------------------- proxy/src/telemetry/metrics/mod.rs | 2 +- 5 files changed, 8 insertions(+), 164 deletions(-) diff --git a/proxy/src/control/discovery.rs b/proxy/src/control/discovery.rs index 4d883c168..74a40f245 100644 --- a/proxy/src/control/discovery.rs +++ b/proxy/src/control/discovery.rs @@ -29,7 +29,7 @@ use transport::DnsNameAndPort; use control::cache::{Cache, CacheChange, Exists}; -use ::telemetry::metrics::{DstLabels, Labeled}; +use ::telemetry::metrics::DstLabels; /// A handle to start watching a destination for address changes. #[derive(Clone, Debug)] @@ -274,7 +274,7 @@ where type Request = B::Request; type Response = B::Response; type Error = B::Error; - type Service = Labeled; + type Service = B::Service; type DiscoverError = (); fn poll(&mut self) -> Poll, Self::DiscoverError> { @@ -295,7 +295,6 @@ where let endpoint = Endpoint::new(addr, labels_watch.clone()); let service = self.bind.bind(&endpoint) - .map(|svc| Labeled::new(svc, labels_watch)) .map_err(|_| ())?; return Ok(Async::Ready(Change::Insert(addr, service))) diff --git a/proxy/src/ctx/http.rs b/proxy/src/ctx/http.rs index 80b366698..f72754c0e 100644 --- a/proxy/src/ctx/http.rs +++ b/proxy/src/ctx/http.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use control::discovery::DstLabelsWatch; use ctx; -use telemetry::metrics::DstLabels; /// Describes a stream's request headers. @@ -20,11 +19,6 @@ pub struct Request { /// Identifies the proxy client that dispatched the request. pub client: Arc, - - /// Optional information on the request's destination service, which may - /// be provided by the control plane for destinations lookups against its - /// discovery API. - pub dst_labels: Option, } /// Describes a stream's response headers. @@ -49,19 +43,12 @@ impl Request { client: &Arc, id: usize, ) -> Arc { - // Look up whether the request has been extended with optional - // destination labels from the control plane's discovery API. - let dst_labels = request - .extensions() - .get::() - .cloned(); let r = Self { id, uri: request.uri().clone(), method: request.method().clone(), server: Arc::clone(server), client: Arc::clone(client), - dst_labels, }; Arc::new(r) diff --git a/proxy/src/outbound.rs b/proxy/src/outbound.rs index 4b5b27610..7108c1468 100644 --- a/proxy/src/outbound.rs +++ b/proxy/src/outbound.rs @@ -18,7 +18,6 @@ use bind::{self, Bind, Protocol}; use control::{self, discovery}; use control::discovery::Bind as BindTrait; use ctx; -use telemetry::metrics; use timeout::Timeout; use transparency::h1; use transport::{DnsNameAndPort, Host, HostAndPort}; @@ -170,7 +169,7 @@ where type Request = http::Request; type Response = bind::HttpResponse; type Error = ::Error; - type Service = metrics::Labeled>; + type Service = bind::Service; type DiscoverError = BindError; fn poll(&mut self) -> Poll, Self::DiscoverError> { @@ -185,9 +184,6 @@ where // closing down when the connection is no longer usable. if let Some((addr, bind)) = opt.take() { let svc = bind.bind(&addr.into()) - // The controller has no labels to add to an external - // service. - .map(metrics::Labeled::none) .map_err(|_| BindError::External{ addr })?; Ok(Async::Ready(Change::Insert(addr, svc))) } else { diff --git a/proxy/src/telemetry/metrics/labels.rs b/proxy/src/telemetry/metrics/labels.rs index 2f00330b8..bc580bde5 100644 --- a/proxy/src/telemetry/metrics/labels.rs +++ b/proxy/src/telemetry/metrics/labels.rs @@ -1,21 +1,9 @@ - -use futures::Poll; -use futures_watch::Watch; -use http; -use tower::Service; - use std::fmt::{self, Write}; use std::sync::Arc; -use ctx; +use http; -/// Middleware that adds an extension containing an optional set of metric -/// labels to requests. -#[derive(Clone, Debug)] -pub struct Labeled { - metric_labels: Option>>, - inner: T, -} +use ctx; #[derive(Clone, Debug, Eq, PartialEq, Hash)] pub struct RequestLabels { @@ -63,56 +51,14 @@ enum Direction { #[derive(Clone, Debug, Hash, Eq, PartialEq)] pub struct DstLabels(Arc); -// ===== impl Labeled ===== - -impl Labeled { - - /// Wrap `inner` with a `Watch` on dyanmically updated labels. - pub fn new(inner: T, watch: Watch>) -> Self { - Self { - metric_labels: Some(watch), - inner, - } - } - - /// Wrap `inner` with no `metric_labels`. - pub fn none(inner: T) -> Self { - Self { metric_labels: None, inner } - } -} - -impl Service for Labeled -where - T: Service>, -{ - type Request = T::Request; - type Response= T::Response; - type Error = T::Error; - type Future = T::Future; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.inner.poll_ready() - } - - fn call(&mut self, req: Self::Request) -> Self::Future { - let mut req = req; - if let Some(labels) = self.metric_labels.as_ref() - .and_then(|labels| (*labels.borrow()).as_ref().cloned()) - { - req.extensions_mut().insert(labels); - } - self.inner.call(req) - } - -} - // ===== impl RequestLabels ===== impl<'a> RequestLabels { pub fn new(req: &ctx::http::Request) -> Self { let direction = Direction::from_context(req.server.proxy.as_ref()); - let outbound_labels = req.dst_labels.as_ref().cloned(); + let outbound_labels = req.dst_labels() + .and_then(|b| b.borrow().clone()); let authority = req.uri .authority_part() @@ -282,87 +228,3 @@ impl fmt::Display for DstLabels { write!(f, "{}", self.0) } } - -#[cfg(test)] -mod test { - use super::*; - - use futures::{Async, Poll, Future}; - use futures::future::{self, FutureResult}; - use http; - use tower::Service; - - - struct MockInnerService; - - impl Service for MockInnerService { - type Request = http::Request<()>; - type Response = Option; - type Error = (); - type Future = FutureResult; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) - } - - fn call(&mut self, req: Self::Request) -> Self::Future { - future::ok(req.extensions() - .get::() - .map(|&DstLabels(ref inner)| inner.as_ref().to_string())) - } - } - - #[test] - fn no_labels() { - let mut labeled = Labeled { - metric_labels: None, - inner: MockInnerService, - }; - let labels = labeled.call(http::Request::new(())) - .wait().expect("call"); - assert_eq!(None, labels); - } - - #[test] - fn one_label() { - let (watch, _) = - Watch::new(DstLabels::new(vec![("foo", "bar")])); - let mut labeled = Labeled { - metric_labels: Some(watch), - inner: MockInnerService, - }; - let labels = labeled.call(http::Request::new(())) - .wait().expect("call"); - assert_eq!(Some("dst_foo=\"bar\"".to_string()), labels); - } - - #[test] - fn label_updates() { - let (watch, mut store) = - Watch::new(DstLabels::new(vec![("foo", "bar")])); - let mut labeled = Labeled { - metric_labels: Some(watch), - inner: MockInnerService, - }; - - let labels = labeled.call(http::Request::new(())) - .wait().expect("first call"); - assert_eq!(Some("dst_foo=\"bar\"".to_string()), labels); - - store.store(DstLabels::new(vec![("foo", "baz")])) - .expect("store (\"foo\", \"baz\")"); - let labels = labeled.call(http::Request::new(())) - .wait().expect("second call"); - assert_eq!(Some("dst_foo=\"baz\"".to_string()), labels); - - store.store(DstLabels::new(vec![ - ("foo", "baz"), - ("quux", "quuux") - ])) - .expect("store (\"foo\", \"baz\"), (\"quux\", \"quuux\")"); - let labels = labeled.call(http::Request::new(())) - .wait().expect("third call"); - assert_eq!(Some("dst_foo=\"baz\",dst_quux=\"quuux\"".to_string()), labels); - } - -} diff --git a/proxy/src/telemetry/metrics/mod.rs b/proxy/src/telemetry/metrics/mod.rs index e2ccca024..e003f54a7 100644 --- a/proxy/src/telemetry/metrics/mod.rs +++ b/proxy/src/telemetry/metrics/mod.rs @@ -51,7 +51,7 @@ mod latency; use self::labels::{RequestLabels, ResponseLabels}; use self::latency::{BUCKET_BOUNDS, Histogram}; -pub use self::labels::{DstLabels, Labeled}; +pub use self::labels::DstLabels; #[derive(Debug, Clone)] struct Metrics {