Merge pull request #42 from Burning1020/dev
runc-shim: implement stats and update task
This commit is contained in:
commit
f7335a88a9
|
|
@ -25,3 +25,5 @@ crossbeam = "0.8.1"
|
|||
containerd-shim = { path = "../shim", version = "0.2.0" }
|
||||
runc = { path = "../runc", version = "0.1.0" }
|
||||
|
||||
[target.'cfg(target_os = "linux")'.dependencies]
|
||||
cgroups-rs = "0.2.8"
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ use nix::sys::socket::{recvmsg, ControlMessageOwned, MsgFlags};
|
|||
use nix::sys::termios::tcgetattr;
|
||||
use nix::sys::uio::IoVec;
|
||||
use nix::{cmsg_space, ioctl_write_ptr_bad};
|
||||
use oci_spec::runtime::LinuxResources;
|
||||
use runc::console::{Console, ConsoleSocket};
|
||||
use time::OffsetDateTime;
|
||||
|
||||
|
|
@ -33,6 +34,7 @@ use containerd_shim as shim;
|
|||
|
||||
use shim::api::*;
|
||||
use shim::error::{Error, Result};
|
||||
use shim::protos::cgroups::metrics::Metrics;
|
||||
use shim::protos::protobuf::well_known_types::Timestamp;
|
||||
use shim::util::read_pid_from_file;
|
||||
use shim::{io_error, other, other_error};
|
||||
|
|
@ -80,6 +82,8 @@ pub trait Container {
|
|||
fn exec(&mut self, req: ExecProcessRequest) -> Result<()>;
|
||||
fn resize_pty(&mut self, exec_id: Option<&str>, height: u32, width: u32) -> Result<()>;
|
||||
fn pid(&self) -> i32;
|
||||
fn stats(&self) -> Result<Metrics>;
|
||||
fn update(&mut self, resources: &LinuxResources) -> Result<()>;
|
||||
}
|
||||
|
||||
pub struct CommonContainer<T, E> {
|
||||
|
|
|
|||
|
|
@ -15,22 +15,27 @@
|
|||
*/
|
||||
#![allow(unused)]
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::convert::TryFrom;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::mpsc::{Receiver, SyncSender};
|
||||
|
||||
use containerd_shim as shim;
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
use cgroups_rs::{cgroup, hierarchies, Cgroup, MaxValue, Subsystem};
|
||||
use nix::sys::signal::kill;
|
||||
use nix::sys::stat::Mode;
|
||||
use nix::unistd::{mkdir, Pid};
|
||||
use oci_spec::runtime::LinuxNamespaceType;
|
||||
use oci_spec::runtime::{Linux, LinuxNamespaceType, LinuxResources, Spec};
|
||||
use runc::console::{Console, ConsoleSocket};
|
||||
use runc::options::{CreateOpts, DeleteOpts, ExecOpts, GlobalOpts, KillOpts};
|
||||
use runc::utils::new_temp_console_socket;
|
||||
use shim::api::*;
|
||||
use shim::error::{Error, Result};
|
||||
use shim::io_error;
|
||||
use shim::mount::mount_rootfs;
|
||||
use shim::protos::cgroups::metrics::{CPUStat, CPUUsage, MemoryEntry, MemoryStat, Metrics};
|
||||
use shim::protos::protobuf::{well_known_types::Timestamp, CodedInputStream, Message};
|
||||
use shim::util::{read_spec_from_file, write_options, write_runtime, IntoOption};
|
||||
use shim::{debug, error, other, other_error};
|
||||
|
|
@ -323,6 +328,141 @@ impl Container for RuncContainer {
|
|||
fn pid(&self) -> i32 {
|
||||
self.common.init.pid()
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
fn stats(&self) -> Result<Metrics> {
|
||||
let mut metrics = Metrics::new();
|
||||
// get container main process cgroup
|
||||
let path = get_cgroups_relative_paths_by_pid(self.common.init.pid() as u32)?;
|
||||
let cgroup = Cgroup::load_with_relative_paths(hierarchies::auto(), Path::new("."), path);
|
||||
|
||||
// to make it easy, fill the necessary metrics only.
|
||||
for sub_system in Cgroup::subsystems(&cgroup) {
|
||||
match sub_system {
|
||||
Subsystem::CpuAcct(cpuacct_ctr) => {
|
||||
let mut cpu_usage = CPUUsage::new();
|
||||
cpu_usage.set_total(cpuacct_ctr.cpuacct().usage);
|
||||
let mut cpu_stat = CPUStat::new();
|
||||
cpu_stat.set_usage(cpu_usage);
|
||||
metrics.set_cpu(cpu_stat);
|
||||
}
|
||||
Subsystem::Mem(mem_ctr) => {
|
||||
let mem = mem_ctr.memory_stat();
|
||||
let mut mem_entry = MemoryEntry::new();
|
||||
mem_entry.set_usage(mem.usage_in_bytes);
|
||||
let mut mem_stat = MemoryStat::new();
|
||||
mem_stat.set_usage(mem_entry);
|
||||
mem_stat.set_total_inactive_file(mem.stat.total_inactive_file);
|
||||
metrics.set_memory(mem_stat);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
Ok(metrics)
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
fn stats(&self) -> Result<Metrics> {
|
||||
Err(Error::Unimplemented("stats".to_string()))
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
fn update(&mut self, resources: &LinuxResources) -> Result<()> {
|
||||
// get container main process cgroup
|
||||
let path = get_cgroups_relative_paths_by_pid(self.common.init.pid() as u32)?;
|
||||
let cgroup = Cgroup::load_with_relative_paths(hierarchies::auto(), Path::new("."), path);
|
||||
|
||||
for sub_system in Cgroup::subsystems(&cgroup) {
|
||||
match sub_system {
|
||||
Subsystem::Pid(pid_ctr) => {
|
||||
// set maximum number of PIDs
|
||||
if let Some(pids) = resources.pids() {
|
||||
pid_ctr
|
||||
.set_pid_max(MaxValue::Value(pids.limit()))
|
||||
.map_err(other_error!(e, "set pid max"))?;
|
||||
}
|
||||
}
|
||||
Subsystem::Mem(mem_ctr) => {
|
||||
if let Some(memory) = resources.memory() {
|
||||
// set memory limit in bytes
|
||||
if let Some(limit) = memory.limit() {
|
||||
mem_ctr
|
||||
.set_limit(limit)
|
||||
.map_err(other_error!(e, "set mem limit"))?;
|
||||
}
|
||||
|
||||
// set memory swap limit in bytes
|
||||
if let Some(swap) = memory.swap() {
|
||||
mem_ctr
|
||||
.set_memswap_limit(swap)
|
||||
.map_err(other_error!(e, "set memsw limit"))?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Subsystem::CpuSet(cpuset_ctr) => {
|
||||
if let Some(cpu) = resources.cpu() {
|
||||
// set CPUs to use within the cpuset
|
||||
if let Some(cpus) = cpu.cpus() {
|
||||
cpuset_ctr
|
||||
.set_cpus(cpus)
|
||||
.map_err(other_error!(e, "set CPU sets"))?;
|
||||
}
|
||||
|
||||
// set list of memory nodes in the cpuset
|
||||
if let Some(mems) = cpu.mems() {
|
||||
cpuset_ctr
|
||||
.set_mems(mems)
|
||||
.map_err(other_error!(e, "set CPU memes"))?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Subsystem::Cpu(cpu_ctr) => {
|
||||
if let Some(cpu) = resources.cpu() {
|
||||
// set CPU shares
|
||||
if let Some(shares) = cpu.shares() {
|
||||
cpu_ctr
|
||||
.set_shares(shares)
|
||||
.map_err(other_error!(e, "set CPU share"))?;
|
||||
}
|
||||
|
||||
// set CPU hardcap limit
|
||||
if let Some(quota) = cpu.quota() {
|
||||
cpu_ctr
|
||||
.set_cfs_quota(quota)
|
||||
.map_err(other_error!(e, "set CPU quota"))?;
|
||||
}
|
||||
|
||||
// set CPU hardcap period
|
||||
if let Some(period) = cpu.period() {
|
||||
cpu_ctr
|
||||
.set_cfs_period(period)
|
||||
.map_err(other_error!(e, "set CPU period"))?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Subsystem::HugeTlb(ht_ctr) => {
|
||||
// set the limit if "pagesize" hugetlb usage
|
||||
if let Some(hp_limits) = resources.hugepage_limits() {
|
||||
for limit in hp_limits {
|
||||
ht_ctr
|
||||
.set_limit_in_bytes(
|
||||
limit.page_size().as_str(),
|
||||
limit.limit() as u64,
|
||||
)
|
||||
.map_err(other_error!(e, "set huge page limit"))?;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
fn update(&mut self, resources: &LinuxResources) -> Result<()> {
|
||||
Err(Error::Unimplemented("update".to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
impl RuncContainer {
|
||||
|
|
@ -651,3 +791,22 @@ fn check_kill_error(emsg: String) -> Error {
|
|||
other!(emsg, "unknown error after kill")
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
fn get_cgroups_relative_paths_by_pid(pid: u32) -> Result<HashMap<String, String>> {
|
||||
let path = format!("/proc/{}/cgroup", pid);
|
||||
let mut m = HashMap::new();
|
||||
let content = std::fs::read_to_string(path).map_err(io_error!(e, "read process cgroup"))?;
|
||||
for l in content.lines() {
|
||||
let fl: Vec<&str> = l.split(':').collect();
|
||||
if fl.len() != 3 {
|
||||
continue;
|
||||
}
|
||||
|
||||
let keys: Vec<&str> = fl[1].split(',').collect();
|
||||
for key in &keys {
|
||||
m.insert(key.to_string(), fl[2].to_string());
|
||||
}
|
||||
}
|
||||
Ok(m)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,11 +18,13 @@ use std::collections::HashMap;
|
|||
use std::sync::{Arc, Mutex, Once};
|
||||
|
||||
use log::{debug, info};
|
||||
use oci_spec::runtime::LinuxResources;
|
||||
|
||||
use containerd_shim as shim;
|
||||
|
||||
use shim::protos::protobuf::well_known_types::Timestamp;
|
||||
use shim::protos::protobuf::SingularPtrField;
|
||||
use shim::other_error;
|
||||
use shim::protos::protobuf::well_known_types::{Any, Timestamp};
|
||||
use shim::protos::protobuf::{Message, SingularPtrField};
|
||||
use shim::util::IntoOption;
|
||||
use shim::Error;
|
||||
use shim::Task;
|
||||
|
|
@ -146,7 +148,11 @@ where
|
|||
}
|
||||
|
||||
fn exec(&self, _ctx: &TtrpcContext, req: ExecProcessRequest) -> TtrpcResult<Empty> {
|
||||
info!("Exec request for {:?}", req);
|
||||
info!(
|
||||
"Exec request for id: {} exec_id: {}",
|
||||
req.get_id(),
|
||||
req.get_exec_id()
|
||||
);
|
||||
let mut containers = self.containers.lock().unwrap();
|
||||
let container = containers.get_mut(req.get_id()).ok_or_else(|| {
|
||||
Error::Other(format!("can not find container by id {}", req.get_id()))
|
||||
|
|
@ -172,6 +178,19 @@ where
|
|||
Ok(Empty::new())
|
||||
}
|
||||
|
||||
fn update(&self, _ctx: &TtrpcContext, req: UpdateTaskRequest) -> TtrpcResult<Empty> {
|
||||
debug!("Update request for {:?}", req);
|
||||
let mut containers = self.containers.lock().unwrap();
|
||||
let container = containers.get_mut(req.get_id()).ok_or_else(|| {
|
||||
Error::Other(format!("can not find container by id {}", req.get_id()))
|
||||
})?;
|
||||
|
||||
let resources: LinuxResources = serde_json::from_slice(req.get_resources().get_value())
|
||||
.map_err(other_error!(e, "failed to parse spec"))?;
|
||||
container.update(&resources)?;
|
||||
Ok(Empty::new())
|
||||
}
|
||||
|
||||
fn wait(&self, _ctx: &TtrpcContext, req: WaitRequest) -> TtrpcResult<WaitResponse> {
|
||||
info!("Wait request for {:?}", req);
|
||||
let mut containers = self.containers.lock().unwrap();
|
||||
|
|
@ -211,6 +230,28 @@ where
|
|||
Ok(resp)
|
||||
}
|
||||
|
||||
fn stats(&self, _ctx: &TtrpcContext, req: StatsRequest) -> TtrpcResult<StatsResponse> {
|
||||
debug!("Stats request for {:?}", req);
|
||||
let mut containers = self.containers.lock().unwrap();
|
||||
let container = containers.get_mut(req.get_id()).ok_or_else(|| {
|
||||
Error::Other(format!("can not find container by id {}", req.get_id()))
|
||||
})?;
|
||||
let stats = container.stats()?;
|
||||
|
||||
// marshal to ttrpc Any
|
||||
let mut any = Any::new();
|
||||
let mut data = Vec::new();
|
||||
stats
|
||||
.write_to_vec(&mut data)
|
||||
.map_err(other_error!(e, "write stats to vec"))?;
|
||||
any.set_value(data);
|
||||
any.set_type_url(stats.descriptor().full_name().to_string());
|
||||
|
||||
let mut resp = StatsResponse::new();
|
||||
resp.set_stats(any);
|
||||
Ok(resp)
|
||||
}
|
||||
|
||||
fn shutdown(&self, _ctx: &TtrpcContext, _req: ShutdownRequest) -> TtrpcResult<Empty> {
|
||||
debug!("Shutdown request");
|
||||
let containers = self.containers.lock().unwrap();
|
||||
|
|
|
|||
|
|
@ -36,12 +36,10 @@
|
|||
//! A crate for consuming the runc binary in your Rust applications, similar to
|
||||
//! [go-runc](https://github.com/containerd/go-runc) for Go.
|
||||
use std::fmt::{self, Display};
|
||||
#[cfg(not(feature = "async"))]
|
||||
use std::io::Write;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::process::{ExitStatus, Stdio};
|
||||
|
||||
use oci_spec::runtime::{Linux, Process};
|
||||
use oci_spec::runtime::{LinuxResources, Process};
|
||||
|
||||
pub mod console;
|
||||
pub mod container;
|
||||
|
|
@ -58,6 +56,7 @@ use crate::error::Error;
|
|||
#[cfg(feature = "async")]
|
||||
use crate::monitor::{execute, DefaultMonitor, ExecuteResult};
|
||||
use crate::options::*;
|
||||
use crate::utils::write_value_to_temp_file;
|
||||
|
||||
type Result<T> = std::result::Result<T, crate::error::Error>;
|
||||
|
||||
|
|
@ -203,16 +202,7 @@ impl Runc {
|
|||
|
||||
/// Execute an additional process inside the container
|
||||
pub fn exec(&self, id: &str, spec: &Process, opts: Option<&ExecOpts>) -> Result<()> {
|
||||
let (mut temp_file, filename) = utils::make_temp_file_in_runtime_dir()?;
|
||||
{
|
||||
let f = temp_file.as_file_mut();
|
||||
let spec_json =
|
||||
serde_json::to_string(spec).map_err(Error::JsonDeserializationFailed)?;
|
||||
f.write(spec_json.as_bytes())
|
||||
.map_err(Error::SpecFileCreationFailed)?;
|
||||
f.flush().map_err(Error::SpecFileCreationFailed)?;
|
||||
}
|
||||
|
||||
let (_temp_file, filename) = write_value_to_temp_file(spec)?;
|
||||
let mut args = vec!["exec".to_string(), "--process".to_string(), filename];
|
||||
if let Some(opts) = opts {
|
||||
args.append(&mut opts.args()?);
|
||||
|
|
@ -340,11 +330,8 @@ impl Runc {
|
|||
}
|
||||
|
||||
/// Update a container with the provided resource spec
|
||||
pub fn update(&self, id: &str, resources: &Linux) -> Result<()> {
|
||||
let filename = utils::temp_filename_in_runtime_dir()?;
|
||||
let spec_json =
|
||||
serde_json::to_string(resources).map_err(Error::JsonDeserializationFailed)?;
|
||||
std::fs::write(&filename, spec_json).map_err(Error::SpecFileCreationFailed)?;
|
||||
pub fn update(&self, id: &str, resources: &LinuxResources) -> Result<()> {
|
||||
let (_temp_file, filename) = write_value_to_temp_file(resources)?;
|
||||
let args = [
|
||||
"update".to_string(),
|
||||
"--resources".to_string(),
|
||||
|
|
@ -564,12 +551,9 @@ impl Runc {
|
|||
}
|
||||
|
||||
/// Update a container with the provided resource spec
|
||||
pub async fn update(&self, id: &str, resources: &Linux) -> Result<()> {
|
||||
let filename = utils::temp_filename_in_runtime_dir()?;
|
||||
let spec_json =
|
||||
serde_json::to_string(resources).map_err(Error::JsonDeserializationFailed)?;
|
||||
std::fs::write(&filename, spec_json).map_err(Error::SpecFileCreationFailed)?;
|
||||
let args = vec![
|
||||
pub async fn update(&self, id: &str, resources: &LinuxResources) -> Result<()> {
|
||||
let (_temp_file, filename) = write_value_to_temp_file(resources)?;
|
||||
let args = [
|
||||
"update".to_string(),
|
||||
"--resources".to_string(),
|
||||
filename,
|
||||
|
|
|
|||
|
|
@ -14,15 +14,15 @@
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
#![allow(unused)]
|
||||
|
||||
use std::env;
|
||||
use std::io::Write;
|
||||
use std::os::unix::net::UnixListener;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use nix::sys::stat::Mode;
|
||||
use nix::unistd::mkdir;
|
||||
use path_absolutize::*;
|
||||
use serde::Serialize;
|
||||
use tempfile::{Builder, NamedTempFile};
|
||||
use uuid::Uuid;
|
||||
|
||||
|
|
@ -61,16 +61,32 @@ where
|
|||
path_to_string(abs_path_buf(path)?)
|
||||
}
|
||||
|
||||
/// Fetches the value of environment variable "XDG_RUNTIME_DIR", returning a temporary directory
|
||||
/// if the variable isn't set
|
||||
fn xdg_runtime_dir() -> String {
|
||||
env::var("XDG_RUNTIME_DIR")
|
||||
.unwrap_or_else(|_| abs_string(env::temp_dir()).unwrap_or_else(|_| ".".to_string()))
|
||||
}
|
||||
|
||||
/// Write the serialized 'value' to a temp file
|
||||
pub fn write_value_to_temp_file<T: Serialize>(value: &T) -> Result<(NamedTempFile, String), Error> {
|
||||
let filename = format!("{}/runc-process-{}", xdg_runtime_dir(), Uuid::new_v4());
|
||||
let mut temp_file = Builder::new()
|
||||
.prefix(&filename)
|
||||
.rand_bytes(0)
|
||||
.tempfile()
|
||||
.map_err(Error::SpecFileCreationFailed)?;
|
||||
let f = temp_file.as_file_mut();
|
||||
let spec_json = serde_json::to_string(value).map_err(Error::JsonDeserializationFailed)?;
|
||||
f.write(spec_json.as_bytes())
|
||||
.map_err(Error::SpecFileCreationFailed)?;
|
||||
f.flush().map_err(Error::SpecFileCreationFailed)?;
|
||||
Ok((temp_file, filename))
|
||||
}
|
||||
|
||||
/// Crate and return a temp socket file
|
||||
pub fn new_temp_console_socket() -> Result<ConsoleSocket, Error> {
|
||||
let dir = env::var_os("XDG_RUNTIME_DIR")
|
||||
.map(|runtime_dir| {
|
||||
format!(
|
||||
"{}/pty{}",
|
||||
runtime_dir.to_string_lossy().parse::<String>().unwrap(),
|
||||
Uuid::new_v4(),
|
||||
)
|
||||
})
|
||||
.ok_or(Error::SpecFileNotFound)?;
|
||||
let dir = format!("{}/pty{}", xdg_runtime_dir(), Uuid::new_v4(),);
|
||||
mkdir(Path::new(&dir), Mode::from_bits(0o711).unwrap()).map_err(Error::CreateDir)?;
|
||||
let file_name = Path::new(&dir).join("pty.sock");
|
||||
let listener = UnixListener::bind(file_name.as_path()).map_err(Error::UnixSocketBindFailed)?;
|
||||
|
|
@ -81,26 +97,6 @@ pub fn new_temp_console_socket() -> Result<ConsoleSocket, Error> {
|
|||
})
|
||||
}
|
||||
|
||||
pub fn temp_filename_in_runtime_dir() -> Result<String, Error> {
|
||||
match env::var_os("XDG_RUNTIME_DIR") {
|
||||
Some(runtime_dir) => {
|
||||
let path = path_to_string(runtime_dir)?;
|
||||
Ok(format!("{}/runc-process-{}", path, Uuid::new_v4()))
|
||||
}
|
||||
None => Err(Error::SpecFileNotFound),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn make_temp_file_in_runtime_dir() -> Result<(NamedTempFile, String), Error> {
|
||||
let file_name = temp_filename_in_runtime_dir()?;
|
||||
let temp_file = Builder::new()
|
||||
.prefix(&file_name)
|
||||
.rand_bytes(0)
|
||||
.tempfile()
|
||||
.map_err(Error::SpecFileCreationFailed)?;
|
||||
Ok((temp_file, file_name))
|
||||
}
|
||||
|
||||
/// Resolve a binary path according to the `PATH` environment variable.
|
||||
///
|
||||
/// Note, the case that `path` is already an absolute path is implicitly handled by
|
||||
|
|
|
|||
Loading…
Reference in New Issue