mirror of https://github.com/chaos-mesh/toda.git
Compare commits
30 Commits
Author | SHA1 | Date |
---|---|---|
|
523c67dbd5 | |
|
4bc8404382 | |
|
04ba4edc20 | |
|
67b43ee417 | |
|
7eb184edd0 | |
|
f9c784ceac | |
|
65071a4a6e | |
|
7ea57ef5f3 | |
|
ac795a08cf | |
|
7ca342b0b0 | |
|
8734addae3 | |
|
a866988506 | |
|
62043db60a | |
|
4a991b5fbb | |
|
fe18e46efa | |
|
5aee355f82 | |
|
fbf1cb7215 | |
|
ab6be766d3 | |
|
c6248aa5c1 | |
|
a2ec5f7947 | |
|
6790947de8 | |
|
b33b9f97dd | |
|
6a0857dd39 | |
|
c52d0f593e | |
|
32cc198562 | |
|
afdc4ca828 | |
|
8a4086c542 | |
|
9f251055d5 | |
|
421cd8f076 | |
|
1ae28e3bda |
|
@ -23,7 +23,7 @@ jobs:
|
|||
- name: Add user_allow_other to /etc/fuse.conf
|
||||
run: echo "user_allow_other" | sudo tee -a /etc/fuse.conf
|
||||
- name: Run tests
|
||||
run: cargo test --verbose
|
||||
run: cargo test --verbose -- --test-threads=1
|
||||
clippy_check:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
|
|
File diff suppressed because it is too large
Load Diff
18
Cargo.toml
18
Cargo.toml
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "toda"
|
||||
version = "0.1.16"
|
||||
version = "0.2.4"
|
||||
authors = ["Yang Keao <keao.yang@yahoo.com>"]
|
||||
edition = "2018"
|
||||
|
||||
|
@ -10,12 +10,12 @@ edition = "2018"
|
|||
structopt = "0.3"
|
||||
nix = "0.18"
|
||||
anyhow = "1.0"
|
||||
fuser = {version = "0.6", features = ["abi-7-31"]}
|
||||
fuser = {version = "0.6", features = ["abi-7-19"]}
|
||||
time = "0.1"
|
||||
libc = "0.2"
|
||||
async-trait = "0.1"
|
||||
tracing-futures = "0.2"
|
||||
tokio = {version = "0.2", features = ["rt-core", "rt-threaded", "sync", "fs", "time", "blocking"]}
|
||||
tokio = {version = "0.2", features = ["rt-core", "rt-threaded", "sync", "fs", "time", "blocking", "macros", "full"]}
|
||||
tokio-util = "0.6"
|
||||
thiserror = "1.0"
|
||||
futures = "0.3"
|
||||
derive_more = "0.99.9"
|
||||
|
@ -30,9 +30,15 @@ once_cell = "1.4"
|
|||
dynasmrt = "1.0.0"
|
||||
procfs = "0.8.0"
|
||||
itertools = "0.9.0"
|
||||
log = "0.4"
|
||||
env_logger = "0.8"
|
||||
retry = "1.2.0"
|
||||
tracing = "0.1"
|
||||
tracing-futures = "0.2"
|
||||
tracing-subscriber = "0.2"
|
||||
jsonrpc-stdio-server = "17.0.0"
|
||||
jsonrpc-derive = "17.0.0"
|
||||
jsonrpc-core = "17.0.0"
|
||||
jsonrpc-core-client = "17.0.0"
|
||||
|
||||
[profile.release]
|
||||
debug = true
|
||||
debug = true
|
||||
|
|
|
@ -12,7 +12,7 @@ ENV https_proxy $HTTPS_PROXY
|
|||
|
||||
RUN apt-get update && apt-get install build-essential curl git pkg-config libfuse-dev fuse -y && rm -rf /var/lib/apt/lists/*
|
||||
|
||||
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- --default-toolchain nightly-2020-07-01 -y
|
||||
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- --default-toolchain nightly-2021-12-23 -y
|
||||
ENV PATH "/root/.cargo/bin:${PATH}"
|
||||
|
||||
RUN if [ -n "$HTTP_PROXY" ]; then echo "[http]\n\
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
{
|
||||
"jsonrpc": "2.0",
|
||||
"method": "update",
|
||||
"params": [
|
||||
[
|
||||
{
|
||||
"type": "latency",
|
||||
"path": "/var/lib/postgresql/data/**/*",
|
||||
"percent": 100,
|
||||
"latency": "10s"
|
||||
}
|
||||
]
|
||||
],
|
||||
"id": 1
|
||||
}
|
|
@ -0,0 +1,23 @@
|
|||
{
|
||||
"jsonrpc": "2.0",
|
||||
"method": "update",
|
||||
"params": [
|
||||
[
|
||||
{
|
||||
"type": "mistake",
|
||||
"path": "/var/test/**/*",
|
||||
"methods": [
|
||||
"READ",
|
||||
"WRITE"
|
||||
],
|
||||
"mistake": {
|
||||
"filling": "zero",
|
||||
"maxOccurrences": 1,
|
||||
"maxLength": 10000
|
||||
},
|
||||
"percent": 100
|
||||
}
|
||||
]
|
||||
],
|
||||
"id": 1
|
||||
}
|
|
@ -0,0 +1 @@
|
|||
[]
|
|
@ -10,4 +10,6 @@ RUN go build -o /go/bin/app
|
|||
FROM chaos-mesh/toda
|
||||
COPY --from=build-env /go/bin/app /
|
||||
COPY --from=build-env /go/bin/app /main-app
|
||||
|
||||
ENV GOMAXPROCS 64
|
||||
CMD ["/app"]
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
"strconv"
|
||||
"syscall"
|
||||
"time"
|
||||
"sync"
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
@ -33,59 +34,67 @@ func main() {
|
|||
|
||||
originalLength := len([]byte("HELLO WORLD"))
|
||||
|
||||
for {
|
||||
var fVec []*os.File
|
||||
var mMap [][]byte
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i <= 100; i++ {
|
||||
go func() {
|
||||
wg.Add(1)
|
||||
for {
|
||||
var fVec []*os.File
|
||||
var mMap [][]byte
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
f, err := os.OpenFile("/var/run/test/test", os.O_RDWR, 0666)
|
||||
if err != nil {
|
||||
fmt.Printf("Error: %v+", err)
|
||||
return
|
||||
}
|
||||
err = f.Truncate(1024)
|
||||
if err != nil {
|
||||
fmt.Printf("Error: %v\n", err)
|
||||
continue
|
||||
}
|
||||
_, err = f.Seek(10, os.SEEK_SET)
|
||||
if err != nil {
|
||||
fmt.Printf("Error: %v\n", err)
|
||||
continue
|
||||
}
|
||||
for i := 0; i < 20; i++ {
|
||||
f, err := os.OpenFile("/var/run/test/test", os.O_RDWR, 0666)
|
||||
if err != nil {
|
||||
fmt.Printf("Error: %v+", err)
|
||||
return
|
||||
}
|
||||
err = f.Truncate(1024)
|
||||
if err != nil {
|
||||
fmt.Printf("Error: %v\n", err)
|
||||
continue
|
||||
}
|
||||
_, err = f.Seek(10, os.SEEK_SET)
|
||||
if err != nil {
|
||||
fmt.Printf("Error: %v\n", err)
|
||||
continue
|
||||
}
|
||||
|
||||
fVec = append(fVec, f)
|
||||
data, err := syscall.Mmap(int(f.Fd()), 0, 10+originalLength+3, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED)
|
||||
if err != nil {
|
||||
fmt.Printf("Error: %v+", err)
|
||||
return
|
||||
}
|
||||
mMap = append(mMap, data)
|
||||
fVec = append(fVec, f)
|
||||
data, err := syscall.Mmap(int(f.Fd()), 0, 10+originalLength+3, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED)
|
||||
if err != nil {
|
||||
fmt.Printf("Error: %v+", err)
|
||||
return
|
||||
}
|
||||
mMap = append(mMap, data)
|
||||
|
||||
f = fVec[i]
|
||||
data = mMap[i]
|
||||
f = fVec[i]
|
||||
data = mMap[i]
|
||||
|
||||
count := strconv.Itoa(i)
|
||||
for pos, char := range count {
|
||||
if pos < 3 {
|
||||
data[10+originalLength+pos] = byte(char)
|
||||
count := strconv.Itoa(i)
|
||||
for pos, char := range count {
|
||||
if pos < 3 {
|
||||
data[10+originalLength+pos] = byte(char)
|
||||
}
|
||||
}
|
||||
|
||||
time.Sleep(time.Second)
|
||||
|
||||
buf := make([]byte, originalLength+len(count))
|
||||
n, err := f.Read(buf)
|
||||
if err != nil {
|
||||
fmt.Printf("Error: %v\n", err)
|
||||
continue
|
||||
}
|
||||
fmt.Printf("%v %d bytes: %s\n", time.Now(), n, string(buf[:n]))
|
||||
}
|
||||
|
||||
for i := 0; i < 20; i++ {
|
||||
fVec[i].Close()
|
||||
syscall.Munmap(mMap[i])
|
||||
}
|
||||
}
|
||||
|
||||
time.Sleep(time.Second)
|
||||
|
||||
buf := make([]byte, originalLength+len(count))
|
||||
n, err := f.Read(buf)
|
||||
if err != nil {
|
||||
fmt.Printf("Error: %v\n", err)
|
||||
continue
|
||||
}
|
||||
fmt.Printf("%v %d bytes: %s\n", time.Now(), n, string(buf[:n]))
|
||||
}
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
fVec[i].Close()
|
||||
syscall.Munmap(mMap[i])
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
|
|
@ -1,8 +0,0 @@
|
|||
[
|
||||
{
|
||||
"type": "latency",
|
||||
"path": "/var/run/test/**/*",
|
||||
"percent": 100,
|
||||
"latency": "100ms"
|
||||
}
|
||||
]
|
|
@ -1 +1 @@
|
|||
nightly-2020-07-01
|
||||
nightly-2021-12-23
|
|
@ -0,0 +1,4 @@
|
|||
reorder_imports = true
|
||||
imports_granularity = "Module"
|
||||
group_imports = "StdExternalCrate"
|
||||
unstable_features = true
|
|
@ -1,20 +1,18 @@
|
|||
use std::ffi::OsString;
|
||||
use std::fmt::Debug;
|
||||
use std::future::Future;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use fuser::*;
|
||||
use tracing::trace_span;
|
||||
use tracing_futures::Instrument;
|
||||
|
||||
use super::errors::Result;
|
||||
use super::reply::*;
|
||||
use super::runtime::spawn;
|
||||
|
||||
use std::ffi::OsString;
|
||||
use std::fmt::Debug;
|
||||
use std::sync::Arc;
|
||||
use std::{
|
||||
future::Future,
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
|
||||
use log::trace;
|
||||
|
||||
pub fn spawn_reply<F, R, V>(id: u64, reply: R, f: F)
|
||||
where
|
||||
F: Future<Output = Result<V>> + Send + 'static,
|
||||
|
@ -22,9 +20,8 @@ where
|
|||
V: Debug,
|
||||
{
|
||||
spawn(async move {
|
||||
trace!("reply to request({})", id);
|
||||
let result = f.await;
|
||||
reply.reply(id, result);
|
||||
let result = f.instrument(trace_span!("request", id)).await;
|
||||
reply.reply(result);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -142,7 +139,13 @@ pub trait AsyncFileSystemImpl: Send + Sync {
|
|||
|
||||
async fn opendir(&self, ino: u64, flags: i32) -> Result<Open>;
|
||||
|
||||
async fn readdir(&self, ino: u64, fh: u64, offset: i64, reply: ReplyDirectory);
|
||||
async fn readdir(
|
||||
&self,
|
||||
ino: u64,
|
||||
fh: u64,
|
||||
offset: i64,
|
||||
reply: &mut ReplyDirectory,
|
||||
) -> Result<()>;
|
||||
|
||||
async fn releasedir(&self, ino: u64, fh: u64, flags: i32) -> Result<()>;
|
||||
|
||||
|
@ -471,10 +474,20 @@ impl<T: AsyncFileSystemImpl + 'static> Filesystem for AsyncFileSystem<T> {
|
|||
async_impl.opendir(ino, flags).await
|
||||
});
|
||||
}
|
||||
fn readdir(&mut self, _req: &Request, ino: u64, fh: u64, offset: i64, reply: ReplyDirectory) {
|
||||
fn readdir(
|
||||
&mut self,
|
||||
_req: &Request,
|
||||
ino: u64,
|
||||
fh: u64,
|
||||
offset: i64,
|
||||
mut reply: ReplyDirectory,
|
||||
) {
|
||||
let async_impl = self.0.clone();
|
||||
spawn(async move {
|
||||
async_impl.readdir(ino, fh, offset, reply).await;
|
||||
match async_impl.readdir(ino, fh, offset, &mut reply).await {
|
||||
Ok(_) => reply.ok(),
|
||||
Err(err) => reply.error(err.into()),
|
||||
}
|
||||
});
|
||||
}
|
||||
fn releasedir(&mut self, req: &Request, ino: u64, fh: u64, flags: i32, reply: ReplyEmpty) {
|
||||
|
|
|
@ -1,8 +1,7 @@
|
|||
use nix::errno::Errno;
|
||||
use nix::Error;
|
||||
use thiserror::Error;
|
||||
|
||||
use log::error;
|
||||
use tracing::error;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum HookFsError {
|
||||
|
@ -69,11 +68,11 @@ impl From<tokio::task::JoinError> for HookFsError {
|
|||
}
|
||||
}
|
||||
|
||||
impl Into<libc::c_int> for HookFsError {
|
||||
fn into(self) -> libc::c_int {
|
||||
impl From<HookFsError> for libc::c_int {
|
||||
fn from(err: HookFsError) -> libc::c_int {
|
||||
use HookFsError::*;
|
||||
|
||||
match self {
|
||||
match err {
|
||||
Sys(errno) => errno as i32,
|
||||
InodeNotFound { inode: _ } => libc::EFAULT,
|
||||
FhNotFound { fh: _ } => libc::EFAULT,
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1,10 +1,10 @@
|
|||
use fuser::*;
|
||||
use log::{debug, error, trace};
|
||||
use std::fmt::Debug;
|
||||
use std::time::Duration;
|
||||
|
||||
use super::errors::Result;
|
||||
use fuser::*;
|
||||
use tracing::{debug, error, trace};
|
||||
|
||||
use std::fmt::Debug;
|
||||
use super::errors::Result;
|
||||
|
||||
const TTL: Duration = Duration::from_secs(0);
|
||||
|
||||
|
@ -28,10 +28,7 @@ pub struct Entry {
|
|||
}
|
||||
impl Entry {
|
||||
pub fn new(stat: FileAttr, generation: u64) -> Self {
|
||||
Self {
|
||||
stat,
|
||||
generation,
|
||||
}
|
||||
Self { stat, generation }
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -48,12 +45,11 @@ impl Open {
|
|||
|
||||
#[derive(Debug)]
|
||||
pub struct Attr {
|
||||
pub time: std::time::Duration,
|
||||
pub attr: FileAttr,
|
||||
}
|
||||
impl Attr {
|
||||
pub fn new(time: std::time::Duration, attr: FileAttr) -> Self {
|
||||
Self { time, attr }
|
||||
pub fn new(attr: FileAttr) -> Self {
|
||||
Self { attr }
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -120,12 +116,7 @@ pub struct Create {
|
|||
pub flags: i32,
|
||||
}
|
||||
impl Create {
|
||||
pub fn new(
|
||||
attr: FileAttr,
|
||||
generation: u64,
|
||||
fh: u64,
|
||||
flags: i32,
|
||||
) -> Self {
|
||||
pub fn new(attr: FileAttr, generation: u64, fh: u64, flags: i32) -> Self {
|
||||
Self {
|
||||
attr,
|
||||
generation,
|
||||
|
@ -172,14 +163,14 @@ pub trait FsReply<T: Debug>: Sized {
|
|||
fn reply_ok(self, item: T);
|
||||
fn reply_err(self, err: libc::c_int);
|
||||
|
||||
fn reply(self, id: u64, result: Result<T>) {
|
||||
fn reply(self, result: Result<T>) {
|
||||
match result {
|
||||
Ok(item) => {
|
||||
trace!("ok. reply for request({})", id);
|
||||
trace!("ok");
|
||||
self.reply_ok(item)
|
||||
}
|
||||
Err(err) => {
|
||||
debug!("err. reply with {} for request ({})", err, id);
|
||||
debug!("err. reply with {}", err);
|
||||
|
||||
let err = err.into();
|
||||
if err == -1 {
|
||||
|
@ -211,7 +202,7 @@ impl FsReply<Open> for ReplyOpen {
|
|||
|
||||
impl FsReply<Attr> for ReplyAttr {
|
||||
fn reply_ok(self, item: Attr) {
|
||||
self.attr(&item.time, &item.attr);
|
||||
self.attr(&TTL, &item.attr);
|
||||
}
|
||||
fn reply_err(self, err: libc::c_int) {
|
||||
self.error(err);
|
||||
|
|
|
@ -1,12 +1,10 @@
|
|||
use once_cell::sync::Lazy;
|
||||
|
||||
use tokio::runtime::Runtime;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use std::future::Future;
|
||||
use std::sync::RwLock;
|
||||
|
||||
use log::trace;
|
||||
use once_cell::sync::Lazy;
|
||||
use tokio::runtime::Runtime;
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::trace;
|
||||
|
||||
pub static RUNTIME: Lazy<RwLock<Option<Runtime>>> = Lazy::new(|| {
|
||||
trace!("build tokio runtime");
|
||||
|
|
|
@ -1,14 +1,12 @@
|
|||
use super::filter;
|
||||
use super::Injector;
|
||||
|
||||
use super::injector_config::{AttrOverrideConfig, FileType as ConfigFileType, FilterConfig};
|
||||
use crate::hookfs::Result;
|
||||
use std::path::Path;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use fuser::{FileAttr, FileType};
|
||||
use log::{debug, trace};
|
||||
use tracing::{debug, trace};
|
||||
|
||||
use std::path::Path;
|
||||
use super::injector_config::{AttrOverrideConfig, FileType as ConfigFileType, FilterConfig};
|
||||
use super::{filter, Injector};
|
||||
use crate::hookfs::Result;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct AttrOverrideInjector {
|
||||
|
|
|
@ -1,15 +1,13 @@
|
|||
use super::filter;
|
||||
use super::Injector;
|
||||
|
||||
use super::injector_config::FaultsConfig;
|
||||
use crate::hookfs::{Error, Result};
|
||||
use std::path::Path;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use log::{debug, trace};
|
||||
use nix::errno::Errno;
|
||||
use rand::Rng;
|
||||
use tracing::{debug, trace};
|
||||
|
||||
use std::path::Path;
|
||||
use super::injector_config::FaultsConfig;
|
||||
use super::{filter, Injector};
|
||||
use crate::hookfs::{Error, Result};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct FaultInjector {
|
||||
|
|
|
@ -1,14 +1,13 @@
|
|||
use std::convert::TryFrom;
|
||||
use std::path::Path;
|
||||
|
||||
use super::injector_config::FilterConfig;
|
||||
|
||||
use anyhow::{anyhow, Error, Result};
|
||||
use bitflags::bitflags;
|
||||
use glob::{MatchOptions, Pattern};
|
||||
use rand::Rng;
|
||||
use tracing::{info, trace};
|
||||
|
||||
use log::{info, trace};
|
||||
use super::injector_config::FilterConfig;
|
||||
|
||||
bitflags! {
|
||||
pub struct Method: u32 {
|
||||
|
@ -110,15 +109,13 @@ impl Filter {
|
|||
.unwrap_or(Method::all());
|
||||
|
||||
let path_filter = conf
|
||||
.path
|
||||
.map(|path| -> Option<Pattern> {
|
||||
.path.and_then(|path| -> Option<Pattern> {
|
||||
if !path.is_empty() {
|
||||
Pattern::new(&path).ok()
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.flatten();
|
||||
});
|
||||
Ok(Self {
|
||||
path_filter,
|
||||
methods,
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
#[serde(tag = "type")]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
|
@ -9,6 +9,7 @@ pub enum InjectorConfig {
|
|||
Latency(LatencyConfig),
|
||||
Fault(FaultsConfig),
|
||||
AttrOverride(AttrOverrideConfig),
|
||||
Mistake(MistakesConfig),
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
|
@ -82,3 +83,26 @@ pub struct Timespec {
|
|||
pub sec: i64,
|
||||
pub nsec: i32,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub enum MistakeType {
|
||||
Zero,
|
||||
Random,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct MistakeConfig {
|
||||
pub filling: MistakeType,
|
||||
pub max_length: usize,
|
||||
pub max_occurrences: usize,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct MistakesConfig {
|
||||
pub mistake: MistakeConfig,
|
||||
#[serde(flatten)]
|
||||
pub filter: FilterConfig,
|
||||
}
|
||||
|
|
|
@ -1,20 +1,21 @@
|
|||
use async_trait::async_trait;
|
||||
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
|
||||
use super::filter;
|
||||
use super::injector_config::LatencyConfig;
|
||||
use super::Injector;
|
||||
use crate::hookfs::Result;
|
||||
|
||||
use log::{debug, trace};
|
||||
use async_trait::async_trait;
|
||||
use tokio::time::delay_for;
|
||||
use tokio::select;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, trace};
|
||||
|
||||
use super::injector_config::LatencyConfig;
|
||||
use super::{filter, Injector};
|
||||
use crate::hookfs::Result;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct LatencyInjector {
|
||||
latency: Duration,
|
||||
filter: filter::Filter,
|
||||
cancel_token: CancellationToken,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
@ -22,13 +23,27 @@ impl Injector for LatencyInjector {
|
|||
async fn inject(&self, method: &filter::Method, path: &Path) -> Result<()> {
|
||||
trace!("test for filter");
|
||||
if self.filter.filter(method, path) {
|
||||
debug!("inject io delay {:?}", self.latency);
|
||||
delay_for(self.latency).await;
|
||||
let token = self.cancel_token.clone();
|
||||
let latency = self.latency;
|
||||
debug!("inject io delay {:?}", latency);
|
||||
|
||||
select! {
|
||||
_ = delay_for(latency) => {}
|
||||
_ = token.cancelled() => {
|
||||
debug!("cancelled");
|
||||
}
|
||||
}
|
||||
|
||||
debug!("latency finished");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn interrupt(&self) {
|
||||
debug!("interrupt latency");
|
||||
self.cancel_token.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
impl LatencyInjector {
|
||||
|
@ -38,6 +53,7 @@ impl LatencyInjector {
|
|||
Ok(Self {
|
||||
latency: conf.latency,
|
||||
filter: filter::Filter::build(conf.filter)?,
|
||||
cancel_token: CancellationToken::new(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,85 @@
|
|||
use std::cmp::{max, min};
|
||||
use std::path::Path;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use rand::Rng;
|
||||
use tracing::{debug, trace};
|
||||
|
||||
use super::injector_config::{MistakeConfig, MistakeType, MistakesConfig};
|
||||
use super::{filter, Injector};
|
||||
use crate::hookfs::{Reply, Result};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct MistakeInjector {
|
||||
mistake: MistakeConfig,
|
||||
filter: filter::Filter,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Injector for MistakeInjector {
|
||||
async fn inject(&self, _: &filter::Method, _: &Path) -> Result<()> {
|
||||
debug!("MI:Injecting");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn inject_reply(&self, method: &super::Method, path: &Path, reply: &mut Reply) -> Result<()> {
|
||||
if self.filter.filter(method, path) {
|
||||
debug!("MI:Injecting reply");
|
||||
if let Reply::Data(data) = reply {
|
||||
let data = &mut data.data;
|
||||
self.handle(data)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn inject_write_data(&self, path: &Path, data: &mut Vec<u8>) -> Result<()> {
|
||||
if self.filter.filter(&super::Method::WRITE, path) {
|
||||
debug!("MI:Injecting write data");
|
||||
self.handle(data)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl MistakeInjector {
|
||||
pub fn build(conf: MistakesConfig) -> anyhow::Result<Self> {
|
||||
trace!("build mistake injector");
|
||||
Ok(Self {
|
||||
mistake: conf.mistake,
|
||||
filter: filter::Filter::build(conf.filter)?,
|
||||
})
|
||||
}
|
||||
pub fn handle(&self, data: &mut Vec<u8>) -> Result<()> {
|
||||
trace!("sabotage data");
|
||||
let mut rng = rand::thread_rng();
|
||||
let data_length = data.len();
|
||||
let mistake = &self.mistake;
|
||||
let occurrence = match mistake.max_occurrences {
|
||||
0 => 0,
|
||||
mo => rng.gen_range(1, mo + 1),
|
||||
};
|
||||
for _ in 0..occurrence {
|
||||
let pos = rng.gen_range(0, max(data_length, 1));
|
||||
let length = match min(mistake.max_length, data_length - pos) {
|
||||
0 => 0,
|
||||
l => rng.gen_range(1, l + 1),
|
||||
};
|
||||
debug!(
|
||||
"Setting index [{},{}) to {:?}",
|
||||
pos,
|
||||
pos + length,
|
||||
mistake.filling
|
||||
);
|
||||
match mistake.filling {
|
||||
MistakeType::Zero => {
|
||||
for item in data.iter_mut().skip(pos).take(length) {
|
||||
*item = 0;
|
||||
}
|
||||
}
|
||||
MistakeType::Random => rng.fill(&mut data[pos..pos + length]),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
|
@ -3,17 +3,18 @@ mod fault_injector;
|
|||
mod filter;
|
||||
mod injector_config;
|
||||
mod latency_injector;
|
||||
mod mistake_injector;
|
||||
mod multi_injector;
|
||||
|
||||
use std::path::Path;
|
||||
|
||||
use async_trait::async_trait;
|
||||
pub use filter::Method;
|
||||
use fuser::FileAttr;
|
||||
pub use injector_config::InjectorConfig;
|
||||
pub use multi_injector::MultiInjector;
|
||||
|
||||
use crate::hookfs::{Reply, Result};
|
||||
use async_trait::async_trait;
|
||||
use fuser::FileAttr;
|
||||
|
||||
use std::path::Path;
|
||||
|
||||
#[async_trait]
|
||||
pub trait Injector: Send + Sync + std::fmt::Debug {
|
||||
|
@ -27,6 +28,11 @@ pub trait Injector: Send + Sync + std::fmt::Debug {
|
|||
) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
fn inject_write_data(&self, _path: &Path, _data: &mut Vec<u8>) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn inject_attr(&self, _attr: &mut FileAttr, _path: &Path) {}
|
||||
|
||||
fn interrupt(&self) {}
|
||||
}
|
||||
|
|
|
@ -1,16 +1,16 @@
|
|||
use super::attr_override_injector::AttrOverrideInjector;
|
||||
use super::fault_injector::FaultInjector;
|
||||
use super::filter;
|
||||
use super::injector_config::InjectorConfig;
|
||||
use super::latency_injector::LatencyInjector;
|
||||
use super::Injector;
|
||||
use crate::hookfs::{Reply, Result};
|
||||
use std::path::Path;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use fuser::FileAttr;
|
||||
use log::trace;
|
||||
use tracing::trace;
|
||||
|
||||
use std::path::Path;
|
||||
use super::attr_override_injector::AttrOverrideInjector;
|
||||
use super::fault_injector::FaultInjector;
|
||||
use super::injector_config::InjectorConfig;
|
||||
use super::latency_injector::LatencyInjector;
|
||||
use super::mistake_injector::MistakeInjector;
|
||||
use super::{filter, Injector};
|
||||
use crate::hookfs::{Reply, Result};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct MultiInjector {
|
||||
|
@ -33,6 +33,9 @@ impl MultiInjector {
|
|||
InjectorConfig::AttrOverride(attr_override) => {
|
||||
(box AttrOverrideInjector::build(attr_override)?) as Box<dyn Injector>
|
||||
}
|
||||
InjectorConfig::Mistake(mistakes) => {
|
||||
(box MistakeInjector::build(mistakes)?) as Box<dyn Injector>
|
||||
}
|
||||
};
|
||||
injectors.push(injector)
|
||||
}
|
||||
|
@ -64,4 +67,17 @@ impl Injector for MultiInjector {
|
|||
injector.inject_attr(attr, path)
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_write_data(&self, path: &Path, data: &mut Vec<u8>) -> Result<()> {
|
||||
for injector in self.injectors.iter() {
|
||||
injector.inject_write_data(path, data)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn interrupt(&self) {
|
||||
for injector in self.injectors.iter() {
|
||||
injector.interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,95 @@
|
|||
use std::sync::{mpsc, Arc, Mutex};
|
||||
|
||||
use jsonrpc_derive::rpc;
|
||||
use jsonrpc_stdio_server::jsonrpc_core::*;
|
||||
use jsonrpc_stdio_server::ServerBuilder;
|
||||
use tracing::{info, trace};
|
||||
|
||||
use crate::hookfs::HookFs;
|
||||
use crate::injector::{InjectorConfig, MultiInjector};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum Comm {
|
||||
Shutdown = 0,
|
||||
}
|
||||
|
||||
pub async fn start_server(config: RpcImpl) {
|
||||
info!("Starting jsonrpc server");
|
||||
let server = new_server(config);
|
||||
let server = server.build();
|
||||
server.await;
|
||||
}
|
||||
|
||||
pub fn new_server(config: RpcImpl) -> ServerBuilder {
|
||||
info!("Creating jsonrpc server");
|
||||
let io = new_handler(config);
|
||||
ServerBuilder::new(io)
|
||||
}
|
||||
|
||||
pub fn new_handler(config: RpcImpl) -> IoHandler {
|
||||
info!("Creating jsonrpc handler");
|
||||
let mut io = IoHandler::new();
|
||||
io.extend_with(config.to_delegate());
|
||||
io
|
||||
}
|
||||
|
||||
#[rpc]
|
||||
pub trait Rpc {
|
||||
#[rpc(name = "get_status")]
|
||||
fn get_status(&self, inst: String) -> Result<String>;
|
||||
#[rpc(name = "update")]
|
||||
fn update(&self, config: Vec<InjectorConfig>) -> Result<String>;
|
||||
}
|
||||
|
||||
pub struct RpcImpl {
|
||||
status: Mutex<anyhow::Result<()>>,
|
||||
tx: Mutex<mpsc::Sender<Comm>>,
|
||||
hookfs: Option<Arc<HookFs>>,
|
||||
}
|
||||
|
||||
impl RpcImpl {
|
||||
pub fn new(
|
||||
status: Mutex<anyhow::Result<()>>,
|
||||
tx: Mutex<mpsc::Sender<Comm>>,
|
||||
hookfs: Option<Arc<HookFs>>,
|
||||
) -> Self {
|
||||
Self { status, tx, hookfs }
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for RpcImpl {
|
||||
fn drop(&mut self) {
|
||||
trace!("Dropping jrpc handler");
|
||||
}
|
||||
}
|
||||
|
||||
impl Rpc for RpcImpl {
|
||||
fn get_status(&self, _inst: String) -> Result<String> {
|
||||
info!("rpc get_status called");
|
||||
match &*self.status.lock().unwrap() {
|
||||
Ok(_) => Ok("ok".to_string()),
|
||||
Err(e) => {
|
||||
let tx = &self.tx.lock().unwrap();
|
||||
tx.send(Comm::Shutdown)
|
||||
.expect("Send through channel failed");
|
||||
Ok(e.to_string())
|
||||
}
|
||||
}
|
||||
}
|
||||
fn update(&self, config: Vec<InjectorConfig>) -> Result<String> {
|
||||
info!("rpc update called");
|
||||
if let Err(e) = &*self.status.lock().unwrap() {
|
||||
return Ok(e.to_string());
|
||||
}
|
||||
let injectors = MultiInjector::build(config);
|
||||
if let Err(e) = &injectors {
|
||||
return Ok(e.to_string());
|
||||
}
|
||||
futures::executor::block_on(async {
|
||||
let hookfs = self.hookfs.as_ref().unwrap();
|
||||
let mut current_injectors = hookfs.injector.write().await;
|
||||
*current_injectors = injectors.unwrap();
|
||||
});
|
||||
Ok("ok".to_string())
|
||||
}
|
||||
}
|
11
src/lib.rs
11
src/lib.rs
|
@ -19,6 +19,13 @@
|
|||
#![allow(clippy::or_fun_call)]
|
||||
#![allow(clippy::too_many_arguments)]
|
||||
|
||||
pub mod injector;
|
||||
|
||||
pub mod fuse_device;
|
||||
pub mod hookfs;
|
||||
pub mod injector;
|
||||
pub mod jsonrpc;
|
||||
pub mod mount;
|
||||
pub mod mount_injector;
|
||||
pub mod ptrace;
|
||||
pub mod replacer;
|
||||
pub mod stop;
|
||||
pub mod utils;
|
||||
|
|
86
src/main.rs
86
src/main.rs
|
@ -24,6 +24,7 @@ extern crate derive_more;
|
|||
mod fuse_device;
|
||||
mod hookfs;
|
||||
mod injector;
|
||||
mod jsonrpc;
|
||||
mod mount;
|
||||
mod mount_injector;
|
||||
mod ptrace;
|
||||
|
@ -31,20 +32,24 @@ mod replacer;
|
|||
mod stop;
|
||||
mod utils;
|
||||
|
||||
use injector::InjectorConfig;
|
||||
use mount_injector::{MountInjectionGuard, MountInjector};
|
||||
use replacer::{Replacer, UnionReplacer};
|
||||
use utils::encode_path;
|
||||
|
||||
use anyhow::Result;
|
||||
use log::info;
|
||||
use nix::sys::signal::{signal, SigHandler, Signal};
|
||||
use nix::unistd::{pipe, read, write};
|
||||
use structopt::StructOpt;
|
||||
use env_logger;
|
||||
|
||||
use std::convert::TryFrom;
|
||||
use std::os::unix::io::RawFd;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::{mpsc, Mutex};
|
||||
use std::{io, thread};
|
||||
|
||||
use anyhow::Result;
|
||||
use injector::InjectorConfig;
|
||||
use jsonrpc::start_server;
|
||||
use mount_injector::{MountInjectionGuard, MountInjector};
|
||||
use nix::sys::signal::{signal, SigHandler, Signal};
|
||||
use nix::unistd::{pipe, read, write};
|
||||
use replacer::{Replacer, UnionReplacer};
|
||||
use structopt::StructOpt;
|
||||
use tokio::runtime::Runtime;
|
||||
use tracing::{info, instrument};
|
||||
use tracing_subscriber::EnvFilter;
|
||||
use utils::encode_path;
|
||||
|
||||
#[derive(StructOpt, Debug, Clone)]
|
||||
#[structopt(name = "basic")]
|
||||
|
@ -59,9 +64,8 @@ struct Options {
|
|||
verbose: String,
|
||||
}
|
||||
|
||||
fn inject(option: Options) -> Result<MountInjectionGuard> {
|
||||
info!("parse injector configs");
|
||||
let injector_config: Vec<InjectorConfig> = serde_json::from_reader(std::io::stdin())?;
|
||||
#[instrument(skip(option))]
|
||||
fn inject(option: Options, injector_config: Vec<InjectorConfig>) -> Result<MountInjectionGuard> {
|
||||
info!("inject with config {:?}", injector_config);
|
||||
|
||||
let path = option.path.clone();
|
||||
|
@ -70,7 +74,7 @@ fn inject(option: Options) -> Result<MountInjectionGuard> {
|
|||
let path = path.canonicalize()?;
|
||||
|
||||
let replacer = if !option.mount_only {
|
||||
let mut replacer = UnionReplacer::new();
|
||||
let mut replacer = UnionReplacer::default();
|
||||
replacer.prepare(&path, &path)?;
|
||||
|
||||
Some(replacer)
|
||||
|
@ -100,6 +104,7 @@ fn inject(option: Options) -> Result<MountInjectionGuard> {
|
|||
Ok(mount_guard)
|
||||
}
|
||||
|
||||
#[instrument(skip(option, mount_guard))]
|
||||
fn resume(option: Options, mount_guard: MountInjectionGuard) -> Result<()> {
|
||||
info!("disable injection");
|
||||
mount_guard.disable_injection();
|
||||
|
@ -111,7 +116,7 @@ fn resume(option: Options, mount_guard: MountInjectionGuard) -> Result<()> {
|
|||
let (_, new_path) = encode_path(&path)?;
|
||||
|
||||
let replacer = if !option.mount_only {
|
||||
let mut replacer = UnionReplacer::new();
|
||||
let mut replacer = UnionReplacer::default();
|
||||
replacer.prepare(&path, &new_path)?;
|
||||
info!("running replacer");
|
||||
let result = replacer.run();
|
||||
|
@ -142,6 +147,12 @@ extern "C" fn signal_handler(_: libc::c_int) {
|
|||
}
|
||||
}
|
||||
|
||||
fn wait_for_signal(chan: RawFd) -> Result<()> {
|
||||
let mut buf = vec![0u8; 6];
|
||||
read(chan, buf.as_mut_slice())?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn main() -> Result<()> {
|
||||
let (reader, writer) = pipe()?;
|
||||
unsafe {
|
||||
|
@ -152,16 +163,43 @@ fn main() -> Result<()> {
|
|||
unsafe { signal(Signal::SIGTERM, SigHandler::Handler(signal_handler))? };
|
||||
|
||||
let option = Options::from_args();
|
||||
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or(&option.verbose)).init();
|
||||
let env_filter = EnvFilter::try_from_default_env()
|
||||
.or_else(|_| EnvFilter::try_from(&option.verbose))
|
||||
.or_else(|_| EnvFilter::try_new("trace"))
|
||||
.unwrap();
|
||||
tracing_subscriber::fmt()
|
||||
.with_writer(io::stderr)
|
||||
.with_env_filter(env_filter)
|
||||
.init();
|
||||
info!("start with option: {:?}", option);
|
||||
let mount_injector = inject(option.clone(), vec![]);
|
||||
|
||||
let mount_injector = inject(option.clone())?;
|
||||
let status = match &mount_injector {
|
||||
Ok(_) => Ok(()),
|
||||
Err(e) => Err(anyhow::Error::msg(e.to_string())),
|
||||
};
|
||||
|
||||
let (tx, _) = mpsc::channel();
|
||||
{
|
||||
let hookfs = match &mount_injector {
|
||||
Ok(e) => Some(e.hookfs.clone()),
|
||||
Err(_) => None,
|
||||
};
|
||||
thread::spawn(|| {
|
||||
Runtime::new()
|
||||
.expect("Failed to create Tokio runtime")
|
||||
.block_on(start_server(jsonrpc::RpcImpl::new(
|
||||
Mutex::new(status),
|
||||
Mutex::new(tx),
|
||||
hookfs,
|
||||
)));
|
||||
});
|
||||
}
|
||||
info!("waiting for signal to exit");
|
||||
let mut buf = vec![0u8; 6];
|
||||
read(reader, buf.as_mut_slice())?;
|
||||
wait_for_signal(reader)?;
|
||||
info!("start to recover and exit");
|
||||
|
||||
resume(option, mount_injector)?;
|
||||
|
||||
if let Ok(v) = mount_injector {
|
||||
resume(option, v)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -2,9 +2,7 @@ use std::fs::create_dir_all;
|
|||
use std::path::Path;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
|
||||
use nix::mount::{mount, MsFlags};
|
||||
|
||||
use procfs::process::{self, Process};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
|
|
|
@ -1,21 +1,16 @@
|
|||
use crate::hookfs;
|
||||
use crate::injector::MultiInjector;
|
||||
use crate::mount;
|
||||
use crate::stop;
|
||||
use crate::InjectorConfig;
|
||||
|
||||
use std::ffi::OsStr;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
use std::thread::JoinHandle;
|
||||
|
||||
use anyhow::{anyhow, Result};
|
||||
|
||||
use nix::mount::umount;
|
||||
use retry::delay::Fixed;
|
||||
use retry::{retry, OperationResult};
|
||||
use tracing::info;
|
||||
|
||||
use log::info;
|
||||
|
||||
use retry::{delay::Fixed, retry, OperationResult};
|
||||
use crate::injector::{InjectorConfig, MultiInjector};
|
||||
use crate::{hookfs, mount, stop};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct MountInjector {
|
||||
|
@ -27,7 +22,7 @@ pub struct MountInjector {
|
|||
pub struct MountInjectionGuard {
|
||||
original_path: PathBuf,
|
||||
new_path: PathBuf,
|
||||
hookfs: Arc<hookfs::HookFs>,
|
||||
pub hookfs: Arc<hookfs::HookFs>,
|
||||
handler: Option<JoinHandle<Result<()>>>,
|
||||
}
|
||||
|
||||
|
@ -46,9 +41,9 @@ impl MountInjectionGuard {
|
|||
retry(Fixed::from_millis(500).take(20), || {
|
||||
if let Err(err) = umount(mount_point.as_path()) {
|
||||
info!("umount returns error: {:?}", err);
|
||||
return OperationResult::Retry(err);
|
||||
OperationResult::Retry(err)
|
||||
} else {
|
||||
return OperationResult::Ok(());
|
||||
OperationResult::Ok(())
|
||||
}
|
||||
})?;
|
||||
|
||||
|
@ -135,7 +130,7 @@ impl MountInjector {
|
|||
|
||||
std::fs::create_dir_all(new_path.as_path())?;
|
||||
|
||||
let args = ["allow_other", "fsname=toda", "default_permissions"];
|
||||
let args = ["allow_other", "fsname=toda", "default_permissions", "nonempty"];
|
||||
let flags: Vec<_> = args
|
||||
.iter()
|
||||
.flat_map(|item| vec![OsStr::new("-o"), OsStr::new(item)])
|
||||
|
@ -153,7 +148,7 @@ impl MountInjector {
|
|||
// TODO: remove this. But wait for FUSE gets up
|
||||
// Related Issue: https://github.com/zargony/fuse-rs/issues/9
|
||||
before_mount_waiter.wait();
|
||||
std::thread::sleep(std::time::Duration::from_secs(1));
|
||||
std::thread::sleep(std::time::Duration::from_millis(200));
|
||||
|
||||
Ok(MountInjectionGuard {
|
||||
handler: Some(handler),
|
||||
|
|
|
@ -1,20 +1,25 @@
|
|||
use anyhow::{anyhow, Result};
|
||||
use nix::sys::mman::{MapFlags, ProtFlags};
|
||||
use nix::sys::ptrace;
|
||||
use nix::sys::signal::{kill, Signal};
|
||||
use nix::sys::uio::{process_vm_writev, IoVec, RemoteIoVec};
|
||||
use nix::sys::wait;
|
||||
use nix::unistd::Pid;
|
||||
|
||||
use log::{info, trace, warn};
|
||||
use procfs::ProcError;
|
||||
|
||||
use std::cell::RefCell;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::ffi::CString;
|
||||
use std::os::unix::ffi::OsStrExt;
|
||||
use std::path::Path;
|
||||
|
||||
use anyhow::{anyhow, Result};
|
||||
use nix::errno::Errno;
|
||||
use nix::sys::mman::{MapFlags, ProtFlags};
|
||||
use nix::sys::signal::Signal;
|
||||
use nix::sys::uio::{process_vm_writev, IoVec, RemoteIoVec};
|
||||
use nix::sys::{ptrace, wait};
|
||||
use nix::unistd::Pid;
|
||||
use nix::Error::Sys;
|
||||
use procfs::process::Task;
|
||||
use procfs::ProcError;
|
||||
use retry::delay::Fixed;
|
||||
use retry::Error::{self, Operation};
|
||||
use retry::OperationResult;
|
||||
use tracing::{error, info, instrument, trace, warn};
|
||||
use Error::Internal;
|
||||
|
||||
// There should be only one PtraceManager in one thread. But as we don't implement TLS
|
||||
// , we cannot use thread-local variables safely.
|
||||
#[derive(Debug, Default)]
|
||||
|
@ -30,7 +35,45 @@ pub fn trace(pid: i32) -> Result<TracedProcess> {
|
|||
PTRACE_MANAGER.with(|pm| pm.trace(pid))
|
||||
}
|
||||
|
||||
fn thread_is_gone(state: char) -> bool {
|
||||
// return true if the process is Zombie or Dead
|
||||
state == 'Z' || state == 'x' || state == 'X'
|
||||
}
|
||||
|
||||
#[instrument]
|
||||
fn attach_task(task: &Task) -> Result<()> {
|
||||
let pid = Pid::from_raw(task.tid);
|
||||
let process = procfs::process::Process::new(task.tid)?;
|
||||
|
||||
trace!("attach task: {}", task.tid);
|
||||
match ptrace::attach(pid) {
|
||||
Err(Sys(errno))
|
||||
if errno == Errno::ESRCH
|
||||
|| (errno == Errno::EPERM && thread_is_gone(process.stat.state)) =>
|
||||
{
|
||||
info!("task {} doesn't exist, maybe has stopped", task.tid)
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("attach error: {:?}", err);
|
||||
return Err(err.into());
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
info!("attach task: {} successfully", task.tid);
|
||||
|
||||
// TODO: check wait result
|
||||
match wait::waitpid(pid, Some(wait::WaitPidFlag::__WALL)) {
|
||||
Ok(status) => {
|
||||
info!("wait status: {:?}", status);
|
||||
}
|
||||
Err(err) => warn!("fail to wait for process({}): {:?}", pid, err),
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl PtraceManager {
|
||||
#[instrument(skip(self))]
|
||||
pub fn trace(&self, pid: i32) -> Result<TracedProcess> {
|
||||
let raw_pid = pid;
|
||||
let pid = Pid::from_raw(pid);
|
||||
|
@ -39,47 +82,40 @@ impl PtraceManager {
|
|||
match counter_ref.get_mut(&raw_pid) {
|
||||
Some(count) => *count += 1,
|
||||
None => {
|
||||
trace!("send SIGSTOP to process: {}", pid);
|
||||
kill(pid, Signal::SIGSTOP)?;
|
||||
trace!("stop {} successfully", pid);
|
||||
|
||||
let process = procfs::process::Process::new(raw_pid)?;
|
||||
for task in process.tasks()? {
|
||||
if let Ok(task) = task {
|
||||
let pid = Pid::from_raw(task.tid);
|
||||
let mut iterations = 2;
|
||||
let mut traced_tasks = HashSet::<i32>::new();
|
||||
|
||||
info!("attach task: {}", task.tid);
|
||||
match ptrace::attach(pid) {
|
||||
Err(nix::Error::Sys(nix::errno::Errno::ESRCH)) => {
|
||||
info!("task {} doesn't exist, maybe has stopped", task.tid)
|
||||
}
|
||||
Err(err) => return Err(err.into()),
|
||||
_ => {}
|
||||
while iterations > 0 {
|
||||
let mut new_threads_found = false;
|
||||
let process = procfs::process::Process::new(raw_pid)?;
|
||||
for task in process.tasks()?.flatten() {
|
||||
if traced_tasks.contains(&task.tid) {
|
||||
continue;
|
||||
}
|
||||
info!("attach task: {} successfully", task.tid);
|
||||
|
||||
// TODO: check wait result
|
||||
match wait::waitpid(pid, Some(wait::WaitPidFlag::__WALL)) {
|
||||
Ok(status) => {
|
||||
info!("wait status: {:?}", status);
|
||||
}
|
||||
Err(err) => warn!("fail to wait for process({}): {:?}", pid, err),
|
||||
if let Ok(()) = attach_task(&task) {
|
||||
trace!("newly traced task: {}", task.tid);
|
||||
new_threads_found = true;
|
||||
traced_tasks.insert(task.tid);
|
||||
}
|
||||
}
|
||||
|
||||
if !new_threads_found {
|
||||
iterations -= 1;
|
||||
}
|
||||
}
|
||||
|
||||
info!("trace process: {} successfully", pid);
|
||||
counter_ref.insert(raw_pid, 1);
|
||||
|
||||
info!("send SIGCONT to process: {}", pid);
|
||||
kill(pid, Signal::SIGCONT)?;
|
||||
info!("continue {} successfully", pid);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(TracedProcess { pid: raw_pid })
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub fn detach(&self, pid: i32) -> Result<()> {
|
||||
let mut counter_ref = self.counter.borrow_mut();
|
||||
match counter_ref.get_mut(&pid) {
|
||||
|
@ -90,27 +126,51 @@ impl PtraceManager {
|
|||
counter_ref.remove(&pid);
|
||||
|
||||
info!("detach process: {}", pid);
|
||||
match procfs::process::Process::new(pid) {
|
||||
Ok(process) => {
|
||||
for task in process.tasks()? {
|
||||
if let Ok(task) = task {
|
||||
info!("detach task: {}", task.tid);
|
||||
match ptrace::detach(Pid::from_raw(task.tid), None) {
|
||||
Ok(()) => {}
|
||||
Err(nix::Error::Sys(nix::errno::Errno::ESRCH)) => info!(
|
||||
"task {} doesn't exist, maybe has stopped",
|
||||
task.tid
|
||||
),
|
||||
Err(err) => return Err(err.into()),
|
||||
}
|
||||
info!("detach task: {} successfully", task.tid);
|
||||
}
|
||||
if let Err(err) = retry::retry::<_, _, _, anyhow::Error, _>(
|
||||
Fixed::from_millis(500).take(20),
|
||||
|| match procfs::process::Process::new(pid) {
|
||||
Err(ProcError::NotFound(_)) => {
|
||||
info!("process {} not found", pid);
|
||||
OperationResult::Ok(())
|
||||
}
|
||||
info!("detach process: {} successfully", pid);
|
||||
Err(err) => {
|
||||
warn!("fail to detach task: {}, retry", pid);
|
||||
OperationResult::Retry(err.into())
|
||||
}
|
||||
Ok(process) => match process.tasks() {
|
||||
Err(err) => OperationResult::Retry(err.into()),
|
||||
Ok(tasks) => {
|
||||
for task in tasks.flatten() {
|
||||
match ptrace::detach(Pid::from_raw(task.tid), None) {
|
||||
Ok(()) => {
|
||||
info!("successfully detached task: {}", task.tid);
|
||||
}
|
||||
Err(Sys(Errno::ESRCH)) => trace!(
|
||||
"task {} doesn't exist, maybe has stopped or not traced",
|
||||
task.tid
|
||||
),
|
||||
Err(err) => {
|
||||
warn!("fail to detach: {:?}", err)
|
||||
},
|
||||
}
|
||||
trace!("detach task: {} successfully", task.tid);
|
||||
}
|
||||
info!("detach process: {} successfully", pid);
|
||||
OperationResult::Ok(())
|
||||
}
|
||||
},
|
||||
},
|
||||
) {
|
||||
warn!("fail to detach: {:?}", err);
|
||||
match err {
|
||||
Operation {
|
||||
error: e,
|
||||
total_delay: _,
|
||||
tries: _,
|
||||
} => return Err(e),
|
||||
Internal(err) => error!("internal error: {:?}", err),
|
||||
}
|
||||
Err(ProcError::NotFound(_)) => info!("process {} not found", pid),
|
||||
Err(err) => return Err(err.into()),
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
@ -133,6 +193,7 @@ impl Clone for TracedProcess {
|
|||
}
|
||||
|
||||
impl TracedProcess {
|
||||
#[instrument]
|
||||
fn protect(&self) -> Result<ThreadGuard> {
|
||||
let regs = ptrace::getregs(Pid::from_raw(self.pid))?;
|
||||
|
||||
|
@ -148,6 +209,7 @@ impl TracedProcess {
|
|||
Ok(guard)
|
||||
}
|
||||
|
||||
#[instrument(skip(f))]
|
||||
fn with_protect<R, F: Fn(&Self) -> Result<R>>(&self, f: F) -> Result<R> {
|
||||
let guard = self.protect()?;
|
||||
|
||||
|
@ -158,6 +220,7 @@ impl TracedProcess {
|
|||
Ok(ret)
|
||||
}
|
||||
|
||||
#[instrument]
|
||||
fn syscall(&self, id: u64, args: &[u64]) -> Result<u64> {
|
||||
trace!("run syscall {} {:?}", id, args);
|
||||
|
||||
|
@ -216,6 +279,7 @@ impl TracedProcess {
|
|||
})
|
||||
}
|
||||
|
||||
#[instrument]
|
||||
pub fn mmap(&self, length: u64, fd: u64) -> Result<u64> {
|
||||
let prot = ProtFlags::PROT_READ | ProtFlags::PROT_WRITE | ProtFlags::PROT_EXEC;
|
||||
let flags = MapFlags::MAP_PRIVATE | MapFlags::MAP_ANON;
|
||||
|
@ -226,10 +290,12 @@ impl TracedProcess {
|
|||
)
|
||||
}
|
||||
|
||||
#[instrument]
|
||||
pub fn munmap(&self, addr: u64, len: u64) -> Result<u64> {
|
||||
self.syscall(11, &[addr, len])
|
||||
}
|
||||
|
||||
#[instrument(skip(f))]
|
||||
pub fn with_mmap<R, F: Fn(&Self, u64) -> Result<R>>(&self, len: u64, f: F) -> Result<R> {
|
||||
let addr = self.mmap(len, 0)?;
|
||||
|
||||
|
@ -240,7 +306,8 @@ impl TracedProcess {
|
|||
Ok(ret)
|
||||
}
|
||||
|
||||
pub fn chdir<P: AsRef<Path>>(&self, filename: P) -> Result<()> {
|
||||
#[instrument]
|
||||
pub fn chdir<P: AsRef<Path> + std::fmt::Debug>(&self, filename: P) -> Result<()> {
|
||||
let filename = CString::new(filename.as_ref().as_os_str().as_bytes())?;
|
||||
let path = filename.as_bytes_with_nul();
|
||||
|
||||
|
@ -252,6 +319,7 @@ impl TracedProcess {
|
|||
})
|
||||
}
|
||||
|
||||
#[instrument]
|
||||
pub fn write_mem(&self, addr: u64, content: &[u8]) -> Result<()> {
|
||||
let pid = Pid::from_raw(self.pid);
|
||||
|
||||
|
@ -267,6 +335,7 @@ impl TracedProcess {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip(codes))]
|
||||
pub fn run_codes<F: Fn(u64) -> Result<(u64, Vec<u8>)>>(&self, codes: F) -> Result<()> {
|
||||
let pid = Pid::from_raw(self.pid);
|
||||
|
||||
|
@ -320,7 +389,7 @@ impl Drop for TracedProcess {
|
|||
|
||||
if let Err(err) = PTRACE_MANAGER.with(|pm| pm.detach(self.pid)) {
|
||||
info!(
|
||||
"deteching process {} failed with error: {:?}",
|
||||
"detaching process {} failed with error: {:?}",
|
||||
self.pid, err
|
||||
)
|
||||
}
|
||||
|
|
|
@ -1,18 +1,15 @@
|
|||
use super::ptrace;
|
||||
use super::utils::all_processes;
|
||||
use super::Replacer;
|
||||
|
||||
use std::fmt::Debug;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use anyhow::Result;
|
||||
use tracing::{error, info, trace};
|
||||
|
||||
use log::{error, info, trace};
|
||||
use super::utils::all_processes;
|
||||
use super::{ptrace, Replacer};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct CwdReplacer {
|
||||
processes: Vec<ptrace::TracedProcess>,
|
||||
new_path: PathBuf,
|
||||
processes: Vec<(ptrace::TracedProcess, PathBuf)>,
|
||||
}
|
||||
|
||||
impl CwdReplacer {
|
||||
|
@ -36,8 +33,13 @@ impl CwdReplacer {
|
|||
}
|
||||
})
|
||||
.filter(|(_, path)| path.starts_with(detect_path.as_ref()))
|
||||
.filter_map(|(pid, _)| match ptrace::trace(pid) {
|
||||
Ok(process) => Some(process),
|
||||
.filter_map(|(pid, path)| match ptrace::trace(pid) {
|
||||
Ok(process) => {
|
||||
let mut new_path = new_path.as_ref().to_path_buf();
|
||||
|
||||
new_path.push(path.strip_prefix(detect_path.as_ref()).unwrap());
|
||||
Some((process, new_path))
|
||||
}
|
||||
Err(err) => {
|
||||
error!("fail to ptrace process: pid({}) with error: {:?}", pid, err);
|
||||
None
|
||||
|
@ -45,18 +47,16 @@ impl CwdReplacer {
|
|||
})
|
||||
.collect();
|
||||
|
||||
Ok(CwdReplacer {
|
||||
processes,
|
||||
new_path: new_path.as_ref().to_owned(),
|
||||
})
|
||||
Ok(CwdReplacer { processes })
|
||||
}
|
||||
}
|
||||
|
||||
impl Replacer for CwdReplacer {
|
||||
fn run(&mut self) -> Result<()> {
|
||||
info!("running cwd replacer");
|
||||
for process in self.processes.iter() {
|
||||
process.chdir(&self.new_path)?;
|
||||
for (process, new_path) in self.processes.iter() {
|
||||
trace!("replacing cwd: {} to {:?}", process.pid, new_path);
|
||||
process.chdir(new_path)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
|
|
@ -1,21 +1,17 @@
|
|||
use super::ptrace;
|
||||
use super::utils::all_processes;
|
||||
use super::Replacer;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::Debug;
|
||||
use std::io::{Cursor, Read, Write};
|
||||
use std::iter::FromIterator;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::{collections::HashMap, fmt::Debug};
|
||||
|
||||
use anyhow::{anyhow, Result};
|
||||
|
||||
use dynasmrt::{dynasm, DynasmApi, DynasmLabelApi};
|
||||
|
||||
use log::{error, info, trace};
|
||||
|
||||
use procfs::process::FDTarget;
|
||||
|
||||
use itertools::Itertools;
|
||||
use procfs::process::FDTarget;
|
||||
use tracing::{error, info, trace};
|
||||
|
||||
use super::utils::all_processes;
|
||||
use super::{ptrace, Replacer};
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
#[repr(packed)]
|
||||
|
|
|
@ -1,24 +1,18 @@
|
|||
use super::ptrace;
|
||||
use super::utils::all_processes;
|
||||
use super::Replacer;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::Debug;
|
||||
use std::io::{Cursor, Read, Write};
|
||||
use std::iter::FromIterator;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use log::{error, info, trace};
|
||||
|
||||
use procfs::process::MMapPath;
|
||||
|
||||
use anyhow::{anyhow, Result};
|
||||
|
||||
use dynasmrt::{dynasm, DynasmApi, DynasmLabelApi};
|
||||
|
||||
use itertools::Itertools;
|
||||
|
||||
use nix::sys::mman::{MapFlags, ProtFlags};
|
||||
use procfs::process::MMapPath;
|
||||
use tracing::{error, info, trace};
|
||||
|
||||
use super::utils::all_processes;
|
||||
use super::{ptrace, Replacer};
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct ReplaceCase {
|
||||
|
@ -194,6 +188,7 @@ impl ProcessAccessor {
|
|||
; mov rdi, QWORD [r14+r15] // addr
|
||||
; mov rsi, QWORD [r14+r15+8] // length
|
||||
; mov rdx, 0x0
|
||||
; push rdi
|
||||
; syscall
|
||||
// open
|
||||
; mov rax, 0x2
|
||||
|
@ -206,6 +201,7 @@ impl ProcessAccessor {
|
|||
; mov rsi, libc::O_RDWR
|
||||
; mov rdx, 0x0
|
||||
; syscall
|
||||
; pop rdi // addr
|
||||
; push rax
|
||||
; mov r8, rax // fd
|
||||
// mmap
|
||||
|
|
|
@ -1,31 +1,26 @@
|
|||
use crate::ptrace;
|
||||
|
||||
use std::path::Path;
|
||||
|
||||
use anyhow::Result;
|
||||
|
||||
use crate::ptrace;
|
||||
|
||||
mod cwd_replacer;
|
||||
mod fd_replacer;
|
||||
mod mmap_replacer;
|
||||
mod utils;
|
||||
|
||||
use log::error;
|
||||
use tracing::error;
|
||||
|
||||
pub trait Replacer {
|
||||
fn run(&mut self) -> Result<()>;
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct UnionReplacer<'a> {
|
||||
replacers: Vec<Box<dyn Replacer + 'a>>,
|
||||
}
|
||||
|
||||
impl<'a> UnionReplacer<'a> {
|
||||
pub fn new() -> UnionReplacer<'a> {
|
||||
UnionReplacer {
|
||||
replacers: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn prepare<P1: AsRef<Path>, P2: AsRef<Path>>(
|
||||
&mut self,
|
||||
detect_path: P1,
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
use anyhow::Result;
|
||||
|
||||
use procfs::process::{self, Process};
|
||||
|
||||
pub fn all_processes() -> Result<impl Iterator<Item = Process>> {
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
use std::sync::Arc;
|
||||
use std::sync::{Condvar, Mutex};
|
||||
use std::sync::{Arc, Condvar, Mutex};
|
||||
|
||||
struct Stop {
|
||||
inner: Mutex<bool>,
|
||||
|
|
|
@ -0,0 +1,57 @@
|
|||
use std::sync::mpsc::channel;
|
||||
use std::sync::Mutex;
|
||||
|
||||
use anyhow::anyhow;
|
||||
use toda::jsonrpc::{self, new_handler, Comm};
|
||||
#[test]
|
||||
fn test_status_good() {
|
||||
let (tx, _rx) = channel();
|
||||
let io = new_handler(jsonrpc::RpcImpl::new(
|
||||
Mutex::new(Ok(())),
|
||||
Mutex::new(tx),
|
||||
None,
|
||||
));
|
||||
let request = r#"{"jsonrpc": "2.0","method":"get_status","params":[""],"id":1}"#;
|
||||
let response = r#"{"jsonrpc":"2.0","result":"ok","id":1}"#;
|
||||
assert_eq!(io.handle_request_sync(request), Some(response.to_string()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_status_bad() {
|
||||
let (tx, rx) = channel();
|
||||
let io = new_handler(jsonrpc::RpcImpl::new(
|
||||
Mutex::new(Err(anyhow!("Not good"))),
|
||||
Mutex::new(tx),
|
||||
None,
|
||||
));
|
||||
let request = r#"{"jsonrpc": "2.0","method":"get_status","params":[""],"id":1}"#;
|
||||
let response = r#"{"jsonrpc":"2.0","result":"Not good","id":1}"#;
|
||||
assert_eq!(io.handle_request_sync(request), Some(response.to_string()));
|
||||
assert_eq!(rx.recv().unwrap(), Comm::Shutdown);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_should_not_update_config_if_status_is_failed() {
|
||||
let (tx, _rx) = channel();
|
||||
let request = r#"{"jsonrpc": "2.0","method":"update","params":[[]],"id":1}"#;
|
||||
let response = r#"{"jsonrpc":"2.0","result":"Not good","id":1}"#;
|
||||
let io = new_handler(jsonrpc::RpcImpl::new(
|
||||
Mutex::new(Err(anyhow!("Not good"))),
|
||||
Mutex::new(tx),
|
||||
None,
|
||||
));
|
||||
assert_eq!(io.handle_request_sync(request), Some(response.to_string()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_should_fail_if_config_is_bad() {
|
||||
let (tx, _rx) = channel();
|
||||
let request = r#"{"jsonrpc": "2.0","method":"update","params":[["blah"]],"id":1}"#;
|
||||
let response = r#"{"jsonrpc":"2.0","error":{"code":-32602,"message":"Invalid params: invalid type: string \"blah\", expected internally tagged enum."},"id":1}"#;
|
||||
let io = new_handler(jsonrpc::RpcImpl::new(
|
||||
Mutex::new(Ok(())),
|
||||
Mutex::new(tx),
|
||||
None,
|
||||
));
|
||||
assert_eq!(io.handle_request_sync(request), Some(response.to_string()));
|
||||
}
|
|
@ -11,20 +11,17 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use toda::hookfs;
|
||||
use toda::injector::MultiInjector;
|
||||
|
||||
use std::ffi::OsStr;
|
||||
use std::fs::{read_link, read_to_string, write, File, OpenOptions};
|
||||
use std::io::{Read, Write};
|
||||
use std::os::unix::fs::symlink;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Once;
|
||||
use std::sync::{Arc, Once};
|
||||
|
||||
use nix::fcntl;
|
||||
use nix::sys::stat;
|
||||
use nix::unistd;
|
||||
use nix::{fcntl, unistd};
|
||||
use toda::hookfs;
|
||||
use toda::injector::MultiInjector;
|
||||
|
||||
// These tests are port from go-fuse test
|
||||
|
||||
|
@ -35,7 +32,7 @@ fn init(name: &str) -> (PathBuf, fuser::BackgroundSession) {
|
|||
let test_path: PathBuf = ["/tmp/test_mnt", name].iter().collect();
|
||||
|
||||
INIT.call_once(|| {
|
||||
flexi_logger::Logger::with_env().start().unwrap();
|
||||
env_logger::init();
|
||||
});
|
||||
|
||||
std::fs::remove_dir_all(&test_path_backend).ok();
|
||||
|
|
Loading…
Reference in New Issue