chore(proxy/http): upgrade to hyper 1.x

Signed-off-by: katelyn martin <kate@buoyant.io>
This commit is contained in:
katelyn martin 2025-02-14 00:00:00 +00:00
parent afb351fd14
commit d12de6d964
8 changed files with 92 additions and 55 deletions

View File

@ -2328,9 +2328,11 @@ dependencies = [
"h2", "h2",
"http 1.2.0", "http 1.2.0",
"http-body", "http-body",
"http-body-util",
"httparse", "httparse",
"hyper", "hyper",
"hyper-balance", "hyper-balance",
"hyper-util",
"linkerd-detect", "linkerd-detect",
"linkerd-duplex", "linkerd-duplex",
"linkerd-error", "linkerd-error",

View File

@ -27,6 +27,12 @@ hyper = { workspace = true, features = [
"server", "server",
] } ] }
hyper-balance = { path = "../../../hyper-balance" } hyper-balance = { path = "../../../hyper-balance" }
hyper-util = { workspace = true, default-features = false, features = [
"client",
"client-legacy",
"http1",
"service",
] }
parking_lot = "0.12" parking_lot = "0.12"
pin-project = "1" pin-project = "1"
rand = "0.9" rand = "0.9"
@ -54,6 +60,7 @@ linkerd-proxy-balance = { path = "../balance" }
linkerd-stack = { path = "../../stack" } linkerd-stack = { path = "../../stack" }
[dev-dependencies] [dev-dependencies]
http-body-util = { workspace = true, features = ["channel"] }
tokio-test = "0.4" tokio-test = "0.4"
tower-test = "0.4" tower-test = "0.4"
linkerd-tracing = { path = "../../tracing", features = ["ansi"] } linkerd-tracing = { path = "../../tracing", features = ["ansi"] }

View File

