Compare commits

...

16 Commits

Author SHA1 Message Date
lucasliang d53b2b9505
Add missing unit tests for the optimization of `fetch_entries_to`. (#385)
Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>
2025-07-07 03:23:09 +00:00
lucasliang 03f77d90c0
Optimize `fetch_entries_to` to avoid stalling `write` for too long. (#382)
Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>
2025-07-04 08:26:18 +00:00
Yang Zhang 392f5e66f8
Unlock the LOCK files on drop (#379)
Signed-off-by: Yang Zhang <yang.zhang@pingcap.com>
2025-05-20 07:17:55 +00:00
Ti Chi Robot 7e12385c31
OWNERS: Auto Sync OWNERS files from community membership (#378)
Signed-off-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
2025-02-14 06:46:09 +00:00
Ti Chi Robot 6bb58c379b
OWNERS: Auto Sync OWNERS files from community membership (#369)
Signed-off-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
2024-12-04 06:58:50 +00:00
lucasliang de1ec93752
Fix panics when reading entries by a stale index. (#370)
Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>
2024-11-06 02:50:58 +00:00
lucasliang e1c5dd80a6
CI: update CI workflows. (#372)
Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>
2024-11-05 13:21:43 +00:00
Ti Chi Robot 209e00d8c0
OWNERS: Auto Sync OWNERS files from community membership (#368)
Signed-off-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
2024-10-14 17:14:05 +00:00
Ti Chi Robot 0e40c53566
OWNERS: Auto Sync OWNERS files from community membership (#367)
Signed-off-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
2024-09-09 12:12:15 +00:00
Ti Chi Robot 897e0ebf67
OWNERS: Auto Sync OWNERS files from community membership (#366)
Signed-off-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
2024-09-09 03:21:51 +00:00
Ti Chi Robot c7828cb3e7
OWNERS: Auto Sync OWNERS files from community membership (#364)
Signed-off-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
2024-08-19 13:01:42 +00:00
Ti Chi Robot 3ad316a3c4
OWNERS: Auto Sync OWNERS files from community membership (#363)
Signed-off-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
2024-08-01 10:53:51 +00:00
Ti Chi Robot 8ef3b42057
OWNERS: Auto Sync OWNERS files from community membership (#362)
Signed-off-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
2024-08-01 10:19:21 +00:00
wuhuizuo c1f81d4d06
ci: add prow OWNERS file (#361)
* ci: add prow OWNERS file

Signed-off-by: wuhuizuo <wuhuizuo@126.com>

* ci: update github action workflows [skip ci]

save runs.

Signed-off-by: wuhuizuo <wuhuizuo@126.com>

* Update OWNERS

Signed-off-by: wuhuizuo <wuhuizuo@126.com>

---------

Signed-off-by: wuhuizuo <wuhuizuo@126.com>
2024-07-30 13:39:59 +08:00
dependabot[bot] cf5d1b9144
Update strum requirement from 0.25.0 to 0.26.2 (#356)
---
updated-dependencies:
- dependency-name: strum
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: lucasliang <nkcs_lykx@hotmail.com>
2024-06-05 15:15:15 +08:00
lucasliang cd1533d79c
engine: add a metric for the compression ratio on write. (#358)
* engine: add extra metrics on the compression ratio of write.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

* Calibrate the version of grcov.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

* Fix clippy errs.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

* Fix format errs.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

* Calibrate rustc version.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

* Fix clippy errs.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

* Fix clippy errors.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

* Add change logs.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

* Format ci actions.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

* Fix ci errors.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

* Polish rust.yml.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

---------

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>
2024-04-23 15:28:49 +08:00
20 changed files with 333 additions and 90 deletions

View File

@@ -1,6 +1,15 @@
 name: Rust

-on: [push, pull_request]
+on:
+  push:
+    branches-ignore:
+      - dependabot/**
+  pull_request:
+    paths-ignore:
+      - "OWNERS"
+      - "OWNERS_ALIASES"
+      - "*.md"

 env:
   CARGO_TERM_COLOR: always
@@ -10,7 +19,9 @@ jobs:
     runs-on: ${{ matrix.os }}
     strategy:
       matrix:
-        os: [ ubuntu-latest, macos-latest ]
+        # tests with sanitizer on `ubuntu-latest` is supported by
+        # self-defined `nihtly test` job, so we don't need to test it here.
+        os: [ macos-latest ]
     steps:
       - uses: actions/checkout@v2
         with:
@@ -19,7 +30,7 @@
         uses: actions-rs/toolchain@v1
         with:
           profile: minimal
-          toolchain: nightly-2023-12-31
+          toolchain: nightly-2025-04-03
           override: true
           components: rustfmt, clippy, rust-src
       - uses: Swatinem/rust-cache@v1
@@ -41,14 +52,6 @@
         env:
           RUST_BACKTRACE: 1
           EXTRA_CARGO_ARGS: '--verbose'
-      - name: Run asan tests
-        if: ${{ matrix.os == 'ubuntu-latest' }}
-        run: make test
-        env:
-          RUST_BACKTRACE: 1
-          RUSTFLAGS: '-Zsanitizer=address'
-          RUSTDOCFLAGS: '-Zsanitizer=address'
-          EXTRA_CARGO_ARGS: '--verbose -Zbuild-std --target x86_64-unknown-linux-gnu'
   stable:
     runs-on: ${{ matrix.os }}
     strategy:
@@ -62,7 +65,7 @@
         uses: actions-rs/toolchain@v1
         with:
           profile: minimal
-          toolchain: 1.75.0
+          toolchain: 1.85.0
           override: true
           components: rustfmt, clippy, rust-src
       - uses: Swatinem/rust-cache@v1
@@ -89,7 +92,7 @@
         uses: actions-rs/toolchain@v1
         with:
           profile: minimal
-          toolchain: nightly-2023-12-31
+          toolchain: nightly-2025-04-03
           override: true
           components: llvm-tools-preview
       - uses: Swatinem/rust-cache@v1
@@ -98,8 +101,7 @@
       - name: Install grcov
         run: if [[ ! -e ~/.cargo/bin/grcov ]]; then cargo install --locked grcov; fi
       - name: Run tests
-        run: |
-          make test_matrix
+        run: make test_matrix
         env:
           RUSTFLAGS: '-Cinstrument-coverage'
           LLVM_PROFILE_FILE: '%p-%m.profraw'

View File

@@ -2,6 +2,10 @@

 ## [Unreleased]

+### New Features
+
+* Add a new Prometheus metric `raft_engine_write_compression_ratio` to track compression ratio of write #358
+
 ## [0.4.2] - 2024-04-16

 ### Behavior Changes

View File

@@ -3,7 +3,7 @@ name = "raft-engine"
 version = "0.4.2"
 authors = ["The TiKV Project Developers"]
 edition = "2018"
-rust-version = "1.75.0"
+rust-version = "1.85.0"
 description = "A persistent storage engine for Multi-Raft logs"
 readme = "README.md"
 repository = "https://github.com/tikv/raft-engine"
@@ -42,8 +42,11 @@ hex = "0.4"
 if_chain = "1.0"
 lazy_static = "1.3"
 libc = "0.2"
-log = { version = "0.4", features = ["max_level_trace", "release_max_level_debug"] }
-lz4-sys = "1.9"
+log = { version = "0.4", features = [
+    "max_level_trace",
+    "release_max_level_debug",
+] }
+lz4-sys = { version = "=1.9.5" }
 memmap2 = { version = "0.9", optional = true }
 nix = "0.26"
 num-derive = "0.4"
@@ -55,17 +58,21 @@ protobuf = "2"
 rayon = "1.5"
 rhai = { version = "1.7", features = ["sync"], optional = true }
 scopeguard = "1.1"
-serde = { version = "1.0", features = ["derive"] }
+serde = { version = "=1.0.194", features = ["derive"] }
 serde_repr = "0.1"
-strum = { version = "0.25.0", features = ["derive"] }
+strum = { version = "0.26.2", features = ["derive"] }
 thiserror = "1.0"

 [dev-dependencies]
 criterion = "0.4"
 ctor = "0.2"
 env_logger = "0.10"
-kvproto = { git = "https://github.com/pingcap/kvproto.git", default-features = false, features = ["protobuf-codec"] }
-raft = { git = "https://github.com/tikv/raft-rs", branch = "master", default-features = false, features = ["protobuf-codec"] }
+kvproto = { git = "https://github.com/pingcap/kvproto.git", default-features = false, features = [
+    "protobuf-codec",
+] }
+raft = { git = "https://github.com/tikv/raft-rs", branch = "master", default-features = false, features = [
+    "protobuf-codec",
+] }
 rand = "0.8"
 rand_distr = "0.4"
 tempfile = "3.6"
@@ -74,19 +81,10 @@ toml = "0.8"
 [features]
 default = ["internals", "scripting"]
 internals = []
-nightly = [
-  "prometheus/nightly",
-]
-failpoints = [
-  "fail/failpoints",
-]
-scripting = [
-  "rhai",
-]
-swap = [
-  "nightly",
-  "memmap2",
-]
+nightly = ["prometheus/nightly"]
+failpoints = ["fail/failpoints"]
+scripting = ["rhai"]
+swap = ["nightly", "memmap2"]
 std_fs = []

 nightly_group = ["nightly", "swap"]
@@ -95,6 +93,8 @@ nightly_group = ["nightly", "swap"]
 raft-proto = { git = "https://github.com/tikv/raft-rs", branch = "master" }
 protobuf = { git = "https://github.com/pingcap/rust-protobuf", branch = "v2.8" }
 protobuf-codegen = { git = "https://github.com/pingcap/rust-protobuf", branch = "v2.8" }
+# Lock the version of cc-rs to avoid build failure on MacOS, ref https://github.com/rust-lang/cc-rs/issues/984.
+cc = { git = "https://github.com/rust-lang/cc-rs", tag = "1.0.98" }

 [workspace]
 members = ["stress", "ctl"]

OWNERS (new file, 80 lines)
View File

@@ -0,0 +1,80 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- 5kbpers
- AndreMouche
- andylokandy
- breezewish
- brson
- bufferflies
- BusyJay
- cfzjywxk
- Connor1996
- coocood
- crazycs520
- disksing
- ekexium
- gengliqi
- glorv
- hicqu
- hunterlxt
- imtbkcat
- innerr
- iosmanthus
- jackysp
- kennytm
- Little-Wallace
- liuzix
- lonng
- LykxSassinator
- lysu
- marsishandsome
- MyonKeminta
- niedhui
- NingLin-P
- nrc
- overvenus
- pingyu
- skyzh
- SpadeA-Tang
- sticnarf
- sunxiaoguang
- tabokie
- TennyZhuang
- tonyxuqqi
- v01dstar
- yiwu-arbug
- you06
- youjiali1995
- YuJuncen
- zhangjinpeng87
- zhongzc
- zhouqiang-cl
- zyguan
reviewers:
- 3AceShowHand
- 3pointer
- CalvinNeo
- ethercflow
- fredchenbj
- Fullstop000
- gozssky
- haojinming
- hbisheng
- hhwyt
- HuSharp
- jayzhan211
- Jibbow
- JmPotato
- Leavrth
- lhy1024
- longfangsong
- lzmhhh123
- Mossaka
- MrCroxx
- nolouch
- rleungx
- Rustin170506
- tier-cap
- wjhuang2016
- wshwsh12
- Xuanwo

View File

@@ -11,4 +11,7 @@ license = "Apache-2.0"
 [dependencies]
 clap = { version = "3.1", features = ["derive", "cargo"] }
 env_logger = "0.10"
-raft-engine = { path = "..", version = "0.4.2", features = ["scripting", "internals"] }
+raft-engine = { path = "..", version = "0.4.1", features = [
+    "scripting",
+    "internals",
+] }

View File

@@ -360,7 +360,6 @@ mod tests {
     use super::*;
     use protobuf::CodedOutputStream;
     use std::io::ErrorKind;
-    use std::{f32, f64, i16, i32, i64, u16, u32, u64};

     const U16_TESTS: &[u16] = &[
         i16::MIN as u16,

View File

@@ -142,7 +142,7 @@ where
             return Ok(0);
         }
         let start = Instant::now();
-        let len = log_batch.finish_populate(
+        let (len, compression_ratio) = log_batch.finish_populate(
             self.cfg.batch_compression_threshold.0 as usize,
             self.cfg.compression_level,
         )?;
@@ -225,6 +225,7 @@
         now = end;
         ENGINE_WRITE_DURATION_HISTOGRAM.observe(now.saturating_duration_since(start).as_secs_f64());
         ENGINE_WRITE_SIZE_HISTOGRAM.observe(len as f64);
+        ENGINE_WRITE_COMPRESSION_RATIO_HISTOGRAM.observe(compression_ratio);
         Ok(len)
     }
@@ -337,7 +338,27 @@
             .read()
             .fetch_entries_to(begin, end, max_size, &mut ents_idx)?;
         for i in ents_idx.iter() {
-            vec.push(read_entry_from_file::<M, _>(self.pipe_log.as_ref(), i)?);
+            vec.push({
+                match read_entry_from_file::<M, _>(self.pipe_log.as_ref(), i) {
+                    Ok(entry) => entry,
+                    Err(e) => {
+                        // The index is not found in the file, it means the entry is
+                        // already stale by `compact` or `rewrite`. Retry to read the
+                        // entry from the memtable.
+                        let immutable = memtable.read();
+                        // Ensure that the corresponding memtable is locked with a read
+                        // lock before completing the fetching of entries from the raft
+                        // logs. This prevents the scenario where the index could become
+                        // stale while being concurrently updated by the `rewrite`
+                        // operation.
+                        if let Some(idx) = immutable.get_entry(i.index) {
+                            read_entry_from_file::<M, _>(self.pipe_log.as_ref(), &idx)?
+                        } else {
+                            return Err(e);
+                        }
+                    }
+                }
+            });
         }
         ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64);
         return Ok(ents_idx.len());
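The retry above depends on taking the memtable's read lock before re-resolving the index, so a concurrent `rewrite` cannot invalidate the freshly fetched position. A minimal standalone sketch of this read-then-retry-under-lock idiom; the `Store` and `Index` types and the `read_at` helper are hypothetical stand-ins, not the engine's real API:

use std::sync::RwLock;

struct Index {
    pos: u64,
}

struct Store {
    index: RwLock<Vec<Index>>,
}

fn read_at(pos: u64) -> std::io::Result<Vec<u8>> {
    // Pretend positions >= 100 point into a purged file.
    if pos >= 100 {
        Err(std::io::Error::other("stale position"))
    } else {
        Ok(vec![0; 8])
    }
}

fn fetch(store: &Store, i: usize, cached: &Index) -> std::io::Result<Vec<u8>> {
    match read_at(cached.pos) {
        Ok(data) => Ok(data),
        Err(e) => {
            // Take the read lock *before* re-resolving the index, so a
            // concurrent rewrite cannot move the entry again mid-read.
            let index = store.index.read().unwrap();
            match index.get(i) {
                Some(fresh) => read_at(fresh.pos),
                None => Err(e),
            }
        }
    }
}

fn main() {
    let store = Store {
        index: RwLock::new(vec![Index { pos: 42 }]),
    };
    // A cached index that went stale after a rewrite moved the entry.
    let stale = Index { pos: 100 };
    assert!(fetch(&store, 0, &stale).is_ok());
}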
@@ -634,9 +655,11 @@ pub(crate) mod tests {
     use crate::util::ReadableSize;
     use kvproto::raft_serverpb::RaftLocalState;
     use raft::eraftpb::Entry;
+    use rand::{thread_rng, Rng};
     use std::collections::{BTreeSet, HashSet};
     use std::fs::OpenOptions;
     use std::path::PathBuf;
+    use std::sync::atomic::{AtomicBool, Ordering};

     pub(crate) type RaftLogEngine<F = DefaultFileSystem> = Engine<F>;

     impl<F: FileSystem> RaftLogEngine<F> {
@@ -1928,8 +1951,6 @@ pub(crate) mod tests {
     #[cfg(feature = "nightly")]
     #[bench]
     fn bench_engine_fetch_entries(b: &mut test::Bencher) {
-        use rand::{thread_rng, Rng};
-
         let dir = tempfile::Builder::new()
             .prefix("bench_engine_fetch_entries")
             .tempdir()
@@ -2586,6 +2607,53 @@ pub(crate) mod tests {
         assert!(data.is_empty(), "data loss {:?}", data);
     }

+    #[test]
+    fn test_fetch_with_concurrently_rewrite() {
+        let dir = tempfile::Builder::new()
+            .prefix("test_fetch_with_concurrently_rewrite")
+            .tempdir()
+            .unwrap();
+        let cfg = Config {
+            dir: dir.path().to_str().unwrap().to_owned(),
+            target_file_size: ReadableSize(2048),
+            ..Default::default()
+        };
+        let fs = Arc::new(DeleteMonitoredFileSystem::new());
+        let engine = Arc::new(RaftLogEngine::open_with_file_system(cfg, fs.clone()).unwrap());
+        let entry_data = vec![b'x'; 128];
+        // Set up a concurrent write with purge, and fetch.
+        let mut vec: Vec<Entry> = Vec::new();
+        let fetch_engine = engine.clone();
+        let flag = Arc::new(AtomicBool::new(false));
+        let start_flag = flag.clone();
+        let th = std::thread::spawn(move || {
+            while !start_flag.load(Ordering::Acquire) {
+                std::thread::sleep(Duration::from_millis(10));
+            }
+            for _ in 0..10 {
+                let region_id = thread_rng().gen_range(1..=10);
+                // Should not return file seqno out of range error.
+                let _ = fetch_engine
+                    .fetch_entries_to::<Entry>(region_id, 1, 101, None, &mut vec)
+                    .map_err(|e| {
+                        assert!(!format!("{e}").contains("file seqno out of"));
+                    });
+                vec.clear();
+            }
+        });
+        for i in 0..10 {
+            for rid in 1..=10 {
+                engine.append(rid, 1 + i * 10, 1 + i * 10 + 10, Some(&entry_data));
+            }
+            flag.store(true, Ordering::Release);
+            for rid in 1..=10 {
+                engine.clean(rid);
+            }
+            engine.purge_expired_files().unwrap();
+        }
+        th.join().unwrap();
+    }
+
     #[test]
     fn test_internal_key_filter() {
         let dir = tempfile::Builder::new()

src/env/default.rs (vendored; 2 changed lines)
View File

@@ -62,7 +62,7 @@ impl Read for LogFile {
 impl Seek for LogFile {
     fn seek(&mut self, pos: SeekFrom) -> IoResult<u64> {
         fail_point!("log_file::seek::err", |_| {
-            Err(std::io::Error::new(std::io::ErrorKind::Other, "fp"))
+            Err(std::io::Error::other("fp"))
         });
         match pos {
             SeekFrom::Start(offset) => self.offset = offset as usize,
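`std::io::Error::other`, stable since Rust 1.74, is shorthand for `Error::new(ErrorKind::Other, ..)`. A quick check of the equivalence:

use std::io::{Error, ErrorKind};

fn main() {
    let old = Error::new(ErrorKind::Other, "fp");
    let new = Error::other("fp");
    // Same kind, same message; only the construction is shorter.
    assert_eq!(old.kind(), new.kind());
    assert_eq!(old.to_string(), new.to_string());
}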

View File

@@ -107,13 +107,12 @@ impl<F: FileSystem> LogFileWriter<F> {
             }
             self.capacity += alloc;
         }
-        self.writer.write_all(buf).map_err(|e| {
+        self.writer.write_all(buf).inspect_err(|_| {
             self.writer
                 .seek(SeekFrom::Start(self.written as u64))
                 .unwrap_or_else(|e| {
                     panic!("failed to reseek after write failure: {}", e);
                 });
-            e
         })?;
         self.written = new_written;
         Ok(())
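`Result::inspect_err`, stable since Rust 1.76, runs a side effect on the error without the closure having to hand it back, which is why the trailing `e` disappears above. A minimal sketch:

fn main() {
    let r: Result<(), String> = Err("write failed".to_owned());

    // Old style: `map_err` must return the error it was given.
    let r = r.map_err(|e| {
        eprintln!("rolling back: {e}");
        e
    });

    // New style: `inspect_err` only observes; the error flows through unchanged.
    let r = r.inspect_err(|e| eprintln!("rolling back again: {e}"));
    assert!(r.is_err());
}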

View File

@@ -218,7 +218,7 @@ pub mod debug {
         let log_file_format = LogFileContext::new(file_id, Version::default());
         for batch in bs.iter_mut() {
             let offset = writer.offset() as u64;
-            let len = batch
+            let (len, _) = batch
                 .finish_populate(1 /* compression_threshold */, None)
                 .unwrap();
             batch.prepare_write(&log_file_format).unwrap();

View File

@ -7,6 +7,7 @@ use std::sync::Arc;
use crossbeam::utils::CachePadded; use crossbeam::utils::CachePadded;
use fail::fail_point; use fail::fail_point;
use fs2::FileExt;
use log::error; use log::error;
use parking_lot::{Mutex, MutexGuard, RwLock}; use parking_lot::{Mutex, MutexGuard, RwLock};
@ -464,7 +465,17 @@ impl<F: FileSystem> SinglePipe<F> {
pub struct DualPipes<F: FileSystem> { pub struct DualPipes<F: FileSystem> {
pipes: [SinglePipe<F>; 2], pipes: [SinglePipe<F>; 2],
_dir_locks: Vec<StdFile>, dir_locks: Vec<StdFile>,
}
impl<F: FileSystem> Drop for DualPipes<F> {
fn drop(&mut self) {
for lock in &self.dir_locks {
if let Err(e) = FileExt::unlock(lock) {
error!("error while unlocking directory: {e}");
}
}
}
} }
impl<F: FileSystem> DualPipes<F> { impl<F: FileSystem> DualPipes<F> {
@ -481,7 +492,7 @@ impl<F: FileSystem> DualPipes<F> {
Ok(Self { Ok(Self {
pipes: [appender, rewriter], pipes: [appender, rewriter],
_dir_locks: dir_locks, dir_locks,
}) })
} }
@ -727,4 +738,21 @@ mod tests {
assert_eq!(pipe_log.read_bytes(handle).unwrap(), content(i + 1)); assert_eq!(pipe_log.read_bytes(handle).unwrap(), content(i + 1));
} }
} }
#[test]
fn test_release_on_drop() {
let dir = Builder::new()
.prefix("test_release_on_drop")
.tempdir()
.unwrap();
let path = dir.path().to_str().unwrap();
let cfg = Config {
dir: path.to_owned(),
target_file_size: ReadableSize(1),
..Default::default()
};
let pipe_log = new_test_pipes(&cfg).unwrap();
drop(pipe_log);
assert!(new_test_pipes(&cfg).is_ok());
}
} }
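The change releases the directory `LOCK` files when `DualPipes` is dropped instead of relying on process exit. A minimal sketch of the same drop-to-unlock pattern with the `fs2` crate (assuming `fs2 = "0.4"`; the `DirLockGuard` type is illustrative, not the engine's real one):

use fs2::FileExt;
use std::fs::File;

struct DirLockGuard(File);

impl Drop for DirLockGuard {
    fn drop(&mut self) {
        // Unlock explicitly rather than waiting for the descriptor to close.
        if let Err(e) = FileExt::unlock(&self.0) {
            eprintln!("error while unlocking directory: {e}");
        }
    }
}

fn main() -> std::io::Result<()> {
    let lock_file = File::create("LOCK")?;
    lock_file.try_lock_exclusive()?;
    let guard = DirLockGuard(lock_file);
    drop(guard); // lock released here
    // A fresh handle can immediately take the lock again.
    File::open("LOCK")?.try_lock_exclusive()?;
    Ok(())
}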

View File

@@ -379,7 +379,7 @@ impl<F: FileSystem> DualPipesBuilder<F> {
     fn recover_queue_imp<M: ReplayMachine, FA: Factory<M>>(
         file_system: Arc<F>,
         recovery_cfg: RecoveryConfig,
-        files: &mut Vec<File<F>>,
+        files: &mut [File<F>],
         machine_factory: &FA,
     ) -> Result<M> {
         if recovery_cfg.concurrency == 0 || files.is_empty() {
@@ -390,7 +390,7 @@ impl<F: FileSystem> DualPipesBuilder<F> {
         let recovery_mode = recovery_cfg.mode;
         let recovery_read_block_size = recovery_cfg.read_block_size as usize;
-        let max_chunk_size = std::cmp::max((files.len() + concurrency - 1) / concurrency, 1);
+        let max_chunk_size = files.len().div_ceil(concurrency);
         let chunks = files.par_chunks_mut(max_chunk_size);
         let chunk_count = chunks.len();
         debug_assert!(chunk_count <= concurrency);
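`usize::div_ceil`, stable since Rust 1.73, replaces the hand-rolled `(len + n - 1) / n` rounding. The old `max(.., 1)` clamp is only needed when the input can be empty, and the early return above guarantees it is not. A quick check:

fn main() {
    let (files, concurrency) = (10usize, 4usize);
    // The old hand-rolled ceiling division, clamped to at least 1.
    let old = std::cmp::max((files + concurrency - 1) / concurrency, 1);
    // The replacement; identical whenever `files >= 1`, which the early
    // return for empty inputs guarantees at this call site.
    assert_eq!(old, files.div_ceil(concurrency)); // both are 3
    // The clamp only mattered for the empty case:
    assert_eq!(0usize.div_ceil(4), 0);
}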

View File

@@ -2,9 +2,9 @@

 use std::fmt::Debug;
 use std::io::BufRead;
+use std::mem;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
-use std::{mem, u64};

 use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt};
 use log::error;
@@ -53,7 +53,7 @@ pub enum CompressionType {
 impl CompressionType {
     pub fn from_u8(t: u8) -> Result<Self> {
         if t <= CompressionType::Lz4 as u8 {
-            Ok(unsafe { mem::transmute(t) })
+            Ok(unsafe { mem::transmute::<u8, Self>(t) })
         } else {
             Err(Error::Corruption(format!(
                 "Unrecognized compression type: {t}"
@@ -168,7 +168,7 @@ pub enum OpType {
 impl OpType {
     pub fn from_u8(t: u8) -> Result<Self> {
         if t <= OpType::Del as u8 {
-            Ok(unsafe { mem::transmute(t) })
+            Ok(unsafe { mem::transmute::<u8, Self>(t) })
         } else {
             Err(Error::Corruption(format!("Unrecognized op type: {t}")))
         }
@@ -767,28 +767,32 @@ impl LogBatch {
         &mut self,
         compression_threshold: usize,
         compression_level: Option<usize>,
-    ) -> Result<usize> {
+    ) -> Result<(usize, f64)> {
         let _t = StopWatch::new(perf_context!(log_populating_duration));
         debug_assert!(self.buf_state == BufState::Open);
         if self.is_empty() {
             self.buf_state = BufState::Encoded(self.buf.len(), 0);
-            return Ok(0);
+            return Ok((0, 0.0));
         }
         self.buf_state = BufState::Incomplete;

         // entries
-        let (header_offset, compression_type) = if compression_threshold > 0
+        let (header_offset, compression_type, compression_ratio) = if compression_threshold > 0
             && self.buf.len() >= LOG_BATCH_HEADER_LEN + compression_threshold
         {
             let buf_len = self.buf.len();
-            lz4::append_compress_block(
+            let compression_ratio = lz4::append_compress_block(
                 &mut self.buf,
                 LOG_BATCH_HEADER_LEN,
                 compression_level.unwrap_or(lz4::DEFAULT_LZ4_COMPRESSION_LEVEL),
             )?;
-            (buf_len - LOG_BATCH_HEADER_LEN, CompressionType::Lz4)
+            (
+                buf_len - LOG_BATCH_HEADER_LEN,
+                CompressionType::Lz4,
+                compression_ratio,
+            )
         } else {
-            (0, CompressionType::None)
+            (0, CompressionType::None, 0.0)
         };

         // checksum
@@ -830,7 +834,7 @@ impl LogBatch {
         }
         self.buf_state = BufState::Encoded(header_offset, footer_roffset - LOG_BATCH_HEADER_LEN);

-        Ok(self.buf.len() - header_offset)
+        Ok((self.buf.len() - header_offset, compression_ratio))
     }

     /// Make preparations for the write of `LogBatch`.
@@ -1328,7 +1332,7 @@ mod tests {
             offset: 0,
         };
         let old_approximate_size = batch.approximate_size();
-        let len = batch.finish_populate(usize::from(compress), None).unwrap();
+        let (len, _) = batch.finish_populate(usize::from(compress), None).unwrap();
         assert!(old_approximate_size >= len);
         assert_eq!(batch.approximate_size(), len);
         let mut batch_handle = mocked_file_block_handle;
@@ -1493,7 +1497,7 @@ mod tests {
         batch1.merge(&mut batch2).unwrap();
         assert!(batch2.is_empty());

-        let len = batch1.finish_populate(0, None).unwrap();
+        let (len, _) = batch1.finish_populate(0, None).unwrap();
         batch1.prepare_write(&file_context).unwrap();
         let encoded = batch1.encoded_bytes();
         assert_eq!(len, encoded.len());
@@ -1549,7 +1553,8 @@ mod tests {
             offset: 0,
         };
         let buf_len = batch.buf.len();
-        let len = batch.finish_populate(1, None).unwrap();
+        let (len, compression_ratio) = batch.finish_populate(1, None).unwrap();
+        assert!(compression_ratio == 0.0);
         assert!(len == 0);
         assert_eq!(batch.buf_state, BufState::Encoded(buf_len, 0));
         let file_context = LogFileContext::new(mocked_file_block_handles.id, Version::V2);
@@ -1671,7 +1676,8 @@ mod tests {
             },
         ];
         let old_approximate_size = batch.approximate_size();
-        let len = batch.finish_populate(1, None).unwrap();
+        let (len, compression_ratio) = batch.finish_populate(1, None).unwrap();
+        assert!(compression_ratio > 0.0);
         assert!(old_approximate_size >= len);
         assert_eq!(batch.approximate_size(), len);
         let checksum = batch.item_batch.checksum;
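`finish_populate` now returns `(encoded_len, compression_ratio)`, where the ratio is compressed size over uncompressed size: below 1.0 means the batch shrank, and 0.0 is the convention for empty or uncompressed batches. A small sketch of that bookkeeping (the helper and its numbers are illustrative, not taken from a real batch):

fn compression_ratio(compressed_len: usize, content_len: usize) -> f64 {
    if content_len == 0 {
        return 0.0; // convention used by `finish_populate` for skipped compression
    }
    compressed_len as f64 / content_len as f64
}

fn main() {
    // A 4096-byte payload that LZ4 shrank to 1024 bytes: ratio 0.25.
    assert_eq!(compression_ratio(1024, 4096), 0.25);
    // Incompressible input can yield a ratio above 1.0 (framing overhead).
    assert!(compression_ratio(4200, 4096) > 1.0);
    // Empty or uncompressed batches report 0.0.
    assert_eq!(compression_ratio(0, 0), 0.0);
}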

View File

@@ -228,7 +228,7 @@ impl<A: AllocatorTrait> MemTable<A> {
         }

         if let Some(g) = rhs.atomic_group.take() {
-            assert!(self.atomic_group.map_or(true, |(_, end)| end <= g.0));
+            assert!(self.atomic_group.is_none_or(|(_, end)| end <= g.0));
             self.atomic_group = Some(g);
         }
@@ -545,7 +545,7 @@ impl<A: AllocatorTrait> MemTable<A> {
     }

     pub fn apply_rewrite_atomic_group(&mut self, start: FileSeq, end: FileSeq) {
-        assert!(self.atomic_group.map_or(true, |(_, b)| b <= start));
+        assert!(self.atomic_group.is_none_or(|(_, b)| b <= start));
         self.atomic_group = Some((start, end));
     }
@@ -763,7 +763,7 @@ impl<A: AllocatorTrait> MemTable<A> {
         debug_assert!(count > 0);
         self.entry_indexes
             .get(count - 1)
-            .map_or(false, |ei| ei.entries.unwrap().id.seq <= gate.seq)
+            .is_some_and(|ei| ei.entries.unwrap().id.seq <= gate.seq)
     }

     /// Returns the region ID.
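`Option::is_none_or` (stable since Rust 1.82) and `is_some_and` (Rust 1.70) state the old `map_or(true, ..)` and `map_or(false, ..)` idioms directly, one reason the MSRV bump to 1.85.0 in Cargo.toml matters. The forms are equivalent:

fn main() {
    let none: Option<u64> = None;
    let some: Option<u64> = Some(5);

    // `map_or(true, f)` is `is_none_or(f)`.
    assert_eq!(none.map_or(true, |v| v <= 3), none.is_none_or(|v| v <= 3));
    assert_eq!(some.map_or(true, |v| v <= 3), some.is_none_or(|v| v <= 3));

    // `map_or(false, f)` is `is_some_and(f)`.
    assert_eq!(some.map_or(false, |v| v <= 3), some.is_some_and(|v| v <= 3));
}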

View File

@@ -122,7 +122,7 @@ pub trait TimeMetric {
     }
 }

-impl<'a> TimeMetric for &'a Histogram {
+impl TimeMetric for &Histogram {
     fn observe(&self, duration: Duration) {
         Histogram::observe(self, duration.as_secs_f64());
     }
@@ -201,6 +201,12 @@ lazy_static! {
         exponential_buckets(256.0, 1.8, 22).unwrap()
     )
     .unwrap();
+    pub static ref ENGINE_WRITE_COMPRESSION_RATIO_HISTOGRAM: Histogram = register_histogram!(
+        "raft_engine_write_compression_ratio",
+        "Bucketed histogram of Raft Engine write compression ratio",
+        exponential_buckets(0.0005, 1.8, 16).unwrap()
+    )
+    .unwrap();
     pub static ref LOG_ALLOCATE_DURATION_HISTOGRAM: Histogram = register_histogram!(
         "raft_engine_allocate_log_duration_seconds",
         "Bucketed histogram of Raft Engine allocate log duration",

View File

@@ -120,9 +120,8 @@ unsafe impl<A: Allocator + Send + Sync> Allocator for SwappyAllocator<A> {
                 return swap_r;
             }
         }
-        self.0.mem_allocator.allocate(layout).map_err(|e| {
+        self.0.mem_allocator.allocate(layout).inspect_err(|_| {
             self.0.mem_usage.fetch_sub(layout.size(), Ordering::Relaxed);
-            e
         })
     }
@@ -191,9 +190,8 @@ unsafe impl<A: Allocator + Send + Sync> Allocator for SwappyAllocator<A> {
             self.0
                 .mem_allocator
                 .grow(ptr, old_layout, new_layout)
-                .map_err(|e| {
+                .inspect_err(|_| {
                     self.0.mem_usage.fetch_sub(diff, Ordering::Relaxed);
-                    e
                 })
         }
     }
@@ -253,11 +251,10 @@ unsafe impl<A: Allocator + Send + Sync> Allocator for SwappyAllocator<A> {
             self.0
                 .mem_allocator
                 .shrink(ptr, old_layout, new_layout)
-                .map(|p| {
+                .inspect(|_| {
                     self.0
                         .mem_usage
                         .fetch_sub(old_layout.size() - new_layout.size(), Ordering::Relaxed);
-                    p
                 })
         }
     }

View File

@@ -63,7 +63,7 @@ where
 }

 pub struct PanicGuard {
-    prev_hook: *mut (dyn Fn(&panic::PanicInfo<'_>) + Sync + Send + 'static),
+    prev_hook: *mut (dyn Fn(&panic::PanicHookInfo<'_>) + Sync + Send + 'static),
 }

 struct PointerHolder<T: ?Sized>(*mut T);

View File

@@ -148,7 +148,7 @@ impl<'de> Deserialize<'de> for ReadableSize {
     {
         struct SizeVisitor;

-        impl<'de> Visitor<'de> for SizeVisitor {
+        impl Visitor<'_> for SizeVisitor {
             type Value = ReadableSize;

             fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
@@ -221,14 +221,15 @@ pub fn unhash_u64(mut i: u64) -> u64 {
 pub mod lz4 {
     use crate::{Error, Result};
-    use std::{i32, ptr};
+    use std::ptr;

     pub const DEFAULT_LZ4_COMPRESSION_LEVEL: usize = 1;

     /// Compress content in `buf[skip..]`, and append output to `buf`.
-    pub fn append_compress_block(buf: &mut Vec<u8>, skip: usize, level: usize) -> Result<()> {
+    pub fn append_compress_block(buf: &mut Vec<u8>, skip: usize, level: usize) -> Result<f64> {
         let buf_len = buf.len();
         let content_len = buf_len - skip;
+        let mut compression_ratio = 0.0;
         if content_len > 0 {
             if content_len > i32::MAX as usize {
                 return Err(Error::InvalidArgument(format!(
@@ -256,10 +257,11 @@
             if compressed == 0 {
                 return Err(Error::Other(box_err!("Compression failed")));
             }
+            compression_ratio = compressed as f64 / content_len as f64;
             buf.set_len(buf_len + 4 + compressed as usize);
         }
     }
-    Ok(())
+    Ok(compression_ratio)
 }
@@ -301,8 +303,12 @@
         let vecs: Vec<Vec<u8>> = vec![b"".to_vec(), b"123".to_vec(), b"12345678910".to_vec()];
         for mut vec in vecs.into_iter() {
             let uncompressed_len = vec.len();
-            super::append_compress_block(&mut vec, 0, super::DEFAULT_LZ4_COMPRESSION_LEVEL)
-                .unwrap();
+            let compression_ratio =
+                super::append_compress_block(&mut vec, 0, super::DEFAULT_LZ4_COMPRESSION_LEVEL)
+                    .unwrap();
+            if uncompressed_len == 0 {
+                assert_eq!(compression_ratio, 0.0);
+            }
             let res = super::decompress_block(&vec[uncompressed_len..]).unwrap();
             assert_eq!(res, vec[..uncompressed_len].to_owned());
         }
@@ -324,7 +330,7 @@ pub trait Factory<Target>: Send + Sync {
 /// ```
 #[inline]
 pub fn round_up(offset: usize, alignment: usize) -> usize {
-    (offset + alignment - 1) / alignment * alignment
+    offset.div_ceil(alignment) * alignment
 }

 /// Returns an aligned `offset`.
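`round_up` keeps its contract, rounding `offset` up to the next multiple of `alignment`, with `div_ceil` replacing the manual arithmetic. A property check of the two forms:

fn round_up(offset: usize, alignment: usize) -> usize {
    offset.div_ceil(alignment) * alignment
}

fn main() {
    // The rewritten form agrees with the old arithmetic on a small grid.
    for offset in 0..=64usize {
        for alignment in [1usize, 2, 4, 8, 4096] {
            let old = (offset + alignment - 1) / alignment * alignment;
            assert_eq!(round_up(offset, alignment), old);
        }
    }
    assert_eq!(round_up(100, 4096), 4096); // e.g. page-aligning a file offset
}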

View File

@@ -17,7 +17,6 @@ use crate::PerfContext;

 type Ptr<T> = Option<NonNull<T>>;

-///
 pub struct Writer<P, O> {
     next: Cell<Ptr<Writer<P, O>>>,
     payload: *mut P,
@@ -95,7 +94,7 @@ impl<'a, 'b, P, O> WriteGroup<'a, 'b, P, O> {
     }
 }

-impl<'a, 'b, P, O> Drop for WriteGroup<'a, 'b, P, O> {
+impl<P, O> Drop for WriteGroup<'_, '_, P, O> {
     fn drop(&mut self) {
         self.ref_barrier.leader_exit();
     }
@@ -108,7 +107,7 @@ pub struct WriterIter<'a, 'b, 'c, P: 'c, O: 'c> {
     marker: PhantomData<&'a WriteGroup<'b, 'c, P, O>>,
 }

-impl<'a, 'b, 'c, P, O> Iterator for WriterIter<'a, 'b, 'c, P, O> {
+impl<'a, P, O> Iterator for WriterIter<'a, '_, '_, P, O> {
     type Item = &'a mut Writer<P, O>;

     fn next(&mut self) -> Option<Self::Item> {
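These are the elisions that clippy's `needless_lifetimes` lint suggests: name only the lifetimes a signature actually uses and let `'_` stand in for the rest. A minimal before/after sketch with a hypothetical `Wrapper` type:

struct Wrapper<'a, T>(&'a T);

// Before: every lifetime spelled out, though none is referenced in the body.
// impl<'a, T> Drop for Wrapper<'a, T> { ... }

// After: an anonymous lifetime with the same meaning.
impl<T> Drop for Wrapper<'_, T> {
    fn drop(&mut self) {
        // No lifetime is used in the body, so none needs a name.
    }
}

fn main() {
    let v = 7;
    let _w = Wrapper(&v);
} // _w dropped here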

View File

@@ -121,8 +121,8 @@ fn test_pipe_log_listeners() {
         append(
             &engine,
             region_id,
-            (i as u64 + 1) / 2,
-            (i as u64 + 1) / 2 + 1,
+            (i as u64).div_ceil(2),
+            (i as u64).div_ceil(2) + 1,
             Some(&data),
         );
         assert_eq!(hook.0[&LogQueue::Append].appends(), i);
@@ -141,8 +141,8 @@ fn test_pipe_log_listeners() {
         append(
             &engine,
             region_id,
-            (i as u64 + 1) / 2,
-            (i as u64 + 1) / 2 + 1,
+            (i as u64).div_ceil(2),
+            (i as u64).div_ceil(2) + 1,
             Some(&data),
         );
         assert_eq!(hook.0[&LogQueue::Append].appends(), i);
@@ -178,8 +178,8 @@ fn test_pipe_log_listeners() {
         append(
             &engine,
             region_id,
-            (i as u64 + 1) / 2,
-            (i as u64 + 1) / 2 + 1,
+            (i as u64).div_ceil(2),
+            (i as u64).div_ceil(2) + 1,
             Some(&data),
         );
         assert_eq!(hook.0[&LogQueue::Append].appends(), i + 3);
@@ -631,6 +631,52 @@ fn test_concurrent_write_perf_context() {
     }
 }

+#[test]
+fn test_fetch_with_updated_entry_index() {
+    let dir = tempfile::Builder::new()
+        .prefix("test_fetch_with_updated_entry_index")
+        .tempdir()
+        .unwrap();
+    let entry_data = vec![b'x'; 1024];
+    let cfg = Config {
+        dir: dir.path().to_str().unwrap().to_owned(),
+        ..Default::default()
+    };
+    let engine = Engine::open(cfg).unwrap();
+    for i in 0..10 {
+        append(
+            &engine,
+            i + 1,
+            1 + i * 10,
+            1 + i * 10 + 10,
+            Some(&entry_data),
+        );
+    }
+    assert_eq!(
+        10,
+        engine
+            .fetch_entries_to::<MessageExtTyped>(1, 1, 11, None, &mut vec![])
+            .unwrap()
+    );
+    // Mock: the file is broken or abnormal to read.
+    {
+        let _f = FailGuard::new("log_file::read::err", "return");
+        engine
+            .fetch_entries_to::<MessageExtTyped>(10, 91, 101, None, &mut vec![])
+            .unwrap_err();
+    }
+    // Mock: one entry have been updated by background rewrite thread.
+    {
+        let _f = FailGuard::new("log_file::read::err", "8*off->return->off");
+        assert_eq!(
+            10,
+            engine
+                .fetch_entries_to::<MessageExtTyped>(10, 91, 101, None, &mut vec![])
+                .unwrap()
+        );
+    }
+}
+
 // FIXME: this test no longer works because recovery cannot reliably detect
 // overwrite anomaly.
 // See https://github.com/tikv/raft-engine/issues/250