Move `Counter` and `Gauge` to their own modules (#861)
In preparation for a larger metrics refactor, this change splits the Counter and Gauge types into their own modules. Furthermore, this makes the minor change to these types: incr() and decr() no longer return `self`. We were not actually ever using the returned self references, and I find the unit return type to more obviously indicate the side-effecty-ness of these calls. #smpfy
This commit is contained in:
parent
219872bab8
commit
6bcc8b25fb
|
@ -0,0 +1,60 @@
|
||||||
|
use std::{fmt, ops};
|
||||||
|
use std::num::Wrapping;
|
||||||
|
|
||||||
|
/// A Prometheus counter is represented by a `Wrapping` unsigned 64-bit int.
|
||||||
|
///
|
||||||
|
/// Counters always explicitly wrap on overflows rather than panicking in
|
||||||
|
/// debug builds. Prometheus' [`rate()`] and [`irate()`] queries handle breaks
|
||||||
|
/// in monotonicity gracefully (see also [`resets()`]), so wrapping is less
|
||||||
|
/// problematic than panicking in this case.
|
||||||
|
///
|
||||||
|
/// Note, however, that Prometheus represents counters using 64-bit
|
||||||
|
/// floating-point numbers. The correct semantics are to ensure the counter
|
||||||
|
/// always gets reset to zero after Prometheus reads it, before it would ever
|
||||||
|
/// overflow a 52-bit `f64` mantissa.
|
||||||
|
///
|
||||||
|
/// [`rate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#rate()
|
||||||
|
/// [`irate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#irate()
|
||||||
|
/// [`resets()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#resets
|
||||||
|
///
|
||||||
|
// TODO: Implement Prometheus reset semantics correctly, taking into
|
||||||
|
// consideration that Prometheus models counters as `f64` and so
|
||||||
|
// there are only 52 significant bits.
|
||||||
|
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
|
||||||
|
pub struct Counter(Wrapping<u64>);
|
||||||
|
|
||||||
|
// ===== impl Counter =====
|
||||||
|
|
||||||
|
impl Counter {
|
||||||
|
/// Increment the counter by one.
|
||||||
|
///
|
||||||
|
/// This function wraps on overflows.
|
||||||
|
pub fn incr(&mut self) {
|
||||||
|
(*self).0 += Wrapping(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Into<u64> for Counter {
|
||||||
|
fn into(self) -> u64 {
|
||||||
|
(self.0).0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ops::Add for Counter {
|
||||||
|
type Output = Self;
|
||||||
|
fn add(self, Counter(rhs): Self) -> Self::Output {
|
||||||
|
Counter(self.0 + rhs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ops::AddAssign<u64> for Counter {
|
||||||
|
fn add_assign(&mut self, rhs: u64) {
|
||||||
|
(*self).0 += Wrapping(rhs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for Counter {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
self.0.fmt(f)
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,43 @@
|
||||||
|
use std::fmt;
|
||||||
|
|
||||||
|
/// An instaneous metric value.
|
||||||
|
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
|
||||||
|
pub struct Gauge(u64);
|
||||||
|
|
||||||
|
impl Gauge {
|
||||||
|
/// Increment the gauge by one.
|
||||||
|
pub fn incr(&mut self) {
|
||||||
|
if let Some(new_value) = self.0.checked_add(1) {
|
||||||
|
(*self).0 = new_value;
|
||||||
|
} else {
|
||||||
|
warn!("Gauge overflow");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Decrement the gauge by one.
|
||||||
|
pub fn decr(&mut self) {
|
||||||
|
if let Some(new_value) = self.0.checked_sub(1) {
|
||||||
|
(*self).0 = new_value;
|
||||||
|
} else {
|
||||||
|
warn!("Gauge underflow");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<u64> for Gauge {
|
||||||
|
fn from(n: u64) -> Self {
|
||||||
|
Gauge(n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Into<u64> for Gauge {
|
||||||
|
fn into(self) -> u64 {
|
||||||
|
self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for Gauge {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
self.0.fmt(f)
|
||||||
|
}
|
||||||
|
}
|
|
@ -29,10 +29,8 @@
|
||||||
use std::default::Default;
|
use std::default::Default;
|
||||||
use std::{fmt, time};
|
use std::{fmt, time};
|
||||||
use std::hash::Hash;
|
use std::hash::Hash;
|
||||||
use std::num::Wrapping;
|
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
use std::ops;
|
|
||||||
|
|
||||||
use deflate::CompressionOptions;
|
use deflate::CompressionOptions;
|
||||||
use deflate::write::GzEncoder;
|
use deflate::write::GzEncoder;
|
||||||
|
@ -49,9 +47,13 @@ use indexmap::{IndexMap};
|
||||||
use ctx;
|
use ctx;
|
||||||
use telemetry::event::Event;
|
use telemetry::event::Event;
|
||||||
|
|
||||||
|
mod counter;
|
||||||
|
mod gauge;
|
||||||
mod labels;
|
mod labels;
|
||||||
mod latency;
|
mod latency;
|
||||||
|
|
||||||
|
use self::counter::Counter;
|
||||||
|
use self::gauge::Gauge;
|
||||||
use self::labels::{
|
use self::labels::{
|
||||||
RequestLabels,
|
RequestLabels,
|
||||||
ResponseLabels,
|
ResponseLabels,
|
||||||
|
@ -92,32 +94,6 @@ struct Metric<M, L: Hash + Eq> {
|
||||||
values: IndexMap<L, M>
|
values: IndexMap<L, M>
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A Prometheus counter is represented by a `Wrapping` unsigned 64-bit int.
|
|
||||||
///
|
|
||||||
/// Counters always explicitly wrap on overflows rather than panicking in
|
|
||||||
/// debug builds. Prometheus' [`rate()`] and [`irate()`] queries handle breaks
|
|
||||||
/// in monotonicity gracefully (see also [`resets()`]), so wrapping is less
|
|
||||||
/// problematic than panicking in this case.
|
|
||||||
///
|
|
||||||
/// Note, however, that Prometheus represents counters using 64-bit
|
|
||||||
/// floating-point numbers. The correct semantics are to ensure the counter
|
|
||||||
/// always gets reset to zero after Prometheus reads it, before it would ever
|
|
||||||
/// overflow a 52-bit `f64` mantissa.
|
|
||||||
///
|
|
||||||
/// [`rate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#rate()
|
|
||||||
/// [`irate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#irate()
|
|
||||||
/// [`resets()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#resets
|
|
||||||
///
|
|
||||||
// TODO: Implement Prometheus reset semantics correctly, taking into
|
|
||||||
// consideration that Prometheus models counters as `f64` and so
|
|
||||||
// there are only 52 significant bits.
|
|
||||||
#[derive(Copy, Debug, Default, Clone, Eq, PartialEq)]
|
|
||||||
pub struct Counter(Wrapping<u64>);
|
|
||||||
|
|
||||||
/// A Prometheus gauge
|
|
||||||
#[derive(Copy, Debug, Default, Clone, Eq, PartialEq)]
|
|
||||||
pub struct Gauge(u64);
|
|
||||||
|
|
||||||
/// Tracks Prometheus metrics
|
/// Tracks Prometheus metrics
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Aggregate {
|
pub struct Aggregate {
|
||||||
|
@ -316,76 +292,6 @@ impl fmt::Display for TcpMetrics {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ===== impl Counter =====
|
|
||||||
|
|
||||||
impl fmt::Display for Counter {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
|
||||||
write!(f, "{}", (self.0).0 as f64)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Into<u64> for Counter {
|
|
||||||
fn into(self) -> u64 {
|
|
||||||
(self.0).0 as u64
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ops::Add for Counter {
|
|
||||||
type Output = Self;
|
|
||||||
fn add(self, Counter(rhs): Self) -> Self::Output {
|
|
||||||
Counter(self.0 + rhs)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ops::AddAssign<u64> for Counter {
|
|
||||||
fn add_assign(&mut self, rhs: u64) {
|
|
||||||
(*self).0 += Wrapping(rhs)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
impl Counter {
|
|
||||||
|
|
||||||
/// Increment the counter by one.
|
|
||||||
///
|
|
||||||
/// This function wraps on overflows.
|
|
||||||
pub fn incr(&mut self) -> &mut Self {
|
|
||||||
(*self).0 += Wrapping(1);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ===== impl Gauge =====
|
|
||||||
|
|
||||||
impl fmt::Display for Gauge {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
|
||||||
write!(f, "{}", self.0)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Gauge {
|
|
||||||
/// Increment the gauge by one.
|
|
||||||
pub fn incr(&mut self) -> &mut Self {
|
|
||||||
if let Some(new_value) = self.0.checked_add(1) {
|
|
||||||
(*self).0 = new_value;
|
|
||||||
} else {
|
|
||||||
warn!("Gauge::incr() would wrap!");
|
|
||||||
}
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Decrement the gauge by one.
|
|
||||||
pub fn decr(&mut self) -> &mut Self {
|
|
||||||
if let Some(new_value) = self.0.checked_sub(1) {
|
|
||||||
(*self).0 = new_value;
|
|
||||||
} else {
|
|
||||||
warn!("Gauge::decr() called on a gauge with value 0");
|
|
||||||
}
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// ===== impl Metric =====
|
// ===== impl Metric =====
|
||||||
|
|
||||||
impl<M, L: Hash + Eq> Metric<M, L> {
|
impl<M, L: Hash + Eq> Metric<M, L> {
|
||||||
|
@ -527,14 +433,14 @@ impl Aggregate {
|
||||||
Event::StreamRequestFail(ref req, _) => {
|
Event::StreamRequestFail(ref req, _) => {
|
||||||
let labels = Arc::new(RequestLabels::new(req));
|
let labels = Arc::new(RequestLabels::new(req));
|
||||||
self.update(|metrics| {
|
self.update(|metrics| {
|
||||||
*metrics.request_total(&labels).incr();
|
metrics.request_total(&labels).incr();
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
|
|
||||||
Event::StreamRequestEnd(ref req, _) => {
|
Event::StreamRequestEnd(ref req, _) => {
|
||||||
let labels = Arc::new(RequestLabels::new(req));
|
let labels = Arc::new(RequestLabels::new(req));
|
||||||
self.update(|metrics| {
|
self.update(|metrics| {
|
||||||
*metrics.request_total(&labels).incr();
|
metrics.request_total(&labels).incr();
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
|
|
||||||
|
@ -544,7 +450,7 @@ impl Aggregate {
|
||||||
end.grpc_status,
|
end.grpc_status,
|
||||||
));
|
));
|
||||||
self.update(|metrics| {
|
self.update(|metrics| {
|
||||||
*metrics.response_total(&labels).incr();
|
metrics.response_total(&labels).incr();
|
||||||
*metrics.response_latency(&labels) += end.since_request_open;
|
*metrics.response_latency(&labels) += end.since_request_open;
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
|
@ -553,7 +459,7 @@ impl Aggregate {
|
||||||
// TODO: do we care about the failure's error code here?
|
// TODO: do we care about the failure's error code here?
|
||||||
let labels = Arc::new(ResponseLabels::fail(res));
|
let labels = Arc::new(ResponseLabels::fail(res));
|
||||||
self.update(|metrics| {
|
self.update(|metrics| {
|
||||||
*metrics.response_total(&labels).incr();
|
metrics.response_total(&labels).incr();
|
||||||
*metrics.response_latency(&labels) += fail.since_request_open;
|
*metrics.response_latency(&labels) += fail.since_request_open;
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
|
@ -561,8 +467,8 @@ impl Aggregate {
|
||||||
Event::TransportOpen(ref ctx) => {
|
Event::TransportOpen(ref ctx) => {
|
||||||
let labels = Arc::new(TransportLabels::new(ctx));
|
let labels = Arc::new(TransportLabels::new(ctx));
|
||||||
self.update(|metrics| {
|
self.update(|metrics| {
|
||||||
*metrics.tcp().open_total(&labels).incr();
|
metrics.tcp().open_total(&labels).incr();
|
||||||
*metrics.tcp().open_connections(&labels).incr();
|
metrics.tcp().open_connections(&labels).incr();
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
|
|
||||||
|
@ -574,13 +480,13 @@ impl Aggregate {
|
||||||
*metrics.tcp().read_bytes_total(&labels) += close.rx_bytes as u64;
|
*metrics.tcp().read_bytes_total(&labels) += close.rx_bytes as u64;
|
||||||
|
|
||||||
*metrics.tcp().connection_duration(&close_labels) += close.duration;
|
*metrics.tcp().connection_duration(&close_labels) += close.duration;
|
||||||
*metrics.tcp().close_total(&close_labels).incr();
|
metrics.tcp().close_total(&close_labels).incr();
|
||||||
|
|
||||||
let metrics = metrics.tcp().open_connections.values.get_mut(&labels);
|
let metrics = metrics.tcp().open_connections.values.get_mut(&labels);
|
||||||
debug_assert!(metrics.is_some());
|
debug_assert!(metrics.is_some());
|
||||||
match metrics {
|
match metrics {
|
||||||
Some(m) => {
|
Some(m) => {
|
||||||
*m.decr();
|
m.decr();
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
error!("Closed transport missing from metrics registry: {{{}}}", labels);
|
error!("Closed transport missing from metrics registry: {{{}}}", labels);
|
||||||
|
|
Loading…
Reference in New Issue