diff --git a/src/transaction/buffer.rs b/src/transaction/buffer.rs index 1d7f61b..7f47790 100644 --- a/src/transaction/buffer.rs +++ b/src/transaction/buffer.rs @@ -17,7 +17,7 @@ struct InnerBuffer { impl InnerBuffer { fn insert(&mut self, key: impl Into, entry: BufferEntry) { let key = key.into(); - if !matches!(entry, BufferEntry::Cached(_)) { + if !matches!(entry, BufferEntry::Cached(_) | BufferEntry::CheckNotExist) { self.primary_key.get_or_insert_with(|| key.clone()); } self.entry_map.insert(key, entry); @@ -31,17 +31,17 @@ impl InnerBuffer { /// A caching layer which buffers reads and writes in a transaction. #[derive(Default)] pub struct Buffer { - mutations: Mutex, + inner: Mutex, } impl Buffer { /// Get the primary key of the buffer. pub async fn get_primary_key(&self) -> Option { - self.mutations.lock().await.primary_key.clone() + self.inner.lock().await.primary_key.clone() } /// Get the primary key of the buffer, if not exists, use `key` as the primary key. pub async fn get_primary_key_or(&self, key: &Key) -> Key { - self.mutations.lock().await.get_primary_key_or(key).clone() + self.inner.lock().await.get_primary_key_or(key).clone() } /// Get a value from the buffer. @@ -64,7 +64,7 @@ impl Buffer { MutationValue::Determined(value) => Ok(value), MutationValue::Undetermined => { let value = f(key.clone()).await?; - let mut mutations = self.mutations.lock().await; + let mut mutations = self.inner.lock().await; Self::update_cache(&mut mutations, key, value.clone()); Ok(value) } @@ -85,7 +85,7 @@ impl Buffer { Fut: Future>>, { let (cached_results, undetermined_keys) = { - let mutations = self.mutations.lock().await; + let mutations = self.inner.lock().await; // Partition the keys into those we have buffered and those we have to // get from the store. let (undetermined_keys, cached_results): ( @@ -111,7 +111,7 @@ impl Buffer { }; let fetched_results = f(Box::new(undetermined_keys)).await?; - let mut mutations = self.mutations.lock().await; + let mut mutations = self.inner.lock().await; for kvpair in &fetched_results { let key = kvpair.0.clone(); let value = Some(kvpair.1.clone()); @@ -134,7 +134,7 @@ impl Buffer { Fut: Future>>, { // read from local buffer - let mut mutations = self.mutations.lock().await; + let mut mutations = self.inner.lock().await; let mutation_range = mutations.entry_map.range(range.clone()); // fetch from TiKV @@ -180,8 +180,8 @@ impl Buffer { /// Lock the given key if necessary. pub async fn lock(&self, key: Key) { - let mutations = &mut self.mutations.lock().await; - mutations.primary_key.get_or_insert(key.clone()); + let mutations = &mut self.inner.lock().await; + mutations.primary_key.get_or_insert_with(|| key.clone()); let value = mutations .entry_map .entry(key) @@ -195,15 +195,12 @@ impl Buffer { /// Insert a value into the buffer (does not write through). pub async fn put(&self, key: Key, value: Value) { - self.mutations - .lock() - .await - .insert(key, BufferEntry::Put(value)); + self.inner.lock().await.insert(key, BufferEntry::Put(value)); } /// Mark a value as Insert mutation into the buffer (does not write through). pub async fn insert(&self, key: Key, value: Value) { - self.mutations + self.inner .lock() .await .insert(key, BufferEntry::Insert(value)); @@ -211,7 +208,7 @@ impl Buffer { /// Mark a value as deleted. pub async fn delete(&self, key: Key) { - let mut mutations = self.mutations.lock().await; + let mut mutations = self.inner.lock().await; let value = mutations .entry_map .entry(key.clone()) @@ -229,7 +226,7 @@ impl Buffer { /// Converts the buffered mutations to the proto buffer version pub async fn to_proto_mutations(&self) -> Vec { - self.mutations + self.inner .lock() .await .entry_map @@ -239,7 +236,7 @@ impl Buffer { } async fn get_from_mutations(&self, key: &Key) -> MutationValue { - self.mutations + self.inner .lock() .await .entry_map diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index 1fe8be4..dd5e432 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -644,7 +644,13 @@ impl Transaction { } let first_key = keys[0].clone(); - let primary_lock = self.buffer.get_primary_key_or(&first_key).await; + // we do not set the primary key here, because pessimistic lock request + // can fail, in which case the keys may not be part of the transaction. + let primary_lock = self + .buffer + .get_primary_key() + .await + .unwrap_or_else(|| first_key.clone()); let for_update_ts = self.rpc.clone().get_timestamp().await?; self.options.push_for_update_ts(for_update_ts.clone()); let request = new_pessimistic_lock_request( @@ -664,6 +670,9 @@ impl Transaction { .plan(); let pairs = plan.execute().await; + // primary key will be set here if needed + self.buffer.get_primary_key_or(&first_key).await; + self.start_auto_heartbeat().await; for key in keys {