From 74db41c3e5ed1899708215fc284a4673f179f6e7 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Tue, 22 Feb 2022 15:48:07 +0800 Subject: [PATCH] txn: Error handling for pessimistic locks (#332) * wip Signed-off-by: pingyu * wip Signed-off-by: pingyu * wip Signed-off-by: pingyu * close #313: add tests Signed-off-by: pingyu * trigger actions Signed-off-by: pingyu * Issue Number #313: fix CI by set timeout longer. Signed-off-by: pingyu * Issue Number #313: Add comment. Signed-off-by: pingyu --- .github/workflows/ci.yml | 2 +- src/raw/client.rs | 12 ++- src/request/plan.rs | 47 ++++++-- src/request/plan_builder.rs | 18 ++++ src/transaction/buffer.rs | 41 +++++++ src/transaction/requests.rs | 178 ++++++++++++++++++++++++++----- src/transaction/transaction.rs | 63 +++++++++-- tests/common/mod.rs | 7 +- tests/integration_tests.rs | 37 +++++++ tikv-client-common/src/errors.rs | 5 + 10 files changed, 362 insertions(+), 48 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8aac0e7..95819a6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -107,4 +107,4 @@ jobs: - name: start tiup playground run: /home/runner/.tiup/bin/tiup playground nightly --mode tikv-slim --kv 3 --without-monitor --kv.config /home/runner/work/client-rust/client-rust/config/tikv.toml --pd.config /home/runner/work/client-rust/client-rust/config/pd.toml & - name: integration test - run: make integration-test + run: MULTI_REGION=1 make integration-test diff --git a/src/raw/client.rs b/src/raw/client.rs index 50bd671..3252e40 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -13,7 +13,7 @@ use crate::{ pd::{PdClient, PdRpcClient}, raw::lowering::*, request::{Collect, CollectSingle, Plan}, - BoundRange, ColumnFamily, Key, KvPair, Result, Value, + Backoff, BoundRange, ColumnFamily, Key, KvPair, Result, Value, }; const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240; @@ -359,11 +359,19 @@ impl Client { /// # }); /// ``` pub async fn delete_range(&self, range: impl Into) -> Result<()> { + self.delete_range_opt(range, DEFAULT_REGION_BACKOFF).await + } + + pub async fn delete_range_opt( + &self, + range: impl Into, + backoff: Backoff, + ) -> Result<()> { debug!(self.logger, "invoking raw delete_range request"); self.assert_non_atomic()?; let request = new_raw_delete_range_request(range.into(), self.cf.clone()); let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) - .retry_multi_region(DEFAULT_REGION_BACKOFF) + .retry_multi_region(backoff) .extract_error() .plan(); plan.execute().await?; diff --git a/src/request/plan.rs b/src/request/plan.rs index ed13f17..ce78526 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -16,6 +16,7 @@ use crate::{ stats::tikv_stats, store::RegionStore, transaction::{resolve_locks, HasLocks}, + util::iter::FlatMapOkIterExt, Error, Result, }; @@ -63,6 +64,11 @@ pub struct RetryableMultiRegion { pub(super) inner: P, pub pd_client: Arc, pub backoff: Backoff, + + /// Preserve all regions' results for other downstream plans to handle. + /// If true, return Ok and preserve all regions' results, even if some of them are Err. + /// Otherwise, return the first Err if there is any. + pub preserve_region_results: bool, } impl RetryableMultiRegion @@ -76,6 +82,7 @@ where current_plan: P, backoff: Backoff, permits: Arc, + preserve_region_results: bool, ) -> Result<::Result> { let shards = current_plan.shards(&pd_client).collect::>().await; let mut handles = Vec::new(); @@ -89,16 +96,29 @@ where region_store, backoff.clone(), permits.clone(), + preserve_region_results, )); handles.push(handle); } - Ok(try_join_all(handles) - .await? - .into_iter() - .collect::>>()? - .into_iter() - .flatten() - .collect()) + + let results = try_join_all(handles).await?; + if preserve_region_results { + Ok(results + .into_iter() + .flat_map_ok(|x| x) + .map(|x| match x { + Ok(r) => r, + Err(e) => Err(e), + }) + .collect()) + } else { + Ok(results + .into_iter() + .collect::>>()? + .into_iter() + .flatten() + .collect()) + } } #[async_recursion] @@ -108,6 +128,7 @@ where region_store: RegionStore, mut backoff: Backoff, permits: Arc, + preserve_region_results: bool, ) -> Result<::Result> { // limit concurrent requests let permit = permits.acquire().await.unwrap(); @@ -125,7 +146,14 @@ where if !region_error_resolved { futures_timer::Delay::new(duration).await; } - Self::single_plan_handler(pd_client, plan, backoff, permits).await + Self::single_plan_handler( + pd_client, + plan, + backoff, + permits, + preserve_region_results, + ) + .await } None => Err(Error::RegionError(e)), } @@ -242,6 +270,7 @@ impl Clone for RetryableMultiRegion { inner: self.inner.clone(), pd_client: self.pd_client.clone(), backoff: self.backoff.clone(), + preserve_region_results: self.preserve_region_results, } } } @@ -263,6 +292,7 @@ where self.inner.clone(), self.backoff.clone(), concurrency_permits.clone(), + self.preserve_region_results, ) .await } @@ -556,6 +586,7 @@ mod test { }, pd_client: Arc::new(MockPdClient::default()), backoff: Backoff::no_backoff(), + preserve_region_results: false, }; assert!(plan.execute().await.is_err()) } diff --git a/src/request/plan_builder.rs b/src/request/plan_builder.rs index c0052e8..ce269fa 100644 --- a/src/request/plan_builder.rs +++ b/src/request/plan_builder.rs @@ -113,6 +113,23 @@ where pub fn retry_multi_region( self, backoff: Backoff, + ) -> PlanBuilder, Targetted> { + self.make_retry_multi_region(backoff, false) + } + + /// Preserve all results, even some of them are Err. + /// To pass all responses to merge, and handle partial successful results correctly. + pub fn retry_multi_region_preserve_results( + self, + backoff: Backoff, + ) -> PlanBuilder, Targetted> { + self.make_retry_multi_region(backoff, true) + } + + fn make_retry_multi_region( + self, + backoff: Backoff, + preserve_region_results: bool, ) -> PlanBuilder, Targetted> { PlanBuilder { pd_client: self.pd_client.clone(), @@ -120,6 +137,7 @@ where inner: self.plan, pd_client: self.pd_client, backoff, + preserve_region_results, }, phantom: PhantomData, } diff --git a/src/transaction/buffer.rs b/src/transaction/buffer.rs index 1ec1ce1..5cb1dd1 100644 --- a/src/transaction/buffer.rs +++ b/src/transaction/buffer.rs @@ -174,6 +174,19 @@ impl Buffer { } } + /// Unlock the given key if locked. + pub fn unlock(&mut self, key: &Key) { + if let Some(value) = self.entry_map.get_mut(key) { + if let BufferEntry::Locked(v) = value { + if let Some(v) = v { + *value = BufferEntry::Cached(v.take()); + } else { + self.entry_map.remove(key); + } + } + } + } + /// Put a value into the buffer (does not write through). pub fn put(&mut self, key: Key, value: Value) { let mut entry = self.entry_map.entry(key.clone()); @@ -485,6 +498,12 @@ mod tests { }; } + macro_rules! assert_entry_none { + ($key: ident) => { + assert!(matches!(buffer.entry_map.get(&$key), None,)) + }; + } + // Insert + Delete = CheckNotExists let key: Key = b"key1".to_vec().into(); buffer.insert(key.clone(), b"value1".to_vec()); @@ -510,5 +529,27 @@ mod tests { buffer.delete(key.clone()); buffer.insert(key.clone(), b"value1".to_vec()); assert_entry!(key, BufferEntry::Put(_)); + + // Lock + Unlock = None + let key: Key = b"key4".to_vec().into(); + buffer.lock(key.clone()); + buffer.unlock(&key); + assert_entry_none!(key); + + // Cached + Lock + Unlock = Cached + let key: Key = b"key5".to_vec().into(); + let val: Value = b"value5".to_vec(); + let val_ = val.clone(); + let r = block_on(buffer.get_or_else(key.clone(), move |_| ready(Ok(Some(val_))))); + assert_eq!(r.unwrap().unwrap(), val); + buffer.lock(key.clone()); + buffer.unlock(&key); + assert_entry!(key, BufferEntry::Cached(Some(_))); + assert_eq!( + block_on(buffer.get_or_else(key, move |_| ready(Err(internal_err!(""))))) + .unwrap() + .unwrap(), + val + ); } } diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index 8d421f5..e1d05df 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -16,6 +16,7 @@ use crate::{ use either::Either; use futures::stream::BoxStream; use std::{collections::HashMap, iter, sync::Arc}; +use tikv_client_common::Error::PessimisticLockError; use tikv_client_proto::{ kvrpcpb::{self, TxnHeartBeatResponse}, pdpb::Timestamp, @@ -379,35 +380,53 @@ impl Merge>>, >, ) -> Result { - input - .into_iter() - .flat_map_ok(|ResponseWithShard(mut resp, mutations)| { - let values = resp.take_values(); - let values_len = values.len(); - let not_founds = resp.take_not_founds(); - let kvpairs = mutations - .into_iter() - .map(|m| m.key) - .zip(values) - .map(KvPair::from); - assert_eq!(kvpairs.len(), values_len); - if not_founds.is_empty() { - // Legacy TiKV does not distiguish not existing key and existing key - // that with empty value. We assume that key does not exist if value - // is empty. - Either::Left(kvpairs.filter(|kvpair| !kvpair.value().is_empty())) - } else { - assert_eq!(kvpairs.len(), not_founds.len()); - Either::Right(kvpairs.zip(not_founds).filter_map(|(kvpair, not_found)| { - if not_found { - None - } else { - Some(kvpair) - } - })) - } + if input.iter().any(Result::is_err) { + let (success, mut errors): (Vec<_>, Vec<_>) = + input.into_iter().partition(Result::is_ok); + let first_err = errors.pop().unwrap(); + let success_keys = success + .into_iter() + .map(Result::unwrap) + .flat_map(|ResponseWithShard(_resp, mutations)| { + mutations.into_iter().map(|m| m.key) + }) + .collect(); + Err(PessimisticLockError { + inner: Box::new(first_err.unwrap_err()), + success_keys, }) - .collect() + } else { + Ok(input + .into_iter() + .map(Result::unwrap) + .flat_map(|ResponseWithShard(mut resp, mutations)| { + let values = resp.take_values(); + let values_len = values.len(); + let not_founds = resp.take_not_founds(); + let kvpairs = mutations + .into_iter() + .map(|m| m.key) + .zip(values) + .map(KvPair::from); + assert_eq!(kvpairs.len(), values_len); + if not_founds.is_empty() { + // Legacy TiKV does not distinguish not existing key and existing key + // that with empty value. We assume that key does not exist if value + // is empty. + Either::Left(kvpairs.filter(|kvpair| !kvpair.value().is_empty())) + } else { + assert_eq!(kvpairs.len(), not_founds.len()); + Either::Right(kvpairs.zip(not_founds).filter_map(|(kvpair, not_found)| { + if not_found { + None + } else { + Some(kvpair) + } + })) + } + }) + .collect()) + } } } @@ -653,3 +672,106 @@ impl HasLocks for kvrpcpb::PrewriteResponse { .collect() } } + +#[cfg(test)] +mod tests { + use crate::{ + request::{plan::Merge, CollectWithShard, ResponseWithShard}, + KvPair, + }; + use tikv_client_common::Error::{PessimisticLockError, ResolveLockError}; + use tikv_client_proto::kvrpcpb; + + #[tokio::test] + async fn test_merge_pessimistic_lock_response() { + let (key1, key2, key3, key4) = (b"key1", b"key2", b"key3", b"key4"); + let (value1, value4) = (b"value1", b"value4"); + let value_empty = b""; + + let resp1 = ResponseWithShard( + kvrpcpb::PessimisticLockResponse { + values: vec![value1.to_vec()], + ..Default::default() + }, + vec![kvrpcpb::Mutation { + op: kvrpcpb::Op::PessimisticLock as i32, + key: key1.to_vec(), + ..Default::default() + }], + ); + + let resp_empty_value = ResponseWithShard( + kvrpcpb::PessimisticLockResponse { + values: vec![value_empty.to_vec()], + ..Default::default() + }, + vec![kvrpcpb::Mutation { + op: kvrpcpb::Op::PessimisticLock as i32, + key: key2.to_vec(), + ..Default::default() + }], + ); + + let resp_not_found = ResponseWithShard( + kvrpcpb::PessimisticLockResponse { + values: vec![value_empty.to_vec(), value4.to_vec()], + not_founds: vec![true, false], + ..Default::default() + }, + vec![ + kvrpcpb::Mutation { + op: kvrpcpb::Op::PessimisticLock as i32, + key: key3.to_vec(), + ..Default::default() + }, + kvrpcpb::Mutation { + op: kvrpcpb::Op::PessimisticLock as i32, + key: key4.to_vec(), + ..Default::default() + }, + ], + ); + + let merger = CollectWithShard {}; + { + // empty values & not founds are filtered. + let input = vec![ + Ok(resp1.clone()), + Ok(resp_empty_value.clone()), + Ok(resp_not_found.clone()), + ]; + let result = merger.merge(input); + + assert_eq!( + result.unwrap(), + vec![ + KvPair::new(key1.to_vec(), value1.to_vec()), + KvPair::new(key4.to_vec(), value4.to_vec()), + ] + ); + } + { + let input = vec![ + Ok(resp1), + Ok(resp_empty_value), + Err(ResolveLockError), + Ok(resp_not_found), + ]; + let result = merger.merge(input); + + if let PessimisticLockError { + inner, + success_keys, + } = result.unwrap_err() + { + assert!(matches!(*inner, ResolveLockError)); + assert_eq!( + success_keys, + vec![key1.to_vec(), key2.to_vec(), key3.to_vec(), key4.to_vec()] + ); + } else { + panic!(); + } + } + } +} diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index ad4184a..02dd05c 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -744,27 +744,74 @@ impl Transaction { primary_lock, self.timestamp.clone(), MAX_TTL, - for_update_ts, + for_update_ts.clone(), need_value, ); let plan = PlanBuilder::new(self.rpc.clone(), request) .resolve_lock(self.options.retry_options.lock_backoff.clone()) .preserve_shard() - .retry_multi_region(self.options.retry_options.region_backoff.clone()) + .retry_multi_region_preserve_results(self.options.retry_options.region_backoff.clone()) .merge(CollectWithShard) .plan(); let pairs = plan.execute().await; - // primary key will be set here if needed - self.buffer.primary_key_or(&first_key); + if let Err(err) = pairs { + match err { + Error::PessimisticLockError { + inner, + success_keys, + } if !success_keys.is_empty() => { + let keys = success_keys.into_iter().map(Key::from); + self.pessimistic_lock_rollback(keys, self.timestamp.clone(), for_update_ts) + .await?; + Err(*inner) + } + _ => Err(err), + } + } else { + // primary key will be set here if needed + self.buffer.primary_key_or(&first_key); - self.start_auto_heartbeat().await; + self.start_auto_heartbeat().await; - for key in keys { - self.buffer.lock(key.key()); + for key in keys { + self.buffer.lock(key.key()); + } + + pairs + } + } + + /// Rollback pessimistic lock + async fn pessimistic_lock_rollback( + &mut self, + keys: impl Iterator, + start_version: Timestamp, + for_update_ts: Timestamp, + ) -> Result<()> { + debug!(self.logger, "rollback pessimistic lock"); + + let keys: Vec<_> = keys.into_iter().collect(); + if keys.is_empty() { + return Ok(()); } - pairs + let req = new_pessimistic_rollback_request( + keys.clone().into_iter(), + start_version, + for_update_ts, + ); + let plan = PlanBuilder::new(self.rpc.clone(), req) + .resolve_lock(self.options.retry_options.lock_backoff.clone()) + .retry_multi_region(self.options.retry_options.region_backoff.clone()) + .extract_error() + .plan(); + plan.execute().await?; + + for key in keys { + self.buffer.unlock(&key); + } + Ok(()) } /// Checks if the transaction can perform arbitrary operations. diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 32c77c9..d666be7 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -16,9 +16,14 @@ pub async fn clear_tikv() { ColumnFamily::Lock, ColumnFamily::Write, ]; + // DEFAULT_REGION_BACKOFF is not long enough for CI environment. So set a longer backoff. + let backoff = tikv_client::Backoff::no_jitter_backoff(100, 10000, 10); for cf in cfs { let raw_client = RawClient::new(pd_addrs(), None).await.unwrap().with_cf(cf); - raw_client.delete_range(vec![]..).await.unwrap(); + raw_client + .delete_range_opt(vec![].., backoff.clone()) + .await + .unwrap(); } } diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index f2eeb39..f4bc05a 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -674,6 +674,43 @@ async fn txn_lock_keys() -> Result<()> { Ok(()) } +#[tokio::test] +#[serial] +async fn txn_lock_keys_error_handle() -> Result<()> { + init().await?; + let client = TransactionClient::new_with_config(pd_addrs(), Default::default(), None).await?; + + // Keys in `k` should locate in different regions. See `init()` for boundary of regions. + let k: Vec = vec![ + 0x00000000_u32, + 0x40000000_u32, + 0x80000000_u32, + 0xC0000000_u32, + ] + .into_iter() + .map(|x| x.to_be_bytes().to_vec().into()) + .collect(); + + let mut t1 = client.begin_pessimistic().await?; + let mut t2 = client.begin_pessimistic().await?; + let mut t3 = client.begin_pessimistic().await?; + + t1.lock_keys(vec![k[0].clone(), k[1].clone()]).await?; + assert!(t2 + .lock_keys(vec![k[0].clone(), k[2].clone()]) + .await + .is_err()); + t3.lock_keys(vec![k[2].clone(), k[3].clone()]).await?; + + t1.rollback().await?; + t3.rollback().await?; + + t2.lock_keys(vec![k[0].clone(), k[2].clone()]).await?; + t2.commit().await?; + + Ok(()) +} + #[tokio::test] #[serial] async fn txn_get_for_update() -> Result<()> { diff --git a/tikv-client-common/src/errors.rs b/tikv-client-common/src/errors.rs index 86d6854..db30910 100644 --- a/tikv-client-common/src/errors.rs +++ b/tikv-client-common/src/errors.rs @@ -88,6 +88,11 @@ pub enum Error { InternalError { message: String }, #[error("{0}")] StringError(String), + #[error("PessimisticLock error: {:?}", inner)] + PessimisticLockError { + inner: Box, + success_keys: Vec>, + }, } impl From for Error {