refactor(http/retry): port remaining `ReplayBody` tests to `Frame<T>` (#3567)
based on #3564. see linkerd/linkerd2#8733.
this branch upgrades the remaining parts of the `ReplayBody<B>` test
suite to poll bodies in terms of `Frame<T>`s.
1eb822f2e6
---
* refactor(http/retry): `replays_trailers()` uses `Frame<T>`
see https://github.com/linkerd/linkerd2/issues/8733.
this commit upgrades a test that uses defunct `data()` and `trailers()`
futures.
Signed-off-by: katelyn martin <kate@buoyant.io>
* refactor(http/retry): `trailers_only()` uses `Frame<T>`
see https://github.com/linkerd/linkerd2/issues/8733.
this commit upgrades a test that uses defunct `data()` and `trailers()`
futures.
Signed-off-by: katelyn martin <kate@buoyant.io>
* feat(http/retry): `ForwardCompatibleBody::is_end_stream()`
this commit adds a method that exposes the inner `B`-typed body's
`is_end_stream()` trait method, gated for use in tests.
Signed-off-by: katelyn martin <kate@buoyant.io>
* refactor(http/retry): `body_to_string()` helper uses `Frame<T>`
this is a refactoring commit, upgrading more of the replay body test to
work in terms of `Frame<T>`. this updates the `body_to_string()` helper
in particular.
Signed-off-by: katelyn martin <kate@buoyant.io>
* refactor(http/retry): `chunk()` helper uses `Frame<T>`
Signed-off-by: katelyn martin <kate@buoyant.io>
---------
Signed-off-by: katelyn martin <kate@buoyant.io>
This commit is contained in:
parent
1eb822f2e6
commit
32042783d9
|
|
@ -40,6 +40,12 @@ impl<B: Body> ForwardCompatibleBody<B> {
|
|||
pub(crate) fn frame(&mut self) -> combinators::Frame<'_, B> {
|
||||
combinators::Frame(self)
|
||||
}
|
||||
|
||||
/// Returns `true` when the end of stream has been reached.
|
||||
#[cfg(test)]
|
||||
pub(crate) fn is_end_stream(&self) -> bool {
|
||||
self.inner.is_end_stream()
|
||||
}
|
||||
}
|
||||
|
||||
/// Future that resolves to the next frame from a `Body`.
|
||||
|
|
|
|||
|
|
@ -513,11 +513,16 @@ mod tests {
|
|||
tx.send_data("hello world").await;
|
||||
drop(tx);
|
||||
|
||||
let initial = body_to_string(initial).await;
|
||||
assert_eq!(initial, "hello world");
|
||||
|
||||
let replay = body_to_string(replay).await;
|
||||
assert_eq!(replay, "hello world");
|
||||
{
|
||||
let (data, trailers) = body_to_string(initial).await;
|
||||
assert_eq!(data, "hello world");
|
||||
assert_eq!(trailers, None);
|
||||
}
|
||||
{
|
||||
let (data, trailers) = body_to_string(replay).await;
|
||||
assert_eq!(data, "hello world");
|
||||
assert_eq!(trailers, None);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
@ -536,11 +541,13 @@ mod tests {
|
|||
tx.send_data(" of fun!").await;
|
||||
});
|
||||
|
||||
let initial = body_to_string(initial).await;
|
||||
let (initial, trailers) = body_to_string(initial).await;
|
||||
assert_eq!(initial, "hello world, have lots of fun!");
|
||||
assert!(trailers.is_none());
|
||||
|
||||
let replay = body_to_string(replay).await;
|
||||
let (replay, trailers) = body_to_string(replay).await;
|
||||
assert_eq!(replay, "hello world, have lots of fun!");
|
||||
assert!(trailers.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
@ -654,18 +661,17 @@ mod tests {
|
|||
tx.send_trailers(HeaderMap::new()).await;
|
||||
});
|
||||
|
||||
assert_eq!(
|
||||
body_to_string(&mut replay).await,
|
||||
"hello world, have lots of fun"
|
||||
);
|
||||
let (data, trailers) = body_to_string(&mut replay).await;
|
||||
assert_eq!(data, "hello world, have lots of fun");
|
||||
assert!(trailers.is_some());
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn multiple_replays() {
|
||||
let Test {
|
||||
mut tx,
|
||||
mut initial,
|
||||
mut replay,
|
||||
initial,
|
||||
replay,
|
||||
_trace,
|
||||
} = Test::new();
|
||||
|
||||
|
|
@ -680,27 +686,18 @@ mod tests {
|
|||
tx.send_trailers(tlrs2).await;
|
||||
});
|
||||
|
||||
assert_eq!(body_to_string(&mut initial).await, "hello world");
|
||||
let read = |body| async {
|
||||
let (data, trailers) = body_to_string(body).await;
|
||||
assert_eq!(data, "hello world");
|
||||
assert_eq!(trailers.as_ref(), Some(&tlrs));
|
||||
};
|
||||
|
||||
let initial_tlrs = initial.trailers().await.expect("trailers should not error");
|
||||
assert_eq!(initial_tlrs.as_ref(), Some(&tlrs));
|
||||
read(initial).await;
|
||||
|
||||
// drop the initial body to send the data to the replay
|
||||
drop(initial);
|
||||
|
||||
let mut replay2 = replay.clone();
|
||||
assert_eq!(body_to_string(&mut replay).await, "hello world");
|
||||
|
||||
let replay_tlrs = replay.trailers().await.expect("trailers should not error");
|
||||
assert_eq!(replay_tlrs.as_ref(), Some(&tlrs));
|
||||
|
||||
// drop the initial body to send the data to the replay
|
||||
drop(replay);
|
||||
|
||||
assert_eq!(body_to_string(&mut replay2).await, "hello world");
|
||||
|
||||
let replay2_tlrs = replay2.trailers().await.expect("trailers should not error");
|
||||
assert_eq!(replay2_tlrs.as_ref(), Some(&tlrs));
|
||||
// Replay the body twice.
|
||||
let replay2 = replay.clone();
|
||||
read(replay).await;
|
||||
read(replay2).await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
|
|
@ -723,7 +720,7 @@ mod tests {
|
|||
drop(initial);
|
||||
tracing::info!("dropped initial body");
|
||||
|
||||
let mut replay2 = replay.clone();
|
||||
let replay2 = replay.clone();
|
||||
|
||||
tx.send_data(" world").await;
|
||||
assert_eq!(chunk(&mut replay).await.unwrap(), "hello");
|
||||
|
|
@ -740,21 +737,17 @@ mod tests {
|
|||
tx.send_trailers(tlrs2).await;
|
||||
});
|
||||
|
||||
assert_eq!(
|
||||
body_to_string(&mut replay2).await,
|
||||
"hello world, have lots of fun!"
|
||||
);
|
||||
|
||||
let replay2_tlrs = replay2.trailers().await.expect("trailers should not error");
|
||||
assert_eq!(replay2_tlrs.as_ref(), Some(&tlrs));
|
||||
let (data, replay2_trailers) = body_to_string(replay2).await;
|
||||
assert_eq!(data, "hello world, have lots of fun!");
|
||||
assert_eq!(replay2_trailers.as_ref(), Some(&tlrs));
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn drop_clone_early() {
|
||||
let Test {
|
||||
mut tx,
|
||||
mut initial,
|
||||
mut replay,
|
||||
initial,
|
||||
replay,
|
||||
_trace,
|
||||
} = Test::new();
|
||||
|
||||
|
|
@ -769,21 +762,23 @@ mod tests {
|
|||
tx.send_trailers(tlrs2).await;
|
||||
});
|
||||
|
||||
assert_eq!(body_to_string(&mut initial).await, "hello world");
|
||||
{
|
||||
let body = initial;
|
||||
let (data, trailers) = body_to_string(body).await;
|
||||
assert_eq!(data, "hello world");
|
||||
assert_eq!(trailers.as_ref(), Some(&tlrs));
|
||||
}
|
||||
|
||||
let initial_tlrs = initial.trailers().await.expect("trailers should not error");
|
||||
assert_eq!(initial_tlrs.as_ref(), Some(&tlrs));
|
||||
|
||||
// drop the initial body to send the data to the replay
|
||||
drop(initial);
|
||||
|
||||
// clone the body again and then drop it
|
||||
// Clone the body, and then drop it before polling it.
|
||||
let replay2 = replay.clone();
|
||||
drop(replay2);
|
||||
|
||||
assert_eq!(body_to_string(&mut replay).await, "hello world");
|
||||
let replay_tlrs = replay.trailers().await.expect("trailers should not error");
|
||||
assert_eq!(replay_tlrs.as_ref(), Some(&tlrs));
|
||||
{
|
||||
let body = replay;
|
||||
let (data, trailers) = body_to_string(body).await;
|
||||
assert_eq!(data, "hello world");
|
||||
assert_eq!(trailers.as_ref(), Some(&tlrs));
|
||||
}
|
||||
}
|
||||
|
||||
// This test is specifically for behavior across clones, so the clippy lint
|
||||
|
|
@ -808,40 +803,58 @@ mod tests {
|
|||
async fn eos_only_when_fully_replayed() {
|
||||
// Test that each clone of a body is not EOS until the data has been
|
||||
// fully replayed.
|
||||
let mut initial = ReplayBody::try_new(BoxBody::from_static("hello world"), 64 * 1024)
|
||||
let initial = ReplayBody::try_new(BoxBody::from_static("hello world"), 64 * 1024)
|
||||
.expect("body must not be too large");
|
||||
let mut replay = initial.clone();
|
||||
let replay = initial.clone();
|
||||
|
||||
body_to_string(&mut initial).await;
|
||||
assert!(!replay.is_end_stream());
|
||||
let mut initial = crate::compat::ForwardCompatibleBody::new(initial);
|
||||
let mut replay = crate::compat::ForwardCompatibleBody::new(replay);
|
||||
|
||||
initial.trailers().await.expect("trailers should not error");
|
||||
// Read the initial body, show that the replay does not consider itself to have reached the
|
||||
// end-of-stream. Then drop the initial body, show that the replay is still not done.
|
||||
assert!(!initial.is_end_stream());
|
||||
initial
|
||||
.frame()
|
||||
.await
|
||||
.expect("yields a result")
|
||||
.expect("yields a frame")
|
||||
.into_data()
|
||||
.expect("yields a data frame");
|
||||
assert!(initial.is_end_stream());
|
||||
assert!(!replay.is_end_stream());
|
||||
|
||||
// drop the initial body to send the data to the replay
|
||||
drop(initial);
|
||||
|
||||
assert!(!replay.is_end_stream());
|
||||
|
||||
body_to_string(&mut replay).await;
|
||||
// Read the replay body.
|
||||
assert!(!replay.is_end_stream());
|
||||
|
||||
replay.trailers().await.expect("trailers should not error");
|
||||
replay
|
||||
.frame()
|
||||
.await
|
||||
.expect("yields a result")
|
||||
.expect("yields a frame")
|
||||
.into_data()
|
||||
.expect("yields a data frame");
|
||||
assert!(replay.frame().await.is_none());
|
||||
assert!(replay.is_end_stream());
|
||||
|
||||
// Even if we clone a body _after_ it has been driven to EOS, the clone
|
||||
// must not be EOS.
|
||||
let mut replay2 = replay.clone();
|
||||
// Even if we clone a body _after_ it has been driven to EOS, the clone must not be EOS.
|
||||
let replay = replay.into_inner();
|
||||
let replay2 = replay.clone();
|
||||
assert!(!replay2.is_end_stream());
|
||||
|
||||
// drop the initial body to send the data to the replay
|
||||
// Drop the first replay body to send the data to the second replay.
|
||||
drop(replay);
|
||||
|
||||
body_to_string(&mut replay2).await;
|
||||
assert!(!replay2.is_end_stream());
|
||||
|
||||
replay2.trailers().await.expect("trailers should not error");
|
||||
// Read the second replay body.
|
||||
let mut replay2 = crate::compat::ForwardCompatibleBody::new(replay2);
|
||||
replay2
|
||||
.frame()
|
||||
.await
|
||||
.expect("yields a result")
|
||||
.expect("yields a frame")
|
||||
.into_data()
|
||||
.expect("yields a data frame");
|
||||
assert!(replay2.frame().await.is_none());
|
||||
assert!(replay2.is_end_stream());
|
||||
}
|
||||
|
||||
|
|
@ -1003,26 +1016,49 @@ mod tests {
|
|||
T: http_body::Body + Unpin,
|
||||
{
|
||||
tracing::trace!("waiting for a body chunk...");
|
||||
let chunk = body
|
||||
.data()
|
||||
let chunk = crate::compat::ForwardCompatibleBody::new(body)
|
||||
.frame()
|
||||
.await
|
||||
.map(|res| res.map_err(|_| ()).unwrap())
|
||||
.expect("yields a result")
|
||||
.ok()
|
||||
.expect("yields a frame")
|
||||
.into_data()
|
||||
.ok()
|
||||
.map(string);
|
||||
tracing::info!(?chunk);
|
||||
chunk
|
||||
}
|
||||
|
||||
async fn body_to_string<T>(mut body: T) -> String
|
||||
async fn body_to_string<B>(body: B) -> (String, Option<HeaderMap>)
|
||||
where
|
||||
T: http_body::Body + Unpin,
|
||||
T::Error: std::fmt::Debug,
|
||||
B: http_body::Body + Unpin,
|
||||
B::Error: std::fmt::Debug,
|
||||
{
|
||||
let mut s = String::new();
|
||||
while let Some(chunk) = chunk(&mut body).await {
|
||||
s.push_str(&chunk[..]);
|
||||
let mut body = crate::compat::ForwardCompatibleBody::new(body);
|
||||
let mut data = String::new();
|
||||
let mut trailers = None;
|
||||
|
||||
// Continue reading frames from the body until it is finished.
|
||||
while let Some(frame) = body
|
||||
.frame()
|
||||
.await
|
||||
.transpose()
|
||||
.expect("reading a frame succeeds")
|
||||
{
|
||||
match frame.into_data().map(string) {
|
||||
Ok(ref s) => data.push_str(s),
|
||||
Err(frame) => {
|
||||
let trls = frame
|
||||
.into_trailers()
|
||||
.map_err(drop)
|
||||
.expect("test frame is either data or trailers");
|
||||
trailers = Some(trls);
|
||||
}
|
||||
}
|
||||
}
|
||||
tracing::info!(body = ?s, "no more data");
|
||||
s
|
||||
|
||||
tracing::info!(?data, ?trailers, "finished reading body");
|
||||
(data, trailers)
|
||||
}
|
||||
|
||||
fn string(mut data: impl Buf) -> String {
|
||||
|
|
|
|||
Loading…
Reference in New Issue