Merge pull request #254 from ekexium/pessimistic-lock-primary-key

Don't set primary key in a pessimistic lock request until it succeeds
This commit is contained in:
Nick Cameron 2021-04-19 09:36:10 +12:00 committed by GitHub
commit e320efa51c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 31 additions and 24 deletions

View File

@ -21,37 +21,38 @@ struct InnerBuffer {
impl InnerBuffer { impl InnerBuffer {
fn insert(&mut self, key: impl Into<Key>, entry: BufferEntry) { fn insert(&mut self, key: impl Into<Key>, entry: BufferEntry) {
let key = key.into(); let key = key.into();
if !matches!(entry, BufferEntry::Cached(_)) { if !matches!(entry, BufferEntry::Cached(_) | BufferEntry::CheckNotExist) {
self.primary_key.get_or_insert_with(|| key.clone()); self.primary_key.get_or_insert_with(|| key.clone());
} }
self.entry_map.insert(key, entry); self.entry_map.insert(key, entry);
} }
pub fn get_primary_key_or(&mut self, key: &Key) -> &Key { /// Set the primary key if it is not set
self.primary_key.get_or_insert(key.clone()) pub fn primary_key_or(&mut self, key: &Key) {
self.primary_key.get_or_insert(key.clone());
} }
} }
/// A caching layer which buffers reads and writes in a transaction. /// A caching layer which buffers reads and writes in a transaction.
pub struct Buffer { pub struct Buffer {
mutations: Mutex<InnerBuffer>, inner: Mutex<InnerBuffer>,
} }
impl Buffer { impl Buffer {
pub fn new(is_pessimistic: bool) -> Buffer { pub fn new(is_pessimistic: bool) -> Buffer {
Buffer { Buffer {
mutations: Mutex::new(InnerBuffer::new(is_pessimistic)), inner: Mutex::new(InnerBuffer::new(is_pessimistic)),
} }
} }
/// Get the primary key of the buffer. /// Get the primary key of the buffer.
pub async fn get_primary_key(&self) -> Option<Key> { pub async fn get_primary_key(&self) -> Option<Key> {
self.mutations.lock().await.primary_key.clone() self.inner.lock().await.primary_key.clone()
} }
/// Get the primary key of the buffer, if not exists, use `key` as the primary key. /// Set the primary key if it is not set
pub async fn get_primary_key_or(&self, key: &Key) -> Key { pub async fn primary_key_or(&self, key: &Key) {
self.mutations.lock().await.get_primary_key_or(key).clone() self.inner.lock().await.primary_key_or(key);
} }
/// Get a value from the buffer. /// Get a value from the buffer.
@ -74,7 +75,7 @@ impl Buffer {
MutationValue::Determined(value) => Ok(value), MutationValue::Determined(value) => Ok(value),
MutationValue::Undetermined => { MutationValue::Undetermined => {
let value = f(key.clone()).await?; let value = f(key.clone()).await?;
let mut mutations = self.mutations.lock().await; let mut mutations = self.inner.lock().await;
Self::update_cache(&mut mutations, key, value.clone()); Self::update_cache(&mut mutations, key, value.clone());
Ok(value) Ok(value)
} }
@ -95,7 +96,7 @@ impl Buffer {
Fut: Future<Output = Result<Vec<KvPair>>>, Fut: Future<Output = Result<Vec<KvPair>>>,
{ {
let (cached_results, undetermined_keys) = { let (cached_results, undetermined_keys) = {
let mutations = self.mutations.lock().await; let mutations = self.inner.lock().await;
// Partition the keys into those we have buffered and those we have to // Partition the keys into those we have buffered and those we have to
// get from the store. // get from the store.
let (undetermined_keys, cached_results): ( let (undetermined_keys, cached_results): (
@ -121,7 +122,7 @@ impl Buffer {
}; };
let fetched_results = f(Box::new(undetermined_keys)).await?; let fetched_results = f(Box::new(undetermined_keys)).await?;
let mut mutations = self.mutations.lock().await; let mut mutations = self.inner.lock().await;
for kvpair in &fetched_results { for kvpair in &fetched_results {
let key = kvpair.0.clone(); let key = kvpair.0.clone();
let value = Some(kvpair.1.clone()); let value = Some(kvpair.1.clone());
@ -144,7 +145,7 @@ impl Buffer {
Fut: Future<Output = Result<Vec<KvPair>>>, Fut: Future<Output = Result<Vec<KvPair>>>,
{ {
// read from local buffer // read from local buffer
let mut mutations = self.mutations.lock().await; let mut mutations = self.inner.lock().await;
let mutation_range = mutations.entry_map.range(range.clone()); let mutation_range = mutations.entry_map.range(range.clone());
// fetch from TiKV // fetch from TiKV
@ -190,8 +191,8 @@ impl Buffer {
/// Lock the given key if necessary. /// Lock the given key if necessary.
pub async fn lock(&self, key: Key) { pub async fn lock(&self, key: Key) {
let mutations = &mut self.mutations.lock().await; let mutations = &mut self.inner.lock().await;
mutations.primary_key.get_or_insert(key.clone()); mutations.primary_key.get_or_insert_with(|| key.clone());
let value = mutations let value = mutations
.entry_map .entry_map
.entry(key) .entry(key)
@ -205,15 +206,12 @@ impl Buffer {
/// Insert a value into the buffer (does not write through). /// Insert a value into the buffer (does not write through).
pub async fn put(&self, key: Key, value: Value) { pub async fn put(&self, key: Key, value: Value) {
self.mutations self.inner.lock().await.insert(key, BufferEntry::Put(value));
.lock()
.await
.insert(key, BufferEntry::Put(value));
} }
/// Mark a value as Insert mutation into the buffer (does not write through). /// Mark a value as Insert mutation into the buffer (does not write through).
pub async fn insert(&self, key: Key, value: Value) { pub async fn insert(&self, key: Key, value: Value) {
let mut mutations = self.mutations.lock().await; let mut mutations = self.inner.lock().await;
let mut entry = mutations.entry_map.entry(key.clone()); let mut entry = mutations.entry_map.entry(key.clone());
match entry { match entry {
Entry::Occupied(ref mut o) if matches!(o.get(), BufferEntry::Del) => { Entry::Occupied(ref mut o) if matches!(o.get(), BufferEntry::Del) => {
@ -225,7 +223,7 @@ impl Buffer {
/// Mark a value as deleted. /// Mark a value as deleted.
pub async fn delete(&self, key: Key) { pub async fn delete(&self, key: Key) {
let mut mutations = self.mutations.lock().await; let mut mutations = self.inner.lock().await;
let is_pessimistic = mutations.is_pessimistic; let is_pessimistic = mutations.is_pessimistic;
let mut entry = mutations.entry_map.entry(key.clone()); let mut entry = mutations.entry_map.entry(key.clone());
@ -241,7 +239,7 @@ impl Buffer {
/// Converts the buffered mutations to the proto buffer version /// Converts the buffered mutations to the proto buffer version
pub async fn to_proto_mutations(&self) -> Vec<kvrpcpb::Mutation> { pub async fn to_proto_mutations(&self) -> Vec<kvrpcpb::Mutation> {
self.mutations self.inner
.lock() .lock()
.await .await
.entry_map .entry_map
@ -251,7 +249,7 @@ impl Buffer {
} }
async fn get_from_mutations(&self, key: &Key) -> MutationValue { async fn get_from_mutations(&self, key: &Key) -> MutationValue {
self.mutations self.inner
.lock() .lock()
.await .await
.entry_map .entry_map

View File

@ -649,7 +649,13 @@ impl<PdC: PdClient> Transaction<PdC> {
} }
let first_key = keys[0].clone().key(); let first_key = keys[0].clone().key();
let primary_lock = self.buffer.get_primary_key_or(&first_key).await; // we do not set the primary key here, because pessimistic lock request
// can fail, in which case the keys may not be part of the transaction.
let primary_lock = self
.buffer
.get_primary_key()
.await
.unwrap_or_else(|| first_key.clone());
let for_update_ts = self.rpc.clone().get_timestamp().await?; let for_update_ts = self.rpc.clone().get_timestamp().await?;
self.options.push_for_update_ts(for_update_ts.clone()); self.options.push_for_update_ts(for_update_ts.clone());
let request = new_pessimistic_lock_request( let request = new_pessimistic_lock_request(
@ -669,6 +675,9 @@ impl<PdC: PdClient> Transaction<PdC> {
.plan(); .plan();
let pairs = plan.execute().await; let pairs = plan.execute().await;
// primary key will be set here if needed
self.buffer.primary_key_or(&first_key).await;
self.start_auto_heartbeat().await; self.start_auto_heartbeat().await;
for key in keys { for key in keys {