chore(http/upgrade): replace `hyper::Body` with `BoxBody` (#3479)

* chore(http/upgrade): replace `hyper::Body` with `BoxBody`

`hyper::Body` is removed in the 1.0 version.

this commit removes it from our upgrade facilities, using a generic body
parameter that defaults to BoxBody.

see <https://github.com/linkerd/linkerd2/issues/8733>.

Signed-off-by: katelyn martin <kate@buoyant.io>

* review(http/upgrade): remove frivolous `Unpin` bound

https://github.com/linkerd/linkerd2-proxy/pull/3479/files#r1894068885

in `main` this isn't currently pinned, so this was needed to add the `B`
parameter originally in development, but tweaking how we poll the body
(_see lines 70-80, below_) means this bound is indeed frivolous now.

this commit removes an extraneous `Unpin` bound.

Co-authored-by: Scott Fleener <scott@buoyant.io>
Signed-off-by: katelyn martin <kate@buoyant.io>

---------

Signed-off-by: katelyn martin <kate@buoyant.io>
Co-authored-by: Scott Fleener <scott@buoyant.io>
This commit is contained in:
katelyn martin 2024-12-20 11:53:07 -05:00 committed by GitHub
parent 81b43e5633
commit fcfde84a39
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 49 additions and 43 deletions

View File

@ -1859,6 +1859,7 @@ dependencies = [
"hyper",
"linkerd-duplex",
"linkerd-error",
"linkerd-http-box",
"linkerd-http-version",
"linkerd-io",
"linkerd-stack",

View File

@ -24,6 +24,7 @@ try-lock = "0.2"
linkerd-duplex = { path = "../../duplex" }
linkerd-error = { path = "../../error" }
linkerd-http-box = { path = "../box" }
linkerd-http-version = { path = "../version" }
linkerd-io = { path = "../../io" }
linkerd-stack = { path = "../../stack" }

View File

@ -1,9 +1,9 @@
use crate::upgrade::Http11Upgrade;
use bytes::Bytes;
use futures::TryFuture;
use futures::{ready, TryFuture};
use http_body::Body;
use hyper::client::connect as hyper_connect;
use linkerd_error::{Error, Result};
use linkerd_http_box::BoxBody;
use linkerd_io::{self as io, AsyncRead, AsyncWrite};
use linkerd_stack::{MakeConnection, Service};
use pin_project::{pin_project, pinned_drop};
@ -17,10 +17,11 @@ use tracing::debug;
/// Provides optional HTTP/1.1 upgrade support on the body.
#[pin_project(PinnedDrop)]
#[derive(Debug)]
pub struct UpgradeBody {
pub struct UpgradeBody<B = BoxBody> {
/// In UpgradeBody::drop, if this was an HTTP upgrade, the body is taken
/// to be inserted into the Http11Upgrade half.
body: hyper::Body,
#[pin]
body: B,
pub(super) upgrade: Option<(Http11Upgrade, hyper::upgrade::OnUpgrade)>,
}
@ -50,9 +51,13 @@ pub struct HyperConnectFuture<F> {
// === impl UpgradeBody ===
impl Body for UpgradeBody {
type Data = Bytes;
type Error = hyper::Error;
impl<B> Body for UpgradeBody<B>
where
B: Body,
B::Error: std::fmt::Display,
{
type Data = B::Data;
type Error = B::Error;
fn is_end_stream(&self) -> bool {
self.body.is_end_stream()
@ -62,28 +67,34 @@ impl Body for UpgradeBody {
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let body = self.project().body;
let poll = futures::ready!(Pin::new(body) // `hyper::Body` is Unpin
.poll_data(cx));
Poll::Ready(poll.map(|x| {
x.map_err(|e| {
debug!("http body error: {}", e);
e
})
}))
// Poll the next chunk from the body.
let this = self.project();
let body = this.body;
let data = ready!(body.poll_data(cx));
// Log errors.
if let Some(Err(e)) = &data {
debug!("http body error: {}", e);
}
Poll::Ready(data)
}
fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
let body = self.project().body;
Pin::new(body) // `hyper::Body` is Unpin
.poll_trailers(cx)
.map_err(|e| {
debug!("http trailers error: {}", e);
e
})
// Poll the trailers from the body.
let this = self.project();
let body = this.body;
let trailers = ready!(body.poll_trailers(cx));
// Log errors.
if let Err(e) = &trailers {
debug!("http trailers error: {}", e);
}
Poll::Ready(trailers)
}
#[inline]
@ -92,32 +103,23 @@ impl Body for UpgradeBody {
}
}
impl Default for UpgradeBody {
impl<B: Default> Default for UpgradeBody<B> {
fn default() -> Self {
hyper::Body::empty().into()
}
}
impl From<hyper::Body> for UpgradeBody {
fn from(body: hyper::Body) -> Self {
Self {
body,
body: B::default(),
upgrade: None,
}
}
}
impl UpgradeBody {
pub fn new(
body: hyper::Body,
upgrade: Option<(Http11Upgrade, hyper::upgrade::OnUpgrade)>,
) -> Self {
impl<B> UpgradeBody<B> {
pub fn new(body: B, upgrade: Option<(Http11Upgrade, hyper::upgrade::OnUpgrade)>) -> Self {
Self { body, upgrade }
}
}
#[pinned_drop]
impl PinnedDrop for UpgradeBody {
impl<B> PinnedDrop for UpgradeBody<B> {
fn drop(self: Pin<&mut Self>) {
let this = self.project();
// If an HTTP/1 upgrade was wanted, send the upgrade future.
@ -164,6 +166,8 @@ where
}
}
// === impl HyperConnectFuture ===
impl<F, I, M> Future for HyperConnectFuture<F>
where
F: TryFuture<Ok = (I, M)> + 'static,
@ -181,7 +185,7 @@ where
}
}
// === impl Connected ===
// === impl Connection ===
impl<C> AsyncRead for Connection<C>
where

View File

@ -174,20 +174,20 @@ impl<S> Service<S> {
type ResponseFuture<F, B, E> = Either<F, future::Ready<Result<http::Response<B>, E>>>;
impl<S, B> tower::Service<http::Request<hyper::Body>> for Service<S>
impl<S, ReqB, RespB> tower::Service<http::Request<ReqB>> for Service<S>
where
S: tower::Service<http::Request<UpgradeBody>, Response = http::Response<B>>,
B: Default,
S: tower::Service<http::Request<UpgradeBody<ReqB>>, Response = http::Response<RespB>>,
RespB: Default,
{
type Response = S::Response;
type Error = S::Error;
type Future = ResponseFuture<S::Future, B, S::Error>;
type Future = ResponseFuture<S::Future, RespB, S::Error>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}
fn call(&mut self, mut req: http::Request<hyper::Body>) -> Self::Future {
fn call(&mut self, mut req: http::Request<ReqB>) -> Self::Future {
// Should this rejection happen later in the Service stack?
//
// Rejecting here means telemetry doesn't record anything about it...