diff --git a/Cargo.lock b/Cargo.lock index 58ac880f7..91c7aaa89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1879,15 +1879,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "linkerd-http-executor" -version = "0.1.0" -dependencies = [ - "hyper", - "tokio", - "tracing", -] - [[package]] name = "linkerd-http-h2" version = "0.1.0" @@ -2424,7 +2415,6 @@ dependencies = [ "linkerd-http-box", "linkerd-http-classify", "linkerd-http-detect", - "linkerd-http-executor", "linkerd-http-h2", "linkerd-http-insert", "linkerd-http-override-authority", diff --git a/Cargo.toml b/Cargo.toml index 145e65ec7..c0c45d3c0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,6 @@ members = [ "linkerd/http/box", "linkerd/http/classify", "linkerd/http/detect", - "linkerd/http/executor", "linkerd/http/h2", "linkerd/http/insert", "linkerd/http/metrics", @@ -111,7 +110,6 @@ h2 = { version = "0.4" } http = { version = "1" } http-body = { version = "1" } hyper = { version = "1", default-features = false } -hyper-util = { version = "0.1", default-features = false } prometheus-client = { version = "0.23" } prost = { version = "0.13" } prost-build = { version = "0.13", default-features = false } @@ -131,5 +129,10 @@ version = "0.1.3" default-features = false features = ["channel"] +[workspace.dependencies.hyper-util] +version = "0.1" +default-features = false +features = ["tokio", "tracing"] + [workspace.dependencies.linkerd2-proxy-api] version = "0.16.0" diff --git a/linkerd/app/inbound/src/http/tests.rs b/linkerd/app/inbound/src/http/tests.rs index 6362d571e..aeda68b4a 100644 --- a/linkerd/app/inbound/src/http/tests.rs +++ b/linkerd/app/inbound/src/http/tests.rs @@ -12,7 +12,7 @@ use linkerd_app_core::{ errors::header::L5D_PROXY_ERROR, identity, io, metrics, proxy::http::{self, BoxBody}, - svc::{self, http::TracingExecutor, NewService, Param}, + svc::{self, http::TokioExecutor, NewService, Param}, tls, transport::{ClientAddr, OrigDstAddr, Remote, ServerAddr}, Error, NameAddr, ProxyRuntime, @@ -95,7 +95,7 @@ async fn downgrade_origin_form() { // Reproduces https://github.com/linkerd/linkerd2/issues/5298 let mut server = hyper::server::conn::http1::Builder::new(); server.timer(hyper_util::rt::TokioTimer::new()); - let mut client = hyper::client::conn::http2::Builder::new(TracingExecutor); + let mut client = hyper::client::conn::http2::Builder::new(TokioExecutor::new()); client.timer(hyper_util::rt::TokioTimer::new()); let _trace = trace_init(); @@ -168,7 +168,7 @@ async fn downgrade_origin_form() { #[tokio::test(flavor = "current_thread")] async fn downgrade_absolute_form() { - let mut client = hyper::client::conn::http2::Builder::new(TracingExecutor); + let mut client = hyper::client::conn::http2::Builder::new(TokioExecutor::new()); client.timer(hyper_util::rt::TokioTimer::new()); let mut server = hyper::server::conn::http1::Builder::new(); server.timer(hyper_util::rt::TokioTimer::new()); @@ -441,7 +441,7 @@ async fn h2_response_meshed_error_header() { let connect = support::connect().endpoint_fn_boxed(Target::addr(), connect_error()); // Build a client using the connect that always errors. - let mut client = hyper::client::conn::http2::Builder::new(TracingExecutor); + let mut client = hyper::client::conn::http2::Builder::new(TokioExecutor::new()); client.timer(hyper_util::rt::TokioTimer::new()); let profiles = profile::resolver(); let profile_tx = @@ -482,7 +482,7 @@ async fn h2_response_unmeshed_error_header() { let connect = support::connect().endpoint_fn_boxed(Target::addr(), connect_error()); // Build a client using the connect that always errors. - let mut client = hyper::client::conn::http2::Builder::new(TracingExecutor); + let mut client = hyper::client::conn::http2::Builder::new(TokioExecutor::new()); client.timer(hyper_util::rt::TokioTimer::new()); let profiles = profile::resolver(); let profile_tx = @@ -525,7 +525,7 @@ async fn grpc_meshed_response_error_header() { let connect = support::connect().endpoint_fn_boxed(Target::addr(), connect_error()); // Build a client using the connect that always errors. - let mut client = hyper::client::conn::http2::Builder::new(TracingExecutor); + let mut client = hyper::client::conn::http2::Builder::new(TokioExecutor::new()); client.timer(hyper_util::rt::TokioTimer::new()); let profiles = profile::resolver(); let profile_tx = @@ -567,7 +567,7 @@ async fn grpc_unmeshed_response_error_header() { let connect = support::connect().endpoint_fn_boxed(Target::addr(), connect_error()); // Build a client using the connect that always errors. - let mut client = hyper::client::conn::http2::Builder::new(TracingExecutor); + let mut client = hyper::client::conn::http2::Builder::new(TokioExecutor::new()); client.timer(hyper_util::rt::TokioTimer::new()); let profiles = profile::resolver(); let profile_tx = @@ -609,7 +609,7 @@ async fn grpc_response_class() { // Build a mock connector serves a gRPC server that returns errors. let connect = { - let mut server = hyper::server::conn::http2::Builder::new(TracingExecutor); + let mut server = hyper::server::conn::http2::Builder::new(TokioExecutor::new()); server.timer(hyper_util::rt::TokioTimer::new()); support::connect().endpoint_fn_boxed( Target::addr(), @@ -618,7 +618,7 @@ async fn grpc_response_class() { }; // Build a client using the connect that always errors. - let mut client = hyper::client::conn::http2::Builder::new(TracingExecutor); + let mut client = hyper::client::conn::http2::Builder::new(TokioExecutor::new()); client.timer(hyper_util::rt::TokioTimer::new()); let profiles = profile::resolver(); let profile_tx = @@ -818,7 +818,7 @@ fn hello_server( #[tracing::instrument] fn grpc_status_server( - server: hyper::server::conn::http2::Builder, + server: hyper::server::conn::http2::Builder, status: tonic::Code, ) -> impl Fn(Remote) -> io::Result { move |endpoint| { diff --git a/linkerd/app/integration/src/client.rs b/linkerd/app/integration/src/client.rs index a70dd5ca7..11d39ad2b 100644 --- a/linkerd/app/integration/src/client.rs +++ b/linkerd/app/integration/src/client.rs @@ -1,6 +1,6 @@ use super::*; use http::{Request, Response}; -use linkerd_app_core::{proxy::http::TracingExecutor, svc::http::BoxBody}; +use linkerd_app_core::{proxy::http::TokioExecutor, svc::http::BoxBody}; use parking_lot::Mutex; use std::io; use tokio::{net::TcpStream, task::JoinHandle}; @@ -272,7 +272,7 @@ fn run( let span = info_span!("test client", peer_addr = %addr, ?version, test = %test_name); let work = async move { - let client = hyper_util::client::legacy::Client::builder(TracingExecutor) + let client = hyper_util::client::legacy::Client::builder(TokioExecutor::new()) .http2_only(http2_only) .build::(conn); tracing::trace!("client task started"); diff --git a/linkerd/app/integration/src/controller.rs b/linkerd/app/integration/src/controller.rs index 1296964dd..82fa6a8ad 100644 --- a/linkerd/app/integration/src/controller.rs +++ b/linkerd/app/integration/src/controller.rs @@ -2,7 +2,7 @@ use super::*; pub use linkerd2_proxy_api::destination as pb; use linkerd2_proxy_api::net; -use linkerd_app_core::proxy::http::TracingExecutor; +use linkerd_app_core::proxy::http::TokioExecutor; use parking_lot::Mutex; use std::collections::VecDeque; use std::net::IpAddr; @@ -372,7 +372,7 @@ where let _ = listening_tx.send(()); } - let mut http = hyper::server::conn::http2::Builder::new(TracingExecutor); + let mut http = hyper::server::conn::http2::Builder::new(TokioExecutor::new()); loop { let (sock, addr) = listener.accept().await?; let span = tracing::debug_span!("conn", %addr).or_current(); diff --git a/linkerd/app/integration/src/server.rs b/linkerd/app/integration/src/server.rs index bf05f89f0..0099e84ef 100644 --- a/linkerd/app/integration/src/server.rs +++ b/linkerd/app/integration/src/server.rs @@ -1,4 +1,4 @@ -use super::app_core::svc::http::TracingExecutor; +use super::app_core::svc::http::TokioExecutor; use super::*; use http::{Request, Response}; use linkerd_app_core::svc::http::BoxBody; @@ -216,11 +216,13 @@ impl Server { .serve_connection(sock, svc) .await .map_err(|e| tracing::error!("support/server error: {}", e)), - Run::Http2 => hyper::server::conn::http2::Builder::new(TracingExecutor) - .timer(hyper_util::rt::TokioTimer::new()) - .serve_connection(sock, svc) - .await - .map_err(|e| tracing::error!("support/server error: {}", e)), + Run::Http2 => { + hyper::server::conn::http2::Builder::new(TokioExecutor::new()) + .timer(hyper_util::rt::TokioTimer::new()) + .serve_connection(sock, svc) + .await + .map_err(|e| tracing::error!("support/server error: {}", e)) + } }; tracing::trace!(?result, "serve done"); result diff --git a/linkerd/app/integration/src/tests/transparency.rs b/linkerd/app/integration/src/tests/transparency.rs index d64d7f327..45845b29c 100644 --- a/linkerd/app/integration/src/tests/transparency.rs +++ b/linkerd/app/integration/src/tests/transparency.rs @@ -1,5 +1,5 @@ use crate::*; -use linkerd_app_core::svc::http::{BoxBody, TracingExecutor}; +use linkerd_app_core::svc::http::{BoxBody, TokioExecutor}; use std::error::Error as _; use tokio::time::timeout; @@ -1605,7 +1605,7 @@ async fn http2_request_without_authority() { let io = tokio::net::TcpStream::connect(&addr) .await .expect("connect error"); - let (mut client, conn) = hyper::client::conn::http2::Builder::new(TracingExecutor) + let (mut client, conn) = hyper::client::conn::http2::Builder::new(TokioExecutor::new()) .timer(hyper_util::rt::TokioTimer::new()) .handshake(hyper_util::rt::TokioIo::new(io)) .await diff --git a/linkerd/app/outbound/src/http/endpoint/tests.rs b/linkerd/app/outbound/src/http/endpoint/tests.rs index 272685081..a65db5002 100644 --- a/linkerd/app/outbound/src/http/endpoint/tests.rs +++ b/linkerd/app/outbound/src/http/endpoint/tests.rs @@ -5,7 +5,7 @@ use linkerd_app_core::metrics::OutboundZoneLocality; use linkerd_app_core::{ io, proxy::api_resolve::ProtocolHint, - svc::{http::TracingExecutor, NewService, ServiceExt}, + svc::{http::TokioExecutor, NewService, ServiceExt}, Infallible, }; use linkerd_http_box::BoxBody; @@ -250,7 +250,7 @@ fn serve(version: ::http::Version) -> io::Result { tokio::spawn(fut); } ::http::Version::HTTP_2 => { - let mut http = hyper::server::conn::http2::Builder::new(TracingExecutor); + let mut http = hyper::server::conn::http2::Builder::new(TokioExecutor::new()); let fut = http .timer(hyper_util::rt::TokioTimer::new()) .serve_connection(hyper_util::rt::TokioIo::new(server_io), svc); diff --git a/linkerd/app/test/src/http_util.rs b/linkerd/app/test/src/http_util.rs index a5213b3f3..b728644b6 100644 --- a/linkerd/app/test/src/http_util.rs +++ b/linkerd/app/test/src/http_util.rs @@ -1,6 +1,6 @@ use crate::{ app_core::{ - svc::{self, http::TracingExecutor}, + svc::{self, http::TokioExecutor}, Error, }, io, ContextError, @@ -71,7 +71,7 @@ where /// /// [send]: hyper::client::conn::http2::SendRequest pub async fn connect_and_accept_http2( - client_settings: &mut hyper::client::conn::http2::Builder, + client_settings: &mut hyper::client::conn::http2::Builder, server: BoxServer, ) -> ( hyper::client::conn::http2::SendRequest, diff --git a/linkerd/http/executor/Cargo.toml b/linkerd/http/executor/Cargo.toml deleted file mode 100644 index f985c9c55..000000000 --- a/linkerd/http/executor/Cargo.toml +++ /dev/null @@ -1,15 +0,0 @@ -[package] -name = "linkerd-http-executor" -version = { workspace = true } -authors = { workspace = true } -license = { workspace = true } -edition = { workspace = true } -publish = { workspace = true } -description = """ -HTTP runtime components for Linkerd. -""" - -[dependencies] -hyper = { workspace = true } -tokio = { version = "1", features = ["rt"] } -tracing = "0.1" diff --git a/linkerd/http/executor/src/lib.rs b/linkerd/http/executor/src/lib.rs deleted file mode 100644 index efb502a43..000000000 --- a/linkerd/http/executor/src/lib.rs +++ /dev/null @@ -1,20 +0,0 @@ -//! HTTP runtime components for Linkerd. - -use hyper::rt::Executor; -use std::future::Future; -use tracing::instrument::Instrument; - -/// An [`Executor`] that propagates [`tracing`] spans. -#[derive(Clone, Debug, Default)] -pub struct TracingExecutor; - -impl Executor for TracingExecutor -where - F: Future + Send + 'static, - F::Output: Send + 'static, -{ - #[inline] - fn execute(&self, f: F) { - tokio::spawn(f.in_current_span()); - } -} diff --git a/linkerd/proxy/http/Cargo.toml b/linkerd/proxy/http/Cargo.toml index 8d29a0a41..cb04df260 100644 --- a/linkerd/proxy/http/Cargo.toml +++ b/linkerd/proxy/http/Cargo.toml @@ -32,6 +32,8 @@ hyper-util = { workspace = true, default-features = false, features = [ "client-legacy", "http1", "service", + "tokio", + "tracing", ] } parking_lot = "0.12" pin-project = "1" @@ -47,7 +49,6 @@ linkerd-error = { path = "../../error" } linkerd-http-box = { path = "../../http/box" } linkerd-http-classify = { path = "../../http/classify" } linkerd-http-detect = { path = "../../http/detect" } -linkerd-http-executor = { path = "../../http/executor" } linkerd-http-h2 = { path = "../../http/h2" } linkerd-http-insert = { path = "../../http/insert" } linkerd-http-override-authority = { path = "../../http/override-authority" } diff --git a/linkerd/proxy/http/src/h1.rs b/linkerd/proxy/http/src/h1.rs index b874caa51..9f51ac9f1 100644 --- a/linkerd/proxy/http/src/h1.rs +++ b/linkerd/proxy/http/src/h1.rs @@ -1,4 +1,4 @@ -use crate::TracingExecutor; +use crate::TokioExecutor; use futures::prelude::*; use http::{ header::{CONTENT_LENGTH, TRANSFER_ENCODING}, @@ -91,7 +91,7 @@ where // ish, so we just build a one-off client for the connection. // There's no real reason to hold the client for re-use. debug!(use_absolute_form, is_missing_host, "Using one-off client"); - hyper_util::client::legacy::Client::builder(TracingExecutor) + hyper_util::client::legacy::Client::builder(TokioExecutor::new()) .pool_max_idle_per_host(0) .set_host(use_absolute_form) .build(HyperConnect::new( @@ -116,7 +116,7 @@ where if client.is_none() { debug!(use_absolute_form, "Caching new client"); *client = Some( - hyper_util::client::legacy::Client::builder(TracingExecutor) + hyper_util::client::legacy::Client::builder(TokioExecutor::new()) .pool_max_idle_per_host(self.pool.max_idle) .pool_idle_timeout(self.pool.idle_timeout) .set_host(use_absolute_form) diff --git a/linkerd/proxy/http/src/h2.rs b/linkerd/proxy/http/src/h2.rs index 5b7b3f380..62c0ed58d 100644 --- a/linkerd/proxy/http/src/h2.rs +++ b/linkerd/proxy/http/src/h2.rs @@ -1,4 +1,4 @@ -use crate::{Body, TracingExecutor}; +use crate::{Body, TokioExecutor}; use futures::prelude::*; use linkerd_error::{Error, Result}; use linkerd_stack::{MakeConnection, Service}; @@ -85,7 +85,7 @@ where Box::pin( async move { let (io, _meta) = connect.err_into::().await?; - let mut builder = hyper::client::conn::http2::Builder::new(TracingExecutor); + let mut builder = hyper::client::conn::http2::Builder::new(TokioExecutor::new()); builder.timer(hyper_util::rt::TokioTimer::new()); match flow_control { None => {} diff --git a/linkerd/proxy/http/src/lib.rs b/linkerd/proxy/http/src/lib.rs index b4dc00db1..fc24fe5ef 100644 --- a/linkerd/proxy/http/src/lib.rs +++ b/linkerd/proxy/http/src/lib.rs @@ -34,12 +34,12 @@ pub use http::{ uri, Method, Request, Response, StatusCode, }; pub use http_body::Body; +pub use hyper_util::rt::tokio::TokioExecutor; pub use linkerd_http_box::{BoxBody, BoxRequest, BoxResponse, EraseResponse}; pub use linkerd_http_classify as classify; pub use linkerd_http_detect::{ DetectMetrics, DetectMetricsFamilies, DetectParams, Detection, NewDetect, }; -pub use linkerd_http_executor::TracingExecutor; pub use linkerd_http_insert as insert; pub use linkerd_http_override_authority::{AuthorityOverride, NewOverrideAuthority}; pub use linkerd_http_retain::{self as retain, Retain}; diff --git a/linkerd/proxy/http/src/server.rs b/linkerd/proxy/http/src/server.rs index f0bfb5e1c..32c1e98c3 100644 --- a/linkerd/proxy/http/src/server.rs +++ b/linkerd/proxy/http/src/server.rs @@ -1,4 +1,5 @@ -use crate::{client_handle::SetClientHandle, h2, BoxBody, ClientHandle, TracingExecutor, Variant}; +use crate::{client_handle::SetClientHandle, h2, BoxBody, ClientHandle, Variant}; +use hyper_util::rt::tokio::TokioExecutor; use linkerd_error::Error; use linkerd_http_box::BoxRequest; use linkerd_io::{self as io, PeerAddr}; @@ -34,7 +35,7 @@ pub struct NewServeHttp { pub struct ServeHttp { version: Variant, http1: hyper::server::conn::http1::Builder, - http2: hyper::server::conn::http2::Builder, + http2: hyper::server::conn::http2::Builder, inner: N, drain: drain::Watch, } @@ -75,7 +76,7 @@ where max_pending_accept_reset_streams, } = h2; - let mut http2 = hyper::server::conn::http2::Builder::new(TracingExecutor); + let mut http2 = hyper::server::conn::http2::Builder::new(TokioExecutor::new()); http2.timer(hyper_util::rt::TokioTimer::new()); match flow_control { None => {} diff --git a/linkerd/proxy/http/src/server/tests.rs b/linkerd/proxy/http/src/server/tests.rs index 2da4872d6..a7b806144 100644 --- a/linkerd/proxy/http/src/server/tests.rs +++ b/linkerd/proxy/http/src/server/tests.rs @@ -1,11 +1,10 @@ -use std::vec; - use super::*; use bytes::Bytes; use futures::FutureExt; use http_body_util::BodyExt; use linkerd_io as io; use linkerd_stack::CloneParam; +use std::vec; use tokio::time; use tower::ServiceExt; use tower_test::mock; @@ -28,7 +27,7 @@ async fn h2_connection_window_exhaustion() { h2::ServerParams::default(), // An HTTP/2 client with constrained connection and stream windows to // force window exhaustion. - hyper::client::conn::http2::Builder::new(TracingExecutor) + hyper::client::conn::http2::Builder::new(TokioExecutor::new()) .initial_connection_window_size(CLIENT_CONN_WINDOW) .initial_stream_window_size(CLIENT_STREAM_WINDOW) .timer(hyper_util::rt::TokioTimer::new()), @@ -102,7 +101,7 @@ async fn h2_stream_window_exhaustion() { // A basic HTTP/2 server configuration with no overrides. h2::ServerParams::default(), // An HTTP/2 client with stream windows to force window exhaustion. - hyper::client::conn::http2::Builder::new(TracingExecutor) + hyper::client::conn::http2::Builder::new(TokioExecutor::new()) .initial_stream_window_size(CLIENT_STREAM_WINDOW) .timer(hyper_util::rt::TokioTimer::new()), ) @@ -206,7 +205,7 @@ impl TestServer { #[tracing::instrument(skip_all)] async fn connect_h2( h2: h2::ServerParams, - client: &mut hyper::client::conn::http2::Builder, + client: &mut hyper::client::conn::http2::Builder, ) -> Self { let params = Params { drain: drain(), diff --git a/linkerd/proxy/tap/Cargo.toml b/linkerd/proxy/tap/Cargo.toml index 2e12230c4..6c45368b8 100644 --- a/linkerd/proxy/tap/Cargo.toml +++ b/linkerd/proxy/tap/Cargo.toml @@ -11,7 +11,7 @@ bytes = { workspace = true } http = { workspace = true } http-body = { workspace = true } hyper = { workspace = true, features = ["http1", "http2"] } -hyper-util = { workspace = true, features = ["service"] } +hyper-util = { workspace = true, features = ["service", "tokio", "tracing"] } futures = { version = "0.3", default-features = false } ipnet = "2.11" linkerd2-proxy-api = { workspace = true, features = ["tap"] } diff --git a/linkerd/proxy/tap/src/accept.rs b/linkerd/proxy/tap/src/accept.rs index 16a919c2d..38e61b4d1 100644 --- a/linkerd/proxy/tap/src/accept.rs +++ b/linkerd/proxy/tap/src/accept.rs @@ -1,11 +1,11 @@ use crate::grpc::Server; use futures::future; +use hyper_util::rt::tokio::TokioExecutor; use linkerd2_proxy_api::tap::tap_server::{Tap, TapServer}; use linkerd_conditional::Conditional; use linkerd_error::Error; use linkerd_io as io; use linkerd_meshtls as meshtls; -use linkerd_proxy_http::TracingExecutor; use linkerd_tls as tls; use std::{ collections::HashSet, @@ -46,7 +46,7 @@ impl AcceptPermittedClients { use hyper_util::{rt::TokioIo, service::TowerToHyperService}; let svc = TapServer::new(tap); Box::pin(async move { - hyper::server::conn::http2::Builder::new(TracingExecutor) + hyper::server::conn::http2::Builder::new(TokioExecutor::new()) .timer(hyper_util::rt::TokioTimer::new()) .serve_connection(TokioIo::new(io), TowerToHyperService::new(svc)) .await