Make one function that works for both platforms

Signed-off-by: James Sturtevant <jstur@microsoft.com>
This commit is contained in:
James Sturtevant 2023-04-27 15:58:12 -07:00
parent bceaf4aca3
commit fa4a0045e6
No known key found for this signature in database
7 changed files with 161 additions and 203 deletions

View File

@ -97,3 +97,5 @@ jobs:
cargo run --example skeleton -- -namespace default -id 1234 -address "\\.\pipe\containerd-containerd" -publish-binary ./bin/containerd start cargo run --example skeleton -- -namespace default -id 1234 -address "\\.\pipe\containerd-containerd" -publish-binary ./bin/containerd start
ps skeleton ps skeleton
cargo run --example shim-proto-connect \\.\pipe\containerd-shim-17630016127144989388-pipe cargo run --example shim-proto-connect \\.\pipe\containerd-shim-17630016127144989388-pipe
$skeleton = get-process skeleton -ErrorAction SilentlyContinue
if ($skeleton) { exit 1 }

View File

@ -166,7 +166,7 @@ where
} }
_ => { _ => {
if !config.no_setup_logger { if !config.no_setup_logger {
logger::init(flags.debug)?; logger::init(flags.debug, &flags.namespace, &flags.id)?;
} }
let publisher = RemotePublisher::new(&ttrpc_address).await?; let publisher = RemotePublisher::new(&ttrpc_address).await?;

View File

@ -181,17 +181,15 @@ pub fn socket_address(socket_path: &str, namespace: &str, id: &str) -> String {
hasher.finish() hasher.finish()
}; };
format_address(hash) #[cfg(unix)]
} {
format!("unix://{}/{:x}.sock", SOCKET_ROOT, hash)
}
#[cfg(windows)] #[cfg(windows)]
fn format_address(hash: u64) -> String { {
format!(r"\\.\pipe\containerd-shim-{}-pipe", hash) format!(r"\\.\pipe\containerd-shim-{}-pipe", hash)
} }
#[cfg(unix)]
fn format_address(hash: u64) -> String {
format!("unix://{}/{:x}.sock", SOCKET_ROOT, hash)
} }
#[cfg(unix)] #[cfg(unix)]

View File

@ -78,37 +78,27 @@ impl log::Log for FifoLogger {
} }
} }
#[cfg(unix)] pub fn init(debug: bool, _namespace: &str, _id: &str) -> Result<(), Error> {
pub fn init(debug: bool) -> Result<(), Error> { #[cfg(unix)]
let logger = FifoLogger::new().map_err(io_error!(e, "failed to init logger"))?; let logger = FifoLogger::new().map_err(io_error!(e, "failed to init logger"))?;
let boxed_logger = Box::new(logger); // Containerd on windows expects the log to be a named pipe in the format of \\.\pipe\containerd-<namespace>-<id>-log
configure_logger(boxed_logger, debug) // There is an assumption that there is always only one client connected which is containerd.
} // If there is a restart of containerd then logs during that time period will be lost.
//
#[cfg(windows)] // https://github.com/containerd/containerd/blob/v1.7.0/runtime/v2/shim_windows.go#L77
// Containerd on windows expects the log to be a named pipe in the format of \\.\pipe\containerd-<namespace>-<id>-log // https://github.com/microsoft/hcsshim/blob/5871d0c4436f131c377655a3eb09fc9b5065f11d/cmd/containerd-shim-runhcs-v1/serve.go#L132-L137
// There is an assumption that there is always only one client connected which is containerd. #[cfg(windows)]
// If there is a restart of containerd then logs during that time period will be lost.
//
// https://github.com/containerd/containerd/blob/v1.7.0/runtime/v2/shim_windows.go#L77
// https://github.com/microsoft/hcsshim/blob/5871d0c4436f131c377655a3eb09fc9b5065f11d/cmd/containerd-shim-runhcs-v1/serve.go#L132-L137
pub fn init(debug: bool, namespace: &String, id: &String) -> Result<(), Error> {
let logger = let logger =
NamedPipeLogger::new(namespace, id).map_err(io_error!(e, "failed to init logger"))?; NamedPipeLogger::new(_namespace, _id).map_err(io_error!(e, "failed to init logger"))?;
let boxed_logger = Box::new(logger);
configure_logger(boxed_logger, debug)
}
fn configure_logger(logger: Box<dyn log::Log>, debug: bool) -> Result<(), Error> {
let level = if debug { let level = if debug {
log::LevelFilter::Debug log::LevelFilter::Debug
} else { } else {
log::LevelFilter::Info log::LevelFilter::Info
}; };
log::set_boxed_logger(logger)?; log::set_boxed_logger(Box::new(logger))?;
log::set_max_level(level); log::set_max_level(level);
Ok(()) Ok(())
} }

