fix(http/prom): record bodies when eos reached (#3856)

* chore(app/outbound): `linkerd-mock-http-body` test dependency

this adds a development dependency, so we can use this mock body type in
the outbound proxy's unit tests.

Signed-off-by: katelyn martin <kate@buoyant.io>

* chore(app/outbound): additional http route metrics tests

Signed-off-by: katelyn martin <kate@buoyant.io>

* chore(app/outbound): additional grpc route metrics tests

Signed-off-by: katelyn martin <kate@buoyant.io>

* fix(http/prom): record bodies when eos reached

this commit fixes a bug discovered by @alpeb, which was introduced in
proxy v2.288.0.

> The associated metric is `outbound_http_route_request_statuses_total`:
>
> ```
> $ linkerd dg proxy-metrics -n booksapp deploy/webapp|rg outbound_http_route_request_statuses_total.*authors
> outbound_http_route_request_statuses_total{parent_group="core",parent_kind="Service",parent_namespace="booksapp",parent_name="authors",parent_port="7001",parent_section_name="",route_group="",route_kind="default",route_namespace="",route_name="http",hostname="",http_status="204",error=""} 5
> outbound_http_route_request_statuses_total{parent_group="core",parent_kind="Service",parent_namespace="booksapp",parent_name="authors",parent_port="7001",parent_section_name="",route_group="",route_kind="default",route_namespace="",route_name="http",hostname="",http_status="201",error="UNKNOWN"} 5
> outbound_http_route_request_statuses_total{parent_group="core",parent_kind="Service",parent_namespace="booksapp",parent_name="authors",parent_port="7001",parent_section_name="",route_group="",route_kind="default",route_namespace="",route_name="http",hostname="",http_status="200",error="UNKNOWN"} 10
> ```
>
> The problem was introduced in `edge-25.3.4`, with the proxy `v2.288.0`.
> Before that the metrics looked like:
>
> ```
> $ linkerd dg proxy-metrics -n booksapp deploy/webapp|rg outbound_http_route_request_statuses_total.*authors
> outbound_http_route_request_statuses_total{parent_group="core",parent_kind="Service",parent_namespace="booksapp",parent_name="authors",parent_port="7001",parent_section_name="",route_group="",route_kind="default",route_namespace="",route_name="http",hostname="",http_status="200",error=""} 193
> outbound_http_route_request_statuses_total{parent_group="core",parent_kind="Service",parent_namespace="booksapp",parent_name="authors",parent_port="7001",parent_section_name="",route_group="",route_kind="default",route_namespace="",route_name="http",hostname="",http_status="204",error=""} 96
> outbound_http_route_request_statuses_total{parent_group="core",parent_kind="Service",parent_namespace="booksapp",parent_name="authors",parent_port="7001",parent_section_name="",route_group="",route_kind="default",route_namespace="",route_name="http",hostname="",http_status="201",error=""} 96
> ```
>
> So the difference is the non-empty value for `error=UNKNOWN` even
> when `http_status` is 2xx, which `linkerd viz stat-outbound`
> interprets as failed requests.

in #3086 we introduced a suite of route- and backend-level metrics. that
subsystem contains a body middleware that will report itself as having
reached the end-of-stream by delegating directly down to its inner
body's `is_end_stream()` hint.

this is roughly correct, but is slightly distinct from the actual
invariant: a `linkerd_http_prom::record_response::ResponseBody<B>` must
call its `end_stream` helper to classify the outcome and increment the
corresponding time series in the
`outbound_http_route_request_statuses_total` metric family.

in #3504 we upgraded our hyper dependency. while doing so, we neglected
to include a call to `end_stream` if a data frame is yielded and the
inner body reports itself as having reached the end-of-stream.

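this meant that instrumented bodies would be polled until the end is
reached, but were being dropped before a `None` was encountered. as a
result, `end_stream` was never called, and the dropped body was
recorded with `error="UNKNOWN"` despite having completed successfully.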

this commit fixes this issue in two ways, to be defensive:

* invoke `end_stream()` if a non-trailers frame is yielded, and the
  inner body now reports itself as having ended. this restores the
  behavior in place prior to #3504. see the relevant component of that
  diff, here:
  <https://github.com/linkerd/linkerd2-proxy/pull/3504/files#diff-45d0bc344f76c111551a8eaf5d3f0e0c22ee6e6836a626e46402a6ae3cbc0035L262-R274>

* rather than delegating to the inner `<B as Body>::is_end_stream()`
  method, report the end-of-stream being reached by inspecting whether
  or not the inner response state has been taken. this is the state that
  directly indicates whether or not the `ResponseBody<B>` middleware is
  finished.

X-ref: #3504
X-ref: #3086
X-ref: linkerd/linkerd2#8733
Signed-off-by: katelyn martin <kate@buoyant.io>

---------

Signed-off-by: katelyn martin <kate@buoyant.io>
This commit is contained in:
katelyn martin 2025-04-09 15:30:55 -04:00 committed by GitHub
parent 985580f9b5
commit 6426c38906
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 372 additions and 1 deletions

View File

@ -1675,6 +1675,7 @@ dependencies = [
"linkerd-io", "linkerd-io",
"linkerd-meshtls", "linkerd-meshtls",
"linkerd-meshtls-rustls", "linkerd-meshtls-rustls",
"linkerd-mock-http-body",
"linkerd-opaq-route", "linkerd-opaq-route",
"linkerd-proxy-client-policy", "linkerd-proxy-client-policy",
"linkerd-retry", "linkerd-retry",

View File

@ -71,6 +71,7 @@ linkerd-meshtls = { path = "../../meshtls", features = ["rustls"] }
linkerd-meshtls-rustls = { path = "../../meshtls/rustls", features = [ linkerd-meshtls-rustls = { path = "../../meshtls/rustls", features = [
"test-util", "test-util",
] } ] }
linkerd-mock-http-body = { path = "../../mock/http-body" }
linkerd-stack = { path = "../../stack", features = ["test-util"] } linkerd-stack = { path = "../../stack", features = ["test-util"] }
linkerd-tracing = { path = "../../tracing", features = ["ansi"] } linkerd-tracing = { path = "../../tracing", features = ["ansi"] }

View File

@ -4,6 +4,9 @@ use super::{
test_util::*, test_util::*,
LabelGrpcRouteRsp, LabelHttpRouteRsp, RequestMetrics, LabelGrpcRouteRsp, LabelHttpRouteRsp, RequestMetrics,
}; };
use bytes::{Buf, Bytes};
use http_body::Body;
use http_body_util::BodyExt;
use linkerd_app_core::{ use linkerd_app_core::{
dns, dns,
svc::{ svc::{
@ -14,6 +17,10 @@ use linkerd_app_core::{
}; };
use linkerd_http_prom::body_data::request::RequestBodyFamilies; use linkerd_http_prom::body_data::request::RequestBodyFamilies;
use linkerd_proxy_client_policy as policy; use linkerd_proxy_client_policy as policy;
use std::task::Poll;
/// Name of the `grpc-status` header, inserted into the trailers of mock gRPC response bodies.
static GRPC_STATUS: http::HeaderName = http::HeaderName::from_static("grpc-status");
/// The `grpc-status` value `"0"`, used by these tests as the trailer value of a successful
/// gRPC response.
static GRPC_STATUS_OK: http::HeaderValue = http::HeaderValue::from_static("0");
#[tokio::test(flavor = "current_thread", start_paused = true)] #[tokio::test(flavor = "current_thread", start_paused = true)]
async fn http_request_statuses() { async fn http_request_statuses() {
@ -520,6 +527,160 @@ async fn http_route_request_body_frames() {
tracing::info!("passed"); tracing::info!("passed");
} }
/// Shows that an HTTP response body is recorded as a success once it has yielded its final
/// data frame and reports the end of the stream, without being polled until it yields `None`.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn http_response_body_drop_on_eos() {
    use linkerd_app_core::svc::{Service, ServiceExt};

    const EXPORT_HOSTNAME_LABELS: bool = false;
    let _trace = linkerd_tracing::test::trace_init();

    let super::HttpRouteMetrics {
        requests,
        body_data,
        ..
    } = super::HttpRouteMetrics::default();
    let parent_ref = crate::ParentRef(policy::Meta::new_default("parent"));
    let route_ref = crate::RouteRef(policy::Meta::new_default("route"));
    let (mut svc, mut handle) = mock_http_route_metrics(
        &requests,
        &body_data,
        &parent_ref,
        &route_ref,
        EXPORT_HOSTNAME_LABELS,
    );

    // Define a request and a response.
    let req = http::Request::default();
    let rsp = http::Response::builder()
        .status(200)
        .body(BoxBody::from_static("contents"))
        .unwrap();

    // Two counters for 200 responses that do/don't have an error.
    let ok = requests.get_statuses(&labels::Rsp(
        labels::Route::new(parent_ref.clone(), route_ref.clone(), None),
        labels::HttpRsp {
            status: Some(http::StatusCode::OK),
            error: None,
        },
    ));
    let err = requests.get_statuses(&labels::Rsp(
        labels::Route::new(parent_ref.clone(), route_ref.clone(), None),
        labels::HttpRsp {
            status: Some(http::StatusCode::OK),
            error: Some(labels::Error::Unknown),
        },
    ));
    // Use `assert_eq!` rather than `debug_assert_eq!`, so that these checks are not compiled
    // out when the test suite is built with optimizations.
    assert_eq!(ok.get(), 0);
    assert_eq!(err.get(), 0);

    // Send the request, and obtain the response.
    let mut body = {
        handle.allow(1);
        svc.ready().await.expect("ready");
        let mut call = svc.call(req);
        let (_req, tx) = tokio::select! {
            _ = (&mut call) => unreachable!(),
            res = handle.next_request() => res.unwrap(),
        };
        assert_eq!(ok.get(), 0);
        tx.send_response(rsp);
        call.await.unwrap().into_body()
    };

    // The counters are not incremented yet.
    assert_eq!(ok.get(), 0);
    assert_eq!(err.get(), 0);

    // Poll a frame out of the body.
    let data = body
        .frame()
        .await
        .expect("yields a result")
        .expect("yields a frame")
        .into_data()
        .ok()
        .expect("yields data");
    assert_eq!(data.chunk(), "contents".as_bytes());
    assert_eq!(data.remaining(), "contents".len());

    // Show that the body reports itself as being complete, and that the response was recorded
    // as a success even though the body was never polled to `None`.
    assert!(body.is_end_stream());
    assert_eq!(ok.get(), 1);
    assert_eq!(err.get(), 0);
}
/// Shows that an HTTP response body that is dropped before being polled is recorded as an
/// error.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn http_response_body_drop_early() {
    use linkerd_app_core::svc::{Service, ServiceExt};

    const EXPORT_HOSTNAME_LABELS: bool = false;
    let _trace = linkerd_tracing::test::trace_init();

    let super::HttpRouteMetrics {
        requests,
        body_data,
        ..
    } = super::HttpRouteMetrics::default();
    let parent_ref = crate::ParentRef(policy::Meta::new_default("parent"));
    let route_ref = crate::RouteRef(policy::Meta::new_default("route"));
    let (mut svc, mut handle) = mock_http_route_metrics(
        &requests,
        &body_data,
        &parent_ref,
        &route_ref,
        EXPORT_HOSTNAME_LABELS,
    );

    // Define a request and a response.
    let req = http::Request::default();
    let rsp = http::Response::builder()
        .status(200)
        .body(BoxBody::from_static("contents"))
        .unwrap();

    // Two counters for 200 responses that do/don't have an error.
    let ok = requests.get_statuses(&labels::Rsp(
        labels::Route::new(parent_ref.clone(), route_ref.clone(), None),
        labels::HttpRsp {
            status: Some(http::StatusCode::OK),
            error: None,
        },
    ));
    let err = requests.get_statuses(&labels::Rsp(
        labels::Route::new(parent_ref.clone(), route_ref.clone(), None),
        labels::HttpRsp {
            status: Some(http::StatusCode::OK),
            error: Some(labels::Error::Unknown),
        },
    ));
    // Use `assert_eq!` rather than `debug_assert_eq!`, so that these checks are not compiled
    // out when the test suite is built with optimizations.
    assert_eq!(ok.get(), 0);
    assert_eq!(err.get(), 0);

    // Send the request, and obtain the response.
    let body = {
        handle.allow(1);
        svc.ready().await.expect("ready");
        let mut call = svc.call(req);
        let (_req, tx) = tokio::select! {
            _ = (&mut call) => unreachable!(),
            res = handle.next_request() => res.unwrap(),
        };
        assert_eq!(ok.get(), 0);
        tx.send_response(rsp);
        call.await.unwrap().into_body()
    };

    // The counters are not incremented yet.
    assert_eq!(ok.get(), 0);
    assert_eq!(err.get(), 0);

    // The body reports an error if it was not completed.
    drop(body);
    assert_eq!(ok.get(), 0);
    assert_eq!(err.get(), 1);
}
#[tokio::test(flavor = "current_thread", start_paused = true)] #[tokio::test(flavor = "current_thread", start_paused = true)]
async fn grpc_request_statuses_ok() { async fn grpc_request_statuses_ok() {
const EXPORT_HOSTNAME_LABELS: bool = true; const EXPORT_HOSTNAME_LABELS: bool = true;
@ -723,6 +884,210 @@ async fn grpc_request_statuses_error_body() {
.await; .await;
} }
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn grpc_response_body_drop_on_eos() {
use linkerd_app_core::svc::{Service, ServiceExt};
const EXPORT_HOSTNAME_LABELS: bool = false;
let _trace = linkerd_tracing::test::trace_init();
let super::GrpcRouteMetrics {
requests,
body_data,
..
} = super::GrpcRouteMetrics::default();
let parent_ref = crate::ParentRef(policy::Meta::new_default("parent"));
let route_ref = crate::RouteRef(policy::Meta::new_default("route"));
let (mut svc, mut handle) = mock_grpc_route_metrics(
&requests,
&body_data,
&parent_ref,
&route_ref,
EXPORT_HOSTNAME_LABELS,
);
// Define a request and a response.
let req = http::Request::default();
let rsp = http::Response::builder()
.status(200)
.body({
let data = Poll::Ready(Some(Ok(Bytes::from_static(b"contents"))));
let trailers = {
let mut trailers = http::HeaderMap::with_capacity(1);
trailers.insert(GRPC_STATUS.clone(), GRPC_STATUS_OK.clone());
Poll::Ready(Some(Ok(trailers)))
};
let body = linkerd_mock_http_body::MockBody::default()
.then_yield_data(data)
.then_yield_trailer(trailers);
BoxBody::new(body)
})
.unwrap();
// Two counters for 200 responses that do/don't have an error.
let ok = requests.get_statuses(&labels::Rsp(
labels::Route::new(parent_ref.clone(), route_ref.clone(), None),
labels::GrpcRsp {
status: Some(tonic::Code::Ok),
error: None,
},
));
let err = requests.get_statuses(&labels::Rsp(
labels::Route::new(parent_ref.clone(), route_ref.clone(), None),
labels::GrpcRsp {
status: Some(tonic::Code::Ok),
error: Some(labels::Error::Unknown),
},
));
debug_assert_eq!(ok.get(), 0);
debug_assert_eq!(err.get(), 0);
// Send the request, and obtain the response.
let mut body = {
handle.allow(1);
svc.ready().await.expect("ready");
let mut call = svc.call(req);
let (_req, tx) = tokio::select! {
_ = (&mut call) => unreachable!(),
res = handle.next_request() => res.unwrap(),
};
assert_eq!(ok.get(), 0);
tx.send_response(rsp);
call.await.unwrap().into_body()
};
// The counters are not incremented yet.
assert_eq!(ok.get(), 0);
assert_eq!(err.get(), 0);
// Poll a frame out of the body.
let data = body
.frame()
.await
.expect("yields a result")
.expect("yields a frame")
.into_data()
.ok()
.expect("yields data");
assert_eq!(data.chunk(), "contents".as_bytes());
assert_eq!(data.remaining(), "contents".len());
// Poll the trailers out of the body.
let trls = body
.frame()
.await
.expect("yields a result")
.expect("yields a frame")
.into_trailers()
.ok()
.expect("yields trailers");
assert_eq!(trls.get(&GRPC_STATUS).unwrap(), GRPC_STATUS_OK);
// Show that the body reports itself as being complete.
debug_assert!(body.is_end_stream());
assert_eq!(ok.get(), 1);
assert_eq!(err.get(), 0);
}
/// Shows that a gRPC response body that is dropped after its data frame, but before its
/// trailers have been polled, is recorded as an error without a status code.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn grpc_response_body_drop_early() {
    use linkerd_app_core::svc::{Service, ServiceExt};

    const EXPORT_HOSTNAME_LABELS: bool = false;
    let _trace = linkerd_tracing::test::trace_init();

    let super::GrpcRouteMetrics {
        requests,
        body_data,
        ..
    } = super::GrpcRouteMetrics::default();
    let parent_ref = crate::ParentRef(policy::Meta::new_default("parent"));
    let route_ref = crate::RouteRef(policy::Meta::new_default("route"));
    let (mut svc, mut handle) = mock_grpc_route_metrics(
        &requests,
        &body_data,
        &parent_ref,
        &route_ref,
        EXPORT_HOSTNAME_LABELS,
    );

    // Define a request and a response.
    let req = http::Request::default();
    let rsp = http::Response::builder()
        .status(200)
        .body({
            // The mock body yields one data frame, followed by a `grpc-status` trailer.
            let data = Poll::Ready(Some(Ok(Bytes::from_static(b"contents"))));
            let trailers = {
                let mut trailers = http::HeaderMap::with_capacity(1);
                trailers.insert(GRPC_STATUS.clone(), GRPC_STATUS_OK.clone());
                Poll::Ready(Some(Ok(trailers)))
            };
            let body = linkerd_mock_http_body::MockBody::default()
                .then_yield_data(data)
                .then_yield_trailer(trailers);
            BoxBody::new(body)
        })
        .unwrap();

    // One counter for OK responses without an error, and one counter for responses that have
    // no status code and an unknown error.
    let ok = requests.get_statuses(&labels::Rsp(
        labels::Route::new(parent_ref.clone(), route_ref.clone(), None),
        labels::GrpcRsp {
            status: Some(tonic::Code::Ok),
            error: None,
        },
    ));
    let err = requests.get_statuses(&labels::Rsp(
        labels::Route::new(parent_ref.clone(), route_ref.clone(), None),
        labels::GrpcRsp {
            status: None,
            error: Some(labels::Error::Unknown),
        },
    ));
    // Use `assert_eq!` rather than `debug_assert_eq!`, so that these checks are not compiled
    // out when the test suite is built with optimizations.
    assert_eq!(ok.get(), 0);
    assert_eq!(err.get(), 0);

    // Send the request, and obtain the response.
    let mut body = {
        handle.allow(1);
        svc.ready().await.expect("ready");
        let mut call = svc.call(req);
        let (_req, tx) = tokio::select! {
            _ = (&mut call) => unreachable!(),
            res = handle.next_request() => res.unwrap(),
        };
        assert_eq!(ok.get(), 0);
        tx.send_response(rsp);
        call.await.unwrap().into_body()
    };

    // The counters are not incremented yet.
    assert_eq!(ok.get(), 0);
    assert_eq!(err.get(), 0);

    // Poll a frame out of the body.
    let data = body
        .frame()
        .await
        .expect("yields a result")
        .expect("yields a frame")
        .into_data()
        .ok()
        .expect("yields data");
    assert_eq!(data.chunk(), "contents".as_bytes());
    assert_eq!(data.remaining(), "contents".len());

    // The body has not ended, and the counters are not incremented yet.
    assert!(!body.is_end_stream());
    assert_eq!(ok.get(), 0);
    assert_eq!(err.get(), 0);

    // Then, drop the body without polling the trailers.
    drop(body);
    assert_eq!(ok.get(), 0);
    assert_eq!(err.get(), 1);
}
// === Utils === // === Utils ===
const MOCK_GRPC_REQ_URI: &str = "http://host/svc/method"; const MOCK_GRPC_REQ_URI: &str = "http://host/svc/method";

View File

@ -268,6 +268,8 @@ where
Some(Ok(frame)) => { Some(Ok(frame)) => {
if let trls @ Some(_) = frame.trailers_ref() { if let trls @ Some(_) = frame.trailers_ref() {
end_stream(this.state, Ok(trls)); end_stream(this.state, Ok(trls));
} else if this.inner.is_end_stream() {
end_stream(this.state, Ok(None));
} }
} }
Some(Err(error)) => end_stream(this.state, Err(error)), Some(Err(error)) => end_stream(this.state, Err(error)),
@ -278,7 +280,9 @@ where
} }
fn is_end_stream(&self) -> bool { fn is_end_stream(&self) -> bool {
self.inner.is_end_stream() // If the inner response state is still in place, the end of the stream has not been
// classified and recorded yet.
self.state.is_none()
} }
} }