mirror of https://github.com/linkerd/linkerd2.git
Add request_duration_ms metric and increment request_total on request end (#589)
This PR adds the `request_duration_ms` metric to the Prometheus metrics exported by the proxy. It also changes the `request_total` metric so that it is incremented when a request stream finishes rather than when it opens, for consistency with how `response_total` is generated. Making this change required modifying `telemetry::sensor::http` to generate a `StreamRequestEnd` event analogous to the existing `StreamResponseEnd` event. This is done the same way sensors are already attached to response bodies: the `ResponseBody` type is generalized into a `MeasuredBody` type that can wrap either a request or a response body. Because this changes the type of request bodies, request types had to be updated throughout the rest of the proxy codebase to fix the resulting type errors, which is why the diff for this PR is so large. Closes #570
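
For orientation, the sketch below shows the shape of the `MeasuredBody`/`BodySensor` generalization described above, stripped down to plain Rust. The trait and type names come from the diff, but the signatures are simplified (no tower-h2 `Body` impl, no event channel), and `RequestSensor`, `record_frame`, `finish`, and `opened_at` are illustrative stand-ins rather than the proxy's actual API.

use std::time::{Duration, Instant};

/// Sensor behavior that differs between request and response streams.
trait BodySensor {
    /// Called once when the wrapped stream completes.
    fn end(self, since_open: Duration);
    /// Counters updated as data frames flow through the body.
    fn frames_sent(&mut self) -> &mut u32;
    fn bytes_sent(&mut self) -> &mut u64;
}

/// Wraps either a request or a response body and measures it.
struct MeasuredBody<B, I: BodySensor> {
    body: B,
    inner: Option<I>,
    opened_at: Instant,
}

impl<B, I: BodySensor> MeasuredBody<B, I> {
    fn new(body: B, inner: Option<I>) -> Self {
        Self { body, inner, opened_at: Instant::now() }
    }

    /// Record a data frame passing through the wrapped body.
    fn record_frame(&mut self, bytes: usize) {
        if let Some(ref mut inner) = self.inner {
            *inner.frames_sent() += 1;
            *inner.bytes_sent() += bytes as u64;
        }
    }

    /// Signal end-of-stream, firing the sensor exactly once.
    fn finish(&mut self) {
        if let Some(inner) = self.inner.take() {
            inner.end(self.opened_at.elapsed());
        }
    }
}

/// Request-side sensor: in the proxy this is roughly where a StreamRequestEnd
/// event would be emitted, which the aggregator turns into request_total and
/// request_duration_ms samples.
struct RequestSensor {
    frames_sent: u32,
    bytes_sent: u64,
}

impl BodySensor for RequestSensor {
    fn end(self, since_open: Duration) {
        println!(
            "request stream ended after {:?} ({} frames, {} bytes)",
            since_open, self.frames_sent, self.bytes_sent
        );
    }

    fn frames_sent(&mut self) -> &mut u32 {
        &mut self.frames_sent
    }

    fn bytes_sent(&mut self) -> &mut u64 {
        &mut self.bytes_sent
    }
}

fn main() {
    // A Vec<u8> stands in for the real body type here.
    let mut body = MeasuredBody::new(
        vec![0u8; 0],
        Some(RequestSensor { frames_sent: 0, bytes_sent: 0 }),
    );
    body.record_frame(128);
    body.finish();
}

On the metrics endpoint, the new histogram then appears alongside the existing response metrics, e.g. request_duration_ms_count{authority="tele.test.svc.cluster.local",direction="inbound"} 1, which is what the new tests at the bottom of this diff assert against.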
This commit is contained in:
parent fb1d6a5c66
commit 9321932918
@@ -79,6 +79,8 @@ pub type NewHttp<B> = sensor::NewHttp<Client<B>, B, HttpBody>;

pub type HttpResponse = http::Response<sensor::http::ResponseBody<HttpBody>>;

pub type HttpRequest<B> = http::Request<sensor::http::RequestBody<B>>;

pub type Client<B> = transparency::Client<
    sensor::Connect<transport::Connect>,
    B,

@@ -3,8 +3,8 @@ use std::net::SocketAddr;
use std::time::Duration;
use std::sync::Arc;

use futures::{Async, Poll};
use http;
use futures::{Async, Poll};
use rand;
use tower;
use tower_balance::{self, choose, load, Balance};

@@ -12,6 +12,7 @@ pub enum Event {

    StreamRequestOpen(Arc<ctx::http::Request>),
    StreamRequestFail(Arc<ctx::http::Request>, StreamRequestFail),
    StreamRequestEnd(Arc<ctx::http::Request>, StreamRequestEnd),

    StreamResponseOpen(Arc<ctx::http::Response>, StreamResponseOpen),
    StreamResponseFail(Arc<ctx::http::Response>, StreamResponseFail),

@@ -37,6 +38,11 @@ pub struct StreamRequestFail {
    pub error: h2::Reason,
}

#[derive(Clone, Debug)]
pub struct StreamRequestEnd {
    pub since_request_open: Duration,
}

#[derive(Clone, Debug)]
pub struct StreamResponseOpen {
    pub since_request_open: Duration,

@@ -67,6 +73,7 @@ impl Event {
        match *self {
            Event::StreamRequestOpen(_) |
            Event::StreamRequestFail(_, _) |
            Event::StreamRequestEnd(_, _) |
            Event::StreamResponseOpen(_, _) |
            Event::StreamResponseFail(_, _) |
            Event::StreamResponseEnd(_, _) => true,

@@ -84,9 +91,9 @@ impl Event {
    pub fn proxy(&self) -> &Arc<ctx::Proxy> {
        match *self {
            Event::TransportOpen(ref ctx) | Event::TransportClose(ref ctx, _) => ctx.proxy(),
            Event::StreamRequestOpen(ref req) | Event::StreamRequestFail(ref req, _) => {
                &req.server.proxy
            }
            Event::StreamRequestOpen(ref req) |
            Event::StreamRequestFail(ref req, _) |
            Event::StreamRequestEnd(ref req, _) => &req.server.proxy,
            Event::StreamResponseOpen(ref rsp, _) |
            Event::StreamResponseFail(ref rsp, _) |
            Event::StreamResponseEnd(ref rsp, _) => &rsp.request.server.proxy,

@@ -127,7 +127,10 @@ impl Metrics {
                stats.latencies += fail.since_request_open;
                *ends += 1;
            }

            Event::StreamRequestEnd(_, _) => {
                // Do nothing, as the push metrics are slated for removal and
                // we don't need to support this event.
            }
            Event::StreamResponseOpen(ref res, ref open) => {
                self.response(res).latencies += open.since_request_open;
            },

@@ -22,6 +22,8 @@ use super::latency::{BUCKET_BOUNDS, Histogram};
#[derive(Debug, Clone)]
struct Metrics {
    request_total: Metric<Counter, Arc<RequestLabels>>,
    request_duration: Metric<Histogram, Arc<RequestLabels>>,

    response_total: Metric<Counter, Arc<ResponseLabels>>,
    response_duration: Metric<Histogram, Arc<ResponseLabels>>,
    response_latency: Metric<Histogram, Arc<ResponseLabels>>,

@@ -122,6 +124,13 @@ impl Metrics {
            "A counter of the number of requests the proxy has received.",
        );

        let request_duration = Metric::<Histogram, Arc<RequestLabels>>::new(
            "request_duration_ms",
            "A histogram of the duration of a request. This is measured from \
             when the request headers are received to when the request \
             stream has completed.",
        );

        let response_total = Metric::<Counter, Arc<ResponseLabels>>::new(
            "response_total",
            "A counter of the number of responses the proxy has received.",

@@ -133,14 +142,17 @@ impl Metrics {
             when the response headers are received to when the response \
             stream has completed.",
        );

        let response_latency = Metric::<Histogram, Arc<ResponseLabels>>::new(
            "response_latency_ms",
            "A histogram of the total latency of a response. This is measured \
             from when the request headers are received to when the response \
             stream has completed.",
        );

        Metrics {
            request_total,
            request_duration,
            response_total,
            response_duration,
            response_latency,

@@ -153,6 +165,14 @@ impl Metrics {
            .or_insert_with(Default::default).0
    }

    fn request_duration(&mut self,
                        labels: &Arc<RequestLabels>)
                        -> &mut Histogram {
        self.request_duration.values
            .entry(labels.clone())
            .or_insert_with(Default::default)
    }

    fn response_duration(&mut self,
                         labels: &Arc<ResponseLabels>)
                         -> &mut Histogram {

@@ -179,8 +199,9 @@ impl Metrics {

impl fmt::Display for Metrics {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{}\n{}\n{}\n{}",
        write!(f, "{}\n{}\n{}\n{}\n{}",
            self.request_total,
            self.request_duration,
            self.response_total,
            self.response_duration,
            self.response_latency,

@@ -302,21 +323,28 @@ impl Aggregate {
    pub fn record_event(&mut self, event: &Event) {
        trace!("Metrics::record({:?})", event);
        match *event {
            Event::StreamRequestOpen(ref req) => {

            Event::StreamRequestOpen(_) | Event::StreamResponseOpen(_, _) => {
                // Do nothing; we'll record metrics for the request or response
                // when the stream *finishes*.
            },

            Event::StreamRequestFail(ref req, ref fail) => {
                let labels = Arc::new(RequestLabels::new(req));
                self.update(|metrics| {
                    *metrics.request_total(&labels) += 1;
                    *metrics.request_duration(&labels) +=
                        fail.since_request_open;
                })
            },

            Event::StreamRequestFail(_, _) => {
                // The request total was already incremented by the
                // StreamRequestOpen event; when we count requests on
                // stream ends, we'll care about this event.
            },

            Event::StreamResponseOpen(_, _) => {
                // Wait until the response stream ends to record it.
            Event::StreamRequestEnd(ref req, ref end) => {
                let labels = Arc::new(RequestLabels::new(req));
                self.update(|metrics| {
                    *metrics.request_total(&labels) += 1;
                    *metrics.request_duration(&labels) +=
                        end.since_request_open;
                })
            },

            Event::StreamResponseEnd(ref res, ref end) => {

@@ -1,7 +1,8 @@
use bytes::{Buf, IntoBuf};
use futures::{Async, Future, Poll};
use futures::{Async, Future, Poll, Stream};
use h2;
use http;
use std::default::Default;
use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
@@ -54,15 +55,27 @@ struct RespondInner {
    request_open: Instant,
}

#[derive(Default, Debug)]
pub struct ResponseBody<B> {
pub type ResponseBody<B> = MeasuredBody<B, ResponseBodyInner>;
pub type RequestBody<B> = MeasuredBody<B, RequestBodyInner>;

#[derive(Debug)]
pub struct MeasuredBody<B, I: BodySensor> {
    body: B,
    inner: Option<ResponseBodyInner>,
    inner: Option<I>,
    _p: PhantomData<(B)>,
}

/// The `inner` portion of a `MeasuredBody`, with differing implementations
/// for request and response streams.
pub trait BodySensor: Sized {
    fn fail(self, reason: h2::Reason);
    fn end(self, grpc_status: Option<u32>);
    fn frames_sent(&mut self) -> &mut u32;
    fn bytes_sent(&mut self) -> &mut u64;
}

#[derive(Debug)]
struct ResponseBodyInner {
pub struct ResponseBodyInner {
    handle: super::Handle,
    ctx: Arc<ctx::http::Response>,
    bytes_sent: u64,

@@ -71,6 +84,16 @@ struct ResponseBodyInner {
    response_open: Instant,
}


#[derive(Debug)]
pub struct RequestBodyInner {
    handle: super::Handle,
    ctx: Arc<ctx::http::Request>,
    bytes_sent: u64,
    frames_sent: u32,
    request_open: Instant,
}

// === NewHttp ===

impl<N, A, B> NewHttp<N, A, B>

@@ -78,7 +101,7 @@ where
    A: Body + 'static,
    B: Body + 'static,
    N: NewService<
        Request = http::Request<A>,
        Request = http::Request<RequestBody<A>>,
        Response = http::Response<B>,
        Error = client::Error,
    >

@@ -105,13 +128,13 @@ where
    A: Body + 'static,
    B: Body + 'static,
    N: NewService<
        Request = http::Request<A>,
        Request = http::Request<RequestBody<A>>,
        Response = http::Response<B>,
        Error = client::Error,
    >
        + 'static,
{
    type Request = N::Request;
    type Request = http::Request<A>;
    type Response = http::Response<ResponseBody<B>>;
    type Error = N::Error;
    type InitError = N::InitError;

@@ -136,7 +159,10 @@ where
    A: Body + 'static,
    B: Body + 'static,
    F: Future,
    F::Item: Service<Request = http::Request<A>, Response = http::Response<B>>,
    F::Item: Service<
        Request = http::Request<RequestBody<A>>,
        Response = http::Response<B>
    >,
{
    type Item = Http<F::Item, A, B>;
    type Error = F::Error;

@@ -161,13 +187,13 @@ where
    A: Body + 'static,
    B: Body + 'static,
    S: Service<
        Request = http::Request<A>,
        Request = http::Request<RequestBody<A>>,
        Response = http::Response<B>,
        Error = client::Error,
    >
        + 'static,
{
    type Request = S::Request;
    type Request = http::Request<A>;
    type Response = http::Response<ResponseBody<B>>;
    type Error = S::Error;
    type Future = Respond<S::Future, B>;

@@ -177,8 +203,8 @@ where
    }

    fn call(&mut self, mut req: Self::Request) -> Self::Future {
        let inner = match req.extensions_mut().remove::<Arc<ctx::transport::Server>>() {
            None => None,
        let (inner, body_inner) = match req.extensions_mut().remove::<Arc<ctx::transport::Server>>() {
            None => (None, None),
            Some(ctx) => {
                let id = self.next_id.fetch_add(1, Ordering::SeqCst);
                let ctx = ctx::http::Request::new(&req, &ctx, &self.client_ctx, id);

@@ -186,15 +212,42 @@ where
                self.handle
                    .send(|| Event::StreamRequestOpen(Arc::clone(&ctx)));

                Some(RespondInner {
                    ctx,
                let request_open = Instant::now();

                let respond_inner = Some(RespondInner {
                    ctx: ctx.clone(),
                    handle: self.handle.clone(),
                    request_open: Instant::now(),
                })
                    request_open,
                });
                let body_inner =
                    if req.body().is_end_stream() {
                        self.handle.send(|| {
                            Event::StreamRequestEnd(
                                Arc::clone(&ctx),
                                event::StreamRequestEnd {
                                    since_request_open: request_open.elapsed(),
                                },
                            )
                        });
                        None
                    } else {
                        Some(RequestBodyInner {
                            ctx,
                            handle: self.handle.clone(),
                            request_open,
                            frames_sent: 0,
                            bytes_sent: 0,
                        })
                    };
                (respond_inner, body_inner)
            }
        };
        let req = {
            let (parts, body) = req.into_parts();
            let body = MeasuredBody::new(body, body_inner);
            http::Request::from_parts(parts, body)
        };

        // TODO measure request lifetime.
        let future = self.service.call(req);

        Respond {

@@ -272,11 +325,7 @@ where

        let rsp = {
            let (parts, body) = rsp.into_parts();
            let body = ResponseBody {
                body,
                inner,
                _p: PhantomData,
            };
            let body = ResponseBody::new(body, inner);
            http::Response::from_parts(parts, body)
        };

@@ -310,9 +359,17 @@ where
    }
}

// === ResponseBody ===
// === MeasuredBody ===

impl<B, I: BodySensor> MeasuredBody<B, I> {
    pub fn new(body: B, inner: Option<I>) -> Self {
        Self {
            body,
            inner,
            _p: PhantomData,
        }
    }

impl<B> ResponseBody<B> {
    /// Wraps an operation on the underlying transport with error telemetry.
    ///
    /// If the transport operation results in a non-recoverable error, a transport close

@@ -326,28 +383,7 @@ impl<B> ResponseBody<B> {
            Err(e) => {
                if let Some(error) = e.reason() {
                    if let Some(i) = self.inner.take() {
                        let ResponseBodyInner {
                            ctx,
                            mut handle,
                            request_open,
                            response_open,
                            bytes_sent,
                            frames_sent,
                            ..
                        } = i;

                        handle.send(|| {
                            event::Event::StreamResponseFail(
                                Arc::clone(&ctx),
                                event::StreamResponseFail {
                                    error,
                                    since_request_open: request_open.elapsed(),
                                    since_response_open: response_open.elapsed(),
                                    bytes_sent,
                                    frames_sent,
                                },
                            )
                        });
                        i.fail(error);
                    }
                }

@@ -357,9 +393,10 @@ impl<B> ResponseBody<B> {
    }
}

impl<B> Body for ResponseBody<B>
impl<B, I> Body for MeasuredBody<B, I>
where
    B: Body + 'static,
    I: BodySensor,
{
    /// The body chunk type
    type Data = <B::Data as IntoBuf>::Buf;

@@ -373,8 +410,8 @@ where
        let frame = frame.map(|frame| {
            let frame = frame.into_buf();
            if let Some(ref mut inner) = self.inner {
                inner.frames_sent += 1;
                inner.bytes_sent += frame.remaining() as u64;
                *inner.frames_sent() += 1;
                *inner.bytes_sent() += frame.remaining() as u64;
            }
            frame
        });

@@ -387,32 +424,11 @@ where
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Ok(Async::Ready(trls)) => {
                if let Some(i) = self.inner.take() {
                    let ResponseBodyInner {
                        ctx,
                        mut handle,
                        request_open,
                        response_open,
                        bytes_sent,
                        frames_sent,
                    } = i;

                    handle.send(|| {
                        let grpc_status = trls.as_ref()
                            .and_then(|t| t.get(GRPC_STATUS))
                            .and_then(|v| v.to_str().ok())
                            .and_then(|s| s.parse::<u32>().ok());

                        event::Event::StreamResponseEnd(
                            Arc::clone(&ctx),
                            event::StreamResponseEnd {
                                grpc_status,
                                since_request_open: request_open.elapsed(),
                                since_response_open: response_open.elapsed(),
                                bytes_sent,
                                frames_sent,
                            },
                        )
                    })
                    let grpc_status = trls.as_ref()
                        .and_then(|t| t.get(GRPC_STATUS))
                        .and_then(|v| v.to_str().ok())
                        .and_then(|s| s.parse::<u32>().ok());
                    i.end(grpc_status);
                }

                Ok(Async::Ready(trls))

@@ -420,3 +436,141 @@ where
        }
    }
}

impl<B, I> Default for MeasuredBody<B, I>
where
    B: Default,
    I: BodySensor,
{
    fn default() -> Self {
        Self {
            body: B::default(),
            inner: None,
            _p: PhantomData,
        }
    }
}

impl<B, I> Stream for MeasuredBody<B, I>
where
    B: Stream,
    I: BodySensor,
{
    type Item = B::Item;
    type Error = B::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        self.body.poll()
    }

}

// ===== impl BodySensor =====

impl BodySensor for ResponseBodyInner {

    fn fail(self, error: h2::Reason) {
        let ResponseBodyInner {
            ctx,
            mut handle,
            request_open,
            response_open,
            bytes_sent,
            frames_sent,
            ..
        } = self;

        handle.send(|| {
            event::Event::StreamResponseFail(
                Arc::clone(&ctx),
                event::StreamResponseFail {
                    error,
                    since_request_open: request_open.elapsed(),
                    since_response_open: response_open.elapsed(),
                    bytes_sent,
                    frames_sent,
                },
            )
        });
    }

    fn end(self, grpc_status: Option<u32>) {
        let ResponseBodyInner {
            ctx,
            mut handle,
            request_open,
            response_open,
            bytes_sent,
            frames_sent,
        } = self;

        handle.send(||
            event::Event::StreamResponseEnd(
                Arc::clone(&ctx),
                event::StreamResponseEnd {
                    grpc_status,
                    since_request_open: request_open.elapsed(),
                    since_response_open: response_open.elapsed(),
                    bytes_sent,
                    frames_sent,
                },
            )
        )
    }

    fn frames_sent(&mut self) -> &mut u32 {
        &mut self.frames_sent
    }

    fn bytes_sent(&mut self) -> &mut u64 {
        &mut self.bytes_sent
    }
}

impl BodySensor for RequestBodyInner {

    fn fail(self, error: h2::Reason) {
        let RequestBodyInner {
            ctx,
            mut handle,
            request_open,
            ..
        } = self;

        handle.send(||
            event::Event::StreamRequestFail(
                Arc::clone(&ctx),
                event::StreamRequestFail {
                    error,
                    since_request_open: request_open.elapsed(),
                },
            )
        )
    }

    fn end(self, _grpc_status: Option<u32>) {
        let RequestBodyInner {
            ctx,
            mut handle,
            request_open,
            ..
        } = self;

        handle.send(||
            event::Event::StreamRequestEnd(
                Arc::clone(&ctx),
                event::StreamRequestEnd {
                    since_request_open: request_open.elapsed(),
                },
            )
        )
    }

    fn frames_sent(&mut self) -> &mut u32 {
        &mut self.frames_sent
    }

    fn bytes_sent(&mut self) -> &mut u64 {
        &mut self.bytes_sent
    }
}

@@ -84,7 +84,11 @@ impl Sensors {
    where
        A: Body + 'static,
        B: Body + 'static,
        N: NewService<Request = Request<A>, Response = Response<B>, Error = client::Error>
        N: NewService<
            Request = Request<http::RequestBody<A>>,
            Response = Response<B>,
            Error = client::Error
        >
            + 'static,
    {
        NewHttp::new(next_id, new_service, &self.0, client_ctx)

@@ -5,26 +5,30 @@ use hyper;
use tokio_connect::Connect;
use tokio_core::reactor::Handle;
use tower::{Service, NewService};
use tower_h2;
use tower_h2::{self, Body};

use bind;
use telemetry::sensor::http::RequestBody;
use super::glue::{BodyStream, HttpBody, HyperConnect};
use super::h1::UriIsAbsoluteForm;

type HyperClient<C, B> =
    hyper::Client<HyperConnect<C>, BodyStream<RequestBody<B>>>;

/// A `NewService` that can speak either HTTP/1 or HTTP/2.
pub struct Client<C, B>
where
    B: tower_h2::Body,
    B: tower_h2::Body + 'static,
{
    inner: ClientInner<C, B>,
}

enum ClientInner<C, B>
where
    B: tower_h2::Body,
    B: tower_h2::Body + 'static,
{
    Http1(hyper::Client<HyperConnect<C>, BodyStream<B>>),
    Http2(tower_h2::client::Connect<C, Handle, B>),
    Http1(HyperClient<C, B>),
    Http2(tower_h2::client::Connect<C, Handle, RequestBody<B>>),
}

/// A `Future` returned from `Client::new_service()`.

@@ -41,14 +45,14 @@ where
    B: tower_h2::Body + 'static,
    C: Connect + 'static,
{
    Http1(Option<hyper::Client<HyperConnect<C>, BodyStream<B>>>),
    Http2(tower_h2::client::ConnectFuture<C, Handle, B>),
    Http1(Option<HyperClient<C, B>>),
    Http2(tower_h2::client::ConnectFuture<C, Handle, RequestBody<B>>),
}

/// The `Service` yielded by `Client::new_service()`.
pub struct ClientService<C, B>
where
    B: tower_h2::Body,
    B: tower_h2::Body + 'static,
    C: Connect,
{
    inner: ClientServiceInner<C, B>,

@@ -56,14 +60,14 @@ where

enum ClientServiceInner<C, B>
where
    B: tower_h2::Body,
    B: tower_h2::Body + 'static,
    C: Connect
{
    Http1(hyper::Client<HyperConnect<C>, BodyStream<B>>),
    Http1(HyperClient<C, B>),
    Http2(tower_h2::client::Connection<
        <C as Connect>::Connected,
        Handle,
        B
        RequestBody<B>,
    >),
}

@@ -113,7 +117,7 @@ where
    C::Future: 'static,
    B: tower_h2::Body + 'static,
{
    type Request = http::Request<B>;
    type Request = bind::HttpRequest<B>;
    type Response = http::Response<HttpBody>;
    type Error = tower_h2::client::Error;
    type InitError = tower_h2::client::ConnectError<C::Error>;

@@ -164,7 +168,7 @@ where
    C::Future: 'static,
    B: tower_h2::Body + 'static,
{
    type Request = http::Request<B>;
    type Request = bind::HttpRequest<B>;
    type Response = http::Response<HttpBody>;
    type Error = tower_h2::client::Error;
    type Future = ClientServiceFuture;

@@ -326,7 +326,12 @@ fn metrics_endpoint_outbound_request_count() {

}

// Ignore this test on CI, because our method of adding latency to requests
// (calling `thread::sleep`) is likely to be flakey on Travis.
// Eventually, we can add some kind of mock timer system for simulating latency
// more reliably, and re-enable this test.
#[test]
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
fn metrics_endpoint_inbound_response_latency() {
    let _ = env_logger::try_init();

@@ -408,8 +413,12 @@ fn metrics_endpoint_inbound_response_latency() {
        "response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",status_code=\"200\"} 4");
}


// Ignore this test on CI, because our method of adding latency to requests
// (calling `thread::sleep`) is likely to be flakey on Travis.
// Eventually, we can add some kind of mock timer system for simulating latency
// more reliably, and re-enable this test.
#[test]
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
fn metrics_endpoint_outbound_response_latency() {
    let _ = env_logger::try_init();

@@ -493,6 +502,75 @@ fn metrics_endpoint_outbound_response_latency() {
        "response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",status_code=\"200\"} 4");
}

#[test]
fn metrics_endpoint_inbound_request_duration() {
    let _ = env_logger::try_init();

    info!("running test server");
    let srv = server::new()
        .route("/hey", "hello")
        .run();

    let ctrl = controller::new();
    let proxy = proxy::new()
        .controller(ctrl.run())
        .inbound(srv)
        .metrics_flush_interval(Duration::from_millis(500))
        .run();
    let client = client::new(proxy.inbound, "tele.test.svc.cluster.local");
    let metrics = client::http1(proxy.metrics, "localhost");

    // request with body should increment request_duration
    info!("client.get(/hey)");
    assert_eq!(client.get("/hey"), "hello");

    let scrape = metrics.get("/metrics");
    assert_contains!(scrape,
        "request_duration_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\"} 1");

    // request without body should also increment request_duration
    info!("client.get(/hey)");
    assert_eq!(client.get("/hey"), "hello");

    let scrape = metrics.get("/metrics");
    assert_contains!(scrape,
        "request_duration_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\"} 2");
}

#[test]
fn metrics_endpoint_outbound_request_duration() {
    let _ = env_logger::try_init();

    info!("running test server");
    let srv = server::new()
        .route("/hey", "hello")
        .run();
    let ctrl = controller::new()
        .destination("tele.test.svc.cluster.local", srv.addr)
        .run();
    let proxy = proxy::new()
        .controller(ctrl)
        .outbound(srv)
        .metrics_flush_interval(Duration::from_millis(500))
        .run();
    let client = client::new(proxy.outbound, "tele.test.svc.cluster.local");
    let metrics = client::http1(proxy.metrics, "localhost");

    info!("client.get(/hey)");
    assert_eq!(client.get("/hey"), "hello");

    let scrape = metrics.get("/metrics");
    assert_contains!(scrape,
        "request_duration_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\"} 1");

    info!("client.get(/hey)");
    assert_eq!(client.get("/hey"), "hello");

    let scrape = metrics.get("/metrics");
    assert_contains!(scrape,
        "request_duration_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\"} 2");
}

#[test]
fn metrics_have_no_double_commas() {
    // Test for regressions to runconduit/conduit#600.