add update/stats/pids for async

Signed-off-by: Feng Shaobao <fshb1988@gmail.com>
This commit is contained in:
Feng Shaobao 2022-03-15 17:10:58 +08:00
parent b3ac82d9cc
commit b48cc3d993
12 changed files with 188 additions and 27 deletions

View File

@ -36,7 +36,4 @@ tokio = { version = "1.17.0", features = ["full"], optional = true }
futures = { version = "0.3.21", optional = true }
containerd-shim = { path = "../shim", version = "0.3.0" }
runc = { path = "../runc", version = "0.2.0" }
[target.'cfg(target_os = "linux")'.dependencies]
cgroups-rs = "0.2.9"
runc = { path = "../runc", version = "0.2.0" }

View File

@ -25,7 +25,7 @@ use async_trait::async_trait;
use log::{debug, error};
use nix::sys::signal::kill;
use nix::unistd::Pid;
use oci_spec::runtime::{LinuxNamespaceType, Process};
use oci_spec::runtime::{LinuxNamespaceType, LinuxResources, Process};
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite};
@ -40,6 +40,8 @@ use containerd_shim::asynchronous::monitor::{
use containerd_shim::asynchronous::processes::{ProcessLifecycle, ProcessTemplate};
use containerd_shim::io::Stdio;
use containerd_shim::monitor::{ExitEvent, Subject, Topic};
use containerd_shim::protos::api::ProcessInfo;
use containerd_shim::protos::cgroups::metrics::Metrics;
use containerd_shim::protos::protobuf::{CodedInputStream, Message};
use containerd_shim::util::{
asyncify, mkdir, mount_rootfs, read_file_to_str, read_spec, write_options, write_runtime,
@ -270,6 +272,41 @@ impl ProcessLifecycle<InitProcess> for RuncInitLifecycle {
})
.map_err(other_error!(e, "failed delete"))
}
async fn update(&self, p: &mut InitProcess, resources: &LinuxResources) -> Result<()> {
if p.pid <= 0 {
return Err(other!(
"failed to update resources because init process is {}",
p.pid
));
}
containerd_shim::cgroup::update_resources(p.pid as u32, resources)
}
async fn stats(&self, p: &InitProcess) -> Result<Metrics> {
if p.pid <= 0 {
return Err(other!(
"failed to collect metrics because init process is {}",
p.pid
));
}
containerd_shim::cgroup::collect_metrics(p.pid as u32)
}
async fn ps(&self, p: &InitProcess) -> Result<Vec<ProcessInfo>> {
let pids = self
.runtime
.ps(&*p.id)
.await
.map_err(other_error!(e, "failed to execute runc ps"))?;
Ok(pids
.iter()
.map(|&x| ProcessInfo {
pid: x as u32,
..Default::default()
})
.collect())
}
}
impl RuncInitLifecycle {
@ -387,6 +424,18 @@ impl ProcessLifecycle<ExecProcess> for RuncExecLifecycle {
async fn delete(&self, _p: &mut ExecProcess) -> containerd_shim::Result<()> {
Ok(())
}
async fn update(&self, _p: &mut ExecProcess, _resources: &LinuxResources) -> Result<()> {
Err(Error::Unimplemented("exec update".to_string()))
}
async fn stats(&self, _p: &ExecProcess) -> Result<Metrics> {
Err(Error::Unimplemented("exec stats".to_string()))
}
async fn ps(&self, _p: &ExecProcess) -> Result<Vec<ProcessInfo>> {
Err(Error::Unimplemented("exec ps".to_string()))
}
}
async fn copy_console(console_socket: &ConsoleSocket, stdio: &Stdio) -> Result<Console> {

View File

@ -18,7 +18,6 @@ use std::sync::Arc;
use containerd_shim::ExitSignal;
mod cgroup;
mod container;
mod io;
mod runc;

View File

@ -294,7 +294,7 @@ impl Container for RuncContainer {
#[cfg(target_os = "linux")]
fn stats(&self) -> Result<Metrics> {
let pid = self.common.init.pid() as u32;
crate::synchronous::cgroup::collect_metrics(pid)
containerd_shim::cgroup::collect_metrics(pid)
}
#[cfg(not(target_os = "linux"))]
@ -305,7 +305,7 @@ impl Container for RuncContainer {
#[cfg(target_os = "linux")]
fn update(&mut self, resources: &LinuxResources) -> Result<()> {
let pid = self.common.init.pid() as u32;
crate::synchronous::cgroup::update_metrics(pid, resources)
containerd_shim::cgroup::update_resources(pid, resources)
}
#[cfg(not(target_os = "linux"))]

View File

@ -21,9 +21,9 @@ use std::path::Path;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use containerd_shim as shim;
use log::{debug, error};
use containerd_shim as shim;
use runc::options::{DeleteOpts, GlobalOpts, DEFAULT_COMMAND};
use shim::api::*;
use shim::error::{Error, Result};
@ -73,7 +73,7 @@ impl Shim for Service {
let (child_id, address) = spawn(opts, &grouping, Vec::new())?;
#[cfg(target_os = "linux")]
crate::synchronous::cgroup::set_cgroup_and_oom_score(child_id)?;
containerd_shim::cgroup::set_cgroup_and_oom_score(child_id)?;
write_address(&address)?;
Ok(address)

View File

@ -40,5 +40,8 @@ tokio = { version = "1.17.0", features = ["full"], optional = true }
futures = {version = "0.3.21", optional = true}
signal-hook-tokio = {version = "0.3.1", optional = true, features = ["futures-v0_3"]}
[target.'cfg(target_os = "linux")'.dependencies]
cgroups-rs = "0.2.9"
[dev-dependencies]
tempfile = "3.0"

View File

@ -18,10 +18,14 @@ use std::collections::HashMap;
use async_trait::async_trait;
use log::debug;
use oci_spec::runtime::LinuxResources;
use time::OffsetDateTime;
use tokio::sync::oneshot::Receiver;
use containerd_shim_protos::api::{CreateTaskRequest, ExecProcessRequest, StateResponse};
use containerd_shim_protos::api::{
CreateTaskRequest, ExecProcessRequest, ProcessInfo, StateResponse,
};
use containerd_shim_protos::cgroups::metrics::Metrics;
use crate::asynchronous::processes::Process;
use crate::error::Result;
@ -45,6 +49,9 @@ pub trait Container {
async fn resize_pty(&mut self, exec_id: Option<&str>, height: u32, width: u32) -> Result<()>;
async fn pid(&self) -> i32;
async fn id(&self) -> String;
async fn update(&mut self, resources: &LinuxResources) -> Result<()>;
async fn stats(&self) -> Result<Metrics>;
async fn all_processes(&self) -> Result<Vec<ProcessInfo>>;
}
#[async_trait]
@ -154,6 +161,30 @@ where
async fn id(&self) -> String {
self.id.to_string()
}
#[cfg(target_os = "linux")]
async fn update(&mut self, resources: &LinuxResources) -> Result<()> {
self.init.update(resources).await
}
#[cfg(not(target_os = "linux"))]
async fn update(&mut self, resources: &LinuxResources) -> Result<()> {
Err(Error::Unimplemented("update".to_string()))
}
#[cfg(target_os = "linux")]
async fn stats(&self) -> Result<Metrics> {
self.init.stats().await
}
#[cfg(not(target_os = "linux"))]
async fn stats(&self) -> Result<Metrics> {
Err(Error::Unimplemented("stats".to_string()))
}
async fn all_processes(&self) -> Result<Vec<ProcessInfo>> {
self.init.ps().await
}
}
impl<T, E, P> ContainerTemplate<T, E, P>

View File

@ -293,10 +293,12 @@ pub async fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) ->
command
.spawn()
.map_err(io_error!(e, "spawn shim"))
.map(|_| {
.and_then(|child| {
// Ownership of `listener` has been passed to child.
std::mem::forget(listener);
address
#[cfg(target_os = "linux")]
crate::cgroup::set_cgroup_and_oom_score(child.id())?;
Ok(address)
})
}

View File

@ -17,10 +17,12 @@
use std::os::unix::io::AsRawFd;
use async_trait::async_trait;
use oci_spec::runtime::LinuxResources;
use time::OffsetDateTime;
use tokio::sync::oneshot::{channel, Receiver, Sender};
use containerd_shim_protos::api::{StateResponse, Status};
use containerd_shim_protos::api::{ProcessInfo, StateResponse, Status};
use containerd_shim_protos::cgroups::metrics::Metrics;
use containerd_shim_protos::protobuf::well_known_types::Timestamp;
use crate::io::Stdio;
@ -40,6 +42,9 @@ pub trait Process {
async fn exit_code(&self) -> i32;
async fn exited_at(&self) -> Option<OffsetDateTime>;
async fn resize_pty(&mut self, height: u32, width: u32) -> crate::Result<()>;
async fn update(&mut self, resources: &LinuxResources) -> crate::Result<()>;
async fn stats(&self) -> crate::Result<Metrics>;
async fn ps(&self) -> crate::Result<Vec<ProcessInfo>>;
}
#[async_trait]
@ -47,6 +52,9 @@ pub trait ProcessLifecycle<P: Process> {
async fn start(&self, p: &mut P) -> crate::Result<()>;
async fn kill(&self, p: &mut P, signal: u32, all: bool) -> crate::Result<()>;
async fn delete(&self, p: &mut P) -> crate::Result<()>;
async fn update(&self, p: &mut P, resources: &LinuxResources) -> crate::Result<()>;
async fn stats(&self, p: &P) -> crate::Result<Metrics>;
async fn ps(&self, p: &P) -> crate::Result<Vec<ProcessInfo>>;
}
pub struct ProcessTemplate<S> {
@ -164,4 +172,16 @@ where
None => Err(other!("there is no console")),
}
}
async fn update(&mut self, resources: &LinuxResources) -> crate::Result<()> {
self.lifecycle.clone().update(&mut self, resources).await
}
async fn stats(&self) -> crate::Result<Metrics> {
self.lifecycle.stats(self).await
}
async fn ps(&self) -> crate::Result<Vec<ProcessInfo>> {
self.lifecycle.ps(self).await
}
}

View File

@ -19,10 +19,14 @@ use std::sync::Arc;
use async_trait::async_trait;
use log::{debug, info, warn};
use oci_spec::runtime::LinuxResources;
use tokio::sync::mpsc::Sender;
use tokio::sync::{MappedMutexGuard, Mutex, MutexGuard};
use containerd_shim_protos::api::DeleteResponse;
use containerd_shim_protos::api::{
CloseIORequest, ConnectRequest, ConnectResponse, DeleteResponse, PidsRequest, PidsResponse,
StatsRequest, StatsResponse, UpdateTaskRequest,
};
use containerd_shim_protos::events::task::{
TaskCreate, TaskDelete, TaskExecAdded, TaskExecStarted, TaskIO, TaskStart,
};
@ -39,7 +43,7 @@ use crate::api::{
use crate::asynchronous::container::{Container, ContainerFactory};
use crate::asynchronous::ExitSignal;
use crate::event::Event;
use crate::util::{convert_to_timestamp, AsOption};
use crate::util::{convert_to_any, convert_to_timestamp, AsOption};
use crate::TtrpcResult;
type EventSender = Sender<(String, Box<dyn Message>)>;
@ -215,6 +219,17 @@ where
Ok(resp)
}
async fn pids(&self, _ctx: &TtrpcContext, req: PidsRequest) -> TtrpcResult<PidsResponse> {
debug!("Pids request for {:?}", req);
let container = self.get_container(req.get_id()).await?;
let procs = container.all_processes().await?;
debug!("Pids request for {:?} returns successfully", req);
Ok(PidsResponse {
processes: procs.into(),
..Default::default()
})
}
async fn kill(&self, _ctx: &TtrpcContext, req: KillRequest) -> TtrpcResult<Empty> {
info!("Kill request for {:?}", req);
let mut container = self.get_container(req.get_id()).await?;
@ -253,6 +268,25 @@ where
Ok(Empty::new())
}
async fn close_io(&self, _ctx: &TtrpcContext, _req: CloseIORequest) -> TtrpcResult<Empty> {
// TODO call close_io of container
Ok(Empty::new())
}
async fn update(&self, _ctx: &TtrpcContext, req: UpdateTaskRequest) -> TtrpcResult<Empty> {
debug!("Update request for {:?}", req);
let resources: LinuxResources = serde_json::from_slice(req.get_resources().get_value())
.map_err(|e| {
ttrpc::Error::RpcStatus(ttrpc::get_status(
ttrpc::Code::INVALID_ARGUMENT,
format!("failed to parse resource spec: {}", e),
))
})?;
let mut container = self.get_container(req.get_id()).await?;
container.update(&resources).await?;
Ok(Empty::new())
}
async fn wait(&self, _ctx: &TtrpcContext, req: WaitRequest) -> TtrpcResult<WaitResponse> {
info!("Wait request for {:?}", req);
let exec_id = req.exec_id.as_str().as_option();
@ -283,6 +317,31 @@ where
Ok(resp)
}
async fn stats(&self, _ctx: &TtrpcContext, req: StatsRequest) -> TtrpcResult<StatsResponse> {
debug!("Stats request for {:?}", req);
let container = self.get_container(req.get_id()).await?;
let stats = container.stats().await?;
let mut resp = StatsResponse::new();
resp.set_stats(convert_to_any(Box::new(stats))?);
Ok(resp)
}
async fn connect(
&self,
_ctx: &TtrpcContext,
req: ConnectRequest,
) -> TtrpcResult<ConnectResponse> {
info!("Connect request for {:?}", req);
let container = self.get_container(req.get_id()).await?;
Ok(ConnectResponse {
shim_pid: std::process::id() as u32,
task_pid: container.pid().await as u32,
..Default::default()
})
}
async fn shutdown(&self, _ctx: &TtrpcContext, _req: ShutdownRequest) -> TtrpcResult<Empty> {
debug!("Shutdown request");
let containers = self.containers.lock().await;

View File

@ -24,15 +24,14 @@ use cgroups_rs::cgroup::get_cgroups_relative_paths_by_pid;
use cgroups_rs::{hierarchies, Cgroup, CgroupPid, MaxValue, Subsystem};
use oci_spec::runtime::LinuxResources;
use containerd_shim as shim;
use containerd_shim::api::Options;
use containerd_shim::protos::cgroups::metrics::{
use containerd_shim_protos::cgroups::metrics::{
CPUStat, CPUUsage, MemoryEntry, MemoryStat, Metrics,
};
use containerd_shim::protos::protobuf::well_known_types::Any;
use containerd_shim::protos::protobuf::Message;
use shim::error::{Error, Result};
use shim::{io_error, other_error};
use containerd_shim_protos::protobuf::well_known_types::Any;
use containerd_shim_protos::protobuf::Message;
use containerd_shim_protos::shim::oci::Options;
use crate::error::{Error, Result};
// OOM_SCORE_ADJ_MAX is from https://github.com/torvalds/linux/blob/master/include/uapi/linux/oom.h#L10
const OOM_SCORE_ADJ_MAX: i64 = 1000;
@ -131,7 +130,7 @@ pub fn collect_metrics(pid: u32) -> Result<Metrics> {
}
/// Update process cgroup limits
pub fn update_metrics(pid: u32, resources: &LinuxResources) -> Result<()> {
pub fn update_resources(pid: u32, resources: &LinuxResources) -> Result<()> {
// get container main process cgroup
let path =
get_cgroups_relative_paths_by_pid(pid).map_err(other_error!(e, "get process cgroup"))?;
@ -225,7 +224,7 @@ pub fn update_metrics(pid: u32, resources: &LinuxResources) -> Result<()> {
mod tests {
use cgroups_rs::{hierarchies, Cgroup, CgroupPid};
use crate::synchronous::cgroup::{
use crate::cgroup::{
add_task_to_cgroup, adjust_oom_score, read_process_oom_score, OOM_SCORE_ADJ_MAX,
};

View File

@ -39,8 +39,9 @@ use std::os::unix::io::RawFd;
use std::os::unix::net::UnixListener;
use std::path::{Path, PathBuf};
pub use containerd_shim_protos as protos;
use nix::ioctl_write_ptr_bad;
pub use containerd_shim_protos as protos;
pub use protos::shim::shim::DeleteResponse;
pub use protos::ttrpc::{context::Context, Result as TtrpcResult};
@ -56,6 +57,7 @@ pub mod error;
mod args;
#[cfg(feature = "async")]
pub mod asynchronous;
pub mod cgroup;
pub mod event;
pub mod io;
mod logger;