Prepare HTTP metrics for per-route classification (#112)

Previously, stacks were built with `Layer::and_then`. This pattern
severely impacts compile-times as stack complexity grows.

In order to ameliorate this, `app::main` has been changed to build
stacks from the "bottom" (endpoint client) to "top" (serverside
connection) by _push_-ing Layers onto a concrete stack, i.e. and not
composing layers for an abstract stack.

While doing this, we take the oppportunity to remove a ton of
now-unnecessary `PhantomData`. A new, dedicated `phantom_data` stack
module can be used to aid type inference as needed.

Other stack utilities like `map_target` and `map_err` have been
introduced to assist this transition.

Furthermore, all instances of `Layer::new` have been changed to a free
`fn layer` to improve readability.

This change sets up two upcoming changes: a stack-oriented `controller`
client and, subsequently, service-profile-based routing.

* Prepare HTTP metrics for per-route classification

In order to support Service Profiles, the proxy will add a new scope of
HTTP metrics prefixed with `route_`, i.e. so that the proxy exposes
`request_total` and `route_request_total` independently.

Furthermore, the proxy must be able to use different
response-classification logic for each route, and this classification
logic should apply to both metrics scopes.

This alters the `proxy::http::metrics` module so that:

1. HTTP metrics may be scoped with a prefix (as the stack is described).

2. The HTTP metrics layer now discovers the classifier by trying to
   extract it from each request's extensions or fall back to a `Default`
   implementation. Only a default implementation is used presently.

3. It was too easy to use the `Classify` trait API incorrectly.
   Non-default classify implementation could cause a runtime panic!
   The API has been changed so that the type system ensures correct
   usage.

4. The HTTP classifier must be configurable per-request. In order to do
   this, we expect a higher stack layer will add response classifiers to
   request extensions when appropriate (i.e., in a follow-up).

Finally, the `telemetry::Report` type requires updating every time a new
set of metrics is added. We don't need a struct to represent this.
`FmtMetrics::and_then` has been added as a combinator so that a fixed
type is not necessary.
This commit is contained in:
Oliver Gould 2018-10-30 10:47:33 -07:00 committed by GitHub
parent 4e0a1f0100
commit 81b83784f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 273 additions and 177 deletions

View File

@ -8,11 +8,22 @@ pub trait FmtMetrics {
fn as_display(&self) -> DisplayMetrics<&Self> where Self: Sized { fn as_display(&self) -> DisplayMetrics<&Self> where Self: Sized {
DisplayMetrics(self) DisplayMetrics(self)
} }
fn and_then<N>(self, next: N) -> AndThen<Self, N>
where
N: FmtMetrics,
Self: Sized,
{
AndThen(self, next)
}
} }
/// Adapts `FmtMetrics` to `fmt::Display`. /// Adapts `FmtMetrics` to `fmt::Display`.
pub struct DisplayMetrics<F>(F); pub struct DisplayMetrics<F>(F);
#[derive(Clone, Debug)]
pub struct AndThen<A, B>(A, B);
impl<F: FmtMetrics> fmt::Display for DisplayMetrics<F> { impl<F: FmtMetrics> fmt::Display for DisplayMetrics<F> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.0.fmt_metrics(f) self.0.fmt_metrics(f)
@ -55,6 +66,14 @@ pub struct Metric<'a, M: FmtMetric> {
// ===== impl Metric ===== // ===== impl Metric =====
impl<'a, M: FmtMetric> Metric<'a, M> { impl<'a, M: FmtMetric> Metric<'a, M> {
pub fn new(name: &'a str, help: &'a str) -> Self {
Self {
name,
help,
_p: PhantomData,
}
}
/// Formats help messages for this metric. /// Formats help messages for this metric.
pub fn fmt_help(&self, f: &mut fmt::Formatter) -> fmt::Result { pub fn fmt_help(&self, f: &mut fmt::Formatter) -> fmt::Result {
writeln!(f, "# HELP {} {}", self.name, self.help)?; writeln!(f, "# HELP {} {}", self.name, self.help)?;
@ -137,3 +156,11 @@ impl<'a, A: FmtMetrics + 'a> FmtMetrics for &'a A {
} }
} }
impl<A: FmtMetrics, B: FmtMetrics> FmtMetrics for AndThen<A, B> {
fn fmt_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.0.fmt_metrics(f)?;
self.1.fmt_metrics(f)?;
Ok(())
}
}

View File

