diff --git a/Cargo.lock b/Cargo.lock index 1d2e87cb2..631b1f6b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -108,6 +108,7 @@ dependencies = [ "env_logger 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", "futures-mpsc-lossy 0.3.0", + "futures-watch 0.1.0 (git+https://github.com/carllerche/better-future.git)", "h2 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "http 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "httparse 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)", @@ -247,6 +248,15 @@ dependencies = [ "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "futures-watch" +version = "0.1.0" +source = "git+https://github.com/carllerche/better-future.git#07baa13e91fefe7a51533dfde7b4e69e109ebe14" +dependencies = [ + "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "h2" version = "0.1.5" @@ -957,6 +967,7 @@ dependencies = [ "checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" "checksum futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)" = "0bab5b5e94f5c31fc764ba5dd9ad16568aae5d4825538c01d6bca680c9bf94a7" "checksum futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ab90cde24b3319636588d0c35fe03b1333857621051837ed769faefb4c2162e4" +"checksum futures-watch 0.1.0 (git+https://github.com/carllerche/better-future.git)" = "" "checksum h2 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "065fb096fc65bbfb9c765d48c9f3f1a21cdb25ba0d3f82105b38f30ddffa2f7e" "checksum heck 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ea04fa3ead4e05e51a7c806fc07271fdbde4e246a6c6d1efd52e72230b771b82" "checksum http 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "75df369fd52c60635208a4d3e694777c099569b3dcf4844df8f652dc004644ab" diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 10ac58ef1..5e2e46302 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -19,6 +19,7 @@ bytes = "0.4" domain = "0.2.3" env_logger = { version = "0.5", default-features = false } futures = "0.1" +futures-watch = { git = "https://github.com/carllerche/better-future.git" } h2 = "0.1.5" http = "0.1" httparse = "1.2" diff --git a/proxy/src/control/discovery.rs b/proxy/src/control/discovery.rs index ea5e0346a..fb22a9109 100644 --- a/proxy/src/control/discovery.rs +++ b/proxy/src/control/discovery.rs @@ -1,11 +1,15 @@ use std::collections::VecDeque; use std::collections::hash_map::{Entry, HashMap}; -use std::net::SocketAddr; use std::fmt; +use std::iter::IntoIterator; +use std::net::SocketAddr; use std::time::Duration; +use std::sync::Arc; use futures::{Async, Future, Poll, Stream}; use futures::sync::mpsc; +use futures_watch; +use http; use tokio_core::reactor::Handle; use tower::Service; use tower_h2::{HttpService, BoxBody, RecvBody}; @@ -16,13 +20,18 @@ use dns::{self, IpAddrListFuture}; use super::fully_qualified_authority::FullyQualifiedAuthority; use conduit_proxy_controller_grpc::common::{Destination, TcpAddress}; -use conduit_proxy_controller_grpc::destination::Update as PbUpdate; +use conduit_proxy_controller_grpc::destination::{ + Update as PbUpdate, + WeightedAddr, +}; use conduit_proxy_controller_grpc::destination::update::Update as PbUpdate2; use conduit_proxy_controller_grpc::destination::client::{Destination as DestinationSvc}; use transport::DnsNameAndPort; use control::cache::{Cache, CacheChange, Exists}; +use ::telemetry::metrics::prometheus::{DstLabels, Labeled}; + /// A handle to start watching a destination for address changes. #[derive(Clone, Debug)] pub struct Discovery { @@ -33,6 +42,12 @@ pub struct Discovery { #[derive(Debug)] pub struct Watch { rx: mpsc::UnboundedReceiver, + /// Map associating addresses with the `Store` for the watch on that + /// service's metric labels (as provided by the Destination service). + /// + /// This is used to update the `Labeled` middleware on those services + /// without requiring the service stack to be re-bound. + metric_labels: HashMap>>, bind: B, } @@ -60,8 +75,15 @@ pub struct DiscoveryWork> { rx: mpsc::UnboundedReceiver<(DnsNameAndPort, mpsc::UnboundedSender)>, } +/// Any additional metadata describing a discovered service. +#[derive(Clone, Debug, Hash, Eq, PartialEq)] +pub struct Metadata { + /// A set of Prometheus metric labels describing the destination. + metric_labels: Option, +} + struct DestinationSet> { - addrs: Exists>, + addrs: Exists>, query: Option>, dns_query: Option, txs: Vec>, @@ -107,8 +129,9 @@ enum RxError { #[derive(Debug)] enum Update { - Insert(SocketAddr), + Insert(SocketAddr, Metadata), Remove(SocketAddr), + ChangeMetadata(SocketAddr, Metadata), } /// Bind a `SocketAddr` with a protocol. @@ -162,6 +185,7 @@ impl Discovery { Watch { rx, + metric_labels: HashMap::new(), bind, } } @@ -169,37 +193,77 @@ impl Discovery { // ==== impl Watch ===== -impl Discover for Watch +impl Watch { + fn update_metadata(&mut self, + addr: SocketAddr, + meta: Metadata) + -> Result<(), ()> + { + if let Some(store) = self.metric_labels.get_mut(&addr) { + store.store(meta.metric_labels) + .map_err(|e| { + error!("update_metadata: label store error: {:?}", e); + }) + .map(|_| ()) + } else { + // The store has already been removed, so nobody cares about + // the metadata change. We expect that this shouldn't happen, + // but if it does, log a warning and handle it gracefully. + warn!( + "update_metadata: ignoring ChangeMetadata for {:?} \ + because the service no longer exists.", + addr + ); + Ok(()) + } + } +} + +impl Discover for Watch where - B: Bind, + B: Bind>, { type Key = SocketAddr; type Request = B::Request; type Response = B::Response; type Error = B::Error; - type Service = B::Service; + type Service = Labeled; type DiscoverError = (); fn poll(&mut self) -> Poll, Self::DiscoverError> { - let up = self.rx.poll(); - trace!("watch: {:?}", up); - let update = match up { - Ok(Async::Ready(Some(update))) => update, - Ok(Async::Ready(None)) => unreachable!(), - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(_) => return Err(()), - }; + loop { + let up = self.rx.poll(); + trace!("watch: {:?}", up); + let update = try_ready!(up).expect("discovery stream must be infinite"); - match update { - Update::Insert(addr) => { - let service = self.bind.bind(&addr).map_err(|_| ())?; + match update { + Update::Insert(addr, meta) => { + // Construct a watch for the `Labeled` middleware that will + // wrap the bound service, and insert the store into our map + // so it can be updated later. + let (labels_watch, labels_store) = + futures_watch::Watch::new(meta.metric_labels); + self.metric_labels.insert(addr, labels_store); - Ok(Async::Ready(Change::Insert(addr, service))) - }, - // TODO: handle metadata changes by changing the labeling - // middleware to hold a `futures-watch::Watch` on the label value, - // so it can be updated. - Update::Remove(addr) => Ok(Async::Ready(Change::Remove(addr))), + let service = self.bind.bind(&addr) + .map(|svc| Labeled::new(svc, labels_watch)) + .map_err(|_| ())?; + + return Ok(Async::Ready(Change::Insert(addr, service))) + }, + Update::ChangeMetadata(addr, meta) => { + // Update metadata and continue polling `rx`. + self.update_metadata(addr, meta)?; + }, + Update::Remove(addr) => { + // It's safe to drop the store handle here, even if + // the `Labeled` middleware using the watch handle + // still exists --- it will simply read the final + // value from the watch. + self.metric_labels.remove(&addr); + return Ok(Async::Ready(Change::Remove(addr))); + }, + } } } } @@ -278,8 +342,12 @@ where // them onto the new watch first match set.addrs { Exists::Yes(ref cache) => { - for (&addr, _) in cache { - tx.unbounded_send(Update::Insert(addr)) + for (&addr, meta) in cache { + let update = Update::Insert( + addr, + meta.clone() + ); + tx.unbounded_send(update) .expect("unbounded_send does not fail"); } }, @@ -448,17 +516,19 @@ impl DestinationSet match rx.poll() { Ok(Async::Ready(Some(update))) => match update.update { Some(PbUpdate2::Add(a_set)) => { - exists = Exists::Yes(()); - self.add( - auth, - a_set.addrs.iter().filter_map( - |addr| addr.addr.clone().and_then(pb_to_sock_addr))); + let set_labels = Arc::new(a_set.metric_labels); + let addrs = a_set.addrs.into_iter() + .filter_map(|pb| + pb_to_addr_meta(pb, &set_labels) + ); + self.add(auth, addrs) }, Some(PbUpdate2::Remove(r_set)) => { exists = Exists::Yes(()); self.remove( auth, - r_set.addrs.iter().filter_map(|addr| pb_to_sock_addr(addr.clone()))); + r_set.addrs.iter().filter_map(|addr| pb_to_sock_addr(addr.clone())) + ); }, Some(PbUpdate2::NoEndpoints(ref no_endpoints)) if no_endpoints.exists => { exists = Exists::Yes(()); @@ -501,7 +571,7 @@ impl DestinationSet Ok(Async::Ready(dns::Response::Exists(ips))) => { trace!("positive result of DNS query for {:?}: {:?}", authority, ips); self.add(authority, ips.iter().map(|ip| { - SocketAddr::from((*ip, authority.port)) + (SocketAddr::from((*ip, authority.port)), Metadata::no_metadata()) })); }, Ok(Async::Ready(dns::Response::DoesNotExist)) => { @@ -533,16 +603,22 @@ impl > DestinationSet { } fn add(&mut self, authority_for_logging: &DnsNameAndPort, addrs_to_add: A) - where A: Iterator + where A: Iterator { let mut cache = match self.addrs.take() { Exists::Yes(mut cache) => cache, Exists::Unknown | Exists::No => Cache::new(), }; cache.update_union( - addrs_to_add.map(|a| (a, ())), - &mut |(addr, _), change| Self::on_change(&mut self.txs, authority_for_logging, addr, - change)); + addrs_to_add, + &mut |(addr, meta), change| Self::on_change( + &mut self.txs, + authority_for_logging, + addr, + meta, + change, + ) + ); self.addrs = Exists::Yes(cache); } @@ -553,8 +629,14 @@ impl > DestinationSet { Exists::Yes(mut cache) => { cache.remove( addrs_to_remove, - &mut |(addr, _), change| Self::on_change(&mut self.txs, authority_for_logging, addr, - change)); + &mut |(addr, meta), change| Self::on_change( + &mut self.txs, + authority_for_logging, + addr, + meta, + change, + ) + ); cache }, Exists::Unknown | Exists::No => Cache::new(), @@ -568,8 +650,14 @@ impl > DestinationSet { match self.addrs.take() { Exists::Yes(mut cache) => { cache.clear( - &mut |(addr, _), change| Self::on_change(&mut self.txs, authority_for_logging, addr, - change)); + &mut |(addr, meta), change| Self::on_change( + &mut self.txs, + authority_for_logging, + addr, + meta, + change + ) + ); }, Exists::Unknown | Exists::No => (), }; @@ -583,20 +671,20 @@ impl > DestinationSet { fn on_change(txs: &mut Vec>, authority_for_logging: &DnsNameAndPort, addr: SocketAddr, + meta: Metadata, change: CacheChange) { - let (update_str, update_constructor): (&'static str, fn(SocketAddr) -> Update) = + let (update_str, update_constructor): (&'static str, fn(SocketAddr, Metadata) -> Update) = match change { CacheChange::Insertion => ("insert", Update::Insert), - CacheChange::Removal => ("remove", Update::Remove), - CacheChange::Modification => { - // TODO: generate `ChangeMetadata` events. - return; - } + CacheChange::Removal => + ("remove", |addr, _| Update::Remove(addr)), + CacheChange::Modification => + ("change metadata for", Update::ChangeMetadata), }; trace!("{} {:?} for {:?}", update_str, addr, authority_for_logging); // retain is used to drop any senders that are dead txs.retain(|tx| { - tx.unbounded_send(update_constructor(addr)).is_ok() + tx.unbounded_send(update_constructor(addr, meta.clone())).is_ok() }); } } @@ -644,7 +732,29 @@ where T: HttpService, } } -// ===== impl RxError ===== +// ===== impl Metadata ===== + +impl Metadata { + fn no_metadata() -> Self { + Metadata { + metric_labels: None, + } + } +} + +/// Construct a new labeled `SocketAddr `from a protobuf `WeightedAddr`. +fn pb_to_addr_meta(pb: WeightedAddr, set_labels: &Arc>) + -> Option<(SocketAddr, Metadata)> { + let addr = pb.addr.and_then(pb_to_sock_addr)?; + let label_iter = + set_labels.as_ref() + .iter() + .chain(pb.metric_labels.iter()); + let meta = Metadata { + metric_labels: DstLabels::new(label_iter), + }; + Some((addr, meta)) +} fn pb_to_sock_addr(pb: TcpAddress) -> Option { use conduit_proxy_controller_grpc::common::ip_address::Ip; diff --git a/proxy/src/ctx/http.rs b/proxy/src/ctx/http.rs index 420cbd729..652d37a23 100644 --- a/proxy/src/ctx/http.rs +++ b/proxy/src/ctx/http.rs @@ -2,6 +2,7 @@ use http; use std::sync::Arc; use ctx; +use telemetry::metrics::prometheus; /// Describes a stream's request headers. #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -17,6 +18,11 @@ pub struct Request { /// Identifies the proxy client that dispatched the request. pub client: Arc, + + /// Optional information on the request's destination service, which may + /// be provided by the control plane for destinations lookups against its + /// discovery API. + pub dst_labels: Option, } /// Describes a stream's response headers. @@ -41,12 +47,19 @@ impl Request { client: &Arc, id: usize, ) -> Arc { + // Look up whether the request has been extended with optional + // destination labels from the control plane's discovery API. + let dst_labels = request + .extensions() + .get::() + .cloned(); let r = Self { id, uri: request.uri().clone(), method: request.method().clone(), server: Arc::clone(server), client: Arc::clone(client), + dst_labels, }; Arc::new(r) diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 0a13f6b26..8646e547e 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -11,6 +11,7 @@ extern crate env_logger; #[macro_use] extern crate futures; extern crate futures_mpsc_lossy; +extern crate futures_watch; extern crate h2; extern crate http; extern crate httparse; diff --git a/proxy/src/outbound.rs b/proxy/src/outbound.rs index 9c58c78f3..426e78e6b 100644 --- a/proxy/src/outbound.rs +++ b/proxy/src/outbound.rs @@ -18,6 +18,7 @@ use bind::{self, Bind, Protocol}; use control::{self, discovery}; use control::discovery::Bind as BindTrait; use ctx; +use telemetry::metrics::prometheus; use timeout::Timeout; use transparency::h1; use transport::{DnsNameAndPort, Host, HostAndPort}; @@ -168,8 +169,8 @@ where type Key = SocketAddr; type Request = http::Request; type Response = bind::HttpResponse; - type Error = as tower::Service>::Error; - type Service = bind::Service; + type Error = ::Error; + type Service = prometheus::Labeled>; type DiscoverError = BindError; fn poll(&mut self) -> Poll, Self::DiscoverError> { @@ -184,6 +185,9 @@ where // closing down when the connection is no longer usable. if let Some((addr, bind)) = opt.take() { let svc = bind.bind(&addr) + // The controller has no labels to add to an external + // service. + .map(prometheus::Labeled::none) .map_err(|_| BindError::External{ addr })?; Ok(Async::Ready(Change::Insert(addr, svc))) } else { diff --git a/proxy/src/telemetry/metrics/prometheus/labels.rs b/proxy/src/telemetry/metrics/prometheus/labels.rs index c47f518cc..411548b4c 100644 --- a/proxy/src/telemetry/metrics/prometheus/labels.rs +++ b/proxy/src/telemetry/metrics/prometheus/labels.rs @@ -1,13 +1,31 @@ +use futures::Poll; +use futures_watch::Watch; use http; -use std::fmt; +use tower::Service; + +use std::fmt::{self, Write}; +use std::sync::Arc; use ctx; -#[derive(Clone, Debug, Default, Eq, PartialEq, Hash)] +/// Middleware that adds an extension containing an optional set of metric +/// labels to requests. +#[derive(Clone, Debug)] +pub struct Labeled { + metric_labels: Option>>, + inner: T, +} + +#[derive(Clone, Debug, Eq, PartialEq, Hash)] pub struct RequestLabels { - outbound_labels: Option, + /// Was the request in the inbound or outbound direction? + direction: Direction, + + // Additional labels identifying the destination service of an outbound + // request, provided by the Conduit control plane's service discovery. + outbound_labels: Option, /// The value of the `:authority` (HTTP/2) or `Host` (HTTP/1.1) header of /// the request. @@ -36,48 +54,65 @@ enum Classification { Failure, } -#[derive(Clone, Debug, Eq, PartialEq, Hash)] -// TODO: when #429 is done, this will no longer be dead code. -#[allow(dead_code)] -enum PodOwner { - /// The deployment to which this request is being sent. - Deployment(String), - - /// The job to which this request is being sent. - Job(String), - - /// The replica set to which this request is being sent. - ReplicaSet(String), - - /// The replication controller to which this request is being sent. - ReplicationController(String), +#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] +enum Direction { + Inbound, + Outbound, } -#[derive(Clone, Debug, Default, Eq, PartialEq, Hash)] -struct OutboundLabels { - /// The owner of the destination pod. - // TODO: when #429 is done, this will no longer need to be an Option. - dst: Option, +#[derive(Clone, Debug, Hash, Eq, PartialEq)] +pub struct DstLabels(Arc); - /// The namespace to which this request is being sent (if - /// applicable). - namespace: Option +// ===== impl Labeled ===== + +impl Labeled { + + /// Wrap `inner` with a `Watch` on dyanmically updated labels. + pub fn new(inner: T, watch: Watch>) -> Self { + Self { + metric_labels: Some(watch), + inner, + } + } + + /// Wrap `inner` with no `metric_labels`. + pub fn none(inner: T) -> Self { + Self { metric_labels: None, inner } + } } +impl Service for Labeled +where + T: Service>, +{ + type Request = T::Request; + type Response= T::Response; + type Error = T::Error; + type Future = T::Future; + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.inner.poll_ready() + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + let mut req = req; + if let Some(labels) = self.metric_labels.as_ref() + .and_then(|labels| (*labels.borrow()).as_ref().cloned()) + { + req.extensions_mut().insert(labels); + } + self.inner.call(req) + } + +} // ===== impl RequestLabels ===== impl<'a> RequestLabels { pub fn new(req: &ctx::http::Request) -> Self { - let outbound_labels = if req.server.proxy.is_outbound() { - Some(OutboundLabels { - // TODO: when #429 is done, add appropriate destination label. - ..Default::default() - }) - } else { - None - }; + let direction = Direction::from_context(req.server.proxy.as_ref()); + + let outbound_labels = req.dst_labels.as_ref().cloned(); let authority = req.uri .authority_part() @@ -85,32 +120,27 @@ impl<'a> RequestLabels { .unwrap_or_else(String::new); RequestLabels { + direction, outbound_labels, authority, - ..Default::default() } } } impl fmt::Display for RequestLabels { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "authority=\"{}\",", self.authority)?; + write!(f, "authority=\"{}\",{}", self.authority, self.direction)?; + if let Some(ref outbound) = self.outbound_labels { - write!(f, "direction=\"outbound\"{comma}{dst}", - comma = if !outbound.is_empty() { "," } else { "" }, - dst = outbound - )?; - } else { - write!(f, "direction=\"inbound\"")?; + // leading comma added between the direction label and the + // destination labels, if there are destination labels. + write!(f, ",{}", outbound)?; } Ok(()) } - } - // ===== impl ResponseLabels ===== impl ResponseLabels { @@ -141,63 +171,23 @@ impl ResponseLabels { } impl fmt::Display for ResponseLabels { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "{},{},status_code=\"{}\"", self.request_labels, self.classification, self.status_code )?; + if let Some(ref status) = self.grpc_status_code { + // leading comma added between the status code label and the + // gRPC status code labels, if there is a gRPC status code. write!(f, ",grpc_status_code=\"{}\"", status)?; } Ok(()) } - } -// ===== impl OutboundLabels ===== - -impl OutboundLabels { - fn is_empty(&self) -> bool { - self.namespace.is_none() && self.dst.is_none() - } -} - -impl fmt::Display for OutboundLabels { - - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match *self { - OutboundLabels { namespace: Some(ref ns), dst: Some(ref dst) } => - write!(f, "dst_namespace=\"{}\",dst_{}", ns, dst), - OutboundLabels { namespace: None, dst: Some(ref dst), } => - write!(f, "dst_{}", dst), - OutboundLabels { namespace: Some(ref ns), dst: None, } => - write!(f, "dst_namespace=\"{}\"", ns), - OutboundLabels { namespace: None, dst: None, } => - write!(f, ""), - } - } - -} - -impl fmt::Display for PodOwner { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match *self { - PodOwner::Deployment(ref s) => - write!(f, "deployment=\"{}\"", s), - PodOwner::Job(ref s) => - write!(f, "job=\"{}\",", s), - PodOwner::ReplicaSet(ref s) => - write!(f, "replica_set=\"{}\"", s), - PodOwner::ReplicationController(ref s) => - write!(f, "replication_controller=\"{}\"", s), - } - } -} - - // ===== impl Classification ===== impl Classification { @@ -228,12 +218,67 @@ impl Classification { } impl fmt::Display for Classification { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { &Classification::Success => f.pad("classification=\"success\""), &Classification::Failure => f.pad("classification=\"failure\""), } } - +} + +// ===== impl Direction ===== + +impl Direction { + fn from_context(context: &ctx::Proxy) -> Self { + match context { + &ctx::Proxy::Inbound(_) => Direction::Inbound, + &ctx::Proxy::Outbound(_) => Direction::Outbound, + } + } +} + +impl fmt::Display for Direction { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + &Direction::Inbound => f.pad("direction=\"inbound\""), + &Direction::Outbound => f.pad("direction=\"outbound\""), + } + } +} + + +// ===== impl DstLabels ==== + +impl DstLabels { + pub fn new(labels: I) -> Option + where + I: IntoIterator, + S: fmt::Display, + { + let mut labels = labels.into_iter(); + + if let Some((k, v)) = labels.next() { + // Format the first label pair without a leading comma, since we + // don't know where it is in the output labels at this point. + let mut s = format!("dst_{}=\"{}\"", k, v); + + // Format subsequent label pairs with leading commas, since + // we know that we already formatted the first label pair. + for (k, v) in labels { + write!(s, ",dst_{}=\"{}\"", k, v) + .expect("writing to string should not fail"); + } + + Some(DstLabels(Arc::from(s))) + } else { + // The iterator is empty; return None + None + } + } +} + +impl fmt::Display for DstLabels { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.0) + } } diff --git a/proxy/src/telemetry/metrics/prometheus/mod.rs b/proxy/src/telemetry/metrics/prometheus/mod.rs index cc4e0b615..b58c4c3df 100644 --- a/proxy/src/telemetry/metrics/prometheus/mod.rs +++ b/proxy/src/telemetry/metrics/prometheus/mod.rs @@ -1,3 +1,31 @@ +//! Aggregates and serves Prometheus metrics. +//! +//! # A note on label formatting +//! +//! Prometheus labels are represented as a comma-separated list of values +//! Since the Conduit proxy labels its metrics with a fixed set of labels +//! which we know in advance, we represent these labels using a number of +//! `struct`s, all of which implement `fmt::Display`. Some of the label +//! `struct`s contain other structs which represent a subset of the labels +//! which can be present on metrics in that scope. In this case, the +//! `fmt::Display` impls for those structs call the `fmt::Display` impls for +//! the structs that they own. This has the potential to complicate the +//! insertion of commas to separate label values. +//! +//! In order to ensure that commas are added correctly to separate labels, +//! we expect the `fmt::Display` implementations for label types to behave in +//! a consistent way: A label struct is *never* responsible for printing +//! leading or trailing commas before or after the label values it contains. +//! If it contains multiple labels, it *is* responsible for ensuring any +//! labels it owns are comma-separated. This way, the `fmt::Display` impl for +//! any struct that represents a subset of the labels are position-agnostic; +//! they don't need to know if there are other labels before or after them in +//! the formatted output. The owner is responsible for managing that. +//! +//! If this rule is followed consistently across all structs representing +//! labels, we can add new labels or modify the existing ones without having +//! to worry about missing commas, double commas, or trailing commas at the +//! end of the label set (all of which will make Prometheus angry). use std::default::Default; use std::{fmt, ops, time}; use std::hash::Hash; @@ -21,6 +49,7 @@ use super::latency::{BUCKET_BOUNDS, Histogram}; mod labels; use self::labels::{RequestLabels, ResponseLabels}; +pub use self::labels::{DstLabels, Labeled}; #[derive(Debug, Clone)] struct Metrics { diff --git a/proxy/src/telemetry/mod.rs b/proxy/src/telemetry/mod.rs index 671a81944..4b7949186 100644 --- a/proxy/src/telemetry/mod.rs +++ b/proxy/src/telemetry/mod.rs @@ -9,7 +9,7 @@ use ctx; mod control; pub mod event; -mod metrics; +pub mod metrics; pub mod sensor; pub mod tap; diff --git a/proxy/tests/discovery.rs b/proxy/tests/discovery.rs index 98a89b084..b5298577b 100644 --- a/proxy/tests/discovery.rs +++ b/proxy/tests/discovery.rs @@ -108,7 +108,9 @@ macro_rules! generate_tests { #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] fn outbound_times_out() { + use std::collections::HashMap; use std::thread; + let _ = env_logger::try_init(); let mut env = config::TestEnv::new(); @@ -122,7 +124,11 @@ macro_rules! generate_tests { // return the correct destination .destination_fn("disco.test.svc.cluster.local", move || { thread::sleep(Duration::from_millis(500)); - Some(controller::destination_update(addr)) + Some(controller::destination_update( + addr, + HashMap::new(), + HashMap::new(), + )) }) .run(); diff --git a/proxy/tests/support/controller.rs b/proxy/tests/support/controller.rs index 896e328bd..4076e6f54 100644 --- a/proxy/tests/support/controller.rs +++ b/proxy/tests/support/controller.rs @@ -2,7 +2,7 @@ use support::*; -use std::collections::VecDeque; +use std::collections::{HashMap, VecDeque}; use std::fmt; use std::net::IpAddr; use std::sync::{Arc, Mutex}; @@ -38,7 +38,23 @@ impl Controller { } pub fn destination(mut self, dest: &str, addr: SocketAddr) -> Self { - self.destination_fn(dest, move || Some(destination_update(addr))) + self.destination_fn(dest, move || Some(destination_update( + addr, + HashMap::new(), + HashMap::new(), + ))) + } + + pub fn labeled_destination(mut self, dest: &str, addr: SocketAddr, + addr_labels: HashMap, + set_labels:HashMap) + -> Self + { + self.destination_fn(dest, move || Some(destination_update( + addr, + addr_labels.clone(), + set_labels.clone(), + ))) } pub fn destination_fn(mut self, dest: &str, f: F) -> Self @@ -269,7 +285,11 @@ fn run(controller: Controller) -> Listening { } } -pub fn destination_update(addr: SocketAddr) -> pb::destination::Update { +pub fn destination_update(addr: SocketAddr, + set_labels: HashMap, + addr_labels: HashMap) + -> pb::destination::Update +{ pb::destination::Update { update: Some(pb::destination::update::Update::Add( pb::destination::WeightedAddrSet { @@ -280,10 +300,10 @@ pub fn destination_update(addr: SocketAddr) -> pb::destination::Update { port: u32::from(addr.port()), }), weight: 0, - ..Default::default() + metric_labels: addr_labels, }, ], - ..Default::default() + metric_labels: set_labels, }, )), } diff --git a/proxy/tests/telemetry.rs b/proxy/tests/telemetry.rs index 8c7d1dbff..f11de2ace 100644 --- a/proxy/tests/telemetry.rs +++ b/proxy/tests/telemetry.rs @@ -678,6 +678,223 @@ fn metrics_endpoint_outbound_request_duration() { "request_duration_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\"} 2"); } +// Tests for destination labels provided by control plane service discovery. +mod outbound_dst_labels { + use super::support::*; + + use std::collections::HashMap; + use std::iter::FromIterator; + + struct Fixture { + client: client::Client, + metrics: client::Client, + proxy: proxy::Listening, + } + + fn fixture(addr_labels: A, set_labels: B) -> Fixture + where + A: IntoIterator, + B: IntoIterator, + { + fixture_with_updates(vec![(addr_labels, set_labels)]) + } + + fn fixture_with_updates(updates: Vec<(A, B)>) -> Fixture + where + A: IntoIterator, + B: IntoIterator, + { + info!("running test server"); + let srv = server::new() + .route("/", "hello") + .run(); + + let mut ctrl = controller::new(); + for (addr_labels, set_labels) in updates { + ctrl = ctrl.labeled_destination( + "labeled.test.svc.cluster.local", + srv.addr, + HashMap::from_iter(addr_labels), + HashMap::from_iter(set_labels), + ); + } + + let proxy = proxy::new() + .controller(ctrl.run()) + .outbound(srv) + .metrics_flush_interval(Duration::from_millis(500)) + .run(); + let metrics = client::http1(proxy.metrics, "localhost"); + + let client = client::new( + proxy.outbound, + "labeled.test.svc.cluster.local" + ); + Fixture { client, metrics, proxy } + + } + + #[test] + fn multiple_addr_labels() { + let _ = env_logger::try_init(); + let Fixture { client, metrics, proxy: _proxy } = fixture ( + vec![ + (String::from("addr_label2"), String::from("bar")), + (String::from("addr_label1"), String::from("foo")), + ], + Vec::new(), + ); + + info!("client.get(/)"); + assert_eq!(client.get("/"), "hello"); + // We can't make more specific assertions about the metrics + // besides asserting that both labels are present somewhere in the + // scrape, because testing for whole metric lines would depend on + // the order in which the labels occur, and we can't depend on hash + // map ordering. + assert_contains!(metrics.get("/metrics"), "dst_addr_label1=\"foo\""); + assert_contains!(metrics.get("/metrics"), "dst_addr_label2=\"bar\""); + } + + #[test] + fn multiple_addrset_labels() { + let _ = env_logger::try_init(); + let Fixture { client, metrics, proxy: _proxy } = fixture ( + Vec::new(), + vec![ + (String::from("set_label1"), String::from("foo")), + (String::from("set_label2"), String::from("bar")), + ] + ); + + info!("client.get(/)"); + assert_eq!(client.get("/"), "hello"); + // We can't make more specific assertions about the metrics + // besides asserting that both labels are present somewhere in the + // scrape, because testing for whole metric lines would depend on + // the order in which the labels occur, and we can't depend on hash + // map ordering. + assert_contains!(metrics.get("/metrics"), "dst_set_label1=\"foo\""); + assert_contains!(metrics.get("/metrics"), "dst_set_label2=\"bar\""); + } + + #[test] + fn labeled_addr_and_addrset() { + let _ = env_logger::try_init(); + let Fixture { client, metrics, proxy: _proxy } = fixture( + vec![(String::from("addr_label"), String::from("foo"))], + vec![(String::from("set_label"), String::from("bar"))], + ); + + info!("client.get(/)"); + assert_eq!(client.get("/"), "hello"); + assert_contains!(metrics.get("/metrics"), + "request_duration_ms_count{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"foo\",dst_set_label=\"bar\"} 1"); + assert_contains!(metrics.get("/metrics"), + "response_duration_ms_count{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"foo\",dst_set_label=\"bar\",classification=\"success\",status_code=\"200\"} 1"); + assert_contains!(metrics.get("/metrics"), + "request_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"foo\",dst_set_label=\"bar\"} 1"); + assert_contains!(metrics.get("/metrics"), + "response_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"foo\",dst_set_label=\"bar\",classification=\"success\",status_code=\"200\"} 1"); + } + + #[test] + fn controller_updates_addr_labels() { + let _ = env_logger::try_init(); + info!("running test server"); + let Fixture { client, metrics, proxy: _proxy } = + // the controller will update the value of `addr_label`. the value + // of `set_label` will remain unchanged throughout the test. + fixture_with_updates(vec![ + ( + vec![(String::from("addr_label"), String::from("foo"))], + vec![(String::from("set_label"), String::from("unchanged"))] + ), + ( + vec![(String::from("addr_label"), String::from("bar"))], + vec![(String::from("set_label"), String::from("unchanged"))] + ), + ]); + + info!("client.get(/)"); + assert_eq!(client.get("/"), "hello"); + // the first request should be labeled with `dst_addr_label="foo"` + assert_contains!(metrics.get("/metrics"), + "request_duration_ms_count{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"foo\",dst_set_label=\"unchanged\"} 1"); + assert_contains!(metrics.get("/metrics"), + "response_duration_ms_count{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"foo\",dst_set_label=\"unchanged\",classification=\"success\",status_code=\"200\"} 1"); + assert_contains!(metrics.get("/metrics"), + "request_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"foo\",dst_set_label=\"unchanged\"} 1"); + assert_contains!(metrics.get("/metrics"), + "response_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"foo\",dst_set_label=\"unchanged\",classification=\"success\",status_code=\"200\"} 1"); + + info!("client.get(/)"); + assert_eq!(client.get("/"), "hello"); + // the second request should increment stats labeled with `dst_addr_label="bar"` + assert_contains!(metrics.get("/metrics"), + "request_duration_ms_count{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"bar\",dst_set_label=\"unchanged\"} 1"); + assert_contains!(metrics.get("/metrics"), + "response_duration_ms_count{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"bar\",dst_set_label=\"unchanged\",classification=\"success\",status_code=\"200\"} 1"); + assert_contains!(metrics.get("/metrics"), + "request_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"bar\",dst_set_label=\"unchanged\"} 1"); + assert_contains!(metrics.get("/metrics"), + "response_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"bar\",dst_set_label=\"unchanged\",classification=\"success\",status_code=\"200\"} 1"); + // stats recorded from the first request should still be present. + assert_contains!(metrics.get("/metrics"), + "request_duration_ms_count{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"foo\",dst_set_label=\"unchanged\"} 1"); + assert_contains!(metrics.get("/metrics"), + "response_duration_ms_count{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"foo\",dst_set_label=\"unchanged\",classification=\"success\",status_code=\"200\"} 1"); + assert_contains!(metrics.get("/metrics"), + "request_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"foo\",dst_set_label=\"unchanged\"} 1"); + assert_contains!(metrics.get("/metrics"), + "response_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"foo\",dst_set_label=\"unchanged\",classification=\"success\",status_code=\"200\"} 1"); + } + + #[test] + fn controller_updates_set_labels() { + let _ = env_logger::try_init(); + info!("running test server"); + let Fixture { client, metrics, proxy: _proxy } = + fixture_with_updates(vec![ + (vec![], vec![(String::from("set_label"), String::from("foo"))]), + (vec![], vec![(String::from("set_label"), String::from("bar"))]), + ]); + + info!("client.get(/)"); + assert_eq!(client.get("/"), "hello"); + // the first request should be labeled with `dst_addr_label="foo"` + assert_contains!(metrics.get("/metrics"), + "request_duration_ms_count{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_set_label=\"foo\"} 1"); + assert_contains!(metrics.get("/metrics"), + "response_duration_ms_count{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_set_label=\"foo\",classification=\"success\",status_code=\"200\"} 1"); + assert_contains!(metrics.get("/metrics"), + "request_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_set_label=\"foo\"} 1"); + assert_contains!(metrics.get("/metrics"), + "response_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_set_label=\"foo\",classification=\"success\",status_code=\"200\"} 1"); + + info!("client.get(/)"); + assert_eq!(client.get("/"), "hello"); + // the second request should increment stats labeled with `dst_addr_label="bar"` + assert_contains!(metrics.get("/metrics"), + "request_duration_ms_count{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_set_label=\"bar\"} 1"); + assert_contains!(metrics.get("/metrics"), + "response_duration_ms_count{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_set_label=\"bar\",classification=\"success\",status_code=\"200\"} 1"); + assert_contains!(metrics.get("/metrics"), + "request_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_set_label=\"bar\"} 1"); + assert_contains!(metrics.get("/metrics"), + "response_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_set_label=\"bar\",classification=\"success\",status_code=\"200\"} 1"); + // stats recorded from the first request should still be present. + assert_contains!(metrics.get("/metrics"), + "request_duration_ms_count{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_set_label=\"foo\"} 1"); + assert_contains!(metrics.get("/metrics"), + "response_duration_ms_count{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_set_label=\"foo\",classification=\"success\",status_code=\"200\"} 1"); + assert_contains!(metrics.get("/metrics"), + "request_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_set_label=\"foo\"} 1"); + assert_contains!(metrics.get("/metrics"), + "response_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_set_label=\"foo\",classification=\"success\",status_code=\"200\"} 1"); + } +} + #[test] fn metrics_have_no_double_commas() { // Test for regressions to runconduit/conduit#600.