runc: introduce helper function execute()
Introduce helper function monitor.rs::execute() to avoid duplicated code, and also correctly setup stdout/stderr for ProcessMonitor. Signed-off-by: Liu Jiang <gerry@linux.alibaba.com>
This commit is contained in:
parent
cd965aa06a
commit
8c2a61a807
|
|
@ -55,7 +55,7 @@ mod utils;
|
||||||
use crate::container::Container;
|
use crate::container::Container;
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
#[cfg(feature = "async")]
|
#[cfg(feature = "async")]
|
||||||
use crate::monitor::{DefaultMonitor, Exit, ProcessMonitor};
|
use crate::monitor::{execute, DefaultMonitor, ExecuteResult};
|
||||||
use crate::options::*;
|
use crate::options::*;
|
||||||
|
|
||||||
type Result<T> = std::result::Result<T, crate::error::Error>;
|
type Result<T> = std::result::Result<T, crate::error::Error>;
|
||||||
|
|
@ -327,34 +327,23 @@ impl Runc {
|
||||||
const MONITOR: DefaultMonitor = DefaultMonitor::new();
|
const MONITOR: DefaultMonitor = DefaultMonitor::new();
|
||||||
|
|
||||||
pub async fn launch(&self, cmd: Command, combined_output: bool) -> Result<Response> {
|
pub async fn launch(&self, cmd: Command, combined_output: bool) -> Result<Response> {
|
||||||
let (tx, rx) = tokio::sync::oneshot::channel::<Exit>();
|
let ExecuteResult {
|
||||||
let start = Self::MONITOR.start(cmd, tx);
|
exit,
|
||||||
let wait = Self::MONITOR.wait(rx);
|
status,
|
||||||
let (
|
stdout,
|
||||||
std::process::Output {
|
stderr,
|
||||||
status,
|
} = execute(&Self::MONITOR, cmd).await?;
|
||||||
stdout,
|
|
||||||
stderr,
|
|
||||||
},
|
|
||||||
Exit { pid, .. },
|
|
||||||
) = tokio::try_join!(start, wait).map_err(Error::InvalidCommand)?;
|
|
||||||
|
|
||||||
// ugly hack to work around
|
|
||||||
let stdout = String::from_utf8(stdout)
|
|
||||||
.expect("returned non-utf8 characters from container process.");
|
|
||||||
let stderr = String::from_utf8(stderr)
|
|
||||||
.expect("returned non-utf8 characters from container process.");
|
|
||||||
|
|
||||||
if status.success() {
|
if status.success() {
|
||||||
if combined_output {
|
if combined_output {
|
||||||
Ok(Response {
|
Ok(Response {
|
||||||
pid,
|
pid: exit.pid,
|
||||||
status,
|
status,
|
||||||
output: stdout + stderr.as_str(),
|
output: stdout + stderr.as_str(),
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
Ok(Response {
|
Ok(Response {
|
||||||
pid,
|
pid: exit.pid,
|
||||||
status,
|
status,
|
||||||
output: stdout,
|
output: stdout,
|
||||||
})
|
})
|
||||||
|
|
@ -390,26 +379,14 @@ impl Runc {
|
||||||
match opts {
|
match opts {
|
||||||
Some(CreateOpts { io: Some(_io), .. }) => {
|
Some(CreateOpts { io: Some(_io), .. }) => {
|
||||||
_io.set(&mut cmd).map_err(Error::UnavailableIO)?;
|
_io.set(&mut cmd).map_err(Error::UnavailableIO)?;
|
||||||
let (tx, rx) = tokio::sync::oneshot::channel::<Exit>();
|
let result = execute(&Self::MONITOR, cmd).await?;
|
||||||
let start = Self::MONITOR.start(cmd, tx);
|
|
||||||
let wait = Self::MONITOR.wait(rx);
|
|
||||||
let (
|
|
||||||
std::process::Output {
|
|
||||||
status,
|
|
||||||
stdout,
|
|
||||||
stderr,
|
|
||||||
},
|
|
||||||
_,
|
|
||||||
) = tokio::try_join!(start, wait).map_err(Error::InvalidCommand)?;
|
|
||||||
_io.close_after_start();
|
_io.close_after_start();
|
||||||
|
|
||||||
let stdout = String::from_utf8(stdout).unwrap();
|
if !result.status.success() {
|
||||||
let stderr = String::from_utf8(stderr).unwrap();
|
|
||||||
if !status.success() {
|
|
||||||
return Err(Error::CommandFailed {
|
return Err(Error::CommandFailed {
|
||||||
status,
|
status: result.status,
|
||||||
stdout,
|
stdout: result.stdout,
|
||||||
stderr,
|
stderr: result.stderr,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -14,12 +14,15 @@
|
||||||
limitations under the License.
|
limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
use std::process::Output;
|
use std::process::{ExitStatus, Output, Stdio};
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use log::error;
|
use log::error;
|
||||||
use time::OffsetDateTime;
|
use time::OffsetDateTime;
|
||||||
use tokio::sync::oneshot::{Receiver, Sender};
|
use tokio::process::Command;
|
||||||
|
use tokio::sync::oneshot::{channel, Receiver, Sender};
|
||||||
|
|
||||||
|
use crate::error::Error;
|
||||||
|
|
||||||
/// A trait for spawning and waiting for a process.
|
/// A trait for spawning and waiting for a process.
|
||||||
///
|
///
|
||||||
|
|
@ -36,11 +39,7 @@ pub trait ProcessMonitor {
|
||||||
/// Use [tokio::process::Command::stdout(Stdio::piped())](https://docs.rs/tokio/1.16.1/tokio/process/struct.Command.html#method.stdout)
|
/// Use [tokio::process::Command::stdout(Stdio::piped())](https://docs.rs/tokio/1.16.1/tokio/process/struct.Command.html#method.stdout)
|
||||||
/// and/or [tokio::process::Command::stderr(Stdio::piped())](https://docs.rs/tokio/1.16.1/tokio/process/struct.Command.html#method.stderr)
|
/// and/or [tokio::process::Command::stderr(Stdio::piped())](https://docs.rs/tokio/1.16.1/tokio/process/struct.Command.html#method.stderr)
|
||||||
/// respectively, when creating the [Command](https://docs.rs/tokio/1.16.1/tokio/process/struct.Command.html#).
|
/// respectively, when creating the [Command](https://docs.rs/tokio/1.16.1/tokio/process/struct.Command.html#).
|
||||||
async fn start(
|
async fn start(&self, mut cmd: Command, tx: Sender<Exit>) -> std::io::Result<Output> {
|
||||||
&self,
|
|
||||||
mut cmd: tokio::process::Command,
|
|
||||||
tx: Sender<Exit>,
|
|
||||||
) -> std::io::Result<Output> {
|
|
||||||
let chi = cmd.spawn()?;
|
let chi = cmd.spawn()?;
|
||||||
// Safe to expect() because wait() hasn't been called yet, dependence on tokio interanl
|
// Safe to expect() because wait() hasn't been called yet, dependence on tokio interanl
|
||||||
// implementation details.
|
// implementation details.
|
||||||
|
|
@ -90,6 +89,49 @@ pub struct Exit {
|
||||||
pub status: i32,
|
pub status: i32,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Execution result returned by `execute()`.
|
||||||
|
pub struct ExecuteResult {
|
||||||
|
pub exit: Exit,
|
||||||
|
pub status: ExitStatus,
|
||||||
|
pub stdout: String,
|
||||||
|
pub stderr: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Execute a `Command` and collect exit status, output and error messages.
|
||||||
|
///
|
||||||
|
/// To collect output and error messages, pipes must be used for Command's stdout and stderr.
|
||||||
|
/// This method will create pipes and overwrite stdout/stderr of `cmd`.
|
||||||
|
///
|
||||||
|
/// Note: invalid UTF-8 characters in output and error messages will be replaced with the `<60>` char.
|
||||||
|
pub async fn execute<T: ProcessMonitor + Send + Sync>(
|
||||||
|
monitor: &T,
|
||||||
|
mut cmd: Command,
|
||||||
|
) -> Result<ExecuteResult, Error> {
|
||||||
|
let (tx, rx) = channel::<Exit>();
|
||||||
|
|
||||||
|
cmd.stdout(Stdio::piped());
|
||||||
|
cmd.stderr(Stdio::piped());
|
||||||
|
let start = monitor.start(cmd, tx);
|
||||||
|
let wait = monitor.wait(rx);
|
||||||
|
let (
|
||||||
|
Output {
|
||||||
|
stdout,
|
||||||
|
stderr,
|
||||||
|
status,
|
||||||
|
},
|
||||||
|
exit,
|
||||||
|
) = tokio::try_join!(start, wait).map_err(Error::InvalidCommand)?;
|
||||||
|
let stdout = String::from_utf8_lossy(&stdout).to_string();
|
||||||
|
let stderr = String::from_utf8_lossy(&stderr).to_string();
|
||||||
|
|
||||||
|
Ok(ExecuteResult {
|
||||||
|
exit,
|
||||||
|
status,
|
||||||
|
stdout,
|
||||||
|
stderr,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
@ -123,4 +165,16 @@ mod tests {
|
||||||
let status = monitor.wait(rx).await.unwrap();
|
let status = monitor.wait(rx).await.unwrap();
|
||||||
assert_eq!(status.status, 0);
|
assert_eq!(status.status, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_execute() {
|
||||||
|
let cmd = Command::new("ls");
|
||||||
|
let monitor = DefaultMonitor::new();
|
||||||
|
let result = execute(&monitor, cmd).await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(result.exit.status, 0);
|
||||||
|
assert!(result.status.success());
|
||||||
|
assert!(!result.stdout.is_empty());
|
||||||
|
assert_eq!(result.stderr.len(), 0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue