From bceaf4aca3d9f544dfc6f21cad2af0f69837669c Mon Sep 17 00:00:00 2001 From: James Sturtevant Date: Wed, 19 Apr 2023 13:52:00 -0700 Subject: [PATCH] Windows support for the synchronous shim Signed-off-by: James Sturtevant Signed-off-by: James Sturtevant --- .github/workflows/ci-windows.yml | 99 +++++++ .gitignore | 2 + crates/shim-protos/Cargo.toml | 2 +- crates/shim/Cargo.toml | 13 +- crates/shim/README.md | 23 ++ crates/shim/examples/windows_log_reader.rs | 71 +++++ crates/shim/src/error.rs | 3 + crates/shim/src/lib.rs | 62 +++- crates/shim/src/logger.rs | 37 ++- crates/shim/src/mount.rs | 1 + crates/shim/src/synchronous/mod.rs | 272 ++++++++++++++++-- crates/shim/src/synchronous/publisher.rs | 67 ++++- crates/shim/src/synchronous/util.rs | 57 ++++ crates/shim/src/sys/mod.rs | 20 ++ crates/shim/src/sys/windows/mod.rs | 17 ++ .../shim/src/sys/windows/named_pipe_logger.rs | 236 +++++++++++++++ crates/shim/src/util.rs | 8 +- 17 files changed, 943 insertions(+), 47 deletions(-) create mode 100644 .github/workflows/ci-windows.yml create mode 100644 crates/shim/examples/windows_log_reader.rs create mode 100644 crates/shim/src/sys/mod.rs create mode 100644 crates/shim/src/sys/windows/mod.rs create mode 100644 crates/shim/src/sys/windows/named_pipe_logger.rs diff --git a/.github/workflows/ci-windows.yml b/.github/workflows/ci-windows.yml new file mode 100644 index 0000000..5f0e5fb --- /dev/null +++ b/.github/workflows/ci-windows.yml @@ -0,0 +1,99 @@ +name: CI-windows +on: + pull_request: + push: + schedule: + - cron: '0 0 * * *' # Every day at midnight + +jobs: + checks: + name: Checks + runs-on: ${{ matrix.os }} + timeout-minutes: 20 + + strategy: + matrix: + os: [windows-latest] + + steps: + - uses: actions/checkout@v3 + - uses: arduino/setup-protoc@v1 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} + - run: cargo check --examples --tests -p containerd-shim -p containerd-shim-protos + + - run: rustup toolchain install nightly --component rustfmt + - run: cargo +nightly fmt -p containerd-shim -p containerd-shim-protos -- --check --files-with-diff + + - run: cargo clippy -p containerd-shim -p containerd-shim-protos -- -D warnings + - run: cargo doc --no-deps -p containerd-shim -p containerd-shim-protos + env: + RUSTDOCFLAGS: -Dwarnings + + tests: + name: Tests + runs-on: ${{ matrix.os }} + timeout-minutes: 15 + + strategy: + matrix: + os: [windows-latest] + + steps: + - uses: actions/checkout@v3 + - uses: arduino/setup-protoc@v1 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} + - name: Tests + run: | + cargo test -p containerd-shim -p containerd-shim-protos + + integration: + name: Integration + runs-on: ${{ matrix.os }} + timeout-minutes: 40 + + strategy: + matrix: + os: [windows-latest] + containerd: [1.7.0] + + steps: + - name: Checkout extensions + uses: actions/checkout@v3 + + - name: Install containerd + run: | + $ErrorActionPreference = "Stop" + + # Install containerd https://github.com/containerd/containerd/blob/v1.7.0/docs/getting-started.md#installing-containerd-on-windows + # Download and extract desired containerd Windows binaries + curl.exe -L https://github.com/containerd/containerd/releases/download/v${{ matrix.containerd }}/containerd-${{ matrix.containerd }}-windows-amd64.tar.gz -o containerd-windows-amd64.tar.gz + tar.exe xvf .\containerd-windows-amd64.tar.gz + + # Copy and configure + mkdir "$Env:ProgramFiles\containerd" + Copy-Item -Path ".\bin\*" -Destination "$Env:ProgramFiles\containerd" -Recurse -Force + cd $Env:ProgramFiles\containerd\ + .\containerd.exe config default | Out-File config.toml -Encoding ascii + + # Review the configuration. Depending on setup you may want to adjust: + # - the sandbox_image (Kubernetes pause image) + # - cni bin_dir and conf_dir locations + Get-Content config.toml + + # Register and start service + .\containerd.exe --register-service + Start-Service containerd + working-directory: ${{ runner.temp }} + - name: Run integration test + run: | + $ErrorActionPreference = "Stop" + + get-service containerd + $env:TTRPC_ADDRESS="\\.\pipe\containerd-containerd.ttrpc" + + # run the example + cargo run --example skeleton -- -namespace default -id 1234 -address "\\.\pipe\containerd-containerd" -publish-binary ./bin/containerd start + ps skeleton + cargo run --example shim-proto-connect \\.\pipe\containerd-shim-17630016127144989388-pipe diff --git a/.gitignore b/.gitignore index 9045886..9450144 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,5 @@ Cargo.lock # These are backup files generated by rustfmt **/*.rs.bk log + +.vscode diff --git a/crates/shim-protos/Cargo.toml b/crates/shim-protos/Cargo.toml index 4270b8a..a1efb72 100644 --- a/crates/shim-protos/Cargo.toml +++ b/crates/shim-protos/Cargo.toml @@ -13,7 +13,7 @@ homepage.workspace = true [dependencies] protobuf = "3.1" -ttrpc = "0.7" +ttrpc = { git = "https://github.com/jsturtevant/ttrpc-rust", rev = "d96bea3a41b6c6b85c1d8da53e5085fb0789a685" } # unmerged pr https://github.com/containerd/ttrpc-rust/pull/182 async-trait = { version = "0.1.48", optional = true } [build-dependencies] diff --git a/crates/shim/Cargo.toml b/crates/shim/Cargo.toml index e4d3ada..6640cec 100644 --- a/crates/shim/Cargo.toml +++ b/crates/shim/Cargo.toml @@ -17,13 +17,16 @@ async = ["tokio", "containerd-shim-protos/async", "async-trait", "futures", "sig name = "skeleton_async" required-features = ["async"] +[[example]] +name = "windows-log-reader" +path = "examples/windows_log_reader.rs" + [dependencies] go-flag = "0.1.0" thiserror = "1.0" log = { version = "0.4", features = ["std"] } libc = "0.2.95" nix = "0.26" -command-fds = "0.2.1" lazy_static = "1.4.0" time = { version = "0.3.7", features = ["serde", "std"] } serde_json = "1.0.78" @@ -45,5 +48,13 @@ signal-hook-tokio = { version = "0.3.1", optional = true, features = ["futures-v [target.'cfg(target_os = "linux")'.dependencies] cgroups-rs = "0.2.9" +[target.'cfg(unix)'.dependencies] +command-fds = "0.2.1" + +[target.'cfg(windows)'.dependencies] +windows-sys = {version = "0.48.0", features = ["Win32_Foundation","Win32_System_WindowsProgramming","Win32_System_Console", "Win32_System_Pipes","Win32_Security", "Win32_Storage_FileSystem", "Win32_System_Threading"]} +mio = { version = "0.8", features = ["os-ext", "os-poll"] } +os_pipe = "1.1.3" + [dev-dependencies] tempfile = "3.0" diff --git a/crates/shim/README.md b/crates/shim/README.md index db688e0..9271812 100644 --- a/crates/shim/README.md +++ b/crates/shim/README.md @@ -135,7 +135,30 @@ $ cat log [INFO] reaper thread stopped ``` +### Running on Windows +```powershell +# Run containerd in background +$env:TTRPC_ADDRESS="\\.\pipe\containerd-containerd.ttrpc" + +$ cargo run --example skeleton -- -namespace default -id 1234 -address "\\.\pipe\containerd-containerd" start +\\.\pipe\containerd-shim-17630016127144989388-pipe + +# (Optional) Run the log collector in a separate command window +# note: log reader won't work if containerd is connected to the named pipe, this works when running manually to help debug locally +$ cargo run --example windows-log-reader \\.\pipe\containerd-shim-default-1234-log +Reading logs from: \\.\pipe\containerd-shim-default-1234-log + + +$ cargo run --example shim-proto-connect \\.\pipe\containerd-shim-17630016127144989388-pipe +Connecting to \\.\pipe\containerd-shim-17630016127144989388-pipe... +Sending `Connect` request... +Connect response: version: "example" +Sending `Shutdown` request... +Shutdown response: "" +``` + ## Supported Platforms Currently, following OSs and hardware architectures are supported, and more efforts are needed to enable and validate other OSs and architectures. - Linux - Mac OS +- Windows diff --git a/crates/shim/examples/windows_log_reader.rs b/crates/shim/examples/windows_log_reader.rs new file mode 100644 index 0000000..31393b4 --- /dev/null +++ b/crates/shim/examples/windows_log_reader.rs @@ -0,0 +1,71 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +#[cfg(windows)] +use std::error::Error; + +#[cfg(windows)] +fn main() -> Result<(), Box> { + use std::{ + env, + fs::OpenOptions, + os::windows::{ + fs::OpenOptionsExt, + io::{FromRawHandle, IntoRawHandle}, + }, + time::Duration, + }; + + use mio::{windows::NamedPipe, Events, Interest, Poll, Token}; + use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_OVERLAPPED; + + let args: Vec = env::args().collect(); + + let address = args + .get(1) + .ok_or("First argument must be shims address to read logs (\\\\.\\pipe\\containerd-shim-{ns}-{id}-log) ") + .unwrap(); + + println!("Reading logs from: {}", &address); + + let mut opts = OpenOptions::new(); + opts.read(true) + .write(true) + .custom_flags(FILE_FLAG_OVERLAPPED); + let file = opts.open(address).unwrap(); + let mut client = unsafe { NamedPipe::from_raw_handle(file.into_raw_handle()) }; + + let mut stdio = std::io::stdout(); + let mut poll = Poll::new().unwrap(); + poll.registry() + .register(&mut client, Token(1), Interest::READABLE) + .unwrap(); + let mut events = Events::with_capacity(128); + loop { + poll.poll(&mut events, Some(Duration::from_millis(10))) + .unwrap(); + match std::io::copy(&mut client, &mut stdio) { + Ok(_) => break, + Err(_) => continue, + } + } + + Ok(()) +} + +#[cfg(unix)] +fn main() { + println!("This example is only for Windows"); +} diff --git a/crates/shim/src/error.rs b/crates/shim/src/error.rs index 095f610..241f98d 100644 --- a/crates/shim/src/error.rs +++ b/crates/shim/src/error.rs @@ -50,9 +50,11 @@ pub enum Error { Setup(#[from] log::SetLoggerError), /// Unable to pass fd to child process (we rely on `command_fds` crate for this). + #[cfg(unix)] #[error("Failed to pass socket fd to child: {0}")] FdMap(#[from] command_fds::FdMappingCollision), + #[cfg(unix)] #[error("Nix error: {0}")] Nix(#[from] nix::Error), @@ -65,6 +67,7 @@ pub enum Error { #[error("Failed pre condition: {0}")] FailedPreconditionError(String), + #[cfg(unix)] #[error("{context} error: {err}")] MountError { context: String, diff --git a/crates/shim/src/lib.rs b/crates/shim/src/lib.rs index c8e4637..411485a 100644 --- a/crates/shim/src/lib.rs +++ b/crates/shim/src/lib.rs @@ -32,20 +32,24 @@ //! ``` //! +use std::{collections::hash_map::DefaultHasher, fs::File, hash::Hasher, path::PathBuf}; +#[cfg(windows)] +use std::{fs::OpenOptions, os::windows::prelude::OpenOptionsExt}; +#[cfg(unix)] use std::{ - collections::hash_map::DefaultHasher, - fs::File, - hash::Hasher, os::unix::{io::RawFd, net::UnixListener}, - path::{Path, PathBuf}, + path::Path, }; pub use containerd_shim_protos as protos; +#[cfg(unix)] use nix::ioctl_write_ptr_bad; pub use protos::{ shim::shim::DeleteResponse, ttrpc::{context::Context, Result as TtrpcResult}, }; +#[cfg(windows)] +use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_OVERLAPPED; #[cfg(feature = "async")] pub use crate::asynchronous::*; @@ -67,6 +71,7 @@ pub mod mount; mod reap; #[cfg(not(feature = "async"))] pub mod synchronous; +mod sys; pub mod util; /// Generated request/response structures. @@ -112,6 +117,7 @@ cfg_async! { pub use protos::ttrpc::r#async::TtrpcContext; } +#[cfg(unix)] ioctl_write_ptr_bad!(ioctl_set_winsz, libc::TIOCSWINSZ, libc::winsize); const TTRPC_ADDRESS: &str = "TTRPC_ADDRESS"; @@ -149,6 +155,7 @@ pub struct StartOpts { /// The shim process communicates with the containerd server through a communication channel /// created by containerd. One endpoint of the communication channel is passed to shim process /// through a file descriptor during forking, which is the fourth(3) file descriptor. +#[cfg(unix)] const SOCKET_FD: RawFd = 3; #[cfg(target_os = "linux")] @@ -157,6 +164,9 @@ pub const SOCKET_ROOT: &str = "/run/containerd"; #[cfg(target_os = "macos")] pub const SOCKET_ROOT: &str = "/var/run/containerd"; +#[cfg(target_os = "windows")] +pub const SOCKET_ROOT: &str = r"\\.\pipe\containerd-containerd"; + /// Make socket path from containerd socket path, namespace and id. pub fn socket_address(socket_path: &str, namespace: &str, id: &str) -> String { let path = PathBuf::from(socket_path) @@ -171,9 +181,20 @@ pub fn socket_address(socket_path: &str, namespace: &str, id: &str) -> String { hasher.finish() }; + format_address(hash) +} + +#[cfg(windows)] +fn format_address(hash: u64) -> String { + format!(r"\\.\pipe\containerd-shim-{}-pipe", hash) +} + +#[cfg(unix)] +fn format_address(hash: u64) -> String { format!("unix://{}/{:x}.sock", SOCKET_ROOT, hash) } +#[cfg(unix)] fn parse_sockaddr(addr: &str) -> &str { if let Some(addr) = addr.strip_prefix("unix://") { return addr; @@ -186,6 +207,26 @@ fn parse_sockaddr(addr: &str) -> &str { addr } +#[cfg(windows)] +fn start_listener(address: &str) -> std::io::Result<()> { + let mut opts = OpenOptions::new(); + opts.read(true) + .write(true) + .custom_flags(FILE_FLAG_OVERLAPPED); + if let Ok(f) = opts.open(address) { + info!("found existing named pipe: {}", address); + drop(f); + return Err(std::io::Error::new( + std::io::ErrorKind::AddrInUse, + "address already exists", + )); + } + + // windows starts the listener on the second invocation of the shim + Ok(()) +} + +#[cfg(unix)] fn start_listener(address: &str) -> std::io::Result { let path = parse_sockaddr(address); // Try to create the needed directory hierarchy. @@ -204,6 +245,7 @@ mod tests { use crate::start_listener; #[test] + #[cfg(unix)] fn test_start_listener() { let tmpdir = tempfile::tempdir().unwrap(); let path = tmpdir.path().to_str().unwrap().to_owned(); @@ -226,4 +268,16 @@ mod tests { let context = std::fs::read_to_string(&txt_file).unwrap(); assert_eq!(context, "test"); } + + #[test] + #[cfg(windows)] + fn test_start_listener_windows() { + use mio::windows::NamedPipe; + + let named_pipe = "\\\\.\\pipe\\test-pipe-duplicate".to_string(); + + start_listener(&named_pipe).unwrap(); + let _pipe_server = NamedPipe::new(named_pipe.clone()).unwrap(); + start_listener(&named_pipe).expect_err("address already exists"); + } } diff --git a/crates/shim/src/logger.rs b/crates/shim/src/logger.rs index 9afe602..cb83167 100644 --- a/crates/shim/src/logger.rs +++ b/crates/shim/src/logger.rs @@ -26,16 +26,20 @@ use std::{ use log::{Metadata, Record}; use crate::error::Error; +#[cfg(windows)] +use crate::sys::windows::NamedPipeLogger; pub struct FifoLogger { file: Mutex, } impl FifoLogger { + #[allow(dead_code)] pub fn new() -> Result { Self::with_path("log") } + #[allow(dead_code)] pub fn with_path>(path: P) -> Result { let f = OpenOptions::new() .write(true) @@ -74,34 +78,61 @@ impl log::Log for FifoLogger { } } +#[cfg(unix)] pub fn init(debug: bool) -> Result<(), Error> { let logger = FifoLogger::new().map_err(io_error!(e, "failed to init logger"))?; + + let boxed_logger = Box::new(logger); + configure_logger(boxed_logger, debug) +} + +#[cfg(windows)] +// Containerd on windows expects the log to be a named pipe in the format of \\.\pipe\containerd---log +// There is an assumption that there is always only one client connected which is containerd. +// If there is a restart of containerd then logs during that time period will be lost. +// +// https://github.com/containerd/containerd/blob/v1.7.0/runtime/v2/shim_windows.go#L77 +// https://github.com/microsoft/hcsshim/blob/5871d0c4436f131c377655a3eb09fc9b5065f11d/cmd/containerd-shim-runhcs-v1/serve.go#L132-L137 +pub fn init(debug: bool, namespace: &String, id: &String) -> Result<(), Error> { + let logger = + NamedPipeLogger::new(namespace, id).map_err(io_error!(e, "failed to init logger"))?; + + let boxed_logger = Box::new(logger); + configure_logger(boxed_logger, debug) +} + +fn configure_logger(logger: Box, debug: bool) -> Result<(), Error> { let level = if debug { log::LevelFilter::Debug } else { log::LevelFilter::Info }; - log::set_boxed_logger(Box::new(logger))?; + log::set_boxed_logger(logger)?; log::set_max_level(level); - Ok(()) } #[cfg(test)] mod tests { use log::{Log, Record}; - use nix::{sys::stat, unistd}; use super::*; #[test] fn test_fifo_log() { + #[cfg(unix)] + use nix::{sys::stat, unistd}; + let tmpdir = tempfile::tempdir().unwrap(); let path = tmpdir.path().to_str().unwrap().to_owned() + "/log"; + #[cfg(unix)] unistd::mkfifo(Path::new(&path), stat::Mode::S_IRWXU).unwrap(); + #[cfg(windows)] + File::create(path.clone()).unwrap(); + let path1 = path.clone(); let thread = std::thread::spawn(move || { let _fifo = OpenOptions::new() diff --git a/crates/shim/src/mount.rs b/crates/shim/src/mount.rs index 54717c6..f823a50 100644 --- a/crates/shim/src/mount.rs +++ b/crates/shim/src/mount.rs @@ -1,3 +1,4 @@ +#![cfg(not(windows))] /* Copyright The containerd Authors. diff --git a/crates/shim/src/synchronous/mod.rs b/crates/shim/src/synchronous/mod.rs index 7fd5021..18fc8cf 100644 --- a/crates/shim/src/synchronous/mod.rs +++ b/crates/shim/src/synchronous/mod.rs @@ -32,33 +32,37 @@ //! ``` //! +macro_rules! cfg_unix { + ($($item:item)*) => { + $( + #[cfg(unix)] + $item + )* + } +} + +macro_rules! cfg_windows { + ($($item:item)*) => { + $( + #[cfg(windows)] + $item + )* + } +} + use std::{ - convert::TryFrom, - env, fs, + env, io::Write, - os::unix::{fs::FileTypeExt, io::AsRawFd}, - path::Path, process::{self, Command, Stdio}, sync::{Arc, Condvar, Mutex}, }; -use command_fds::{CommandFdExt, FdMapping}; -use libc::{SIGCHLD, SIGINT, SIGPIPE, SIGTERM}; pub use log::{debug, error, info, warn}; -use nix::{ - errno::Errno, - sys::{ - signal::Signal, - wait::{self, WaitPidFlag, WaitStatus}, - }, - unistd::Pid, -}; -use signal_hook::iterator::Signals; use util::{read_address, write_address}; use crate::{ api::DeleteResponse, - args, logger, parse_sockaddr, + args, logger, protos::{ protobuf::Message, shim::shim_ttrpc::{create_task, Task}, @@ -66,9 +70,45 @@ use crate::{ }, reap, socket_address, start_listener, synchronous::publisher::RemotePublisher, - Config, Error, Result, StartOpts, SOCKET_FD, TTRPC_ADDRESS, + Config, Error, Result, StartOpts, TTRPC_ADDRESS, }; +cfg_unix! { + use std::os::unix::net::UnixListener; + use crate::{SOCKET_FD, parse_sockaddr}; + use command_fds::{CommandFdExt, FdMapping}; + use libc::{SIGCHLD, SIGINT, SIGPIPE, SIGTERM}; + use nix::{ + errno::Errno, + sys::{ + signal::Signal, + wait::{self, WaitPidFlag, WaitStatus}, + }, + unistd::Pid, + }; + use signal_hook::iterator::Signals; + use std::os::unix::fs::FileTypeExt; + use std::{convert::TryFrom, fs, path::Path}; + use std::os::fd::AsRawFd; +} + +cfg_windows! { + use std::{io, ptr}; + use windows_sys::Win32::{ + Foundation::{CloseHandle, HANDLE}, + System::{ + Console::SetConsoleCtrlHandler, + Threading::{CreateSemaphoreA, ReleaseSemaphore, }, + }, + }; + + static mut SEMAPHORE: HANDLE = 0 as HANDLE; + const MAX_SEM_COUNT: i32 = 255; + + use windows_sys::Win32::System::Threading::WaitForSingleObject; + use windows_sys::Win32::System::Threading::INFINITE; +} + pub mod monitor; pub mod publisher; pub mod util; @@ -158,9 +198,13 @@ where // Create shim instance let mut config = opts.unwrap_or_default(); + #[cfg(unix)] // Setup signals let signals = setup_signals(&config); + #[cfg(windows)] + setup_signals(); + if !config.no_sub_reaper { reap::set_subreaper()?; } @@ -188,7 +232,10 @@ where Ok(()) } "delete" => { + #[cfg(unix)] std::thread::spawn(move || handle_signals(signals)); + #[cfg(windows)] + std::thread::spawn(handle_signals); let response = shim.delete_shim()?; let stdout = std::io::stdout(); let mut locked = stdout.lock(); @@ -197,18 +244,28 @@ where Ok(()) } _ => { + #[cfg(windows)] + util::setup_debugger_event(); + if !config.no_setup_logger { + #[cfg(unix)] logger::init(flags.debug)?; + #[cfg(windows)] + logger::init(flags.debug, &flags.namespace, &flags.id)?; } let publisher = publisher::RemotePublisher::new(&ttrpc_address)?; let task = shim.create_task_service(publisher); let task_service = create_task(Arc::new(Box::new(task))); - let mut server = Server::new().register_service(task_service); - server = server.add_listener(SOCKET_FD)?; + let mut server = create_server(flags)?; + server = server.register_service(task_service); server.start()?; + #[cfg(windows)] + signal_server_started(); + info!("Shim successfully started, waiting for exit signal..."); + #[cfg(unix)] std::thread::spawn(move || handle_signals(signals)); shim.wait(); @@ -224,6 +281,22 @@ where } } +#[cfg(windows)] +fn create_server(flags: args::Flags) -> Result { + let mut server = Server::new(); + let address = socket_address(&flags.address, &flags.namespace, &flags.id); + server = server.bind(address.as_str())?; + Ok(server) +} + +#[cfg(unix)] +fn create_server(_flags: args::Flags) -> Result { + let mut server = Server::new(); + server = server.add_listener(SOCKET_FD)?; + Ok(server) +} + +#[cfg(unix)] fn setup_signals(config: &Config) -> Signals { let signals = Signals::new([SIGTERM, SIGINT, SIGPIPE]).expect("new signal failed"); if !config.no_reaper { @@ -232,6 +305,30 @@ fn setup_signals(config: &Config) -> Signals { signals } +#[cfg(windows)] +fn setup_signals() { + unsafe { + SEMAPHORE = CreateSemaphoreA(ptr::null_mut(), 0, MAX_SEM_COUNT, ptr::null()); + if SEMAPHORE == 0 { + panic!("Failed to create semaphore: {}", io::Error::last_os_error()); + } + + if SetConsoleCtrlHandler(Some(signal_handler), 1) == 0 { + let e = io::Error::last_os_error(); + CloseHandle(SEMAPHORE); + SEMAPHORE = 0 as HANDLE; + panic!("Failed to set console handler: {}", e); + } + } +} + +#[cfg(windows)] +unsafe extern "system" fn signal_handler(_: u32) -> i32 { + ReleaseSemaphore(SEMAPHORE, 1, ptr::null_mut()); + 1 +} + +#[cfg(unix)] fn handle_signals(mut signals: Signals) { loop { for sig in signals.wait() { @@ -274,6 +371,16 @@ fn handle_signals(mut signals: Signals) { } } +#[cfg(windows)] +// must start on thread as waitforSingleObject puts the current thread to sleep +fn handle_signals() { + unsafe { + WaitForSingleObject(SEMAPHORE, INFINITE); + //Windows doesn't have similiar signal like SIGCHLD + // We could implement something if required but for now + } +} + fn wait_socket_working(address: &str, interval_in_ms: u64, count: u32) -> Result<()> { for _i in 0..count { match Client::connect(address) { @@ -292,6 +399,7 @@ fn remove_socket_silently(address: &str) { remove_socket(address).unwrap_or_else(|e| warn!("failed to remove file {} {:?}", address, e)) } +#[cfg(unix)] fn remove_socket(address: &str) -> Result<()> { let path = parse_sockaddr(address); if let Ok(md) = Path::new(path).metadata() { @@ -302,6 +410,26 @@ fn remove_socket(address: &str) -> Result<()> { Ok(()) } +#[cfg(windows)] +fn remove_socket(address: &str) -> Result<()> { + use std::{ + fs::OpenOptions, + os::windows::prelude::{AsRawHandle, OpenOptionsExt}, + }; + + use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_OVERLAPPED; + + let mut opts = OpenOptions::new(); + opts.read(true) + .write(true) + .custom_flags(FILE_FLAG_OVERLAPPED); + if let Ok(f) = opts.open(address) { + info!("attempting to remove existing named pipe: {}", address); + unsafe { CloseHandle(f.as_raw_handle() as isize) }; + } + Ok(()) +} + /// Spawn is a helper func to launch shim process. /// Typically this expected to be called from `StartShim`. pub fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Result<(u32, String)> { @@ -311,6 +439,7 @@ pub fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Result // Create socket and prepare listener. // We'll use `add_listener` when creating TTRPC server. + #[allow(clippy::let_unit_value)] let listener = match start_listener(&address) { Ok(l) => l, Err(e) => { @@ -320,6 +449,8 @@ pub fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Result err: e, }); }; + // If the Address is already in use then make sure it is up and running and return the address + // This allows for running a single shim per container scenarios if let Ok(()) = wait_socket_working(&address, 5, 200) { write_address(&address)?; return Ok((0, address)); @@ -329,6 +460,18 @@ pub fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Result } }; + run_shim(cmd, cwd, opts, vars, listener, address) +} + +#[cfg(unix)] +fn run_shim( + cmd: std::path::PathBuf, + cwd: std::path::PathBuf, + opts: StartOpts, + vars: Vec<(&str, &str)>, + listener: UnixListener, + address: String, +) -> Result<(u32, String)> { let mut command = Command::new(cmd); command @@ -363,6 +506,97 @@ pub fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Result }) } +// Activation pattern for Windows comes from the hcsshim: https://github.com/microsoft/hcsshim/blob/v0.10.0-rc.7/cmd/containerd-shim-runhcs-v1/serve.go#L57-L70 +// another way to do it would to create named pipe and pass it to the child process through handle inheritence but that would require duplicating +// the logic in Rust's 'command' for process creation. There is an issue in Rust to make it simplier to specify handle inheritence and this could +// be revisited once https://github.com/rust-lang/rust/issues/54760 is implemented. +#[cfg(windows)] +fn run_shim( + cmd: std::path::PathBuf, + cwd: std::path::PathBuf, + opts: StartOpts, + vars: Vec<(&str, &str)>, + _listener: (), + address: String, +) -> Result<(u32, String)> { + let mut command = Command::new(cmd); + + let (mut reader, writer) = os_pipe::pipe().map_err(io_error!(e, "create pipe"))?; + let stdio_writer = writer.try_clone().unwrap(); + + command + .current_dir(cwd) + .stdout(stdio_writer) + .stdin(Stdio::null()) + .stderr(Stdio::null()) + .args([ + "-namespace", + &opts.namespace, + "-id", + &opts.id, + "-address", + &opts.address, + ]); + + if opts.debug { + command.arg("-debug"); + } + command.envs(vars); + + disable_handle_inheritance(); + command + .spawn() + .map_err(io_error!(e, "spawn shim")) + .map(|child| { + // IMPORTANT: we must drop the writer and command to close up handles before we copy the reader to stderr + // AND the shim Start method must NOT write to stdout/stderr + drop(writer); + drop(command); + io::copy(&mut reader, &mut io::stderr()).unwrap(); + (child.id(), address) + }) +} + +// On Windows Rust currently sets the `HANDLE_FLAG_INHERIT` flag to true when using Command::spawn. +// When a child process is spawned by another process (containerd) the child process inherits the parent's stdin, stdout, and stderr handles. +// Due to the HANDLE_FLAG_INHERIT flag being set to true this will cause containerd to hand until the child process closes the handles. +// As a workaround we can Disables inheritance on the io pipe handles. +// This workaround comes from https://github.com/rust-lang/rust/issues/54760#issuecomment-1045940560 +#[cfg(windows)] +fn disable_handle_inheritance() { + use windows_sys::Win32::{ + Foundation::{SetHandleInformation, HANDLE_FLAG_INHERIT}, + System::Console::{GetStdHandle, STD_ERROR_HANDLE, STD_INPUT_HANDLE, STD_OUTPUT_HANDLE}, + }; + + unsafe { + let std_err = GetStdHandle(STD_ERROR_HANDLE); + let std_in = GetStdHandle(STD_INPUT_HANDLE); + let std_out = GetStdHandle(STD_OUTPUT_HANDLE); + + for handle in [std_err, std_in, std_out] { + SetHandleInformation(handle, HANDLE_FLAG_INHERIT, 0); + //info!(" handle for... {:?}", handle); + //CloseHandle(handle); + } + } +} + +// This closes the stdout handle which was mapped to the stderr on the first invocation of the shim. +// This releases first process which will give containerd the address of the namedpipe. +#[cfg(windows)] +fn signal_server_started() { + use windows_sys::Win32::System::Console::{GetStdHandle, STD_OUTPUT_HANDLE}; + + unsafe { + let std_out = GetStdHandle(STD_OUTPUT_HANDLE); + + for handle in [std_out] { + CloseHandle(handle); + } + } +} + #[cfg(test)] mod tests { use std::thread; diff --git a/crates/shim/src/synchronous/publisher.rs b/crates/shim/src/synchronous/publisher.rs index 459901c..8f4f664 100644 --- a/crates/shim/src/synchronous/publisher.rs +++ b/crates/shim/src/synchronous/publisher.rs @@ -25,9 +25,11 @@ use client::{ }; use containerd_shim_protos as client; +#[cfg(unix)] +use crate::util::connect; use crate::{ error::Result, - util::{connect, convert_to_any, timestamp}, + util::{convert_to_any, timestamp}, }; /// Remote publisher connects to containerd's TTRPC endpoint to publish events from shim. @@ -47,10 +49,19 @@ impl RemotePublisher { }) } + #[cfg(unix)] fn connect(address: impl AsRef) -> Result { let fd = connect(address)?; // Client::new() takes ownership of the RawFd. - Ok(Client::new(fd)) + Ok(Client::new_from_fd(fd)?) + } + + #[cfg(windows)] + fn connect(address: impl AsRef) -> Result { + match Client::connect(address.as_ref()) { + Ok(client) => Ok(client), + Err(e) => Err(e.into()), + } } /// Publish a new event. @@ -90,10 +101,7 @@ impl Events for RemotePublisher { #[cfg(test)] mod tests { - use std::{ - os::unix::{io::AsRawFd, net::UnixListener}, - sync::{Arc, Barrier}, - }; + use std::sync::{Arc, Barrier}; use client::{ api::{Empty, ForwardRequest}, @@ -102,6 +110,8 @@ mod tests { use ttrpc::Server; use super::*; + #[cfg(windows)] + use crate::synchronous::wait_socket_working; struct FakeServer {} @@ -115,8 +125,12 @@ mod tests { #[test] fn test_connect() { + #[cfg(unix)] let tmpdir = tempfile::tempdir().unwrap(); + #[cfg(unix)] let path = format!("{}/socket", tmpdir.as_ref().to_str().unwrap()); + #[cfg(windows)] + let path = "\\\\.\\pipe\\test-pipe".to_string(); let path1 = path.clone(); assert!(RemotePublisher::connect("a".repeat(16384)).is_err()); @@ -125,17 +139,14 @@ mod tests { let barrier = Arc::new(Barrier::new(2)); let barrier2 = barrier.clone(); let thread = std::thread::spawn(move || { - let listener = UnixListener::bind(&path1).unwrap(); - listener.set_nonblocking(true).unwrap(); - let t = Arc::new(Box::new(FakeServer {}) as Box); - let service = client::create_events(t); - let mut server = Server::new() - .add_listener(listener.as_raw_fd()) - .unwrap() - .register_service(service); - std::mem::forget(listener); + let mut server = create_server(&path1); server.start().unwrap(); + + #[cfg(windows)] + // make sure pipe is ready on windows + wait_socket_working(&path1, 5, 5).unwrap(); + barrier2.wait(); barrier2.wait(); @@ -153,4 +164,30 @@ mod tests { thread.join().unwrap(); } + + #[cfg(unix)] + fn create_server(server_address: &String) -> Server { + use std::os::unix::{io::AsRawFd, net::UnixListener}; + let listener = UnixListener::bind(server_address).unwrap(); + listener.set_nonblocking(true).unwrap(); + let t = Arc::new(Box::new(FakeServer {}) as Box); + let service = client::create_events(t); + let server = Server::new() + .add_listener(listener.as_raw_fd()) + .unwrap() + .register_service(service); + std::mem::forget(listener); + server + } + + #[cfg(windows)] + fn create_server(server_address: &String) -> Server { + let t = Arc::new(Box::new(FakeServer {}) as Box); + let service = client::create_events(t); + + Server::new() + .bind(server_address) + .unwrap() + .register_service(service) + } } diff --git a/crates/shim/src/synchronous/util.rs b/crates/shim/src/synchronous/util.rs index 8b103c3..c78c17a 100644 --- a/crates/shim/src/synchronous/util.rs +++ b/crates/shim/src/synchronous/util.rs @@ -21,8 +21,10 @@ use std::{ }; use containerd_shim_protos::shim::oci::Options; +#[cfg(unix)] use libc::mode_t; use log::warn; +#[cfg(unix)] use nix::sys::stat::Mode; use oci_spec::runtime::Spec; @@ -117,6 +119,7 @@ pub fn read_spec_from_file(bundle: &str) -> crate::Result { Spec::load(path).map_err(other_error!(e, "read spec file")) } +#[cfg(unix)] pub fn mkdir(path: impl AsRef, mode: mode_t) -> crate::Result<()> { let path_buf = path.as_ref().to_path_buf(); if !path_buf.as_path().exists() { @@ -143,3 +146,57 @@ impl Drop for HelperRemoveFile { .unwrap_or_else(|e| warn!("remove dir {} error: {}", &self.path, e)); } } + +#[cfg(target_os = "windows")] +// helper to configure pause thread until signaled. Useful in attaching a debugger +// https://github.com/microsoft/hcsshim/blob/v0.10.0-rc.7/cmd/containerd-shim-runhcs-v1/serve.go#L313-L315 +// use with https://github.com/moby/docker-signal +pub(crate) fn setup_debugger_event() { + use std::{env, io, process}; + + use log::{debug, error}; + use windows_sys::Win32::System::Threading::{WaitForSingleObject, INFINITE}; + + let debugger = env::var("SHIM_DEBUGGER").unwrap_or_else(|_| "".to_string()); + if debugger.is_empty() { + return; + } + let event_name = format!("Global\\debugger-{}", process::id()); + debug!("Halting until signalled: {}", event_name); + let e = match create_event(event_name) { + Ok(e) => e, + Err(e) => { + error!("failed to create event for debugger: {}", e); + return; + } + }; + match unsafe { WaitForSingleObject(e, INFINITE) } { + 0 => {} + _ => { + error!( + "failed to wait for debugger event: {}", + io::Error::last_os_error() + ); + return; + } + } + debug!("signal received, continuing"); +} + +#[cfg(target_os = "windows")] +fn create_event(name: String) -> crate::Result { + use std::{ffi::OsStr, io, os::windows::prelude::OsStrExt}; + + use windows_sys::Win32::System::Threading::CreateEventW; + + let name = OsStr::new(name.as_str()) + .encode_wide() + .chain(Some(0)) // add NULL termination + .collect::>(); + + let result = unsafe { CreateEventW(std::ptr::null_mut(), 0, 0, name.as_ptr()) }; + match result { + 0 => Err(Error::Other(io::Error::last_os_error().to_string())), + _ => Ok(result), + } +} diff --git a/crates/shim/src/sys/mod.rs b/crates/shim/src/sys/mod.rs new file mode 100644 index 0000000..3f98c21 --- /dev/null +++ b/crates/shim/src/sys/mod.rs @@ -0,0 +1,20 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#[cfg(windows)] +pub(crate) mod windows; +#[cfg(windows)] +pub use crate::sys::windows::NamedPipeLogger; diff --git a/crates/shim/src/sys/windows/mod.rs b/crates/shim/src/sys/windows/mod.rs new file mode 100644 index 0000000..322dfbd --- /dev/null +++ b/crates/shim/src/sys/windows/mod.rs @@ -0,0 +1,17 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +pub(crate) mod named_pipe_logger; +pub use named_pipe_logger::NamedPipeLogger; diff --git a/crates/shim/src/sys/windows/named_pipe_logger.rs b/crates/shim/src/sys/windows/named_pipe_logger.rs new file mode 100644 index 0000000..658246e --- /dev/null +++ b/crates/shim/src/sys/windows/named_pipe_logger.rs @@ -0,0 +1,236 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +use std::{ + io::{self, Write}, + sync::{Arc, Mutex}, + thread, +}; + +use log::{Metadata, Record}; +use mio::{windows::NamedPipe, Events, Interest, Poll, Token}; + +pub struct NamedPipeLogger { + current_connection: Arc>, +} + +impl NamedPipeLogger { + pub fn new(namespace: &String, id: &String) -> Result { + let pipe_name = format!("\\\\.\\pipe\\containerd-shim-{}-{}-log", namespace, id); + let mut pipe_server = NamedPipe::new(pipe_name).unwrap(); + let mut poll = Poll::new().unwrap(); + poll.registry() + .register( + &mut pipe_server, + Token(0), + Interest::READABLE | Interest::WRITABLE, + ) + .unwrap(); + + let current_connection = Arc::new(Mutex::new(pipe_server)); + let server_connection = current_connection.clone(); + let logger = NamedPipeLogger { current_connection }; + + thread::spawn(move || { + let mut events = Events::with_capacity(128); + loop { + poll.poll(&mut events, None).unwrap(); + + for event in events.iter() { + if event.is_writable() { + match server_connection.lock().unwrap().connect() { + Ok(()) => {} + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => { + // this would block just keep processing + } + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + // this would block just keep processing + } + Err(e) => { + panic!("Error connecting to client: {}", e); + } + }; + } + if event.is_readable() { + server_connection.lock().unwrap().disconnect().unwrap(); + } + } + } + }); + + Ok(logger) + } +} + +impl log::Log for NamedPipeLogger { + fn enabled(&self, metadata: &Metadata) -> bool { + metadata.level() <= log::max_level() + } + + fn log(&self, record: &Record) { + if self.enabled(record.metadata()) { + let message = format!("[{}] {}\n", record.level(), record.args()); + + match self + .current_connection + .lock() + .unwrap() + .write(message.as_bytes()) + { + Ok(_) => {} + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => { + // this would block just keep processing + } + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + // this would block just keep processing + } + Err(e) if e.raw_os_error() == Some(536) => { + // no client connected + } + Err(e) if e.raw_os_error() == Some(232) => { + // client was connected but is in process of shutting down + } + Err(e) => { + panic!("Error writing to client: {}", e) + } + } + } + } + + fn flush(&self) { + _ = self.current_connection.lock().unwrap().flush(); + } +} + +#[cfg(test)] +mod tests { + use std::{ + fs::OpenOptions, + io::Read, + os::windows::{ + fs::OpenOptionsExt, + io::{FromRawHandle, IntoRawHandle}, + prelude::AsRawHandle, + }, + time::Duration, + }; + + use log::{Log, Record}; + use mio::{windows::NamedPipe, Events, Interest, Poll, Token}; + use windows_sys::Win32::{ + Foundation::ERROR_PIPE_NOT_CONNECTED, Storage::FileSystem::FILE_FLAG_OVERLAPPED, + }; + + use super::*; + + #[test] + fn test_namedpipe_log_can_write_before_client_connected() { + let ns = "test".to_string(); + let id = "notconnected".to_string(); + let logger = NamedPipeLogger::new(&ns, &id).unwrap(); + + // test can write before a reader is connected (should succeed but the messages will be dropped) + log::set_max_level(log::LevelFilter::Info); + let record = Record::builder() + .level(log::Level::Info) + .line(Some(1)) + .file(Some("sample file")) + .args(format_args!("hello")) + .build(); + logger.log(&record); + logger.flush(); + } + + #[test] + fn test_namedpipe_log() { + use std::fs::File; + + let ns = "test".to_string(); + let id = "clients".to_string(); + let pipe_name = format!("\\\\.\\pipe\\containerd-shim-{}-{}-log", ns, id); + + let logger = NamedPipeLogger::new(&ns, &id).unwrap(); + let mut client = create_client(pipe_name.as_str()); + + log::set_max_level(log::LevelFilter::Info); + let record = Record::builder() + .level(log::Level::Info) + .line(Some(1)) + .file(Some("sample file")) + .args(format_args!("hello")) + .build(); + logger.log(&record); + logger.flush(); + + let buf = read_message(&mut client); + assert_eq!("[INFO] hello", std::str::from_utf8(&buf).unwrap()); + + // test that we can reconnect after a reader disconnects + // we need to get the raw handle and drop that as well to force full disconnect + // and give a few milliseconds for the disconnect to happen + println!("dropping client"); + let handle = client.as_raw_handle(); + drop(client); + let f = unsafe { File::from_raw_handle(handle) }; + drop(f); + std::thread::sleep(Duration::from_millis(100)); + + let mut client2 = create_client(pipe_name.as_str()); + logger.log(&record); + logger.flush(); + + read_message(&mut client2); + } + + fn read_message(client: &mut NamedPipe) -> [u8; 12] { + let mut poll = Poll::new().unwrap(); + poll.registry() + .register(client, Token(1), Interest::READABLE) + .unwrap(); + let mut events = Events::with_capacity(128); + let mut buf = [0; 12]; + loop { + poll.poll(&mut events, Some(Duration::from_millis(10))) + .unwrap(); + match client.read(&mut buf) { + Ok(0) => { + panic!("Read no bytes from pipe") + } + Ok(_) => { + break; + } + Err(e) if e.raw_os_error() == Some(ERROR_PIPE_NOT_CONNECTED as i32) => { + panic!("not connected to the pipe"); + } + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + continue; + } + Err(e) => panic!("Error reading from pipe: {}", e), + } + } + buf + } + + fn create_client(pipe_name: &str) -> mio::windows::NamedPipe { + let mut opts = OpenOptions::new(); + opts.read(true) + .write(true) + .custom_flags(FILE_FLAG_OVERLAPPED); + let file = opts.open(pipe_name).unwrap(); + + unsafe { NamedPipe::from_raw_handle(file.into_raw_handle()) } + } +} diff --git a/crates/shim/src/util.rs b/crates/shim/src/util.rs index 8913e84..4c6f74d 100644 --- a/crates/shim/src/util.rs +++ b/crates/shim/src/util.rs @@ -14,10 +14,9 @@ limitations under the License. */ -use std::{ - os::unix::io::RawFd, - time::{SystemTime, UNIX_EPOCH}, -}; +#[cfg(unix)] +use std::os::unix::io::RawFd; +use std::time::{SystemTime, UNIX_EPOCH}; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; @@ -100,6 +99,7 @@ impl From for Options { } } +#[cfg(unix)] pub fn connect(address: impl AsRef) -> Result { use nix::{sys::socket::*, unistd::close};