refactor(app): update deprecated hyper body calls (#3411)
* chore(app/admin): add `http-body` dependency before we address deprecated hyper interfaces related to `http_bodies`, we'll want to add this dependency so that we can call `Body::collect()`. Signed-off-by: katelyn martin <kate@buoyant.io> * refactor(app): update deprecated hyper body calls hyper 0.14.x provided a collection of interfaces related to collecting and aggregating request and response bodies, which were deprecated and removed in the 1.x major release. this commit updates calls to `hyper::body::to_bytes(..)` and `hyper::body::aggregate(..)`. for now, `http_body::Body` is used, but we can use `http_body_util::BodyExt` once we've bumped our hyper dependency to the 1.x major release. for more information, see: * https://github.com/linkerd/linkerd2/issues/8733 * https://github.com/hyperium/hyper/issues/2840 * https://github.com/hyperium/hyper/pull/3020 Signed-off-by: katelyn martin <kate@buoyant.io> --------- Signed-off-by: katelyn martin <kate@buoyant.io>
This commit is contained in:
parent
a4113a0d01
commit
36474b3e20
|
|
@ -1326,6 +1326,7 @@ dependencies = [
|
||||||
"deflate",
|
"deflate",
|
||||||
"futures",
|
"futures",
|
||||||
"http",
|
"http",
|
||||||
|
"http-body",
|
||||||
"hyper",
|
"hyper",
|
||||||
"linkerd-app-core",
|
"linkerd-app-core",
|
||||||
"linkerd-app-inbound",
|
"linkerd-app-inbound",
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,7 @@ log-streaming = ["linkerd-tracing/stream"]
|
||||||
[dependencies]
|
[dependencies]
|
||||||
deflate = { version = "1", optional = true, features = ["gzip"] }
|
deflate = { version = "1", optional = true, features = ["gzip"] }
|
||||||
http = "0.2"
|
http = "0.2"
|
||||||
|
http-body = "0.4"
|
||||||
hyper = { version = "0.14", features = ["deprecated", "http1", "http2"] }
|
hyper = { version = "0.14", features = ["deprecated", "http1", "http2"] }
|
||||||
futures = { version = "0.3", default-features = false }
|
futures = { version = "0.3", default-features = false }
|
||||||
pprof = { version = "0.14", optional = true, features = ["prost-codec"] }
|
pprof = { version = "0.14", optional = true, features = ["prost-codec"] }
|
||||||
|
|
|
||||||
|
|
@ -21,10 +21,12 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
http::Method::PUT => {
|
http::Method::PUT => {
|
||||||
#[allow(deprecated)] // linkerd/linkerd2#8733
|
let body = req
|
||||||
let body = hyper::body::aggregate(req.into_body())
|
.into_body()
|
||||||
|
.collect()
|
||||||
.await
|
.await
|
||||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
|
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?
|
||||||
|
.aggregate();
|
||||||
match level.set_from(body.chunk()) {
|
match level.set_from(body.chunk()) {
|
||||||
Ok(_) => mk_rsp(StatusCode::NO_CONTENT, Body::empty()),
|
Ok(_) => mk_rsp(StatusCode::NO_CONTENT, Body::empty()),
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
|
|
|
||||||
|
|
@ -52,11 +52,11 @@ where
|
||||||
// If the request is a QUERY, use the request body
|
// If the request is a QUERY, use the request body
|
||||||
method if method.as_str() == "QUERY" => {
|
method if method.as_str() == "QUERY" => {
|
||||||
// TODO(eliza): validate that the request has a content-length...
|
// TODO(eliza): validate that the request has a content-length...
|
||||||
#[allow(deprecated)] // linkerd/linkerd2#8733
|
|
||||||
let body = recover!(
|
let body = recover!(
|
||||||
hyper::body::aggregate(req.into_body())
|
http_body::Body::collect(req.into_body())
|
||||||
.await
|
.await
|
||||||
.map_err(Into::into),
|
.map_err(Into::into)
|
||||||
|
.map(http_body::Collected::aggregate),
|
||||||
"Reading log stream request body",
|
"Reading log stream request body",
|
||||||
StatusCode::BAD_REQUEST
|
StatusCode::BAD_REQUEST
|
||||||
);
|
);
|
||||||
|
|
|
||||||
|
|
@ -209,8 +209,7 @@ where
|
||||||
// just can't prove it.
|
// just can't prove it.
|
||||||
let req = futures::executor::block_on(async move {
|
let req = futures::executor::block_on(async move {
|
||||||
let (parts, body) = req.into_parts();
|
let (parts, body) = req.into_parts();
|
||||||
#[allow(deprecated)] // linkerd/linkerd2#8733
|
let body = match body.collect().await.map(http_body::Collected::to_bytes) {
|
||||||
let body = match hyper::body::to_bytes(body).await {
|
|
||||||
Ok(body) => body,
|
Ok(body) => body,
|
||||||
Err(_) => unreachable!("body should not fail"),
|
Err(_) => unreachable!("body should not fail"),
|
||||||
};
|
};
|
||||||
|
|
|
||||||
|
|
@ -482,14 +482,15 @@ mod http2 {
|
||||||
let res = fut.await.expect("beta response");
|
let res = fut.await.expect("beta response");
|
||||||
assert_eq!(res.status(), http::StatusCode::OK);
|
assert_eq!(res.status(), http::StatusCode::OK);
|
||||||
|
|
||||||
#[allow(deprecated)] // linkerd/linkerd2#8733
|
let body = {
|
||||||
let body = String::from_utf8(
|
let body = res.into_body();
|
||||||
hyper::body::to_bytes(res.into_body())
|
let body = http_body::Body::collect(body)
|
||||||
.await
|
.await
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.to_vec(),
|
.to_bytes()
|
||||||
)
|
.to_vec();
|
||||||
.unwrap();
|
String::from_utf8(body).unwrap()
|
||||||
|
};
|
||||||
assert_eq!(body, "beta");
|
assert_eq!(body, "beta");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -129,9 +129,11 @@ impl TestBuilder {
|
||||||
async move {
|
async move {
|
||||||
// Read the entire body before responding, so that the
|
// Read the entire body before responding, so that the
|
||||||
// client doesn't fail when writing it out.
|
// client doesn't fail when writing it out.
|
||||||
#[allow(deprecated)] // linkerd/linkerd2#8733
|
let body = http_body::Body::collect(req.into_body())
|
||||||
let _body = hyper::body::to_bytes(req.into_body()).await;
|
.await
|
||||||
tracing::debug!(body = ?_body.as_ref().map(|body| body.len()), "recieved body");
|
.map(http_body::Collected::to_bytes);
|
||||||
|
let bytes = body.as_ref().map(Bytes::len);
|
||||||
|
tracing::debug!(?bytes, "recieved body");
|
||||||
Ok::<_, Error>(if fail {
|
Ok::<_, Error>(if fail {
|
||||||
Response::builder().status(533).body("nope".into()).unwrap()
|
Response::builder().status(533).body("nope".into()).unwrap()
|
||||||
} else {
|
} else {
|
||||||
|
|
|
||||||
|
|
@ -48,14 +48,13 @@ async fn h2_exercise_goaways_connections() {
|
||||||
|
|
||||||
let bodies = resps
|
let bodies = resps
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(
|
.map(Response::into_body)
|
||||||
#[allow(deprecated)] // linkerd/linkerd2#8733
|
.map(|body| {
|
||||||
|resp| {
|
http_body::Body::collect(body)
|
||||||
hyper::body::aggregate(resp.into_body())
|
.map_ok(http_body::Collected::aggregate)
|
||||||
// Make sure the bodies weren't cut off
|
// Make sure the bodies weren't cut off
|
||||||
.map_ok(|buf| assert_eq!(buf.remaining(), RESPONSE_SIZE))
|
.map_ok(|buf| assert_eq!(buf.remaining(), RESPONSE_SIZE))
|
||||||
},
|
})
|
||||||
)
|
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
// See that the proxy gives us all the bodies.
|
// See that the proxy gives us all the bodies.
|
||||||
|
|
|
||||||
|
|
@ -253,8 +253,12 @@ async fn grpc_headers_end() {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert_eq!(res.status(), 200);
|
assert_eq!(res.status(), 200);
|
||||||
assert_eq!(res.headers()["grpc-status"], "1");
|
assert_eq!(res.headers()["grpc-status"], "1");
|
||||||
#[allow(deprecated)] // linkerd/linkerd2#8733
|
let body = res.into_body();
|
||||||
let bytes = hyper::body::to_bytes(res.into_body()).await.unwrap().len();
|
let bytes = http_body::Body::collect(body)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.to_bytes()
|
||||||
|
.len();
|
||||||
assert_eq!(bytes, 0);
|
assert_eq!(bytes, 0);
|
||||||
|
|
||||||
let event = events.skip(2).next().await.expect("2nd").expect("stream");
|
let event = events.skip(2).next().await.expect("2nd").expect("stream");
|
||||||
|
|
|
||||||
|
|
@ -1304,10 +1304,13 @@ async fn metrics_compression() {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(deprecated)] // linkerd/linkerd2#8733
|
let mut body = {
|
||||||
let mut body = hyper::body::aggregate(resp.into_body())
|
let body = resp.into_body();
|
||||||
.await
|
http_body::Body::collect(body)
|
||||||
.expect("response body concat");
|
.await
|
||||||
|
.expect("response body concat")
|
||||||
|
.aggregate()
|
||||||
|
};
|
||||||
let mut decoder = flate2::read::GzDecoder::new(std::io::Cursor::new(
|
let mut decoder = flate2::read::GzDecoder::new(std::io::Cursor::new(
|
||||||
body.copy_to_bytes(body.remaining()),
|
body.copy_to_bytes(body.remaining()),
|
||||||
));
|
));
|
||||||
|
|
|
||||||
|
|
@ -144,10 +144,12 @@ async fn assert_rsp<T: std::fmt::Debug>(
|
||||||
{
|
{
|
||||||
let rsp = rsp.await.expect("response must not fail");
|
let rsp = rsp.await.expect("response must not fail");
|
||||||
assert_eq!(rsp.status(), status, "expected status code to be {status}");
|
assert_eq!(rsp.status(), status, "expected status code to be {status}");
|
||||||
#[allow(deprecated)] // linkerd/linkerd2#8733
|
let body = rsp
|
||||||
let body = hyper::body::to_bytes(rsp.into_body())
|
.into_body()
|
||||||
|
.collect()
|
||||||
.await
|
.await
|
||||||
.expect("body must not fail");
|
.expect("body must not fail")
|
||||||
|
.to_bytes();
|
||||||
assert_eq!(body, expected_body, "expected body to be {expected_body:?}");
|
assert_eq!(body, expected_body, "expected body to be {expected_body:?}");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -121,9 +121,10 @@ where
|
||||||
T: HttpBody,
|
T: HttpBody,
|
||||||
T::Error: Into<Error>,
|
T::Error: Into<Error>,
|
||||||
{
|
{
|
||||||
#[allow(deprecated)] // linkerd/linkerd2#8733
|
let body = body
|
||||||
let body = hyper::body::to_bytes(body)
|
.collect()
|
||||||
.await
|
.await
|
||||||
|
.map(http_body::Collected::to_bytes)
|
||||||
.map_err(ContextError::ctx("HTTP response body stream failed"))?;
|
.map_err(ContextError::ctx("HTTP response body stream failed"))?;
|
||||||
let body = std::str::from_utf8(&body[..])
|
let body = std::str::from_utf8(&body[..])
|
||||||
.map_err(ContextError::ctx("converting body to string failed"))?
|
.map_err(ContextError::ctx("converting body to string failed"))?
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue