outbound: implement `OutboundPolicies` route request timeouts (#2418)

The latest proxy-api release, v0.10.0, adds fields to the
`OutboundPolicies` API for configuring HTTP request timeouts, based on
the proposed changes to HTTPRoute in kubernetes-sigs/gateway-api#1997.

This branch updates the proxy-api dependency to v0.10.0 and adds the new
timeout configuration fields to the proxy's internal client policy
types. In addition, this branch adds a timeout middleware to the HTTP
client policy stack, so that the timeout described by the
`Rule.request_timeout` field is now applied.

Implementing the `RouteBackend.request_timeout` field with semantics as
close as possible to those described in GEP-1742 will be somewhat more
complex, and will be added in a separate PR.
This commit is contained in:
Eliza Weisman 2023-06-14 11:36:40 -07:00 committed by GitHub
parent 864a5dbc97
commit a512ea658d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 168 additions and 20 deletions

View File

@ -1892,9 +1892,9 @@ dependencies = [
[[package]]
name = "linkerd2-proxy-api"
version = "0.9.0"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c5191a6b6a0d97519b4746c09a5e92cb9f586cb808d1828f6d7f9889e9ba24d"
checksum = "597facef5c3f12aece4d18a5e3dbba88288837b0b5d8276681d063e4c9b98a14"
dependencies = [
"h2",
"http",

View File

@ -29,7 +29,7 @@ linkerd-meshtls = { path = "../../meshtls", optional = true }
linkerd-meshtls-rustls = { path = "../../meshtls/rustls", optional = true }
linkerd-proxy-client-policy = { path = "../../proxy/client-policy" }
linkerd-tonic-watch = { path = "../../tonic-watch" }
linkerd2-proxy-api = { version = "0.9", features = ["inbound"] }
linkerd2-proxy-api = { version = "0.10", features = ["inbound"] }
once_cell = "1"
parking_lot = "0.12"
rangemap = "1"

View File

@ -34,7 +34,7 @@ ipnet = "2"
linkerd-app = { path = "..", features = ["allow-loopback"] }
linkerd-app-core = { path = "../core" }
linkerd-metrics = { path = "../../metrics", features = ["test_util"] }
linkerd2-proxy-api = { version = "0.9", features = [
linkerd2-proxy-api = { version = "0.10", features = [
"destination",
"arbitrary",
] }

View File

@ -151,6 +151,7 @@ pub fn outbound_default_http_route(dst: impl ToString) -> outbound::HttpRoute {
}],
filters: Vec::new(),
backends: Some(http_first_available(std::iter::once(backend(dst)))),
request_timeout: None,
}],
}
}
@ -214,6 +215,7 @@ pub fn http_first_available(
.map(|backend| http_route::RouteBackend {
backend: Some(backend),
filters: Vec::new(),
request_timeout: None,
})
.collect(),
},

View File