@ -63,7 +63,7 @@ where
C::Connection: Unpin + Send, C::Connection: Unpin + Send,
C::Metadata: Send, C::Metadata: Send,
C::Future: Unpin + Send + 'static, C::Future: Unpin + Send + 'static,
B: crate::Body + Send + 'static, B: crate::Body + Send + Unpin + 'static,
B::Data: Send, B::Data: Send,
B::Error: Into<Error> + Send + Sync, B::Error: Into<Error> + Send + Sync,
{ {
@ -123,7 +123,7 @@ where
C::Connection: Unpin + Send, C::Connection: Unpin + Send,
C::Future: Unpin + Send + 'static, C::Future: Unpin + Send + 'static,
C::Error: Into<Error>, C::Error: Into<Error>,
B: crate::Body + Send + 'static, B: crate::Body + Send + Unpin + 'static,
B::Data: Send, B::Data: Send,
B::Error: Into<Error> + Send + Sync, B::Error: Into<Error> + Send + Sync,
{ {

View File

@ -33,8 +33,8 @@ pub struct PoolSettings {
pub struct Client<C, T, B> { pub struct Client<C, T, B> {
connect: C, connect: C,
target: T, target: T,
absolute_form: Option<hyper::Client<HyperConnect<C, T>, B>>, absolute_form: Option<hyper_util::client::legacy::Client<HyperConnect<C, T>, B>>,
origin_form: Option<hyper::Client<HyperConnect<C, T>, B>>, origin_form: Option<hyper_util::client::legacy::Client<HyperConnect<C, T>, B>>,
pool: PoolSettings, pool: PoolSettings,
} }
@ -68,9 +68,9 @@ impl<C, T, B> Client<C, T, B>
where where
T: Clone + Send + Sync + 'static, T: Clone + Send + Sync + 'static,
C: MakeConnection<(crate::Variant, T)> + Clone + Send + Sync + 'static, C: MakeConnection<(crate::Variant, T)> + Clone + Send + Sync + 'static,
C::Connection: Unpin + Send, C::Connection: Unpin + Send + hyper::rt::Read + hyper::rt::Write,
C::Future: Unpin + Send + 'static, C::Future: Unpin + Send + 'static,
B: crate::Body + Send + 'static, B: crate::Body + Send + Unpin + 'static,
B::Data: Send, B::Data: Send,
B::Error: Into<Error> + Send + Sync, B::Error: Into<Error> + Send + Sync,
{ {
@ -94,10 +94,9 @@ where
// ish, so we just build a one-off client for the connection. // ish, so we just build a one-off client for the connection.
// There's no real reason to hold the client for re-use. // There's no real reason to hold the client for re-use.
debug!(use_absolute_form, is_missing_host, "Using one-off client"); debug!(use_absolute_form, is_missing_host, "Using one-off client");
hyper::Client::builder() hyper_util::client::legacy::Client::builder(TracingExecutor)
.pool_max_idle_per_host(0) .pool_max_idle_per_host(0)
.set_host(use_absolute_form) .set_host(use_absolute_form)
.executor(TracingExecutor)
.build(HyperConnect::new( .build(HyperConnect::new(
self.connect.clone(), self.connect.clone(),
self.target.clone(), self.target.clone(),
@ -120,11 +119,10 @@ where
if client.is_none() { if client.is_none() {
debug!(use_absolute_form, "Caching new client"); debug!(use_absolute_form, "Caching new client");
*client = Some( *client = Some(
hyper::Client::builder() hyper_util::client::legacy::Client::builder(TracingExecutor)
.pool_max_idle_per_host(self.pool.max_idle) .pool_max_idle_per_host(self.pool.max_idle)
.pool_idle_timeout(self.pool.idle_timeout) .pool_idle_timeout(self.pool.idle_timeout)
.set_host(use_absolute_form) .set_host(use_absolute_form)
.executor(TracingExecutor)
.build(HyperConnect::new( .build(HyperConnect::new(
self.connect.clone(), self.connect.clone(),
self.target.clone(), self.target.clone(),

View File

@ -55,7 +55,7 @@ where
C::Connection: Send + Unpin + 'static, C::Connection: Send + Unpin + 'static,
C::Metadata: Send, C::Metadata: Send,
C::Future: Send + 'static, C::Future: Send + 'static,
B: Body + Send + 'static, B: Body + Send + Unpin + 'static,
B::Data: Send, B::Data: Send,
B::Error: Into<Error> + Send + Sync, B::Error: Into<Error> + Send + Sync,
{ {
@ -147,7 +147,7 @@ where
B::Data: Send, B::Data: Send,
B::Error: Into<Error> + Send + Sync, B::Error: Into<Error> + Send + Sync,
{ {
type Response = http::Response<hyper::Body>; type Response = http::Response<hyper::body::Incoming>;
type Error = hyper::Error; type Error = hyper::Error;
type Future = Pin<Box<dyn Send + Future<Output = Result<Self::Response, Self::Error>>>>; type Future = Pin<Box<dyn Send + Future<Output = Result<Self::Response, Self::Error>>>>;

View File

@ -1,6 +1,7 @@
use super::{h1, h2, Body}; use super::{h1, h2, Body};
use futures::prelude::*; use futures::prelude::*;
use http::header::{HeaderValue, TRANSFER_ENCODING}; use http::header::{HeaderValue, TRANSFER_ENCODING};
use http_body::Frame;
use linkerd_error::{Error, Result}; use linkerd_error::{Error, Result};
use linkerd_http_box::BoxBody; use linkerd_http_box::BoxBody;
use linkerd_stack::{layer, MakeConnection, Service}; use linkerd_stack::{layer, MakeConnection, Service};
@ -56,7 +57,7 @@ where
C: MakeConnection<(crate::Variant, T)> + Clone + Send + Sync + 'static, C: MakeConnection<(crate::Variant, T)> + Clone + Send + Sync + 'static,
C::Connection: Unpin + Send, C::Connection: Unpin + Send,
C::Future: Unpin + Send + 'static, C::Future: Unpin + Send + 'static,
B: crate::Body + Send + 'static, B: crate::Body + Send + Unpin + 'static,
B::Data: Send, B::Data: Send,
B::Error: Into<Error> + Send + Sync, B::Error: Into<Error> + Send + Sync,
{ {
@ -211,23 +212,13 @@ where
self.inner.is_end_stream() self.inner.is_end_stream()
} }
fn poll_data( fn poll_frame(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> { ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
self.project() self.project()
.inner .inner
.poll_data(cx) .poll_frame(cx)
.map_err(downgrade_h2_error)
}
fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
self.project()
.inner
.poll_trailers(cx)
.map_err(downgrade_h2_error) .map_err(downgrade_h2_error)
} }

View File

@ -1,8 +1,7 @@
use crate::{ use crate::{client_handle::SetClientHandle, h2, BoxBody, ClientHandle, TracingExecutor, Variant};
client_handle::SetClientHandle, h2, BoxBody, BoxRequest, ClientHandle, TracingExecutor, Variant,
};
use linkerd_error::Error; use linkerd_error::Error;
use linkerd_io::{self as io, PeerAddr}; use linkerd_http_upgrade::glue::UpgradeBody;
use linkerd_io::PeerAddr;
use linkerd_stack::{layer, ExtractParam, NewService}; use linkerd_stack::{layer, ExtractParam, NewService};
use std::{ use std::{
future::Future, future::Future,
@ -126,13 +125,22 @@ where
impl<I, N, S> Service<I> for ServeHttp<N> impl<I, N, S> Service<I> for ServeHttp<N>
where where
I: io::AsyncRead + io::AsyncWrite + PeerAddr + Send + Unpin + 'static, I: hyper::rt::Read + hyper::rt::Write + PeerAddr + Send + Unpin + 'static,
N: NewService<ClientHandle, Service = S> + Send + 'static, N: NewService<ClientHandle, Service = S> + Send + 'static,
S: Service<http::Request<BoxBody>, Response = http::Response<BoxBody>, Error = Error> S: Service<
+ Unpin http::Request<hyper::body::Incoming>,
Response = http::Response<BoxBody>,
Error = Error,
> + Service<
http::Request<UpgradeBody<hyper::body::Incoming>>,
Response = http::Response<BoxBody>,
Error = Error,
> + Clone
+ Send + Send
+ Unpin
+ 'static, + 'static,
S::Future: Send + 'static, <S as Service<http::Request<hyper::body::Incoming>>>::Future: Send + 'static,
<S as Service<http::Request<UpgradeBody<hyper::body::Incoming>>>>::Future: Send + 'static,
{ {
type Response = (); type Response = ();
type Error = Error; type Error = Error;
@ -162,10 +170,8 @@ where
match version { match version {
Variant::Http1 => { Variant::Http1 => {
// Enable support for HTTP upgrades (CONNECT and websockets). // Enable support for HTTP upgrades (CONNECT and websockets).
let svc = linkerd_http_upgrade::upgrade::Service::new( let svc = linkerd_http_upgrade::upgrade::Service::new(svc, drain.clone());
BoxRequest::new(svc), let svc = hyper_util::service::TowerToHyperService::new(svc);
drain.clone(),
);
let mut conn = http1.serve_connection(io, svc).with_upgrades(); let mut conn = http1.serve_connection(io, svc).with_upgrades();
tokio::select! { tokio::select! {
@ -187,7 +193,8 @@ where
} }
Variant::H2 => { Variant::H2 => {
let mut conn = http2.serve_connection(io, BoxRequest::new(svc)); let svc = hyper_util::service::TowerToHyperService::new(svc);
let mut conn = http2.serve_connection(io, svc);
tokio::select! { tokio::select! {
res = &mut conn => { res = &mut conn => {

View File

@ -4,6 +4,8 @@ use super::*;
use crate::Body; use crate::Body;
use bytes::Bytes; use bytes::Bytes;
use futures::FutureExt; use futures::FutureExt;
use http_body_util::BodyExt;
use linkerd_io as io;
use linkerd_stack::CloneParam; use linkerd_stack::CloneParam;
use tokio::time; use tokio::time;
use tower::ServiceExt; use tower::ServiceExt;
@ -75,7 +77,7 @@ async fn h2_connection_window_exhaustion() {
.expect("timed out"); .expect("timed out");
tokio::select! { tokio::select! {
_ = time::sleep(time::Duration::from_secs(2)) => {} _ = time::sleep(time::Duration::from_secs(2)) => {}
_ = rx.data() => panic!("unexpected data"), _ = rx.frame() => panic!("unexpected data"),
} }
tracing::info!("Dropping one of the retained response bodies frees capacity so that the data can be received"); tracing::info!("Dropping one of the retained response bodies frees capacity so that the data can be received");
@ -109,34 +111,52 @@ async fn h2_stream_window_exhaustion() {
let chunk = (0..CLIENT_STREAM_WINDOW).map(|_| b'a').collect::<Bytes>(); let chunk = (0..CLIENT_STREAM_WINDOW).map(|_| b'a').collect::<Bytes>();
tracing::info!(sz = chunk.len(), "Sending chunk"); tracing::info!(sz = chunk.len(), "Sending chunk");
tx.try_send_data(chunk.clone()).expect("send data"); tx.send_data(chunk.clone()).await.expect("can send data");
tokio::task::yield_now().await; tokio::task::yield_now().await;
tracing::info!(sz = chunk.len(), "Buffering chunk in channel"); tracing::info!(sz = chunk.len(), "Buffering chunk in channel");
tx.try_send_data(chunk.clone()).expect("send data"); tx.send_data(chunk.clone()).await.expect("can send data");
tokio::task::yield_now().await; tokio::task::yield_now().await;
tracing::info!(sz = chunk.len(), "Confirming stream window exhaustion"); tracing::info!(sz = chunk.len(), "Confirming stream window exhaustion");
/*
* XXX(kate): this can be reinstate when we have a `poll_ready(cx)` method on the new sender.
assert!( assert!(
timeout(futures::future::poll_fn(|cx| tx.poll_ready(cx))) timeout(futures::future::poll_fn(|cx| tx.poll_ready(cx)))
.await .await
.is_err(), .is_err(),
"stream window should be exhausted" "stream window should be exhausted"
); );
*/
tracing::info!("Once the pending data is read, the stream window should be replenished"); tracing::info!("Once the pending data is read, the stream window should be replenished");
let data = body.data().await.expect("data").expect("data"); let data = body
.frame()
.await
.expect("yields a result")
.expect("yields a frame")
.into_data()
.expect("yields data");
assert_eq!(data, chunk); assert_eq!(data, chunk);
let data = body.data().await.expect("data").expect("data"); let data = body
.frame()
.await
.expect("yields a result")
.expect("yields a frame")
.into_data()
.expect("yields data");
assert_eq!(data, chunk); assert_eq!(data, chunk);
timeout(body.data()).await.expect_err("no more chunks"); timeout(body.frame()).await.expect_err("no more chunks");
tracing::info!(sz = chunk.len(), "Confirming stream window availability"); tracing::info!(sz = chunk.len(), "Confirming stream window availability");
/*
* XXX(kate): this can be reinstated when we have a `poll_ready(cx)` method on the new sender.
timeout(futures::future::poll_fn(|cx| tx.poll_ready(cx))) timeout(futures::future::poll_fn(|cx| tx.poll_ready(cx)))
.await .await
.expect("timed out") .expect("timed out")
.expect("ready"); .expect("ready");
*/
} }
// === Utilities === // === Utilities ===
@ -148,12 +168,12 @@ struct TestServer {
server: Handle, server: Handle,
} }
type Mock = mock::Mock<http::Request<BoxBody>, http::Response<BoxBody>>; type Mock = mock::Mock<http::Request<hyper::body::Incoming>, http::Response<BoxBody>>;
type Handle = mock::Handle<http::Request<BoxBody>, http::Response<BoxBody>>; type Handle = mock::Handle<http::Request<hyper::body::Incoming>, http::Response<BoxBody>>;
/// Allows us to configure a server from the Params type. /// Allows us to configure a server from the Params type.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct NewMock(mock::Mock<http::Request<BoxBody>, http::Response<BoxBody>>); struct NewMock(mock::Mock<http::Request<hyper::body::Incoming>, http::Response<BoxBody>>);
impl NewService<()> for NewMock { impl NewService<()> for NewMock {
type Service = NewMock; type Service = NewMock;
@ -185,21 +205,28 @@ impl TestServer {
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
async fn connect_h2( async fn connect_h2(
h2: h2::ServerParams, h2: h2::ServerParams,
client: &mut hyper::client::conn::http2::Builder, client: &mut hyper::client::conn::http2::Builder<TracingExecutor>,
) -> Self { ) -> Self {
use hyper_util::{rt::tokio::TokioIo, service::TowerToHyperService};
let params = Params { let params = Params {
drain: drain(), drain: drain(),
version: Variant::H2, version: Variant::H2,
http2: h2, http2: h2,
}; };
let (sio, cio) = {
let (sio, cio) = io::duplex(20 * 1024 * 1024); // 20 MB
(TokioIo::new(sio), TokioIo::new(cio))
};
// Build the HTTP server with a mocked inner service so that we can handle // Build the HTTP server with a mocked inner service so that we can handle
// requests. // requests.
let (mock, server) = mock::pair(); let (mock, server) = mock::pair();
let svc = NewServeHttp::new(CloneParam::from(params), NewMock(mock)).new_service(()); let svc = NewServeHttp::new(CloneParam::from(params), NewMock(mock)).new_service(());
// fn bound<S: Service<T>, T>(_: &S) {}
let (sio, cio) = io::duplex(20 * 1024 * 1024); // 20 MB // bound(&svc);
tokio::spawn(svc.oneshot(sio).instrument(info_span!("server"))); let fut = svc.oneshot(sio).instrument(info_span!("server"));
tokio::spawn(fut);
// Build a real HTTP/2 client using the mocked socket. // Build a real HTTP/2 client using the mocked socket.
let (client, task) = client let (client, task) = client
@ -215,7 +242,12 @@ impl TestServer {
/// response. The mocked response body sender and the readable response body are /// response. The mocked response body sender and the readable response body are
/// returned. /// returned.
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
async fn get(&mut self) -> (hyper::body::Sender, hyper::Body) { async fn get(
&mut self,
) -> (
http_body_util::channel::Sender<Bytes>,
hyper::body::Incoming,
) {
self.server.allow(1); self.server.allow(1);
let mut call0 = self let mut call0 = self
.client .client
@ -225,14 +257,14 @@ impl TestServer {
_ = (&mut call0) => unreachable!("client cannot receive a response"), _ = (&mut call0) => unreachable!("client cannot receive a response"),
next = self.server.next_request() => next.expect("server not dropped"), next = self.server.next_request() => next.expect("server not dropped"),
}; };
let (tx, rx) = hyper::Body::channel(); let (tx, rx) = http_body_util::channel::Channel::new(512);
next.send_response(http::Response::new(BoxBody::new(rx))); next.send_response(http::Response::new(BoxBody::new(rx)));
let rsp = call0.await.expect("response"); let rsp = call0.await.expect("response");
(tx, rsp.into_body()) (tx, rsp.into_body())
} }
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
async fn respond(&mut self, body: Bytes) -> hyper::Body { async fn respond(&mut self, body: Bytes) -> hyper::body::Incoming {
let (mut tx, rx) = self.get().await; let (mut tx, rx) = self.get().await;
tx.send_data(body.clone()).await.expect("send data"); tx.send_data(body.clone()).await.expect("send data");
rx rx