refactor(http/retry): port remaining `ReplayBody` tests to `Frame<T>` (#3567)

based on #3564. see linkerd/linkerd2#8733.

this branch upgrades the remaining parts of the `ReplayBody<B>` test
suite to poll bodies in terms of `Frame<T>`s. 

1eb822f2e6

---

* refactor(http/retry): `replays_trailers()` uses `Frame<T>`

see https://github.com/linkerd/linkerd2/issues/8733.

this commit upgrades a test that uses defunct `data()` and `trailers()`
futures.

Signed-off-by: katelyn martin <kate@buoyant.io>

* refactor(http/retry): `trailers_only()` uses `Frame<T>`

see https://github.com/linkerd/linkerd2/issues/8733.

this commit upgrades a test that uses defunct `data()` and `trailers()`
futures.

Signed-off-by: katelyn martin <kate@buoyant.io>

* feat(http/retry): `ForwardCompatibleBody::is_end_stream()`

this commit adds a method that exposes the inner `B`-typed body's
`is_end_stream()` trait method, gated for use in tests.

Signed-off-by: katelyn martin <kate@buoyant.io>

* refactor(http/retry): `body_to_string()` helper uses `Frame<T>`

this is a refactoring commit, upgrading more of the replay body test to
work in terms of `Frame<T>`. this updates the `body_to_string()` helper
in particular.

Signed-off-by: katelyn martin <kate@buoyant.io>

* refactor(http/retry): `chunk()` helper uses `Frame<T>`

Signed-off-by: katelyn martin <kate@buoyant.io>

---------

Signed-off-by: katelyn martin <kate@buoyant.io>
This commit is contained in:
katelyn martin 2025-01-27 15:45:31 -05:00 committed by GitHub
parent 1eb822f2e6
commit 32042783d9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 125 additions and 83 deletions

View File

@ -40,6 +40,12 @@ impl<B: Body> ForwardCompatibleBody<B> {
pub(crate) fn frame(&mut self) -> combinators::Frame<'_, B> {
combinators::Frame(self)
}
/// Returns `true` when the end of stream has been reached.
#[cfg(test)]
pub(crate) fn is_end_stream(&self) -> bool {
self.inner.is_end_stream()
}
}
/// Future that resolves to the next frame from a `Body`.

View File

