Move Io and Console

Signed-off-by: Zhang Tianyang <burning9699@gmail.com>
This commit is contained in:
Zhang Tianyang 2022-02-17 15:19:24 +08:00
parent db4e648261
commit 117263d9be
13 changed files with 186 additions and 245 deletions

View File

@ -7,6 +7,9 @@ license = "Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
async = []
[dependencies]
containerd-shim = { path = "../shim", version = "0.2.0" }
nix = "0.23.1"

View File

@ -24,14 +24,13 @@ use nix::sys::signal::kill;
use nix::sys::stat::Mode;
use nix::unistd::{mkdir, Pid};
use oci_spec::runtime::LinuxNamespaceType;
use runc::console::{Console, ConsoleSocket};
use runc::options::{CreateOpts, DeleteOpts, ExecOpts, GlobalOpts, KillOpts};
use runc::utils::new_temp_console_socket;
use shim::api::*;
use shim::container::{
CommonContainer, CommonProcess, ConsoleSocket, Container, ContainerFactory, Process,
};
use shim::container::{CommonContainer, CommonProcess, Container, ContainerFactory, Process};
use shim::error::{Error, Result};
use shim::io::{create_io, Console, Stdio};
use shim::io::{create_io, Stdio};
use shim::mount::mount_rootfs;
use shim::protos::protobuf::{well_known_types::Timestamp, CodedInputStream, Message};
use shim::util::{read_spec_from_file, write_options, write_runtime, IntoOption};
@ -155,7 +154,7 @@ impl Container for RuncContainer {
.common
.processes
.get_mut(exec_id)
.ok_or(other!("can not find the exec by id"))?;
.ok_or_else(|| other!("can not find the exec by id"))?;
let pid_path = Path::new(self.common.bundle.as_str())
.join(format!("{}.pid", &process.common.id));
@ -165,7 +164,7 @@ impl Container for RuncContainer {
console_socket: None,
detach: true,
};
let socket: Option<ConsoleSocket> = if process.common.stdio.terminal {
let socket = if process.common.stdio.terminal {
let s = new_temp_console_socket().map_err(other_error!(e, ""))?;
exec_opts.console_socket = Some(s.path.to_owned());
Some(s)
@ -194,7 +193,8 @@ impl Container for RuncContainer {
.exec(&self.common.id, &process.spec, Some(&exec_opts))
.map_err(other_error!(e, "failed exec"))?;
if process.common.stdio.terminal {
let console_socket = socket.ok_or(other!("failed to get console socket"))?;
let console_socket =
socket.ok_or_else(|| other!("failed to get console socket"))?;
let console = process.common.copy_console(&console_socket)?;
process.common.console = Some(console);
} else {
@ -414,7 +414,7 @@ impl InitProcess {
.no_pivot(self.no_pivot_root)
.no_new_keyring(self.no_new_key_ring)
.detach(false);
let socket: Option<ConsoleSocket> = if terminal {
let socket = if terminal {
let s = new_temp_console_socket().map_err(other_error!(e, ""))?;
create_opts.console_socket = Some(s.path.to_owned());
Some(s)
@ -435,7 +435,7 @@ impl InitProcess {
.create(&id, &bundle, Some(&create_opts))
.map_err(other_error!(e, "failed create"))?;
if terminal {
let console_socket = socket.ok_or(other!("failed to get console socket"))?;
let console_socket = socket.ok_or_else(|| other!("failed to get console socket"))?;
let console = self.common.copy_console(&console_socket)?;
self.common.console = Some(console);
} else {

View File

@ -112,7 +112,7 @@ impl Shim for Service {
let root = Path::new(if root.is_empty() {
DEFAULT_RUNC_ROOT
} else {
root.as_ref()
root
})
.join(namespace);
let log_buf = Path::new(bundle).join("log.json");

View File

@ -26,7 +26,7 @@ tempfile = "3.3.0"
thiserror = "1.0.30"
time = { version = "0.3.7", features = ["serde", "std"] }
uuid = { version = "0.8.2", features = ["v4"] }
containerd-shim = { path = "../shim", version = "0.2.0" }
os_pipe = "1.0.0"
# Async dependencies
tokio = { version = "1.15.0", features = ["full"], optional = true }

View File

@ -0,0 +1,55 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
use std::fs::File;
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::PathBuf;
use log::warn;
use nix::sys::termios::Termios;
pub struct Console {
pub file: File,
pub termios: Termios,
}
pub struct ConsoleSocket {
pub listener: UnixListener,
pub path: PathBuf,
pub rmdir: bool,
}
impl ConsoleSocket {
pub fn accept(&self) -> std::io::Result<UnixStream> {
let (stream, _addr) = self.listener.accept()?;
Ok(stream)
}
}
impl Drop for ConsoleSocket {
fn drop(&mut self) {
if self.rmdir {
let tmp_socket_dir = self.path.parent().unwrap();
std::fs::remove_dir_all(tmp_socket_dir).unwrap_or_else(|e| {
warn!(
"remove tmp console socket path {} : {}",
tmp_socket_dir.to_str().unwrap(),
e
)
})
}
}
}

View File

@ -13,28 +13,32 @@
See the License for the specific language governing permissions and
limitations under the License.
*/
use nix::unistd::{Gid, Uid};
use std::fmt::Debug;
use std::fs::File;
use std::io::Result;
use std::fs::{File, OpenOptions};
use std::io::{Read, Result, Write};
use std::os::unix::fs::OpenOptionsExt;
use std::os::unix::io::{AsRawFd, FromRawFd};
use std::sync::Mutex;
use log::debug;
use nix::unistd::{Gid, Uid};
use os_pipe::{PipeReader, PipeWriter};
use crate::Command;
pub trait Io: Debug + Sync + Send {
pub trait Io: Debug + Send + Sync {
/// Return write side of stdin
fn stdin(&self) -> Option<File> {
fn stdin(&self) -> Option<Box<dyn Write + Send + Sync>> {
None
}
/// Return read side of stdout
fn stdout(&self) -> Option<File> {
fn stdout(&self) -> Option<Box<dyn Read + Send>> {
None
}
/// Return read side of stderr
fn stderr(&self) -> Option<File> {
fn stderr(&self) -> Option<Box<dyn Read + Send>> {
None
}
@ -68,42 +72,8 @@ impl Default for IOOption {
/// When one side of the pipe is closed, the state will be represented with [`None`].
#[derive(Debug)]
pub struct Pipe {
// Might be ugly hack: using mutex in order to take rd/wr under immutable [`Pipe`]
rd: Mutex<Option<File>>,
wr: Mutex<Option<File>>,
}
impl Pipe {
pub fn new() -> std::io::Result<Self> {
let (r, w) = nix::unistd::pipe()?;
let (rd, wr) = unsafe {
(
Mutex::new(Some(File::from_raw_fd(r))),
Mutex::new(Some(File::from_raw_fd(w))),
)
};
Ok(Self { rd, wr })
}
pub fn take_read(&self) -> Option<File> {
let mut m = self.rd.lock().unwrap();
m.take()
}
pub fn take_write(&self) -> Option<File> {
let mut m = self.wr.lock().unwrap();
m.take()
}
pub fn close_read(&self) {
let mut m = self.rd.lock().unwrap();
let _ = m.take();
}
pub fn close_write(&self) {
let mut m = self.wr.lock().unwrap();
let _ = m.take();
}
rd: PipeReader,
wr: PipeWriter,
}
#[derive(Debug)]
@ -113,6 +83,12 @@ pub struct PipedIo {
stderr: Option<Pipe>,
}
impl Pipe {
fn new() -> std::io::Result<Self> {
let (rd, wr) = os_pipe::pipe()?;
Ok(Self { rd, wr })
}
}
impl PipedIo {
pub fn new(uid: u32, gid: u32, opts: &IOOption) -> std::io::Result<Self> {
Ok(Self {
@ -133,71 +109,75 @@ impl PipedIo {
}
let pipe = Pipe::new()?;
let guard = if stdin {
pipe.rd.lock().unwrap()
} else {
pipe.wr.lock().unwrap()
};
if let Some(f) = guard.as_ref() {
let uid = Some(Uid::from_raw(uid));
let gid = Some(Gid::from_raw(gid));
nix::unistd::fchown(f.as_raw_fd(), uid, gid)?;
if stdin {
let rd = pipe.rd.try_clone()?;
nix::unistd::fchown(rd.as_raw_fd(), uid, gid)?;
} else {
let wr = pipe.wr.try_clone()?;
nix::unistd::fchown(wr.as_raw_fd(), uid, gid)?;
}
drop(guard);
Ok(Some(pipe))
}
}
impl Io for PipedIo {
fn stdin(&self) -> Option<File> {
self.stdin.as_ref().map(|v| v.take_write()).flatten()
fn stdin(&self) -> Option<Box<dyn Write + Send + Sync>> {
self.stdin.as_ref().and_then(|pipe| {
pipe.wr
.try_clone()
.map(|x| Box::new(x) as Box<dyn Write + Send + Sync>)
.ok()
})
}
fn stdout(&self) -> Option<File> {
self.stdout.as_ref().map(|v| v.take_read()).flatten()
fn stdout(&self) -> Option<Box<dyn Read + Send>> {
self.stdout.as_ref().and_then(|pipe| {
pipe.rd
.try_clone()
.map(|x| Box::new(x) as Box<dyn Read + Send>)
.ok()
})
}
fn stderr(&self) -> Option<File> {
self.stderr.as_ref().map(|v| v.take_read()).flatten()
fn stderr(&self) -> Option<Box<dyn Read + Send>> {
self.stderr.as_ref().and_then(|pipe| {
pipe.rd
.try_clone()
.map(|x| Box::new(x) as Box<dyn Read + Send>)
.ok()
})
}
// Note that this internally use [`std::fs::File`]'s `try_clone()`.
// Thus, the files passed to commands will be not closed after command exit.
fn set(&self, cmd: &mut Command) -> std::io::Result<()> {
if let Some(ref p) = self.stdin {
let m = p.rd.lock().unwrap();
if let Some(stdin) = &*m {
let f = stdin.try_clone()?;
cmd.stdin(f);
}
if let Some(p) = self.stdin.as_ref() {
let pr = p.rd.try_clone()?;
cmd.stdin(pr);
}
if let Some(ref p) = self.stdout {
let m = p.wr.lock().unwrap();
if let Some(f) = &*m {
let f = f.try_clone()?;
cmd.stdout(f);
}
if let Some(p) = self.stdout.as_ref() {
let pw = p.wr.try_clone()?;
cmd.stdout(pw);
}
if let Some(ref p) = self.stderr {
let m = p.wr.lock().unwrap();
if let Some(f) = &*m {
let f = f.try_clone()?;
cmd.stderr(f);
}
if let Some(p) = self.stderr.as_ref() {
let pw = p.wr.try_clone()?;
cmd.stdout(pw);
}
Ok(())
}
fn close_after_start(&self) {
if let Some(ref p) = self.stdout {
p.close_write();
if let Some(p) = self.stdout.as_ref() {
nix::unistd::close(p.wr.as_raw_fd()).unwrap_or_else(|e| debug!("close stdout: {}", e));
}
if let Some(ref p) = self.stderr {
p.close_write();
if let Some(p) = self.stderr.as_ref() {
nix::unistd::close(p.wr.as_raw_fd()).unwrap_or_else(|e| debug!("close stderr: {}", e));
}
}
}
@ -235,6 +215,40 @@ impl Io for NullIo {
}
}
/// FIFO for the scenario that set FIFO for command Io.
#[derive(Debug)]
pub struct FIFO {
pub stdin: Option<String>,
pub stdout: Option<String>,
pub stderr: Option<String>,
}
impl Io for FIFO {
fn set(&self, cmd: &mut Command) -> Result<()> {
if let Some(path) = self.stdin.as_ref() {
let stdin = OpenOptions::new()
.read(true)
.custom_flags(libc::O_NONBLOCK)
.open(path)?;
cmd.stdin(stdin);
}
if let Some(path) = self.stdout.as_ref() {
let stdout = OpenOptions::new().write(true).open(path)?;
cmd.stdout(stdout);
}
if let Some(path) = self.stderr.as_ref() {
let stderr = OpenOptions::new().write(true).open(path)?;
cmd.stderr(stderr);
}
Ok(())
}
fn close_after_start(&self) {}
}
#[cfg(test)]
mod tests {
use super::*;
@ -267,21 +281,17 @@ mod tests {
let mut stdin = io.stdin().unwrap();
stdin.write_all(&buf).unwrap();
buf[0] = 0x0;
io.stdin.as_ref().map(|v| {
v.rd.lock()
.unwrap()
io.stdin
.as_ref()
.unwrap()
.read(&mut buf)
.unwrap()
});
.map(|v| v.rd.try_clone().unwrap().read(&mut buf).unwrap());
assert_eq!(&buf, &[0xfau8]);
let mut stdout = io.stdout().unwrap();
buf[0] = 0xce;
io.stdout
.as_ref()
.map(|v| v.wr.lock().unwrap().as_ref().unwrap().write(&buf).unwrap());
.map(|v| v.wr.try_clone().unwrap().write(&buf).unwrap());
buf[0] = 0x0;
stdout.read_exact(&mut buf).unwrap();
assert_eq!(&buf, &[0xceu8]);
@ -290,7 +300,7 @@ mod tests {
buf[0] = 0xa5;
io.stderr
.as_ref()
.map(|v| v.wr.lock().unwrap().as_ref().unwrap().write(&buf).unwrap());
.map(|v| v.wr.try_clone().unwrap().write(&buf).unwrap());
buf[0] = 0x0;
stderr.read_exact(&mut buf).unwrap();
assert_eq!(&buf, &[0xa5u8]);

View File

@ -40,10 +40,11 @@ use std::process::ExitStatus;
use oci_spec::runtime::{Linux, Process};
// suspended for difficulties
pub mod console;
pub mod container;
pub mod error;
pub mod events;
pub mod io;
#[cfg(feature = "async")]
pub mod monitor;
pub mod options;

View File

@ -37,10 +37,8 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use containerd_shim as shim;
use shim::io::Io;
use crate::error::Error;
use crate::io::Io;
use crate::utils;
use crate::{LogFormat, Runc};

View File

@ -20,14 +20,13 @@ use std::env;
use std::os::unix::net::UnixListener;
use std::path::{Path, PathBuf};
use containerd_shim as shim;
use nix::sys::stat::Mode;
use nix::unistd::mkdir;
use path_absolutize::*;
use shim::container::ConsoleSocket;
use tempfile::{Builder, NamedTempFile};
use uuid::Uuid;
use crate::console::ConsoleSocket;
use crate::error::Error;
// helper to resolve path (such as path for runc binary, pid files, etc. )

View File

@ -27,7 +27,7 @@ signal-hook = "0.3.13"
oci-spec = "0.5.4"
containerd-shim-protos = { path = "../shim-protos", version = "0.1.2" }
runc = { path = "../runc"}
runc = { path = "../runc" }
[dev-dependencies]
tempfile = "3.0"

View File

@ -18,8 +18,7 @@ use std::collections::HashMap;
use std::convert::TryFrom;
use std::fs::{File, OpenOptions};
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::{Path, PathBuf};
use std::path::Path;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use log::{debug, warn};
@ -27,11 +26,12 @@ use nix::sys::socket::{recvmsg, ControlMessageOwned, MsgFlags};
use nix::sys::termios::tcgetattr;
use nix::sys::uio::IoVec;
use nix::{cmsg_space, ioctl_write_ptr_bad};
use runc::console::{Console, ConsoleSocket};
use time::OffsetDateTime;
use crate::api::*;
use crate::error::{Error, Result};
use crate::io::{spawn_copy, Console, ProcessIO, Stdio};
use crate::io::{spawn_copy, ProcessIO, Stdio};
use crate::protos::protobuf::well_known_types::Timestamp;
use crate::util::read_pid_from_file;
@ -243,7 +243,9 @@ impl Process for CommonProcess {
fn copy_console(&self, console_socket: &ConsoleSocket) -> Result<Console> {
debug!("copy_console: waiting for runtime to send console fd");
let stream = console_socket.accept()?;
let stream = console_socket
.accept()
.map_err(io_error!(e, "accept console socket"))?;
let mut buf = [0u8; 4096];
let iovec = [IoVec::from_mut_slice(&mut buf)];
let mut space = cmsg_space!([RawFd; 2]);
@ -347,34 +349,3 @@ impl Process for CommonProcess {
}
}
}
pub struct ConsoleSocket {
pub listener: UnixListener,
pub path: PathBuf,
pub rmdir: bool,
}
impl ConsoleSocket {
pub fn accept(&self) -> Result<UnixStream> {
let (stream, _addr) = self
.listener
.accept()
.map_err(io_error!(e, "accept console socket"))?;
Ok(stream)
}
}
impl Drop for ConsoleSocket {
fn drop(&mut self) {
if self.rmdir {
let tmp_socket_dir = self.path.parent().unwrap();
std::fs::remove_dir_all(tmp_socket_dir).unwrap_or_else(|e| {
warn!(
"remove tmp console socket path {} : {}",
tmp_socket_dir.to_str().unwrap(),
e
)
})
}
}
}

View File

@ -16,114 +16,17 @@
use std::fmt::Debug;
use std::fs::{File, OpenOptions};
use std::io::{Read, Write};
use std::os::unix::fs::OpenOptionsExt;
use std::process::Command;
use std::sync::Arc;
use std::thread::JoinHandle;
use crossbeam::sync::WaitGroup;
use log::debug;
use nix::sys::termios::Termios;
use runc::io::{Io, NullIo, FIFO};
use crate::error::{Error, Result};
use crate::util::IntoOption;
pub trait WriteCloser: Write {
fn close(&self);
}
pub trait Io: Sync + Send {
/// Return write side of stdin
fn stdin(&self) -> Option<Box<dyn WriteCloser + Send + Sync>>;
/// Return read side of stdout
fn stdout(&self) -> Option<Box<dyn Read + Send>>;
/// Return read side of stderr
fn stderr(&self) -> Option<Box<dyn Read + Send>>;
/// Set IO for passed command.
/// Read side of stdin, write side of stdout and write side of stderr should be provided to command.
fn set(&self, cmd: &mut Command) -> Result<()>;
/// Only close write side (should be stdout/err "from" runc process)
fn close_after_start(&self);
}
pub struct NullIo {}
impl Io for NullIo {
fn stdin(&self) -> Option<Box<dyn WriteCloser + Send + Sync>> {
None
}
fn stdout(&self) -> Option<Box<dyn Read + Send>> {
None
}
fn stderr(&self) -> Option<Box<dyn Read + Send>> {
None
}
fn set(&self, cmd: &mut Command) -> Result<()> {
cmd.stdout(std::process::Stdio::null());
cmd.stderr(std::process::Stdio::null());
Ok(())
}
fn close_after_start(&self) {}
}
pub struct FIFO {
pub stdin: Option<String>,
pub stdout: Option<String>,
pub stderr: Option<String>,
}
impl Io for FIFO {
fn stdin(&self) -> Option<Box<dyn WriteCloser + Send + Sync>> {
None
}
fn stdout(&self) -> Option<Box<dyn Read + Send>> {
None
}
fn stderr(&self) -> Option<Box<dyn Read + Send>> {
None
}
fn set(&self, cmd: &mut Command) -> Result<()> {
let r: Option<std::io::Result<()>> = self.stdin.as_ref().map(|path| {
let stdin = OpenOptions::new()
.read(true)
.custom_flags(libc::O_NONBLOCK)
.open(path)?;
cmd.stdin(stdin);
Ok(())
});
r.unwrap_or(Ok(())).map_err(io_error!(e, "open stdin"))?;
let r: Option<std::io::Result<()>> = self.stdout.as_ref().map(|path| {
let stdout = OpenOptions::new().write(true).open(path)?;
cmd.stdout(stdout);
Ok(())
});
r.unwrap_or(Ok(())).map_err(io_error!(e, "open stdout"))?;
let r: Option<std::io::Result<()>> = self.stderr.as_ref().map(|path| {
let stderr = OpenOptions::new().write(true).open(path)?;
cmd.stderr(stderr);
Ok(())
});
r.unwrap_or(Ok(())).map_err(io_error!(e, "open stderr"))?;
Ok(())
}
fn close_after_start(&self) {}
}
pub struct Console {
pub file: File,
pub termios: Termios,
@ -189,7 +92,7 @@ impl ProcessIO {
w,
None,
Some(Box::new(move || {
closer.close();
drop(closer);
})),
);
}
@ -250,9 +153,10 @@ impl ProcessIO {
pub fn create_io(id: &str, _io_uid: u32, _io_gid: u32, stdio: &Stdio) -> Result<ProcessIO> {
if stdio.is_null() {
let nio = NullIo::new().map_err(io_error!(e, "new Null Io"))?;
let pio = ProcessIO {
uri: None,
io: Some(Arc::new(NullIo {})),
io: Some(Arc::new(nio)),
copy: false,
};
return Ok(pio);

View File

@ -186,7 +186,7 @@ pub trait Shim {
}
/// Shim entry point that must be invoked from `main`.
pub fn run<T>(runtime_id: &str, opts:Option<Config>)
pub fn run<T>(runtime_id: &str, opts: Option<Config>)
where
T: Shim + Send + Sync + 'static,
{