Add shim exit signal

Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
This commit is contained in:
Maksym Pavlenko 2021-07-28 11:43:22 -07:00
parent 2120de8ba8
commit 9629eac9ec
3 changed files with 120 additions and 31 deletions

View File

@ -10,37 +10,46 @@ API offered by containerd's shim v2 runtime implementation written in Go.
The API is very similar to the one offered by Go version:
```rust
struct Service;
struct Service {
exit: shim::ExitSignal,
}
impl shim::Shim for Service {
fn new(_id: &str, _namespace: &str, _config: &mut shim::Config) -> Self {
Service {}
fn new(
_id: &str,
_namespace: &str,
_publisher: shim::RemotePublisher,
_config: &mut shim::Config,
exit: shim::ExitSignal,
) -> Self {
Service { exit }
}
fn start_shim(&mut self, opts: StartOpts) -> Result<String, Box<dyn Error>> {
let address = shim::spawn(opts)?;
Ok(address)
}
fn delete_shim(&mut self) -> Result<api::DeleteResponse, Box<dyn Error>> {
todo!()
fn start_shim(&mut self, _opts: shim::StartOpts) -> Result<String, Box<dyn Error>> {
Ok("Socket address here".into())
}
}
impl shim::Task for Service {
fn create(
&self,
ctx: &TtrpcContext,
req: api::CreateTaskRequest,
) -> ::ttrpc::Result<api::CreateTaskResponse> {
debug!("Create");
_ctx: &TtrpcContext,
_req: api::CreateTaskRequest,
) -> TtrpcResult<api::CreateTaskResponse> {
// New task nere...
Ok(api::CreateTaskResponse::default())
}
fn shutdown(&self, _ctx: &TtrpcContext, _req: api::ShutdownRequest) -> TtrpcResult<api::Empty> {
self.exit.signal(); // Signal to shutdown shim server
Ok(api::Empty::default())
}
}
fn main() {
shim::run::<Service>("io.containerd.empty.v1")
}
```
## How to use

View File

@ -1,9 +1,12 @@
use containerd_shim as shim;
use shim::{api, TtrpcContext, TtrpcResult};
use log::info;
use shim::{api, TtrpcContext, TtrpcResult};
use std::error::Error;
struct Service;
struct Service {
exit: shim::ExitSignal,
}
impl shim::Shim for Service {
fn new(
@ -11,8 +14,13 @@ impl shim::Shim for Service {
_namespace: &str,
_publisher: shim::RemotePublisher,
_config: &mut shim::Config,
exit: shim::ExitSignal,
) -> Self {
Service {}
Service { exit }
}
fn start_shim(&mut self, _opts: shim::StartOpts) -> Result<String, Box<dyn Error>> {
Ok("Socket address here".into())
}
}
@ -25,6 +33,11 @@ impl shim::Task for Service {
info!("Create");
Ok(api::CreateTaskResponse::default())
}
fn shutdown(&self, _ctx: &TtrpcContext, _req: api::ShutdownRequest) -> TtrpcResult<api::Empty> {
self.exit.signal();
Ok(api::Empty::default())
}
}
fn main() {

View File

@ -5,11 +5,13 @@ use std::hash::Hasher;
use std::io::{self, Write};
use std::path::PathBuf;
use std::process::{self, Command, Stdio};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time;
pub use containerd_shim_protos as protos;
use protos::protobuf::Message;
use protos::shim::{shim::DeleteResponse, shim_ttrpc::create_task};
use protos::ttrpc::Server;
@ -23,7 +25,11 @@ mod reap;
pub use publisher::RemotePublisher;
pub use protos::shim::shim as api;
pub mod api {
pub use super::protos::shim::empty::Empty;
pub use super::protos::shim::shim::*;
}
pub use protos::shim::shim_ttrpc::Task;
pub use protos::ttrpc;
@ -39,6 +45,8 @@ pub struct Config {
pub no_sub_reaper: bool,
}
/// Startup options received from containerd to start new shim instance.
/// These will be passed via [`Shim::start_shim`] to shim.
#[derive(Debug, Default)]
pub struct StartOpts {
/// ID of the container.
@ -53,17 +61,49 @@ pub struct StartOpts {
pub namespace: String,
}
/// Shim interface that must be implemented by clients.
pub trait Shim: Task {
fn new(id: &str, namespace: &str, publisher: RemotePublisher, config: &mut Config) -> Self;
/// Helper structure that wraps atomic bool to signal shim server when to shutdown the TTRPC server.
/// Shim implementations are responsible for calling [`Self::signal`].
#[derive(Clone)]
pub struct ExitSignal(Arc<AtomicBool>);
/// Launch new shim.
/// See https://github.com/containerd/containerd/tree/master/runtime/v2#start
fn start_shim(&mut self, opts: StartOpts) -> Result<String, Box<dyn error::Error>> {
let address = spawn(opts)?;
Ok(address)
impl Default for ExitSignal {
fn default() -> Self {
ExitSignal(Arc::new(AtomicBool::new(false)))
}
}
impl ExitSignal {
/// Set exit signal to shutdown shim server.
pub fn signal(&self) {
self.0.store(true, Ordering::Release)
}
/// Wait for the exit signal to be set.
fn wait(&self) {
while !self.0.load(Ordering::Acquire) {
std::hint::spin_loop();
}
}
}
/// Main shim interface that must be implemented by all shims.
/// Start and delete routines will be called to handle containerd's shim lifecycle requests.
pub trait Shim: Task {
fn new(
id: &str,
namespace: &str,
publisher: RemotePublisher,
config: &mut Config,
exit: ExitSignal,
) -> Self;
/// Start shim will be called by containerd when launching new shim instance.
/// It expected to return TTRPC address containerd daemon can use to communicate with
/// the given shim instance.
/// See https://github.com/containerd/containerd/tree/master/runtime/v2#start
fn start_shim(&mut self, opts: StartOpts) -> Result<String, Box<dyn error::Error>>;
/// Delete shim will be called by containerd after shim shutdown to cleanup any leftovers.
fn delete_shim(&mut self) -> Result<DeleteResponse, Box<dyn error::Error>> {
Ok(DeleteResponse::default())
}
@ -91,8 +131,15 @@ where
let publisher = publisher::RemotePublisher::new(&ttrpc_address)?;
// Create shim instance
let exit_signal = ExitSignal::default();
let mut config = Config::default();
let mut shim = T::new(id, &flags.namespace, publisher, &mut config);
let mut shim = T::new(
id,
&flags.namespace,
publisher,
&mut config,
exit_signal.clone(),
);
if !config.no_sub_reaper {
reap::set_subreaper()?;
@ -135,8 +182,7 @@ where
server.start()?;
// TODO: define exit criteria here.
std::thread::sleep(std::time::Duration::from_secs(360));
exit_signal.wait();
server.shutdown();
@ -208,9 +254,30 @@ pub fn spawn(opts: StartOpts) -> Result<String, Error> {
])
.spawn()?;
// This is temp HACK.
// Give TTRPC server some time to initialize.
// TODO: This is hack: give TTRPC server some time to initialize. Need to pass fd instead.
thread::sleep(time::Duration::from_secs(2));
Ok(socket_address)
}
#[cfg(test)]
mod tests {
use super::*;
use std::thread;
#[test]
fn exit_signal() {
let signal = ExitSignal::default();
let cloned = signal.clone();
let handle = thread::spawn(move || {
cloned.signal();
});
signal.wait();
if let Err(err) = handle.join() {
panic!("{:?}", err);
}
}
}