Add labels from service discovery to proxy metrics reports (#661)
PR #654 adds pod-based metric labels to the Destination API responses for cluster-local services. This PR modifies the proxy to actually add these labels to reported Prometheus metrics for outbound requests to local services. It enhances the proxy's `control::discovery` module to track these labels and add a `LabelRequest` middleware to the service stack built in `Bind` for labeled services. Requests transiting `LabelRequest` are given an `Extension` which contains these labels, which are then added to events produced by the `Sensors` for these requests. When these events are aggregated to Prometheus metrics, the labels are added. I've also added some tests in `test/telemetry.rs` ensuring that these metrics are added correctly when the Destination service provides labels. Closes #660 Signed-off-by: Eliza Weisman <eliza@buoyant.io>
This commit is contained in:
parent
2b9033cf16
commit
7e242ca07a
|
@ -108,6 +108,7 @@ dependencies = [
|
|||
"env_logger 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"futures-mpsc-lossy 0.3.0",
|
||||
"futures-watch 0.1.0 (git+https://github.com/carllerche/better-future.git)",
|
||||
"h2 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"http 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"httparse 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
|
@ -247,6 +248,15 @@ dependencies = [
|
|||
"futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures-watch"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/carllerche/better-future.git#07baa13e91fefe7a51533dfde7b4e69e109ebe14"
|
||||
dependencies = [
|
||||
"fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "h2"
|
||||
version = "0.1.5"
|
||||
|
@ -957,6 +967,7 @@ dependencies = [
|
|||
"checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
|
||||
"checksum futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)" = "0bab5b5e94f5c31fc764ba5dd9ad16568aae5d4825538c01d6bca680c9bf94a7"
|
||||
"checksum futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ab90cde24b3319636588d0c35fe03b1333857621051837ed769faefb4c2162e4"
|
||||
"checksum futures-watch 0.1.0 (git+https://github.com/carllerche/better-future.git)" = "<none>"
|
||||
"checksum h2 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "065fb096fc65bbfb9c765d48c9f3f1a21cdb25ba0d3f82105b38f30ddffa2f7e"
|
||||
"checksum heck 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ea04fa3ead4e05e51a7c806fc07271fdbde4e246a6c6d1efd52e72230b771b82"
|
||||
"checksum http 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "75df369fd52c60635208a4d3e694777c099569b3dcf4844df8f652dc004644ab"
|
||||
|
|
|
@ -19,6 +19,7 @@ bytes = "0.4"
|
|||
domain = "0.2.3"
|
||||
env_logger = { version = "0.5", default-features = false }
|
||||
futures = "0.1"
|
||||
futures-watch = { git = "https://github.com/carllerche/better-future.git" }
|
||||
h2 = "0.1.5"
|
||||
http = "0.1"
|
||||
httparse = "1.2"
|
||||
|
|
|
@ -1,11 +1,15 @@
|
|||
use std::collections::VecDeque;
|
||||
use std::collections::hash_map::{Entry, HashMap};
|
||||
use std::net::SocketAddr;
|
||||
use std::fmt;
|
||||
use std::iter::IntoIterator;
|
||||
use std::net::SocketAddr;
|
||||
use std::time::Duration;
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::{Async, Future, Poll, Stream};
|
||||
use futures::sync::mpsc;
|
||||
use futures_watch;
|
||||
use http;
|
||||
use tokio_core::reactor::Handle;
|
||||
use tower::Service;
|
||||
use tower_h2::{HttpService, BoxBody, RecvBody};
|
||||
|
@ -16,13 +20,18 @@ use dns::{self, IpAddrListFuture};
|
|||
use super::fully_qualified_authority::FullyQualifiedAuthority;
|
||||
|
||||
use conduit_proxy_controller_grpc::common::{Destination, TcpAddress};
|
||||
use conduit_proxy_controller_grpc::destination::Update as PbUpdate;
|
||||
use conduit_proxy_controller_grpc::destination::{
|
||||
Update as PbUpdate,
|
||||
WeightedAddr,
|
||||
};
|
||||
use conduit_proxy_controller_grpc::destination::update::Update as PbUpdate2;
|
||||
use conduit_proxy_controller_grpc::destination::client::{Destination as DestinationSvc};
|
||||
use transport::DnsNameAndPort;
|
||||
|
||||
use control::cache::{Cache, CacheChange, Exists};
|
||||
|
||||
use ::telemetry::metrics::prometheus::{DstLabels, Labeled};
|
||||
|
||||
/// A handle to start watching a destination for address changes.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Discovery {
|
||||
|
@ -33,6 +42,12 @@ pub struct Discovery {
|
|||
#[derive(Debug)]
|
||||
pub struct Watch<B> {
|
||||
rx: mpsc::UnboundedReceiver<Update>,
|
||||
/// Map associating addresses with the `Store` for the watch on that
|
||||
/// service's metric labels (as provided by the Destination service).
|
||||
///
|
||||
/// This is used to update the `Labeled` middleware on those services
|
||||
/// without requiring the service stack to be re-bound.
|
||||
metric_labels: HashMap<SocketAddr, futures_watch::Store<Option<DstLabels>>>,
|
||||
bind: B,
|
||||
}
|
||||
|
||||
|
@ -60,8 +75,15 @@ pub struct DiscoveryWork<T: HttpService<ResponseBody = RecvBody>> {
|
|||
rx: mpsc::UnboundedReceiver<(DnsNameAndPort, mpsc::UnboundedSender<Update>)>,
|
||||
}
|
||||
|
||||
/// Any additional metadata describing a discovered service.
|
||||
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
|
||||
pub struct Metadata {
|
||||
/// A set of Prometheus metric labels describing the destination.
|
||||
metric_labels: Option<DstLabels>,
|
||||
}
|
||||
|
||||
struct DestinationSet<T: HttpService<ResponseBody = RecvBody>> {
|
||||
addrs: Exists<Cache<SocketAddr, ()>>,
|
||||
addrs: Exists<Cache<SocketAddr, Metadata>>,
|
||||
query: Option<DestinationServiceQuery<T>>,
|
||||
dns_query: Option<IpAddrListFuture>,
|
||||
txs: Vec<mpsc::UnboundedSender<Update>>,
|
||||
|
@ -107,8 +129,9 @@ enum RxError<T> {
|
|||
|
||||
#[derive(Debug)]
|
||||
enum Update {
|
||||
Insert(SocketAddr),
|
||||
Insert(SocketAddr, Metadata),
|
||||
Remove(SocketAddr),
|
||||
ChangeMetadata(SocketAddr, Metadata),
|
||||
}
|
||||
|
||||
/// Bind a `SocketAddr` with a protocol.
|
||||
|
@ -162,6 +185,7 @@ impl Discovery {
|
|||
|
||||
Watch {
|
||||
rx,
|
||||
metric_labels: HashMap::new(),
|
||||
bind,
|
||||
}
|
||||
}
|
||||
|
@ -169,37 +193,77 @@ impl Discovery {
|
|||
|
||||
// ==== impl Watch =====
|
||||
|
||||
impl<B> Discover for Watch<B>
|
||||
impl<B> Watch<B> {
|
||||
fn update_metadata(&mut self,
|
||||
addr: SocketAddr,
|
||||
meta: Metadata)
|
||||
-> Result<(), ()>
|
||||
{
|
||||
if let Some(store) = self.metric_labels.get_mut(&addr) {
|
||||
store.store(meta.metric_labels)
|
||||
.map_err(|e| {
|
||||
error!("update_metadata: label store error: {:?}", e);
|
||||
})
|
||||
.map(|_| ())
|
||||
} else {
|
||||
// The store has already been removed, so nobody cares about
|
||||
// the metadata change. We expect that this shouldn't happen,
|
||||
// but if it does, log a warning and handle it gracefully.
|
||||
warn!(
|
||||
"update_metadata: ignoring ChangeMetadata for {:?} \
|
||||
because the service no longer exists.",
|
||||
addr
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<B, A> Discover for Watch<B>
|
||||
where
|
||||
B: Bind,
|
||||
B: Bind<Request = http::Request<A>>,
|
||||
{
|
||||
type Key = SocketAddr;
|
||||
type Request = B::Request;
|
||||
type Response = B::Response;
|
||||
type Error = B::Error;
|
||||
type Service = B::Service;
|
||||
type Service = Labeled<B::Service>;
|
||||
type DiscoverError = ();
|
||||
|
||||
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::DiscoverError> {
|
||||
let up = self.rx.poll();
|
||||
trace!("watch: {:?}", up);
|
||||
let update = match up {
|
||||
Ok(Async::Ready(Some(update))) => update,
|
||||
Ok(Async::Ready(None)) => unreachable!(),
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
Err(_) => return Err(()),
|
||||
};
|
||||
loop {
|
||||
let up = self.rx.poll();
|
||||
trace!("watch: {:?}", up);
|
||||
let update = try_ready!(up).expect("discovery stream must be infinite");
|
||||
|
||||
match update {
|
||||
Update::Insert(addr) => {
|
||||
let service = self.bind.bind(&addr).map_err(|_| ())?;
|
||||
match update {
|
||||
Update::Insert(addr, meta) => {
|
||||
// Construct a watch for the `Labeled` middleware that will
|
||||
// wrap the bound service, and insert the store into our map
|
||||
// so it can be updated later.
|
||||
let (labels_watch, labels_store) =
|
||||
futures_watch::Watch::new(meta.metric_labels);
|
||||
self.metric_labels.insert(addr, labels_store);
|
||||
|
||||
Ok(Async::Ready(Change::Insert(addr, service)))
|
||||
},
|
||||
// TODO: handle metadata changes by changing the labeling
|
||||
// middleware to hold a `futures-watch::Watch` on the label value,
|
||||
// so it can be updated.
|
||||
Update::Remove(addr) => Ok(Async::Ready(Change::Remove(addr))),
|
||||
let service = self.bind.bind(&addr)
|
||||
.map(|svc| Labeled::new(svc, labels_watch))
|
||||
.map_err(|_| ())?;
|
||||
|
||||
return Ok(Async::Ready(Change::Insert(addr, service)))
|
||||
},
|
||||
Update::ChangeMetadata(addr, meta) => {
|
||||
// Update metadata and continue polling `rx`.
|
||||
self.update_metadata(addr, meta)?;
|
||||
},
|
||||
Update::Remove(addr) => {
|
||||
// It's safe to drop the store handle here, even if
|
||||
// the `Labeled` middleware using the watch handle
|
||||
// still exists --- it will simply read the final
|
||||
// value from the watch.
|
||||
self.metric_labels.remove(&addr);
|
||||
return Ok(Async::Ready(Change::Remove(addr)));
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -278,8 +342,12 @@ where
|
|||
// them onto the new watch first
|
||||
match set.addrs {
|
||||
Exists::Yes(ref cache) => {
|
||||
for (&addr, _) in cache {
|
||||
tx.unbounded_send(Update::Insert(addr))
|
||||
for (&addr, meta) in cache {
|
||||
let update = Update::Insert(
|
||||
addr,
|
||||
meta.clone()
|
||||
);
|
||||
tx.unbounded_send(update)
|
||||
.expect("unbounded_send does not fail");
|
||||
}
|
||||
},
|
||||
|
@ -448,17 +516,19 @@ impl<T> DestinationSet<T>
|
|||
match rx.poll() {
|
||||
Ok(Async::Ready(Some(update))) => match update.update {
|
||||
Some(PbUpdate2::Add(a_set)) => {
|
||||
exists = Exists::Yes(());
|
||||
self.add(
|
||||
auth,
|
||||
a_set.addrs.iter().filter_map(
|
||||
|addr| addr.addr.clone().and_then(pb_to_sock_addr)));
|
||||
let set_labels = Arc::new(a_set.metric_labels);
|
||||
let addrs = a_set.addrs.into_iter()
|
||||
.filter_map(|pb|
|
||||
pb_to_addr_meta(pb, &set_labels)
|
||||
);
|
||||
self.add(auth, addrs)
|
||||
},
|
||||
Some(PbUpdate2::Remove(r_set)) => {
|
||||
exists = Exists::Yes(());
|
||||
self.remove(
|
||||
auth,
|
||||
r_set.addrs.iter().filter_map(|addr| pb_to_sock_addr(addr.clone())));
|
||||
r_set.addrs.iter().filter_map(|addr| pb_to_sock_addr(addr.clone()))
|
||||
);
|
||||
},
|
||||
Some(PbUpdate2::NoEndpoints(ref no_endpoints)) if no_endpoints.exists => {
|
||||
exists = Exists::Yes(());
|
||||
|
@ -501,7 +571,7 @@ impl<T> DestinationSet<T>
|
|||
Ok(Async::Ready(dns::Response::Exists(ips))) => {
|
||||
trace!("positive result of DNS query for {:?}: {:?}", authority, ips);
|
||||
self.add(authority, ips.iter().map(|ip| {
|
||||
SocketAddr::from((*ip, authority.port))
|
||||
(SocketAddr::from((*ip, authority.port)), Metadata::no_metadata())
|
||||
}));
|
||||
},
|
||||
Ok(Async::Ready(dns::Response::DoesNotExist)) => {
|
||||
|
@ -533,16 +603,22 @@ impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
|
|||
}
|
||||
|
||||
fn add<A>(&mut self, authority_for_logging: &DnsNameAndPort, addrs_to_add: A)
|
||||
where A: Iterator<Item = SocketAddr>
|
||||
where A: Iterator<Item = (SocketAddr, Metadata)>
|
||||
{
|
||||
let mut cache = match self.addrs.take() {
|
||||
Exists::Yes(mut cache) => cache,
|
||||
Exists::Unknown | Exists::No => Cache::new(),
|
||||
};
|
||||
cache.update_union(
|
||||
addrs_to_add.map(|a| (a, ())),
|
||||
&mut |(addr, _), change| Self::on_change(&mut self.txs, authority_for_logging, addr,
|
||||
change));
|
||||
addrs_to_add,
|
||||
&mut |(addr, meta), change| Self::on_change(
|
||||
&mut self.txs,
|
||||
authority_for_logging,
|
||||
addr,
|
||||
meta,
|
||||
change,
|
||||
)
|
||||
);
|
||||
self.addrs = Exists::Yes(cache);
|
||||
}
|
||||
|
||||
|
@ -553,8 +629,14 @@ impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
|
|||
Exists::Yes(mut cache) => {
|
||||
cache.remove(
|
||||
addrs_to_remove,
|
||||
&mut |(addr, _), change| Self::on_change(&mut self.txs, authority_for_logging, addr,
|
||||
change));
|
||||
&mut |(addr, meta), change| Self::on_change(
|
||||
&mut self.txs,
|
||||
authority_for_logging,
|
||||
addr,
|
||||
meta,
|
||||
change,
|
||||
)
|
||||
);
|
||||
cache
|
||||
},
|
||||
Exists::Unknown | Exists::No => Cache::new(),
|
||||
|
@ -568,8 +650,14 @@ impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
|
|||
match self.addrs.take() {
|
||||
Exists::Yes(mut cache) => {
|
||||
cache.clear(
|
||||
&mut |(addr, _), change| Self::on_change(&mut self.txs, authority_for_logging, addr,
|
||||
change));
|
||||
&mut |(addr, meta), change| Self::on_change(
|
||||
&mut self.txs,
|
||||
authority_for_logging,
|
||||
addr,
|
||||
meta,
|
||||
change
|
||||
)
|
||||
);
|
||||
},
|
||||
Exists::Unknown | Exists::No => (),
|
||||
};
|
||||
|
@ -583,20 +671,20 @@ impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
|
|||
fn on_change(txs: &mut Vec<mpsc::UnboundedSender<Update>>,
|
||||
authority_for_logging: &DnsNameAndPort,
|
||||
addr: SocketAddr,
|
||||
meta: Metadata,
|
||||
change: CacheChange) {
|
||||
let (update_str, update_constructor): (&'static str, fn(SocketAddr) -> Update) =
|
||||
let (update_str, update_constructor): (&'static str, fn(SocketAddr, Metadata) -> Update) =
|
||||
match change {
|
||||
CacheChange::Insertion => ("insert", Update::Insert),
|
||||
CacheChange::Removal => ("remove", Update::Remove),
|
||||
CacheChange::Modification => {
|
||||
// TODO: generate `ChangeMetadata` events.
|
||||
return;
|
||||
}
|
||||
CacheChange::Removal =>
|
||||
("remove", |addr, _| Update::Remove(addr)),
|
||||
CacheChange::Modification =>
|
||||
("change metadata for", Update::ChangeMetadata),
|
||||
};
|
||||
trace!("{} {:?} for {:?}", update_str, addr, authority_for_logging);
|
||||
// retain is used to drop any senders that are dead
|
||||
txs.retain(|tx| {
|
||||
tx.unbounded_send(update_constructor(addr)).is_ok()
|
||||
tx.unbounded_send(update_constructor(addr, meta.clone())).is_ok()
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -644,7 +732,29 @@ where T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>,
|
|||
}
|
||||
}
|
||||
|
||||
// ===== impl RxError =====
|
||||
// ===== impl Metadata =====
|
||||
|
||||
impl Metadata {
|
||||
fn no_metadata() -> Self {
|
||||
Metadata {
|
||||
metric_labels: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct a new labeled `SocketAddr `from a protobuf `WeightedAddr`.
|
||||
fn pb_to_addr_meta(pb: WeightedAddr, set_labels: &Arc<HashMap<String, String>>)
|
||||
-> Option<(SocketAddr, Metadata)> {
|
||||
let addr = pb.addr.and_then(pb_to_sock_addr)?;
|
||||
let label_iter =
|
||||
set_labels.as_ref()
|
||||
.iter()
|
||||
.chain(pb.metric_labels.iter());
|
||||
let meta = Metadata {
|
||||
metric_labels: DstLabels::new(label_iter),
|
||||
};
|
||||
Some((addr, meta))
|
||||
}
|
||||
|
||||
fn pb_to_sock_addr(pb: TcpAddress) -> Option<SocketAddr> {
|
||||
use conduit_proxy_controller_grpc::common::ip_address::Ip;
|
||||
|
|
|
@ -2,6 +2,7 @@ use http;
|
|||
use std::sync::Arc;
|
||||
|
||||
use ctx;
|
||||
use telemetry::metrics::prometheus;
|
||||
|
||||
/// Describes a stream's request headers.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
|
||||
|
@ -17,6 +18,11 @@ pub struct Request {
|
|||
|
||||
/// Identifies the proxy client that dispatched the request.
|
||||
pub client: Arc<ctx::transport::Client>,
|
||||
|
||||
/// Optional information on the request's destination service, which may
|
||||
/// be provided by the control plane for destinations lookups against its
|
||||
/// discovery API.
|
||||
pub dst_labels: Option<prometheus::DstLabels>,
|
||||
}
|
||||
|
||||
/// Describes a stream's response headers.
|
||||
|
@ -41,12 +47,19 @@ impl Request {
|
|||
client: &Arc<ctx::transport::Client>,
|
||||
id: usize,
|
||||
) -> Arc<Self> {
|
||||
// Look up whether the request has been extended with optional
|
||||
// destination labels from the control plane's discovery API.
|
||||
let dst_labels = request
|
||||
.extensions()
|
||||
.get::<prometheus::DstLabels>()
|
||||
.cloned();
|
||||
let r = Self {
|
||||
id,
|
||||
uri: request.uri().clone(),
|
||||
method: request.method().clone(),
|
||||
server: Arc::clone(server),
|
||||
client: Arc::clone(client),
|
||||
dst_labels,
|
||||
};
|
||||
|
||||
Arc::new(r)
|
||||
|
|
|
@ -11,6 +11,7 @@ extern crate env_logger;
|
|||
#[macro_use]
|
||||
extern crate futures;
|
||||
extern crate futures_mpsc_lossy;
|
||||
extern crate futures_watch;
|
||||
extern crate h2;
|
||||
extern crate http;
|
||||
extern crate httparse;
|
||||
|
|
|
@ -18,6 +18,7 @@ use bind::{self, Bind, Protocol};
|
|||
use control::{self, discovery};
|
||||
use control::discovery::Bind as BindTrait;
|
||||
use ctx;
|
||||
use telemetry::metrics::prometheus;
|
||||
use timeout::Timeout;
|
||||
use transparency::h1;
|
||||
use transport::{DnsNameAndPort, Host, HostAndPort};
|
||||
|
@ -168,8 +169,8 @@ where
|
|||
type Key = SocketAddr;
|
||||
type Request = http::Request<B>;
|
||||
type Response = bind::HttpResponse;
|
||||
type Error = <bind::Service<B> as tower::Service>::Error;
|
||||
type Service = bind::Service<B>;
|
||||
type Error = <Self::Service as tower::Service>::Error;
|
||||
type Service = prometheus::Labeled<bind::Service<B>>;
|
||||
type DiscoverError = BindError;
|
||||
|
||||
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::DiscoverError> {
|
||||
|
@ -184,6 +185,9 @@ where
|
|||
// closing down when the connection is no longer usable.
|
||||
if let Some((addr, bind)) = opt.take() {
|
||||
let svc = bind.bind(&addr)
|
||||
// The controller has no labels to add to an external
|
||||
// service.
|
||||
.map(prometheus::Labeled::none)
|
||||
.map_err(|_| BindError::External{ addr })?;
|
||||
Ok(Async::Ready(Change::Insert(addr, svc)))
|
||||
} else {
|
||||
|
|
|
@ -1,13 +1,31 @@
|
|||
|
||||
use futures::Poll;
|
||||
use futures_watch::Watch;
|
||||
use http;
|
||||
use std::fmt;
|
||||
use tower::Service;
|
||||
|
||||
use std::fmt::{self, Write};
|
||||
use std::sync::Arc;
|
||||
|
||||
use ctx;
|
||||
|
||||
#[derive(Clone, Debug, Default, Eq, PartialEq, Hash)]
|
||||
/// Middleware that adds an extension containing an optional set of metric
|
||||
/// labels to requests.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Labeled<T> {
|
||||
metric_labels: Option<Watch<Option<DstLabels>>>,
|
||||
inner: T,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
|
||||
pub struct RequestLabels {
|
||||
|
||||
outbound_labels: Option<OutboundLabels>,
|
||||
/// Was the request in the inbound or outbound direction?
|
||||
direction: Direction,
|
||||
|
||||
// Additional labels identifying the destination service of an outbound
|
||||
// request, provided by the Conduit control plane's service discovery.
|
||||
outbound_labels: Option<DstLabels>,
|
||||
|
||||
/// The value of the `:authority` (HTTP/2) or `Host` (HTTP/1.1) header of
|
||||
/// the request.
|
||||
|
@ -36,48 +54,65 @@ enum Classification {
|
|||
Failure,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
|
||||
// TODO: when #429 is done, this will no longer be dead code.
|
||||
#[allow(dead_code)]
|
||||
enum PodOwner {
|
||||
/// The deployment to which this request is being sent.
|
||||
Deployment(String),
|
||||
|
||||
/// The job to which this request is being sent.
|
||||
Job(String),
|
||||
|
||||
/// The replica set to which this request is being sent.
|
||||
ReplicaSet(String),
|
||||
|
||||
/// The replication controller to which this request is being sent.
|
||||
ReplicationController(String),
|
||||
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
|
||||
enum Direction {
|
||||
Inbound,
|
||||
Outbound,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default, Eq, PartialEq, Hash)]
|
||||
struct OutboundLabels {
|
||||
/// The owner of the destination pod.
|
||||
// TODO: when #429 is done, this will no longer need to be an Option.
|
||||
dst: Option<PodOwner>,
|
||||
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
|
||||
pub struct DstLabels(Arc<str>);
|
||||
|
||||
/// The namespace to which this request is being sent (if
|
||||
/// applicable).
|
||||
namespace: Option<String>
|
||||
// ===== impl Labeled =====
|
||||
|
||||
impl<T> Labeled<T> {
|
||||
|
||||
/// Wrap `inner` with a `Watch` on dyanmically updated labels.
|
||||
pub fn new(inner: T, watch: Watch<Option<DstLabels>>) -> Self {
|
||||
Self {
|
||||
metric_labels: Some(watch),
|
||||
inner,
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrap `inner` with no `metric_labels`.
|
||||
pub fn none(inner: T) -> Self {
|
||||
Self { metric_labels: None, inner }
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, A> Service for Labeled<T>
|
||||
where
|
||||
T: Service<Request=http::Request<A>>,
|
||||
{
|
||||
type Request = T::Request;
|
||||
type Response= T::Response;
|
||||
type Error = T::Error;
|
||||
type Future = T::Future;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
self.inner.poll_ready()
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Self::Request) -> Self::Future {
|
||||
let mut req = req;
|
||||
if let Some(labels) = self.metric_labels.as_ref()
|
||||
.and_then(|labels| (*labels.borrow()).as_ref().cloned())
|
||||
{
|
||||
req.extensions_mut().insert(labels);
|
||||
}
|
||||
self.inner.call(req)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// ===== impl RequestLabels =====
|
||||
|
||||
impl<'a> RequestLabels {
|
||||
pub fn new(req: &ctx::http::Request) -> Self {
|
||||
let outbound_labels = if req.server.proxy.is_outbound() {
|
||||
Some(OutboundLabels {
|
||||
// TODO: when #429 is done, add appropriate destination label.
|
||||
..Default::default()
|
||||
})
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let direction = Direction::from_context(req.server.proxy.as_ref());
|
||||
|
||||
let outbound_labels = req.dst_labels.as_ref().cloned();
|
||||
|
||||
let authority = req.uri
|
||||
.authority_part()
|
||||
|
@ -85,32 +120,27 @@ impl<'a> RequestLabels {
|
|||
.unwrap_or_else(String::new);
|
||||
|
||||
RequestLabels {
|
||||
direction,
|
||||
outbound_labels,
|
||||
authority,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for RequestLabels {
|
||||
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "authority=\"{}\",", self.authority)?;
|
||||
write!(f, "authority=\"{}\",{}", self.authority, self.direction)?;
|
||||
|
||||
if let Some(ref outbound) = self.outbound_labels {
|
||||
write!(f, "direction=\"outbound\"{comma}{dst}",
|
||||
comma = if !outbound.is_empty() { "," } else { "" },
|
||||
dst = outbound
|
||||
)?;
|
||||
} else {
|
||||
write!(f, "direction=\"inbound\"")?;
|
||||
// leading comma added between the direction label and the
|
||||
// destination labels, if there are destination labels.
|
||||
write!(f, ",{}", outbound)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
// ===== impl ResponseLabels =====
|
||||
|
||||
impl ResponseLabels {
|
||||
|
@ -141,63 +171,23 @@ impl ResponseLabels {
|
|||
}
|
||||
|
||||
impl fmt::Display for ResponseLabels {
|
||||
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{},{},status_code=\"{}\"",
|
||||
self.request_labels,
|
||||
self.classification,
|
||||
self.status_code
|
||||
)?;
|
||||
|
||||
if let Some(ref status) = self.grpc_status_code {
|
||||
// leading comma added between the status code label and the
|
||||
// gRPC status code labels, if there is a gRPC status code.
|
||||
write!(f, ",grpc_status_code=\"{}\"", status)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// ===== impl OutboundLabels =====
|
||||
|
||||
impl OutboundLabels {
|
||||
fn is_empty(&self) -> bool {
|
||||
self.namespace.is_none() && self.dst.is_none()
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for OutboundLabels {
|
||||
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match *self {
|
||||
OutboundLabels { namespace: Some(ref ns), dst: Some(ref dst) } =>
|
||||
write!(f, "dst_namespace=\"{}\",dst_{}", ns, dst),
|
||||
OutboundLabels { namespace: None, dst: Some(ref dst), } =>
|
||||
write!(f, "dst_{}", dst),
|
||||
OutboundLabels { namespace: Some(ref ns), dst: None, } =>
|
||||
write!(f, "dst_namespace=\"{}\"", ns),
|
||||
OutboundLabels { namespace: None, dst: None, } =>
|
||||
write!(f, ""),
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
impl fmt::Display for PodOwner {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match *self {
|
||||
PodOwner::Deployment(ref s) =>
|
||||
write!(f, "deployment=\"{}\"", s),
|
||||
PodOwner::Job(ref s) =>
|
||||
write!(f, "job=\"{}\",", s),
|
||||
PodOwner::ReplicaSet(ref s) =>
|
||||
write!(f, "replica_set=\"{}\"", s),
|
||||
PodOwner::ReplicationController(ref s) =>
|
||||
write!(f, "replication_controller=\"{}\"", s),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// ===== impl Classification =====
|
||||
|
||||
impl Classification {
|
||||
|
@ -228,12 +218,67 @@ impl Classification {
|
|||
}
|
||||
|
||||
impl fmt::Display for Classification {
|
||||
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match self {
|
||||
&Classification::Success => f.pad("classification=\"success\""),
|
||||
&Classification::Failure => f.pad("classification=\"failure\""),
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// ===== impl Direction =====
|
||||
|
||||
impl Direction {
|
||||
fn from_context(context: &ctx::Proxy) -> Self {
|
||||
match context {
|
||||
&ctx::Proxy::Inbound(_) => Direction::Inbound,
|
||||
&ctx::Proxy::Outbound(_) => Direction::Outbound,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for Direction {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match self {
|
||||
&Direction::Inbound => f.pad("direction=\"inbound\""),
|
||||
&Direction::Outbound => f.pad("direction=\"outbound\""),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// ===== impl DstLabels ====
|
||||
|
||||
impl DstLabels {
|
||||
pub fn new<I, S>(labels: I) -> Option<Self>
|
||||
where
|
||||
I: IntoIterator<Item=(S, S)>,
|
||||
S: fmt::Display,
|
||||
{
|
||||
let mut labels = labels.into_iter();
|
||||
|
||||
if let Some((k, v)) = labels.next() {
|
||||
// Format the first label pair without a leading comma, since we
|
||||
// don't know where it is in the output labels at this point.
|
||||
let mut s = format!("dst_{}=\"{}\"", k, v);
|
||||
|
||||
// Format subsequent label pairs with leading commas, since
|
||||
// we know that we already formatted the first label pair.
|
||||
for (k, v) in labels {
|
||||
write!(s, ",dst_{}=\"{}\"", k, v)
|
||||
.expect("writing to string should not fail");
|
||||
}
|
||||
|
||||
Some(DstLabels(Arc::from(s)))
|
||||
} else {
|
||||
// The iterator is empty; return None
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for DstLabels {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{}", self.0)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,3 +1,31 @@
|
|||
//! Aggregates and serves Prometheus metrics.
|
||||
//!
|
||||
//! # A note on label formatting
|
||||
//!
|
||||
//! Prometheus labels are represented as a comma-separated list of values
|
||||
//! Since the Conduit proxy labels its metrics with a fixed set of labels
|
||||
//! which we know in advance, we represent these labels using a number of
|
||||
//! `struct`s, all of which implement `fmt::Display`. Some of the label
|
||||
//! `struct`s contain other structs which represent a subset of the labels
|
||||
//! which can be present on metrics in that scope. In this case, the
|
||||
//! `fmt::Display` impls for those structs call the `fmt::Display` impls for
|
||||
//! the structs that they own. This has the potential to complicate the
|
||||
//! insertion of commas to separate label values.
|
||||
//!
|
||||
//! In order to ensure that commas are added correctly to separate labels,
|
||||
//! we expect the `fmt::Display` implementations for label types to behave in
|
||||
//! a consistent way: A label struct is *never* responsible for printing
|
||||
//! leading or trailing commas before or after the label values it contains.
|
||||
//! If it contains multiple labels, it *is* responsible for ensuring any
|
||||
//! labels it owns are comma-separated. This way, the `fmt::Display` impl for
|
||||
//! any struct that represents a subset of the labels are position-agnostic;
|
||||
//! they don't need to know if there are other labels before or after them in
|
||||
//! the formatted output. The owner is responsible for managing that.
|
||||
//!
|
||||
//! If this rule is followed consistently across all structs representing
|
||||
//! labels, we can add new labels or modify the existing ones without having
|
||||
//! to worry about missing commas, double commas, or trailing commas at the
|
||||
//! end of the label set (all of which will make Prometheus angry).
|
||||
use std::default::Default;
|
||||
use std::{fmt, ops, time};
|
||||
use std::hash::Hash;
|
||||
|
@ -21,6 +49,7 @@ use super::latency::{BUCKET_BOUNDS, Histogram};
|
|||
|
||||
mod labels;
|
||||
use self::labels::{RequestLabels, ResponseLabels};
|
||||
pub use self::labels::{DstLabels, Labeled};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct Metrics {
|
||||
|
|
|
@ -9,7 +9,7 @@ use ctx;
|
|||
|
||||
mod control;
|
||||
pub mod event;
|
||||
mod metrics;
|
||||
pub mod metrics;
|
||||
pub mod sensor;
|
||||
pub mod tap;
|
||||
|
||||
|
|
|
@ -108,7 +108,9 @@ macro_rules! generate_tests {
|
|||
#[test]
|
||||
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
|
||||
fn outbound_times_out() {
|
||||
use std::collections::HashMap;
|
||||
use std::thread;
|
||||
|
||||
let _ = env_logger::try_init();
|
||||
let mut env = config::TestEnv::new();
|
||||
|
||||
|
@ -122,7 +124,11 @@ macro_rules! generate_tests {
|
|||
// return the correct destination
|
||||
.destination_fn("disco.test.svc.cluster.local", move || {
|
||||
thread::sleep(Duration::from_millis(500));
|
||||
Some(controller::destination_update(addr))
|
||||
Some(controller::destination_update(
|
||||
addr,
|
||||
HashMap::new(),
|
||||
HashMap::new(),
|
||||
))
|
||||
})
|
||||
.run();
|
||||
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
use support::*;
|
||||
|
||||
use std::collections::VecDeque;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::fmt;
|
||||
use std::net::IpAddr;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
@ -38,7 +38,23 @@ impl Controller {
|
|||
}
|
||||
|
||||
pub fn destination(mut self, dest: &str, addr: SocketAddr) -> Self {
|
||||
self.destination_fn(dest, move || Some(destination_update(addr)))
|
||||
self.destination_fn(dest, move || Some(destination_update(
|
||||
addr,
|
||||
HashMap::new(),
|
||||
HashMap::new(),
|
||||
)))
|
||||
}
|
||||
|
||||
pub fn labeled_destination(mut self, dest: &str, addr: SocketAddr,
|
||||
addr_labels: HashMap<String, String>,
|
||||
set_labels:HashMap<String, String>)
|
||||
-> Self
|
||||
{
|
||||
self.destination_fn(dest, move || Some(destination_update(
|
||||
addr,
|
||||
addr_labels.clone(),
|
||||
set_labels.clone(),
|
||||
)))
|
||||
}
|
||||
|
||||
pub fn destination_fn<F>(mut self, dest: &str, f: F) -> Self
|
||||
|
@ -269,7 +285,11 @@ fn run(controller: Controller) -> Listening {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn destination_update(addr: SocketAddr) -> pb::destination::Update {
|
||||
pub fn destination_update(addr: SocketAddr,
|
||||
set_labels: HashMap<String, String>,
|
||||
addr_labels: HashMap<String, String>)
|
||||
-> pb::destination::Update
|
||||
{
|
||||
pb::destination::Update {
|
||||
update: Some(pb::destination::update::Update::Add(
|
||||
pb::destination::WeightedAddrSet {
|
||||
|
@ -280,10 +300,10 @@ pub fn destination_update(addr: SocketAddr) -> pb::destination::Update {
|
|||
port: u32::from(addr.port()),
|
||||
}),
|
||||
weight: 0,
|
||||
..Default::default()
|
||||
metric_labels: addr_labels,
|
||||
},
|
||||
],
|
||||
..Default::default()
|
||||
metric_labels: set_labels,
|
||||
},
|
||||
)),
|
||||
}
|
||||
|
|
|
@ -678,6 +678,223 @@ fn metrics_endpoint_outbound_request_duration() {
|
|||
"request_duration_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\"} 2");
|
||||
}
|
||||
|
||||
// Tests for destination labels provided by control plane service discovery.
|
||||
mod outbound_dst_labels {
|
||||
use super::support::*;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::iter::FromIterator;
|
||||
|
||||
struct Fixture {
|
||||
client: client::Client,
|
||||
metrics: client::Client,
|
||||
proxy: proxy::Listening,
|
||||
}
|
||||
|
||||
fn fixture<A, B>(addr_labels: A, set_labels: B) -> Fixture
|
||||
where
|
||||
A: IntoIterator<Item=(String, String)>,
|
||||
B: IntoIterator<Item=(String, String)>,
|
||||
{
|
||||
fixture_with_updates(vec![(addr_labels, set_labels)])
|
||||
}
|
||||
|
||||
fn fixture_with_updates<A, B>(updates: Vec<(A, B)>) -> Fixture
|
||||
where
|
||||
A: IntoIterator<Item=(String, String)>,
|
||||
B: IntoIterator<Item=(String, String)>,
|
||||
{
|
||||
info!("running test server");
|
||||
let srv = server::new()
|
||||
.route("/", "hello")
|
||||
.run();
|
||||
|
||||
let mut ctrl = controller::new();
|
||||
for (addr_labels, set_labels) in updates {
|
||||
ctrl = ctrl.labeled_destination(
|
||||
"labeled.test.svc.cluster.local",
|
||||
srv.addr,
|
||||
HashMap::from_iter(addr_labels),
|
||||
HashMap::from_iter(set_labels),
|
||||
);
|
||||
}
|
||||
|
||||
let proxy = proxy::new()
|
||||
.controller(ctrl.run())
|
||||
.outbound(srv)
|
||||
.metrics_flush_interval(Duration::from_millis(500))
|
||||
.run();
|
||||
let metrics = client::http1(proxy.metrics, "localhost");
|
||||
|
||||
let client = client::new(
|
||||
proxy.outbound,
|
||||
"labeled.test.svc.cluster.local"
|
||||
);
|
||||
Fixture { client, metrics, proxy }
|
||||
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn multiple_addr_labels() {
|
||||
let _ = env_logger::try_init();
|
||||
let Fixture { client, metrics, proxy: _proxy } = fixture (
|
||||
vec![
|
||||
(String::from("addr_label2"), String::from("bar")),
|
||||
(String::from("addr_label1"), String::from("foo")),
|
||||
],
|
||||
Vec::new(),
|
||||
);
|
||||
|
||||
info!("client.get(/)");
|
||||
assert_eq!(client.get("/"), "hello");
|
||||
// We can't make more specific assertions about the metrics
|
||||
// besides asserting that both labels are present somewhere in the
|
||||
// scrape, because testing for whole metric lines would depend on
|
||||
// the order in which the labels occur, and we can't depend on hash
|
||||
// map ordering.
|
||||
assert_contains!(metrics.get("/metrics"), "dst_addr_label1=\"foo\"");
|
||||
assert_contains!(metrics.get("/metrics"), "dst_addr_label2=\"bar\"");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn multiple_addrset_labels() {
|
||||
let _ = env_logger::try_init();
|
||||
let Fixture { client, metrics, proxy: _proxy } = fixture (
|
||||
Vec::new(),
|
||||
vec![
|
||||
(String::from("set_label1"), String::from("foo")),
|
||||
(String::from("set_label2"), String::from("bar")),
|
||||
]
|
||||
);
|
||||
|
||||
info!("client.get(/)");
|
||||
assert_eq!(client.get("/"), "hello");
|
||||
// We can't make more specific assertions about the metrics
|
||||
// besides asserting that both labels are present somewhere in the
|
||||
// scrape, because testing for whole metric lines would depend on
|
||||
// the order in which the labels occur, and we can't depend on hash
|
||||
// map ordering.
|
||||
assert_contains!(metrics.get("/metrics"), "dst_set_label1=\"foo\"");
|
||||
assert_contains!(metrics.get("/metrics"), "dst_set_label2=\"bar\"");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn labeled_addr_and_addrset() {
|
||||
let _ = env_logger::try_init();
|
||||
let Fixture { client, metrics, proxy: _proxy } = fixture(
|
||||
vec![(String::from("addr_label"), String::from("foo"))],
|
||||
vec![(String::from("set_label"), String::from("bar"))],
|
||||
);
|
||||
|
||||
info!("client.get(/)");
|
||||
assert_eq!(client.get("/"), "hello");
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"request_duration_ms_count{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"foo\",dst_set_label=\"bar\"} 1");
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"response_duration_ms_count{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"foo\",dst_set_label=\"bar\",classification=\"success\",status_code=\"200\"} 1");
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"request_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"foo\",dst_set_label=\"bar\"} 1");
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"response_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"foo\",dst_set_label=\"bar\",classification=\"success\",status_code=\"200\"} 1");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn controller_updates_addr_labels() {
|
||||
let _ = env_logger::try_init();
|
||||
info!("running test server");
|
||||
let Fixture { client, metrics, proxy: _proxy } =
|
||||
// the controller will update the value of `addr_label`. the value
|
||||
// of `set_label` will remain unchanged throughout the test.
|
||||
fixture_with_updates(vec![
|
||||
(
|
||||
vec![(String::from("addr_label"), String::from("foo"))],
|
||||
vec![(String::from("set_label"), String::from("unchanged"))]
|
||||
),
|
||||
(
|
||||
vec![(String::from("addr_label"), String::from("bar"))],
|
||||
vec![(String::from("set_label"), String::from("unchanged"))]
|
||||
),
|
||||
]);
|
||||
|
||||
info!("client.get(/)");
|
||||
assert_eq!(client.get("/"), "hello");
|
||||
// the first request should be labeled with `dst_addr_label="foo"`
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"request_duration_ms_count{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"foo\",dst_set_label=\"unchanged\"} 1");
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"response_duration_ms_count{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"foo\",dst_set_label=\"unchanged\",classification=\"success\",status_code=\"200\"} 1");
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"request_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"foo\",dst_set_label=\"unchanged\"} 1");
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"response_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"foo\",dst_set_label=\"unchanged\",classification=\"success\",status_code=\"200\"} 1");
|
||||
|
||||
info!("client.get(/)");
|
||||
assert_eq!(client.get("/"), "hello");
|
||||
// the second request should increment stats labeled with `dst_addr_label="bar"`
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"request_duration_ms_count{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"bar\",dst_set_label=\"unchanged\"} 1");
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"response_duration_ms_count{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"bar\",dst_set_label=\"unchanged\",classification=\"success\",status_code=\"200\"} 1");
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"request_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"bar\",dst_set_label=\"unchanged\"} 1");
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"response_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"bar\",dst_set_label=\"unchanged\",classification=\"success\",status_code=\"200\"} 1");
|
||||
// stats recorded from the first request should still be present.
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"request_duration_ms_count{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"foo\",dst_set_label=\"unchanged\"} 1");
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"response_duration_ms_count{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"foo\",dst_set_label=\"unchanged\",classification=\"success\",status_code=\"200\"} 1");
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"request_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"foo\",dst_set_label=\"unchanged\"} 1");
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"response_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_addr_label=\"foo\",dst_set_label=\"unchanged\",classification=\"success\",status_code=\"200\"} 1");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn controller_updates_set_labels() {
|
||||
let _ = env_logger::try_init();
|
||||
info!("running test server");
|
||||
let Fixture { client, metrics, proxy: _proxy } =
|
||||
fixture_with_updates(vec![
|
||||
(vec![], vec![(String::from("set_label"), String::from("foo"))]),
|
||||
(vec![], vec![(String::from("set_label"), String::from("bar"))]),
|
||||
]);
|
||||
|
||||
info!("client.get(/)");
|
||||
assert_eq!(client.get("/"), "hello");
|
||||
// the first request should be labeled with `dst_addr_label="foo"`
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"request_duration_ms_count{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_set_label=\"foo\"} 1");
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"response_duration_ms_count{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_set_label=\"foo\",classification=\"success\",status_code=\"200\"} 1");
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"request_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_set_label=\"foo\"} 1");
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"response_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_set_label=\"foo\",classification=\"success\",status_code=\"200\"} 1");
|
||||
|
||||
info!("client.get(/)");
|
||||
assert_eq!(client.get("/"), "hello");
|
||||
// the second request should increment stats labeled with `dst_addr_label="bar"`
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"request_duration_ms_count{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_set_label=\"bar\"} 1");
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"response_duration_ms_count{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_set_label=\"bar\",classification=\"success\",status_code=\"200\"} 1");
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"request_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_set_label=\"bar\"} 1");
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"response_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_set_label=\"bar\",classification=\"success\",status_code=\"200\"} 1");
|
||||
// stats recorded from the first request should still be present.
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"request_duration_ms_count{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_set_label=\"foo\"} 1");
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"response_duration_ms_count{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_set_label=\"foo\",classification=\"success\",status_code=\"200\"} 1");
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"request_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_set_label=\"foo\"} 1");
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"response_total{authority=\"labeled.test.svc.cluster.local\",direction=\"outbound\",dst_set_label=\"foo\",classification=\"success\",status_code=\"200\"} 1");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn metrics_have_no_double_commas() {
|
||||
// Test for regressions to runconduit/conduit#600.
|
||||
|
|
Loading…
Reference in New Issue