diff --git a/examples/pessimistic.rs b/examples/pessimistic.rs index bca0fc7..fe293c3 100644 --- a/examples/pessimistic.rs +++ b/examples/pessimistic.rs @@ -28,7 +28,10 @@ async fn main() { let value1: Value = b"value1".to_vec(); let key2: Key = b"key2".to_vec().into(); let value2: Value = b"value2".to_vec(); - let mut txn0 = client.begin().await.expect("Could not begin a transaction"); + let mut txn0 = client + .begin_optimistic() + .await + .expect("Could not begin a transaction"); for (key, value) in vec![(key1, value1), (key2, value2)] { txn0.put(key, value).await.expect("Could not set key value"); } @@ -47,7 +50,10 @@ async fn main() { println!("{:?}", (&key1, value)); { // another txn cannot write to the locked key - let mut txn2 = client.begin().await.expect("Could not begin a transaction"); + let mut txn2 = client + .begin_optimistic() + .await + .expect("Could not begin a transaction"); let key1: Key = b"key1".to_vec().into(); let value2: Value = b"value2".to_vec(); txn2.put(key1, value2).await.unwrap(); @@ -58,7 +64,10 @@ async fn main() { let value3: Value = b"value3".to_vec(); txn1.put(key1.clone(), value3).await.unwrap(); txn1.commit().await.unwrap(); - let mut txn3 = client.begin().await.expect("Could not begin a transaction"); + let mut txn3 = client + .begin_optimistic() + .await + .expect("Could not begin a transaction"); let result = txn3.get(key1.clone()).await.unwrap().unwrap(); txn3.commit() .await diff --git a/examples/transaction.rs b/examples/transaction.rs index d089be3..4c82354 100644 --- a/examples/transaction.rs +++ b/examples/transaction.rs @@ -6,7 +6,10 @@ use crate::common::parse_args; use tikv_client::{BoundRange, Config, Key, KvPair, TransactionClient as Client, Value}; async fn puts(client: &Client, pairs: impl IntoIterator>) { - let mut txn = client.begin().await.expect("Could not begin a transaction"); + let mut txn = client + .begin_optimistic() + .await + .expect("Could not begin a transaction"); for pair in pairs { let (key, value) = pair.into().into(); txn.put(key, value).await.expect("Could not set key value"); @@ -15,7 +18,10 @@ async fn puts(client: &Client, pairs: impl IntoIterator } async fn get(client: &Client, key: Key) -> Option { - let mut txn = client.begin().await.expect("Could not begin a transaction"); + let mut txn = client + .begin_optimistic() + .await + .expect("Could not begin a transaction"); let res = txn.get(key).await.expect("Could not get value"); txn.commit() .await @@ -24,7 +30,10 @@ async fn get(client: &Client, key: Key) -> Option { } async fn scan(client: &Client, range: impl Into, limit: u32) { - let mut txn = client.begin().await.expect("Could not begin a transaction"); + let mut txn = client + .begin_optimistic() + .await + .expect("Could not begin a transaction"); txn.scan(range, limit) .await .expect("Could not scan key-value pairs in range") @@ -33,7 +42,10 @@ async fn scan(client: &Client, range: impl Into, limit: u32) { } async fn dels(client: &Client, keys: impl IntoIterator) { - let mut txn = client.begin().await.expect("Could not begin a transaction"); + let mut txn = client + .begin_optimistic() + .await + .expect("Could not begin a transaction"); for key in keys { txn.delete(key).await.expect("Could not delete the key"); } diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 0529c33..ac386fa 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -6,7 +6,7 @@ use crate::{ pd::{PdClient, PdRpcClient}, request::{KvRequest, OPTIMISTIC_BACKOFF}, timestamp::TimestampExt, - transaction::{Snapshot, Transaction, TransactionStyle}, + transaction::{Snapshot, Transaction, TransactionOptions}, Result, }; use futures::executor::ThreadPool; @@ -99,17 +99,17 @@ impl Client { /// use futures::prelude::*; /// # futures::executor::block_on(async { /// let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap(); - /// let mut transaction = client.begin().await.unwrap(); + /// let mut transaction = client.begin_optimistic().await.unwrap(); /// // ... Issue some commands. /// let commit = transaction.commit(); /// let result = commit.await.unwrap(); /// # }); /// ``` - pub async fn begin(&self) -> Result { + pub async fn begin_optimistic(&self) -> Result { let timestamp = self.current_timestamp().await?; Ok(self.new_transaction( timestamp, - TransactionStyle::new_optimistic(self.config.try_one_pc), + TransactionOptions::new_optimistic(self.config.try_one_pc), false, )) } @@ -135,16 +135,21 @@ impl Client { let timestamp = self.current_timestamp().await?; Ok(self.new_transaction( timestamp, - TransactionStyle::new_pessimistic(self.config.try_one_pc), + TransactionOptions::new_pessimistic(self.config.try_one_pc), false, )) } + pub async fn begin_with_options(&self, opts: TransactionOptions) -> Result { + let timestamp = self.current_timestamp().await?; + Ok(self.new_transaction(timestamp, opts, false)) + } + /// Creates a new [`Snapshot`](Snapshot) at the given [`Timestamp`](Timestamp). pub fn snapshot(&self, timestamp: Timestamp) -> Snapshot { Snapshot::new(self.new_transaction( timestamp, - TransactionStyle::new_optimistic(self.config.try_one_pc), + TransactionOptions::new_optimistic(self.config.try_one_pc), true, )) } @@ -212,7 +217,7 @@ impl Client { fn new_transaction( &self, timestamp: Timestamp, - style: TransactionStyle, + style: TransactionOptions, read_only: bool, ) -> Transaction { Transaction::new( diff --git a/src/transaction/mod.rs b/src/transaction/mod.rs index 1de2f34..264655f 100644 --- a/src/transaction/mod.rs +++ b/src/transaction/mod.rs @@ -12,7 +12,7 @@ pub use client::Client; pub(crate) use lock::{resolve_locks, HasLocks}; pub use snapshot::Snapshot; pub use transaction::Transaction; -use transaction::TransactionStyle; +use transaction::TransactionOptions; mod buffer; mod client; diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index 3c70470..aba32b7 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -35,31 +35,37 @@ pub enum TransactionKind { Pessimistic(u64), } -#[derive(Copy, Clone, Eq, PartialEq, Debug)] -pub struct TransactionStyle { +impl Default for TransactionKind { + fn default() -> TransactionKind { + TransactionKind::Pessimistic(0) + } +} + +#[derive(Copy, Clone, Eq, PartialEq, Debug, Default)] +pub struct TransactionOptions { kind: TransactionKind, try_one_pc: bool, async_commit: bool, } -impl TransactionStyle { - pub fn new_optimistic(try_one_pc: bool) -> TransactionStyle { - TransactionStyle { +impl TransactionOptions { + pub fn new_optimistic(try_one_pc: bool) -> TransactionOptions { + TransactionOptions { kind: TransactionKind::Optimistic, try_one_pc, async_commit: false, } } - pub fn new_pessimistic(try_one_pc: bool) -> TransactionStyle { - TransactionStyle { + pub fn new_pessimistic(try_one_pc: bool) -> TransactionOptions { + TransactionOptions { kind: TransactionKind::Pessimistic(0), try_one_pc, async_commit: false, } } - pub fn async_commit(mut self) -> TransactionStyle { + pub fn async_commit(mut self) -> TransactionOptions { self.async_commit = true; self } @@ -101,7 +107,7 @@ impl TransactionStyle { /// use futures::prelude::*; /// # futures::executor::block_on(async { /// let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap(); -/// let txn = client.begin().await.unwrap(); +/// let txn = client.begin_optimistic().await.unwrap(); /// # }); /// ``` pub struct Transaction { @@ -110,7 +116,7 @@ pub struct Transaction { buffer: Buffer, bg_worker: ThreadPool, rpc: Arc, - style: TransactionStyle, + style: TransactionOptions, } impl Transaction { @@ -118,7 +124,7 @@ impl Transaction { timestamp: Timestamp, bg_worker: ThreadPool, rpc: Arc, - style: TransactionStyle, + style: TransactionOptions, read_only: bool, ) -> Transaction { let status = if read_only { @@ -149,7 +155,7 @@ impl Transaction { /// # use futures::prelude::*; /// # futures::executor::block_on(async { /// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap(); - /// let mut txn = client.begin().await.unwrap(); + /// let mut txn = client.begin_optimistic().await.unwrap(); /// let key = "TiKV".to_owned(); /// let result: Option = txn.get(key).await.unwrap(); /// // Finish the transaction... @@ -216,7 +222,7 @@ impl Transaction { /// # use std::collections::HashMap; /// # futures::executor::block_on(async { /// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap(); - /// let mut txn = client.begin().await.unwrap(); + /// let mut txn = client.begin_optimistic().await.unwrap(); /// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()]; /// let result: HashMap = txn /// .batch_get(keys) @@ -297,7 +303,7 @@ impl Transaction { /// # use std::collections::HashMap; /// # futures::executor::block_on(async { /// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap(); - /// let mut txn = client.begin().await.unwrap(); + /// let mut txn = client.begin_optimistic().await.unwrap(); /// let key1: Key = b"TiKV".to_vec().into(); /// let key2: Key = b"TiDB".to_vec().into(); /// let result: Vec = txn @@ -331,7 +337,7 @@ impl Transaction { /// # use std::collections::HashMap; /// # futures::executor::block_on(async { /// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap(); - /// let mut txn = client.begin().await.unwrap(); + /// let mut txn = client.begin_optimistic().await.unwrap(); /// let key1: Key = b"TiKV".to_vec().into(); /// let key2: Key = b"TiDB".to_vec().into(); /// let result: Vec = txn @@ -369,7 +375,7 @@ impl Transaction { /// # use futures::prelude::*; /// # futures::executor::block_on(async { /// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap(); - /// let mut txn = client.begin().await.unwrap(); + /// let mut txn = client.begin_optimistic().await.unwrap(); /// let key = "TiKV".to_owned(); /// let val = "TiKV".to_owned(); /// txn.put(key, val); @@ -397,7 +403,7 @@ impl Transaction { /// # use futures::prelude::*; /// # futures::executor::block_on(async { /// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap(); - /// let mut txn = client.begin().await.unwrap(); + /// let mut txn = client.begin_optimistic().await.unwrap(); /// let key = "TiKV".to_owned(); /// txn.delete(key); /// // Finish the transaction... @@ -429,7 +435,7 @@ impl Transaction { /// # use futures::prelude::*; /// # futures::executor::block_on(async { /// # let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap(); - /// let mut txn = client.begin().await.unwrap(); + /// let mut txn = client.begin_optimistic().await.unwrap(); /// txn.lock_keys(vec!["TiKV".to_owned(), "Rust".to_owned()]); /// // ... Do some actions. /// txn.commit().await.unwrap(); @@ -451,7 +457,7 @@ impl Transaction { /// # use futures::prelude::*; /// # futures::executor::block_on(async { /// # let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap(); - /// let mut txn = client.begin().await.unwrap(); + /// let mut txn = client.begin_optimistic().await.unwrap(); /// // ... Do some actions. /// let req = txn.commit(); /// let result: u64 = req.await.unwrap(); @@ -600,7 +606,7 @@ struct TwoPhaseCommitter { start_version: u64, bg_worker: ThreadPool, rpc: Arc, - style: TransactionStyle, + style: TransactionOptions, #[new(default)] undetermined: bool, } diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 68dd532..23962b4 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -55,7 +55,7 @@ async fn crud() -> Result<()> { clear_tikv().await?; let client = TransactionClient::new(pd_addrs()).await?; - let mut txn = client.begin().await?; + let mut txn = client.begin_optimistic().await?; // Get non-existent keys assert!(txn.get("foo".to_owned()).await?.is_none()); @@ -91,7 +91,7 @@ async fn crud() -> Result<()> { txn.commit().await?; // Read from TiKV then update and delete - let mut txn = client.begin().await?; + let mut txn = client.begin_optimistic().await?; assert_eq!( txn.get("foo".to_owned()).await?, Some("bar".to_owned().into()) @@ -199,13 +199,13 @@ async fn txn_write_million() -> Result<()> { .map(|u| u.to_be_bytes().to_vec()) .take(2usize.pow(NUM_BITS_KEY_PER_TXN)) .collect::>(); // each txn puts 2 ^ 12 keys. 12 = 25 - 13 - let mut txn = client.begin().await?; + let mut txn = client.begin_optimistic().await?; for (k, v) in keys.iter().zip(iter::repeat(1u32.to_be_bytes().to_vec())) { txn.put(k.clone(), v).await?; } txn.commit().await?; - let mut txn = client.begin().await?; + let mut txn = client.begin_optimistic().await?; let res = txn.batch_get(keys).await?; assert_eq!(res.count(), 2usize.pow(NUM_BITS_KEY_PER_TXN)); txn.commit().await?; @@ -258,7 +258,7 @@ async fn txn_bank_transfer() -> Result<()> { let mut rng = thread_rng(); let people = gen_u32_keys(NUM_PEOPLE, &mut rng); - let mut txn = client.begin().await?; + let mut txn = client.begin_optimistic().await?; let mut sum: u32 = 0; for person in &people { let init = rng.gen::() as u32; @@ -269,7 +269,7 @@ async fn txn_bank_transfer() -> Result<()> { // transfer for _ in 0..NUM_TRNASFER { - let mut txn = client.begin().await?; + let mut txn = client.begin_optimistic().await?; txn.use_async_commit(); let chosen_people = people.iter().choose_multiple(&mut rng, 2); let alice = chosen_people[0]; @@ -291,7 +291,7 @@ async fn txn_bank_transfer() -> Result<()> { // check let mut new_sum = 0; - let mut txn = client.begin().await?; + let mut txn = client.begin_optimistic().await?; for person in people.iter() { new_sum += get_txn_u32(&txn, person.clone()).await?; }