Windows support for the synchronous shim

Signed-off-by: James Sturtevant <jsturtevant@gmail.com>
Signed-off-by: James Sturtevant <jstur@microsoft.com>
This commit is contained in:
James Sturtevant 2023-04-19 13:52:00 -07:00 committed by James Sturtevant
parent 5c96c0fe63
commit bceaf4aca3
No known key found for this signature in database
17 changed files with 943 additions and 47 deletions

99
.github/workflows/ci-windows.yml vendored Normal file
View File

@ -0,0 +1,99 @@
name: CI-windows
on:
pull_request:
push:
schedule:
- cron: '0 0 * * *' # Every day at midnight
jobs:
checks:
name: Checks
runs-on: ${{ matrix.os }}
timeout-minutes: 20
strategy:
matrix:
os: [windows-latest]
steps:
- uses: actions/checkout@v3
- uses: arduino/setup-protoc@v1
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- run: cargo check --examples --tests -p containerd-shim -p containerd-shim-protos
- run: rustup toolchain install nightly --component rustfmt
- run: cargo +nightly fmt -p containerd-shim -p containerd-shim-protos -- --check --files-with-diff
- run: cargo clippy -p containerd-shim -p containerd-shim-protos -- -D warnings
- run: cargo doc --no-deps -p containerd-shim -p containerd-shim-protos
env:
RUSTDOCFLAGS: -Dwarnings
tests:
name: Tests
runs-on: ${{ matrix.os }}
timeout-minutes: 15
strategy:
matrix:
os: [windows-latest]
steps:
- uses: actions/checkout@v3
- uses: arduino/setup-protoc@v1
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Tests
run: |
cargo test -p containerd-shim -p containerd-shim-protos
integration:
name: Integration
runs-on: ${{ matrix.os }}
timeout-minutes: 40
strategy:
matrix:
os: [windows-latest]
containerd: [1.7.0]
steps:
- name: Checkout extensions
uses: actions/checkout@v3
- name: Install containerd
run: |
$ErrorActionPreference = "Stop"
# Install containerd https://github.com/containerd/containerd/blob/v1.7.0/docs/getting-started.md#installing-containerd-on-windows
# Download and extract desired containerd Windows binaries
curl.exe -L https://github.com/containerd/containerd/releases/download/v${{ matrix.containerd }}/containerd-${{ matrix.containerd }}-windows-amd64.tar.gz -o containerd-windows-amd64.tar.gz
tar.exe xvf .\containerd-windows-amd64.tar.gz
# Copy and configure
mkdir "$Env:ProgramFiles\containerd"
Copy-Item -Path ".\bin\*" -Destination "$Env:ProgramFiles\containerd" -Recurse -Force
cd $Env:ProgramFiles\containerd\
.\containerd.exe config default | Out-File config.toml -Encoding ascii
# Review the configuration. Depending on setup you may want to adjust:
# - the sandbox_image (Kubernetes pause image)
# - cni bin_dir and conf_dir locations
Get-Content config.toml
# Register and start service
.\containerd.exe --register-service
Start-Service containerd
working-directory: ${{ runner.temp }}
- name: Run integration test
run: |
$ErrorActionPreference = "Stop"
get-service containerd
$env:TTRPC_ADDRESS="\\.\pipe\containerd-containerd.ttrpc"
# run the example
cargo run --example skeleton -- -namespace default -id 1234 -address "\\.\pipe\containerd-containerd" -publish-binary ./bin/containerd start
ps skeleton
cargo run --example shim-proto-connect \\.\pipe\containerd-shim-17630016127144989388-pipe

2
.gitignore vendored
View File

