chore(proxy/http): use `hyper_util::rt::tokio::TokioExecutor` (#3833)

this commit removes the `linkerd-http-executor` crate, and replaces all
usage of its `TracingExecutor` type with the `TokioExecutor` type
provided by `hyper-util`.

this work is based upon hyperium/hyper-util#166. that change, included
in the 0.1.11 release, altered the `TokioExecutor` type so that it
propagates tracing context when the `tracing` feature is enabled.

with that change made, our `TracingExecutor` type is now redundant.

* https://github.com/hyperium/hyper-util/pull/166
* https://github.com/hyperium/hyper-util/blob/master/CHANGELOG.md#0111-2025-03-31

Signed-off-by: katelyn martin <kate@buoyant.io>
This commit is contained in:
katelyn martin 2025-04-01 13:19:53 -04:00 committed by GitHub
parent c01e7e268a
commit 686934cec6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 52 additions and 91 deletions

View File

@ -1879,15 +1879,6 @@ dependencies = [
"tracing",
]
[[package]]
name = "linkerd-http-executor"
version = "0.1.0"
dependencies = [
"hyper",
"tokio",
"tracing",
]
[[package]]
name = "linkerd-http-h2"
version = "0.1.0"
@ -2424,7 +2415,6 @@ dependencies = [
"linkerd-http-box",
"linkerd-http-classify",
"linkerd-http-detect",
"linkerd-http-executor",
"linkerd-http-h2",
"linkerd-http-insert",
"linkerd-http-override-authority",

View File

@ -27,7 +27,6 @@ members = [
"linkerd/http/box",
"linkerd/http/classify",
"linkerd/http/detect",
"linkerd/http/executor",
"linkerd/http/h2",
"linkerd/http/insert",
"linkerd/http/metrics",
@ -111,7 +110,6 @@ h2 = { version = "0.4" }
http = { version = "1" }
http-body = { version = "1" }
hyper = { version = "1", default-features = false }
hyper-util = { version = "0.1", default-features = false }
prometheus-client = { version = "0.23" }
prost = { version = "0.13" }
prost-build = { version = "0.13", default-features = false }
@ -131,5 +129,10 @@ version = "0.1.3"
default-features = false
features = ["channel"]
[workspace.dependencies.hyper-util]
version = "0.1"
default-features = false
features = ["tokio", "tracing"]
[workspace.dependencies.linkerd2-proxy-api]
version = "0.16.0"

View File

@ -12,7 +12,7 @@ use linkerd_app_core::{
errors::header::L5D_PROXY_ERROR,
identity, io, metrics,
proxy::http::{self, BoxBody},
svc::{self, http::TracingExecutor, NewService, Param},
svc::{self, http::TokioExecutor, NewService, Param},
tls,
transport::{ClientAddr, OrigDstAddr, Remote, ServerAddr},
Error, NameAddr, ProxyRuntime,
@ -95,7 +95,7 @@ async fn downgrade_origin_form() {
// Reproduces https://github.com/linkerd/linkerd2/issues/5298
let mut server = hyper::server::conn::http1::Builder::new();
server.timer(hyper_util::rt::TokioTimer::new());
let mut client = hyper::client::conn::http2::Builder::new(TracingExecutor);
let mut client = hyper::client::conn::http2::Builder::new(TokioExecutor::new());
client.timer(hyper_util::rt::TokioTimer::new());
let _trace = trace_init();
@ -168,7 +168,7 @@ async fn downgrade_origin_form() {
#[tokio::test(flavor = "current_thread")]
async fn downgrade_absolute_form() {
let mut client = hyper::client::conn::http2::Builder::new(TracingExecutor);
let mut client = hyper::client::conn::http2::Builder::new(TokioExecutor::new());
client.timer(hyper_util::rt::TokioTimer::new());
let mut server = hyper::server::conn::http1::Builder::new();
server.timer(hyper_util::rt::TokioTimer::new());
@ -441,7 +441,7 @@ async fn h2_response_meshed_error_header() {
let connect = support::connect().endpoint_fn_boxed(Target::addr(), connect_error());
// Build a client using the connect that always errors.
let mut client = hyper::client::conn::http2::Builder::new(TracingExecutor);
let mut client = hyper::client::conn::http2::Builder::new(TokioExecutor::new());
client.timer(hyper_util::rt::TokioTimer::new());
let profiles = profile::resolver();
let profile_tx =
@ -482,7 +482,7 @@ async fn h2_response_unmeshed_error_header() {
let connect = support::connect().endpoint_fn_boxed(Target::addr(), connect_error());
// Build a client using the connect that always errors.
let mut client = hyper::client::conn::http2::Builder::new(TracingExecutor);
let mut client = hyper::client::conn::http2::Builder::new(TokioExecutor::new());
client.timer(hyper_util::rt::TokioTimer::new());
let profiles = profile::resolver();
let profile_tx =
@ -525,7 +525,7 @@ async fn grpc_meshed_response_error_header() {
let connect = support::connect().endpoint_fn_boxed(Target::addr(), connect_error());
// Build a client using the connect that always errors.
let mut client = hyper::client::conn::http2::Builder::new(TracingExecutor);
let mut client = hyper::client::conn::http2::Builder::new(TokioExecutor::new());
client.timer(hyper_util::rt::TokioTimer::new());
let profiles = profile::resolver();
let profile_tx =
@ -567,7 +567,7 @@ async fn grpc_unmeshed_response_error_header() {
let connect = support::connect().endpoint_fn_boxed(Target::addr(), connect_error());
// Build a client using the connect that always errors.
let mut client = hyper::client::conn::http2::Builder::new(TracingExecutor);
let mut client = hyper::client::conn::http2::Builder::new(TokioExecutor::new());
client.timer(hyper_util::rt::TokioTimer::new());
let profiles = profile::resolver();
let profile_tx =
@ -609,7 +609,7 @@ async fn grpc_response_class() {
// Build a mock connector serves a gRPC server that returns errors.
let connect = {
let mut server = hyper::server::conn::http2::Builder::new(TracingExecutor);
let mut server = hyper::server::conn::http2::Builder::new(TokioExecutor::new());
server.timer(hyper_util::rt::TokioTimer::new());
support::connect().endpoint_fn_boxed(
Target::addr(),
@ -618,7 +618,7 @@ async fn grpc_response_class() {
};
// Build a client using the connect that always errors.
let mut client = hyper::client::conn::http2::Builder::new(TracingExecutor);
let mut client = hyper::client::conn::http2::Builder::new(TokioExecutor::new());
client.timer(hyper_util::rt::TokioTimer::new());
let profiles = profile::resolver();
let profile_tx =
@ -818,7 +818,7 @@ fn hello_server(
#[tracing::instrument]
fn grpc_status_server(
server: hyper::server::conn::http2::Builder<TracingExecutor>,
server: hyper::server::conn::http2::Builder<TokioExecutor>,
status: tonic::Code,
) -> impl Fn(Remote<ServerAddr>) -> io::Result<io::BoxedIo> {
move |endpoint| {

View File

@ -1,6 +1,6 @@
use super::*;
use http::{Request, Response};
use linkerd_app_core::{proxy::http::TracingExecutor, svc::http::BoxBody};
use linkerd_app_core::{proxy::http::TokioExecutor, svc::http::BoxBody};
use parking_lot::Mutex;
use std::io;
use tokio::{net::TcpStream, task::JoinHandle};
@ -272,7 +272,7 @@ fn run(
let span = info_span!("test client", peer_addr = %addr, ?version, test = %test_name);
let work = async move {
let client = hyper_util::client::legacy::Client::builder(TracingExecutor)
let client = hyper_util::client::legacy::Client::builder(TokioExecutor::new())
.http2_only(http2_only)
.build::<Conn, BoxBody>(conn);
tracing::trace!("client task started");

View File

@ -2,7 +2,7 @@ use super::*;
pub use linkerd2_proxy_api::destination as pb;
use linkerd2_proxy_api::net;
use linkerd_app_core::proxy::http::TracingExecutor;
use linkerd_app_core::proxy::http::TokioExecutor;
use parking_lot::Mutex;
use std::collections::VecDeque;
use std::net::IpAddr;
@ -372,7 +372,7 @@ where
let _ = listening_tx.send(());
}
let mut http = hyper::server::conn::http2::Builder::new(TracingExecutor);
let mut http = hyper::server::conn::http2::Builder::new(TokioExecutor::new());
loop {
let (sock, addr) = listener.accept().await?;
let span = tracing::debug_span!("conn", %addr).or_current();

View File

@ -1,4 +1,4 @@
use super::app_core::svc::http::TracingExecutor;
use super::app_core::svc::http::TokioExecutor;
use super::*;
use http::{Request, Response};
use linkerd_app_core::svc::http::BoxBody;
@ -216,11 +216,13 @@ impl Server {
.serve_connection(sock, svc)
.await
.map_err(|e| tracing::error!("support/server error: {}", e)),
Run::Http2 => hyper::server::conn::http2::Builder::new(TracingExecutor)
.timer(hyper_util::rt::TokioTimer::new())
.serve_connection(sock, svc)
.await
.map_err(|e| tracing::error!("support/server error: {}", e)),
Run::Http2 => {
hyper::server::conn::http2::Builder::new(TokioExecutor::new())
.timer(hyper_util::rt::TokioTimer::new())
.serve_connection(sock, svc)
.await
.map_err(|e| tracing::error!("support/server error: {}", e))
}
};
tracing::trace!(?result, "serve done");
result

View File

@ -1,5 +1,5 @@
use crate::*;
use linkerd_app_core::svc::http::{BoxBody, TracingExecutor};
use linkerd_app_core::svc::http::{BoxBody, TokioExecutor};
use std::error::Error as _;
use tokio::time::timeout;
@ -1605,7 +1605,7 @@ async fn http2_request_without_authority() {
let io = tokio::net::TcpStream::connect(&addr)
.await
.expect("connect error");
let (mut client, conn) = hyper::client::conn::http2::Builder::new(TracingExecutor)
let (mut client, conn) = hyper::client::conn::http2::Builder::new(TokioExecutor::new())
.timer(hyper_util::rt::TokioTimer::new())
.handshake(hyper_util::rt::TokioIo::new(io))
.await

View File

@ -5,7 +5,7 @@ use linkerd_app_core::metrics::OutboundZoneLocality;
use linkerd_app_core::{
io,
proxy::api_resolve::ProtocolHint,
svc::{http::TracingExecutor, NewService, ServiceExt},
svc::{http::TokioExecutor, NewService, ServiceExt},
Infallible,
};
use linkerd_http_box::BoxBody;
@ -250,7 +250,7 @@ fn serve(version: ::http::Version) -> io::Result<io::BoxedIo> {
tokio::spawn(fut);
}
::http::Version::HTTP_2 => {
let mut http = hyper::server::conn::http2::Builder::new(TracingExecutor);
let mut http = hyper::server::conn::http2::Builder::new(TokioExecutor::new());
let fut = http
.timer(hyper_util::rt::TokioTimer::new())
.serve_connection(hyper_util::rt::TokioIo::new(server_io), svc);

View File

@ -1,6 +1,6 @@
use crate::{
app_core::{
svc::{self, http::TracingExecutor},
svc::{self, http::TokioExecutor},
Error,
},
io, ContextError,
@ -71,7 +71,7 @@ where
///
/// [send]: hyper::client::conn::http2::SendRequest
pub async fn connect_and_accept_http2<B>(
client_settings: &mut hyper::client::conn::http2::Builder<TracingExecutor>,
client_settings: &mut hyper::client::conn::http2::Builder<TokioExecutor>,
server: BoxServer,
) -> (
hyper::client::conn::http2::SendRequest<B>,

View File

@ -1,15 +0,0 @@
[package]
name = "linkerd-http-executor"
version = { workspace = true }
authors = { workspace = true }
license = { workspace = true }
edition = { workspace = true }
publish = { workspace = true }
description = """
HTTP runtime components for Linkerd.
"""
[dependencies]
hyper = { workspace = true }
tokio = { version = "1", features = ["rt"] }
tracing = "0.1"

View File

@ -1,20 +0,0 @@
//! HTTP runtime components for Linkerd.
use hyper::rt::Executor;
use std::future::Future;
use tracing::instrument::Instrument;
/// An [`Executor<F>`] that propagates [`tracing`] spans.
#[derive(Clone, Debug, Default)]
pub struct TracingExecutor;
impl<F> Executor<F> for TracingExecutor
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
#[inline]
fn execute(&self, f: F) {
tokio::spawn(f.in_current_span());
}
}

View File

@ -32,6 +32,8 @@ hyper-util = { workspace = true, default-features = false, features = [
"client-legacy",
"http1",
"service",
"tokio",
"tracing",
] }
parking_lot = "0.12"
pin-project = "1"
@ -47,7 +49,6 @@ linkerd-error = { path = "../../error" }
linkerd-http-box = { path = "../../http/box" }
linkerd-http-classify = { path = "../../http/classify" }
linkerd-http-detect = { path = "../../http/detect" }
linkerd-http-executor = { path = "../../http/executor" }
linkerd-http-h2 = { path = "../../http/h2" }
linkerd-http-insert = { path = "../../http/insert" }
linkerd-http-override-authority = { path = "../../http/override-authority" }

View File

@ -1,4 +1,4 @@
use crate::TracingExecutor;
use crate::TokioExecutor;
use futures::prelude::*;
use http::{
header::{CONTENT_LENGTH, TRANSFER_ENCODING},
@ -91,7 +91,7 @@ where
// ish, so we just build a one-off client for the connection.
// There's no real reason to hold the client for re-use.
debug!(use_absolute_form, is_missing_host, "Using one-off client");
hyper_util::client::legacy::Client::builder(TracingExecutor)
hyper_util::client::legacy::Client::builder(TokioExecutor::new())
.pool_max_idle_per_host(0)
.set_host(use_absolute_form)
.build(HyperConnect::new(
@ -116,7 +116,7 @@ where
if client.is_none() {
debug!(use_absolute_form, "Caching new client");
*client = Some(
hyper_util::client::legacy::Client::builder(TracingExecutor)
hyper_util::client::legacy::Client::builder(TokioExecutor::new())
.pool_max_idle_per_host(self.pool.max_idle)
.pool_idle_timeout(self.pool.idle_timeout)
.set_host(use_absolute_form)

View File

@ -1,4 +1,4 @@
use crate::{Body, TracingExecutor};
use crate::{Body, TokioExecutor};
use futures::prelude::*;
use linkerd_error::{Error, Result};
use linkerd_stack::{MakeConnection, Service};
@ -85,7 +85,7 @@ where
Box::pin(
async move {
let (io, _meta) = connect.err_into::<Error>().await?;
let mut builder = hyper::client::conn::http2::Builder::new(TracingExecutor);
let mut builder = hyper::client::conn::http2::Builder::new(TokioExecutor::new());
builder.timer(hyper_util::rt::TokioTimer::new());
match flow_control {
None => {}

View File

@ -34,12 +34,12 @@ pub use http::{
uri, Method, Request, Response, StatusCode,
};
pub use http_body::Body;
pub use hyper_util::rt::tokio::TokioExecutor;
pub use linkerd_http_box::{BoxBody, BoxRequest, BoxResponse, EraseResponse};
pub use linkerd_http_classify as classify;
pub use linkerd_http_detect::{
DetectMetrics, DetectMetricsFamilies, DetectParams, Detection, NewDetect,
};
pub use linkerd_http_executor::TracingExecutor;
pub use linkerd_http_insert as insert;
pub use linkerd_http_override_authority::{AuthorityOverride, NewOverrideAuthority};
pub use linkerd_http_retain::{self as retain, Retain};

View File

@ -1,4 +1,5 @@
use crate::{client_handle::SetClientHandle, h2, BoxBody, ClientHandle, TracingExecutor, Variant};
use crate::{client_handle::SetClientHandle, h2, BoxBody, ClientHandle, Variant};
use hyper_util::rt::tokio::TokioExecutor;
use linkerd_error::Error;
use linkerd_http_box::BoxRequest;
use linkerd_io::{self as io, PeerAddr};
@ -34,7 +35,7 @@ pub struct NewServeHttp<X, N> {
pub struct ServeHttp<N> {
version: Variant,
http1: hyper::server::conn::http1::Builder,
http2: hyper::server::conn::http2::Builder<TracingExecutor>,
http2: hyper::server::conn::http2::Builder<TokioExecutor>,
inner: N,
drain: drain::Watch,
}
@ -75,7 +76,7 @@ where
max_pending_accept_reset_streams,
} = h2;
let mut http2 = hyper::server::conn::http2::Builder::new(TracingExecutor);
let mut http2 = hyper::server::conn::http2::Builder::new(TokioExecutor::new());
http2.timer(hyper_util::rt::TokioTimer::new());
match flow_control {
None => {}

View File

@ -1,11 +1,10 @@
use std::vec;
use super::*;
use bytes::Bytes;
use futures::FutureExt;
use http_body_util::BodyExt;
use linkerd_io as io;
use linkerd_stack::CloneParam;
use std::vec;
use tokio::time;
use tower::ServiceExt;
use tower_test::mock;
@ -28,7 +27,7 @@ async fn h2_connection_window_exhaustion() {
h2::ServerParams::default(),
// An HTTP/2 client with constrained connection and stream windows to
// force window exhaustion.
hyper::client::conn::http2::Builder::new(TracingExecutor)
hyper::client::conn::http2::Builder::new(TokioExecutor::new())
.initial_connection_window_size(CLIENT_CONN_WINDOW)
.initial_stream_window_size(CLIENT_STREAM_WINDOW)
.timer(hyper_util::rt::TokioTimer::new()),
@ -102,7 +101,7 @@ async fn h2_stream_window_exhaustion() {
// A basic HTTP/2 server configuration with no overrides.
h2::ServerParams::default(),
// An HTTP/2 client with stream windows to force window exhaustion.
hyper::client::conn::http2::Builder::new(TracingExecutor)
hyper::client::conn::http2::Builder::new(TokioExecutor::new())
.initial_stream_window_size(CLIENT_STREAM_WINDOW)
.timer(hyper_util::rt::TokioTimer::new()),
)
@ -206,7 +205,7 @@ impl TestServer {
#[tracing::instrument(skip_all)]
async fn connect_h2(
h2: h2::ServerParams,
client: &mut hyper::client::conn::http2::Builder<TracingExecutor>,
client: &mut hyper::client::conn::http2::Builder<TokioExecutor>,
) -> Self {
let params = Params {
drain: drain(),

View File

@ -11,7 +11,7 @@ bytes = { workspace = true }
http = { workspace = true }
http-body = { workspace = true }
hyper = { workspace = true, features = ["http1", "http2"] }
hyper-util = { workspace = true, features = ["service"] }
hyper-util = { workspace = true, features = ["service", "tokio", "tracing"] }
futures = { version = "0.3", default-features = false }
ipnet = "2.11"
linkerd2-proxy-api = { workspace = true, features = ["tap"] }

View File

@ -1,11 +1,11 @@
use crate::grpc::Server;
use futures::future;
use hyper_util::rt::tokio::TokioExecutor;
use linkerd2_proxy_api::tap::tap_server::{Tap, TapServer};
use linkerd_conditional::Conditional;
use linkerd_error::Error;
use linkerd_io as io;
use linkerd_meshtls as meshtls;
use linkerd_proxy_http::TracingExecutor;
use linkerd_tls as tls;
use std::{
collections::HashSet,
@ -46,7 +46,7 @@ impl AcceptPermittedClients {
use hyper_util::{rt::TokioIo, service::TowerToHyperService};
let svc = TapServer::new(tap);
Box::pin(async move {
hyper::server::conn::http2::Builder::new(TracingExecutor)
hyper::server::conn::http2::Builder::new(TokioExecutor::new())
.timer(hyper_util::rt::TokioTimer::new())
.serve_connection(TokioIo::new(io), TowerToHyperService::new(svc))
.await