From ce97e0786f7b020585fa01118f3baeec960ee393 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 2 Jun 2023 11:32:40 -0700 Subject: [PATCH] implement GEP-1742 timeouts in the policy controller (#10975) PR #10969 adds support for the GEP-1742 `timeouts` field to the HTTPRoute CRD. This branch implements actual support for these fields in the policy controller. The timeout fields are now read and used to set the timeout fields added to the proxy-api in linkerd/linkerd2-proxy-api#243. In addition, I've added code to ensure that the timeout fields are parsed correctly when a JSON manifest is deserialized. The current implementation represents timeouts in the bindings as a Rust `std::time::Duration` type. `Duration` does implement `serde::Deserialize` and `serde::Serialize`, but its serialization implementation attempts to (de)serialize it as a struct consisting of a number of seconds and a number of subsecond nanoseconds. The timeout fields are instead supposed to be represented as strings in the Go standard library's `time.ParseDuration` format. Therefore, I've added a newtype which wraps the Rust `std::time::Duration` and implements the same parsing logic as Go. Eventually, I'd like to upstream the implementation of this to `kube-rs`; see kube-rs/kube#1222 for details. Depends on #10969 Depends on linkerd/linkerd2-proxy-api#243 Signed-off-by: Eliza Weisman --- Cargo.lock | 17 +- policy-controller/core/src/outbound.rs | 2 + policy-controller/grpc/Cargo.toml | 3 +- policy-controller/grpc/src/outbound.rs | 103 +++-- policy-controller/k8s/api/src/duration.rs | 354 ++++++++++++++++++ policy-controller/k8s/api/src/lib.rs | 1 + .../k8s/api/src/policy/httproute.rs | 7 +- .../k8s/index/src/outbound/index.rs | 33 +- policy-controller/src/admission.rs | 34 +- policy-test/Cargo.toml | 2 +- 10 files changed, 499 insertions(+), 57 deletions(-) create mode 100644 policy-controller/k8s/api/src/duration.rs diff --git a/Cargo.lock b/Cargo.lock index 83ee9744f..eacd1463c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1304,6 +1304,7 @@ dependencies = [ "linkerd-policy-controller-core", "linkerd2-proxy-api", "maplit", + "prost-types", "tokio", "tonic", "tracing", @@ -1394,9 +1395,9 @@ dependencies = [ [[package]] name = "linkerd2-proxy-api" -version = "0.9.0" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c5191a6b6a0d97519b4746c09a5e92cb9f586cb808d1828f6d7f9889e9ba24d" +checksum = "597facef5c3f12aece4d18a5e3dbba88288837b0b5d8276681d063e4c9b98a14" dependencies = [ "http", "ipnet", @@ -1851,9 +1852,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.11.8" +version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e48e50df39172a3e7eb17e14642445da64996989bc212b583015435d39a58537" +checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" dependencies = [ "bytes", "prost-derive", @@ -1861,9 +1862,9 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.11.8" +version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ea9b0f8cbe5e15a8a042d030bd96668db28ecb567ec37d691971ff5731d2b1b" +checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" dependencies = [ "anyhow", "itertools", @@ -1874,9 +1875,9 @@ dependencies = [ [[package]] name = "prost-types" -version = "0.11.8" +version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "379119666929a1afd7a043aa6cf96fa67a6dce9af60c88095a4686dbce4c9c88" +checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13" dependencies = [ "prost", ] diff --git a/policy-controller/core/src/outbound.rs b/policy-controller/core/src/outbound.rs index 5a5c576b0..1d1310078 100644 --- a/policy-controller/core/src/outbound.rs +++ b/policy-controller/core/src/outbound.rs @@ -42,6 +42,8 @@ pub struct HttpRoute { pub struct HttpRouteRule { pub matches: Vec, pub backends: Vec, + pub request_timeout: Option, + pub backend_request_timeout: Option, } #[derive(Clone, Debug, PartialEq, Eq)] diff --git a/policy-controller/grpc/Cargo.toml b/policy-controller/grpc/Cargo.toml index 8d9bd71fa..b7a59e32e 100644 --- a/policy-controller/grpc/Cargo.toml +++ b/policy-controller/grpc/Cargo.toml @@ -14,12 +14,13 @@ hyper = { version = "0.14", features = ["http2", "server", "tcp"] } futures = { version = "0.3", default-features = false } linkerd-policy-controller-core = { path = "../core" } maplit = "1" +prost-types = "0.11.9" tokio = { version = "1", features = ["macros"] } tonic = { version = "0.8", default-features = false } tracing = "0.1" [dependencies.linkerd2-proxy-api] -version = "0.9" +version = "0.10" features = [ "inbound", "outbound", diff --git a/policy-controller/grpc/src/outbound.rs b/policy-controller/grpc/src/outbound.rs index 3edcaaf4b..88864a27f 100644 --- a/policy-controller/grpc/src/outbound.rs +++ b/policy-controller/grpc/src/outbound.rs @@ -237,20 +237,14 @@ fn to_service(outbound: OutboundPolicy) -> outbound::OutboundPolicy { outbound::failure_accrual::ConsecutiveFailures { max_failures, backoff: Some(outbound::ExponentialBackoff { - min_backoff: backoff - .min_penalty - .try_into() - .map_err(|error| { - tracing::error!(?error, "invalid min_backoff") - }) - .ok(), - max_backoff: backoff - .max_penalty - .try_into() - .map_err(|error| { - tracing::error!(?error, "invalid max_backoff") - }) - .ok(), + min_backoff: convert_duration( + "min_backoff", + backoff.min_penalty, + ), + max_backoff: convert_duration( + "max_backoff", + backoff.max_penalty, + ), jitter_ratio: backoff.jitter, }), }, @@ -324,31 +318,43 @@ fn convert_outbound_http_route( let rules = rules .into_iter() - .map(|HttpRouteRule { matches, backends }| { - let backends = backends - .into_iter() - .map(convert_http_backend) - .collect::>(); - let dist = if backends.is_empty() { - outbound::http_route::distribution::Kind::FirstAvailable( - outbound::http_route::distribution::FirstAvailable { - backends: vec![outbound::http_route::RouteBackend { - backend: Some(backend.clone()), - filters: vec![], - }], - }, - ) - } else { - outbound::http_route::distribution::Kind::RandomAvailable( - outbound::http_route::distribution::RandomAvailable { backends }, - ) - }; - outbound::http_route::Rule { - matches: matches.into_iter().map(http_route::convert_match).collect(), - backends: Some(outbound::http_route::Distribution { kind: Some(dist) }), - filters: Default::default(), - } - }) + .map( + |HttpRouteRule { + matches, + backends, + request_timeout, + backend_request_timeout, + }| { + let backend_request_timeout = backend_request_timeout + .and_then(|d| convert_duration("backend request_timeout", d)); + let backends = backends + .into_iter() + .map(|backend| convert_http_backend(backend_request_timeout.clone(), backend)) + .collect::>(); + let dist = if backends.is_empty() { + outbound::http_route::distribution::Kind::FirstAvailable( + outbound::http_route::distribution::FirstAvailable { + backends: vec![outbound::http_route::RouteBackend { + backend: Some(backend.clone()), + filters: vec![], + request_timeout: backend_request_timeout, + }], + }, + ) + } else { + outbound::http_route::distribution::Kind::RandomAvailable( + outbound::http_route::distribution::RandomAvailable { backends }, + ) + }; + outbound::http_route::Rule { + matches: matches.into_iter().map(http_route::convert_match).collect(), + backends: Some(outbound::http_route::Distribution { kind: Some(dist) }), + filters: Default::default(), + request_timeout: request_timeout + .and_then(|d| convert_duration("request timeout", d)), + } + }, + ) .collect(); outbound::HttpRoute { @@ -358,7 +364,10 @@ fn convert_outbound_http_route( } } -fn convert_http_backend(backend: Backend) -> outbound::http_route::WeightedRouteBackend { +fn convert_http_backend( + request_timeout: Option, + backend: Backend, +) -> outbound::http_route::WeightedRouteBackend { match backend { Backend::Addr(addr) => { let socket_addr = SocketAddr::new(addr.addr, addr.port.get()); @@ -377,6 +386,7 @@ fn convert_http_backend(backend: Backend) -> outbound::http_route::WeightedRoute )), }), filters: Default::default(), + request_timeout, }), } } @@ -409,6 +419,7 @@ fn convert_http_backend(backend: Backend) -> outbound::http_route::WeightedRoute )), }), filters: Default::default(), + request_timeout, }), }, Backend::Invalid { weight, message } => outbound::http_route::WeightedRouteBackend { @@ -430,6 +441,7 @@ fn convert_http_backend(backend: Backend) -> outbound::http_route::WeightedRoute }, )), }], + request_timeout, }), }, } @@ -473,11 +485,13 @@ fn default_outbound_http_route(backend: outbound::Backend) -> outbound::HttpRout backends: vec![outbound::http_route::RouteBackend { backend: Some(backend), filters: vec![], + request_timeout: None, }], }, )), }), filters: Default::default(), + request_timeout: None, }]; outbound::HttpRoute { metadata, @@ -529,3 +543,12 @@ fn default_queue_config() -> outbound::Queue { ), } } + +fn convert_duration(name: &'static str, duration: time::Duration) -> Option { + duration + .try_into() + .map_err(|error| { + tracing::error!(%error, "Invalid {name} duration"); + }) + .ok() +} diff --git a/policy-controller/k8s/api/src/duration.rs b/policy-controller/k8s/api/src/duration.rs new file mode 100644 index 000000000..969ec9e70 --- /dev/null +++ b/policy-controller/k8s/api/src/duration.rs @@ -0,0 +1,354 @@ +use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; +use std::{fmt, str::FromStr, time::Duration}; + +#[derive(Copy, Clone, PartialEq, Eq)] +pub struct K8sDuration { + duration: Duration, + is_negative: bool, +} + +#[derive(Debug, thiserror::Error, Eq, PartialEq)] +#[non_exhaustive] +pub enum ParseError { + #[error("invalid unit: {}", EXPECTED_UNITS)] + InvalidUnit, + + #[error("missing a unit: {}", EXPECTED_UNITS)] + NoUnit, + + #[error("invalid floating-point number: {}", .0)] + NotANumber(#[from] std::num::ParseFloatError), +} + +const EXPECTED_UNITS: &str = "expected one of 'ns', 'us', '\u{00b5}s', 'ms', 's', 'm', or 'h'"; + +impl From for K8sDuration { + fn from(duration: Duration) -> Self { + Self { + duration, + is_negative: false, + } + } +} + +impl From for Duration { + fn from(K8sDuration { duration, .. }: K8sDuration) -> Self { + duration + } +} + +impl K8sDuration { + #[inline] + #[must_use] + pub fn is_negative(&self) -> bool { + self.is_negative + } +} + +impl fmt::Debug for K8sDuration { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + use std::fmt::Write; + if self.is_negative { + f.write_char('-')?; + } + fmt::Debug::fmt(&self.duration, f) + } +} + +impl fmt::Display for K8sDuration { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + use std::fmt::Write; + if self.is_negative { + f.write_char('-')?; + } + fmt::Debug::fmt(&self.duration, f) + } +} + +impl FromStr for K8sDuration { + type Err = ParseError; + + fn from_str(mut s: &str) -> Result { + // implements the same format as + // https://cs.opensource.google/go/go/+/refs/tags/go1.20.4:src/time/format.go;l=1589 + + fn duration_from_units(val: f64, unit: &str) -> Result { + const MINUTE: Duration = Duration::from_secs(60); + // https://cs.opensource.google/go/go/+/refs/tags/go1.20.4:src/time/format.go;l=1573 + let base = match unit { + "ns" => Duration::from_nanos(1), + // U+00B5 is the "micro sign" while U+03BC is "Greek letter mu" + "us" | "\u{00b5}s" | "\u{03bc}s" => Duration::from_micros(1), + "ms" => Duration::from_millis(1), + "s" => Duration::from_secs(1), + "m" => MINUTE, + "h" => MINUTE * 60, + _ => return Err(ParseError::InvalidUnit), + }; + Ok(base.mul_f64(val)) + } + + // Go durations are signed. Rust durations aren't. So we need to ignore + // this for now. + let is_negative = s.starts_with('-'); + s = s.trim_start_matches('+').trim_start_matches('-'); + + let mut total = Duration::from_secs(0); + while !s.is_empty() { + if let Some(unit_start) = s.find(|c: char| c.is_alphabetic()) { + let (val, rest) = s.split_at(unit_start); + let val = val.parse::()?; + let unit = if let Some(next_numeric_start) = rest.find(|c: char| !c.is_alphabetic()) + { + let (unit, rest) = rest.split_at(next_numeric_start); + s = rest; + unit + } else { + s = ""; + rest + }; + total += duration_from_units(val, unit)?; + } else if s == "0" { + return Ok(K8sDuration { + duration: Duration::from_secs(0), + is_negative, + }); + } else { + return Err(ParseError::NoUnit); + } + } + + Ok(K8sDuration { + duration: total, + is_negative, + }) + } +} + +impl Serialize for K8sDuration { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.collect_str(self) + } +} + +impl<'de> Deserialize<'de> for K8sDuration { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + struct Visitor; + impl<'de> de::Visitor<'de> for Visitor { + type Value = K8sDuration; + + fn expecting(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("a string in Go `time.Duration.String()` format") + } + + fn visit_str(self, value: &str) -> Result + where + E: de::Error, + { + let val = value.parse::().map_err(de::Error::custom)?; + Ok(val) + } + } + deserializer.deserialize_str(Visitor) + } +} + +impl schemars::JsonSchema for K8sDuration { + // see + // https://github.com/kubernetes/apimachinery/blob/756e2227bf3a486098f504af1a0ffb736ad16f4c/pkg/apis/meta/v1/duration.go#L61 + fn schema_name() -> String { + "K8sDuration".to_owned() + } + + fn is_referenceable() -> bool { + false + } + + fn json_schema(_: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema { + schemars::schema::SchemaObject { + instance_type: Some(schemars::schema::InstanceType::String.into()), + // the format should *not* be "duration", because "duration" means + // the duration is formatted in ISO 8601, as described here: + // https://datatracker.ietf.org/doc/html/draft-handrews-json-schema-validation-02#section-7.3.1 + format: None, + ..Default::default() + } + .into() + } +} +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parses_the_same_as_go() { + const MINUTE: Duration = Duration::from_secs(60); + const HOUR: Duration = Duration::from_secs(60 * 60); + // from Go: + // https://cs.opensource.google/go/go/+/refs/tags/go1.20.4:src/time/time_test.go;l=891-951 + // ``` + // var parseDurationTests = []struct { + // in string + // want Duration + // }{ + let cases: &[(&str, K8sDuration)] = &[ + // // simple + // {"0", 0}, + ("0", Duration::from_secs(0).into()), + // {"5s", 5 * Second}, + ("5s", Duration::from_secs(5).into()), + // {"30s", 30 * Second}, + ("30s", Duration::from_secs(30).into()), + // {"1478s", 1478 * Second}, + ("1478s", Duration::from_secs(1478).into()), + // // sign + // {"-5s", -5 * Second}, + ( + "-5s", + K8sDuration { + duration: Duration::from_secs(5), + is_negative: true, + }, + ), + // {"+5s", 5 * Second}, + ("+5s", Duration::from_secs(5).into()), + // {"-0", 0}, + ( + "-0", + K8sDuration { + duration: Duration::from_secs(0), + is_negative: true, + }, + ), + // {"+0", 0}, + ("+0", Duration::from_secs(0).into()), + // // decimal + // {"5.0s", 5 * Second}, + ("5s", Duration::from_secs(5).into()), + // {"5.6s", 5*Second + 600*Millisecond}, + ( + "5.6s", + (Duration::from_secs(5) + Duration::from_millis(600)).into(), + ), + // {"5.s", 5 * Second}, + ("5.s", Duration::from_secs(5).into()), + // {".5s", 500 * Millisecond}, + (".5s", Duration::from_millis(500).into()), + // {"1.0s", 1 * Second}, + ("1.0s", Duration::from_secs(1).into()), + // {"1.00s", 1 * Second}, + ("1.00s", Duration::from_secs(1).into()), + // {"1.004s", 1*Second + 4*Millisecond}, + ( + "1.004s", + (Duration::from_secs(1) + Duration::from_millis(4)).into(), + ), + // {"1.0040s", 1*Second + 4*Millisecond}, + ( + "1.0040s", + (Duration::from_secs(1) + Duration::from_millis(4)).into(), + ), + // {"100.00100s", 100*Second + 1*Millisecond}, + ( + "100.00100s", + (Duration::from_secs(100) + Duration::from_millis(1)).into(), + ), + // // different units + // {"10ns", 10 * Nanosecond}, + ("10ns", Duration::from_nanos(10).into()), + // {"11us", 11 * Microsecond}, + ("11us", Duration::from_micros(11).into()), + // {"12µs", 12 * Microsecond}, // U+00B5 + ("12µs", Duration::from_micros(12).into()), + // {"12μs", 12 * Microsecond}, // U+03BC + ("12μs", Duration::from_micros(12).into()), + // {"13ms", 13 * Millisecond}, + ("13ms", Duration::from_millis(13).into()), + // {"14s", 14 * Second}, + ("14s", Duration::from_secs(14).into()), + // {"15m", 15 * Minute}, + ("15m", (15 * MINUTE).into()), + // {"16h", 16 * Hour}, + ("16h", (16 * HOUR).into()), + // // composite durations + // {"3h30m", 3*Hour + 30*Minute}, + ("3h30m", (3 * HOUR + 30 * MINUTE).into()), + // {"10.5s4m", 4*Minute + 10*Second + 500*Millisecond}, + ( + "10.5s4m", + (4 * MINUTE + Duration::from_secs(10) + Duration::from_millis(500)).into(), + ), + // {"-2m3.4s", -(2*Minute + 3*Second + 400*Millisecond)}, + ( + "-2m3.4s", + K8sDuration { + duration: 2 * MINUTE + Duration::from_secs(3) + Duration::from_millis(400), + is_negative: true, + }, + ), + // {"1h2m3s4ms5us6ns", 1*Hour + 2*Minute + 3*Second + 4*Millisecond + 5*Microsecond + 6*Nanosecond}, + ( + "1h2m3s4ms5us6ns", + (1 * HOUR + + 2 * MINUTE + + Duration::from_secs(3) + + Duration::from_millis(4) + + Duration::from_micros(5) + + Duration::from_nanos(6)) + .into(), + ), + // {"39h9m14.425s", 39*Hour + 9*Minute + 14*Second + 425*Millisecond}, + ( + "39h9m14.425s", + (39 * HOUR + 9 * MINUTE + Duration::from_secs(14) + Duration::from_millis(425)) + .into(), + ), + // // large value + // {"52763797000ns", 52763797000 * Nanosecond}, + ("52763797000ns", Duration::from_nanos(52763797000).into()), + // // more than 9 digits after decimal point, see https://golang.org/issue/6617 + // {"0.3333333333333333333h", 20 * Minute}, + ("0.3333333333333333333h", (20 * MINUTE).into()), + // // 9007199254740993 = 1<<53+1 cannot be stored precisely in a float64 + // {"9007199254740993ns", (1<<53 + 1) * Nanosecond}, + ( + "9007199254740993ns", + Duration::from_nanos((1 << 53) + 1).into(), + ), + // Rust Durations can handle larger durations than Go's + // representation, so skip these tests for their precision limits + + // // largest duration that can be represented by int64 in nanoseconds + // {"9223372036854775807ns", (1<<63 - 1) * Nanosecond}, + // ("9223372036854775807ns", Duration::from_nanos((1 << 63) - 1).into()), + // {"9223372036854775.807us", (1<<63 - 1) * Nanosecond}, + // ("9223372036854775.807us", Duration::from_nanos((1 << 63) - 1).into()), + // {"9223372036s854ms775us807ns", (1<<63 - 1) * Nanosecond}, + // {"-9223372036854775808ns", -1 << 63 * Nanosecond}, + // {"-9223372036854775.808us", -1 << 63 * Nanosecond}, + // {"-9223372036s854ms775us808ns", -1 << 63 * Nanosecond}, + // // largest negative value + // {"-9223372036854775808ns", -1 << 63 * Nanosecond}, + // // largest negative round trip value, see https://golang.org/issue/48629 + // {"-2562047h47m16.854775808s", -1 << 63 * Nanosecond}, + + // // huge string; issue 15011. + // {"0.100000000000000000000h", 6 * Minute}, + ("0.100000000000000000000h", (6 * MINUTE).into()), // // This value tests the first overflow check in leadingFraction. + // {"0.830103483285477580700h", 49*Minute + 48*Second + 372539827*Nanosecond}, + // } + // ``` + ]; + + for (input, expected) in cases { + let parsed = dbg!(input).parse::().unwrap(); + assert_eq!(&dbg!(parsed), expected); + } + } +} diff --git a/policy-controller/k8s/api/src/lib.rs b/policy-controller/k8s/api/src/lib.rs index 7cb280663..67d72ee0f 100644 --- a/policy-controller/k8s/api/src/lib.rs +++ b/policy-controller/k8s/api/src/lib.rs @@ -1,6 +1,7 @@ #![deny(warnings, rust_2018_idioms)] #![forbid(unsafe_code)] +pub mod duration; pub mod labels; pub mod policy; diff --git a/policy-controller/k8s/api/src/policy/httproute.rs b/policy-controller/k8s/api/src/policy/httproute.rs index 8099684bd..7d0589e53 100644 --- a/policy-controller/k8s/api/src/policy/httproute.rs +++ b/policy-controller/k8s/api/src/policy/httproute.rs @@ -4,7 +4,6 @@ pub use k8s_gateway_api::{ HttpRequestHeaderFilter, HttpRequestRedirectFilter, HttpRouteMatch, LocalObjectReference, ParentReference, RouteStatus, }; -use std::time::Duration; /// HTTPRoute provides a way to route HTTP requests. This includes the /// capability to match requests by hostname, path, header, or query param. @@ -207,6 +206,7 @@ pub struct HttpRouteStatus { #[derive( Clone, Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize, schemars::JsonSchema, )] +#[serde(rename_all = "camelCase")] pub struct HttpRouteTimeouts { /// Request specifies a timeout for the Gateway to send a response to a client HTTP request. /// Whether the gateway starts the timeout before or after the entire client request stream @@ -219,8 +219,7 @@ pub struct HttpRouteTimeouts { /// Request timeouts are disabled by default. /// /// Support: Core - pub request: Option, - + pub request: Option, /// BackendRequest specifies a timeout for an individual request from the gateway /// to a backend service. Typically used in conjuction with retry configuration, /// if supported by an implementation. @@ -228,7 +227,7 @@ pub struct HttpRouteTimeouts { /// The value of BackendRequest defaults to and must be <= the value of Request timeout. /// /// Support: Extended - pub backend_request: Option, + pub backend_request: Option, } pub fn parent_ref_targets_kind(parent_ref: &ParentReference) -> bool diff --git a/policy-controller/k8s/index/src/outbound/index.rs b/policy-controller/k8s/index/src/outbound/index.rs index 9dcf5a0c6..c5947d85d 100644 --- a/policy-controller/k8s/index/src/outbound/index.rs +++ b/policy-controller/k8s/index/src/outbound/index.rs @@ -334,7 +334,38 @@ impl Namespace { .flatten() .filter_map(|b| convert_backend(&self.namespace, b, cluster, service_info)) .collect(); - Ok(HttpRouteRule { matches, backends }) + + let request_timeout = rule.timeouts.as_ref().and_then(|timeouts| { + let timeout = time::Duration::from(timeouts.request?); + + // zero means "no timeout", per GEP-1742 + if timeout == time::Duration::from_nanos(0) { + return None; + } + + Some(timeout) + }); + + let backend_request_timeout = + rule.timeouts + .as_ref() + .and_then(|timeouts: &api::httproute::HttpRouteTimeouts| { + let timeout = time::Duration::from(timeouts.backend_request?); + + // zero means "no timeout", per GEP-1742 + if timeout == time::Duration::from_nanos(0) { + return None; + } + + Some(timeout) + }); + + Ok(HttpRouteRule { + matches, + backends, + request_timeout, + backend_request_timeout, + }) } } diff --git a/policy-controller/src/admission.rs b/policy-controller/src/admission.rs index 14fd1ad2b..eade0aa5c 100644 --- a/policy-controller/src/admission.rs +++ b/policy-controller/src/admission.rs @@ -7,7 +7,7 @@ use crate::k8s::{ ServerAuthorizationSpec, ServerSpec, }, }; -use anyhow::{anyhow, bail, Result}; +use anyhow::{anyhow, bail, ensure, Result}; use futures::future; use hyper::{body::Buf, http, Body, Request, Response}; use k8s_openapi::api::core::v1::{Namespace, ServiceAccount}; @@ -463,12 +463,38 @@ impl Validate for Admission { } } + fn validate_timeouts(timeouts: httproute::HttpRouteTimeouts) -> Result<()> { + use std::time::Duration; + + if let Some(t) = timeouts.backend_request { + ensure!( + !t.is_negative(), + "backendRequest timeout must not be negative" + ); + } + + if let Some(t) = timeouts.request { + ensure!(!t.is_negative(), "request timeout must not be negative"); + } + + if let (Some(req), Some(backend_req)) = (timeouts.request, timeouts.backend_request) { + ensure!( + Duration::from(req) >= Duration::from(backend_req), + "backendRequest timeout ({backend_req}) must not be greater than request timeout ({req})" + ); + } + Ok(()) + } + // Validate the rules in this spec. // This is essentially equivalent to the indexer's conversion function // from `HttpRouteSpec` to `InboundRouteBinding`, except that we don't // actually allocate stuff in order to return an `InboundRouteBinding`. for httproute::HttpRouteRule { - filters, matches, .. + filters, + matches, + timeouts, + .. } in spec.rules.into_iter().flatten() { for m in matches.into_iter().flatten() { @@ -478,6 +504,10 @@ impl Validate for Admission { for f in filters.into_iter().flatten() { validate_filter(f)?; } + + if let Some(timeouts) = timeouts { + validate_timeouts(timeouts)?; + } } Ok(()) diff --git a/policy-test/Cargo.toml b/policy-test/Cargo.toml index 85d86db26..28ab0b3d4 100644 --- a/policy-test/Cargo.toml +++ b/policy-test/Cargo.toml @@ -30,7 +30,7 @@ default-features = false features = ["client", "openssl-tls", "runtime", "ws"] [dependencies.linkerd2-proxy-api] -version = "0.9" +version = "0.10" features = [ "inbound", "outbound",