@ -3,12 +3,15 @@ use http;
use proxy::http::classify; use proxy::http::classify;
#[derive(Clone, Debug)] #[derive(Clone, Debug, Default)]
pub struct Classify; pub struct Classify {}
#[derive(Clone, Debug)] #[derive(Clone, Debug, Default)]
pub struct ClassifyResponse { pub struct ClassifyResponse {}
status: Option<http::StatusCode>,
#[derive(Clone, Debug, Default)]
pub struct ClassifyEos {
status: http::StatusCode,
} }
#[derive(Clone, Debug, Hash, PartialEq, Eq)] #[derive(Clone, Debug, Hash, PartialEq, Eq)]
@ -19,28 +22,48 @@ pub enum Class {
} }
#[derive(Clone, Debug, Hash, PartialEq, Eq)] #[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub enum SuccessOrFailure { Success, Failure } pub enum SuccessOrFailure {
Success,
Failure,
}
// === impl Classify ===
impl classify::Classify for Classify { impl classify::Classify for Classify {
type Class = Class; type Class = Class;
type Error = h2::Error; type Error = h2::Error;
type ClassifyResponse = ClassifyResponse; type ClassifyResponse = ClassifyResponse;
type ClassifyEos = ClassifyEos;
fn classify<B>(&self, _: &http::Request<B>) -> Self::ClassifyResponse { fn classify<B>(&self, _: &http::Request<B>) -> Self::ClassifyResponse {
ClassifyResponse { status: None } ClassifyResponse {}
} }
} }
// === impl ClassifyResponse ===
impl classify::ClassifyResponse for ClassifyResponse { impl classify::ClassifyResponse for ClassifyResponse {
type Class = Class; type Class = Class;
type Error = h2::Error; type Error = h2::Error;
type ClassifyEos = ClassifyEos;
fn start<B>(&mut self, rsp: &http::Response<B>) -> Option<Self::Class> { fn start<B>(self, rsp: &http::Response<B>) -> (ClassifyEos, Option<Class>) {
self.status = Some(rsp.status().clone()); let eos = ClassifyEos {
None status: rsp.status(),
};
(eos, None)
} }
fn eos(&mut self, trailers: Option<&http::HeaderMap>) -> Self::Class { fn error(self, err: &h2::Error) -> Self::Class {
Class::Stream(SuccessOrFailure::Failure, format!("{}", err))
}
}
impl classify::ClassifyEos for ClassifyEos {
type Class = Class;
type Error = h2::Error;
fn eos(self, trailers: Option<&http::HeaderMap>) -> Self::Class {
if let Some(ref trailers) = trailers { if let Some(ref trailers) = trailers {
let mut grpc_status = trailers let mut grpc_status = trailers
.get("grpc-status") .get("grpc-status")
@ -51,21 +74,19 @@ impl classify::ClassifyResponse for ClassifyResponse {
Class::Grpc(SuccessOrFailure::Success, grpc_status) Class::Grpc(SuccessOrFailure::Success, grpc_status)
} else { } else {
Class::Grpc(SuccessOrFailure::Failure, grpc_status) Class::Grpc(SuccessOrFailure::Failure, grpc_status)
} };
} }
} }
let status = self.status.take().expect("response closed more than once"); let result = if self.status.is_server_error() {
let result = if status.is_server_error() {
SuccessOrFailure::Failure SuccessOrFailure::Failure
} else { } else {
SuccessOrFailure::Success SuccessOrFailure::Success
}; };
Class::Http(result, status) Class::Http(result, self.status)
} }
fn error(&mut self, err: &h2::Error) -> Self::Class { fn error(self, err: &h2::Error) -> Self::Class {
Class::Stream(SuccessOrFailure::Failure, format!("{}", err)) Class::Stream(SuccessOrFailure::Failure, format!("{}", err))
} }
} }

View File