@ -223,6 +223,7 @@ async fn header_based_routing() {
backends: Some(policy::http_first_available(std::iter::once(
policy::backend(dst),
))),
request_timeout: None,
};
let route = outbound::HttpRoute {
@ -236,6 +237,7 @@ async fn header_based_routing() {
backends: Some(policy::http_first_available(std::iter::once(
policy::backend(&dst_world),
))),
request_timeout: None,
},
// x-hello-city: sf | x-hello-city: san francisco
mk_header_rule(
@ -398,6 +400,8 @@ async fn path_based_routing() {
backends: Some(policy::http_first_available(std::iter::once(
policy::backend(dst),
))),
request_timeout: None,
};
let route = outbound::HttpRoute {
@ -411,6 +415,7 @@ async fn path_based_routing() {
backends: Some(policy::http_first_available(std::iter::once(
policy::backend(&dst_world),
))),
request_timeout: None,
},
// /goodbye/*
mk_path_rule(

View File

@ -20,7 +20,7 @@ ahash = "0.8"
bytes = "1"
http = "0.2"
futures = { version = "0.3", default-features = false }
linkerd2-proxy-api = { version = "0.9", features = ["outbound"] }
linkerd2-proxy-api = { version = "0.10", features = ["outbound"] }
linkerd-app-core = { path = "../core" }
linkerd-app-test = { path = "../test", optional = true }
linkerd-distribute = { path = "../../distribute" }

View File

@ -206,10 +206,12 @@ pub fn synthesize_forward_policy(
meta: meta.clone(),
filters: NO_OPAQ_FILTERS.clone(),
failure_policy: Default::default(),
request_timeout: None,
distribution: policy::RouteDistribution::FirstAvailable(Arc::new([
policy::RouteBackend {
filters: NO_OPAQ_FILTERS.clone(),
backend: backend.clone(),
request_timeout: None,
},
])),
}),
@ -223,10 +225,12 @@ pub fn synthesize_forward_policy(
meta: meta.clone(),
filters: NO_HTTP_FILTERS.clone(),
failure_policy: Default::default(),
request_timeout: None,
distribution: policy::RouteDistribution::FirstAvailable(Arc::new([
policy::RouteBackend {
filters: NO_HTTP_FILTERS.clone(),
backend: backend.clone(),
request_timeout: None,
},
])),
},

View File

@ -30,6 +30,7 @@ pub(crate) struct Route<T, F, E> {
pub(super) filters: Arc<[F]>,
pub(super) distribution: BackendDistribution<T, F>,
pub(super) failure_policy: E,
pub(super) request_timeout: Option<std::time::Duration>,
}
pub(crate) type MatchedRoute<T, M, F, E> = Matched<M, Route<T, F, E>>;
@ -111,6 +112,8 @@ where
.push_on_service(svc::LoadShed::layer())
// TODO(ver) attach the `E` typed failure policy to requests.
.push(filters::NewApplyFilters::<Self, _, _>::layer())
// Sets an optional request timeout.
.push(http::NewTimeout::layer())
.push(classify::NewClassify::layer())
.push(svc::ArcNewService::layer())
.into_inner()
@ -124,6 +127,12 @@ impl<T: Clone, M, F, E> svc::Param<BackendDistribution<T, F>> for MatchedRoute<T
}
}
impl<T, M, F, E> svc::Param<http::timeout::ResponseTimeout> for MatchedRoute<T, M, F, E> {
fn param(&self) -> http::timeout::ResponseTimeout {
http::timeout::ResponseTimeout(self.params.request_timeout)
}
}
impl<T> filters::Apply for Http<T> {
#[inline]
fn apply<B>(&self, req: &mut ::http::Request<B>) -> Result<()> {

View File

@ -204,6 +204,7 @@ where
filters,
distribution,
failure_policy,
request_timeout,
}| {
let route_ref = RouteRef(meta);
let distribution = mk_distribution(&route_ref, &distribution);
@ -214,6 +215,7 @@ where
filters,
failure_policy,
distribution,
request_timeout,
}
};

View File

@ -48,9 +48,11 @@ async fn header_based_route() {
}),
filters: Arc::new([]),
failure_policy: Default::default(),
request_timeout: None,
distribution: policy::RouteDistribution::FirstAvailable(Arc::new([policy::RouteBackend {
filters: Arc::new([]),
backend,
request_timeout: None,
}])),
};
@ -197,6 +199,7 @@ async fn http_filter_request_headers() {
policy: policy::RoutePolicy {
meta: policy::Meta::new_default("turtles"),
failure_policy: Default::default(),
request_timeout: None,
filters: Arc::new([policy::http::Filter::RequestHeaders(
policy::http::filter::ModifyHeader {
add: vec![(PIZZA.clone(), TUBULAR.clone())],
@ -212,6 +215,7 @@ async fn http_filter_request_headers() {
..Default::default()
},
)]),
request_timeout: None,
},
])),
},

View File

@ -285,6 +285,66 @@ async fn balancer_doesnt_select_tripped_breakers() {
}
}
#[tokio::test(flavor = "current_thread")]
async fn route_request_timeout() {
tokio::time::pause();
let _trace = trace::test::trace_init();
const REQUEST_TIMEOUT: Duration = std::time::Duration::from_secs(2);
let addr = SocketAddr::new([192, 0, 2, 41].into(), PORT);
let dest: NameAddr = format!("{AUTHORITY}:{PORT}")
.parse::<NameAddr>()
.expect("dest addr is valid");
let (svc, mut handle) = tower_test::mock::pair();
let connect = HttpConnect::default().service(addr, svc);
let resolve = support::resolver().endpoint_exists(dest.clone(), addr, Default::default());
let (rt, _shutdown) = runtime();
let stack = Outbound::new(default_config(), rt)
.with_stack(connect)
.push_http_cached(resolve)
.into_inner();
let (_route_tx, routes) = {
let backend = default_backend(&dest);
let mut route = default_route(backend.clone());
// set the request timeout on the route.
route.rules[0].policy.request_timeout = Some(REQUEST_TIMEOUT);
watch::channel(Routes::Policy(policy::Params::Http(policy::HttpParams {
addr: dest.into(),
meta: ParentRef(client_policy::Meta::new_default("parent")),
backends: Arc::new([backend]),
routes: Arc::new([route]),
failure_accrual: client_policy::FailureAccrual::None,
})))
};
let target = Target {
num: 1,
version: http::Version::H2,
routes,
};
let svc = stack.new_service(target);
handle.allow(1);
let rsp = send_req(svc.clone(), http::Request::get("/"));
serve_req(&mut handle, mk_rsp(StatusCode::OK, "good")).await;
assert_eq!(
rsp.await.expect("request must succeed").status(),
http::StatusCode::OK
);
// now, time out...
let rsp = send_req(svc.clone(), http::Request::get("/"));
tokio::time::sleep(REQUEST_TIMEOUT).await;
let error = rsp.await.expect_err("request must fail with a timeout");
assert!(
error.is::<LogicalError>(),
"error must originate in the logical stack"
);
assert!(errors::is_caused_by::<http::timeout::ResponseTimeoutError>(
error.as_ref()
));
}
#[derive(Clone, Debug)]
struct Target {
num: usize,
@ -448,9 +508,11 @@ fn default_route(backend: client_policy::Backend) -> client_policy::http::Route
meta: Meta::new_default("test_route"),
filters: NO_FILTERS.clone(),
failure_policy: Default::default(),
request_timeout: None,
distribution: RouteDistribution::FirstAvailable(Arc::new([RouteBackend {
filters: NO_FILTERS.clone(),
backend,
request_timeout: None,
}])),
},
}],

View File

@ -73,9 +73,11 @@ impl ClientPolicies {
meta: Meta::new_default("default"),
filters: Arc::new([]),
failure_policy: Default::default(),
request_timeout: None,
distribution: RouteDistribution::FirstAvailable(Arc::new([RouteBackend {
filters: Arc::new([]),
backend: backend.clone(),
request_timeout: None,
}])),
},
}],
@ -96,9 +98,11 @@ impl ClientPolicies {
meta: Meta::new_default("default"),
filters: Arc::new([]),
failure_policy: Default::default(),
request_timeout: None,
distribution: RouteDistribution::FirstAvailable(Arc::new([RouteBackend {
filters: Arc::new([]),
backend: backend.clone(),
request_timeout: None,
}])),
}),
},

View File

@ -17,7 +17,7 @@ tracing = "0.1"
url = "2"
[dependencies.linkerd2-proxy-api]
version = "0.9"
version = "0.10"
features = ["http-route", "grpc-route"]
optional = true

View File

@ -14,7 +14,7 @@ async-stream = "0.3"
futures = { version = "0.3", default-features = false }
linkerd-addr = { path = "../../addr" }
linkerd-error = { path = "../../error" }
linkerd2-proxy-api = { version = "0.9", features = ["destination"] }
linkerd2-proxy-api = { version = "0.10", features = ["destination"] }
linkerd-proxy-core = { path = "../core" }
linkerd-stack = { path = "../../stack" }
linkerd-tls = { path = "../../tls" }

View File

