Compare commits

...

7 Commits

Author SHA1 Message Date
Xinye Tao b4d564c4a4
bump 0.2.2 (#254)
Signed-off-by: tabokie <xy.tao@outlook.com>
2022-07-26 14:12:08 +08:00
Xinye Tao a5251cf1d2
fix append recovery bug (#251) (#253)
Close #250

Signed-off-by: tabokie <xy.tao@outlook.com>
2022-07-26 13:29:06 +08:00
Xinye Tao f69b62a888
pipe: reseek after write failure (#245) (#247)
* pipe: reseek after write failure (#245)

* reseek after write failure

Signed-off-by: tabokie <xy.tao@outlook.com>

* update changelog

Signed-off-by: tabokie <xy.tao@outlook.com>

* fix changelog

Signed-off-by: tabokie <xy.tao@outlook.com>
2022-07-18 14:21:55 +08:00
Xinye Tao c01135d9ad
bump 0.2.1 (#240)
Signed-off-by: tabokie <xy.tao@outlook.com>
2022-07-12 15:02:52 +08:00
Xinye Tao 229b9961e4
simplify `delete_metadata` trait method (#232) (#233)
Signed-off-by: tabokie <xy.tao@outlook.com>
2022-06-24 15:07:30 +08:00
Xinye Tao 605d329732
Add metadata deletion capability to `FileSystem` trait (#229) (#230)
* let file system manage file deletion

Signed-off-by: tabokie <xy.tao@outlook.com>

* update changelog

Signed-off-by: tabokie <xy.tao@outlook.com>

* address comment and fix test

Signed-off-by: tabokie <xy.tao@outlook.com>

* detect and cleanup stale metadata on startup

Signed-off-by: tabokie <xy.tao@outlook.com>

* update changelog

Signed-off-by: tabokie <xy.tao@outlook.com>

* fix issues

Signed-off-by: tabokie <xy.tao@outlook.com>
2022-06-23 14:49:16 +08:00
Xinye Tao c0df442e15
tolerate fallocate failure and print IO error in debug form (#225) (#226)
* tolerate fallocate failure and print IO error in debug form

Signed-off-by: tabokie <xy.tao@outlook.com>

* update CHANGELOG.md

Signed-off-by: tabokie <xy.tao@outlook.com>

* only tolerate e=95

Signed-off-by: tabokie <xy.tao@outlook.com>

* still tolerate unconditionally

Signed-off-by: tabokie <xy.tao@outlook.com>

* update doc

Signed-off-by: tabokie <xy.tao@outlook.com>

* add a failpoint test

Signed-off-by: tabokie <xy.tao@outlook.com>
2022-06-22 15:24:25 +08:00
17 changed files with 455 additions and 61 deletions

View File

@ -1,6 +1,21 @@
# Raft Engine Change Log
## [Unreleased]
## [0.2.2] - 2022-07-26
### Bug Fixes
* Avoid leaving fractured write after failure by reseeking the file writer. Panic if the reseek fails as well.
* Fix a parallel recovery panic bug.
## [0.2.1] - 2022-07-12
### Bug Fixes
* Unconditionally tolerate `fallocate` failures as a fix to its portability issue. Errors other than `EOPNOTSUPP` will still emit a warning.
### Public API Changes
* Add metadata deletion capability to `FileSystem` trait. Users can implement `exists_metadata` and `delete_metadata` to clean up obsolete metadata from older versions of Raft Engine.
## [0.2.0] - 2022-05-25

View File

@ -1,6 +1,6 @@
[package]
name = "raft-engine"
version = "0.2.0"
version = "0.2.2"
authors = ["The TiKV Project Developers"]
edition = "2018"
rust-version = "1.57"

View File

@ -54,7 +54,7 @@ Put this in your Cargo.toml:
```rust
[dependencies]
raft-engine = "0.2.0"
raft-engine = "0.2.2"
```
Available Cargo features:

View File

@ -1,6 +1,6 @@
[package]
name = "raft-engine-ctl"
version = "0.2.0"
version = "0.2.2"
authors = ["The TiKV Project Developers"]
edition = "2018"
rust-version = "1.57"
@ -11,4 +11,4 @@ license = "Apache-2.0"
[dependencies]
clap = { version = "3.0.0-rc.0", features = ["derive", "cargo"] }
env_logger = "0.9"
raft-engine = { path = "..", version = "0.2.0", features = ["scripting", "internals"] }
raft-engine = { path = "..", version = "0.2.2", features = ["scripting", "internals"] }

View File

@ -506,6 +506,7 @@ mod tests {
use crate::util::ReadableSize;
use kvproto::raft_serverpb::RaftLocalState;
use raft::eraftpb::Entry;
use std::collections::BTreeSet;
use std::fs::OpenOptions;
use std::path::PathBuf;
@ -589,6 +590,15 @@ mod tests {
reader(e.index, entry_index.entries.unwrap().id.queue, &e.data);
}
}
fn file_count(&self, queue: Option<LogQueue>) -> usize {
if let Some(queue) = queue {
let (a, b) = self.file_span(queue);
(b - a + 1) as usize
} else {
self.file_count(Some(LogQueue::Append)) + self.file_count(Some(LogQueue::Rewrite))
}
}
}
#[test]
@ -1576,4 +1586,134 @@ mod tests {
vec.clear();
});
}
pub struct DeleteMonitoredFileSystem {
inner: ObfuscatedFileSystem,
append_metadata: Mutex<BTreeSet<u64>>,
}
impl DeleteMonitoredFileSystem {
fn new() -> Self {
Self {
inner: ObfuscatedFileSystem::default(),
append_metadata: Mutex::new(BTreeSet::new()),
}
}
fn update_metadata(&self, path: &Path, delete: bool) -> bool {
let id = FileId::parse_file_name(path.file_name().unwrap().to_str().unwrap()).unwrap();
if id.queue == LogQueue::Append {
if delete {
self.append_metadata.lock().unwrap().remove(&id.seq)
} else {
self.append_metadata.lock().unwrap().insert(id.seq)
}
} else {
false
}
}
}
impl FileSystem for DeleteMonitoredFileSystem {
type Handle = <ObfuscatedFileSystem as FileSystem>::Handle;
type Reader = <ObfuscatedFileSystem as FileSystem>::Reader;
type Writer = <ObfuscatedFileSystem as FileSystem>::Writer;
fn create<P: AsRef<Path>>(&self, path: P) -> std::io::Result<Self::Handle> {
let handle = self.inner.create(&path)?;
self.update_metadata(path.as_ref(), false);
Ok(handle)
}
fn open<P: AsRef<Path>>(&self, path: P) -> std::io::Result<Self::Handle> {
let handle = self.inner.open(&path)?;
self.update_metadata(path.as_ref(), false);
Ok(handle)
}
fn delete<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()> {
self.inner.delete(&path)?;
self.update_metadata(path.as_ref(), true);
Ok(())
}
fn delete_metadata<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()> {
self.inner.delete_metadata(&path)?;
self.update_metadata(path.as_ref(), true);
Ok(())
}
fn exists_metadata<P: AsRef<Path>>(&self, path: P) -> bool {
if self.inner.exists_metadata(&path) {
return true;
}
let id = FileId::parse_file_name(path.as_ref().file_name().unwrap().to_str().unwrap())
.unwrap();
if id.queue == LogQueue::Append {
self.append_metadata.lock().unwrap().contains(&id.seq)
} else {
false
}
}
fn new_reader(&self, h: Arc<Self::Handle>) -> std::io::Result<Self::Reader> {
self.inner.new_reader(h)
}
fn new_writer(&self, h: Arc<Self::Handle>) -> std::io::Result<Self::Writer> {
self.inner.new_writer(h)
}
}
#[test]
fn test_managed_file_deletion() {
let dir = tempfile::Builder::new()
.prefix("test_managed_file_deletion")
.tempdir()
.unwrap();
let entry_data = vec![b'x'; 128];
let cfg = Config {
dir: dir.path().to_str().unwrap().to_owned(),
target_file_size: ReadableSize(1),
purge_threshold: ReadableSize(1),
..Default::default()
};
let fs = Arc::new(DeleteMonitoredFileSystem::new());
let engine = RaftLogEngine::open_with_file_system(cfg, fs.clone()).unwrap();
for rid in 1..=10 {
engine.append(rid, 1, 11, Some(&entry_data));
}
for rid in 1..=5 {
engine.clean(rid);
}
let (start, _) = engine.file_span(LogQueue::Append);
engine.purge_expired_files().unwrap();
assert!(start < engine.file_span(LogQueue::Append).0);
assert_eq!(engine.file_count(None), fs.inner.file_count());
let (start, _) = engine.file_span(LogQueue::Append);
assert_eq!(
fs.append_metadata.lock().unwrap().iter().next().unwrap(),
&start
);
let engine = engine.reopen();
assert_eq!(engine.file_count(None), fs.inner.file_count());
let (start, _) = engine.file_span(LogQueue::Append);
assert_eq!(
fs.append_metadata.lock().unwrap().iter().next().unwrap(),
&start
);
// Simulate stale metadata.
for i in start / 2..start {
fs.append_metadata.lock().unwrap().insert(i);
}
let engine = engine.reopen();
let (start, _) = engine.file_span(LogQueue::Append);
assert_eq!(
fs.append_metadata.lock().unwrap().iter().next().unwrap(),
&start
);
}
}

21
src/env/default.rs vendored
View File

@ -152,18 +152,18 @@ impl LogFd {
});
#[cfg(target_os = "linux")]
{
fcntl::fallocate(
if let Err(e) = fcntl::fallocate(
self.0,
fcntl::FallocateFlags::empty(),
offset as i64,
size as i64,
)
.map_err(|e| from_nix_error(e, "fallocate"))
}
#[cfg(not(target_os = "linux"))]
{
Ok(())
) {
if e != nix::Error::EOPNOTSUPP {
return Err(from_nix_error(e, "fallocate"));
}
}
}
Ok(())
}
}
@ -232,6 +232,9 @@ impl Read for LogFile {
impl Seek for LogFile {
fn seek(&mut self, pos: SeekFrom) -> IoResult<u64> {
fail_point!("log_file::seek::err", |_| {
Err(std::io::Error::new(std::io::ErrorKind::Other, "fp"))
});
match pos {
SeekFrom::Start(offset) => self.offset = offset as usize,
SeekFrom::Current(i) => self.offset = (self.offset as i64 + i) as usize,
@ -272,6 +275,10 @@ impl FileSystem for DefaultFileSystem {
LogFd::open(path.as_ref())
}
fn delete<P: AsRef<Path>>(&self, path: P) -> IoResult<()> {
std::fs::remove_file(path)
}
fn new_reader(&self, handle: Arc<Self::Handle>) -> IoResult<Self::Reader> {
Ok(LogFile::new(handle))
}

21
src/env/mod.rs vendored
View File

@ -17,8 +17,29 @@ pub trait FileSystem: Send + Sync {
type Writer: Seek + Write + Send + WriteExt;
fn create<P: AsRef<Path>>(&self, path: P) -> Result<Self::Handle>;
fn open<P: AsRef<Path>>(&self, path: P) -> Result<Self::Handle>;
fn delete<P: AsRef<Path>>(&self, path: P) -> Result<()>;
/// Deletes user implemented metadata associated with `path`. Returns
/// `true` if any metadata is deleted.
///
/// In older versions of Raft Engine, physical files are deleted without
/// going through user implemented cleanup procedure. This method is used to
/// detect and cleanup the user metadata that is no longer mapped to a
/// physical file.
fn delete_metadata<P: AsRef<Path>>(&self, _path: P) -> Result<()> {
Ok(())
}
/// Returns whether there is any user metadata associated with given `path`.
fn exists_metadata<P: AsRef<Path>>(&self, _path: P) -> bool {
false
}
fn new_reader(&self, handle: Arc<Self::Handle>) -> Result<Self::Reader>;
fn new_writer(&self, handle: Arc<Self::Handle>) -> Result<Self::Writer>;
}

44
src/env/obfuscated.rs vendored
View File

@ -2,6 +2,7 @@
use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write};
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use crate::env::{DefaultFileSystem, FileSystem, WriteExt};
@ -65,11 +66,26 @@ impl WriteExt for ObfuscatedWriter {
}
}
pub struct ObfuscatedFileSystem(DefaultFileSystem);
/// `[ObfuscatedFileSystem]` is a special implementation of `[FileSystem]`,
/// which is used for constructing and simulating an abnormal file system for
/// `[Read]` and `[Write]`.
pub struct ObfuscatedFileSystem {
inner: DefaultFileSystem,
files: AtomicUsize,
}
impl Default for ObfuscatedFileSystem {
fn default() -> Self {
ObfuscatedFileSystem(DefaultFileSystem)
ObfuscatedFileSystem {
inner: DefaultFileSystem,
files: AtomicUsize::new(0),
}
}
}
impl ObfuscatedFileSystem {
pub fn file_count(&self) -> usize {
self.files.load(Ordering::Relaxed)
}
}
@ -79,18 +95,30 @@ impl FileSystem for ObfuscatedFileSystem {
type Writer = ObfuscatedWriter;
fn create<P: AsRef<Path>>(&self, path: P) -> IoResult<Self::Handle> {
self.0.create(path)
let r = self.inner.create(path);
if r.is_ok() {
self.files.fetch_add(1, Ordering::Relaxed);
}
r
}
fn open<P: AsRef<Path>>(&self, path: P) -> IoResult<Self::Handle> {
self.0.open(path)
self.inner.open(path)
}
fn new_reader(&self, inner: Arc<Self::Handle>) -> IoResult<Self::Reader> {
Ok(ObfuscatedReader(self.0.new_reader(inner)?))
fn delete<P: AsRef<Path>>(&self, path: P) -> IoResult<()> {
let r = self.inner.delete(path);
if r.is_ok() {
self.files.fetch_sub(1, Ordering::Relaxed);
}
r
}
fn new_writer(&self, inner: Arc<Self::Handle>) -> IoResult<Self::Writer> {
Ok(ObfuscatedWriter(self.0.new_writer(inner)?))
fn new_reader(&self, handle: Arc<Self::Handle>) -> IoResult<Self::Reader> {
Ok(ObfuscatedReader(self.inner.new_reader(handle)?))
}
fn new_writer(&self, handle: Arc<Self::Handle>) -> IoResult<Self::Writer> {
Ok(ObfuscatedWriter(self.inner.new_writer(handle)?))
}
}

View File

@ -13,7 +13,7 @@ pub enum Error {
InvalidArgument(String),
#[error("Corruption: {0}")]
Corruption(String),
#[error("IO Error: {0}")]
#[error("IO Error: {0:?}")]
Io(#[from] IoError),
#[error("Codec Error: {0}")]
Codec(#[from] CodecError),

View File

@ -5,12 +5,14 @@
use std::io::{Read, Seek, SeekFrom, Write};
use std::sync::Arc;
use log::warn;
use crate::env::{FileSystem, Handle, WriteExt};
use crate::metrics::*;
use crate::pipe_log::FileBlockHandle;
use crate::{Error, Result};
use super::format::LogFileHeader;
use crate::env::{FileSystem, Handle, WriteExt};
/// Maximum number of bytes to allocate ahead.
const FILE_ALLOCATE_SIZE: usize = 2 * 1024 * 1024;
@ -82,10 +84,19 @@ impl<F: FileSystem> LogFileWriter<F> {
target_size_hint.saturating_sub(self.capacity),
),
);
self.writer.allocate(self.capacity, alloc)?;
if let Err(e) = self.writer.allocate(self.capacity, alloc) {
warn!("log file allocation failed: {}", e);
}
self.capacity += alloc;
}
self.writer.write_all(buf)?;
self.writer.write_all(buf).map_err(|e| {
self.writer
.seek(SeekFrom::Start(self.written as u64))
.unwrap_or_else(|e| {
panic!("failed to reseek after write failure: {}", e);
});
e
})?;
self.written = new_written;
Ok(())
}

View File

@ -1,13 +1,13 @@
// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0.
use std::collections::VecDeque;
use std::fs::{self, File};
use std::fs::File;
use std::path::PathBuf;
use std::sync::Arc;
use crossbeam::utils::CachePadded;
use fail::fail_point;
use log::{error, warn};
use log::error;
use parking_lot::{Mutex, MutexGuard, RwLock};
use crate::config::Config;
@ -269,17 +269,15 @@ impl<F: FileSystem> SinglePipe<F> {
let path = file_id.build_file_path(&self.dir);
#[cfg(feature = "failpoints")]
{
let remove_failure = || {
fail::fail_point!("file_pipe_log::remove_file_failure", |_| true);
let remove_skipped = || {
fail::fail_point!("file_pipe_log::remove_file_skipped", |_| true);
false
};
if remove_failure() {
if remove_skipped() {
continue;
}
}
if let Err(e) = fs::remove_file(&path) {
warn!("Remove purged log file {:?} failed: {}", path, e);
}
self.file_system.delete(&path)?;
}
Ok(purged)
}

View File

@ -145,6 +145,36 @@ impl<F: FileSystem> DualPipesBuilder<F> {
),
] {
if max_id > 0 {
// Try to cleanup stale metadata left by the previous version.
let max_sample = 100;
// Find the first obsolete metadata.
let mut delete_start = None;
for i in 0..max_sample {
let seq = i * min_id / max_sample;
let file_id = FileId { queue, seq };
let path = file_id.build_file_path(dir);
if self.file_system.exists_metadata(&path) {
delete_start = Some(i.saturating_sub(1) * min_id / max_sample + 1);
break;
}
}
// Delete metadata starting from the oldest. Abort on error.
if let Some(start) = delete_start {
let mut success = 0;
for seq in start..min_id {
let file_id = FileId { queue, seq };
let path = file_id.build_file_path(dir);
if let Err(e) = self.file_system.delete_metadata(&path) {
error!("failed to delete metadata of {}: {}.", path.display(), e);
break;
}
success += 1;
}
warn!(
"deleted {} stale files of {:?} in range [{}, {}).",
success, queue, start, min_id,
);
}
for seq in min_id..=max_id {
let file_id = FileId { queue, seq };
let path = file_id.build_file_path(dir);

View File

@ -19,9 +19,8 @@
#![cfg_attr(feature = "swap", feature(nonnull_slice_from_raw_parts))]
#![cfg_attr(feature = "swap", feature(slice_ptr_len))]
#![cfg_attr(feature = "swap", feature(alloc_layout_extra))]
// For testing only.
#![cfg_attr(feature = "swap", feature(alloc_error_hook))]
#![cfg_attr(feature = "swap", feature(cfg_sanitize))]
#![cfg_attr(all(test, feature = "swap"), feature(alloc_error_hook))]
#![cfg_attr(all(test, feature = "swap"), feature(cfg_sanitize))]
#[macro_use]
extern crate lazy_static;

View File

@ -343,28 +343,25 @@ impl<A: AllocatorTrait> MemTable<A> {
}
}
/// Appends some entries from rewrite queue. Assumes this table has no
/// append data.
/// Appends some entries from append queue. Assumes this table has no
/// rewrite data.
///
/// This method is only used for recovery.
pub fn append_rewrite(&mut self, entry_indexes: Vec<EntryIndex>) {
pub fn replay_append(&mut self, entry_indexes: Vec<EntryIndex>) {
let len = entry_indexes.len();
if len > 0 {
debug_assert_eq!(self.rewrite_count, self.entry_indexes.len());
debug_assert_eq!(self.rewrite_count, 0);
self.prepare_append(
entry_indexes[0].index,
// Rewrite -> Compact Append -> Rewrite.
true, /* allow_hole */
// Refer to case in `merge_append_table`. They can be adapted
// to attack this path via a global rewrite without deleting
// obsolete rewrite files.
false, /* allow_hole */
// Refer to case in `merge_newer_neighbor`.
true, /* allow_overwrite */
);
self.global_stats.add(LogQueue::Rewrite, len);
self.global_stats.add(LogQueue::Append, len);
for ei in &entry_indexes {
debug_assert_eq!(ei.entries.unwrap().id.queue, LogQueue::Append);
self.entry_indexes.push_back(ei.into());
}
self.rewrite_count = self.entry_indexes.len();
}
}
@ -442,6 +439,31 @@ impl<A: AllocatorTrait> MemTable<A> {
self.rewrite_count = pos + rewrite_len;
}
/// Appends some entries from rewrite queue. Assumes this table has no
/// append data.
///
/// This method is only used for recovery.
pub fn replay_rewrite(&mut self, entry_indexes: Vec<EntryIndex>) {
let len = entry_indexes.len();
if len > 0 {
debug_assert_eq!(self.rewrite_count, self.entry_indexes.len());
self.prepare_append(
entry_indexes[0].index,
// Rewrite -> Compact Append -> Rewrite.
true, /* allow_hole */
// Refer to case in `merge_append_table`. They can be adapted
// to attack this path via a global rewrite without deleting
// obsolete rewrite files.
true, /* allow_overwrite */
);
self.global_stats.add(LogQueue::Rewrite, len);
for ei in &entry_indexes {
self.entry_indexes.push_back(ei.into());
}
self.rewrite_count = self.entry_indexes.len();
}
}
/// Removes all entries with index smaller than `index`. Returns the number
/// of deleted entries.
pub fn compact_to(&mut self, index: u64) -> u64 {
@ -984,6 +1006,38 @@ impl<A: AllocatorTrait> MemTableAccessor<A> {
}
}
/// Applies changes from log items that are replayed from a append queue.
/// Assumes it haven't applied any rewrite data.
///
/// This method is only used for recovery.
pub fn replay_append_writes(&self, log_items: LogItemDrain) {
for item in log_items {
let raft = item.raft_group_id;
let memtable = self.get_or_insert(raft);
match item.content {
LogItemContent::EntryIndexes(entries_to_add) => {
memtable.write().replay_append(entries_to_add.0);
}
LogItemContent::Command(Command::Clean) => {
self.remove(raft, true /* record_tombstone */);
}
LogItemContent::Command(Command::Compact { index }) => {
memtable.write().compact_to(index);
}
LogItemContent::Kv(kv) => match kv.op_type {
OpType::Put => {
let value = kv.value.unwrap();
memtable.write().put(kv.key, value, kv.file_id.unwrap());
}
OpType::Del => {
let key = kv.key;
memtable.write().delete(key.as_slice());
}
},
}
}
}
/// Applies changes from log items that have been written to rewrite queue.
pub fn apply_rewrite_writes(
&self,
@ -1015,15 +1069,16 @@ impl<A: AllocatorTrait> MemTableAccessor<A> {
/// Assumes it haven't applied any append data.
///
/// This method is only used for recovery.
pub fn apply_replayed_rewrite_writes(&self, log_items: LogItemDrain) {
pub fn replay_rewrite_writes(&self, log_items: LogItemDrain) {
for item in log_items {
let raft = item.raft_group_id;
let memtable = self.get_or_insert(raft);
match item.content {
LogItemContent::EntryIndexes(entries_to_add) => {
memtable.write().append_rewrite(entries_to_add.0);
memtable.write().replay_rewrite(entries_to_add.0);
}
LogItemContent::Command(Command::Clean) => {
// Only append tombstone needs to be recorded.
self.remove(raft, false /* record_tombstone */);
}
LogItemContent::Command(Command::Compact { index }) => {
@ -1109,10 +1164,8 @@ impl<A: AllocatorTrait> ReplayMachine for MemTableRecoverContext<A> {
}
}
match file_id.queue {
LogQueue::Append => self.memtables.apply_append_writes(item_batch.drain()),
LogQueue::Rewrite => self
.memtables
.apply_replayed_rewrite_writes(item_batch.drain()),
LogQueue::Append => self.memtables.replay_append_writes(item_batch.drain()),
LogQueue::Rewrite => self.memtables.replay_rewrite_writes(item_batch.drain()),
}
Ok(())
}
@ -1120,10 +1173,8 @@ impl<A: AllocatorTrait> ReplayMachine for MemTableRecoverContext<A> {
fn merge(&mut self, mut rhs: Self, queue: LogQueue) -> Result<()> {
self.log_batch.merge(&mut rhs.log_batch.clone());
match queue {
LogQueue::Append => self.memtables.apply_append_writes(rhs.log_batch.drain()),
LogQueue::Rewrite => self
.memtables
.apply_replayed_rewrite_writes(rhs.log_batch.drain()),
LogQueue::Append => self.memtables.replay_append_writes(rhs.log_batch.drain()),
LogQueue::Rewrite => self.memtables.replay_rewrite_writes(rhs.log_batch.drain()),
}
self.memtables.merge_newer_neighbor(rhs.memtables);
Ok(())
@ -1924,7 +1975,7 @@ mod tests {
memtable.compact_to(7);
}
Some(LogQueue::Rewrite) => {
memtable.append_rewrite(generate_entry_indexes(
memtable.replay_rewrite(generate_entry_indexes(
0,
7,
FileId::new(LogQueue::Rewrite, 1),
@ -1966,7 +2017,7 @@ mod tests {
memtable.compact_to(10);
}
Some(LogQueue::Rewrite) => {
memtable.append_rewrite(generate_entry_indexes(
memtable.replay_rewrite(generate_entry_indexes(
0,
7,
FileId::new(LogQueue::Rewrite, 1),
@ -2017,7 +2068,7 @@ mod tests {
memtable.merge_newer_neighbor(&mut m1);
}
Some(LogQueue::Rewrite) => {
memtable.append_rewrite(generate_entry_indexes(
memtable.replay_rewrite(generate_entry_indexes(
0,
10,
FileId::new(LogQueue::Rewrite, 1),
@ -2099,6 +2150,13 @@ mod tests {
batches[1].add_command(last_rid, Command::Compact { index: 5 });
batches[2].add_entry_indexes(last_rid, generate_entry_indexes(11, 21, files[2]));
// entries [1, 10] => entries [11, 20][5, 10] => compact 8
last_rid += 1;
batches[0].add_entry_indexes(last_rid, generate_entry_indexes(1, 11, files[0]));
batches[1].add_entry_indexes(last_rid, generate_entry_indexes(11, 21, files[1]));
batches[1].add_entry_indexes(last_rid, generate_entry_indexes(5, 11, files[1]));
batches[2].add_command(last_rid, Command::Compact { index: 8 });
for b in batches.iter_mut() {
b.finish_write(FileBlockHandle::dummy(LogQueue::Append));
}

View File

@ -1,6 +1,6 @@
[package]
name = "stress"
version = "0.2.0"
version = "0.2.2"
authors = ["The TiKV Authors"]
edition = "2018"

View File

@ -373,7 +373,7 @@ fn test_incomplete_purge() {
let engine = Engine::open(cfg.clone()).unwrap();
{
let _f = FailGuard::new("file_pipe_log::remove_file_failure", "return");
let _f = FailGuard::new("file_pipe_log::remove_file_skipped", "return");
append(&engine, rid, 0, 20, Some(&data));
let append_first = engine.file_span(LogQueue::Append).0;
engine.compact_to(rid, 18);

View File

@ -287,6 +287,68 @@ fn test_concurrent_write_error() {
);
}
#[test]
fn test_non_atomic_write_error() {
let dir = tempfile::Builder::new()
.prefix("test_non_atomic_write_error")
.tempdir()
.unwrap();
let cfg = Config {
dir: dir.path().to_str().unwrap().to_owned(),
bytes_per_sync: ReadableSize::kb(1024),
target_file_size: ReadableSize::kb(1024),
..Default::default()
};
let fs = Arc::new(ObfuscatedFileSystem::default());
let entry = vec![b'x'; 1024];
let rid = 1;
{
// Write partially succeeds. We can reopen.
let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
let _f1 = FailGuard::new("log_fd::write::err", "return");
engine
.write(&mut generate_batch(rid, 0, 1, Some(&entry)), true)
.unwrap_err();
}
{
let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
assert_eq!(engine.first_index(rid), None);
}
{
// Write partially succeeds. We can overwrite.
let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
let _f1 = FailGuard::new("log_fd::write::err", "1*off->1*return->off");
engine
.write(&mut generate_batch(rid, 0, 1, Some(&entry)), true)
.unwrap_err();
engine
.write(&mut generate_batch(rid, 5, 6, Some(&entry)), true)
.unwrap();
assert_eq!(engine.first_index(rid).unwrap(), 5);
}
{
let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
assert_eq!(engine.first_index(rid).unwrap(), 5);
}
{
// Write partially succeeds and can't be reverted. We panic.
let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
let _f1 = FailGuard::new("log_fd::write::err", "return");
let _f2 = FailGuard::new("log_file::seek::err", "return");
assert!(catch_unwind_silent(|| {
engine
.write(&mut generate_batch(rid, 6, 7, Some(&entry)), true)
.unwrap_err();
})
.is_err());
}
{
let engine = Engine::open_with_file_system(cfg, fs).unwrap();
assert_eq!(engine.last_index(rid), Some(5));
}
}
#[cfg(feature = "scripting")]
#[test]
fn test_error_during_repair() {
@ -353,3 +415,28 @@ fn test_swappy_page_create_error() {
vec.resize(1024, 0);
assert_eq!(allocator.memory_usage(), 0);
}
#[test]
fn test_file_allocate_error() {
let dir = tempfile::Builder::new()
.prefix("test_file_allocate_error")
.tempdir()
.unwrap();
let cfg = Config {
dir: dir.path().to_str().unwrap().to_owned(),
target_file_size: ReadableSize::mb(100),
..Default::default()
};
let fs = Arc::new(ObfuscatedFileSystem::default());
let entry = vec![b'x'; 1024];
{
let _f = FailGuard::new("log_fd::allocate::err", "return");
let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
engine
.write(&mut generate_batch(1, 1, 5, Some(&entry)), true)
.unwrap();
}
let engine = Engine::open_with_file_system(cfg, fs).unwrap();
assert_eq!(engine.first_index(1).unwrap(), 1);
assert_eq!(engine.last_index(1).unwrap(), 4);
}