From d3e5ffbde1a6bc884fcf2e509a370a0f69810326 Mon Sep 17 00:00:00 2001 From: ekexium Date: Mon, 12 Apr 2021 15:03:09 +0800 Subject: [PATCH 1/2] feat: make get_for_update work for optimistic txns Signed-off-by: ekexium --- src/transaction/transaction.rs | 50 ++++++++++++++++++++++++---------- tests/integration_tests.rs | 17 ++++++++---- 2 files changed, 46 insertions(+), 21 deletions(-) diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index 1fe8be4..0b162cc 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -116,6 +116,17 @@ impl Transaction { } /// Create a `get for udpate` request. + /// It has different behaviors in optimistic and pessimistic transactions. + /// + /// # Optimistic transaction + /// Once resolved this request will retrieve the value just like a normal `get` request, + /// and "locks" the key. This lock will not affect other (concurrent) transactions, but will + /// prevent the current transaction from successfully committing if there is another write + /// containing the "locked" key which is committed between the start and commit of the current transaction. + /// + /// The value is read from the `start timestamp`, thus it is cached in the local buffer. + /// + /// # Pessimistic transaction /// Once resolved this request will pessimistically lock and fetch the latest /// value associated with the given key at **current timestamp**. /// @@ -127,8 +138,6 @@ impl Transaction { /// So normal `get`-like commands after `get_for_update` will not be influenced, they still read values at `start_ts`. /// /// - /// It can only be used in pessimistic mode. - /// /// # Examples /// ```rust,no_run /// # use tikv_client::{Value, Config, TransactionClient}; @@ -146,7 +155,9 @@ impl Transaction { pub async fn get_for_update(&mut self, key: impl Into) -> Result> { self.check_allow_operation().await?; if !self.is_pessimistic() { - Err(Error::InvalidTransactionType) + let key = key.into(); + self.lock_keys(iter::once(key.clone())).await?; + self.get(key).await } else { let mut pairs = self.pessimistic_lock(iter::once(key.into()), true).await?; debug_assert!(pairs.len() <= 1); @@ -227,7 +238,17 @@ impl Transaction { } /// Create a new 'batch get for update' request. + /// It has different behaviors in optimistic and pessimistic transactions. /// + /// # Optimistic transaction + /// Once resolved this request will retrieve the values just like a normal `batch_get` request, + /// and "locks" the keys. The locks will not affect other (concurrent) transactions, but will + /// prevent the current transaction from successfully committing if there is any other write + /// containing a "locked" key which is committed between the start and commit of the current transaction. + /// + /// The values are read from the `start timestamp`, thus they are cached in the local buffer. + /// + /// # Pessimistic transaction /// Once resolved this request will pessimistically lock the keys and /// fetch the values associated with the given keys. /// @@ -238,23 +259,20 @@ impl Transaction { /// /// Non-existent entries will not appear in the result. The order of the keys is not retained in the result. /// - /// It can only be used in pessimistic mode. /// /// # Examples /// ```rust,no_run - /// # use tikv_client::{Key, Value, Config, TransactionClient}; + /// # use tikv_client::{Key, Value, Config, TransactionClient, KvPair}; /// # use futures::prelude::*; /// # use std::collections::HashMap; /// # futures::executor::block_on(async { /// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap(); /// let mut txn = client.begin_pessimistic().await.unwrap(); /// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()]; - /// let result: HashMap = txn + /// let result: Vec = txn /// .batch_get_for_update(keys) /// .await - /// .unwrap() - /// .map(|pair| (pair.0, pair.1)) - /// .collect(); + /// .unwrap(); /// // now "TiKV" and "TiDB" are both locked /// // Finish the transaction... /// txn.commit().await.unwrap(); @@ -263,13 +281,15 @@ impl Transaction { pub async fn batch_get_for_update( &mut self, keys: impl IntoIterator>, - ) -> Result> { + ) -> Result> { self.check_allow_operation().await?; + let keys: Vec = keys.into_iter().map(|k| k.into()).collect(); if !self.is_pessimistic() { - return Err(Error::InvalidTransactionType); + self.lock_keys(keys.clone()).await?; + Ok(self.batch_get(keys).await?.collect()) + } else { + self.pessimistic_lock(keys, true).await } - let keys: Vec = keys.into_iter().map(|it| it.into()).collect(); - Ok(self.pessimistic_lock(keys, true).await?.into_iter()) } /// Create a new 'scan' request. @@ -470,8 +490,8 @@ impl Transaction { } } TransactionKind::Pessimistic(_) => { - self.pessimistic_lock(keys.into_iter().map(|k| k.into()), false) - .await?; + let keys: Vec = keys.into_iter().map(|k| k.into()).collect(); + self.pessimistic_lock(keys.into_iter(), false).await?; } } Ok(()) diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 74e7c9b..fbf2d5f 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -305,10 +305,7 @@ async fn txn_write_million() -> Result<()> { let res = txn.batch_get(keys.clone()).await?.collect::>(); assert_eq!(res.len(), keys.len()); - let res = txn - .batch_get_for_update(keys.clone()) - .await? - .collect::>(); + let res = txn.batch_get_for_update(keys.clone()).await?; assert_eq!(res.len(), keys.len()); txn.commit().await?; @@ -643,7 +640,8 @@ async fn get_for_update() -> Result<()> { let mut t1 = client.begin_pessimistic().await?; let mut t2 = client.begin_pessimistic().await?; - + let mut t3 = client.begin_optimistic().await?; + let mut t4 = client.begin_optimistic().await?; let mut t0 = client.begin_pessimistic().await?; t0.put(key1.clone(), value1).await?; t0.put(key2.clone(), value2).await?; @@ -657,12 +655,19 @@ async fn get_for_update() -> Result<()> { let res: HashMap<_, _> = t2 .batch_get_for_update(keys.clone()) .await? + .into_iter() .map(From::from) .collect(); t2.commit().await?; - assert!(res.get(&key1.into()).unwrap() == &value1); + assert!(res.get(&key1.clone().into()).unwrap() == &value1); assert!(res.get(&key2.into()).unwrap() == &value2); + assert!(t3.get_for_update(key1).await?.is_none()); + assert!(t3.commit().await.is_err()); + + assert!(t4.batch_get_for_update(keys).await?.len() == 0); + assert!(t4.commit().await.is_err()); + Ok(()) } From ef5f1705cfa08d4bb20fa2899b582d103eeaf281 Mon Sep 17 00:00:00 2001 From: ekexium Date: Mon, 19 Apr 2021 15:09:30 +0800 Subject: [PATCH 2/2] doc: rephrase doc Signed-off-by: ekexium --- src/transaction/transaction.rs | 52 +++++++++++++--------------------- 1 file changed, 20 insertions(+), 32 deletions(-) diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index a21326b..e48deed 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -116,27 +116,30 @@ impl Transaction { } /// Create a `get for udpate` request. - /// It has different behaviors in optimistic and pessimistic transactions. + /// + /// The request reads and "locks" a key. It is similar to `SELECT ... FOR + /// UPDATE` in TiDB, and has different behavior in optimistic and + /// pessimistic transactions. /// /// # Optimistic transaction - /// Once resolved this request will retrieve the value just like a normal `get` request, - /// and "locks" the key. This lock will not affect other (concurrent) transactions, but will - /// prevent the current transaction from successfully committing if there is another write - /// containing the "locked" key which is committed between the start and commit of the current transaction. /// - /// The value is read from the `start timestamp`, thus it is cached in the local buffer. + /// It reads at the "start timestamp" and caches the value, just like normal + /// get requests. The lock is written in prewrite and commit, so it cannot + /// prevent concurrent transactions from writing the same key, but can only + /// prevent itself from committing. /// /// # Pessimistic transaction - /// Once resolved this request will pessimistically lock and fetch the latest - /// value associated with the given key at **current timestamp**. /// - /// The "current timestamp" (also called `for_update_ts` of the request) is fetched immediately from PD. + /// It reads at the "current timestamp" and thus does not cache the value. + /// So following read requests won't be affected by the `get_for_udpate`. + /// A lock will be acquired immediately with this request, which prevents + /// concurrent transactions from mutating the keys. /// - /// Note: The behavior of this command does not follow snapshot isolation. It is similar to `select for update` in TiDB, - /// which is similar to that in MySQL. It reads the latest value (using current timestamp), - /// and the value is not cached in the local buffer. - /// So normal `get`-like commands after `get_for_update` will not be influenced, they still read values at `start_ts`. + /// The "current timestamp" (also called `for_update_ts` of the request) is + /// fetched immediately from the timestamp oracle. /// + /// Note: The behavior of the request under pessimistic transaction does not + /// follow snapshot isolation. /// /// # Examples /// ```rust,no_run @@ -238,27 +241,12 @@ impl Transaction { } /// Create a new 'batch get for update' request. - /// It has different behaviors in optimistic and pessimistic transactions. /// - /// # Optimistic transaction - /// Once resolved this request will retrieve the values just like a normal `batch_get` request, - /// and "locks" the keys. The locks will not affect other (concurrent) transactions, but will - /// prevent the current transaction from successfully committing if there is any other write - /// containing a "locked" key which is committed between the start and commit of the current transaction. - /// - /// The values are read from the `start timestamp`, thus they are cached in the local buffer. - /// - /// # Pessimistic transaction - /// Once resolved this request will pessimistically lock the keys and - /// fetch the values associated with the given keys. - /// - /// Note: The behavior of this command does not follow snapshot isolation. It is similar to `select for update` in TiDB, - /// which is similar to that in MySQL. It reads the latest value (using current timestamp), - /// and the value is not cached in the local buffer. - /// So normal `get`-like commands after `batch_get_for_update` will not be influenced, they still read values at `start_ts`. - /// - /// Non-existent entries will not appear in the result. The order of the keys is not retained in the result. + /// Similar [`get_for_update`](Transaction::get_for_update), but it works + /// for a batch of keys. /// + /// Non-existent entries will not appear in the result. The order of the + /// keys is not retained in the result. /// /// # Examples /// ```rust,no_run