mirror of https://github.com/tikv/raft-engine.git
Compare commits
27 Commits
SHA1
d53b2b9505
03f77d90c0
392f5e66f8
7e12385c31
6bb58c379b
de1ec93752
e1c5dd80a6
209e00d8c0
0e40c53566
897e0ebf67
c7828cb3e7
3ad316a3c4
8ef3b42057
c1f81d4d06
cf5d1b9144
cd1533d79c
084c5ca0e4
e505d631c8
d043b4ad4e
e8de5d75ae
385182bfc2
70a8873fbb
fa56f891fd
9aa124558c
22dfb426cd
3724bfc510
ec6aa90163
.github/workflows/rust.yml
@@ -1,6 +1,15 @@
 name: Rust

-on: [push, pull_request]
+on:
+  push:
+    branches-ignore:
+      - dependabot/**
+
+  pull_request:
+    paths-ignore:
+      - "OWNERS"
+      - "OWNERS_ALIASES"
+      - "*.md"

 env:
   CARGO_TERM_COLOR: always
@@ -10,7 +19,9 @@ jobs:
     runs-on: ${{ matrix.os }}
     strategy:
       matrix:
-        os: [ ubuntu-latest, macos-latest, windows-latest ]
+        # Tests with the sanitizer on `ubuntu-latest` are covered by the
+        # self-defined `nightly test` job, so we don't need to test it here.
+        os: [ macos-latest ]
     steps:
     - uses: actions/checkout@v2
       with:
@@ -19,7 +30,7 @@ jobs:
       uses: actions-rs/toolchain@v1
       with:
         profile: minimal
-        toolchain: nightly-2023-07-01
+        toolchain: nightly-2025-04-03
         override: true
         components: rustfmt, clippy, rust-src
     - uses: Swatinem/rust-cache@v1
@@ -34,19 +45,13 @@ jobs:
         git diff --exit-code
     - name: Clippy
       run: make clippy
+      env:
+        EXTRA_CARGO_ARGS: '--fix'
     - name: Run tests
       run: make test
       env:
         RUST_BACKTRACE: 1
         EXTRA_CARGO_ARGS: '--verbose'
-    - name: Run asan tests
-      if: ${{ matrix.os == 'ubuntu-latest' }}
-      run: make test
-      env:
-        RUST_BACKTRACE: 1
-        RUSTFLAGS: '-Zsanitizer=address'
-        RUSTDOCFLAGS: '-Zsanitizer=address'
-        EXTRA_CARGO_ARGS: '--verbose -Zbuild-std --target x86_64-unknown-linux-gnu'
   stable:
     runs-on: ${{ matrix.os }}
     strategy:
@@ -60,7 +65,7 @@ jobs:
       uses: actions-rs/toolchain@v1
       with:
         profile: minimal
-        toolchain: 1.66.0
+        toolchain: 1.85.0
         override: true
         components: rustfmt, clippy, rust-src
     - uses: Swatinem/rust-cache@v1
@@ -87,7 +92,7 @@ jobs:
       uses: actions-rs/toolchain@v1
       with:
         profile: minimal
-        toolchain: nightly-2023-07-01
+        toolchain: nightly-2025-04-03
         override: true
         components: llvm-tools-preview
     - uses: Swatinem/rust-cache@v1
@@ -96,15 +101,16 @@ jobs:
     - name: Install grcov
       run: if [[ ! -e ~/.cargo/bin/grcov ]]; then cargo install --locked grcov; fi
     - name: Run tests
-      run: |
-        make test_matrix
+      run: make test_matrix
       env:
-        RUSTFLAGS: '-Zinstrument-coverage'
+        RUSTFLAGS: '-Cinstrument-coverage'
         LLVM_PROFILE_FILE: '%p-%m.profraw'
         EXTRA_CARGO_ARGS: '--verbose'
     - name: Run grcov
       run: grcov `find . \( -name "*.profraw" \) -print` --binary-path target/debug/deps/ -s . -t lcov --branch --ignore-not-existing --ignore '../**' --ignore '/*' -o coverage.lcov
     - name: Upload
-      uses: codecov/codecov-action@v2
+      uses: codecov/codecov-action@v3
+      env:
+        CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
       with:
         file: coverage.lcov
CHANGELOG.md
@@ -2,6 +2,24 @@

 ## [Unreleased]

+### New Features
+
+* Add a new Prometheus metric `raft_engine_write_compression_ratio` to track the compression ratio of writes. (#358)
+
+## [0.4.2] - 2024-04-16
+
+### Behavior Changes
+
+* Periodically flush unsynced bytes when rewriting, to avoid the I/O jitter caused when flushing too many bytes at once impedes foreground writes. (#347)
+* Errors are now returned if rewriting fails, instead of panicking directly. (#343)
+
+## [0.4.1] - 2023-09-14
+
+### Behavior Changes
+
+* When log recycling is enabled, Raft Engine will now retain 50% more log files to reduce the chance of running out.
+* Reduce the scope of keys reserved for internal use.
+
 ## [0.4.0] - 2023-09-01

 ### Behavior Changes
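The `raft_engine_write_compression_ratio` metric called out in the `[Unreleased]` entry is registered in the `src/metrics.rs` hunk further down and fed from the write path in `src/engine.rs`. As a minimal standalone sketch of the same idea, assuming the `prometheus` and `lazy_static` crates: the helper function here is illustrative, while the metric name and bucket layout follow the values used in this diff.

```rust
use lazy_static::lazy_static;
use prometheus::{exponential_buckets, register_histogram, Histogram};

lazy_static! {
    // Same name and buckets as the registration added in src/metrics.rs.
    static ref WRITE_COMPRESSION_RATIO: Histogram = register_histogram!(
        "raft_engine_write_compression_ratio",
        "Bucketed histogram of Raft Engine write compression ratio",
        exponential_buckets(0.0005, 1.8, 16).unwrap()
    )
    .unwrap();
}

// Illustrative helper: record compressed/uncompressed for one compressed write.
fn observe_write(uncompressed_len: usize, compressed_len: usize) {
    if uncompressed_len > 0 {
        WRITE_COMPRESSION_RATIO.observe(compressed_len as f64 / uncompressed_len as f64);
    }
}
```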
Cargo.toml
@@ -1,9 +1,9 @@
 [package]
 name = "raft-engine"
-version = "0.4.0"
+version = "0.4.2"
 authors = ["The TiKV Project Developers"]
 edition = "2018"
-rust-version = "1.66.0"
+rust-version = "1.85.0"
 description = "A persistent storage engine for Multi-Raft logs"
 readme = "README.md"
 repository = "https://github.com/tikv/raft-engine"
@@ -42,9 +42,12 @@ hex = "0.4"
 if_chain = "1.0"
 lazy_static = "1.3"
 libc = "0.2"
-log = { version = "0.4", features = ["max_level_trace", "release_max_level_debug"] }
-lz4-sys = "1.9"
-memmap2 = { version = "0.7", optional = true }
+log = { version = "0.4", features = [
+  "max_level_trace",
+  "release_max_level_debug",
+] }
+lz4-sys = { version = "=1.9.5" }
+memmap2 = { version = "0.9", optional = true }
 nix = "0.26"
 num-derive = "0.4"
 num-traits = "0.2"
@@ -55,38 +58,33 @@ protobuf = "2"
 rayon = "1.5"
 rhai = { version = "1.7", features = ["sync"], optional = true }
 scopeguard = "1.1"
-serde = { version = "1.0", features = ["derive"] }
+serde = { version = "=1.0.194", features = ["derive"] }
 serde_repr = "0.1"
-strum = { version = "0.25.0", features = ["derive"] }
+strum = { version = "0.26.2", features = ["derive"] }
 thiserror = "1.0"

 [dev-dependencies]
 criterion = "0.4"
 ctor = "0.2"
 env_logger = "0.10"
-kvproto = { git = "https://github.com/pingcap/kvproto.git", default-features = false, features = ["protobuf-codec"] }
-raft = { git = "https://github.com/tikv/raft-rs", branch = "master", default-features = false, features = ["protobuf-codec"] }
+kvproto = { git = "https://github.com/pingcap/kvproto.git", default-features = false, features = [
+  "protobuf-codec",
+] }
+raft = { git = "https://github.com/tikv/raft-rs", branch = "master", default-features = false, features = [
+  "protobuf-codec",
+] }
 rand = "0.8"
 rand_distr = "0.4"
 tempfile = "3.6"
-toml = "0.7"
+toml = "0.8"

 [features]
 default = ["internals", "scripting"]
 internals = []
-nightly = [
-  "prometheus/nightly",
-]
-failpoints = [
-  "fail/failpoints",
-]
-scripting = [
-  "rhai",
-]
-swap = [
-  "nightly",
-  "memmap2",
-]
+nightly = ["prometheus/nightly"]
+failpoints = ["fail/failpoints"]
+scripting = ["rhai"]
+swap = ["nightly", "memmap2"]
 std_fs = []

 nightly_group = ["nightly", "swap"]
@@ -95,8 +93,8 @@ nightly_group = ["nightly", "swap"]
 raft-proto = { git = "https://github.com/tikv/raft-rs", branch = "master" }
 protobuf = { git = "https://github.com/pingcap/rust-protobuf", branch = "v2.8" }
 protobuf-codegen = { git = "https://github.com/pingcap/rust-protobuf", branch = "v2.8" }
-# TODO: Use official grpc-rs once https://github.com/tikv/grpc-rs/pull/622 is merged.
-grpcio = { git = "https://github.com/tabokie/grpc-rs", branch = "v0.10.x-win" }
+# Lock the version of cc-rs to avoid build failure on MacOS, ref https://github.com/rust-lang/cc-rs/issues/984.
+cc = { git = "https://github.com/rust-lang/cc-rs", tag = "1.0.98" }

 [workspace]
 members = ["stress", "ctl"]
OWNERS (new file)
@@ -0,0 +1,80 @@
+# See the OWNERS docs at https://go.k8s.io/owners
+approvers:
+  - 5kbpers
+  - AndreMouche
+  - andylokandy
+  - breezewish
+  - brson
+  - bufferflies
+  - BusyJay
+  - cfzjywxk
+  - Connor1996
+  - coocood
+  - crazycs520
+  - disksing
+  - ekexium
+  - gengliqi
+  - glorv
+  - hicqu
+  - hunterlxt
+  - imtbkcat
+  - innerr
+  - iosmanthus
+  - jackysp
+  - kennytm
+  - Little-Wallace
+  - liuzix
+  - lonng
+  - LykxSassinator
+  - lysu
+  - marsishandsome
+  - MyonKeminta
+  - niedhui
+  - NingLin-P
+  - nrc
+  - overvenus
+  - pingyu
+  - skyzh
+  - SpadeA-Tang
+  - sticnarf
+  - sunxiaoguang
+  - tabokie
+  - TennyZhuang
+  - tonyxuqqi
+  - v01dstar
+  - yiwu-arbug
+  - you06
+  - youjiali1995
+  - YuJuncen
+  - zhangjinpeng87
+  - zhongzc
+  - zhouqiang-cl
+  - zyguan
+reviewers:
+  - 3AceShowHand
+  - 3pointer
+  - CalvinNeo
+  - ethercflow
+  - fredchenbj
+  - Fullstop000
+  - gozssky
+  - haojinming
+  - hbisheng
+  - hhwyt
+  - HuSharp
+  - jayzhan211
+  - Jibbow
+  - JmPotato
+  - Leavrth
+  - lhy1024
+  - longfangsong
+  - lzmhhh123
+  - Mossaka
+  - MrCroxx
+  - nolouch
+  - rleungx
+  - Rustin170506
+  - tier-cap
+  - wjhuang2016
+  - wshwsh12
+  - Xuanwo
README.md
@@ -54,7 +54,7 @@ Put this in your Cargo.toml:

 ```rust
 [dependencies]
-raft-engine = "0.4.0"
+raft-engine = "0.4"
 ```

 Available Cargo features:
ctl/Cargo.toml
@@ -1,9 +1,9 @@
 [package]
 name = "raft-engine-ctl"
-version = "0.4.0"
+version = "0.4.2"
 authors = ["The TiKV Project Developers"]
 edition = "2018"
-rust-version = "1.61.0"
+rust-version = "1.75.0"
 description = "A control tool for Raft Engine"
 repository = "https://github.com/tikv/raft-engine"
 license = "Apache-2.0"
@@ -11,4 +11,7 @@ license = "Apache-2.0"
 [dependencies]
 clap = { version = "3.1", features = ["derive", "cargo"] }
 env_logger = "0.10"
-raft-engine = { path = "..", version = "0.4.0", features = ["scripting", "internals"] }
+raft-engine = { path = "..", version = "0.4.1", features = [
+  "scripting",
+  "internals",
+] }
src/codec.rs
@@ -360,7 +360,6 @@ mod tests {
     use super::*;
     use protobuf::CodedOutputStream;
     use std::io::ErrorKind;
-    use std::{f32, f64, i16, i32, i64, u16, u32, u64};

     const U16_TESTS: &[u16] = &[
         i16::MIN as u16,
src/config.rs
@@ -116,7 +116,7 @@ pub struct Config {
     pub prefill_for_recycle: bool,

     /// Maximum capacity for preparing log files for recycling when start.
-    /// If not `None`, its size is equal to `purge-threshold`.
+    /// If `None`, its size is equal to `purge-threshold` * 1.5.
     /// Only available for `prefill-for-recycle` is true.
     ///
     /// Default: None
@@ -219,10 +219,10 @@ impl Config {
         }
         if self.enable_log_recycle && self.purge_threshold.0 >= self.target_file_size.0 {
             // (1) At most u32::MAX so that the file number can be capped into an u32
-            // without colliding. (2) Add some more file as an additional buffer to
-            // avoid jitters.
+            // without colliding. (2) Increase the threshold by 50% to add some more
+            // files as an additional buffer to avoid jitters.
             std::cmp::min(
-                (self.purge_threshold.0 / self.target_file_size.0) as usize + 2,
+                (self.purge_threshold.0 / self.target_file_size.0) as usize * 3 / 2,
                 u32::MAX as usize,
             )
         } else {
@@ -241,7 +241,7 @@ impl Config {
         if self.prefill_for_recycle && prefill_limit >= self.target_file_size.0 {
             // Keep same with the maximum setting of `recycle_capacity`.
             std::cmp::min(
-                (prefill_limit / self.target_file_size.0) as usize + 2,
+                (prefill_limit / self.target_file_size.0) as usize * 3 / 2,
                 u32::MAX as usize,
             )
         } else {
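To make the new capacity rule concrete (a 50% buffer on top of the base file count instead of the old fixed `+ 2`), here is a small self-contained sketch; the function name and the example sizes are made up for illustration.

```rust
// Sketch of the rule in the hunk above:
// capacity = min((purge_threshold / target_file_size) * 3 / 2, u32::MAX).
fn recycle_capacity(purge_threshold: u64, target_file_size: u64) -> usize {
    std::cmp::min(
        (purge_threshold / target_file_size) as usize * 3 / 2,
        u32::MAX as usize,
    )
}

fn main() {
    // 10 GiB purge threshold with 128 MiB files: 80 base files, 120 after the
    // 50% buffer. The old formula would have kept 80 + 2 = 82.
    assert_eq!(recycle_capacity(10 << 30, 128 << 20), 120);
}
```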
src/engine.rs
@@ -142,7 +142,7 @@ where
             return Ok(0);
         }
         let start = Instant::now();
-        let len = log_batch.finish_populate(
+        let (len, compression_ratio) = log_batch.finish_populate(
             self.cfg.batch_compression_threshold.0 as usize,
             self.cfg.compression_level,
         )?;
@@ -172,7 +172,7 @@ where
         }
         perf_context!(log_write_duration).observe_since(now);
         if sync {
-            // As per trait protocol, this error should be retriable. But we panic anyway to
+            // As per trait protocol, a sync error should be retriable. But we panic anyway to
             // save the trouble of propagating it to other group members.
             self.pipe_log.sync(LogQueue::Append).expect("pipe::sync()");
         }
@@ -225,6 +225,7 @@ where
         now = end;
         ENGINE_WRITE_DURATION_HISTOGRAM.observe(now.saturating_duration_since(start).as_secs_f64());
         ENGINE_WRITE_SIZE_HISTOGRAM.observe(len as f64);
+        ENGINE_WRITE_COMPRESSION_RATIO_HISTOGRAM.observe(compression_ratio);
         Ok(len)
     }
@@ -337,7 +338,27 @@ where
             .read()
             .fetch_entries_to(begin, end, max_size, &mut ents_idx)?;
         for i in ents_idx.iter() {
-            vec.push(read_entry_from_file::<M, _>(self.pipe_log.as_ref(), i)?);
+            vec.push({
+                match read_entry_from_file::<M, _>(self.pipe_log.as_ref(), i) {
+                    Ok(entry) => entry,
+                    Err(e) => {
+                        // The index is not found in the file, which means the entry has
+                        // already become stale through `compact` or `rewrite`. Retry
+                        // reading the entry via the memtable.
+                        let immutable = memtable.read();
+                        // Hold the memtable read lock until fetching from the raft logs
+                        // completes. This prevents the index from becoming stale again
+                        // while it is being concurrently updated by `rewrite`.
+                        if let Some(idx) = immutable.get_entry(i.index) {
+                            read_entry_from_file::<M, _>(self.pipe_log.as_ref(), &idx)?
+                        } else {
+                            return Err(e);
+                        }
+                    }
+                }
+            });
         }
         ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64);
         return Ok(ents_idx.len());
@@ -634,9 +655,11 @@ pub(crate) mod tests {
     use crate::util::ReadableSize;
     use kvproto::raft_serverpb::RaftLocalState;
     use raft::eraftpb::Entry;
+    use rand::{thread_rng, Rng};
     use std::collections::{BTreeSet, HashSet};
     use std::fs::OpenOptions;
     use std::path::PathBuf;
+    use std::sync::atomic::{AtomicBool, Ordering};

     pub(crate) type RaftLogEngine<F = DefaultFileSystem> = Engine<F>;
     impl<F: FileSystem> RaftLogEngine<F> {
@@ -1928,8 +1951,6 @@ pub(crate) mod tests {
     #[cfg(feature = "nightly")]
     #[bench]
     fn bench_engine_fetch_entries(b: &mut test::Bencher) {
-        use rand::{thread_rng, Rng};
-
         let dir = tempfile::Builder::new()
             .prefix("bench_engine_fetch_entries")
             .tempdir()
@@ -2162,6 +2183,7 @@ pub(crate) mod tests {
             prefill_for_recycle: true,
             ..Default::default()
         };
+        let recycle_capacity = cfg.recycle_capacity() as u64;
         let fs = Arc::new(DeleteMonitoredFileSystem::new());
         let engine = RaftLogEngine::open_with_file_system(cfg, fs.clone()).unwrap();
@@ -2197,11 +2219,11 @@ pub(crate) mod tests {
         assert_eq!(reserved_start_2, reserved_start_3);

         // Reuse all of reserved files.
-        for rid in 1..=50 {
+        for rid in 1..=recycle_capacity {
             engine.append(rid, 1, 11, Some(&entry_data));
         }
         assert!(fs.reserved_metadata.lock().unwrap().is_empty());
-        for rid in 1..=50 {
+        for rid in 1..=recycle_capacity {
             engine.clean(rid);
         }
         engine.purge_manager.must_rewrite_append_queue(None, None);
@@ -2585,6 +2607,53 @@ pub(crate) mod tests {
         assert!(data.is_empty(), "data loss {:?}", data);
     }

+    #[test]
+    fn test_fetch_with_concurrently_rewrite() {
+        let dir = tempfile::Builder::new()
+            .prefix("test_fetch_with_concurrently_rewrite")
+            .tempdir()
+            .unwrap();
+        let cfg = Config {
+            dir: dir.path().to_str().unwrap().to_owned(),
+            target_file_size: ReadableSize(2048),
+            ..Default::default()
+        };
+        let fs = Arc::new(DeleteMonitoredFileSystem::new());
+        let engine = Arc::new(RaftLogEngine::open_with_file_system(cfg, fs.clone()).unwrap());
+        let entry_data = vec![b'x'; 128];
+        // Set up a concurrent write with purge, and fetch.
+        let mut vec: Vec<Entry> = Vec::new();
+        let fetch_engine = engine.clone();
+        let flag = Arc::new(AtomicBool::new(false));
+        let start_flag = flag.clone();
+        let th = std::thread::spawn(move || {
+            while !start_flag.load(Ordering::Acquire) {
+                std::thread::sleep(Duration::from_millis(10));
+            }
+            for _ in 0..10 {
+                let region_id = thread_rng().gen_range(1..=10);
+                // Should not return file seqno out of range error.
+                let _ = fetch_engine
+                    .fetch_entries_to::<Entry>(region_id, 1, 101, None, &mut vec)
+                    .map_err(|e| {
+                        assert!(!format!("{e}").contains("file seqno out of"));
+                    });
+                vec.clear();
+            }
+        });
+        for i in 0..10 {
+            for rid in 1..=10 {
+                engine.append(rid, 1 + i * 10, 1 + i * 10 + 10, Some(&entry_data));
+            }
+            flag.store(true, Ordering::Release);
+            for rid in 1..=10 {
+                engine.clean(rid);
+            }
+            engine.purge_expired_files().unwrap();
+        }
+        th.join().unwrap();
+    }
+
     #[test]
     fn test_internal_key_filter() {
         let dir = tempfile::Builder::new()
@@ -2704,6 +2773,7 @@ pub(crate) mod tests {
             purge_threshold: ReadableSize(40),
             ..cfg.clone()
         };
+        let recycle_capacity = cfg_2.recycle_capacity() as u64;
         let engine = RaftLogEngine::open_with_file_system(cfg_2, file_system.clone()).unwrap();
         assert!(number_of_files(spill_dir.path()) > 0);
         for rid in 1..=10 {
@@ -2718,7 +2788,7 @@ pub(crate) mod tests {
         );
         assert!(file_count > engine.file_count(None));
         // Append data, recycled files are reused.
-        for rid in 1..=30 {
+        for rid in 1..=recycle_capacity - 10 {
             engine.append(rid, 20, 30, Some(&entry_data));
         }
         // No new file is created.
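The retry branch added to `fetch_entries_to` above is a read-through with fallback: a cached index may point at a file that `rewrite`/`compact` has already retired, so on failure the index is re-resolved from the memtable under a read lock and the read is retried once. A minimal sketch of that pattern, with made-up types and function names rather than the engine's actual API:

```rust
// Hypothetical types standing in for the engine's entry index and storage.
struct Index(u64);
struct Entry(Vec<u8>);

fn read_from_file(idx: &Index) -> Result<Entry, String> {
    // Pretend files holding old indexes have already been purged.
    if idx.0 < 100 {
        Err("file seqno out of range".to_owned())
    } else {
        Ok(Entry(vec![0; 8]))
    }
}

// Fallback: re-resolve the index from an authoritative in-memory table, retry once.
fn fetch(idx: Index, refresh: impl Fn(&Index) -> Option<Index>) -> Result<Entry, String> {
    match read_from_file(&idx) {
        Ok(e) => Ok(e),
        Err(e) => match refresh(&idx) {
            Some(fresh) => read_from_file(&fresh),
            None => Err(e),
        },
    }
}

fn main() {
    // A stale index (file 5 already rewritten) succeeds after refreshing to file 120.
    let entry = fetch(Index(5), |_| Some(Index(120))).unwrap();
    assert_eq!(entry.0.len(), 8);
}
```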
@@ -62,7 +62,7 @@ impl Read for LogFile {
 impl Seek for LogFile {
     fn seek(&mut self, pos: SeekFrom) -> IoResult<u64> {
         fail_point!("log_file::seek::err", |_| {
-            Err(std::io::Error::new(std::io::ErrorKind::Other, "fp"))
+            Err(std::io::Error::other("fp"))
         });
         match pos {
             SeekFrom::Start(offset) => self.offset = offset as usize,
@@ -83,7 +83,7 @@ impl LogFd {
         while readed < buf.len() {
             let bytes = match pread(self.0, &mut buf[readed..], offset as i64) {
                 Ok(bytes) => bytes,
-                Err(e) if e == Errno::EINTR => continue,
+                Err(Errno::EINTR) => continue,
                 Err(e) => return Err(from_nix_error(e, "pread")),
             };
             // EOF
@@ -106,7 +106,7 @@ impl LogFd {
         while written < content.len() {
             let bytes = match pwrite(self.0, &content[written..], offset as i64) {
                 Ok(bytes) => bytes,
-                Err(e) if e == Errno::EINTR => continue,
+                Err(Errno::EINTR) => continue,
                 Err(e) if e == Errno::ENOSPC => return Err(from_nix_error(e, "nospace")),
                 Err(e) => return Err(from_nix_error(e, "pwrite")),
             };
src/file_pipe_log/log_file.rs
@@ -43,6 +43,8 @@ pub struct LogFileWriter<F: FileSystem> {
     capacity: usize,
 }

+// All APIs provided by `LogFileWriter` are fail-safe, i.e. the caller can
+// continue using the same "writer" even if the previous operation failed.
 impl<F: FileSystem> LogFileWriter<F> {
     fn open(
         handle: Arc<F::Handle>,
@@ -67,7 +69,7 @@ impl<F: FileSystem> LogFileWriter<F> {
     }

     fn write_header(&mut self, format: LogFileFormat) -> IoResult<()> {
-        self.writer.seek(SeekFrom::Start(0))?;
+        self.writer.rewind()?;
         self.written = 0;
         let mut buf = Vec::with_capacity(LogFileFormat::encoded_len(format.version));
         format.encode(&mut buf).unwrap();
@@ -105,13 +107,12 @@ impl<F: FileSystem> LogFileWriter<F> {
             }
             self.capacity += alloc;
         }
-        self.writer.write_all(buf).map_err(|e| {
+        self.writer.write_all(buf).inspect_err(|_| {
             self.writer
                 .seek(SeekFrom::Start(self.written as u64))
                 .unwrap_or_else(|e| {
                     panic!("failed to reseek after write failure: {}", e);
                 });
-            e
         })?;
         self.written = new_written;
         Ok(())
@@ -119,7 +120,8 @@ impl<F: FileSystem> LogFileWriter<F> {

     pub fn sync(&mut self) -> IoResult<()> {
         let _t = StopWatch::new(&*LOG_SYNC_DURATION_HISTOGRAM);
-        self.handle.sync()?;
+        // Panic if sync fails, in case of data loss.
+        self.handle.sync().unwrap();
         Ok(())
     }
@@ -218,7 +218,7 @@ pub mod debug {
         let log_file_format = LogFileContext::new(file_id, Version::default());
         for batch in bs.iter_mut() {
             let offset = writer.offset() as u64;
-            let len = batch
+            let (len, _) = batch
                 .finish_populate(1 /* compression_threshold */, None)
                 .unwrap();
             batch.prepare_write(&log_file_format).unwrap();
src/file_pipe_log/pipe.rs
@@ -7,6 +7,7 @@ use std::sync::Arc;

 use crossbeam::utils::CachePadded;
 use fail::fail_point;
+use fs2::FileExt;
 use log::error;
 use parking_lot::{Mutex, MutexGuard, RwLock};
@@ -177,8 +178,13 @@ impl<F: FileSystem> SinglePipe<F> {

         // Skip syncing directory in Windows. Refer to badger's discussion for more
         // detail: https://github.com/dgraph-io/badger/issues/699
+        //
+        // Panic if sync calls fail, to keep consistent with the behavior of
+        // `LogFileWriter::sync()`.
         #[cfg(not(windows))]
-        std::fs::File::open(PathBuf::from(&self.paths[path_id])).and_then(|d| d.sync_all())?;
+        std::fs::File::open(PathBuf::from(&self.paths[path_id]))
+            .and_then(|d| d.sync_all())
+            .unwrap();
         Ok(())
     }
@@ -321,12 +327,7 @@ impl<F: FileSystem> SinglePipe<F> {
         fail_point!("file_pipe_log::append");
         let mut writable_file = self.writable_file.lock();
         if writable_file.writer.offset() >= self.target_file_size {
-            if let Err(e) = self.rotate_imp(&mut writable_file) {
-                panic!(
-                    "error when rotate [{:?}:{}]: {e}",
-                    self.queue, writable_file.seq,
-                );
-            }
+            self.rotate_imp(&mut writable_file)?;
         }

         let seq = writable_file.seq;
@@ -359,9 +360,7 @@ impl<F: FileSystem> SinglePipe<F> {
         }
         let start_offset = writer.offset();
         if let Err(e) = writer.write(bytes.as_bytes(&ctx), self.target_file_size) {
-            if let Err(te) = writer.truncate() {
-                panic!("error when truncate {seq} after error: {e}, get: {}", te);
-            }
+            writer.truncate()?;
             if is_no_space_err(&e) {
                 // TODO: There exists several corner cases should be tackled if
                 // `bytes.len()` > `target_file_size`. For example,
@@ -372,12 +371,7 @@ impl<F: FileSystem> SinglePipe<F> {
                 // - [3] Both main-dir and spill-dir have several recycled logs.
                 // But as `bytes.len()` is always smaller than `target_file_size` in common
                 // cases, this issue will be ignored temporarily.
-                if let Err(e) = self.rotate_imp(&mut writable_file) {
-                    panic!(
-                        "error when rotate [{:?}:{}]: {e}",
-                        self.queue, writable_file.seq
-                    );
-                }
+                self.rotate_imp(&mut writable_file)?;
                 // If there still exists free space for this record, rotate the file
                 // and return a special TryAgain Err (for retry) to the caller.
                 return Err(Error::TryAgain(format!(
@@ -403,15 +397,9 @@ impl<F: FileSystem> SinglePipe<F> {

     fn sync(&self) -> Result<()> {
         let mut writable_file = self.writable_file.lock();
-        let seq = writable_file.seq;
         let writer = &mut writable_file.writer;
-        {
-            let _t = StopWatch::new(perf_context!(log_sync_duration));
-            if let Err(e) = writer.sync() {
-                panic!("error when sync [{:?}:{seq}]: {e}", self.queue);
-            }
-        }
+        let _t = StopWatch::new(perf_context!(log_sync_duration));
+        writer.sync().map_err(Error::Io)?;

         Ok(())
     }
@@ -477,7 +465,17 @@ impl<F: FileSystem> SinglePipe<F> {
 pub struct DualPipes<F: FileSystem> {
     pipes: [SinglePipe<F>; 2],

-    _dir_locks: Vec<StdFile>,
+    dir_locks: Vec<StdFile>,
+}
+
+impl<F: FileSystem> Drop for DualPipes<F> {
+    fn drop(&mut self) {
+        for lock in &self.dir_locks {
+            if let Err(e) = FileExt::unlock(lock) {
+                error!("error while unlocking directory: {e}");
+            }
+        }
+    }
 }

 impl<F: FileSystem> DualPipes<F> {
@@ -494,7 +492,7 @@ impl<F: FileSystem> DualPipes<F> {

         Ok(Self {
             pipes: [appender, rewriter],
-            _dir_locks: dir_locks,
+            dir_locks,
         })
     }
@@ -726,7 +724,7 @@ mod tests {
         // Retire files.
         assert_eq!(pipe_log.purge_to(last).unwrap() as u64, last - first);
         // Try to read recycled file.
-        for (_, handle) in handles.into_iter().enumerate() {
+        for handle in handles.into_iter() {
             assert!(pipe_log.read_bytes(handle).is_err());
         }
         // Try to reuse.
@@ -740,4 +738,21 @@ mod tests {
             assert_eq!(pipe_log.read_bytes(handle).unwrap(), content(i + 1));
         }
     }
+
+    #[test]
+    fn test_release_on_drop() {
+        let dir = Builder::new()
+            .prefix("test_release_on_drop")
+            .tempdir()
+            .unwrap();
+        let path = dir.path().to_str().unwrap();
+        let cfg = Config {
+            dir: path.to_owned(),
+            target_file_size: ReadableSize(1),
+            ..Default::default()
+        };
+        let pipe_log = new_test_pipes(&cfg).unwrap();
+        drop(pipe_log);
+        assert!(new_test_pipes(&cfg).is_ok());
+    }
 }
src/file_pipe_log/pipe_builder.rs
@@ -379,7 +379,7 @@ impl<F: FileSystem> DualPipesBuilder<F> {
     fn recover_queue_imp<M: ReplayMachine, FA: Factory<M>>(
         file_system: Arc<F>,
         recovery_cfg: RecoveryConfig,
-        files: &mut Vec<File<F>>,
+        files: &mut [File<F>],
         machine_factory: &FA,
     ) -> Result<M> {
         if recovery_cfg.concurrency == 0 || files.is_empty() {
@@ -390,7 +390,7 @@ impl<F: FileSystem> DualPipesBuilder<F> {
         let recovery_mode = recovery_cfg.mode;
         let recovery_read_block_size = recovery_cfg.read_block_size as usize;

-        let max_chunk_size = std::cmp::max((files.len() + concurrency - 1) / concurrency, 1);
+        let max_chunk_size = files.len().div_ceil(concurrency);
         let chunks = files.par_chunks_mut(max_chunk_size);
         let chunk_count = chunks.len();
         debug_assert!(chunk_count <= concurrency);
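The `div_ceil` rewrite above is a drop-in for the manual `(len + concurrency - 1) / concurrency` rounding; the old `max(..., 1)` guard is no longer needed because the function already returns early when `files` is empty or `concurrency` is zero, and `div_ceil` of a non-zero length is at least 1. A quick check of the equivalence, with illustrative values:

```rust
fn main() {
    for (len, concurrency) in [(1usize, 4usize), (7, 4), (8, 4), (9, 4), (100, 16)] {
        let manual = std::cmp::max((len + concurrency - 1) / concurrency, 1);
        // div_ceil is stable since Rust 1.73.
        assert_eq!(len.div_ceil(concurrency), manual);
    }
}
```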
src/lib.rs
@@ -167,6 +167,7 @@ impl GlobalStats {
 pub(crate) const INTERNAL_KEY_PREFIX: &[u8] = b"__";

 #[inline]
+#[cfg(test)]
 pub(crate) fn make_internal_key(k: &[u8]) -> Vec<u8> {
     assert!(!k.is_empty());
     let mut v = INTERNAL_KEY_PREFIX.to_vec();
@@ -174,12 +175,23 @@ pub(crate) fn make_internal_key(k: &[u8]) -> Vec<u8> {
     v
 }

+#[cfg(not(test))]
+pub(crate) fn make_internal_key(k: &[u8]) -> Vec<u8> {
+    use log_batch::ATOMIC_GROUP_KEY;
+
+    assert!(k == ATOMIC_GROUP_KEY);
+    let mut v = INTERNAL_KEY_PREFIX.to_vec();
+    v.extend_from_slice(k);
+    v
+}
+
 /// We ensure internal keys are not visible to the user by:
 /// (1) Writing internal keys will be rejected by `LogBatch::put`.
 /// (2) Internal keys are filtered out during apply and replay of both queues.
 /// This also makes sure future internal keys under the prefix won't become
 /// visible after downgrading.
 #[inline]
+#[cfg(test)]
 pub(crate) fn is_internal_key(s: &[u8], ext: Option<&[u8]>) -> bool {
     if let Some(ext) = ext {
         s.len() == INTERNAL_KEY_PREFIX.len() + ext.len()
@@ -191,6 +203,20 @@ pub(crate) fn is_internal_key(s: &[u8], ext: Option<&[u8]>) -> bool {
     }
 }

+#[inline]
+#[cfg(not(test))]
+pub(crate) fn is_internal_key(s: &[u8], ext: Option<&[u8]>) -> bool {
+    use log_batch::ATOMIC_GROUP_KEY;
+
+    if let Some(ext) = ext {
+        s.len() == INTERNAL_KEY_PREFIX.len() + ext.len()
+            && s[..INTERNAL_KEY_PREFIX.len()] == *INTERNAL_KEY_PREFIX
+            && s[INTERNAL_KEY_PREFIX.len()..] == *ext
+    } else {
+        is_internal_key(s, Some(ATOMIC_GROUP_KEY))
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::log_batch::MessageExt;
src/log_batch.rs
@@ -2,9 +2,9 @@

 use std::fmt::Debug;
 use std::io::BufRead;
+use std::mem;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
-use std::{mem, u64};

 use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt};
 use log::error;
@@ -53,7 +53,7 @@ pub enum CompressionType {
 impl CompressionType {
     pub fn from_u8(t: u8) -> Result<Self> {
         if t <= CompressionType::Lz4 as u8 {
-            Ok(unsafe { mem::transmute(t) })
+            Ok(unsafe { mem::transmute::<u8, Self>(t) })
         } else {
             Err(Error::Corruption(format!(
                 "Unrecognized compression type: {t}"
@@ -168,7 +168,7 @@ pub enum OpType {
 impl OpType {
     pub fn from_u8(t: u8) -> Result<Self> {
         if t <= OpType::Del as u8 {
-            Ok(unsafe { mem::transmute(t) })
+            Ok(unsafe { mem::transmute::<u8, Self>(t) })
         } else {
             Err(Error::Corruption(format!("Unrecognized op type: {t}")))
         }
@@ -767,28 +767,32 @@ impl LogBatch {
         &mut self,
         compression_threshold: usize,
         compression_level: Option<usize>,
-    ) -> Result<usize> {
+    ) -> Result<(usize, f64)> {
         let _t = StopWatch::new(perf_context!(log_populating_duration));
         debug_assert!(self.buf_state == BufState::Open);
         if self.is_empty() {
             self.buf_state = BufState::Encoded(self.buf.len(), 0);
-            return Ok(0);
+            return Ok((0, 0.0));
         }
         self.buf_state = BufState::Incomplete;

         // entries
-        let (header_offset, compression_type) = if compression_threshold > 0
+        let (header_offset, compression_type, compression_ratio) = if compression_threshold > 0
             && self.buf.len() >= LOG_BATCH_HEADER_LEN + compression_threshold
         {
             let buf_len = self.buf.len();
-            lz4::append_compress_block(
+            let compression_ratio = lz4::append_compress_block(
                 &mut self.buf,
                 LOG_BATCH_HEADER_LEN,
                 compression_level.unwrap_or(lz4::DEFAULT_LZ4_COMPRESSION_LEVEL),
             )?;
-            (buf_len - LOG_BATCH_HEADER_LEN, CompressionType::Lz4)
+            (
+                buf_len - LOG_BATCH_HEADER_LEN,
+                CompressionType::Lz4,
+                compression_ratio,
+            )
         } else {
-            (0, CompressionType::None)
+            (0, CompressionType::None, 0.0)
         };

         // checksum
@@ -830,7 +834,7 @@ impl LogBatch {
         }

         self.buf_state = BufState::Encoded(header_offset, footer_roffset - LOG_BATCH_HEADER_LEN);
-        Ok(self.buf.len() - header_offset)
+        Ok((self.buf.len() - header_offset, compression_ratio))
     }

     /// Make preparations for the write of `LogBatch`.
@@ -994,7 +998,7 @@ fn verify_checksum_with_signature(buf: &[u8], signature: Option<u32>) -> Result<
 lazy_static! {
     static ref ATOMIC_GROUP_ID: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
 }
-const ATOMIC_GROUP_KEY: &[u8] = &[0x01];
+pub(crate) const ATOMIC_GROUP_KEY: &[u8] = &[0x01];
 // <status>
 const ATOMIC_GROUP_VALUE_LEN: usize = 1;
@@ -1328,7 +1332,7 @@ mod tests {
             offset: 0,
         };
         let old_approximate_size = batch.approximate_size();
-        let len = batch.finish_populate(usize::from(compress), None).unwrap();
+        let (len, _) = batch.finish_populate(usize::from(compress), None).unwrap();
         assert!(old_approximate_size >= len);
         assert_eq!(batch.approximate_size(), len);
         let mut batch_handle = mocked_file_block_handle;
@@ -1493,7 +1497,7 @@ mod tests {
         batch1.merge(&mut batch2).unwrap();
         assert!(batch2.is_empty());

-        let len = batch1.finish_populate(0, None).unwrap();
+        let (len, _) = batch1.finish_populate(0, None).unwrap();
         batch1.prepare_write(&file_context).unwrap();
         let encoded = batch1.encoded_bytes();
         assert_eq!(len, encoded.len());
@@ -1549,7 +1553,8 @@ mod tests {
             offset: 0,
         };
         let buf_len = batch.buf.len();
-        let len = batch.finish_populate(1, None).unwrap();
+        let (len, compression_ratio) = batch.finish_populate(1, None).unwrap();
+        assert!(compression_ratio == 0.0);
         assert!(len == 0);
         assert_eq!(batch.buf_state, BufState::Encoded(buf_len, 0));
         let file_context = LogFileContext::new(mocked_file_block_handles.id, Version::V2);
@@ -1570,7 +1575,7 @@ mod tests {
         ));
         assert!(matches!(
             batch
-                .put_message(0, crate::make_internal_key(&[1]), &Entry::new())
+                .put_message(0, crate::make_internal_key(ATOMIC_GROUP_KEY), &Entry::new())
                 .unwrap_err(),
             Error::InvalidArgument(_)
         ));
@@ -1671,7 +1676,8 @@ mod tests {
             },
         ];
         let old_approximate_size = batch.approximate_size();
-        let len = batch.finish_populate(1, None).unwrap();
+        let (len, compression_ratio) = batch.finish_populate(1, None).unwrap();
+        assert!(compression_ratio > 0.0);
         assert!(old_approximate_size >= len);
         assert_eq!(batch.approximate_size(), len);
         let checksum = batch.item_batch.checksum;
src/memtable.rs
@@ -228,7 +228,7 @@ impl<A: AllocatorTrait> MemTable<A> {
         }

         if let Some(g) = rhs.atomic_group.take() {
-            assert!(self.atomic_group.map_or(true, |(_, end)| end <= g.0));
+            assert!(self.atomic_group.is_none_or(|(_, end)| end <= g.0));
             self.atomic_group = Some(g);
         }

@@ -545,7 +545,7 @@ impl<A: AllocatorTrait> MemTable<A> {
     }

     pub fn apply_rewrite_atomic_group(&mut self, start: FileSeq, end: FileSeq) {
-        assert!(self.atomic_group.map_or(true, |(_, b)| b <= start));
+        assert!(self.atomic_group.is_none_or(|(_, b)| b <= start));
         self.atomic_group = Some((start, end));
     }

@@ -763,7 +763,7 @@ impl<A: AllocatorTrait> MemTable<A> {
         debug_assert!(count > 0);
         self.entry_indexes
             .get(count - 1)
-            .map_or(false, |ei| ei.entries.unwrap().id.seq <= gate.seq)
+            .is_some_and(|ei| ei.entries.unwrap().id.seq <= gate.seq)
     }

     /// Returns the region ID.
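The three changes above swap `Option::map_or` for the more direct combinators: `is_none_or` (stable since Rust 1.82) for `map_or(true, …)` and `is_some_and` (stable since 1.70) for `map_or(false, …)`. They are behaviorally identical, as a quick check shows:

```rust
fn main() {
    for v in [None, Some(3u64), Some(10u64)] {
        // map_or(true, p) == is_none_or(p): vacuously true for None.
        assert_eq!(v.map_or(true, |x| x <= 5), v.is_none_or(|x| x <= 5));
        // map_or(false, p) == is_some_and(p): false for None.
        assert_eq!(v.map_or(false, |x| x <= 5), v.is_some_and(|x| x <= 5));
    }
}
```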
src/metrics.rs
@@ -122,7 +122,7 @@ pub trait TimeMetric {
     }
 }

-impl<'a> TimeMetric for &'a Histogram {
+impl TimeMetric for &Histogram {
     fn observe(&self, duration: Duration) {
         Histogram::observe(self, duration.as_secs_f64());
     }
@@ -201,6 +201,12 @@ lazy_static! {
         exponential_buckets(256.0, 1.8, 22).unwrap()
     )
     .unwrap();
+    pub static ref ENGINE_WRITE_COMPRESSION_RATIO_HISTOGRAM: Histogram = register_histogram!(
+        "raft_engine_write_compression_ratio",
+        "Bucketed histogram of Raft Engine write compression ratio",
+        exponential_buckets(0.0005, 1.8, 16).unwrap()
+    )
+    .unwrap();
     pub static ref LOG_ALLOCATE_DURATION_HISTOGRAM: Histogram = register_histogram!(
         "raft_engine_allocate_log_duration_seconds",
         "Bucketed histogram of Raft Engine allocate log duration",
src/purge.rs
@@ -35,6 +35,10 @@ fn max_batch_bytes() -> usize {
     128 * 1024
 }

+fn max_forcely_sync_bytes() -> usize {
+    max_batch_bytes() * 4
+}
+
 pub struct PurgeManager<P>
 where
     P: PipeLog,
@@ -354,6 +358,7 @@ where
         let mut current_entry_indexes = Vec::new();
         let mut current_entries = Vec::new();
         let mut current_size = 0;
+        let mut unsynced_size = 0;
         // Split the entries into smaller chunks, so that we don't OOM, and the
         // compression overhead is not too high.
         let mut entry_indexes = entry_indexes.into_iter().peekable();
@@ -362,6 +367,7 @@ where
             current_size += entry.len();
             current_entries.push(entry);
             current_entry_indexes.push(ei);
+            unsynced_size += current_size;
             // If this is the last entry, we handle them outside the loop.
             if entry_indexes.peek().is_some()
                 && current_size + previous_size > max_batch_bytes()
@@ -396,7 +402,15 @@ where
                 )?;
                 current_size = 0;
                 previous_size = 0;
-                let handle = self.rewrite_impl(&mut log_batch, rewrite, false)?.unwrap();
+                let sync = if unsynced_size >= max_forcely_sync_bytes() {
+                    // Avoid accumulating too many unsynced bytes, which would block the
+                    // later `fdatasync` in the append progress for too long.
+                    unsynced_size = 0;
+                    true
+                } else {
+                    false
+                };
+                let handle = self.rewrite_impl(&mut log_batch, rewrite, sync)?.unwrap();
                 if needs_atomicity && atomic_group_start.is_none() {
                     atomic_group_start = Some(handle.id.seq);
                 }
@@ -439,7 +453,7 @@ where
         )?;
         let file_handle = self.pipe_log.append(LogQueue::Rewrite, log_batch)?;
         if sync {
-            self.pipe_log.sync(LogQueue::Rewrite)?
+            self.pipe_log.sync(LogQueue::Rewrite)?;
         }
         log_batch.finish_write(file_handle);
         self.memtables.apply_rewrite_writes(
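The `unsynced_size` bookkeeping above implements the CHANGELOG item about periodic flushing during rewrite: once roughly four batches' worth of bytes (`max_batch_bytes() * 4`) have been written without a sync, the next `rewrite_impl` call is forced to sync, so the eventual `fdatasync` on the append path never has to flush a huge backlog at once. The shape of that accounting, reduced to a standalone sketch with illustrative names and thresholds:

```rust
// Illustrative threshold; the real value comes from max_batch_bytes() * 4.
const FORCE_SYNC_BYTES: usize = 4 * 128 * 1024;

struct RewriteProgress {
    unsynced: usize,
}

impl RewriteProgress {
    // Returns whether this chunk should be written with sync = true.
    fn should_sync(&mut self, chunk_len: usize) -> bool {
        self.unsynced += chunk_len;
        if self.unsynced >= FORCE_SYNC_BYTES {
            self.unsynced = 0;
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut p = RewriteProgress { unsynced: 0 };
    // Three 128 KiB chunks stay unsynced; the fourth crosses the threshold.
    assert!(!p.should_sync(128 * 1024));
    assert!(!p.should_sync(128 * 1024));
    assert!(!p.should_sync(128 * 1024));
    assert!(p.should_sync(128 * 1024));
}
```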
@ -120,9 +120,8 @@ unsafe impl<A: Allocator + Send + Sync> Allocator for SwappyAllocator<A> {
|
||||||
return swap_r;
|
return swap_r;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.0.mem_allocator.allocate(layout).map_err(|e| {
|
self.0.mem_allocator.allocate(layout).inspect_err(|_| {
|
||||||
self.0.mem_usage.fetch_sub(layout.size(), Ordering::Relaxed);
|
self.0.mem_usage.fetch_sub(layout.size(), Ordering::Relaxed);
|
||||||
e
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -191,9 +190,8 @@ unsafe impl<A: Allocator + Send + Sync> Allocator for SwappyAllocator<A> {
|
||||||
self.0
|
self.0
|
||||||
.mem_allocator
|
.mem_allocator
|
||||||
.grow(ptr, old_layout, new_layout)
|
.grow(ptr, old_layout, new_layout)
|
||||||
.map_err(|e| {
|
.inspect_err(|_| {
|
||||||
self.0.mem_usage.fetch_sub(diff, Ordering::Relaxed);
|
self.0.mem_usage.fetch_sub(diff, Ordering::Relaxed);
|
||||||
e
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -253,11 +251,10 @@ unsafe impl<A: Allocator + Send + Sync> Allocator for SwappyAllocator<A> {
|
||||||
self.0
|
self.0
|
||||||
.mem_allocator
|
.mem_allocator
|
||||||
.shrink(ptr, old_layout, new_layout)
|
.shrink(ptr, old_layout, new_layout)
|
||||||
.map(|p| {
|
.inspect(|_| {
|
||||||
self.0
|
self.0
|
||||||
.mem_usage
|
.mem_usage
|
||||||
.fetch_sub(old_layout.size() - new_layout.size(), Ordering::Relaxed);
|
.fetch_sub(old_layout.size() - new_layout.size(), Ordering::Relaxed);
|
||||||
p
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -288,6 +285,7 @@ impl Page {
             .read(true)
             .write(true)
             .create(true)
+            .truncate(true)
             .open(path)
             .map_err(|e| error!("Failed to open swap file: {e}"))
             .ok()?;
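Adding `.truncate(true)` makes the intent explicit: recent toolchains lint on `create(true)` plus `write(true)` when the truncation behaviour is left unspecified, and truncating also guarantees a reused swap file never exposes stale contents. A minimal sketch of the same `OpenOptions` pattern (the path is made up):

```rust
use std::fs::OpenOptions;
use std::io::Write;

fn main() -> std::io::Result<()> {
    // Open or create the file, and drop any previous contents.
    let mut f = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(true)
        .open("/tmp/raft_engine_swap_demo")?; // hypothetical path
    f.write_all(b"fresh contents")?;
    Ok(())
}
```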
@@ -63,7 +63,7 @@ where
 }

 pub struct PanicGuard {
-    prev_hook: *mut (dyn Fn(&panic::PanicInfo<'_>) + Sync + Send + 'static),
+    prev_hook: *mut (dyn Fn(&panic::PanicHookInfo<'_>) + Sync + Send + 'static),
 }

 struct PointerHolder<T: ?Sized>(*mut T);
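This follows the std rename of the type handed to panic hooks: on newer toolchains the hook argument is `std::panic::PanicHookInfo` rather than `std::panic::PanicInfo`, with the old name kept as an alias for a while. A small hook registration against the new name:

```rust
use std::panic;

fn main() {
    // On newer toolchains the closure parameter is &PanicHookInfo.
    panic::set_hook(Box::new(|info: &panic::PanicHookInfo<'_>| {
        eprintln!("panic observed: {info}");
    }));

    let _ = panic::catch_unwind(|| panic!("boom"));
}
```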
src/util.rs (20 changed lines)

@@ -148,7 +148,7 @@ impl<'de> Deserialize<'de> for ReadableSize {
     {
         struct SizeVisitor;

-        impl<'de> Visitor<'de> for SizeVisitor {
+        impl Visitor<'_> for SizeVisitor {
             type Value = ReadableSize;

             fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
@@ -221,14 +221,15 @@ pub fn unhash_u64(mut i: u64) -> u64 {

 pub mod lz4 {
     use crate::{Error, Result};
-    use std::{i32, ptr};
+    use std::ptr;

     pub const DEFAULT_LZ4_COMPRESSION_LEVEL: usize = 1;

     /// Compress content in `buf[skip..]`, and append output to `buf`.
-    pub fn append_compress_block(buf: &mut Vec<u8>, skip: usize, level: usize) -> Result<()> {
+    pub fn append_compress_block(buf: &mut Vec<u8>, skip: usize, level: usize) -> Result<f64> {
         let buf_len = buf.len();
         let content_len = buf_len - skip;
+        let mut compression_ratio = 0.0;
         if content_len > 0 {
             if content_len > i32::MAX as usize {
                 return Err(Error::InvalidArgument(format!(
@@ -256,10 +257,11 @@ pub mod lz4 {
             if compressed == 0 {
                 return Err(Error::Other(box_err!("Compression failed")));
             }
+            compression_ratio = compressed as f64 / content_len as f64;
             buf.set_len(buf_len + 4 + compressed as usize);
         }
     }
-    Ok(())
+    Ok(compression_ratio)
 }

 pub fn decompress_block(src: &[u8]) -> Result<Vec<u8>> {
@@ -301,8 +303,12 @@ pub mod lz4 {
         let vecs: Vec<Vec<u8>> = vec![b"".to_vec(), b"123".to_vec(), b"12345678910".to_vec()];
         for mut vec in vecs.into_iter() {
             let uncompressed_len = vec.len();
-            super::append_compress_block(&mut vec, 0, super::DEFAULT_LZ4_COMPRESSION_LEVEL)
-                .unwrap();
+            let compression_ratio =
+                super::append_compress_block(&mut vec, 0, super::DEFAULT_LZ4_COMPRESSION_LEVEL)
+                    .unwrap();
+            if uncompressed_len == 0 {
+                assert_eq!(compression_ratio, 0.0);
+            }
             let res = super::decompress_block(&vec[uncompressed_len..]).unwrap();
             assert_eq!(res, vec[..uncompressed_len].to_owned());
         }
|
||||||
/// ```
|
/// ```
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn round_up(offset: usize, alignment: usize) -> usize {
|
pub fn round_up(offset: usize, alignment: usize) -> usize {
|
||||||
(offset + alignment - 1) / alignment * alignment
|
offset.div_ceil(alignment) * alignment
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns an aligned `offset`.
|
/// Returns an aligned `offset`.
|
||||||
|
|
|
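`usize::div_ceil` (stable since Rust 1.73) computes the same ceiling division as the manual `(offset + alignment - 1) / alignment`, without the risk of overflow in the intermediate addition. For example:

```rust
fn round_up(offset: usize, alignment: usize) -> usize {
    offset.div_ceil(alignment) * alignment
}

fn main() {
    // 37.div_ceil(16) == 3, so the offset is rounded up to 48.
    assert_eq!(round_up(37, 16), 48);
    // Already aligned values are unchanged.
    assert_eq!(round_up(64, 16), 64);
    // The manual form agrees on small inputs but can overflow near usize::MAX.
    assert_eq!((37 + 16 - 1) / 16 * 16, 48);
}
```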
@@ -17,7 +17,6 @@ use crate::PerfContext;

 type Ptr<T> = Option<NonNull<T>>;

-///
 pub struct Writer<P, O> {
     next: Cell<Ptr<Writer<P, O>>>,
     payload: *mut P,

@@ -95,7 +94,7 @@ impl<'a, 'b, P, O> WriteGroup<'a, 'b, P, O> {
     }
 }

-impl<'a, 'b, P, O> Drop for WriteGroup<'a, 'b, P, O> {
+impl<P, O> Drop for WriteGroup<'_, '_, P, O> {
     fn drop(&mut self) {
         self.ref_barrier.leader_exit();
     }

@@ -108,7 +107,7 @@ pub struct WriterIter<'a, 'b, 'c, P: 'c, O: 'c> {
     marker: PhantomData<&'a WriteGroup<'b, 'c, P, O>>,
 }

-impl<'a, 'b, 'c, P, O> Iterator for WriterIter<'a, 'b, 'c, P, O> {
+impl<'a, P, O> Iterator for WriterIter<'a, '_, '_, P, O> {
     type Item = &'a mut Writer<P, O>;

     fn next(&mut self) -> Option<Self::Item> {
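These impl-header changes elide lifetime parameters that are never referred to by name in the body, which is what newer `needless_lifetimes` suggestions ask for; `'_` keeps the impl generic over the lifetimes without naming them. A stand-alone illustration of the same rewrite:

```rust
struct Guard<'a>(&'a mut Vec<u32>);

// Before: `impl<'a> Drop for Guard<'a>` declares a lifetime the body never uses.
// After: elide it with '_ since nothing in the impl needs to refer to it.
impl Drop for Guard<'_> {
    fn drop(&mut self) {
        self.0.push(0);
    }
}

fn main() {
    let mut v = vec![1, 2, 3];
    {
        let _g = Guard(&mut v);
    }
    assert_eq!(v, vec![1, 2, 3, 0]);
}
```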
@@ -1,6 +1,6 @@
 [package]
 name = "stress"
-version = "0.4.0"
+version = "0.4.2"
 authors = ["The TiKV Authors"]
 edition = "2018"
@@ -121,8 +121,8 @@ fn test_pipe_log_listeners() {
         append(
             &engine,
             region_id,
-            (i as u64 + 1) / 2,
-            (i as u64 + 1) / 2 + 1,
+            (i as u64).div_ceil(2),
+            (i as u64).div_ceil(2) + 1,
             Some(&data),
         );
         assert_eq!(hook.0[&LogQueue::Append].appends(), i);

@@ -141,8 +141,8 @@ fn test_pipe_log_listeners() {
         append(
             &engine,
             region_id,
-            (i as u64 + 1) / 2,
-            (i as u64 + 1) / 2 + 1,
+            (i as u64).div_ceil(2),
+            (i as u64).div_ceil(2) + 1,
             Some(&data),
         );
         assert_eq!(hook.0[&LogQueue::Append].appends(), i);

@@ -178,8 +178,8 @@ fn test_pipe_log_listeners() {
         append(
             &engine,
             region_id,
-            (i as u64 + 1) / 2,
-            (i as u64 + 1) / 2 + 1,
+            (i as u64).div_ceil(2),
+            (i as u64).div_ceil(2) + 1,
             Some(&data),
         );
         assert_eq!(hook.0[&LogQueue::Append].appends(), i + 3);

@@ -631,6 +631,52 @@ fn test_concurrent_write_perf_context() {
     }
 }

+#[test]
+fn test_fetch_with_updated_entry_index() {
+    let dir = tempfile::Builder::new()
+        .prefix("test_fetch_with_updated_entry_index")
+        .tempdir()
+        .unwrap();
+    let entry_data = vec![b'x'; 1024];
+    let cfg = Config {
+        dir: dir.path().to_str().unwrap().to_owned(),
+        ..Default::default()
+    };
+    let engine = Engine::open(cfg).unwrap();
+    for i in 0..10 {
+        append(
+            &engine,
+            i + 1,
+            1 + i * 10,
+            1 + i * 10 + 10,
+            Some(&entry_data),
+        );
+    }
+    assert_eq!(
+        10,
+        engine
+            .fetch_entries_to::<MessageExtTyped>(1, 1, 11, None, &mut vec![])
+            .unwrap()
+    );
+    // Mock: the file is broken or otherwise unreadable.
+    {
+        let _f = FailGuard::new("log_file::read::err", "return");
+        engine
+            .fetch_entries_to::<MessageExtTyped>(10, 91, 101, None, &mut vec![])
+            .unwrap_err();
+    }
+    // Mock: one entry has been updated by the background rewrite thread.
+    {
+        let _f = FailGuard::new("log_file::read::err", "8*off->return->off");
+        assert_eq!(
+            10,
+            engine
+                .fetch_entries_to::<MessageExtTyped>(10, 91, 101, None, &mut vec![])
+                .unwrap()
+        );
+    }
+}

 // FIXME: this test no longer works because recovery cannot reliably detect
 // overwrite anomaly.
 // See https://github.com/tikv/raft-engine/issues/250
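The new test drives failures through `FailGuard`, a RAII wrapper around fail-rs failpoints that configures the named point on creation and clears it on drop. The strings are fail-rs action syntax; as I read that grammar, `"8*off->return->off"` keeps the point disabled for the first eight hits and then injects a single failing `return`. A minimal fail-rs sketch under those assumptions, with a made-up read function:

```rust
// Cargo.toml (assumption): fail = { version = "0.5", features = ["failpoints"] }
use fail::fail_point;

fn read_block(idx: usize) -> Result<Vec<u8>, String> {
    // The test can tell this failpoint to inject an error here.
    fail_point!("log_file::read::err", |_| Err("injected read error".to_owned()));
    Ok(vec![0u8; idx])
}

fn main() {
    // Disabled for the first two reads, then fail exactly once, then stay off.
    fail::cfg("log_file::read::err", "2*off->1*return->off").unwrap();
    assert!(read_block(1).is_ok());
    assert!(read_block(2).is_ok());
    assert!(read_block(3).is_err());
    assert!(read_block(4).is_ok());
    fail::remove("log_file::read::err");
}
```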
@@ -124,8 +124,7 @@ fn test_file_write_error() {
     assert_eq!(engine.last_index(2).unwrap(), 1);
 }

-#[test]
-fn test_file_rotate_error() {
+fn test_file_rotate_error(restart_after_failure: bool) {
     let dir = tempfile::Builder::new()
         .prefix("test_file_rotate_error")
         .tempdir()

@@ -138,7 +137,7 @@ fn test_file_rotate_error() {
     let fs = Arc::new(ObfuscatedFileSystem::default());
     let entry = vec![b'x'; 1024];

-    let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
+    let mut engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
     engine
         .write(&mut generate_batch(1, 1, 2, Some(&entry)), false)
         .unwrap();
@@ -160,27 +159,46 @@ fn test_file_rotate_error() {
             let _ = engine.write(&mut generate_batch(1, 4, 5, Some(&entry)), false);
         })
         .is_err());
-        assert_eq!(engine.file_span(LogQueue::Append).1, 1);
     }
+    if restart_after_failure {
+        drop(engine);
+        engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
+    }
+    assert_eq!(engine.file_span(LogQueue::Append).1, 1);
     {
         // Fail to create new log file.
         let _f = FailGuard::new("default_fs::create::err", "return");
-        assert!(catch_unwind_silent(|| {
-            let _ = engine.write(&mut generate_batch(1, 4, 5, Some(&entry)), false);
-        })
-        .is_err());
-        assert_eq!(engine.file_span(LogQueue::Append).1, 1);
+        assert!(engine
+            .write(&mut generate_batch(1, 4, 5, Some(&entry)), false)
+            .is_err());
     }
+    if restart_after_failure {
+        drop(engine);
+        engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
+    }
+    let num_files_before = std::fs::read_dir(&dir).unwrap().count();
     {
         // Fail to write header of new log file.
         let _f = FailGuard::new("log_file::write::err", "1*off->return");
-        assert!(catch_unwind_silent(|| {
-            let _ = engine.write(&mut generate_batch(1, 4, 5, Some(&entry)), false);
-        })
-        .is_err());
+        assert!(engine
+            .write(&mut generate_batch(1, 4, 5, Some(&entry)), false)
+            .is_err());
+    }
+    if restart_after_failure {
+        drop(engine);
+        engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
+        // The new log file is added during the recovery phase of the restart.
+        assert_eq!(engine.file_span(LogQueue::Append).1, 2);
+    } else {
         assert_eq!(engine.file_span(LogQueue::Append).1, 1);
     }
-    {
+    // Although the header is not written, the file is still created.
+    assert_eq!(
+        std::fs::read_dir(&dir).unwrap().count() - num_files_before,
+        1
+    );
+    if !restart_after_failure {
+        // If the engine restarted, a write that does not require sync would succeed.
         // Fail to sync new log file. The old log file is already sync-ed at this point.
         let _f = FailGuard::new("log_fd::sync::err", "return");
         assert!(catch_unwind_silent(|| {

@@ -190,18 +208,39 @@ fn test_file_rotate_error() {
         assert_eq!(engine.file_span(LogQueue::Append).1, 1);
     }

+    // Only one log file should be created after all the incidents.
+    assert_eq!(
+        std::fs::read_dir(&dir).unwrap().count() - num_files_before,
+        1
+    );
     // We can continue writing after the incidents.
     engine
         .write(&mut generate_batch(2, 1, 2, Some(&entry)), true)
         .unwrap();
-    drop(engine);
-    let engine = Engine::open_with_file_system(cfg, fs).unwrap();
+    if restart_after_failure {
+        drop(engine);
+        engine = Engine::open_with_file_system(cfg, fs).unwrap();
+    }
+    assert_eq!(
+        std::fs::read_dir(&dir).unwrap().count() - num_files_before,
+        1
+    );
     assert_eq!(engine.first_index(1).unwrap(), 1);
     assert_eq!(engine.last_index(1).unwrap(), 4);
     assert_eq!(engine.first_index(2).unwrap(), 1);
     assert_eq!(engine.last_index(2).unwrap(), 1);
 }

+#[test]
+fn test_file_rotate_error_without_restart() {
+    test_file_rotate_error(false);
+}
+
+#[test]
+fn test_file_rotate_error_with_restart() {
+    test_file_rotate_error(true);
+}
 #[test]
 fn test_concurrent_write_error() {
     let dir = tempfile::Builder::new()

@@ -262,10 +301,8 @@ fn test_concurrent_write_error() {
     let _f2 = FailGuard::new("log_file::truncate::err", "return");
     let entry_clone = entry.clone();
     ctx.write_ext(move |e| {
-        catch_unwind_silent(|| {
-            e.write(&mut generate_batch(1, 11, 21, Some(&entry_clone)), false)
-        })
-        .unwrap_err();
+        e.write(&mut generate_batch(1, 11, 21, Some(&entry_clone)), false)
+            .unwrap_err();
     });
     // We don't test followers, their panics are hard to catch.
     ctx.join();

@@ -527,7 +564,7 @@ fn test_no_space_write_error() {
     cfg.dir = dir.path().to_str().unwrap().to_owned();
     cfg.spill_dir = Some(spill_dir.path().to_str().unwrap().to_owned());
     {
-        // Case 1: `Write` is abnormal for no space left, Engine should panic at
+        // Case 1: `Write` is abnormal for no space left, Engine should fail at
         // `rotate`.
         let cfg_err = Config {
             target_file_size: ReadableSize(1),

@@ -535,12 +572,9 @@ fn test_no_space_write_error() {
         };
         let engine = Engine::open(cfg_err).unwrap();
         let _f = FailGuard::new("log_fd::write::no_space_err", "return");
-        assert!(catch_unwind_silent(|| {
-            engine
-                .write(&mut generate_batch(2, 11, 21, Some(&entry)), true)
-                .unwrap_err();
-        })
-        .is_err());
+        assert!(engine
+            .write(&mut generate_batch(2, 11, 21, Some(&entry)), true)
+            .is_err());
         assert_eq!(
             0,
             engine

@@ -554,12 +588,9 @@ fn test_no_space_write_error() {
         let _f1 = FailGuard::new("log_fd::write::no_space_err", "2*return->off");
         let _f2 = FailGuard::new("file_pipe_log::force_choose_dir", "return");
         // The first write should fail, because all dirs run out of space for writing.
-        assert!(catch_unwind_silent(|| {
-            engine
-                .write(&mut generate_batch(2, 11, 21, Some(&entry)), true)
-                .unwrap_err();
-        })
-        .is_err());
+        assert!(engine
+            .write(&mut generate_batch(2, 11, 21, Some(&entry)), true)
+            .is_err());
         assert_eq!(
             0,
             engine
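Several hunks above replace `catch_unwind_silent(...)` assertions with plain `Result` checks: write failures that used to abort via panic (and had to be caught as unwinds in tests) are now surfaced as `Err` values. A small stand-alone illustration of the two styles; the `write` function here is hypothetical:

```rust
use std::panic;

// Hypothetical fallible write used only for this illustration.
fn write(data: &[u8], fail: bool) -> Result<usize, String> {
    if fail {
        Err("no space left on device".to_owned())
    } else {
        Ok(data.len())
    }
}

fn main() {
    // Old style: turn the error into a panic and catch the unwind around the call.
    let unwound = panic::catch_unwind(|| {
        write(b"payload", true).unwrap(); // panics on Err
    });
    assert!(unwound.is_err());

    // New style: the error is a value, so the assertion is direct and no
    // panic machinery is involved.
    assert!(write(b"payload", true).is_err());
    assert_eq!(write(b"payload", false), Ok(7));
}
```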