@ -513,11 +513,16 @@ mod tests {
tx.send_data("hello world").await;
drop(tx);
let initial = body_to_string(initial).await;
assert_eq!(initial, "hello world");
let replay = body_to_string(replay).await;
assert_eq!(replay, "hello world");
{
let (data, trailers) = body_to_string(initial).await;
assert_eq!(data, "hello world");
assert_eq!(trailers, None);
}
{
let (data, trailers) = body_to_string(replay).await;
assert_eq!(data, "hello world");
assert_eq!(trailers, None);
}
}
#[tokio::test]
@ -536,11 +541,13 @@ mod tests {
tx.send_data(" of fun!").await;
});
let initial = body_to_string(initial).await;
let (initial, trailers) = body_to_string(initial).await;
assert_eq!(initial, "hello world, have lots of fun!");
assert!(trailers.is_none());
let replay = body_to_string(replay).await;
let (replay, trailers) = body_to_string(replay).await;
assert_eq!(replay, "hello world, have lots of fun!");
assert!(trailers.is_none());
}
#[tokio::test]
@ -654,18 +661,17 @@ mod tests {
tx.send_trailers(HeaderMap::new()).await;
});
assert_eq!(
body_to_string(&mut replay).await,
"hello world, have lots of fun"
);
let (data, trailers) = body_to_string(&mut replay).await;
assert_eq!(data, "hello world, have lots of fun");
assert!(trailers.is_some());
}
#[tokio::test(flavor = "current_thread")]
async fn multiple_replays() {
let Test {
mut tx,
mut initial,
mut replay,
initial,
replay,
_trace,
} = Test::new();
@ -680,27 +686,18 @@ mod tests {
tx.send_trailers(tlrs2).await;
});
assert_eq!(body_to_string(&mut initial).await, "hello world");
let read = |body| async {
let (data, trailers) = body_to_string(body).await;
assert_eq!(data, "hello world");
assert_eq!(trailers.as_ref(), Some(&tlrs));
};
let initial_tlrs = initial.trailers().await.expect("trailers should not error");
assert_eq!(initial_tlrs.as_ref(), Some(&tlrs));
read(initial).await;
// drop the initial body to send the data to the replay
drop(initial);
let mut replay2 = replay.clone();
assert_eq!(body_to_string(&mut replay).await, "hello world");
let replay_tlrs = replay.trailers().await.expect("trailers should not error");
assert_eq!(replay_tlrs.as_ref(), Some(&tlrs));
// drop the initial body to send the data to the replay
drop(replay);
assert_eq!(body_to_string(&mut replay2).await, "hello world");
let replay2_tlrs = replay2.trailers().await.expect("trailers should not error");
assert_eq!(replay2_tlrs.as_ref(), Some(&tlrs));
// Replay the body twice.
let replay2 = replay.clone();
read(replay).await;
read(replay2).await;
}
#[tokio::test(flavor = "current_thread")]
@ -723,7 +720,7 @@ mod tests {
drop(initial);
tracing::info!("dropped initial body");
let mut replay2 = replay.clone();
let replay2 = replay.clone();
tx.send_data(" world").await;
assert_eq!(chunk(&mut replay).await.unwrap(), "hello");
@ -740,21 +737,17 @@ mod tests {
tx.send_trailers(tlrs2).await;
});
assert_eq!(
body_to_string(&mut replay2).await,
"hello world, have lots of fun!"
);
let replay2_tlrs = replay2.trailers().await.expect("trailers should not error");
assert_eq!(replay2_tlrs.as_ref(), Some(&tlrs));
let (data, replay2_trailers) = body_to_string(replay2).await;
assert_eq!(data, "hello world, have lots of fun!");
assert_eq!(replay2_trailers.as_ref(), Some(&tlrs));
}
#[tokio::test(flavor = "current_thread")]
async fn drop_clone_early() {
let Test {
mut tx,
mut initial,
mut replay,
initial,
replay,
_trace,
} = Test::new();
@ -769,21 +762,23 @@ mod tests {
tx.send_trailers(tlrs2).await;
});
assert_eq!(body_to_string(&mut initial).await, "hello world");
{
let body = initial;
let (data, trailers) = body_to_string(body).await;
assert_eq!(data, "hello world");
assert_eq!(trailers.as_ref(), Some(&tlrs));
}
let initial_tlrs = initial.trailers().await.expect("trailers should not error");
assert_eq!(initial_tlrs.as_ref(), Some(&tlrs));
// drop the initial body to send the data to the replay
drop(initial);
// clone the body again and then drop it
// Clone the body, and then drop it before polling it.
let replay2 = replay.clone();
drop(replay2);
assert_eq!(body_to_string(&mut replay).await, "hello world");
let replay_tlrs = replay.trailers().await.expect("trailers should not error");
assert_eq!(replay_tlrs.as_ref(), Some(&tlrs));
{
let body = replay;
let (data, trailers) = body_to_string(body).await;
assert_eq!(data, "hello world");
assert_eq!(trailers.as_ref(), Some(&tlrs));
}
}
// This test is specifically for behavior across clones, so the clippy lint
@ -808,40 +803,58 @@ mod tests {
async fn eos_only_when_fully_replayed() {
// Test that each clone of a body is not EOS until the data has been
// fully replayed.
let mut initial = ReplayBody::try_new(BoxBody::from_static("hello world"), 64 * 1024)
let initial = ReplayBody::try_new(BoxBody::from_static("hello world"), 64 * 1024)
.expect("body must not be too large");
let mut replay = initial.clone();
let replay = initial.clone();
body_to_string(&mut initial).await;
assert!(!replay.is_end_stream());
let mut initial = crate::compat::ForwardCompatibleBody::new(initial);
let mut replay = crate::compat::ForwardCompatibleBody::new(replay);
initial.trailers().await.expect("trailers should not error");
// Read the initial body, show that the replay does not consider itself to have reached the
// end-of-stream. Then drop the initial body, show that the replay is still not done.
assert!(!initial.is_end_stream());
initial
.frame()
.await
.expect("yields a result")
.expect("yields a frame")
.into_data()
.expect("yields a data frame");
assert!(initial.is_end_stream());
assert!(!replay.is_end_stream());
// drop the initial body to send the data to the replay
drop(initial);
assert!(!replay.is_end_stream());
body_to_string(&mut replay).await;
// Read the replay body.
assert!(!replay.is_end_stream());
replay.trailers().await.expect("trailers should not error");
replay
.frame()
.await
.expect("yields a result")
.expect("yields a frame")
.into_data()
.expect("yields a data frame");
assert!(replay.frame().await.is_none());
assert!(replay.is_end_stream());
// Even if we clone a body _after_ it has been driven to EOS, the clone
// must not be EOS.
let mut replay2 = replay.clone();
// Even if we clone a body _after_ it has been driven to EOS, the clone must not be EOS.
let replay = replay.into_inner();
let replay2 = replay.clone();
assert!(!replay2.is_end_stream());
// drop the initial body to send the data to the replay
// Drop the first replay body to send the data to the second replay.
drop(replay);
body_to_string(&mut replay2).await;
assert!(!replay2.is_end_stream());
replay2.trailers().await.expect("trailers should not error");
// Read the second replay body.
let mut replay2 = crate::compat::ForwardCompatibleBody::new(replay2);
replay2
.frame()
.await
.expect("yields a result")
.expect("yields a frame")
.into_data()
.expect("yields a data frame");
assert!(replay2.frame().await.is_none());
assert!(replay2.is_end_stream());
}
@ -1003,26 +1016,49 @@ mod tests {
T: http_body::Body + Unpin,
{
tracing::trace!("waiting for a body chunk...");
let chunk = body
.data()
let chunk = crate::compat::ForwardCompatibleBody::new(body)
.frame()
.await
.map(|res| res.map_err(|_| ()).unwrap())
.expect("yields a result")
.ok()
.expect("yields a frame")
.into_data()
.ok()
.map(string);
tracing::info!(?chunk);
chunk
}
async fn body_to_string<T>(mut body: T) -> String
async fn body_to_string<B>(body: B) -> (String, Option<HeaderMap>)
where
T: http_body::Body + Unpin,
T::Error: std::fmt::Debug,
B: http_body::Body + Unpin,
B::Error: std::fmt::Debug,
{
let mut s = String::new();
while let Some(chunk) = chunk(&mut body).await {
s.push_str(&chunk[..]);
let mut body = crate::compat::ForwardCompatibleBody::new(body);
let mut data = String::new();
let mut trailers = None;
// Continue reading frames from the body until it is finished.
while let Some(frame) = body
.frame()
.await
.transpose()
.expect("reading a frame succeeds")
{
match frame.into_data().map(string) {
Ok(ref s) => data.push_str(s),
Err(frame) => {
let trls = frame
.into_trailers()
.map_err(drop)
.expect("test frame is either data or trailers");
trailers = Some(trls);
}
}
}
tracing::info!(body = ?s, "no more data");
s
tracing::info!(?data, ?trailers, "finished reading body");
(data, trailers)
}
fn string(mut data: impl Buf) -> String {