diff --git a/proxy/src/control/discovery.rs b/proxy/src/control/discovery.rs index 4d883c168..74a40f245 100644 --- a/proxy/src/control/discovery.rs +++ b/proxy/src/control/discovery.rs @@ -29,7 +29,7 @@ use transport::DnsNameAndPort; use control::cache::{Cache, CacheChange, Exists}; -use ::telemetry::metrics::{DstLabels, Labeled}; +use ::telemetry::metrics::DstLabels; /// A handle to start watching a destination for address changes. #[derive(Clone, Debug)] @@ -274,7 +274,7 @@ where type Request = B::Request; type Response = B::Response; type Error = B::Error; - type Service = Labeled; + type Service = B::Service; type DiscoverError = (); fn poll(&mut self) -> Poll, Self::DiscoverError> { @@ -295,7 +295,6 @@ where let endpoint = Endpoint::new(addr, labels_watch.clone()); let service = self.bind.bind(&endpoint) - .map(|svc| Labeled::new(svc, labels_watch)) .map_err(|_| ())?; return Ok(Async::Ready(Change::Insert(addr, service))) diff --git a/proxy/src/ctx/http.rs b/proxy/src/ctx/http.rs index 80b366698..f72754c0e 100644 --- a/proxy/src/ctx/http.rs +++ b/proxy/src/ctx/http.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use control::discovery::DstLabelsWatch; use ctx; -use telemetry::metrics::DstLabels; /// Describes a stream's request headers. @@ -20,11 +19,6 @@ pub struct Request { /// Identifies the proxy client that dispatched the request. pub client: Arc, - - /// Optional information on the request's destination service, which may - /// be provided by the control plane for destinations lookups against its - /// discovery API. - pub dst_labels: Option, } /// Describes a stream's response headers. @@ -49,19 +43,12 @@ impl Request { client: &Arc, id: usize, ) -> Arc { - // Look up whether the request has been extended with optional - // destination labels from the control plane's discovery API. - let dst_labels = request - .extensions() - .get::() - .cloned(); let r = Self { id, uri: request.uri().clone(), method: request.method().clone(), server: Arc::clone(server), client: Arc::clone(client), - dst_labels, }; Arc::new(r) diff --git a/proxy/src/outbound.rs b/proxy/src/outbound.rs index 4b5b27610..7108c1468 100644 --- a/proxy/src/outbound.rs +++ b/proxy/src/outbound.rs @@ -18,7 +18,6 @@ use bind::{self, Bind, Protocol}; use control::{self, discovery}; use control::discovery::Bind as BindTrait; use ctx; -use telemetry::metrics; use timeout::Timeout; use transparency::h1; use transport::{DnsNameAndPort, Host, HostAndPort}; @@ -170,7 +169,7 @@ where type Request = http::Request; type Response = bind::HttpResponse; type Error = ::Error; - type Service = metrics::Labeled>; + type Service = bind::Service; type DiscoverError = BindError; fn poll(&mut self) -> Poll, Self::DiscoverError> { @@ -185,9 +184,6 @@ where // closing down when the connection is no longer usable. if let Some((addr, bind)) = opt.take() { let svc = bind.bind(&addr.into()) - // The controller has no labels to add to an external - // service. - .map(metrics::Labeled::none) .map_err(|_| BindError::External{ addr })?; Ok(Async::Ready(Change::Insert(addr, svc))) } else { diff --git a/proxy/src/telemetry/metrics/labels.rs b/proxy/src/telemetry/metrics/labels.rs index 2f00330b8..bc580bde5 100644 --- a/proxy/src/telemetry/metrics/labels.rs +++ b/proxy/src/telemetry/metrics/labels.rs @@ -1,21 +1,9 @@ - -use futures::Poll; -use futures_watch::Watch; -use http; -use tower::Service; - use std::fmt::{self, Write}; use std::sync::Arc; -use ctx; +use http; -/// Middleware that adds an extension containing an optional set of metric -/// labels to requests. -#[derive(Clone, Debug)] -pub struct Labeled { - metric_labels: Option>>, - inner: T, -} +use ctx; #[derive(Clone, Debug, Eq, PartialEq, Hash)] pub struct RequestLabels { @@ -63,56 +51,14 @@ enum Direction { #[derive(Clone, Debug, Hash, Eq, PartialEq)] pub struct DstLabels(Arc); -// ===== impl Labeled ===== - -impl Labeled { - - /// Wrap `inner` with a `Watch` on dyanmically updated labels. - pub fn new(inner: T, watch: Watch>) -> Self { - Self { - metric_labels: Some(watch), - inner, - } - } - - /// Wrap `inner` with no `metric_labels`. - pub fn none(inner: T) -> Self { - Self { metric_labels: None, inner } - } -} - -impl Service for Labeled -where - T: Service>, -{ - type Request = T::Request; - type Response= T::Response; - type Error = T::Error; - type Future = T::Future; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.inner.poll_ready() - } - - fn call(&mut self, req: Self::Request) -> Self::Future { - let mut req = req; - if let Some(labels) = self.metric_labels.as_ref() - .and_then(|labels| (*labels.borrow()).as_ref().cloned()) - { - req.extensions_mut().insert(labels); - } - self.inner.call(req) - } - -} - // ===== impl RequestLabels ===== impl<'a> RequestLabels { pub fn new(req: &ctx::http::Request) -> Self { let direction = Direction::from_context(req.server.proxy.as_ref()); - let outbound_labels = req.dst_labels.as_ref().cloned(); + let outbound_labels = req.dst_labels() + .and_then(|b| b.borrow().clone()); let authority = req.uri .authority_part() @@ -282,87 +228,3 @@ impl fmt::Display for DstLabels { write!(f, "{}", self.0) } } - -#[cfg(test)] -mod test { - use super::*; - - use futures::{Async, Poll, Future}; - use futures::future::{self, FutureResult}; - use http; - use tower::Service; - - - struct MockInnerService; - - impl Service for MockInnerService { - type Request = http::Request<()>; - type Response = Option; - type Error = (); - type Future = FutureResult; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) - } - - fn call(&mut self, req: Self::Request) -> Self::Future { - future::ok(req.extensions() - .get::() - .map(|&DstLabels(ref inner)| inner.as_ref().to_string())) - } - } - - #[test] - fn no_labels() { - let mut labeled = Labeled { - metric_labels: None, - inner: MockInnerService, - }; - let labels = labeled.call(http::Request::new(())) - .wait().expect("call"); - assert_eq!(None, labels); - } - - #[test] - fn one_label() { - let (watch, _) = - Watch::new(DstLabels::new(vec![("foo", "bar")])); - let mut labeled = Labeled { - metric_labels: Some(watch), - inner: MockInnerService, - }; - let labels = labeled.call(http::Request::new(())) - .wait().expect("call"); - assert_eq!(Some("dst_foo=\"bar\"".to_string()), labels); - } - - #[test] - fn label_updates() { - let (watch, mut store) = - Watch::new(DstLabels::new(vec![("foo", "bar")])); - let mut labeled = Labeled { - metric_labels: Some(watch), - inner: MockInnerService, - }; - - let labels = labeled.call(http::Request::new(())) - .wait().expect("first call"); - assert_eq!(Some("dst_foo=\"bar\"".to_string()), labels); - - store.store(DstLabels::new(vec![("foo", "baz")])) - .expect("store (\"foo\", \"baz\")"); - let labels = labeled.call(http::Request::new(())) - .wait().expect("second call"); - assert_eq!(Some("dst_foo=\"baz\"".to_string()), labels); - - store.store(DstLabels::new(vec![ - ("foo", "baz"), - ("quux", "quuux") - ])) - .expect("store (\"foo\", \"baz\"), (\"quux\", \"quuux\")"); - let labels = labeled.call(http::Request::new(())) - .wait().expect("third call"); - assert_eq!(Some("dst_foo=\"baz\",dst_quux=\"quuux\"".to_string()), labels); - } - -} diff --git a/proxy/src/telemetry/metrics/mod.rs b/proxy/src/telemetry/metrics/mod.rs index e2ccca024..e003f54a7 100644 --- a/proxy/src/telemetry/metrics/mod.rs +++ b/proxy/src/telemetry/metrics/mod.rs @@ -51,7 +51,7 @@ mod latency; use self::labels::{RequestLabels, ResponseLabels}; use self::latency::{BUCKET_BOUNDS, Histogram}; -pub use self::labels::{DstLabels, Labeled}; +pub use self::labels::DstLabels; #[derive(Debug, Clone)] struct Metrics {