Compare commits

...

30 Commits

Author SHA1 Message Date
YangKeao 523c67dbd5
Bump version to v0.2.4 (#44)
Signed-off-by: Yang Keao <yangkeao@chunibyo.icu>
2024-07-07 14:22:16 +08:00
Michael Morris 4bc8404382
Fix incorrect mmap args (#42)
Signed-off-by: MichaelMorris <michael.morris@est.tech>
2024-07-07 14:14:46 +08:00
YangKeao 04ba4edc20
run test one by one (#43)
Signed-off-by: Yang Keao <yangkeao@chunibyo.icu>
2024-07-07 14:06:49 +08:00
YangKeao 67b43ee417
Bump version to v0.2.3
Signed-off-by: YangKeao <yangkeao@chunibyo.icu>
2022-04-12 13:16:28 +08:00
STRRL 7eb184edd0
fix: appending nonempty when mount fuse (#31)
Signed-off-by: STRRL <str_ruiling@outlook.com>
2022-04-02 11:39:12 +08:00
YangKeao f9c784ceac
bump version to v0.2.2
Signed-off-by: YangKeao <yangkeao@chunibyo.icu>
2022-01-05 17:09:54 +08:00
YangKeao 65071a4a6e
cancel the waiting injector
Signed-off-by: YangKeao <yangkeao@chunibyo.icu>
2022-01-05 15:12:35 +08:00
YangKeao 7ea57ef5f3 fix according to cargo clippy
Signed-off-by: YangKeao <yangkeao@chunibyo.icu>
2022-01-04 17:52:54 +08:00
YangKeao ac795a08cf Bump version to v0.2.1
Signed-off-by: YangKeao <yangkeao@chunibyo.icu>
2022-01-04 17:38:44 +08:00
YangKeao 7ca342b0b0 fix cwd replacer
Signed-off-by: YangKeao <yangkeao@chunibyo.icu>
2022-01-04 17:38:23 +08:00
Yang Keao 8734addae3 Bump version to v0.2.0
Signed-off-by: Yang Keao <keao.yang@yahoo.com>
2021-04-06 13:12:19 +08:00
AsterNighT a866988506
lock aabi to 7.19 and update examples (#24)
Signed-off-by: AsterNighT <klxjt99@outlook.com>
2021-03-25 14:12:33 +08:00
AsterNighT 62043db60a
Use jrpc server to set injectors (#23)
* Make toda call jsonrpc through stderr after startup

Signed-off-by: AsterNighT <klxjt99@outlook.com>

* Setup a jrpc server over stdio and move tracings to stderr

Signed-off-by: AsterNighT <klxjt99@outlook.com>

* Fix format issue

Signed-off-by: AsterNighT <klxjt99@outlook.com>

* Add some tracing output and a test

Signed-off-by: AsterNighT <klxjt99@outlook.com>

* Update jsonrpc using derive

Signed-off-by: AsterNighT <klxjt99@outlook.com>

* Delete useless comment

Signed-off-by: AsterNighT <klxjt99@outlook.com>

* Update test to make it work like a go client

Signed-off-by: AsterNighT <klxjt99@outlook.com>

* Report error through rpc

Signed-off-by: AsterNighT <klxjt99@outlook.com>

* Upd

Signed-off-by: AsterNighT <klxjt99@outlook.com>

* Fix bug due to instant quit of main

Signed-off-by: AsterNighT <klxjt99@outlook.com>

* Sleep longer to prevent certain issue cause by rpc latency

Signed-off-by: AsterNighT <klxjt99@outlook.com>

* Fix some ugly issues

Signed-off-by: AsterNighT <klxjt99@outlook.com>

* Delete a useless ugly sleep

Signed-off-by: AsterNighT <klxjt99@outlook.com>

* Update rpc server

Signed-off-by: AsterNighT <klxjt99@outlook.com>

* Fix test

Signed-off-by: AsterNighT <klxjt99@outlook.com>

* Apply rustfmt

Signed-off-by: AsterNighT <klxjt99@outlook.com>

* Use signal only to exit

Signed-off-by: AsterNighT <klxjt99@outlook.com>

* Update test

Signed-off-by: AsterNighT <klxjt99@outlook.com>
2021-03-24 15:32:53 +08:00
Yang Keao 4a991b5fbb Bump version to 0.1.21
Signed-off-by: Yang Keao <keao.yang@yahoo.com>
2021-03-18 15:35:06 +08:00
AsterNighT fe18e46efa
Toda jsonrpc server over stdio (#21)
* Make toda call jsonrpc through stderr after startup

Signed-off-by: AsterNighT <klxjt99@outlook.com>

* Setup a jrpc server over stdio and move tracings to stderr

Signed-off-by: AsterNighT <klxjt99@outlook.com>

* Fix format issue

Signed-off-by: AsterNighT <klxjt99@outlook.com>

* Add some tracing output and a test

Signed-off-by: AsterNighT <klxjt99@outlook.com>

* Update jsonrpc using derive

Signed-off-by: AsterNighT <klxjt99@outlook.com>

* Delete useless comment

Signed-off-by: AsterNighT <klxjt99@outlook.com>

* Update test to make it work like a go client

Signed-off-by: AsterNighT <klxjt99@outlook.com>

* Report error through rpc

Signed-off-by: AsterNighT <klxjt99@outlook.com>

* Upd

Signed-off-by: AsterNighT <klxjt99@outlook.com>

* Fix bug due to instant quit of main

Signed-off-by: AsterNighT <klxjt99@outlook.com>

* Sleep longer to prevent certain issue cause by rpc latency

Signed-off-by: AsterNighT <klxjt99@outlook.com>
2021-03-18 15:34:29 +08:00
YangKeao 5aee355f82
fix performance issue && add close (#22)
Signed-off-by: Yang Keao <keao.yang@yahoo.com>
2021-03-15 13:01:24 +08:00
Yang Keao fbf1cb7215 bump version to v0.1.20
Signed-off-by: Yang Keao <keao.yang@yahoo.com>
2021-02-24 18:39:33 +08:00
AsterNighT ab6be766d3
Remove percent field in config (#20)
Signed-off-by: AsterNighT <751841735@qq.com>
2021-02-24 18:35:19 +08:00
Yang Keao c6248aa5c1 bump version to v0.1.19
Signed-off-by: Yang Keao <keao.yang@yahoo.com>
2021-02-24 17:24:25 +08:00
AsterNighT a2ec5f7947
WIP:Fix/mistake (#19)
* Modify config structure to match need of chaos-mesh

Signed-off-by: AsterNighT <751841735@qq.com>

* Fix error and fmt

Signed-off-by: AsterNighT <751841735@qq.com>

* Fix compile errors

Signed-off-by: AsterNighT <751841735@qq.com>

* Fix compile errors

Signed-off-by: AsterNighT <751841735@qq.com>
2021-02-24 17:23:22 +08:00
Yang Keao 6790947de8 bump version to v0.1.18
Signed-off-by: Yang Keao <keao.yang@yahoo.com>
2021-02-22 15:50:34 +08:00
AsterNighT b33b9f97dd
Add mistake injector (#18)
* Add mistake injector

Signed-off-by: AsterNighT <klxjt99@outlook.com>

* Rename mistake.class to mistake.filling

Signed-off-by: AsterNighT <klxjt99@outlook.com>

* Refactor mistake generator

Signed-off-by: AsterNighT <klxjt99@outlook.com>
2021-02-22 15:49:29 +08:00
Yang Keao 6a0857dd39 refine ptrace
Signed-off-by: Yang Keao <keao.yang@yahoo.com>
2021-01-19 20:20:20 +08:00
Yang Keao c52d0f593e auto fix clippy
Signed-off-by: Yang Keao <keao.yang@yahoo.com>
2021-01-19 16:51:54 +08:00
Yang Keao 32cc198562 run cargo fmt
Signed-off-by: Yang Keao <keao.yang@yahoo.com>
2021-01-19 16:48:04 +08:00
Yang Keao afdc4ca828 use tracing and fix EIO bugs
Signed-off-by: Yang Keao <keao.yang@yahoo.com>
2021-01-19 16:44:18 +08:00
Yang Keao 8a4086c542 fix bugs in unlink and rmdir
Signed-off-by: Yang Keao <keao.yang@yahoo.com>
2021-01-19 14:54:47 +08:00
Yang Keao 9f251055d5 better lock control 2021-01-19 13:59:27 +08:00
Yang Keao 421cd8f076 run
Signed-off-by: Yang Keao <keao.yang@yahoo.com>
2021-01-18 21:30:56 +08:00
Yang Keao 1ae28e3bda fix test compile bug
Signed-off-by: Yang Keao <keao.yang@yahoo.com>
2021-01-18 21:25:23 +08:00
39 changed files with 1800 additions and 811 deletions

View File

@ -23,7 +23,7 @@ jobs:
- name: Add user_allow_other to /etc/fuse.conf
run: echo "user_allow_other" | sudo tee -a /etc/fuse.conf
- name: Run tests
run: cargo test --verbose
run: cargo test --verbose -- --test-threads=1
clippy_check:
runs-on: ubuntu-latest
steps:

890
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
[package]
name = "toda"
version = "0.1.16"
version = "0.2.4"
authors = ["Yang Keao <keao.yang@yahoo.com>"]
edition = "2018"
@ -10,12 +10,12 @@ edition = "2018"
structopt = "0.3"
nix = "0.18"
anyhow = "1.0"
fuser = {version = "0.6", features = ["abi-7-31"]}
fuser = {version = "0.6", features = ["abi-7-19"]}
time = "0.1"
libc = "0.2"
async-trait = "0.1"
tracing-futures = "0.2"
tokio = {version = "0.2", features = ["rt-core", "rt-threaded", "sync", "fs", "time", "blocking"]}
tokio = {version = "0.2", features = ["rt-core", "rt-threaded", "sync", "fs", "time", "blocking", "macros", "full"]}
tokio-util = "0.6"
thiserror = "1.0"
futures = "0.3"
derive_more = "0.99.9"
@ -30,9 +30,15 @@ once_cell = "1.4"
dynasmrt = "1.0.0"
procfs = "0.8.0"
itertools = "0.9.0"
log = "0.4"
env_logger = "0.8"
retry = "1.2.0"
tracing = "0.1"
tracing-futures = "0.2"
tracing-subscriber = "0.2"
jsonrpc-stdio-server = "17.0.0"
jsonrpc-derive = "17.0.0"
jsonrpc-core = "17.0.0"
jsonrpc-core-client = "17.0.0"
[profile.release]
debug = true
debug = true

View File

@ -12,7 +12,7 @@ ENV https_proxy $HTTPS_PROXY
RUN apt-get update && apt-get install build-essential curl git pkg-config libfuse-dev fuse -y && rm -rf /var/lib/apt/lists/*
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- --default-toolchain nightly-2020-07-01 -y
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- --default-toolchain nightly-2021-12-23 -y
ENV PATH "/root/.cargo/bin:${PATH}"
RUN if [ -n "$HTTP_PROXY" ]; then echo "[http]\n\

View File

@ -0,0 +1,15 @@
{
"jsonrpc": "2.0",
"method": "update",
"params": [
[
{
"type": "latency",
"path": "/var/lib/postgresql/data/**/*",
"percent": 100,
"latency": "10s"
}
]
],
"id": 1
}

View File

@ -0,0 +1,23 @@
{
"jsonrpc": "2.0",
"method": "update",
"params": [
[
{
"type": "mistake",
"path": "/var/test/**/*",
"methods": [
"READ",
"WRITE"
],
"mistake": {
"filling": "zero",
"maxOccurrences": 1,
"maxLength": 10000
},
"percent": 100
}
]
],
"id": 1
}

View File

@ -0,0 +1 @@
[]

View File

@ -10,4 +10,6 @@ RUN go build -o /go/bin/app
FROM chaos-mesh/toda
COPY --from=build-env /go/bin/app /
COPY --from=build-env /go/bin/app /main-app
ENV GOMAXPROCS 64
CMD ["/app"]

View File

@ -20,6 +20,7 @@ import (
"strconv"
"syscall"
"time"
"sync"
)
func main() {
@ -33,59 +34,67 @@ func main() {
originalLength := len([]byte("HELLO WORLD"))
for {
var fVec []*os.File
var mMap [][]byte
var wg sync.WaitGroup
for i := 0; i <= 100; i++ {
go func() {
wg.Add(1)
for {
var fVec []*os.File
var mMap [][]byte
for i := 0; i < 100; i++ {
f, err := os.OpenFile("/var/run/test/test", os.O_RDWR, 0666)
if err != nil {
fmt.Printf("Error: %v+", err)
return
}
err = f.Truncate(1024)
if err != nil {
fmt.Printf("Error: %v\n", err)
continue
}
_, err = f.Seek(10, os.SEEK_SET)
if err != nil {
fmt.Printf("Error: %v\n", err)
continue
}
for i := 0; i < 20; i++ {
f, err := os.OpenFile("/var/run/test/test", os.O_RDWR, 0666)
if err != nil {
fmt.Printf("Error: %v+", err)
return
}
err = f.Truncate(1024)
if err != nil {
fmt.Printf("Error: %v\n", err)
continue
}
_, err = f.Seek(10, os.SEEK_SET)
if err != nil {
fmt.Printf("Error: %v\n", err)
continue
}
fVec = append(fVec, f)
data, err := syscall.Mmap(int(f.Fd()), 0, 10+originalLength+3, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED)
if err != nil {
fmt.Printf("Error: %v+", err)
return
}
mMap = append(mMap, data)
fVec = append(fVec, f)
data, err := syscall.Mmap(int(f.Fd()), 0, 10+originalLength+3, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED)
if err != nil {
fmt.Printf("Error: %v+", err)
return
}
mMap = append(mMap, data)
f = fVec[i]
data = mMap[i]
f = fVec[i]
data = mMap[i]
count := strconv.Itoa(i)
for pos, char := range count {
if pos < 3 {
data[10+originalLength+pos] = byte(char)
count := strconv.Itoa(i)
for pos, char := range count {
if pos < 3 {
data[10+originalLength+pos] = byte(char)
}
}
time.Sleep(time.Second)
buf := make([]byte, originalLength+len(count))
n, err := f.Read(buf)
if err != nil {
fmt.Printf("Error: %v\n", err)
continue
}
fmt.Printf("%v %d bytes: %s\n", time.Now(), n, string(buf[:n]))
}
for i := 0; i < 20; i++ {
fVec[i].Close()
syscall.Munmap(mMap[i])
}
}
time.Sleep(time.Second)
buf := make([]byte, originalLength+len(count))
n, err := f.Read(buf)
if err != nil {
fmt.Printf("Error: %v\n", err)
continue
}
fmt.Printf("%v %d bytes: %s\n", time.Now(), n, string(buf[:n]))
}
for i := 0; i < 100; i++ {
fVec[i].Close()
syscall.Munmap(mMap[i])
}
}()
}
wg.Wait()
}

View File

@ -1,8 +0,0 @@
[
{
"type": "latency",
"path": "/var/run/test/**/*",
"percent": 100,
"latency": "100ms"
}
]

View File

@ -1 +1 @@
nightly-2020-07-01
nightly-2021-12-23

4
rustfmt.toml Normal file
View File

@ -0,0 +1,4 @@
reorder_imports = true
imports_granularity = "Module"
group_imports = "StdExternalCrate"
unstable_features = true

View File

@ -1,20 +1,18 @@
use std::ffi::OsString;
use std::fmt::Debug;
use std::future::Future;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use async_trait::async_trait;
use fuser::*;
use tracing::trace_span;
use tracing_futures::Instrument;
use super::errors::Result;
use super::reply::*;
use super::runtime::spawn;
use std::ffi::OsString;
use std::fmt::Debug;
use std::sync::Arc;
use std::{
future::Future,
path::{Path, PathBuf},
};
use log::trace;
pub fn spawn_reply<F, R, V>(id: u64, reply: R, f: F)
where
F: Future<Output = Result<V>> + Send + 'static,
@ -22,9 +20,8 @@ where
V: Debug,
{
spawn(async move {
trace!("reply to request({})", id);
let result = f.await;
reply.reply(id, result);
let result = f.instrument(trace_span!("request", id)).await;
reply.reply(result);
});
}
@ -142,7 +139,13 @@ pub trait AsyncFileSystemImpl: Send + Sync {
async fn opendir(&self, ino: u64, flags: i32) -> Result<Open>;
async fn readdir(&self, ino: u64, fh: u64, offset: i64, reply: ReplyDirectory);
async fn readdir(
&self,
ino: u64,
fh: u64,
offset: i64,
reply: &mut ReplyDirectory,
) -> Result<()>;
async fn releasedir(&self, ino: u64, fh: u64, flags: i32) -> Result<()>;
@ -471,10 +474,20 @@ impl<T: AsyncFileSystemImpl + 'static> Filesystem for AsyncFileSystem<T> {
async_impl.opendir(ino, flags).await
});
}
fn readdir(&mut self, _req: &Request, ino: u64, fh: u64, offset: i64, reply: ReplyDirectory) {
fn readdir(
&mut self,
_req: &Request,
ino: u64,
fh: u64,
offset: i64,
mut reply: ReplyDirectory,
) {
let async_impl = self.0.clone();
spawn(async move {
async_impl.readdir(ino, fh, offset, reply).await;
match async_impl.readdir(ino, fh, offset, &mut reply).await {
Ok(_) => reply.ok(),
Err(err) => reply.error(err.into()),
}
});
}
fn releasedir(&mut self, req: &Request, ino: u64, fh: u64, flags: i32, reply: ReplyEmpty) {

View File

@ -1,8 +1,7 @@
use nix::errno::Errno;
use nix::Error;
use thiserror::Error;
use log::error;
use tracing::error;
#[derive(Error, Debug)]
pub enum HookFsError {
@ -69,11 +68,11 @@ impl From<tokio::task::JoinError> for HookFsError {
}
}
impl Into<libc::c_int> for HookFsError {
fn into(self) -> libc::c_int {
impl From<HookFsError> for libc::c_int {
fn from(err: HookFsError) -> libc::c_int {
use HookFsError::*;
match self {
match err {
Sys(errno) => errno as i32,
InodeNotFound { inode: _ } => libc::EFAULT,
FhNotFound { fh: _ } => libc::EFAULT,

File diff suppressed because it is too large Load Diff

View File

@ -1,10 +1,10 @@
use fuser::*;
use log::{debug, error, trace};
use std::fmt::Debug;
use std::time::Duration;
use super::errors::Result;
use fuser::*;
use tracing::{debug, error, trace};
use std::fmt::Debug;
use super::errors::Result;
const TTL: Duration = Duration::from_secs(0);
@ -28,10 +28,7 @@ pub struct Entry {
}
impl Entry {
pub fn new(stat: FileAttr, generation: u64) -> Self {
Self {
stat,
generation,
}
Self { stat, generation }
}
}
@ -48,12 +45,11 @@ impl Open {
#[derive(Debug)]
pub struct Attr {
pub time: std::time::Duration,
pub attr: FileAttr,
}
impl Attr {
pub fn new(time: std::time::Duration, attr: FileAttr) -> Self {
Self { time, attr }
pub fn new(attr: FileAttr) -> Self {
Self { attr }
}
}
@ -120,12 +116,7 @@ pub struct Create {
pub flags: i32,
}
impl Create {
pub fn new(
attr: FileAttr,
generation: u64,
fh: u64,
flags: i32,
) -> Self {
pub fn new(attr: FileAttr, generation: u64, fh: u64, flags: i32) -> Self {
Self {
attr,
generation,
@ -172,14 +163,14 @@ pub trait FsReply<T: Debug>: Sized {
fn reply_ok(self, item: T);
fn reply_err(self, err: libc::c_int);
fn reply(self, id: u64, result: Result<T>) {
fn reply(self, result: Result<T>) {
match result {
Ok(item) => {
trace!("ok. reply for request({})", id);
trace!("ok");
self.reply_ok(item)
}
Err(err) => {
debug!("err. reply with {} for request ({})", err, id);
debug!("err. reply with {}", err);
let err = err.into();
if err == -1 {
@ -211,7 +202,7 @@ impl FsReply<Open> for ReplyOpen {
impl FsReply<Attr> for ReplyAttr {
fn reply_ok(self, item: Attr) {
self.attr(&item.time, &item.attr);
self.attr(&TTL, &item.attr);
}
fn reply_err(self, err: libc::c_int) {
self.error(err);

View File

@ -1,12 +1,10 @@
use once_cell::sync::Lazy;
use tokio::runtime::Runtime;
use tokio::task::JoinHandle;
use std::future::Future;
use std::sync::RwLock;
use log::trace;
use once_cell::sync::Lazy;
use tokio::runtime::Runtime;
use tokio::task::JoinHandle;
use tracing::trace;
pub static RUNTIME: Lazy<RwLock<Option<Runtime>>> = Lazy::new(|| {
trace!("build tokio runtime");

View File

@ -1,14 +1,12 @@
use super::filter;
use super::Injector;
use super::injector_config::{AttrOverrideConfig, FileType as ConfigFileType, FilterConfig};
use crate::hookfs::Result;
use std::path::Path;
use async_trait::async_trait;
use fuser::{FileAttr, FileType};
use log::{debug, trace};
use tracing::{debug, trace};
use std::path::Path;
use super::injector_config::{AttrOverrideConfig, FileType as ConfigFileType, FilterConfig};
use super::{filter, Injector};
use crate::hookfs::Result;
#[derive(Debug)]
pub struct AttrOverrideInjector {

View File

@ -1,15 +1,13 @@
use super::filter;
use super::Injector;
use super::injector_config::FaultsConfig;
use crate::hookfs::{Error, Result};
use std::path::Path;
use async_trait::async_trait;
use log::{debug, trace};
use nix::errno::Errno;
use rand::Rng;
use tracing::{debug, trace};
use std::path::Path;
use super::injector_config::FaultsConfig;
use super::{filter, Injector};
use crate::hookfs::{Error, Result};
#[derive(Debug)]
pub struct FaultInjector {

View File

@ -1,14 +1,13 @@
use std::convert::TryFrom;
use std::path::Path;
use super::injector_config::FilterConfig;
use anyhow::{anyhow, Error, Result};
use bitflags::bitflags;
use glob::{MatchOptions, Pattern};
use rand::Rng;
use tracing::{info, trace};
use log::{info, trace};
use super::injector_config::FilterConfig;
bitflags! {
pub struct Method: u32 {
@ -110,15 +109,13 @@ impl Filter {
.unwrap_or(Method::all());
let path_filter = conf
.path
.map(|path| -> Option<Pattern> {
.path.and_then(|path| -> Option<Pattern> {
if !path.is_empty() {
Pattern::new(&path).ok()
} else {
None
}
})
.flatten();
});
Ok(Self {
path_filter,
methods,

View File

@ -1,7 +1,7 @@
use serde::{Deserialize, Serialize};
use std::time::Duration;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(tag = "type")]
#[serde(rename_all = "camelCase")]
@ -9,6 +9,7 @@ pub enum InjectorConfig {
Latency(LatencyConfig),
Fault(FaultsConfig),
AttrOverride(AttrOverrideConfig),
Mistake(MistakesConfig),
}
#[derive(Serialize, Deserialize, Clone, Debug)]
@ -82,3 +83,26 @@ pub struct Timespec {
pub sec: i64,
pub nsec: i32,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub enum MistakeType {
Zero,
Random,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct MistakeConfig {
pub filling: MistakeType,
pub max_length: usize,
pub max_occurrences: usize,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct MistakesConfig {
pub mistake: MistakeConfig,
#[serde(flatten)]
pub filter: FilterConfig,
}

View File

@ -1,20 +1,21 @@
use async_trait::async_trait;
use std::path::Path;
use std::time::Duration;
use super::filter;
use super::injector_config::LatencyConfig;
use super::Injector;
use crate::hookfs::Result;
use log::{debug, trace};
use async_trait::async_trait;
use tokio::time::delay_for;
use tokio::select;
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace};
use super::injector_config::LatencyConfig;
use super::{filter, Injector};
use crate::hookfs::Result;
#[derive(Debug)]
pub struct LatencyInjector {
latency: Duration,
filter: filter::Filter,
cancel_token: CancellationToken,
}
#[async_trait]
@ -22,13 +23,27 @@ impl Injector for LatencyInjector {
async fn inject(&self, method: &filter::Method, path: &Path) -> Result<()> {
trace!("test for filter");
if self.filter.filter(method, path) {
debug!("inject io delay {:?}", self.latency);
delay_for(self.latency).await;
let token = self.cancel_token.clone();
let latency = self.latency;
debug!("inject io delay {:?}", latency);
select! {
_ = delay_for(latency) => {}
_ = token.cancelled() => {
debug!("cancelled");
}
}
debug!("latency finished");
}
Ok(())
}
fn interrupt(&self) {
debug!("interrupt latency");
self.cancel_token.cancel();
}
}
impl LatencyInjector {
@ -38,6 +53,7 @@ impl LatencyInjector {
Ok(Self {
latency: conf.latency,
filter: filter::Filter::build(conf.filter)?,
cancel_token: CancellationToken::new(),
})
}
}

View File

@ -0,0 +1,85 @@
use std::cmp::{max, min};
use std::path::Path;
use async_trait::async_trait;
use rand::Rng;
use tracing::{debug, trace};
use super::injector_config::{MistakeConfig, MistakeType, MistakesConfig};
use super::{filter, Injector};
use crate::hookfs::{Reply, Result};
#[derive(Debug)]
pub struct MistakeInjector {
mistake: MistakeConfig,
filter: filter::Filter,
}
#[async_trait]
impl Injector for MistakeInjector {
async fn inject(&self, _: &filter::Method, _: &Path) -> Result<()> {
debug!("MI:Injecting");
Ok(())
}
fn inject_reply(&self, method: &super::Method, path: &Path, reply: &mut Reply) -> Result<()> {
if self.filter.filter(method, path) {
debug!("MI:Injecting reply");
if let Reply::Data(data) = reply {
let data = &mut data.data;
self.handle(data)?;
}
}
Ok(())
}
fn inject_write_data(&self, path: &Path, data: &mut Vec<u8>) -> Result<()> {
if self.filter.filter(&super::Method::WRITE, path) {
debug!("MI:Injecting write data");
self.handle(data)?;
}
Ok(())
}
}
impl MistakeInjector {
pub fn build(conf: MistakesConfig) -> anyhow::Result<Self> {
trace!("build mistake injector");
Ok(Self {
mistake: conf.mistake,
filter: filter::Filter::build(conf.filter)?,
})
}
pub fn handle(&self, data: &mut Vec<u8>) -> Result<()> {
trace!("sabotage data");
let mut rng = rand::thread_rng();
let data_length = data.len();
let mistake = &self.mistake;
let occurrence = match mistake.max_occurrences {
0 => 0,
mo => rng.gen_range(1, mo + 1),
};
for _ in 0..occurrence {
let pos = rng.gen_range(0, max(data_length, 1));
let length = match min(mistake.max_length, data_length - pos) {
0 => 0,
l => rng.gen_range(1, l + 1),
};
debug!(
"Setting index [{},{}) to {:?}",
pos,
pos + length,
mistake.filling
);
match mistake.filling {
MistakeType::Zero => {
for item in data.iter_mut().skip(pos).take(length) {
*item = 0;
}
}
MistakeType::Random => rng.fill(&mut data[pos..pos + length]),
}
}
Ok(())
}
}

View File

@ -3,17 +3,18 @@ mod fault_injector;
mod filter;
mod injector_config;
mod latency_injector;
mod mistake_injector;
mod multi_injector;
use std::path::Path;
use async_trait::async_trait;
pub use filter::Method;
use fuser::FileAttr;
pub use injector_config::InjectorConfig;
pub use multi_injector::MultiInjector;
use crate::hookfs::{Reply, Result};
use async_trait::async_trait;
use fuser::FileAttr;
use std::path::Path;
#[async_trait]
pub trait Injector: Send + Sync + std::fmt::Debug {
@ -27,6 +28,11 @@ pub trait Injector: Send + Sync + std::fmt::Debug {
) -> Result<()> {
Ok(())
}
fn inject_write_data(&self, _path: &Path, _data: &mut Vec<u8>) -> Result<()> {
Ok(())
}
fn inject_attr(&self, _attr: &mut FileAttr, _path: &Path) {}
fn interrupt(&self) {}
}

View File

@ -1,16 +1,16 @@
use super::attr_override_injector::AttrOverrideInjector;
use super::fault_injector::FaultInjector;
use super::filter;
use super::injector_config::InjectorConfig;
use super::latency_injector::LatencyInjector;
use super::Injector;
use crate::hookfs::{Reply, Result};
use std::path::Path;
use async_trait::async_trait;
use fuser::FileAttr;
use log::trace;
use tracing::trace;
use std::path::Path;
use super::attr_override_injector::AttrOverrideInjector;
use super::fault_injector::FaultInjector;
use super::injector_config::InjectorConfig;
use super::latency_injector::LatencyInjector;
use super::mistake_injector::MistakeInjector;
use super::{filter, Injector};
use crate::hookfs::{Reply, Result};
#[derive(Debug)]
pub struct MultiInjector {
@ -33,6 +33,9 @@ impl MultiInjector {
InjectorConfig::AttrOverride(attr_override) => {
(box AttrOverrideInjector::build(attr_override)?) as Box<dyn Injector>
}
InjectorConfig::Mistake(mistakes) => {
(box MistakeInjector::build(mistakes)?) as Box<dyn Injector>
}
};
injectors.push(injector)
}
@ -64,4 +67,17 @@ impl Injector for MultiInjector {
injector.inject_attr(attr, path)
}
}
fn inject_write_data(&self, path: &Path, data: &mut Vec<u8>) -> Result<()> {
for injector in self.injectors.iter() {
injector.inject_write_data(path, data)?;
}
Ok(())
}
fn interrupt(&self) {
for injector in self.injectors.iter() {
injector.interrupt();
}
}
}

95
src/jsonrpc.rs Normal file
View File

@ -0,0 +1,95 @@
use std::sync::{mpsc, Arc, Mutex};
use jsonrpc_derive::rpc;
use jsonrpc_stdio_server::jsonrpc_core::*;
use jsonrpc_stdio_server::ServerBuilder;
use tracing::{info, trace};
use crate::hookfs::HookFs;
use crate::injector::{InjectorConfig, MultiInjector};
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Comm {
Shutdown = 0,
}
pub async fn start_server(config: RpcImpl) {
info!("Starting jsonrpc server");
let server = new_server(config);
let server = server.build();
server.await;
}
pub fn new_server(config: RpcImpl) -> ServerBuilder {
info!("Creating jsonrpc server");
let io = new_handler(config);
ServerBuilder::new(io)
}
pub fn new_handler(config: RpcImpl) -> IoHandler {
info!("Creating jsonrpc handler");
let mut io = IoHandler::new();
io.extend_with(config.to_delegate());
io
}
#[rpc]
pub trait Rpc {
#[rpc(name = "get_status")]
fn get_status(&self, inst: String) -> Result<String>;
#[rpc(name = "update")]
fn update(&self, config: Vec<InjectorConfig>) -> Result<String>;
}
pub struct RpcImpl {
status: Mutex<anyhow::Result<()>>,
tx: Mutex<mpsc::Sender<Comm>>,
hookfs: Option<Arc<HookFs>>,
}
impl RpcImpl {
pub fn new(
status: Mutex<anyhow::Result<()>>,
tx: Mutex<mpsc::Sender<Comm>>,
hookfs: Option<Arc<HookFs>>,
) -> Self {
Self { status, tx, hookfs }
}
}
impl Drop for RpcImpl {
fn drop(&mut self) {
trace!("Dropping jrpc handler");
}
}
impl Rpc for RpcImpl {
fn get_status(&self, _inst: String) -> Result<String> {
info!("rpc get_status called");
match &*self.status.lock().unwrap() {
Ok(_) => Ok("ok".to_string()),
Err(e) => {
let tx = &self.tx.lock().unwrap();
tx.send(Comm::Shutdown)
.expect("Send through channel failed");
Ok(e.to_string())
}
}
}
fn update(&self, config: Vec<InjectorConfig>) -> Result<String> {
info!("rpc update called");
if let Err(e) = &*self.status.lock().unwrap() {
return Ok(e.to_string());
}
let injectors = MultiInjector::build(config);
if let Err(e) = &injectors {
return Ok(e.to_string());
}
futures::executor::block_on(async {
let hookfs = self.hookfs.as_ref().unwrap();
let mut current_injectors = hookfs.injector.write().await;
*current_injectors = injectors.unwrap();
});
Ok("ok".to_string())
}
}

View File

@ -19,6 +19,13 @@
#![allow(clippy::or_fun_call)]
#![allow(clippy::too_many_arguments)]
pub mod injector;
pub mod fuse_device;
pub mod hookfs;
pub mod injector;
pub mod jsonrpc;
pub mod mount;
pub mod mount_injector;
pub mod ptrace;
pub mod replacer;
pub mod stop;
pub mod utils;

View File

@ -24,6 +24,7 @@ extern crate derive_more;
mod fuse_device;
mod hookfs;
mod injector;
mod jsonrpc;
mod mount;
mod mount_injector;
mod ptrace;
@ -31,20 +32,24 @@ mod replacer;
mod stop;
mod utils;
use injector::InjectorConfig;
use mount_injector::{MountInjectionGuard, MountInjector};
use replacer::{Replacer, UnionReplacer};
use utils::encode_path;
use anyhow::Result;
use log::info;
use nix::sys::signal::{signal, SigHandler, Signal};
use nix::unistd::{pipe, read, write};
use structopt::StructOpt;
use env_logger;
use std::convert::TryFrom;
use std::os::unix::io::RawFd;
use std::path::PathBuf;
use std::sync::{mpsc, Mutex};
use std::{io, thread};
use anyhow::Result;
use injector::InjectorConfig;
use jsonrpc::start_server;
use mount_injector::{MountInjectionGuard, MountInjector};
use nix::sys::signal::{signal, SigHandler, Signal};
use nix::unistd::{pipe, read, write};
use replacer::{Replacer, UnionReplacer};
use structopt::StructOpt;
use tokio::runtime::Runtime;
use tracing::{info, instrument};
use tracing_subscriber::EnvFilter;
use utils::encode_path;
#[derive(StructOpt, Debug, Clone)]
#[structopt(name = "basic")]
@ -59,9 +64,8 @@ struct Options {
verbose: String,
}
fn inject(option: Options) -> Result<MountInjectionGuard> {
info!("parse injector configs");
let injector_config: Vec<InjectorConfig> = serde_json::from_reader(std::io::stdin())?;
#[instrument(skip(option))]
fn inject(option: Options, injector_config: Vec<InjectorConfig>) -> Result<MountInjectionGuard> {
info!("inject with config {:?}", injector_config);
let path = option.path.clone();
@ -70,7 +74,7 @@ fn inject(option: Options) -> Result<MountInjectionGuard> {
let path = path.canonicalize()?;
let replacer = if !option.mount_only {
let mut replacer = UnionReplacer::new();
let mut replacer = UnionReplacer::default();
replacer.prepare(&path, &path)?;
Some(replacer)
@ -100,6 +104,7 @@ fn inject(option: Options) -> Result<MountInjectionGuard> {
Ok(mount_guard)
}
#[instrument(skip(option, mount_guard))]
fn resume(option: Options, mount_guard: MountInjectionGuard) -> Result<()> {
info!("disable injection");
mount_guard.disable_injection();
@ -111,7 +116,7 @@ fn resume(option: Options, mount_guard: MountInjectionGuard) -> Result<()> {
let (_, new_path) = encode_path(&path)?;
let replacer = if !option.mount_only {
let mut replacer = UnionReplacer::new();
let mut replacer = UnionReplacer::default();
replacer.prepare(&path, &new_path)?;
info!("running replacer");
let result = replacer.run();
@ -142,6 +147,12 @@ extern "C" fn signal_handler(_: libc::c_int) {
}
}
fn wait_for_signal(chan: RawFd) -> Result<()> {
let mut buf = vec![0u8; 6];
read(chan, buf.as_mut_slice())?;
Ok(())
}
fn main() -> Result<()> {
let (reader, writer) = pipe()?;
unsafe {
@ -152,16 +163,43 @@ fn main() -> Result<()> {
unsafe { signal(Signal::SIGTERM, SigHandler::Handler(signal_handler))? };
let option = Options::from_args();
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or(&option.verbose)).init();
let env_filter = EnvFilter::try_from_default_env()
.or_else(|_| EnvFilter::try_from(&option.verbose))
.or_else(|_| EnvFilter::try_new("trace"))
.unwrap();
tracing_subscriber::fmt()
.with_writer(io::stderr)
.with_env_filter(env_filter)
.init();
info!("start with option: {:?}", option);
let mount_injector = inject(option.clone(), vec![]);
let mount_injector = inject(option.clone())?;
let status = match &mount_injector {
Ok(_) => Ok(()),
Err(e) => Err(anyhow::Error::msg(e.to_string())),
};
let (tx, _) = mpsc::channel();
{
let hookfs = match &mount_injector {
Ok(e) => Some(e.hookfs.clone()),
Err(_) => None,
};
thread::spawn(|| {
Runtime::new()
.expect("Failed to create Tokio runtime")
.block_on(start_server(jsonrpc::RpcImpl::new(
Mutex::new(status),
Mutex::new(tx),
hookfs,
)));
});
}
info!("waiting for signal to exit");
let mut buf = vec![0u8; 6];
read(reader, buf.as_mut_slice())?;
wait_for_signal(reader)?;
info!("start to recover and exit");
resume(option, mount_injector)?;
if let Ok(v) = mount_injector {
resume(option, v)?;
}
Ok(())
}

View File

@ -2,9 +2,7 @@ use std::fs::create_dir_all;
use std::path::Path;
use anyhow::{Context, Result};
use nix::mount::{mount, MsFlags};
use procfs::process::{self, Process};
#[derive(Debug, Clone)]

View File

@ -1,21 +1,16 @@
use crate::hookfs;
use crate::injector::MultiInjector;
use crate::mount;
use crate::stop;
use crate::InjectorConfig;
use std::ffi::OsStr;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::thread::JoinHandle;
use anyhow::{anyhow, Result};
use nix::mount::umount;
use retry::delay::Fixed;
use retry::{retry, OperationResult};
use tracing::info;
use log::info;
use retry::{delay::Fixed, retry, OperationResult};
use crate::injector::{InjectorConfig, MultiInjector};
use crate::{hookfs, mount, stop};
#[derive(Debug)]
pub struct MountInjector {
@ -27,7 +22,7 @@ pub struct MountInjector {
pub struct MountInjectionGuard {
original_path: PathBuf,
new_path: PathBuf,
hookfs: Arc<hookfs::HookFs>,
pub hookfs: Arc<hookfs::HookFs>,
handler: Option<JoinHandle<Result<()>>>,
}
@ -46,9 +41,9 @@ impl MountInjectionGuard {
retry(Fixed::from_millis(500).take(20), || {
if let Err(err) = umount(mount_point.as_path()) {
info!("umount returns error: {:?}", err);
return OperationResult::Retry(err);
OperationResult::Retry(err)
} else {
return OperationResult::Ok(());
OperationResult::Ok(())
}
})?;
@ -135,7 +130,7 @@ impl MountInjector {
std::fs::create_dir_all(new_path.as_path())?;
let args = ["allow_other", "fsname=toda", "default_permissions"];
let args = ["allow_other", "fsname=toda", "default_permissions", "nonempty"];
let flags: Vec<_> = args
.iter()
.flat_map(|item| vec![OsStr::new("-o"), OsStr::new(item)])
@ -153,7 +148,7 @@ impl MountInjector {
// TODO: remove this. But wait for FUSE gets up
// Related Issue: https://github.com/zargony/fuse-rs/issues/9
before_mount_waiter.wait();
std::thread::sleep(std::time::Duration::from_secs(1));
std::thread::sleep(std::time::Duration::from_millis(200));
Ok(MountInjectionGuard {
handler: Some(handler),

View File

@ -1,20 +1,25 @@
use anyhow::{anyhow, Result};
use nix::sys::mman::{MapFlags, ProtFlags};
use nix::sys::ptrace;
use nix::sys::signal::{kill, Signal};
use nix::sys::uio::{process_vm_writev, IoVec, RemoteIoVec};
use nix::sys::wait;
use nix::unistd::Pid;
use log::{info, trace, warn};
use procfs::ProcError;
use std::cell::RefCell;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::ffi::CString;
use std::os::unix::ffi::OsStrExt;
use std::path::Path;
use anyhow::{anyhow, Result};
use nix::errno::Errno;
use nix::sys::mman::{MapFlags, ProtFlags};
use nix::sys::signal::Signal;
use nix::sys::uio::{process_vm_writev, IoVec, RemoteIoVec};
use nix::sys::{ptrace, wait};
use nix::unistd::Pid;
use nix::Error::Sys;
use procfs::process::Task;
use procfs::ProcError;
use retry::delay::Fixed;
use retry::Error::{self, Operation};
use retry::OperationResult;
use tracing::{error, info, instrument, trace, warn};
use Error::Internal;
// There should be only one PtraceManager in one thread. But as we don't implement TLS
// , we cannot use thread-local variables safely.
#[derive(Debug, Default)]
@ -30,7 +35,45 @@ pub fn trace(pid: i32) -> Result<TracedProcess> {
PTRACE_MANAGER.with(|pm| pm.trace(pid))
}
fn thread_is_gone(state: char) -> bool {
// return true if the process is Zombie or Dead
state == 'Z' || state == 'x' || state == 'X'
}
#[instrument]
fn attach_task(task: &Task) -> Result<()> {
let pid = Pid::from_raw(task.tid);
let process = procfs::process::Process::new(task.tid)?;
trace!("attach task: {}", task.tid);
match ptrace::attach(pid) {
Err(Sys(errno))
if errno == Errno::ESRCH
|| (errno == Errno::EPERM && thread_is_gone(process.stat.state)) =>
{
info!("task {} doesn't exist, maybe has stopped", task.tid)
}
Err(err) => {
warn!("attach error: {:?}", err);
return Err(err.into());
}
_ => {}
}
info!("attach task: {} successfully", task.tid);
// TODO: check wait result
match wait::waitpid(pid, Some(wait::WaitPidFlag::__WALL)) {
Ok(status) => {
info!("wait status: {:?}", status);
}
Err(err) => warn!("fail to wait for process({}): {:?}", pid, err),
};
Ok(())
}
impl PtraceManager {
#[instrument(skip(self))]
pub fn trace(&self, pid: i32) -> Result<TracedProcess> {
let raw_pid = pid;
let pid = Pid::from_raw(pid);
@ -39,47 +82,40 @@ impl PtraceManager {
match counter_ref.get_mut(&raw_pid) {
Some(count) => *count += 1,
None => {
trace!("send SIGSTOP to process: {}", pid);
kill(pid, Signal::SIGSTOP)?;
trace!("stop {} successfully", pid);
let process = procfs::process::Process::new(raw_pid)?;
for task in process.tasks()? {
if let Ok(task) = task {
let pid = Pid::from_raw(task.tid);
let mut iterations = 2;
let mut traced_tasks = HashSet::<i32>::new();
info!("attach task: {}", task.tid);
match ptrace::attach(pid) {
Err(nix::Error::Sys(nix::errno::Errno::ESRCH)) => {
info!("task {} doesn't exist, maybe has stopped", task.tid)
}
Err(err) => return Err(err.into()),
_ => {}
while iterations > 0 {
let mut new_threads_found = false;
let process = procfs::process::Process::new(raw_pid)?;
for task in process.tasks()?.flatten() {
if traced_tasks.contains(&task.tid) {
continue;
}
info!("attach task: {} successfully", task.tid);
// TODO: check wait result
match wait::waitpid(pid, Some(wait::WaitPidFlag::__WALL)) {
Ok(status) => {
info!("wait status: {:?}", status);
}
Err(err) => warn!("fail to wait for process({}): {:?}", pid, err),
if let Ok(()) = attach_task(&task) {
trace!("newly traced task: {}", task.tid);
new_threads_found = true;
traced_tasks.insert(task.tid);
}
}
if !new_threads_found {
iterations -= 1;
}
}
info!("trace process: {} successfully", pid);
counter_ref.insert(raw_pid, 1);
info!("send SIGCONT to process: {}", pid);
kill(pid, Signal::SIGCONT)?;
info!("continue {} successfully", pid);
}
}
Ok(TracedProcess { pid: raw_pid })
}
#[instrument(skip(self))]
pub fn detach(&self, pid: i32) -> Result<()> {
let mut counter_ref = self.counter.borrow_mut();
match counter_ref.get_mut(&pid) {
@ -90,27 +126,51 @@ impl PtraceManager {
counter_ref.remove(&pid);
info!("detach process: {}", pid);
match procfs::process::Process::new(pid) {
Ok(process) => {
for task in process.tasks()? {
if let Ok(task) = task {
info!("detach task: {}", task.tid);
match ptrace::detach(Pid::from_raw(task.tid), None) {
Ok(()) => {}
Err(nix::Error::Sys(nix::errno::Errno::ESRCH)) => info!(
"task {} doesn't exist, maybe has stopped",
task.tid
),
Err(err) => return Err(err.into()),
}
info!("detach task: {} successfully", task.tid);
}
if let Err(err) = retry::retry::<_, _, _, anyhow::Error, _>(
Fixed::from_millis(500).take(20),
|| match procfs::process::Process::new(pid) {
Err(ProcError::NotFound(_)) => {
info!("process {} not found", pid);
OperationResult::Ok(())
}
info!("detach process: {} successfully", pid);
Err(err) => {
warn!("fail to detach task: {}, retry", pid);
OperationResult::Retry(err.into())
}
Ok(process) => match process.tasks() {
Err(err) => OperationResult::Retry(err.into()),
Ok(tasks) => {
for task in tasks.flatten() {
match ptrace::detach(Pid::from_raw(task.tid), None) {
Ok(()) => {
info!("successfully detached task: {}", task.tid);
}
Err(Sys(Errno::ESRCH)) => trace!(
"task {} doesn't exist, maybe has stopped or not traced",
task.tid
),
Err(err) => {
warn!("fail to detach: {:?}", err)
},
}
trace!("detach task: {} successfully", task.tid);
}
info!("detach process: {} successfully", pid);
OperationResult::Ok(())
}
},
},
) {
warn!("fail to detach: {:?}", err);
match err {
Operation {
error: e,
total_delay: _,
tries: _,
} => return Err(e),
Internal(err) => error!("internal error: {:?}", err),
}
Err(ProcError::NotFound(_)) => info!("process {} not found", pid),
Err(err) => return Err(err.into()),
}
};
}
Ok(())
@ -133,6 +193,7 @@ impl Clone for TracedProcess {
}
impl TracedProcess {
#[instrument]
fn protect(&self) -> Result<ThreadGuard> {
let regs = ptrace::getregs(Pid::from_raw(self.pid))?;
@ -148,6 +209,7 @@ impl TracedProcess {
Ok(guard)
}
#[instrument(skip(f))]
fn with_protect<R, F: Fn(&Self) -> Result<R>>(&self, f: F) -> Result<R> {
let guard = self.protect()?;
@ -158,6 +220,7 @@ impl TracedProcess {
Ok(ret)
}
#[instrument]
fn syscall(&self, id: u64, args: &[u64]) -> Result<u64> {
trace!("run syscall {} {:?}", id, args);
@ -216,6 +279,7 @@ impl TracedProcess {
})
}
#[instrument]
pub fn mmap(&self, length: u64, fd: u64) -> Result<u64> {
let prot = ProtFlags::PROT_READ | ProtFlags::PROT_WRITE | ProtFlags::PROT_EXEC;
let flags = MapFlags::MAP_PRIVATE | MapFlags::MAP_ANON;
@ -226,10 +290,12 @@ impl TracedProcess {
)
}
#[instrument]
pub fn munmap(&self, addr: u64, len: u64) -> Result<u64> {
self.syscall(11, &[addr, len])
}
#[instrument(skip(f))]
pub fn with_mmap<R, F: Fn(&Self, u64) -> Result<R>>(&self, len: u64, f: F) -> Result<R> {
let addr = self.mmap(len, 0)?;
@ -240,7 +306,8 @@ impl TracedProcess {
Ok(ret)
}
pub fn chdir<P: AsRef<Path>>(&self, filename: P) -> Result<()> {
#[instrument]
pub fn chdir<P: AsRef<Path> + std::fmt::Debug>(&self, filename: P) -> Result<()> {
let filename = CString::new(filename.as_ref().as_os_str().as_bytes())?;
let path = filename.as_bytes_with_nul();
@ -252,6 +319,7 @@ impl TracedProcess {
})
}
#[instrument]
pub fn write_mem(&self, addr: u64, content: &[u8]) -> Result<()> {
let pid = Pid::from_raw(self.pid);
@ -267,6 +335,7 @@ impl TracedProcess {
Ok(())
}
#[instrument(skip(codes))]
pub fn run_codes<F: Fn(u64) -> Result<(u64, Vec<u8>)>>(&self, codes: F) -> Result<()> {
let pid = Pid::from_raw(self.pid);
@ -320,7 +389,7 @@ impl Drop for TracedProcess {
if let Err(err) = PTRACE_MANAGER.with(|pm| pm.detach(self.pid)) {
info!(
"deteching process {} failed with error: {:?}",
"detaching process {} failed with error: {:?}",
self.pid, err
)
}

View File

@ -1,18 +1,15 @@
use super::ptrace;
use super::utils::all_processes;
use super::Replacer;
use std::fmt::Debug;
use std::path::{Path, PathBuf};
use anyhow::Result;
use tracing::{error, info, trace};
use log::{error, info, trace};
use super::utils::all_processes;
use super::{ptrace, Replacer};
#[derive(Debug)]
pub struct CwdReplacer {
processes: Vec<ptrace::TracedProcess>,
new_path: PathBuf,
processes: Vec<(ptrace::TracedProcess, PathBuf)>,
}
impl CwdReplacer {
@ -36,8 +33,13 @@ impl CwdReplacer {
}
})
.filter(|(_, path)| path.starts_with(detect_path.as_ref()))
.filter_map(|(pid, _)| match ptrace::trace(pid) {
Ok(process) => Some(process),
.filter_map(|(pid, path)| match ptrace::trace(pid) {
Ok(process) => {
let mut new_path = new_path.as_ref().to_path_buf();
new_path.push(path.strip_prefix(detect_path.as_ref()).unwrap());
Some((process, new_path))
}
Err(err) => {
error!("fail to ptrace process: pid({}) with error: {:?}", pid, err);
None
@ -45,18 +47,16 @@ impl CwdReplacer {
})
.collect();
Ok(CwdReplacer {
processes,
new_path: new_path.as_ref().to_owned(),
})
Ok(CwdReplacer { processes })
}
}
impl Replacer for CwdReplacer {
fn run(&mut self) -> Result<()> {
info!("running cwd replacer");
for process in self.processes.iter() {
process.chdir(&self.new_path)?;
for (process, new_path) in self.processes.iter() {
trace!("replacing cwd: {} to {:?}", process.pid, new_path);
process.chdir(new_path)?;
}
Ok(())

View File

@ -1,21 +1,17 @@
use super::ptrace;
use super::utils::all_processes;
use super::Replacer;
use std::collections::HashMap;
use std::fmt::Debug;
use std::io::{Cursor, Read, Write};
use std::iter::FromIterator;
use std::path::{Path, PathBuf};
use std::{collections::HashMap, fmt::Debug};
use anyhow::{anyhow, Result};
use dynasmrt::{dynasm, DynasmApi, DynasmLabelApi};
use log::{error, info, trace};
use procfs::process::FDTarget;
use itertools::Itertools;
use procfs::process::FDTarget;
use tracing::{error, info, trace};
use super::utils::all_processes;
use super::{ptrace, Replacer};
#[derive(Clone, Copy)]
#[repr(packed)]

View File

@ -1,24 +1,18 @@
use super::ptrace;
use super::utils::all_processes;
use super::Replacer;
use std::collections::HashMap;
use std::fmt::Debug;
use std::io::{Cursor, Read, Write};
use std::iter::FromIterator;
use std::path::{Path, PathBuf};
use log::{error, info, trace};
use procfs::process::MMapPath;
use anyhow::{anyhow, Result};
use dynasmrt::{dynasm, DynasmApi, DynasmLabelApi};
use itertools::Itertools;
use nix::sys::mman::{MapFlags, ProtFlags};
use procfs::process::MMapPath;
use tracing::{error, info, trace};
use super::utils::all_processes;
use super::{ptrace, Replacer};
#[derive(Clone, Debug)]
struct ReplaceCase {
@ -194,6 +188,7 @@ impl ProcessAccessor {
; mov rdi, QWORD [r14+r15] // addr
; mov rsi, QWORD [r14+r15+8] // length
; mov rdx, 0x0
; push rdi
; syscall
// open
; mov rax, 0x2
@ -206,6 +201,7 @@ impl ProcessAccessor {
; mov rsi, libc::O_RDWR
; mov rdx, 0x0
; syscall
; pop rdi // addr
; push rax
; mov r8, rax // fd
// mmap

View File

@ -1,31 +1,26 @@
use crate::ptrace;
use std::path::Path;
use anyhow::Result;
use crate::ptrace;
mod cwd_replacer;
mod fd_replacer;
mod mmap_replacer;
mod utils;
use log::error;
use tracing::error;
pub trait Replacer {
fn run(&mut self) -> Result<()>;
}
#[derive(Default)]
pub struct UnionReplacer<'a> {
replacers: Vec<Box<dyn Replacer + 'a>>,
}
impl<'a> UnionReplacer<'a> {
pub fn new() -> UnionReplacer<'a> {
UnionReplacer {
replacers: Vec::new(),
}
}
pub fn prepare<P1: AsRef<Path>, P2: AsRef<Path>>(
&mut self,
detect_path: P1,

View File

@ -1,5 +1,4 @@
use anyhow::Result;
use procfs::process::{self, Process};
pub fn all_processes() -> Result<impl Iterator<Item = Process>> {

View File

@ -1,5 +1,4 @@
use std::sync::Arc;
use std::sync::{Condvar, Mutex};
use std::sync::{Arc, Condvar, Mutex};
struct Stop {
inner: Mutex<bool>,

57
tests/jsonrpc_test.rs Normal file
View File

@ -0,0 +1,57 @@
use std::sync::mpsc::channel;
use std::sync::Mutex;
use anyhow::anyhow;
use toda::jsonrpc::{self, new_handler, Comm};
#[test]
fn test_status_good() {
let (tx, _rx) = channel();
let io = new_handler(jsonrpc::RpcImpl::new(
Mutex::new(Ok(())),
Mutex::new(tx),
None,
));
let request = r#"{"jsonrpc": "2.0","method":"get_status","params":[""],"id":1}"#;
let response = r#"{"jsonrpc":"2.0","result":"ok","id":1}"#;
assert_eq!(io.handle_request_sync(request), Some(response.to_string()));
}
#[test]
fn test_status_bad() {
let (tx, rx) = channel();
let io = new_handler(jsonrpc::RpcImpl::new(
Mutex::new(Err(anyhow!("Not good"))),
Mutex::new(tx),
None,
));
let request = r#"{"jsonrpc": "2.0","method":"get_status","params":[""],"id":1}"#;
let response = r#"{"jsonrpc":"2.0","result":"Not good","id":1}"#;
assert_eq!(io.handle_request_sync(request), Some(response.to_string()));
assert_eq!(rx.recv().unwrap(), Comm::Shutdown);
}
#[test]
fn test_should_not_update_config_if_status_is_failed() {
let (tx, _rx) = channel();
let request = r#"{"jsonrpc": "2.0","method":"update","params":[[]],"id":1}"#;
let response = r#"{"jsonrpc":"2.0","result":"Not good","id":1}"#;
let io = new_handler(jsonrpc::RpcImpl::new(
Mutex::new(Err(anyhow!("Not good"))),
Mutex::new(tx),
None,
));
assert_eq!(io.handle_request_sync(request), Some(response.to_string()));
}
#[test]
fn test_should_fail_if_config_is_bad() {
let (tx, _rx) = channel();
let request = r#"{"jsonrpc": "2.0","method":"update","params":[["blah"]],"id":1}"#;
let response = r#"{"jsonrpc":"2.0","error":{"code":-32602,"message":"Invalid params: invalid type: string \"blah\", expected internally tagged enum."},"id":1}"#;
let io = new_handler(jsonrpc::RpcImpl::new(
Mutex::new(Ok(())),
Mutex::new(tx),
None,
));
assert_eq!(io.handle_request_sync(request), Some(response.to_string()));
}

View File

@ -11,20 +11,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use toda::hookfs;
use toda::injector::MultiInjector;
use std::ffi::OsStr;
use std::fs::{read_link, read_to_string, write, File, OpenOptions};
use std::io::{Read, Write};
use std::os::unix::fs::symlink;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Once;
use std::sync::{Arc, Once};
use nix::fcntl;
use nix::sys::stat;
use nix::unistd;
use nix::{fcntl, unistd};
use toda::hookfs;
use toda::injector::MultiInjector;
// These tests are port from go-fuse test
@ -35,7 +32,7 @@ fn init(name: &str) -> (PathBuf, fuser::BackgroundSession) {
let test_path: PathBuf = ["/tmp/test_mnt", name].iter().collect();
INIT.call_once(|| {
flexi_logger::Logger::with_env().start().unwrap();
env_logger::init();
});
std::fs::remove_dir_all(&test_path_backend).ok();