diff --git a/crates/shim/Cargo.toml b/crates/shim/Cargo.toml index d1b8954..62cbad7 100644 --- a/crates/shim/Cargo.toml +++ b/crates/shim/Cargo.toml @@ -53,6 +53,7 @@ serde.workspace = true serde_json.workspace = true thiserror.workspace = true time.workspace = true +tracing = "0.1" # Async dependencies async-trait = { workspace = true, optional = true } diff --git a/crates/shim/src/args.rs b/crates/shim/src/args.rs index 325f479..8f0ff77 100644 --- a/crates/shim/src/args.rs +++ b/crates/shim/src/args.rs @@ -16,6 +16,8 @@ use std::ffi::OsStr; +use tracing::{instrument, Span}; + use crate::error::{Error, Result}; /// Flags to be passed from containerd daemon to a shim binary. @@ -44,6 +46,7 @@ pub struct Flags { } /// Parses command line arguments passed to the shim. +#[instrument(skip_all, parent = Span::current(), level= "Info")] pub fn parse>(args: &[S]) -> Result { let mut flags = Flags::default(); diff --git a/crates/shim/src/asynchronous/mod.rs b/crates/shim/src/asynchronous/mod.rs index 8189edd..841cc7b 100644 --- a/crates/shim/src/asynchronous/mod.rs +++ b/crates/shim/src/asynchronous/mod.rs @@ -48,6 +48,7 @@ use nix::{ }; use signal_hook_tokio::Signals; use tokio::{io::AsyncWriteExt, sync::Notify}; +use tracing::{instrument, Span}; use crate::{ args, @@ -99,6 +100,7 @@ pub trait Shim { } /// Async Shim entry point that must be invoked from tokio `main`. +#[instrument(parent = Span::current(), level= "Info")] pub async fn run(runtime_id: &str, opts: Option) where T: Shim + Send + Sync + 'static, @@ -109,6 +111,7 @@ where } } +#[instrument(parent = Span::current(), level= "Info")] async fn bootstrap(runtime_id: &str, opts: Option) -> Result<()> where T: Shim + Send + Sync + 'static, @@ -239,6 +242,7 @@ impl ExitSignal { /// Spawn is a helper func to launch shim process asynchronously. /// Typically this expected to be called from `StartShim`. +#[instrument(parent = Span::current(), level= "Info")] pub async fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Result { let cmd = env::current_exe().map_err(io_error!(e, ""))?; let cwd = env::current_dir().map_err(io_error!(e, ""))?; @@ -299,6 +303,7 @@ pub async fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Ok(address) } +#[instrument(skip_all, parent = Span::current(), level= "Info")] fn setup_signals_tokio(config: &Config) -> Signals { if config.no_reaper { Signals::new([SIGTERM, SIGINT, SIGPIPE]).expect("new signal failed") @@ -307,6 +312,7 @@ fn setup_signals_tokio(config: &Config) -> Signals { } } +#[instrument(skip_all, parent = Span::current(), level= "Info")] async fn handle_signals(signals: Signals) { let mut signals = signals.fuse(); while let Some(sig) = signals.next().await { @@ -360,12 +366,14 @@ async fn handle_signals(signals: Signals) { } } +#[instrument(parent = Span::current(), level= "Info")] async fn remove_socket_silently(address: &str) { remove_socket(address) .await .unwrap_or_else(|e| warn!("failed to remove socket: {}", e)) } +#[instrument(parent = Span::current(), level= "Info")] async fn remove_socket(address: &str) -> Result<()> { let path = parse_sockaddr(address); if let Ok(md) = Path::new(path).metadata() { @@ -380,6 +388,7 @@ async fn remove_socket(address: &str) -> Result<()> { Ok(()) } +#[instrument(skip_all, parent = Span::current(), level= "Info")] async fn start_listener(address: &str) -> Result { let addr = address.to_string(); asyncify(move || -> Result { @@ -391,6 +400,7 @@ async fn start_listener(address: &str) -> Result { .await } +#[instrument(parent = Span::current(), level= "Info")] async fn wait_socket_working(address: &str, interval_in_ms: u64, count: u32) -> Result<()> { for _i in 0..count { match Client::connect(address) { diff --git a/crates/shim/src/cgroup.rs b/crates/shim/src/cgroup.rs index 5e4222c..d7ba3c6 100644 --- a/crates/shim/src/cgroup.rs +++ b/crates/shim/src/cgroup.rs @@ -31,12 +31,14 @@ use containerd_shim_protos::{ shim::oci::Options, }; use oci_spec::runtime::LinuxResources; +use tracing::{instrument, Span}; use crate::error::{Error, Result}; // OOM_SCORE_ADJ_MAX is from https://github.com/torvalds/linux/blob/master/include/uapi/linux/oom.h#L10 const OOM_SCORE_ADJ_MAX: i64 = 1000; +#[instrument(parent = Span::current(), level= "Info")] pub fn set_cgroup_and_oom_score(pid: u32) -> Result<()> { if pid == 0 { return Ok(()); @@ -62,6 +64,7 @@ pub fn set_cgroup_and_oom_score(pid: u32) -> Result<()> { } /// Add a process to the given relative cgroup path +#[instrument(parent = Span::current(), level= "Info")] pub fn add_task_to_cgroup(path: &str, pid: u32) -> Result<()> { let h = hierarchies::auto(); // use relative path here, need to trim prefix '/' @@ -74,6 +77,7 @@ pub fn add_task_to_cgroup(path: &str, pid: u32) -> Result<()> { /// Sets the OOM score for the process to the parents OOM score + 1 /// to ensure that they parent has a lower score than the shim +#[instrument(parent = Span::current(), level= "Info")] pub fn adjust_oom_score(pid: u32) -> Result<()> { let score = read_process_oom_score(std::os::unix::process::parent_id())?; if score < OOM_SCORE_ADJ_MAX { @@ -82,6 +86,7 @@ pub fn adjust_oom_score(pid: u32) -> Result<()> { Ok(()) } +#[instrument(parent = Span::current(), level= "Info")] fn read_process_oom_score(pid: u32) -> Result { let content = fs::read_to_string(format!("/proc/{}/oom_score_adj", pid)) .map_err(io_error!(e, "read oom score"))?; @@ -92,12 +97,14 @@ fn read_process_oom_score(pid: u32) -> Result { Ok(score) } +#[instrument(parent = Span::current(), level= "Info")] fn write_process_oom_score(pid: u32, score: i64) -> Result<()> { fs::write(format!("/proc/{}/oom_score_adj", pid), score.to_string()) .map_err(io_error!(e, "write oom score")) } /// Collect process cgroup stats, return only necessary parts of it +#[instrument(parent = Span::current(), level= "Info")] pub fn collect_metrics(pid: u32) -> Result { let mut metrics = Metrics::new(); @@ -179,6 +186,7 @@ pub fn collect_metrics(pid: u32) -> Result { } // get_cgroup will return either cgroup v1 or v2 depending on system configuration +#[instrument(parent = Span::current(), level= "Info")] fn get_cgroup(pid: u32) -> Result { let hierarchies = hierarchies::auto(); let cgroup = if hierarchies.v2() { @@ -194,6 +202,7 @@ fn get_cgroup(pid: u32) -> Result { } /// Get the cgroups v2 path given a PID +#[instrument(parent = Span::current(), level= "Info")] pub fn get_cgroups_v2_path_by_pid(pid: u32) -> Result { // todo: should upstream to cgroups-rs let path = format!("/proc/{}/cgroup", pid); @@ -207,6 +216,7 @@ pub fn get_cgroups_v2_path_by_pid(pid: u32) -> Result { } // https://github.com/opencontainers/runc/blob/1950892f69597aa844cbf000fbdf77610dda3a44/libcontainer/cgroups/fs2/defaultpath.go#L83 +#[instrument(parent = Span::current(), level= "Info")] fn parse_cgroups_v2_path(content: &str) -> Result { // the entry for cgroup v2 is always in the format like `0::$PATH` // where 0 is the hierarchy ID, the controller name is omitted in cgroup v2 @@ -222,6 +232,7 @@ fn parse_cgroups_v2_path(content: &str) -> Result { } /// Update process cgroup limits +#[instrument(parent = Span::current(), level= "Info")] pub fn update_resources(pid: u32, resources: &LinuxResources) -> Result<()> { // get container main process cgroup let cgroup = get_cgroup(pid)?; diff --git a/crates/shim/src/lib.rs b/crates/shim/src/lib.rs index 2575ff6..f2b34fd 100644 --- a/crates/shim/src/lib.rs +++ b/crates/shim/src/lib.rs @@ -30,6 +30,7 @@ pub use protos::{ shim::shim::DeleteResponse, ttrpc::{context::Context, Result as TtrpcResult}, }; +use tracing::{instrument, Span}; #[cfg(unix)] ioctl_write_ptr_bad!(ioctl_set_winsz, libc::TIOCSWINSZ, libc::winsize); @@ -167,6 +168,7 @@ pub const SOCKET_ROOT: &str = "/var/run/containerd"; pub const SOCKET_ROOT: &str = r"\\.\pipe\containerd-containerd"; /// Make socket path from containerd socket path, namespace and id. +#[instrument(parent = Span::current(), level= "Info")] pub fn socket_address(socket_path: &str, namespace: &str, id: &str) -> String { let path = PathBuf::from(socket_path) .join(namespace) diff --git a/crates/shim/src/synchronous/mod.rs b/crates/shim/src/synchronous/mod.rs index 8acb556..d55ee8a 100644 --- a/crates/shim/src/synchronous/mod.rs +++ b/crates/shim/src/synchronous/mod.rs @@ -58,6 +58,7 @@ use std::{ }; pub use log::{debug, error, info, warn}; +use tracing::{instrument, Span}; use util::{read_address, write_address}; use crate::{ @@ -182,6 +183,7 @@ pub trait Shim { } /// Shim entry point that must be invoked from `main`. +#[instrument(parent = Span::current(), level= "Info")] pub fn run(runtime_id: &str, opts: Option) where T: Shim + Send + Sync + 'static, @@ -192,6 +194,7 @@ where } } +#[instrument(parent = Span::current(), level= "Info")] fn bootstrap(runtime_id: &str, opts: Option) -> Result<()> where T: Shim + Send + Sync + 'static, @@ -289,6 +292,7 @@ where } } +#[instrument(skip_all, parent = Span::current(), level= "Info")] fn create_server(_flags: args::Flags) -> Result { let mut server = Server::new(); @@ -306,6 +310,7 @@ fn create_server(_flags: args::Flags) -> Result { Ok(server) } +#[instrument(skip_all, parent = Span::current(), level= "Info")] fn setup_signals(_config: &Config) -> Option { #[cfg(unix)] { @@ -341,6 +346,7 @@ unsafe extern "system" fn signal_handler(_: u32) -> i32 { 1 } +#[instrument(skip_all, parent = Span::current(), level= "Info")] fn handle_signals(mut _signals: Option) { #[cfg(unix)] { @@ -402,6 +408,7 @@ fn handle_signals(mut _signals: Option) { } } +#[instrument(parent = Span::current(), level= "Info")] fn wait_socket_working(address: &str, interval_in_ms: u64, count: u32) -> Result<()> { for _i in 0..count { match Client::connect(address) { @@ -416,10 +423,12 @@ fn wait_socket_working(address: &str, interval_in_ms: u64, count: u32) -> Result Err(other!("time out waiting for socket {}", address)) } +#[instrument(parent = Span::current(), level= "Info")] fn remove_socket_silently(address: &str) { remove_socket(address).unwrap_or_else(|e| warn!("failed to remove file {} {:?}", address, e)) } +#[instrument(parent = Span::current(), level= "Info")] fn remove_socket(address: &str) -> Result<()> { #[cfg(unix)] { @@ -448,6 +457,7 @@ fn remove_socket(address: &str) -> Result<()> { /// Spawn is a helper func to launch shim process. /// Typically this expected to be called from `StartShim`. +#[instrument(parent = Span::current(), level= "Info")] pub fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Result<(u32, String)> { let cmd = env::current_exe().map_err(io_error!(e, ""))?; let cwd = env::current_dir().map_err(io_error!(e, ""))?;