diff --git a/crates/runc-shim/Cargo.toml b/crates/runc-shim/Cargo.toml index 097b464..51ce2fe 100644 --- a/crates/runc-shim/Cargo.toml +++ b/crates/runc-shim/Cargo.toml @@ -36,7 +36,4 @@ tokio = { version = "1.17.0", features = ["full"], optional = true } futures = { version = "0.3.21", optional = true } containerd-shim = { path = "../shim", version = "0.3.0" } -runc = { path = "../runc", version = "0.2.0" } - -[target.'cfg(target_os = "linux")'.dependencies] -cgroups-rs = "0.2.9" +runc = { path = "../runc", version = "0.2.0" } \ No newline at end of file diff --git a/crates/runc-shim/src/asynchronous/runc.rs b/crates/runc-shim/src/asynchronous/runc.rs index ba21c45..b35bf1f 100644 --- a/crates/runc-shim/src/asynchronous/runc.rs +++ b/crates/runc-shim/src/asynchronous/runc.rs @@ -25,7 +25,7 @@ use async_trait::async_trait; use log::{debug, error}; use nix::sys::signal::kill; use nix::unistd::Pid; -use oci_spec::runtime::{LinuxNamespaceType, Process}; +use oci_spec::runtime::{LinuxNamespaceType, LinuxResources, Process}; use tokio::fs::{File, OpenOptions}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite}; @@ -40,6 +40,8 @@ use containerd_shim::asynchronous::monitor::{ use containerd_shim::asynchronous::processes::{ProcessLifecycle, ProcessTemplate}; use containerd_shim::io::Stdio; use containerd_shim::monitor::{ExitEvent, Subject, Topic}; +use containerd_shim::protos::api::ProcessInfo; +use containerd_shim::protos::cgroups::metrics::Metrics; use containerd_shim::protos::protobuf::{CodedInputStream, Message}; use containerd_shim::util::{ asyncify, mkdir, mount_rootfs, read_file_to_str, read_spec, write_options, write_runtime, @@ -270,6 +272,41 @@ impl ProcessLifecycle for RuncInitLifecycle { }) .map_err(other_error!(e, "failed delete")) } + + async fn update(&self, p: &mut InitProcess, resources: &LinuxResources) -> Result<()> { + if p.pid <= 0 { + return Err(other!( + "failed to update resources because init process is {}", + p.pid + )); + } + containerd_shim::cgroup::update_resources(p.pid as u32, resources) + } + + async fn stats(&self, p: &InitProcess) -> Result { + if p.pid <= 0 { + return Err(other!( + "failed to collect metrics because init process is {}", + p.pid + )); + } + containerd_shim::cgroup::collect_metrics(p.pid as u32) + } + + async fn ps(&self, p: &InitProcess) -> Result> { + let pids = self + .runtime + .ps(&*p.id) + .await + .map_err(other_error!(e, "failed to execute runc ps"))?; + Ok(pids + .iter() + .map(|&x| ProcessInfo { + pid: x as u32, + ..Default::default() + }) + .collect()) + } } impl RuncInitLifecycle { @@ -387,6 +424,18 @@ impl ProcessLifecycle for RuncExecLifecycle { async fn delete(&self, _p: &mut ExecProcess) -> containerd_shim::Result<()> { Ok(()) } + + async fn update(&self, _p: &mut ExecProcess, _resources: &LinuxResources) -> Result<()> { + Err(Error::Unimplemented("exec update".to_string())) + } + + async fn stats(&self, _p: &ExecProcess) -> Result { + Err(Error::Unimplemented("exec stats".to_string())) + } + + async fn ps(&self, _p: &ExecProcess) -> Result> { + Err(Error::Unimplemented("exec ps".to_string())) + } } async fn copy_console(console_socket: &ConsoleSocket, stdio: &Stdio) -> Result { diff --git a/crates/runc-shim/src/synchronous/mod.rs b/crates/runc-shim/src/synchronous/mod.rs index 1667b46..848cf03 100644 --- a/crates/runc-shim/src/synchronous/mod.rs +++ b/crates/runc-shim/src/synchronous/mod.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use containerd_shim::ExitSignal; -mod cgroup; mod container; mod io; mod runc; diff --git a/crates/runc-shim/src/synchronous/runc.rs b/crates/runc-shim/src/synchronous/runc.rs index 7afae5a..1769dd5 100644 --- a/crates/runc-shim/src/synchronous/runc.rs +++ b/crates/runc-shim/src/synchronous/runc.rs @@ -294,7 +294,7 @@ impl Container for RuncContainer { #[cfg(target_os = "linux")] fn stats(&self) -> Result { let pid = self.common.init.pid() as u32; - crate::synchronous::cgroup::collect_metrics(pid) + containerd_shim::cgroup::collect_metrics(pid) } #[cfg(not(target_os = "linux"))] @@ -305,7 +305,7 @@ impl Container for RuncContainer { #[cfg(target_os = "linux")] fn update(&mut self, resources: &LinuxResources) -> Result<()> { let pid = self.common.init.pid() as u32; - crate::synchronous::cgroup::update_metrics(pid, resources) + containerd_shim::cgroup::update_resources(pid, resources) } #[cfg(not(target_os = "linux"))] diff --git a/crates/runc-shim/src/synchronous/service.rs b/crates/runc-shim/src/synchronous/service.rs index 03078b2..302ebc1 100644 --- a/crates/runc-shim/src/synchronous/service.rs +++ b/crates/runc-shim/src/synchronous/service.rs @@ -21,9 +21,9 @@ use std::path::Path; use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::Arc; -use containerd_shim as shim; - use log::{debug, error}; + +use containerd_shim as shim; use runc::options::{DeleteOpts, GlobalOpts, DEFAULT_COMMAND}; use shim::api::*; use shim::error::{Error, Result}; @@ -73,7 +73,7 @@ impl Shim for Service { let (child_id, address) = spawn(opts, &grouping, Vec::new())?; #[cfg(target_os = "linux")] - crate::synchronous::cgroup::set_cgroup_and_oom_score(child_id)?; + containerd_shim::cgroup::set_cgroup_and_oom_score(child_id)?; write_address(&address)?; Ok(address) diff --git a/crates/shim/Cargo.toml b/crates/shim/Cargo.toml index f8c384a..feca846 100644 --- a/crates/shim/Cargo.toml +++ b/crates/shim/Cargo.toml @@ -40,5 +40,8 @@ tokio = { version = "1.17.0", features = ["full"], optional = true } futures = {version = "0.3.21", optional = true} signal-hook-tokio = {version = "0.3.1", optional = true, features = ["futures-v0_3"]} +[target.'cfg(target_os = "linux")'.dependencies] +cgroups-rs = "0.2.9" + [dev-dependencies] tempfile = "3.0" diff --git a/crates/shim/src/asynchronous/container.rs b/crates/shim/src/asynchronous/container.rs index f4d9b23..2188442 100644 --- a/crates/shim/src/asynchronous/container.rs +++ b/crates/shim/src/asynchronous/container.rs @@ -18,10 +18,14 @@ use std::collections::HashMap; use async_trait::async_trait; use log::debug; +use oci_spec::runtime::LinuxResources; use time::OffsetDateTime; use tokio::sync::oneshot::Receiver; -use containerd_shim_protos::api::{CreateTaskRequest, ExecProcessRequest, StateResponse}; +use containerd_shim_protos::api::{ + CreateTaskRequest, ExecProcessRequest, ProcessInfo, StateResponse, +}; +use containerd_shim_protos::cgroups::metrics::Metrics; use crate::asynchronous::processes::Process; use crate::error::Result; @@ -45,6 +49,9 @@ pub trait Container { async fn resize_pty(&mut self, exec_id: Option<&str>, height: u32, width: u32) -> Result<()>; async fn pid(&self) -> i32; async fn id(&self) -> String; + async fn update(&mut self, resources: &LinuxResources) -> Result<()>; + async fn stats(&self) -> Result; + async fn all_processes(&self) -> Result>; } #[async_trait] @@ -154,6 +161,30 @@ where async fn id(&self) -> String { self.id.to_string() } + + #[cfg(target_os = "linux")] + async fn update(&mut self, resources: &LinuxResources) -> Result<()> { + self.init.update(resources).await + } + + #[cfg(not(target_os = "linux"))] + async fn update(&mut self, resources: &LinuxResources) -> Result<()> { + Err(Error::Unimplemented("update".to_string())) + } + + #[cfg(target_os = "linux")] + async fn stats(&self) -> Result { + self.init.stats().await + } + + #[cfg(not(target_os = "linux"))] + async fn stats(&self) -> Result { + Err(Error::Unimplemented("stats".to_string())) + } + + async fn all_processes(&self) -> Result> { + self.init.ps().await + } } impl ContainerTemplate diff --git a/crates/shim/src/asynchronous/mod.rs b/crates/shim/src/asynchronous/mod.rs index 0be5fd8..1e20706 100644 --- a/crates/shim/src/asynchronous/mod.rs +++ b/crates/shim/src/asynchronous/mod.rs @@ -293,10 +293,12 @@ pub async fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> command .spawn() .map_err(io_error!(e, "spawn shim")) - .map(|_| { + .and_then(|child| { // Ownership of `listener` has been passed to child. std::mem::forget(listener); - address + #[cfg(target_os = "linux")] + crate::cgroup::set_cgroup_and_oom_score(child.id())?; + Ok(address) }) } diff --git a/crates/shim/src/asynchronous/processes.rs b/crates/shim/src/asynchronous/processes.rs index 2d0af1d..369dbf2 100644 --- a/crates/shim/src/asynchronous/processes.rs +++ b/crates/shim/src/asynchronous/processes.rs @@ -17,10 +17,12 @@ use std::os::unix::io::AsRawFd; use async_trait::async_trait; +use oci_spec::runtime::LinuxResources; use time::OffsetDateTime; use tokio::sync::oneshot::{channel, Receiver, Sender}; -use containerd_shim_protos::api::{StateResponse, Status}; +use containerd_shim_protos::api::{ProcessInfo, StateResponse, Status}; +use containerd_shim_protos::cgroups::metrics::Metrics; use containerd_shim_protos::protobuf::well_known_types::Timestamp; use crate::io::Stdio; @@ -40,6 +42,9 @@ pub trait Process { async fn exit_code(&self) -> i32; async fn exited_at(&self) -> Option; async fn resize_pty(&mut self, height: u32, width: u32) -> crate::Result<()>; + async fn update(&mut self, resources: &LinuxResources) -> crate::Result<()>; + async fn stats(&self) -> crate::Result; + async fn ps(&self) -> crate::Result>; } #[async_trait] @@ -47,6 +52,9 @@ pub trait ProcessLifecycle { async fn start(&self, p: &mut P) -> crate::Result<()>; async fn kill(&self, p: &mut P, signal: u32, all: bool) -> crate::Result<()>; async fn delete(&self, p: &mut P) -> crate::Result<()>; + async fn update(&self, p: &mut P, resources: &LinuxResources) -> crate::Result<()>; + async fn stats(&self, p: &P) -> crate::Result; + async fn ps(&self, p: &P) -> crate::Result>; } pub struct ProcessTemplate { @@ -164,4 +172,16 @@ where None => Err(other!("there is no console")), } } + + async fn update(&mut self, resources: &LinuxResources) -> crate::Result<()> { + self.lifecycle.clone().update(&mut self, resources).await + } + + async fn stats(&self) -> crate::Result { + self.lifecycle.stats(self).await + } + + async fn ps(&self) -> crate::Result> { + self.lifecycle.ps(self).await + } } diff --git a/crates/shim/src/asynchronous/task.rs b/crates/shim/src/asynchronous/task.rs index 56382e5..0cc4b28 100644 --- a/crates/shim/src/asynchronous/task.rs +++ b/crates/shim/src/asynchronous/task.rs @@ -19,10 +19,14 @@ use std::sync::Arc; use async_trait::async_trait; use log::{debug, info, warn}; +use oci_spec::runtime::LinuxResources; use tokio::sync::mpsc::Sender; use tokio::sync::{MappedMutexGuard, Mutex, MutexGuard}; -use containerd_shim_protos::api::DeleteResponse; +use containerd_shim_protos::api::{ + CloseIORequest, ConnectRequest, ConnectResponse, DeleteResponse, PidsRequest, PidsResponse, + StatsRequest, StatsResponse, UpdateTaskRequest, +}; use containerd_shim_protos::events::task::{ TaskCreate, TaskDelete, TaskExecAdded, TaskExecStarted, TaskIO, TaskStart, }; @@ -39,7 +43,7 @@ use crate::api::{ use crate::asynchronous::container::{Container, ContainerFactory}; use crate::asynchronous::ExitSignal; use crate::event::Event; -use crate::util::{convert_to_timestamp, AsOption}; +use crate::util::{convert_to_any, convert_to_timestamp, AsOption}; use crate::TtrpcResult; type EventSender = Sender<(String, Box)>; @@ -215,6 +219,17 @@ where Ok(resp) } + async fn pids(&self, _ctx: &TtrpcContext, req: PidsRequest) -> TtrpcResult { + debug!("Pids request for {:?}", req); + let container = self.get_container(req.get_id()).await?; + let procs = container.all_processes().await?; + debug!("Pids request for {:?} returns successfully", req); + Ok(PidsResponse { + processes: procs.into(), + ..Default::default() + }) + } + async fn kill(&self, _ctx: &TtrpcContext, req: KillRequest) -> TtrpcResult { info!("Kill request for {:?}", req); let mut container = self.get_container(req.get_id()).await?; @@ -253,6 +268,25 @@ where Ok(Empty::new()) } + async fn close_io(&self, _ctx: &TtrpcContext, _req: CloseIORequest) -> TtrpcResult { + // TODO call close_io of container + Ok(Empty::new()) + } + + async fn update(&self, _ctx: &TtrpcContext, req: UpdateTaskRequest) -> TtrpcResult { + debug!("Update request for {:?}", req); + let resources: LinuxResources = serde_json::from_slice(req.get_resources().get_value()) + .map_err(|e| { + ttrpc::Error::RpcStatus(ttrpc::get_status( + ttrpc::Code::INVALID_ARGUMENT, + format!("failed to parse resource spec: {}", e), + )) + })?; + let mut container = self.get_container(req.get_id()).await?; + container.update(&resources).await?; + Ok(Empty::new()) + } + async fn wait(&self, _ctx: &TtrpcContext, req: WaitRequest) -> TtrpcResult { info!("Wait request for {:?}", req); let exec_id = req.exec_id.as_str().as_option(); @@ -283,6 +317,31 @@ where Ok(resp) } + async fn stats(&self, _ctx: &TtrpcContext, req: StatsRequest) -> TtrpcResult { + debug!("Stats request for {:?}", req); + let container = self.get_container(req.get_id()).await?; + let stats = container.stats().await?; + + let mut resp = StatsResponse::new(); + resp.set_stats(convert_to_any(Box::new(stats))?); + Ok(resp) + } + + async fn connect( + &self, + _ctx: &TtrpcContext, + req: ConnectRequest, + ) -> TtrpcResult { + info!("Connect request for {:?}", req); + let container = self.get_container(req.get_id()).await?; + + Ok(ConnectResponse { + shim_pid: std::process::id() as u32, + task_pid: container.pid().await as u32, + ..Default::default() + }) + } + async fn shutdown(&self, _ctx: &TtrpcContext, _req: ShutdownRequest) -> TtrpcResult { debug!("Shutdown request"); let containers = self.containers.lock().await; diff --git a/crates/runc-shim/src/synchronous/cgroup.rs b/crates/shim/src/cgroup.rs similarity index 95% rename from crates/runc-shim/src/synchronous/cgroup.rs rename to crates/shim/src/cgroup.rs index 97928f7..5536303 100644 --- a/crates/runc-shim/src/synchronous/cgroup.rs +++ b/crates/shim/src/cgroup.rs @@ -24,15 +24,14 @@ use cgroups_rs::cgroup::get_cgroups_relative_paths_by_pid; use cgroups_rs::{hierarchies, Cgroup, CgroupPid, MaxValue, Subsystem}; use oci_spec::runtime::LinuxResources; -use containerd_shim as shim; -use containerd_shim::api::Options; -use containerd_shim::protos::cgroups::metrics::{ +use containerd_shim_protos::cgroups::metrics::{ CPUStat, CPUUsage, MemoryEntry, MemoryStat, Metrics, }; -use containerd_shim::protos::protobuf::well_known_types::Any; -use containerd_shim::protos::protobuf::Message; -use shim::error::{Error, Result}; -use shim::{io_error, other_error}; +use containerd_shim_protos::protobuf::well_known_types::Any; +use containerd_shim_protos::protobuf::Message; +use containerd_shim_protos::shim::oci::Options; + +use crate::error::{Error, Result}; // OOM_SCORE_ADJ_MAX is from https://github.com/torvalds/linux/blob/master/include/uapi/linux/oom.h#L10 const OOM_SCORE_ADJ_MAX: i64 = 1000; @@ -131,7 +130,7 @@ pub fn collect_metrics(pid: u32) -> Result { } /// Update process cgroup limits -pub fn update_metrics(pid: u32, resources: &LinuxResources) -> Result<()> { +pub fn update_resources(pid: u32, resources: &LinuxResources) -> Result<()> { // get container main process cgroup let path = get_cgroups_relative_paths_by_pid(pid).map_err(other_error!(e, "get process cgroup"))?; @@ -225,7 +224,7 @@ pub fn update_metrics(pid: u32, resources: &LinuxResources) -> Result<()> { mod tests { use cgroups_rs::{hierarchies, Cgroup, CgroupPid}; - use crate::synchronous::cgroup::{ + use crate::cgroup::{ add_task_to_cgroup, adjust_oom_score, read_process_oom_score, OOM_SCORE_ADJ_MAX, }; diff --git a/crates/shim/src/lib.rs b/crates/shim/src/lib.rs index 732c0aa..a0e9464 100644 --- a/crates/shim/src/lib.rs +++ b/crates/shim/src/lib.rs @@ -39,8 +39,9 @@ use std::os::unix::io::RawFd; use std::os::unix::net::UnixListener; use std::path::{Path, PathBuf}; -pub use containerd_shim_protos as protos; use nix::ioctl_write_ptr_bad; + +pub use containerd_shim_protos as protos; pub use protos::shim::shim::DeleteResponse; pub use protos::ttrpc::{context::Context, Result as TtrpcResult}; @@ -56,6 +57,7 @@ pub mod error; mod args; #[cfg(feature = "async")] pub mod asynchronous; +pub mod cgroup; pub mod event; pub mod io; mod logger;