Compare commits

...

27 Commits

Author SHA1 Message Date
lucasliang d53b2b9505
Add missing UTs for the optimization on `fetch_entries_to`. (#385)
Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>
2025-07-07 03:23:09 +00:00
lucasliang 03f77d90c0
Optimize the `fetch_entries_to` function to avoid stalling `write` for too long. (#382)
Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>
2025-07-04 08:26:18 +00:00
Yang Zhang 392f5e66f8
Unlock the LOCK files on drop (#379)
Signed-off-by: Yang Zhang <yang.zhang@pingcap.com>
2025-05-20 07:17:55 +00:00
Ti Chi Robot 7e12385c31
OWNERS: Auto Sync OWNERS files from community membership (#378)
Signed-off-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
2025-02-14 06:46:09 +00:00
Ti Chi Robot 6bb58c379b
OWNERS: Auto Sync OWNERS files from community membership (#369)
Signed-off-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
2024-12-04 06:58:50 +00:00
lucasliang de1ec93752
Fix panics on reading entries by a stale index. (#370)
Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>
2024-11-06 02:50:58 +00:00
lucasliang e1c5dd80a6
CI: update CI workflows. (#372)
Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>
2024-11-05 13:21:43 +00:00
Ti Chi Robot 209e00d8c0
OWNERS: Auto Sync OWNERS files from community membership (#368)
Signed-off-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
2024-10-14 17:14:05 +00:00
Ti Chi Robot 0e40c53566
OWNERS: Auto Sync OWNERS files from community membership (#367)
Signed-off-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
2024-09-09 12:12:15 +00:00
Ti Chi Robot 897e0ebf67
OWNERS: Auto Sync OWNERS files from community membership (#366)
Signed-off-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
2024-09-09 03:21:51 +00:00
Ti Chi Robot c7828cb3e7
OWNERS: Auto Sync OWNERS files from community membership (#364)
Signed-off-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
2024-08-19 13:01:42 +00:00
Ti Chi Robot 3ad316a3c4
OWNERS: Auto Sync OWNERS files from community membership (#363)
Signed-off-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
2024-08-01 10:53:51 +00:00
Ti Chi Robot 8ef3b42057
OWNERS: Auto Sync OWNERS files from community membership (#362)
Signed-off-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
2024-08-01 10:19:21 +00:00
wuhuizuo c1f81d4d06
ci: add prow OWNERS file (#361)
* ci: add prow OWNERS file

Signed-off-by: wuhuizuo <wuhuizuo@126.com>

* ci: update github action workflows [skip ci]

save CI runs.

Signed-off-by: wuhuizuo <wuhuizuo@126.com>

* Update OWNERS

Signed-off-by: wuhuizuo <wuhuizuo@126.com>

---------

Signed-off-by: wuhuizuo <wuhuizuo@126.com>
2024-07-30 13:39:59 +08:00
dependabot[bot] cf5d1b9144
Update strum requirement from 0.25.0 to 0.26.2 (#356)
---
updated-dependencies:
- dependency-name: strum
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: lucasliang <nkcs_lykx@hotmail.com>
2024-06-05 15:15:15 +08:00
lucasliang cd1533d79c
engine: supply the metric of the compression ratio on write. (#358)
* engine: add extra metrics on the compression ratio of write.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

* Calibrate the version of grcov.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

* Fix clippy errs.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

* Fix format errs.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

* Calibrate rustc version.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

* Fix clippy errs.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

* Fix clippy errors.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

* Add change logs.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

* Format ci actions.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

* Fix ci errors.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

* Polish rust.yml.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

---------

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>
2024-04-23 15:28:49 +08:00
lucasliang 084c5ca0e4
*: bump 0.4.2 (#357)
* *: bump 0.4.2

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

* Bugfix.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

* Update toolchain.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

* Correct version

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

* Fix clippy errors.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

* Fix test compilation errs.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

* Polish changelog and upload codecov report with TOKEN.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

* Fix MacOS compilation errs.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

* Revert "Fix MacOS compilation errs."

This reverts commit 52773890c8.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

* Optimize the ci workflow.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

* Downgrade the version to meet the space usage limit of CI.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

---------

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>
2024-04-23 11:16:07 +08:00
lucasliang e505d631c8
rewrite: optimize the interval of `sync` when rewriting memtables. (#347)
In a cloud environment, refraining from scheduling sync operations
when rewriting memtables might result in an accumulation of unsynced bytes
in the buffer. This accumulation has the potential to impede the foreground
write progress during sync.

This pull request introduces periodic sync operations when the amount of
stashed unsynced bytes exceeds a predefined threshold. This optimization
aims to address the issue and enhance performance.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>
2024-01-04 17:27:27 +08:00
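
In sketch form, the thresholded-sync pattern this message describes (the `SYNC_THRESHOLD` constant and the closures are illustrative, not the engine's actual API):

```rust
// Illustrative sketch: sync periodically once stashed unsynced bytes pass a
// threshold, instead of letting them pile up until one large sync at the end.
const SYNC_THRESHOLD: usize = 512 * 1024; // hypothetical threshold

fn rewrite_chunks(chunks: &[Vec<u8>], mut write: impl FnMut(&[u8]), mut sync: impl FnMut()) {
    let mut unsynced = 0;
    for chunk in chunks {
        write(chunk);
        unsynced += chunk.len();
        if unsynced >= SYNC_THRESHOLD {
            sync(); // bound how many bytes any single sync must flush
            unsynced = 0;
        }
    }
    sync(); // flush the tail
}
```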
Yang Zhang d043b4ad4e
Revert rustc version (#345)
* Revert rustc version change

Signed-off-by: Yang Zhang <yang.zhang@pingcap.com>
2023-12-13 15:29:21 +08:00
Yang Zhang e8de5d75ae
Return error instead of panicking if rewriting fails (#343)
* Return error instead of panicking if rewriting fails

Signed-off-by: v01dstar <yang.zhang@pingcap.com>

* Update rust version

Signed-off-by: v01dstar <yang.zhang@pingcap.com>

* Update rust version in github workflow

Signed-off-by: v01dstar <yang.zhang@pingcap.com>

* Update src/file_pipe_log/pipe.rs

Co-authored-by: lucasliang <nkcs_lykx@hotmail.com>
Signed-off-by: v01dstar <yang.zhang@pingcap.com>

* Update src/file_pipe_log/pipe.rs

Co-authored-by: lucasliang <nkcs_lykx@hotmail.com>
Signed-off-by: v01dstar <yang.zhang@pingcap.com>

* Address comments, fix test cases

Signed-off-by: v01dstar <yang.zhang@pingcap.com>

* Fix format error

Signed-off-by: v01dstar <yang.zhang@pingcap.com>

* Move panic inside

Signed-off-by: Yang Zhang <yang.zhang@pingcap.com>

* Fix clippy

Signed-off-by: Yang Zhang <yang.zhang@pingcap.com>

* Propagate error if writing header fails

Signed-off-by: Yang Zhang <yang.zhang@pingcap.com>

* Adjust write header fail expectation, from panic to error

Signed-off-by: v01dstar <yang.zhang@pingcap.com>

* Panic if write header fails since we do not truncate

Signed-off-by: v01dstar <yang.zhang@pingcap.com>

* Failure other than sync should be returned

Signed-off-by: v01dstar <yang.zhang@pingcap.com>

* Address comments

Signed-off-by: Yang Zhang <yang.zhang@pingcap.com>

* Fix test failures

Signed-off-by: Yang Zhang <yang.zhang@pingcap.com>

* Change test expectations

Signed-off-by: Yang Zhang <yang.zhang@pingcap.com>

* Address comments

Signed-off-by: Yang Zhang <yang.zhang@pingcap.com>

* Fix format

Signed-off-by: Yang Zhang <yang.zhang@pingcap.com>

* Revert sync() signature

Signed-off-by: Yang Zhang <yang.zhang@pingcap.com>

* Add more details to rotate test

Signed-off-by: Yang Zhang <yang.zhang@pingcap.com>

* Fix style

Signed-off-by: Yang Zhang <yang.zhang@pingcap.com>

* Address comments

Signed-off-by: Yang Zhang <yang.zhang@pingcap.com>

* Address comments

Signed-off-by: Yang Zhang <yang.zhang@pingcap.com>

* Fix clippy

Signed-off-by: Yang Zhang <yang.zhang@pingcap.com>

* Trigger Github actions

Signed-off-by: Yang Zhang <yang.zhang@pingcap.com>

---------

Signed-off-by: v01dstar <yang.zhang@pingcap.com>
Signed-off-by: Yang Zhang <yang.zhang@pingcap.com>
Co-authored-by: lucasliang <nkcs_lykx@hotmail.com>
2023-12-06 18:43:43 -08:00
dependabot[bot] 385182bfc2
Update memmap2 requirement from 0.8 to 0.9 (#341)
Updates the requirements on [memmap2](https://github.com/RazrFalcon/memmap2-rs) to permit the latest version.
- [Changelog](https://github.com/RazrFalcon/memmap2-rs/blob/master/CHANGELOG.md)
- [Commits](https://github.com/RazrFalcon/memmap2-rs/compare/v0.8.0...v0.9.0)

---
updated-dependencies:
- dependency-name: memmap2
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-10-05 11:57:13 +08:00
dependabot[bot] 70a8873fbb
Update memmap2 requirement from 0.7 to 0.8 (#340)
Updates the requirements on [memmap2](https://github.com/RazrFalcon/memmap2-rs) to permit the latest version.
- [Changelog](https://github.com/RazrFalcon/memmap2-rs/blob/master/CHANGELOG.md)
- [Commits](https://github.com/RazrFalcon/memmap2-rs/compare/v0.7.0...v0.8.0)

---
updated-dependencies:
- dependency-name: memmap2
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-09-26 10:26:34 +08:00
Xinye Tao fa56f891fd
bump 0.4.1 (#336)
* bump 0.4.1

Signed-off-by: tabokie <xy.tao@outlook.com>

* Update CHANGELOG.md

Signed-off-by: Xinye Tao <xy.tao@outlook.com>

---------

Signed-off-by: tabokie <xy.tao@outlook.com>
Signed-off-by: Xinye Tao <xy.tao@outlook.com>
2023-09-14 17:40:57 +08:00
dependabot[bot] 9aa124558c
Update toml requirement from 0.7 to 0.8 (#337)
Updates the requirements on [toml](https://github.com/toml-rs/toml) to permit the latest version.
- [Commits](https://github.com/toml-rs/toml/compare/toml-v0.7.0...toml-v0.8.0)

---
updated-dependencies:
- dependency-name: toml
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-09-14 08:27:57 +08:00
Weny Xu 22dfb426cd
refactor: reduce scope of internal keys (#335)
* refactor: reduce scope of internal keys

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* Update CHANGELOG.md

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Co-authored-by: Xinye Tao <xy.tao@outlook.com>
2023-09-12 17:40:42 +08:00
Xinye Tao 3724bfc510
disable windows build (#333)
* Empty commit

Signed-off-by: tabokie <xy.tao@outlook.com>

* Use windows 2019

Signed-off-by: tabokie <xy.tao@outlook.com>

* Disable windows build

Signed-off-by: tabokie <xy.tao@outlook.com>

---------

Signed-off-by: tabokie <xy.tao@outlook.com>
2023-09-11 16:05:28 +08:00
glorv ec6aa90163
enlarge append recycle log threshold (#331)
* increase the log recycle threshold

Signed-off-by: glorv <glorvs@163.com>

* fix prefill

Signed-off-by: glorv <glorvs@163.com>

* keep the default prefill threshold the same as the recycle threshold

Signed-off-by: glorv <glorvs@163.com>

* fix comment

Signed-off-by: glorv <glorvs@163.com>

* fix unit test

Signed-off-by: glorv <glorvs@163.com>

* revert change

Signed-off-by: glorv <glorvs@163.com>

---------

Signed-off-by: glorv <glorvs@163.com>
2023-09-07 13:51:11 +08:00
27 changed files with 499 additions and 176 deletions

View File

@ -1,6 +1,15 @@
name: Rust
on: [push, pull_request]
on:
push:
branches-ignore:
- dependabot/**
pull_request:
paths-ignore:
- "OWNERS"
- "OWNERS_ALIASES"
- "*.md"
env:
CARGO_TERM_COLOR: always
@ -10,7 +19,9 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ ubuntu-latest, macos-latest, windows-latest ]
# tests with sanitizer on `ubuntu-latest` are covered by the
# self-defined `nightly test` job, so we don't need to test it here.
os: [ macos-latest ]
steps:
- uses: actions/checkout@v2
with:
@ -19,7 +30,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: nightly-2023-07-01
toolchain: nightly-2025-04-03
override: true
components: rustfmt, clippy, rust-src
- uses: Swatinem/rust-cache@v1
@ -34,19 +45,13 @@ jobs:
git diff --exit-code
- name: Clippy
run: make clippy
env:
EXTRA_CARGO_ARGS: '--fix'
- name: Run tests
run: make test
env:
RUST_BACKTRACE: 1
EXTRA_CARGO_ARGS: '--verbose'
- name: Run asan tests
if: ${{ matrix.os == 'ubuntu-latest' }}
run: make test
env:
RUST_BACKTRACE: 1
RUSTFLAGS: '-Zsanitizer=address'
RUSTDOCFLAGS: '-Zsanitizer=address'
EXTRA_CARGO_ARGS: '--verbose -Zbuild-std --target x86_64-unknown-linux-gnu'
stable:
runs-on: ${{ matrix.os }}
strategy:
@ -60,7 +65,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: 1.66.0
toolchain: 1.85.0
override: true
components: rustfmt, clippy, rust-src
- uses: Swatinem/rust-cache@v1
@ -87,7 +92,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: nightly-2023-07-01
toolchain: nightly-2025-04-03
override: true
components: llvm-tools-preview
- uses: Swatinem/rust-cache@v1
@ -96,15 +101,16 @@ jobs:
- name: Install grcov
run: if [[ ! -e ~/.cargo/bin/grcov ]]; then cargo install --locked grcov; fi
- name: Run tests
run: |
make test_matrix
run: make test_matrix
env:
RUSTFLAGS: '-Zinstrument-coverage'
RUSTFLAGS: '-Cinstrument-coverage'
LLVM_PROFILE_FILE: '%p-%m.profraw'
EXTRA_CARGO_ARGS: '--verbose'
- name: Run grcov
run: grcov `find . \( -name "*.profraw" \) -print` --binary-path target/debug/deps/ -s . -t lcov --branch --ignore-not-existing --ignore '../**' --ignore '/*' -o coverage.lcov
- name: Upload
uses: codecov/codecov-action@v2
uses: codecov/codecov-action@v3
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
with:
file: coverage.lcov

View File

@ -2,6 +2,24 @@
## [Unreleased]
### New Features
* Add a new Prometheus metric `raft_engine_write_compression_ratio` to track compression ratio of write #358
## [0.4.2] - 2024-04-16
### Behavior Changes
* Periodically flush unsynced bytes when rewriting, to avoid the I/O jitter caused when flushing too many bytes impedes the foreground writes. (#347)
* Errors are now returned if rewriting fails, instead of panicking directly. (#343)
## [0.4.1] - 2023-09-14
### Behavior Changes
* When log recycling is enabled, Raft Engine will now retain 50% more log files to reduce the chance of running out.
* Reduce the scope of keys reserved for internal use.
## [0.4.0] - 2023-09-01
### Behavior Changes

View File

@ -1,9 +1,9 @@
[package]
name = "raft-engine"
version = "0.4.0"
version = "0.4.2"
authors = ["The TiKV Project Developers"]
edition = "2018"
rust-version = "1.66.0"
rust-version = "1.85.0"
description = "A persistent storage engine for Multi-Raft logs"
readme = "README.md"
repository = "https://github.com/tikv/raft-engine"
@ -42,9 +42,12 @@ hex = "0.4"
if_chain = "1.0"
lazy_static = "1.3"
libc = "0.2"
log = { version = "0.4", features = ["max_level_trace", "release_max_level_debug"] }
lz4-sys = "1.9"
memmap2 = { version = "0.7", optional = true }
log = { version = "0.4", features = [
"max_level_trace",
"release_max_level_debug",
] }
lz4-sys = { version = "=1.9.5" }
memmap2 = { version = "0.9", optional = true }
nix = "0.26"
num-derive = "0.4"
num-traits = "0.2"
@ -55,38 +58,33 @@ protobuf = "2"
rayon = "1.5"
rhai = { version = "1.7", features = ["sync"], optional = true }
scopeguard = "1.1"
serde = { version = "1.0", features = ["derive"] }
serde = { version = "=1.0.194", features = ["derive"] }
serde_repr = "0.1"
strum = { version = "0.25.0", features = ["derive"] }
strum = { version = "0.26.2", features = ["derive"] }
thiserror = "1.0"
[dev-dependencies]
criterion = "0.4"
ctor = "0.2"
env_logger = "0.10"
kvproto = { git = "https://github.com/pingcap/kvproto.git", default-features = false, features = ["protobuf-codec"] }
raft = { git = "https://github.com/tikv/raft-rs", branch = "master", default-features = false, features = ["protobuf-codec"] }
kvproto = { git = "https://github.com/pingcap/kvproto.git", default-features = false, features = [
"protobuf-codec",
] }
raft = { git = "https://github.com/tikv/raft-rs", branch = "master", default-features = false, features = [
"protobuf-codec",
] }
rand = "0.8"
rand_distr = "0.4"
tempfile = "3.6"
toml = "0.7"
toml = "0.8"
[features]
default = ["internals", "scripting"]
internals = []
nightly = [
"prometheus/nightly",
]
failpoints = [
"fail/failpoints",
]
scripting = [
"rhai",
]
swap = [
"nightly",
"memmap2",
]
nightly = ["prometheus/nightly"]
failpoints = ["fail/failpoints"]
scripting = ["rhai"]
swap = ["nightly", "memmap2"]
std_fs = []
nightly_group = ["nightly", "swap"]
@ -95,8 +93,8 @@ nightly_group = ["nightly", "swap"]
raft-proto = { git = "https://github.com/tikv/raft-rs", branch = "master" }
protobuf = { git = "https://github.com/pingcap/rust-protobuf", branch = "v2.8" }
protobuf-codegen = { git = "https://github.com/pingcap/rust-protobuf", branch = "v2.8" }
# TODO: Use official grpc-rs once https://github.com/tikv/grpc-rs/pull/622 is merged.
grpcio = { git = "https://github.com/tabokie/grpc-rs", branch = "v0.10.x-win" }
# Lock the version of cc-rs to avoid build failure on MacOS, ref https://github.com/rust-lang/cc-rs/issues/984.
cc = { git = "https://github.com/rust-lang/cc-rs", tag = "1.0.98" }
[workspace]
members = ["stress", "ctl"]

OWNERS (new file, 80 lines)
View File

@ -0,0 +1,80 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- 5kbpers
- AndreMouche
- andylokandy
- breezewish
- brson
- bufferflies
- BusyJay
- cfzjywxk
- Connor1996
- coocood
- crazycs520
- disksing
- ekexium
- gengliqi
- glorv
- hicqu
- hunterlxt
- imtbkcat
- innerr
- iosmanthus
- jackysp
- kennytm
- Little-Wallace
- liuzix
- lonng
- LykxSassinator
- lysu
- marsishandsome
- MyonKeminta
- niedhui
- NingLin-P
- nrc
- overvenus
- pingyu
- skyzh
- SpadeA-Tang
- sticnarf
- sunxiaoguang
- tabokie
- TennyZhuang
- tonyxuqqi
- v01dstar
- yiwu-arbug
- you06
- youjiali1995
- YuJuncen
- zhangjinpeng87
- zhongzc
- zhouqiang-cl
- zyguan
reviewers:
- 3AceShowHand
- 3pointer
- CalvinNeo
- ethercflow
- fredchenbj
- Fullstop000
- gozssky
- haojinming
- hbisheng
- hhwyt
- HuSharp
- jayzhan211
- Jibbow
- JmPotato
- Leavrth
- lhy1024
- longfangsong
- lzmhhh123
- Mossaka
- MrCroxx
- nolouch
- rleungx
- Rustin170506
- tier-cap
- wjhuang2016
- wshwsh12
- Xuanwo

View File

@ -54,7 +54,7 @@ Put this in your Cargo.toml:
```rust
[dependencies]
raft-engine = "0.4.0"
raft-engine = "0.4"
```
Available Cargo features:

View File

@ -1,9 +1,9 @@
[package]
name = "raft-engine-ctl"
version = "0.4.0"
version = "0.4.2"
authors = ["The TiKV Project Developers"]
edition = "2018"
rust-version = "1.61.0"
rust-version = "1.75.0"
description = "A control tool for Raft Engine"
repository = "https://github.com/tikv/raft-engine"
license = "Apache-2.0"
@ -11,4 +11,7 @@ license = "Apache-2.0"
[dependencies]
clap = { version = "3.1", features = ["derive", "cargo"] }
env_logger = "0.10"
raft-engine = { path = "..", version = "0.4.0", features = ["scripting", "internals"] }
raft-engine = { path = "..", version = "0.4.1", features = [
"scripting",
"internals",
] }

View File

@ -360,7 +360,6 @@ mod tests {
use super::*;
use protobuf::CodedOutputStream;
use std::io::ErrorKind;
use std::{f32, f64, i16, i32, i64, u16, u32, u64};
const U16_TESTS: &[u16] = &[
i16::MIN as u16,

View File

@ -116,7 +116,7 @@ pub struct Config {
pub prefill_for_recycle: bool,
/// Maximum capacity for preparing log files for recycling when start.
/// If not `None`, its size is equal to `purge-threshold`.
/// If `None`, its size is equal to `purge-threshold`*1.5.
/// Only available when `prefill-for-recycle` is true.
///
/// Default: None
@ -219,10 +219,10 @@ impl Config {
}
if self.enable_log_recycle && self.purge_threshold.0 >= self.target_file_size.0 {
// (1) At most u32::MAX so that the file number can be capped into an u32
// without colliding. (2) Add some more files as an additional buffer to
// avoid jitters.
// without colliding. (2) Increase the threshold by 50% to add some more files
// as an additional buffer to avoid jitters.
std::cmp::min(
(self.purge_threshold.0 / self.target_file_size.0) as usize + 2,
(self.purge_threshold.0 / self.target_file_size.0) as usize * 3 / 2,
u32::MAX as usize,
)
} else {
@ -241,7 +241,7 @@ impl Config {
if self.prefill_for_recycle && prefill_limit >= self.target_file_size.0 {
// Keep same with the maximum setting of `recycle_capacity`.
std::cmp::min(
(prefill_limit / self.target_file_size.0) as usize + 2,
(prefill_limit / self.target_file_size.0) as usize * 3 / 2,
u32::MAX as usize,
)
} else {
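
A worked example of the 1.5x capacity formula in the hunks above, under assumed settings of purge-threshold = 10 GiB and target-file-size = 128 MiB:

```rust
// 10 GiB / 128 MiB = 80 files; the new formula retains 50% more:
// 80 * 3 / 2 = 120 recycled files, versus 80 + 2 = 82 under the old `+ 2`.
fn main() {
    let purge_threshold: u64 = 10 * 1024 * 1024 * 1024; // assumed setting
    let target_file_size: u64 = 128 * 1024 * 1024; // assumed setting
    let capacity = std::cmp::min(
        (purge_threshold / target_file_size) as usize * 3 / 2,
        u32::MAX as usize,
    );
    assert_eq!(capacity, 120);
}
```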

View File

@ -142,7 +142,7 @@ where
return Ok(0);
}
let start = Instant::now();
let len = log_batch.finish_populate(
let (len, compression_ratio) = log_batch.finish_populate(
self.cfg.batch_compression_threshold.0 as usize,
self.cfg.compression_level,
)?;
@ -172,7 +172,7 @@ where
}
perf_context!(log_write_duration).observe_since(now);
if sync {
// As per trait protocol, this error should be retriable. But we panic anyway to
// As per trait protocol, sync error should be retriable. But we panic anyway to
// save the trouble of propagating it to other group members.
self.pipe_log.sync(LogQueue::Append).expect("pipe::sync()");
}
@ -225,6 +225,7 @@ where
now = end;
ENGINE_WRITE_DURATION_HISTOGRAM.observe(now.saturating_duration_since(start).as_secs_f64());
ENGINE_WRITE_SIZE_HISTOGRAM.observe(len as f64);
ENGINE_WRITE_COMPRESSION_RATIO_HISTOGRAM.observe(compression_ratio);
Ok(len)
}
@ -337,7 +338,27 @@ where
.read()
.fetch_entries_to(begin, end, max_size, &mut ents_idx)?;
for i in ents_idx.iter() {
vec.push(read_entry_from_file::<M, _>(self.pipe_log.as_ref(), i)?);
vec.push({
match read_entry_from_file::<M, _>(self.pipe_log.as_ref(), i) {
Ok(entry) => entry,
Err(e) => {
// The index is not found in the file, which means the entry has already
// been made stale by `compact` or `rewrite`. Retry reading the entry from
// the memtable.
let immutable = memtable.read();
// Ensure that the corresponding memtable is locked with a read lock before
// completing the fetch of entries from the raft logs. This prevents the
// index from becoming stale while being concurrently updated by the
// `rewrite` operation.
if let Some(idx) = immutable.get_entry(i.index) {
read_entry_from_file::<M, _>(self.pipe_log.as_ref(), &idx)?
} else {
return Err(e);
}
}
}
});
}
ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64);
return Ok(ents_idx.len());
@ -634,9 +655,11 @@ pub(crate) mod tests {
use crate::util::ReadableSize;
use kvproto::raft_serverpb::RaftLocalState;
use raft::eraftpb::Entry;
use rand::{thread_rng, Rng};
use std::collections::{BTreeSet, HashSet};
use std::fs::OpenOptions;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
pub(crate) type RaftLogEngine<F = DefaultFileSystem> = Engine<F>;
impl<F: FileSystem> RaftLogEngine<F> {
@ -1928,8 +1951,6 @@ pub(crate) mod tests {
#[cfg(feature = "nightly")]
#[bench]
fn bench_engine_fetch_entries(b: &mut test::Bencher) {
use rand::{thread_rng, Rng};
let dir = tempfile::Builder::new()
.prefix("bench_engine_fetch_entries")
.tempdir()
@ -2162,6 +2183,7 @@ pub(crate) mod tests {
prefill_for_recycle: true,
..Default::default()
};
let recycle_capacity = cfg.recycle_capacity() as u64;
let fs = Arc::new(DeleteMonitoredFileSystem::new());
let engine = RaftLogEngine::open_with_file_system(cfg, fs.clone()).unwrap();
@ -2197,11 +2219,11 @@ pub(crate) mod tests {
assert_eq!(reserved_start_2, reserved_start_3);
// Reuse all of reserved files.
for rid in 1..=50 {
for rid in 1..=recycle_capacity {
engine.append(rid, 1, 11, Some(&entry_data));
}
assert!(fs.reserved_metadata.lock().unwrap().is_empty());
for rid in 1..=50 {
for rid in 1..=recycle_capacity {
engine.clean(rid);
}
engine.purge_manager.must_rewrite_append_queue(None, None);
@ -2585,6 +2607,53 @@ pub(crate) mod tests {
assert!(data.is_empty(), "data loss {:?}", data);
}
#[test]
fn test_fetch_with_concurrently_rewrite() {
let dir = tempfile::Builder::new()
.prefix("test_fetch_with_concurrently_rewrite")
.tempdir()
.unwrap();
let cfg = Config {
dir: dir.path().to_str().unwrap().to_owned(),
target_file_size: ReadableSize(2048),
..Default::default()
};
let fs = Arc::new(DeleteMonitoredFileSystem::new());
let engine = Arc::new(RaftLogEngine::open_with_file_system(cfg, fs.clone()).unwrap());
let entry_data = vec![b'x'; 128];
// Set up a concurrent write with purge, and fetch.
let mut vec: Vec<Entry> = Vec::new();
let fetch_engine = engine.clone();
let flag = Arc::new(AtomicBool::new(false));
let start_flag = flag.clone();
let th = std::thread::spawn(move || {
while !start_flag.load(Ordering::Acquire) {
std::thread::sleep(Duration::from_millis(10));
}
for _ in 0..10 {
let region_id = thread_rng().gen_range(1..=10);
// Should not return file seqno out of range error.
let _ = fetch_engine
.fetch_entries_to::<Entry>(region_id, 1, 101, None, &mut vec)
.map_err(|e| {
assert!(!format!("{e}").contains("file seqno out of"));
});
vec.clear();
}
});
for i in 0..10 {
for rid in 1..=10 {
engine.append(rid, 1 + i * 10, 1 + i * 10 + 10, Some(&entry_data));
}
flag.store(true, Ordering::Release);
for rid in 1..=10 {
engine.clean(rid);
}
engine.purge_expired_files().unwrap();
}
th.join().unwrap();
}
#[test]
fn test_internal_key_filter() {
let dir = tempfile::Builder::new()
@ -2704,6 +2773,7 @@ pub(crate) mod tests {
purge_threshold: ReadableSize(40),
..cfg.clone()
};
let recycle_capacity = cfg_2.recycle_capacity() as u64;
let engine = RaftLogEngine::open_with_file_system(cfg_2, file_system.clone()).unwrap();
assert!(number_of_files(spill_dir.path()) > 0);
for rid in 1..=10 {
@ -2718,7 +2788,7 @@ pub(crate) mod tests {
);
assert!(file_count > engine.file_count(None));
// Append data, recycled files are reused.
for rid in 1..=30 {
for rid in 1..=recycle_capacity - 10 {
engine.append(rid, 20, 30, Some(&entry_data));
}
// No new file is created.
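
Reduced to its generic shape, the stale-index fallback added in the `fetch_entries_to` hunk above looks like the following (a sketch, not the crate's actual signatures):

```rust
// If the first read fails, re-resolve the index under a read lock and retry
// once; only surface the original error when the entry is genuinely gone.
fn read_with_fallback<T, E>(
    read: impl Fn(u64) -> Result<T, E>,
    refresh_index: impl Fn() -> Option<u64>,
    idx: u64,
) -> Result<T, E> {
    match read(idx) {
        Ok(v) => Ok(v),
        Err(e) => match refresh_index() {
            Some(new_idx) => read(new_idx), // index was moved by rewrite/compact
            None => Err(e),
        },
    }
}
```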

src/env/default.rs (vendored, 2 changes)
View File

@ -62,7 +62,7 @@ impl Read for LogFile {
impl Seek for LogFile {
fn seek(&mut self, pos: SeekFrom) -> IoResult<u64> {
fail_point!("log_file::seek::err", |_| {
Err(std::io::Error::new(std::io::ErrorKind::Other, "fp"))
Err(std::io::Error::other("fp"))
});
match pos {
SeekFrom::Start(offset) => self.offset = offset as usize,

View File

@ -83,7 +83,7 @@ impl LogFd {
while readed < buf.len() {
let bytes = match pread(self.0, &mut buf[readed..], offset as i64) {
Ok(bytes) => bytes,
Err(e) if e == Errno::EINTR => continue,
Err(Errno::EINTR) => continue,
Err(e) => return Err(from_nix_error(e, "pread")),
};
// EOF
@ -106,7 +106,7 @@ impl LogFd {
while written < content.len() {
let bytes = match pwrite(self.0, &content[written..], offset as i64) {
Ok(bytes) => bytes,
Err(e) if e == Errno::EINTR => continue,
Err(Errno::EINTR) => continue,
Err(e) if e == Errno::ENOSPC => return Err(from_nix_error(e, "nospace")),
Err(e) => return Err(from_nix_error(e, "pwrite")),
};
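
Self-contained, the `pread` retry idiom from this hunk looks like the following (assuming nix 0.26's `RawFd`-based API, as the diff itself uses):

```rust
use nix::errno::Errno;
use nix::sys::uio::pread;
use std::os::unix::io::RawFd;

// Keep reading until the buffer is full or EOF; EINTR only means the call
// was interrupted by a signal, so it is retried rather than surfaced.
fn read_fully(fd: RawFd, buf: &mut [u8], mut offset: i64) -> nix::Result<usize> {
    let mut read = 0;
    while read < buf.len() {
        match pread(fd, &mut buf[read..], offset) {
            Ok(0) => break, // EOF
            Ok(n) => {
                read += n;
                offset += n as i64;
            }
            Err(Errno::EINTR) => continue,
            Err(e) => return Err(e),
        }
    }
    Ok(read)
}
```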

View File

@ -43,6 +43,8 @@ pub struct LogFileWriter<F: FileSystem> {
capacity: usize,
}
// All APIs provided by `LogFileWriter` are fail-safe, i.e. caller can continue
// using the same "writer" even if the previous operation failed.
impl<F: FileSystem> LogFileWriter<F> {
fn open(
handle: Arc<F::Handle>,
@ -67,7 +69,7 @@ impl<F: FileSystem> LogFileWriter<F> {
}
fn write_header(&mut self, format: LogFileFormat) -> IoResult<()> {
self.writer.seek(SeekFrom::Start(0))?;
self.writer.rewind()?;
self.written = 0;
let mut buf = Vec::with_capacity(LogFileFormat::encoded_len(format.version));
format.encode(&mut buf).unwrap();
@ -105,13 +107,12 @@ impl<F: FileSystem> LogFileWriter<F> {
}
self.capacity += alloc;
}
self.writer.write_all(buf).map_err(|e| {
self.writer.write_all(buf).inspect_err(|_| {
self.writer
.seek(SeekFrom::Start(self.written as u64))
.unwrap_or_else(|e| {
panic!("failed to reseek after write failure: {}", e);
});
e
})?;
self.written = new_written;
Ok(())
@ -119,7 +120,8 @@ impl<F: FileSystem> LogFileWriter<F> {
pub fn sync(&mut self) -> IoResult<()> {
let _t = StopWatch::new(&*LOG_SYNC_DURATION_HISTOGRAM);
self.handle.sync()?;
// Panic if sync fails, in case of data loss.
self.handle.sync().unwrap();
Ok(())
}

View File

@ -218,7 +218,7 @@ pub mod debug {
let log_file_format = LogFileContext::new(file_id, Version::default());
for batch in bs.iter_mut() {
let offset = writer.offset() as u64;
let len = batch
let (len, _) = batch
.finish_populate(1 /* compression_threshold */, None)
.unwrap();
batch.prepare_write(&log_file_format).unwrap();

View File

@ -7,6 +7,7 @@ use std::sync::Arc;
use crossbeam::utils::CachePadded;
use fail::fail_point;
use fs2::FileExt;
use log::error;
use parking_lot::{Mutex, MutexGuard, RwLock};
@ -177,8 +178,13 @@ impl<F: FileSystem> SinglePipe<F> {
// Skip syncing directory in Windows. Refer to badger's discussion for more
// detail: https://github.com/dgraph-io/badger/issues/699
//
// Panic if sync calls fail, keep consistent with the behavior of
// `LogFileWriter::sync()`.
#[cfg(not(windows))]
std::fs::File::open(PathBuf::from(&self.paths[path_id])).and_then(|d| d.sync_all())?;
std::fs::File::open(PathBuf::from(&self.paths[path_id]))
.and_then(|d| d.sync_all())
.unwrap();
Ok(())
}
@ -321,12 +327,7 @@ impl<F: FileSystem> SinglePipe<F> {
fail_point!("file_pipe_log::append");
let mut writable_file = self.writable_file.lock();
if writable_file.writer.offset() >= self.target_file_size {
if let Err(e) = self.rotate_imp(&mut writable_file) {
panic!(
"error when rotate [{:?}:{}]: {e}",
self.queue, writable_file.seq,
);
}
self.rotate_imp(&mut writable_file)?;
}
let seq = writable_file.seq;
@ -359,9 +360,7 @@ impl<F: FileSystem> SinglePipe<F> {
}
let start_offset = writer.offset();
if let Err(e) = writer.write(bytes.as_bytes(&ctx), self.target_file_size) {
if let Err(te) = writer.truncate() {
panic!("error when truncate {seq} after error: {e}, get: {}", te);
}
writer.truncate()?;
if is_no_space_err(&e) {
// TODO: There exists several corner cases should be tackled if
// `bytes.len()` > `target_file_size`. For example,
@ -372,12 +371,7 @@ impl<F: FileSystem> SinglePipe<F> {
// - [3] Both main-dir and spill-dir have several recycled logs.
// But as `bytes.len()` is always smaller than `target_file_size` in common
// cases, this issue will be ignored temporarily.
if let Err(e) = self.rotate_imp(&mut writable_file) {
panic!(
"error when rotate [{:?}:{}]: {e}",
self.queue, writable_file.seq
);
}
self.rotate_imp(&mut writable_file)?;
// If there still exists free space for this record, rotate the file
// and return a special TryAgain Err (for retry) to the caller.
return Err(Error::TryAgain(format!(
@ -403,15 +397,9 @@ impl<F: FileSystem> SinglePipe<F> {
fn sync(&self) -> Result<()> {
let mut writable_file = self.writable_file.lock();
let seq = writable_file.seq;
let writer = &mut writable_file.writer;
{
let _t = StopWatch::new(perf_context!(log_sync_duration));
if let Err(e) = writer.sync() {
panic!("error when sync [{:?}:{seq}]: {e}", self.queue);
}
}
let _t = StopWatch::new(perf_context!(log_sync_duration));
writer.sync().map_err(Error::Io)?;
Ok(())
}
@ -477,7 +465,17 @@ impl<F: FileSystem> SinglePipe<F> {
pub struct DualPipes<F: FileSystem> {
pipes: [SinglePipe<F>; 2],
_dir_locks: Vec<StdFile>,
dir_locks: Vec<StdFile>,
}
impl<F: FileSystem> Drop for DualPipes<F> {
fn drop(&mut self) {
for lock in &self.dir_locks {
if let Err(e) = FileExt::unlock(lock) {
error!("error while unlocking directory: {e}");
}
}
}
}
impl<F: FileSystem> DualPipes<F> {
@ -494,7 +492,7 @@ impl<F: FileSystem> DualPipes<F> {
Ok(Self {
pipes: [appender, rewriter],
_dir_locks: dir_locks,
dir_locks,
})
}
@ -726,7 +724,7 @@ mod tests {
// Retire files.
assert_eq!(pipe_log.purge_to(last).unwrap() as u64, last - first);
// Try to read recycled file.
for (_, handle) in handles.into_iter().enumerate() {
for handle in handles.into_iter() {
assert!(pipe_log.read_bytes(handle).is_err());
}
// Try to reuse.
@ -740,4 +738,21 @@ mod tests {
assert_eq!(pipe_log.read_bytes(handle).unwrap(), content(i + 1));
}
}
#[test]
fn test_release_on_drop() {
let dir = Builder::new()
.prefix("test_release_on_drop")
.tempdir()
.unwrap();
let path = dir.path().to_str().unwrap();
let cfg = Config {
dir: path.to_owned(),
target_file_size: ReadableSize(1),
..Default::default()
};
let pipe_log = new_test_pipes(&cfg).unwrap();
drop(pipe_log);
assert!(new_test_pipes(&cfg).is_ok());
}
}
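
The unlock-on-drop fix (#379) boils down to this pattern with `fs2` (a minimal sketch; `DirLock` is a made-up name standing in for `DualPipes`' lock handling):

```rust
use fs2::FileExt;
use std::fs::File;

struct DirLock(File);

impl DirLock {
    fn acquire(path: &str) -> std::io::Result<Self> {
        let f = File::open(path)?;
        f.try_lock_exclusive()?; // fails if another process holds the LOCK
        Ok(Self(f))
    }
}

impl Drop for DirLock {
    fn drop(&mut self) {
        // Release the advisory lock explicitly instead of relying on process
        // exit, so a re-open in the same process (as in the new
        // test_release_on_drop) succeeds.
        if let Err(e) = FileExt::unlock(&self.0) {
            eprintln!("error while unlocking directory: {e}");
        }
    }
}
```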

View File

@ -379,7 +379,7 @@ impl<F: FileSystem> DualPipesBuilder<F> {
fn recover_queue_imp<M: ReplayMachine, FA: Factory<M>>(
file_system: Arc<F>,
recovery_cfg: RecoveryConfig,
files: &mut Vec<File<F>>,
files: &mut [File<F>],
machine_factory: &FA,
) -> Result<M> {
if recovery_cfg.concurrency == 0 || files.is_empty() {
@ -390,7 +390,7 @@ impl<F: FileSystem> DualPipesBuilder<F> {
let recovery_mode = recovery_cfg.mode;
let recovery_read_block_size = recovery_cfg.read_block_size as usize;
let max_chunk_size = std::cmp::max((files.len() + concurrency - 1) / concurrency, 1);
let max_chunk_size = files.len().div_ceil(concurrency);
let chunks = files.par_chunks_mut(max_chunk_size);
let chunk_count = chunks.len();
debug_assert!(chunk_count <= concurrency);
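
The `div_ceil` rewrite here is behavior-preserving for nonempty input; for reference (stabilized for unsigned integers in Rust 1.73, well within the bumped 1.85 toolchain):

```rust
fn main() {
    // div_ceil rounds the quotient up, matching (a + b - 1) / b for a > 0.
    assert_eq!(10usize.div_ceil(4), 3);
    assert_eq!(12usize.div_ceil(4), 3);
    // The old max(..., 1) guard is unnecessary: the function returns early
    // when `files` is empty, so the dividend is always at least 1.
    assert_eq!(1usize.div_ceil(8), 1);
}
```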

View File

@ -167,6 +167,7 @@ impl GlobalStats {
pub(crate) const INTERNAL_KEY_PREFIX: &[u8] = b"__";
#[inline]
#[cfg(test)]
pub(crate) fn make_internal_key(k: &[u8]) -> Vec<u8> {
assert!(!k.is_empty());
let mut v = INTERNAL_KEY_PREFIX.to_vec();
@ -174,12 +175,23 @@ pub(crate) fn make_internal_key(k: &[u8]) -> Vec<u8> {
v
}
#[cfg(not(test))]
pub(crate) fn make_internal_key(k: &[u8]) -> Vec<u8> {
use log_batch::ATOMIC_GROUP_KEY;
assert!(k == ATOMIC_GROUP_KEY);
let mut v = INTERNAL_KEY_PREFIX.to_vec();
v.extend_from_slice(k);
v
}
/// We ensure internal keys are not visible to the user by:
/// (1) Writing internal keys will be rejected by `LogBatch::put`.
/// (2) Internal keys are filtered out during apply and replay of both queues.
/// This also makes sure future internal keys under the prefix won't become
/// visible after downgrading.
#[inline]
#[cfg(test)]
pub(crate) fn is_internal_key(s: &[u8], ext: Option<&[u8]>) -> bool {
if let Some(ext) = ext {
s.len() == INTERNAL_KEY_PREFIX.len() + ext.len()
@ -191,6 +203,20 @@ pub(crate) fn is_internal_key(s: &[u8], ext: Option<&[u8]>) -> bool {
}
}
#[inline]
#[cfg(not(test))]
pub(crate) fn is_internal_key(s: &[u8], ext: Option<&[u8]>) -> bool {
use log_batch::ATOMIC_GROUP_KEY;
if let Some(ext) = ext {
s.len() == INTERNAL_KEY_PREFIX.len() + ext.len()
&& s[..INTERNAL_KEY_PREFIX.len()] == *INTERNAL_KEY_PREFIX
&& s[INTERNAL_KEY_PREFIX.len()..] == *ext
} else {
is_internal_key(s, Some(ATOMIC_GROUP_KEY))
}
}
#[cfg(test)]
mod tests {
use crate::log_batch::MessageExt;

View File

@ -2,9 +2,9 @@
use std::fmt::Debug;
use std::io::BufRead;
use std::mem;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::{mem, u64};
use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt};
use log::error;
@ -53,7 +53,7 @@ pub enum CompressionType {
impl CompressionType {
pub fn from_u8(t: u8) -> Result<Self> {
if t <= CompressionType::Lz4 as u8 {
Ok(unsafe { mem::transmute(t) })
Ok(unsafe { mem::transmute::<u8, Self>(t) })
} else {
Err(Error::Corruption(format!(
"Unrecognized compression type: {t}"
@ -168,7 +168,7 @@ pub enum OpType {
impl OpType {
pub fn from_u8(t: u8) -> Result<Self> {
if t <= OpType::Del as u8 {
Ok(unsafe { mem::transmute(t) })
Ok(unsafe { mem::transmute::<u8, Self>(t) })
} else {
Err(Error::Corruption(format!("Unrecognized op type: {t}")))
}
@ -767,28 +767,32 @@ impl LogBatch {
&mut self,
compression_threshold: usize,
compression_level: Option<usize>,
) -> Result<usize> {
) -> Result<(usize, f64)> {
let _t = StopWatch::new(perf_context!(log_populating_duration));
debug_assert!(self.buf_state == BufState::Open);
if self.is_empty() {
self.buf_state = BufState::Encoded(self.buf.len(), 0);
return Ok(0);
return Ok((0, 0.0));
}
self.buf_state = BufState::Incomplete;
// entries
let (header_offset, compression_type) = if compression_threshold > 0
let (header_offset, compression_type, compression_ratio) = if compression_threshold > 0
&& self.buf.len() >= LOG_BATCH_HEADER_LEN + compression_threshold
{
let buf_len = self.buf.len();
lz4::append_compress_block(
let compression_ratio = lz4::append_compress_block(
&mut self.buf,
LOG_BATCH_HEADER_LEN,
compression_level.unwrap_or(lz4::DEFAULT_LZ4_COMPRESSION_LEVEL),
)?;
(buf_len - LOG_BATCH_HEADER_LEN, CompressionType::Lz4)
(
buf_len - LOG_BATCH_HEADER_LEN,
CompressionType::Lz4,
compression_ratio,
)
} else {
(0, CompressionType::None)
(0, CompressionType::None, 0.0)
};
// checksum
@ -830,7 +834,7 @@ impl LogBatch {
}
self.buf_state = BufState::Encoded(header_offset, footer_roffset - LOG_BATCH_HEADER_LEN);
Ok(self.buf.len() - header_offset)
Ok((self.buf.len() - header_offset, compression_ratio))
}
/// Make preparations for the write of `LogBatch`.
@ -994,7 +998,7 @@ fn verify_checksum_with_signature(buf: &[u8], signature: Option<u32>) -> Result<
lazy_static! {
static ref ATOMIC_GROUP_ID: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
}
const ATOMIC_GROUP_KEY: &[u8] = &[0x01];
pub(crate) const ATOMIC_GROUP_KEY: &[u8] = &[0x01];
// <status>
const ATOMIC_GROUP_VALUE_LEN: usize = 1;
@ -1328,7 +1332,7 @@ mod tests {
offset: 0,
};
let old_approximate_size = batch.approximate_size();
let len = batch.finish_populate(usize::from(compress), None).unwrap();
let (len, _) = batch.finish_populate(usize::from(compress), None).unwrap();
assert!(old_approximate_size >= len);
assert_eq!(batch.approximate_size(), len);
let mut batch_handle = mocked_file_block_handle;
@ -1493,7 +1497,7 @@ mod tests {
batch1.merge(&mut batch2).unwrap();
assert!(batch2.is_empty());
let len = batch1.finish_populate(0, None).unwrap();
let (len, _) = batch1.finish_populate(0, None).unwrap();
batch1.prepare_write(&file_context).unwrap();
let encoded = batch1.encoded_bytes();
assert_eq!(len, encoded.len());
@ -1549,7 +1553,8 @@ mod tests {
offset: 0,
};
let buf_len = batch.buf.len();
let len = batch.finish_populate(1, None).unwrap();
let (len, compression_ratio) = batch.finish_populate(1, None).unwrap();
assert!(compression_ratio == 0.0);
assert!(len == 0);
assert_eq!(batch.buf_state, BufState::Encoded(buf_len, 0));
let file_context = LogFileContext::new(mocked_file_block_handles.id, Version::V2);
@ -1570,7 +1575,7 @@ mod tests {
));
assert!(matches!(
batch
.put_message(0, crate::make_internal_key(&[1]), &Entry::new())
.put_message(0, crate::make_internal_key(ATOMIC_GROUP_KEY), &Entry::new())
.unwrap_err(),
Error::InvalidArgument(_)
));
@ -1671,7 +1676,8 @@ mod tests {
},
];
let old_approximate_size = batch.approximate_size();
let len = batch.finish_populate(1, None).unwrap();
let (len, compression_ratio) = batch.finish_populate(1, None).unwrap();
assert!(compression_ratio > 0.0);
assert!(old_approximate_size >= len);
assert_eq!(batch.approximate_size(), len);
let checksum = batch.item_batch.checksum;
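
The ratio that `finish_populate` now returns is the one computed in the util.rs hunk further down: compressed bytes over uncompressed content bytes, with 0.0 meaning compression was skipped. A standalone restatement:

```rust
// ratio < 1.0 means the batch shrank; 0.0 flags "no compression performed".
fn compression_ratio(compressed: usize, content_len: usize) -> f64 {
    if content_len == 0 {
        0.0
    } else {
        compressed as f64 / content_len as f64
    }
}

fn main() {
    assert_eq!(compression_ratio(0, 0), 0.0); // empty batch
    assert!(compression_ratio(400, 1024) < 1.0); // 1024 bytes -> 400 bytes
}
```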

View File

@ -228,7 +228,7 @@ impl<A: AllocatorTrait> MemTable<A> {
}
if let Some(g) = rhs.atomic_group.take() {
assert!(self.atomic_group.map_or(true, |(_, end)| end <= g.0));
assert!(self.atomic_group.is_none_or(|(_, end)| end <= g.0));
self.atomic_group = Some(g);
}
@ -545,7 +545,7 @@ impl<A: AllocatorTrait> MemTable<A> {
}
pub fn apply_rewrite_atomic_group(&mut self, start: FileSeq, end: FileSeq) {
assert!(self.atomic_group.map_or(true, |(_, b)| b <= start));
assert!(self.atomic_group.is_none_or(|(_, b)| b <= start));
self.atomic_group = Some((start, end));
}
@ -763,7 +763,7 @@ impl<A: AllocatorTrait> MemTable<A> {
debug_assert!(count > 0);
self.entry_indexes
.get(count - 1)
.map_or(false, |ei| ei.entries.unwrap().id.seq <= gate.seq)
.is_some_and(|ei| ei.entries.unwrap().id.seq <= gate.seq)
}
/// Returns the region ID.

View File

@ -122,7 +122,7 @@ pub trait TimeMetric {
}
}
impl<'a> TimeMetric for &'a Histogram {
impl TimeMetric for &Histogram {
fn observe(&self, duration: Duration) {
Histogram::observe(self, duration.as_secs_f64());
}
@ -201,6 +201,12 @@ lazy_static! {
exponential_buckets(256.0, 1.8, 22).unwrap()
)
.unwrap();
pub static ref ENGINE_WRITE_COMPRESSION_RATIO_HISTOGRAM: Histogram = register_histogram!(
"raft_engine_write_compression_ratio",
"Bucketed histogram of Raft Engine write compression ratio",
exponential_buckets(0.0005, 1.8, 16).unwrap()
)
.unwrap();
pub static ref LOG_ALLOCATE_DURATION_HISTOGRAM: Histogram = register_histogram!(
"raft_engine_allocate_log_duration_seconds",
"Bucketed histogram of Raft Engine allocate log duration",

View File

@ -35,6 +35,10 @@ fn max_batch_bytes() -> usize {
128 * 1024
}
fn max_forcely_sync_bytes() -> usize {
max_batch_bytes() * 4
}
pub struct PurgeManager<P>
where
P: PipeLog,
@ -354,6 +358,7 @@ where
let mut current_entry_indexes = Vec::new();
let mut current_entries = Vec::new();
let mut current_size = 0;
let mut unsynced_size = 0;
// Split the entries into smaller chunks, so that we don't OOM, and the
// compression overhead is not too high.
let mut entry_indexes = entry_indexes.into_iter().peekable();
@ -362,6 +367,7 @@ where
current_size += entry.len();
current_entries.push(entry);
current_entry_indexes.push(ei);
unsynced_size += current_size;
// If this is the last entry, we handle them outside the loop.
if entry_indexes.peek().is_some()
&& current_size + previous_size > max_batch_bytes()
@ -396,7 +402,15 @@ where
)?;
current_size = 0;
previous_size = 0;
let handle = self.rewrite_impl(&mut log_batch, rewrite, false)?.unwrap();
let sync = if unsynced_size >= max_forcely_sync_bytes() {
// Avoid stashing too many unsynced bytes, which can block the later
// `fdatasync` in the append progress for too long.
unsynced_size = 0;
true
} else {
false
};
let handle = self.rewrite_impl(&mut log_batch, rewrite, sync)?.unwrap();
if needs_atomicity && atomic_group_start.is_none() {
atomic_group_start = Some(handle.id.seq);
}
@ -439,7 +453,7 @@ where
)?;
let file_handle = self.pipe_log.append(LogQueue::Rewrite, log_batch)?;
if sync {
self.pipe_log.sync(LogQueue::Rewrite)?
self.pipe_log.sync(LogQueue::Rewrite)?;
}
log_batch.finish_write(file_handle);
self.memtables.apply_rewrite_writes(

View File

@ -120,9 +120,8 @@ unsafe impl<A: Allocator + Send + Sync> Allocator for SwappyAllocator<A> {
return swap_r;
}
}
self.0.mem_allocator.allocate(layout).map_err(|e| {
self.0.mem_allocator.allocate(layout).inspect_err(|_| {
self.0.mem_usage.fetch_sub(layout.size(), Ordering::Relaxed);
e
})
}
@ -191,9 +190,8 @@ unsafe impl<A: Allocator + Send + Sync> Allocator for SwappyAllocator<A> {
self.0
.mem_allocator
.grow(ptr, old_layout, new_layout)
.map_err(|e| {
.inspect_err(|_| {
self.0.mem_usage.fetch_sub(diff, Ordering::Relaxed);
e
})
}
}
@ -253,11 +251,10 @@ unsafe impl<A: Allocator + Send + Sync> Allocator for SwappyAllocator<A> {
self.0
.mem_allocator
.shrink(ptr, old_layout, new_layout)
.map(|p| {
.inspect(|_| {
self.0
.mem_usage
.fetch_sub(old_layout.size() - new_layout.size(), Ordering::Relaxed);
p
})
}
}
@ -288,6 +285,7 @@ impl Page {
.read(true)
.write(true)
.create(true)
.truncate(true)
.open(path)
.map_err(|e| error!("Failed to open swap file: {e}"))
.ok()?;
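
The `map_err` → `inspect_err` rewrites in this file rely on `Result::inspect_err`, stable since Rust 1.76 (hence the toolchain bump); the two forms are equivalent:

```rust
fn main() {
    // inspect_err runs a side effect and passes the error through unchanged,
    // replacing the older map_err(|e| { side_effect(); e }) pattern.
    let r: Result<(), &str> = Err("boom").inspect_err(|e| eprintln!("failed: {e}"));
    assert!(r.is_err());
}
```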

View File

@ -63,7 +63,7 @@ where
}
pub struct PanicGuard {
prev_hook: *mut (dyn Fn(&panic::PanicInfo<'_>) + Sync + Send + 'static),
prev_hook: *mut (dyn Fn(&panic::PanicHookInfo<'_>) + Sync + Send + 'static),
}
struct PointerHolder<T: ?Sized>(*mut T);

View File

@ -148,7 +148,7 @@ impl<'de> Deserialize<'de> for ReadableSize {
{
struct SizeVisitor;
impl<'de> Visitor<'de> for SizeVisitor {
impl Visitor<'_> for SizeVisitor {
type Value = ReadableSize;
fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
@ -221,14 +221,15 @@ pub fn unhash_u64(mut i: u64) -> u64 {
pub mod lz4 {
use crate::{Error, Result};
use std::{i32, ptr};
use std::ptr;
pub const DEFAULT_LZ4_COMPRESSION_LEVEL: usize = 1;
/// Compress content in `buf[skip..]`, and append output to `buf`.
pub fn append_compress_block(buf: &mut Vec<u8>, skip: usize, level: usize) -> Result<()> {
pub fn append_compress_block(buf: &mut Vec<u8>, skip: usize, level: usize) -> Result<f64> {
let buf_len = buf.len();
let content_len = buf_len - skip;
let mut compression_ratio = 0.0;
if content_len > 0 {
if content_len > i32::MAX as usize {
return Err(Error::InvalidArgument(format!(
@ -256,10 +257,11 @@ pub mod lz4 {
if compressed == 0 {
return Err(Error::Other(box_err!("Compression failed")));
}
compression_ratio = compressed as f64 / content_len as f64;
buf.set_len(buf_len + 4 + compressed as usize);
}
}
Ok(())
Ok(compression_ratio)
}
pub fn decompress_block(src: &[u8]) -> Result<Vec<u8>> {
@ -301,8 +303,12 @@ pub mod lz4 {
let vecs: Vec<Vec<u8>> = vec![b"".to_vec(), b"123".to_vec(), b"12345678910".to_vec()];
for mut vec in vecs.into_iter() {
let uncompressed_len = vec.len();
super::append_compress_block(&mut vec, 0, super::DEFAULT_LZ4_COMPRESSION_LEVEL)
.unwrap();
let compression_ratio =
super::append_compress_block(&mut vec, 0, super::DEFAULT_LZ4_COMPRESSION_LEVEL)
.unwrap();
if uncompressed_len == 0 {
assert_eq!(compression_ratio, 0.0);
}
let res = super::decompress_block(&vec[uncompressed_len..]).unwrap();
assert_eq!(res, vec[..uncompressed_len].to_owned());
}
@ -324,7 +330,7 @@ pub trait Factory<Target>: Send + Sync {
/// ```
#[inline]
pub fn round_up(offset: usize, alignment: usize) -> usize {
(offset + alignment - 1) / alignment * alignment
offset.div_ceil(alignment) * alignment
}
/// Returns an aligned `offset`.

View File

@ -17,7 +17,6 @@ use crate::PerfContext;
type Ptr<T> = Option<NonNull<T>>;
///
pub struct Writer<P, O> {
next: Cell<Ptr<Writer<P, O>>>,
payload: *mut P,
@ -95,7 +94,7 @@ impl<'a, 'b, P, O> WriteGroup<'a, 'b, P, O> {
}
}
impl<'a, 'b, P, O> Drop for WriteGroup<'a, 'b, P, O> {
impl<P, O> Drop for WriteGroup<'_, '_, P, O> {
fn drop(&mut self) {
self.ref_barrier.leader_exit();
}
@ -108,7 +107,7 @@ pub struct WriterIter<'a, 'b, 'c, P: 'c, O: 'c> {
marker: PhantomData<&'a WriteGroup<'b, 'c, P, O>>,
}
impl<'a, 'b, 'c, P, O> Iterator for WriterIter<'a, 'b, 'c, P, O> {
impl<'a, P, O> Iterator for WriterIter<'a, '_, '_, P, O> {
type Item = &'a mut Writer<P, O>;
fn next(&mut self) -> Option<Self::Item> {

View File

@ -1,6 +1,6 @@
[package]
name = "stress"
version = "0.4.0"
version = "0.4.2"
authors = ["The TiKV Authors"]
edition = "2018"

View File

@ -121,8 +121,8 @@ fn test_pipe_log_listeners() {
append(
&engine,
region_id,
(i as u64 + 1) / 2,
(i as u64 + 1) / 2 + 1,
(i as u64).div_ceil(2),
(i as u64).div_ceil(2) + 1,
Some(&data),
);
assert_eq!(hook.0[&LogQueue::Append].appends(), i);
@ -141,8 +141,8 @@ fn test_pipe_log_listeners() {
append(
&engine,
region_id,
(i as u64 + 1) / 2,
(i as u64 + 1) / 2 + 1,
(i as u64).div_ceil(2),
(i as u64).div_ceil(2) + 1,
Some(&data),
);
assert_eq!(hook.0[&LogQueue::Append].appends(), i);
@ -178,8 +178,8 @@ fn test_pipe_log_listeners() {
append(
&engine,
region_id,
(i as u64 + 1) / 2,
(i as u64 + 1) / 2 + 1,
(i as u64).div_ceil(2),
(i as u64).div_ceil(2) + 1,
Some(&data),
);
assert_eq!(hook.0[&LogQueue::Append].appends(), i + 3);
@ -631,6 +631,52 @@ fn test_concurrent_write_perf_context() {
}
}
#[test]
fn test_fetch_with_updated_entry_index() {
let dir = tempfile::Builder::new()
.prefix("test_fetch_with_updated_entry_index")
.tempdir()
.unwrap();
let entry_data = vec![b'x'; 1024];
let cfg = Config {
dir: dir.path().to_str().unwrap().to_owned(),
..Default::default()
};
let engine = Engine::open(cfg).unwrap();
for i in 0..10 {
append(
&engine,
i + 1,
1 + i * 10,
1 + i * 10 + 10,
Some(&entry_data),
);
}
assert_eq!(
10,
engine
.fetch_entries_to::<MessageExtTyped>(1, 1, 11, None, &mut vec![])
.unwrap()
);
// Mock: the file is broken or abnormal to read
{
let _f = FailGuard::new("log_file::read::err", "return");
engine
.fetch_entries_to::<MessageExtTyped>(10, 91, 101, None, &mut vec![])
.unwrap_err();
}
// Mock: one entry have been updated by background rewrite thread.
{
let _f = FailGuard::new("log_file::read::err", "8*off->return->off");
assert_eq!(
10,
engine
.fetch_entries_to::<MessageExtTyped>(10, 91, 101, None, &mut vec![])
.unwrap()
);
}
}
// FIXME: this test no longer works because recovery cannot reliably detect
// overwrite anomaly.
// See https://github.com/tikv/raft-engine/issues/250

View File

@ -124,8 +124,7 @@ fn test_file_write_error() {
assert_eq!(engine.last_index(2).unwrap(), 1);
}
#[test]
fn test_file_rotate_error() {
fn test_file_rotate_error(restart_after_failure: bool) {
let dir = tempfile::Builder::new()
.prefix("test_file_rotate_error")
.tempdir()
@ -138,7 +137,7 @@ fn test_file_rotate_error() {
let fs = Arc::new(ObfuscatedFileSystem::default());
let entry = vec![b'x'; 1024];
let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
let mut engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
engine
.write(&mut generate_batch(1, 1, 2, Some(&entry)), false)
.unwrap();
@ -160,27 +159,46 @@ fn test_file_rotate_error() {
let _ = engine.write(&mut generate_batch(1, 4, 5, Some(&entry)), false);
})
.is_err());
assert_eq!(engine.file_span(LogQueue::Append).1, 1);
}
if restart_after_failure {
drop(engine);
engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
}
assert_eq!(engine.file_span(LogQueue::Append).1, 1);
{
// Fail to create new log file.
let _f = FailGuard::new("default_fs::create::err", "return");
assert!(catch_unwind_silent(|| {
let _ = engine.write(&mut generate_batch(1, 4, 5, Some(&entry)), false);
})
.is_err());
assert_eq!(engine.file_span(LogQueue::Append).1, 1);
assert!(engine
.write(&mut generate_batch(1, 4, 5, Some(&entry)), false)
.is_err());
}
if restart_after_failure {
drop(engine);
engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
}
let num_files_before = std::fs::read_dir(&dir).unwrap().count();
{
// Fail to write header of new log file.
let _f = FailGuard::new("log_file::write::err", "1*off->return");
assert!(catch_unwind_silent(|| {
let _ = engine.write(&mut generate_batch(1, 4, 5, Some(&entry)), false);
})
.is_err());
assert!(engine
.write(&mut generate_batch(1, 4, 5, Some(&entry)), false)
.is_err());
}
if restart_after_failure {
drop(engine);
engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
// The new log file is added during the recovery phase of the restart.
assert_eq!(engine.file_span(LogQueue::Append).1, 2);
} else {
assert_eq!(engine.file_span(LogQueue::Append).1, 1);
}
{
// Although the header is not written, the file is still created.
assert_eq!(
std::fs::read_dir(&dir).unwrap().count() - num_files_before,
1
);
if !restart_after_failure {
// If the engine restarted, a write that does not require sync would succeed.
// Fail to sync the new log file. The old log file is already synced at this point.
let _f = FailGuard::new("log_fd::sync::err", "return");
assert!(catch_unwind_silent(|| {
@ -190,18 +208,39 @@ fn test_file_rotate_error() {
assert_eq!(engine.file_span(LogQueue::Append).1, 1);
}
// Only one log file should be created after all the incidents.
assert_eq!(
std::fs::read_dir(&dir).unwrap().count() - num_files_before,
1
);
// We can continue writing after the incidents.
engine
.write(&mut generate_batch(2, 1, 2, Some(&entry)), true)
.unwrap();
drop(engine);
let engine = Engine::open_with_file_system(cfg, fs).unwrap();
if restart_after_failure {
drop(engine);
engine = Engine::open_with_file_system(cfg, fs).unwrap();
}
assert_eq!(
std::fs::read_dir(&dir).unwrap().count() - num_files_before,
1
);
assert_eq!(engine.first_index(1).unwrap(), 1);
assert_eq!(engine.last_index(1).unwrap(), 4);
assert_eq!(engine.first_index(2).unwrap(), 1);
assert_eq!(engine.last_index(2).unwrap(), 1);
}
#[test]
fn test_file_rotate_error_without_restart() {
test_file_rotate_error(false);
}
#[test]
fn test_file_rotate_error_with_restart() {
test_file_rotate_error(true);
}
#[test]
fn test_concurrent_write_error() {
let dir = tempfile::Builder::new()
@ -262,10 +301,8 @@ fn test_concurrent_write_error() {
let _f2 = FailGuard::new("log_file::truncate::err", "return");
let entry_clone = entry.clone();
ctx.write_ext(move |e| {
catch_unwind_silent(|| {
e.write(&mut generate_batch(1, 11, 21, Some(&entry_clone)), false)
})
.unwrap_err();
e.write(&mut generate_batch(1, 11, 21, Some(&entry_clone)), false)
.unwrap_err();
});
// We don't test followers, their panics are hard to catch.
ctx.join();
@ -527,7 +564,7 @@ fn test_no_space_write_error() {
cfg.dir = dir.path().to_str().unwrap().to_owned();
cfg.spill_dir = Some(spill_dir.path().to_str().unwrap().to_owned());
{
// Case 1: `Write` is abnormal because no space is left; Engine should panic at
// Case 1: `Write` is abnormal because no space is left; Engine should fail at
// `rotate`.
let cfg_err = Config {
target_file_size: ReadableSize(1),
@ -535,12 +572,9 @@ fn test_no_space_write_error() {
};
let engine = Engine::open(cfg_err).unwrap();
let _f = FailGuard::new("log_fd::write::no_space_err", "return");
assert!(catch_unwind_silent(|| {
engine
.write(&mut generate_batch(2, 11, 21, Some(&entry)), true)
.unwrap_err();
})
.is_err());
assert!(engine
.write(&mut generate_batch(2, 11, 21, Some(&entry)), true)
.is_err());
assert_eq!(
0,
engine
@ -554,12 +588,9 @@ fn test_no_space_write_error() {
let _f1 = FailGuard::new("log_fd::write::no_space_err", "2*return->off");
let _f2 = FailGuard::new("file_pipe_log::force_choose_dir", "return");
// The first write should fail because all dirs have run out of space for writing.
assert!(catch_unwind_silent(|| {
engine
.write(&mut generate_batch(2, 11, 21, Some(&entry)), true)
.unwrap_err();
})
.is_err());
assert!(engine
.write(&mut generate_batch(2, 11, 21, Some(&entry)), true)
.is_err());
assert_eq!(
0,
engine