mirror of https://github.com/tikv/raft-engine.git
Compare commits
7 Commits
Author | SHA1 | Date
---|---|---
 | b4d564c4a4 |
 | a5251cf1d2 |
 | f69b62a888 |
 | c01135d9ad |
 | 229b9961e4 |
 | 605d329732 |
 | c0df442e15 |
CHANGELOG.md (17 changed lines)

```diff
@@ -1,6 +1,21 @@
 # Raft Engine Change Log
 
 ## [Unreleased]
 
+## [0.2.2] - 2022-07-26
+
+### Bug Fixes
+
+* Avoid leaving fractured write after failure by reseeking the file writer. Panic if the reseek fails as well.
+* Fix a parallel recovery panic bug.
+
+## [0.2.1] - 2022-07-12
+
+### Bug Fixes
+
+* Unconditionally tolerate `fallocate` failures as a fix to its portability issue. Errors other than `EOPNOTSUPP` will still emit a warning.
+
+### Public API Changes
+
+* Add metadata deletion capability to `FileSystem` trait. Users can implement `exists_metadata` and `delete_metadata` to clean up obsolete metadata from older versions of Raft Engine.
+
 ## [0.2.0] - 2022-05-25
```
Cargo.toml

```diff
@@ -1,6 +1,6 @@
 [package]
 name = "raft-engine"
-version = "0.2.0"
+version = "0.2.2"
 authors = ["The TiKV Project Developers"]
 edition = "2018"
 rust-version = "1.57"
```

README.md

````diff
@@ -54,7 +54,7 @@ Put this in your Cargo.toml:
 
 ```rust
 [dependencies]
-raft-engine = "0.2.0"
+raft-engine = "0.2.2"
 ```
 
 Available Cargo features:
````
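The bumped dependency drops in without code changes. As a quick sanity check, here is a minimal program against the 0.2.2 API; it only uses `Config`, `Engine::open`, and `first_index`, all of which appear in the diffs below (the data directory name is made up):

```rust
use raft_engine::{Config, Engine};

fn main() {
    let cfg = Config {
        dir: "./raft-engine-demo".to_owned(), // hypothetical data directory
        ..Default::default()
    };
    // Engine::open is the same entry point the failpoint tests below exercise.
    let engine = Engine::open(cfg).unwrap();
    // Nothing has been written for Raft group 1 yet.
    assert_eq!(engine.first_index(1), None);
}
```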
ctl/Cargo.toml

```diff
@@ -1,6 +1,6 @@
 [package]
 name = "raft-engine-ctl"
-version = "0.2.0"
+version = "0.2.2"
 authors = ["The TiKV Project Developers"]
 edition = "2018"
 rust-version = "1.57"
@@ -11,4 +11,4 @@ license = "Apache-2.0"
 [dependencies]
 clap = { version = "3.0.0-rc.0", features = ["derive", "cargo"] }
 env_logger = "0.9"
-raft-engine = { path = "..", version = "0.2.0", features = ["scripting", "internals"] }
+raft-engine = { path = "..", version = "0.2.2", features = ["scripting", "internals"] }
```
src/engine.rs (140 changed lines)

```diff
@@ -506,6 +506,7 @@ mod tests {
     use crate::util::ReadableSize;
     use kvproto::raft_serverpb::RaftLocalState;
    use raft::eraftpb::Entry;
+    use std::collections::BTreeSet;
     use std::fs::OpenOptions;
     use std::path::PathBuf;
 
@@ -589,6 +590,15 @@ mod tests {
                 reader(e.index, entry_index.entries.unwrap().id.queue, &e.data);
             }
         }
+
+        fn file_count(&self, queue: Option<LogQueue>) -> usize {
+            if let Some(queue) = queue {
+                let (a, b) = self.file_span(queue);
+                (b - a + 1) as usize
+            } else {
+                self.file_count(Some(LogQueue::Append)) + self.file_count(Some(LogQueue::Rewrite))
+            }
+        }
     }
 
     #[test]
@@ -1576,4 +1586,134 @@ mod tests {
             vec.clear();
         });
     }
+
+    pub struct DeleteMonitoredFileSystem {
+        inner: ObfuscatedFileSystem,
+        append_metadata: Mutex<BTreeSet<u64>>,
+    }
+
+    impl DeleteMonitoredFileSystem {
+        fn new() -> Self {
+            Self {
+                inner: ObfuscatedFileSystem::default(),
+                append_metadata: Mutex::new(BTreeSet::new()),
+            }
+        }
+
+        fn update_metadata(&self, path: &Path, delete: bool) -> bool {
+            let id = FileId::parse_file_name(path.file_name().unwrap().to_str().unwrap()).unwrap();
+            if id.queue == LogQueue::Append {
+                if delete {
+                    self.append_metadata.lock().unwrap().remove(&id.seq)
+                } else {
+                    self.append_metadata.lock().unwrap().insert(id.seq)
+                }
+            } else {
+                false
+            }
+        }
+    }
+
+    impl FileSystem for DeleteMonitoredFileSystem {
+        type Handle = <ObfuscatedFileSystem as FileSystem>::Handle;
+        type Reader = <ObfuscatedFileSystem as FileSystem>::Reader;
+        type Writer = <ObfuscatedFileSystem as FileSystem>::Writer;
+
+        fn create<P: AsRef<Path>>(&self, path: P) -> std::io::Result<Self::Handle> {
+            let handle = self.inner.create(&path)?;
+            self.update_metadata(path.as_ref(), false);
+            Ok(handle)
+        }
+
+        fn open<P: AsRef<Path>>(&self, path: P) -> std::io::Result<Self::Handle> {
+            let handle = self.inner.open(&path)?;
+            self.update_metadata(path.as_ref(), false);
+            Ok(handle)
+        }
+
+        fn delete<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()> {
+            self.inner.delete(&path)?;
+            self.update_metadata(path.as_ref(), true);
+            Ok(())
+        }
+
+        fn delete_metadata<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()> {
+            self.inner.delete_metadata(&path)?;
+            self.update_metadata(path.as_ref(), true);
+            Ok(())
+        }
+
+        fn exists_metadata<P: AsRef<Path>>(&self, path: P) -> bool {
+            if self.inner.exists_metadata(&path) {
+                return true;
+            }
+            let id = FileId::parse_file_name(path.as_ref().file_name().unwrap().to_str().unwrap())
+                .unwrap();
+            if id.queue == LogQueue::Append {
+                self.append_metadata.lock().unwrap().contains(&id.seq)
+            } else {
+                false
+            }
+        }
+
+        fn new_reader(&self, h: Arc<Self::Handle>) -> std::io::Result<Self::Reader> {
+            self.inner.new_reader(h)
+        }
+
+        fn new_writer(&self, h: Arc<Self::Handle>) -> std::io::Result<Self::Writer> {
+            self.inner.new_writer(h)
+        }
+    }
+
+    #[test]
+    fn test_managed_file_deletion() {
+        let dir = tempfile::Builder::new()
+            .prefix("test_managed_file_deletion")
+            .tempdir()
+            .unwrap();
+        let entry_data = vec![b'x'; 128];
+        let cfg = Config {
+            dir: dir.path().to_str().unwrap().to_owned(),
+            target_file_size: ReadableSize(1),
+            purge_threshold: ReadableSize(1),
+            ..Default::default()
+        };
+        let fs = Arc::new(DeleteMonitoredFileSystem::new());
+
+        let engine = RaftLogEngine::open_with_file_system(cfg, fs.clone()).unwrap();
+        for rid in 1..=10 {
+            engine.append(rid, 1, 11, Some(&entry_data));
+        }
+        for rid in 1..=5 {
+            engine.clean(rid);
+        }
+        let (start, _) = engine.file_span(LogQueue::Append);
+        engine.purge_expired_files().unwrap();
+        assert!(start < engine.file_span(LogQueue::Append).0);
+        assert_eq!(engine.file_count(None), fs.inner.file_count());
+        let (start, _) = engine.file_span(LogQueue::Append);
+        assert_eq!(
+            fs.append_metadata.lock().unwrap().iter().next().unwrap(),
+            &start
+        );
+
+        let engine = engine.reopen();
+        assert_eq!(engine.file_count(None), fs.inner.file_count());
+        let (start, _) = engine.file_span(LogQueue::Append);
+        assert_eq!(
+            fs.append_metadata.lock().unwrap().iter().next().unwrap(),
+            &start
+        );
+
+        // Simulate stale metadata.
+        for i in start / 2..start {
+            fs.append_metadata.lock().unwrap().insert(i);
+        }
+        let engine = engine.reopen();
+        let (start, _) = engine.file_span(LogQueue::Append);
+        assert_eq!(
+            fs.append_metadata.lock().unwrap().iter().next().unwrap(),
+            &start
+        );
+    }
 }
```
src/env/default.rs

```diff
@@ -152,20 +152,20 @@ impl LogFd {
         });
         #[cfg(target_os = "linux")]
         {
-            fcntl::fallocate(
+            if let Err(e) = fcntl::fallocate(
                 self.0,
                 fcntl::FallocateFlags::empty(),
                 offset as i64,
                 size as i64,
-            )
-            .map_err(|e| from_nix_error(e, "fallocate"))
+            ) {
+                if e != nix::Error::EOPNOTSUPP {
+                    return Err(from_nix_error(e, "fallocate"));
+                }
+            }
         }
-        #[cfg(not(target_os = "linux"))]
-        {
-            Ok(())
-        }
+        Ok(())
     }
 }
 
 impl Handle for LogFd {
     fn truncate(&self, offset: usize) -> IoResult<()> {
@@ -232,6 +232,9 @@ impl Read for LogFile {
 
 impl Seek for LogFile {
     fn seek(&mut self, pos: SeekFrom) -> IoResult<u64> {
+        fail_point!("log_file::seek::err", |_| {
+            Err(std::io::Error::new(std::io::ErrorKind::Other, "fp"))
+        });
         match pos {
             SeekFrom::Start(offset) => self.offset = offset as usize,
             SeekFrom::Current(i) => self.offset = (self.offset as i64 + i) as usize,
@@ -272,6 +275,10 @@ impl FileSystem for DefaultFileSystem {
         LogFd::open(path.as_ref())
     }
 
+    fn delete<P: AsRef<Path>>(&self, path: P) -> IoResult<()> {
+        std::fs::remove_file(path)
+    }
+
     fn new_reader(&self, handle: Arc<Self::Handle>) -> IoResult<Self::Reader> {
         Ok(LogFile::new(handle))
     }
```
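For reference, the tolerance pattern pulled out of `LogFd` into a self-contained helper. The `nix` calls and the `EOPNOTSUPP` comparison mirror the hunk above; the function name and the `io::Error` conversion are my own additions, so treat this as a sketch rather than the crate's API:

```rust
/// Best-effort preallocation: errors from filesystems that do not implement
/// fallocate (reported as EOPNOTSUPP) are swallowed; real failures propagate.
#[cfg(target_os = "linux")]
fn try_preallocate(fd: std::os::unix::io::RawFd, offset: i64, len: i64) -> std::io::Result<()> {
    use nix::fcntl::{fallocate, FallocateFlags};
    match fallocate(fd, FallocateFlags::empty(), offset, len) {
        // Preallocation is an optimization, not a correctness requirement.
        Err(nix::Error::EOPNOTSUPP) => Ok(()),
        Err(e) => Err(std::io::Error::from_raw_os_error(e as i32)),
        Ok(_) => Ok(()),
    }
}
```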
src/env/mod.rs

```diff
@@ -17,8 +17,29 @@ pub trait FileSystem: Send + Sync {
     type Writer: Seek + Write + Send + WriteExt;
 
     fn create<P: AsRef<Path>>(&self, path: P) -> Result<Self::Handle>;
 
     fn open<P: AsRef<Path>>(&self, path: P) -> Result<Self::Handle>;
 
+    fn delete<P: AsRef<Path>>(&self, path: P) -> Result<()>;
+
+    /// Deletes user implemented metadata associated with `path`.
+    ///
+    /// In older versions of Raft Engine, physical files are deleted without
+    /// going through the user implemented cleanup procedure. This method is
+    /// used to detect and clean up user metadata that is no longer mapped to
+    /// a physical file.
+    fn delete_metadata<P: AsRef<Path>>(&self, _path: P) -> Result<()> {
+        Ok(())
+    }
+
+    /// Returns whether there is any user metadata associated with the given
+    /// `path`.
+    fn exists_metadata<P: AsRef<Path>>(&self, _path: P) -> bool {
+        false
+    }
+
     fn new_reader(&self, handle: Arc<Self::Handle>) -> Result<Self::Reader>;
 
     fn new_writer(&self, handle: Arc<Self::Handle>) -> Result<Self::Writer>;
 }
```
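To make the new hooks concrete: a hypothetical implementation that keeps a sidecar `<file>.meta` next to every log file. Only the trait and its signatures come from the diff; `SidecarFileSystem`, the sidecar naming scheme, and the assumption that `raft_engine::env` re-exports these items are all illustrative (the `DeleteMonitoredFileSystem` test fixture in the src/engine.rs hunk above plays the same role in memory):

```rust
use std::io::Result;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use raft_engine::env::{DefaultFileSystem, FileSystem};

/// Hypothetical: tracks a "<file>.meta" sidecar for every log file.
struct SidecarFileSystem(DefaultFileSystem);

fn meta_path(path: &Path) -> PathBuf {
    PathBuf::from(format!("{}.meta", path.display()))
}

impl FileSystem for SidecarFileSystem {
    type Handle = <DefaultFileSystem as FileSystem>::Handle;
    type Reader = <DefaultFileSystem as FileSystem>::Reader;
    type Writer = <DefaultFileSystem as FileSystem>::Writer;

    fn create<P: AsRef<Path>>(&self, path: P) -> Result<Self::Handle> {
        // Record engine-external state whenever a log file is born.
        std::fs::write(meta_path(path.as_ref()), b"v1")?;
        self.0.create(path)
    }

    fn open<P: AsRef<Path>>(&self, path: P) -> Result<Self::Handle> {
        self.0.open(path)
    }

    fn delete<P: AsRef<Path>>(&self, path: P) -> Result<()> {
        self.0.delete(&path)?;
        self.delete_metadata(path)
    }

    // The engine probes this while opening, to locate sidecars whose log
    // files were already purged by an older version (see the pipe_builder
    // hunk below).
    fn exists_metadata<P: AsRef<Path>>(&self, path: P) -> bool {
        meta_path(path.as_ref()).exists()
    }

    fn delete_metadata<P: AsRef<Path>>(&self, path: P) -> Result<()> {
        match std::fs::remove_file(meta_path(path.as_ref())) {
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
            other => other,
        }
    }

    fn new_reader(&self, handle: Arc<Self::Handle>) -> Result<Self::Reader> {
        self.0.new_reader(handle)
    }

    fn new_writer(&self, handle: Arc<Self::Handle>) -> Result<Self::Writer> {
        self.0.new_writer(handle)
    }
}
```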
src/env/obfuscated.rs

```diff
@@ -2,6 +2,7 @@
 
 use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write};
 use std::path::Path;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 
 use crate::env::{DefaultFileSystem, FileSystem, WriteExt};
@@ -65,11 +66,26 @@ impl WriteExt for ObfuscatedWriter {
     }
 }
 
-pub struct ObfuscatedFileSystem(DefaultFileSystem);
+/// [`ObfuscatedFileSystem`] is a special implementation of [`FileSystem`],
+/// used to construct and simulate an abnormal file system for [`Read`] and
+/// [`Write`].
+pub struct ObfuscatedFileSystem {
+    inner: DefaultFileSystem,
+    files: AtomicUsize,
+}
 
 impl Default for ObfuscatedFileSystem {
     fn default() -> Self {
-        ObfuscatedFileSystem(DefaultFileSystem)
+        ObfuscatedFileSystem {
+            inner: DefaultFileSystem,
+            files: AtomicUsize::new(0),
+        }
     }
 }
+
+impl ObfuscatedFileSystem {
+    pub fn file_count(&self) -> usize {
+        self.files.load(Ordering::Relaxed)
+    }
+}
@@ -79,18 +95,30 @@ impl FileSystem for ObfuscatedFileSystem {
     type Writer = ObfuscatedWriter;
 
     fn create<P: AsRef<Path>>(&self, path: P) -> IoResult<Self::Handle> {
-        self.0.create(path)
+        let r = self.inner.create(path);
+        if r.is_ok() {
+            self.files.fetch_add(1, Ordering::Relaxed);
+        }
+        r
     }
 
     fn open<P: AsRef<Path>>(&self, path: P) -> IoResult<Self::Handle> {
-        self.0.open(path)
+        self.inner.open(path)
     }
 
-    fn new_reader(&self, inner: Arc<Self::Handle>) -> IoResult<Self::Reader> {
-        Ok(ObfuscatedReader(self.0.new_reader(inner)?))
+    fn delete<P: AsRef<Path>>(&self, path: P) -> IoResult<()> {
+        let r = self.inner.delete(path);
+        if r.is_ok() {
+            self.files.fetch_sub(1, Ordering::Relaxed);
+        }
+        r
     }
 
-    fn new_writer(&self, inner: Arc<Self::Handle>) -> IoResult<Self::Writer> {
-        Ok(ObfuscatedWriter(self.0.new_writer(inner)?))
+    fn new_reader(&self, handle: Arc<Self::Handle>) -> IoResult<Self::Reader> {
+        Ok(ObfuscatedReader(self.inner.new_reader(handle)?))
+    }
+
+    fn new_writer(&self, handle: Arc<Self::Handle>) -> IoResult<Self::Writer> {
+        Ok(ObfuscatedWriter(self.inner.new_writer(handle)?))
     }
 }
```
src/errors.rs

```diff
@@ -13,7 +13,7 @@ pub enum Error {
     InvalidArgument(String),
     #[error("Corruption: {0}")]
     Corruption(String),
-    #[error("IO Error: {0}")]
+    #[error("IO Error: {0:?}")]
     Io(#[from] IoError),
     #[error("Codec Error: {0}")]
     Codec(#[from] CodecError),
```
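Presumably the point of switching to `{0:?}`: `Display` for `std::io::Error` prints only the message, while `Debug` also carries the `ErrorKind`, which matters when diagnosing injected or custom errors. A standalone illustration:

```rust
use std::io::{Error, ErrorKind};

fn main() {
    let e = Error::new(ErrorKind::Other, "fp");
    println!("{}", e);   // prints: fp
    println!("{:?}", e); // prints: Custom { kind: Other, error: "fp" }
}
```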
src/file_pipe_log/log_file.rs

```diff
@@ -5,12 +5,14 @@
 use std::io::{Read, Seek, SeekFrom, Write};
 use std::sync::Arc;
 
+use log::warn;
+
+use crate::env::{FileSystem, Handle, WriteExt};
 use crate::metrics::*;
 use crate::pipe_log::FileBlockHandle;
 use crate::{Error, Result};
 
 use super::format::LogFileHeader;
-use crate::env::{FileSystem, Handle, WriteExt};
 
 /// Maximum number of bytes to allocate ahead.
 const FILE_ALLOCATE_SIZE: usize = 2 * 1024 * 1024;
@@ -82,10 +84,19 @@ impl<F: FileSystem> LogFileWriter<F> {
                     target_size_hint.saturating_sub(self.capacity),
                 ),
             );
-            self.writer.allocate(self.capacity, alloc)?;
+            if let Err(e) = self.writer.allocate(self.capacity, alloc) {
+                warn!("log file allocation failed: {}", e);
+            }
             self.capacity += alloc;
         }
-        self.writer.write_all(buf)?;
+        self.writer.write_all(buf).map_err(|e| {
+            self.writer
+                .seek(SeekFrom::Start(self.written as u64))
+                .unwrap_or_else(|e| {
+                    panic!("failed to reseek after write failure: {}", e);
+                });
+            e
+        })?;
        self.written = new_written;
         Ok(())
     }
```
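This is the "reseek after fractured write" fix from the changelog. `write_all` may commit a prefix of `buf` before failing; seeking back to the pre-write offset guarantees the next attempt starts on a clean record boundary, and if even the rewind fails the file state is unknown, so the writer panics. A self-contained sketch of the same idea (the helper name is mine, and `Cursor` stands in for the log file):

```rust
use std::io::{Cursor, Seek, SeekFrom, Write};

fn append_record<W: Write + Seek>(w: &mut W, written: u64, buf: &[u8]) -> std::io::Result<u64> {
    w.write_all(buf).map_err(|e| {
        // Roll back a partial write; mirroring the diff, a failed rewind
        // leaves the file in an unknown state and crashing is the only
        // safe option.
        w.seek(SeekFrom::Start(written))
            .unwrap_or_else(|e| panic!("failed to reseek after write failure: {}", e));
        e
    })?;
    Ok(written + buf.len() as u64)
}

fn main() -> std::io::Result<()> {
    let mut file = Cursor::new(Vec::new());
    let mut written = 0;
    written = append_record(&mut file, written, b"record-1")?;
    written = append_record(&mut file, written, b"record-2")?;
    assert_eq!(written, 16);
    Ok(())
}
```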
src/file_pipe_log/pipe.rs

```diff
@@ -1,13 +1,13 @@
 // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0.
 
 use std::collections::VecDeque;
-use std::fs::{self, File};
+use std::fs::File;
 use std::path::PathBuf;
 use std::sync::Arc;
 
 use crossbeam::utils::CachePadded;
 use fail::fail_point;
-use log::{error, warn};
+use log::error;
 use parking_lot::{Mutex, MutexGuard, RwLock};
 
 use crate::config::Config;
@@ -269,17 +269,15 @@ impl<F: FileSystem> SinglePipe<F> {
             let path = file_id.build_file_path(&self.dir);
             #[cfg(feature = "failpoints")]
             {
-                let remove_failure = || {
-                    fail::fail_point!("file_pipe_log::remove_file_failure", |_| true);
+                let remove_skipped = || {
+                    fail::fail_point!("file_pipe_log::remove_file_skipped", |_| true);
                     false
                 };
-                if remove_failure() {
+                if remove_skipped() {
                     continue;
                 }
             }
-            if let Err(e) = fs::remove_file(&path) {
-                warn!("Remove purged log file {:?} failed: {}", path, e);
-            }
+            self.file_system.delete(&path)?;
         }
         Ok(purged)
     }
```
src/file_pipe_log/pipe_builder.rs

```diff
@@ -145,6 +145,36 @@ impl<F: FileSystem> DualPipesBuilder<F> {
             ),
         ] {
             if max_id > 0 {
+                // Try to cleanup stale metadata left by the previous version.
+                let max_sample = 100;
+                // Find the first obsolete metadata.
+                let mut delete_start = None;
+                for i in 0..max_sample {
+                    let seq = i * min_id / max_sample;
+                    let file_id = FileId { queue, seq };
+                    let path = file_id.build_file_path(dir);
+                    if self.file_system.exists_metadata(&path) {
+                        delete_start = Some(i.saturating_sub(1) * min_id / max_sample + 1);
+                        break;
+                    }
+                }
+                // Delete metadata starting from the oldest. Abort on error.
+                if let Some(start) = delete_start {
+                    let mut success = 0;
+                    for seq in start..min_id {
+                        let file_id = FileId { queue, seq };
+                        let path = file_id.build_file_path(dir);
+                        if let Err(e) = self.file_system.delete_metadata(&path) {
+                            error!("failed to delete metadata of {}: {}.", path.display(), e);
+                            break;
+                        }
+                        success += 1;
+                    }
+                    warn!(
+                        "deleted {} stale files of {:?} in range [{}, {}).",
+                        success, queue, start, min_id,
+                    );
+                }
                 for seq in min_id..=max_id {
                     let file_id = FileId { queue, seq };
                     let path = file_id.build_file_path(dir);
```
|
|||
#![cfg_attr(feature = "swap", feature(nonnull_slice_from_raw_parts))]
|
||||
#![cfg_attr(feature = "swap", feature(slice_ptr_len))]
|
||||
#![cfg_attr(feature = "swap", feature(alloc_layout_extra))]
|
||||
// For testing only.
|
||||
#![cfg_attr(feature = "swap", feature(alloc_error_hook))]
|
||||
#![cfg_attr(feature = "swap", feature(cfg_sanitize))]
|
||||
#![cfg_attr(all(test, feature = "swap"), feature(alloc_error_hook))]
|
||||
#![cfg_attr(all(test, feature = "swap"), feature(cfg_sanitize))]
|
||||
|
||||
#[macro_use]
|
||||
extern crate lazy_static;
|
||||
|
|
src/memtable.rs (106 changed lines)

```diff
@@ -343,28 +343,25 @@ impl<A: AllocatorTrait> MemTable<A> {
         }
     }
 
-    /// Appends some entries from rewrite queue. Assumes this table has no
-    /// append data.
+    /// Appends some entries from append queue. Assumes this table has no
+    /// rewrite data.
     ///
     /// This method is only used for recovery.
-    pub fn append_rewrite(&mut self, entry_indexes: Vec<EntryIndex>) {
+    pub fn replay_append(&mut self, entry_indexes: Vec<EntryIndex>) {
         let len = entry_indexes.len();
         if len > 0 {
-            debug_assert_eq!(self.rewrite_count, self.entry_indexes.len());
+            debug_assert_eq!(self.rewrite_count, 0);
             self.prepare_append(
                 entry_indexes[0].index,
-                // Rewrite -> Compact Append -> Rewrite.
-                true, /* allow_hole */
-                // Refer to case in `merge_append_table`. They can be adapted
-                // to attack this path via a global rewrite without deleting
-                // obsolete rewrite files.
+                false, /* allow_hole */
+                // Refer to case in `merge_newer_neighbor`.
                 true, /* allow_overwrite */
             );
-            self.global_stats.add(LogQueue::Rewrite, len);
+            self.global_stats.add(LogQueue::Append, len);
             for ei in &entry_indexes {
+                debug_assert_eq!(ei.entries.unwrap().id.queue, LogQueue::Append);
                 self.entry_indexes.push_back(ei.into());
             }
-            self.rewrite_count = self.entry_indexes.len();
         }
     }
 
@@ -442,6 +439,31 @@ impl<A: AllocatorTrait> MemTable<A> {
         self.rewrite_count = pos + rewrite_len;
     }
 
+    /// Appends some entries from rewrite queue. Assumes this table has no
+    /// append data.
+    ///
+    /// This method is only used for recovery.
+    pub fn replay_rewrite(&mut self, entry_indexes: Vec<EntryIndex>) {
+        let len = entry_indexes.len();
+        if len > 0 {
+            debug_assert_eq!(self.rewrite_count, self.entry_indexes.len());
+            self.prepare_append(
+                entry_indexes[0].index,
+                // Rewrite -> Compact Append -> Rewrite.
+                true, /* allow_hole */
+                // Refer to case in `merge_append_table`. They can be adapted
+                // to attack this path via a global rewrite without deleting
+                // obsolete rewrite files.
+                true, /* allow_overwrite */
+            );
+            self.global_stats.add(LogQueue::Rewrite, len);
+            for ei in &entry_indexes {
+                self.entry_indexes.push_back(ei.into());
+            }
+            self.rewrite_count = self.entry_indexes.len();
+        }
+    }
+
     /// Removes all entries with index smaller than `index`. Returns the number
     /// of deleted entries.
     pub fn compact_to(&mut self, index: u64) -> u64 {
@@ -984,6 +1006,38 @@ impl<A: AllocatorTrait> MemTableAccessor<A> {
         }
     }
 
+    /// Applies changes from log items that are replayed from an append queue.
+    /// Assumes it hasn't applied any rewrite data.
+    ///
+    /// This method is only used for recovery.
+    pub fn replay_append_writes(&self, log_items: LogItemDrain) {
+        for item in log_items {
+            let raft = item.raft_group_id;
+            let memtable = self.get_or_insert(raft);
+            match item.content {
+                LogItemContent::EntryIndexes(entries_to_add) => {
+                    memtable.write().replay_append(entries_to_add.0);
+                }
+                LogItemContent::Command(Command::Clean) => {
+                    self.remove(raft, true /* record_tombstone */);
+                }
+                LogItemContent::Command(Command::Compact { index }) => {
+                    memtable.write().compact_to(index);
+                }
+                LogItemContent::Kv(kv) => match kv.op_type {
+                    OpType::Put => {
+                        let value = kv.value.unwrap();
+                        memtable.write().put(kv.key, value, kv.file_id.unwrap());
+                    }
+                    OpType::Del => {
+                        let key = kv.key;
+                        memtable.write().delete(key.as_slice());
+                    }
+                },
+            }
+        }
+    }
+
     /// Applies changes from log items that have been written to rewrite queue.
     pub fn apply_rewrite_writes(
         &self,
@@ -1015,15 +1069,16 @@ impl<A: AllocatorTrait> MemTableAccessor<A> {
     /// Assumes it hasn't applied any append data.
     ///
     /// This method is only used for recovery.
-    pub fn apply_replayed_rewrite_writes(&self, log_items: LogItemDrain) {
+    pub fn replay_rewrite_writes(&self, log_items: LogItemDrain) {
         for item in log_items {
             let raft = item.raft_group_id;
             let memtable = self.get_or_insert(raft);
             match item.content {
                 LogItemContent::EntryIndexes(entries_to_add) => {
-                    memtable.write().append_rewrite(entries_to_add.0);
+                    memtable.write().replay_rewrite(entries_to_add.0);
                 }
                 LogItemContent::Command(Command::Clean) => {
+                    // Only append tombstone needs to be recorded.
                     self.remove(raft, false /* record_tombstone */);
                 }
                 LogItemContent::Command(Command::Compact { index }) => {
@@ -1109,10 +1164,8 @@ impl<A: AllocatorTrait> ReplayMachine for MemTableRecoverContext<A> {
             }
         }
         match file_id.queue {
-            LogQueue::Append => self.memtables.apply_append_writes(item_batch.drain()),
-            LogQueue::Rewrite => self
-                .memtables
-                .apply_replayed_rewrite_writes(item_batch.drain()),
+            LogQueue::Append => self.memtables.replay_append_writes(item_batch.drain()),
+            LogQueue::Rewrite => self.memtables.replay_rewrite_writes(item_batch.drain()),
         }
         Ok(())
     }
@@ -1120,10 +1173,8 @@ impl<A: AllocatorTrait> ReplayMachine for MemTableRecoverContext<A> {
     fn merge(&mut self, mut rhs: Self, queue: LogQueue) -> Result<()> {
         self.log_batch.merge(&mut rhs.log_batch.clone());
         match queue {
-            LogQueue::Append => self.memtables.apply_append_writes(rhs.log_batch.drain()),
-            LogQueue::Rewrite => self
-                .memtables
-                .apply_replayed_rewrite_writes(rhs.log_batch.drain()),
+            LogQueue::Append => self.memtables.replay_append_writes(rhs.log_batch.drain()),
+            LogQueue::Rewrite => self.memtables.replay_rewrite_writes(rhs.log_batch.drain()),
         }
         self.memtables.merge_newer_neighbor(rhs.memtables);
         Ok(())
@@ -1924,7 +1975,7 @@ mod tests {
                 memtable.compact_to(7);
             }
             Some(LogQueue::Rewrite) => {
-                memtable.append_rewrite(generate_entry_indexes(
+                memtable.replay_rewrite(generate_entry_indexes(
                     0,
                     7,
                     FileId::new(LogQueue::Rewrite, 1),
@@ -1966,7 +2017,7 @@ mod tests {
                 memtable.compact_to(10);
             }
             Some(LogQueue::Rewrite) => {
-                memtable.append_rewrite(generate_entry_indexes(
+                memtable.replay_rewrite(generate_entry_indexes(
                     0,
                     7,
                     FileId::new(LogQueue::Rewrite, 1),
@@ -2017,7 +2068,7 @@ mod tests {
                 memtable.merge_newer_neighbor(&mut m1);
             }
             Some(LogQueue::Rewrite) => {
-                memtable.append_rewrite(generate_entry_indexes(
+                memtable.replay_rewrite(generate_entry_indexes(
                     0,
                     10,
                     FileId::new(LogQueue::Rewrite, 1),
@@ -2099,6 +2150,13 @@ mod tests {
         batches[1].add_command(last_rid, Command::Compact { index: 5 });
         batches[2].add_entry_indexes(last_rid, generate_entry_indexes(11, 21, files[2]));
 
+        // entries [1, 10] => entries [11, 20][5, 10] => compact 8
+        last_rid += 1;
+        batches[0].add_entry_indexes(last_rid, generate_entry_indexes(1, 11, files[0]));
+        batches[1].add_entry_indexes(last_rid, generate_entry_indexes(11, 21, files[1]));
+        batches[1].add_entry_indexes(last_rid, generate_entry_indexes(5, 11, files[1]));
+        batches[2].add_command(last_rid, Command::Compact { index: 8 });
+
         for b in batches.iter_mut() {
             b.finish_write(FileBlockHandle::dummy(LogQueue::Append));
         }
```
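The new test scenario is worth unpacking. On recovery, entries replayed from the append queue may legitimately overwrite a previous tail (a Raft peer re-proposes indexes 5..=10 after having written 11..=20), but must never leave a hole; hence `allow_hole = false, allow_overwrite = true` in `replay_append` above. A toy trace of that scenario, modeling only the index bookkeeping (the real memtable stores `EntryIndex` values and handles tombstones):

```rust
fn replay_append(table: &mut Vec<u64>, first: u64, last_inclusive: u64) {
    if let Some(&head) = table.first() {
        let tail = *table.last().unwrap();
        assert!(first <= tail + 1, "holes are not allowed when replaying appends");
        // Overwrite: drop the suffix that the new batch supersedes.
        table.truncate(first.saturating_sub(head) as usize);
    }
    table.extend(first..=last_inclusive);
}

fn compact_to(table: &mut Vec<u64>, index: u64) {
    table.retain(|&i| i >= index);
}

fn main() {
    let mut table = Vec::new();
    replay_append(&mut table, 1, 10);
    replay_append(&mut table, 11, 20);
    replay_append(&mut table, 5, 10); // overwrite after a leader change
    compact_to(&mut table, 8);
    assert_eq!(table, vec![8, 9, 10]);
}
```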
stress/Cargo.toml

```diff
@@ -1,6 +1,6 @@
 [package]
 name = "stress"
-version = "0.2.0"
+version = "0.2.2"
 authors = ["The TiKV Authors"]
 edition = "2018"
 
```
tests/failpoints/test_engine.rs

```diff
@@ -373,7 +373,7 @@ fn test_incomplete_purge() {
     let engine = Engine::open(cfg.clone()).unwrap();
 
     {
-        let _f = FailGuard::new("file_pipe_log::remove_file_failure", "return");
+        let _f = FailGuard::new("file_pipe_log::remove_file_skipped", "return");
         append(&engine, rid, 0, 20, Some(&data));
         let append_first = engine.file_span(LogQueue::Append).0;
         engine.compact_to(rid, 18);
```
tests/failpoints/test_io_error.rs

```diff
@@ -287,6 +287,68 @@ fn test_concurrent_write_error() {
     );
 }
 
+#[test]
+fn test_non_atomic_write_error() {
+    let dir = tempfile::Builder::new()
+        .prefix("test_non_atomic_write_error")
+        .tempdir()
+        .unwrap();
+    let cfg = Config {
+        dir: dir.path().to_str().unwrap().to_owned(),
+        bytes_per_sync: ReadableSize::kb(1024),
+        target_file_size: ReadableSize::kb(1024),
+        ..Default::default()
+    };
+    let fs = Arc::new(ObfuscatedFileSystem::default());
+    let entry = vec![b'x'; 1024];
+    let rid = 1;
+
+    {
+        // Write partially succeeds. We can reopen.
+        let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
+        let _f1 = FailGuard::new("log_fd::write::err", "return");
+        engine
+            .write(&mut generate_batch(rid, 0, 1, Some(&entry)), true)
+            .unwrap_err();
+    }
+    {
+        let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
+        assert_eq!(engine.first_index(rid), None);
+    }
+    {
+        // Write partially succeeds. We can overwrite.
+        let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
+        let _f1 = FailGuard::new("log_fd::write::err", "1*off->1*return->off");
+        engine
+            .write(&mut generate_batch(rid, 0, 1, Some(&entry)), true)
+            .unwrap_err();
+        engine
+            .write(&mut generate_batch(rid, 5, 6, Some(&entry)), true)
+            .unwrap();
+        assert_eq!(engine.first_index(rid).unwrap(), 5);
+    }
+    {
+        let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
+        assert_eq!(engine.first_index(rid).unwrap(), 5);
+    }
+    {
+        // Write partially succeeds and can't be reverted. We panic.
+        let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
+        let _f1 = FailGuard::new("log_fd::write::err", "return");
+        let _f2 = FailGuard::new("log_file::seek::err", "return");
+        assert!(catch_unwind_silent(|| {
+            engine
+                .write(&mut generate_batch(rid, 6, 7, Some(&entry)), true)
+                .unwrap_err();
+        })
+        .is_err());
+    }
+    {
+        let engine = Engine::open_with_file_system(cfg, fs).unwrap();
+        assert_eq!(engine.last_index(rid), Some(5));
+    }
+}
+
 #[cfg(feature = "scripting")]
 #[test]
 fn test_error_during_repair() {
@@ -353,3 +415,28 @@ fn test_swappy_page_create_error() {
     vec.resize(1024, 0);
     assert_eq!(allocator.memory_usage(), 0);
 }
+
+#[test]
+fn test_file_allocate_error() {
+    let dir = tempfile::Builder::new()
+        .prefix("test_file_allocate_error")
+        .tempdir()
+        .unwrap();
+    let cfg = Config {
+        dir: dir.path().to_str().unwrap().to_owned(),
+        target_file_size: ReadableSize::mb(100),
+        ..Default::default()
+    };
+    let fs = Arc::new(ObfuscatedFileSystem::default());
+    let entry = vec![b'x'; 1024];
+    {
+        let _f = FailGuard::new("log_fd::allocate::err", "return");
+        let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
+        engine
+            .write(&mut generate_batch(1, 1, 5, Some(&entry)), true)
+            .unwrap();
+    }
+    let engine = Engine::open_with_file_system(cfg, fs).unwrap();
+    assert_eq!(engine.first_index(1).unwrap(), 1);
+    assert_eq!(engine.last_index(1).unwrap(), 4);
+}
```
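The failpoint expressions driving these cases come from the `fail` crate; `FailGuard` appears to be an RAII helper in the test suite that configures a failpoint and clears it on drop. The action string `"1*off->1*return->off"` reads: ignore the first hit, fail the second, then stay off — which is what makes the first write only partially succeed. A minimal demonstration against the `fail` crate directly (requires its `failpoints` feature; the failpoint name is made up):

```rust
fn main() {
    fail::cfg("demo", "1*off->1*return->off").unwrap();
    let attempt = || -> Result<(), ()> {
        // With the "return" action active, the closure's value is returned here.
        fail::fail_point!("demo", |_| Err(()));
        Ok(())
    };
    assert!(attempt().is_ok());  // first hit: "off", no effect
    assert!(attempt().is_err()); // second hit: "return" fires once
    assert!(attempt().is_ok());  // subsequent hits: "off" again
    fail::remove("demo");
}
```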