View File

@ -74,7 +74,6 @@ use crate::{
}; };
cfg_unix! { cfg_unix! {
use std::os::unix::net::UnixListener;
use crate::{SOCKET_FD, parse_sockaddr}; use crate::{SOCKET_FD, parse_sockaddr};
use command_fds::{CommandFdExt, FdMapping}; use command_fds::{CommandFdExt, FdMapping};
use libc::{SIGCHLD, SIGINT, SIGPIPE, SIGTERM}; use libc::{SIGCHLD, SIGINT, SIGPIPE, SIGTERM};
@ -93,20 +92,23 @@ cfg_unix! {
} }
cfg_windows! { cfg_windows! {
use std::{io, ptr}; use std::{
io, ptr,
fs::OpenOptions,
os::windows::prelude::{AsRawHandle, OpenOptionsExt},
};
use windows_sys::Win32::{ use windows_sys::Win32::{
Foundation::{CloseHandle, HANDLE}, Foundation::{CloseHandle, HANDLE},
System::{ System::{
Console::SetConsoleCtrlHandler, Console::SetConsoleCtrlHandler,
Threading::{CreateSemaphoreA, ReleaseSemaphore, }, Threading::{CreateSemaphoreA, ReleaseSemaphore, WaitForSingleObject, INFINITE},
}, },
Storage::FileSystem::FILE_FLAG_OVERLAPPED
}; };
static mut SEMAPHORE: HANDLE = 0 as HANDLE; static mut SEMAPHORE: HANDLE = 0 as HANDLE;
const MAX_SEM_COUNT: i32 = 255; const MAX_SEM_COUNT: i32 = 255;
use windows_sys::Win32::System::Threading::WaitForSingleObject;
use windows_sys::Win32::System::Threading::INFINITE;
} }
pub mod monitor; pub mod monitor;
@ -248,9 +250,6 @@ where
util::setup_debugger_event(); util::setup_debugger_event();
if !config.no_setup_logger { if !config.no_setup_logger {
#[cfg(unix)]
logger::init(flags.debug)?;
#[cfg(windows)]
logger::init(flags.debug, &flags.namespace, &flags.id)?; logger::init(flags.debug, &flags.namespace, &flags.id)?;
} }
@ -281,18 +280,20 @@ where
} }
} }
#[cfg(windows)]
fn create_server(flags: args::Flags) -> Result<Server> {
let mut server = Server::new();
let address = socket_address(&flags.address, &flags.namespace, &flags.id);
server = server.bind(address.as_str())?;
Ok(server)
}
#[cfg(unix)]
fn create_server(_flags: args::Flags) -> Result<Server> { fn create_server(_flags: args::Flags) -> Result<Server> {
let mut server = Server::new(); let mut server = Server::new();
server = server.add_listener(SOCKET_FD)?;
#[cfg(unix)]
{
server = server.add_listener(SOCKET_FD)?;
}
#[cfg(windows)]
{
let address = socket_address(&_flags.address, &_flags.namespace, &_flags.id);
server = server.bind(address.as_str())?;
}
Ok(server) Ok(server)
} }
@ -399,34 +400,29 @@ fn remove_socket_silently(address: &str) {
remove_socket(address).unwrap_or_else(|e| warn!("failed to remove file {} {:?}", address, e)) remove_socket(address).unwrap_or_else(|e| warn!("failed to remove file {} {:?}", address, e))
} }
#[cfg(unix)]
fn remove_socket(address: &str) -> Result<()> { fn remove_socket(address: &str) -> Result<()> {
let path = parse_sockaddr(address); #[cfg(unix)]
if let Ok(md) = Path::new(path).metadata() { {
if md.file_type().is_socket() { let path = parse_sockaddr(address);
fs::remove_file(path).map_err(io_error!(e, "remove socket"))?; if let Ok(md) = Path::new(path).metadata() {
if md.file_type().is_socket() {
fs::remove_file(path).map_err(io_error!(e, "remove socket"))?;
}
} }
} }
Ok(())
}
#[cfg(windows)] #[cfg(windows)]
fn remove_socket(address: &str) -> Result<()> { {
use std::{ let mut opts = OpenOptions::new();
fs::OpenOptions, opts.read(true)
os::windows::prelude::{AsRawHandle, OpenOptionsExt}, .write(true)
}; .custom_flags(FILE_FLAG_OVERLAPPED);
if let Ok(f) = opts.open(address) {
use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_OVERLAPPED; info!("attempting to remove existing named pipe: {}", address);
unsafe { CloseHandle(f.as_raw_handle() as isize) };
let mut opts = OpenOptions::new(); }
opts.read(true)
.write(true)
.custom_flags(FILE_FLAG_OVERLAPPED);
if let Ok(f) = opts.open(address) {
info!("attempting to remove existing named pipe: {}", address);
unsafe { CloseHandle(f.as_raw_handle() as isize) };
} }
Ok(()) Ok(())
} }
@ -438,9 +434,10 @@ pub fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Result
let address = socket_address(&opts.address, &opts.namespace, grouping); let address = socket_address(&opts.address, &opts.namespace, grouping);
// Create socket and prepare listener. // Create socket and prepare listener.
// We'll use `add_listener` when creating TTRPC server. // On Linux, We'll use `add_listener` when creating TTRPC server, on Windows the value isn't used hence the clippy allow
// (see note below about activation process for windows)
#[allow(clippy::let_unit_value)] #[allow(clippy::let_unit_value)]
let listener = match start_listener(&address) { let _listener = match start_listener(&address) {
Ok(l) => l, Ok(l) => l,
Err(e) => { Err(e) => {
if e.kind() != std::io::ErrorKind::AddrInUse { if e.kind() != std::io::ErrorKind::AddrInUse {
@ -449,7 +446,7 @@ pub fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Result
err: e, err: e,
}); });
}; };
// If the Address is already in use then make sure it is up and running and return the address // If the address is already in use then make sure it is up and running and return the address
// This allows for running a single shim per container scenarios // This allows for running a single shim per container scenarios
if let Ok(()) = wait_socket_working(&address, 5, 200) { if let Ok(()) = wait_socket_working(&address, 5, 200) {
write_address(&address)?; write_address(&address)?;
@ -460,108 +457,75 @@ pub fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Result
} }
}; };
run_shim(cmd, cwd, opts, vars, listener, address)
}
#[cfg(unix)]
fn run_shim(
cmd: std::path::PathBuf,
cwd: std::path::PathBuf,
opts: StartOpts,
vars: Vec<(&str, &str)>,
listener: UnixListener,
address: String,
) -> Result<(u32, String)> {
let mut command = Command::new(cmd); let mut command = Command::new(cmd);
command.current_dir(cwd).envs(vars).args([
command "-namespace",
.current_dir(cwd) &opts.namespace,
.stdout(Stdio::null()) "-id",
.stdin(Stdio::null()) &opts.id,
.stderr(Stdio::null()) "-address",
.fd_mappings(vec![FdMapping { &opts.address,
parent_fd: listener.as_raw_fd(), ]);
child_fd: SOCKET_FD,
}])?
.args([
"-namespace",
&opts.namespace,
"-id",
&opts.id,
"-address",
&opts.address,
]);
if opts.debug {
command.arg("-debug");
}
command.envs(vars);
command
.spawn()
.map_err(io_error!(e, "spawn shim"))
.map(|child| {
// Ownership of `listener` has been passed to child.
std::mem::forget(listener);
(child.id(), address)
})
}
// Activation pattern for Windows comes from the hcsshim: https://github.com/microsoft/hcsshim/blob/v0.10.0-rc.7/cmd/containerd-shim-runhcs-v1/serve.go#L57-L70
// another way to do it would to create named pipe and pass it to the child process through handle inheritence but that would require duplicating
// the logic in Rust's 'command' for process creation. There is an issue in Rust to make it simplier to specify handle inheritence and this could
// be revisited once https://github.com/rust-lang/rust/issues/54760 is implemented.
#[cfg(windows)]
fn run_shim(
cmd: std::path::PathBuf,
cwd: std::path::PathBuf,
opts: StartOpts,
vars: Vec<(&str, &str)>,
_listener: (),
address: String,
) -> Result<(u32, String)> {
let mut command = Command::new(cmd);
let (mut reader, writer) = os_pipe::pipe().map_err(io_error!(e, "create pipe"))?;
let stdio_writer = writer.try_clone().unwrap();
command
.current_dir(cwd)
.stdout(stdio_writer)
.stdin(Stdio::null())
.stderr(Stdio::null())
.args([
"-namespace",
&opts.namespace,
"-id",
&opts.id,
"-address",
&opts.address,
]);
if opts.debug { if opts.debug {
command.arg("-debug"); command.arg("-debug");
} }
command.envs(vars);
disable_handle_inheritance(); #[cfg(unix)]
command {
.spawn() command
.map_err(io_error!(e, "spawn shim")) .stdout(Stdio::null())
.map(|child| { .stdin(Stdio::null())
// IMPORTANT: we must drop the writer and command to close up handles before we copy the reader to stderr .stderr(Stdio::null())
// AND the shim Start method must NOT write to stdout/stderr .fd_mappings(vec![FdMapping {
drop(writer); parent_fd: _listener.as_raw_fd(),
drop(command); child_fd: SOCKET_FD,
io::copy(&mut reader, &mut io::stderr()).unwrap(); }])?;
(child.id(), address)
}) command
.spawn()
.map_err(io_error!(e, "spawn shim"))
.map(|child| {
// Ownership of `listener` has been passed to child.
std::mem::forget(_listener);
(child.id(), address)
})
}
#[cfg(windows)]
{
// Activation pattern for Windows comes from the hcsshim: https://github.com/microsoft/hcsshim/blob/v0.10.0-rc.7/cmd/containerd-shim-runhcs-v1/serve.go#L57-L70
// another way to do it would to create named pipe and pass it to the child process through handle inheritence but that would require duplicating
// the logic in Rust's 'command' for process creation. There is an issue in Rust to make it simplier to specify handle inheritence and this could
// be revisited once https://github.com/rust-lang/rust/issues/54760 is implemented.
let (mut reader, writer) = os_pipe::pipe().map_err(io_error!(e, "create pipe"))?;
let stdio_writer = writer.try_clone().unwrap();
command
.stdout(stdio_writer)
.stdin(Stdio::null())
.stderr(Stdio::null());
// On Windows Rust currently sets the `HANDLE_FLAG_INHERIT` flag to true when using Command::spawn.
// When a child process is spawned by another process (containerd) the child process inherits the parent's stdin, stdout, and stderr handles.
// Due to the HANDLE_FLAG_INHERIT flag being set to true this will cause containerd to hand until the child process closes the handles.
// As a workaround we can Disables inheritance on the io pipe handles.
// This workaround comes from https://github.com/rust-lang/rust/issues/54760#issuecomment-1045940560
disable_handle_inheritance();
command
.spawn()
.map_err(io_error!(e, "spawn shim"))
.map(|child| {
// IMPORTANT: we must drop the writer and command to close up handles before we copy the reader to stderr
// AND the shim Start method must NOT write to stdout/stderr
drop(writer);
drop(command);
io::copy(&mut reader, &mut io::stderr()).unwrap();
(child.id(), address)
})
}
} }
// On Windows Rust currently sets the `HANDLE_FLAG_INHERIT` flag to true when using Command::spawn.
// When a child process is spawned by another process (containerd) the child process inherits the parent's stdin, stdout, and stderr handles.
// Due to the HANDLE_FLAG_INHERIT flag being set to true this will cause containerd to hand until the child process closes the handles.
// As a workaround we can Disables inheritance on the io pipe handles.
// This workaround comes from https://github.com/rust-lang/rust/issues/54760#issuecomment-1045940560
#[cfg(windows)] #[cfg(windows)]
fn disable_handle_inheritance() { fn disable_handle_inheritance() {
use windows_sys::Win32::{ use windows_sys::Win32::{

View File

@ -49,18 +49,20 @@ impl RemotePublisher {
}) })
} }
#[cfg(unix)]
fn connect(address: impl AsRef<str>) -> Result<Client> { fn connect(address: impl AsRef<str>) -> Result<Client> {
let fd = connect(address)?; #[cfg(unix)]
// Client::new() takes ownership of the RawFd. {
Ok(Client::new_from_fd(fd)?) let fd = connect(address)?;
} // Client::new() takes ownership of the RawFd.
Ok(Client::new_from_fd(fd)?)
}
#[cfg(windows)] #[cfg(windows)]
fn connect(address: impl AsRef<str>) -> Result<Client> { {
match Client::connect(address.as_ref()) { match Client::connect(address.as_ref()) {
Ok(client) => Ok(client), Ok(client) => Ok(client),
Err(e) => Err(e.into()), Err(e) => Err(e.into()),
}
} }
} }
@ -165,29 +167,31 @@ mod tests {
thread.join().unwrap(); thread.join().unwrap();
} }
#[cfg(unix)]
fn create_server(server_address: &String) -> Server { fn create_server(server_address: &String) -> Server {
use std::os::unix::{io::AsRawFd, net::UnixListener}; #[cfg(unix)]
let listener = UnixListener::bind(server_address).unwrap(); {
listener.set_nonblocking(true).unwrap(); use std::os::unix::{io::AsRawFd, net::UnixListener};
let t = Arc::new(Box::new(FakeServer {}) as Box<dyn Events + Send + Sync>); let listener = UnixListener::bind(server_address).unwrap();
let service = client::create_events(t); listener.set_nonblocking(true).unwrap();
let server = Server::new() let t = Arc::new(Box::new(FakeServer {}) as Box<dyn Events + Send + Sync>);
.add_listener(listener.as_raw_fd()) let service = client::create_events(t);
.unwrap() let server = Server::new()
.register_service(service); .add_listener(listener.as_raw_fd())
std::mem::forget(listener); .unwrap()
server .register_service(service);
} std::mem::forget(listener);
server
}
#[cfg(windows)] #[cfg(windows)]
fn create_server(server_address: &String) -> Server { {
let t = Arc::new(Box::new(FakeServer {}) as Box<dyn Events + Send + Sync>); let t = Arc::new(Box::new(FakeServer {}) as Box<dyn Events + Send + Sync>);
let service = client::create_events(t); let service = client::create_events(t);
Server::new() Server::new()
.bind(server_address) .bind(server_address)
.unwrap() .unwrap()
.register_service(service) .register_service(service)
}
} }
} }

View File

@ -28,7 +28,7 @@ pub struct NamedPipeLogger {
} }
impl NamedPipeLogger { impl NamedPipeLogger {
pub fn new(namespace: &String, id: &String) -> Result<NamedPipeLogger, io::Error> { pub fn new(namespace: &str, id: &str) -> Result<NamedPipeLogger, io::Error> {
let pipe_name = format!("\\\\.\\pipe\\containerd-shim-{}-{}-log", namespace, id); let pipe_name = format!("\\\\.\\pipe\\containerd-shim-{}-{}-log", namespace, id);
let mut pipe_server = NamedPipe::new(pipe_name).unwrap(); let mut pipe_server = NamedPipe::new(pipe_name).unwrap();
let mut poll = Poll::new().unwrap(); let mut poll = Poll::new().unwrap();