Add rate-limiters to ServerPolicy

This adds the local_rate_limit module to the server-policy crate, that
`ServerPolicy` uses for its new `local_rate_limit` field, containing
three optional rate-limiters: total, identity, overrides (this one is
really a vector of limiters, one per configured override).

I tried putting that under `Protocol` instead, but the `PartialEq`
requirement made it very hard to follow. `Server` OTOH doesn't really
require that trait, so I was able to remove it and accommodate the
limiters.

I made sure to avoid pulling the dashmap dependency in `governor`; I
haven't checked yet the necessity of the "jitter" and "quanta" features.

This temporarily overrides linkerd2-proxy-api dependency to pick changes
from linkerd/linkerd2-proxy-api#388
This commit is contained in:
Alejandro Pedraza 2024-10-29 12:38:54 -05:00
parent c2687744a0
commit b580e657e5
No known key found for this signature in database
15 changed files with 220 additions and 7 deletions

View File

@ -379,6 +379,12 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80"
[[package]]
name = "crypto-common"
version = "0.1.6"
@ -682,6 +688,12 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
[[package]]
name = "futures-timer"
version = "3.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24"
[[package]]
name = "futures-util"
version = "0.3.31"
@ -733,6 +745,25 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "governor"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68a7f542ee6b35af73b06abc0dad1c1bae89964e4e253bc4b587b91c9637867b"
dependencies = [
"cfg-if",
"futures",
"futures-timer",
"no-std-compat",
"nonzero_ext",
"parking_lot",
"portable-atomic",
"quanta",
"rand",
"smallvec",
"spinning_top",
]
[[package]]
name = "gzip-header"
version = "1.0.0"
@ -2012,6 +2043,7 @@ dependencies = [
name = "linkerd-proxy-server-policy"
version = "0.1.0"
dependencies = [
"governor",
"http",
"ipnet",
"linkerd-http-route",
@ -2385,8 +2417,7 @@ dependencies = [
[[package]]
name = "linkerd2-proxy-api"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26c72fb98d969e1e94e95d52a6fcdf4693764702c369e577934256e72fb5bc61"
source = "git+https://github.com/linkerd/linkerd2-proxy-api.git?branch=alpeb/v0.14.0-rate-limiting#903aeafd8c2e60790b5de0de7aee28f31964fce2"
dependencies = [
"h2",
"http",
@ -2530,6 +2561,12 @@ dependencies = [
"libc",
]
[[package]]
name = "no-std-compat"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c"
[[package]]
name = "nom"
version = "7.1.3"
@ -2540,6 +2577,12 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "nonzero_ext"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
@ -2764,6 +2807,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "portable-atomic"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc9c68a3f6da06753e9335d63e27f6b9754dd1920d941135b7ea8224f141adb2"
[[package]]
name = "powerfmt"
version = "0.2.0"
@ -2920,6 +2969,21 @@ dependencies = [
"prost",
]
[[package]]
name = "quanta"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5"
dependencies = [
"crossbeam-utils",
"libc",
"once_cell",
"raw-cpuid",
"wasi",
"web-sys",
"winapi",
]
[[package]]
name = "quick-error"
version = "1.2.3"
@ -2980,6 +3044,15 @@ version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f60fcc7d6849342eff22c4350c8b9a989ee8ceabc4b481253e8946b9fe83d684"
[[package]]
name = "raw-cpuid"
version = "11.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ab240315c661615f2ee9f0f2cd32d5a7343a84d5ebcccb99d46e6637565e7b0"
dependencies = [
"bitflags 2.4.2",
]
[[package]]
name = "rcgen"
version = "0.12.1"
@ -3290,6 +3363,15 @@ version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
[[package]]
name = "spinning_top"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300"
dependencies = [
"lock_api",
]
[[package]]
name = "stable_deref_trait"
version = "1.2.0"
@ -3863,6 +3945,16 @@ version = "0.2.93"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c62a0a307cb4a311d3a07867860911ca130c3494e8c2719593806c08bc5d0484"
[[package]]
name = "web-sys"
version = "0.3.70"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26fdeaafd9bd129f65e7c031593c24d62186301e0c72c8978fa1678be7d532c0"
dependencies = [
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "widestring"
version = "1.1.0"

View File

@ -90,3 +90,6 @@ lto = true
[workspace.dependencies]
linkerd2-proxy-api = "0.14.0"
[patch.crates-io]
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api.git", branch = "alpeb/v0.14.0-rate-limiting" }

View File

@ -129,6 +129,7 @@ async fn upgraded_request_remains_relative_form() {
}),
}]))]),
},
local_rate_limit: Arc::new(Default::default()),
};
let (policy, tx) = inbound::policy::AllowPolicy::for_test(self.param(), policy);
tokio::spawn(async move {

View File

@ -138,6 +138,7 @@ mod tests {
kind: "server".into(),
name: "testsrv".into(),
}),
local_rate_limit: Default::default(),
},
None,
);

