From ed42c2ad3adcacac19bebfbef6ddc2a0954c64c4 Mon Sep 17 00:00:00 2001
From: Oliver Gould
Date: Tue, 11 Mar 2025 11:53:30 -0700
Subject: [PATCH] feat(runtime): support CORES_MIN, CORES_MAX, and CORES_MAX_RATIO (#3731)

When the proxy boots up, it needs to select a number of I/O worker
threads to allocate to the runtime. This change adds new environment
variables that allow this value to scale based on the number of CPUs
available on the host.

A CORES_MAX_RATIO value of 1.0 will allocate one worker thread per CPU
core. A lesser value will allocate fewer worker threads. Values are
rounded to the nearest whole number. The CORES_MIN value sets a lower
bound on the number of worker threads to use. The CORES_MAX value sets
an upper bound.
---
 Cargo.lock                 |   5 ++
 Cargo.toml                 |   1 +
 linkerd/app/Cargo.toml     |   1 +
 linkerd/app/src/lib.rs     |   1 +
 linkerd/workers/Cargo.toml |   8 ++
 linkerd/workers/src/lib.rs | 163 +++++++++++++++++++++++++++++++++++++
 linkerd2-proxy/src/rt.rs   |  74 ++++++++++++-----
 7 files changed, 232 insertions(+), 21 deletions(-)
 create mode 100644 linkerd/workers/Cargo.toml
 create mode 100644 linkerd/workers/src/lib.rs

diff --git a/Cargo.lock b/Cargo.lock
index ac6b719d3..cfd733f20 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1305,6 +1305,7 @@ dependencies = [
  "linkerd-opencensus",
  "linkerd-opentelemetry",
  "linkerd-tonic-stream",
+ "linkerd-workers",
  "rangemap",
  "regex",
  "thiserror 2.0.12",
@@ -2693,6 +2694,10 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "linkerd-workers"
+version = "0.1.0"
+
 [[package]]
 name = "linkerd2-proxy"
 version = "0.1.0"
diff --git a/Cargo.toml b/Cargo.toml
index ecdf16e63..2fbefe347 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -87,6 +87,7 @@ members = [
     "linkerd/tracing",
     "linkerd/transport-header",
     "linkerd/transport-metrics",
+    "linkerd/workers",
     "linkerd2-proxy",
     "opencensus-proto",
     "opentelemetry-proto",
diff --git a/linkerd/app/Cargo.toml b/linkerd/app/Cargo.toml
index bb87dc174..5431fe2d6 100644
--- a/linkerd/app/Cargo.toml
+++ b/linkerd/app/Cargo.toml
@@ -27,6 +27,7 @@ linkerd-error = { path = "../error" }
 linkerd-opencensus = { path = "../opencensus" }
 linkerd-opentelemetry = { path = "../opentelemetry" }
 linkerd-tonic-stream = { path = "../tonic-stream" }
+linkerd-workers = { path = "../workers" }
 rangemap = "1"
 regex = "1"
 thiserror = "2"
diff --git a/linkerd/app/src/lib.rs b/linkerd/app/src/lib.rs
index 2e8cc25c4..20cece663 100644
--- a/linkerd/app/src/lib.rs
+++ b/linkerd/app/src/lib.rs
@@ -29,6 +29,7 @@ pub use linkerd_app_core::{metrics, trace, transport::BindTcp, BUILD_INFO};
 use linkerd_app_gateway as gateway;
 use linkerd_app_inbound::{self as inbound, Inbound};
 use linkerd_app_outbound::{self as outbound, Outbound};
+pub use linkerd_workers::Workers;
 use std::pin::Pin;
 use tokio::{
     sync::mpsc,
diff --git a/linkerd/workers/Cargo.toml b/linkerd/workers/Cargo.toml
new file mode 100644
index 000000000..409a432b0
--- /dev/null
+++ b/linkerd/workers/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "linkerd-workers"
+version = "0.1.0"
+edition = "2021"
+publish = false
+description = "CPU core allocation logic for Linkerd"
+
+[dependencies]
diff --git a/linkerd/workers/src/lib.rs b/linkerd/workers/src/lib.rs
new file mode 100644
index 000000000..f8ebc482b
--- /dev/null
+++ b/linkerd/workers/src/lib.rs
@@ -0,0 +1,163 @@
+//! Core allocation logic for Linkerd's worker threads.
+
+use std::num::NonZeroUsize;
+
+/// Determines the number of worker threads to use in a runtime.
+#[derive(Copy, Clone, Debug)]
+pub struct Workers {
+    pub available: NonZeroUsize,
+    pub max_ratio: Option<f64>,
+    pub max_cores: Option<NonZeroUsize>,
+    pub min_cores: NonZeroUsize,
+}
+
+impl Workers {
+    /// Calculate the number of cores to use based on the constraints.
+    ///
+    /// The algorithm uses the following precedence:
+    /// 1. The explicitly configured maximum cores, if present
+    /// 2. The ratio-based calculation, if present
+    /// 3. Default to 1 core
+    ///
+    /// The result is constrained by both the minimum cores and the available cores.
+    pub fn cores(&self) -> NonZeroUsize {
+        let Self {
+            available,
+            max_ratio,
+            max_cores,
+            min_cores,
+        } = *self;
+
+        max_cores
+            .or_else(|| {
+                max_ratio.and_then(|ratio| {
+                    let max = (available.get() as f64 * ratio).round() as usize;
+                    max.try_into().ok()
+                })
+            })
+            .unwrap_or_else(|| 1.try_into().unwrap())
+            .max(min_cores)
+            .min(available)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn min_cores_exceeds_max_cores() {
+        let workers = Workers {
+            available: NonZeroUsize::new(8).unwrap(),
+            max_cores: NonZeroUsize::new(2),
+            min_cores: NonZeroUsize::new(4).unwrap(),
+            max_ratio: None,
+        };
+        assert_eq!(workers.cores().get(), 4);
+    }
+
+    #[test]
+    fn available_limits_max_cores() {
+        let workers = Workers {
+            available: NonZeroUsize::new(2).unwrap(),
+            max_cores: NonZeroUsize::new(4),
+            min_cores: NonZeroUsize::new(1).unwrap(),
+            max_ratio: None,
+        };
+        assert_eq!(workers.cores().get(), 2);
+    }
+
+    #[test]
+    fn max_ratio_calculates_cores() {
+        let workers = Workers {
+            available: NonZeroUsize::new(10).unwrap(),
+            max_cores: None,
+            min_cores: NonZeroUsize::new(1).unwrap(),
+            max_ratio: Some(0.5),
+        };
+        assert_eq!(workers.cores().get(), 5); // 10 * 0.5 = 5
+    }
+
+    #[test]
+    fn max_cores_overrides_ratio() {
+        let workers = Workers {
+            available: NonZeroUsize::new(10).unwrap(),
+            max_cores: NonZeroUsize::new(3),
+            min_cores: NonZeroUsize::new(1).unwrap(),
+            max_ratio: Some(0.5),
+        };
+        assert_eq!(workers.cores().get(), 3);
+    }
+
+    #[test]
+    fn min_cores_exceeds_ratio_calculation() {
+        let workers = Workers {
+            available: NonZeroUsize::new(10).unwrap(),
+            max_cores: None,
+            min_cores: NonZeroUsize::new(6).unwrap(),
+            max_ratio: Some(0.5),
+        };
+        assert_eq!(workers.cores().get(), 6); // min_cores > max_cores from ratio (5)
+    }
+
+    #[test]
+    fn fallback_to_min_cores_when_no_max() {
+        let workers = Workers {
+            available: NonZeroUsize::new(8).unwrap(),
+            max_cores: None,
+            min_cores: NonZeroUsize::new(2).unwrap(),
+            max_ratio: None,
+        };
+        assert_eq!(workers.cores().get(), 2);
+    }
+
+    #[test]
+    fn single_cpu_environment() {
+        let workers = Workers {
+            available: NonZeroUsize::new(1).unwrap(),
+            max_cores: NonZeroUsize::new(4),
+            min_cores: NonZeroUsize::new(2).unwrap(),
+            max_ratio: None,
+        };
+        assert_eq!(workers.cores().get(), 1);
+    }
+
+    #[test]
+    fn ratio() {
+        // For 10 CPUs with 0.31 ratio, we get 3.1 cores, which rounds to 3
+        let workers = Workers {
+            available: NonZeroUsize::new(10).unwrap(),
+            max_cores: None,
+            min_cores: NonZeroUsize::new(1).unwrap(),
+            max_ratio: Some(0.31),
+        };
+        assert_eq!(workers.cores().get(), 3);
+
+        // For 10 CPUs with 0.35 ratio, we get 3.5 cores, which rounds to 4
+        let workers = Workers {
+            available: NonZeroUsize::new(10).unwrap(),
+            max_cores: None,
+            min_cores: NonZeroUsize::new(1).unwrap(),
+            max_ratio: Some(0.35),
+        };
+        assert_eq!(workers.cores().get(), 4);
+
+        // For 8 CPUs with 0.25 ratio, we get exactly 2 cores
+        let workers = Workers {
+            available: NonZeroUsize::new(8).unwrap(),
+            max_cores: None,
+            min_cores: NonZeroUsize::new(1).unwrap(),
+            max_ratio: Some(0.25),
+        };
+        assert_eq!(workers.cores().get(), 2);
+
+        // For 96 CPUs with 1.0 ratio, we get all 96 cores
+        let workers = Workers {
+            available: NonZeroUsize::new(96).unwrap(),
+            max_cores: None,
+            min_cores: NonZeroUsize::new(1).unwrap(),
+            max_ratio: Some(1.0),
+        };
+        assert_eq!(workers.cores().get(), 96);
+    }
+}
diff --git a/linkerd2-proxy/src/rt.rs b/linkerd2-proxy/src/rt.rs
index a45909673..e374c6481 100644
--- a/linkerd2-proxy/src/rt.rs
+++ b/linkerd2-proxy/src/rt.rs
@@ -1,5 +1,8 @@
+use std::num::NonZeroUsize;
+
+use linkerd_app::Workers;
 use tokio::runtime::{Builder, Runtime};
-use tracing::{info, warn};
+use tracing::{debug, info, warn};
 
 pub(crate) fn build() -> Runtime {
     // The proxy creates an additional admin thread, but it would be wasteful to
@@ -9,31 +12,60 @@ pub(crate) fn build() -> Runtime {
     //
     // The basic scheduler is used when the threaded scheduler would provide no
     // benefit.
-    let mut cores = std::env::var("LINKERD2_PROXY_CORES")
+
+    let min_cores = std::env::var("LINKERD2_PROXY_CORES_MIN")
         .ok()
         .and_then(|v| {
-            let opt = v.parse::<usize>().ok().filter(|n| *n > 0);
+            let opt = v.parse::<usize>().ok().and_then(NonZeroUsize::new);
             if opt.is_none() {
-                warn!(LINKERD2_PROXY_CORES = %v, "Ignoring invalid configuration");
+                warn!(LINKERD2_PROXY_CORES_MIN = %v, "Ignoring invalid configuration");
             }
             opt
         })
-        .unwrap_or(0);
+        .or_else(|| {
+            std::env::var("LINKERD2_PROXY_CORES").ok().and_then(|v| {
+                let opt = v.parse::<usize>().ok().and_then(NonZeroUsize::new);
+                if opt.is_none() {
+                    warn!(LINKERD2_PROXY_CORES = %v, "Ignoring invalid configuration");
+                }
+                opt
+            })
+        })
+        .unwrap_or_else(|| NonZeroUsize::new(1).unwrap());
 
-    let cpus = num_cpus::get();
-    debug_assert!(cpus > 0, "At least one CPU must be available");
-    if cores > cpus {
-        warn!(
-            cpus,
-            LINKERD2_PROXY_CORES = cores,
-            "Ignoring configuration due to insufficient resources"
-        );
-        cores = cpus;
-    }
+    let max_cores = std::env::var("LINKERD2_PROXY_CORES_MAX")
+        .ok()
+        .and_then(|v| {
+            let opt = v.parse::<usize>().ok().and_then(NonZeroUsize::new);
+            if opt.is_none() {
+                warn!(LINKERD2_PROXY_CORES_MAX = %v, "Ignoring invalid configuration");
+            }
+            opt
+        });
 
-    match cores {
-        // `0` is unexpected, but it's a wild world out there.
-        0 | 1 => {
+    let cores_ratio = std::env::var("LINKERD2_PROXY_CORES_MAX_RATIO")
+        .ok()
+        .and_then(|v| {
+            let opt = v.parse::<f64>().ok().filter(|n| *n > 0.0 && *n <= 1.0);
+            if opt.is_none() {
+                warn!(LINKERD2_PROXY_CORES_MAX_RATIO = %v, "Ignoring invalid configuration");
+            }
+            opt
+        });
+
+    let available_cpus = num_cpus::get();
+    debug_assert!(available_cpus > 0, "At least one CPU must be available");
+    let workers = Workers {
+        available: NonZeroUsize::new(available_cpus)
+            .unwrap_or_else(|| NonZeroUsize::new(1).unwrap()),
+        max_ratio: cores_ratio,
+        min_cores,
+        max_cores,
+    };
+    debug!(?workers);
+
+    match workers.cores().get() {
+        1 => {
             info!("Using single-threaded proxy runtime");
             Builder::new_current_thread()
                 .enable_all()
@@ -41,13 +73,13 @@
                 .build()
                 .expect("failed to build basic runtime!")
         }
-        num_cpus => {
+        cores => {
             info!(%cores, "Using multi-threaded proxy runtime");
             Builder::new_multi_thread()
                 .enable_all()
                 .thread_name("proxy")
-                .worker_threads(num_cpus)
-                .max_blocking_threads(num_cpus)
+                .worker_threads(cores)
+                .max_blocking_threads(cores)
                 .build()
                 .expect("failed to build threaded runtime!")
         }
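
Reviewer note (not part of the patch): a minimal sketch of how the three settings combine, using the `Workers` type added above. The concrete values here are hypothetical; in the proxy they are derived from the LINKERD2_PROXY_CORES_MIN, LINKERD2_PROXY_CORES_MAX, and LINKERD2_PROXY_CORES_MAX_RATIO environment variables in `rt.rs`.

```rust
use std::num::NonZeroUsize;

use linkerd_workers::Workers;

fn main() {
    // Hypothetical host with 16 CPUs, CORES_MAX_RATIO=0.5, CORES_MIN=2, and no
    // CORES_MAX set: the ratio yields 16 * 0.5 = 8 (rounded to the nearest whole
    // number), which is then clamped to the [min_cores, available] range.
    let workers = Workers {
        available: NonZeroUsize::new(16).unwrap(),
        max_ratio: Some(0.5),
        max_cores: None, // an explicit CORES_MAX would take precedence over the ratio
        min_cores: NonZeroUsize::new(2).unwrap(),
    };
    assert_eq!(workers.cores().get(), 8);
}
```

As the doc comment on `cores()` states, an explicit CORES_MAX always wins over the ratio, and the result never exceeds the number of CPUs reported by the host.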