fix(cgroup) : fix cgroup update if cgroup path is release

avoid update cgroup path if init process cgroup is release

Signed-off-by: jokemanfire <hu.dingyang@zte.com.cn>
This commit is contained in:
jokemanfire 2025-08-21 15:57:01 +08:00 committed by Maksym Pavlenko
parent 2055c40893
commit 45387cd5e6
2 changed files with 49 additions and 33 deletions

View File

@ -14,6 +14,8 @@
limitations under the License. limitations under the License.
*/ */
#[cfg(target_os = "linux")]
use std::sync::RwLock;
use std::{ use std::{
convert::TryFrom, convert::TryFrom,
os::{ os::{
@ -29,6 +31,8 @@ use std::{
}; };
use async_trait::async_trait; use async_trait::async_trait;
#[cfg(target_os = "linux")]
use cgroups_rs::fs::Cgroup;
use containerd_shim::{ use containerd_shim::{
api::{CreateTaskRequest, ExecProcessRequest, Options, Status}, api::{CreateTaskRequest, ExecProcessRequest, Options, Status},
asynchronous::monitor::{monitor_subscribe, monitor_unsubscribe, Subscription}, asynchronous::monitor::{monitor_subscribe, monitor_unsubscribe, Subscription},
@ -66,19 +70,6 @@ use crate::{
io::Stdio, io::Stdio,
}; };
/// check the process is zombie
#[cfg(target_os = "linux")]
fn is_zombie_process(pid: i32) -> bool {
if let Ok(status) = std::fs::read_to_string(format!("/proc/{}/status", pid)) {
for line in status.lines() {
if line.starts_with("State:") && line.contains('Z') {
return true;
}
}
}
false
}
pub type ExecProcess = ProcessTemplate<RuncExecLifecycle>; pub type ExecProcess = ProcessTemplate<RuncExecLifecycle>;
pub type InitProcess = ProcessTemplate<RuncInitLifecycle>; pub type InitProcess = ProcessTemplate<RuncInitLifecycle>;
@ -139,6 +130,12 @@ impl ContainerFactory<RuncContainer> for RuncFactory {
let config = CreateConfig::default(); let config = CreateConfig::default();
self.do_create(&mut init, config).await?; self.do_create(&mut init, config).await?;
#[cfg(target_os = "linux")]
{
*init.lifecycle.cgroup_cache.write().unwrap() =
containerd_shim::cgroup::get_cgroup(init.pid as u32).ok();
}
let container = RuncContainer { let container = RuncContainer {
id: id.to_string(), id: id.to_string(),
bundle: bundle.to_string(), bundle: bundle.to_string(),
@ -151,6 +148,7 @@ impl ContainerFactory<RuncContainer> for RuncFactory {
}, },
processes: Default::default(), processes: Default::default(),
}; };
Ok(container) Ok(container)
} }
@ -270,6 +268,9 @@ pub struct RuncInitLifecycle {
opts: Options, opts: Options,
bundle: String, bundle: String,
exit_signal: Arc<ExitSignal>, exit_signal: Arc<ExitSignal>,
/// Cache for cgroup paths to avoid repeated /proc/<pid>/cgroup parsing
#[cfg(target_os = "linux")]
cgroup_cache: RwLock<Option<Cgroup>>,
} }
#[async_trait] #[async_trait]
@ -327,15 +328,18 @@ impl ProcessLifecycle<InitProcess> for RuncInitLifecycle {
)); ));
} }
// check the process is zombie // Check if cgroup still exists before attempting update
if is_zombie_process(p.pid) { if !self.ensure_init_cgroup_exists().await {
return Err(other!( return Err(other!(
"failed to update resources because process {} is a zombie", "failed to update resources because cgroup for process {} has been released",
p.pid p.pid
)); ));
} }
let cgroup_guard = p.lifecycle.cgroup_cache.read().unwrap();
containerd_shim::cgroup::update_resources(p.pid as u32, resources) let cgroup = cgroup_guard
.as_ref()
.ok_or_else(|| other!("cgroup cache is empty for process {}", p.pid))?;
containerd_shim::cgroup::update_resources(cgroup, resources)
} }
#[cfg(not(target_os = "linux"))] #[cfg(not(target_os = "linux"))]
@ -352,15 +356,18 @@ impl ProcessLifecycle<InitProcess> for RuncInitLifecycle {
)); ));
} }
// check the process is zombie // Check if cgroup still exists before attempting to collect stats
if is_zombie_process(p.pid) { if !self.ensure_init_cgroup_exists().await {
return Err(other!( return Err(other!(
"failed to collect metrics because process {} is a zombie", "failed to collect metrics because cgroup for process {} has been released",
p.pid p.pid
)); ));
} }
let cgroup_guard = p.lifecycle.cgroup_cache.read().unwrap();
containerd_shim::cgroup::collect_metrics(p.pid as u32) let cgroup = cgroup_guard
.as_ref()
.ok_or_else(|| other!("cgroup cache is empty for process {}", p.pid))?;
containerd_shim::cgroup::collect_metrics(cgroup)
} }
#[cfg(not(target_os = "linux"))] #[cfg(not(target_os = "linux"))]
@ -436,6 +443,20 @@ impl RuncInitLifecycle {
opts, opts,
bundle: bundle.to_string(), bundle: bundle.to_string(),
exit_signal: Default::default(), exit_signal: Default::default(),
#[cfg(target_os = "linux")]
cgroup_cache: RwLock::new(None),
}
}
/// Ensure cgroup exists and cache the path information
/// Returns true if cgroup exists, false if released
#[cfg(target_os = "linux")]
async fn ensure_init_cgroup_exists(&self) -> bool {
let cache = self.cgroup_cache.read().unwrap();
if let Some(ref cached) = *cache {
cached.exists()
} else {
false
} }
} }
} }

View File

@ -109,13 +109,11 @@ fn write_process_oom_score(pid: u32, score: i64) -> Result<()> {
/// Collect process cgroup stats, return only necessary parts of it /// Collect process cgroup stats, return only necessary parts of it
#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))] #[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))]
pub fn collect_metrics(pid: u32) -> Result<Metrics> { pub fn collect_metrics(cgroup: &Cgroup) -> Result<Metrics> {
let mut metrics = Metrics::new(); let mut metrics = Metrics::new();
let cgroup = get_cgroup(pid)?;
// to make it easy, fill the necessary metrics only. // to make it easy, fill the necessary metrics only.
for sub_system in Cgroup::subsystems(&cgroup) { for sub_system in Cgroup::subsystems(cgroup) {
match sub_system { match sub_system {
Subsystem::Cpu(cpu_ctr) => { Subsystem::Cpu(cpu_ctr) => {
let mut cpu_usage = CPUUsage::new(); let mut cpu_usage = CPUUsage::new();
@ -204,7 +202,7 @@ pub fn collect_metrics(pid: u32) -> Result<Metrics> {
// get_cgroup will return either cgroup v1 or v2 depending on system configuration // get_cgroup will return either cgroup v1 or v2 depending on system configuration
#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))] #[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))]
fn get_cgroup(pid: u32) -> Result<Cgroup> { pub fn get_cgroup(pid: u32) -> Result<Cgroup> {
let hierarchies = hierarchies::auto(); let hierarchies = hierarchies::auto();
let cgroup = if hierarchies.v2() { let cgroup = if hierarchies.v2() {
let path = get_cgroups_v2_path_by_pid(pid)?; let path = get_cgroups_v2_path_by_pid(pid)?;
@ -250,11 +248,8 @@ fn parse_cgroups_v2_path(content: &str) -> Result<PathBuf> {
/// Update process cgroup limits /// Update process cgroup limits
#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))] #[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))]
pub fn update_resources(pid: u32, resources: &LinuxResources) -> Result<()> { pub fn update_resources(cgroup: &Cgroup, resources: &LinuxResources) -> Result<()> {
// get container main process cgroup for sub_system in Cgroup::subsystems(cgroup) {
let cgroup = get_cgroup(pid)?;
for sub_system in Cgroup::subsystems(&cgroup) {
match sub_system { match sub_system {
Subsystem::Pid(pid_ctr) => { Subsystem::Pid(pid_ctr) => {
// set maximum number of PIDs // set maximum number of PIDs