View File

@ -35,6 +35,7 @@ fn allow(protocol: Protocol) -> AllowPolicy {
kind: "server".into(),
name: "testsrv".into(),
}),
local_rate_limit: Arc::new(Default::default()),
},
);
allow

View File

@ -748,6 +748,7 @@ impl svc::Param<policy::AllowPolicy> for Target {
kind: "server".into(),
name: "testsrv".into(),
}),
local_rate_limit: Default::default(),
},
);
policy

View File

@ -44,7 +44,7 @@ pub trait GetPolicy {
fn get_policy(&self, dst: OrigDstAddr) -> AllowPolicy;
}
#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Debug)]
pub enum DefaultPolicy {
Allow(ServerPolicy),
Deny,
@ -90,6 +90,7 @@ impl From<DefaultPolicy> for ServerPolicy {
DefaultPolicy::Allow(p) => p,
DefaultPolicy::Deny => ServerPolicy {
protocol: Protocol::Opaque(Arc::new([])),
local_rate_limit: Default::default(),
meta: Meta::new_default("deny"),
},
}

View File

@ -88,5 +88,6 @@ fn mk(
ServerPolicy {
meta: Meta::new_default(name),
protocol,
local_rate_limit: Default::default(),
}
}

View File

@ -29,6 +29,7 @@ macro_rules! new_svc {
kind: "Server".into(),
name: "testsrv".into(),
}),
local_rate_limit: Arc::new(Default::default()),
},
);
let svc = HttpPolicyService {
@ -197,6 +198,7 @@ async fn http_route() {
},
],
}])),
local_rate_limit: Arc::new(Default::default()),
})
.expect("must send");

View File

@ -26,6 +26,7 @@ async fn unauthenticated_allowed() {
kind: "server".into(),
name: "test".into(),
}),
local_rate_limit: Arc::new(Default::default()),
};
let tls = tls::ConditionalServerTls::None(tls::NoServerTls::NoClientHello);
@ -75,6 +76,7 @@ async fn authenticated_identity() {
kind: "server".into(),
name: "test".into(),
}),
local_rate_limit: Arc::new(Default::default()),
};
let tls = tls::ConditionalServerTls::Some(tls::ServerTls::Established {
@ -138,6 +140,7 @@ async fn authenticated_suffix() {
kind: "server".into(),
name: "test".into(),
}),
local_rate_limit: Arc::new(Default::default()),
};
let tls = tls::ConditionalServerTls::Some(tls::ServerTls::Established {
@ -197,6 +200,7 @@ async fn tls_unauthenticated() {
kind: "server".into(),
name: "test".into(),
}),
local_rate_limit: Arc::new(Default::default()),
};
let tls = tls::ConditionalServerTls::Some(tls::ServerTls::Established {

View File

@ -46,6 +46,7 @@ pub fn default_config() -> Config {
kind: "server".into(),
name: "testsrv".into(),
}),
local_rate_limit: Arc::new(Default::default()),
}
.into(),
ports: Default::default(),

View File

@ -45,6 +45,7 @@ pub fn all_unauthenticated() -> inbound::Server {
inbound::proxy_protocol::Detect {
timeout: Some(Duration::from_secs(10).try_into().unwrap()),
http_routes: vec![],
http_local_rate_limit: None,
},
)),
}),

View File

@ -10,6 +10,7 @@ publish = false
proto = ["linkerd-http-route/proto", "linkerd2-proxy-api", "prost-types"]
[dependencies]
governor = { version = "0.6", default-features = false, features = ["std", "jitter", "quanta"] }
ipnet = "2"
http = "0.2"
prost-types = { version = "0.12", optional = true }

View File

