Fix two bugs with insert and delete

Fixes #234

Signed-off-by: Nick Cameron <nrc@ncameron.org>
This commit is contained in:
Nick Cameron 2021-04-09 04:04:59 +12:00
parent 4870985dcd
commit e8a9175adb
5 changed files with 131 additions and 37 deletions

1
rust-toolchain Normal file
View File

@ -0,0 +1 @@
nightly-2021-03-15

View File

@ -1,17 +1,21 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{BoundRange, Key, KvPair, Result, Value}; use crate::{BoundRange, Key, KvPair, Result, Value};
use derive_new::new;
use std::{ use std::{
collections::{BTreeMap, HashMap}, collections::{btree_map::Entry, BTreeMap, HashMap},
future::Future, future::Future,
}; };
use tikv_client_proto::kvrpcpb; use tikv_client_proto::kvrpcpb;
use tokio::sync::{Mutex, MutexGuard}; use tokio::sync::{Mutex, MutexGuard};
#[derive(Default)] #[derive(new)]
struct InnerBuffer { struct InnerBuffer {
#[new(default)]
primary_key: Option<Key>, primary_key: Option<Key>,
#[new(default)]
entry_map: BTreeMap<Key, BufferEntry>, entry_map: BTreeMap<Key, BufferEntry>,
is_pessimistic: bool,
} }
impl InnerBuffer { impl InnerBuffer {
@ -29,16 +33,22 @@ impl InnerBuffer {
} }
/// A caching layer which buffers reads and writes in a transaction. /// A caching layer which buffers reads and writes in a transaction.
#[derive(Default)]
pub struct Buffer { pub struct Buffer {
mutations: Mutex<InnerBuffer>, mutations: Mutex<InnerBuffer>,
} }
impl Buffer { impl Buffer {
pub fn new(is_pessimistic: bool) -> Buffer {
Buffer {
mutations: Mutex::new(InnerBuffer::new(is_pessimistic)),
}
}
/// Get the primary key of the buffer. /// Get the primary key of the buffer.
pub async fn get_primary_key(&self) -> Option<Key> { pub async fn get_primary_key(&self) -> Option<Key> {
self.mutations.lock().await.primary_key.clone() self.mutations.lock().await.primary_key.clone()
} }
/// Get the primary key of the buffer, if not exists, use `key` as the primary key. /// Get the primary key of the buffer, if not exists, use `key` as the primary key.
pub async fn get_primary_key_or(&self, key: &Key) -> Key { pub async fn get_primary_key_or(&self, key: &Key) -> Key {
self.mutations.lock().await.get_primary_key_or(key).clone() self.mutations.lock().await.get_primary_key_or(key).clone()
@ -203,28 +213,30 @@ impl Buffer {
/// Mark a value as Insert mutation into the buffer (does not write through). /// Mark a value as Insert mutation into the buffer (does not write through).
pub async fn insert(&self, key: Key, value: Value) { pub async fn insert(&self, key: Key, value: Value) {
self.mutations let mut mutations = self.mutations.lock().await;
.lock() let mut entry = mutations.entry_map.entry(key.clone());
.await match entry {
.insert(key, BufferEntry::Insert(value)); Entry::Occupied(ref mut o) if matches!(o.get(), BufferEntry::Del) => {
o.insert(BufferEntry::Put(value));
}
_ => mutations.insert(key, BufferEntry::Insert(value)),
}
} }
/// Mark a value as deleted. /// Mark a value as deleted.
pub async fn delete(&self, key: Key) { pub async fn delete(&self, key: Key) {
let mut mutations = self.mutations.lock().await; let mut mutations = self.mutations.lock().await;
let value = mutations let is_pessimistic = mutations.is_pessimistic;
.entry_map let mut entry = mutations.entry_map.entry(key.clone());
.entry(key.clone())
.or_insert(BufferEntry::Del);
let new_value: BufferEntry; match entry {
if let BufferEntry::Insert(_) = value { Entry::Occupied(ref mut o)
new_value = BufferEntry::CheckNotExist if matches!(o.get(), BufferEntry::Insert(_)) && !is_pessimistic =>
} else { {
new_value = BufferEntry::Del o.insert(BufferEntry::CheckNotExist);
}
_ => mutations.insert(key, BufferEntry::Del),
} }
mutations.insert(key, new_value);
} }
/// Converts the buffered mutations to the proto buffer version /// Converts the buffered mutations to the proto buffer version
@ -378,7 +390,7 @@ mod tests {
#[tokio::test] #[tokio::test]
#[allow(unreachable_code)] #[allow(unreachable_code)]
async fn set_and_get_from_buffer() { async fn set_and_get_from_buffer() {
let buffer = Buffer::default(); let buffer = Buffer::new(false);
buffer buffer
.put(b"key1".to_vec().into(), b"value1".to_vec()) .put(b"key1".to_vec().into(), b"value1".to_vec())
.await; .await;
@ -411,7 +423,7 @@ mod tests {
#[tokio::test] #[tokio::test]
#[allow(unreachable_code)] #[allow(unreachable_code)]
async fn insert_and_get_from_buffer() { async fn insert_and_get_from_buffer() {
let buffer = Buffer::default(); let buffer = Buffer::new(false);
buffer buffer
.insert(b"key1".to_vec().into(), b"value1".to_vec()) .insert(b"key1".to_vec().into(), b"value1".to_vec())
.await; .await;
@ -453,13 +465,13 @@ mod tests {
let v2: Value = b"value2".to_vec(); let v2: Value = b"value2".to_vec();
let v2_ = v2.clone(); let v2_ = v2.clone();
let buffer = Buffer::default(); let buffer = Buffer::new(false);
let r1 = block_on(buffer.get_or_else(k1.clone(), move |_| ready(Ok(Some(v1_))))); let r1 = block_on(buffer.get_or_else(k1.clone(), move |_| ready(Ok(Some(v1_)))));
let r2 = block_on(buffer.get_or_else(k1.clone(), move |_| ready(panic!()))); let r2 = block_on(buffer.get_or_else(k1.clone(), move |_| ready(panic!())));
assert_eq!(r1.unwrap().unwrap(), v1); assert_eq!(r1.unwrap().unwrap(), v1);
assert_eq!(r2.unwrap().unwrap(), v1); assert_eq!(r2.unwrap().unwrap(), v1);
let buffer = Buffer::default(); let buffer = Buffer::new(false);
let r1 = block_on( let r1 = block_on(
buffer.batch_get_or_else(vec![k1.clone(), k2.clone()].into_iter(), move |_| { buffer.batch_get_or_else(vec![k1.clone(), k2.clone()].into_iter(), move |_| {
ready(Ok(vec![(k1_, v1__).into(), (k2_, v2_).into()])) ready(Ok(vec![(k1_, v1__).into(), (k2_, v2_).into()]))

View File

@ -106,8 +106,34 @@ pub fn new_pessimistic_rollback_request(
) )
} }
pub trait PessimisticLock: Clone {
fn key(self) -> Key;
fn assertion(&self) -> kvrpcpb::Assertion;
}
impl PessimisticLock for Key {
fn key(self) -> Key {
self
}
fn assertion(&self) -> kvrpcpb::Assertion {
kvrpcpb::Assertion::None
}
}
impl PessimisticLock for (Key, kvrpcpb::Assertion) {
fn key(self) -> Key {
self.0
}
fn assertion(&self) -> kvrpcpb::Assertion {
self.1
}
}
pub fn new_pessimistic_lock_request( pub fn new_pessimistic_lock_request(
keys: impl Iterator<Item = Key>, locks: impl Iterator<Item = impl PessimisticLock>,
primary_lock: Key, primary_lock: Key,
start_version: Timestamp, start_version: Timestamp,
lock_ttl: u64, lock_ttl: u64,
@ -115,13 +141,15 @@ pub fn new_pessimistic_lock_request(
need_value: bool, need_value: bool,
) -> kvrpcpb::PessimisticLockRequest { ) -> kvrpcpb::PessimisticLockRequest {
requests::new_pessimistic_lock_request( requests::new_pessimistic_lock_request(
keys.map(|key| { locks
let mut mutation = kvrpcpb::Mutation::default(); .map(|pl| {
mutation.set_op(kvrpcpb::Op::PessimisticLock); let mut mutation = kvrpcpb::Mutation::default();
mutation.set_key(key.into()); mutation.set_op(kvrpcpb::Op::PessimisticLock);
mutation mutation.set_assertion(pl.assertion());
}) mutation.set_key(pl.key().into());
.collect(), mutation
})
.collect(),
primary_lock.into(), primary_lock.into(),
start_version.version(), start_version.version(),
lock_ttl, lock_ttl,

View File

@ -66,7 +66,7 @@ impl<PdC: PdClient> Transaction<PdC> {
Transaction { Transaction {
status: Arc::new(RwLock::new(status)), status: Arc::new(RwLock::new(status)),
timestamp, timestamp,
buffer: Default::default(), buffer: Buffer::new(options.is_pessimistic()),
rpc, rpc,
options, options,
is_heartbeat_started: false, is_heartbeat_started: false,
@ -401,8 +401,11 @@ impl<PdC: PdClient> Transaction<PdC> {
return Err(Error::DuplicateKeyInsertion); return Err(Error::DuplicateKeyInsertion);
} }
if self.is_pessimistic() { if self.is_pessimistic() {
self.pessimistic_lock(iter::once(key.clone()), false) self.pessimistic_lock(
.await?; iter::once((key.clone(), kvrpcpb::Assertion::NotExist)),
false,
)
.await?;
} }
self.buffer.insert(key, value.into()).await; self.buffer.insert(key, value.into()).await;
Ok(()) Ok(())
@ -630,7 +633,7 @@ impl<PdC: PdClient> Transaction<PdC> {
/// Only valid for pessimistic transactions, panics if called on an optimistic transaction. /// Only valid for pessimistic transactions, panics if called on an optimistic transaction.
async fn pessimistic_lock( async fn pessimistic_lock(
&mut self, &mut self,
keys: impl IntoIterator<Item = Key>, keys: impl IntoIterator<Item = impl PessimisticLock>,
need_value: bool, need_value: bool,
) -> Result<Vec<KvPair>> { ) -> Result<Vec<KvPair>> {
assert!( assert!(
@ -638,12 +641,12 @@ impl<PdC: PdClient> Transaction<PdC> {
"`pessimistic_lock` is only valid to use with pessimistic transactions" "`pessimistic_lock` is only valid to use with pessimistic transactions"
); );
let keys: Vec<Key> = keys.into_iter().collect(); let keys: Vec<_> = keys.into_iter().collect();
if keys.is_empty() { if keys.is_empty() {
return Ok(vec![]); return Ok(vec![]);
} }
let first_key = keys[0].clone(); let first_key = keys[0].clone().key();
let primary_lock = self.buffer.get_primary_key_or(&first_key).await; let primary_lock = self.buffer.get_primary_key_or(&first_key).await;
let for_update_ts = self.rpc.clone().get_timestamp().await?; let for_update_ts = self.rpc.clone().get_timestamp().await?;
self.options.push_for_update_ts(for_update_ts.clone()); self.options.push_for_update_ts(for_update_ts.clone());
@ -667,7 +670,7 @@ impl<PdC: PdClient> Transaction<PdC> {
self.start_auto_heartbeat().await; self.start_auto_heartbeat().await;
for key in keys { for key in keys {
self.buffer.lock(key).await; self.buffer.lock(key.key()).await;
} }
pairs pairs
@ -891,6 +894,13 @@ impl TransactionOptions {
self.auto_heartbeat = false; self.auto_heartbeat = false;
self self
} }
pub fn is_pessimistic(&self) -> bool {
match self.kind {
TransactionKind::Pessimistic(_) => true,
TransactionKind::Optimistic => false,
}
}
} }
/// The default TTL of a lock in milliseconds. /// The default TTL of a lock in milliseconds.

View File

@ -596,6 +596,49 @@ async fn pessimistic_rollback() -> Result<()> {
Ok(()) Ok(())
} }
#[tokio::test]
#[serial]
async fn pessimistic_delete() -> Result<()> {
clear_tikv().await;
let client =
TransactionClient::new_with_config(vec!["127.0.0.1:2379"], Default::default()).await?;
// The transaction will lock the keys and must release the locks on commit, even when values are
// not written to the DB.
let mut txn = client.begin_pessimistic().await?;
txn.put(vec![1], vec![42]).await?;
txn.delete(vec![1]).await?;
txn.insert(vec![2], vec![42]).await?;
txn.delete(vec![2]).await?;
txn.put(vec![3], vec![42]).await?;
txn.commit().await?;
// Check that the keys are not locked.
let mut txn2 = client.begin_optimistic().await?;
txn2.put(vec![1], vec![42]).await?;
txn2.put(vec![2], vec![42]).await?;
txn2.put(vec![3], vec![42]).await?;
txn2.commit().await?;
// As before, but rollback instead of commit.
let mut txn = client.begin_pessimistic().await?;
txn.put(vec![1], vec![42]).await?;
txn.delete(vec![1]).await?;
txn.delete(vec![2]).await?;
txn.insert(vec![2], vec![42]).await?;
txn.delete(vec![2]).await?;
txn.put(vec![3], vec![42]).await?;
txn.rollback().await?;
let mut txn2 = client.begin_optimistic().await?;
txn2.put(vec![1], vec![42]).await?;
txn2.put(vec![2], vec![42]).await?;
txn2.put(vec![3], vec![42]).await?;
txn2.commit().await?;
Ok(())
}
#[tokio::test] #[tokio::test]
#[serial] #[serial]
async fn lock_keys() -> Result<()> { async fn lock_keys() -> Result<()> {