diff --git a/crates/runc-shim/src/asynchronous/mod.rs b/crates/runc-shim/src/asynchronous/mod.rs index 0bfb04e..94d6949 100644 --- a/crates/runc-shim/src/asynchronous/mod.rs +++ b/crates/runc-shim/src/asynchronous/mod.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use async_trait::async_trait; use log::{debug, error, warn}; +use tokio::sync::mpsc::{channel, Receiver, Sender}; use ::runc::options::DeleteOpts; use containerd_shim::asynchronous::container::Container; @@ -28,15 +29,14 @@ use containerd_shim::asynchronous::monitor::{ use containerd_shim::asynchronous::processes::Process; use containerd_shim::asynchronous::publisher::RemotePublisher; use containerd_shim::asynchronous::task::TaskService; -use containerd_shim::asynchronous::util::{ - read_options, read_runtime, read_spec, write_str_to_file, -}; use containerd_shim::asynchronous::{spawn, ExitSignal, Shim}; +use containerd_shim::event::Event; use containerd_shim::monitor::{Subject, Topic}; -use containerd_shim::protos::protobuf::SingularPtrField; -use containerd_shim::util::get_timestamp; -use containerd_shim::Error; -use containerd_shim::{io_error, Config, DeleteResponse, StartOpts}; +use containerd_shim::protos::events::task::TaskExit; +use containerd_shim::protos::protobuf::{Message, SingularPtrField}; +use containerd_shim::util::{convert_to_timestamp, timestamp}; +use containerd_shim::util::{read_options, read_runtime, read_spec, write_str_to_file}; +use containerd_shim::{io_error, Config, Context, DeleteResponse, Error, StartOpts}; use crate::asynchronous::runc::{RuncContainer, RuncFactory}; use crate::common::create_runc; @@ -54,13 +54,7 @@ pub(crate) struct Service { impl Shim for Service { type T = TaskService; - async fn new( - _runtime_id: &str, - id: &str, - namespace: &str, - _publisher: RemotePublisher, - _config: &mut Config, - ) -> Self { + async fn new(_runtime_id: &str, id: &str, namespace: &str, _config: &mut Config) -> Self { let exit = Arc::new(ExitSignal::default()); // TODO: add publisher Service { @@ -110,7 +104,7 @@ impl Shim for Service { let mut resp = DeleteResponse::new(); // sigkill resp.exit_status = 137; - resp.exited_at = SingularPtrField::some(get_timestamp()?); + resp.exited_at = SingularPtrField::some(timestamp()?); Ok(resp) } @@ -118,19 +112,24 @@ impl Shim for Service { self.exit.wait().await; } - async fn create_task_service(&self) -> Self::T { + async fn create_task_service(&self, publisher: RemotePublisher) -> Self::T { + let (tx, rx) = channel(128); let exit_clone = self.exit.clone(); - let task = TaskService::new(&*self.namespace, exit_clone); + let task = TaskService::new(&*self.namespace, exit_clone, tx.clone()); let s = monitor_subscribe(Topic::Pid) .await .expect("monitor subscribe failed"); - process_exits(s, &task).await; - + process_exits(s, &task, tx).await; + forward(publisher, self.namespace.to_string(), rx).await; task } } -async fn process_exits(s: Subscription, task: &TaskService) { +async fn process_exits( + s: Subscription, + task: &TaskService, + tx: Sender<(String, Box)>, +) { let containers = task.containers.clone(); let mut s = s; tokio::spawn(async move { @@ -150,7 +149,27 @@ async fn process_exits(s: Subscription, task: &TaskService info, + Err(_) => break, + }; + + let ts = convert_to_timestamp(exited_at); + let event = TaskExit { + container_id: cont.id.to_string(), + id: cont.id.to_string(), + pid: cont.pid().await as u32, + exit_status: code as u32, + exited_at: SingularPtrField::some(ts), + ..Default::default() + }; + let topic = event.topic(); + tx.send((topic.to_string(), Box::new(event))) + .await + .unwrap_or_else(|e| warn!("send {} to publisher: {}", topic, e)); + break; } @@ -169,3 +188,18 @@ async fn process_exits(s: Subscription, task: &TaskService)>, +) { + tokio::spawn(async move { + while let Some((topic, e)) = rx.recv().await { + publisher + .publish(Context::default(), &topic, &ns, e) + .await + .unwrap_or_else(|e| warn!("publish {} to containerd: {}", topic, e)); + } + }); +} diff --git a/crates/runc-shim/src/asynchronous/runc.rs b/crates/runc-shim/src/asynchronous/runc.rs index 00efc9a..ba21c45 100644 --- a/crates/runc-shim/src/asynchronous/runc.rs +++ b/crates/runc-shim/src/asynchronous/runc.rs @@ -38,12 +38,12 @@ use containerd_shim::asynchronous::monitor::{ monitor_subscribe, monitor_unsubscribe, Subscription, }; use containerd_shim::asynchronous::processes::{ProcessLifecycle, ProcessTemplate}; -use containerd_shim::asynchronous::util::{ - asyncify, mkdir, mount_rootfs, read_file_to_str, read_spec, write_options, write_runtime, -}; use containerd_shim::io::Stdio; use containerd_shim::monitor::{ExitEvent, Subject, Topic}; use containerd_shim::protos::protobuf::{CodedInputStream, Message}; +use containerd_shim::util::{ + asyncify, mkdir, mount_rootfs, read_file_to_str, read_spec, write_options, write_runtime, +}; use containerd_shim::Console; use containerd_shim::{io_error, other, Error}; use containerd_shim::{other_error, Result}; diff --git a/crates/runc-shim/src/synchronous/container.rs b/crates/runc-shim/src/synchronous/container.rs index 4e37dec..02c4f17 100644 --- a/crates/runc-shim/src/synchronous/container.rs +++ b/crates/runc-shim/src/synchronous/container.rs @@ -32,8 +32,7 @@ use shim::error::{Error, Result}; use shim::io::Stdio; use shim::ioctl_set_winsz; use shim::protos::cgroups::metrics::Metrics; -use shim::protos::protobuf::well_known_types::Timestamp; -use shim::util::read_pid_from_file; +use shim::util::{convert_to_timestamp, read_pid_from_file}; use shim::Console; use shim::{io_error, other, other_error}; @@ -71,13 +70,14 @@ pub trait Container { fn kill(&mut self, exec_id: Option<&str>, signal: u32, all: bool) -> Result<()>; fn wait_channel(&mut self, exec_id: Option<&str>) -> Result>; fn get_exit_info(&self, exec_id: Option<&str>) -> Result<(i32, i32, Option)>; - fn delete(&mut self, exec_id_opt: Option<&str>) -> Result<(i32, u32, Timestamp)>; + fn delete(&mut self, exec_id_opt: Option<&str>) -> Result<(i32, i32, Option)>; fn exec(&mut self, req: ExecProcessRequest) -> Result<()>; fn resize_pty(&mut self, exec_id: Option<&str>, height: u32, width: u32) -> Result<()>; fn pid(&self) -> i32; fn stats(&self) -> Result; fn update(&mut self, resources: &LinuxResources) -> Result<()>; fn pids(&self) -> Result; + fn id(&self) -> String; } pub struct CommonContainer { @@ -228,12 +228,8 @@ impl Process for CommonProcess { resp.stdout = self.stdio.stdout.to_string(); resp.stderr = self.stdio.stderr.to_string(); resp.exit_status = self.exit_code as u32; - if let Some(exit_at) = self.exited_at { - let mut time_stamp = Timestamp::new(); - time_stamp.set_seconds(exit_at.unix_timestamp()); - time_stamp.set_nanos(exit_at.nanosecond() as i32); - resp.set_exited_at(time_stamp); - } + let ts = convert_to_timestamp(self.exited_at); + resp.set_exited_at(ts); resp } diff --git a/crates/runc-shim/src/synchronous/runc.rs b/crates/runc-shim/src/synchronous/runc.rs index 38f84f9..7afae5a 100644 --- a/crates/runc-shim/src/synchronous/runc.rs +++ b/crates/runc-shim/src/synchronous/runc.rs @@ -40,11 +40,9 @@ use shim::monitor::{monitor_subscribe, ExitEvent, Subject, Subscription, Topic}; use shim::mount::mount_rootfs; use shim::protos::api::ProcessInfo; use shim::protos::cgroups::metrics::Metrics; -use shim::protos::protobuf::well_known_types::{Any, Timestamp}; use shim::protos::protobuf::{CodedInputStream, Message, RepeatedField}; use shim::protos::shim::oci::ProcessDetails; -#[cfg(not(feature = "async"))] -use shim::util::{read_spec_from_file, write_options, write_runtime, IntoOption}; +use shim::util::{convert_to_any, read_spec_from_file, write_options, write_runtime, IntoOption}; use shim::Console; use shim::{other, other_error}; @@ -146,6 +144,7 @@ pub(crate) struct RuncContainer { impl Container for RuncContainer { fn start(&mut self, exec_id: Option<&str>) -> Result { + let id = self.id(); match exec_id { Some(exec_id) => { let process = self @@ -189,7 +188,7 @@ impl Container for RuncContainer { self.common .init .runtime - .exec(&self.common.id, &process.spec, Some(&exec_opts)) + .exec(&id, &process.spec, Some(&exec_opts)) .map_err(other_error!(e, "failed exec"))?; if process.common.stdio.terminal { let console_socket = @@ -201,16 +200,16 @@ impl Container for RuncContainer { } process.common.set_pid_from_file(pid_path.as_path())?; process.common.state = Status::RUNNING; - Ok(process.common.pid()) + Ok(process.pid()) } None => { self.common .init .runtime - .start(self.common.id.as_str()) + .start(&id) .map_err(other_error!(e, "failed start"))?; self.common.init.common.set_status(Status::RUNNING); - Ok(self.common.init.common.pid()) + Ok(self.pid()) } } } @@ -231,7 +230,7 @@ impl Container for RuncContainer { .init .runtime .kill( - self.common.id.as_str(), + self.id().as_str(), signal, Some(&runc::options::KillOpts { all }), ) @@ -247,8 +246,8 @@ impl Container for RuncContainer { self.common.get_exit_info(exec_id) } - fn delete(&mut self, exec_id_opt: Option<&str>) -> Result<(i32, u32, Timestamp)> { - let (pid, code, exit_at) = self + fn delete(&mut self, exec_id_opt: Option<&str>) -> Result<(i32, i32, Option)> { + let (pid, code, exited_at) = self .get_exit_info(exec_id_opt) .map_err(other_error!(e, "failed to get exit info"))?; match exec_id_opt { @@ -260,7 +259,7 @@ impl Container for RuncContainer { .init .runtime .delete( - self.common.id.as_str(), + self.id().as_str(), Some(&runc::options::DeleteOpts { force: true }), ) .or_else(|e| { @@ -273,13 +272,7 @@ impl Container for RuncContainer { .map_err(other_error!(e, "failed delete"))?; } }; - - let mut time_stamp = Timestamp::new(); - if let Some(exit_at) = exit_at { - time_stamp.set_seconds(exit_at.unix_timestamp()); - time_stamp.set_nanos(exit_at.nanosecond() as i32); - } - Ok((pid, code as u32, time_stamp)) + Ok((pid, code, exited_at)) } fn exec(&mut self, req: ExecProcessRequest) -> Result<()> { @@ -339,16 +332,7 @@ impl Container for RuncContainer { exec_id: "".to_string(), ..Default::default() }; - // marshal ProcessDetails to Any - let mut any = Any::new(); - let mut data = Vec::new(); - details - .write_to_vec(&mut data) - .map_err(other_error!(e, "write ProcessDetails to vec"))?; - any.set_value(data); - any.set_type_url(details.descriptor().full_name().to_string()); - - p_info.set_info(any); + p_info.set_info(convert_to_any(Box::new(details))?); break; } } @@ -360,6 +344,10 @@ impl Container for RuncContainer { }; Ok(resp) } + + fn id(&self) -> String { + self.common.id.to_string() + } } impl RuncContainer { diff --git a/crates/runc-shim/src/synchronous/service.rs b/crates/runc-shim/src/synchronous/service.rs index 2e4dd6c..03078b2 100644 --- a/crates/runc-shim/src/synchronous/service.rs +++ b/crates/runc-shim/src/synchronous/service.rs @@ -18,19 +18,25 @@ use std::env::current_dir; use std::path::Path; +use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::Arc; use containerd_shim as shim; -use containerd_shim::util::{read_options, read_runtime, read_spec_from_file, write_address}; + +use log::{debug, error}; use runc::options::{DeleteOpts, GlobalOpts, DEFAULT_COMMAND}; use shim::api::*; use shim::error::{Error, Result}; +use shim::event::Event; use shim::monitor::{monitor_subscribe, Subject, Subscription, Topic}; -use shim::protos::protobuf::SingularPtrField; +use shim::protos::events::task::TaskExit; +use shim::protos::protobuf::{Message, SingularPtrField}; use shim::publisher::RemotePublisher; -use shim::util::get_timestamp; -use shim::{debug, error, io_error, other_error, warn}; -use shim::{spawn, Config, ExitSignal, Shim, StartOpts}; +use shim::util::{ + convert_to_timestamp, read_options, read_runtime, read_spec_from_file, timestamp, write_address, +}; +use shim::{io_error, other_error, warn}; +use shim::{spawn, Config, Context, ExitSignal, Shim, StartOpts}; use crate::common::{create_runc, ShimExecutor, GROUP_LABELS}; use crate::synchronous::container::{Container, Process}; @@ -41,15 +47,7 @@ use crate::synchronous::Service; impl Shim for Service { type T = ShimTask; - fn new( - _runtime_id: &str, - id: &str, - namespace: &str, - _publisher: RemotePublisher, - _config: &mut Config, - ) -> Self { - // TODO: add publisher - + fn new(_runtime_id: &str, id: &str, namespace: &str, _config: &mut Config) -> Self { Service { exit: Arc::new(ExitSignal::default()), id: id.to_string(), @@ -100,7 +98,7 @@ impl Shim for Service { let mut resp = DeleteResponse::new(); // sigkill resp.exit_status = 137; - resp.exited_at = SingularPtrField::some(get_timestamp()?); + resp.exited_at = SingularPtrField::some(timestamp()?); Ok(resp) } @@ -113,18 +111,24 @@ impl Shim for Service { self.exit.wait(); } - fn create_task_service(&self) -> Self::T { - let task = ShimTask::new(&self.namespace, Arc::clone(&self.exit)); + fn create_task_service(&self, publisher: RemotePublisher) -> Self::T { + let (tx, rx) = channel(); + let task = ShimTask::new(self.namespace.as_str(), Arc::clone(&self.exit), tx.clone()); let s = monitor_subscribe(Topic::All).expect("monitor subscribe failed"); - self.process_exits(s, &task); - + self.process_exits(s, &task, tx); + forward(publisher, self.namespace.to_string(), rx); task } } impl Service { - pub fn process_exits(&self, s: Subscription, task: &ShimTask) { + pub fn process_exits( + &self, + s: Subscription, + task: &ShimTask, + tx: Sender<(String, Box)>, + ) { let containers = task.containers.clone(); std::thread::spawn(move || { for e in s.rx.iter() { @@ -134,7 +138,7 @@ impl Service { for (_k, cont) in containers.lock().unwrap().iter_mut() { let bundle = cont.common.bundle.to_string(); // pid belongs to container init process - if cont.common.init.common.pid == pid { + if cont.pid() == pid { // kill all children process if the container has a private PID namespace if cont.should_kill_all_on_exit(&bundle) { cont.kill(None, 9, true).unwrap_or_else(|e| { @@ -143,14 +147,33 @@ impl Service { } // set exit for init process cont.common.init.set_exited(exit_code); - // TODO: publish event + + // publish event + let (_, code, exited_at) = match cont.get_exit_info(None) { + Ok(info) => info, + Err(_) => break, + }; + + let ts = convert_to_timestamp(exited_at); + let event = TaskExit { + container_id: cont.id(), + id: cont.id(), + pid: cont.pid() as u32, + exit_status: code as u32, + exited_at: SingularPtrField::some(ts), + ..Default::default() + }; + let topic = event.topic(); + tx.send((topic.to_string(), Box::new(event))) + .unwrap_or_else(|e| warn!("send {} to publisher: {}", topic, e)); + break; } // pid belongs to container common process for (_exec_id, p) in cont.common.processes.iter_mut() { // set exit for exec process - if p.common.pid == pid { + if p.pid() == pid { p.set_exited(exit_code); // TODO: publish event break; @@ -162,3 +185,13 @@ impl Service { }); } } + +fn forward(publisher: RemotePublisher, ns: String, rx: Receiver<(String, Box)>) { + std::thread::spawn(move || { + for (topic, e) in rx.iter() { + publisher + .publish(Context::default(), &topic, &ns, e) + .unwrap_or_else(|e| warn!("publish {} to containerd: {}", topic, e)); + } + }); +} diff --git a/crates/runc-shim/src/synchronous/task.rs b/crates/runc-shim/src/synchronous/task.rs index 626a7f5..9e34e0c 100644 --- a/crates/runc-shim/src/synchronous/task.rs +++ b/crates/runc-shim/src/synchronous/task.rs @@ -15,23 +15,27 @@ */ use std::collections::HashMap; +use std::sync::mpsc::Sender; use std::sync::{Arc, Mutex, Once}; -use log::{debug, info}; +use log::{debug, info, warn}; use oci_spec::runtime::LinuxResources; use containerd_shim as shim; -use shim::other_error; -use shim::protos::protobuf::well_known_types::{Any, Timestamp}; + +use shim::api::*; +use shim::event::Event; +use shim::protos::events::task::{ + TaskCreate, TaskDelete, TaskExecAdded, TaskExecStarted, TaskIO, TaskStart, +}; use shim::protos::protobuf::{Message, SingularPtrField}; -use shim::util::IntoOption; -use shim::Error; -use shim::Task; -use shim::{api::*, ExitSignal}; -use shim::{TtrpcContext, TtrpcResult}; +use shim::util::{convert_to_any, convert_to_timestamp, IntoOption}; +use shim::{other_error, Error, ExitSignal, Task, TtrpcContext, TtrpcResult}; use crate::synchronous::container::{Container, ContainerFactory}; +type EventSender = Sender<(String, Box)>; + pub struct ShimTask { pub containers: Arc>>, factory: F, @@ -39,23 +43,36 @@ pub struct ShimTask { exit: Arc, /// Prevent multiple shutdown shutdown: Once, + tx: Arc>, } impl ShimTask where F: Default, { - pub fn new(ns: &str, exit: Arc) -> Self { + pub fn new(ns: &str, exit: Arc, tx: EventSender) -> Self { Self { factory: Default::default(), containers: Arc::new(Mutex::new(Default::default())), namespace: ns.to_string(), exit, shutdown: Once::new(), + tx: Arc::new(Mutex::new(tx)), } } } +impl ShimTask { + pub fn send_event(&self, event: impl Event) { + let topic = event.topic(); + self.tx + .lock() + .unwrap() + .send((topic.to_string(), Box::new(event))) + .unwrap_or_else(|e| warn!("send {} to publisher: {}", topic, e)); + } +} + impl Task for ShimTask where F: ContainerFactory, @@ -86,10 +103,29 @@ where let container = self.factory.create(ns, &req)?; let mut resp = CreateTaskResponse::new(); - resp.pid = container.pid() as u32; + let pid = container.pid() as u32; + resp.pid = pid; containers.insert(id.to_string(), container); - info!("Create request for {} returns pid {}", id, resp.pid); + + self.send_event(TaskCreate { + container_id: req.id.to_string(), + bundle: req.bundle.to_string(), + rootfs: req.rootfs, + io: SingularPtrField::some(TaskIO { + stdin: req.stdin.to_string(), + stdout: req.stdout.to_string(), + stderr: req.stderr.to_string(), + terminal: req.terminal, + unknown_fields: Default::default(), + cached_size: Default::default(), + }), + checkpoint: req.checkpoint.to_string(), + pid, + ..Default::default() + }); + + info!("Create request for {} returns pid {}", id, pid); Ok(resp) } @@ -103,6 +139,22 @@ where let mut resp = StartResponse::new(); resp.pid = pid as u32; + + if req.exec_id.is_empty() { + self.send_event(TaskStart { + container_id: req.id.to_string(), + pid: pid as u32, + ..Default::default() + }); + } else { + self.send_event(TaskExecStarted { + container_id: req.id.to_string(), + exec_id: req.exec_id.to_string(), + pid: pid as u32, + ..Default::default() + }); + }; + info!("Start request for {:?} returns pid {}", req, resp.get_pid()); Ok(resp) } @@ -113,15 +165,26 @@ where let container = containers.get_mut(req.get_id()).ok_or_else(|| { Error::NotFoundError(format!("can not find container by id {}", req.get_id())) })?; + let id = container.id(); let exec_id_opt = req.get_exec_id().none_if(|x| x.is_empty()); let (pid, exit_status, exited_at) = container.delete(exec_id_opt)?; if req.get_exec_id().is_empty() { containers.remove(req.id.as_str()); } + + let ts = convert_to_timestamp(exited_at); + self.send_event(TaskDelete { + container_id: id, + pid: pid as u32, + exit_status: exit_status as u32, + exited_at: SingularPtrField::some(ts.clone()), + ..Default::default() + }); + let mut resp = DeleteResponse::new(); - resp.set_exited_at(exited_at); + resp.set_exited_at(ts); resp.set_pid(pid as u32); - resp.set_exit_status(exit_status); + resp.set_exit_status(exit_status as u32); info!( "Delete request for {} {} returns {:?}", req.get_id(), @@ -158,6 +221,7 @@ where } fn exec(&self, _ctx: &TtrpcContext, req: ExecProcessRequest) -> TtrpcResult { + let exec_id = req.get_exec_id().to_string(); info!( "Exec request for id: {} exec_id: {}", req.get_id(), @@ -168,6 +232,13 @@ where Error::Other(format!("can not find container by id {}", req.get_id())) })?; container.exec(req)?; + + self.send_event(TaskExecAdded { + container_id: container.id(), + exec_id, + ..Default::default() + }); + Ok(Empty::new()) } @@ -235,11 +306,7 @@ where let (_, code, exited_at) = container.get_exit_info(exec_id)?; let mut resp = WaitResponse::new(); resp.exit_status = code as u32; - let mut ts = Timestamp::new(); - if let Some(ea) = exited_at { - ts.seconds = ea.unix_timestamp(); - ts.nanos = ea.nanosecond() as i32; - } + let ts = convert_to_timestamp(exited_at); resp.exited_at = SingularPtrField::some(ts); info!("Wait request for {:?} returns {:?}", req, &resp); Ok(resp) @@ -253,17 +320,8 @@ where })?; let stats = container.stats()?; - // marshal to ttrpc Any - let mut any = Any::new(); - let mut data = Vec::new(); - stats - .write_to_vec(&mut data) - .map_err(other_error!(e, "write stats to vec"))?; - any.set_value(data); - any.set_type_url(stats.descriptor().full_name().to_string()); - let mut resp = StatsResponse::new(); - resp.set_stats(any); + resp.set_stats(convert_to_any(Box::new(stats))?); Ok(resp) } diff --git a/crates/shim/examples/publish.rs b/crates/shim/examples/publish.rs index 8f7a85c..db3c370 100644 --- a/crates/shim/examples/publish.rs +++ b/crates/shim/examples/publish.rs @@ -41,7 +41,7 @@ fn main() { println!("Sending event"); publisher - .publish(ctx, "/tasks/oom", "default", event) + .publish(ctx, "/tasks/oom", "default", Box::new(event)) .expect("Publish failed"); println!("Done"); @@ -70,7 +70,7 @@ async fn main() { println!("Sending event"); publisher - .publish(ctx, "/tasks/oom", "default", event) + .publish(ctx, "/tasks/oom", "default", Box::new(event)) .await .expect("Publish failed"); diff --git a/crates/shim/examples/skeleton.rs b/crates/shim/examples/skeleton.rs index 7072e8b..ceb1959 100644 --- a/crates/shim/examples/skeleton.rs +++ b/crates/shim/examples/skeleton.rs @@ -23,7 +23,7 @@ mod skeleton { use log::info; use containerd_shim as shim; - use containerd_shim::synchronous::publisher::RemotePublisher; + use shim::synchronous::publisher::RemotePublisher; use shim::{api, Config, DeleteResponse, ExitSignal, TtrpcContext, TtrpcResult}; #[derive(Clone)] @@ -34,13 +34,7 @@ mod skeleton { impl shim::Shim for Service { type T = Service; - fn new( - _runtime_id: &str, - _id: &str, - _namespace: &str, - _publisher: RemotePublisher, - _config: &mut Config, - ) -> Self { + fn new(_runtime_id: &str, _id: &str, _namespace: &str, _config: &mut Config) -> Self { Service { exit: Arc::new(ExitSignal::default()), } @@ -60,7 +54,7 @@ mod skeleton { self.exit.wait(); } - fn create_task_service(&self) -> Self::T { + fn create_task_service(&self, _publisher: RemotePublisher) -> Self::T { self.clone() } } diff --git a/crates/shim/examples/skeleton_async.rs b/crates/shim/examples/skeleton_async.rs index 8f60491..7f54fc4 100644 --- a/crates/shim/examples/skeleton_async.rs +++ b/crates/shim/examples/skeleton_async.rs @@ -19,8 +19,8 @@ use std::sync::Arc; use async_trait::async_trait; use log::info; -use containerd_shim::asynchronous::publisher::RemotePublisher; use containerd_shim::asynchronous::{run, spawn, ExitSignal, Shim}; +use containerd_shim::publisher::RemotePublisher; use containerd_shim::{Config, Error, StartOpts, TtrpcResult}; use containerd_shim_protos::api; use containerd_shim_protos::api::DeleteResponse; @@ -36,13 +36,7 @@ struct Service { impl Shim for Service { type T = Service; - async fn new( - _runtime_id: &str, - _id: &str, - _namespace: &str, - _publisher: RemotePublisher, - _config: &mut Config, - ) -> Self { + async fn new(_runtime_id: &str, _id: &str, _namespace: &str, _config: &mut Config) -> Self { Service { exit: Arc::new(ExitSignal::default()), } @@ -62,7 +56,7 @@ impl Shim for Service { self.exit.wait().await; } - async fn create_task_service(&self) -> Self::T { + async fn create_task_service(&self, _publisher: RemotePublisher) -> Self::T { self.clone() } } diff --git a/crates/shim/src/asynchronous/console.rs b/crates/shim/src/asynchronous/console.rs index 160f0ba..8e18c95 100644 --- a/crates/shim/src/asynchronous/console.rs +++ b/crates/shim/src/asynchronous/console.rs @@ -20,8 +20,7 @@ use log::warn; use tokio::net::{UnixListener, UnixStream}; use uuid::Uuid; -use crate::asynchronous::util::mkdir; -use crate::util::xdg_runtime_dir; +use crate::util::{mkdir, xdg_runtime_dir}; use crate::Error; use crate::Result; diff --git a/crates/shim/src/asynchronous/container.rs b/crates/shim/src/asynchronous/container.rs index c0e924d..f4d9b23 100644 --- a/crates/shim/src/asynchronous/container.rs +++ b/crates/shim/src/asynchronous/container.rs @@ -22,7 +22,6 @@ use time::OffsetDateTime; use tokio::sync::oneshot::Receiver; use containerd_shim_protos::api::{CreateTaskRequest, ExecProcessRequest, StateResponse}; -use containerd_shim_protos::protobuf::well_known_types::Timestamp; use crate::asynchronous::processes::Process; use crate::error::Result; @@ -37,11 +36,15 @@ pub trait Container { async fn get_exit_info( &self, exec_id: Option<&str>, - ) -> Result<(i32, u32, Option)>; - async fn delete(&mut self, exec_id_opt: Option<&str>) -> Result<(i32, u32, Timestamp)>; + ) -> Result<(i32, i32, Option)>; + async fn delete( + &mut self, + exec_id_opt: Option<&str>, + ) -> Result<(i32, i32, Option)>; async fn exec(&mut self, req: ExecProcessRequest) -> Result<()>; async fn resize_pty(&mut self, exec_id: Option<&str>, height: u32, width: u32) -> Result<()>; async fn pid(&self) -> i32; + async fn id(&self) -> String; } #[async_trait] @@ -106,32 +109,30 @@ where async fn get_exit_info( &self, exec_id: Option<&str>, - ) -> Result<(i32, u32, Option)> { + ) -> Result<(i32, i32, Option)> { let process = self.get_process(exec_id)?; Ok(( process.pid().await, - process.exit_code().await as u32, + process.exit_code().await, process.exited_at().await, )) } - async fn delete(&mut self, exec_id_opt: Option<&str>) -> Result<(i32, u32, Timestamp)> { - let (pid, code, exit_at) = self.get_exit_info(exec_id_opt).await?; - let mut timestamp = Timestamp::new(); - if let Some(exit_at) = exit_at { - timestamp.set_seconds(exit_at.unix_timestamp()); - timestamp.set_nanos(exit_at.nanosecond() as i32); - } + async fn delete( + &mut self, + exec_id_opt: Option<&str>, + ) -> Result<(i32, i32, Option)> { + let (pid, code, exited_at) = self.get_exit_info(exec_id_opt).await?; let process = self.get_mut_process(exec_id_opt); match process { Ok(p) => p.delete().await?, - Err(Error::NotFoundError(_)) => return Ok((pid, code, timestamp)), + Err(Error::NotFoundError(_)) => return Ok((pid, code, exited_at)), Err(e) => return Err(e), } if let Some(exec_id) = exec_id_opt { self.processes.remove(exec_id); } - Ok((pid, code, timestamp)) + Ok((pid, code, exited_at)) } async fn exec(&mut self, req: ExecProcessRequest) -> Result<()> { @@ -149,6 +150,10 @@ where async fn pid(&self) -> i32 { self.init.pid().await } + + async fn id(&self) -> String { + self.id.to_string() + } } impl ContainerTemplate diff --git a/crates/shim/src/asynchronous/mod.rs b/crates/shim/src/asynchronous/mod.rs index 127cec6..0be5fd8 100644 --- a/crates/shim/src/asynchronous/mod.rs +++ b/crates/shim/src/asynchronous/mod.rs @@ -40,9 +40,9 @@ use containerd_shim_protos::ttrpc::r#async::Server; use crate::asynchronous::monitor::monitor_notify_by_pid; use crate::asynchronous::publisher::RemotePublisher; -use crate::asynchronous::util::{asyncify, read_file_to_str, write_str_to_file}; use crate::error::Error; use crate::error::Result; +use crate::util::{asyncify, read_file_to_str, write_str_to_file}; use crate::{ args, logger, parse_sockaddr, reap, socket_address, Config, StartOpts, SOCKET_FD, TTRPC_ADDRESS, }; @@ -69,15 +69,8 @@ pub trait Shim { /// - `runtime_id`: identifier of the container runtime. /// - `id`: identifier of the shim/container, passed in from Containerd. /// - `namespace`: namespace of the shim/container, passed in from Containerd. - /// - `publisher`: publisher to send events to Containerd. /// - `config`: for the shim to pass back configuration information - async fn new( - runtime_id: &str, - id: &str, - namespace: &str, - publisher: RemotePublisher, - config: &mut Config, - ) -> Self; + async fn new(runtime_id: &str, id: &str, namespace: &str, config: &mut Config) -> Self; /// Start shim will be called by containerd when launching new shim instance. /// @@ -95,7 +88,7 @@ pub trait Shim { async fn wait(&mut self); /// Create the task service object asynchronously. - async fn create_task_service(&self) -> Self::T; + async fn create_task_service(&self, publisher: RemotePublisher) -> Self::T; } /// Async Shim entry point that must be invoked from tokio `main`. @@ -128,8 +121,6 @@ where let flags = args::parse(&os_args[1..])?; let ttrpc_address = env::var(TTRPC_ADDRESS)?; - let publisher = publisher::RemotePublisher::new(&ttrpc_address).await?; - // Create shim instance let mut config = opts.unwrap_or_else(Config::default); @@ -140,14 +131,7 @@ where reap::set_subreaper()?; } - let mut shim = T::new( - runtime_id, - &flags.id, - &flags.namespace, - publisher, - &mut config, - ) - .await; + let mut shim = T::new(runtime_id, &flags.id, &flags.namespace, &mut config).await; match flags.action.as_str() { "start" => { @@ -187,7 +171,8 @@ where logger::init(flags.debug)?; } - let task = shim.create_task_service().await; + let publisher = RemotePublisher::new(&ttrpc_address).await?; + let task = shim.create_task_service(publisher).await; let task_service = create_task(Arc::new(Box::new(task))); let mut server = Server::new().register_service(task_service); server = server.add_listener(SOCKET_FD)?; diff --git a/crates/shim/src/asynchronous/processes.rs b/crates/shim/src/asynchronous/processes.rs index eb53fbc..2d0af1d 100644 --- a/crates/shim/src/asynchronous/processes.rs +++ b/crates/shim/src/asynchronous/processes.rs @@ -23,8 +23,8 @@ use tokio::sync::oneshot::{channel, Receiver, Sender}; use containerd_shim_protos::api::{StateResponse, Status}; use containerd_shim_protos::protobuf::well_known_types::Timestamp; -use crate::asynchronous::util::asyncify; use crate::io::Stdio; +use crate::util::asyncify; use crate::Error; use crate::{ioctl_set_winsz, Console}; diff --git a/crates/shim/src/asynchronous/publisher.rs b/crates/shim/src/asynchronous/publisher.rs index 7973e8d..fc68a32 100644 --- a/crates/shim/src/asynchronous/publisher.rs +++ b/crates/shim/src/asynchronous/publisher.rs @@ -26,9 +26,9 @@ use containerd_shim_protos::ttrpc; use containerd_shim_protos::ttrpc::context::Context; use containerd_shim_protos::ttrpc::r#async::TtrpcContext; -use crate::asynchronous::util::asyncify; use crate::error::Result; -use crate::util::{any, connect, timestamp}; +use crate::util::asyncify; +use crate::util::{connect, convert_to_any, timestamp}; /// Async Remote publisher connects to containerd's TTRPC endpoint to publish events from shim. pub struct RemotePublisher { @@ -67,13 +67,13 @@ impl RemotePublisher { ctx: Context, topic: &str, namespace: &str, - event: impl Message, + event: Box, ) -> Result<()> { let mut envelope = events::Envelope::new(); envelope.set_topic(topic.to_owned()); envelope.set_namespace(namespace.to_owned()); envelope.set_timestamp(timestamp()?); - envelope.set_event(any(event)?); + envelope.set_event(convert_to_any(event)?); let mut req = events::ForwardRequest::new(); req.set_envelope(envelope); @@ -163,7 +163,7 @@ mod tests { let mut msg = TaskOOM::new(); msg.set_container_id("test".to_string()); client - .publish(Context::default(), "/tasks/oom", "ns1", msg) + .publish(Context::default(), "/tasks/oom", "ns1", Box::new(msg)) .await .unwrap(); match rx.recv().await { diff --git a/crates/shim/src/asynchronous/task.rs b/crates/shim/src/asynchronous/task.rs index 49e774b..56382e5 100644 --- a/crates/shim/src/asynchronous/task.rs +++ b/crates/shim/src/asynchronous/task.rs @@ -18,12 +18,15 @@ use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; -use log::{debug, info}; +use log::{debug, info, warn}; +use tokio::sync::mpsc::Sender; use tokio::sync::{MappedMutexGuard, Mutex, MutexGuard}; use containerd_shim_protos::api::DeleteResponse; -use containerd_shim_protos::protobuf::well_known_types::Timestamp; -use containerd_shim_protos::protobuf::SingularPtrField; +use containerd_shim_protos::events::task::{ + TaskCreate, TaskDelete, TaskExecAdded, TaskExecStarted, TaskIO, TaskStart, +}; +use containerd_shim_protos::protobuf::{Message, SingularPtrField}; use containerd_shim_protos::shim_async::Task; use containerd_shim_protos::ttrpc; use containerd_shim_protos::ttrpc::r#async::TtrpcContext; @@ -35,9 +38,12 @@ use crate::api::{ }; use crate::asynchronous::container::{Container, ContainerFactory}; use crate::asynchronous::ExitSignal; -use crate::util::AsOption; +use crate::event::Event; +use crate::util::{convert_to_timestamp, AsOption}; use crate::TtrpcResult; +type EventSender = Sender<(String, Box)>; + /// TaskService is a Task template struct, it is considered a helper struct, /// which has already implemented `Task` trait, so that users can make it the type `T` /// parameter of `Service`, and implements their own `ContainerFactory` and `Container`. @@ -46,18 +52,20 @@ pub struct TaskService { pub containers: Arc>>, pub namespace: String, pub exit: Arc, + pub tx: EventSender, } impl TaskService where F: Default, { - pub fn new(ns: &str, exit: Arc) -> Self { + pub fn new(ns: &str, exit: Arc, tx: EventSender) -> Self { Self { factory: Default::default(), containers: Arc::new(Mutex::new(Default::default())), namespace: ns.to_string(), exit, + tx, } } } @@ -74,6 +82,14 @@ impl TaskService { let container = MutexGuard::map(containers, |m| m.get_mut(id).unwrap()); Ok(container) } + + pub async fn send_event(&self, event: impl Event) { + let topic = event.topic(); + self.tx + .send((topic.to_string(), Box::new(event))) + .await + .unwrap_or_else(|e| warn!("send {} to publisher: {}", topic, e)); + } } #[async_trait] @@ -104,9 +120,28 @@ where let container = self.factory.create(ns, &req).await?; let mut resp = CreateTaskResponse::new(); - resp.pid = container.pid().await as u32; + let pid = container.pid().await as u32; + resp.pid = pid; containers.insert(id.to_string(), container); + + self.send_event(TaskCreate { + container_id: req.id.to_string(), + bundle: req.bundle.to_string(), + rootfs: req.rootfs, + io: SingularPtrField::some(TaskIO { + stdin: req.stdin.to_string(), + stdout: req.stdout.to_string(), + stderr: req.stderr.to_string(), + terminal: req.terminal, + unknown_fields: Default::default(), + cached_size: Default::default(), + }), + checkpoint: req.checkpoint.to_string(), + pid, + ..Default::default() + }) + .await; info!("Create request for {} returns pid {}", id, resp.pid); Ok(resp) } @@ -118,6 +153,24 @@ where let mut resp = StartResponse::new(); resp.pid = pid as u32; + + if req.exec_id.is_empty() { + self.send_event(TaskStart { + container_id: req.id.to_string(), + pid: pid as u32, + ..Default::default() + }) + .await; + } else { + self.send_event(TaskExecStarted { + container_id: req.id.to_string(), + exec_id: req.exec_id.to_string(), + pid: pid as u32, + ..Default::default() + }) + .await; + }; + info!("Start request for {:?} returns pid {}", req, resp.get_pid()); Ok(resp) } @@ -131,16 +184,28 @@ where format!("can not find container by id {}", req.get_id()), )) })?; + let id = container.id().await; let exec_id_opt = req.get_exec_id().as_option(); let (pid, exit_status, exited_at) = container.delete(exec_id_opt).await?; self.factory.cleanup(&*self.namespace, container).await?; if req.get_exec_id().is_empty() { containers.remove(req.get_id()); } + + let ts = convert_to_timestamp(exited_at); + self.send_event(TaskDelete { + container_id: id, + pid: pid as u32, + exit_status: exit_status as u32, + exited_at: SingularPtrField::some(ts.clone()), + ..Default::default() + }) + .await; + let mut resp = DeleteResponse::new(); - resp.set_exited_at(exited_at); + resp.set_exited_at(ts); resp.set_pid(pid as u32); - resp.set_exit_status(exit_status); + resp.set_exit_status(exit_status as u32); info!( "Delete request for {} {} returns {:?}", req.get_id(), @@ -162,8 +227,17 @@ where async fn exec(&self, _ctx: &TtrpcContext, req: ExecProcessRequest) -> TtrpcResult { info!("Exec request for {:?}", req); + let exec_id = req.get_exec_id().to_string(); let mut container = self.get_container(req.get_id()).await?; container.exec(req).await?; + + self.send_event(TaskExecAdded { + container_id: container.id().await, + exec_id, + ..Default::default() + }) + .await; + Ok(Empty::new()) } @@ -202,12 +276,8 @@ where let container = self.get_container(req.get_id()).await?; let (_, code, exited_at) = container.get_exit_info(exec_id).await?; let mut resp = WaitResponse::new(); - resp.exit_status = code; - let mut ts = Timestamp::new(); - if let Some(ea) = exited_at { - ts.seconds = ea.unix_timestamp(); - ts.nanos = ea.nanosecond() as i32; - } + resp.exit_status = code as u32; + let ts = convert_to_timestamp(exited_at); resp.exited_at = SingularPtrField::some(ts); info!("Wait request for {:?} returns {:?}", req, &resp); Ok(resp) diff --git a/crates/shim/src/asynchronous/util.rs b/crates/shim/src/asynchronous/util.rs index ec1b7cc..39790c4 100644 --- a/crates/shim/src/asynchronous/util.rs +++ b/crates/shim/src/asynchronous/util.rs @@ -142,7 +142,7 @@ pub async fn mkdir(path: impl AsRef, mode: mode_t) -> Result<()> { #[cfg(test)] mod tests { - use crate::asynchronous::util::{read_file_to_str, write_str_to_file}; + use crate::util::{read_file_to_str, write_str_to_file}; #[tokio::test] async fn test_read_write_str() { diff --git a/crates/shim/src/event.rs b/crates/shim/src/event.rs new file mode 100644 index 0000000..12c6abb --- /dev/null +++ b/crates/shim/src/event.rs @@ -0,0 +1,66 @@ +use containerd_shim_protos::events::task::*; +use containerd_shim_protos::protobuf::Message; + +pub trait Event: Message { + fn topic(&self) -> String; +} + +impl Event for TaskCreate { + fn topic(&self) -> String { + "/tasks/create".to_string() + } +} + +impl Event for TaskStart { + fn topic(&self) -> String { + "/tasks/start".to_string() + } +} + +impl Event for TaskExecAdded { + fn topic(&self) -> String { + "/tasks/exec-added".to_string() + } +} + +impl Event for TaskExecStarted { + fn topic(&self) -> String { + "/tasks/exec-started".to_string() + } +} + +impl Event for TaskPaused { + fn topic(&self) -> String { + "/tasks/paused".to_string() + } +} + +impl Event for TaskResumed { + fn topic(&self) -> String { + "/tasks/resumed".to_string() + } +} + +impl Event for TaskExit { + fn topic(&self) -> String { + "/tasks/exit".to_string() + } +} + +impl Event for TaskDelete { + fn topic(&self) -> String { + "/tasks/delete".to_string() + } +} + +impl Event for TaskOOM { + fn topic(&self) -> String { + "/tasks/oom".to_string() + } +} + +impl Event for TaskCheckpointed { + fn topic(&self) -> String { + "/tasks/checkpointed".to_string() + } +} diff --git a/crates/shim/src/lib.rs b/crates/shim/src/lib.rs index 22f39e5..732c0aa 100644 --- a/crates/shim/src/lib.rs +++ b/crates/shim/src/lib.rs @@ -56,6 +56,7 @@ pub mod error; mod args; #[cfg(feature = "async")] pub mod asynchronous; +pub mod event; pub mod io; mod logger; pub mod monitor; diff --git a/crates/shim/src/synchronous/mod.rs b/crates/shim/src/synchronous/mod.rs index 5a4af1e..0939bdc 100644 --- a/crates/shim/src/synchronous/mod.rs +++ b/crates/shim/src/synchronous/mod.rs @@ -103,15 +103,8 @@ pub trait Shim { /// - `runtime_id`: identifier of the container runtime. /// - `id`: identifier of the shim/container, passed in from Containerd. /// - `namespace`: namespace of the shim/container, passed in from Containerd. - /// - `publisher`: publisher to send events to Containerd. /// - `config`: for the shim to pass back configuration information - fn new( - runtime_id: &str, - id: &str, - namespace: &str, - publisher: RemotePublisher, - config: &mut Config, - ) -> Self; + fn new(runtime_id: &str, id: &str, namespace: &str, config: &mut Config) -> Self; /// Start shim will be called by containerd when launching new shim instance. /// @@ -128,7 +121,7 @@ pub trait Shim { fn wait(&mut self); /// Create the task service object. - fn create_task_service(&self) -> Self::T; + fn create_task_service(&self, publisher: RemotePublisher) -> Self::T; } /// Shim entry point that must be invoked from `main`. @@ -151,7 +144,6 @@ where let flags = args::parse(&os_args[1..])?; let ttrpc_address = env::var(TTRPC_ADDRESS)?; - let publisher = publisher::RemotePublisher::new(&ttrpc_address)?; // Create shim instance let mut config = opts.unwrap_or_else(Config::default); @@ -163,13 +155,7 @@ where reap::set_subreaper()?; } - let mut shim = T::new( - runtime_id, - &flags.id, - &flags.namespace, - publisher, - &mut config, - ); + let mut shim = T::new(runtime_id, &flags.id, &flags.namespace, &mut config); match flags.action.as_str() { "start" => { @@ -205,7 +191,8 @@ where logger::init(flags.debug)?; } - let task = shim.create_task_service(); + let publisher = publisher::RemotePublisher::new(&ttrpc_address)?; + let task = shim.create_task_service(publisher); let task_service = create_task(Arc::new(Box::new(task))); let mut server = Server::new().register_service(task_service); server = server.add_listener(SOCKET_FD)?; diff --git a/crates/shim/src/synchronous/publisher.rs b/crates/shim/src/synchronous/publisher.rs index a7d8104..681800d 100644 --- a/crates/shim/src/synchronous/publisher.rs +++ b/crates/shim/src/synchronous/publisher.rs @@ -18,15 +18,16 @@ use protobuf::Message; +use containerd_shim_protos as client; + use client::protobuf; use client::shim::events; use client::ttrpc::{self, context::Context}; use client::types::empty; use client::{Client, Events, EventsClient}; -use containerd_shim_protos as client; use crate::error::Result; -use crate::util::{any, connect, timestamp}; +use crate::util::{connect, convert_to_any, timestamp}; /// Remote publisher connects to containerd's TTRPC endpoint to publish events from shim. pub struct RemotePublisher { @@ -59,13 +60,13 @@ impl RemotePublisher { ctx: Context, topic: &str, namespace: &str, - event: impl Message, + event: Box, ) -> Result<()> { let mut envelope = events::Envelope::new(); envelope.set_topic(topic.to_owned()); envelope.set_namespace(namespace.to_owned()); envelope.set_timestamp(timestamp()?); - envelope.set_event(any(event)?); + envelope.set_event(convert_to_any(event)?); let mut req = events::ForwardRequest::new(); req.set_envelope(envelope); @@ -109,12 +110,6 @@ mod tests { } } - #[test] - fn test_timestamp() { - let ts = timestamp().unwrap(); - assert!(ts.seconds > 0); - } - #[test] fn test_connect() { let tmpdir = tempfile::tempdir().unwrap(); @@ -149,7 +144,7 @@ mod tests { let mut msg = TaskOOM::new(); msg.set_container_id("test".to_string()); client - .publish(Context::default(), "/tasks/oom", "ns1", msg) + .publish(Context::default(), "/tasks/oom", "ns1", Box::new(msg)) .unwrap(); barrier.wait(); diff --git a/crates/shim/src/util.rs b/crates/shim/src/util.rs index b7c7e05..5d0f154 100644 --- a/crates/shim/src/util.rs +++ b/crates/shim/src/util.rs @@ -19,6 +19,7 @@ use std::os::unix::io::RawFd; use std::time::{SystemTime, UNIX_EPOCH}; use serde::{Deserialize, Serialize}; +use time::OffsetDateTime; use crate::api::Options; #[cfg(feature = "async")] @@ -95,14 +96,6 @@ impl From for Options { } } -pub fn get_timestamp() -> Result { - let mut timestamp = Timestamp::new(); - let now = SystemTime::now().duration_since(UNIX_EPOCH)?; - timestamp.set_seconds(now.as_secs() as i64); - timestamp.set_nanos(now.subsec_nanos() as i32); - Ok(timestamp) -} - pub fn connect(address: impl AsRef) -> Result { use nix::sys::socket::*; use nix::unistd::close; @@ -148,10 +141,22 @@ pub fn timestamp() -> Result { Ok(ts) } -pub fn any(event: impl Message) -> Result { - let data = event.write_to_bytes()?; +pub fn convert_to_timestamp(exited_at: Option) -> Timestamp { + let mut ts = Timestamp::new(); + if let Some(ea) = exited_at { + ts.seconds = ea.unix_timestamp(); + ts.nanos = ea.nanosecond() as i32; + } + ts +} + +pub fn convert_to_any(obj: Box) -> Result { + let mut data = Vec::new(); + obj.write_to_vec(&mut data)?; + let mut any = Any::new(); - any.merge_from_bytes(&data)?; + any.set_value(data); + any.set_type_url(obj.descriptor().full_name().to_string()); Ok(any) } @@ -194,3 +199,14 @@ impl AsOption for str { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_timestamp() { + let ts = timestamp().unwrap(); + assert!(ts.seconds > 0); + } +}