runc-shim: support remote publisher and extra some common methods

Signed-off-by: Zhang Tianyang <burning9699@gmail.com>
This commit is contained in:
Zhang Tianyang 2022-03-08 13:54:48 +08:00
parent 3ff1fc0a6d
commit 2fd561e290
21 changed files with 451 additions and 230 deletions

View File

@ -19,6 +19,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use log::{debug, error, warn};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use ::runc::options::DeleteOpts;
use containerd_shim::asynchronous::container::Container;
@ -28,15 +29,14 @@ use containerd_shim::asynchronous::monitor::{
use containerd_shim::asynchronous::processes::Process;
use containerd_shim::asynchronous::publisher::RemotePublisher;
use containerd_shim::asynchronous::task::TaskService;
use containerd_shim::asynchronous::util::{
read_options, read_runtime, read_spec, write_str_to_file,
};
use containerd_shim::asynchronous::{spawn, ExitSignal, Shim};
use containerd_shim::event::Event;
use containerd_shim::monitor::{Subject, Topic};
use containerd_shim::protos::protobuf::SingularPtrField;
use containerd_shim::util::get_timestamp;
use containerd_shim::Error;
use containerd_shim::{io_error, Config, DeleteResponse, StartOpts};
use containerd_shim::protos::events::task::TaskExit;
use containerd_shim::protos::protobuf::{Message, SingularPtrField};
use containerd_shim::util::{convert_to_timestamp, timestamp};
use containerd_shim::util::{read_options, read_runtime, read_spec, write_str_to_file};
use containerd_shim::{io_error, Config, Context, DeleteResponse, Error, StartOpts};
use crate::asynchronous::runc::{RuncContainer, RuncFactory};
use crate::common::create_runc;
@ -54,13 +54,7 @@ pub(crate) struct Service {
impl Shim for Service {
type T = TaskService<RuncFactory, RuncContainer>;
async fn new(
_runtime_id: &str,
id: &str,
namespace: &str,
_publisher: RemotePublisher,
_config: &mut Config,
) -> Self {
async fn new(_runtime_id: &str, id: &str, namespace: &str, _config: &mut Config) -> Self {
let exit = Arc::new(ExitSignal::default());
// TODO: add publisher
Service {
@ -110,7 +104,7 @@ impl Shim for Service {
let mut resp = DeleteResponse::new();
// sigkill
resp.exit_status = 137;
resp.exited_at = SingularPtrField::some(get_timestamp()?);
resp.exited_at = SingularPtrField::some(timestamp()?);
Ok(resp)
}
@ -118,19 +112,24 @@ impl Shim for Service {
self.exit.wait().await;
}
async fn create_task_service(&self) -> Self::T {
async fn create_task_service(&self, publisher: RemotePublisher) -> Self::T {
let (tx, rx) = channel(128);
let exit_clone = self.exit.clone();
let task = TaskService::new(&*self.namespace, exit_clone);
let task = TaskService::new(&*self.namespace, exit_clone, tx.clone());
let s = monitor_subscribe(Topic::Pid)
.await
.expect("monitor subscribe failed");
process_exits(s, &task).await;
process_exits(s, &task, tx).await;
forward(publisher, self.namespace.to_string(), rx).await;
task
}
}
async fn process_exits(s: Subscription, task: &TaskService<RuncFactory, RuncContainer>) {
async fn process_exits(
s: Subscription,
task: &TaskService<RuncFactory, RuncContainer>,
tx: Sender<(String, Box<dyn Message>)>,
) {
let containers = task.containers.clone();
let mut s = s;
tokio::spawn(async move {
@ -150,7 +149,27 @@ async fn process_exits(s: Subscription, task: &TaskService<RuncFactory, RuncCont
}
// set exit for init process
cont.init.set_exited(exit_code).await;
// TODO: publish event
// publish event
let (_, code, exited_at) = match cont.get_exit_info(None).await {
Ok(info) => info,
Err(_) => break,
};
let ts = convert_to_timestamp(exited_at);
let event = TaskExit {
container_id: cont.id.to_string(),
id: cont.id.to_string(),
pid: cont.pid().await as u32,
exit_status: code as u32,
exited_at: SingularPtrField::some(ts),
..Default::default()
};
let topic = event.topic();
tx.send((topic.to_string(), Box::new(event)))
.await
.unwrap_or_else(|e| warn!("send {} to publisher: {}", topic, e));
break;
}
@ -169,3 +188,18 @@ async fn process_exits(s: Subscription, task: &TaskService<RuncFactory, RuncCont
monitor_unsubscribe(s.id).await.unwrap_or_default();
});
}
async fn forward(
publisher: RemotePublisher,
ns: String,
mut rx: Receiver<(String, Box<dyn Message>)>,
) {
tokio::spawn(async move {
while let Some((topic, e)) = rx.recv().await {
publisher
.publish(Context::default(), &topic, &ns, e)
.await
.unwrap_or_else(|e| warn!("publish {} to containerd: {}", topic, e));
}
});
}

View File

@ -38,12 +38,12 @@ use containerd_shim::asynchronous::monitor::{
monitor_subscribe, monitor_unsubscribe, Subscription,
};
use containerd_shim::asynchronous::processes::{ProcessLifecycle, ProcessTemplate};
use containerd_shim::asynchronous::util::{
asyncify, mkdir, mount_rootfs, read_file_to_str, read_spec, write_options, write_runtime,
};
use containerd_shim::io::Stdio;
use containerd_shim::monitor::{ExitEvent, Subject, Topic};
use containerd_shim::protos::protobuf::{CodedInputStream, Message};
use containerd_shim::util::{
asyncify, mkdir, mount_rootfs, read_file_to_str, read_spec, write_options, write_runtime,
};
use containerd_shim::Console;
use containerd_shim::{io_error, other, Error};
use containerd_shim::{other_error, Result};

View File

@ -32,8 +32,7 @@ use shim::error::{Error, Result};
use shim::io::Stdio;
use shim::ioctl_set_winsz;
use shim::protos::cgroups::metrics::Metrics;
use shim::protos::protobuf::well_known_types::Timestamp;
use shim::util::read_pid_from_file;
use shim::util::{convert_to_timestamp, read_pid_from_file};
use shim::Console;
use shim::{io_error, other, other_error};
@ -71,13 +70,14 @@ pub trait Container {
fn kill(&mut self, exec_id: Option<&str>, signal: u32, all: bool) -> Result<()>;
fn wait_channel(&mut self, exec_id: Option<&str>) -> Result<Receiver<i8>>;
fn get_exit_info(&self, exec_id: Option<&str>) -> Result<(i32, i32, Option<OffsetDateTime>)>;
fn delete(&mut self, exec_id_opt: Option<&str>) -> Result<(i32, u32, Timestamp)>;
fn delete(&mut self, exec_id_opt: Option<&str>) -> Result<(i32, i32, Option<OffsetDateTime>)>;
fn exec(&mut self, req: ExecProcessRequest) -> Result<()>;
fn resize_pty(&mut self, exec_id: Option<&str>, height: u32, width: u32) -> Result<()>;
fn pid(&self) -> i32;
fn stats(&self) -> Result<Metrics>;
fn update(&mut self, resources: &LinuxResources) -> Result<()>;
fn pids(&self) -> Result<PidsResponse>;
fn id(&self) -> String;
}
pub struct CommonContainer<T, E> {
@ -228,12 +228,8 @@ impl Process for CommonProcess {
resp.stdout = self.stdio.stdout.to_string();
resp.stderr = self.stdio.stderr.to_string();
resp.exit_status = self.exit_code as u32;
if let Some(exit_at) = self.exited_at {
let mut time_stamp = Timestamp::new();
time_stamp.set_seconds(exit_at.unix_timestamp());
time_stamp.set_nanos(exit_at.nanosecond() as i32);
resp.set_exited_at(time_stamp);
}
let ts = convert_to_timestamp(self.exited_at);
resp.set_exited_at(ts);
resp
}

View File

@ -40,11 +40,9 @@ use shim::monitor::{monitor_subscribe, ExitEvent, Subject, Subscription, Topic};
use shim::mount::mount_rootfs;
use shim::protos::api::ProcessInfo;
use shim::protos::cgroups::metrics::Metrics;
use shim::protos::protobuf::well_known_types::{Any, Timestamp};
use shim::protos::protobuf::{CodedInputStream, Message, RepeatedField};
use shim::protos::shim::oci::ProcessDetails;
#[cfg(not(feature = "async"))]
use shim::util::{read_spec_from_file, write_options, write_runtime, IntoOption};
use shim::util::{convert_to_any, read_spec_from_file, write_options, write_runtime, IntoOption};
use shim::Console;
use shim::{other, other_error};
@ -146,6 +144,7 @@ pub(crate) struct RuncContainer {
impl Container for RuncContainer {
fn start(&mut self, exec_id: Option<&str>) -> Result<i32> {
let id = self.id();
match exec_id {
Some(exec_id) => {
let process = self
@ -189,7 +188,7 @@ impl Container for RuncContainer {
self.common
.init
.runtime
.exec(&self.common.id, &process.spec, Some(&exec_opts))
.exec(&id, &process.spec, Some(&exec_opts))
.map_err(other_error!(e, "failed exec"))?;
if process.common.stdio.terminal {
let console_socket =
@ -201,16 +200,16 @@ impl Container for RuncContainer {
}
process.common.set_pid_from_file(pid_path.as_path())?;
process.common.state = Status::RUNNING;
Ok(process.common.pid())
Ok(process.pid())
}
None => {
self.common
.init
.runtime
.start(self.common.id.as_str())
.start(&id)
.map_err(other_error!(e, "failed start"))?;
self.common.init.common.set_status(Status::RUNNING);
Ok(self.common.init.common.pid())
Ok(self.pid())
}
}
}
@ -231,7 +230,7 @@ impl Container for RuncContainer {
.init
.runtime
.kill(
self.common.id.as_str(),
self.id().as_str(),
signal,
Some(&runc::options::KillOpts { all }),
)
@ -247,8 +246,8 @@ impl Container for RuncContainer {
self.common.get_exit_info(exec_id)
}
fn delete(&mut self, exec_id_opt: Option<&str>) -> Result<(i32, u32, Timestamp)> {
let (pid, code, exit_at) = self
fn delete(&mut self, exec_id_opt: Option<&str>) -> Result<(i32, i32, Option<OffsetDateTime>)> {
let (pid, code, exited_at) = self
.get_exit_info(exec_id_opt)
.map_err(other_error!(e, "failed to get exit info"))?;
match exec_id_opt {
@ -260,7 +259,7 @@ impl Container for RuncContainer {
.init
.runtime
.delete(
self.common.id.as_str(),
self.id().as_str(),
Some(&runc::options::DeleteOpts { force: true }),
)
.or_else(|e| {
@ -273,13 +272,7 @@ impl Container for RuncContainer {
.map_err(other_error!(e, "failed delete"))?;
}
};
let mut time_stamp = Timestamp::new();
if let Some(exit_at) = exit_at {
time_stamp.set_seconds(exit_at.unix_timestamp());
time_stamp.set_nanos(exit_at.nanosecond() as i32);
}
Ok((pid, code as u32, time_stamp))
Ok((pid, code, exited_at))
}
fn exec(&mut self, req: ExecProcessRequest) -> Result<()> {
@ -339,16 +332,7 @@ impl Container for RuncContainer {
exec_id: "".to_string(),
..Default::default()
};
// marshal ProcessDetails to Any
let mut any = Any::new();
let mut data = Vec::new();
details
.write_to_vec(&mut data)
.map_err(other_error!(e, "write ProcessDetails to vec"))?;
any.set_value(data);
any.set_type_url(details.descriptor().full_name().to_string());
p_info.set_info(any);
p_info.set_info(convert_to_any(Box::new(details))?);
break;
}
}
@ -360,6 +344,10 @@ impl Container for RuncContainer {
};
Ok(resp)
}
fn id(&self) -> String {
self.common.id.to_string()
}
}
impl RuncContainer {

View File

@ -18,19 +18,25 @@
use std::env::current_dir;
use std::path::Path;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use containerd_shim as shim;
use containerd_shim::util::{read_options, read_runtime, read_spec_from_file, write_address};
use log::{debug, error};
use runc::options::{DeleteOpts, GlobalOpts, DEFAULT_COMMAND};
use shim::api::*;
use shim::error::{Error, Result};
use shim::event::Event;
use shim::monitor::{monitor_subscribe, Subject, Subscription, Topic};
use shim::protos::protobuf::SingularPtrField;
use shim::protos::events::task::TaskExit;
use shim::protos::protobuf::{Message, SingularPtrField};
use shim::publisher::RemotePublisher;
use shim::util::get_timestamp;
use shim::{debug, error, io_error, other_error, warn};
use shim::{spawn, Config, ExitSignal, Shim, StartOpts};
use shim::util::{
convert_to_timestamp, read_options, read_runtime, read_spec_from_file, timestamp, write_address,
};
use shim::{io_error, other_error, warn};
use shim::{spawn, Config, Context, ExitSignal, Shim, StartOpts};
use crate::common::{create_runc, ShimExecutor, GROUP_LABELS};
use crate::synchronous::container::{Container, Process};
@ -41,15 +47,7 @@ use crate::synchronous::Service;
impl Shim for Service {
type T = ShimTask<RuncFactory, RuncContainer>;
fn new(
_runtime_id: &str,
id: &str,
namespace: &str,
_publisher: RemotePublisher,
_config: &mut Config,
) -> Self {
// TODO: add publisher
fn new(_runtime_id: &str, id: &str, namespace: &str, _config: &mut Config) -> Self {
Service {
exit: Arc::new(ExitSignal::default()),
id: id.to_string(),
@ -100,7 +98,7 @@ impl Shim for Service {
let mut resp = DeleteResponse::new();
// sigkill
resp.exit_status = 137;
resp.exited_at = SingularPtrField::some(get_timestamp()?);
resp.exited_at = SingularPtrField::some(timestamp()?);
Ok(resp)
}
@ -113,18 +111,24 @@ impl Shim for Service {
self.exit.wait();
}
fn create_task_service(&self) -> Self::T {
let task = ShimTask::new(&self.namespace, Arc::clone(&self.exit));
fn create_task_service(&self, publisher: RemotePublisher) -> Self::T {
let (tx, rx) = channel();
let task = ShimTask::new(self.namespace.as_str(), Arc::clone(&self.exit), tx.clone());
let s = monitor_subscribe(Topic::All).expect("monitor subscribe failed");
self.process_exits(s, &task);
self.process_exits(s, &task, tx);
forward(publisher, self.namespace.to_string(), rx);
task
}
}
impl Service {
pub fn process_exits(&self, s: Subscription, task: &ShimTask<RuncFactory, RuncContainer>) {
pub fn process_exits(
&self,
s: Subscription,
task: &ShimTask<RuncFactory, RuncContainer>,
tx: Sender<(String, Box<dyn Message>)>,
) {
let containers = task.containers.clone();
std::thread::spawn(move || {
for e in s.rx.iter() {
@ -134,7 +138,7 @@ impl Service {
for (_k, cont) in containers.lock().unwrap().iter_mut() {
let bundle = cont.common.bundle.to_string();
// pid belongs to container init process
if cont.common.init.common.pid == pid {
if cont.pid() == pid {
// kill all children process if the container has a private PID namespace
if cont.should_kill_all_on_exit(&bundle) {
cont.kill(None, 9, true).unwrap_or_else(|e| {
@ -143,14 +147,33 @@ impl Service {
}
// set exit for init process
cont.common.init.set_exited(exit_code);
// TODO: publish event
// publish event
let (_, code, exited_at) = match cont.get_exit_info(None) {
Ok(info) => info,
Err(_) => break,
};
let ts = convert_to_timestamp(exited_at);
let event = TaskExit {
container_id: cont.id(),
id: cont.id(),
pid: cont.pid() as u32,
exit_status: code as u32,
exited_at: SingularPtrField::some(ts),
..Default::default()
};
let topic = event.topic();
tx.send((topic.to_string(), Box::new(event)))
.unwrap_or_else(|e| warn!("send {} to publisher: {}", topic, e));
break;
}
// pid belongs to container common process
for (_exec_id, p) in cont.common.processes.iter_mut() {
// set exit for exec process
if p.common.pid == pid {
if p.pid() == pid {
p.set_exited(exit_code);
// TODO: publish event
break;
@ -162,3 +185,13 @@ impl Service {
});
}
}
fn forward(publisher: RemotePublisher, ns: String, rx: Receiver<(String, Box<dyn Message>)>) {
std::thread::spawn(move || {
for (topic, e) in rx.iter() {
publisher
.publish(Context::default(), &topic, &ns, e)
.unwrap_or_else(|e| warn!("publish {} to containerd: {}", topic, e));
}
});
}

View File

@ -15,23 +15,27 @@
*/
use std::collections::HashMap;
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex, Once};
use log::{debug, info};
use log::{debug, info, warn};
use oci_spec::runtime::LinuxResources;
use containerd_shim as shim;
use shim::other_error;
use shim::protos::protobuf::well_known_types::{Any, Timestamp};
use shim::api::*;
use shim::event::Event;
use shim::protos::events::task::{
TaskCreate, TaskDelete, TaskExecAdded, TaskExecStarted, TaskIO, TaskStart,
};
use shim::protos::protobuf::{Message, SingularPtrField};
use shim::util::IntoOption;
use shim::Error;
use shim::Task;
use shim::{api::*, ExitSignal};
use shim::{TtrpcContext, TtrpcResult};
use shim::util::{convert_to_any, convert_to_timestamp, IntoOption};
use shim::{other_error, Error, ExitSignal, Task, TtrpcContext, TtrpcResult};
use crate::synchronous::container::{Container, ContainerFactory};
type EventSender = Sender<(String, Box<dyn Message>)>;
pub struct ShimTask<F, C> {
pub containers: Arc<Mutex<HashMap<String, C>>>,
factory: F,
@ -39,23 +43,36 @@ pub struct ShimTask<F, C> {
exit: Arc<ExitSignal>,
/// Prevent multiple shutdown
shutdown: Once,
tx: Arc<Mutex<EventSender>>,
}
impl<F, C> ShimTask<F, C>
where
F: Default,
{
pub fn new(ns: &str, exit: Arc<ExitSignal>) -> Self {
pub fn new(ns: &str, exit: Arc<ExitSignal>, tx: EventSender) -> Self {
Self {
factory: Default::default(),
containers: Arc::new(Mutex::new(Default::default())),
namespace: ns.to_string(),
exit,
shutdown: Once::new(),
tx: Arc::new(Mutex::new(tx)),
}
}
}
impl<F, C> ShimTask<F, C> {
pub fn send_event(&self, event: impl Event) {
let topic = event.topic();
self.tx
.lock()
.unwrap()
.send((topic.to_string(), Box::new(event)))
.unwrap_or_else(|e| warn!("send {} to publisher: {}", topic, e));
}
}
impl<F, C> Task for ShimTask<F, C>
where
F: ContainerFactory<C>,
@ -86,10 +103,29 @@ where
let container = self.factory.create(ns, &req)?;
let mut resp = CreateTaskResponse::new();
resp.pid = container.pid() as u32;
let pid = container.pid() as u32;
resp.pid = pid;
containers.insert(id.to_string(), container);
info!("Create request for {} returns pid {}", id, resp.pid);
self.send_event(TaskCreate {
container_id: req.id.to_string(),
bundle: req.bundle.to_string(),
rootfs: req.rootfs,
io: SingularPtrField::some(TaskIO {
stdin: req.stdin.to_string(),
stdout: req.stdout.to_string(),
stderr: req.stderr.to_string(),
terminal: req.terminal,
unknown_fields: Default::default(),
cached_size: Default::default(),
}),
checkpoint: req.checkpoint.to_string(),
pid,
..Default::default()
});
info!("Create request for {} returns pid {}", id, pid);
Ok(resp)
}
@ -103,6 +139,22 @@ where
let mut resp = StartResponse::new();
resp.pid = pid as u32;
if req.exec_id.is_empty() {
self.send_event(TaskStart {
container_id: req.id.to_string(),
pid: pid as u32,
..Default::default()
});
} else {
self.send_event(TaskExecStarted {
container_id: req.id.to_string(),
exec_id: req.exec_id.to_string(),
pid: pid as u32,
..Default::default()
});
};
info!("Start request for {:?} returns pid {}", req, resp.get_pid());
Ok(resp)
}
@ -113,15 +165,26 @@ where
let container = containers.get_mut(req.get_id()).ok_or_else(|| {
Error::NotFoundError(format!("can not find container by id {}", req.get_id()))
})?;
let id = container.id();
let exec_id_opt = req.get_exec_id().none_if(|x| x.is_empty());
let (pid, exit_status, exited_at) = container.delete(exec_id_opt)?;
if req.get_exec_id().is_empty() {
containers.remove(req.id.as_str());
}
let ts = convert_to_timestamp(exited_at);
self.send_event(TaskDelete {
container_id: id,
pid: pid as u32,
exit_status: exit_status as u32,
exited_at: SingularPtrField::some(ts.clone()),
..Default::default()
});
let mut resp = DeleteResponse::new();
resp.set_exited_at(exited_at);
resp.set_exited_at(ts);
resp.set_pid(pid as u32);
resp.set_exit_status(exit_status);
resp.set_exit_status(exit_status as u32);
info!(
"Delete request for {} {} returns {:?}",
req.get_id(),
@ -158,6 +221,7 @@ where
}
fn exec(&self, _ctx: &TtrpcContext, req: ExecProcessRequest) -> TtrpcResult<Empty> {
let exec_id = req.get_exec_id().to_string();
info!(
"Exec request for id: {} exec_id: {}",
req.get_id(),
@ -168,6 +232,13 @@ where
Error::Other(format!("can not find container by id {}", req.get_id()))
})?;
container.exec(req)?;
self.send_event(TaskExecAdded {
container_id: container.id(),
exec_id,
..Default::default()
});
Ok(Empty::new())
}
@ -235,11 +306,7 @@ where
let (_, code, exited_at) = container.get_exit_info(exec_id)?;
let mut resp = WaitResponse::new();
resp.exit_status = code as u32;
let mut ts = Timestamp::new();
if let Some(ea) = exited_at {
ts.seconds = ea.unix_timestamp();
ts.nanos = ea.nanosecond() as i32;
}
let ts = convert_to_timestamp(exited_at);
resp.exited_at = SingularPtrField::some(ts);
info!("Wait request for {:?} returns {:?}", req, &resp);
Ok(resp)
@ -253,17 +320,8 @@ where
})?;
let stats = container.stats()?;
// marshal to ttrpc Any
let mut any = Any::new();
let mut data = Vec::new();
stats
.write_to_vec(&mut data)
.map_err(other_error!(e, "write stats to vec"))?;
any.set_value(data);
any.set_type_url(stats.descriptor().full_name().to_string());
let mut resp = StatsResponse::new();
resp.set_stats(any);
resp.set_stats(convert_to_any(Box::new(stats))?);
Ok(resp)
}

View File

@ -41,7 +41,7 @@ fn main() {
println!("Sending event");
publisher
.publish(ctx, "/tasks/oom", "default", event)
.publish(ctx, "/tasks/oom", "default", Box::new(event))
.expect("Publish failed");
println!("Done");
@ -70,7 +70,7 @@ async fn main() {
println!("Sending event");
publisher
.publish(ctx, "/tasks/oom", "default", event)
.publish(ctx, "/tasks/oom", "default", Box::new(event))
.await
.expect("Publish failed");

View File

@ -23,7 +23,7 @@ mod skeleton {
use log::info;
use containerd_shim as shim;
use containerd_shim::synchronous::publisher::RemotePublisher;
use shim::synchronous::publisher::RemotePublisher;
use shim::{api, Config, DeleteResponse, ExitSignal, TtrpcContext, TtrpcResult};
#[derive(Clone)]
@ -34,13 +34,7 @@ mod skeleton {
impl shim::Shim for Service {
type T = Service;
fn new(
_runtime_id: &str,
_id: &str,
_namespace: &str,
_publisher: RemotePublisher,
_config: &mut Config,
) -> Self {
fn new(_runtime_id: &str, _id: &str, _namespace: &str, _config: &mut Config) -> Self {
Service {
exit: Arc::new(ExitSignal::default()),
}
@ -60,7 +54,7 @@ mod skeleton {
self.exit.wait();
}
fn create_task_service(&self) -> Self::T {
fn create_task_service(&self, _publisher: RemotePublisher) -> Self::T {
self.clone()
}
}

View File

@ -19,8 +19,8 @@ use std::sync::Arc;
use async_trait::async_trait;
use log::info;
use containerd_shim::asynchronous::publisher::RemotePublisher;
use containerd_shim::asynchronous::{run, spawn, ExitSignal, Shim};
use containerd_shim::publisher::RemotePublisher;
use containerd_shim::{Config, Error, StartOpts, TtrpcResult};
use containerd_shim_protos::api;
use containerd_shim_protos::api::DeleteResponse;
@ -36,13 +36,7 @@ struct Service {
impl Shim for Service {
type T = Service;
async fn new(
_runtime_id: &str,
_id: &str,
_namespace: &str,
_publisher: RemotePublisher,
_config: &mut Config,
) -> Self {
async fn new(_runtime_id: &str, _id: &str, _namespace: &str, _config: &mut Config) -> Self {
Service {
exit: Arc::new(ExitSignal::default()),
}
@ -62,7 +56,7 @@ impl Shim for Service {
self.exit.wait().await;
}
async fn create_task_service(&self) -> Self::T {
async fn create_task_service(&self, _publisher: RemotePublisher) -> Self::T {
self.clone()
}
}

View File

@ -20,8 +20,7 @@ use log::warn;
use tokio::net::{UnixListener, UnixStream};
use uuid::Uuid;
use crate::asynchronous::util::mkdir;
use crate::util::xdg_runtime_dir;
use crate::util::{mkdir, xdg_runtime_dir};
use crate::Error;
use crate::Result;

View File

@ -22,7 +22,6 @@ use time::OffsetDateTime;
use tokio::sync::oneshot::Receiver;
use containerd_shim_protos::api::{CreateTaskRequest, ExecProcessRequest, StateResponse};
use containerd_shim_protos::protobuf::well_known_types::Timestamp;
use crate::asynchronous::processes::Process;
use crate::error::Result;
@ -37,11 +36,15 @@ pub trait Container {
async fn get_exit_info(
&self,
exec_id: Option<&str>,
) -> Result<(i32, u32, Option<OffsetDateTime>)>;
async fn delete(&mut self, exec_id_opt: Option<&str>) -> Result<(i32, u32, Timestamp)>;
) -> Result<(i32, i32, Option<OffsetDateTime>)>;
async fn delete(
&mut self,
exec_id_opt: Option<&str>,
) -> Result<(i32, i32, Option<OffsetDateTime>)>;
async fn exec(&mut self, req: ExecProcessRequest) -> Result<()>;
async fn resize_pty(&mut self, exec_id: Option<&str>, height: u32, width: u32) -> Result<()>;
async fn pid(&self) -> i32;
async fn id(&self) -> String;
}
#[async_trait]
@ -106,32 +109,30 @@ where
async fn get_exit_info(
&self,
exec_id: Option<&str>,
) -> Result<(i32, u32, Option<OffsetDateTime>)> {
) -> Result<(i32, i32, Option<OffsetDateTime>)> {
let process = self.get_process(exec_id)?;
Ok((
process.pid().await,
process.exit_code().await as u32,
process.exit_code().await,
process.exited_at().await,
))
}
async fn delete(&mut self, exec_id_opt: Option<&str>) -> Result<(i32, u32, Timestamp)> {
let (pid, code, exit_at) = self.get_exit_info(exec_id_opt).await?;
let mut timestamp = Timestamp::new();
if let Some(exit_at) = exit_at {
timestamp.set_seconds(exit_at.unix_timestamp());
timestamp.set_nanos(exit_at.nanosecond() as i32);
}
async fn delete(
&mut self,
exec_id_opt: Option<&str>,
) -> Result<(i32, i32, Option<OffsetDateTime>)> {
let (pid, code, exited_at) = self.get_exit_info(exec_id_opt).await?;
let process = self.get_mut_process(exec_id_opt);
match process {
Ok(p) => p.delete().await?,
Err(Error::NotFoundError(_)) => return Ok((pid, code, timestamp)),
Err(Error::NotFoundError(_)) => return Ok((pid, code, exited_at)),
Err(e) => return Err(e),
}
if let Some(exec_id) = exec_id_opt {
self.processes.remove(exec_id);
}
Ok((pid, code, timestamp))
Ok((pid, code, exited_at))
}
async fn exec(&mut self, req: ExecProcessRequest) -> Result<()> {
@ -149,6 +150,10 @@ where
async fn pid(&self) -> i32 {
self.init.pid().await
}
async fn id(&self) -> String {
self.id.to_string()
}
}
impl<T, E, P> ContainerTemplate<T, E, P>

View File

@ -40,9 +40,9 @@ use containerd_shim_protos::ttrpc::r#async::Server;
use crate::asynchronous::monitor::monitor_notify_by_pid;
use crate::asynchronous::publisher::RemotePublisher;
use crate::asynchronous::util::{asyncify, read_file_to_str, write_str_to_file};
use crate::error::Error;
use crate::error::Result;
use crate::util::{asyncify, read_file_to_str, write_str_to_file};
use crate::{
args, logger, parse_sockaddr, reap, socket_address, Config, StartOpts, SOCKET_FD, TTRPC_ADDRESS,
};
@ -69,15 +69,8 @@ pub trait Shim {
/// - `runtime_id`: identifier of the container runtime.
/// - `id`: identifier of the shim/container, passed in from Containerd.
/// - `namespace`: namespace of the shim/container, passed in from Containerd.
/// - `publisher`: publisher to send events to Containerd.
/// - `config`: for the shim to pass back configuration information
async fn new(
runtime_id: &str,
id: &str,
namespace: &str,
publisher: RemotePublisher,
config: &mut Config,
) -> Self;
async fn new(runtime_id: &str, id: &str, namespace: &str, config: &mut Config) -> Self;
/// Start shim will be called by containerd when launching new shim instance.
///
@ -95,7 +88,7 @@ pub trait Shim {
async fn wait(&mut self);
/// Create the task service object asynchronously.
async fn create_task_service(&self) -> Self::T;
async fn create_task_service(&self, publisher: RemotePublisher) -> Self::T;
}
/// Async Shim entry point that must be invoked from tokio `main`.
@ -128,8 +121,6 @@ where
let flags = args::parse(&os_args[1..])?;
let ttrpc_address = env::var(TTRPC_ADDRESS)?;
let publisher = publisher::RemotePublisher::new(&ttrpc_address).await?;
// Create shim instance
let mut config = opts.unwrap_or_else(Config::default);
@ -140,14 +131,7 @@ where
reap::set_subreaper()?;
}
let mut shim = T::new(
runtime_id,
&flags.id,
&flags.namespace,
publisher,
&mut config,
)
.await;
let mut shim = T::new(runtime_id, &flags.id, &flags.namespace, &mut config).await;
match flags.action.as_str() {
"start" => {
@ -187,7 +171,8 @@ where
logger::init(flags.debug)?;
}
let task = shim.create_task_service().await;
let publisher = RemotePublisher::new(&ttrpc_address).await?;
let task = shim.create_task_service(publisher).await;
let task_service = create_task(Arc::new(Box::new(task)));
let mut server = Server::new().register_service(task_service);
server = server.add_listener(SOCKET_FD)?;

View File

@ -23,8 +23,8 @@ use tokio::sync::oneshot::{channel, Receiver, Sender};
use containerd_shim_protos::api::{StateResponse, Status};
use containerd_shim_protos::protobuf::well_known_types::Timestamp;
use crate::asynchronous::util::asyncify;
use crate::io::Stdio;
use crate::util::asyncify;
use crate::Error;
use crate::{ioctl_set_winsz, Console};

View File

@ -26,9 +26,9 @@ use containerd_shim_protos::ttrpc;
use containerd_shim_protos::ttrpc::context::Context;
use containerd_shim_protos::ttrpc::r#async::TtrpcContext;
use crate::asynchronous::util::asyncify;
use crate::error::Result;
use crate::util::{any, connect, timestamp};
use crate::util::asyncify;
use crate::util::{connect, convert_to_any, timestamp};
/// Async Remote publisher connects to containerd's TTRPC endpoint to publish events from shim.
pub struct RemotePublisher {
@ -67,13 +67,13 @@ impl RemotePublisher {
ctx: Context,
topic: &str,
namespace: &str,
event: impl Message,
event: Box<dyn Message>,
) -> Result<()> {
let mut envelope = events::Envelope::new();
envelope.set_topic(topic.to_owned());
envelope.set_namespace(namespace.to_owned());
envelope.set_timestamp(timestamp()?);
envelope.set_event(any(event)?);
envelope.set_event(convert_to_any(event)?);
let mut req = events::ForwardRequest::new();
req.set_envelope(envelope);
@ -163,7 +163,7 @@ mod tests {
let mut msg = TaskOOM::new();
msg.set_container_id("test".to_string());
client
.publish(Context::default(), "/tasks/oom", "ns1", msg)
.publish(Context::default(), "/tasks/oom", "ns1", Box::new(msg))
.await
.unwrap();
match rx.recv().await {

View File

@ -18,12 +18,15 @@ use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use log::{debug, info};
use log::{debug, info, warn};
use tokio::sync::mpsc::Sender;
use tokio::sync::{MappedMutexGuard, Mutex, MutexGuard};
use containerd_shim_protos::api::DeleteResponse;
use containerd_shim_protos::protobuf::well_known_types::Timestamp;
use containerd_shim_protos::protobuf::SingularPtrField;
use containerd_shim_protos::events::task::{
TaskCreate, TaskDelete, TaskExecAdded, TaskExecStarted, TaskIO, TaskStart,
};
use containerd_shim_protos::protobuf::{Message, SingularPtrField};
use containerd_shim_protos::shim_async::Task;
use containerd_shim_protos::ttrpc;
use containerd_shim_protos::ttrpc::r#async::TtrpcContext;
@ -35,9 +38,12 @@ use crate::api::{
};
use crate::asynchronous::container::{Container, ContainerFactory};
use crate::asynchronous::ExitSignal;
use crate::util::AsOption;
use crate::event::Event;
use crate::util::{convert_to_timestamp, AsOption};
use crate::TtrpcResult;
type EventSender = Sender<(String, Box<dyn Message>)>;
/// TaskService is a Task template struct, it is considered a helper struct,
/// which has already implemented `Task` trait, so that users can make it the type `T`
/// parameter of `Service`, and implements their own `ContainerFactory` and `Container`.
@ -46,18 +52,20 @@ pub struct TaskService<F, C> {
pub containers: Arc<Mutex<HashMap<String, C>>>,
pub namespace: String,
pub exit: Arc<ExitSignal>,
pub tx: EventSender,
}
impl<F, C> TaskService<F, C>
where
F: Default,
{
pub fn new(ns: &str, exit: Arc<ExitSignal>) -> Self {
pub fn new(ns: &str, exit: Arc<ExitSignal>, tx: EventSender) -> Self {
Self {
factory: Default::default(),
containers: Arc::new(Mutex::new(Default::default())),
namespace: ns.to_string(),
exit,
tx,
}
}
}
@ -74,6 +82,14 @@ impl<F, C> TaskService<F, C> {
let container = MutexGuard::map(containers, |m| m.get_mut(id).unwrap());
Ok(container)
}
pub async fn send_event(&self, event: impl Event) {
let topic = event.topic();
self.tx
.send((topic.to_string(), Box::new(event)))
.await
.unwrap_or_else(|e| warn!("send {} to publisher: {}", topic, e));
}
}
#[async_trait]
@ -104,9 +120,28 @@ where
let container = self.factory.create(ns, &req).await?;
let mut resp = CreateTaskResponse::new();
resp.pid = container.pid().await as u32;
let pid = container.pid().await as u32;
resp.pid = pid;
containers.insert(id.to_string(), container);
self.send_event(TaskCreate {
container_id: req.id.to_string(),
bundle: req.bundle.to_string(),
rootfs: req.rootfs,
io: SingularPtrField::some(TaskIO {
stdin: req.stdin.to_string(),
stdout: req.stdout.to_string(),
stderr: req.stderr.to_string(),
terminal: req.terminal,
unknown_fields: Default::default(),
cached_size: Default::default(),
}),
checkpoint: req.checkpoint.to_string(),
pid,
..Default::default()
})
.await;
info!("Create request for {} returns pid {}", id, resp.pid);
Ok(resp)
}
@ -118,6 +153,24 @@ where
let mut resp = StartResponse::new();
resp.pid = pid as u32;
if req.exec_id.is_empty() {
self.send_event(TaskStart {
container_id: req.id.to_string(),
pid: pid as u32,
..Default::default()
})
.await;
} else {
self.send_event(TaskExecStarted {
container_id: req.id.to_string(),
exec_id: req.exec_id.to_string(),
pid: pid as u32,
..Default::default()
})
.await;
};
info!("Start request for {:?} returns pid {}", req, resp.get_pid());
Ok(resp)
}
@ -131,16 +184,28 @@ where
format!("can not find container by id {}", req.get_id()),
))
})?;
let id = container.id().await;
let exec_id_opt = req.get_exec_id().as_option();
let (pid, exit_status, exited_at) = container.delete(exec_id_opt).await?;
self.factory.cleanup(&*self.namespace, container).await?;
if req.get_exec_id().is_empty() {
containers.remove(req.get_id());
}
let ts = convert_to_timestamp(exited_at);
self.send_event(TaskDelete {
container_id: id,
pid: pid as u32,
exit_status: exit_status as u32,
exited_at: SingularPtrField::some(ts.clone()),
..Default::default()
})
.await;
let mut resp = DeleteResponse::new();
resp.set_exited_at(exited_at);
resp.set_exited_at(ts);
resp.set_pid(pid as u32);
resp.set_exit_status(exit_status);
resp.set_exit_status(exit_status as u32);
info!(
"Delete request for {} {} returns {:?}",
req.get_id(),
@ -162,8 +227,17 @@ where
async fn exec(&self, _ctx: &TtrpcContext, req: ExecProcessRequest) -> TtrpcResult<Empty> {
info!("Exec request for {:?}", req);
let exec_id = req.get_exec_id().to_string();
let mut container = self.get_container(req.get_id()).await?;
container.exec(req).await?;
self.send_event(TaskExecAdded {
container_id: container.id().await,
exec_id,
..Default::default()
})
.await;
Ok(Empty::new())
}
@ -202,12 +276,8 @@ where
let container = self.get_container(req.get_id()).await?;
let (_, code, exited_at) = container.get_exit_info(exec_id).await?;
let mut resp = WaitResponse::new();
resp.exit_status = code;
let mut ts = Timestamp::new();
if let Some(ea) = exited_at {
ts.seconds = ea.unix_timestamp();
ts.nanos = ea.nanosecond() as i32;
}
resp.exit_status = code as u32;
let ts = convert_to_timestamp(exited_at);
resp.exited_at = SingularPtrField::some(ts);
info!("Wait request for {:?} returns {:?}", req, &resp);
Ok(resp)

View File

@ -142,7 +142,7 @@ pub async fn mkdir(path: impl AsRef<Path>, mode: mode_t) -> Result<()> {
#[cfg(test)]
mod tests {
use crate::asynchronous::util::{read_file_to_str, write_str_to_file};
use crate::util::{read_file_to_str, write_str_to_file};
#[tokio::test]
async fn test_read_write_str() {

66
crates/shim/src/event.rs Normal file
View File

@ -0,0 +1,66 @@
use containerd_shim_protos::events::task::*;
use containerd_shim_protos::protobuf::Message;
pub trait Event: Message {
fn topic(&self) -> String;
}
impl Event for TaskCreate {
fn topic(&self) -> String {
"/tasks/create".to_string()
}
}
impl Event for TaskStart {
fn topic(&self) -> String {
"/tasks/start".to_string()
}
}
impl Event for TaskExecAdded {
fn topic(&self) -> String {
"/tasks/exec-added".to_string()
}
}
impl Event for TaskExecStarted {
fn topic(&self) -> String {
"/tasks/exec-started".to_string()
}
}
impl Event for TaskPaused {
fn topic(&self) -> String {
"/tasks/paused".to_string()
}
}
impl Event for TaskResumed {
fn topic(&self) -> String {
"/tasks/resumed".to_string()
}
}
impl Event for TaskExit {
fn topic(&self) -> String {
"/tasks/exit".to_string()
}
}
impl Event for TaskDelete {
fn topic(&self) -> String {
"/tasks/delete".to_string()
}
}
impl Event for TaskOOM {
fn topic(&self) -> String {
"/tasks/oom".to_string()
}
}
impl Event for TaskCheckpointed {
fn topic(&self) -> String {
"/tasks/checkpointed".to_string()
}
}

View File

@ -56,6 +56,7 @@ pub mod error;
mod args;
#[cfg(feature = "async")]
pub mod asynchronous;
pub mod event;
pub mod io;
mod logger;
pub mod monitor;

View File

@ -103,15 +103,8 @@ pub trait Shim {
/// - `runtime_id`: identifier of the container runtime.
/// - `id`: identifier of the shim/container, passed in from Containerd.
/// - `namespace`: namespace of the shim/container, passed in from Containerd.
/// - `publisher`: publisher to send events to Containerd.
/// - `config`: for the shim to pass back configuration information
fn new(
runtime_id: &str,
id: &str,
namespace: &str,
publisher: RemotePublisher,
config: &mut Config,
) -> Self;
fn new(runtime_id: &str, id: &str, namespace: &str, config: &mut Config) -> Self;
/// Start shim will be called by containerd when launching new shim instance.
///
@ -128,7 +121,7 @@ pub trait Shim {
fn wait(&mut self);
/// Create the task service object.
fn create_task_service(&self) -> Self::T;
fn create_task_service(&self, publisher: RemotePublisher) -> Self::T;
}
/// Shim entry point that must be invoked from `main`.
@ -151,7 +144,6 @@ where
let flags = args::parse(&os_args[1..])?;
let ttrpc_address = env::var(TTRPC_ADDRESS)?;
let publisher = publisher::RemotePublisher::new(&ttrpc_address)?;
// Create shim instance
let mut config = opts.unwrap_or_else(Config::default);
@ -163,13 +155,7 @@ where
reap::set_subreaper()?;
}
let mut shim = T::new(
runtime_id,
&flags.id,
&flags.namespace,
publisher,
&mut config,
);
let mut shim = T::new(runtime_id, &flags.id, &flags.namespace, &mut config);
match flags.action.as_str() {
"start" => {
@ -205,7 +191,8 @@ where
logger::init(flags.debug)?;
}
let task = shim.create_task_service();
let publisher = publisher::RemotePublisher::new(&ttrpc_address)?;
let task = shim.create_task_service(publisher);
let task_service = create_task(Arc::new(Box::new(task)));
let mut server = Server::new().register_service(task_service);
server = server.add_listener(SOCKET_FD)?;

View File

@ -18,15 +18,16 @@
use protobuf::Message;
use containerd_shim_protos as client;
use client::protobuf;
use client::shim::events;
use client::ttrpc::{self, context::Context};
use client::types::empty;
use client::{Client, Events, EventsClient};
use containerd_shim_protos as client;
use crate::error::Result;
use crate::util::{any, connect, timestamp};
use crate::util::{connect, convert_to_any, timestamp};
/// Remote publisher connects to containerd's TTRPC endpoint to publish events from shim.
pub struct RemotePublisher {
@ -59,13 +60,13 @@ impl RemotePublisher {
ctx: Context,
topic: &str,
namespace: &str,
event: impl Message,
event: Box<dyn Message>,
) -> Result<()> {
let mut envelope = events::Envelope::new();
envelope.set_topic(topic.to_owned());
envelope.set_namespace(namespace.to_owned());
envelope.set_timestamp(timestamp()?);
envelope.set_event(any(event)?);
envelope.set_event(convert_to_any(event)?);
let mut req = events::ForwardRequest::new();
req.set_envelope(envelope);
@ -109,12 +110,6 @@ mod tests {
}
}
#[test]
fn test_timestamp() {
let ts = timestamp().unwrap();
assert!(ts.seconds > 0);
}
#[test]
fn test_connect() {
let tmpdir = tempfile::tempdir().unwrap();
@ -149,7 +144,7 @@ mod tests {
let mut msg = TaskOOM::new();
msg.set_container_id("test".to_string());
client
.publish(Context::default(), "/tasks/oom", "ns1", msg)
.publish(Context::default(), "/tasks/oom", "ns1", Box::new(msg))
.unwrap();
barrier.wait();

View File

@ -19,6 +19,7 @@ use std::os::unix::io::RawFd;
use std::time::{SystemTime, UNIX_EPOCH};
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use crate::api::Options;
#[cfg(feature = "async")]
@ -95,14 +96,6 @@ impl From<JsonOptions> for Options {
}
}
pub fn get_timestamp() -> Result<Timestamp> {
let mut timestamp = Timestamp::new();
let now = SystemTime::now().duration_since(UNIX_EPOCH)?;
timestamp.set_seconds(now.as_secs() as i64);
timestamp.set_nanos(now.subsec_nanos() as i32);
Ok(timestamp)
}
pub fn connect(address: impl AsRef<str>) -> Result<RawFd> {
use nix::sys::socket::*;
use nix::unistd::close;
@ -148,10 +141,22 @@ pub fn timestamp() -> Result<Timestamp> {
Ok(ts)
}
pub fn any(event: impl Message) -> Result<Any> {
let data = event.write_to_bytes()?;
pub fn convert_to_timestamp(exited_at: Option<OffsetDateTime>) -> Timestamp {
let mut ts = Timestamp::new();
if let Some(ea) = exited_at {
ts.seconds = ea.unix_timestamp();
ts.nanos = ea.nanosecond() as i32;
}
ts
}
pub fn convert_to_any(obj: Box<dyn Message>) -> Result<Any> {
let mut data = Vec::new();
obj.write_to_vec(&mut data)?;
let mut any = Any::new();
any.merge_from_bytes(&data)?;
any.set_value(data);
any.set_type_url(obj.descriptor().full_name().to_string());
Ok(any)
}
@ -194,3 +199,14 @@ impl AsOption for str {
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_timestamp() {
let ts = timestamp().unwrap();
assert!(ts.seconds > 0);
}
}