@ -11,13 +11,14 @@ use tokio::executor::{self, DefaultExecutor, Executor};
use tokio::runtime::current_thread; use tokio::runtime::current_thread;
use tower_h2; use tower_h2;
use app::{classify, metric_labels::EndpointLabels}; use app::classify::{Class, ClassifyResponse};
use app::{metric_labels::EndpointLabels};
use control; use control;
use dns; use dns;
use drain; use drain;
use futures; use futures;
use logging; use logging;
use metrics; use metrics::{self, FmtMetrics};
use proxy::{ use proxy::{
self, buffer, self, buffer,
http::{client, insert_target, metrics::timestamp_request_open, normalize_uri, router}, http::{client, insert_target, metrics::timestamp_request_open, normalize_uri, router},
@ -175,18 +176,16 @@ where
let (taps, observe) = control::Observe::new(100); let (taps, observe) = control::Observe::new(100);
let (http_metrics, http_report) = proxy::http::metrics::new::< let (http_metrics, http_report) = proxy::http::metrics::new::<
EndpointLabels, EndpointLabels,
classify::Class, Class,
>(config.metrics_retain_idle); >(config.metrics_retain_idle);
let (transport_metrics, transport_report) = transport::metrics::new(); let (transport_metrics, transport_report) = transport::metrics::new();
let (tls_config_sensor, tls_config_report) = telemetry::tls_config_reload::new(); let (tls_config_sensor, tls_config_report) = telemetry::tls_config_reload::new();
let report = telemetry::Report::new( let report = http_report
http_report, .and_then(transport_report)
transport_report, .and_then(tls_config_report)
tls_config_report, .and_then(telemetry::process::Report::new(start_time));
telemetry::process::Report::new(start_time),
);
let tls_client_config = tls_config_watch.client.clone(); let tls_client_config = tls_config_watch.client.clone();
let tls_cfg_bg = tls_config_watch.start(tls_config_sensor); let tls_cfg_bg = tls_config_watch.start(tls_config_sensor);
@ -251,7 +250,7 @@ where
.push(normalize_uri::layer()) .push(normalize_uri::layer())
.push(orig_proto_upgrade::layer()) .push(orig_proto_upgrade::layer())
.push(tap::layer(tap_next_id.clone(), taps.clone())) .push(tap::layer(tap_next_id.clone(), taps.clone()))
.push(metrics::layer(http_metrics, classify::Classify)) .push(metrics::layer::<_, ClassifyResponse>(http_metrics))
.push(svc::watch::layer(tls_client_config)); .push(svc::watch::layer(tls_client_config));
let dst_router_stack = endpoint_stack let dst_router_stack = endpoint_stack
@ -320,7 +319,7 @@ where
.push(svc::stack_per_request::layer()) .push(svc::stack_per_request::layer())
.push(normalize_uri::layer()) .push(normalize_uri::layer())
.push(tap::layer(tap_next_id, taps)) .push(tap::layer(tap_next_id, taps))
.push(metrics::layer(http_metrics, classify::Classify)) .push(metrics::layer::<_, ClassifyResponse>(http_metrics))
.push(buffer::layer()) .push(buffer::layer())
.push(limit::layer(MAX_IN_FLIGHT)) .push(limit::layer(MAX_IN_FLIGHT))
.push(router::layer(inbound::Recognize::new(default_fwd_addr))); .push(router::layer(inbound::Recognize::new(default_fwd_addr)));

View File

@ -5,12 +5,18 @@ pub trait Classify {
type Class; type Class;
type Error; type Error;
type ClassifyEos: ClassifyEos<Class = Self::Class, Error = Self::Error>;
/// Classifies responses. /// Classifies responses.
/// ///
/// Instances are intended to be used as an `http::Extension` that may be /// Instances are intended to be used as an `http::Extension` that may be
/// cloned to inner stack layers. Cloned instances are **not** intended to /// cloned to inner stack layers. Cloned instances are **not** intended to
/// share state. Each clone should maintain its own internal state. /// share state. Each clone should maintain its own internal state.
type ClassifyResponse: ClassifyResponse<Class = Self::Class, Error = Self::Error> type ClassifyResponse: ClassifyResponse<
Class = Self::Class,
Error = Self::Error,
ClassifyEos = Self::ClassifyEos,
>
+ Clone + Clone
+ Send + Send
+ Sync + Sync
@ -24,28 +30,30 @@ pub trait ClassifyResponse {
/// A response classification. /// A response classification.
type Class; type Class;
type Error; type Error;
type ClassifyEos: ClassifyEos<Class = Self::Class, Error = Self::Error>;
/// Update the classifier with the response headers. /// Produce a stream classifier for this response.
/// ///
/// If this is enough data to classify a response, a classification may be /// If this is enough data to classify a response, a classification may be
/// returned. Implementations should expect that `end` or `error` may be /// returned.
/// called even when a class is returned. fn start<B>(self, headers: &http::Response<B>) -> (Self::ClassifyEos, Option<Self::Class>);
///
/// This is expected to be called only once. /// Classifies the given error.
fn start<B>(&mut self, headers: &http::Response<B>) -> Option<Self::Class>; fn error(self, error: &Self::Error) -> Self::Class;
}
pub trait ClassifyEos {
type Class;
type Error;
/// Update the classifier with an EOS. /// Update the classifier with an EOS.
/// ///
/// Because trailers indicate an EOS, a classification must be returned. /// Because trailers indicate an EOS, a classification must be returned.
/// fn eos(self, trailers: Option<&http::HeaderMap>) -> Self::Class;
/// This is expected to be called only once.
fn eos(&mut self, trailers: Option<&http::HeaderMap>) -> Self::Class;
/// Update the classifier with an underlying error. /// Update the classifier with an underlying error.
/// ///
/// Because errors indicate an end-of-stream, a classification must be /// Because errors indicate an end-of-stream, a classification must be
/// returned. /// returned.
/// fn error(self, error: &Self::Error) -> Self::Class;
/// This is expected to be called only once.
fn error(&mut self, error: &Self::Error) -> Self::Class;
} }

View File

@ -8,15 +8,6 @@ use metrics::{latency, Counter, FmtLabels, FmtMetric, FmtMetrics, Histogram, Met
use super::{ClassMetrics, Metrics, Registry}; use super::{ClassMetrics, Metrics, Registry};
metrics! {
request_total: Counter { "Total count of HTTP requests." },
response_total: Counter { "Total count of HTTP responses" },
response_latency_ms: Histogram<latency::Ms> {
"Elapsed times between a request's headers being received \
and its response stream completing"
}
}
/// Reports HTTP metrics for prometheus. /// Reports HTTP metrics for prometheus.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Report<T, C> pub struct Report<T, C>
@ -24,10 +15,18 @@ where
T: FmtLabels + Hash + Eq, T: FmtLabels + Hash + Eq,
C: FmtLabels + Hash + Eq, C: FmtLabels + Hash + Eq,
{ {
scope: Scope,
registry: Arc<Mutex<Registry<T, C>>>, registry: Arc<Mutex<Registry<T, C>>>,
retain_idle: Duration, retain_idle: Duration,
} }
#[derive(Clone, Debug)]
struct Scope {
request_total_key: String,
response_total_key: String,
response_latency_ms_key: String,
}
// ===== impl Report ===== // ===== impl Report =====
impl<T, C> Report<T, C> impl<T, C> Report<T, C>
@ -36,7 +35,24 @@ where
C: FmtLabels + Hash + Eq, C: FmtLabels + Hash + Eq,
{ {
pub(super) fn new(retain_idle: Duration, registry: Arc<Mutex<Registry<T, C>>>) -> Self { pub(super) fn new(retain_idle: Duration, registry: Arc<Mutex<Registry<T, C>>>) -> Self {
Self { registry, retain_idle, } Self {
registry,
retain_idle,
scope: Scope::default(),
}
}
// FIXME This will be used for route_* metrics.
#[allow(dead_code)]
pub fn with_prefix(self, prefix: &'static str) -> Self {
if prefix.is_empty() {
return self;
}
Self {
scope: Scope::prefixed(prefix),
.. self
}
} }
} }
@ -63,15 +79,15 @@ where
return Ok(()); return Ok(());
} }
request_total.fmt_help(f)?; self.scope.request_total().fmt_help(f)?;
registry.fmt_by_target(f, request_total, |s| &s.total)?; registry.fmt_by_target(f, self.scope.request_total(), |s| &s.total)?;
response_total.fmt_help(f)?; self.scope.response_total().fmt_help(f)?;
registry.fmt_by_class(f, response_total, |s| &s.total)?; registry.fmt_by_class(f, self.scope.response_total(), |s| &s.total)?;
//registry.fmt_by_target(f, response_total, |s| &s.unclassified.total)?; //registry.fmt_by_target(f, response_total, |s| &s.unclassified.total)?;
response_latency_ms.fmt_help(f)?; self.scope.response_latency_ms().fmt_help(f)?;
registry.fmt_by_class(f, response_latency_ms, |s| &s.latency)?; registry.fmt_by_class(f, self.scope.response_latency_ms(), |s| &s.latency)?;
// registry.fmt_by_target(f, response_latency_ms, |s| { // registry.fmt_by_target(f, response_latency_ms, |s| {
// &s.unclassified.latency // &s.unclassified.latency
// })?; // })?;
@ -126,3 +142,49 @@ where
Ok(()) Ok(())
} }
} }
// === impl Scope ===
impl Default for Scope {
fn default() -> Self {
Self {
request_total_key: "request_total".to_owned(),
response_total_key: "response_total".to_owned(),
response_latency_ms_key: "response_latency_ms".to_owned(),
}
}
}
impl Scope {
fn prefixed(prefix: &'static str) -> Self {
if prefix.is_empty() {
return Self::default();
}
Self {
request_total_key: format!("{}_request_total", prefix),
response_total_key: format!("{}_response_total", prefix),
response_latency_ms_key: format!("{}_response_latency_ms", prefix),
}
}
fn request_total(&self) -> Metric<Counter> {
Metric::new(&self.request_total_key, &Self::REQUEST_TOTAL_HELP)
}
fn response_total(&self) -> Metric<Counter> {
Metric::new(&self.response_total_key, &Self::RESPONSE_TOTAL_HELP)
}
fn response_latency_ms(&self) -> Metric<Histogram<latency::Ms>> {
Metric::new(&self.response_latency_ms_key, &Self::RESPONSE_LATENCY_MS_HELP)
}
const REQUEST_TOTAL_HELP: &'static str = "Total count of HTTP requests.";
const RESPONSE_TOTAL_HELP: &'static str = "Total count of HTTP responses.";
const RESPONSE_LATENCY_MS_HELP: &'static str =
"Elapsed times between a request's headers being received \
and its response stream completing";
}

