proxy: move metrics::prometheus module to root metrics module (#763)
The proxy `telemetry::metrics::prometheus` module was initially added in order to give the Prometheus metrics export code a separate namespace from the controller push metrics. Since the controller push metrics code was removed from the proxy in #616, we no longer need a separate module for the Prometheus-specific metrics code. Therefore, I've moved that code to the root `telemetry::metrics` module, which should hopefully make the proxy source tree structure a little simpler. This is a fairly trivial refactor. Signed-off-by: Eliza Weisman <eliza@buoyant.io>
This commit is contained in:
parent
734cfbbe97
commit
239b362e9a
|
@ -30,7 +30,7 @@ use transport::DnsNameAndPort;
|
||||||
|
|
||||||
use control::cache::{Cache, CacheChange, Exists};
|
use control::cache::{Cache, CacheChange, Exists};
|
||||||
|
|
||||||
use ::telemetry::metrics::prometheus::{DstLabels, Labeled};
|
use ::telemetry::metrics::{DstLabels, Labeled};
|
||||||
|
|
||||||
/// A handle to start watching a destination for address changes.
|
/// A handle to start watching a destination for address changes.
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
|
|
|
@ -2,7 +2,7 @@ use http;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use ctx;
|
use ctx;
|
||||||
use telemetry::metrics::prometheus;
|
use telemetry::metrics;
|
||||||
|
|
||||||
/// Describes a stream's request headers.
|
/// Describes a stream's request headers.
|
||||||
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
|
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
|
||||||
|
@ -22,7 +22,7 @@ pub struct Request {
|
||||||
/// Optional information on the request's destination service, which may
|
/// Optional information on the request's destination service, which may
|
||||||
/// be provided by the control plane for destinations lookups against its
|
/// be provided by the control plane for destinations lookups against its
|
||||||
/// discovery API.
|
/// discovery API.
|
||||||
pub dst_labels: Option<prometheus::DstLabels>,
|
pub dst_labels: Option<metrics::DstLabels>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Describes a stream's response headers.
|
/// Describes a stream's response headers.
|
||||||
|
@ -51,7 +51,7 @@ impl Request {
|
||||||
// destination labels from the control plane's discovery API.
|
// destination labels from the control plane's discovery API.
|
||||||
let dst_labels = request
|
let dst_labels = request
|
||||||
.extensions()
|
.extensions()
|
||||||
.get::<prometheus::DstLabels>()
|
.get::<metrics::DstLabels>()
|
||||||
.cloned();
|
.cloned();
|
||||||
let r = Self {
|
let r = Self {
|
||||||
id,
|
id,
|
||||||
|
|
|
@ -18,7 +18,7 @@ use bind::{self, Bind, Protocol};
|
||||||
use control::{self, discovery};
|
use control::{self, discovery};
|
||||||
use control::discovery::Bind as BindTrait;
|
use control::discovery::Bind as BindTrait;
|
||||||
use ctx;
|
use ctx;
|
||||||
use telemetry::metrics::prometheus;
|
use telemetry::metrics;
|
||||||
use timeout::Timeout;
|
use timeout::Timeout;
|
||||||
use transparency::h1;
|
use transparency::h1;
|
||||||
use transport::{DnsNameAndPort, Host, HostAndPort};
|
use transport::{DnsNameAndPort, Host, HostAndPort};
|
||||||
|
@ -170,7 +170,7 @@ where
|
||||||
type Request = http::Request<B>;
|
type Request = http::Request<B>;
|
||||||
type Response = bind::HttpResponse;
|
type Response = bind::HttpResponse;
|
||||||
type Error = <Self::Service as tower::Service>::Error;
|
type Error = <Self::Service as tower::Service>::Error;
|
||||||
type Service = prometheus::Labeled<bind::Service<B>>;
|
type Service = metrics::Labeled<bind::Service<B>>;
|
||||||
type DiscoverError = BindError;
|
type DiscoverError = BindError;
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::DiscoverError> {
|
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::DiscoverError> {
|
||||||
|
@ -187,7 +187,7 @@ where
|
||||||
let svc = bind.bind(&addr)
|
let svc = bind.bind(&addr)
|
||||||
// The controller has no labels to add to an external
|
// The controller has no labels to add to an external
|
||||||
// service.
|
// service.
|
||||||
.map(prometheus::Labeled::none)
|
.map(metrics::Labeled::none)
|
||||||
.map_err(|_| BindError::External{ addr })?;
|
.map_err(|_| BindError::External{ addr })?;
|
||||||
Ok(Async::Ready(Change::Insert(addr, svc)))
|
Ok(Async::Ready(Change::Insert(addr, svc)))
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -6,7 +6,7 @@ use futures_mpsc_lossy::Receiver;
|
||||||
use tokio_core::reactor::Handle;
|
use tokio_core::reactor::Handle;
|
||||||
|
|
||||||
use super::event::Event;
|
use super::event::Event;
|
||||||
use super::metrics::prometheus;
|
use super::metrics;
|
||||||
use super::tap::Taps;
|
use super::tap::Taps;
|
||||||
use connection;
|
use connection;
|
||||||
use ctx;
|
use ctx;
|
||||||
|
@ -34,10 +34,10 @@ pub struct MakeControl {
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Control {
|
pub struct Control {
|
||||||
/// Aggregates scrapable metrics.
|
/// Aggregates scrapable metrics.
|
||||||
metrics_aggregate: prometheus::Aggregate,
|
metrics_aggregate: metrics::Aggregate,
|
||||||
|
|
||||||
/// Serves scrapable metrics.
|
/// Serves scrapable metrics.
|
||||||
metrics_service: prometheus::Serve,
|
metrics_service: metrics::Serve,
|
||||||
|
|
||||||
/// Receives telemetry events.
|
/// Receives telemetry events.
|
||||||
rx: Option<Receiver<Event>>,
|
rx: Option<Receiver<Event>>,
|
||||||
|
@ -77,7 +77,7 @@ impl MakeControl {
|
||||||
/// - `Err(io::Error)` if the timeout could not be created.
|
/// - `Err(io::Error)` if the timeout could not be created.
|
||||||
pub fn make_control(self, taps: &Arc<Mutex<Taps>>, handle: &Handle) -> io::Result<Control> {
|
pub fn make_control(self, taps: &Arc<Mutex<Taps>>, handle: &Handle) -> io::Result<Control> {
|
||||||
let (metrics_aggregate, metrics_service) =
|
let (metrics_aggregate, metrics_service) =
|
||||||
prometheus::new(&self.process_ctx);
|
metrics::new(&self.process_ctx);
|
||||||
|
|
||||||
Ok(Control {
|
Ok(Control {
|
||||||
metrics_aggregate,
|
metrics_aggregate,
|
||||||
|
|
|
@ -2,8 +2,7 @@
|
||||||
use std::{fmt, iter, ops, slice, u32};
|
use std::{fmt, iter, ops, slice, u32};
|
||||||
use std::num::Wrapping;
|
use std::num::Wrapping;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
use super::Counter;
|
||||||
use super::prometheus;
|
|
||||||
|
|
||||||
/// The number of buckets in a latency histogram.
|
/// The number of buckets in a latency histogram.
|
||||||
pub const NUM_BUCKETS: usize = 26;
|
pub const NUM_BUCKETS: usize = 26;
|
||||||
|
@ -64,7 +63,7 @@ pub struct Histogram {
|
||||||
/// Array of buckets in which to count latencies.
|
/// Array of buckets in which to count latencies.
|
||||||
///
|
///
|
||||||
/// The upper bound of a given bucket `i` is given in `BUCKET_BOUNDS[i]`.
|
/// The upper bound of a given bucket `i` is given in `BUCKET_BOUNDS[i]`.
|
||||||
buckets: [prometheus::Counter; NUM_BUCKETS],
|
buckets: [Counter; NUM_BUCKETS],
|
||||||
|
|
||||||
/// The total sum of all observed latency values.
|
/// The total sum of all observed latency values.
|
||||||
///
|
///
|
||||||
|
@ -137,8 +136,8 @@ where
|
||||||
impl<'a> IntoIterator for &'a Histogram {
|
impl<'a> IntoIterator for &'a Histogram {
|
||||||
type Item = u64;
|
type Item = u64;
|
||||||
type IntoIter = iter::Map<
|
type IntoIter = iter::Map<
|
||||||
slice::Iter<'a, prometheus::Counter>,
|
slice::Iter<'a, Counter>,
|
||||||
fn(&'a prometheus::Counter) -> u64
|
fn(&'a Counter) -> u64
|
||||||
>;
|
>;
|
||||||
|
|
||||||
fn into_iter(self) -> Self::IntoIter {
|
fn into_iter(self) -> Self::IntoIter {
|
||||||
|
|
|
@ -1,2 +1,455 @@
|
||||||
|
//! Aggregates and serves Prometheus metrics.
|
||||||
|
//!
|
||||||
|
//! # A note on label formatting
|
||||||
|
//!
|
||||||
|
//! Prometheus labels are represented as a comma-separated list of values.
|
||||||
|
//! Since the Conduit proxy labels its metrics with a fixed set of labels
|
||||||
|
//! which we know in advance, we represent these labels using a number of
|
||||||
|
//! `struct`s, all of which implement `fmt::Display`. Some of the label
|
||||||
|
//! `struct`s contain other structs which represent a subset of the labels
|
||||||
|
//! which can be present on metrics in that scope. In this case, the
|
||||||
|
//! `fmt::Display` impls for those structs call the `fmt::Display` impls for
|
||||||
|
//! the structs that they own. This has the potential to complicate the
|
||||||
|
//! insertion of commas to separate label values.
|
||||||
|
//!
|
||||||
|
//! In order to ensure that commas are added correctly to separate labels,
|
||||||
|
//! we expect the `fmt::Display` implementations for label types to behave in
|
||||||
|
//! a consistent way: A label struct is *never* responsible for printing
|
||||||
|
//! leading or trailing commas before or after the label values it contains.
|
||||||
|
//! If it contains multiple labels, it *is* responsible for ensuring any
|
||||||
|
//! labels it owns are comma-separated. This way, the `fmt::Display` impls for
|
||||||
|
//! any struct that represents a subset of the labels are position-agnostic;
|
||||||
|
//! they don't need to know if there are other labels before or after them in
|
||||||
|
//! the formatted output. The owner is responsible for managing that.
|
||||||
|
//!
|
||||||
|
//! If this rule is followed consistently across all structs representing
|
||||||
|
//! labels, we can add new labels or modify the existing ones without having
|
||||||
|
//! to worry about missing commas, double commas, or trailing commas at the
|
||||||
|
//! end of the label set (all of which will make Prometheus angry).
|
||||||
|
use std::default::Default;
|
||||||
|
use std::{fmt, ops, time};
|
||||||
|
use std::hash::Hash;
|
||||||
|
use std::num::Wrapping;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
|
use futures::future::{self, FutureResult};
|
||||||
|
use hyper;
|
||||||
|
use hyper::header::{ContentLength, ContentType};
|
||||||
|
use hyper::StatusCode;
|
||||||
|
use hyper::server::{
|
||||||
|
Service as HyperService,
|
||||||
|
Request as HyperRequest,
|
||||||
|
Response as HyperResponse
|
||||||
|
};
|
||||||
|
use indexmap::{IndexMap};
|
||||||
|
|
||||||
|
use ctx;
|
||||||
|
use telemetry::event::Event;
|
||||||
|
|
||||||
|
mod labels;
|
||||||
mod latency;
|
mod latency;
|
||||||
pub mod prometheus;
|
|
||||||
|
use self::labels::{RequestLabels, ResponseLabels};
|
||||||
|
use self::latency::{BUCKET_BOUNDS, Histogram};
|
||||||
|
pub use self::labels::{DstLabels, Labeled};
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
struct Metrics {
|
||||||
|
request_total: Metric<Counter, Arc<RequestLabels>>,
|
||||||
|
request_duration: Metric<Histogram, Arc<RequestLabels>>,
|
||||||
|
|
||||||
|
response_total: Metric<Counter, Arc<ResponseLabels>>,
|
||||||
|
response_duration: Metric<Histogram, Arc<ResponseLabels>>,
|
||||||
|
response_latency: Metric<Histogram, Arc<ResponseLabels>>,
|
||||||
|
|
||||||
|
start_time: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
struct Metric<M, L: Hash + Eq> {
|
||||||
|
name: &'static str,
|
||||||
|
help: &'static str,
|
||||||
|
values: IndexMap<L, M>
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A Prometheus counter is represented by a `Wrapping` unsigned 64-bit int.
|
||||||
|
///
|
||||||
|
/// Counters always explicitly wrap on overflows rather than panicking in
|
||||||
|
/// debug builds. Prometheus' [`rate()`] and [`irate()`] queries handle breaks
|
||||||
|
/// in monotonicity gracefully (see also [`resets()`]), so wrapping is less
|
||||||
|
/// problematic than panicking in this case.
|
||||||
|
///
|
||||||
|
/// Note, however, that Prometheus represents counters using 64-bit
|
||||||
|
/// floating-point numbers. The correct semantics are to ensure the counter
|
||||||
|
/// always gets reset to zero after Prometheus reads it, before it would ever
|
||||||
|
/// overflow a 52-bit `f64` mantissa.
|
||||||
|
///
|
||||||
|
/// [`rate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#rate()
|
||||||
|
/// [`irate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#irate()
|
||||||
|
/// [`resets()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#resets
|
||||||
|
///
|
||||||
|
// TODO: Implement Prometheus reset semantics correctly, taking into
|
||||||
|
// consideration that Prometheus models counters as `f64` and so
|
||||||
|
// there are only 52 significant bits.
|
||||||
|
#[derive(Copy, Debug, Default, Clone, Eq, PartialEq)]
|
||||||
|
pub struct Counter(Wrapping<u64>);
|
||||||
|
|
||||||
|
/// Tracks Prometheus metrics
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Aggregate {
|
||||||
|
metrics: Arc<Mutex<Metrics>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Serve Prometheus metrics.
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct Serve {
|
||||||
|
metrics: Arc<Mutex<Metrics>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Construct the Prometheus metrics.
|
||||||
|
///
|
||||||
|
/// Returns the `Aggregate` and `Serve` sides. The `Serve` side
|
||||||
|
/// is a Hyper service which can be used to create the server for the
|
||||||
|
/// scrape endpoint, while the `Aggregate` side can receive updates to the
|
||||||
|
/// metrics by calling `record_event`.
|
||||||
|
pub fn new(process: &Arc<ctx::Process>) -> (Aggregate, Serve) {
|
||||||
|
let metrics = Arc::new(Mutex::new(Metrics::new(process)));
|
||||||
|
(Aggregate::new(&metrics), Serve::new(&metrics))
|
||||||
|
}
|
||||||
|
|
||||||
|
// ===== impl Metrics =====
|
||||||
|
|
||||||
|
impl Metrics {
|
||||||
|
|
||||||
|
pub fn new(process: &Arc<ctx::Process>) -> Self {
|
||||||
|
|
||||||
|
let start_time = process.start_time
|
||||||
|
.duration_since(time::UNIX_EPOCH)
|
||||||
|
.expect(
|
||||||
|
"process start time should not be before the beginning \
|
||||||
|
of the Unix epoch"
|
||||||
|
)
|
||||||
|
.as_secs();
|
||||||
|
|
||||||
|
let request_total = Metric::<Counter, Arc<RequestLabels>>::new(
|
||||||
|
"request_total",
|
||||||
|
"A counter of the number of requests the proxy has received.",
|
||||||
|
);
|
||||||
|
|
||||||
|
let request_duration = Metric::<Histogram, Arc<RequestLabels>>::new(
|
||||||
|
"request_duration_ms",
|
||||||
|
"A histogram of the duration of a request. This is measured from \
|
||||||
|
when the request headers are received to when the request \
|
||||||
|
stream has completed.",
|
||||||
|
);
|
||||||
|
|
||||||
|
let response_total = Metric::<Counter, Arc<ResponseLabels>>::new(
|
||||||
|
"response_total",
|
||||||
|
"A counter of the number of responses the proxy has received.",
|
||||||
|
);
|
||||||
|
|
||||||
|
let response_duration = Metric::<Histogram, Arc<ResponseLabels>>::new(
|
||||||
|
"response_duration_ms",
|
||||||
|
"A histogram of the duration of a response. This is measured from \
|
||||||
|
when the response headers are received to when the response \
|
||||||
|
stream has completed.",
|
||||||
|
);
|
||||||
|
|
||||||
|
let response_latency = Metric::<Histogram, Arc<ResponseLabels>>::new(
|
||||||
|
"response_latency_ms",
|
||||||
|
"A histogram of the total latency of a response. This is measured \
|
||||||
|
from when the request headers are received to when the response \
|
||||||
|
stream has completed.",
|
||||||
|
);
|
||||||
|
|
||||||
|
Metrics {
|
||||||
|
request_total,
|
||||||
|
request_duration,
|
||||||
|
response_total,
|
||||||
|
response_duration,
|
||||||
|
response_latency,
|
||||||
|
start_time,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn request_total(&mut self,
|
||||||
|
labels: &Arc<RequestLabels>)
|
||||||
|
-> &mut Counter {
|
||||||
|
self.request_total.values
|
||||||
|
.entry(labels.clone())
|
||||||
|
.or_insert_with(Default::default)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn request_duration(&mut self,
|
||||||
|
labels: &Arc<RequestLabels>)
|
||||||
|
-> &mut Histogram {
|
||||||
|
self.request_duration.values
|
||||||
|
.entry(labels.clone())
|
||||||
|
.or_insert_with(Default::default)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn response_duration(&mut self,
|
||||||
|
labels: &Arc<ResponseLabels>)
|
||||||
|
-> &mut Histogram {
|
||||||
|
self.response_duration.values
|
||||||
|
.entry(labels.clone())
|
||||||
|
.or_insert_with(Default::default)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn response_latency(&mut self,
|
||||||
|
labels: &Arc<ResponseLabels>)
|
||||||
|
-> &mut Histogram {
|
||||||
|
self.response_latency.values
|
||||||
|
.entry(labels.clone())
|
||||||
|
.or_insert_with(Default::default)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn response_total(&mut self,
|
||||||
|
labels: &Arc<ResponseLabels>)
|
||||||
|
-> &mut Counter {
|
||||||
|
self.response_total.values
|
||||||
|
.entry(labels.clone())
|
||||||
|
.or_insert_with(Default::default)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
impl fmt::Display for Metrics {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
write!(f, "{}\n{}\n{}\n{}\n{}\nprocess_start_time_seconds {}\n",
|
||||||
|
self.request_total,
|
||||||
|
self.request_duration,
|
||||||
|
self.response_total,
|
||||||
|
self.response_duration,
|
||||||
|
self.response_latency,
|
||||||
|
self.start_time,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// ===== impl Counter =====
|
||||||
|
|
||||||
|
impl fmt::Display for Counter {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
write!(f, "{}", (self.0).0 as f64)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Into<u64> for Counter {
|
||||||
|
fn into(self) -> u64 {
|
||||||
|
(self.0).0 as u64
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ops::Add for Counter {
|
||||||
|
type Output = Self;
|
||||||
|
fn add(self, Counter(rhs): Self) -> Self::Output {
|
||||||
|
Counter(self.0 + rhs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Counter {
|
||||||
|
|
||||||
|
/// Increment the counter by one.
|
||||||
|
///
|
||||||
|
/// This function wraps on overflows.
|
||||||
|
pub fn incr(&mut self) -> &mut Self {
|
||||||
|
(*self).0 += Wrapping(1);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ===== impl Metric =====
|
||||||
|
|
||||||
|
impl<M, L: Hash + Eq> Metric<M, L> {
|
||||||
|
|
||||||
|
pub fn new(name: &'static str, help: &'static str) -> Self {
|
||||||
|
Metric {
|
||||||
|
name,
|
||||||
|
help,
|
||||||
|
values: IndexMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<L> fmt::Display for Metric<Counter, L>
|
||||||
|
where
|
||||||
|
L: fmt::Display,
|
||||||
|
L: Hash + Eq,
|
||||||
|
{
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
write!(f,
|
||||||
|
"# HELP {name} {help}\n# TYPE {name} counter\n",
|
||||||
|
name = self.name,
|
||||||
|
help = self.help,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
for (labels, value) in &self.values {
|
||||||
|
write!(f, "{name}{{{labels}}} {value}\n",
|
||||||
|
name = self.name,
|
||||||
|
labels = labels,
|
||||||
|
value = value,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<L> fmt::Display for Metric<Histogram, L> where
|
||||||
|
L: fmt::Display,
|
||||||
|
L: Hash + Eq,
|
||||||
|
{
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
write!(f,
|
||||||
|
"# HELP {name} {help}\n# TYPE {name} histogram\n",
|
||||||
|
name = self.name,
|
||||||
|
help = self.help,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
for (labels, histogram) in &self.values {
|
||||||
|
// Look up the bucket numbers against the BUCKET_BOUNDS array
|
||||||
|
// to turn them into upper bounds.
|
||||||
|
let bounds_and_counts = histogram.into_iter()
|
||||||
|
.enumerate()
|
||||||
|
.map(|(num, count)| (BUCKET_BOUNDS[num], count));
|
||||||
|
|
||||||
|
// Since Prometheus expects each bucket's value to be the sum of
|
||||||
|
// the number of values in this bucket and all lower buckets,
|
||||||
|
// track the total count here.
|
||||||
|
let mut total_count = 0;
|
||||||
|
for (le, count) in bounds_and_counts {
|
||||||
|
// Add this bucket's count to the total count.
|
||||||
|
total_count += count;
|
||||||
|
write!(f, "{name}_bucket{{{labels},le=\"{le}\"}} {count}\n",
|
||||||
|
name = self.name,
|
||||||
|
labels = labels,
|
||||||
|
le = le,
|
||||||
|
// Print the total count *as of this iteration*.
|
||||||
|
count = total_count,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Print the total count and histogram sum stats.
|
||||||
|
write!(f,
|
||||||
|
"{name}_count{{{labels}}} {count}\n\
|
||||||
|
{name}_sum{{{labels}}} {sum}\n",
|
||||||
|
name = self.name,
|
||||||
|
labels = labels,
|
||||||
|
count = total_count,
|
||||||
|
sum = histogram.sum_in_ms(),
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ===== impl Aggregate =====
|
||||||
|
|
||||||
|
impl Aggregate {
|
||||||
|
|
||||||
|
fn new(metrics: &Arc<Mutex<Metrics>>) -> Self {
|
||||||
|
Aggregate {
|
||||||
|
metrics: metrics.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn update<F: Fn(&mut Metrics)>(&mut self, f: F) {
|
||||||
|
let mut lock = self.metrics.lock()
|
||||||
|
.expect("metrics lock poisoned");
|
||||||
|
f(&mut *lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Observe the given event.
|
||||||
|
pub fn record_event(&mut self, event: &Event) {
|
||||||
|
trace!("Metrics::record({:?})", event);
|
||||||
|
match *event {
|
||||||
|
|
||||||
|
Event::StreamRequestOpen(_) | Event::StreamResponseOpen(_, _) => {
|
||||||
|
// Do nothing; we'll record metrics for the request or response
|
||||||
|
// when the stream *finishes*.
|
||||||
|
},
|
||||||
|
|
||||||
|
Event::StreamRequestFail(ref req, ref fail) => {
|
||||||
|
let labels = Arc::new(RequestLabels::new(req));
|
||||||
|
self.update(|metrics| {
|
||||||
|
*metrics.request_duration(&labels) +=
|
||||||
|
fail.since_request_open;
|
||||||
|
*metrics.request_total(&labels).incr();
|
||||||
|
})
|
||||||
|
},
|
||||||
|
|
||||||
|
Event::StreamRequestEnd(ref req, ref end) => {
|
||||||
|
let labels = Arc::new(RequestLabels::new(req));
|
||||||
|
self.update(|metrics| {
|
||||||
|
*metrics.request_total(&labels).incr();
|
||||||
|
*metrics.request_duration(&labels) +=
|
||||||
|
end.since_request_open;
|
||||||
|
})
|
||||||
|
},
|
||||||
|
|
||||||
|
Event::StreamResponseEnd(ref res, ref end) => {
|
||||||
|
let labels = Arc::new(ResponseLabels::new(
|
||||||
|
res,
|
||||||
|
end.grpc_status,
|
||||||
|
));
|
||||||
|
self.update(|metrics| {
|
||||||
|
*metrics.response_total(&labels).incr();
|
||||||
|
*metrics.response_duration(&labels) += end.since_response_open;
|
||||||
|
*metrics.response_latency(&labels) += end.since_request_open;
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
Event::StreamResponseFail(ref res, ref fail) => {
|
||||||
|
// TODO: do we care about the failure's error code here?
|
||||||
|
let labels = Arc::new(ResponseLabels::fail(res));
|
||||||
|
self.update(|metrics| {
|
||||||
|
*metrics.response_total(&labels).incr();
|
||||||
|
*metrics.response_duration(&labels) += fail.since_response_open;
|
||||||
|
*metrics.response_latency(&labels) += fail.since_request_open;
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
Event::TransportOpen(_) | Event::TransportClose(_, _) => {
|
||||||
|
// TODO: we don't collect any metrics around transport events.
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// ===== impl Serve =====
|
||||||
|
|
||||||
|
impl Serve {
|
||||||
|
fn new(metrics: &Arc<Mutex<Metrics>>) -> Self {
|
||||||
|
Serve { metrics: metrics.clone() }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HyperService for Serve {
|
||||||
|
type Request = HyperRequest;
|
||||||
|
type Response = HyperResponse;
|
||||||
|
type Error = hyper::Error;
|
||||||
|
type Future = FutureResult<Self::Response, Self::Error>;
|
||||||
|
|
||||||
|
fn call(&self, req: Self::Request) -> Self::Future {
|
||||||
|
if req.path() != "/metrics" {
|
||||||
|
return future::ok(HyperResponse::new()
|
||||||
|
.with_status(StatusCode::NotFound));
|
||||||
|
}
|
||||||
|
|
||||||
|
let body = {
|
||||||
|
let metrics = self.metrics.lock()
|
||||||
|
.expect("metrics lock poisoned");
|
||||||
|
format!("{}", *metrics)
|
||||||
|
};
|
||||||
|
future::ok(HyperResponse::new()
|
||||||
|
.with_header(ContentLength(body.len() as u64))
|
||||||
|
.with_header(ContentType::plaintext())
|
||||||
|
.with_body(body))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1,453 +0,0 @@
|
||||||
//! Aggregates and serves Prometheus metrics.
|
|
||||||
//!
|
|
||||||
//! # A note on label formatting
|
|
||||||
//!
|
|
||||||
//! Prometheus labels are represented as a comma-separated list of values.
|
|
||||||
//! Since the Conduit proxy labels its metrics with a fixed set of labels
|
|
||||||
//! which we know in advance, we represent these labels using a number of
|
|
||||||
//! `struct`s, all of which implement `fmt::Display`. Some of the label
|
|
||||||
//! `struct`s contain other structs which represent a subset of the labels
|
|
||||||
//! which can be present on metrics in that scope. In this case, the
|
|
||||||
//! `fmt::Display` impls for those structs call the `fmt::Display` impls for
|
|
||||||
//! the structs that they own. This has the potential to complicate the
|
|
||||||
//! insertion of commas to separate label values.
|
|
||||||
//!
|
|
||||||
//! In order to ensure that commas are added correctly to separate labels,
|
|
||||||
//! we expect the `fmt::Display` implementations for label types to behave in
|
|
||||||
//! a consistent way: A label struct is *never* responsible for printing
|
|
||||||
//! leading or trailing commas before or after the label values it contains.
|
|
||||||
//! If it contains multiple labels, it *is* responsible for ensuring any
|
|
||||||
//! labels it owns are comma-separated. This way, the `fmt::Display` impls for
|
|
||||||
//! any struct that represents a subset of the labels are position-agnostic;
|
|
||||||
//! they don't need to know if there are other labels before or after them in
|
|
||||||
//! the formatted output. The owner is responsible for managing that.
|
|
||||||
//!
|
|
||||||
//! If this rule is followed consistently across all structs representing
|
|
||||||
//! labels, we can add new labels or modify the existing ones without having
|
|
||||||
//! to worry about missing commas, double commas, or trailing commas at the
|
|
||||||
//! end of the label set (all of which will make Prometheus angry).
|
|
||||||
use std::default::Default;
|
|
||||||
use std::{fmt, ops, time};
|
|
||||||
use std::hash::Hash;
|
|
||||||
use std::num::Wrapping;
|
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
|
|
||||||
use futures::future::{self, FutureResult};
|
|
||||||
use hyper;
|
|
||||||
use hyper::header::{ContentLength, ContentType};
|
|
||||||
use hyper::StatusCode;
|
|
||||||
use hyper::server::{
|
|
||||||
Service as HyperService,
|
|
||||||
Request as HyperRequest,
|
|
||||||
Response as HyperResponse
|
|
||||||
};
|
|
||||||
use indexmap::{IndexMap};
|
|
||||||
|
|
||||||
use ctx;
|
|
||||||
use telemetry::event::Event;
|
|
||||||
use super::latency::{BUCKET_BOUNDS, Histogram};
|
|
||||||
|
|
||||||
mod labels;
|
|
||||||
use self::labels::{RequestLabels, ResponseLabels};
|
|
||||||
pub use self::labels::{DstLabels, Labeled};
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
struct Metrics {
|
|
||||||
request_total: Metric<Counter, Arc<RequestLabels>>,
|
|
||||||
request_duration: Metric<Histogram, Arc<RequestLabels>>,
|
|
||||||
|
|
||||||
response_total: Metric<Counter, Arc<ResponseLabels>>,
|
|
||||||
response_duration: Metric<Histogram, Arc<ResponseLabels>>,
|
|
||||||
response_latency: Metric<Histogram, Arc<ResponseLabels>>,
|
|
||||||
|
|
||||||
start_time: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
struct Metric<M, L: Hash + Eq> {
|
|
||||||
name: &'static str,
|
|
||||||
help: &'static str,
|
|
||||||
values: IndexMap<L, M>
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A Prometheus counter is represented by a `Wrapping` unsigned 64-bit int.
|
|
||||||
///
|
|
||||||
/// Counters always explicitly wrap on overflows rather than panicking in
|
|
||||||
/// debug builds. Prometheus' [`rate()`] and [`irate()`] queries handle breaks
|
|
||||||
/// in monotonicity gracefully (see also [`resets()`]), so wrapping is less
|
|
||||||
/// problematic than panicking in this case.
|
|
||||||
///
|
|
||||||
/// Note, however, that Prometheus represents counters using 64-bit
|
|
||||||
/// floating-point numbers. The correct semantics are to ensure the counter
|
|
||||||
/// always gets reset to zero after Prometheus reads it, before it would ever
|
|
||||||
/// overflow a 52-bit `f64` mantissa.
|
|
||||||
///
|
|
||||||
/// [`rate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#rate()
|
|
||||||
/// [`irate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#irate()
|
|
||||||
/// [`resets()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#resets
|
|
||||||
///
|
|
||||||
// TODO: Implement Prometheus reset semantics correctly, taking into
|
|
||||||
// consideration that Prometheus models counters as `f64` and so
|
|
||||||
// there are only 52 significant bits.
|
|
||||||
#[derive(Copy, Debug, Default, Clone, Eq, PartialEq)]
|
|
||||||
pub struct Counter(Wrapping<u64>);
|
|
||||||
|
|
||||||
/// Tracks Prometheus metrics
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct Aggregate {
|
|
||||||
metrics: Arc<Mutex<Metrics>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Serve Prometheus metrics.
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct Serve {
|
|
||||||
metrics: Arc<Mutex<Metrics>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Construct the Prometheus metrics.
|
|
||||||
///
|
|
||||||
/// Returns the `Aggregate` and `Serve` sides. The `Serve` side
|
|
||||||
/// is a Hyper service which can be used to create the server for the
|
|
||||||
/// scrape endpoint, while the `Aggregate` side can receive updates to the
|
|
||||||
/// metrics by calling `record_event`.
|
|
||||||
pub fn new(process: &Arc<ctx::Process>) -> (Aggregate, Serve) {
|
|
||||||
let metrics = Arc::new(Mutex::new(Metrics::new(process)));
|
|
||||||
(Aggregate::new(&metrics), Serve::new(&metrics))
|
|
||||||
}
|
|
||||||
|
|
||||||
// ===== impl Metrics =====
|
|
||||||
|
|
||||||
impl Metrics {
|
|
||||||
|
|
||||||
pub fn new(process: &Arc<ctx::Process>) -> Self {
|
|
||||||
|
|
||||||
let start_time = process.start_time
|
|
||||||
.duration_since(time::UNIX_EPOCH)
|
|
||||||
.expect(
|
|
||||||
"process start time should not be before the beginning \
|
|
||||||
of the Unix epoch"
|
|
||||||
)
|
|
||||||
.as_secs();
|
|
||||||
|
|
||||||
let request_total = Metric::<Counter, Arc<RequestLabels>>::new(
|
|
||||||
"request_total",
|
|
||||||
"A counter of the number of requests the proxy has received.",
|
|
||||||
);
|
|
||||||
|
|
||||||
let request_duration = Metric::<Histogram, Arc<RequestLabels>>::new(
|
|
||||||
"request_duration_ms",
|
|
||||||
"A histogram of the duration of a request. This is measured from \
|
|
||||||
when the request headers are received to when the request \
|
|
||||||
stream has completed.",
|
|
||||||
);
|
|
||||||
|
|
||||||
let response_total = Metric::<Counter, Arc<ResponseLabels>>::new(
|
|
||||||
"response_total",
|
|
||||||
"A counter of the number of responses the proxy has received.",
|
|
||||||
);
|
|
||||||
|
|
||||||
let response_duration = Metric::<Histogram, Arc<ResponseLabels>>::new(
|
|
||||||
"response_duration_ms",
|
|
||||||
"A histogram of the duration of a response. This is measured from \
|
|
||||||
when the response headers are received to when the response \
|
|
||||||
stream has completed.",
|
|
||||||
);
|
|
||||||
|
|
||||||
let response_latency = Metric::<Histogram, Arc<ResponseLabels>>::new(
|
|
||||||
"response_latency_ms",
|
|
||||||
"A histogram of the total latency of a response. This is measured \
|
|
||||||
from when the request headers are received to when the response \
|
|
||||||
stream has completed.",
|
|
||||||
);
|
|
||||||
|
|
||||||
Metrics {
|
|
||||||
request_total,
|
|
||||||
request_duration,
|
|
||||||
response_total,
|
|
||||||
response_duration,
|
|
||||||
response_latency,
|
|
||||||
start_time,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn request_total(&mut self,
|
|
||||||
labels: &Arc<RequestLabels>)
|
|
||||||
-> &mut Counter {
|
|
||||||
self.request_total.values
|
|
||||||
.entry(labels.clone())
|
|
||||||
.or_insert_with(Default::default)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn request_duration(&mut self,
|
|
||||||
labels: &Arc<RequestLabels>)
|
|
||||||
-> &mut Histogram {
|
|
||||||
self.request_duration.values
|
|
||||||
.entry(labels.clone())
|
|
||||||
.or_insert_with(Default::default)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn response_duration(&mut self,
|
|
||||||
labels: &Arc<ResponseLabels>)
|
|
||||||
-> &mut Histogram {
|
|
||||||
self.response_duration.values
|
|
||||||
.entry(labels.clone())
|
|
||||||
.or_insert_with(Default::default)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn response_latency(&mut self,
|
|
||||||
labels: &Arc<ResponseLabels>)
|
|
||||||
-> &mut Histogram {
|
|
||||||
self.response_latency.values
|
|
||||||
.entry(labels.clone())
|
|
||||||
.or_insert_with(Default::default)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn response_total(&mut self,
|
|
||||||
labels: &Arc<ResponseLabels>)
|
|
||||||
-> &mut Counter {
|
|
||||||
self.response_total.values
|
|
||||||
.entry(labels.clone())
|
|
||||||
.or_insert_with(Default::default)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
impl fmt::Display for Metrics {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
|
||||||
write!(f, "{}\n{}\n{}\n{}\n{}\nprocess_start_time_seconds {}\n",
|
|
||||||
self.request_total,
|
|
||||||
self.request_duration,
|
|
||||||
self.response_total,
|
|
||||||
self.response_duration,
|
|
||||||
self.response_latency,
|
|
||||||
self.start_time,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// ===== impl Counter =====
|
|
||||||
|
|
||||||
impl fmt::Display for Counter {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
|
||||||
write!(f, "{}", (self.0).0 as f64)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Extract the counter's current value as a `u64`.
///
/// This is implemented as `From<Counter> for u64` rather than as a
/// hand-written `Into`: the standard library's blanket impl still provides
/// `Into<u64> for Counter`, so existing `.into()` call sites keep working,
/// and `u64::from(counter)` becomes available as well.
impl From<Counter> for u64 {
    fn from(counter: Counter) -> u64 {
        (counter.0).0 as u64
    }
}
|
|
||||||
|
|
||||||
impl ops::Add for Counter {
|
|
||||||
type Output = Self;
|
|
||||||
fn add(self, Counter(rhs): Self) -> Self::Output {
|
|
||||||
Counter(self.0 + rhs)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Counter {
|
|
||||||
|
|
||||||
/// Increment the counter by one.
|
|
||||||
///
|
|
||||||
/// This function wraps on overflows.
|
|
||||||
pub fn incr(&mut self) -> &mut Self {
|
|
||||||
(*self).0 += Wrapping(1);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ===== impl Metric =====
|
|
||||||
|
|
||||||
impl<M, L: Hash + Eq> Metric<M, L> {
|
|
||||||
|
|
||||||
pub fn new(name: &'static str, help: &'static str) -> Self {
|
|
||||||
Metric {
|
|
||||||
name,
|
|
||||||
help,
|
|
||||||
values: IndexMap::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<L> fmt::Display for Metric<Counter, L>
|
|
||||||
where
|
|
||||||
L: fmt::Display,
|
|
||||||
L: Hash + Eq,
|
|
||||||
{
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
|
||||||
write!(f,
|
|
||||||
"# HELP {name} {help}\n# TYPE {name} counter\n",
|
|
||||||
name = self.name,
|
|
||||||
help = self.help,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
for (labels, value) in &self.values {
|
|
||||||
write!(f, "{name}{{{labels}}} {value}\n",
|
|
||||||
name = self.name,
|
|
||||||
labels = labels,
|
|
||||||
value = value,
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<L> fmt::Display for Metric<Histogram, L> where
|
|
||||||
L: fmt::Display,
|
|
||||||
L: Hash + Eq,
|
|
||||||
{
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
|
||||||
write!(f,
|
|
||||||
"# HELP {name} {help}\n# TYPE {name} histogram\n",
|
|
||||||
name = self.name,
|
|
||||||
help = self.help,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
for (labels, histogram) in &self.values {
|
|
||||||
// Look up the bucket numbers against the BUCKET_BOUNDS array
|
|
||||||
// to turn them into upper bounds.
|
|
||||||
let bounds_and_counts = histogram.into_iter()
|
|
||||||
.enumerate()
|
|
||||||
.map(|(num, count)| (BUCKET_BOUNDS[num], count));
|
|
||||||
|
|
||||||
// Since Prometheus expects each bucket's value to be the sum of
|
|
||||||
// the number of values in this bucket and all lower buckets,
|
|
||||||
// track the total count here.
|
|
||||||
let mut total_count = 0;
|
|
||||||
for (le, count) in bounds_and_counts {
|
|
||||||
// Add this bucket's count to the total count.
|
|
||||||
total_count += count;
|
|
||||||
write!(f, "{name}_bucket{{{labels},le=\"{le}\"}} {count}\n",
|
|
||||||
name = self.name,
|
|
||||||
labels = labels,
|
|
||||||
le = le,
|
|
||||||
// Print the total count *as of this iteration*.
|
|
||||||
count = total_count,
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Print the total count and histogram sum stats.
|
|
||||||
write!(f,
|
|
||||||
"{name}_count{{{labels}}} {count}\n\
|
|
||||||
{name}_sum{{{labels}}} {sum}\n",
|
|
||||||
name = self.name,
|
|
||||||
labels = labels,
|
|
||||||
count = total_count,
|
|
||||||
sum = histogram.sum_in_ms(),
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ===== impl Aggregate =====
|
|
||||||
|
|
||||||
impl Aggregate {
|
|
||||||
|
|
||||||
fn new(metrics: &Arc<Mutex<Metrics>>) -> Self {
|
|
||||||
Aggregate {
|
|
||||||
metrics: metrics.clone(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn update<F: Fn(&mut Metrics)>(&mut self, f: F) {
|
|
||||||
let mut lock = self.metrics.lock()
|
|
||||||
.expect("metrics lock poisoned");
|
|
||||||
f(&mut *lock);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Observe the given event.
|
|
||||||
pub fn record_event(&mut self, event: &Event) {
|
|
||||||
trace!("Metrics::record({:?})", event);
|
|
||||||
match *event {
|
|
||||||
|
|
||||||
Event::StreamRequestOpen(_) | Event::StreamResponseOpen(_, _) => {
|
|
||||||
// Do nothing; we'll record metrics for the request or response
|
|
||||||
// when the stream *finishes*.
|
|
||||||
},
|
|
||||||
|
|
||||||
Event::StreamRequestFail(ref req, ref fail) => {
|
|
||||||
let labels = Arc::new(RequestLabels::new(req));
|
|
||||||
self.update(|metrics| {
|
|
||||||
*metrics.request_duration(&labels) +=
|
|
||||||
fail.since_request_open;
|
|
||||||
*metrics.request_total(&labels).incr();
|
|
||||||
})
|
|
||||||
},
|
|
||||||
|
|
||||||
Event::StreamRequestEnd(ref req, ref end) => {
|
|
||||||
let labels = Arc::new(RequestLabels::new(req));
|
|
||||||
self.update(|metrics| {
|
|
||||||
*metrics.request_total(&labels).incr();
|
|
||||||
*metrics.request_duration(&labels) +=
|
|
||||||
end.since_request_open;
|
|
||||||
})
|
|
||||||
},
|
|
||||||
|
|
||||||
Event::StreamResponseEnd(ref res, ref end) => {
|
|
||||||
let labels = Arc::new(ResponseLabels::new(
|
|
||||||
res,
|
|
||||||
end.grpc_status,
|
|
||||||
));
|
|
||||||
self.update(|metrics| {
|
|
||||||
*metrics.response_total(&labels).incr();
|
|
||||||
*metrics.response_duration(&labels) += end.since_response_open;
|
|
||||||
*metrics.response_latency(&labels) += end.since_request_open;
|
|
||||||
});
|
|
||||||
},
|
|
||||||
|
|
||||||
Event::StreamResponseFail(ref res, ref fail) => {
|
|
||||||
// TODO: do we care about the failure's error code here?
|
|
||||||
let labels = Arc::new(ResponseLabels::fail(res));
|
|
||||||
self.update(|metrics| {
|
|
||||||
*metrics.response_total(&labels).incr();
|
|
||||||
*metrics.response_duration(&labels) += fail.since_response_open;
|
|
||||||
*metrics.response_latency(&labels) += fail.since_request_open;
|
|
||||||
});
|
|
||||||
},
|
|
||||||
|
|
||||||
Event::TransportOpen(_) | Event::TransportClose(_, _) => {
|
|
||||||
// TODO: we don't collect any metrics around transport events.
|
|
||||||
},
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// ===== impl Serve =====
|
|
||||||
|
|
||||||
impl Serve {
|
|
||||||
fn new(metrics: &Arc<Mutex<Metrics>>) -> Self {
|
|
||||||
Serve { metrics: metrics.clone() }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl HyperService for Serve {
|
|
||||||
type Request = HyperRequest;
|
|
||||||
type Response = HyperResponse;
|
|
||||||
type Error = hyper::Error;
|
|
||||||
type Future = FutureResult<Self::Response, Self::Error>;
|
|
||||||
|
|
||||||
fn call(&self, req: Self::Request) -> Self::Future {
|
|
||||||
if req.path() != "/metrics" {
|
|
||||||
return future::ok(HyperResponse::new()
|
|
||||||
.with_status(StatusCode::NotFound));
|
|
||||||
}
|
|
||||||
|
|
||||||
let body = {
|
|
||||||
let metrics = self.metrics.lock()
|
|
||||||
.expect("metrics lock poisoned");
|
|
||||||
format!("{}", *metrics)
|
|
||||||
};
|
|
||||||
future::ok(HyperResponse::new()
|
|
||||||
.with_header(ContentLength(body.len() as u64))
|
|
||||||
.with_header(ContentType::plaintext())
|
|
||||||
.with_body(body))
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue