Add support for retries and timeouts (#12888)

Adds support for configuring retries and timeouts as outbound policy. HTTP retries can be configured as annotations on HttpRoute or Service resources, for example:

```
retry.linkerd.io/http: 5xx,gateway-error
retry.linkerd.io/limit: "2"
retry.linkerd.io/timeout: 400ms
```

If any of these retry annotations are specified on an HttpRoute resource, they will override ALL retry annotations on the parent Service resource.

Similarly, gRPC retries can be configured as annotations on GrpcRoute or Service resources, for example:

```
retry.linkerd.io/grpc: cancelled,deadline-exceeded,internal,resource-exhausted,unavailable
retry.linkerd.io/limit: "2"
retry.linkerd.io/timeout: 400ms
```

Outbound timeouts can be configured as annotations on HttpRoute, GrpcRoute, or Service resources, for example:

```
timeout.linkerd.io/request: 500ms
timeout.linkerd.io/response: 100ms
timeout.linkerd.io/idle: 50ms
```

If any of these timeout annotations are specified on an HttpRoute or GrpcRoute resource, they will override ALL timeout annotations on the parent Service resource.
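
Internally, this precedence is implemented as a simple fallback in the route-rule encoding (see the conversion changes below): a rule's own retry and timeout settings win, and the Service-level values are used only when the rule leaves them unset. Condensed from that code:

```
// Route-level configuration wins; Service-level values only fill gaps.
if timeouts == Default::default() {
    timeouts = service_timeouts.clone();
}
if retry.is_none() {
    retry = service_retry.clone();
}
```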

Signed-off-by: Alex Leong <alex@buoyant.io>
Co-authored-by: Oliver Gould <ver@buoyant.io>


@ -1321,9 +1321,9 @@ dependencies = [
[[package]]
name = "linkerd2-proxy-api"
version = "0.13.1"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65678e4c506a7e5fdf1a664c629a9b658afa70e254dffcd24df72e937b2c0159"
checksum = "26c72fb98d969e1e94e95d52a6fcdf4693764702c369e577934256e72fb5bc61"
dependencies = [
"http",
"ipnet",


@ -20,7 +20,9 @@ pub trait DiscoverOutboundPolicy<T> {
pub type OutboundPolicyStream = Pin<Box<dyn Stream<Item = OutboundPolicy> + Send + Sync + 'static>>;
pub type RouteSet<M> = HashMap<GroupKindNamespaceName, OutboundRoute<M>>;
pub type HttpRoute = OutboundRoute<HttpRouteMatch, HttpRetryCondition>;
pub type GrpcRoute = OutboundRoute<GrpcRouteMatch, GrpcRetryCondition>;
pub type RouteSet<T> = HashMap<GroupKindNamespaceName, T>;
pub struct OutboundDiscoverTarget {
pub service_name: String,
@ -31,20 +33,23 @@ pub struct OutboundDiscoverTarget {
#[derive(Clone, Debug, PartialEq)]
pub struct OutboundPolicy {
pub http_routes: RouteSet<HttpRouteMatch>,
pub grpc_routes: RouteSet<GrpcRouteMatch>,
pub http_routes: RouteSet<HttpRoute>,
pub grpc_routes: RouteSet<GrpcRoute>,
pub authority: String,
pub name: String,
pub namespace: String,
pub port: NonZeroU16,
pub opaque: bool,
pub accrual: Option<FailureAccrual>,
pub http_retry: Option<RouteRetry<HttpRetryCondition>>,
pub grpc_retry: Option<RouteRetry<GrpcRetryCondition>>,
pub timeouts: RouteTimeouts,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct OutboundRoute<M> {
pub struct OutboundRoute<M, R> {
pub hostnames: Vec<HostMatch>,
pub rules: Vec<OutboundRouteRule<M>>,
pub rules: Vec<OutboundRouteRule<M, R>>,
/// This is required for ordering returned routes
/// by their creation timestamp.
@ -52,11 +57,11 @@ pub struct OutboundRoute<M> {
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct OutboundRouteRule<M> {
pub struct OutboundRouteRule<M, R> {
pub matches: Vec<M>,
pub backends: Vec<Backend>,
pub request_timeout: Option<time::Duration>,
pub backend_request_timeout: Option<time::Duration>,
pub retry: Option<RouteRetry<R>>,
pub timeouts: RouteTimeouts,
pub filters: Vec<Filter>,
}
@ -104,3 +109,32 @@ pub enum Filter {
RequestRedirect(RequestRedirectFilter),
FailureInjector(FailureInjectorFilter),
}
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct RouteTimeouts {
pub response: Option<time::Duration>,
pub request: Option<time::Duration>,
pub idle: Option<time::Duration>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RouteRetry<R> {
pub limit: u16,
pub timeout: Option<time::Duration>,
pub conditions: Option<Vec<R>>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HttpRetryCondition {
pub status_min: u32,
pub status_max: u32,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum GrpcRetryCondition {
Cancelled,
DeadlineExceeded,
ResourceExhausted,
Internal,
Unavailable,
}
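
As an illustration (not part of the diff), the HTTP annotations from the commit message would populate these new types roughly as follows; the `5xx` and `gateway-error` shorthands expand to the status ranges used by the parser further down:

```
use std::time::Duration;

// retry.linkerd.io/http: 5xx,gateway-error
// retry.linkerd.io/limit: "2"
// retry.linkerd.io/timeout: 400ms
let retry: RouteRetry<HttpRetryCondition> = RouteRetry {
    limit: 2,
    timeout: Some(Duration::from_millis(400)),
    conditions: Some(vec![
        HttpRetryCondition { status_min: 500, status_max: 599 }, // 5xx
        HttpRetryCondition { status_min: 502, status_max: 504 }, // gateway-error
    ]),
};

// timeout.linkerd.io/request: 500ms, /response: 100ms, /idle: 50ms
let timeouts = RouteTimeouts {
    request: Some(Duration::from_millis(500)),
    response: Some(Duration::from_millis(100)),
    idle: Some(Duration::from_millis(50)),
};
```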


@ -22,5 +22,5 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
[dependencies.linkerd2-proxy-api]
version = "0.13"
version = "0.14"
features = ["inbound", "outbound"]


@ -28,6 +28,7 @@ pub struct OutboundPolicyServer<T> {
index: T,
// Used to parse named addresses into <svc>.<ns>.svc.<cluster-domain>.
cluster_domain: Arc<str>,
allow_l5d_request_headers: bool,
drain: drain::Watch,
}
@ -35,10 +36,16 @@ impl<T> OutboundPolicyServer<T>
where
T: DiscoverOutboundPolicy<OutboundDiscoverTarget> + Send + Sync + 'static,
{
pub fn new(discover: T, cluster_domain: impl Into<Arc<str>>, drain: drain::Watch) -> Self {
pub fn new(
discover: T,
cluster_domain: impl Into<Arc<str>>,
allow_l5d_request_headers: bool,
drain: drain::Watch,
) -> Self {
Self {
index: discover,
cluster_domain: cluster_domain.into(),
allow_l5d_request_headers,
drain,
}
}
@ -149,7 +156,10 @@ where
})?;
if let Some(policy) = policy {
Ok(tonic::Response::new(to_service(policy)))
Ok(tonic::Response::new(to_service(
policy,
self.allow_l5d_request_headers,
)))
} else {
Err(tonic::Status::not_found("No such policy"))
}
@ -170,7 +180,11 @@ where
.await
.map_err(|e| tonic::Status::internal(format!("lookup failed: {e}")))?
.ok_or_else(|| tonic::Status::not_found("unknown server"))?;
Ok(tonic::Response::new(response_stream(drain, rx)))
Ok(tonic::Response::new(response_stream(
drain,
rx,
self.allow_l5d_request_headers,
)))
}
}
@ -178,7 +192,11 @@ type BoxWatchStream = std::pin::Pin<
Box<dyn Stream<Item = Result<outbound::OutboundPolicy, tonic::Status>> + Send + Sync>,
>;
fn response_stream(drain: drain::Watch, mut rx: OutboundPolicyStream) -> BoxWatchStream {
fn response_stream(
drain: drain::Watch,
mut rx: OutboundPolicyStream,
allow_l5d_request_headers: bool,
) -> BoxWatchStream {
Box::pin(async_stream::try_stream! {
tokio::pin! {
let shutdown = drain.signaled();
@ -189,7 +207,7 @@ fn response_stream(drain: drain::Watch, mut rx: OutboundPolicyStream) -> BoxWatchStream {
// When the port is updated with a new server, update the server watch.
res = rx.next() => match res {
Some(policy) => {
yield to_service(policy);
yield to_service(policy, allow_l5d_request_headers);
}
None => return,
},
@ -204,7 +222,10 @@ fn response_stream(drain: drain::Watch, mut rx: OutboundPolicyStream) -> BoxWatc
})
}
fn to_service(outbound: OutboundPolicy) -> outbound::OutboundPolicy {
fn to_service(
outbound: OutboundPolicy,
allow_l5d_request_headers: bool,
) -> outbound::OutboundPolicy {
let backend: outbound::Backend = default_backend(&outbound);
let kind = if outbound.opaque {
@ -235,10 +256,24 @@ fn to_service(outbound: OutboundPolicy) -> outbound::OutboundPolicy {
if !grpc_routes.is_empty() {
grpc_routes.sort_by(timestamp_then_name);
grpc::protocol(backend, grpc_routes.into_iter(), accrual)
grpc::protocol(
backend,
grpc_routes.into_iter(),
accrual,
outbound.grpc_retry,
outbound.timeouts,
allow_l5d_request_headers,
)
} else {
http_routes.sort_by(timestamp_then_name);
http::protocol(backend, http_routes.into_iter(), accrual)
http::protocol(
backend,
http_routes.into_iter(),
accrual,
outbound.http_retry,
outbound.timeouts,
allow_l5d_request_headers,
)
}
};
@ -259,9 +294,9 @@ fn to_service(outbound: OutboundPolicy) -> outbound::OutboundPolicy {
}
}
fn timestamp_then_name<LeftMatchType, RightMatchType>(
(left_id, left_route): &(GroupKindNamespaceName, OutboundRoute<LeftMatchType>),
(right_id, right_route): &(GroupKindNamespaceName, OutboundRoute<RightMatchType>),
fn timestamp_then_name<LM, LR, RM, RR>(
(left_id, left_route): &(GroupKindNamespaceName, OutboundRoute<LM, LR>),
(right_id, right_route): &(GroupKindNamespaceName, OutboundRoute<RM, RR>),
) -> std::cmp::Ordering {
let by_ts = match (
&left_route.creation_timestamp,


@ -1,22 +1,36 @@
use std::net::SocketAddr;
use super::{convert_duration, default_balancer_config, default_queue_config};
use crate::routes::{
convert_host_match, convert_request_header_modifier_filter, grpc::convert_match,
};
use linkerd2_proxy_api::{destination, grpc_route, http_route, meta, outbound};
use linkerd_policy_controller_core::{
outbound::{Backend, Filter, OutboundRoute, OutboundRouteRule},
routes::{FailureInjectorFilter, GroupKindNamespaceName, GrpcRouteMatch},
outbound::{
Backend, Filter, GrpcRetryCondition, GrpcRoute, OutboundRoute, OutboundRouteRule,
RouteRetry, RouteTimeouts,
},
routes::{FailureInjectorFilter, GroupKindNamespaceName},
};
use std::{net::SocketAddr, time};
pub(crate) fn protocol(
default_backend: outbound::Backend,
routes: impl Iterator<Item = (GroupKindNamespaceName, OutboundRoute<GrpcRouteMatch>)>,
routes: impl Iterator<Item = (GroupKindNamespaceName, GrpcRoute)>,
failure_accrual: Option<outbound::FailureAccrual>,
service_retry: Option<RouteRetry<GrpcRetryCondition>>,
service_timeouts: RouteTimeouts,
allow_l5d_request_headers: bool,
) -> outbound::proxy_protocol::Kind {
let routes = routes
.map(|(gknn, route)| convert_outbound_route(gknn, route, default_backend.clone()))
.map(|(gknn, route)| {
convert_outbound_route(
gknn,
route,
default_backend.clone(),
service_retry.clone(),
service_timeouts.clone(),
allow_l5d_request_headers,
)
})
.collect::<Vec<_>>();
outbound::proxy_protocol::Kind::Grpc(outbound::proxy_protocol::Grpc {
routes,
@ -30,9 +44,15 @@ fn convert_outbound_route(
hostnames,
rules,
creation_timestamp: _,
}: OutboundRoute<GrpcRouteMatch>,
}: GrpcRoute,
backend: outbound::Backend,
service_retry: Option<RouteRetry<GrpcRetryCondition>>,
service_timeouts: RouteTimeouts,
allow_l5d_request_headers: bool,
) -> outbound::GrpcRoute {
// This encoder sets deprecated timeouts for older proxies.
#![allow(deprecated)]
let metadata = Some(meta::Metadata {
kind: Some(meta::metadata::Kind::Resource(meta::Resource {
group: gknn.group.to_string(),
@ -51,15 +71,13 @@ fn convert_outbound_route(
|OutboundRouteRule {
matches,
backends,
request_timeout,
backend_request_timeout,
mut retry,
mut timeouts,
filters,
}| {
let backend_request_timeout = backend_request_timeout
.and_then(|d| convert_duration("backend request_timeout", d));
let backends = backends
.into_iter()
.map(|backend| convert_backend(backend_request_timeout.clone(), backend))
.map(convert_backend)
.collect::<Vec<_>>();
let dist = if backends.is_empty() {
outbound::grpc_route::distribution::Kind::FirstAvailable(
@ -67,7 +85,7 @@ fn convert_outbound_route(
backends: vec![outbound::grpc_route::RouteBackend {
backend: Some(backend.clone()),
filters: vec![],
request_timeout: backend_request_timeout,
..Default::default()
}],
},
)
@ -76,12 +94,58 @@ fn convert_outbound_route(
outbound::grpc_route::distribution::RandomAvailable { backends },
)
};
if timeouts == Default::default() {
timeouts = service_timeouts.clone();
}
if retry.is_none() {
retry = service_retry.clone();
}
outbound::grpc_route::Rule {
matches: matches.into_iter().map(convert_match).collect(),
backends: Some(outbound::grpc_route::Distribution { kind: Some(dist) }),
filters: filters.into_iter().map(convert_to_filter).collect(),
request_timeout: request_timeout
request_timeout: timeouts
.request
.and_then(|d| convert_duration("request timeout", d)),
timeouts: Some(http_route::Timeouts {
request: timeouts
.request
.and_then(|d| convert_duration("stream timeout", d)),
idle: timeouts
.idle
.and_then(|d| convert_duration("idle timeout", d)),
response: timeouts
.response
.and_then(|d| convert_duration("response timeout", d)),
}),
retry: retry.map(|r| outbound::grpc_route::Retry {
max_retries: r.limit.into(),
max_request_bytes: 64 * 1024,
backoff: Some(outbound::ExponentialBackoff {
min_backoff: Some(time::Duration::from_millis(25).try_into().unwrap()),
max_backoff: Some(time::Duration::from_millis(250).try_into().unwrap()),
jitter_ratio: 1.0,
}),
conditions: Some(r.conditions.iter().flatten().fold(
outbound::grpc_route::retry::Conditions::default(),
|mut cond, c| {
match c {
GrpcRetryCondition::Cancelled => cond.cancelled = true,
GrpcRetryCondition::DeadlineExceeded => {
cond.deadine_exceeded = true
}
GrpcRetryCondition::Internal => cond.internal = true,
GrpcRetryCondition::ResourceExhausted => {
cond.resource_exhausted = true
}
GrpcRetryCondition::Unavailable => cond.unavailable = true,
};
cond
},
)),
timeout: r.timeout.and_then(|d| convert_duration("retry timeout", d)),
}),
allow_l5d_request_headers,
}
},
)
@ -94,10 +158,7 @@ fn convert_outbound_route(
}
}
fn convert_backend(
request_timeout: Option<prost_types::Duration>,
backend: Backend,
) -> outbound::grpc_route::WeightedRouteBackend {
fn convert_backend(backend: Backend) -> outbound::grpc_route::WeightedRouteBackend {
match backend {
Backend::Addr(addr) => {
let socket_addr = SocketAddr::new(addr.addr, addr.port.get());
@ -116,7 +177,7 @@ fn convert_backend(
)),
}),
filters: Default::default(),
request_timeout,
..Default::default()
}),
}
}
@ -152,7 +213,7 @@ fn convert_backend(
)),
}),
filters,
request_timeout,
..Default::default()
}),
}
} else {
@ -175,7 +236,7 @@ fn convert_backend(
},
)),
}],
request_timeout,
..Default::default()
}),
}
}
@ -199,7 +260,7 @@ fn convert_backend(
},
)),
}],
request_timeout,
..Default::default()
}),
},
}
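
Note that the retry encoding above hard-codes the backoff (25ms minimum, 250ms maximum, jitter ratio 1.0) and the 64KiB replay-buffer limit; the annotations control only the retry limit, the per-try timeout, and the conditions. The condition fold itself is a straightforward bit-set. A stand-alone sketch, with simplified stand-ins for the core enum and the generated proto `Conditions` type:

```
// Simplified stand-ins; the real types live in the core crate and the
// generated linkerd2-proxy-api bindings.
#[derive(Clone, Copy)]
enum GrpcRetryCondition {
    Cancelled,
    DeadlineExceeded,
    Internal,
    ResourceExhausted,
    Unavailable,
}

#[derive(Debug, Default)]
struct Conditions {
    cancelled: bool,
    deadline_exceeded: bool,
    internal: bool,
    resource_exhausted: bool,
    unavailable: bool,
}

fn fold_conditions(conds: &[GrpcRetryCondition]) -> Conditions {
    conds.iter().fold(Conditions::default(), |mut acc, c| {
        match c {
            GrpcRetryCondition::Cancelled => acc.cancelled = true,
            GrpcRetryCondition::DeadlineExceeded => acc.deadline_exceeded = true,
            GrpcRetryCondition::Internal => acc.internal = true,
            GrpcRetryCondition::ResourceExhausted => acc.resource_exhausted = true,
            GrpcRetryCondition::Unavailable => acc.unavailable = true,
        }
        acc
    })
}

fn main() {
    let conds = [GrpcRetryCondition::Cancelled, GrpcRetryCondition::Unavailable];
    println!("{:?}", fold_conditions(&conds)); // cancelled and unavailable set
}
```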


@ -1,5 +1,3 @@
use std::{net::SocketAddr, time};
use super::{
convert_duration, default_balancer_config, default_outbound_opaq_route, default_queue_config,
};
@ -10,21 +8,41 @@ use crate::routes::{
};
use linkerd2_proxy_api::{destination, http_route, meta, outbound};
use linkerd_policy_controller_core::{
outbound::{Backend, Filter, OutboundRoute, OutboundRouteRule},
routes::{GroupKindNamespaceName, HttpRouteMatch},
outbound::{
Backend, Filter, HttpRetryCondition, HttpRoute, OutboundRouteRule, RouteRetry,
RouteTimeouts,
},
routes::GroupKindNamespaceName,
};
use std::{net::SocketAddr, time};
pub(crate) fn protocol(
default_backend: outbound::Backend,
routes: impl Iterator<Item = (GroupKindNamespaceName, OutboundRoute<HttpRouteMatch>)>,
routes: impl Iterator<Item = (GroupKindNamespaceName, HttpRoute)>,
accrual: Option<outbound::FailureAccrual>,
service_retry: Option<RouteRetry<HttpRetryCondition>>,
service_timeouts: RouteTimeouts,
allow_l5d_request_headers: bool,
) -> outbound::proxy_protocol::Kind {
let opaque_route = default_outbound_opaq_route(default_backend.clone());
let mut routes = routes
.map(|(gknn, route)| convert_outbound_route(gknn, route, default_backend.clone()))
.map(|(gknn, route)| {
convert_outbound_route(
gknn,
route,
default_backend.clone(),
service_retry.clone(),
service_timeouts.clone(),
allow_l5d_request_headers,
)
})
.collect::<Vec<_>>();
if routes.is_empty() {
routes.push(default_outbound_route(default_backend));
routes.push(default_outbound_route(
default_backend,
service_retry.clone(),
service_timeouts.clone(),
));
}
outbound::proxy_protocol::Kind::Detect(outbound::proxy_protocol::Detect {
timeout: Some(
@ -49,13 +67,19 @@ pub(crate) fn protocol(
fn convert_outbound_route(
gknn: GroupKindNamespaceName,
OutboundRoute {
HttpRoute {
hostnames,
rules,
creation_timestamp: _,
}: OutboundRoute<HttpRouteMatch>,
}: HttpRoute,
backend: outbound::Backend,
service_retry: Option<RouteRetry<HttpRetryCondition>>,
service_timeouts: RouteTimeouts,
allow_l5d_request_headers: bool,
) -> outbound::HttpRoute {
// This encoder sets deprecated timeouts for older proxies.
#![allow(deprecated)]
let metadata = Some(meta::Metadata {
kind: Some(meta::metadata::Kind::Resource(meta::Resource {
group: gknn.group.to_string(),
@ -74,15 +98,13 @@ fn convert_outbound_route(
|OutboundRouteRule {
matches,
backends,
request_timeout,
backend_request_timeout,
mut retry,
mut timeouts,
filters,
}| {
let backend_request_timeout = backend_request_timeout
.and_then(|d| convert_duration("backend request_timeout", d));
let backends = backends
.into_iter()
.map(|backend| convert_backend(backend_request_timeout.clone(), backend))
.map(convert_backend)
.collect::<Vec<_>>();
let dist = if backends.is_empty() {
outbound::http_route::distribution::Kind::FirstAvailable(
@ -90,7 +112,7 @@ fn convert_outbound_route(
backends: vec![outbound::http_route::RouteBackend {
backend: Some(backend.clone()),
filters: vec![],
request_timeout: backend_request_timeout,
..Default::default()
}],
},
)
@ -99,12 +121,22 @@ fn convert_outbound_route(
outbound::http_route::distribution::RandomAvailable { backends },
)
};
if timeouts == Default::default() {
timeouts = service_timeouts.clone();
}
if retry.is_none() {
retry = service_retry.clone();
}
outbound::http_route::Rule {
matches: matches.into_iter().map(convert_match).collect(),
backends: Some(outbound::http_route::Distribution { kind: Some(dist) }),
filters: filters.into_iter().map(convert_to_filter).collect(),
request_timeout: request_timeout
request_timeout: timeouts
.request
.and_then(|d| convert_duration("request timeout", d)),
timeouts: Some(convert_timeouts(timeouts)),
retry: retry.map(convert_retry),
allow_l5d_request_headers,
}
},
)
@ -117,10 +149,7 @@ fn convert_outbound_route(
}
}
fn convert_backend(
request_timeout: Option<prost_types::Duration>,
backend: Backend,
) -> outbound::http_route::WeightedRouteBackend {
fn convert_backend(backend: Backend) -> outbound::http_route::WeightedRouteBackend {
match backend {
Backend::Addr(addr) => {
let socket_addr = SocketAddr::new(addr.addr, addr.port.get());
@ -139,7 +168,7 @@ fn convert_backend(
)),
}),
filters: Default::default(),
request_timeout,
..Default::default()
}),
}
}
@ -175,7 +204,7 @@ fn convert_backend(
)),
}),
filters,
request_timeout,
..Default::default()
}),
}
} else {
@ -198,7 +227,7 @@ fn convert_backend(
},
)),
}],
request_timeout,
..Default::default()
}),
}
}
@ -222,13 +251,19 @@ fn convert_backend(
},
)),
}],
request_timeout,
..Default::default()
}),
},
}
}
pub(crate) fn default_outbound_route(backend: outbound::Backend) -> outbound::HttpRoute {
pub(crate) fn default_outbound_route(
backend: outbound::Backend,
service_retry: Option<RouteRetry<HttpRetryCondition>>,
service_timeouts: RouteTimeouts,
) -> outbound::HttpRoute {
// This encoder sets deprecated timeouts for older proxies.
#![allow(deprecated)]
let metadata = Some(meta::Metadata {
kind: Some(meta::metadata::Kind::Default("http".to_string())),
});
@ -245,13 +280,17 @@ pub(crate) fn default_outbound_route(backend: outbound::Backend) -> outbound::HttpRoute {
backends: vec![outbound::http_route::RouteBackend {
backend: Some(backend),
filters: vec![],
request_timeout: None,
..Default::default()
}],
},
)),
}),
filters: Default::default(),
request_timeout: None,
retry: service_retry.map(convert_retry),
request_timeout: service_timeouts
.request
.and_then(|d| convert_duration("request timeout", d)),
timeouts: Some(convert_timeouts(service_timeouts)),
..Default::default()
}];
outbound::HttpRoute {
metadata,
@ -276,3 +315,41 @@ fn convert_to_filter(filter: Filter) -> outbound::http_route::Filter {
}),
}
}
fn convert_retry(r: RouteRetry<HttpRetryCondition>) -> outbound::http_route::Retry {
outbound::http_route::Retry {
max_retries: r.limit.into(),
max_request_bytes: 64 * 1024,
backoff: Some(outbound::ExponentialBackoff {
min_backoff: Some(time::Duration::from_millis(25).try_into().unwrap()),
max_backoff: Some(time::Duration::from_millis(250).try_into().unwrap()),
jitter_ratio: 1.0,
}),
conditions: Some(r.conditions.iter().flatten().fold(
outbound::http_route::retry::Conditions::default(),
|mut cond, c| {
cond.status_ranges
.push(outbound::http_route::retry::conditions::StatusRange {
start: c.status_min,
end: c.status_max,
});
cond
},
)),
timeout: r.timeout.and_then(|d| convert_duration("retry timeout", d)),
}
}
fn convert_timeouts(timeouts: RouteTimeouts) -> http_route::Timeouts {
http_route::Timeouts {
request: timeouts
.request
.and_then(|d| convert_duration("request timeout", d)),
idle: timeouts
.idle
.and_then(|d| convert_duration("idle timeout", d)),
response: timeouts
.response
.and_then(|d| convert_duration("response timeout", d)),
}
}
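
For reference, `convert_duration` (defined elsewhere in this module) turns a `std::time::Duration` into the protobuf well-known `Duration`. A sketch of what the example values from this change encode to, assuming `prost-types`:

```
fn main() {
    // "400ms" and "10s" as protobuf well-known Durations, i.e. what
    // convert_duration produces for the annotation values used here.
    let per_try = prost_types::Duration { seconds: 0, nanos: 400_000_000 };
    let response = prost_types::Duration { seconds: 10, nanos: 0 };
    println!("{per_try:?} {response:?}");
}
```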


@ -6,8 +6,11 @@ use crate::{
use ahash::AHashMap as HashMap;
use anyhow::{bail, ensure, Result};
use linkerd_policy_controller_core::{
outbound::{Backend, Backoff, FailureAccrual, OutboundPolicy, OutboundRoute, RouteSet},
routes::{GroupKindNamespaceName, GrpcRouteMatch, HttpRouteMatch},
outbound::{
Backend, Backoff, FailureAccrual, GrpcRetryCondition, GrpcRoute, HttpRetryCondition,
HttpRoute, OutboundPolicy, RouteRetry, RouteSet, RouteTimeouts,
},
routes::GroupKindNamespaceName,
};
use linkerd_policy_controller_k8s_api::{
gateway::{self as k8s_gateway_api, ParentReference},
@ -24,8 +27,8 @@ pub struct Index {
service_info: HashMap<ServiceRef, ServiceInfo>,
}
mod grpc;
mod http;
pub mod grpc;
pub mod http;
pub mod metrics;
pub type SharedIndex = Arc<RwLock<Index>>;
@ -51,8 +54,8 @@ struct Namespace {
service_port_routes: HashMap<ServicePort, ServiceRoutes>,
/// Stores the route resources (by service name) that do not
/// explicitly target a port.
service_http_routes: HashMap<String, RouteSet<HttpRouteMatch>>,
service_grpc_routes: HashMap<String, RouteSet<GrpcRouteMatch>>,
service_http_routes: HashMap<String, RouteSet<HttpRoute>>,
service_grpc_routes: HashMap<String, RouteSet<GrpcRoute>>,
namespace: Arc<String>,
}
@ -60,6 +63,9 @@ struct Namespace {
struct ServiceInfo {
opaque_ports: PortSet,
accrual: Option<FailureAccrual>,
http_retry: Option<RouteRetry<HttpRetryCondition>>,
grpc_retry: Option<RouteRetry<GrpcRetryCondition>>,
timeouts: RouteTimeouts,
}
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
@ -77,14 +83,20 @@ struct ServiceRoutes {
watches_by_ns: HashMap<String, RoutesWatch>,
opaque: bool,
accrual: Option<FailureAccrual>,
http_retry: Option<RouteRetry<HttpRetryCondition>>,
grpc_retry: Option<RouteRetry<GrpcRetryCondition>>,
timeouts: RouteTimeouts,
}
#[derive(Debug)]
struct RoutesWatch {
opaque: bool,
accrual: Option<FailureAccrual>,
http_routes: RouteSet<HttpRouteMatch>,
grpc_routes: RouteSet<GrpcRouteMatch>,
http_retry: Option<RouteRetry<HttpRetryCondition>>,
grpc_retry: Option<RouteRetry<GrpcRetryCondition>>,
timeouts: RouteTimeouts,
http_routes: RouteSet<HttpRoute>,
grpc_routes: RouteSet<GrpcRoute>,
watch: watch::Sender<OutboundPolicy>,
}
@ -146,6 +158,17 @@ impl kubert::index::IndexNamespacedResource<Service> for Index {
ports_annotation(service.annotations(), "config.linkerd.io/opaque-ports")
.unwrap_or_else(|| self.namespaces.cluster_info.default_opaque_ports.clone());
let timeouts = parse_timeouts(service.annotations())
.map_err(|error| tracing::error!(%error, service=name, namespace=ns, "failed to parse timeouts"))
.unwrap_or_default();
let http_retry = http::parse_http_retry(service.annotations()).map_err(|error| {
tracing::error!(%error, service=name, namespace=ns, "failed to parse http retry")
}).unwrap_or_default();
let grpc_retry = grpc::parse_grpc_retry(service.annotations()).map_err(|error| {
tracing::error!(%error, service=name, namespace=ns, "failed to parse grpc retry")
}).unwrap_or_default();
if let Some(cluster_ips) = service
.spec
.as_ref()
@ -173,6 +196,9 @@ impl kubert::index::IndexNamespacedResource<Service> for Index {
let service_info = ServiceInfo {
opaque_ports,
accrual,
http_retry,
grpc_retry,
timeouts,
};
self.namespaces
@ -495,7 +521,13 @@ impl Namespace {
let opaque = service.opaque_ports.contains(&svc_port.port);
svc_routes.update_service(opaque, service.accrual);
svc_routes.update_service(
opaque,
service.accrual,
service.http_retry.clone(),
service.grpc_retry.clone(),
service.timeouts.clone(),
);
}
}
@ -538,10 +570,18 @@ impl Namespace {
namespace: self.namespace.to_string(),
};
let (opaque, accrual) = match service_info.get(&service_ref) {
Some(svc) => (svc.opaque_ports.contains(&sp.port), svc.accrual),
None => (false, None),
};
let mut opaque = false;
let mut accrual = None;
let mut http_retry = None;
let mut grpc_retry = None;
let mut timeouts = Default::default();
if let Some(svc) = service_info.get(&service_ref) {
opaque = svc.opaque_ports.contains(&sp.port);
accrual = svc.accrual;
http_retry = svc.http_retry.clone();
grpc_retry = svc.grpc_retry.clone();
timeouts = svc.timeouts.clone();
}
// The routes which target this Service but don't specify
// a port apply to all ports. Therefore, we include them.
@ -559,6 +599,9 @@ impl Namespace {
let mut service_routes = ServiceRoutes {
opaque,
accrual,
http_retry,
grpc_retry,
timeouts,
authority,
port: sp.port,
name: sp.service,
@ -648,7 +691,7 @@ fn is_service(group: Option<&str>, kind: &str) -> bool {
}
#[inline]
fn is_parent_service(parent: &ParentReference) -> bool {
pub fn is_parent_service(parent: &ParentReference) -> bool {
parent
.kind
.as_deref()
@ -696,6 +739,9 @@ impl ServiceRoutes {
port: self.port,
opaque: self.opaque,
accrual: self.accrual,
http_retry: self.http_retry.clone(),
grpc_retry: self.grpc_retry.clone(),
timeouts: self.timeouts.clone(),
http_routes: http_routes.clone(),
grpc_routes: grpc_routes.clone(),
name: self.name.to_string(),
@ -709,15 +755,14 @@ impl ServiceRoutes {
watch: sender,
opaque: self.opaque,
accrual: self.accrual,
http_retry: self.http_retry.clone(),
grpc_retry: self.grpc_retry.clone(),
timeouts: self.timeouts.clone(),
}
})
}
fn apply_http_route(
&mut self,
gknn: GroupKindNamespaceName,
route: OutboundRoute<HttpRouteMatch>,
) {
fn apply_http_route(&mut self, gknn: GroupKindNamespaceName, route: HttpRoute) {
if *gknn.namespace == *self.namespace {
// This is a producer namespace route.
let watch = self.watch_for_ns_or_default(gknn.namespace.to_string());
@ -739,11 +784,7 @@ impl ServiceRoutes {
}
}
fn apply_grpc_route(
&mut self,
gknn: GroupKindNamespaceName,
route: OutboundRoute<GrpcRouteMatch>,
) {
fn apply_grpc_route(&mut self, gknn: GroupKindNamespaceName, route: GrpcRoute) {
if *gknn.namespace == *self.namespace {
// This is a producer namespace route.
let watch = self.watch_for_ns_or_default(gknn.namespace.to_string());
@ -765,12 +806,25 @@ impl ServiceRoutes {
}
}
fn update_service(&mut self, opaque: bool, accrual: Option<FailureAccrual>) {
fn update_service(
&mut self,
opaque: bool,
accrual: Option<FailureAccrual>,
http_retry: Option<RouteRetry<HttpRetryCondition>>,
grpc_retry: Option<RouteRetry<GrpcRetryCondition>>,
timeouts: RouteTimeouts,
) {
self.opaque = opaque;
self.accrual = accrual;
self.http_retry = http_retry.clone();
self.grpc_retry = grpc_retry.clone();
self.timeouts = timeouts.clone();
for watch in self.watches_by_ns.values_mut() {
watch.opaque = opaque;
watch.accrual = accrual;
watch.http_retry = http_retry.clone();
watch.grpc_retry = grpc_retry.clone();
watch.timeouts = timeouts.clone();
watch.send_if_modified();
}
}
@ -813,25 +867,32 @@ impl RoutesWatch {
modified = true;
}
if self.http_retry != policy.http_retry {
policy.http_retry = self.http_retry.clone();
modified = true;
}
if self.grpc_retry != policy.grpc_retry {
policy.grpc_retry = self.grpc_retry.clone();
modified = true;
}
if self.timeouts != policy.timeouts {
policy.timeouts = self.timeouts.clone();
modified = true;
}
modified
});
}
fn insert_http_route(
&mut self,
gknn: GroupKindNamespaceName,
route: OutboundRoute<HttpRouteMatch>,
) {
fn insert_http_route(&mut self, gknn: GroupKindNamespaceName, route: HttpRoute) {
self.http_routes.insert(gknn, route);
self.send_if_modified();
}
fn insert_grpc_route(
&mut self,
gknn: GroupKindNamespaceName,
route: OutboundRoute<GrpcRouteMatch>,
) {
fn insert_grpc_route(&mut self, gknn: GroupKindNamespaceName, route: GrpcRoute) {
self.grpc_routes.insert(gknn, route);
self.send_if_modified();
@ -848,7 +909,7 @@ impl RoutesWatch {
}
}
fn parse_accrual_config(
pub fn parse_accrual_config(
annotations: &std::collections::BTreeMap<String, String>,
) -> Result<Option<FailureAccrual>> {
annotations
@ -903,6 +964,28 @@ fn parse_accrual_config(
.transpose()
}
pub fn parse_timeouts(
annotations: &std::collections::BTreeMap<String, String>,
) -> Result<RouteTimeouts> {
let response = annotations
.get("timeout.linkerd.io/response")
.map(|s| parse_duration(s))
.transpose()?;
let request = annotations
.get("timeout.linkerd.io/request")
.map(|s| parse_duration(s))
.transpose()?;
let idle = annotations
.get("timeout.linkerd.io/idle")
.map(|s| parse_duration(s))
.transpose()?;
Ok(RouteTimeouts {
response,
request,
idle,
})
}
fn parse_duration(s: &str) -> Result<time::Duration> {
let s = s.trim();
let offset = s

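A usage sketch for `parse_timeouts` above, assuming the function is in scope and using hypothetical annotation values in the formats shown in the commit message:

```
use std::collections::BTreeMap;

fn main() -> anyhow::Result<()> {
    let annotations: BTreeMap<String, String> = [
        ("timeout.linkerd.io/request".to_string(), "500ms".to_string()),
        ("timeout.linkerd.io/response".to_string(), "100ms".to_string()),
    ]
    .into_iter()
    .collect();

    let timeouts = parse_timeouts(&annotations)?;
    assert_eq!(timeouts.request, Some(std::time::Duration::from_millis(500)));
    assert_eq!(timeouts.response, Some(std::time::Duration::from_millis(100)));
    assert_eq!(timeouts.idle, None); // unset annotations remain None
    Ok(())
}
```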

@ -1,18 +1,26 @@
use std::time;
use super::http::{convert_backend, convert_gateway_filter};
use super::{ServiceInfo, ServiceRef};
use super::{parse_duration, parse_timeouts, ServiceInfo, ServiceRef};
use crate::{routes, ClusterInfo};
use ahash::AHashMap as HashMap;
use anyhow::Result;
use linkerd_policy_controller_core::outbound::OutboundRoute;
use anyhow::{bail, Result};
use kube::ResourceExt;
use linkerd_policy_controller_core::outbound::{
GrpcRetryCondition, GrpcRoute, OutboundRoute, RouteRetry, RouteTimeouts,
};
use linkerd_policy_controller_core::{outbound::OutboundRouteRule, routes::GrpcRouteMatch};
use linkerd_policy_controller_k8s_api::{gateway, Time};
pub(crate) fn convert_route(
pub(super) fn convert_route(
ns: &str,
route: gateway::GrpcRoute,
cluster: &ClusterInfo,
service_info: &HashMap<ServiceRef, ServiceInfo>,
) -> Result<OutboundRoute<GrpcRouteMatch>> {
) -> Result<GrpcRoute> {
let timeouts = parse_timeouts(route.annotations())?;
let retry = parse_grpc_retry(route.annotations())?;
let hostnames = route
.spec
.hostnames
@ -26,7 +34,16 @@ pub(crate) fn convert_route(
.rules
.into_iter()
.flatten()
.map(|rule| convert_rule(ns, rule, cluster, service_info))
.map(|rule| {
convert_rule(
ns,
rule,
cluster,
service_info,
timeouts.clone(),
retry.clone(),
)
})
.collect::<Result<_>>()?;
let creation_timestamp = route.metadata.creation_timestamp.map(|Time(t)| t);
@ -43,7 +60,9 @@ fn convert_rule(
rule: gateway::GrpcRouteRule,
cluster: &ClusterInfo,
service_info: &HashMap<ServiceRef, ServiceInfo>,
) -> Result<OutboundRouteRule<GrpcRouteMatch>> {
timeouts: RouteTimeouts,
retry: Option<RouteRetry<GrpcRetryCondition>>,
) -> Result<OutboundRouteRule<GrpcRouteMatch, GrpcRetryCondition>> {
let matches = rule
.matches
.into_iter()
@ -68,8 +87,60 @@ fn convert_rule(
Ok(OutboundRouteRule {
matches,
backends,
request_timeout: None,
backend_request_timeout: None,
timeouts,
retry,
filters,
})
}
pub fn parse_grpc_retry(
annotations: &std::collections::BTreeMap<String, String>,
) -> Result<Option<RouteRetry<GrpcRetryCondition>>> {
let limit = annotations
.get("retry.linkerd.io/limit")
.map(|s| s.parse::<u16>())
.transpose()?
.filter(|v| *v != 0);
let timeout = annotations
.get("retry.linkerd.io/timeout")
.map(|v| parse_duration(v))
.transpose()?
.filter(|v| *v != time::Duration::ZERO);
let conditions = annotations
.get("retry.linkerd.io/grpc")
.map(|v| {
v.split(',')
.map(|cond| {
if cond.eq_ignore_ascii_case("cancelled") {
return Ok(GrpcRetryCondition::Cancelled);
}
if cond.eq_ignore_ascii_case("deadline-exceeded") {
return Ok(GrpcRetryCondition::DeadlineExceeded);
}
if cond.eq_ignore_ascii_case("internal") {
return Ok(GrpcRetryCondition::Internal);
}
if cond.eq_ignore_ascii_case("resource-exhausted") {
return Ok(GrpcRetryCondition::ResourceExhausted);
}
if cond.eq_ignore_ascii_case("unavailable") {
return Ok(GrpcRetryCondition::Unavailable);
}
bail!("Unknown grpc retry condition: {cond}");
})
.collect::<Result<Vec<_>>>()
})
.transpose()?;
if limit.is_none() && timeout.is_none() && conditions.is_none() {
return Ok(None);
}
Ok(Some(RouteRetry {
limit: limit.unwrap_or(1),
timeout,
conditions,
}))
}
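
For example, given the gRPC annotations from the commit message (a sketch, assuming `parse_grpc_retry` and the core types are in scope):

```
use std::collections::BTreeMap;
use std::time::Duration;

fn main() -> anyhow::Result<()> {
    let annotations: BTreeMap<String, String> = [
        ("retry.linkerd.io/grpc".to_string(), "cancelled,unavailable".to_string()),
        ("retry.linkerd.io/limit".to_string(), "2".to_string()),
        ("retry.linkerd.io/timeout".to_string(), "400ms".to_string()),
    ]
    .into_iter()
    .collect();

    let retry = parse_grpc_retry(&annotations)?.expect("retry config present");
    assert_eq!(retry.limit, 2);
    assert_eq!(retry.timeout, Some(Duration::from_millis(400)));
    assert_eq!(
        retry.conditions,
        Some(vec![GrpcRetryCondition::Cancelled, GrpcRetryCondition::Unavailable])
    );
    // With none of the three annotations set, parse_grpc_retry returns Ok(None);
    // with conditions but no explicit limit, the limit defaults to 1.
    Ok(())
}
```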


@ -1,26 +1,33 @@
use std::{num::NonZeroU16, time};
use super::{is_service, ServiceInfo, ServiceRef};
use super::{is_service, parse_duration, parse_timeouts, ServiceInfo, ServiceRef};
use crate::{
routes::{self, HttpRouteResource},
ClusterInfo,
};
use ahash::AHashMap as HashMap;
use anyhow::{bail, Result};
use kube::ResourceExt;
use linkerd_policy_controller_core::{
outbound::{Backend, Filter, OutboundRoute, OutboundRouteRule, WeightedService},
outbound::{
Backend, Filter, HttpRetryCondition, OutboundRoute, OutboundRouteRule, RouteRetry,
RouteTimeouts, WeightedService,
},
routes::HttpRouteMatch,
};
use linkerd_policy_controller_k8s_api::{gateway, policy, Time};
pub(crate) fn convert_route(
pub(super) fn convert_route(
ns: &str,
route: HttpRouteResource,
cluster: &ClusterInfo,
service_info: &HashMap<ServiceRef, ServiceInfo>,
) -> Result<OutboundRoute<HttpRouteMatch>> {
) -> Result<OutboundRoute<HttpRouteMatch, HttpRetryCondition>> {
match route {
HttpRouteResource::LinkerdHttp(route) => {
let timeouts = parse_timeouts(route.annotations())?;
let retry = parse_http_retry(route.annotations())?;
let hostnames = route
.spec
.hostnames
@ -34,7 +41,16 @@ pub(crate) fn convert_route(
.rules
.into_iter()
.flatten()
.map(|r| convert_linkerd_rule(ns, r, cluster, service_info))
.map(|r| {
convert_linkerd_rule(
ns,
r,
cluster,
service_info,
timeouts.clone(),
retry.clone(),
)
})
.collect::<Result<_>>()?;
let creation_timestamp = route.metadata.creation_timestamp.map(|Time(t)| t);
@ -46,6 +62,9 @@ pub(crate) fn convert_route(
})
}
HttpRouteResource::GatewayHttp(route) => {
let timeouts = parse_timeouts(route.annotations())?;
let retry = parse_http_retry(route.annotations())?;
let hostnames = route
.spec
.hostnames
@ -59,7 +78,16 @@ pub(crate) fn convert_route(
.rules
.into_iter()
.flatten()
.map(|r| convert_gateway_rule(ns, r, cluster, service_info))
.map(|r| {
convert_gateway_rule(
ns,
r,
cluster,
service_info,
timeouts.clone(),
retry.clone(),
)
})
.collect::<Result<_>>()?;
let creation_timestamp = route.metadata.creation_timestamp.map(|Time(t)| t);
@ -78,7 +106,9 @@ fn convert_linkerd_rule(
rule: policy::httproute::HttpRouteRule,
cluster: &ClusterInfo,
service_info: &HashMap<ServiceRef, ServiceInfo>,
) -> Result<OutboundRouteRule<HttpRouteMatch>> {
mut timeouts: RouteTimeouts,
retry: Option<RouteRetry<HttpRetryCondition>>,
) -> Result<OutboundRouteRule<HttpRouteMatch, HttpRetryCondition>> {
let matches = rule
.matches
.into_iter()
@ -100,36 +130,24 @@ fn convert_linkerd_rule(
.map(convert_linkerd_filter)
.collect::<Result<_>>()?;
let request_timeout = rule.timeouts.as_ref().and_then(|timeouts| {
let timeout = time::Duration::from(timeouts.request?);
timeouts.request = timeouts.request.or_else(|| {
rule.timeouts.as_ref().and_then(|timeouts| {
let timeout = time::Duration::from(timeouts.request?);
// zero means "no timeout", per GEP-1742
if timeout == time::Duration::from_nanos(0) {
return None;
}
// zero means "no timeout", per GEP-1742
if timeout == time::Duration::ZERO {
return None;
}
Some(timeout)
Some(timeout)
})
});
let backend_request_timeout =
rule.timeouts
.as_ref()
.and_then(|timeouts: &policy::httproute::HttpRouteTimeouts| {
let timeout = time::Duration::from(timeouts.backend_request?);
// zero means "no timeout", per GEP-1742
if timeout == time::Duration::from_nanos(0) {
return None;
}
Some(timeout)
});
Ok(OutboundRouteRule {
matches,
backends,
request_timeout,
backend_request_timeout,
timeouts,
retry,
filters,
})
}
@ -139,7 +157,9 @@ fn convert_gateway_rule(
rule: gateway::HttpRouteRule,
cluster: &ClusterInfo,
service_info: &HashMap<ServiceRef, ServiceInfo>,
) -> Result<OutboundRouteRule<HttpRouteMatch>> {
timeouts: RouteTimeouts,
retry: Option<RouteRetry<HttpRetryCondition>>,
) -> Result<OutboundRouteRule<HttpRouteMatch, HttpRetryCondition>> {
let matches = rule
.matches
.into_iter()
@ -164,13 +184,13 @@ fn convert_gateway_rule(
Ok(OutboundRouteRule {
matches,
backends,
request_timeout: None,
backend_request_timeout: None,
timeouts,
retry,
filters,
})
}
pub(crate) fn convert_backend<BackendRef: Into<gateway::HttpBackendRef>>(
pub(super) fn convert_backend<BackendRef: Into<gateway::HttpBackendRef>>(
ns: &str,
backend: BackendRef,
cluster: &ClusterInfo,
@ -308,3 +328,79 @@ fn is_backend_service(backend: &gateway::BackendObjectReference) -> bool {
backend.kind.as_deref().unwrap_or("Service"),
)
}
pub fn parse_http_retry(
annotations: &std::collections::BTreeMap<String, String>,
) -> Result<Option<RouteRetry<HttpRetryCondition>>> {
let limit = annotations
.get("retry.linkerd.io/limit")
.map(|s| s.parse::<u16>())
.transpose()?
.filter(|v| *v != 0);
let timeout = annotations
.get("retry.linkerd.io/timeout")
.map(|v| parse_duration(v))
.transpose()?
.filter(|v| *v != time::Duration::ZERO);
fn to_code(s: &str) -> Option<u32> {
let code = s.parse::<u32>().ok()?;
if (100..600).contains(&code) {
Some(code)
} else {
None
}
}
let conditions = annotations
.get("retry.linkerd.io/http")
.map(|v| {
v.split(',')
.map(|cond| {
if cond.eq_ignore_ascii_case("5xx") {
return Ok(HttpRetryCondition {
status_min: 500,
status_max: 599,
});
}
if cond.eq_ignore_ascii_case("gateway-error") {
return Ok(HttpRetryCondition {
status_min: 502,
status_max: 504,
});
}
if let Some(code) = to_code(cond) {
return Ok(HttpRetryCondition {
status_min: code,
status_max: code,
});
}
if let Some((start, end)) = cond.split_once('-') {
if let (Some(s), Some(e)) = (to_code(start), to_code(end)) {
if s <= e {
return Ok(HttpRetryCondition {
status_min: s,
status_max: e,
});
}
}
}
bail!("invalid retry condition: {v}");
})
.collect::<Result<Vec<_>>>()
})
.transpose()?;
if limit.is_none() && timeout.is_none() && conditions.is_none() {
return Ok(None);
}
Ok(Some(RouteRetry {
limit: limit.unwrap_or(1),
timeout,
conditions,
}))
}
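
The HTTP condition grammar accepts `5xx`, `gateway-error`, a single status code, or an inclusive `start-end` range; anything else is rejected, which the admission webhook below surfaces as a denial. A sketch, assuming `parse_http_retry` is in scope:

```
use std::collections::BTreeMap;

fn main() -> anyhow::Result<()> {
    let annotations: BTreeMap<String, String> = [(
        "retry.linkerd.io/http".to_string(),
        "5xx,gateway-error,429,500-504".to_string(),
    )]
    .into_iter()
    .collect();

    let retry = parse_http_retry(&annotations)?.expect("retry config present");
    let conds = retry.conditions.as_ref().expect("conditions parsed");
    assert_eq!((conds[0].status_min, conds[0].status_max), (500, 599)); // 5xx
    assert_eq!((conds[1].status_min, conds[1].status_max), (502, 504)); // gateway-error
    assert_eq!((conds[2].status_min, conds[2].status_max), (429, 429)); // single code
    assert_eq!((conds[3].status_min, conds[3].status_max), (500, 504)); // range
    assert_eq!(retry.limit, 1); // default when retry.linkerd.io/limit is unset
    Ok(())
}
```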


@ -12,9 +12,9 @@ use k8s_openapi::api::core::v1::{Namespace, ServiceAccount};
use kube::{core::DynamicObject, Resource, ResourceExt};
use linkerd_policy_controller_core as core;
use linkerd_policy_controller_k8s_api::gateway::{self as k8s_gateway_api, GrpcRoute};
use linkerd_policy_controller_k8s_index as index;
use linkerd_policy_controller_k8s_index::{self as index, outbound::index as outbound_index};
use serde::de::DeserializeOwned;
use std::task;
use std::{collections::BTreeMap, task};
use thiserror::Error;
use tracing::{debug, info, trace, warn};
@ -39,7 +39,13 @@ type AdmissionReview = kube::core::admission::AdmissionReview<DynamicObject>;
#[async_trait::async_trait]
trait Validate<T> {
async fn validate(self, ns: &str, name: &str, spec: T) -> Result<()>;
async fn validate(
self,
ns: &str,
name: &str,
annotations: &BTreeMap<String, String>,
spec: T,
) -> Result<()>;
}
// === impl AdmissionService ===
@ -144,7 +150,7 @@ impl Admission {
let rsp = AdmissionResponse::from(&req);
let kind = req.kind.kind.clone();
let (ns, name, spec) = match parse_spec::<T>(req) {
let (obj, spec) = match parse_spec::<T>(req) {
Ok(spec) => spec,
Err(error) => {
info!(%error, "Failed to parse {} spec", kind);
@ -152,7 +158,11 @@ impl Admission {
}
};
if let Err(error) = self.validate(&ns, &name, spec).await {
let ns = obj.namespace().unwrap_or_default();
let name = obj.name_any();
let annotations = obj.annotations();
if let Err(error) = self.validate(&ns, &name, annotations, spec).await {
info!(%error, %ns, %name, %kind, "Denied");
return rsp.deny(error);
}
@ -180,16 +190,11 @@ fn json_response(rsp: AdmissionReview) -> Result<Response<Body>, Error> {
.expect("admission review response must be valid"))
}
fn parse_spec<T: DeserializeOwned>(req: AdmissionRequest) -> Result<(String, String, T)> {
fn parse_spec<T: DeserializeOwned>(req: AdmissionRequest) -> Result<(DynamicObject, T)> {
let obj = req
.object
.ok_or_else(|| anyhow!("admission request missing 'object'"))?;
let ns = obj
.namespace()
.ok_or_else(|| anyhow!("admission request missing 'namespace'"))?;
let name = obj.name_any();
let spec = {
let data = obj
.data
@ -199,7 +204,7 @@ fn parse_spec<T: DeserializeOwned>(req: AdmissionRequest) -> Result<(String, Str
serde_json::from_value(data)?
};
Ok((ns, name, spec))
Ok((obj, spec))
}
/// Validates the target of an `AuthorizationPolicy`.
@ -228,7 +233,13 @@ fn validate_policy_target(ns: &str, tgt: &LocalTargetRef) -> Result<()> {
#[async_trait::async_trait]
impl Validate<AuthorizationPolicySpec> for Admission {
async fn validate(self, ns: &str, _name: &str, spec: AuthorizationPolicySpec) -> Result<()> {
async fn validate(
self,
ns: &str,
_name: &str,
_annotations: &BTreeMap<String, String>,
spec: AuthorizationPolicySpec,
) -> Result<()> {
validate_policy_target(ns, &spec.target_ref)?;
let mtls_authns_count = spec
@ -302,7 +313,13 @@ fn validate_identity_ref(id: &NamespacedTargetRef) -> Result<()> {
#[async_trait::async_trait]
impl Validate<MeshTLSAuthenticationSpec> for Admission {
async fn validate(self, _ns: &str, _name: &str, spec: MeshTLSAuthenticationSpec) -> Result<()> {
async fn validate(
self,
_ns: &str,
_name: &str,
_annotations: &BTreeMap<String, String>,
spec: MeshTLSAuthenticationSpec,
) -> Result<()> {
for id in spec.identities.iter().flatten() {
if let Err(err) = validation::validate_identity(id) {
bail!("id {} is invalid: {}", id, err);
@ -323,7 +340,13 @@ impl Validate<ServerSpec> for Admission {
//
// TODO(ver) this isn't rigorous about detecting servers that select the same port if one port
// specifies a numeric port and the other specifies the port's name.
async fn validate(self, ns: &str, name: &str, spec: ServerSpec) -> Result<()> {
async fn validate(
self,
ns: &str,
name: &str,
_annotations: &BTreeMap<String, String>,
spec: ServerSpec,
) -> Result<()> {
// Since we can't ensure that the local index is up-to-date with the API server (i.e.
// updates may be delayed), we issue an API request to get the latest state of servers in
// the namespace.
@ -378,7 +401,13 @@ impl Admission {
#[async_trait::async_trait]
impl Validate<NetworkAuthenticationSpec> for Admission {
async fn validate(self, _ns: &str, _name: &str, spec: NetworkAuthenticationSpec) -> Result<()> {
async fn validate(
self,
_ns: &str,
_name: &str,
_annotations: &BTreeMap<String, String>,
spec: NetworkAuthenticationSpec,
) -> Result<()> {
if spec.networks.is_empty() {
bail!("at least one network must be specified");
}
@ -407,7 +436,13 @@ impl Validate<NetworkAuthenticationSpec> for Admission {
#[async_trait::async_trait]
impl Validate<ServerAuthorizationSpec> for Admission {
async fn validate(self, _ns: &str, _name: &str, spec: ServerAuthorizationSpec) -> Result<()> {
async fn validate(
self,
_ns: &str,
_name: &str,
_annotations: &BTreeMap<String, String>,
spec: ServerAuthorizationSpec,
) -> Result<()> {
if let Some(mtls) = spec.client.mesh_tls.as_ref() {
if spec.client.unauthenticated {
bail!("`unauthenticated` must be false if `mesh_tls` is specified");
@ -477,7 +512,25 @@ fn validate_match(
#[async_trait::async_trait]
impl Validate<HttpRouteSpec> for Admission {
async fn validate(self, _ns: &str, _name: &str, spec: HttpRouteSpec) -> Result<()> {
async fn validate(
self,
_ns: &str,
_name: &str,
annotations: &BTreeMap<String, String>,
spec: HttpRouteSpec,
) -> Result<()> {
if spec
.inner
.parent_refs
.iter()
.flatten()
.any(index::outbound::index::is_parent_service)
{
index::outbound::index::http::parse_http_retry(annotations)?;
index::outbound::index::parse_accrual_config(annotations)?;
index::outbound::index::parse_timeouts(annotations)?;
}
fn validate_filter(filter: httproute::HttpRouteFilter) -> Result<()> {
match filter {
httproute::HttpRouteFilter::RequestHeaderModifier {
@ -549,8 +602,21 @@ impl Validate<k8s_gateway_api::HttpRouteSpec> for Admission {
self,
_ns: &str,
_name: &str,
annotations: &BTreeMap<String, String>,
spec: k8s_gateway_api::HttpRouteSpec,
) -> Result<()> {
if spec
.inner
.parent_refs
.iter()
.flatten()
.any(outbound_index::is_parent_service)
{
outbound_index::http::parse_http_retry(annotations)?;
outbound_index::parse_accrual_config(annotations)?;
outbound_index::parse_timeouts(annotations)?;
}
fn validate_filter(filter: k8s_gateway_api::HttpRouteFilter) -> Result<()> {
match filter {
k8s_gateway_api::HttpRouteFilter::RequestHeaderModifier {
@ -595,8 +661,21 @@ impl Validate<k8s_gateway_api::GrpcRouteSpec> for Admission {
self,
_ns: &str,
_name: &str,
annotations: &BTreeMap<String, String>,
spec: k8s_gateway_api::GrpcRouteSpec,
) -> Result<()> {
if spec
.inner
.parent_refs
.iter()
.flatten()
.any(outbound_index::is_parent_service)
{
outbound_index::grpc::parse_grpc_retry(annotations)?;
outbound_index::parse_accrual_config(annotations)?;
outbound_index::parse_timeouts(annotations)?;
}
fn validate_filter(filter: k8s_gateway_api::GrpcRouteFilter) -> Result<()> {
match filter {
k8s_gateway_api::GrpcRouteFilter::RequestHeaderModifier {


@ -97,6 +97,9 @@ struct Args {
#[clap(long, default_value = "5000")]
patch_timeout_ms: u64,
#[clap(long)]
allow_l5d_request_headers: bool,
}
#[tokio::main]
@ -118,6 +121,7 @@ async fn main() -> Result<()> {
probe_networks,
default_opaque_ports,
patch_timeout_ms,
allow_l5d_request_headers,
} = Args::parse();
let server = if admission_controller_disabled {
@ -290,6 +294,7 @@ async fn main() -> Result<()> {
grpc_addr,
cluster_domain,
cluster_networks,
allow_l5d_request_headers,
inbound_index,
outbound_index,
runtime.shutdown_handle(),
@ -340,6 +345,7 @@ async fn grpc(
addr: SocketAddr,
cluster_domain: String,
cluster_networks: Vec<IpNet>,
allow_l5d_request_headers: bool,
inbound_index: inbound::SharedIndex,
outbound_index: outbound::SharedIndex,
drain: drain::Watch,
@ -350,9 +356,13 @@ async fn grpc(
.svc();
let outbound_discover = OutboundDiscover::new(outbound_index);
let outbound_svc =
grpc::outbound::OutboundPolicyServer::new(outbound_discover, cluster_domain, drain.clone())
.svc();
let outbound_svc = grpc::outbound::OutboundPolicyServer::new(
outbound_discover,
cluster_domain,
allow_l5d_request_headers,
drain.clone(),
)
.svc();
let (close_tx, close_rx) = tokio::sync::oneshot::channel();
tokio::pin! {


@ -31,7 +31,7 @@ default-features = false
features = ["client", "openssl-tls", "runtime", "ws"]
[dependencies.linkerd2-proxy-api]
version = "0.13"
version = "0.14"
features = ["inbound", "outbound"]
[dev-dependencies]


@ -5,6 +5,7 @@ pub mod admission;
pub mod bb;
pub mod curl;
pub mod grpc;
pub mod outbound_api;
pub mod web;
use linkerd_policy_controller_k8s_api::{self as k8s, ResourceExt};


@ -0,0 +1,259 @@
use crate::{assert_svc_meta, grpc};
use kube::ResourceExt;
use linkerd_policy_controller_k8s_api as k8s;
use std::time::Duration;
use tokio::time;
pub async fn retry_watch_outbound_policy(
client: &kube::Client,
ns: &str,
svc: &k8s::Service,
port: u16,
) -> tonic::Streaming<grpc::outbound::OutboundPolicy> {
// Port-forward to the control plane and start watching the service's
// outbound policy.
let mut policy_api = grpc::OutboundPolicyClient::port_forwarded(client).await;
loop {
match policy_api.watch(ns, svc, port).await {
Ok(rx) => return rx,
Err(error) => {
tracing::error!(
?error,
ns,
svc = svc.name_unchecked(),
"failed to watch outbound policy for port 4191"
);
time::sleep(Duration::from_secs(1)).await;
}
}
}
}
// detect_http_routes asserts that the given outbound policy has a proxy protocol
// of "Detect" and then invokes the given function with the Http1 and Http2
// routes from the Detect.
#[track_caller]
pub fn detect_http_routes<F>(config: &grpc::outbound::OutboundPolicy, f: F)
where
F: Fn(&[grpc::outbound::HttpRoute]),
{
let kind = config
.protocol
.as_ref()
.expect("must have proxy protocol")
.kind
.as_ref()
.expect("must have kind");
if let grpc::outbound::proxy_protocol::Kind::Detect(grpc::outbound::proxy_protocol::Detect {
opaque: _,
timeout: _,
http1,
http2,
}) = kind
{
let http1 = http1
.as_ref()
.expect("proxy protocol must have http1 field");
let http2 = http2
.as_ref()
.expect("proxy protocol must have http2 field");
f(&http1.routes);
f(&http2.routes);
} else {
panic!("proxy protocol must be Detect; actually got:\n{kind:#?}")
}
}
#[track_caller]
pub fn grpc_routes(config: &grpc::outbound::OutboundPolicy) -> &[grpc::outbound::GrpcRoute] {
let kind = config
.protocol
.as_ref()
.expect("must have proxy protocol")
.kind
.as_ref()
.expect("must have kind");
if let grpc::outbound::proxy_protocol::Kind::Grpc(grpc::outbound::proxy_protocol::Grpc {
routes,
failure_accrual: _,
}) = kind
{
routes
} else {
panic!("proxy protocol must be Grpc; actually got:\n{kind:#?}")
}
}
#[track_caller]
pub fn detect_failure_accrual<F>(config: &grpc::outbound::OutboundPolicy, f: F)
where
F: Fn(Option<&grpc::outbound::FailureAccrual>),
{
let kind = config
.protocol
.as_ref()
.expect("must have proxy protocol")
.kind
.as_ref()
.expect("must have kind");
if let grpc::outbound::proxy_protocol::Kind::Detect(grpc::outbound::proxy_protocol::Detect {
opaque: _,
timeout: _,
http1,
http2,
}) = kind
{
let http1 = http1
.as_ref()
.expect("proxy protocol must have http1 field");
let http2 = http2
.as_ref()
.expect("proxy protocol must have http2 field");
f(http1.failure_accrual.as_ref());
f(http2.failure_accrual.as_ref());
} else {
panic!("proxy protocol must be Detect; actually got:\n{kind:#?}")
}
}
#[track_caller]
pub fn failure_accrual_consecutive(
accrual: Option<&grpc::outbound::FailureAccrual>,
) -> &grpc::outbound::failure_accrual::ConsecutiveFailures {
assert!(
accrual.is_some(),
"failure accrual must be configured for service"
);
let kind = accrual
.unwrap()
.kind
.as_ref()
.expect("failure accrual must have kind");
let grpc::outbound::failure_accrual::Kind::ConsecutiveFailures(accrual) = kind;
accrual
}
#[track_caller]
pub fn route_backends_first_available(
route: &grpc::outbound::HttpRoute,
) -> &[grpc::outbound::http_route::RouteBackend] {
let kind = assert_singleton(&route.rules)
.backends
.as_ref()
.expect("Rule must have backends")
.kind
.as_ref()
.expect("Backend must have kind");
match kind {
grpc::outbound::http_route::distribution::Kind::FirstAvailable(fa) => &fa.backends,
_ => panic!("Distribution must be FirstAvailable"),
}
}
#[track_caller]
pub fn route_backends_random_available(
route: &grpc::outbound::HttpRoute,
) -> &[grpc::outbound::http_route::WeightedRouteBackend] {
let kind = assert_singleton(&route.rules)
.backends
.as_ref()
.expect("Rule must have backends")
.kind
.as_ref()
.expect("Backend must have kind");
match kind {
grpc::outbound::http_route::distribution::Kind::RandomAvailable(dist) => &dist.backends,
_ => panic!("Distribution must be RandomAvailable"),
}
}
#[track_caller]
pub fn route_name(route: &grpc::outbound::HttpRoute) -> &str {
match route.metadata.as_ref().unwrap().kind.as_ref().unwrap() {
grpc::meta::metadata::Kind::Resource(grpc::meta::Resource { ref name, .. }) => name,
_ => panic!("route must be a resource kind"),
}
}
#[track_caller]
pub fn assert_backend_has_failure_filter(
backend: &grpc::outbound::http_route::WeightedRouteBackend,
) {
let filter = assert_singleton(&backend.backend.as_ref().unwrap().filters);
match filter.kind.as_ref().unwrap() {
grpc::outbound::http_route::filter::Kind::FailureInjector(_) => {}
_ => panic!("backend must have FailureInjector filter"),
};
}
#[track_caller]
pub fn assert_route_is_default(route: &grpc::outbound::HttpRoute, svc: &k8s::Service, port: u16) {
let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap();
match kind {
grpc::meta::metadata::Kind::Default(_) => {}
grpc::meta::metadata::Kind::Resource(r) => {
panic!("route expected to be default but got resource {r:?}")
}
}
let backends = route_backends_first_available(route);
let backend = assert_singleton(backends);
assert_backend_matches_service(backend, svc, port);
let rule = assert_singleton(&route.rules);
let route_match = assert_singleton(&rule.matches);
let path_match = route_match.path.as_ref().unwrap().kind.as_ref().unwrap();
assert_eq!(
*path_match,
grpc::http_route::path_match::Kind::Prefix("/".to_string())
);
}
#[track_caller]
pub fn assert_backend_matches_service(
backend: &grpc::outbound::http_route::RouteBackend,
svc: &k8s::Service,
port: u16,
) {
let backend = backend.backend.as_ref().unwrap();
let dst = match backend.kind.as_ref().unwrap() {
grpc::outbound::backend::Kind::Balancer(balance) => {
let kind = balance.discovery.as_ref().unwrap().kind.as_ref().unwrap();
match kind {
grpc::outbound::backend::endpoint_discovery::Kind::Dst(dst) => &dst.path,
}
}
grpc::outbound::backend::Kind::Forward(_) => {
panic!("default route backend must be Balancer")
}
};
assert_eq!(
*dst,
format!(
"{}.{}.svc.{}:{}",
svc.name_unchecked(),
svc.namespace().unwrap(),
"cluster.local",
port
)
);
assert_svc_meta(&backend.metadata, svc, port)
}
#[track_caller]
pub fn assert_singleton<T>(ts: &[T]) -> &T {
assert_eq!(ts.len(), 1);
ts.first().unwrap()
}
#[track_caller]
pub fn assert_route_name_eq(route: &grpc::outbound::HttpRoute, name: &str) {
let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap();
match kind {
grpc::meta::metadata::Kind::Default(d) => {
panic!("route expected to not be default, but got default {d:?}")
}
grpc::meta::metadata::Kind::Resource(resource) => assert_eq!(resource.name, *name),
}
}


@ -4,11 +4,10 @@ use linkerd_policy_controller_k8s_api as k8s;
use linkerd_policy_test::{
assert_default_accrual_backoff, assert_svc_meta, create, create_annotated_service,
create_cluster_scoped, create_opaque_service, create_service, delete_cluster_scoped, grpc,
mk_service, with_temp_ns,
mk_service, outbound_api::*, with_temp_ns,
};
use maplit::{btreemap, convert_args};
use std::{collections::BTreeMap, time::Duration};
use tokio::time;
// These tests are copies of the tests in outbound_api_gateway.rs but using the
// policy.linkerd.io HttpRoute kubernetes types instead of the Gateway API ones.
@ -1153,35 +1152,124 @@ async fn consumer_route() {
.await;
}
#[tokio::test(flavor = "current_thread")]
async fn http_route_retries_and_timeouts() {
with_temp_ns(|client, ns| async move {
// Create a service
let svc = create_service(&client, &ns, "my-svc", 4191).await;
let _route = create(
&client,
mk_http_route(&ns, "foo-route", &svc, Some(4191))
.with_annotations(
vec![
("retry.linkerd.io/http".to_string(), "5xx".to_string()),
("timeout.linkerd.io/response".to_string(), "10s".to_string()),
]
.into_iter()
.collect(),
)
.build(),
)
.await;
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
let config = rx
.next()
.await
.expect("watch must not fail")
.expect("watch must return an initial config");
tracing::trace!(?config);
detect_http_routes(&config, |routes| {
let route = assert_singleton(routes);
let rule = assert_singleton(&route.rules);
let conditions = rule
.retry
.as_ref()
.expect("retry config expected")
.conditions
.as_ref()
.expect("retry conditions expected");
let status_range = assert_singleton(&conditions.status_ranges);
assert_eq!(status_range.start, 500);
assert_eq!(status_range.end, 599);
let timeout = rule
.timeouts
.as_ref()
.expect("timeouts expected")
.response
.as_ref()
.expect("response timeout expected");
assert_eq!(timeout.seconds, 10);
});
})
.await;
}
#[tokio::test(flavor = "current_thread")]
async fn service_retries_and_timeouts() {
with_temp_ns(|client, ns| async move {
// Create a service
let mut svc = mk_service(&ns, "my-svc", 4191);
svc.annotations_mut()
.insert("retry.linkerd.io/http".to_string(), "5xx".to_string());
svc.annotations_mut()
.insert("timeout.linkerd.io/response".to_string(), "10s".to_string());
let svc = create(&client, svc).await;
let _route = create(
&client,
mk_http_route(&ns, "foo-route", &svc, Some(4191))
.with_annotations(
vec![
// Route annotations override the timeout config specified
// on the service.
("timeout.linkerd.io/request".to_string(), "5s".to_string()),
]
.into_iter()
.collect(),
)
.build(),
)
.await;
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
let config = rx
.next()
.await
.expect("watch must not fail")
.expect("watch must return an initial config");
tracing::trace!(?config);
detect_http_routes(&config, |routes| {
let route = assert_singleton(routes);
let rule = assert_singleton(&route.rules);
let conditions = rule
.retry
.as_ref()
.expect("retry config expected")
.conditions
.as_ref()
.expect("retry conditions expected");
let status_range = assert_singleton(&conditions.status_ranges);
// Retry config inherited from the service.
assert_eq!(status_range.start, 500);
assert_eq!(status_range.end, 599);
let timeouts = rule.timeouts.as_ref().expect("timeouts expected");
// Service timeout config overridden by route timeout config.
assert_eq!(timeouts.response, None);
let request_timeout = timeouts.request.as_ref().expect("request timeout expected");
assert_eq!(request_timeout.seconds, 5);
});
})
.await;
}
/* Helpers */
struct HttpRouteBuilder(k8s_gateway_api::HttpRoute);
async fn retry_watch_outbound_policy(
client: &kube::Client,
ns: &str,
svc: &k8s::Service,
port: u16,
) -> tonic::Streaming<grpc::outbound::OutboundPolicy> {
// Port-forward to the control plane and start watching the service's
// outbound policy.
let mut policy_api = grpc::OutboundPolicyClient::port_forwarded(client).await;
loop {
match policy_api.watch(ns, svc, port).await {
Ok(rx) => return rx,
Err(error) => {
tracing::error!(
?error,
ns,
svc = svc.name_unchecked(),
"failed to watch outbound policy for port 4191"
);
time::sleep(Duration::from_secs(1)).await;
}
}
}
}
fn mk_http_route(ns: &str, name: &str, svc: &k8s::Service, port: Option<u16>) -> HttpRouteBuilder {
use k8s_gateway_api as api;
@ -1263,6 +1351,12 @@ impl HttpRouteBuilder {
Self(route)
}
fn with_annotations(self, annotations: BTreeMap<String, String>) -> Self {
let mut route = self.0;
route.metadata.annotations = Some(annotations);
Self(route)
}
fn build(self) -> k8s_gateway_api::HttpRoute {
self.0
}
@ -1298,210 +1392,3 @@ fn mk_empty_http_route(
status: None,
}
}
// detect_http_routes asserts that the given outbound policy has a proxy
// protocol of "Detect" and then invokes the given function with the Http1 and
// Http2 routes from the Detect protocol.
#[track_caller]
fn detect_http_routes<F>(config: &grpc::outbound::OutboundPolicy, f: F)
where
F: Fn(&[grpc::outbound::HttpRoute]),
{
let kind = config
.protocol
.as_ref()
.expect("must have proxy protocol")
.kind
.as_ref()
.expect("must have kind");
if let grpc::outbound::proxy_protocol::Kind::Detect(grpc::outbound::proxy_protocol::Detect {
opaque: _,
timeout: _,
http1,
http2,
}) = kind
{
let http1 = http1
.as_ref()
.expect("proxy protocol must have http1 field");
let http2 = http2
.as_ref()
.expect("proxy protocol must have http2 field");
f(&http1.routes);
f(&http2.routes);
} else {
panic!("proxy protocol must be Detect; actually got:\n{kind:#?}")
}
}
#[track_caller]
fn detect_failure_accrual<F>(config: &grpc::outbound::OutboundPolicy, f: F)
where
F: Fn(Option<&grpc::outbound::FailureAccrual>),
{
let kind = config
.protocol
.as_ref()
.expect("must have proxy protocol")
.kind
.as_ref()
.expect("must have kind");
if let grpc::outbound::proxy_protocol::Kind::Detect(grpc::outbound::proxy_protocol::Detect {
opaque: _,
timeout: _,
http1,
http2,
}) = kind
{
let http1 = http1
.as_ref()
.expect("proxy protocol must have http1 field");
let http2 = http2
.as_ref()
.expect("proxy protocol must have http2 field");
f(http1.failure_accrual.as_ref());
f(http2.failure_accrual.as_ref());
} else {
panic!("proxy protocol must be Detect; actually got:\n{kind:#?}")
}
}
#[track_caller]
fn failure_accrual_consecutive(
accrual: Option<&grpc::outbound::FailureAccrual>,
) -> &grpc::outbound::failure_accrual::ConsecutiveFailures {
assert!(
accrual.is_some(),
"failure accrual must be configured for service"
);
let kind = accrual
.unwrap()
.kind
.as_ref()
.expect("failure accrual must have kind");
let grpc::outbound::failure_accrual::Kind::ConsecutiveFailures(accrual) = kind;
accrual
}
#[track_caller]
fn route_backends_first_available(
route: &grpc::outbound::HttpRoute,
) -> &[grpc::outbound::http_route::RouteBackend] {
let kind = assert_singleton(&route.rules)
.backends
.as_ref()
.expect("Rule must have backends")
.kind
.as_ref()
.expect("Backend must have kind");
match kind {
grpc::outbound::http_route::distribution::Kind::FirstAvailable(fa) => &fa.backends,
_ => panic!("Distribution must be FirstAvailable"),
}
}
#[track_caller]
fn route_backends_random_available(
route: &grpc::outbound::HttpRoute,
) -> &[grpc::outbound::http_route::WeightedRouteBackend] {
let kind = assert_singleton(&route.rules)
.backends
.as_ref()
.expect("Rule must have backends")
.kind
.as_ref()
.expect("Backend must have kind");
match kind {
grpc::outbound::http_route::distribution::Kind::RandomAvailable(dist) => &dist.backends,
_ => panic!("Distribution must be RandomAvailable"),
}
}
#[track_caller]
fn route_name(route: &grpc::outbound::HttpRoute) -> &str {
match route.metadata.as_ref().unwrap().kind.as_ref().unwrap() {
grpc::meta::metadata::Kind::Resource(grpc::meta::Resource { ref name, .. }) => name,
_ => panic!("route must be a resource kind"),
}
}
#[track_caller]
fn assert_backend_has_failure_filter(backend: &grpc::outbound::http_route::WeightedRouteBackend) {
let filter = assert_singleton(&backend.backend.as_ref().unwrap().filters);
match filter.kind.as_ref().unwrap() {
grpc::outbound::http_route::filter::Kind::FailureInjector(_) => {}
_ => panic!("backend must have FailureInjector filter"),
};
}
#[track_caller]
fn assert_route_is_default(route: &grpc::outbound::HttpRoute, svc: &k8s::Service, port: u16) {
let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap();
match kind {
grpc::meta::metadata::Kind::Default(_) => {}
grpc::meta::metadata::Kind::Resource(r) => {
panic!("route expected to be default but got resource {r:?}")
}
}
let backends = route_backends_first_available(route);
let backend = assert_singleton(backends);
assert_backend_matches_service(backend, svc, port);
let rule = assert_singleton(&route.rules);
let route_match = assert_singleton(&rule.matches);
let path_match = route_match.path.as_ref().unwrap().kind.as_ref().unwrap();
assert_eq!(
*path_match,
grpc::http_route::path_match::Kind::Prefix("/".to_string())
);
}
#[track_caller]
fn assert_backend_matches_service(
backend: &grpc::outbound::http_route::RouteBackend,
svc: &k8s::Service,
port: u16,
) {
let backend = backend.backend.as_ref().unwrap();
let dst = match backend.kind.as_ref().unwrap() {
grpc::outbound::backend::Kind::Balancer(balance) => {
let kind = balance.discovery.as_ref().unwrap().kind.as_ref().unwrap();
match kind {
grpc::outbound::backend::endpoint_discovery::Kind::Dst(dst) => &dst.path,
}
}
grpc::outbound::backend::Kind::Forward(_) => {
panic!("default route backend must be Balancer")
}
};
assert_eq!(
*dst,
format!(
"{}.{}.svc.{}:{}",
svc.name_unchecked(),
svc.namespace().unwrap(),
"cluster.local",
port
)
);
assert_svc_meta(&backend.metadata, svc, port)
}
#[track_caller]
fn assert_singleton<T>(ts: &[T]) -> &T {
assert_eq!(ts.len(), 1);
ts.first().unwrap()
}
#[track_caller]
fn assert_route_name_eq(route: &grpc::outbound::HttpRoute, name: &str) {
let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap();
match kind {
grpc::meta::metadata::Kind::Default(d) => {
panic!("route expected to not be default, but got default {d:?}")
}
grpc::meta::metadata::Kind::Resource(resource) => assert_eq!(resource.name, *name),
}
}


@ -0,0 +1,164 @@
use futures::prelude::*;
use kube::ResourceExt;
use linkerd_policy_controller_k8s_api as k8s;
use linkerd_policy_test::{create, create_service, mk_service, outbound_api::*, with_temp_ns};
use std::collections::BTreeMap;
#[tokio::test(flavor = "current_thread")]
async fn grpc_route_retries_and_timeouts() {
with_temp_ns(|client, ns| async move {
// Create a service
let svc = create_service(&client, &ns, "my-svc", 4191).await;
let _route = create(
&client,
mk_grpc_route(&ns, "foo-route", &svc, Some(4191))
.with_annotations(
vec![
("retry.linkerd.io/grpc".to_string(), "internal".to_string()),
("timeout.linkerd.io/response".to_string(), "10s".to_string()),
]
.into_iter()
.collect(),
)
.build(),
)
.await;
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
let config = rx
.next()
.await
.expect("watch must not fail")
.expect("watch must return an initial config");
tracing::trace!(?config);
let routes = grpc_routes(&config);
let route = assert_singleton(routes);
let rule = assert_singleton(&route.rules);
let conditions = rule
.retry
.as_ref()
.expect("retry config expected")
.conditions
.as_ref()
.expect("retry conditions expected");
assert!(conditions.internal);
let timeout = rule
.timeouts
.as_ref()
.expect("timeouts expected")
.response
.as_ref()
.expect("response timeout expected");
assert_eq!(timeout.seconds, 10);
})
.await;
}
#[tokio::test(flavor = "current_thread")]
async fn service_retries_and_timeouts() {
with_temp_ns(|client, ns| async move {
// Create a service
let mut svc = mk_service(&ns, "my-svc", 4191);
svc.annotations_mut()
.insert("retry.linkerd.io/grpc".to_string(), "internal".to_string());
svc.annotations_mut()
.insert("timeout.linkerd.io/response".to_string(), "10s".to_string());
let svc = create(&client, svc).await;
let _route = create(
&client,
mk_grpc_route(&ns, "foo-route", &svc, Some(4191))
.with_annotations(
vec![
// Route annotations override the timeout config specified
// on the service.
("timeout.linkerd.io/request".to_string(), "5s".to_string()),
]
.into_iter()
.collect(),
)
.build(),
)
.await;
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
let config = rx
.next()
.await
.expect("watch must not fail")
.expect("watch must return an initial config");
tracing::trace!(?config);
let routes = grpc_routes(&config);
let route = assert_singleton(routes);
let rule = assert_singleton(&route.rules);
let conditions = rule
.retry
.as_ref()
.expect("retry config expected")
.conditions
.as_ref()
.expect("retry conditions expected");
// Retry config inherited from the service.
assert!(conditions.internal);
let timeouts = rule.timeouts.as_ref().expect("timeouts expected");
// Service timeout config overridden by route timeout config.
assert_eq!(timeouts.response, None);
let request_timeout = timeouts.request.as_ref().expect("request timeout expected");
assert_eq!(request_timeout.seconds, 5);
})
.await;
}
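// A sketch of a companion test (not part of this change) checking the same
// override semantics for gRPC: a retry annotation on the route replaces the
// service's retry conditions wholesale. The `unavailable` field name is an
// assumption, by analogy with the `internal` field asserted above.
#[tokio::test(flavor = "current_thread")]
async fn route_grpc_retries_override_service_retries() {
    with_temp_ns(|client, ns| async move {
        // The service asks for retries on `internal`...
        let mut svc = mk_service(&ns, "my-svc", 4191);
        svc.annotations_mut()
            .insert("retry.linkerd.io/grpc".to_string(), "internal".to_string());
        let svc = create(&client, svc).await;
        // ...but the route overrides that with `unavailable`.
        let _route = create(
            &client,
            mk_grpc_route(&ns, "foo-route", &svc, Some(4191))
                .with_annotations(
                    vec![(
                        "retry.linkerd.io/grpc".to_string(),
                        "unavailable".to_string(),
                    )]
                    .into_iter()
                    .collect(),
                )
                .build(),
        )
        .await;
        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
        let config = rx
            .next()
            .await
            .expect("watch must not fail")
            .expect("watch must return an initial config");
        tracing::trace!(?config);
        let routes = grpc_routes(&config);
        let route = assert_singleton(routes);
        let rule = assert_singleton(&route.rules);
        let conditions = rule
            .retry
            .as_ref()
            .expect("retry config expected")
            .conditions
            .as_ref()
            .expect("retry conditions expected");
        assert!(conditions.unavailable);
        assert!(!conditions.internal);
    })
    .await;
}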
/* Helpers */
struct GrpcRouteBuilder(k8s_gateway_api::GrpcRoute);
fn mk_grpc_route(ns: &str, name: &str, svc: &k8s::Service, port: Option<u16>) -> GrpcRouteBuilder {
GrpcRouteBuilder(k8s_gateway_api::GrpcRoute {
metadata: kube::api::ObjectMeta {
namespace: Some(ns.to_string()),
name: Some(name.to_string()),
..Default::default()
},
spec: k8s_gateway_api::GrpcRouteSpec {
inner: k8s_gateway_api::CommonRouteSpec {
parent_refs: Some(vec![k8s_gateway_api::ParentReference {
group: Some("core".to_string()),
kind: Some("Service".to_string()),
namespace: svc.namespace(),
name: svc.name_unchecked(),
section_name: None,
port,
}]),
},
hostnames: None,
rules: Some(vec![k8s_gateway_api::GrpcRouteRule {
matches: Some(vec![k8s_gateway_api::GrpcRouteMatch {
method: Some(k8s_gateway_api::GrpcMethodMatch::Exact {
method: Some("foo".to_string()),
service: Some("my-gprc-service".to_string()),
}),
headers: None,
}]),
filters: None,
backend_refs: None,
}]),
},
status: None,
})
}
impl GrpcRouteBuilder {
fn with_annotations(self, annotations: BTreeMap<String, String>) -> Self {
let mut route = self.0;
route.metadata.annotations = Some(annotations);
Self(route)
}
fn build(self) -> k8s_gateway_api::GrpcRoute {
self.0
}
}


@ -6,10 +6,9 @@ use linkerd_policy_controller_k8s_api as k8s;
use linkerd_policy_test::{
assert_default_accrual_backoff, assert_svc_meta, create, create_annotated_service,
create_cluster_scoped, create_opaque_service, create_service, delete_cluster_scoped, grpc,
mk_service, with_temp_ns,
mk_service, outbound_api::*, with_temp_ns,
};
use maplit::{btreemap, convert_args};
use tokio::time;
// These tests are copies of the tests in outbound_api_gateway.rs but using the
// policy.linkerd.io HttpRoute Kubernetes types instead of the Gateway API ones.
@ -1179,35 +1178,124 @@ async fn consumer_route() {
.await;
}
#[tokio::test(flavor = "current_thread")]
async fn http_route_retries_and_timeouts() {
with_temp_ns(|client, ns| async move {
// Create a service
let svc = create_service(&client, &ns, "my-svc", 4191).await;
let _route = create(
&client,
mk_http_route(&ns, "foo-route", &svc, Some(4191))
.with_annotations(
vec![
("retry.linkerd.io/http".to_string(), "5xx".to_string()),
("timeout.linkerd.io/response".to_string(), "10s".to_string()),
]
.into_iter()
.collect(),
)
.build(),
)
.await;
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
let config = rx
.next()
.await
.expect("watch must not fail")
.expect("watch must return an initial config");
tracing::trace!(?config);
detect_http_routes(&config, |routes| {
let route = assert_singleton(routes);
let rule = assert_singleton(&route.rules);
let conditions = rule
.retry
.as_ref()
.expect("retry config expected")
.conditions
.as_ref()
.expect("retry conditions expected");
let status_range = assert_singleton(&conditions.status_ranges);
assert_eq!(status_range.start, 500);
assert_eq!(status_range.end, 599);
let timeout = rule
.timeouts
.as_ref()
.expect("timeouts expected")
.response
.as_ref()
.expect("response timeout expected");
assert_eq!(timeout.seconds, 10);
});
})
.await;
}
#[tokio::test(flavor = "current_thread")]
async fn service_retries_and_timeouts() {
with_temp_ns(|client, ns| async move {
// Create a service
let mut svc = mk_service(&ns, "my-svc", 4191);
svc.annotations_mut()
.insert("retry.linkerd.io/http".to_string(), "5xx".to_string());
svc.annotations_mut()
.insert("timeout.linkerd.io/response".to_string(), "10s".to_string());
let svc = create(&client, svc).await;
let _route = create(
&client,
mk_http_route(&ns, "foo-route", &svc, Some(4191))
.with_annotations(
vec![
// Route annotations override the timeout config specified
// on the service.
("timeout.linkerd.io/request".to_string(), "5s".to_string()),
]
.into_iter()
.collect(),
)
.build(),
)
.await;
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
let config = rx
.next()
.await
.expect("watch must not fail")
.expect("watch must return an initial config");
tracing::trace!(?config);
detect_http_routes(&config, |routes| {
let route = assert_singleton(routes);
let rule = assert_singleton(&route.rules);
let conditions = rule
.retry
.as_ref()
.expect("retry config expected")
.conditions
.as_ref()
.expect("retry conditions expected");
let status_range = assert_singleton(&conditions.status_ranges);
// Retry config inherited from the service.
assert_eq!(status_range.start, 500);
assert_eq!(status_range.end, 599);
let timeouts = rule.timeouts.as_ref().expect("timeouts expected");
// Service timeout config overridden by route timeout config.
assert_eq!(timeouts.response, None);
let request_timeout = timeouts.request.as_ref().expect("request timeout expected");
assert_eq!(request_timeout.seconds, 5);
});
})
.await;
}
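// A sketch (not part of this change) exercising the third timeout annotation,
// `timeout.linkerd.io/idle`, on a route. The `idle` field is an assumption by
// analogy with the `request`/`response` timeouts asserted above; `seconds` and
// `nanos` follow the protobuf Duration shape those assertions already use.
#[tokio::test(flavor = "current_thread")]
async fn route_idle_timeout() {
    with_temp_ns(|client, ns| async move {
        // Create a service
        let svc = create_service(&client, &ns, "my-svc", 4191).await;
        let _route = create(
            &client,
            mk_http_route(&ns, "foo-route", &svc, Some(4191))
                .with_annotations(
                    vec![("timeout.linkerd.io/idle".to_string(), "50ms".to_string())]
                        .into_iter()
                        .collect(),
                )
                .build(),
        )
        .await;
        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
        let config = rx
            .next()
            .await
            .expect("watch must not fail")
            .expect("watch must return an initial config");
        tracing::trace!(?config);
        detect_http_routes(&config, |routes| {
            let route = assert_singleton(routes);
            let rule = assert_singleton(&route.rules);
            let idle = rule
                .timeouts
                .as_ref()
                .expect("timeouts expected")
                .idle
                .as_ref()
                .expect("idle timeout expected");
            // 50ms expressed as a protobuf Duration.
            assert_eq!(idle.seconds, 0);
            assert_eq!(idle.nanos, 50_000_000);
        });
    })
    .await;
}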
/* Helpers */
struct HttpRouteBuilder(k8s::policy::HttpRoute);
async fn retry_watch_outbound_policy(
client: &kube::Client,
ns: &str,
svc: &k8s::Service,
port: u16,
) -> tonic::Streaming<grpc::outbound::OutboundPolicy> {
// Port-forward to the control plane and start watching the service's
// outbound policy.
let mut policy_api = grpc::OutboundPolicyClient::port_forwarded(client).await;
loop {
match policy_api.watch(ns, svc, port).await {
Ok(rx) => return rx,
Err(error) => {
tracing::error!(
?error,
ns,
svc = svc.name_unchecked(),
port,
"failed to watch outbound policy"
);
time::sleep(Duration::from_secs(1)).await;
}
}
}
}
fn mk_http_route(ns: &str, name: &str, svc: &k8s::Service, port: Option<u16>) -> HttpRouteBuilder {
use k8s::policy::httproute as api;
@ -1290,6 +1378,12 @@ impl HttpRouteBuilder {
Self(route)
}
fn with_annotations(self, annotations: BTreeMap<String, String>) -> Self {
let mut route = self.0;
route.metadata.annotations = Some(annotations);
Self(route)
}
fn build(self) -> k8s::policy::HttpRoute {
self.0
}
@ -1325,210 +1419,3 @@ fn mk_empty_http_route(
status: None,
}
}
// detect_http_routes asserts that the given outbound policy has a proxy
// protocol of "Detect" and then invokes the given function with the Http1 and
// Http2 routes from the Detect protocol.
#[track_caller]
fn detect_http_routes<F>(config: &grpc::outbound::OutboundPolicy, f: F)
where
F: Fn(&[grpc::outbound::HttpRoute]),
{
let kind = config
.protocol
.as_ref()
.expect("must have proxy protocol")
.kind
.as_ref()
.expect("must have kind");
if let grpc::outbound::proxy_protocol::Kind::Detect(grpc::outbound::proxy_protocol::Detect {
opaque: _,
timeout: _,
http1,
http2,
}) = kind
{
let http1 = http1
.as_ref()
.expect("proxy protocol must have http1 field");
let http2 = http2
.as_ref()
.expect("proxy protocol must have http2 field");
f(&http1.routes);
f(&http2.routes);
} else {
panic!("proxy protocol must be Detect; actually got:\n{kind:#?}")
}
}
#[track_caller]
fn detect_failure_accrual<F>(config: &grpc::outbound::OutboundPolicy, f: F)
where
F: Fn(Option<&grpc::outbound::FailureAccrual>),
{
let kind = config
.protocol
.as_ref()
.expect("must have proxy protocol")
.kind
.as_ref()
.expect("must have kind");
if let grpc::outbound::proxy_protocol::Kind::Detect(grpc::outbound::proxy_protocol::Detect {
opaque: _,
timeout: _,
http1,
http2,
}) = kind
{
let http1 = http1
.as_ref()
.expect("proxy protocol must have http1 field");
let http2 = http2
.as_ref()
.expect("proxy protocol must have http2 field");
f(http1.failure_accrual.as_ref());
f(http2.failure_accrual.as_ref());
} else {
panic!("proxy protocol must be Detect; actually got:\n{kind:#?}")
}
}
#[track_caller]
fn failure_accrual_consecutive(
accrual: Option<&grpc::outbound::FailureAccrual>,
) -> &grpc::outbound::failure_accrual::ConsecutiveFailures {
assert!(
accrual.is_some(),
"failure accrual must be configured for service"
);
let kind = accrual
.unwrap()
.kind
.as_ref()
.expect("failure accrual must have kind");
let grpc::outbound::failure_accrual::Kind::ConsecutiveFailures(accrual) = kind;
accrual
}
#[track_caller]
fn route_backends_first_available(
route: &grpc::outbound::HttpRoute,
) -> &[grpc::outbound::http_route::RouteBackend] {
let kind = assert_singleton(&route.rules)
.backends
.as_ref()
.expect("Rule must have backends")
.kind
.as_ref()
.expect("Backend must have kind");
match kind {
grpc::outbound::http_route::distribution::Kind::FirstAvailable(fa) => &fa.backends,
_ => panic!("Distribution must be FirstAvailable"),
}
}
#[track_caller]
fn route_backends_random_available(
route: &grpc::outbound::HttpRoute,
) -> &[grpc::outbound::http_route::WeightedRouteBackend] {
let kind = assert_singleton(&route.rules)
.backends
.as_ref()
.expect("Rule must have backends")
.kind
.as_ref()
.expect("Backend must have kind");
match kind {
grpc::outbound::http_route::distribution::Kind::RandomAvailable(dist) => &dist.backends,
_ => panic!("Distribution must be RandomAvailable"),
}
}
#[track_caller]
fn route_name(route: &grpc::outbound::HttpRoute) -> &str {
match route.metadata.as_ref().unwrap().kind.as_ref().unwrap() {
grpc::meta::metadata::Kind::Resource(grpc::meta::Resource { ref name, .. }) => name,
_ => panic!("route must be a resource kind"),
}
}
#[track_caller]
fn assert_backend_has_failure_filter(backend: &grpc::outbound::http_route::WeightedRouteBackend) {
let filter = assert_singleton(&backend.backend.as_ref().unwrap().filters);
match filter.kind.as_ref().unwrap() {
grpc::outbound::http_route::filter::Kind::FailureInjector(_) => {}
_ => panic!("backend must have FailureInjector filter"),
};
}
#[track_caller]
fn assert_route_is_default(route: &grpc::outbound::HttpRoute, svc: &k8s::Service, port: u16) {
let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap();
match kind {
grpc::meta::metadata::Kind::Default(_) => {}
grpc::meta::metadata::Kind::Resource(r) => {
panic!("route expected to be default but got resource {r:?}")
}
}
let backends = route_backends_first_available(route);
let backend = assert_singleton(backends);
assert_backend_matches_service(backend, svc, port);
let rule = assert_singleton(&route.rules);
let route_match = assert_singleton(&rule.matches);
let path_match = route_match.path.as_ref().unwrap().kind.as_ref().unwrap();
assert_eq!(
*path_match,
grpc::http_route::path_match::Kind::Prefix("/".to_string())
);
}
#[track_caller]
fn assert_backend_matches_service(
backend: &grpc::outbound::http_route::RouteBackend,
svc: &k8s::Service,
port: u16,
) {
let backend = backend.backend.as_ref().unwrap();
let dst = match backend.kind.as_ref().unwrap() {
grpc::outbound::backend::Kind::Balancer(balance) => {
let kind = balance.discovery.as_ref().unwrap().kind.as_ref().unwrap();
match kind {
grpc::outbound::backend::endpoint_discovery::Kind::Dst(dst) => &dst.path,
}
}
grpc::outbound::backend::Kind::Forward(_) => {
panic!("default route backend must be Balancer")
}
};
assert_eq!(
*dst,
format!(
"{}.{}.svc.{}:{}",
svc.name_unchecked(),
svc.namespace().unwrap(),
"cluster.local",
port
)
);
assert_svc_meta(&backend.metadata, svc, port)
}
#[track_caller]
fn assert_singleton<T>(ts: &[T]) -> &T {
assert_eq!(ts.len(), 1);
ts.first().unwrap()
}
#[track_caller]
fn assert_route_name_eq(route: &grpc::outbound::HttpRoute, name: &str) {
let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap();
match kind {
grpc::meta::metadata::Kind::Default(d) => {
panic!("route expected to not be default, but got default {d:?}")
}
grpc::meta::metadata::Kind::Resource(resource) => assert_eq!(resource.name, *name),
}
}