View File

@ -9,47 +9,46 @@ use std::time::Instant;
use tokio_timer::clock; use tokio_timer::clock;
use tower_h2; use tower_h2;
use super::super::{Classify, ClassifyResponse}; use proxy::http::classify::{ClassifyEos, ClassifyResponse};
use super::{ClassMetrics, Metrics, Registry}; use proxy::http::metrics::{ClassMetrics, Metrics, Registry};
use svc; use svc;
/// A stack module that wraps services to record metrics. /// A stack module that wraps services to record metrics.
#[derive(Debug, Clone)] #[derive(Debug)]
pub struct Layer<M, K, C> pub struct Layer<K, C>
where where
K: Clone + Hash + Eq, K: Clone + Hash + Eq,
C: Classify<Error = h2::Error> + Clone, C: ClassifyResponse<Error = h2::Error> + Clone,
C::Class: Hash + Eq, C::Class: Hash + Eq,
{ {
classify: C,
registry: Arc<Mutex<Registry<K, C::Class>>>, registry: Arc<Mutex<Registry<K, C::Class>>>,
_p: PhantomData<fn() -> (M)>, _p: PhantomData<fn() -> C>,
} }
/// Wraps services to record metrics. /// Wraps services to record metrics.
#[derive(Clone, Debug)] #[derive(Debug)]
pub struct Stack<M, K, C> pub struct Stack<M, K, C>
where where
K: Clone + Hash + Eq, K: Clone + Hash + Eq,
C: Classify<Error = h2::Error> + Clone, C: ClassifyResponse<Error = h2::Error> + Clone,
C::Class: Hash + Eq, C::Class: Hash + Eq,
{ {
classify: C,
registry: Arc<Mutex<Registry<K, C::Class>>>, registry: Arc<Mutex<Registry<K, C::Class>>>,
inner: M, inner: M,
_p: PhantomData<fn() -> C>,
} }
/// A middleware that records HTTP metrics. /// A middleware that records HTTP metrics.
#[derive(Clone, Debug)] #[derive(Debug)]
pub struct Service<S, C> pub struct Service<S, C>
where where
S: svc::Service, S: svc::Service,
C: Classify<Error = h2::Error> + Clone, C: ClassifyResponse<Error = h2::Error> + Clone,
C::Class: Hash + Eq, C::Class: Hash + Eq,
{ {
classify: C,
metrics: Option<Arc<Mutex<Metrics<C::Class>>>>, metrics: Option<Arc<Mutex<Metrics<C::Class>>>>,
inner: S, inner: S,
_p: PhantomData<fn() -> C>,
} }
pub struct ResponseFuture<S, C> pub struct ResponseFuture<S, C>
@ -78,7 +77,7 @@ where
pub struct ResponseBody<B, C> pub struct ResponseBody<B, C>
where where
B: tower_h2::Body, B: tower_h2::Body,
C: ClassifyResponse<Error = h2::Error>, C: ClassifyEos<Error = h2::Error>,
C::Class: Hash + Eq, C::Class: Hash + Eq,
{ {
class_at_first_byte: Option<C::Class>, class_at_first_byte: Option<C::Class>,
@ -89,33 +88,35 @@ where
inner: B, inner: B,
} }
// ===== impl Stack ===== // === impl Layer ===
pub fn layer<M, K, C, T, A, B>(registry: Arc<Mutex<Registry<K, C::Class>>>, classify: C) pub fn layer<K, C>(registry: Arc<Mutex<Registry<K, C::Class>>>) -> Layer<K, C>
-> Layer<M, K, C>
where where
K: Clone + Hash + Eq, K: Clone + Hash + Eq,
C: Classify<Error = h2::Error> + Clone, C: ClassifyResponse<Error = h2::Error> + Clone + Default + Send + Sync + 'static,
C::Class: Hash + Eq, C::Class: Hash + Eq,
C::ClassifyResponse: Send + Sync + 'static,
T: Clone + Debug,
K: From<T>,
M: svc::Stack<T>,
M::Value: svc::Service<
Request = http::Request<RequestBody<A, C::Class>>,
Response = http::Response<B>,
>,
A: tower_h2::Body,
B: tower_h2::Body,
{ {
Layer { Layer {
classify,
registry, registry,
_p: PhantomData, _p: PhantomData,
} }
} }
impl<T, M, K, C, A, B> svc::Layer<T, T, M> for Layer<M, K, C> impl<K, C> Clone for Layer<K, C>
where
K: Clone + Hash + Eq,
C: ClassifyResponse<Error = h2::Error> + Clone + Default + Send + Sync + 'static,
C::Class: Hash + Eq,
{
fn clone(&self) -> Self {
Self {
registry: self.registry.clone(),
_p: PhantomData,
}
}
}
impl<T, M, K, C, A, B> svc::Layer<T, T, M> for Layer<K, C>
where where
T: Clone + Debug, T: Clone + Debug,
K: Clone + Hash + Eq + From<T>, K: Clone + Hash + Eq + From<T>,
@ -126,8 +127,7 @@ where
>, >,
A: tower_h2::Body, A: tower_h2::Body,
B: tower_h2::Body, B: tower_h2::Body,
C: Classify<Error = h2::Error> + Clone, C: ClassifyResponse<Error = h2::Error> + Clone + Default + Send + Sync + 'static,
C::ClassifyResponse: Debug + Send + Sync + 'static,
C::Class: Hash + Eq, C::Class: Hash + Eq,
{ {
type Value = <Stack<M, K, C> as svc::Stack<T>>::Value; type Value = <Stack<M, K, C> as svc::Stack<T>>::Value;
@ -136,14 +136,30 @@ where
fn bind(&self, inner: M) -> Self::Stack { fn bind(&self, inner: M) -> Self::Stack {
Stack { Stack {
classify: self.classify.clone(),
registry: self.registry.clone(),
inner, inner,
registry: self.registry.clone(),
_p: PhantomData,
} }
} }
} }
// ===== impl Stack ===== // === impl Stack ===
impl<M, K, C> Clone for Stack<M, K, C>
where
M: Clone,
K: Clone + Hash + Eq,
C: ClassifyResponse<Error = h2::Error> + Clone + Default + Send + Sync + 'static,
C::Class: Hash + Eq,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
registry: self.registry.clone(),
_p: PhantomData,
}
}
}
impl<T, M, K, C, A, B> svc::Stack<T> for Stack<M, K, C> impl<T, M, K, C, A, B> svc::Stack<T> for Stack<M, K, C>
where where
@ -156,9 +172,8 @@ where
>, >,
A: tower_h2::Body, A: tower_h2::Body,
B: tower_h2::Body, B: tower_h2::Body,
C: Classify<Error = h2::Error> + Clone, C: ClassifyResponse<Error = h2::Error> + Clone + Default + Send + Sync + 'static,
C::Class: Hash + Eq, C::Class: Hash + Eq,
C::ClassifyResponse: Debug + Send + Sync + 'static,
{ {
type Value = Service<M::Value, C>; type Value = Service<M::Value, C>;
type Error = M::Error; type Error = M::Error;
@ -167,7 +182,6 @@ where
debug!("make: target={:?}", target); debug!("make: target={:?}", target);
let inner = self.inner.make(target)?; let inner = self.inner.make(target)?;
let classify = self.classify.clone();
let metrics = match self.registry.lock() { let metrics = match self.registry.lock() {
Ok(mut r) => Some( Ok(mut r) => Some(
r.by_target r.by_target
@ -179,11 +193,30 @@ where
}; };
debug!("make: metrics={}", metrics.is_some()); debug!("make: metrics={}", metrics.is_some());
Ok(Service { classify, metrics, inner }) Ok(Service {
metrics,
inner,
_p: PhantomData,
})
} }
} }
// ===== impl Service ===== // === impl Service ===
impl<S, C> Clone for Service<S, C>
where
S: svc::Service + Clone,
C: ClassifyResponse<Error = h2::Error> + Clone + Default + Send + Sync + 'static,
C::Class: Hash + Eq,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
metrics: self.metrics.clone(),
_p: PhantomData,
}
}
}
impl<C, S, A, B> svc::Service for Service<S, C> impl<C, S, A, B> svc::Service for Service<S, C>
where where
@ -193,14 +226,13 @@ where
>, >,
A: tower_h2::Body, A: tower_h2::Body,
B: tower_h2::Body, B: tower_h2::Body,
C: Classify<Error = h2::Error> + Clone, C: ClassifyResponse<Error = h2::Error> + Clone + Default + Send + Sync + 'static,
C::Class: Hash + Eq, C::Class: Hash + Eq,
C::ClassifyResponse: Debug + Send + Sync + 'static,
{ {
type Request = http::Request<A>; type Request = http::Request<A>;
type Response = http::Response<ResponseBody<B, C::ClassifyResponse>>; type Response = http::Response<ResponseBody<B, C::ClassifyEos>>;
type Error = S::Error; type Error = S::Error;
type Future = ResponseFuture<S, C::ClassifyResponse>; type Future = ResponseFuture<S, C>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.inner.poll_ready() self.inner.poll_ready()
@ -228,8 +260,10 @@ where
http::Request::from_parts(head, body) http::Request::from_parts(head, body)
}; };
let classify = req.extensions().get::<C>().cloned().unwrap_or_default();
ResponseFuture { ResponseFuture {
classify: Some(self.classify.classify(&req)), classify: Some(classify),
metrics: self.metrics.clone(), metrics: self.metrics.clone(),
stream_open_at: clock::now(), stream_open_at: clock::now(),
inner: self.inner.call(req), inner: self.inner.call(req),
@ -241,17 +275,22 @@ impl<C, S, B> Future for ResponseFuture<S, C>
where where
S: svc::Service<Response = http::Response<B>>, S: svc::Service<Response = http::Response<B>>,
B: tower_h2::Body, B: tower_h2::Body,
C: ClassifyResponse<Error = h2::Error> + Debug + Send + Sync + 'static, C: ClassifyResponse<Error = h2::Error> + Send + Sync + 'static,
C::Class: Hash + Eq, C::Class: Hash + Eq,
{ {
type Item = http::Response<ResponseBody<B, C>>; type Item = http::Response<ResponseBody<B, C::ClassifyEos>>;
type Error = S::Error; type Error = S::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let rsp = try_ready!(self.inner.poll()); let rsp = try_ready!(self.inner.poll());
let mut classify = self.classify.take(); let (classify, class_at_first_byte) = match self.classify.take() {
let class_at_first_byte = classify.as_mut().and_then(|c| c.start(&rsp)); Some(c) => {
let (eos, class) = c.start(&rsp);
(Some(eos), class)
}
None => (None, None),
};
let rsp = { let rsp = {
let (head, inner) = rsp.into_parts(); let (head, inner) = rsp.into_parts();
@ -303,7 +342,7 @@ where
impl<B, C> Default for ResponseBody<B, C> impl<B, C> Default for ResponseBody<B, C>
where where
B: tower_h2::Body + Default, B: tower_h2::Body + Default,
C: ClassifyResponse<Error = h2::Error>, C: ClassifyEos<Error = h2::Error>,
C::Class: Hash + Eq, C::Class: Hash + Eq,
{ {
fn default() -> Self { fn default() -> Self {
@ -321,7 +360,7 @@ where
impl<B, C> ResponseBody<B, C> impl<B, C> ResponseBody<B, C>
where where
B: tower_h2::Body, B: tower_h2::Body,
C: ClassifyResponse<Error = h2::Error>, C: ClassifyEos<Error = h2::Error>,
C::Class: Hash + Eq, C::Class: Hash + Eq,
{ {
fn record_class(&mut self, class: Option<C::Class>) { fn record_class(&mut self, class: Option<C::Class>) {
@ -352,8 +391,10 @@ where
} }
fn measure_err(&mut self, err: C::Error) -> C::Error { fn measure_err(&mut self, err: C::Error) -> C::Error {
self.class_at_first_byte = None; let c = self
let c = self.classify.take().map(|mut c| c.error(&err)); .class_at_first_byte
.take()
.or_else(|| self.classify.take().map(|c| c.error(&err)));
self.record_class(c); self.record_class(c);
err err
} }
@ -362,7 +403,7 @@ where
impl<B, C> tower_h2::Body for ResponseBody<B, C> impl<B, C> tower_h2::Body for ResponseBody<B, C>
where where
B: tower_h2::Body, B: tower_h2::Body,
C: ClassifyResponse<Error = h2::Error>, C: ClassifyEos<Error = h2::Error>,
C::Class: Hash + Eq, C::Class: Hash + Eq,
{ {
type Data = B::Data; type Data = B::Data;
@ -381,6 +422,7 @@ where
if let c @ Some(_) = self.class_at_first_byte.take() { if let c @ Some(_) = self.class_at_first_byte.take() {
self.record_class(c); self.record_class(c);
self.classify = None;
} }
Ok(Async::Ready(frame)) Ok(Async::Ready(frame))
@ -389,7 +431,7 @@ where
fn poll_trailers(&mut self) -> Poll<Option<http::HeaderMap>, h2::Error> { fn poll_trailers(&mut self) -> Poll<Option<http::HeaderMap>, h2::Error> {
let trls = try_ready!(self.inner.poll_trailers().map_err(|e| self.measure_err(e))); let trls = try_ready!(self.inner.poll_trailers().map_err(|e| self.measure_err(e)));
let c = self.classify.take().map(|mut c| c.eos(trls.as_ref())); let c = self.classify.take().map(|c| c.eos(trls.as_ref()));
self.record_class(c); self.record_class(c);
Ok(Async::Ready(trls)) Ok(Async::Ready(trls))
@ -399,11 +441,11 @@ where
impl<B, C> Drop for ResponseBody<B, C> impl<B, C> Drop for ResponseBody<B, C>
where where
B: tower_h2::Body, B: tower_h2::Body,
C: ClassifyResponse<Error = h2::Error>, C: ClassifyEos<Error = h2::Error>,
C::Class: Hash + Eq, C::Class: Hash + Eq,
{ {
fn drop(&mut self) { fn drop(&mut self) {
let c = self.classify.take().map(|mut c| c.eos(None)); let c = self.classify.take().map(|c| c.eos(None));
self.record_class(c); self.record_class(c);
} }
} }

View File

@ -2,10 +2,6 @@ use metrics as metrics;
mod errno; mod errno;
pub mod process; pub mod process;
mod report;
pub mod tls_config_reload; pub mod tls_config_reload;
pub use self::errno::Errno; pub use self::errno::Errno;
pub use self::report::Report;
pub type ServeMetrics<T, C> = metrics::Serve<Report<T, C>>;

View File

@ -1,59 +0,0 @@
use std::fmt;
use std::hash::Hash;
use metrics::{FmtLabels, FmtMetrics};
use proxy;
use transport::metrics as transport;
use super::{process, tls_config_reload};
/// Implements `FmtMetrics` to report runtime metrics.
#[derive(Clone, Debug)]
pub struct Report<T, C>
where
T: FmtLabels + Hash + Eq,
C: FmtLabels + Hash + Eq,
{
http: proxy::http::metrics::Report<T, C>,
transports: transport::Report,
tls_config_reload: tls_config_reload::Report,
process: process::Report,
}
// ===== impl Report =====
impl<T, C> Report<T, C>
where
T: FmtLabels + Hash + Eq,
C: FmtLabels + Hash + Eq,
{
pub fn new(
http: proxy::http::metrics::Report<T, C>,
transports: transport::Report,
tls_config_reload: tls_config_reload::Report,
process: process::Report,
) -> Self {
Self {
http,
transports,
tls_config_reload,
process,
}
}
}
// ===== impl Report =====
impl<T, C> FmtMetrics for Report<T, C>
where
T: FmtLabels + Hash + Eq,
C: FmtLabels + Hash + Eq,
{
fn fmt_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.http.fmt_metrics(f)?;
self.transports.fmt_metrics(f)?;
self.tls_config_reload.fmt_metrics(f)?;
self.process.fmt_metrics(f)?;
Ok(())
}
}