@ -18,7 +18,7 @@ proto = [
ahash = "0.8"
ipnet = "2"
http = "0.2"
linkerd2-proxy-api = { version = "0.9", optional = true, features = [
linkerd2-proxy-api = { version = "0.10", optional = true, features = [
"outbound",
] }
linkerd-error = { path = "../../error" }

View File

@ -37,6 +37,7 @@ pub fn default(distribution: crate::RouteDistribution<Filter>) -> Route {
filters: Arc::new([]),
distribution,
failure_policy: Codes::default(),
request_timeout: None,
},
}],
}
@ -101,6 +102,7 @@ pub mod proto {
r#match::host::{proto::InvalidHostMatch, MatchHost},
},
};
use std::time::Duration;
#[derive(Debug, thiserror::Error)]
pub enum InvalidGrpcRoute {
@ -124,6 +126,9 @@ pub mod proto {
#[error("invalid failure accrual policy: {0}")]
Breaker(#[from] InvalidFailureAccrual),
#[error("invalid duration: {0}")]
Duration(#[from] prost_types::DurationError),
}
#[derive(Debug, thiserror::Error)]
@ -198,6 +203,7 @@ pub mod proto {
matches,
backends,
filters,
request_timeout,
} = proto;
let matches = matches
@ -214,6 +220,8 @@ pub mod proto {
.ok_or(InvalidGrpcRoute::Missing("distribution"))?
.try_into()?;
let request_timeout = request_timeout.map(Duration::try_from).transpose()?;
Ok(Rule {
matches,
policy: Policy {
@ -221,6 +229,7 @@ pub mod proto {
filters,
distribution,
failure_policy: Codes::default(),
request_timeout,
},
})
}
@ -270,10 +279,14 @@ pub mod proto {
impl TryFrom<grpc_route::RouteBackend> for RouteBackend<Filter> {
type Error = InvalidBackend;
fn try_from(
grpc_route::RouteBackend { backend, filters }: grpc_route::RouteBackend,
grpc_route::RouteBackend {
backend,
filters,
request_timeout,
}: grpc_route::RouteBackend,
) -> Result<RouteBackend<Filter>, InvalidBackend> {
let backend = backend.ok_or(InvalidBackend::Missing("backend"))?;
RouteBackend::try_from_proto(backend, filters)
RouteBackend::try_from_proto(backend, filters, request_timeout)
}
}

View File

@ -47,6 +47,7 @@ pub fn default(distribution: crate::RouteDistribution<Filter>) -> Route {
filters: Arc::new([]),
distribution,
failure_policy: StatusRanges::default(),
request_timeout: None,
},
}],
}
@ -130,6 +131,9 @@ pub mod proto {
#[error("missing {0}")]
Missing(&'static str),
#[error("invalid request timeout: {0}")]
Timeout(#[from] prost_types::DurationError),
}
#[derive(Debug, thiserror::Error)]
@ -217,6 +221,7 @@ pub mod proto {
matches,
backends,
filters,
request_timeout,
} = proto;
let matches = matches
@ -233,6 +238,10 @@ pub mod proto {
.ok_or(InvalidHttpRoute::Missing("distribution"))?
.try_into()?;
let request_timeout = request_timeout
.map(std::time::Duration::try_from)
.transpose()?;
Ok(Rule {
matches,
policy: Policy {
@ -240,6 +249,7 @@ pub mod proto {
filters,
distribution,
failure_policy: StatusRanges::default(),
request_timeout,
},
})
}
@ -289,10 +299,14 @@ pub mod proto {
impl TryFrom<http_route::RouteBackend> for RouteBackend<Filter> {
type Error = InvalidBackend;
fn try_from(
http_route::RouteBackend { backend, filters }: http_route::RouteBackend,
http_route::RouteBackend {
backend,
filters,
request_timeout,
}: http_route::RouteBackend,
) -> Result<Self, Self::Error> {
let backend = backend.ok_or(InvalidBackend::Missing("backend"))?;
RouteBackend::try_from_proto(backend, filters)
RouteBackend::try_from_proto(backend, filters, request_timeout)
}
}

View File

@ -58,6 +58,18 @@ pub struct RoutePolicy<T, F> {
pub meta: Arc<Meta>,
pub filters: Arc<[T]>,
pub distribution: RouteDistribution<T>,
/// Request timeout applied to HTTP and gRPC routes.
///
/// Opaque routes are proxied as opaque TCP, and therefore, we have no
/// concept of a "request", so this field is ignored by opaque routes.
/// It's somewhat unfortunate that this field is part of the `RoutePolicy`
/// struct, which is used to represent routes for all protocols, rather than
/// as a filter, which are a generic type that depends on the protocol in
/// use. However, this can't be easily modeled as a filter using the current
/// design for filters, as filters synchronously modify a request or return
/// an error --- a filter cannot wrap the response future in order to add a
/// timeout.
pub request_timeout: Option<time::Duration>,
/// Configures what responses are classified as failures.
pub failure_policy: F,
@ -78,6 +90,7 @@ pub enum RouteDistribution<T> {
pub struct RouteBackend<T> {
pub filters: Arc<[T]>,
pub backend: Backend,
pub request_timeout: Option<time::Duration>,
}
// TODO(ver) how does configuration like failure accrual fit in here? What about
@ -155,6 +168,7 @@ impl ClientPolicy {
.collect(),
distribution: RouteDistribution::Empty,
failure_policy: http::StatusRanges::default(),
request_timeout: None,
},
}],
}])
@ -532,6 +546,7 @@ pub mod proto {
pub(crate) fn try_from_proto<U>(
backend: outbound::Backend,
filters: impl IntoIterator<Item = U>,
request_timeout: Option<prost_types::Duration>,
) -> Result<Self, InvalidBackend>
where
T: TryFrom<U>,
@ -543,8 +558,20 @@ pub mod proto {
.map(T::try_from)
.collect::<Result<Arc<[_]>, _>>()
.map_err(|error| InvalidBackend::Filter(error.into()))?;
let request_timeout =
request_timeout
.map(|d| d.try_into())
.transpose()
.map_err(|error| InvalidBackend::Duration {
field: "backend request timeout",
error,
})?;
Ok(RouteBackend { filters, backend })
Ok(RouteBackend {
filters,
backend,
request_timeout,
})
}
}

View File

@ -127,6 +127,8 @@ pub(crate) mod proto {
filters: NO_FILTERS.clone(),
failure_policy: NonIoErrors::default(),
distribution,
// Request timeouts are ignored on opaque routes.
request_timeout: None,
})
}
@ -178,7 +180,7 @@ pub(crate) mod proto {
opaque_route::RouteBackend { backend }: opaque_route::RouteBackend,
) -> Result<Self, Self::Error> {
let backend = backend.ok_or(InvalidBackend::Missing("backend"))?;
RouteBackend::try_from_proto(backend, std::iter::empty::<()>())
RouteBackend::try_from_proto(backend, std::iter::empty::<()>(), None)
}
}

