Merge pull request #271 from nrc/buffer-insert

Buffer improvements
This commit is contained in:
Nick Cameron 2021-04-28 15:32:12 +12:00 committed by GitHub
commit 0535be6a49
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 108 additions and 65 deletions

View File

@ -48,8 +48,8 @@ script:
# For now we only run full integration tests on Linux. Here's why:
# * Docker on OS X is not supported by Travis.
# * Docker on Windows does not seem to have the correct binary at "/c/Program Files/Docker/Docker/DockerCli.exe" to switch it to Linux containers.
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker run -d --net=host --name pd --rm pingcap/pd:nightly --name "pd" --data-dir "pd" --client-urls "http://127.0.0.1:2379" --advertise-client-urls "http://127.0.0.1:2379"; fi
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker run -d --net=host --name kv --rm --ulimit nofile=90000:90000 pingcap/tikv:nightly --pd-endpoints "127.0.0.1:2379" --addr "127.0.0.1:2378" --data-dir "kv"; fi
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker run -d --net=host --name pd --rm pingcap/pd --name "pd" --data-dir "pd" --client-urls "http://127.0.0.1:2379" --advertise-client-urls "http://127.0.0.1:2379"; fi
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker run -d --net=host --name kv --rm --ulimit nofile=90000:90000 pingcap/tikv --pd-endpoints "127.0.0.1:2379" --addr "127.0.0.1:2378" --data-dir "kv"; fi
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker ps; fi
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker logs pd; fi
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker logs kv; fi

View File

