From 32042783d96e3d3c43520a6a8d039875d9fd27a7 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Mon, 27 Jan 2025 15:45:31 -0500 Subject: [PATCH] refactor(http/retry): port remaining `ReplayBody` tests to `Frame` (#3567) based on #3564. see linkerd/linkerd2#8733. this branch upgrades the remaining parts of the `ReplayBody` test suite to poll bodies in terms of `Frame`s. https://github.com/linkerd/linkerd2-proxy/commit/1eb822f2e6cf1b05ac93973b57e858e43903664a --- * refactor(http/retry): `replays_trailers()` uses `Frame` see https://github.com/linkerd/linkerd2/issues/8733. this commit upgrades a test that uses defunct `data()` and `trailers()` futures. Signed-off-by: katelyn martin * refactor(http/retry): `trailers_only()` uses `Frame` see https://github.com/linkerd/linkerd2/issues/8733. this commit upgrades a test that uses defunct `data()` and `trailers()` futures. Signed-off-by: katelyn martin * feat(http/retry): `ForwardCompatibleBody::is_end_stream()` this commit adds a method that exposes the inner `B`-typed body's `is_end_stream()` trait method, gated for use in tests. Signed-off-by: katelyn martin * refactor(http/retry): `body_to_string()` helper uses `Frame` this is a refactoring commit, upgrading more of the replay body test to work in terms of `Frame`. this updates the `body_to_string()` helper in particular. Signed-off-by: katelyn martin * refactor(http/retry): `chunk()` helper uses `Frame` Signed-off-by: katelyn martin --------- Signed-off-by: katelyn martin --- linkerd/http/retry/src/compat.rs | 6 + linkerd/http/retry/src/replay.rs | 202 ++++++++++++++++++------------- 2 files changed, 125 insertions(+), 83 deletions(-) diff --git a/linkerd/http/retry/src/compat.rs b/linkerd/http/retry/src/compat.rs index 129431949..79698d079 100644 --- a/linkerd/http/retry/src/compat.rs +++ b/linkerd/http/retry/src/compat.rs @@ -40,6 +40,12 @@ impl ForwardCompatibleBody { pub(crate) fn frame(&mut self) -> combinators::Frame<'_, B> { combinators::Frame(self) } + + /// Returns `true` when the end of stream has been reached. + #[cfg(test)] + pub(crate) fn is_end_stream(&self) -> bool { + self.inner.is_end_stream() + } } /// Future that resolves to the next frame from a `Body`. diff --git a/linkerd/http/retry/src/replay.rs b/linkerd/http/retry/src/replay.rs index 5003d6c1d..11eb53aa4 100644 --- a/linkerd/http/retry/src/replay.rs +++ b/linkerd/http/retry/src/replay.rs @@ -513,11 +513,16 @@ mod tests { tx.send_data("hello world").await; drop(tx); - let initial = body_to_string(initial).await; - assert_eq!(initial, "hello world"); - - let replay = body_to_string(replay).await; - assert_eq!(replay, "hello world"); + { + let (data, trailers) = body_to_string(initial).await; + assert_eq!(data, "hello world"); + assert_eq!(trailers, None); + } + { + let (data, trailers) = body_to_string(replay).await; + assert_eq!(data, "hello world"); + assert_eq!(trailers, None); + } } #[tokio::test] @@ -536,11 +541,13 @@ mod tests { tx.send_data(" of fun!").await; }); - let initial = body_to_string(initial).await; + let (initial, trailers) = body_to_string(initial).await; assert_eq!(initial, "hello world, have lots of fun!"); + assert!(trailers.is_none()); - let replay = body_to_string(replay).await; + let (replay, trailers) = body_to_string(replay).await; assert_eq!(replay, "hello world, have lots of fun!"); + assert!(trailers.is_none()); } #[tokio::test] @@ -654,18 +661,17 @@ mod tests { tx.send_trailers(HeaderMap::new()).await; }); - assert_eq!( - body_to_string(&mut replay).await, - "hello world, have lots of fun" - ); + let (data, trailers) = body_to_string(&mut replay).await; + assert_eq!(data, "hello world, have lots of fun"); + assert!(trailers.is_some()); } #[tokio::test(flavor = "current_thread")] async fn multiple_replays() { let Test { mut tx, - mut initial, - mut replay, + initial, + replay, _trace, } = Test::new(); @@ -680,27 +686,18 @@ mod tests { tx.send_trailers(tlrs2).await; }); - assert_eq!(body_to_string(&mut initial).await, "hello world"); + let read = |body| async { + let (data, trailers) = body_to_string(body).await; + assert_eq!(data, "hello world"); + assert_eq!(trailers.as_ref(), Some(&tlrs)); + }; - let initial_tlrs = initial.trailers().await.expect("trailers should not error"); - assert_eq!(initial_tlrs.as_ref(), Some(&tlrs)); + read(initial).await; - // drop the initial body to send the data to the replay - drop(initial); - - let mut replay2 = replay.clone(); - assert_eq!(body_to_string(&mut replay).await, "hello world"); - - let replay_tlrs = replay.trailers().await.expect("trailers should not error"); - assert_eq!(replay_tlrs.as_ref(), Some(&tlrs)); - - // drop the initial body to send the data to the replay - drop(replay); - - assert_eq!(body_to_string(&mut replay2).await, "hello world"); - - let replay2_tlrs = replay2.trailers().await.expect("trailers should not error"); - assert_eq!(replay2_tlrs.as_ref(), Some(&tlrs)); + // Replay the body twice. + let replay2 = replay.clone(); + read(replay).await; + read(replay2).await; } #[tokio::test(flavor = "current_thread")] @@ -723,7 +720,7 @@ mod tests { drop(initial); tracing::info!("dropped initial body"); - let mut replay2 = replay.clone(); + let replay2 = replay.clone(); tx.send_data(" world").await; assert_eq!(chunk(&mut replay).await.unwrap(), "hello"); @@ -740,21 +737,17 @@ mod tests { tx.send_trailers(tlrs2).await; }); - assert_eq!( - body_to_string(&mut replay2).await, - "hello world, have lots of fun!" - ); - - let replay2_tlrs = replay2.trailers().await.expect("trailers should not error"); - assert_eq!(replay2_tlrs.as_ref(), Some(&tlrs)); + let (data, replay2_trailers) = body_to_string(replay2).await; + assert_eq!(data, "hello world, have lots of fun!"); + assert_eq!(replay2_trailers.as_ref(), Some(&tlrs)); } #[tokio::test(flavor = "current_thread")] async fn drop_clone_early() { let Test { mut tx, - mut initial, - mut replay, + initial, + replay, _trace, } = Test::new(); @@ -769,21 +762,23 @@ mod tests { tx.send_trailers(tlrs2).await; }); - assert_eq!(body_to_string(&mut initial).await, "hello world"); + { + let body = initial; + let (data, trailers) = body_to_string(body).await; + assert_eq!(data, "hello world"); + assert_eq!(trailers.as_ref(), Some(&tlrs)); + } - let initial_tlrs = initial.trailers().await.expect("trailers should not error"); - assert_eq!(initial_tlrs.as_ref(), Some(&tlrs)); - - // drop the initial body to send the data to the replay - drop(initial); - - // clone the body again and then drop it + // Clone the body, and then drop it before polling it. let replay2 = replay.clone(); drop(replay2); - assert_eq!(body_to_string(&mut replay).await, "hello world"); - let replay_tlrs = replay.trailers().await.expect("trailers should not error"); - assert_eq!(replay_tlrs.as_ref(), Some(&tlrs)); + { + let body = replay; + let (data, trailers) = body_to_string(body).await; + assert_eq!(data, "hello world"); + assert_eq!(trailers.as_ref(), Some(&tlrs)); + } } // This test is specifically for behavior across clones, so the clippy lint @@ -808,40 +803,58 @@ mod tests { async fn eos_only_when_fully_replayed() { // Test that each clone of a body is not EOS until the data has been // fully replayed. - let mut initial = ReplayBody::try_new(BoxBody::from_static("hello world"), 64 * 1024) + let initial = ReplayBody::try_new(BoxBody::from_static("hello world"), 64 * 1024) .expect("body must not be too large"); - let mut replay = initial.clone(); + let replay = initial.clone(); - body_to_string(&mut initial).await; - assert!(!replay.is_end_stream()); + let mut initial = crate::compat::ForwardCompatibleBody::new(initial); + let mut replay = crate::compat::ForwardCompatibleBody::new(replay); - initial.trailers().await.expect("trailers should not error"); + // Read the initial body, show that the replay does not consider itself to have reached the + // end-of-stream. Then drop the initial body, show that the replay is still not done. + assert!(!initial.is_end_stream()); + initial + .frame() + .await + .expect("yields a result") + .expect("yields a frame") + .into_data() + .expect("yields a data frame"); assert!(initial.is_end_stream()); assert!(!replay.is_end_stream()); - - // drop the initial body to send the data to the replay drop(initial); - assert!(!replay.is_end_stream()); - body_to_string(&mut replay).await; + // Read the replay body. assert!(!replay.is_end_stream()); - - replay.trailers().await.expect("trailers should not error"); + replay + .frame() + .await + .expect("yields a result") + .expect("yields a frame") + .into_data() + .expect("yields a data frame"); + assert!(replay.frame().await.is_none()); assert!(replay.is_end_stream()); - // Even if we clone a body _after_ it has been driven to EOS, the clone - // must not be EOS. - let mut replay2 = replay.clone(); + // Even if we clone a body _after_ it has been driven to EOS, the clone must not be EOS. + let replay = replay.into_inner(); + let replay2 = replay.clone(); assert!(!replay2.is_end_stream()); - // drop the initial body to send the data to the replay + // Drop the first replay body to send the data to the second replay. drop(replay); - body_to_string(&mut replay2).await; - assert!(!replay2.is_end_stream()); - - replay2.trailers().await.expect("trailers should not error"); + // Read the second replay body. + let mut replay2 = crate::compat::ForwardCompatibleBody::new(replay2); + replay2 + .frame() + .await + .expect("yields a result") + .expect("yields a frame") + .into_data() + .expect("yields a data frame"); + assert!(replay2.frame().await.is_none()); assert!(replay2.is_end_stream()); } @@ -1003,26 +1016,49 @@ mod tests { T: http_body::Body + Unpin, { tracing::trace!("waiting for a body chunk..."); - let chunk = body - .data() + let chunk = crate::compat::ForwardCompatibleBody::new(body) + .frame() .await - .map(|res| res.map_err(|_| ()).unwrap()) + .expect("yields a result") + .ok() + .expect("yields a frame") + .into_data() + .ok() .map(string); tracing::info!(?chunk); chunk } - async fn body_to_string(mut body: T) -> String + async fn body_to_string(body: B) -> (String, Option) where - T: http_body::Body + Unpin, - T::Error: std::fmt::Debug, + B: http_body::Body + Unpin, + B::Error: std::fmt::Debug, { - let mut s = String::new(); - while let Some(chunk) = chunk(&mut body).await { - s.push_str(&chunk[..]); + let mut body = crate::compat::ForwardCompatibleBody::new(body); + let mut data = String::new(); + let mut trailers = None; + + // Continue reading frames from the body until it is finished. + while let Some(frame) = body + .frame() + .await + .transpose() + .expect("reading a frame succeeds") + { + match frame.into_data().map(string) { + Ok(ref s) => data.push_str(s), + Err(frame) => { + let trls = frame + .into_trailers() + .map_err(drop) + .expect("test frame is either data or trailers"); + trailers = Some(trls); + } + } } - tracing::info!(body = ?s, "no more data"); - s + + tracing::info!(?data, ?trailers, "finished reading body"); + (data, trailers) } fn string(mut data: impl Buf) -> String {