diff --git a/crates/runc-shim/src/asynchronous/mod.rs b/crates/runc-shim/src/asynchronous/mod.rs index be80642..b6b28c0 100644 --- a/crates/runc-shim/src/asynchronous/mod.rs +++ b/crates/runc-shim/src/asynchronous/mod.rs @@ -28,7 +28,7 @@ use containerd_shim::asynchronous::monitor::{ use containerd_shim::asynchronous::processes::Process; use containerd_shim::asynchronous::publisher::RemotePublisher; use containerd_shim::asynchronous::task::TaskService; -use containerd_shim::asynchronous::utils::{ +use containerd_shim::asynchronous::util::{ read_options, read_runtime, read_spec, write_str_to_file, }; use containerd_shim::asynchronous::{spawn, ExitSignal, Shim}; @@ -39,8 +39,8 @@ use containerd_shim::Error; use containerd_shim::{io_error, Config, DeleteResponse, StartOpts}; use crate::asynchronous::runc::{RuncContainer, RuncFactory}; -use crate::runc::{create_runc, ShimExecutor}; -use crate::service::GROUP_LABELS; +use crate::common::create_runc; +use crate::common::{ShimExecutor, GROUP_LABELS}; mod runc; diff --git a/crates/runc-shim/src/asynchronous/runc.rs b/crates/runc-shim/src/asynchronous/runc.rs index b1c7e6a..a2f8f4e 100644 --- a/crates/runc-shim/src/asynchronous/runc.rs +++ b/crates/runc-shim/src/asynchronous/runc.rs @@ -37,21 +37,22 @@ use containerd_shim::asynchronous::monitor::{ monitor_subscribe, monitor_unsubscribe, Subscription, }; use containerd_shim::asynchronous::processes::{ProcessLifecycle, ProcessTemplate}; -use containerd_shim::asynchronous::utils::{ +use containerd_shim::asynchronous::util::{ asyncify, mkdir, mount_rootfs, read_file_to_str, read_spec, write_options, write_runtime, }; -use containerd_shim::console::Console; use containerd_shim::io::Stdio; use containerd_shim::monitor::{ExitEvent, Subject, Topic}; use containerd_shim::protos::protobuf::{CodedInputStream, Message}; +use containerd_shim::Console; use containerd_shim::{io_error, other, Error}; use containerd_shim::{other_error, Result}; use runc::{Command, Executor, Runc}; -use crate::console::receive_socket; -use crate::io::{create_io, ProcessIO}; -use crate::runc::{ - check_kill_error, create_runc, get_spec_from_request, CreateConfig, ShimExecutor, INIT_PID_FILE, +use crate::common::receive_socket; +use crate::common::CreateConfig; +use crate::common::{ + check_kill_error, create_io, create_runc, get_spec_from_request, ProcessIO, ShimExecutor, + INIT_PID_FILE, }; pub type ExecProcess = ProcessTemplate; diff --git a/crates/runc-shim/src/common.rs b/crates/runc-shim/src/common.rs new file mode 100644 index 0000000..a83ed35 --- /dev/null +++ b/crates/runc-shim/src/common.rs @@ -0,0 +1,198 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +use std::os::unix::io::RawFd; +use std::path::Path; +use std::sync::Arc; + +use log::{debug, warn}; +use nix::cmsg_space; +use nix::sys::socket::{recvmsg, ControlMessageOwned, MsgFlags}; +use nix::sys::termios::tcgetattr; +use nix::sys::uio::IoVec; + +use containerd_shim::api::{ExecProcessRequest, Options}; +use containerd_shim::io::Stdio; +use containerd_shim::util::IntoOption; +use containerd_shim::{io_error, other, other_error, Error}; +use runc::io::{Io, NullIo, FIFO}; +use runc::options::GlobalOpts; +use runc::Runc; + +pub const GROUP_LABELS: [&str; 2] = [ + "io.containerd.runc.v2.group", + "io.kubernetes.cri.sandbox-id", +]; +pub const INIT_PID_FILE: &str = "init.pid"; + +pub struct ProcessIO { + pub uri: Option, + pub io: Option>, + pub copy: bool, +} + +pub fn create_io( + id: &str, + _io_uid: u32, + _io_gid: u32, + stdio: &Stdio, +) -> containerd_shim::Result { + if stdio.is_null() { + let nio = NullIo::new().map_err(io_error!(e, "new Null Io"))?; + let pio = ProcessIO { + uri: None, + io: Some(Arc::new(nio)), + copy: false, + }; + return Ok(pio); + } + let stdout = stdio.stdout.as_str(); + let scheme_path = stdout.trim().split("://").collect::>(); + let scheme: &str; + let uri: String; + if scheme_path.len() <= 1 { + // no scheme specified + // default schema to fifo + uri = format!("fifo://{}", stdout); + scheme = "fifo" + } else { + uri = stdout.to_string(); + scheme = scheme_path[0]; + } + + let mut pio = ProcessIO { + uri: Some(uri), + io: None, + copy: false, + }; + + if scheme == "fifo" { + debug!( + "create named pipe io for container {}, stdin: {}, stdout: {}, stderr: {}", + id, + stdio.stdin.as_str(), + stdio.stdout.as_str(), + stdio.stderr.as_str() + ); + let io = FIFO { + stdin: stdio.stdin.to_string().none_if(|x| x.is_empty()), + stdout: stdio.stdout.to_string().none_if(|x| x.is_empty()), + stderr: stdio.stderr.to_string().none_if(|x| x.is_empty()), + }; + pio.io = Some(Arc::new(io)); + pio.copy = false; + } + Ok(pio) +} + +#[derive(Clone)] +pub struct ShimExecutor {} + +pub fn get_spec_from_request( + req: &ExecProcessRequest, +) -> containerd_shim::Result { + if let Some(val) = req.spec.as_ref() { + let mut p = serde_json::from_slice::(val.get_value())?; + p.set_terminal(Some(req.terminal)); + Ok(p) + } else { + Err(Error::InvalidArgument("no spec in request".to_string())) + } +} + +pub fn check_kill_error(emsg: String) -> Error { + let emsg = emsg.to_lowercase(); + if emsg.contains("process already finished") + || emsg.contains("container not running") + || emsg.contains("no such process") + { + Error::NotFoundError("process already finished".to_string()) + } else if emsg.contains("does not exist") { + Error::NotFoundError("no such container".to_string()) + } else { + other!("unknown error after kill {}", emsg) + } +} + +const DEFAULT_RUNC_ROOT: &str = "/run/containerd/runc"; +const DEFAULT_COMMAND: &str = "runc"; + +pub fn create_runc( + runtime: &str, + namespace: &str, + bundle: impl AsRef, + opts: &Options, + executor: F, +) -> containerd_shim::Result> { + let runtime = if runtime.is_empty() { + DEFAULT_COMMAND + } else { + runtime + }; + let root = opts.root.as_str(); + let root = Path::new(if root.is_empty() { + DEFAULT_RUNC_ROOT + } else { + root + }) + .join(namespace); + + let log = bundle.as_ref().join("log.json"); + GlobalOpts::default() + .command(runtime) + .root(root) + .log(log) + .log_json() + .systemd_cgroup(opts.systemd_cgroup) + .build_with_executor(executor) + .map_err(other_error!(e, "unable to create runc instance")) +} + +#[derive(Default)] +pub(crate) struct CreateConfig {} + +pub fn receive_socket(stream_fd: RawFd) -> containerd_shim::Result { + let mut buf = [0u8; 4096]; + let iovec = [IoVec::from_mut_slice(&mut buf)]; + let mut space = cmsg_space!([RawFd; 2]); + let (path, fds) = match recvmsg(stream_fd, &iovec, Some(&mut space), MsgFlags::empty()) { + Ok(msg) => { + let mut iter = msg.cmsgs(); + if let Some(ControlMessageOwned::ScmRights(fds)) = iter.next() { + (iovec[0].as_slice(), fds) + } else { + return Err(other!("received message is empty")); + } + } + Err(e) => { + return Err(other!("failed to receive message: {}", e)); + } + }; + if fds.is_empty() { + return Err(other!("received message is empty")); + } + let path = String::from_utf8(Vec::from(path)).unwrap_or_else(|e| { + warn!("failed to get path from array {}", e); + "".to_string() + }); + let path = path.trim_matches(char::from(0)); + debug!( + "copy_console: console socket get path: {}, fd: {}", + path, &fds[0] + ); + tcgetattr(fds[0])?; + Ok(fds[0]) +} diff --git a/crates/runc-shim/src/console.rs b/crates/runc-shim/src/console.rs deleted file mode 100644 index 9d05862..0000000 --- a/crates/runc-shim/src/console.rs +++ /dev/null @@ -1,59 +0,0 @@ -/* - Copyright The containerd Authors. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -use std::os::unix::io::RawFd; - -use log::{debug, warn}; -use nix::cmsg_space; -use nix::sys::socket::{recvmsg, ControlMessageOwned, MsgFlags}; -use nix::sys::termios::tcgetattr; -use nix::sys::uio::IoVec; - -use containerd_shim::other; -use containerd_shim::Error; - -pub fn receive_socket(stream_fd: RawFd) -> containerd_shim::Result { - let mut buf = [0u8; 4096]; - let iovec = [IoVec::from_mut_slice(&mut buf)]; - let mut space = cmsg_space!([RawFd; 2]); - let (path, fds) = match recvmsg(stream_fd, &iovec, Some(&mut space), MsgFlags::empty()) { - Ok(msg) => { - let mut iter = msg.cmsgs(); - if let Some(ControlMessageOwned::ScmRights(fds)) = iter.next() { - (iovec[0].as_slice(), fds) - } else { - return Err(other!("received message is empty")); - } - } - Err(e) => { - return Err(other!("failed to receive message: {}", e)); - } - }; - if fds.is_empty() { - return Err(other!("received message is empty")); - } - let path = String::from_utf8(Vec::from(path)).unwrap_or_else(|e| { - warn!("failed to get path from array {}", e); - "".to_string() - }); - let path = path.trim_matches(char::from(0)); - debug!( - "copy_console: console socket get path: {}, fd: {}", - path, &fds[0] - ); - tcgetattr(fds[0])?; - Ok(fds[0]) -} diff --git a/crates/runc-shim/src/main.rs b/crates/runc-shim/src/main.rs index e3085f9..16bae39 100644 --- a/crates/runc-shim/src/main.rs +++ b/crates/runc-shim/src/main.rs @@ -14,20 +14,15 @@ limitations under the License. */ -mod cgroup; -mod container; -mod io; -mod runc; -mod service; -mod task; - #[cfg(feature = "async")] mod asynchronous; -mod console; +mod common; +#[cfg(not(feature = "async"))] +mod synchronous; #[cfg(not(feature = "async"))] fn main() { - containerd_shim::run::("io.containerd.runc.v2", None) + containerd_shim::run::("io.containerd.runc.v2", None) } #[cfg(feature = "async")] diff --git a/crates/runc-shim/src/cgroup.rs b/crates/runc-shim/src/synchronous/cgroup.rs similarity index 99% rename from crates/runc-shim/src/cgroup.rs rename to crates/runc-shim/src/synchronous/cgroup.rs index 2060104..97928f7 100644 --- a/crates/runc-shim/src/cgroup.rs +++ b/crates/runc-shim/src/synchronous/cgroup.rs @@ -20,17 +20,17 @@ use std::fs; use std::io::Read; use std::path::Path; -use containerd_shim as shim; - use cgroups_rs::cgroup::get_cgroups_relative_paths_by_pid; use cgroups_rs::{hierarchies, Cgroup, CgroupPid, MaxValue, Subsystem}; +use oci_spec::runtime::LinuxResources; + +use containerd_shim as shim; use containerd_shim::api::Options; use containerd_shim::protos::cgroups::metrics::{ CPUStat, CPUUsage, MemoryEntry, MemoryStat, Metrics, }; use containerd_shim::protos::protobuf::well_known_types::Any; use containerd_shim::protos::protobuf::Message; -use oci_spec::runtime::LinuxResources; use shim::error::{Error, Result}; use shim::{io_error, other_error}; @@ -223,10 +223,11 @@ pub fn update_metrics(pid: u32, resources: &LinuxResources) -> Result<()> { #[cfg(test)] mod tests { - use crate::cgroup::{ + use cgroups_rs::{hierarchies, Cgroup, CgroupPid}; + + use crate::synchronous::cgroup::{ add_task_to_cgroup, adjust_oom_score, read_process_oom_score, OOM_SCORE_ADJ_MAX, }; - use cgroups_rs::{hierarchies, Cgroup, CgroupPid}; #[test] fn test_add_cgroup() { diff --git a/crates/runc-shim/src/container.rs b/crates/runc-shim/src/synchronous/container.rs similarity index 96% rename from crates/runc-shim/src/container.rs rename to crates/runc-shim/src/synchronous/container.rs index 1439757..4e37dec 100644 --- a/crates/runc-shim/src/container.rs +++ b/crates/runc-shim/src/synchronous/container.rs @@ -26,17 +26,20 @@ use oci_spec::runtime::LinuxResources; use time::OffsetDateTime; use containerd_shim as shim; -use containerd_shim::console::{ioctl_set_winsz, Console, ConsoleSocket}; -use containerd_shim::io::Stdio; use shim::api::*; +use shim::console::ConsoleSocket; use shim::error::{Error, Result}; +use shim::io::Stdio; +use shim::ioctl_set_winsz; use shim::protos::cgroups::metrics::Metrics; use shim::protos::protobuf::well_known_types::Timestamp; use shim::util::read_pid_from_file; +use shim::Console; use shim::{io_error, other, other_error}; -use crate::console::receive_socket; -use crate::io::{spawn_copy, ProcessIO}; +use crate::common::receive_socket; +use crate::common::ProcessIO; +use crate::synchronous::io::spawn_copy; pub trait ContainerFactory { fn create(&self, ns: &str, req: &CreateTaskRequest) -> Result; @@ -65,12 +68,7 @@ pub trait Process { pub trait Container { fn start(&mut self, exec_id: Option<&str>) -> Result; fn state(&self, exec_id: Option<&str>) -> Result; - fn kill( - &mut self, - exec_id: Option<&str>, - signal: u32, - all: bool, - ) -> containerd_shim::Result<()>; + fn kill(&mut self, exec_id: Option<&str>, signal: u32, all: bool) -> Result<()>; fn wait_channel(&mut self, exec_id: Option<&str>) -> Result>; fn get_exit_info(&self, exec_id: Option<&str>) -> Result<(i32, i32, Option)>; fn delete(&mut self, exec_id_opt: Option<&str>) -> Result<(i32, u32, Timestamp)>; diff --git a/crates/runc-shim/src/io.rs b/crates/runc-shim/src/synchronous/io.rs similarity index 69% rename from crates/runc-shim/src/io.rs rename to crates/runc-shim/src/synchronous/io.rs index 39e1f70..f5a15dd 100644 --- a/crates/runc-shim/src/io.rs +++ b/crates/runc-shim/src/synchronous/io.rs @@ -14,22 +14,20 @@ limitations under the License. */ -#[cfg(not(feature = "async"))] use std::fs::OpenOptions; use std::io::{Read, Write}; -use std::sync::Arc; use std::thread::JoinHandle; use crossbeam::sync::WaitGroup; use log::debug; use containerd_shim::io::Stdio; -use containerd_shim::util::IntoOption; use containerd_shim::{ error::{Error, Result}, io_error, }; -use runc::io::{Io, NullIo, FIFO}; + +use crate::common::ProcessIO; pub fn spawn_copy( mut from: R, @@ -51,14 +49,7 @@ pub fn spawn_copy( }) } -pub struct ProcessIO { - pub uri: Option, - pub io: Option>, - pub copy: bool, -} - impl ProcessIO { - #[cfg(not(feature = "async"))] pub fn copy(&self, stdio: &Stdio) -> Result { let wg = WaitGroup::new(); if !self.copy { @@ -127,58 +118,4 @@ impl ProcessIO { Ok(wg) } - - #[cfg(feature = "async")] - pub fn copy(&self, _stdio: &Stdio) -> Result { - unimplemented!() - } -} - -pub fn create_io(id: &str, _io_uid: u32, _io_gid: u32, stdio: &Stdio) -> Result { - if stdio.is_null() { - let nio = NullIo::new().map_err(io_error!(e, "new Null Io"))?; - let pio = ProcessIO { - uri: None, - io: Some(Arc::new(nio)), - copy: false, - }; - return Ok(pio); - } - let stdout = stdio.stdout.as_str(); - let scheme_path = stdout.trim().split("://").collect::>(); - let scheme: &str; - let uri: String; - if scheme_path.len() <= 1 { - // no scheme specified - // default schema to fifo - uri = format!("fifo://{}", stdout); - scheme = "fifo" - } else { - uri = stdout.to_string(); - scheme = scheme_path[0]; - } - - let mut pio = ProcessIO { - uri: Some(uri), - io: None, - copy: false, - }; - - if scheme == "fifo" { - debug!( - "create named pipe io for container {}, stdin: {}, stdout: {}, stderr: {}", - id, - stdio.stdin.as_str(), - stdio.stdout.as_str(), - stdio.stderr.as_str() - ); - let io = FIFO { - stdin: stdio.stdin.to_string().none_if(|x| x.is_empty()), - stdout: stdio.stdout.to_string().none_if(|x| x.is_empty()), - stderr: stdio.stderr.to_string().none_if(|x| x.is_empty()), - }; - pio.io = Some(Arc::new(io)); - pio.copy = false; - } - Ok(pio) } diff --git a/crates/runc-shim/src/synchronous/mod.rs b/crates/runc-shim/src/synchronous/mod.rs new file mode 100644 index 0000000..1667b46 --- /dev/null +++ b/crates/runc-shim/src/synchronous/mod.rs @@ -0,0 +1,32 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +use std::sync::Arc; + +use containerd_shim::ExitSignal; + +mod cgroup; +mod container; +mod io; +mod runc; +mod service; +mod task; + +pub(crate) struct Service { + exit: Arc, + id: String, + namespace: String, +} diff --git a/crates/runc-shim/src/runc.rs b/crates/runc-shim/src/synchronous/runc.rs similarity index 84% rename from crates/runc-shim/src/runc.rs rename to crates/runc-shim/src/synchronous/runc.rs index 0f70a11..2c1fed6 100644 --- a/crates/runc-shim/src/runc.rs +++ b/crates/runc-shim/src/synchronous/runc.rs @@ -29,37 +29,33 @@ use oci_spec::runtime::{Linux, LinuxNamespaceType, LinuxResources, Spec}; use time::OffsetDateTime; use containerd_shim as shim; -use containerd_shim::console::{Console, ConsoleSocket}; -use containerd_shim::io::Stdio; use runc::options::{CreateOpts, DeleteOpts, ExecOpts, GlobalOpts, KillOpts}; use runc::{DefaultExecutor, Runc}; use shim::api::*; +use shim::console::ConsoleSocket; use shim::error::{Error, Result}; +use shim::io::Stdio; use shim::mount::mount_rootfs; use shim::protos::api::ProcessInfo; use shim::protos::cgroups::metrics::Metrics; use shim::protos::protobuf::well_known_types::{Any, Timestamp}; use shim::protos::protobuf::{CodedInputStream, Message, RepeatedField}; use shim::protos::shim::oci::ProcessDetails; -use shim::util::{ - new_temp_console_socket, read_spec_from_file, write_options, write_runtime, IntoOption, - JsonOptions, -}; +#[cfg(not(feature = "async"))] +use shim::util::{new_temp_console_socket, read_spec_from_file, write_options, write_runtime}; +use shim::util::{IntoOption, JsonOptions}; +use shim::Console; use shim::{debug, error, io_error, other, other_error}; -use crate::container::{CommonContainer, CommonProcess, Container, ContainerFactory, Process}; -use crate::io::create_io; - -pub const DEFAULT_RUNC_ROOT: &str = "/run/containerd/runc"; -pub const INIT_PID_FILE: &str = "init.pid"; -const DEFAULT_COMMAND: &str = "runc"; +use crate::common; +use crate::common::{create_io, CreateConfig, INIT_PID_FILE}; +use crate::synchronous::container::{ + CommonContainer, CommonProcess, Container, ContainerFactory, Process, +}; #[derive(Clone, Default)] pub(crate) struct RuncFactory {} -#[derive(Clone)] -pub struct ShimExecutor {} - impl ContainerFactory for RuncFactory { fn create(&self, ns: &str, req: &CreateTaskRequest) -> Result { let bundle = req.bundle.as_str(); @@ -95,7 +91,7 @@ impl ContainerFactory for RuncFactory { mount_rootfs(mount_type, source, &m.options.to_vec(), rootfs)?; } - let runc = create_runc(runtime, ns, bundle, &opts, DefaultExecutor {})?; + let runc = common::create_runc(runtime, ns, bundle, &opts, DefaultExecutor {})?; let id = req.get_id(); let stdio = Stdio { @@ -142,7 +138,6 @@ pub(crate) struct RuncContainer { } impl Container for RuncContainer { - #[cfg(not(feature = "async"))] fn start(&mut self, exec_id: Option<&str>) -> Result { match exec_id { Some(exec_id) => { @@ -213,22 +208,16 @@ impl Container for RuncContainer { } } - #[cfg(feature = "async")] - fn start(&mut self, exec_id: Option<&str>) -> Result { - Err(Error::Unimplemented("start".to_string())) - } - fn state(&self, exec_id: Option<&str>) -> Result { self.common.state(exec_id) } - #[cfg(not(feature = "async"))] fn kill(&mut self, exec_id: Option<&str>, signal: u32, all: bool) -> Result<()> { match exec_id { Some(_) => { let p = self.common.get_mut_process(exec_id)?; kill_process(p.pid() as u32, p.exited_at(), signal) - .map_err(|e| check_kill_error(format!("{}", e))) + .map_err(|e| common::check_kill_error(format!("{}", e))) } None => self .common @@ -239,15 +228,10 @@ impl Container for RuncContainer { signal, Some(&runc::options::KillOpts { all }), ) - .map_err(|e| check_kill_error(format!("{}", e))), + .map_err(|e| common::check_kill_error(format!("{}", e))), } } - #[cfg(feature = "async")] - fn kill(&mut self, exec_id: Option<&str>, signal: u32, all: bool) -> Result<()> { - Err(Error::Unimplemented("kill".to_string())) - } - fn wait_channel(&mut self, exec_id: Option<&str>) -> Result> { self.common.wait_channel(exec_id) } @@ -256,7 +240,6 @@ impl Container for RuncContainer { self.common.get_exit_info(exec_id) } - #[cfg(not(feature = "async"))] fn delete(&mut self, exec_id_opt: Option<&str>) -> Result<(i32, u32, Timestamp)> { let (pid, code, exit_at) = self .get_exit_info(exec_id_opt) @@ -292,23 +275,12 @@ impl Container for RuncContainer { Ok((pid, code as u32, time_stamp)) } - #[cfg(feature = "async")] - fn delete(&mut self, exec_id_opt: Option<&str>) -> Result<(i32, u32, Timestamp)> { - Err(Error::Unimplemented("delete".to_string())) - } - - #[cfg(not(feature = "async"))] fn exec(&mut self, req: ExecProcessRequest) -> Result<()> { self.common .exec(req) .map_err(other_error!(e, "failed exec")) } - #[cfg(feature = "async")] - fn exec(&mut self, req: ExecProcessRequest) -> Result<()> { - Err(Error::Unimplemented("exec".to_string())) - } - fn resize_pty(&mut self, exec_id: Option<&str>, height: u32, width: u32) -> Result<()> { self.common .resize_pty(exec_id, height, width) @@ -322,7 +294,7 @@ impl Container for RuncContainer { #[cfg(target_os = "linux")] fn stats(&self) -> Result { let pid = self.common.init.pid() as u32; - crate::cgroup::collect_metrics(pid) + crate::synchronous::cgroup::collect_metrics(pid) } #[cfg(not(target_os = "linux"))] @@ -333,7 +305,7 @@ impl Container for RuncContainer { #[cfg(target_os = "linux")] fn update(&mut self, resources: &LinuxResources) -> Result<()> { let pid = self.common.init.pid() as u32; - crate::cgroup::update_metrics(pid, resources) + crate::synchronous::cgroup::update_metrics(pid, resources) } #[cfg(not(target_os = "linux"))] @@ -341,12 +313,6 @@ impl Container for RuncContainer { Err(Error::Unimplemented("update".to_string())) } - #[cfg(feature = "async")] - fn pids(&self) -> Result { - Err(Error::Unimplemented("pids".to_string())) - } - - #[cfg(not(feature = "async"))] fn pids(&self) -> Result { let pids = self .common @@ -468,7 +434,7 @@ impl InitProcess { criu_work_path: "".to_string(), } } - #[cfg(not(feature = "async"))] + pub fn create(&mut self, _conf: &CreateConfig) -> Result<()> { //TODO checkpoint support let id = self.common.id.to_string(); @@ -510,10 +476,6 @@ impl InitProcess { self.common.set_pid_from_file(pid_path.as_path())?; Ok(()) } - #[cfg(feature = "async")] - pub fn create(&mut self, _conf: &CreateConfig) -> Result<()> { - unimplemented!() - } } impl Process for InitProcess { @@ -664,7 +626,7 @@ impl Process for ExecProcess { impl TryFrom for ExecProcess { type Error = Error; fn try_from(req: ExecProcessRequest) -> std::result::Result { - let p = get_spec_from_request(&req)?; + let p = common::get_spec_from_request(&req)?; let exec_process = ExecProcess { common: CommonProcess { state: Status::CREATED, @@ -687,61 +649,3 @@ impl TryFrom for ExecProcess { Ok(exec_process) } } - -pub fn get_spec_from_request(req: &ExecProcessRequest) -> Result { - if let Some(val) = req.spec.as_ref() { - let mut p = serde_json::from_slice::(val.get_value())?; - p.set_terminal(Some(req.terminal)); - Ok(p) - } else { - Err(Error::InvalidArgument("no spec in request".to_string())) - } -} - -#[derive(Default)] -pub(crate) struct CreateConfig {} - -pub fn check_kill_error(emsg: String) -> Error { - let emsg = emsg.to_lowercase(); - if emsg.contains("process already finished") - || emsg.contains("container not running") - || emsg.contains("no such process") - { - Error::NotFoundError("process already finished".to_string()) - } else if emsg.contains("does not exist") { - Error::NotFoundError("no such container".to_string()) - } else { - other!("unknown error after kill {}", emsg) - } -} - -pub fn create_runc( - runtime: &str, - namespace: &str, - bundle: impl AsRef, - opts: &Options, - executor: F, -) -> Result> { - let runtime = if runtime.is_empty() { - DEFAULT_COMMAND - } else { - runtime - }; - let root = opts.root.as_str(); - let root = Path::new(if root.is_empty() { - DEFAULT_RUNC_ROOT - } else { - root - }) - .join(namespace); - - let log = bundle.as_ref().join("log.json"); - GlobalOpts::default() - .command(runtime) - .root(root) - .log(log) - .log_json() - .systemd_cgroup(opts.systemd_cgroup) - .build_with_executor(executor) - .map_err(other_error!(e, "unable to create runc instance")) -} diff --git a/crates/runc-shim/src/service.rs b/crates/runc-shim/src/synchronous/service.rs similarity index 86% rename from crates/runc-shim/src/service.rs rename to crates/runc-shim/src/synchronous/service.rs index 17d16e8..0375322 100644 --- a/crates/runc-shim/src/service.rs +++ b/crates/runc-shim/src/synchronous/service.rs @@ -21,30 +21,23 @@ use std::path::Path; use std::sync::Arc; use containerd_shim as shim; +use containerd_shim::util::{read_options, read_runtime, read_spec_from_file, write_address}; use runc::options::{DeleteOpts, GlobalOpts, DEFAULT_COMMAND}; use runc::DefaultExecutor; use shim::api::*; use shim::error::{Error, Result}; use shim::monitor::{monitor_subscribe, Subject, Subscription, Topic}; use shim::protos::protobuf::SingularPtrField; -use shim::util::{get_timestamp, read_options, read_runtime, read_spec_from_file, write_address}; +use shim::publisher::RemotePublisher; +use shim::util::get_timestamp; use shim::{debug, error, io_error, other_error, warn}; -use shim::{spawn, Config, ExitSignal, RemotePublisher, Shim, StartOpts}; +use shim::{spawn, Config, ExitSignal, Shim, StartOpts}; -use crate::container::{Container, Process}; -use crate::runc::{create_runc, RuncContainer, RuncFactory, DEFAULT_RUNC_ROOT}; -use crate::task::ShimTask; - -pub const GROUP_LABELS: [&str; 2] = [ - "io.containerd.runc.v2.group", - "io.kubernetes.cri.sandbox-id", -]; - -pub(crate) struct Service { - exit: Arc, - id: String, - namespace: String, -} +use crate::common::{create_runc, GROUP_LABELS}; +use crate::synchronous::container::{Container, Process}; +use crate::synchronous::runc::{RuncContainer, RuncFactory}; +use crate::synchronous::task::ShimTask; +use crate::synchronous::Service; impl Shim for Service { type T = ShimTask; @@ -65,7 +58,7 @@ impl Shim for Service { } } - fn start_shim(&mut self, opts: StartOpts) -> Result { + fn start_shim(&mut self, opts: StartOpts) -> containerd_shim::Result { let mut grouping = opts.id.clone(); let spec = read_spec_from_file("")?; match spec.annotations() { @@ -83,14 +76,14 @@ impl Shim for Service { let (child_id, address) = spawn(opts, &grouping, Vec::new())?; #[cfg(target_os = "linux")] - crate::cgroup::set_cgroup_and_oom_score(child_id)?; + crate::synchronous::cgroup::set_cgroup_and_oom_score(child_id)?; write_address(&address)?; Ok(address) } #[cfg(not(feature = "async"))] - fn delete_shim(&mut self) -> Result { + fn delete_shim(&mut self) -> containerd_shim::Result { let namespace = self.namespace.as_str(); let bundle = current_dir().map_err(io_error!(e, "get current dir"))?; let opts = read_options(&bundle)?; @@ -107,7 +100,7 @@ impl Shim for Service { } #[cfg(feature = "async")] - fn delete_shim(&mut self) -> Result { + fn delete_shim(&mut self) -> containerd_shim::Result { Err(Error::Unimplemented("delete shim".to_string())) } diff --git a/crates/runc-shim/src/task.rs b/crates/runc-shim/src/synchronous/task.rs similarity index 99% rename from crates/runc-shim/src/task.rs rename to crates/runc-shim/src/synchronous/task.rs index c0fdffc..626a7f5 100644 --- a/crates/runc-shim/src/task.rs +++ b/crates/runc-shim/src/synchronous/task.rs @@ -30,7 +30,7 @@ use shim::Task; use shim::{api::*, ExitSignal}; use shim::{TtrpcContext, TtrpcResult}; -use crate::container::{Container, ContainerFactory}; +use crate::synchronous::container::{Container, ContainerFactory}; pub struct ShimTask { pub containers: Arc>>, diff --git a/crates/shim/examples/publish.rs b/crates/shim/examples/publish.rs index b810573..8f7a85c 100644 --- a/crates/shim/examples/publish.rs +++ b/crates/shim/examples/publish.rs @@ -13,12 +13,13 @@ See the License for the specific language governing permissions and limitations under the License. */ - use std::env; -use containerd_shim::{Context, RemotePublisher}; +use containerd_shim::publisher::RemotePublisher; +use containerd_shim::Context; use containerd_shim_protos::events::task::TaskOOM; +#[cfg(not(feature = "async"))] fn main() { let args: Vec = env::args().collect(); @@ -38,9 +39,40 @@ fn main() { let ctx = Context::default(); println!("Sending event"); + publisher .publish(ctx, "/tasks/oom", "default", event) .expect("Publish failed"); println!("Done"); } + +#[cfg(feature = "async")] +#[tokio::main] +async fn main() { + let args: Vec = env::args().collect(); + + // Must not start with unix:// + let address = args + .get(1) + .ok_or("First argument must be containerd's TTRPC address to publish events") + .unwrap(); + + println!("Connecting: {}", &address); + + let publisher = RemotePublisher::new(address).await.expect("Connect failed"); + + let mut event = TaskOOM::new(); + event.set_container_id("123".into()); + + let ctx = Context::default(); + + println!("Sending event"); + + publisher + .publish(ctx, "/tasks/oom", "default", event) + .await + .expect("Publish failed"); + + println!("Done"); +} diff --git a/crates/shim/examples/skeleton.rs b/crates/shim/examples/skeleton.rs index f5b075b..7072e8b 100644 --- a/crates/shim/examples/skeleton.rs +++ b/crates/shim/examples/skeleton.rs @@ -13,73 +13,84 @@ See the License for the specific language governing permissions and limitations under the License. */ - -use std::sync::Arc; - -use log::info; - +#[cfg(not(feature = "async"))] use containerd_shim as shim; -use shim::{api, Config, DeleteResponse, ExitSignal, RemotePublisher, TtrpcContext, TtrpcResult}; -#[derive(Clone)] -struct Service { - exit: Arc, -} +#[cfg(not(feature = "async"))] +mod skeleton { + use std::sync::Arc; -impl shim::Shim for Service { - type T = Service; + use log::info; - fn new( - _runtime_id: &str, - _id: &str, - _namespace: &str, - _publisher: RemotePublisher, - _config: &mut Config, - ) -> Self { - Service { - exit: Arc::new(ExitSignal::default()), + use containerd_shim as shim; + use containerd_shim::synchronous::publisher::RemotePublisher; + use shim::{api, Config, DeleteResponse, ExitSignal, TtrpcContext, TtrpcResult}; + + #[derive(Clone)] + pub(crate) struct Service { + exit: Arc, + } + + impl shim::Shim for Service { + type T = Service; + + fn new( + _runtime_id: &str, + _id: &str, + _namespace: &str, + _publisher: RemotePublisher, + _config: &mut Config, + ) -> Self { + Service { + exit: Arc::new(ExitSignal::default()), + } + } + + fn start_shim(&mut self, opts: shim::StartOpts) -> Result { + let grouping = opts.id.clone(); + let (_child_id, address) = shim::spawn(opts, &grouping, Vec::new())?; + Ok(address) + } + + fn delete_shim(&mut self) -> Result { + Ok(DeleteResponse::new()) + } + + fn wait(&mut self) { + self.exit.wait(); + } + + fn create_task_service(&self) -> Self::T { + self.clone() } } - fn start_shim(&mut self, opts: shim::StartOpts) -> Result { - let grouping = opts.id.clone(); - let (_child_id, address) = shim::spawn(opts, &grouping, Vec::new())?; - Ok(address) - } + impl shim::Task for Service { + fn connect( + &self, + _ctx: &TtrpcContext, + _req: api::ConnectRequest, + ) -> TtrpcResult { + info!("Connect request"); + Ok(api::ConnectResponse { + version: String::from("example"), + ..Default::default() + }) + } - fn delete_shim(&mut self) -> Result { - Ok(DeleteResponse::new()) - } - - fn wait(&mut self) { - self.exit.wait(); - } - - fn create_task_service(&self) -> Self::T { - self.clone() - } -} - -impl shim::Task for Service { - fn connect( - &self, - _ctx: &TtrpcContext, - _req: api::ConnectRequest, - ) -> TtrpcResult { - info!("Connect request"); - Ok(api::ConnectResponse { - version: String::from("example"), - ..Default::default() - }) - } - - fn shutdown(&self, _ctx: &TtrpcContext, _req: api::ShutdownRequest) -> TtrpcResult { - info!("Shutdown request"); - self.exit.signal(); - Ok(api::Empty::default()) + fn shutdown( + &self, + _ctx: &TtrpcContext, + _req: api::ShutdownRequest, + ) -> TtrpcResult { + info!("Shutdown request"); + self.exit.signal(); + Ok(api::Empty::default()) + } } } fn main() { - shim::run::("io.containerd.empty.v1", None) + #[cfg(not(feature = "async"))] + shim::run::("io.containerd.empty.v1", None) } diff --git a/crates/shim/src/asynchronous/console.rs b/crates/shim/src/asynchronous/console.rs index 0dc893f..8348779 100644 --- a/crates/shim/src/asynchronous/console.rs +++ b/crates/shim/src/asynchronous/console.rs @@ -21,7 +21,7 @@ use log::warn; use tokio::net::{UnixListener, UnixStream}; use uuid::Uuid; -use crate::asynchronous::utils::mkdir; +use crate::asynchronous::util::mkdir; use crate::Error; use crate::Result; diff --git a/crates/shim/src/asynchronous/mod.rs b/crates/shim/src/asynchronous/mod.rs index 472d4a3..127cec6 100644 --- a/crates/shim/src/asynchronous/mod.rs +++ b/crates/shim/src/asynchronous/mod.rs @@ -40,7 +40,7 @@ use containerd_shim_protos::ttrpc::r#async::Server; use crate::asynchronous::monitor::monitor_notify_by_pid; use crate::asynchronous::publisher::RemotePublisher; -use crate::asynchronous::utils::{asyncify, read_file_to_str, write_str_to_file}; +use crate::asynchronous::util::{asyncify, read_file_to_str, write_str_to_file}; use crate::error::Error; use crate::error::Result; use crate::{ @@ -53,7 +53,7 @@ pub mod monitor; pub mod processes; pub mod publisher; pub mod task; -pub mod utils; +pub mod util; /// Asynchronous Main shim interface that must be implemented by all async shims. /// diff --git a/crates/shim/src/asynchronous/processes.rs b/crates/shim/src/asynchronous/processes.rs index 6bda5de..eb53fbc 100644 --- a/crates/shim/src/asynchronous/processes.rs +++ b/crates/shim/src/asynchronous/processes.rs @@ -23,10 +23,10 @@ use tokio::sync::oneshot::{channel, Receiver, Sender}; use containerd_shim_protos::api::{StateResponse, Status}; use containerd_shim_protos::protobuf::well_known_types::Timestamp; -use crate::asynchronous::utils::asyncify; -use crate::console::{ioctl_set_winsz, Console}; -use crate::error::Error; +use crate::asynchronous::util::asyncify; use crate::io::Stdio; +use crate::Error; +use crate::{ioctl_set_winsz, Console}; #[async_trait] pub trait Process { diff --git a/crates/shim/src/asynchronous/publisher.rs b/crates/shim/src/asynchronous/publisher.rs index f00fa07..7973e8d 100644 --- a/crates/shim/src/asynchronous/publisher.rs +++ b/crates/shim/src/asynchronous/publisher.rs @@ -26,7 +26,7 @@ use containerd_shim_protos::ttrpc; use containerd_shim_protos::ttrpc::context::Context; use containerd_shim_protos::ttrpc::r#async::TtrpcContext; -use crate::asynchronous::utils::asyncify; +use crate::asynchronous::util::asyncify; use crate::error::Result; use crate::util::{any, connect, timestamp}; diff --git a/crates/shim/src/asynchronous/utils.rs b/crates/shim/src/asynchronous/util.rs similarity index 98% rename from crates/shim/src/asynchronous/utils.rs rename to crates/shim/src/asynchronous/util.rs index 8b9c691..ec1b7cc 100644 --- a/crates/shim/src/asynchronous/utils.rs +++ b/crates/shim/src/asynchronous/util.rs @@ -142,7 +142,7 @@ pub async fn mkdir(path: impl AsRef, mode: mode_t) -> Result<()> { #[cfg(test)] mod tests { - use crate::asynchronous::utils::{read_file_to_str, write_str_to_file}; + use crate::asynchronous::util::{read_file_to_str, write_str_to_file}; #[tokio::test] async fn test_read_write_str() { diff --git a/crates/shim/src/lib.rs b/crates/shim/src/lib.rs index 4993ada..22f39e5 100644 --- a/crates/shim/src/lib.rs +++ b/crates/shim/src/lib.rs @@ -33,37 +33,38 @@ //! use std::collections::hash_map::DefaultHasher; -use std::env; -use std::fs; +use std::fs::File; use std::hash::Hasher; -use std::io::Write; -use std::os::unix::fs::FileTypeExt; -use std::os::unix::io::{AsRawFd, RawFd}; +use std::os::unix::io::RawFd; use std::os::unix::net::UnixListener; use std::path::{Path, PathBuf}; -use std::process::{self, Command, Stdio}; -use std::sync::{Arc, Condvar, Mutex}; - -use command_fds::{CommandFdExt, FdMapping}; -use libc::{c_int, pid_t, SIGCHLD, SIGINT, SIGPIPE, SIGTERM}; -pub use log::{debug, error, info, warn}; -use signal_hook::iterator::Signals; pub use containerd_shim_protos as protos; -use protos::protobuf::Message; +use nix::ioctl_write_ptr_bad; pub use protos::shim::shim::DeleteResponse; -pub use protos::shim::shim_ttrpc::{create_task, Task}; -pub use protos::ttrpc::{context::Context, Result as TtrpcResult, TtrpcContext}; -use protos::ttrpc::{Client, Server}; +pub use protos::ttrpc::{context::Context, Result as TtrpcResult}; +#[cfg(feature = "async")] +pub use crate::asynchronous::*; pub use crate::error::{Error, Result}; -use crate::monitor::monitor_notify_by_pid; -pub use crate::publisher::RemotePublisher; -use crate::util::{read_address, write_address}; +#[cfg(not(feature = "async"))] +pub use crate::synchronous::*; #[macro_use] pub mod error; +mod args; +#[cfg(feature = "async")] +pub mod asynchronous; +pub mod io; +mod logger; +pub mod monitor; +pub mod mount; +mod reap; +#[cfg(not(feature = "async"))] +pub mod synchronous; +pub mod util; + /// Generated request/response structures. pub mod api { pub use super::protos::api::Status; @@ -71,18 +72,47 @@ pub mod api { pub use super::protos::shim::shim::*; pub use super::protos::types::empty::Empty; } -mod args; -mod logger; -pub mod monitor; -pub mod mount; -mod publisher; -mod reap; -pub mod util; -#[cfg(feature = "async")] -pub mod asynchronous; -pub mod console; -pub mod io; +macro_rules! cfg_not_async { + ($($item:item)*) => { + $( + #[cfg(not(feature = "async"))] + #[cfg_attr(docsrs, doc(cfg(not(feature = "async"))))] + $item + )* + } +} + +macro_rules! cfg_async { + ($($item:item)*) => { + $( + #[cfg(feature = "async")] + #[cfg_attr(docsrs, doc(cfg(feature = "async")))] + $item + )* + } +} + +cfg_not_async! { + pub use crate::synchronous::*; + pub use crate::synchronous::console; + pub use crate::synchronous::publisher; + pub use protos::shim::shim_ttrpc::Task; + pub use protos::ttrpc::TtrpcContext; +} + +cfg_async! { + pub use crate::asynchronous::*; + pub use crate::asynchronous::console; + pub use crate::asynchronous::container; + pub use crate::asynchronous::processes; + pub use crate::asynchronous::task; + pub use crate::asynchronous::publisher; + pub use protos::shim_async::Task; + pub use protos::ttrpc::r#async::TtrpcContext; +} + +ioctl_write_ptr_bad!(ioctl_set_winsz, libc::TIOCSWINSZ, libc::winsize); const TTRPC_ADDRESS: &str = "TTRPC_ADDRESS"; @@ -116,210 +146,6 @@ pub struct StartOpts { pub debug: bool, } -/// Helper structure that wraps atomic bool to signal shim server when to shutdown the TTRPC server. -/// -/// Shim implementations are responsible for calling [`Self::signal`]. -#[allow(clippy::mutex_atomic)] // Condvar expected to be used with Mutex, not AtomicBool. -#[derive(Default)] -pub struct ExitSignal(Mutex, Condvar); - -#[allow(clippy::mutex_atomic)] -impl ExitSignal { - /// Set exit signal to shutdown shim server. - pub fn signal(&self) { - let (lock, cvar) = (&self.0, &self.1); - let mut exit = lock.lock().unwrap(); - *exit = true; - cvar.notify_all(); - } - - /// Wait for the exit signal to be set. - pub fn wait(&self) { - let (lock, cvar) = (&self.0, &self.1); - let mut started = lock.lock().unwrap(); - while !*started { - started = cvar.wait(started).unwrap(); - } - } -} - -/// Main shim interface that must be implemented by all shims. -/// -/// Start and delete routines will be called to handle containerd's shim lifecycle requests. -pub trait Shim { - /// Type to provide task service for the shim. - type T: Task + Send + Sync; - - /// Create a new instance of Shim. - /// - /// # Arguments - /// - `runtime_id`: identifier of the container runtime. - /// - `id`: identifier of the shim/container, passed in from Containerd. - /// - `namespace`: namespace of the shim/container, passed in from Containerd. - /// - `publisher`: publisher to send events to Containerd. - /// - `config`: for the shim to pass back configuration information - fn new( - runtime_id: &str, - id: &str, - namespace: &str, - publisher: RemotePublisher, - config: &mut Config, - ) -> Self; - - /// Start shim will be called by containerd when launching new shim instance. - /// - /// It expected to return TTRPC address containerd daemon can use to communicate with - /// the given shim instance. - /// - /// See https://github.com/containerd/containerd/tree/master/runtime/v2#start - fn start_shim(&mut self, opts: StartOpts) -> Result; - - /// Delete shim will be called by containerd after shim shutdown to cleanup any leftovers. - fn delete_shim(&mut self) -> Result; - - /// Wait for the shim to exit. - fn wait(&mut self); - - /// Create the task service object. - fn create_task_service(&self) -> Self::T; -} - -/// Shim entry point that must be invoked from `main`. -pub fn run(runtime_id: &str, opts: Option) -where - T: Shim + Send + Sync + 'static, -{ - if let Some(err) = bootstrap::(runtime_id, opts).err() { - eprintln!("{}: {:?}", runtime_id, err); - process::exit(1); - } -} - -fn bootstrap(runtime_id: &str, opts: Option) -> Result<()> -where - T: Shim + Send + Sync + 'static, -{ - // Parse command line - let os_args: Vec<_> = env::args_os().collect(); - let flags = args::parse(&os_args[1..])?; - - let ttrpc_address = env::var(TTRPC_ADDRESS)?; - let publisher = publisher::RemotePublisher::new(&ttrpc_address)?; - - // Create shim instance - let mut config = opts.unwrap_or_else(Config::default); - - // Setup signals - let signals = setup_signals(&config); - - if !config.no_sub_reaper { - reap::set_subreaper()?; - } - - let mut shim = T::new( - runtime_id, - &flags.id, - &flags.namespace, - publisher, - &mut config, - ); - - match flags.action.as_str() { - "start" => { - let args = StartOpts { - id: flags.id, - publish_binary: flags.publish_binary, - address: flags.address, - ttrpc_address, - namespace: flags.namespace, - debug: flags.debug, - }; - - let address = shim.start_shim(args)?; - - std::io::stdout() - .lock() - .write_fmt(format_args!("{}", address)) - .map_err(io_error!(e, "write stdout"))?; - - Ok(()) - } - "delete" => { - std::thread::spawn(move || handle_signals(signals)); - let response = shim.delete_shim()?; - let stdout = std::io::stdout(); - let mut locked = stdout.lock(); - response.write_to_writer(&mut locked)?; - - Ok(()) - } - _ => { - if !config.no_setup_logger { - logger::init(flags.debug)?; - } - - let task = shim.create_task_service(); - let task_service = create_task(Arc::new(Box::new(task))); - let mut server = Server::new().register_service(task_service); - server = server.add_listener(SOCKET_FD)?; - server.start()?; - - info!("Shim successfully started, waiting for exit signal..."); - std::thread::spawn(move || handle_signals(signals)); - shim.wait(); - - info!("Shutting down shim instance"); - server.shutdown(); - - // NOTE: If the shim server is down(like oom killer), the address - // socket might be leaking. - let address = read_address()?; - remove_socket_silently(&address); - Ok(()) - } - } -} - -fn setup_signals(config: &Config) -> Signals { - let signals = Signals::new(&[SIGTERM, SIGINT, SIGPIPE]).expect("new signal failed"); - if !config.no_reaper { - signals.add_signal(SIGCHLD).expect("add signal failed"); - } - signals -} - -fn handle_signals(mut signals: Signals) { - loop { - for sig in signals.wait() { - match sig { - SIGTERM | SIGINT => { - debug!("received {}", sig); - return; - } - SIGCHLD => loop { - unsafe { - let pid: pid_t = -1; - let mut status: c_int = 0; - let options: c_int = libc::WNOHANG; - let res_pid = libc::waitpid(pid, &mut status, options); - let status = libc::WEXITSTATUS(status); - if res_pid <= 0 { - break; - } else { - monitor_notify_by_pid(res_pid, status).unwrap_or_else(|e| { - error!("failed to send exit event {}", e); - }); - } - } - }, - _ => { - debug!("received {}", sig); - } - } - } - } -} - /// The shim process communicates with the containerd server through a communication channel /// created by containerd. One endpoint of the communication channel is passed to shim process /// through a file descriptor during forking, which is the fourth(3) file descriptor. @@ -364,121 +190,18 @@ fn start_listener(address: &str) -> std::io::Result { let path = parse_sockaddr(address); // Try to create the needed directory hierarchy. if let Some(parent) = Path::new(path).parent() { - fs::create_dir_all(parent)?; + std::fs::create_dir_all(parent)?; } UnixListener::bind(path) } -fn wait_socket_working(address: &str, interval_in_ms: u64, count: u32) -> Result<()> { - for _i in 0..count { - match Client::connect(address) { - Ok(_) => { - return Ok(()); - } - Err(_) => { - std::thread::sleep(std::time::Duration::from_millis(interval_in_ms)); - } - } - } - Err(other!("time out waiting for socket {}", address)) -} - -fn remove_socket_silently(address: &str) { - remove_socket(address).unwrap_or_else(|e| warn!("failed to remove file {} {:?}", address, e)) -} - -fn remove_socket(address: &str) -> Result<()> { - let path = parse_sockaddr(address); - if let Ok(md) = Path::new(path).metadata() { - if md.file_type().is_socket() { - fs::remove_file(path).map_err(io_error!(e, "remove socket"))?; - } - } - Ok(()) -} - -/// Spawn is a helper func to launch shim process. -/// Typically this expected to be called from `StartShim`. -pub fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Result<(u32, String)> { - let cmd = env::current_exe().map_err(io_error!(e, ""))?; - let cwd = env::current_dir().map_err(io_error!(e, ""))?; - let address = socket_address(&opts.address, &opts.namespace, grouping); - - // Create socket and prepare listener. - // We'll use `add_listener` when creating TTRPC server. - let listener = match start_listener(&address) { - Ok(l) => l, - Err(e) => { - if e.kind() != std::io::ErrorKind::AddrInUse { - return Err(error::Error::IoError { - context: "".to_string(), - err: e, - }); - }; - if let Ok(()) = wait_socket_working(&address, 5, 200) { - write_address(&address)?; - return Ok((0, address)); - } - remove_socket(&address)?; - start_listener(&address).map_err(io_error!(e, ""))? - } - }; - - let mut command = Command::new(cmd); - - command - .current_dir(cwd) - .stdout(Stdio::null()) - .stdin(Stdio::null()) - .stderr(Stdio::null()) - .fd_mappings(vec![FdMapping { - parent_fd: listener.as_raw_fd(), - child_fd: SOCKET_FD, - }])? - .args(&[ - "-namespace", - &opts.namespace, - "-id", - &opts.id, - "-address", - &opts.address, - ]); - if opts.debug { - command.arg("-debug"); - } - command.envs(vars); - - command - .spawn() - .map_err(io_error!(e, "spawn shim")) - .map(|child| { - // Ownership of `listener` has been passed to child. - std::mem::forget(listener); - (child.id(), address) - }) +pub struct Console { + pub file: File, } #[cfg(test)] mod tests { - use std::thread; - - use super::*; - - #[test] - fn exit_signal() { - let signal = Arc::new(ExitSignal::default()); - - let cloned = Arc::clone(&signal); - let handle = thread::spawn(move || { - cloned.signal(); - }); - - signal.wait(); - - if let Err(err) = handle.join() { - panic!("{:?}", err); - } - } + use crate::start_listener; #[test] fn test_start_listener() { @@ -498,9 +221,9 @@ mod tests { let path = tmpdir.path().to_str().unwrap().to_owned(); let txt_file = path + "demo.txt"; - fs::write(&txt_file, "test").unwrap(); + std::fs::write(&txt_file, "test").unwrap(); assert!(start_listener(&txt_file).is_err()); - let context = fs::read_to_string(&txt_file).unwrap(); + let context = std::fs::read_to_string(&txt_file).unwrap(); assert_eq!(context, "test"); } } diff --git a/crates/shim/src/monitor.rs b/crates/shim/src/monitor.rs index 949ee3b..d3cfdb8 100644 --- a/crates/shim/src/monitor.rs +++ b/crates/shim/src/monitor.rs @@ -13,54 +13,12 @@ See the License for the specific language governing permissions and limitations under the License. */ - -use std::collections::HashMap; use std::fmt; -use std::sync::mpsc::{channel, Receiver, Sender}; -use std::sync::Mutex; -use lazy_static::lazy_static; -use log::{error, warn}; - -use crate::error::Result; - -lazy_static! { - pub static ref MONITOR: Mutex = { - let monitor = Monitor { - seq_id: 0, - subscribers: HashMap::new(), - topic_subs: HashMap::new(), - }; - Mutex::new(monitor) - }; -} - -pub fn monitor_subscribe(topic: Topic) -> Result { - let mut monitor = MONITOR.lock().unwrap(); - let s = monitor.subscribe(topic)?; - Ok(s) -} - -pub fn monitor_notify_by_pid(pid: i32, exit_code: i32) -> Result<()> { - let monitor = MONITOR.lock().unwrap(); - monitor.notify_by_pid(pid, exit_code) -} - -pub fn monitor_notify_by_exec(id: &str, exec_id: &str, exit_code: i32) -> Result<()> { - let monitor = MONITOR.lock().unwrap(); - monitor.notify_by_exec(id, exec_id, exit_code) -} - -pub struct Monitor { - pub(crate) seq_id: i64, - pub(crate) subscribers: HashMap, - pub(crate) topic_subs: HashMap>, -} - -pub(crate) struct Subscriber { - pub(crate) topic: Topic, - pub(crate) tx: Sender, -} +#[cfg(feature = "async")] +pub use crate::asynchronous::monitor::*; +#[cfg(not(feature = "async"))] +pub use crate::synchronous::monitor::*; #[derive(Clone, Eq, Hash, PartialEq)] pub enum Topic { @@ -69,11 +27,6 @@ pub enum Topic { All, } -pub struct Subscription { - pub id: i64, - pub rx: Receiver, -} - #[derive(Debug)] pub struct ExitEvent { // what kind of a thing exit @@ -106,72 +59,3 @@ pub enum Subject { // if exec is empty, then the event is for the container Exec(String, String), } - -impl Monitor { - pub fn subscribe(&mut self, topic: Topic) -> Result { - let (tx, rx) = channel::(); - let id = self.seq_id; - self.seq_id += 1; - let subscriber = Subscriber { - tx, - topic: topic.clone(), - }; - self.subscribers.insert(id, subscriber); - self.topic_subs - .entry(topic) - .or_insert_with(Vec::new) - .push(id); - Ok(Subscription { id, rx }) - } - - pub fn notify_by_pid(&self, pid: i32, exit_code: i32) -> Result<()> { - let subject = Subject::Pid(pid); - self.notify_topic(&Topic::Pid, &subject, exit_code); - self.notify_topic(&Topic::All, &subject, exit_code); - Ok(()) - } - - pub fn notify_by_exec(&self, cid: &str, exec_id: &str, exit_code: i32) -> Result<()> { - let subject = Subject::Exec(cid.into(), exec_id.into()); - self.notify_topic(&Topic::Exec, &subject, exit_code); - self.notify_topic(&Topic::All, &subject, exit_code); - Ok(()) - } - - fn notify_topic(&self, topic: &Topic, subject: &Subject, exit_code: i32) { - self.topic_subs.get(topic).map_or((), |subs| { - for i in subs { - self.subscribers.get(i).and_then(|sub| { - sub.tx - .send(ExitEvent { - subject: subject.clone(), - exit_code, - }) - .map_err(|e| warn!("failed to send {}", e)) - .ok() - }); - } - }) - } - - pub fn unsubscribe(&mut self, id: i64) -> Result<()> { - let sub = self.subscribers.remove(&id); - if let Some(s) = sub { - self.topic_subs.get_mut(&s.topic).map(|v| { - v.iter().position(|&x| x == id).map(|i| { - v.remove(i); - }) - }); - } - Ok(()) - } -} - -impl Drop for Subscription { - fn drop(&mut self) { - let mut monitor = MONITOR.lock().unwrap(); - monitor.unsubscribe(self.id).unwrap_or_else(|e| { - error!("failed to unsubscribe the subscription {}, {}", self.id, e); - }); - } -} diff --git a/crates/shim/src/console.rs b/crates/shim/src/synchronous/console.rs similarity index 89% rename from crates/shim/src/console.rs rename to crates/shim/src/synchronous/console.rs index 5c35d3b..eaa8a1b 100644 --- a/crates/shim/src/console.rs +++ b/crates/shim/src/synchronous/console.rs @@ -14,18 +14,10 @@ limitations under the License. */ -use std::fs::File; use std::os::unix::net::{UnixListener, UnixStream}; use std::path::PathBuf; use log::warn; -use nix::ioctl_write_ptr_bad; - -ioctl_write_ptr_bad!(ioctl_set_winsz, libc::TIOCSWINSZ, libc::winsize); - -pub struct Console { - pub file: File, -} pub struct ConsoleSocket { pub listener: UnixListener, diff --git a/crates/shim/src/synchronous/mod.rs b/crates/shim/src/synchronous/mod.rs new file mode 100644 index 0000000..5a4af1e --- /dev/null +++ b/crates/shim/src/synchronous/mod.rs @@ -0,0 +1,380 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +//! A library to implement custom runtime v2 shims for containerd. +//! +//! # Runtime +//! Runtime v2 introduces a first class shim API for runtime authors to integrate with containerd. +//! The shim API is minimal and scoped to the execution lifecycle of a container. +//! +//! This crate simplifies shim v2 runtime development for containerd. It handles common tasks such +//! as command line parsing, setting up shim's TTRPC server, logging, events, etc. +//! +//! Clients are expected to implement [Shim] and [Task] traits with task handling routines. +//! This generally replicates same API as in Go [version](https://github.com/containerd/containerd/blob/main/runtime/v2/example/cmd/main.go). +//! +//! Once implemented, shim's bootstrap code is as easy as: +//! ```text +//! shim::run::("io.containerd.empty.v1") +//! ``` +//! + +use std::env; +use std::fs; +use std::io::Write; +use std::os::unix::fs::FileTypeExt; +use std::os::unix::io::AsRawFd; +use std::path::Path; +use std::process::{self, Command, Stdio}; +use std::sync::{Arc, Condvar, Mutex}; + +use command_fds::{CommandFdExt, FdMapping}; +use libc::{c_int, pid_t, SIGCHLD, SIGINT, SIGPIPE, SIGTERM}; +pub use log::{debug, error, info, warn}; +use signal_hook::iterator::Signals; + +use crate::protos::protobuf::Message; +use crate::protos::shim::shim_ttrpc::{create_task, Task}; +use crate::protos::ttrpc::{Client, Server}; +use util::{read_address, write_address}; + +use crate::api::DeleteResponse; +use crate::synchronous::publisher::RemotePublisher; +use crate::Error; +use crate::{args, logger, reap, Result, TTRPC_ADDRESS}; +use crate::{parse_sockaddr, socket_address, start_listener, Config, StartOpts, SOCKET_FD}; + +pub mod monitor; +pub mod publisher; +pub mod util; + +pub mod console; + +/// Helper structure that wraps atomic bool to signal shim server when to shutdown the TTRPC server. +/// +/// Shim implementations are responsible for calling [`Self::signal`]. +#[allow(clippy::mutex_atomic)] // Condvar expected to be used with Mutex, not AtomicBool. +#[derive(Default)] +pub struct ExitSignal(Mutex, Condvar); + +#[allow(clippy::mutex_atomic)] +impl ExitSignal { + /// Set exit signal to shutdown shim server. + pub fn signal(&self) { + let (lock, cvar) = (&self.0, &self.1); + let mut exit = lock.lock().unwrap(); + *exit = true; + cvar.notify_all(); + } + + /// Wait for the exit signal to be set. + pub fn wait(&self) { + let (lock, cvar) = (&self.0, &self.1); + let mut started = lock.lock().unwrap(); + while !*started { + started = cvar.wait(started).unwrap(); + } + } +} + +/// Main shim interface that must be implemented by all shims. +/// +/// Start and delete routines will be called to handle containerd's shim lifecycle requests. +pub trait Shim { + /// Type to provide task service for the shim. + type T: Task + Send + Sync; + + /// Create a new instance of Shim. + /// + /// # Arguments + /// - `runtime_id`: identifier of the container runtime. + /// - `id`: identifier of the shim/container, passed in from Containerd. + /// - `namespace`: namespace of the shim/container, passed in from Containerd. + /// - `publisher`: publisher to send events to Containerd. + /// - `config`: for the shim to pass back configuration information + fn new( + runtime_id: &str, + id: &str, + namespace: &str, + publisher: RemotePublisher, + config: &mut Config, + ) -> Self; + + /// Start shim will be called by containerd when launching new shim instance. + /// + /// It expected to return TTRPC address containerd daemon can use to communicate with + /// the given shim instance. + /// + /// See https://github.com/containerd/containerd/tree/master/runtime/v2#start + fn start_shim(&mut self, opts: StartOpts) -> Result; + + /// Delete shim will be called by containerd after shim shutdown to cleanup any leftovers. + fn delete_shim(&mut self) -> Result; + + /// Wait for the shim to exit. + fn wait(&mut self); + + /// Create the task service object. + fn create_task_service(&self) -> Self::T; +} + +/// Shim entry point that must be invoked from `main`. +pub fn run(runtime_id: &str, opts: Option) +where + T: Shim + Send + Sync + 'static, +{ + if let Some(err) = bootstrap::(runtime_id, opts).err() { + eprintln!("{}: {:?}", runtime_id, err); + process::exit(1); + } +} + +fn bootstrap(runtime_id: &str, opts: Option) -> Result<()> +where + T: Shim + Send + Sync + 'static, +{ + // Parse command line + let os_args: Vec<_> = env::args_os().collect(); + let flags = args::parse(&os_args[1..])?; + + let ttrpc_address = env::var(TTRPC_ADDRESS)?; + let publisher = publisher::RemotePublisher::new(&ttrpc_address)?; + + // Create shim instance + let mut config = opts.unwrap_or_else(Config::default); + + // Setup signals + let signals = setup_signals(&config); + + if !config.no_sub_reaper { + reap::set_subreaper()?; + } + + let mut shim = T::new( + runtime_id, + &flags.id, + &flags.namespace, + publisher, + &mut config, + ); + + match flags.action.as_str() { + "start" => { + let args = StartOpts { + id: flags.id, + publish_binary: flags.publish_binary, + address: flags.address, + ttrpc_address, + namespace: flags.namespace, + debug: flags.debug, + }; + + let address = shim.start_shim(args)?; + + std::io::stdout() + .lock() + .write_fmt(format_args!("{}", address)) + .map_err(io_error!(e, "write stdout"))?; + + Ok(()) + } + "delete" => { + std::thread::spawn(move || handle_signals(signals)); + let response = shim.delete_shim()?; + let stdout = std::io::stdout(); + let mut locked = stdout.lock(); + response.write_to_writer(&mut locked)?; + + Ok(()) + } + _ => { + if !config.no_setup_logger { + logger::init(flags.debug)?; + } + + let task = shim.create_task_service(); + let task_service = create_task(Arc::new(Box::new(task))); + let mut server = Server::new().register_service(task_service); + server = server.add_listener(SOCKET_FD)?; + server.start()?; + + info!("Shim successfully started, waiting for exit signal..."); + std::thread::spawn(move || handle_signals(signals)); + shim.wait(); + + info!("Shutting down shim instance"); + server.shutdown(); + + // NOTE: If the shim server is down(like oom killer), the address + // socket might be leaking. + let address = read_address()?; + remove_socket_silently(&address); + Ok(()) + } + } +} + +fn setup_signals(config: &Config) -> Signals { + let signals = Signals::new(&[SIGTERM, SIGINT, SIGPIPE]).expect("new signal failed"); + if !config.no_reaper { + signals.add_signal(SIGCHLD).expect("add signal failed"); + } + signals +} + +fn handle_signals(mut signals: Signals) { + loop { + for sig in signals.wait() { + match sig { + SIGTERM | SIGINT => { + debug!("received {}", sig); + return; + } + SIGCHLD => loop { + unsafe { + let pid: pid_t = -1; + let mut status: c_int = 0; + let options: c_int = libc::WNOHANG; + let res_pid = libc::waitpid(pid, &mut status, options); + let status = libc::WEXITSTATUS(status); + if res_pid <= 0 { + break; + } else { + monitor::monitor_notify_by_pid(res_pid, status).unwrap_or_else(|e| { + error!("failed to send exit event {}", e); + }); + } + } + }, + _ => { + debug!("received {}", sig); + } + } + } + } +} + +fn wait_socket_working(address: &str, interval_in_ms: u64, count: u32) -> Result<()> { + for _i in 0..count { + match Client::connect(address) { + Ok(_) => { + return Ok(()); + } + Err(_) => { + std::thread::sleep(std::time::Duration::from_millis(interval_in_ms)); + } + } + } + Err(other!("time out waiting for socket {}", address)) +} + +fn remove_socket_silently(address: &str) { + remove_socket(address).unwrap_or_else(|e| warn!("failed to remove file {} {:?}", address, e)) +} + +fn remove_socket(address: &str) -> Result<()> { + let path = parse_sockaddr(address); + if let Ok(md) = Path::new(path).metadata() { + if md.file_type().is_socket() { + fs::remove_file(path).map_err(io_error!(e, "remove socket"))?; + } + } + Ok(()) +} + +/// Spawn is a helper func to launch shim process. +/// Typically this expected to be called from `StartShim`. +pub fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Result<(u32, String)> { + let cmd = env::current_exe().map_err(io_error!(e, ""))?; + let cwd = env::current_dir().map_err(io_error!(e, ""))?; + let address = socket_address(&opts.address, &opts.namespace, grouping); + + // Create socket and prepare listener. + // We'll use `add_listener` when creating TTRPC server. + let listener = match start_listener(&address) { + Ok(l) => l, + Err(e) => { + if e.kind() != std::io::ErrorKind::AddrInUse { + return Err(Error::IoError { + context: "".to_string(), + err: e, + }); + }; + if let Ok(()) = wait_socket_working(&address, 5, 200) { + write_address(&address)?; + return Ok((0, address)); + } + remove_socket(&address)?; + start_listener(&address).map_err(io_error!(e, ""))? + } + }; + + let mut command = Command::new(cmd); + + command + .current_dir(cwd) + .stdout(Stdio::null()) + .stdin(Stdio::null()) + .stderr(Stdio::null()) + .fd_mappings(vec![FdMapping { + parent_fd: listener.as_raw_fd(), + child_fd: SOCKET_FD, + }])? + .args(&[ + "-namespace", + &opts.namespace, + "-id", + &opts.id, + "-address", + &opts.address, + ]); + if opts.debug { + command.arg("-debug"); + } + command.envs(vars); + + command + .spawn() + .map_err(io_error!(e, "spawn shim")) + .map(|child| { + // Ownership of `listener` has been passed to child. + std::mem::forget(listener); + (child.id(), address) + }) +} + +#[cfg(test)] +mod tests { + use std::thread; + + use super::*; + + #[test] + fn exit_signal() { + let signal = Arc::new(ExitSignal::default()); + + let cloned = Arc::clone(&signal); + let handle = thread::spawn(move || { + cloned.signal(); + }); + + signal.wait(); + + if let Err(err) = handle.join() { + panic!("{:?}", err); + } + } +} diff --git a/crates/shim/src/synchronous/monitor.rs b/crates/shim/src/synchronous/monitor.rs new file mode 100644 index 0000000..ef6feef --- /dev/null +++ b/crates/shim/src/synchronous/monitor.rs @@ -0,0 +1,137 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +use std::collections::HashMap; +use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::Mutex; + +use lazy_static::lazy_static; +use log::{error, warn}; + +use crate::monitor::{ExitEvent, Subject, Topic}; +use crate::Result; + +lazy_static! { + pub static ref MONITOR: Mutex = { + let monitor = Monitor { + seq_id: 0, + subscribers: HashMap::new(), + topic_subs: HashMap::new(), + }; + Mutex::new(monitor) + }; +} + +pub fn monitor_subscribe(topic: Topic) -> Result { + let mut monitor = MONITOR.lock().unwrap(); + let s = monitor.subscribe(topic)?; + Ok(s) +} + +pub fn monitor_notify_by_pid(pid: i32, exit_code: i32) -> Result<()> { + let monitor = MONITOR.lock().unwrap(); + monitor.notify_by_pid(pid, exit_code) +} + +pub fn monitor_notify_by_exec(id: &str, exec_id: &str, exit_code: i32) -> Result<()> { + let monitor = MONITOR.lock().unwrap(); + monitor.notify_by_exec(id, exec_id, exit_code) +} + +pub struct Monitor { + pub(crate) seq_id: i64, + pub(crate) subscribers: HashMap, + pub(crate) topic_subs: HashMap>, +} + +pub(crate) struct Subscriber { + pub(crate) topic: Topic, + pub(crate) tx: Sender, +} + +pub struct Subscription { + pub id: i64, + pub rx: Receiver, +} + +impl Monitor { + pub fn subscribe(&mut self, topic: Topic) -> Result { + let (tx, rx) = channel::(); + let id = self.seq_id; + self.seq_id += 1; + let subscriber = Subscriber { + tx, + topic: topic.clone(), + }; + self.subscribers.insert(id, subscriber); + self.topic_subs + .entry(topic) + .or_insert_with(Vec::new) + .push(id); + Ok(Subscription { id, rx }) + } + + pub fn notify_by_pid(&self, pid: i32, exit_code: i32) -> Result<()> { + let subject = Subject::Pid(pid); + self.notify_topic(&Topic::Pid, &subject, exit_code); + self.notify_topic(&Topic::All, &subject, exit_code); + Ok(()) + } + + pub fn notify_by_exec(&self, cid: &str, exec_id: &str, exit_code: i32) -> Result<()> { + let subject = Subject::Exec(cid.into(), exec_id.into()); + self.notify_topic(&Topic::Exec, &subject, exit_code); + self.notify_topic(&Topic::All, &subject, exit_code); + Ok(()) + } + + fn notify_topic(&self, topic: &Topic, subject: &Subject, exit_code: i32) { + self.topic_subs.get(topic).map_or((), |subs| { + for i in subs { + self.subscribers.get(i).and_then(|sub| { + sub.tx + .send(ExitEvent { + subject: subject.clone(), + exit_code, + }) + .map_err(|e| warn!("failed to send {}", e)) + .ok() + }); + } + }) + } + + pub fn unsubscribe(&mut self, id: i64) -> Result<()> { + let sub = self.subscribers.remove(&id); + if let Some(s) = sub { + self.topic_subs.get_mut(&s.topic).map(|v| { + v.iter().position(|&x| x == id).map(|i| { + v.remove(i); + }) + }); + } + Ok(()) + } +} + +impl Drop for Subscription { + fn drop(&mut self) { + let mut monitor = MONITOR.lock().unwrap(); + monitor.unsubscribe(self.id).unwrap_or_else(|e| { + error!("failed to unsubscribe the subscription {}, {}", self.id, e); + }); + } +} diff --git a/crates/shim/src/publisher.rs b/crates/shim/src/synchronous/publisher.rs similarity index 100% rename from crates/shim/src/publisher.rs rename to crates/shim/src/synchronous/publisher.rs diff --git a/crates/shim/src/synchronous/util.rs b/crates/shim/src/synchronous/util.rs new file mode 100644 index 0000000..cb60579 --- /dev/null +++ b/crates/shim/src/synchronous/util.rs @@ -0,0 +1,154 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +use std::env; +use std::fs::{rename, File, OpenOptions}; +use std::io::{Read, Write}; +use std::os::unix::net::UnixListener; +use std::path::Path; + +use log::warn; +use nix::sys::stat::Mode; +use nix::unistd::mkdir; +use oci_spec::runtime::Spec; +use uuid::Uuid; + +use containerd_shim_protos::shim::oci::Options; + +use crate::console::ConsoleSocket; +use crate::util::{JsonOptions, OPTIONS_FILE_NAME, RUNTIME_FILE_NAME}; +use crate::Error; + +pub fn read_file_to_str>(filename: P) -> crate::Result { + let mut file = File::open(&filename).map_err(io_error!( + e, + "open {}", + filename.as_ref().to_string_lossy() + ))?; + let mut content: String = String::new(); + file.read_to_string(&mut content).map_err(io_error!( + e, + "read {}", + filename.as_ref().to_string_lossy() + ))?; + Ok(content) +} + +pub fn read_options(bundle: impl AsRef) -> crate::Result { + let path = bundle.as_ref().join(OPTIONS_FILE_NAME); + let opts_str = read_file_to_str(path)?; + let json_opt: JsonOptions = serde_json::from_str(&opts_str)?; + Ok(json_opt.into()) +} + +pub fn read_runtime(bundle: impl AsRef) -> crate::Result { + let path = bundle.as_ref().join(RUNTIME_FILE_NAME); + read_file_to_str(path) +} + +pub fn read_address() -> crate::Result { + let path = Path::new("address"); + read_file_to_str(path) +} + +pub fn read_pid_from_file(pid_path: &Path) -> crate::Result { + let pid_str = read_file_to_str(pid_path)?; + let pid = pid_str.parse::()?; + Ok(pid) +} + +pub fn write_str_to_path(filename: &Path, s: &str) -> crate::Result<()> { + let file = filename + .file_name() + .ok_or_else(|| Error::InvalidArgument(String::from("pid path illegal")))?; + let tmp_path = filename + .parent() + .map(|x| x.join(format!(".{}", file.to_str().unwrap_or("")))) + .ok_or_else(|| Error::InvalidArgument(String::from("failed to create tmp path")))?; + let tmp_path = tmp_path + .to_str() + .ok_or_else(|| Error::InvalidArgument(String::from("failed to get path")))?; + let mut f = OpenOptions::new() + .write(true) + .create_new(true) + .open(tmp_path) + .map_err(io_error!(e, "open {}", filename.to_str().unwrap()))?; + f.write_all(s.as_bytes()) + .map_err(io_error!(e, "write tmp file"))?; + rename(tmp_path, filename).map_err(io_error!( + e, + "rename tmp file to {}", + filename.to_str().unwrap() + ))?; + Ok(()) +} + +pub fn write_options(bundle: &str, opt: &Options) -> crate::Result<()> { + let json_opt = JsonOptions::from(opt.to_owned()); + let opts_str = serde_json::to_string(&json_opt)?; + let path = Path::new(bundle).join(OPTIONS_FILE_NAME); + write_str_to_path(path.as_path(), opts_str.as_str()) +} + +pub fn write_runtime(bundle: &str, binary_name: &str) -> crate::Result<()> { + let path = Path::new(bundle).join(RUNTIME_FILE_NAME); + write_str_to_path(path.as_path(), binary_name) +} + +pub fn write_address(address: &str) -> crate::Result<()> { + let path = Path::new("address"); + write_str_to_path(path, address) +} + +pub fn read_spec_from_file(bundle: &str) -> crate::Result { + let path = Path::new(bundle).join("config.json"); + Spec::load(path).map_err(other_error!(e, "read spec file")) +} + +pub fn new_temp_console_socket() -> crate::Result { + let dir = env::var("XDG_RUNTIME_DIR") + .map(|runtime_dir| format!("{}/pty{}", runtime_dir, Uuid::new_v4(),))?; + mkdir(Path::new(&dir), Mode::from_bits(0o711).unwrap())?; + let file_name = Path::new(&dir).join("pty.sock"); + let listener = UnixListener::bind(file_name.as_path()).map_err(io_error!( + e, + "bind socket {}", + file_name.display() + ))?; + Ok(ConsoleSocket { + listener, + path: file_name, + rmdir: true, + }) +} + +/// A helper to help remove temperate file or dir when it became useless +pub struct HelperRemoveFile { + path: String, +} + +impl HelperRemoveFile { + pub fn new(path: String) -> Self { + Self { path } + } +} + +impl Drop for HelperRemoveFile { + fn drop(&mut self) { + std::fs::remove_file(&self.path) + .unwrap_or_else(|e| warn!("remove dir {} error: {}", &self.path, e)); + } +} diff --git a/crates/shim/src/util.rs b/crates/shim/src/util.rs index 5c8bf0a..9562ee0 100644 --- a/crates/shim/src/util.rs +++ b/crates/shim/src/util.rs @@ -14,28 +14,19 @@ limitations under the License. */ -use std::env; -use std::fs::{rename, File, OpenOptions}; -use std::io::{Read, Write}; use std::os::unix::io::RawFd; -use std::os::unix::net::UnixListener; -use std::path::Path; use std::time::{SystemTime, UNIX_EPOCH}; -use log::warn; -use nix::sys::stat::Mode; -use nix::unistd::mkdir; -use oci_spec::runtime::Spec; use serde::{Deserialize, Serialize}; -use uuid::Uuid; - -use containerd_shim_protos::protobuf::well_known_types::Any; -use containerd_shim_protos::protobuf::Message; use crate::api::Options; -use crate::console::ConsoleSocket; -use crate::error::{Error, Result}; -use crate::protos::protobuf::well_known_types::Timestamp; +#[cfg(feature = "async")] +pub use crate::asynchronous::util::*; +use crate::error::Result; +use crate::protos::protobuf::well_known_types::{Any, Timestamp}; +use crate::protos::protobuf::Message; +#[cfg(not(feature = "async"))] +pub use crate::synchronous::util::*; pub const CONFIG_FILE_NAME: &str = "config.json"; pub const OPTIONS_FILE_NAME: &str = "options.json"; @@ -103,92 +94,6 @@ impl From for Options { } } -pub fn read_file_to_str>(filename: P) -> Result { - let mut file = File::open(&filename).map_err(io_error!( - e, - "open {}", - filename.as_ref().to_string_lossy() - ))?; - let mut content: String = String::new(); - file.read_to_string(&mut content).map_err(io_error!( - e, - "read {}", - filename.as_ref().to_string_lossy() - ))?; - Ok(content) -} - -pub fn read_options(bundle: impl AsRef) -> Result { - let path = bundle.as_ref().join(OPTIONS_FILE_NAME); - let opts_str = read_file_to_str(path)?; - let json_opt: JsonOptions = serde_json::from_str(&opts_str)?; - Ok(json_opt.into()) -} - -pub fn read_runtime(bundle: impl AsRef) -> Result { - let path = bundle.as_ref().join(RUNTIME_FILE_NAME); - read_file_to_str(path) -} - -pub fn read_address() -> Result { - let path = Path::new("address"); - read_file_to_str(path) -} - -pub fn read_pid_from_file(pid_path: &Path) -> Result { - let pid_str = read_file_to_str(pid_path)?; - let pid = pid_str.parse::()?; - Ok(pid) -} - -pub fn write_str_to_path(filename: &Path, s: &str) -> Result<()> { - let file = filename - .file_name() - .ok_or_else(|| Error::InvalidArgument(String::from("pid path illegal")))?; - let tmp_path = filename - .parent() - .map(|x| x.join(format!(".{}", file.to_str().unwrap_or("")))) - .ok_or_else(|| Error::InvalidArgument(String::from("failed to create tmp path")))?; - let tmp_path = tmp_path - .to_str() - .ok_or_else(|| Error::InvalidArgument(String::from("failed to get path")))?; - let mut f = OpenOptions::new() - .write(true) - .create_new(true) - .open(tmp_path) - .map_err(io_error!(e, "open {}", filename.to_str().unwrap()))?; - f.write_all(s.as_bytes()) - .map_err(io_error!(e, "write tmp file"))?; - rename(tmp_path, filename).map_err(io_error!( - e, - "rename tmp file to {}", - filename.to_str().unwrap() - ))?; - Ok(()) -} - -pub fn write_options(bundle: &str, opt: &Options) -> Result<()> { - let json_opt = JsonOptions::from(opt.to_owned()); - let opts_str = serde_json::to_string(&json_opt)?; - let path = Path::new(bundle).join(OPTIONS_FILE_NAME); - write_str_to_path(path.as_path(), opts_str.as_str()) -} - -pub fn write_runtime(bundle: &str, binary_name: &str) -> Result<()> { - let path = Path::new(bundle).join(RUNTIME_FILE_NAME); - write_str_to_path(path.as_path(), binary_name) -} - -pub fn write_address(address: &str) -> Result<()> { - let path = Path::new("address"); - write_str_to_path(path, address) -} - -pub fn read_spec_from_file(bundle: &str) -> Result { - let path = Path::new(bundle).join("config.json"); - Spec::load(path).map_err(other_error!(e, "read spec file")) -} - pub fn get_timestamp() -> Result { let mut timestamp = Timestamp::new(); let now = SystemTime::now().duration_since(UNIX_EPOCH)?; @@ -281,37 +186,3 @@ impl AsOption for str { } } } - -/// A helper to help remove temperate file or dir when it became useless -pub struct HelperRemoveFile { - path: String, -} - -impl HelperRemoveFile { - pub fn new(path: String) -> Self { - Self { path } - } -} -impl Drop for HelperRemoveFile { - fn drop(&mut self) { - std::fs::remove_file(&self.path) - .unwrap_or_else(|e| warn!("remove dir {} error: {}", &self.path, e)); - } -} - -pub fn new_temp_console_socket() -> Result { - let dir = env::var("XDG_RUNTIME_DIR") - .map(|runtime_dir| format!("{}/pty{}", runtime_dir, Uuid::new_v4(),))?; - mkdir(Path::new(&dir), Mode::from_bits(0o711).unwrap())?; - let file_name = Path::new(&dir).join("pty.sock"); - let listener = UnixListener::bind(file_name.as_path()).map_err(io_error!( - e, - "bind socket {}", - file_name.display() - ))?; - Ok(ConsoleSocket { - listener, - path: file_name, - rmdir: true, - }) -}