proxy: Group metrics by label (#879)
Previously, we maintained a map of labels for each metric. Because the same keys are used in multiple scopes, this causes redundant hashing & map lookup when updating metrics. With this change, there is now only one map per unique label scope and all of the metrics for each scope are stored in the value. This makes metrics inserting faster and prepares for eviction of idle metrics. The Metric type has been split into Metric, which now only holds metric metadata and is responsible for printing a given metric, and Scopes which holds groupings of metrics by label. The metrics! macro is provided to make it easy to define Metric instances statically.
This commit is contained in:
parent
c63f0a1976
commit
01aba7c711
|
@ -63,6 +63,8 @@ impl ops::AddAssign<Self> for Counter {
|
|||
}
|
||||
|
||||
impl FmtMetric for Counter {
|
||||
const KIND: &'static str = "counter";
|
||||
|
||||
fn fmt_metric<N: Display>(&self, f: &mut fmt::Formatter, name: N) -> fmt::Result {
|
||||
writeln!(f, "{} {}", name, self.0)
|
||||
}
|
||||
|
|
|
@ -39,6 +39,8 @@ impl Into<u64> for Gauge {
|
|||
}
|
||||
|
||||
impl FmtMetric for Gauge {
|
||||
const KIND: &'static str = "gauge";
|
||||
|
||||
fn fmt_metric<N: Display>(&self, f: &mut fmt::Formatter, name: N) -> fmt::Result {
|
||||
writeln!(f, "{} {}", name, self.0)
|
||||
}
|
||||
|
|
|
@ -103,6 +103,8 @@ impl<'a, V: Into<u64>> IntoIterator for &'a Histogram<V> {
|
|||
}
|
||||
|
||||
impl<V: Into<u64>> FmtMetric for Histogram<V> {
|
||||
const KIND: &'static str = "histogram";
|
||||
|
||||
fn fmt_metric<N: Display>(&self, f: &mut fmt::Formatter, name: N) -> fmt::Result {
|
||||
let mut total = Counter::default();
|
||||
for (le, count) in self {
|
||||
|
|
|
@ -0,0 +1,93 @@
|
|||
use std::fmt;
|
||||
use std::time::Duration;
|
||||
|
||||
use super::{
|
||||
latency,
|
||||
Counter,
|
||||
Histogram,
|
||||
Metric,
|
||||
RequestLabels,
|
||||
ResponseLabels,
|
||||
Scopes
|
||||
};
|
||||
|
||||
pub(super) type RequestScopes = Scopes<RequestLabels, RequestMetrics>;
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub(super) struct RequestMetrics {
|
||||
total: Counter,
|
||||
}
|
||||
|
||||
pub(super) type ResponseScopes = Scopes<ResponseLabels, ResponseMetrics>;
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct ResponseMetrics {
|
||||
total: Counter,
|
||||
latency: Histogram<latency::Ms>,
|
||||
}
|
||||
|
||||
// ===== impl RequestScopes =====
|
||||
|
||||
impl RequestScopes {
|
||||
metrics! {
|
||||
request_total: Counter { "Total count of HTTP requests." }
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for RequestScopes {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
if self.scopes.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
Self::request_total.fmt_help(f)?;
|
||||
Self::request_total.fmt_scopes(f, &self, |s| &s.total)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl RequestMetrics =====
|
||||
|
||||
impl RequestMetrics {
|
||||
pub fn end(&mut self) {
|
||||
self.total.incr();
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl ResponseScopes =====
|
||||
|
||||
impl ResponseScopes {
|
||||
metrics! {
|
||||
response_total: Counter { "Total count of HTTP responses" },
|
||||
response_latency_ms: Histogram<latency::Ms> {
|
||||
"Elapsed times between a request's headers being received \
|
||||
and its response stream completing"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for ResponseScopes {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
if self.scopes.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
Self::response_total.fmt_help(f)?;
|
||||
Self::response_total.fmt_scopes(f, &self, |s| &s.total)?;
|
||||
|
||||
Self::response_latency_ms.fmt_help(f)?;
|
||||
Self::response_latency_ms.fmt_scopes(f, &self, |s| &s.latency)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl ResponseMetrics =====
|
||||
|
||||
impl ResponseMetrics {
|
||||
pub fn end(&mut self, duration: Duration) {
|
||||
self.total.incr();
|
||||
self.latency.add(duration);
|
||||
}
|
||||
}
|
|
@ -29,20 +29,36 @@
|
|||
use std::default::Default;
|
||||
use std::fmt::{self, Display};
|
||||
use std::hash::Hash;
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time;
|
||||
use std::time::UNIX_EPOCH;
|
||||
|
||||
use indexmap::IndexMap;
|
||||
|
||||
use ctx;
|
||||
|
||||
macro_rules! metrics {
|
||||
{ $( $name:ident : $kind:ty { $help:expr } ),+ } => {
|
||||
$(
|
||||
#[allow(non_upper_case_globals)]
|
||||
const $name: Metric<'static, $kind> = Metric {
|
||||
name: stringify!($name),
|
||||
help: $help,
|
||||
_p: ::std::marker::PhantomData,
|
||||
};
|
||||
)+
|
||||
}
|
||||
}
|
||||
|
||||
mod counter;
|
||||
mod gauge;
|
||||
mod histogram;
|
||||
mod http;
|
||||
mod labels;
|
||||
mod latency;
|
||||
mod record;
|
||||
mod serve;
|
||||
mod transport;
|
||||
|
||||
use self::counter::Counter;
|
||||
use self::gauge::Gauge;
|
||||
|
@ -63,6 +79,9 @@ pub use self::serve::Serve;
|
|||
/// differences in formatting each type of metric. Specifically, `Histogram` formats a
|
||||
/// counter for each bucket, as well as a count and total sum.
|
||||
trait FmtMetric {
|
||||
/// The metric's `TYPE` in help messages.
|
||||
const KIND: &'static str;
|
||||
|
||||
/// Writes a metric with the given name and no labels.
|
||||
fn fmt_metric<N: Display>(&self, f: &mut fmt::Formatter, name: N) -> fmt::Result;
|
||||
|
||||
|
@ -73,35 +92,32 @@ trait FmtMetric {
|
|||
L: Display;
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct Metrics {
|
||||
request_total: Metric<Counter, RequestLabels>,
|
||||
|
||||
response_total: Metric<Counter, ResponseLabels>,
|
||||
response_latency: Metric<Histogram<latency::Ms>, ResponseLabels>,
|
||||
|
||||
tcp: TcpMetrics,
|
||||
|
||||
start_time: u64,
|
||||
/// Describes a metric statically.
|
||||
///
|
||||
/// Formats help messages and metric values for prometheus output.
|
||||
struct Metric<'a, M: FmtMetric> {
|
||||
name: &'a str,
|
||||
help: &'a str,
|
||||
_p: PhantomData<M>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct TcpMetrics {
|
||||
open_total: Metric<Counter, TransportLabels>,
|
||||
close_total: Metric<Counter, TransportCloseLabels>,
|
||||
/// The root scope for all runtime metrics.
|
||||
#[derive(Debug, Default)]
|
||||
struct Root {
|
||||
requests: http::RequestScopes,
|
||||
responses: http::ResponseScopes,
|
||||
transports: transport::OpenScopes,
|
||||
transport_closes: transport::CloseScopes,
|
||||
|
||||
connection_duration: Metric<Histogram<latency::Ms>, TransportCloseLabels>,
|
||||
open_connections: Metric<Gauge, TransportLabels>,
|
||||
|
||||
write_bytes_total: Metric<Counter, TransportLabels>,
|
||||
read_bytes_total: Metric<Counter, TransportLabels>,
|
||||
start_time: Gauge,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct Metric<M, L: Hash + Eq> {
|
||||
name: &'static str,
|
||||
help: &'static str,
|
||||
values: IndexMap<L, M>
|
||||
/// Holds an `S`-typed scope for each `L`-typed label set.
|
||||
///
|
||||
/// An `S` type typically holds one or more metrics.
|
||||
#[derive(Debug)]
|
||||
struct Scopes<L: Display + Hash + Eq, S> {
|
||||
scopes: IndexMap<L, S>,
|
||||
}
|
||||
|
||||
/// Construct the Prometheus metrics.
|
||||
|
@ -111,255 +127,100 @@ struct Metric<M, L: Hash + Eq> {
|
|||
/// scrape endpoint, while the `Record` side can receive updates to the
|
||||
/// metrics by calling `record_event`.
|
||||
pub fn new(process: &Arc<ctx::Process>) -> (Record, Serve){
|
||||
let metrics = Arc::new(Mutex::new(Metrics::new(process)));
|
||||
let metrics = Arc::new(Mutex::new(Root::new(process)));
|
||||
(Record::new(&metrics), Serve::new(&metrics))
|
||||
}
|
||||
|
||||
// ===== impl Metrics =====
|
||||
|
||||
impl Metrics {
|
||||
|
||||
pub fn new(process: &Arc<ctx::Process>) -> Self {
|
||||
|
||||
let start_time = process.start_time
|
||||
.duration_since(time::UNIX_EPOCH)
|
||||
.expect(
|
||||
"process start time should not be before the beginning \
|
||||
of the Unix epoch"
|
||||
)
|
||||
.as_secs();
|
||||
|
||||
let request_total = Metric::<Counter, RequestLabels>::new(
|
||||
"request_total",
|
||||
"A counter of the number of requests the proxy has received.",
|
||||
);
|
||||
|
||||
let response_total = Metric::<Counter, ResponseLabels>::new(
|
||||
"response_total",
|
||||
"A counter of the number of responses the proxy has received.",
|
||||
);
|
||||
|
||||
let response_latency = Metric::<Histogram<latency::Ms>, ResponseLabels>::new(
|
||||
"response_latency_ms",
|
||||
"A histogram of the total latency of a response. This is measured \
|
||||
from when the request headers are received to when the response \
|
||||
stream has completed.",
|
||||
);
|
||||
|
||||
Metrics {
|
||||
request_total,
|
||||
response_total,
|
||||
response_latency,
|
||||
tcp: TcpMetrics::new(),
|
||||
start_time,
|
||||
}
|
||||
}
|
||||
|
||||
fn request_total(&mut self,
|
||||
labels: RequestLabels)
|
||||
-> &mut Counter {
|
||||
self.request_total.values
|
||||
.entry(labels)
|
||||
.or_insert_with(Counter::default)
|
||||
}
|
||||
|
||||
fn response_latency(&mut self,
|
||||
labels: ResponseLabels)
|
||||
-> &mut Histogram<latency::Ms> {
|
||||
self.response_latency.values
|
||||
.entry(labels)
|
||||
.or_insert_with(Histogram::default)
|
||||
}
|
||||
|
||||
fn response_total(&mut self,
|
||||
labels: ResponseLabels)
|
||||
-> &mut Counter {
|
||||
self.response_total.values
|
||||
.entry(labels)
|
||||
.or_insert_with(Counter::default)
|
||||
}
|
||||
|
||||
fn tcp(&mut self) -> &mut TcpMetrics {
|
||||
&mut self.tcp
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for Metrics {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
writeln!(f, "{}", self.request_total)?;
|
||||
writeln!(f, "{}", self.response_total)?;
|
||||
writeln!(f, "{}", self.response_latency)?;
|
||||
writeln!(f, "{}", self.tcp)?;
|
||||
|
||||
writeln!(f, "process_start_time_seconds {}", self.start_time)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl TcpMetrics =====
|
||||
|
||||
impl TcpMetrics {
|
||||
pub fn new() -> TcpMetrics {
|
||||
let open_total = Metric::<Counter, TransportLabels>::new(
|
||||
"tcp_open_total",
|
||||
"A counter of the total number of transport connections.",
|
||||
);
|
||||
|
||||
let close_total = Metric::<Counter, TransportCloseLabels>::new(
|
||||
"tcp_close_total",
|
||||
"A counter of the total number of transport connections.",
|
||||
);
|
||||
|
||||
let connection_duration = Metric::<Histogram<latency::Ms>, TransportCloseLabels>::new(
|
||||
"tcp_connection_duration_ms",
|
||||
"A histogram of the duration of the lifetime of connections, in milliseconds",
|
||||
);
|
||||
|
||||
let open_connections = Metric::<Gauge, TransportLabels>::new(
|
||||
"tcp_open_connections",
|
||||
"A gauge of the number of transport connections currently open.",
|
||||
);
|
||||
|
||||
let read_bytes_total = Metric::<Counter, TransportLabels>::new(
|
||||
"tcp_read_bytes_total",
|
||||
"A counter of the total number of recieved bytes."
|
||||
);
|
||||
|
||||
let write_bytes_total = Metric::<Counter, TransportLabels>::new(
|
||||
"tcp_write_bytes_total",
|
||||
"A counter of the total number of sent bytes."
|
||||
);
|
||||
|
||||
Self {
|
||||
open_total,
|
||||
close_total,
|
||||
connection_duration,
|
||||
open_connections,
|
||||
read_bytes_total,
|
||||
write_bytes_total,
|
||||
}
|
||||
}
|
||||
|
||||
fn open_total(&mut self, labels: TransportLabels) -> &mut Counter {
|
||||
self.open_total.values
|
||||
.entry(labels)
|
||||
.or_insert_with(Default::default)
|
||||
}
|
||||
|
||||
fn close_total(&mut self, labels: TransportCloseLabels) -> &mut Counter {
|
||||
self.close_total.values
|
||||
.entry(labels)
|
||||
.or_insert_with(Counter::default)
|
||||
}
|
||||
|
||||
fn connection_duration(&mut self, labels: TransportCloseLabels) -> &mut Histogram<latency::Ms> {
|
||||
self.connection_duration.values
|
||||
.entry(labels)
|
||||
.or_insert_with(Histogram::default)
|
||||
}
|
||||
|
||||
fn open_connections(&mut self, labels: TransportLabels) -> &mut Gauge {
|
||||
self.open_connections.values
|
||||
.entry(labels)
|
||||
.or_insert_with(Gauge::default)
|
||||
}
|
||||
|
||||
fn write_bytes_total(&mut self, labels: TransportLabels) -> &mut Counter {
|
||||
self.write_bytes_total.values
|
||||
.entry(labels)
|
||||
.or_insert_with(Counter::default)
|
||||
}
|
||||
|
||||
fn read_bytes_total(&mut self, labels: TransportLabels) -> &mut Counter {
|
||||
self.read_bytes_total.values
|
||||
.entry(labels)
|
||||
.or_insert_with(Counter::default)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for TcpMetrics {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
writeln!(f, "{}", self.open_total)?;
|
||||
writeln!(f, "{}", self.close_total)?;
|
||||
writeln!(f, "{}", self.connection_duration)?;
|
||||
writeln!(f, "{}", self.open_connections)?;
|
||||
writeln!(f, "{}", self.write_bytes_total)?;
|
||||
writeln!(f, "{}", self.read_bytes_total)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Metric =====
|
||||
|
||||
impl<M, L: Hash + Eq> Metric<M, L> {
|
||||
|
||||
pub fn new(name: &'static str, help: &'static str) -> Self {
|
||||
Metric {
|
||||
name,
|
||||
help,
|
||||
values: IndexMap::new(),
|
||||
}
|
||||
impl<'a, M: FmtMetric> Metric<'a, M> {
|
||||
/// Formats help messages for this metric.
|
||||
pub fn fmt_help(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
writeln!(f, "# HELP {} {}", self.name, self.help)?;
|
||||
writeln!(f, "# TYPE {} {}", self.name, M::KIND)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
}
|
||||
/// Formats a single metric without labels.
|
||||
pub fn fmt_metric(&self, f: &mut fmt::Formatter, metric: M) -> fmt::Result {
|
||||
metric.fmt_metric(f, self.name)
|
||||
}
|
||||
|
||||
impl<L> fmt::Display for Metric<Counter, L>
|
||||
where
|
||||
L: fmt::Display,
|
||||
L: Hash + Eq,
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f,
|
||||
"# HELP {name} {help}\n# TYPE {name} counter\n",
|
||||
name = self.name,
|
||||
help = self.help,
|
||||
)?;
|
||||
|
||||
for (labels, value) in &self.values {
|
||||
value.fmt_metric_labeled(f, self.name, labels)?;
|
||||
/// Formats a single metric across labeled scopes.
|
||||
pub fn fmt_scopes<L: Display + Hash + Eq, S, F: Fn(&S) -> &M>(
|
||||
&self,
|
||||
f: &mut fmt::Formatter,
|
||||
scopes: &Scopes<L, S>,
|
||||
to_metric: F
|
||||
)-> fmt::Result {
|
||||
for (labels, scope) in &scopes.scopes {
|
||||
to_metric(scope).fmt_metric_labeled(f, self.name, labels)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<L> fmt::Display for Metric<Gauge, L>
|
||||
where
|
||||
L: fmt::Display,
|
||||
L: Hash + Eq,
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f,
|
||||
"# HELP {name} {help}\n# TYPE {name} gauge\n",
|
||||
name = self.name,
|
||||
help = self.help,
|
||||
)?;
|
||||
// ===== impl Root =====
|
||||
|
||||
for (labels, value) in &self.values {
|
||||
value.fmt_metric_labeled(f, self.name, labels)?;
|
||||
impl Root {
|
||||
metrics! {
|
||||
process_start_time_seconds: Gauge {
|
||||
"Time that the process started (in seconds since the UNIX epoch)"
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new(process: &Arc<ctx::Process>) -> Self {
|
||||
let t0 = process.start_time
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.expect("process start time")
|
||||
.as_secs();
|
||||
|
||||
Self {
|
||||
start_time: t0.into(),
|
||||
.. Root::default()
|
||||
}
|
||||
}
|
||||
|
||||
fn request(&mut self, labels: RequestLabels) -> &mut http::RequestMetrics {
|
||||
self.requests.scopes.entry(labels)
|
||||
.or_insert_with(http::RequestMetrics::default)
|
||||
}
|
||||
|
||||
fn response(&mut self, labels: ResponseLabels) -> &mut http::ResponseMetrics {
|
||||
self.responses.scopes.entry(labels)
|
||||
.or_insert_with(http::ResponseMetrics::default)
|
||||
}
|
||||
|
||||
fn transport(&mut self, labels: TransportLabels) -> &mut transport::OpenMetrics {
|
||||
self.transports.scopes.entry(labels)
|
||||
.or_insert_with(transport::OpenMetrics::default)
|
||||
}
|
||||
|
||||
fn transport_close(&mut self, labels: TransportCloseLabels) -> &mut transport::CloseMetrics {
|
||||
self.transport_closes.scopes.entry(labels)
|
||||
.or_insert_with(transport::CloseMetrics::default)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for Root {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
self.requests.fmt(f)?;
|
||||
self.responses.fmt(f)?;
|
||||
self.transports.fmt(f)?;
|
||||
self.transport_closes.fmt(f)?;
|
||||
|
||||
Self::process_start_time_seconds.fmt_help(f)?;
|
||||
Self::process_start_time_seconds.fmt_metric(f, self.start_time)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<L, V> fmt::Display for Metric<Histogram<V>, L> where
|
||||
L: fmt::Display,
|
||||
L: Hash + Eq,
|
||||
V: Into<u64>,
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f,
|
||||
"# HELP {name} {help}\n# TYPE {name} histogram\n",
|
||||
name = self.name,
|
||||
help = self.help,
|
||||
)?;
|
||||
// ===== impl Scopes =====
|
||||
|
||||
for (labels, histogram) in &self.values {
|
||||
histogram.fmt_metric_labeled(f, self.name, labels)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
impl<L: Display + Hash + Eq, M> Default for Scopes<L, M> {
|
||||
fn default() -> Self {
|
||||
Scopes { scopes: IndexMap::default(), }
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use telemetry::event::Event;
|
||||
use super::Metrics;
|
||||
use super::Root;
|
||||
use super::labels::{
|
||||
RequestLabels,
|
||||
ResponseLabels,
|
||||
|
@ -12,18 +12,18 @@ use super::labels::{
|
|||
/// Tracks Prometheus metrics
|
||||
#[derive(Debug)]
|
||||
pub struct Record {
|
||||
metrics: Arc<Mutex<Metrics>>,
|
||||
metrics: Arc<Mutex<Root>>,
|
||||
}
|
||||
|
||||
// ===== impl Record =====
|
||||
|
||||
impl Record {
|
||||
pub(super) fn new(metrics: &Arc<Mutex<Metrics>>) -> Self {
|
||||
pub(super) fn new(metrics: &Arc<Mutex<Root>>) -> Self {
|
||||
Self { metrics: metrics.clone() }
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn update<F: Fn(&mut Metrics)>(&mut self, f: F) {
|
||||
fn update<F: Fn(&mut Root)>(&mut self, f: F) {
|
||||
let mut lock = self.metrics.lock()
|
||||
.expect("metrics lock poisoned");
|
||||
f(&mut *lock);
|
||||
|
@ -31,71 +31,52 @@ impl Record {
|
|||
|
||||
/// Observe the given event.
|
||||
pub fn record_event(&mut self, event: &Event) {
|
||||
trace!("Metrics::record({:?})", event);
|
||||
trace!("Root::record({:?})", event);
|
||||
match *event {
|
||||
|
||||
Event::StreamRequestOpen(_) | Event::StreamResponseOpen(_, _) => {
|
||||
// Do nothing; we'll record metrics for the request or response
|
||||
// when the stream *finishes*.
|
||||
},
|
||||
Event::StreamRequestOpen(_) => {},
|
||||
|
||||
Event::StreamRequestFail(ref req, _) => {
|
||||
self.update(|metrics| {
|
||||
metrics.request_total(RequestLabels::new(req)).incr();
|
||||
metrics.request(RequestLabels::new(req)).end();
|
||||
})
|
||||
},
|
||||
|
||||
Event::StreamRequestEnd(ref req, _) => {
|
||||
self.update(|metrics| {
|
||||
metrics.request_total(RequestLabels::new(req)).incr();
|
||||
metrics.request(RequestLabels::new(req)).end();
|
||||
})
|
||||
},
|
||||
|
||||
Event::StreamResponseOpen(_, _) => {},
|
||||
|
||||
Event::StreamResponseEnd(ref res, ref end) => {
|
||||
self.update(|metrics| {
|
||||
let labels = ResponseLabels::new(res, end.grpc_status);
|
||||
metrics.response_total(labels.clone()).incr();
|
||||
metrics.response_latency(labels).add(end.since_request_open);
|
||||
metrics.response(ResponseLabels::new(res, end.grpc_status))
|
||||
.end(end.since_request_open);
|
||||
});
|
||||
},
|
||||
|
||||
Event::StreamResponseFail(ref res, ref fail) => {
|
||||
// TODO: do we care about the failure's error code here?
|
||||
self.update(|metrics| {
|
||||
// TODO: do we care about the failure's error code here?
|
||||
let labels = ResponseLabels::fail(res);
|
||||
metrics.response_total(labels.clone()).incr();
|
||||
metrics.response_latency(labels).add(fail.since_request_open);
|
||||
metrics.response(ResponseLabels::fail(res)).end(fail.since_request_open)
|
||||
});
|
||||
},
|
||||
|
||||
Event::TransportOpen(ref ctx) => {
|
||||
let labels = TransportLabels::new(ctx);
|
||||
self.update(|metrics| {
|
||||
metrics.tcp().open_total(labels).incr();
|
||||
metrics.tcp().open_connections(labels).incr();
|
||||
metrics.transport(TransportLabels::new(ctx)).open();
|
||||
})
|
||||
},
|
||||
|
||||
Event::TransportClose(ref ctx, ref close) => {
|
||||
let labels = TransportLabels::new(ctx);
|
||||
let close_labels = TransportCloseLabels::new(ctx, close);
|
||||
self.update(|metrics| {
|
||||
*metrics.tcp().write_bytes_total(labels) += close.tx_bytes as u64;
|
||||
*metrics.tcp().read_bytes_total(labels) += close.rx_bytes as u64;
|
||||
metrics.transport(TransportLabels::new(ctx))
|
||||
.close(close.rx_bytes, close.tx_bytes);
|
||||
|
||||
metrics.tcp().connection_duration(close_labels).add(close.duration);
|
||||
metrics.tcp().close_total(close_labels).incr();
|
||||
|
||||
let metrics = metrics.tcp().open_connections.values.get_mut(&labels);
|
||||
debug_assert!(metrics.is_some());
|
||||
match metrics {
|
||||
Some(m) => {
|
||||
m.decr();
|
||||
}
|
||||
None => {
|
||||
error!("Closed transport missing from metrics registry: {{{}}}", labels);
|
||||
}
|
||||
}
|
||||
metrics.transport_close(TransportCloseLabels::new(ctx, close))
|
||||
.close(close.duration);
|
||||
})
|
||||
},
|
||||
};
|
||||
|
|
|
@ -7,18 +7,18 @@ use hyper::server::{Request, Response, Service};
|
|||
use std::io::Write;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use super::Metrics;
|
||||
use super::Root;
|
||||
|
||||
/// Serve Prometheues metrics.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Serve {
|
||||
metrics: Arc<Mutex<Metrics>>,
|
||||
metrics: Arc<Mutex<Root>>,
|
||||
}
|
||||
|
||||
// ===== impl Serve =====
|
||||
|
||||
impl Serve {
|
||||
pub(super) fn new(metrics: &Arc<Mutex<Metrics>>) -> Self {
|
||||
pub(super) fn new(metrics: &Arc<Mutex<Root>>) -> Self {
|
||||
Serve {
|
||||
metrics: metrics.clone(),
|
||||
}
|
||||
|
|
|
@ -0,0 +1,113 @@
|
|||
use std::fmt;
|
||||
use std::time::Duration;
|
||||
|
||||
use super::{
|
||||
latency,
|
||||
Counter,
|
||||
Gauge,
|
||||
Histogram,
|
||||
Metric,
|
||||
TransportLabels,
|
||||
TransportCloseLabels,
|
||||
Scopes
|
||||
};
|
||||
|
||||
pub(super) type OpenScopes = Scopes<TransportLabels, OpenMetrics>;
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub(super) struct OpenMetrics {
|
||||
open_total: Counter,
|
||||
open_connections: Gauge,
|
||||
write_bytes_total: Counter,
|
||||
read_bytes_total: Counter,
|
||||
}
|
||||
|
||||
pub(super) type CloseScopes = Scopes<TransportCloseLabels, CloseMetrics>;
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub(super) struct CloseMetrics {
|
||||
close_total: Counter,
|
||||
connection_duration: Histogram<latency::Ms>,
|
||||
}
|
||||
|
||||
// ===== impl OpenScopes =====
|
||||
|
||||
impl OpenScopes {
|
||||
metrics! {
|
||||
tcp_open_total: Counter { "Total count of opened connections" },
|
||||
tcp_open_connections: Gauge { "Number of currently-open connections" },
|
||||
tcp_read_bytes_total: Counter { "Total count of bytes read from peers" },
|
||||
tcp_write_bytes_total: Counter { "Total count of bytes written to peers" }
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for OpenScopes {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
if self.scopes.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
Self::tcp_open_total.fmt_help(f)?;
|
||||
Self::tcp_open_total.fmt_scopes(f, &self, |s| &s.open_total)?;
|
||||
|
||||
Self::tcp_open_connections.fmt_help(f)?;
|
||||
Self::tcp_open_connections.fmt_scopes(f, &self, |s| &s.open_connections)?;
|
||||
|
||||
Self::tcp_read_bytes_total.fmt_help(f)?;
|
||||
Self::tcp_read_bytes_total.fmt_scopes(f, &self, |s| &s.read_bytes_total)?;
|
||||
|
||||
Self::tcp_write_bytes_total.fmt_help(f)?;
|
||||
Self::tcp_write_bytes_total.fmt_scopes(f, &self, |s| &s.write_bytes_total)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl OpenMetrics =====
|
||||
|
||||
impl OpenMetrics {
|
||||
pub(super) fn open(&mut self) {
|
||||
self.open_total.incr();
|
||||
self.open_connections.incr();
|
||||
}
|
||||
|
||||
pub(super) fn close(&mut self, rx: u64, tx: u64) {
|
||||
self.open_connections.decr();
|
||||
self.read_bytes_total += rx;
|
||||
self.write_bytes_total += tx;
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl CloseScopes =====
|
||||
|
||||
impl CloseScopes {
|
||||
metrics! {
|
||||
tcp_close_total: Counter { "Total count of closed connections" },
|
||||
tcp_connection_duration_ms: Histogram<latency::Ms> { "Connection lifetimes" }
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for CloseScopes {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
if self.scopes.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
Self::tcp_close_total.fmt_help(f)?;
|
||||
Self::tcp_close_total.fmt_scopes(f, &self, |s| &s.close_total)?;
|
||||
|
||||
Self::tcp_connection_duration_ms.fmt_help(f)?;
|
||||
Self::tcp_connection_duration_ms.fmt_scopes(f, &self, |s| &s.connection_duration)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl CloseMetrics =====
|
||||
|
||||
impl CloseMetrics {
|
||||
pub(super) fn close(&mut self, duration: Duration) {
|
||||
self.close_total.incr();
|
||||
self.connection_duration.add(duration);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue