chore(proxy/http): upgrade to hyper 1.x

Signed-off-by: katelyn martin <kate@buoyant.io>
This commit is contained in:
katelyn martin 2025-02-14 00:00:00 +00:00
parent 3481d8b0f3
commit 1bddc61fba
8 changed files with 92 additions and 55 deletions

View File

@ -2327,9 +2327,11 @@ dependencies = [
"h2",
"http 1.2.0",
"http-body",
"http-body-util",
"httparse",
"hyper",
"hyper-balance",
"hyper-util",
"linkerd-detect",
"linkerd-duplex",
"linkerd-error",

View File

@ -27,6 +27,12 @@ hyper = { workspace = true, features = [
"server",
] }
hyper-balance = { path = "../../../hyper-balance" }
hyper-util = { workspace = true, default-features = false, features = [
"client",
"client-legacy",
"http1",
"service",
] }
parking_lot = "0.12"
pin-project = "1"
rand = "0.9"
@ -54,6 +60,7 @@ linkerd-proxy-balance = { path = "../balance" }
linkerd-stack = { path = "../../stack" }
[dev-dependencies]
http-body-util = { workspace = true, features = ["channel"] }
tokio-test = "0.4"
tower-test = "0.4"
linkerd-tracing = { path = "../../tracing", features = ["ansi"] }

View File

@ -63,7 +63,7 @@ where
C::Connection: Unpin + Send,
C::Metadata: Send,
C::Future: Unpin + Send + 'static,
B: crate::Body + Send + 'static,
B: crate::Body + Send + Unpin + 'static,
B::Data: Send,
B::Error: Into<Error> + Send + Sync,
{
@ -123,7 +123,7 @@ where
C::Connection: Unpin + Send,
C::Future: Unpin + Send + 'static,
C::Error: Into<Error>,
B: crate::Body + Send + 'static,
B: crate::Body + Send + Unpin + 'static,
B::Data: Send,
B::Error: Into<Error> + Send + Sync,
{

View File

@ -33,8 +33,8 @@ pub struct PoolSettings {
pub struct Client<C, T, B> {
connect: C,
target: T,
absolute_form: Option<hyper::Client<HyperConnect<C, T>, B>>,
origin_form: Option<hyper::Client<HyperConnect<C, T>, B>>,
absolute_form: Option<hyper_util::client::legacy::Client<HyperConnect<C, T>, B>>,
origin_form: Option<hyper_util::client::legacy::Client<HyperConnect<C, T>, B>>,
pool: PoolSettings,
}
@ -68,9 +68,9 @@ impl<C, T, B> Client<C, T, B>
where
T: Clone + Send + Sync + 'static,
C: MakeConnection<(crate::Variant, T)> + Clone + Send + Sync + 'static,
C::Connection: Unpin + Send,
C::Connection: Unpin + Send + hyper::rt::Read + hyper::rt::Write,
C::Future: Unpin + Send + 'static,
B: crate::Body + Send + 'static,
B: crate::Body + Send + Unpin + 'static,
B::Data: Send,
B::Error: Into<Error> + Send + Sync,
{
@ -94,10 +94,9 @@ where
// ish, so we just build a one-off client for the connection.
// There's no real reason to hold the client for re-use.
debug!(use_absolute_form, is_missing_host, "Using one-off client");
hyper::Client::builder()
hyper_util::client::legacy::Client::builder(TracingExecutor)
.pool_max_idle_per_host(0)
.set_host(use_absolute_form)
.executor(TracingExecutor)
.build(HyperConnect::new(
self.connect.clone(),
self.target.clone(),
@ -120,11 +119,10 @@ where
if client.is_none() {
debug!(use_absolute_form, "Caching new client");
*client = Some(
hyper::Client::builder()
hyper_util::client::legacy::Client::builder(TracingExecutor)
.pool_max_idle_per_host(self.pool.max_idle)
.pool_idle_timeout(self.pool.idle_timeout)
.set_host(use_absolute_form)
.executor(TracingExecutor)
.build(HyperConnect::new(
self.connect.clone(),
self.target.clone(),

View File

@ -55,7 +55,7 @@ where
C::Connection: Send + Unpin + 'static,
C::Metadata: Send,
C::Future: Send + 'static,
B: Body + Send + 'static,
B: Body + Send + Unpin + 'static,
B::Data: Send,
B::Error: Into<Error> + Send + Sync,
{
@ -147,7 +147,7 @@ where
B::Data: Send,
B::Error: Into<Error> + Send + Sync,
{
type Response = http::Response<hyper::Body>;
type Response = http::Response<hyper::body::Incoming>;
type Error = hyper::Error;
type Future = Pin<Box<dyn Send + Future<Output = Result<Self::Response, Self::Error>>>>;

View File

@ -1,6 +1,7 @@
use super::{h1, h2, Body};
use futures::prelude::*;
use http::header::{HeaderValue, TRANSFER_ENCODING};
use http_body::Frame;
use linkerd_error::{Error, Result};
use linkerd_http_box::BoxBody;
use linkerd_stack::{layer, MakeConnection, Service};
@ -56,7 +57,7 @@ where
C: MakeConnection<(crate::Variant, T)> + Clone + Send + Sync + 'static,
C::Connection: Unpin + Send,
C::Future: Unpin + Send + 'static,
B: crate::Body + Send + 'static,
B: crate::Body + Send + Unpin + 'static,
B::Data: Send,
B::Error: Into<Error> + Send + Sync,
{
@ -211,23 +212,13 @@ where
self.inner.is_end_stream()
}
fn poll_data(
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
self.project()
.inner
.poll_data(cx)
.map_err(downgrade_h2_error)
}
fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
self.project()
.inner
.poll_trailers(cx)
.poll_frame(cx)
.map_err(downgrade_h2_error)
}

View File

@ -1,8 +1,7 @@
use crate::{
client_handle::SetClientHandle, h2, BoxBody, BoxRequest, ClientHandle, TracingExecutor, Variant,
};
use crate::{client_handle::SetClientHandle, h2, BoxBody, ClientHandle, TracingExecutor, Variant};
use linkerd_error::Error;
use linkerd_io::{self as io, PeerAddr};
use linkerd_http_upgrade::glue::UpgradeBody;
use linkerd_io::PeerAddr;
use linkerd_stack::{layer, ExtractParam, NewService};
use std::{
future::Future,
@ -126,13 +125,22 @@ where
impl<I, N, S> Service<I> for ServeHttp<N>
where
I: io::AsyncRead + io::AsyncWrite + PeerAddr + Send + Unpin + 'static,
I: hyper::rt::Read + hyper::rt::Write + PeerAddr + Send + Unpin + 'static,
N: NewService<ClientHandle, Service = S> + Send + 'static,
S: Service<http::Request<BoxBody>, Response = http::Response<BoxBody>, Error = Error>
+ Unpin
S: Service<
http::Request<hyper::body::Incoming>,
Response = http::Response<BoxBody>,
Error = Error,
> + Service<
http::Request<UpgradeBody<hyper::body::Incoming>>,
Response = http::Response<BoxBody>,
Error = Error,
> + Clone
+ Send
+ Unpin
+ 'static,
S::Future: Send + 'static,
<S as Service<http::Request<hyper::body::Incoming>>>::Future: Send + 'static,
<S as Service<http::Request<UpgradeBody<hyper::body::Incoming>>>>::Future: Send + 'static,
{
type Response = ();
type Error = Error;
@ -162,10 +170,8 @@ where
match version {
Variant::Http1 => {
// Enable support for HTTP upgrades (CONNECT and websockets).
let svc = linkerd_http_upgrade::upgrade::Service::new(
BoxRequest::new(svc),
drain.clone(),
);
let svc = linkerd_http_upgrade::upgrade::Service::new(svc, drain.clone());
let svc = hyper_util::service::TowerToHyperService::new(svc);
let mut conn = http1.serve_connection(io, svc).with_upgrades();
tokio::select! {
@ -187,7 +193,8 @@ where
}
Variant::H2 => {
let mut conn = http2.serve_connection(io, BoxRequest::new(svc));
let svc = hyper_util::service::TowerToHyperService::new(svc);
let mut conn = http2.serve_connection(io, svc);
tokio::select! {
res = &mut conn => {

View File

@ -4,6 +4,8 @@ use super::*;
use crate::Body;
use bytes::Bytes;
use futures::FutureExt;
use http_body_util::BodyExt;
use linkerd_io as io;
use linkerd_stack::CloneParam;
use tokio::time;
use tower::ServiceExt;
@ -75,7 +77,7 @@ async fn h2_connection_window_exhaustion() {
.expect("timed out");
tokio::select! {
_ = time::sleep(time::Duration::from_secs(2)) => {}
_ = rx.data() => panic!("unexpected data"),
_ = rx.frame() => panic!("unexpected data"),
}
tracing::info!("Dropping one of the retained response bodies frees capacity so that the data can be received");
@ -109,34 +111,52 @@ async fn h2_stream_window_exhaustion() {
let chunk = (0..CLIENT_STREAM_WINDOW).map(|_| b'a').collect::<Bytes>();
tracing::info!(sz = chunk.len(), "Sending chunk");
tx.try_send_data(chunk.clone()).expect("send data");
tx.send_data(chunk.clone()).await.expect("can send data");
tokio::task::yield_now().await;
tracing::info!(sz = chunk.len(), "Buffering chunk in channel");
tx.try_send_data(chunk.clone()).expect("send data");
tx.send_data(chunk.clone()).await.expect("can send data");
tokio::task::yield_now().await;
tracing::info!(sz = chunk.len(), "Confirming stream window exhaustion");
/*
* XXX(kate): this can be reinstate when we have a `poll_ready(cx)` method on the new sender.
assert!(
timeout(futures::future::poll_fn(|cx| tx.poll_ready(cx)))
.await
.is_err(),
"stream window should be exhausted"
);
*/
tracing::info!("Once the pending data is read, the stream window should be replenished");
let data = body.data().await.expect("data").expect("data");
let data = body
.frame()
.await
.expect("yields a result")
.expect("yields a frame")
.into_data()
.expect("yields data");
assert_eq!(data, chunk);
let data = body.data().await.expect("data").expect("data");
let data = body
.frame()
.await
.expect("yields a result")
.expect("yields a frame")
.into_data()
.expect("yields data");
assert_eq!(data, chunk);
timeout(body.data()).await.expect_err("no more chunks");
timeout(body.frame()).await.expect_err("no more chunks");
tracing::info!(sz = chunk.len(), "Confirming stream window availability");
/*
* XXX(kate): this can be reinstated when we have a `poll_ready(cx)` method on the new sender.
timeout(futures::future::poll_fn(|cx| tx.poll_ready(cx)))
.await
.expect("timed out")
.expect("ready");
*/
}
// === Utilities ===
@ -148,12 +168,12 @@ struct TestServer {
server: Handle,
}
type Mock = mock::Mock<http::Request<BoxBody>, http::Response<BoxBody>>;
type Handle = mock::Handle<http::Request<BoxBody>, http::Response<BoxBody>>;
type Mock = mock::Mock<http::Request<hyper::body::Incoming>, http::Response<BoxBody>>;
type Handle = mock::Handle<http::Request<hyper::body::Incoming>, http::Response<BoxBody>>;
/// Allows us to configure a server from the Params type.
#[derive(Clone, Debug)]
struct NewMock(mock::Mock<http::Request<BoxBody>, http::Response<BoxBody>>);
struct NewMock(mock::Mock<http::Request<hyper::body::Incoming>, http::Response<BoxBody>>);
impl NewService<()> for NewMock {
type Service = NewMock;
@ -185,21 +205,28 @@ impl TestServer {
#[tracing::instrument(skip_all)]
async fn connect_h2(
h2: h2::ServerParams,
client: &mut hyper::client::conn::http2::Builder,
client: &mut hyper::client::conn::http2::Builder<TracingExecutor>,
) -> Self {
use hyper_util::{rt::tokio::TokioIo, service::TowerToHyperService};
let params = Params {
drain: drain(),
version: Variant::H2,
http2: h2,
};
let (sio, cio) = {
let (sio, cio) = io::duplex(20 * 1024 * 1024); // 20 MB
(TokioIo::new(sio), TokioIo::new(cio))
};
// Build the HTTP server with a mocked inner service so that we can handle
// requests.
let (mock, server) = mock::pair();
let svc = NewServeHttp::new(CloneParam::from(params), NewMock(mock)).new_service(());
let (sio, cio) = io::duplex(20 * 1024 * 1024); // 20 MB
tokio::spawn(svc.oneshot(sio).instrument(info_span!("server")));
// fn bound<S: Service<T>, T>(_: &S) {}
// bound(&svc);
let fut = svc.oneshot(sio).instrument(info_span!("server"));
tokio::spawn(fut);
// Build a real HTTP/2 client using the mocked socket.
let (client, task) = client
@ -215,7 +242,12 @@ impl TestServer {
/// response. The mocked response body sender and the readable response body are
/// returned.
#[tracing::instrument(skip(self))]
async fn get(&mut self) -> (hyper::body::Sender, hyper::Body) {
async fn get(
&mut self,
) -> (
http_body_util::channel::Sender<Bytes>,
hyper::body::Incoming,
) {
self.server.allow(1);
let mut call0 = self
.client
@ -225,14 +257,14 @@ impl TestServer {
_ = (&mut call0) => unreachable!("client cannot receive a response"),
next = self.server.next_request() => next.expect("server not dropped"),
};
let (tx, rx) = hyper::Body::channel();
let (tx, rx) = http_body_util::channel::Channel::new(512);
next.send_response(http::Response::new(BoxBody::new(rx)));
let rsp = call0.await.expect("response");
(tx, rsp.into_body())
}
#[tracing::instrument(skip(self))]
async fn respond(&mut self, body: Bytes) -> hyper::Body {
async fn respond(&mut self, body: Bytes) -> hyper::body::Incoming {
let (mut tx, rx) = self.get().await;
tx.send_data(body.clone()).await.expect("send data");
rx