diff --git a/Cargo.lock b/Cargo.lock
index 0502087dc..3b546f9f9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1321,9 +1321,9 @@ dependencies = [
 
 [[package]]
 name = "linkerd2-proxy-api"
-version = "0.13.1"
+version = "0.14.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "65678e4c506a7e5fdf1a664c629a9b658afa70e254dffcd24df72e937b2c0159"
+checksum = "26c72fb98d969e1e94e95d52a6fcdf4693764702c369e577934256e72fb5bc61"
 dependencies = [
  "http",
  "ipnet",
diff --git a/policy-controller/core/src/outbound.rs b/policy-controller/core/src/outbound.rs
index a895fb961..c429b0e21 100644
--- a/policy-controller/core/src/outbound.rs
+++ b/policy-controller/core/src/outbound.rs
@@ -20,7 +20,9 @@ pub trait DiscoverOutboundPolicy<T> {
 
 pub type OutboundPolicyStream = Pin<Box<dyn Stream<Item = OutboundPolicy> + Send + Sync + 'static>>;
 
-pub type RouteSet<M> = HashMap<GroupKindNamespaceName, OutboundRoute<M>>;
+pub type HttpRoute = OutboundRoute<HttpRouteMatch, HttpRetryCondition>;
+pub type GrpcRoute = OutboundRoute<GrpcRouteMatch, GrpcRetryCondition>;
+pub type RouteSet<T> = HashMap<GroupKindNamespaceName, T>;
 
 pub struct OutboundDiscoverTarget {
     pub service_name: String,
@@ -31,20 +33,23 @@
 
 #[derive(Clone, Debug, PartialEq)]
 pub struct OutboundPolicy {
-    pub http_routes: RouteSet<HttpRouteMatch>,
-    pub grpc_routes: RouteSet<GrpcRouteMatch>,
+    pub http_routes: RouteSet<HttpRoute>,
+    pub grpc_routes: RouteSet<GrpcRoute>,
     pub authority: String,
     pub name: String,
     pub namespace: String,
     pub port: NonZeroU16,
     pub opaque: bool,
     pub accrual: Option<FailureAccrual>,
+    pub http_retry: Option<RouteRetry<HttpRetryCondition>>,
+    pub grpc_retry: Option<RouteRetry<GrpcRetryCondition>>,
+    pub timeouts: RouteTimeouts,
 }
 
 #[derive(Clone, Debug, PartialEq, Eq)]
-pub struct OutboundRoute<M> {
+pub struct OutboundRoute<M, R> {
     pub hostnames: Vec<HostMatch>,
-    pub rules: Vec<OutboundRouteRule<M>>,
+    pub rules: Vec<OutboundRouteRule<M, R>>,
 
     /// This is required for ordering returned routes
     /// by their creation timestamp.
@@ -52,11 +57,11 @@
 }
 
 #[derive(Clone, Debug, PartialEq, Eq)]
-pub struct OutboundRouteRule<M> {
+pub struct OutboundRouteRule<M, R> {
     pub matches: Vec<M>,
     pub backends: Vec<Backend>,
-    pub request_timeout: Option<time::Duration>,
-    pub backend_request_timeout: Option<time::Duration>,
+    pub retry: Option<RouteRetry<R>>,
+    pub timeouts: RouteTimeouts,
     pub filters: Vec<Filter>,
 }
 
@@ -104,3 +109,32 @@ pub enum Filter {
     RequestRedirect(RequestRedirectFilter),
     FailureInjector(FailureInjectorFilter),
 }
+
+#[derive(Clone, Debug, Default, PartialEq, Eq)]
+pub struct RouteTimeouts {
+    pub response: Option<time::Duration>,
+    pub request: Option<time::Duration>,
+    pub idle: Option<time::Duration>,
+}
+
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct RouteRetry<R> {
+    pub limit: u16,
+    pub timeout: Option<time::Duration>,
+    pub conditions: Option<Vec<R>>,
+}
+
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct HttpRetryCondition {
+    pub status_min: u32,
+    pub status_max: u32,
+}
+
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub enum GrpcRetryCondition {
+    Cancelled,
+    DeadlineExceeded,
+    ResourceExhausted,
+    Internal,
+    Unavailable,
+}
diff --git a/policy-controller/grpc/Cargo.toml b/policy-controller/grpc/Cargo.toml
index 3d58542a4..7c0828f8a 100644
--- a/policy-controller/grpc/Cargo.toml
+++ b/policy-controller/grpc/Cargo.toml
@@ -22,5 +22,5 @@ serde = { version = "1", features = ["derive"] }
 serde_json = "1"
 
 [dependencies.linkerd2-proxy-api]
-version = "0.13"
+version = "0.14"
 features = ["inbound", "outbound"]
diff --git a/policy-controller/grpc/src/outbound.rs b/policy-controller/grpc/src/outbound.rs
index 7d01cb001..7a8d69fb1 100644
--- a/policy-controller/grpc/src/outbound.rs
+++ b/policy-controller/grpc/src/outbound.rs
@@ -28,6 +28,7 @@ pub struct OutboundPolicyServer<T> {
     index: T,
     // Used to parse named addresses into <svc>.<ns>.svc.<cluster-domain>.
     cluster_domain: Arc<str>,
+    allow_l5d_request_headers: bool,
     drain: drain::Watch,
 }
 
@@ -35,10 +36,16 @@ impl<T> OutboundPolicyServer<T>
 where
     T: DiscoverOutboundPolicy<OutboundDiscoverTarget> + Send + Sync + 'static,
 {
-    pub fn new(discover: T, cluster_domain: impl Into<Arc<str>>, drain: drain::Watch) -> Self {
+    pub fn new(
+        discover: T,
+        cluster_domain: impl Into<Arc<str>>,
+        allow_l5d_request_headers: bool,
+        drain: drain::Watch,
+    ) -> Self {
         Self {
             index: discover,
             cluster_domain: cluster_domain.into(),
+            allow_l5d_request_headers,
             drain,
         }
     }
@@ -149,7 +156,10 @@ where
         })?;
 
         if let Some(policy) = policy {
-            Ok(tonic::Response::new(to_service(policy)))
+            Ok(tonic::Response::new(to_service(
+                policy,
+                self.allow_l5d_request_headers,
+            )))
         } else {
             Err(tonic::Status::not_found("No such policy"))
         }
@@ -170,7 +180,11 @@ where
             .await
             .map_err(|e| tonic::Status::internal(format!("lookup failed: {e}")))?
             .ok_or_else(|| tonic::Status::not_found("unknown server"))?;
-        Ok(tonic::Response::new(response_stream(drain, rx)))
+        Ok(tonic::Response::new(response_stream(
+            drain,
+            rx,
+            self.allow_l5d_request_headers,
+        )))
     }
 }
 
@@ -178,7 +192,11 @@
 type BoxWatchStream = std::pin::Pin<
     Box<dyn Stream<Item = Result<outbound::OutboundPolicy, tonic::Status>> + Send + Sync>,
 >;
 
-fn response_stream(drain: drain::Watch, mut rx: OutboundPolicyStream) -> BoxWatchStream {
+fn response_stream(
+    drain: drain::Watch,
+    mut rx: OutboundPolicyStream,
+    allow_l5d_request_headers: bool,
+) -> BoxWatchStream {
     Box::pin(async_stream::try_stream! {
         tokio::pin! {
             let shutdown = drain.signaled();
@@ -189,7 +207,7 @@ fn response_stream(drain: drain::Watch, mut rx: OutboundPolicyStream) -> BoxWatc
                 // When the port is updated with a new server, update the server watch.
                 res = rx.next() => match res {
                     Some(policy) => {
-                        yield to_service(policy);
+                        yield to_service(policy, allow_l5d_request_headers);
                     }
                     None => return,
                 },
@@ -204,7 +222,10 @@ fn response_stream(drain: drain::Watch, mut rx: OutboundPolicyStream) -> BoxWatc
     })
 }
 
-fn to_service(outbound: OutboundPolicy) -> outbound::OutboundPolicy {
+fn to_service(
+    outbound: OutboundPolicy,
+    allow_l5d_request_headers: bool,
+) -> outbound::OutboundPolicy {
     let backend: outbound::Backend = default_backend(&outbound);
 
     let kind = if outbound.opaque {
@@ -235,10 +256,24 @@ fn to_service(outbound: OutboundPolicy) -> outbound::OutboundPolicy {
         if !grpc_routes.is_empty() {
             grpc_routes.sort_by(timestamp_then_name);
-            grpc::protocol(backend, grpc_routes.into_iter(), accrual)
+            grpc::protocol(
+                backend,
+                grpc_routes.into_iter(),
+                accrual,
+                outbound.grpc_retry,
+                outbound.timeouts,
+                allow_l5d_request_headers,
+            )
         } else {
             http_routes.sort_by(timestamp_then_name);
-            http::protocol(backend, http_routes.into_iter(), accrual)
+            http::protocol(
+                backend,
+                http_routes.into_iter(),
+                accrual,
+                outbound.http_retry,
+                outbound.timeouts,
+                allow_l5d_request_headers,
+            )
         }
     };
 
@@ -259,9 +294,9 @@
     }
 }
 
-fn timestamp_then_name<M>(
-    (left_id, left_route): &(GroupKindNamespaceName, OutboundRoute<M>),
-    (right_id, right_route): &(GroupKindNamespaceName, OutboundRoute<M>),
+fn timestamp_then_name<M, R>(
+    (left_id, left_route): &(GroupKindNamespaceName, OutboundRoute<M, R>),
+    (right_id, right_route): &(GroupKindNamespaceName, OutboundRoute<M, R>),
 ) -> std::cmp::Ordering {
     let by_ts = match (
         &left_route.creation_timestamp,
diff --git a/policy-controller/grpc/src/outbound/grpc.rs b/policy-controller/grpc/src/outbound/grpc.rs
index 0924bc0dc..472eb714d 100644
--- a/policy-controller/grpc/src/outbound/grpc.rs
+++ b/policy-controller/grpc/src/outbound/grpc.rs
@@ -1,22 +1,36 @@
-use std::net::SocketAddr;
-
 use super::{convert_duration, default_balancer_config, default_queue_config};
 use crate::routes::{
     convert_host_match, convert_request_header_modifier_filter, grpc::convert_match,
 };
 use linkerd2_proxy_api::{destination, grpc_route, http_route, meta, outbound};
 use linkerd_policy_controller_core::{
-    outbound::{Backend, Filter, OutboundRoute, OutboundRouteRule},
-    routes::{FailureInjectorFilter, GroupKindNamespaceName, GrpcRouteMatch},
+    outbound::{
+        Backend, Filter, GrpcRetryCondition, GrpcRoute, OutboundRoute, OutboundRouteRule,
+        RouteRetry, RouteTimeouts,
+    },
+    routes::{FailureInjectorFilter, GroupKindNamespaceName},
 };
+use std::{net::SocketAddr, time};
 
 pub(crate) fn protocol(
     default_backend: outbound::Backend,
-    routes: impl Iterator<Item = (GroupKindNamespaceName, OutboundRoute<GrpcRouteMatch>)>,
+    routes: impl Iterator<Item = (GroupKindNamespaceName, GrpcRoute)>,
     failure_accrual: Option<outbound::FailureAccrual>,
+    service_retry: Option<RouteRetry<GrpcRetryCondition>>,
+    service_timeouts: RouteTimeouts,
+    allow_l5d_request_headers: bool,
 ) -> outbound::proxy_protocol::Kind {
     let routes = routes
-        .map(|(gknn, route)| convert_outbound_route(gknn, route, default_backend.clone()))
+        .map(|(gknn, route)| {
+            convert_outbound_route(
+                gknn,
+                route,
+                default_backend.clone(),
+                service_retry.clone(),
+                service_timeouts.clone(),
+                allow_l5d_request_headers,
+            )
+        })
         .collect::<Vec<_>>();
 
     outbound::proxy_protocol::Kind::Grpc(outbound::proxy_protocol::Grpc {
         routes,
@@ -30,9 +44,15 @@ fn convert_outbound_route(
         hostnames,
         rules,
         creation_timestamp: _,
-    }: OutboundRoute<GrpcRouteMatch>,
+    }: GrpcRoute,
     backend: outbound::Backend,
+    service_retry: Option<RouteRetry<GrpcRetryCondition>>,
+    service_timeouts: RouteTimeouts,
+    allow_l5d_request_headers: bool,
 ) -> outbound::GrpcRoute {
+    // This encoder sets deprecated timeouts for older proxies.
+    #![allow(deprecated)]
+
     let metadata = Some(meta::Metadata {
         kind: Some(meta::metadata::Kind::Resource(meta::Resource {
             group: gknn.group.to_string(),
@@ -51,15 +71,13 @@
             |OutboundRouteRule {
                  matches,
                  backends,
-                 request_timeout,
-                 backend_request_timeout,
+                 mut retry,
+                 mut timeouts,
                  filters,
              }| {
-                let backend_request_timeout = backend_request_timeout
-                    .and_then(|d| convert_duration("backend request_timeout", d));
                 let backends = backends
                     .into_iter()
-                    .map(|backend| convert_backend(backend_request_timeout.clone(), backend))
+                    .map(convert_backend)
                     .collect::<Vec<_>>();
                 let dist = if backends.is_empty() {
                     outbound::grpc_route::distribution::Kind::FirstAvailable(
@@ -67,7 +85,7 @@
                             backends: vec![outbound::grpc_route::RouteBackend {
                                 backend: Some(backend.clone()),
                                 filters: vec![],
-                                request_timeout: backend_request_timeout,
+                                ..Default::default()
                             }],
                         },
                     )
@@ -76,12 +94,58 @@
                         outbound::grpc_route::distribution::RandomAvailable { backends },
                     )
                 };
+                if timeouts == Default::default() {
+                    timeouts = service_timeouts.clone();
+                }
+                if retry.is_none() {
+                    retry = service_retry.clone();
+                }
                 outbound::grpc_route::Rule {
                     matches: matches.into_iter().map(convert_match).collect(),
                     backends: Some(outbound::grpc_route::Distribution { kind: Some(dist) }),
                     filters: filters.into_iter().map(convert_to_filter).collect(),
-                    request_timeout: request_timeout
+                    request_timeout: timeouts
+                        .request
                         .and_then(|d| convert_duration("request timeout", d)),
+                    timeouts: Some(http_route::Timeouts {
+                        request: timeouts
+                            .request
+                            .and_then(|d| convert_duration("stream timeout", d)),
+                        idle: timeouts
+                            .idle
+                            .and_then(|d| convert_duration("idle timeout", d)),
+                        response: timeouts
+                            .response
+                            .and_then(|d| convert_duration("response timeout", d)),
+                    }),
+                    retry: retry.map(|r| outbound::grpc_route::Retry {
+                        max_retries: r.limit.into(),
+                        max_request_bytes: 64 * 1024,
+                        backoff: Some(outbound::ExponentialBackoff {
+                            min_backoff: Some(time::Duration::from_millis(25).try_into().unwrap()),
+                            max_backoff: Some(time::Duration::from_millis(250).try_into().unwrap()),
+                            jitter_ratio: 1.0,
+                        }),
+                        conditions: Some(r.conditions.iter().flatten().fold(
+                            outbound::grpc_route::retry::Conditions::default(),
+                            |mut cond, c| {
+                                match c {
+                                    GrpcRetryCondition::Cancelled => cond.cancelled = true,
+                                    GrpcRetryCondition::DeadlineExceeded => {
+                                        cond.deadine_exceeded = true
+                                    }
+                                    GrpcRetryCondition::Internal => cond.internal = true,
+                                    GrpcRetryCondition::ResourceExhausted => {
+                                        cond.resource_exhausted = true
+                                    }
+                                    GrpcRetryCondition::Unavailable => cond.unavailable = true,
+                                };
+                                cond
+                            },
+                        )),
+                        timeout: r.timeout.and_then(|d| convert_duration("retry timeout", d)),
+                    }),
+                    allow_l5d_request_headers,
                 }
             },
         )
@@ -94,10 +158,7 @@
     }
 }
 
-fn convert_backend(
-    request_timeout: Option<prost_types::Duration>,
-    backend: Backend,
-) -> outbound::grpc_route::WeightedRouteBackend {
+fn convert_backend(backend: Backend) -> outbound::grpc_route::WeightedRouteBackend {
     match backend {
         Backend::Addr(addr) => {
             let socket_addr = SocketAddr::new(addr.addr, addr.port.get());
@@ -116,7 +177,7 @@
                         )),
                     }),
                     filters: Default::default(),
-                    request_timeout,
+                    ..Default::default()
                 }),
            }
        }
@@ -152,7 +213,7 @@
                        )),
                    }),
                    filters,
-                    request_timeout,
+                    ..Default::default()
                }),
            }
        } else {
@@ -175,7 +236,7 @@
                            },
                        )),
                    }],
-                    request_timeout,
+                    ..Default::default()
                }),
            }
        }
@@ -199,7 +260,7 @@
                            },
                        )),
                    }],
-                    request_timeout,
+                    ..Default::default()
                }),
        },
    }
diff --git a/policy-controller/grpc/src/outbound/http.rs b/policy-controller/grpc/src/outbound/http.rs
index 65130251e..75410081e 100644
--- a/policy-controller/grpc/src/outbound/http.rs
+++ b/policy-controller/grpc/src/outbound/http.rs
@@ -1,5 +1,3 @@
-use std::{net::SocketAddr, time};
-
 use super::{
     convert_duration, default_balancer_config, default_outbound_opaq_route, default_queue_config,
 };
@@ -10,21 +8,41 @@ use crate::routes::{
 };
 use linkerd2_proxy_api::{destination, http_route, meta, outbound};
 use linkerd_policy_controller_core::{
-    outbound::{Backend, Filter, OutboundRoute, OutboundRouteRule},
-    routes::{GroupKindNamespaceName, HttpRouteMatch},
+    outbound::{
+        Backend, Filter, HttpRetryCondition, HttpRoute, OutboundRouteRule, RouteRetry,
+        RouteTimeouts,
+    },
+    routes::GroupKindNamespaceName,
 };
+use std::{net::SocketAddr, time};
 
 pub(crate) fn protocol(
     default_backend: outbound::Backend,
-    routes: impl Iterator<Item = (GroupKindNamespaceName, OutboundRoute<HttpRouteMatch>)>,
+    routes: impl Iterator<Item = (GroupKindNamespaceName, HttpRoute)>,
     accrual: Option<outbound::FailureAccrual>,
+    service_retry: Option<RouteRetry<HttpRetryCondition>>,
+    service_timeouts: RouteTimeouts,
+    allow_l5d_request_headers: bool,
 ) -> outbound::proxy_protocol::Kind {
     let opaque_route = default_outbound_opaq_route(default_backend.clone());
     let mut routes = routes
-        .map(|(gknn, route)| convert_outbound_route(gknn, route, default_backend.clone()))
+        .map(|(gknn, route)| {
+            convert_outbound_route(
+                gknn,
+                route,
+                default_backend.clone(),
+                service_retry.clone(),
+                service_timeouts.clone(),
+                allow_l5d_request_headers,
+            )
+        })
         .collect::<Vec<_>>();
     if routes.is_empty() {
-        routes.push(default_outbound_route(default_backend));
+        routes.push(default_outbound_route(
+            default_backend,
+            service_retry.clone(),
+            service_timeouts.clone(),
+        ));
     }
     outbound::proxy_protocol::Kind::Detect(outbound::proxy_protocol::Detect {
         timeout: Some(
@@ -49,13 +67,19 @@ pub(crate) fn
protocol( fn convert_outbound_route( gknn: GroupKindNamespaceName, - OutboundRoute { + HttpRoute { hostnames, rules, creation_timestamp: _, - }: OutboundRoute, + }: HttpRoute, backend: outbound::Backend, + service_retry: Option>, + service_timeouts: RouteTimeouts, + allow_l5d_request_headers: bool, ) -> outbound::HttpRoute { + // This encoder sets deprecated timeouts for older proxies. + #![allow(deprecated)] + let metadata = Some(meta::Metadata { kind: Some(meta::metadata::Kind::Resource(meta::Resource { group: gknn.group.to_string(), @@ -74,15 +98,13 @@ fn convert_outbound_route( |OutboundRouteRule { matches, backends, - request_timeout, - backend_request_timeout, + mut retry, + mut timeouts, filters, }| { - let backend_request_timeout = backend_request_timeout - .and_then(|d| convert_duration("backend request_timeout", d)); let backends = backends .into_iter() - .map(|backend| convert_backend(backend_request_timeout.clone(), backend)) + .map(convert_backend) .collect::>(); let dist = if backends.is_empty() { outbound::http_route::distribution::Kind::FirstAvailable( @@ -90,7 +112,7 @@ fn convert_outbound_route( backends: vec![outbound::http_route::RouteBackend { backend: Some(backend.clone()), filters: vec![], - request_timeout: backend_request_timeout, + ..Default::default() }], }, ) @@ -99,12 +121,22 @@ fn convert_outbound_route( outbound::http_route::distribution::RandomAvailable { backends }, ) }; + if timeouts == Default::default() { + timeouts = service_timeouts.clone(); + } + if retry.is_none() { + retry = service_retry.clone(); + } outbound::http_route::Rule { matches: matches.into_iter().map(convert_match).collect(), backends: Some(outbound::http_route::Distribution { kind: Some(dist) }), filters: filters.into_iter().map(convert_to_filter).collect(), - request_timeout: request_timeout + request_timeout: timeouts + .request .and_then(|d| convert_duration("request timeout", d)), + timeouts: Some(convert_timeouts(timeouts)), + retry: retry.map(convert_retry), + allow_l5d_request_headers, } }, ) @@ -117,10 +149,7 @@ fn convert_outbound_route( } } -fn convert_backend( - request_timeout: Option, - backend: Backend, -) -> outbound::http_route::WeightedRouteBackend { +fn convert_backend(backend: Backend) -> outbound::http_route::WeightedRouteBackend { match backend { Backend::Addr(addr) => { let socket_addr = SocketAddr::new(addr.addr, addr.port.get()); @@ -139,7 +168,7 @@ fn convert_backend( )), }), filters: Default::default(), - request_timeout, + ..Default::default() }), } } @@ -175,7 +204,7 @@ fn convert_backend( )), }), filters, - request_timeout, + ..Default::default() }), } } else { @@ -198,7 +227,7 @@ fn convert_backend( }, )), }], - request_timeout, + ..Default::default() }), } } @@ -222,13 +251,19 @@ fn convert_backend( }, )), }], - request_timeout, + ..Default::default() }), }, } } -pub(crate) fn default_outbound_route(backend: outbound::Backend) -> outbound::HttpRoute { +pub(crate) fn default_outbound_route( + backend: outbound::Backend, + service_retry: Option>, + service_timeouts: RouteTimeouts, +) -> outbound::HttpRoute { + // This encoder sets deprecated timeouts for older proxies. 
+ #![allow(deprecated)] let metadata = Some(meta::Metadata { kind: Some(meta::metadata::Kind::Default("http".to_string())), }); @@ -245,13 +280,17 @@ pub(crate) fn default_outbound_route(backend: outbound::Backend) -> outbound::Ht backends: vec![outbound::http_route::RouteBackend { backend: Some(backend), filters: vec![], - request_timeout: None, + ..Default::default() }], }, )), }), - filters: Default::default(), - request_timeout: None, + retry: service_retry.map(convert_retry), + request_timeout: service_timeouts + .request + .and_then(|d| convert_duration("request timeout", d)), + timeouts: Some(convert_timeouts(service_timeouts)), + ..Default::default() }]; outbound::HttpRoute { metadata, @@ -276,3 +315,41 @@ fn convert_to_filter(filter: Filter) -> outbound::http_route::Filter { }), } } + +fn convert_retry(r: RouteRetry) -> outbound::http_route::Retry { + outbound::http_route::Retry { + max_retries: r.limit.into(), + max_request_bytes: 64 * 1024, + backoff: Some(outbound::ExponentialBackoff { + min_backoff: Some(time::Duration::from_millis(25).try_into().unwrap()), + max_backoff: Some(time::Duration::from_millis(250).try_into().unwrap()), + jitter_ratio: 1.0, + }), + conditions: Some(r.conditions.iter().flatten().fold( + outbound::http_route::retry::Conditions::default(), + |mut cond, c| { + cond.status_ranges + .push(outbound::http_route::retry::conditions::StatusRange { + start: c.status_min, + end: c.status_max, + }); + cond + }, + )), + timeout: r.timeout.and_then(|d| convert_duration("retry timeout", d)), + } +} + +fn convert_timeouts(timeouts: RouteTimeouts) -> http_route::Timeouts { + http_route::Timeouts { + request: timeouts + .request + .and_then(|d| convert_duration("request timeout", d)), + idle: timeouts + .idle + .and_then(|d| convert_duration("idle timeout", d)), + response: timeouts + .response + .and_then(|d| convert_duration("response timeout", d)), + } +} diff --git a/policy-controller/k8s/index/src/outbound/index.rs b/policy-controller/k8s/index/src/outbound/index.rs index 1e102c4f1..887106a5d 100644 --- a/policy-controller/k8s/index/src/outbound/index.rs +++ b/policy-controller/k8s/index/src/outbound/index.rs @@ -6,8 +6,11 @@ use crate::{ use ahash::AHashMap as HashMap; use anyhow::{bail, ensure, Result}; use linkerd_policy_controller_core::{ - outbound::{Backend, Backoff, FailureAccrual, OutboundPolicy, OutboundRoute, RouteSet}, - routes::{GroupKindNamespaceName, GrpcRouteMatch, HttpRouteMatch}, + outbound::{ + Backend, Backoff, FailureAccrual, GrpcRetryCondition, GrpcRoute, HttpRetryCondition, + HttpRoute, OutboundPolicy, RouteRetry, RouteSet, RouteTimeouts, + }, + routes::GroupKindNamespaceName, }; use linkerd_policy_controller_k8s_api::{ gateway::{self as k8s_gateway_api, ParentReference}, @@ -24,8 +27,8 @@ pub struct Index { service_info: HashMap, } -mod grpc; -mod http; +pub mod grpc; +pub mod http; pub mod metrics; pub type SharedIndex = Arc>; @@ -51,8 +54,8 @@ struct Namespace { service_port_routes: HashMap, /// Stores the route resources (by service name) that do not /// explicitly target a port. 
- service_http_routes: HashMap>, - service_grpc_routes: HashMap>, + service_http_routes: HashMap>, + service_grpc_routes: HashMap>, namespace: Arc, } @@ -60,6 +63,9 @@ struct Namespace { struct ServiceInfo { opaque_ports: PortSet, accrual: Option, + http_retry: Option>, + grpc_retry: Option>, + timeouts: RouteTimeouts, } #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -77,14 +83,20 @@ struct ServiceRoutes { watches_by_ns: HashMap, opaque: bool, accrual: Option, + http_retry: Option>, + grpc_retry: Option>, + timeouts: RouteTimeouts, } #[derive(Debug)] struct RoutesWatch { opaque: bool, accrual: Option, - http_routes: RouteSet, - grpc_routes: RouteSet, + http_retry: Option>, + grpc_retry: Option>, + timeouts: RouteTimeouts, + http_routes: RouteSet, + grpc_routes: RouteSet, watch: watch::Sender, } @@ -146,6 +158,17 @@ impl kubert::index::IndexNamespacedResource for Index { ports_annotation(service.annotations(), "config.linkerd.io/opaque-ports") .unwrap_or_else(|| self.namespaces.cluster_info.default_opaque_ports.clone()); + let timeouts = parse_timeouts(service.annotations()) + .map_err(|error| tracing::error!(%error, service=name, namespace=ns, "failed to parse timeouts")) + .unwrap_or_default(); + + let http_retry = http::parse_http_retry(service.annotations()).map_err(|error| { + tracing::error!(%error, service=name, namespace=ns, "failed to parse http retry") + }).unwrap_or_default(); + let grpc_retry = grpc::parse_grpc_retry(service.annotations()).map_err(|error| { + tracing::error!(%error, service=name, namespace=ns, "failed to parse grpc retry") + }).unwrap_or_default(); + if let Some(cluster_ips) = service .spec .as_ref() @@ -173,6 +196,9 @@ impl kubert::index::IndexNamespacedResource for Index { let service_info = ServiceInfo { opaque_ports, accrual, + http_retry, + grpc_retry, + timeouts, }; self.namespaces @@ -495,7 +521,13 @@ impl Namespace { let opaque = service.opaque_ports.contains(&svc_port.port); - svc_routes.update_service(opaque, service.accrual); + svc_routes.update_service( + opaque, + service.accrual, + service.http_retry.clone(), + service.grpc_retry.clone(), + service.timeouts.clone(), + ); } } @@ -538,10 +570,18 @@ impl Namespace { namespace: self.namespace.to_string(), }; - let (opaque, accrual) = match service_info.get(&service_ref) { - Some(svc) => (svc.opaque_ports.contains(&sp.port), svc.accrual), - None => (false, None), - }; + let mut opaque = false; + let mut accrual = None; + let mut http_retry = None; + let mut grpc_retry = None; + let mut timeouts = Default::default(); + if let Some(svc) = service_info.get(&service_ref) { + opaque = svc.opaque_ports.contains(&sp.port); + accrual = svc.accrual; + http_retry = svc.http_retry.clone(); + grpc_retry = svc.grpc_retry.clone(); + timeouts = svc.timeouts.clone(); + } // The routes which target this Service but don't specify // a port apply to all ports. Therefore, we include them. 
@@ -559,6 +599,9 @@ impl Namespace { let mut service_routes = ServiceRoutes { opaque, accrual, + http_retry, + grpc_retry, + timeouts, authority, port: sp.port, name: sp.service, @@ -648,7 +691,7 @@ fn is_service(group: Option<&str>, kind: &str) -> bool { } #[inline] -fn is_parent_service(parent: &ParentReference) -> bool { +pub fn is_parent_service(parent: &ParentReference) -> bool { parent .kind .as_deref() @@ -696,6 +739,9 @@ impl ServiceRoutes { port: self.port, opaque: self.opaque, accrual: self.accrual, + http_retry: self.http_retry.clone(), + grpc_retry: self.grpc_retry.clone(), + timeouts: self.timeouts.clone(), http_routes: http_routes.clone(), grpc_routes: grpc_routes.clone(), name: self.name.to_string(), @@ -709,15 +755,14 @@ impl ServiceRoutes { watch: sender, opaque: self.opaque, accrual: self.accrual, + http_retry: self.http_retry.clone(), + grpc_retry: self.grpc_retry.clone(), + timeouts: self.timeouts.clone(), } }) } - fn apply_http_route( - &mut self, - gknn: GroupKindNamespaceName, - route: OutboundRoute, - ) { + fn apply_http_route(&mut self, gknn: GroupKindNamespaceName, route: HttpRoute) { if *gknn.namespace == *self.namespace { // This is a producer namespace route. let watch = self.watch_for_ns_or_default(gknn.namespace.to_string()); @@ -739,11 +784,7 @@ impl ServiceRoutes { } } - fn apply_grpc_route( - &mut self, - gknn: GroupKindNamespaceName, - route: OutboundRoute, - ) { + fn apply_grpc_route(&mut self, gknn: GroupKindNamespaceName, route: GrpcRoute) { if *gknn.namespace == *self.namespace { // This is a producer namespace route. let watch = self.watch_for_ns_or_default(gknn.namespace.to_string()); @@ -765,12 +806,25 @@ impl ServiceRoutes { } } - fn update_service(&mut self, opaque: bool, accrual: Option) { + fn update_service( + &mut self, + opaque: bool, + accrual: Option, + http_retry: Option>, + grpc_retry: Option>, + timeouts: RouteTimeouts, + ) { self.opaque = opaque; self.accrual = accrual; + self.http_retry = http_retry.clone(); + self.grpc_retry = grpc_retry.clone(); + self.timeouts = timeouts.clone(); for watch in self.watches_by_ns.values_mut() { watch.opaque = opaque; watch.accrual = accrual; + watch.http_retry = http_retry.clone(); + watch.grpc_retry = grpc_retry.clone(); + watch.timeouts = timeouts.clone(); watch.send_if_modified(); } } @@ -813,25 +867,32 @@ impl RoutesWatch { modified = true; } + if self.http_retry != policy.http_retry { + policy.http_retry = self.http_retry.clone(); + modified = true; + } + + if self.grpc_retry != policy.grpc_retry { + policy.grpc_retry = self.grpc_retry.clone(); + modified = true; + } + + if self.timeouts != policy.timeouts { + policy.timeouts = self.timeouts.clone(); + modified = true; + } + modified }); } - fn insert_http_route( - &mut self, - gknn: GroupKindNamespaceName, - route: OutboundRoute, - ) { + fn insert_http_route(&mut self, gknn: GroupKindNamespaceName, route: HttpRoute) { self.http_routes.insert(gknn, route); self.send_if_modified(); } - fn insert_grpc_route( - &mut self, - gknn: GroupKindNamespaceName, - route: OutboundRoute, - ) { + fn insert_grpc_route(&mut self, gknn: GroupKindNamespaceName, route: GrpcRoute) { self.grpc_routes.insert(gknn, route); self.send_if_modified(); @@ -848,7 +909,7 @@ impl RoutesWatch { } } -fn parse_accrual_config( +pub fn parse_accrual_config( annotations: &std::collections::BTreeMap, ) -> Result> { annotations @@ -903,6 +964,28 @@ fn parse_accrual_config( .transpose() } +pub fn parse_timeouts( + annotations: &std::collections::BTreeMap, +) -> Result { + let response 
= annotations + .get("timeout.linkerd.io/response") + .map(|s| parse_duration(s)) + .transpose()?; + let request = annotations + .get("timeout.linkerd.io/request") + .map(|s| parse_duration(s)) + .transpose()?; + let idle = annotations + .get("timeout.linkerd.io/idle") + .map(|s| parse_duration(s)) + .transpose()?; + Ok(RouteTimeouts { + response, + request, + idle, + }) +} + fn parse_duration(s: &str) -> Result { let s = s.trim(); let offset = s diff --git a/policy-controller/k8s/index/src/outbound/index/grpc.rs b/policy-controller/k8s/index/src/outbound/index/grpc.rs index 6962e9f82..f93466868 100644 --- a/policy-controller/k8s/index/src/outbound/index/grpc.rs +++ b/policy-controller/k8s/index/src/outbound/index/grpc.rs @@ -1,18 +1,26 @@ +use std::time; + use super::http::{convert_backend, convert_gateway_filter}; -use super::{ServiceInfo, ServiceRef}; +use super::{parse_duration, parse_timeouts, ServiceInfo, ServiceRef}; use crate::{routes, ClusterInfo}; use ahash::AHashMap as HashMap; -use anyhow::Result; -use linkerd_policy_controller_core::outbound::OutboundRoute; +use anyhow::{bail, Result}; +use kube::ResourceExt; +use linkerd_policy_controller_core::outbound::{ + GrpcRetryCondition, GrpcRoute, OutboundRoute, RouteRetry, RouteTimeouts, +}; use linkerd_policy_controller_core::{outbound::OutboundRouteRule, routes::GrpcRouteMatch}; use linkerd_policy_controller_k8s_api::{gateway, Time}; -pub(crate) fn convert_route( +pub(super) fn convert_route( ns: &str, route: gateway::GrpcRoute, cluster: &ClusterInfo, service_info: &HashMap, -) -> Result> { +) -> Result { + let timeouts = parse_timeouts(route.annotations())?; + let retry = parse_grpc_retry(route.annotations())?; + let hostnames = route .spec .hostnames @@ -26,7 +34,16 @@ pub(crate) fn convert_route( .rules .into_iter() .flatten() - .map(|rule| convert_rule(ns, rule, cluster, service_info)) + .map(|rule| { + convert_rule( + ns, + rule, + cluster, + service_info, + timeouts.clone(), + retry.clone(), + ) + }) .collect::>()?; let creation_timestamp = route.metadata.creation_timestamp.map(|Time(t)| t); @@ -43,7 +60,9 @@ fn convert_rule( rule: gateway::GrpcRouteRule, cluster: &ClusterInfo, service_info: &HashMap, -) -> Result> { + timeouts: RouteTimeouts, + retry: Option>, +) -> Result> { let matches = rule .matches .into_iter() @@ -68,8 +87,60 @@ fn convert_rule( Ok(OutboundRouteRule { matches, backends, - request_timeout: None, - backend_request_timeout: None, + timeouts, + retry, filters, }) } + +pub fn parse_grpc_retry( + annotations: &std::collections::BTreeMap, +) -> Result>> { + let limit = annotations + .get("retry.linkerd.io/limit") + .map(|s| s.parse::()) + .transpose()? + .filter(|v| *v != 0); + + let timeout = annotations + .get("retry.linkerd.io/timeout") + .map(|v| parse_duration(v)) + .transpose()? 
+ .filter(|v| *v != time::Duration::ZERO); + + let conditions = annotations + .get("retry.linkerd.io/grpc") + .map(|v| { + v.split(',') + .map(|cond| { + if cond.eq_ignore_ascii_case("cancelled") { + return Ok(GrpcRetryCondition::Cancelled); + } + if cond.eq_ignore_ascii_case("deadline-exceeded") { + return Ok(GrpcRetryCondition::DeadlineExceeded); + } + if cond.eq_ignore_ascii_case("internal") { + return Ok(GrpcRetryCondition::Internal); + } + if cond.eq_ignore_ascii_case("resource-exhausted") { + return Ok(GrpcRetryCondition::ResourceExhausted); + } + if cond.eq_ignore_ascii_case("unavailable") { + return Ok(GrpcRetryCondition::Unavailable); + } + bail!("Unknown grpc retry condition: {cond}"); + }) + .collect::>>() + }) + .transpose()?; + + if limit.is_none() && timeout.is_none() && conditions.is_none() { + return Ok(None); + } + + Ok(Some(RouteRetry { + limit: limit.unwrap_or(1), + timeout, + conditions, + })) +} diff --git a/policy-controller/k8s/index/src/outbound/index/http.rs b/policy-controller/k8s/index/src/outbound/index/http.rs index 3a4242c91..8351d1136 100644 --- a/policy-controller/k8s/index/src/outbound/index/http.rs +++ b/policy-controller/k8s/index/src/outbound/index/http.rs @@ -1,26 +1,33 @@ use std::{num::NonZeroU16, time}; -use super::{is_service, ServiceInfo, ServiceRef}; +use super::{is_service, parse_duration, parse_timeouts, ServiceInfo, ServiceRef}; use crate::{ routes::{self, HttpRouteResource}, ClusterInfo, }; use ahash::AHashMap as HashMap; use anyhow::{bail, Result}; +use kube::ResourceExt; use linkerd_policy_controller_core::{ - outbound::{Backend, Filter, OutboundRoute, OutboundRouteRule, WeightedService}, + outbound::{ + Backend, Filter, HttpRetryCondition, OutboundRoute, OutboundRouteRule, RouteRetry, + RouteTimeouts, WeightedService, + }, routes::HttpRouteMatch, }; use linkerd_policy_controller_k8s_api::{gateway, policy, Time}; -pub(crate) fn convert_route( +pub(super) fn convert_route( ns: &str, route: HttpRouteResource, cluster: &ClusterInfo, service_info: &HashMap, -) -> Result> { +) -> Result> { match route { HttpRouteResource::LinkerdHttp(route) => { + let timeouts = parse_timeouts(route.annotations())?; + let retry = parse_http_retry(route.annotations())?; + let hostnames = route .spec .hostnames @@ -34,7 +41,16 @@ pub(crate) fn convert_route( .rules .into_iter() .flatten() - .map(|r| convert_linkerd_rule(ns, r, cluster, service_info)) + .map(|r| { + convert_linkerd_rule( + ns, + r, + cluster, + service_info, + timeouts.clone(), + retry.clone(), + ) + }) .collect::>()?; let creation_timestamp = route.metadata.creation_timestamp.map(|Time(t)| t); @@ -46,6 +62,9 @@ pub(crate) fn convert_route( }) } HttpRouteResource::GatewayHttp(route) => { + let timeouts = parse_timeouts(route.annotations())?; + let retry = parse_http_retry(route.annotations())?; + let hostnames = route .spec .hostnames @@ -59,7 +78,16 @@ pub(crate) fn convert_route( .rules .into_iter() .flatten() - .map(|r| convert_gateway_rule(ns, r, cluster, service_info)) + .map(|r| { + convert_gateway_rule( + ns, + r, + cluster, + service_info, + timeouts.clone(), + retry.clone(), + ) + }) .collect::>()?; let creation_timestamp = route.metadata.creation_timestamp.map(|Time(t)| t); @@ -78,7 +106,9 @@ fn convert_linkerd_rule( rule: policy::httproute::HttpRouteRule, cluster: &ClusterInfo, service_info: &HashMap, -) -> Result> { + mut timeouts: RouteTimeouts, + retry: Option>, +) -> Result> { let matches = rule .matches .into_iter() @@ -100,36 +130,24 @@ fn convert_linkerd_rule( 
.map(convert_linkerd_filter) .collect::>()?; - let request_timeout = rule.timeouts.as_ref().and_then(|timeouts| { - let timeout = time::Duration::from(timeouts.request?); + timeouts.request = timeouts.request.or_else(|| { + rule.timeouts.as_ref().and_then(|timeouts| { + let timeout = time::Duration::from(timeouts.request?); - // zero means "no timeout", per GEP-1742 - if timeout == time::Duration::from_nanos(0) { - return None; - } + // zero means "no timeout", per GEP-1742 + if timeout == time::Duration::ZERO { + return None; + } - Some(timeout) + Some(timeout) + }) }); - let backend_request_timeout = - rule.timeouts - .as_ref() - .and_then(|timeouts: &policy::httproute::HttpRouteTimeouts| { - let timeout = time::Duration::from(timeouts.backend_request?); - - // zero means "no timeout", per GEP-1742 - if timeout == time::Duration::from_nanos(0) { - return None; - } - - Some(timeout) - }); - Ok(OutboundRouteRule { matches, backends, - request_timeout, - backend_request_timeout, + timeouts, + retry, filters, }) } @@ -139,7 +157,9 @@ fn convert_gateway_rule( rule: gateway::HttpRouteRule, cluster: &ClusterInfo, service_info: &HashMap, -) -> Result> { + timeouts: RouteTimeouts, + retry: Option>, +) -> Result> { let matches = rule .matches .into_iter() @@ -164,13 +184,13 @@ fn convert_gateway_rule( Ok(OutboundRouteRule { matches, backends, - request_timeout: None, - backend_request_timeout: None, + timeouts, + retry, filters, }) } -pub(crate) fn convert_backend>( +pub(super) fn convert_backend>( ns: &str, backend: BackendRef, cluster: &ClusterInfo, @@ -308,3 +328,79 @@ fn is_backend_service(backend: &gateway::BackendObjectReference) -> bool { backend.kind.as_deref().unwrap_or("Service"), ) } + +pub fn parse_http_retry( + annotations: &std::collections::BTreeMap, +) -> Result>> { + let limit = annotations + .get("retry.linkerd.io/limit") + .map(|s| s.parse::()) + .transpose()? + .filter(|v| *v != 0); + + let timeout = annotations + .get("retry.linkerd.io/timeout") + .map(|v| parse_duration(v)) + .transpose()? 
+ .filter(|v| *v != time::Duration::ZERO); + + fn to_code(s: &str) -> Option { + let code = s.parse::().ok()?; + if (100..600).contains(&code) { + Some(code) + } else { + None + } + } + + let conditions = annotations + .get("retry.linkerd.io/http") + .map(|v| { + v.split(',') + .map(|cond| { + if cond.eq_ignore_ascii_case("5xx") { + return Ok(HttpRetryCondition { + status_min: 500, + status_max: 599, + }); + } + if cond.eq_ignore_ascii_case("gateway-error") { + return Ok(HttpRetryCondition { + status_min: 502, + status_max: 504, + }); + } + + if let Some(code) = to_code(cond) { + return Ok(HttpRetryCondition { + status_min: code, + status_max: code, + }); + } + if let Some((start, end)) = cond.split_once('-') { + if let (Some(s), Some(e)) = (to_code(start), to_code(end)) { + if s <= e { + return Ok(HttpRetryCondition { + status_min: s, + status_max: e, + }); + } + } + } + + bail!("invalid retry condition: {v}"); + }) + .collect::>>() + }) + .transpose()?; + + if limit.is_none() && timeout.is_none() && conditions.is_none() { + return Ok(None); + } + + Ok(Some(RouteRetry { + limit: limit.unwrap_or(1), + timeout, + conditions, + })) +} diff --git a/policy-controller/src/admission.rs b/policy-controller/src/admission.rs index 237a4cdd5..716696fd5 100644 --- a/policy-controller/src/admission.rs +++ b/policy-controller/src/admission.rs @@ -12,9 +12,9 @@ use k8s_openapi::api::core::v1::{Namespace, ServiceAccount}; use kube::{core::DynamicObject, Resource, ResourceExt}; use linkerd_policy_controller_core as core; use linkerd_policy_controller_k8s_api::gateway::{self as k8s_gateway_api, GrpcRoute}; -use linkerd_policy_controller_k8s_index as index; +use linkerd_policy_controller_k8s_index::{self as index, outbound::index as outbound_index}; use serde::de::DeserializeOwned; -use std::task; +use std::{collections::BTreeMap, task}; use thiserror::Error; use tracing::{debug, info, trace, warn}; @@ -39,7 +39,13 @@ type AdmissionReview = kube::core::admission::AdmissionReview; #[async_trait::async_trait] trait Validate { - async fn validate(self, ns: &str, name: &str, spec: T) -> Result<()>; + async fn validate( + self, + ns: &str, + name: &str, + annotations: &BTreeMap, + spec: T, + ) -> Result<()>; } // === impl AdmissionService === @@ -144,7 +150,7 @@ impl Admission { let rsp = AdmissionResponse::from(&req); let kind = req.kind.kind.clone(); - let (ns, name, spec) = match parse_spec::(req) { + let (obj, spec) = match parse_spec::(req) { Ok(spec) => spec, Err(error) => { info!(%error, "Failed to parse {} spec", kind); @@ -152,7 +158,11 @@ impl Admission { } }; - if let Err(error) = self.validate(&ns, &name, spec).await { + let ns = obj.namespace().unwrap_or_default(); + let name = obj.name_any(); + let annotations = obj.annotations(); + + if let Err(error) = self.validate(&ns, &name, annotations, spec).await { info!(%error, %ns, %name, %kind, "Denied"); return rsp.deny(error); } @@ -180,16 +190,11 @@ fn json_response(rsp: AdmissionReview) -> Result, Error> { .expect("admission review response must be valid")) } -fn parse_spec(req: AdmissionRequest) -> Result<(String, String, T)> { +fn parse_spec(req: AdmissionRequest) -> Result<(DynamicObject, T)> { let obj = req .object .ok_or_else(|| anyhow!("admission request missing 'object"))?; - let ns = obj - .namespace() - .ok_or_else(|| anyhow!("admission request missing 'namespace'"))?; - let name = obj.name_any(); - let spec = { let data = obj .data @@ -199,7 +204,7 @@ fn parse_spec(req: AdmissionRequest) -> Result<(String, Str serde_json::from_value(data)? 
}; - Ok((ns, name, spec)) + Ok((obj, spec)) } /// Validates the target of an `AuthorizationPolicy`. @@ -228,7 +233,13 @@ fn validate_policy_target(ns: &str, tgt: &LocalTargetRef) -> Result<()> { #[async_trait::async_trait] impl Validate for Admission { - async fn validate(self, ns: &str, _name: &str, spec: AuthorizationPolicySpec) -> Result<()> { + async fn validate( + self, + ns: &str, + _name: &str, + _annotations: &BTreeMap, + spec: AuthorizationPolicySpec, + ) -> Result<()> { validate_policy_target(ns, &spec.target_ref)?; let mtls_authns_count = spec @@ -302,7 +313,13 @@ fn validate_identity_ref(id: &NamespacedTargetRef) -> Result<()> { #[async_trait::async_trait] impl Validate for Admission { - async fn validate(self, _ns: &str, _name: &str, spec: MeshTLSAuthenticationSpec) -> Result<()> { + async fn validate( + self, + _ns: &str, + _name: &str, + _annotations: &BTreeMap, + spec: MeshTLSAuthenticationSpec, + ) -> Result<()> { for id in spec.identities.iter().flatten() { if let Err(err) = validation::validate_identity(id) { bail!("id {} is invalid: {}", id, err); @@ -323,7 +340,13 @@ impl Validate for Admission { // // TODO(ver) this isn't rigorous about detecting servers that select the same port if one port // specifies a numeric port and the other specifies the port's name. - async fn validate(self, ns: &str, name: &str, spec: ServerSpec) -> Result<()> { + async fn validate( + self, + ns: &str, + name: &str, + _annotations: &BTreeMap, + spec: ServerSpec, + ) -> Result<()> { // Since we can't ensure that the local index is up-to-date with the API server (i.e. // updates may be delayed), we issue an API request to get the latest state of servers in // the namespace. @@ -378,7 +401,13 @@ impl Admission { #[async_trait::async_trait] impl Validate for Admission { - async fn validate(self, _ns: &str, _name: &str, spec: NetworkAuthenticationSpec) -> Result<()> { + async fn validate( + self, + _ns: &str, + _name: &str, + _annotations: &BTreeMap, + spec: NetworkAuthenticationSpec, + ) -> Result<()> { if spec.networks.is_empty() { bail!("at least one network must be specified"); } @@ -407,7 +436,13 @@ impl Validate for Admission { #[async_trait::async_trait] impl Validate for Admission { - async fn validate(self, _ns: &str, _name: &str, spec: ServerAuthorizationSpec) -> Result<()> { + async fn validate( + self, + _ns: &str, + _name: &str, + _annotations: &BTreeMap, + spec: ServerAuthorizationSpec, + ) -> Result<()> { if let Some(mtls) = spec.client.mesh_tls.as_ref() { if spec.client.unauthenticated { bail!("`unauthenticated` must be false if `mesh_tls` is specified"); @@ -477,7 +512,25 @@ fn validate_match( #[async_trait::async_trait] impl Validate for Admission { - async fn validate(self, _ns: &str, _name: &str, spec: HttpRouteSpec) -> Result<()> { + async fn validate( + self, + _ns: &str, + _name: &str, + annotations: &BTreeMap, + spec: HttpRouteSpec, + ) -> Result<()> { + if spec + .inner + .parent_refs + .iter() + .flatten() + .any(index::outbound::index::is_parent_service) + { + index::outbound::index::http::parse_http_retry(annotations)?; + index::outbound::index::parse_accrual_config(annotations)?; + index::outbound::index::parse_timeouts(annotations)?; + } + fn validate_filter(filter: httproute::HttpRouteFilter) -> Result<()> { match filter { httproute::HttpRouteFilter::RequestHeaderModifier { @@ -549,8 +602,21 @@ impl Validate for Admission { self, _ns: &str, _name: &str, + annotations: &BTreeMap, spec: k8s_gateway_api::HttpRouteSpec, ) -> Result<()> { + if spec + .inner + 
.parent_refs + .iter() + .flatten() + .any(outbound_index::is_parent_service) + { + outbound_index::http::parse_http_retry(annotations)?; + outbound_index::parse_accrual_config(annotations)?; + outbound_index::parse_timeouts(annotations)?; + } + fn validate_filter(filter: k8s_gateway_api::HttpRouteFilter) -> Result<()> { match filter { k8s_gateway_api::HttpRouteFilter::RequestHeaderModifier { @@ -595,8 +661,21 @@ impl Validate for Admission { self, _ns: &str, _name: &str, + annotations: &BTreeMap, spec: k8s_gateway_api::GrpcRouteSpec, ) -> Result<()> { + if spec + .inner + .parent_refs + .iter() + .flatten() + .any(outbound_index::is_parent_service) + { + outbound_index::grpc::parse_grpc_retry(annotations)?; + outbound_index::parse_accrual_config(annotations)?; + outbound_index::parse_timeouts(annotations)?; + } + fn validate_filter(filter: k8s_gateway_api::GrpcRouteFilter) -> Result<()> { match filter { k8s_gateway_api::GrpcRouteFilter::RequestHeaderModifier { diff --git a/policy-controller/src/main.rs b/policy-controller/src/main.rs index 999a19689..f035787f4 100644 --- a/policy-controller/src/main.rs +++ b/policy-controller/src/main.rs @@ -97,6 +97,9 @@ struct Args { #[clap(long, default_value = "5000")] patch_timeout_ms: u64, + + #[clap(long)] + allow_l5d_request_headers: bool, } #[tokio::main] @@ -118,6 +121,7 @@ async fn main() -> Result<()> { probe_networks, default_opaque_ports, patch_timeout_ms, + allow_l5d_request_headers, } = Args::parse(); let server = if admission_controller_disabled { @@ -290,6 +294,7 @@ async fn main() -> Result<()> { grpc_addr, cluster_domain, cluster_networks, + allow_l5d_request_headers, inbound_index, outbound_index, runtime.shutdown_handle(), @@ -340,6 +345,7 @@ async fn grpc( addr: SocketAddr, cluster_domain: String, cluster_networks: Vec, + allow_l5d_request_headers: bool, inbound_index: inbound::SharedIndex, outbound_index: outbound::SharedIndex, drain: drain::Watch, @@ -350,9 +356,13 @@ async fn grpc( .svc(); let outbound_discover = OutboundDiscover::new(outbound_index); - let outbound_svc = - grpc::outbound::OutboundPolicyServer::new(outbound_discover, cluster_domain, drain.clone()) - .svc(); + let outbound_svc = grpc::outbound::OutboundPolicyServer::new( + outbound_discover, + cluster_domain, + allow_l5d_request_headers, + drain.clone(), + ) + .svc(); let (close_tx, close_rx) = tokio::sync::oneshot::channel(); tokio::pin! 
{
diff --git a/policy-test/Cargo.toml b/policy-test/Cargo.toml
index 2625638cb..7726f8bfc 100644
--- a/policy-test/Cargo.toml
+++ b/policy-test/Cargo.toml
@@ -31,7 +31,7 @@ default-features = false
 features = ["client", "openssl-tls", "runtime", "ws"]
 
 [dependencies.linkerd2-proxy-api]
-version = "0.13"
+version = "0.14"
 features = ["inbound", "outbound"]
 
 [dev-dependencies]
diff --git a/policy-test/src/lib.rs b/policy-test/src/lib.rs
index 526bcfa7a..a4ff3b7d5 100644
--- a/policy-test/src/lib.rs
+++ b/policy-test/src/lib.rs
@@ -5,6 +5,7 @@ pub mod admission;
 pub mod bb;
 pub mod curl;
 pub mod grpc;
+pub mod outbound_api;
 pub mod web;
 
 use linkerd_policy_controller_k8s_api::{self as k8s, ResourceExt};
diff --git a/policy-test/src/outbound_api.rs b/policy-test/src/outbound_api.rs
new file mode 100644
index 000000000..8f4f8e8e2
--- /dev/null
+++ b/policy-test/src/outbound_api.rs
@@ -0,0 +1,259 @@
+use crate::{assert_svc_meta, grpc};
+use kube::ResourceExt;
+use linkerd_policy_controller_k8s_api as k8s;
+use std::time::Duration;
+use tokio::time;
+
+pub async fn retry_watch_outbound_policy(
+    client: &kube::Client,
+    ns: &str,
+    svc: &k8s::Service,
+    port: u16,
+) -> tonic::Streaming<grpc::outbound::OutboundPolicy> {
+    // Port-forward to the control plane and start watching the service's
+    // outbound policy.
+    let mut policy_api = grpc::OutboundPolicyClient::port_forwarded(client).await;
+    loop {
+        match policy_api.watch(ns, svc, port).await {
+            Ok(rx) => return rx,
+            Err(error) => {
+                tracing::error!(
+                    ?error,
+                    ns,
+                    svc = svc.name_unchecked(),
+                    "failed to watch outbound policy for port 4191"
+                );
+                time::sleep(Duration::from_secs(1)).await;
+            }
+        }
+    }
+}
+
+// detect_http_routes asserts that the given outbound policy has a proxy protocol
+// of "Detect" and then invokes the given function with the Http1 and Http2
+// routes from the Detect.
+#[track_caller] +pub fn detect_http_routes(config: &grpc::outbound::OutboundPolicy, f: F) +where + F: Fn(&[grpc::outbound::HttpRoute]), +{ + let kind = config + .protocol + .as_ref() + .expect("must have proxy protocol") + .kind + .as_ref() + .expect("must have kind"); + if let grpc::outbound::proxy_protocol::Kind::Detect(grpc::outbound::proxy_protocol::Detect { + opaque: _, + timeout: _, + http1, + http2, + }) = kind + { + let http1 = http1 + .as_ref() + .expect("proxy protocol must have http1 field"); + let http2 = http2 + .as_ref() + .expect("proxy protocol must have http2 field"); + f(&http1.routes); + f(&http2.routes); + } else { + panic!("proxy protocol must be Detect; actually got:\n{kind:#?}") + } +} + +#[track_caller] +pub fn grpc_routes(config: &grpc::outbound::OutboundPolicy) -> &[grpc::outbound::GrpcRoute] { + let kind = config + .protocol + .as_ref() + .expect("must have proxy protocol") + .kind + .as_ref() + .expect("must have kind"); + if let grpc::outbound::proxy_protocol::Kind::Grpc(grpc::outbound::proxy_protocol::Grpc { + routes, + failure_accrual: _, + }) = kind + { + routes + } else { + panic!("proxy protocol must be Grpc; actually got:\n{kind:#?}") + } +} + +#[track_caller] +pub fn detect_failure_accrual(config: &grpc::outbound::OutboundPolicy, f: F) +where + F: Fn(Option<&grpc::outbound::FailureAccrual>), +{ + let kind = config + .protocol + .as_ref() + .expect("must have proxy protocol") + .kind + .as_ref() + .expect("must have kind"); + if let grpc::outbound::proxy_protocol::Kind::Detect(grpc::outbound::proxy_protocol::Detect { + opaque: _, + timeout: _, + http1, + http2, + }) = kind + { + let http1 = http1 + .as_ref() + .expect("proxy protocol must have http1 field"); + let http2 = http2 + .as_ref() + .expect("proxy protocol must have http2 field"); + f(http1.failure_accrual.as_ref()); + f(http2.failure_accrual.as_ref()); + } else { + panic!("proxy protocol must be Detect; actually got:\n{kind:#?}") + } +} + +#[track_caller] +pub fn failure_accrual_consecutive( + accrual: Option<&grpc::outbound::FailureAccrual>, +) -> &grpc::outbound::failure_accrual::ConsecutiveFailures { + assert!( + accrual.is_some(), + "failure accrual must be configured for service" + ); + let kind = accrual + .unwrap() + .kind + .as_ref() + .expect("failure accrual must have kind"); + let grpc::outbound::failure_accrual::Kind::ConsecutiveFailures(accrual) = kind; + accrual +} + +#[track_caller] +pub fn route_backends_first_available( + route: &grpc::outbound::HttpRoute, +) -> &[grpc::outbound::http_route::RouteBackend] { + let kind = assert_singleton(&route.rules) + .backends + .as_ref() + .expect("Rule must have backends") + .kind + .as_ref() + .expect("Backend must have kind"); + match kind { + grpc::outbound::http_route::distribution::Kind::FirstAvailable(fa) => &fa.backends, + _ => panic!("Distribution must be FirstAvailable"), + } +} + +#[track_caller] +pub fn route_backends_random_available( + route: &grpc::outbound::HttpRoute, +) -> &[grpc::outbound::http_route::WeightedRouteBackend] { + let kind = assert_singleton(&route.rules) + .backends + .as_ref() + .expect("Rule must have backends") + .kind + .as_ref() + .expect("Backend must have kind"); + match kind { + grpc::outbound::http_route::distribution::Kind::RandomAvailable(dist) => &dist.backends, + _ => panic!("Distribution must be RandomAvailable"), + } +} + +#[track_caller] +pub fn route_name(route: &grpc::outbound::HttpRoute) -> &str { + match route.metadata.as_ref().unwrap().kind.as_ref().unwrap() { + 
grpc::meta::metadata::Kind::Resource(grpc::meta::Resource { ref name, .. }) => name, + _ => panic!("route must be a resource kind"), + } +} + +#[track_caller] +pub fn assert_backend_has_failure_filter( + backend: &grpc::outbound::http_route::WeightedRouteBackend, +) { + let filter = assert_singleton(&backend.backend.as_ref().unwrap().filters); + match filter.kind.as_ref().unwrap() { + grpc::outbound::http_route::filter::Kind::FailureInjector(_) => {} + _ => panic!("backend must have FailureInjector filter"), + }; +} + +#[track_caller] +pub fn assert_route_is_default(route: &grpc::outbound::HttpRoute, svc: &k8s::Service, port: u16) { + let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap(); + match kind { + grpc::meta::metadata::Kind::Default(_) => {} + grpc::meta::metadata::Kind::Resource(r) => { + panic!("route expected to be default but got resource {r:?}") + } + } + + let backends = route_backends_first_available(route); + let backend = assert_singleton(backends); + assert_backend_matches_service(backend, svc, port); + + let rule = assert_singleton(&route.rules); + let route_match = assert_singleton(&rule.matches); + let path_match = route_match.path.as_ref().unwrap().kind.as_ref().unwrap(); + assert_eq!( + *path_match, + grpc::http_route::path_match::Kind::Prefix("/".to_string()) + ); +} + +#[track_caller] +pub fn assert_backend_matches_service( + backend: &grpc::outbound::http_route::RouteBackend, + svc: &k8s::Service, + port: u16, +) { + let backend = backend.backend.as_ref().unwrap(); + let dst = match backend.kind.as_ref().unwrap() { + grpc::outbound::backend::Kind::Balancer(balance) => { + let kind = balance.discovery.as_ref().unwrap().kind.as_ref().unwrap(); + match kind { + grpc::outbound::backend::endpoint_discovery::Kind::Dst(dst) => &dst.path, + } + } + grpc::outbound::backend::Kind::Forward(_) => { + panic!("default route backend must be Balancer") + } + }; + assert_eq!( + *dst, + format!( + "{}.{}.svc.{}:{}", + svc.name_unchecked(), + svc.namespace().unwrap(), + "cluster.local", + port + ) + ); + + assert_svc_meta(&backend.metadata, svc, port) +} + +#[track_caller] +pub fn assert_singleton(ts: &[T]) -> &T { + assert_eq!(ts.len(), 1); + ts.first().unwrap() +} + +#[track_caller] +pub fn assert_route_name_eq(route: &grpc::outbound::HttpRoute, name: &str) { + let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap(); + match kind { + grpc::meta::metadata::Kind::Default(d) => { + panic!("route expected to not be default, but got default {d:?}") + } + grpc::meta::metadata::Kind::Resource(resource) => assert_eq!(resource.name, *name), + } +} diff --git a/policy-test/tests/outbound_api_gateway.rs b/policy-test/tests/outbound_api_gateway.rs index 933cf7af8..f70560a31 100644 --- a/policy-test/tests/outbound_api_gateway.rs +++ b/policy-test/tests/outbound_api_gateway.rs @@ -4,11 +4,10 @@ use linkerd_policy_controller_k8s_api as k8s; use linkerd_policy_test::{ assert_default_accrual_backoff, assert_svc_meta, create, create_annotated_service, create_cluster_scoped, create_opaque_service, create_service, delete_cluster_scoped, grpc, - mk_service, with_temp_ns, + mk_service, outbound_api::*, with_temp_ns, }; use maplit::{btreemap, convert_args}; use std::{collections::BTreeMap, time::Duration}; -use tokio::time; // These tests are copies of the tests in outbound_api_gateway.rs but using the // policy.linkerd.io HttpRoute kubernetes types instead of the Gateway API ones. 
@@ -1153,35 +1152,124 @@ async fn consumer_route() {
     .await;
 }
 
+#[tokio::test(flavor = "current_thread")]
+async fn http_route_retries_and_timeouts() {
+    with_temp_ns(|client, ns| async move {
+        // Create a service
+        let svc = create_service(&client, &ns, "my-svc", 4191).await;
+
+        let _route = create(
+            &client,
+            mk_http_route(&ns, "foo-route", &svc, Some(4191))
+                .with_annotations(
+                    vec![
+                        ("retry.linkerd.io/http".to_string(), "5xx".to_string()),
+                        ("timeout.linkerd.io/response".to_string(), "10s".to_string()),
+                    ]
+                    .into_iter()
+                    .collect(),
+                )
+                .build(),
+        )
+        .await;
+
+        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
+        let config = rx
+            .next()
+            .await
+            .expect("watch must not fail")
+            .expect("watch must return an initial config");
+        tracing::trace!(?config);
+
+        detect_http_routes(&config, |routes| {
+            let route = assert_singleton(routes);
+            let rule = assert_singleton(&route.rules);
+            let conditions = rule
+                .retry
+                .as_ref()
+                .expect("retry config expected")
+                .conditions
+                .as_ref()
+                .expect("retry conditions expected");
+            let status_range = assert_singleton(&conditions.status_ranges);
+            assert_eq!(status_range.start, 500);
+            assert_eq!(status_range.end, 599);
+            let timeout = rule
+                .timeouts
+                .as_ref()
+                .expect("timeouts expected")
+                .response
+                .as_ref()
+                .expect("response timeout expected");
+            assert_eq!(timeout.seconds, 10);
+        });
+    })
+    .await;
+}
+
+#[tokio::test(flavor = "current_thread")]
+async fn service_retries_and_timeouts() {
+    with_temp_ns(|client, ns| async move {
+        // Create a service
+        let mut svc = mk_service(&ns, "my-svc", 4191);
+        svc.annotations_mut()
+            .insert("retry.linkerd.io/http".to_string(), "5xx".to_string());
+        svc.annotations_mut()
+            .insert("timeout.linkerd.io/response".to_string(), "10s".to_string());
+        let svc = create(&client, svc).await;
+
+        let _route = create(
+            &client,
+            mk_http_route(&ns, "foo-route", &svc, Some(4191))
+                .with_annotations(
+                    vec![
+                        // Route annotations override the timeout config specified
+                        // on the service.
+                        ("timeout.linkerd.io/request".to_string(), "5s".to_string()),
+                    ]
+                    .into_iter()
+                    .collect(),
+                )
+                .build(),
+        )
+        .await;
+
+        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
+        let config = rx
+            .next()
+            .await
+            .expect("watch must not fail")
+            .expect("watch must return an initial config");
+        tracing::trace!(?config);
+
+        detect_http_routes(&config, |routes| {
+            let route = assert_singleton(routes);
+            let rule = assert_singleton(&route.rules);
+            let conditions = rule
+                .retry
+                .as_ref()
+                .expect("retry config expected")
+                .conditions
+                .as_ref()
+                .expect("retry conditions expected");
+            let status_range = assert_singleton(&conditions.status_ranges);
+            // Retry config inherited from the service.
+            assert_eq!(status_range.start, 500);
+            assert_eq!(status_range.end, 599);
+            let timeouts = rule.timeouts.as_ref().expect("timeouts expected");
+            // Service timeout config overridden by route timeout config.
+            assert_eq!(timeouts.response, None);
+            let request_timeout = timeouts.request.as_ref().expect("request timeout expected");
+            assert_eq!(request_timeout.seconds, 5);
+        });
+    })
+    .await;
+}
+
 /* Helpers */
 
 struct HttpRouteBuilder(k8s_gateway_api::HttpRoute);
 
-async fn retry_watch_outbound_policy(
-    client: &kube::Client,
-    ns: &str,
-    svc: &k8s::Service,
-    port: u16,
-) -> tonic::Streaming<grpc::outbound::OutboundPolicy> {
-    // Port-forward to the control plane and start watching the service's
-    // outbound policy.
-    let mut policy_api = grpc::OutboundPolicyClient::port_forwarded(client).await;
-    loop {
-        match policy_api.watch(ns, svc, port).await {
-            Ok(rx) => return rx,
-            Err(error) => {
-                tracing::error!(
-                    ?error,
-                    ns,
-                    svc = svc.name_unchecked(),
-                    "failed to watch outbound policy for port 4191"
-                );
-                time::sleep(Duration::from_secs(1)).await;
-            }
-        }
-    }
-}
-
 fn mk_http_route(ns: &str, name: &str, svc: &k8s::Service, port: Option<u16>) -> HttpRouteBuilder {
     use k8s_gateway_api as api;
 
@@ -1263,6 +1351,12 @@ impl HttpRouteBuilder {
         Self(route)
     }
 
+    fn with_annotations(self, annotations: BTreeMap<String, String>) -> Self {
+        let mut route = self.0;
+        route.metadata.annotations = Some(annotations);
+        Self(route)
+    }
+
     fn build(self) -> k8s_gateway_api::HttpRoute {
         self.0
     }
@@ -1298,210 +1392,3 @@ fn mk_empty_http_route(
         status: None,
     }
 }
-
-// detect_http_routes asserts that the given outbound policy has a proxy protcol
-// of "Detect" and then invokes the given function with the Http1 and Http2
-// routes from the Detect.
-#[track_caller]
-fn detect_http_routes<F>(config: &grpc::outbound::OutboundPolicy, f: F)
-where
-    F: Fn(&[grpc::outbound::HttpRoute]),
-{
-    let kind = config
-        .protocol
-        .as_ref()
-        .expect("must have proxy protocol")
-        .kind
-        .as_ref()
-        .expect("must have kind");
-    if let grpc::outbound::proxy_protocol::Kind::Detect(grpc::outbound::proxy_protocol::Detect {
-        opaque: _,
-        timeout: _,
-        http1,
-        http2,
-    }) = kind
-    {
-        let http1 = http1
-            .as_ref()
-            .expect("proxy protocol must have http1 field");
-        let http2 = http2
-            .as_ref()
-            .expect("proxy protocol must have http2 field");
-        f(&http1.routes);
-        f(&http2.routes);
-    } else {
-        panic!("proxy protocol must be Detect; actually got:\n{kind:#?}")
-    }
-}
-
-#[track_caller]
-fn detect_failure_accrual<F>(config: &grpc::outbound::OutboundPolicy, f: F)
-where
-    F: Fn(Option<&grpc::outbound::FailureAccrual>),
-{
-    let kind = config
-        .protocol
-        .as_ref()
-        .expect("must have proxy protocol")
-        .kind
-        .as_ref()
-        .expect("must have kind");
-    if let grpc::outbound::proxy_protocol::Kind::Detect(grpc::outbound::proxy_protocol::Detect {
-        opaque: _,
-        timeout: _,
-        http1,
-        http2,
-    }) = kind
-    {
-        let http1 = http1
-            .as_ref()
-            .expect("proxy protocol must have http1 field");
-        let http2 = http2
-            .as_ref()
-            .expect("proxy protocol must have http2 field");
-        f(http1.failure_accrual.as_ref());
-        f(http2.failure_accrual.as_ref());
-    } else {
-        panic!("proxy protocol must be Detect; actually got:\n{kind:#?}")
-    }
-}
-
-#[track_caller]
-fn failure_accrual_consecutive(
-    accrual: Option<&grpc::outbound::FailureAccrual>,
-) -> &grpc::outbound::failure_accrual::ConsecutiveFailures {
-    assert!(
-        accrual.is_some(),
-        "failure accrual must be configured for service"
-    );
-    let kind = accrual
-        .unwrap()
-        .kind
-        .as_ref()
-        .expect("failure accrual must have kind");
-    let grpc::outbound::failure_accrual::Kind::ConsecutiveFailures(accrual) = kind;
-    accrual
-}
-
-#[track_caller]
-fn route_backends_first_available(
-    route: &grpc::outbound::HttpRoute,
-) -> &[grpc::outbound::http_route::RouteBackend] {
-    let kind = assert_singleton(&route.rules)
-        .backends
-        .as_ref()
-        .expect("Rule must have backends")
-        .kind
-        .as_ref()
-        .expect("Backend must have kind");
-    match kind {
-        grpc::outbound::http_route::distribution::Kind::FirstAvailable(fa) => &fa.backends,
-        _ => panic!("Distribution must be FirstAvailable"),
-    }
-}
-
-#[track_caller]
-fn route_backends_random_available(
-    route: &grpc::outbound::HttpRoute,
-) -> &[grpc::outbound::http_route::WeightedRouteBackend] {
-    let kind = assert_singleton(&route.rules)
-        .backends
-        .as_ref()
-        .expect("Rule must have backends")
-        .kind
-        .as_ref()
-        .expect("Backend must have kind");
-    match kind {
-        grpc::outbound::http_route::distribution::Kind::RandomAvailable(dist) => &dist.backends,
-        _ => panic!("Distribution must be RandomAvailable"),
-    }
-}
-
-#[track_caller]
-fn route_name(route: &grpc::outbound::HttpRoute) -> &str {
-    match route.metadata.as_ref().unwrap().kind.as_ref().unwrap() {
-        grpc::meta::metadata::Kind::Resource(grpc::meta::Resource { ref name, .. }) => name,
-        _ => panic!("route must be a resource kind"),
-    }
-}
-
-#[track_caller]
-fn assert_backend_has_failure_filter(backend: &grpc::outbound::http_route::WeightedRouteBackend) {
-    let filter = assert_singleton(&backend.backend.as_ref().unwrap().filters);
-    match filter.kind.as_ref().unwrap() {
-        grpc::outbound::http_route::filter::Kind::FailureInjector(_) => {}
-        _ => panic!("backend must have FailureInjector filter"),
-    };
-}
-
-#[track_caller]
-fn assert_route_is_default(route: &grpc::outbound::HttpRoute, svc: &k8s::Service, port: u16) {
-    let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap();
-    match kind {
-        grpc::meta::metadata::Kind::Default(_) => {}
-        grpc::meta::metadata::Kind::Resource(r) => {
-            panic!("route expected to be default but got resource {r:?}")
-        }
-    }
-
-    let backends = route_backends_first_available(route);
-    let backend = assert_singleton(backends);
-    assert_backend_matches_service(backend, svc, port);
-
-    let rule = assert_singleton(&route.rules);
-    let route_match = assert_singleton(&rule.matches);
-    let path_match = route_match.path.as_ref().unwrap().kind.as_ref().unwrap();
-    assert_eq!(
-        *path_match,
-        grpc::http_route::path_match::Kind::Prefix("/".to_string())
-    );
-}
-
-#[track_caller]
-fn assert_backend_matches_service(
-    backend: &grpc::outbound::http_route::RouteBackend,
-    svc: &k8s::Service,
-    port: u16,
-) {
-    let backend = backend.backend.as_ref().unwrap();
-    let dst = match backend.kind.as_ref().unwrap() {
-        grpc::outbound::backend::Kind::Balancer(balance) => {
-            let kind = balance.discovery.as_ref().unwrap().kind.as_ref().unwrap();
-            match kind {
-                grpc::outbound::backend::endpoint_discovery::Kind::Dst(dst) => &dst.path,
-            }
-        }
-        grpc::outbound::backend::Kind::Forward(_) => {
-            panic!("default route backend must be Balancer")
-        }
-    };
-    assert_eq!(
-        *dst,
-        format!(
-            "{}.{}.svc.{}:{}",
-            svc.name_unchecked(),
-            svc.namespace().unwrap(),
-            "cluster.local",
-            port
-        )
-    );
-
-    assert_svc_meta(&backend.metadata, svc, port)
-}
-
-#[track_caller]
-fn assert_singleton<T>(ts: &[T]) -> &T {
-    assert_eq!(ts.len(), 1);
-    ts.first().unwrap()
-}
-
-#[track_caller]
-fn assert_route_name_eq(route: &grpc::outbound::HttpRoute, name: &str) {
-    let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap();
-    match kind {
-        grpc::meta::metadata::Kind::Default(d) => {
-            panic!("route expected to not be default, but got default {d:?}")
-        }
-        grpc::meta::metadata::Kind::Resource(resource) => assert_eq!(resource.name, *name),
-    }
-}
diff --git a/policy-test/tests/outbound_api_grpc.rs b/policy-test/tests/outbound_api_grpc.rs
new file mode 100644
index 000000000..abdf7dc86
--- /dev/null
+++ b/policy-test/tests/outbound_api_grpc.rs
@@ -0,0 +1,164 @@
+use futures::prelude::*;
+use kube::ResourceExt;
+use linkerd_policy_controller_k8s_api as k8s;
+use linkerd_policy_test::{create, create_service, mk_service, outbound_api::*, with_temp_ns};
+use std::collections::BTreeMap;
+
+#[tokio::test(flavor = "current_thread")]
+async fn grpc_route_retries_and_timeouts() {
+    with_temp_ns(|client, ns| async move {
+        // Create a service
+        let svc = create_service(&client, &ns, "my-svc", 4191).await;
+
+        let _route = create(
+            &client,
+            mk_grpc_route(&ns, "foo-route", &svc, Some(4191))
+                .with_annotations(
+                    vec![
+                        ("retry.linkerd.io/grpc".to_string(), "internal".to_string()),
+                        ("timeout.linkerd.io/response".to_string(), "10s".to_string()),
+                    ]
+                    .into_iter()
+                    .collect(),
+                )
+                .build(),
+        )
+        .await;
+
+        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
+        let config = rx
+            .next()
+            .await
+            .expect("watch must not fail")
+            .expect("watch must return an initial config");
+        tracing::trace!(?config);
+
+        let routes = grpc_routes(&config);
+        let route = assert_singleton(routes);
+        let rule = assert_singleton(&route.rules);
+        let conditions = rule
+            .retry
+            .as_ref()
+            .expect("retry config expected")
+            .conditions
+            .as_ref()
+            .expect("retry conditions expected");
+        assert!(conditions.internal);
+        let timeout = rule
+            .timeouts
+            .as_ref()
+            .expect("timeouts expected")
+            .response
+            .as_ref()
+            .expect("response timeout expected");
+        assert_eq!(timeout.seconds, 10);
+    })
+    .await;
+}
+
+#[tokio::test(flavor = "current_thread")]
+async fn service_retries_and_timeouts() {
+    with_temp_ns(|client, ns| async move {
+        // Create a service
+        let mut svc = mk_service(&ns, "my-svc", 4191);
+        svc.annotations_mut()
+            .insert("retry.linkerd.io/grpc".to_string(), "internal".to_string());
+        svc.annotations_mut()
+            .insert("timeout.linkerd.io/response".to_string(), "10s".to_string());
+        let svc = create(&client, svc).await;
+
+        let _route = create(
+            &client,
+            mk_grpc_route(&ns, "foo-route", &svc, Some(4191))
+                .with_annotations(
+                    vec![
+                        // Route annotations override the timeout config specified
+                        // on the service.
+                        ("timeout.linkerd.io/request".to_string(), "5s".to_string()),
+                    ]
+                    .into_iter()
+                    .collect(),
+                )
+                .build(),
+        )
+        .await;
+
+        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
+        let config = rx
+            .next()
+            .await
+            .expect("watch must not fail")
+            .expect("watch must return an initial config");
+        tracing::trace!(?config);
+
+        let routes = grpc_routes(&config);
+        let route = assert_singleton(routes);
+        let rule = assert_singleton(&route.rules);
+        let conditions = rule
+            .retry
+            .as_ref()
+            .expect("retry config expected")
+            .conditions
+            .as_ref()
+            .expect("retry conditions expected");
+        // Retry config inherited from the service.
+        assert!(conditions.internal);
+        let timeouts = rule.timeouts.as_ref().expect("timeouts expected");
+        // Service timeout config overridden by route timeout config.
+        assert_eq!(timeouts.response, None);
+        let request_timeout = timeouts.request.as_ref().expect("request timeout expected");
+        assert_eq!(request_timeout.seconds, 5);
+    })
+    .await;
+}
+
+/* Helpers */
+
+struct GrpcRouteBuilder(k8s_gateway_api::GrpcRoute);
+
+fn mk_grpc_route(ns: &str, name: &str, svc: &k8s::Service, port: Option<u16>) -> GrpcRouteBuilder {
+    GrpcRouteBuilder(k8s_gateway_api::GrpcRoute {
+        metadata: kube::api::ObjectMeta {
+            namespace: Some(ns.to_string()),
+            name: Some(name.to_string()),
+            ..Default::default()
+        },
+        spec: k8s_gateway_api::GrpcRouteSpec {
+            inner: k8s_gateway_api::CommonRouteSpec {
+                parent_refs: Some(vec![k8s_gateway_api::ParentReference {
+                    group: Some("core".to_string()),
+                    kind: Some("Service".to_string()),
+                    namespace: svc.namespace(),
+                    name: svc.name_unchecked(),
+                    section_name: None,
+                    port,
+                }]),
+            },
+            hostnames: None,
+            rules: Some(vec![k8s_gateway_api::GrpcRouteRule {
+                matches: Some(vec![k8s_gateway_api::GrpcRouteMatch {
+                    method: Some(k8s_gateway_api::GrpcMethodMatch::Exact {
+                        method: Some("foo".to_string()),
+                        service: Some("my-grpc-service".to_string()),
+                    }),
+                    headers: None,
+                }]),
+                filters: None,
+                backend_refs: None,
+            }]),
+        },
+        status: None,
+    })
+}
+
+impl GrpcRouteBuilder {
+    fn with_annotations(self, annotations: BTreeMap<String, String>) -> Self {
+        let mut route = self.0;
+        route.metadata.annotations = Some(annotations);
+        Self(route)
+    }
+
+    fn build(self) -> k8s_gateway_api::GrpcRoute {
+        self.0
+    }
+}
diff --git a/policy-test/tests/outbound_api_linkerd.rs b/policy-test/tests/outbound_api_linkerd.rs
index dae02c241..4278e15af 100644
--- a/policy-test/tests/outbound_api_linkerd.rs
+++ b/policy-test/tests/outbound_api_linkerd.rs
@@ -6,10 +6,9 @@ use linkerd_policy_controller_k8s_api as k8s;
 use linkerd_policy_test::{
     assert_default_accrual_backoff, assert_svc_meta, create, create_annotated_service,
     create_cluster_scoped, create_opaque_service, create_service, delete_cluster_scoped, grpc,
-    mk_service, with_temp_ns,
+    mk_service, outbound_api::*, with_temp_ns,
 };
 use maplit::{btreemap, convert_args};
-use tokio::time;
 
 // These tests are copies of the tests in outbound_api_gateway.rs but using the
 // policy.linkerd.io HttpRoute kubernetes types instead of the Gateway API ones.
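The gRPC tests above and the HTTP tests below both pin down the same precedence rule: retry and timeout configs resolve per route, a route's annotations win over its parent Service's, and timeouts are taken wholesale from whichever level defines them rather than merged field by field. That is why setting only `timeout.linkerd.io/request` on the route leaves the resolved `response` timeout empty even though the Service sets `timeout.linkerd.io/response`. A rough model of that resolution, not the controller's actual code (the `Timeouts` struct and `timeouts_of` helper are hypothetical), might look like this:

// Hypothetical model of route-over-service timeout precedence; not the
// policy controller's implementation.
use std::collections::BTreeMap;
use std::time::Duration;

#[derive(Default, PartialEq)]
struct Timeouts {
    response: Option<Duration>,
    request: Option<Duration>,
}

// Parse the timeout annotations present on one resource, if any are set.
fn timeouts_of(annotations: &BTreeMap<String, String>) -> Option<Timeouts> {
    let parse = |key: &str| {
        annotations
            .get(key)
            .and_then(|v| v.strip_suffix('s'))
            .and_then(|v| v.parse().ok())
            .map(Duration::from_secs)
    };
    let t = Timeouts {
        response: parse("timeout.linkerd.io/response"),
        request: parse("timeout.linkerd.io/request"),
    };
    (t != Timeouts::default()).then_some(t)
}

fn main() {
    let mut svc = BTreeMap::new();
    svc.insert("timeout.linkerd.io/response".to_string(), "10s".to_string());
    let mut route = BTreeMap::new();
    route.insert("timeout.linkerd.io/request".to_string(), "5s".to_string());

    // Route-level timeouts, when present, shadow the Service-level ones.
    let resolved = timeouts_of(&route)
        .or_else(|| timeouts_of(&svc))
        .unwrap_or_default();
    // The Service's 10s response timeout is not merged in.
    assert_eq!(resolved.response, None);
    assert_eq!(resolved.request, Some(Duration::from_secs(5)));
}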
@@ -1179,35 +1178,124 @@ async fn consumer_route() {
     .await;
 }
 
+#[tokio::test(flavor = "current_thread")]
+async fn http_route_retries_and_timeouts() {
+    with_temp_ns(|client, ns| async move {
+        // Create a service
+        let svc = create_service(&client, &ns, "my-svc", 4191).await;
+
+        let _route = create(
+            &client,
+            mk_http_route(&ns, "foo-route", &svc, Some(4191))
+                .with_annotations(
+                    vec![
+                        ("retry.linkerd.io/http".to_string(), "5xx".to_string()),
+                        ("timeout.linkerd.io/response".to_string(), "10s".to_string()),
+                    ]
+                    .into_iter()
+                    .collect(),
+                )
+                .build(),
+        )
+        .await;
+
+        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
+        let config = rx
+            .next()
+            .await
+            .expect("watch must not fail")
+            .expect("watch must return an initial config");
+        tracing::trace!(?config);
+
+        detect_http_routes(&config, |routes| {
+            let route = assert_singleton(routes);
+            let rule = assert_singleton(&route.rules);
+            let conditions = rule
+                .retry
+                .as_ref()
+                .expect("retry config expected")
+                .conditions
+                .as_ref()
+                .expect("retry conditions expected");
+            let status_range = assert_singleton(&conditions.status_ranges);
+            assert_eq!(status_range.start, 500);
+            assert_eq!(status_range.end, 599);
+            let timeout = rule
+                .timeouts
+                .as_ref()
+                .expect("timeouts expected")
+                .response
+                .as_ref()
+                .expect("response timeout expected");
+            assert_eq!(timeout.seconds, 10);
+        });
+    })
+    .await;
+}
+
+#[tokio::test(flavor = "current_thread")]
+async fn service_retries_and_timeouts() {
+    with_temp_ns(|client, ns| async move {
+        // Create a service
+        let mut svc = mk_service(&ns, "my-svc", 4191);
+        svc.annotations_mut()
+            .insert("retry.linkerd.io/http".to_string(), "5xx".to_string());
+        svc.annotations_mut()
+            .insert("timeout.linkerd.io/response".to_string(), "10s".to_string());
+        let svc = create(&client, svc).await;
+
+        let _route = create(
+            &client,
+            mk_http_route(&ns, "foo-route", &svc, Some(4191))
+                .with_annotations(
+                    vec![
+                        // Route annotations override the timeout config specified
+                        // on the service.
+                        ("timeout.linkerd.io/request".to_string(), "5s".to_string()),
+                    ]
+                    .into_iter()
+                    .collect(),
+                )
+                .build(),
+        )
+        .await;
+
+        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
+        let config = rx
+            .next()
+            .await
+            .expect("watch must not fail")
+            .expect("watch must return an initial config");
+        tracing::trace!(?config);
+
+        detect_http_routes(&config, |routes| {
+            let route = assert_singleton(routes);
+            let rule = assert_singleton(&route.rules);
+            let conditions = rule
+                .retry
+                .as_ref()
+                .expect("retry config expected")
+                .conditions
+                .as_ref()
+                .expect("retry conditions expected");
+            let status_range = assert_singleton(&conditions.status_ranges);
+            // Retry config inherited from the service.
+            assert_eq!(status_range.start, 500);
+            assert_eq!(status_range.end, 599);
+            let timeouts = rule.timeouts.as_ref().expect("timeouts expected");
+            // Service timeout config overridden by route timeout config.
+            assert_eq!(timeouts.response, None);
+            let request_timeout = timeouts.request.as_ref().expect("request timeout expected");
+            assert_eq!(request_timeout.seconds, 5);
+        });
+    })
+    .await;
+}
+
 /* Helpers */
 
 struct HttpRouteBuilder(k8s::policy::HttpRoute);
 
-async fn retry_watch_outbound_policy(
-    client: &kube::Client,
-    ns: &str,
-    svc: &k8s::Service,
-    port: u16,
-) -> tonic::Streaming<grpc::outbound::OutboundPolicy> {
-    // Port-forward to the control plane and start watching the service's
-    // outbound policy.
-    let mut policy_api = grpc::OutboundPolicyClient::port_forwarded(client).await;
-    loop {
-        match policy_api.watch(ns, svc, port).await {
-            Ok(rx) => return rx,
-            Err(error) => {
-                tracing::error!(
-                    ?error,
-                    ns,
-                    svc = svc.name_unchecked(),
-                    "failed to watch outbound policy for port 4191"
-                );
-                time::sleep(Duration::from_secs(1)).await;
-            }
-        }
-    }
-}
-
 fn mk_http_route(ns: &str, name: &str, svc: &k8s::Service, port: Option<u16>) -> HttpRouteBuilder {
     use k8s::policy::httproute as api;
 
@@ -1290,6 +1378,12 @@ impl HttpRouteBuilder {
         Self(route)
     }
 
+    fn with_annotations(self, annotations: BTreeMap<String, String>) -> Self {
+        let mut route = self.0;
+        route.metadata.annotations = Some(annotations);
+        Self(route)
+    }
+
     fn build(self) -> k8s::policy::HttpRoute {
         self.0
     }
@@ -1325,210 +1419,3 @@ fn mk_empty_http_route(
         status: None,
     }
 }
-
-// detect_http_routes asserts that the given outbound policy has a proxy protcol
-// of "Detect" and then invokes the given function with the Http1 and Http2
-// routes from the Detect.
-#[track_caller]
-fn detect_http_routes<F>(config: &grpc::outbound::OutboundPolicy, f: F)
-where
-    F: Fn(&[grpc::outbound::HttpRoute]),
-{
-    let kind = config
-        .protocol
-        .as_ref()
-        .expect("must have proxy protocol")
-        .kind
-        .as_ref()
-        .expect("must have kind");
-    if let grpc::outbound::proxy_protocol::Kind::Detect(grpc::outbound::proxy_protocol::Detect {
-        opaque: _,
-        timeout: _,
-        http1,
-        http2,
-    }) = kind
-    {
-        let http1 = http1
-            .as_ref()
-            .expect("proxy protocol must have http1 field");
-        let http2 = http2
-            .as_ref()
-            .expect("proxy protocol must have http2 field");
-        f(&http1.routes);
-        f(&http2.routes);
-    } else {
-        panic!("proxy protocol must be Detect; actually got:\n{kind:#?}")
-    }
-}
-
-#[track_caller]
-fn detect_failure_accrual<F>(config: &grpc::outbound::OutboundPolicy, f: F)
-where
-    F: Fn(Option<&grpc::outbound::FailureAccrual>),
-{
-    let kind = config
-        .protocol
-        .as_ref()
-        .expect("must have proxy protocol")
-        .kind
-        .as_ref()
-        .expect("must have kind");
-    if let grpc::outbound::proxy_protocol::Kind::Detect(grpc::outbound::proxy_protocol::Detect {
-        opaque: _,
-        timeout: _,
-        http1,
-        http2,
-    }) = kind
-    {
-        let http1 = http1
-            .as_ref()
-            .expect("proxy protocol must have http1 field");
-        let http2 = http2
-            .as_ref()
-            .expect("proxy protocol must have http2 field");
-        f(http1.failure_accrual.as_ref());
-        f(http2.failure_accrual.as_ref());
-    } else {
-        panic!("proxy protocol must be Detect; actually got:\n{kind:#?}")
-    }
-}
-
-#[track_caller]
-fn failure_accrual_consecutive(
-    accrual: Option<&grpc::outbound::FailureAccrual>,
-) -> &grpc::outbound::failure_accrual::ConsecutiveFailures {
-    assert!(
-        accrual.is_some(),
-        "failure accrual must be configured for service"
-    );
-    let kind = accrual
-        .unwrap()
-        .kind
-        .as_ref()
-        .expect("failure accrual must have kind");
-    let grpc::outbound::failure_accrual::Kind::ConsecutiveFailures(accrual) = kind;
-    accrual
-}
-
-#[track_caller]
-fn route_backends_first_available(
-    route: &grpc::outbound::HttpRoute,
-) -> &[grpc::outbound::http_route::RouteBackend] {
-    let kind = assert_singleton(&route.rules)
-        .backends
-        .as_ref()
-        .expect("Rule must have backends")
-        .kind
-        .as_ref()
-        .expect("Backend must have kind");
-    match kind {
-        grpc::outbound::http_route::distribution::Kind::FirstAvailable(fa) => &fa.backends,
-        _ => panic!("Distribution must be FirstAvailable"),
-    }
-}
-
-#[track_caller]
-fn route_backends_random_available(
-    route: &grpc::outbound::HttpRoute,
-) -> &[grpc::outbound::http_route::WeightedRouteBackend] {
-    let kind = assert_singleton(&route.rules)
-        .backends
-        .as_ref()
-        .expect("Rule must have backends")
-        .kind
-        .as_ref()
-        .expect("Backend must have kind");
-    match kind {
-        grpc::outbound::http_route::distribution::Kind::RandomAvailable(dist) => &dist.backends,
-        _ => panic!("Distribution must be RandomAvailable"),
-    }
-}
-
-#[track_caller]
-fn route_name(route: &grpc::outbound::HttpRoute) -> &str {
-    match route.metadata.as_ref().unwrap().kind.as_ref().unwrap() {
-        grpc::meta::metadata::Kind::Resource(grpc::meta::Resource { ref name, .. }) => name,
-        _ => panic!("route must be a resource kind"),
-    }
-}
-
-#[track_caller]
-fn assert_backend_has_failure_filter(backend: &grpc::outbound::http_route::WeightedRouteBackend) {
-    let filter = assert_singleton(&backend.backend.as_ref().unwrap().filters);
-    match filter.kind.as_ref().unwrap() {
-        grpc::outbound::http_route::filter::Kind::FailureInjector(_) => {}
-        _ => panic!("backend must have FailureInjector filter"),
-    };
-}
-
-#[track_caller]
-fn assert_route_is_default(route: &grpc::outbound::HttpRoute, svc: &k8s::Service, port: u16) {
-    let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap();
-    match kind {
-        grpc::meta::metadata::Kind::Default(_) => {}
-        grpc::meta::metadata::Kind::Resource(r) => {
-            panic!("route expected to be default but got resource {r:?}")
-        }
-    }
-
-    let backends = route_backends_first_available(route);
-    let backend = assert_singleton(backends);
-    assert_backend_matches_service(backend, svc, port);
-
-    let rule = assert_singleton(&route.rules);
-    let route_match = assert_singleton(&rule.matches);
-    let path_match = route_match.path.as_ref().unwrap().kind.as_ref().unwrap();
-    assert_eq!(
-        *path_match,
-        grpc::http_route::path_match::Kind::Prefix("/".to_string())
-    );
-}
-
-#[track_caller]
-fn assert_backend_matches_service(
-    backend: &grpc::outbound::http_route::RouteBackend,
-    svc: &k8s::Service,
-    port: u16,
-) {
-    let backend = backend.backend.as_ref().unwrap();
-    let dst = match backend.kind.as_ref().unwrap() {
-        grpc::outbound::backend::Kind::Balancer(balance) => {
-            let kind = balance.discovery.as_ref().unwrap().kind.as_ref().unwrap();
-            match kind {
-                grpc::outbound::backend::endpoint_discovery::Kind::Dst(dst) => &dst.path,
-            }
-        }
-        grpc::outbound::backend::Kind::Forward(_) => {
-            panic!("default route backend must be Balancer")
-        }
-    };
-    assert_eq!(
-        *dst,
-        format!(
-            "{}.{}.svc.{}:{}",
-            svc.name_unchecked(),
-            svc.namespace().unwrap(),
-            "cluster.local",
-            port
-        )
-    );
-
-    assert_svc_meta(&backend.metadata, svc, port)
-}
-
-#[track_caller]
-fn assert_singleton<T>(ts: &[T]) -> &T {
-    assert_eq!(ts.len(), 1);
-    ts.first().unwrap()
-}
-
-#[track_caller]
-fn assert_route_name_eq(route: &grpc::outbound::HttpRoute, name: &str) {
-    let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap();
-    match kind {
-        grpc::meta::metadata::Kind::Default(d) => {
-            panic!("route expected to not be default, but got default {d:?}")
-        }
-        grpc::meta::metadata::Kind::Resource(resource) => assert_eq!(resource.name, *name),
-    }
-}
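Both test files now pull these helpers from `linkerd_policy_test::outbound_api` (via the `outbound_api::*` imports above) instead of defining local copies. The shared module itself does not appear in this tail of the diff, but based on the deleted per-file copies it presumably looks like the sketch below; this is an assumption about the shared module, not code from this diff. One detail worth fixing during the consolidation: the deleted copies hardcoded "port 4191" in their error message even though the port is a parameter, so the sketch logs the actual port instead.

// Presumed shape of the consolidated watch helper in
// linkerd_policy_test::outbound_api; mirrors the deleted per-file copies,
// except that the actual port is logged rather than a hardcoded 4191.
pub async fn retry_watch_outbound_policy(
    client: &kube::Client,
    ns: &str,
    svc: &k8s::Service,
    port: u16,
) -> tonic::Streaming<grpc::outbound::OutboundPolicy> {
    // Port-forward to the control plane and start watching the service's
    // outbound policy, retrying once per second until the watch is established.
    let mut policy_api = grpc::OutboundPolicyClient::port_forwarded(client).await;
    loop {
        match policy_api.watch(ns, svc, port).await {
            Ok(rx) => return rx,
            Err(error) => {
                tracing::error!(
                    ?error,
                    ns,
                    svc = svc.name_unchecked(),
                    port,
                    "failed to watch outbound policy"
                );
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            }
        }
    }
}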