View File

@ -8,7 +8,7 @@ publish = false
[dependencies]
futures = { version = "0.3", default-features = false }
linkerd2-proxy-api = { version = "0.9", features = ["identity"] }
linkerd2-proxy-api = { version = "0.10", features = ["identity"] }
linkerd-error = { path = "../../error" }
linkerd-identity = { path = "../../identity" }
linkerd-metrics = { path = "../../metrics" }

View File

@ -17,7 +17,7 @@ prost-types = { version = "0.11", optional = true }
thiserror = "1"
[dependencies.linkerd2-proxy-api]
version = "0.9"
version = "0.10"
features = ["inbound"]
optional = true

View File

@ -11,7 +11,7 @@ http = "0.2"
hyper = { version = "0.14", features = ["http1", "http2"] }
futures = { version = "0.3", default-features = false }
ipnet = "2.7"
linkerd2-proxy-api = { version = "0.9", features = ["tap"] }
linkerd2-proxy-api = { version = "0.10", features = ["tap"] }
linkerd-conditional = { path = "../../conditional" }
linkerd-error = { path = "../../error" }
linkerd-meshtls = { path = "../../meshtls" }
@ -30,5 +30,5 @@ tracing = "0.1"
pin-project = "1"
[dev-dependencies]
linkerd2-proxy-api = { version = "0.9", features = ["arbitrary"] }
linkerd2-proxy-api = { version = "0.10", features = ["arbitrary"] }
quickcheck = { version = "1", default-features = false }

View File

@ -21,7 +21,7 @@ linkerd-http-box = { path = "../http-box" }
linkerd-proxy-api-resolve = { path = "../proxy/api-resolve" }
linkerd-stack = { path = "../stack" }
linkerd-tonic-watch = { path = "../tonic-watch" }
linkerd2-proxy-api = { version = "0.9", features = ["destination"] }
linkerd2-proxy-api = { version = "0.10", features = ["destination"] }
once_cell = "1.17"
prost-types = "0.11"
regex = "1"
@ -33,5 +33,5 @@ thiserror = "1"
tracing = "0.1"
[dev-dependencies]
linkerd2-proxy-api = { version = "0.9", features = ["arbitrary"] }
linkerd2-proxy-api = { version = "0.10", features = ["arbitrary"] }
quickcheck = { version = "1", default-features = false }