Address reviewer comments

Signed-off-by: Nick Cameron <nrc@ncameron.org>
This commit is contained in:
Nick Cameron 2021-01-05 14:47:56 +13:00
parent f90a82fe35
commit 3a29cb39a4
6 changed files with 32 additions and 40 deletions

View File

@ -24,15 +24,15 @@ async fn main() {
.await .await
.expect("Could not connect to tikv"); .expect("Could not connect to tikv");
let key1: Key = b"key1".to_vec().into(); let key1: Key = b"key01".to_vec().into();
let value1: Value = b"value1".to_vec(); let value1: Value = b"value1".to_vec();
let key2: Key = b"key2".to_vec().into(); let key2: Key = b"key02".to_vec().into();
let value2: Value = b"value2".to_vec(); let value2: Value = b"value2".to_vec();
let mut txn0 = client let mut txn0 = client
.begin_optimistic() .begin_optimistic()
.await .await
.expect("Could not begin a transaction"); .expect("Could not begin a transaction");
for (key, value) in vec![(key1, value1), (key2, value2)] { for (key, value) in vec![(key1.clone(), value1), (key2, value2)] {
txn0.put(key, value).await.expect("Could not set key value"); txn0.put(key, value).await.expect("Could not set key value");
} }
txn0.commit().await.expect("Could not commit"); txn0.commit().await.expect("Could not commit");
@ -42,7 +42,6 @@ async fn main() {
.await .await
.expect("Could not begin a transaction"); .expect("Could not begin a transaction");
// lock the key // lock the key
let key1: Key = b"key1".to_vec().into();
let value = txn1 let value = txn1
.get_for_update(key1.clone()) .get_for_update(key1.clone())
.await .await
@ -54,9 +53,8 @@ async fn main() {
.begin_optimistic() .begin_optimistic()
.await .await
.expect("Could not begin a transaction"); .expect("Could not begin a transaction");
let key1: Key = b"key1".to_vec().into();
let value2: Value = b"value2".to_vec(); let value2: Value = b"value2".to_vec();
txn2.put(key1, value2).await.unwrap(); txn2.put(key1.clone(), value2).await.unwrap();
let result = txn2.commit().await; let result = txn2.commit().await;
assert!(result.is_err()); assert!(result.is_err());
} }

View File

@ -27,7 +27,6 @@ pub struct Config {
pub cert_path: Option<PathBuf>, pub cert_path: Option<PathBuf>,
pub key_path: Option<PathBuf>, pub key_path: Option<PathBuf>,
pub timeout: Duration, pub timeout: Duration,
pub try_one_pc: bool,
} }
const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(2); const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(2);
@ -39,7 +38,6 @@ impl Default for Config {
cert_path: None, cert_path: None,
key_path: None, key_path: None,
timeout: DEFAULT_REQUEST_TIMEOUT, timeout: DEFAULT_REQUEST_TIMEOUT,
try_one_pc: false,
} }
} }
} }
@ -81,9 +79,4 @@ impl Config {
self.timeout = timeout; self.timeout = timeout;
self self
} }
pub fn try_one_pc(mut self) -> Self {
self.try_one_pc = true;
self
}
} }

View File

