From 4e1c3a2a692a4105c1e1c20747124c0a4fc7d865 Mon Sep 17 00:00:00 2001 From: Zhang Tianyang Date: Wed, 2 Mar 2022 19:07:40 +0800 Subject: [PATCH] runc-shim: support shim cgroup and oom score Signed-off-by: Zhang Tianyang --- .github/workflows/ci.yml | 8 +- crates/runc-shim/Cargo.toml | 2 +- crates/runc-shim/src/cgroup.rs | 262 +++++++++++++++++++++++++++++++ crates/runc-shim/src/main.rs | 1 + crates/runc-shim/src/runc.rs | 149 +----------------- crates/runc-shim/src/service.rs | 11 +- crates/shim/examples/skeleton.rs | 2 +- crates/shim/src/lib.rs | 8 +- 8 files changed, 288 insertions(+), 155 deletions(-) create mode 100644 crates/runc-shim/src/cgroup.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e4915be..b4174b0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -32,10 +32,12 @@ jobs: - env: # runc::tests::test_exec needs $XDG_RUNTIME_DIR to be set XDG_RUNTIME_DIR: /tmp/dummy-xdr - run: | + run: + # runc-shim::cgroup::test_add_cgroup needs root permission to set cgroup + | mkdir -p /tmp/dummy-xdr - cargo test - cargo test --all-features + sudo -E $(command -v cargo) test + sudo -E $(command -v cargo) test --all-features deny: name: Deny diff --git a/crates/runc-shim/Cargo.toml b/crates/runc-shim/Cargo.toml index 1d95c90..52f430d 100644 --- a/crates/runc-shim/Cargo.toml +++ b/crates/runc-shim/Cargo.toml @@ -26,4 +26,4 @@ containerd-shim = { path = "../shim", version = "0.2.0" } runc = { path = "../runc", version = "0.1.0" } [target.'cfg(target_os = "linux")'.dependencies] -cgroups-rs = "0.2.8" +cgroups-rs = "0.2.9" diff --git a/crates/runc-shim/src/cgroup.rs b/crates/runc-shim/src/cgroup.rs new file mode 100644 index 0000000..2060104 --- /dev/null +++ b/crates/runc-shim/src/cgroup.rs @@ -0,0 +1,262 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#![cfg(target_os = "linux")] + +use std::fs; +use std::io::Read; +use std::path::Path; + +use containerd_shim as shim; + +use cgroups_rs::cgroup::get_cgroups_relative_paths_by_pid; +use cgroups_rs::{hierarchies, Cgroup, CgroupPid, MaxValue, Subsystem}; +use containerd_shim::api::Options; +use containerd_shim::protos::cgroups::metrics::{ + CPUStat, CPUUsage, MemoryEntry, MemoryStat, Metrics, +}; +use containerd_shim::protos::protobuf::well_known_types::Any; +use containerd_shim::protos::protobuf::Message; +use oci_spec::runtime::LinuxResources; +use shim::error::{Error, Result}; +use shim::{io_error, other_error}; + +// OOM_SCORE_ADJ_MAX is from https://github.com/torvalds/linux/blob/master/include/uapi/linux/oom.h#L10 +const OOM_SCORE_ADJ_MAX: i64 = 1000; + +pub fn set_cgroup_and_oom_score(pid: u32) -> Result<()> { + if pid == 0 { + return Ok(()); + } + + // set cgroup + let mut data: Vec = Vec::new(); + std::io::stdin() + .read_to_end(&mut data) + .map_err(io_error!(e, "read stdin"))?; + + if !data.is_empty() { + let opts = Any::parse_from_bytes(&data) + .and_then(|any| Options::parse_from_bytes(any.get_value()))?; + + if !opts.shim_cgroup.is_empty() { + add_task_to_cgroup(opts.shim_cgroup.as_str(), pid)?; + } + } + + // set oom score + adjust_oom_score(pid) +} + +/// Add a process to the given relative cgroup path +pub fn add_task_to_cgroup(path: &str, pid: u32) -> Result<()> { + let h = hierarchies::auto(); + // use relative path here, need to trim prefix '/' + let path = path.trim_start_matches('/'); + + Cgroup::load(h, path) + .add_task(CgroupPid::from(pid as u64)) + .map_err(other_error!(e, "add task to cgroup")) +} + +/// Sets the OOM score for the process to the parents OOM score + 1 +/// to ensure that they parent has a lower score than the shim +pub fn adjust_oom_score(pid: u32) -> Result<()> { + let score = read_process_oom_score(std::os::unix::process::parent_id())?; + if score < OOM_SCORE_ADJ_MAX { + write_process_oom_score(pid, score + 1)?; + } + Ok(()) +} + +fn read_process_oom_score(pid: u32) -> Result { + let content = fs::read_to_string(format!("/proc/{}/oom_score_adj", pid)) + .map_err(io_error!(e, "read oom score"))?; + let score = content + .trim() + .parse::() + .map_err(other_error!(e, "parse oom score"))?; + Ok(score) +} + +fn write_process_oom_score(pid: u32, score: i64) -> Result<()> { + fs::write(format!("/proc/{}/oom_score_adj", pid), score.to_string()) + .map_err(io_error!(e, "write oom score")) +} + +/// Collect process cgroup stats, return only necessary parts of it +pub fn collect_metrics(pid: u32) -> Result { + let mut metrics = Metrics::new(); + // get container main process cgroup + let path = + get_cgroups_relative_paths_by_pid(pid).map_err(other_error!(e, "get process cgroup"))?; + let cgroup = Cgroup::load_with_relative_paths(hierarchies::auto(), Path::new("."), path); + + // to make it easy, fill the necessary metrics only. + for sub_system in Cgroup::subsystems(&cgroup) { + match sub_system { + Subsystem::CpuAcct(cpuacct_ctr) => { + let mut cpu_usage = CPUUsage::new(); + cpu_usage.set_total(cpuacct_ctr.cpuacct().usage); + let mut cpu_stat = CPUStat::new(); + cpu_stat.set_usage(cpu_usage); + metrics.set_cpu(cpu_stat); + } + Subsystem::Mem(mem_ctr) => { + let mem = mem_ctr.memory_stat(); + let mut mem_entry = MemoryEntry::new(); + mem_entry.set_usage(mem.usage_in_bytes); + let mut mem_stat = MemoryStat::new(); + mem_stat.set_usage(mem_entry); + mem_stat.set_total_inactive_file(mem.stat.total_inactive_file); + metrics.set_memory(mem_stat); + } + _ => {} + } + } + Ok(metrics) +} + +/// Update process cgroup limits +pub fn update_metrics(pid: u32, resources: &LinuxResources) -> Result<()> { + // get container main process cgroup + let path = + get_cgroups_relative_paths_by_pid(pid).map_err(other_error!(e, "get process cgroup"))?; + let cgroup = Cgroup::load_with_relative_paths(hierarchies::auto(), Path::new("."), path); + + for sub_system in Cgroup::subsystems(&cgroup) { + match sub_system { + Subsystem::Pid(pid_ctr) => { + // set maximum number of PIDs + if let Some(pids) = resources.pids() { + pid_ctr + .set_pid_max(MaxValue::Value(pids.limit())) + .map_err(other_error!(e, "set pid max"))?; + } + } + Subsystem::Mem(mem_ctr) => { + if let Some(memory) = resources.memory() { + // set memory limit in bytes + if let Some(limit) = memory.limit() { + mem_ctr + .set_limit(limit) + .map_err(other_error!(e, "set mem limit"))?; + } + + // set memory swap limit in bytes + if let Some(swap) = memory.swap() { + mem_ctr + .set_memswap_limit(swap) + .map_err(other_error!(e, "set memsw limit"))?; + } + } + } + Subsystem::CpuSet(cpuset_ctr) => { + if let Some(cpu) = resources.cpu() { + // set CPUs to use within the cpuset + if let Some(cpus) = cpu.cpus() { + cpuset_ctr + .set_cpus(cpus) + .map_err(other_error!(e, "set CPU sets"))?; + } + + // set list of memory nodes in the cpuset + if let Some(mems) = cpu.mems() { + cpuset_ctr + .set_mems(mems) + .map_err(other_error!(e, "set CPU memes"))?; + } + } + } + Subsystem::Cpu(cpu_ctr) => { + if let Some(cpu) = resources.cpu() { + // set CPU shares + if let Some(shares) = cpu.shares() { + cpu_ctr + .set_shares(shares) + .map_err(other_error!(e, "set CPU share"))?; + } + + // set CPU hardcap limit + if let Some(quota) = cpu.quota() { + cpu_ctr + .set_cfs_quota(quota) + .map_err(other_error!(e, "set CPU quota"))?; + } + + // set CPU hardcap period + if let Some(period) = cpu.period() { + cpu_ctr + .set_cfs_period(period) + .map_err(other_error!(e, "set CPU period"))?; + } + } + } + Subsystem::HugeTlb(ht_ctr) => { + // set the limit if "pagesize" hugetlb usage + if let Some(hp_limits) = resources.hugepage_limits() { + for limit in hp_limits { + ht_ctr + .set_limit_in_bytes(limit.page_size().as_str(), limit.limit() as u64) + .map_err(other_error!(e, "set huge page limit"))?; + } + } + } + _ => {} + } + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use crate::cgroup::{ + add_task_to_cgroup, adjust_oom_score, read_process_oom_score, OOM_SCORE_ADJ_MAX, + }; + use cgroups_rs::{hierarchies, Cgroup, CgroupPid}; + + #[test] + fn test_add_cgroup() { + let path = "runc_shim_test_cgroup"; + let h = hierarchies::auto(); + + // create cgroup path first + let cg = Cgroup::new(h, path); + + let pid = std::process::id(); + add_task_to_cgroup(path, pid).unwrap(); + let cg_id = CgroupPid::from(pid as u64); + assert!(cg.tasks().contains(&cg_id)); + + // remove cgroup as possible + cg.remove_task(cg_id); + cg.delete().unwrap() + } + + #[test] + fn test_adjust_oom_score() { + let pid = std::process::id(); + let score = read_process_oom_score(pid).unwrap(); + + adjust_oom_score(pid).unwrap(); + let new = read_process_oom_score(pid).unwrap(); + if score < OOM_SCORE_ADJ_MAX { + assert_eq!(new, score + 1) + } else { + assert_eq!(new, OOM_SCORE_ADJ_MAX) + } + } +} diff --git a/crates/runc-shim/src/main.rs b/crates/runc-shim/src/main.rs index fd1d03a..0ca9926 100644 --- a/crates/runc-shim/src/main.rs +++ b/crates/runc-shim/src/main.rs @@ -14,6 +14,7 @@ limitations under the License. */ +mod cgroup; mod container; mod io; mod runc; diff --git a/crates/runc-shim/src/runc.rs b/crates/runc-shim/src/runc.rs index 33d28a4..837e704 100644 --- a/crates/runc-shim/src/runc.rs +++ b/crates/runc-shim/src/runc.rs @@ -22,21 +22,18 @@ use std::sync::mpsc::{Receiver, SyncSender}; use containerd_shim as shim; -#[cfg(target_os = "linux")] -use cgroups_rs::{cgroup, hierarchies, Cgroup, MaxValue, Subsystem}; use nix::sys::signal::kill; use nix::sys::stat::Mode; use nix::unistd::{mkdir, Pid}; -use oci_spec::runtime::{Linux, LinuxNamespaceType, LinuxResources, Spec}; +use oci_spec::runtime::{LinuxNamespaceType, LinuxResources}; use runc::console::{Console, ConsoleSocket}; -use runc::options::{CreateOpts, DeleteOpts, ExecOpts, GlobalOpts, KillOpts}; +use runc::options::GlobalOpts; use runc::utils::new_temp_console_socket; use shim::api::*; use shim::error::{Error, Result}; -use shim::io_error; use shim::mount::mount_rootfs; use shim::protos::api::ProcessInfo; -use shim::protos::cgroups::metrics::{CPUStat, CPUUsage, MemoryEntry, MemoryStat, Metrics}; +use shim::protos::cgroups::metrics::Metrics; use shim::protos::protobuf::well_known_types::{Any, Timestamp}; use shim::protos::protobuf::{CodedInputStream, Message, RepeatedField}; use shim::protos::shim::oci::ProcessDetails; @@ -335,34 +332,8 @@ impl Container for RuncContainer { #[cfg(target_os = "linux")] fn stats(&self) -> Result { - let mut metrics = Metrics::new(); - // get container main process cgroup - let path = get_cgroups_relative_paths_by_pid(self.common.init.pid() as u32)?; - let cgroup = Cgroup::load_with_relative_paths(hierarchies::auto(), Path::new("."), path); - - // to make it easy, fill the necessary metrics only. - for sub_system in Cgroup::subsystems(&cgroup) { - match sub_system { - Subsystem::CpuAcct(cpuacct_ctr) => { - let mut cpu_usage = CPUUsage::new(); - cpu_usage.set_total(cpuacct_ctr.cpuacct().usage); - let mut cpu_stat = CPUStat::new(); - cpu_stat.set_usage(cpu_usage); - metrics.set_cpu(cpu_stat); - } - Subsystem::Mem(mem_ctr) => { - let mem = mem_ctr.memory_stat(); - let mut mem_entry = MemoryEntry::new(); - mem_entry.set_usage(mem.usage_in_bytes); - let mut mem_stat = MemoryStat::new(); - mem_stat.set_usage(mem_entry); - mem_stat.set_total_inactive_file(mem.stat.total_inactive_file); - metrics.set_memory(mem_stat); - } - _ => {} - } - } - Ok(metrics) + let pid = self.common.init.pid() as u32; + crate::cgroup::collect_metrics(pid) } #[cfg(not(target_os = "linux"))] @@ -372,95 +343,8 @@ impl Container for RuncContainer { #[cfg(target_os = "linux")] fn update(&mut self, resources: &LinuxResources) -> Result<()> { - // get container main process cgroup - let path = get_cgroups_relative_paths_by_pid(self.common.init.pid() as u32)?; - let cgroup = Cgroup::load_with_relative_paths(hierarchies::auto(), Path::new("."), path); - - for sub_system in Cgroup::subsystems(&cgroup) { - match sub_system { - Subsystem::Pid(pid_ctr) => { - // set maximum number of PIDs - if let Some(pids) = resources.pids() { - pid_ctr - .set_pid_max(MaxValue::Value(pids.limit())) - .map_err(other_error!(e, "set pid max"))?; - } - } - Subsystem::Mem(mem_ctr) => { - if let Some(memory) = resources.memory() { - // set memory limit in bytes - if let Some(limit) = memory.limit() { - mem_ctr - .set_limit(limit) - .map_err(other_error!(e, "set mem limit"))?; - } - - // set memory swap limit in bytes - if let Some(swap) = memory.swap() { - mem_ctr - .set_memswap_limit(swap) - .map_err(other_error!(e, "set memsw limit"))?; - } - } - } - Subsystem::CpuSet(cpuset_ctr) => { - if let Some(cpu) = resources.cpu() { - // set CPUs to use within the cpuset - if let Some(cpus) = cpu.cpus() { - cpuset_ctr - .set_cpus(cpus) - .map_err(other_error!(e, "set CPU sets"))?; - } - - // set list of memory nodes in the cpuset - if let Some(mems) = cpu.mems() { - cpuset_ctr - .set_mems(mems) - .map_err(other_error!(e, "set CPU memes"))?; - } - } - } - Subsystem::Cpu(cpu_ctr) => { - if let Some(cpu) = resources.cpu() { - // set CPU shares - if let Some(shares) = cpu.shares() { - cpu_ctr - .set_shares(shares) - .map_err(other_error!(e, "set CPU share"))?; - } - - // set CPU hardcap limit - if let Some(quota) = cpu.quota() { - cpu_ctr - .set_cfs_quota(quota) - .map_err(other_error!(e, "set CPU quota"))?; - } - - // set CPU hardcap period - if let Some(period) = cpu.period() { - cpu_ctr - .set_cfs_period(period) - .map_err(other_error!(e, "set CPU period"))?; - } - } - } - Subsystem::HugeTlb(ht_ctr) => { - // set the limit if "pagesize" hugetlb usage - if let Some(hp_limits) = resources.hugepage_limits() { - for limit in hp_limits { - ht_ctr - .set_limit_in_bytes( - limit.page_size().as_str(), - limit.limit() as u64, - ) - .map_err(other_error!(e, "set huge page limit"))?; - } - } - } - _ => {} - } - } - Ok(()) + let pid = self.common.init.pid() as u32; + crate::cgroup::update_metrics(pid, resources) } #[cfg(not(target_os = "linux"))] @@ -842,22 +726,3 @@ fn check_kill_error(emsg: String) -> Error { other!(emsg, "unknown error after kill") } } - -#[cfg(target_os = "linux")] -fn get_cgroups_relative_paths_by_pid(pid: u32) -> Result> { - let path = format!("/proc/{}/cgroup", pid); - let mut m = HashMap::new(); - let content = std::fs::read_to_string(path).map_err(io_error!(e, "read process cgroup"))?; - for l in content.lines() { - let fl: Vec<&str> = l.split(':').collect(); - if fl.len() != 3 { - continue; - } - - let keys: Vec<&str> = fl[1].split(',').collect(); - for key in &keys { - m.insert(key.to_string(), fl[2].to_string()); - } - } - Ok(m) -} diff --git a/crates/runc-shim/src/service.rs b/crates/runc-shim/src/service.rs index 89ee63a..d83ae82 100644 --- a/crates/runc-shim/src/service.rs +++ b/crates/runc-shim/src/service.rs @@ -18,10 +18,11 @@ use std::env::current_dir; use std::path::Path; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use containerd_shim as shim; +use runc::options::{DeleteOpts, GlobalOpts, DEFAULT_COMMAND}; use shim::api::*; use shim::error::{Error, Result}; use shim::monitor::{monitor_subscribe, Subject, Subscription, Topic}; @@ -30,8 +31,6 @@ use shim::util::{get_timestamp, read_options, read_runtime, read_spec_from_file, use shim::{debug, error, io_error, other_error, warn}; use shim::{spawn, Config, ExitSignal, RemotePublisher, Shim, StartOpts}; -use runc::options::{DeleteOpts, GlobalOpts, DEFAULT_COMMAND}; - use crate::container::{Container, Process}; use crate::runc::{RuncContainer, RuncFactory, DEFAULT_RUNC_ROOT}; use crate::task::ShimTask; @@ -81,7 +80,11 @@ impl Shim for Service { None => {} } - let address = spawn(opts, &grouping, Vec::new())?; + let (child_id, address) = spawn(opts, &grouping, Vec::new())?; + + #[cfg(target_os = "linux")] + crate::cgroup::set_cgroup_and_oom_score(child_id)?; + write_address(&address)?; Ok(address) } diff --git a/crates/shim/examples/skeleton.rs b/crates/shim/examples/skeleton.rs index 87fb5d2..25243d8 100644 --- a/crates/shim/examples/skeleton.rs +++ b/crates/shim/examples/skeleton.rs @@ -43,7 +43,7 @@ impl shim::Shim for Service { fn start_shim(&mut self, opts: shim::StartOpts) -> Result { let grouping = opts.id.clone(); - let address = shim::spawn(opts, &grouping, Vec::new())?; + let (_child_id, address) = shim::spawn(opts, &grouping, Vec::new())?; Ok(address) } diff --git a/crates/shim/src/lib.rs b/crates/shim/src/lib.rs index 80589c7..8f805ab 100644 --- a/crates/shim/src/lib.rs +++ b/crates/shim/src/lib.rs @@ -393,7 +393,7 @@ fn remove_socket(address: &str) -> Result<()> { /// Spawn is a helper func to launch shim process. /// Typically this expected to be called from `StartShim`. -pub fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Result { +pub fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Result<(u32, String)> { let cmd = env::current_exe().map_err(io_error!(e, ""))?; let cwd = env::current_dir().map_err(io_error!(e, ""))?; let address = socket_address(&opts.address, &opts.namespace, grouping); @@ -411,7 +411,7 @@ pub fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Result }; if let Ok(()) = wait_socket_working(&address, 5, 200) { write_address(&address)?; - return Ok(address); + return Ok((0, address)); } remove_socket(&address)?; start_listener(&address).map_err(io_error!(e, ""))? @@ -445,10 +445,10 @@ pub fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Result command .spawn() .map_err(io_error!(e, "spawn shim")) - .map(|_| { + .map(|child| { // Ownership of `listener` has been passed to child. std::mem::forget(listener); - address + (child.id(), address) }) }