@ -9,3 +9,5 @@ Cargo.lock
# These are backup files generated by rustfmt
**/*.rs.bk
log
.vscode

View File

@ -13,7 +13,7 @@ homepage.workspace = true
[dependencies]
protobuf = "3.1"
ttrpc = "0.7"
ttrpc = { git = "https://github.com/jsturtevant/ttrpc-rust", rev = "d96bea3a41b6c6b85c1d8da53e5085fb0789a685" } # unmerged pr https://github.com/containerd/ttrpc-rust/pull/182
async-trait = { version = "0.1.48", optional = true }
[build-dependencies]

View File

@ -17,13 +17,16 @@ async = ["tokio", "containerd-shim-protos/async", "async-trait", "futures", "sig
name = "skeleton_async"
required-features = ["async"]
[[example]]
name = "windows-log-reader"
path = "examples/windows_log_reader.rs"
[dependencies]
go-flag = "0.1.0"
thiserror = "1.0"
log = { version = "0.4", features = ["std"] }
libc = "0.2.95"
nix = "0.26"
command-fds = "0.2.1"
lazy_static = "1.4.0"
time = { version = "0.3.7", features = ["serde", "std"] }
serde_json = "1.0.78"
@ -45,5 +48,13 @@ signal-hook-tokio = { version = "0.3.1", optional = true, features = ["futures-v
[target.'cfg(target_os = "linux")'.dependencies]
cgroups-rs = "0.2.9"
[target.'cfg(unix)'.dependencies]
command-fds = "0.2.1"
[target.'cfg(windows)'.dependencies]
windows-sys = {version = "0.48.0", features = ["Win32_Foundation","Win32_System_WindowsProgramming","Win32_System_Console", "Win32_System_Pipes","Win32_Security", "Win32_Storage_FileSystem", "Win32_System_Threading"]}
mio = { version = "0.8", features = ["os-ext", "os-poll"] }
os_pipe = "1.1.3"
[dev-dependencies]
tempfile = "3.0"

View File

@ -135,7 +135,30 @@ $ cat log
[INFO] reaper thread stopped
```
### Running on Windows
```powershell
# Run containerd in background
$env:TTRPC_ADDRESS="\\.\pipe\containerd-containerd.ttrpc"
$ cargo run --example skeleton -- -namespace default -id 1234 -address "\\.\pipe\containerd-containerd" start
\\.\pipe\containerd-shim-17630016127144989388-pipe
# (Optional) Run the log collector in a separate command window
# note: log reader won't work if containerd is connected to the named pipe, this works when running manually to help debug locally
$ cargo run --example windows-log-reader \\.\pipe\containerd-shim-default-1234-log
Reading logs from: \\.\pipe\containerd-shim-default-1234-log
<logs will appear after next command>
$ cargo run --example shim-proto-connect \\.\pipe\containerd-shim-17630016127144989388-pipe
Connecting to \\.\pipe\containerd-shim-17630016127144989388-pipe...
Sending `Connect` request...
Connect response: version: "example"
Sending `Shutdown` request...
Shutdown response: ""
```
## Supported Platforms
Currently, following OSs and hardware architectures are supported, and more efforts are needed to enable and validate other OSs and architectures.
- Linux
- Mac OS
- Windows

View File

@ -0,0 +1,71 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#[cfg(windows)]
use std::error::Error;
#[cfg(windows)]
fn main() -> Result<(), Box<dyn Error>> {
use std::{
env,
fs::OpenOptions,
os::windows::{
fs::OpenOptionsExt,
io::{FromRawHandle, IntoRawHandle},
},
time::Duration,
};
use mio::{windows::NamedPipe, Events, Interest, Poll, Token};
use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_OVERLAPPED;
let args: Vec<String> = env::args().collect();
let address = args
.get(1)
.ok_or("First argument must be shims address to read logs (\\\\.\\pipe\\containerd-shim-{ns}-{id}-log) ")
.unwrap();
println!("Reading logs from: {}", &address);
let mut opts = OpenOptions::new();
opts.read(true)
.write(true)
.custom_flags(FILE_FLAG_OVERLAPPED);
let file = opts.open(address).unwrap();
let mut client = unsafe { NamedPipe::from_raw_handle(file.into_raw_handle()) };
let mut stdio = std::io::stdout();
let mut poll = Poll::new().unwrap();
poll.registry()
.register(&mut client, Token(1), Interest::READABLE)
.unwrap();
let mut events = Events::with_capacity(128);
loop {
poll.poll(&mut events, Some(Duration::from_millis(10)))
.unwrap();
match std::io::copy(&mut client, &mut stdio) {
Ok(_) => break,
Err(_) => continue,
}
}
Ok(())
}
#[cfg(unix)]
fn main() {
println!("This example is only for Windows");
}

View File

@ -50,9 +50,11 @@ pub enum Error {
Setup(#[from] log::SetLoggerError),
/// Unable to pass fd to child process (we rely on `command_fds` crate for this).
#[cfg(unix)]
#[error("Failed to pass socket fd to child: {0}")]
FdMap(#[from] command_fds::FdMappingCollision),
#[cfg(unix)]
#[error("Nix error: {0}")]
Nix(#[from] nix::Error),
@ -65,6 +67,7 @@ pub enum Error {
#[error("Failed pre condition: {0}")]
FailedPreconditionError(String),
#[cfg(unix)]
#[error("{context} error: {err}")]
MountError {
context: String,

View File

@ -32,20 +32,24 @@
//! ```
//!
use std::{collections::hash_map::DefaultHasher, fs::File, hash::Hasher, path::PathBuf};
#[cfg(windows)]
use std::{fs::OpenOptions, os::windows::prelude::OpenOptionsExt};
#[cfg(unix)]
use std::{
collections::hash_map::DefaultHasher,
fs::File,
hash::Hasher,
os::unix::{io::RawFd, net::UnixListener},
path::{Path, PathBuf},
path::Path,
};
pub use containerd_shim_protos as protos;
#[cfg(unix)]
use nix::ioctl_write_ptr_bad;
pub use protos::{
shim::shim::DeleteResponse,
ttrpc::{context::Context, Result as TtrpcResult},
};
#[cfg(windows)]
use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_OVERLAPPED;
#[cfg(feature = "async")]
pub use crate::asynchronous::*;
@ -67,6 +71,7 @@ pub mod mount;
mod reap;
#[cfg(not(feature = "async"))]
pub mod synchronous;
mod sys;
pub mod util;
/// Generated request/response structures.
@ -112,6 +117,7 @@ cfg_async! {
pub use protos::ttrpc::r#async::TtrpcContext;
}
#[cfg(unix)]
ioctl_write_ptr_bad!(ioctl_set_winsz, libc::TIOCSWINSZ, libc::winsize);
const TTRPC_ADDRESS: &str = "TTRPC_ADDRESS";
@ -149,6 +155,7 @@ pub struct StartOpts {
/// The shim process communicates with the containerd server through a communication channel
/// created by containerd. One endpoint of the communication channel is passed to shim process
/// through a file descriptor during forking, which is the fourth(3) file descriptor.
#[cfg(unix)]
const SOCKET_FD: RawFd = 3;
#[cfg(target_os = "linux")]
@ -157,6 +164,9 @@ pub const SOCKET_ROOT: &str = "/run/containerd";
#[cfg(target_os = "macos")]
pub const SOCKET_ROOT: &str = "/var/run/containerd";
#[cfg(target_os = "windows")]
pub const SOCKET_ROOT: &str = r"\\.\pipe\containerd-containerd";
/// Make socket path from containerd socket path, namespace and id.
pub fn socket_address(socket_path: &str, namespace: &str, id: &str) -> String {
let path = PathBuf::from(socket_path)
@ -171,9 +181,20 @@ pub fn socket_address(socket_path: &str, namespace: &str, id: &str) -> String {
hasher.finish()
};
format_address(hash)
}
#[cfg(windows)]
fn format_address(hash: u64) -> String {
format!(r"\\.\pipe\containerd-shim-{}-pipe", hash)
}
#[cfg(unix)]
fn format_address(hash: u64) -> String {
format!("unix://{}/{:x}.sock", SOCKET_ROOT, hash)
}
#[cfg(unix)]
fn parse_sockaddr(addr: &str) -> &str {
if let Some(addr) = addr.strip_prefix("unix://") {
return addr;
@ -186,6 +207,26 @@ fn parse_sockaddr(addr: &str) -> &str {
addr
}
#[cfg(windows)]
fn start_listener(address: &str) -> std::io::Result<()> {
let mut opts = OpenOptions::new();
opts.read(true)
.write(true)
.custom_flags(FILE_FLAG_OVERLAPPED);
if let Ok(f) = opts.open(address) {
info!("found existing named pipe: {}", address);
drop(f);
return Err(std::io::Error::new(
std::io::ErrorKind::AddrInUse,
"address already exists",
));
}
// windows starts the listener on the second invocation of the shim
Ok(())
}
#[cfg(unix)]
fn start_listener(address: &str) -> std::io::Result<UnixListener> {
let path = parse_sockaddr(address);
// Try to create the needed directory hierarchy.
@ -204,6 +245,7 @@ mod tests {
use crate::start_listener;
#[test]
#[cfg(unix)]
fn test_start_listener() {
let tmpdir = tempfile::tempdir().unwrap();
let path = tmpdir.path().to_str().unwrap().to_owned();
@ -226,4 +268,16 @@ mod tests {
let context = std::fs::read_to_string(&txt_file).unwrap();
assert_eq!(context, "test");
}
#[test]
#[cfg(windows)]
fn test_start_listener_windows() {
use mio::windows::NamedPipe;
let named_pipe = "\\\\.\\pipe\\test-pipe-duplicate".to_string();
start_listener(&named_pipe).unwrap();
let _pipe_server = NamedPipe::new(named_pipe.clone()).unwrap();
start_listener(&named_pipe).expect_err("address already exists");
}
}

View File

@ -26,16 +26,20 @@ use std::{
use log::{Metadata, Record};
use crate::error::Error;
#[cfg(windows)]
use crate::sys::windows::NamedPipeLogger;
pub struct FifoLogger {
file: Mutex<File>,
}
impl FifoLogger {
#[allow(dead_code)]
pub fn new() -> Result<FifoLogger, io::Error> {
Self::with_path("log")
}
#[allow(dead_code)]
pub fn with_path<P: AsRef<Path>>(path: P) -> Result<FifoLogger, io::Error> {
let f = OpenOptions::new()
.write(true)
@ -74,34 +78,61 @@ impl log::Log for FifoLogger {
}
}
#[cfg(unix)]
pub fn init(debug: bool) -> Result<(), Error> {
let logger = FifoLogger::new().map_err(io_error!(e, "failed to init logger"))?;
let boxed_logger = Box::new(logger);
configure_logger(boxed_logger, debug)
}
#[cfg(windows)]
// Containerd on windows expects the log to be a named pipe in the format of \\.\pipe\containerd-<namespace>-<id>-log
// There is an assumption that there is always only one client connected which is containerd.
// If there is a restart of containerd then logs during that time period will be lost.
//
// https://github.com/containerd/containerd/blob/v1.7.0/runtime/v2/shim_windows.go#L77
// https://github.com/microsoft/hcsshim/blob/5871d0c4436f131c377655a3eb09fc9b5065f11d/cmd/containerd-shim-runhcs-v1/serve.go#L132-L137
pub fn init(debug: bool, namespace: &String, id: &String) -> Result<(), Error> {
let logger =
NamedPipeLogger::new(namespace, id).map_err(io_error!(e, "failed to init logger"))?;
let boxed_logger = Box::new(logger);
configure_logger(boxed_logger, debug)
}
fn configure_logger(logger: Box<dyn log::Log>, debug: bool) -> Result<(), Error> {
let level = if debug {
log::LevelFilter::Debug
} else {
log::LevelFilter::Info
};
log::set_boxed_logger(Box::new(logger))?;
log::set_boxed_logger(logger)?;
log::set_max_level(level);
Ok(())
}
#[cfg(test)]
mod tests {
use log::{Log, Record};
use nix::{sys::stat, unistd};
use super::*;
#[test]
fn test_fifo_log() {
#[cfg(unix)]
use nix::{sys::stat, unistd};
let tmpdir = tempfile::tempdir().unwrap();
let path = tmpdir.path().to_str().unwrap().to_owned() + "/log";
#[cfg(unix)]
unistd::mkfifo(Path::new(&path), stat::Mode::S_IRWXU).unwrap();
#[cfg(windows)]
File::create(path.clone()).unwrap();
let path1 = path.clone();
let thread = std::thread::spawn(move || {
let _fifo = OpenOptions::new()

View File

@ -1,3 +1,4 @@
#![cfg(not(windows))]
/*
Copyright The containerd Authors.

View File

@ -32,33 +32,37 @@
//! ```
//!
macro_rules! cfg_unix {
($($item:item)*) => {
$(
#[cfg(unix)]
$item
)*
}
}
macro_rules! cfg_windows {
($($item:item)*) => {
$(
#[cfg(windows)]
$item
)*
}
}
use std::{
convert::TryFrom,
env, fs,
env,
io::Write,
os::unix::{fs::FileTypeExt, io::AsRawFd},
path::Path,
process::{self, Command, Stdio},
sync::{Arc, Condvar, Mutex},
};
use command_fds::{CommandFdExt, FdMapping};
use libc::{SIGCHLD, SIGINT, SIGPIPE, SIGTERM};
pub use log::{debug, error, info, warn};
use nix::{
errno::Errno,
sys::{
signal::Signal,
wait::{self, WaitPidFlag, WaitStatus},
},
unistd::Pid,
};
use signal_hook::iterator::Signals;
use util::{read_address, write_address};
use crate::{
api::DeleteResponse,
args, logger, parse_sockaddr,
args, logger,
protos::{
protobuf::Message,
shim::shim_ttrpc::{create_task, Task},
@ -66,9 +70,45 @@ use crate::{
},
reap, socket_address, start_listener,
synchronous::publisher::RemotePublisher,
Config, Error, Result, StartOpts, SOCKET_FD, TTRPC_ADDRESS,
Config, Error, Result, StartOpts, TTRPC_ADDRESS,
};
cfg_unix! {
use std::os::unix::net::UnixListener;
use crate::{SOCKET_FD, parse_sockaddr};
use command_fds::{CommandFdExt, FdMapping};
use libc::{SIGCHLD, SIGINT, SIGPIPE, SIGTERM};
use nix::{
errno::Errno,
sys::{
signal::Signal,
wait::{self, WaitPidFlag, WaitStatus},
},
unistd::Pid,
};
use signal_hook::iterator::Signals;
use std::os::unix::fs::FileTypeExt;
use std::{convert::TryFrom, fs, path::Path};
use std::os::fd::AsRawFd;
}
cfg_windows! {
use std::{io, ptr};
use windows_sys::Win32::{
Foundation::{CloseHandle, HANDLE},
System::{
Console::SetConsoleCtrlHandler,
Threading::{CreateSemaphoreA, ReleaseSemaphore, },
},
};
static mut SEMAPHORE: HANDLE = 0 as HANDLE;
const MAX_SEM_COUNT: i32 = 255;
use windows_sys::Win32::System::Threading::WaitForSingleObject;
use windows_sys::Win32::System::Threading::INFINITE;
}
pub mod monitor;
pub mod publisher;
pub mod util;
@ -158,9 +198,13 @@ where
// Create shim instance
let mut config = opts.unwrap_or_default();
#[cfg(unix)]
// Setup signals
let signals = setup_signals(&config);
#[cfg(windows)]
setup_signals();
if !config.no_sub_reaper {
reap::set_subreaper()?;
}
@ -188,7 +232,10 @@ where
Ok(())
}
"delete" => {
#[cfg(unix)]
std::thread::spawn(move || handle_signals(signals));
#[cfg(windows)]
std::thread::spawn(handle_signals);
let response = shim.delete_shim()?;
let stdout = std::io::stdout();
let mut locked = stdout.lock();
@ -197,18 +244,28 @@ where
Ok(())
}
_ => {
#[cfg(windows)]
util::setup_debugger_event();
if !config.no_setup_logger {
#[cfg(unix)]
logger::init(flags.debug)?;
#[cfg(windows)]
logger::init(flags.debug, &flags.namespace, &flags.id)?;
}
let publisher = publisher::RemotePublisher::new(&ttrpc_address)?;
let task = shim.create_task_service(publisher);
let task_service = create_task(Arc::new(Box::new(task)));
let mut server = Server::new().register_service(task_service);
server = server.add_listener(SOCKET_FD)?;
let mut server = create_server(flags)?;
server = server.register_service(task_service);
server.start()?;
#[cfg(windows)]
signal_server_started();
info!("Shim successfully started, waiting for exit signal...");
#[cfg(unix)]
std::thread::spawn(move || handle_signals(signals));
shim.wait();
@ -224,6 +281,22 @@ where
}
}
#[cfg(windows)]
fn create_server(flags: args::Flags) -> Result<Server> {
let mut server = Server::new();
let address = socket_address(&flags.address, &flags.namespace, &flags.id);
server = server.bind(address.as_str())?;
Ok(server)
}
#[cfg(unix)]
fn create_server(_flags: args::Flags) -> Result<Server> {
let mut server = Server::new();
server = server.add_listener(SOCKET_FD)?;
Ok(server)
}
#[cfg(unix)]
fn setup_signals(config: &Config) -> Signals {
let signals = Signals::new([SIGTERM, SIGINT, SIGPIPE]).expect("new signal failed");
if !config.no_reaper {
@ -232,6 +305,30 @@ fn setup_signals(config: &Config) -> Signals {
signals
}
#[cfg(windows)]
fn setup_signals() {
unsafe {
SEMAPHORE = CreateSemaphoreA(ptr::null_mut(), 0, MAX_SEM_COUNT, ptr::null());
if SEMAPHORE == 0 {
panic!("Failed to create semaphore: {}", io::Error::last_os_error());
}
if SetConsoleCtrlHandler(Some(signal_handler), 1) == 0 {
let e = io::Error::last_os_error();
CloseHandle(SEMAPHORE);
SEMAPHORE = 0 as HANDLE;
panic!("Failed to set console handler: {}", e);
}
}
}
#[cfg(windows)]
unsafe extern "system" fn signal_handler(_: u32) -> i32 {
ReleaseSemaphore(SEMAPHORE, 1, ptr::null_mut());
1
}
#[cfg(unix)]
fn handle_signals(mut signals: Signals) {
loop {
for sig in signals.wait() {
@ -274,6 +371,16 @@ fn handle_signals(mut signals: Signals) {
}
}
#[cfg(windows)]
// must start on thread as waitforSingleObject puts the current thread to sleep
fn handle_signals() {
unsafe {
WaitForSingleObject(SEMAPHORE, INFINITE);
//Windows doesn't have similiar signal like SIGCHLD
// We could implement something if required but for now
}
}
fn wait_socket_working(address: &str, interval_in_ms: u64, count: u32) -> Result<()> {
for _i in 0..count {
match Client::connect(address) {
@ -292,6 +399,7 @@ fn remove_socket_silently(address: &str) {
remove_socket(address).unwrap_or_else(|e| warn!("failed to remove file {} {:?}", address, e))
}
#[cfg(unix)]
fn remove_socket(address: &str) -> Result<()> {
let path = parse_sockaddr(address);
if let Ok(md) = Path::new(path).metadata() {
@ -302,6 +410,26 @@ fn remove_socket(address: &str) -> Result<()> {
Ok(())
}
#[cfg(windows)]
fn remove_socket(address: &str) -> Result<()> {
use std::{
fs::OpenOptions,
os::windows::prelude::{AsRawHandle, OpenOptionsExt},
};
use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_OVERLAPPED;
let mut opts = OpenOptions::new();
opts.read(true)
.write(true)
.custom_flags(FILE_FLAG_OVERLAPPED);
if let Ok(f) = opts.open(address) {
info!("attempting to remove existing named pipe: {}", address);
unsafe { CloseHandle(f.as_raw_handle() as isize) };
}
Ok(())
}
/// Spawn is a helper func to launch shim process.
/// Typically this expected to be called from `StartShim`.
pub fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Result<(u32, String)> {
@ -311,6 +439,7 @@ pub fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Result
// Create socket and prepare listener.
// We'll use `add_listener` when creating TTRPC server.
#[allow(clippy::let_unit_value)]
let listener = match start_listener(&address) {
Ok(l) => l,
Err(e) => {
@ -320,6 +449,8 @@ pub fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Result
err: e,
});
};
// If the Address is already in use then make sure it is up and running and return the address
// This allows for running a single shim per container scenarios
if let Ok(()) = wait_socket_working(&address, 5, 200) {
write_address(&address)?;
return Ok((0, address));
@ -329,6 +460,18 @@ pub fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Result
}
};
run_shim(cmd, cwd, opts, vars, listener, address)
}
#[cfg(unix)]
fn run_shim(
cmd: std::path::PathBuf,
cwd: std::path::PathBuf,
opts: StartOpts,
vars: Vec<(&str, &str)>,
listener: UnixListener,
address: String,
) -> Result<(u32, String)> {
let mut command = Command::new(cmd);
command
@ -363,6 +506,97 @@ pub fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Result
})
}
// Activation pattern for Windows comes from the hcsshim: https://github.com/microsoft/hcsshim/blob/v0.10.0-rc.7/cmd/containerd-shim-runhcs-v1/serve.go#L57-L70
// another way to do it would to create named pipe and pass it to the child process through handle inheritence but that would require duplicating
// the logic in Rust's 'command' for process creation. There is an issue in Rust to make it simplier to specify handle inheritence and this could
// be revisited once https://github.com/rust-lang/rust/issues/54760 is implemented.
#[cfg(windows)]
fn run_shim(
cmd: std::path::PathBuf,
cwd: std::path::PathBuf,
opts: StartOpts,
vars: Vec<(&str, &str)>,
_listener: (),
address: String,
) -> Result<(u32, String)> {
let mut command = Command::new(cmd);
let (mut reader, writer) = os_pipe::pipe().map_err(io_error!(e, "create pipe"))?;
let stdio_writer = writer.try_clone().unwrap();
command
.current_dir(cwd)
.stdout(stdio_writer)
.stdin(Stdio::null())
.stderr(Stdio::null())
.args([
"-namespace",
&opts.namespace,
"-id",
&opts.id,
"-address",
&opts.address,
]);
if opts.debug {
command.arg("-debug");
}
command.envs(vars);
disable_handle_inheritance();
command
.spawn()
.map_err(io_error!(e, "spawn shim"))
.map(|child| {
// IMPORTANT: we must drop the writer and command to close up handles before we copy the reader to stderr
// AND the shim Start method must NOT write to stdout/stderr
drop(writer);
drop(command);
io::copy(&mut reader, &mut io::stderr()).unwrap();
(child.id(), address)
})
}
// On Windows Rust currently sets the `HANDLE_FLAG_INHERIT` flag to true when using Command::spawn.
// When a child process is spawned by another process (containerd) the child process inherits the parent's stdin, stdout, and stderr handles.
// Due to the HANDLE_FLAG_INHERIT flag being set to true this will cause containerd to hand until the child process closes the handles.
// As a workaround we can Disables inheritance on the io pipe handles.
// This workaround comes from https://github.com/rust-lang/rust/issues/54760#issuecomment-1045940560
#[cfg(windows)]
fn disable_handle_inheritance() {
use windows_sys::Win32::{
Foundation::{SetHandleInformation, HANDLE_FLAG_INHERIT},
System::Console::{GetStdHandle, STD_ERROR_HANDLE, STD_INPUT_HANDLE, STD_OUTPUT_HANDLE},
};
unsafe {
let std_err = GetStdHandle(STD_ERROR_HANDLE);
let std_in = GetStdHandle(STD_INPUT_HANDLE);
let std_out = GetStdHandle(STD_OUTPUT_HANDLE);
for handle in [std_err, std_in, std_out] {
SetHandleInformation(handle, HANDLE_FLAG_INHERIT, 0);
//info!(" handle for... {:?}", handle);
//CloseHandle(handle);
}
}
}
// This closes the stdout handle which was mapped to the stderr on the first invocation of the shim.
// This releases first process which will give containerd the address of the namedpipe.
#[cfg(windows)]
fn signal_server_started() {
use windows_sys::Win32::System::Console::{GetStdHandle, STD_OUTPUT_HANDLE};
unsafe {
let std_out = GetStdHandle(STD_OUTPUT_HANDLE);
for handle in [std_out] {
CloseHandle(handle);
}
}
}
#[cfg(test)]
mod tests {
use std::thread;

View File

@ -25,9 +25,11 @@ use client::{
};
use containerd_shim_protos as client;
#[cfg(unix)]
use crate::util::connect;
use crate::{
error::Result,
util::{connect, convert_to_any, timestamp},
util::{convert_to_any, timestamp},
};
/// Remote publisher connects to containerd's TTRPC endpoint to publish events from shim.
@ -47,10 +49,19 @@ impl RemotePublisher {
})
}
#[cfg(unix)]
fn connect(address: impl AsRef<str>) -> Result<Client> {
let fd = connect(address)?;
// Client::new() takes ownership of the RawFd.
Ok(Client::new(fd))
Ok(Client::new_from_fd(fd)?)
}
#[cfg(windows)]
fn connect(address: impl AsRef<str>) -> Result<Client> {
match Client::connect(address.as_ref()) {
Ok(client) => Ok(client),
Err(e) => Err(e.into()),
}
}
/// Publish a new event.
@ -90,10 +101,7 @@ impl Events for RemotePublisher {
#[cfg(test)]
mod tests {
use std::{
os::unix::{io::AsRawFd, net::UnixListener},
sync::{Arc, Barrier},
};
use std::sync::{Arc, Barrier};
use client::{
api::{Empty, ForwardRequest},
@ -102,6 +110,8 @@ mod tests {
use ttrpc::Server;
use super::*;
#[cfg(windows)]
use crate::synchronous::wait_socket_working;
struct FakeServer {}
@ -115,8 +125,12 @@ mod tests {
#[test]
fn test_connect() {
#[cfg(unix)]
let tmpdir = tempfile::tempdir().unwrap();
#[cfg(unix)]
let path = format!("{}/socket", tmpdir.as_ref().to_str().unwrap());
#[cfg(windows)]
let path = "\\\\.\\pipe\\test-pipe".to_string();
let path1 = path.clone();
assert!(RemotePublisher::connect("a".repeat(16384)).is_err());
@ -125,17 +139,14 @@ mod tests {
let barrier = Arc::new(Barrier::new(2));
let barrier2 = barrier.clone();
let thread = std::thread::spawn(move || {
let listener = UnixListener::bind(&path1).unwrap();
listener.set_nonblocking(true).unwrap();
let t = Arc::new(Box::new(FakeServer {}) as Box<dyn Events + Send + Sync>);
let service = client::create_events(t);
let mut server = Server::new()
.add_listener(listener.as_raw_fd())
.unwrap()
.register_service(service);
std::mem::forget(listener);
let mut server = create_server(&path1);
server.start().unwrap();
#[cfg(windows)]
// make sure pipe is ready on windows
wait_socket_working(&path1, 5, 5).unwrap();
barrier2.wait();
barrier2.wait();
@ -153,4 +164,30 @@ mod tests {
thread.join().unwrap();
}
#[cfg(unix)]
fn create_server(server_address: &String) -> Server {
use std::os::unix::{io::AsRawFd, net::UnixListener};
let listener = UnixListener::bind(server_address).unwrap();
listener.set_nonblocking(true).unwrap();
let t = Arc::new(Box::new(FakeServer {}) as Box<dyn Events + Send + Sync>);
let service = client::create_events(t);
let server = Server::new()
.add_listener(listener.as_raw_fd())
.unwrap()
.register_service(service);
std::mem::forget(listener);
server
}
#[cfg(windows)]
fn create_server(server_address: &String) -> Server {
let t = Arc::new(Box::new(FakeServer {}) as Box<dyn Events + Send + Sync>);
let service = client::create_events(t);
Server::new()
.bind(server_address)
.unwrap()
.register_service(service)
}
}

View File

@ -21,8 +21,10 @@ use std::{
};
use containerd_shim_protos::shim::oci::Options;
#[cfg(unix)]
use libc::mode_t;
use log::warn;
#[cfg(unix)]
use nix::sys::stat::Mode;
use oci_spec::runtime::Spec;
@ -117,6 +119,7 @@ pub fn read_spec_from_file(bundle: &str) -> crate::Result<Spec> {
Spec::load(path).map_err(other_error!(e, "read spec file"))
}
#[cfg(unix)]
pub fn mkdir(path: impl AsRef<Path>, mode: mode_t) -> crate::Result<()> {
let path_buf = path.as_ref().to_path_buf();
if !path_buf.as_path().exists() {
@ -143,3 +146,57 @@ impl Drop for HelperRemoveFile {
.unwrap_or_else(|e| warn!("remove dir {} error: {}", &self.path, e));
}
}
#[cfg(target_os = "windows")]
// helper to configure pause thread until signaled. Useful in attaching a debugger
// https://github.com/microsoft/hcsshim/blob/v0.10.0-rc.7/cmd/containerd-shim-runhcs-v1/serve.go#L313-L315
// use with https://github.com/moby/docker-signal
pub(crate) fn setup_debugger_event() {
use std::{env, io, process};
use log::{debug, error};
use windows_sys::Win32::System::Threading::{WaitForSingleObject, INFINITE};
let debugger = env::var("SHIM_DEBUGGER").unwrap_or_else(|_| "".to_string());
if debugger.is_empty() {
return;
}
let event_name = format!("Global\\debugger-{}", process::id());
debug!("Halting until signalled: {}", event_name);
let e = match create_event(event_name) {
Ok(e) => e,
Err(e) => {
error!("failed to create event for debugger: {}", e);
return;
}
};
match unsafe { WaitForSingleObject(e, INFINITE) } {
0 => {}
_ => {
error!(
"failed to wait for debugger event: {}",
io::Error::last_os_error()
);
return;
}
}
debug!("signal received, continuing");
}
#[cfg(target_os = "windows")]
fn create_event(name: String) -> crate::Result<isize> {
use std::{ffi::OsStr, io, os::windows::prelude::OsStrExt};
use windows_sys::Win32::System::Threading::CreateEventW;
let name = OsStr::new(name.as_str())
.encode_wide()
.chain(Some(0)) // add NULL termination
.collect::<Vec<_>>();
let result = unsafe { CreateEventW(std::ptr::null_mut(), 0, 0, name.as_ptr()) };
match result {
0 => Err(Error::Other(io::Error::last_os_error().to_string())),
_ => Ok(result),
}
}

View File

@ -0,0 +1,20 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#[cfg(windows)]
pub(crate) mod windows;
#[cfg(windows)]
pub use crate::sys::windows::NamedPipeLogger;

View File

@ -0,0 +1,17 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
pub(crate) mod named_pipe_logger;
pub use named_pipe_logger::NamedPipeLogger;

View File

@ -0,0 +1,236 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
use std::{
io::{self, Write},
sync::{Arc, Mutex},
thread,
};
use log::{Metadata, Record};
use mio::{windows::NamedPipe, Events, Interest, Poll, Token};
pub struct NamedPipeLogger {
current_connection: Arc<Mutex<NamedPipe>>,
}
impl NamedPipeLogger {
pub fn new(namespace: &String, id: &String) -> Result<NamedPipeLogger, io::Error> {
let pipe_name = format!("\\\\.\\pipe\\containerd-shim-{}-{}-log", namespace, id);
let mut pipe_server = NamedPipe::new(pipe_name).unwrap();
let mut poll = Poll::new().unwrap();
poll.registry()
.register(
&mut pipe_server,
Token(0),
Interest::READABLE | Interest::WRITABLE,
)
.unwrap();
let current_connection = Arc::new(Mutex::new(pipe_server));
let server_connection = current_connection.clone();
let logger = NamedPipeLogger { current_connection };
thread::spawn(move || {
let mut events = Events::with_capacity(128);
loop {
poll.poll(&mut events, None).unwrap();
for event in events.iter() {
if event.is_writable() {
match server_connection.lock().unwrap().connect() {
Ok(()) => {}
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {
// this would block just keep processing
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
// this would block just keep processing
}
Err(e) => {
panic!("Error connecting to client: {}", e);
}
};
}
if event.is_readable() {
server_connection.lock().unwrap().disconnect().unwrap();
}
}
}
});
Ok(logger)
}
}
impl log::Log for NamedPipeLogger {
fn enabled(&self, metadata: &Metadata) -> bool {
metadata.level() <= log::max_level()
}
fn log(&self, record: &Record) {
if self.enabled(record.metadata()) {
let message = format!("[{}] {}\n", record.level(), record.args());
match self
.current_connection
.lock()
.unwrap()
.write(message.as_bytes())
{
Ok(_) => {}
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {
// this would block just keep processing
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
// this would block just keep processing
}
Err(e) if e.raw_os_error() == Some(536) => {
// no client connected
}
Err(e) if e.raw_os_error() == Some(232) => {
// client was connected but is in process of shutting down
}
Err(e) => {
panic!("Error writing to client: {}", e)
}
}
}
}
fn flush(&self) {
_ = self.current_connection.lock().unwrap().flush();
}
}
#[cfg(test)]
mod tests {
use std::{
fs::OpenOptions,
io::Read,
os::windows::{
fs::OpenOptionsExt,
io::{FromRawHandle, IntoRawHandle},
prelude::AsRawHandle,
},
time::Duration,
};
use log::{Log, Record};
use mio::{windows::NamedPipe, Events, Interest, Poll, Token};
use windows_sys::Win32::{
Foundation::ERROR_PIPE_NOT_CONNECTED, Storage::FileSystem::FILE_FLAG_OVERLAPPED,
};
use super::*;
#[test]
fn test_namedpipe_log_can_write_before_client_connected() {
let ns = "test".to_string();
let id = "notconnected".to_string();
let logger = NamedPipeLogger::new(&ns, &id).unwrap();
// test can write before a reader is connected (should succeed but the messages will be dropped)
log::set_max_level(log::LevelFilter::Info);
let record = Record::builder()
.level(log::Level::Info)
.line(Some(1))
.file(Some("sample file"))
.args(format_args!("hello"))
.build();
logger.log(&record);
logger.flush();
}
#[test]
fn test_namedpipe_log() {
use std::fs::File;
let ns = "test".to_string();
let id = "clients".to_string();
let pipe_name = format!("\\\\.\\pipe\\containerd-shim-{}-{}-log", ns, id);
let logger = NamedPipeLogger::new(&ns, &id).unwrap();
let mut client = create_client(pipe_name.as_str());
log::set_max_level(log::LevelFilter::Info);
let record = Record::builder()
.level(log::Level::Info)
.line(Some(1))
.file(Some("sample file"))
.args(format_args!("hello"))
.build();
logger.log(&record);
logger.flush();
let buf = read_message(&mut client);
assert_eq!("[INFO] hello", std::str::from_utf8(&buf).unwrap());
// test that we can reconnect after a reader disconnects
// we need to get the raw handle and drop that as well to force full disconnect
// and give a few milliseconds for the disconnect to happen
println!("dropping client");
let handle = client.as_raw_handle();
drop(client);
let f = unsafe { File::from_raw_handle(handle) };
drop(f);
std::thread::sleep(Duration::from_millis(100));
let mut client2 = create_client(pipe_name.as_str());
logger.log(&record);
logger.flush();
read_message(&mut client2);
}
fn read_message(client: &mut NamedPipe) -> [u8; 12] {
let mut poll = Poll::new().unwrap();
poll.registry()
.register(client, Token(1), Interest::READABLE)
.unwrap();
let mut events = Events::with_capacity(128);
let mut buf = [0; 12];
loop {
poll.poll(&mut events, Some(Duration::from_millis(10)))
.unwrap();
match client.read(&mut buf) {
Ok(0) => {
panic!("Read no bytes from pipe")
}
Ok(_) => {
break;
}
Err(e) if e.raw_os_error() == Some(ERROR_PIPE_NOT_CONNECTED as i32) => {
panic!("not connected to the pipe");
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => panic!("Error reading from pipe: {}", e),
}
}
buf
}
fn create_client(pipe_name: &str) -> mio::windows::NamedPipe {
let mut opts = OpenOptions::new();
opts.read(true)
.write(true)
.custom_flags(FILE_FLAG_OVERLAPPED);
let file = opts.open(pipe_name).unwrap();
unsafe { NamedPipe::from_raw_handle(file.into_raw_handle()) }
}
}

View File

@ -14,10 +14,9 @@
limitations under the License.
*/
use std::{
os::unix::io::RawFd,
time::{SystemTime, UNIX_EPOCH},
};
#[cfg(unix)]
use std::os::unix::io::RawFd;
use std::time::{SystemTime, UNIX_EPOCH};
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
@ -100,6 +99,7 @@ impl From<JsonOptions> for Options {
}
}
#[cfg(unix)]
pub fn connect(address: impl AsRef<str>) -> Result<RawFd> {
use nix::{sys::socket::*, unistd::close};