@ -1,11 +1,13 @@
#![deny(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)]
#![forbid(unsafe_code)]
use local_rate_limit::HttpLocalRateLimit;
use std::{hash::Hash, sync::Arc, time};
pub mod authz;
pub mod grpc;
pub mod http;
pub mod local_rate_limit;
pub mod meta;
pub use self::{
@ -14,10 +16,11 @@ pub use self::{
};
pub use linkerd_http_route as route;
#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Debug)]
pub struct ServerPolicy {
pub protocol: Protocol,
pub meta: Arc<Meta>,
pub local_rate_limit: Arc<HttpLocalRateLimit>,
}
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
@ -65,6 +68,7 @@ impl ServerPolicy {
}]),
tcp_authorizations: Arc::new([]),
},
local_rate_limit: Arc::new(HttpLocalRateLimit::default()),
}
}
}
@ -131,6 +135,28 @@ pub mod proto {
server_ips: _,
} = proto;
let local_rate_limit = {
match protocol
.clone()
.and_then(|api::ProxyProtocol { kind }| kind)
.ok_or(InvalidServer::MissingProxyProtocol)?
{
api::proxy_protocol::Kind::Detect(api::proxy_protocol::Detect {
http_local_rate_limit,
..
}) => http_local_rate_limit.unwrap_or_default().into(),
api::proxy_protocol::Kind::Http1(api::proxy_protocol::Http1 {
local_rate_limit,
..
})
| api::proxy_protocol::Kind::Http2(api::proxy_protocol::Http2 {
local_rate_limit,
..
}) => local_rate_limit.unwrap_or_default().into(),
_ => Default::default(),
}
};
let authorizations = {
// Always permit traffic from localhost.
let localhost = Authorization {
@ -154,6 +180,7 @@ pub mod proto {
api::proxy_protocol::Kind::Detect(api::proxy_protocol::Detect {
http_routes,
timeout,
..
}) => Protocol::Detect {
http: mk_routes!(http, http_routes, authorizations.clone())?,
timeout: timeout
@ -162,11 +189,11 @@ pub mod proto {
tcp_authorizations: authorizations,
},
api::proxy_protocol::Kind::Http1(api::proxy_protocol::Http1 { routes }) => {
api::proxy_protocol::Kind::Http1(api::proxy_protocol::Http1 { routes, .. }) => {
Protocol::Http1(mk_routes!(http, routes, authorizations)?)
}
api::proxy_protocol::Kind::Http2(api::proxy_protocol::Http2 { routes }) => {
api::proxy_protocol::Kind::Http2(api::proxy_protocol::Http2 { routes, .. }) => {
Protocol::Http2(mk_routes!(http, routes, authorizations)?)
}
@ -182,7 +209,11 @@ pub mod proto {
// avoid label inference.
let meta = Meta::try_new_with_default(labels, "policy.linkerd.io", "server")?;
Ok(ServerPolicy { protocol, meta })
Ok(ServerPolicy {
protocol,
meta,
local_rate_limit: Arc::new(local_rate_limit),
})
}
}
}

View File

@ -0,0 +1,72 @@
use governor::{
clock::DefaultClock,
state::{keyed::DefaultKeyedStateStore, InMemoryState, NotKeyed, RateLimiter},
Quota,
};
use std::num::NonZeroU32;
#[derive(Debug, Default)]
pub struct HttpLocalRateLimit {
pub total: Option<RateLimiter<NotKeyed, InMemoryState, DefaultClock>>,
pub identity: Option<RateLimiter<String, DefaultKeyedStateStore<String>, DefaultClock>>,
pub overrides: Vec<HttpLocalRateLimitOverride>,
}
#[derive(Debug)]
pub struct HttpLocalRateLimitOverride {
pub ids: Vec<String>,
pub rate_limit: RateLimiter<Vec<String>, DefaultKeyedStateStore<Vec<String>>, DefaultClock>,
}
impl Default for HttpLocalRateLimitOverride {
fn default() -> Self {
Self {
ids: vec![],
rate_limit: RateLimiter::keyed(Quota::per_second(NonZeroU32::new(1).unwrap())),
}
}
}
#[cfg(feature = "proto")]
pub mod proto {
use super::*;
use linkerd2_proxy_api::inbound::{self as api};
impl From<api::HttpLocalRateLimit> for HttpLocalRateLimit {
fn from(proto: api::HttpLocalRateLimit) -> Self {
let total = proto.total.map(|lim| {
let quota = Quota::per_second(NonZeroU32::new(lim.requests_per_second).unwrap());
RateLimiter::direct(quota)
});
let identity = proto.identity.map(|lim| {
let quota = Quota::per_second(NonZeroU32::new(lim.requests_per_second).unwrap());
RateLimiter::keyed(quota)
});
let overrides = proto
.overrides
.into_iter()
.flat_map(|ovr| {
ovr.limit.map(|lim| {
let ids = ovr
.clients
.into_iter()
.flat_map(|cl| cl.identities.into_iter().map(|id| id.name))
.collect();
let quota =
Quota::per_second(NonZeroU32::new(lim.requests_per_second).unwrap());
let rate_limit = RateLimiter::keyed(quota);
HttpLocalRateLimitOverride { ids, rate_limit }
})
})
.collect();
Self {
total,
identity,
overrides,
}
}
}
}