feat(runtime): support CORES_MIN, CORES_MAX, and CORES_MAX_RATIO (#3731)

When the proxy boots up, it needs to select a number of I/O worker threads to
allocate to the runtime. This change adds a new environment variable that allows
this value to scale based on the number of CPUs available on on the host.

A CORES_MAX_RATIO value of 1.0 will allocate one worker thread per CPU core. A
lesser value will allocate fewer worker threads. Values are rounded to the
nearest whole number.

The CORES_MIN value sets a lower bound on the number of worker threads to use.
The CORES_MAX value sets an upper bound.
This commit is contained in:
Oliver Gould 2025-03-11 11:53:30 -07:00 committed by GitHub
parent 13a79167d7
commit ed42c2ad3a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 232 additions and 21 deletions

View File

@ -1305,6 +1305,7 @@ dependencies = [
"linkerd-opencensus",
"linkerd-opentelemetry",
"linkerd-tonic-stream",
"linkerd-workers",
"rangemap",
"regex",
"thiserror 2.0.12",
@ -2693,6 +2694,10 @@ dependencies = [
"tracing",
]
[[package]]
name = "linkerd-workers"
version = "0.1.0"
[[package]]
name = "linkerd2-proxy"
version = "0.1.0"

View File

@ -87,6 +87,7 @@ members = [
"linkerd/tracing",
"linkerd/transport-header",
"linkerd/transport-metrics",
"linkerd/workers",
"linkerd2-proxy",
"opencensus-proto",
"opentelemetry-proto",

View File

@ -27,6 +27,7 @@ linkerd-error = { path = "../error" }
linkerd-opencensus = { path = "../opencensus" }
linkerd-opentelemetry = { path = "../opentelemetry" }
linkerd-tonic-stream = { path = "../tonic-stream" }
linkerd-workers = { path = "../workers" }
rangemap = "1"
regex = "1"
thiserror = "2"

View File

@ -29,6 +29,7 @@ pub use linkerd_app_core::{metrics, trace, transport::BindTcp, BUILD_INFO};
use linkerd_app_gateway as gateway;
use linkerd_app_inbound::{self as inbound, Inbound};
use linkerd_app_outbound::{self as outbound, Outbound};
pub use linkerd_workers::Workers;
use std::pin::Pin;
use tokio::{
sync::mpsc,

View File

@ -0,0 +1,8 @@
[package]
name = "linkerd-workers"
version = "0.1.0"
edition = "2021"
publish = false
description = "CPU core allocation logic for Linkerd"
[dependencies]

163
linkerd/workers/src/lib.rs Normal file
View File

@ -0,0 +1,163 @@
//! Core allocation logic for Linkerd's worker threads.
use std::num::NonZeroUsize;
/// Determines the number of worker threads to use in a runtime.
#[derive(Copy, Clone, Debug)]
pub struct Workers {
pub available: NonZeroUsize,
pub max_ratio: Option<f64>,
pub max_cores: Option<NonZeroUsize>,
pub min_cores: NonZeroUsize,
}
impl Workers {
/// Calculate the number of cores to use based on the constraints.
///
/// The algorithm uses the following precedence:
/// 1. The explicitly configured maximum cores, if present
/// 2. The ratio-based calculation, if present
/// 3. Default to 1 core
///
/// The result is constrained by both the minimum cores and the available cores.
pub fn cores(&self) -> NonZeroUsize {
let Self {
available,
max_ratio,
max_cores,
min_cores,
} = *self;
max_cores
.or_else(|| {
max_ratio.and_then(|ratio| {
let max = (available.get() as f64 * ratio).round() as usize;
max.try_into().ok()
})
})
.unwrap_or_else(|| 1.try_into().unwrap())
.max(min_cores)
.min(available)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn min_cores_exceeds_max_cores() {
let workers = Workers {
available: NonZeroUsize::new(8).unwrap(),
max_cores: NonZeroUsize::new(2),
min_cores: NonZeroUsize::new(4).unwrap(),
max_ratio: None,
};
assert_eq!(workers.cores().get(), 4);
}
#[test]
fn available_limits_max_cores() {
let workers = Workers {
available: NonZeroUsize::new(2).unwrap(),
max_cores: NonZeroUsize::new(4),
min_cores: NonZeroUsize::new(1).unwrap(),
max_ratio: None,
};
assert_eq!(workers.cores().get(), 2);
}
#[test]
fn max_ratio_calculates_cores() {
let workers = Workers {
available: NonZeroUsize::new(10).unwrap(),
max_cores: None,
min_cores: NonZeroUsize::new(1).unwrap(),
max_ratio: Some(0.5),
};
assert_eq!(workers.cores().get(), 5); // 10 * 0.5 = 5
}
#[test]
fn max_cores_overrides_ratio() {
let workers = Workers {
available: NonZeroUsize::new(10).unwrap(),
max_cores: NonZeroUsize::new(3),
min_cores: NonZeroUsize::new(1).unwrap(),
max_ratio: Some(0.5),
};
assert_eq!(workers.cores().get(), 3);
}
#[test]
fn min_cores_exceeds_ratio_calculation() {
let workers = Workers {
available: NonZeroUsize::new(10).unwrap(),
max_cores: None,
min_cores: NonZeroUsize::new(6).unwrap(),
max_ratio: Some(0.5),
};
assert_eq!(workers.cores().get(), 6); // min_cores > max_cores from ratio (5)
}
#[test]
fn fallback_to_min_cores_when_no_max() {
let workers = Workers {
available: NonZeroUsize::new(8).unwrap(),
max_cores: None,
min_cores: NonZeroUsize::new(2).unwrap(),
max_ratio: None,
};
assert_eq!(workers.cores().get(), 2);
}
#[test]
fn single_cpu_environment() {
let workers = Workers {
available: NonZeroUsize::new(1).unwrap(),
max_cores: NonZeroUsize::new(4),
min_cores: NonZeroUsize::new(2).unwrap(),
max_ratio: None,
};
assert_eq!(workers.cores().get(), 1);
}
#[test]
fn ratio() {
// For 10 CPUs with 0.31 ratio, we get 3.1 cores, which rounds to 3
let workers = Workers {
available: NonZeroUsize::new(10).unwrap(),
max_cores: None,
min_cores: NonZeroUsize::new(1).unwrap(),
max_ratio: Some(0.31),
};
assert_eq!(workers.cores().get(), 3);
// For 10 CPUs with 0.35 ratio, we get 3.5 cores, which rounds to 4
let workers = Workers {
available: NonZeroUsize::new(10).unwrap(),
max_cores: None,
min_cores: NonZeroUsize::new(1).unwrap(),
max_ratio: Some(0.35),
};
assert_eq!(workers.cores().get(), 4);
// For 8 CPUs with 0.25 ratio, we get exactly 2 cores
let workers = Workers {
available: NonZeroUsize::new(8).unwrap(),
max_cores: None,
min_cores: NonZeroUsize::new(1).unwrap(),
max_ratio: Some(0.25),
};
assert_eq!(workers.cores().get(), 2);
// For 96 CPUs with 1.0 ratio, we get all 96 cores
let workers = Workers {
available: NonZeroUsize::new(96).unwrap(),
max_cores: None,
min_cores: NonZeroUsize::new(1).unwrap(),
max_ratio: Some(1.0),
};
assert_eq!(workers.cores().get(), 96);
}
}

View File

@ -1,5 +1,8 @@
use std::num::NonZeroUsize;
use linkerd_app::Workers;
use tokio::runtime::{Builder, Runtime};
use tracing::{info, warn};
use tracing::{debug, info, warn};
pub(crate) fn build() -> Runtime {
// The proxy creates an additional admin thread, but it would be wasteful to
@ -9,31 +12,60 @@ pub(crate) fn build() -> Runtime {
//
// The basic scheduler is used when the threaded scheduler would provide no
// benefit.
let mut cores = std::env::var("LINKERD2_PROXY_CORES")
let min_cores = std::env::var("LINKERD2_PROXY_CORES_MIN")
.ok()
.and_then(|v| {
let opt = v.parse::<usize>().ok().filter(|n| *n > 0);
let opt = v.parse::<usize>().ok().and_then(NonZeroUsize::new);
if opt.is_none() {
warn!(LINKERD2_PROXY_CORES = %v, "Ignoring invalid configuration");
warn!(LINKERD2_PROXY_CORES_MIN = %v, "Ignoring invalid configuration");
}
opt
})
.unwrap_or(0);
.or_else(|| {
std::env::var("LINKERD2_PROXY_CORES").ok().and_then(|v| {
let opt = v.parse::<usize>().ok().and_then(NonZeroUsize::new);
if opt.is_none() {
warn!(LINKERD2_PROXY_CORES = %v, "Ignoring invalid configuration");
}
opt
})
})
.unwrap_or_else(|| NonZeroUsize::new(1).unwrap());
let cpus = num_cpus::get();
debug_assert!(cpus > 0, "At least one CPU must be available");
if cores > cpus {
warn!(
cpus,
LINKERD2_PROXY_CORES = cores,
"Ignoring configuration due to insufficient resources"
);
cores = cpus;
}
let max_cores = std::env::var("LINKERD2_PROXY_CORES_MAX")
.ok()
.and_then(|v| {
let opt = v.parse::<usize>().ok().and_then(NonZeroUsize::new);
if opt.is_none() {
warn!(LINKERD2_PROXY_CORES_MAX = %v, "Ignoring invalid configuration");
}
opt
});
match cores {
// `0` is unexpected, but it's a wild world out there.
0 | 1 => {
let cores_ratio = std::env::var("LINKERD2_PROXY_CORES_MAX_RATIO")
.ok()
.and_then(|v| {
let opt = v.parse::<f64>().ok().filter(|n| *n > 0.0 && *n <= 1.0);
if opt.is_none() {
warn!(LINKERD2_PROXY_CORES_MAX_RATIO = %v, "Ignoring invalid configuration");
}
opt
});
let available_cpus = num_cpus::get();
debug_assert!(available_cpus > 0, "At least one CPU must be available");
let workers = Workers {
available: NonZeroUsize::new(available_cpus)
.unwrap_or_else(|| NonZeroUsize::new(1).unwrap()),
max_ratio: cores_ratio,
min_cores,
max_cores,
};
debug!(?workers);
match workers.cores().get() {
1 => {
info!("Using single-threaded proxy runtime");
Builder::new_current_thread()
.enable_all()
@ -41,13 +73,13 @@ pub(crate) fn build() -> Runtime {
.build()
.expect("failed to build basic runtime!")
}
num_cpus => {
cores => {
info!(%cores, "Using multi-threaded proxy runtime");
Builder::new_multi_thread()
.enable_all()
.thread_name("proxy")
.worker_threads(num_cpus)
.max_blocking_threads(num_cpus)
.worker_threads(cores)
.max_blocking_threads(cores)
.build()
.expect("failed to build threaded runtime!")
}