diff --git a/rust-toolchain b/rust-toolchain
deleted file mode 100644
index 3e2f5f2..0000000
--- a/rust-toolchain
+++ /dev/null
@@ -1 +0,0 @@
-nightly-2021-03-15
diff --git a/src/raw/client.rs b/src/raw/client.rs
index 4fb425f..5e8917b 100644
--- a/src/raw/client.rs
+++ b/src/raw/client.rs
@@ -3,7 +3,7 @@
 use tikv_client_common::Error;
 
 use crate::{
-    backoff::{DEFAULT_REGION_BACKOFF, OPTIMISTIC_BACKOFF},
+    backoff::DEFAULT_REGION_BACKOFF,
     config::Config,
     pd::PdRpcClient,
     raw::lowering::*,
@@ -115,7 +115,6 @@ impl Client {
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
             .single_region()
             .await?
-            .resolve_lock(OPTIMISTIC_BACKOFF)
             .retry_region(DEFAULT_REGION_BACKOFF)
             .post_process_default()
             .plan();
@@ -146,7 +145,6 @@ impl Client {
     ) -> Result<Vec<KvPair>> {
         let request = new_raw_batch_get_request(keys.into_iter().map(Into::into), self.cf.clone());
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
-            .resolve_lock(OPTIMISTIC_BACKOFF)
             .multi_region()
             .retry_region(DEFAULT_REGION_BACKOFF)
             .merge(Collect)
@@ -284,7 +282,6 @@ impl Client {
         let request =
             new_raw_batch_delete_request(keys.into_iter().map(Into::into), self.cf.clone());
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
-            .resolve_lock(OPTIMISTIC_BACKOFF)
             .multi_region()
             .retry_region(DEFAULT_REGION_BACKOFF)
             .extract_error()
@@ -311,7 +308,6 @@ impl Client {
     pub async fn delete_range(&self, range: impl Into<BoundRange>) -> Result<()> {
         let request = new_raw_delete_range_request(range.into(), self.cf.clone());
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
-            .resolve_lock(OPTIMISTIC_BACKOFF)
             .multi_region()
             .retry_region(DEFAULT_REGION_BACKOFF)
             .extract_error()
@@ -482,7 +478,6 @@ impl Client {
         let request = new_raw_scan_request(range.into(), limit, key_only, self.cf.clone());
 
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
-            .resolve_lock(OPTIMISTIC_BACKOFF)
             .multi_region()
             .retry_region(DEFAULT_REGION_BACKOFF)
             .merge(Collect)
@@ -514,7 +509,6 @@ impl Client {
             self.cf.clone(),
         );
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
-            .resolve_lock(OPTIMISTIC_BACKOFF)
             .multi_region()
             .retry_region(DEFAULT_REGION_BACKOFF)
             .merge(Collect)
@@ -532,7 +526,6 @@ impl Client {
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
             .single_region()
             .await?
-            .resolve_lock(OPTIMISTIC_BACKOFF)
             .retry_region(DEFAULT_REGION_BACKOFF)
             .extract_error()
             .plan();
@@ -548,7 +541,6 @@ impl Client {
         let request =
             new_raw_batch_put_request(pairs.into_iter().map(Into::into), self.cf.clone(), atomic);
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
-            .resolve_lock(OPTIMISTIC_BACKOFF)
             .multi_region()
             .retry_region(DEFAULT_REGION_BACKOFF)
             .extract_error()
@@ -562,7 +554,6 @@ impl Client {
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
             .single_region()
             .await?
-            .resolve_lock(OPTIMISTIC_BACKOFF)
             .retry_region(DEFAULT_REGION_BACKOFF)
             .extract_error()
             .plan();
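With `OPTIMISTIC_BACKOFF` gone from `src/raw/client.rs`, raw requests no longer carry a lock-resolution step in their plans. The public raw API is unchanged; a minimal usage sketch, assuming a placeholder PD endpoint and the `futures` crate, with error handling elided:

```rust
use tikv_client::RawClient;

fn main() {
    futures::executor::block_on(async {
        // Assumed PD endpoint; point this at your own cluster.
        let client = RawClient::new(vec!["127.0.0.1:2379"]).await.unwrap();

        // Each call builds one of the plans shown above; after this change the
        // plan goes straight from region retry to execution, with no
        // resolve_lock stage.
        client.put("k1".to_owned(), "v1".to_owned()).await.unwrap();
        let value = client.get("k1".to_owned()).await.unwrap();
        assert!(value.is_some());
        client.delete("k1".to_owned()).await.unwrap();
    });
}
```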
diff --git a/src/transaction/buffer.rs b/src/transaction/buffer.rs
index 2ec1d86..5c8f897 100644
--- a/src/transaction/buffer.rs
+++ b/src/transaction/buffer.rs
@@ -21,37 +21,38 @@ struct InnerBuffer {
 impl InnerBuffer {
     fn insert(&mut self, key: impl Into<Key>, entry: BufferEntry) {
         let key = key.into();
-        if !matches!(entry, BufferEntry::Cached(_)) {
+        if !matches!(entry, BufferEntry::Cached(_) | BufferEntry::CheckNotExist) {
             self.primary_key.get_or_insert_with(|| key.clone());
         }
         self.entry_map.insert(key, entry);
     }
 
-    pub fn get_primary_key_or(&mut self, key: &Key) -> &Key {
-        self.primary_key.get_or_insert(key.clone())
+    /// Set the primary key if it is not set
+    pub fn primary_key_or(&mut self, key: &Key) {
+        self.primary_key.get_or_insert(key.clone());
     }
 }
 
 /// A caching layer which buffers reads and writes in a transaction.
 pub struct Buffer {
-    mutations: Mutex<InnerBuffer>,
+    inner: Mutex<InnerBuffer>,
 }
 
 impl Buffer {
     pub fn new(is_pessimistic: bool) -> Buffer {
         Buffer {
-            mutations: Mutex::new(InnerBuffer::new(is_pessimistic)),
+            inner: Mutex::new(InnerBuffer::new(is_pessimistic)),
         }
     }
 
     /// Get the primary key of the buffer.
     pub async fn get_primary_key(&self) -> Option<Key> {
-        self.mutations.lock().await.primary_key.clone()
+        self.inner.lock().await.primary_key.clone()
     }
 
-    /// Get the primary key of the buffer, if not exists, use `key` as the primary key.
-    pub async fn get_primary_key_or(&self, key: &Key) -> Key {
-        self.mutations.lock().await.get_primary_key_or(key).clone()
+    /// Set the primary key if it is not set
+    pub async fn primary_key_or(&self, key: &Key) {
+        self.inner.lock().await.primary_key_or(key);
     }
 
     /// Get a value from the buffer.
@@ -74,7 +75,7 @@ impl Buffer {
             MutationValue::Determined(value) => Ok(value),
             MutationValue::Undetermined => {
                 let value = f(key.clone()).await?;
-                let mut mutations = self.mutations.lock().await;
+                let mut mutations = self.inner.lock().await;
                 Self::update_cache(&mut mutations, key, value.clone());
                 Ok(value)
             }
@@ -95,7 +96,7 @@ impl Buffer {
         Fut: Future<Output = Result<Vec<KvPair>>>,
     {
         let (cached_results, undetermined_keys) = {
-            let mutations = self.mutations.lock().await;
+            let mutations = self.inner.lock().await;
             // Partition the keys into those we have buffered and those we have to
             // get from the store.
             let (undetermined_keys, cached_results): (
@@ -121,7 +122,7 @@ impl Buffer {
         };
 
         let fetched_results = f(Box::new(undetermined_keys)).await?;
-        let mut mutations = self.mutations.lock().await;
+        let mut mutations = self.inner.lock().await;
         for kvpair in &fetched_results {
             let key = kvpair.0.clone();
             let value = Some(kvpair.1.clone());
@@ -144,7 +145,7 @@ impl Buffer {
         Fut: Future<Output = Result<Vec<KvPair>>>,
     {
         // read from local buffer
-        let mut mutations = self.mutations.lock().await;
+        let mut mutations = self.inner.lock().await;
         let mutation_range = mutations.entry_map.range(range.clone());
 
         // fetch from TiKV
@@ -190,8 +191,8 @@ impl Buffer {
 
     /// Lock the given key if necessary.
     pub async fn lock(&self, key: Key) {
-        let mutations = &mut self.mutations.lock().await;
-        mutations.primary_key.get_or_insert(key.clone());
+        let mutations = &mut self.inner.lock().await;
+        mutations.primary_key.get_or_insert_with(|| key.clone());
         let value = mutations
             .entry_map
             .entry(key)
@@ -205,15 +206,12 @@ impl Buffer {
 
     /// Insert a value into the buffer (does not write through).
     pub async fn put(&self, key: Key, value: Value) {
-        self.mutations
-            .lock()
-            .await
-            .insert(key, BufferEntry::Put(value));
+        self.inner.lock().await.insert(key, BufferEntry::Put(value));
     }
 
     /// Mark a value as Insert mutation into the buffer (does not write through).
     pub async fn insert(&self, key: Key, value: Value) {
-        let mut mutations = self.mutations.lock().await;
+        let mut mutations = self.inner.lock().await;
         let mut entry = mutations.entry_map.entry(key.clone());
         match entry {
             Entry::Occupied(ref mut o) if matches!(o.get(), BufferEntry::Del) => {
@@ -225,7 +223,7 @@ impl Buffer {
 
     /// Mark a value as deleted.
     pub async fn delete(&self, key: Key) {
-        let mut mutations = self.mutations.lock().await;
+        let mut mutations = self.inner.lock().await;
         let is_pessimistic = mutations.is_pessimistic;
         let mut entry = mutations.entry_map.entry(key.clone());
 
@@ -241,7 +239,7 @@ impl Buffer {
 
     /// Converts the buffered mutations to the proto buffer version
    pub async fn to_proto_mutations(&self) -> Vec<kvrpcpb::Mutation> {
-        self.mutations
+        self.inner
             .lock()
             .await
             .entry_map
@@ -251,7 +249,7 @@ impl Buffer {
     }
 
     async fn get_from_mutations(&self, key: &Key) -> MutationValue {
-        self.mutations
+        self.inner
             .lock()
             .await
             .entry_map
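The upshot of the `InnerBuffer` changes: `CheckNotExist` entries no longer pin a primary key, and `primary_key_or` only records a primary key when none is set yet. A simplified, self-contained sketch of that rule (illustrative stand-in types, not the crate's actual `InnerBuffer`):

```rust
/// Simplified stand-in for InnerBuffer's primary-key bookkeeping (illustrative only).
struct PrimaryKeyTracker {
    primary_key: Option<String>,
}

impl PrimaryKeyTracker {
    /// Mirrors `primary_key_or`: set the primary key only if it is not set yet.
    fn primary_key_or(&mut self, key: &str) {
        self.primary_key.get_or_insert_with(|| key.to_owned());
    }
}

fn main() {
    let mut tracker = PrimaryKeyTracker { primary_key: None };
    tracker.primary_key_or("k1");
    tracker.primary_key_or("k2"); // no effect: "k1" is already the primary key
    assert_eq!(tracker.primary_key.as_deref(), Some("k1"));
}
```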
diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs
index 591d088..71801fa 100644
--- a/src/transaction/transaction.rs
+++ b/src/transaction/transaction.rs
@@ -116,18 +116,30 @@ impl Transaction {
     }
 
     /// Create a `get for update` request.
-    /// Once resolved this request will pessimistically lock and fetch the latest
-    /// value associated with the given key at **current timestamp**.
     ///
-    /// The "current timestamp" (also called `for_update_ts` of the request) is fetched immediately from PD.
+    /// The request reads and "locks" a key. It is similar to `SELECT ... FOR
+    /// UPDATE` in TiDB, and has different behavior in optimistic and
+    /// pessimistic transactions.
     ///
-    /// Note: The behavior of this command does not follow snapshot isolation. It is similar to `select for update` in TiDB,
-    /// which is similar to that in MySQL. It reads the latest value (using current timestamp),
-    /// and the value is not cached in the local buffer.
-    /// So normal `get`-like commands after `get_for_update` will not be influenced, they still read values at `start_ts`.
+    /// # Optimistic transaction
     ///
+    /// It reads at the "start timestamp" and caches the value, just like normal
+    /// get requests. The lock is written in prewrite and commit, so it cannot
+    /// prevent concurrent transactions from writing the same key, but can only
+    /// prevent itself from committing.
     ///
-    /// It can only be used in pessimistic mode.
+    /// # Pessimistic transaction
+    ///
+    /// It reads at the "current timestamp" and thus does not cache the value.
+    /// So following read requests won't be affected by the `get_for_update`.
+    /// A lock will be acquired immediately with this request, which prevents
+    /// concurrent transactions from mutating the keys.
+    ///
+    /// The "current timestamp" (also called `for_update_ts` of the request) is
+    /// fetched immediately from the timestamp oracle.
+    ///
+    /// Note: The behavior of the request under pessimistic transaction does not
+    /// follow snapshot isolation.
     ///
     /// # Examples
     /// ```rust,no_run
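In other words, after this patch `get_for_update` is usable in both transaction modes, with different guarantees. A hedged sketch of both calls, assuming a placeholder PD endpoint and the `futures` crate, with error handling elided:

```rust
use tikv_client::TransactionClient;

fn main() {
    futures::executor::block_on(async {
        // Assumed PD endpoint; point this at your own cluster.
        let client = TransactionClient::new(vec!["127.0.0.1:2379"]).await.unwrap();

        // Pessimistic: the key is locked right away and read at for_update_ts,
        // so the value may be newer than the transaction's start_ts snapshot.
        let mut pess = client.begin_pessimistic().await.unwrap();
        let _latest = pess.get_for_update("k1".to_owned()).await.unwrap();
        pess.commit().await.unwrap();

        // Optimistic (newly allowed by this patch): the key is read at start_ts
        // and cached; the "lock" only takes effect at prewrite/commit time.
        let mut opt = client.begin_optimistic().await.unwrap();
        let _snapshot = opt.get_for_update("k1".to_owned()).await.unwrap();
        opt.commit().await.unwrap();
    });
}
```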
@@ -146,7 +158,9 @@ impl Transaction {
     pub async fn get_for_update(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
         self.check_allow_operation().await?;
         if !self.is_pessimistic() {
-            Err(Error::InvalidTransactionType)
+            let key = key.into();
+            self.lock_keys(iter::once(key.clone())).await?;
+            self.get(key).await
         } else {
             let mut pairs = self.pessimistic_lock(iter::once(key.into()), true).await?;
             debug_assert!(pairs.len() <= 1);
@@ -228,33 +242,25 @@ impl Transaction {
 
     /// Create a new 'batch get for update' request.
     ///
-    /// Once resolved this request will pessimistically lock the keys and
-    /// fetch the values associated with the given keys.
+    /// Similar to [`get_for_update`](Transaction::get_for_update), but it works
+    /// for a batch of keys.
     ///
-    /// Note: The behavior of this command does not follow snapshot isolation. It is similar to `select for update` in TiDB,
-    /// which is similar to that in MySQL. It reads the latest value (using current timestamp),
-    /// and the value is not cached in the local buffer.
-    /// So normal `get`-like commands after `batch_get_for_update` will not be influenced, they still read values at `start_ts`.
-    ///
-    /// Non-existent entries will not appear in the result. The order of the keys is not retained in the result.
-    ///
-    /// It can only be used in pessimistic mode.
+    /// Non-existent entries will not appear in the result. The order of the
+    /// keys is not retained in the result.
     ///
     /// # Examples
     /// ```rust,no_run
-    /// # use tikv_client::{Key, Value, Config, TransactionClient};
+    /// # use tikv_client::{Key, Value, Config, TransactionClient, KvPair};
     /// # use futures::prelude::*;
     /// # use std::collections::HashMap;
     /// # futures::executor::block_on(async {
     /// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
     /// let mut txn = client.begin_pessimistic().await.unwrap();
     /// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()];
-    /// let result: HashMap<Key, Value> = txn
+    /// let result: Vec<KvPair> = txn
     ///     .batch_get_for_update(keys)
     ///     .await
-    ///     .unwrap()
-    ///     .map(|pair| (pair.0, pair.1))
-    ///     .collect();
+    ///     .unwrap();
     /// // now "TiKV" and "TiDB" are both locked
     /// // Finish the transaction...
     /// txn.commit().await.unwrap();
@@ -263,13 +269,15 @@ impl Transaction {
     pub async fn batch_get_for_update(
         &mut self,
         keys: impl IntoIterator<Item = impl Into<Key>>,
-    ) -> Result<impl Iterator<Item = KvPair>> {
+    ) -> Result<Vec<KvPair>> {
         self.check_allow_operation().await?;
+        let keys: Vec<Key> = keys.into_iter().map(|k| k.into()).collect();
         if !self.is_pessimistic() {
-            return Err(Error::InvalidTransactionType);
+            self.lock_keys(keys.clone()).await?;
+            Ok(self.batch_get(keys).await?.collect())
+        } else {
+            self.pessimistic_lock(keys, true).await
         }
-        let keys: Vec<Key> = keys.into_iter().map(|it| it.into()).collect();
-        Ok(self.pessimistic_lock(keys, true).await?.into_iter())
     }
 
     /// Create a new 'scan' request.
@@ -473,8 +481,8 @@ impl Transaction {
                 }
             }
             TransactionKind::Pessimistic(_) => {
-                self.pessimistic_lock(keys.into_iter().map(|k| k.into()), false)
-                    .await?;
+                let keys: Vec<Key> = keys.into_iter().map(|k| k.into()).collect();
+                self.pessimistic_lock(keys.into_iter(), false).await?;
             }
         }
         Ok(())
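Since `batch_get_for_update` now resolves to a `Vec<KvPair>` instead of an iterator, callers that built a map from the result need an explicit `into_iter()`. A hedged sketch of the migrated call site (PD address and keys are placeholders):

```rust
use std::collections::HashMap;
use tikv_client::{KvPair, TransactionClient};

fn main() {
    futures::executor::block_on(async {
        // Assumed PD endpoint; point this at your own cluster.
        let client = TransactionClient::new(vec!["127.0.0.1:2379"]).await.unwrap();
        let mut txn = client.begin_pessimistic().await.unwrap();

        let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()];
        // Previously the result was an iterator; now it is a Vec<KvPair>.
        let pairs: Vec<KvPair> = txn.batch_get_for_update(keys).await.unwrap();
        let by_key: HashMap<_, _> = pairs.into_iter().map(From::from).collect();
        println!("locked {} keys", by_key.len());

        txn.commit().await.unwrap();
    });
}
```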
@@ -649,7 +657,13 @@ impl Transaction {
         }
 
         let first_key = keys[0].clone().key();
-        let primary_lock = self.buffer.get_primary_key_or(&first_key).await;
+        // We do not set the primary key here, because the pessimistic lock request
+        // can fail, in which case the keys may not be part of the transaction.
+        let primary_lock = self
+            .buffer
+            .get_primary_key()
+            .await
+            .unwrap_or_else(|| first_key.clone());
         let for_update_ts = self.rpc.clone().get_timestamp().await?;
         self.options.push_for_update_ts(for_update_ts.clone());
         let request = new_pessimistic_lock_request(
@@ -669,6 +683,9 @@ impl Transaction {
             .plan();
         let pairs = plan.execute().await;
 
+        // The primary key will be set here if needed.
+        self.buffer.primary_key_or(&first_key).await;
+
         self.start_auto_heartbeat().await;
 
         for key in keys {
diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs
index 8ec25a6..5c8d962 100644
--- a/tests/integration_tests.rs
+++ b/tests/integration_tests.rs
@@ -305,10 +305,7 @@ async fn txn_write_million() -> Result<()> {
     let res = txn.batch_get(keys.clone()).await?.collect::<Vec<_>>();
     assert_eq!(res.len(), keys.len());
 
-    let res = txn
-        .batch_get_for_update(keys.clone())
-        .await?
-        .collect::<Vec<_>>();
+    let res = txn.batch_get_for_update(keys.clone()).await?;
     assert_eq!(res.len(), keys.len());
 
     txn.commit().await?;
@@ -686,7 +683,8 @@ async fn get_for_update() -> Result<()> {
 
     let mut t1 = client.begin_pessimistic().await?;
     let mut t2 = client.begin_pessimistic().await?;
-
+    let mut t3 = client.begin_optimistic().await?;
+    let mut t4 = client.begin_optimistic().await?;
     let mut t0 = client.begin_pessimistic().await?;
     t0.put(key1.clone(), value1).await?;
     t0.put(key2.clone(), value2).await?;
@@ -700,12 +698,19 @@ async fn get_for_update() -> Result<()> {
 
     let res: HashMap<_, _> = t2
         .batch_get_for_update(keys.clone())
         .await?
+        .into_iter()
         .map(From::from)
         .collect();
     t2.commit().await?;
-    assert!(res.get(&key1.into()).unwrap() == &value1);
+    assert!(res.get(&key1.clone().into()).unwrap() == &value1);
     assert!(res.get(&key2.into()).unwrap() == &value2);
 
+    assert!(t3.get_for_update(key1).await?.is_none());
+    assert!(t3.commit().await.is_err());
+
+    assert!(t4.batch_get_for_update(keys).await?.len() == 0);
+    assert!(t4.commit().await.is_err());
+
     Ok(())
 }
diff --git a/tikv-client-proto/proto/coprocessor_v2.proto b/tikv-client-proto/proto/coprocessor_v2.proto
index b321a1f..a77a6a4 100644
--- a/tikv-client-proto/proto/coprocessor_v2.proto
+++ b/tikv-client-proto/proto/coprocessor_v2.proto
@@ -26,7 +26,6 @@ message RawCoprocessorResponse {
     bytes data = 1;
 
     errorpb.Error region_error = 2;
-
     // Error message for cases like if no coprocessor with a matching name is found
     // or on a version mismatch between plugin_api and the coprocessor.
     string other_error = 4;
diff --git a/tikv-client-proto/proto/tikvpb.proto b/tikv-client-proto/proto/tikvpb.proto
index d2f3f8a..2ac6fbb 100644
--- a/tikv-client-proto/proto/tikvpb.proto
+++ b/tikv-client-proto/proto/tikvpb.proto
@@ -100,7 +100,7 @@ service Tikv {
     rpc EstablishMPPConnection(mpp.EstablishMPPConnectionRequest) returns (stream mpp.MPPDataPacket) {}
 
     /// CheckLeader sends all information (includes region term and epoch) to other stores.
-    /// Once a store receives a request, it checks term and epoch for each region, and sends the regions whose 
+    /// Once a store receives a request, it checks term and epoch for each region, and sends the regions whose
     /// term and epoch match with local information in the store.
     /// After the client collected all responses from all stores, it checks if got a quorum of responses from
     /// other stores for every region, and decides to advance resolved ts from these regions.
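The new `t3`/`t4` assertions capture the user-visible consequence: in an optimistic transaction, `get_for_update` does not block concurrent writers, it only makes the transaction fail at commit if the key has changed. A hedged end-to-end sketch of that behavior (PD address and key are placeholders; error handling elided):

```rust
use tikv_client::TransactionClient;

fn main() {
    futures::executor::block_on(async {
        // Assumed PD endpoint; point this at your own cluster.
        let client = TransactionClient::new(vec!["127.0.0.1:2379"]).await.unwrap();

        let mut reader = client.begin_optimistic().await.unwrap();
        let mut writer = client.begin_pessimistic().await.unwrap();

        // Optimistic get_for_update: reads at start_ts; the lock is only
        // written at prewrite, so it cannot stop the writer below.
        let _ = reader.get_for_update("conflict-key".to_owned()).await.unwrap();

        // A concurrent transaction commits a new value for the same key.
        writer.put("conflict-key".to_owned(), "v".to_owned()).await.unwrap();
        writer.commit().await.unwrap();

        // The optimistic transaction detects the conflict at commit time.
        assert!(reader.commit().await.is_err());
    });
}
```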