diff --git a/Cargo.lock b/Cargo.lock index 977012b13..de8bbcb4d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1675,6 +1675,7 @@ dependencies = [ "linkerd-io", "linkerd-meshtls", "linkerd-meshtls-rustls", + "linkerd-mock-http-body", "linkerd-opaq-route", "linkerd-proxy-client-policy", "linkerd-retry", diff --git a/linkerd/app/outbound/Cargo.toml b/linkerd/app/outbound/Cargo.toml index 54fde5131..071db77e5 100644 --- a/linkerd/app/outbound/Cargo.toml +++ b/linkerd/app/outbound/Cargo.toml @@ -71,6 +71,7 @@ linkerd-meshtls = { path = "../../meshtls", features = ["rustls"] } linkerd-meshtls-rustls = { path = "../../meshtls/rustls", features = [ "test-util", ] } +linkerd-mock-http-body = { path = "../../mock/http-body" } linkerd-stack = { path = "../../stack", features = ["test-util"] } linkerd-tracing = { path = "../../tracing", features = ["ansi"] } diff --git a/linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs b/linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs index 60e9bffda..b497e549e 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs @@ -4,6 +4,9 @@ use super::{ test_util::*, LabelGrpcRouteRsp, LabelHttpRouteRsp, RequestMetrics, }; +use bytes::{Buf, Bytes}; +use http_body::Body; +use http_body_util::BodyExt; use linkerd_app_core::{ dns, svc::{ @@ -14,6 +17,10 @@ use linkerd_app_core::{ }; use linkerd_http_prom::body_data::request::RequestBodyFamilies; use linkerd_proxy_client_policy as policy; +use std::task::Poll; + +static GRPC_STATUS: http::HeaderName = http::HeaderName::from_static("grpc-status"); +static GRPC_STATUS_OK: http::HeaderValue = http::HeaderValue::from_static("0"); #[tokio::test(flavor = "current_thread", start_paused = true)] async fn http_request_statuses() { @@ -520,6 +527,160 @@ async fn http_route_request_body_frames() { tracing::info!("passed"); } +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn http_response_body_drop_on_eos() { + use linkerd_app_core::svc::{Service, ServiceExt}; + + const EXPORT_HOSTNAME_LABELS: bool = false; + let _trace = linkerd_tracing::test::trace_init(); + + let super::HttpRouteMetrics { + requests, + body_data, + .. + } = super::HttpRouteMetrics::default(); + let parent_ref = crate::ParentRef(policy::Meta::new_default("parent")); + let route_ref = crate::RouteRef(policy::Meta::new_default("route")); + let (mut svc, mut handle) = mock_http_route_metrics( + &requests, + &body_data, + &parent_ref, + &route_ref, + EXPORT_HOSTNAME_LABELS, + ); + + // Define a request and a response. + let req = http::Request::default(); + let rsp = http::Response::builder() + .status(200) + .body(BoxBody::from_static("contents")) + .unwrap(); + + // Two counters for 200 responses that do/don't have an error. + let ok = requests.get_statuses(&labels::Rsp( + labels::Route::new(parent_ref.clone(), route_ref.clone(), None), + labels::HttpRsp { + status: Some(http::StatusCode::OK), + error: None, + }, + )); + let err = requests.get_statuses(&labels::Rsp( + labels::Route::new(parent_ref.clone(), route_ref.clone(), None), + labels::HttpRsp { + status: Some(http::StatusCode::OK), + error: Some(labels::Error::Unknown), + }, + )); + debug_assert_eq!(ok.get(), 0); + debug_assert_eq!(err.get(), 0); + + // Send the request, and obtain the response. + let mut body = { + handle.allow(1); + svc.ready().await.expect("ready"); + let mut call = svc.call(req); + let (_req, tx) = tokio::select! { + _ = (&mut call) => unreachable!(), + res = handle.next_request() => res.unwrap(), + }; + assert_eq!(ok.get(), 0); + tx.send_response(rsp); + call.await.unwrap().into_body() + }; + + // The counters are not incremented yet. + assert_eq!(ok.get(), 0); + assert_eq!(err.get(), 0); + + // Poll a frame out of the body. + let data = body + .frame() + .await + .expect("yields a result") + .expect("yields a frame") + .into_data() + .ok() + .expect("yields data"); + assert_eq!(data.chunk(), "contents".as_bytes()); + assert_eq!(data.remaining(), "contents".len()); + + // Show that the body reports itself as being complete. + debug_assert!(body.is_end_stream()); + assert_eq!(ok.get(), 1); + assert_eq!(err.get(), 0); +} + +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn http_response_body_drop_early() { + use linkerd_app_core::svc::{Service, ServiceExt}; + + const EXPORT_HOSTNAME_LABELS: bool = false; + let _trace = linkerd_tracing::test::trace_init(); + + let super::HttpRouteMetrics { + requests, + body_data, + .. + } = super::HttpRouteMetrics::default(); + let parent_ref = crate::ParentRef(policy::Meta::new_default("parent")); + let route_ref = crate::RouteRef(policy::Meta::new_default("route")); + let (mut svc, mut handle) = mock_http_route_metrics( + &requests, + &body_data, + &parent_ref, + &route_ref, + EXPORT_HOSTNAME_LABELS, + ); + + // Define a request and a response. + let req = http::Request::default(); + let rsp = http::Response::builder() + .status(200) + .body(BoxBody::from_static("contents")) + .unwrap(); + + // Two counters for 200 responses that do/don't have an error. + let ok = requests.get_statuses(&labels::Rsp( + labels::Route::new(parent_ref.clone(), route_ref.clone(), None), + labels::HttpRsp { + status: Some(http::StatusCode::OK), + error: None, + }, + )); + let err = requests.get_statuses(&labels::Rsp( + labels::Route::new(parent_ref.clone(), route_ref.clone(), None), + labels::HttpRsp { + status: Some(http::StatusCode::OK), + error: Some(labels::Error::Unknown), + }, + )); + debug_assert_eq!(ok.get(), 0); + debug_assert_eq!(err.get(), 0); + + // Send the request, and obtain the response. + let body = { + handle.allow(1); + svc.ready().await.expect("ready"); + let mut call = svc.call(req); + let (_req, tx) = tokio::select! { + _ = (&mut call) => unreachable!(), + res = handle.next_request() => res.unwrap(), + }; + assert_eq!(ok.get(), 0); + tx.send_response(rsp); + call.await.unwrap().into_body() + }; + + // The counters are not incremented yet. + assert_eq!(ok.get(), 0); + assert_eq!(err.get(), 0); + + // The body reports an error if it was not completed. + drop(body); + assert_eq!(ok.get(), 0); + assert_eq!(err.get(), 1); +} + #[tokio::test(flavor = "current_thread", start_paused = true)] async fn grpc_request_statuses_ok() { const EXPORT_HOSTNAME_LABELS: bool = true; @@ -723,6 +884,210 @@ async fn grpc_request_statuses_error_body() { .await; } +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn grpc_response_body_drop_on_eos() { + use linkerd_app_core::svc::{Service, ServiceExt}; + + const EXPORT_HOSTNAME_LABELS: bool = false; + let _trace = linkerd_tracing::test::trace_init(); + + let super::GrpcRouteMetrics { + requests, + body_data, + .. + } = super::GrpcRouteMetrics::default(); + let parent_ref = crate::ParentRef(policy::Meta::new_default("parent")); + let route_ref = crate::RouteRef(policy::Meta::new_default("route")); + let (mut svc, mut handle) = mock_grpc_route_metrics( + &requests, + &body_data, + &parent_ref, + &route_ref, + EXPORT_HOSTNAME_LABELS, + ); + + // Define a request and a response. + let req = http::Request::default(); + let rsp = http::Response::builder() + .status(200) + .body({ + let data = Poll::Ready(Some(Ok(Bytes::from_static(b"contents")))); + let trailers = { + let mut trailers = http::HeaderMap::with_capacity(1); + trailers.insert(GRPC_STATUS.clone(), GRPC_STATUS_OK.clone()); + Poll::Ready(Some(Ok(trailers))) + }; + let body = linkerd_mock_http_body::MockBody::default() + .then_yield_data(data) + .then_yield_trailer(trailers); + BoxBody::new(body) + }) + .unwrap(); + + // Two counters for 200 responses that do/don't have an error. + let ok = requests.get_statuses(&labels::Rsp( + labels::Route::new(parent_ref.clone(), route_ref.clone(), None), + labels::GrpcRsp { + status: Some(tonic::Code::Ok), + error: None, + }, + )); + let err = requests.get_statuses(&labels::Rsp( + labels::Route::new(parent_ref.clone(), route_ref.clone(), None), + labels::GrpcRsp { + status: Some(tonic::Code::Ok), + error: Some(labels::Error::Unknown), + }, + )); + debug_assert_eq!(ok.get(), 0); + debug_assert_eq!(err.get(), 0); + + // Send the request, and obtain the response. + let mut body = { + handle.allow(1); + svc.ready().await.expect("ready"); + let mut call = svc.call(req); + let (_req, tx) = tokio::select! { + _ = (&mut call) => unreachable!(), + res = handle.next_request() => res.unwrap(), + }; + assert_eq!(ok.get(), 0); + tx.send_response(rsp); + call.await.unwrap().into_body() + }; + + // The counters are not incremented yet. + assert_eq!(ok.get(), 0); + assert_eq!(err.get(), 0); + + // Poll a frame out of the body. + let data = body + .frame() + .await + .expect("yields a result") + .expect("yields a frame") + .into_data() + .ok() + .expect("yields data"); + assert_eq!(data.chunk(), "contents".as_bytes()); + assert_eq!(data.remaining(), "contents".len()); + + // Poll the trailers out of the body. + let trls = body + .frame() + .await + .expect("yields a result") + .expect("yields a frame") + .into_trailers() + .ok() + .expect("yields trailers"); + assert_eq!(trls.get(&GRPC_STATUS).unwrap(), GRPC_STATUS_OK); + + // Show that the body reports itself as being complete. + debug_assert!(body.is_end_stream()); + assert_eq!(ok.get(), 1); + assert_eq!(err.get(), 0); +} + +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn grpc_response_body_drop_early() { + use linkerd_app_core::svc::{Service, ServiceExt}; + + const EXPORT_HOSTNAME_LABELS: bool = false; + let _trace = linkerd_tracing::test::trace_init(); + + let super::GrpcRouteMetrics { + requests, + body_data, + .. + } = super::GrpcRouteMetrics::default(); + let parent_ref = crate::ParentRef(policy::Meta::new_default("parent")); + let route_ref = crate::RouteRef(policy::Meta::new_default("route")); + let (mut svc, mut handle) = mock_grpc_route_metrics( + &requests, + &body_data, + &parent_ref, + &route_ref, + EXPORT_HOSTNAME_LABELS, + ); + + // Define a request and a response. + let req = http::Request::default(); + let rsp = http::Response::builder() + .status(200) + .body({ + let data = Poll::Ready(Some(Ok(Bytes::from_static(b"contents")))); + let trailers = { + let mut trailers = http::HeaderMap::with_capacity(1); + trailers.insert(GRPC_STATUS.clone(), GRPC_STATUS_OK.clone()); + Poll::Ready(Some(Ok(trailers))) + }; + let body = linkerd_mock_http_body::MockBody::default() + .then_yield_data(data) + .then_yield_trailer(trailers); + BoxBody::new(body) + }) + .unwrap(); + + // Two counters for 200 responses that do/don't have an error. + let ok = requests.get_statuses(&labels::Rsp( + labels::Route::new(parent_ref.clone(), route_ref.clone(), None), + labels::GrpcRsp { + status: Some(tonic::Code::Ok), + error: None, + }, + )); + let err = requests.get_statuses(&labels::Rsp( + labels::Route::new(parent_ref.clone(), route_ref.clone(), None), + labels::GrpcRsp { + status: None, + error: Some(labels::Error::Unknown), + }, + )); + debug_assert_eq!(ok.get(), 0); + debug_assert_eq!(err.get(), 0); + + // Send the request, and obtain the response. + let mut body = { + handle.allow(1); + svc.ready().await.expect("ready"); + let mut call = svc.call(req); + let (_req, tx) = tokio::select! { + _ = (&mut call) => unreachable!(), + res = handle.next_request() => res.unwrap(), + }; + assert_eq!(ok.get(), 0); + tx.send_response(rsp); + call.await.unwrap().into_body() + }; + + // The counters are not incremented yet. + assert_eq!(ok.get(), 0); + assert_eq!(err.get(), 0); + + // Poll a frame out of the body. + let data = body + .frame() + .await + .expect("yields a result") + .expect("yields a frame") + .into_data() + .ok() + .expect("yields data"); + assert_eq!(data.chunk(), "contents".as_bytes()); + assert_eq!(data.remaining(), "contents".len()); + + // The counters are not incremented yet. + debug_assert!(!body.is_end_stream()); + assert_eq!(ok.get(), 0); + assert_eq!(err.get(), 0); + + // Then, drop the body without polling the trailers. + drop(body); + assert_eq!(ok.get(), 0); + assert_eq!(err.get(), 1); +} + // === Utils === const MOCK_GRPC_REQ_URI: &str = "http://host/svc/method"; diff --git a/linkerd/http/prom/src/record_response.rs b/linkerd/http/prom/src/record_response.rs index 136cd82cc..cfdc13441 100644 --- a/linkerd/http/prom/src/record_response.rs +++ b/linkerd/http/prom/src/record_response.rs @@ -268,6 +268,8 @@ where Some(Ok(frame)) => { if let trls @ Some(_) = frame.trailers_ref() { end_stream(this.state, Ok(trls)); + } else if this.inner.is_end_stream() { + end_stream(this.state, Ok(None)); } } Some(Err(error)) => end_stream(this.state, Err(error)), @@ -278,7 +280,9 @@ where } fn is_end_stream(&self) -> bool { - self.inner.is_end_stream() + // If the inner response state is still in place, the end of the stream has not been + // classified and recorded yet. + self.state.is_none() } }