mirror of https://github.com/tikv/client-rust.git
fix: don't set primary key in a pessimistic lock request until it succeeds
Signed-off-by: ekexium <ekexium@gmail.com>
This commit is contained in:
parent
4870985dcd
commit
fc47bb93b6
|
@ -17,7 +17,7 @@ struct InnerBuffer {
|
||||||
impl InnerBuffer {
|
impl InnerBuffer {
|
||||||
fn insert(&mut self, key: impl Into<Key>, entry: BufferEntry) {
|
fn insert(&mut self, key: impl Into<Key>, entry: BufferEntry) {
|
||||||
let key = key.into();
|
let key = key.into();
|
||||||
if !matches!(entry, BufferEntry::Cached(_)) {
|
if !matches!(entry, BufferEntry::Cached(_) | BufferEntry::CheckNotExist) {
|
||||||
self.primary_key.get_or_insert_with(|| key.clone());
|
self.primary_key.get_or_insert_with(|| key.clone());
|
||||||
}
|
}
|
||||||
self.entry_map.insert(key, entry);
|
self.entry_map.insert(key, entry);
|
||||||
|
@ -31,17 +31,17 @@ impl InnerBuffer {
|
||||||
/// A caching layer which buffers reads and writes in a transaction.
|
/// A caching layer which buffers reads and writes in a transaction.
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct Buffer {
|
pub struct Buffer {
|
||||||
mutations: Mutex<InnerBuffer>,
|
inner: Mutex<InnerBuffer>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Buffer {
|
impl Buffer {
|
||||||
/// Get the primary key of the buffer.
|
/// Get the primary key of the buffer.
|
||||||
pub async fn get_primary_key(&self) -> Option<Key> {
|
pub async fn get_primary_key(&self) -> Option<Key> {
|
||||||
self.mutations.lock().await.primary_key.clone()
|
self.inner.lock().await.primary_key.clone()
|
||||||
}
|
}
|
||||||
/// Get the primary key of the buffer, if not exists, use `key` as the primary key.
|
/// Get the primary key of the buffer, if not exists, use `key` as the primary key.
|
||||||
pub async fn get_primary_key_or(&self, key: &Key) -> Key {
|
pub async fn get_primary_key_or(&self, key: &Key) -> Key {
|
||||||
self.mutations.lock().await.get_primary_key_or(key).clone()
|
self.inner.lock().await.get_primary_key_or(key).clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get a value from the buffer.
|
/// Get a value from the buffer.
|
||||||
|
@ -64,7 +64,7 @@ impl Buffer {
|
||||||
MutationValue::Determined(value) => Ok(value),
|
MutationValue::Determined(value) => Ok(value),
|
||||||
MutationValue::Undetermined => {
|
MutationValue::Undetermined => {
|
||||||
let value = f(key.clone()).await?;
|
let value = f(key.clone()).await?;
|
||||||
let mut mutations = self.mutations.lock().await;
|
let mut mutations = self.inner.lock().await;
|
||||||
Self::update_cache(&mut mutations, key, value.clone());
|
Self::update_cache(&mut mutations, key, value.clone());
|
||||||
Ok(value)
|
Ok(value)
|
||||||
}
|
}
|
||||||
|
@ -85,7 +85,7 @@ impl Buffer {
|
||||||
Fut: Future<Output = Result<Vec<KvPair>>>,
|
Fut: Future<Output = Result<Vec<KvPair>>>,
|
||||||
{
|
{
|
||||||
let (cached_results, undetermined_keys) = {
|
let (cached_results, undetermined_keys) = {
|
||||||
let mutations = self.mutations.lock().await;
|
let mutations = self.inner.lock().await;
|
||||||
// Partition the keys into those we have buffered and those we have to
|
// Partition the keys into those we have buffered and those we have to
|
||||||
// get from the store.
|
// get from the store.
|
||||||
let (undetermined_keys, cached_results): (
|
let (undetermined_keys, cached_results): (
|
||||||
|
@ -111,7 +111,7 @@ impl Buffer {
|
||||||
};
|
};
|
||||||
|
|
||||||
let fetched_results = f(Box::new(undetermined_keys)).await?;
|
let fetched_results = f(Box::new(undetermined_keys)).await?;
|
||||||
let mut mutations = self.mutations.lock().await;
|
let mut mutations = self.inner.lock().await;
|
||||||
for kvpair in &fetched_results {
|
for kvpair in &fetched_results {
|
||||||
let key = kvpair.0.clone();
|
let key = kvpair.0.clone();
|
||||||
let value = Some(kvpair.1.clone());
|
let value = Some(kvpair.1.clone());
|
||||||
|
@ -134,7 +134,7 @@ impl Buffer {
|
||||||
Fut: Future<Output = Result<Vec<KvPair>>>,
|
Fut: Future<Output = Result<Vec<KvPair>>>,
|
||||||
{
|
{
|
||||||
// read from local buffer
|
// read from local buffer
|
||||||
let mut mutations = self.mutations.lock().await;
|
let mut mutations = self.inner.lock().await;
|
||||||
let mutation_range = mutations.entry_map.range(range.clone());
|
let mutation_range = mutations.entry_map.range(range.clone());
|
||||||
|
|
||||||
// fetch from TiKV
|
// fetch from TiKV
|
||||||
|
@ -180,8 +180,8 @@ impl Buffer {
|
||||||
|
|
||||||
/// Lock the given key if necessary.
|
/// Lock the given key if necessary.
|
||||||
pub async fn lock(&self, key: Key) {
|
pub async fn lock(&self, key: Key) {
|
||||||
let mutations = &mut self.mutations.lock().await;
|
let mutations = &mut self.inner.lock().await;
|
||||||
mutations.primary_key.get_or_insert(key.clone());
|
mutations.primary_key.get_or_insert_with(|| key.clone());
|
||||||
let value = mutations
|
let value = mutations
|
||||||
.entry_map
|
.entry_map
|
||||||
.entry(key)
|
.entry(key)
|
||||||
|
@ -195,15 +195,12 @@ impl Buffer {
|
||||||
|
|
||||||
/// Insert a value into the buffer (does not write through).
|
/// Insert a value into the buffer (does not write through).
|
||||||
pub async fn put(&self, key: Key, value: Value) {
|
pub async fn put(&self, key: Key, value: Value) {
|
||||||
self.mutations
|
self.inner.lock().await.insert(key, BufferEntry::Put(value));
|
||||||
.lock()
|
|
||||||
.await
|
|
||||||
.insert(key, BufferEntry::Put(value));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mark a value as Insert mutation into the buffer (does not write through).
|
/// Mark a value as Insert mutation into the buffer (does not write through).
|
||||||
pub async fn insert(&self, key: Key, value: Value) {
|
pub async fn insert(&self, key: Key, value: Value) {
|
||||||
self.mutations
|
self.inner
|
||||||
.lock()
|
.lock()
|
||||||
.await
|
.await
|
||||||
.insert(key, BufferEntry::Insert(value));
|
.insert(key, BufferEntry::Insert(value));
|
||||||
|
@ -211,7 +208,7 @@ impl Buffer {
|
||||||
|
|
||||||
/// Mark a value as deleted.
|
/// Mark a value as deleted.
|
||||||
pub async fn delete(&self, key: Key) {
|
pub async fn delete(&self, key: Key) {
|
||||||
let mut mutations = self.mutations.lock().await;
|
let mut mutations = self.inner.lock().await;
|
||||||
let value = mutations
|
let value = mutations
|
||||||
.entry_map
|
.entry_map
|
||||||
.entry(key.clone())
|
.entry(key.clone())
|
||||||
|
@ -229,7 +226,7 @@ impl Buffer {
|
||||||
|
|
||||||
/// Converts the buffered mutations to the proto buffer version
|
/// Converts the buffered mutations to the proto buffer version
|
||||||
pub async fn to_proto_mutations(&self) -> Vec<kvrpcpb::Mutation> {
|
pub async fn to_proto_mutations(&self) -> Vec<kvrpcpb::Mutation> {
|
||||||
self.mutations
|
self.inner
|
||||||
.lock()
|
.lock()
|
||||||
.await
|
.await
|
||||||
.entry_map
|
.entry_map
|
||||||
|
@ -239,7 +236,7 @@ impl Buffer {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_from_mutations(&self, key: &Key) -> MutationValue {
|
async fn get_from_mutations(&self, key: &Key) -> MutationValue {
|
||||||
self.mutations
|
self.inner
|
||||||
.lock()
|
.lock()
|
||||||
.await
|
.await
|
||||||
.entry_map
|
.entry_map
|
||||||
|
|
|
@ -644,7 +644,13 @@ impl<PdC: PdClient> Transaction<PdC> {
|
||||||
}
|
}
|
||||||
|
|
||||||
let first_key = keys[0].clone();
|
let first_key = keys[0].clone();
|
||||||
let primary_lock = self.buffer.get_primary_key_or(&first_key).await;
|
// we do not set the primary key here, because pessimistic lock request
|
||||||
|
// can fail, in which case the keys may not be part of the transaction.
|
||||||
|
let primary_lock = self
|
||||||
|
.buffer
|
||||||
|
.get_primary_key()
|
||||||
|
.await
|
||||||
|
.unwrap_or_else(|| first_key.clone());
|
||||||
let for_update_ts = self.rpc.clone().get_timestamp().await?;
|
let for_update_ts = self.rpc.clone().get_timestamp().await?;
|
||||||
self.options.push_for_update_ts(for_update_ts.clone());
|
self.options.push_for_update_ts(for_update_ts.clone());
|
||||||
let request = new_pessimistic_lock_request(
|
let request = new_pessimistic_lock_request(
|
||||||
|
@ -664,6 +670,9 @@ impl<PdC: PdClient> Transaction<PdC> {
|
||||||
.plan();
|
.plan();
|
||||||
let pairs = plan.execute().await;
|
let pairs = plan.execute().await;
|
||||||
|
|
||||||
|
// primary key will be set here if needed
|
||||||
|
self.buffer.get_primary_key_or(&first_key).await;
|
||||||
|
|
||||||
self.start_auto_heartbeat().await;
|
self.start_auto_heartbeat().await;
|
||||||
|
|
||||||
for key in keys {
|
for key in keys {
|
||||||
|
|
Loading…
Reference in New Issue