refactor(http): consolidate HTTP protocol detection (#3720)

Linkerd's HTTP protocol detection logic is spread across a few crates: the
linkerd-detect crate is generic over the actual protocol detection logic, and
the linkerd-proxy-http crate provides an implementation. There are no other
implemetations of the Detect interface. This leads to gnarly type signatures in
the form `Result<Option<http::Variant>, DetectTimeoutError>`: simultaneously
verbose and not particularly informative (what does the None case mean exactly).

This commit introduces a new crate, `linkerd-http-detect`, consolidating this
logic and removes the prior implementations. The admin, inbound, and outbound
stacks are updated to use these new types. This work is done in anticipation of
introducing metrics that report HTTP detection behavior.

There are no functional changes.
This commit is contained in:
Oliver Gould 2025-03-09 08:27:25 -07:00 committed by GitHub
parent 114ee8d878
commit c87d202098
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 445 additions and 480 deletions

View File

@ -1350,7 +1350,6 @@ dependencies = [
"ipnet",
"linkerd-addr",
"linkerd-conditional",
"linkerd-detect",
"linkerd-dns",
"linkerd-duplex",
"linkerd-errno",
@ -1578,21 +1577,6 @@ dependencies = [
name = "linkerd-conditional"
version = "0.1.0"
[[package]]
name = "linkerd-detect"
version = "0.1.0"
dependencies = [
"async-trait",
"bytes",
"linkerd-error",
"linkerd-io",
"linkerd-stack",
"thiserror 2.0.11",
"tokio",
"tower",
"tracing",
]
[[package]]
name = "linkerd-distribute"
version = "0.1.0"
@ -1730,6 +1714,23 @@ dependencies = [
"tracing",
]
[[package]]
name = "linkerd-http-detect"
version = "0.1.0"
dependencies = [
"bytes",
"httparse",
"linkerd-error",
"linkerd-http-variant",
"linkerd-io",
"linkerd-stack",
"linkerd-tracing",
"thiserror 2.0.11",
"tokio",
"tokio-test",
"tracing",
]
[[package]]
name = "linkerd-http-executor"
version = "0.1.0"
@ -2265,11 +2266,11 @@ dependencies = [
"httparse",
"hyper",
"hyper-balance",
"linkerd-detect",
"linkerd-duplex",
"linkerd-error",
"linkerd-http-box",
"linkerd-http-classify",
"linkerd-http-detect",
"linkerd-http-executor",
"linkerd-http-h2",
"linkerd-http-insert",

View File

@ -16,7 +16,6 @@ members = [
"linkerd/app",
"linkerd/conditional",
"linkerd/distribute",
"linkerd/detect",
"linkerd/dns/name",
"linkerd/dns",
"linkerd/duplex",
@ -28,6 +27,7 @@ members = [
"linkerd/http/body-compat",
"linkerd/http/box",
"linkerd/http/classify",
"linkerd/http/detect",
"linkerd/http/executor",
"linkerd/http/h2",
"linkerd/http/insert",

View File

@ -1,7 +1,7 @@
use linkerd_app_core::{
classify,
config::ServerConfig,
detect, drain, errors, identity,
drain, errors, identity,
metrics::{self, FmtMetrics},
proxy::http,
serve,
@ -136,11 +136,11 @@ impl Config {
}))
.push_filter(
|(http, tcp): (
Result<Option<http::Variant>, detect::DetectTimeoutError<_>>,
http::Detection,
Tcp,
)| {
match http {
Ok(Some(version)) => Ok(Http { version, tcp }),
http::Detection::Http(version) => Ok(Http { version, tcp }),
// If detection timed out, we can make an educated guess at the proper
// behavior:
// - If the connection was meshed, it was most likely transported over
@ -148,7 +148,7 @@ impl Config {
// - If the connection was unmeshed, it was mostly likely HTTP/1.
// - If we received some unexpected SNI, the client is mostly likely
// confused/stale.
Err(_timeout) => {
http::Detection::ReadTimeout(_timeout) => {
let version = match tcp.tls {
tls::ConditionalServerTls::None(_) => http::Variant::Http1,
tls::ConditionalServerTls::Some(tls::ServerTls::Established {
@ -166,7 +166,7 @@ impl Config {
}
// If the connection failed HTTP detection, check if we detected TLS for
// another target. This might indicate that the client is confused/stale.
Ok(None) => match tcp.tls {
http::Detection::Empty | http::Detection::NotHttp => match tcp.tls {
tls::ConditionalServerTls::Some(tls::ServerTls::Passthru { sni }) => {
Err(UnexpectedSni(sni, tcp.client).into())
}
@ -177,8 +177,8 @@ impl Config {
)
.arc_new_tcp()
.lift_new_with_target()
.push(detect::NewDetectService::layer(svc::stack::CloneParam::from(
detect::Config::<http::DetectHttp>::from_timeout(DETECT_TIMEOUT),
.push(http::NewDetect::layer(svc::stack::CloneParam::from(
http::DetectParams { read_timeout: DETECT_TIMEOUT }
)))
.push(transport::metrics::NewServer::layer(metrics.proxy.transport))
.push_map_target(move |(tls, addrs): (tls::ConditionalServerTls, B::Addrs)| {

View File

@ -34,7 +34,6 @@ pin-project = "1"
linkerd-addr = { path = "../../addr" }
linkerd-conditional = { path = "../../conditional" }
linkerd-dns = { path = "../../dns" }
linkerd-detect = { path = "../../detect" }
linkerd-duplex = { path = "../../duplex" }
linkerd-errno = { path = "../../errno" }
linkerd-error = { path = "../../error" }

View File

@ -1,7 +1,7 @@
pub use crate::exp_backoff::ExponentialBackoff;
use crate::{
proxy::http::{self, h1, h2},
svc::{queue, CloneParam, ExtractParam, Param},
proxy::http::{h1, h2},
svc::{queue, ExtractParam, Param},
transport::{DualListenAddr, Keepalive, ListenAddr, UserTimeout},
};
use std::time::Duration;
@ -59,14 +59,6 @@ impl<T> ExtractParam<queue::Timeout, T> for QueueConfig {
}
}
// === impl ProxyConfig ===
impl ProxyConfig {
pub fn detect_http(&self) -> CloneParam<linkerd_detect::Config<http::DetectHttp>> {
linkerd_detect::Config::from_timeout(self.detect_protocol_timeout).into()
}
}
// === impl ServerConfig ===
impl Param<DualListenAddr> for ServerConfig {

View File

@ -32,7 +32,6 @@ pub use drain;
pub use ipnet::{IpNet, Ipv4Net, Ipv6Net};
pub use linkerd_addr::{self as addr, Addr, AddrMatch, IpMatch, NameAddr, NameMatch};
pub use linkerd_conditional::Conditional;
pub use linkerd_detect as detect;
pub use linkerd_dns;
pub use linkerd_error::{cause_ref, is_caused_by, Error, Infallible, Recover, Result};
pub use linkerd_exp_backoff as exp_backoff;

View File

@ -3,7 +3,7 @@ use crate::{
Inbound,
};
use linkerd_app_core::{
detect, identity, io,
identity, io,
metrics::ServerLabel,
proxy::http,
svc, tls,
@ -48,9 +48,6 @@ struct Detect {
tls: Tls,
}
#[derive(Copy, Clone, Debug)]
struct ConfigureHttpDetect;
#[derive(Clone)]
struct TlsParams {
timeout: tls::server::Timeout,
@ -111,42 +108,58 @@ impl Inbound<svc::ArcNewTcp<Http, io::BoxedIo>> {
.push_switch(
|(detected, Detect { tls, .. })| -> Result<_, Infallible> {
match detected {
Ok(Some(http)) => Ok(svc::Either::A(Http { http, tls })),
Ok(None) => Ok(svc::Either::B(tls)),
http::Detection::Http(http) => Ok(svc::Either::A(Http { http, tls })),
http::Detection::Empty | http::Detection::NotHttp => {
Ok(svc::Either::B(tls))
}
// When HTTP detection fails, forward the connection to the application as
// an opaque TCP stream.
Err(timeout) => match tls.policy.protocol() {
Protocol::Http1 { .. } => {
// If the protocol was hinted to be HTTP/1.1 but detection
// failed, we'll usually be handling HTTP/1, but we may actually
// be handling HTTP/2 via protocol upgrade. Our options are:
// handle the connection as HTTP/1, assuming it will be rare for
// a proxy to initiate TLS, etc and not send the 16B of
// connection header; or we can handle it as opaque--but there's
// no chance the server will be able to handle the H2 protocol
// upgrade. So, it seems best to assume it's HTTP/1 and let the
// proxy handle the protocol error if we're in an edge case.
info!(%timeout, "Handling connection as HTTP/1 due to policy");
Ok(svc::Either::A(Http {
http: http::Variant::Http1,
tls,
}))
http::Detection::ReadTimeout(timeout) => {
match tls.policy.protocol() {
Protocol::Http1 { .. } => {
// If the protocol was hinted to be HTTP/1.1 but detection
// failed, we'll usually be handling HTTP/1, but we may actually
// be handling HTTP/2 via protocol upgrade. Our options are:
// handle the connection as HTTP/1, assuming it will be rare for
// a proxy to initiate TLS, etc and not send the 16B of
// connection header; or we can handle it as opaque--but there's
// no chance the server will be able to handle the H2 protocol
// upgrade. So, it seems best to assume it's HTTP/1 and let the
// proxy handle the protocol error if we're in an edge case.
info!(
?timeout,
"Handling connection as HTTP/1 due to policy"
);
Ok(svc::Either::A(Http {
http: http::Variant::Http1,
tls,
}))
}
// Otherwise, the protocol hint must have
// been `Detect` or the protocol was updated
// after detection was initiated, otherwise
// we would have avoided detection below.
// Continue handling the connection as if it
// were opaque.
_ => {
info!(
?timeout,
"Handling connection as opaque due to policy"
);
Ok(svc::Either::B(tls))
}
}
// Otherwise, the protocol hint must have been `Detect` or the
// protocol was updated after detection was initiated, otherwise we
// would have avoided detection below. Continue handling the
// connection as if it were opaque.
_ => {
info!(%timeout, "Handling connection as opaque");
Ok(svc::Either::B(tls))
}
},
}
}
},
forward.into_inner(),
)
.lift_new_with_target()
.push(detect::NewDetectService::layer(ConfigureHttpDetect))
.push(http::NewDetect::layer(|Detect { timeout, .. }: &Detect| {
http::DetectParams {
read_timeout: *timeout,
}
}))
.arc_new_tcp();
http.push_on_service(svc::MapTargetLayer::new(io::BoxedIo::new))
@ -332,14 +345,6 @@ impl svc::Param<tls::ConditionalServerTls> for Tls {
}
}
// === impl ConfigureHttpDetect ===
impl svc::ExtractParam<detect::Config<http::DetectHttp>, Detect> for ConfigureHttpDetect {
fn extract_param(&self, detect: &Detect) -> detect::Config<http::DetectHttp> {
detect::Config::from_timeout(detect.timeout)
}
}
// === impl Http ===
impl svc::Param<http::Variant> for Http {

View File

@ -1,6 +1,6 @@
use crate::{http, opaq, policy, Discovery, Outbound, ParentRef};
use linkerd_app_core::{
detect, errors, io, profiles,
errors, io, profiles,
proxy::{
api_resolve::{ConcreteAddr, Metadata},
core::Resolve,
@ -274,8 +274,6 @@ impl<N> Outbound<N> {
NSvc::Future: Send,
{
self.map_stack(|config, rt, inner| {
let detect_http = config.proxy.detect_http();
// Route requests with destinations that can be discovered via the
// `l5d-dst-override` header through the (load balanced) logical
// stack. Route requests without the header through the endpoint
@ -292,10 +290,12 @@ impl<N> Outbound<N> {
.push_on_service(
svc::layers()
.push(http::BoxRequest::layer())
.push(http::strip_header::request::layer(DST_OVERRIDE_HEADER))
.push(http::strip_header::request::layer(DST_OVERRIDE_HEADER)),
)
.lift_new()
.push(svc::NewOneshotRoute::layer_via(|t: &Http<T>| SelectTarget(t.clone())))
.push(svc::NewOneshotRoute::layer_via(|t: &Http<T>| {
SelectTarget(t.clone())
}))
.check_new_service::<Http<T>, http::Request<_>>();
// HTTP detection is **always** performed. If detection fails, then we
@ -307,26 +307,33 @@ impl<N> Outbound<N> {
let h2 = config.proxy.server.http2.clone();
let drain = rt.drain.clone();
move |http: &Http<T>| http::ServerParams {
version: http.version,
http2: h2.clone(),
drain: drain.clone()
version: http.version,
http2: h2.clone(),
drain: drain.clone(),
}
}))
.check_new_service::<Http<T>, I>()
.push_switch(
|(detected, target): (detect::Result<http::Variant>, T)| -> Result<_, Infallible> {
if let Some(version) = detect::allow_timeout(detected) {
return Ok(svc::Either::A(Http {
version,
parent: target,
}));
|(detected, parent): (http::Detection, T)| -> Result<_, Infallible> {
match detected {
http::Detection::Http(version) => {
return Ok(svc::Either::A(Http { version, parent }));
}
http::Detection::ReadTimeout(timeout) => {
tracing::info!("Continuing after timeout: {timeout:?}");
}
_ => {}
}
Ok(svc::Either::B(target))
Ok(svc::Either::B(parent))
},
fallback,
)
.lift_new_with_target()
.push(detect::NewDetectService::layer(detect_http))
.push(http::NewDetect::layer(svc::CloneParam::from(
http::DetectParams {
read_timeout: config.proxy.detect_protocol_timeout,
},
)))
.arc_new_tcp()
})
}

View File

@ -1,5 +1,5 @@
use crate::{http, Outbound};
use linkerd_app_core::{detect, io, svc, Error, Infallible};
use linkerd_app_core::{io, svc, Error, Infallible};
use std::{fmt::Debug, hash::Hash};
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
@ -64,11 +64,17 @@ impl<N> Outbound<N> {
let detect = http.clone().map_stack(|config, _, http| {
http.push_switch(
|(result, parent): (detect::Result<http::Variant>, T)| -> Result<_, Infallible> {
Ok(match detect::allow_timeout(result) {
Some(version) => svc::Either::A(Http { version, parent }),
None => svc::Either::B(parent),
})
|(detected, parent): (http::Detection, T)| -> Result<_, Infallible> {
match detected {
http::Detection::Http(version) => {
return Ok(svc::Either::A(Http { version, parent }));
}
http::Detection::ReadTimeout(timeout) => {
tracing::info!("Continuing after timeout: {timeout:?}");
}
_ => {}
}
Ok(svc::Either::B(parent))
},
opaq.clone().into_inner(),
)
@ -77,8 +83,12 @@ impl<N> Outbound<N> {
// unexpected reason) the inner service is not ready.
.push_on_service(svc::LoadShed::layer())
.push_on_service(svc::MapTargetLayer::new(io::EitherIo::Right))
.lift_new_with_target::<(detect::Result<http::Variant>, T)>()
.push(detect::NewDetectService::layer(config.proxy.detect_http()))
.lift_new_with_target::<(http::Detection, T)>()
.push(http::NewDetect::layer(svc::CloneParam::from(
http::DetectParams {
read_timeout: config.proxy.detect_protocol_timeout,
},
)))
.arc_new_tcp()
});

View File

@ -1,18 +0,0 @@
[package]
name = "linkerd-detect"
version = "0.1.0"
authors = ["Linkerd Developers <cncf-linkerd-dev@lists.cncf.io>"]
license = "Apache-2.0"
edition = "2021"
publish = false
[dependencies]
async-trait = "0.1"
bytes = { workspace = true }
linkerd-error = { path = "../error" }
linkerd-io = { path = "../io" }
linkerd-stack = { path = "../stack" }
thiserror = "2"
tokio = { version = "1", features = ["time"] }
tower = { workspace = true }
tracing = "0.1"

View File

@ -1,187 +0,0 @@
#![deny(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)]
#![forbid(unsafe_code)]
use bytes::BytesMut;
use linkerd_error::Error;
use linkerd_io as io;
use linkerd_stack::{layer, ExtractParam, NewService};
use std::{
fmt,
future::Future,
pin::Pin,
result::Result as StdResult,
task::{Context, Poll},
};
use thiserror::Error;
use tokio::time;
use tower::util::ServiceExt;
use tracing::{debug, info, trace};
#[async_trait::async_trait]
pub trait Detect<I>: Clone + Send + Sync + 'static {
type Protocol: Send;
async fn detect(
&self,
io: &mut I,
buf: &mut BytesMut,
) -> StdResult<Option<Self::Protocol>, Error>;
}
pub type Result<P> = StdResult<Option<P>, DetectTimeoutError<P>>;
#[derive(Error)]
#[error("{typ} protocol detection timed out after {0:?}", typ=std::any::type_name::<P>())]
pub struct DetectTimeoutError<P>(time::Duration, std::marker::PhantomData<P>);
#[derive(Copy, Clone, Debug)]
pub struct Config<D> {
pub detect: D,
pub capacity: usize,
pub timeout: time::Duration,
}
#[derive(Copy, Clone, Debug)]
pub struct NewDetectService<P, D, N> {
inner: N,
params: P,
_detect: std::marker::PhantomData<fn() -> D>,
}
#[derive(Copy, Clone, Debug)]
pub struct DetectService<D, N> {
config: Config<D>,
inner: N,
}
pub fn allow_timeout<P>(p: Result<P>) -> Option<P> {
match p {
Ok(p) => p,
Err(e) => {
info!("Continuing after timeout: {}", e);
None
}
}
}
// === impl Config ===
impl<D: Default> Config<D> {
const DEFAULT_CAPACITY: usize = 1024;
pub fn from_timeout(timeout: time::Duration) -> Self {
Self {
detect: D::default(),
capacity: Self::DEFAULT_CAPACITY,
timeout,
}
}
}
// === impl NewDetectService ===
impl<P, D, N> NewDetectService<P, D, N> {
pub fn new(params: P, inner: N) -> Self {
Self {
inner,
params,
_detect: std::marker::PhantomData,
}
}
pub fn layer(params: P) -> impl layer::Layer<N, Service = Self> + Clone
where
P: Clone,
{
layer::mk(move |inner| Self::new(params.clone(), inner))
}
}
impl<T, P, D, N> NewService<T> for NewDetectService<P, D, N>
where
P: ExtractParam<Config<D>, T>,
N: NewService<T>,
{
type Service = DetectService<D, N::Service>;
fn new_service(&self, target: T) -> Self::Service {
let config = self.params.extract_param(&target);
DetectService {
config,
inner: self.inner.new_service(target),
}
}
}
// === impl DetectService ===
impl<I, D, N, NSvc> tower::Service<I> for DetectService<D, N>
where
I: Send + 'static,
D: Detect<I>,
D::Protocol: std::fmt::Debug,
N: NewService<Result<D::Protocol>, Service = NSvc> + Clone + Send + 'static,
NSvc: tower::Service<io::PrefixedIo<I>, Response = ()> + Send,
NSvc::Error: Into<Error>,
NSvc::Future: Send,
{
type Response = ();
type Error = Error;
type Future = Pin<Box<dyn Future<Output = StdResult<(), Error>> + Send + 'static>>;
#[inline]
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<StdResult<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, mut io: I) -> Self::Future {
let Config {
detect,
capacity,
timeout,
} = self.config.clone();
let inner = self.inner.clone();
Box::pin(async move {
trace!(%capacity, ?timeout, "Starting protocol detection");
let t0 = time::Instant::now();
let mut buf = BytesMut::with_capacity(capacity);
let detected = match time::timeout(timeout, detect.detect(&mut io, &mut buf)).await {
Ok(Ok(protocol)) => {
debug!(
?protocol,
elapsed = ?time::Instant::now().saturating_duration_since(t0),
"Detected protocol",
);
Ok(protocol)
}
Err(_) => Err(DetectTimeoutError(timeout, std::marker::PhantomData)),
Ok(Err(e)) => return Err(e),
};
trace!("Dispatching connection");
let svc = inner.new_service(detected);
let mut svc = svc.ready_oneshot().await.map_err(Into::into)?;
svc.call(io::PrefixedIo::new(buf.freeze(), io))
.await
.map_err(Into::into)?;
trace!("Connection completed");
// Hold the service until it's done being used so that cache
// idleness is reset.
drop(svc);
Ok(())
})
}
}
// === impl DetectTimeoutError ===
impl<P> fmt::Debug for DetectTimeoutError<P> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple(std::any::type_name::<Self>())
.field(&self.0)
.finish()
}
}

View File

@ -0,0 +1,22 @@
[package]
name = "linkerd-http-detect"
version = "0.1.0"
edition = "2021"
publish = false
[dependencies]
bytes = { workspace = true }
httparse = "1"
thiserror = "2"
tokio = { version = "1", features = ["time"] }
tracing = { version = "0.1" }
linkerd-error = { path = "../../error" }
linkerd-http-variant = { path = "../variant" }
linkerd-io = { path = "../../io" }
linkerd-stack = { path = "../../stack" }
[dev-dependencies]
tokio-test = "0.4"
linkerd-tracing = { path = "../../tracing", features = ["ansi"] }

View File

@ -0,0 +1,307 @@
use bytes::BytesMut;
use linkerd_error::{Error, Result};
use linkerd_http_variant::Variant;
use linkerd_io::{self as io, AsyncReadExt};
use linkerd_stack::{self as svc, ServiceExt};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio::time;
use tracing::{debug, trace};
#[derive(Copy, Clone, Debug)]
pub struct DetectParams {
pub read_timeout: time::Duration,
}
#[derive(Debug, Clone)]
pub enum Detection {
Empty,
NotHttp,
Http(Variant),
ReadTimeout(time::Duration),
}
/// Attempts to detect the HTTP version of a stream.
///
/// This module biases towards availability instead of correctness. I.e. instead
/// of buffering until we can be sure that we're dealing with an HTTP stream, we
/// instead perform only a single read and use that data to inform protocol
/// hinting. If a single read doesn't provide enough data to make a decision, we
/// treat the protocol as unknown.
///
/// This allows us to interoperate with protocols that send very small initial
/// messages. In rare situations, we may fail to properly detect that a stream is
/// HTTP.
#[derive(Copy, Clone, Debug)]
pub struct Detect<N> {
params: DetectParams,
inner: N,
}
#[derive(Copy, Clone, Debug)]
pub struct NewDetect<P, N> {
inner: N,
params: P,
}
#[derive(Debug, thiserror::Error)]
#[error("read timed out after {0:?}")]
pub struct ReadTimeoutError(pub time::Duration);
// Coincidentally, both our abbreviated H2 preface and our smallest possible
// HTTP/1 message are 14 bytes.
const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0";
const SMALLEST_POSSIBLE_HTTP1_REQ: &str = "GET / HTTP/1.1";
const READ_CAPACITY: usize = 1024;
// === impl NewDetect ===
impl<P, N> NewDetect<P, N> {
pub fn new(params: P, inner: N) -> Self {
Self { inner, params }
}
pub fn layer(params: P) -> impl svc::layer::Layer<N, Service = Self> + Clone
where
P: Clone,
{
svc::layer::mk(move |inner| Self::new(params.clone(), inner))
}
}
impl<T, P, N> svc::NewService<T> for NewDetect<P, N>
where
P: svc::ExtractParam<DetectParams, T>,
N: svc::NewService<T>,
{
type Service = Detect<N::Service>;
fn new_service(&self, target: T) -> Self::Service {
let params = self.params.extract_param(&target);
Detect {
params,
inner: self.inner.new_service(target),
}
}
}
// === impl Detect ===
impl<I, N, NSvc> svc::Service<I> for Detect<N>
where
I: io::AsyncRead + Send + Unpin + 'static,
N: svc::NewService<Detection, Service = NSvc> + Clone + Send + 'static,
NSvc: svc::Service<io::PrefixedIo<I>, Response = ()> + Send,
NSvc::Error: Into<Error>,
NSvc::Future: Send,
{
type Response = ();
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
#[inline]
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, mut io: I) -> Self::Future {
let params = self.params;
let inner = self.inner.clone();
Box::pin(async move {
let t0 = time::Instant::now();
let mut buf = BytesMut::with_capacity(READ_CAPACITY);
let detection = detect(params, &mut io, &mut buf).await?;
debug!(
?detection,
elapsed = ?time::Instant::now().saturating_duration_since(t0),
"Detected",
);
trace!("Dispatching connection");
let svc = inner.new_service(detection);
let mut svc = svc.ready_oneshot().await.map_err(Into::into)?;
svc.call(io::PrefixedIo::new(buf.freeze(), io))
.await
.map_err(Into::into)?;
trace!("Connection completed");
// Hold the service until it's done being used so that cache
// idleness is reset.
drop(svc);
Ok(())
})
}
}
impl Detection {
pub fn variant(&self) -> Option<Variant> {
match self {
Detection::Http(v) => Some(*v),
_ => None,
}
}
}
async fn detect<I: io::AsyncRead + Send + Unpin + 'static>(
DetectParams { read_timeout }: DetectParams,
io: &mut I,
buf: &mut BytesMut,
) -> io::Result<Detection> {
debug_assert!(buf.capacity() > 0, "buffer must have capacity");
trace!(capacity = READ_CAPACITY, timeout = ?read_timeout, "Reading");
let sz = match time::timeout(read_timeout, io.read_buf(buf)).await {
Ok(res) => res?,
Err(_) => return Ok(Detection::ReadTimeout(read_timeout)),
};
trace!(sz, "Read");
if sz == 0 {
return Ok(Detection::Empty);
}
// HTTP/2 checking is faster because it's a simple string match. If we
// have enough data, check it first. We don't bother matching on the
// entire H2 preface because the first part is enough to get a clear
// signal.
if buf.len() >= H2_PREFACE.len() {
trace!("Checking H2 preface");
if &buf[..H2_PREFACE.len()] == H2_PREFACE {
return Ok(Detection::Http(Variant::H2));
}
}
// Otherwise, we try to parse the data as an HTTP/1 message.
if buf.len() >= SMALLEST_POSSIBLE_HTTP1_REQ.len() {
trace!("Parsing HTTP/1 message");
if let Ok(_) | Err(httparse::Error::TooManyHeaders) =
httparse::Request::new(&mut [httparse::EMPTY_HEADER; 0]).parse(&buf[..])
{
return Ok(Detection::Http(Variant::Http1));
}
}
Ok(Detection::NotHttp)
}
#[cfg(test)]
mod tests {
use super::*;
use tokio_test::io;
const HTTP11_LINE: &[u8] = b"GET / HTTP/1.1\r\n";
const H2_AND_GARBAGE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\ngarbage";
const GARBAGE: &[u8] =
b"garbage garbage garbage garbage garbage garbage garbage garbage garbage garbage garbage garbage garbage garbage garbage garbage garbage";
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn timeout() {
let _trace = linkerd_tracing::test::trace_init();
let params = DetectParams {
read_timeout: time::Duration::from_millis(1),
};
let mut buf = BytesMut::with_capacity(1024);
let mut io = io::Builder::new().wait(params.read_timeout * 2).build();
let kind = detect(params, &mut io, &mut buf).await.unwrap();
assert!(matches!(kind, Detection::ReadTimeout(_)), "{kind:?}");
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn h2() {
let _trace = linkerd_tracing::test::trace_init();
let params = DetectParams {
read_timeout: time::Duration::from_millis(1),
};
for read in &[H2_PREFACE, H2_AND_GARBAGE] {
debug!(read = ?std::str::from_utf8(read).unwrap());
let mut buf = BytesMut::with_capacity(1024);
let mut io = io::Builder::new().read(read).build();
let kind = detect(params, &mut io, &mut buf).await.unwrap();
assert_eq!(kind.variant(), Some(Variant::H2), "{kind:?}");
}
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn http1() {
let _trace = linkerd_tracing::test::trace_init();
let params = DetectParams {
read_timeout: time::Duration::from_millis(1),
};
for i in 1..SMALLEST_POSSIBLE_HTTP1_REQ.len() {
debug!(read = ?std::str::from_utf8(&HTTP11_LINE[..i]).unwrap());
let mut buf = BytesMut::with_capacity(1024);
let mut io = io::Builder::new().read(&HTTP11_LINE[..i]).build();
let kind = detect(params, &mut io, &mut buf).await.unwrap();
assert!(matches!(kind, Detection::NotHttp), "{kind:?}");
}
debug!(read = ?std::str::from_utf8(HTTP11_LINE).unwrap());
let mut buf = BytesMut::with_capacity(1024);
let mut io = io::Builder::new().read(HTTP11_LINE).build();
let kind = detect(params, &mut io, &mut buf).await.unwrap();
assert_eq!(kind.variant(), Some(Variant::Http1), "{kind:?}");
const REQ: &[u8] = b"GET /foo/bar/bar/blah HTTP/1.1\r\nHost: foob.example.com\r\n\r\n";
for i in SMALLEST_POSSIBLE_HTTP1_REQ.len()..REQ.len() {
debug!(read = ?std::str::from_utf8(&REQ[..i]).unwrap());
let mut buf = BytesMut::with_capacity(1024);
let mut io = io::Builder::new().read(&REQ[..i]).build();
let kind = detect(params, &mut io, &mut buf).await.unwrap();
assert_eq!(kind.variant(), Some(Variant::Http1), "{kind:?}");
assert_eq!(buf[..], REQ[..i]);
}
// Starts with a P, like the h2 preface.
const POST: &[u8] = b"POST /foo HTTP/1.1\r\n";
for i in SMALLEST_POSSIBLE_HTTP1_REQ.len()..POST.len() {
let mut buf = BytesMut::with_capacity(1024);
let mut io = io::Builder::new().read(&POST[..i]).build();
debug!(read = ?std::str::from_utf8(&POST[..i]).unwrap());
let kind = detect(params, &mut io, &mut buf).await.unwrap();
assert_eq!(kind.variant(), Some(Variant::Http1), "{kind:?}");
assert_eq!(buf[..], POST[..i]);
}
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn unknown() {
let _trace = linkerd_tracing::test::trace_init();
let params = DetectParams {
read_timeout: time::Duration::from_millis(1),
};
let mut buf = BytesMut::with_capacity(1024);
let mut io = io::Builder::new().read(b"foo.bar.blah\r\nbobo").build();
let kind = detect(params, &mut io, &mut buf).await.unwrap();
assert!(matches!(kind, Detection::NotHttp), "{kind:?}");
assert_eq!(&buf[..], b"foo.bar.blah\r\nbobo");
let mut buf = BytesMut::with_capacity(1024);
let mut io = io::Builder::new().read(GARBAGE).build();
let kind = detect(params, &mut io, &mut buf).await.unwrap();
assert!(matches!(kind, Detection::NotHttp), "{kind:?}");
assert_eq!(&buf[..], GARBAGE);
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn empty() {
let _trace = linkerd_tracing::test::trace_init();
let params = DetectParams {
read_timeout: time::Duration::from_millis(1),
};
let mut buf = BytesMut::with_capacity(1024);
let mut io = io::Builder::new().build();
let kind = detect(params, &mut io, &mut buf).await.unwrap();
assert!(matches!(kind, Detection::Empty), "{kind:?}");
assert_eq!(&buf[..], b"");
}
}

View File

@ -40,11 +40,11 @@ tower = { workspace = true, default-features = false }
tracing = "0.1"
try-lock = "0.2"
linkerd-detect = { path = "../../detect" }
linkerd-duplex = { path = "../../duplex" }
linkerd-error = { path = "../../error" }
linkerd-http-box = { path = "../../http/box" }
linkerd-http-classify = { path = "../../http/classify" }
linkerd-http-detect = { path = "../../http/detect" }
linkerd-http-executor = { path = "../../http/executor" }
linkerd-http-h2 = { path = "../../http/h2" }
linkerd-http-insert = { path = "../../http/insert" }

View File

@ -1,171 +0,0 @@
use crate::Variant;
use bytes::BytesMut;
use linkerd_detect::Detect;
use linkerd_error::Error;
use linkerd_io::{self as io, AsyncReadExt};
use tracing::{debug, trace};
// Coincidentally, both our abbreviated H2 preface and our smallest possible
// HTTP/1 message are 14 bytes.
const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0";
const SMALLEST_POSSIBLE_HTTP1_REQ: &str = "GET / HTTP/1.1";
/// Attempts to detect the HTTP version of a stream.
///
/// This module biases towards availability instead of correctness. I.e. instead
/// of buffering until we can be sure that we're dealing with an HTTP stream, we
/// instead perform only a single read and use that data to inform protocol
/// hinting. If a single read doesn't provide enough data to make a decision, we
/// treat the protocol as unknown.
///
/// This allows us to interoperate with protocols that send very small initial
/// messages. In rare situations, we may fail to properly detect that a stream is
/// HTTP.
#[derive(Clone, Debug, Default)]
pub struct DetectHttp(());
#[async_trait::async_trait]
impl<I: io::AsyncRead + Send + Unpin + 'static> Detect<I> for DetectHttp {
type Protocol = Variant;
async fn detect(&self, io: &mut I, buf: &mut BytesMut) -> Result<Option<Variant>, Error> {
trace!(capacity = buf.capacity(), "Reading");
let sz = io.read_buf(buf).await?;
trace!(sz, "Read");
if sz == 0 {
// No data was read because the socket closed or the
// buffer capacity was exhausted.
debug!(read = buf.len(), "Could not detect protocol");
return Ok(None);
}
// HTTP/2 checking is faster because it's a simple string match. If we
// have enough data, check it first. We don't bother matching on the
// entire H2 preface because the first part is enough to get a clear
// signal.
if buf.len() >= H2_PREFACE.len() {
trace!("Checking H2 preface");
if &buf[..H2_PREFACE.len()] == H2_PREFACE {
trace!("Matched HTTP/2 prefix");
return Ok(Some(Variant::H2));
}
}
// Otherwise, we try to parse the data as an HTTP/1 message.
if buf.len() >= SMALLEST_POSSIBLE_HTTP1_REQ.len() {
trace!("Parsing HTTP/1 message");
if let Ok(_) | Err(httparse::Error::TooManyHeaders) =
httparse::Request::new(&mut [httparse::EMPTY_HEADER; 0]).parse(&buf[..])
{
trace!("Matched HTTP/1");
return Ok(Some(Variant::Http1));
}
}
trace!("Not HTTP");
Ok(None)
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio_test::io;
const HTTP11_LINE: &[u8] = b"GET / HTTP/1.1\r\n";
const H2_AND_GARBAGE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\ngarbage";
const GARBAGE: &[u8] =
b"garbage garbage garbage garbage garbage garbage garbage garbage garbage garbage garbage garbage garbage garbage garbage garbage garbage";
#[tokio::test]
async fn h2() {
let _trace = linkerd_tracing::test::trace_init();
for read in &[H2_PREFACE, H2_AND_GARBAGE] {
debug!(read = ?std::str::from_utf8(read).unwrap());
let mut buf = BytesMut::with_capacity(1024);
let mut io = io::Builder::new().read(read).build();
let kind = DetectHttp(()).detect(&mut io, &mut buf).await.unwrap();
assert_eq!(kind, Some(Variant::H2));
}
}
#[tokio::test]
async fn http1() {
let _trace = linkerd_tracing::test::trace_init();
for i in 1..SMALLEST_POSSIBLE_HTTP1_REQ.len() {
debug!(read = ?std::str::from_utf8(&HTTP11_LINE[..i]).unwrap());
let mut buf = BytesMut::with_capacity(1024);
let mut io = io::Builder::new().read(&HTTP11_LINE[..i]).build();
let kind = DetectHttp(()).detect(&mut io, &mut buf).await.unwrap();
assert_eq!(kind, None);
}
debug!(read = ?std::str::from_utf8(HTTP11_LINE).unwrap());
let mut buf = BytesMut::with_capacity(1024);
let mut io = io::Builder::new().read(HTTP11_LINE).build();
let kind = DetectHttp(()).detect(&mut io, &mut buf).await.unwrap();
assert_eq!(kind, Some(Variant::Http1));
const REQ: &[u8] = b"GET /foo/bar/bar/blah HTTP/1.1\r\nHost: foob.example.com\r\n\r\n";
for i in SMALLEST_POSSIBLE_HTTP1_REQ.len()..REQ.len() {
debug!(read = ?std::str::from_utf8(&REQ[..i]).unwrap());
let mut buf = BytesMut::with_capacity(1024);
let mut io = io::Builder::new().read(&REQ[..i]).build();
let kind = DetectHttp(()).detect(&mut io, &mut buf).await.unwrap();
assert_eq!(kind, Some(Variant::Http1));
assert_eq!(buf[..], REQ[..i]);
}
// Starts with a P, like the h2 preface.
const POST: &[u8] = b"POST /foo HTTP/1.1\r\n";
for i in SMALLEST_POSSIBLE_HTTP1_REQ.len()..POST.len() {
let mut buf = BytesMut::with_capacity(1024);
let mut io = io::Builder::new().read(&POST[..i]).build();
debug!(read = ?std::str::from_utf8(&POST[..i]).unwrap());
let kind = DetectHttp(()).detect(&mut io, &mut buf).await.unwrap();
assert_eq!(kind, Some(Variant::Http1));
assert_eq!(buf[..], POST[..i]);
}
}
#[tokio::test(flavor = "current_thread")]
async fn unknown() {
let _trace = linkerd_tracing::test::trace_init();
let mut buf = BytesMut::with_capacity(1024);
let mut io = io::Builder::new().read(b"foo.bar.blah\r\nbobo").build();
let kind = DetectHttp(()).detect(&mut io, &mut buf).await.unwrap();
assert_eq!(kind, None);
assert_eq!(&buf[..], b"foo.bar.blah\r\nbobo");
let mut buf = BytesMut::with_capacity(1024);
let mut io = io::Builder::new().read(GARBAGE).build();
let kind = DetectHttp(()).detect(&mut io, &mut buf).await.unwrap();
assert_eq!(kind, None);
assert_eq!(&buf[..], GARBAGE);
}
}
#[cfg(fuzzing)]
pub mod fuzz_logic {
use super::*;
pub async fn fuzz_entry(input: &[u8]) {
use tokio::io::AsyncWriteExt;
let (mut client, mut server) = tokio::io::duplex(input.len());
let mut buf = bytes::Bytes::copy_from_slice(input);
let write = tokio::spawn(async move { client.write_buf(&mut buf).await });
let mut buf = BytesMut::with_capacity(1024);
let _kind = DetectHttp(()).detect(&mut server, &mut buf).await.unwrap();
write
.await
.expect("Spawn must succeed")
.expect("Write must succeed");
}
}

View File

@ -7,7 +7,6 @@ use linkerd_error::Error;
pub mod balance;
pub mod client;
pub mod client_handle;
pub mod detect;
pub mod h1;
pub mod h2;
mod header_from_target;
@ -24,7 +23,6 @@ pub use self::{
NewInsertClassifyResponse,
},
client_handle::{ClientHandle, SetClientHandle},
detect::DetectHttp,
header_from_target::NewHeaderFromTarget,
normalize_uri::{MarkAbsoluteForm, NewNormalizeUri},
server::{NewServeHttp, Params as ServerParams, ServeHttp},
@ -38,6 +36,7 @@ pub use http::{
pub use http_body::Body;
pub use linkerd_http_box::{BoxBody, BoxRequest, BoxResponse, EraseResponse};
pub use linkerd_http_classify as classify;
pub use linkerd_http_detect::{DetectParams, Detection, NewDetect};
pub use linkerd_http_executor::TracingExecutor;
pub use linkerd_http_insert as insert;
pub use linkerd_http_override_authority::{AuthorityOverride, NewOverrideAuthority};