From b46022b7db0c3bf7a4818c4a6c956f2f522c5055 Mon Sep 17 00:00:00 2001 From: Nick Cameron Date: Tue, 22 Dec 2020 08:39:39 +1300 Subject: [PATCH 01/11] Some renaming (and adds one method) Signed-off-by: Nick Cameron --- examples/pessimistic.rs | 15 ++++++++--- examples/transaction.rs | 20 ++++++++++++--- src/transaction/client.rs | 19 ++++++++------ src/transaction/mod.rs | 2 +- src/transaction/transaction.rs | 46 +++++++++++++++++++--------------- tests/integration_tests.rs | 14 +++++------ 6 files changed, 74 insertions(+), 42 deletions(-) diff --git a/examples/pessimistic.rs b/examples/pessimistic.rs index bca0fc7..fe293c3 100644 --- a/examples/pessimistic.rs +++ b/examples/pessimistic.rs @@ -28,7 +28,10 @@ async fn main() { let value1: Value = b"value1".to_vec(); let key2: Key = b"key2".to_vec().into(); let value2: Value = b"value2".to_vec(); - let mut txn0 = client.begin().await.expect("Could not begin a transaction"); + let mut txn0 = client + .begin_optimistic() + .await + .expect("Could not begin a transaction"); for (key, value) in vec![(key1, value1), (key2, value2)] { txn0.put(key, value).await.expect("Could not set key value"); } @@ -47,7 +50,10 @@ async fn main() { println!("{:?}", (&key1, value)); { // another txn cannot write to the locked key - let mut txn2 = client.begin().await.expect("Could not begin a transaction"); + let mut txn2 = client + .begin_optimistic() + .await + .expect("Could not begin a transaction"); let key1: Key = b"key1".to_vec().into(); let value2: Value = b"value2".to_vec(); txn2.put(key1, value2).await.unwrap(); @@ -58,7 +64,10 @@ async fn main() { let value3: Value = b"value3".to_vec(); txn1.put(key1.clone(), value3).await.unwrap(); txn1.commit().await.unwrap(); - let mut txn3 = client.begin().await.expect("Could not begin a transaction"); + let mut txn3 = client + .begin_optimistic() + .await + .expect("Could not begin a transaction"); let result = txn3.get(key1.clone()).await.unwrap().unwrap(); txn3.commit() .await diff --git a/examples/transaction.rs b/examples/transaction.rs index d089be3..4c82354 100644 --- a/examples/transaction.rs +++ b/examples/transaction.rs @@ -6,7 +6,10 @@ use crate::common::parse_args; use tikv_client::{BoundRange, Config, Key, KvPair, TransactionClient as Client, Value}; async fn puts(client: &Client, pairs: impl IntoIterator>) { - let mut txn = client.begin().await.expect("Could not begin a transaction"); + let mut txn = client + .begin_optimistic() + .await + .expect("Could not begin a transaction"); for pair in pairs { let (key, value) = pair.into().into(); txn.put(key, value).await.expect("Could not set key value"); @@ -15,7 +18,10 @@ async fn puts(client: &Client, pairs: impl IntoIterator } async fn get(client: &Client, key: Key) -> Option { - let mut txn = client.begin().await.expect("Could not begin a transaction"); + let mut txn = client + .begin_optimistic() + .await + .expect("Could not begin a transaction"); let res = txn.get(key).await.expect("Could not get value"); txn.commit() .await @@ -24,7 +30,10 @@ async fn get(client: &Client, key: Key) -> Option { } async fn scan(client: &Client, range: impl Into, limit: u32) { - let mut txn = client.begin().await.expect("Could not begin a transaction"); + let mut txn = client + .begin_optimistic() + .await + .expect("Could not begin a transaction"); txn.scan(range, limit) .await .expect("Could not scan key-value pairs in range") @@ -33,7 +42,10 @@ async fn scan(client: &Client, range: impl Into, limit: u32) { } async fn dels(client: &Client, keys: impl IntoIterator) { - let mut txn = client.begin().await.expect("Could not begin a transaction"); + let mut txn = client + .begin_optimistic() + .await + .expect("Could not begin a transaction"); for key in keys { txn.delete(key).await.expect("Could not delete the key"); } diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 0529c33..ac386fa 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -6,7 +6,7 @@ use crate::{ pd::{PdClient, PdRpcClient}, request::{KvRequest, OPTIMISTIC_BACKOFF}, timestamp::TimestampExt, - transaction::{Snapshot, Transaction, TransactionStyle}, + transaction::{Snapshot, Transaction, TransactionOptions}, Result, }; use futures::executor::ThreadPool; @@ -99,17 +99,17 @@ impl Client { /// use futures::prelude::*; /// # futures::executor::block_on(async { /// let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap(); - /// let mut transaction = client.begin().await.unwrap(); + /// let mut transaction = client.begin_optimistic().await.unwrap(); /// // ... Issue some commands. /// let commit = transaction.commit(); /// let result = commit.await.unwrap(); /// # }); /// ``` - pub async fn begin(&self) -> Result { + pub async fn begin_optimistic(&self) -> Result { let timestamp = self.current_timestamp().await?; Ok(self.new_transaction( timestamp, - TransactionStyle::new_optimistic(self.config.try_one_pc), + TransactionOptions::new_optimistic(self.config.try_one_pc), false, )) } @@ -135,16 +135,21 @@ impl Client { let timestamp = self.current_timestamp().await?; Ok(self.new_transaction( timestamp, - TransactionStyle::new_pessimistic(self.config.try_one_pc), + TransactionOptions::new_pessimistic(self.config.try_one_pc), false, )) } + pub async fn begin_with_options(&self, opts: TransactionOptions) -> Result { + let timestamp = self.current_timestamp().await?; + Ok(self.new_transaction(timestamp, opts, false)) + } + /// Creates a new [`Snapshot`](Snapshot) at the given [`Timestamp`](Timestamp). pub fn snapshot(&self, timestamp: Timestamp) -> Snapshot { Snapshot::new(self.new_transaction( timestamp, - TransactionStyle::new_optimistic(self.config.try_one_pc), + TransactionOptions::new_optimistic(self.config.try_one_pc), true, )) } @@ -212,7 +217,7 @@ impl Client { fn new_transaction( &self, timestamp: Timestamp, - style: TransactionStyle, + style: TransactionOptions, read_only: bool, ) -> Transaction { Transaction::new( diff --git a/src/transaction/mod.rs b/src/transaction/mod.rs index 1de2f34..264655f 100644 --- a/src/transaction/mod.rs +++ b/src/transaction/mod.rs @@ -12,7 +12,7 @@ pub use client::Client; pub(crate) use lock::{resolve_locks, HasLocks}; pub use snapshot::Snapshot; pub use transaction::Transaction; -use transaction::TransactionStyle; +use transaction::TransactionOptions; mod buffer; mod client; diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index 3c70470..aba32b7 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -35,31 +35,37 @@ pub enum TransactionKind { Pessimistic(u64), } -#[derive(Copy, Clone, Eq, PartialEq, Debug)] -pub struct TransactionStyle { +impl Default for TransactionKind { + fn default() -> TransactionKind { + TransactionKind::Pessimistic(0) + } +} + +#[derive(Copy, Clone, Eq, PartialEq, Debug, Default)] +pub struct TransactionOptions { kind: TransactionKind, try_one_pc: bool, async_commit: bool, } -impl TransactionStyle { - pub fn new_optimistic(try_one_pc: bool) -> TransactionStyle { - TransactionStyle { +impl TransactionOptions { + pub fn new_optimistic(try_one_pc: bool) -> TransactionOptions { + TransactionOptions { kind: TransactionKind::Optimistic, try_one_pc, async_commit: false, } } - pub fn new_pessimistic(try_one_pc: bool) -> TransactionStyle { - TransactionStyle { + pub fn new_pessimistic(try_one_pc: bool) -> TransactionOptions { + TransactionOptions { kind: TransactionKind::Pessimistic(0), try_one_pc, async_commit: false, } } - pub fn async_commit(mut self) -> TransactionStyle { + pub fn async_commit(mut self) -> TransactionOptions { self.async_commit = true; self } @@ -101,7 +107,7 @@ impl TransactionStyle { /// use futures::prelude::*; /// # futures::executor::block_on(async { /// let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap(); -/// let txn = client.begin().await.unwrap(); +/// let txn = client.begin_optimistic().await.unwrap(); /// # }); /// ``` pub struct Transaction { @@ -110,7 +116,7 @@ pub struct Transaction { buffer: Buffer, bg_worker: ThreadPool, rpc: Arc, - style: TransactionStyle, + style: TransactionOptions, } impl Transaction { @@ -118,7 +124,7 @@ impl Transaction { timestamp: Timestamp, bg_worker: ThreadPool, rpc: Arc, - style: TransactionStyle, + style: TransactionOptions, read_only: bool, ) -> Transaction { let status = if read_only { @@ -149,7 +155,7 @@ impl Transaction { /// # use futures::prelude::*; /// # futures::executor::block_on(async { /// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap(); - /// let mut txn = client.begin().await.unwrap(); + /// let mut txn = client.begin_optimistic().await.unwrap(); /// let key = "TiKV".to_owned(); /// let result: Option = txn.get(key).await.unwrap(); /// // Finish the transaction... @@ -216,7 +222,7 @@ impl Transaction { /// # use std::collections::HashMap; /// # futures::executor::block_on(async { /// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap(); - /// let mut txn = client.begin().await.unwrap(); + /// let mut txn = client.begin_optimistic().await.unwrap(); /// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()]; /// let result: HashMap = txn /// .batch_get(keys) @@ -297,7 +303,7 @@ impl Transaction { /// # use std::collections::HashMap; /// # futures::executor::block_on(async { /// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap(); - /// let mut txn = client.begin().await.unwrap(); + /// let mut txn = client.begin_optimistic().await.unwrap(); /// let key1: Key = b"TiKV".to_vec().into(); /// let key2: Key = b"TiDB".to_vec().into(); /// let result: Vec = txn @@ -331,7 +337,7 @@ impl Transaction { /// # use std::collections::HashMap; /// # futures::executor::block_on(async { /// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap(); - /// let mut txn = client.begin().await.unwrap(); + /// let mut txn = client.begin_optimistic().await.unwrap(); /// let key1: Key = b"TiKV".to_vec().into(); /// let key2: Key = b"TiDB".to_vec().into(); /// let result: Vec = txn @@ -369,7 +375,7 @@ impl Transaction { /// # use futures::prelude::*; /// # futures::executor::block_on(async { /// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap(); - /// let mut txn = client.begin().await.unwrap(); + /// let mut txn = client.begin_optimistic().await.unwrap(); /// let key = "TiKV".to_owned(); /// let val = "TiKV".to_owned(); /// txn.put(key, val); @@ -397,7 +403,7 @@ impl Transaction { /// # use futures::prelude::*; /// # futures::executor::block_on(async { /// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap(); - /// let mut txn = client.begin().await.unwrap(); + /// let mut txn = client.begin_optimistic().await.unwrap(); /// let key = "TiKV".to_owned(); /// txn.delete(key); /// // Finish the transaction... @@ -429,7 +435,7 @@ impl Transaction { /// # use futures::prelude::*; /// # futures::executor::block_on(async { /// # let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap(); - /// let mut txn = client.begin().await.unwrap(); + /// let mut txn = client.begin_optimistic().await.unwrap(); /// txn.lock_keys(vec!["TiKV".to_owned(), "Rust".to_owned()]); /// // ... Do some actions. /// txn.commit().await.unwrap(); @@ -451,7 +457,7 @@ impl Transaction { /// # use futures::prelude::*; /// # futures::executor::block_on(async { /// # let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap(); - /// let mut txn = client.begin().await.unwrap(); + /// let mut txn = client.begin_optimistic().await.unwrap(); /// // ... Do some actions. /// let req = txn.commit(); /// let result: u64 = req.await.unwrap(); @@ -600,7 +606,7 @@ struct TwoPhaseCommitter { start_version: u64, bg_worker: ThreadPool, rpc: Arc, - style: TransactionStyle, + style: TransactionOptions, #[new(default)] undetermined: bool, } diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 68dd532..23962b4 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -55,7 +55,7 @@ async fn crud() -> Result<()> { clear_tikv().await?; let client = TransactionClient::new(pd_addrs()).await?; - let mut txn = client.begin().await?; + let mut txn = client.begin_optimistic().await?; // Get non-existent keys assert!(txn.get("foo".to_owned()).await?.is_none()); @@ -91,7 +91,7 @@ async fn crud() -> Result<()> { txn.commit().await?; // Read from TiKV then update and delete - let mut txn = client.begin().await?; + let mut txn = client.begin_optimistic().await?; assert_eq!( txn.get("foo".to_owned()).await?, Some("bar".to_owned().into()) @@ -199,13 +199,13 @@ async fn txn_write_million() -> Result<()> { .map(|u| u.to_be_bytes().to_vec()) .take(2usize.pow(NUM_BITS_KEY_PER_TXN)) .collect::>(); // each txn puts 2 ^ 12 keys. 12 = 25 - 13 - let mut txn = client.begin().await?; + let mut txn = client.begin_optimistic().await?; for (k, v) in keys.iter().zip(iter::repeat(1u32.to_be_bytes().to_vec())) { txn.put(k.clone(), v).await?; } txn.commit().await?; - let mut txn = client.begin().await?; + let mut txn = client.begin_optimistic().await?; let res = txn.batch_get(keys).await?; assert_eq!(res.count(), 2usize.pow(NUM_BITS_KEY_PER_TXN)); txn.commit().await?; @@ -258,7 +258,7 @@ async fn txn_bank_transfer() -> Result<()> { let mut rng = thread_rng(); let people = gen_u32_keys(NUM_PEOPLE, &mut rng); - let mut txn = client.begin().await?; + let mut txn = client.begin_optimistic().await?; let mut sum: u32 = 0; for person in &people { let init = rng.gen::() as u32; @@ -269,7 +269,7 @@ async fn txn_bank_transfer() -> Result<()> { // transfer for _ in 0..NUM_TRNASFER { - let mut txn = client.begin().await?; + let mut txn = client.begin_optimistic().await?; txn.use_async_commit(); let chosen_people = people.iter().choose_multiple(&mut rng, 2); let alice = chosen_people[0]; @@ -291,7 +291,7 @@ async fn txn_bank_transfer() -> Result<()> { // check let mut new_sum = 0; - let mut txn = client.begin().await?; + let mut txn = client.begin_optimistic().await?; for person in people.iter() { new_sum += get_txn_u32(&txn, person.clone()).await?; } From d71e34d8c7a205be05bf5a32a0ba2fee2d9702ef Mon Sep 17 00:00:00 2001 From: Nick Cameron Date: Tue, 22 Dec 2020 08:45:15 +1300 Subject: [PATCH 02/11] txn: move read_only to options Signed-off-by: Nick Cameron --- src/transaction/client.rs | 29 ++++--------------- src/transaction/transaction.rs | 53 ++++++++++++++++++---------------- 2 files changed, 34 insertions(+), 48 deletions(-) diff --git a/src/transaction/client.rs b/src/transaction/client.rs index ac386fa..37e6e98 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -110,7 +110,6 @@ impl Client { Ok(self.new_transaction( timestamp, TransactionOptions::new_optimistic(self.config.try_one_pc), - false, )) } @@ -136,22 +135,17 @@ impl Client { Ok(self.new_transaction( timestamp, TransactionOptions::new_pessimistic(self.config.try_one_pc), - false, )) } - pub async fn begin_with_options(&self, opts: TransactionOptions) -> Result { + pub async fn begin_with_options(&self, options: TransactionOptions) -> Result { let timestamp = self.current_timestamp().await?; - Ok(self.new_transaction(timestamp, opts, false)) + Ok(self.new_transaction(timestamp, options)) } /// Creates a new [`Snapshot`](Snapshot) at the given [`Timestamp`](Timestamp). - pub fn snapshot(&self, timestamp: Timestamp) -> Snapshot { - Snapshot::new(self.new_transaction( - timestamp, - TransactionOptions::new_optimistic(self.config.try_one_pc), - true, - )) + pub fn snapshot(&self, timestamp: Timestamp, options: TransactionOptions) -> Snapshot { + Snapshot::new(self.new_transaction(timestamp, options.read_only())) } /// Retrieves the current [`Timestamp`](Timestamp). @@ -214,18 +208,7 @@ impl Client { Ok(res) } - fn new_transaction( - &self, - timestamp: Timestamp, - style: TransactionOptions, - read_only: bool, - ) -> Transaction { - Transaction::new( - timestamp, - self.bg_worker.clone(), - self.pd.clone(), - style, - read_only, - ) + fn new_transaction(&self, timestamp: Timestamp, options: TransactionOptions) -> Transaction { + Transaction::new(timestamp, self.bg_worker.clone(), self.pd.clone(), options) } } diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index aba32b7..bfc6120 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -46,6 +46,7 @@ pub struct TransactionOptions { kind: TransactionKind, try_one_pc: bool, async_commit: bool, + read_only: bool, } impl TransactionOptions { @@ -54,6 +55,7 @@ impl TransactionOptions { kind: TransactionKind::Optimistic, try_one_pc, async_commit: false, + read_only: false, } } @@ -62,6 +64,7 @@ impl TransactionOptions { kind: TransactionKind::Pessimistic(0), try_one_pc, async_commit: false, + read_only: false, } } @@ -70,6 +73,11 @@ impl TransactionOptions { self } + pub fn read_only(mut self) -> TransactionOptions { + self.read_only = true; + self + } + fn push_for_update_ts(&mut self, for_update_ts: u64) { match &mut self.kind { TransactionKind::Optimistic => unreachable!(), @@ -116,7 +124,7 @@ pub struct Transaction { buffer: Buffer, bg_worker: ThreadPool, rpc: Arc, - style: TransactionOptions, + options: TransactionOptions, } impl Transaction { @@ -124,10 +132,9 @@ impl Transaction { timestamp: Timestamp, bg_worker: ThreadPool, rpc: Arc, - style: TransactionOptions, - read_only: bool, + options: TransactionOptions, ) -> Transaction { - let status = if read_only { + let status = if options.read_only { TransactionStatus::ReadOnly } else { TransactionStatus::Active @@ -138,7 +145,7 @@ impl Transaction { buffer: Default::default(), bg_worker, rpc, - style, + options, } } @@ -477,7 +484,7 @@ impl Transaction { self.timestamp.version(), self.bg_worker.clone(), self.rpc.clone(), - self.style, + self.options, ) .commit() .await; @@ -505,7 +512,7 @@ impl Transaction { self.timestamp.version(), self.bg_worker.clone(), self.rpc.clone(), - self.style, + self.options, ) .rollback() .await; @@ -545,7 +552,7 @@ impl Transaction { keys: impl IntoIterator>, ) -> Result<()> { assert!( - matches!(self.style.kind, TransactionKind::Pessimistic(_)), + matches!(self.options.kind, TransactionKind::Pessimistic(_)), "`pessimistic_lock` is only valid to use with pessimistic transactions" ); @@ -558,7 +565,7 @@ impl Transaction { let primary_lock = keys[0].clone(); let lock_ttl = DEFAULT_LOCK_TTL; let for_update_ts = self.rpc.clone().get_timestamp().await.unwrap().version(); - self.style.push_for_update_ts(for_update_ts); + self.options.push_for_update_ts(for_update_ts); new_pessimistic_lock_request( keys, primary_lock.into(), @@ -582,11 +589,7 @@ impl Transaction { } fn is_pessimistic(&self) -> bool { - matches!(self.style.kind, TransactionKind::Pessimistic(_)) - } - - pub fn use_async_commit(&mut self) { - self.style = self.style.async_commit(); + matches!(self.options.kind, TransactionKind::Pessimistic(_)) } } @@ -606,7 +609,7 @@ struct TwoPhaseCommitter { start_version: u64, bg_worker: ThreadPool, rpc: Arc, - style: TransactionOptions, + options: TransactionOptions, #[new(default)] undetermined: bool, } @@ -620,11 +623,11 @@ impl TwoPhaseCommitter { let min_commit_ts = self.prewrite().await?; // If we didn't use 1pc, prewrite will set `try_one_pc` to false. - if self.style.try_one_pc { + if self.options.try_one_pc { return Ok(min_commit_ts); } - let commit_ts = if self.style.async_commit { + let commit_ts = if self.options.async_commit { assert_ne!(min_commit_ts, 0); min_commit_ts } else { @@ -653,7 +656,7 @@ impl TwoPhaseCommitter { let primary_lock = self.mutations[0].key.clone().into(); // TODO: calculate TTL for big transactions let lock_ttl = DEFAULT_LOCK_TTL; - let mut request = match self.style.kind { + let mut request = match self.options.kind { TransactionKind::Optimistic => new_prewrite_request( self.mutations.clone(), primary_lock, @@ -669,12 +672,12 @@ impl TwoPhaseCommitter { ), }; - request.use_async_commit = self.style.async_commit; - request.try_one_pc = self.style.try_one_pc; + request.use_async_commit = self.options.async_commit; + request.try_one_pc = self.options.try_one_pc; request.secondaries = self.mutations[1..].iter().map(|m| m.key.clone()).collect(); // FIXME set max_commit_ts and min_commit_ts - let response = match self.style.kind { + let response = match self.options.kind { TransactionKind::Optimistic => { request .execute(self.rpc.clone(), OPTIMISTIC_BACKOFF) @@ -687,7 +690,7 @@ impl TwoPhaseCommitter { } }; - if self.style.try_one_pc && response.len() == 1 { + if self.options.try_one_pc && response.len() == 1 { if response[0].one_pc_commit_ts == 0 { return Err(Error::OnePcFailure); } @@ -695,7 +698,7 @@ impl TwoPhaseCommitter { return Ok(response[0].one_pc_commit_ts); } - self.style.try_one_pc = false; + self.options.try_one_pc = false; let min_commit_ts = response .iter() @@ -733,7 +736,7 @@ impl TwoPhaseCommitter { let mutations = self.mutations.into_iter(); // Only skip the primary if we committed it earlier (i.e., we are not using async commit). - let keys = if self.style.async_commit { + let keys = if self.options.async_commit { mutations.skip(0) } else { if primary_only { @@ -754,7 +757,7 @@ impl TwoPhaseCommitter { .into_iter() .map(|mutation| mutation.key.into()) .collect(); - match self.style.kind { + match self.options.kind { TransactionKind::Optimistic if keys.is_empty() => Ok(()), TransactionKind::Optimistic => { new_batch_rollback_request(keys, self.start_version) From 6dbaa74b32aa4afcbb9b3d05a9dd9631029f4735 Mon Sep 17 00:00:00 2001 From: Nick Cameron Date: Tue, 22 Dec 2020 08:47:28 +1300 Subject: [PATCH 03/11] Move code around in transaction.rs Signed-off-by: Nick Cameron --- src/transaction/transaction.rs | 166 ++++++++++++++++----------------- 1 file changed, 83 insertions(+), 83 deletions(-) diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index bfc6120..6d42a8d 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -12,83 +12,6 @@ use futures::{executor::ThreadPool, prelude::*, stream::BoxStream}; use std::{iter, ops::RangeBounds, sync::Arc}; use tikv_client_proto::{kvrpcpb, pdpb::Timestamp}; -#[derive(PartialEq)] -enum TransactionStatus { - /// The transaction is read-only [`Snapshot`](super::Snapshot::Snapshot), no need to commit or rollback or panic on drop. - ReadOnly, - /// The transaction have not been committed or rolled back. - Active, - /// The transaction has committed. - Committed, - /// The transaction has tried to commit. Only `commit` is allowed. - StartedCommit, - /// The transaction has rolled back. - Rolledback, - /// The transaction has tried to rollback. Only `rollback` is allowed. - StartedRollback, -} - -#[derive(Copy, Clone, Eq, PartialEq, Debug)] -pub enum TransactionKind { - Optimistic, - /// Argument is for_update_ts - Pessimistic(u64), -} - -impl Default for TransactionKind { - fn default() -> TransactionKind { - TransactionKind::Pessimistic(0) - } -} - -#[derive(Copy, Clone, Eq, PartialEq, Debug, Default)] -pub struct TransactionOptions { - kind: TransactionKind, - try_one_pc: bool, - async_commit: bool, - read_only: bool, -} - -impl TransactionOptions { - pub fn new_optimistic(try_one_pc: bool) -> TransactionOptions { - TransactionOptions { - kind: TransactionKind::Optimistic, - try_one_pc, - async_commit: false, - read_only: false, - } - } - - pub fn new_pessimistic(try_one_pc: bool) -> TransactionOptions { - TransactionOptions { - kind: TransactionKind::Pessimistic(0), - try_one_pc, - async_commit: false, - read_only: false, - } - } - - pub fn async_commit(mut self) -> TransactionOptions { - self.async_commit = true; - self - } - - pub fn read_only(mut self) -> TransactionOptions { - self.read_only = true; - self - } - - fn push_for_update_ts(&mut self, for_update_ts: u64) { - match &mut self.kind { - TransactionKind::Optimistic => unreachable!(), - TransactionKind::Pessimistic(old_for_update_ts) => { - self.kind = - TransactionKind::Pessimistic(std::cmp::max(*old_for_update_ts, for_update_ts)); - } - } - } -} - /// A undo-able set of actions on the dataset. /// /// Using a transaction you can prepare a set of actions (such as `get`, or `put`) on data at a @@ -593,6 +516,75 @@ impl Transaction { } } +impl Drop for Transaction { + fn drop(&mut self) { + if self.status == TransactionStatus::Active { + panic!("Dropping an active transaction. Consider commit or rollback it.") + } + } +} + +#[derive(Copy, Clone, Eq, PartialEq, Debug)] +pub enum TransactionKind { + Optimistic, + /// Argument is for_update_ts + Pessimistic(u64), +} + +impl Default for TransactionKind { + fn default() -> TransactionKind { + TransactionKind::Pessimistic(0) + } +} + +#[derive(Copy, Clone, Eq, PartialEq, Debug, Default)] +pub struct TransactionOptions { + kind: TransactionKind, + try_one_pc: bool, + async_commit: bool, + read_only: bool, +} + +impl TransactionOptions { + pub fn new_optimistic(try_one_pc: bool) -> TransactionOptions { + TransactionOptions { + kind: TransactionKind::Optimistic, + try_one_pc, + async_commit: false, + read_only: false, + } + } + + pub fn new_pessimistic(try_one_pc: bool) -> TransactionOptions { + TransactionOptions { + kind: TransactionKind::Pessimistic(0), + try_one_pc, + async_commit: false, + read_only: false, + } + } + + pub fn async_commit(mut self) -> TransactionOptions { + self.async_commit = true; + self + } + + pub fn read_only(mut self) -> TransactionOptions { + self.read_only = true; + self + } + + fn push_for_update_ts(&mut self, for_update_ts: u64) { + match &mut self.kind { + TransactionKind::Optimistic => unreachable!(), + TransactionKind::Pessimistic(old_for_update_ts) => { + self.kind = + TransactionKind::Pessimistic(std::cmp::max(*old_for_update_ts, for_update_ts)); + } + } + } +} + /// The default TTL of a lock in milliseconds const DEFAULT_LOCK_TTL: u64 = 3000; @@ -773,10 +765,18 @@ impl TwoPhaseCommitter { } } -impl Drop for Transaction { - fn drop(&mut self) { - if self.status == TransactionStatus::Active { - panic!("Dropping an active transaction. Consider commit or rollback it.") - } - } +#[derive(PartialEq)] +enum TransactionStatus { + /// The transaction is read-only [`Snapshot`](super::Snapshot::Snapshot), no need to commit or rollback or panic on drop. + ReadOnly, + /// The transaction have not been committed or rolled back. + Active, + /// The transaction has committed. + Committed, + /// The transaction has tried to commit. Only `commit` is allowed. + StartedCommit, + /// The transaction has rolled back. + Rolledback, + /// The transaction has tried to rollback. Only `rollback` is allowed. + StartedRollback, } From 304de84352df3af875a9ccc87a345a6d62f0f8e0 Mon Sep 17 00:00:00 2001 From: Nick Cameron Date: Tue, 22 Dec 2020 08:48:45 +1300 Subject: [PATCH 04/11] txn: rename TwoPhaseCommitter to Committer Signed-off-by: Nick Cameron --- src/transaction/transaction.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index 6d42a8d..5f71002 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -402,7 +402,7 @@ impl Transaction { } self.status = TransactionStatus::StartedCommit; - let res = TwoPhaseCommitter::new( + let res = Committer::new( self.buffer.to_proto_mutations().await, self.timestamp.version(), self.bg_worker.clone(), @@ -430,7 +430,7 @@ impl Transaction { } self.status = TransactionStatus::StartedRollback; - let res = TwoPhaseCommitter::new( + let res = Committer::new( self.buffer.to_proto_mutations().await, self.timestamp.version(), self.bg_worker.clone(), @@ -596,7 +596,7 @@ const DEFAULT_LOCK_TTL: u64 = 3000; /// /// The committer implements `prewrite`, `commit` and `rollback` functions. #[derive(new)] -struct TwoPhaseCommitter { +struct Committer { mutations: Vec, start_version: u64, bg_worker: ThreadPool, @@ -606,7 +606,7 @@ struct TwoPhaseCommitter { undetermined: bool, } -impl TwoPhaseCommitter { +impl Committer { async fn commit(mut self) -> Result { if self.mutations.is_empty() { return Ok(0); From 0db6ab8923cb50cac21b354f12ec6b676c498a33 Mon Sep 17 00:00:00 2001 From: Nick Cameron Date: Tue, 22 Dec 2020 10:30:51 +1300 Subject: [PATCH 05/11] Refactor Backoff Signed-off-by: Nick Cameron --- src/backoff.rs | 364 +++++++++++++++------------------ src/lib.rs | 3 + src/raw/client.rs | 20 +- src/raw/requests.rs | 5 +- src/request.rs | 96 +++++---- src/transaction/client.rs | 7 +- src/transaction/lock.rs | 6 +- src/transaction/requests.rs | 9 +- src/transaction/transaction.rs | 45 ++-- 9 files changed, 265 insertions(+), 290 deletions(-) diff --git a/src/backoff.rs b/src/backoff.rs index dae7595..b6fca1d 100644 --- a/src/backoff.rs +++ b/src/backoff.rs @@ -5,162 +5,9 @@ use rand::{thread_rng, Rng}; use std::time::Duration; -pub trait Backoff: Clone + Send + 'static { - // Returns the delay period for next retry. If the maximum retry count is hit returns None. - fn next_delay_duration(&mut self) -> Option; -} - -// NoBackoff means that we don't want any retry here. -#[derive(Clone)] -pub struct NoBackoff; - -impl Backoff for NoBackoff { - fn next_delay_duration(&mut self) -> Option { - None - } -} - -// Exponential backoff means that the retry delay should multiply a constant -// after each attempt, up to a maximum value. After each attempt, the new retry -// delay should be: -// -// new_delay = min(max_delay, base_delay * 2 ** attempts) -#[derive(Clone)] -pub struct NoJitterBackoff { - current_attempts: u32, - max_attempts: u32, - current_delay_ms: u64, - max_delay_ms: u64, -} - -impl NoJitterBackoff { - pub const fn new(base_delay_ms: u64, max_delay_ms: u64, max_attempts: u32) -> Self { - Self { - current_attempts: 0, - max_attempts, - current_delay_ms: base_delay_ms, - max_delay_ms, - } - } -} - -impl Backoff for NoJitterBackoff { - fn next_delay_duration(&mut self) -> Option { - if self.current_attempts >= self.max_attempts { - return None; - } - - let delay_ms = self.max_delay_ms.min(self.current_delay_ms); - - self.current_attempts += 1; - self.current_delay_ms <<= 1; - - Some(Duration::from_millis(delay_ms)) - } -} - -// Adds Jitter to the basic exponential backoff. Returns a random value between -// zero and the calculated exponential backoff: -// -// temp = min(max_delay, base_delay * 2 ** attempts) -// new_delay = random_between(0, temp) -#[derive(Clone)] -pub struct FullJitterBackoff { - current_attempts: u32, - max_attempts: u32, - current_delay_ms: u64, - max_delay_ms: u64, -} - -impl FullJitterBackoff { - #[allow(dead_code)] - pub fn new(base_delay_ms: u64, max_delay_ms: u64, max_attempts: u32) -> Self { - if base_delay_ms == 0 || max_delay_ms == 0 { - panic!("Both base_delay_ms and max_delay_ms must be positive"); - } - - Self { - current_attempts: 0, - max_attempts, - current_delay_ms: base_delay_ms, - max_delay_ms, - } - } -} - -impl Backoff for FullJitterBackoff { - fn next_delay_duration(&mut self) -> Option { - if self.current_attempts >= self.max_attempts { - return None; - } - - let delay_ms = self.max_delay_ms.min(self.current_delay_ms); - - let mut rng = thread_rng(); - let delay_ms: u64 = rng.gen_range(0, delay_ms); - - self.current_attempts += 1; - self.current_delay_ms <<= 1; - - Some(Duration::from_millis(delay_ms)) - } -} - -// Equal Jitter limits the random value should be equal or greater than half of -// the calculated exponential backoff: -// -// temp = min(max_delay, base_delay * 2 ** attempts) -// new_delay = random_between(temp / 2, temp) -#[derive(Clone)] -pub struct EqualJitterBackoff { - current_attempts: u32, - max_attempts: u32, - current_delay_ms: u64, - max_delay_ms: u64, -} - -impl EqualJitterBackoff { - #[allow(dead_code)] - pub fn new(base_delay_ms: u64, max_delay_ms: u64, max_attempts: u32) -> Self { - if base_delay_ms < 2 || max_delay_ms < 2 { - panic!("Both base_delay_ms and max_delay_ms must be greater than 1"); - } - - Self { - current_attempts: 0, - max_attempts, - current_delay_ms: base_delay_ms, - max_delay_ms, - } - } -} - -impl Backoff for EqualJitterBackoff { - fn next_delay_duration(&mut self) -> Option { - if self.current_attempts >= self.max_attempts { - return None; - } - - let delay_ms = self.max_delay_ms.min(self.current_delay_ms); - let half_delay_ms = delay_ms >> 1; - - let mut rng = thread_rng(); - let delay_ms: u64 = rng.gen_range(0, half_delay_ms) + half_delay_ms; - - self.current_attempts += 1; - self.current_delay_ms <<= 1; - - Some(Duration::from_millis(delay_ms)) - } -} - -// Decorrelated Jitter is always calculated with the previous backoff -// (the initial value is base_delay): -// -// temp = random_between(base_delay, previous_delay * 3) -// new_delay = min(max_delay, temp) -#[derive(Clone)] -pub struct DecorrelatedJitterBackoff { +#[derive(Debug, Clone)] +pub struct Backoff { + kind: BackoffKind, current_attempts: u32, max_attempts: u32, base_delay_ms: u64, @@ -168,14 +15,130 @@ pub struct DecorrelatedJitterBackoff { max_delay_ms: u64, } -impl DecorrelatedJitterBackoff { - #[allow(dead_code)] - pub fn new(base_delay_ms: u64, max_delay_ms: u64, max_attempts: u32) -> Self { - if base_delay_ms == 0 { - panic!("base_delay_ms must be positive"); +impl Backoff { + // Returns the delay period for next retry. If the maximum retry count is hit returns None. + pub fn next_delay_duration(&mut self) -> Option { + if self.current_attempts >= self.max_attempts { + return None; } + self.current_attempts += 1; - Self { + match self.kind { + BackoffKind::None => None, + BackoffKind::NoJitter => { + let delay_ms = self.max_delay_ms.min(self.current_delay_ms); + self.current_delay_ms <<= 1; + + Some(Duration::from_millis(delay_ms)) + } + BackoffKind::FullJitter => { + let delay_ms = self.max_delay_ms.min(self.current_delay_ms); + + let mut rng = thread_rng(); + let delay_ms: u64 = rng.gen_range(0, delay_ms); + self.current_delay_ms <<= 1; + + Some(Duration::from_millis(delay_ms)) + } + BackoffKind::EqualJitter => { + let delay_ms = self.max_delay_ms.min(self.current_delay_ms); + let half_delay_ms = delay_ms >> 1; + + let mut rng = thread_rng(); + let delay_ms: u64 = rng.gen_range(0, half_delay_ms) + half_delay_ms; + self.current_delay_ms <<= 1; + + Some(Duration::from_millis(delay_ms)) + } + BackoffKind::DecorrelatedJitter => { + let mut rng = thread_rng(); + let delay_ms: u64 = rng + .gen_range(0, self.current_delay_ms * 3 - self.base_delay_ms) + + self.base_delay_ms; + + let delay_ms = delay_ms.min(self.max_delay_ms); + self.current_delay_ms = delay_ms; + + Some(Duration::from_millis(delay_ms)) + } + } + } + + pub const fn no_backoff() -> Backoff { + Backoff { + kind: BackoffKind::None, + current_attempts: 0, + max_attempts: 0, + base_delay_ms: 0, + current_delay_ms: 0, + max_delay_ms: 0, + } + } + + pub const fn no_jitter_backoff( + base_delay_ms: u64, + max_delay_ms: u64, + max_attempts: u32, + ) -> Backoff { + Backoff { + kind: BackoffKind::NoJitter, + current_attempts: 0, + max_attempts, + base_delay_ms, + current_delay_ms: base_delay_ms, + max_delay_ms, + } + } + + pub fn full_jitter_backoff( + base_delay_ms: u64, + max_delay_ms: u64, + max_attempts: u32, + ) -> Backoff { + assert!( + base_delay_ms > 0 && max_delay_ms > 0, + "Both base_delay_ms and max_delay_ms must be positive" + ); + + Backoff { + kind: BackoffKind::FullJitter, + current_attempts: 0, + max_attempts, + base_delay_ms, + current_delay_ms: base_delay_ms, + max_delay_ms, + } + } + + pub fn equal_jitter_backoff( + base_delay_ms: u64, + max_delay_ms: u64, + max_attempts: u32, + ) -> Backoff { + assert!( + base_delay_ms > 1 && max_delay_ms > 1, + "Both base_delay_ms and max_delay_ms must be greater than 1" + ); + + Backoff { + kind: BackoffKind::EqualJitter, + current_attempts: 0, + max_attempts, + base_delay_ms, + current_delay_ms: base_delay_ms, + max_delay_ms, + } + } + + pub fn decorrelated_jitter_backoff( + base_delay_ms: u64, + max_delay_ms: u64, + max_attempts: u32, + ) -> Backoff { + assert!(base_delay_ms > 0, "base_delay_ms must be positive"); + + Backoff { + kind: BackoffKind::DecorrelatedJitter, current_attempts: 0, max_attempts, base_delay_ms, @@ -185,23 +148,34 @@ impl DecorrelatedJitterBackoff { } } -impl Backoff for DecorrelatedJitterBackoff { - fn next_delay_duration(&mut self) -> Option { - if self.current_attempts >= self.max_attempts { - return None; - } - - let mut rng = thread_rng(); - let delay_ms: u64 = - rng.gen_range(0, self.current_delay_ms * 3 - self.base_delay_ms) + self.base_delay_ms; - - let delay_ms = delay_ms.min(self.max_delay_ms); - - self.current_attempts += 1; - self.current_delay_ms = delay_ms; - - Some(Duration::from_millis(delay_ms)) - } +#[derive(Debug, Clone)] +enum BackoffKind { + // NoBackoff means that we don't want any retry here. + None, + // Exponential backoff means that the retry delay should multiply a constant + // after each attempt, up to a maximum value. After each attempt, the new retry + // delay should be: + // + // new_delay = min(max_delay, base_delay * 2 ** attempts) + NoJitter, + // Adds Jitter to the basic exponential backoff. Returns a random value between + // zero and the calculated exponential backoff: + // + // temp = min(max_delay, base_delay * 2 ** attempts) + // new_delay = random_between(0, temp) + FullJitter, + // Equal Jitter limits the random value should be equal or greater than half of + // the calculated exponential backoff: + // + // temp = min(max_delay, base_delay * 2 ** attempts) + // new_delay = random_between(temp / 2, temp) + EqualJitter, + // Decorrelated Jitter is always calculated with the previous backoff + // (the initial value is base_delay): + // + // temp = random_between(base_delay, previous_delay * 3) + // new_delay = min(max_delay, temp) + DecorrelatedJitter, } #[cfg(test)] @@ -212,22 +186,10 @@ mod test { #[test] fn test_no_jitter_backoff() { // Tests for zero attempts. - let mut backoff = NoJitterBackoff { - current_attempts: 0, - max_attempts: 0, - current_delay_ms: 0, - max_delay_ms: 0, - }; - + let mut backoff = Backoff::no_jitter_backoff(0, 0, 0); assert_eq!(backoff.next_delay_duration(), None); - let mut backoff = NoJitterBackoff { - current_attempts: 0, - max_attempts: 3, - current_delay_ms: 2, - max_delay_ms: 7, - }; - + let mut backoff = Backoff::no_jitter_backoff(2, 7, 3); assert_eq!( backoff.next_delay_duration(), Some(Duration::from_millis(2)) @@ -245,7 +207,7 @@ mod test { #[test] fn test_full_jitter_backoff() { - let mut backoff = FullJitterBackoff::new(2, 7, 3); + let mut backoff = Backoff::full_jitter_backoff(2, 7, 3); assert!(backoff.next_delay_duration().unwrap() <= Duration::from_millis(2)); assert!(backoff.next_delay_duration().unwrap() <= Duration::from_millis(4)); assert!(backoff.next_delay_duration().unwrap() <= Duration::from_millis(7)); @@ -255,18 +217,18 @@ mod test { #[test] #[should_panic(expected = "Both base_delay_ms and max_delay_ms must be positive")] fn test_full_jitter_backoff_with_invalid_base_delay_ms() { - FullJitterBackoff::new(0, 7, 3); + Backoff::full_jitter_backoff(0, 7, 3); } #[test] #[should_panic(expected = "Both base_delay_ms and max_delay_ms must be positive")] fn test_full_jitter_backoff_with_invalid_max_delay_ms() { - FullJitterBackoff::new(2, 0, 3); + Backoff::full_jitter_backoff(2, 0, 3); } #[test] fn test_equal_jitter_backoff() { - let mut backoff = EqualJitterBackoff::new(2, 7, 3); + let mut backoff = Backoff::equal_jitter_backoff(2, 7, 3); let first_delay_dur = backoff.next_delay_duration().unwrap(); assert!(first_delay_dur >= Duration::from_millis(1)); @@ -286,18 +248,18 @@ mod test { #[test] #[should_panic(expected = "Both base_delay_ms and max_delay_ms must be greater than 1")] fn test_equal_jitter_backoff_with_invalid_base_delay_ms() { - EqualJitterBackoff::new(1, 7, 3); + Backoff::equal_jitter_backoff(1, 7, 3); } #[test] #[should_panic(expected = "Both base_delay_ms and max_delay_ms must be greater than 1")] fn test_equal_jitter_backoff_with_invalid_max_delay_ms() { - EqualJitterBackoff::new(2, 1, 3); + Backoff::equal_jitter_backoff(2, 1, 3); } #[test] fn test_decorrelated_jitter_backoff() { - let mut backoff = DecorrelatedJitterBackoff::new(2, 7, 3); + let mut backoff = Backoff::decorrelated_jitter_backoff(2, 7, 3); let first_delay_dur = backoff.next_delay_duration().unwrap(); assert!(first_delay_dur >= Duration::from_millis(2)); @@ -319,6 +281,6 @@ mod test { #[test] #[should_panic(expected = "base_delay_ms must be positive")] fn test_decorrelated_jitter_backoff_with_invalid_base_delay_ms() { - DecorrelatedJitterBackoff::new(0, 7, 3); + Backoff::decorrelated_jitter_backoff(0, 7, 3); } } diff --git a/src/lib.rs b/src/lib.rs index 4cc8d23..2b28e60 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -98,6 +98,8 @@ mod proptests; #[macro_use] extern crate log; +#[doc(inline)] +pub use crate::backoff::Backoff; #[doc(inline)] pub use crate::kv::{BoundRange, Key, KvPair, ToOwnedRange, Value}; #[doc(inline)] @@ -106,6 +108,7 @@ pub use crate::raw::{Client as RawClient, ColumnFamily}; pub use crate::timestamp::{Timestamp, TimestampExt}; #[doc(inline)] pub use crate::transaction::{Client as TransactionClient, Snapshot, Transaction}; +#[doc(inline)] pub use config::Config; #[doc(inline)] pub use region::{Region, RegionId, RegionVerId, StoreId}; diff --git a/src/raw/client.rs b/src/raw/client.rs index 3183ce2..dfe3318 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -6,7 +6,7 @@ use super::requests; use crate::{ config::Config, pd::PdRpcClient, - request::{KvRequest, OPTIMISTIC_BACKOFF}, + request::{KvRequest, RetryOptions}, BoundRange, ColumnFamily, Key, KvPair, Result, Value, }; use std::{sync::Arc, u32}; @@ -111,7 +111,7 @@ impl Client { /// ``` pub async fn get(&self, key: impl Into) -> Result> { requests::new_raw_get_request(key, self.cf.clone()) - .execute(self.rpc.clone(), OPTIMISTIC_BACKOFF) + .execute(self.rpc.clone(), RetryOptions::default_optimistic()) .await } @@ -138,7 +138,7 @@ impl Client { keys: impl IntoIterator>, ) -> Result> { requests::new_raw_batch_get_request(keys, self.cf.clone()) - .execute(self.rpc.clone(), OPTIMISTIC_BACKOFF) + .execute(self.rpc.clone(), RetryOptions::default_optimistic()) .await } @@ -160,7 +160,7 @@ impl Client { /// ``` pub async fn put(&self, key: impl Into, value: impl Into) -> Result<()> { requests::new_raw_put_request(key, value, self.cf.clone()) - .execute(self.rpc.clone(), OPTIMISTIC_BACKOFF) + .execute(self.rpc.clone(), RetryOptions::default_optimistic()) .await } @@ -186,7 +186,7 @@ impl Client { pairs: impl IntoIterator>, ) -> Result<()> { requests::new_raw_batch_put_request(pairs, self.cf.clone()) - .execute(self.rpc.clone(), OPTIMISTIC_BACKOFF) + .execute(self.rpc.clone(), RetryOptions::default_optimistic()) .await } @@ -209,7 +209,7 @@ impl Client { /// ``` pub async fn delete(&self, key: impl Into) -> Result<()> { requests::new_raw_delete_request(key, self.cf.clone()) - .execute(self.rpc.clone(), OPTIMISTIC_BACKOFF) + .execute(self.rpc.clone(), RetryOptions::default_optimistic()) .await } @@ -232,7 +232,7 @@ impl Client { /// ``` pub async fn batch_delete(&self, keys: impl IntoIterator>) -> Result<()> { requests::new_raw_batch_delete_request(keys, self.cf.clone()) - .execute(self.rpc.clone(), OPTIMISTIC_BACKOFF) + .execute(self.rpc.clone(), RetryOptions::default_optimistic()) .await } @@ -253,7 +253,7 @@ impl Client { /// ``` pub async fn delete_range(&self, range: impl Into) -> Result<()> { requests::new_raw_delete_range_request(range, self.cf.clone()) - .execute(self.rpc.clone(), OPTIMISTIC_BACKOFF) + .execute(self.rpc.clone(), RetryOptions::default_optimistic()) .await } @@ -389,7 +389,7 @@ impl Client { } let res = requests::new_raw_scan_request(range, limit, key_only, self.cf.clone()) - .execute(self.rpc.clone(), OPTIMISTIC_BACKOFF) + .execute(self.rpc.clone(), RetryOptions::default_optimistic()) .await; res.map(|mut s| { s.truncate(limit as usize); @@ -411,7 +411,7 @@ impl Client { } requests::new_raw_batch_scan_request(ranges, each_limit, key_only, self.cf.clone()) - .execute(self.rpc.clone(), OPTIMISTIC_BACKOFF) + .execute(self.rpc.clone(), RetryOptions::default_optimistic()) .await } } diff --git a/src/raw/requests.rs b/src/raw/requests.rs index 3b40d65..bc91d0c 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -467,7 +467,7 @@ mod test { use super::*; use crate::{ mock::{MockKvClient, MockPdClient}, - request::OPTIMISTIC_BACKOFF, + request::RetryOptions, }; use futures::executor; use std::any::Any; @@ -502,7 +502,8 @@ mod test { key_only: true, ..Default::default() }; - let scan = executor::block_on(scan.execute(client, OPTIMISTIC_BACKOFF)).unwrap(); + let scan = + executor::block_on(scan.execute(client, RetryOptions::default_optimistic())).unwrap(); assert_eq!(scan.len(), 10); // TODO test the keys returned. diff --git a/src/request.rs b/src/request.rs index 1b43755..f55e64f 100644 --- a/src/request.rs +++ b/src/request.rs @@ -1,7 +1,7 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. use crate::{ - backoff::{Backoff, NoBackoff, NoJitterBackoff}, + backoff::Backoff, pd::PdClient, stats::tikv_stats, store::Store, @@ -16,9 +16,9 @@ use std::{ }; use tikv_client_store::{HasError, HasRegionError, Request}; -const DEFAULT_REGION_BACKOFF: NoJitterBackoff = NoJitterBackoff::new(2, 500, 10); -pub const OPTIMISTIC_BACKOFF: NoJitterBackoff = NoJitterBackoff::new(2, 500, 10); -pub const PESSIMISTIC_BACKOFF: NoBackoff = NoBackoff; +const DEFAULT_REGION_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 500, 10); +pub const OPTIMISTIC_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 500, 10); +pub const PESSIMISTIC_BACKOFF: Backoff = Backoff::no_backoff(); #[async_trait] pub trait KvRequest: Request + Clone + Sync + Send + 'static + Sized { @@ -30,13 +30,13 @@ pub trait KvRequest: Request + Clone + Sync + Send + 'static + Sized { /// is the part which differs among the requests. type KeyData: Send; - async fn execute( + async fn execute( self, pd_client: Arc, - lock_backoff: B, + retry: RetryOptions, ) -> Result { Self::reduce( - self.response_stream(pd_client, lock_backoff) + self.response_stream(pd_client, retry) .and_then(|mut response| match response.error() { Some(e) => future::err(e), None => future::ok(response), @@ -50,16 +50,15 @@ pub trait KvRequest: Request + Clone + Sync + Send + 'static + Sized { fn response_stream( self, pd_client: Arc, - lock_backoff: impl Backoff, + retry: RetryOptions, ) -> BoxStream<'static, Result> { - self.retry_response_stream(pd_client, DEFAULT_REGION_BACKOFF, lock_backoff) + self.retry_response_stream(pd_client, retry) } fn retry_response_stream( mut self, pd_client: Arc, - region_backoff: impl Backoff, - lock_backoff: impl Backoff, + retry: RetryOptions, ) -> BoxStream<'static, Result> { let stores = self.store_stream(pd_client.clone()); stores @@ -75,29 +74,20 @@ pub trait KvRequest: Request + Clone + Sync + Send + 'static + Sized { }) .map_ok(move |(request, mut response)| { if let Some(region_error) = response.region_error() { - return request.on_region_error( - region_error, - pd_client.clone(), - region_backoff.clone(), - lock_backoff.clone(), - ); + return request.on_region_error(region_error, pd_client.clone(), retry.clone()); } // Resolve locks let locks = response.take_locks(); if !locks.is_empty() { let pd_client = pd_client.clone(); - let region_backoff = region_backoff.clone(); - let lock_backoff = lock_backoff.clone(); + let retry = retry.clone(); return resolve_locks(locks, pd_client.clone()) .map_ok(move |resolved| { if !resolved { - request.on_resolve_lock_failed( - pd_client, - region_backoff, - lock_backoff, - ) + request.on_resolve_lock_failed(pd_client, retry) } else { - request.response_stream(pd_client, OPTIMISTIC_BACKOFF) + request + .response_stream(pd_client, RetryOptions::default_optimistic()) } }) .try_flatten_stream() @@ -113,10 +103,9 @@ pub trait KvRequest: Request + Clone + Sync + Send + 'static + Sized { self, region_error: Error, pd_client: Arc, - mut region_backoff: impl Backoff, - lock_backoff: impl Backoff, + mut retry: RetryOptions, ) -> BoxStream<'static, Result> { - region_backoff.next_delay_duration().map_or( + retry.region_backoff.next_delay_duration().map_or( stream::once(future::err(region_error)).boxed(), move |delay_duration| { let fut = async move { @@ -124,11 +113,9 @@ pub trait KvRequest: Request + Clone + Sync + Send + 'static + Sized { Ok(()) }; - fut.map_ok(move |_| { - self.retry_response_stream(pd_client, region_backoff, lock_backoff) - }) - .try_flatten_stream() - .boxed() + fut.map_ok(move |_| self.retry_response_stream(pd_client, retry)) + .try_flatten_stream() + .boxed() }, ) } @@ -136,21 +123,18 @@ pub trait KvRequest: Request + Clone + Sync + Send + 'static + Sized { fn on_resolve_lock_failed( self, pd_client: Arc, - region_backoff: impl Backoff, - mut lock_backoff: impl Backoff, + mut retry: RetryOptions, ) -> BoxStream<'static, Result> { - lock_backoff.next_delay_duration().map_or( + retry.lock_backoff.next_delay_duration().map_or( stream::once(future::err(Error::ResolveLockError)).boxed(), move |delay_duration| { let fut = async move { futures_timer::Delay::new(delay_duration).await; Ok(()) }; - fut.map_ok(move |_| { - self.retry_response_stream(pd_client, region_backoff, lock_backoff) - }) - .try_flatten_stream() - .boxed() + fut.map_ok(move |_| self.retry_response_stream(pd_client, retry)) + .try_flatten_stream() + .boxed() }, ) } @@ -176,6 +160,28 @@ pub trait KvRequest: Request + Clone + Sync + Send + 'static + Sized { } } +#[derive(Clone, Debug)] +pub struct RetryOptions { + region_backoff: Backoff, + lock_backoff: Backoff, +} + +impl RetryOptions { + pub const fn default_optimistic() -> RetryOptions { + RetryOptions { + region_backoff: DEFAULT_REGION_BACKOFF, + lock_backoff: OPTIMISTIC_BACKOFF, + } + } + + pub const fn default_pessimistic() -> RetryOptions { + RetryOptions { + region_backoff: DEFAULT_REGION_BACKOFF, + lock_backoff: PESSIMISTIC_BACKOFF, + } + } +} + pub fn store_stream_for_key( key_data: KeyData, pd_client: Arc, @@ -352,9 +358,11 @@ mod test { let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook( |_: &dyn Any| Ok(Box::new(MockRpcResponse) as Box), ))); - let region_backoff = NoJitterBackoff::new(1, 1, 3); - let lock_backoff = NoJitterBackoff::new(1, 1, 3); - let stream = request.retry_response_stream(pd_client, region_backoff, lock_backoff); + let retry = RetryOptions { + region_backoff: Backoff::no_jitter_backoff(1, 1, 3), + lock_backoff: Backoff::no_jitter_backoff(1, 1, 3), + }; + let stream = request.retry_response_stream(pd_client, retry); executor::block_on(async { stream.collect::>>().await }); diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 37e6e98..48ae732 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -4,7 +4,7 @@ use super::{requests::new_scan_lock_request, resolve_locks}; use crate::{ config::Config, pd::{PdClient, PdRpcClient}, - request::{KvRequest, OPTIMISTIC_BACKOFF}, + request::{KvRequest, RetryOptions}, timestamp::TimestampExt, transaction::{Snapshot, Transaction, TransactionOptions}, Result, @@ -183,8 +183,9 @@ impl Client { safepoint.clone(), SCAN_LOCK_BATCH_SIZE, ); - let res: Vec = - req.execute(self.pd.clone(), OPTIMISTIC_BACKOFF).await?; + let res: Vec = req + .execute(self.pd.clone(), RetryOptions::default_optimistic()) + .await?; if res.is_empty() { break; } diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs index 58b3986..8f838a7 100644 --- a/src/transaction/lock.rs +++ b/src/transaction/lock.rs @@ -1,6 +1,6 @@ use crate::{ pd::PdClient, - request::{KvRequest, OPTIMISTIC_BACKOFF}, + request::{KvRequest, RetryOptions}, timestamp::TimestampExt, transaction::requests, Error, Key, RegionVerId, Result, @@ -54,7 +54,7 @@ pub async fn resolve_locks( Some(&commit_version) => commit_version, None => { let commit_version = requests::new_cleanup_request(primary_key, lock.lock_version) - .execute(pd_client.clone(), OPTIMISTIC_BACKOFF) + .execute(pd_client.clone(), RetryOptions::default_optimistic()) .await?; commit_versions.insert(lock.lock_version, commit_version); commit_version @@ -95,7 +95,7 @@ async fn resolve_lock_with_retry( } }; match requests::new_resolve_lock_request(context, start_version, commit_version) - .execute(pd_client.clone(), OPTIMISTIC_BACKOFF) + .execute(pd_client.clone(), RetryOptions::default_optimistic()) .await { Ok(_) => { diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index 99da84d..d29e988 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -1,9 +1,11 @@ // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. use crate::{ - backoff::Backoff, pd::PdClient, - request::{store_stream_for_key, store_stream_for_keys, store_stream_for_range, KvRequest}, + request::{ + store_stream_for_key, store_stream_for_keys, store_stream_for_range, KvRequest, + RetryOptions, + }, store::Store, timestamp::TimestampExt, transaction::HasLocks, @@ -201,8 +203,7 @@ impl KvRequest for kvrpcpb::ResolveLockRequest { self, region_error: Error, _pd_client: Arc, - _region_backoff: impl Backoff, - _lock_backoff: impl Backoff, + _: RetryOptions, ) -> BoxStream<'static, Result> { stream::once(future::err(region_error)).boxed() } diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index 5f71002..f38a95d 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -2,7 +2,7 @@ use crate::{ pd::{PdClient, PdRpcClient}, - request::{KvRequest, OPTIMISTIC_BACKOFF, PESSIMISTIC_BACKOFF}, + request::{KvRequest, RetryOptions}, timestamp::TimestampExt, transaction::{buffer::Buffer, requests::*}, BoundRange, Error, Key, KvPair, Result, Value, @@ -12,7 +12,7 @@ use futures::{executor::ThreadPool, prelude::*, stream::BoxStream}; use std::{iter, ops::RangeBounds, sync::Arc}; use tikv_client_proto::{kvrpcpb, pdpb::Timestamp}; -/// A undo-able set of actions on the dataset. +/// An undo-able set of actions on the dataset. /// /// Using a transaction you can prepare a set of actions (such as `get`, or `put`) on data at a /// particular timestamp called `start_ts` obtained from the placement driver. @@ -98,7 +98,7 @@ impl Transaction { self.buffer .get_or_else(key, |key| { new_mvcc_get_request(key, self.timestamp.clone()) - .execute(self.rpc.clone(), OPTIMISTIC_BACKOFF) + .execute(self.rpc.clone(), RetryOptions::default_optimistic()) }) .await } @@ -132,7 +132,7 @@ impl Transaction { self.buffer .get_or_else(key, |key| { new_mvcc_get_request(key, self.timestamp.clone()) - .execute(self.rpc.clone(), OPTIMISTIC_BACKOFF) + .execute(self.rpc.clone(), RetryOptions::default_optimistic()) }) .await } @@ -173,7 +173,8 @@ impl Transaction { let rpc = self.rpc.clone(); self.buffer .batch_get_or_else(keys.into_iter().map(|k| k.into()), move |keys| { - new_mvcc_get_batch_request(keys, timestamp).execute(rpc, OPTIMISTIC_BACKOFF) + new_mvcc_get_batch_request(keys, timestamp) + .execute(rpc, RetryOptions::default_optimistic()) }) .await } @@ -459,7 +460,7 @@ impl Transaction { self.buffer .scan_and_fetch(range.into(), limit, move |new_range, new_limit| { new_mvcc_scan_request(new_range, timestamp, new_limit, key_only) - .execute(rpc, OPTIMISTIC_BACKOFF) + .execute(rpc, RetryOptions::default_optimistic()) }) .await } @@ -496,7 +497,7 @@ impl Transaction { lock_ttl, for_update_ts, ) - .execute(self.rpc.clone(), PESSIMISTIC_BACKOFF) + .execute(self.rpc.clone(), RetryOptions::default_pessimistic()) .await } @@ -583,6 +584,13 @@ impl TransactionOptions { } } } + + fn retry_options(&self) -> RetryOptions { + match self.kind { + TransactionKind::Optimistic => RetryOptions::default_optimistic(), + TransactionKind::Pessimistic(_) => RetryOptions::default_pessimistic(), + } + } } /// The default TTL of a lock in milliseconds @@ -669,18 +677,9 @@ impl Committer { request.secondaries = self.mutations[1..].iter().map(|m| m.key.clone()).collect(); // FIXME set max_commit_ts and min_commit_ts - let response = match self.options.kind { - TransactionKind::Optimistic => { - request - .execute(self.rpc.clone(), OPTIMISTIC_BACKOFF) - .await? - } - TransactionKind::Pessimistic(_) => { - request - .execute(self.rpc.clone(), PESSIMISTIC_BACKOFF) - .await? - } - }; + let response = request + .execute(self.rpc.clone(), self.options.retry_options()) + .await?; if self.options.try_one_pc && response.len() == 1 { if response[0].one_pc_commit_ts == 0 { @@ -709,7 +708,7 @@ impl Committer { let primary_key = vec![self.mutations[0].key.clone().into()]; let commit_version = self.rpc.clone().get_timestamp().await?.version(); new_commit_request(primary_key, self.start_version, commit_version) - .execute(self.rpc.clone(), OPTIMISTIC_BACKOFF) + .execute(self.rpc.clone(), RetryOptions::default_optimistic()) .inspect_err(|e| { // We don't know whether the transaction is committed or not if we fail to receive // the response. Then, we mark the transaction as undetermined and propagate the @@ -739,7 +738,7 @@ impl Committer { .map(|mutation| mutation.key.into()) .collect(); new_commit_request(keys, self.start_version, commit_version) - .execute(self.rpc.clone(), OPTIMISTIC_BACKOFF) + .execute(self.rpc.clone(), RetryOptions::default_optimistic()) .await } @@ -753,12 +752,12 @@ impl Committer { TransactionKind::Optimistic if keys.is_empty() => Ok(()), TransactionKind::Optimistic => { new_batch_rollback_request(keys, self.start_version) - .execute(self.rpc, OPTIMISTIC_BACKOFF) + .execute(self.rpc, RetryOptions::default_optimistic()) .await } TransactionKind::Pessimistic(for_update_ts) => { new_pessimistic_rollback_request(keys, self.start_version, for_update_ts) - .execute(self.rpc, OPTIMISTIC_BACKOFF) + .execute(self.rpc, RetryOptions::default_optimistic()) .await } } From de9ca63dc9939361aba2c5c64331d416f7d3cf7e Mon Sep 17 00:00:00 2001 From: Nick Cameron Date: Tue, 22 Dec 2020 10:43:47 +1300 Subject: [PATCH 06/11] Add transaction option for no retry Signed-off-by: Nick Cameron --- src/backoff.rs | 4 ++-- src/request.rs | 3 ++- src/transaction/transaction.rs | 43 ++++++++++++++++++++-------------- 3 files changed, 30 insertions(+), 20 deletions(-) diff --git a/src/backoff.rs b/src/backoff.rs index b6fca1d..b8e9c2d 100644 --- a/src/backoff.rs +++ b/src/backoff.rs @@ -5,7 +5,7 @@ use rand::{thread_rng, Rng}; use std::time::Duration; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct Backoff { kind: BackoffKind, current_attempts: u32, @@ -148,7 +148,7 @@ impl Backoff { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] enum BackoffKind { // NoBackoff means that we don't want any retry here. None, diff --git a/src/request.rs b/src/request.rs index f55e64f..71ff436 100644 --- a/src/request.rs +++ b/src/request.rs @@ -9,6 +9,7 @@ use crate::{ BoundRange, Error, Key, Result, }; use async_trait::async_trait; +use derive_new::new; use futures::{prelude::*, stream::BoxStream}; use std::{ cmp::{max, min}, @@ -160,7 +161,7 @@ pub trait KvRequest: Request + Clone + Sync + Send + 'static + Sized { } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, new, Eq, PartialEq)] pub struct RetryOptions { region_backoff: Backoff, lock_backoff: Backoff, diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index f38a95d..63c65c4 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -1,6 +1,7 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. use crate::{ + backoff::Backoff, pd::{PdClient, PdRpcClient}, request::{KvRequest, RetryOptions}, timestamp::TimestampExt, @@ -408,7 +409,7 @@ impl Transaction { self.timestamp.version(), self.bg_worker.clone(), self.rpc.clone(), - self.options, + self.options.clone(), ) .commit() .await; @@ -436,7 +437,7 @@ impl Transaction { self.timestamp.version(), self.bg_worker.clone(), self.rpc.clone(), - self.options, + self.options.clone(), ) .rollback() .await; @@ -532,18 +533,19 @@ pub enum TransactionKind { Pessimistic(u64), } -impl Default for TransactionKind { - fn default() -> TransactionKind { - TransactionKind::Pessimistic(0) - } -} - -#[derive(Copy, Clone, Eq, PartialEq, Debug, Default)] +#[derive(Clone, Eq, PartialEq, Debug)] pub struct TransactionOptions { kind: TransactionKind, try_one_pc: bool, async_commit: bool, read_only: bool, + retry_options: RetryOptions, +} + +impl Default for TransactionOptions { + fn default() -> TransactionOptions { + Self::new_pessimistic(false) + } } impl TransactionOptions { @@ -553,6 +555,7 @@ impl TransactionOptions { try_one_pc, async_commit: false, read_only: false, + retry_options: RetryOptions::default_optimistic(), } } @@ -562,6 +565,7 @@ impl TransactionOptions { try_one_pc, async_commit: false, read_only: false, + retry_options: RetryOptions::default_pessimistic(), } } @@ -575,6 +579,18 @@ impl TransactionOptions { self } + pub fn no_retry(self) -> TransactionOptions { + self.retry_options(RetryOptions::new( + Backoff::no_backoff(), + Backoff::no_backoff(), + )) + } + + pub fn retry_options(mut self, options: RetryOptions) -> TransactionOptions { + self.retry_options = options; + self + } + fn push_for_update_ts(&mut self, for_update_ts: u64) { match &mut self.kind { TransactionKind::Optimistic => unreachable!(), @@ -584,13 +600,6 @@ impl TransactionOptions { } } } - - fn retry_options(&self) -> RetryOptions { - match self.kind { - TransactionKind::Optimistic => RetryOptions::default_optimistic(), - TransactionKind::Pessimistic(_) => RetryOptions::default_pessimistic(), - } - } } /// The default TTL of a lock in milliseconds @@ -678,7 +687,7 @@ impl Committer { // FIXME set max_commit_ts and min_commit_ts let response = request - .execute(self.rpc.clone(), self.options.retry_options()) + .execute(self.rpc.clone(), self.options.retry_options.clone()) .await?; if self.options.try_one_pc && response.len() == 1 { From 59b4a8d32be9b4f748bdfb5a025b417ae3cfd5d3 Mon Sep 17 00:00:00 2001 From: Nick Cameron Date: Tue, 22 Dec 2020 10:55:31 +1300 Subject: [PATCH 07/11] Add an option to not resolve locks on key locked errors Signed-off-by: Nick Cameron --- src/request.rs | 11 +++++++++-- src/transaction/transaction.rs | 14 +++++++++----- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/src/request.rs b/src/request.rs index 71ff436..936ccd1 100644 --- a/src/request.rs +++ b/src/request.rs @@ -80,6 +80,9 @@ pub trait KvRequest: Request + Clone + Sync + Send + 'static + Sized { // Resolve locks let locks = response.take_locks(); if !locks.is_empty() { + if !retry.auto_resolve_locks { + return stream::once(future::err(Error::ResolveLockError)).boxed(); + } let pd_client = pd_client.clone(); let retry = retry.clone(); return resolve_locks(locks, pd_client.clone()) @@ -163,8 +166,9 @@ pub trait KvRequest: Request + Clone + Sync + Send + 'static + Sized { #[derive(Clone, Debug, new, Eq, PartialEq)] pub struct RetryOptions { - region_backoff: Backoff, - lock_backoff: Backoff, + pub region_backoff: Backoff, + pub lock_backoff: Backoff, + pub auto_resolve_locks: bool, } impl RetryOptions { @@ -172,6 +176,7 @@ impl RetryOptions { RetryOptions { region_backoff: DEFAULT_REGION_BACKOFF, lock_backoff: OPTIMISTIC_BACKOFF, + auto_resolve_locks: true, } } @@ -179,6 +184,7 @@ impl RetryOptions { RetryOptions { region_backoff: DEFAULT_REGION_BACKOFF, lock_backoff: PESSIMISTIC_BACKOFF, + auto_resolve_locks: true, } } } @@ -362,6 +368,7 @@ mod test { let retry = RetryOptions { region_backoff: Backoff::no_jitter_backoff(1, 1, 3), lock_backoff: Backoff::no_jitter_backoff(1, 1, 3), + auto_resolve_locks: true, }; let stream = request.retry_response_stream(pd_client, retry); diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index 63c65c4..a819d38 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -579,11 +579,15 @@ impl TransactionOptions { self } - pub fn no_retry(self) -> TransactionOptions { - self.retry_options(RetryOptions::new( - Backoff::no_backoff(), - Backoff::no_backoff(), - )) + pub fn no_resolve_locks(mut self) -> TransactionOptions { + self.retry_options.auto_resolve_locks = false; + self + } + + pub fn no_retry(mut self) -> TransactionOptions { + self.retry_options.region_backoff = Backoff::no_backoff(); + self.retry_options.lock_backoff = Backoff::no_backoff(); + self } pub fn retry_options(mut self, options: RetryOptions) -> TransactionOptions { From 5d584d93b36daace17b1b10b073279d1b31e305f Mon Sep 17 00:00:00 2001 From: Nick Cameron Date: Tue, 22 Dec 2020 10:58:50 +1300 Subject: [PATCH 08/11] Add option to not automatically resolve regions Signed-off-by: Nick Cameron --- src/request.rs | 7 +++++++ src/transaction/transaction.rs | 5 +++++ 2 files changed, 12 insertions(+) diff --git a/src/request.rs b/src/request.rs index 936ccd1..2dfbe85 100644 --- a/src/request.rs +++ b/src/request.rs @@ -75,6 +75,9 @@ pub trait KvRequest: Request + Clone + Sync + Send + 'static + Sized { }) .map_ok(move |(request, mut response)| { if let Some(region_error) = response.region_error() { + if !retry.auto_resolve_regions { + return stream::once(future::err(region_error)).boxed(); + } return request.on_region_error(region_error, pd_client.clone(), retry.clone()); } // Resolve locks @@ -169,6 +172,7 @@ pub struct RetryOptions { pub region_backoff: Backoff, pub lock_backoff: Backoff, pub auto_resolve_locks: bool, + pub auto_resolve_regions: bool, } impl RetryOptions { @@ -177,6 +181,7 @@ impl RetryOptions { region_backoff: DEFAULT_REGION_BACKOFF, lock_backoff: OPTIMISTIC_BACKOFF, auto_resolve_locks: true, + auto_resolve_regions: true, } } @@ -185,6 +190,7 @@ impl RetryOptions { region_backoff: DEFAULT_REGION_BACKOFF, lock_backoff: PESSIMISTIC_BACKOFF, auto_resolve_locks: true, + auto_resolve_regions: true, } } } @@ -369,6 +375,7 @@ mod test { region_backoff: Backoff::no_jitter_backoff(1, 1, 3), lock_backoff: Backoff::no_jitter_backoff(1, 1, 3), auto_resolve_locks: true, + auto_resolve_regions: true, }; let stream = request.retry_response_stream(pd_client, retry); diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index a819d38..8a31579 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -584,6 +584,11 @@ impl TransactionOptions { self } + pub fn no_resolve_regions(mut self) -> TransactionOptions { + self.retry_options.auto_resolve_regions = false; + self + } + pub fn no_retry(mut self) -> TransactionOptions { self.retry_options.region_backoff = Backoff::no_backoff(); self.retry_options.lock_backoff = Backoff::no_backoff(); From 1c7e843b5476bf77de3844a0030fa5d2e3a3bc01 Mon Sep 17 00:00:00 2001 From: Nick Cameron Date: Tue, 22 Dec 2020 11:06:01 +1300 Subject: [PATCH 09/11] Refactor the retry options Signed-off-by: Nick Cameron --- src/backoff.rs | 4 ++++ src/request.rs | 16 ++-------------- src/transaction/transaction.rs | 8 +------- 3 files changed, 7 insertions(+), 21 deletions(-) diff --git a/src/backoff.rs b/src/backoff.rs index b8e9c2d..d55f4d8 100644 --- a/src/backoff.rs +++ b/src/backoff.rs @@ -64,6 +64,10 @@ impl Backoff { } } + pub fn is_none(&self) -> bool { + self.kind == BackoffKind::None + } + pub const fn no_backoff() -> Backoff { Backoff { kind: BackoffKind::None, diff --git a/src/request.rs b/src/request.rs index 2dfbe85..e5d1713 100644 --- a/src/request.rs +++ b/src/request.rs @@ -75,15 +75,12 @@ pub trait KvRequest: Request + Clone + Sync + Send + 'static + Sized { }) .map_ok(move |(request, mut response)| { if let Some(region_error) = response.region_error() { - if !retry.auto_resolve_regions { - return stream::once(future::err(region_error)).boxed(); - } return request.on_region_error(region_error, pd_client.clone(), retry.clone()); } // Resolve locks let locks = response.take_locks(); if !locks.is_empty() { - if !retry.auto_resolve_locks { + if retry.lock_backoff.is_none() { return stream::once(future::err(Error::ResolveLockError)).boxed(); } let pd_client = pd_client.clone(); @@ -93,8 +90,7 @@ pub trait KvRequest: Request + Clone + Sync + Send + 'static + Sized { if !resolved { request.on_resolve_lock_failed(pd_client, retry) } else { - request - .response_stream(pd_client, RetryOptions::default_optimistic()) + request.response_stream(pd_client, retry) } }) .try_flatten_stream() @@ -171,8 +167,6 @@ pub trait KvRequest: Request + Clone + Sync + Send + 'static + Sized { pub struct RetryOptions { pub region_backoff: Backoff, pub lock_backoff: Backoff, - pub auto_resolve_locks: bool, - pub auto_resolve_regions: bool, } impl RetryOptions { @@ -180,8 +174,6 @@ impl RetryOptions { RetryOptions { region_backoff: DEFAULT_REGION_BACKOFF, lock_backoff: OPTIMISTIC_BACKOFF, - auto_resolve_locks: true, - auto_resolve_regions: true, } } @@ -189,8 +181,6 @@ impl RetryOptions { RetryOptions { region_backoff: DEFAULT_REGION_BACKOFF, lock_backoff: PESSIMISTIC_BACKOFF, - auto_resolve_locks: true, - auto_resolve_regions: true, } } } @@ -374,8 +364,6 @@ mod test { let retry = RetryOptions { region_backoff: Backoff::no_jitter_backoff(1, 1, 3), lock_backoff: Backoff::no_jitter_backoff(1, 1, 3), - auto_resolve_locks: true, - auto_resolve_regions: true, }; let stream = request.retry_response_stream(pd_client, retry); diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index 8a31579..68df1b2 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -580,18 +580,12 @@ impl TransactionOptions { } pub fn no_resolve_locks(mut self) -> TransactionOptions { - self.retry_options.auto_resolve_locks = false; + self.retry_options.lock_backoff = Backoff::no_backoff(); self } pub fn no_resolve_regions(mut self) -> TransactionOptions { - self.retry_options.auto_resolve_regions = false; - self - } - - pub fn no_retry(mut self) -> TransactionOptions { self.retry_options.region_backoff = Backoff::no_backoff(); - self.retry_options.lock_backoff = Backoff::no_backoff(); self } From f90a82fe35b1db28cafe75fbda216f11fe3d1655 Mon Sep 17 00:00:00 2001 From: Nick Cameron Date: Tue, 22 Dec 2020 11:23:48 +1300 Subject: [PATCH 10/11] Add some docs and move try_one_pc out of options ctor Signed-off-by: Nick Cameron --- src/backoff.rs | 47 +++++++++++++++++++--------------- src/lib.rs | 4 ++- src/request.rs | 2 ++ src/transaction/client.rs | 35 +++++++++++++++++++------ src/transaction/mod.rs | 3 +-- src/transaction/transaction.rs | 30 ++++++++++++++++++---- 6 files changed, 84 insertions(+), 37 deletions(-) diff --git a/src/backoff.rs b/src/backoff.rs index d55f4d8..3f67dd2 100644 --- a/src/backoff.rs +++ b/src/backoff.rs @@ -5,6 +5,9 @@ use rand::{thread_rng, Rng}; use std::time::Duration; +/// When a request is retried, we can backoff for some time to avoid saturating the network. +/// +/// `Backoff` is an object which determines how long to wait for. #[derive(Debug, Clone, PartialEq, Eq)] pub struct Backoff { kind: BackoffKind, @@ -64,10 +67,12 @@ impl Backoff { } } + /// True if we should not backoff at all (usually indicates that we should not retry a request). pub fn is_none(&self) -> bool { self.kind == BackoffKind::None } + /// Don't wait. Usually indicates that we should not retry a request. pub const fn no_backoff() -> Backoff { Backoff { kind: BackoffKind::None, @@ -79,6 +84,11 @@ impl Backoff { } } + // Exponential backoff means that the retry delay should multiply a constant + // after each attempt, up to a maximum value. After each attempt, the new retry + // delay should be: + // + // new_delay = min(max_delay, base_delay * 2 ** attempts) pub const fn no_jitter_backoff( base_delay_ms: u64, max_delay_ms: u64, @@ -94,6 +104,11 @@ impl Backoff { } } + // Adds Jitter to the basic exponential backoff. Returns a random value between + // zero and the calculated exponential backoff: + // + // temp = min(max_delay, base_delay * 2 ** attempts) + // new_delay = random_between(0, temp) pub fn full_jitter_backoff( base_delay_ms: u64, max_delay_ms: u64, @@ -114,6 +129,11 @@ impl Backoff { } } + // Equal Jitter limits the random value should be equal or greater than half of + // the calculated exponential backoff: + // + // temp = min(max_delay, base_delay * 2 ** attempts) + // new_delay = random_between(temp / 2, temp) pub fn equal_jitter_backoff( base_delay_ms: u64, max_delay_ms: u64, @@ -134,6 +154,11 @@ impl Backoff { } } + // Decorrelated Jitter is always calculated with the previous backoff + // (the initial value is base_delay): + // + // temp = random_between(base_delay, previous_delay * 3) + // new_delay = min(max_delay, temp) pub fn decorrelated_jitter_backoff( base_delay_ms: u64, max_delay_ms: u64, @@ -152,33 +177,13 @@ impl Backoff { } } +/// The pattern for computing backoff times. #[derive(Debug, Clone, PartialEq, Eq)] enum BackoffKind { - // NoBackoff means that we don't want any retry here. None, - // Exponential backoff means that the retry delay should multiply a constant - // after each attempt, up to a maximum value. After each attempt, the new retry - // delay should be: - // - // new_delay = min(max_delay, base_delay * 2 ** attempts) NoJitter, - // Adds Jitter to the basic exponential backoff. Returns a random value between - // zero and the calculated exponential backoff: - // - // temp = min(max_delay, base_delay * 2 ** attempts) - // new_delay = random_between(0, temp) FullJitter, - // Equal Jitter limits the random value should be equal or greater than half of - // the calculated exponential backoff: - // - // temp = min(max_delay, base_delay * 2 ** attempts) - // new_delay = random_between(temp / 2, temp) EqualJitter, - // Decorrelated Jitter is always calculated with the previous backoff - // (the initial value is base_delay): - // - // temp = random_between(base_delay, previous_delay * 3) - // new_delay = min(max_delay, temp) DecorrelatedJitter, } diff --git a/src/lib.rs b/src/lib.rs index 2b28e60..4545223 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -107,7 +107,9 @@ pub use crate::raw::{Client as RawClient, ColumnFamily}; #[doc(inline)] pub use crate::timestamp::{Timestamp, TimestampExt}; #[doc(inline)] -pub use crate::transaction::{Client as TransactionClient, Snapshot, Transaction}; +pub use crate::transaction::{ + Client as TransactionClient, Snapshot, Transaction, TransactionOptions, +}; #[doc(inline)] pub use config::Config; #[doc(inline)] diff --git a/src/request.rs b/src/request.rs index e5d1713..a520300 100644 --- a/src/request.rs +++ b/src/request.rs @@ -165,7 +165,9 @@ pub trait KvRequest: Request + Clone + Sync + Send + 'static + Sized { #[derive(Clone, Debug, new, Eq, PartialEq)] pub struct RetryOptions { + /// How to retry when there is a region error and we need to resolve regions with PD. pub region_backoff: Backoff, + /// How to retry when a key is locked. pub lock_backoff: Backoff, } diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 48ae732..325e57f 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -107,10 +107,11 @@ impl Client { /// ``` pub async fn begin_optimistic(&self) -> Result { let timestamp = self.current_timestamp().await?; - Ok(self.new_transaction( - timestamp, - TransactionOptions::new_optimistic(self.config.try_one_pc), - )) + let mut options = TransactionOptions::new_optimistic(); + if self.config.try_one_pc { + options = options.try_one_pc(); + } + Ok(self.new_transaction(timestamp, options)) } /// Creates a new [`Transaction`](Transaction) in pessimistic mode. @@ -132,12 +133,30 @@ impl Client { /// ``` pub async fn begin_pessimistic(&self) -> Result { let timestamp = self.current_timestamp().await?; - Ok(self.new_transaction( - timestamp, - TransactionOptions::new_pessimistic(self.config.try_one_pc), - )) + let mut options = TransactionOptions::new_pessimistic(); + if self.config.try_one_pc { + options = options.try_one_pc(); + } + Ok(self.new_transaction(timestamp, options)) } + /// Creates a new customized [`Transaction`](Transaction). + /// + /// # Examples + /// ```rust,no_run + /// use tikv_client::{Config, TransactionClient, TransactionOptions}; + /// use futures::prelude::*; + /// # futures::executor::block_on(async { + /// let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap(); + /// let mut transaction = client + /// .begin_with_options(TransactionOptions::default().async_commit()) + /// .await + /// .unwrap(); + /// // ... Issue some commands. + /// let commit = transaction.commit(); + /// let result = commit.await.unwrap(); + /// # }); + /// ``` pub async fn begin_with_options(&self, options: TransactionOptions) -> Result { let timestamp = self.current_timestamp().await?; Ok(self.new_transaction(timestamp, options)) diff --git a/src/transaction/mod.rs b/src/transaction/mod.rs index 264655f..bf37880 100644 --- a/src/transaction/mod.rs +++ b/src/transaction/mod.rs @@ -11,8 +11,7 @@ pub use client::Client; pub(crate) use lock::{resolve_locks, HasLocks}; pub use snapshot::Snapshot; -pub use transaction::Transaction; -use transaction::TransactionOptions; +pub use transaction::{Transaction, TransactionOptions}; mod buffer; mod client; diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index 68df1b2..5e25ae6 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -526,6 +526,7 @@ impl Drop for Transaction { } } +/// Optimistic or pessimistic transaction. #[derive(Copy, Clone, Eq, PartialEq, Debug)] pub enum TransactionKind { Optimistic, @@ -533,62 +534,81 @@ pub enum TransactionKind { Pessimistic(u64), } +/// Options for configuring a transaction. #[derive(Clone, Eq, PartialEq, Debug)] pub struct TransactionOptions { + /// Optimistic or pessimistic (default) transaction. kind: TransactionKind, + /// Try using 1pc rather than 2pc (default is to always use 2pc). try_one_pc: bool, + /// Try to use async commit (default is not to). async_commit: bool, + /// Is the transaction read only? (Default is no). read_only: bool, + /// How to retry in the event of certain errors. retry_options: RetryOptions, } impl Default for TransactionOptions { fn default() -> TransactionOptions { - Self::new_pessimistic(false) + Self::new_pessimistic() } } impl TransactionOptions { - pub fn new_optimistic(try_one_pc: bool) -> TransactionOptions { + /// Default options for an optimistic transaction. + pub fn new_optimistic() -> TransactionOptions { TransactionOptions { kind: TransactionKind::Optimistic, - try_one_pc, + try_one_pc: false, async_commit: false, read_only: false, retry_options: RetryOptions::default_optimistic(), } } - pub fn new_pessimistic(try_one_pc: bool) -> TransactionOptions { + /// Default options for a pessimistic transaction. + pub fn new_pessimistic() -> TransactionOptions { TransactionOptions { kind: TransactionKind::Pessimistic(0), - try_one_pc, + try_one_pc: false, async_commit: false, read_only: false, retry_options: RetryOptions::default_pessimistic(), } } + /// Try to use async commit. pub fn async_commit(mut self) -> TransactionOptions { self.async_commit = true; self } + /// Try to use 1pc. + pub fn try_one_pc(mut self) -> TransactionOptions { + self.try_one_pc = true; + self + } + + /// Make the transaction read only. pub fn read_only(mut self) -> TransactionOptions { self.read_only = true; self } + /// Don't automatically resolve locks and retry if keys are locked. pub fn no_resolve_locks(mut self) -> TransactionOptions { self.retry_options.lock_backoff = Backoff::no_backoff(); self } + /// Don't automatically resolve regions with PD if we have outdated region information. pub fn no_resolve_regions(mut self) -> TransactionOptions { self.retry_options.region_backoff = Backoff::no_backoff(); self } + /// Set RetryOptions. pub fn retry_options(mut self, options: RetryOptions) -> TransactionOptions { self.retry_options = options; self From 3a29cb39a4925a7265b453acb88189d6397344a2 Mon Sep 17 00:00:00 2001 From: Nick Cameron Date: Tue, 5 Jan 2021 14:47:56 +1300 Subject: [PATCH 11/11] Address reviewer comments Signed-off-by: Nick Cameron --- examples/pessimistic.rs | 10 ++++------ src/config.rs | 7 ------- src/lib.rs | 2 ++ src/transaction/client.rs | 21 ++++----------------- src/transaction/transaction.rs | 2 +- tests/integration_tests.rs | 30 +++++++++++++++++++++--------- 6 files changed, 32 insertions(+), 40 deletions(-) diff --git a/examples/pessimistic.rs b/examples/pessimistic.rs index fe293c3..f4e8c44 100644 --- a/examples/pessimistic.rs +++ b/examples/pessimistic.rs @@ -24,15 +24,15 @@ async fn main() { .await .expect("Could not connect to tikv"); - let key1: Key = b"key1".to_vec().into(); + let key1: Key = b"key01".to_vec().into(); let value1: Value = b"value1".to_vec(); - let key2: Key = b"key2".to_vec().into(); + let key2: Key = b"key02".to_vec().into(); let value2: Value = b"value2".to_vec(); let mut txn0 = client .begin_optimistic() .await .expect("Could not begin a transaction"); - for (key, value) in vec![(key1, value1), (key2, value2)] { + for (key, value) in vec![(key1.clone(), value1), (key2, value2)] { txn0.put(key, value).await.expect("Could not set key value"); } txn0.commit().await.expect("Could not commit"); @@ -42,7 +42,6 @@ async fn main() { .await .expect("Could not begin a transaction"); // lock the key - let key1: Key = b"key1".to_vec().into(); let value = txn1 .get_for_update(key1.clone()) .await @@ -54,9 +53,8 @@ async fn main() { .begin_optimistic() .await .expect("Could not begin a transaction"); - let key1: Key = b"key1".to_vec().into(); let value2: Value = b"value2".to_vec(); - txn2.put(key1, value2).await.unwrap(); + txn2.put(key1.clone(), value2).await.unwrap(); let result = txn2.commit().await; assert!(result.is_err()); } diff --git a/src/config.rs b/src/config.rs index 17b88e5..effd07e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -27,7 +27,6 @@ pub struct Config { pub cert_path: Option, pub key_path: Option, pub timeout: Duration, - pub try_one_pc: bool, } const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(2); @@ -39,7 +38,6 @@ impl Default for Config { cert_path: None, key_path: None, timeout: DEFAULT_REQUEST_TIMEOUT, - try_one_pc: false, } } } @@ -81,9 +79,4 @@ impl Config { self.timeout = timeout; self } - - pub fn try_one_pc(mut self) -> Self { - self.try_one_pc = true; - self - } } diff --git a/src/lib.rs b/src/lib.rs index 4545223..4289ff0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -105,6 +105,8 @@ pub use crate::kv::{BoundRange, Key, KvPair, ToOwnedRange, Value}; #[doc(inline)] pub use crate::raw::{Client as RawClient, ColumnFamily}; #[doc(inline)] +pub use crate::request::RetryOptions; +#[doc(inline)] pub use crate::timestamp::{Timestamp, TimestampExt}; #[doc(inline)] pub use crate::transaction::{ diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 325e57f..7dcb9a1 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -37,7 +37,6 @@ pub struct Client { /// The thread pool for background tasks including committing secondary keys and failed /// transaction cleanups. bg_worker: ThreadPool, - config: Config, } impl Client { @@ -78,11 +77,7 @@ impl Client { let pd_endpoints: Vec = pd_endpoints.into_iter().map(Into::into).collect(); let bg_worker = ThreadPool::new()?; let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, &config, true).await?); - Ok(Client { - pd, - bg_worker, - config, - }) + Ok(Client { pd, bg_worker }) } /// Creates a new [`Transaction`](Transaction) in optimistic mode. @@ -107,11 +102,7 @@ impl Client { /// ``` pub async fn begin_optimistic(&self) -> Result { let timestamp = self.current_timestamp().await?; - let mut options = TransactionOptions::new_optimistic(); - if self.config.try_one_pc { - options = options.try_one_pc(); - } - Ok(self.new_transaction(timestamp, options)) + Ok(self.new_transaction(timestamp, TransactionOptions::new_optimistic())) } /// Creates a new [`Transaction`](Transaction) in pessimistic mode. @@ -133,11 +124,7 @@ impl Client { /// ``` pub async fn begin_pessimistic(&self) -> Result { let timestamp = self.current_timestamp().await?; - let mut options = TransactionOptions::new_pessimistic(); - if self.config.try_one_pc { - options = options.try_one_pc(); - } - Ok(self.new_transaction(timestamp, options)) + Ok(self.new_transaction(timestamp, TransactionOptions::new_pessimistic())) } /// Creates a new customized [`Transaction`](Transaction). @@ -149,7 +136,7 @@ impl Client { /// # futures::executor::block_on(async { /// let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap(); /// let mut transaction = client - /// .begin_with_options(TransactionOptions::default().async_commit()) + /// .begin_with_options(TransactionOptions::default().use_async_commit()) /// .await /// .unwrap(); /// // ... Issue some commands. diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index 5e25ae6..bd448db 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -579,7 +579,7 @@ impl TransactionOptions { } /// Try to use async commit. - pub fn async_commit(mut self) -> TransactionOptions { + pub fn use_async_commit(mut self) -> TransactionOptions { self.async_commit = true; self } diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 23962b4..bd2b313 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -9,7 +9,8 @@ use std::{ env, iter, }; use tikv_client::{ - ColumnFamily, Config, Key, KvPair, RawClient, Result, Transaction, TransactionClient, Value, + ColumnFamily, Key, KvPair, RawClient, Result, Transaction, TransactionClient, + TransactionOptions, Value, }; // Parameters used in test @@ -114,7 +115,10 @@ async fn crud() -> Result<()> { txn.commit().await?; // Read again from TiKV - let snapshot = client.snapshot(client.current_timestamp().await?); + let snapshot = client.snapshot( + client.current_timestamp().await?, + TransactionOptions::default(), + ); let batch_get_res: HashMap = snapshot .batch_get(vec!["foo".to_owned(), "bar".to_owned()]) .await? @@ -213,7 +217,10 @@ async fn txn_write_million() -> Result<()> { // test scan let limit = 2u32.pow(NUM_BITS_KEY_PER_TXN + NUM_BITS_TXN + 2); // large enough - let snapshot = client.snapshot(client.current_timestamp().await?); + let snapshot = client.snapshot( + client.current_timestamp().await?, + TransactionOptions::default(), + ); let res = snapshot.scan(vec![].., limit).await?; assert_eq!(res.count(), 2usize.pow(NUM_BITS_KEY_PER_TXN + NUM_BITS_TXN)); @@ -228,7 +235,10 @@ async fn txn_write_million() -> Result<()> { let mut sum = 0; // empty key to key[0] - let snapshot = client.snapshot(client.current_timestamp().await?); + let snapshot = client.snapshot( + client.current_timestamp().await?, + TransactionOptions::default(), + ); let res = snapshot.scan(vec![]..keys[0].clone(), limit).await?; sum += res.count(); @@ -253,12 +263,13 @@ async fn txn_write_million() -> Result<()> { #[serial] async fn txn_bank_transfer() -> Result<()> { clear_tikv().await?; - let config = Config::default().try_one_pc(); - let client = TransactionClient::new_with_config(pd_addrs(), config).await?; + let client = TransactionClient::new(pd_addrs()).await?; let mut rng = thread_rng(); let people = gen_u32_keys(NUM_PEOPLE, &mut rng); - let mut txn = client.begin_optimistic().await?; + let mut txn = client + .begin_with_options(TransactionOptions::new_optimistic().try_one_pc()) + .await?; let mut sum: u32 = 0; for person in &people { let init = rng.gen::() as u32; @@ -269,8 +280,9 @@ async fn txn_bank_transfer() -> Result<()> { // transfer for _ in 0..NUM_TRNASFER { - let mut txn = client.begin_optimistic().await?; - txn.use_async_commit(); + let mut txn = client + .begin_with_options(TransactionOptions::new_optimistic().use_async_commit()) + .await?; let chosen_people = people.iter().choose_multiple(&mut rng, 2); let alice = chosen_people[0]; let mut alice_balance = get_txn_u32(&txn, alice.clone()).await?;