@ -8,7 +8,7 @@ This crate provides an easy-to-use client for [TiKV](https://github.com/tikv/tik
This crate lets you connect to a TiKV cluster and use either a transactional or raw (simple get/put style without transactional consistency guarantees) API to access and update your data.
This is an open source (Apache 2) project maintained by the TiKV Authors. We welcome community contributions, see below for more info.
The TiKV Rust client is an open source (Apache 2) project maintained by the TiKV Authors. We welcome contributions; see below for more info.
## Getting started
@ -19,7 +19,7 @@ The TiKV client is a Rust library (crate). To use this crate in your project, ad
tikv-client = 0.1
```
Note, that you will need `tikv-client = { git = "https://github.com/tikv/client-rust.git" }` until we publish the crate (should be any day now).
Note: you will need `tikv-client = { git = "https://github.com/tikv/client-rust.git" }` until we publish the crate (which should be any day now).
The minimum supported version of Rust is 1.40. The minimum supported version of TiKV is 5.0.

View File

@ -24,19 +24,19 @@ impl Buffer {
}
/// Get the primary key of the buffer.
pub async fn get_primary_key(&self) -> Option<Key> {
pub fn get_primary_key(&self) -> Option<Key> {
self.primary_key.clone()
}
/// Set the primary key if it is not set
pub async fn primary_key_or(&mut self, key: &Key) {
pub fn primary_key_or(&mut self, key: &Key) {
self.primary_key.get_or_insert_with(|| key.clone());
}
/// Get a value from the buffer.
/// If the returned value is None, it means the key doesn't exist in buffer yet.
pub async fn get(&self, key: &Key) -> Option<Value> {
match self.get_from_mutations(key).await {
pub fn get(&self, key: &Key) -> Option<Value> {
match self.get_from_mutations(key) {
MutationValue::Determined(value) => value,
MutationValue::Undetermined => None,
}
@ -49,7 +49,7 @@ impl Buffer {
F: FnOnce(Key) -> Fut,
Fut: Future<Output = Result<Option<Value>>>,
{
match self.get_from_mutations(&key).await {
match self.get_from_mutations(&key) {
MutationValue::Determined(value) => Ok(value),
MutationValue::Undetermined => {
let value = f(key.clone()).await?;
@ -161,7 +161,7 @@ impl Buffer {
}
/// Lock the given key if necessary.
pub async fn lock(&mut self, key: Key) {
pub fn lock(&mut self, key: Key) {
self.primary_key.get_or_insert_with(|| key.clone());
let value = self
.entry_map
@ -174,13 +174,22 @@ impl Buffer {
}
}
/// Insert a value into the buffer (does not write through).
pub async fn put(&mut self, key: Key, value: Value) {
self.insert_entry(key, BufferEntry::Put(value));
/// Put a value into the buffer (does not write through).
pub fn put(&mut self, key: Key, value: Value) {
let mut entry = self.entry_map.entry(key.clone());
match entry {
Entry::Occupied(ref mut o)
if matches!(o.get(), BufferEntry::Insert(_))
|| matches!(o.get(), BufferEntry::CheckNotExist) =>
{
o.insert(BufferEntry::Insert(value));
}
_ => self.insert_entry(key, BufferEntry::Put(value)),
}
}
/// Mark a value as Insert mutation into the buffer (does not write through).
pub async fn insert(&mut self, key: Key, value: Value) {
pub fn insert(&mut self, key: Key, value: Value) {
let mut entry = self.entry_map.entry(key.clone());
match entry {
Entry::Occupied(ref mut o) if matches!(o.get(), BufferEntry::Del) => {
@ -191,13 +200,15 @@ impl Buffer {
}
/// Mark a value as deleted.
pub async fn delete(&mut self, key: Key) {
pub fn delete(&mut self, key: Key) {
let is_pessimistic = self.is_pessimistic;
let mut entry = self.entry_map.entry(key.clone());
match entry {
Entry::Occupied(ref mut o)
if matches!(o.get(), BufferEntry::Insert(_)) && !is_pessimistic =>
if !is_pessimistic
&& (matches!(o.get(), BufferEntry::Insert(_))
|| matches!(o.get(), BufferEntry::CheckNotExist)) =>
{
o.insert(BufferEntry::CheckNotExist);
}
@ -206,14 +217,14 @@ impl Buffer {
}
/// Converts the buffered mutations to the proto buffer version
pub async fn to_proto_mutations(&self) -> Vec<kvrpcpb::Mutation> {
pub fn to_proto_mutations(&self) -> Vec<kvrpcpb::Mutation> {
self.entry_map
.iter()
.filter_map(|(key, mutation)| mutation.to_proto_with_key(key))
.collect()
}
async fn get_from_mutations(&self, key: &Key) -> MutationValue {
fn get_from_mutations(&self, key: &Key) -> MutationValue {
self.entry_map
.get(&key)
.map(BufferEntry::get_value)
@ -352,26 +363,26 @@ impl MutationValue {
mod tests {
use super::*;
use futures::{executor::block_on, future::ready};
use tikv_client_common::internal_err;
#[tokio::test]
#[allow(unreachable_code)]
async fn set_and_get_from_buffer() {
#[test]
fn set_and_get_from_buffer() {
let mut buffer = Buffer::new(false);
buffer
.put(b"key1".to_vec().into(), b"value1".to_vec())
.await;
buffer
.put(b"key2".to_vec().into(), b"value2".to_vec())
.await;
buffer.put(b"key1".to_vec().into(), b"value1".to_vec());
buffer.put(b"key2".to_vec().into(), b"value2".to_vec());
assert_eq!(
block_on(buffer.get_or_else(b"key1".to_vec().into(), move |_| ready(panic!())))
.unwrap()
.unwrap(),
block_on(
buffer.get_or_else(b"key1".to_vec().into(), move |_| ready(Err(internal_err!(
""
))))
)
.unwrap()
.unwrap(),
b"value1".to_vec()
);
buffer.delete(b"key2".to_vec().into()).await;
buffer.put(b"key1".to_vec().into(), b"value".to_vec()).await;
buffer.delete(b"key2".to_vec().into());
buffer.put(b"key1".to_vec().into(), b"value".to_vec());
assert_eq!(
block_on(buffer.batch_get_or_else(
vec![b"key2".to_vec().into(), b"key1".to_vec().into()].into_iter(),
@ -386,27 +397,24 @@ mod tests {
);
}
#[tokio::test]
#[allow(unreachable_code)]
async fn insert_and_get_from_buffer() {
#[test]
fn insert_and_get_from_buffer() {
let mut buffer = Buffer::new(false);
buffer
.insert(b"key1".to_vec().into(), b"value1".to_vec())
.await;
buffer
.insert(b"key2".to_vec().into(), b"value2".to_vec())
.await;
buffer.insert(b"key1".to_vec().into(), b"value1".to_vec());
buffer.insert(b"key2".to_vec().into(), b"value2".to_vec());
assert_eq!(
block_on(buffer.get_or_else(b"key1".to_vec().into(), move |_| ready(panic!())))
.unwrap()
.unwrap(),
block_on(
buffer.get_or_else(b"key1".to_vec().into(), move |_| ready(Err(internal_err!(
""
))))
)
.unwrap()
.unwrap(),
b"value1".to_vec()
);
buffer.delete(b"key2".to_vec().into()).await;
buffer
.insert(b"key1".to_vec().into(), b"value".to_vec())
.await;
buffer.delete(b"key2".to_vec().into());
buffer.insert(b"key1".to_vec().into(), b"value".to_vec());
assert_eq!(
block_on(buffer.batch_get_or_else(
vec![b"key2".to_vec().into(), b"key1".to_vec().into()].into_iter(),
@ -419,7 +427,6 @@ mod tests {
}
#[test]
#[allow(unreachable_code)]
fn repeat_reads_are_cached() {
let k1: Key = b"key1".to_vec().into();
let k1_ = k1.clone();
@ -433,7 +440,7 @@ mod tests {
let mut buffer = Buffer::new(false);
let r1 = block_on(buffer.get_or_else(k1.clone(), move |_| ready(Ok(Some(v1_)))));
let r2 = block_on(buffer.get_or_else(k1.clone(), move |_| ready(panic!())));
let r2 = block_on(buffer.get_or_else(k1.clone(), move |_| ready(Err(internal_err!("")))));
assert_eq!(r1.unwrap().unwrap(), v1);
assert_eq!(r2.unwrap().unwrap(), v1);
@ -443,7 +450,7 @@ mod tests {
ready(Ok(vec![(k1_, v1__).into(), (k2_, v2_).into()]))
}),
);
let r2 = block_on(buffer.get_or_else(k2.clone(), move |_| ready(panic!())));
let r2 = block_on(buffer.get_or_else(k2.clone(), move |_| ready(Err(internal_err!("")))));
let r3 = block_on(
buffer.batch_get_or_else(vec![k1.clone(), k2.clone()].into_iter(), move |_| {
ready(Ok(vec![]))
@ -462,4 +469,42 @@ mod tests {
vec![KvPair(k1, v1), KvPair(k2, v2)]
);
}
// Check that multiple writes to the same key combine in the correct way.
//
// Each step applies a short sequence of buffer operations to a single key and
// then inspects `entry_map` directly to confirm which `BufferEntry` variant the
// key ended up with. The steps on "key1" build on each other, so their order
// matters.
#[test]
fn state_machine() {
// NOTE(review): presumably `false` selects the non-pessimistic mode, which the
// Insert + Delete collapse below appears to rely on; confirm against `Buffer::new`.
let mut buffer = Buffer::new(false);
// Asserts that the entry stored for `$key` in `buffer.entry_map` matches the
// pattern `$p`. A missing key yields `None`, which does not match, so the
// assertion fails in that case as well.
macro_rules! assert_entry {
($key: ident, $p: pat) => {
assert!(matches!(buffer.entry_map.get(&$key), Some(&$p),))
};
}
// Insert + Delete = CheckNotExist
let key: Key = b"key1".to_vec().into();
buffer.insert(key.clone(), b"value1".to_vec());
buffer.delete(key.clone());
assert_entry!(key, BufferEntry::CheckNotExist);
// CheckNotExist + Delete = CheckNotExist
buffer.delete(key.clone());
assert_entry!(key, BufferEntry::CheckNotExist);
// CheckNotExist + Put = Insert
buffer.put(key.clone(), b"value2".to_vec());
assert_entry!(key, BufferEntry::Insert(_));
// Insert + Put = Insert (uses a fresh key so the starting state is a plain Insert)
let key: Key = b"key2".to_vec().into();
buffer.insert(key.clone(), b"value1".to_vec());
buffer.put(key.clone(), b"value2".to_vec());
assert_entry!(key, BufferEntry::Insert(_));
// Delete + Insert = Put (uses a fresh key so the starting state is a plain Del)
let key: Key = b"key3".to_vec().into();
buffer.delete(key.clone());
buffer.insert(key.clone(), b"value1".to_vec());
assert_entry!(key, BufferEntry::Put(_));
}
}

View File

@ -398,7 +398,7 @@ impl<PdC: PdClient> Transaction<PdC> {
self.pessimistic_lock(iter::once(key.clone()), false)
.await?;
}
self.buffer.put(key, value.into()).await;
self.buffer.put(key, value.into());
Ok(())
}
@ -424,7 +424,7 @@ impl<PdC: PdClient> Transaction<PdC> {
pub async fn insert(&mut self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
self.check_allow_operation().await?;
let key = key.into();
if self.buffer.get(&key).await.is_some() {
if self.buffer.get(&key).is_some() {
return Err(Error::DuplicateKeyInsertion);
}
if self.is_pessimistic() {
@ -434,7 +434,7 @@ impl<PdC: PdClient> Transaction<PdC> {
)
.await?;
}
self.buffer.insert(key, value.into()).await;
self.buffer.insert(key, value.into());
Ok(())
}
@ -462,7 +462,7 @@ impl<PdC: PdClient> Transaction<PdC> {
self.pessimistic_lock(iter::once(key.clone()), false)
.await?;
}
self.buffer.delete(key).await;
self.buffer.delete(key);
Ok(())
}
@ -497,7 +497,7 @@ impl<PdC: PdClient> Transaction<PdC> {
match self.options.kind {
TransactionKind::Optimistic => {
for key in keys {
self.buffer.lock(key.into()).await;
self.buffer.lock(key.into());
}
}
TransactionKind::Pessimistic(_) => {
@ -535,8 +535,8 @@ impl<PdC: PdClient> Transaction<PdC> {
*status = TransactionStatus::StartedCommit;
}
let primary_key = self.buffer.get_primary_key().await;
let mutations = self.buffer.to_proto_mutations().await;
let primary_key = self.buffer.get_primary_key();
let mutations = self.buffer.to_proto_mutations();
if mutations.is_empty() {
assert!(primary_key.is_none());
return Ok(None);
@ -595,8 +595,8 @@ impl<PdC: PdClient> Transaction<PdC> {
*status = TransactionStatus::StartedRollback;
}
let primary_key = self.buffer.get_primary_key().await;
let mutations = self.buffer.to_proto_mutations().await;
let primary_key = self.buffer.get_primary_key();
let mutations = self.buffer.to_proto_mutations();
let res = Committer::new(
primary_key,
mutations,
@ -619,7 +619,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// Returns the TTL set on the transaction's locks by TiKV.
pub async fn send_heart_beat(&mut self) -> Result<u64> {
self.check_allow_operation().await?;
let primary_key = match self.buffer.get_primary_key().await {
let primary_key = match self.buffer.get_primary_key() {
Some(k) => k,
None => return Err(Error::NoPrimaryKey),
};
@ -695,7 +695,6 @@ impl<PdC: PdClient> Transaction<PdC> {
let primary_lock = self
.buffer
.get_primary_key()
.await
.unwrap_or_else(|| first_key.clone());
let for_update_ts = self.rpc.clone().get_timestamp().await?;
self.options.push_for_update_ts(for_update_ts.clone());
@ -717,12 +716,12 @@ impl<PdC: PdClient> Transaction<PdC> {
let pairs = plan.execute().await;
// primary key will be set here if needed
self.buffer.primary_key_or(&first_key).await;
self.buffer.primary_key_or(&first_key);
self.start_auto_heartbeat().await;
for key in keys {
self.buffer.lock(key.key()).await;
self.buffer.lock(key.key());
}
pairs
@ -755,7 +754,6 @@ impl<PdC: PdClient> Transaction<PdC> {
let primary_key = self
.buffer
.get_primary_key()
.await
.expect("Primary key should exist");
let start_ts = self.timestamp.clone();
let region_backoff = self.options.retry_options.region_backoff.clone();