@ -105,6 +105,8 @@ pub use crate::kv::{BoundRange, Key, KvPair, ToOwnedRange, Value};
#[doc(inline)] #[doc(inline)]
pub use crate::raw::{Client as RawClient, ColumnFamily}; pub use crate::raw::{Client as RawClient, ColumnFamily};
#[doc(inline)] #[doc(inline)]
pub use crate::request::RetryOptions;
#[doc(inline)]
pub use crate::timestamp::{Timestamp, TimestampExt}; pub use crate::timestamp::{Timestamp, TimestampExt};
#[doc(inline)] #[doc(inline)]
pub use crate::transaction::{ pub use crate::transaction::{

View File

@ -37,7 +37,6 @@ pub struct Client {
/// The thread pool for background tasks including committing secondary keys and failed /// The thread pool for background tasks including committing secondary keys and failed
/// transaction cleanups. /// transaction cleanups.
bg_worker: ThreadPool, bg_worker: ThreadPool,
config: Config,
} }
impl Client { impl Client {
@ -78,11 +77,7 @@ impl Client {
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect(); let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
let bg_worker = ThreadPool::new()?; let bg_worker = ThreadPool::new()?;
let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, &config, true).await?); let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, &config, true).await?);
Ok(Client { Ok(Client { pd, bg_worker })
pd,
bg_worker,
config,
})
} }
/// Creates a new [`Transaction`](Transaction) in optimistic mode. /// Creates a new [`Transaction`](Transaction) in optimistic mode.
@ -107,11 +102,7 @@ impl Client {
/// ``` /// ```
pub async fn begin_optimistic(&self) -> Result<Transaction> { pub async fn begin_optimistic(&self) -> Result<Transaction> {
let timestamp = self.current_timestamp().await?; let timestamp = self.current_timestamp().await?;
let mut options = TransactionOptions::new_optimistic(); Ok(self.new_transaction(timestamp, TransactionOptions::new_optimistic()))
if self.config.try_one_pc {
options = options.try_one_pc();
}
Ok(self.new_transaction(timestamp, options))
} }
/// Creates a new [`Transaction`](Transaction) in pessimistic mode. /// Creates a new [`Transaction`](Transaction) in pessimistic mode.
@ -133,11 +124,7 @@ impl Client {
/// ``` /// ```
pub async fn begin_pessimistic(&self) -> Result<Transaction> { pub async fn begin_pessimistic(&self) -> Result<Transaction> {
let timestamp = self.current_timestamp().await?; let timestamp = self.current_timestamp().await?;
let mut options = TransactionOptions::new_pessimistic(); Ok(self.new_transaction(timestamp, TransactionOptions::new_pessimistic()))
if self.config.try_one_pc {
options = options.try_one_pc();
}
Ok(self.new_transaction(timestamp, options))
} }
/// Creates a new customized [`Transaction`](Transaction). /// Creates a new customized [`Transaction`](Transaction).
@ -149,7 +136,7 @@ impl Client {
/// # futures::executor::block_on(async { /// # futures::executor::block_on(async {
/// let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap(); /// let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let mut transaction = client /// let mut transaction = client
/// .begin_with_options(TransactionOptions::default().async_commit()) /// .begin_with_options(TransactionOptions::default().use_async_commit())
/// .await /// .await
/// .unwrap(); /// .unwrap();
/// // ... Issue some commands. /// // ... Issue some commands.

View File

@ -579,7 +579,7 @@ impl TransactionOptions {
} }
/// Try to use async commit. /// Try to use async commit.
pub fn async_commit(mut self) -> TransactionOptions { pub fn use_async_commit(mut self) -> TransactionOptions {
self.async_commit = true; self.async_commit = true;
self self
} }

View File

@ -9,7 +9,8 @@ use std::{
env, iter, env, iter,
}; };
use tikv_client::{ use tikv_client::{
ColumnFamily, Config, Key, KvPair, RawClient, Result, Transaction, TransactionClient, Value, ColumnFamily, Key, KvPair, RawClient, Result, Transaction, TransactionClient,
TransactionOptions, Value,
}; };
// Parameters used in test // Parameters used in test
@ -114,7 +115,10 @@ async fn crud() -> Result<()> {
txn.commit().await?; txn.commit().await?;
// Read again from TiKV // Read again from TiKV
let snapshot = client.snapshot(client.current_timestamp().await?); let snapshot = client.snapshot(
client.current_timestamp().await?,
TransactionOptions::default(),
);
let batch_get_res: HashMap<Key, Value> = snapshot let batch_get_res: HashMap<Key, Value> = snapshot
.batch_get(vec!["foo".to_owned(), "bar".to_owned()]) .batch_get(vec!["foo".to_owned(), "bar".to_owned()])
.await? .await?
@ -213,7 +217,10 @@ async fn txn_write_million() -> Result<()> {
// test scan // test scan
let limit = 2u32.pow(NUM_BITS_KEY_PER_TXN + NUM_BITS_TXN + 2); // large enough let limit = 2u32.pow(NUM_BITS_KEY_PER_TXN + NUM_BITS_TXN + 2); // large enough
let snapshot = client.snapshot(client.current_timestamp().await?); let snapshot = client.snapshot(
client.current_timestamp().await?,
TransactionOptions::default(),
);
let res = snapshot.scan(vec![].., limit).await?; let res = snapshot.scan(vec![].., limit).await?;
assert_eq!(res.count(), 2usize.pow(NUM_BITS_KEY_PER_TXN + NUM_BITS_TXN)); assert_eq!(res.count(), 2usize.pow(NUM_BITS_KEY_PER_TXN + NUM_BITS_TXN));
@ -228,7 +235,10 @@ async fn txn_write_million() -> Result<()> {
let mut sum = 0; let mut sum = 0;
// empty key to key[0] // empty key to key[0]
let snapshot = client.snapshot(client.current_timestamp().await?); let snapshot = client.snapshot(
client.current_timestamp().await?,
TransactionOptions::default(),
);
let res = snapshot.scan(vec![]..keys[0].clone(), limit).await?; let res = snapshot.scan(vec![]..keys[0].clone(), limit).await?;
sum += res.count(); sum += res.count();
@ -253,12 +263,13 @@ async fn txn_write_million() -> Result<()> {
#[serial] #[serial]
async fn txn_bank_transfer() -> Result<()> { async fn txn_bank_transfer() -> Result<()> {
clear_tikv().await?; clear_tikv().await?;
let config = Config::default().try_one_pc(); let client = TransactionClient::new(pd_addrs()).await?;
let client = TransactionClient::new_with_config(pd_addrs(), config).await?;
let mut rng = thread_rng(); let mut rng = thread_rng();
let people = gen_u32_keys(NUM_PEOPLE, &mut rng); let people = gen_u32_keys(NUM_PEOPLE, &mut rng);
let mut txn = client.begin_optimistic().await?; let mut txn = client
.begin_with_options(TransactionOptions::new_optimistic().try_one_pc())
.await?;
let mut sum: u32 = 0; let mut sum: u32 = 0;
for person in &people { for person in &people {
let init = rng.gen::<u8>() as u32; let init = rng.gen::<u8>() as u32;
@ -269,8 +280,9 @@ async fn txn_bank_transfer() -> Result<()> {
// transfer // transfer
for _ in 0..NUM_TRNASFER { for _ in 0..NUM_TRNASFER {
let mut txn = client.begin_optimistic().await?; let mut txn = client
txn.use_async_commit(); .begin_with_options(TransactionOptions::new_optimistic().use_async_commit())
.await?;
let chosen_people = people.iter().choose_multiple(&mut rng, 2); let chosen_people = people.iter().choose_multiple(&mut rng, 2);
let alice = chosen_people[0]; let alice = chosen_people[0];
let mut alice_balance = get_txn_u32(&txn, alice.clone()).await?; let mut alice_balance = get_txn_u32(&txn, alice.clone()).await?;