From 117263d9be92f1144ac77b82da0ff1823f62d926 Mon Sep 17 00:00:00 2001 From: Zhang Tianyang Date: Thu, 17 Feb 2022 15:19:24 +0800 Subject: [PATCH] Move Io and Console Signed-off-by: Zhang Tianyang --- crates/runc-shim/Cargo.toml | 3 + crates/runc-shim/src/runc.rs | 18 +-- crates/runc-shim/src/service.rs | 2 +- crates/runc/Cargo.toml | 2 +- crates/runc/src/console.rs | 55 +++++++++ crates/runc/src/io.rs | 192 +++++++++++++++++--------------- crates/runc/src/lib.rs | 3 +- crates/runc/src/options.rs | 4 +- crates/runc/src/utils.rs | 3 +- crates/shim/Cargo.toml | 2 +- crates/shim/src/container.rs | 41 +------ crates/shim/src/io.rs | 104 +---------------- crates/shim/src/lib.rs | 2 +- 13 files changed, 186 insertions(+), 245 deletions(-) create mode 100644 crates/runc/src/console.rs diff --git a/crates/runc-shim/Cargo.toml b/crates/runc-shim/Cargo.toml index da480b5..f9b3571 100644 --- a/crates/runc-shim/Cargo.toml +++ b/crates/runc-shim/Cargo.toml @@ -7,6 +7,9 @@ license = "Apache-2.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +async = [] + [dependencies] containerd-shim = { path = "../shim", version = "0.2.0" } nix = "0.23.1" diff --git a/crates/runc-shim/src/runc.rs b/crates/runc-shim/src/runc.rs index b6a6a56..6683812 100644 --- a/crates/runc-shim/src/runc.rs +++ b/crates/runc-shim/src/runc.rs @@ -24,14 +24,13 @@ use nix::sys::signal::kill; use nix::sys::stat::Mode; use nix::unistd::{mkdir, Pid}; use oci_spec::runtime::LinuxNamespaceType; +use runc::console::{Console, ConsoleSocket}; use runc::options::{CreateOpts, DeleteOpts, ExecOpts, GlobalOpts, KillOpts}; use runc::utils::new_temp_console_socket; use shim::api::*; -use shim::container::{ - CommonContainer, CommonProcess, ConsoleSocket, Container, ContainerFactory, Process, -}; +use shim::container::{CommonContainer, CommonProcess, Container, ContainerFactory, Process}; use shim::error::{Error, Result}; -use shim::io::{create_io, Console, Stdio}; +use shim::io::{create_io, Stdio}; use shim::mount::mount_rootfs; use shim::protos::protobuf::{well_known_types::Timestamp, CodedInputStream, Message}; use shim::util::{read_spec_from_file, write_options, write_runtime, IntoOption}; @@ -155,7 +154,7 @@ impl Container for RuncContainer { .common .processes .get_mut(exec_id) - .ok_or(other!("can not find the exec by id"))?; + .ok_or_else(|| other!("can not find the exec by id"))?; let pid_path = Path::new(self.common.bundle.as_str()) .join(format!("{}.pid", &process.common.id)); @@ -165,7 +164,7 @@ impl Container for RuncContainer { console_socket: None, detach: true, }; - let socket: Option = if process.common.stdio.terminal { + let socket = if process.common.stdio.terminal { let s = new_temp_console_socket().map_err(other_error!(e, ""))?; exec_opts.console_socket = Some(s.path.to_owned()); Some(s) @@ -194,7 +193,8 @@ impl Container for RuncContainer { .exec(&self.common.id, &process.spec, Some(&exec_opts)) .map_err(other_error!(e, "failed exec"))?; if process.common.stdio.terminal { - let console_socket = socket.ok_or(other!("failed to get console socket"))?; + let console_socket = + socket.ok_or_else(|| other!("failed to get console socket"))?; let console = process.common.copy_console(&console_socket)?; process.common.console = Some(console); } else { @@ -414,7 +414,7 @@ impl InitProcess { .no_pivot(self.no_pivot_root) .no_new_keyring(self.no_new_key_ring) .detach(false); - let socket: Option = if terminal { + let socket = if terminal { let s = new_temp_console_socket().map_err(other_error!(e, ""))?; create_opts.console_socket = Some(s.path.to_owned()); Some(s) @@ -435,7 +435,7 @@ impl InitProcess { .create(&id, &bundle, Some(&create_opts)) .map_err(other_error!(e, "failed create"))?; if terminal { - let console_socket = socket.ok_or(other!("failed to get console socket"))?; + let console_socket = socket.ok_or_else(|| other!("failed to get console socket"))?; let console = self.common.copy_console(&console_socket)?; self.common.console = Some(console); } else { diff --git a/crates/runc-shim/src/service.rs b/crates/runc-shim/src/service.rs index deef864..6bbe1ff 100644 --- a/crates/runc-shim/src/service.rs +++ b/crates/runc-shim/src/service.rs @@ -112,7 +112,7 @@ impl Shim for Service { let root = Path::new(if root.is_empty() { DEFAULT_RUNC_ROOT } else { - root.as_ref() + root }) .join(namespace); let log_buf = Path::new(bundle).join("log.json"); diff --git a/crates/runc/Cargo.toml b/crates/runc/Cargo.toml index 940b4c6..5bb0f7e 100644 --- a/crates/runc/Cargo.toml +++ b/crates/runc/Cargo.toml @@ -26,7 +26,7 @@ tempfile = "3.3.0" thiserror = "1.0.30" time = { version = "0.3.7", features = ["serde", "std"] } uuid = { version = "0.8.2", features = ["v4"] } -containerd-shim = { path = "../shim", version = "0.2.0" } +os_pipe = "1.0.0" # Async dependencies tokio = { version = "1.15.0", features = ["full"], optional = true } diff --git a/crates/runc/src/console.rs b/crates/runc/src/console.rs new file mode 100644 index 0000000..ab4c5bd --- /dev/null +++ b/crates/runc/src/console.rs @@ -0,0 +1,55 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +use std::fs::File; +use std::os::unix::net::{UnixListener, UnixStream}; +use std::path::PathBuf; + +use log::warn; +use nix::sys::termios::Termios; + +pub struct Console { + pub file: File, + pub termios: Termios, +} + +pub struct ConsoleSocket { + pub listener: UnixListener, + pub path: PathBuf, + pub rmdir: bool, +} + +impl ConsoleSocket { + pub fn accept(&self) -> std::io::Result { + let (stream, _addr) = self.listener.accept()?; + Ok(stream) + } +} + +impl Drop for ConsoleSocket { + fn drop(&mut self) { + if self.rmdir { + let tmp_socket_dir = self.path.parent().unwrap(); + std::fs::remove_dir_all(tmp_socket_dir).unwrap_or_else(|e| { + warn!( + "remove tmp console socket path {} : {}", + tmp_socket_dir.to_str().unwrap(), + e + ) + }) + } + } +} diff --git a/crates/runc/src/io.rs b/crates/runc/src/io.rs index 2332ab4..9242bca 100644 --- a/crates/runc/src/io.rs +++ b/crates/runc/src/io.rs @@ -13,28 +13,32 @@ See the License for the specific language governing permissions and limitations under the License. */ -use nix::unistd::{Gid, Uid}; use std::fmt::Debug; -use std::fs::File; -use std::io::Result; +use std::fs::{File, OpenOptions}; +use std::io::{Read, Result, Write}; +use std::os::unix::fs::OpenOptionsExt; use std::os::unix::io::{AsRawFd, FromRawFd}; use std::sync::Mutex; +use log::debug; +use nix::unistd::{Gid, Uid}; +use os_pipe::{PipeReader, PipeWriter}; + use crate::Command; -pub trait Io: Debug + Sync + Send { +pub trait Io: Debug + Send + Sync { /// Return write side of stdin - fn stdin(&self) -> Option { + fn stdin(&self) -> Option> { None } /// Return read side of stdout - fn stdout(&self) -> Option { + fn stdout(&self) -> Option> { None } /// Return read side of stderr - fn stderr(&self) -> Option { + fn stderr(&self) -> Option> { None } @@ -68,42 +72,8 @@ impl Default for IOOption { /// When one side of the pipe is closed, the state will be represented with [`None`]. #[derive(Debug)] pub struct Pipe { - // Might be ugly hack: using mutex in order to take rd/wr under immutable [`Pipe`] - rd: Mutex>, - wr: Mutex>, -} - -impl Pipe { - pub fn new() -> std::io::Result { - let (r, w) = nix::unistd::pipe()?; - let (rd, wr) = unsafe { - ( - Mutex::new(Some(File::from_raw_fd(r))), - Mutex::new(Some(File::from_raw_fd(w))), - ) - }; - Ok(Self { rd, wr }) - } - - pub fn take_read(&self) -> Option { - let mut m = self.rd.lock().unwrap(); - m.take() - } - - pub fn take_write(&self) -> Option { - let mut m = self.wr.lock().unwrap(); - m.take() - } - - pub fn close_read(&self) { - let mut m = self.rd.lock().unwrap(); - let _ = m.take(); - } - - pub fn close_write(&self) { - let mut m = self.wr.lock().unwrap(); - let _ = m.take(); - } + rd: PipeReader, + wr: PipeWriter, } #[derive(Debug)] @@ -113,6 +83,12 @@ pub struct PipedIo { stderr: Option, } +impl Pipe { + fn new() -> std::io::Result { + let (rd, wr) = os_pipe::pipe()?; + Ok(Self { rd, wr }) + } +} impl PipedIo { pub fn new(uid: u32, gid: u32, opts: &IOOption) -> std::io::Result { Ok(Self { @@ -133,71 +109,75 @@ impl PipedIo { } let pipe = Pipe::new()?; - let guard = if stdin { - pipe.rd.lock().unwrap() + let uid = Some(Uid::from_raw(uid)); + let gid = Some(Gid::from_raw(gid)); + if stdin { + let rd = pipe.rd.try_clone()?; + nix::unistd::fchown(rd.as_raw_fd(), uid, gid)?; } else { - pipe.wr.lock().unwrap() - }; - if let Some(f) = guard.as_ref() { - let uid = Some(Uid::from_raw(uid)); - let gid = Some(Gid::from_raw(gid)); - nix::unistd::fchown(f.as_raw_fd(), uid, gid)?; + let wr = pipe.wr.try_clone()?; + nix::unistd::fchown(wr.as_raw_fd(), uid, gid)?; } - drop(guard); - Ok(Some(pipe)) } } impl Io for PipedIo { - fn stdin(&self) -> Option { - self.stdin.as_ref().map(|v| v.take_write()).flatten() + fn stdin(&self) -> Option> { + self.stdin.as_ref().and_then(|pipe| { + pipe.wr + .try_clone() + .map(|x| Box::new(x) as Box) + .ok() + }) } - fn stdout(&self) -> Option { - self.stdout.as_ref().map(|v| v.take_read()).flatten() + fn stdout(&self) -> Option> { + self.stdout.as_ref().and_then(|pipe| { + pipe.rd + .try_clone() + .map(|x| Box::new(x) as Box) + .ok() + }) } - fn stderr(&self) -> Option { - self.stderr.as_ref().map(|v| v.take_read()).flatten() + fn stderr(&self) -> Option> { + self.stderr.as_ref().and_then(|pipe| { + pipe.rd + .try_clone() + .map(|x| Box::new(x) as Box) + .ok() + }) } // Note that this internally use [`std::fs::File`]'s `try_clone()`. // Thus, the files passed to commands will be not closed after command exit. fn set(&self, cmd: &mut Command) -> std::io::Result<()> { - if let Some(ref p) = self.stdin { - let m = p.rd.lock().unwrap(); - if let Some(stdin) = &*m { - let f = stdin.try_clone()?; - cmd.stdin(f); - } + if let Some(p) = self.stdin.as_ref() { + let pr = p.rd.try_clone()?; + cmd.stdin(pr); } - if let Some(ref p) = self.stdout { - let m = p.wr.lock().unwrap(); - if let Some(f) = &*m { - let f = f.try_clone()?; - cmd.stdout(f); - } + if let Some(p) = self.stdout.as_ref() { + let pw = p.wr.try_clone()?; + cmd.stdout(pw); } - if let Some(ref p) = self.stderr { - let m = p.wr.lock().unwrap(); - if let Some(f) = &*m { - let f = f.try_clone()?; - cmd.stderr(f); - } + if let Some(p) = self.stderr.as_ref() { + let pw = p.wr.try_clone()?; + cmd.stdout(pw); } Ok(()) } fn close_after_start(&self) { - if let Some(ref p) = self.stdout { - p.close_write(); + if let Some(p) = self.stdout.as_ref() { + nix::unistd::close(p.wr.as_raw_fd()).unwrap_or_else(|e| debug!("close stdout: {}", e)); } - if let Some(ref p) = self.stderr { - p.close_write(); + + if let Some(p) = self.stderr.as_ref() { + nix::unistd::close(p.wr.as_raw_fd()).unwrap_or_else(|e| debug!("close stderr: {}", e)); } } } @@ -235,6 +215,40 @@ impl Io for NullIo { } } +/// FIFO for the scenario that set FIFO for command Io. +#[derive(Debug)] +pub struct FIFO { + pub stdin: Option, + pub stdout: Option, + pub stderr: Option, +} + +impl Io for FIFO { + fn set(&self, cmd: &mut Command) -> Result<()> { + if let Some(path) = self.stdin.as_ref() { + let stdin = OpenOptions::new() + .read(true) + .custom_flags(libc::O_NONBLOCK) + .open(path)?; + cmd.stdin(stdin); + } + + if let Some(path) = self.stdout.as_ref() { + let stdout = OpenOptions::new().write(true).open(path)?; + cmd.stdout(stdout); + } + + if let Some(path) = self.stderr.as_ref() { + let stderr = OpenOptions::new().write(true).open(path)?; + cmd.stderr(stderr); + } + + Ok(()) + } + + fn close_after_start(&self) {} +} + #[cfg(test)] mod tests { use super::*; @@ -267,21 +281,17 @@ mod tests { let mut stdin = io.stdin().unwrap(); stdin.write_all(&buf).unwrap(); buf[0] = 0x0; - io.stdin.as_ref().map(|v| { - v.rd.lock() - .unwrap() - .as_ref() - .unwrap() - .read(&mut buf) - .unwrap() - }); + + io.stdin + .as_ref() + .map(|v| v.rd.try_clone().unwrap().read(&mut buf).unwrap()); assert_eq!(&buf, &[0xfau8]); let mut stdout = io.stdout().unwrap(); buf[0] = 0xce; io.stdout .as_ref() - .map(|v| v.wr.lock().unwrap().as_ref().unwrap().write(&buf).unwrap()); + .map(|v| v.wr.try_clone().unwrap().write(&buf).unwrap()); buf[0] = 0x0; stdout.read_exact(&mut buf).unwrap(); assert_eq!(&buf, &[0xceu8]); @@ -290,7 +300,7 @@ mod tests { buf[0] = 0xa5; io.stderr .as_ref() - .map(|v| v.wr.lock().unwrap().as_ref().unwrap().write(&buf).unwrap()); + .map(|v| v.wr.try_clone().unwrap().write(&buf).unwrap()); buf[0] = 0x0; stderr.read_exact(&mut buf).unwrap(); assert_eq!(&buf, &[0xa5u8]); diff --git a/crates/runc/src/lib.rs b/crates/runc/src/lib.rs index 80b7cee..5afce7c 100644 --- a/crates/runc/src/lib.rs +++ b/crates/runc/src/lib.rs @@ -40,10 +40,11 @@ use std::process::ExitStatus; use oci_spec::runtime::{Linux, Process}; -// suspended for difficulties +pub mod console; pub mod container; pub mod error; pub mod events; +pub mod io; #[cfg(feature = "async")] pub mod monitor; pub mod options; diff --git a/crates/runc/src/options.rs b/crates/runc/src/options.rs index de4e37f..3049ee6 100644 --- a/crates/runc/src/options.rs +++ b/crates/runc/src/options.rs @@ -37,10 +37,8 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; -use containerd_shim as shim; -use shim::io::Io; - use crate::error::Error; +use crate::io::Io; use crate::utils; use crate::{LogFormat, Runc}; diff --git a/crates/runc/src/utils.rs b/crates/runc/src/utils.rs index 251a411..bdc786c 100644 --- a/crates/runc/src/utils.rs +++ b/crates/runc/src/utils.rs @@ -20,14 +20,13 @@ use std::env; use std::os::unix::net::UnixListener; use std::path::{Path, PathBuf}; -use containerd_shim as shim; use nix::sys::stat::Mode; use nix::unistd::mkdir; use path_absolutize::*; -use shim::container::ConsoleSocket; use tempfile::{Builder, NamedTempFile}; use uuid::Uuid; +use crate::console::ConsoleSocket; use crate::error::Error; // helper to resolve path (such as path for runc binary, pid files, etc. ) diff --git a/crates/shim/Cargo.toml b/crates/shim/Cargo.toml index 766224b..cde9583 100644 --- a/crates/shim/Cargo.toml +++ b/crates/shim/Cargo.toml @@ -27,7 +27,7 @@ signal-hook = "0.3.13" oci-spec = "0.5.4" containerd-shim-protos = { path = "../shim-protos", version = "0.1.2" } -runc = { path = "../runc"} +runc = { path = "../runc" } [dev-dependencies] tempfile = "3.0" diff --git a/crates/shim/src/container.rs b/crates/shim/src/container.rs index 566f7e9..68df449 100644 --- a/crates/shim/src/container.rs +++ b/crates/shim/src/container.rs @@ -18,8 +18,7 @@ use std::collections::HashMap; use std::convert::TryFrom; use std::fs::{File, OpenOptions}; use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; -use std::os::unix::net::{UnixListener, UnixStream}; -use std::path::{Path, PathBuf}; +use std::path::Path; use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; use log::{debug, warn}; @@ -27,11 +26,12 @@ use nix::sys::socket::{recvmsg, ControlMessageOwned, MsgFlags}; use nix::sys::termios::tcgetattr; use nix::sys::uio::IoVec; use nix::{cmsg_space, ioctl_write_ptr_bad}; +use runc::console::{Console, ConsoleSocket}; use time::OffsetDateTime; use crate::api::*; use crate::error::{Error, Result}; -use crate::io::{spawn_copy, Console, ProcessIO, Stdio}; +use crate::io::{spawn_copy, ProcessIO, Stdio}; use crate::protos::protobuf::well_known_types::Timestamp; use crate::util::read_pid_from_file; @@ -243,7 +243,9 @@ impl Process for CommonProcess { fn copy_console(&self, console_socket: &ConsoleSocket) -> Result { debug!("copy_console: waiting for runtime to send console fd"); - let stream = console_socket.accept()?; + let stream = console_socket + .accept() + .map_err(io_error!(e, "accept console socket"))?; let mut buf = [0u8; 4096]; let iovec = [IoVec::from_mut_slice(&mut buf)]; let mut space = cmsg_space!([RawFd; 2]); @@ -347,34 +349,3 @@ impl Process for CommonProcess { } } } - -pub struct ConsoleSocket { - pub listener: UnixListener, - pub path: PathBuf, - pub rmdir: bool, -} - -impl ConsoleSocket { - pub fn accept(&self) -> Result { - let (stream, _addr) = self - .listener - .accept() - .map_err(io_error!(e, "accept console socket"))?; - Ok(stream) - } -} - -impl Drop for ConsoleSocket { - fn drop(&mut self) { - if self.rmdir { - let tmp_socket_dir = self.path.parent().unwrap(); - std::fs::remove_dir_all(tmp_socket_dir).unwrap_or_else(|e| { - warn!( - "remove tmp console socket path {} : {}", - tmp_socket_dir.to_str().unwrap(), - e - ) - }) - } - } -} diff --git a/crates/shim/src/io.rs b/crates/shim/src/io.rs index 076dd42..d93ac29 100644 --- a/crates/shim/src/io.rs +++ b/crates/shim/src/io.rs @@ -16,114 +16,17 @@ use std::fmt::Debug; use std::fs::{File, OpenOptions}; use std::io::{Read, Write}; -use std::os::unix::fs::OpenOptionsExt; -use std::process::Command; use std::sync::Arc; use std::thread::JoinHandle; use crossbeam::sync::WaitGroup; use log::debug; use nix::sys::termios::Termios; +use runc::io::{Io, NullIo, FIFO}; use crate::error::{Error, Result}; use crate::util::IntoOption; -pub trait WriteCloser: Write { - fn close(&self); -} - -pub trait Io: Sync + Send { - /// Return write side of stdin - fn stdin(&self) -> Option>; - - /// Return read side of stdout - fn stdout(&self) -> Option>; - - /// Return read side of stderr - fn stderr(&self) -> Option>; - - /// Set IO for passed command. - /// Read side of stdin, write side of stdout and write side of stderr should be provided to command. - fn set(&self, cmd: &mut Command) -> Result<()>; - - /// Only close write side (should be stdout/err "from" runc process) - fn close_after_start(&self); -} - -pub struct NullIo {} - -impl Io for NullIo { - fn stdin(&self) -> Option> { - None - } - - fn stdout(&self) -> Option> { - None - } - - fn stderr(&self) -> Option> { - None - } - - fn set(&self, cmd: &mut Command) -> Result<()> { - cmd.stdout(std::process::Stdio::null()); - cmd.stderr(std::process::Stdio::null()); - Ok(()) - } - - fn close_after_start(&self) {} -} - -pub struct FIFO { - pub stdin: Option, - pub stdout: Option, - pub stderr: Option, -} - -impl Io for FIFO { - fn stdin(&self) -> Option> { - None - } - - fn stdout(&self) -> Option> { - None - } - - fn stderr(&self) -> Option> { - None - } - - fn set(&self, cmd: &mut Command) -> Result<()> { - let r: Option> = self.stdin.as_ref().map(|path| { - let stdin = OpenOptions::new() - .read(true) - .custom_flags(libc::O_NONBLOCK) - .open(path)?; - cmd.stdin(stdin); - Ok(()) - }); - r.unwrap_or(Ok(())).map_err(io_error!(e, "open stdin"))?; - - let r: Option> = self.stdout.as_ref().map(|path| { - let stdout = OpenOptions::new().write(true).open(path)?; - cmd.stdout(stdout); - Ok(()) - }); - r.unwrap_or(Ok(())).map_err(io_error!(e, "open stdout"))?; - - let r: Option> = self.stderr.as_ref().map(|path| { - let stderr = OpenOptions::new().write(true).open(path)?; - cmd.stderr(stderr); - Ok(()) - }); - r.unwrap_or(Ok(())).map_err(io_error!(e, "open stderr"))?; - - Ok(()) - } - - fn close_after_start(&self) {} -} - pub struct Console { pub file: File, pub termios: Termios, @@ -189,7 +92,7 @@ impl ProcessIO { w, None, Some(Box::new(move || { - closer.close(); + drop(closer); })), ); } @@ -250,9 +153,10 @@ impl ProcessIO { pub fn create_io(id: &str, _io_uid: u32, _io_gid: u32, stdio: &Stdio) -> Result { if stdio.is_null() { + let nio = NullIo::new().map_err(io_error!(e, "new Null Io"))?; let pio = ProcessIO { uri: None, - io: Some(Arc::new(NullIo {})), + io: Some(Arc::new(nio)), copy: false, }; return Ok(pio); diff --git a/crates/shim/src/lib.rs b/crates/shim/src/lib.rs index 07117d3..8ed5b94 100644 --- a/crates/shim/src/lib.rs +++ b/crates/shim/src/lib.rs @@ -186,7 +186,7 @@ pub trait Shim { } /// Shim entry point that must be invoked from `main`. -pub fn run(runtime_id: &str, opts:Option) +pub fn run(runtime_id: &str, opts: Option) where T: Shim + Send + Sync + 'static, {