split async and sync codes for runc-shim and shim

Signed-off-by: Feng Shaobao <fshb1988@gmail.com>
This commit is contained in:
Feng Shaobao 2022-03-07 20:48:14 +08:00
parent 777fe2ad23
commit cdc78f7760
27 changed files with 1150 additions and 966 deletions

View File

@ -28,7 +28,7 @@ use containerd_shim::asynchronous::monitor::{
use containerd_shim::asynchronous::processes::Process;
use containerd_shim::asynchronous::publisher::RemotePublisher;
use containerd_shim::asynchronous::task::TaskService;
use containerd_shim::asynchronous::utils::{
use containerd_shim::asynchronous::util::{
read_options, read_runtime, read_spec, write_str_to_file,
};
use containerd_shim::asynchronous::{spawn, ExitSignal, Shim};
@ -39,8 +39,8 @@ use containerd_shim::Error;
use containerd_shim::{io_error, Config, DeleteResponse, StartOpts};
use crate::asynchronous::runc::{RuncContainer, RuncFactory};
use crate::runc::{create_runc, ShimExecutor};
use crate::service::GROUP_LABELS;
use crate::common::create_runc;
use crate::common::{ShimExecutor, GROUP_LABELS};
mod runc;

View File

@ -37,21 +37,22 @@ use containerd_shim::asynchronous::monitor::{
monitor_subscribe, monitor_unsubscribe, Subscription,
};
use containerd_shim::asynchronous::processes::{ProcessLifecycle, ProcessTemplate};
use containerd_shim::asynchronous::utils::{
use containerd_shim::asynchronous::util::{
asyncify, mkdir, mount_rootfs, read_file_to_str, read_spec, write_options, write_runtime,
};
use containerd_shim::console::Console;
use containerd_shim::io::Stdio;
use containerd_shim::monitor::{ExitEvent, Subject, Topic};
use containerd_shim::protos::protobuf::{CodedInputStream, Message};
use containerd_shim::Console;
use containerd_shim::{io_error, other, Error};
use containerd_shim::{other_error, Result};
use runc::{Command, Executor, Runc};
use crate::console::receive_socket;
use crate::io::{create_io, ProcessIO};
use crate::runc::{
check_kill_error, create_runc, get_spec_from_request, CreateConfig, ShimExecutor, INIT_PID_FILE,
use crate::common::receive_socket;
use crate::common::CreateConfig;
use crate::common::{
check_kill_error, create_io, create_runc, get_spec_from_request, ProcessIO, ShimExecutor,
INIT_PID_FILE,
};
pub type ExecProcess = ProcessTemplate<RuncExecLifecycle>;

View File

@ -0,0 +1,198 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
use std::os::unix::io::RawFd;
use std::path::Path;
use std::sync::Arc;
use log::{debug, warn};
use nix::cmsg_space;
use nix::sys::socket::{recvmsg, ControlMessageOwned, MsgFlags};
use nix::sys::termios::tcgetattr;
use nix::sys::uio::IoVec;
use containerd_shim::api::{ExecProcessRequest, Options};
use containerd_shim::io::Stdio;
use containerd_shim::util::IntoOption;
use containerd_shim::{io_error, other, other_error, Error};
use runc::io::{Io, NullIo, FIFO};
use runc::options::GlobalOpts;
use runc::Runc;
pub const GROUP_LABELS: [&str; 2] = [
"io.containerd.runc.v2.group",
"io.kubernetes.cri.sandbox-id",
];
pub const INIT_PID_FILE: &str = "init.pid";
pub struct ProcessIO {
pub uri: Option<String>,
pub io: Option<Arc<dyn Io>>,
pub copy: bool,
}
pub fn create_io(
id: &str,
_io_uid: u32,
_io_gid: u32,
stdio: &Stdio,
) -> containerd_shim::Result<ProcessIO> {
if stdio.is_null() {
let nio = NullIo::new().map_err(io_error!(e, "new Null Io"))?;
let pio = ProcessIO {
uri: None,
io: Some(Arc::new(nio)),
copy: false,
};
return Ok(pio);
}
let stdout = stdio.stdout.as_str();
let scheme_path = stdout.trim().split("://").collect::<Vec<&str>>();
let scheme: &str;
let uri: String;
if scheme_path.len() <= 1 {
// no scheme specified
// default schema to fifo
uri = format!("fifo://{}", stdout);
scheme = "fifo"
} else {
uri = stdout.to_string();
scheme = scheme_path[0];
}
let mut pio = ProcessIO {
uri: Some(uri),
io: None,
copy: false,
};
if scheme == "fifo" {
debug!(
"create named pipe io for container {}, stdin: {}, stdout: {}, stderr: {}",
id,
stdio.stdin.as_str(),
stdio.stdout.as_str(),
stdio.stderr.as_str()
);
let io = FIFO {
stdin: stdio.stdin.to_string().none_if(|x| x.is_empty()),
stdout: stdio.stdout.to_string().none_if(|x| x.is_empty()),
stderr: stdio.stderr.to_string().none_if(|x| x.is_empty()),
};
pio.io = Some(Arc::new(io));
pio.copy = false;
}
Ok(pio)
}
#[derive(Clone)]
pub struct ShimExecutor {}
pub fn get_spec_from_request(
req: &ExecProcessRequest,
) -> containerd_shim::Result<oci_spec::runtime::Process> {
if let Some(val) = req.spec.as_ref() {
let mut p = serde_json::from_slice::<oci_spec::runtime::Process>(val.get_value())?;
p.set_terminal(Some(req.terminal));
Ok(p)
} else {
Err(Error::InvalidArgument("no spec in request".to_string()))
}
}
pub fn check_kill_error(emsg: String) -> Error {
let emsg = emsg.to_lowercase();
if emsg.contains("process already finished")
|| emsg.contains("container not running")
|| emsg.contains("no such process")
{
Error::NotFoundError("process already finished".to_string())
} else if emsg.contains("does not exist") {
Error::NotFoundError("no such container".to_string())
} else {
other!("unknown error after kill {}", emsg)
}
}
const DEFAULT_RUNC_ROOT: &str = "/run/containerd/runc";
const DEFAULT_COMMAND: &str = "runc";
pub fn create_runc<F>(
runtime: &str,
namespace: &str,
bundle: impl AsRef<Path>,
opts: &Options,
executor: F,
) -> containerd_shim::Result<Runc<F>> {
let runtime = if runtime.is_empty() {
DEFAULT_COMMAND
} else {
runtime
};
let root = opts.root.as_str();
let root = Path::new(if root.is_empty() {
DEFAULT_RUNC_ROOT
} else {
root
})
.join(namespace);
let log = bundle.as_ref().join("log.json");
GlobalOpts::default()
.command(runtime)
.root(root)
.log(log)
.log_json()
.systemd_cgroup(opts.systemd_cgroup)
.build_with_executor(executor)
.map_err(other_error!(e, "unable to create runc instance"))
}
#[derive(Default)]
pub(crate) struct CreateConfig {}
pub fn receive_socket(stream_fd: RawFd) -> containerd_shim::Result<RawFd> {
let mut buf = [0u8; 4096];
let iovec = [IoVec::from_mut_slice(&mut buf)];
let mut space = cmsg_space!([RawFd; 2]);
let (path, fds) = match recvmsg(stream_fd, &iovec, Some(&mut space), MsgFlags::empty()) {
Ok(msg) => {
let mut iter = msg.cmsgs();
if let Some(ControlMessageOwned::ScmRights(fds)) = iter.next() {
(iovec[0].as_slice(), fds)
} else {
return Err(other!("received message is empty"));
}
}
Err(e) => {
return Err(other!("failed to receive message: {}", e));
}
};
if fds.is_empty() {
return Err(other!("received message is empty"));
}
let path = String::from_utf8(Vec::from(path)).unwrap_or_else(|e| {
warn!("failed to get path from array {}", e);
"".to_string()
});
let path = path.trim_matches(char::from(0));
debug!(
"copy_console: console socket get path: {}, fd: {}",
path, &fds[0]
);
tcgetattr(fds[0])?;
Ok(fds[0])
}

View File

@ -1,59 +0,0 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
use std::os::unix::io::RawFd;
use log::{debug, warn};
use nix::cmsg_space;
use nix::sys::socket::{recvmsg, ControlMessageOwned, MsgFlags};
use nix::sys::termios::tcgetattr;
use nix::sys::uio::IoVec;
use containerd_shim::other;
use containerd_shim::Error;
pub fn receive_socket(stream_fd: RawFd) -> containerd_shim::Result<RawFd> {
let mut buf = [0u8; 4096];
let iovec = [IoVec::from_mut_slice(&mut buf)];
let mut space = cmsg_space!([RawFd; 2]);
let (path, fds) = match recvmsg(stream_fd, &iovec, Some(&mut space), MsgFlags::empty()) {
Ok(msg) => {
let mut iter = msg.cmsgs();
if let Some(ControlMessageOwned::ScmRights(fds)) = iter.next() {
(iovec[0].as_slice(), fds)
} else {
return Err(other!("received message is empty"));
}
}
Err(e) => {
return Err(other!("failed to receive message: {}", e));
}
};
if fds.is_empty() {
return Err(other!("received message is empty"));
}
let path = String::from_utf8(Vec::from(path)).unwrap_or_else(|e| {
warn!("failed to get path from array {}", e);
"".to_string()
});
let path = path.trim_matches(char::from(0));
debug!(
"copy_console: console socket get path: {}, fd: {}",
path, &fds[0]
);
tcgetattr(fds[0])?;
Ok(fds[0])
}

View File

@ -14,20 +14,15 @@
limitations under the License.
*/
mod cgroup;
mod container;
mod io;
mod runc;
mod service;
mod task;
#[cfg(feature = "async")]
mod asynchronous;
mod console;
mod common;
#[cfg(not(feature = "async"))]
mod synchronous;
#[cfg(not(feature = "async"))]
fn main() {
containerd_shim::run::<crate::service::Service>("io.containerd.runc.v2", None)
containerd_shim::run::<synchronous::Service>("io.containerd.runc.v2", None)
}
#[cfg(feature = "async")]

View File

@ -20,17 +20,17 @@ use std::fs;
use std::io::Read;
use std::path::Path;
use containerd_shim as shim;
use cgroups_rs::cgroup::get_cgroups_relative_paths_by_pid;
use cgroups_rs::{hierarchies, Cgroup, CgroupPid, MaxValue, Subsystem};
use oci_spec::runtime::LinuxResources;
use containerd_shim as shim;
use containerd_shim::api::Options;
use containerd_shim::protos::cgroups::metrics::{
CPUStat, CPUUsage, MemoryEntry, MemoryStat, Metrics,
};
use containerd_shim::protos::protobuf::well_known_types::Any;
use containerd_shim::protos::protobuf::Message;
use oci_spec::runtime::LinuxResources;
use shim::error::{Error, Result};
use shim::{io_error, other_error};
@ -223,10 +223,11 @@ pub fn update_metrics(pid: u32, resources: &LinuxResources) -> Result<()> {
#[cfg(test)]
mod tests {
use crate::cgroup::{
use cgroups_rs::{hierarchies, Cgroup, CgroupPid};
use crate::synchronous::cgroup::{
add_task_to_cgroup, adjust_oom_score, read_process_oom_score, OOM_SCORE_ADJ_MAX,
};
use cgroups_rs::{hierarchies, Cgroup, CgroupPid};
#[test]
fn test_add_cgroup() {

View File

@ -26,17 +26,20 @@ use oci_spec::runtime::LinuxResources;
use time::OffsetDateTime;
use containerd_shim as shim;
use containerd_shim::console::{ioctl_set_winsz, Console, ConsoleSocket};
use containerd_shim::io::Stdio;
use shim::api::*;
use shim::console::ConsoleSocket;
use shim::error::{Error, Result};
use shim::io::Stdio;
use shim::ioctl_set_winsz;
use shim::protos::cgroups::metrics::Metrics;
use shim::protos::protobuf::well_known_types::Timestamp;
use shim::util::read_pid_from_file;
use shim::Console;
use shim::{io_error, other, other_error};
use crate::console::receive_socket;
use crate::io::{spawn_copy, ProcessIO};
use crate::common::receive_socket;
use crate::common::ProcessIO;
use crate::synchronous::io::spawn_copy;
pub trait ContainerFactory<C> {
fn create(&self, ns: &str, req: &CreateTaskRequest) -> Result<C>;
@ -65,12 +68,7 @@ pub trait Process {
pub trait Container {
fn start(&mut self, exec_id: Option<&str>) -> Result<i32>;
fn state(&self, exec_id: Option<&str>) -> Result<StateResponse>;
fn kill(
&mut self,
exec_id: Option<&str>,
signal: u32,
all: bool,
) -> containerd_shim::Result<()>;
fn kill(&mut self, exec_id: Option<&str>, signal: u32, all: bool) -> Result<()>;
fn wait_channel(&mut self, exec_id: Option<&str>) -> Result<Receiver<i8>>;
fn get_exit_info(&self, exec_id: Option<&str>) -> Result<(i32, i32, Option<OffsetDateTime>)>;
fn delete(&mut self, exec_id_opt: Option<&str>) -> Result<(i32, u32, Timestamp)>;

View File

@ -14,22 +14,20 @@
limitations under the License.
*/
#[cfg(not(feature = "async"))]
use std::fs::OpenOptions;
use std::io::{Read, Write};
use std::sync::Arc;
use std::thread::JoinHandle;
use crossbeam::sync::WaitGroup;
use log::debug;
use containerd_shim::io::Stdio;
use containerd_shim::util::IntoOption;
use containerd_shim::{
error::{Error, Result},
io_error,
};
use runc::io::{Io, NullIo, FIFO};
use crate::common::ProcessIO;
pub fn spawn_copy<R: Read + Send + 'static, W: Write + Send + 'static>(
mut from: R,
@ -51,14 +49,7 @@ pub fn spawn_copy<R: Read + Send + 'static, W: Write + Send + 'static>(
})
}
pub struct ProcessIO {
pub uri: Option<String>,
pub io: Option<Arc<dyn Io>>,
pub copy: bool,
}
impl ProcessIO {
#[cfg(not(feature = "async"))]
pub fn copy(&self, stdio: &Stdio) -> Result<WaitGroup> {
let wg = WaitGroup::new();
if !self.copy {
@ -127,58 +118,4 @@ impl ProcessIO {
Ok(wg)
}
#[cfg(feature = "async")]
pub fn copy(&self, _stdio: &Stdio) -> Result<WaitGroup> {
unimplemented!()
}
}
pub fn create_io(id: &str, _io_uid: u32, _io_gid: u32, stdio: &Stdio) -> Result<ProcessIO> {
if stdio.is_null() {
let nio = NullIo::new().map_err(io_error!(e, "new Null Io"))?;
let pio = ProcessIO {
uri: None,
io: Some(Arc::new(nio)),
copy: false,
};
return Ok(pio);
}
let stdout = stdio.stdout.as_str();
let scheme_path = stdout.trim().split("://").collect::<Vec<&str>>();
let scheme: &str;
let uri: String;
if scheme_path.len() <= 1 {
// no scheme specified
// default schema to fifo
uri = format!("fifo://{}", stdout);
scheme = "fifo"
} else {
uri = stdout.to_string();
scheme = scheme_path[0];
}
let mut pio = ProcessIO {
uri: Some(uri),
io: None,
copy: false,
};
if scheme == "fifo" {
debug!(
"create named pipe io for container {}, stdin: {}, stdout: {}, stderr: {}",
id,
stdio.stdin.as_str(),
stdio.stdout.as_str(),
stdio.stderr.as_str()
);
let io = FIFO {
stdin: stdio.stdin.to_string().none_if(|x| x.is_empty()),
stdout: stdio.stdout.to_string().none_if(|x| x.is_empty()),
stderr: stdio.stderr.to_string().none_if(|x| x.is_empty()),
};
pio.io = Some(Arc::new(io));
pio.copy = false;
}
Ok(pio)
}

View File

@ -0,0 +1,32 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
use std::sync::Arc;
use containerd_shim::ExitSignal;
mod cgroup;
mod container;
mod io;
mod runc;
mod service;
mod task;
pub(crate) struct Service {
exit: Arc<ExitSignal>,
id: String,
namespace: String,
}

View File

@ -29,37 +29,33 @@ use oci_spec::runtime::{Linux, LinuxNamespaceType, LinuxResources, Spec};
use time::OffsetDateTime;
use containerd_shim as shim;
use containerd_shim::console::{Console, ConsoleSocket};
use containerd_shim::io::Stdio;
use runc::options::{CreateOpts, DeleteOpts, ExecOpts, GlobalOpts, KillOpts};
use runc::{DefaultExecutor, Runc};
use shim::api::*;
use shim::console::ConsoleSocket;
use shim::error::{Error, Result};
use shim::io::Stdio;
use shim::mount::mount_rootfs;
use shim::protos::api::ProcessInfo;
use shim::protos::cgroups::metrics::Metrics;
use shim::protos::protobuf::well_known_types::{Any, Timestamp};
use shim::protos::protobuf::{CodedInputStream, Message, RepeatedField};
use shim::protos::shim::oci::ProcessDetails;
use shim::util::{
new_temp_console_socket, read_spec_from_file, write_options, write_runtime, IntoOption,
JsonOptions,
};
#[cfg(not(feature = "async"))]
use shim::util::{new_temp_console_socket, read_spec_from_file, write_options, write_runtime};
use shim::util::{IntoOption, JsonOptions};
use shim::Console;
use shim::{debug, error, io_error, other, other_error};
use crate::container::{CommonContainer, CommonProcess, Container, ContainerFactory, Process};
use crate::io::create_io;
pub const DEFAULT_RUNC_ROOT: &str = "/run/containerd/runc";
pub const INIT_PID_FILE: &str = "init.pid";
const DEFAULT_COMMAND: &str = "runc";
use crate::common;
use crate::common::{create_io, CreateConfig, INIT_PID_FILE};
use crate::synchronous::container::{
CommonContainer, CommonProcess, Container, ContainerFactory, Process,
};
#[derive(Clone, Default)]
pub(crate) struct RuncFactory {}
#[derive(Clone)]
pub struct ShimExecutor {}
impl ContainerFactory<RuncContainer> for RuncFactory {
fn create(&self, ns: &str, req: &CreateTaskRequest) -> Result<RuncContainer> {
let bundle = req.bundle.as_str();
@ -95,7 +91,7 @@ impl ContainerFactory<RuncContainer> for RuncFactory {
mount_rootfs(mount_type, source, &m.options.to_vec(), rootfs)?;
}
let runc = create_runc(runtime, ns, bundle, &opts, DefaultExecutor {})?;
let runc = common::create_runc(runtime, ns, bundle, &opts, DefaultExecutor {})?;
let id = req.get_id();
let stdio = Stdio {
@ -142,7 +138,6 @@ pub(crate) struct RuncContainer {
}
impl Container for RuncContainer {
#[cfg(not(feature = "async"))]
fn start(&mut self, exec_id: Option<&str>) -> Result<i32> {
match exec_id {
Some(exec_id) => {
@ -213,22 +208,16 @@ impl Container for RuncContainer {
}
}
#[cfg(feature = "async")]
fn start(&mut self, exec_id: Option<&str>) -> Result<i32> {
Err(Error::Unimplemented("start".to_string()))
}
fn state(&self, exec_id: Option<&str>) -> Result<StateResponse> {
self.common.state(exec_id)
}
#[cfg(not(feature = "async"))]
fn kill(&mut self, exec_id: Option<&str>, signal: u32, all: bool) -> Result<()> {
match exec_id {
Some(_) => {
let p = self.common.get_mut_process(exec_id)?;
kill_process(p.pid() as u32, p.exited_at(), signal)
.map_err(|e| check_kill_error(format!("{}", e)))
.map_err(|e| common::check_kill_error(format!("{}", e)))
}
None => self
.common
@ -239,15 +228,10 @@ impl Container for RuncContainer {
signal,
Some(&runc::options::KillOpts { all }),
)
.map_err(|e| check_kill_error(format!("{}", e))),
.map_err(|e| common::check_kill_error(format!("{}", e))),
}
}
#[cfg(feature = "async")]
fn kill(&mut self, exec_id: Option<&str>, signal: u32, all: bool) -> Result<()> {
Err(Error::Unimplemented("kill".to_string()))
}
fn wait_channel(&mut self, exec_id: Option<&str>) -> Result<Receiver<i8>> {
self.common.wait_channel(exec_id)
}
@ -256,7 +240,6 @@ impl Container for RuncContainer {
self.common.get_exit_info(exec_id)
}
#[cfg(not(feature = "async"))]
fn delete(&mut self, exec_id_opt: Option<&str>) -> Result<(i32, u32, Timestamp)> {
let (pid, code, exit_at) = self
.get_exit_info(exec_id_opt)
@ -292,23 +275,12 @@ impl Container for RuncContainer {
Ok((pid, code as u32, time_stamp))
}
#[cfg(feature = "async")]
fn delete(&mut self, exec_id_opt: Option<&str>) -> Result<(i32, u32, Timestamp)> {
Err(Error::Unimplemented("delete".to_string()))
}
#[cfg(not(feature = "async"))]
fn exec(&mut self, req: ExecProcessRequest) -> Result<()> {
self.common
.exec(req)
.map_err(other_error!(e, "failed exec"))
}
#[cfg(feature = "async")]
fn exec(&mut self, req: ExecProcessRequest) -> Result<()> {
Err(Error::Unimplemented("exec".to_string()))
}
fn resize_pty(&mut self, exec_id: Option<&str>, height: u32, width: u32) -> Result<()> {
self.common
.resize_pty(exec_id, height, width)
@ -322,7 +294,7 @@ impl Container for RuncContainer {
#[cfg(target_os = "linux")]
fn stats(&self) -> Result<Metrics> {
let pid = self.common.init.pid() as u32;
crate::cgroup::collect_metrics(pid)
crate::synchronous::cgroup::collect_metrics(pid)
}
#[cfg(not(target_os = "linux"))]
@ -333,7 +305,7 @@ impl Container for RuncContainer {
#[cfg(target_os = "linux")]
fn update(&mut self, resources: &LinuxResources) -> Result<()> {
let pid = self.common.init.pid() as u32;
crate::cgroup::update_metrics(pid, resources)
crate::synchronous::cgroup::update_metrics(pid, resources)
}
#[cfg(not(target_os = "linux"))]
@ -341,12 +313,6 @@ impl Container for RuncContainer {
Err(Error::Unimplemented("update".to_string()))
}
#[cfg(feature = "async")]
fn pids(&self) -> Result<PidsResponse> {
Err(Error::Unimplemented("pids".to_string()))
}
#[cfg(not(feature = "async"))]
fn pids(&self) -> Result<PidsResponse> {
let pids = self
.common
@ -468,7 +434,7 @@ impl InitProcess {
criu_work_path: "".to_string(),
}
}
#[cfg(not(feature = "async"))]
pub fn create(&mut self, _conf: &CreateConfig) -> Result<()> {
//TODO checkpoint support
let id = self.common.id.to_string();
@ -510,10 +476,6 @@ impl InitProcess {
self.common.set_pid_from_file(pid_path.as_path())?;
Ok(())
}
#[cfg(feature = "async")]
pub fn create(&mut self, _conf: &CreateConfig) -> Result<()> {
unimplemented!()
}
}
impl Process for InitProcess {
@ -664,7 +626,7 @@ impl Process for ExecProcess {
impl TryFrom<ExecProcessRequest> for ExecProcess {
type Error = Error;
fn try_from(req: ExecProcessRequest) -> std::result::Result<Self, Self::Error> {
let p = get_spec_from_request(&req)?;
let p = common::get_spec_from_request(&req)?;
let exec_process = ExecProcess {
common: CommonProcess {
state: Status::CREATED,
@ -687,61 +649,3 @@ impl TryFrom<ExecProcessRequest> for ExecProcess {
Ok(exec_process)
}
}
pub fn get_spec_from_request(req: &ExecProcessRequest) -> Result<oci_spec::runtime::Process> {
if let Some(val) = req.spec.as_ref() {
let mut p = serde_json::from_slice::<oci_spec::runtime::Process>(val.get_value())?;
p.set_terminal(Some(req.terminal));
Ok(p)
} else {
Err(Error::InvalidArgument("no spec in request".to_string()))
}
}
#[derive(Default)]
pub(crate) struct CreateConfig {}
pub fn check_kill_error(emsg: String) -> Error {
let emsg = emsg.to_lowercase();
if emsg.contains("process already finished")
|| emsg.contains("container not running")
|| emsg.contains("no such process")
{
Error::NotFoundError("process already finished".to_string())
} else if emsg.contains("does not exist") {
Error::NotFoundError("no such container".to_string())
} else {
other!("unknown error after kill {}", emsg)
}
}
pub fn create_runc<F>(
runtime: &str,
namespace: &str,
bundle: impl AsRef<Path>,
opts: &Options,
executor: F,
) -> Result<Runc<F>> {
let runtime = if runtime.is_empty() {
DEFAULT_COMMAND
} else {
runtime
};
let root = opts.root.as_str();
let root = Path::new(if root.is_empty() {
DEFAULT_RUNC_ROOT
} else {
root
})
.join(namespace);
let log = bundle.as_ref().join("log.json");
GlobalOpts::default()
.command(runtime)
.root(root)
.log(log)
.log_json()
.systemd_cgroup(opts.systemd_cgroup)
.build_with_executor(executor)
.map_err(other_error!(e, "unable to create runc instance"))
}

View File

@ -21,30 +21,23 @@ use std::path::Path;
use std::sync::Arc;
use containerd_shim as shim;
use containerd_shim::util::{read_options, read_runtime, read_spec_from_file, write_address};
use runc::options::{DeleteOpts, GlobalOpts, DEFAULT_COMMAND};
use runc::DefaultExecutor;
use shim::api::*;
use shim::error::{Error, Result};
use shim::monitor::{monitor_subscribe, Subject, Subscription, Topic};
use shim::protos::protobuf::SingularPtrField;
use shim::util::{get_timestamp, read_options, read_runtime, read_spec_from_file, write_address};
use shim::publisher::RemotePublisher;
use shim::util::get_timestamp;
use shim::{debug, error, io_error, other_error, warn};
use shim::{spawn, Config, ExitSignal, RemotePublisher, Shim, StartOpts};
use shim::{spawn, Config, ExitSignal, Shim, StartOpts};
use crate::container::{Container, Process};
use crate::runc::{create_runc, RuncContainer, RuncFactory, DEFAULT_RUNC_ROOT};
use crate::task::ShimTask;
pub const GROUP_LABELS: [&str; 2] = [
"io.containerd.runc.v2.group",
"io.kubernetes.cri.sandbox-id",
];
pub(crate) struct Service {
exit: Arc<ExitSignal>,
id: String,
namespace: String,
}
use crate::common::{create_runc, GROUP_LABELS};
use crate::synchronous::container::{Container, Process};
use crate::synchronous::runc::{RuncContainer, RuncFactory};
use crate::synchronous::task::ShimTask;
use crate::synchronous::Service;
impl Shim for Service {
type T = ShimTask<RuncFactory, RuncContainer>;
@ -65,7 +58,7 @@ impl Shim for Service {
}
}
fn start_shim(&mut self, opts: StartOpts) -> Result<String> {
fn start_shim(&mut self, opts: StartOpts) -> containerd_shim::Result<String> {
let mut grouping = opts.id.clone();
let spec = read_spec_from_file("")?;
match spec.annotations() {
@ -83,14 +76,14 @@ impl Shim for Service {
let (child_id, address) = spawn(opts, &grouping, Vec::new())?;
#[cfg(target_os = "linux")]
crate::cgroup::set_cgroup_and_oom_score(child_id)?;
crate::synchronous::cgroup::set_cgroup_and_oom_score(child_id)?;
write_address(&address)?;
Ok(address)
}
#[cfg(not(feature = "async"))]
fn delete_shim(&mut self) -> Result<DeleteResponse> {
fn delete_shim(&mut self) -> containerd_shim::Result<DeleteResponse> {
let namespace = self.namespace.as_str();
let bundle = current_dir().map_err(io_error!(e, "get current dir"))?;
let opts = read_options(&bundle)?;
@ -107,7 +100,7 @@ impl Shim for Service {
}
#[cfg(feature = "async")]
fn delete_shim(&mut self) -> Result<DeleteResponse> {
fn delete_shim(&mut self) -> containerd_shim::Result<DeleteResponse> {
Err(Error::Unimplemented("delete shim".to_string()))
}

View File

@ -30,7 +30,7 @@ use shim::Task;
use shim::{api::*, ExitSignal};
use shim::{TtrpcContext, TtrpcResult};
use crate::container::{Container, ContainerFactory};
use crate::synchronous::container::{Container, ContainerFactory};
pub struct ShimTask<F, C> {
pub containers: Arc<Mutex<HashMap<String, C>>>,

View File

@ -13,12 +13,13 @@
See the License for the specific language governing permissions and
limitations under the License.
*/
use std::env;
use containerd_shim::{Context, RemotePublisher};
use containerd_shim::publisher::RemotePublisher;
use containerd_shim::Context;
use containerd_shim_protos::events::task::TaskOOM;
#[cfg(not(feature = "async"))]
fn main() {
let args: Vec<String> = env::args().collect();
@ -38,9 +39,40 @@ fn main() {
let ctx = Context::default();
println!("Sending event");
publisher
.publish(ctx, "/tasks/oom", "default", event)
.expect("Publish failed");
println!("Done");
}
#[cfg(feature = "async")]
#[tokio::main]
async fn main() {
let args: Vec<String> = env::args().collect();
// Must not start with unix://
let address = args
.get(1)
.ok_or("First argument must be containerd's TTRPC address to publish events")
.unwrap();
println!("Connecting: {}", &address);
let publisher = RemotePublisher::new(address).await.expect("Connect failed");
let mut event = TaskOOM::new();
event.set_container_id("123".into());
let ctx = Context::default();
println!("Sending event");
publisher
.publish(ctx, "/tasks/oom", "default", event)
.await
.expect("Publish failed");
println!("Done");
}

View File

@ -13,73 +13,84 @@
See the License for the specific language governing permissions and
limitations under the License.
*/
use std::sync::Arc;
use log::info;
#[cfg(not(feature = "async"))]
use containerd_shim as shim;
use shim::{api, Config, DeleteResponse, ExitSignal, RemotePublisher, TtrpcContext, TtrpcResult};
#[derive(Clone)]
struct Service {
exit: Arc<ExitSignal>,
}
#[cfg(not(feature = "async"))]
mod skeleton {
use std::sync::Arc;
impl shim::Shim for Service {
type T = Service;
use log::info;
fn new(
_runtime_id: &str,
_id: &str,
_namespace: &str,
_publisher: RemotePublisher,
_config: &mut Config,
) -> Self {
Service {
exit: Arc::new(ExitSignal::default()),
use containerd_shim as shim;
use containerd_shim::synchronous::publisher::RemotePublisher;
use shim::{api, Config, DeleteResponse, ExitSignal, TtrpcContext, TtrpcResult};
#[derive(Clone)]
pub(crate) struct Service {
exit: Arc<ExitSignal>,
}
impl shim::Shim for Service {
type T = Service;
fn new(
_runtime_id: &str,
_id: &str,
_namespace: &str,
_publisher: RemotePublisher,
_config: &mut Config,
) -> Self {
Service {
exit: Arc::new(ExitSignal::default()),
}
}
fn start_shim(&mut self, opts: shim::StartOpts) -> Result<String, shim::Error> {
let grouping = opts.id.clone();
let (_child_id, address) = shim::spawn(opts, &grouping, Vec::new())?;
Ok(address)
}
fn delete_shim(&mut self) -> Result<DeleteResponse, shim::Error> {
Ok(DeleteResponse::new())
}
fn wait(&mut self) {
self.exit.wait();
}
fn create_task_service(&self) -> Self::T {
self.clone()
}
}
fn start_shim(&mut self, opts: shim::StartOpts) -> Result<String, shim::Error> {
let grouping = opts.id.clone();
let (_child_id, address) = shim::spawn(opts, &grouping, Vec::new())?;
Ok(address)
}
impl shim::Task for Service {
fn connect(
&self,
_ctx: &TtrpcContext,
_req: api::ConnectRequest,
) -> TtrpcResult<api::ConnectResponse> {
info!("Connect request");
Ok(api::ConnectResponse {
version: String::from("example"),
..Default::default()
})
}
fn delete_shim(&mut self) -> Result<DeleteResponse, shim::Error> {
Ok(DeleteResponse::new())
}
fn wait(&mut self) {
self.exit.wait();
}
fn create_task_service(&self) -> Self::T {
self.clone()
}
}
impl shim::Task for Service {
fn connect(
&self,
_ctx: &TtrpcContext,
_req: api::ConnectRequest,
) -> TtrpcResult<api::ConnectResponse> {
info!("Connect request");
Ok(api::ConnectResponse {
version: String::from("example"),
..Default::default()
})
}
fn shutdown(&self, _ctx: &TtrpcContext, _req: api::ShutdownRequest) -> TtrpcResult<api::Empty> {
info!("Shutdown request");
self.exit.signal();
Ok(api::Empty::default())
fn shutdown(
&self,
_ctx: &TtrpcContext,
_req: api::ShutdownRequest,
) -> TtrpcResult<api::Empty> {
info!("Shutdown request");
self.exit.signal();
Ok(api::Empty::default())
}
}
}
fn main() {
shim::run::<Service>("io.containerd.empty.v1", None)
#[cfg(not(feature = "async"))]
shim::run::<skeleton::Service>("io.containerd.empty.v1", None)
}

View File

@ -21,7 +21,7 @@ use log::warn;
use tokio::net::{UnixListener, UnixStream};
use uuid::Uuid;
use crate::asynchronous::utils::mkdir;
use crate::asynchronous::util::mkdir;
use crate::Error;
use crate::Result;

View File

@ -40,7 +40,7 @@ use containerd_shim_protos::ttrpc::r#async::Server;
use crate::asynchronous::monitor::monitor_notify_by_pid;
use crate::asynchronous::publisher::RemotePublisher;
use crate::asynchronous::utils::{asyncify, read_file_to_str, write_str_to_file};
use crate::asynchronous::util::{asyncify, read_file_to_str, write_str_to_file};
use crate::error::Error;
use crate::error::Result;
use crate::{
@ -53,7 +53,7 @@ pub mod monitor;
pub mod processes;
pub mod publisher;
pub mod task;
pub mod utils;
pub mod util;
/// Asynchronous Main shim interface that must be implemented by all async shims.
///

View File

@ -23,10 +23,10 @@ use tokio::sync::oneshot::{channel, Receiver, Sender};
use containerd_shim_protos::api::{StateResponse, Status};
use containerd_shim_protos::protobuf::well_known_types::Timestamp;
use crate::asynchronous::utils::asyncify;
use crate::console::{ioctl_set_winsz, Console};
use crate::error::Error;
use crate::asynchronous::util::asyncify;
use crate::io::Stdio;
use crate::Error;
use crate::{ioctl_set_winsz, Console};
#[async_trait]
pub trait Process {

View File

@ -26,7 +26,7 @@ use containerd_shim_protos::ttrpc;
use containerd_shim_protos::ttrpc::context::Context;
use containerd_shim_protos::ttrpc::r#async::TtrpcContext;
use crate::asynchronous::utils::asyncify;
use crate::asynchronous::util::asyncify;
use crate::error::Result;
use crate::util::{any, connect, timestamp};

View File

@ -142,7 +142,7 @@ pub async fn mkdir(path: impl AsRef<Path>, mode: mode_t) -> Result<()> {
#[cfg(test)]
mod tests {
use crate::asynchronous::utils::{read_file_to_str, write_str_to_file};
use crate::asynchronous::util::{read_file_to_str, write_str_to_file};
#[tokio::test]
async fn test_read_write_str() {

View File

@ -33,37 +33,38 @@
//!
use std::collections::hash_map::DefaultHasher;
use std::env;
use std::fs;
use std::fs::File;
use std::hash::Hasher;
use std::io::Write;
use std::os::unix::fs::FileTypeExt;
use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::io::RawFd;
use std::os::unix::net::UnixListener;
use std::path::{Path, PathBuf};
use std::process::{self, Command, Stdio};
use std::sync::{Arc, Condvar, Mutex};
use command_fds::{CommandFdExt, FdMapping};
use libc::{c_int, pid_t, SIGCHLD, SIGINT, SIGPIPE, SIGTERM};
pub use log::{debug, error, info, warn};
use signal_hook::iterator::Signals;
pub use containerd_shim_protos as protos;
use protos::protobuf::Message;
use nix::ioctl_write_ptr_bad;
pub use protos::shim::shim::DeleteResponse;
pub use protos::shim::shim_ttrpc::{create_task, Task};
pub use protos::ttrpc::{context::Context, Result as TtrpcResult, TtrpcContext};
use protos::ttrpc::{Client, Server};
pub use protos::ttrpc::{context::Context, Result as TtrpcResult};
#[cfg(feature = "async")]
pub use crate::asynchronous::*;
pub use crate::error::{Error, Result};
use crate::monitor::monitor_notify_by_pid;
pub use crate::publisher::RemotePublisher;
use crate::util::{read_address, write_address};
#[cfg(not(feature = "async"))]
pub use crate::synchronous::*;
#[macro_use]
pub mod error;
mod args;
#[cfg(feature = "async")]
pub mod asynchronous;
pub mod io;
mod logger;
pub mod monitor;
pub mod mount;
mod reap;
#[cfg(not(feature = "async"))]
pub mod synchronous;
pub mod util;
/// Generated request/response structures.
pub mod api {
pub use super::protos::api::Status;
@ -71,18 +72,47 @@ pub mod api {
pub use super::protos::shim::shim::*;
pub use super::protos::types::empty::Empty;
}
mod args;
mod logger;
pub mod monitor;
pub mod mount;
mod publisher;
mod reap;
pub mod util;
#[cfg(feature = "async")]
pub mod asynchronous;
pub mod console;
pub mod io;
macro_rules! cfg_not_async {
($($item:item)*) => {
$(
#[cfg(not(feature = "async"))]
#[cfg_attr(docsrs, doc(cfg(not(feature = "async"))))]
$item
)*
}
}
macro_rules! cfg_async {
($($item:item)*) => {
$(
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
$item
)*
}
}
cfg_not_async! {
pub use crate::synchronous::*;
pub use crate::synchronous::console;
pub use crate::synchronous::publisher;
pub use protos::shim::shim_ttrpc::Task;
pub use protos::ttrpc::TtrpcContext;
}
cfg_async! {
pub use crate::asynchronous::*;
pub use crate::asynchronous::console;
pub use crate::asynchronous::container;
pub use crate::asynchronous::processes;
pub use crate::asynchronous::task;
pub use crate::asynchronous::publisher;
pub use protos::shim_async::Task;
pub use protos::ttrpc::r#async::TtrpcContext;
}
ioctl_write_ptr_bad!(ioctl_set_winsz, libc::TIOCSWINSZ, libc::winsize);
const TTRPC_ADDRESS: &str = "TTRPC_ADDRESS";
@ -116,210 +146,6 @@ pub struct StartOpts {
pub debug: bool,
}
/// Helper structure that wraps atomic bool to signal shim server when to shutdown the TTRPC server.
///
/// Shim implementations are responsible for calling [`Self::signal`].
#[allow(clippy::mutex_atomic)] // Condvar expected to be used with Mutex, not AtomicBool.
#[derive(Default)]
pub struct ExitSignal(Mutex<bool>, Condvar);
#[allow(clippy::mutex_atomic)]
impl ExitSignal {
/// Set exit signal to shutdown shim server.
pub fn signal(&self) {
let (lock, cvar) = (&self.0, &self.1);
let mut exit = lock.lock().unwrap();
*exit = true;
cvar.notify_all();
}
/// Wait for the exit signal to be set.
pub fn wait(&self) {
let (lock, cvar) = (&self.0, &self.1);
let mut started = lock.lock().unwrap();
while !*started {
started = cvar.wait(started).unwrap();
}
}
}
/// Main shim interface that must be implemented by all shims.
///
/// Start and delete routines will be called to handle containerd's shim lifecycle requests.
pub trait Shim {
/// Type to provide task service for the shim.
type T: Task + Send + Sync;
/// Create a new instance of Shim.
///
/// # Arguments
/// - `runtime_id`: identifier of the container runtime.
/// - `id`: identifier of the shim/container, passed in from Containerd.
/// - `namespace`: namespace of the shim/container, passed in from Containerd.
/// - `publisher`: publisher to send events to Containerd.
/// - `config`: for the shim to pass back configuration information
fn new(
runtime_id: &str,
id: &str,
namespace: &str,
publisher: RemotePublisher,
config: &mut Config,
) -> Self;
/// Start shim will be called by containerd when launching new shim instance.
///
/// It expected to return TTRPC address containerd daemon can use to communicate with
/// the given shim instance.
///
/// See https://github.com/containerd/containerd/tree/master/runtime/v2#start
fn start_shim(&mut self, opts: StartOpts) -> Result<String>;
/// Delete shim will be called by containerd after shim shutdown to cleanup any leftovers.
fn delete_shim(&mut self) -> Result<DeleteResponse>;
/// Wait for the shim to exit.
fn wait(&mut self);
/// Create the task service object.
fn create_task_service(&self) -> Self::T;
}
/// Shim entry point that must be invoked from `main`.
pub fn run<T>(runtime_id: &str, opts: Option<Config>)
where
T: Shim + Send + Sync + 'static,
{
if let Some(err) = bootstrap::<T>(runtime_id, opts).err() {
eprintln!("{}: {:?}", runtime_id, err);
process::exit(1);
}
}
fn bootstrap<T>(runtime_id: &str, opts: Option<Config>) -> Result<()>
where
T: Shim + Send + Sync + 'static,
{
// Parse command line
let os_args: Vec<_> = env::args_os().collect();
let flags = args::parse(&os_args[1..])?;
let ttrpc_address = env::var(TTRPC_ADDRESS)?;
let publisher = publisher::RemotePublisher::new(&ttrpc_address)?;
// Create shim instance
let mut config = opts.unwrap_or_else(Config::default);
// Setup signals
let signals = setup_signals(&config);
if !config.no_sub_reaper {
reap::set_subreaper()?;
}
let mut shim = T::new(
runtime_id,
&flags.id,
&flags.namespace,
publisher,
&mut config,
);
match flags.action.as_str() {
"start" => {
let args = StartOpts {
id: flags.id,
publish_binary: flags.publish_binary,
address: flags.address,
ttrpc_address,
namespace: flags.namespace,
debug: flags.debug,
};
let address = shim.start_shim(args)?;
std::io::stdout()
.lock()
.write_fmt(format_args!("{}", address))
.map_err(io_error!(e, "write stdout"))?;
Ok(())
}
"delete" => {
std::thread::spawn(move || handle_signals(signals));
let response = shim.delete_shim()?;
let stdout = std::io::stdout();
let mut locked = stdout.lock();
response.write_to_writer(&mut locked)?;
Ok(())
}
_ => {
if !config.no_setup_logger {
logger::init(flags.debug)?;
}
let task = shim.create_task_service();
let task_service = create_task(Arc::new(Box::new(task)));
let mut server = Server::new().register_service(task_service);
server = server.add_listener(SOCKET_FD)?;
server.start()?;
info!("Shim successfully started, waiting for exit signal...");
std::thread::spawn(move || handle_signals(signals));
shim.wait();
info!("Shutting down shim instance");
server.shutdown();
// NOTE: If the shim server is down(like oom killer), the address
// socket might be leaking.
let address = read_address()?;
remove_socket_silently(&address);
Ok(())
}
}
}
fn setup_signals(config: &Config) -> Signals {
let signals = Signals::new(&[SIGTERM, SIGINT, SIGPIPE]).expect("new signal failed");
if !config.no_reaper {
signals.add_signal(SIGCHLD).expect("add signal failed");
}
signals
}
fn handle_signals(mut signals: Signals) {
loop {
for sig in signals.wait() {
match sig {
SIGTERM | SIGINT => {
debug!("received {}", sig);
return;
}
SIGCHLD => loop {
unsafe {
let pid: pid_t = -1;
let mut status: c_int = 0;
let options: c_int = libc::WNOHANG;
let res_pid = libc::waitpid(pid, &mut status, options);
let status = libc::WEXITSTATUS(status);
if res_pid <= 0 {
break;
} else {
monitor_notify_by_pid(res_pid, status).unwrap_or_else(|e| {
error!("failed to send exit event {}", e);
});
}
}
},
_ => {
debug!("received {}", sig);
}
}
}
}
}
/// The shim process communicates with the containerd server through a communication channel
/// created by containerd. One endpoint of the communication channel is passed to shim process
/// through a file descriptor during forking, which is the fourth(3) file descriptor.
@ -364,121 +190,18 @@ fn start_listener(address: &str) -> std::io::Result<UnixListener> {
let path = parse_sockaddr(address);
// Try to create the needed directory hierarchy.
if let Some(parent) = Path::new(path).parent() {
fs::create_dir_all(parent)?;
std::fs::create_dir_all(parent)?;
}
UnixListener::bind(path)
}
fn wait_socket_working(address: &str, interval_in_ms: u64, count: u32) -> Result<()> {
for _i in 0..count {
match Client::connect(address) {
Ok(_) => {
return Ok(());
}
Err(_) => {
std::thread::sleep(std::time::Duration::from_millis(interval_in_ms));
}
}
}
Err(other!("time out waiting for socket {}", address))
}
fn remove_socket_silently(address: &str) {
remove_socket(address).unwrap_or_else(|e| warn!("failed to remove file {} {:?}", address, e))
}
fn remove_socket(address: &str) -> Result<()> {
let path = parse_sockaddr(address);
if let Ok(md) = Path::new(path).metadata() {
if md.file_type().is_socket() {
fs::remove_file(path).map_err(io_error!(e, "remove socket"))?;
}
}
Ok(())
}
/// Spawn is a helper func to launch shim process.
/// Typically this expected to be called from `StartShim`.
pub fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Result<(u32, String)> {
let cmd = env::current_exe().map_err(io_error!(e, ""))?;
let cwd = env::current_dir().map_err(io_error!(e, ""))?;
let address = socket_address(&opts.address, &opts.namespace, grouping);
// Create socket and prepare listener.
// We'll use `add_listener` when creating TTRPC server.
let listener = match start_listener(&address) {
Ok(l) => l,
Err(e) => {
if e.kind() != std::io::ErrorKind::AddrInUse {
return Err(error::Error::IoError {
context: "".to_string(),
err: e,
});
};
if let Ok(()) = wait_socket_working(&address, 5, 200) {
write_address(&address)?;
return Ok((0, address));
}
remove_socket(&address)?;
start_listener(&address).map_err(io_error!(e, ""))?
}
};
let mut command = Command::new(cmd);
command
.current_dir(cwd)
.stdout(Stdio::null())
.stdin(Stdio::null())
.stderr(Stdio::null())
.fd_mappings(vec![FdMapping {
parent_fd: listener.as_raw_fd(),
child_fd: SOCKET_FD,
}])?
.args(&[
"-namespace",
&opts.namespace,
"-id",
&opts.id,
"-address",
&opts.address,
]);
if opts.debug {
command.arg("-debug");
}
command.envs(vars);
command
.spawn()
.map_err(io_error!(e, "spawn shim"))
.map(|child| {
// Ownership of `listener` has been passed to child.
std::mem::forget(listener);
(child.id(), address)
})
pub struct Console {
pub file: File,
}
#[cfg(test)]
mod tests {
use std::thread;
use super::*;
#[test]
fn exit_signal() {
let signal = Arc::new(ExitSignal::default());
let cloned = Arc::clone(&signal);
let handle = thread::spawn(move || {
cloned.signal();
});
signal.wait();
if let Err(err) = handle.join() {
panic!("{:?}", err);
}
}
use crate::start_listener;
#[test]
fn test_start_listener() {
@ -498,9 +221,9 @@ mod tests {
let path = tmpdir.path().to_str().unwrap().to_owned();
let txt_file = path + "demo.txt";
fs::write(&txt_file, "test").unwrap();
std::fs::write(&txt_file, "test").unwrap();
assert!(start_listener(&txt_file).is_err());
let context = fs::read_to_string(&txt_file).unwrap();
let context = std::fs::read_to_string(&txt_file).unwrap();
assert_eq!(context, "test");
}
}

View File

@ -13,54 +13,12 @@
See the License for the specific language governing permissions and
limitations under the License.
*/
use std::collections::HashMap;
use std::fmt;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;
use lazy_static::lazy_static;
use log::{error, warn};
use crate::error::Result;
lazy_static! {
pub static ref MONITOR: Mutex<Monitor> = {
let monitor = Monitor {
seq_id: 0,
subscribers: HashMap::new(),
topic_subs: HashMap::new(),
};
Mutex::new(monitor)
};
}
pub fn monitor_subscribe(topic: Topic) -> Result<Subscription> {
let mut monitor = MONITOR.lock().unwrap();
let s = monitor.subscribe(topic)?;
Ok(s)
}
pub fn monitor_notify_by_pid(pid: i32, exit_code: i32) -> Result<()> {
let monitor = MONITOR.lock().unwrap();
monitor.notify_by_pid(pid, exit_code)
}
pub fn monitor_notify_by_exec(id: &str, exec_id: &str, exit_code: i32) -> Result<()> {
let monitor = MONITOR.lock().unwrap();
monitor.notify_by_exec(id, exec_id, exit_code)
}
pub struct Monitor {
pub(crate) seq_id: i64,
pub(crate) subscribers: HashMap<i64, Subscriber>,
pub(crate) topic_subs: HashMap<Topic, Vec<i64>>,
}
pub(crate) struct Subscriber {
pub(crate) topic: Topic,
pub(crate) tx: Sender<ExitEvent>,
}
#[cfg(feature = "async")]
pub use crate::asynchronous::monitor::*;
#[cfg(not(feature = "async"))]
pub use crate::synchronous::monitor::*;
#[derive(Clone, Eq, Hash, PartialEq)]
pub enum Topic {
@ -69,11 +27,6 @@ pub enum Topic {
All,
}
pub struct Subscription {
pub id: i64,
pub rx: Receiver<ExitEvent>,
}
#[derive(Debug)]
pub struct ExitEvent {
// what kind of a thing exit
@ -106,72 +59,3 @@ pub enum Subject {
// if exec is empty, then the event is for the container
Exec(String, String),
}
impl Monitor {
pub fn subscribe(&mut self, topic: Topic) -> Result<Subscription> {
let (tx, rx) = channel::<ExitEvent>();
let id = self.seq_id;
self.seq_id += 1;
let subscriber = Subscriber {
tx,
topic: topic.clone(),
};
self.subscribers.insert(id, subscriber);
self.topic_subs
.entry(topic)
.or_insert_with(Vec::new)
.push(id);
Ok(Subscription { id, rx })
}
pub fn notify_by_pid(&self, pid: i32, exit_code: i32) -> Result<()> {
let subject = Subject::Pid(pid);
self.notify_topic(&Topic::Pid, &subject, exit_code);
self.notify_topic(&Topic::All, &subject, exit_code);
Ok(())
}
pub fn notify_by_exec(&self, cid: &str, exec_id: &str, exit_code: i32) -> Result<()> {
let subject = Subject::Exec(cid.into(), exec_id.into());
self.notify_topic(&Topic::Exec, &subject, exit_code);
self.notify_topic(&Topic::All, &subject, exit_code);
Ok(())
}
fn notify_topic(&self, topic: &Topic, subject: &Subject, exit_code: i32) {
self.topic_subs.get(topic).map_or((), |subs| {
for i in subs {
self.subscribers.get(i).and_then(|sub| {
sub.tx
.send(ExitEvent {
subject: subject.clone(),
exit_code,
})
.map_err(|e| warn!("failed to send {}", e))
.ok()
});
}
})
}
pub fn unsubscribe(&mut self, id: i64) -> Result<()> {
let sub = self.subscribers.remove(&id);
if let Some(s) = sub {
self.topic_subs.get_mut(&s.topic).map(|v| {
v.iter().position(|&x| x == id).map(|i| {
v.remove(i);
})
});
}
Ok(())
}
}
impl Drop for Subscription {
fn drop(&mut self) {
let mut monitor = MONITOR.lock().unwrap();
monitor.unsubscribe(self.id).unwrap_or_else(|e| {
error!("failed to unsubscribe the subscription {}, {}", self.id, e);
});
}
}

View File

@ -14,18 +14,10 @@
limitations under the License.
*/
use std::fs::File;
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::PathBuf;
use log::warn;
use nix::ioctl_write_ptr_bad;
ioctl_write_ptr_bad!(ioctl_set_winsz, libc::TIOCSWINSZ, libc::winsize);
pub struct Console {
pub file: File,
}
pub struct ConsoleSocket {
pub listener: UnixListener,

View File

@ -0,0 +1,380 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
//! A library to implement custom runtime v2 shims for containerd.
//!
//! # Runtime
//! Runtime v2 introduces a first class shim API for runtime authors to integrate with containerd.
//! The shim API is minimal and scoped to the execution lifecycle of a container.
//!
//! This crate simplifies shim v2 runtime development for containerd. It handles common tasks such
//! as command line parsing, setting up shim's TTRPC server, logging, events, etc.
//!
//! Clients are expected to implement [Shim] and [Task] traits with task handling routines.
//! This generally replicates same API as in Go [version](https://github.com/containerd/containerd/blob/main/runtime/v2/example/cmd/main.go).
//!
//! Once implemented, shim's bootstrap code is as easy as:
//! ```text
//! shim::run::<Service>("io.containerd.empty.v1")
//! ```
//!
use std::env;
use std::fs;
use std::io::Write;
use std::os::unix::fs::FileTypeExt;
use std::os::unix::io::AsRawFd;
use std::path::Path;
use std::process::{self, Command, Stdio};
use std::sync::{Arc, Condvar, Mutex};
use command_fds::{CommandFdExt, FdMapping};
use libc::{c_int, pid_t, SIGCHLD, SIGINT, SIGPIPE, SIGTERM};
pub use log::{debug, error, info, warn};
use signal_hook::iterator::Signals;
use crate::protos::protobuf::Message;
use crate::protos::shim::shim_ttrpc::{create_task, Task};
use crate::protos::ttrpc::{Client, Server};
use util::{read_address, write_address};
use crate::api::DeleteResponse;
use crate::synchronous::publisher::RemotePublisher;
use crate::Error;
use crate::{args, logger, reap, Result, TTRPC_ADDRESS};
use crate::{parse_sockaddr, socket_address, start_listener, Config, StartOpts, SOCKET_FD};
pub mod monitor;
pub mod publisher;
pub mod util;
pub mod console;
/// Helper structure that wraps atomic bool to signal shim server when to shutdown the TTRPC server.
///
/// Shim implementations are responsible for calling [`Self::signal`].
#[allow(clippy::mutex_atomic)] // Condvar expected to be used with Mutex, not AtomicBool.
#[derive(Default)]
pub struct ExitSignal(Mutex<bool>, Condvar);
#[allow(clippy::mutex_atomic)]
impl ExitSignal {
/// Set exit signal to shutdown shim server.
pub fn signal(&self) {
let (lock, cvar) = (&self.0, &self.1);
let mut exit = lock.lock().unwrap();
*exit = true;
cvar.notify_all();
}
/// Wait for the exit signal to be set.
pub fn wait(&self) {
let (lock, cvar) = (&self.0, &self.1);
let mut started = lock.lock().unwrap();
while !*started {
started = cvar.wait(started).unwrap();
}
}
}
/// Main shim interface that must be implemented by all shims.
///
/// Start and delete routines will be called to handle containerd's shim lifecycle requests.
pub trait Shim {
/// Type to provide task service for the shim.
type T: Task + Send + Sync;
/// Create a new instance of Shim.
///
/// # Arguments
/// - `runtime_id`: identifier of the container runtime.
/// - `id`: identifier of the shim/container, passed in from Containerd.
/// - `namespace`: namespace of the shim/container, passed in from Containerd.
/// - `publisher`: publisher to send events to Containerd.
/// - `config`: for the shim to pass back configuration information
fn new(
runtime_id: &str,
id: &str,
namespace: &str,
publisher: RemotePublisher,
config: &mut Config,
) -> Self;
/// Start shim will be called by containerd when launching new shim instance.
///
/// It expected to return TTRPC address containerd daemon can use to communicate with
/// the given shim instance.
///
/// See https://github.com/containerd/containerd/tree/master/runtime/v2#start
fn start_shim(&mut self, opts: StartOpts) -> Result<String>;
/// Delete shim will be called by containerd after shim shutdown to cleanup any leftovers.
fn delete_shim(&mut self) -> Result<DeleteResponse>;
/// Wait for the shim to exit.
fn wait(&mut self);
/// Create the task service object.
fn create_task_service(&self) -> Self::T;
}
/// Shim entry point that must be invoked from `main`.
pub fn run<T>(runtime_id: &str, opts: Option<Config>)
where
T: Shim + Send + Sync + 'static,
{
if let Some(err) = bootstrap::<T>(runtime_id, opts).err() {
eprintln!("{}: {:?}", runtime_id, err);
process::exit(1);
}
}
fn bootstrap<T>(runtime_id: &str, opts: Option<Config>) -> Result<()>
where
T: Shim + Send + Sync + 'static,
{
// Parse command line
let os_args: Vec<_> = env::args_os().collect();
let flags = args::parse(&os_args[1..])?;
let ttrpc_address = env::var(TTRPC_ADDRESS)?;
let publisher = publisher::RemotePublisher::new(&ttrpc_address)?;
// Create shim instance
let mut config = opts.unwrap_or_else(Config::default);
// Setup signals
let signals = setup_signals(&config);
if !config.no_sub_reaper {
reap::set_subreaper()?;
}
let mut shim = T::new(
runtime_id,
&flags.id,
&flags.namespace,
publisher,
&mut config,
);
match flags.action.as_str() {
"start" => {
let args = StartOpts {
id: flags.id,
publish_binary: flags.publish_binary,
address: flags.address,
ttrpc_address,
namespace: flags.namespace,
debug: flags.debug,
};
let address = shim.start_shim(args)?;
std::io::stdout()
.lock()
.write_fmt(format_args!("{}", address))
.map_err(io_error!(e, "write stdout"))?;
Ok(())
}
"delete" => {
std::thread::spawn(move || handle_signals(signals));
let response = shim.delete_shim()?;
let stdout = std::io::stdout();
let mut locked = stdout.lock();
response.write_to_writer(&mut locked)?;
Ok(())
}
_ => {
if !config.no_setup_logger {
logger::init(flags.debug)?;
}
let task = shim.create_task_service();
let task_service = create_task(Arc::new(Box::new(task)));
let mut server = Server::new().register_service(task_service);
server = server.add_listener(SOCKET_FD)?;
server.start()?;
info!("Shim successfully started, waiting for exit signal...");
std::thread::spawn(move || handle_signals(signals));
shim.wait();
info!("Shutting down shim instance");
server.shutdown();
// NOTE: If the shim server is down(like oom killer), the address
// socket might be leaking.
let address = read_address()?;
remove_socket_silently(&address);
Ok(())
}
}
}
fn setup_signals(config: &Config) -> Signals {
let signals = Signals::new(&[SIGTERM, SIGINT, SIGPIPE]).expect("new signal failed");
if !config.no_reaper {
signals.add_signal(SIGCHLD).expect("add signal failed");
}
signals
}
fn handle_signals(mut signals: Signals) {
loop {
for sig in signals.wait() {
match sig {
SIGTERM | SIGINT => {
debug!("received {}", sig);
return;
}
SIGCHLD => loop {
unsafe {
let pid: pid_t = -1;
let mut status: c_int = 0;
let options: c_int = libc::WNOHANG;
let res_pid = libc::waitpid(pid, &mut status, options);
let status = libc::WEXITSTATUS(status);
if res_pid <= 0 {
break;
} else {
monitor::monitor_notify_by_pid(res_pid, status).unwrap_or_else(|e| {
error!("failed to send exit event {}", e);
});
}
}
},
_ => {
debug!("received {}", sig);
}
}
}
}
}
fn wait_socket_working(address: &str, interval_in_ms: u64, count: u32) -> Result<()> {
for _i in 0..count {
match Client::connect(address) {
Ok(_) => {
return Ok(());
}
Err(_) => {
std::thread::sleep(std::time::Duration::from_millis(interval_in_ms));
}
}
}
Err(other!("time out waiting for socket {}", address))
}
fn remove_socket_silently(address: &str) {
remove_socket(address).unwrap_or_else(|e| warn!("failed to remove file {} {:?}", address, e))
}
fn remove_socket(address: &str) -> Result<()> {
let path = parse_sockaddr(address);
if let Ok(md) = Path::new(path).metadata() {
if md.file_type().is_socket() {
fs::remove_file(path).map_err(io_error!(e, "remove socket"))?;
}
}
Ok(())
}
/// Spawn is a helper func to launch shim process.
/// Typically this expected to be called from `StartShim`.
pub fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Result<(u32, String)> {
let cmd = env::current_exe().map_err(io_error!(e, ""))?;
let cwd = env::current_dir().map_err(io_error!(e, ""))?;
let address = socket_address(&opts.address, &opts.namespace, grouping);
// Create socket and prepare listener.
// We'll use `add_listener` when creating TTRPC server.
let listener = match start_listener(&address) {
Ok(l) => l,
Err(e) => {
if e.kind() != std::io::ErrorKind::AddrInUse {
return Err(Error::IoError {
context: "".to_string(),
err: e,
});
};
if let Ok(()) = wait_socket_working(&address, 5, 200) {
write_address(&address)?;
return Ok((0, address));
}
remove_socket(&address)?;
start_listener(&address).map_err(io_error!(e, ""))?
}
};
let mut command = Command::new(cmd);
command
.current_dir(cwd)
.stdout(Stdio::null())
.stdin(Stdio::null())
.stderr(Stdio::null())
.fd_mappings(vec![FdMapping {
parent_fd: listener.as_raw_fd(),
child_fd: SOCKET_FD,
}])?
.args(&[
"-namespace",
&opts.namespace,
"-id",
&opts.id,
"-address",
&opts.address,
]);
if opts.debug {
command.arg("-debug");
}
command.envs(vars);
command
.spawn()
.map_err(io_error!(e, "spawn shim"))
.map(|child| {
// Ownership of `listener` has been passed to child.
std::mem::forget(listener);
(child.id(), address)
})
}
#[cfg(test)]
mod tests {
use std::thread;
use super::*;
#[test]
fn exit_signal() {
let signal = Arc::new(ExitSignal::default());
let cloned = Arc::clone(&signal);
let handle = thread::spawn(move || {
cloned.signal();
});
signal.wait();
if let Err(err) = handle.join() {
panic!("{:?}", err);
}
}
}

View File

@ -0,0 +1,137 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;
use lazy_static::lazy_static;
use log::{error, warn};
use crate::monitor::{ExitEvent, Subject, Topic};
use crate::Result;
lazy_static! {
pub static ref MONITOR: Mutex<Monitor> = {
let monitor = Monitor {
seq_id: 0,
subscribers: HashMap::new(),
topic_subs: HashMap::new(),
};
Mutex::new(monitor)
};
}
pub fn monitor_subscribe(topic: Topic) -> Result<Subscription> {
let mut monitor = MONITOR.lock().unwrap();
let s = monitor.subscribe(topic)?;
Ok(s)
}
pub fn monitor_notify_by_pid(pid: i32, exit_code: i32) -> Result<()> {
let monitor = MONITOR.lock().unwrap();
monitor.notify_by_pid(pid, exit_code)
}
pub fn monitor_notify_by_exec(id: &str, exec_id: &str, exit_code: i32) -> Result<()> {
let monitor = MONITOR.lock().unwrap();
monitor.notify_by_exec(id, exec_id, exit_code)
}
pub struct Monitor {
pub(crate) seq_id: i64,
pub(crate) subscribers: HashMap<i64, Subscriber>,
pub(crate) topic_subs: HashMap<Topic, Vec<i64>>,
}
pub(crate) struct Subscriber {
pub(crate) topic: Topic,
pub(crate) tx: Sender<ExitEvent>,
}
pub struct Subscription {
pub id: i64,
pub rx: Receiver<ExitEvent>,
}
impl Monitor {
pub fn subscribe(&mut self, topic: Topic) -> Result<Subscription> {
let (tx, rx) = channel::<ExitEvent>();
let id = self.seq_id;
self.seq_id += 1;
let subscriber = Subscriber {
tx,
topic: topic.clone(),
};
self.subscribers.insert(id, subscriber);
self.topic_subs
.entry(topic)
.or_insert_with(Vec::new)
.push(id);
Ok(Subscription { id, rx })
}
pub fn notify_by_pid(&self, pid: i32, exit_code: i32) -> Result<()> {
let subject = Subject::Pid(pid);
self.notify_topic(&Topic::Pid, &subject, exit_code);
self.notify_topic(&Topic::All, &subject, exit_code);
Ok(())
}
pub fn notify_by_exec(&self, cid: &str, exec_id: &str, exit_code: i32) -> Result<()> {
let subject = Subject::Exec(cid.into(), exec_id.into());
self.notify_topic(&Topic::Exec, &subject, exit_code);
self.notify_topic(&Topic::All, &subject, exit_code);
Ok(())
}
fn notify_topic(&self, topic: &Topic, subject: &Subject, exit_code: i32) {
self.topic_subs.get(topic).map_or((), |subs| {
for i in subs {
self.subscribers.get(i).and_then(|sub| {
sub.tx
.send(ExitEvent {
subject: subject.clone(),
exit_code,
})
.map_err(|e| warn!("failed to send {}", e))
.ok()
});
}
})
}
pub fn unsubscribe(&mut self, id: i64) -> Result<()> {
let sub = self.subscribers.remove(&id);
if let Some(s) = sub {
self.topic_subs.get_mut(&s.topic).map(|v| {
v.iter().position(|&x| x == id).map(|i| {
v.remove(i);
})
});
}
Ok(())
}
}
impl Drop for Subscription {
fn drop(&mut self) {
let mut monitor = MONITOR.lock().unwrap();
monitor.unsubscribe(self.id).unwrap_or_else(|e| {
error!("failed to unsubscribe the subscription {}, {}", self.id, e);
});
}
}

View File

@ -0,0 +1,154 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
use std::env;
use std::fs::{rename, File, OpenOptions};
use std::io::{Read, Write};
use std::os::unix::net::UnixListener;
use std::path::Path;
use log::warn;
use nix::sys::stat::Mode;
use nix::unistd::mkdir;
use oci_spec::runtime::Spec;
use uuid::Uuid;
use containerd_shim_protos::shim::oci::Options;
use crate::console::ConsoleSocket;
use crate::util::{JsonOptions, OPTIONS_FILE_NAME, RUNTIME_FILE_NAME};
use crate::Error;
pub fn read_file_to_str<P: AsRef<Path>>(filename: P) -> crate::Result<String> {
let mut file = File::open(&filename).map_err(io_error!(
e,
"open {}",
filename.as_ref().to_string_lossy()
))?;
let mut content: String = String::new();
file.read_to_string(&mut content).map_err(io_error!(
e,
"read {}",
filename.as_ref().to_string_lossy()
))?;
Ok(content)
}
pub fn read_options(bundle: impl AsRef<Path>) -> crate::Result<Options> {
let path = bundle.as_ref().join(OPTIONS_FILE_NAME);
let opts_str = read_file_to_str(path)?;
let json_opt: JsonOptions = serde_json::from_str(&opts_str)?;
Ok(json_opt.into())
}
pub fn read_runtime(bundle: impl AsRef<Path>) -> crate::Result<String> {
let path = bundle.as_ref().join(RUNTIME_FILE_NAME);
read_file_to_str(path)
}
pub fn read_address() -> crate::Result<String> {
let path = Path::new("address");
read_file_to_str(path)
}
pub fn read_pid_from_file(pid_path: &Path) -> crate::Result<i32> {
let pid_str = read_file_to_str(pid_path)?;
let pid = pid_str.parse::<i32>()?;
Ok(pid)
}
pub fn write_str_to_path(filename: &Path, s: &str) -> crate::Result<()> {
let file = filename
.file_name()
.ok_or_else(|| Error::InvalidArgument(String::from("pid path illegal")))?;
let tmp_path = filename
.parent()
.map(|x| x.join(format!(".{}", file.to_str().unwrap_or(""))))
.ok_or_else(|| Error::InvalidArgument(String::from("failed to create tmp path")))?;
let tmp_path = tmp_path
.to_str()
.ok_or_else(|| Error::InvalidArgument(String::from("failed to get path")))?;
let mut f = OpenOptions::new()
.write(true)
.create_new(true)
.open(tmp_path)
.map_err(io_error!(e, "open {}", filename.to_str().unwrap()))?;
f.write_all(s.as_bytes())
.map_err(io_error!(e, "write tmp file"))?;
rename(tmp_path, filename).map_err(io_error!(
e,
"rename tmp file to {}",
filename.to_str().unwrap()
))?;
Ok(())
}
pub fn write_options(bundle: &str, opt: &Options) -> crate::Result<()> {
let json_opt = JsonOptions::from(opt.to_owned());
let opts_str = serde_json::to_string(&json_opt)?;
let path = Path::new(bundle).join(OPTIONS_FILE_NAME);
write_str_to_path(path.as_path(), opts_str.as_str())
}
pub fn write_runtime(bundle: &str, binary_name: &str) -> crate::Result<()> {
let path = Path::new(bundle).join(RUNTIME_FILE_NAME);
write_str_to_path(path.as_path(), binary_name)
}
pub fn write_address(address: &str) -> crate::Result<()> {
let path = Path::new("address");
write_str_to_path(path, address)
}
pub fn read_spec_from_file(bundle: &str) -> crate::Result<Spec> {
let path = Path::new(bundle).join("config.json");
Spec::load(path).map_err(other_error!(e, "read spec file"))
}
pub fn new_temp_console_socket() -> crate::Result<ConsoleSocket> {
let dir = env::var("XDG_RUNTIME_DIR")
.map(|runtime_dir| format!("{}/pty{}", runtime_dir, Uuid::new_v4(),))?;
mkdir(Path::new(&dir), Mode::from_bits(0o711).unwrap())?;
let file_name = Path::new(&dir).join("pty.sock");
let listener = UnixListener::bind(file_name.as_path()).map_err(io_error!(
e,
"bind socket {}",
file_name.display()
))?;
Ok(ConsoleSocket {
listener,
path: file_name,
rmdir: true,
})
}
/// A helper to help remove temperate file or dir when it became useless
pub struct HelperRemoveFile {
path: String,
}
impl HelperRemoveFile {
pub fn new(path: String) -> Self {
Self { path }
}
}
impl Drop for HelperRemoveFile {
fn drop(&mut self) {
std::fs::remove_file(&self.path)
.unwrap_or_else(|e| warn!("remove dir {} error: {}", &self.path, e));
}
}

View File

@ -14,28 +14,19 @@
limitations under the License.
*/
use std::env;
use std::fs::{rename, File, OpenOptions};
use std::io::{Read, Write};
use std::os::unix::io::RawFd;
use std::os::unix::net::UnixListener;
use std::path::Path;
use std::time::{SystemTime, UNIX_EPOCH};
use log::warn;
use nix::sys::stat::Mode;
use nix::unistd::mkdir;
use oci_spec::runtime::Spec;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use containerd_shim_protos::protobuf::well_known_types::Any;
use containerd_shim_protos::protobuf::Message;
use crate::api::Options;
use crate::console::ConsoleSocket;
use crate::error::{Error, Result};
use crate::protos::protobuf::well_known_types::Timestamp;
#[cfg(feature = "async")]
pub use crate::asynchronous::util::*;
use crate::error::Result;
use crate::protos::protobuf::well_known_types::{Any, Timestamp};
use crate::protos::protobuf::Message;
#[cfg(not(feature = "async"))]
pub use crate::synchronous::util::*;
pub const CONFIG_FILE_NAME: &str = "config.json";
pub const OPTIONS_FILE_NAME: &str = "options.json";
@ -103,92 +94,6 @@ impl From<JsonOptions> for Options {
}
}
pub fn read_file_to_str<P: AsRef<Path>>(filename: P) -> Result<String> {
let mut file = File::open(&filename).map_err(io_error!(
e,
"open {}",
filename.as_ref().to_string_lossy()
))?;
let mut content: String = String::new();
file.read_to_string(&mut content).map_err(io_error!(
e,
"read {}",
filename.as_ref().to_string_lossy()
))?;
Ok(content)
}
pub fn read_options(bundle: impl AsRef<Path>) -> Result<Options> {
let path = bundle.as_ref().join(OPTIONS_FILE_NAME);
let opts_str = read_file_to_str(path)?;
let json_opt: JsonOptions = serde_json::from_str(&opts_str)?;
Ok(json_opt.into())
}
pub fn read_runtime(bundle: impl AsRef<Path>) -> Result<String> {
let path = bundle.as_ref().join(RUNTIME_FILE_NAME);
read_file_to_str(path)
}
pub fn read_address() -> Result<String> {
let path = Path::new("address");
read_file_to_str(path)
}
pub fn read_pid_from_file(pid_path: &Path) -> Result<i32> {
let pid_str = read_file_to_str(pid_path)?;
let pid = pid_str.parse::<i32>()?;
Ok(pid)
}
pub fn write_str_to_path(filename: &Path, s: &str) -> Result<()> {
let file = filename
.file_name()
.ok_or_else(|| Error::InvalidArgument(String::from("pid path illegal")))?;
let tmp_path = filename
.parent()
.map(|x| x.join(format!(".{}", file.to_str().unwrap_or(""))))
.ok_or_else(|| Error::InvalidArgument(String::from("failed to create tmp path")))?;
let tmp_path = tmp_path
.to_str()
.ok_or_else(|| Error::InvalidArgument(String::from("failed to get path")))?;
let mut f = OpenOptions::new()
.write(true)
.create_new(true)
.open(tmp_path)
.map_err(io_error!(e, "open {}", filename.to_str().unwrap()))?;
f.write_all(s.as_bytes())
.map_err(io_error!(e, "write tmp file"))?;
rename(tmp_path, filename).map_err(io_error!(
e,
"rename tmp file to {}",
filename.to_str().unwrap()
))?;
Ok(())
}
pub fn write_options(bundle: &str, opt: &Options) -> Result<()> {
let json_opt = JsonOptions::from(opt.to_owned());
let opts_str = serde_json::to_string(&json_opt)?;
let path = Path::new(bundle).join(OPTIONS_FILE_NAME);
write_str_to_path(path.as_path(), opts_str.as_str())
}
pub fn write_runtime(bundle: &str, binary_name: &str) -> Result<()> {
let path = Path::new(bundle).join(RUNTIME_FILE_NAME);
write_str_to_path(path.as_path(), binary_name)
}
pub fn write_address(address: &str) -> Result<()> {
let path = Path::new("address");
write_str_to_path(path, address)
}
pub fn read_spec_from_file(bundle: &str) -> Result<Spec> {
let path = Path::new(bundle).join("config.json");
Spec::load(path).map_err(other_error!(e, "read spec file"))
}
pub fn get_timestamp() -> Result<Timestamp> {
let mut timestamp = Timestamp::new();
let now = SystemTime::now().duration_since(UNIX_EPOCH)?;
@ -281,37 +186,3 @@ impl AsOption for str {
}
}
}
/// A helper to help remove temperate file or dir when it became useless
pub struct HelperRemoveFile {
path: String,
}
impl HelperRemoveFile {
pub fn new(path: String) -> Self {
Self { path }
}
}
impl Drop for HelperRemoveFile {
fn drop(&mut self) {
std::fs::remove_file(&self.path)
.unwrap_or_else(|e| warn!("remove dir {} error: {}", &self.path, e));
}
}
pub fn new_temp_console_socket() -> Result<ConsoleSocket> {
let dir = env::var("XDG_RUNTIME_DIR")
.map(|runtime_dir| format!("{}/pty{}", runtime_dir, Uuid::new_v4(),))?;
mkdir(Path::new(&dir), Mode::from_bits(0o711).unwrap())?;
let file_name = Path::new(&dir).join("pty.sock");
let listener = UnixListener::bind(file_name.as_path()).map_err(io_error!(
e,
"bind socket {}",
file_name.display()
))?;
Ok(ConsoleSocket {
listener,
path: file_name,
rmdir: true,
})
}