add tracing macros
Signed-off-by: jiaxiao zhou <jiazho@microsoft.com>
This commit is contained in:
parent
e4558356cc
commit
fee0c653af
|
|
@ -53,6 +53,7 @@ serde.workspace = true
|
||||||
serde_json.workspace = true
|
serde_json.workspace = true
|
||||||
thiserror.workspace = true
|
thiserror.workspace = true
|
||||||
time.workspace = true
|
time.workspace = true
|
||||||
|
tracing = "0.1"
|
||||||
|
|
||||||
# Async dependencies
|
# Async dependencies
|
||||||
async-trait = { workspace = true, optional = true }
|
async-trait = { workspace = true, optional = true }
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,8 @@
|
||||||
|
|
||||||
use std::ffi::OsStr;
|
use std::ffi::OsStr;
|
||||||
|
|
||||||
|
use tracing::{instrument, Span};
|
||||||
|
|
||||||
use crate::error::{Error, Result};
|
use crate::error::{Error, Result};
|
||||||
|
|
||||||
/// Flags to be passed from containerd daemon to a shim binary.
|
/// Flags to be passed from containerd daemon to a shim binary.
|
||||||
|
|
@ -44,6 +46,7 @@ pub struct Flags {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parses command line arguments passed to the shim.
|
/// Parses command line arguments passed to the shim.
|
||||||
|
#[instrument(skip_all, parent = Span::current(), level= "Info")]
|
||||||
pub fn parse<S: AsRef<OsStr>>(args: &[S]) -> Result<Flags> {
|
pub fn parse<S: AsRef<OsStr>>(args: &[S]) -> Result<Flags> {
|
||||||
let mut flags = Flags::default();
|
let mut flags = Flags::default();
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -48,6 +48,7 @@ use nix::{
|
||||||
};
|
};
|
||||||
use signal_hook_tokio::Signals;
|
use signal_hook_tokio::Signals;
|
||||||
use tokio::{io::AsyncWriteExt, sync::Notify};
|
use tokio::{io::AsyncWriteExt, sync::Notify};
|
||||||
|
use tracing::{instrument, Span};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
args,
|
args,
|
||||||
|
|
@ -99,6 +100,7 @@ pub trait Shim {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Async Shim entry point that must be invoked from tokio `main`.
|
/// Async Shim entry point that must be invoked from tokio `main`.
|
||||||
|
#[instrument(parent = Span::current(), level= "Info")]
|
||||||
pub async fn run<T>(runtime_id: &str, opts: Option<Config>)
|
pub async fn run<T>(runtime_id: &str, opts: Option<Config>)
|
||||||
where
|
where
|
||||||
T: Shim + Send + Sync + 'static,
|
T: Shim + Send + Sync + 'static,
|
||||||
|
|
@ -109,6 +111,7 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(parent = Span::current(), level= "Info")]
|
||||||
async fn bootstrap<T>(runtime_id: &str, opts: Option<Config>) -> Result<()>
|
async fn bootstrap<T>(runtime_id: &str, opts: Option<Config>) -> Result<()>
|
||||||
where
|
where
|
||||||
T: Shim + Send + Sync + 'static,
|
T: Shim + Send + Sync + 'static,
|
||||||
|
|
@ -239,6 +242,7 @@ impl ExitSignal {
|
||||||
|
|
||||||
/// Spawn is a helper func to launch shim process asynchronously.
|
/// Spawn is a helper func to launch shim process asynchronously.
|
||||||
/// Typically this expected to be called from `StartShim`.
|
/// Typically this expected to be called from `StartShim`.
|
||||||
|
#[instrument(parent = Span::current(), level= "Info")]
|
||||||
pub async fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Result<String> {
|
pub async fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Result<String> {
|
||||||
let cmd = env::current_exe().map_err(io_error!(e, ""))?;
|
let cmd = env::current_exe().map_err(io_error!(e, ""))?;
|
||||||
let cwd = env::current_dir().map_err(io_error!(e, ""))?;
|
let cwd = env::current_dir().map_err(io_error!(e, ""))?;
|
||||||
|
|
@ -299,6 +303,7 @@ pub async fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) ->
|
||||||
Ok(address)
|
Ok(address)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(skip_all, parent = Span::current(), level= "Info")]
|
||||||
fn setup_signals_tokio(config: &Config) -> Signals {
|
fn setup_signals_tokio(config: &Config) -> Signals {
|
||||||
if config.no_reaper {
|
if config.no_reaper {
|
||||||
Signals::new([SIGTERM, SIGINT, SIGPIPE]).expect("new signal failed")
|
Signals::new([SIGTERM, SIGINT, SIGPIPE]).expect("new signal failed")
|
||||||
|
|
@ -307,6 +312,7 @@ fn setup_signals_tokio(config: &Config) -> Signals {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(skip_all, parent = Span::current(), level= "Info")]
|
||||||
async fn handle_signals(signals: Signals) {
|
async fn handle_signals(signals: Signals) {
|
||||||
let mut signals = signals.fuse();
|
let mut signals = signals.fuse();
|
||||||
while let Some(sig) = signals.next().await {
|
while let Some(sig) = signals.next().await {
|
||||||
|
|
@ -360,12 +366,14 @@ async fn handle_signals(signals: Signals) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(parent = Span::current(), level= "Info")]
|
||||||
async fn remove_socket_silently(address: &str) {
|
async fn remove_socket_silently(address: &str) {
|
||||||
remove_socket(address)
|
remove_socket(address)
|
||||||
.await
|
.await
|
||||||
.unwrap_or_else(|e| warn!("failed to remove socket: {}", e))
|
.unwrap_or_else(|e| warn!("failed to remove socket: {}", e))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(parent = Span::current(), level= "Info")]
|
||||||
async fn remove_socket(address: &str) -> Result<()> {
|
async fn remove_socket(address: &str) -> Result<()> {
|
||||||
let path = parse_sockaddr(address);
|
let path = parse_sockaddr(address);
|
||||||
if let Ok(md) = Path::new(path).metadata() {
|
if let Ok(md) = Path::new(path).metadata() {
|
||||||
|
|
@ -380,6 +388,7 @@ async fn remove_socket(address: &str) -> Result<()> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(skip_all, parent = Span::current(), level= "Info")]
|
||||||
async fn start_listener(address: &str) -> Result<UnixListener> {
|
async fn start_listener(address: &str) -> Result<UnixListener> {
|
||||||
let addr = address.to_string();
|
let addr = address.to_string();
|
||||||
asyncify(move || -> Result<UnixListener> {
|
asyncify(move || -> Result<UnixListener> {
|
||||||
|
|
@ -391,6 +400,7 @@ async fn start_listener(address: &str) -> Result<UnixListener> {
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(parent = Span::current(), level= "Info")]
|
||||||
async fn wait_socket_working(address: &str, interval_in_ms: u64, count: u32) -> Result<()> {
|
async fn wait_socket_working(address: &str, interval_in_ms: u64, count: u32) -> Result<()> {
|
||||||
for _i in 0..count {
|
for _i in 0..count {
|
||||||
match Client::connect(address) {
|
match Client::connect(address) {
|
||||||
|
|
|
||||||
|
|
@ -31,12 +31,14 @@ use containerd_shim_protos::{
|
||||||
shim::oci::Options,
|
shim::oci::Options,
|
||||||
};
|
};
|
||||||
use oci_spec::runtime::LinuxResources;
|
use oci_spec::runtime::LinuxResources;
|
||||||
|
use tracing::{instrument, Span};
|
||||||
|
|
||||||
use crate::error::{Error, Result};
|
use crate::error::{Error, Result};
|
||||||
|
|
||||||
// OOM_SCORE_ADJ_MAX is from https://github.com/torvalds/linux/blob/master/include/uapi/linux/oom.h#L10
|
// OOM_SCORE_ADJ_MAX is from https://github.com/torvalds/linux/blob/master/include/uapi/linux/oom.h#L10
|
||||||
const OOM_SCORE_ADJ_MAX: i64 = 1000;
|
const OOM_SCORE_ADJ_MAX: i64 = 1000;
|
||||||
|
|
||||||
|
#[instrument(parent = Span::current(), level= "Info")]
|
||||||
pub fn set_cgroup_and_oom_score(pid: u32) -> Result<()> {
|
pub fn set_cgroup_and_oom_score(pid: u32) -> Result<()> {
|
||||||
if pid == 0 {
|
if pid == 0 {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
|
|
@ -62,6 +64,7 @@ pub fn set_cgroup_and_oom_score(pid: u32) -> Result<()> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add a process to the given relative cgroup path
|
/// Add a process to the given relative cgroup path
|
||||||
|
#[instrument(parent = Span::current(), level= "Info")]
|
||||||
pub fn add_task_to_cgroup(path: &str, pid: u32) -> Result<()> {
|
pub fn add_task_to_cgroup(path: &str, pid: u32) -> Result<()> {
|
||||||
let h = hierarchies::auto();
|
let h = hierarchies::auto();
|
||||||
// use relative path here, need to trim prefix '/'
|
// use relative path here, need to trim prefix '/'
|
||||||
|
|
@ -74,6 +77,7 @@ pub fn add_task_to_cgroup(path: &str, pid: u32) -> Result<()> {
|
||||||
|
|
||||||
/// Sets the OOM score for the process to the parents OOM score + 1
|
/// Sets the OOM score for the process to the parents OOM score + 1
|
||||||
/// to ensure that they parent has a lower score than the shim
|
/// to ensure that they parent has a lower score than the shim
|
||||||
|
#[instrument(parent = Span::current(), level= "Info")]
|
||||||
pub fn adjust_oom_score(pid: u32) -> Result<()> {
|
pub fn adjust_oom_score(pid: u32) -> Result<()> {
|
||||||
let score = read_process_oom_score(std::os::unix::process::parent_id())?;
|
let score = read_process_oom_score(std::os::unix::process::parent_id())?;
|
||||||
if score < OOM_SCORE_ADJ_MAX {
|
if score < OOM_SCORE_ADJ_MAX {
|
||||||
|
|
@ -82,6 +86,7 @@ pub fn adjust_oom_score(pid: u32) -> Result<()> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(parent = Span::current(), level= "Info")]
|
||||||
fn read_process_oom_score(pid: u32) -> Result<i64> {
|
fn read_process_oom_score(pid: u32) -> Result<i64> {
|
||||||
let content = fs::read_to_string(format!("/proc/{}/oom_score_adj", pid))
|
let content = fs::read_to_string(format!("/proc/{}/oom_score_adj", pid))
|
||||||
.map_err(io_error!(e, "read oom score"))?;
|
.map_err(io_error!(e, "read oom score"))?;
|
||||||
|
|
@ -92,12 +97,14 @@ fn read_process_oom_score(pid: u32) -> Result<i64> {
|
||||||
Ok(score)
|
Ok(score)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(parent = Span::current(), level= "Info")]
|
||||||
fn write_process_oom_score(pid: u32, score: i64) -> Result<()> {
|
fn write_process_oom_score(pid: u32, score: i64) -> Result<()> {
|
||||||
fs::write(format!("/proc/{}/oom_score_adj", pid), score.to_string())
|
fs::write(format!("/proc/{}/oom_score_adj", pid), score.to_string())
|
||||||
.map_err(io_error!(e, "write oom score"))
|
.map_err(io_error!(e, "write oom score"))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Collect process cgroup stats, return only necessary parts of it
|
/// Collect process cgroup stats, return only necessary parts of it
|
||||||
|
#[instrument(parent = Span::current(), level= "Info")]
|
||||||
pub fn collect_metrics(pid: u32) -> Result<Metrics> {
|
pub fn collect_metrics(pid: u32) -> Result<Metrics> {
|
||||||
let mut metrics = Metrics::new();
|
let mut metrics = Metrics::new();
|
||||||
|
|
||||||
|
|
@ -179,6 +186,7 @@ pub fn collect_metrics(pid: u32) -> Result<Metrics> {
|
||||||
}
|
}
|
||||||
|
|
||||||
// get_cgroup will return either cgroup v1 or v2 depending on system configuration
|
// get_cgroup will return either cgroup v1 or v2 depending on system configuration
|
||||||
|
#[instrument(parent = Span::current(), level= "Info")]
|
||||||
fn get_cgroup(pid: u32) -> Result<Cgroup> {
|
fn get_cgroup(pid: u32) -> Result<Cgroup> {
|
||||||
let hierarchies = hierarchies::auto();
|
let hierarchies = hierarchies::auto();
|
||||||
let cgroup = if hierarchies.v2() {
|
let cgroup = if hierarchies.v2() {
|
||||||
|
|
@ -194,6 +202,7 @@ fn get_cgroup(pid: u32) -> Result<Cgroup> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the cgroups v2 path given a PID
|
/// Get the cgroups v2 path given a PID
|
||||||
|
#[instrument(parent = Span::current(), level= "Info")]
|
||||||
pub fn get_cgroups_v2_path_by_pid(pid: u32) -> Result<PathBuf> {
|
pub fn get_cgroups_v2_path_by_pid(pid: u32) -> Result<PathBuf> {
|
||||||
// todo: should upstream to cgroups-rs
|
// todo: should upstream to cgroups-rs
|
||||||
let path = format!("/proc/{}/cgroup", pid);
|
let path = format!("/proc/{}/cgroup", pid);
|
||||||
|
|
@ -207,6 +216,7 @@ pub fn get_cgroups_v2_path_by_pid(pid: u32) -> Result<PathBuf> {
|
||||||
}
|
}
|
||||||
|
|
||||||
// https://github.com/opencontainers/runc/blob/1950892f69597aa844cbf000fbdf77610dda3a44/libcontainer/cgroups/fs2/defaultpath.go#L83
|
// https://github.com/opencontainers/runc/blob/1950892f69597aa844cbf000fbdf77610dda3a44/libcontainer/cgroups/fs2/defaultpath.go#L83
|
||||||
|
#[instrument(parent = Span::current(), level= "Info")]
|
||||||
fn parse_cgroups_v2_path(content: &str) -> Result<PathBuf> {
|
fn parse_cgroups_v2_path(content: &str) -> Result<PathBuf> {
|
||||||
// the entry for cgroup v2 is always in the format like `0::$PATH`
|
// the entry for cgroup v2 is always in the format like `0::$PATH`
|
||||||
// where 0 is the hierarchy ID, the controller name is omitted in cgroup v2
|
// where 0 is the hierarchy ID, the controller name is omitted in cgroup v2
|
||||||
|
|
@ -222,6 +232,7 @@ fn parse_cgroups_v2_path(content: &str) -> Result<PathBuf> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Update process cgroup limits
|
/// Update process cgroup limits
|
||||||
|
#[instrument(parent = Span::current(), level= "Info")]
|
||||||
pub fn update_resources(pid: u32, resources: &LinuxResources) -> Result<()> {
|
pub fn update_resources(pid: u32, resources: &LinuxResources) -> Result<()> {
|
||||||
// get container main process cgroup
|
// get container main process cgroup
|
||||||
let cgroup = get_cgroup(pid)?;
|
let cgroup = get_cgroup(pid)?;
|
||||||
|
|
|
||||||
|
|
@ -30,6 +30,7 @@ pub use protos::{
|
||||||
shim::shim::DeleteResponse,
|
shim::shim::DeleteResponse,
|
||||||
ttrpc::{context::Context, Result as TtrpcResult},
|
ttrpc::{context::Context, Result as TtrpcResult},
|
||||||
};
|
};
|
||||||
|
use tracing::{instrument, Span};
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
ioctl_write_ptr_bad!(ioctl_set_winsz, libc::TIOCSWINSZ, libc::winsize);
|
ioctl_write_ptr_bad!(ioctl_set_winsz, libc::TIOCSWINSZ, libc::winsize);
|
||||||
|
|
||||||
|
|
@ -167,6 +168,7 @@ pub const SOCKET_ROOT: &str = "/var/run/containerd";
|
||||||
pub const SOCKET_ROOT: &str = r"\\.\pipe\containerd-containerd";
|
pub const SOCKET_ROOT: &str = r"\\.\pipe\containerd-containerd";
|
||||||
|
|
||||||
/// Make socket path from containerd socket path, namespace and id.
|
/// Make socket path from containerd socket path, namespace and id.
|
||||||
|
#[instrument(parent = Span::current(), level= "Info")]
|
||||||
pub fn socket_address(socket_path: &str, namespace: &str, id: &str) -> String {
|
pub fn socket_address(socket_path: &str, namespace: &str, id: &str) -> String {
|
||||||
let path = PathBuf::from(socket_path)
|
let path = PathBuf::from(socket_path)
|
||||||
.join(namespace)
|
.join(namespace)
|
||||||
|
|
|
||||||
|
|
@ -58,6 +58,7 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
pub use log::{debug, error, info, warn};
|
pub use log::{debug, error, info, warn};
|
||||||
|
use tracing::{instrument, Span};
|
||||||
use util::{read_address, write_address};
|
use util::{read_address, write_address};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
|
@ -182,6 +183,7 @@ pub trait Shim {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Shim entry point that must be invoked from `main`.
|
/// Shim entry point that must be invoked from `main`.
|
||||||
|
#[instrument(parent = Span::current(), level= "Info")]
|
||||||
pub fn run<T>(runtime_id: &str, opts: Option<Config>)
|
pub fn run<T>(runtime_id: &str, opts: Option<Config>)
|
||||||
where
|
where
|
||||||
T: Shim + Send + Sync + 'static,
|
T: Shim + Send + Sync + 'static,
|
||||||
|
|
@ -192,6 +194,7 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(parent = Span::current(), level= "Info")]
|
||||||
fn bootstrap<T>(runtime_id: &str, opts: Option<Config>) -> Result<()>
|
fn bootstrap<T>(runtime_id: &str, opts: Option<Config>) -> Result<()>
|
||||||
where
|
where
|
||||||
T: Shim + Send + Sync + 'static,
|
T: Shim + Send + Sync + 'static,
|
||||||
|
|
@ -289,6 +292,7 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(skip_all, parent = Span::current(), level= "Info")]
|
||||||
fn create_server(_flags: args::Flags) -> Result<Server> {
|
fn create_server(_flags: args::Flags) -> Result<Server> {
|
||||||
let mut server = Server::new();
|
let mut server = Server::new();
|
||||||
|
|
||||||
|
|
@ -306,6 +310,7 @@ fn create_server(_flags: args::Flags) -> Result<Server> {
|
||||||
Ok(server)
|
Ok(server)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(skip_all, parent = Span::current(), level= "Info")]
|
||||||
fn setup_signals(_config: &Config) -> Option<AppSignals> {
|
fn setup_signals(_config: &Config) -> Option<AppSignals> {
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
{
|
{
|
||||||
|
|
@ -341,6 +346,7 @@ unsafe extern "system" fn signal_handler(_: u32) -> i32 {
|
||||||
1
|
1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(skip_all, parent = Span::current(), level= "Info")]
|
||||||
fn handle_signals(mut _signals: Option<AppSignals>) {
|
fn handle_signals(mut _signals: Option<AppSignals>) {
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
{
|
{
|
||||||
|
|
@ -402,6 +408,7 @@ fn handle_signals(mut _signals: Option<AppSignals>) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(parent = Span::current(), level= "Info")]
|
||||||
fn wait_socket_working(address: &str, interval_in_ms: u64, count: u32) -> Result<()> {
|
fn wait_socket_working(address: &str, interval_in_ms: u64, count: u32) -> Result<()> {
|
||||||
for _i in 0..count {
|
for _i in 0..count {
|
||||||
match Client::connect(address) {
|
match Client::connect(address) {
|
||||||
|
|
@ -416,10 +423,12 @@ fn wait_socket_working(address: &str, interval_in_ms: u64, count: u32) -> Result
|
||||||
Err(other!("time out waiting for socket {}", address))
|
Err(other!("time out waiting for socket {}", address))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(parent = Span::current(), level= "Info")]
|
||||||
fn remove_socket_silently(address: &str) {
|
fn remove_socket_silently(address: &str) {
|
||||||
remove_socket(address).unwrap_or_else(|e| warn!("failed to remove file {} {:?}", address, e))
|
remove_socket(address).unwrap_or_else(|e| warn!("failed to remove file {} {:?}", address, e))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(parent = Span::current(), level= "Info")]
|
||||||
fn remove_socket(address: &str) -> Result<()> {
|
fn remove_socket(address: &str) -> Result<()> {
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
{
|
{
|
||||||
|
|
@ -448,6 +457,7 @@ fn remove_socket(address: &str) -> Result<()> {
|
||||||
|
|
||||||
/// Spawn is a helper func to launch shim process.
|
/// Spawn is a helper func to launch shim process.
|
||||||
/// Typically this expected to be called from `StartShim`.
|
/// Typically this expected to be called from `StartShim`.
|
||||||
|
#[instrument(parent = Span::current(), level= "Info")]
|
||||||
pub fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Result<(u32, String)> {
|
pub fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Result<(u32, String)> {
|
||||||
let cmd = env::current_exe().map_err(io_error!(e, ""))?;
|
let cmd = env::current_exe().map_err(io_error!(e, ""))?;
|
||||||
let cwd = env::current_dir().map_err(io_error!(e, ""))?;
|
let cwd = env::current_dir().map_err(io_error!(e, ""))?;
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue